Memory P5 — Sovereignty verification — no network in memory path #265

Merged
Rockachopa merged 1 commit from burn/20260409-2105-memory-sovereignty into main 2026-04-10 03:44:26 +00:00

View File

@@ -0,0 +1,321 @@
#!/usr/bin/env python3
"""
Memory Sovereignty Verification
Verifies that the memory path in hermes-agent has no network dependencies.
Memory data must stay on the local filesystem only — no HTTP calls, no external
API calls, no cloud sync during memory read/write/flush/load operations.
Scans:
- tools/memory_tool.py (MEMORY.md / USER.md store)
- hermes_state.py (SQLite session store)
- tools/session_search_tool.py (FTS5 session search + summarization)
- tools/graph_store.py (knowledge graph persistence)
- tools/temporal_kg_tool.py (temporal knowledge graph)
- agent/temporal_knowledge_graph.py (temporal triple store)
- tools/skills_tool.py (skill listing/viewing)
- tools/skills_sync.py (bundled skill syncing)
Exit codes:
0 = sovereign (no violations)
1 = violations found
"""
import ast
import re
import sys
from pathlib import Path
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Files in the memory path to scan (relative to repo root).
# Missing files are skipped (reported as SKIP) rather than treated as errors.
MEMORY_FILES = [
    "tools/memory_tool.py",
    "hermes_state.py",
    "tools/session_search_tool.py",
    "tools/graph_store.py",
    "tools/temporal_kg_tool.py",
    "agent/temporal_knowledge_graph.py",
    "tools/skills_tool.py",
    "tools/skills_sync.py",
]
# Patterns that indicate network/external API usage.
# Matched per-line with re.IGNORECASE; full-line comments are skipped by the
# scanner before these run.
NETWORK_PATTERNS = [
    # HTTP libraries
    (r'\brequests\.(get|post|put|delete|patch|head|session)', "requests HTTP call"),
    (r'\burllib\.request\.(urlopen|Request)', "urllib HTTP call"),
    (r'\bhttpx\.(get|post|put|delete|Client|AsyncClient)', "httpx HTTP call"),
    (r'\bhttp\.client\.(HTTPConnection|HTTPSConnection)', "http.client connection"),
    (r'\baiohttp\.(ClientSession|get|post)', "aiohttp HTTP call"),
    (r'\bwebsockets\.\w+', "websocket connection"),
    # API client patterns
    (r'\bopenai\b.*\b(api_key|chat|completions|Client)\b', "OpenAI API usage"),
    (r'\banthropic\b.*\b(api_key|messages|Client)\b', "Anthropic API usage"),
    (r'\bAsyncOpenAI\b', "AsyncOpenAI client"),
    (r'\bAsyncAnthropic\b', "AsyncAnthropic client"),
    # Generic network indicators
    (r'\bsocket\.(socket|connect|create_connection)', "raw socket connection"),
    (r'\bftplib\b', "FTP connection"),
    (r'\bsmtplib\b', "SMTP connection"),
    (r'\bparamiko\b', "SSH connection via paramiko"),
    # URL patterns (hardcoded endpoints); example.com is exempted via the
    # negative lookahead since it is the RFC 2606 documentation domain.
    (r'https?://(?!example\.com)[a-zA-Z0-9._-]+\.(com|org|net|io|dev|ai)', "hardcoded URL"),
]
# Import aliases that indicate network-capable modules.
# The AST check flags both exact matches and submodule imports
# (e.g. "urllib.request.something").
NETWORK_IMPORTS = {
    "requests",
    "httpx",
    "aiohttp",
    "urllib.request",
    "http.client",
    "websockets",
    "openai",
    "anthropic",
    "openrouter_client",
}
# Functions whose names suggest network I/O.
# NOTE(review): "extract_content_or_reasoning" presumably accompanies LLM
# response handling in this codebase — confirm it is network-bound.
NETWORK_FUNC_NAMES = {
    "async_call_llm",
    "extract_content_or_reasoning",
}
# Files that are ALLOWED to have network calls (known violations with justification).
# Each entry maps to a reason string; these files are skipped by the scanner and
# surfaced as WARN lines instead of failing the gate.
KNOWN_VIOLATIONS = {
    "tools/graph_store.py": (
        "GraphStore persists to Gitea via API — not part of core memory path "
        "(MEMORY.md/USER.md/SQLite). Excluded from sovereignty gate."
    ),
    "tools/session_search_tool.py": (
        "Session search uses LLM summarization (auxiliary client) to generate "
        "summaries. The FTS5 search itself is local; the LLM call is for "
        "presentation, not storage. Known architectural trade-off."
    ),
}
# ---------------------------------------------------------------------------
# Scanner
# ---------------------------------------------------------------------------
class Violation:
    """One detected sovereignty violation: its location and a description.

    Holds the repo-relative file path, 1-based line number, a human-readable
    reason, and the offending source line (whitespace-trimmed on construction).
    """

    def __init__(self, file: str, line: int, description: str, code: str):
        self.file = file
        self.line = line
        self.description = description
        # Trim surrounding whitespace so report output aligns cleanly.
        self.code = code.strip()

    def __str__(self):
        location = f"{self.file}:{self.line}"
        return f"{location}: {self.description}\n  {self.code}"
def scan_file(filepath: Path, repo_root: Path) -> list[Violation]:
    """Scan a single file for network dependency patterns.

    Runs three checks, in order:
      1. AST-based detection of imports of network-capable modules
         (NETWORK_IMPORTS, including submodule imports).
      2. Line scan for calls to known LLM/network helper functions
         (NETWORK_FUNC_NAMES).
      3. Line scan for suspicious regex patterns (NETWORK_PATTERNS).

    Files listed in KNOWN_VIOLATIONS are skipped entirely. Unreadable or
    unparseable files produce a WARNING on stderr and an empty result.

    Args:
        filepath: Absolute path of the file to scan.
        repo_root: Repository root, used to derive the repo-relative path.

    Returns:
        A (possibly empty) list of Violation records.
    """
    violations: list[Violation] = []
    rel_path = str(filepath.relative_to(repo_root))
    # Skip files with documented, accepted network usage.
    if rel_path in KNOWN_VIOLATIONS:
        return violations
    try:
        content = filepath.read_text(encoding="utf-8")
    except OSError as e:  # IOError is an alias of OSError since Python 3.3
        print(f"WARNING: Cannot read {rel_path}: {e}", file=sys.stderr)
        return violations
    lines = content.splitlines()
    # --- Check imports ---
    try:
        tree = ast.parse(content, filename=str(filepath))
    except SyntaxError as e:
        print(f"WARNING: Cannot parse {rel_path}: {e}", file=sys.stderr)
        return violations

    def _src(lineno: int) -> str:
        # Source text for a 1-based line number; "" if out of range.
        return lines[lineno - 1] if lineno <= len(lines) else ""

    def _is_network_module(mod: str) -> bool:
        # Exact match or submodule of a flagged package.
        return mod in NETWORK_IMPORTS or any(
            mod.startswith(ni + ".") for ni in NETWORK_IMPORTS
        )

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                if _is_network_module(alias.name):
                    violations.append(Violation(
                        rel_path, node.lineno,
                        f"Network-capable import: {alias.name}",
                        _src(node.lineno),
                    ))
        elif isinstance(node, ast.ImportFrom):
            if node.module and _is_network_module(node.module):
                violations.append(Violation(
                    rel_path, node.lineno,
                    f"Network-capable import from: {node.module}",
                    _src(node.lineno),
                ))
    # Precompile per-file instead of re-matching raw strings on every line.
    func_call_res = {
        name: re.compile(r'\b' + re.escape(name) + r'\s*\(')
        for name in NETWORK_FUNC_NAMES
    }
    pattern_res = [
        (re.compile(pattern, re.IGNORECASE), description)
        for pattern, description in NETWORK_PATTERNS
    ]
    # --- Check for LLM call function usage ---
    for i, line in enumerate(lines, 1):
        stripped = line.strip()
        if stripped.startswith("#"):
            continue  # full-line comments are not executable
        # Exclude definition lines — including "async def", which the previous
        # 'startswith("def ")' check missed — so a file that *defines* one of
        # these names is not flagged for calling it.
        if stripped.startswith(("def ", "async def ", "class ")):
            continue
        for func_name, call_re in func_call_res.items():
            if call_re.search(line):
                violations.append(Violation(
                    rel_path, i,
                    f"External LLM call function: {func_name}()",
                    line,
                ))
    # --- Regex-based pattern matching ---
    for i, line in enumerate(lines, 1):
        stripped = line.strip()
        if stripped.startswith("#"):
            continue
        for pat_re, description in pattern_res:
            if pat_re.search(line):
                violations.append(Violation(
                    rel_path, i,
                    f"Suspicious pattern ({description})",
                    line,
                ))
    return violations
def verify_sovereignty(repo_root: Path) -> tuple[list[Violation], list[str]]:
    """Run sovereignty verification across all memory files.

    Iterates MEMORY_FILES: missing files yield a SKIP message, known
    violations yield a WARN message (and are not scanned), and the rest are
    scanned with scan_file; a clean scan yields a PASS message.

    Returns (violations, info_messages).
    """
    all_violations: list[Violation] = []
    messages: list[str] = []
    for rel_path in MEMORY_FILES:
        target = repo_root / rel_path
        if not target.exists():
            messages.append(f"SKIP: {rel_path} (file not found)")
        elif rel_path in KNOWN_VIOLATIONS:
            messages.append(
                f"WARN: {rel_path} — known violation (excluded from gate): "
                f"{KNOWN_VIOLATIONS[rel_path]}"
            )
        else:
            found = scan_file(target, repo_root)
            all_violations.extend(found)
            if not found:
                messages.append(f"PASS: {rel_path} — sovereign (local-only)")
    return all_violations, messages
# ---------------------------------------------------------------------------
# Deep analysis helpers
# ---------------------------------------------------------------------------
def check_graph_store_network(repo_root: Path) -> str:
    """Describe graph_store.py's known network dependency, if present.

    Returns an explanatory note when tools/graph_store.py exists and its
    source mentions GiteaClient; returns "" otherwise.
    """
    source = repo_root / "tools" / "graph_store.py"
    if not source.exists():
        return ""
    if "GiteaClient" not in source.read_text(encoding="utf-8"):
        return ""
    return (
        "tools/graph_store.py uses GiteaClient for persistence — "
        "this is an external API call. However, graph_store is NOT part of "
        "the core memory path (MEMORY.md/USER.md/SQLite). It is a separate "
        "knowledge graph system."
    )
def check_session_search_llm(repo_root: Path) -> str:
    """Summarize LLM usage inside session_search_tool.py, if any.

    Checks for the async_call_llm helper and the auxiliary_client import;
    returns a combined note when either is present, "" otherwise (including
    when the file does not exist).
    """
    source = repo_root / "tools" / "session_search_tool.py"
    if not source.exists():
        return ""
    text = source.read_text(encoding="utf-8")
    findings = [
        note
        for marker, note in (
            ("async_call_llm", "uses async_call_llm for summarization"),
            ("auxiliary_client", "imports auxiliary_client (LLM calls)"),
        )
        if marker in text
    ]
    if not findings:
        return ""
    return (
        f"tools/session_search_tool.py: {'; '.join(findings)}. "
        f"The FTS5 search is local SQLite, but session summarization "
        f"involves LLM API calls."
    )
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> int:
    """Run the sovereignty gate and print a human-readable report.

    Returns:
        0 when the memory path is sovereign (no violations),
        1 when violations are found (suitable as a process exit code).
    """
    # This script lives one directory below the repository root.
    repo_root = Path(__file__).resolve().parent.parent
    # Fix: dropped the pointless f-prefix on the placeholder-free literal.
    print("Memory Sovereignty Verification")
    print(f"Repository: {repo_root}")
    print(f"Scanning {len(MEMORY_FILES)} memory-path files...")
    print()
    violations, info = verify_sovereignty(repo_root)
    # Print per-file PASS/SKIP/WARN messages.
    for msg in info:
        print(f"  {msg}")
    # Print deep analysis notes for the known-violation files.
    print()
    print("Deep analysis:")
    for checker in [check_graph_store_network, check_session_search_llm]:
        note = checker(repo_root)
        if note:
            print(f"  NOTE: {note}")
    print()
    if violations:
        print(f"SOVEREIGNTY VIOLATIONS FOUND: {len(violations)}")
        print("=" * 60)
        for v in violations:
            print(v)
            print()
        print("=" * 60)
        print(
            f"FAIL: {len(violations)} potential network dependencies detected "
            f"in the memory path."
        )
        print("Memory must be local-only (filesystem + SQLite).")
        print()
        print("If a violation is intentional and documented, add it to")
        print("KNOWN_VIOLATIONS in this script with a justification.")
        return 1
    else:
        print("PASS: Memory path is sovereign — no network dependencies detected.")
        print("All memory operations use local filesystem and/or SQLite only.")
        return 0


if __name__ == "__main__":
    sys.exit(main())