Compare commits

...

2 Commits

Author SHA1 Message Date
ac2bb0f773 feat: integrate token tracking with orchestrator (#634)
Some checks are pending
Architecture Lint / Linter Tests (pull_request) Waiting to run
Architecture Lint / Lint Repository (pull_request) Blocked by required conditions
PR Checklist / pr-checklist (pull_request) Waiting to run
Smoke Test / smoke (pull_request) Waiting to run
Validate Config / YAML Lint (pull_request) Waiting to run
Validate Config / JSON Validate (pull_request) Waiting to run
Validate Config / Python Syntax & Import Check (pull_request) Waiting to run
Validate Config / Python Test Suite (pull_request) Blocked by required conditions
Validate Config / Shell Script Lint (pull_request) Waiting to run
Validate Config / Cron Syntax Check (pull_request) Waiting to run
Validate Config / Deploy Script Dry Run (pull_request) Waiting to run
Validate Config / Playbook Schema Validation (pull_request) Waiting to run
2026-04-15 23:04:37 +00:00
1aa4a3d3c8 feat: token budget tracker module (#634) 2026-04-15 23:03:54 +00:00
2 changed files with 413 additions and 3 deletions

View File

@@ -1,6 +1,274 @@
"""Sovereign orchestration — Huey replaces 3,843 lines of homebrew."""
"""Sovereign orchestration — Huey replaces 3,843 lines of homebrew.
from huey import SqliteHuey, crontab
Pipeline tasks automatically track token usage via token_budget.py.
After each task completes, the Huey signal records usage for the pipeline.
"""
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
huey = SqliteHuey(filename=str(Path.home() / ".hermes" / "orchestration.db"))
from huey import SqliteHuey, crontab, signals
# --- Setup ---
HERMES_HOME = Path.home() / ".hermes"
huey = SqliteHuey(filename=str(HERMES_HOME / "orchestration.db"))
# Token budget integration
sys.path.insert(0, str(Path(__file__).parent))
try:
from token_budget import record_usage, get_remaining, can_afford, get_report
HAS_BUDGET = True
except ImportError:
HAS_BUDGET = False
# --- Pipeline definitions ---
PIPELINES = {
"playground-factory": {
"script": "scripts/pipeline_playground_factory.sh",
"max_tokens": 100_000,
"dependencies": [],
},
"training-factory": {
"script": "scripts/pipeline_training_factory.sh",
"max_tokens": 150_000,
"dependencies": [],
},
"knowledge-mine": {
"script": "scripts/pipeline_knowledge_mine.sh",
"max_tokens": 80_000,
"dependencies": ["training-factory"],
},
"adversary": {
"script": "scripts/pipeline_adversary.sh",
"max_tokens": 50_000,
"dependencies": ["knowledge-mine"],
},
"codebase-genome": {
"script": "scripts/pipeline_codebase_genome.sh",
"max_tokens": 120_000,
"dependencies": [],
},
}
# --- Token tracking signal ---
@huey.signal()
def track_tokens(signal, task, task_value=None, **kwargs):
"""Automatically log token usage after each pipeline task completes.
Hooks into Huey's signal system. Fires on task execution.
Extracts token counts from the task result and records them.
"""
if not HAS_BUDGET:
return
# Only track pipeline tasks
task_name = getattr(task, "name", "") or ""
if not task_name.startswith("pipeline."):
return
pipeline = task_name.replace("pipeline.", "")
# Extract token counts from result
result = task_value or {}
if isinstance(result, dict):
input_tokens = result.get("input_tokens", 0)
output_tokens = result.get("output_tokens", 0)
if input_tokens or output_tokens:
record_usage(pipeline, input_tokens, output_tokens)
# --- Pipeline tasks ---
@huey.task()
def pipeline_task(name: str, max_tokens: int = None):
"""Run a single pipeline and return token usage stats."""
spec = PIPELINES.get(name)
if not spec:
return {"error": f"Unknown pipeline: {name}", "input_tokens": 0, "output_tokens": 0}
script = spec["script"]
budget = max_tokens or spec["max_tokens"]
# Check budget before running
if HAS_BUDGET and not can_afford(budget):
return {
"error": f"Insufficient budget for {name} (need {budget}, have {get_remaining()})",
"input_tokens": 0,
"output_tokens": 0,
}
# Check dependencies
for dep in spec.get("dependencies", []):
dep_state = _get_pipeline_state(dep)
if dep_state not in ("running", "complete"):
return {
"error": f"Dependency {dep} not met (state: {dep_state})",
"input_tokens": 0,
"output_tokens": 0,
}
# Run the pipeline script
script_path = Path.home() / "timmy-config" / script
if not script_path.exists():
return {"error": f"Script not found: {script_path}", "input_tokens": 0, "output_tokens": 0}
_set_pipeline_state(name, "running")
log_path = HERMES_HOME / "logs" / f"pipeline-{name}.log"
log_path.parent.mkdir(parents=True, exist_ok=True)
try:
result = subprocess.run(
["bash", str(script_path), "--max-tokens", str(budget)],
capture_output=True,
text=True,
timeout=3600, # 1 hour max
)
# Parse token usage from stdout (if script reports it)
input_tokens = 0
output_tokens = 0
for line in result.stdout.splitlines():
if "input_tokens=" in line:
try:
input_tokens = int(line.split("input_tokens=")[1].split()[0])
except (ValueError, IndexError):
pass
if "output_tokens=" in line:
try:
output_tokens = int(line.split("output_tokens=")[1].split()[0])
except (ValueError, IndexError):
pass
# If script didn't report tokens, estimate from output
if not input_tokens and not output_tokens:
output_tokens = len(result.stdout) // 4 # rough estimate
# Log output
with open(log_path, "a") as f:
f.write(f"\n--- {datetime.now(timezone.utc).isoformat()} ---\n")
f.write(result.stdout)
if result.stderr:
f.write(f"\nSTDERR:\n{result.stderr}")
if result.returncode == 0:
_set_pipeline_state(name, "complete")
return {
"pipeline": name,
"status": "complete",
"input_tokens": input_tokens,
"output_tokens": output_tokens,
}
else:
_set_pipeline_state(name, "failed")
return {
"pipeline": name,
"status": "failed",
"error": result.stderr[:500],
"input_tokens": input_tokens,
"output_tokens": output_tokens,
}
except subprocess.TimeoutExpired:
_set_pipeline_state(name, "failed")
return {"pipeline": name, "status": "timeout", "input_tokens": 0, "output_tokens": 0}
except Exception as e:
_set_pipeline_state(name, "failed")
return {"pipeline": name, "status": "error", "error": str(e), "input_tokens": 0, "output_tokens": 0}
@huey.periodic_task(crontab(hour="*/6"))
def pipeline_scheduler():
"""Check pipeline state and start the next eligible pipeline."""
report_lines = ["=== Pipeline Scheduler ==="]
# Check budget
if HAS_BUDGET:
remaining = get_remaining()
report_lines.append(f"Budget remaining: {remaining:,} tokens")
if remaining <= 0:
report_lines.append("Budget exhausted. Skipping.")
return "\n".join(report_lines)
# Find next eligible pipeline
started = False
for name, spec in PIPELINES.items():
state = _get_pipeline_state(name)
if state in ("running", "complete"):
report_lines.append(f"SKIP {name}: {state}")
continue
# Check dependencies
deps_ok = True
for dep in spec.get("dependencies", []):
dep_state = _get_pipeline_state(dep)
if dep_state not in ("running", "complete"):
report_lines.append(f"SKIP {name}: dependency {dep} not met")
deps_ok = False
break
if not deps_ok:
continue
# Start pipeline
report_lines.append(f"START {name}")
pipeline_task(name)
started = True
break # One pipeline per scheduler tick
if not started:
report_lines.append("No pipelines to start")
return "\n".join(report_lines)
# --- State management ---
STATE_FILE = HERMES_HOME / "pipeline_state.json"
def _get_pipeline_state(name: str) -> str:
if STATE_FILE.exists():
try:
data = json.loads(STATE_FILE.read_text())
return data.get(name, {}).get("state", "not_started")
except (json.JSONDecodeError, OSError):
pass
return "not_started"
def _set_pipeline_state(name: str, state: str):
data = {}
if STATE_FILE.exists():
try:
data = json.loads(STATE_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass
data[name] = {"state": state, "updated": datetime.now(timezone.utc).isoformat()}
STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
STATE_FILE.write_text(json.dumps(data, indent=2))
# --- CLI ---
if __name__ == "__main__":
if "--budget" in sys.argv or "--report" in sys.argv:
if HAS_BUDGET:
print(get_report())
else:
print("token_budget module not available")
elif "--status" in sys.argv:
for name in PIPELINES:
state = _get_pipeline_state(name)
print(f" {name}: {state}")
elif "--run" in sys.argv:
idx = sys.argv.index("--run")
name = sys.argv[idx + 1]
print(f"Enqueuing pipeline: {name}")
result = pipeline_task(name)
print(f"Task enqueued")
else:
print("Usage: orchestration.py [--budget|--status|--run PIPELINE]")

142
scripts/token_budget.py Normal file
View File

@@ -0,0 +1,142 @@
#!/usr/bin/env python3
"""
token_budget.py — Daily token budget tracker for pipeline orchestration.
Tracks token usage per pipeline per day, enforces daily limits,
and provides a query interface for the orchestrator.
Data: ~/.hermes/pipeline_budget.json
"""
import json
import os
from datetime import datetime, timezone
from pathlib import Path
BUDGET_FILE = Path.home() / ".hermes" / "pipeline_budget.json"
DEFAULT_DAILY_LIMIT = 500_000
def _load() -> dict:
if BUDGET_FILE.exists():
try:
return json.loads(BUDGET_FILE.read_text())
except (json.JSONDecodeError, OSError):
pass
return {}
def _save(data: dict):
BUDGET_FILE.parent.mkdir(parents=True, exist_ok=True)
BUDGET_FILE.write_text(json.dumps(data, indent=2))
def today_key() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%d")
def get_daily_usage(pipeline: str = None) -> dict:
"""Get token usage for today. If pipeline specified, return just that pipeline."""
data = _load()
day = data.get("daily", {}).get(today_key(), {"tokens_used": 0, "pipelines": {}})
if pipeline:
return {
"pipeline": pipeline,
"tokens_used": day.get("pipelines", {}).get(pipeline, 0),
"daily_total": day.get("tokens_used", 0),
}
return day
def get_remaining(limit: int = DEFAULT_DAILY_LIMIT) -> int:
"""Get remaining token budget for today."""
usage = get_daily_usage()
return max(0, limit - usage.get("tokens_used", 0))
def can_afford(tokens: int, limit: int = DEFAULT_DAILY_LIMIT) -> bool:
"""Check if we have budget for a token spend."""
return get_remaining(limit) >= tokens
def record_usage(pipeline: str, input_tokens: int, output_tokens: int) -> dict:
"""
Record token usage for a pipeline task.
Called automatically by the orchestrator after each pipeline task completes.
Returns the updated daily state.
"""
total = input_tokens + output_tokens
data = _load()
today = today_key()
daily = data.setdefault("daily", {})
day = daily.setdefault(today, {"tokens_used": 0, "pipelines": {}})
day["tokens_used"] = day.get("tokens_used", 0) + total
pipes = day.setdefault("pipelines", {})
pipes[pipeline] = pipes.get(pipeline, 0) + total
# Track breakdown
breakdown = day.setdefault("breakdown", {})
pb = breakdown.setdefault(pipeline, {"input": 0, "output": 0, "calls": 0})
pb["input"] += input_tokens
pb["output"] += output_tokens
pb["calls"] += 1
# Track lifetime stats
lifetime = data.setdefault("lifetime", {"total_tokens": 0, "total_days": 0})
lifetime["total_tokens"] = lifetime.get("total_tokens", 0) + total
_save(data)
return {
"pipeline": pipeline,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"total": total,
"daily_used": day["tokens_used"],
"daily_remaining": get_remaining(),
}
def get_report() -> str:
"""Generate a human-readable budget report."""
data = _load()
today = today_key()
day = data.get("daily", {}).get(today, {"tokens_used": 0, "pipelines": {}})
lines = []
lines.append(f"Token Budget — {today}")
lines.append(f" Daily usage: {day.get('tokens_used', 0):,} / {DEFAULT_DAILY_LIMIT:,}")
lines.append(f" Remaining: {get_remaining():,}")
lines.append("")
lines.append(" Pipelines:")
breakdown = day.get("breakdown", {})
for name, stats in sorted(breakdown.items(), key=lambda x: -x[1]["output"]):
total = stats["input"] + stats["output"]
lines.append(f" {name}: {total:,} tokens ({stats['calls']} calls)")
if not breakdown:
lines.append(" (no pipelines run today)")
lifetime = data.get("lifetime", {})
lines.append("")
lines.append(f" Lifetime: {lifetime.get('total_tokens', 0):,} total tokens")
return "\n".join(lines)
if __name__ == "__main__":
import sys
if "--report" in sys.argv:
print(get_report())
elif "--remaining" in sys.argv:
print(get_remaining())
elif "--can-afford" in sys.argv:
idx = sys.argv.index("--can-afford")
tokens = int(sys.argv[idx + 1])
print("yes" if can_afford(tokens) else "no")
else:
print(get_report())