#!/usr/bin/env python3
"""
Memory Sovereignty Verification

Verifies that the memory path in hermes-agent has no network dependencies.
Memory data must stay on the local filesystem only — no HTTP calls, no
external API calls, no cloud sync during memory read/write/flush/load
operations.

Scans:
- tools/memory_tool.py (MEMORY.md / USER.md store)
- hermes_state.py (SQLite session store)
- tools/session_search_tool.py (FTS5 session search + summarization)
- tools/graph_store.py (knowledge graph persistence)
- tools/temporal_kg_tool.py (temporal knowledge graph)
- agent/temporal_knowledge_graph.py (temporal triple store)
- tools/skills_tool.py (skill listing/viewing)
- tools/skills_sync.py (bundled skill syncing)

Exit codes:
0 = sovereign (no violations)
1 = violations found
"""

import ast
import re
import sys
from pathlib import Path

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Files in the memory path to scan (relative to repo root).
MEMORY_FILES = [
    "tools/memory_tool.py",
    "hermes_state.py",
    "tools/session_search_tool.py",
    "tools/graph_store.py",
    "tools/temporal_kg_tool.py",
    "agent/temporal_knowledge_graph.py",
    "tools/skills_tool.py",
    "tools/skills_sync.py",
]

# Patterns that indicate network/external API usage.
NETWORK_PATTERNS = [
    # HTTP libraries
    (r'\brequests\.(get|post|put|delete|patch|head|session)', "requests HTTP call"),
    (r'\burllib\.request\.(urlopen|Request)', "urllib HTTP call"),
    (r'\bhttpx\.(get|post|put|delete|Client|AsyncClient)', "httpx HTTP call"),
    (r'\bhttp\.client\.(HTTPConnection|HTTPSConnection)', "http.client connection"),
    (r'\baiohttp\.(ClientSession|get|post)', "aiohttp HTTP call"),
    (r'\bwebsockets\.\w+', "websocket connection"),
    # API client patterns
    (r'\bopenai\b.*\b(api_key|chat|completions|Client)\b', "OpenAI API usage"),
    (r'\banthropic\b.*\b(api_key|messages|Client)\b', "Anthropic API usage"),
    (r'\bAsyncOpenAI\b', "AsyncOpenAI client"),
    (r'\bAsyncAnthropic\b', "AsyncAnthropic client"),
    # Generic network indicators
    (r'\bsocket\.(socket|connect|create_connection)', "raw socket connection"),
    (r'\bftplib\b', "FTP connection"),
    (r'\bsmtplib\b', "SMTP connection"),
    (r'\bparamiko\b', "SSH connection via paramiko"),
    # URL patterns (hardcoded endpoints)
    (r'https?://(?!example\.com)[a-zA-Z0-9._-]+\.(com|org|net|io|dev|ai)', "hardcoded URL"),
]

# Compiled once at import time: scanning iterates every line of every file,
# so recompiling per line would dominate the cost of a scan.
_COMPILED_PATTERNS = [
    (re.compile(pattern, re.IGNORECASE), description)
    for pattern, description in NETWORK_PATTERNS
]

# Import aliases that indicate network-capable modules.
NETWORK_IMPORTS = {
    "requests",
    "httpx",
    "aiohttp",
    "urllib.request",
    "http.client",
    "websockets",
    "openai",
    "anthropic",
    "openrouter_client",
}

# Functions whose names suggest network I/O.
NETWORK_FUNC_NAMES = {
    "async_call_llm",
    "extract_content_or_reasoning",
}

# Pre-compiled "name followed by an open paren" matchers, one per suspect
# function, so the per-line loop below only does a regex search.
_FUNC_CALL_PATTERNS = {
    name: re.compile(r'\b' + re.escape(name) + r'\s*\(')
    for name in NETWORK_FUNC_NAMES
}

# Files that are ALLOWED to have network calls (known violations with justification).
# Each entry maps to a reason string.
KNOWN_VIOLATIONS = {
    "tools/graph_store.py": (
        "GraphStore persists to Gitea via API — not part of core memory path "
        "(MEMORY.md/USER.md/SQLite). Excluded from sovereignty gate."
    ),
    "tools/session_search_tool.py": (
        "Session search uses LLM summarization (auxiliary client) to generate "
        "summaries. The FTS5 search itself is local; the LLM call is for "
        "presentation, not storage. Known architectural trade-off."
    ),
}

# ---------------------------------------------------------------------------
# Scanner
# ---------------------------------------------------------------------------


class Violation:
    """A sovereignty violation with location and description."""

    def __init__(self, file: str, line: int, description: str, code: str):
        self.file = file              # path relative to repo root
        self.line = line              # 1-based line number in that file
        self.description = description
        self.code = code.strip()      # offending source line, whitespace-trimmed

    def __str__(self):
        return f"{self.file}:{self.line}: {self.description}\n  {self.code}"


def _is_network_module(mod: str) -> bool:
    """True when *mod* is a network-capable module or a submodule of one."""
    return mod in NETWORK_IMPORTS or any(
        mod.startswith(ni + ".") for ni in NETWORK_IMPORTS
    )


def scan_file(filepath: Path, repo_root: Path) -> list[Violation]:
    """Scan a single file for network dependency patterns.

    Performs two checks:
    1. AST-based import analysis (``import x`` / ``from x import y``)
       against NETWORK_IMPORTS.
    2. A single line-by-line pass matching suspect LLM-call function names
       and the NETWORK_PATTERNS regex heuristics.

    Files listed in KNOWN_VIOLATIONS, unreadable files, and files that fail
    to parse are skipped (the latter two with a warning on stderr).
    Returns the list of Violations found (empty when sovereign).
    """
    violations: list[Violation] = []
    # as_posix() so KNOWN_VIOLATIONS keys (which use forward slashes) match
    # on Windows as well as POSIX systems.
    rel_path = filepath.relative_to(repo_root).as_posix()

    # Skip known violations
    if rel_path in KNOWN_VIOLATIONS:
        return violations

    try:
        content = filepath.read_text(encoding="utf-8")
    except OSError as e:  # IOError is an alias of OSError in Python 3
        print(f"WARNING: Cannot read {rel_path}: {e}", file=sys.stderr)
        return violations

    lines = content.splitlines()

    # --- Check imports ---
    try:
        tree = ast.parse(content, filename=str(filepath))
    except SyntaxError as e:
        print(f"WARNING: Cannot parse {rel_path}: {e}", file=sys.stderr)
        return violations

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                if _is_network_module(alias.name):
                    violations.append(Violation(
                        rel_path,
                        node.lineno,
                        f"Network-capable import: {alias.name}",
                        lines[node.lineno - 1] if node.lineno <= len(lines) else "",
                    ))
        elif isinstance(node, ast.ImportFrom):
            if node.module and _is_network_module(node.module):
                violations.append(Violation(
                    rel_path,
                    node.lineno,
                    f"Network-capable import from: {node.module}",
                    lines[node.lineno - 1] if node.lineno <= len(lines) else "",
                ))

    # --- Line scan: LLM call functions + regex heuristics (one pass) ---
    for lineno, line in enumerate(lines, 1):
        stripped = line.strip()
        if stripped.startswith("#"):
            continue  # pure comment lines cannot execute network code

        # External LLM call functions. Definition lines are skipped: a file
        # may legitimately *define* such a helper without calling out.
        if not stripped.startswith(("def ", "class ")):
            for func_name, call_pattern in _FUNC_CALL_PATTERNS.items():
                if call_pattern.search(line):
                    violations.append(Violation(
                        rel_path,
                        lineno,
                        f"External LLM call function: {func_name}()",
                        line,
                    ))

        # Regex heuristics: HTTP clients, sockets, hardcoded URLs, etc.
        for compiled, description in _COMPILED_PATTERNS:
            if compiled.search(line):
                violations.append(Violation(
                    rel_path,
                    lineno,
                    f"Suspicious pattern ({description})",
                    line,
                ))

    return violations


def verify_sovereignty(repo_root: Path) -> tuple[list[Violation], list[str]]:
    """Run sovereignty verification across all memory files.

    Missing files are skipped, KNOWN_VIOLATIONS are reported as warnings but
    excluded from the gate, and every clean file gets a PASS message.

    Returns (violations, info_messages).
    """
    all_violations: list[Violation] = []
    info: list[str] = []

    for rel_path in MEMORY_FILES:
        filepath = repo_root / rel_path
        if not filepath.exists():
            info.append(f"SKIP: {rel_path} (file not found)")
            continue
        if rel_path in KNOWN_VIOLATIONS:
            info.append(
                f"WARN: {rel_path} — known violation (excluded from gate): "
                f"{KNOWN_VIOLATIONS[rel_path]}"
            )
            continue

        violations = scan_file(filepath, repo_root)
        all_violations.extend(violations)
        if not violations:
            info.append(f"PASS: {rel_path} — sovereign (local-only)")

    return all_violations, info


# ---------------------------------------------------------------------------
# Deep analysis helpers
# ---------------------------------------------------------------------------


def check_graph_store_network(repo_root: Path) -> str:
    """Analyze graph_store.py for its network dependencies.

    Returns an explanatory note when the file uses GiteaClient, else "".
    """
    filepath = repo_root / "tools" / "graph_store.py"
    if not filepath.exists():
        return ""
    content = filepath.read_text(encoding="utf-8")
    if "GiteaClient" in content:
        return (
            "tools/graph_store.py uses GiteaClient for persistence — "
            "this is an external API call. However, graph_store is NOT part of "
            "the core memory path (MEMORY.md/USER.md/SQLite). It is a separate "
            "knowledge graph system."
        )
    return ""


def check_session_search_llm(repo_root: Path) -> str:
    """Analyze session_search_tool.py for LLM usage.

    Returns a note describing any LLM-related usage found, else "".
    """
    filepath = repo_root / "tools" / "session_search_tool.py"
    if not filepath.exists():
        return ""
    content = filepath.read_text(encoding="utf-8")
    warnings = []
    if "async_call_llm" in content:
        warnings.append("uses async_call_llm for summarization")
    if "auxiliary_client" in content:
        warnings.append("imports auxiliary_client (LLM calls)")
    if warnings:
        return (
            f"tools/session_search_tool.py: {'; '.join(warnings)}. "
            f"The FTS5 search is local SQLite, but session summarization "
            f"involves LLM API calls."
        )
    return ""


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------


def main():
    """Run the full verification and print a report; return the exit code."""
    # Script is assumed to live one directory below the repo root.
    repo_root = Path(__file__).resolve().parent.parent

    print("Memory Sovereignty Verification")
    print(f"Repository: {repo_root}")
    print(f"Scanning {len(MEMORY_FILES)} memory-path files...")
    print()

    violations, info = verify_sovereignty(repo_root)

    # Print info messages
    for msg in info:
        print(f"  {msg}")

    # Print deep analysis
    print()
    print("Deep analysis:")
    for checker in [check_graph_store_network, check_session_search_llm]:
        note = checker(repo_root)
        if note:
            print(f"  NOTE: {note}")
    print()

    if violations:
        print(f"SOVEREIGNTY VIOLATIONS FOUND: {len(violations)}")
        print("=" * 60)
        for v in violations:
            print(v)
        print()
        print("=" * 60)
        print(
            f"FAIL: {len(violations)} potential network dependencies detected "
            f"in the memory path."
        )
        print("Memory must be local-only (filesystem + SQLite).")
        print()
        print("If a violation is intentional and documented, add it to")
        print("KNOWN_VIOLATIONS in this script with a justification.")
        return 1
    else:
        print("PASS: Memory path is sovereign — no network dependencies detected.")
        print("All memory operations use local filesystem and/or SQLite only.")
        return 0


if __name__ == "__main__":
    sys.exit(main())