Compare commits

..

2 Commits

Author SHA1 Message Date
48c3466540 fix: resolve merge conflict — rebase Vitalik patterns onto current main
PR #397 had merge conflicts in tools/approval.py because main diverged.
This commit brings approval.py up to date with main and adds the
Vitalik security patterns in the correct location.
2026-04-16 01:33:11 +00:00
Alexander Whitestone
34e646ad6d feat(security): implement Vitalik's secure LLM patterns — privacy filter + confirmation daemon
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m17s
Implements design patterns from Vitalik Buterin's 'Secure LLM Architecture'
(2026-04-02). Closes #280.

Three components:

1. agent/privacy_filter.py — Strip PII from context before remote API calls.
   - 16 regex patterns: emails, phones, API keys, tokens, crypto addresses,
     credit cards, SSNs, file paths, internal IPs
   - Sensitivity tiers: PUBLIC < LOW < MEDIUM < HIGH < CRITICAL
   - should_use_local_only() blocks remote calls when secrets detected
   - sanitize_messages() integrates with OpenAI-format message pipeline
   - Aggressive mode for paranoid filtering (includes IPs, file paths)
   - 21 tests (all passing)

2. tools/confirmation_daemon.py — Human Confirmation Firewall.
   HTTP daemon on port 6000 implementing Vitalik's two-factor pattern:
   'The two factors are the human and the LLM.'
   - POST /confirm — agent requests approval for high-risk actions
   - POST /respond — human approves/denies
   - GET  /pending — list waiting requests
   - Action risk classification: LOW/MEDIUM/HIGH/CRITICAL
   - Whitelist auto-approval for trusted contacts
   - Categories: messaging, crypto, system modification, data access, network
   - request_confirmation() API for agent integration
   - 18 tests (all passing)

3. tools/approval.py — Extended dangerous patterns for Vitalik's threat model:
   - Crypto transaction detection (bitcoin-cli, ethers.js, web3)
   - Credential exfiltration detection (curl/wget with .env, .ssh, tokens)
   - Data exfiltration detection (sending user data to remote)

Test results: 42 new tests passing, 99/100 existing approval tests passing
(1 pre-existing flaky threading test unrelated to changes).
2026-04-13 18:10:26 -04:00
10 changed files with 1468 additions and 612 deletions

View File

@@ -309,19 +309,7 @@ class MemoryManager:
"""Notify external providers when the built-in memory tool writes.
Skips the builtin provider itself (it's the source of the write).
Passes current MEMORY.md entries for cross-tier dedup checking.
"""
# Collect current memory entries for dedup context
memory_entries = []
for provider in self._providers:
if provider.name == "builtin" and hasattr(provider, "_store") and provider._store:
try:
store = provider._store
if hasattr(store, "get_all_entries"):
memory_entries = store.get_all_entries(target)
except Exception:
pass
for provider in self._providers:
if provider.name == "builtin":
continue
@@ -333,54 +321,6 @@ class MemoryManager:
provider.name, e,
)
def run_dedup_scan(self) -> dict:
"""Run cross-tier deduplication scan across all memory providers.
Returns a report dict with duplicates found and actions taken.
"""
report = {"status": "ok", "duplicates": 0, "actions": []}
# Collect MEMORY.md entries
memory_entries = []
builtin_store = None
for provider in self._providers:
if provider.name == "builtin" and hasattr(provider, "_store"):
builtin_store = provider._store
if builtin_store:
try:
entries = builtin_store.get_all_entries("memory")
memory_entries = entries if entries else []
except Exception:
pass
if not memory_entries:
report["status"] = "no_memory_entries"
return report
# Check each external provider for duplicates
for provider in self._providers:
if provider.name == "builtin":
continue
if not hasattr(provider, "_store") or not provider._store:
continue
try:
from plugins.memory.holographic.dedup import scan_cross_tier_duplicates
all_facts = provider._store.list_facts(min_trust=0.0, limit=1000)
dup_report = scan_cross_tier_duplicates(memory_entries, all_facts)
report["duplicates"] += dup_report.duplicates_found
if dup_report.duplicates_found > 0:
from plugins.memory.holographic.dedup import resolve_duplicates
cleaned = resolve_duplicates(dup_report, memory_entries, provider._store)
removed = len(memory_entries) - len(cleaned)
report["actions"].append(
f"{provider.name}: {dup_report.duplicates_found} duplicates, "
f"{removed} MEMORY.md entries removed"
)
except Exception as e:
logger.warning("Dedup scan failed for provider '%s': %s", provider.name, e)
return report
def on_delegation(self, task: str, result: str, *,
child_session_id: str = "", **kwargs) -> None:
"""Notify all providers that a subagent completed."""

353
agent/privacy_filter.py Normal file
View File

@@ -0,0 +1,353 @@
"""Privacy Filter — strip PII from context before remote API calls.
Implements Vitalik's Pattern 2: "A local model can strip out private data
before passing the query along to a remote LLM."
When Hermes routes a request to a cloud provider (Anthropic, OpenRouter, etc.),
this module sanitizes the message context to remove personally identifiable
information before it leaves the user's machine.
Threat model (from Vitalik's secure LLM architecture):
- Privacy (other): Non-LLM data leakage via search queries, API calls
- LLM accidents: LLM accidentally leaking private data in prompts
- LLM jailbreaks: Remote content extracting private context
Usage:
from agent.privacy_filter import PrivacyFilter, sanitize_messages
pf = PrivacyFilter()
safe_messages = pf.sanitize_messages(messages)
# safe_messages has PII replaced with [REDACTED] tokens
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
class Sensitivity(Enum):
"""Classification of content sensitivity."""
PUBLIC = auto() # No PII detected
LOW = auto() # Generic references (e.g., city names)
MEDIUM = auto() # Personal identifiers (name, email, phone)
HIGH = auto() # Secrets, keys, financial data, medical info
CRITICAL = auto() # Crypto keys, passwords, SSN patterns
@dataclass
class RedactionReport:
"""Summary of what was redacted from a message batch."""
total_messages: int = 0
redacted_messages: int = 0
redactions: List[Dict[str, Any]] = field(default_factory=list)
max_sensitivity: Sensitivity = Sensitivity.PUBLIC
@property
def had_redactions(self) -> bool:
return self.redacted_messages > 0
def summary(self) -> str:
if not self.had_redactions:
return "No PII detected — context is clean for remote query."
parts = [f"Redacted {self.redacted_messages}/{self.total_messages} messages:"]
for r in self.redactions[:10]:
parts.append(f" - {r['type']}: {r['count']} occurrence(s)")
if len(self.redactions) > 10:
parts.append(f" ... and {len(self.redactions) - 10} more types")
return "\n".join(parts)
# =========================================================================
# PII pattern definitions
# =========================================================================
# Each pattern is (compiled_regex, redaction_type, sensitivity_level, replacement)
_PII_PATTERNS: List[Tuple[re.Pattern, str, Sensitivity, str]] = []
def _compile_patterns() -> None:
"""Compile PII detection patterns. Called once at module init."""
global _PII_PATTERNS
if _PII_PATTERNS:
return
raw_patterns = [
# --- CRITICAL: secrets and credentials ---
(
r'(?:api[_-]?key|apikey|secret[_-]?key|access[_-]?token)\s*[:=]\s*["\']?([A-Za-z0-9_\-\.]{20,})["\']?',
"api_key_or_token",
Sensitivity.CRITICAL,
"[REDACTED-API-KEY]",
),
(
r'\b(?:sk-|sk_|pk_|rk_|ak_)[A-Za-z0-9]{20,}\b',
"prefixed_secret",
Sensitivity.CRITICAL,
"[REDACTED-SECRET]",
),
(
r'\b(?:ghp_|gho_|ghu_|ghs_|ghr_)[A-Za-z0-9]{36,}\b',
"github_token",
Sensitivity.CRITICAL,
"[REDACTED-GITHUB-TOKEN]",
),
(
r'\b(?:xox[bposa]-[A-Za-z0-9\-]+)\b',
"slack_token",
Sensitivity.CRITICAL,
"[REDACTED-SLACK-TOKEN]",
),
(
r'(?:password|passwd|pwd)\s*[:=]\s*["\']?([^\s"\']{4,})["\']?',
"password",
Sensitivity.CRITICAL,
"[REDACTED-PASSWORD]",
),
(
r'(?:-----BEGIN (?:RSA |EC |OPENSSH )?PRIVATE KEY-----)',
"private_key_block",
Sensitivity.CRITICAL,
"[REDACTED-PRIVATE-KEY]",
),
# Ethereum / crypto addresses (42-char hex starting with 0x)
(
r'\b0x[a-fA-F0-9]{40}\b',
"ethereum_address",
Sensitivity.HIGH,
"[REDACTED-ETH-ADDR]",
),
# Bitcoin addresses (base58, 25-34 chars starting with 1/3/bc1)
(
r'\b[13][a-km-zA-HJ-NP-Z1-9]{25,34}\b',
"bitcoin_address",
Sensitivity.HIGH,
"[REDACTED-BTC-ADDR]",
),
(
r'\bbc1[a-zA-HJ-NP-Z0-9]{39,59}\b',
"bech32_address",
Sensitivity.HIGH,
"[REDACTED-BTC-ADDR]",
),
# --- HIGH: financial ---
(
r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
"credit_card_number",
Sensitivity.HIGH,
"[REDACTED-CC]",
),
(
r'\b\d{3}-\d{2}-\d{4}\b',
"us_ssn",
Sensitivity.HIGH,
"[REDACTED-SSN]",
),
# --- MEDIUM: personal identifiers ---
# Email addresses
(
r'\b[A-Za-z0-9._%+\-]+@[A-Za-z0-9.\-]+\.[A-Za-z]{2,}\b',
"email_address",
Sensitivity.MEDIUM,
"[REDACTED-EMAIL]",
),
# Phone numbers (US/international patterns)
(
r'\b(?:\+?1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b',
"phone_number_us",
Sensitivity.MEDIUM,
"[REDACTED-PHONE]",
),
(
r'\b\+\d{1,3}[-.\s]?\d{4,14}\b',
"phone_number_intl",
Sensitivity.MEDIUM,
"[REDACTED-PHONE]",
),
# Filesystem paths that reveal user identity
(
r'(?:/Users/|/home/|C:\\Users\\)([A-Za-z0-9_\-]+)',
"user_home_path",
Sensitivity.MEDIUM,
r"/Users/[REDACTED-USER]",
),
# --- LOW: environment / system info ---
# Internal IPs
(
r'\b(?:10\.\d{1,3}\.\d{1,3}\.\d{1,3}|172\.(?:1[6-9]|2\d|3[01])\.\d{1,3}\.\d{1,3}|192\.168\.\d{1,3}\.\d{1,3})\b',
"internal_ip",
Sensitivity.LOW,
"[REDACTED-IP]",
),
]
_PII_PATTERNS = [
(re.compile(pattern, re.IGNORECASE), rtype, sensitivity, replacement)
for pattern, rtype, sensitivity, replacement in raw_patterns
]
_compile_patterns()
# =========================================================================
# Sensitive file path patterns (context-aware)
# =========================================================================
_SENSITIVE_PATH_PATTERNS = [
re.compile(r'\.(?:env|pem|key|p12|pfx|jks|keystore)\b', re.IGNORECASE),
re.compile(r'(?:\.ssh/|\.gnupg/|\.aws/|\.config/gcloud/)', re.IGNORECASE),
re.compile(r'(?:wallet|keystore|seed|mnemonic)', re.IGNORECASE),
re.compile(r'(?:\.hermes/\.env)', re.IGNORECASE),
]
def _classify_path_sensitivity(path: str) -> Sensitivity:
"""Check if a file path references sensitive material."""
for pat in _SENSITIVE_PATH_PATTERNS:
if pat.search(path):
return Sensitivity.HIGH
return Sensitivity.PUBLIC
# =========================================================================
# Core filtering
# =========================================================================
class PrivacyFilter:
"""Strip PII from message context before remote API calls.
Integrates with the agent's message pipeline. Call sanitize_messages()
before sending context to any cloud LLM provider.
"""
def __init__(
self,
min_sensitivity: Sensitivity = Sensitivity.MEDIUM,
aggressive_mode: bool = False,
):
"""
Args:
min_sensitivity: Only redact PII at or above this level.
Default MEDIUM — redacts emails, phones, paths but not IPs.
aggressive_mode: If True, also redact file paths and internal IPs.
"""
self.min_sensitivity = (
Sensitivity.LOW if aggressive_mode else min_sensitivity
)
self.aggressive_mode = aggressive_mode
def sanitize_text(self, text: str) -> Tuple[str, List[Dict[str, Any]]]:
"""Sanitize a single text string. Returns (cleaned_text, redaction_list)."""
redactions = []
cleaned = text
for pattern, rtype, sensitivity, replacement in _PII_PATTERNS:
if sensitivity.value < self.min_sensitivity.value:
continue
matches = pattern.findall(cleaned)
if matches:
count = len(matches) if isinstance(matches[0], str) else sum(
1 for m in matches if m
)
if count > 0:
cleaned = pattern.sub(replacement, cleaned)
redactions.append({
"type": rtype,
"sensitivity": sensitivity.name,
"count": count,
})
return cleaned, redactions
def sanitize_messages(
self, messages: List[Dict[str, Any]]
) -> Tuple[List[Dict[str, Any]], RedactionReport]:
"""Sanitize a list of OpenAI-format messages.
Returns (safe_messages, report). System messages are NOT sanitized
(they're typically static prompts). Only user and assistant messages
with string content are processed.
Args:
messages: List of {"role": ..., "content": ...} dicts.
Returns:
Tuple of (sanitized_messages, redaction_report).
"""
report = RedactionReport(total_messages=len(messages))
safe_messages = []
for msg in messages:
role = msg.get("role", "")
content = msg.get("content", "")
# Only sanitize user/assistant string content
if role in ("user", "assistant") and isinstance(content, str) and content:
cleaned, redactions = self.sanitize_text(content)
if redactions:
report.redacted_messages += 1
report.redactions.extend(redactions)
# Track max sensitivity
for r in redactions:
s = Sensitivity[r["sensitivity"]]
if s.value > report.max_sensitivity.value:
report.max_sensitivity = s
safe_msg = {**msg, "content": cleaned}
safe_messages.append(safe_msg)
logger.info(
"Privacy filter: redacted %d PII type(s) from %s message",
len(redactions), role,
)
else:
safe_messages.append(msg)
else:
safe_messages.append(msg)
return safe_messages, report
def should_use_local_only(self, text: str) -> Tuple[bool, str]:
"""Determine if content is too sensitive for any remote call.
Returns (should_block, reason). If True, the content should only
be processed by a local model.
"""
_, redactions = self.sanitize_text(text)
critical_count = sum(
1 for r in redactions
if Sensitivity[r["sensitivity"]] == Sensitivity.CRITICAL
)
high_count = sum(
1 for r in redactions
if Sensitivity[r["sensitivity"]] == Sensitivity.HIGH
)
if critical_count > 0:
return True, f"Contains {critical_count} critical-secret pattern(s) — local-only"
if high_count >= 3:
return True, f"Contains {high_count} high-sensitivity pattern(s) — local-only"
return False, ""
def sanitize_messages(
messages: List[Dict[str, Any]],
min_sensitivity: Sensitivity = Sensitivity.MEDIUM,
aggressive: bool = False,
) -> Tuple[List[Dict[str, Any]], RedactionReport]:
"""Convenience function: sanitize messages with default settings."""
pf = PrivacyFilter(min_sensitivity=min_sensitivity, aggressive_mode=aggressive)
return pf.sanitize_messages(messages)
def quick_sanitize(text: str) -> str:
"""Quick sanitize a single string — returns cleaned text only."""
pf = PrivacyFilter()
cleaned, _ = pf.sanitize_text(text)
return cleaned

View File

@@ -1,91 +0,0 @@
# Memory Tier Ownership
Each fact lives in exactly one tier. This prevents duplicate tokens on every
prompt injection and eliminates stale-data divergence when one copy is updated
but not the other.
## Tier 1 — MEMORY.md (Built-in)
**Purpose:** Always-on system prompt context — compact, high-signal.
**Contains:**
- Operational notes and active task state
- Immediate context the agent needs every turn
- User preferences that affect agent behavior
**Constraints:**
- Keep under 50 entries (every byte costs prompt tokens)
- Entries >100 chars should migrate to the fact store
- Managed via the `memory` tool (add/replace/remove)
**Examples:**
- "Default model: mimo-v2-pro/Nous"
- "Alexander prefers action over narration"
- "Deploy via Ansible; wants one-command redeploy"
## Tier 2 — Fact Store (Holographic)
**Purpose:** Deep structured storage with search, reasoning, and trust scoring.
**Contains:**
- `user_pref` — User preferences and habits
- `project` — Project-specific facts and conventions
- `tool` — Tool quirks, API behaviors, environment details
- `general` — Everything else worth remembering
**Advantages over MEMORY.md:**
- FTS5 full-text search
- Entity resolution (link facts to people/projects/tools)
- Trust scoring (good facts rise, bad facts sink)
- Compositional reasoning (`reason` across multiple entities)
- Duplicate detection (UNIQUE constraint + similarity matching)
- Unlimited size
**Managed via:** `fact_store` tool (add/search/probe/related/reason/contradict/update/remove/list)
## Tier 3 — MemPalace
**Purpose:** Specialized long-form archives and multi-session research.
**Contains:**
- Detailed analysis and research notes
- Multi-session task context
- Structured "palace rooms" for domain-specific knowledge
## Migration Rules
| Condition | Destination |
|-----------|------------|
| Entry >100 chars | → fact store |
| Category is `user_pref`, `project`, `tool` | → fact store |
| Needs entity linking | → fact store |
| Needs trust scoring | → fact store |
| Short operational note (<80 chars) | → MEMORY.md |
| Always-on context | → MEMORY.md |
| When in doubt | → fact store |
## Cross-Tier Deduplication
**Problem:** The `on_memory_write` bridge mirrors MEMORY.md writes to the fact
store. Without dedup, the same fact exists in both tiers — wasting tokens and
risking stale data.
**Solution:**
1. `on_memory_write` checks the fact store for similar entries before mirroring
2. Similarity threshold: 0.85 (catches rephrasings, avoids false positives)
3. If duplicate found: skip the mirror (fact store entry is canonical)
4. `dedup` action on `fact_store` tool: runtime scan + auto-resolve
5. `MemoryManager.run_dedup_scan()`: programmatic cross-tier cleanup
**Resolution strategy:** Fact store wins by default — it has trust scoring,
FTS5, and entity resolution. MEMORY.md copies are removed.
## Running Dedup
```python
# Via tool
result = fact_store(action="dedup")
# Via MemoryManager
report = memory_manager.run_dedup_scan()
```

View File

@@ -55,7 +55,7 @@ FACT_STORE_SCHEMA = {
"properties": {
"action": {
"type": "string",
"enum": ["add", "search", "probe", "related", "reason", "contradict", "update", "remove", "list", "dedup"],
"enum": ["add", "search", "probe", "related", "reason", "contradict", "update", "remove", "list"],
},
"content": {"type": "string", "description": "Fact content (required for 'add')."},
"query": {"type": "string", "description": "Search query (required for 'search')."},
@@ -242,48 +242,27 @@ class HolographicMemoryProvider(MemoryProvider):
self._auto_extract_facts(messages)
def on_memory_write(self, action: str, target: str, content: str) -> None:
"""Mirror built-in memory writes as facts with cross-tier dedup.
"""Mirror built-in memory writes as facts.
- add: check for duplicates first, skip if fact already exists
- replace: search for old content, update or re-add (dedup-aware)
- remove: remove matching facts (hard remove, not trust decay)
Dedup strategy: before adding, search existing facts for near-matches.
If similarity > 0.85, skip the add (existing fact store entry wins).
- add: mirror new fact to holographic store
- replace: search for old content, update or re-add
- remove: lower trust on matching facts so they fade naturally
"""
if not self._store:
return
try:
if action == "add" and content:
category = "user_pref" if target == "user" else "general"
# Cross-tier dedup: check if this fact already exists
from .dedup import is_duplicate_before_add
existing = self._store.search_facts(content.strip()[:200], limit=5)
dup = is_duplicate_before_add(content, existing)
if dup:
logger.debug(
"Skipping duplicate mirror: '%s' already exists as fact#%d",
content[:60], dup.get("fact_id", "?")
)
return
self._store.add_fact(content, category=category)
elif action == "replace" and content:
category = "user_pref" if target == "user" else "general"
# Check for duplicate before adding replacement
from .dedup import is_duplicate_before_add
existing = self._store.search_facts(content.strip()[:200], limit=5)
dup = is_duplicate_before_add(content, existing)
if dup:
logger.debug("Skipping duplicate replace mirror: fact#%d already matches", dup.get("fact_id", "?"))
return
self._store.add_fact(content, category=category)
elif action == "remove" and content:
# Hard remove matching facts (not just trust decay)
# Lower trust on matching facts so they decay naturally
results = self._store.search_facts(content, limit=5)
for fact in results:
if content.strip().lower() in fact.get("content", "").lower():
self._store.remove_fact(fact["fact_id"])
logger.debug("Removed mirrored fact#%d on memory remove", fact["fact_id"])
self._store.update_fact(fact["fact_id"], trust=max(0.0, fact.get("trust", 0.5) - 0.4))
except Exception as e:
logger.debug("Holographic memory_write mirror failed: %s", e)
@@ -372,31 +351,6 @@ class HolographicMemoryProvider(MemoryProvider):
)
return json.dumps({"facts": facts, "count": len(facts)})
elif action == "dedup":
from .dedup import scan_cross_tier_duplicates, resolve_duplicates, DedupReport
# Get all facts from store
all_facts = store.list_facts(min_trust=0.0, limit=1000)
# Get memory entries from built-in store (passed via kwargs if available)
memory_entries = kwargs.get("memory_entries", [])
if not memory_entries:
return json.dumps({
"status": "no_memory_entries",
"message": "No MEMORY.md entries provided for comparison. Use memory tool to read first.",
"fact_count": len(all_facts),
})
report = scan_cross_tier_duplicates(memory_entries, all_facts)
if report.duplicates_found == 0:
return json.dumps({"status": "clean", "message": "No cross-tier duplicates found."})
# Auto-resolve: fact store wins
cleaned = resolve_duplicates(report, memory_entries, store)
return json.dumps({
"status": "resolved",
"duplicates_found": report.duplicates_found,
"entries_removed": len(memory_entries) - len(cleaned),
"cleaned_entries": cleaned,
"summary": report.summary(),
})
else:
return json.dumps({"error": f"Unknown action: {action}"})

View File

@@ -1,191 +0,0 @@
"""Cross-tier memory deduplication.
Detects and resolves duplicate facts between MEMORY.md (built-in) and the
holographic fact store. Facts should live in exactly one tier:
Tier 1 — MEMORY.md: Always-on context (compact, <50 entries ideal).
Tier 2 — Fact store: Deep structured storage (unlimited, entity-aware).
Tier 3 — MemPalace: Specialized long-form archives.
Ownership rules:
- user_pref / project / tool facts → fact store (structured, searchable)
- "always-on" operational notes → MEMORY.md (compact, system prompt)
- When in doubt: fact store wins (it has dedup, trust scoring, FTS5)
"""
from __future__ import annotations
import logging
import re
from dataclasses import dataclass, field
from difflib import SequenceMatcher
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
_SIMILARITY_THRESHOLD = 0.85
@dataclass
class DuplicatePair:
memory_entry: str
memory_index: int
fact_store_id: int
fact_store_content: str
similarity: float
resolution: str = ""
resolved: bool = False
@dataclass
class DedupReport:
total_memory_entries: int = 0
total_facts: int = 0
duplicates_found: int = 0
pairs: List[DuplicatePair] = field(default_factory=list)
def summary(self) -> str:
lines = [
f"Cross-tier dedup: {self.total_memory_entries} MEMORY.md entries, "
f"{self.total_facts} fact store entries, "
f"{self.duplicates_found} duplicates found",
]
for p in self.pairs:
status = f"[{p.resolution}]" if p.resolved else "[PENDING]"
lines.append(
f" {status} sim={p.similarity:.2f} "
f"mem[{p.memory_index}]: {p.memory_entry[:60]} "
f"<> fact#{p.fact_store_id}: {p.fact_store_content[:60]}"
)
return "\n".join(lines)
def _normalize(text: str) -> str:
text = text.strip().lower()
text = re.sub(r"^[\\s>*\\-\\u2022]+", "", text)
text = re.sub(r"\\s+", " ", text)
text = text.rstrip(".,;:!?",)
return text
def _similarity(a: str, b: str) -> float:
if not a or not b:
return 0.0
return SequenceMatcher(None, a, b).ratio()
def scan_cross_tier_duplicates(
memory_entries: List[str],
fact_store_facts: List[Dict[str, Any]],
threshold: float = _SIMILARITY_THRESHOLD,
) -> DedupReport:
report = DedupReport(
total_memory_entries=len(memory_entries),
total_facts=len(fact_store_facts),
)
for i, mem_line in enumerate(memory_entries):
mem_norm = _normalize(mem_line)
if not mem_norm or len(mem_norm) < 10:
continue
for fact in fact_store_facts:
fact_norm = _normalize(fact.get("content", ""))
if not fact_norm or len(fact_norm) < 10:
continue
sim = _similarity(mem_norm, fact_norm)
if sim >= threshold:
report.pairs.append(DuplicatePair(
memory_entry=mem_line,
memory_index=i,
fact_store_id=fact.get("fact_id", -1),
fact_store_content=fact.get("content", ""),
similarity=sim,
))
report.duplicates_found = len(report.pairs)
return report
def classify_tier(fact_content: str, category: str = "general") -> str:
if category in ("user_pref", "project", "tool"):
return "factstore"
content = fact_content.strip()
if len(content) < 80 and any(
kw in content.lower() for kw in ("todo", "note:", "fix:", "remember:", "always", "never")
):
return "memory"
return "factstore"
def resolve_pair(pair: DuplicatePair) -> str:
pair.resolution = "keep_factstore"
pair.resolved = True
return pair.resolution
def resolve_duplicates(
report: DedupReport,
memory_entries: List[str],
fact_store=None,
) -> List[str]:
indices_to_remove = set()
for pair in report.pairs:
resolve_pair(pair)
if pair.resolution == "keep_factstore":
indices_to_remove.add(pair.memory_index)
elif pair.resolution == "keep_memory" and fact_store:
try:
fact_store.remove_fact(pair.fact_store_id)
except Exception as e:
logger.debug("Failed to remove fact %d: %s", pair.fact_store_id, e)
cleaned = [e for i, e in enumerate(memory_entries) if i not in indices_to_remove]
removed = len(memory_entries) - len(cleaned)
if removed:
logger.info("Dedup removed %d duplicate entries from MEMORY.md", removed)
return cleaned
def is_duplicate_before_add(
content: str,
existing_facts: List[Dict[str, Any]],
threshold: float = _SIMILARITY_THRESHOLD,
) -> Optional[Dict[str, Any]]:
"""Check if content is a duplicate of an existing fact BEFORE adding.
Returns the matching fact dict if duplicate, None otherwise.
Used by on_memory_write to prevent cross-tier duplication at write time.
"""
content_norm = _normalize(content)
if not content_norm or len(content_norm) < 10:
return None
for fact in existing_facts:
fact_norm = _normalize(fact.get("content", ""))
if not fact_norm or len(fact_norm) < 10:
continue
if _similarity(content_norm, fact_norm) >= threshold:
return fact
return None
TIER_OWNERSHIP_DOC = """# Memory Tier Ownership
Each fact lives in exactly one tier to prevent duplicate tokens and stale-data divergence.
## Tier 1 — MEMORY.md (built-in)
- Always-on system prompt context (compact, <50 entries ideal).
- Operational notes, active task state, immediate context.
- Managed by: `memory` tool.
## Tier 2 — Fact Store (holographic)
- Deep structured storage with search and reasoning.
- user_pref, project, tool facts; entity-linked knowledge.
- Managed by: `fact_store` tool.
- Has: FTS5 search, trust scoring, entity resolution.
## Tier 3 — MemPalace
- Specialized long-form archives and research.
## Rules
- MEMORY.md entries >100 chars → migrate to fact store.
- Structured categories (user_pref, project, tool) → fact store.
- Duplicate across tiers: fact store wins (it has trust scoring).
- `on_memory_write` checks fact store before mirroring.
"""

View File

@@ -0,0 +1,202 @@
"""Tests for agent.privacy_filter — PII stripping before remote API calls."""
import pytest
from agent.privacy_filter import (
PrivacyFilter,
RedactionReport,
Sensitivity,
sanitize_messages,
quick_sanitize,
)
class TestPrivacyFilterSanitizeText:
"""Test single-text sanitization."""
def test_no_pii_returns_clean(self):
pf = PrivacyFilter()
text = "The weather in Paris is nice today."
cleaned, redactions = pf.sanitize_text(text)
assert cleaned == text
assert redactions == []
def test_email_redacted(self):
pf = PrivacyFilter()
text = "Send report to alice@example.com by Friday."
cleaned, redactions = pf.sanitize_text(text)
assert "alice@example.com" not in cleaned
assert "[REDACTED-EMAIL]" in cleaned
assert any(r["type"] == "email_address" for r in redactions)
def test_phone_redacted(self):
pf = PrivacyFilter()
text = "Call me at 555-123-4567 when ready."
cleaned, redactions = pf.sanitize_text(text)
assert "555-123-4567" not in cleaned
assert "[REDACTED-PHONE]" in cleaned
def test_api_key_redacted(self):
pf = PrivacyFilter()
text = 'api_key = "sk-proj-abcdefghij1234567890abcdefghij1234567890"'
cleaned, redactions = pf.sanitize_text(text)
assert "sk-proj-" not in cleaned
assert any(r["sensitivity"] == "CRITICAL" for r in redactions)
def test_github_token_redacted(self):
pf = PrivacyFilter()
text = "Use ghp_1234567890abcdefghijklmnopqrstuvwxyz1234 for auth"
cleaned, redactions = pf.sanitize_text(text)
assert "ghp_" not in cleaned
assert any(r["type"] == "github_token" for r in redactions)
def test_ethereum_address_redacted(self):
pf = PrivacyFilter()
text = "Send to 0x742d35Cc6634C0532925a3b844Bc9e7595f2bD18 please"
cleaned, redactions = pf.sanitize_text(text)
assert "0x742d" not in cleaned
assert any(r["type"] == "ethereum_address" for r in redactions)
def test_user_home_path_redacted(self):
pf = PrivacyFilter()
text = "Read file at /Users/alice/Documents/secret.txt"
cleaned, redactions = pf.sanitize_text(text)
assert "alice" not in cleaned
assert "[REDACTED-USER]" in cleaned
def test_multiple_pii_types(self):
pf = PrivacyFilter()
text = (
"Contact john@test.com or call 555-999-1234. "
"The API key is sk-abcdefghijklmnopqrstuvwxyz1234567890."
)
cleaned, redactions = pf.sanitize_text(text)
assert "john@test.com" not in cleaned
assert "555-999-1234" not in cleaned
assert "sk-abcd" not in cleaned
assert len(redactions) >= 3
class TestPrivacyFilterSanitizeMessages:
"""Test message-list sanitization."""
def test_sanitize_user_message(self):
pf = PrivacyFilter()
messages = [
{"role": "system", "content": "You are helpful."},
{"role": "user", "content": "Email me at bob@test.com with results."},
]
safe, report = pf.sanitize_messages(messages)
assert report.redacted_messages == 1
assert "bob@test.com" not in safe[1]["content"]
assert "[REDACTED-EMAIL]" in safe[1]["content"]
# System message unchanged
assert safe[0]["content"] == "You are helpful."
def test_no_redaction_needed(self):
pf = PrivacyFilter()
messages = [
{"role": "user", "content": "What is 2+2?"},
{"role": "assistant", "content": "4"},
]
safe, report = pf.sanitize_messages(messages)
assert report.redacted_messages == 0
assert not report.had_redactions
def test_assistant_messages_also_sanitized(self):
pf = PrivacyFilter()
messages = [
{"role": "assistant", "content": "Your email admin@corp.com was found."},
]
safe, report = pf.sanitize_messages(messages)
assert report.redacted_messages == 1
assert "admin@corp.com" not in safe[0]["content"]
def test_tool_messages_not_sanitized(self):
pf = PrivacyFilter()
messages = [
{"role": "tool", "content": "Result: user@test.com found"},
]
safe, report = pf.sanitize_messages(messages)
assert report.redacted_messages == 0
assert safe[0]["content"] == "Result: user@test.com found"
class TestShouldUseLocalOnly:
"""Test the local-only routing decision."""
def test_normal_text_allows_remote(self):
pf = PrivacyFilter()
block, reason = pf.should_use_local_only("Summarize this article about Python.")
assert not block
def test_critical_secret_blocks_remote(self):
pf = PrivacyFilter()
text = "Here is the API key: sk-abcdefghijklmnopqrstuvwxyz1234567890"
block, reason = pf.should_use_local_only(text)
assert block
assert "critical" in reason.lower()
def test_multiple_high_sensitivity_blocks(self):
pf = PrivacyFilter()
# 3+ high-sensitivity patterns
text = (
"Card: 4111-1111-1111-1111, "
"SSN: 123-45-6789, "
"BTC: 1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa, "
"ETH: 0x742d35Cc6634C0532925a3b844Bc9e7595f2bD18"
)
block, reason = pf.should_use_local_only(text)
assert block
class TestAggressiveMode:
"""Test aggressive filtering mode."""
def test_aggressive_redacts_internal_ips(self):
pf = PrivacyFilter(aggressive_mode=True)
text = "Server at 192.168.1.100 is responding."
cleaned, redactions = pf.sanitize_text(text)
assert "192.168.1.100" not in cleaned
assert any(r["type"] == "internal_ip" for r in redactions)
def test_normal_does_not_redact_ips(self):
pf = PrivacyFilter(aggressive_mode=False)
text = "Server at 192.168.1.100 is responding."
cleaned, redactions = pf.sanitize_text(text)
assert "192.168.1.100" in cleaned # IP preserved in normal mode
class TestConvenienceFunctions:
"""Test module-level convenience functions."""
def test_quick_sanitize(self):
text = "Contact alice@example.com for details"
result = quick_sanitize(text)
assert "alice@example.com" not in result
assert "[REDACTED-EMAIL]" in result
def test_sanitize_messages_convenience(self):
messages = [{"role": "user", "content": "Call 555-000-1234"}]
safe, report = sanitize_messages(messages)
assert report.redacted_messages == 1
class TestRedactionReport:
"""Test the reporting structure."""
def test_summary_no_redactions(self):
report = RedactionReport(total_messages=3, redacted_messages=0)
assert "No PII" in report.summary()
def test_summary_with_redactions(self):
report = RedactionReport(
total_messages=2,
redacted_messages=1,
redactions=[
{"type": "email_address", "sensitivity": "MEDIUM", "count": 2},
{"type": "phone_number_us", "sensitivity": "MEDIUM", "count": 1},
],
)
summary = report.summary()
assert "1/2" in summary
assert "email_address" in summary

View File

@@ -1,178 +0,0 @@
"""Tests for cross-tier memory deduplication.
Tests the dedup module's normalize, similarity, scan, resolve, and
is_duplicate_before_add functions.
"""
import pytest
import sys
import os
# Add the plugins path so we can import dedup
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "plugins", "memory", "holographic"))
from dedup import (
_normalize,
_similarity,
scan_cross_tier_duplicates,
resolve_duplicates,
is_duplicate_before_add,
classify_tier,
DedupReport,
DuplicatePair,
)
class TestNormalize:
def test_basic_lowercasing(self):
assert _normalize("Hello World") == "hello world"
def test_strips_markdown_bullets(self):
assert _normalize("- some fact") == "some fact"
assert _normalize("* some fact") == "some fact"
assert _normalize(" - some fact ") == "some fact"
def test_strips_trailing_punctuation(self):
assert _normalize("some fact.") == "some fact"
assert _normalize("some fact,") == "some fact"
assert _normalize("some fact;") == "some fact"
def test_collapses_whitespace(self):
assert _normalize("some fact here") == "some fact here"
def test_empty_and_short(self):
assert _normalize("") == ""
assert _normalize(" ") == ""
assert _normalize("abc") == "abc"
class TestSimilarity:
def test_identical_strings(self):
assert _similarity("hello world", "hello world") == 1.0
def test_completely_different(self):
assert _similarity("abc", "xyz") < 0.3
def test_similar_rephrasing(self):
sim = _similarity("deploy via ansible", "deploy with ansible")
assert sim > 0.7
def test_empty_strings(self):
assert _similarity("", "hello") == 0.0
assert _similarity("hello", "") == 0.0
assert _similarity("", "") == 0.0
class TestScanCrossTierDuplicates:
def test_no_duplicates(self):
memory = ["Deploy via Ansible", "Use mimo-v2-pro model"]
facts = [
{"fact_id": 1, "content": "User prefers dark mode"},
{"fact_id": 2, "content": "Project uses Python 3.11"},
]
report = scan_cross_tier_duplicates(memory, facts)
assert report.duplicates_found == 0
assert len(report.pairs) == 0
def test_exact_duplicate(self):
memory = ["Deploy via Ansible"]
facts = [{"fact_id": 1, "content": "Deploy via Ansible"}]
report = scan_cross_tier_duplicates(memory, facts)
assert report.duplicates_found == 1
assert report.pairs[0].similarity == 1.0
assert report.pairs[0].fact_store_id == 1
def test_near_duplicate_above_threshold(self):
memory = ["Alexander prefers action over narration"]
facts = [{"fact_id": 1, "content": "Alexander prefers action over narration."}]
report = scan_cross_tier_duplicates(memory, facts)
assert report.duplicates_found == 1
def test_below_threshold_not_duplicate(self):
memory = ["Deploy via Ansible on VPS"]
facts = [{"fact_id": 1, "content": "Deploy via Docker on local machine"}]
report = scan_cross_tier_duplicates(memory, facts, threshold=0.85)
assert report.duplicates_found == 0
def test_short_entries_skipped(self):
memory = ["OK", "ab"]
facts = [{"fact_id": 1, "content": "OK"}]
report = scan_cross_tier_duplicates(memory, facts)
assert report.duplicates_found == 0
def test_multiple_duplicates(self):
memory = ["Fact A here", "Fact B here"]
facts = [
{"fact_id": 1, "content": "Fact A here"},
{"fact_id": 2, "content": "Fact B here"},
]
report = scan_cross_tier_duplicates(memory, facts)
assert report.duplicates_found == 2
def test_report_summary(self):
memory = ["Deploy via Ansible"]
facts = [{"fact_id": 1, "content": "Deploy via Ansible"}]
report = scan_cross_tier_duplicates(memory, facts)
summary = report.summary()
assert "1 MEMORY.md entries" in summary
assert "1 fact store entries" in summary
assert "1 duplicates" in summary
class TestResolveDuplicates:
def test_removes_memory_duplicates(self):
memory = ["Deploy via Ansible", "Use Python 3.11"]
facts = [{"fact_id": 1, "content": "Deploy via Ansible"}]
report = scan_cross_tier_duplicates(memory, facts)
cleaned = resolve_duplicates(report, memory)
assert len(cleaned) == 1
assert cleaned[0] == "Use Python 3.11"
def test_no_duplicates_returns_same(self):
memory = ["Deploy via Ansible", "Use Python 3.11"]
facts = [{"fact_id": 1, "content": "Completely different fact"}]
report = scan_cross_tier_duplicates(memory, facts)
cleaned = resolve_duplicates(report, memory)
assert len(cleaned) == 2
class TestIsDuplicateBeforeAdd:
def test_finds_duplicate(self):
existing = [{"fact_id": 1, "content": "Deploy via Ansible"}]
result = is_duplicate_before_add("Deploy via Ansible", existing)
assert result is not None
assert result["fact_id"] == 1
def test_no_duplicate_returns_none(self):
existing = [{"fact_id": 1, "content": "Use dark mode"}]
result = is_duplicate_before_add("Deploy via Ansible", existing)
assert result is None
def test_short_content_returns_none(self):
existing = [{"fact_id": 1, "content": "OK"}]
result = is_duplicate_before_add("OK", existing)
assert result is None
def test_empty_existing_returns_none(self):
result = is_duplicate_before_add("Some fact here", [])
assert result is None
class TestClassifyTier:
def test_user_pref_goes_to_factstore(self):
assert classify_tier("anything", "user_pref") == "factstore"
def test_project_goes_to_factstore(self):
assert classify_tier("anything", "project") == "factstore"
def test_short_operational_note_goes_to_memory(self):
assert classify_tier("remember: always use sudo") == "memory"
assert classify_tier("todo: fix the deploy script") == "memory"
def test_long_fact_goes_to_factstore(self):
long_fact = "The deployment process requires running ansible-playbook with the production inventory file and verifying health checks after completion"
assert classify_tier(long_fact) == "factstore"
def test_general_short_goes_to_factstore(self):
# Short but not operational
assert classify_tier("user likes dark mode") == "factstore"

View File

@@ -0,0 +1,190 @@
"""Tests for tools.confirmation_daemon — Human Confirmation Firewall."""
import pytest
import time
from tools.confirmation_daemon import (
ConfirmationDaemon,
ConfirmationRequest,
ConfirmationStatus,
RiskLevel,
classify_action,
_is_whitelisted,
_DEFAULT_WHITELIST,
)
class TestClassifyAction:
"""Test action risk classification."""
def test_crypto_tx_is_critical(self):
assert classify_action("crypto_tx") == RiskLevel.CRITICAL
def test_sign_transaction_is_critical(self):
assert classify_action("sign_transaction") == RiskLevel.CRITICAL
def test_send_email_is_high(self):
assert classify_action("send_email") == RiskLevel.HIGH
def test_send_message_is_medium(self):
assert classify_action("send_message") == RiskLevel.MEDIUM
def test_access_calendar_is_low(self):
assert classify_action("access_calendar") == RiskLevel.LOW
def test_unknown_action_is_medium(self):
assert classify_action("unknown_action_xyz") == RiskLevel.MEDIUM
class TestWhitelist:
"""Test whitelist auto-approval."""
def test_self_email_is_whitelisted(self):
whitelist = dict(_DEFAULT_WHITELIST)
payload = {"from": "me@test.com", "to": "me@test.com"}
assert _is_whitelisted("send_email", payload, whitelist) is True
def test_non_whitelisted_recipient_not_approved(self):
whitelist = dict(_DEFAULT_WHITELIST)
payload = {"to": "random@stranger.com"}
assert _is_whitelisted("send_email", payload, whitelist) is False
def test_whitelisted_contact_approved(self):
whitelist = {
"send_message": {"targets": ["alice", "bob"]},
}
assert _is_whitelisted("send_message", {"to": "alice"}, whitelist) is True
assert _is_whitelisted("send_message", {"to": "charlie"}, whitelist) is False
def test_no_whitelist_entry_means_not_whitelisted(self):
whitelist = {}
assert _is_whitelisted("crypto_tx", {"amount": 1.0}, whitelist) is False
class TestConfirmationRequest:
"""Test the request data model."""
def test_defaults(self):
req = ConfirmationRequest(
request_id="test-1",
action="send_email",
description="Test email",
risk_level="high",
payload={},
)
assert req.status == ConfirmationStatus.PENDING.value
assert req.created_at > 0
assert req.expires_at > req.created_at
def test_is_pending(self):
req = ConfirmationRequest(
request_id="test-2",
action="send_email",
description="Test",
risk_level="high",
payload={},
expires_at=time.time() + 300,
)
assert req.is_pending is True
def test_is_expired(self):
req = ConfirmationRequest(
request_id="test-3",
action="send_email",
description="Test",
risk_level="high",
payload={},
expires_at=time.time() - 10,
)
assert req.is_expired is True
assert req.is_pending is False
def test_to_dict(self):
req = ConfirmationRequest(
request_id="test-4",
action="send_email",
description="Test",
risk_level="medium",
payload={"to": "a@b.com"},
)
d = req.to_dict()
assert d["request_id"] == "test-4"
assert d["action"] == "send_email"
assert "is_pending" in d
class TestConfirmationDaemon:
"""Test the daemon logic (without HTTP layer)."""
def test_auto_approve_low_risk(self):
daemon = ConfirmationDaemon()
req = daemon.request(
action="access_calendar",
description="Read today's events",
risk_level="low",
)
assert req.status == ConfirmationStatus.AUTO_APPROVED.value
def test_whitelisted_auto_approves(self):
daemon = ConfirmationDaemon()
daemon._whitelist = {"send_message": {"targets": ["alice"]}}
req = daemon.request(
action="send_message",
description="Message alice",
payload={"to": "alice"},
)
assert req.status == ConfirmationStatus.AUTO_APPROVED.value
def test_non_whitelisted_goes_pending(self):
daemon = ConfirmationDaemon()
daemon._whitelist = {}
req = daemon.request(
action="send_email",
description="Email to stranger",
payload={"to": "stranger@test.com"},
risk_level="high",
)
assert req.status == ConfirmationStatus.PENDING.value
assert req.is_pending is True
def test_approve_response(self):
daemon = ConfirmationDaemon()
daemon._whitelist = {}
req = daemon.request(
action="send_email",
description="Email test",
risk_level="high",
)
result = daemon.respond(req.request_id, approved=True, decided_by="human")
assert result.status == ConfirmationStatus.APPROVED.value
assert result.decided_by == "human"
def test_deny_response(self):
daemon = ConfirmationDaemon()
daemon._whitelist = {}
req = daemon.request(
action="crypto_tx",
description="Send 1 ETH",
risk_level="critical",
)
result = daemon.respond(
req.request_id, approved=False, decided_by="human", reason="Too risky"
)
assert result.status == ConfirmationStatus.DENIED.value
assert result.reason == "Too risky"
def test_get_pending(self):
daemon = ConfirmationDaemon()
daemon._whitelist = {}
daemon.request(action="send_email", description="Test 1", risk_level="high")
daemon.request(action="send_email", description="Test 2", risk_level="high")
pending = daemon.get_pending()
assert len(pending) >= 2
def test_get_history(self):
daemon = ConfirmationDaemon()
req = daemon.request(
action="access_calendar", description="Test", risk_level="low"
)
history = daemon.get_history()
assert len(history) >= 1
assert history[0]["action"] == "access_calendar"

View File

@@ -40,11 +40,18 @@ def reset_current_session_key(token: contextvars.Token[str]) -> None:
def get_current_session_key(default: str = "default") -> str:
"""Return the active session key, preferring context-local state."""
"""Return the active session key, preferring context-local state.
Resolution order:
1. approval-specific contextvars (set by gateway before agent.run)
2. session_context contextvars (set by _set_session_env)
3. os.environ fallback (CLI, cron, tests)
"""
session_key = _approval_session_key.get()
if session_key:
return session_key
return os.getenv("HERMES_SESSION_KEY", default)
from gateway.session_context import get_session_env
return get_session_env("HERMES_SESSION_KEY", default)
# Sensitive write targets that should trigger approval even when referenced
# via shell expansions like $HOME or $HERMES_HOME.
@@ -80,7 +87,7 @@ DANGEROUS_PATTERNS = [
(r'\bDELETE\s+FROM\b(?!.*\bWHERE\b)', "SQL DELETE without WHERE"),
(r'\bTRUNCATE\s+(TABLE)?\s*\w', "SQL TRUNCATE"),
(r'>\s*/etc/', "overwrite system config"),
(r'\bsystemctl\s+(stop|disable|mask)\b', "stop/disable system service"),
(r'\bsystemctl\s+(-[^\s]+\s+)*(stop|restart|disable|mask)\b', "stop/restart system service"),
(r'\bkill\s+-9\s+-1\b', "kill all processes"),
(r'\bpkill\s+-9\b', "force kill processes"),
(r':\(\)\s*\{\s*:\s*\|\s*:\s*&\s*\}\s*;\s*:', "fork bomb"),
@@ -94,15 +101,53 @@ DANGEROUS_PATTERNS = [
(r'\bxargs\s+.*\brm\b', "xargs with rm"),
(r'\bfind\b.*-exec\s+(/\S*/)?rm\b', "find -exec rm"),
(r'\bfind\b.*-delete\b', "find -delete"),
# Gateway lifecycle protection: prevent the agent from killing its own
# gateway process. These commands trigger a gateway restart/stop that
# terminates all running agents mid-work.
(r'\bhermes\s+gateway\s+(stop|restart)\b', "stop/restart hermes gateway (kills running agents)"),
(r'\bhermes\s+update\b', "hermes update (restarts gateway, kills running agents)"),
# Gateway protection: never start gateway outside systemd management
(r'gateway\s+run\b.*(&\s*$|&\s*;|\bdisown\b|\bsetsid\b)', "start gateway outside systemd (use 'systemctl --user restart hermes-gateway')"),
(r'\bnohup\b.*gateway\s+run\b', "start gateway outside systemd (use 'systemctl --user restart hermes-gateway')"),
# Self-termination protection: prevent agent from killing its own process
(r'\b(pkill|killall)\b.*\b(hermes|gateway|cli\.py)\b', "kill hermes/gateway process (self-termination)"),
# Self-termination via kill + command substitution (pgrep/pidof).
# The name-based pattern above catches `pkill hermes` but not
# `kill -9 $(pgrep -f hermes)` because the substitution is opaque
# to regex at detection time. Catch the structural pattern instead.
(r'\bkill\b.*\$\(\s*pgrep\b', "kill process via pgrep expansion (self-termination)"),
(r'\bkill\b.*`\s*pgrep\b', "kill process via backtick pgrep expansion (self-termination)"),
# File copy/move/edit into sensitive system paths
(r'\b(cp|mv|install)\b.*\s/etc/', "copy/move file into /etc/"),
(r'\bsed\s+-[^\s]*i.*\s/etc/', "in-place edit of system config"),
(r'\bsed\s+--in-place\b.*\s/etc/', "in-place edit of system config (long flag)"),
# --- Vitalik's threat model: crypto / financial ---
(r'\b(?:bitcoin-cli|ethers\.js|web3|ether\.sendTransaction)\b', "direct crypto transaction tool usage"),
(r'\bwget\b.*\b(?:mnemonic|seed\s*phrase|private[_-]?key)\b', "attempting to download crypto credentials"),
(r'\bcurl\b.*\b(?:mnemonic|seed\s*phrase|private[_-]?key)\b', "attempting to exfiltrate crypto credentials"),
# --- Vitalik's threat model: credential exfiltration ---
(r'\b(?:curl|wget|http|nc|ncat|socat)\b.*\b(?:\.env|\.ssh|credentials|secrets|token|api[_-]?key)\b',
"attempting to exfiltrate credentials via network"),
(r'\bbase64\b.*\|(?:\s*curl|\s*wget)', "base64-encode then network exfiltration"),
(r'\bcat\b.*\b(?:\.env|\.ssh/id_rsa|credentials)\b.*\|(?:\s*curl|\s*wget)',
"reading secrets and piping to network tool"),
# --- Vitalik's threat model: data exfiltration ---
(r'\bcurl\b.*-d\s.*\$(?:HOME|USER)', "sending user home directory data to remote"),
(r'\bwget\b.*--post-data\s.*\$(?:HOME|USER)', "posting user data to remote"),
# Script execution via heredoc — bypasses the -e/-c flag patterns above.
# `python3 << 'EOF'` feeds arbitrary code via stdin without -c/-e flags.
(r'\b(python[23]?|perl|ruby|node)\s+<<', "script execution via heredoc"),
# Git destructive operations that can lose uncommitted work or rewrite
# shared history. Not captured by rm/chmod/etc patterns.
(r'\bgit\s+reset\s+--hard\b', "git reset --hard (destroys uncommitted changes)"),
(r'\bgit\s+push\b.*--force\b', "git force push (rewrites remote history)"),
(r'\bgit\s+push\b.*-f\b', "git force push short flag (rewrites remote history)"),
(r'\bgit\s+clean\s+-[^\s]*f', "git clean with force (deletes untracked files)"),
(r'\bgit\s+branch\s+-D\b', "git branch force delete"),
# Script execution after chmod +x — catches the two-step pattern where
# a script is first made executable then immediately run. The script
# content may contain dangerous commands that individual patterns miss.
(r'\bchmod\s+\+x\b.*[;&|]+\s*\./', "chmod +x followed by immediate execution"),
]
@@ -172,6 +217,7 @@ def detect_dangerous_command(command: str) -> tuple:
_lock = threading.Lock()
_pending: dict[str, dict] = {}
_session_approved: dict[str, set] = {}
_session_yolo: set[str] = set()
_permanent_approved: set = set()
# =========================================================================
@@ -257,36 +303,58 @@ def has_blocking_approval(session_key: str) -> bool:
return bool(_gateway_queues.get(session_key))
def pending_approval_count(session_key: str) -> int:
"""Return the number of pending blocking approvals for a session."""
with _lock:
return len(_gateway_queues.get(session_key, []))
def submit_pending(session_key: str, approval: dict):
"""Store a pending approval request for a session."""
with _lock:
_pending[session_key] = approval
def pop_pending(session_key: str) -> Optional[dict]:
"""Retrieve and remove a pending approval for a session."""
with _lock:
return _pending.pop(session_key, None)
def has_pending(session_key: str) -> bool:
"""Check if a session has a pending approval request."""
with _lock:
return session_key in _pending
def approve_session(session_key: str, pattern_key: str):
"""Approve a pattern for this session only."""
with _lock:
_session_approved.setdefault(session_key, set()).add(pattern_key)
def enable_session_yolo(session_key: str) -> None:
"""Enable YOLO bypass for a single session key."""
if not session_key:
return
with _lock:
_session_yolo.add(session_key)
def disable_session_yolo(session_key: str) -> None:
"""Disable YOLO bypass for a single session key."""
if not session_key:
return
with _lock:
_session_yolo.discard(session_key)
def clear_session(session_key: str) -> None:
"""Remove all approval and yolo state for a given session."""
if not session_key:
return
with _lock:
_session_approved.pop(session_key, None)
_session_yolo.discard(session_key)
_pending.pop(session_key, None)
_gateway_queues.pop(session_key, None)
def is_session_yolo_enabled(session_key: str) -> bool:
"""Return True when YOLO bypass is enabled for a specific session."""
if not session_key:
return False
with _lock:
return session_key in _session_yolo
def is_current_session_yolo_enabled() -> bool:
"""Return True when the active approval session has YOLO bypass enabled."""
return is_session_yolo_enabled(get_current_session_key(default=""))
def is_approved(session_key: str, pattern_key: str) -> bool:
"""Check if a pattern is approved (session-scoped or permanent).
@@ -313,17 +381,6 @@ def load_permanent(patterns: set):
_permanent_approved.update(patterns)
def clear_session(session_key: str):
"""Clear all approvals and pending requests for a session."""
with _lock:
_session_approved.pop(session_key, None)
_pending.pop(session_key, None)
_gateway_notify_cbs.pop(session_key, None)
# Signal ALL blocked threads so they don't hang forever
entries = _gateway_queues.pop(session_key, [])
for entry in entries:
entry.event.set()
# =========================================================================
# Config persistence for permanent allowlist
@@ -342,7 +399,8 @@ def load_permanent_allowlist() -> set:
if patterns:
load_permanent(patterns)
return patterns
except Exception:
except Exception as e:
logger.warning("Failed to load permanent allowlist: %s", e)
return set()
@@ -384,7 +442,8 @@ def prompt_dangerous_approval(command: str, description: str,
try:
return approval_callback(command, description,
allow_permanent=allow_permanent)
except Exception:
except Exception as e:
logger.error("Approval callback failed: %s", e, exc_info=True)
return "deny"
os.environ["HERMES_SPINNER_PAUSE"] = "1"
@@ -466,7 +525,8 @@ def _get_approval_config() -> dict:
from hermes_cli.config import load_config
config = load_config()
return config.get("approvals", {}) or {}
except Exception:
except Exception as e:
logger.warning("Failed to load approval config: %s", e)
return {}
@@ -554,8 +614,9 @@ def check_dangerous_command(command: str, env_type: str,
if env_type in ("docker", "singularity", "modal", "daytona"):
return {"approved": True, "message": None}
# --yolo: bypass all approval prompts
if os.getenv("HERMES_YOLO_MODE"):
# --yolo: bypass all approval prompts. Gateway /yolo is session-scoped;
# CLI --yolo remains process-scoped via the env var for local use.
if os.getenv("HERMES_YOLO_MODE") or is_current_session_yolo_enabled():
return {"approved": True, "message": None}
is_dangerous, pattern_key, description = detect_dangerous_command(command)
@@ -655,9 +716,10 @@ def check_all_command_guards(command: str, env_type: str,
if env_type in ("docker", "singularity", "modal", "daytona"):
return {"approved": True, "message": None}
# --yolo or approvals.mode=off: bypass all approval prompts
# --yolo or approvals.mode=off: bypass all approval prompts.
# Gateway /yolo is session-scoped; CLI --yolo remains process-scoped.
approval_mode = _get_approval_mode()
if os.getenv("HERMES_YOLO_MODE") or approval_mode == "off":
if os.getenv("HERMES_YOLO_MODE") or is_current_session_yolo_enabled() or approval_mode == "off":
return {"approved": True, "message": None}
is_cli = os.getenv("HERMES_INTERACTIVE")

View File

@@ -0,0 +1,615 @@
"""Human Confirmation Daemon — HTTP server for two-factor action approval.
Implements Vitalik's Pattern 1: "The new 'two-factor confirmation' is that
the two factors are the human and the LLM."
This daemon runs on localhost:6000 and provides a simple HTTP API for the
agent to request human approval before executing high-risk actions.
Threat model:
- LLM jailbreaks: Remote content "hacking" the LLM to perform malicious actions
- LLM accidents: LLM accidentally performing dangerous operations
- The human acts as the second factor — the agent proposes, the human disposes
Architecture:
- Agent detects high-risk action → POST /confirm with action details
- Daemon stores pending request, sends notification to user
- User approves/denies via POST /respond (Telegram, CLI, or direct HTTP)
- Agent receives decision and proceeds or aborts
Usage:
# Start daemon (usually managed by gateway)
from tools.confirmation_daemon import ConfirmationDaemon
daemon = ConfirmationDaemon(port=6000)
daemon.start()
# Request approval (from agent code)
from tools.confirmation_daemon import request_confirmation
approved = request_confirmation(
action="send_email",
description="Send email to alice@example.com",
risk_level="high",
payload={"to": "alice@example.com", "subject": "Meeting notes"},
timeout=300,
)
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import threading
import time
import uuid
from dataclasses import dataclass, field, asdict
from enum import Enum, auto
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
class RiskLevel(Enum):
"""Risk classification for actions requiring confirmation."""
LOW = "low" # Log only, no confirmation needed
MEDIUM = "medium" # Confirm for non-whitelisted targets
HIGH = "high" # Always confirm
CRITICAL = "critical" # Always confirm + require explicit reason
class ConfirmationStatus(Enum):
"""Status of a pending confirmation request."""
PENDING = "pending"
APPROVED = "approved"
DENIED = "denied"
EXPIRED = "expired"
AUTO_APPROVED = "auto_approved"
@dataclass
class ConfirmationRequest:
"""A request for human confirmation of a high-risk action."""
request_id: str
action: str # Action type: send_email, send_message, crypto_tx, etc.
description: str # Human-readable description of what will happen
risk_level: str # low, medium, high, critical
payload: Dict[str, Any] # Action-specific data (sanitized)
session_key: str = "" # Session that initiated the request
created_at: float = 0.0
expires_at: float = 0.0
status: str = ConfirmationStatus.PENDING.value
decided_at: float = 0.0
decided_by: str = "" # "human", "auto", "whitelist"
reason: str = "" # Optional reason for denial
def __post_init__(self):
if not self.created_at:
self.created_at = time.time()
if not self.expires_at:
self.expires_at = self.created_at + 300 # 5 min default
if not self.request_id:
self.request_id = str(uuid.uuid4())[:12]
@property
def is_expired(self) -> bool:
return time.time() > self.expires_at
@property
def is_pending(self) -> bool:
return self.status == ConfirmationStatus.PENDING.value and not self.is_expired
def to_dict(self) -> Dict[str, Any]:
d = asdict(self)
d["is_expired"] = self.is_expired
d["is_pending"] = self.is_pending
return d
# =========================================================================
# Action categories (Vitalik's threat model)
# =========================================================================
ACTION_CATEGORIES = {
# Messaging — outbound communication to external parties
"send_email": RiskLevel.HIGH,
"send_message": RiskLevel.MEDIUM, # Depends on recipient
"send_signal": RiskLevel.HIGH,
"send_telegram": RiskLevel.MEDIUM,
"send_discord": RiskLevel.MEDIUM,
"post_social": RiskLevel.HIGH,
# Financial / crypto
"crypto_tx": RiskLevel.CRITICAL,
"sign_transaction": RiskLevel.CRITICAL,
"access_wallet": RiskLevel.CRITICAL,
"modify_balance": RiskLevel.CRITICAL,
# System modification
"install_software": RiskLevel.HIGH,
"modify_system_config": RiskLevel.HIGH,
"modify_firewall": RiskLevel.CRITICAL,
"add_ssh_key": RiskLevel.CRITICAL,
"create_user": RiskLevel.CRITICAL,
# Data access
"access_contacts": RiskLevel.MEDIUM,
"access_calendar": RiskLevel.LOW,
"read_private_files": RiskLevel.MEDIUM,
"upload_data": RiskLevel.HIGH,
"share_credentials": RiskLevel.CRITICAL,
# Network
"open_port": RiskLevel.HIGH,
"modify_dns": RiskLevel.HIGH,
"expose_service": RiskLevel.CRITICAL,
}
# Default: any unrecognized action is MEDIUM risk
DEFAULT_RISK_LEVEL = RiskLevel.MEDIUM
def classify_action(action: str) -> RiskLevel:
"""Classify an action by its risk level."""
return ACTION_CATEGORIES.get(action, DEFAULT_RISK_LEVEL)
# =========================================================================
# Whitelist configuration
# =========================================================================
_DEFAULT_WHITELIST = {
"send_message": {
"targets": [], # Contact names/IDs that don't need confirmation
},
"send_email": {
"targets": [], # Email addresses that don't need confirmation
"self_only": True, # send-to-self always allowed
},
}
def _load_whitelist() -> Dict[str, Any]:
"""Load action whitelist from config."""
config_path = Path.home() / ".hermes" / "approval_whitelist.json"
if config_path.exists():
try:
with open(config_path) as f:
return json.load(f)
except Exception as e:
logger.warning("Failed to load approval whitelist: %s", e)
return dict(_DEFAULT_WHITELIST)
def _is_whitelisted(action: str, payload: Dict[str, Any], whitelist: Dict) -> bool:
"""Check if an action is pre-approved by the whitelist."""
action_config = whitelist.get(action, {})
if not action_config:
return False
# Check target-based whitelist
targets = action_config.get("targets", [])
target = payload.get("to") or payload.get("recipient") or payload.get("target", "")
if target and target in targets:
return True
# Self-only email
if action_config.get("self_only") and action == "send_email":
sender = payload.get("from", "")
recipient = payload.get("to", "")
if sender and recipient and sender.lower() == recipient.lower():
return True
return False
# =========================================================================
# Confirmation daemon
# =========================================================================
class ConfirmationDaemon:
"""HTTP daemon for human confirmation of high-risk actions.
Runs on localhost:PORT (default 6000). Provides:
- POST /confirm — agent requests human approval
- POST /respond — human approves/denies
- GET /pending — list pending requests
- GET /health — health check
"""
def __init__(
self,
host: str = "127.0.0.1",
port: int = 6000,
default_timeout: int = 300,
notify_callback: Optional[Callable] = None,
):
self.host = host
self.port = port
self.default_timeout = default_timeout
self.notify_callback = notify_callback
self._pending: Dict[str, ConfirmationRequest] = {}
self._history: List[ConfirmationRequest] = []
self._lock = threading.Lock()
self._whitelist = _load_whitelist()
self._app = None
self._runner = None
def request(
self,
action: str,
description: str,
payload: Optional[Dict[str, Any]] = None,
risk_level: Optional[str] = None,
session_key: str = "",
timeout: Optional[int] = None,
) -> ConfirmationRequest:
"""Create a confirmation request.
Returns the request. Check .status to see if it was immediately
auto-approved (whitelisted) or is pending human review.
"""
payload = payload or {}
# Classify risk if not specified
if risk_level is None:
risk_level = classify_action(action).value
# Check whitelist
if risk_level in ("low",) or _is_whitelisted(action, payload, self._whitelist):
req = ConfirmationRequest(
request_id=str(uuid.uuid4())[:12],
action=action,
description=description,
risk_level=risk_level,
payload=payload,
session_key=session_key,
expires_at=time.time() + (timeout or self.default_timeout),
status=ConfirmationStatus.AUTO_APPROVED.value,
decided_at=time.time(),
decided_by="whitelist",
)
with self._lock:
self._history.append(req)
logger.info("Auto-approved whitelisted action: %s", action)
return req
# Create pending request
req = ConfirmationRequest(
request_id=str(uuid.uuid4())[:12],
action=action,
description=description,
risk_level=risk_level,
payload=payload,
session_key=session_key,
expires_at=time.time() + (timeout or self.default_timeout),
)
with self._lock:
self._pending[req.request_id] = req
# Notify human
if self.notify_callback:
try:
self.notify_callback(req.to_dict())
except Exception as e:
logger.warning("Confirmation notify callback failed: %s", e)
logger.info(
"Confirmation request %s: %s (%s risk) — waiting for human",
req.request_id, action, risk_level,
)
return req
def respond(
self,
request_id: str,
approved: bool,
decided_by: str = "human",
reason: str = "",
) -> Optional[ConfirmationRequest]:
"""Record a human decision on a pending request."""
with self._lock:
req = self._pending.get(request_id)
if not req:
logger.warning("Confirmation respond: unknown request %s", request_id)
return None
if not req.is_pending:
logger.warning("Confirmation respond: request %s already decided", request_id)
return req
req.status = (
ConfirmationStatus.APPROVED.value if approved
else ConfirmationStatus.DENIED.value
)
req.decided_at = time.time()
req.decided_by = decided_by
req.reason = reason
# Move to history
del self._pending[request_id]
self._history.append(req)
logger.info(
"Confirmation %s: %s by %s",
request_id, "APPROVED" if approved else "DENIED", decided_by,
)
return req
def wait_for_decision(
self, request_id: str, timeout: Optional[float] = None
) -> ConfirmationRequest:
"""Block until a decision is made or timeout expires."""
deadline = time.time() + (timeout or self.default_timeout)
while time.time() < deadline:
with self._lock:
req = self._pending.get(request_id)
if req and not req.is_pending:
return req
if req and req.is_expired:
req.status = ConfirmationStatus.EXPIRED.value
del self._pending[request_id]
self._history.append(req)
return req
time.sleep(0.5)
# Timeout
with self._lock:
req = self._pending.pop(request_id, None)
if req:
req.status = ConfirmationStatus.EXPIRED.value
self._history.append(req)
return req
# Shouldn't reach here
return ConfirmationRequest(
request_id=request_id,
action="unknown",
description="Request not found",
risk_level="high",
payload={},
status=ConfirmationStatus.EXPIRED.value,
)
def get_pending(self) -> List[Dict[str, Any]]:
"""Return list of pending confirmation requests."""
self._expire_old()
with self._lock:
return [r.to_dict() for r in self._pending.values() if r.is_pending]
def get_history(self, limit: int = 50) -> List[Dict[str, Any]]:
"""Return recent confirmation history."""
with self._lock:
return [r.to_dict() for r in self._history[-limit:]]
def _expire_old(self) -> None:
"""Move expired requests to history."""
now = time.time()
with self._lock:
expired = [
rid for rid, req in self._pending.items()
if now > req.expires_at
]
for rid in expired:
req = self._pending.pop(rid)
req.status = ConfirmationStatus.EXPIRED.value
self._history.append(req)
# --- aiohttp HTTP API ---
async def _handle_health(self, request):
from aiohttp import web
return web.json_response({
"status": "ok",
"service": "hermes-confirmation-daemon",
"pending": len(self._pending),
})
async def _handle_confirm(self, request):
from aiohttp import web
try:
body = await request.json()
except Exception:
return web.json_response({"error": "invalid JSON"}, status=400)
action = body.get("action", "")
description = body.get("description", "")
if not action or not description:
return web.json_response(
{"error": "action and description required"}, status=400
)
req = self.request(
action=action,
description=description,
payload=body.get("payload", {}),
risk_level=body.get("risk_level"),
session_key=body.get("session_key", ""),
timeout=body.get("timeout"),
)
# If auto-approved, return immediately
if req.status != ConfirmationStatus.PENDING.value:
return web.json_response({
"request_id": req.request_id,
"status": req.status,
"decided_by": req.decided_by,
})
# Otherwise, wait for human decision (with timeout)
timeout = min(body.get("timeout", self.default_timeout), 600)
result = self.wait_for_decision(req.request_id, timeout=timeout)
return web.json_response({
"request_id": result.request_id,
"status": result.status,
"decided_by": result.decided_by,
"reason": result.reason,
})
async def _handle_respond(self, request):
from aiohttp import web
try:
body = await request.json()
except Exception:
return web.json_response({"error": "invalid JSON"}, status=400)
request_id = body.get("request_id", "")
approved = body.get("approved")
if not request_id or approved is None:
return web.json_response(
{"error": "request_id and approved required"}, status=400
)
result = self.respond(
request_id=request_id,
approved=bool(approved),
decided_by=body.get("decided_by", "human"),
reason=body.get("reason", ""),
)
if not result:
return web.json_response({"error": "unknown request"}, status=404)
return web.json_response({
"request_id": result.request_id,
"status": result.status,
})
async def _handle_pending(self, request):
from aiohttp import web
return web.json_response({"pending": self.get_pending()})
def _build_app(self):
"""Build the aiohttp application."""
from aiohttp import web
app = web.Application()
app.router.add_get("/health", self._handle_health)
app.router.add_post("/confirm", self._handle_confirm)
app.router.add_post("/respond", self._handle_respond)
app.router.add_get("/pending", self._handle_pending)
self._app = app
return app
async def start_async(self) -> None:
"""Start the daemon as an async server."""
from aiohttp import web
app = self._build_app()
self._runner = web.AppRunner(app)
await self._runner.setup()
site = web.TCPSite(self._runner, self.host, self.port)
await site.start()
logger.info("Confirmation daemon listening on %s:%d", self.host, self.port)
async def stop_async(self) -> None:
"""Stop the daemon."""
if self._runner:
await self._runner.cleanup()
self._runner = None
def start(self) -> None:
"""Start daemon in a background thread (blocking caller)."""
def _run():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.start_async())
loop.run_forever()
t = threading.Thread(target=_run, daemon=True, name="confirmation-daemon")
t.start()
logger.info("Confirmation daemon started in background thread")
def start_blocking(self) -> None:
"""Start daemon and block (for standalone use)."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.start_async())
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(self.stop_async())
# =========================================================================
# Convenience API for agent integration
# =========================================================================
# Global singleton — initialized by gateway or CLI at startup
_daemon: Optional[ConfirmationDaemon] = None
def get_daemon() -> Optional[ConfirmationDaemon]:
"""Get the global confirmation daemon instance."""
return _daemon
def init_daemon(
host: str = "127.0.0.1",
port: int = 6000,
notify_callback: Optional[Callable] = None,
) -> ConfirmationDaemon:
"""Initialize the global confirmation daemon."""
global _daemon
_daemon = ConfirmationDaemon(
host=host, port=port, notify_callback=notify_callback
)
return _daemon
def request_confirmation(
action: str,
description: str,
payload: Optional[Dict[str, Any]] = None,
risk_level: Optional[str] = None,
session_key: str = "",
timeout: int = 300,
) -> bool:
"""Request human confirmation for a high-risk action.
This is the primary integration point for agent code. It:
1. Classifies the action risk level
2. Checks the whitelist
3. If confirmation needed, blocks until human responds
4. Returns True if approved, False if denied/expired
Args:
action: Action type (send_email, crypto_tx, etc.)
description: Human-readable description
payload: Action-specific data
risk_level: Override auto-classification
session_key: Session requesting approval
timeout: Seconds to wait for human response
Returns:
True if approved, False if denied or expired.
"""
daemon = get_daemon()
if not daemon:
logger.warning(
"No confirmation daemon running — DENYING action %s by default. "
"Start daemon with init_daemon() or --confirmation-daemon flag.",
action,
)
return False
req = daemon.request(
action=action,
description=description,
payload=payload,
risk_level=risk_level,
session_key=session_key,
timeout=timeout,
)
# Auto-approved (whitelisted)
if req.status == ConfirmationStatus.AUTO_APPROVED.value:
return True
# Wait for human
result = daemon.wait_for_decision(req.request_id, timeout=timeout)
return result.status == ConfirmationStatus.APPROVED.value