Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 26s
PR Checklist / pr-checklist (pull_request) Failing after 4m1s
Smoke Test / smoke (pull_request) Failing after 24s
Validate Config / YAML Lint (pull_request) Failing after 18s
Validate Config / JSON Validate (pull_request) Successful in 16s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m15s
Validate Config / Shell Script Lint (pull_request) Failing after 33s
Validate Config / Cron Syntax Check (pull_request) Successful in 8s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 8s
Validate Config / Playbook Schema Validation (pull_request) Successful in 24s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
275 lines
8.6 KiB
Python
275 lines
8.6 KiB
Python
"""Sovereign orchestration — Huey replaces 3,843 lines of homebrew.
|
|
|
|
Pipeline tasks automatically track token usage via token_budget.py.
|
|
After each task completes, the Huey signal records usage for the pipeline.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from huey import SqliteHuey, crontab, signals
|
|
|
|
# --- Setup ---
# All orchestration state (queue DB, logs, pipeline state) lives under ~/.hermes.
HERMES_HOME = Path.home() / ".hermes"

# SQLite-backed Huey instance — single-host task queue, no broker needed.
huey = SqliteHuey(filename=str(HERMES_HOME / "orchestration.db"))

# Token budget integration: token_budget.py is expected to sit in the same
# directory as this file, so make that directory importable first.
sys.path.insert(0, str(Path(__file__).parent))
try:
    from token_budget import record_usage, get_remaining, can_afford, get_report

    # HAS_BUDGET gates every budget call below; tracking is strictly optional.
    HAS_BUDGET = True
except ImportError:
    HAS_BUDGET = False
|
|
|
|
|
|
# --- Pipeline definitions ---
# Each entry maps a pipeline name to:
#   script:       shell script to run, resolved relative to ~/timmy-config
#   max_tokens:   default per-run token budget (overridable via pipeline_task)
#   dependencies: pipelines that must be running/complete before this one starts
PIPELINES = {
    "playground-factory": {
        "script": "scripts/pipeline_playground_factory.sh",
        "max_tokens": 100_000,
        "dependencies": [],
    },
    "training-factory": {
        "script": "scripts/pipeline_training_factory.sh",
        "max_tokens": 150_000,
        "dependencies": [],
    },
    "knowledge-mine": {
        "script": "scripts/pipeline_knowledge_mine.sh",
        "max_tokens": 80_000,
        "dependencies": ["training-factory"],
    },
    "adversary": {
        "script": "scripts/pipeline_adversary.sh",
        "max_tokens": 50_000,
        "dependencies": ["knowledge-mine"],
    },
    "codebase-genome": {
        "script": "scripts/pipeline_codebase_genome.sh",
        "max_tokens": 120_000,
        "dependencies": [],
    },
}
|
|
|
|
|
|
# --- Token tracking signal ---
@huey.signal()
def track_tokens(signal, task, task_value=None, **kwargs):
    """Record token usage after a pipeline task completes.

    Huey fires this handler for every task-lifecycle signal; only
    SIGNAL_COMPLETE carries the task's return value in ``task_value``,
    so all other signals are ignored.

    Bug fixed: Huey names a task after its decorated function (here
    ``pipeline_task``), so the old ``startswith("pipeline.")`` prefix
    check never matched and usage was never recorded. The pipeline name
    is now taken from the task result (or the task's first positional
    argument), not parsed out of the task name.
    """
    if not HAS_BUDGET:
        return

    # Only the completion signal delivers the task's return value.
    if signal != signals.SIGNAL_COMPLETE:
        return

    # Only track pipeline tasks (Huey task name == function name).
    task_name = getattr(task, "name", "") or ""
    if not task_name.startswith("pipeline"):
        return

    result = task_value or {}
    if not isinstance(result, dict):
        return

    # Prefer the pipeline name the task itself reported; fall back to the
    # first positional argument of pipeline_task(name, ...).
    args = getattr(task, "args", None) or ()
    pipeline = result.get("pipeline") or (args[0] if args else task_name)

    input_tokens = result.get("input_tokens", 0)
    output_tokens = result.get("output_tokens", 0)
    if input_tokens or output_tokens:
        record_usage(pipeline, input_tokens, output_tokens)
|
|
|
|
|
|
# --- Pipeline tasks ---
def _extract_token_counts(stdout: str):
    """Parse ``input_tokens=N`` / ``output_tokens=N`` markers from stdout.

    Factored out of pipeline_task, where the identical parsing loop was
    duplicated for each marker. Returns ``(input_tokens, output_tokens)``;
    a missing or malformed marker leaves that count at 0.
    """
    counts = {"input_tokens": 0, "output_tokens": 0}
    for line in stdout.splitlines():
        for key in counts:
            marker = key + "="
            if marker in line:
                try:
                    counts[key] = int(line.split(marker)[1].split()[0])
                except (ValueError, IndexError):
                    pass  # garbled marker: keep the previous value
    return counts["input_tokens"], counts["output_tokens"]


@huey.task()
def pipeline_task(name: str, max_tokens: int = None):
    """Run a single pipeline script and return token usage stats.

    Args:
        name: Key into PIPELINES.
        max_tokens: Optional override for the pipeline's configured budget.

    Returns:
        A dict always containing "input_tokens" and "output_tokens";
        plus "pipeline"/"status" when the script ran, or "error" when it
        could not run or failed.
    """
    spec = PIPELINES.get(name)
    if not spec:
        return {"error": f"Unknown pipeline: {name}", "input_tokens": 0, "output_tokens": 0}

    script = spec["script"]
    budget = max_tokens or spec["max_tokens"]

    # Refuse to start when the remaining global budget can't cover this run.
    if HAS_BUDGET and not can_afford(budget):
        return {
            "error": f"Insufficient budget for {name} (need {budget}, have {get_remaining()})",
            "input_tokens": 0,
            "output_tokens": 0,
        }

    # Dependencies count as met once they are running or complete.
    # NOTE(review): a "running" dependency may still fail — confirm that
    # starting dependents early is intended (matches pipeline_scheduler).
    for dep in spec.get("dependencies", []):
        dep_state = _get_pipeline_state(dep)
        if dep_state not in ("running", "complete"):
            return {
                "error": f"Dependency {dep} not met (state: {dep_state})",
                "input_tokens": 0,
                "output_tokens": 0,
            }

    # Run the pipeline script
    script_path = Path.home() / "timmy-config" / script
    if not script_path.exists():
        return {"error": f"Script not found: {script_path}", "input_tokens": 0, "output_tokens": 0}

    _set_pipeline_state(name, "running")
    log_path = HERMES_HOME / "logs" / f"pipeline-{name}.log"
    log_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        result = subprocess.run(
            ["bash", str(script_path), "--max-tokens", str(budget)],
            capture_output=True,
            text=True,
            timeout=3600,  # 1 hour max
        )

        # Parse token usage from stdout (if the script reports it).
        input_tokens, output_tokens = _extract_token_counts(result.stdout)

        # If the script didn't report tokens, estimate from output size
        # (~4 characters per token).
        if not input_tokens and not output_tokens:
            output_tokens = len(result.stdout) // 4

        # Append this run's output to the per-pipeline log.
        with open(log_path, "a") as f:
            f.write(f"\n--- {datetime.now(timezone.utc).isoformat()} ---\n")
            f.write(result.stdout)
            if result.stderr:
                f.write(f"\nSTDERR:\n{result.stderr}")

        if result.returncode == 0:
            _set_pipeline_state(name, "complete")
            return {
                "pipeline": name,
                "status": "complete",
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
            }

        _set_pipeline_state(name, "failed")
        return {
            "pipeline": name,
            "status": "failed",
            "error": result.stderr[:500],
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
        }

    except subprocess.TimeoutExpired:
        _set_pipeline_state(name, "failed")
        return {"pipeline": name, "status": "timeout", "input_tokens": 0, "output_tokens": 0}
    except Exception as e:  # surface unexpected failures via the result dict
        _set_pipeline_state(name, "failed")
        return {"pipeline": name, "status": "error", "error": str(e), "input_tokens": 0, "output_tokens": 0}
|
|
|
|
|
|
@huey.periodic_task(crontab(hour="*/6"))
def pipeline_scheduler():
    """Every six hours, launch at most one eligible pipeline.

    A pipeline is eligible when it is neither running nor complete and
    every dependency is running or complete. Returns a plain-text report
    of the decisions made this tick.
    """
    lines = ["=== Pipeline Scheduler ==="]

    # Budget guard: bail out early when nothing is left to spend.
    if HAS_BUDGET:
        remaining = get_remaining()
        lines.append(f"Budget remaining: {remaining:,} tokens")
        if remaining <= 0:
            lines.append("Budget exhausted. Skipping.")
            return "\n".join(lines)

    launched = False
    for name, spec in PIPELINES.items():
        state = _get_pipeline_state(name)

        if state in ("running", "complete"):
            lines.append(f"SKIP {name}: {state}")
            continue

        # First dependency that is neither running nor complete, if any.
        unmet = next(
            (
                dep
                for dep in spec.get("dependencies", [])
                if _get_pipeline_state(dep) not in ("running", "complete")
            ),
            None,
        )
        if unmet is not None:
            lines.append(f"SKIP {name}: dependency {unmet} not met")
            continue

        # Enqueue and stop: one pipeline per scheduler tick.
        lines.append(f"START {name}")
        pipeline_task(name)
        launched = True
        break

    if not launched:
        lines.append("No pipelines to start")

    return "\n".join(lines)
|
|
|
|
|
|
# --- State management ---
# Per-pipeline state persisted as JSON: {name: {"state": ..., "updated": ...}}.
STATE_FILE = HERMES_HOME / "pipeline_state.json"
|
|
|
|
|
|
def _get_pipeline_state(name: str) -> str:
    """Return the recorded state for *name*, defaulting to "not_started"."""
    if not STATE_FILE.exists():
        return "not_started"
    try:
        entries = json.loads(STATE_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        # Unreadable or corrupt state file is treated as a blank slate.
        return "not_started"
    return entries.get(name, {}).get("state", "not_started")
|
|
|
|
|
|
def _set_pipeline_state(name: str, state: str):
    """Persist *state* for pipeline *name*, stamped with the current UTC time."""
    entries = {}
    if STATE_FILE.exists():
        try:
            entries = json.loads(STATE_FILE.read_text())
        except (json.JSONDecodeError, OSError):
            # A corrupt file is replaced wholesale rather than crashing.
            pass
    entries[name] = {
        "state": state,
        "updated": datetime.now(timezone.utc).isoformat(),
    }
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    STATE_FILE.write_text(json.dumps(entries, indent=2))
|
|
|
|
|
|
# --- CLI ---
# Usage: orchestration.py [--budget|--status|--run PIPELINE]
# Fixes: `--run` as the final argument no longer raises IndexError, and the
# unused `result` local and placeholder-free f-string are gone.
if __name__ == "__main__":
    if "--budget" in sys.argv or "--report" in sys.argv:
        if HAS_BUDGET:
            print(get_report())
        else:
            print("token_budget module not available")
    elif "--status" in sys.argv:
        # Show the recorded state of every known pipeline.
        for name in PIPELINES:
            state = _get_pipeline_state(name)
            print(f" {name}: {state}")
    elif "--run" in sys.argv:
        idx = sys.argv.index("--run")
        if idx + 1 >= len(sys.argv):
            print("Usage: orchestration.py --run PIPELINE")
            sys.exit(1)
        name = sys.argv[idx + 1]
        print(f"Enqueuing pipeline: {name}")
        # Calling the Huey task enqueues it; the worker runs it later.
        pipeline_task(name)
        print("Task enqueued")
    else:
        print("Usage: orchestration.py [--budget|--status|--run PIPELINE]")
|