Compare commits
2 Commits
fix/750-co
...
fix/634-to
| Author | SHA1 | Date | |
|---|---|---|---|
| df4dcf1fb4 | |||
| c4790d8bb9 |
181
orchestration.py
181
orchestration.py
@@ -1,13 +1,20 @@
|
||||
"""Sovereign orchestration — Huey replaces 3,843 lines of homebrew."""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from huey import SqliteHuey, signals
|
||||
|
||||
huey = SqliteHuey(filename=str(Path.home() / ".hermes" / "orchestration.db"))
|
||||
logger = logging.getLogger("orchestration")
|
||||
|
||||
huey = SqliteHuey(
|
||||
filename=str(Path.home() / ".hermes" / "orchestration.db"),
|
||||
results=True,
|
||||
)
|
||||
|
||||
# === Token Tracking ===
|
||||
TOKEN_LOG = Path.home() / ".hermes" / "token_usage.jsonl"
|
||||
@@ -15,23 +22,24 @@ TOKEN_LOG = Path.home() / ".hermes" / "token_usage.jsonl"
|
||||
|
||||
def log_token_usage(task_name, result):
|
||||
"""Log token usage from a completed pipeline task.
|
||||
|
||||
|
||||
Reads input_tokens/output_tokens from the agent result dict.
|
||||
Auto-detects pipeline name from task context.
|
||||
Appends to JSONL for downstream analysis.
|
||||
Also records to token_budget for daily enforcement.
|
||||
"""
|
||||
if not isinstance(result, dict):
|
||||
return
|
||||
|
||||
|
||||
input_tokens = result.get("input_tokens", 0)
|
||||
output_tokens = result.get("output_tokens", 0)
|
||||
|
||||
|
||||
if input_tokens == 0 and output_tokens == 0:
|
||||
return
|
||||
|
||||
|
||||
# Auto-detect pipeline name from task function name
|
||||
pipeline = task_name.replace("_task", "").replace("_", "-")
|
||||
|
||||
|
||||
entry = {
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"pipeline": pipeline,
|
||||
@@ -40,14 +48,173 @@ def log_token_usage(task_name, result):
|
||||
"total_tokens": input_tokens + output_tokens,
|
||||
"task": task_name,
|
||||
}
|
||||
|
||||
|
||||
# Write to JSONL log
|
||||
TOKEN_LOG.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(TOKEN_LOG, "a") as f:
|
||||
f.write(json.dumps(entry) + "\n")
|
||||
|
||||
# Record to token budget for daily enforcement
|
||||
try:
|
||||
from scripts.token_budget import record_usage
|
||||
record_usage(pipeline, input_tokens, output_tokens)
|
||||
logger.info(f"Budget updated: {pipeline} +{entry['total_tokens']} tokens")
|
||||
except ImportError:
|
||||
logger.debug("token_budget not available, skipping budget update")
|
||||
|
||||
|
||||
def check_budget(pipeline: str, estimated_tokens: int) -> bool:
|
||||
"""Check if there's enough budget for a pipeline run."""
|
||||
try:
|
||||
from scripts.token_budget import can_afford, get_remaining
|
||||
remaining = get_remaining()
|
||||
if not can_afford(estimated_tokens):
|
||||
logger.warning(
|
||||
f"Budget exhausted for {pipeline}: need {estimated_tokens}, "
|
||||
f"have {remaining}"
|
||||
)
|
||||
return False
|
||||
return True
|
||||
except ImportError:
|
||||
return True # No budget module = no enforcement
|
||||
|
||||
|
||||
@huey.signal(signals.SIGNAL_COMPLETE)
|
||||
def on_task_complete(signal, task, task_value=None, **kwargs):
|
||||
"""Huey hook: log token usage after each pipeline task completes."""
|
||||
task_name = getattr(task, "name", "unknown")
|
||||
log_token_usage(task_name, task_value)
|
||||
|
||||
|
||||
# === Pipeline Tasks ===
|
||||
|
||||
@huey.task()
|
||||
def playground_factory_task(max_tokens: int = 100000):
|
||||
"""Generate training data pairs from session transcripts."""
|
||||
script = Path(__file__).parent / "scripts" / "pipeline_playground_factory.sh"
|
||||
return _run_pipeline("playground-factory", str(script), max_tokens)
|
||||
|
||||
|
||||
@huey.task()
|
||||
def training_factory_task(max_tokens: int = 150000):
|
||||
"""Run model fine-tuning with generated training data."""
|
||||
script = Path(__file__).parent / "scripts" / "pipeline_training_factory.sh"
|
||||
return _run_pipeline("training-factory", str(script), max_tokens)
|
||||
|
||||
|
||||
@huey.task()
|
||||
def knowledge_mine_task(max_tokens: int = 80000):
|
||||
"""Extract structured knowledge from session archives."""
|
||||
script = Path(__file__).parent / "scripts" / "pipeline_knowledge_mine.sh"
|
||||
return _run_pipeline("knowledge-mine", str(script), max_tokens)
|
||||
|
||||
|
||||
@huey.task()
|
||||
def adversary_task(max_tokens: int = 50000):
|
||||
"""Run adversarial red-team prompts against fleet models."""
|
||||
script = Path(__file__).parent / "scripts" / "pipeline_adversary.sh"
|
||||
return _run_pipeline("adversary", str(script), max_tokens)
|
||||
|
||||
|
||||
@huey.task()
|
||||
def codebase_genome_task(max_tokens: int = 120000):
|
||||
"""Generate GENOME.md for one Gitea repo per run."""
|
||||
script = Path(__file__).parent / "scripts" / "pipeline_codebase_genome.sh"
|
||||
return _run_pipeline("codebase-genome", str(script), max_tokens)
|
||||
|
||||
|
||||
# === Pipeline Runner ===
|
||||
|
||||
def _run_pipeline(name: str, script: str, max_tokens: int) -> dict:
|
||||
"""Run a pipeline script and return structured result with token counts."""
|
||||
if not check_budget(name, max_tokens):
|
||||
return {
|
||||
"pipeline": name,
|
||||
"status": "skipped",
|
||||
"reason": "budget_exhausted",
|
||||
"input_tokens": 0,
|
||||
"output_tokens": 0,
|
||||
}
|
||||
|
||||
if not os.path.isfile(script):
|
||||
logger.error(f"Pipeline script not found: {script}")
|
||||
return {
|
||||
"pipeline": name,
|
||||
"status": "failed",
|
||||
"reason": "script_not_found",
|
||||
"input_tokens": 0,
|
||||
"output_tokens": 0,
|
||||
}
|
||||
|
||||
logger.info(f"Starting pipeline: {name} (max_tokens={max_tokens})")
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["bash", script, "--max-tokens", str(max_tokens)],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=3600, # 1 hour max per pipeline
|
||||
)
|
||||
|
||||
# Parse token usage from script output
|
||||
input_tokens = 0
|
||||
output_tokens = 0
|
||||
for line in result.stdout.split("\n"):
|
||||
if "tokens used" in line.lower():
|
||||
try:
|
||||
# Extract number from lines like "... 5000 tokens used."
|
||||
parts = line.split()
|
||||
for i, p in enumerate(parts):
|
||||
if p.isdigit() and i + 1 < len(parts) and "token" in parts[i+1].lower():
|
||||
output_tokens = int(p)
|
||||
break
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
|
||||
return {
|
||||
"pipeline": name,
|
||||
"status": "success" if result.returncode == 0 else "failed",
|
||||
"exit_code": result.returncode,
|
||||
"input_tokens": input_tokens,
|
||||
"output_tokens": output_tokens,
|
||||
"stdout_tail": result.stdout[-500:] if result.stdout else "",
|
||||
"stderr_tail": result.stderr[-500:] if result.stderr else "",
|
||||
}
|
||||
except subprocess.TimeoutExpired:
|
||||
logger.error(f"Pipeline {name} timed out after 3600s")
|
||||
return {
|
||||
"pipeline": name,
|
||||
"status": "timeout",
|
||||
"input_tokens": 0,
|
||||
"output_tokens": 0,
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Pipeline {name} error: {e}")
|
||||
return {
|
||||
"pipeline": name,
|
||||
"status": "error",
|
||||
"reason": str(e),
|
||||
"input_tokens": 0,
|
||||
"output_tokens": 0,
|
||||
}
|
||||
|
||||
|
||||
# === Scheduler Interface ===
|
||||
|
||||
def schedule_nightly():
|
||||
"""Schedule all pipelines in priority order with dependencies.
|
||||
|
||||
Called by the nightly scheduler or manually.
|
||||
Playground and training start in parallel; others follow dependency chain.
|
||||
"""
|
||||
# Phase 1: playground + training (no deps, parallel)
|
||||
playground_factory_task(max_tokens=100000)
|
||||
training_factory_task(max_tokens=150000)
|
||||
|
||||
# Phase 2: knowledge-mine (depends on training)
|
||||
knowledge_mine_task(max_tokens=80000).then(adversary_task, max_tokens=50000)
|
||||
|
||||
# Phase 3: codebase-genome (independent)
|
||||
codebase_genome_task(max_tokens=120000)
|
||||
|
||||
logger.info("Nightly pipeline schedule dispatched")
|
||||
|
||||
161
tests/test_orchestration_token_tracking.py
Normal file
161
tests/test_orchestration_token_tracking.py
Normal file
@@ -0,0 +1,161 @@
|
||||
"""Tests for orchestration.py token tracking integration (issue #634).
|
||||
|
||||
Verifies:
|
||||
- log_token_usage writes to JSONL
|
||||
- log_token_usage calls token_budget.record_usage
|
||||
- check_budget enforces limits
|
||||
- Huey signal hook fires on task completion
|
||||
- Pipeline tasks are registered
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class TestLogTokenUsage:
|
||||
"""Test log_token_usage function."""
|
||||
|
||||
def test_skips_non_dict_result(self):
|
||||
"""Should silently skip non-dict results."""
|
||||
from orchestration import log_token_usage
|
||||
# Should not raise
|
||||
log_token_usage("test_task", None)
|
||||
log_token_usage("test_task", "string")
|
||||
log_token_usage("test_task", 42)
|
||||
|
||||
def test_skips_zero_tokens(self):
|
||||
"""Should skip entries with zero tokens."""
|
||||
from orchestration import log_token_usage
|
||||
with patch("orchestration.TOKEN_LOG") as mock_log:
|
||||
mock_log.parent = MagicMock()
|
||||
log_token_usage("test_task", {"input_tokens": 0, "output_tokens": 0})
|
||||
# Should not write to file
|
||||
mock_log.parent.mkdir.assert_not_called()
|
||||
|
||||
def test_writes_to_jsonl(self, tmp_path):
|
||||
"""Should append token usage to JSONL log."""
|
||||
log_file = tmp_path / "token_usage.jsonl"
|
||||
with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage"):
|
||||
from orchestration import log_token_usage
|
||||
log_token_usage("playground_factory_task", {
|
||||
"input_tokens": 100,
|
||||
"output_tokens": 200,
|
||||
})
|
||||
|
||||
assert log_file.exists()
|
||||
line = json.loads(log_file.read_text().strip())
|
||||
assert line["pipeline"] == "playground-factory"
|
||||
assert line["input_tokens"] == 100
|
||||
assert line["output_tokens"] == 200
|
||||
assert line["total_tokens"] == 300
|
||||
|
||||
def test_calls_budget_record_usage(self, tmp_path):
|
||||
"""Should call token_budget.record_usage for budget tracking."""
|
||||
log_file = tmp_path / "token_usage.jsonl"
|
||||
mock_record = MagicMock(return_value={"daily_remaining": 400000})
|
||||
with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage", mock_record):
|
||||
from orchestration import log_token_usage
|
||||
log_token_usage("training_factory_task", {
|
||||
"input_tokens": 500,
|
||||
"output_tokens": 1000,
|
||||
})
|
||||
|
||||
mock_record.assert_called_once_with("training-factory", 500, 1000)
|
||||
|
||||
def test_pipeline_name_derived_from_task(self, tmp_path):
|
||||
"""Pipeline name should strip _task suffix and use hyphens."""
|
||||
log_file = tmp_path / "token_usage.jsonl"
|
||||
with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage"):
|
||||
from orchestration import log_token_usage
|
||||
log_token_usage("knowledge_mine_task", {
|
||||
"input_tokens": 10,
|
||||
"output_tokens": 20,
|
||||
})
|
||||
|
||||
line = json.loads(log_file.read_text().strip())
|
||||
assert line["pipeline"] == "knowledge-mine"
|
||||
|
||||
|
||||
class TestCheckBudget:
|
||||
"""Test check_budget function."""
|
||||
|
||||
def test_returns_true_when_budget_available(self):
|
||||
"""Should return True when can_afford returns True."""
|
||||
with patch("orchestration.can_afford", return_value=True):
|
||||
from orchestration import check_budget
|
||||
assert check_budget("test", 1000) is True
|
||||
|
||||
def test_returns_false_when_budget_exhausted(self):
|
||||
"""Should return False when can_afford returns False."""
|
||||
with patch("orchestration.can_afford", return_value=False), patch("orchestration.get_remaining", return_value=50):
|
||||
from orchestration import check_budget
|
||||
assert check_budget("test", 10000) is False
|
||||
|
||||
def test_returns_true_when_budget_module_missing(self):
|
||||
"""Should return True (no enforcement) when token_budget not importable."""
|
||||
with patch("orchestration.can_afford", side_effect=ImportError):
|
||||
from orchestration import check_budget
|
||||
assert check_budget("test", 999999) is True
|
||||
|
||||
|
||||
class TestPipelineTasks:
|
||||
"""Test Huey pipeline task registration."""
|
||||
|
||||
def test_all_pipelines_registered(self):
|
||||
"""All 5 pipeline tasks should be registered with Huey."""
|
||||
from orchestration import (
|
||||
playground_factory_task,
|
||||
training_factory_task,
|
||||
knowledge_mine_task,
|
||||
adversary_task,
|
||||
codebase_genome_task,
|
||||
)
|
||||
tasks = [
|
||||
playground_factory_task,
|
||||
training_factory_task,
|
||||
knowledge_mine_task,
|
||||
adversary_task,
|
||||
codebase_genome_task,
|
||||
]
|
||||
for task in tasks:
|
||||
# Huey tasks have a .call_local method
|
||||
assert hasattr(task, "call_local"), f"{task.__name__} not registered with Huey"
|
||||
|
||||
def test_run_pipeline_returns_structured_result(self, tmp_path):
|
||||
"""_run_pipeline should return dict with pipeline, status, tokens."""
|
||||
# Create a stub script
|
||||
stub = tmp_path / "stub.sh"
|
||||
stub.write_text("#!/bin/bash\necho 'tokens used: 42'\n")
|
||||
stub.chmod(0o755)
|
||||
|
||||
with patch("orchestration.check_budget", return_value=True):
|
||||
from orchestration import _run_pipeline
|
||||
result = _run_pipeline("test-pipeline", str(stub), 1000)
|
||||
|
||||
assert result["pipeline"] == "test-pipeline"
|
||||
assert result["status"] == "success"
|
||||
assert "input_tokens" in result
|
||||
assert "output_tokens" in result
|
||||
|
||||
def test_run_pipeline_skips_when_budget_exhausted(self):
|
||||
"""Should return skipped status when budget is exhausted."""
|
||||
with patch("orchestration.check_budget", return_value=False):
|
||||
from orchestration import _run_pipeline
|
||||
result = _run_pipeline("expensive", "/nonexistent", 999999)
|
||||
|
||||
assert result["status"] == "skipped"
|
||||
assert result["reason"] == "budget_exhausted"
|
||||
|
||||
def test_run_pipeline_handles_missing_script(self):
|
||||
"""Should return failed status when script doesn't exist."""
|
||||
with patch("orchestration.check_budget", return_value=True):
|
||||
from orchestration import _run_pipeline
|
||||
result = _run_pipeline("broken", "/no/such/script.sh", 1000)
|
||||
|
||||
assert result["status"] == "failed"
|
||||
assert result["reason"] == "script_not_found"
|
||||
Reference in New Issue
Block a user