Compare commits
3 Commits
feat/690-c ... fix/634-to
| Author | SHA1 | Date | |
|---|---|---|---|
| | df4dcf1fb4 | | |
| | c4790d8bb9 | | |
| | 6fbf5bb649 | | |
181 orchestration.py
@@ -1,13 +1,20 @@
 """Sovereign orchestration — Huey replaces 3,843 lines of homebrew."""
 
 import json
+import logging
 import os
+import subprocess
 from datetime import datetime, timezone
 from pathlib import Path
 
 from huey import SqliteHuey, signals
 
-huey = SqliteHuey(filename=str(Path.home() / ".hermes" / "orchestration.db"))
+logger = logging.getLogger("orchestration")
 
+huey = SqliteHuey(
+    filename=str(Path.home() / ".hermes" / "orchestration.db"),
+    results=True,
+)
+
 # === Token Tracking ===
 TOKEN_LOG = Path.home() / ".hermes" / "token_usage.jsonl"
@@ -15,23 +22,24 @@ TOKEN_LOG = Path.home() / ".hermes" / "token_usage.jsonl"
 
 def log_token_usage(task_name, result):
     """Log token usage from a completed pipeline task.
 
     Reads input_tokens/output_tokens from the agent result dict.
     Auto-detects pipeline name from task context.
     Appends to JSONL for downstream analysis.
+    Also records to token_budget for daily enforcement.
     """
     if not isinstance(result, dict):
         return
 
     input_tokens = result.get("input_tokens", 0)
     output_tokens = result.get("output_tokens", 0)
 
     if input_tokens == 0 and output_tokens == 0:
         return
 
     # Auto-detect pipeline name from task function name
     pipeline = task_name.replace("_task", "").replace("_", "-")
 
     entry = {
         "timestamp": datetime.now(timezone.utc).isoformat(),
         "pipeline": pipeline,
@@ -40,14 +48,173 @@ def log_token_usage(task_name, result):
         "total_tokens": input_tokens + output_tokens,
         "task": task_name,
     }
 
+    # Write to JSONL log
     TOKEN_LOG.parent.mkdir(parents=True, exist_ok=True)
     with open(TOKEN_LOG, "a") as f:
         f.write(json.dumps(entry) + "\n")
 
+    # Record to token budget for daily enforcement
+    try:
+        from scripts.token_budget import record_usage
+        record_usage(pipeline, input_tokens, output_tokens)
+        logger.info(f"Budget updated: {pipeline} +{entry['total_tokens']} tokens")
+    except ImportError:
+        logger.debug("token_budget not available, skipping budget update")
+
+
+def check_budget(pipeline: str, estimated_tokens: int) -> bool:
+    """Check if there's enough budget for a pipeline run."""
+    try:
+        from scripts.token_budget import can_afford, get_remaining
+        remaining = get_remaining()
+        if not can_afford(estimated_tokens):
+            logger.warning(
+                f"Budget exhausted for {pipeline}: need {estimated_tokens}, "
+                f"have {remaining}"
+            )
+            return False
+        return True
+    except ImportError:
+        return True  # No budget module = no enforcement
+
 
 @huey.signal(signals.SIGNAL_COMPLETE)
 def on_task_complete(signal, task, task_value=None, **kwargs):
     """Huey hook: log token usage after each pipeline task completes."""
     task_name = getattr(task, "name", "unknown")
     log_token_usage(task_name, task_value)
+
+
+# === Pipeline Tasks ===
+
+@huey.task()
+def playground_factory_task(max_tokens: int = 100000):
+    """Generate training data pairs from session transcripts."""
+    script = Path(__file__).parent / "scripts" / "pipeline_playground_factory.sh"
+    return _run_pipeline("playground-factory", str(script), max_tokens)
+
+
+@huey.task()
+def training_factory_task(max_tokens: int = 150000):
+    """Run model fine-tuning with generated training data."""
+    script = Path(__file__).parent / "scripts" / "pipeline_training_factory.sh"
+    return _run_pipeline("training-factory", str(script), max_tokens)
+
+
+@huey.task()
+def knowledge_mine_task(max_tokens: int = 80000):
+    """Extract structured knowledge from session archives."""
+    script = Path(__file__).parent / "scripts" / "pipeline_knowledge_mine.sh"
+    return _run_pipeline("knowledge-mine", str(script), max_tokens)
+
+
+@huey.task()
+def adversary_task(max_tokens: int = 50000):
+    """Run adversarial red-team prompts against fleet models."""
+    script = Path(__file__).parent / "scripts" / "pipeline_adversary.sh"
+    return _run_pipeline("adversary", str(script), max_tokens)
+
+
+@huey.task()
+def codebase_genome_task(max_tokens: int = 120000):
+    """Generate GENOME.md for one Gitea repo per run."""
+    script = Path(__file__).parent / "scripts" / "pipeline_codebase_genome.sh"
+    return _run_pipeline("codebase-genome", str(script), max_tokens)
+
+
+# === Pipeline Runner ===
+
+def _run_pipeline(name: str, script: str, max_tokens: int) -> dict:
+    """Run a pipeline script and return structured result with token counts."""
+    if not check_budget(name, max_tokens):
+        return {
+            "pipeline": name,
+            "status": "skipped",
+            "reason": "budget_exhausted",
+            "input_tokens": 0,
+            "output_tokens": 0,
+        }
+
+    if not os.path.isfile(script):
+        logger.error(f"Pipeline script not found: {script}")
+        return {
+            "pipeline": name,
+            "status": "failed",
+            "reason": "script_not_found",
+            "input_tokens": 0,
+            "output_tokens": 0,
+        }
+
+    logger.info(f"Starting pipeline: {name} (max_tokens={max_tokens})")
+
+    try:
+        result = subprocess.run(
+            ["bash", script, "--max-tokens", str(max_tokens)],
+            capture_output=True,
+            text=True,
+            timeout=3600,  # 1 hour max per pipeline
+        )
+
+        # Parse token usage from script output
+        input_tokens = 0
+        output_tokens = 0
+        for line in result.stdout.split("\n"):
+            if "tokens used" in line.lower():
+                try:
+                    # Extract number from lines like "... 5000 tokens used."
+                    parts = line.split()
+                    for i, p in enumerate(parts):
+                        if p.isdigit() and i + 1 < len(parts) and "token" in parts[i+1].lower():
+                            output_tokens = int(p)
+                            break
+                except (ValueError, IndexError):
+                    pass
+
+        return {
+            "pipeline": name,
+            "status": "success" if result.returncode == 0 else "failed",
+            "exit_code": result.returncode,
+            "input_tokens": input_tokens,
+            "output_tokens": output_tokens,
+            "stdout_tail": result.stdout[-500:] if result.stdout else "",
+            "stderr_tail": result.stderr[-500:] if result.stderr else "",
+        }
+    except subprocess.TimeoutExpired:
+        logger.error(f"Pipeline {name} timed out after 3600s")
+        return {
+            "pipeline": name,
+            "status": "timeout",
+            "input_tokens": 0,
+            "output_tokens": 0,
+        }
+    except Exception as e:
+        logger.error(f"Pipeline {name} error: {e}")
+        return {
+            "pipeline": name,
+            "status": "error",
+            "reason": str(e),
+            "input_tokens": 0,
+            "output_tokens": 0,
+        }
+
+
+# === Scheduler Interface ===
+
+def schedule_nightly():
+    """Schedule all pipelines in priority order with dependencies.
+
+    Called by the nightly scheduler or manually.
+    Playground and training start in parallel; others follow dependency chain.
+    """
+    # Phase 1: playground + training (no deps, parallel)
+    playground_factory_task(max_tokens=100000)
+    training_factory_task(max_tokens=150000)
+
+    # Phase 2: knowledge-mine (depends on training)
+    knowledge_mine_task(max_tokens=80000).then(adversary_task, max_tokens=50000)
+
+    # Phase 3: codebase-genome (independent)
+    codebase_genome_task(max_tokens=120000)
+
+    logger.info("Nightly pipeline schedule dispatched")
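Usage note: log_token_usage appends one JSON object per completed task to ~/.hermes/token_usage.jsonl "for downstream analysis". Below is a minimal sketch of such an analysis, assuming only the keys the function writes (timestamp, pipeline, input_tokens, output_tokens, total_tokens, task); the aggregation script itself is illustrative and not part of this change.

# Illustrative downstream analysis of the token log written by log_token_usage.
import json
from collections import Counter
from pathlib import Path

# Same path the diff above defines as TOKEN_LOG.
TOKEN_LOG = Path.home() / ".hermes" / "token_usage.jsonl"

# Sum total_tokens per pipeline across all logged entries.
totals = Counter()
for raw in TOKEN_LOG.read_text().splitlines():
    entry = json.loads(raw)
    totals[entry["pipeline"]] += entry["total_tokens"]

for pipeline, used in totals.most_common():
    print(f"{pipeline}: {used} tokens")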
161 tests/test_orchestration_token_tracking.py (Normal file)
@@ -0,0 +1,161 @@
"""Tests for orchestration.py token tracking integration (issue #634).

Verifies:
- log_token_usage writes to JSONL
- log_token_usage calls token_budget.record_usage
- check_budget enforces limits
- Huey signal hook fires on task completion
- Pipeline tasks are registered
"""

import json
import os
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest


class TestLogTokenUsage:
    """Test log_token_usage function."""

    def test_skips_non_dict_result(self):
        """Should silently skip non-dict results."""
        from orchestration import log_token_usage
        # Should not raise
        log_token_usage("test_task", None)
        log_token_usage("test_task", "string")
        log_token_usage("test_task", 42)

    def test_skips_zero_tokens(self):
        """Should skip entries with zero tokens."""
        from orchestration import log_token_usage
        with patch("orchestration.TOKEN_LOG") as mock_log:
            mock_log.parent = MagicMock()
            log_token_usage("test_task", {"input_tokens": 0, "output_tokens": 0})
            # Should not write to file
            mock_log.parent.mkdir.assert_not_called()

    def test_writes_to_jsonl(self, tmp_path):
        """Should append token usage to JSONL log."""
        log_file = tmp_path / "token_usage.jsonl"
        with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage"):
            from orchestration import log_token_usage
            log_token_usage("playground_factory_task", {
                "input_tokens": 100,
                "output_tokens": 200,
            })

        assert log_file.exists()
        line = json.loads(log_file.read_text().strip())
        assert line["pipeline"] == "playground-factory"
        assert line["input_tokens"] == 100
        assert line["output_tokens"] == 200
        assert line["total_tokens"] == 300

    def test_calls_budget_record_usage(self, tmp_path):
        """Should call token_budget.record_usage for budget tracking."""
        log_file = tmp_path / "token_usage.jsonl"
        mock_record = MagicMock(return_value={"daily_remaining": 400000})
        with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage", mock_record):
            from orchestration import log_token_usage
            log_token_usage("training_factory_task", {
                "input_tokens": 500,
                "output_tokens": 1000,
            })

        mock_record.assert_called_once_with("training-factory", 500, 1000)

    def test_pipeline_name_derived_from_task(self, tmp_path):
        """Pipeline name should strip _task suffix and use hyphens."""
        log_file = tmp_path / "token_usage.jsonl"
        with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage"):
            from orchestration import log_token_usage
            log_token_usage("knowledge_mine_task", {
                "input_tokens": 10,
                "output_tokens": 20,
            })

        line = json.loads(log_file.read_text().strip())
        assert line["pipeline"] == "knowledge-mine"


class TestCheckBudget:
    """Test check_budget function."""

    def test_returns_true_when_budget_available(self):
        """Should return True when can_afford returns True."""
        with patch("orchestration.can_afford", return_value=True):
            from orchestration import check_budget
            assert check_budget("test", 1000) is True

    def test_returns_false_when_budget_exhausted(self):
        """Should return False when can_afford returns False."""
        with patch("orchestration.can_afford", return_value=False), patch("orchestration.get_remaining", return_value=50):
            from orchestration import check_budget
            assert check_budget("test", 10000) is False

    def test_returns_true_when_budget_module_missing(self):
        """Should return True (no enforcement) when token_budget not importable."""
        with patch("orchestration.can_afford", side_effect=ImportError):
            from orchestration import check_budget
            assert check_budget("test", 999999) is True


class TestPipelineTasks:
    """Test Huey pipeline task registration."""

    def test_all_pipelines_registered(self):
        """All 5 pipeline tasks should be registered with Huey."""
        from orchestration import (
            playground_factory_task,
            training_factory_task,
            knowledge_mine_task,
            adversary_task,
            codebase_genome_task,
        )
        tasks = [
            playground_factory_task,
            training_factory_task,
            knowledge_mine_task,
            adversary_task,
            codebase_genome_task,
        ]
        for task in tasks:
            # Huey tasks have a .call_local method
            assert hasattr(task, "call_local"), f"{task.__name__} not registered with Huey"

    def test_run_pipeline_returns_structured_result(self, tmp_path):
        """_run_pipeline should return dict with pipeline, status, tokens."""
        # Create a stub script
        stub = tmp_path / "stub.sh"
        stub.write_text("#!/bin/bash\necho 'tokens used: 42'\n")
        stub.chmod(0o755)

        with patch("orchestration.check_budget", return_value=True):
            from orchestration import _run_pipeline
            result = _run_pipeline("test-pipeline", str(stub), 1000)

        assert result["pipeline"] == "test-pipeline"
        assert result["status"] == "success"
        assert "input_tokens" in result
        assert "output_tokens" in result

    def test_run_pipeline_skips_when_budget_exhausted(self):
        """Should return skipped status when budget is exhausted."""
        with patch("orchestration.check_budget", return_value=False):
            from orchestration import _run_pipeline
            result = _run_pipeline("expensive", "/nonexistent", 999999)

        assert result["status"] == "skipped"
        assert result["reason"] == "budget_exhausted"

    def test_run_pipeline_handles_missing_script(self):
        """Should return failed status when script doesn't exist."""
        with patch("orchestration.check_budget", return_value=True):
            from orchestration import _run_pipeline
            result = _run_pipeline("broken", "/no/such/script.sh", 1000)

        assert result["status"] == "failed"
        assert result["reason"] == "script_not_found"
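Usage note: a hedged sketch of how the module above might be driven end to end. Immediate mode and the huey_consumer entry point are standard Huey features, not something this change adds, and the invocation below is illustrative only.

# Illustrative sketch, assuming the orchestration.py from this diff is importable.
from orchestration import huey, playground_factory_task, schedule_nightly

# Local/dev runs: execute tasks synchronously in-process via Huey immediate mode.
huey.immediate = True
result = playground_factory_task(max_tokens=1000)
print(result())  # structured dict returned by _run_pipeline

# Production: enqueue the nightly plan and let a separate consumer process
# (e.g. `huey_consumer orchestration.huey -w 2`) execute the queued tasks.
huey.immediate = False
schedule_nightly()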