fix: prevent unbounded growth of _seen_uids in EmailAdapter (#3490)
EmailAdapter._seen_uids accumulates every IMAP UID ever seen but never removes any. A long-running gateway processing a high-volume inbox would leak memory indefinitely — thousands of integers per day. IMAP UIDs are monotonically increasing integers, so old UIDs are safe to drop: new messages always have higher UIDs, and the IMAP UNSEEN flag already prevents re-delivery regardless of our local tracking. Fix adds _trim_seen_uids() which keeps only the most recent 1000 UIDs (half of the 2000-entry cap) when the set grows too large. Called automatically during connect() and after each fetch cycle. Co-authored-by: memosr.eth <96793918+memosr@users.noreply.github.com>
This commit is contained in:
@@ -213,6 +213,7 @@ class EmailAdapter(BasePlatformAdapter):
|
||||
|
||||
# Track message IDs we've already processed to avoid duplicates
|
||||
self._seen_uids: set = set()
|
||||
self._seen_uids_max: int = 2000 # cap to prevent unbounded memory growth
|
||||
self._poll_task: Optional[asyncio.Task] = None
|
||||
|
||||
# Map chat_id (sender email) -> last subject + message-id for threading
|
||||
@@ -220,6 +221,26 @@ class EmailAdapter(BasePlatformAdapter):
|
||||
|
||||
logger.info("[Email] Adapter initialized for %s", self._address)
|
||||
|
||||
def _trim_seen_uids(self) -> None:
|
||||
"""Keep only the most recent UIDs to prevent unbounded memory growth.
|
||||
|
||||
IMAP UIDs are monotonically increasing integers. When the set grows
|
||||
beyond the cap, we keep only the highest half — old UIDs are safe to
|
||||
drop because new messages always have higher UIDs and IMAP's UNSEEN
|
||||
flag prevents re-delivery regardless.
|
||||
"""
|
||||
if len(self._seen_uids) <= self._seen_uids_max:
|
||||
return
|
||||
try:
|
||||
# UIDs are bytes like b'1234' — sort numerically and keep top half
|
||||
sorted_uids = sorted(self._seen_uids, key=lambda u: int(u))
|
||||
keep = self._seen_uids_max // 2
|
||||
self._seen_uids = set(sorted_uids[-keep:])
|
||||
logger.debug("[Email] Trimmed seen UIDs to %d entries", len(self._seen_uids))
|
||||
except (ValueError, TypeError):
|
||||
# Fallback: just clear old entries if sort fails
|
||||
self._seen_uids = set(list(self._seen_uids)[-self._seen_uids_max // 2:])
|
||||
|
||||
async def connect(self) -> bool:
|
||||
"""Connect to the IMAP server and start polling for new messages."""
|
||||
try:
|
||||
@@ -232,6 +253,8 @@ class EmailAdapter(BasePlatformAdapter):
|
||||
if status == "OK" and data and data[0]:
|
||||
for uid in data[0].split():
|
||||
self._seen_uids.add(uid)
|
||||
# Keep only the most recent UIDs to prevent unbounded growth
|
||||
self._trim_seen_uids()
|
||||
imap.logout()
|
||||
logger.info("[Email] IMAP connection test passed. %d existing messages skipped.", len(self._seen_uids))
|
||||
except Exception as e:
|
||||
@@ -302,6 +325,9 @@ class EmailAdapter(BasePlatformAdapter):
|
||||
if uid in self._seen_uids:
|
||||
continue
|
||||
self._seen_uids.add(uid)
|
||||
# Trim periodically to prevent unbounded memory growth
|
||||
if len(self._seen_uids) > self._seen_uids_max:
|
||||
self._trim_seen_uids()
|
||||
|
||||
status, msg_data = imap.uid("fetch", uid, "(RFC822)")
|
||||
if status != "OK":
|
||||
|
||||
Reference in New Issue
Block a user