Memory P5 — Sovereignty verification — no network in memory path #265
321
scripts/verify_memory_sovereignty.py
Executable file
321
scripts/verify_memory_sovereignty.py
Executable file
@@ -0,0 +1,321 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Memory Sovereignty Verification
|
||||
|
||||
Verifies that the memory path in hermes-agent has no network dependencies.
|
||||
Memory data must stay on the local filesystem only — no HTTP calls, no external
|
||||
API calls, no cloud sync during memory read/write/flush/load operations.
|
||||
|
||||
Scans:
|
||||
- tools/memory_tool.py (MEMORY.md / USER.md store)
|
||||
- hermes_state.py (SQLite session store)
|
||||
- tools/session_search_tool.py (FTS5 session search + summarization)
|
||||
- tools/graph_store.py (knowledge graph persistence)
|
||||
- tools/temporal_kg_tool.py (temporal knowledge graph)
|
||||
- agent/temporal_knowledge_graph.py (temporal triple store)
|
||||
- tools/skills_tool.py (skill listing/viewing)
|
||||
- tools/skills_sync.py (bundled skill syncing)
|
||||
|
||||
Exit codes:
|
||||
0 = sovereign (no violations)
|
||||
1 = violations found
|
||||
"""
|
||||
|
||||
import ast
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Files in the memory path to scan (relative to repo root).  Paths are
# resolved against the repo root by verify_sovereignty().
MEMORY_FILES = [
    "tools/memory_tool.py",
    "hermes_state.py",
    "tools/session_search_tool.py",
    "tools/graph_store.py",
    "tools/temporal_kg_tool.py",
    "agent/temporal_knowledge_graph.py",
    "tools/skills_tool.py",
    "tools/skills_sync.py",
]

# Patterns that indicate network/external API usage.
# Each entry is (regex, human-readable description), matched per line,
# case-insensitively, against non-comment source lines in scan_file().
NETWORK_PATTERNS = [
    # HTTP libraries
    (r'\brequests\.(get|post|put|delete|patch|head|session)', "requests HTTP call"),
    (r'\burllib\.request\.(urlopen|Request)', "urllib HTTP call"),
    (r'\bhttpx\.(get|post|put|delete|Client|AsyncClient)', "httpx HTTP call"),
    (r'\bhttp\.client\.(HTTPConnection|HTTPSConnection)', "http.client connection"),
    (r'\baiohttp\.(ClientSession|get|post)', "aiohttp HTTP call"),
    (r'\bwebsockets\.\w+', "websocket connection"),

    # API client patterns
    (r'\bopenai\b.*\b(api_key|chat|completions|Client)\b', "OpenAI API usage"),
    (r'\banthropic\b.*\b(api_key|messages|Client)\b', "Anthropic API usage"),
    (r'\bAsyncOpenAI\b', "AsyncOpenAI client"),
    (r'\bAsyncAnthropic\b', "AsyncAnthropic client"),

    # Generic network indicators
    (r'\bsocket\.(socket|connect|create_connection)', "raw socket connection"),
    (r'\bftplib\b', "FTP connection"),
    (r'\bsmtplib\b', "SMTP connection"),
    (r'\bparamiko\b', "SSH connection via paramiko"),

    # URL patterns (hardcoded endpoints).  example.com is exempted as a
    # documentation placeholder domain.
    (r'https?://(?!example\.com)[a-zA-Z0-9._-]+\.(com|org|net|io|dev|ai)', "hardcoded URL"),
]

# Import aliases that indicate network-capable modules.  scan_file() also
# flags submodules of these (e.g. "requests.adapters").
NETWORK_IMPORTS = {
    "requests",
    "httpx",
    "aiohttp",
    "urllib.request",
    "http.client",
    "websockets",
    "openai",
    "anthropic",
    "openrouter_client",
}

# Functions whose names suggest network I/O (project-local LLM helpers).
NETWORK_FUNC_NAMES = {
    "async_call_llm",
    "extract_content_or_reasoning",
}

# Files that are ALLOWED to have network calls (known violations with justification).
# Each entry maps to a reason string.  These files are skipped by the gate
# and surfaced as WARN info lines rather than failures.
KNOWN_VIOLATIONS = {
    "tools/graph_store.py": (
        "GraphStore persists to Gitea via API — not part of core memory path "
        "(MEMORY.md/USER.md/SQLite). Excluded from sovereignty gate."
    ),
    "tools/session_search_tool.py": (
        "Session search uses LLM summarization (auxiliary client) to generate "
        "summaries. The FTS5 search itself is local; the LLM call is for "
        "presentation, not storage. Known architectural trade-off."
    ),
}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Scanner
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class Violation:
    """One sovereignty finding: where it was seen and what matched.

    Holds the file (repo-relative), 1-based line number, a short
    description of the suspicious pattern, and the offending source
    line (whitespace-stripped).
    """

    def __init__(self, file: str, line: int, description: str, code: str):
        self.file = file
        self.line = line
        self.description = description
        # Store the source line without surrounding whitespace so the
        # report indentation is uniform.
        self.code = code.strip()

    def __str__(self):
        location = f"{self.file}:{self.line}"
        return f"{location}: {self.description}\n  {self.code}"
|
||||
|
||||
|
||||
def scan_file(filepath: Path, repo_root: Path) -> list[Violation]:
    """Scan a single file for network dependency patterns.

    Runs three checks, in order:
      1. AST import scan — flags imports of NETWORK_IMPORTS modules
         (or their submodules); AST-based so strings/comments can't
         false-positive.
      2. Line scan for calls to NETWORK_FUNC_NAMES (project LLM helpers).
      3. Line scan for NETWORK_PATTERNS regexes (HTTP libs, API clients,
         hardcoded URLs), matched case-insensitively.

    Files listed in KNOWN_VIOLATIONS are skipped entirely.

    Args:
        filepath: Absolute path of the file to scan.
        repo_root: Repository root; used to derive the repo-relative path
            reported in violations.

    Returns:
        A (possibly empty) list of Violation records.  Unreadable or
        unparseable files produce a stderr WARNING and an empty list.
    """
    violations: list[Violation] = []
    rel_path = str(filepath.relative_to(repo_root))

    # Skip known violations — they're reported as WARN by the caller.
    if rel_path in KNOWN_VIOLATIONS:
        return violations

    try:
        content = filepath.read_text(encoding="utf-8")
    except OSError as e:  # IOError has been an alias of OSError since 3.3
        print(f"WARNING: Cannot read {rel_path}: {e}", file=sys.stderr)
        return violations

    lines = content.splitlines()

    # --- Check imports ---
    try:
        tree = ast.parse(content, filename=str(filepath))
    except SyntaxError as e:
        print(f"WARNING: Cannot parse {rel_path}: {e}", file=sys.stderr)
        return violations

    def _src_line(lineno: int) -> str:
        # Defensive: AST linenos should always be in range, but guard anyway.
        return lines[lineno - 1] if lineno <= len(lines) else ""

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                if _is_network_module(alias.name):
                    violations.append(Violation(
                        rel_path, node.lineno,
                        f"Network-capable import: {alias.name}",
                        _src_line(node.lineno),
                    ))
        elif isinstance(node, ast.ImportFrom):
            if node.module and _is_network_module(node.module):
                violations.append(Violation(
                    rel_path, node.lineno,
                    f"Network-capable import from: {node.module}",
                    _src_line(node.lineno),
                ))

    # --- Check for LLM call function usage ---
    for i, line in enumerate(lines, 1):
        stripped = line.strip()
        if stripped.startswith("#"):
            continue
        # A `def`/`async def`/`class` line that merely names one of these
        # functions is a definition, not a call site — skip it.
        # (The original check missed `async def`, falsely flagging async
        # definitions of network helpers.)
        if stripped.startswith(("def ", "async def ", "class ")):
            continue
        for func_name in NETWORK_FUNC_NAMES:
            # re.escape keeps this safe if a future entry ever contains
            # regex metacharacters.
            if re.search(r'\b' + re.escape(func_name) + r'\s*\(', line):
                violations.append(Violation(
                    rel_path, i,
                    f"External LLM call function: {func_name}()",
                    line,
                ))

    # --- Regex-based pattern matching ---
    for i, line in enumerate(lines, 1):
        stripped = line.strip()
        if stripped.startswith("#"):
            continue
        for pattern, description in NETWORK_PATTERNS:
            if re.search(pattern, line, re.IGNORECASE):
                violations.append(Violation(
                    rel_path, i,
                    f"Suspicious pattern ({description})",
                    line,
                ))

    return violations


def _is_network_module(module: str) -> bool:
    """Return True if *module* is in NETWORK_IMPORTS or is a submodule of one."""
    return module in NETWORK_IMPORTS or any(
        module.startswith(prefix + ".") for prefix in NETWORK_IMPORTS
    )
|
||||
|
||||
|
||||
def verify_sovereignty(repo_root: Path) -> tuple[list[Violation], list[str]]:
    """Run sovereignty verification across all memory files.

    Missing files are reported as SKIP, files in KNOWN_VIOLATIONS as WARN
    (and excluded from the gate), clean files as PASS.

    Returns (violations, info_messages).
    """
    all_violations: list[Violation] = []
    info: list[str] = []

    for rel_path in MEMORY_FILES:
        filepath = repo_root / rel_path

        if not filepath.exists():
            info.append(f"SKIP: {rel_path} (file not found)")
            continue

        if rel_path in KNOWN_VIOLATIONS:
            reason = KNOWN_VIOLATIONS[rel_path]
            info.append(
                f"WARN: {rel_path} — known violation (excluded from gate): {reason}"
            )
            continue

        found = scan_file(filepath, repo_root)
        if found:
            all_violations.extend(found)
        else:
            info.append(f"PASS: {rel_path} — sovereign (local-only)")

    return all_violations, info
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Deep analysis helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def check_graph_store_network(repo_root: Path) -> str:
    """Analyze graph_store.py for its network dependencies.

    Returns an advisory note if the file references GiteaClient,
    otherwise an empty string (also when the file is absent).
    """
    target = repo_root / "tools" / "graph_store.py"
    if not target.exists():
        return ""
    if "GiteaClient" not in target.read_text(encoding="utf-8"):
        return ""
    return (
        "tools/graph_store.py uses GiteaClient for persistence — "
        "this is an external API call. However, graph_store is NOT part of "
        "the core memory path (MEMORY.md/USER.md/SQLite). It is a separate "
        "knowledge graph system."
    )
|
||||
|
||||
|
||||
def check_session_search_llm(repo_root: Path) -> str:
    """Analyze session_search_tool.py for LLM usage.

    Returns an advisory note listing which LLM markers appear in the file,
    or an empty string when none do (also when the file is absent).
    """
    target = repo_root / "tools" / "session_search_tool.py"
    if not target.exists():
        return ""
    text = target.read_text(encoding="utf-8")

    # (marker substring, human-readable finding) pairs, checked in order.
    checks = (
        ("async_call_llm", "uses async_call_llm for summarization"),
        ("auxiliary_client", "imports auxiliary_client (LLM calls)"),
    )
    findings = [note for marker, note in checks if marker in text]
    if not findings:
        return ""
    return (
        f"tools/session_search_tool.py: {'; '.join(findings)}. "
        f"The FTS5 search is local SQLite, but session summarization "
        f"involves LLM API calls."
    )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main() -> int:
    """Run the sovereignty scan and print a report.

    Returns:
        0 when the memory path is sovereign, 1 when violations were found
        (the process exit-code contract from the module docstring).
    """
    # scripts/ sits one level below the repo root.
    repo_root = Path(__file__).resolve().parent.parent
    # Note: plain strings here — the originals were f-strings with no
    # placeholders (ruff F541).
    print("Memory Sovereignty Verification")
    print(f"Repository: {repo_root}")
    print(f"Scanning {len(MEMORY_FILES)} memory-path files...")
    print()

    violations, info = verify_sovereignty(repo_root)

    # Per-file PASS/SKIP/WARN summary.
    for msg in info:
        print(f"  {msg}")

    # Advisory notes about files deliberately excluded from the gate.
    print()
    print("Deep analysis:")
    for checker in (check_graph_store_network, check_session_search_llm):
        note = checker(repo_root)
        if note:
            print(f"  NOTE: {note}")

    print()

    if violations:
        print(f"SOVEREIGNTY VIOLATIONS FOUND: {len(violations)}")
        print("=" * 60)
        for v in violations:
            print(v)
            print()
        print("=" * 60)
        print(
            f"FAIL: {len(violations)} potential network dependencies detected "
            f"in the memory path."
        )
        print("Memory must be local-only (filesystem + SQLite).")
        print()
        print("If a violation is intentional and documented, add it to")
        print("KNOWN_VIOLATIONS in this script with a justification.")
        return 1

    # No redundant `else` after return (guard-clause style).
    print("PASS: Memory path is sovereign — no network dependencies detected.")
    print("All memory operations use local filesystem and/or SQLite only.")
    return 0


if __name__ == "__main__":
    sys.exit(main())
|
||||
Reference in New Issue
Block a user