Compare commits
3 Commits
allegro/re
...
fix/749-ba
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9f0c410481 | ||
|
|
30afd529ac | ||
|
|
a244b157be |
@@ -1,172 +0,0 @@
|
||||
# Vector Database SOTA Research Report
|
||||
## For AI Agent Semantic Retrieval — April 2026
|
||||
|
||||
---
|
||||
|
||||
## Executive Summary
|
||||
|
||||
Analysis of current vector database benchmarks, documentation, and production deployments for semantic retrieval in AI agents. Compared against existing Hermes session_search (SQLite FTS5) and holographic memory systems.
|
||||
|
||||
---
|
||||
|
||||
## 1. Retrieval Accuracy (Recall@10)
|
||||
|
||||
| Database | HNSW Recall | IVF Recall | Notes |
|
||||
|----------|-------------|------------|-------|
|
||||
| **Qdrant** | 0.95-0.99 | N/A | Tunable via ef parameter |
|
||||
| **Milvus** | 0.95-0.99 | 0.85-0.95 | Multiple index support |
|
||||
| **Weaviate** | 0.95-0.98 | N/A | HNSW primary |
|
||||
| **Pinecone** | 0.95-0.99 | N/A | Managed, opaque tuning |
|
||||
| **ChromaDB** | 0.90-0.95 | N/A | Simpler, uses HNSW via hnswlib |
|
||||
| **pgvector** | 0.85-0.95 | 0.80-0.90 | Depends on tuning |
|
||||
| **SQLite-vss** | 0.80-0.90 | N/A | HNSW via sqlite-vss |
|
||||
| **Current FTS5** | ~0.60-0.75* | N/A | Keyword matching only |
|
||||
|
||||
*FTS5 "recall" estimated: good for exact keywords, poor for semantic/paraphrased queries.
|
||||
|
||||
---
|
||||
|
||||
## 2. Latency Benchmarks (1M vectors, 768-dim, 10 neighbors)
|
||||
|
||||
| Database | p50 (ms) | p99 (ms) | QPS | Notes |
|
||||
|----------|----------|----------|-----|-------|
|
||||
| **Qdrant** | 1-3 | 5-10 | 5,000-15,000 | Best self-hosted |
|
||||
| **Milvus** | 2-5 | 8-15 | 3,000-12,000 | Good distributed |
|
||||
| **Weaviate** | 3-8 | 10-25 | 2,000-8,000 | |
|
||||
| **Pinecone** | 5-15 | 20-50 | 1,000-5,000 | Managed overhead |
|
||||
| **ChromaDB** | 5-15 | 20-50 | 500-2,000 | Embedded mode |
|
||||
| **pgvector** | 10-50 | 50-200 | 200-1,000 | SQL overhead |
|
||||
| **SQLite-vss** | 10-30 | 50-150 | 300-800 | Limited scalability |
|
||||
| **Current FTS5** | 2-10 | 15-50 | 1,000-5,000 | No embedding cost |
|
||||
|
||||
---
|
||||
|
||||
## 3. Index Types Comparison
|
||||
|
||||
### HNSW (Hierarchical Navigable Small World)
|
||||
- Best for: High recall, moderate memory, fast queries
|
||||
- Used by: Qdrant, Weaviate, ChromaDB, Milvus, pgvector, SQLite-vss
|
||||
- Memory: High (~1.5GB per 1M 768-dim vectors)
|
||||
- Key parameters: ef_construction (100-500), M (16-64), ef (64-256)
|
||||
|
||||
### IVF (Inverted File Index)
|
||||
- Best for: Large datasets, memory-constrained
|
||||
- Used by: Milvus, pgvector
|
||||
- Memory: Lower (~0.5GB per 1M vectors)
|
||||
- Key parameters: nlist (100-10000), nprobe (10-100)
|
||||
|
||||
### DiskANN / SPANN
|
||||
- Best for: 100M+ vectors on disk
|
||||
- Memory: Very low (~100MB index)
|
||||
|
||||
### Quantization (SQ/PQ)
|
||||
- Memory reduction: 4-8x
|
||||
- Recall impact: -5-15%
|
||||
|
||||
---
|
||||
|
||||
## 4. Multi-Modal Support
|
||||
|
||||
| Database | Text | Image | Audio | Video | Mixed Queries |
|
||||
|----------|------|-------|-------|-------|---------------|
|
||||
| Qdrant | ✅ | ✅ | ✅ | ✅ | ✅ (multi-vector) |
|
||||
| Milvus | ✅ | ✅ | ✅ | ✅ | ✅ (hybrid) |
|
||||
| Weaviate | ✅ | ✅ | ✅ | ✅ | ✅ (named vectors) |
|
||||
| Pinecone | ✅ | ✅ | ✅ | ✅ | Limited |
|
||||
| ChromaDB | ✅ | Via emb | Via emb | Via emb | Limited |
|
||||
| pgvector | ✅ | Via emb | Via emb | Via emb | Limited |
|
||||
| SQLite-vss | ✅ | Via emb | Via emb | Via emb | Limited |
|
||||
|
||||
---
|
||||
|
||||
## 5. Integration Patterns for AI Agents
|
||||
|
||||
### Pattern A: Direct Search
|
||||
Query → Embedding → Vector DB → Top-K → LLM
|
||||
|
||||
### Pattern B: Hybrid Search
|
||||
Query → BM25 + Vector → Merge/Rerank → LLM
|
||||
|
||||
### Pattern C: Multi-Stage
|
||||
Query → Vector DB (top-100) → Reranker (top-10) → LLM
|
||||
|
||||
### Pattern D: Agent Memory with Trust + Decay
|
||||
Query → Vector → Score × Trust × Decay → Top-K → Summarize
|
||||
|
||||
---
|
||||
|
||||
## 6. Comparison with Current Systems
|
||||
|
||||
### session_search (FTS5)
|
||||
Strengths: Zero deps, no embedding needed, fast for exact keywords
|
||||
Limitations: No semantic understanding, no cross-lingual, limited ranking
|
||||
|
||||
### holographic/retrieval.py (HRR)
|
||||
Strengths: Compositional queries, contradiction detection, trust + decay
|
||||
Limitations: Requires numpy, O(n) scan, non-standard embedding space
|
||||
|
||||
### Expected Gains from Vector DB:
|
||||
- Semantic recall: +30-50% for paraphrased queries
|
||||
- Cross-lingual: +60-80%
|
||||
- Fuzzy matching: +40-60%
|
||||
- Conceptual: +50-70%
|
||||
|
||||
---
|
||||
|
||||
## 7. Recommendations
|
||||
|
||||
### Option 1: Qdrant (RECOMMENDED)
|
||||
- Best self-hosted performance
|
||||
- Rust implementation, native multi-vector
|
||||
- Tradeoff: Separate service deployment
|
||||
|
||||
### Option 2: pgvector (CONSERVATIVE)
|
||||
- Zero new infrastructure if using PostgreSQL
|
||||
- Tradeoff: 5-10x slower than Qdrant
|
||||
|
||||
### Option 3: SQLite-vss (LIGHTWEIGHT)
|
||||
- Minimal changes, embedded deployment
|
||||
- Tradeoff: Limited scalability (<100K vectors)
|
||||
|
||||
### Option 4: Hybrid (BEST OF BOTH)
|
||||
Keep FTS5 + HRR and add Qdrant:
|
||||
- Vector (semantic) + FTS5 (keyword) + HRR (compositional)
|
||||
- Apply trust scoring + temporal decay
|
||||
|
||||
---
|
||||
|
||||
## 8. Embedding Models (2025-2026)
|
||||
|
||||
| Model | Dimensions | Quality | Cost |
|
||||
|-------|-----------|---------|------|
|
||||
| OpenAI text-embedding-3-large | 3072 | Best | $$$ |
|
||||
| OpenAI text-embedding-3-small | 1536 | Good | $ |
|
||||
| BGE-M3 | 1024 | Best self-hosted | Free |
|
||||
| GTE-Qwen2 | 768-1024 | Good | Free |
|
||||
|
||||
---
|
||||
|
||||
## 9. Hardware Requirements (1M vectors, 768-dim)
|
||||
|
||||
| Database | RAM (HNSW) | RAM (Quantized) |
|
||||
|----------|-----------|-----------------|
|
||||
| Qdrant | 8-16GB | 2-4GB |
|
||||
| Milvus | 16-32GB | 4-8GB |
|
||||
| pgvector | 4-8GB | N/A |
|
||||
| SQLite-vss | 2-4GB | N/A |
|
||||
|
||||
---
|
||||
|
||||
## 10. Conclusion
|
||||
|
||||
Primary: Qdrant with hybrid search (vector + FTS5 + HRR)
|
||||
Key insight: Augment existing HRR system, don't replace it.
|
||||
|
||||
Next steps:
|
||||
1. Deploy Qdrant in Docker for testing
|
||||
2. Benchmark embedding models
|
||||
3. Implement hybrid search prototype
|
||||
4. Measure recall improvement
|
||||
5. Evaluate operational complexity
|
||||
|
||||
Report: April 2026 | Sources: ANN-Benchmarks, VectorDBBench, official docs
|
||||
@@ -1,353 +0,0 @@
|
||||
"""Privacy Filter — strip PII from context before remote API calls.
|
||||
|
||||
Implements Vitalik's Pattern 2: "A local model can strip out private data
|
||||
before passing the query along to a remote LLM."
|
||||
|
||||
When Hermes routes a request to a cloud provider (Anthropic, OpenRouter, etc.),
|
||||
this module sanitizes the message context to remove personally identifiable
|
||||
information before it leaves the user's machine.
|
||||
|
||||
Threat model (from Vitalik's secure LLM architecture):
|
||||
- Privacy (other): Non-LLM data leakage via search queries, API calls
|
||||
- LLM accidents: LLM accidentally leaking private data in prompts
|
||||
- LLM jailbreaks: Remote content extracting private context
|
||||
|
||||
Usage:
|
||||
from agent.privacy_filter import PrivacyFilter, sanitize_messages
|
||||
|
||||
pf = PrivacyFilter()
|
||||
safe_messages = pf.sanitize_messages(messages)
|
||||
# safe_messages has PII replaced with [REDACTED] tokens
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum, auto
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Sensitivity(Enum):
|
||||
"""Classification of content sensitivity."""
|
||||
PUBLIC = auto() # No PII detected
|
||||
LOW = auto() # Generic references (e.g., city names)
|
||||
MEDIUM = auto() # Personal identifiers (name, email, phone)
|
||||
HIGH = auto() # Secrets, keys, financial data, medical info
|
||||
CRITICAL = auto() # Crypto keys, passwords, SSN patterns
|
||||
|
||||
|
||||
@dataclass
|
||||
class RedactionReport:
|
||||
"""Summary of what was redacted from a message batch."""
|
||||
total_messages: int = 0
|
||||
redacted_messages: int = 0
|
||||
redactions: List[Dict[str, Any]] = field(default_factory=list)
|
||||
max_sensitivity: Sensitivity = Sensitivity.PUBLIC
|
||||
|
||||
@property
|
||||
def had_redactions(self) -> bool:
|
||||
return self.redacted_messages > 0
|
||||
|
||||
def summary(self) -> str:
|
||||
if not self.had_redactions:
|
||||
return "No PII detected — context is clean for remote query."
|
||||
parts = [f"Redacted {self.redacted_messages}/{self.total_messages} messages:"]
|
||||
for r in self.redactions[:10]:
|
||||
parts.append(f" - {r['type']}: {r['count']} occurrence(s)")
|
||||
if len(self.redactions) > 10:
|
||||
parts.append(f" ... and {len(self.redactions) - 10} more types")
|
||||
return "\n".join(parts)
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# PII pattern definitions
|
||||
# =========================================================================
|
||||
|
||||
# Each pattern is (compiled_regex, redaction_type, sensitivity_level, replacement)
|
||||
_PII_PATTERNS: List[Tuple[re.Pattern, str, Sensitivity, str]] = []
|
||||
|
||||
|
||||
def _compile_patterns() -> None:
|
||||
"""Compile PII detection patterns. Called once at module init."""
|
||||
global _PII_PATTERNS
|
||||
if _PII_PATTERNS:
|
||||
return
|
||||
|
||||
raw_patterns = [
|
||||
# --- CRITICAL: secrets and credentials ---
|
||||
(
|
||||
r'(?:api[_-]?key|apikey|secret[_-]?key|access[_-]?token)\s*[:=]\s*["\']?([A-Za-z0-9_\-\.]{20,})["\']?',
|
||||
"api_key_or_token",
|
||||
Sensitivity.CRITICAL,
|
||||
"[REDACTED-API-KEY]",
|
||||
),
|
||||
(
|
||||
r'\b(?:sk-|sk_|pk_|rk_|ak_)[A-Za-z0-9]{20,}\b',
|
||||
"prefixed_secret",
|
||||
Sensitivity.CRITICAL,
|
||||
"[REDACTED-SECRET]",
|
||||
),
|
||||
(
|
||||
r'\b(?:ghp_|gho_|ghu_|ghs_|ghr_)[A-Za-z0-9]{36,}\b',
|
||||
"github_token",
|
||||
Sensitivity.CRITICAL,
|
||||
"[REDACTED-GITHUB-TOKEN]",
|
||||
),
|
||||
(
|
||||
r'\b(?:xox[bposa]-[A-Za-z0-9\-]+)\b',
|
||||
"slack_token",
|
||||
Sensitivity.CRITICAL,
|
||||
"[REDACTED-SLACK-TOKEN]",
|
||||
),
|
||||
(
|
||||
r'(?:password|passwd|pwd)\s*[:=]\s*["\']?([^\s"\']{4,})["\']?',
|
||||
"password",
|
||||
Sensitivity.CRITICAL,
|
||||
"[REDACTED-PASSWORD]",
|
||||
),
|
||||
(
|
||||
r'(?:-----BEGIN (?:RSA |EC |OPENSSH )?PRIVATE KEY-----)',
|
||||
"private_key_block",
|
||||
Sensitivity.CRITICAL,
|
||||
"[REDACTED-PRIVATE-KEY]",
|
||||
),
|
||||
# Ethereum / crypto addresses (42-char hex starting with 0x)
|
||||
(
|
||||
r'\b0x[a-fA-F0-9]{40}\b',
|
||||
"ethereum_address",
|
||||
Sensitivity.HIGH,
|
||||
"[REDACTED-ETH-ADDR]",
|
||||
),
|
||||
# Bitcoin addresses (base58, 25-34 chars starting with 1/3/bc1)
|
||||
(
|
||||
r'\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b',
|
||||
"bitcoin_address",
|
||||
Sensitivity.HIGH,
|
||||
"[REDACTED-BTC-ADDR]",
|
||||
),
|
||||
(
|
||||
r'\bbc1[a-zA-HJ-NP-Z0-9]{39,59}\b',
|
||||
"bech32_address",
|
||||
Sensitivity.HIGH,
|
||||
"[REDACTED-BTC-ADDR]",
|
||||
),
|
||||
# --- HIGH: financial ---
|
||||
(
|
||||
r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
|
||||
"credit_card_number",
|
||||
Sensitivity.HIGH,
|
||||
"[REDACTED-CC]",
|
||||
),
|
||||
(
|
||||
r'\b\d{3}-\d{2}-\d{4}\b',
|
||||
"us_ssn",
|
||||
Sensitivity.HIGH,
|
||||
"[REDACTED-SSN]",
|
||||
),
|
||||
# --- MEDIUM: personal identifiers ---
|
||||
# Email addresses
|
||||
(
|
||||
r'\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b',
|
||||
"email_address",
|
||||
Sensitivity.MEDIUM,
|
||||
"[REDACTED-EMAIL]",
|
||||
),
|
||||
# Phone numbers (US/international patterns)
|
||||
(
|
||||
r'\b(?:\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b',
|
||||
"phone_number_us",
|
||||
Sensitivity.MEDIUM,
|
||||
"[REDACTED-PHONE]",
|
||||
),
|
||||
(
|
||||
r'\b\+\d{1,3}[-.\s]?\d{4,14}\b',
|
||||
"phone_number_intl",
|
||||
Sensitivity.MEDIUM,
|
||||
"[REDACTED-PHONE]",
|
||||
),
|
||||
# Filesystem paths that reveal user identity
|
||||
(
|
||||
r'(?:/Users/|/home/|C:\\Users\\)([A-Za-z0-9_\-]+)',
|
||||
"user_home_path",
|
||||
Sensitivity.MEDIUM,
|
||||
r"/Users/[REDACTED-USER]",
|
||||
),
|
||||
# --- LOW: environment / system info ---
|
||||
# Internal IPs
|
||||
(
|
||||
r'\b(?:10\.\d{1,3}\.\d{1,3}\.\d{1,3}|172\.(?:1[6-9]|2\d|3[01])\.\d{1,3}\.\d{1,3}|192\.168\.\d{1,3}\.\d{1,3})\b',
|
||||
"internal_ip",
|
||||
Sensitivity.LOW,
|
||||
"[REDACTED-IP]",
|
||||
),
|
||||
]
|
||||
|
||||
_PII_PATTERNS = [
|
||||
(re.compile(pattern, re.IGNORECASE), rtype, sensitivity, replacement)
|
||||
for pattern, rtype, sensitivity, replacement in raw_patterns
|
||||
]
|
||||
|
||||
|
||||
_compile_patterns()
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Sensitive file path patterns (context-aware)
|
||||
# =========================================================================
|
||||
|
||||
_SENSITIVE_PATH_PATTERNS = [
|
||||
re.compile(r'\.(?:env|pem|key|p12|pfx|jks|keystore)\b', re.IGNORECASE),
|
||||
re.compile(r'(?:\.ssh/|\.gnupg/|\.aws/|\.config/gcloud/)', re.IGNORECASE),
|
||||
re.compile(r'(?:wallet|keystore|seed|mnemonic)', re.IGNORECASE),
|
||||
re.compile(r'(?:\.hermes/\.env)', re.IGNORECASE),
|
||||
]
|
||||
|
||||
|
||||
def _classify_path_sensitivity(path: str) -> Sensitivity:
|
||||
"""Check if a file path references sensitive material."""
|
||||
for pat in _SENSITIVE_PATH_PATTERNS:
|
||||
if pat.search(path):
|
||||
return Sensitivity.HIGH
|
||||
return Sensitivity.PUBLIC
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Core filtering
|
||||
# =========================================================================
|
||||
|
||||
class PrivacyFilter:
|
||||
"""Strip PII from message context before remote API calls.
|
||||
|
||||
Integrates with the agent's message pipeline. Call sanitize_messages()
|
||||
before sending context to any cloud LLM provider.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
min_sensitivity: Sensitivity = Sensitivity.MEDIUM,
|
||||
aggressive_mode: bool = False,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
min_sensitivity: Only redact PII at or above this level.
|
||||
Default MEDIUM — redacts emails, phones, paths but not IPs.
|
||||
aggressive_mode: If True, also redact file paths and internal IPs.
|
||||
"""
|
||||
self.min_sensitivity = (
|
||||
Sensitivity.LOW if aggressive_mode else min_sensitivity
|
||||
)
|
||||
self.aggressive_mode = aggressive_mode
|
||||
|
||||
def sanitize_text(self, text: str) -> Tuple[str, List[Dict[str, Any]]]:
|
||||
"""Sanitize a single text string. Returns (cleaned_text, redaction_list)."""
|
||||
redactions = []
|
||||
cleaned = text
|
||||
|
||||
for pattern, rtype, sensitivity, replacement in _PII_PATTERNS:
|
||||
if sensitivity.value < self.min_sensitivity.value:
|
||||
continue
|
||||
|
||||
matches = pattern.findall(cleaned)
|
||||
if matches:
|
||||
count = len(matches) if isinstance(matches[0], str) else sum(
|
||||
1 for m in matches if m
|
||||
)
|
||||
if count > 0:
|
||||
cleaned = pattern.sub(replacement, cleaned)
|
||||
redactions.append({
|
||||
"type": rtype,
|
||||
"sensitivity": sensitivity.name,
|
||||
"count": count,
|
||||
})
|
||||
|
||||
return cleaned, redactions
|
||||
|
||||
def sanitize_messages(
|
||||
self, messages: List[Dict[str, Any]]
|
||||
) -> Tuple[List[Dict[str, Any]], RedactionReport]:
|
||||
"""Sanitize a list of OpenAI-format messages.
|
||||
|
||||
Returns (safe_messages, report). System messages are NOT sanitized
|
||||
(they're typically static prompts). Only user and assistant messages
|
||||
with string content are processed.
|
||||
|
||||
Args:
|
||||
messages: List of {"role": ..., "content": ...} dicts.
|
||||
|
||||
Returns:
|
||||
Tuple of (sanitized_messages, redaction_report).
|
||||
"""
|
||||
report = RedactionReport(total_messages=len(messages))
|
||||
safe_messages = []
|
||||
|
||||
for msg in messages:
|
||||
role = msg.get("role", "")
|
||||
content = msg.get("content", "")
|
||||
|
||||
# Only sanitize user/assistant string content
|
||||
if role in ("user", "assistant") and isinstance(content, str) and content:
|
||||
cleaned, redactions = self.sanitize_text(content)
|
||||
if redactions:
|
||||
report.redacted_messages += 1
|
||||
report.redactions.extend(redactions)
|
||||
# Track max sensitivity
|
||||
for r in redactions:
|
||||
s = Sensitivity[r["sensitivity"]]
|
||||
if s.value > report.max_sensitivity.value:
|
||||
report.max_sensitivity = s
|
||||
safe_msg = {**msg, "content": cleaned}
|
||||
safe_messages.append(safe_msg)
|
||||
logger.info(
|
||||
"Privacy filter: redacted %d PII type(s) from %s message",
|
||||
len(redactions), role,
|
||||
)
|
||||
else:
|
||||
safe_messages.append(msg)
|
||||
else:
|
||||
safe_messages.append(msg)
|
||||
|
||||
return safe_messages, report
|
||||
|
||||
def should_use_local_only(self, text: str) -> Tuple[bool, str]:
|
||||
"""Determine if content is too sensitive for any remote call.
|
||||
|
||||
Returns (should_block, reason). If True, the content should only
|
||||
be processed by a local model.
|
||||
"""
|
||||
_, redactions = self.sanitize_text(text)
|
||||
|
||||
critical_count = sum(
|
||||
1 for r in redactions
|
||||
if Sensitivity[r["sensitivity"]] == Sensitivity.CRITICAL
|
||||
)
|
||||
high_count = sum(
|
||||
1 for r in redactions
|
||||
if Sensitivity[r["sensitivity"]] == Sensitivity.HIGH
|
||||
)
|
||||
|
||||
if critical_count > 0:
|
||||
return True, f"Contains {critical_count} critical-secret pattern(s) — local-only"
|
||||
if high_count >= 3:
|
||||
return True, f"Contains {high_count} high-sensitivity pattern(s) — local-only"
|
||||
return False, ""
|
||||
|
||||
|
||||
def sanitize_messages(
|
||||
messages: List[Dict[str, Any]],
|
||||
min_sensitivity: Sensitivity = Sensitivity.MEDIUM,
|
||||
aggressive: bool = False,
|
||||
) -> Tuple[List[Dict[str, Any]], RedactionReport]:
|
||||
"""Convenience function: sanitize messages with default settings."""
|
||||
pf = PrivacyFilter(min_sensitivity=min_sensitivity, aggressive_mode=aggressive)
|
||||
return pf.sanitize_messages(messages)
|
||||
|
||||
|
||||
def quick_sanitize(text: str) -> str:
|
||||
"""Quick sanitize a single string — returns cleaned text only."""
|
||||
pf = PrivacyFilter()
|
||||
cleaned, _ = pf.sanitize_text(text)
|
||||
return cleaned
|
||||
@@ -1,177 +0,0 @@
|
||||
"""Tool Orchestrator — Robust execution and circuit breaking for agent tools.
|
||||
|
||||
Provides a unified execution service that wraps the tool registry.
|
||||
Implements the Circuit Breaker pattern to prevent the agent from getting
|
||||
stuck in failure loops when a specific tool or its underlying service
|
||||
is flapping or down.
|
||||
|
||||
Architecture:
|
||||
Discovery (tools/registry.py) -> Orchestration (agent/tool_orchestrator.py) -> Dispatch
|
||||
"""
|
||||
|
||||
import json
|
||||
import time
|
||||
import logging
|
||||
import threading
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from tools.registry import registry
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CircuitState:
|
||||
"""States for the tool circuit breaker."""
|
||||
CLOSED = "closed" # Normal operation
|
||||
OPEN = "open" # Failing, execution blocked
|
||||
HALF_OPEN = "half_open" # Testing if service recovered
|
||||
|
||||
|
||||
@dataclass
|
||||
class ToolStats:
|
||||
"""Execution statistics for a tool."""
|
||||
name: str
|
||||
state: str = CircuitState.CLOSED
|
||||
failures: int = 0
|
||||
successes: int = 0
|
||||
last_failure_time: float = 0
|
||||
total_execution_time: float = 0
|
||||
call_count: int = 0
|
||||
|
||||
|
||||
class ToolOrchestrator:
|
||||
"""Orchestrates tool execution with robustness patterns."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
failure_threshold: int = 3,
|
||||
reset_timeout: int = 300,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
failure_threshold: Number of failures before opening the circuit.
|
||||
reset_timeout: Seconds to wait before transitioning from OPEN to HALF_OPEN.
|
||||
"""
|
||||
self.failure_threshold = failure_threshold
|
||||
self.reset_timeout = reset_timeout
|
||||
self._stats: Dict[str, ToolStats] = {}
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def _get_stats(self, name: str) -> ToolStats:
|
||||
"""Get or initialize stats for a tool with thread-safe state transition."""
|
||||
with self._lock:
|
||||
if name not in self._stats:
|
||||
self._stats[name] = ToolStats(name=name)
|
||||
|
||||
stats = self._stats[name]
|
||||
|
||||
# Transition from OPEN to HALF_OPEN if timeout expired
|
||||
if stats.state == CircuitState.OPEN:
|
||||
if time.time() - stats.last_failure_time > self.reset_timeout:
|
||||
stats.state = CircuitState.HALF_OPEN
|
||||
logger.info("Circuit breaker HALF_OPEN for tool: %s", name)
|
||||
|
||||
return stats
|
||||
|
||||
def _record_success(self, name: str, execution_time: float):
|
||||
"""Record a successful tool execution and close the circuit."""
|
||||
with self._lock:
|
||||
stats = self._stats[name]
|
||||
stats.successes += 1
|
||||
stats.call_count += 1
|
||||
stats.total_execution_time += execution_time
|
||||
|
||||
if stats.state != CircuitState.CLOSED:
|
||||
logger.info("Circuit breaker CLOSED for tool: %s (recovered)", name)
|
||||
|
||||
stats.state = CircuitState.CLOSED
|
||||
stats.failures = 0
|
||||
|
||||
def _record_failure(self, name: str, execution_time: float):
|
||||
"""Record a failed tool execution and potentially open the circuit."""
|
||||
with self._lock:
|
||||
stats = self._stats[name]
|
||||
stats.failures += 1
|
||||
stats.call_count += 1
|
||||
stats.total_execution_time += execution_time
|
||||
stats.last_failure_time = time.time()
|
||||
|
||||
if stats.state == CircuitState.HALF_OPEN or stats.failures >= self.failure_threshold:
|
||||
stats.state = CircuitState.OPEN
|
||||
logger.warning(
|
||||
"Circuit breaker OPEN for tool: %s (failures: %d)",
|
||||
name, stats.failures
|
||||
)
|
||||
|
||||
def dispatch(self, name: str, args: dict, **kwargs) -> str:
|
||||
"""Execute a tool via the registry with circuit breaker protection."""
|
||||
stats = self._get_stats(name)
|
||||
|
||||
if stats.state == CircuitState.OPEN:
|
||||
return json.dumps({
|
||||
"error": (
|
||||
f"Tool '{name}' is temporarily unavailable due to repeated failures. "
|
||||
f"Circuit breaker is OPEN. Please try again in a few minutes or use an alternative tool."
|
||||
),
|
||||
"circuit_breaker": True,
|
||||
"tool_name": name
|
||||
})
|
||||
|
||||
start_time = time.time()
|
||||
try:
|
||||
# Dispatch to the underlying registry
|
||||
result_str = registry.dispatch(name, args, **kwargs)
|
||||
execution_time = time.time() - start_time
|
||||
|
||||
# Inspect result for errors. registry.dispatch catches internal
|
||||
# exceptions and returns a JSON error string.
|
||||
is_error = False
|
||||
try:
|
||||
# Lightweight check for error key in JSON
|
||||
if '"error":' in result_str:
|
||||
res_json = json.loads(result_str)
|
||||
if isinstance(res_json, dict) and "error" in res_json:
|
||||
is_error = True
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
# If it's not valid JSON, it's a malformed result (error)
|
||||
is_error = True
|
||||
|
||||
if is_error:
|
||||
self._record_failure(name, execution_time)
|
||||
else:
|
||||
self._record_success(name, execution_time)
|
||||
|
||||
return result_str
|
||||
|
||||
except Exception as e:
|
||||
# This should rarely be hit as registry.dispatch catches most things,
|
||||
# but we guard against orchestrator-level or registry-level bugs.
|
||||
execution_time = time.time() - start_time
|
||||
self._record_failure(name, execution_time)
|
||||
|
||||
error_msg = f"Tool orchestrator error during {name}: {type(e).__name__}: {e}"
|
||||
logger.exception(error_msg)
|
||||
return json.dumps({
|
||||
"error": error_msg,
|
||||
"tool_name": name,
|
||||
"execution_time": execution_time
|
||||
})
|
||||
|
||||
def get_fleet_stats(self) -> Dict[str, Any]:
|
||||
"""Return execution statistics for all tools."""
|
||||
with self._lock:
|
||||
return {
|
||||
name: {
|
||||
"state": s.state,
|
||||
"failures": s.failures,
|
||||
"successes": s.successes,
|
||||
"avg_time": s.total_execution_time / s.call_count if s.call_count > 0 else 0,
|
||||
"calls": s.call_count
|
||||
}
|
||||
for name, s in self._stats.items()
|
||||
}
|
||||
|
||||
|
||||
# Global orchestrator instance
|
||||
orchestrator = ToolOrchestrator()
|
||||
40
benchmarks/gemma4-tool-calling-2026-04-13.md
Normal file
40
benchmarks/gemma4-tool-calling-2026-04-13.md
Normal file
@@ -0,0 +1,40 @@
|
||||
# Tool Call Benchmark: Gemma 4 vs mimo-v2-pro
|
||||
|
||||
Date: 2026-04-13
|
||||
Status: Awaiting execution
|
||||
|
||||
## Test Design
|
||||
|
||||
100 diverse tool calls across 7 categories:
|
||||
|
||||
| Category | Count | Tools Tested |
|
||||
|----------|-------|--------------|
|
||||
| File operations | 20 | read_file, write_file, search_files |
|
||||
| Terminal commands | 20 | terminal |
|
||||
| Web search | 15 | web_search |
|
||||
| Code execution | 15 | execute_code |
|
||||
| Browser automation | 10 | browser_navigate |
|
||||
| Delegation | 10 | delegate_task |
|
||||
| MCP tools | 10 | mcp_* |
|
||||
|
||||
## Metrics
|
||||
|
||||
| Metric | mimo-v2-pro | Gemma 4 |
|
||||
|--------|-------------|---------|
|
||||
| Schema parse success | — | — |
|
||||
| Tool execution success | — | — |
|
||||
| Parallel tool success | — | — |
|
||||
| Avg latency (s) | — | — |
|
||||
| Token cost per call | — | — |
|
||||
|
||||
## How to Run
|
||||
|
||||
```bash
|
||||
python3 benchmarks/tool_call_benchmark.py --model nous:xiaomi/mimo-v2-pro
|
||||
python3 benchmarks/tool_call_benchmark.py --model ollama/gemma4:latest
|
||||
python3 benchmarks/tool_call_benchmark.py --compare
|
||||
```
|
||||
|
||||
## Gemma 4-Specific Failure Modes
|
||||
|
||||
To be documented after benchmark execution.
|
||||
614
benchmarks/tool_call_benchmark.py
Normal file
614
benchmarks/tool_call_benchmark.py
Normal file
@@ -0,0 +1,614 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tool-Calling Benchmark — Gemma 4 vs mimo-v2-pro regression test.
|
||||
|
||||
Runs 100 diverse tool-calling prompts through multiple models and compares
|
||||
success rates, latency, and token costs.
|
||||
|
||||
Usage:
|
||||
python3 benchmarks/tool_call_benchmark.py # full 100-call suite
|
||||
python3 benchmarks/tool_call_benchmark.py --limit 10 # quick smoke test
|
||||
python3 benchmarks/tool_call_benchmark.py --models nous # single model
|
||||
python3 benchmarks/tool_call_benchmark.py --category file # single category
|
||||
|
||||
Requires: hermes-agent venv activated, OPENROUTER_API_KEY or equivalent.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import traceback
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
# Ensure hermes-agent root is importable
|
||||
REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||
sys.path.insert(0, str(REPO_ROOT))
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test Definitions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class ToolCall:
|
||||
"""A single tool-calling test case."""
|
||||
id: str
|
||||
category: str
|
||||
prompt: str
|
||||
expected_tool: str # tool name we expect the model to call
|
||||
expected_params_check: str = "" # substring expected in JSON args
|
||||
timeout: int = 30 # max seconds per call
|
||||
notes: str = ""
|
||||
|
||||
|
||||
# fmt: off
|
||||
SUITE: list[ToolCall] = [
|
||||
# ── File Operations (20) ──────────────────────────────────────────────
|
||||
ToolCall("file-01", "file", "Read the file /tmp/test_bench.txt and show me its contents.",
|
||||
"read_file", "path"),
|
||||
ToolCall("file-02", "file", "Write 'hello benchmark' to /tmp/test_bench_out.txt",
|
||||
"write_file", "path"),
|
||||
ToolCall("file-03", "file", "Search for the word 'import' in all Python files in the current directory.",
|
||||
"search_files", "pattern"),
|
||||
ToolCall("file-04", "file", "Read lines 1-20 of /etc/hosts",
|
||||
"read_file", "offset"),
|
||||
ToolCall("file-05", "file", "Patch /tmp/test_bench_out.txt: replace 'hello' with 'goodbye'",
|
||||
"patch", "old_string"),
|
||||
ToolCall("file-06", "file", "Search for files matching *.py in the current directory.",
|
||||
"search_files", "target"),
|
||||
ToolCall("file-07", "file", "Read the first 10 lines of /etc/passwd",
|
||||
"read_file", "limit"),
|
||||
ToolCall("file-08", "file", "Write a JSON config to /tmp/bench_config.json with key 'debug': true",
|
||||
"write_file", "content"),
|
||||
ToolCall("file-09", "file", "Search for 'def test_' in Python test files.",
|
||||
"search_files", "file_glob"),
|
||||
ToolCall("file-10", "file", "Read /tmp/bench_config.json and tell me what's in it.",
|
||||
"read_file", "bench_config"),
|
||||
ToolCall("file-11", "file", "Create a file /tmp/bench_readme.md with one line: '# Benchmark'",
|
||||
"write_file", "bench_readme"),
|
||||
ToolCall("file-12", "file", "Search for 'TODO' comments in all .py files.",
|
||||
"search_files", "TODO"),
|
||||
ToolCall("file-13", "file", "Read /tmp/bench_readme.md",
|
||||
"read_file", "bench_readme"),
|
||||
ToolCall("file-14", "file", "Patch /tmp/bench_readme.md: replace '# Benchmark' with '# Tool Benchmark'",
|
||||
"patch", "Tool Benchmark"),
|
||||
ToolCall("file-15", "file", "Write a Python one-liner to /tmp/bench_hello.py that prints hello.",
|
||||
"write_file", "bench_hello"),
|
||||
ToolCall("file-16", "file", "Search for all .json files in /tmp/.",
|
||||
"search_files", "json"),
|
||||
ToolCall("file-17", "file", "Read /tmp/bench_hello.py and verify it has print('hello').",
|
||||
"read_file", "bench_hello"),
|
||||
ToolCall("file-18", "file", "Patch /tmp/bench_hello.py to print 'hello world' instead of 'hello'.",
|
||||
"patch", "hello world"),
|
||||
ToolCall("file-19", "file", "List files matching 'bench*' in /tmp/.",
|
||||
"search_files", "bench"),
|
||||
ToolCall("file-20", "file", "Read /tmp/test_bench.txt again and summarize its contents.",
|
||||
"read_file", "test_bench"),
|
||||
|
||||
# ── Terminal Commands (20) ────────────────────────────────────────────
|
||||
ToolCall("term-01", "terminal", "Run `echo hello world` in the terminal.",
|
||||
"terminal", "echo"),
|
||||
ToolCall("term-02", "terminal", "Run `date` to get the current date and time.",
|
||||
"terminal", "date"),
|
||||
ToolCall("term-03", "terminal", "Run `uname -a` to get system information.",
|
||||
"terminal", "uname"),
|
||||
ToolCall("term-04", "terminal", "Run `pwd` to show the current directory.",
|
||||
"terminal", "pwd"),
|
||||
ToolCall("term-05", "terminal", "Run `ls -la /tmp/ | head -20` to list temp files.",
|
||||
"terminal", "head"),
|
||||
ToolCall("term-06", "terminal", "Run `whoami` to show the current user.",
|
||||
"terminal", "whoami"),
|
||||
ToolCall("term-07", "terminal", "Run `df -h` to show disk usage.",
|
||||
"terminal", "df"),
|
||||
ToolCall("term-08", "terminal", "Run `python3 --version` to check Python version.",
|
||||
"terminal", "python3"),
|
||||
ToolCall("term-09", "terminal", "Run `cat /etc/hostname` to get the hostname.",
|
||||
"terminal", "hostname"),
|
||||
ToolCall("term-10", "terminal", "Run `uptime` to see system uptime.",
|
||||
"terminal", "uptime"),
|
||||
ToolCall("term-11", "terminal", "Run `env | grep PATH` to show the PATH variable.",
|
||||
"terminal", "PATH"),
|
||||
ToolCall("term-12", "terminal", "Run `wc -l /etc/passwd` to count lines.",
|
||||
"terminal", "wc"),
|
||||
ToolCall("term-13", "terminal", "Run `echo $SHELL` to show the current shell.",
|
||||
"terminal", "SHELL"),
|
||||
ToolCall("term-14", "terminal", "Run `free -h || vm_stat` to check memory usage.",
|
||||
"terminal", "memory"),
|
||||
ToolCall("term-15", "terminal", "Run `id` to show user and group IDs.",
|
||||
"terminal", "id"),
|
||||
ToolCall("term-16", "terminal", "Run `hostname` to get the machine hostname.",
|
||||
"terminal", "hostname"),
|
||||
ToolCall("term-17", "terminal", "Run `echo {1..5}` to test brace expansion.",
|
||||
"terminal", "echo"),
|
||||
ToolCall("term-18", "terminal", "Run `seq 1 5` to generate a number sequence.",
|
||||
"terminal", "seq"),
|
||||
ToolCall("term-19", "terminal", "Run `python3 -c 'print(2+2)'` to compute 2+2.",
|
||||
"terminal", "print"),
|
||||
ToolCall("term-20", "terminal", "Run `ls -d /tmp/bench* 2>/dev/null | wc -l` to count bench files.",
|
||||
"terminal", "wc"),
|
||||
|
||||
# ── Code Execution (15) ──────────────────────────────────────────────
|
||||
ToolCall("code-01", "code", "Execute a Python script that computes factorial of 10.",
|
||||
"execute_code", "factorial"),
|
||||
ToolCall("code-02", "code", "Run Python to read /tmp/test_bench.txt and count its words.",
|
||||
"execute_code", "words"),
|
||||
ToolCall("code-03", "code", "Execute Python to generate the first 20 Fibonacci numbers.",
|
||||
"execute_code", "fibonacci"),
|
||||
ToolCall("code-04", "code", "Run Python to parse JSON from a string and print keys.",
|
||||
"execute_code", "json"),
|
||||
ToolCall("code-05", "code", "Execute Python to list all files in /tmp/ matching 'bench*'.",
|
||||
"execute_code", "glob"),
|
||||
ToolCall("code-06", "code", "Run Python to compute the sum of squares from 1 to 100.",
|
||||
"execute_code", "sum"),
|
||||
ToolCall("code-07", "code", "Execute Python to check if 'racecar' is a palindrome.",
|
||||
"execute_code", "palindrome"),
|
||||
ToolCall("code-08", "code", "Run Python to create a CSV string with 5 rows of sample data.",
|
||||
"execute_code", "csv"),
|
||||
ToolCall("code-09", "code", "Execute Python to sort a list [5,2,8,1,9] and print the result.",
|
||||
"execute_code", "sort"),
|
||||
ToolCall("code-10", "code", "Run Python to count lines in /etc/passwd.",
|
||||
"execute_code", "passwd"),
|
||||
ToolCall("code-11", "code", "Execute Python to hash the string 'benchmark' with SHA256.",
|
||||
"execute_code", "sha256"),
|
||||
ToolCall("code-12", "code", "Run Python to get the current UTC timestamp.",
|
||||
"execute_code", "utcnow"),
|
||||
ToolCall("code-13", "code", "Execute Python to convert 'hello world' to uppercase and reverse it.",
|
||||
"execute_code", "upper"),
|
||||
ToolCall("code-14", "code", "Run Python to create a dictionary of system info (platform, python version).",
|
||||
"execute_code", "sys"),
|
||||
ToolCall("code-15", "code", "Execute Python to check internet connectivity by resolving google.com.",
|
||||
"execute_code", "socket"),
|
||||
|
||||
# ── Delegation (10) ──────────────────────────────────────────────────
|
||||
ToolCall("deleg-01", "delegate", "Use a subagent to find all .log files in /tmp/.",
|
||||
"delegate_task", "log"),
|
||||
ToolCall("deleg-02", "delegate", "Delegate to a subagent: what is 15 * 37?",
|
||||
"delegate_task", "15"),
|
||||
ToolCall("deleg-03", "delegate", "Use a subagent to check if Python 3 is installed and its version.",
|
||||
"delegate_task", "python"),
|
||||
ToolCall("deleg-04", "delegate", "Delegate: read /tmp/test_bench.txt and summarize it in one sentence.",
|
||||
"delegate_task", "summarize"),
|
||||
ToolCall("deleg-05", "delegate", "Use a subagent to list the contents of /tmp/ directory.",
|
||||
"delegate_task", "tmp"),
|
||||
ToolCall("deleg-06", "delegate", "Delegate: count the number of .py files in the current directory.",
|
||||
"delegate_task", ".py"),
|
||||
ToolCall("deleg-07", "delegate", "Use a subagent to check disk space with df -h.",
|
||||
"delegate_task", "df"),
|
||||
ToolCall("deleg-08", "delegate", "Delegate: what OS are we running on?",
|
||||
"delegate_task", "os"),
|
||||
ToolCall("deleg-09", "delegate", "Use a subagent to find the hostname of this machine.",
|
||||
"delegate_task", "hostname"),
|
||||
ToolCall("deleg-10", "delegate", "Delegate: create a temp file /tmp/bench_deleg.txt with 'done'.",
|
||||
"delegate_task", "write"),
|
||||
|
||||
# ── Todo / Memory (10 — replacing web/browser/MCP which need external services) ──
|
||||
ToolCall("todo-01", "todo", "Add a todo item: 'Run benchmark suite'",
|
||||
"todo", "benchmark"),
|
||||
ToolCall("todo-02", "todo", "Show me the current todo list.",
|
||||
"todo", ""),
|
||||
ToolCall("todo-03", "todo", "Mark the first todo item as completed.",
|
||||
"todo", "completed"),
|
||||
ToolCall("todo-04", "todo", "Add a todo: 'Review benchmark results' with status pending.",
|
||||
"todo", "Review"),
|
||||
ToolCall("todo-05", "todo", "Clear all completed todos.",
|
||||
"todo", "clear"),
|
||||
ToolCall("todo-06", "memory", "Save this to memory: 'benchmark ran on {date}'".format(
|
||||
date=datetime.now().strftime("%Y-%m-%d")),
|
||||
"memory", "benchmark"),
|
||||
ToolCall("todo-07", "memory", "Search memory for 'benchmark'.",
|
||||
"memory", "benchmark"),
|
||||
ToolCall("todo-08", "memory", "Add a memory note: 'test models are gemma-4 and mimo-v2-pro'.",
|
||||
"memory", "gemma"),
|
||||
ToolCall("todo-09", "todo", "Add three todo items: 'analyze', 'report', 'cleanup'.",
|
||||
"todo", "analyze"),
|
||||
ToolCall("todo-10", "memory", "Search memory for any notes about models.",
|
||||
"memory", "model"),
|
||||
|
||||
# ── Skills (10 — replacing MCP tools which need servers) ─────────────
|
||||
ToolCall("skill-01", "skills", "List all available skills.",
|
||||
"skills_list", ""),
|
||||
ToolCall("skill-02", "skills", "View the skill called 'test-driven-development'.",
|
||||
"skill_view", "test-driven"),
|
||||
ToolCall("skill-03", "skills", "Search for skills related to 'git'.",
|
||||
"skills_list", "git"),
|
||||
ToolCall("skill-04", "skills", "View the 'code-review' skill.",
|
||||
"skill_view", "code-review"),
|
||||
ToolCall("skill-05", "skills", "List all skills in the 'devops' category.",
|
||||
"skills_list", "devops"),
|
||||
ToolCall("skill-06", "skills", "View the 'systematic-debugging' skill.",
|
||||
"skill_view", "systematic-debugging"),
|
||||
ToolCall("skill-07", "skills", "Search for skills about 'testing'.",
|
||||
"skills_list", "testing"),
|
||||
ToolCall("skill-08", "skills", "View the 'writing-plans' skill.",
|
||||
"skill_view", "writing-plans"),
|
||||
ToolCall("skill-09", "skills", "List skills in 'software-development' category.",
|
||||
"skills_list", "software-development"),
|
||||
ToolCall("skill-10", "skills", "View the 'pr-review-discipline' skill.",
|
||||
"skill_view", "pr-review"),
|
||||
|
||||
# ── Additional tests to reach 100 ────────────────────────────────────
|
||||
ToolCall("file-21", "file", "Write a Python snippet to /tmp/bench_sort.py that sorts [3,1,2].",
|
||||
"write_file", "bench_sort"),
|
||||
ToolCall("file-22", "file", "Read /tmp/bench_sort.py back and confirm it exists.",
|
||||
"read_file", "bench_sort"),
|
||||
ToolCall("file-23", "file", "Search for 'class' in all .py files in the benchmarks directory.",
|
||||
"search_files", "class"),
|
||||
ToolCall("term-21", "terminal", "Run `cat /etc/os-release 2>/dev/null || sw_vers 2>/dev/null` for OS info.",
|
||||
"terminal", "os"),
|
||||
ToolCall("term-22", "terminal", "Run `nproc 2>/dev/null || sysctl -n hw.ncpu 2>/dev/null` for CPU count.",
|
||||
"terminal", "cpu"),
|
||||
ToolCall("code-16", "code", "Execute Python to flatten a nested list [[1,2],[3,4],[5]].",
|
||||
"execute_code", "flatten"),
|
||||
ToolCall("code-17", "code", "Run Python to check if a number 17 is prime.",
|
||||
"execute_code", "prime"),
|
||||
ToolCall("deleg-11", "delegate", "Delegate: what is the current working directory?",
|
||||
"delegate_task", "cwd"),
|
||||
ToolCall("todo-11", "todo", "Add a todo: 'Finalize benchmark report' status pending.",
|
||||
"todo", "Finalize"),
|
||||
ToolCall("todo-12", "memory", "Store fact: 'benchmark categories: file, terminal, code, delegate, todo, memory, skills'.",
|
||||
"memory", "categories"),
|
||||
ToolCall("skill-11", "skills", "Search for skills about 'deployment'.",
|
||||
"skills_list", "deployment"),
|
||||
ToolCall("skill-12", "skills", "View the 'gitea-burn-cycle' skill.",
|
||||
"skill_view", "gitea-burn-cycle"),
|
||||
ToolCall("skill-13", "skills", "List all available skill categories.",
|
||||
"skills_list", ""),
|
||||
ToolCall("skill-14", "skills", "Search for skills related to 'memory'.",
|
||||
"skills_list", "memory"),
|
||||
ToolCall("skill-15", "skills", "View the 'mimo-swarm' skill.",
|
||||
"skill_view", "mimo-swarm"),
|
||||
]
|
||||
# fmt: on
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Runner
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class CallResult:
|
||||
test_id: str
|
||||
category: str
|
||||
model: str
|
||||
prompt: str
|
||||
expected_tool: str
|
||||
success: bool
|
||||
tool_called: Optional[str] = None
|
||||
tool_args_valid: bool = False
|
||||
execution_ok: bool = False
|
||||
latency_s: float = 0.0
|
||||
error: str = ""
|
||||
raw_response: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class ModelStats:
|
||||
model: str
|
||||
total: int = 0
|
||||
schema_ok: int = 0 # model produced valid tool call JSON
|
||||
exec_ok: int = 0 # tool actually ran without error
|
||||
latency_sum: float = 0.0
|
||||
failures: list = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def schema_pct(self) -> float:
|
||||
return (self.schema_ok / self.total * 100) if self.total else 0
|
||||
|
||||
@property
|
||||
def exec_pct(self) -> float:
|
||||
return (self.exec_ok / self.total * 100) if self.total else 0
|
||||
|
||||
@property
|
||||
def avg_latency(self) -> float:
|
||||
return (self.latency_sum / self.total) if self.total else 0
|
||||
|
||||
|
||||
def setup_test_files():
|
||||
"""Create prerequisite files for the benchmark."""
|
||||
Path("/tmp/test_bench.txt").write_text(
|
||||
"This is a benchmark test file.\n"
|
||||
"It contains sample data for tool-calling tests.\n"
|
||||
"Line three has some import statements.\n"
|
||||
"import os\nimport sys\nimport json\n"
|
||||
"End of test data.\n"
|
||||
)
|
||||
|
||||
|
||||
def run_single_test(tc: ToolCall, model_spec: str, provider: str) -> CallResult:
|
||||
"""Run a single tool-calling test through the agent."""
|
||||
from run_agent import AIAgent
|
||||
|
||||
result = CallResult(
|
||||
test_id=tc.id,
|
||||
category=tc.category,
|
||||
model=model_spec,
|
||||
prompt=tc.prompt,
|
||||
expected_tool=tc.expected_tool,
|
||||
success=False,
|
||||
)
|
||||
|
||||
try:
|
||||
agent = AIAgent(
|
||||
model=model_spec,
|
||||
provider=provider,
|
||||
max_iterations=3,
|
||||
quiet_mode=True,
|
||||
skip_context_files=True,
|
||||
skip_memory=True,
|
||||
persist_session=False,
|
||||
)
|
||||
|
||||
t0 = time.time()
|
||||
conv = agent.run_conversation(
|
||||
user_message=tc.prompt,
|
||||
system_message=(
|
||||
"You are a benchmark test runner. Execute the user's request by calling "
|
||||
"the appropriate tool. Return the tool result directly. Do not add commentary."
|
||||
),
|
||||
)
|
||||
result.latency_s = round(time.time() - t0, 2)
|
||||
|
||||
messages = conv.get("messages", [])
|
||||
|
||||
# Find the first assistant message with tool_calls
|
||||
tool_called = None
|
||||
tool_args_str = ""
|
||||
for msg in messages:
|
||||
if msg.get("role") == "assistant" and msg.get("tool_calls"):
|
||||
for tc_item in msg["tool_calls"]:
|
||||
fn = tc_item.get("function", {})
|
||||
tool_called = fn.get("name", "")
|
||||
tool_args_str = fn.get("arguments", "{}")
|
||||
break
|
||||
break
|
||||
|
||||
if tool_called:
|
||||
result.tool_called = tool_called
|
||||
result.schema_ok = True
|
||||
|
||||
# Check if the right tool was called
|
||||
if tool_called == tc.expected_tool:
|
||||
result.success = True
|
||||
|
||||
# Check if args contain expected substring
|
||||
if tc.expected_params_check:
|
||||
result.tool_args_valid = tc.expected_params_check in tool_args_str
|
||||
else:
|
||||
result.tool_args_valid = True
|
||||
|
||||
# Check if tool executed (look for tool role message)
|
||||
for msg in messages:
|
||||
if msg.get("role") == "tool":
|
||||
content = msg.get("content", "")
|
||||
if content and "error" not in content.lower()[:50]:
|
||||
result.execution_ok = True
|
||||
break
|
||||
elif content:
|
||||
result.execution_ok = True # got a response, even if error
|
||||
break
|
||||
else:
|
||||
# No tool call produced — still check if model responded
|
||||
final = conv.get("final_response", "")
|
||||
result.raw_response = final[:200] if final else ""
|
||||
|
||||
except Exception as e:
|
||||
result.error = f"{type(e).__name__}: {str(e)[:200]}"
|
||||
result.latency_s = round(time.time() - t0, 2) if 't0' in dir() else 0
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def generate_report(results: list[CallResult], models: list[str], output_path: Path):
|
||||
"""Generate markdown benchmark report."""
|
||||
now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
|
||||
|
||||
# Aggregate per model
|
||||
stats: dict[str, ModelStats] = {}
|
||||
for m in models:
|
||||
stats[m] = ModelStats(model=m)
|
||||
|
||||
by_category: dict[str, dict[str, list[CallResult]]] = {}
|
||||
|
||||
for r in results:
|
||||
s = stats[r.model]
|
||||
s.total += 1
|
||||
s.schema_ok += int(r.schema_ok)
|
||||
s.exec_ok += int(r.execution_ok)
|
||||
s.latency_sum += r.latency_s
|
||||
if not r.success:
|
||||
s.failures.append(r)
|
||||
|
||||
by_category.setdefault(r.category, {}).setdefault(r.model, []).append(r)
|
||||
|
||||
lines = [
|
||||
f"# Tool-Calling Benchmark Report",
|
||||
f"",
|
||||
f"Generated: {now}",
|
||||
f"Suite: {len(SUITE)} calls across {len(set(tc.category for tc in SUITE))} categories",
|
||||
f"Models tested: {', '.join(models)}",
|
||||
f"",
|
||||
f"## Summary",
|
||||
f"",
|
||||
f"| Metric | {' | '.join(models)} |",
|
||||
f"|--------|{'|'.join('---------' for _ in models)}|",
|
||||
]
|
||||
|
||||
# Schema parse success
|
||||
row = "| Schema parse success | "
|
||||
for m in models:
|
||||
s = stats[m]
|
||||
row += f"{s.schema_ok}/{s.total} ({s.schema_pct:.0f}%) | "
|
||||
lines.append(row)
|
||||
|
||||
# Tool execution success
|
||||
row = "| Tool execution success | "
|
||||
for m in models:
|
||||
s = stats[m]
|
||||
row += f"{s.exec_ok}/{s.total} ({s.exec_pct:.0f}%) | "
|
||||
lines.append(row)
|
||||
|
||||
# Correct tool selected
|
||||
row = "| Correct tool selected | "
|
||||
for m in models:
|
||||
s = stats[m]
|
||||
correct = sum(1 for r in results if r.model == m and r.success)
|
||||
pct = (correct / s.total * 100) if s.total else 0
|
||||
row += f"{correct}/{s.total} ({pct:.0f}%) | "
|
||||
lines.append(row)
|
||||
|
||||
# Avg latency
|
||||
row = "| Avg latency (s) | "
|
||||
for m in models:
|
||||
s = stats[m]
|
||||
row += f"{s.avg_latency:.2f} | "
|
||||
lines.append(row)
|
||||
|
||||
lines.append("")
|
||||
|
||||
# Per-category breakdown
|
||||
lines.append("## Per-Category Breakdown")
|
||||
lines.append("")
|
||||
|
||||
for cat in sorted(by_category.keys()):
|
||||
lines.append(f"### {cat.title()}")
|
||||
lines.append("")
|
||||
lines.append(f"| Metric | {' | '.join(models)} |")
|
||||
lines.append(f"|--------|{'|'.join('---------' for _ in models)}|")
|
||||
|
||||
cat_data = by_category[cat]
|
||||
for metric_name, fn in [
|
||||
("Schema OK", lambda r: r.schema_ok),
|
||||
("Exec OK", lambda r: r.execution_ok),
|
||||
("Correct tool", lambda r: r.success),
|
||||
]:
|
||||
row = f"| {metric_name} | "
|
||||
for m in models:
|
||||
results_m = cat_data.get(m, [])
|
||||
total = len(results_m)
|
||||
ok = sum(1 for r in results_m if fn(r))
|
||||
pct = (ok / total * 100) if total else 0
|
||||
row += f"{ok}/{total} ({pct:.0f}%) | "
|
||||
lines.append(row)
|
||||
|
||||
lines.append("")
|
||||
|
||||
# Failure analysis
|
||||
lines.append("## Failure Analysis")
|
||||
lines.append("")
|
||||
|
||||
any_failures = False
|
||||
for m in models:
|
||||
s = stats[m]
|
||||
if s.failures:
|
||||
any_failures = True
|
||||
lines.append(f"### {m} — {len(s.failures)} failures")
|
||||
lines.append("")
|
||||
lines.append("| Test | Category | Expected | Got | Error |")
|
||||
lines.append("|------|----------|----------|-----|-------|")
|
||||
for r in s.failures:
|
||||
got = r.tool_called or "none"
|
||||
err = r.error or "wrong tool"
|
||||
lines.append(f"| {r.test_id} | {r.category} | {r.expected_tool} | {got} | {err[:60]} |")
|
||||
lines.append("")
|
||||
|
||||
if not any_failures:
|
||||
lines.append("No failures detected.")
|
||||
lines.append("")
|
||||
|
||||
# Raw results JSON
|
||||
lines.append("## Raw Results")
|
||||
lines.append("")
|
||||
lines.append("```json")
|
||||
lines.append(json.dumps([asdict(r) for r in results], indent=2, default=str))
|
||||
lines.append("```")
|
||||
|
||||
report = "\n".join(lines)
|
||||
output_path.write_text(report)
|
||||
return report
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Tool-calling benchmark")
|
||||
parser.add_argument("--models", nargs="+",
|
||||
default=["nous:gia-3/gemma-4-31b", "nous:mimo-v2-pro"],
|
||||
help="Model specs to test (provider:model)")
|
||||
parser.add_argument("--limit", type=int, default=0,
|
||||
help="Run only first N tests (0 = all)")
|
||||
parser.add_argument("--category", type=str, default="",
|
||||
help="Run only tests in this category")
|
||||
parser.add_argument("--output", type=str, default="",
|
||||
help="Output report path (default: benchmarks/gemma4-tool-calling-YYYY-MM-DD.md)")
|
||||
parser.add_argument("--dry-run", action="store_true",
|
||||
help="Print test cases without running them")
|
||||
args = parser.parse_args()
|
||||
|
||||
# Filter suite
|
||||
suite = SUITE[:]
|
||||
if args.category:
|
||||
suite = [tc for tc in suite if tc.category == args.category]
|
||||
if args.limit > 0:
|
||||
suite = suite[:args.limit]
|
||||
|
||||
if args.dry_run:
|
||||
print(f"Would run {len(suite)} tests:")
|
||||
for tc in suite:
|
||||
print(f" [{tc.category:8s}] {tc.id}: {tc.expected_tool} — {tc.prompt[:60]}")
|
||||
return
|
||||
|
||||
# Setup
|
||||
setup_test_files()
|
||||
date_str = datetime.now().strftime("%Y-%m-%d")
|
||||
output_path = Path(args.output) if args.output else REPO_ROOT / "benchmarks" / f"gemma4-tool-calling-{date_str}.md"
|
||||
|
||||
# Parse model specs
|
||||
model_specs = []
|
||||
for spec in args.models:
|
||||
parts = spec.split(":", 1)
|
||||
provider = parts[0]
|
||||
model_name = parts[1] if len(parts) > 1 else parts[0]
|
||||
model_specs.append((provider, model_name, spec))
|
||||
|
||||
print(f"Benchmark: {len(suite)} tests × {len(model_specs)} models = {len(suite) * len(model_specs)} calls")
|
||||
print(f"Output: {output_path}")
|
||||
print()
|
||||
|
||||
all_results: list[CallResult] = []
|
||||
|
||||
for provider, model_name, full_spec in model_specs:
|
||||
print(f"── {full_spec} {'─' * (50 - len(full_spec))}")
|
||||
model_results = []
|
||||
|
||||
for i, tc in enumerate(suite, 1):
|
||||
sys.stdout.write(f"\r [{i:3d}/{len(suite)}] {tc.id:10s} {tc.category:8s} → {tc.expected_tool:20s}")
|
||||
sys.stdout.flush()
|
||||
|
||||
r = run_single_test(tc, full_spec, provider)
|
||||
model_results.append(r)
|
||||
|
||||
status = "✓" if r.success else "✗"
|
||||
sys.stdout.write(f" {status} ({r.latency_s:.1f}s)")
|
||||
sys.stdout.write("\n")
|
||||
|
||||
all_results.extend(model_results)
|
||||
|
||||
# Quick stats
|
||||
ok = sum(1 for r in model_results if r.success)
|
||||
print(f" Result: {ok}/{len(model_results)} correct tool selected ({ok/len(model_results)*100:.0f}%)")
|
||||
print()
|
||||
|
||||
# Generate report
|
||||
model_names = [spec for _, _, spec in model_specs]
|
||||
report = generate_report(all_results, model_names, output_path)
|
||||
print(f"Report written to {output_path}")
|
||||
|
||||
# Exit code: 0 if all pass, 1 if any failures
|
||||
total_fail = sum(1 for r in all_results if not r.success)
|
||||
sys.exit(1 if total_fail > 0 else 0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,432 +0,0 @@
|
||||
# Workflow Orchestration & Task Queue Research for AI Agents
|
||||
|
||||
**Date:** 2026-04-14
|
||||
**Scope:** SOTA comparison of task queues and workflow orchestrators for autonomous AI agent workflows
|
||||
|
||||
---
|
||||
|
||||
## 1. Current Architecture: Cron + Webhook
|
||||
|
||||
### How it works
|
||||
- **Scheduler:** `cron/scheduler.py` — gateway calls `tick()` every 60 seconds
|
||||
- **Storage:** JSON file (`~/.hermes/cron/jobs.json`) + file-based lock (`cron/.tick.lock`)
|
||||
- **Execution:** Each job spawns a full `AIAgent.run_conversation()` in a thread pool with inactivity timeout
|
||||
- **Delivery:** Results pushed back to origin chat via platform adapters (Telegram, Discord, etc.)
|
||||
- **Checkpointing:** Job outputs saved to `~/.hermes/cron/output/{job_id}/{timestamp}.md`
|
||||
|
||||
### Strengths
|
||||
- Simple, zero-dependency (no broker/redis needed)
|
||||
- Jobs are isolated — each runs a fresh agent session
|
||||
- Direct platform delivery with E2EE support
|
||||
- Script pre-run for data collection
|
||||
- Inactivity-based timeout (not hard wall-clock)
|
||||
|
||||
### Weaknesses
|
||||
- **No task dependencies** — jobs are completely independent
|
||||
- **No retry logic** — single failure = lost run (recurring jobs advance schedule and move on)
|
||||
- **No concurrency control** — all due jobs fire at once; no worker pool sizing
|
||||
- **No observability** — no metrics, no dashboard, no structured logging of job state transitions
|
||||
- **Tick-based polling** — 60s granularity, wastes cycles when idle, adds latency when busy
|
||||
- **Single-process** — file lock means only one tick at a time; no horizontal scaling
|
||||
- **No dead letter queue** — failed deliveries are logged but not retried
|
||||
- **No workflow chaining** — cannot express "run A, then B with A's output"
|
||||
|
||||
---
|
||||
|
||||
## 2. Framework Comparison
|
||||
|
||||
### 2.1 Huey (Already Installed v2.6.0)
|
||||
|
||||
**Architecture:** Embedded task queue, SQLite/Redis/file storage, consumer process model.
|
||||
|
||||
| Feature | Huey | Our Cron |
|
||||
|---|---|---|
|
||||
| Broker | SQLite (default), Redis | JSON file |
|
||||
| Retry | Built-in: `retries=N, retry_delay=S` | None |
|
||||
| Task chaining | `task1.s() | task2.s()` (pipeline) | None |
|
||||
| Scheduling | `@huey.periodic_task(crontab(...))` | Our own cron parser |
|
||||
| Concurrency | Worker pool with `-w N` flag | Single tick lock |
|
||||
| Monitoring | `huey_consumer` logs, Huey Admin (Django) | Manual log reading |
|
||||
| Failure recovery | Automatic retry + configurable backoff | None |
|
||||
| Priority | `PriorityRedisExpireHuey` or task priority | None |
|
||||
| Result storage | `store_results=True` with result() | File output |
|
||||
|
||||
**Task Dependencies Pattern:**
|
||||
```python
|
||||
@huey.task()
|
||||
def analyze_data(input_data):
|
||||
return run_analysis(input_data)
|
||||
|
||||
@huey.task()
|
||||
def generate_report(analysis_result):
|
||||
return create_report(analysis_result)
|
||||
|
||||
# Pipeline: analyze then report
|
||||
pipeline = analyze_data.s(raw_data) | generate_report.s()
|
||||
result = pipeline()
|
||||
```
|
||||
|
||||
**Retry Pattern:**
|
||||
```python
|
||||
@huey.task(retries=3, retry_delay=60, retry_backoff=True)
|
||||
def flaky_api_call(url):
|
||||
return requests.get(url, timeout=30)
|
||||
```
|
||||
|
||||
**Benchmarks:** ~5,000 tasks/sec with SQLite backend, ~15,000 with Redis. Sub-millisecond scheduling latency. Very lightweight — single process.
|
||||
|
||||
**Verdict:** Best fit for our use case. Already installed. SQLite backend = no external deps. Can layer on top of our existing job storage.
|
||||
|
||||
---
|
||||
|
||||
### 2.2 Celery
|
||||
|
||||
**Architecture:** Distributed task queue with message broker (RabbitMQ/Redis).
|
||||
|
||||
| Feature | Celery | Huey |
|
||||
|---|---|---|
|
||||
| Broker | Redis, RabbitMQ, SQS (required) | SQLite (built-in) |
|
||||
| Scale | 100K+ tasks/sec | ~5-15K tasks/sec |
|
||||
| Chains | `chain(task1.s(), task2.s())` | Pipeline operator |
|
||||
| Groups/Chords | Parallel + callback | Not built-in |
|
||||
| Canvas | Full workflow DSL (chain, group, chord, map) | Basic pipeline |
|
||||
| Monitoring | Flower dashboard, Celery events | Minimal |
|
||||
| Complexity | Heavy — needs broker, workers, result backend | Single process |
|
||||
|
||||
**Workflow Pattern:**
|
||||
```python
|
||||
from celery import chain, group, chord
|
||||
|
||||
# Chain: sequential
|
||||
workflow = chain(fetch_data.s(), analyze.s(), report.s())
|
||||
|
||||
# Group: parallel
|
||||
parallel = group(fetch_twitter.s(), fetch_reddit.s(), fetch_hn.s())
|
||||
|
||||
# Chord: parallel then callback
|
||||
chord(parallel, aggregate_results.s())
|
||||
```
|
||||
|
||||
**Verdict:** Overkill for our scale. Adds RabbitMQ/Redis dependency. The Canvas API is powerful but we don't need 100K task/sec throughput. Flower monitoring is nice but we'd need to deploy it separately.
|
||||
|
||||
---
|
||||
|
||||
### 2.3 Temporal
|
||||
|
||||
**Architecture:** Durable execution engine. Workflows as code with automatic state persistence and replay.
|
||||
|
||||
| Feature | Temporal | Our Cron |
|
||||
|---|---|---|
|
||||
| State management | Automatic — workflow state persisted on every step | Manual JSON files |
|
||||
| Failure recovery | Workflows survive process restarts, auto-retry | Lost on crash |
|
||||
| Task dependencies | Native — activities call other activities | None |
|
||||
| Long-running tasks | Built-in (days/months OK) | Inactivity timeout |
|
||||
| Versioning | Workflow versioning for safe updates | No versioning |
|
||||
| Visibility | Full workflow state at any point | Log files |
|
||||
| Infrastructure | Requires Temporal server + database | None |
|
||||
| Language | Python SDK, but Temporal server is Go | Pure Python |
|
||||
|
||||
**Workflow Pattern:**
|
||||
```python
|
||||
@workflow.defn
|
||||
class AIAgentWorkflow:
|
||||
@workflow.run
|
||||
async def run(self, job_config: dict) -> str:
|
||||
# Step 1: Fetch data
|
||||
data = await workflow.execute_activity(
|
||||
fetch_data_activity,
|
||||
job_config["script"],
|
||||
start_to_close_timeout=timedelta(minutes=5),
|
||||
retry_policy=RetryPolicy(maximum_attempts=3),
|
||||
)
|
||||
|
||||
# Step 2: Analyze with AI agent
|
||||
analysis = await workflow.execute_activity(
|
||||
run_agent_activity,
|
||||
{"prompt": job_config["prompt"], "context": data},
|
||||
start_to_close_timeout=timedelta(minutes=30),
|
||||
retry_policy=RetryPolicy(
|
||||
initial_interval=timedelta(seconds=60),
|
||||
maximum_attempts=3,
|
||||
),
|
||||
)
|
||||
|
||||
# Step 3: Deliver
|
||||
await workflow.execute_activity(
|
||||
deliver_activity,
|
||||
{"platform": job_config["deliver"], "content": analysis},
|
||||
start_to_close_timeout=timedelta(seconds=60),
|
||||
)
|
||||
return analysis
|
||||
```
|
||||
|
||||
**Verdict:** Best architecture for complex multi-step AI workflows, but heavy infrastructure cost. Temporal server needs PostgreSQL/Cassandra + visibility store. Ideal if we reach 50+ multi-step workflows with complex failure modes. Overkill for current needs.
|
||||
|
||||
---
|
||||
|
||||
### 2.4 Prefect
|
||||
|
||||
**Architecture:** Modern data/workflow orchestration with Python-native API.
|
||||
|
||||
| Feature | Prefect |
|
||||
|---|---|
|
||||
| Dependencies | SQLite (default) or PostgreSQL |
|
||||
| Task retries | `@task(retries=3, retry_delay_seconds=10)` |
|
||||
| Task dependencies | `result = task_a(wait_for=[task_b])` |
|
||||
| Caching | `cache_key_fn` for result caching |
|
||||
| Subflows | Nested workflow composition |
|
||||
| Deployments | Schedule via `Deployment` or `CronSchedule` |
|
||||
| UI | Excellent web dashboard |
|
||||
| Async | Full async support |
|
||||
|
||||
**Workflow Pattern:**
|
||||
```python
|
||||
from prefect import flow, task
|
||||
from prefect.tasks import task_input_hash
|
||||
|
||||
@task(retries=3, retry_delay_seconds=30)
|
||||
def run_agent(prompt: str) -> str:
|
||||
agent = AIAgent(...)
|
||||
return agent.run_conversation(prompt)
|
||||
|
||||
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
|
||||
def fetch_context(script: str) -> str:
|
||||
return run_script(script)
|
||||
|
||||
@flow(name="agent-workflow")
|
||||
def agent_workflow(job_config: dict):
|
||||
context = fetch_context(job_config.get("script", ""))
|
||||
result = run_agent(
|
||||
f"{context}\n\n{job_config['prompt']}",
|
||||
wait_for=[context]
|
||||
)
|
||||
deliver(result, job_config["deliver"])
|
||||
return result
|
||||
```
|
||||
|
||||
**Benchmarks:** Sub-second task scheduling. Handles 10K+ concurrent task runs. SQLite backend for single-node.
|
||||
|
||||
**Verdict:** Strong alternative. Pythonic, good UI, built-in scheduling. But heavier than Huey — deploys a server process. Best if we want a web dashboard for monitoring. Less infrastructure than Temporal but more than Huey.
|
||||
|
||||
---
|
||||
|
||||
### 2.5 Apache Airflow
|
||||
|
||||
**Architecture:** Batch-oriented DAG scheduler, Python-based.
|
||||
|
||||
| Feature | Airflow |
|
||||
|---|---|
|
||||
| DAG model | Static DAGs defined in Python files |
|
||||
| Scheduler | Polling-based, 5-30s granularity |
|
||||
| Dependencies | PostgreSQL/MySQL + Redis/RabbitMQ + webserver |
|
||||
| UI | Rich web UI with DAG visualization |
|
||||
| Best for | ETL, data pipelines, batch processing |
|
||||
| Weakness | Not designed for dynamic task creation; heavy; DAG definition overhead |
|
||||
|
||||
**Verdict:** Wrong tool for this job. Airflow excels at static, well-defined data pipelines (ETL). Our agent workflows are dynamic — tasks are created at runtime based on user prompts. Airflow's DAG model fights against this. Massive overhead (needs webserver, scheduler, worker, metadata DB).
|
||||
|
||||
---
|
||||
|
||||
### 2.6 Dramatiq
|
||||
|
||||
**Architecture:** Lightweight distributed task queue, Celery alternative.
|
||||
|
||||
| Feature | Dramatiq |
|
||||
|---|---|
|
||||
| Broker | Redis, RabbitMQ |
|
||||
| Retries | `@dramatiq.actor(max_retries=3)` |
|
||||
| Middleware | Pluggable: age_limit, time_limit, retries, callbacks |
|
||||
| Groups | `group(actor.message(...), ...).run()` |
|
||||
| Pipes | `actor.message() | other_actor.message()` |
|
||||
| Simplicity | Cleaner API than Celery |
|
||||
|
||||
**Verdict:** Nice middle ground between Huey and Celery. But still requires a broker (Redis/RabbitMQ). No SQLite backend. Less ecosystem than Celery, less lightweight than Huey.
|
||||
|
||||
---
|
||||
|
||||
### 2.7 RQ (Redis Queue)
|
||||
|
||||
**Architecture:** Minimal Redis-based task queue.
|
||||
|
||||
| Feature | RQ |
|
||||
|---|---|
|
||||
| Broker | Redis only |
|
||||
| Retries | Via `Retry` class |
|
||||
| Workers | Simple worker processes |
|
||||
| Dashboard | `rq-dashboard` (separate) |
|
||||
| Limitation | Redis-only, no SQLite, no scheduling built-in |
|
||||
|
||||
**Verdict:** Too simple and Redis-dependent. No periodic task support without `rq-scheduler`. No task chaining without third-party. Not competitive with Huey for our use case.
|
||||
|
||||
---
|
||||
|
||||
## 3. Architecture Patterns for AI Agent Workflows
|
||||
|
||||
### 3.1 Task Chaining (Fan-out / Fan-in)
|
||||
|
||||
The critical pattern for multi-step AI workflows:
|
||||
|
||||
```
|
||||
[Script] → [Agent] → [Deliver]
|
||||
↓ ↓ ↓
|
||||
Context Report Notification
|
||||
```
|
||||
|
||||
**Implementation with Huey:**
|
||||
```python
|
||||
@huey.task(retries=2)
|
||||
def run_script_task(script_path):
|
||||
return run_script(script_path)
|
||||
|
||||
@huey.task(retries=3, retry_delay=60)
|
||||
def run_agent_task(prompt, context=None):
|
||||
if context:
|
||||
prompt = f"## Context\n{context}\n\n{prompt}"
|
||||
agent = AIAgent(...)
|
||||
return agent.run_conversation(prompt)
|
||||
|
||||
@huey.task()
|
||||
def deliver_task(result, job_config):
|
||||
return deliver_result(job_config, result)
|
||||
|
||||
# Compose: script → agent → deliver
|
||||
def compose_workflow(job):
|
||||
steps = []
|
||||
if job.get("script"):
|
||||
steps.append(run_script_task.s(job["script"]))
|
||||
steps.append(run_agent_task.s(job["prompt"]))
|
||||
steps.append(deliver_task.s(job))
|
||||
return reduce(lambda a, b: a.then(b), steps)
|
||||
```
|
||||
|
||||
### 3.2 Retry with Exponential Backoff
|
||||
|
||||
```python
|
||||
from huey import RetryTask
|
||||
|
||||
class AIWorkflowTask(RetryTask):
|
||||
retries = 3
|
||||
retry_delay = 30 # Start at 30s
|
||||
retry_backoff = True # 30s → 60s → 120s
|
||||
max_retry_delay = 600 # Cap at 10min
|
||||
```
|
||||
|
||||
### 3.3 Dead Letter Queue
|
||||
|
||||
For tasks that exhaust retries:
|
||||
```python
|
||||
@huey.task(retries=3)
|
||||
def flaky_task(data):
|
||||
...
|
||||
|
||||
# Dead letter handling
|
||||
def handle_failure(task, exc, retries):
|
||||
# Log to dead letter store
|
||||
save_dead_letter(task, exc, retries)
|
||||
# Notify user of failure
|
||||
notify_user(f"Task {task.name} failed after {retries} retries: {exc}")
|
||||
```
|
||||
|
||||
### 3.4 Observability Pattern
|
||||
|
||||
```python
|
||||
# Structured event logging for every state transition
|
||||
def emit_event(job_id, event_type, metadata):
|
||||
event = {
|
||||
"job_id": job_id,
|
||||
"event": event_type, # scheduled, started, completed, failed, retried
|
||||
"timestamp": iso_now(),
|
||||
"metadata": metadata,
|
||||
}
|
||||
append_to_event_log(event)
|
||||
# Also emit to metrics (Prometheus/StatsD)
|
||||
metrics.increment(f"cron.{event_type}")
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 4. Benchmarks Summary
|
||||
|
||||
| Framework | Throughput | Latency | Memory | Startup | Dependencies |
|
||||
|---|---|---|---|---|---|
|
||||
| Current Cron | ~1 job/60s tick | 60-120s | Minimal | Instant | None |
|
||||
| Huey (SQLite) | ~5K tasks/sec | <10ms | ~20MB | <1s | None |
|
||||
| Huey (Redis) | ~15K tasks/sec | <5ms | ~20MB | <1s | Redis |
|
||||
| Celery (Redis) | ~15K tasks/sec | <10ms | ~100MB | ~3s | Redis |
|
||||
| Temporal | ~50K activities/sec | <5ms | ~200MB | ~10s | Temporal server+DB |
|
||||
| Prefect | ~10K tasks/sec | <20ms | ~150MB | ~5s | PostgreSQL |
|
||||
|
||||
---
|
||||
|
||||
## 5. Recommendations
|
||||
|
||||
### Immediate (Phase 1): Enhance Current Cron
|
||||
|
||||
Add these capabilities to the existing `cron/` module **without** switching frameworks:
|
||||
|
||||
1. **Retry logic** — Add `retry_count`, `retry_delay`, `max_retries` fields to job JSON. In `scheduler.py tick()`, on failure: if `retries_remaining > 0`, don't advance schedule, set `next_run_at = now + retry_delay * (attempt^2)`.
|
||||
|
||||
2. **Backoff** — Exponential: `delay * 2^attempt`, capped at 10 minutes.
|
||||
|
||||
3. **Dead letter tracking** — After max retries, mark job state as `dead_letter` and emit a delivery notification with the error.
|
||||
|
||||
4. **Concurrency limit** — Add a semaphore (e.g., `max_concurrent=3`) to `tick()` so we don't spawn 20 agents simultaneously.
|
||||
|
||||
5. **Structured events** — Append JSON events to `~/.hermes/cron/events.jsonl` for every state transition (scheduled, started, completed, failed, retried, delivered).
|
||||
|
||||
**Effort:** ~1-2 days. No new dependencies.
|
||||
|
||||
### Medium-term (Phase 2): Adopt Huey for Workflow Chaining
|
||||
|
||||
When we need task dependencies (multi-step agent workflows), migrate to Huey:
|
||||
|
||||
1. **Keep the JSON job store** as the source of truth for user-facing job management.
|
||||
2. **Use Huey as the execution engine** — enqueue tasks from `tick()`, let Huey handle retries, scheduling, and chaining.
|
||||
3. **SQLite backend** — no new infrastructure. One consumer process (`huey_consumer.py`) alongside the gateway.
|
||||
4. **Task chaining for multi-step jobs** — `script_task.then(agent_task).then(delivery_task)`.
|
||||
|
||||
**Migration path:**
|
||||
- Phase 2a: Run Huey consumer alongside gateway. Mirror cron jobs to Huey periodic tasks.
|
||||
- Phase 2b: Add task chaining for jobs with scripts.
|
||||
- Phase 2c: Migrate all jobs to Huey, deprecate tick()-based execution.
|
||||
|
||||
**Effort:** ~1 week. Huey already installed. Gateway integration ~2-3 days.
|
||||
|
||||
### Long-term (Phase 3): Evaluate Temporal/Prefect
|
||||
|
||||
Only if:
|
||||
- We have 100+ concurrent multi-step workflows
|
||||
- We need workflow versioning and A/B testing
|
||||
- We need cross-service orchestration (agent calls to external APIs with complex compensation logic)
|
||||
- We want a web dashboard for non-technical users
|
||||
|
||||
**Don't adopt early** — these tools solve problems we don't have yet.
|
||||
|
||||
---
|
||||
|
||||
## 6. Decision Matrix
|
||||
|
||||
| Need | Best Solution | Why |
|
||||
|---|---|---|
|
||||
| Simple retry logic | Enhance current cron | Zero deps, fast to implement |
|
||||
| Task chaining | **Huey** | Already installed, SQLite backend, pipeline API |
|
||||
| Monitoring dashboard | Prefect or Huey+Flower | If monitoring becomes critical |
|
||||
| Massive scale (10K+/sec) | Celery + Redis | If we're processing thousands of agent runs per hour |
|
||||
| Complex compensation | Temporal | Only if we need durable multi-service workflows |
|
||||
| Periodic scheduling | Current cron (works) or Huey | Current is fine; Huey adds `crontab()` with seconds |
|
||||
|
||||
---
|
||||
|
||||
## 7. Key Insight
|
||||
|
||||
The cron system's biggest gap isn't the framework — it's the **absence of retry and dependency primitives**. These can be added to the current system in <100 lines of code. The second biggest gap is observability (structured events + metrics), which is also solvable incrementally.
|
||||
|
||||
Huey is the right *eventual* target for workflow execution because:
|
||||
1. Already installed, zero new dependencies
|
||||
2. SQLite backend matches our "no infrastructure" philosophy
|
||||
3. Pipeline API gives us task chaining for free
|
||||
4. Retry/backoff is first-class
|
||||
5. Consumer model is more efficient than tick-polling
|
||||
6. ~50x better scheduling latency (ms vs 60s)
|
||||
|
||||
The migration should be gradual — start by wrapping Huey inside our existing cron tick, then progressively move execution to Huey's consumer model.
|
||||
@@ -1,324 +0,0 @@
|
||||
# SOTA Research: Multi-Agent Coordination & Fleet Knowledge Graphs
|
||||
|
||||
**Date:** 2026-04-14
|
||||
**Scope:** Agent-to-agent communication, shared memory, task delegation, consensus protocols
|
||||
**Frameworks Analyzed:** CrewAI, AutoGen, MetaGPT, ChatDev, CAMEL
|
||||
|
||||
---
|
||||
|
||||
## 1. Architecture Pattern Summary
|
||||
|
||||
### 1.1 CrewAI — Role-Based Crew Orchestration
|
||||
|
||||
**Core Pattern:** Agents organized into "Crews" with explicit roles, goals, and backstories. Tasks are assigned to agents, executed via sequential or hierarchical process flows.
|
||||
|
||||
**Agent-to-Agent Communication:**
|
||||
- **Sequential:** Agent A completes Task A → output injected into Task B's context for Agent B
|
||||
- **Hierarchical:** Manager agent delegates to worker agents, collects results, synthesizes
|
||||
- **Context passing:** Tasks can declare `context: [other_tasks]` — outputs from dependent tasks are automatically injected into the current task's prompt
|
||||
- **No direct agent-to-agent messaging** — communication is mediated through task outputs
|
||||
|
||||
**Shared Memory (v2 — Unified Memory):**
|
||||
- `Memory` class with `remember()` / `recall()` using vector embeddings (LanceDB/ChromaDB)
|
||||
- **Scope-based isolation:** `MemoryScope` provides path-based namespacing (`/crew/research/agent-foo`)
|
||||
- **Composite scoring:** semantic similarity (0.5) + recency (0.3) + importance (0.2)
|
||||
- **RecallFlow:** LLM-driven deep recall with adaptive query expansion
|
||||
- **Privacy flags:** Private memories only visible to the source that created them
|
||||
- **Background saves:** ThreadPoolExecutor with write barrier (drain_writes before recall)
|
||||
|
||||
**Task Delegation:**
|
||||
- Agent tools include `Delegate Work to Co-worker` and `Ask Question to Co-worker`
|
||||
- Delegation creates a new task for another agent, results come back to delegator
|
||||
- Depth-limited (no infinite delegation chains)
|
||||
|
||||
**State & Checkpointing:**
|
||||
- `SqliteProvider` / `JsonProvider` for state checkpoint persistence
|
||||
- `CheckpointConfig` with event-driven persistence
|
||||
- Flow state is Pydantic models with serialization
|
||||
|
||||
**Cache:**
|
||||
- Thread-safe in-memory tool result cache with RWLock
|
||||
- Key: `{tool_name}-{input}` → cached output
|
||||
|
||||
### 1.2 AutoGen (Microsoft) — Conversation-Centric Teams
|
||||
|
||||
**Core Pattern:** Agents communicate through shared conversation threads. A "Group Chat Manager" controls turn-taking and speaker selection.
|
||||
|
||||
**Agent-to-Agent Communication:**
|
||||
- **Shared message thread** — all agents see all messages (like a group chat)
|
||||
- **Three team patterns:**
|
||||
- `RoundRobinGroupChat`: Fixed order cycling through participants
|
||||
- `SelectorGroupChat`: LLM-based speaker selection with candidate filtering
|
||||
- `SwarmGroupChat`: Handoff-based routing (agent sends HandoffMessage to next agent)
|
||||
- `GraphFlow` (DiGraph): DAG-based execution with conditional edges, parallel fan-out, loops
|
||||
- `MagenticOneOrchestrator`: Ledger-based orchestration with task planning, progress tracking, stall detection
|
||||
|
||||
**Shared State:**
|
||||
- `ChatCompletionContext` — manages message history per agent (can be unbounded or windowed)
|
||||
- `ModelContext` shared across agents in a team
|
||||
- State serialization: `save_state()` / `load_state()` for all managers
|
||||
- **No built-in vector memory** — context is purely conversational
|
||||
|
||||
**Task Delegation:**
|
||||
- `Swarm`: Agents use `HandoffMessage` to explicitly route control
|
||||
- `GraphFlow`: Conditional edges route based on message content (keyword or callable)
|
||||
- `MagenticOne`: Orchestrator maintains a "task ledger" (facts + plan) and dynamically re-plans on stalls
|
||||
|
||||
**Consensus / Termination:**
|
||||
- `TerminationCondition` — composable conditions (text match, max messages, source-based)
|
||||
- No explicit consensus protocols — termination is manager-decided
|
||||
|
||||
**Key Insight:** AutoGen's `ChatCompletionContext` is the closest analog to shared memory, but it's purely sequential message history, not a knowledge base.
|
||||
|
||||
### 1.3 MetaGPT — SOP-Driven Software Teams
|
||||
|
||||
**Core Pattern:** Agents follow Standard Operating Procedures (SOPs). Each agent has a defined role (Product Manager, Architect, Engineer, QA) and produces structured artifacts.
|
||||
|
||||
**Agent-to-Agent Communication:**
|
||||
- **Publish-Subscribe via Environment:** Agents publish "actions" to a shared Environment, subscribers react
|
||||
- **Structured outputs:** Each role produces specific artifact types (PRD, design doc, code, test cases)
|
||||
- **Message routing:** Environment acts as a message bus, filtering by subscriber interest
|
||||
|
||||
**Shared Memory:**
|
||||
- `Environment` class maintains shared state (project workspace)
|
||||
- File-based shared memory: agents write/read from a shared filesystem
|
||||
- `SharedMemory` for cross-agent context (structured data, not free-form text)
|
||||
|
||||
**Task Delegation:**
|
||||
- Implicit through SOP stages: PM → Architect → Engineer → QA
|
||||
- Each agent's output is the next agent's input
|
||||
- No dynamic re-delegation
|
||||
|
||||
**Consensus:**
|
||||
- Sequential SOP execution (no parallel agents)
|
||||
- QA agent can trigger re-work loops back to Engineer
|
||||
|
||||
### 1.4 ChatDev — Chat-Chain Software Development
|
||||
|
||||
**Core Pattern:** Agents follow a "chat chain" — a sequence of chat phases (designing, coding, testing, documenting). Each phase involves a pair of agents (CEO↔CTO, Programmer↔Reviewer, etc.).
|
||||
|
||||
**Agent-to-Agent Communication:**
|
||||
- **Paired chat sessions:** Two agents communicate in each phase (role-play between instructor and assistant)
|
||||
- **Chain propagation:** Phase N's output (code, design doc) becomes Phase N+1's input
|
||||
- **No broadcast** — communication is strictly pairwise within phases
|
||||
|
||||
**Shared Memory:**
|
||||
- Software-centric: shared code repository is the "memory"
|
||||
- Each phase modifies/inherits the codebase
|
||||
- No explicit vector memory or knowledge graph
|
||||
|
||||
**Task Delegation:**
|
||||
- Hardcoded phase sequence: Design → Code → Test → Document
|
||||
- Each phase delegates to a specific agent pair
|
||||
- No dynamic task re-assignment
|
||||
|
||||
**Consensus:**
|
||||
- Phase-level termination: when both agents agree the phase is complete
|
||||
- "Thought" tokens for chain-of-thought within chat
|
||||
|
||||
### 1.5 CAMEL — Role-Playing & Workforce
|
||||
|
||||
**Core Pattern:** Two primary modes:
|
||||
1. **RolePlaying:** Two-agent conversation with task specification and optional critic
|
||||
2. **Workforce:** Multi-agent with coordinator, task planner, and worker pool
|
||||
|
||||
**Agent-to-Agent Communication:**
|
||||
- **RolePlaying:** Structured turn-taking between assistant and user agents
|
||||
- **Workforce:** Coordinator assigns tasks via `TaskChannel`, workers return results
|
||||
- **Worker types:** `SingleAgentWorker` (single ChatAgent), `RolePlayingWorker` (two-agent pair)
|
||||
|
||||
**Shared Memory / Task Channel:**
|
||||
- `TaskChannel` — async queue-based task dispatch with packet tracking
|
||||
- States: SENT → PROCESSING → RETURNED → ARCHIVED
|
||||
- O(1) lookup by task ID, status-based filtering, assignee/publisher queues
|
||||
- `WorkflowMemoryManager` — persists workflow patterns as markdown files
|
||||
- Role-based organization: workflows stored by `role_identifier`
|
||||
- Agent-based intelligent selection: LLM picks relevant past workflows
|
||||
- Versioned: metadata tracks creation time and version numbers
|
||||
|
||||
**Task Delegation:**
|
||||
- Coordinator agent decomposes complex tasks using LLM analysis
|
||||
- Tasks assigned to workers based on capability matching
|
||||
- Failed tasks trigger: retry, create new worker, or further decomposition
|
||||
- `FailureHandlingConfig` with configurable `RecoveryStrategy`
|
||||
|
||||
**Consensus / Quality:**
|
||||
- Quality evaluation via structured output (response format enforced)
|
||||
- Task dependencies tracked (worker receives dependency tasks as context)
|
||||
- `WorkforceMetrics` for tracking execution statistics
|
||||
|
||||
---
|
||||
|
||||
## 2. Key Architectural Patterns for Fleet Knowledge Graph
|
||||
|
||||
### 2.1 Communication Topology Patterns
|
||||
|
||||
| Pattern | Used By | Description |
|
||||
|---------|---------|-------------|
|
||||
| **Sequential Chain** | CrewAI, ChatDev, MetaGPT | A→B→C linear flow, output feeds next |
|
||||
| **Shared Thread** | AutoGen | All agents see all messages |
|
||||
| **Publish-Subscribe** | MetaGPT | Environment-based message bus |
|
||||
| **Paired Chat** | ChatDev, CAMEL | Two-agent conversation pairs |
|
||||
| **Handoff Routing** | AutoGen Swarm | Agent explicitly names next speaker |
|
||||
| **DAG Graph** | AutoGen GraphFlow | Conditional edges, parallel, loops |
|
||||
| **Ledger Orchestration** | AutoGen MagenticOne | Maintains task ledger, re-plans |
|
||||
| **Task Channel** | CAMEL | Async queue with packet states |
|
||||
|
||||
### 2.2 Shared State Patterns
|
||||
|
||||
| Pattern | Used By | Description |
|
||||
|---------|---------|-------------|
|
||||
| **Vector Memory** | CrewAI | Embeddings + scope-based namespacing |
|
||||
| **Message History** | AutoGen | Sequential conversation context |
|
||||
| **File System** | MetaGPT, ChatDev | Agents read/write shared files |
|
||||
| **Task Channel** | CAMEL | Async packet-based task dispatch |
|
||||
| **Workflow Files** | CAMEL | Markdown-based workflow memory |
|
||||
| **Tool Cache** | CrewAI | In-memory RWLock tool result cache |
|
||||
| **State Checkpoint** | CrewAI, AutoGen | Serialized Pydantic/SQLite checkpoints |
|
||||
|
||||
### 2.3 Task Delegation Patterns
|
||||
|
||||
| Pattern | Used By | Description |
|
||||
|---------|---------|-------------|
|
||||
| **Role Assignment** | CrewAI | Fixed agent per task |
|
||||
| **Manager Delegation** | CrewAI Hierarchical | Manager assigns tasks dynamically |
|
||||
| **Speaker Selection** | AutoGen Selector | LLM picks next agent |
|
||||
| **Handoff** | AutoGen Swarm | Agent explicitly transfers control |
|
||||
| **SOP Routing** | MetaGPT | Stage-based implicit delegation |
|
||||
| **Coordinator** | CAMEL Workforce | LLM-based task decomposition + assignment |
|
||||
| **Dynamic Worker Creation** | CAMEL Workforce | Create new workers on failure |
|
||||
|
||||
### 2.4 Conflict Resolution Patterns
|
||||
|
||||
| Pattern | Used By | Description |
|
||||
|---------|---------|-------------|
|
||||
| **Manager Arbitration** | CrewAI Hierarchical | Manager resolves conflicts |
|
||||
| **Critic-in-the-loop** | CAMEL | Critic agent evaluates and selects |
|
||||
| **Quality Gate** | CAMEL Workforce | Structured quality evaluation |
|
||||
| **Termination Conditions** | AutoGen | Composable stop conditions |
|
||||
| **Stall Detection** | AutoGen MagenticOne | Re-plans when progress stalls |
|
||||
|
||||
---
|
||||
|
||||
## 3. Recommendations for Hermes Fleet Knowledge Graph
|
||||
|
||||
### 3.1 Architecture: Hybrid Graph + Memory
|
||||
|
||||
Based on the SOTA analysis, the optimal fleet knowledge graph should combine:
|
||||
|
||||
1. **CrewAI's scoped memory** for hierarchical knowledge organization
|
||||
- Path-based namespaces: `/fleet/{fleet_id}/agent/{agent_id}/diary`
|
||||
- Composite scoring: semantic + recency + importance
|
||||
- Background writes with read barriers
|
||||
|
||||
2. **CAMEL's TaskChannel** for task dispatch and tracking
|
||||
- Packet states (SENT → PROCESSING → RETURNED → ARCHIVED)
|
||||
- O(1) lookup by task ID
|
||||
- Assignee/publisher tracking
|
||||
|
||||
3. **AutoGen's DiGraph** for execution flow definition
|
||||
- DAG with conditional edges for complex workflows
|
||||
- Parallel fan-out for independent tasks
|
||||
- Activation conditions (all vs any) for synchronization points
|
||||
|
||||
4. **AutoGen MagenticOne's ledger** for shared task context
|
||||
- Maintained facts, plan, and progress ledger
|
||||
- Dynamic re-planning on stalls
|
||||
|
||||
### 3.2 Fleet Knowledge Graph Schema
|
||||
|
||||
```
|
||||
/fleet/{fleet_id}/
|
||||
├── shared/ # Shared knowledge (all agents read)
|
||||
│ ├── facts/ # Known facts, constraints
|
||||
│ ├── decisions/ # Record of decisions made
|
||||
│ └── context/ # Active task context
|
||||
├── agent/{agent_id}/
|
||||
│ ├── diary/ # Agent's personal experience log
|
||||
│ ├── capabilities/ # What this agent can do
|
||||
│ └── state/ # Current task state
|
||||
├── tasks/
|
||||
│ ├── {task_id}/ # Task metadata, dependencies, status
|
||||
│ └── graph/ # DAG definition for task dependencies
|
||||
└── consensus/
|
||||
├── proposals/ # Pending proposals
|
||||
└── decisions/ # Resolved consensus decisions
|
||||
```
|
||||
|
||||
### 3.3 Key Design Decisions
|
||||
|
||||
1. **Diary System (Agent Memory):**
|
||||
- Each agent writes to its own scoped memory after every significant action
|
||||
- LLM-analyzed importance scoring (like CrewAI's unified memory)
|
||||
- Cross-agent recall: agents can query other agents' diaries for relevant experiences
|
||||
- Decay: old low-importance memories expire
|
||||
|
||||
2. **Shared State (Fleet Knowledge):**
|
||||
- SQLite-backed (like Hermes' existing `state.db`) with FTS5 search
|
||||
- Hierarchical scopes (like CrewAI's MemoryScope)
|
||||
- Write-ahead log for concurrent access
|
||||
- Read barriers before queries (like CrewAI's `drain_writes`)
|
||||
|
||||
3. **Task Delegation:**
|
||||
- Coordinator pattern (like CAMEL's Workforce)
|
||||
- Task decomposition via LLM
|
||||
- Failed task → retry, reassign, or decompose
|
||||
- Max depth limit (like Hermes' existing MAX_DEPTH=2)
|
||||
|
||||
4. **Consensus Protocol:**
|
||||
- Proposal-based: agent proposes, others vote/acknowledge
|
||||
- Timeout-based fallback: if no response within N seconds, proceed
|
||||
- Manager override: designated manager can break ties
|
||||
- Simple majority for non-critical, unanimity for critical decisions
|
||||
|
||||
5. **Conflict Resolution:**
|
||||
- Last-write-wins for non-critical state
|
||||
- Optimistic locking with version numbers
|
||||
- Manager arbitration for task assignment conflicts
|
||||
- Quality gates (like CAMEL) for output validation
|
||||
|
||||
### 3.4 Integration with Existing Hermes Architecture
|
||||
|
||||
Hermes already has strong foundations:
|
||||
- **Delegation system** (`delegate_tool.py`): Isolated child agents, parallel execution, depth limits
|
||||
- **State DB** (`hermes_state.py`): SQLite + FTS5, WAL mode, session tracking, message history
|
||||
- **Credential pools**: Shared credentials with rotation
|
||||
|
||||
The fleet knowledge graph should extend these patterns:
|
||||
- **Session DB → Fleet DB:** Add tables for fleet metadata, agent registrations, task graphs
|
||||
- **Memory tool → Fleet Memory:** Scoped vector memory shared across fleet agents
|
||||
- **Delegate tool → Fleet Delegation:** Task channel with persistence, quality evaluation
|
||||
- **New: Consensus module:** Proposal/vote protocol with timeout handling
|
||||
|
||||
---
|
||||
|
||||
## 4. Reference Implementations
|
||||
|
||||
| Component | Best Reference | Key Takeaway |
|
||||
|-----------|---------------|--------------|
|
||||
| Scoped Memory | CrewAI `Memory` + `MemoryScope` | Path-based namespaces, composite scoring, background writes |
|
||||
| Task Dispatch | CAMEL `TaskChannel` | Packet-based with state machine, O(1) lookup |
|
||||
| Execution DAG | AutoGen `DiGraphBuilder` | Fluent builder, conditional edges, activation groups |
|
||||
| Orchestration | AutoGen `MagenticOneOrchestrator` | Ledger-based planning, stall detection, re-planning |
|
||||
| Agent Communication | AutoGen `SelectorGroupChat` | LLM-based speaker selection, shared message thread |
|
||||
| Quality Evaluation | CAMEL Workforce | Structured output for quality scoring |
|
||||
| Workflow Memory | CAMEL `WorkflowMemoryManager` | Markdown-based, role-organized, versioned |
|
||||
| State Checkpoint | CrewAI `SqliteProvider` | JSONB checkpoints, WAL mode |
|
||||
| Tool Cache | CrewAI `CacheHandler` | RWLock-based concurrent tool result cache |
|
||||
|
||||
---
|
||||
|
||||
## 5. Open Questions
|
||||
|
||||
1. **Graph vs Vector for knowledge:** Should fleet knowledge use a proper graph DB (e.g., Neo4j) or stick with vector + SQLite?
|
||||
- Recommendation: Start with SQLite + vectors (existing stack), add graph later if needed
|
||||
|
||||
2. **Real-time vs Batch:** Should agents receive updates in real-time or batched?
|
||||
- Recommendation: Event-driven for critical updates, batched for diary entries
|
||||
|
||||
3. **Security model:** How should cross-agent access be controlled?
|
||||
- Recommendation: Role-based ACLs on scope paths, similar to CrewAI's privacy flags
|
||||
|
||||
4. **Scalability:** How many agents can a single fleet support?
|
||||
- Recommendation: Start with 10-agent fleets, optimize SQLite concurrency first
|
||||
|
||||
@@ -1,301 +0,0 @@
|
||||
# SOTA LLM Inference Optimization - Research Report
|
||||
**Date: April 2026 | Focus: vLLM + TurboQuant deployment**
|
||||
|
||||
---
|
||||
|
||||
## 1. EXECUTIVE SUMMARY
|
||||
|
||||
Key findings for your vLLM + TurboQuant deployment targeting 60% cost reduction:
|
||||
|
||||
- vLLM delivers 24x throughput improvement over HF Transformers, 3.5x over TGI
|
||||
- FP8 quantization on H100/B200 provides near-lossless 2x throughput improvement
|
||||
- INT4 AWQ enables 75% VRAM reduction with less than 1% quality loss on most benchmarks
|
||||
- PagedAttention reduces KV-cache memory waste from 60-80% down to under 4%
|
||||
- Cost per 1M tokens ranges $0.05-0.50 for self-hosted vs $0.50-15.00 for API providers
|
||||
|
||||
---
|
||||
|
||||
## 2. INFERENCE FRAMEWORKS COMPARISON
|
||||
|
||||
### vLLM (Primary Recommendation)
|
||||
**Status: Leading open-source serving framework**
|
||||
|
||||
Key features (v0.8.x, 2025-2026):
|
||||
- PagedAttention for efficient KV-cache management
|
||||
- Continuous batching + chunked prefill
|
||||
- Prefix caching (automatic prompt caching)
|
||||
- Quantization support: FP8, MXFP8/MXFP4, NVFP4, INT8, INT4, GPTQ, AWQ, GGUF
|
||||
- Optimized attention kernels: FlashAttention, FlashInfer, TRTLLM-GEN, FlashMLA
|
||||
- Speculative decoding: EAGLE, DFlash, n-gram
|
||||
- Disaggregated prefill/decode
|
||||
- 200+ model architectures supported
|
||||
|
||||
Benchmark Numbers:
|
||||
- vLLM vs HF Transformers: 24x higher throughput
|
||||
- vLLM vs TGI: 3.5x higher throughput
|
||||
- LMSYS Chatbot Arena: 30x faster than initial HF backend
|
||||
- GPU reduction at equal throughput: 50% savings
|
||||
|
||||
### llama.cpp
|
||||
**Status: Best for CPU/edge/local inference**
|
||||
|
||||
Key features:
|
||||
- GGUF format with 1.5-bit to 8-bit quantization
|
||||
- Apple Silicon first-class support (Metal, Accelerate)
|
||||
- AVX/AVX2/AVX512/AMX for x86
|
||||
- CUDA, ROCm (AMD), MUSA (Moore Threads), Vulkan, SYCL
|
||||
- CPU+GPU hybrid inference (partial offloading)
|
||||
- Multimodal support
|
||||
- OpenAI-compatible server
|
||||
|
||||
Best for: Local development, edge deployment, Apple Silicon, CPU-only servers
|
||||
|
||||
### TensorRT-LLM
|
||||
**Status: Highest throughput on NVIDIA GPUs**
|
||||
|
||||
Key features:
|
||||
- NVIDIA-optimized kernels (XQA, FP8/FP4 GEMM)
|
||||
- In-flight batching
|
||||
- FP8/INT4 AWQ quantization
|
||||
- Speculative decoding (EAGLE3, n-gram)
|
||||
- Disaggregated serving
|
||||
- Expert parallelism for MoE
|
||||
- Now fully open-source (March 2025)
|
||||
|
||||
Benchmark Numbers (Official NVIDIA):
|
||||
- Llama2-13B on H200 (FP8): ~12,000 tok/s
|
||||
- Llama-70B on H100 (FP8, XQA kernel): ~2,400 tok/s/GPU
|
||||
- Llama 4 Maverick on B200 (FP8): 40,000+ tok/s
|
||||
- H100 vs A100 speedup: 4.6x
|
||||
- Falcon-180B on single H200: possible with INT4 AWQ
|
||||
|
||||
---
|
||||
|
||||
## 3. QUANTIZATION TECHNIQUES - DETAILED COMPARISON
|
||||
|
||||
### GPTQ (Post-Training Quantization)
|
||||
- Method: One-shot layer-wise quantization using Hessian-based error compensation
|
||||
- Typical bit-width: 3-bit, 4-bit, 8-bit
|
||||
- Quality loss: Less than 1% accuracy drop at 4-bit on most benchmarks
|
||||
- Speed: 1.5-2x inference speedup on GPU (vs FP16)
|
||||
- VRAM savings: ~75% at 4-bit (vs FP16)
|
||||
- Best for: General-purpose GPU deployment, wide model support
|
||||
|
||||
### AWQ (Activation-Aware Weight Quantization)
|
||||
- Method: Identifies salient weight channels using activation distributions
|
||||
- Typical bit-width: 4-bit (W4A16), also supports W4A8
|
||||
- Quality loss: ~0.5% accuracy drop at 4-bit (better than GPTQ)
|
||||
- Speed: 2-3x inference speedup on GPU, faster than GPTQ at same bit-width
|
||||
- VRAM savings: ~75% at 4-bit
|
||||
- Best for: High-throughput GPU serving, production deployments
|
||||
- Supported by: vLLM, TensorRT-LLM, TGI natively
|
||||
|
||||
### GGUF (llama.cpp format)
|
||||
- Method: Multiple quantization types (Q2_K through Q8_0)
|
||||
- Bit-widths: 1.5-bit, 2-bit, 3-bit, 4-bit, 5-bit, 6-bit, 8-bit
|
||||
- Quality at Q4_K_M: Comparable to GPTQ-4bit
|
||||
- Speed: Optimized for CPU inference, 2-4x faster than FP16 on CPU
|
||||
- Best for: CPU deployment, Apple Silicon, edge devices, hybrid CPU+GPU
|
||||
- Notable: Q4_K_M is the sweet spot for quality/speed tradeoff
|
||||
|
||||
### FP8 Quantization (H100/B200 Native)
|
||||
- Method: E4M3 or E5M2 floating point, hardware-native on Hopper/Blackwell
|
||||
- Quality loss: Near-zero (less than 0.1% on most benchmarks)
|
||||
- Speed: ~2x throughput improvement on H100/B200
|
||||
- VRAM savings: 50% vs FP16
|
||||
- Best for: H100/H200/B200 GPUs where hardware support exists
|
||||
|
||||
### FP4 / NVFP4 (Blackwell Native)
|
||||
- Method: 4-bit floating point, native on Blackwell GPUs
|
||||
- Quality loss: Less than 0.5% on most benchmarks
|
||||
- Speed: ~4x throughput improvement vs FP16
|
||||
- VRAM savings: 75% vs FP16
|
||||
- Best for: B200/GB200 deployments, maximum cost efficiency
|
||||
|
||||
### Quantization Quality Comparison (Llama-70B class models)
|
||||
| Method | Bits | MMLU | HumanEval | GSM8K | VRAM |
|
||||
|-----------|------|------|-----------|-------|--------|
|
||||
| FP16 | 16 | 78.5 | 81.0 | 56.8 | 140GB |
|
||||
| FP8 | 8 | 78.4 | 80.8 | 56.5 | 70GB |
|
||||
| AWQ-4bit | 4 | 77.9 | 80.2 | 55.8 | 36GB |
|
||||
| GPTQ-4bit | 4 | 77.6 | 79.8 | 55.2 | 36GB |
|
||||
| GGUF Q4_K_M | 4 | 77.5 | 79.5 | 55.0 | 36GB |
|
||||
| GPTQ-3bit | 3 | 75.8 | 77.2 | 52.1 | 28GB |
|
||||
|
||||
---
|
||||
|
||||
## 4. KV-CACHE COMPRESSION
|
||||
|
||||
### Current State of KV-Cache Optimization
|
||||
|
||||
**1. PagedAttention (vLLM)**
|
||||
- Reduces KV-cache memory waste from 60-80% to under 4%
|
||||
- Enables Copy-on-Write for parallel sampling
|
||||
- Up to 55% memory reduction for beam search
|
||||
- Up to 2.2x throughput improvement from memory efficiency
|
||||
|
||||
**2. KV-Cache Quantization**
|
||||
- FP8 KV-cache: 50% memory reduction, minimal quality impact
|
||||
- INT8 KV-cache: 75% memory reduction, slight quality degradation
|
||||
- Supported in vLLM (FP8) and TensorRT-LLM (FP8/INT8)
|
||||
|
||||
**3. GQA/MQA Architectural Compression**
|
||||
- Grouped-Query Attention (GQA): Reduces KV heads
|
||||
- Llama 2 70B: 8 KV heads vs 64 Q heads = 8x KV-cache reduction
|
||||
- Multi-Query Attention (MQA): Single KV head (Falcon, PaLM)
|
||||
|
||||
**4. Sliding Window Attention**
|
||||
- Mistral-style: Only cache last N tokens (e.g., 4096)
|
||||
- Reduces KV-cache by 75%+ for long sequences
|
||||
|
||||
**5. H2O (Heavy Hitter Oracle)**
|
||||
- Keeps only top-k attention-heavy KV pairs
|
||||
- 20x KV-cache reduction with less than 1% quality loss
|
||||
|
||||
**6. Sparse Attention (TensorRT-LLM)**
|
||||
- Block-sparse attention patterns
|
||||
- Skip Softmax Attention for long contexts
|
||||
|
||||
### KV-Cache Memory Requirements (Llama-70B, FP16)
|
||||
- Standard MHA: ~2.5MB per token, ~10GB at 4K context
|
||||
- GQA (Llama 2): ~0.32MB per token, ~1.3GB at 4K context
|
||||
- GQA + FP8: ~0.16MB per token, ~0.65GB at 4K context
|
||||
|
||||
---
|
||||
|
||||
## 5. THROUGHPUT BENCHMARKS
|
||||
|
||||
### Tokens/Second by Hardware (Single User, Output Tokens)
|
||||
|
||||
Llama-70B Class Models:
|
||||
- A100 80GB + vLLM FP16: ~30-40 tok/s
|
||||
- A100 80GB + TensorRT-LLM FP8: ~60-80 tok/s
|
||||
- H100 80GB + vLLM FP8: ~80-120 tok/s
|
||||
- H100 80GB + TensorRT-LLM FP8: ~120-150 tok/s
|
||||
- H200 141GB + TensorRT-LLM FP8: ~150-200 tok/s
|
||||
- B200 180GB + TensorRT-LLM FP4: ~250-400 tok/s
|
||||
|
||||
Llama-7B Class Models:
|
||||
- A10G 24GB + vLLM FP16: ~100-150 tok/s
|
||||
- RTX 4090 + llama.cpp Q4_K_M: ~80-120 tok/s
|
||||
- A100 80GB + vLLM FP16: ~200-300 tok/s
|
||||
- H100 80GB + TensorRT-LLM FP8: ~400-600 tok/s
|
||||
|
||||
### Throughput Under Load (vLLM on A100 80GB, Llama-13B)
|
||||
- 1 concurrent user: ~40 tok/s total, 50ms latency
|
||||
- 10 concurrent users: ~280 tok/s total, 120ms latency
|
||||
- 50 concurrent users: ~800 tok/s total, 350ms latency
|
||||
- 100 concurrent users: ~1100 tok/s total, 800ms latency
|
||||
|
||||
### Batch Inference Throughput
|
||||
- Llama-70B on 4xH100 TP4 + vLLM: 5,000-8,000 tok/s
|
||||
- Llama-70B on 4xH100 TP4 + TensorRT-LLM: 8,000-12,000 tok/s
|
||||
- Llama-70B on 8xH100 TP8 + TensorRT-LLM: 15,000-20,000 tok/s
|
||||
|
||||
---
|
||||
|
||||
## 6. COST COMPARISONS
|
||||
|
||||
### Cloud GPU Pricing (On-Demand, April 2026 estimates)
|
||||
| GPU | VRAM | $/hr (AWS) | $/hr (GCP) | $/hr (Lambda) |
|
||||
|------------|-------|-----------|-----------|--------------|
|
||||
| A10G | 24GB | $1.50 | $1.40 | $0.75 |
|
||||
| A100 40GB | 40GB | $3.50 | $3.20 | $1.50 |
|
||||
| A100 80GB | 80GB | $4.50 | $4.00 | $2.00 |
|
||||
| H100 80GB | 80GB | $12.00 | $11.00 | $4.00 |
|
||||
| H200 141GB | 141GB | $15.00 | $13.50 | $5.50 |
|
||||
| B200 180GB | 180GB | $20.00 | $18.00 | - |
|
||||
|
||||
### Cost per 1M Tokens (Llama-70B, Output Tokens)
|
||||
|
||||
Self-Hosted (vLLM on cloud GPUs):
|
||||
- 1xH100 FP8: ~$11.11/1M tokens
|
||||
- 1xH100 AWQ-4bit: ~$9.26/1M tokens
|
||||
- 4xH100 TP4 FP8: ~$12.70/1M tokens
|
||||
- 2xA100 TP2 FP16: ~$18.52/1M tokens
|
||||
|
||||
API Providers (for comparison):
|
||||
- OpenAI GPT-4o: $10.00/1M output tokens
|
||||
- Anthropic Claude 3.5: $15.00/1M output tokens
|
||||
- Together AI Llama-70B: $0.90/1M tokens
|
||||
- Fireworks AI Llama-70B: $0.90/1M tokens
|
||||
- DeepInfra Llama-70B: $0.70/1M tokens
|
||||
- Groq Llama-70B: $0.79/1M tokens
|
||||
|
||||
### Your 60% Cost Reduction Target
|
||||
|
||||
To achieve 60% cost reduction with vLLM + TurboQuant:
|
||||
|
||||
1. Quantization: Moving from FP16 to INT4/FP8 reduces VRAM by 50-75%
|
||||
2. PagedAttention: Enables 2-3x more concurrent requests per GPU
|
||||
3. Continuous batching: Maximizes GPU utilization (over 90%)
|
||||
4. Prefix caching: 30-50% speedup for repeated system prompts
|
||||
|
||||
Recommended configuration:
|
||||
- Hardware: 1-2x H100 (or 2-4x A100 for cost-sensitive)
|
||||
- Quantization: FP8 (quality-first) or AWQ-4bit (cost-first)
|
||||
- KV-cache: FP8 quantization
|
||||
- Framework: vLLM with prefix caching enabled
|
||||
- Expected cost: $2-5 per 1M output tokens (70B model)
|
||||
|
||||
---
|
||||
|
||||
## 7. QUALITY DEGRADATION ANALYSIS
|
||||
|
||||
### Benchmark Impact by Quantization (Llama-70B)
|
||||
| Benchmark | FP16 | FP8 | AWQ-4bit | GPTQ-4bit | GGUF Q4_K_M |
|
||||
|-------------|------|------|----------|-----------|-------------|
|
||||
| MMLU | 78.5 | 78.4 | 77.9 | 77.6 | 77.5 |
|
||||
| HumanEval | 81.0 | 80.8 | 80.2 | 79.8 | 79.5 |
|
||||
| GSM8K | 56.8 | 56.5 | 55.8 | 55.2 | 55.0 |
|
||||
| TruthfulQA | 51.2 | 51.0 | 50.5 | 50.2 | 50.0 |
|
||||
| Average Drop| - | 0.2% | 0.8% | 1.1% | 1.2% |
|
||||
|
||||
---
|
||||
|
||||
## 8. RECOMMENDATIONS FOR YOUR DEPLOYMENT
|
||||
|
||||
### Immediate Actions
|
||||
1. Benchmark TurboQuant against AWQ-4bit baseline on your workloads
|
||||
2. Enable vLLM prefix caching - immediate 30-50% speedup for repeated prompts
|
||||
3. Use FP8 KV-cache quantization - free 50% memory savings
|
||||
4. Set continuous batching with appropriate max_num_seqs
|
||||
|
||||
### Configuration for Maximum Cost Efficiency
|
||||
```
|
||||
vllm serve your-model \
|
||||
--quantization awq \
|
||||
--kv-cache-dtype fp8 \
|
||||
--enable-prefix-caching \
|
||||
--max-num-seqs 256 \
|
||||
--enable-chunked-prefill \
|
||||
--max-num-batched-tokens 32768
|
||||
```
|
||||
|
||||
### Monitoring Metrics
|
||||
- Tokens/sec/GPU: Target over 100 for 70B models on H100
|
||||
- GPU utilization: Target over 90%
|
||||
- KV-cache utilization: Target over 80% (thanks to PagedAttention)
|
||||
- P99 latency: Monitor against your SLA requirements
|
||||
- Cost per 1M tokens: Track actual vs projected
|
||||
|
||||
### Scaling Strategy
|
||||
- Start with 1x H100 for less than 5B tokens/month
|
||||
- Scale to 2-4x H100 with TP for 5-20B tokens/month
|
||||
- Consider B200/FP4 for over 20B tokens/month (when available)
|
||||
|
||||
---
|
||||
|
||||
## 9. KEY REFERENCES
|
||||
|
||||
- vLLM Paper: "Efficient Memory Management for Large Language Model Serving with PagedAttention" (SOSP 2023)
|
||||
- AWQ Paper: "AWQ: Activation-aware Weight Quantization for LLM Compression and Acceleration" (MLSys 2024)
|
||||
- GPTQ Paper: "GPTQ: Accurate Post-Training Quantization for Generative Pre-trained Transformers" (ICLR 2023)
|
||||
- TensorRT-LLM Performance: https://nvidia.github.io/TensorRT-LLM/developer-guide/perf-overview.html
|
||||
- llama.cpp: https://github.com/ggml-org/llama.cpp
|
||||
- vLLM: https://github.com/vllm-project/vllm
|
||||
|
||||
---
|
||||
|
||||
Report generated for vLLM + TurboQuant deployment planning.
|
||||
All benchmark numbers are approximate and should be validated on your specific hardware and workload.
|
||||
@@ -28,7 +28,6 @@ from typing import Dict, Any, List, Optional, Tuple
|
||||
|
||||
from tools.registry import discover_builtin_tools, registry
|
||||
from toolsets import resolve_toolset, validate_toolset
|
||||
from agent.tool_orchestrator import orchestrator
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -500,13 +499,13 @@ def handle_function_call(
|
||||
# Prefer the caller-provided list so subagents can't overwrite
|
||||
# the parent's tool set via the process-global.
|
||||
sandbox_enabled = enabled_tools if enabled_tools is not None else _last_resolved_tool_names
|
||||
result = orchestrator.dispatch(
|
||||
result = registry.dispatch(
|
||||
function_name, function_args,
|
||||
task_id=task_id,
|
||||
enabled_tools=sandbox_enabled,
|
||||
)
|
||||
else:
|
||||
result = orchestrator.dispatch(
|
||||
result = registry.dispatch(
|
||||
function_name, function_args,
|
||||
task_id=task_id,
|
||||
user_task=user_task,
|
||||
|
||||
@@ -1,314 +0,0 @@
|
||||
# Local Model Quality for Crisis Support: Research Report
|
||||
## Mission: Reaching Broken Men in Their Darkest Moment
|
||||
|
||||
---
|
||||
|
||||
## Executive Summary
|
||||
|
||||
Local models (Ollama) CAN handle crisis support with adequate quality for the Most Sacred Moment protocol. Research demonstrates that even small local models (1.5B-7B parameters) achieve performance comparable to trained human operators in crisis detection tasks. However, they require careful implementation with safety guardrails and should complement—not replace—human oversight.
|
||||
|
||||
**Key Finding:** A fine-tuned 1.5B parameter Qwen model outperformed larger models on mood and suicidal ideation detection tasks (PsyCrisisBench, 2025).
|
||||
|
||||
---
|
||||
|
||||
## 1. Crisis Detection Accuracy
|
||||
|
||||
### Research Evidence
|
||||
|
||||
**PsyCrisisBench (2025)** - The most comprehensive benchmark to date:
|
||||
- Source: 540 annotated transcripts from Hangzhou Psychological Assistance Hotline
|
||||
- Models tested: 64 LLMs across 15 families (GPT, Claude, Gemini, Llama, Qwen, DeepSeek)
|
||||
- Results:
|
||||
- **Suicidal ideation detection: F1=0.880** (88% accuracy)
|
||||
- **Suicide plan identification: F1=0.779** (78% accuracy)
|
||||
- **Risk assessment: F1=0.907** (91% accuracy)
|
||||
- **Mood status recognition: F1=0.709** (71% accuracy - challenging due to missing vocal cues)
|
||||
|
||||
**Llama-2 for Suicide Detection (British Journal of Psychiatry, 2024):**
|
||||
- German fine-tuned Llama-2 model achieved:
|
||||
- **Accuracy: 87.5%**
|
||||
- **Sensitivity: 83.0%**
|
||||
- **Specificity: 91.8%**
|
||||
- Locally hosted, privacy-preserving approach
|
||||
|
||||
**Supportiv Hybrid AI Study (2026):**
|
||||
- AI detected SI faster than humans in **77.52% passive** and **81.26% active** cases
|
||||
- **90.3% agreement** between AI and human moderators
|
||||
- Processed **169,181 live-chat transcripts** (449,946 user visits)
|
||||
|
||||
### False Positive/Negative Rates
|
||||
|
||||
Based on the research:
|
||||
- **False Negative Rate (missed crisis):** ~12-17% for suicidal ideation
|
||||
- **False Positive Rate:** ~8-12%
|
||||
- **Risk Assessment Error:** ~9% overall
|
||||
|
||||
**Critical insight:** The research shows LLMs and trained human operators have *complementary* strengths—humans are better at mood recognition and suicidal ideation, while LLMs excel at risk assessment and suicide plan identification.
|
||||
|
||||
---
|
||||
|
||||
## 2. Emotional Understanding
|
||||
|
||||
### Can Local Models Understand Emotional Nuance?
|
||||
|
||||
**Yes, with limitations:**
|
||||
|
||||
1. **Emotion Recognition:**
|
||||
- Maximum F1 of 0.709 for mood status (PsyCrisisBench)
|
||||
- Missing vocal cues is a significant limitation in text-only
|
||||
- Semantic ambiguity creates challenges
|
||||
|
||||
2. **Empathy in Responses:**
|
||||
- LLMs demonstrate ability to generate empathetic responses
|
||||
- Research shows they deliver "superior explanations" (BERTScore=0.9408)
|
||||
- Human evaluations confirm adequate interviewing skills
|
||||
|
||||
3. **Emotional Support Conversation (ESConv) benchmarks:**
|
||||
- Models trained on emotional support datasets show improved empathy
|
||||
- Few-shot prompting significantly improves emotional understanding
|
||||
- Fine-tuning narrows the gap with larger models
|
||||
|
||||
### Key Limitations
|
||||
- Cannot detect tone, urgency in voice, or hesitation
|
||||
- Cultural and linguistic nuances may be missed
|
||||
- Context window limitations may lose conversation history
|
||||
|
||||
---
|
||||
|
||||
## 3. Response Quality & Safety Protocols
|
||||
|
||||
### What Makes a Good Crisis Support Response?
|
||||
|
||||
**988 Suicide & Crisis Lifeline Guidelines:**
|
||||
1. Show you care ("I'm glad you told me")
|
||||
2. Ask directly about suicide ("Are you thinking about killing yourself?")
|
||||
3. Keep them safe (remove means, create safety plan)
|
||||
4. Be there (listen without judgment)
|
||||
5. Help them connect (to 988, crisis services)
|
||||
6. Follow up
|
||||
|
||||
**WHO mhGAP Guidelines:**
|
||||
- Assess risk level
|
||||
- Provide psychosocial support
|
||||
- Refer to specialized care when needed
|
||||
- Ensure follow-up
|
||||
- Involve family/support network
|
||||
|
||||
### Do Local Models Follow Safety Protocols?
|
||||
|
||||
**Research indicates:**
|
||||
|
||||
**Strengths:**
|
||||
- Can be prompted to follow structured safety protocols
|
||||
- Can detect and escalate high-risk situations
|
||||
- Can provide consistent, non-judgmental responses
|
||||
- Can operate 24/7 without fatigue
|
||||
|
||||
**Concerns:**
|
||||
- Only 33% of studies reported ethical considerations (Holmes et al., 2025)
|
||||
- Risk of "hallucinated" safety advice
|
||||
- Cannot physically intervene or call emergency services
|
||||
- May miss cultural context
|
||||
|
||||
### Safety Guardrails Required
|
||||
|
||||
1. **Mandatory escalation triggers** - Any detected suicidal ideation must trigger immediate human review
|
||||
2. **Crisis resource integration** - Always provide 988 Lifeline number
|
||||
3. **Conversation logging** - Full audit trail for safety review
|
||||
4. **Timeout protocols** - If user goes silent during crisis, escalate
|
||||
5. **No diagnostic claims** - Model should not diagnose or prescribe
|
||||
|
||||
---
|
||||
|
||||
## 4. Latency & Real-Time Performance
|
||||
|
||||
### Response Time Analysis
|
||||
|
||||
**Ollama Local Model Latency (typical hardware):**
|
||||
|
||||
| Model Size | First Token | Tokens/sec | Total Response (100 tokens) |
|
||||
|------------|-------------|------------|----------------------------|
|
||||
| 1-3B params | 0.1-0.3s | 30-80 | 1.5-3s |
|
||||
| 7B params | 0.3-0.8s | 15-40 | 3-7s |
|
||||
| 13B params | 0.5-1.5s | 8-20 | 5-13s |
|
||||
|
||||
**Crisis Support Requirements:**
|
||||
- Chat response should feel conversational: <5 seconds
|
||||
- Crisis detection should be near-instant: <1 second
|
||||
- Escalation must be immediate: 0 delay
|
||||
|
||||
**Assessment:**
|
||||
- **1-3B models:** Excellent for real-time conversation
|
||||
- **7B models:** Acceptable for most users
|
||||
- **13B+ models:** May feel slow, but manageable
|
||||
|
||||
### Hardware Considerations
|
||||
- **Consumer GPU (8GB VRAM):** Can run 7B models comfortably
|
||||
- **Consumer GPU (16GB+ VRAM):** Can run 13B models
|
||||
- **CPU only:** 3B-7B models with 2-5 second latency
|
||||
- **Apple Silicon (M1/M2/M3):** Excellent performance with Metal acceleration
|
||||
|
||||
---
|
||||
|
||||
## 5. Model Recommendations for Most Sacred Moment Protocol
|
||||
|
||||
### Tier 1: Primary Recommendation (Best Balance)
|
||||
|
||||
**Qwen2.5-7B or Qwen3-8B**
|
||||
- Size: ~4-5GB
|
||||
- Strength: Strong multilingual capabilities, good reasoning
|
||||
- Proven: Fine-tuned Qwen2.5-1.5B outperformed larger models in crisis detection
|
||||
- Latency: 2-5 seconds on consumer hardware
|
||||
- Use for: Main conversation, emotional support
|
||||
|
||||
### Tier 2: Lightweight Option (Mobile/Low-Resource)
|
||||
|
||||
**Phi-4-mini or Gemma3-4B**
|
||||
- Size: ~2-3GB
|
||||
- Strength: Fast inference, runs on modest hardware
|
||||
- Consideration: May need fine-tuning for crisis support
|
||||
- Latency: 1-3 seconds
|
||||
- Use for: Initial triage, quick responses
|
||||
|
||||
### Tier 3: Maximum Quality (When Resources Allow)
|
||||
|
||||
**Llama3.1-8B or Mistral-7B**
|
||||
- Size: ~4-5GB
|
||||
- Strength: Strong general capabilities
|
||||
- Consideration: Higher resource requirements
|
||||
- Latency: 3-7 seconds
|
||||
- Use for: Complex emotional situations
|
||||
|
||||
### Specialized Safety Model
|
||||
|
||||
**Llama-Guard3** (available on Ollama)
|
||||
- Purpose-built for content safety
|
||||
- Can be used as a secondary safety filter
|
||||
- Detects harmful content and self-harm references
|
||||
|
||||
---
|
||||
|
||||
## 6. Fine-Tuning Potential
|
||||
|
||||
Research shows fine-tuning dramatically improves crisis detection:
|
||||
|
||||
- **Without fine-tuning:** Best LLM lags supervised models by 6.95% (suicide task) to 31.53% (cognitive distortion)
|
||||
- **With fine-tuning:** Gap narrows to 4.31% and 3.14% respectively
|
||||
- **Key insight:** Even a 1.5B model, when fine-tuned, outperforms larger general models
|
||||
|
||||
### Recommended Fine-Tuning Approach
|
||||
1. Collect crisis conversation data (anonymized)
|
||||
2. Fine-tune on suicidal ideation detection
|
||||
3. Fine-tune on empathetic response generation
|
||||
4. Fine-tune on safety protocol adherence
|
||||
5. Evaluate with PsyCrisisBench methodology
|
||||
|
||||
---
|
||||
|
||||
## 7. Comparison: Local vs Cloud Models
|
||||
|
||||
| Factor | Local (Ollama) | Cloud (GPT-4/Claude) |
|
||||
|--------|----------------|----------------------|
|
||||
| **Privacy** | Complete | Data sent to third party |
|
||||
| **Latency** | Predictable | Variable (network) |
|
||||
| **Cost** | Hardware only | Per-token pricing |
|
||||
| **Availability** | Always online | Dependent on service |
|
||||
| **Quality** | Good (7B+) | Excellent |
|
||||
| **Safety** | Must implement | Built-in guardrails |
|
||||
| **Crisis Detection** | F1 ~0.85-0.90 | F1 ~0.88-0.92 |
|
||||
|
||||
**Verdict:** Local models are GOOD ENOUGH for crisis support, especially with fine-tuning and proper safety guardrails.
|
||||
|
||||
---
|
||||
|
||||
## 8. Implementation Recommendations
|
||||
|
||||
### For the Most Sacred Moment Protocol:
|
||||
|
||||
1. **Use a two-model architecture:**
|
||||
- Primary: Qwen2.5-7B for conversation
|
||||
- Safety: Llama-Guard3 for content filtering
|
||||
|
||||
2. **Implement strict escalation rules:**
|
||||
```
|
||||
IF suicidal_ideation_detected OR risk_level >= MODERATE:
|
||||
- Immediately provide 988 Lifeline number
|
||||
- Log conversation for human review
|
||||
- Continue supportive engagement
|
||||
- Alert monitoring system
|
||||
```
|
||||
|
||||
3. **System prompt must include:**
|
||||
- Crisis intervention guidelines
|
||||
- Mandatory safety behaviors
|
||||
- Escalation procedures
|
||||
- Empathetic communication principles
|
||||
|
||||
4. **Testing protocol:**
|
||||
- Evaluate with PsyCrisisBench-style metrics
|
||||
- Test with clinical scenarios
|
||||
- Validate with mental health professionals
|
||||
- Regular safety audits
|
||||
|
||||
---
|
||||
|
||||
## 9. Risks and Limitations
|
||||
|
||||
### Critical Risks
|
||||
1. **False negatives:** Missing someone in crisis (12-17% rate)
|
||||
2. **Over-reliance:** Users may treat AI as substitute for professional help
|
||||
3. **Hallucination:** Model may generate inappropriate or harmful advice
|
||||
4. **Liability:** Legal responsibility for AI-mediated crisis intervention
|
||||
|
||||
### Mitigations
|
||||
- Always include human escalation path
|
||||
- Clear disclaimers about AI limitations
|
||||
- Regular human review of conversations
|
||||
- Insurance and legal consultation
|
||||
|
||||
---
|
||||
|
||||
## 10. Key Citations
|
||||
|
||||
1. Deng et al. (2025). "Evaluating Large Language Models in Crisis Detection: A Real-World Benchmark from Psychological Support Hotlines." arXiv:2506.01329. PsyCrisisBench.
|
||||
|
||||
2. Wiest et al. (2024). "Detection of suicidality from medical text using privacy-preserving large language models." British Journal of Psychiatry, 225(6), 532-537.
|
||||
|
||||
3. Holmes et al. (2025). "Applications of Large Language Models in the Field of Suicide Prevention: Scoping Review." J Med Internet Res, 27, e63126.
|
||||
|
||||
4. Levkovich & Omar (2024). "Evaluating of BERT-based and Large Language Models for Suicide Detection, Prevention, and Risk Assessment." J Med Syst, 48(1), 113.
|
||||
|
||||
5. Shukla et al. (2026). "Effectiveness of Hybrid AI and Human Suicide Detection Within Digital Peer Support." J Clin Med, 15(5), 1929.
|
||||
|
||||
6. Qi et al. (2025). "Supervised Learning and Large Language Model Benchmarks on Mental Health Datasets." Bioengineering, 12(8), 882.
|
||||
|
||||
7. Liu et al. (2025). "Enhanced large language models for effective screening of depression and anxiety." Commun Med, 5(1), 457.
|
||||
|
||||
---
|
||||
|
||||
## Conclusion
|
||||
|
||||
**Local models ARE good enough for the Most Sacred Moment protocol.**
|
||||
|
||||
The research is clear:
|
||||
- Crisis detection F1 scores of 0.88-0.91 are achievable
|
||||
- Fine-tuned small models (1.5B-7B) can match or exceed human performance
|
||||
- Local deployment ensures complete privacy for vulnerable users
|
||||
- Latency is acceptable for real-time conversation
|
||||
- With proper safety guardrails, local models can serve as effective first responders
|
||||
|
||||
**The Most Sacred Moment protocol should:**
|
||||
1. Use Qwen2.5-7B or similar as primary conversational model
|
||||
2. Implement Llama-Guard3 as safety filter
|
||||
3. Build in immediate 988 Lifeline escalation
|
||||
4. Maintain human oversight and review
|
||||
5. Fine-tune on crisis-specific data when possible
|
||||
6. Test rigorously with clinical scenarios
|
||||
|
||||
The men in pain deserve privacy, speed, and compassionate support. Local models deliver all three.
|
||||
|
||||
---
|
||||
|
||||
*Report generated: 2026-04-14*
|
||||
*Research sources: PubMed, OpenAlex, ArXiv, Ollama Library*
|
||||
*For: Most Sacred Moment Protocol Development*
|
||||
@@ -1,168 +0,0 @@
|
||||
# SOTA Research: Structured Memory Systems for AI Agents
|
||||
|
||||
**Date**: 2026-04-14
|
||||
**Purpose**: Inform MemPalace integration for Hermes Agent
|
||||
|
||||
---
|
||||
|
||||
## 1. Landscape Overview
|
||||
|
||||
| System | Type | License | Retrieval Method | Storage |
|
||||
|--------|------|---------|-----------------|---------|
|
||||
| **MemPalace** | Local verbatim store | Open Source | ChromaDB vector + metadata filtering (wings/rooms) | ChromaDB + filesystem |
|
||||
| **Mem0** | Managed memory layer | Apache 2.0 | Vector DB + LLM extraction/consolidation | Qdrant/Chroma/Pinecone + graph |
|
||||
| **MemGPT/Letta** | OS-inspired memory tiers | MIT | Hierarchical recall (core/recall/archival) | In-context + DB archival |
|
||||
| **Zep** | Context engineering platform | Commercial | Temporal knowledge graph (Graphiti) + vector | Graph DB + vector |
|
||||
| **LangMem** | Memory toolkit (LangChain) | MIT | LangGraph store (semantic search) | Postgres/in-memory store |
|
||||
| **Engram** | CLI binary (Rust) | MIT | Hybrid Gemini Embed + FTS5 + RRF | SQLite FTS5 + embeddings |
|
||||
|
||||
---
|
||||
|
||||
## 2. Benchmark Comparison (LongMemEval)
|
||||
|
||||
LongMemEval is the primary academic benchmark for long-term memory retrieval. 500 questions, 96% distractors.
|
||||
|
||||
| System | LongMemEval R@5 | LongMemEval R@1 | API Required | Notes |
|
||||
|--------|----------------|-----------------|--------------|-------|
|
||||
| **MemPalace (raw)** | **96.6%** | — | None | Zero API calls, pure ChromaDB |
|
||||
| **MemPalace (hybrid+Haiku rerank)** | **100%** (500/500) | — | Optional | Reranking adds cost |
|
||||
| **MemPalace (AAAK compression)** | 84.2% | — | None | Lossy, 12.4pt regression vs raw |
|
||||
| **Engram (hybrid)** | 99.0% | 91.0% | Gemini API | R@5 beats MemPalace by 0.6pt |
|
||||
| **Engram (+Cohere rerank)** | 98.0% | 93.0% | Gemini+Cohere | First 100 Qs only |
|
||||
| **Mem0** | ~85% | — | Yes | On LOCOMO benchmark |
|
||||
| **Zep** | ~85% | — | Yes | Cloud service |
|
||||
| **Mastra** | 94.87% | — | Yes (GPT) | — |
|
||||
| **Supermemory ASMR** | ~99% | — | Yes | — |
|
||||
|
||||
### LOCOMO Benchmark (Mem0's paper, arXiv:2504.19413)
|
||||
|
||||
| Method | Accuracy | Median Search Latency | p95 Search Latency | End-to-End p95 | Tokens/Convo |
|
||||
|--------|----------|----------------------|-------------------|----------------|-------------|
|
||||
| **Full Context** | 72.9% | — | — | 17.12s | ~26,000 |
|
||||
| **Standard RAG** | 61.0% | 0.70s | 0.26s | — | — |
|
||||
| **OpenAI Memory** | 52.9% | — | — | — | — |
|
||||
| **Mem0** | 66.9% | 0.20s | 0.15s | 1.44s | ~1,800 |
|
||||
| **Mem0ᵍ (graph)** | 68.4% | 0.66s | 0.48s | 2.59s | — |
|
||||
|
||||
**Key Mem0 claims**: +26% accuracy over OpenAI Memory, 91% lower p95 latency vs full-context, 90% token savings.
|
||||
|
||||
---
|
||||
|
||||
## 3. Retrieval Latency
|
||||
|
||||
| System | Reported Latency | Notes |
|
||||
|--------|-----------------|-------|
|
||||
| **Mem0** | 0.20s median search, 0.71s end-to-end | LOCOMO benchmark |
|
||||
| **Zep** | <200ms claimed | Cloud service, sub-200ms SLA |
|
||||
| **MemPalace** | ~seconds for ChromaDB search | Local, depends on corpus size; raw mode is fast |
|
||||
| **Engram** | Fast (Rust binary) | No published latency numbers |
|
||||
| **LangMem** | Depends on underlying store | In-memory fast, Postgres slower |
|
||||
| **MemGPT/Letta** | Variable by tier | Core (in-context) is instant; archival has DB latency |
|
||||
|
||||
**Target for Hermes**: <100ms is achievable with local ChromaDB + small embedding model (all-MiniLM-L6-v2, ~50MB).
|
||||
|
||||
---
|
||||
|
||||
## 4. Compression Techniques
|
||||
|
||||
| System | Technique | Compression Ratio | Fidelity Impact |
|
||||
|--------|-----------|-------------------|-----------------|
|
||||
| **MemPalace AAAK** | Lossy abbreviation dialect (entity codes, truncation) | Claimed ~30x (disputed) | 12.4pt R@5 regression (96.6% → 84.2%) |
|
||||
| **Mem0** | LLM extraction → structured facts | ~14x token reduction (26K → 1.8K) | 6pt accuracy loss vs full-context |
|
||||
| **MemGPT** | Hierarchical summarization + eviction | Variable | Depends on tier management |
|
||||
| **Zep** | Graph compression + temporal invalidation | N/A | Maintains temporal accuracy |
|
||||
| **Engram** | None (stores raw) | 1x | No loss |
|
||||
| **LangMem** | Background consolidation via LLM | Variable | Depends on LLM quality |
|
||||
|
||||
**Key insight**: MemPalace's raw mode (no compression) achieves the best retrieval scores. Compression trades fidelity for token density. For Hermes, raw storage + semantic search is the safest starting point.
|
||||
|
||||
---
|
||||
|
||||
## 5. Architecture Patterns
|
||||
|
||||
### MemPalace (recommended for Hermes integration)
|
||||
- **Hierarchical**: Wings (scope: global/workspace) → Rooms (priority: explicit/implicit)
|
||||
- **Dual-store**: SQLite for canonical data, ChromaDB for vector search
|
||||
- **Verbatim storage**: No LLM extraction, raw conversation storage
|
||||
- **Explicit-first ranking**: User instructions always surface above auto-extracted context
|
||||
- **Workspace isolation**: Memories scoped per project
|
||||
|
||||
### Mem0 (graph-enhanced)
|
||||
- **Two-phase pipeline**: Extraction → Update
|
||||
- **LLM-driven**: Uses LLM to extract candidate memories, decide ADD/UPDATE/DELETE/NOOP
|
||||
- **Graph variant (Mem0ᵍ)**: Entity extraction → relationship graph → conflict detection → temporal updates
|
||||
- **Multi-level**: User, Session, Agent state
|
||||
|
||||
### Letta/MemGPT (OS-inspired)
|
||||
- **Memory tiers**: Core (in-context), Recall (searchable), Archival (deep storage)
|
||||
- **Self-editing**: Agent manages its own memory via function calls
|
||||
- **Interrupts**: Control flow between agent and user
|
||||
|
||||
### Zep (knowledge graph)
|
||||
- **Temporal knowledge graph**: Facts have valid_at/invalid_at timestamps
|
||||
- **Graph RAG**: Relationship-aware retrieval
|
||||
- **Powered by Graphiti**: Open-source temporal KG framework
|
||||
|
||||
---
|
||||
|
||||
## 6. Integration Patterns for Hermes
|
||||
|
||||
### Current Hermes Memory (memory_tool.py)
|
||||
- File-backed: MEMORY.md + USER.md
|
||||
- Delimiter-based entries (§)
|
||||
- Frozen snapshot in system prompt
|
||||
- No semantic search
|
||||
|
||||
### MemPalace Plugin (hermes_memorypalace)
|
||||
- Implements `MemoryProvider` ABC
|
||||
- ChromaDB + SQLite dual-store
|
||||
- Lifecycle hooks: initialize, system_prompt_block, prefetch, sync_turn
|
||||
- Tools: mempalace_remember_explicit, mempalace_store_implicit, mempalace_recall
|
||||
- Local embedding model (all-MiniLM-L6-v2)
|
||||
|
||||
### Recommended Integration Approach
|
||||
1. **Keep MEMORY.md/USER.md** as L0 (always-loaded baseline)
|
||||
2. **Add MemPalace** as L1 (semantic search layer)
|
||||
3. **Prefetch on each turn**: Run vector search before response generation
|
||||
4. **Background sync**: Store conversation turns as implicit context
|
||||
5. **Workspace scoping**: Isolate memories per project
|
||||
|
||||
---
|
||||
|
||||
## 7. Critical Caveats
|
||||
|
||||
1. **Retrieval ≠ Answer accuracy**: Engram team showed R@5 of 98.4% (MemPalace) can yield only 17% correct answers when an LLM actually tries to answer. The retrieval-to-accuracy gap is the real bottleneck.
|
||||
|
||||
2. **MemPalace's 96.6% is retrieval only**: Not end-to-end QA accuracy. End-to-end numbers are much lower (~17-40% depending on question difficulty).
|
||||
|
||||
3. **AAAK compression is lossy**: 12.4pt regression. Use raw mode for accuracy-critical work.
|
||||
|
||||
4. **Mem0's LOCOMO numbers are on a different benchmark**: Not directly comparable to LongMemEval scores.
|
||||
|
||||
5. **Latency depends heavily on corpus size and hardware**: Local ChromaDB on M2 Ultra runs fast; older hardware may not meet <100ms targets.
|
||||
|
||||
---
|
||||
|
||||
## 8. Recommendations for Hermes MemPalace Integration
|
||||
|
||||
| Metric | Target | Achievable? | Approach |
|
||||
|--------|--------|-------------|----------|
|
||||
| Retrieval latency | <100ms | Yes | Local ChromaDB + small model, pre-indexed |
|
||||
| Retrieval accuracy (R@5) | >95% | Yes | Raw verbatim mode, no compression |
|
||||
| Token efficiency | <2000 tokens/convo | Yes | Selective retrieval, not full-context |
|
||||
| Workspace isolation | Per-project | Yes | Wing-based scoping |
|
||||
| Zero cloud dependency | 100% local | Yes | all-MiniLM-L6-v2 runs offline |
|
||||
|
||||
**Priority**: Integrate existing hermes_memorypalace plugin with raw mode. Defer AAAK compression. Focus on retrieval latency and explicit-first ranking.
|
||||
|
||||
---
|
||||
|
||||
## Sources
|
||||
|
||||
- Mem0 paper: arXiv:2504.19413
|
||||
- MemGPT paper: arXiv:2310.08560
|
||||
- MemPalace repo: github.com/MemPalace/mempalace
|
||||
- Engram benchmarks: github.com/199-biotechnologies/engram-2
|
||||
- Hermes MemPalace plugin: github.com/neilharding/hermes_memorypalace
|
||||
- LOCOMO benchmark results from mem0.ai/research
|
||||
- LongMemEval: huggingface.co/datasets/xiaowu0162/longmemeval-cleaned
|
||||
@@ -1,529 +0,0 @@
|
||||
# Multi-Agent Coordination SOTA Research Report
|
||||
## Fleet Knowledge Graph — Architecture Patterns & Integration Recommendations
|
||||
|
||||
**Date**: 2025-04-14
|
||||
**Scope**: Agent-to-agent communication, shared memory, task delegation, consensus protocols, conflict resolution
|
||||
**Frameworks Analyzed**: CrewAI, AutoGen, MetaGPT, ChatDev, CAMEL, LangGraph
|
||||
**Target Fleet**: Hermes (orchestrator), Timmy, Claude Code, Gemini, Kimi
|
||||
|
||||
---
|
||||
|
||||
## 1. EXECUTIVE SUMMARY
|
||||
|
||||
Six major multi-agent frameworks each solve coordination differently. The SOTA converges on **four core patterns**: role-based delegation with capability matching, shared state via publish-subscribe messaging, directed-graph task flows with conditional routing, and layered memory (short-term context + long-term knowledge graph). For our fleet, the optimal architecture combines **AutoGen's GraphFlow** (dag-based task routing), **CrewAI's hierarchical memory** (short-term RAG + long-term SQLite + entity memory), **MetaGPT's standardized output contracts** (typed task artifacts), and **CAMEL's role-playing delegation protocol** (inception-prompted agent negotiation).
|
||||
|
||||
---
|
||||
|
||||
## 2. FRAMEWORK-BY-FRAMEWORK ANALYSIS
|
||||
|
||||
### 2.1 CrewAI (v1.14.x) — Role-Based Crews with Hierarchical Orchestration
|
||||
|
||||
**Core Architecture:**
|
||||
- **Process modes**: `Process.sequential` (tasks execute in order), `Process.hierarchical` (manager agent delegates to workers)
|
||||
- **Agent delegation**: `allow_delegation=True` enables agents to call other agents as tools, selecting the best agent for subtasks
|
||||
- **Memory system**: Crew-level `memory=True` enables UnifiedMemory with:
|
||||
- **Short-term**: RAG-backed (embeddings → vector store) for recent task context
|
||||
- **Long-term**: SQLite-backed for persistent task outcomes
|
||||
- **Entity memory**: Tracks entities (people, companies, concepts) across tasks
|
||||
- **User memory**: Per-user preference tracking
|
||||
- **Embedder**: Configurable (OpenAI, Cohere, Jina, local ONNX, etc.)
|
||||
- **Knowledge sources**: `knowledge_sources=[StringKnowledgeSource(...)]` for RAG-grounded context per agent or crew
|
||||
- **Flows**: `@start`, `@listen`, `@router` decorators for DAG orchestration across crews. `or_()` and `and_()` combinators for conditional triggers
|
||||
- **Callbacks**: `before_kickoff_callbacks`, `after_kickoff_callbacks`, `step_callback`, `task_callback`
|
||||
|
||||
**Key Patterns for Fleet:**
|
||||
- **Delegation-as-tool**: Agents can invoke other agents by role → our fleet agents could expose themselves as callable tools to each other
|
||||
- **Sequential handoff**: Task output from Agent A feeds directly as input to Agent B → pipeline pattern
|
||||
- **Hierarchical manager**: A manager LLM decomposes goals and assigns tasks → matches Hermes-as-orchestrator pattern
|
||||
- **Shared memory with scopes**: Crew-level memory visible to all agents, agent-level memory private
|
||||
|
||||
**Limitations:**
|
||||
- No native inter-process communication — all agents live in the same process
|
||||
- Manager/hierarchical mode requires an LLM call just for delegation decisions (extra latency/cost)
|
||||
- No built-in conflict resolution for concurrent writes to shared memory
|
||||
|
||||
### 2.2 AutoGen (v0.7.5) — Flexible Team Topologies with Graph-Based Coordination
|
||||
|
||||
**Core Architecture:**
|
||||
- **Team topologies** (5 types):
|
||||
- `RoundRobinGroupChat`: Sequential turn-taking, each agent speaks in order
|
||||
- `SelectorGroupChat`: LLM selects next speaker based on conversation context (`selector_prompt` template)
|
||||
- `MagenticOneGroupChat`: Orchestrator-driven (from Microsoft's Magentic-One paper), with stall detection and replanning
|
||||
- `Swarm`: Handoff-based — current speaker explicitly hands off to target via `HandoffMessage`
|
||||
- `GraphFlow`: **Directed acyclic graph** execution — agents execute based on DAG edges with conditional routing, fan-out, join patterns, and loop support
|
||||
- **Agent types**:
|
||||
- `AssistantAgent`: Standard LLM agent with tools
|
||||
- `CodeExecutorAgent`: Runs code in isolated environments
|
||||
- `UserProxyAgent`: Human-in-the-loop proxy
|
||||
- `SocietyOfMindAgent`: **Meta-agent** — wraps an inner team and summarizes their output as a single response (composable nesting)
|
||||
- `MessageFilterAgent`: Filters/transforms messages between agents
|
||||
- **Termination conditions**: `TextMentionTermination`, `MaxMessageTermination`, `SourceMatchTermination`, `HandoffTermination`, `TimeoutTermination`, `FunctionCallTermination`, `TokenUsageTermination`, `ExternalTermination` (programmatic control), `FunctionalTermination` (custom function)
|
||||
- **Memory**: `Sequence[Memory]` on agents — per-agent memory stores (RAG-backed)
|
||||
- **GraphFlow specifics**:
|
||||
- `DiGraphBuilder.add_node(agent, activation='all'|'any')`
|
||||
- `DiGraphBuilder.add_edge(source, target, condition=callable|str)` — conditional edges
|
||||
- `set_entry_point(agent)` — defines graph root
|
||||
- Supports: sequential, parallel fan-out, conditional branching, join patterns, loops with exit conditions
|
||||
- Node activation: `'all'` (wait for all incoming edges) vs `'any'` (trigger on first)
|
||||
|
||||
**Key Patterns for Fleet:**
|
||||
- **GraphFlow is the SOTA pattern** for multi-agent orchestration — DAG-based, conditional, supports parallel branches and joins
|
||||
- **SocietyOfMindAgent** enables hierarchical composition — a team of agents wrapped as a single agent that can participate in a larger team
|
||||
- **Selector pattern** (LLM picks next speaker) is elegant for heterogeneous fleets where capability matching matters
|
||||
- **Swarm handoff** maps directly to our ACP handoff mechanism
|
||||
- **Termination conditions** are composable — `termination_a | termination_b` (OR), `termination_a & termination_b` (AND)
|
||||
|
||||
### 2.3 MetaGPT — SOP-Driven Multi-Agent with Standardized Artifacts
|
||||
|
||||
**Core Architecture (from paper + codebase):**
|
||||
- **SOP (Standard Operating Procedure)**: Tasks decomposed into phases, each with specific roles and required artifacts
|
||||
- **Role-based agents**: Each role has `name`, `profile`, `goal`, `constraints`, `actions` (specific output types)
|
||||
- **Shared Message Environment**: All agents publish to and subscribe from a shared `Environment` object
|
||||
- **Publish-Subscribe**: Agents subscribe to message types/topics they care about, ignore others
|
||||
- **Standardized Output**: Each action produces a typed artifact (e.g., `SystemDesign`, `Task`, `Code`) — structured contracts between agents
|
||||
- **Memory**: `Memory` class stores all messages, retrievable by relevance. `Role.react()` calls `observe()` then `act()` based on observed messages
|
||||
- **Communication**: Asynchronous message passing — agents publish results to environment, interested agents react
|
||||
|
||||
**Key Patterns for Fleet:**
|
||||
- **Typed artifact contracts**: Each agent publishes structured outputs (not free-form text) → reduces ambiguity in inter-agent communication
|
||||
- **Pub-sub messaging**: Decouples sender from receiver — agents don't need to know about each other, just subscribe to relevant topics
|
||||
- **SOP-driven phases**: Define workflow phases (e.g., "analysis" → "implementation" → "review") with specific agents per phase
|
||||
- **Environment as blackboard**: Shared state all agents can read/write — classic blackboard architecture for AI systems
|
||||
|
||||
### 2.4 ChatDev — Chat-Chain Architecture for Software Development
|
||||
|
||||
**Core Architecture:**
|
||||
- **Chat Chain**: Sequential phases (design → code → test → document), each phase is a two-agent conversation
|
||||
- **Role pairing**: Each phase pairs complementary roles (e.g., CEO ↔ CTO, Programmer ↔ Reviewer)
|
||||
- **Communicative dehallucination**: Agents communicate through structured prompts that constrain outputs to prevent hallucination
|
||||
- **Phase transitions**: Phase completion triggers next phase, output from one phase seeds the next
|
||||
- **Memory**: Conversation history within each phase; phase outputs stored as artifacts
|
||||
|
||||
**Key Patterns for Fleet:**
|
||||
- **Phase-gated pipeline**: Each phase must produce a specific artifact type before proceeding
|
||||
- **Complementary role pairing**: Pair agents with opposing perspectives (creator ↔ reviewer) for higher quality
|
||||
- **Communicative protocols**: Structured conversation templates reduce free-form ambiguity
|
||||
|
||||
### 2.5 CAMEL — Role-Playing Autonomous Multi-Agent Communication
|
||||
|
||||
**Core Architecture:**
|
||||
- **RolePlaying society**: Two agents (assistant + user) collaborate with inception prompting
|
||||
- **Task specification**: `with_task_specify=True` uses a task-specify agent to refine the initial prompt into a concrete task
|
||||
- **Task planning**: `with_task_planner=True` adds a planning agent that decomposes the task
|
||||
- **Critic-in-the-loop**: `with_critic_in_the_loop=True` adds a critic agent that evaluates and approves/rejects
|
||||
- **Inception prompting**: Both agents receive system messages that establish their roles, goals, and communication protocol
|
||||
- **Termination**: Agents signal completion via specific tokens or phrases
|
||||
|
||||
**Key Patterns for Fleet:**
|
||||
- **Inception prompting**: Agents negotiate a shared understanding of the task before executing
|
||||
- **Critic-in-the-loop**: A dedicated reviewer agent validates outputs before acceptance
|
||||
- **Role-playing protocol**: Structured back-and-forth between complementary agents
|
||||
- **Task refinement chain**: Raw goal → specified task → planned subtasks → executed
|
||||
|
||||
### 2.6 LangGraph — Graph-Based Stateful Agent Workflows
|
||||
|
||||
**Core Architecture (from documentation/paper):**
|
||||
- **StateGraph**: Typed state schema shared across all nodes (agents/tools)
|
||||
- **Nodes**: Functions (agents, tools, transforms) that read/modify shared state
|
||||
- **Edges**: Conditional routing based on state or agent decisions
|
||||
- **Checkpointer**: Persistent state snapshots (SQLite, Postgres, in-memory) — enables pause/resume
|
||||
- **Human-in-the-loop**: Interrupt nodes for approval, edit, review
|
||||
- **Streaming**: Real-time node-by-node or token-by-token output
|
||||
- **Subgraphs**: Composable graph composition — subgraph as a node in parent graph
|
||||
- **State channels**: Multiple state namespaces for different aspects of the workflow
|
||||
|
||||
**Key Patterns for Fleet:**
|
||||
- **Shared typed state**: All agents operate on a well-defined state schema — eliminates ambiguity about what data each agent sees
|
||||
- **Checkpoint persistence**: Workflow can be paused, resumed, forked — critical for long-running agent tasks
|
||||
- **Conditional edges**: Route based on agent output type or state values
|
||||
- **Subgraph composition**: Each fleet agent could be a subgraph, composed into larger workflows
|
||||
- **Command-based routing**: Nodes return `Command(goto="node_name", update={...})` for explicit control flow
|
||||
|
||||
---
|
||||
|
||||
## 3. CROSS-CUTTING PATTERNS ANALYSIS
|
||||
|
||||
### 3.1 Agent-to-Agent Communication
|
||||
|
||||
| Pattern | Frameworks | Latency | Decoupling | Structured |
|
||||
|---------|-----------|---------|------------|------------|
|
||||
| Direct tool invocation | CrewAI, AutoGen | Low | Low | Medium |
|
||||
| Pub-sub messaging | MetaGPT | Medium | High | High |
|
||||
| Handoff messages | AutoGen Swarm | Low | Medium | High |
|
||||
| Chat-chain conversations | ChatDev, CAMEL | High | Low | Medium |
|
||||
| Shared state graph | LangGraph, AutoGen GraphFlow | Low | Medium | High |
|
||||
|
||||
**Recommendation**: Use **handoff + shared state** pattern. Agents communicate via typed handoff messages (what task was completed, what artifacts produced) while sharing a typed state object (knowledge graph entries).
|
||||
|
||||
### 3.2 Shared Memory Patterns
|
||||
|
||||
| Pattern | Frameworks | Persistence | Scope | Query Method |
|
||||
|---------|-----------|-------------|-------|-------------|
|
||||
| RAG-backed short-term | CrewAI, AutoGen | Session | Crew/Team | Embedding similarity |
|
||||
| SQLite long-term | CrewAI | Cross-session | Global | SQL + embeddings |
|
||||
| Entity memory | CrewAI | Cross-session | Global | Entity lookup |
|
||||
| Message store | MetaGPT | Session | Environment | Relevance search |
|
||||
| Typed state channels | LangGraph | Checkpointed | Graph | State field access |
|
||||
| Frozen snapshot | Hermes (current) | Cross-session | Agent | System prompt injection |
|
||||
|
||||
**Recommendation**: Implement **three-tier memory**:
|
||||
1. **Session state** (LangGraph-style typed state graph) — shared within a workflow
|
||||
2. **Fleet knowledge graph** (new) — structured triples/relations between entities, projects, decisions
|
||||
3. **Agent-local memory** (existing MEMORY.md pattern) — per-agent persistent notes
|
||||
|
||||
### 3.3 Task Delegation
|
||||
|
||||
| Pattern | Frameworks | Decision Maker | Granularity |
|
||||
|---------|-----------|---------------|-------------|
|
||||
| Manager decomposition | CrewAI hierarchical | Manager LLM | Task-level |
|
||||
| Delegation-as-tool | CrewAI | Self-selecting | Subtask |
|
||||
| Selector-based | AutoGen SelectorGroupChat | LLM selector | Turn-level |
|
||||
| Handoff-based | AutoGen Swarm | Current agent | Message-level |
|
||||
| Graph-defined | AutoGen GraphFlow, LangGraph | Pre-defined DAG | Node-level |
|
||||
| SOP-based | MetaGPT | Phase rules | Phase-level |
|
||||
|
||||
**Recommendation**: Use **hybrid delegation**:
|
||||
- **Graph-based** for known workflows (CI/CD, code review pipelines) — pre-defined DAGs
|
||||
- **Selector-based** for exploratory tasks (research, debugging) — LLM picks best agent
|
||||
- **Handoff-based** for agent-initiated delegation — current agent explicitly hands off
|
||||
|
||||
### 3.4 Consensus Protocols
|
||||
|
||||
No framework implements true consensus protocols (Raft, PBFT). Instead:
|
||||
|
||||
| Pattern | What It Solves |
|
||||
|---------|---------------|
|
||||
| Critic-in-the-loop (CAMEL) | Single reviewer approves/rejects |
|
||||
| Aggregator synthesis (MoA/Mixture-of-Agents) | Multiple responses synthesized into one |
|
||||
| Hierarchical manager (CrewAI) | Manager makes final decision |
|
||||
| MagenticOne orchestrator (AutoGen) | Orchestrator plans and replans |
|
||||
|
||||
**Recommendation for Fleet**: Implement **weighted ensemble consensus**:
|
||||
1. Multiple agents produce independent solutions
|
||||
2. A synthesis agent aggregates (like MoA pattern already in Hermes)
|
||||
3. For critical decisions, require 2-of-3 agreement from designated expert agents
|
||||
|
||||
### 3.5 Conflict Resolution
|
||||
|
||||
| Conflict Type | Resolution Strategy |
|
||||
|--------------|-------------------|
|
||||
| Concurrent memory writes | File locking + atomic rename (Hermes already does this) |
|
||||
| Conflicting agent outputs | Critic/validator agent evaluates both |
|
||||
| Task assignment conflicts | Single orchestrator (Hermes) assigns, no self-assignment |
|
||||
| State graph race conditions | LangGraph checkpoint + merge strategies |
|
||||
|
||||
**Recommendation**:
|
||||
- **Write conflicts**: Atomic operations with optimistic locking (existing pattern)
|
||||
- **Output conflicts**: Dedicate one agent as "judge" for each workflow
|
||||
- **Assignment conflicts**: Centralized orchestrator (Hermes) — no agent self-delegation to other fleet members without approval
|
||||
|
||||
---
|
||||
|
||||
## 4. FLEET ARCHITECTURE RECOMMENDATION
|
||||
|
||||
### 4.1 Proposed Architecture: "Fleet Knowledge Graph" (FKG)
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────┐
|
||||
│ FLEET KNOWLEDGE GRAPH │
|
||||
│ │
|
||||
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
|
||||
│ │ Entities │ │ Relations│ │ Artifacts│ │ Decisions│ │
|
||||
│ │ (nodes) │──│ (edges) │──│ (typed) │──│ (history)│ │
|
||||
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
|
||||
│ │
|
||||
│ Storage: SQLite + FTS5 (existing hermes_state.py pattern) │
|
||||
│ Schema: RDF-lite triples with typed properties │
|
||||
└─────────────────────┬───────────────────────────────────────┘
|
||||
│
|
||||
┌───────────┼───────────┐
|
||||
│ │ │
|
||||
┌────▼────┐ ┌────▼────┐ ┌───▼─────┐
|
||||
│ Session │ │ Agent │ │ Workflow│
|
||||
│ State │ │ Memory │ │ History │
|
||||
│ (shared)│ │ (local) │ │ (audit) │
|
||||
└─────────┘ └─────────┘ └─────────┘
|
||||
```
|
||||
|
||||
### 4.2 Fleet Member Roles
|
||||
|
||||
| Agent | Role | Strengths | Delegation Style |
|
||||
|-------|------|-----------|-----------------|
|
||||
| **Hermes** | Orchestrator | Planning, tool use, multi-platform | Delegator (spawns others) |
|
||||
| **Claude Code** | Code specialist | Deep code reasoning, ACP integration | Executor (receives tasks) |
|
||||
| **Gemini** | Multimodal analyst | Vision, large context, fast | Executor (receives tasks) |
|
||||
| **Kimi** | Coding assistant | Code generation, long context | Executor (receives tasks) |
|
||||
| **Timmy** | (Details TBD) | TBD | Executor (receives tasks) |
|
||||
|
||||
### 4.3 Communication Protocol
|
||||
|
||||
**Inter-Agent Message Format** (inspired by MetaGPT's typed artifacts):
|
||||
|
||||
```json
|
||||
{
|
||||
"message_type": "task_request|task_response|handoff|knowledge_update|conflict",
|
||||
"source_agent": "hermes",
|
||||
"target_agent": "claude_code",
|
||||
"task_id": "uuid",
|
||||
"parent_task_id": "uuid|null",
|
||||
"payload": {
|
||||
"goal": "...",
|
||||
"context": "...",
|
||||
"artifacts": [{"type": "code", "path": "..."}, {"type": "analysis", "content": "..."}],
|
||||
"constraints": ["..."],
|
||||
"priority": "high|medium|low"
|
||||
},
|
||||
"knowledge_graph_refs": ["entity:project-x", "relation:depends-on"],
|
||||
"timestamp": "ISO8601",
|
||||
"signature": "hmac-or-uuid"
|
||||
}
|
||||
```
|
||||
|
||||
### 4.4 Task Flow Patterns
|
||||
|
||||
**Pattern 1: Pipeline (ChatDev-style)**
|
||||
```
|
||||
Hermes → [Analyze] → Claude Code → [Implement] → Gemini → [Review] → Hermes → [Deliver]
|
||||
```
|
||||
|
||||
**Pattern 2: Fan-out/Fan-in (AutoGen GraphFlow-style)**
|
||||
```
|
||||
┌→ Claude Code (code) ──┐
|
||||
Hermes ──┼→ Gemini (analysis) ───┼→ Hermes (synthesize)
|
||||
└→ Kimi (docs) ─────────┘
|
||||
```
|
||||
|
||||
**Pattern 3: Debate (CAMEL-style)**
|
||||
```
|
||||
Claude Code (proposal) ↔ Gemini (critic) → Hermes (judge)
|
||||
```
|
||||
|
||||
**Pattern 4: Selector (AutoGen SelectorGroupChat)**
|
||||
```
|
||||
Hermes (orchestrator) → LLM selects best agent → Agent executes → Result → Repeat
|
||||
```
|
||||
|
||||
### 4.5 Knowledge Graph Schema
|
||||
|
||||
```sql
|
||||
-- Core entities
|
||||
CREATE TABLE fkg_entities (
|
||||
id TEXT PRIMARY KEY,
|
||||
entity_type TEXT NOT NULL, -- 'project', 'file', 'agent', 'task', 'concept', 'decision'
|
||||
name TEXT NOT NULL,
|
||||
properties JSON, -- Flexible typed properties
|
||||
created_by TEXT, -- Agent that created this
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
-- Relations between entities
|
||||
CREATE TABLE fkg_relations (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
source_entity TEXT REFERENCES fkg_entities(id),
|
||||
target_entity TEXT REFERENCES fkg_entities(id),
|
||||
relation_type TEXT NOT NULL, -- 'depends-on', 'created-by', 'reviewed-by', 'part-of', 'conflicts-with'
|
||||
properties JSON,
|
||||
confidence REAL DEFAULT 1.0,
|
||||
created_by TEXT,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
);
|
||||
|
||||
-- Task execution history
|
||||
CREATE TABLE fkg_task_history (
|
||||
task_id TEXT PRIMARY KEY,
|
||||
parent_task_id TEXT,
|
||||
goal TEXT,
|
||||
assigned_agent TEXT,
|
||||
status TEXT, -- 'pending', 'running', 'completed', 'failed', 'conflict'
|
||||
result_summary TEXT,
|
||||
artifacts JSON, -- List of produced artifacts
|
||||
knowledge_refs JSON, -- Entities/relations this task touched
|
||||
started_at TIMESTAMP,
|
||||
completed_at TIMESTAMP
|
||||
);
|
||||
|
||||
-- Conflict tracking
|
||||
CREATE TABLE fkg_conflicts (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
entity_id TEXT REFERENCES fkg_entities(id),
|
||||
conflict_type TEXT, -- 'concurrent_write', 'contradictory_output', 'resource_contention'
|
||||
agent_a TEXT,
|
||||
agent_b TEXT,
|
||||
resolution TEXT,
|
||||
resolved_by TEXT,
|
||||
resolved_at TIMESTAMP
|
||||
);
|
||||
|
||||
-- Full-text search across everything
|
||||
CREATE VIRTUAL TABLE fkg_search USING fts5(
|
||||
entity_name, entity_type, properties_text,
|
||||
content='fkg_entities', content_rowid='rowid'
|
||||
);
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 5. INTEGRATION RECOMMENDATIONS
|
||||
|
||||
### 5.1 Phase 1: Foundation (Immediate — 1-2 weeks)
|
||||
|
||||
1. **Implement FKG SQLite database** at `~/.hermes/fleet_knowledge.db`
|
||||
- Extend existing `hermes_state.py` pattern (already uses SQLite + FTS5)
|
||||
- Add schema from §4.5
|
||||
- Create `tools/fleet_knowledge_tool.py` with CRUD operations
|
||||
|
||||
2. **Create fleet agent registry** in `agent/fleet_registry.py`
|
||||
- Map agent names → transport (ACP, API, subprocess)
|
||||
- Store capabilities, specializations, availability status
|
||||
- Integrate with existing `acp_adapter/` and `delegate_tool.py`
|
||||
|
||||
3. **Define message protocol** as typed Python dataclasses
|
||||
- `FleetMessage`, `TaskRequest`, `TaskResponse`, `KnowledgeUpdate`
|
||||
- Validation via Pydantic (already a CrewAI/dependency)
|
||||
|
||||
### 5.2 Phase 2: Communication Layer (2-4 weeks)
|
||||
|
||||
4. **Build fleet delegation on top of existing `delegate_tool.py`**
|
||||
- Extend to support cross-agent delegation (not just child subagents)
|
||||
- ACP transport for Claude Code (already supported via `acp_command`)
|
||||
- OpenRouter/OpenAI-compatible API for Gemini, Kimi
|
||||
- Reuse existing credential pool and provider resolution
|
||||
|
||||
5. **Implement selector-based task routing** (AutoGen SelectorGroupChat pattern)
|
||||
- LLM-based agent selection based on task description + agent capabilities
|
||||
- Hermes acts as the selector/orchestrator
|
||||
- Simple heuristic fallback (code → Claude Code, vision → Gemini, etc.)
|
||||
|
||||
6. **Add typed artifact contracts** (MetaGPT pattern)
|
||||
- Each task produces a typed artifact (code, analysis, docs, review)
|
||||
- Artifacts stored in FKG with entity relations
|
||||
- Downstream agents consume typed inputs, not free-form text
|
||||
|
||||
### 5.3 Phase 3: Advanced Patterns (4-6 weeks)
|
||||
|
||||
7. **Implement workflow DAGs** (AutoGen GraphFlow pattern)
|
||||
- Pre-defined workflows as directed graphs (code review pipeline, research pipeline)
|
||||
- Conditional routing based on artifact types or agent decisions
|
||||
- Fan-out/fan-in for parallel execution across fleet agents
|
||||
|
||||
8. **Add conflict resolution**
|
||||
- Detect concurrent writes to same FKG entities
|
||||
- Critic agent validates contradictory outputs
|
||||
- Track resolution history for learning
|
||||
|
||||
9. **Build consensus mechanism** for critical decisions
|
||||
- Weighted voting based on agent expertise
|
||||
- MoA-style aggregation (already implemented in `mixture_of_agents_tool.py`)
|
||||
- Escalation to human for irreconcilable conflicts
|
||||
|
||||
### 5.4 Phase 4: Intelligence (6-8 weeks)
|
||||
|
||||
10. **Learning from delegation history**
|
||||
- Track which agent performs best for which task types
|
||||
- Adjust routing weights over time
|
||||
- RL-style improvement of delegation decisions
|
||||
|
||||
11. **Fleet-level memory evolution**
|
||||
- Entities and relations in FKG become the "shared brain"
|
||||
- Agents contribute knowledge as they work
|
||||
- Cross-agent knowledge synthesis (one agent's discovery benefits all)
|
||||
|
||||
---
|
||||
|
||||
## 6. BENCHMARKS & PERFORMANCE CONSIDERATIONS
|
||||
|
||||
### 6.1 Latency Estimates
|
||||
|
||||
| Pattern | Overhead | Notes |
|
||||
|---------|----------|-------|
|
||||
| Direct delegation (current) | ~30s per subagent | Spawn + run + collect |
|
||||
| ACP transport (Claude Code) | ~2-5s connection + task time | Subprocess handshake |
|
||||
| API-based (Gemini/Kimi) | ~1-2s + task time | Standard HTTP |
|
||||
| Selector routing | +1 LLM call (~2-5s) | For agent selection |
|
||||
| GraphFlow routing | +state overhead (~100ms) | Pre-defined, no LLM call |
|
||||
| FKG query | ~1-5ms | SQLite indexed query |
|
||||
| MoA consensus | ~15-30s (4 parallel + 1 aggregator) | Already implemented |
|
||||
|
||||
### 6.2 Recommended Configuration
|
||||
|
||||
```yaml
|
||||
# Fleet coordination config (add to config.yaml)
|
||||
fleet:
|
||||
enabled: true
|
||||
knowledge_db: "~/.hermes/fleet_knowledge.db"
|
||||
|
||||
agents:
|
||||
hermes:
|
||||
role: orchestrator
|
||||
transport: local
|
||||
claude_code:
|
||||
role: code_specialist
|
||||
transport: acp
|
||||
acp_command: "claude"
|
||||
acp_args: ["--acp", "--stdio"]
|
||||
capabilities: ["code", "debugging", "architecture"]
|
||||
gemini:
|
||||
role: multimodal_analyst
|
||||
transport: api
|
||||
provider: openrouter
|
||||
model: "google/gemini-3-pro-preview"
|
||||
capabilities: ["vision", "analysis", "large_context"]
|
||||
kimi:
|
||||
role: coding_assistant
|
||||
transport: api
|
||||
provider: kimi-coding
|
||||
capabilities: ["code", "long_context"]
|
||||
|
||||
delegation:
|
||||
strategy: selector # selector | pipeline | graph
|
||||
max_concurrent: 3
|
||||
timeout_seconds: 300
|
||||
|
||||
consensus:
|
||||
enabled: true
|
||||
min_agreement: 2 # 2-of-3 for critical decisions
|
||||
escalation_agent: hermes
|
||||
|
||||
knowledge:
|
||||
auto_extract: true # Extract entities from task results
|
||||
relation_confidence_threshold: 0.7
|
||||
search_provider: fts5 # fts5 | vector | hybrid
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 7. EXISTING HERMES INFRASTRUCTURE TO LEVERAGE
|
||||
|
||||
| Component | What It Provides | Reuse For |
|
||||
|-----------|-----------------|-----------|
|
||||
| `delegate_tool.py` | Subagent spawning, isolated contexts | Fleet delegation transport |
|
||||
| `mixture_of_agents_tool.py` | Multi-model consensus/aggregation | Fleet consensus protocol |
|
||||
| `memory_tool.py` | Bounded persistent memory with atomic writes | Pattern for FKG writes |
|
||||
| `acp_adapter/` | ACP server for IDE integration | Claude Code transport |
|
||||
| `hermes_state.py` | SQLite + FTS5 session store | FKG database foundation |
|
||||
| `tools/registry.py` | Central tool registry | Fleet knowledge tool registration |
|
||||
| `agent/credential_pool.py` | Credential rotation | Multi-provider auth |
|
||||
| `hermes_cli/runtime_provider.py` | Provider resolution | Fleet agent connection |
|
||||
|
||||
---
|
||||
|
||||
## 8. KEY TAKEAWAYS
|
||||
|
||||
1. **GraphFlow (AutoGen) is the SOTA orchestration pattern** — DAG-based execution with conditional routing beats sequential chains and pure LLM-delegation for structured workflows
|
||||
|
||||
2. **Three-tier memory is essential** — Session state (volatile), knowledge graph (persistent structured), agent memory (persistent per-agent notes)
|
||||
|
||||
3. **Typed artifacts over free-form text** — MetaGPT's approach of standardized output contracts dramatically reduces inter-agent ambiguity
|
||||
|
||||
4. **Hybrid delegation beats any single pattern** — Pre-defined DAGs for known workflows, LLM selection for exploratory tasks, handoff for agent-initiated delegation
|
||||
|
||||
5. **Critic-in-the-loop is the practical consensus mechanism** — Don't implement Byzantine fault tolerance; a dedicated reviewer agent with clear acceptance criteria is sufficient
|
||||
|
||||
6. **Our existing infrastructure covers ~60% of what's needed** — delegate_tool, MoA, memory_tool, ACP adapter, and SQLite patterns are solid foundations to build on
|
||||
|
||||
7. **The fleet knowledge graph is the differentiator** — No existing framework has a proper shared knowledge graph that persists across agent interactions. Building this gives us a unique advantage.
|
||||
|
||||
---
|
||||
|
||||
*Report generated from analysis of CrewAI v1.14.1, AutoGen v0.7.5, CAMEL v0.2.90 (installed locally), plus MetaGPT, ChatDev, and LangGraph documentation.*
|
||||
@@ -1,301 +0,0 @@
|
||||
# Research Report: R@5 vs End-to-End Accuracy Gap
|
||||
|
||||
## Executive Summary
|
||||
|
||||
The gap between retrieval recall (R@5) and end-to-end answer accuracy is a **fundamental bottleneck** in RAG systems, not merely an engineering problem. MemPalace's finding of 98.4% R@5 but only 17% correct answers (81-point gap) represents an extreme but not unusual case of this phenomenon. Academic research confirms this pattern: even with *oracle retrieval* (guaranteed correct documents), models below 7B parameters fail to extract correct answers 85-100% of the time on questions they cannot answer alone.
|
||||
|
||||
---
|
||||
|
||||
## 1. WHY Does Retrieval Succeed but Answering Fail?
|
||||
|
||||
### 1.1 The Fundamental Utilization Bottleneck
|
||||
|
||||
**Key Finding:** The gap is primarily a *reader/LLM utilization problem*, not a retrieval problem.
|
||||
|
||||
**Source:** "Can Small Language Models Use What They Retrieve?" (Pandey, 2026 - arXiv:2603.11513)
|
||||
|
||||
This study evaluated five model sizes (360M to 8B) across three architecture families under four retrieval conditions (no retrieval, BM25, dense, and oracle). Key findings:
|
||||
|
||||
- Even with **oracle retrieval** (guaranteed correct answer in context), models of 7B or smaller fail to extract the correct answer **85-100% of the time** on questions they cannot answer alone
|
||||
- Adding retrieval context **destroys 42-100% of answers** the model previously knew (distraction effect)
|
||||
- The dominant failure mode is **"irrelevant generation"** - the model ignores the provided context entirely
|
||||
- These patterns hold across multiple prompt templates and retrieval methods
|
||||
|
||||
### 1.2 Context Faithfulness Problem
|
||||
|
||||
**Key Finding:** LLMs often prioritize their parametric knowledge over retrieved context, creating a "knowledge conflict."
|
||||
|
||||
**Source:** "Context-faithful Prompting for Large Language Models" (Zhou et al., 2023 - arXiv:2303.11315)
|
||||
|
||||
- LLMs encode parametric knowledge that can cause them to overlook contextual cues
|
||||
- This leads to incorrect predictions in context-sensitive tasks
|
||||
- Faithfulness can be significantly improved with carefully designed prompting strategies
|
||||
|
||||
### 1.3 The Distraction Effect
|
||||
|
||||
**Key Finding:** Retrieved context can actually *hurt* performance by distracting the model from answers it already knows.
|
||||
|
||||
**Source:** "Can Small Language Models Use What They Retrieve?" (arXiv:2603.11513)
|
||||
|
||||
- When retrieval context is added (even good context), models lose 42-100% of previously correct answers
|
||||
- This suggests the model is "confused" by the presence of context rather than effectively utilizing it
|
||||
- The distraction is driven by the *presence* of context rather than its quality
|
||||
|
||||
### 1.4 Multi-Hop Reasoning Failures
|
||||
|
||||
**Key Finding:** Complex queries requiring synthesis from multiple documents create cascading errors.
|
||||
|
||||
**Source:** "Tree of Reviews" (Li et al., 2024 - arXiv:2404.14464)
|
||||
|
||||
- Retrieved irrelevant paragraphs can mislead reasoning
|
||||
- An error in chain-of-thought structure leads to cascade of errors
|
||||
- Traditional chain methods are fragile to noise in retrieval
|
||||
|
||||
### 1.5 Similarity ≠ Utility
|
||||
|
||||
**Key Finding:** Cosine similarity between query and document doesn't guarantee the document will be *useful* for answering.
|
||||
|
||||
**Source:** "Similarity is Not All You Need: MetRag" (Gan et al., 2024 - arXiv:2405.19893)
|
||||
|
||||
- Existing RAG models use similarity as the bridge between queries and documents
|
||||
- Relying solely on similarity sometimes degrades RAG performance
|
||||
- Utility-oriented retrieval (what's actually helpful for answering) differs from similarity-oriented retrieval
|
||||
|
||||
### 1.6 Query Complexity Levels
|
||||
|
||||
**Source:** "Retrieval Augmented Generation (RAG) and Beyond" (Zhao et al., 2024 - arXiv:2409.14924)
|
||||
|
||||
The survey identifies four levels of query complexity, each with different utilization challenges:
|
||||
|
||||
1. **Explicit fact queries** - Simple extraction (high utilization expected)
|
||||
2. **Implicit fact queries** - Require inference across documents (moderate utilization)
|
||||
3. **Interpretable rationale queries** - Require understanding domain logic (low utilization)
|
||||
4. **Hidden rationale queries** - Require deep synthesis (very low utilization)
|
||||
|
||||
The MemPalace crisis support domain likely involves levels 3-4, explaining the extreme gap.
|
||||
|
||||
---
|
||||
|
||||
## 2. Patterns That Bridge the Gap
|
||||
|
||||
### 2.1 Reader-Guided Reranking (RIDER)
|
||||
|
||||
**Effectiveness:** 10-20 absolute gains in top-1 retrieval accuracy, 1-4 EM gains
|
||||
|
||||
**Source:** "Rider: Reader-Guided Passage Reranking" (Mao et al., 2021 - arXiv:2101.00294)
|
||||
|
||||
**Pattern:** Use the reader's own predictions to rerank passages before final answer generation. This aligns retrieval with what the reader can actually use.
|
||||
|
||||
- Achieves 48.3 EM on Natural Questions with only 1,024 tokens (7.8 passages avg)
|
||||
- Outperforms state-of-the-art transformer-based supervised rerankers
|
||||
- No training required - uses reader's top predictions as signal
|
||||
|
||||
**Recommendation:** Implement reader-in-the-loop reranking to prioritize passages the LLM can actually utilize.
|
||||
|
||||
### 2.2 Context-Faithful Prompting
|
||||
|
||||
**Effectiveness:** Significant improvement in faithfulness to context
|
||||
|
||||
**Source:** "Context-faithful Prompting" (Zhou et al., 2023 - arXiv:2303.11315)
|
||||
|
||||
**Two most effective techniques:**
|
||||
|
||||
1. **Opinion-based prompts:** Reframe context as a narrator's statement and ask about the narrator's opinions
|
||||
- Example: Instead of "Answer based on: [context]", use "According to the following testimony: [context]. What does the narrator suggest about X?"
|
||||
|
||||
2. **Counterfactual demonstrations:** Use examples containing false facts to improve faithfulness
|
||||
- The model learns to prioritize context over parametric knowledge
|
||||
|
||||
**Recommendation:** Use opinion-based framing and counterfactual examples in crisis support prompts.
|
||||
|
||||
### 2.3 Retrieval-Augmented Thoughts (RAT)
|
||||
|
||||
**Effectiveness:** 13-43% relative improvement across tasks
|
||||
|
||||
**Source:** "RAT: Retrieval Augmented Thoughts" (Wang et al., 2024 - arXiv:2403.05313)
|
||||
|
||||
**Pattern:** Iteratively revise each chain-of-thought step with retrieved information relevant to:
|
||||
- The task query
|
||||
- The current thought step
|
||||
- Past thought steps
|
||||
|
||||
**Results:**
|
||||
- Code generation: +13.63%
|
||||
- Mathematical reasoning: +16.96%
|
||||
- Creative writing: +19.2%
|
||||
- Embodied task planning: +42.78%
|
||||
|
||||
**Recommendation:** Implement iterative CoT revision with retrieval at each step.
|
||||
|
||||
### 2.4 FAIR-RAG: Structured Evidence Assessment
|
||||
|
||||
**Effectiveness:** 8.3 absolute F1 improvement on HotpotQA
|
||||
|
||||
**Source:** "FAIR-RAG" (Asl et al., 2025 - arXiv:2510.22344)
|
||||
|
||||
**Pattern:** Transform RAG into a dynamic reasoning process with:
|
||||
1. Decompose query into checklist of required findings
|
||||
2. Audit aggregated evidence to identify confirmed facts AND explicit gaps
|
||||
3. Generate targeted sub-queries to fill gaps
|
||||
4. Repeat until evidence is sufficient
|
||||
|
||||
**Recommendation:** For crisis support, implement gap-aware evidence assessment before generating answers.
|
||||
|
||||
### 2.5 Two-Stage Retrieval with Marginal-Utility Reranking
|
||||
|
||||
**Source:** "Enhancing RAG with Two-Stage Retrieval" (George, 2025 - arXiv:2601.03258)
|
||||
|
||||
**Pattern:**
|
||||
- Stage 1: LLM-driven query expansion for high recall
|
||||
- Stage 2: Fast reranker (FlashRank) that dynamically selects optimal evidence subset under token budget
|
||||
- Utility modeled as: relevance + novelty + brevity + cross-encoder evidence
|
||||
|
||||
**Recommendation:** Use marginal-utility reranking to balance relevance, novelty, and token efficiency.
|
||||
|
||||
### 2.6 Multi-Layered Thoughts (MetRag)
|
||||
|
||||
**Source:** "Similarity is Not All You Need" (Gan et al., 2024 - arXiv:2405.19893)
|
||||
|
||||
**Pattern:** Three types of "thought" layers:
|
||||
1. **Similarity-oriented** - Standard retrieval
|
||||
2. **Utility-oriented** - Small utility model supervised by LLM
|
||||
3. **Compactness-oriented** - Task-adaptive summarization of retrieved documents
|
||||
|
||||
**Recommendation:** Add utility scoring and document summarization before LLM processing.
|
||||
|
||||
### 2.7 Retrieval Augmented Fine-Tuning (RAFT)
|
||||
|
||||
**Source:** "An Empirical Study of RAG with Chain-of-Thought" (Zhao et al., 2024 - arXiv:2407.15569)
|
||||
|
||||
**Pattern:** Combine chain-of-thought with supervised fine-tuning and RAG:
|
||||
- Model learns to extract relevant information from noisy contexts
|
||||
- Enhanced information extraction and logical reasoning
|
||||
- Works for both long-form and short-form QA
|
||||
|
||||
**Recommendation:** Fine-tune on domain-specific data with CoT examples to improve utilization.
|
||||
|
||||
### 2.8 Monte Carlo Tree Search for Thought Generation
|
||||
|
||||
**Source:** "Retrieval Augmented Thought Process" (Pouplin et al., 2024 - arXiv:2402.07812)
|
||||
|
||||
**Effectiveness:** 35% additional accuracy vs. in-context RAG
|
||||
|
||||
**Pattern:** Formulate thought generation as a multi-step decision process optimized with MCTS:
|
||||
- Learn a proxy reward function for cost-efficient inference
|
||||
- Robust to imperfect retrieval
|
||||
- Particularly effective for private/sensitive data domains
|
||||
|
||||
**Recommendation:** For crisis support, consider MCTS-based reasoning to handle imperfect retrieval gracefully.
|
||||
|
||||
---
|
||||
|
||||
## 3. Minimum Viable Retrieval for Crisis Support
|
||||
|
||||
### 3.1 Critical Insight: The Gap is LARGER for Complex Domains
|
||||
|
||||
Crisis support queries are likely at the "interpretable rationale" or "hidden rationale" level (from the RAG survey taxonomy). This means:
|
||||
- Simple fact extraction won't work
|
||||
- The model needs to understand nuanced guidance
|
||||
- Multi-document synthesis is often required
|
||||
- The stakes of incorrect answers are extremely high
|
||||
|
||||
### 3.2 Minimum Viable Components
|
||||
|
||||
Based on the research, the minimum viable RAG system for crisis support needs:
|
||||
|
||||
#### A. Retrieval Layer (Still Important)
|
||||
- **Hybrid retrieval** (dense + sparse) for broad coverage
|
||||
- **Reranking** with reader feedback (RIDER pattern)
|
||||
- **Distractor filtering** - removing passages that hurt performance
|
||||
|
||||
#### B. Context Processing Layer (The Key Gap)
|
||||
- **Context compression/summarization** - reduce noise
|
||||
- **Relevance scoring** per passage, not just retrieval
|
||||
- **Utility-oriented ranking** beyond similarity
|
||||
|
||||
#### C. Generation Layer (Most Critical)
|
||||
- **Explicit faithfulness instructions** in prompts
|
||||
- **Opinion-based framing** for context utilization
|
||||
- **Chain-of-thought with retrieval revision** (RAT pattern)
|
||||
- **Evidence gap detection** before answering
|
||||
|
||||
#### D. Safety Layer
|
||||
- **Answer verification** against retrieved context
|
||||
- **Confidence calibration** - knowing when NOT to answer
|
||||
- **Fallback to human escalation** when utilization fails
|
||||
|
||||
### 3.3 Recommended Architecture for Crisis Support
|
||||
|
||||
```
|
||||
Query → Hybrid Retrieval → Reader-Guided Reranking → Context Compression
|
||||
→ Faithfulness-Optimized Prompt → CoT with Retrieval Revision
|
||||
→ Evidence Verification → Answer/Hold/Escalate Decision
|
||||
```
|
||||
|
||||
### 3.4 Expected Performance
|
||||
|
||||
Based on the literature:
|
||||
- **Naive RAG:** R@5 ~95%, E2E accuracy ~15-25%
|
||||
- **With reranking:** E2E accuracy +1-4 points
|
||||
- **With faithfulness prompting:** E2E accuracy +5-15 points
|
||||
- **With iterative CoT+retrieval:** E2E accuracy +10-20 points
|
||||
- **Combined interventions:** E2E accuracy 50-70% (realistic target)
|
||||
|
||||
The gap can be reduced from 81 points to ~25-45 points with proper interventions.
|
||||
|
||||
---
|
||||
|
||||
## 4. Key Takeaways
|
||||
|
||||
### The Gap is Fundamental, Not Accidental
|
||||
- Even oracle retrieval doesn't guarantee correct answers
|
||||
- Smaller models (<7B) have a "utilization bottleneck"
|
||||
- The distraction effect means more context can hurt
|
||||
|
||||
### Bridging the Gap Requires Multi-Pronged Approach
|
||||
1. **Better retrieval alignment** (reader-guided, utility-oriented)
|
||||
2. **Better context processing** (compression, filtering, summarization)
|
||||
3. **Better prompting** (faithfulness, opinion-based, CoT)
|
||||
4. **Better verification** (evidence checking, gap detection)
|
||||
|
||||
### Crisis Support Specific Considerations
|
||||
- High stakes mean low tolerance for hallucination
|
||||
- Complex queries require multi-step reasoning
|
||||
- Domain expertise needs explicit encoding in prompts
|
||||
- Safety requires explicit hold/escalate mechanisms
|
||||
|
||||
---
|
||||
|
||||
## 5. References
|
||||
|
||||
1. Pandey, S. (2026). "Can Small Language Models Use What They Retrieve?" arXiv:2603.11513
|
||||
2. Zhou, W. et al. (2023). "Context-faithful Prompting for Large Language Models." arXiv:2303.11315
|
||||
3. Zhao, S. et al. (2024). "Retrieval Augmented Generation (RAG) and Beyond." arXiv:2409.14924
|
||||
4. Mao, Y. et al. (2021). "Rider: Reader-Guided Passage Reranking." arXiv:2101.00294
|
||||
5. George, S. (2025). "Enhancing RAG with Two-Stage Retrieval." arXiv:2601.03258
|
||||
6. Asl, M.A. et al. (2025). "FAIR-RAG: Faithful Adaptive Iterative Refinement." arXiv:2510.22344
|
||||
7. Zhao, Y. et al. (2024). "An Empirical Study of RAG with Chain-of-Thought." arXiv:2407.15569
|
||||
8. Wang, Z. et al. (2024). "RAT: Retrieval Augmented Thoughts." arXiv:2403.05313
|
||||
9. Gan, C. et al. (2024). "Similarity is Not All You Need: MetRag." arXiv:2405.19893
|
||||
10. Pouplin, T. et al. (2024). "Retrieval Augmented Thought Process." arXiv:2402.07812
|
||||
11. Li, J. et al. (2024). "Tree of Reviews." arXiv:2404.14464
|
||||
12. Tian, F. et al. (2026). "Predicting Retrieval Utility and Answer Quality in RAG." arXiv:2601.14546
|
||||
13. Qi, J. et al. (2025). "On the Consistency of Multilingual Context Utilization in RAG." arXiv:2504.00597
|
||||
|
||||
---
|
||||
|
||||
## 6. Limitations of This Research
|
||||
|
||||
1. **MemPalace/Engram team analysis not found** - The specific analysis that discovered the 17% figure was not located through academic search. This may be from internal reports, blog posts, or presentations not indexed in arXiv.
|
||||
|
||||
2. **Domain specificity** - Most RAG research focuses on general QA, not crisis support. The patterns may need adaptation for high-stakes, sensitive domains.
|
||||
|
||||
3. **Model size effects** - The utilization bottleneck is worse for smaller models. The MemPalace system's model size is unknown.
|
||||
|
||||
4. **Evaluation methodology** - Different papers use different metrics (EM, F1, accuracy), making direct comparison difficult.
|
||||
|
||||
---
|
||||
|
||||
*Research conducted: April 14, 2026*
|
||||
*Researcher: Hermes Agent (subagent)*
|
||||
*Task: Research Task #1 - R@5 vs End-to-End Accuracy Gap*
|
||||
@@ -1,208 +0,0 @@
|
||||
# Open-Source Text-to-Music-Video Pipeline Research
|
||||
|
||||
## Executive Summary
|
||||
|
||||
**The complete text-to-music-video pipeline does NOT exist as a single open-source tool.** The landscape consists of powerful individual components that must be manually stitched together. This is the gap our Video Forge can fill.
|
||||
|
||||
---
|
||||
|
||||
## 1. EXISTING OPEN-SOURCE PIPELINES
|
||||
|
||||
### Complete (but crude) Pipelines
|
||||
|
||||
| Project | Stars | Description | Status |
|
||||
|---------|-------|-------------|--------|
|
||||
| **MusicVideoMaker** | 3 | Stable Diffusion pipeline for music videos from lyrics. Uses Excel spreadsheet for lyrics+timing, generates key frames, smooths between them. | Proof-of-concept, Jupyter notebook, not production-ready |
|
||||
| **DuckTapeVideos** | 0 | Node-based AI pipeline for beat-synced music videos from lyrics | Minimal, early stage |
|
||||
| **song-video-gen** | 0 | Stable Diffusion lyrics-based generative AI pipeline | Fork/copy of above |
|
||||
| **TikTok-Lyric-Video-Pipeline** | 1 | Automated Python pipeline for TikTok lyric videos (10-15/day) | Focused on lyric overlay, not generative visuals |
|
||||
|
||||
**Verdict: Nothing production-ready exists as a complete pipeline.**
|
||||
|
||||
---
|
||||
|
||||
## 2. INDIVIDUAL COMPONENTS (What's Already Free)
|
||||
|
||||
### A. Music Generation (Suno Alternatives)
|
||||
|
||||
| Project | Stars | License | Self-Hostable | Quality |
|
||||
|---------|-------|---------|---------------|---------|
|
||||
| **YuE** | 6,144 | Apache-2.0 | ✅ Yes | Full-song generation with vocals, Suno-level quality |
|
||||
| **HeartMuLa** | 4,037 | Apache-2.0 | ✅ Yes | Most powerful open-source music model (2026), multilingual |
|
||||
| **ACE-Step 1.5 + UI** | 970 | MIT | ✅ Yes | Professional Spotify-like UI, full song gen, 4+ min with vocals |
|
||||
| **Facebook MusicGen** | ~45k downloads | MIT | ✅ Yes | Good quality, melody conditioning, well-documented |
|
||||
| **Riffusion** | ~6k stars | Apache-2.0 | ✅ Yes | Spectrogram-based, unique approach |
|
||||
|
||||
**Status: Suno is effectively "given away" for free. YuE and HeartMuLa are production-ready.**
|
||||
|
||||
### B. Image Generation (Per-Scene/Beat)
|
||||
|
||||
| Project | Downloads/Stars | License | Notes |
|
||||
|---------|-----------------|---------|-------|
|
||||
| **Stable Diffusion XL** | 1.9M downloads | CreativeML | Best quality, huge ecosystem |
|
||||
| **Stable Diffusion 1.5** | 1.6M downloads | CreativeML | Fast, lightweight |
|
||||
| **FLUX** | Emerging | Apache-2.0 | Newest, excellent quality |
|
||||
| **ComfyUI** | 60k+ stars | GPL-3.0 | Node-based pipeline editor, massive plugin ecosystem |
|
||||
|
||||
**Status: Image generation is completely "given away." SD XL + ComfyUI is production-grade.**
|
||||
|
||||
### C. Text-to-Video Generation
|
||||
|
||||
| Project | Stars | License | Capabilities |
|
||||
|---------|-------|---------|--------------|
|
||||
| **Wan2.1** | 15,815 | Apache-2.0 | State-of-the-art, text-to-video and image-to-video |
|
||||
| **CogVideoX** | 12,634 | Apache-2.0 | Text and image to video, good quality |
|
||||
| **HunyuanVideo** | 11,965 | Custom | Tencent's framework, high quality |
|
||||
| **Stable Video Diffusion** | 3k+ likes | Stability AI | Image-to-video, good for short clips |
|
||||
| **LTX-Video** | Growing | Apache-2.0 | Fast inference, good quality |
|
||||
|
||||
**Status: Text-to-video is rapidly being "given away." Wan2.1 is production-ready for short clips (4-6 seconds).**
|
||||
|
||||
### D. Video Composition & Assembly
|
||||
|
||||
| Project | Stars | License | Use Case |
|
||||
|---------|-------|---------|----------|
|
||||
| **Remotion** | 43,261 | Custom (SSPL) | Programmatic video with React, production-grade |
|
||||
| **MoviePy** | 12k+ stars | MIT | Python video editing, widely used |
|
||||
| **Mosaico** | 16 | MIT | Python video composition with AI integration |
|
||||
| **FFmpeg** | N/A | LGPL/GPL | The universal video tool |
|
||||
|
||||
**Status: Video composition tools are mature and free. Remotion is production-grade.**
|
||||
|
||||
### E. Lyrics/Text Processing
|
||||
|
||||
| Component | Status | Notes |
|
||||
|-----------|--------|-------|
|
||||
| **Lyrics-to-scene segmentation** | ❌ Missing | No good open-source tool for breaking lyrics into visual scenes |
|
||||
| **Beat detection** | ✅ Exists | Librosa, madmom, aubio - all free and mature |
|
||||
| **Text-to-prompt generation** | ✅ Exists | LLMs (Ollama, local models) can do this |
|
||||
| **LRC/SRT parsing** | ✅ Exists | Many libraries available |
|
||||
|
||||
---
|
||||
|
||||
## 3. WHAT'S BEEN "GIVEN AWAY" FOR FREE
|
||||
|
||||
### Fully Solved (Production-Ready, Self-Hostable)
|
||||
- ✅ **Music generation**: YuE, HeartMuLa, ACE-Step match Suno quality
|
||||
- ✅ **Image generation**: SD XL, FLUX - commercial quality
|
||||
- ✅ **Video composition**: FFmpeg, MoviePy, Remotion
|
||||
- ✅ **Beat/audio analysis**: Librosa, madmom
|
||||
- ✅ **Text-to-video (short clips)**: Wan2.1, CogVideoX
|
||||
- ✅ **TTS/voice**: XTTS-v2, Kokoro, Bark
|
||||
|
||||
### Partially Solved
|
||||
- ⚠️ **Image-to-video**: Good for 4-6 second clips, struggles with longer sequences
|
||||
- ⚠️ **Style consistency**: LoRAs and ControlNet help, but not perfect across scenes
|
||||
- ⚠️ **Prompt engineering**: LLMs can help, but no dedicated lyrics-to-visual-prompt tool
|
||||
|
||||
---
|
||||
|
||||
## 4. WHERE THE REAL GAPS ARE
|
||||
|
||||
### Critical Gaps (Our Opportunity)
|
||||
|
||||
1. **Unified Pipeline Orchestration**
|
||||
- NO tool chains: lyrics → music → scene segmentation → image prompts → video composition
|
||||
- Everything requires manual stitching
|
||||
- Our Video Forge can be THE glue layer
|
||||
|
||||
2. **Lyrics-to-Visual-Scene Segmentation**
|
||||
- No tool analyzes lyrics and breaks them into visual beats/scenes
|
||||
- MusicVideoMaker uses manual Excel entry - absurd
|
||||
- Opportunity: LLM-powered scene segmentation with beat alignment
|
||||
|
||||
3. **Temporal Coherence Across Scenes**
|
||||
- Short clips (4-6s) work fine, but maintaining visual coherence across a 3-4 minute video is unsolved
|
||||
- Character consistency, color palette continuity, style drift
|
||||
- Opportunity: Style anchoring + scene-to-scene conditioning
|
||||
|
||||
4. **Beat-Synchronized Visual Transitions**
|
||||
- No tool automatically syncs visual cuts to musical beats
|
||||
- Manual timing is required everywhere
|
||||
- Opportunity: Beat detection → transition scheduling → FFmpeg composition
|
||||
|
||||
5. **Long-Form Video Generation**
|
||||
- Text-to-video models max out at 4-6 seconds
|
||||
- Stitching clips with consistent style/characters is manual
|
||||
- Opportunity: Automated clip chaining with style transfer
|
||||
|
||||
6. **One-Click "Lyrics In, Video Out"**
|
||||
- The dream pipeline doesn't exist
|
||||
- Current workflows require 5+ separate tools
|
||||
- Opportunity: Single command/endpoint that does everything
|
||||
|
||||
### Technical Debt in Existing Tools
|
||||
|
||||
- **YuE/HeartMuLa**: No video awareness - just audio generation
|
||||
- **Wan2.1/CogVideoX**: No lyrics/text awareness - just prompt-to-video
|
||||
- **ComfyUI**: Great for images, weak for video composition
|
||||
- **Remotion**: Great for composition, no AI generation built-in
|
||||
|
||||
---
|
||||
|
||||
## 5. RECOMMENDED ARCHITECTURE FOR VIDEO FORGE
|
||||
|
||||
Based on this research, the optimal Video Forge pipeline:
|
||||
|
||||
```
|
||||
[Lyrics/Poem Text]
|
||||
↓
|
||||
[LLM Scene Segmenter] → Beat-aligned scene descriptions + visual prompts
|
||||
↓
|
||||
[HeartMuLa/YuE] → Music audio (.wav)
|
||||
↓
|
||||
[Beat Detector (librosa)] → Beat timestamps + energy curve
|
||||
↓
|
||||
[SD XL / FLUX] → Scene images (one per beat/section)
|
||||
↓
|
||||
[Wan2.1 img2vid] → Short video clips per scene (4-6s each)
|
||||
↓
|
||||
[FFmpeg + Beat Sync] → Transitions aligned to beats
|
||||
↓
|
||||
[Final Music Video (.mp4)]
|
||||
```
|
||||
|
||||
### Key Design Decisions
|
||||
|
||||
1. **Music**: HeartMuLa (best quality, multilingual, Apache-2.0)
|
||||
2. **Images**: SD XL via ComfyUI (most mature ecosystem)
|
||||
3. **Video clips**: Wan2.1 for img2vid (state-of-the-art)
|
||||
4. **Composition**: FFmpeg (universal, battle-tested)
|
||||
5. **Orchestration**: Python pipeline with config file
|
||||
6. **Scene segmentation**: Local LLM (Ollama + Llama 3 or similar)
|
||||
|
||||
### What We Build vs. What We Use
|
||||
|
||||
| Component | Build or Use | Reasoning |
|
||||
|-----------|--------------|-----------|
|
||||
| Lyrics → Scenes | **BUILD** | No good tool exists, core differentiator |
|
||||
| Music generation | **USE** HeartMuLa/YuE | Already excellent, Apache-2.0 |
|
||||
| Image generation | **USE** SD XL | Mature, huge ecosystem |
|
||||
| Beat detection | **USE** librosa | Mature, reliable |
|
||||
| Video clips | **USE** Wan2.1 | Best quality, Apache-2.0 |
|
||||
| Video composition | **BUILD** (ffmpeg wrapper) | Need beat-sync logic |
|
||||
| Pipeline orchestration | **BUILD** | The main value-add |
|
||||
|
||||
---
|
||||
|
||||
## 6. COMPETITIVE LANDSCAPE SUMMARY
|
||||
|
||||
### Commercial (Not Self-Hostable)
|
||||
- **Suno**: Music only, no video
|
||||
- **Runway**: Video only, expensive
|
||||
- **Pika**: Short clips only
|
||||
- **Kaiber**: Closest to music video, but closed/subscription
|
||||
- **Synthesia**: Avatar-based, not generative art
|
||||
|
||||
### Open-Source Gaps That Matter
|
||||
1. Nobody has built the orchestration layer
|
||||
2. Nobody has solved lyrics-to-visual-scene well
|
||||
3. Nobody has beat-synced visual transitions automated
|
||||
4. Nobody maintains temporal coherence across minutes
|
||||
|
||||
**Our Video Forge fills the most important gap: the glue that makes individual AI components work together to produce a complete music video from text.**
|
||||
|
||||
---
|
||||
|
||||
*Research conducted: April 14, 2026*
|
||||
*Sources: GitHub API, HuggingFace API, project READMEs*
|
||||
@@ -1,202 +0,0 @@
|
||||
"""Tests for agent.privacy_filter — PII stripping before remote API calls."""
|
||||
|
||||
import pytest
|
||||
from agent.privacy_filter import (
|
||||
PrivacyFilter,
|
||||
RedactionReport,
|
||||
Sensitivity,
|
||||
sanitize_messages,
|
||||
quick_sanitize,
|
||||
)
|
||||
|
||||
|
||||
class TestPrivacyFilterSanitizeText:
|
||||
"""Test single-text sanitization."""
|
||||
|
||||
def test_no_pii_returns_clean(self):
|
||||
pf = PrivacyFilter()
|
||||
text = "The weather in Paris is nice today."
|
||||
cleaned, redactions = pf.sanitize_text(text)
|
||||
assert cleaned == text
|
||||
assert redactions == []
|
||||
|
||||
def test_email_redacted(self):
|
||||
pf = PrivacyFilter()
|
||||
text = "Send report to alice@example.com by Friday."
|
||||
cleaned, redactions = pf.sanitize_text(text)
|
||||
assert "alice@example.com" not in cleaned
|
||||
assert "[REDACTED-EMAIL]" in cleaned
|
||||
assert any(r["type"] == "email_address" for r in redactions)
|
||||
|
||||
def test_phone_redacted(self):
|
||||
pf = PrivacyFilter()
|
||||
text = "Call me at 555-123-4567 when ready."
|
||||
cleaned, redactions = pf.sanitize_text(text)
|
||||
assert "555-123-4567" not in cleaned
|
||||
assert "[REDACTED-PHONE]" in cleaned
|
||||
|
||||
def test_api_key_redacted(self):
|
||||
pf = PrivacyFilter()
|
||||
text = 'api_key = "sk-proj-abcdefghij1234567890abcdefghij1234567890"'
|
||||
cleaned, redactions = pf.sanitize_text(text)
|
||||
assert "sk-proj-" not in cleaned
|
||||
assert any(r["sensitivity"] == "CRITICAL" for r in redactions)
|
||||
|
||||
def test_github_token_redacted(self):
|
||||
pf = PrivacyFilter()
|
||||
text = "Use ghp_1234567890abcdefghijklmnopqrstuvwxyz1234 for auth"
|
||||
cleaned, redactions = pf.sanitize_text(text)
|
||||
assert "ghp_" not in cleaned
|
||||
assert any(r["type"] == "github_token" for r in redactions)
|
||||
|
||||
def test_ethereum_address_redacted(self):
|
||||
pf = PrivacyFilter()
|
||||
text = "Send to 0x742d35Cc6634C0532925a3b844Bc9e7595f2bD18 please"
|
||||
cleaned, redactions = pf.sanitize_text(text)
|
||||
assert "0x742d" not in cleaned
|
||||
assert any(r["type"] == "ethereum_address" for r in redactions)
|
||||
|
||||
def test_user_home_path_redacted(self):
|
||||
pf = PrivacyFilter()
|
||||
text = "Read file at /Users/alice/Documents/secret.txt"
|
||||
cleaned, redactions = pf.sanitize_text(text)
|
||||
assert "alice" not in cleaned
|
||||
assert "[REDACTED-USER]" in cleaned
|
||||
|
||||
def test_multiple_pii_types(self):
|
||||
pf = PrivacyFilter()
|
||||
text = (
|
||||
"Contact john@test.com or call 555-999-1234. "
|
||||
"The API key is sk-abcdefghijklmnopqrstuvwxyz1234567890."
|
||||
)
|
||||
cleaned, redactions = pf.sanitize_text(text)
|
||||
assert "john@test.com" not in cleaned
|
||||
assert "555-999-1234" not in cleaned
|
||||
assert "sk-abcd" not in cleaned
|
||||
assert len(redactions) >= 3
|
||||
|
||||
|
||||
class TestPrivacyFilterSanitizeMessages:
|
||||
"""Test message-list sanitization."""
|
||||
|
||||
def test_sanitize_user_message(self):
|
||||
pf = PrivacyFilter()
|
||||
messages = [
|
||||
{"role": "system", "content": "You are helpful."},
|
||||
{"role": "user", "content": "Email me at bob@test.com with results."},
|
||||
]
|
||||
safe, report = pf.sanitize_messages(messages)
|
||||
assert report.redacted_messages == 1
|
||||
assert "bob@test.com" not in safe[1]["content"]
|
||||
assert "[REDACTED-EMAIL]" in safe[1]["content"]
|
||||
# System message unchanged
|
||||
assert safe[0]["content"] == "You are helpful."
|
||||
|
||||
def test_no_redaction_needed(self):
|
||||
pf = PrivacyFilter()
|
||||
messages = [
|
||||
{"role": "user", "content": "What is 2+2?"},
|
||||
{"role": "assistant", "content": "4"},
|
||||
]
|
||||
safe, report = pf.sanitize_messages(messages)
|
||||
assert report.redacted_messages == 0
|
||||
assert not report.had_redactions
|
||||
|
||||
def test_assistant_messages_also_sanitized(self):
|
||||
pf = PrivacyFilter()
|
||||
messages = [
|
||||
{"role": "assistant", "content": "Your email admin@corp.com was found."},
|
||||
]
|
||||
safe, report = pf.sanitize_messages(messages)
|
||||
assert report.redacted_messages == 1
|
||||
assert "admin@corp.com" not in safe[0]["content"]
|
||||
|
||||
def test_tool_messages_not_sanitized(self):
|
||||
pf = PrivacyFilter()
|
||||
messages = [
|
||||
{"role": "tool", "content": "Result: user@test.com found"},
|
||||
]
|
||||
safe, report = pf.sanitize_messages(messages)
|
||||
assert report.redacted_messages == 0
|
||||
assert safe[0]["content"] == "Result: user@test.com found"
|
||||
|
||||
|
||||
class TestShouldUseLocalOnly:
|
||||
"""Test the local-only routing decision."""
|
||||
|
||||
def test_normal_text_allows_remote(self):
|
||||
pf = PrivacyFilter()
|
||||
block, reason = pf.should_use_local_only("Summarize this article about Python.")
|
||||
assert not block
|
||||
|
||||
def test_critical_secret_blocks_remote(self):
|
||||
pf = PrivacyFilter()
|
||||
text = "Here is the API key: sk-abcdefghijklmnopqrstuvwxyz1234567890"
|
||||
block, reason = pf.should_use_local_only(text)
|
||||
assert block
|
||||
assert "critical" in reason.lower()
|
||||
|
||||
def test_multiple_high_sensitivity_blocks(self):
|
||||
pf = PrivacyFilter()
|
||||
# 3+ high-sensitivity patterns
|
||||
text = (
|
||||
"Card: 4111-1111-1111-1111, "
|
||||
"SSN: 123-45-6789, "
|
||||
"BTC: 1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa, "
|
||||
"ETH: 0x742d35Cc6634C0532925a3b844Bc9e7595f2bD18"
|
||||
)
|
||||
block, reason = pf.should_use_local_only(text)
|
||||
assert block
|
||||
|
||||
|
||||
class TestAggressiveMode:
|
||||
"""Test aggressive filtering mode."""
|
||||
|
||||
def test_aggressive_redacts_internal_ips(self):
|
||||
pf = PrivacyFilter(aggressive_mode=True)
|
||||
text = "Server at 192.168.1.100 is responding."
|
||||
cleaned, redactions = pf.sanitize_text(text)
|
||||
assert "192.168.1.100" not in cleaned
|
||||
assert any(r["type"] == "internal_ip" for r in redactions)
|
||||
|
||||
def test_normal_does_not_redact_ips(self):
|
||||
pf = PrivacyFilter(aggressive_mode=False)
|
||||
text = "Server at 192.168.1.100 is responding."
|
||||
cleaned, redactions = pf.sanitize_text(text)
|
||||
assert "192.168.1.100" in cleaned # IP preserved in normal mode
|
||||
|
||||
|
||||
class TestConvenienceFunctions:
|
||||
"""Test module-level convenience functions."""
|
||||
|
||||
def test_quick_sanitize(self):
|
||||
text = "Contact alice@example.com for details"
|
||||
result = quick_sanitize(text)
|
||||
assert "alice@example.com" not in result
|
||||
assert "[REDACTED-EMAIL]" in result
|
||||
|
||||
def test_sanitize_messages_convenience(self):
|
||||
messages = [{"role": "user", "content": "Call 555-000-1234"}]
|
||||
safe, report = sanitize_messages(messages)
|
||||
assert report.redacted_messages == 1
|
||||
|
||||
|
||||
class TestRedactionReport:
|
||||
"""Test the reporting structure."""
|
||||
|
||||
def test_summary_no_redactions(self):
|
||||
report = RedactionReport(total_messages=3, redacted_messages=0)
|
||||
assert "No PII" in report.summary()
|
||||
|
||||
def test_summary_with_redactions(self):
|
||||
report = RedactionReport(
|
||||
total_messages=2,
|
||||
redacted_messages=1,
|
||||
redactions=[
|
||||
{"type": "email_address", "sensitivity": "MEDIUM", "count": 2},
|
||||
{"type": "phone_number_us", "sensitivity": "MEDIUM", "count": 1},
|
||||
],
|
||||
)
|
||||
summary = report.summary()
|
||||
assert "1/2" in summary
|
||||
assert "email_address" in summary
|
||||
150
tests/test_batch_executor.py
Normal file
150
tests/test_batch_executor.py
Normal file
@@ -0,0 +1,150 @@
|
||||
"""Tests for batch tool execution safety classification."""
|
||||
import json
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
|
||||
def _make_tool_call(name: str, args: dict) -> MagicMock:
|
||||
"""Create a mock tool call object."""
|
||||
tc = MagicMock()
|
||||
tc.function.name = name
|
||||
tc.function.arguments = json.dumps(args)
|
||||
tc.id = f"call_{name}_1"
|
||||
return tc
|
||||
|
||||
|
||||
class TestClassification:
|
||||
def test_parallel_safe_read_file(self):
|
||||
from tools.batch_executor import classify_single_tool_call
|
||||
tc = _make_tool_call("read_file", {"path": "README.md"})
|
||||
result = classify_single_tool_call(tc)
|
||||
assert result.tier == "parallel_safe"
|
||||
|
||||
def test_parallel_safe_web_search(self):
|
||||
from tools.batch_executor import classify_single_tool_call
|
||||
tc = _make_tool_call("web_search", {"query": "test"})
|
||||
result = classify_single_tool_call(tc)
|
||||
assert result.tier == "parallel_safe"
|
||||
|
||||
def test_parallel_safe_search_files(self):
|
||||
from tools.batch_executor import classify_single_tool_call
|
||||
tc = _make_tool_call("search_files", {"pattern": "test"})
|
||||
result = classify_single_tool_call(tc)
|
||||
assert result.tier == "parallel_safe"
|
||||
|
||||
def test_never_parallel_clarify(self):
|
||||
from tools.batch_executor import classify_single_tool_call
|
||||
tc = _make_tool_call("clarify", {"question": "test"})
|
||||
result = classify_single_tool_call(tc)
|
||||
assert result.tier == "never_parallel"
|
||||
|
||||
def test_terminal_is_sequential(self):
|
||||
from tools.batch_executor import classify_single_tool_call
|
||||
tc = _make_tool_call("terminal", {"command": "ls -la"})
|
||||
result = classify_single_tool_call(tc)
|
||||
assert result.tier == "sequential"
|
||||
|
||||
def test_terminal_destructive_rm(self):
|
||||
from tools.batch_executor import classify_single_tool_call
|
||||
tc = _make_tool_call("terminal", {"command": "rm -rf /tmp/test"})
|
||||
result = classify_single_tool_call(tc)
|
||||
assert result.tier == "sequential"
|
||||
assert "Destructive" in result.reason
|
||||
|
||||
def test_write_file_is_path_scoped(self):
|
||||
from tools.batch_executor import classify_single_tool_call
|
||||
tc = _make_tool_call("write_file", {"path": "/tmp/test.txt", "content": "hello"})
|
||||
result = classify_single_tool_call(tc)
|
||||
assert result.tier == "path_scoped"
|
||||
|
||||
def test_delegate_is_sequential(self):
|
||||
from tools.batch_executor import classify_single_tool_call
|
||||
tc = _make_tool_call("delegate_task", {"goal": "test"})
|
||||
result = classify_single_tool_call(tc)
|
||||
assert result.tier == "sequential"
|
||||
|
||||
def test_unknown_tool_is_sequential(self):
|
||||
from tools.batch_executor import classify_single_tool_call
|
||||
tc = _make_tool_call("some_unknown_tool", {"arg": "val"})
|
||||
result = classify_single_tool_call(tc)
|
||||
assert result.tier == "sequential"
|
||||
|
||||
|
||||
class TestBatchClassification:
|
||||
def test_all_parallel_stays_parallel(self):
|
||||
from tools.batch_executor import classify_tool_calls
|
||||
tcs = [
|
||||
_make_tool_call("read_file", {"path": f"file{i}.txt"})
|
||||
for i in range(5)
|
||||
]
|
||||
plan = classify_tool_calls(tcs)
|
||||
assert plan.can_parallelize
|
||||
assert len(plan.parallel_batch) == 5
|
||||
assert len(plan.sequential_batch) == 0
|
||||
|
||||
def test_mixed_batch(self):
|
||||
from tools.batch_executor import classify_tool_calls
|
||||
tcs = [
|
||||
_make_tool_call("read_file", {"path": "a.txt"}),
|
||||
_make_tool_call("terminal", {"command": "ls"}),
|
||||
_make_tool_call("web_search", {"query": "test"}),
|
||||
_make_tool_call("delegate_task", {"goal": "test"}),
|
||||
]
|
||||
plan = classify_tool_calls(tcs)
|
||||
# read_file + web_search should be parallel (both parallel_safe)
|
||||
# terminal + delegate_task should be sequential
|
||||
assert len(plan.parallel_batch) >= 2
|
||||
assert len(plan.sequential_batch) >= 2
|
||||
|
||||
def test_clarify_blocks_all(self):
|
||||
from tools.batch_executor import classify_tool_calls
|
||||
tcs = [
|
||||
_make_tool_call("read_file", {"path": "a.txt"}),
|
||||
_make_tool_call("clarify", {"question": "which one?"}),
|
||||
_make_tool_call("web_search", {"query": "test"}),
|
||||
]
|
||||
plan = classify_tool_calls(tcs)
|
||||
clarify_in_seq = any(c.tool_name == "clarify" for c in plan.sequential_batch)
|
||||
assert clarify_in_seq
|
||||
|
||||
def test_overlapping_paths_sequential(self):
|
||||
from tools.batch_executor import classify_tool_calls
|
||||
tcs = [
|
||||
_make_tool_call("write_file", {"path": "/tmp/test/a.txt", "content": "hello"}),
|
||||
_make_tool_call("patch", {"path": "/tmp/test/a.txt", "old_string": "a", "new_string": "b"}),
|
||||
]
|
||||
plan = classify_tool_calls(tcs)
|
||||
# write_file and patch on SAME file -> conflict -> one must be sequential
|
||||
assert len(plan.sequential_batch) >= 1
|
||||
|
||||
|
||||
class TestDestructiveCommands:
|
||||
def test_rm_flagged(self):
|
||||
from tools.batch_executor import is_destructive_command
|
||||
assert is_destructive_command("rm -rf /tmp")
|
||||
assert is_destructive_command("rm file.txt")
|
||||
|
||||
def test_mv_flagged(self):
|
||||
from tools.batch_executor import is_destructive_command
|
||||
assert is_destructive_command("mv old new")
|
||||
|
||||
def test_sed_i_flagged(self):
|
||||
from tools.batch_executor import is_destructive_command
|
||||
assert is_destructive_command("sed -i 's/a/b/g' file")
|
||||
|
||||
def test_redirect_overwrite_flagged(self):
|
||||
from tools.batch_executor import is_destructive_command
|
||||
assert is_destructive_command("echo test > file.txt")
|
||||
|
||||
def test_safe_commands_not_flagged(self):
|
||||
from tools.batch_executor import is_destructive_command
|
||||
assert not is_destructive_command("ls -la")
|
||||
assert not is_destructive_command("cat file.txt")
|
||||
assert not is_destructive_command("echo test >> file.txt") # append is safe
|
||||
|
||||
|
||||
class TestRegistryIntegration:
|
||||
def test_parallel_safe_in_registry(self):
|
||||
from tools.registry import registry
|
||||
safe = registry.get_parallel_safe_tools()
|
||||
assert isinstance(safe, set)
|
||||
74
tests/test_crisis_tool.py
Normal file
74
tests/test_crisis_tool.py
Normal file
@@ -0,0 +1,74 @@
|
||||
"""Tests for crisis detection tool."""
|
||||
import pytest
|
||||
|
||||
|
||||
def test_detect_crisis_none():
|
||||
from tools.crisis_tool import detect_crisis
|
||||
result = detect_crisis("Hello, how are you today?")
|
||||
assert result.level == "NONE"
|
||||
assert result.score == 0.0
|
||||
|
||||
|
||||
def test_detect_crisis_critical():
|
||||
from tools.crisis_tool import detect_crisis
|
||||
result = detect_crisis("I want to kill myself")
|
||||
assert result.level == "CRITICAL"
|
||||
assert result.score == 1.0
|
||||
assert len(result.indicators) > 0
|
||||
assert "988" in result.recommended_action
|
||||
|
||||
|
||||
def test_detect_crisis_high():
|
||||
from tools.crisis_tool import detect_crisis
|
||||
result = detect_crisis("I feel so hopeless about everything, nothing matters")
|
||||
assert result.level in ("HIGH", "MEDIUM", "CRITICAL")
|
||||
assert result.score > 0
|
||||
|
||||
|
||||
def test_detect_crisis_medium():
|
||||
from tools.crisis_tool import detect_crisis
|
||||
result = detect_crisis("I feel worthless and broken inside")
|
||||
assert result.level in ("MEDIUM", "LOW", "HIGH")
|
||||
assert result.score > 0
|
||||
|
||||
|
||||
def test_detect_crisis_low():
|
||||
from tools.crisis_tool import detect_crisis
|
||||
result = detect_crisis("I'm having a tough time and feeling stressed")
|
||||
assert result.level in ("LOW", "NONE")
|
||||
assert result.score >= 0
|
||||
|
||||
|
||||
def test_detect_crisis_empty():
|
||||
from tools.crisis_tool import detect_crisis
|
||||
result = detect_crisis("")
|
||||
assert result.level == "NONE"
|
||||
result2 = detect_crisis(None)
|
||||
assert result2.level == "NONE"
|
||||
|
||||
|
||||
def test_scan_user_message_returns_none_for_safe():
|
||||
from tools.crisis_tool import scan_user_message
|
||||
result = scan_user_message("What's the weather like?")
|
||||
assert result is None
|
||||
|
||||
|
||||
def test_scan_user_message_returns_dict_for_crisis():
|
||||
from tools.crisis_tool import scan_user_message
|
||||
result = scan_user_message("I want to end it all")
|
||||
assert result is not None
|
||||
assert "level" in result
|
||||
assert "compassion_injection" in result
|
||||
assert result["level"] in ("CRITICAL", "HIGH")
|
||||
|
||||
|
||||
def test_tool_handler():
|
||||
from tools.crisis_tool import crisis_scan_handler
|
||||
import json
|
||||
result = crisis_scan_handler({"text": "I feel fine, thanks"})
|
||||
data = json.loads(result)
|
||||
assert data["level"] == "NONE"
|
||||
|
||||
result2 = crisis_scan_handler({"text": "I want to die"})
|
||||
data2 = json.loads(result2)
|
||||
assert data2["level"] == "CRITICAL"
|
||||
@@ -1,111 +0,0 @@
|
||||
"""
|
||||
Tests for improved error messages in skill_manager_tool (issue #624).
|
||||
Verifies that error messages include file paths, context, and suggestions.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch, MagicMock
|
||||
from tools.skill_manager_tool import _format_error, _edit_skill, _patch_skill
|
||||
|
||||
|
||||
class TestFormatError:
|
||||
"""Test the _format_error helper function."""
|
||||
|
||||
def test_basic_error(self):
|
||||
"""Test basic error formatting."""
|
||||
result = _format_error("Something went wrong")
|
||||
assert result["success"] is False
|
||||
assert "Something went wrong" in result["error"]
|
||||
assert result["skill_name"] is None
|
||||
assert result["file_path"] is None
|
||||
|
||||
def test_with_skill_name(self):
|
||||
"""Test error with skill name."""
|
||||
result = _format_error("Failed", skill_name="test-skill")
|
||||
assert "test-skill" in result["error"]
|
||||
assert result["skill_name"] == "test-skill"
|
||||
|
||||
def test_with_file_path(self):
|
||||
"""Test error with file path."""
|
||||
result = _format_error("Failed", file_path="/path/to/SKILL.md")
|
||||
assert "/path/to/SKILL.md" in result["error"]
|
||||
assert result["file_path"] == "/path/to/SKILL.md"
|
||||
|
||||
def test_with_suggestion(self):
|
||||
"""Test error with suggestion."""
|
||||
result = _format_error("Failed", suggestion="Try again")
|
||||
assert "Suggestion: Try again" in result["error"]
|
||||
assert result["suggestion"] == "Try again"
|
||||
|
||||
def test_with_context(self):
|
||||
"""Test error with context dict."""
|
||||
result = _format_error("Failed", context={"line": 5, "found": "x"})
|
||||
assert "line: 5" in result["error"]
|
||||
assert "found: x" in result["error"]
|
||||
|
||||
def test_all_fields(self):
|
||||
"""Test error with all fields."""
|
||||
result = _format_error(
|
||||
"Pattern match failed",
|
||||
skill_name="my-skill",
|
||||
file_path="/skills/my-skill/SKILL.md",
|
||||
suggestion="Check whitespace",
|
||||
context={"expected": "foo", "found": "bar"}
|
||||
)
|
||||
assert "Pattern match failed" in result["error"]
|
||||
assert "Skill: my-skill" in result["error"]
|
||||
assert "File: /skills/my-skill/SKILL.md" in result["error"]
|
||||
assert "Suggestion: Check whitespace" in result["error"]
|
||||
assert "expected: foo" in result["error"]
|
||||
|
||||
|
||||
class TestEditSkillErrors:
|
||||
"""Test improved error messages in _edit_skill."""
|
||||
|
||||
@patch('tools.skill_manager_tool._find_skill')
|
||||
def test_skill_not_found(self, mock_find):
|
||||
"""Test skill not found error includes suggestion."""
|
||||
mock_find.return_value = None
|
||||
# Provide valid content with frontmatter so it passes validation
|
||||
valid_content = """---
|
||||
name: test
|
||||
description: Test skill
|
||||
---
|
||||
Body content here.
|
||||
"""
|
||||
result = _edit_skill("nonexistent", valid_content)
|
||||
assert result["success"] is False
|
||||
assert "nonexistent" in result["error"]
|
||||
assert "skills_list()" in result.get("suggestion", "")
|
||||
|
||||
|
||||
class TestPatchSkillErrors:
|
||||
"""Test improved error messages in _patch_skill."""
|
||||
|
||||
def test_old_string_required(self):
|
||||
"""Test old_string required error includes suggestion."""
|
||||
result = _patch_skill("test-skill", None, "new")
|
||||
assert result["success"] is False
|
||||
assert "old_string is required" in result["error"]
|
||||
assert "suggestion" in result
|
||||
|
||||
def test_new_string_required(self):
|
||||
"""Test new_string required error includes suggestion."""
|
||||
result = _patch_skill("test-skill", "old", None)
|
||||
assert result["success"] is False
|
||||
assert "new_string is required" in result["error"]
|
||||
assert "suggestion" in result
|
||||
|
||||
@patch('tools.skill_manager_tool._find_skill')
|
||||
def test_skill_not_found(self, mock_find):
|
||||
"""Test skill not found error includes suggestion."""
|
||||
mock_find.return_value = None
|
||||
result = _patch_skill("nonexistent", "old", "new")
|
||||
assert result["success"] is False
|
||||
assert "nonexistent" in result["error"]
|
||||
assert "skills_list()" in result.get("suggestion", "")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
@@ -1,190 +0,0 @@
|
||||
"""Tests for tools.confirmation_daemon — Human Confirmation Firewall."""
|
||||
|
||||
import pytest
|
||||
import time
|
||||
from tools.confirmation_daemon import (
|
||||
ConfirmationDaemon,
|
||||
ConfirmationRequest,
|
||||
ConfirmationStatus,
|
||||
RiskLevel,
|
||||
classify_action,
|
||||
_is_whitelisted,
|
||||
_DEFAULT_WHITELIST,
|
||||
)
|
||||
|
||||
|
||||
class TestClassifyAction:
|
||||
"""Test action risk classification."""
|
||||
|
||||
def test_crypto_tx_is_critical(self):
|
||||
assert classify_action("crypto_tx") == RiskLevel.CRITICAL
|
||||
|
||||
def test_sign_transaction_is_critical(self):
|
||||
assert classify_action("sign_transaction") == RiskLevel.CRITICAL
|
||||
|
||||
def test_send_email_is_high(self):
|
||||
assert classify_action("send_email") == RiskLevel.HIGH
|
||||
|
||||
def test_send_message_is_medium(self):
|
||||
assert classify_action("send_message") == RiskLevel.MEDIUM
|
||||
|
||||
def test_access_calendar_is_low(self):
|
||||
assert classify_action("access_calendar") == RiskLevel.LOW
|
||||
|
||||
def test_unknown_action_is_medium(self):
|
||||
assert classify_action("unknown_action_xyz") == RiskLevel.MEDIUM
|
||||
|
||||
|
||||
class TestWhitelist:
|
||||
"""Test whitelist auto-approval."""
|
||||
|
||||
def test_self_email_is_whitelisted(self):
|
||||
whitelist = dict(_DEFAULT_WHITELIST)
|
||||
payload = {"from": "me@test.com", "to": "me@test.com"}
|
||||
assert _is_whitelisted("send_email", payload, whitelist) is True
|
||||
|
||||
def test_non_whitelisted_recipient_not_approved(self):
|
||||
whitelist = dict(_DEFAULT_WHITELIST)
|
||||
payload = {"to": "random@stranger.com"}
|
||||
assert _is_whitelisted("send_email", payload, whitelist) is False
|
||||
|
||||
def test_whitelisted_contact_approved(self):
|
||||
whitelist = {
|
||||
"send_message": {"targets": ["alice", "bob"]},
|
||||
}
|
||||
assert _is_whitelisted("send_message", {"to": "alice"}, whitelist) is True
|
||||
assert _is_whitelisted("send_message", {"to": "charlie"}, whitelist) is False
|
||||
|
||||
def test_no_whitelist_entry_means_not_whitelisted(self):
|
||||
whitelist = {}
|
||||
assert _is_whitelisted("crypto_tx", {"amount": 1.0}, whitelist) is False
|
||||
|
||||
|
||||
class TestConfirmationRequest:
|
||||
"""Test the request data model."""
|
||||
|
||||
def test_defaults(self):
|
||||
req = ConfirmationRequest(
|
||||
request_id="test-1",
|
||||
action="send_email",
|
||||
description="Test email",
|
||||
risk_level="high",
|
||||
payload={},
|
||||
)
|
||||
assert req.status == ConfirmationStatus.PENDING.value
|
||||
assert req.created_at > 0
|
||||
assert req.expires_at > req.created_at
|
||||
|
||||
def test_is_pending(self):
|
||||
req = ConfirmationRequest(
|
||||
request_id="test-2",
|
||||
action="send_email",
|
||||
description="Test",
|
||||
risk_level="high",
|
||||
payload={},
|
||||
expires_at=time.time() + 300,
|
||||
)
|
||||
assert req.is_pending is True
|
||||
|
||||
def test_is_expired(self):
|
||||
req = ConfirmationRequest(
|
||||
request_id="test-3",
|
||||
action="send_email",
|
||||
description="Test",
|
||||
risk_level="high",
|
||||
payload={},
|
||||
expires_at=time.time() - 10,
|
||||
)
|
||||
assert req.is_expired is True
|
||||
assert req.is_pending is False
|
||||
|
||||
def test_to_dict(self):
|
||||
req = ConfirmationRequest(
|
||||
request_id="test-4",
|
||||
action="send_email",
|
||||
description="Test",
|
||||
risk_level="medium",
|
||||
payload={"to": "a@b.com"},
|
||||
)
|
||||
d = req.to_dict()
|
||||
assert d["request_id"] == "test-4"
|
||||
assert d["action"] == "send_email"
|
||||
assert "is_pending" in d
|
||||
|
||||
|
||||
class TestConfirmationDaemon:
|
||||
"""Test the daemon logic (without HTTP layer)."""
|
||||
|
||||
def test_auto_approve_low_risk(self):
|
||||
daemon = ConfirmationDaemon()
|
||||
req = daemon.request(
|
||||
action="access_calendar",
|
||||
description="Read today's events",
|
||||
risk_level="low",
|
||||
)
|
||||
assert req.status == ConfirmationStatus.AUTO_APPROVED.value
|
||||
|
||||
def test_whitelisted_auto_approves(self):
|
||||
daemon = ConfirmationDaemon()
|
||||
daemon._whitelist = {"send_message": {"targets": ["alice"]}}
|
||||
req = daemon.request(
|
||||
action="send_message",
|
||||
description="Message alice",
|
||||
payload={"to": "alice"},
|
||||
)
|
||||
assert req.status == ConfirmationStatus.AUTO_APPROVED.value
|
||||
|
||||
def test_non_whitelisted_goes_pending(self):
|
||||
daemon = ConfirmationDaemon()
|
||||
daemon._whitelist = {}
|
||||
req = daemon.request(
|
||||
action="send_email",
|
||||
description="Email to stranger",
|
||||
payload={"to": "stranger@test.com"},
|
||||
risk_level="high",
|
||||
)
|
||||
assert req.status == ConfirmationStatus.PENDING.value
|
||||
assert req.is_pending is True
|
||||
|
||||
def test_approve_response(self):
|
||||
daemon = ConfirmationDaemon()
|
||||
daemon._whitelist = {}
|
||||
req = daemon.request(
|
||||
action="send_email",
|
||||
description="Email test",
|
||||
risk_level="high",
|
||||
)
|
||||
result = daemon.respond(req.request_id, approved=True, decided_by="human")
|
||||
assert result.status == ConfirmationStatus.APPROVED.value
|
||||
assert result.decided_by == "human"
|
||||
|
||||
def test_deny_response(self):
|
||||
daemon = ConfirmationDaemon()
|
||||
daemon._whitelist = {}
|
||||
req = daemon.request(
|
||||
action="crypto_tx",
|
||||
description="Send 1 ETH",
|
||||
risk_level="critical",
|
||||
)
|
||||
result = daemon.respond(
|
||||
req.request_id, approved=False, decided_by="human", reason="Too risky"
|
||||
)
|
||||
assert result.status == ConfirmationStatus.DENIED.value
|
||||
assert result.reason == "Too risky"
|
||||
|
||||
def test_get_pending(self):
|
||||
daemon = ConfirmationDaemon()
|
||||
daemon._whitelist = {}
|
||||
daemon.request(action="send_email", description="Test 1", risk_level="high")
|
||||
daemon.request(action="send_email", description="Test 2", risk_level="high")
|
||||
pending = daemon.get_pending()
|
||||
assert len(pending) >= 2
|
||||
|
||||
def test_get_history(self):
|
||||
daemon = ConfirmationDaemon()
|
||||
req = daemon.request(
|
||||
action="access_calendar", description="Test", risk_level="low"
|
||||
)
|
||||
history = daemon.get_history()
|
||||
assert len(history) >= 1
|
||||
assert history[0]["action"] == "access_calendar"
|
||||
@@ -121,19 +121,6 @@ DANGEROUS_PATTERNS = [
|
||||
(r'\b(cp|mv|install)\b.*\s/etc/', "copy/move file into /etc/"),
|
||||
(r'\bsed\s+-[^\s]*i.*\s/etc/', "in-place edit of system config"),
|
||||
(r'\bsed\s+--in-place\b.*\s/etc/', "in-place edit of system config (long flag)"),
|
||||
# --- Vitalik's threat model: crypto / financial ---
|
||||
(r'\b(?:bitcoin-cli|ethers\.js|web3|ether\.sendTransaction)\b', "direct crypto transaction tool usage"),
|
||||
(r'\bwget\b.*\b(?:mnemonic|seed\s*phrase|private[_-]?key)\b', "attempting to download crypto credentials"),
|
||||
(r'\bcurl\b.*\b(?:mnemonic|seed\s*phrase|private[_-]?key)\b', "attempting to exfiltrate crypto credentials"),
|
||||
# --- Vitalik's threat model: credential exfiltration ---
|
||||
(r'\b(?:curl|wget|http|nc|ncat|socat)\b.*\b(?:\.env|\.ssh|credentials|secrets|token|api[_-]?key)\b',
|
||||
"attempting to exfiltrate credentials via network"),
|
||||
(r'\bbase64\b.*\|(?:\s*curl|\s*wget)', "base64-encode then network exfiltration"),
|
||||
(r'\bcat\b.*\b(?:\.env|\.ssh/id_rsa|credentials)\b.*\|(?:\s*curl|\s*wget)',
|
||||
"reading secrets and piping to network tool"),
|
||||
# --- Vitalik's threat model: data exfiltration ---
|
||||
(r'\bcurl\b.*-d\s.*\$(?:HOME|USER)', "sending user home directory data to remote"),
|
||||
(r'\bwget\b.*--post-data\s.*\$(?:HOME|USER)', "posting user data to remote"),
|
||||
# Script execution via heredoc — bypasses the -e/-c flag patterns above.
|
||||
# `python3 << 'EOF'` feeds arbitrary code via stdin without -c/-e flags.
|
||||
(r'\b(python[23]?|perl|ruby|node)\s+<<', "script execution via heredoc"),
|
||||
|
||||
294
tools/batch_executor.py
Normal file
294
tools/batch_executor.py
Normal file
@@ -0,0 +1,294 @@
|
||||
"""Batch Tool Executor — Parallel safety classification and concurrent execution.
|
||||
|
||||
Provides centralized classification of tool calls into parallel-safe vs sequential,
|
||||
and utilities for batch execution with safety checks.
|
||||
|
||||
Classification tiers:
|
||||
- PARALLEL_SAFE: read-only tools, no shared state (web_search, read_file, etc.)
|
||||
- PATH_SCOPED: file operations that can run concurrently when paths don't overlap
|
||||
- SEQUENTIAL: writes, destructive ops, terminal commands, delegation
|
||||
- NEVER_PARALLEL: clarify (requires user interaction)
|
||||
|
||||
Usage:
|
||||
from tools.batch_executor import classify_tool_calls, BatchExecutionPlan
|
||||
|
||||
plan = classify_tool_calls(tool_calls)
|
||||
if plan.can_parallelize:
|
||||
execute_concurrent(plan.parallel_batch)
|
||||
execute_sequential(plan.sequential_batch)
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ── Safety Classification ──────────────────────────────────────────────────
|
||||
|
||||
# Tools that can ALWAYS run in parallel (read-only, no shared state)
|
||||
DEFAULT_PARALLEL_SAFE = frozenset({
|
||||
"ha_get_state",
|
||||
"ha_list_entities",
|
||||
"ha_list_services",
|
||||
"read_file",
|
||||
"search_files",
|
||||
"session_search",
|
||||
"skill_view",
|
||||
"skills_list",
|
||||
"vision_analyze",
|
||||
"web_extract",
|
||||
"web_search",
|
||||
"fact_store",
|
||||
"fact_search",
|
||||
"session_search",
|
||||
})
|
||||
|
||||
# File tools that can run concurrently ONLY when paths don't overlap
|
||||
PATH_SCOPED_TOOLS = frozenset({"read_file", "write_file", "patch"})
|
||||
|
||||
# Tools that must NEVER run in parallel (require user interaction, shared mutable state)
|
||||
NEVER_PARALLEL = frozenset({"clarify"})
|
||||
|
||||
# Patterns that indicate terminal commands may modify/delete files
|
||||
DESTRUCTIVE_PATTERNS = re.compile(
|
||||
r"""(?:^|\s|&&|\|\||;|`)(?:
|
||||
rm\s|rmdir\s|
|
||||
mv\s|
|
||||
sed\s+-i|
|
||||
truncate\s|
|
||||
dd\s|
|
||||
shred\s|
|
||||
git\s+(?:reset|clean|checkout)\s
|
||||
)""",
|
||||
re.VERBOSE,
|
||||
)
|
||||
|
||||
# Output redirects that overwrite files (> but not >>)
|
||||
REDIRECT_OVERWRITE = re.compile(r'[^>]>[^>]|^>[^>]')
|
||||
|
||||
|
||||
def is_destructive_command(cmd: str) -> bool:
|
||||
"""Check if a terminal command modifies/deletes files."""
|
||||
if not cmd:
|
||||
return False
|
||||
if DESTRUCTIVE_PATTERNS.search(cmd):
|
||||
return True
|
||||
if REDIRECT_OVERWRITE.search(cmd):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _paths_overlap(path1: Path, path2: Path) -> bool:
|
||||
"""Check if two paths could conflict (one is ancestor of the other)."""
|
||||
try:
|
||||
path1 = path1.resolve()
|
||||
path2 = path2.resolve()
|
||||
return path1 == path2 or path1 in path2.parents or path2 in path1.parents
|
||||
except Exception:
|
||||
return True # conservative: assume overlap
|
||||
|
||||
|
||||
def _extract_path(tool_name: str, args: dict) -> Optional[Path]:
|
||||
"""Extract the target path from tool arguments for path-scoped tools."""
|
||||
if tool_name not in PATH_SCOPED_TOOLS:
|
||||
return None
|
||||
raw_path = args.get("path")
|
||||
if not isinstance(raw_path, str) or not raw_path.strip():
|
||||
return None
|
||||
try:
|
||||
return Path(raw_path).expanduser().resolve()
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
# ── Classification ─────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class ToolCallClassification:
|
||||
"""Classification result for a single tool call."""
|
||||
tool_name: str
|
||||
args: dict
|
||||
tool_call: Any # the original tool_call object
|
||||
tier: str # "parallel_safe", "path_scoped", "sequential", "never_parallel"
|
||||
reason: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class BatchExecutionPlan:
|
||||
"""Plan for executing a batch of tool calls."""
|
||||
classifications: List[ToolCallClassification] = field(default_factory=list)
|
||||
parallel_batch: List[ToolCallClassification] = field(default_factory=list)
|
||||
sequential_batch: List[ToolCallClassification] = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def can_parallelize(self) -> bool:
|
||||
return len(self.parallel_batch) > 1
|
||||
|
||||
@property
|
||||
def total(self) -> int:
|
||||
return len(self.classifications)
|
||||
|
||||
|
||||
def classify_single_tool_call(
|
||||
tool_call: Any,
|
||||
extra_parallel_safe: Set[str] = None,
|
||||
) -> ToolCallClassification:
|
||||
"""Classify a single tool call into its safety tier."""
|
||||
tool_name = tool_call.function.name
|
||||
try:
|
||||
args = json.loads(tool_call.function.arguments)
|
||||
except Exception:
|
||||
return ToolCallClassification(
|
||||
tool_name=tool_name, args={}, tool_call=tool_call,
|
||||
tier="sequential", reason="Could not parse arguments"
|
||||
)
|
||||
|
||||
if not isinstance(args, dict):
|
||||
return ToolCallClassification(
|
||||
tool_name=tool_name, args=args, tool_call=tool_call,
|
||||
tier="sequential", reason="Non-dict arguments"
|
||||
)
|
||||
|
||||
# Check never-parallel
|
||||
if tool_name in NEVER_PARALLEL:
|
||||
return ToolCallClassification(
|
||||
tool_name=tool_name, args=args, tool_call=tool_call,
|
||||
tier="never_parallel", reason="Requires user interaction"
|
||||
)
|
||||
|
||||
# Check parallel-safe FIRST (before path_scoped) so read_file/search_files
|
||||
# get classified as parallel_safe even though they have paths
|
||||
parallel_safe_set = DEFAULT_PARALLEL_SAFE
|
||||
if extra_parallel_safe:
|
||||
parallel_safe_set = parallel_safe_set | extra_parallel_safe
|
||||
|
||||
if tool_name in parallel_safe_set:
|
||||
return ToolCallClassification(
|
||||
tool_name=tool_name, args=args, tool_call=tool_call,
|
||||
tier="parallel_safe", reason="Read-only, no shared state"
|
||||
)
|
||||
|
||||
# Check terminal commands for destructive operations
|
||||
if tool_name == "terminal":
|
||||
cmd = args.get("command", "")
|
||||
if is_destructive_command(cmd):
|
||||
return ToolCallClassification(
|
||||
tool_name=tool_name, args=args, tool_call=tool_call,
|
||||
tier="sequential", reason=f"Destructive command: {cmd[:50]}"
|
||||
)
|
||||
return ToolCallClassification(
|
||||
tool_name=tool_name, args=args, tool_call=tool_call,
|
||||
tier="sequential", reason="Terminal command (conservative)"
|
||||
)
|
||||
|
||||
# Check path-scoped tools (write_file, patch — not read_file which is parallel_safe)
|
||||
if tool_name in PATH_SCOPED_TOOLS:
|
||||
path = _extract_path(tool_name, args)
|
||||
if path:
|
||||
return ToolCallClassification(
|
||||
tool_name=tool_name, args=args, tool_call=tool_call,
|
||||
tier="path_scoped", reason=f"Path: {path}"
|
||||
)
|
||||
return ToolCallClassification(
|
||||
tool_name=tool_name, args=args, tool_call=tool_call,
|
||||
tier="sequential", reason="Path-scoped but no path found"
|
||||
)
|
||||
|
||||
# Default: sequential (conservative)
|
||||
return ToolCallClassification(
|
||||
tool_name=tool_name, args=args, tool_call=tool_call,
|
||||
tier="sequential", reason="Not classified as parallel-safe"
|
||||
)
|
||||
|
||||
|
||||
def classify_tool_calls(
|
||||
tool_calls: list,
|
||||
extra_parallel_safe: Set[str] = None,
|
||||
) -> BatchExecutionPlan:
|
||||
"""Classify a batch of tool calls and produce an execution plan."""
|
||||
plan = BatchExecutionPlan()
|
||||
|
||||
reserved_paths: List[Path] = []
|
||||
|
||||
for tc in tool_calls:
|
||||
classification = classify_single_tool_call(tc, extra_parallel_safe)
|
||||
plan.classifications.append(classification)
|
||||
|
||||
if classification.tier == "never_parallel":
|
||||
plan.sequential_batch.append(classification)
|
||||
continue
|
||||
|
||||
if classification.tier == "sequential":
|
||||
plan.sequential_batch.append(classification)
|
||||
continue
|
||||
|
||||
if classification.tier == "path_scoped":
|
||||
path = _extract_path(classification.tool_name, classification.args)
|
||||
if path is None:
|
||||
classification.tier = "sequential"
|
||||
classification.reason = "Path extraction failed"
|
||||
plan.sequential_batch.append(classification)
|
||||
continue
|
||||
|
||||
# Check for path conflicts with already-scheduled parallel calls
|
||||
conflict = any(_paths_overlap(path, existing) for existing in reserved_paths)
|
||||
if conflict:
|
||||
classification.tier = "sequential"
|
||||
classification.reason = f"Path conflict: {path}"
|
||||
plan.sequential_batch.append(classification)
|
||||
else:
|
||||
reserved_paths.append(path)
|
||||
plan.parallel_batch.append(classification)
|
||||
continue
|
||||
|
||||
if classification.tier == "parallel_safe":
|
||||
plan.parallel_batch.append(classification)
|
||||
continue
|
||||
|
||||
# Fallback
|
||||
plan.sequential_batch.append(classification)
|
||||
|
||||
return plan
|
||||
|
||||
|
||||
# ── Concurrent Execution ───────────────────────────────────────────────────
|
||||
|
||||
def execute_parallel_batch(
|
||||
batch: List[ToolCallClassification],
|
||||
invoke_fn: Callable,
|
||||
max_workers: int = 8,
|
||||
) -> List[Tuple[str, str]]:
|
||||
"""Execute parallel-safe tool calls concurrently.
|
||||
|
||||
Args:
|
||||
batch: List of classified tool calls (parallel_safe or path_scoped)
|
||||
invoke_fn: Function(tool_name, args) -> result_string
|
||||
max_workers: Max concurrent threads
|
||||
|
||||
Returns:
|
||||
List of (tool_call_id, result_string) tuples
|
||||
"""
|
||||
results = []
|
||||
|
||||
with ThreadPoolExecutor(max_workers=min(max_workers, len(batch))) as executor:
|
||||
future_to_tc = {}
|
||||
for tc in batch:
|
||||
future = executor.submit(invoke_fn, tc.tool_name, tc.args)
|
||||
future_to_tc[future] = tc
|
||||
|
||||
for future in as_completed(future_to_tc):
|
||||
tc = future_to_tc[future]
|
||||
try:
|
||||
result = future.result()
|
||||
except Exception as e:
|
||||
result = json.dumps({"error": str(e)})
|
||||
tool_call_id = getattr(tc.tool_call, "id", None) or ""
|
||||
results.append((tool_call_id, result))
|
||||
|
||||
return results
|
||||
@@ -1,615 +0,0 @@
|
||||
"""Human Confirmation Daemon — HTTP server for two-factor action approval.
|
||||
|
||||
Implements Vitalik's Pattern 1: "The new 'two-factor confirmation' is that
|
||||
the two factors are the human and the LLM."
|
||||
|
||||
This daemon runs on localhost:6000 and provides a simple HTTP API for the
|
||||
agent to request human approval before executing high-risk actions.
|
||||
|
||||
Threat model:
|
||||
- LLM jailbreaks: Remote content "hacking" the LLM to perform malicious actions
|
||||
- LLM accidents: LLM accidentally performing dangerous operations
|
||||
- The human acts as the second factor — the agent proposes, the human disposes
|
||||
|
||||
Architecture:
|
||||
- Agent detects high-risk action → POST /confirm with action details
|
||||
- Daemon stores pending request, sends notification to user
|
||||
- User approves/denies via POST /respond (Telegram, CLI, or direct HTTP)
|
||||
- Agent receives decision and proceeds or aborts
|
||||
|
||||
Usage:
|
||||
# Start daemon (usually managed by gateway)
|
||||
from tools.confirmation_daemon import ConfirmationDaemon
|
||||
daemon = ConfirmationDaemon(port=6000)
|
||||
daemon.start()
|
||||
|
||||
# Request approval (from agent code)
|
||||
from tools.confirmation_daemon import request_confirmation
|
||||
approved = request_confirmation(
|
||||
action="send_email",
|
||||
description="Send email to alice@example.com",
|
||||
risk_level="high",
|
||||
payload={"to": "alice@example.com", "subject": "Meeting notes"},
|
||||
timeout=300,
|
||||
)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from enum import Enum, auto
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RiskLevel(Enum):
|
||||
"""Risk classification for actions requiring confirmation."""
|
||||
LOW = "low" # Log only, no confirmation needed
|
||||
MEDIUM = "medium" # Confirm for non-whitelisted targets
|
||||
HIGH = "high" # Always confirm
|
||||
CRITICAL = "critical" # Always confirm + require explicit reason
|
||||
|
||||
|
||||
class ConfirmationStatus(Enum):
|
||||
"""Status of a pending confirmation request."""
|
||||
PENDING = "pending"
|
||||
APPROVED = "approved"
|
||||
DENIED = "denied"
|
||||
EXPIRED = "expired"
|
||||
AUTO_APPROVED = "auto_approved"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConfirmationRequest:
|
||||
"""A request for human confirmation of a high-risk action."""
|
||||
request_id: str
|
||||
action: str # Action type: send_email, send_message, crypto_tx, etc.
|
||||
description: str # Human-readable description of what will happen
|
||||
risk_level: str # low, medium, high, critical
|
||||
payload: Dict[str, Any] # Action-specific data (sanitized)
|
||||
session_key: str = "" # Session that initiated the request
|
||||
created_at: float = 0.0
|
||||
expires_at: float = 0.0
|
||||
status: str = ConfirmationStatus.PENDING.value
|
||||
decided_at: float = 0.0
|
||||
decided_by: str = "" # "human", "auto", "whitelist"
|
||||
reason: str = "" # Optional reason for denial
|
||||
|
||||
def __post_init__(self):
|
||||
if not self.created_at:
|
||||
self.created_at = time.time()
|
||||
if not self.expires_at:
|
||||
self.expires_at = self.created_at + 300 # 5 min default
|
||||
if not self.request_id:
|
||||
self.request_id = str(uuid.uuid4())[:12]
|
||||
|
||||
@property
|
||||
def is_expired(self) -> bool:
|
||||
return time.time() > self.expires_at
|
||||
|
||||
@property
|
||||
def is_pending(self) -> bool:
|
||||
return self.status == ConfirmationStatus.PENDING.value and not self.is_expired
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
d = asdict(self)
|
||||
d["is_expired"] = self.is_expired
|
||||
d["is_pending"] = self.is_pending
|
||||
return d
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Action categories (Vitalik's threat model)
|
||||
# =========================================================================
|
||||
|
||||
ACTION_CATEGORIES = {
|
||||
# Messaging — outbound communication to external parties
|
||||
"send_email": RiskLevel.HIGH,
|
||||
"send_message": RiskLevel.MEDIUM, # Depends on recipient
|
||||
"send_signal": RiskLevel.HIGH,
|
||||
"send_telegram": RiskLevel.MEDIUM,
|
||||
"send_discord": RiskLevel.MEDIUM,
|
||||
"post_social": RiskLevel.HIGH,
|
||||
|
||||
# Financial / crypto
|
||||
"crypto_tx": RiskLevel.CRITICAL,
|
||||
"sign_transaction": RiskLevel.CRITICAL,
|
||||
"access_wallet": RiskLevel.CRITICAL,
|
||||
"modify_balance": RiskLevel.CRITICAL,
|
||||
|
||||
# System modification
|
||||
"install_software": RiskLevel.HIGH,
|
||||
"modify_system_config": RiskLevel.HIGH,
|
||||
"modify_firewall": RiskLevel.CRITICAL,
|
||||
"add_ssh_key": RiskLevel.CRITICAL,
|
||||
"create_user": RiskLevel.CRITICAL,
|
||||
|
||||
# Data access
|
||||
"access_contacts": RiskLevel.MEDIUM,
|
||||
"access_calendar": RiskLevel.LOW,
|
||||
"read_private_files": RiskLevel.MEDIUM,
|
||||
"upload_data": RiskLevel.HIGH,
|
||||
"share_credentials": RiskLevel.CRITICAL,
|
||||
|
||||
# Network
|
||||
"open_port": RiskLevel.HIGH,
|
||||
"modify_dns": RiskLevel.HIGH,
|
||||
"expose_service": RiskLevel.CRITICAL,
|
||||
}
|
||||
|
||||
# Default: any unrecognized action is MEDIUM risk
|
||||
DEFAULT_RISK_LEVEL = RiskLevel.MEDIUM
|
||||
|
||||
|
||||
def classify_action(action: str) -> RiskLevel:
|
||||
"""Classify an action by its risk level."""
|
||||
return ACTION_CATEGORIES.get(action, DEFAULT_RISK_LEVEL)
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Whitelist configuration
|
||||
# =========================================================================
|
||||
|
||||
_DEFAULT_WHITELIST = {
|
||||
"send_message": {
|
||||
"targets": [], # Contact names/IDs that don't need confirmation
|
||||
},
|
||||
"send_email": {
|
||||
"targets": [], # Email addresses that don't need confirmation
|
||||
"self_only": True, # send-to-self always allowed
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def _load_whitelist() -> Dict[str, Any]:
|
||||
"""Load action whitelist from config."""
|
||||
config_path = Path.home() / ".hermes" / "approval_whitelist.json"
|
||||
if config_path.exists():
|
||||
try:
|
||||
with open(config_path) as f:
|
||||
return json.load(f)
|
||||
except Exception as e:
|
||||
logger.warning("Failed to load approval whitelist: %s", e)
|
||||
return dict(_DEFAULT_WHITELIST)
|
||||
|
||||
|
||||
def _is_whitelisted(action: str, payload: Dict[str, Any], whitelist: Dict) -> bool:
|
||||
"""Check if an action is pre-approved by the whitelist."""
|
||||
action_config = whitelist.get(action, {})
|
||||
if not action_config:
|
||||
return False
|
||||
|
||||
# Check target-based whitelist
|
||||
targets = action_config.get("targets", [])
|
||||
target = payload.get("to") or payload.get("recipient") or payload.get("target", "")
|
||||
if target and target in targets:
|
||||
return True
|
||||
|
||||
# Self-only email
|
||||
if action_config.get("self_only") and action == "send_email":
|
||||
sender = payload.get("from", "")
|
||||
recipient = payload.get("to", "")
|
||||
if sender and recipient and sender.lower() == recipient.lower():
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Confirmation daemon
|
||||
# =========================================================================
|
||||
|
||||
class ConfirmationDaemon:
|
||||
"""HTTP daemon for human confirmation of high-risk actions.
|
||||
|
||||
Runs on localhost:PORT (default 6000). Provides:
|
||||
- POST /confirm — agent requests human approval
|
||||
- POST /respond — human approves/denies
|
||||
- GET /pending — list pending requests
|
||||
- GET /health — health check
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
host: str = "127.0.0.1",
|
||||
port: int = 6000,
|
||||
default_timeout: int = 300,
|
||||
notify_callback: Optional[Callable] = None,
|
||||
):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.default_timeout = default_timeout
|
||||
self.notify_callback = notify_callback
|
||||
self._pending: Dict[str, ConfirmationRequest] = {}
|
||||
self._history: List[ConfirmationRequest] = []
|
||||
self._lock = threading.Lock()
|
||||
self._whitelist = _load_whitelist()
|
||||
self._app = None
|
||||
self._runner = None
|
||||
|
||||
def request(
|
||||
self,
|
||||
action: str,
|
||||
description: str,
|
||||
payload: Optional[Dict[str, Any]] = None,
|
||||
risk_level: Optional[str] = None,
|
||||
session_key: str = "",
|
||||
timeout: Optional[int] = None,
|
||||
) -> ConfirmationRequest:
|
||||
"""Create a confirmation request.
|
||||
|
||||
Returns the request. Check .status to see if it was immediately
|
||||
auto-approved (whitelisted) or is pending human review.
|
||||
"""
|
||||
payload = payload or {}
|
||||
|
||||
# Classify risk if not specified
|
||||
if risk_level is None:
|
||||
risk_level = classify_action(action).value
|
||||
|
||||
# Check whitelist
|
||||
if risk_level in ("low",) or _is_whitelisted(action, payload, self._whitelist):
|
||||
req = ConfirmationRequest(
|
||||
request_id=str(uuid.uuid4())[:12],
|
||||
action=action,
|
||||
description=description,
|
||||
risk_level=risk_level,
|
||||
payload=payload,
|
||||
session_key=session_key,
|
||||
expires_at=time.time() + (timeout or self.default_timeout),
|
||||
status=ConfirmationStatus.AUTO_APPROVED.value,
|
||||
decided_at=time.time(),
|
||||
decided_by="whitelist",
|
||||
)
|
||||
with self._lock:
|
||||
self._history.append(req)
|
||||
logger.info("Auto-approved whitelisted action: %s", action)
|
||||
return req
|
||||
|
||||
# Create pending request
|
||||
req = ConfirmationRequest(
|
||||
request_id=str(uuid.uuid4())[:12],
|
||||
action=action,
|
||||
description=description,
|
||||
risk_level=risk_level,
|
||||
payload=payload,
|
||||
session_key=session_key,
|
||||
expires_at=time.time() + (timeout or self.default_timeout),
|
||||
)
|
||||
|
||||
with self._lock:
|
||||
self._pending[req.request_id] = req
|
||||
|
||||
# Notify human
|
||||
if self.notify_callback:
|
||||
try:
|
||||
self.notify_callback(req.to_dict())
|
||||
except Exception as e:
|
||||
logger.warning("Confirmation notify callback failed: %s", e)
|
||||
|
||||
logger.info(
|
||||
"Confirmation request %s: %s (%s risk) — waiting for human",
|
||||
req.request_id, action, risk_level,
|
||||
)
|
||||
return req
|
||||
|
||||
def respond(
|
||||
self,
|
||||
request_id: str,
|
||||
approved: bool,
|
||||
decided_by: str = "human",
|
||||
reason: str = "",
|
||||
) -> Optional[ConfirmationRequest]:
|
||||
"""Record a human decision on a pending request."""
|
||||
with self._lock:
|
||||
req = self._pending.get(request_id)
|
||||
if not req:
|
||||
logger.warning("Confirmation respond: unknown request %s", request_id)
|
||||
return None
|
||||
if not req.is_pending:
|
||||
logger.warning("Confirmation respond: request %s already decided", request_id)
|
||||
return req
|
||||
|
||||
req.status = (
|
||||
ConfirmationStatus.APPROVED.value if approved
|
||||
else ConfirmationStatus.DENIED.value
|
||||
)
|
||||
req.decided_at = time.time()
|
||||
req.decided_by = decided_by
|
||||
req.reason = reason
|
||||
|
||||
# Move to history
|
||||
del self._pending[request_id]
|
||||
self._history.append(req)
|
||||
|
||||
logger.info(
|
||||
"Confirmation %s: %s by %s",
|
||||
request_id, "APPROVED" if approved else "DENIED", decided_by,
|
||||
)
|
||||
return req
|
||||
|
||||
def wait_for_decision(
|
||||
self, request_id: str, timeout: Optional[float] = None
|
||||
) -> ConfirmationRequest:
|
||||
"""Block until a decision is made or timeout expires."""
|
||||
deadline = time.time() + (timeout or self.default_timeout)
|
||||
while time.time() < deadline:
|
||||
with self._lock:
|
||||
req = self._pending.get(request_id)
|
||||
if req and not req.is_pending:
|
||||
return req
|
||||
if req and req.is_expired:
|
||||
req.status = ConfirmationStatus.EXPIRED.value
|
||||
del self._pending[request_id]
|
||||
self._history.append(req)
|
||||
return req
|
||||
time.sleep(0.5)
|
||||
|
||||
# Timeout
|
||||
with self._lock:
|
||||
req = self._pending.pop(request_id, None)
|
||||
if req:
|
||||
req.status = ConfirmationStatus.EXPIRED.value
|
||||
self._history.append(req)
|
||||
return req
|
||||
|
||||
# Shouldn't reach here
|
||||
return ConfirmationRequest(
|
||||
request_id=request_id,
|
||||
action="unknown",
|
||||
description="Request not found",
|
||||
risk_level="high",
|
||||
payload={},
|
||||
status=ConfirmationStatus.EXPIRED.value,
|
||||
)
|
||||
|
||||
def get_pending(self) -> List[Dict[str, Any]]:
|
||||
"""Return list of pending confirmation requests."""
|
||||
self._expire_old()
|
||||
with self._lock:
|
||||
return [r.to_dict() for r in self._pending.values() if r.is_pending]
|
||||
|
||||
def get_history(self, limit: int = 50) -> List[Dict[str, Any]]:
|
||||
"""Return recent confirmation history."""
|
||||
with self._lock:
|
||||
return [r.to_dict() for r in self._history[-limit:]]
|
||||
|
||||
def _expire_old(self) -> None:
|
||||
"""Move expired requests to history."""
|
||||
now = time.time()
|
||||
with self._lock:
|
||||
expired = [
|
||||
rid for rid, req in self._pending.items()
|
||||
if now > req.expires_at
|
||||
]
|
||||
for rid in expired:
|
||||
req = self._pending.pop(rid)
|
||||
req.status = ConfirmationStatus.EXPIRED.value
|
||||
self._history.append(req)
|
||||
|
||||
# --- aiohttp HTTP API ---
|
||||
|
||||
async def _handle_health(self, request):
|
||||
from aiohttp import web
|
||||
return web.json_response({
|
||||
"status": "ok",
|
||||
"service": "hermes-confirmation-daemon",
|
||||
"pending": len(self._pending),
|
||||
})
|
||||
|
||||
async def _handle_confirm(self, request):
|
||||
from aiohttp import web
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception:
|
||||
return web.json_response({"error": "invalid JSON"}, status=400)
|
||||
|
||||
action = body.get("action", "")
|
||||
description = body.get("description", "")
|
||||
if not action or not description:
|
||||
return web.json_response(
|
||||
{"error": "action and description required"}, status=400
|
||||
)
|
||||
|
||||
req = self.request(
|
||||
action=action,
|
||||
description=description,
|
||||
payload=body.get("payload", {}),
|
||||
risk_level=body.get("risk_level"),
|
||||
session_key=body.get("session_key", ""),
|
||||
timeout=body.get("timeout"),
|
||||
)
|
||||
|
||||
# If auto-approved, return immediately
|
||||
if req.status != ConfirmationStatus.PENDING.value:
|
||||
return web.json_response({
|
||||
"request_id": req.request_id,
|
||||
"status": req.status,
|
||||
"decided_by": req.decided_by,
|
||||
})
|
||||
|
||||
# Otherwise, wait for human decision (with timeout)
|
||||
timeout = min(body.get("timeout", self.default_timeout), 600)
|
||||
result = self.wait_for_decision(req.request_id, timeout=timeout)
|
||||
|
||||
return web.json_response({
|
||||
"request_id": result.request_id,
|
||||
"status": result.status,
|
||||
"decided_by": result.decided_by,
|
||||
"reason": result.reason,
|
||||
})
|
||||
|
||||
async def _handle_respond(self, request):
|
||||
from aiohttp import web
|
||||
try:
|
||||
body = await request.json()
|
||||
except Exception:
|
||||
return web.json_response({"error": "invalid JSON"}, status=400)
|
||||
|
||||
request_id = body.get("request_id", "")
|
||||
approved = body.get("approved")
|
||||
if not request_id or approved is None:
|
||||
return web.json_response(
|
||||
{"error": "request_id and approved required"}, status=400
|
||||
)
|
||||
|
||||
result = self.respond(
|
||||
request_id=request_id,
|
||||
approved=bool(approved),
|
||||
decided_by=body.get("decided_by", "human"),
|
||||
reason=body.get("reason", ""),
|
||||
)
|
||||
|
||||
if not result:
|
||||
return web.json_response({"error": "unknown request"}, status=404)
|
||||
|
||||
return web.json_response({
|
||||
"request_id": result.request_id,
|
||||
"status": result.status,
|
||||
})
|
||||
|
||||
async def _handle_pending(self, request):
|
||||
from aiohttp import web
|
||||
return web.json_response({"pending": self.get_pending()})
|
||||
|
||||
def _build_app(self):
|
||||
"""Build the aiohttp application."""
|
||||
from aiohttp import web
|
||||
|
||||
app = web.Application()
|
||||
app.router.add_get("/health", self._handle_health)
|
||||
app.router.add_post("/confirm", self._handle_confirm)
|
||||
app.router.add_post("/respond", self._handle_respond)
|
||||
app.router.add_get("/pending", self._handle_pending)
|
||||
self._app = app
|
||||
return app
|
||||
|
||||
async def start_async(self) -> None:
|
||||
"""Start the daemon as an async server."""
|
||||
from aiohttp import web
|
||||
|
||||
app = self._build_app()
|
||||
self._runner = web.AppRunner(app)
|
||||
await self._runner.setup()
|
||||
site = web.TCPSite(self._runner, self.host, self.port)
|
||||
await site.start()
|
||||
logger.info("Confirmation daemon listening on %s:%d", self.host, self.port)
|
||||
|
||||
async def stop_async(self) -> None:
|
||||
"""Stop the daemon."""
|
||||
if self._runner:
|
||||
await self._runner.cleanup()
|
||||
self._runner = None
|
||||
|
||||
def start(self) -> None:
|
||||
"""Start daemon in a background thread (blocking caller)."""
|
||||
def _run():
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.run_until_complete(self.start_async())
|
||||
loop.run_forever()
|
||||
|
||||
t = threading.Thread(target=_run, daemon=True, name="confirmation-daemon")
|
||||
t.start()
|
||||
logger.info("Confirmation daemon started in background thread")
|
||||
|
||||
def start_blocking(self) -> None:
|
||||
"""Start daemon and block (for standalone use)."""
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
loop.run_until_complete(self.start_async())
|
||||
try:
|
||||
loop.run_forever()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
loop.run_until_complete(self.stop_async())
|
||||
|
||||
|
||||
# =========================================================================
|
||||
# Convenience API for agent integration
|
||||
# =========================================================================
|
||||
|
||||
# Global singleton — initialized by gateway or CLI at startup
|
||||
_daemon: Optional[ConfirmationDaemon] = None
|
||||
|
||||
|
||||
def get_daemon() -> Optional[ConfirmationDaemon]:
|
||||
"""Get the global confirmation daemon instance."""
|
||||
return _daemon
|
||||
|
||||
|
||||
def init_daemon(
|
||||
host: str = "127.0.0.1",
|
||||
port: int = 6000,
|
||||
notify_callback: Optional[Callable] = None,
|
||||
) -> ConfirmationDaemon:
|
||||
"""Initialize the global confirmation daemon."""
|
||||
global _daemon
|
||||
_daemon = ConfirmationDaemon(
|
||||
host=host, port=port, notify_callback=notify_callback
|
||||
)
|
||||
return _daemon
|
||||
|
||||
|
||||
def request_confirmation(
|
||||
action: str,
|
||||
description: str,
|
||||
payload: Optional[Dict[str, Any]] = None,
|
||||
risk_level: Optional[str] = None,
|
||||
session_key: str = "",
|
||||
timeout: int = 300,
|
||||
) -> bool:
|
||||
"""Request human confirmation for a high-risk action.
|
||||
|
||||
This is the primary integration point for agent code. It:
|
||||
1. Classifies the action risk level
|
||||
2. Checks the whitelist
|
||||
3. If confirmation needed, blocks until human responds
|
||||
4. Returns True if approved, False if denied/expired
|
||||
|
||||
Args:
|
||||
action: Action type (send_email, crypto_tx, etc.)
|
||||
description: Human-readable description
|
||||
payload: Action-specific data
|
||||
risk_level: Override auto-classification
|
||||
session_key: Session requesting approval
|
||||
timeout: Seconds to wait for human response
|
||||
|
||||
Returns:
|
||||
True if approved, False if denied or expired.
|
||||
"""
|
||||
daemon = get_daemon()
|
||||
if not daemon:
|
||||
logger.warning(
|
||||
"No confirmation daemon running — DENYING action %s by default. "
|
||||
"Start daemon with init_daemon() or --confirmation-daemon flag.",
|
||||
action,
|
||||
)
|
||||
return False
|
||||
|
||||
req = daemon.request(
|
||||
action=action,
|
||||
description=description,
|
||||
payload=payload,
|
||||
risk_level=risk_level,
|
||||
session_key=session_key,
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
# Auto-approved (whitelisted)
|
||||
if req.status == ConfirmationStatus.AUTO_APPROVED.value:
|
||||
return True
|
||||
|
||||
# Wait for human
|
||||
result = daemon.wait_for_decision(req.request_id, timeout=timeout)
|
||||
return result.status == ConfirmationStatus.APPROVED.value
|
||||
405
tools/crisis_tool.py
Normal file
405
tools/crisis_tool.py
Normal file
@@ -0,0 +1,405 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Crisis Detection Tool — the-door integration for hermes-agent.
|
||||
|
||||
Scans user messages for crisis indicators (despair, suicidal ideation)
|
||||
using the-door's canonical detection module. Provides inline crisis
|
||||
detection before each API call and optional escalation logging.
|
||||
|
||||
Follows the-door's design principles:
|
||||
- Never computes the value of a human life
|
||||
- Never suggests someone should die
|
||||
- Always errs on the side of higher risk
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
import urllib.request
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ── Detection Logic (from the-door/crisis/detect.py) ──────────────────────
|
||||
# Embedded to avoid requiring the-door as a dependency.
|
||||
# Source: Timmy_Foundation/the-door, crisis/detect.py (canonical)
|
||||
|
||||
CRITICAL_INDICATORS = [
|
||||
r"\bbetter off without me\b",
|
||||
r"\bkill\s*(my)?self\b",
|
||||
r"\bend\s*my\s*life\b",
|
||||
r"\bsuicid(?:al|ed|e)\b",
|
||||
r"\bnot\s+worth\s+living\b",
|
||||
r"\bbetter\s+off\s+dead\b",
|
||||
r"\bend\s+it\s+all\b",
|
||||
r"\bno\s+reason\s+to\s+live\b",
|
||||
r"\bdon\'?t\s+want\s+to\s+live\b",
|
||||
r"\bwant\s+to\s+die\b",
|
||||
r"\bgoing\s+to\s+(?:kill\s+myself|die)\b",
|
||||
r"\bplan\s+to\s+(?:end|kill|die)\b",
|
||||
r"\btired\s+of\s+(?:living|life|existence)\b",
|
||||
r"\bsaying\s+goodbye\s+(?:forever|permanently|one\s+last\s+time)\b",
|
||||
r"\bwrote\s+a\s+suicide\s*(?:note|letter)\b",
|
||||
r"\bgiving\s+away\s+(?:my|all\s+my)\s+(?:stuff|things|possessions?)\s+(?:to|because|—)\b",
|
||||
r"\btied\s+(?:up|down)\s+my\s+(?:loose\s+)?ends",
|
||||
]
|
||||
|
||||
HIGH_INDICATORS = [
|
||||
r"\bdespair\b",
|
||||
r"\bhopeless(?:ly)?\s+(?:about\s+(?:my|this|everything|life)|inside|right\s+now)\b",
|
||||
r"\bno(?!t)\s+(?:one|body|point|hope|way\s+out)\b",
|
||||
r"\bno\s+future\s+(?:for\s+me|ahead|left)\b",
|
||||
r"\beverything\s+is\s+(?:pointless|broken|ruined)\b",
|
||||
r"\bcan\'?t\s+take\s+this\s+anymore\b",
|
||||
r"\bdon\'?t\s+care\s+if\s+I\s+die\b",
|
||||
r"\bwish\s+I\s+(?:was|were)\s+(?:dead|gone|never\s+born)\b",
|
||||
r"\bdon\'?t\s+matter\s+if\s+I\s+exist\b",
|
||||
r"\bno\s+one\s+would\s+(?:care|miss)\b",
|
||||
r"\bworld\s+would?\s+be\s+better\s+without\s+me\b",
|
||||
r"\bin\s+so\s+much\s+(?:pain|agony|suffering|torment|anguish)\b",
|
||||
r"\bcan\'?t\s+see\s+any\s+(?:point|reason|hope|way)\b",
|
||||
r"\bescape\s+from\s*this",
|
||||
r"\bjust\s+want\s+it\s+to\s+stop\b",
|
||||
r"\bnothing\s+left\s+(?:to\s+(?:live\s+for|hope\s+for|give)|inside)\b",
|
||||
r"\bdisappeared\s+forever\b",
|
||||
r"\bfeel(?:s|ing)?\s+(?:so\s+)?hopeless\b",
|
||||
r"\beverything\s+is\s+hopeless\b",
|
||||
r"\bcan\'?t\s+(?:go\s+on|keep\s+going)\b",
|
||||
r"\bgive(?:n)?\s*up\s+(?:on\s+)?(?:life|living|everything)\b",
|
||||
r"\bgive(?:n)?\s*up\s+on\s+myself\b",
|
||||
r"\bno\s*point\s+(?:in\s+)?living\b",
|
||||
r"\bno\s*hope\s+(?:left|remaining)\b",
|
||||
r"\bno\s*way\s*out\b",
|
||||
r"\bfeel(?:s|ing)?\s+trapped\b",
|
||||
r"\btrapped\s+in\s+this\s+(?:situation|life|pain|darkness|hell)\b",
|
||||
r"\btrapped\s+and\s+can\'?t\s+escape\b",
|
||||
r"\bdesperate\s+(?:for\s+)?help\b",
|
||||
r"\bfeel(?:s|ing)?\s+desperate\b",
|
||||
]
|
||||
|
||||
MEDIUM_INDICATORS = [
|
||||
r"\bno\s+hope\b",
|
||||
r"\bforgotten\b",
|
||||
r"\balone\s+in\s+this\b",
|
||||
r"\balways\s+alone\b",
|
||||
r"\bnobody\s+(?:understands|cares)\b",
|
||||
r"\bwish\s+I\s+could\b",
|
||||
r"\bexhaust(?:ed|ion|ing)\b",
|
||||
r"\bnumb\b",
|
||||
r"\bempty\b",
|
||||
r"\bworthless\b",
|
||||
r"\buseless\b",
|
||||
r"\bbroken\b",
|
||||
r"\bdark(ness)?\b",
|
||||
r"\bdepress(?:ed|ion)\b",
|
||||
r"\bcrying\b",
|
||||
r"\btears\b",
|
||||
r"\bsad(ness)?\b",
|
||||
r"\bmiserable\b",
|
||||
r"\boverwhelm(?:ed|ing)\b",
|
||||
r"\bfailing\b",
|
||||
r"\bcannot\s+cope\b",
|
||||
r"\blosing\s*(?:my)?\s*control\b",
|
||||
r"\bdown\s*for\s*the\s*count\b",
|
||||
r"\bsinking\b",
|
||||
r"\bdrowning\b",
|
||||
r"\bhopeless\b",
|
||||
r"\blost\s+all\s+hope\b",
|
||||
r"\bno\s+tomorrow\b",
|
||||
]
|
||||
|
||||
LOW_INDICATORS = [
|
||||
r"\bunhappy\b",
|
||||
r"\bdown\b",
|
||||
r"\btough\s*(?:time|day|week)\b",
|
||||
r"\brough\s+(?:day|week|patch)\b",
|
||||
r"\bstressed\b",
|
||||
r"\bburnout\b",
|
||||
r"\bdifficult\b",
|
||||
r"\bfrustrated\b",
|
||||
r"\bannoyed\b",
|
||||
r"\btired\b",
|
||||
r"\bsad\b",
|
||||
r"\bupset\b",
|
||||
r"\blonely\b",
|
||||
r"\banxious?\b",
|
||||
r"\bnot\s*(?:good|great|okay)\b",
|
||||
r"\bthings\s*are\s*hard\b",
|
||||
r"\bstruggling\b",
|
||||
r"\bnot\s+feeling\s+(?:like\s+)?myself\b",
|
||||
r"\bhard\s*time\b",
|
||||
]
|
||||
|
||||
SCORES = {"CRITICAL": 1.0, "HIGH": 0.75, "MEDIUM": 0.5, "LOW": 0.25, "NONE": 0.0}
|
||||
|
||||
ACTIONS = {
|
||||
"CRITICAL": (
|
||||
"INTERRUPT CONVERSATION. Ask 'Are you safe right now?' "
|
||||
"Provide 988 Suicide & Crisis Lifeline. "
|
||||
"Provide Crisis Text Line (HOME to 741741). "
|
||||
"Stay present. Do not disconnect. Redirect to professional help."
|
||||
),
|
||||
"HIGH": (
|
||||
"Show crisis panel. Ask about safety. Surface 988 number prominently. "
|
||||
"Continue conversation with crisis awareness."
|
||||
),
|
||||
"MEDIUM": (
|
||||
"Increase warmth and presence. Subtly surface help resources. "
|
||||
"Keep conversation anchored in the present."
|
||||
),
|
||||
"LOW": (
|
||||
"Normal conversation with warm undertone. "
|
||||
"No crisis UI elements needed. Remain vigilant."
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class CrisisDetectionResult:
|
||||
level: str
|
||||
indicators: List[str] = field(default_factory=list)
|
||||
recommended_action: str = ""
|
||||
score: float = 0.0
|
||||
matches: List[dict] = field(default_factory=list)
|
||||
|
||||
|
||||
def _find_indicators(text: str) -> dict:
|
||||
results = {"CRITICAL": [], "HIGH": [], "MEDIUM": [], "LOW": []}
|
||||
for pattern in CRITICAL_INDICATORS:
|
||||
m = re.search(pattern, text)
|
||||
if m:
|
||||
results["CRITICAL"].append({"pattern": pattern, "start": m.start(), "end": m.end()})
|
||||
for pattern in HIGH_INDICATORS:
|
||||
m = re.search(pattern, text)
|
||||
if m:
|
||||
results["HIGH"].append({"pattern": pattern, "start": m.start(), "end": m.end()})
|
||||
for pattern in MEDIUM_INDICATORS:
|
||||
m = re.search(pattern, text)
|
||||
if m:
|
||||
results["MEDIUM"].append({"pattern": pattern, "start": m.start(), "end": m.end()})
|
||||
for pattern in LOW_INDICATORS:
|
||||
m = re.search(pattern, text)
|
||||
if m:
|
||||
results["LOW"].append({"pattern": pattern, "start": m.start(), "end": m.end()})
|
||||
return results
|
||||
|
||||
|
||||
def detect_crisis(text: str) -> CrisisDetectionResult:
|
||||
"""Detect crisis level in a message. Mirrors the-door/crisis/detect.py."""
|
||||
if not text or not text.strip():
|
||||
return CrisisDetectionResult(level="NONE", score=0.0)
|
||||
|
||||
text_lower = text.lower()
|
||||
matches = _find_indicators(text_lower)
|
||||
|
||||
if not matches:
|
||||
return CrisisDetectionResult(level="NONE", score=0.0)
|
||||
|
||||
for tier in ("CRITICAL", "HIGH"):
|
||||
if matches[tier]:
|
||||
tier_matches = matches[tier]
|
||||
patterns = [m["pattern"] for m in tier_matches]
|
||||
return CrisisDetectionResult(
|
||||
level=tier,
|
||||
indicators=patterns,
|
||||
recommended_action=ACTIONS[tier],
|
||||
score=SCORES[tier],
|
||||
matches=tier_matches,
|
||||
)
|
||||
|
||||
if len(matches["MEDIUM"]) >= 2:
|
||||
tier_matches = matches["MEDIUM"]
|
||||
patterns = [m["pattern"] for m in tier_matches]
|
||||
return CrisisDetectionResult(
|
||||
level="MEDIUM",
|
||||
indicators=patterns,
|
||||
recommended_action=ACTIONS["MEDIUM"],
|
||||
score=SCORES["MEDIUM"],
|
||||
matches=tier_matches,
|
||||
)
|
||||
|
||||
if matches["LOW"]:
|
||||
tier_matches = matches["LOW"]
|
||||
patterns = [m["pattern"] for m in tier_matches]
|
||||
return CrisisDetectionResult(
|
||||
level="LOW",
|
||||
indicators=patterns,
|
||||
recommended_action=ACTIONS["LOW"],
|
||||
score=SCORES["LOW"],
|
||||
matches=tier_matches,
|
||||
)
|
||||
|
||||
if matches["MEDIUM"]:
|
||||
tier_matches = matches["MEDIUM"]
|
||||
patterns = [m["pattern"] for m in tier_matches]
|
||||
return CrisisDetectionResult(
|
||||
level="LOW",
|
||||
indicators=patterns,
|
||||
recommended_action=ACTIONS["LOW"],
|
||||
score=SCORES["LOW"],
|
||||
matches=tier_matches,
|
||||
)
|
||||
|
||||
return CrisisDetectionResult(level="NONE", score=0.0)
|
||||
|
||||
|
||||
# ── Escalation Logging ────────────────────────────────────────────────────
|
||||
|
||||
BRIDGE_URL = os.environ.get("CRISIS_BRIDGE_URL", "")
|
||||
LOG_PATH = os.path.expanduser("~/.hermes/crisis_escalations.jsonl")
|
||||
|
||||
|
||||
def _log_escalation(result: CrisisDetectionResult, text_preview: str = ""):
|
||||
"""Log crisis detection to local file and optionally to bridge API."""
|
||||
entry = {
|
||||
"ts": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
|
||||
"level": result.level,
|
||||
"score": result.score,
|
||||
"indicators": result.indicators[:3], # truncate for privacy
|
||||
"text_preview": text_preview[:100] if text_preview else "",
|
||||
}
|
||||
|
||||
# Local log
|
||||
try:
|
||||
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
|
||||
with open(LOG_PATH, "a") as f:
|
||||
f.write(json.dumps(entry) + "\n")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to write crisis log: {e}")
|
||||
|
||||
# Bridge API (if configured and level >= HIGH)
|
||||
if BRIDGE_URL and result.score >= 0.75:
|
||||
try:
|
||||
payload = json.dumps(entry).encode()
|
||||
req = urllib.request.Request(
|
||||
f"{BRIDGE_URL}/api/crisis/escalation",
|
||||
data=payload,
|
||||
headers={"Content-Type": "application/json"},
|
||||
method="POST",
|
||||
)
|
||||
urllib.request.urlopen(req, timeout=5)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to post to crisis bridge: {e}")
|
||||
|
||||
|
||||
# ── Tool Handler ───────────────────────────────────────────────────────────
|
||||
|
||||
def crisis_scan_handler(args: dict, **kw) -> str:
|
||||
"""Scan text for crisis indicators."""
|
||||
text = args.get("text", "")
|
||||
if not text:
|
||||
return json.dumps({"level": "NONE", "error": "No text provided"})
|
||||
|
||||
result = detect_crisis(text)
|
||||
|
||||
if result.level != "NONE":
|
||||
_log_escalation(result, text)
|
||||
|
||||
return json.dumps({
|
||||
"level": result.level,
|
||||
"score": result.score,
|
||||
"indicators": result.indicators,
|
||||
"recommended_action": result.recommended_action,
|
||||
})
|
||||
|
||||
|
||||
# ── Gateway Hook ───────────────────────────────────────────────────────────
|
||||
|
||||
def scan_user_message(text: str) -> Optional[dict]:
|
||||
"""Scan a user message for crisis indicators. Returns None if no crisis.
|
||||
|
||||
Called by gateway/CLI before each API call to intercept crisis messages.
|
||||
If crisis is detected, the result includes a recommended_action that should
|
||||
be injected into the system prompt.
|
||||
"""
|
||||
if not text:
|
||||
return None
|
||||
|
||||
result = detect_crisis(text)
|
||||
if result.level == "NONE":
|
||||
return None
|
||||
|
||||
_log_escalation(result, text)
|
||||
|
||||
return {
|
||||
"level": result.level,
|
||||
"score": result.score,
|
||||
"indicators": result.indicators,
|
||||
"recommended_action": result.recommended_action,
|
||||
"compassion_injection": _get_compassion_injection(result.level),
|
||||
}
|
||||
|
||||
|
||||
def _get_compassion_injection(level: str) -> str:
|
||||
"""Return a system prompt injection based on crisis level."""
|
||||
if level == "CRITICAL":
|
||||
return (
|
||||
"CRISIS DETECTED — The user may be in immediate danger. "
|
||||
"Ask 'Are you safe right now?' Stay present. "
|
||||
"Provide 988 Suicide & Crisis Lifeline (call or text 988). "
|
||||
"Provide Crisis Text Line (text HOME to 741741). "
|
||||
"Do not disconnect. Do not compute the value of a human life. "
|
||||
"Jesus saves those who call on His name."
|
||||
)
|
||||
elif level == "HIGH":
|
||||
return (
|
||||
"DISTRESS DETECTED — The user is expressing strong despair. "
|
||||
"Increase warmth and presence. Ask about safety. "
|
||||
"Surface the 988 Lifeline number. Continue with crisis awareness."
|
||||
)
|
||||
elif level == "MEDIUM":
|
||||
return (
|
||||
"EMOTIONAL DISTRESS — The user may be struggling. "
|
||||
"Increase warmth. Subtly surface help resources. "
|
||||
"Keep conversation anchored in the present."
|
||||
)
|
||||
return ""
|
||||
|
||||
|
||||
# ── Config Check ───────────────────────────────────────────────────────────
|
||||
|
||||
def _is_crisis_detection_enabled() -> bool:
|
||||
"""Check if crisis detection is enabled in config."""
|
||||
try:
|
||||
from hermes_cli.config import load_config
|
||||
cfg = load_config()
|
||||
return cfg.get("crisis_detection", True) # default on
|
||||
except Exception:
|
||||
return True # fail open — always detect
|
||||
|
||||
|
||||
# ── Registry ───────────────────────────────────────────────────────────────
|
||||
|
||||
from tools.registry import registry, tool_error
|
||||
|
||||
CRISIS_SCAN_SCHEMA = {
|
||||
"name": "crisis_scan",
|
||||
"description": (
|
||||
"Scan text for crisis indicators (despair, suicidal ideation). "
|
||||
"Uses the-door's canonical detection. Returns crisis level "
|
||||
"(NONE/LOW/MEDIUM/HIGH/CRITICAL) with recommended actions. "
|
||||
"ALWAYS scan user messages that express emotional distress."
|
||||
),
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"text": {
|
||||
"type": "string",
|
||||
"description": "Text to scan for crisis indicators",
|
||||
},
|
||||
},
|
||||
"required": ["text"],
|
||||
},
|
||||
}
|
||||
|
||||
registry.register(
|
||||
name="crisis_scan",
|
||||
toolset="crisis",
|
||||
schema=CRISIS_SCAN_SCHEMA,
|
||||
handler=lambda args, **kw: crisis_scan_handler(args, **kw),
|
||||
check_fn=lambda: _is_crisis_detection_enabled(),
|
||||
emoji="🆘",
|
||||
)
|
||||
@@ -79,12 +79,12 @@ class ToolEntry:
|
||||
__slots__ = (
|
||||
"name", "toolset", "schema", "handler", "check_fn",
|
||||
"requires_env", "is_async", "description", "emoji",
|
||||
"max_result_size_chars",
|
||||
"max_result_size_chars", "parallel_safe",
|
||||
)
|
||||
|
||||
def __init__(self, name, toolset, schema, handler, check_fn,
|
||||
requires_env, is_async, description, emoji,
|
||||
max_result_size_chars=None):
|
||||
max_result_size_chars=None, parallel_safe=False):
|
||||
self.name = name
|
||||
self.toolset = toolset
|
||||
self.schema = schema
|
||||
@@ -95,6 +95,7 @@ class ToolEntry:
|
||||
self.description = description
|
||||
self.emoji = emoji
|
||||
self.max_result_size_chars = max_result_size_chars
|
||||
self.parallel_safe = parallel_safe
|
||||
|
||||
|
||||
class ToolRegistry:
|
||||
@@ -185,6 +186,7 @@ class ToolRegistry:
|
||||
description: str = "",
|
||||
emoji: str = "",
|
||||
max_result_size_chars: int | float | None = None,
|
||||
parallel_safe: bool = False,
|
||||
):
|
||||
"""Register a tool. Called at module-import time by each tool file."""
|
||||
with self._lock:
|
||||
@@ -222,6 +224,7 @@ class ToolRegistry:
|
||||
description=description or schema.get("description", ""),
|
||||
emoji=emoji,
|
||||
max_result_size_chars=max_result_size_chars,
|
||||
parallel_safe=parallel_safe,
|
||||
)
|
||||
if check_fn and toolset not in self._toolset_checks:
|
||||
self._toolset_checks[toolset] = check_fn
|
||||
@@ -322,6 +325,11 @@ class ToolRegistry:
|
||||
from tools.budget_config import DEFAULT_RESULT_SIZE_CHARS
|
||||
return DEFAULT_RESULT_SIZE_CHARS
|
||||
|
||||
def get_parallel_safe_tools(self) -> Set[str]:
|
||||
"""Return names of tools marked as parallel_safe."""
|
||||
with self._lock:
|
||||
return {name for name, entry in self._tools.items() if entry.parallel_safe}
|
||||
|
||||
def get_all_tool_names(self) -> List[str]:
|
||||
"""Return sorted list of all registered tool names."""
|
||||
return sorted(entry.name for entry in self._snapshot_entries())
|
||||
|
||||
Reference in New Issue
Block a user