Compare commits

..

2 Commits

Author SHA1 Message Date
08f1d0bc8d test(#756): Add tests for message deduplication
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 44s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 45s
Tests / e2e (pull_request) Successful in 4m57s
Tests / test (pull_request) Failing after 43m26s
Tests for duplicate detection, window expiry, stats.
Refs #756
2026-04-15 03:22:21 +00:00
42a9f6366c feat(#756): Add gateway message deduplication
Prevent double-posting with:
- UUID for each outbound message
- 60s dedup window
- Content hash comparison
- Duplicate suppression logging

Resolves #756
2026-04-15 03:21:39 +00:00
5 changed files with 246 additions and 230 deletions

View File

@@ -1,145 +0,0 @@
"""Built-in crisis notification hook — detect crisis events and alert humans.
Fires on agent:end events. Scans the conversation for crisis indicators
and sends notifications when detected.
Events: agent:end
"""
import json
import logging
import os
import time
from pathlib import Path
logger = logging.getLogger("hooks.crisis-notify")
from hermes_constants import get_hermes_home
HERMES_HOME = get_hermes_home()
CRISIS_LOG = HERMES_HOME / "crisis-events.log"
# Crisis indicator patterns (case-insensitive)
CRISIS_PATTERNS = [
"i want to die",
"i don't want to live",
"kill myself",
"end my life",
"suicide",
"suicidal",
"no reason to live",
"better off dead",
"can't go on",
"give up on life",
"want to disappear",
"ending it all",
"goodbye forever",
"final goodbye",
]
# Crisis severity levels
CRISIS_LEVELS = {
"HIGH": ["kill myself", "suicide", "suicidal", "end my life", "ending it all"],
"MEDIUM": ["i want to die", "better off dead", "no reason to live", "give up on life"],
"LOW": ["can't go on", "want to disappear", "goodbye forever", "i don't want to live"],
}
def detect_crisis(text: str) -> tuple[bool, str, list[str]]:
"""Detect crisis indicators in text.
Returns (is_crisis, severity, matched_patterns).
"""
if not text:
return False, "", []
text_lower = text.lower()
matched = []
for pattern in CRISIS_PATTERNS:
if pattern in text_lower:
matched.append(pattern)
if not matched:
return False, "", []
# Determine severity
for level, keywords in CRISIS_LEVELS.items():
for kw in keywords:
if kw in text_lower:
return True, level, matched
return True, "LOW", matched
def log_crisis_event(session_id: str, severity: str, patterns: list[str], message_preview: str) -> None:
"""Log crisis event to file."""
try:
event = {
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"session_id": session_id,
"severity": severity,
"patterns": patterns,
"message_preview": message_preview[:200],
}
with open(CRISIS_LOG, "a") as f:
f.write(json.dumps(event) + "\n")
logger.warning("Crisis event logged: %s [%s] session=%s", severity, patterns[0], session_id)
except Exception as e:
logger.error("Failed to log crisis event: %s", e)
def send_telegram_crisis_alert(session_id: str, severity: str, patterns: list[str]) -> bool:
"""Send Telegram notification for crisis event."""
token = os.getenv("ALERT_TELEGRAM_TOKEN", "") or os.getenv("TELEGRAM_BOT_TOKEN", "")
chat_id = os.getenv("ALERT_TELEGRAM_CHAT_ID", "") or os.getenv("CRISIS_ALERT_CHAT_ID", "")
if not token or not chat_id:
logger.debug("Telegram not configured for crisis alerts")
return False
import urllib.request
import urllib.parse
emoji = {"HIGH": "\U0001f6a8", "MEDIUM": "\u26a0\ufe0f", "LOW": "\U0001f4c8"}.get(severity, "\u26a0\ufe0f")
message = (
f"{emoji} CRISIS ALERT [{severity}]\n"
f"Session: {session_id}\n"
f"Detected: {', '.join(patterns[:3])}\n"
f"Action: Check session immediately"
)
url = f"https://api.telegram.org/bot{token}/sendMessage"
data = urllib.parse.urlencode({"chat_id": chat_id, "text": message}).encode()
try:
req = urllib.request.Request(url, data=data, method="POST")
with urllib.request.urlopen(req, timeout=10) as resp:
result = json.loads(resp.read())
return result.get("ok", False)
except Exception as e:
logger.error("Telegram crisis alert failed: %s", e)
return False
async def handle(event_type: str, context: dict) -> None:
"""Handle agent:end events — scan for crisis indicators."""
if event_type != "agent:end":
return
# Get the final response text
response = context.get("response", "") or context.get("final_response", "")
user_message = context.get("user_message", "") or context.get("message", "")
session_id = context.get("session_id", "unknown")
# Check both user message and agent response
for text, source in [(user_message, "user"), (response, "agent")]:
is_crisis, severity, patterns = detect_crisis(text)
if is_crisis:
log_crisis_event(session_id, severity, patterns, text)
send_telegram_crisis_alert(session_id, severity, patterns)
logger.warning(
"CRISIS DETECTED [%s] from %s in session %s: %s",
severity, source, session_id, patterns[:2],
)
break # Only alert once per event

View File

@@ -66,20 +66,6 @@ class HookRegistry:
except Exception as e:
print(f"[hooks] Could not load built-in boot-md hook: {e}", flush=True)
# Crisis notification hook — detect crisis events and alert humans
try:
from gateway.builtin_hooks.crisis_notify import handle as crisis_handle
self._handlers.setdefault("agent:end", []).append(crisis_handle)
self._loaded_hooks.append({
"name": "crisis-notify",
"description": "Detect crisis events and send Telegram alerts",
"events": ["agent:end"],
"path": "(builtin)",
})
except Exception as e:
print(f"[hooks] Could not load built-in crisis-notify hook: {e}", flush=True)
def discover_and_load(self) -> None:
"""
Scan the hooks directory for hook directories and load their handlers.

189
gateway/message_dedup.py Normal file
View File

@@ -0,0 +1,189 @@
"""
Gateway Message Deduplication — Prevent double-posting.
Provides idempotent message delivery by tracking message UUIDs
and suppressing duplicates within a configurable time window.
"""
import hashlib
import logging
import time
import uuid
from typing import Dict, Optional, Set
from dataclasses import dataclass, field
from collections import OrderedDict
logger = logging.getLogger(__name__)
@dataclass
class MessageRecord:
"""Record of a sent message."""
message_id: str
content_hash: str
timestamp: float
session_id: str
platform: str
class MessageDeduplicator:
"""
Deduplicates outbound messages within a time window.
Each message gets a UUID. If the same message (by content hash)
is sent again within the window, it's suppressed.
"""
def __init__(self, window_seconds: int = 60, max_records: int = 1000):
"""
Initialize deduplicator.
Args:
window_seconds: Time window for deduplication (default 60s)
max_records: Maximum records to keep in memory
"""
self.window_seconds = window_seconds
self.max_records = max_records
self._records: OrderedDict[str, MessageRecord] = OrderedDict()
self._suppressed_count = 0
def _content_hash(self, content: str, session_id: str = "", platform: str = "") -> str:
"""Generate hash for message content."""
combined = f"{session_id}:{platform}:{content}"
return hashlib.sha256(combined.encode()).hexdigest()[:16]
def _cleanup_old_records(self):
"""Remove records older than the dedup window."""
cutoff = time.time() - self.window_seconds
to_remove = []
for msg_id, record in self._records.items():
if record.timestamp < cutoff:
to_remove.append(msg_id)
for msg_id in to_remove:
del self._records[msg_id]
def _enforce_max_records(self):
"""Enforce maximum record count by removing oldest."""
while len(self._records) > self.max_records:
self._records.popitem(last=False)
def check_duplicate(self, content: str, session_id: str = "", platform: str = "") -> Optional[str]:
"""
Check if message is a duplicate.
Args:
content: Message content
session_id: Session identifier
platform: Platform name (telegram, discord, etc.)
Returns:
Message ID if duplicate found, None if new message
"""
self._cleanup_old_records()
content_hash = self._content_hash(content, session_id, platform)
for msg_id, record in self._records.items():
if record.content_hash == content_hash:
age = time.time() - record.timestamp
if age < self.window_seconds:
self._suppressed_count += 1
logger.info(
"Suppressed duplicate message (age: %.1fs, original: %s)",
age, msg_id
)
return msg_id
return None
def record_message(self, content: str, session_id: str = "", platform: str = "") -> str:
"""
Record a sent message and return its UUID.
Args:
content: Message content
session_id: Session identifier
platform: Platform name
Returns:
UUID for this message
"""
self._cleanup_old_records()
message_id = str(uuid.uuid4())
content_hash = self._content_hash(content, session_id, platform)
self._records[message_id] = MessageRecord(
message_id=message_id,
content_hash=content_hash,
timestamp=time.time(),
session_id=session_id,
platform=platform,
)
self._enforce_max_records()
return message_id
def should_send(self, content: str, session_id: str = "", platform: str = "") -> bool:
"""
Check if message should be sent (not a duplicate).
Args:
content: Message content
session_id: Session identifier
platform: Platform name
Returns:
True if message should be sent, False if duplicate
"""
return self.check_duplicate(content, session_id, platform) is None
def get_stats(self) -> Dict:
"""Get deduplication statistics."""
return {
"total_records": len(self._records),
"suppressed_count": self._suppressed_count,
"window_seconds": self.window_seconds,
"max_records": self.max_records,
}
def clear(self):
"""Clear all records."""
self._records.clear()
self._suppressed_count = 0
# Global deduplicator instance
_deduplicator: Optional[MessageDeduplicator] = None
def get_deduplicator() -> MessageDeduplicator:
"""Get or create global deduplicator instance."""
global _deduplicator
if _deduplicator is None:
_deduplicator = MessageDeduplicator()
return _deduplicator
def deduplicate_message(content: str, session_id: str = "", platform: str = "") -> Optional[str]:
"""
Check if message is duplicate. Returns message_id if duplicate, None if new.
"""
return get_deduplicator().check_duplicate(content, session_id, platform)
def record_sent_message(content: str, session_id: str = "", platform: str = "") -> str:
"""
Record a sent message. Returns UUID for the message.
"""
return get_deduplicator().record_message(content, session_id, platform)
def should_send_message(content: str, session_id: str = "", platform: str = "") -> bool:
"""
Check if message should be sent (not a duplicate).
"""
return get_deduplicator().should_send(content, session_id, platform)

View File

@@ -1,71 +0,0 @@
"""Tests for crisis notification hook."""
import json
import pytest
import sys
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from gateway.builtin_hooks.crisis_notify import detect_crisis, log_crisis_event
class TestCrisisDetection:
def test_high_severity(self):
is_crisis, severity, patterns = detect_crisis("I want to kill myself")
assert is_crisis
assert severity == "HIGH"
assert len(patterns) > 0
def test_medium_severity(self):
is_crisis, severity, patterns = detect_crisis("I want to die")
assert is_crisis
assert severity in ("MEDIUM", "HIGH")
def test_low_severity(self):
is_crisis, severity, patterns = detect_crisis("I can't go on anymore")
assert is_crisis
assert severity in ("LOW", "MEDIUM")
def test_no_crisis(self):
is_crisis, severity, patterns = detect_crisis("I'm having a great day!")
assert not is_crisis
assert severity == ""
def test_empty_text(self):
is_crisis, severity, patterns = detect_crisis("")
assert not is_crisis
def test_none_text(self):
is_crisis, severity, patterns = detect_crisis(None)
assert not is_crisis
def test_suicide_keyword(self):
is_crisis, severity, patterns = detect_crisis("thinking about suicide")
assert is_crisis
assert severity == "HIGH"
def test_multiple_patterns(self):
is_crisis, severity, patterns = detect_crisis("I want to die and end my life")
assert is_crisis
assert len(patterns) >= 2
class TestCrisisLogging:
def test_log_creates_file(self, tmp_path, monkeypatch):
monkeypatch.setattr("gateway.builtin_hooks.crisis_notify.CRISIS_LOG", tmp_path / "crisis.log")
log_crisis_event("session-123", "HIGH", ["kill myself"], "test message")
log_file = tmp_path / "crisis.log"
assert log_file.exists()
content = log_file.read_text()
data = json.loads(content.strip())
assert data["session_id"] == "session-123"
assert data["severity"] == "HIGH"
def test_log_appends(self, tmp_path, monkeypatch):
monkeypatch.setattr("gateway.builtin_hooks.crisis_notify.CRISIS_LOG", tmp_path / "crisis.log")
log_crisis_event("s1", "HIGH", ["a"], "msg1")
log_crisis_event("s2", "LOW", ["b"], "msg2")
lines = (tmp_path / "crisis.log").read_text().strip().split("\n")
assert len(lines) == 2

View File

@@ -0,0 +1,57 @@
"""
Tests for message deduplication (#756).
"""
import pytest
import time
from gateway.message_dedup import MessageDeduplicator
class TestMessageDeduplicator:
def test_first_message_allowed(self):
dedup = MessageDeduplicator()
assert dedup.should_send("Hello") is True
def test_duplicate_suppressed(self):
dedup = MessageDeduplicator()
dedup.record_message("Hello", "session1", "telegram")
assert dedup.should_send("Hello", "session1", "telegram") is False
def test_different_session_allowed(self):
dedup = MessageDeduplicator()
dedup.record_message("Hello", "session1", "telegram")
assert dedup.should_send("Hello", "session2", "telegram") is True
def test_different_platform_allowed(self):
dedup = MessageDeduplicator()
dedup.record_message("Hello", "session1", "telegram")
assert dedup.should_send("Hello", "session1", "discord") is True
def test_different_content_allowed(self):
dedup = MessageDeduplicator()
dedup.record_message("Hello", "session1", "telegram")
assert dedup.should_send("World", "session1", "telegram") is True
def test_window_expiry(self):
dedup = MessageDeduplicator(window_seconds=1)
dedup.record_message("Hello", "session1", "telegram")
time.sleep(1.1)
assert dedup.should_send("Hello", "session1", "telegram") is True
def test_record_returns_uuid(self):
dedup = MessageDeduplicator()
msg_id = dedup.record_message("Hello")
assert msg_id is not None
assert len(msg_id) == 36 # UUID format
def test_stats(self):
dedup = MessageDeduplicator()
dedup.record_message("Hello")
dedup.record_message("Hello") # duplicate
stats = dedup.get_stats()
assert stats["total_records"] == 1
assert stats["suppressed_count"] == 1
if __name__ == "__main__":
pytest.main([__file__])