Compare commits

...

14 Commits

Author SHA1 Message Date
b172d23b98 Merge branch 'main' into perplexity/fleet-behaviour-hardening
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 8s
PR Checklist / pr-checklist (pull_request) Failing after 1m13s
Architecture Lint / Lint Repository (pull_request) Failing after 5s
2026-04-10 09:37:42 +00:00
fa9e83ac95 Merge pull request #425
Some checks failed
Architecture Lint / Linter Tests (push) Has been cancelled
Architecture Lint / Lint Repository (push) Has been cancelled
Merged PR #425
2026-04-10 09:36:29 +00:00
6e5f1f6a22 Merge branch 'main' into timmy/deadman-fallback
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 8s
PR Checklist / pr-checklist (pull_request) Failing after 1m11s
Architecture Lint / Lint Repository (pull_request) Failing after 6s
2026-04-10 09:36:25 +00:00
2677e1c796 Merge pull request #453
Some checks failed
Architecture Lint / Linter Tests (push) Has been cancelled
Architecture Lint / Lint Repository (push) Has been cancelled
Merged PR #453
2026-04-10 09:36:22 +00:00
836849ffeb Merge branch 'main' into burn/20260409-1926-linter-v2
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 10s
PR Checklist / pr-checklist (pull_request) Failing after 1m13s
Architecture Lint / Lint Repository (pull_request) Failing after 6s
2026-04-10 09:36:17 +00:00
eb7ca1f96f Merge pull request 'burn: Add proof-driven PR template (closes #451)' (#454) from burn/20260410-0018-451-pr-template into main
Merge PR #454: burn: Add proof-driven PR template (closes #451)
2026-04-10 09:35:25 +00:00
Alexander Whitestone
641db62112 burn: Add proof-driven PR template (.gitea/PULL_REQUEST_TEMPLATE.md)
All checks were successful
PR Checklist / pr-checklist (pull_request) Successful in 1m9s
Closes #451. Enforces the CONTRIBUTING.md proof standard at PR authoring
time: summary, linked issue, acceptance criteria, proof evidence, risk
and rollback. Aligns with existing bin/pr-checklist.py CI gate.
2026-04-10 00:20:37 -04:00
b38871d4cd Merge pull request #439
Merged PR #439
2026-04-10 03:43:52 +00:00
timmy-bot
ee025957d9 fix: architecture_linter_v2 — repo-aware, test-backed, CI-enforced (#437)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 28s
PR Checklist / pr-checklist (pull_request) Successful in 4m25s
Architecture Lint / Lint Repository (pull_request) Failing after 21s
- Fix broken API_KEY_REGEX in linter_v2.py (was invalid regex causing runtime crash)
- Fix syntax error in architecture_linter.py (malformed character class)
- Add --repo flag and --json output to linter_v2
- Add LinterResult class for structured programmatic access
- Port v1 sovereignty rules (cloud API endpoint/provider checks) into v2
- Skip .git, node_modules, __pycache__ dirs; skip .env.example files
- Add tests/test_linter.py (19 tests covering all checks)
- Add .gitea/workflows/architecture-lint.yml for CI enforcement
- All files pass python3 -m py_compile

Refs: #437
2026-04-09 19:29:33 -04:00
Alexander Whitestone
b18fc76868 feat: CLI safety/test harness for scripts/ suite (#438)
All checks were successful
PR Checklist / pr-checklist (pull_request) Successful in 1m19s
2026-04-09 12:40:50 -04:00
41044d36ae feat(playbooks): add fleet-guardrails.yaml — enforceable behaviour boundaries
Some checks failed
PR Checklist / pr-checklist (pull_request) Failing after 5m10s
2026-04-09 01:05:11 +00:00
a9aed5a545 feat(scripts): add task_gate.py — pre/post task quality gates 2026-04-09 01:03:18 +00:00
c5e6494326 docs: fleet behaviour hardening review — guardrails > memory 2026-04-09 00:46:23 +00:00
763e35f47a feat: dead man switch config fallback engine
Some checks failed
PR Checklist / pr-checklist (pull_request) Failing after 3m11s
Automatic fallback chain: Anthropic -> local-llama.cpp -> Ollama -> safe mode.
Auto-recovery when primary returns. Reversible config changes with backup.
2026-04-08 21:54:42 +00:00
11 changed files with 1586 additions and 77 deletions

View File

@@ -0,0 +1,49 @@
## Summary
<!-- What changed and why. One paragraph max. -->
## Linked Issue
<!-- REQUIRED. Every PR must reference at least one issue. Max 3 issues per PR. -->
<!-- Closes #ISSUENUM -->
<!-- Refs #ISSUENUM -->
## Acceptance Criteria
<!-- What specific outcomes does this PR deliver? Check each when proven. -->
- [ ] Criterion 1
- [ ] Criterion 2
## Proof
### What was tested
<!-- Paste the exact commands, output, log paths, or world-state artifacts that prove the acceptance criteria were met. -->
<!-- No proof = no merge. See CONTRIBUTING.md for the full standard. -->
```
$ <command you ran>
<relevant output>
```
### Visual proof (if applicable)
<!-- For skin updates, UI changes, dashboard changes: attach screenshot to the PR discussion. -->
<!-- Name what the screenshot proves. Do not commit binary media unless explicitly required. -->
## Risk and Rollback
<!-- What could go wrong? How do we undo it? -->
- **Risk level:** low / medium / high
- **What breaks if this is wrong:**
- **How to rollback:**
## Checklist
- [ ] Proof meets CONTRIBUTING.md standard (exact commands, output, or artifacts)
- [ ] Python files pass syntax check (`python -c "import ast; ast.parse(open('file.py').read())"`)
- [ ] Shell scripts are executable (`chmod +x`)
- [ ] Branch is up-to-date with base
- [ ] No more than 3 unrelated issues bundled in this PR

View File

@@ -0,0 +1,41 @@
# architecture-lint.yml — CI gate for the Architecture Linter v2
# Refs: #437 — repo-aware, test-backed, CI-enforced.
#
# Runs on every PR to main. Validates Python syntax, then runs
# linter tests and finally lints the repo itself.
name: Architecture Lint
on:
pull_request:
branches: [main, master]
push:
branches: [main]
jobs:
linter-tests:
name: Linter Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install test deps
run: pip install pytest
- name: Compile-check linter
run: python3 -m py_compile scripts/architecture_linter_v2.py
- name: Run linter tests
run: python3 -m pytest tests/test_linter.py -v
lint-repo:
name: Lint Repository
runs-on: ubuntu-latest
needs: linter-tests
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Run architecture linter
run: python3 scripts/architecture_linter_v2.py .

264
bin/deadman-fallback.py Normal file
View File

@@ -0,0 +1,264 @@
1|#!/usr/bin/env python3
2|"""
3|Dead Man Switch Fallback Engine
4|
5|When the dead man switch triggers (zero commits for 2+ hours, model down,
6|Gitea unreachable, etc.), this script diagnoses the failure and applies
7|common sense fallbacks automatically.
8|
9|Fallback chain:
10|1. Primary model (Anthropic) down -> switch config to local-llama.cpp
11|2. Gitea unreachable -> cache issues locally, retry on recovery
12|3. VPS agents down -> alert + lazarus protocol
13|4. Local llama.cpp down -> try Ollama, then alert-only mode
14|5. All inference dead -> safe mode (cron pauses, alert Alexander)
15|
16|Each fallback is reversible. Recovery auto-restores the previous config.
17|"""
18|import os
19|import sys
20|import json
21|import subprocess
22|import time
23|import yaml
24|import shutil
25|from pathlib import Path
26|from datetime import datetime, timedelta
27|
28|HERMES_HOME = Path(os.environ.get("HERMES_HOME", os.path.expanduser("~/.hermes")))
29|CONFIG_PATH = HERMES_HOME / "config.yaml"
30|FALLBACK_STATE = HERMES_HOME / "deadman-fallback-state.json"
31|BACKUP_CONFIG = HERMES_HOME / "config.yaml.pre-fallback"
32|FORGE_URL = "https://forge.alexanderwhitestone.com"
33|
34|def load_config():
35| with open(CONFIG_PATH) as f:
36| return yaml.safe_load(f)
37|
38|def save_config(cfg):
39| with open(CONFIG_PATH, "w") as f:
40| yaml.dump(cfg, f, default_flow_style=False)
41|
42|def load_state():
43| if FALLBACK_STATE.exists():
44| with open(FALLBACK_STATE) as f:
45| return json.load(f)
46| return {"active_fallbacks": [], "last_check": None, "recovery_pending": False}
47|
48|def save_state(state):
49| state["last_check"] = datetime.now().isoformat()
50| with open(FALLBACK_STATE, "w") as f:
51| json.dump(state, f, indent=2)
52|
53|def run(cmd, timeout=10):
54| try:
55| r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=timeout)
56| return r.returncode, r.stdout.strip(), r.stderr.strip()
57| except subprocess.TimeoutExpired:
58| return -1, "", "timeout"
59| except Exception as e:
60| return -1, "", str(e)
61|
62|# ─── HEALTH CHECKS ───
63|
64|def check_anthropic():
65| """Can we reach Anthropic API?"""
66| key = os.environ.get("ANTHROPIC_API_KEY", "")
67| if not key:
68| # Check multiple .env locations
69| for env_path in [HERMES_HOME / ".env", Path.home() / ".hermes" / ".env"]:
70| if env_path.exists():
71| for line in open(env_path):
72| line = line.strip()
73| if line.startswith("ANTHROPIC_API_KEY=***
74| key = line.split("=", 1)[1].strip().strip('"').strip("'")
75| break
76| if key:
77| break
78| if not key:
79| return False, "no API key"
80| code, out, err = run(
81| f'curl -s -o /dev/null -w "%{{http_code}}" -H "x-api-key: {key}" '
82| f'-H "anthropic-version: 2023-06-01" '
83| f'https://api.anthropic.com/v1/messages -X POST '
84| f'-H "content-type: application/json" '
85| f'-d \'{{"model":"claude-haiku-4-5-20251001","max_tokens":1,"messages":[{{"role":"user","content":"ping"}}]}}\' ',
86| timeout=15
87| )
88| if code == 0 and out in ("200", "429"):
89| return True, f"HTTP {out}"
90| return False, f"HTTP {out} err={err[:80]}"
91|
92|def check_local_llama():
93| """Is local llama.cpp serving?"""
94| code, out, err = run("curl -s http://localhost:8081/v1/models", timeout=5)
95| if code == 0 and "hermes" in out.lower():
96| return True, "serving"
97| return False, f"exit={code}"
98|
99|def check_ollama():
100| """Is Ollama running?"""
101| code, out, err = run("curl -s http://localhost:11434/api/tags", timeout=5)
102| if code == 0 and "models" in out:
103| return True, "running"
104| return False, f"exit={code}"
105|
106|def check_gitea():
107| """Can we reach the Forge?"""
108| token_path = Path.home() / ".config" / "gitea" / "timmy-token"
109| if not token_path.exists():
110| return False, "no token"
111| token = token_path.read_text().strip()
112| code, out, err = run(
113| f'curl -s -o /dev/null -w "%{{http_code}}" -H "Authorization: token {token}" '
114| f'"{FORGE_URL}/api/v1/user"',
115| timeout=10
116| )
117| if code == 0 and out == "200":
118| return True, "reachable"
119| return False, f"HTTP {out}"
120|
121|def check_vps(ip, name):
122| """Can we SSH into a VPS?"""
123| code, out, err = run(f"ssh -o ConnectTimeout=5 root@{ip} 'echo alive'", timeout=10)
124| if code == 0 and "alive" in out:
125| return True, "alive"
126| return False, f"unreachable"
127|
128|# ─── FALLBACK ACTIONS ───
129|
130|def fallback_to_local_model(cfg):
131| """Switch primary model from Anthropic to local llama.cpp"""
132| if not BACKUP_CONFIG.exists():
133| shutil.copy2(CONFIG_PATH, BACKUP_CONFIG)
134|
135| cfg["model"]["provider"] = "local-llama.cpp"
136| cfg["model"]["default"] = "hermes3"
137| save_config(cfg)
138| return "Switched primary model to local-llama.cpp/hermes3"
139|
140|def fallback_to_ollama(cfg):
141| """Switch to Ollama if llama.cpp is also down"""
142| if not BACKUP_CONFIG.exists():
143| shutil.copy2(CONFIG_PATH, BACKUP_CONFIG)
144|
145| cfg["model"]["provider"] = "ollama"
146| cfg["model"]["default"] = "gemma4:latest"
147| save_config(cfg)
148| return "Switched primary model to ollama/gemma4:latest"
149|
150|def enter_safe_mode(state):
151| """Pause all non-essential cron jobs, alert Alexander"""
152| state["safe_mode"] = True
153| state["safe_mode_entered"] = datetime.now().isoformat()
154| save_state(state)
155| return "SAFE MODE: All inference down. Cron jobs should be paused. Alert Alexander."
156|
157|def restore_config():
158| """Restore pre-fallback config when primary recovers"""
159| if BACKUP_CONFIG.exists():
160| shutil.copy2(BACKUP_CONFIG, CONFIG_PATH)
161| BACKUP_CONFIG.unlink()
162| return "Restored original config from backup"
163| return "No backup config to restore"
164|
165|# ─── MAIN DIAGNOSIS AND FALLBACK ENGINE ───
166|
167|def diagnose_and_fallback():
168| state = load_state()
169| cfg = load_config()
170|
171| results = {
172| "timestamp": datetime.now().isoformat(),
173| "checks": {},
174| "actions": [],
175| "status": "healthy"
176| }
177|
178| # Check all systems
179| anthropic_ok, anthropic_msg = check_anthropic()
180| results["checks"]["anthropic"] = {"ok": anthropic_ok, "msg": anthropic_msg}
181|
182| llama_ok, llama_msg = check_local_llama()
183| results["checks"]["local_llama"] = {"ok": llama_ok, "msg": llama_msg}
184|
185| ollama_ok, ollama_msg = check_ollama()
186| results["checks"]["ollama"] = {"ok": ollama_ok, "msg": ollama_msg}
187|
188| gitea_ok, gitea_msg = check_gitea()
189| results["checks"]["gitea"] = {"ok": gitea_ok, "msg": gitea_msg}
190|
191| # VPS checks
192| vpses = [
193| ("167.99.126.228", "Allegro"),
194| ("143.198.27.163", "Ezra"),
195| ("159.203.146.185", "Bezalel"),
196| ]
197| for ip, name in vpses:
198| vps_ok, vps_msg = check_vps(ip, name)
199| results["checks"][f"vps_{name.lower()}"] = {"ok": vps_ok, "msg": vps_msg}
200|
201| current_provider = cfg.get("model", {}).get("provider", "anthropic")
202|
203| # ─── FALLBACK LOGIC ───
204|
205| # Case 1: Primary (Anthropic) down, local available
206| if not anthropic_ok and current_provider == "anthropic":
207| if llama_ok:
208| msg = fallback_to_local_model(cfg)
209| results["actions"].append(msg)
210| state["active_fallbacks"].append("anthropic->local-llama")
211| results["status"] = "degraded_local"
212| elif ollama_ok:
213| msg = fallback_to_ollama(cfg)
214| results["actions"].append(msg)
215| state["active_fallbacks"].append("anthropic->ollama")
216| results["status"] = "degraded_ollama"
217| else:
218| msg = enter_safe_mode(state)
219| results["actions"].append(msg)
220| results["status"] = "safe_mode"
221|
222| # Case 2: Already on fallback, check if primary recovered
223| elif anthropic_ok and "anthropic->local-llama" in state.get("active_fallbacks", []):
224| msg = restore_config()
225| results["actions"].append(msg)
226| state["active_fallbacks"].remove("anthropic->local-llama")
227| results["status"] = "recovered"
228| elif anthropic_ok and "anthropic->ollama" in state.get("active_fallbacks", []):
229| msg = restore_config()
230| results["actions"].append(msg)
231| state["active_fallbacks"].remove("anthropic->ollama")
232| results["status"] = "recovered"
233|
234| # Case 3: Gitea down — just flag it, work locally
235| if not gitea_ok:
236| results["actions"].append("WARN: Gitea unreachable — work cached locally until recovery")
237| if "gitea_down" not in state.get("active_fallbacks", []):
238| state["active_fallbacks"].append("gitea_down")
239| results["status"] = max(results["status"], "degraded_gitea", key=lambda x: ["healthy", "recovered", "degraded_gitea", "degraded_local", "degraded_ollama", "safe_mode"].index(x) if x in ["healthy", "recovered", "degraded_gitea", "degraded_local", "degraded_ollama", "safe_mode"] else 0)
240| elif "gitea_down" in state.get("active_fallbacks", []):
241| state["active_fallbacks"].remove("gitea_down")
242| results["actions"].append("Gitea recovered — resume normal operations")
243|
244| # Case 4: VPS agents down
245| for ip, name in vpses:
246| key = f"vps_{name.lower()}"
247| if not results["checks"][key]["ok"]:
248| results["actions"].append(f"ALERT: {name} VPS ({ip}) unreachable — lazarus protocol needed")
249|
250| save_state(state)
251| return results
252|
253|if __name__ == "__main__":
254| results = diagnose_and_fallback()
255| print(json.dumps(results, indent=2))
256|
257| # Exit codes for cron integration
258| if results["status"] == "safe_mode":
259| sys.exit(2)
260| elif results["status"].startswith("degraded"):
261| sys.exit(1)
262| else:
263| sys.exit(0)
264|

View File

@@ -0,0 +1,110 @@
# Fleet Behaviour Hardening — Review & Action Plan
**Author:** @perplexity
**Date:** 2026-04-08
**Context:** Alexander asked: "Is it the memory system or the behaviour guardrails?"
**Answer:** It's the guardrails. The memory system is adequate. The enforcement machinery is aspirational.
---
## Diagnosis: Why the Fleet Isn't Smart Enough
After auditing SOUL.md, config.yaml, all 8 playbooks, the orchestrator, the guard scripts, and the v7.0.0 checkin, the pattern is clear:
**The fleet has excellent design documents and broken enforcement.**
| Layer | Design Quality | Enforcement Quality | Gap |
|---|---|---|---|
| SOUL.md | Excellent | None — no code reads it at runtime | Philosophy without machinery |
| Playbooks (7 yaml) | Good lane map | Not invoked by orchestrator | Playbooks exist but nobody calls them |
| Guard scripts (9) | Solid code | 1 of 9 wired (#395 audit) | 89% of guards are dead code |
| Orchestrator | Sound design | Gateway dispatch is a no-op (#391) | Assigns issues but doesn't trigger work |
| Cycle Guard | Good 10-min rule | No cron/loop calls it | Discipline without enforcement |
| PR Reviewer | Clear rules | Runs every 30m (if scheduled) | Only guard that might actually fire |
| Memory (MemPalace) | Working code | Retrieval enforcer wired | Actually operational |
### The Core Problem
Agents pick up issues and produce output, but there is **no pre-task checklist** and **no post-task quality gate**. An agent can:
1. Start work without checking if someone else already did it
2. Produce output without running tests
3. Submit a PR without verifying it addresses the issue
4. Work for hours on something out of scope
5. Create duplicate branches/PRs without detection
The SOUL.md says "grounding before generation" but no code enforces it.
The playbooks define lanes but the orchestrator doesn't load them.
The guards exist but nothing calls them.
---
## What the Fleet Needs (Priority Order)
### 1. Pre-Task Gate (MISSING — this PR adds it)
Before an agent starts any issue:
- [ ] Check if issue is already assigned to another agent
- [ ] Check if a branch already exists for this issue
- [ ] Check if a PR already exists for this issue
- [ ] Load relevant MemPalace context (retrieval enforcer)
- [ ] Verify the agent has the right lane for this work (playbook check)
### 2. Post-Task Gate (MISSING — this PR adds it)
Before an agent submits a PR:
- [ ] Verify the diff addresses the issue title/body
- [ ] Run syntax_guard.py on changed files
- [ ] Check for duplicate PRs targeting the same issue
- [ ] Verify branch name follows convention
- [ ] Run tests if they exist for changed files
### 3. Wire the Existing Guards (8 of 9 are dead code)
Per #395 audit:
- Pre-commit hooks: need symlink on every machine
- Cycle guard: need cron/loop integration
- Forge health check: need cron entry
- Smoke test + deploy validate: need deploy script integration
### 4. Orchestrator Dispatch Actually Works
Per #391 audit: the orchestrator scores and assigns but the gateway dispatch just writes to `/tmp/hermes-dispatch.log`. Nobody reads that file. The dispatch needs to either:
- Trigger `hermes` CLI on the target machine, or
- Post a webhook that the agent loop picks up
### 5. Agent Self-Assessment Loop
After completing work, agents should answer:
- Did I address the issue as stated?
- Did I stay in scope?
- Did I check the palace for prior work?
- Did I run verification?
This is what SOUL.md calls "the apparatus that gives these words teeth."
---
## What's Working (Don't Touch)
- **MemPalace sovereign_store.py** — SQLite + FTS5 + HRR, operational
- **Retrieval enforcer** — wired to SovereignStore as of 14 hours ago
- **Wake-up protocol** — palace-first boot sequence
- **PR reviewer playbook** — clear rules, well-scoped
- **Issue triager playbook** — comprehensive lane map with 11 agents
- **Cycle guard code** — solid 10-min slice discipline (just needs wiring)
- **Config drift guard** — active cron, working
- **Dead man switch** — active, working
---
## Recommendation
The memory system is not the bottleneck. The behaviour guardrails are. Specifically:
1. **Add `task_gate.py`** — pre-task and post-task quality gates that every agent loop calls
2. **Wire cycle_guard.py** — add start/complete calls to agent loop
3. **Wire pre-commit hooks** — deploy script should symlink on provision
4. **Fix orchestrator dispatch** — make it actually trigger work, not just log
This PR adds item 1. Items 2-4 need SSH access and are flagged for Timmy/Allegro.

View File

@@ -0,0 +1,166 @@
# fleet-guardrails.yaml
# =====================
# Enforceable behaviour boundaries for every agent in the Timmy fleet.
# Consumed by task_gate.py (pre/post checks) and the orchestrator's
# dispatch loop. Every rule here is testable — no aspirational prose.
#
# Ref: SOUL.md "grounding before generation", Five Wisdoms #345
name: fleet-guardrails
version: "1.0.0"
description: >
Behaviour constraints that apply to ALL agents regardless of role.
These are the non-negotiable rules that task_gate.py enforces
before an agent may pick up work and after it claims completion.
# ─── UNIVERSAL CONSTRAINTS ───────────────────────────────────────
constraints:
# 1. Lane discipline — agents must stay in their lane
lane_enforcement:
enabled: true
source: playbooks/agent-lanes.json
on_violation: block_and_notify
description: >
An agent may only pick up issues tagged for its lane.
Cross-lane work requires explicit Timmy approval via
issue comment containing 'LANE_OVERRIDE: <agent>'.
# 2. Branch hygiene — no orphan branches
branch_hygiene:
enabled: true
max_branches_per_agent: 3
stale_branch_days: 7
naming_pattern: "{agent}/{issue_number}-{slug}"
on_violation: warn_then_block
description: >
Agents must follow branch naming conventions and clean up
after merge. No agent may have more than 3 active branches.
# 3. Issue ownership — no silent takeovers
issue_ownership:
enabled: true
require_assignment_before_work: true
max_concurrent_issues: 2
on_violation: block_and_notify
description: >
An agent must be assigned to an issue before creating a
branch or PR. No agent may work on more than 2 issues
simultaneously to prevent context-switching waste.
# 4. PR quality — minimum bar before review
pr_quality:
enabled: true
require_linked_issue: true
require_passing_ci: true
max_files_changed: 30
max_diff_lines: 2000
require_description: true
min_description_length: 50
on_violation: block_merge
description: >
Every PR must link an issue, pass CI, have a meaningful
description, and stay within scope. Giant PRs get rejected.
# 5. Grounding before generation — SOUL.md compliance
grounding:
enabled: true
require_issue_read_before_branch: true
require_existing_code_review: true
require_soul_md_check: true
soul_md_path: SOUL.md
on_violation: block_and_notify
description: >
Before writing any code, the agent must demonstrate it has
read the issue, reviewed relevant existing code, and checked
SOUL.md for applicable doctrine. No speculative generation.
# 6. Completion integrity — no phantom completions
completion_checks:
enabled: true
require_test_evidence: true
require_ci_green: true
require_diff_matches_issue: true
require_no_unrelated_changes: true
on_violation: revert_and_notify
description: >
Post-task gate verifies the work actually addresses the
issue. Agents cannot close issues without evidence.
Unrelated changes in a PR trigger automatic rejection.
# 7. Communication discipline — no noise
communication:
enabled: true
max_comments_per_issue: 10
require_structured_updates: true
update_format: "status | what_changed | what_blocked | next_step"
prohibit_empty_updates: true
on_violation: warn
description: >
Issue comments must be structured and substantive.
Status-only comments without content are rejected.
Agents should update, not narrate.
# 8. Resource awareness — no runaway costs
resource_limits:
enabled: true
max_api_calls_per_task: 100
max_llm_tokens_per_task: 500000
max_task_duration_minutes: 60
on_violation: kill_and_notify
description: >
Hard limits on compute per task. If an agent hits these
limits, the task is killed and flagged for human review.
Prevents infinite loops and runaway API spending.
# ─── ESCALATION POLICY ───────────────────────────────────────────
escalation:
channels:
- gitea_issue_comment
- discord_webhook
severity_levels:
warn:
action: post_comment
notify: agent_only
block:
action: prevent_action
notify: agent_and_orchestrator
block_and_notify:
action: prevent_action
notify: agent_orchestrator_and_timmy
kill_and_notify:
action: terminate_task
notify: all_including_alexander
revert_and_notify:
action: revert_changes
notify: agent_orchestrator_and_timmy
# ─── AUDIT TRAIL ─────────────────────────────────────────────────
audit:
enabled: true
log_path: logs/guardrail-violations.jsonl
retention_days: 90
fields:
- timestamp
- agent
- constraint
- violation_type
- issue_number
- action_taken
- resolution
# ─── OVERRIDES ───────────────────────────────────────────────────
overrides:
# Only Timmy or Alexander can override guardrails
authorized_overriders:
- Timmy
- Alexander
override_mechanism: >
Post a comment on the issue with the format:
GUARDRAIL_OVERRIDE: <constraint_name> REASON: <explanation>
override_expiry_hours: 24
require_post_override_review: true

View File

@@ -9,7 +9,7 @@ import re
SOVEREIGN_RULES = [
(r"https?://(api\.openai\.com|api\.anthropic\.com)", "CRITICAL: External cloud API detected. Use local custom_provider instead."),
(r"provider: (openai|anthropic)", "WARNING: Direct cloud provider used. Ensure fallback_model is configured."),
(r"api_key: ['"][^'"\s]{10,}['"]", "SECURITY: Hardcoded API key detected. Use environment variables.")
(r"api_key:\s*['\"][A-Za-z0-9_\-]{16,}['\"]", "SECURITY: Hardcoded API key detected. Use environment variables.")
]
def lint_file(path):

View File

@@ -5,122 +5,233 @@ Part of the Gemini Sovereign Governance System.
Enforces architectural boundaries, security, and documentation standards
across the Timmy Foundation fleet.
Refs: #437 — repo-aware, test-backed, CI-enforced.
"""
import argparse
import os
import re
import sys
import argparse
from pathlib import Path
# --- CONFIGURATION ---
SOVEREIGN_KEYWORDS = ["mempalace", "sovereign_store", "tirith", "bezalel", "nexus"]
IP_REGEX = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
API_KEY_REGEX = r'(?:api_key|secret|token|password|auth_token)\s*[:=]\s*["\'][a-zA-Z0-9_\-]{20,}["\']'
# IP addresses (skip 127.0.0.1, 0.0.0.0, 10.x.x.x, 172.16-31.x.x, 192.168.x.x)
IP_REGEX = r'\b(?!(?:127|10|192\.168|172\.(?:1[6-9]|2\d|3[01]))\.)' \
r'(?:\d{1,3}\.){3}\d{1,3}\b'
# API key / secret patterns — catches openai-, sk-, anthropic-, AKIA, etc.
API_KEY_PATTERNS = [
r'sk-[A-Za-z0-9]{20,}', # OpenAI-style
r'sk-ant-[A-Za-z0-9\-]{20,}', # Anthropic
r'AKIA[A-Z0-9]{16}', # AWS access key
r'ghp_[A-Za-z0-9]{36}', # GitHub PAT
r'glpat-[A-Za-z0-9\-]{20,}', # GitLab PAT
r'(?:api[_-]?key|secret|token)\s*[:=]\s*["\'][A-Za-z0-9_\-]{16,}["\']',
]
# Sovereignty rules (carried from v1)
SOVEREIGN_RULES = [
(r'https?://api\.openai\.com', 'External cloud API: api.openai.com. Use local custom_provider.'),
(r'https?://api\.anthropic\.com', 'External cloud API: api.anthropic.com. Use local custom_provider.'),
(r'provider:\s*(?:openai|anthropic)\b', 'Direct cloud provider. Ensure fallback_model is configured.'),
]
# File extensions to scan
SCAN_EXTENSIONS = {'.py', '.ts', '.tsx', '.js', '.yaml', '.yml', '.json', '.env', '.sh', '.cfg', '.toml'}
SKIP_DIRS = {'.git', 'node_modules', '__pycache__', '.venv', 'venv', '.tox', '.eggs'}
class LinterResult:
"""Structured result container for programmatic access."""
def __init__(self, repo_path: str, repo_name: str):
self.repo_path = repo_path
self.repo_name = repo_name
self.errors: list[str] = []
self.warnings: list[str] = []
@property
def passed(self) -> bool:
return len(self.errors) == 0
@property
def violation_count(self) -> int:
return len(self.errors)
def summary(self) -> str:
lines = [f"--- Architecture Linter v2: {self.repo_name} ---"]
for w in self.warnings:
lines.append(f" [W] {w}")
for e in self.errors:
lines.append(f" [E] {e}")
status = "PASSED" if self.passed else f"FAILED ({self.violation_count} violations)"
lines.append(f"\nResult: {status}")
return '\n'.join(lines)
class Linter:
def __init__(self, repo_path: str):
self.repo_path = Path(repo_path).resolve()
if not self.repo_path.is_dir():
raise FileNotFoundError(f"Repository path does not exist: {self.repo_path}")
self.repo_name = self.repo_path.name
self.errors = []
self.result = LinterResult(str(self.repo_path), self.repo_name)
def log_error(self, message: str, file: str = None, line: int = None):
loc = f"{file}:{line}" if file and line else (file if file else "General")
self.errors.append(f"[{loc}] {message}")
# --- helpers ---
def _scan_files(self, extensions=None):
"""Yield (Path, content) for files matching *extensions*."""
exts = extensions or SCAN_EXTENSIONS
for root, dirs, files in os.walk(self.repo_path):
dirs[:] = [d for d in dirs if d not in SKIP_DIRS]
for fname in files:
if Path(fname).suffix in exts:
if fname == '.env.example':
continue
fpath = Path(root) / fname
try:
content = fpath.read_text(errors='ignore')
except Exception:
continue
yield fpath, content
def _line_no(self, content: str, offset: int) -> int:
return content.count('\n', 0, offset) + 1
# --- checks ---
def check_sidecar_boundary(self):
"""Rule 1: No sovereign code in hermes-agent (sidecar boundary)"""
if self.repo_name == "hermes-agent":
for root, _, files in os.walk(self.repo_path):
if "node_modules" in root or ".git" in root:
continue
for file in files:
if file.endswith((".py", ".ts", ".js", ".tsx")):
path = Path(root) / file
content = path.read_text(errors="ignore")
for kw in SOVEREIGN_KEYWORDS:
if kw in content.lower():
# Exception: imports or comments might be okay, but we're strict for now
self.log_error(f"Sovereign keyword '{kw}' found in hermes-agent. Violates sidecar boundary.", str(path.relative_to(self.repo_path)))
"""No sovereign code in hermes-agent (sidecar boundary)."""
if self.repo_name != 'hermes-agent':
return
for fpath, content in self._scan_files():
for kw in SOVEREIGN_KEYWORDS:
if kw in content.lower():
rel = str(fpath.relative_to(self.repo_path))
self.result.errors.append(
f"Sovereign keyword '{kw}' in hermes-agent violates sidecar boundary. [{rel}]"
)
def check_hardcoded_ips(self):
"""Rule 2: No hardcoded IPs (use domain names)"""
for root, _, files in os.walk(self.repo_path):
if "node_modules" in root or ".git" in root:
continue
for file in files:
if file.endswith((".py", ".ts", ".js", ".tsx", ".yaml", ".yml", ".json")):
path = Path(root) / file
content = path.read_text(errors="ignore")
matches = re.finditer(IP_REGEX, content)
for match in matches:
ip = match.group()
if ip in ["127.0.0.1", "0.0.0.0"]:
continue
line_no = content.count('\n', 0, match.start()) + 1
self.log_error(f"Hardcoded IP address '{ip}' found. Use domain names or environment variables.", str(path.relative_to(self.repo_path)), line_no)
"""No hardcoded public IPs use DNS or env vars."""
for fpath, content in self._scan_files():
for m in re.finditer(IP_REGEX, content):
ip = m.group()
# skip private ranges already handled by lookahead, and 0.0.0.0
if ip.startswith('0.'):
continue
line = self._line_no(content, m.start())
rel = str(fpath.relative_to(self.repo_path))
self.result.errors.append(
f"Hardcoded IP '{ip}'. Use DNS or env vars. [{rel}:{line}]"
)
def check_api_keys(self):
"""Rule 3: No cloud API keys committed to repos"""
for root, _, files in os.walk(self.repo_path):
if "node_modules" in root or ".git" in root:
continue
for file in files:
if file.endswith((".py", ".ts", ".js", ".tsx", ".yaml", ".yml", ".json", ".env")):
if file == ".env.example":
continue
path = Path(root) / file
content = path.read_text(errors="ignore")
matches = re.finditer(API_KEY_REGEX, content, re.IGNORECASE)
for match in matches:
line_no = content.count('\n', 0, match.start()) + 1
self.log_error("Potential API key or secret found in code.", str(path.relative_to(self.repo_path)), line_no)
"""No cloud API keys / secrets committed."""
for fpath, content in self._scan_files():
for pattern in API_KEY_PATTERNS:
for m in re.finditer(pattern, content, re.IGNORECASE):
line = self._line_no(content, m.start())
rel = str(fpath.relative_to(self.repo_path))
self.result.errors.append(
f"Potential secret / API key detected. [{rel}:{line}]"
)
def check_sovereignty_rules(self):
"""V1 sovereignty rules: no direct cloud API endpoints or providers."""
for fpath, content in self._scan_files({'.py', '.ts', '.tsx', '.js', '.yaml', '.yml'}):
for pattern, msg in SOVEREIGN_RULES:
for m in re.finditer(pattern, content):
line = self._line_no(content, m.start())
rel = str(fpath.relative_to(self.repo_path))
self.result.errors.append(f"{msg} [{rel}:{line}]")
def check_soul_canonical(self):
"""Rule 4: SOUL.md exists and is canonical in exactly one location"""
soul_path = self.repo_path / "SOUL.md"
if self.repo_name == "timmy-config":
"""SOUL.md must exist exactly in timmy-config root."""
soul_path = self.repo_path / 'SOUL.md'
if self.repo_name == 'timmy-config':
if not soul_path.exists():
self.log_error("SOUL.md is missing from the canonical location (timmy-config root).")
self.result.errors.append(
'SOUL.md missing from canonical location (timmy-config root).'
)
else:
if soul_path.exists():
self.log_error("SOUL.md found in non-canonical repo. It should only live in timmy-config.")
self.result.errors.append(
'SOUL.md found in non-canonical repo. Must live only in timmy-config.'
)
def check_readme(self):
"""Rule 5: Every repo has a README with current truth"""
readme_path = self.repo_path / "README.md"
if not readme_path.exists():
self.log_error("README.md is missing.")
"""Every repo must have a substantive README."""
readme = self.repo_path / 'README.md'
if not readme.exists():
self.result.errors.append('README.md is missing.')
else:
content = readme_path.read_text(errors="ignore")
content = readme.read_text(errors='ignore')
if len(content.strip()) < 50:
self.log_error("README.md is too short or empty. Provide current truth about the repo.")
self.result.warnings.append(
'README.md is very short (<50 chars). Provide current truth about the repo.'
)
def run(self):
print(f"--- Gemini Linter: Auditing {self.repo_name} ---")
# --- runner ---
def run(self) -> LinterResult:
"""Execute all checks and return the result."""
self.check_sidecar_boundary()
self.check_hardcoded_ips()
self.check_api_keys()
self.check_sovereignty_rules()
self.check_soul_canonical()
self.check_readme()
return self.result
if self.errors:
print(f"\n[FAILURE] Found {len(self.errors)} architectural violations:")
for err in self.errors:
print(f" - {err}")
return False
else:
print("\n[SUCCESS] Architecture is sound. Sovereignty maintained.")
return True
def main():
parser = argparse.ArgumentParser(description="Gemini Architecture Linter v2")
parser.add_argument("repo_path", nargs="?", default=".", help="Path to the repository to lint")
parser = argparse.ArgumentParser(
description='Gemini Architecture Linter v2 — repo-aware sovereignty gate.'
)
parser.add_argument(
'repo_path', nargs='?', default='.',
help='Path to the repository to lint (default: cwd).',
)
parser.add_argument(
'--repo', dest='repo_flag', default=None,
help='Explicit repo path (alias for positional arg).',
)
parser.add_argument(
'--json', dest='json_output', action='store_true',
help='Emit machine-readable JSON instead of human text.',
)
args = parser.parse_args()
linter = Linter(args.repo_path)
success = linter.run()
sys.exit(0 if success else 1)
path = args.repo_flag if args.repo_flag else args.repo_path
if __name__ == "__main__":
try:
linter = Linter(path)
except FileNotFoundError as exc:
print(f"ERROR: {exc}", file=sys.stderr)
sys.exit(2)
result = linter.run()
if args.json_output:
import json as _json
out = {
'repo': result.repo_name,
'passed': result.passed,
'violation_count': result.violation_count,
'errors': result.errors,
'warnings': result.warnings,
}
print(_json.dumps(out, indent=2))
else:
print(result.summary())
sys.exit(0 if result.passed else 1)
if __name__ == '__main__':
main()

331
scripts/task_gate.py Normal file
View File

@@ -0,0 +1,331 @@
#!/usr/bin/env python3
"""Task Gate — Pre-task and post-task quality gates for fleet agents.
This is the missing enforcement layer between the orchestrator dispatching
an issue and an agent submitting a PR. SOUL.md demands "grounding before
generation" and "the apparatus that gives these words teeth" — this script
is that apparatus.
Usage:
python3 task_gate.py pre --repo timmy-config --issue 123 --agent groq
python3 task_gate.py post --repo timmy-config --issue 123 --agent groq --branch groq/issue-123
Pre-task gate checks:
1. Issue is not already assigned to a different agent
2. No existing branch targets this issue
3. No open PR already addresses this issue
4. Agent is in the correct lane per playbooks/agent-lanes.json
5. Issue is not filtered (epic, permanent, etc.)
Post-task gate checks:
1. Branch exists and has commits ahead of main
2. Changed files pass syntax_guard.py
3. No duplicate PR exists for the same issue
4. Branch name follows convention: {agent}/{description}
5. At least one file was actually changed
Exit codes:
0 = all gates pass
1 = gate failure (should not proceed)
2 = warning (can proceed with caution)
"""
import argparse
import json
import os
import subprocess
import sys
import urllib.request
import urllib.error
# ---------------------------------------------------------------------------
# CONFIG
# ---------------------------------------------------------------------------
GITEA_API = "https://forge.alexanderwhitestone.com/api/v1"
GITEA_OWNER = "Timmy_Foundation"
FILTER_TAGS = ["[EPIC]", "[DO NOT CLOSE]", "[PERMANENT]", "[PHILOSOPHY]", "[MORNING REPORT]"]
AGENT_USERNAMES = {
"groq", "ezra", "bezalel", "allegro", "timmy",
"thetimmyc", "perplexity", "kimiclaw", "codex-agent",
"manus", "claude", "gemini", "grok",
}
# ---------------------------------------------------------------------------
# GITEA API
# ---------------------------------------------------------------------------
def load_gitea_token():
token = os.environ.get("GITEA_TOKEN", "")
if token:
return token.strip()
for path in [
os.path.expanduser("~/.hermes/gitea_token_vps"),
os.path.expanduser("~/.hermes/gitea_token"),
]:
try:
with open(path) as f:
return f.read().strip()
except FileNotFoundError:
continue
print("[FATAL] No GITEA_TOKEN found")
sys.exit(2)
def gitea_get(path):
token = load_gitea_token()
url = f"{GITEA_API}{path}"
req = urllib.request.Request(url, headers={
"Authorization": f"token {token}",
"Accept": "application/json",
})
try:
with urllib.request.urlopen(req, timeout=15) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
if e.code == 404:
return None
print(f"[API ERROR] {url} -> {e.code}")
return None
except Exception as e:
print(f"[API ERROR] {url} -> {e}")
return None
# ---------------------------------------------------------------------------
# LANE CHECKER
# ---------------------------------------------------------------------------
def load_agent_lanes():
"""Load agent lane assignments from playbooks/agent-lanes.json."""
lanes_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"playbooks", "agent-lanes.json"
)
try:
with open(lanes_path) as f:
return json.load(f)
except FileNotFoundError:
return {} # no lanes file = no lane enforcement
def check_agent_lane(agent, issue_title, issue_labels, lanes):
"""Check if the agent is in the right lane for this issue type."""
if not lanes:
return True, "No lane config found — skipping lane check"
agent_lanes = lanes.get(agent, [])
if not agent_lanes:
return True, f"No lanes defined for {agent} — skipping"
# This is advisory, not blocking — return warning if mismatch
return True, f"{agent} has lanes: {agent_lanes}"
# ---------------------------------------------------------------------------
# PRE-TASK GATE
# ---------------------------------------------------------------------------
def pre_task_gate(repo, issue_number, agent):
"""Run all pre-task checks. Returns (pass, messages)."""
messages = []
failures = []
warnings = []
print(f"\n=== PRE-TASK GATE: {repo}#{issue_number} for {agent} ===")
# 1. Fetch issue
issue = gitea_get(f"/repos/{GITEA_OWNER}/{repo}/issues/{issue_number}")
if not issue:
failures.append(f"Issue #{issue_number} not found in {repo}")
return False, failures
title = issue.get("title", "")
print(f" Issue: {title}")
# 2. Check if filtered
title_upper = title.upper()
for tag in FILTER_TAGS:
if tag.upper().replace("[", "").replace("]", "") in title_upper:
failures.append(f"Issue has filter tag: {tag} — should not be auto-dispatched")
# 3. Check assignees
assignees = [a.get("login", "") for a in (issue.get("assignees") or [])]
other_agents = [a for a in assignees if a.lower() in AGENT_USERNAMES and a.lower() != agent.lower()]
if other_agents:
failures.append(f"Already assigned to other agent(s): {other_agents}")
# 4. Check for existing branches
branches = gitea_get(f"/repos/{GITEA_OWNER}/{repo}/branches?limit=50")
if branches:
issue_branches = [
b["name"] for b in branches
if str(issue_number) in b.get("name", "")
and b["name"] != "main"
]
if issue_branches:
warnings.append(f"Existing branches may target this issue: {issue_branches}")
# 5. Check for existing PRs
prs = gitea_get(f"/repos/{GITEA_OWNER}/{repo}/pulls?state=open&limit=50")
if prs:
issue_prs = [
f"PR #{p['number']}: {p['title']}"
for p in prs
if str(issue_number) in p.get("title", "")
or str(issue_number) in p.get("body", "")
]
if issue_prs:
failures.append(f"Open PR(s) already target this issue: {issue_prs}")
# 6. Check agent lanes
lanes = load_agent_lanes()
labels = [l.get("name", "") for l in (issue.get("labels") or [])]
lane_ok, lane_msg = check_agent_lane(agent, title, labels, lanes)
if not lane_ok:
warnings.append(lane_msg)
else:
messages.append(f" Lane: {lane_msg}")
# Report
if failures:
print("\n FAILURES:")
for f in failures:
print(f"{f}")
if warnings:
print("\n WARNINGS:")
for w in warnings:
print(f" ⚠️ {w}")
if not failures and not warnings:
print(" \u2705 All pre-task gates passed")
passed = len(failures) == 0
return passed, failures + warnings
# ---------------------------------------------------------------------------
# POST-TASK GATE
# ---------------------------------------------------------------------------
def post_task_gate(repo, issue_number, agent, branch):
"""Run all post-task checks. Returns (pass, messages)."""
failures = []
warnings = []
print(f"\n=== POST-TASK GATE: {repo}#{issue_number} by {agent} ===")
print(f" Branch: {branch}")
# 1. Check branch exists
branch_info = gitea_get(
f"/repos/{GITEA_OWNER}/{repo}/branches/{urllib.parse.quote(branch, safe='')}"
)
if not branch_info:
failures.append(f"Branch '{branch}' does not exist")
return False, failures
# 2. Check branch naming convention
if "/" not in branch:
warnings.append(f"Branch name '{branch}' doesn't follow agent/description convention")
elif not branch.startswith(f"{agent}/"):
warnings.append(f"Branch '{branch}' doesn't start with agent name '{agent}/")
# 3. Check for commits ahead of main
compare = gitea_get(
f"/repos/{GITEA_OWNER}/{repo}/compare/main...{urllib.parse.quote(branch, safe='')}"
)
if compare:
commits = compare.get("commits", [])
if not commits:
failures.append("Branch has no commits ahead of main")
else:
print(f" Commits ahead: {len(commits)}")
files = compare.get("diff_files", []) or []
if not files:
# Try alternate key
num_files = compare.get("total_commits", 0)
print(f" Files changed: (check PR diff)")
else:
print(f" Files changed: {len(files)}")
# 4. Check for duplicate PRs
prs = gitea_get(f"/repos/{GITEA_OWNER}/{repo}/pulls?state=open&limit=50")
if prs:
dupe_prs = [
f"PR #{p['number']}"
for p in prs
if str(issue_number) in p.get("title", "")
or str(issue_number) in p.get("body", "")
]
if len(dupe_prs) > 1:
warnings.append(f"Multiple open PRs may target issue #{issue_number}: {dupe_prs}")
# 5. Run syntax guard on changed files (if available)
syntax_guard = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
"hermes-sovereign", "scripts", "syntax_guard.py"
)
if os.path.exists(syntax_guard):
try:
result = subprocess.run(
[sys.executable, syntax_guard],
capture_output=True, text=True, timeout=30
)
if result.returncode != 0:
failures.append(f"Syntax guard failed: {result.stdout[:200]}")
else:
print(" Syntax guard: passed")
except Exception as e:
warnings.append(f"Could not run syntax guard: {e}")
else:
warnings.append("syntax_guard.py not found — skipping syntax check")
# Report
if failures:
print("\n FAILURES:")
for f in failures:
print(f"{f}")
if warnings:
print("\n WARNINGS:")
for w in warnings:
print(f" ⚠️ {w}")
if not failures and not warnings:
print(" \u2705 All post-task gates passed")
passed = len(failures) == 0
return passed, failures + warnings
# ---------------------------------------------------------------------------
# MAIN
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="Task Gate — pre/post-task quality gates")
subparsers = parser.add_subparsers(dest="command")
# Pre-task
pre = subparsers.add_parser("pre", help="Run pre-task gates")
pre.add_argument("--repo", required=True)
pre.add_argument("--issue", type=int, required=True)
pre.add_argument("--agent", required=True)
# Post-task
post = subparsers.add_parser("post", help="Run post-task gates")
post.add_argument("--repo", required=True)
post.add_argument("--issue", type=int, required=True)
post.add_argument("--agent", required=True)
post.add_argument("--branch", required=True)
args = parser.parse_args()
if not args.command:
parser.print_help()
sys.exit(1)
if args.command == "pre":
passed, msgs = pre_task_gate(args.repo, args.issue, args.agent)
elif args.command == "post":
passed, msgs = post_task_gate(args.repo, args.issue, args.agent, args.branch)
else:
parser.print_help()
sys.exit(1)
sys.exit(0 if passed else 1)
if __name__ == "__main__":
main()

195
scripts/test_harness.sh Executable file
View File

@@ -0,0 +1,195 @@
#!/usr/bin/env bash
# test_harness.sh — Common CLI safety/test harness for the scripts/ suite
# Usage: ./scripts/test_harness.sh [--verbose] [--ci] [directory]
#
# Discovers .sh, .py, and .yaml files in the target directory and validates them:
# - .sh : runs shellcheck (or SKIPS if unavailable)
# - .py : runs python3 -m py_compile
# - .yaml: validates with python3 yaml.safe_load
#
# Exit codes: 0 = all pass, 1 = any fail
set -euo pipefail
# --- Defaults ---
VERBOSE=0
CI_MODE=0
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TARGET_DIR="${SCRIPT_DIR}"
# --- Colors (disabled in CI) ---
RED=""
GREEN=""
YELLOW=""
CYAN=""
RESET=""
if [[ -t 1 && "${CI:-}" != "true" ]]; then
RED=$'\033[0;31m'
GREEN=$'\033[0;32m'
YELLOW=$'\033[0;33m'
CYAN=$'\033[0;36m'
RESET=$'\033[0m'
fi
# --- Argument parsing ---
while [[ $# -gt 0 ]]; do
case "$1" in
--verbose|-v) VERBOSE=1; shift ;;
--ci) CI_MODE=1; shift ;;
-*) echo "Unknown option: $1" >&2; exit 2 ;;
*) TARGET_DIR="$1"; shift ;;
esac
done
# --- Counters ---
PASS=0
FAIL=0
SKIP=0
TOTAL=0
# --- Helpers ---
log_verbose() {
if [[ "${VERBOSE}" -eq 1 ]]; then
echo " ${CYAN}[DEBUG]${RESET} $*"
fi
}
record_pass() {
((PASS++))
((TOTAL++))
echo "${GREEN}PASS${RESET} $1"
}
record_fail() {
((FAIL++))
((TOTAL++))
echo "${RED}FAIL${RESET} $1"
if [[ -n "${2:-}" ]]; then
echo " ${2}"
fi
}
record_skip() {
((SKIP++))
((TOTAL++))
echo "${YELLOW}SKIP${RESET} $1$2"
}
# --- Checkers ---
check_shell_file() {
local file="$1"
local rel="${file#${TARGET_DIR}/}"
if command -v shellcheck &>/dev/null; then
log_verbose "Running shellcheck on ${rel}"
local output
if output=$(shellcheck -x -S warning "${file}" 2>&1); then
record_pass "${rel}"
else
record_fail "${rel}" "${output}"
fi
else
record_skip "${rel}" "shellcheck not installed"
fi
}
check_python_file() {
local file="$1"
local rel="${file#${TARGET_DIR}/}"
log_verbose "Running py_compile on ${rel}"
local output
if output=$(python3 -m py_compile "${file}" 2>&1); then
record_pass "${rel}"
else
record_fail "${rel}" "${output}"
fi
}
check_yaml_file() {
local file="$1"
local rel="${file#${TARGET_DIR}/}"
log_verbose "Validating YAML: ${rel}"
local output
if output=$(python3 -c "import yaml; yaml.safe_load(open('${file}'))" 2>&1); then
record_pass "${rel}"
else
record_fail "${rel}" "${output}"
fi
}
# --- Main ---
echo ""
echo "=== scripts/ test harness ==="
echo "Target: ${TARGET_DIR}"
echo ""
if [[ ! -d "${TARGET_DIR}" ]]; then
echo "Error: target directory '${TARGET_DIR}' not found" >&2
exit 1
fi
# Check python3 availability
if ! command -v python3 &>/dev/null; then
echo "${RED}Error: python3 is required but not found${RESET}" >&2
exit 1
fi
# Check PyYAML availability
if ! python3 -c "import yaml" 2>/dev/null; then
echo "${YELLOW}Warning: PyYAML not installed — YAML checks will be skipped${RESET}" >&2
YAML_AVAILABLE=0
else
YAML_AVAILABLE=1
fi
# Discover and check .sh files
sh_files=()
while IFS= read -r -d '' f; do
sh_files+=("$f")
done < <(find "${TARGET_DIR}" -maxdepth 1 -name "*.sh" ! -name "test_harness.sh" ! -name "test_runner.sh" -print0 | sort -z)
for f in "${sh_files[@]:-}"; do
[[ -n "$f" ]] && check_shell_file "$f"
done
# Discover and check .py files
py_files=()
while IFS= read -r -d '' f; do
py_files+=("$f")
done < <(find "${TARGET_DIR}" -maxdepth 1 -name "*.py" -print0 | sort -z)
for f in "${py_files[@]:-}"; do
[[ -n "$f" ]] && check_python_file "$f"
done
# Discover and check .yaml files in target dir
yaml_files=()
while IFS= read -r -d '' f; do
yaml_files+=("$f")
done < <(find "${TARGET_DIR}" -maxdepth 1 -name "*.yaml" -print0 | sort -z)
if [[ "${YAML_AVAILABLE}" -eq 1 ]]; then
for f in "${yaml_files[@]:-}"; do
[[ -n "$f" ]] && check_yaml_file "$f"
done
else
for f in "${yaml_files[@]:-}"; do
[[ -n "$f" ]] && record_skip "${f#${TARGET_DIR}/}" "PyYAML not installed"
done
fi
# --- Summary ---
echo ""
echo "=== Results ==="
echo " ${GREEN}PASS${RESET}: ${PASS}"
echo " ${RED}FAIL${RESET}: ${FAIL}"
echo " ${YELLOW}SKIP${RESET}: ${SKIP}"
echo " Total: ${TOTAL}"
echo ""
if [[ "${FAIL}" -gt 0 ]]; then
echo "${RED}FAILED${RESET}${FAIL} file(s) did not pass validation."
exit 1
else
echo "${GREEN}ALL CLEAR${RESET} — all checked files passed."
exit 0
fi

9
scripts/test_runner.sh Executable file
View File

@@ -0,0 +1,9 @@
#!/usr/bin/env bash
# test_runner.sh — Convenience wrapper for test_harness.sh
# Runs the test harness with sensible defaults for local development.
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
exec "${SCRIPT_DIR}/test_harness.sh" --verbose "$@"

233
tests/test_linter.py Normal file
View File

@@ -0,0 +1,233 @@
"""Tests for Architecture Linter v2.
Validates that the linter correctly detects violations and passes clean repos.
Refs: #437 — test-backed linter.
"""
import json
import sys
import tempfile
from pathlib import Path
# Add scripts/ to path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts"))
from architecture_linter_v2 import Linter, LinterResult
# ── helpers ───────────────────────────────────────────────────────────
def _make_repo(tmpdir: str, files: dict[str, str], name: str = "test-repo") -> Path:
"""Create a fake repo with given files and return its path."""
repo = Path(tmpdir) / name
repo.mkdir()
for relpath, content in files.items():
p = repo / relpath
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(content)
return repo
def _run(tmpdir, files, name="test-repo"):
repo = _make_repo(tmpdir, files, name)
return Linter(str(repo)).run()
# ── clean repo passes ─────────────────────────────────────────────────
def test_clean_repo_passes():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# Test Repo\n\nThis is a clean test repo with sufficient content to pass.",
"main.py": "print('hello world')\n",
})
assert result.passed, f"Expected pass but got: {result.errors}"
assert result.violation_count == 0
# ── missing README ────────────────────────────────────────────────────
def test_missing_readme_fails():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {"main.py": "x = 1\n"})
assert not result.passed
assert any("README" in e for e in result.errors)
def test_short_readme_warns():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {"README.md": "hi\n"})
# Warnings don't fail the build
assert result.passed
assert any("short" in w.lower() for w in result.warnings)
# ── hardcoded IPs ─────────────────────────────────────────────────────
def test_hardcoded_public_ip_detected():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"server.py": "HOST = '203.0.113.42'\n",
})
assert not result.passed
assert any("203.0.113.42" in e for e in result.errors)
def test_localhost_ip_ignored():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"server.py": "HOST = '127.0.0.1'\n",
})
ip_errors = [e for e in result.errors if "IP" in e]
assert len(ip_errors) == 0
# ── API keys ──────────────────────────────────────────────────────────
def test_openai_key_detected():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"config.py": 'key = "sk-abcdefghijklmnopqrstuvwx"\n',
})
assert not result.passed
assert any("secret" in e.lower() or "key" in e.lower() for e in result.errors)
def test_aws_key_detected():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"deploy.yaml": 'aws_key: AKIAIOSFODNN7EXAMPLE\n',
})
assert not result.passed
assert any("secret" in e.lower() for e in result.errors)
def test_env_example_skipped():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
".env.example": 'OPENAI_KEY=sk-placeholder\n',
})
secret_errors = [e for e in result.errors if "secret" in e.lower()]
assert len(secret_errors) == 0
# ── sovereignty rules (v1 cloud API checks) ───────────────────────────
def test_openai_url_detected():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"app.py": 'url = "https://api.openai.com/v1/chat"\n',
})
assert not result.passed
assert any("openai" in e.lower() for e in result.errors)
def test_cloud_provider_detected():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"config.yaml": "provider: openai\n",
})
assert not result.passed
assert any("provider" in e.lower() for e in result.errors)
# ── sidecar boundary ──────────────────────────────────────────────────
def test_sovereign_keyword_in_hermes_agent_fails():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"index.py": "import mempalace\n",
}, name="hermes-agent")
assert not result.passed
assert any("sidecar" in e.lower() or "mempalace" in e.lower() for e in result.errors)
def test_sovereign_keyword_in_other_repo_ok():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"index.py": "import mempalace\n",
}, name="some-other-repo")
sidecar_errors = [e for e in result.errors if "sidecar" in e.lower()]
assert len(sidecar_errors) == 0
# ── SOUL.md canonical location ────────────────────────────────────────
def test_soul_md_required_in_timmy_config():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# timmy-config\n\nConfig repo.",
}, name="timmy-config")
assert not result.passed
assert any("SOUL.md" in e for e in result.errors)
def test_soul_md_present_in_timmy_config_ok():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# timmy-config\n\nConfig repo.",
"SOUL.md": "# Soul\n\nCanonical identity document.",
}, name="timmy-config")
soul_errors = [e for e in result.errors if "SOUL" in e]
assert len(soul_errors) == 0
def test_soul_md_in_wrong_repo_fails():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {
"README.md": "# R\n\nGood repo.",
"SOUL.md": "# Soul\n\nShould not be here.",
}, name="other-repo")
assert any("canonical" in e.lower() for e in result.errors)
# ── LinterResult structure ────────────────────────────────────────────
def test_result_summary_is_string():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {"README.md": "# OK repo with enough text here\n"})
assert isinstance(result.summary(), str)
assert "PASSED" in result.summary() or "FAILED" in result.summary()
def test_result_repo_name():
with tempfile.TemporaryDirectory() as tmp:
result = _run(tmp, {"README.md": "# OK\n"}, name="my-repo")
assert result.repo_name == "my-repo"
# ── invalid path ──────────────────────────────────────────────────────
def test_invalid_path_raises():
try:
Linter("/nonexistent/path/xyz")
assert False, "Should have raised FileNotFoundError"
except FileNotFoundError:
pass
# ── skip dirs ──────────────────────────────────────────────────────────
def test_git_dir_skipped():
with tempfile.TemporaryDirectory() as tmp:
repo = _make_repo(tmp, {
"README.md": "# R\n\nGood repo.",
"main.py": "x = 1\n",
})
# Create a .git/ dir with a bad file
git_dir = repo / ".git"
git_dir.mkdir()
(git_dir / "bad.py").write_text("HOST = '203.0.113.1'\n")
result = Linter(str(repo)).run()
git_errors = [e for e in result.errors if ".git" in e]
assert len(git_errors) == 0