#!/usr/bin/env python3
"""
Memory Sovereignty Verification

Verifies that the memory path in hermes-agent has no network dependencies.
Memory data must stay on the local filesystem only — no HTTP calls, no external
API calls, no cloud sync during memory read/write/flush/load operations.

Scans:
  - tools/memory_tool.py (MEMORY.md / USER.md store)
  - hermes_state.py (SQLite session store)
  - tools/session_search_tool.py (FTS5 session search + summarization)
  - tools/graph_store.py (knowledge graph persistence)
  - tools/temporal_kg_tool.py (temporal knowledge graph)
  - agent/temporal_knowledge_graph.py (temporal triple store)
  - tools/skills_tool.py (skill listing/viewing)
  - tools/skills_sync.py (bundled skill syncing)

Exit codes:
  0 = sovereign (no violations)
  1 = violations found
"""

# Provenance: introduced by the patch "Add memory sovereignty verification
# script (#257)" (commit 2e37ff638a650eb6ba542d30f88188a0a085a300) as
# scripts/verify_memory_sovereignty.py.

import ast
import re
import sys
from pathlib import Path
from typing import Iterator, Optional

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Files in the memory path to scan (relative to repo root).
MEMORY_FILES = [
    "tools/memory_tool.py",
    "hermes_state.py",
    "tools/session_search_tool.py",
    "tools/graph_store.py",
    "tools/temporal_kg_tool.py",
    "agent/temporal_knowledge_graph.py",
    "tools/skills_tool.py",
    "tools/skills_sync.py",
]

# Patterns that indicate network/external API usage.
# Each entry is (regex, description); regexes are matched case-insensitively
# against every line that is not a full-line comment.
NETWORK_PATTERNS = [
    # HTTP libraries
    (r'\brequests\.(get|post|put|delete|patch|head|session)', "requests HTTP call"),
    (r'\burllib\.request\.(urlopen|Request)', "urllib HTTP call"),
    (r'\bhttpx\.(get|post|put|delete|Client|AsyncClient)', "httpx HTTP call"),
    (r'\bhttp\.client\.(HTTPConnection|HTTPSConnection)', "http.client connection"),
    (r'\baiohttp\.(ClientSession|get|post)', "aiohttp HTTP call"),
    (r'\bwebsockets\.\w+', "websocket connection"),

    # API client patterns
    (r'\bopenai\b.*\b(api_key|chat|completions|Client)\b', "OpenAI API usage"),
    (r'\banthropic\b.*\b(api_key|messages|Client)\b', "Anthropic API usage"),
    (r'\bAsyncOpenAI\b', "AsyncOpenAI client"),
    (r'\bAsyncAnthropic\b', "AsyncAnthropic client"),

    # Generic network indicators
    (r'\bsocket\.(socket|connect|create_connection)', "raw socket connection"),
    (r'\bftplib\b', "FTP connection"),
    (r'\bsmtplib\b', "SMTP connection"),
    (r'\bparamiko\b', "SSH connection via paramiko"),

    # URL patterns (hardcoded endpoints)
    (r'https?://(?!example\.com)[a-zA-Z0-9._-]+\.(com|org|net|io|dev|ai)', "hardcoded URL"),
]

# Import aliases that indicate network-capable modules.
NETWORK_IMPORTS = {
    "requests",
    "httpx",
    "aiohttp",
    "urllib.request",
    "http.client",
    "websockets",
    "openai",
    "anthropic",
    "openrouter_client",
}

# Functions whose names suggest network I/O.
NETWORK_FUNC_NAMES = {
    "async_call_llm",
    "extract_content_or_reasoning",
}

# Files that are ALLOWED to have network calls (known violations with justification).
# Each entry maps to a reason string.
KNOWN_VIOLATIONS = {
    "tools/graph_store.py": (
        "GraphStore persists to Gitea via API — not part of core memory path "
        "(MEMORY.md/USER.md/SQLite). Excluded from sovereignty gate."
    ),
    "tools/session_search_tool.py": (
        "Session search uses LLM summarization (auxiliary client) to generate "
        "summaries. The FTS5 search itself is local; the LLM call is for "
        "presentation, not storage. Known architectural trade-off."
    ),
}

# Line prefixes that mark a definition rather than a call site.
_DEFINITION_PREFIXES = ("def ", "async def ", "class ")


# ---------------------------------------------------------------------------
# Scanner
# ---------------------------------------------------------------------------

class Violation:
    """A sovereignty violation with location and description.

    Attributes:
        file: Repo-relative path of the offending file.
        line: 1-based line number, or 0 when the finding concerns the whole
            file (e.g. it could not be read).
        description: Human-readable reason for the finding.
        code: The offending source line with surrounding whitespace removed.
    """

    def __init__(self, file: str, line: int, description: str, code: str):
        self.file = file
        self.line = line
        self.description = description
        self.code = code.strip()

    def __repr__(self):
        return (
            f"{type(self).__name__}({self.file!r}, {self.line!r}, "
            f"{self.description!r}, {self.code!r})"
        )

    def __str__(self):
        return f"{self.file}:{self.line}: {self.description}\n {self.code}"


def _is_network_module(name: str) -> bool:
    """Return True if *name* is a listed network module or a submodule of one."""
    return name in NETWORK_IMPORTS or any(
        name.startswith(ni + ".") for ni in NETWORK_IMPORTS
    )


def _source_line(lines: list[str], lineno: int) -> str:
    """Return the 1-based line *lineno* from *lines*, or "" when out of range."""
    return lines[lineno - 1] if 0 < lineno <= len(lines) else ""


def _code_lines(lines: list[str]) -> Iterator[tuple[int, str]]:
    """Yield (lineno, raw_line) for each line that is not a full-line comment."""
    for lineno, raw in enumerate(lines, 1):
        if not raw.strip().startswith("#"):
            yield lineno, raw


def _scan_imports(tree: ast.AST, rel_path: str, lines: list[str]) -> list[Violation]:
    """Report import statements that pull in a network-capable module."""
    found = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                if _is_network_module(alias.name):
                    found.append(Violation(
                        rel_path, node.lineno,
                        f"Network-capable import: {alias.name}",
                        _source_line(lines, node.lineno),
                    ))

        elif isinstance(node, ast.ImportFrom) and node.module:
            if _is_network_module(node.module):
                found.append(Violation(
                    rel_path, node.lineno,
                    f"Network-capable import from: {node.module}",
                    _source_line(lines, node.lineno),
                ))
            elif node.level == 0:
                # `from urllib import request` names the network module in
                # the imported alias rather than in `module`; check the
                # fully-qualified name so that spelling cannot slip through.
                for alias in node.names:
                    qualified = f"{node.module}.{alias.name}"
                    if _is_network_module(qualified):
                        found.append(Violation(
                            rel_path, node.lineno,
                            f"Network-capable import from: {qualified}",
                            _source_line(lines, node.lineno),
                        ))
    return found


def _scan_llm_calls(rel_path: str, lines: list[str]) -> list[Violation]:
    """Report call sites of the functions listed in NETWORK_FUNC_NAMES."""
    # Sorted so the report order does not depend on set iteration order.
    call_patterns = [
        (name, re.compile(r'\b' + re.escape(name) + r'\s*\('))
        for name in sorted(NETWORK_FUNC_NAMES)
    ]
    found = []
    for lineno, raw in _code_lines(lines):
        # A `def`/`async def`/`class` line is a definition, not a call.
        if raw.strip().startswith(_DEFINITION_PREFIXES):
            continue
        for name, call_rx in call_patterns:
            if call_rx.search(raw):
                found.append(Violation(
                    rel_path, lineno,
                    f"External LLM call function: {name}()",
                    raw,
                ))
    return found


def _scan_patterns(rel_path: str, lines: list[str]) -> list[Violation]:
    """Report lines matching any regex in NETWORK_PATTERNS."""
    compiled = [
        (re.compile(pattern, re.IGNORECASE), description)
        for pattern, description in NETWORK_PATTERNS
    ]
    found = []
    for lineno, raw in _code_lines(lines):
        for rx, description in compiled:
            if rx.search(raw):
                found.append(Violation(
                    rel_path, lineno,
                    f"Suspicious pattern ({description})",
                    raw,
                ))
    return found


def scan_file(filepath: Path, repo_root: Path) -> list[Violation]:
    """Scan a single file for network dependency patterns.

    Args:
        filepath: Path of the file to scan; must be located under *repo_root*.
        repo_root: Repository root used to compute the reported relative path.

    Returns:
        The violations found. Files listed in KNOWN_VIOLATIONS always yield an
        empty list. A file that cannot be read or parsed yields a violation
        instead of silently passing, because an unverifiable file must not be
        reported as sovereign.

    Raises:
        ValueError: If *filepath* is not located under *repo_root*.
    """
    violations: list[Violation] = []
    # POSIX separators so the KNOWN_VIOLATIONS lookup works on every platform.
    rel_path = filepath.relative_to(repo_root).as_posix()

    # Skip known violations
    if rel_path in KNOWN_VIOLATIONS:
        return violations

    try:
        content = filepath.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as e:
        print(f"WARNING: Cannot read {rel_path}: {e}", file=sys.stderr)
        violations.append(Violation(
            rel_path, 0,
            "Unverifiable file: source could not be read",
            str(e),
        ))
        return violations

    lines = content.splitlines()

    # --- Check imports ---
    try:
        tree = ast.parse(content, filename=str(filepath))
    except (SyntaxError, ValueError) as e:
        print(f"WARNING: Cannot parse {rel_path}: {e}", file=sys.stderr)
        lineno = getattr(e, "lineno", None) or 0
        violations.append(Violation(
            rel_path, lineno,
            "Unverifiable file: source could not be parsed",
            _source_line(lines, lineno),
        ))
    else:
        violations.extend(_scan_imports(tree, rel_path, lines))

    # The text-based checks do not need an AST, so they run even when
    # parsing failed.
    # --- Check for LLM call function usage ---
    violations.extend(_scan_llm_calls(rel_path, lines))

    # --- Regex-based pattern matching ---
    violations.extend(_scan_patterns(rel_path, lines))

    return violations


def verify_sovereignty(repo_root: Path) -> tuple[list[Violation], list[str]]:
    """Run sovereignty verification across all memory files.

    Missing files are skipped and files in KNOWN_VIOLATIONS are reported as
    warnings without being scanned; everything else goes through scan_file().

    Returns (violations, info_messages).
    """
    all_violations = []
    info = []

    for rel_path in MEMORY_FILES:
        filepath = repo_root / rel_path
        if not filepath.exists():
            info.append(f"SKIP: {rel_path} (file not found)")
            continue

        if rel_path in KNOWN_VIOLATIONS:
            info.append(
                f"WARN: {rel_path} — known violation (excluded from gate): "
                f"{KNOWN_VIOLATIONS[rel_path]}"
            )
            continue

        violations = scan_file(filepath, repo_root)
        all_violations.extend(violations)

        if not violations:
            info.append(f"PASS: {rel_path} — sovereign (local-only)")

    return all_violations, info


# ---------------------------------------------------------------------------
# Deep analysis helpers
# ---------------------------------------------------------------------------

def _read_optional(filepath: Path) -> Optional[str]:
    """Return the UTF-8 text of *filepath*, or None if it is missing or unreadable."""
    try:
        return filepath.read_text(encoding="utf-8")
    except FileNotFoundError:
        return None
    except (OSError, UnicodeDecodeError) as e:
        # These notes are advisory only, so warn instead of aborting the run.
        print(f"WARNING: Cannot read {filepath}: {e}", file=sys.stderr)
        return None


def check_graph_store_network(repo_root: Path) -> str:
    """Analyze graph_store.py for its network dependencies.

    Returns an explanatory note when the file mentions GiteaClient, otherwise
    an empty string (also when the file is missing or unreadable).
    """
    content = _read_optional(repo_root / "tools" / "graph_store.py")
    if content is None:
        return ""
    if "GiteaClient" in content:
        return (
            "tools/graph_store.py uses GiteaClient for persistence — "
            "this is an external API call. However, graph_store is NOT part of "
            "the core memory path (MEMORY.md/USER.md/SQLite). It is a separate "
            "knowledge graph system."
        )
    return ""


def check_session_search_llm(repo_root: Path) -> str:
    """Analyze session_search_tool.py for LLM usage.

    Returns an explanatory note when the file mentions async_call_llm or
    auxiliary_client, otherwise an empty string (also when the file is
    missing or unreadable).
    """
    content = _read_optional(repo_root / "tools" / "session_search_tool.py")
    if content is None:
        return ""
    warnings = []
    if "async_call_llm" in content:
        warnings.append("uses async_call_llm for summarization")
    if "auxiliary_client" in content:
        warnings.append("imports auxiliary_client (LLM calls)")
    if warnings:
        return (
            f"tools/session_search_tool.py: {'; '.join(warnings)}. "
            f"The FTS5 search is local SQLite, but session summarization "
            f"involves LLM API calls."
        )
    return ""


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main() -> int:
    """Run the verification from the repo root and print a report.

    The repo root is taken to be the parent of the directory containing this
    script.

    Returns:
        0 when no violations were found, 1 otherwise (used as the exit code).
    """
    repo_root = Path(__file__).resolve().parent.parent
    print("Memory Sovereignty Verification")
    print(f"Repository: {repo_root}")
    print(f"Scanning {len(MEMORY_FILES)} memory-path files...")
    print()

    violations, info = verify_sovereignty(repo_root)

    # Print info messages
    for msg in info:
        print(f" {msg}")

    # Print deep analysis
    print()
    print("Deep analysis:")
    for checker in [check_graph_store_network, check_session_search_llm]:
        note = checker(repo_root)
        if note:
            print(f" NOTE: {note}")

    print()

    if violations:
        print(f"SOVEREIGNTY VIOLATIONS FOUND: {len(violations)}")
        print("=" * 60)
        for v in violations:
            print(v)
            print()
        print("=" * 60)
        print(
            f"FAIL: {len(violations)} potential network dependencies detected "
            f"in the memory path."
        )
        print("Memory must be local-only (filesystem + SQLite).")
        print()
        print("If a violation is intentional and documented, add it to")
        print("KNOWN_VIOLATIONS in this script with a justification.")
        return 1

    print("PASS: Memory path is sovereign — no network dependencies detected.")
    print("All memory operations use local filesystem and/or SQLite only.")
    return 0


if __name__ == "__main__":
    sys.exit(main())