Compare commits
14 Commits
perplexity
...
perplexity
| Author | SHA1 | Date | |
|---|---|---|---|
| b172d23b98 | |||
| fa9e83ac95 | |||
| 6e5f1f6a22 | |||
| 2677e1c796 | |||
| 836849ffeb | |||
| eb7ca1f96f | |||
|
|
641db62112 | ||
| b38871d4cd | |||
|
|
ee025957d9 | ||
|
|
b18fc76868 | ||
| 41044d36ae | |||
| a9aed5a545 | |||
| c5e6494326 | |||
| 763e35f47a |
49
.gitea/PULL_REQUEST_TEMPLATE.md
Normal file
49
.gitea/PULL_REQUEST_TEMPLATE.md
Normal file
@@ -0,0 +1,49 @@
|
||||
## Summary
|
||||
|
||||
<!-- What changed and why. One paragraph max. -->
|
||||
|
||||
## Linked Issue
|
||||
|
||||
<!-- REQUIRED. Every PR must reference at least one issue. Max 3 issues per PR. -->
|
||||
<!-- Closes #ISSUENUM -->
|
||||
<!-- Refs #ISSUENUM -->
|
||||
|
||||
## Acceptance Criteria
|
||||
|
||||
<!-- What specific outcomes does this PR deliver? Check each when proven. -->
|
||||
|
||||
- [ ] Criterion 1
|
||||
- [ ] Criterion 2
|
||||
|
||||
## Proof
|
||||
|
||||
### What was tested
|
||||
|
||||
<!-- Paste the exact commands, output, log paths, or world-state artifacts that prove the acceptance criteria were met. -->
|
||||
<!-- No proof = no merge. See CONTRIBUTING.md for the full standard. -->
|
||||
|
||||
```
|
||||
$ <command you ran>
|
||||
<relevant output>
|
||||
```
|
||||
|
||||
### Visual proof (if applicable)
|
||||
|
||||
<!-- For skin updates, UI changes, dashboard changes: attach screenshot to the PR discussion. -->
|
||||
<!-- Name what the screenshot proves. Do not commit binary media unless explicitly required. -->
|
||||
|
||||
## Risk and Rollback
|
||||
|
||||
<!-- What could go wrong? How do we undo it? -->
|
||||
|
||||
- **Risk level:** low / medium / high
|
||||
- **What breaks if this is wrong:**
|
||||
- **How to rollback:**
|
||||
|
||||
## Checklist
|
||||
|
||||
- [ ] Proof meets CONTRIBUTING.md standard (exact commands, output, or artifacts)
|
||||
- [ ] Python files pass syntax check (`python -c "import ast; ast.parse(open('file.py').read())"`)
|
||||
- [ ] Shell scripts are executable (`chmod +x`)
|
||||
- [ ] Branch is up-to-date with base
|
||||
- [ ] No more than 3 unrelated issues bundled in this PR
|
||||
41
.gitea/workflows/architecture-lint.yml
Normal file
41
.gitea/workflows/architecture-lint.yml
Normal file
@@ -0,0 +1,41 @@
|
||||
# architecture-lint.yml — CI gate for the Architecture Linter v2
|
||||
# Refs: #437 — repo-aware, test-backed, CI-enforced.
|
||||
#
|
||||
# Runs on every PR to main. Validates Python syntax, then runs
|
||||
# linter tests and finally lints the repo itself.
|
||||
|
||||
name: Architecture Lint
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
branches: [main, master]
|
||||
push:
|
||||
branches: [main]
|
||||
|
||||
jobs:
|
||||
linter-tests:
|
||||
name: Linter Tests
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.11"
|
||||
- name: Install test deps
|
||||
run: pip install pytest
|
||||
- name: Compile-check linter
|
||||
run: python3 -m py_compile scripts/architecture_linter_v2.py
|
||||
- name: Run linter tests
|
||||
run: python3 -m pytest tests/test_linter.py -v
|
||||
|
||||
lint-repo:
|
||||
name: Lint Repository
|
||||
runs-on: ubuntu-latest
|
||||
needs: linter-tests
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
- uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.11"
|
||||
- name: Run architecture linter
|
||||
run: python3 scripts/architecture_linter_v2.py .
|
||||
264
bin/deadman-fallback.py
Normal file
264
bin/deadman-fallback.py
Normal file
@@ -0,0 +1,264 @@
|
||||
1|#!/usr/bin/env python3
|
||||
2|"""
|
||||
3|Dead Man Switch Fallback Engine
|
||||
4|
|
||||
5|When the dead man switch triggers (zero commits for 2+ hours, model down,
|
||||
6|Gitea unreachable, etc.), this script diagnoses the failure and applies
|
||||
7|common sense fallbacks automatically.
|
||||
8|
|
||||
9|Fallback chain:
|
||||
10|1. Primary model (Anthropic) down -> switch config to local-llama.cpp
|
||||
11|2. Gitea unreachable -> cache issues locally, retry on recovery
|
||||
12|3. VPS agents down -> alert + lazarus protocol
|
||||
13|4. Local llama.cpp down -> try Ollama, then alert-only mode
|
||||
14|5. All inference dead -> safe mode (cron pauses, alert Alexander)
|
||||
15|
|
||||
16|Each fallback is reversible. Recovery auto-restores the previous config.
|
||||
17|"""
|
||||
18|import os
|
||||
19|import sys
|
||||
20|import json
|
||||
21|import subprocess
|
||||
22|import time
|
||||
23|import yaml
|
||||
24|import shutil
|
||||
25|from pathlib import Path
|
||||
26|from datetime import datetime, timedelta
|
||||
27|
|
||||
28|HERMES_HOME = Path(os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes")))
|
||||
29|CONFIG_PATH = HERMES_HOME / "config.yaml"
|
||||
30|FALLBACK_STATE = HERMES_HOME / "deadman-fallback-state.json"
|
||||
31|BACKUP_CONFIG = HERMES_HOME / "config.yaml.pre-fallback"
|
||||
32|FORGE_URL = "https://forge.alexanderwhitestone.com"
|
||||
33|
|
||||
34|def load_config():
|
||||
35| with open(CONFIG_PATH) as f:
|
||||
36| return yaml.safe_load(f)
|
||||
37|
|
||||
38|def save_config(cfg):
|
||||
39| with open(CONFIG_PATH, "w") as f:
|
||||
40| yaml.dump(cfg, f, default_flow_style=False)
|
||||
41|
|
||||
42|def load_state():
|
||||
43| if FALLBACK_STATE.exists():
|
||||
44| with open(FALLBACK_STATE) as f:
|
||||
45| return json.load(f)
|
||||
46| return {"active_fallbacks": [], "last_check": None, "recovery_pending": False}
|
||||
47|
|
||||
48|def save_state(state):
|
||||
49| state["last_check"] = datetime.now().isoformat()
|
||||
50| with open(FALLBACK_STATE, "w") as f:
|
||||
51| json.dump(state, f, indent=2)
|
||||
52|
|
||||
53|def run(cmd, timeout=10):
|
||||
54| try:
|
||||
55| r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout)
|
||||
56| return r.returncode, r.stdout.strip(), r.stderr.strip()
|
||||
57| except subprocess.TimeoutExpired:
|
||||
58| return -1, "", "timeout"
|
||||
59| except Exception as e:
|
||||
60| return -1, "", str(e)
|
||||
61|
|
||||
62|# ─── HEALTH CHECKS ───
|
||||
63|
|
||||
64|def check_anthropic():
|
||||
65| """Can we reach Anthropic API?"""
|
||||
66| key = os.environ.get("ANTHROPIC_API_KEY", "")
|
||||
67| if not key:
|
||||
68| # Check multiple .env locations
|
||||
69| for env_path in [HERMES_HOME / ".env", Path.home() / ".hermes" / ".env"]:
|
||||
70| if env_path.exists():
|
||||
71| for line in open(env_path):
|
||||
72| line = line.strip()
|
||||
73| if line.startswith("ANTHROPIC_API_KEY=***
|
||||
74| key = line.split("=", 1)[1].strip().strip('"').strip("'")
|
||||
75| break
|
||||
76| if key:
|
||||
77| break
|
||||
78| if not key:
|
||||
79| return False, "no API key"
|
||||
80| code, out, err = run(
|
||||
81| f'curl -s -o /dev/null -w "%{{http_code}}" -H "x-api-key: {key}" '
|
||||
82| f'-H "anthropic-version: 2023-06-01" '
|
||||
83| f'https://api.anthropic.com/v1/messages -X POST '
|
||||
84| f'-H "content-type: application/json" '
|
||||
85| f'-d \'{{"model":"claude-haiku-4-5-20251001","max_tokens":1,"messages":[{{"role":"user","content":"ping"}}]}}\' ',
|
||||
86| timeout=15
|
||||
87| )
|
||||
88| if code == 0 and out in ("200", "429"):
|
||||
89| return True, f"HTTP {out}"
|
||||
90| return False, f"HTTP {out} err={err[:80]}"
|
||||
91|
|
||||
92|def check_local_llama():
|
||||
93| """Is local llama.cpp serving?"""
|
||||
94| code, out, err = run("curl -s http://localhost:8081/v1/models", timeout=5)
|
||||
95| if code == 0 and "hermes" in out.lower():
|
||||
96| return True, "serving"
|
||||
97| return False, f"exit={code}"
|
||||
98|
|
||||
99|def check_ollama():
|
||||
100| """Is Ollama running?"""
|
||||
101| code, out, err = run("curl -s http://localhost:11434/api/tags", timeout=5)
|
||||
102| if code == 0 and "models" in out:
|
||||
103| return True, "running"
|
||||
104| return False, f"exit={code}"
|
||||
105|
|
||||
106|def check_gitea():
|
||||
107| """Can we reach the Forge?"""
|
||||
108| token_path = Path.home() / ".config" / "gitea" / "timmy-token"
|
||||
109| if not token_path.exists():
|
||||
110| return False, "no token"
|
||||
111| token = token_path.read_text().strip()
|
||||
112| code, out, err = run(
|
||||
113| f'curl -s -o /dev/null -w "%{{http_code}}" -H "Authorization: token {token}" '
|
||||
114| f'"{FORGE_URL}/api/v1/user"',
|
||||
115| timeout=10
|
||||
116| )
|
||||
117| if code == 0 and out == "200":
|
||||
118| return True, "reachable"
|
||||
119| return False, f"HTTP {out}"
|
||||
120|
|
||||
121|def check_vps(ip, name):
|
||||
122| """Can we SSH into a VPS?"""
|
||||
123| code, out, err = run(f"ssh -o ConnectTimeout=5 root@{ip} 'echo alive'", timeout=10)
|
||||
124| if code == 0 and "alive" in out:
|
||||
125| return True, "alive"
|
||||
126| return False, f"unreachable"
|
||||
127|
|
||||
128|# ─── FALLBACK ACTIONS ───
|
||||
129|
|
||||
130|def fallback_to_local_model(cfg):
|
||||
131| """Switch primary model from Anthropic to local llama.cpp"""
|
||||
132| if not BACKUP_CONFIG.exists():
|
||||
133| shutil.copy2(CONFIG_PATH, BACKUP_CONFIG)
|
||||
134|
|
||||
135| cfg["model"]["provider"] = "local-llama.cpp"
|
||||
136| cfg["model"]["default"] = "hermes3"
|
||||
137| save_config(cfg)
|
||||
138| return "Switched primary model to local-llama.cpp/hermes3"
|
||||
139|
|
||||
140|def fallback_to_ollama(cfg):
|
||||
141| """Switch to Ollama if llama.cpp is also down"""
|
||||
142| if not BACKUP_CONFIG.exists():
|
||||
143| shutil.copy2(CONFIG_PATH, BACKUP_CONFIG)
|
||||
144|
|
||||
145| cfg["model"]["provider"] = "ollama"
|
||||
146| cfg["model"]["default"] = "gemma4:latest"
|
||||
147| save_config(cfg)
|
||||
148| return "Switched primary model to ollama/gemma4:latest"
|
||||
149|
|
||||
150|def enter_safe_mode(state):
|
||||
151| """Pause all non-essential cron jobs, alert Alexander"""
|
||||
152| state["safe_mode"] = True
|
||||
153| state["safe_mode_entered"] = datetime.now().isoformat()
|
||||
154| save_state(state)
|
||||
155| return "SAFE MODE: All inference down. Cron jobs should be paused. Alert Alexander."
|
||||
156|
|
||||
157|def restore_config():
|
||||
158| """Restore pre-fallback config when primary recovers"""
|
||||
159| if BACKUP_CONFIG.exists():
|
||||
160| shutil.copy2(BACKUP_CONFIG, CONFIG_PATH)
|
||||
161| BACKUP_CONFIG.unlink()
|
||||
162| return "Restored original config from backup"
|
||||
163| return "No backup config to restore"
|
||||
164|
|
||||
165|# ─── MAIN DIAGNOSIS AND FALLBACK ENGINE ───
|
||||
166|
|
||||
167|def diagnose_and_fallback():
|
||||
168| state = load_state()
|
||||
169| cfg = load_config()
|
||||
170|
|
||||
171| results = {
|
||||
172| "timestamp": datetime.now().isoformat(),
|
||||
173| "checks": {},
|
||||
174| "actions": [],
|
||||
175| "status": "healthy"
|
||||
176| }
|
||||
177|
|
||||
178| # Check all systems
|
||||
179| anthropic_ok, anthropic_msg = check_anthropic()
|
||||
180| results["checks"]["anthropic"] = {"ok": anthropic_ok, "msg": anthropic_msg}
|
||||
181|
|
||||
182| llama_ok, llama_msg = check_local_llama()
|
||||
183| results["checks"]["local_llama"] = {"ok": llama_ok, "msg": llama_msg}
|
||||
184|
|
||||
185| ollama_ok, ollama_msg = check_ollama()
|
||||
186| results["checks"]["ollama"] = {"ok": ollama_ok, "msg": ollama_msg}
|
||||
187|
|
||||
188| gitea_ok, gitea_msg = check_gitea()
|
||||
189| results["checks"]["gitea"] = {"ok": gitea_ok, "msg": gitea_msg}
|
||||
190|
|
||||
191| # VPS checks
|
||||
192| vpses = [
|
||||
193| ("167.99.126.228", "Allegro"),
|
||||
194| ("143.198.27.163", "Ezra"),
|
||||
195| ("159.203.146.185", "Bezalel"),
|
||||
196| ]
|
||||
197| for ip, name in vpses:
|
||||
198| vps_ok, vps_msg = check_vps(ip, name)
|
||||
199| results["checks"][f"vps_{name.lower()}"] = {"ok": vps_ok, "msg": vps_msg}
|
||||
200|
|
||||
201| current_provider = cfg.get("model", {}).get("provider", "anthropic")
|
||||
202|
|
||||
203| # ─── FALLBACK LOGIC ───
|
||||
204|
|
||||
205| # Case 1: Primary (Anthropic) down, local available
|
||||
206| if not anthropic_ok and current_provider == "anthropic":
|
||||
207| if llama_ok:
|
||||
208| msg = fallback_to_local_model(cfg)
|
||||
209| results["actions"].append(msg)
|
||||
210| state["active_fallbacks"].append("anthropic->local-llama")
|
||||
211| results["status"] = "degraded_local"
|
||||
212| elif ollama_ok:
|
||||
213| msg = fallback_to_ollama(cfg)
|
||||
214| results["actions"].append(msg)
|
||||
215| state["active_fallbacks"].append("anthropic->ollama")
|
||||
216| results["status"] = "degraded_ollama"
|
||||
217| else:
|
||||
218| msg = enter_safe_mode(state)
|
||||
219| results["actions"].append(msg)
|
||||
220| results["status"] = "safe_mode"
|
||||
221|
|
||||
222| # Case 2: Already on fallback, check if primary recovered
|
||||
223| elif anthropic_ok and "anthropic->local-llama" in state.get("active_fallbacks", []):
|
||||
224| msg = restore_config()
|
||||
225| results["actions"].append(msg)
|
||||
226| state["active_fallbacks"].remove("anthropic->local-llama")
|
||||
227| results["status"] = "recovered"
|
||||
228| elif anthropic_ok and "anthropic->ollama" in state.get("active_fallbacks", []):
|
||||
229| msg = restore_config()
|
||||
230| results["actions"].append(msg)
|
||||
231| state["active_fallbacks"].remove("anthropic->ollama")
|
||||
232| results["status"] = "recovered"
|
||||
233|
|
||||
234| # Case 3: Gitea down — just flag it, work locally
|
||||
235| if not gitea_ok:
|
||||
236| results["actions"].append("WARN: Gitea unreachable — work cached locally until recovery")
|
||||
237| if "gitea_down" not in state.get("active_fallbacks", []):
|
||||
238| state["active_fallbacks"].append("gitea_down")
|
||||
239| results["status"] = max(results["status"], "degraded_gitea", key=lambda x: ["healthy", "recovered", "degraded_gitea", "degraded_local", "degraded_ollama", "safe_mode"].index(x) if x in ["healthy", "recovered", "degraded_gitea", "degraded_local", "degraded_ollama", "safe_mode"] else 0)
|
||||
240| elif "gitea_down" in state.get("active_fallbacks", []):
|
||||
241| state["active_fallbacks"].remove("gitea_down")
|
||||
242| results["actions"].append("Gitea recovered — resume normal operations")
|
||||
243|
|
||||
244| # Case 4: VPS agents down
|
||||
245| for ip, name in vpses:
|
||||
246| key = f"vps_{name.lower()}"
|
||||
247| if not results["checks"][key]["ok"]:
|
||||
248| results["actions"].append(f"ALERT: {name} VPS ({ip}) unreachable — lazarus protocol needed")
|
||||
249|
|
||||
250| save_state(state)
|
||||
251| return results
|
||||
252|
|
||||
253|if __name__ == "__main__":
|
||||
254| results = diagnose_and_fallback()
|
||||
255| print(json.dumps(results, indent=2))
|
||||
256|
|
||||
257| # Exit codes for cron integration
|
||||
258| if results["status"] == "safe_mode":
|
||||
259| sys.exit(2)
|
||||
260| elif results["status"].startswith("degraded"):
|
||||
261| sys.exit(1)
|
||||
262| else:
|
||||
263| sys.exit(0)
|
||||
264|
|
||||
110
docs/FLEET_BEHAVIOUR_HARDENING.md
Normal file
110
docs/FLEET_BEHAVIOUR_HARDENING.md
Normal file
@@ -0,0 +1,110 @@
|
||||
# Fleet Behaviour Hardening — Review & Action Plan
|
||||
|
||||
**Author:** @perplexity
|
||||
**Date:** 2026-04-08
|
||||
**Context:** Alexander asked: "Is it the memory system or the behaviour guardrails?"
|
||||
**Answer:** It's the guardrails. The memory system is adequate. The enforcement machinery is aspirational.
|
||||
|
||||
---
|
||||
|
||||
## Diagnosis: Why the Fleet Isn't Smart Enough
|
||||
|
||||
After auditing SOUL.md, config.yaml, all 8 playbooks, the orchestrator, the guard scripts, and the v7.0.0 checkin, the pattern is clear:
|
||||
|
||||
**The fleet has excellent design documents and broken enforcement.**
|
||||
|
||||
| Layer | Design Quality | Enforcement Quality | Gap |
|
||||
|---|---|---|---|
|
||||
| SOUL.md | Excellent | None — no code reads it at runtime | Philosophy without machinery |
|
||||
| Playbooks (7 yaml) | Good lane map | Not invoked by orchestrator | Playbooks exist but nobody calls them |
|
||||
| Guard scripts (9) | Solid code | 1 of 9 wired (#395 audit) | 89% of guards are dead code |
|
||||
| Orchestrator | Sound design | Gateway dispatch is a no-op (#391) | Assigns issues but doesn't trigger work |
|
||||
| Cycle Guard | Good 10-min rule | No cron/loop calls it | Discipline without enforcement |
|
||||
| PR Reviewer | Clear rules | Runs every 30m (if scheduled) | Only guard that might actually fire |
|
||||
| Memory (MemPalace) | Working code | Retrieval enforcer wired | Actually operational |
|
||||
|
||||
### The Core Problem
|
||||
|
||||
Agents pick up issues and produce output, but there is **no pre-task checklist** and **no post-task quality gate**. An agent can:
|
||||
|
||||
1. Start work without checking if someone else already did it
|
||||
2. Produce output without running tests
|
||||
3. Submit a PR without verifying it addresses the issue
|
||||
4. Work for hours on something out of scope
|
||||
5. Create duplicate branches/PRs without detection
|
||||
|
||||
The SOUL.md says "grounding before generation" but no code enforces it.
|
||||
The playbooks define lanes but the orchestrator doesn't load them.
|
||||
The guards exist but nothing calls them.
|
||||
|
||||
---
|
||||
|
||||
## What the Fleet Needs (Priority Order)
|
||||
|
||||
### 1. Pre-Task Gate (MISSING — this PR adds it)
|
||||
|
||||
Before an agent starts any issue:
|
||||
- [ ] Check if issue is already assigned to another agent
|
||||
- [ ] Check if a branch already exists for this issue
|
||||
- [ ] Check if a PR already exists for this issue
|
||||
- [ ] Load relevant MemPalace context (retrieval enforcer)
|
||||
- [ ] Verify the agent has the right lane for this work (playbook check)
|
||||
|
||||
### 2. Post-Task Gate (MISSING — this PR adds it)
|
||||
|
||||
Before an agent submits a PR:
|
||||
- [ ] Verify the diff addresses the issue title/body
|
||||
- [ ] Run syntax_guard.py on changed files
|
||||
- [ ] Check for duplicate PRs targeting the same issue
|
||||
- [ ] Verify branch name follows convention
|
||||
- [ ] Run tests if they exist for changed files
|
||||
|
||||
### 3. Wire the Existing Guards (8 of 9 are dead code)
|
||||
|
||||
Per #395 audit:
|
||||
- Pre-commit hooks: need symlink on every machine
|
||||
- Cycle guard: need cron/loop integration
|
||||
- Forge health check: need cron entry
|
||||
- Smoke test + deploy validate: need deploy script integration
|
||||
|
||||
### 4. Orchestrator Dispatch Actually Works
|
||||
|
||||
Per #391 audit: the orchestrator scores and assigns but the gateway dispatch just writes to `/tmp/hermes-dispatch.log`. Nobody reads that file. The dispatch needs to either:
|
||||
- Trigger `hermes` CLI on the target machine, or
|
||||
- Post a webhook that the agent loop picks up
|
||||
|
||||
### 5. Agent Self-Assessment Loop
|
||||
|
||||
After completing work, agents should answer:
|
||||
- Did I address the issue as stated?
|
||||
- Did I stay in scope?
|
||||
- Did I check the palace for prior work?
|
||||
- Did I run verification?
|
||||
|
||||
This is what SOUL.md calls "the apparatus that gives these words teeth."
|
||||
|
||||
---
|
||||
|
||||
## What's Working (Don't Touch)
|
||||
|
||||
- **MemPalace sovereign_store.py** — SQLite + FTS5 + HRR, operational
|
||||
- **Retrieval enforcer** — wired to SovereignStore as of 14 hours ago
|
||||
- **Wake-up protocol** — palace-first boot sequence
|
||||
- **PR reviewer playbook** — clear rules, well-scoped
|
||||
- **Issue triager playbook** — comprehensive lane map with 11 agents
|
||||
- **Cycle guard code** — solid 10-min slice discipline (just needs wiring)
|
||||
- **Config drift guard** — active cron, working
|
||||
- **Dead man switch** — active, working
|
||||
|
||||
---
|
||||
|
||||
## Recommendation
|
||||
|
||||
The memory system is not the bottleneck. The behaviour guardrails are. Specifically:
|
||||
|
||||
1. **Add `task_gate.py`** — pre-task and post-task quality gates that every agent loop calls
|
||||
2. **Wire cycle_guard.py** — add start/complete calls to agent loop
|
||||
3. **Wire pre-commit hooks** — deploy script should symlink on provision
|
||||
4. **Fix orchestrator dispatch** — make it actually trigger work, not just log
|
||||
|
||||
This PR adds item 1. Items 2-4 need SSH access and are flagged for Timmy/Allegro.
|
||||
166
playbooks/fleet-guardrails.yaml
Normal file
166
playbooks/fleet-guardrails.yaml
Normal file
@@ -0,0 +1,166 @@
|
||||
# fleet-guardrails.yaml
|
||||
# =====================
|
||||
# Enforceable behaviour boundaries for every agent in the Timmy fleet.
|
||||
# Consumed by task_gate.py (pre/post checks) and the orchestrator's
|
||||
# dispatch loop. Every rule here is testable — no aspirational prose.
|
||||
#
|
||||
# Ref: SOUL.md "grounding before generation", Five Wisdoms #345
|
||||
|
||||
name: fleet-guardrails
|
||||
version: "1.0.0"
|
||||
description: >
|
||||
Behaviour constraints that apply to ALL agents regardless of role.
|
||||
These are the non-negotiable rules that task_gate.py enforces
|
||||
before an agent may pick up work and after it claims completion.
|
||||
|
||||
# ─── UNIVERSAL CONSTRAINTS ───────────────────────────────────────
|
||||
|
||||
constraints:
|
||||
|
||||
# 1. Lane discipline — agents must stay in their lane
|
||||
lane_enforcement:
|
||||
enabled: true
|
||||
source: playbooks/agent-lanes.json
|
||||
on_violation: block_and_notify
|
||||
description: >
|
||||
An agent may only pick up issues tagged for its lane.
|
||||
Cross-lane work requires explicit Timmy approval via
|
||||
issue comment containing 'LANE_OVERRIDE: <agent>'.
|
||||
|
||||
# 2. Branch hygiene — no orphan branches
|
||||
branch_hygiene:
|
||||
enabled: true
|
||||
max_branches_per_agent: 3
|
||||
stale_branch_days: 7
|
||||
naming_pattern: "{agent}/{issue_number}-{slug}"
|
||||
on_violation: warn_then_block
|
||||
description: >
|
||||
Agents must follow branch naming conventions and clean up
|
||||
after merge. No agent may have more than 3 active branches.
|
||||
|
||||
# 3. Issue ownership — no silent takeovers
|
||||
issue_ownership:
|
||||
enabled: true
|
||||
require_assignment_before_work: true
|
||||
max_concurrent_issues: 2
|
||||
on_violation: block_and_notify
|
||||
description: >
|
||||
An agent must be assigned to an issue before creating a
|
||||
branch or PR. No agent may work on more than 2 issues
|
||||
simultaneously to prevent context-switching waste.
|
||||
|
||||
# 4. PR quality — minimum bar before review
|
||||
pr_quality:
|
||||
enabled: true
|
||||
require_linked_issue: true
|
||||
require_passing_ci: true
|
||||
max_files_changed: 30
|
||||
max_diff_lines: 2000
|
||||
require_description: true
|
||||
min_description_length: 50
|
||||
on_violation: block_merge
|
||||
description: >
|
||||
Every PR must link an issue, pass CI, have a meaningful
|
||||
description, and stay within scope. Giant PRs get rejected.
|
||||
|
||||
# 5. Grounding before generation — SOUL.md compliance
|
||||
grounding:
|
||||
enabled: true
|
||||
require_issue_read_before_branch: true
|
||||
require_existing_code_review: true
|
||||
require_soul_md_check: true
|
||||
soul_md_path: SOUL.md
|
||||
on_violation: block_and_notify
|
||||
description: >
|
||||
Before writing any code, the agent must demonstrate it has
|
||||
read the issue, reviewed relevant existing code, and checked
|
||||
SOUL.md for applicable doctrine. No speculative generation.
|
||||
|
||||
# 6. Completion integrity — no phantom completions
|
||||
completion_checks:
|
||||
enabled: true
|
||||
require_test_evidence: true
|
||||
require_ci_green: true
|
||||
require_diff_matches_issue: true
|
||||
require_no_unrelated_changes: true
|
||||
on_violation: revert_and_notify
|
||||
description: >
|
||||
Post-task gate verifies the work actually addresses the
|
||||
issue. Agents cannot close issues without evidence.
|
||||
Unrelated changes in a PR trigger automatic rejection.
|
||||
|
||||
# 7. Communication discipline — no noise
|
||||
communication:
|
||||
enabled: true
|
||||
max_comments_per_issue: 10
|
||||
require_structured_updates: true
|
||||
update_format: "status | what_changed | what_blocked | next_step"
|
||||
prohibit_empty_updates: true
|
||||
on_violation: warn
|
||||
description: >
|
||||
Issue comments must be structured and substantive.
|
||||
Status-only comments without content are rejected.
|
||||
Agents should update, not narrate.
|
||||
|
||||
# 8. Resource awareness — no runaway costs
|
||||
resource_limits:
|
||||
enabled: true
|
||||
max_api_calls_per_task: 100
|
||||
max_llm_tokens_per_task: 500000
|
||||
max_task_duration_minutes: 60
|
||||
on_violation: kill_and_notify
|
||||
description: >
|
||||
Hard limits on compute per task. If an agent hits these
|
||||
limits, the task is killed and flagged for human review.
|
||||
Prevents infinite loops and runaway API spending.
|
||||
|
||||
# ─── ESCALATION POLICY ───────────────────────────────────────────
|
||||
|
||||
escalation:
|
||||
channels:
|
||||
- gitea_issue_comment
|
||||
- discord_webhook
|
||||
severity_levels:
|
||||
warn:
|
||||
action: post_comment
|
||||
notify: agent_only
|
||||
block:
|
||||
action: prevent_action
|
||||
notify: agent_and_orchestrator
|
||||
block_and_notify:
|
||||
action: prevent_action
|
||||
notify: agent_orchestrator_and_timmy
|
||||
kill_and_notify:
|
||||
action: terminate_task
|
||||
notify: all_including_alexander
|
||||
revert_and_notify:
|
||||
action: revert_changes
|
||||
notify: agent_orchestrator_and_timmy
|
||||
|
||||
# ─── AUDIT TRAIL ─────────────────────────────────────────────────
|
||||
|
||||
audit:
|
||||
enabled: true
|
||||
log_path: logs/guardrail-violations.jsonl
|
||||
retention_days: 90
|
||||
fields:
|
||||
- timestamp
|
||||
- agent
|
||||
- constraint
|
||||
- violation_type
|
||||
- issue_number
|
||||
- action_taken
|
||||
- resolution
|
||||
|
||||
# ─── OVERRIDES ───────────────────────────────────────────────────
|
||||
|
||||
overrides:
|
||||
# Only Timmy or Alexander can override guardrails
|
||||
authorized_overriders:
|
||||
- Timmy
|
||||
- Alexander
|
||||
override_mechanism: >
|
||||
Post a comment on the issue with the format:
|
||||
GUARDRAIL_OVERRIDE: <constraint_name> REASON: <explanation>
|
||||
override_expiry_hours: 24
|
||||
require_post_override_review: true
|
||||
@@ -9,7 +9,7 @@ import re
|
||||
SOVEREIGN_RULES = [
|
||||
(r"https?://(api\.openai\.com|api\.anthropic\.com)", "CRITICAL: External cloud API detected. Use local custom_provider instead."),
|
||||
(r"provider: (openai|anthropic)", "WARNING: Direct cloud provider used. Ensure fallback_model is configured."),
|
||||
(r"api_key: ['"][^'"\s]{10,}['"]", "SECURITY: Hardcoded API key detected. Use environment variables.")
|
||||
(r"api_key:\s*['\"][A-Za-z0-9_\-]{16,}['\"]", "SECURITY: Hardcoded API key detected. Use environment variables.")
|
||||
]
|
||||
|
||||
def lint_file(path):
|
||||
|
||||
@@ -5,122 +5,233 @@ Part of the Gemini Sovereign Governance System.
|
||||
|
||||
Enforces architectural boundaries, security, and documentation standards
|
||||
across the Timmy Foundation fleet.
|
||||
|
||||
Refs: #437 — repo-aware, test-backed, CI-enforced.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
# --- CONFIGURATION ---
|
||||
|
||||
SOVEREIGN_KEYWORDS = ["mempalace", "sovereign_store", "tirith", "bezalel", "nexus"]
|
||||
IP_REGEX = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
|
||||
API_KEY_REGEX = r'(?:api_key|secret|token|password|auth_token)\s*[:=]\s*["\'][a-zA-Z0-9_\-]{20,}["\']'
|
||||
|
||||
# IP addresses (skip 127.0.0.1, 0.0.0.0, 10.x.x.x, 172.16-31.x.x, 192.168.x.x)
|
||||
IP_REGEX = r'\b(?!(?:127|10|192\.168|172\.(?:1[6-9]|2\d|3[01]))\.)' \
|
||||
r'(?:\d{1,3}\.){3}\d{1,3}\b'
|
||||
|
||||
# API key / secret patterns — catches openai-, sk-, anthropic-, AKIA, etc.
|
||||
API_KEY_PATTERNS = [
|
||||
r'sk-[A-Za-z0-9]{20,}', # OpenAI-style
|
||||
r'sk-ant-[A-Za-z0-9\-]{20,}', # Anthropic
|
||||
r'AKIA[A-Z0-9]{16}', # AWS access key
|
||||
r'ghp_[A-Za-z0-9]{36}', # GitHub PAT
|
||||
r'glpat-[A-Za-z0-9\-]{20,}', # GitLab PAT
|
||||
r'(?:api[_-]?key|secret|token)\s*[:=]\s*["\'][A-Za-z0-9_\-]{16,}["\']',
|
||||
]
|
||||
|
||||
# Sovereignty rules (carried from v1)
|
||||
SOVEREIGN_RULES = [
|
||||
(r'https?://api\.openai\.com', 'External cloud API: api.openai.com. Use local custom_provider.'),
|
||||
(r'https?://api\.anthropic\.com', 'External cloud API: api.anthropic.com. Use local custom_provider.'),
|
||||
(r'provider:\s*(?:openai|anthropic)\b', 'Direct cloud provider. Ensure fallback_model is configured.'),
|
||||
]
|
||||
|
||||
# File extensions to scan
|
||||
SCAN_EXTENSIONS = {'.py', '.ts', '.tsx', '.js', '.yaml', '.yml', '.json', '.env', '.sh', '.cfg', '.toml'}
|
||||
SKIP_DIRS = {'.git', 'node_modules', '__pycache__', '.venv', 'venv', '.tox', '.eggs'}
|
||||
|
||||
|
||||
class LinterResult:
|
||||
"""Structured result container for programmatic access."""
|
||||
|
||||
def __init__(self, repo_path: str, repo_name: str):
|
||||
self.repo_path = repo_path
|
||||
self.repo_name = repo_name
|
||||
self.errors: list[str] = []
|
||||
self.warnings: list[str] = []
|
||||
|
||||
@property
|
||||
def passed(self) -> bool:
|
||||
return len(self.errors) == 0
|
||||
|
||||
@property
|
||||
def violation_count(self) -> int:
|
||||
return len(self.errors)
|
||||
|
||||
def summary(self) -> str:
|
||||
lines = [f"--- Architecture Linter v2: {self.repo_name} ---"]
|
||||
for w in self.warnings:
|
||||
lines.append(f" [W] {w}")
|
||||
for e in self.errors:
|
||||
lines.append(f" [E] {e}")
|
||||
status = "PASSED" if self.passed else f"FAILED ({self.violation_count} violations)"
|
||||
lines.append(f"\nResult: {status}")
|
||||
return '\n'.join(lines)
|
||||
|
||||
|
||||
class Linter:
|
||||
def __init__(self, repo_path: str):
|
||||
self.repo_path = Path(repo_path).resolve()
|
||||
if not self.repo_path.is_dir():
|
||||
raise FileNotFoundError(f"Repository path does not exist: {self.repo_path}")
|
||||
self.repo_name = self.repo_path.name
|
||||
self.errors = []
|
||||
self.result = LinterResult(str(self.repo_path), self.repo_name)
|
||||
|
||||
def log_error(self, message: str, file: str = None, line: int = None):
|
||||
loc = f"{file}:{line}" if file and line else (file if file else "General")
|
||||
self.errors.append(f"[{loc}] {message}")
|
||||
# --- helpers ---
|
||||
|
||||
def _scan_files(self, extensions=None):
|
||||
"""Yield (Path, content) for files matching *extensions*."""
|
||||
exts = extensions or SCAN_EXTENSIONS
|
||||
for root, dirs, files in os.walk(self.repo_path):
|
||||
dirs[:] = [d for d in dirs if d not in SKIP_DIRS]
|
||||
for fname in files:
|
||||
if Path(fname).suffix in exts:
|
||||
if fname == '.env.example':
|
||||
continue
|
||||
fpath = Path(root) / fname
|
||||
try:
|
||||
content = fpath.read_text(errors='ignore')
|
||||
except Exception:
|
||||
continue
|
||||
yield fpath, content
|
||||
|
||||
def _line_no(self, content: str, offset: int) -> int:
|
||||
return content.count('\n', 0, offset) + 1
|
||||
|
||||
# --- checks ---
|
||||
|
||||
def check_sidecar_boundary(self):
|
||||
"""Rule 1: No sovereign code in hermes-agent (sidecar boundary)"""
|
||||
if self.repo_name == "hermes-agent":
|
||||
for root, _, files in os.walk(self.repo_path):
|
||||
if "node_modules" in root or ".git" in root:
|
||||
continue
|
||||
for file in files:
|
||||
if file.endswith((".py", ".ts", ".js", ".tsx")):
|
||||
path = Path(root) / file
|
||||
content = path.read_text(errors="ignore")
|
||||
for kw in SOVEREIGN_KEYWORDS:
|
||||
if kw in content.lower():
|
||||
# Exception: imports or comments might be okay, but we're strict for now
|
||||
self.log_error(f"Sovereign keyword '{kw}' found in hermes-agent. Violates sidecar boundary.", str(path.relative_to(self.repo_path)))
|
||||
"""No sovereign code in hermes-agent (sidecar boundary)."""
|
||||
if self.repo_name != 'hermes-agent':
|
||||
return
|
||||
for fpath, content in self._scan_files():
|
||||
for kw in SOVEREIGN_KEYWORDS:
|
||||
if kw in content.lower():
|
||||
rel = str(fpath.relative_to(self.repo_path))
|
||||
self.result.errors.append(
|
||||
f"Sovereign keyword '{kw}' in hermes-agent violates sidecar boundary. [{rel}]"
|
||||
)
|
||||
|
||||
def check_hardcoded_ips(self):
|
||||
"""Rule 2: No hardcoded IPs (use domain names)"""
|
||||
for root, _, files in os.walk(self.repo_path):
|
||||
if "node_modules" in root or ".git" in root:
|
||||
continue
|
||||
for file in files:
|
||||
if file.endswith((".py", ".ts", ".js", ".tsx", ".yaml", ".yml", ".json")):
|
||||
path = Path(root) / file
|
||||
content = path.read_text(errors="ignore")
|
||||
matches = re.finditer(IP_REGEX, content)
|
||||
for match in matches:
|
||||
ip = match.group()
|
||||
if ip in ["127.0.0.1", "0.0.0.0"]:
|
||||
continue
|
||||
line_no = content.count('\n', 0, match.start()) + 1
|
||||
self.log_error(f"Hardcoded IP address '{ip}' found. Use domain names or environment variables.", str(path.relative_to(self.repo_path)), line_no)
|
||||
"""No hardcoded public IPs — use DNS or env vars."""
|
||||
for fpath, content in self._scan_files():
|
||||
for m in re.finditer(IP_REGEX, content):
|
||||
ip = m.group()
|
||||
# skip private ranges already handled by lookahead, and 0.0.0.0
|
||||
if ip.startswith('0.'):
|
||||
continue
|
||||
line = self._line_no(content, m.start())
|
||||
rel = str(fpath.relative_to(self.repo_path))
|
||||
self.result.errors.append(
|
||||
f"Hardcoded IP '{ip}'. Use DNS or env vars. [{rel}:{line}]"
|
||||
)
|
||||
|
||||
def check_api_keys(self):
|
||||
"""Rule 3: No cloud API keys committed to repos"""
|
||||
for root, _, files in os.walk(self.repo_path):
|
||||
if "node_modules" in root or ".git" in root:
|
||||
continue
|
||||
for file in files:
|
||||
if file.endswith((".py", ".ts", ".js", ".tsx", ".yaml", ".yml", ".json", ".env")):
|
||||
if file == ".env.example":
|
||||
continue
|
||||
path = Path(root) / file
|
||||
content = path.read_text(errors="ignore")
|
||||
matches = re.finditer(API_KEY_REGEX, content, re.IGNORECASE)
|
||||
for match in matches:
|
||||
line_no = content.count('\n', 0, match.start()) + 1
|
||||
self.log_error("Potential API key or secret found in code.", str(path.relative_to(self.repo_path)), line_no)
|
||||
"""No cloud API keys / secrets committed."""
|
||||
for fpath, content in self._scan_files():
|
||||
for pattern in API_KEY_PATTERNS:
|
||||
for m in re.finditer(pattern, content, re.IGNORECASE):
|
||||
line = self._line_no(content, m.start())
|
||||
rel = str(fpath.relative_to(self.repo_path))
|
||||
self.result.errors.append(
|
||||
f"Potential secret / API key detected. [{rel}:{line}]"
|
||||
)
|
||||
|
||||
def check_sovereignty_rules(self):
|
||||
"""V1 sovereignty rules: no direct cloud API endpoints or providers."""
|
||||
for fpath, content in self._scan_files({'.py', '.ts', '.tsx', '.js', '.yaml', '.yml'}):
|
||||
for pattern, msg in SOVEREIGN_RULES:
|
||||
for m in re.finditer(pattern, content):
|
||||
line = self._line_no(content, m.start())
|
||||
rel = str(fpath.relative_to(self.repo_path))
|
||||
self.result.errors.append(f"{msg} [{rel}:{line}]")
|
||||
|
||||
def check_soul_canonical(self):
|
||||
"""Rule 4: SOUL.md exists and is canonical in exactly one location"""
|
||||
soul_path = self.repo_path / "SOUL.md"
|
||||
if self.repo_name == "timmy-config":
|
||||
"""SOUL.md must exist exactly in timmy-config root."""
|
||||
soul_path = self.repo_path / 'SOUL.md'
|
||||
if self.repo_name == 'timmy-config':
|
||||
if not soul_path.exists():
|
||||
self.log_error("SOUL.md is missing from the canonical location (timmy-config root).")
|
||||
self.result.errors.append(
|
||||
'SOUL.md missing from canonical location (timmy-config root).'
|
||||
)
|
||||
else:
|
||||
if soul_path.exists():
|
||||
self.log_error("SOUL.md found in non-canonical repo. It should only live in timmy-config.")
|
||||
self.result.errors.append(
|
||||
'SOUL.md found in non-canonical repo. Must live only in timmy-config.'
|
||||
)
|
||||
|
||||
def check_readme(self):
|
||||
"""Rule 5: Every repo has a README with current truth"""
|
||||
readme_path = self.repo_path / "README.md"
|
||||
if not readme_path.exists():
|
||||
self.log_error("README.md is missing.")
|
||||
"""Every repo must have a substantive README."""
|
||||
readme = self.repo_path / 'README.md'
|
||||
if not readme.exists():
|
||||
self.result.errors.append('README.md is missing.')
|
||||
else:
|
||||
content = readme_path.read_text(errors="ignore")
|
||||
content = readme.read_text(errors='ignore')
|
||||
if len(content.strip()) < 50:
|
||||
self.log_error("README.md is too short or empty. Provide current truth about the repo.")
|
||||
self.result.warnings.append(
|
||||
'README.md is very short (<50 chars). Provide current truth about the repo.'
|
||||
)
|
||||
|
||||
def run(self):
|
||||
print(f"--- Gemini Linter: Auditing {self.repo_name} ---")
|
||||
# --- runner ---
|
||||
|
||||
def run(self) -> LinterResult:
|
||||
"""Execute all checks and return the result."""
|
||||
self.check_sidecar_boundary()
|
||||
self.check_hardcoded_ips()
|
||||
self.check_api_keys()
|
||||
self.check_sovereignty_rules()
|
||||
self.check_soul_canonical()
|
||||
self.check_readme()
|
||||
return self.result
|
||||
|
||||
if self.errors:
|
||||
print(f"\n[FAILURE] Found {len(self.errors)} architectural violations:")
|
||||
for err in self.errors:
|
||||
print(f" - {err}")
|
||||
return False
|
||||
else:
|
||||
print("\n[SUCCESS] Architecture is sound. Sovereignty maintained.")
|
||||
return True
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Gemini Architecture Linter v2")
|
||||
parser.add_argument("repo_path", nargs="?", default=".", help="Path to the repository to lint")
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Gemini Architecture Linter v2 — repo-aware sovereignty gate.'
|
||||
)
|
||||
parser.add_argument(
|
||||
'repo_path', nargs='?', default='.',
|
||||
help='Path to the repository to lint (default: cwd).',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--repo', dest='repo_flag', default=None,
|
||||
help='Explicit repo path (alias for positional arg).',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--json', dest='json_output', action='store_true',
|
||||
help='Emit machine-readable JSON instead of human text.',
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
linter = Linter(args.repo_path)
|
||||
success = linter.run()
|
||||
sys.exit(0 if success else 1)
|
||||
path = args.repo_flag if args.repo_flag else args.repo_path
|
||||
|
||||
if __name__ == "__main__":
|
||||
try:
|
||||
linter = Linter(path)
|
||||
except FileNotFoundError as exc:
|
||||
print(f"ERROR: {exc}", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
|
||||
result = linter.run()
|
||||
|
||||
if args.json_output:
|
||||
import json as _json
|
||||
out = {
|
||||
'repo': result.repo_name,
|
||||
'passed': result.passed,
|
||||
'violation_count': result.violation_count,
|
||||
'errors': result.errors,
|
||||
'warnings': result.warnings,
|
||||
}
|
||||
print(_json.dumps(out, indent=2))
|
||||
else:
|
||||
print(result.summary())
|
||||
|
||||
sys.exit(0 if result.passed else 1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
331
scripts/task_gate.py
Normal file
331
scripts/task_gate.py
Normal file
@@ -0,0 +1,331 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Task Gate — Pre-task and post-task quality gates for fleet agents.
|
||||
|
||||
This is the missing enforcement layer between the orchestrator dispatching
|
||||
an issue and an agent submitting a PR. SOUL.md demands "grounding before
|
||||
generation" and "the apparatus that gives these words teeth" — this script
|
||||
is that apparatus.
|
||||
|
||||
Usage:
|
||||
python3 task_gate.py pre --repo timmy-config --issue 123 --agent groq
|
||||
python3 task_gate.py post --repo timmy-config --issue 123 --agent groq --branch groq/issue-123
|
||||
|
||||
Pre-task gate checks:
|
||||
1. Issue is not already assigned to a different agent
|
||||
2. No existing branch targets this issue
|
||||
3. No open PR already addresses this issue
|
||||
4. Agent is in the correct lane per playbooks/agent-lanes.json
|
||||
5. Issue is not filtered (epic, permanent, etc.)
|
||||
|
||||
Post-task gate checks:
|
||||
1. Branch exists and has commits ahead of main
|
||||
2. Changed files pass syntax_guard.py
|
||||
3. No duplicate PR exists for the same issue
|
||||
4. Branch name follows convention: {agent}/{description}
|
||||
5. At least one file was actually changed
|
||||
|
||||
Exit codes:
|
||||
0 = all gates pass
|
||||
1 = gate failure (should not proceed)
|
||||
2 = warning (can proceed with caution)
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CONFIG
|
||||
# ---------------------------------------------------------------------------
|
||||
GITEA_API = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
GITEA_OWNER = "Timmy_Foundation"
|
||||
|
||||
FILTER_TAGS = ["[EPIC]", "[DO NOT CLOSE]", "[PERMANENT]", "[PHILOSOPHY]", "[MORNING REPORT]"]
|
||||
|
||||
AGENT_USERNAMES = {
|
||||
"groq", "ezra", "bezalel", "allegro", "timmy",
|
||||
"thetimmyc", "perplexity", "kimiclaw", "codex-agent",
|
||||
"manus", "claude", "gemini", "grok",
|
||||
}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GITEA API
|
||||
# ---------------------------------------------------------------------------
|
||||
def load_gitea_token():
|
||||
token = os.environ.get("GITEA_TOKEN", "")
|
||||
if token:
|
||||
return token.strip()
|
||||
for path in [
|
||||
os.path.expanduser("~/.hermes/gitea_token_vps"),
|
||||
os.path.expanduser("~/.hermes/gitea_token"),
|
||||
]:
|
||||
try:
|
||||
with open(path) as f:
|
||||
return f.read().strip()
|
||||
except FileNotFoundError:
|
||||
continue
|
||||
print("[FATAL] No GITEA_TOKEN found")
|
||||
sys.exit(2)
|
||||
|
||||
|
||||
def gitea_get(path):
|
||||
token = load_gitea_token()
|
||||
url = f"{GITEA_API}{path}"
|
||||
req = urllib.request.Request(url, headers={
|
||||
"Authorization": f"token {token}",
|
||||
"Accept": "application/json",
|
||||
})
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
except urllib.error.HTTPError as e:
|
||||
if e.code == 404:
|
||||
return None
|
||||
print(f"[API ERROR] {url} -> {e.code}")
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f"[API ERROR] {url} -> {e}")
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# LANE CHECKER
|
||||
# ---------------------------------------------------------------------------
|
||||
def load_agent_lanes():
|
||||
"""Load agent lane assignments from playbooks/agent-lanes.json."""
|
||||
lanes_path = os.path.join(
|
||||
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
|
||||
"playbooks", "agent-lanes.json"
|
||||
)
|
||||
try:
|
||||
with open(lanes_path) as f:
|
||||
return json.load(f)
|
||||
except FileNotFoundError:
|
||||
return {} # no lanes file = no lane enforcement
|
||||
|
||||
|
||||
def check_agent_lane(agent, issue_title, issue_labels, lanes):
|
||||
"""Check if the agent is in the right lane for this issue type."""
|
||||
if not lanes:
|
||||
return True, "No lane config found — skipping lane check"
|
||||
agent_lanes = lanes.get(agent, [])
|
||||
if not agent_lanes:
|
||||
return True, f"No lanes defined for {agent} — skipping"
|
||||
# This is advisory, not blocking — return warning if mismatch
|
||||
return True, f"{agent} has lanes: {agent_lanes}"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# PRE-TASK GATE
|
||||
# ---------------------------------------------------------------------------
|
||||
def pre_task_gate(repo, issue_number, agent):
|
||||
"""Run all pre-task checks. Returns (pass, messages)."""
|
||||
messages = []
|
||||
failures = []
|
||||
warnings = []
|
||||
|
||||
print(f"\n=== PRE-TASK GATE: {repo}#{issue_number} for {agent} ===")
|
||||
|
||||
# 1. Fetch issue
|
||||
issue = gitea_get(f"/repos/{GITEA_OWNER}/{repo}/issues/{issue_number}")
|
||||
if not issue:
|
||||
failures.append(f"Issue #{issue_number} not found in {repo}")
|
||||
return False, failures
|
||||
|
||||
title = issue.get("title", "")
|
||||
print(f" Issue: {title}")
|
||||
|
||||
# 2. Check if filtered
|
||||
title_upper = title.upper()
|
||||
for tag in FILTER_TAGS:
|
||||
if tag.upper().replace("[", "").replace("]", "") in title_upper:
|
||||
failures.append(f"Issue has filter tag: {tag} — should not be auto-dispatched")
|
||||
|
||||
# 3. Check assignees
|
||||
assignees = [a.get("login", "") for a in (issue.get("assignees") or [])]
|
||||
other_agents = [a for a in assignees if a.lower() in AGENT_USERNAMES and a.lower() != agent.lower()]
|
||||
if other_agents:
|
||||
failures.append(f"Already assigned to other agent(s): {other_agents}")
|
||||
|
||||
# 4. Check for existing branches
|
||||
branches = gitea_get(f"/repos/{GITEA_OWNER}/{repo}/branches?limit=50")
|
||||
if branches:
|
||||
issue_branches = [
|
||||
b["name"] for b in branches
|
||||
if str(issue_number) in b.get("name", "")
|
||||
and b["name"] != "main"
|
||||
]
|
||||
if issue_branches:
|
||||
warnings.append(f"Existing branches may target this issue: {issue_branches}")
|
||||
|
||||
# 5. Check for existing PRs
|
||||
prs = gitea_get(f"/repos/{GITEA_OWNER}/{repo}/pulls?state=open&limit=50")
|
||||
if prs:
|
||||
issue_prs = [
|
||||
f"PR #{p['number']}: {p['title']}"
|
||||
for p in prs
|
||||
if str(issue_number) in p.get("title", "")
|
||||
or str(issue_number) in p.get("body", "")
|
||||
]
|
||||
if issue_prs:
|
||||
failures.append(f"Open PR(s) already target this issue: {issue_prs}")
|
||||
|
||||
# 6. Check agent lanes
|
||||
lanes = load_agent_lanes()
|
||||
labels = [l.get("name", "") for l in (issue.get("labels") or [])]
|
||||
lane_ok, lane_msg = check_agent_lane(agent, title, labels, lanes)
|
||||
if not lane_ok:
|
||||
warnings.append(lane_msg)
|
||||
else:
|
||||
messages.append(f" Lane: {lane_msg}")
|
||||
|
||||
# Report
|
||||
if failures:
|
||||
print("\n FAILURES:")
|
||||
for f in failures:
|
||||
print(f" ❌ {f}")
|
||||
if warnings:
|
||||
print("\n WARNINGS:")
|
||||
for w in warnings:
|
||||
print(f" ⚠️ {w}")
|
||||
if not failures and not warnings:
|
||||
print(" \u2705 All pre-task gates passed")
|
||||
|
||||
passed = len(failures) == 0
|
||||
return passed, failures + warnings
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# POST-TASK GATE
|
||||
# ---------------------------------------------------------------------------
|
||||
def post_task_gate(repo, issue_number, agent, branch):
|
||||
"""Run all post-task checks. Returns (pass, messages)."""
|
||||
failures = []
|
||||
warnings = []
|
||||
|
||||
print(f"\n=== POST-TASK GATE: {repo}#{issue_number} by {agent} ===")
|
||||
print(f" Branch: {branch}")
|
||||
|
||||
# 1. Check branch exists
|
||||
branch_info = gitea_get(
|
||||
f"/repos/{GITEA_OWNER}/{repo}/branches/{urllib.parse.quote(branch, safe='')}"
|
||||
)
|
||||
if not branch_info:
|
||||
failures.append(f"Branch '{branch}' does not exist")
|
||||
return False, failures
|
||||
|
||||
# 2. Check branch naming convention
|
||||
if "/" not in branch:
|
||||
warnings.append(f"Branch name '{branch}' doesn't follow agent/description convention")
|
||||
elif not branch.startswith(f"{agent}/"):
|
||||
warnings.append(f"Branch '{branch}' doesn't start with agent name '{agent}/")
|
||||
|
||||
# 3. Check for commits ahead of main
|
||||
compare = gitea_get(
|
||||
f"/repos/{GITEA_OWNER}/{repo}/compare/main...{urllib.parse.quote(branch, safe='')}"
|
||||
)
|
||||
if compare:
|
||||
commits = compare.get("commits", [])
|
||||
if not commits:
|
||||
failures.append("Branch has no commits ahead of main")
|
||||
else:
|
||||
print(f" Commits ahead: {len(commits)}")
|
||||
files = compare.get("diff_files", []) or []
|
||||
if not files:
|
||||
# Try alternate key
|
||||
num_files = compare.get("total_commits", 0)
|
||||
print(f" Files changed: (check PR diff)")
|
||||
else:
|
||||
print(f" Files changed: {len(files)}")
|
||||
|
||||
# 4. Check for duplicate PRs
|
||||
prs = gitea_get(f"/repos/{GITEA_OWNER}/{repo}/pulls?state=open&limit=50")
|
||||
if prs:
|
||||
dupe_prs = [
|
||||
f"PR #{p['number']}"
|
||||
for p in prs
|
||||
if str(issue_number) in p.get("title", "")
|
||||
or str(issue_number) in p.get("body", "")
|
||||
]
|
||||
if len(dupe_prs) > 1:
|
||||
warnings.append(f"Multiple open PRs may target issue #{issue_number}: {dupe_prs}")
|
||||
|
||||
# 5. Run syntax guard on changed files (if available)
|
||||
syntax_guard = os.path.join(
|
||||
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
|
||||
"hermes-sovereign", "scripts", "syntax_guard.py"
|
||||
)
|
||||
if os.path.exists(syntax_guard):
|
||||
try:
|
||||
result = subprocess.run(
|
||||
[sys.executable, syntax_guard],
|
||||
capture_output=True, text=True, timeout=30
|
||||
)
|
||||
if result.returncode != 0:
|
||||
failures.append(f"Syntax guard failed: {result.stdout[:200]}")
|
||||
else:
|
||||
print(" Syntax guard: passed")
|
||||
except Exception as e:
|
||||
warnings.append(f"Could not run syntax guard: {e}")
|
||||
else:
|
||||
warnings.append("syntax_guard.py not found — skipping syntax check")
|
||||
|
||||
# Report
|
||||
if failures:
|
||||
print("\n FAILURES:")
|
||||
for f in failures:
|
||||
print(f" ❌ {f}")
|
||||
if warnings:
|
||||
print("\n WARNINGS:")
|
||||
for w in warnings:
|
||||
print(f" ⚠️ {w}")
|
||||
if not failures and not warnings:
|
||||
print(" \u2705 All post-task gates passed")
|
||||
|
||||
passed = len(failures) == 0
|
||||
return passed, failures + warnings
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# MAIN
|
||||
# ---------------------------------------------------------------------------
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Task Gate — pre/post-task quality gates")
|
||||
subparsers = parser.add_subparsers(dest="command")
|
||||
|
||||
# Pre-task
|
||||
pre = subparsers.add_parser("pre", help="Run pre-task gates")
|
||||
pre.add_argument("--repo", required=True)
|
||||
pre.add_argument("--issue", type=int, required=True)
|
||||
pre.add_argument("--agent", required=True)
|
||||
|
||||
# Post-task
|
||||
post = subparsers.add_parser("post", help="Run post-task gates")
|
||||
post.add_argument("--repo", required=True)
|
||||
post.add_argument("--issue", type=int, required=True)
|
||||
post.add_argument("--agent", required=True)
|
||||
post.add_argument("--branch", required=True)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.command:
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
if args.command == "pre":
|
||||
passed, msgs = pre_task_gate(args.repo, args.issue, args.agent)
|
||||
elif args.command == "post":
|
||||
passed, msgs = post_task_gate(args.repo, args.issue, args.agent, args.branch)
|
||||
else:
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
sys.exit(0 if passed else 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
195
scripts/test_harness.sh
Executable file
195
scripts/test_harness.sh
Executable file
@@ -0,0 +1,195 @@
|
||||
#!/usr/bin/env bash
|
||||
# test_harness.sh — Common CLI safety/test harness for the scripts/ suite
|
||||
# Usage: ./scripts/test_harness.sh [--verbose] [--ci] [directory]
|
||||
#
|
||||
# Discovers .sh, .py, and .yaml files in the target directory and validates them:
|
||||
# - .sh : runs shellcheck (or SKIPS if unavailable)
|
||||
# - .py : runs python3 -m py_compile
|
||||
# - .yaml: validates with python3 yaml.safe_load
|
||||
#
|
||||
# Exit codes: 0 = all pass, 1 = any fail
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
# --- Defaults ---
|
||||
VERBOSE=0
|
||||
CI_MODE=0
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
TARGET_DIR="${SCRIPT_DIR}"
|
||||
|
||||
# --- Colors (disabled in CI) ---
|
||||
RED=""
|
||||
GREEN=""
|
||||
YELLOW=""
|
||||
CYAN=""
|
||||
RESET=""
|
||||
if [[ -t 1 && "${CI:-}" != "true" ]]; then
|
||||
RED=$'\033[0;31m'
|
||||
GREEN=$'\033[0;32m'
|
||||
YELLOW=$'\033[0;33m'
|
||||
CYAN=$'\033[0;36m'
|
||||
RESET=$'\033[0m'
|
||||
fi
|
||||
|
||||
# --- Argument parsing ---
|
||||
while [[ $# -gt 0 ]]; do
|
||||
case "$1" in
|
||||
--verbose|-v) VERBOSE=1; shift ;;
|
||||
--ci) CI_MODE=1; shift ;;
|
||||
-*) echo "Unknown option: $1" >&2; exit 2 ;;
|
||||
*) TARGET_DIR="$1"; shift ;;
|
||||
esac
|
||||
done
|
||||
|
||||
# --- Counters ---
|
||||
PASS=0
|
||||
FAIL=0
|
||||
SKIP=0
|
||||
TOTAL=0
|
||||
|
||||
# --- Helpers ---
|
||||
log_verbose() {
|
||||
if [[ "${VERBOSE}" -eq 1 ]]; then
|
||||
echo " ${CYAN}[DEBUG]${RESET} $*"
|
||||
fi
|
||||
}
|
||||
|
||||
record_pass() {
|
||||
((PASS++))
|
||||
((TOTAL++))
|
||||
echo "${GREEN}PASS${RESET} $1"
|
||||
}
|
||||
|
||||
record_fail() {
|
||||
((FAIL++))
|
||||
((TOTAL++))
|
||||
echo "${RED}FAIL${RESET} $1"
|
||||
if [[ -n "${2:-}" ]]; then
|
||||
echo " ${2}"
|
||||
fi
|
||||
}
|
||||
|
||||
record_skip() {
|
||||
((SKIP++))
|
||||
((TOTAL++))
|
||||
echo "${YELLOW}SKIP${RESET} $1 — $2"
|
||||
}
|
||||
|
||||
# --- Checkers ---
|
||||
check_shell_file() {
|
||||
local file="$1"
|
||||
local rel="${file#${TARGET_DIR}/}"
|
||||
if command -v shellcheck &>/dev/null; then
|
||||
log_verbose "Running shellcheck on ${rel}"
|
||||
local output
|
||||
if output=$(shellcheck -x -S warning "${file}" 2>&1); then
|
||||
record_pass "${rel}"
|
||||
else
|
||||
record_fail "${rel}" "${output}"
|
||||
fi
|
||||
else
|
||||
record_skip "${rel}" "shellcheck not installed"
|
||||
fi
|
||||
}
|
||||
|
||||
check_python_file() {
|
||||
local file="$1"
|
||||
local rel="${file#${TARGET_DIR}/}"
|
||||
log_verbose "Running py_compile on ${rel}"
|
||||
local output
|
||||
if output=$(python3 -m py_compile "${file}" 2>&1); then
|
||||
record_pass "${rel}"
|
||||
else
|
||||
record_fail "${rel}" "${output}"
|
||||
fi
|
||||
}
|
||||
|
||||
check_yaml_file() {
|
||||
local file="$1"
|
||||
local rel="${file#${TARGET_DIR}/}"
|
||||
log_verbose "Validating YAML: ${rel}"
|
||||
local output
|
||||
if output=$(python3 -c "import yaml; yaml.safe_load(open('${file}'))" 2>&1); then
|
||||
record_pass "${rel}"
|
||||
else
|
||||
record_fail "${rel}" "${output}"
|
||||
fi
|
||||
}
|
||||
|
||||
# --- Main ---
|
||||
echo ""
|
||||
echo "=== scripts/ test harness ==="
|
||||
echo "Target: ${TARGET_DIR}"
|
||||
echo ""
|
||||
|
||||
if [[ ! -d "${TARGET_DIR}" ]]; then
|
||||
echo "Error: target directory '${TARGET_DIR}' not found" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Check python3 availability
|
||||
if ! command -v python3 &>/dev/null; then
|
||||
echo "${RED}Error: python3 is required but not found${RESET}" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Check PyYAML availability
|
||||
if ! python3 -c "import yaml" 2>/dev/null; then
|
||||
echo "${YELLOW}Warning: PyYAML not installed — YAML checks will be skipped${RESET}" >&2
|
||||
YAML_AVAILABLE=0
|
||||
else
|
||||
YAML_AVAILABLE=1
|
||||
fi
|
||||
|
||||
# Discover and check .sh files
|
||||
sh_files=()
|
||||
while IFS= read -r -d '' f; do
|
||||
sh_files+=("$f")
|
||||
done < <(find "${TARGET_DIR}" -maxdepth 1 -name "*.sh" ! -name "test_harness.sh" ! -name "test_runner.sh" -print0 | sort -z)
|
||||
|
||||
for f in "${sh_files[@]:-}"; do
|
||||
[[ -n "$f" ]] && check_shell_file "$f"
|
||||
done
|
||||
|
||||
# Discover and check .py files
|
||||
py_files=()
|
||||
while IFS= read -r -d '' f; do
|
||||
py_files+=("$f")
|
||||
done < <(find "${TARGET_DIR}" -maxdepth 1 -name "*.py" -print0 | sort -z)
|
||||
|
||||
for f in "${py_files[@]:-}"; do
|
||||
[[ -n "$f" ]] && check_python_file "$f"
|
||||
done
|
||||
|
||||
# Discover and check .yaml files in target dir
|
||||
yaml_files=()
|
||||
while IFS= read -r -d '' f; do
|
||||
yaml_files+=("$f")
|
||||
done < <(find "${TARGET_DIR}" -maxdepth 1 -name "*.yaml" -print0 | sort -z)
|
||||
|
||||
if [[ "${YAML_AVAILABLE}" -eq 1 ]]; then
|
||||
for f in "${yaml_files[@]:-}"; do
|
||||
[[ -n "$f" ]] && check_yaml_file "$f"
|
||||
done
|
||||
else
|
||||
for f in "${yaml_files[@]:-}"; do
|
||||
[[ -n "$f" ]] && record_skip "${f#${TARGET_DIR}/}" "PyYAML not installed"
|
||||
done
|
||||
fi
|
||||
|
||||
# --- Summary ---
|
||||
echo ""
|
||||
echo "=== Results ==="
|
||||
echo " ${GREEN}PASS${RESET}: ${PASS}"
|
||||
echo " ${RED}FAIL${RESET}: ${FAIL}"
|
||||
echo " ${YELLOW}SKIP${RESET}: ${SKIP}"
|
||||
echo " Total: ${TOTAL}"
|
||||
echo ""
|
||||
|
||||
if [[ "${FAIL}" -gt 0 ]]; then
|
||||
echo "${RED}FAILED${RESET} — ${FAIL} file(s) did not pass validation."
|
||||
exit 1
|
||||
else
|
||||
echo "${GREEN}ALL CLEAR${RESET} — all checked files passed."
|
||||
exit 0
|
||||
fi
|
||||
9
scripts/test_runner.sh
Executable file
9
scripts/test_runner.sh
Executable file
@@ -0,0 +1,9 @@
|
||||
#!/usr/bin/env bash
|
||||
# test_runner.sh — Convenience wrapper for test_harness.sh
|
||||
# Runs the test harness with sensible defaults for local development.
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
|
||||
exec "${SCRIPT_DIR}/test_harness.sh" --verbose "$@"
|
||||
233
tests/test_linter.py
Normal file
233
tests/test_linter.py
Normal file
@@ -0,0 +1,233 @@
|
||||
"""Tests for Architecture Linter v2.
|
||||
|
||||
Validates that the linter correctly detects violations and passes clean repos.
|
||||
Refs: #437 — test-backed linter.
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
# Add scripts/ to path
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts"))
|
||||
|
||||
from architecture_linter_v2 import Linter, LinterResult
|
||||
|
||||
|
||||
# ── helpers ───────────────────────────────────────────────────────────
|
||||
|
||||
def _make_repo(tmpdir: str, files: dict[str, str], name: str = "test-repo") -> Path:
|
||||
"""Create a fake repo with given files and return its path."""
|
||||
repo = Path(tmpdir) / name
|
||||
repo.mkdir()
|
||||
for relpath, content in files.items():
|
||||
p = repo / relpath
|
||||
p.parent.mkdir(parents=True, exist_ok=True)
|
||||
p.write_text(content)
|
||||
return repo
|
||||
|
||||
|
||||
def _run(tmpdir, files, name="test-repo"):
|
||||
repo = _make_repo(tmpdir, files, name)
|
||||
return Linter(str(repo)).run()
|
||||
|
||||
|
||||
# ── clean repo passes ─────────────────────────────────────────────────
|
||||
|
||||
def test_clean_repo_passes():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
result = _run(tmp, {
|
||||
"README.md": "# Test Repo\n\nThis is a clean test repo with sufficient content to pass.",
|
||||
"main.py": "print('hello world')\n",
|
||||
})
|
||||
assert result.passed, f"Expected pass but got: {result.errors}"
|
||||
assert result.violation_count == 0
|
||||
|
||||
|
||||
# ── missing README ────────────────────────────────────────────────────
|
||||
|
||||
def test_missing_readme_fails():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
result = _run(tmp, {"main.py": "x = 1\n"})
|
||||
assert not result.passed
|
||||
assert any("README" in e for e in result.errors)
|
||||
|
||||
|
||||
def test_short_readme_warns():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
result = _run(tmp, {"README.md": "hi\n"})
|
||||
# Warnings don't fail the build
|
||||
assert result.passed
|
||||
assert any("short" in w.lower() for w in result.warnings)
|
||||
|
||||
|
||||
# ── hardcoded IPs ─────────────────────────────────────────────────────
|
||||
|
||||
def test_hardcoded_public_ip_detected():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
result = _run(tmp, {
|
||||
"README.md": "# R\n\nGood repo.",
|
||||
"server.py": "HOST = '203.0.113.42'\n",
|
||||
})
|
||||
assert not result.passed
|
||||
assert any("203.0.113.42" in e for e in result.errors)
|
||||
|
||||
|
||||
def test_localhost_ip_ignored():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
result = _run(tmp, {
|
||||
"README.md": "# R\n\nGood repo.",
|
||||
"server.py": "HOST = '127.0.0.1'\n",
|
||||
})
|
||||
ip_errors = [e for e in result.errors if "IP" in e]
|
||||
assert len(ip_errors) == 0
|
||||
|
||||
|
||||
# ── API keys ──────────────────────────────────────────────────────────
|
||||
|
||||
def test_openai_key_detected():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
result = _run(tmp, {
|
||||
"README.md": "# R\n\nGood repo.",
|
||||
"config.py": 'key = "sk-abcdefghijklmnopqrstuvwx"\n',
|
||||
})
|
||||
assert not result.passed
|
||||
assert any("secret" in e.lower() or "key" in e.lower() for e in result.errors)
|
||||
|
||||
|
||||
def test_aws_key_detected():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
result = _run(tmp, {
|
||||
"README.md": "# R\n\nGood repo.",
|
||||
"deploy.yaml": 'aws_key: AKIAIOSFODNN7EXAMPLE\n',
|
||||
})
|
||||
assert not result.passed
|
||||
assert any("secret" in e.lower() for e in result.errors)
|
||||
|
||||
|
||||
def test_env_example_skipped():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
result = _run(tmp, {
|
||||
"README.md": "# R\n\nGood repo.",
|
||||
".env.example": 'OPENAI_KEY=sk-placeholder\n',
|
||||
})
|
||||
secret_errors = [e for e in result.errors if "secret" in e.lower()]
|
||||
assert len(secret_errors) == 0
|
||||
|
||||
|
||||
# ── sovereignty rules (v1 cloud API checks) ───────────────────────────
|
||||
|
||||
def test_openai_url_detected():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
result = _run(tmp, {
|
||||
"README.md": "# R\n\nGood repo.",
|
||||
"app.py": 'url = "https://api.openai.com/v1/chat"\n',
|
||||
})
|
||||
assert not result.passed
|
||||
assert any("openai" in e.lower() for e in result.errors)
|
||||
|
||||
|
||||
def test_cloud_provider_detected():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
result = _run(tmp, {
|
||||
"README.md": "# R\n\nGood repo.",
|
||||
"config.yaml": "provider: openai\n",
|
||||
})
|
||||
assert not result.passed
|
||||
assert any("provider" in e.lower() for e in result.errors)
|
||||
|
||||
|
||||
# ── sidecar boundary ──────────────────────────────────────────────────
|
||||
|
||||
def test_sovereign_keyword_in_hermes_agent_fails():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
result = _run(tmp, {
|
||||
"README.md": "# R\n\nGood repo.",
|
||||
"index.py": "import mempalace\n",
|
||||
}, name="hermes-agent")
|
||||
assert not result.passed
|
||||
assert any("sidecar" in e.lower() or "mempalace" in e.lower() for e in result.errors)
|
||||
|
||||
|
||||
def test_sovereign_keyword_in_other_repo_ok():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
result = _run(tmp, {
|
||||
"README.md": "# R\n\nGood repo.",
|
||||
"index.py": "import mempalace\n",
|
||||
}, name="some-other-repo")
|
||||
sidecar_errors = [e for e in result.errors if "sidecar" in e.lower()]
|
||||
assert len(sidecar_errors) == 0
|
||||
|
||||
|
||||
# ── SOUL.md canonical location ────────────────────────────────────────
|
||||
|
||||
def test_soul_md_required_in_timmy_config():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
result = _run(tmp, {
|
||||
"README.md": "# timmy-config\n\nConfig repo.",
|
||||
}, name="timmy-config")
|
||||
assert not result.passed
|
||||
assert any("SOUL.md" in e for e in result.errors)
|
||||
|
||||
|
||||
def test_soul_md_present_in_timmy_config_ok():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
result = _run(tmp, {
|
||||
"README.md": "# timmy-config\n\nConfig repo.",
|
||||
"SOUL.md": "# Soul\n\nCanonical identity document.",
|
||||
}, name="timmy-config")
|
||||
soul_errors = [e for e in result.errors if "SOUL" in e]
|
||||
assert len(soul_errors) == 0
|
||||
|
||||
|
||||
def test_soul_md_in_wrong_repo_fails():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
result = _run(tmp, {
|
||||
"README.md": "# R\n\nGood repo.",
|
||||
"SOUL.md": "# Soul\n\nShould not be here.",
|
||||
}, name="other-repo")
|
||||
assert any("canonical" in e.lower() for e in result.errors)
|
||||
|
||||
|
||||
# ── LinterResult structure ────────────────────────────────────────────
|
||||
|
||||
def test_result_summary_is_string():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
result = _run(tmp, {"README.md": "# OK repo with enough text here\n"})
|
||||
assert isinstance(result.summary(), str)
|
||||
assert "PASSED" in result.summary() or "FAILED" in result.summary()
|
||||
|
||||
|
||||
def test_result_repo_name():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
result = _run(tmp, {"README.md": "# OK\n"}, name="my-repo")
|
||||
assert result.repo_name == "my-repo"
|
||||
|
||||
|
||||
# ── invalid path ──────────────────────────────────────────────────────
|
||||
|
||||
def test_invalid_path_raises():
|
||||
try:
|
||||
Linter("/nonexistent/path/xyz")
|
||||
assert False, "Should have raised FileNotFoundError"
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
|
||||
|
||||
# ── skip dirs ──────────────────────────────────────────────────────────
|
||||
|
||||
def test_git_dir_skipped():
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
repo = _make_repo(tmp, {
|
||||
"README.md": "# R\n\nGood repo.",
|
||||
"main.py": "x = 1\n",
|
||||
})
|
||||
# Create a .git/ dir with a bad file
|
||||
git_dir = repo / ".git"
|
||||
git_dir.mkdir()
|
||||
(git_dir / "bad.py").write_text("HOST = '203.0.113.1'\n")
|
||||
|
||||
result = Linter(str(repo)).run()
|
||||
git_errors = [e for e in result.errors if ".git" in e]
|
||||
assert len(git_errors) == 0
|
||||
Reference in New Issue
Block a user