Compare commits

..

1 Commit

Author SHA1 Message Date
fe22dbfcc9 feat: test suite for quality gates (#629) 2026-04-15 23:08:52 +00:00
3 changed files with 168 additions and 413 deletions

View File

@@ -1,274 +1,6 @@
"""Sovereign orchestration — Huey replaces 3,843 lines of homebrew.
"""Sovereign orchestration — Huey replaces 3,843 lines of homebrew."""
Pipeline tasks automatically track token usage via token_budget.py.
After each task completes, the Huey signal records usage for the pipeline.
"""
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from huey import SqliteHuey, crontab
from pathlib import Path
from huey import SqliteHuey, crontab, signals
# --- Setup ---
# All orchestration state lives under ~/.hermes.
HERMES_HOME = Path.home() / ".hermes"
# Single Huey instance backed by SQLite; the tasks below register against it.
huey = SqliteHuey(filename=str(HERMES_HOME / "orchestration.db"))
# Token budget integration
# Make the sibling token_budget module importable regardless of the cwd.
sys.path.insert(0, str(Path(__file__).parent))
try:
    from token_budget import record_usage, get_remaining, can_afford, get_report
    HAS_BUDGET = True
except ImportError:
    # Budget tracking is optional — tasks skip budget checks when it is absent.
    HAS_BUDGET = False
# --- Pipeline definitions ---
# Each entry: "script" is a path relative to ~/timmy-config (see pipeline_task),
# "max_tokens" is the per-run token ceiling passed to the script, and
# "dependencies" lists pipelines that must be running/complete before this one
# is eligible to start.
PIPELINES = {
    "playground-factory": {
        "script": "scripts/pipeline_playground_factory.sh",
        "max_tokens": 100_000,
        "dependencies": [],
    },
    "training-factory": {
        "script": "scripts/pipeline_training_factory.sh",
        "max_tokens": 150_000,
        "dependencies": [],
    },
    "knowledge-mine": {
        "script": "scripts/pipeline_knowledge_mine.sh",
        "max_tokens": 80_000,
        "dependencies": ["training-factory"],
    },
    "adversary": {
        "script": "scripts/pipeline_adversary.sh",
        "max_tokens": 50_000,
        "dependencies": ["knowledge-mine"],
    },
    "codebase-genome": {
        "script": "scripts/pipeline_codebase_genome.sh",
        "max_tokens": 120_000,
        "dependencies": [],
    },
}
# --- Token tracking signal ---
@huey.signal()
def track_tokens(signal, task, task_value=None, **kwargs):
    """Automatically log token usage after each pipeline task completes.

    Hooks into Huey's signal system. Fires on task execution.
    Extracts token counts from the task result and records them.

    NOTE(review): ``@huey.signal()`` with no arguments subscribes to *all*
    signals; only invocations that supply a dict ``task_value`` containing
    token counts will record anything — confirm Huey actually passes the task
    result as ``task_value`` for the COMPLETE signal.
    NOTE(review): the ``"pipeline."`` prefix check assumes tasks are named with
    that prefix, but the task defined in this module is the plain function
    ``pipeline_task`` — verify this handler ever matches.
    """
    if not HAS_BUDGET:
        return
    # Only track pipeline tasks
    task_name = getattr(task, "name", "") or ""
    if not task_name.startswith("pipeline."):
        return
    pipeline = task_name.replace("pipeline.", "")
    # Extract token counts from result
    result = task_value or {}
    if isinstance(result, dict):
        input_tokens = result.get("input_tokens", 0)
        output_tokens = result.get("output_tokens", 0)
        if input_tokens or output_tokens:
            # Persists usage to the shared budget ledger (token_budget.py).
            record_usage(pipeline, input_tokens, output_tokens)
# --- Pipeline tasks ---
@huey.task()
def pipeline_task(name: str, max_tokens: int = None):
"""Run a single pipeline and return token usage stats."""
spec = PIPELINES.get(name)
if not spec:
return {"error": f"Unknown pipeline: {name}", "input_tokens": 0, "output_tokens": 0}
script = spec["script"]
budget = max_tokens or spec["max_tokens"]
# Check budget before running
if HAS_BUDGET and not can_afford(budget):
return {
"error": f"Insufficient budget for {name} (need {budget}, have {get_remaining()})",
"input_tokens": 0,
"output_tokens": 0,
}
# Check dependencies
for dep in spec.get("dependencies", []):
dep_state = _get_pipeline_state(dep)
if dep_state not in ("running", "complete"):
return {
"error": f"Dependency {dep} not met (state: {dep_state})",
"input_tokens": 0,
"output_tokens": 0,
}
# Run the pipeline script
script_path = Path.home() / "timmy-config" / script
if not script_path.exists():
return {"error": f"Script not found: {script_path}", "input_tokens": 0, "output_tokens": 0}
_set_pipeline_state(name, "running")
log_path = HERMES_HOME / "logs" / f"pipeline-{name}.log"
log_path.parent.mkdir(parents=True, exist_ok=True)
try:
result = subprocess.run(
["bash", str(script_path), "--max-tokens", str(budget)],
capture_output=True,
text=True,
timeout=3600, # 1 hour max
)
# Parse token usage from stdout (if script reports it)
input_tokens = 0
output_tokens = 0
for line in result.stdout.splitlines():
if "input_tokens=" in line:
try:
input_tokens = int(line.split("input_tokens=")[1].split()[0])
except (ValueError, IndexError):
pass
if "output_tokens=" in line:
try:
output_tokens = int(line.split("output_tokens=")[1].split()[0])
except (ValueError, IndexError):
pass
# If script didn't report tokens, estimate from output
if not input_tokens and not output_tokens:
output_tokens = len(result.stdout) // 4 # rough estimate
# Log output
with open(log_path, "a") as f:
f.write(f"\n--- {datetime.now(timezone.utc).isoformat()} ---\n")
f.write(result.stdout)
if result.stderr:
f.write(f"\nSTDERR:\n{result.stderr}")
if result.returncode == 0:
_set_pipeline_state(name, "complete")
return {
"pipeline": name,
"status": "complete",
"input_tokens": input_tokens,
"output_tokens": output_tokens,
}
else:
_set_pipeline_state(name, "failed")
return {
"pipeline": name,
"status": "failed",
"error": result.stderr[:500],
"input_tokens": input_tokens,
"output_tokens": output_tokens,
}
except subprocess.TimeoutExpired:
_set_pipeline_state(name, "failed")
return {"pipeline": name, "status": "timeout", "input_tokens": 0, "output_tokens": 0}
except Exception as e:
_set_pipeline_state(name, "failed")
return {"pipeline": name, "status": "error", "error": str(e), "input_tokens": 0, "output_tokens": 0}
@huey.periodic_task(crontab(hour="*/6"))
def pipeline_scheduler():
    """Periodic tick: launch at most one eligible pipeline and report why
    the others were skipped. Returns the report as a newline-joined string."""
    lines = ["=== Pipeline Scheduler ==="]
    # Budget gate: bail out entirely when today's allowance is spent.
    if HAS_BUDGET:
        remaining = get_remaining()
        lines.append(f"Budget remaining: {remaining:,} tokens")
        if remaining <= 0:
            lines.append("Budget exhausted. Skipping.")
            return "\n".join(lines)
    # Walk pipelines in declaration order; first eligible one wins.
    for name, spec in PIPELINES.items():
        state = _get_pipeline_state(name)
        if state in ("running", "complete"):
            lines.append(f"SKIP {name}: {state}")
            continue
        # First dependency that is neither running nor complete blocks us.
        unmet = next(
            (dep for dep in spec.get("dependencies", [])
             if _get_pipeline_state(dep) not in ("running", "complete")),
            None,
        )
        if unmet is not None:
            lines.append(f"SKIP {name}: dependency {unmet} not met")
            continue
        lines.append(f"START {name}")
        pipeline_task(name)
        break  # one pipeline per scheduler tick
    else:
        lines.append("No pipelines to start")
    return "\n".join(lines)
# --- State management ---
# Per-pipeline state persisted as JSON: {name: {"state": ..., "updated": ...}}.
STATE_FILE = HERMES_HOME / "pipeline_state.json"
def _get_pipeline_state(name: str) -> str:
    """Return the persisted state for *name*; "not_started" when unknown,
    unreadable, or the state file does not exist yet."""
    if not STATE_FILE.exists():
        return "not_started"
    try:
        entries = json.loads(STATE_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        return "not_started"
    return entries.get(name, {}).get("state", "not_started")
def _set_pipeline_state(name: str, state: str):
    """Persist *state* for pipeline *name*, stamped with the current UTC time.

    Corrupt or missing state files are silently replaced with a fresh dict.
    """
    try:
        snapshot = json.loads(STATE_FILE.read_text()) if STATE_FILE.exists() else {}
    except (json.JSONDecodeError, OSError):
        snapshot = {}
    snapshot[name] = {
        "state": state,
        "updated": datetime.now(timezone.utc).isoformat(),
    }
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    STATE_FILE.write_text(json.dumps(snapshot, indent=2))
# --- CLI ---
if __name__ == "__main__":
    # Minimal CLI: budget report, pipeline status table, or manual enqueue.
    if "--budget" in sys.argv or "--report" in sys.argv:
        if HAS_BUDGET:
            print(get_report())
        else:
            print("token_budget module not available")
    elif "--status" in sys.argv:
        for name in PIPELINES:
            state = _get_pipeline_state(name)
            print(f" {name}: {state}")
    elif "--run" in sys.argv:
        idx = sys.argv.index("--run")
        # Guard: `--run` with no pipeline name used to raise IndexError.
        if idx + 1 >= len(sys.argv):
            print("Usage: orchestration.py [--budget|--status|--run PIPELINE]")
            sys.exit(1)
        name = sys.argv[idx + 1]
        print(f"Enqueuing pipeline: {name}")
        pipeline_task(name)  # enqueues via Huey; result handle not needed here
        print("Task enqueued")
    else:
        print("Usage: orchestration.py [--budget|--status|--run PIPELINE]")
# NOTE(review): this re-creates the Huey instance *after* all tasks were
# registered on the instance defined near the top of the file — it looks like a
# leftover duplicate from a merge/diff; confirm whether it should be removed.
huey = SqliteHuey(filename=str(Path.home() / ".hermes" / "orchestration.db"))

View File

@@ -1,142 +0,0 @@
#!/usr/bin/env python3
"""
token_budget.py — Daily token budget tracker for pipeline orchestration.
Tracks token usage per pipeline per day, enforces daily limits,
and provides a query interface for the orchestrator.
Data: ~/.hermes/pipeline_budget.json
"""
import json
import os
from datetime import datetime, timezone
from pathlib import Path
# Ledger location and the default daily cap (tokens/day across all pipelines).
BUDGET_FILE = Path.home() / ".hermes" / "pipeline_budget.json"
DEFAULT_DAILY_LIMIT = 500_000
def _load() -> dict:
    """Read the budget ledger; an absent or unreadable file yields ``{}``."""
    try:
        # FileNotFoundError is a subclass of OSError, so a missing file is
        # covered without a separate exists() check.
        return json.loads(BUDGET_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        return {}
def _save(data: dict):
    """Write the ledger as pretty-printed JSON, creating parent dirs first."""
    BUDGET_FILE.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(data, indent=2)
    BUDGET_FILE.write_text(payload)
def today_key() -> str:
    """Today's UTC date as an ISO ``YYYY-MM-DD`` ledger key."""
    return datetime.now(timezone.utc).date().isoformat()
def get_daily_usage(pipeline: "str | None" = None) -> dict:
    """Get token usage for today.

    Args:
        pipeline: if given, return a summary for just that pipeline.
            (Annotation fixed: the parameter accepts None, the default.)

    Returns:
        The full day record ``{"tokens_used": ..., "pipelines": {...}}``, or for
        a single pipeline ``{"pipeline", "tokens_used", "daily_total"}``.
    """
    data = _load()
    day = data.get("daily", {}).get(today_key(), {"tokens_used": 0, "pipelines": {}})
    if pipeline:
        return {
            "pipeline": pipeline,
            "tokens_used": day.get("pipelines", {}).get(pipeline, 0),
            "daily_total": day.get("tokens_used", 0),
        }
    return day
def get_remaining(limit: int = DEFAULT_DAILY_LIMIT) -> int:
    """Tokens still spendable today under *limit*; never negative."""
    used = get_daily_usage().get("tokens_used", 0)
    remaining = limit - used
    return remaining if remaining > 0 else 0
def can_afford(tokens: int, limit: int = DEFAULT_DAILY_LIMIT) -> bool:
    """True when today's remaining budget covers a spend of *tokens*."""
    return tokens <= get_remaining(limit)
def record_usage(pipeline: str, input_tokens: int, output_tokens: int) -> dict:
    """
    Record token usage for a pipeline task.

    Called automatically by the orchestrator after each pipeline task completes.
    Returns the updated daily state.
    """
    total = input_tokens + output_tokens
    data = _load()
    today = today_key()
    # Ensure the nested day record exists: data["daily"][YYYY-MM-DD].
    daily = data.setdefault("daily", {})
    day = daily.setdefault(today, {"tokens_used": 0, "pipelines": {}})
    day["tokens_used"] = day.get("tokens_used", 0) + total
    pipes = day.setdefault("pipelines", {})
    pipes[pipeline] = pipes.get(pipeline, 0) + total
    # Track breakdown (per-pipeline input/output/call counters for get_report).
    breakdown = day.setdefault("breakdown", {})
    pb = breakdown.setdefault(pipeline, {"input": 0, "output": 0, "calls": 0})
    pb["input"] += input_tokens
    pb["output"] += output_tokens
    pb["calls"] += 1
    # Track lifetime stats.
    # NOTE(review): "total_days" is initialized here but never incremented
    # anywhere in this module — confirm whether day counting was intended.
    lifetime = data.setdefault("lifetime", {"total_tokens": 0, "total_days": 0})
    lifetime["total_tokens"] = lifetime.get("total_tokens", 0) + total
    _save(data)
    # get_remaining() re-reads the file just saved, so it reflects this call.
    return {
        "pipeline": pipeline,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "total": total,
        "daily_used": day["tokens_used"],
        "daily_remaining": get_remaining(),
    }
def get_report() -> str:
    """Build a human-readable summary of today's and lifetime token spend."""
    data = _load()
    today = today_key()
    day = data.get("daily", {}).get(today, {"tokens_used": 0, "pipelines": {}})
    out = [
        f"Token Budget — {today}",
        f" Daily usage: {day.get('tokens_used', 0):,} / {DEFAULT_DAILY_LIMIT:,}",
        f" Remaining: {get_remaining():,}",
        "",
        " Pipelines:",
    ]
    breakdown = day.get("breakdown", {})
    # Heaviest output first (same ordering as the original negated key).
    ranked = sorted(breakdown.items(), key=lambda item: item[1]["output"], reverse=True)
    for name, stats in ranked:
        combined = stats["input"] + stats["output"]
        out.append(f" {name}: {combined:,} tokens ({stats['calls']} calls)")
    if not breakdown:
        out.append(" (no pipelines run today)")
    lifetime = data.get("lifetime", {})
    out.append("")
    out.append(f" Lifetime: {lifetime.get('total_tokens', 0):,} total tokens")
    return "\n".join(out)
if __name__ == "__main__":
    import sys

    # Minimal CLI; defaults to printing the report.
    if "--report" in sys.argv:
        print(get_report())
    elif "--remaining" in sys.argv:
        print(get_remaining())
    elif "--can-afford" in sys.argv:
        idx = sys.argv.index("--can-afford")
        # Guard: `--can-afford` needs a numeric argument (previously raised
        # IndexError when missing, ValueError when non-numeric).
        if idx + 1 >= len(sys.argv):
            print("Usage: token_budget.py --can-afford TOKENS")
            sys.exit(1)
        try:
            tokens = int(sys.argv[idx + 1])
        except ValueError:
            print("Usage: token_budget.py --can-afford TOKENS")
            sys.exit(1)
        print("yes" if can_afford(tokens) else "no")
    else:
        print(get_report())

165
tests/test_quality_gates.py Normal file
View File

@@ -0,0 +1,165 @@
"""Tests for CI Automation Gate and Task Gate.
Tests the quality gate infrastructure:
- ci_automation_gate.py: function length, linting, trailing whitespace
- task_gate.py: pre/post task validation
"""
import json
import os
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
import sys
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts"))
from ci_automation_gate import QualityGate
# =========================================================================
# QualityGate — ci_automation_gate.py
# =========================================================================
class TestQualityGateLinting:
    """Whitespace hygiene: trailing spaces, final newline, and fix mode."""

    def test_clean_file_passes(self, tmp_path):
        src = tmp_path / "clean.py"
        src.write_text("def foo():\n return 1\n")
        checker = QualityGate(fix=False)
        checker.check_file(src)
        assert checker.warnings == 0
        assert checker.failures == 0

    def test_trailing_whitespace_warns(self, tmp_path):
        src = tmp_path / "messy.py"
        src.write_text("def foo(): \n return 1\n")
        checker = QualityGate(fix=False)
        checker.check_file(src)
        assert checker.failures == 0
        assert checker.warnings >= 1

    def test_missing_final_newline_warns(self, tmp_path):
        src = tmp_path / "no_newline.py"
        src.write_text("def foo():\n return 1")
        checker = QualityGate(fix=False)
        checker.check_file(src)
        assert checker.warnings >= 1

    def test_fix_mode_cleans_whitespace(self, tmp_path):
        src = tmp_path / "messy.py"
        src.write_text("def foo(): \n return 1\n")
        checker = QualityGate(fix=True)
        checker.check_file(src)
        repaired = src.read_text()
        # Trailing spaces stripped, final newline intact.
        assert " \n" not in repaired
        assert repaired.endswith("\n")

    def test_fix_mode_adds_final_newline(self, tmp_path):
        src = tmp_path / "no_newline.py"
        src.write_text("def foo():\n return 1")
        checker = QualityGate(fix=True)
        checker.check_file(src)
        assert src.read_text().endswith("\n")
class TestQualityGateFunctionLength:
    """Function-length thresholds apply to JS/TS and skip other file types."""

    def test_short_function_passes(self, tmp_path):
        src = tmp_path / "short.js"
        src.write_text("function foo() {\n return 1;\n}\n")
        checker = QualityGate(fix=False)
        checker.check_file(src)
        assert checker.failures == 0

    def test_long_function_warns(self, tmp_path):
        filler = "\n".join([f" console.log({i});" for i in range(25)])
        src = tmp_path / "long.js"
        src.write_text(f"function foo() {{\n{filler}\n}}\n")
        checker = QualityGate(fix=False)
        checker.check_file(src)
        assert checker.warnings >= 1

    def test_very_long_function_fails(self, tmp_path):
        filler = "\n".join([f" console.log({i});" for i in range(55)])
        src = tmp_path / "huge.js"
        src.write_text(f"function foo() {{\n{filler}\n}}\n")
        checker = QualityGate(fix=False)
        checker.check_file(src)
        assert checker.failures >= 1

    def test_python_files_skip_length_check(self, tmp_path):
        """Python files should not trigger JS function length regex."""
        filler = "\n".join([f" x = {i}" for i in range(60)])
        src = tmp_path / "long.py"
        src.write_text(f"def foo():\n{filler}\n return x\n")
        checker = QualityGate(fix=False)
        checker.check_file(src)
        # JS regex won't match Python source.
        assert checker.failures == 0

    def test_non_code_files_skipped(self, tmp_path):
        src = tmp_path / "readme.md"
        src.write_text("# Hello \n\nSome text")
        checker = QualityGate(fix=False)
        checker.check_file(src)
        # .md files should be skipped entirely — no warnings even with issues.
        assert checker.warnings == 0
        assert checker.failures == 0
class TestQualityGateRun:
    """Directory-scan behavior of QualityGate.run()."""

    def test_run_exits_1_on_failure(self, tmp_path):
        filler = "\n".join([f" console.log({i});" for i in range(55)])
        offender = tmp_path / "huge.js"
        offender.write_text(f"function foo() {{\n{filler}\n}}\n")
        checker = QualityGate(fix=False)
        with pytest.raises(SystemExit) as excinfo:
            checker.run(str(tmp_path))
        assert excinfo.value.code == 1

    def test_run_exits_0_on_clean(self, tmp_path):
        (tmp_path / "clean.py").write_text("x = 1\n")
        checker = QualityGate(fix=False)
        checker.run(str(tmp_path))  # must not raise SystemExit
        assert checker.failures == 0

    def test_run_skips_node_modules(self, tmp_path):
        vendor = tmp_path / "node_modules"
        vendor.mkdir()
        filler = "\n".join([f" console.log({i});" for i in range(55)])
        (vendor / "huge.js").write_text(f"function foo() {{\n{filler}\n}}\n")
        checker = QualityGate(fix=False)
        checker.run(str(tmp_path))
        # Vendored code must not count against the gate.
        assert checker.failures == 0
# =========================================================================
# Task Gate — task_gate.py (integration-level tests)
# =========================================================================
class TestTaskGateImports:
    """Smoke-test that task_gate exposes its expected module-level constants."""

    def test_import_task_gate(self):
        from task_gate import AGENT_USERNAMES, FILTER_TAGS
        assert isinstance(FILTER_TAGS, list)
        assert FILTER_TAGS  # non-empty
        assert isinstance(AGENT_USERNAMES, set)
        assert "timmy" in AGENT_USERNAMES

    def test_filter_tags_contain_epic(self):
        from task_gate import FILTER_TAGS
        matches = [tag for tag in FILTER_TAGS if "EPIC" in tag]
        assert matches

    def test_filter_tags_contain_permanent(self):
        from task_gate import FILTER_TAGS
        matches = [tag for tag in FILTER_TAGS if "PERMANENT" in tag]
        assert matches