Compare commits

...

2 Commits

Author SHA1 Message Date
df4dcf1fb4 test: Token tracker orchestrator integration tests (#634)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 24s
Smoke Test / smoke (pull_request) Failing after 9s
Validate Config / YAML Lint (pull_request) Failing after 11s
Validate Config / JSON Validate (pull_request) Successful in 13s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 42s
Validate Config / Shell Script Lint (pull_request) Failing after 34s
Validate Config / Cron Syntax Check (pull_request) Successful in 5s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 4s
Validate Config / Playbook Schema Validation (pull_request) Successful in 14s
PR Checklist / pr-checklist (pull_request) Failing after 3m32s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
2026-04-17 05:32:18 +00:00
c4790d8bb9 feat: Integrate token tracker with orchestrator (#634)
- Fix corrupted TOKEN_LOG path
- Import token_budget.record_usage in log_token_budget
- Add check_budget() before pipeline runs
- Add Huey tasks for all 5 pipelines
- Add _run_pipeline() runner with timeout and budget enforcement
- Add schedule_nightly() for dependency-ordered dispatch
- Signal hook auto-logs to both JSONL and budget tracker
2026-04-17 05:31:12 +00:00
2 changed files with 335 additions and 7 deletions

View File

@@ -1,13 +1,20 @@
"""Sovereign orchestration — Huey replaces 3,843 lines of homebrew."""
import json
import logging
import os
import subprocess
from datetime import datetime, timezone
from pathlib import Path
from huey import SqliteHuey, signals
huey = SqliteHuey(filename=str(Path.home() / ".hermes" / "orchestration.db"))
logger = logging.getLogger("orchestration")
huey = SqliteHuey(
filename=str(Path.home() / ".hermes" / "orchestration.db"),
results=True,
)
# === Token Tracking ===
TOKEN_LOG = Path.home() / ".hermes" / "token_usage.jsonl"
@@ -15,23 +22,24 @@ TOKEN_LOG = Path.home() / ".hermes" / "token_usage.jsonl"
def log_token_usage(task_name, result):
"""Log token usage from a completed pipeline task.
Reads input_tokens/output_tokens from the agent result dict.
Auto-detects pipeline name from task context.
Appends to JSONL for downstream analysis.
Also records to token_budget for daily enforcement.
"""
if not isinstance(result, dict):
return
input_tokens = result.get("input_tokens", 0)
output_tokens = result.get("output_tokens", 0)
if input_tokens == 0 and output_tokens == 0:
return
# Auto-detect pipeline name from task function name
pipeline = task_name.replace("_task", "").replace("_", "-")
entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"pipeline": pipeline,
@@ -40,14 +48,173 @@ def log_token_usage(task_name, result):
"total_tokens": input_tokens + output_tokens,
"task": task_name,
}
# Write to JSONL log
TOKEN_LOG.parent.mkdir(parents=True, exist_ok=True)
with open(TOKEN_LOG, "a") as f:
f.write(json.dumps(entry) + "\n")
# Record to token budget for daily enforcement
try:
from scripts.token_budget import record_usage
record_usage(pipeline, input_tokens, output_tokens)
logger.info(f"Budget updated: {pipeline} +{entry['total_tokens']} tokens")
except ImportError:
logger.debug("token_budget not available, skipping budget update")
def check_budget(pipeline: str, estimated_tokens: int) -> bool:
"""Check if there's enough budget for a pipeline run."""
try:
from scripts.token_budget import can_afford, get_remaining
remaining = get_remaining()
if not can_afford(estimated_tokens):
logger.warning(
f"Budget exhausted for {pipeline}: need {estimated_tokens}, "
f"have {remaining}"
)
return False
return True
except ImportError:
return True # No budget module = no enforcement
@huey.signal(signals.SIGNAL_COMPLETE)
def on_task_complete(signal, task, task_value=None, **kwargs):
"""Huey hook: log token usage after each pipeline task completes."""
task_name = getattr(task, "name", "unknown")
log_token_usage(task_name, task_value)
# === Pipeline Tasks ===
@huey.task()
def playground_factory_task(max_tokens: int = 100000):
"""Generate training data pairs from session transcripts."""
script = Path(__file__).parent / "scripts" / "pipeline_playground_factory.sh"
return _run_pipeline("playground-factory", str(script), max_tokens)
@huey.task()
def training_factory_task(max_tokens: int = 150000):
"""Run model fine-tuning with generated training data."""
script = Path(__file__).parent / "scripts" / "pipeline_training_factory.sh"
return _run_pipeline("training-factory", str(script), max_tokens)
@huey.task()
def knowledge_mine_task(max_tokens: int = 80000):
"""Extract structured knowledge from session archives."""
script = Path(__file__).parent / "scripts" / "pipeline_knowledge_mine.sh"
return _run_pipeline("knowledge-mine", str(script), max_tokens)
@huey.task()
def adversary_task(max_tokens: int = 50000):
"""Run adversarial red-team prompts against fleet models."""
script = Path(__file__).parent / "scripts" / "pipeline_adversary.sh"
return _run_pipeline("adversary", str(script), max_tokens)
@huey.task()
def codebase_genome_task(max_tokens: int = 120000):
"""Generate GENOME.md for one Gitea repo per run."""
script = Path(__file__).parent / "scripts" / "pipeline_codebase_genome.sh"
return _run_pipeline("codebase-genome", str(script), max_tokens)
# === Pipeline Runner ===
def _run_pipeline(name: str, script: str, max_tokens: int) -> dict:
"""Run a pipeline script and return structured result with token counts."""
if not check_budget(name, max_tokens):
return {
"pipeline": name,
"status": "skipped",
"reason": "budget_exhausted",
"input_tokens": 0,
"output_tokens": 0,
}
if not os.path.isfile(script):
logger.error(f"Pipeline script not found: {script}")
return {
"pipeline": name,
"status": "failed",
"reason": "script_not_found",
"input_tokens": 0,
"output_tokens": 0,
}
logger.info(f"Starting pipeline: {name} (max_tokens={max_tokens})")
try:
result = subprocess.run(
["bash", script, "--max-tokens", str(max_tokens)],
capture_output=True,
text=True,
timeout=3600, # 1 hour max per pipeline
)
# Parse token usage from script output
input_tokens = 0
output_tokens = 0
for line in result.stdout.split("\n"):
if "tokens used" in line.lower():
try:
# Extract number from lines like "... 5000 tokens used."
parts = line.split()
for i, p in enumerate(parts):
if p.isdigit() and i + 1 < len(parts) and "token" in parts[i+1].lower():
output_tokens = int(p)
break
except (ValueError, IndexError):
pass
return {
"pipeline": name,
"status": "success" if result.returncode == 0 else "failed",
"exit_code": result.returncode,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"stdout_tail": result.stdout[-500:] if result.stdout else "",
"stderr_tail": result.stderr[-500:] if result.stderr else "",
}
except subprocess.TimeoutExpired:
logger.error(f"Pipeline {name} timed out after 3600s")
return {
"pipeline": name,
"status": "timeout",
"input_tokens": 0,
"output_tokens": 0,
}
except Exception as e:
logger.error(f"Pipeline {name} error: {e}")
return {
"pipeline": name,
"status": "error",
"reason": str(e),
"input_tokens": 0,
"output_tokens": 0,
}
# === Scheduler Interface ===
def schedule_nightly():
"""Schedule all pipelines in priority order with dependencies.
Called by the nightly scheduler or manually.
Playground and training start in parallel; others follow dependency chain.
"""
# Phase 1: playground + training (no deps, parallel)
playground_factory_task(max_tokens=100000)
training_factory_task(max_tokens=150000)
# Phase 2: knowledge-mine (depends on training)
knowledge_mine_task(max_tokens=80000).then(adversary_task, max_tokens=50000)
# Phase 3: codebase-genome (independent)
codebase_genome_task(max_tokens=120000)
logger.info("Nightly pipeline schedule dispatched")

View File

@@ -0,0 +1,161 @@
"""Tests for orchestration.py token tracking integration (issue #634).
Verifies:
- log_token_usage writes to JSONL
- log_token_usage calls token_budget.record_usage
- check_budget enforces limits
- Huey signal hook fires on task completion
- Pipeline tasks are registered
"""
import json
import os
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
class TestLogTokenUsage:
"""Test log_token_usage function."""
def test_skips_non_dict_result(self):
"""Should silently skip non-dict results."""
from orchestration import log_token_usage
# Should not raise
log_token_usage("test_task", None)
log_token_usage("test_task", "string")
log_token_usage("test_task", 42)
def test_skips_zero_tokens(self):
"""Should skip entries with zero tokens."""
from orchestration import log_token_usage
with patch("orchestration.TOKEN_LOG") as mock_log:
mock_log.parent = MagicMock()
log_token_usage("test_task", {"input_tokens": 0, "output_tokens": 0})
# Should not write to file
mock_log.parent.mkdir.assert_not_called()
def test_writes_to_jsonl(self, tmp_path):
"""Should append token usage to JSONL log."""
log_file = tmp_path / "token_usage.jsonl"
with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage"):
from orchestration import log_token_usage
log_token_usage("playground_factory_task", {
"input_tokens": 100,
"output_tokens": 200,
})
assert log_file.exists()
line = json.loads(log_file.read_text().strip())
assert line["pipeline"] == "playground-factory"
assert line["input_tokens"] == 100
assert line["output_tokens"] == 200
assert line["total_tokens"] == 300
def test_calls_budget_record_usage(self, tmp_path):
"""Should call token_budget.record_usage for budget tracking."""
log_file = tmp_path / "token_usage.jsonl"
mock_record = MagicMock(return_value={"daily_remaining": 400000})
with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage", mock_record):
from orchestration import log_token_usage
log_token_usage("training_factory_task", {
"input_tokens": 500,
"output_tokens": 1000,
})
mock_record.assert_called_once_with("training-factory", 500, 1000)
def test_pipeline_name_derived_from_task(self, tmp_path):
"""Pipeline name should strip _task suffix and use hyphens."""
log_file = tmp_path / "token_usage.jsonl"
with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage"):
from orchestration import log_token_usage
log_token_usage("knowledge_mine_task", {
"input_tokens": 10,
"output_tokens": 20,
})
line = json.loads(log_file.read_text().strip())
assert line["pipeline"] == "knowledge-mine"
class TestCheckBudget:
"""Test check_budget function."""
def test_returns_true_when_budget_available(self):
"""Should return True when can_afford returns True."""
with patch("orchestration.can_afford", return_value=True):
from orchestration import check_budget
assert check_budget("test", 1000) is True
def test_returns_false_when_budget_exhausted(self):
"""Should return False when can_afford returns False."""
with patch("orchestration.can_afford", return_value=False), patch("orchestration.get_remaining", return_value=50):
from orchestration import check_budget
assert check_budget("test", 10000) is False
def test_returns_true_when_budget_module_missing(self):
"""Should return True (no enforcement) when token_budget not importable."""
with patch("orchestration.can_afford", side_effect=ImportError):
from orchestration import check_budget
assert check_budget("test", 999999) is True
class TestPipelineTasks:
"""Test Huey pipeline task registration."""
def test_all_pipelines_registered(self):
"""All 5 pipeline tasks should be registered with Huey."""
from orchestration import (
playground_factory_task,
training_factory_task,
knowledge_mine_task,
adversary_task,
codebase_genome_task,
)
tasks = [
playground_factory_task,
training_factory_task,
knowledge_mine_task,
adversary_task,
codebase_genome_task,
]
for task in tasks:
# Huey tasks have a .call_local method
assert hasattr(task, "call_local"), f"{task.__name__} not registered with Huey"
def test_run_pipeline_returns_structured_result(self, tmp_path):
"""_run_pipeline should return dict with pipeline, status, tokens."""
# Create a stub script
stub = tmp_path / "stub.sh"
stub.write_text("#!/bin/bash\necho 'tokens used: 42'\n")
stub.chmod(0o755)
with patch("orchestration.check_budget", return_value=True):
from orchestration import _run_pipeline
result = _run_pipeline("test-pipeline", str(stub), 1000)
assert result["pipeline"] == "test-pipeline"
assert result["status"] == "success"
assert "input_tokens" in result
assert "output_tokens" in result
def test_run_pipeline_skips_when_budget_exhausted(self):
"""Should return skipped status when budget is exhausted."""
with patch("orchestration.check_budget", return_value=False):
from orchestration import _run_pipeline
result = _run_pipeline("expensive", "/nonexistent", 999999)
assert result["status"] == "skipped"
assert result["reason"] == "budget_exhausted"
def test_run_pipeline_handles_missing_script(self):
"""Should return failed status when script doesn't exist."""
with patch("orchestration.check_budget", return_value=True):
from orchestration import _run_pipeline
result = _run_pipeline("broken", "/no/such/script.sh", 1000)
assert result["status"] == "failed"
assert result["reason"] == "script_not_found"