"""Sovereign orchestration — Huey replaces 3,843 lines of homebrew."""

import json
import logging
import os
import subprocess
from datetime import datetime, timezone
from pathlib import Path

from huey import SqliteHuey, signals

# Budget helpers are bound once at module scope (not inside each function) so
# that tests can patch "orchestration.record_usage" / "orchestration.can_afford"
# / "orchestration.get_remaining" directly.  If the budget module is absent,
# all three are None and budget enforcement is disabled.
try:
    from scripts.token_budget import can_afford, get_remaining, record_usage
except ImportError:
    can_afford = get_remaining = record_usage = None

logger = logging.getLogger("orchestration")

huey = SqliteHuey(
    filename=str(Path.home() / ".hermes" / "orchestration.db"),
    results=True,
)

# === Token Tracking ===
TOKEN_LOG = Path.home() / ".hermes" / "token_usage.jsonl"


def log_token_usage(task_name, result):
    """Log token usage from a completed pipeline task.

    Reads input_tokens/output_tokens from the agent result dict.
    Auto-detects pipeline name from task context.
    Appends to JSONL for downstream analysis.
    Also records to token_budget for daily enforcement.

    Args:
        task_name: Huey task function name, e.g. "playground_factory_task".
        result: task return value; only dicts with non-zero token counts
            are logged, everything else is silently ignored.
    """
    if not isinstance(result, dict):
        return

    input_tokens = result.get("input_tokens", 0)
    output_tokens = result.get("output_tokens", 0)

    # Nothing consumed — no log entry, no budget update.
    if input_tokens == 0 and output_tokens == 0:
        return

    # Auto-detect pipeline name from task function name
    # ("playground_factory_task" -> "playground-factory").
    pipeline = task_name.replace("_task", "").replace("_", "-")

    entry = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "pipeline": pipeline,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "total_tokens": input_tokens + output_tokens,
        "task": task_name,
    }

    # Write to JSONL log
    TOKEN_LOG.parent.mkdir(parents=True, exist_ok=True)
    with open(TOKEN_LOG, "a") as f:
        f.write(json.dumps(entry) + "\n")

    # Record to token budget for daily enforcement.  record_usage is the
    # module-level binding so tests can patch it; None means the budget
    # module was not importable.
    if record_usage is None:
        logger.debug("token_budget not available, skipping budget update")
    else:
        record_usage(pipeline, input_tokens, output_tokens)
        logger.info(f"Budget updated: {pipeline} +{entry['total_tokens']} tokens")


def check_budget(pipeline: str, estimated_tokens: int) -> bool:
    """Check if there's enough budget for a pipeline run.

    Returns True when the run is allowed: either there is enough remaining
    budget, or no budget module is available (no enforcement).
    """
    if can_afford is None:
        return True  # No budget module = no enforcement
    try:
        if not can_afford(estimated_tokens):
            # get_remaining() is only needed for the warning message, so it
            # is not called on the happy path.
            logger.warning(
                f"Budget exhausted for {pipeline}: need {estimated_tokens}, "
                f"have {get_remaining()}"
            )
            return False
        return True
    except ImportError:
        return True  # No budget module = no enforcement


@huey.signal(signals.SIGNAL_COMPLETE)
def on_task_complete(signal, task, task_value=None, **kwargs):
    """Huey hook: log token usage after each pipeline task completes."""
    task_name = getattr(task, "name", "unknown")
    log_token_usage(task_name, task_value)


# === Pipeline Tasks ===

@huey.task()
def playground_factory_task(max_tokens: int = 100000):
    """Generate training data pairs from session transcripts."""
    script = Path(__file__).parent / "scripts" / "pipeline_playground_factory.sh"
    return _run_pipeline("playground-factory", str(script), max_tokens)
@huey.task()
def training_factory_task(max_tokens: int = 150000):
    """Run model fine-tuning with generated training data."""
    script = Path(__file__).parent / "scripts" / "pipeline_training_factory.sh"
    return _run_pipeline("training-factory", str(script), max_tokens)


@huey.task()
def knowledge_mine_task(max_tokens: int = 80000):
    """Extract structured knowledge from session archives."""
    script = Path(__file__).parent / "scripts" / "pipeline_knowledge_mine.sh"
    return _run_pipeline("knowledge-mine", str(script), max_tokens)


@huey.task()
def adversary_task(max_tokens: int = 50000):
    """Run adversarial red-team prompts against fleet models."""
    script = Path(__file__).parent / "scripts" / "pipeline_adversary.sh"
    return _run_pipeline("adversary", str(script), max_tokens)


@huey.task()
def codebase_genome_task(max_tokens: int = 120000):
    """Generate GENOME.md for one Gitea repo per run."""
    script = Path(__file__).parent / "scripts" / "pipeline_codebase_genome.sh"
    return _run_pipeline("codebase-genome", str(script), max_tokens)


# === Pipeline Runner ===

def _run_pipeline(name: str, script: str, max_tokens: int) -> dict:
    """Run a pipeline script and return structured result with token counts.

    Args:
        name: pipeline name used for budget checks and logging.
        script: path to the pipeline shell script.
        max_tokens: token ceiling, forwarded to the script via --max-tokens.

    Returns:
        dict that always carries "pipeline", "status", "input_tokens" and
        "output_tokens"; completed runs additionally carry "exit_code" and
        stdout/stderr tails, non-runs carry a "reason".
    """
    import re  # used only by the output parser below

    if not check_budget(name, max_tokens):
        return {
            "pipeline": name,
            "status": "skipped",
            "reason": "budget_exhausted",
            "input_tokens": 0,
            "output_tokens": 0,
        }

    if not os.path.isfile(script):
        logger.error(f"Pipeline script not found: {script}")
        return {
            "pipeline": name,
            "status": "failed",
            "reason": "script_not_found",
            "input_tokens": 0,
            "output_tokens": 0,
        }

    logger.info(f"Starting pipeline: {name} (max_tokens={max_tokens})")

    try:
        result = subprocess.run(
            ["bash", script, "--max-tokens", str(max_tokens)],
            capture_output=True,
            text=True,
            timeout=3600,  # 1 hour max per pipeline
        )

        # Parse token usage from script output.  Accepts both
        # "... 5000 tokens used" and "tokens used: 42"; if several lines
        # match, the last one wins.  The scripts do not report input
        # tokens separately, so input_tokens stays 0 — TODO confirm once
        # the pipeline scripts emit it.
        token_re = re.compile(
            r"(\d+)\s+tokens\s+used|tokens\s+used\D{0,3}(\d+)",
            re.IGNORECASE,
        )
        input_tokens = 0
        output_tokens = 0
        for line in result.stdout.split("\n"):
            match = token_re.search(line)
            if match:
                output_tokens = int(match.group(1) or match.group(2))

        return {
            "pipeline": name,
            "status": "success" if result.returncode == 0 else "failed",
            "exit_code": result.returncode,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "stdout_tail": result.stdout[-500:] if result.stdout else "",
            "stderr_tail": result.stderr[-500:] if result.stderr else "",
        }
    except subprocess.TimeoutExpired:
        logger.error(f"Pipeline {name} timed out after 3600s")
        return {
            "pipeline": name,
            "status": "timeout",
            "input_tokens": 0,
            "output_tokens": 0,
        }
    except Exception as e:
        # Boundary catch-all: a pipeline failure must never kill the worker.
        logger.error(f"Pipeline {name} error: {e}")
        return {
            "pipeline": name,
            "status": "error",
            "reason": str(e),
            "input_tokens": 0,
            "output_tokens": 0,
        }
# === Scheduler Interface ===

def schedule_nightly():
    """Schedule all pipelines in priority order with dependencies.

    Called by the nightly scheduler or manually.
    Playground and training start in parallel; others follow dependency chain.
    """
    # Phase 1: playground + training (no deps, parallel)
    playground_factory_task(max_tokens=100000)
    training_factory_task(max_tokens=150000)

    # Phase 2: knowledge-mine chained into adversary.  Calling a Huey task
    # enqueues it and returns a Result, which has no .then(); a chain must
    # be built from a pending task via .s() and enqueued explicitly.
    # NOTE(review): nothing here actually waits on training-factory —
    # confirm the intended dependency ordering.
    chain = knowledge_mine_task.s(max_tokens=80000).then(
        adversary_task, max_tokens=50000
    )
    huey.enqueue(chain)

    # Phase 3: codebase-genome (independent)
    codebase_genome_task(max_tokens=120000)

    logger.info("Nightly pipeline schedule dispatched")


# ---------------------------------------------------------------------------
# tests/test_orchestration_token_tracking.py
# ---------------------------------------------------------------------------

"""Tests for orchestration.py token tracking integration (issue #634).

Verifies:
- log_token_usage writes to JSONL
- log_token_usage calls token_budget.record_usage
- check_budget enforces limits
- Huey signal hook fires on task completion
- Pipeline tasks are registered
"""

import json
import os
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest


class TestLogTokenUsage:
    """Test log_token_usage function."""

    def test_skips_non_dict_result(self):
        """Should silently skip non-dict results."""
        from orchestration import log_token_usage
        # Should not raise
        log_token_usage("test_task", None)
        log_token_usage("test_task", "string")
        log_token_usage("test_task", 42)

    def test_skips_zero_tokens(self):
        """Should skip entries with zero tokens."""
        from orchestration import log_token_usage
        with patch("orchestration.TOKEN_LOG") as mock_log:
            mock_log.parent = MagicMock()
            log_token_usage("test_task", {"input_tokens": 0, "output_tokens": 0})
            # Should not write to file
            mock_log.parent.mkdir.assert_not_called()

    def test_writes_to_jsonl(self, tmp_path):
        """Should append token usage to JSONL log."""
        # NOTE(review): patching "orchestration.record_usage" requires the
        # implementation to bind record_usage at module scope — confirm.
        log_file = tmp_path / "token_usage.jsonl"
        with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage"):
            from orchestration import log_token_usage
            log_token_usage("playground_factory_task", {
                "input_tokens": 100,
                "output_tokens": 200,
            })

        assert log_file.exists()
        line = json.loads(log_file.read_text().strip())
        assert line["pipeline"] == "playground-factory"
        assert line["input_tokens"] == 100
        assert line["output_tokens"] == 200
        assert line["total_tokens"] == 300

    def test_calls_budget_record_usage(self, tmp_path):
        """Should call token_budget.record_usage for budget tracking."""
        log_file = tmp_path / "token_usage.jsonl"
        mock_record = MagicMock(return_value={"daily_remaining": 400000})
        with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage", mock_record):
            from orchestration import log_token_usage
            log_token_usage("training_factory_task", {
                "input_tokens": 500,
                "output_tokens": 1000,
            })

        mock_record.assert_called_once_with("training-factory", 500, 1000)

    def test_pipeline_name_derived_from_task(self, tmp_path):
        """Pipeline name should strip _task suffix and use hyphens."""
        log_file = tmp_path / "token_usage.jsonl"
        with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage"):
            from orchestration import log_token_usage
            log_token_usage("knowledge_mine_task", {
                "input_tokens": 10,
                "output_tokens": 20,
            })

        line = json.loads(log_file.read_text().strip())
        assert line["pipeline"] == "knowledge-mine"


class TestCheckBudget:
    """Test check_budget function."""

    def test_returns_true_when_budget_available(self):
        """Should return True when can_afford returns True."""
        with patch("orchestration.can_afford", return_value=True):
            from orchestration import check_budget
            assert check_budget("test", 1000) is True

    def test_returns_false_when_budget_exhausted(self):
        """Should return False when can_afford returns False."""
        with patch("orchestration.can_afford", return_value=False), patch("orchestration.get_remaining", return_value=50):
            from orchestration import check_budget
            assert check_budget("test", 10000) is False

    def test_returns_true_when_budget_module_missing(self):
        """Should return True (no enforcement) when token_budget not importable."""
        with patch("orchestration.can_afford", side_effect=ImportError):
            from orchestration import check_budget
            assert check_budget("test", 999999) is True


class TestPipelineTasks:
    """Test Huey pipeline task registration."""

    def test_all_pipelines_registered(self):
        """All 5 pipeline tasks should be registered with Huey."""
        from orchestration import (
            playground_factory_task,
            training_factory_task,
            knowledge_mine_task,
            adversary_task,
            codebase_genome_task,
        )
        tasks = [
            playground_factory_task,
            training_factory_task,
            knowledge_mine_task,
            adversary_task,
            codebase_genome_task,
        ]
        for task in tasks:
            # Huey tasks have a .call_local method
            assert hasattr(task, "call_local"), f"{task.__name__} not registered with Huey"

    def test_run_pipeline_returns_structured_result(self, tmp_path):
        """_run_pipeline should return dict with pipeline, status, tokens."""
        # Create a stub script
        stub = tmp_path / "stub.sh"
        stub.write_text("#!/bin/bash\necho 'tokens used: 42'\n")
        stub.chmod(0o755)

        with patch("orchestration.check_budget", return_value=True):
            from orchestration import _run_pipeline
            result = _run_pipeline("test-pipeline", str(stub), 1000)

        assert result["pipeline"] == "test-pipeline"
        assert result["status"] == "success"
        assert "input_tokens" in result
        assert "output_tokens" in result

    def test_run_pipeline_skips_when_budget_exhausted(self):
        """Should return skipped status when budget is exhausted."""
        with patch("orchestration.check_budget", return_value=False):
            from orchestration import _run_pipeline
            result = _run_pipeline("expensive", "/nonexistent", 999999)

        assert result["status"] == "skipped"
        assert result["reason"] == "budget_exhausted"

    def test_run_pipeline_handles_missing_script(self):
        """Should return failed status when script doesn't exist."""
        with patch("orchestration.check_budget", return_value=True):
            from orchestration import _run_pipeline
            result = _run_pipeline("broken", "/no/such/script.sh", 1000)

        assert result["status"] == "failed"
        assert result["reason"] == "script_not_found"