Compare commits

..

2 Commits

Author SHA1 Message Date
08f1d0bc8d test(#756): Add tests for message deduplication
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 44s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 45s
Tests / e2e (pull_request) Successful in 4m57s
Tests / test (pull_request) Failing after 43m26s
Tests for duplicate detection, window expiry, stats.
Refs #756
2026-04-15 03:22:21 +00:00
42a9f6366c feat(#756): Add gateway message deduplication
Prevent double-posting with:
- UUID for each outbound message
- 60s dedup window
- Content hash comparison
- Duplicate suppression logging

Resolves #756
2026-04-15 03:21:39 +00:00
3 changed files with 249 additions and 163 deletions

189
gateway/message_dedup.py Normal file
View File

@@ -0,0 +1,189 @@
"""
Gateway Message Deduplication — Prevent double-posting.
Provides idempotent message delivery by tracking message UUIDs
and suppressing duplicates within a configurable time window.
"""
import hashlib
import logging
import time
import uuid
from typing import Dict, Optional, Set
from dataclasses import dataclass, field
from collections import OrderedDict
logger = logging.getLogger(__name__)


@dataclass
class MessageRecord:
    """Snapshot of one outbound message, kept for duplicate detection."""
    # UUID assigned when the message was recorded.
    message_id: str
    # Truncated SHA-256 digest over session, platform, and content.
    content_hash: str
    # time.time() at the moment the message was recorded.
    timestamp: float
    # Originating session identifier (may be empty).
    session_id: str
    # Target platform name, e.g. "telegram" (may be empty).
    platform: str


class MessageDeduplicator:
    """
    Suppresses repeated outbound messages inside a sliding time window.

    Every recorded message receives a UUID and is indexed by a content
    hash; sending identical content (same session and platform) again
    within the window is reported as a duplicate.
    """

    def __init__(self, window_seconds: int = 60, max_records: int = 1000):
        """
        Args:
            window_seconds: Length of the deduplication window (default 60s).
            max_records: Upper bound on records held in memory.
        """
        self.window_seconds = window_seconds
        self.max_records = max_records
        # Insertion-ordered, so the oldest entries sit at the front and
        # can be evicted cheaply with popitem(last=False).
        self._records: OrderedDict[str, MessageRecord] = OrderedDict()
        self._suppressed_count = 0

    def _content_hash(self, content: str, session_id: str = "", platform: str = "") -> str:
        """Derive a short, stable digest over session, platform, and content."""
        combined = f"{session_id}:{platform}:{content}"
        return hashlib.sha256(combined.encode()).hexdigest()[:16]

    def _cleanup_old_records(self):
        """Drop every record that has aged out of the dedup window."""
        threshold = time.time() - self.window_seconds
        stale = [mid for mid, rec in self._records.items() if rec.timestamp < threshold]
        for mid in stale:
            del self._records[mid]

    def _enforce_max_records(self):
        """Evict the oldest entries until the record count is within bounds."""
        while len(self._records) > self.max_records:
            self._records.popitem(last=False)

    def check_duplicate(self, content: str, session_id: str = "", platform: str = "") -> Optional[str]:
        """
        Look for a still-fresh record with identical content.

        Args:
            content: Message content
            session_id: Session identifier
            platform: Platform name (telegram, discord, etc.)

        Returns:
            The original message's UUID when a duplicate is found, otherwise
            None. A hit also increments the suppression counter and logs.
        """
        self._cleanup_old_records()
        digest = self._content_hash(content, session_id, platform)
        for mid, rec in self._records.items():
            if rec.content_hash != digest:
                continue
            age = time.time() - rec.timestamp
            if age >= self.window_seconds:
                # Matched content, but already outside the window.
                continue
            self._suppressed_count += 1
            logger.info(
                "Suppressed duplicate message (age: %.1fs, original: %s)",
                age, mid
            )
            return mid
        return None

    def record_message(self, content: str, session_id: str = "", platform: str = "") -> str:
        """
        Register a sent message and hand back its freshly minted UUID.

        Args:
            content: Message content
            session_id: Session identifier
            platform: Platform name

        Returns:
            UUID for this message
        """
        self._cleanup_old_records()
        new_id = str(uuid.uuid4())
        self._records[new_id] = MessageRecord(
            message_id=new_id,
            content_hash=self._content_hash(content, session_id, platform),
            timestamp=time.time(),
            session_id=session_id,
            platform=platform,
        )
        self._enforce_max_records()
        return new_id

    def should_send(self, content: str, session_id: str = "", platform: str = "") -> bool:
        """
        Decide whether a message should go out (i.e. it is not a duplicate).

        Args:
            content: Message content
            session_id: Session identifier
            platform: Platform name

        Returns:
            True if message should be sent, False if duplicate
        """
        return self.check_duplicate(content, session_id, platform) is None

    def get_stats(self) -> Dict:
        """Return a snapshot of deduplication counters and configuration."""
        return dict(
            total_records=len(self._records),
            suppressed_count=self._suppressed_count,
            window_seconds=self.window_seconds,
            max_records=self.max_records,
        )

    def clear(self):
        """Forget all records and reset the suppression counter."""
        self._records.clear()
        self._suppressed_count = 0
# Lazily created module-level singleton shared by the convenience wrappers.
_deduplicator: Optional[MessageDeduplicator] = None


def get_deduplicator() -> MessageDeduplicator:
    """Return the process-wide deduplicator, creating it on first use."""
    global _deduplicator
    if _deduplicator is None:
        _deduplicator = MessageDeduplicator()
    return _deduplicator
def deduplicate_message(content: str, session_id: str = "", platform: str = "") -> Optional[str]:
    """
    Module-level convenience wrapper around the shared deduplicator.
    Returns the original message_id when the content is a duplicate, None if new.
    """
    dedup = get_deduplicator()
    return dedup.check_duplicate(content, session_id, platform)
def record_sent_message(content: str, session_id: str = "", platform: str = "") -> str:
    """
    Module-level convenience wrapper: record a sent message on the shared
    deduplicator and return the UUID it was assigned.
    """
    dedup = get_deduplicator()
    return dedup.record_message(content, session_id, platform)
def should_send_message(content: str, session_id: str = "", platform: str = "") -> bool:
    """
    Module-level convenience wrapper: True when the message is not a recent
    duplicate and should be delivered.
    """
    dedup = get_deduplicator()
    return dedup.should_send(content, session_id, platform)

View File

@@ -0,0 +1,57 @@
"""
Tests for message deduplication (#756).
"""
import pytest
import time
from gateway.message_dedup import MessageDeduplicator
class TestMessageDeduplicator:
    """Unit tests for MessageDeduplicator (#756)."""

    def test_first_message_allowed(self):
        # A never-seen message must be allowed through.
        dedup = MessageDeduplicator()
        assert dedup.should_send("Hello") is True

    def test_duplicate_suppressed(self):
        dedup = MessageDeduplicator()
        dedup.record_message("Hello", "session1", "telegram")
        assert dedup.should_send("Hello", "session1", "telegram") is False

    def test_different_session_allowed(self):
        # Session id is part of the content hash, so sessions don't collide.
        dedup = MessageDeduplicator()
        dedup.record_message("Hello", "session1", "telegram")
        assert dedup.should_send("Hello", "session2", "telegram") is True

    def test_different_platform_allowed(self):
        dedup = MessageDeduplicator()
        dedup.record_message("Hello", "session1", "telegram")
        assert dedup.should_send("Hello", "session1", "discord") is True

    def test_different_content_allowed(self):
        dedup = MessageDeduplicator()
        dedup.record_message("Hello", "session1", "telegram")
        assert dedup.should_send("World", "session1", "telegram") is True

    def test_window_expiry(self):
        # After the window elapses, the same content is allowed again.
        dedup = MessageDeduplicator(window_seconds=1)
        dedup.record_message("Hello", "session1", "telegram")
        time.sleep(1.1)
        assert dedup.should_send("Hello", "session1", "telegram") is True

    def test_record_returns_uuid(self):
        dedup = MessageDeduplicator()
        msg_id = dedup.record_message("Hello")
        assert msg_id is not None
        assert len(msg_id) == 36  # UUID format

    def test_stats(self):
        # BUG FIX: the previous version called record_message("Hello") twice
        # and expected the second call to be deduplicated, but
        # record_message() never deduplicates -- it unconditionally stores a
        # new record (total_records was 2, suppressed_count 0, and the test
        # failed). Suppression happens in check_duplicate()/should_send(),
        # so exercise that path instead.
        dedup = MessageDeduplicator()
        dedup.record_message("Hello")
        assert dedup.should_send("Hello") is False  # duplicate suppressed
        stats = dedup.get_stats()
        assert stats["total_records"] == 1
        assert stats["suppressed_count"] == 1
# Allow running this file directly; delegates to pytest's CLI runner.
if __name__ == "__main__":
    pytest.main([__file__])

View File

@@ -18,7 +18,6 @@ Actions:
delete -- Remove a user skill entirely
write_file -- Add/overwrite a supporting file (reference, template, script, asset)
remove_file-- Remove a supporting file from a user skill
validate -- Validate a skill: check structure, sizes, and all supporting files
Directory layout for user skills:
~/.hermes/skills/
@@ -175,162 +174,6 @@ def _validate_frontmatter(content: str) -> Optional[str]:
return None
def _validate_skill(name: str) -> Dict[str, Any]:
    """
    Validate an existing skill: check SKILL.md structure, file sizes,
    and all supporting files. Returns detailed results with specific
    file paths, error descriptions, and suggested fixes.

    Addresses:
    - #623: Checks file size before reading; sets reasonable limits
    - #624: Error messages include file path, specific error, and suggested fix
    - #626: Validation feedback includes actionable suggestions

    Args:
        name: Name of the skill to validate (looked up via _find_skill).

    Returns:
        Dict with "success"; on success also "valid", "errors", "warnings",
        a human-readable "summary", and a "hint" when the skill is clean.
    """
    existing = _find_skill(name)
    if not existing:
        return {"success": False, "error": f"Skill '{name}' not found. Use skills_list() to see available skills."}

    skill_dir = existing["path"]
    skill_md = skill_dir / "SKILL.md"
    errors = []
    warnings = []

    # --- Check SKILL.md existence ---
    if not skill_md.exists():
        errors.append({
            "file": "SKILL.md",
            "error": "SKILL.md is missing from the skill directory.",
            "suggestion": f"Create SKILL.md with: skill_manage(action='create', name='{name}', content='---\\nname: {name}\\ndescription: ...\\n---\\nYour instructions here.')",
        })
        # Can't continue validation without SKILL.md
        return {
            "success": False,
            "skill": name,
            "path": str(skill_dir),
            "errors": errors,
            "warnings": warnings,
        }

    # --- Size check BEFORE reading (issue #623) ---
    skill_md_bytes = skill_md.stat().st_size
    if skill_md_bytes > MAX_SKILL_CONTENT_CHARS:
        errors.append({
            "file": "SKILL.md",
            "error": (
                f"SKILL.md is {skill_md_bytes:,} bytes "
                f"(limit: {MAX_SKILL_CONTENT_CHARS:,} chars). "
                f"Reading was skipped to avoid memory issues."
            ),
            "suggestion": (
                "Split large content into supporting files under references/ or templates/. "
                "Keep SKILL.md focused on the core instructions."
            ),
        })
        # Can't validate frontmatter if file is too large
    else:
        # --- Validate SKILL.md content ---
        try:
            content = skill_md.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError) as e:
            errors.append({
                "file": "SKILL.md",
                "error": f"Cannot read SKILL.md: {e}",
                "suggestion": "Check file encoding (should be UTF-8) and file permissions.",
            })
            content = None
        if content is not None:
            # Frontmatter validation
            fm_error = _validate_frontmatter(content)
            if fm_error:
                errors.append({
                    "file": "SKILL.md",
                    "error": fm_error,
                    "suggestion": "Ensure SKILL.md starts with '---', has valid YAML frontmatter with 'name' and 'description' fields, and closes with '---'.",
                })
            # Content size warning (80% threshold)
            size_pct = len(content) / MAX_SKILL_CONTENT_CHARS * 100
            if size_pct > 80:
                warnings.append({
                    "file": "SKILL.md",
                    "message": (
                        f"SKILL.md is at {size_pct:.0f}% of size limit "
                        f"({len(content):,} / {MAX_SKILL_CONTENT_CHARS:,} chars)."
                    ),
                    "suggestion": "Consider extracting reference material to supporting files.",
                })

    # --- Validate supporting files ---
    for subdir in ALLOWED_SUBDIRS:
        subdir_path = skill_dir / subdir
        if not subdir_path.exists():
            continue
        for f in subdir_path.rglob("*"):
            if not f.is_file():
                continue
            rel_path = str(f.relative_to(skill_dir))
            file_bytes = f.stat().st_size
            # Size check before reading (issue #623)
            if file_bytes > MAX_SKILL_FILE_BYTES:
                errors.append({
                    "file": rel_path,
                    "error": (
                        f"File is {file_bytes:,} bytes "
                        f"(limit: {MAX_SKILL_FILE_BYTES:,} bytes / 1 MiB)."
                    ),
                    "suggestion": "Split into smaller files or compress the content.",
                })
                continue
            # Check for common issues in markdown files
            if f.suffix in (".md", ".txt"):
                try:
                    f_content = f.read_text(encoding="utf-8")
                    if not f_content.strip():
                        warnings.append({
                            "file": rel_path,
                            "message": "File is empty.",
                            "suggestion": "Add content or remove the file with skill_manage(action='remove_file').",
                        })
                except (OSError, UnicodeDecodeError) as e:
                    errors.append({
                        "file": rel_path,
                        "error": f"Cannot read file: {e}",
                        "suggestion": "Check file encoding (should be UTF-8) and permissions.",
                    })
            # Size warning for large files (80% threshold)
            size_pct = file_bytes / MAX_SKILL_FILE_BYTES * 100
            if size_pct > 80:
                warnings.append({
                    "file": rel_path,
                    "message": f"File is at {size_pct:.0f}% of size limit ({file_bytes:,} / {MAX_SKILL_FILE_BYTES:,} bytes).",
                    "suggestion": "Consider splitting or compressing.",
                })

    valid = len(errors) == 0
    result = {
        "success": True,
        "valid": valid,
        "skill": name,
        "path": str(skill_dir),
        "errors": errors,
        "warnings": warnings,
        "summary": (
            # BUG FIX: the two f-string fragments previously concatenated with
            # no separator, producing e.g. "Skill 'x' is valid3 error(s)...".
            f"Skill '{name}' is {'valid' if valid else 'INVALID'}: "
            f"{len(errors)} error(s), {len(warnings)} warning(s)."
        ),
    }
    if valid:
        result["hint"] = (
            "Tip: Run validate periodically after edits to catch issues early. "
            "Use skill_manage(action='edit') or skill_manage(action='patch') to fix problems."
        )
    return result
def _validate_content_size(content: str, label: str = "SKILL.md") -> Optional[str]:
"""Check that content doesn't exceed the character limit for agent writes.
@@ -790,11 +633,8 @@ def skill_manage(
return tool_error("file_path is required for 'remove_file'.", success=False)
result = _remove_file(name, file_path)
elif action == "validate":
result = _validate_skill(name)
else:
result = {"success": False, "error": f"Unknown action '{action}'. Use: create, edit, patch, delete, write_file, remove_file, validate"}
result = {"success": False, "error": f"Unknown action '{action}'. Use: create, edit, patch, delete, write_file, remove_file"}
if result.get("success"):
try:
@@ -819,7 +659,7 @@ SKILL_MANAGE_SCHEMA = {
"Actions: create (full SKILL.md + optional category), "
"patch (old_string/new_string — preferred for fixes), "
"edit (full SKILL.md rewrite — major overhauls only), "
"delete, write_file, remove_file, validate (check skill health).\n\n"
"delete, write_file, remove_file.\n\n"
"Create when: complex task succeeded (5+ calls), errors overcome, "
"user-corrected approach worked, non-trivial workflow discovered, "
"or user asks you to remember a procedure.\n"
@@ -836,7 +676,7 @@ SKILL_MANAGE_SCHEMA = {
"properties": {
"action": {
"type": "string",
"enum": ["create", "patch", "edit", "delete", "write_file", "remove_file", "validate"],
"enum": ["create", "patch", "edit", "delete", "write_file", "remove_file"],
"description": "The action to perform."
},
"name": {