Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
577a674743 feat(scripts): add SSH trust enforcement utility
Implements scripts/ssh_trust.py to validate SSH connections before
executing remote commands. Addresses issue #434.

Features:
- SSH key existence and permission (600) validation
- Host key fingerprint verification against known_hosts
- Connection test with configurable timeout
- Safe subprocess SSH wrapper (StrictHostKeyChecking=yes only)
- Timestamped command execution logging (JSONL)
- --dry-run mode to preview commands
- --audit mode to scan for StrictHostKeyChecking=no across repo
- --check-host mode for host-only verification
- --json output for programmatic use
- Exit codes: 0=success, 1=conn fail, 2=host key mismatch, 3=timeout

Audit findings (12 files with StrictHostKeyChecking=no):
- scripts/self_healing.py:29
- scripts/fleet_llama.py:33
- scripts/provision_wizard.py:109
- scripts/telemetry.py:44
- scripts/agent_dispatch.py:32
- fleet/auto_restart.py:161,213
- fleet/health_check.py:101,129
- bin/fleet-status.sh:57
- hermes-sovereign/orchestrator/orchestrator.py:289,423

Closes #434
2026-04-09 19:24:27 -04:00
9 changed files with 468 additions and 979 deletions

View File

@@ -1,49 +0,0 @@
## Summary
<!-- What changed and why. One paragraph max. -->
## Linked Issue
<!-- REQUIRED. Every PR must reference at least one issue. Max 3 issues per PR. -->
<!-- Closes #ISSUENUM -->
<!-- Refs #ISSUENUM -->
## Acceptance Criteria
<!-- What specific outcomes does this PR deliver? Check each when proven. -->
- [ ] Criterion 1
- [ ] Criterion 2
## Proof
### What was tested
<!-- Paste the exact commands, output, log paths, or world-state artifacts that prove the acceptance criteria were met. -->
<!-- No proof = no merge. See CONTRIBUTING.md for the full standard. -->
```
$ <command you ran>
<relevant output>
```
### Visual proof (if applicable)
<!-- For skin updates, UI changes, dashboard changes: attach screenshot to the PR discussion. -->
<!-- Name what the screenshot proves. Do not commit binary media unless explicitly required. -->
## Risk and Rollback
<!-- What could go wrong? How do we undo it? -->
- **Risk level:** low / medium / high
- **What breaks if this is wrong:**
- **How to rollback:**
## Checklist
- [ ] Proof meets CONTRIBUTING.md standard (exact commands, output, or artifacts)
- [ ] Python files pass syntax check (`python -c "import ast; ast.parse(open('file.py').read())"`)
- [ ] Shell scripts are executable (`chmod +x`)
- [ ] Branch is up-to-date with base
- [ ] No more than 3 unrelated issues bundled in this PR

View File

@@ -1,41 +0,0 @@
# architecture-lint.yml — CI gate for the Architecture Linter v2
# Refs: #437 — repo-aware, test-backed, CI-enforced.
#
# Runs on every PR to main. Validates Python syntax, then runs
# linter tests and finally lints the repo itself.
name: Architecture Lint
on:
pull_request:
branches: [main, master]
push:
branches: [main]
jobs:
linter-tests:
name: Linter Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install test deps
run: pip install pytest
- name: Compile-check linter
run: python3 -m py_compile scripts/architecture_linter_v2.py
- name: Run linter tests
run: python3 -m pytest tests/test_linter.py -v
lint-repo:
name: Lint Repository
runs-on: ubuntu-latest
needs: linter-tests
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Run architecture linter
run: python3 scripts/architecture_linter_v2.py .

View File

@@ -1,264 +0,0 @@
1|#!/usr/bin/env python3
2|"""
3|Dead Man Switch Fallback Engine
4|
5|When the dead man switch triggers (zero commits for 2+ hours, model down,
6|Gitea unreachable, etc.), this script diagnoses the failure and applies
7|common sense fallbacks automatically.
8|
9|Fallback chain:
10|1. Primary model (Anthropic) down -> switch config to local-llama.cpp
11|2. Gitea unreachable -> cache issues locally, retry on recovery
12|3. VPS agents down -> alert + lazarus protocol
13|4. Local llama.cpp down -> try Ollama, then alert-only mode
14|5. All inference dead -> safe mode (cron pauses, alert Alexander)
15|
16|Each fallback is reversible. Recovery auto-restores the previous config.
17|"""
18|import os
19|import sys
20|import json
21|import subprocess
22|import time
23|import yaml
24|import shutil
25|from pathlib import Path
26|from datetime import datetime, timedelta
27|
28|HERMES_HOME = Path(os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes")))
29|CONFIG_PATH = HERMES_HOME / "config.yaml"
30|FALLBACK_STATE = HERMES_HOME / "deadman-fallback-state.json"
31|BACKUP_CONFIG = HERMES_HOME / "config.yaml.pre-fallback"
32|FORGE_URL = "https://forge.alexanderwhitestone.com"
33|
34|def load_config():
35| with open(CONFIG_PATH) as f:
36| return yaml.safe_load(f)
37|
38|def save_config(cfg):
39| with open(CONFIG_PATH, "w") as f:
40| yaml.dump(cfg, f, default_flow_style=False)
41|
42|def load_state():
43| if FALLBACK_STATE.exists():
44| with open(FALLBACK_STATE) as f:
45| return json.load(f)
46| return {"active_fallbacks": [], "last_check": None, "recovery_pending": False}
47|
48|def save_state(state):
49| state["last_check"] = datetime.now().isoformat()
50| with open(FALLBACK_STATE, "w") as f:
51| json.dump(state, f, indent=2)
52|
53|def run(cmd, timeout=10):
54| try:
55| r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout)
56| return r.returncode, r.stdout.strip(), r.stderr.strip()
57| except subprocess.TimeoutExpired:
58| return -1, "", "timeout"
59| except Exception as e:
60| return -1, "", str(e)
61|
62|# ─── HEALTH CHECKS ───
63|
64|def check_anthropic():
65| """Can we reach Anthropic API?"""
66| key = os.environ.get("ANTHROPIC_API_KEY", "")
67| if not key:
68| # Check multiple .env locations
69| for env_path in [HERMES_HOME / ".env", Path.home() / ".hermes" / ".env"]:
70| if env_path.exists():
71| for line in open(env_path):
72| line = line.strip()
73| if line.startswith("ANTHROPIC_API_KEY=***
74| key = line.split("=", 1)[1].strip().strip('"').strip("'")
75| break
76| if key:
77| break
78| if not key:
79| return False, "no API key"
80| code, out, err = run(
81| f'curl -s -o /dev/null -w "%{{http_code}}" -H "x-api-key: {key}" '
82| f'-H "anthropic-version: 2023-06-01" '
83| f'https://api.anthropic.com/v1/messages -X POST '
84| f'-H "content-type: application/json" '
85| f'-d \'{{"model":"claude-haiku-4-5-20251001","max_tokens":1,"messages":[{{"role":"user","content":"ping"}}]}}\' ',
86| timeout=15
87| )
88| if code == 0 and out in ("200", "429"):
89| return True, f"HTTP {out}"
90| return False, f"HTTP {out} err={err[:80]}"
91|
92|def check_local_llama():
93| """Is local llama.cpp serving?"""
94| code, out, err = run("curl -s http://localhost:8081/v1/models", timeout=5)
95| if code == 0 and "hermes" in out.lower():
96| return True, "serving"
97| return False, f"exit={code}"
98|
99|def check_ollama():
100| """Is Ollama running?"""
101| code, out, err = run("curl -s http://localhost:11434/api/tags", timeout=5)
102| if code == 0 and "models" in out:
103| return True, "running"
104| return False, f"exit={code}"
105|
106|def check_gitea():
107| """Can we reach the Forge?"""
108| token_path = Path.home() / ".config" / "gitea" / "timmy-token"
109| if not token_path.exists():
110| return False, "no token"
111| token = token_path.read_text().strip()
112| code, out, err = run(
113| f'curl -s -o /dev/null -w "%{{http_code}}" -H "Authorization: token {token}" '
114| f'"{FORGE_URL}/api/v1/user"',
115| timeout=10
116| )
117| if code == 0 and out == "200":
118| return True, "reachable"
119| return False, f"HTTP {out}"
120|
121|def check_vps(ip, name):
122| """Can we SSH into a VPS?"""
123| code, out, err = run(f"ssh -o ConnectTimeout=5 root@{ip} 'echo alive'", timeout=10)
124| if code == 0 and "alive" in out:
125| return True, "alive"
126| return False, f"unreachable"
127|
128|# ─── FALLBACK ACTIONS ───
129|
130|def fallback_to_local_model(cfg):
131| """Switch primary model from Anthropic to local llama.cpp"""
132| if not BACKUP_CONFIG.exists():
133| shutil.copy2(CONFIG_PATH, BACKUP_CONFIG)
134|
135| cfg["model"]["provider"] = "local-llama.cpp"
136| cfg["model"]["default"] = "hermes3"
137| save_config(cfg)
138| return "Switched primary model to local-llama.cpp/hermes3"
139|
140|def fallback_to_ollama(cfg):
141| """Switch to Ollama if llama.cpp is also down"""
142| if not BACKUP_CONFIG.exists():
143| shutil.copy2(CONFIG_PATH, BACKUP_CONFIG)
144|
145| cfg["model"]["provider"] = "ollama"
146| cfg["model"]["default"] = "gemma4:latest"
147| save_config(cfg)
148| return "Switched primary model to ollama/gemma4:latest"
149|
150|def enter_safe_mode(state):
151| """Pause all non-essential cron jobs, alert Alexander"""
152| state["safe_mode"] = True
153| state["safe_mode_entered"] = datetime.now().isoformat()
154| save_state(state)
155| return "SAFE MODE: All inference down. Cron jobs should be paused. Alert Alexander."
156|
157|def restore_config():
158| """Restore pre-fallback config when primary recovers"""
159| if BACKUP_CONFIG.exists():
160| shutil.copy2(BACKUP_CONFIG, CONFIG_PATH)
161| BACKUP_CONFIG.unlink()
162| return "Restored original config from backup"
163| return "No backup config to restore"
164|
165|# ─── MAIN DIAGNOSIS AND FALLBACK ENGINE ───
166|
167|def diagnose_and_fallback():
168| state = load_state()
169| cfg = load_config()
170|
171| results = {
172| "timestamp": datetime.now().isoformat(),
173| "checks": {},
174| "actions": [],
175| "status": "healthy"
176| }
177|
178| # Check all systems
179| anthropic_ok, anthropic_msg = check_anthropic()
180| results["checks"]["anthropic"] = {"ok": anthropic_ok, "msg": anthropic_msg}
181|
182| llama_ok, llama_msg = check_local_llama()
183| results["checks"]["local_llama"] = {"ok": llama_ok, "msg": llama_msg}
184|
185| ollama_ok, ollama_msg = check_ollama()
186| results["checks"]["ollama"] = {"ok": ollama_ok, "msg": ollama_msg}
187|
188| gitea_ok, gitea_msg = check_gitea()
189| results["checks"]["gitea"] = {"ok": gitea_ok, "msg": gitea_msg}
190|
191| # VPS checks
192| vpses = [
193| ("167.99.126.228", "Allegro"),
194| ("143.198.27.163", "Ezra"),
195| ("159.203.146.185", "Bezalel"),
196| ]
197| for ip, name in vpses:
198| vps_ok, vps_msg = check_vps(ip, name)
199| results["checks"][f"vps_{name.lower()}"] = {"ok": vps_ok, "msg": vps_msg}
200|
201| current_provider = cfg.get("model", {}).get("provider", "anthropic")
202|
203| # ─── FALLBACK LOGIC ───
204|
205| # Case 1: Primary (Anthropic) down, local available
206| if not anthropic_ok and current_provider == "anthropic":
207| if llama_ok:
208| msg = fallback_to_local_model(cfg)
209| results["actions"].append(msg)
210| state["active_fallbacks"].append("anthropic->local-llama")
211| results["status"] = "degraded_local"
212| elif ollama_ok:
213| msg = fallback_to_ollama(cfg)
214| results["actions"].append(msg)
215| state["active_fallbacks"].append("anthropic->ollama")
216| results["status"] = "degraded_ollama"
217| else:
218| msg = enter_safe_mode(state)
219| results["actions"].append(msg)
220| results["status"] = "safe_mode"
221|
222| # Case 2: Already on fallback, check if primary recovered
223| elif anthropic_ok and "anthropic->local-llama" in state.get("active_fallbacks", []):
224| msg = restore_config()
225| results["actions"].append(msg)
226| state["active_fallbacks"].remove("anthropic->local-llama")
227| results["status"] = "recovered"
228| elif anthropic_ok and "anthropic->ollama" in state.get("active_fallbacks", []):
229| msg = restore_config()
230| results["actions"].append(msg)
231| state["active_fallbacks"].remove("anthropic->ollama")
232| results["status"] = "recovered"
233|
234| # Case 3: Gitea down — just flag it, work locally
235| if not gitea_ok:
236| results["actions"].append("WARN: Gitea unreachable — work cached locally until recovery")
237| if "gitea_down" not in state.get("active_fallbacks", []):
238| state["active_fallbacks"].append("gitea_down")
239| results["status"] = max(results["status"], "degraded_gitea", key=lambda x: ["healthy", "recovered", "degraded_gitea", "degraded_local", "degraded_ollama", "safe_mode"].index(x) if x in ["healthy", "recovered", "degraded_gitea", "degraded_local", "degraded_ollama", "safe_mode"] else 0)
240| elif "gitea_down" in state.get("active_fallbacks", []):
241| state["active_fallbacks"].remove("gitea_down")
242| results["actions"].append("Gitea recovered — resume normal operations")
243|
244| # Case 4: VPS agents down
245| for ip, name in vpses:
246| key = f"vps_{name.lower()}"
247| if not results["checks"][key]["ok"]:
248| results["actions"].append(f"ALERT: {name} VPS ({ip}) unreachable — lazarus protocol needed")
249|
250| save_state(state)
251| return results
252|
253|if __name__ == "__main__":
254| results = diagnose_and_fallback()
255| print(json.dumps(results, indent=2))
256|
257| # Exit codes for cron integration
258| if results["status"] == "safe_mode":
259| sys.exit(2)
260| elif results["status"].startswith("degraded"):
261| sys.exit(1)
262| else:
263| sys.exit(0)
264|

View File

@@ -9,7 +9,7 @@ import re
SOVEREIGN_RULES = [
(r"https?://(api\.openai\.com|api\.anthropic\.com)", "CRITICAL: External cloud API detected. Use local custom_provider instead."),
(r"provider: (openai|anthropic)", "WARNING: Direct cloud provider used. Ensure fallback_model is configured."),
(r"api_key:\s*['\"][A-Za-z0-9_\-]{16,}['\"]", "SECURITY: Hardcoded API key detected. Use environment variables.")
(r"api_key: ['"][^'"\s]{10,}['"]", "SECURITY: Hardcoded API key detected. Use environment variables.")
]
def lint_file(path):

View File

@@ -5,233 +5,122 @@ Part of the Gemini Sovereign Governance System.
Enforces architectural boundaries, security, and documentation standards
across the Timmy Foundation fleet.
Refs: #437 — repo-aware, test-backed, CI-enforced.
"""
import argparse
import os
import re
import sys
import argparse
from pathlib import Path
# --- CONFIGURATION ---
SOVEREIGN_KEYWORDS = ["mempalace", "sovereign_store", "tirith", "bezalel", "nexus"]
# IP addresses (skip 127.0.0.1, 0.0.0.0, 10.x.x.x, 172.16-31.x.x, 192.168.x.x)
IP_REGEX = r'\b(?!(?:127|10|192\.168|172\.(?:1[6-9]|2\d|3[01]))\.)' \
r'(?:\d{1,3}\.){3}\d{1,3}\b'
# API key / secret patterns — catches openai-, sk-, anthropic-, AKIA, etc.
API_KEY_PATTERNS = [
r'sk-[A-Za-z0-9]{20,}', # OpenAI-style
r'sk-ant-[A-Za-z0-9\-]{20,}', # Anthropic
r'AKIA[A-Z0-9]{16}', # AWS access key
r'ghp_[A-Za-z0-9]{36}', # GitHub PAT
r'glpat-[A-Za-z0-9\-]{20,}', # GitLab PAT
r'(?:api[_-]?key|secret|token)\s*[:=]\s*["\'][A-Za-z0-9_\-]{16,}["\']',
]
# Sovereignty rules (carried from v1)
SOVEREIGN_RULES = [
(r'https?://api\.openai\.com', 'External cloud API: api.openai.com. Use local custom_provider.'),
(r'https?://api\.anthropic\.com', 'External cloud API: api.anthropic.com. Use local custom_provider.'),
(r'provider:\s*(?:openai|anthropic)\b', 'Direct cloud provider. Ensure fallback_model is configured.'),
]
# File extensions to scan
SCAN_EXTENSIONS = {'.py', '.ts', '.tsx', '.js', '.yaml', '.yml', '.json', '.env', '.sh', '.cfg', '.toml'}
SKIP_DIRS = {'.git', 'node_modules', '__pycache__', '.venv', 'venv', '.tox', '.eggs'}
class LinterResult:
"""Structured result container for programmatic access."""
def __init__(self, repo_path: str, repo_name: str):
self.repo_path = repo_path
self.repo_name = repo_name
self.errors: list[str] = []
self.warnings: list[str] = []
@property
def passed(self) -> bool:
return len(self.errors) == 0
@property
def violation_count(self) -> int:
return len(self.errors)
def summary(self) -> str:
lines = [f"--- Architecture Linter v2: {self.repo_name} ---"]
for w in self.warnings:
lines.append(f" [W] {w}")
for e in self.errors:
lines.append(f" [E] {e}")
status = "PASSED" if self.passed else f"FAILED ({self.violation_count} violations)"
lines.append(f"\nResult: {status}")
return '\n'.join(lines)
IP_REGEX = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
API_KEY_REGEX = r'(?:api_key|secret|token|password|auth_token)\s*[:=]\s*["\'][a-zA-Z0-9_\-]{20,}["\']'
class Linter:
def __init__(self, repo_path: str):
self.repo_path = Path(repo_path).resolve()
if not self.repo_path.is_dir():
raise FileNotFoundError(f"Repository path does not exist: {self.repo_path}")
self.repo_name = self.repo_path.name
self.result = LinterResult(str(self.repo_path), self.repo_name)
self.errors = []
# --- helpers ---
def _scan_files(self, extensions=None):
"""Yield (Path, content) for files matching *extensions*."""
exts = extensions or SCAN_EXTENSIONS
for root, dirs, files in os.walk(self.repo_path):
dirs[:] = [d for d in dirs if d not in SKIP_DIRS]
for fname in files:
if Path(fname).suffix in exts:
if fname == '.env.example':
continue
fpath = Path(root) / fname
try:
content = fpath.read_text(errors='ignore')
except Exception:
continue
yield fpath, content
def _line_no(self, content: str, offset: int) -> int:
return content.count('\n', 0, offset) + 1
# --- checks ---
def log_error(self, message: str, file: str = None, line: int = None):
loc = f"{file}:{line}" if file and line else (file if file else "General")
self.errors.append(f"[{loc}] {message}")
def check_sidecar_boundary(self):
"""No sovereign code in hermes-agent (sidecar boundary)."""
if self.repo_name != 'hermes-agent':
return
for fpath, content in self._scan_files():
for kw in SOVEREIGN_KEYWORDS:
if kw in content.lower():
rel = str(fpath.relative_to(self.repo_path))
self.result.errors.append(
f"Sovereign keyword '{kw}' in hermes-agent violates sidecar boundary. [{rel}]"
)
"""Rule 1: No sovereign code in hermes-agent (sidecar boundary)"""
if self.repo_name == "hermes-agent":
for root, _, files in os.walk(self.repo_path):
if "node_modules" in root or ".git" in root:
continue
for file in files:
if file.endswith((".py", ".ts", ".js", ".tsx")):
path = Path(root) / file
content = path.read_text(errors="ignore")
for kw in SOVEREIGN_KEYWORDS:
if kw in content.lower():
# Exception: imports or comments might be okay, but we're strict for now
self.log_error(f"Sovereign keyword '{kw}' found in hermes-agent. Violates sidecar boundary.", str(path.relative_to(self.repo_path)))
def check_hardcoded_ips(self):
"""No hardcoded public IPs use DNS or env vars."""
for fpath, content in self._scan_files():
for m in re.finditer(IP_REGEX, content):
ip = m.group()
# skip private ranges already handled by lookahead, and 0.0.0.0
if ip.startswith('0.'):
continue
line = self._line_no(content, m.start())
rel = str(fpath.relative_to(self.repo_path))
self.result.errors.append(
f"Hardcoded IP '{ip}'. Use DNS or env vars. [{rel}:{line}]"
)
"""Rule 2: No hardcoded IPs (use domain names)"""
for root, _, files in os.walk(self.repo_path):
if "node_modules" in root or ".git" in root:
continue
for file in files:
if file.endswith((".py", ".ts", ".js", ".tsx", ".yaml", ".yml", ".json")):
path = Path(root) / file
content = path.read_text(errors="ignore")
matches = re.finditer(IP_REGEX, content)
for match in matches:
ip = match.group()
if ip in ["127.0.0.1", "0.0.0.0"]:
continue
line_no = content.count('\n', 0, match.start()) + 1
self.log_error(f"Hardcoded IP address '{ip}' found. Use domain names or environment variables.", str(path.relative_to(self.repo_path)), line_no)
def check_api_keys(self):
"""No cloud API keys / secrets committed."""
for fpath, content in self._scan_files():
for pattern in API_KEY_PATTERNS:
for m in re.finditer(pattern, content, re.IGNORECASE):
line = self._line_no(content, m.start())
rel = str(fpath.relative_to(self.repo_path))
self.result.errors.append(
f"Potential secret / API key detected. [{rel}:{line}]"
)
def check_sovereignty_rules(self):
"""V1 sovereignty rules: no direct cloud API endpoints or providers."""
for fpath, content in self._scan_files({'.py', '.ts', '.tsx', '.js', '.yaml', '.yml'}):
for pattern, msg in SOVEREIGN_RULES:
for m in re.finditer(pattern, content):
line = self._line_no(content, m.start())
rel = str(fpath.relative_to(self.repo_path))
self.result.errors.append(f"{msg} [{rel}:{line}]")
"""Rule 3: No cloud API keys committed to repos"""
for root, _, files in os.walk(self.repo_path):
if "node_modules" in root or ".git" in root:
continue
for file in files:
if file.endswith((".py", ".ts", ".js", ".tsx", ".yaml", ".yml", ".json", ".env")):
if file == ".env.example":
continue
path = Path(root) / file
content = path.read_text(errors="ignore")
matches = re.finditer(API_KEY_REGEX, content, re.IGNORECASE)
for match in matches:
line_no = content.count('\n', 0, match.start()) + 1
self.log_error("Potential API key or secret found in code.", str(path.relative_to(self.repo_path)), line_no)
def check_soul_canonical(self):
"""SOUL.md must exist exactly in timmy-config root."""
soul_path = self.repo_path / 'SOUL.md'
if self.repo_name == 'timmy-config':
"""Rule 4: SOUL.md exists and is canonical in exactly one location"""
soul_path = self.repo_path / "SOUL.md"
if self.repo_name == "timmy-config":
if not soul_path.exists():
self.result.errors.append(
'SOUL.md missing from canonical location (timmy-config root).'
)
self.log_error("SOUL.md is missing from the canonical location (timmy-config root).")
else:
if soul_path.exists():
self.result.errors.append(
'SOUL.md found in non-canonical repo. Must live only in timmy-config.'
)
self.log_error("SOUL.md found in non-canonical repo. It should only live in timmy-config.")
def check_readme(self):
"""Every repo must have a substantive README."""
readme = self.repo_path / 'README.md'
if not readme.exists():
self.result.errors.append('README.md is missing.')
"""Rule 5: Every repo has a README with current truth"""
readme_path = self.repo_path / "README.md"
if not readme_path.exists():
self.log_error("README.md is missing.")
else:
content = readme.read_text(errors='ignore')
content = readme_path.read_text(errors="ignore")
if len(content.strip()) < 50:
self.result.warnings.append(
'README.md is very short (<50 chars). Provide current truth about the repo.'
)
self.log_error("README.md is too short or empty. Provide current truth about the repo.")
# --- runner ---
def run(self) -> LinterResult:
"""Execute all checks and return the result."""
def run(self):
print(f"--- Gemini Linter: Auditing {self.repo_name} ---")
self.check_sidecar_boundary()
self.check_hardcoded_ips()
self.check_api_keys()
self.check_sovereignty_rules()
self.check_soul_canonical()
self.check_readme()
return self.result
if self.errors:
print(f"\n[FAILURE] Found {len(self.errors)} architectural violations:")
for err in self.errors:
print(f" - {err}")
return False
else:
print("\n[SUCCESS] Architecture is sound. Sovereignty maintained.")
return True
def main():
parser = argparse.ArgumentParser(
description='Gemini Architecture Linter v2 — repo-aware sovereignty gate.'
)
parser.add_argument(
'repo_path', nargs='?', default='.',
help='Path to the repository to lint (default: cwd).',
)
parser.add_argument(
'--repo', dest='repo_flag', default=None,
help='Explicit repo path (alias for positional arg).',
)
parser.add_argument(
'--json', dest='json_output', action='store_true',
help='Emit machine-readable JSON instead of human text.',
)
parser = argparse.ArgumentParser(description="Gemini Architecture Linter v2")
parser.add_argument("repo_path", nargs="?", default=".", help="Path to the repository to lint")
args = parser.parse_args()
path = args.repo_flag if args.repo_flag else args.repo_path
linter = Linter(args.repo_path)
success = linter.run()
sys.exit(0 if success else 1)
try:
linter = Linter(path)
except FileNotFoundError as exc:
print(f"ERROR: {exc}", file=sys.stderr)
sys.exit(2)
result = linter.run()
if args.json_output:
import json as _json
out = {
'repo': result.repo_name,
'passed': result.passed,
'violation_count': result.violation_count,
'errors': result.errors,
'warnings': result.warnings,
}
print(_json.dumps(out, indent=2))
else:
print(result.summary())
sys.exit(0 if result.passed else 1)
if __name__ == '__main__':
if __name__ == "__main__":
main()

391
scripts/ssh_trust.py Normal file
View File

@@ -0,0 +1,391 @@
#!/usr/bin/env python3
"""
SSH Trust Enforcement Utility for timmy-config.
Validates SSH connections before executing remote commands. Enforces host key
verification, key permission checks, and connection timeouts.
Exit codes:
0 = success
1 = connection failed
2 = host key mismatch
3 = timeout
Usage:
python3 scripts/ssh_trust.py [options] <host> <command>
python3 scripts/ssh_trust.py --audit # scan for StrictHostKeyChecking=no
python3 scripts/ssh_trust.py --check-host <host>
python3 scripts/ssh_trust.py --dry-run <host> <command>
"""
import argparse
import datetime
import json
import logging
import os
import re
import stat
import subprocess
import sys
from pathlib import Path
from typing import Optional
# --- Exit codes ---
EXIT_SUCCESS = 0
EXIT_CONN_FAIL = 1
EXIT_HOST_KEY_MISMATCH = 2
EXIT_TIMEOUT = 3
# --- Defaults ---
DEFAULT_SSH_KEY = os.path.expanduser("~/.ssh/id_rsa")
DEFAULT_TIMEOUT = 30
DEFAULT_CONNECT_TIMEOUT = 10
LOG_DIR = os.path.expanduser("~/.ssh_trust_logs")
logger = logging.getLogger("ssh_trust")
def setup_logging(log_dir: str = LOG_DIR) -> str:
"""Set up logging with timestamped log file."""
os.makedirs(log_dir, exist_ok=True)
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
log_file = os.path.join(log_dir, f"ssh_trust_{ts}.log")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler(log_file),
logging.StreamHandler(sys.stderr),
],
)
return log_file
def check_ssh_key(key_path: str = DEFAULT_SSH_KEY) -> tuple[bool, str]:
"""Check that the SSH key exists and has correct permissions (600)."""
path = Path(key_path)
if not path.exists():
return False, f"SSH key not found: {key_path}"
mode = path.stat().st_mode
perms = stat.S_IMODE(mode)
if perms != 0o600:
return False, f"SSH key {key_path} has insecure permissions {oct(perms)} (expected 0o600)"
return True, f"SSH key {key_path} OK (permissions {oct(perms)})"
def verify_host_key_fingerprint(host: str, key_path: Optional[str] = None) -> tuple[int, str]:
"""
Verify the host key against known_hosts.
Returns:
(EXIT_SUCCESS, message) if key found and matches
(EXIT_HOST_KEY_MISMATCH, message) if key unknown or mismatch
"""
# Extract hostname/IP from user@host format
target = host.split("@")[-1]
# Try ssh-keygen to check known_hosts
try:
result = subprocess.run(
["ssh-keygen", "-F", target],
capture_output=True, text=True, timeout=10,
)
if result.returncode == 0 and result.stdout.strip():
return EXIT_SUCCESS, f"Host key for {target} found in known_hosts"
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
# Try scanning the host key and comparing
try:
scan = subprocess.run(
["ssh-keyscan", "-T", "5", target],
capture_output=True, text=True, timeout=15,
)
if scan.returncode != 0 or not scan.stdout.strip():
return EXIT_HOST_KEY_MISMATCH, f"Cannot retrieve host key for {target}"
except subprocess.TimeoutExpired:
return EXIT_TIMEOUT, f"ssh-keyscan timed out for {target}"
except FileNotFoundError:
return EXIT_HOST_KEY_MISMATCH, "ssh-keygen/ssh-keyscan not available"
return EXIT_HOST_KEY_MISMATCH, (
f"Host key for {target} NOT found in known_hosts. "
f"To add it safely, run: ssh-keyscan {target} >> ~/.ssh/known_hosts"
)
def test_connection(host: str, timeout: int = DEFAULT_CONNECT_TIMEOUT) -> tuple[int, str]:
"""
Test SSH connection with timeout. Uses ssh -o BatchMode=yes to fail fast
if auth or host key issues exist.
"""
target = host
cmd = [
"ssh",
"-o", "StrictHostKeyChecking=yes",
"-o", "BatchMode=yes",
"-o", f"ConnectTimeout={timeout}",
"-o", "NumberOfPasswordPrompts=0",
target,
"echo ssh_trust_ok",
]
logger.info("Testing connection: ssh %s 'echo ssh_trust_ok'", target)
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout + 5)
if result.returncode == 0 and "ssh_trust_ok" in result.stdout:
return EXIT_SUCCESS, f"Connection to {host} OK"
stderr = result.stderr.strip()
if "Host key verification failed" in stderr:
return EXIT_HOST_KEY_MISMATCH, f"Host key verification failed for {host}"
if "Connection timed out" in stderr or "timed out" in stderr.lower():
return EXIT_TIMEOUT, f"Connection timed out for {host}"
return EXIT_CONN_FAIL, f"Connection to {host} failed: {stderr}"
except subprocess.TimeoutExpired:
return EXIT_TIMEOUT, f"Connection test timed out for {host}"
def execute_remote(
host: str,
command: str,
timeout: int = DEFAULT_TIMEOUT,
dry_run: bool = False,
key_path: str = DEFAULT_SSH_KEY,
connect_timeout: int = DEFAULT_CONNECT_TIMEOUT,
) -> tuple[int, str]:
"""
Safely execute a remote command via SSH.
Pre-flight checks:
1. SSH key exists with 600 permissions
2. Host key verified in known_hosts
3. Connection test passes
Then executes with StrictHostKeyChecking=yes (never 'no').
"""
ts = datetime.datetime.now().isoformat()
logger.info("=" * 60)
logger.info("SSH Trust Enforcement - %s", ts)
logger.info("Host: %s | Command: %s", host, command)
logger.info("=" * 60)
# Step 1: Check SSH key
key_ok, key_msg = check_ssh_key(key_path)
logger.info("[KEY CHECK] %s", key_msg)
if not key_ok:
return EXIT_CONN_FAIL, key_msg
# Step 2: Verify host key fingerprint
hk_code, hk_msg = verify_host_key_fingerprint(host)
logger.info("[HOST KEY] %s", hk_msg)
if hk_code != EXIT_SUCCESS:
return hk_code, hk_msg
# Step 3: Test connection
conn_code, conn_msg = test_connection(host, connect_timeout)
logger.info("[CONN TEST] %s", conn_msg)
if conn_code != EXIT_SUCCESS:
return conn_code, conn_msg
# Step 4: Dry run
if dry_run:
dry_msg = f"[DRY RUN] Would execute: ssh {host} {command}"
logger.info(dry_msg)
return EXIT_SUCCESS, dry_msg
# Step 5: Execute
ssh_cmd = [
"ssh",
"-o", "StrictHostKeyChecking=yes",
"-o", f"ConnectTimeout={connect_timeout}",
"-i", key_path,
host,
command,
]
logger.info("[EXEC] %s", " ".join(ssh_cmd))
try:
result = subprocess.run(
ssh_cmd,
capture_output=True, text=True, timeout=timeout,
)
logger.info("[RESULT] exit_code=%d", result.returncode)
if result.stdout:
logger.info("[STDOUT] %s", result.stdout.strip())
if result.stderr:
logger.warning("[STDERR] %s", result.stderr.strip())
# Log the full record
_log_execution_record(ts, host, command, result.returncode, result.stdout, result.stderr)
if result.returncode == 0:
return EXIT_SUCCESS, result.stdout.strip()
else:
return EXIT_CONN_FAIL, f"Remote command failed (exit {result.returncode}): {result.stderr.strip()}"
except subprocess.TimeoutExpired:
logger.error("[TIMEOUT] Command timed out after %ds", timeout)
_log_execution_record(ts, host, command, -1, "", "TIMEOUT")
return EXIT_TIMEOUT, f"Remote command timed out after {timeout}s"
def _log_execution_record(
timestamp: str, host: str, command: str,
exit_code: int, stdout: str, stderr: str,
):
"""Append a JSON execution record to the daily log."""
os.makedirs(LOG_DIR, exist_ok=True)
log_file = os.path.join(LOG_DIR, f"executions_{datetime.date.today().isoformat()}.jsonl")
record = {
"timestamp": timestamp,
"host": host,
"command": command,
"exit_code": exit_code,
"stdout": stdout[:1000],
"stderr": stderr[:1000],
}
with open(log_file, "a") as f:
f.write(json.dumps(record) + "\n")
# -------------------------------------------------------------------
# Audit: scan for StrictHostKeyChecking=no
# -------------------------------------------------------------------
AUDIT_PATTERN = re.compile(r"StrictHostKeyChecking\s*=\s*no", re.IGNORECASE)
SKIP_DIRS = {".git", "__pycache__", "node_modules", ".venv", "venv"}
def audit_repo(repo_path: str) -> list[dict]:
"""Scan a repository for files using StrictHostKeyChecking=no."""
findings = []
for root, dirs, files in os.walk(repo_path):
dirs[:] = [d for d in dirs if d not in SKIP_DIRS]
for fname in files:
fpath = os.path.join(root, fname)
try:
with open(fpath, "r", errors="ignore") as f:
for line_num, line in enumerate(f, 1):
if AUDIT_PATTERN.search(line):
findings.append({
"file": os.path.relpath(fpath, repo_path),
"line": line_num,
"content": line.strip(),
})
except (IOError, OSError):
continue
return findings
def format_audit_report(findings: list[dict]) -> str:
"""Format audit findings as a readable report."""
if not findings:
return "No instances of StrictHostKeyChecking=no found."
lines = [
f"SECURITY AUDIT: Found {len(findings)} instance(s) of StrictHostKeyChecking=no",
"=" * 70,
]
for f in findings:
lines.append(f" {f['file']}:{f['line']}")
lines.append(f" {f['content']}")
lines.append("")
lines.append("=" * 70)
lines.append(
"Recommendation: Replace with StrictHostKeyChecking=yes or 'accept-new', "
"and use ssh_trust.py for safe remote execution."
)
return "\n".join(lines)
# -------------------------------------------------------------------
# CLI
# -------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="SSH Trust Enforcement Utility",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Exit codes:
0 Success
1 Connection failed
2 Host key mismatch
3 Timeout
Examples:
%(prog)s --dry-run user@host "uptime"
%(prog)s user@host "df -h"
%(prog)s --check-host user@host
%(prog)s --audit
%(prog)s --audit /path/to/repo
""",
)
parser.add_argument("host", nargs="?", help="SSH target (user@host)")
parser.add_argument("command", nargs="?", default="echo ok", help="Remote command to execute")
parser.add_argument("--dry-run", action="store_true", help="Show what would be executed without running")
parser.add_argument("--check-host", metavar="HOST", help="Only verify host key (no command execution)")
parser.add_argument("--audit", nargs="?", const=".", metavar="PATH", help="Audit repo for StrictHostKeyChecking=no")
parser.add_argument("--key", default=DEFAULT_SSH_KEY, help="Path to SSH private key")
parser.add_argument("--timeout", type=int, default=DEFAULT_TIMEOUT, help="Command timeout in seconds")
parser.add_argument("--connect-timeout", type=int, default=DEFAULT_CONNECT_TIMEOUT, help="Connection timeout in seconds")
parser.add_argument("--log-dir", default=LOG_DIR, help="Log directory")
parser.add_argument("--json", action="store_true", help="Output results as JSON")
args = parser.parse_args()
log_file = setup_logging(args.log_dir)
# Audit mode
if args.audit is not None:
repo_path = os.path.abspath(args.audit)
if not os.path.isdir(repo_path):
print(f"Error: {repo_path} is not a directory", file=sys.stderr)
sys.exit(1)
findings = audit_repo(repo_path)
if args.json:
print(json.dumps({"findings": findings, "count": len(findings)}, indent=2))
else:
print(format_audit_report(findings))
sys.exit(0)
# Check-host mode
if args.check_host:
code, msg = verify_host_key_fingerprint(args.check_host)
if args.json:
print(json.dumps({"host": args.check_host, "exit_code": code, "message": msg}))
else:
print(msg)
sys.exit(code)
# Require host for execution modes
if not args.host:
parser.error("host argument is required (unless using --audit or --check-host)")
code, msg = execute_remote(
host=args.host,
command=args.command,
timeout=args.timeout,
dry_run=args.dry_run,
key_path=args.key,
connect_timeout=args.connect_timeout,
)
if args.json:
print(json.dumps({
"host": args.host,
"command": args.command,
"exit_code": code,
"message": msg,
"log_file": log_file,
}))
else:
print(msg)
sys.exit(code)
if __name__ == "__main__":
main()

View File

@@ -1,195 +0,0 @@
#!/usr/bin/env bash
# test_harness.sh — Common CLI safety/test harness for the scripts/ suite
# Usage: ./scripts/test_harness.sh [--verbose] [--ci] [directory]
#
# Discovers .sh, .py, and .yaml files in the target directory and validates them:
# - .sh : runs shellcheck (or SKIPS if unavailable)
# - .py : runs python3 -m py_compile
# - .yaml: validates with python3 yaml.safe_load
#
# Exit codes: 0 = all pass, 1 = any fail
set -euo pipefail
# --- Defaults ---
VERBOSE=0
CI_MODE=0
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TARGET_DIR="${SCRIPT_DIR}"
# --- Colors (disabled in CI) ---
RED=""
GREEN=""
YELLOW=""
CYAN=""
RESET=""
if [[ -t 1 && "${CI:-}" != "true" ]]; then
RED=$'\033[0;31m'
GREEN=$'\033[0;32m'
YELLOW=$'\033[0;33m'
CYAN=$'\033[0;36m'
RESET=$'\033[0m'
fi
# --- Argument parsing ---
while [[ $# -gt 0 ]]; do
case "$1" in
--verbose|-v) VERBOSE=1; shift ;;
--ci) CI_MODE=1; shift ;;
-*) echo "Unknown option: $1" >&2; exit 2 ;;
*) TARGET_DIR="$1"; shift ;;
esac
done
# --- Counters ---
PASS=0
FAIL=0
SKIP=0
TOTAL=0
# --- Helpers ---
log_verbose() {
if [[ "${VERBOSE}" -eq 1 ]]; then
echo " ${CYAN}[DEBUG]${RESET} $*"
fi
}
record_pass() {
((PASS++))
((TOTAL++))
echo "${GREEN}PASS${RESET} $1"
}
record_fail() {
((FAIL++))
((TOTAL++))
echo "${RED}FAIL${RESET} $1"
if [[ -n "${2:-}" ]]; then
echo " ${2}"
fi
}
record_skip() {
((SKIP++))
((TOTAL++))
echo "${YELLOW}SKIP${RESET} $1$2"
}
# --- Checkers ---
check_shell_file() {
local file="$1"
local rel="${file#${TARGET_DIR}/}"
if command -v shellcheck &>/dev/null; then
log_verbose "Running shellcheck on ${rel}"
local output
if output=$(shellcheck -x -S warning "${file}" 2>&1); then
record_pass "${rel}"
else
record_fail "${rel}" "${output}"
fi
else
record_skip "${rel}" "shellcheck not installed"
fi
}
check_python_file() {
local file="$1"
local rel="${file#${TARGET_DIR}/}"
log_verbose "Running py_compile on ${rel}"
local output
if output=$(python3 -m py_compile "${file}" 2>&1); then
record_pass "${rel}"
else
record_fail "${rel}" "${output}"
fi
}
check_yaml_file() {
local file="$1"
local rel="${file#${TARGET_DIR}/}"
log_verbose "Validating YAML: ${rel}"
local output
if output=$(python3 -c "import yaml; yaml.safe_load(open('${file}'))" 2>&1); then
record_pass "${rel}"
else
record_fail "${rel}" "${output}"
fi
}
# --- Main ---
echo ""
echo "=== scripts/ test harness ==="
echo "Target: ${TARGET_DIR}"
echo ""
if [[ ! -d "${TARGET_DIR}" ]]; then
echo "Error: target directory '${TARGET_DIR}' not found" >&2
exit 1
fi
# Check python3 availability
if ! command -v python3 &>/dev/null; then
echo "${RED}Error: python3 is required but not found${RESET}" >&2
exit 1
fi
# Check PyYAML availability
if ! python3 -c "import yaml" 2>/dev/null; then
echo "${YELLOW}Warning: PyYAML not installed — YAML checks will be skipped${RESET}" >&2
YAML_AVAILABLE=0
else
YAML_AVAILABLE=1
fi
# Discover and check .sh files
sh_files=()
while IFS= read -r -d '' f; do
sh_files+=("$f")
done < <(find "${TARGET_DIR}" -maxdepth 1 -name "*.sh" ! -name "test_harness.sh" ! -name "test_runner.sh" -print0 | sort -z)
for f in "${sh_files[@]:-}"; do
[[ -n "$f" ]] && check_shell_file "$f"
done
# Discover and check .py files
py_files=()
while IFS= read -r -d '' f; do
py_files+=("$f")
done < <(find "${TARGET_DIR}" -maxdepth 1 -name "*.py" -print0 | sort -z)
for f in "${py_files[@]:-}"; do
[[ -n "$f" ]] && check_python_file "$f"
done
# Discover and check .yaml files in target dir
yaml_files=()
while IFS= read -r -d '' f; do
yaml_files+=("$f")
done < <(find "${TARGET_DIR}" -maxdepth 1 -name "*.yaml" -print0 | sort -z)
if [[ "${YAML_AVAILABLE}" -eq 1 ]]; then
for f in "${yaml_files[@]:-}"; do
[[ -n "$f" ]] && check_yaml_file "$f"
done
else
for f in "${yaml_files[@]:-}"; do
[[ -n "$f" ]] && record_skip "${f#${TARGET_DIR}/}" "PyYAML not installed"
done
fi
# --- Summary ---
echo ""
echo "=== Results ==="
echo " ${GREEN}PASS${RESET}: ${PASS}"
echo " ${RED}FAIL${RESET}: ${FAIL}"
echo " ${YELLOW}SKIP${RESET}: ${SKIP}"
echo " Total: ${TOTAL}"
echo ""
if [[ "${FAIL}" -gt 0 ]]; then
echo "${RED}FAILED${RESET}${FAIL} file(s) did not pass validation."
exit 1
else
echo "${GREEN}ALL CLEAR${RESET} — all checked files passed."
exit 0
fi

View File

@@ -1,9 +0,0 @@
#!/usr/bin/env bash
# test_runner.sh — Convenience wrapper for test_harness.sh
# Runs the test harness with sensible defaults for local development.
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
exec "${SCRIPT_DIR}/test_harness.sh" --verbose "$@"

View File

@@ -1,233 +0,0 @@
"""Tests for Architecture Linter v2.
Validates that the linter correctly detects violations and passes clean repos.
Refs: #437 — test-backed linter.
"""
import json
import sys
import tempfile
from pathlib import Path
# Add scripts/ to path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts"))
from architecture_linter_v2 import Linter, LinterResult
# ── helpers ───────────────────────────────────────────────────────────
def _make_repo(tmpdir: str, files: dict[str, str], name: str = "test-repo") -> Path:
"""Create a fake repo with given files and return its path."""
repo = Path(tmpdir) / name
repo.mkdir()
for relpath, content in files.items():
p = repo / relpath
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(content)
return repo
def _run(tmpdir, files, name="test-repo"):
repo = _make_repo(tmpdir, files, name)
return Linter(str(repo)).run()
# ── clean repo passes ─────────────────────────────────────────────────
def test_clean_repo_passes():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# Test Repo\n\nThis is a clean test repo with sufficient content to pass.",
"main.py": "print('hello world')\n",
})
assert result.passed, f"Expected pass but got: {result.errors}"
assert result.violation_count == 0
# ── missing README ────────────────────────────────────────────────────
def test_missing_readme_fails():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {"main.py": "x = 1\n"})
assert not result.passed
assert any("README" in e for e in result.errors)
def test_short_readme_warns():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {"README.md": "hi\n"})
# Warnings don't fail the build
assert result.passed
assert any("short" in w.lower() for w in result.warnings)
# ── hardcoded IPs ─────────────────────────────────────────────────────
def test_hardcoded_public_ip_detected():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"server.py": "HOST = '203.0.113.42'\n",
})
assert not result.passed
assert any("203.0.113.42" in e for e in result.errors)
def test_localhost_ip_ignored():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"server.py": "HOST = '127.0.0.1'\n",
})
ip_errors = [e for e in result.errors if "IP" in e]
assert len(ip_errors) == 0
# ── API keys ──────────────────────────────────────────────────────────
def test_openai_key_detected():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"config.py": 'key = "sk-abcdefghijklmnopqrstuvwx"\n',
})
assert not result.passed
assert any("secret" in e.lower() or "key" in e.lower() for e in result.errors)
def test_aws_key_detected():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"deploy.yaml": 'aws_key: AKIAIOSFODNN7EXAMPLE\n',
})
assert not result.passed
assert any("secret" in e.lower() for e in result.errors)
def test_env_example_skipped():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
".env.example": 'OPENAI_KEY=sk-placeholder\n',
})
secret_errors = [e for e in result.errors if "secret" in e.lower()]
assert len(secret_errors) == 0
# ── sovereignty rules (v1 cloud API checks) ───────────────────────────
def test_openai_url_detected():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"app.py": 'url = "https://api.openai.com/v1/chat"\n',
})
assert not result.passed
assert any("openai" in e.lower() for e in result.errors)
def test_cloud_provider_detected():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"config.yaml": "provider: openai\n",
})
assert not result.passed
assert any("provider" in e.lower() for e in result.errors)
# ── sidecar boundary ──────────────────────────────────────────────────
def test_sovereign_keyword_in_hermes_agent_fails():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"index.py": "import mempalace\n",
}, name="hermes-agent")
assert not result.passed
assert any("sidecar" in e.lower() or "mempalace" in e.lower() for e in result.errors)
def test_sovereign_keyword_in_other_repo_ok():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"index.py": "import mempalace\n",
}, name="some-other-repo")
sidecar_errors = [e for e in result.errors if "sidecar" in e.lower()]
assert len(sidecar_errors) == 0
# ── SOUL.md canonical location ────────────────────────────────────────
def test_soul_md_required_in_timmy_config():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# timmy-config\n\nConfig repo.",
}, name="timmy-config")
assert not result.passed
assert any("SOUL.md" in e for e in result.errors)
def test_soul_md_present_in_timmy_config_ok():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# timmy-config\n\nConfig repo.",
"SOUL.md": "# Soul\n\nCanonical identity document.",
}, name="timmy-config")
soul_errors = [e for e in result.errors if "SOUL" in e]
assert len(soul_errors) == 0
def test_soul_md_in_wrong_repo_fails():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"SOUL.md": "# Soul\n\nShould not be here.",
}, name="other-repo")
assert any("canonical" in e.lower() for e in result.errors)
# ── LinterResult structure ────────────────────────────────────────────
def test_result_summary_is_string():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {"README.md": "# OK repo with enough text here\n"})
assert isinstance(result.summary(), str)
assert "PASSED" in result.summary() or "FAILED" in result.summary()
def test_result_repo_name():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {"README.md": "# OK\n"}, name="my-repo")
assert result.repo_name == "my-repo"
# ── invalid path ──────────────────────────────────────────────────────
def test_invalid_path_raises():
try:
Linter("/nonexistent/path/xyz")
assert False, "Should have raised FileNotFoundError"
except FileNotFoundError:
pass
# ── skip dirs ──────────────────────────────────────────────────────────
def test_git_dir_skipped():
with tempfile.TemporaryDirectory() as tmp:
repo = _make_repo(tmp, {
"README.md": "# R\n\nGood repo.",
"main.py": "x = 1\n",
})
# Create a .git/ dir with a bad file
git_dir = repo / ".git"
git_dir.mkdir()
(git_dir / "bad.py").write_text("HOST = '203.0.113.1'\n")
result = Linter(str(repo)).run()
git_errors = [e for e in result.errors if ".git" in e]
assert len(git_errors) == 0