diff --git a/tests/test_orchestration_token_tracking.py b/tests/test_orchestration_token_tracking.py new file mode 100644 index 00000000..5cf63ef9 --- /dev/null +++ b/tests/test_orchestration_token_tracking.py @@ -0,0 +1,161 @@ +"""Tests for orchestration.py token tracking integration (issue #634). + +Verifies: +- log_token_usage writes to JSONL +- log_token_usage calls token_budget.record_usage +- check_budget enforces limits +- Huey signal hook fires on task completion +- Pipeline tasks are registered +""" + +import json +import os +import tempfile +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + + +class TestLogTokenUsage: + """Test log_token_usage function.""" + + def test_skips_non_dict_result(self): + """Should silently skip non-dict results.""" + from orchestration import log_token_usage + # Should not raise + log_token_usage("test_task", None) + log_token_usage("test_task", "string") + log_token_usage("test_task", 42) + + def test_skips_zero_tokens(self): + """Should skip entries with zero tokens.""" + from orchestration import log_token_usage + with patch("orchestration.TOKEN_LOG") as mock_log: + mock_log.parent = MagicMock() + log_token_usage("test_task", {"input_tokens": 0, "output_tokens": 0}) + # Should not write to file + mock_log.parent.mkdir.assert_not_called() + + def test_writes_to_jsonl(self, tmp_path): + """Should append token usage to JSONL log.""" + log_file = tmp_path / "token_usage.jsonl" + with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage"): + from orchestration import log_token_usage + log_token_usage("playground_factory_task", { + "input_tokens": 100, + "output_tokens": 200, + }) + + assert log_file.exists() + line = json.loads(log_file.read_text().strip()) + assert line["pipeline"] == "playground-factory" + assert line["input_tokens"] == 100 + assert line["output_tokens"] == 200 + assert line["total_tokens"] == 300 + + def test_calls_budget_record_usage(self, tmp_path): + """Should call token_budget.record_usage for budget tracking.""" + log_file = tmp_path / "token_usage.jsonl" + mock_record = MagicMock(return_value={"daily_remaining": 400000}) + with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage", mock_record): + from orchestration import log_token_usage + log_token_usage("training_factory_task", { + "input_tokens": 500, + "output_tokens": 1000, + }) + + mock_record.assert_called_once_with("training-factory", 500, 1000) + + def test_pipeline_name_derived_from_task(self, tmp_path): + """Pipeline name should strip _task suffix and use hyphens.""" + log_file = tmp_path / "token_usage.jsonl" + with patch("orchestration.TOKEN_LOG", log_file), patch("orchestration.record_usage"): + from orchestration import log_token_usage + log_token_usage("knowledge_mine_task", { + "input_tokens": 10, + "output_tokens": 20, + }) + + line = json.loads(log_file.read_text().strip()) + assert line["pipeline"] == "knowledge-mine" + + +class TestCheckBudget: + """Test check_budget function.""" + + def test_returns_true_when_budget_available(self): + """Should return True when can_afford returns True.""" + with patch("orchestration.can_afford", return_value=True): + from orchestration import check_budget + assert check_budget("test", 1000) is True + + def test_returns_false_when_budget_exhausted(self): + """Should return False when can_afford returns False.""" + with patch("orchestration.can_afford", return_value=False), patch("orchestration.get_remaining", return_value=50): + from orchestration import check_budget + assert check_budget("test", 10000) is False + + def test_returns_true_when_budget_module_missing(self): + """Should return True (no enforcement) when token_budget not importable.""" + with patch("orchestration.can_afford", side_effect=ImportError): + from orchestration import check_budget + assert check_budget("test", 999999) is True + + +class TestPipelineTasks: + """Test Huey pipeline task registration.""" + + def test_all_pipelines_registered(self): + """All 5 pipeline tasks should be registered with Huey.""" + from orchestration import ( + playground_factory_task, + training_factory_task, + knowledge_mine_task, + adversary_task, + codebase_genome_task, + ) + tasks = [ + playground_factory_task, + training_factory_task, + knowledge_mine_task, + adversary_task, + codebase_genome_task, + ] + for task in tasks: + # Huey tasks have a .call_local method + assert hasattr(task, "call_local"), f"{task.__name__} not registered with Huey" + + def test_run_pipeline_returns_structured_result(self, tmp_path): + """_run_pipeline should return dict with pipeline, status, tokens.""" + # Create a stub script + stub = tmp_path / "stub.sh" + stub.write_text("#!/bin/bash\necho 'tokens used: 42'\n") + stub.chmod(0o755) + + with patch("orchestration.check_budget", return_value=True): + from orchestration import _run_pipeline + result = _run_pipeline("test-pipeline", str(stub), 1000) + + assert result["pipeline"] == "test-pipeline" + assert result["status"] == "success" + assert "input_tokens" in result + assert "output_tokens" in result + + def test_run_pipeline_skips_when_budget_exhausted(self): + """Should return skipped status when budget is exhausted.""" + with patch("orchestration.check_budget", return_value=False): + from orchestration import _run_pipeline + result = _run_pipeline("expensive", "/nonexistent", 999999) + + assert result["status"] == "skipped" + assert result["reason"] == "budget_exhausted" + + def test_run_pipeline_handles_missing_script(self): + """Should return failed status when script doesn't exist.""" + with patch("orchestration.check_budget", return_value=True): + from orchestration import _run_pipeline + result = _run_pipeline("broken", "/no/such/script.sh", 1000) + + assert result["status"] == "failed" + assert result["reason"] == "script_not_found"