Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ac2bb0f773 | |||
| 1aa4a3d3c8 |
274
orchestration.py
274
orchestration.py
@@ -1,6 +1,274 @@
|
|||||||
"""Sovereign orchestration — Huey replaces 3,843 lines of homebrew."""
|
"""Sovereign orchestration — Huey replaces 3,843 lines of homebrew.
|
||||||
|
|
||||||
from huey import SqliteHuey, crontab
|
Pipeline tasks automatically track token usage via token_budget.py.
|
||||||
|
After each task completes, the Huey signal records usage for the pipeline.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
from datetime import datetime, timezone
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
huey = SqliteHuey(filename=str(Path.home() / ".hermes" / "orchestration.db"))
|
from huey import SqliteHuey, crontab, signals
|
||||||
|
|
||||||
|
# --- Setup ---
# All orchestration state lives under ~/.hermes (queue DB, logs, state files).
HERMES_HOME = Path.home() / ".hermes"
# Huey's task queue is backed by a SQLite file — no broker process needed.
huey = SqliteHuey(filename=str(HERMES_HOME / "orchestration.db"))

# Token budget integration: token_budget.py lives next to this file, so make
# its directory importable before attempting the import.
sys.path.insert(0, str(Path(__file__).parent))
try:
    from token_budget import record_usage, get_remaining, can_afford, get_report
    HAS_BUDGET = True
except ImportError:
    # Budget tracking is optional; pipelines still run without it.
    HAS_BUDGET = False
|
||||||
|
|
||||||
|
|
||||||
|
# --- Pipeline definitions ---
# Each entry names the shell script to run (relative to ~/timmy-config), the
# per-run token ceiling passed to the script as --max-tokens, and the
# pipelines that must already be running/complete before it may start.
# NOTE: dict order matters — the scheduler starts the first eligible
# pipeline in iteration order.
PIPELINES = {
    "playground-factory": {
        "script": "scripts/pipeline_playground_factory.sh",
        "max_tokens": 100_000,
        "dependencies": [],
    },
    "training-factory": {
        "script": "scripts/pipeline_training_factory.sh",
        "max_tokens": 150_000,
        "dependencies": [],
    },
    "knowledge-mine": {
        "script": "scripts/pipeline_knowledge_mine.sh",
        "max_tokens": 80_000,
        "dependencies": ["training-factory"],
    },
    "adversary": {
        "script": "scripts/pipeline_adversary.sh",
        "max_tokens": 50_000,
        "dependencies": ["knowledge-mine"],
    },
    "codebase-genome": {
        "script": "scripts/pipeline_codebase_genome.sh",
        "max_tokens": 120_000,
        "dependencies": [],
    },
}
|
||||||
|
|
||||||
|
|
||||||
|
# --- Token tracking signal ---
@huey.signal()
def track_tokens(signal, task, task_value=None, **kwargs):
    """Automatically log token usage after each pipeline task completes.

    Registered for all Huey signals, but acts only on SIGNAL_COMPLETE —
    the one signal that delivers the task's return value (``task_value``).
    The result dict produced by ``pipeline_task`` carries ``input_tokens``
    and ``output_tokens``, which are forwarded to token_budget.record_usage.
    """
    if not HAS_BUDGET:
        return

    # Only COMPLETE carries the task result; EXECUTING/ERROR/etc. have
    # nothing to record (task_value would be None anyway).
    if signal != signals.SIGNAL_COMPLETE:
        return

    # Only track pipeline tasks.  Huey names a task after its function
    # ("pipeline_task"); the original "pipeline." prefix check could never
    # match that, so budget tracking silently never fired.  The prefix is
    # kept for compatibility with any custom-named pipeline tasks.
    task_name = getattr(task, "name", "") or ""
    if task_name != "pipeline_task" and not task_name.startswith("pipeline."):
        return

    result = task_value if isinstance(task_value, dict) else {}

    # Prefer the pipeline name reported in the result dict; fall back to the
    # task's first positional argument, then to the task-name suffix.
    pipeline = result.get("pipeline")
    if not pipeline:
        args = getattr(task, "args", ()) or ()
        pipeline = args[0] if args else task_name.replace("pipeline.", "")

    input_tokens = result.get("input_tokens", 0)
    output_tokens = result.get("output_tokens", 0)
    if input_tokens or output_tokens:
        record_usage(pipeline, input_tokens, output_tokens)
|
||||||
|
|
||||||
|
|
||||||
|
# --- Pipeline tasks ---
def _parse_token_count(stdout: str, key: str) -> int:
    """Extract the integer following *key* (e.g. "input_tokens=") from script
    output.  Scans every line; the last parseable match wins.  Returns 0 when
    the key is absent or its value is malformed."""
    value = 0
    for line in stdout.splitlines():
        if key in line:
            try:
                value = int(line.split(key)[1].split()[0])
            except (ValueError, IndexError):
                pass
    return value


def _append_pipeline_log(log_path, result) -> None:
    """Append a timestamped record of the script's stdout/stderr to the log."""
    with open(log_path, "a") as f:
        f.write(f"\n--- {datetime.now(timezone.utc).isoformat()} ---\n")
        f.write(result.stdout)
        if result.stderr:
            f.write(f"\nSTDERR:\n{result.stderr}")


@huey.task()
def pipeline_task(name: str, max_tokens: int = None):
    """Run a single pipeline and return token usage stats.

    Always returns a dict containing ``input_tokens`` and ``output_tokens``
    (consumed by the token-tracking signal handler), plus either ``status``
    ("complete"/"failed"/"timeout"/"error") or an ``error`` message for
    pre-flight failures.
    """
    spec = PIPELINES.get(name)
    if not spec:
        return {"error": f"Unknown pipeline: {name}", "input_tokens": 0, "output_tokens": 0}

    script = spec["script"]
    budget = max_tokens or spec["max_tokens"]

    # Refuse to start if today's remaining budget cannot cover the ceiling.
    if HAS_BUDGET and not can_afford(budget):
        return {
            "error": f"Insufficient budget for {name} (need {budget}, have {get_remaining()})",
            "input_tokens": 0,
            "output_tokens": 0,
        }

    # Dependencies must be running or complete before we start.
    # NOTE(review): "running" counts as satisfied — confirm that is intended.
    for dep in spec.get("dependencies", []):
        dep_state = _get_pipeline_state(dep)
        if dep_state not in ("running", "complete"):
            return {
                "error": f"Dependency {dep} not met (state: {dep_state})",
                "input_tokens": 0,
                "output_tokens": 0,
            }

    # Run the pipeline script.
    script_path = Path.home() / "timmy-config" / script
    if not script_path.exists():
        return {"error": f"Script not found: {script_path}", "input_tokens": 0, "output_tokens": 0}

    _set_pipeline_state(name, "running")
    log_path = HERMES_HOME / "logs" / f"pipeline-{name}.log"
    log_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        result = subprocess.run(
            ["bash", str(script_path), "--max-tokens", str(budget)],
            capture_output=True,
            text=True,
            timeout=3600,  # 1 hour max
        )

        # Token counts the script reported on stdout ("input_tokens=N").
        input_tokens = _parse_token_count(result.stdout, "input_tokens=")
        output_tokens = _parse_token_count(result.stdout, "output_tokens=")

        # If the script didn't report tokens, estimate ~4 chars per token.
        if not input_tokens and not output_tokens:
            output_tokens = len(result.stdout) // 4

        _append_pipeline_log(log_path, result)

        if result.returncode == 0:
            _set_pipeline_state(name, "complete")
            return {
                "pipeline": name,
                "status": "complete",
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
            }
        else:
            _set_pipeline_state(name, "failed")
            return {
                "pipeline": name,
                "status": "failed",
                "error": result.stderr[:500],
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
            }

    except subprocess.TimeoutExpired:
        _set_pipeline_state(name, "failed")
        return {"pipeline": name, "status": "timeout", "input_tokens": 0, "output_tokens": 0}
    except Exception as e:
        _set_pipeline_state(name, "failed")
        return {"pipeline": name, "status": "error", "error": str(e), "input_tokens": 0, "output_tokens": 0}
|
||||||
|
|
||||||
|
|
||||||
|
@huey.periodic_task(crontab(hour="*/6"))
def pipeline_scheduler():
    """Every six hours: enqueue the first pipeline whose dependencies are
    satisfied, reporting what was skipped and why."""
    report = ["=== Pipeline Scheduler ==="]

    # Bail out early if the daily token budget is spent.
    if HAS_BUDGET:
        remaining = get_remaining()
        report.append(f"Budget remaining: {remaining:,} tokens")
        if remaining <= 0:
            report.append("Budget exhausted. Skipping.")
            return "\n".join(report)

    def first_unmet_dependency(spec):
        # First dependency that is neither running nor complete, else None.
        for dep in spec.get("dependencies", []):
            if _get_pipeline_state(dep) not in ("running", "complete"):
                return dep
        return None

    launched = None
    for name, spec in PIPELINES.items():
        state = _get_pipeline_state(name)

        if state in ("running", "complete"):
            report.append(f"SKIP {name}: {state}")
            continue

        blocker = first_unmet_dependency(spec)
        if blocker is not None:
            report.append(f"SKIP {name}: dependency {blocker} not met")
            continue

        # Eligible — enqueue it and stop; one pipeline per scheduler tick.
        report.append(f"START {name}")
        pipeline_task(name)
        launched = name
        break

    if launched is None:
        report.append("No pipelines to start")

    return "\n".join(report)
|
||||||
|
|
||||||
|
|
||||||
|
# --- State management ---
# Per-pipeline state ("not_started"/"running"/"complete"/"failed") plus a
# last-updated timestamp, persisted as JSON under ~/.hermes.
STATE_FILE = HERMES_HOME / "pipeline_state.json"
|
||||||
|
|
||||||
|
|
||||||
|
def _get_pipeline_state(name: str) -> str:
    """Return the recorded state for *name*; "not_started" when unknown
    or when the state file is missing/unreadable."""
    if not STATE_FILE.exists():
        return "not_started"
    try:
        snapshot = json.loads(STATE_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        return "not_started"
    entry = snapshot.get(name, {})
    return entry.get("state", "not_started")
|
||||||
|
|
||||||
|
|
||||||
|
def _set_pipeline_state(name: str, state: str):
    """Persist *state* for *name*, stamping the update time (UTC).
    A corrupt or unreadable state file is silently replaced."""
    existing = {}
    if STATE_FILE.exists():
        try:
            existing = json.loads(STATE_FILE.read_text())
        except (json.JSONDecodeError, OSError):
            existing = {}
    timestamp = datetime.now(timezone.utc).isoformat()
    existing[name] = {"state": state, "updated": timestamp}
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    STATE_FILE.write_text(json.dumps(existing, indent=2))
|
||||||
|
|
||||||
|
|
||||||
|
# --- CLI ---
# Minimal ad-hoc argument handling: query the budget, show pipeline states,
# or enqueue one pipeline by name.
if __name__ == "__main__":
    if "--budget" in sys.argv or "--report" in sys.argv:
        if HAS_BUDGET:
            print(get_report())
        else:
            print("token_budget module not available")
    elif "--status" in sys.argv:
        for name in PIPELINES:
            state = _get_pipeline_state(name)
            print(f" {name}: {state}")
    elif "--run" in sys.argv:
        idx = sys.argv.index("--run")
        # Guard: "--run" with no pipeline name used to raise IndexError.
        if idx + 1 >= len(sys.argv):
            print("Usage: orchestration.py [--budget|--status|--run PIPELINE]")
        else:
            name = sys.argv[idx + 1]
            print(f"Enqueuing pipeline: {name}")
            pipeline_task(name)  # enqueues via Huey; result handle unused
            print("Task enqueued")
    else:
        print("Usage: orchestration.py [--budget|--status|--run PIPELINE]")
|
||||||
|
|||||||
142
scripts/token_budget.py
Normal file
142
scripts/token_budget.py
Normal file
@@ -0,0 +1,142 @@
|
|||||||
|
#!/usr/bin/env python3
"""
token_budget.py — Daily token budget tracker for pipeline orchestration.

Tracks token usage per pipeline per day, enforces daily limits,
and provides a query interface for the orchestrator.

Data: ~/.hermes/pipeline_budget.json
"""

import json
import os  # NOTE(review): unused in this module — candidate for removal
from datetime import datetime, timezone
from pathlib import Path

# On-disk store for all budget data (daily buckets + lifetime totals).
BUDGET_FILE = Path.home() / ".hermes" / "pipeline_budget.json"
# Default cap on tokens spent per UTC day across all pipelines.
DEFAULT_DAILY_LIMIT = 500_000
|
||||||
|
|
||||||
|
|
||||||
|
def _load() -> dict:
    """Read the budget store; a missing or unreadable file yields {}."""
    if not BUDGET_FILE.exists():
        return {}
    try:
        return json.loads(BUDGET_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        return {}
|
||||||
|
|
||||||
|
|
||||||
|
def _save(data: dict):
    """Persist the budget store, creating ~/.hermes if needed."""
    parent = BUDGET_FILE.parent
    parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, indent=2)
    BUDGET_FILE.write_text(serialized)
|
||||||
|
|
||||||
|
|
||||||
|
def today_key() -> str:
    """Key for today's budget bucket: the current UTC date as YYYY-MM-DD."""
    return datetime.now(timezone.utc).date().isoformat()
|
||||||
|
|
||||||
|
|
||||||
|
def get_daily_usage(pipeline: str = None) -> dict:
    """Return today's token usage.

    Without *pipeline*, returns the raw day bucket.  With *pipeline*,
    returns that pipeline's spend alongside the daily total.
    """
    store = _load()
    empty_day = {"tokens_used": 0, "pipelines": {}}
    day = store.get("daily", {}).get(today_key(), empty_day)
    if not pipeline:
        return day
    per_pipeline = day.get("pipelines", {})
    return {
        "pipeline": pipeline,
        "tokens_used": per_pipeline.get(pipeline, 0),
        "daily_total": day.get("tokens_used", 0),
    }
|
||||||
|
|
||||||
|
|
||||||
|
def get_remaining(limit: int = DEFAULT_DAILY_LIMIT) -> int:
    """Tokens still spendable today under *limit*; never negative."""
    spent = get_daily_usage().get("tokens_used", 0)
    remaining = limit - spent
    return remaining if remaining > 0 else 0
|
||||||
|
|
||||||
|
|
||||||
|
def can_afford(tokens: int, limit: int = DEFAULT_DAILY_LIMIT) -> bool:
    """True when today's remaining budget covers a spend of *tokens*."""
    return tokens <= get_remaining(limit)
|
||||||
|
|
||||||
|
|
||||||
|
def record_usage(pipeline: str, input_tokens: int, output_tokens: int) -> dict:
    """
    Record token usage for a pipeline task.

    Called automatically by the orchestrator after each pipeline task
    completes.  Updates the day bucket (total, per-pipeline tally,
    input/output/call breakdown) and the lifetime counters, persists
    the store, and returns a summary of the updated daily state.
    """
    total = input_tokens + output_tokens
    data = _load()
    today = today_key()

    daily = data.setdefault("daily", {})
    # Remember whether this is the first usage recorded today — needed
    # below to count the day in the lifetime stats.
    first_entry_today = today not in daily
    day = daily.setdefault(today, {"tokens_used": 0, "pipelines": {}})

    day["tokens_used"] = day.get("tokens_used", 0) + total
    pipes = day.setdefault("pipelines", {})
    pipes[pipeline] = pipes.get(pipeline, 0) + total

    # Per-pipeline input/output/call breakdown (consumed by get_report).
    breakdown = day.setdefault("breakdown", {})
    pb = breakdown.setdefault(pipeline, {"input": 0, "output": 0, "calls": 0})
    pb["input"] += input_tokens
    pb["output"] += output_tokens
    pb["calls"] += 1

    # Lifetime stats.  Fix: total_days was initialized but never
    # incremented; count each day on its first recorded usage.
    lifetime = data.setdefault("lifetime", {"total_tokens": 0, "total_days": 0})
    lifetime["total_tokens"] = lifetime.get("total_tokens", 0) + total
    if first_entry_today:
        lifetime["total_days"] = lifetime.get("total_days", 0) + 1

    _save(data)

    return {
        "pipeline": pipeline,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "total": total,
        "daily_used": day["tokens_used"],
        "daily_remaining": get_remaining(),
    }
|
||||||
|
|
||||||
|
|
||||||
|
def get_report() -> str:
    """Generate a human-readable budget report."""
    store = _load()
    today = today_key()
    day = store.get("daily", {}).get(today, {"tokens_used": 0, "pipelines": {}})

    lines = [
        f"Token Budget — {today}",
        f" Daily usage: {day.get('tokens_used', 0):,} / {DEFAULT_DAILY_LIMIT:,}",
        f" Remaining: {get_remaining():,}",
        "",
        " Pipelines:",
    ]

    # Heaviest output-producers first (sort key intentionally matches the
    # original: output tokens only, not input + output).
    breakdown = day.get("breakdown", {})
    ordered = sorted(breakdown.items(), key=lambda item: item[1]["output"], reverse=True)
    for name, stats in ordered:
        combined = stats["input"] + stats["output"]
        lines.append(f" {name}: {combined:,} tokens ({stats['calls']} calls)")

    if not breakdown:
        lines.append(" (no pipelines run today)")

    lifetime = store.get("lifetime", {})
    lines.append("")
    lines.append(f" Lifetime: {lifetime.get('total_tokens', 0):,} total tokens")

    return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import sys
|
||||||
|
if "--report" in sys.argv:
|
||||||
|
print(get_report())
|
||||||
|
elif "--remaining" in sys.argv:
|
||||||
|
print(get_remaining())
|
||||||
|
elif "--can-afford" in sys.argv:
|
||||||
|
idx = sys.argv.index("--can-afford")
|
||||||
|
tokens = int(sys.argv[idx + 1])
|
||||||
|
print("yes" if can_afford(tokens) else "no")
|
||||||
|
else:
|
||||||
|
print(get_report())
|
||||||
Reference in New Issue
Block a user