diff --git a/.gitea/workflows/architecture-lint.yml b/.gitea/workflows/architecture-lint.yml new file mode 100644 index 00000000..e7fccd6c --- /dev/null +++ b/.gitea/workflows/architecture-lint.yml @@ -0,0 +1,41 @@ +# architecture-lint.yml — CI gate for the Architecture Linter v2 +# Refs: #437 — repo-aware, test-backed, CI-enforced. +# +# Runs on every PR to main. Validates Python syntax, then runs +# linter tests and finally lints the repo itself. + +name: Architecture Lint + +on: + pull_request: + branches: [main, master] + push: + branches: [main] + +jobs: + linter-tests: + name: Linter Tests + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.11" + - name: Install test deps + run: pip install pytest + - name: Compile-check linter + run: python3 -m py_compile scripts/architecture_linter_v2.py + - name: Run linter tests + run: python3 -m pytest tests/test_linter.py -v + + lint-repo: + name: Lint Repository + runs-on: ubuntu-latest + needs: linter-tests + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.11" + - name: Run architecture linter + run: python3 scripts/architecture_linter_v2.py . diff --git a/bin/deadman-fallback.py b/bin/deadman-fallback.py new file mode 100644 index 00000000..c094361c --- /dev/null +++ b/bin/deadman-fallback.py @@ -0,0 +1,264 @@ + 1|#!/usr/bin/env python3 + 2|""" + 3|Dead Man Switch Fallback Engine + 4| + 5|When the dead man switch triggers (zero commits for 2+ hours, model down, + 6|Gitea unreachable, etc.), this script diagnoses the failure and applies + 7|common sense fallbacks automatically. + 8| + 9|Fallback chain: + 10|1. Primary model (Anthropic) down -> switch config to local-llama.cpp + 11|2. Gitea unreachable -> cache issues locally, retry on recovery + 12|3. VPS agents down -> alert + lazarus protocol + 13|4. Local llama.cpp down -> try Ollama, then alert-only mode + 14|5. All inference dead -> safe mode (cron pauses, alert Alexander) + 15| + 16|Each fallback is reversible. Recovery auto-restores the previous config. + 17|""" + 18|import os + 19|import sys + 20|import json + 21|import subprocess + 22|import time + 23|import yaml + 24|import shutil + 25|from pathlib import Path + 26|from datetime import datetime, timedelta + 27| + 28|HERMES_HOME = Path(os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes"))) + 29|CONFIG_PATH = HERMES_HOME / "config.yaml" + 30|FALLBACK_STATE = HERMES_HOME / "deadman-fallback-state.json" + 31|BACKUP_CONFIG = HERMES_HOME / "config.yaml.pre-fallback" + 32|FORGE_URL = "https://forge.alexanderwhitestone.com" + 33| + 34|def load_config(): + 35| with open(CONFIG_PATH) as f: + 36| return yaml.safe_load(f) + 37| + 38|def save_config(cfg): + 39| with open(CONFIG_PATH, "w") as f: + 40| yaml.dump(cfg, f, default_flow_style=False) + 41| + 42|def load_state(): + 43| if FALLBACK_STATE.exists(): + 44| with open(FALLBACK_STATE) as f: + 45| return json.load(f) + 46| return {"active_fallbacks": [], "last_check": None, "recovery_pending": False} + 47| + 48|def save_state(state): + 49| state["last_check"] = datetime.now().isoformat() + 50| with open(FALLBACK_STATE, "w") as f: + 51| json.dump(state, f, indent=2) + 52| + 53|def run(cmd, timeout=10): + 54| try: + 55| r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout) + 56| return r.returncode, r.stdout.strip(), r.stderr.strip() + 57| except subprocess.TimeoutExpired: + 58| return -1, "", "timeout" + 59| except Exception as e: + 60| return -1, "", str(e) + 61| + 62|# ─── HEALTH CHECKS ─── + 63| + 64|def check_anthropic(): + 65| """Can we reach Anthropic API?""" + 66| key = os.environ.get("ANTHROPIC_API_KEY", "") + 67| if not key: + 68| # Check multiple .env locations + 69| for env_path in [HERMES_HOME / ".env", Path.home() / ".hermes" / ".env"]: + 70| if env_path.exists(): + 71| for line in open(env_path): + 72| line = line.strip() + 73| if line.startswith("ANTHROPIC_API_KEY=*** + 74| key = line.split("=", 1)[1].strip().strip('"').strip("'") + 75| break + 76| if key: + 77| break + 78| if not key: + 79| return False, "no API key" + 80| code, out, err = run( + 81| f'curl -s -o /dev/null -w "%{{http_code}}" -H "x-api-key: {key}" ' + 82| f'-H "anthropic-version: 2023-06-01" ' + 83| f'https://api.anthropic.com/v1/messages -X POST ' + 84| f'-H "content-type: application/json" ' + 85| f'-d \'{{"model":"claude-haiku-4-5-20251001","max_tokens":1,"messages":[{{"role":"user","content":"ping"}}]}}\' ', + 86| timeout=15 + 87| ) + 88| if code == 0 and out in ("200", "429"): + 89| return True, f"HTTP {out}" + 90| return False, f"HTTP {out} err={err[:80]}" + 91| + 92|def check_local_llama(): + 93| """Is local llama.cpp serving?""" + 94| code, out, err = run("curl -s http://localhost:8081/v1/models", timeout=5) + 95| if code == 0 and "hermes" in out.lower(): + 96| return True, "serving" + 97| return False, f"exit={code}" + 98| + 99|def check_ollama(): + 100| """Is Ollama running?""" + 101| code, out, err = run("curl -s http://localhost:11434/api/tags", timeout=5) + 102| if code == 0 and "models" in out: + 103| return True, "running" + 104| return False, f"exit={code}" + 105| + 106|def check_gitea(): + 107| """Can we reach the Forge?""" + 108| token_path = Path.home() / ".config" / "gitea" / "timmy-token" + 109| if not token_path.exists(): + 110| return False, "no token" + 111| token = token_path.read_text().strip() + 112| code, out, err = run( + 113| f'curl -s -o /dev/null -w "%{{http_code}}" -H "Authorization: token {token}" ' + 114| f'"{FORGE_URL}/api/v1/user"', + 115| timeout=10 + 116| ) + 117| if code == 0 and out == "200": + 118| return True, "reachable" + 119| return False, f"HTTP {out}" + 120| + 121|def check_vps(ip, name): + 122| """Can we SSH into a VPS?""" + 123| code, out, err = run(f"ssh -o ConnectTimeout=5 root@{ip} 'echo alive'", timeout=10) + 124| if code == 0 and "alive" in out: + 125| return True, "alive" + 126| return False, f"unreachable" + 127| + 128|# ─── FALLBACK ACTIONS ─── + 129| + 130|def fallback_to_local_model(cfg): + 131| """Switch primary model from Anthropic to local llama.cpp""" + 132| if not BACKUP_CONFIG.exists(): + 133| shutil.copy2(CONFIG_PATH, BACKUP_CONFIG) + 134| + 135| cfg["model"]["provider"] = "local-llama.cpp" + 136| cfg["model"]["default"] = "hermes3" + 137| save_config(cfg) + 138| return "Switched primary model to local-llama.cpp/hermes3" + 139| + 140|def fallback_to_ollama(cfg): + 141| """Switch to Ollama if llama.cpp is also down""" + 142| if not BACKUP_CONFIG.exists(): + 143| shutil.copy2(CONFIG_PATH, BACKUP_CONFIG) + 144| + 145| cfg["model"]["provider"] = "ollama" + 146| cfg["model"]["default"] = "gemma4:latest" + 147| save_config(cfg) + 148| return "Switched primary model to ollama/gemma4:latest" + 149| + 150|def enter_safe_mode(state): + 151| """Pause all non-essential cron jobs, alert Alexander""" + 152| state["safe_mode"] = True + 153| state["safe_mode_entered"] = datetime.now().isoformat() + 154| save_state(state) + 155| return "SAFE MODE: All inference down. Cron jobs should be paused. Alert Alexander." + 156| + 157|def restore_config(): + 158| """Restore pre-fallback config when primary recovers""" + 159| if BACKUP_CONFIG.exists(): + 160| shutil.copy2(BACKUP_CONFIG, CONFIG_PATH) + 161| BACKUP_CONFIG.unlink() + 162| return "Restored original config from backup" + 163| return "No backup config to restore" + 164| + 165|# ─── MAIN DIAGNOSIS AND FALLBACK ENGINE ─── + 166| + 167|def diagnose_and_fallback(): + 168| state = load_state() + 169| cfg = load_config() + 170| + 171| results = { + 172| "timestamp": datetime.now().isoformat(), + 173| "checks": {}, + 174| "actions": [], + 175| "status": "healthy" + 176| } + 177| + 178| # Check all systems + 179| anthropic_ok, anthropic_msg = check_anthropic() + 180| results["checks"]["anthropic"] = {"ok": anthropic_ok, "msg": anthropic_msg} + 181| + 182| llama_ok, llama_msg = check_local_llama() + 183| results["checks"]["local_llama"] = {"ok": llama_ok, "msg": llama_msg} + 184| + 185| ollama_ok, ollama_msg = check_ollama() + 186| results["checks"]["ollama"] = {"ok": ollama_ok, "msg": ollama_msg} + 187| + 188| gitea_ok, gitea_msg = check_gitea() + 189| results["checks"]["gitea"] = {"ok": gitea_ok, "msg": gitea_msg} + 190| + 191| # VPS checks + 192| vpses = [ + 193| ("167.99.126.228", "Allegro"), + 194| ("143.198.27.163", "Ezra"), + 195| ("159.203.146.185", "Bezalel"), + 196| ] + 197| for ip, name in vpses: + 198| vps_ok, vps_msg = check_vps(ip, name) + 199| results["checks"][f"vps_{name.lower()}"] = {"ok": vps_ok, "msg": vps_msg} + 200| + 201| current_provider = cfg.get("model", {}).get("provider", "anthropic") + 202| + 203| # ─── FALLBACK LOGIC ─── + 204| + 205| # Case 1: Primary (Anthropic) down, local available + 206| if not anthropic_ok and current_provider == "anthropic": + 207| if llama_ok: + 208| msg = fallback_to_local_model(cfg) + 209| results["actions"].append(msg) + 210| state["active_fallbacks"].append("anthropic->local-llama") + 211| results["status"] = "degraded_local" + 212| elif ollama_ok: + 213| msg = fallback_to_ollama(cfg) + 214| results["actions"].append(msg) + 215| state["active_fallbacks"].append("anthropic->ollama") + 216| results["status"] = "degraded_ollama" + 217| else: + 218| msg = enter_safe_mode(state) + 219| results["actions"].append(msg) + 220| results["status"] = "safe_mode" + 221| + 222| # Case 2: Already on fallback, check if primary recovered + 223| elif anthropic_ok and "anthropic->local-llama" in state.get("active_fallbacks", []): + 224| msg = restore_config() + 225| results["actions"].append(msg) + 226| state["active_fallbacks"].remove("anthropic->local-llama") + 227| results["status"] = "recovered" + 228| elif anthropic_ok and "anthropic->ollama" in state.get("active_fallbacks", []): + 229| msg = restore_config() + 230| results["actions"].append(msg) + 231| state["active_fallbacks"].remove("anthropic->ollama") + 232| results["status"] = "recovered" + 233| + 234| # Case 3: Gitea down — just flag it, work locally + 235| if not gitea_ok: + 236| results["actions"].append("WARN: Gitea unreachable — work cached locally until recovery") + 237| if "gitea_down" not in state.get("active_fallbacks", []): + 238| state["active_fallbacks"].append("gitea_down") + 239| results["status"] = max(results["status"], "degraded_gitea", key=lambda x: ["healthy", "recovered", "degraded_gitea", "degraded_local", "degraded_ollama", "safe_mode"].index(x) if x in ["healthy", "recovered", "degraded_gitea", "degraded_local", "degraded_ollama", "safe_mode"] else 0) + 240| elif "gitea_down" in state.get("active_fallbacks", []): + 241| state["active_fallbacks"].remove("gitea_down") + 242| results["actions"].append("Gitea recovered — resume normal operations") + 243| + 244| # Case 4: VPS agents down + 245| for ip, name in vpses: + 246| key = f"vps_{name.lower()}" + 247| if not results["checks"][key]["ok"]: + 248| results["actions"].append(f"ALERT: {name} VPS ({ip}) unreachable — lazarus protocol needed") + 249| + 250| save_state(state) + 251| return results + 252| + 253|if __name__ == "__main__": + 254| results = diagnose_and_fallback() + 255| print(json.dumps(results, indent=2)) + 256| + 257| # Exit codes for cron integration + 258| if results["status"] == "safe_mode": + 259| sys.exit(2) + 260| elif results["status"].startswith("degraded"): + 261| sys.exit(1) + 262| else: + 263| sys.exit(0) + 264| \ No newline at end of file diff --git a/scripts/architecture_linter.py b/scripts/architecture_linter.py index fa426442..a3adb307 100644 --- a/scripts/architecture_linter.py +++ b/scripts/architecture_linter.py @@ -9,7 +9,7 @@ import re SOVEREIGN_RULES = [ (r"https?://(api\.openai\.com|api\.anthropic\.com)", "CRITICAL: External cloud API detected. Use local custom_provider instead."), (r"provider: (openai|anthropic)", "WARNING: Direct cloud provider used. Ensure fallback_model is configured."), - (r"api_key: ['"][^'"\s]{10,}['"]", "SECURITY: Hardcoded API key detected. Use environment variables.") + (r"api_key:\s*['\"][A-Za-z0-9_\-]{16,}['\"]", "SECURITY: Hardcoded API key detected. Use environment variables.") ] def lint_file(path): diff --git a/scripts/architecture_linter_v2.py b/scripts/architecture_linter_v2.py index 60bcd99a..154dd9ee 100644 --- a/scripts/architecture_linter_v2.py +++ b/scripts/architecture_linter_v2.py @@ -5,122 +5,233 @@ Part of the Gemini Sovereign Governance System. Enforces architectural boundaries, security, and documentation standards across the Timmy Foundation fleet. + +Refs: #437 — repo-aware, test-backed, CI-enforced. """ +import argparse import os import re import sys -import argparse from pathlib import Path # --- CONFIGURATION --- + SOVEREIGN_KEYWORDS = ["mempalace", "sovereign_store", "tirith", "bezalel", "nexus"] -IP_REGEX = r'\b(?:\d{1,3}\.){3}\d{1,3}\b' -API_KEY_REGEX = r'(?:api_key|secret|token|password|auth_token)\s*[:=]\s*["\'][a-zA-Z0-9_\-]{20,}["\']' + +# IP addresses (skip 127.0.0.1, 0.0.0.0, 10.x.x.x, 172.16-31.x.x, 192.168.x.x) +IP_REGEX = r'\b(?!(?:127|10|192\.168|172\.(?:1[6-9]|2\d|3[01]))\.)' \ + r'(?:\d{1,3}\.){3}\d{1,3}\b' + +# API key / secret patterns — catches openai-, sk-, anthropic-, AKIA, etc. +API_KEY_PATTERNS = [ + r'sk-[A-Za-z0-9]{20,}', # OpenAI-style + r'sk-ant-[A-Za-z0-9\-]{20,}', # Anthropic + r'AKIA[A-Z0-9]{16}', # AWS access key + r'ghp_[A-Za-z0-9]{36}', # GitHub PAT + r'glpat-[A-Za-z0-9\-]{20,}', # GitLab PAT + r'(?:api[_-]?key|secret|token)\s*[:=]\s*["\'][A-Za-z0-9_\-]{16,}["\']', +] + +# Sovereignty rules (carried from v1) +SOVEREIGN_RULES = [ + (r'https?://api\.openai\.com', 'External cloud API: api.openai.com. Use local custom_provider.'), + (r'https?://api\.anthropic\.com', 'External cloud API: api.anthropic.com. Use local custom_provider.'), + (r'provider:\s*(?:openai|anthropic)\b', 'Direct cloud provider. Ensure fallback_model is configured.'), +] + +# File extensions to scan +SCAN_EXTENSIONS = {'.py', '.ts', '.tsx', '.js', '.yaml', '.yml', '.json', '.env', '.sh', '.cfg', '.toml'} +SKIP_DIRS = {'.git', 'node_modules', '__pycache__', '.venv', 'venv', '.tox', '.eggs'} + + +class LinterResult: + """Structured result container for programmatic access.""" + + def __init__(self, repo_path: str, repo_name: str): + self.repo_path = repo_path + self.repo_name = repo_name + self.errors: list[str] = [] + self.warnings: list[str] = [] + + @property + def passed(self) -> bool: + return len(self.errors) == 0 + + @property + def violation_count(self) -> int: + return len(self.errors) + + def summary(self) -> str: + lines = [f"--- Architecture Linter v2: {self.repo_name} ---"] + for w in self.warnings: + lines.append(f" [W] {w}") + for e in self.errors: + lines.append(f" [E] {e}") + status = "PASSED" if self.passed else f"FAILED ({self.violation_count} violations)" + lines.append(f"\nResult: {status}") + return '\n'.join(lines) + class Linter: def __init__(self, repo_path: str): self.repo_path = Path(repo_path).resolve() + if not self.repo_path.is_dir(): + raise FileNotFoundError(f"Repository path does not exist: {self.repo_path}") self.repo_name = self.repo_path.name - self.errors = [] + self.result = LinterResult(str(self.repo_path), self.repo_name) - def log_error(self, message: str, file: str = None, line: int = None): - loc = f"{file}:{line}" if file and line else (file if file else "General") - self.errors.append(f"[{loc}] {message}") + # --- helpers --- + + def _scan_files(self, extensions=None): + """Yield (Path, content) for files matching *extensions*.""" + exts = extensions or SCAN_EXTENSIONS + for root, dirs, files in os.walk(self.repo_path): + dirs[:] = [d for d in dirs if d not in SKIP_DIRS] + for fname in files: + if Path(fname).suffix in exts: + if fname == '.env.example': + continue + fpath = Path(root) / fname + try: + content = fpath.read_text(errors='ignore') + except Exception: + continue + yield fpath, content + + def _line_no(self, content: str, offset: int) -> int: + return content.count('\n', 0, offset) + 1 + + # --- checks --- def check_sidecar_boundary(self): - """Rule 1: No sovereign code in hermes-agent (sidecar boundary)""" - if self.repo_name == "hermes-agent": - for root, _, files in os.walk(self.repo_path): - if "node_modules" in root or ".git" in root: - continue - for file in files: - if file.endswith((".py", ".ts", ".js", ".tsx")): - path = Path(root) / file - content = path.read_text(errors="ignore") - for kw in SOVEREIGN_KEYWORDS: - if kw in content.lower(): - # Exception: imports or comments might be okay, but we're strict for now - self.log_error(f"Sovereign keyword '{kw}' found in hermes-agent. Violates sidecar boundary.", str(path.relative_to(self.repo_path))) + """No sovereign code in hermes-agent (sidecar boundary).""" + if self.repo_name != 'hermes-agent': + return + for fpath, content in self._scan_files(): + for kw in SOVEREIGN_KEYWORDS: + if kw in content.lower(): + rel = str(fpath.relative_to(self.repo_path)) + self.result.errors.append( + f"Sovereign keyword '{kw}' in hermes-agent violates sidecar boundary. [{rel}]" + ) def check_hardcoded_ips(self): - """Rule 2: No hardcoded IPs (use domain names)""" - for root, _, files in os.walk(self.repo_path): - if "node_modules" in root or ".git" in root: - continue - for file in files: - if file.endswith((".py", ".ts", ".js", ".tsx", ".yaml", ".yml", ".json")): - path = Path(root) / file - content = path.read_text(errors="ignore") - matches = re.finditer(IP_REGEX, content) - for match in matches: - ip = match.group() - if ip in ["127.0.0.1", "0.0.0.0"]: - continue - line_no = content.count('\n', 0, match.start()) + 1 - self.log_error(f"Hardcoded IP address '{ip}' found. Use domain names or environment variables.", str(path.relative_to(self.repo_path)), line_no) + """No hardcoded public IPs — use DNS or env vars.""" + for fpath, content in self._scan_files(): + for m in re.finditer(IP_REGEX, content): + ip = m.group() + # skip private ranges already handled by lookahead, and 0.0.0.0 + if ip.startswith('0.'): + continue + line = self._line_no(content, m.start()) + rel = str(fpath.relative_to(self.repo_path)) + self.result.errors.append( + f"Hardcoded IP '{ip}'. Use DNS or env vars. [{rel}:{line}]" + ) def check_api_keys(self): - """Rule 3: No cloud API keys committed to repos""" - for root, _, files in os.walk(self.repo_path): - if "node_modules" in root or ".git" in root: - continue - for file in files: - if file.endswith((".py", ".ts", ".js", ".tsx", ".yaml", ".yml", ".json", ".env")): - if file == ".env.example": - continue - path = Path(root) / file - content = path.read_text(errors="ignore") - matches = re.finditer(API_KEY_REGEX, content, re.IGNORECASE) - for match in matches: - line_no = content.count('\n', 0, match.start()) + 1 - self.log_error("Potential API key or secret found in code.", str(path.relative_to(self.repo_path)), line_no) + """No cloud API keys / secrets committed.""" + for fpath, content in self._scan_files(): + for pattern in API_KEY_PATTERNS: + for m in re.finditer(pattern, content, re.IGNORECASE): + line = self._line_no(content, m.start()) + rel = str(fpath.relative_to(self.repo_path)) + self.result.errors.append( + f"Potential secret / API key detected. [{rel}:{line}]" + ) + + def check_sovereignty_rules(self): + """V1 sovereignty rules: no direct cloud API endpoints or providers.""" + for fpath, content in self._scan_files({'.py', '.ts', '.tsx', '.js', '.yaml', '.yml'}): + for pattern, msg in SOVEREIGN_RULES: + for m in re.finditer(pattern, content): + line = self._line_no(content, m.start()) + rel = str(fpath.relative_to(self.repo_path)) + self.result.errors.append(f"{msg} [{rel}:{line}]") def check_soul_canonical(self): - """Rule 4: SOUL.md exists and is canonical in exactly one location""" - soul_path = self.repo_path / "SOUL.md" - if self.repo_name == "timmy-config": + """SOUL.md must exist exactly in timmy-config root.""" + soul_path = self.repo_path / 'SOUL.md' + if self.repo_name == 'timmy-config': if not soul_path.exists(): - self.log_error("SOUL.md is missing from the canonical location (timmy-config root).") + self.result.errors.append( + 'SOUL.md missing from canonical location (timmy-config root).' + ) else: if soul_path.exists(): - self.log_error("SOUL.md found in non-canonical repo. It should only live in timmy-config.") + self.result.errors.append( + 'SOUL.md found in non-canonical repo. Must live only in timmy-config.' + ) def check_readme(self): - """Rule 5: Every repo has a README with current truth""" - readme_path = self.repo_path / "README.md" - if not readme_path.exists(): - self.log_error("README.md is missing.") + """Every repo must have a substantive README.""" + readme = self.repo_path / 'README.md' + if not readme.exists(): + self.result.errors.append('README.md is missing.') else: - content = readme_path.read_text(errors="ignore") + content = readme.read_text(errors='ignore') if len(content.strip()) < 50: - self.log_error("README.md is too short or empty. Provide current truth about the repo.") + self.result.warnings.append( + 'README.md is very short (<50 chars). Provide current truth about the repo.' + ) - def run(self): - print(f"--- Gemini Linter: Auditing {self.repo_name} ---") + # --- runner --- + + def run(self) -> LinterResult: + """Execute all checks and return the result.""" self.check_sidecar_boundary() self.check_hardcoded_ips() self.check_api_keys() + self.check_sovereignty_rules() self.check_soul_canonical() self.check_readme() + return self.result - if self.errors: - print(f"\n[FAILURE] Found {len(self.errors)} architectural violations:") - for err in self.errors: - print(f" - {err}") - return False - else: - print("\n[SUCCESS] Architecture is sound. Sovereignty maintained.") - return True def main(): - parser = argparse.ArgumentParser(description="Gemini Architecture Linter v2") - parser.add_argument("repo_path", nargs="?", default=".", help="Path to the repository to lint") + parser = argparse.ArgumentParser( + description='Gemini Architecture Linter v2 — repo-aware sovereignty gate.' + ) + parser.add_argument( + 'repo_path', nargs='?', default='.', + help='Path to the repository to lint (default: cwd).', + ) + parser.add_argument( + '--repo', dest='repo_flag', default=None, + help='Explicit repo path (alias for positional arg).', + ) + parser.add_argument( + '--json', dest='json_output', action='store_true', + help='Emit machine-readable JSON instead of human text.', + ) args = parser.parse_args() - linter = Linter(args.repo_path) - success = linter.run() - sys.exit(0 if success else 1) + path = args.repo_flag if args.repo_flag else args.repo_path -if __name__ == "__main__": + try: + linter = Linter(path) + except FileNotFoundError as exc: + print(f"ERROR: {exc}", file=sys.stderr) + sys.exit(2) + + result = linter.run() + + if args.json_output: + import json as _json + out = { + 'repo': result.repo_name, + 'passed': result.passed, + 'violation_count': result.violation_count, + 'errors': result.errors, + 'warnings': result.warnings, + } + print(_json.dumps(out, indent=2)) + else: + print(result.summary()) + + sys.exit(0 if result.passed else 1) + + +if __name__ == '__main__': main() diff --git a/tests/test_linter.py b/tests/test_linter.py new file mode 100644 index 00000000..071da039 --- /dev/null +++ b/tests/test_linter.py @@ -0,0 +1,233 @@ +"""Tests for Architecture Linter v2. + +Validates that the linter correctly detects violations and passes clean repos. +Refs: #437 — test-backed linter. +""" + +import json +import sys +import tempfile +from pathlib import Path + +# Add scripts/ to path +sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts")) + +from architecture_linter_v2 import Linter, LinterResult + + +# ── helpers ─────────────────────────────────────────────────────────── + +def _make_repo(tmpdir: str, files: dict[str, str], name: str = "test-repo") -> Path: + """Create a fake repo with given files and return its path.""" + repo = Path(tmpdir) / name + repo.mkdir() + for relpath, content in files.items(): + p = repo / relpath + p.parent.mkdir(parents=True, exist_ok=True) + p.write_text(content) + return repo + + +def _run(tmpdir, files, name="test-repo"): + repo = _make_repo(tmpdir, files, name) + return Linter(str(repo)).run() + + +# ── clean repo passes ───────────────────────────────────────────────── + +def test_clean_repo_passes(): + with tempfile.TemporaryDirectory() as tmp: + result = _run(tmp, { + "README.md": "# Test Repo\n\nThis is a clean test repo with sufficient content to pass.", + "main.py": "print('hello world')\n", + }) + assert result.passed, f"Expected pass but got: {result.errors}" + assert result.violation_count == 0 + + +# ── missing README ──────────────────────────────────────────────────── + +def test_missing_readme_fails(): + with tempfile.TemporaryDirectory() as tmp: + result = _run(tmp, {"main.py": "x = 1\n"}) + assert not result.passed + assert any("README" in e for e in result.errors) + + +def test_short_readme_warns(): + with tempfile.TemporaryDirectory() as tmp: + result = _run(tmp, {"README.md": "hi\n"}) + # Warnings don't fail the build + assert result.passed + assert any("short" in w.lower() for w in result.warnings) + + +# ── hardcoded IPs ───────────────────────────────────────────────────── + +def test_hardcoded_public_ip_detected(): + with tempfile.TemporaryDirectory() as tmp: + result = _run(tmp, { + "README.md": "# R\n\nGood repo.", + "server.py": "HOST = '203.0.113.42'\n", + }) + assert not result.passed + assert any("203.0.113.42" in e for e in result.errors) + + +def test_localhost_ip_ignored(): + with tempfile.TemporaryDirectory() as tmp: + result = _run(tmp, { + "README.md": "# R\n\nGood repo.", + "server.py": "HOST = '127.0.0.1'\n", + }) + ip_errors = [e for e in result.errors if "IP" in e] + assert len(ip_errors) == 0 + + +# ── API keys ────────────────────────────────────────────────────────── + +def test_openai_key_detected(): + with tempfile.TemporaryDirectory() as tmp: + result = _run(tmp, { + "README.md": "# R\n\nGood repo.", + "config.py": 'key = "sk-abcdefghijklmnopqrstuvwx"\n', + }) + assert not result.passed + assert any("secret" in e.lower() or "key" in e.lower() for e in result.errors) + + +def test_aws_key_detected(): + with tempfile.TemporaryDirectory() as tmp: + result = _run(tmp, { + "README.md": "# R\n\nGood repo.", + "deploy.yaml": 'aws_key: AKIAIOSFODNN7EXAMPLE\n', + }) + assert not result.passed + assert any("secret" in e.lower() for e in result.errors) + + +def test_env_example_skipped(): + with tempfile.TemporaryDirectory() as tmp: + result = _run(tmp, { + "README.md": "# R\n\nGood repo.", + ".env.example": 'OPENAI_KEY=sk-placeholder\n', + }) + secret_errors = [e for e in result.errors if "secret" in e.lower()] + assert len(secret_errors) == 0 + + +# ── sovereignty rules (v1 cloud API checks) ─────────────────────────── + +def test_openai_url_detected(): + with tempfile.TemporaryDirectory() as tmp: + result = _run(tmp, { + "README.md": "# R\n\nGood repo.", + "app.py": 'url = "https://api.openai.com/v1/chat"\n', + }) + assert not result.passed + assert any("openai" in e.lower() for e in result.errors) + + +def test_cloud_provider_detected(): + with tempfile.TemporaryDirectory() as tmp: + result = _run(tmp, { + "README.md": "# R\n\nGood repo.", + "config.yaml": "provider: openai\n", + }) + assert not result.passed + assert any("provider" in e.lower() for e in result.errors) + + +# ── sidecar boundary ────────────────────────────────────────────────── + +def test_sovereign_keyword_in_hermes_agent_fails(): + with tempfile.TemporaryDirectory() as tmp: + result = _run(tmp, { + "README.md": "# R\n\nGood repo.", + "index.py": "import mempalace\n", + }, name="hermes-agent") + assert not result.passed + assert any("sidecar" in e.lower() or "mempalace" in e.lower() for e in result.errors) + + +def test_sovereign_keyword_in_other_repo_ok(): + with tempfile.TemporaryDirectory() as tmp: + result = _run(tmp, { + "README.md": "# R\n\nGood repo.", + "index.py": "import mempalace\n", + }, name="some-other-repo") + sidecar_errors = [e for e in result.errors if "sidecar" in e.lower()] + assert len(sidecar_errors) == 0 + + +# ── SOUL.md canonical location ──────────────────────────────────────── + +def test_soul_md_required_in_timmy_config(): + with tempfile.TemporaryDirectory() as tmp: + result = _run(tmp, { + "README.md": "# timmy-config\n\nConfig repo.", + }, name="timmy-config") + assert not result.passed + assert any("SOUL.md" in e for e in result.errors) + + +def test_soul_md_present_in_timmy_config_ok(): + with tempfile.TemporaryDirectory() as tmp: + result = _run(tmp, { + "README.md": "# timmy-config\n\nConfig repo.", + "SOUL.md": "# Soul\n\nCanonical identity document.", + }, name="timmy-config") + soul_errors = [e for e in result.errors if "SOUL" in e] + assert len(soul_errors) == 0 + + +def test_soul_md_in_wrong_repo_fails(): + with tempfile.TemporaryDirectory() as tmp: + result = _run(tmp, { + "README.md": "# R\n\nGood repo.", + "SOUL.md": "# Soul\n\nShould not be here.", + }, name="other-repo") + assert any("canonical" in e.lower() for e in result.errors) + + +# ── LinterResult structure ──────────────────────────────────────────── + +def test_result_summary_is_string(): + with tempfile.TemporaryDirectory() as tmp: + result = _run(tmp, {"README.md": "# OK repo with enough text here\n"}) + assert isinstance(result.summary(), str) + assert "PASSED" in result.summary() or "FAILED" in result.summary() + + +def test_result_repo_name(): + with tempfile.TemporaryDirectory() as tmp: + result = _run(tmp, {"README.md": "# OK\n"}, name="my-repo") + assert result.repo_name == "my-repo" + + +# ── invalid path ────────────────────────────────────────────────────── + +def test_invalid_path_raises(): + try: + Linter("/nonexistent/path/xyz") + assert False, "Should have raised FileNotFoundError" + except FileNotFoundError: + pass + + +# ── skip dirs ────────────────────────────────────────────────────────── + +def test_git_dir_skipped(): + with tempfile.TemporaryDirectory() as tmp: + repo = _make_repo(tmp, { + "README.md": "# R\n\nGood repo.", + "main.py": "x = 1\n", + }) + # Create a .git/ dir with a bad file + git_dir = repo / ".git" + git_dir.mkdir() + (git_dir / "bad.py").write_text("HOST = '203.0.113.1'\n") + + result = Linter(str(repo)).run() + git_errors = [e for e in result.errors if ".git" in e] + assert len(git_errors) == 0