Compare commits
6 Commits
burn/350-1
...
burn/254-1
| Author | SHA1 | Date | |
|---|---|---|---|
| 7162e4d37a | |||
| 75d46411ad | |||
| e4574a17fb | |||
| 0ed58ab618 | |||
| a0bce06f07 | |||
| 56ca70516a |
@@ -309,7 +309,19 @@ class MemoryManager:
|
||||
"""Notify external providers when the built-in memory tool writes.
|
||||
|
||||
Skips the builtin provider itself (it's the source of the write).
|
||||
Passes current MEMORY.md entries for cross-tier dedup checking.
|
||||
"""
|
||||
# Collect current memory entries for dedup context
|
||||
memory_entries = []
|
||||
for provider in self._providers:
|
||||
if provider.name == "builtin" and hasattr(provider, "_store") and provider._store:
|
||||
try:
|
||||
store = provider._store
|
||||
if hasattr(store, "get_all_entries"):
|
||||
memory_entries = store.get_all_entries(target)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for provider in self._providers:
|
||||
if provider.name == "builtin":
|
||||
continue
|
||||
@@ -321,6 +333,54 @@ class MemoryManager:
|
||||
provider.name, e,
|
||||
)
|
||||
|
||||
def run_dedup_scan(self) -> dict:
|
||||
"""Run cross-tier deduplication scan across all memory providers.
|
||||
|
||||
Returns a report dict with duplicates found and actions taken.
|
||||
"""
|
||||
report = {"status": "ok", "duplicates": 0, "actions": []}
|
||||
|
||||
# Collect MEMORY.md entries
|
||||
memory_entries = []
|
||||
builtin_store = None
|
||||
for provider in self._providers:
|
||||
if provider.name == "builtin" and hasattr(provider, "_store"):
|
||||
builtin_store = provider._store
|
||||
if builtin_store:
|
||||
try:
|
||||
entries = builtin_store.get_all_entries("memory")
|
||||
memory_entries = entries if entries else []
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if not memory_entries:
|
||||
report["status"] = "no_memory_entries"
|
||||
return report
|
||||
|
||||
# Check each external provider for duplicates
|
||||
for provider in self._providers:
|
||||
if provider.name == "builtin":
|
||||
continue
|
||||
if not hasattr(provider, "_store") or not provider._store:
|
||||
continue
|
||||
try:
|
||||
from plugins.memory.holographic.dedup import scan_cross_tier_duplicates
|
||||
all_facts = provider._store.list_facts(min_trust=0.0, limit=1000)
|
||||
dup_report = scan_cross_tier_duplicates(memory_entries, all_facts)
|
||||
report["duplicates"] += dup_report.duplicates_found
|
||||
if dup_report.duplicates_found > 0:
|
||||
from plugins.memory.holographic.dedup import resolve_duplicates
|
||||
cleaned = resolve_duplicates(dup_report, memory_entries, provider._store)
|
||||
removed = len(memory_entries) - len(cleaned)
|
||||
report["actions"].append(
|
||||
f"{provider.name}: {dup_report.duplicates_found} duplicates, "
|
||||
f"{removed} MEMORY.md entries removed"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("Dedup scan failed for provider '%s': %s", provider.name, e)
|
||||
|
||||
return report
|
||||
|
||||
def on_delegation(self, task: str, result: str, *,
|
||||
child_session_id: str = "", **kwargs) -> None:
|
||||
"""Notify all providers that a subagent completed."""
|
||||
|
||||
@@ -1,18 +0,0 @@
|
||||
{
|
||||
"agents": {
|
||||
"ezra": {
|
||||
"host": "143.198.27.163",
|
||||
"hermes_path": "/root/wizards/ezra/hermes-agent/venv/bin/hermes",
|
||||
"username": "root"
|
||||
},
|
||||
"timmy": {
|
||||
"host": "timmy",
|
||||
"hermes_path": "/root/wizards/timmy/hermes-agent/venv/bin/hermes",
|
||||
"username": "root"
|
||||
}
|
||||
},
|
||||
"validation_timeout": 30,
|
||||
"command_timeout": 300,
|
||||
"max_retries": 2,
|
||||
"retry_delay": 5
|
||||
}
|
||||
@@ -1,551 +0,0 @@
|
||||
"""
|
||||
VPS Agent Dispatch Worker for Hermes Cron System
|
||||
|
||||
This module provides a dispatch worker that SSHs into remote VPS machines
|
||||
and runs hermes commands. It ensures that:
|
||||
|
||||
1. Remote dispatch only counts as success when the remote hermes command actually launches
|
||||
2. Stale per-agent hermes binary paths are configurable/validated before queue drain
|
||||
3. Failed remote launches remain in the queue (or are marked failed) instead of being reported as OK
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Optional, Dict, Any, List
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DispatchStatus(Enum):
|
||||
"""Status of a dispatch operation."""
|
||||
PENDING = "pending"
|
||||
VALIDATING = "validating"
|
||||
DISPATCHING = "dispatching"
|
||||
SUCCESS = "success"
|
||||
FAILED = "failed"
|
||||
RETRYING = "retrying"
|
||||
|
||||
|
||||
@dataclass
|
||||
class DispatchResult:
|
||||
"""Result of a dispatch operation."""
|
||||
status: DispatchStatus
|
||||
message: str
|
||||
exit_code: Optional[int] = None
|
||||
stdout: Optional[str] = None
|
||||
stderr: Optional[str] = None
|
||||
execution_time: Optional[float] = None
|
||||
hermes_path: Optional[str] = None
|
||||
validated: bool = False
|
||||
|
||||
|
||||
class HermesPathValidator:
|
||||
"""Validates hermes binary paths on remote VPS machines."""
|
||||
|
||||
def __init__(self, ssh_key_path: Optional[str] = None):
|
||||
self.ssh_key_path = ssh_key_path or os.path.expanduser("~/.ssh/id_rsa")
|
||||
self.timeout = 30 # SSH timeout in seconds
|
||||
|
||||
def validate_hermes_path(self, host: str, hermes_path: str,
|
||||
username: str = "root") -> DispatchResult:
|
||||
"""
|
||||
Validate that the hermes binary exists and is executable on the remote host.
|
||||
|
||||
Args:
|
||||
host: Remote host IP or hostname
|
||||
hermes_path: Path to hermes binary on remote host
|
||||
username: SSH username
|
||||
|
||||
Returns:
|
||||
DispatchResult with validation status
|
||||
"""
|
||||
start_time = time.time()
|
||||
|
||||
# Build SSH command to check hermes binary
|
||||
ssh_cmd = [
|
||||
"ssh",
|
||||
"-i", self.ssh_key_path,
|
||||
"-o", "StrictHostKeyChecking=no",
|
||||
"-o", "ConnectTimeout=10",
|
||||
"-o", "BatchMode=yes",
|
||||
f"{username}@{host}",
|
||||
f"test -x {hermes_path} && echo 'VALID' || echo 'INVALID'"
|
||||
]
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
ssh_cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=self.timeout
|
||||
)
|
||||
|
||||
execution_time = time.time() - start_time
|
||||
|
||||
if result.returncode == 0 and "VALID" in result.stdout:
|
||||
return DispatchResult(
|
||||
status=DispatchStatus.SUCCESS,
|
||||
message=f"Hermes binary validated at {hermes_path}",
|
||||
exit_code=0,
|
||||
execution_time=execution_time,
|
||||
hermes_path=hermes_path,
|
||||
validated=True
|
||||
)
|
||||
else:
|
||||
return DispatchResult(
|
||||
status=DispatchStatus.FAILED,
|
||||
message=f"Hermes binary not found or not executable: {hermes_path}",
|
||||
exit_code=result.returncode,
|
||||
stdout=result.stdout,
|
||||
stderr=result.stderr,
|
||||
execution_time=execution_time,
|
||||
hermes_path=hermes_path,
|
||||
validated=False
|
||||
)
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
return DispatchResult(
|
||||
status=DispatchStatus.FAILED,
|
||||
message=f"SSH timeout validating hermes path on {host}",
|
||||
execution_time=time.time() - start_time,
|
||||
hermes_path=hermes_path,
|
||||
validated=False
|
||||
)
|
||||
except Exception as e:
|
||||
return DispatchResult(
|
||||
status=DispatchStatus.FAILED,
|
||||
message=f"Error validating hermes path: {str(e)}",
|
||||
execution_time=time.time() - start_time,
|
||||
hermes_path=hermes_path,
|
||||
validated=False
|
||||
)
|
||||
|
||||
|
||||
class VPSAgentDispatcher:
|
||||
"""Dispatches hermes commands to remote VPS agents."""
|
||||
|
||||
def __init__(self, config_path: Optional[str] = None):
|
||||
self.config_path = config_path or os.path.expanduser("~/.hermes/dispatch_config.json")
|
||||
self.validator = HermesPathValidator()
|
||||
self.config = self._load_config()
|
||||
|
||||
def _load_config(self) -> Dict[str, Any]:
|
||||
"""Load dispatch configuration."""
|
||||
try:
|
||||
if os.path.exists(self.config_path):
|
||||
with open(self.config_path, 'r') as f:
|
||||
return json.load(f)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to load dispatch config: {e}")
|
||||
|
||||
# Default configuration
|
||||
return {
|
||||
"agents": {
|
||||
"ezra": {
|
||||
"host": "143.198.27.163",
|
||||
"hermes_path": "/root/wizards/ezra/hermes-agent/venv/bin/hermes",
|
||||
"username": "root"
|
||||
},
|
||||
"timmy": {
|
||||
"host": "timmy",
|
||||
"hermes_path": "/root/wizards/timmy/hermes-agent/venv/bin/hermes",
|
||||
"username": "root"
|
||||
}
|
||||
},
|
||||
"validation_timeout": 30,
|
||||
"command_timeout": 300,
|
||||
"max_retries": 2,
|
||||
"retry_delay": 5
|
||||
}
|
||||
|
||||
def save_config(self):
|
||||
"""Save dispatch configuration."""
|
||||
try:
|
||||
config_dir = Path(self.config_path).parent
|
||||
config_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
with open(self.config_path, 'w') as f:
|
||||
json.dump(self.config, f, indent=2)
|
||||
|
||||
# Set secure permissions
|
||||
os.chmod(self.config_path, 0o600)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save dispatch config: {e}")
|
||||
|
||||
def get_agent_config(self, agent_name: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get configuration for a specific agent."""
|
||||
return self.config.get("agents", {}).get(agent_name)
|
||||
|
||||
def update_agent_config(self, agent_name: str, host: str, hermes_path: str,
|
||||
username: str = "root"):
|
||||
"""Update configuration for a specific agent."""
|
||||
if "agents" not in self.config:
|
||||
self.config["agents"] = {}
|
||||
|
||||
self.config["agents"][agent_name] = {
|
||||
"host": host,
|
||||
"hermes_path": hermes_path,
|
||||
"username": username
|
||||
}
|
||||
|
||||
self.save_config()
|
||||
|
||||
def validate_agent(self, agent_name: str) -> DispatchResult:
|
||||
"""Validate that an agent's hermes binary is accessible."""
|
||||
agent_config = self.get_agent_config(agent_name)
|
||||
if not agent_config:
|
||||
return DispatchResult(
|
||||
status=DispatchStatus.FAILED,
|
||||
message=f"Agent configuration not found: {agent_name}"
|
||||
)
|
||||
|
||||
return self.validator.validate_hermes_path(
|
||||
host=agent_config["host"],
|
||||
hermes_path=agent_config["hermes_path"],
|
||||
username=agent_config.get("username", "root")
|
||||
)
|
||||
|
||||
def dispatch_command(self, agent_name: str, command: str,
|
||||
validate_first: bool = True) -> DispatchResult:
|
||||
"""
|
||||
Dispatch a command to a remote VPS agent.
|
||||
|
||||
Args:
|
||||
agent_name: Name of the agent to dispatch to
|
||||
command: Command to execute
|
||||
validate_first: Whether to validate hermes path before dispatching
|
||||
|
||||
Returns:
|
||||
DispatchResult with execution status
|
||||
"""
|
||||
agent_config = self.get_agent_config(agent_name)
|
||||
if not agent_config:
|
||||
return DispatchResult(
|
||||
status=DispatchStatus.FAILED,
|
||||
message=f"Agent configuration not found: {agent_name}"
|
||||
)
|
||||
|
||||
# Validate hermes path if requested
|
||||
if validate_first:
|
||||
validation_result = self.validate_agent(agent_name)
|
||||
if validation_result.status != DispatchStatus.SUCCESS:
|
||||
return DispatchResult(
|
||||
status=DispatchStatus.FAILED,
|
||||
message=f"Validation failed: {validation_result.message}",
|
||||
hermes_path=agent_config["hermes_path"],
|
||||
validated=False
|
||||
)
|
||||
|
||||
# Build SSH command to execute hermes command
|
||||
ssh_cmd = [
|
||||
"ssh",
|
||||
"-i", self.validator.ssh_key_path,
|
||||
"-o", "StrictHostKeyChecking=no",
|
||||
"-o", "ConnectTimeout=10",
|
||||
f"{agent_config.get('username', 'root')}@{agent_config['host']}",
|
||||
f"cd /root/wizards/{agent_name}/hermes-agent && source venv/bin/activate && {command}"
|
||||
]
|
||||
|
||||
start_time = time.time()
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
ssh_cmd,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=self.config.get("command_timeout", 300)
|
||||
)
|
||||
|
||||
execution_time = time.time() - start_time
|
||||
|
||||
if result.returncode == 0:
|
||||
return DispatchResult(
|
||||
status=DispatchStatus.SUCCESS,
|
||||
message=f"Command executed successfully on {agent_name}",
|
||||
exit_code=0,
|
||||
stdout=result.stdout,
|
||||
stderr=result.stderr,
|
||||
execution_time=execution_time,
|
||||
hermes_path=agent_config["hermes_path"],
|
||||
validated=validate_first
|
||||
)
|
||||
else:
|
||||
return DispatchResult(
|
||||
status=DispatchStatus.FAILED,
|
||||
message=f"Command failed on {agent_name}: {result.stderr}",
|
||||
exit_code=result.returncode,
|
||||
stdout=result.stdout,
|
||||
stderr=result.stderr,
|
||||
execution_time=execution_time,
|
||||
hermes_path=agent_config["hermes_path"],
|
||||
validated=validate_first
|
||||
)
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
return DispatchResult(
|
||||
status=DispatchStatus.FAILED,
|
||||
message=f"Command timeout on {agent_name}",
|
||||
execution_time=time.time() - start_time,
|
||||
hermes_path=agent_config["hermes_path"],
|
||||
validated=validate_first
|
||||
)
|
||||
except Exception as e:
|
||||
return DispatchResult(
|
||||
status=DispatchStatus.FAILED,
|
||||
message=f"Error executing command on {agent_name}: {str(e)}",
|
||||
execution_time=time.time() - start_time,
|
||||
hermes_path=agent_config["hermes_path"],
|
||||
validated=validate_first
|
||||
)
|
||||
|
||||
def dispatch_hermes_command(self, agent_name: str, hermes_command: str,
|
||||
validate_first: bool = True) -> DispatchResult:
|
||||
"""
|
||||
Dispatch a hermes command to a remote VPS agent.
|
||||
|
||||
Args:
|
||||
agent_name: Name of the agent to dispatch to
|
||||
hermes_command: Hermes command to execute (e.g., "hermes cron list")
|
||||
validate_first: Whether to validate hermes path before dispatching
|
||||
|
||||
Returns:
|
||||
DispatchResult with execution status
|
||||
"""
|
||||
agent_config = self.get_agent_config(agent_name)
|
||||
if not agent_config:
|
||||
return DispatchResult(
|
||||
status=DispatchStatus.FAILED,
|
||||
message=f"Agent configuration not found: {agent_name}"
|
||||
)
|
||||
|
||||
# Build full hermes command
|
||||
full_command = f"{agent_config['hermes_path']} {hermes_command}"
|
||||
|
||||
return self.dispatch_command(agent_name, full_command, validate_first)
|
||||
|
||||
|
||||
class DispatchQueue:
|
||||
"""Queue for managing dispatch operations."""
|
||||
|
||||
def __init__(self, queue_file: Optional[str] = None):
|
||||
self.queue_file = queue_file or os.path.expanduser("~/.hermes/dispatch_queue.json")
|
||||
self.queue: List[Dict[str, Any]] = self._load_queue()
|
||||
|
||||
def _load_queue(self) -> List[Dict[str, Any]]:
|
||||
"""Load queue from file."""
|
||||
try:
|
||||
if os.path.exists(self.queue_file):
|
||||
with open(self.queue_file, 'r') as f:
|
||||
return json.load(f)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to load dispatch queue: {e}")
|
||||
|
||||
return []
|
||||
|
||||
def save_queue(self):
|
||||
"""Save queue to file."""
|
||||
try:
|
||||
queue_dir = Path(self.queue_file).parent
|
||||
queue_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
with open(self.queue_file, 'w') as f:
|
||||
json.dump(self.queue, f, indent=2)
|
||||
|
||||
# Set secure permissions
|
||||
os.chmod(self.queue_file, 0o600)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save dispatch queue: {e}")
|
||||
|
||||
def add_item(self, agent_name: str, command: str, priority: int = 0,
|
||||
max_retries: int = 3) -> str:
|
||||
"""
|
||||
Add an item to the dispatch queue.
|
||||
|
||||
Returns:
|
||||
Queue item ID
|
||||
"""
|
||||
item_id = f"dispatch_{int(time.time())}_{len(self.queue)}"
|
||||
|
||||
item = {
|
||||
"id": item_id,
|
||||
"agent_name": agent_name,
|
||||
"command": command,
|
||||
"priority": priority,
|
||||
"max_retries": max_retries,
|
||||
"retry_count": 0,
|
||||
"status": DispatchStatus.PENDING.value,
|
||||
"created_at": time.time(),
|
||||
"last_attempt": None,
|
||||
"result": None
|
||||
}
|
||||
|
||||
self.queue.append(item)
|
||||
self.save_queue()
|
||||
|
||||
return item_id
|
||||
|
||||
def get_next_item(self) -> Optional[Dict[str, Any]]:
|
||||
"""Get the next item from the queue (highest priority, oldest first)."""
|
||||
if not self.queue:
|
||||
return None
|
||||
|
||||
# Sort by priority (descending) and created_at (ascending)
|
||||
sorted_queue = sorted(
|
||||
self.queue,
|
||||
key=lambda x: (-x.get("priority", 0), x.get("created_at", 0))
|
||||
)
|
||||
|
||||
# Find first pending item
|
||||
for item in sorted_queue:
|
||||
if item.get("status") == DispatchStatus.PENDING.value:
|
||||
return item
|
||||
|
||||
return None
|
||||
|
||||
def update_item(self, item_id: str, status: DispatchStatus,
|
||||
result: Optional[DispatchResult] = None):
|
||||
"""Update a queue item."""
|
||||
for item in self.queue:
|
||||
if item.get("id") == item_id:
|
||||
item["status"] = status.value
|
||||
item["last_attempt"] = time.time()
|
||||
|
||||
if result:
|
||||
item["result"] = {
|
||||
"status": result.status.value,
|
||||
"message": result.message,
|
||||
"exit_code": result.exit_code,
|
||||
"stdout": result.stdout,
|
||||
"stderr": result.stderr,
|
||||
"execution_time": result.execution_time,
|
||||
"hermes_path": result.hermes_path,
|
||||
"validated": result.validated
|
||||
}
|
||||
|
||||
# Update retry count if failed
|
||||
if status == DispatchStatus.FAILED:
|
||||
item["retry_count"] = item.get("retry_count", 0) + 1
|
||||
|
||||
self.save_queue()
|
||||
break
|
||||
|
||||
def remove_item(self, item_id: str):
|
||||
"""Remove an item from the queue."""
|
||||
self.queue = [item for item in self.queue if item.get("id") != item_id]
|
||||
self.save_queue()
|
||||
|
||||
def get_failed_items(self) -> List[Dict[str, Any]]:
|
||||
"""Get all failed items that can be retried."""
|
||||
return [
|
||||
item for item in self.queue
|
||||
if item.get("status") == DispatchStatus.FAILED.value
|
||||
and item.get("retry_count", 0) < item.get("max_retries", 3)
|
||||
]
|
||||
|
||||
def get_stats(self) -> Dict[str, Any]:
|
||||
"""Get queue statistics."""
|
||||
total = len(self.queue)
|
||||
pending = sum(1 for item in self.queue if item.get("status") == DispatchStatus.PENDING.value)
|
||||
success = sum(1 for item in self.queue if item.get("status") == DispatchStatus.SUCCESS.value)
|
||||
failed = sum(1 for item in self.queue if item.get("status") == DispatchStatus.FAILED.value)
|
||||
|
||||
return {
|
||||
"total": total,
|
||||
"pending": pending,
|
||||
"success": success,
|
||||
"failed": failed,
|
||||
"retryable": len(self.get_failed_items())
|
||||
}
|
||||
|
||||
|
||||
def process_dispatch_queue(dispatcher: VPSAgentDispatcher,
|
||||
queue: DispatchQueue,
|
||||
batch_size: int = 5) -> Dict[str, Any]:
|
||||
"""
|
||||
Process items from the dispatch queue.
|
||||
|
||||
Args:
|
||||
dispatcher: VPS agent dispatcher
|
||||
queue: Dispatch queue
|
||||
batch_size: Number of items to process in this batch
|
||||
|
||||
Returns:
|
||||
Processing statistics
|
||||
"""
|
||||
processed = 0
|
||||
success = 0
|
||||
failed = 0
|
||||
|
||||
for _ in range(batch_size):
|
||||
item = queue.get_next_item()
|
||||
if not item:
|
||||
break
|
||||
|
||||
item_id = item["id"]
|
||||
agent_name = item["agent_name"]
|
||||
command = item["command"]
|
||||
|
||||
# Update status to dispatching
|
||||
queue.update_item(item_id, DispatchStatus.DISPATCHING)
|
||||
|
||||
# Dispatch the command
|
||||
result = dispatcher.dispatch_hermes_command(
|
||||
agent_name=agent_name,
|
||||
hermes_command=command,
|
||||
validate_first=True
|
||||
)
|
||||
|
||||
# Update queue with result
|
||||
if result.status == DispatchStatus.SUCCESS:
|
||||
queue.update_item(item_id, DispatchStatus.SUCCESS, result)
|
||||
success += 1
|
||||
else:
|
||||
# Check if we should retry
|
||||
item_data = next((i for i in queue.queue if i.get("id") == item_id), None)
|
||||
if item_data and item_data.get("retry_count", 0) < item_data.get("max_retries", 3):
|
||||
queue.update_item(item_id, DispatchStatus.FAILED, result)
|
||||
failed += 1
|
||||
else:
|
||||
# Max retries reached, remove from queue
|
||||
queue.remove_item(item_id)
|
||||
failed += 1
|
||||
|
||||
processed += 1
|
||||
|
||||
return {
|
||||
"processed": processed,
|
||||
"success": success,
|
||||
"failed": failed,
|
||||
"queue_stats": queue.get_stats()
|
||||
}
|
||||
|
||||
|
||||
# Example usage and testing
|
||||
if __name__ == "__main__":
|
||||
# Set up logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
# Create dispatcher and queue
|
||||
dispatcher = VPSAgentDispatcher()
|
||||
queue = DispatchQueue()
|
||||
|
||||
# Example: Add items to queue
|
||||
queue.add_item("ezra", "cron list")
|
||||
queue.add_item("timmy", "cron status")
|
||||
|
||||
# Process queue
|
||||
stats = process_dispatch_queue(dispatcher, queue)
|
||||
print(f"Processing stats: {stats}")
|
||||
|
||||
# Show queue stats
|
||||
queue_stats = queue.get_stats()
|
||||
print(f"Queue stats: {queue_stats}")
|
||||
@@ -653,12 +653,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
# AIAgent.__init__ is missing params the scheduler expects.
|
||||
_validate_agent_interface()
|
||||
|
||||
# Check if this is a dispatch job
|
||||
if job.get("type") == "dispatch" or "dispatch" in job.get("name", "").lower():
|
||||
return _run_dispatch_job(job)
|
||||
|
||||
|
||||
|
||||
from run_agent import AIAgent
|
||||
|
||||
# Initialize SQLite session store so cron job messages are persisted
|
||||
@@ -1013,89 +1007,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
logger.debug("Job '%s': failed to close SQLite session store: %s", job_id, e)
|
||||
|
||||
|
||||
|
||||
def _run_dispatch_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
"""
|
||||
Execute a dispatch job that SSHs into remote VPS machines.
|
||||
|
||||
Returns:
|
||||
Tuple of (success, full_output_doc, final_response, error_message)
|
||||
"""
|
||||
from cron.dispatch_worker import VPSAgentDispatcher, DispatchQueue, process_dispatch_queue
|
||||
|
||||
job_id = job["id"]
|
||||
job_name = job["name"]
|
||||
|
||||
logger.info("Running dispatch job '%s' (ID: %s)", job_name, job_id)
|
||||
|
||||
try:
|
||||
# Load dispatch configuration
|
||||
dispatcher = VPSAgentDispatcher()
|
||||
queue = DispatchQueue()
|
||||
|
||||
# Get dispatch parameters from job
|
||||
agent_name = job.get("agent_name", "ezra")
|
||||
command = job.get("command", "cron list")
|
||||
batch_size = job.get("batch_size", 5)
|
||||
|
||||
# Add command to queue if specified
|
||||
if command:
|
||||
queue.add_item(agent_name, command)
|
||||
|
||||
# Process the dispatch queue
|
||||
stats = process_dispatch_queue(dispatcher, queue, batch_size)
|
||||
|
||||
# Generate output
|
||||
output = f"""# Dispatch Job: {job_name}
|
||||
|
||||
**Job ID:** {job_id}
|
||||
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
|
||||
**Agent:** {agent_name}
|
||||
**Command:** {command}
|
||||
|
||||
## Dispatch Results
|
||||
|
||||
- **Processed:** {stats['processed']}
|
||||
- **Success:** {stats['success']}
|
||||
- **Failed:** {stats['failed']}
|
||||
|
||||
## Queue Statistics
|
||||
|
||||
- **Total items:** {stats['queue_stats']['total']}
|
||||
- **Pending:** {stats['queue_stats']['pending']}
|
||||
- **Success:** {stats['queue_stats']['success']}
|
||||
- **Failed:** {stats['queue_stats']['failed']}
|
||||
- **Retryable:** {stats['queue_stats']['retryable']}
|
||||
|
||||
## Status
|
||||
|
||||
{"✅ All dispatches successful" if stats['failed'] == 0 else f"⚠️ {stats['failed']} dispatches failed"}
|
||||
"""
|
||||
|
||||
success = stats['failed'] == 0
|
||||
error_message = None if success else f"{stats['failed']} dispatches failed"
|
||||
|
||||
return (success, output, output, error_message)
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Dispatch job failed: {str(e)}"
|
||||
logger.error(error_msg, exc_info=True)
|
||||
|
||||
output = f"""# Dispatch Job: {job_name}
|
||||
|
||||
**Job ID:** {job_id}
|
||||
**Run Time:** {_hermes_now().strftime('%Y-%m-%d %H:%M:%S')}
|
||||
**Status:** ❌ Failed
|
||||
|
||||
## Error
|
||||
|
||||
{error_msg}
|
||||
"""
|
||||
|
||||
return (False, output, output, error_msg)
|
||||
|
||||
|
||||
|
||||
def tick(verbose: bool = True, adapters=None, loop=None) -> int:
|
||||
"""
|
||||
Check and run all due jobs.
|
||||
|
||||
91
docs/memory-tier-ownership.md
Normal file
91
docs/memory-tier-ownership.md
Normal file
@@ -0,0 +1,91 @@
|
||||
# Memory Tier Ownership
|
||||
|
||||
Each fact lives in exactly one tier. This prevents duplicate tokens on every
|
||||
prompt injection and eliminates stale-data divergence when one copy is updated
|
||||
but not the other.
|
||||
|
||||
## Tier 1 — MEMORY.md (Built-in)
|
||||
|
||||
**Purpose:** Always-on system prompt context — compact, high-signal.
|
||||
|
||||
**Contains:**
|
||||
- Operational notes and active task state
|
||||
- Immediate context the agent needs every turn
|
||||
- User preferences that affect agent behavior
|
||||
|
||||
**Constraints:**
|
||||
- Keep under 50 entries (every byte costs prompt tokens)
|
||||
- Entries >100 chars should migrate to the fact store
|
||||
- Managed via the `memory` tool (add/replace/remove)
|
||||
|
||||
**Examples:**
|
||||
- "Default model: mimo-v2-pro/Nous"
|
||||
- "Alexander prefers action over narration"
|
||||
- "Deploy via Ansible; wants one-command redeploy"
|
||||
|
||||
## Tier 2 — Fact Store (Holographic)
|
||||
|
||||
**Purpose:** Deep structured storage with search, reasoning, and trust scoring.
|
||||
|
||||
**Contains:**
|
||||
- `user_pref` — User preferences and habits
|
||||
- `project` — Project-specific facts and conventions
|
||||
- `tool` — Tool quirks, API behaviors, environment details
|
||||
- `general` — Everything else worth remembering
|
||||
|
||||
**Advantages over MEMORY.md:**
|
||||
- FTS5 full-text search
|
||||
- Entity resolution (link facts to people/projects/tools)
|
||||
- Trust scoring (good facts rise, bad facts sink)
|
||||
- Compositional reasoning (`reason` across multiple entities)
|
||||
- Duplicate detection (UNIQUE constraint + similarity matching)
|
||||
- Unlimited size
|
||||
|
||||
**Managed via:** `fact_store` tool (add/search/probe/related/reason/contradict/update/remove/list)
|
||||
|
||||
## Tier 3 — MemPalace
|
||||
|
||||
**Purpose:** Specialized long-form archives and multi-session research.
|
||||
|
||||
**Contains:**
|
||||
- Detailed analysis and research notes
|
||||
- Multi-session task context
|
||||
- Structured "palace rooms" for domain-specific knowledge
|
||||
|
||||
## Migration Rules
|
||||
|
||||
| Condition | Destination |
|
||||
|-----------|------------|
|
||||
| Entry >100 chars | → fact store |
|
||||
| Category is `user_pref`, `project`, `tool` | → fact store |
|
||||
| Needs entity linking | → fact store |
|
||||
| Needs trust scoring | → fact store |
|
||||
| Short operational note (<80 chars) | → MEMORY.md |
|
||||
| Always-on context | → MEMORY.md |
|
||||
| When in doubt | → fact store |
|
||||
|
||||
## Cross-Tier Deduplication
|
||||
|
||||
**Problem:** The `on_memory_write` bridge mirrors MEMORY.md writes to the fact
|
||||
store. Without dedup, the same fact exists in both tiers — wasting tokens and
|
||||
risking stale data.
|
||||
|
||||
**Solution:**
|
||||
1. `on_memory_write` checks the fact store for similar entries before mirroring
|
||||
2. Similarity threshold: 0.85 (catches rephrasings, avoids false positives)
|
||||
3. If duplicate found: skip the mirror (fact store entry is canonical)
|
||||
4. `dedup` action on `fact_store` tool: runtime scan + auto-resolve
|
||||
5. `MemoryManager.run_dedup_scan()`: programmatic cross-tier cleanup
|
||||
|
||||
**Resolution strategy:** Fact store wins by default — it has trust scoring,
|
||||
FTS5, and entity resolution. MEMORY.md copies are removed.
|
||||
|
||||
## Running Dedup
|
||||
|
||||
```python
|
||||
# Via tool
|
||||
result = fact_store(action="dedup")
|
||||
|
||||
# Via MemoryManager
|
||||
report = memory_manager.run_dedup_scan()
|
||||
```
|
||||
@@ -55,7 +55,7 @@ FACT_STORE_SCHEMA = {
|
||||
"properties": {
|
||||
"action": {
|
||||
"type": "string",
|
||||
"enum": ["add", "search", "probe", "related", "reason", "contradict", "update", "remove", "list"],
|
||||
"enum": ["add", "search", "probe", "related", "reason", "contradict", "update", "remove", "list", "dedup"],
|
||||
},
|
||||
"content": {"type": "string", "description": "Fact content (required for 'add')."},
|
||||
"query": {"type": "string", "description": "Search query (required for 'search')."},
|
||||
@@ -242,27 +242,48 @@ class HolographicMemoryProvider(MemoryProvider):
|
||||
self._auto_extract_facts(messages)
|
||||
|
||||
def on_memory_write(self, action: str, target: str, content: str) -> None:
|
||||
"""Mirror built-in memory writes as facts.
|
||||
"""Mirror built-in memory writes as facts with cross-tier dedup.
|
||||
|
||||
- add: mirror new fact to holographic store
|
||||
- replace: search for old content, update or re-add
|
||||
- remove: lower trust on matching facts so they fade naturally
|
||||
- add: check for duplicates first, skip if fact already exists
|
||||
- replace: search for old content, update or re-add (dedup-aware)
|
||||
- remove: remove matching facts (hard remove, not trust decay)
|
||||
|
||||
Dedup strategy: before adding, search existing facts for near-matches.
|
||||
If similarity > 0.85, skip the add (existing fact store entry wins).
|
||||
"""
|
||||
if not self._store:
|
||||
return
|
||||
try:
|
||||
if action == "add" and content:
|
||||
category = "user_pref" if target == "user" else "general"
|
||||
# Cross-tier dedup: check if this fact already exists
|
||||
from .dedup import is_duplicate_before_add
|
||||
existing = self._store.search_facts(content.strip()[:200], limit=5)
|
||||
dup = is_duplicate_before_add(content, existing)
|
||||
if dup:
|
||||
logger.debug(
|
||||
"Skipping duplicate mirror: '%s' already exists as fact#%d",
|
||||
content[:60], dup.get("fact_id", "?")
|
||||
)
|
||||
return
|
||||
self._store.add_fact(content, category=category)
|
||||
elif action == "replace" and content:
|
||||
category = "user_pref" if target == "user" else "general"
|
||||
# Check for duplicate before adding replacement
|
||||
from .dedup import is_duplicate_before_add
|
||||
existing = self._store.search_facts(content.strip()[:200], limit=5)
|
||||
dup = is_duplicate_before_add(content, existing)
|
||||
if dup:
|
||||
logger.debug("Skipping duplicate replace mirror: fact#%d already matches", dup.get("fact_id", "?"))
|
||||
return
|
||||
self._store.add_fact(content, category=category)
|
||||
elif action == "remove" and content:
|
||||
# Lower trust on matching facts so they decay naturally
|
||||
# Hard remove matching facts (not just trust decay)
|
||||
results = self._store.search_facts(content, limit=5)
|
||||
for fact in results:
|
||||
if content.strip().lower() in fact.get("content", "").lower():
|
||||
self._store.update_fact(fact["fact_id"], trust=max(0.0, fact.get("trust", 0.5) - 0.4))
|
||||
self._store.remove_fact(fact["fact_id"])
|
||||
logger.debug("Removed mirrored fact#%d on memory remove", fact["fact_id"])
|
||||
except Exception as e:
|
||||
logger.debug("Holographic memory_write mirror failed: %s", e)
|
||||
|
||||
@@ -351,6 +372,31 @@ class HolographicMemoryProvider(MemoryProvider):
|
||||
)
|
||||
return json.dumps({"facts": facts, "count": len(facts)})
|
||||
|
||||
elif action == "dedup":
|
||||
from .dedup import scan_cross_tier_duplicates, resolve_duplicates, DedupReport
|
||||
# Get all facts from store
|
||||
all_facts = store.list_facts(min_trust=0.0, limit=1000)
|
||||
# Get memory entries from built-in store (passed via kwargs if available)
|
||||
memory_entries = kwargs.get("memory_entries", [])
|
||||
if not memory_entries:
|
||||
return json.dumps({
|
||||
"status": "no_memory_entries",
|
||||
"message": "No MEMORY.md entries provided for comparison. Use memory tool to read first.",
|
||||
"fact_count": len(all_facts),
|
||||
})
|
||||
report = scan_cross_tier_duplicates(memory_entries, all_facts)
|
||||
if report.duplicates_found == 0:
|
||||
return json.dumps({"status": "clean", "message": "No cross-tier duplicates found."})
|
||||
# Auto-resolve: fact store wins
|
||||
cleaned = resolve_duplicates(report, memory_entries, store)
|
||||
return json.dumps({
|
||||
"status": "resolved",
|
||||
"duplicates_found": report.duplicates_found,
|
||||
"entries_removed": len(memory_entries) - len(cleaned),
|
||||
"cleaned_entries": cleaned,
|
||||
"summary": report.summary(),
|
||||
})
|
||||
|
||||
else:
|
||||
return json.dumps({"error": f"Unknown action: {action}"})
|
||||
|
||||
|
||||
191
plugins/memory/holographic/dedup.py
Normal file
191
plugins/memory/holographic/dedup.py
Normal file
@@ -0,0 +1,191 @@
|
||||
"""Cross-tier memory deduplication.
|
||||
|
||||
Detects and resolves duplicate facts between MEMORY.md (built-in) and the
|
||||
holographic fact store. Facts should live in exactly one tier:
|
||||
|
||||
Tier 1 — MEMORY.md: Always-on context (compact, <50 entries ideal).
|
||||
Tier 2 — Fact store: Deep structured storage (unlimited, entity-aware).
|
||||
Tier 3 — MemPalace: Specialized long-form archives.
|
||||
|
||||
Ownership rules:
|
||||
- user_pref / project / tool facts → fact store (structured, searchable)
|
||||
- "always-on" operational notes → MEMORY.md (compact, system prompt)
|
||||
- When in doubt: fact store wins (it has dedup, trust scoring, FTS5)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from difflib import SequenceMatcher
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_SIMILARITY_THRESHOLD = 0.85
|
||||
|
||||
|
||||
@dataclass
|
||||
class DuplicatePair:
|
||||
memory_entry: str
|
||||
memory_index: int
|
||||
fact_store_id: int
|
||||
fact_store_content: str
|
||||
similarity: float
|
||||
resolution: str = ""
|
||||
resolved: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class DedupReport:
|
||||
total_memory_entries: int = 0
|
||||
total_facts: int = 0
|
||||
duplicates_found: int = 0
|
||||
pairs: List[DuplicatePair] = field(default_factory=list)
|
||||
|
||||
def summary(self) -> str:
|
||||
lines = [
|
||||
f"Cross-tier dedup: {self.total_memory_entries} MEMORY.md entries, "
|
||||
f"{self.total_facts} fact store entries, "
|
||||
f"{self.duplicates_found} duplicates found",
|
||||
]
|
||||
for p in self.pairs:
|
||||
status = f"[{p.resolution}]" if p.resolved else "[PENDING]"
|
||||
lines.append(
|
||||
f" {status} sim={p.similarity:.2f} "
|
||||
f"mem[{p.memory_index}]: {p.memory_entry[:60]} "
|
||||
f"<> fact#{p.fact_store_id}: {p.fact_store_content[:60]}"
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _normalize(text: str) -> str:
|
||||
text = text.strip().lower()
|
||||
text = re.sub(r"^[\\s>*\\-\\u2022]+", "", text)
|
||||
text = re.sub(r"\\s+", " ", text)
|
||||
text = text.rstrip(".,;:!?",)
|
||||
return text
|
||||
|
||||
|
||||
def _similarity(a: str, b: str) -> float:
|
||||
if not a or not b:
|
||||
return 0.0
|
||||
return SequenceMatcher(None, a, b).ratio()
|
||||
|
||||
|
||||
def scan_cross_tier_duplicates(
|
||||
memory_entries: List[str],
|
||||
fact_store_facts: List[Dict[str, Any]],
|
||||
threshold: float = _SIMILARITY_THRESHOLD,
|
||||
) -> DedupReport:
|
||||
report = DedupReport(
|
||||
total_memory_entries=len(memory_entries),
|
||||
total_facts=len(fact_store_facts),
|
||||
)
|
||||
for i, mem_line in enumerate(memory_entries):
|
||||
mem_norm = _normalize(mem_line)
|
||||
if not mem_norm or len(mem_norm) < 10:
|
||||
continue
|
||||
for fact in fact_store_facts:
|
||||
fact_norm = _normalize(fact.get("content", ""))
|
||||
if not fact_norm or len(fact_norm) < 10:
|
||||
continue
|
||||
sim = _similarity(mem_norm, fact_norm)
|
||||
if sim >= threshold:
|
||||
report.pairs.append(DuplicatePair(
|
||||
memory_entry=mem_line,
|
||||
memory_index=i,
|
||||
fact_store_id=fact.get("fact_id", -1),
|
||||
fact_store_content=fact.get("content", ""),
|
||||
similarity=sim,
|
||||
))
|
||||
report.duplicates_found = len(report.pairs)
|
||||
return report
|
||||
|
||||
|
||||
def classify_tier(fact_content: str, category: str = "general") -> str:
|
||||
if category in ("user_pref", "project", "tool"):
|
||||
return "factstore"
|
||||
content = fact_content.strip()
|
||||
if len(content) < 80 and any(
|
||||
kw in content.lower() for kw in ("todo", "note:", "fix:", "remember:", "always", "never")
|
||||
):
|
||||
return "memory"
|
||||
return "factstore"
|
||||
|
||||
|
||||
def resolve_pair(pair: DuplicatePair) -> str:
|
||||
pair.resolution = "keep_factstore"
|
||||
pair.resolved = True
|
||||
return pair.resolution
|
||||
|
||||
|
||||
def resolve_duplicates(
|
||||
report: DedupReport,
|
||||
memory_entries: List[str],
|
||||
fact_store=None,
|
||||
) -> List[str]:
|
||||
indices_to_remove = set()
|
||||
for pair in report.pairs:
|
||||
resolve_pair(pair)
|
||||
if pair.resolution == "keep_factstore":
|
||||
indices_to_remove.add(pair.memory_index)
|
||||
elif pair.resolution == "keep_memory" and fact_store:
|
||||
try:
|
||||
fact_store.remove_fact(pair.fact_store_id)
|
||||
except Exception as e:
|
||||
logger.debug("Failed to remove fact %d: %s", pair.fact_store_id, e)
|
||||
cleaned = [e for i, e in enumerate(memory_entries) if i not in indices_to_remove]
|
||||
removed = len(memory_entries) - len(cleaned)
|
||||
if removed:
|
||||
logger.info("Dedup removed %d duplicate entries from MEMORY.md", removed)
|
||||
return cleaned
|
||||
|
||||
|
||||
def is_duplicate_before_add(
|
||||
content: str,
|
||||
existing_facts: List[Dict[str, Any]],
|
||||
threshold: float = _SIMILARITY_THRESHOLD,
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
"""Check if content is a duplicate of an existing fact BEFORE adding.
|
||||
|
||||
Returns the matching fact dict if duplicate, None otherwise.
|
||||
Used by on_memory_write to prevent cross-tier duplication at write time.
|
||||
"""
|
||||
content_norm = _normalize(content)
|
||||
if not content_norm or len(content_norm) < 10:
|
||||
return None
|
||||
for fact in existing_facts:
|
||||
fact_norm = _normalize(fact.get("content", ""))
|
||||
if not fact_norm or len(fact_norm) < 10:
|
||||
continue
|
||||
if _similarity(content_norm, fact_norm) >= threshold:
|
||||
return fact
|
||||
return None
|
||||
|
||||
|
||||
TIER_OWNERSHIP_DOC = """# Memory Tier Ownership
|
||||
|
||||
Each fact lives in exactly one tier to prevent duplicate tokens and stale-data divergence.
|
||||
|
||||
## Tier 1 — MEMORY.md (built-in)
|
||||
- Always-on system prompt context (compact, <50 entries ideal).
|
||||
- Operational notes, active task state, immediate context.
|
||||
- Managed by: `memory` tool.
|
||||
|
||||
## Tier 2 — Fact Store (holographic)
|
||||
- Deep structured storage with search and reasoning.
|
||||
- user_pref, project, tool facts; entity-linked knowledge.
|
||||
- Managed by: `fact_store` tool.
|
||||
- Has: FTS5 search, trust scoring, entity resolution.
|
||||
|
||||
## Tier 3 — MemPalace
|
||||
- Specialized long-form archives and research.
|
||||
|
||||
## Rules
|
||||
- MEMORY.md entries >100 chars → migrate to fact store.
|
||||
- Structured categories (user_pref, project, tool) → fact store.
|
||||
- Duplicate across tiers: fact store wins (it has trust scoring).
|
||||
- `on_memory_write` checks fact store before mirroring.
|
||||
"""
|
||||
178
tests/test_memory_dedup.py
Normal file
178
tests/test_memory_dedup.py
Normal file
@@ -0,0 +1,178 @@
|
||||
"""Tests for cross-tier memory deduplication.
|
||||
|
||||
Tests the dedup module's normalize, similarity, scan, resolve, and
|
||||
is_duplicate_before_add functions.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Add the plugins path so we can import dedup
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "plugins", "memory", "holographic"))
|
||||
|
||||
from dedup import (
|
||||
_normalize,
|
||||
_similarity,
|
||||
scan_cross_tier_duplicates,
|
||||
resolve_duplicates,
|
||||
is_duplicate_before_add,
|
||||
classify_tier,
|
||||
DedupReport,
|
||||
DuplicatePair,
|
||||
)
|
||||
|
||||
|
||||
class TestNormalize:
|
||||
def test_basic_lowercasing(self):
|
||||
assert _normalize("Hello World") == "hello world"
|
||||
|
||||
def test_strips_markdown_bullets(self):
|
||||
assert _normalize("- some fact") == "some fact"
|
||||
assert _normalize("* some fact") == "some fact"
|
||||
assert _normalize(" - some fact ") == "some fact"
|
||||
|
||||
def test_strips_trailing_punctuation(self):
|
||||
assert _normalize("some fact.") == "some fact"
|
||||
assert _normalize("some fact,") == "some fact"
|
||||
assert _normalize("some fact;") == "some fact"
|
||||
|
||||
def test_collapses_whitespace(self):
|
||||
assert _normalize("some fact here") == "some fact here"
|
||||
|
||||
def test_empty_and_short(self):
|
||||
assert _normalize("") == ""
|
||||
assert _normalize(" ") == ""
|
||||
assert _normalize("abc") == "abc"
|
||||
|
||||
|
||||
class TestSimilarity:
|
||||
def test_identical_strings(self):
|
||||
assert _similarity("hello world", "hello world") == 1.0
|
||||
|
||||
def test_completely_different(self):
|
||||
assert _similarity("abc", "xyz") < 0.3
|
||||
|
||||
def test_similar_rephrasing(self):
|
||||
sim = _similarity("deploy via ansible", "deploy with ansible")
|
||||
assert sim > 0.7
|
||||
|
||||
def test_empty_strings(self):
|
||||
assert _similarity("", "hello") == 0.0
|
||||
assert _similarity("hello", "") == 0.0
|
||||
assert _similarity("", "") == 0.0
|
||||
|
||||
|
||||
class TestScanCrossTierDuplicates:
|
||||
def test_no_duplicates(self):
|
||||
memory = ["Deploy via Ansible", "Use mimo-v2-pro model"]
|
||||
facts = [
|
||||
{"fact_id": 1, "content": "User prefers dark mode"},
|
||||
{"fact_id": 2, "content": "Project uses Python 3.11"},
|
||||
]
|
||||
report = scan_cross_tier_duplicates(memory, facts)
|
||||
assert report.duplicates_found == 0
|
||||
assert len(report.pairs) == 0
|
||||
|
||||
def test_exact_duplicate(self):
|
||||
memory = ["Deploy via Ansible"]
|
||||
facts = [{"fact_id": 1, "content": "Deploy via Ansible"}]
|
||||
report = scan_cross_tier_duplicates(memory, facts)
|
||||
assert report.duplicates_found == 1
|
||||
assert report.pairs[0].similarity == 1.0
|
||||
assert report.pairs[0].fact_store_id == 1
|
||||
|
||||
def test_near_duplicate_above_threshold(self):
|
||||
memory = ["Alexander prefers action over narration"]
|
||||
facts = [{"fact_id": 1, "content": "Alexander prefers action over narration."}]
|
||||
report = scan_cross_tier_duplicates(memory, facts)
|
||||
assert report.duplicates_found == 1
|
||||
|
||||
def test_below_threshold_not_duplicate(self):
|
||||
memory = ["Deploy via Ansible on VPS"]
|
||||
facts = [{"fact_id": 1, "content": "Deploy via Docker on local machine"}]
|
||||
report = scan_cross_tier_duplicates(memory, facts, threshold=0.85)
|
||||
assert report.duplicates_found == 0
|
||||
|
||||
def test_short_entries_skipped(self):
|
||||
memory = ["OK", "ab"]
|
||||
facts = [{"fact_id": 1, "content": "OK"}]
|
||||
report = scan_cross_tier_duplicates(memory, facts)
|
||||
assert report.duplicates_found == 0
|
||||
|
||||
def test_multiple_duplicates(self):
|
||||
memory = ["Fact A here", "Fact B here"]
|
||||
facts = [
|
||||
{"fact_id": 1, "content": "Fact A here"},
|
||||
{"fact_id": 2, "content": "Fact B here"},
|
||||
]
|
||||
report = scan_cross_tier_duplicates(memory, facts)
|
||||
assert report.duplicates_found == 2
|
||||
|
||||
def test_report_summary(self):
|
||||
memory = ["Deploy via Ansible"]
|
||||
facts = [{"fact_id": 1, "content": "Deploy via Ansible"}]
|
||||
report = scan_cross_tier_duplicates(memory, facts)
|
||||
summary = report.summary()
|
||||
assert "1 MEMORY.md entries" in summary
|
||||
assert "1 fact store entries" in summary
|
||||
assert "1 duplicates" in summary
|
||||
|
||||
|
||||
class TestResolveDuplicates:
|
||||
def test_removes_memory_duplicates(self):
|
||||
memory = ["Deploy via Ansible", "Use Python 3.11"]
|
||||
facts = [{"fact_id": 1, "content": "Deploy via Ansible"}]
|
||||
report = scan_cross_tier_duplicates(memory, facts)
|
||||
cleaned = resolve_duplicates(report, memory)
|
||||
assert len(cleaned) == 1
|
||||
assert cleaned[0] == "Use Python 3.11"
|
||||
|
||||
def test_no_duplicates_returns_same(self):
|
||||
memory = ["Deploy via Ansible", "Use Python 3.11"]
|
||||
facts = [{"fact_id": 1, "content": "Completely different fact"}]
|
||||
report = scan_cross_tier_duplicates(memory, facts)
|
||||
cleaned = resolve_duplicates(report, memory)
|
||||
assert len(cleaned) == 2
|
||||
|
||||
|
||||
class TestIsDuplicateBeforeAdd:
|
||||
def test_finds_duplicate(self):
|
||||
existing = [{"fact_id": 1, "content": "Deploy via Ansible"}]
|
||||
result = is_duplicate_before_add("Deploy via Ansible", existing)
|
||||
assert result is not None
|
||||
assert result["fact_id"] == 1
|
||||
|
||||
def test_no_duplicate_returns_none(self):
|
||||
existing = [{"fact_id": 1, "content": "Use dark mode"}]
|
||||
result = is_duplicate_before_add("Deploy via Ansible", existing)
|
||||
assert result is None
|
||||
|
||||
def test_short_content_returns_none(self):
|
||||
existing = [{"fact_id": 1, "content": "OK"}]
|
||||
result = is_duplicate_before_add("OK", existing)
|
||||
assert result is None
|
||||
|
||||
def test_empty_existing_returns_none(self):
|
||||
result = is_duplicate_before_add("Some fact here", [])
|
||||
assert result is None
|
||||
|
||||
|
||||
class TestClassifyTier:
|
||||
def test_user_pref_goes_to_factstore(self):
|
||||
assert classify_tier("anything", "user_pref") == "factstore"
|
||||
|
||||
def test_project_goes_to_factstore(self):
|
||||
assert classify_tier("anything", "project") == "factstore"
|
||||
|
||||
def test_short_operational_note_goes_to_memory(self):
|
||||
assert classify_tier("remember: always use sudo") == "memory"
|
||||
assert classify_tier("todo: fix the deploy script") == "memory"
|
||||
|
||||
def test_long_fact_goes_to_factstore(self):
|
||||
long_fact = "The deployment process requires running ansible-playbook with the production inventory file and verifying health checks after completion"
|
||||
assert classify_tier(long_fact) == "factstore"
|
||||
|
||||
def test_general_short_goes_to_factstore(self):
|
||||
# Short but not operational
|
||||
assert classify_tier("user likes dark mode") == "factstore"
|
||||
Reference in New Issue
Block a user