Compare commits

..

6 Commits

Author SHA1 Message Date
7162e4d37a test: add cross-tier deduplication test suite (#254)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 41s
2026-04-13 22:23:55 +00:00
75d46411ad fix: remove redundant loop in on_memory_write dedup check (#254)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m32s
2026-04-13 22:23:24 +00:00
e4574a17fb docs: add memory tier ownership documentation (#254)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 42s
2026-04-13 22:07:58 +00:00
0ed58ab618 feat: add run_dedup_scan to MemoryManager, pass entries to on_memory_write (#254) 2026-04-13 22:07:32 +00:00
a0bce06f07 feat: add dedup-aware on_memory_write and dedup action to fact_store (#254) 2026-04-13 22:07:07 +00:00
56ca70516a feat: add cross-tier deduplication module (#254) 2026-04-13 22:06:30 +00:00
8 changed files with 573 additions and 665 deletions

View File

@@ -309,7 +309,19 @@ class MemoryManager:
"""Notify external providers when the built-in memory tool writes.
Skips the builtin provider itself (it's the source of the write).
Passes current MEMORY.md entries for cross-tier dedup checking.
"""
# Collect current memory entries for dedup context
memory_entries = []
for provider in self._providers:
if provider.name == "builtin" and hasattr(provider, "_store") and provider._store:
try:
store = provider._store
if hasattr(store, "get_all_entries"):
memory_entries = store.get_all_entries(target)
except Exception:
pass
for provider in self._providers:
if provider.name == "builtin":
continue
@@ -321,6 +333,54 @@ class MemoryManager:
provider.name, e,
)
def run_dedup_scan(self) -> dict:
    """Run cross-tier deduplication scan across all memory providers.

    Compares the built-in MEMORY.md entries against each external
    provider's fact store and resolves near-duplicates (the fact store
    entry wins; the MEMORY.md copy is dropped from the working list).

    Returns:
        Report dict with keys:
        - "status": "ok" or "no_memory_entries"
        - "duplicates": total duplicate pairs found across providers
        - "actions": human-readable per-provider summaries
    """
    report = {"status": "ok", "duplicates": 0, "actions": []}
    # Locate the builtin provider's store (source of MEMORY.md entries).
    builtin_store = None
    for provider in self._providers:
        if provider.name == "builtin" and hasattr(provider, "_store"):
            builtin_store = provider._store
            break  # one builtin provider expected; no need to keep scanning
    memory_entries = []
    if builtin_store:
        try:
            entries = builtin_store.get_all_entries("memory")
            memory_entries = entries if entries else []
        except Exception:
            # Best-effort: an unreadable store behaves like an empty one.
            pass
    if not memory_entries:
        report["status"] = "no_memory_entries"
        return report
    # Check each external provider for duplicates.
    for provider in self._providers:
        if provider.name == "builtin":
            continue
        if not hasattr(provider, "_store") or not provider._store:
            continue
        try:
            from plugins.memory.holographic.dedup import (
                resolve_duplicates,
                scan_cross_tier_duplicates,
            )
            all_facts = provider._store.list_facts(min_trust=0.0, limit=1000)
            dup_report = scan_cross_tier_duplicates(memory_entries, all_facts)
            report["duplicates"] += dup_report.duplicates_found
            if dup_report.duplicates_found > 0:
                cleaned = resolve_duplicates(dup_report, memory_entries, provider._store)
                removed = len(memory_entries) - len(cleaned)
                report["actions"].append(
                    f"{provider.name}: {dup_report.duplicates_found} duplicates, "
                    f"{removed} MEMORY.md entries removed"
                )
                # Carry the deduped list forward so the same MEMORY.md entry
                # is not counted and "removed" again for the next provider.
                memory_entries = cleaned
                # NOTE(review): the cleaned list is not written back to the
                # builtin store here — confirm persistence is the caller's job.
        except Exception as e:
            logger.warning("Dedup scan failed for provider '%s': %s", provider.name, e)
    return report
def on_delegation(self, task: str, result: str, *,
child_session_id: str = "", **kwargs) -> None:
"""Notify all providers that a subagent completed."""

View File

@@ -1,18 +0,0 @@
{
"agents": {
"ezra": {
"host": "143.198.27.163",
"hermes_path": "/root/wizards/ezra/hermes-agent/venv/bin/hermes",
"username": "root"
},
"timmy": {
"host": "timmy",
"hermes_path": "/root/wizards/timmy/hermes-agent/venv/bin/hermes",
"username": "root"
}
},
"validation_timeout": 30,
"command_timeout": 300,
"max_retries": 2,
"retry_delay": 5
}

View File

@@ -1,551 +0,0 @@
"""
VPS Agent Dispatch Worker for Hermes Cron System
This module provides a dispatch worker that SSHs into remote VPS machines
and runs hermes commands. It ensures that:
1. Remote dispatch only counts as success when the remote hermes command actually launches
2. Stale per-agent hermes binary paths are configurable/validated before queue drain
3. Failed remote launches remain in the queue (or are marked failed) instead of being reported as OK
"""
import json
import logging
import os
import subprocess
import sys
import time
from pathlib import Path
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
from enum import Enum
logger = logging.getLogger(__name__)
class DispatchStatus(Enum):
    """Status of a dispatch operation (also persisted as the queue item's
    "status" field via .value)."""
    PENDING = "pending"          # queued, not yet attempted
    VALIDATING = "validating"    # remote hermes path check in progress
    DISPATCHING = "dispatching"  # SSH command currently executing
    SUCCESS = "success"          # remote command ran and exited 0
    FAILED = "failed"            # validation or execution failed
    RETRYING = "retrying"        # reserved; not set anywhere in this module — TODO confirm
@dataclass
class DispatchResult:
    """Result of a dispatch operation."""
    status: DispatchStatus                   # outcome of the validate/dispatch step
    message: str                             # human-readable summary
    exit_code: Optional[int] = None          # remote process exit code, if it ran
    stdout: Optional[str] = None             # captured remote stdout
    stderr: Optional[str] = None             # captured remote stderr
    execution_time: Optional[float] = None   # wall-clock seconds for the attempt
    hermes_path: Optional[str] = None        # hermes binary path used on the remote host
    validated: bool = False                  # True when the hermes path check passed
class HermesPathValidator:
    """Validates hermes binary paths on remote VPS machines.

    Runs a non-interactive SSH probe (`test -x <path>`) on the remote host
    and reports the outcome as a DispatchResult.
    """

    def __init__(self, ssh_key_path: Optional[str] = None):
        self.ssh_key_path = ssh_key_path or os.path.expanduser("~/.ssh/id_rsa")
        self.timeout = 30  # SSH timeout in seconds

    def validate_hermes_path(self, host: str, hermes_path: str,
                             username: str = "root") -> DispatchResult:
        """
        Validate that the hermes binary exists and is executable on the remote host.

        Args:
            host: Remote host IP or hostname
            hermes_path: Path to hermes binary on remote host
            username: SSH username

        Returns:
            DispatchResult with validation status
        """
        import shlex  # local import: only needed here for safe remote quoting

        start_time = time.time()
        # Build SSH command to check the hermes binary. The path is
        # shell-quoted so paths containing spaces or shell metacharacters
        # cannot break (or inject into) the remote command line.
        probe = f"test -x {shlex.quote(hermes_path)} && echo 'VALID' || echo 'INVALID'"
        ssh_cmd = [
            "ssh",
            "-i", self.ssh_key_path,
            "-o", "StrictHostKeyChecking=no",
            "-o", "ConnectTimeout=10",
            "-o", "BatchMode=yes",
            f"{username}@{host}",
            probe
        ]
        try:
            result = subprocess.run(
                ssh_cmd,
                capture_output=True,
                text=True,
                timeout=self.timeout
            )
            execution_time = time.time() - start_time
            # Compare the exact token on the last output line. A substring
            # check ("VALID" in stdout) would also match the failure marker
            # 'INVALID' and report a missing binary as valid.
            out_lines = (result.stdout or "").strip().splitlines()
            is_valid = (
                result.returncode == 0
                and bool(out_lines)
                and out_lines[-1].strip() == "VALID"
            )
            if is_valid:
                return DispatchResult(
                    status=DispatchStatus.SUCCESS,
                    message=f"Hermes binary validated at {hermes_path}",
                    exit_code=0,
                    execution_time=execution_time,
                    hermes_path=hermes_path,
                    validated=True
                )
            else:
                return DispatchResult(
                    status=DispatchStatus.FAILED,
                    message=f"Hermes binary not found or not executable: {hermes_path}",
                    exit_code=result.returncode,
                    stdout=result.stdout,
                    stderr=result.stderr,
                    execution_time=execution_time,
                    hermes_path=hermes_path,
                    validated=False
                )
        except subprocess.TimeoutExpired:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"SSH timeout validating hermes path on {host}",
                execution_time=time.time() - start_time,
                hermes_path=hermes_path,
                validated=False
            )
        except Exception as e:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Error validating hermes path: {str(e)}",
                execution_time=time.time() - start_time,
                hermes_path=hermes_path,
                validated=False
            )
class VPSAgentDispatcher:
    """Dispatches hermes commands to remote VPS agents.

    Agent connection details (host, hermes binary path, username) come from a
    JSON config file under ~/.hermes; commands are executed over SSH.
    """

    def __init__(self, config_path: Optional[str] = None):
        self.config_path = config_path or os.path.expanduser("~/.hermes/dispatch_config.json")
        self.validator = HermesPathValidator()
        self.config = self._load_config()

    def _load_config(self) -> Dict[str, Any]:
        """Load dispatch configuration from disk, falling back to built-in defaults."""
        try:
            if os.path.exists(self.config_path):
                with open(self.config_path, 'r') as f:
                    return json.load(f)
        except Exception as e:
            # Best-effort: an unreadable config falls through to the defaults below.
            logger.warning(f"Failed to load dispatch config: {e}")
        # Default configuration
        return {
            "agents": {
                "ezra": {
                    "host": "143.198.27.163",
                    "hermes_path": "/root/wizards/ezra/hermes-agent/venv/bin/hermes",
                    "username": "root"
                },
                "timmy": {
                    "host": "timmy",
                    "hermes_path": "/root/wizards/timmy/hermes-agent/venv/bin/hermes",
                    "username": "root"
                }
            },
            "validation_timeout": 30,
            "command_timeout": 300,
            "max_retries": 2,
            "retry_delay": 5
        }

    def save_config(self):
        """Save dispatch configuration to disk (creating parent dirs as needed)."""
        try:
            config_dir = Path(self.config_path).parent
            config_dir.mkdir(parents=True, exist_ok=True)
            with open(self.config_path, 'w') as f:
                json.dump(self.config, f, indent=2)
            # Set secure permissions (owner read/write only)
            os.chmod(self.config_path, 0o600)
        except Exception as e:
            logger.error(f"Failed to save dispatch config: {e}")

    def get_agent_config(self, agent_name: str) -> Optional[Dict[str, Any]]:
        """Get configuration for a specific agent, or None if unknown."""
        return self.config.get("agents", {}).get(agent_name)

    def update_agent_config(self, agent_name: str, host: str, hermes_path: str,
                            username: str = "root"):
        """Update configuration for a specific agent and persist it."""
        if "agents" not in self.config:
            self.config["agents"] = {}
        self.config["agents"][agent_name] = {
            "host": host,
            "hermes_path": hermes_path,
            "username": username
        }
        self.save_config()

    def validate_agent(self, agent_name: str) -> DispatchResult:
        """Validate that an agent's hermes binary is accessible."""
        agent_config = self.get_agent_config(agent_name)
        if not agent_config:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Agent configuration not found: {agent_name}"
            )
        return self.validator.validate_hermes_path(
            host=agent_config["host"],
            hermes_path=agent_config["hermes_path"],
            username=agent_config.get("username", "root")
        )

    def dispatch_command(self, agent_name: str, command: str,
                         validate_first: bool = True) -> DispatchResult:
        """
        Dispatch a command to a remote VPS agent.

        Args:
            agent_name: Name of the agent to dispatch to
            command: Command to execute
            validate_first: Whether to validate hermes path before dispatching

        Returns:
            DispatchResult with execution status
        """
        agent_config = self.get_agent_config(agent_name)
        if not agent_config:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Agent configuration not found: {agent_name}"
            )
        # Validate hermes path if requested
        if validate_first:
            validation_result = self.validate_agent(agent_name)
            if validation_result.status != DispatchStatus.SUCCESS:
                return DispatchResult(
                    status=DispatchStatus.FAILED,
                    message=f"Validation failed: {validation_result.message}",
                    hermes_path=agent_config["hermes_path"],
                    validated=False
                )
        # Build SSH command to execute hermes command.
        # NOTE(review): `command` is interpolated unquoted into a remote shell
        # string — callers must pass trusted commands only; confirm inputs
        # never come from untrusted sources.
        ssh_cmd = [
            "ssh",
            "-i", self.validator.ssh_key_path,
            "-o", "StrictHostKeyChecking=no",
            "-o", "ConnectTimeout=10",
            f"{agent_config.get('username', 'root')}@{agent_config['host']}",
            f"cd /root/wizards/{agent_name}/hermes-agent && source venv/bin/activate && {command}"
        ]
        start_time = time.time()
        try:
            result = subprocess.run(
                ssh_cmd,
                capture_output=True,
                text=True,
                timeout=self.config.get("command_timeout", 300)
            )
            execution_time = time.time() - start_time
            if result.returncode == 0:
                return DispatchResult(
                    status=DispatchStatus.SUCCESS,
                    message=f"Command executed successfully on {agent_name}",
                    exit_code=0,
                    stdout=result.stdout,
                    stderr=result.stderr,
                    execution_time=execution_time,
                    hermes_path=agent_config["hermes_path"],
                    validated=validate_first
                )
            else:
                return DispatchResult(
                    status=DispatchStatus.FAILED,
                    message=f"Command failed on {agent_name}: {result.stderr}",
                    exit_code=result.returncode,
                    stdout=result.stdout,
                    stderr=result.stderr,
                    execution_time=execution_time,
                    hermes_path=agent_config["hermes_path"],
                    validated=validate_first
                )
        except subprocess.TimeoutExpired:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Command timeout on {agent_name}",
                execution_time=time.time() - start_time,
                hermes_path=agent_config["hermes_path"],
                validated=validate_first
            )
        except Exception as e:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Error executing command on {agent_name}: {str(e)}",
                execution_time=time.time() - start_time,
                hermes_path=agent_config["hermes_path"],
                validated=validate_first
            )

    def dispatch_hermes_command(self, agent_name: str, hermes_command: str,
                                validate_first: bool = True) -> DispatchResult:
        """
        Dispatch a hermes command to a remote VPS agent.

        Args:
            agent_name: Name of the agent to dispatch to
            hermes_command: Hermes command to execute (e.g., "hermes cron list")
            validate_first: Whether to validate hermes path before dispatching

        Returns:
            DispatchResult with execution status
        """
        agent_config = self.get_agent_config(agent_name)
        if not agent_config:
            return DispatchResult(
                status=DispatchStatus.FAILED,
                message=f"Agent configuration not found: {agent_name}"
            )
        # Build full hermes command (absolute binary path + subcommand)
        full_command = f"{agent_config['hermes_path']} {hermes_command}"
        return self.dispatch_command(agent_name, full_command, validate_first)
class DispatchQueue:
    """Queue for managing dispatch operations.

    Items are plain dicts persisted as JSON (default: ~/.hermes/dispatch_queue.json)
    so the queue survives process restarts.
    """

    def __init__(self, queue_file: Optional[str] = None):
        self.queue_file = queue_file or os.path.expanduser("~/.hermes/dispatch_queue.json")
        self.queue: List[Dict[str, Any]] = self._load_queue()

    def _load_queue(self) -> List[Dict[str, Any]]:
        """Load queue from file."""
        try:
            if os.path.exists(self.queue_file):
                with open(self.queue_file, 'r') as f:
                    return json.load(f)
        except Exception as e:
            # Best-effort: a corrupt/unreadable queue file starts us empty.
            logger.warning(f"Failed to load dispatch queue: {e}")
        return []

    def save_queue(self):
        """Save queue to file (creating parent dirs as needed)."""
        try:
            queue_dir = Path(self.queue_file).parent
            queue_dir.mkdir(parents=True, exist_ok=True)
            with open(self.queue_file, 'w') as f:
                json.dump(self.queue, f, indent=2)
            # Set secure permissions (owner read/write only)
            os.chmod(self.queue_file, 0o600)
        except Exception as e:
            logger.error(f"Failed to save dispatch queue: {e}")

    def add_item(self, agent_name: str, command: str, priority: int = 0,
                 max_retries: int = 3) -> str:
        """
        Add an item to the dispatch queue.

        Returns:
            Queue item ID
        """
        # NOTE(review): IDs combine epoch seconds with queue length; two adds
        # in the same second after removals could collide — confirm acceptable.
        item_id = f"dispatch_{int(time.time())}_{len(self.queue)}"
        item = {
            "id": item_id,
            "agent_name": agent_name,
            "command": command,
            "priority": priority,
            "max_retries": max_retries,
            "retry_count": 0,
            "status": DispatchStatus.PENDING.value,
            "created_at": time.time(),
            "last_attempt": None,
            "result": None
        }
        self.queue.append(item)
        self.save_queue()
        return item_id

    def get_next_item(self) -> Optional[Dict[str, Any]]:
        """Get the next item from the queue (highest priority, oldest first)."""
        if not self.queue:
            return None
        # Sort by priority (descending) and created_at (ascending)
        sorted_queue = sorted(
            self.queue,
            key=lambda x: (-x.get("priority", 0), x.get("created_at", 0))
        )
        # Find first pending item
        for item in sorted_queue:
            if item.get("status") == DispatchStatus.PENDING.value:
                return item
        return None

    def update_item(self, item_id: str, status: DispatchStatus,
                    result: Optional[DispatchResult] = None):
        """Update a queue item's status and (optionally) record its result."""
        for item in self.queue:
            if item.get("id") == item_id:
                item["status"] = status.value
                item["last_attempt"] = time.time()
                if result:
                    # Flatten the DispatchResult dataclass into plain JSON.
                    item["result"] = {
                        "status": result.status.value,
                        "message": result.message,
                        "exit_code": result.exit_code,
                        "stdout": result.stdout,
                        "stderr": result.stderr,
                        "execution_time": result.execution_time,
                        "hermes_path": result.hermes_path,
                        "validated": result.validated
                    }
                # Update retry count if failed
                if status == DispatchStatus.FAILED:
                    item["retry_count"] = item.get("retry_count", 0) + 1
                self.save_queue()
                break

    def remove_item(self, item_id: str):
        """Remove an item from the queue."""
        self.queue = [item for item in self.queue if item.get("id") != item_id]
        self.save_queue()

    def get_failed_items(self) -> List[Dict[str, Any]]:
        """Get all failed items that can be retried (retry budget not exhausted)."""
        return [
            item for item in self.queue
            if item.get("status") == DispatchStatus.FAILED.value
            and item.get("retry_count", 0) < item.get("max_retries", 3)
        ]

    def get_stats(self) -> Dict[str, Any]:
        """Get queue statistics (counts by status plus retryable failures)."""
        total = len(self.queue)
        pending = sum(1 for item in self.queue if item.get("status") == DispatchStatus.PENDING.value)
        success = sum(1 for item in self.queue if item.get("status") == DispatchStatus.SUCCESS.value)
        failed = sum(1 for item in self.queue if item.get("status") == DispatchStatus.FAILED.value)
        return {
            "total": total,
            "pending": pending,
            "success": success,
            "failed": failed,
            "retryable": len(self.get_failed_items())
        }
def process_dispatch_queue(dispatcher: VPSAgentDispatcher,
                           queue: DispatchQueue,
                           batch_size: int = 5) -> Dict[str, Any]:
    """
    Drain up to *batch_size* pending items from the dispatch queue.

    Each item is flagged DISPATCHING, executed via
    dispatcher.dispatch_hermes_command(), then recorded as SUCCESS or FAILED.
    Items whose retry budget is exhausted are removed from the queue.

    Args:
        dispatcher: VPS agent dispatcher
        queue: Dispatch queue
        batch_size: Number of items to process in this batch

    Returns:
        Processing statistics
    """
    counts = {"processed": 0, "success": 0, "failed": 0}
    remaining = batch_size
    while remaining > 0:
        remaining -= 1
        entry = queue.get_next_item()
        if not entry:
            break  # queue drained
        entry_id = entry["id"]
        # Flag the item as in-flight before touching the network.
        queue.update_item(entry_id, DispatchStatus.DISPATCHING)
        outcome = dispatcher.dispatch_hermes_command(
            agent_name=entry["agent_name"],
            hermes_command=entry["command"],
            validate_first=True
        )
        if outcome.status == DispatchStatus.SUCCESS:
            queue.update_item(entry_id, DispatchStatus.SUCCESS, outcome)
            counts["success"] += 1
        else:
            counts["failed"] += 1
            current = next((i for i in queue.queue if i.get("id") == entry_id), None)
            can_retry = (
                current is not None
                and current.get("retry_count", 0) < current.get("max_retries", 3)
            )
            if can_retry:
                queue.update_item(entry_id, DispatchStatus.FAILED, outcome)
            else:
                # Retry budget exhausted: drop the item from the queue.
                queue.remove_item(entry_id)
        counts["processed"] += 1
    return {
        "processed": counts["processed"],
        "success": counts["success"],
        "failed": counts["failed"],
        "queue_stats": queue.get_stats()
    }
# Example usage and testing
if __name__ == "__main__":
    # Set up logging
    logging.basicConfig(level=logging.INFO)
    # Create dispatcher and queue.
    # NOTE(review): running this module directly performs real work — it
    # writes queue/config files under ~/.hermes and attempts SSH dispatches.
    dispatcher = VPSAgentDispatcher()
    queue = DispatchQueue()
    # Example: Add items to queue
    queue.add_item("ezra", "cron list")
    queue.add_item("timmy", "cron status")
    # Process queue
    stats = process_dispatch_queue(dispatcher, queue)
    print(f"Processing stats: {stats}")
    # Show queue stats
    queue_stats = queue.get_stats()
    print(f"Queue stats: {queue_stats}")

View File

@@ -653,12 +653,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# AIAgent.__init__ is missing params the scheduler expects.
_validate_agent_interface()
# Check if this is a dispatch job
if job.get("type") == "dispatch" or "dispatch" in job.get("name", "").lower():
return _run_dispatch_job(job)
from run_agent import AIAgent
# Initialize SQLite session store so cron job messages are persisted
@@ -1013,89 +1007,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e)
def _run_dispatch_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
"""
Execute a dispatch job that SSHs into remote VPS machines.
Returns:
Tuple of (success, full_output_doc, final_response, error_message)
"""
from cron.dispatch_worker import VPSAgentDispatcher, DispatchQueue, process_dispatch_queue
job_id = job["id"]
job_name = job["name"]
logger.info("Running dispatch job '%s' (ID: %s)", job_name, job_id)
try:
# Load dispatch configuration
dispatcher = VPSAgentDispatcher()
queue = DispatchQueue()
# Get dispatch parameters from job
agent_name = job.get("agent_name", "ezra")
command = job.get("command", "cron list")
batch_size = job.get("batch_size", 5)
# Add command to queue if specified
if command:
queue.add_item(agent_name, command)
# Process the dispatch queue
stats = process_dispatch_queue(dispatcher, queue, batch_size)
# Generate output
output = f"""# Dispatch Job: {job_name}
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Agent:** {agent_name}
**Command:** {command}
## Dispatch Results
- **Processed:** {stats['processed']}
- **Success:** {stats['success']}
- **Failed:** {stats['failed']}
## Queue Statistics
- **Total items:** {stats['queue_stats']['total']}
- **Pending:** {stats['queue_stats']['pending']}
- **Success:** {stats['queue_stats']['success']}
- **Failed:** {stats['queue_stats']['failed']}
- **Retryable:** {stats['queue_stats']['retryable']}
## Status
{"✅ All dispatches successful" if stats['failed'] == 0 else f"⚠️ {stats['failed']} dispatches failed"}
"""
success = stats['failed'] == 0
error_message = None if success else f"{stats['failed']} dispatches failed"
return (success, output, output, error_message)
except Exception as e:
error_msg = f"Dispatch job failed: {str(e)}"
logger.error(error_msg, exc_info=True)
output = f"""# Dispatch Job: {job_name}
**Job ID:** {job_id}
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
**Status:** ❌ Failed
## Error
{error_msg}
"""
return (False, output, output, error_msg)
def tick(verbose: bool = True, adapters=None, loop=None) -> int:
"""
Check and run all due jobs.

View File

@@ -0,0 +1,91 @@
# Memory Tier Ownership
Each fact lives in exactly one tier. This prevents duplicate tokens on every
prompt injection and eliminates stale-data divergence when one copy is updated
but not the other.
## Tier 1 — MEMORY.md (Built-in)
**Purpose:** Always-on system prompt context — compact, high-signal.
**Contains:**
- Operational notes and active task state
- Immediate context the agent needs every turn
- User preferences that affect agent behavior
**Constraints:**
- Keep under 50 entries (every byte costs prompt tokens)
- Entries >100 chars should migrate to the fact store
- Managed via the `memory` tool (add/replace/remove)
**Examples:**
- "Default model: mimo-v2-pro/Nous"
- "Alexander prefers action over narration"
- "Deploy via Ansible; wants one-command redeploy"
## Tier 2 — Fact Store (Holographic)
**Purpose:** Deep structured storage with search, reasoning, and trust scoring.
**Contains:**
- `user_pref` — User preferences and habits
- `project` — Project-specific facts and conventions
- `tool` — Tool quirks, API behaviors, environment details
- `general` — Everything else worth remembering
**Advantages over MEMORY.md:**
- FTS5 full-text search
- Entity resolution (link facts to people/projects/tools)
- Trust scoring (good facts rise, bad facts sink)
- Compositional reasoning (`reason` across multiple entities)
- Duplicate detection (UNIQUE constraint + similarity matching)
- Unlimited size
**Managed via:** `fact_store` tool (add/search/probe/related/reason/contradict/update/remove/list)
## Tier 3 — MemPalace
**Purpose:** Specialized long-form archives and multi-session research.
**Contains:**
- Detailed analysis and research notes
- Multi-session task context
- Structured "palace rooms" for domain-specific knowledge
## Migration Rules
| Condition | Destination |
|-----------|------------|
| Entry >100 chars | → fact store |
| Category is `user_pref`, `project`, `tool` | → fact store |
| Needs entity linking | → fact store |
| Needs trust scoring | → fact store |
| Short operational note (<80 chars) | → MEMORY.md |
| Always-on context | → MEMORY.md |
| When in doubt | → fact store |
## Cross-Tier Deduplication
**Problem:** The `on_memory_write` bridge mirrors MEMORY.md writes to the fact
store. Without dedup, the same fact exists in both tiers — wasting tokens and
risking stale data.
**Solution:**
1. `on_memory_write` checks the fact store for similar entries before mirroring
2. Similarity threshold: 0.85 (catches rephrasings, avoids false positives)
3. If duplicate found: skip the mirror (fact store entry is canonical)
4. `dedup` action on `fact_store` tool: runtime scan + auto-resolve
5. `MemoryManager.run_dedup_scan()`: programmatic cross-tier cleanup
**Resolution strategy:** Fact store wins by default — it has trust scoring,
FTS5, and entity resolution. MEMORY.md copies are removed.
## Running Dedup
```python
# Via tool
result = fact_store(action="dedup")
# Via MemoryManager
report = memory_manager.run_dedup_scan()
```

View File

@@ -55,7 +55,7 @@ FACT_STORE_SCHEMA = {
"properties": {
"action": {
"type": "string",
"enum": ["add", "search", "probe", "related", "reason", "contradict", "update", "remove", "list"],
"enum": ["add", "search", "probe", "related", "reason", "contradict", "update", "remove", "list", "dedup"],
},
"content": {"type": "string", "description": "Fact content (required for 'add')."},
"query": {"type": "string", "description": "Search query (required for 'search')."},
@@ -242,27 +242,48 @@ class HolographicMemoryProvider(MemoryProvider):
self._auto_extract_facts(messages)
def on_memory_write(self, action: str, target: str, content: str) -> None:
"""Mirror built-in memory writes as facts.
"""Mirror built-in memory writes as facts with cross-tier dedup.
- add: mirror new fact to holographic store
- replace: search for old content, update or re-add
- remove: lower trust on matching facts so they fade naturally
- add: check for duplicates first, skip if fact already exists
- replace: search for old content, update or re-add (dedup-aware)
- remove: remove matching facts (hard remove, not trust decay)
Dedup strategy: before adding, search existing facts for near-matches.
If similarity > 0.85, skip the add (existing fact store entry wins).
"""
if not self._store:
return
try:
if action == "add" and content:
category = "user_pref" if target == "user" else "general"
# Cross-tier dedup: check if this fact already exists
from .dedup import is_duplicate_before_add
existing = self._store.search_facts(content.strip()[:200], limit=5)
dup = is_duplicate_before_add(content, existing)
if dup:
logger.debug(
"Skipping duplicate mirror: '%s' already exists as fact#%d",
content[:60], dup.get("fact_id", "?")
)
return
self._store.add_fact(content, category=category)
elif action == "replace" and content:
category = "user_pref" if target == "user" else "general"
# Check for duplicate before adding replacement
from .dedup import is_duplicate_before_add
existing = self._store.search_facts(content.strip()[:200], limit=5)
dup = is_duplicate_before_add(content, existing)
if dup:
logger.debug("Skipping duplicate replace mirror: fact#%d already matches", dup.get("fact_id", "?"))
return
self._store.add_fact(content, category=category)
elif action == "remove" and content:
# Lower trust on matching facts so they decay naturally
# Hard remove matching facts (not just trust decay)
results = self._store.search_facts(content, limit=5)
for fact in results:
if content.strip().lower() in fact.get("content", "").lower():
self._store.update_fact(fact["fact_id"], trust=max(0.0, fact.get("trust", 0.5) - 0.4))
self._store.remove_fact(fact["fact_id"])
logger.debug("Removed mirrored fact#%d on memory remove", fact["fact_id"])
except Exception as e:
logger.debug("Holographic memory_write mirror failed: %s", e)
@@ -351,6 +372,31 @@ class HolographicMemoryProvider(MemoryProvider):
)
return json.dumps({"facts": facts, "count": len(facts)})
elif action == "dedup":
from .dedup import scan_cross_tier_duplicates, resolve_duplicates, DedupReport
# Get all facts from store
all_facts = store.list_facts(min_trust=0.0, limit=1000)
# Get memory entries from built-in store (passed via kwargs if available)
memory_entries = kwargs.get("memory_entries", [])
if not memory_entries:
return json.dumps({
"status": "no_memory_entries",
"message": "No MEMORY.md entries provided for comparison. Use memory tool to read first.",
"fact_count": len(all_facts),
})
report = scan_cross_tier_duplicates(memory_entries, all_facts)
if report.duplicates_found == 0:
return json.dumps({"status": "clean", "message": "No cross-tier duplicates found."})
# Auto-resolve: fact store wins
cleaned = resolve_duplicates(report, memory_entries, store)
return json.dumps({
"status": "resolved",
"duplicates_found": report.duplicates_found,
"entries_removed": len(memory_entries) - len(cleaned),
"cleaned_entries": cleaned,
"summary": report.summary(),
})
else:
return json.dumps({"error": f"Unknown action: {action}"})

View File

@@ -0,0 +1,191 @@
"""Cross-tier memory deduplication.
Detects and resolves duplicate facts between MEMORY.md (built-in) and the
holographic fact store. Facts should live in exactly one tier:
Tier 1 — MEMORY.md: Always-on context (compact, <50 entries ideal).
Tier 2 — Fact store: Deep structured storage (unlimited, entity-aware).
Tier 3 — MemPalace: Specialized long-form archives.
Ownership rules:
- user_pref / project / tool facts → fact store (structured, searchable)
- "always-on" operational notes → MEMORY.md (compact, system prompt)
- When in doubt: fact store wins (it has dedup, trust scoring, FTS5)
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass, field
from difflib import SequenceMatcher
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
_SIMILARITY_THRESHOLD = 0.85
@dataclass
class DuplicatePair:
    """One MEMORY.md entry / fact-store fact pair judged near-identical."""
    memory_entry: str        # raw MEMORY.md line
    memory_index: int        # index of the entry in the scanned MEMORY.md list
    fact_store_id: int       # fact_id of the matching fact (-1 if unknown)
    fact_store_content: str  # content of the matching fact
    similarity: float        # SequenceMatcher ratio of the normalized texts
    resolution: str = ""     # e.g. "keep_factstore" once resolved
    resolved: bool = False   # True after resolve_pair() has run on this pair
@dataclass
class DedupReport:
    """Outcome of one cross-tier duplicate scan."""
    total_memory_entries: int = 0  # MEMORY.md entries examined
    total_facts: int = 0           # fact store entries examined
    duplicates_found: int = 0      # equals len(pairs) after a scan
    pairs: List[DuplicatePair] = field(default_factory=list)  # matched pairs

    def summary(self) -> str:
        """Render a human-readable, multi-line summary of the scan."""
        lines = [
            f"Cross-tier dedup: {self.total_memory_entries} MEMORY.md entries, "
            f"{self.total_facts} fact store entries, "
            f"{self.duplicates_found} duplicates found",
        ]
        for p in self.pairs:
            # Unresolved pairs render as [PENDING]; resolved ones show their resolution.
            status = f"[{p.resolution}]" if p.resolved else "[PENDING]"
            lines.append(
                f" {status} sim={p.similarity:.2f} "
                f"mem[{p.memory_index}]: {p.memory_entry[:60]} "
                f"<> fact#{p.fact_store_id}: {p.fact_store_content[:60]}"
            )
        return "\n".join(lines)
def _normalize(text: str) -> str:
text = text.strip().lower()
text = re.sub(r"^[\\s>*\\-\\u2022]+", "", text)
text = re.sub(r"\\s+", " ", text)
text = text.rstrip(".,;:!?",)
return text
def _similarity(a: str, b: str) -> float:
if not a or not b:
return 0.0
return SequenceMatcher(None, a, b).ratio()
def scan_cross_tier_duplicates(
    memory_entries: List[str],
    fact_store_facts: List[Dict[str, Any]],
    threshold: float = _SIMILARITY_THRESHOLD,
) -> DedupReport:
    """Compare every MEMORY.md entry against every fact; collect near-matches.

    Entries or facts whose normalized form is shorter than 10 characters are
    skipped (too short to match meaningfully). Pairs at or above *threshold*
    similarity are recorded in the returned DedupReport.
    """
    report = DedupReport(
        total_memory_entries=len(memory_entries),
        total_facts=len(fact_store_facts),
    )
    # Normalize each fact once up front instead of inside the inner loop.
    prepared_facts = [(fact, _normalize(fact.get("content", ""))) for fact in fact_store_facts]
    for idx, raw_entry in enumerate(memory_entries):
        entry_norm = _normalize(raw_entry)
        if len(entry_norm) < 10:
            continue
        for fact, fact_norm in prepared_facts:
            if len(fact_norm) < 10:
                continue
            score = _similarity(entry_norm, fact_norm)
            if score >= threshold:
                report.pairs.append(
                    DuplicatePair(
                        memory_entry=raw_entry,
                        memory_index=idx,
                        fact_store_id=fact.get("fact_id", -1),
                        fact_store_content=fact.get("content", ""),
                        similarity=score,
                    )
                )
    report.duplicates_found = len(report.pairs)
    return report
def classify_tier(fact_content: str, category: str = "general") -> str:
    """Decide which tier owns a fact: "factstore" or "memory".

    Structured categories always belong to the fact store. Short notes
    (<80 chars) carrying an operational keyword belong to MEMORY.md;
    everything else defaults to the fact store.
    """
    if category in {"user_pref", "project", "tool"}:
        return "factstore"
    text = fact_content.strip()
    markers = ("todo", "note:", "fix:", "remember:", "always", "never")
    lowered = text.lower()
    is_short_note = len(text) < 80 and any(marker in lowered for marker in markers)
    return "memory" if is_short_note else "factstore"
def resolve_pair(pair: DuplicatePair) -> str:
    """Mark *pair* resolved in favor of the fact store and return the verdict.

    The fact store always wins (it has trust scoring, FTS5, and entity
    resolution), so the resolution is unconditionally "keep_factstore".
    """
    pair.resolved = True
    pair.resolution = "keep_factstore"
    return pair.resolution
def resolve_duplicates(
    report: DedupReport,
    memory_entries: List[str],
    fact_store=None,
) -> List[str]:
    """Resolve every pair in *report* and return the surviving MEMORY.md entries.

    "keep_factstore" drops the MEMORY.md copy; "keep_memory" removes the fact
    from *fact_store* instead (best-effort). The input list is not mutated.
    """
    drop_indices: set = set()
    for pair in report.pairs:
        verdict = resolve_pair(pair)
        if verdict == "keep_factstore":
            drop_indices.add(pair.memory_index)
        elif verdict == "keep_memory" and fact_store:
            try:
                fact_store.remove_fact(pair.fact_store_id)
            except Exception as exc:
                logger.debug("Failed to remove fact %d: %s", pair.fact_store_id, exc)
    survivors = [entry for idx, entry in enumerate(memory_entries) if idx not in drop_indices]
    dropped = len(memory_entries) - len(survivors)
    if dropped:
        logger.info("Dedup removed %d duplicate entries from MEMORY.md", dropped)
    return survivors
def is_duplicate_before_add(
    content: str,
    existing_facts: List[Dict[str, Any]],
    threshold: float = _SIMILARITY_THRESHOLD,
) -> Optional[Dict[str, Any]]:
    """Check if content is a duplicate of an existing fact BEFORE adding.

    Returns the first matching fact dict if duplicate, None otherwise.
    Used by on_memory_write to prevent cross-tier duplication at write time.
    """
    candidate = _normalize(content)
    # Sub-10-char content (including empty) is too generic to match on.
    if len(candidate) < 10:
        return None
    for fact in existing_facts:
        normalized = _normalize(fact.get("content", ""))
        if len(normalized) < 10:
            continue
        if _similarity(candidate, normalized) >= threshold:
            return fact
    return None
# Human-readable reference text for the tier-ownership rules this module
# enforces. This is a runtime string constant (exposed for docs/tools), so
# its wording is program output — do not edit casually.
# NOTE(review): the ">100 chars" migration rule below does not match the
# 80-char cutoff used by classify_tier() — confirm which is intended.
TIER_OWNERSHIP_DOC = """# Memory Tier Ownership
Each fact lives in exactly one tier to prevent duplicate tokens and stale-data divergence.
## Tier 1 — MEMORY.md (built-in)
- Always-on system prompt context (compact, <50 entries ideal).
- Operational notes, active task state, immediate context.
- Managed by: `memory` tool.
## Tier 2 — Fact Store (holographic)
- Deep structured storage with search and reasoning.
- user_pref, project, tool facts; entity-linked knowledge.
- Managed by: `fact_store` tool.
- Has: FTS5 search, trust scoring, entity resolution.
## Tier 3 — MemPalace
- Specialized long-form archives and research.
## Rules
- MEMORY.md entries >100 chars → migrate to fact store.
- Structured categories (user_pref, project, tool) → fact store.
- Duplicate across tiers: fact store wins (it has trust scoring).
- `on_memory_write` checks fact store before mirroring.
"""

178
tests/test_memory_dedup.py Normal file
View File

@@ -0,0 +1,178 @@
"""Tests for cross-tier memory deduplication.
Tests the dedup module's normalize, similarity, scan, resolve, and
is_duplicate_before_add functions.
"""
import pytest
import sys
import os
# Add the plugins path so we can import dedup
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "plugins", "memory", "holographic"))
from dedup import (
_normalize,
_similarity,
scan_cross_tier_duplicates,
resolve_duplicates,
is_duplicate_before_add,
classify_tier,
DedupReport,
DuplicatePair,
)
class TestNormalize:
    """Unit tests for the _normalize text-canonicalization helper."""

    def test_basic_lowercasing(self):
        assert _normalize("Hello World") == "hello world"

    def test_strips_markdown_bullets(self):
        assert _normalize("- some fact") == "some fact"
        assert _normalize("* some fact") == "some fact"
        assert _normalize("  - some fact  ") == "some fact"

    def test_strips_trailing_punctuation(self):
        assert _normalize("some fact.") == "some fact"
        assert _normalize("some fact,") == "some fact"
        assert _normalize("some fact;") == "some fact"

    def test_collapses_whitespace(self):
        # Fixed: the previous input already had single spaces, so the test
        # was a tautology that could never catch a collapsing regression.
        assert _normalize("some   fact    here") == "some fact here"

    def test_empty_and_short(self):
        assert _normalize("") == ""
        assert _normalize("   ") == ""
        assert _normalize("abc") == "abc"
class TestSimilarity:
    """Unit tests for the _similarity ratio helper."""

    def test_identical_strings(self):
        assert _similarity("hello world", "hello world") == 1.0

    def test_completely_different(self):
        assert _similarity("abc", "xyz") < 0.3

    def test_similar_rephrasing(self):
        assert _similarity("deploy via ansible", "deploy with ansible") > 0.7

    def test_empty_strings(self):
        # Any empty operand short-circuits to 0.0.
        for left, right in (("", "hello"), ("hello", ""), ("", "")):
            assert _similarity(left, right) == 0.0
class TestScanCrossTierDuplicates:
    """Behavioral tests for the cross-tier duplicate scanner."""

    def test_no_duplicates(self):
        entries = ["Deploy via Ansible", "Use mimo-v2-pro model"]
        facts = [
            {"fact_id": 1, "content": "User prefers dark mode"},
            {"fact_id": 2, "content": "Project uses Python 3.11"},
        ]
        result = scan_cross_tier_duplicates(entries, facts)
        assert result.duplicates_found == 0
        assert len(result.pairs) == 0

    def test_exact_duplicate(self):
        result = scan_cross_tier_duplicates(
            ["Deploy via Ansible"],
            [{"fact_id": 1, "content": "Deploy via Ansible"}],
        )
        assert result.duplicates_found == 1
        top = result.pairs[0]
        assert top.similarity == 1.0
        assert top.fact_store_id == 1

    def test_near_duplicate_above_threshold(self):
        # Trailing punctuation is normalized away, so this still matches.
        result = scan_cross_tier_duplicates(
            ["Alexander prefers action over narration"],
            [{"fact_id": 1, "content": "Alexander prefers action over narration."}],
        )
        assert result.duplicates_found == 1

    def test_below_threshold_not_duplicate(self):
        result = scan_cross_tier_duplicates(
            ["Deploy via Ansible on VPS"],
            [{"fact_id": 1, "content": "Deploy via Docker on local machine"}],
            threshold=0.85,
        )
        assert result.duplicates_found == 0

    def test_short_entries_skipped(self):
        # Entries under the 10-char minimum are never compared.
        result = scan_cross_tier_duplicates(
            ["OK", "ab"], [{"fact_id": 1, "content": "OK"}]
        )
        assert result.duplicates_found == 0

    def test_multiple_duplicates(self):
        entries = ["Fact A here", "Fact B here"]
        facts = [
            {"fact_id": 1, "content": "Fact A here"},
            {"fact_id": 2, "content": "Fact B here"},
        ]
        result = scan_cross_tier_duplicates(entries, facts)
        assert result.duplicates_found == 2

    def test_report_summary(self):
        result = scan_cross_tier_duplicates(
            ["Deploy via Ansible"],
            [{"fact_id": 1, "content": "Deploy via Ansible"}],
        )
        text = result.summary()
        for fragment in ("1 MEMORY.md entries", "1 fact store entries", "1 duplicates"):
            assert fragment in text
class TestResolveDuplicates:
    """Tests for pruning MEMORY.md entries via resolve_duplicates."""

    def test_removes_memory_duplicates(self):
        entries = ["Deploy via Ansible", "Use Python 3.11"]
        report = scan_cross_tier_duplicates(
            entries, [{"fact_id": 1, "content": "Deploy via Ansible"}]
        )
        # Fact store wins, so only the non-duplicated entry survives.
        assert resolve_duplicates(report, entries) == ["Use Python 3.11"]

    def test_no_duplicates_returns_same(self):
        entries = ["Deploy via Ansible", "Use Python 3.11"]
        report = scan_cross_tier_duplicates(
            entries, [{"fact_id": 1, "content": "Completely different fact"}]
        )
        assert len(resolve_duplicates(report, entries)) == 2
class TestIsDuplicateBeforeAdd:
    """Tests for the write-time cross-tier duplicate guard."""

    def test_finds_duplicate(self):
        match = is_duplicate_before_add(
            "Deploy via Ansible", [{"fact_id": 1, "content": "Deploy via Ansible"}]
        )
        assert match is not None
        assert match["fact_id"] == 1

    def test_no_duplicate_returns_none(self):
        match = is_duplicate_before_add(
            "Deploy via Ansible", [{"fact_id": 1, "content": "Use dark mode"}]
        )
        assert match is None

    def test_short_content_returns_none(self):
        # Sub-10-char content never counts as a duplicate.
        assert is_duplicate_before_add("OK", [{"fact_id": 1, "content": "OK"}]) is None

    def test_empty_existing_returns_none(self):
        assert is_duplicate_before_add("Some fact here", []) is None
class TestClassifyTier:
    """Tests for tier routing of new facts."""

    def test_user_pref_goes_to_factstore(self):
        assert classify_tier("anything", "user_pref") == "factstore"

    def test_project_goes_to_factstore(self):
        assert classify_tier("anything", "project") == "factstore"

    def test_short_operational_note_goes_to_memory(self):
        for note in ("remember: always use sudo", "todo: fix the deploy script"):
            assert classify_tier(note) == "memory"

    def test_long_fact_goes_to_factstore(self):
        long_fact = (
            "The deployment process requires running ansible-playbook with the "
            "production inventory file and verifying health checks after completion"
        )
        assert classify_tier(long_fact) == "factstore"

    def test_general_short_goes_to_factstore(self):
        # Short, but carries no operational keyword.
        assert classify_tier("user likes dark mode") == "factstore"