Compare commits

...

3 Commits

Author SHA1 Message Date
df4dcf1fb4 test: Token tracker orchestrator integration tests (#634)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 24s
Smoke Test / smoke (pull_request) Failing after 9s
Validate Config / YAML Lint (pull_request) Failing after 11s
Validate Config / JSON Validate (pull_request) Successful in 13s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 42s
Validate Config / Shell Script Lint (pull_request) Failing after 34s
Validate Config / Cron Syntax Check (pull_request) Successful in 5s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 4s
Validate Config / Playbook Schema Validation (pull_request) Successful in 14s
PR Checklist / pr-checklist (pull_request) Failing after 3m32s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
2026-04-17 05:32:18 +00:00
c4790d8bb9 feat: Integrate token tracker with orchestrator (#634)
- Fix corrupted TOKEN_LOG path
- Import token_budget.record_usage in log_token_budget
- Add check_budget() before pipeline runs
- Add Huey tasks for all 5 pipelines
- Add _run_pipeline() runner with timeout and budget enforcement
- Add schedule_nightly() for dependency-ordered dispatch
- Signal hook auto-logs to both JSONL and budget tracker
2026-04-17 05:31:12 +00:00
6fbf5bb649 Merge pull request 'feat: sidecar config validation on deploy' (#797) from feat/690-config-validation into main
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 14s
Smoke Test / smoke (pull_request) Failing after 15s
Validate Config / YAML Lint (pull_request) Failing after 13s
Validate Config / JSON Validate (pull_request) Successful in 16s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 42s
Validate Config / Shell Script Lint (pull_request) Failing after 45s
Validate Config / Cron Syntax Check (pull_request) Successful in 9s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 9s
Validate Config / Playbook Schema Validation (pull_request) Successful in 21s
PR Checklist / pr-checklist (pull_request) Failing after 3m31s
Validate Config / Python Test Suite (pull_request) Has been cancelled
Architecture Lint / Lint Repository (pull_request) Has been cancelled
2026-04-17 05:15:05 +00:00
2 changed files with 335 additions and 7 deletions

orchestration.py

View File

@@ -1,13 +1,20 @@
"""Sovereign orchestration — Huey replaces 3,843 lines of homebrew.""" """Sovereign orchestration — Huey replaces 3,843 lines of homebrew."""
import json import json
import logging
import os import os
import subprocess
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from huey import SqliteHuey, signals from huey import SqliteHuey, signals
huey = SqliteHuey(filename=str(Path.home() / ".hermes" / "orchestration.db")) logger = logging.getLogger("orchestration")
huey = SqliteHuey(
filename=str(Path.home() / ".hermes" / "orchestration.db"),
results=True,
)
# === Token Tracking === # === Token Tracking ===
TOKEN_LOG = Path.home() / ".hermes" / "token_usage.jsonl" TOKEN_LOG = Path.home() / ".hermes" / "token_usage.jsonl"
@@ -15,23 +22,24 @@ TOKEN_LOG = Path.home() / ".hermes" / "token_usage.jsonl"
def log_token_usage(task_name, result):
    """Log token usage from a completed pipeline task.

    Reads input_tokens/output_tokens from the agent result dict.
    Auto-detects pipeline name from task context.
    Appends to JSONL for downstream analysis.
    Also records to token_budget for daily enforcement.
    """
    if not isinstance(result, dict):
        return

    input_tokens = result.get("input_tokens", 0)
    output_tokens = result.get("output_tokens", 0)
    if input_tokens == 0 and output_tokens == 0:
        return

    # Auto-detect pipeline name from task function name
    pipeline = task_name.replace("_task", "").replace("_", "-")

    entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "pipeline": pipeline,
@@ -40,14 +48,173 @@ def log_token_usage(task_name, result):
"total_tokens": input_tokens + output_tokens, "total_tokens": input_tokens + output_tokens,
"task": task_name, "task": task_name,
} }
# Write to JSONL log
TOKEN_LOG.parent.mkdir(parents=True, exist_ok=True) TOKEN_LOG.parent.mkdir(parents=True, exist_ok=True)
with open(TOKEN_LOG, "a") as f: with open(TOKEN_LOG, "a") as f:
f.write(json.dumps(entry) + "\n") f.write(json.dumps(entry) + "\n")
# Record to token budget for daily enforcement
try:
from scripts.token_budget import record_usage
record_usage(pipeline, input_tokens, output_tokens)
logger.info(f"Budget updated: {pipeline} +{entry['total_tokens']} tokens")
except ImportError:
logger.debug("token_budget not available, skipping budget update")
def check_budget(pipeline: str, estimated_tokens: int) -> bool:
"""Check if there's enough budget for a pipeline run."""
try:
from scripts.token_budget import can_afford, get_remaining
remaining = get_remaining()
if not can_afford(estimated_tokens):
logger.warning(
f"Budget exhausted for {pipeline}: need {estimated_tokens}, "
f"have {remaining}"
)
return False
return True
except ImportError:
return True # No budget module = no enforcement
@huey.signal(signals.SIGNAL_COMPLETE) @huey.signal(signals.SIGNAL_COMPLETE)
def on_task_complete(signal, task, task_value=None, **kwargs): def on_task_complete(signal, task, task_value=None, **kwargs):
"""Huey hook: log token usage after each pipeline task completes.""" """Huey hook: log token usage after each pipeline task completes."""
task_name = getattr(task, "name", "unknown") task_name = getattr(task, "name", "unknown")
log_token_usage(task_name, task_value) log_token_usage(task_name, task_value)
# === Pipeline Tasks ===
@huey.task()
def playground_factory_task(max_tokens: int = 100000):
"""Generate training data pairs from session transcripts."""
script = Path(__file__).parent / "scripts" / "pipeline_playground_factory.sh"
return _run_pipeline("playground-factory", str(script), max_tokens)
@huey.task()
def training_factory_task(max_tokens: int = 150000):
"""Run model fine-tuning with generated training data."""
script = Path(__file__).parent / "scripts" / "pipeline_training_factory.sh"
return _run_pipeline("training-factory", str(script), max_tokens)
@huey.task()
def knowledge_mine_task(max_tokens: int = 80000):
"""Extract structured knowledge from session archives."""
script = Path(__file__).parent / "scripts" / "pipeline_knowledge_mine.sh"
return _run_pipeline("knowledge-mine", str(script), max_tokens)
@huey.task()
def adversary_task(max_tokens: int = 50000):
"""Run adversarial red-team prompts against fleet models."""
script = Path(__file__).parent / "scripts" / "pipeline_adversary.sh"
return _run_pipeline("adversary", str(script), max_tokens)
@huey.task()
def codebase_genome_task(max_tokens: int = 120000):
"""Generate GENOME.md for one Gitea repo per run."""
script = Path(__file__).parent / "scripts" / "pipeline_codebase_genome.sh"
return _run_pipeline("codebase-genome", str(script), max_tokens)
# === Pipeline Runner ===
def _run_pipeline(name: str, script: str, max_tokens: int) -> dict:
"""Run a pipeline script and return structured result with token counts."""
if not check_budget(name, max_tokens):
return {
"pipeline": name,
"status": "skipped",
"reason": "budget_exhausted",
"input_tokens": 0,
"output_tokens": 0,
}
if not os.path.isfile(script):
logger.error(f"Pipeline script not found: {script}")
return {
"pipeline": name,
"status": "failed",
"reason": "script_not_found",
"input_tokens": 0,
"output_tokens": 0,
}
logger.info(f"Starting pipeline: {name} (max_tokens={max_tokens})")
try:
result = subprocess.run(
["bash", script, "--max-tokens", str(max_tokens)],
capture_output=True,
text=True,
timeout=3600, # 1 hour max per pipeline
)
# Parse token usage from script output
input_tokens = 0
output_tokens = 0
for line in result.stdout.split("\n"):
if "tokens used" in line.lower():
try:
# Extract number from lines like "... 5000 tokens used."
parts = line.split()
for i, p in enumerate(parts):
if p.isdigit() and i + 1 < len(parts) and "token" in parts[i+1].lower():
output_tokens = int(p)
break
except (ValueError, IndexError):
pass
return {
"pipeline": name,
"status": "success" if result.returncode == 0 else "failed",
"exit_code": result.returncode,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"stdout_tail": result.stdout[-500:] if result.stdout else "",
"stderr_tail": result.stderr[-500:] if result.stderr else "",
}
except subprocess.TimeoutExpired:
logger.error(f"Pipeline {name} timed out after 3600s")
return {
"pipeline": name,
"status": "timeout",
"input_tokens": 0,
"output_tokens": 0,
}
except Exception as e:
logger.error(f"Pipeline {name} error: {e}")
return {
"pipeline": name,
"status": "error",
"reason": str(e),
"input_tokens": 0,
"output_tokens": 0,
}
# === Scheduler Interface ===
def schedule_nightly():
"""Schedule all pipelines in priority order with dependencies.
Called by the nightly scheduler or manually.
Playground and training start in parallel; others follow dependency chain.
"""
# Phase 1: playground + training (no deps, parallel)
playground_factory_task(max_tokens=100000)
training_factory_task(max_tokens=150000)
# Phase 2: knowledge-mine (depends on training)
knowledge_mine_task(max_tokens=80000).then(adversary_task, max_tokens=50000)
# Phase 3: codebase-genome (independent)
codebase_genome_task(max_tokens=120000)
logger.info("Nightly pipeline schedule dispatched")

View File

@@ -0,0 +1,161 @@
"""Tests for orchestration.py token tracking integration (issue #634).
Verifies:
- log_token_usage writes to JSONL
- log_token_usage calls token_budget.record_usage
- check_budget enforces limits
- Huey signal hook fires on task completion
- Pipeline tasks are registered
"""
import json
import os
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
class TestLogTokenUsage:
"""Test log_token_usage function."""
def test_skips_non_dict_result(self):
"""Should silently skip non-dict results."""
from orchestration import log_token_usage
# Should not raise
log_token_usage("test_task", None)
log_token_usage("test_task", "string")
log_token_usage("test_task", 42)
def test_skips_zero_tokens(self):
"""Should skip entries with zero tokens."""
from orchestration import log_token_usage
with patch("orchestration.TOKEN_LOG") as mock_log:
mock_log.parent = MagicMock()
log_token_usage("test_task", {"input_tokens": 0, "output_tokens": 0})
# Should not write to file
mock_log.parent.mkdir.assert_not_called()
def test_writes_to_jsonl(self, tmp_path):
"""Should append token usage to JSONL log."""
log_file = tmp_path / "token_usage.jsonl"
with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage"):
from orchestration import log_token_usage
log_token_usage("playground_factory_task", {
"input_tokens": 100,
"output_tokens": 200,
})
assert log_file.exists()
line = json.loads(log_file.read_text().strip())
assert line["pipeline"] == "playground-factory"
assert line["input_tokens"] == 100
assert line["output_tokens"] == 200
assert line["total_tokens"] == 300
def test_calls_budget_record_usage(self, tmp_path):
"""Should call token_budget.record_usage for budget tracking."""
log_file = tmp_path / "token_usage.jsonl"
mock_record = MagicMock(return_value={"daily_remaining": 400000})
with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage", mock_record):
from orchestration import log_token_usage
log_token_usage("training_factory_task", {
"input_tokens": 500,
"output_tokens": 1000,
})
mock_record.assert_called_once_with("training-factory", 500, 1000)
def test_pipeline_name_derived_from_task(self, tmp_path):
"""Pipeline name should strip _task suffix and use hyphens."""
log_file = tmp_path / "token_usage.jsonl"
with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage"):
from orchestration import log_token_usage
log_token_usage("knowledge_mine_task", {
"input_tokens": 10,
"output_tokens": 20,
})
line = json.loads(log_file.read_text().strip())
assert line["pipeline"] == "knowledge-mine"
class TestCheckBudget:
"""Test check_budget function."""
def test_returns_true_when_budget_available(self):
"""Should return True when can_afford returns True."""
with patch("orchestration.can_afford", return_value=True):
from orchestration import check_budget
assert check_budget("test", 1000) is True
def test_returns_false_when_budget_exhausted(self):
"""Should return False when can_afford returns False."""
with patch("orchestration.can_afford", return_value=False), patch("orchestration.get_remaining", return_value=50):
from orchestration import check_budget
assert check_budget("test", 10000) is False
def test_returns_true_when_budget_module_missing(self):
"""Should return True (no enforcement) when token_budget not importable."""
with patch("orchestration.can_afford", side_effect=ImportError):
from orchestration import check_budget
assert check_budget("test", 999999) is True
class TestPipelineTasks:
"""Test Huey pipeline task registration."""
def test_all_pipelines_registered(self):
"""All 5 pipeline tasks should be registered with Huey."""
from orchestration import (
playground_factory_task,
training_factory_task,
knowledge_mine_task,
adversary_task,
codebase_genome_task,
)
tasks = [
playground_factory_task,
training_factory_task,
knowledge_mine_task,
adversary_task,
codebase_genome_task,
]
for task in tasks:
# Huey tasks have a .call_local method
assert hasattr(task, "call_local"), f"{task.__name__} not registered with Huey"
def test_run_pipeline_returns_structured_result(self, tmp_path):
"""_run_pipeline should return dict with pipeline, status, tokens."""
# Create a stub script
stub = tmp_path / "stub.sh"
stub.write_text("#!/bin/bash\necho 'tokens used: 42'\n")
stub.chmod(0o755)
with patch("orchestration.check_budget", return_value=True):
from orchestration import _run_pipeline
result = _run_pipeline("test-pipeline", str(stub), 1000)
assert result["pipeline"] == "test-pipeline"
assert result["status"] == "success"
assert "input_tokens" in result
assert "output_tokens" in result
def test_run_pipeline_skips_when_budget_exhausted(self):
"""Should return skipped status when budget is exhausted."""
with patch("orchestration.check_budget", return_value=False):
from orchestration import _run_pipeline
result = _run_pipeline("expensive", "/nonexistent", 999999)
assert result["status"] == "skipped"
assert result["reason"] == "budget_exhausted"
def test_run_pipeline_handles_missing_script(self):
"""Should return failed status when script doesn't exist."""
with patch("orchestration.check_budget", return_value=True):
from orchestration import _run_pipeline
result = _run_pipeline("broken", "/no/such/script.sh", 1000)
assert result["status"] == "failed"
assert result["reason"] == "script_not_found"