Compare commits

..

3 Commits

4 changed files with 292 additions and 413 deletions

View File

@@ -1,274 +1,6 @@
"""Sovereign orchestration — Huey replaces 3,843 lines of homebrew.
"""Sovereign orchestration — Huey replaces 3,843 lines of homebrew."""
Pipeline tasks automatically track token usage via token_budget.py.
After each task completes, the Huey signal records usage for the pipeline.
"""
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from huey import SqliteHuey, crontab
from pathlib import Path
from huey import SqliteHuey, crontab, signals
# --- Setup ---
# All orchestration state lives under ~/.hermes; Huey persists its queue in a
# SQLite file there, so no external broker is needed.
HERMES_HOME = Path.home() / ".hermes"
huey = SqliteHuey(filename=str(HERMES_HOME / "orchestration.db"))
# Token budget integration — token_budget.py sits next to this file.
# Budget enforcement degrades gracefully (HAS_BUDGET=False) when absent.
sys.path.insert(0, str(Path(__file__).parent))
try:
    from token_budget import record_usage, get_remaining, can_afford, get_report
    HAS_BUDGET = True
except ImportError:
    HAS_BUDGET = False
# --- Pipeline definitions ---
# Each entry maps a pipeline name to:
#   script:       driver script path, resolved relative to ~/timmy-config
#   max_tokens:   default per-run token budget passed to the script
#   dependencies: pipelines that must be "running" or "complete" first
PIPELINES = {
    "playground-factory": {
        "script": "scripts/pipeline_playground_factory.sh",
        "max_tokens": 100_000,
        "dependencies": [],
    },
    "training-factory": {
        "script": "scripts/pipeline_training_factory.sh",
        "max_tokens": 150_000,
        "dependencies": [],
    },
    "knowledge-mine": {
        "script": "scripts/pipeline_knowledge_mine.sh",
        "max_tokens": 80_000,
        "dependencies": ["training-factory"],
    },
    "adversary": {
        "script": "scripts/pipeline_adversary.sh",
        "max_tokens": 50_000,
        "dependencies": ["knowledge-mine"],
    },
    "codebase-genome": {
        "script": "scripts/pipeline_codebase_genome.sh",
        "max_tokens": 120_000,
        "dependencies": [],
    },
}
# --- Token tracking signal ---
@huey.signal()
def track_tokens(signal, task, task_value=None, **kwargs):
    """Record token usage whenever a pipeline task fires a Huey signal.

    Registered for all Huey signals; acts only on tasks whose name starts
    with "pipeline." and whose result dict reports nonzero token counts.
    """
    if not HAS_BUDGET:
        return
    # Only track pipeline tasks.
    name_of_task = getattr(task, "name", "") or ""
    if not name_of_task.startswith("pipeline."):
        return
    pipeline_name = name_of_task.replace("pipeline.", "")
    # Pull token counts out of the task's return value, if it is a dict.
    payload = task_value or {}
    if not isinstance(payload, dict):
        return
    tokens_in = payload.get("input_tokens", 0)
    tokens_out = payload.get("output_tokens", 0)
    if tokens_in or tokens_out:
        record_usage(pipeline_name, tokens_in, tokens_out)
# --- Pipeline tasks ---
@huey.task()
def pipeline_task(name: str, max_tokens: int = None):
    """Run a single pipeline and return token usage stats.

    Executed by the Huey consumer. The returned dict always carries
    "input_tokens"/"output_tokens" keys so the track_tokens signal
    handler can record usage.

    Args:
        name: Key into PIPELINES selecting the pipeline to run.
        max_tokens: Optional budget override; defaults to the pipeline's
            configured "max_tokens".
    """
    spec = PIPELINES.get(name)
    if not spec:
        return {"error": f"Unknown pipeline: {name}", "input_tokens": 0, "output_tokens": 0}
    script = spec["script"]
    budget = max_tokens or spec["max_tokens"]
    # Check budget before running
    if HAS_BUDGET and not can_afford(budget):
        return {
            "error": f"Insufficient budget for {name} (need {budget}, have {get_remaining()})",
            "input_tokens": 0,
            "output_tokens": 0,
        }
    # Check dependencies — a dependency counts as met while merely "running";
    # NOTE(review): that lets a dependent start before its dependency finishes — confirm intent.
    for dep in spec.get("dependencies", []):
        dep_state = _get_pipeline_state(dep)
        if dep_state not in ("running", "complete"):
            return {
                "error": f"Dependency {dep} not met (state: {dep_state})",
                "input_tokens": 0,
                "output_tokens": 0,
            }
    # Run the pipeline script; scripts are resolved under ~/timmy-config —
    # TODO confirm this checkout location on deploy hosts.
    script_path = Path.home() / "timmy-config" / script
    if not script_path.exists():
        return {"error": f"Script not found: {script_path}", "input_tokens": 0, "output_tokens": 0}
    _set_pipeline_state(name, "running")
    log_path = HERMES_HOME / "logs" / f"pipeline-{name}.log"
    log_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        result = subprocess.run(
            ["bash", str(script_path), "--max-tokens", str(budget)],
            capture_output=True,
            text=True,
            timeout=3600,  # 1 hour max
        )
        # Parse token usage from stdout (if script reports it): the script may
        # print "input_tokens=N" / "output_tokens=N" lines; last one wins.
        input_tokens = 0
        output_tokens = 0
        for line in result.stdout.splitlines():
            if "input_tokens=" in line:
                try:
                    input_tokens = int(line.split("input_tokens=")[1].split()[0])
                except (ValueError, IndexError):
                    pass
            if "output_tokens=" in line:
                try:
                    output_tokens = int(line.split("output_tokens=")[1].split()[0])
                except (ValueError, IndexError):
                    pass
        # If script didn't report tokens, estimate from output
        if not input_tokens and not output_tokens:
            output_tokens = len(result.stdout) // 4  # rough estimate (~4 chars/token)
        # Log output (appended, timestamped, stderr included when present)
        with open(log_path, "a") as f:
            f.write(f"\n--- {datetime.now(timezone.utc).isoformat()} ---\n")
            f.write(result.stdout)
            if result.stderr:
                f.write(f"\nSTDERR:\n{result.stderr}")
        if result.returncode == 0:
            _set_pipeline_state(name, "complete")
            return {
                "pipeline": name,
                "status": "complete",
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
            }
        else:
            _set_pipeline_state(name, "failed")
            return {
                "pipeline": name,
                "status": "failed",
                "error": result.stderr[:500],
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
            }
    except subprocess.TimeoutExpired:
        _set_pipeline_state(name, "failed")
        return {"pipeline": name, "status": "timeout", "input_tokens": 0, "output_tokens": 0}
    except Exception as e:
        _set_pipeline_state(name, "failed")
        return {"pipeline": name, "status": "error", "error": str(e), "input_tokens": 0, "output_tokens": 0}
@huey.periodic_task(crontab(hour="*/6"))
def pipeline_scheduler():
    """Every six hours, enqueue at most one pipeline whose dependencies are met."""
    report = ["=== Pipeline Scheduler ==="]
    # Bail out early when today's token budget is spent.
    if HAS_BUDGET:
        budget_left = get_remaining()
        report.append(f"Budget remaining: {budget_left:,} tokens")
        if budget_left <= 0:
            report.append("Budget exhausted. Skipping.")
            return "\n".join(report)
    # Walk pipelines in definition order and launch the first eligible one.
    launched = False
    for candidate, spec in PIPELINES.items():
        current = _get_pipeline_state(candidate)
        if current in ("running", "complete"):
            report.append(f"SKIP {candidate}: {current}")
            continue
        unmet = None
        for dep in spec.get("dependencies", []):
            if _get_pipeline_state(dep) not in ("running", "complete"):
                unmet = dep
                break
        if unmet is not None:
            report.append(f"SKIP {candidate}: dependency {unmet} not met")
            continue
        report.append(f"START {candidate}")
        pipeline_task(candidate)
        launched = True
        break  # One pipeline per scheduler tick
    if not launched:
        report.append("No pipelines to start")
    return "\n".join(report)
# --- State management ---
# Pipeline lifecycle states ("running"/"complete"/"failed") persisted as JSON.
STATE_FILE = HERMES_HOME / "pipeline_state.json"
def _get_pipeline_state(name: str) -> str:
    """Return the persisted state for *name*; "not_started" when unknown or unreadable."""
    if not STATE_FILE.exists():
        return "not_started"
    try:
        entries = json.loads(STATE_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        # Corrupt or unreadable state file — treat as empty.
        return "not_started"
    return entries.get(name, {}).get("state", "not_started")
def _set_pipeline_state(name: str, state: str):
    """Persist *state* for pipeline *name* alongside a UTC timestamp."""
    existing = {}
    if STATE_FILE.exists():
        try:
            existing = json.loads(STATE_FILE.read_text())
        except (json.JSONDecodeError, OSError):
            # Unreadable file — start over with a fresh mapping.
            existing = {}
    stamp = datetime.now(timezone.utc).isoformat()
    existing[name] = {"state": state, "updated": stamp}
    STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    STATE_FILE.write_text(json.dumps(existing, indent=2))
# --- CLI ---
# Usage: orchestration.py [--budget|--status|--run PIPELINE]
if __name__ == "__main__":
    if "--budget" in sys.argv or "--report" in sys.argv:
        if HAS_BUDGET:
            print(get_report())
        else:
            print("token_budget module not available")
    elif "--status" in sys.argv:
        for name in PIPELINES:
            state = _get_pipeline_state(name)
            print(f" {name}: {state}")
    elif "--run" in sys.argv:
        idx = sys.argv.index("--run")
        # Guard against "--run" with no pipeline name (was an IndexError).
        if idx + 1 >= len(sys.argv):
            print("Usage: orchestration.py --run PIPELINE")
            sys.exit(1)
        name = sys.argv[idx + 1]
        print(f"Enqueuing pipeline: {name}")
        pipeline_task(name)  # enqueues via Huey; the consumer resolves the result
        print("Task enqueued")
    else:
        print("Usage: orchestration.py [--budget|--status|--run PIPELINE]")
# Removed stray `huey = SqliteHuey(...)` diff leftover that re-created the Huey
# instance at import time AFTER all tasks were registered on the original
# instance, leaving the consumer with a task-less queue.

View File

@@ -0,0 +1,138 @@
#!/usr/bin/env python3
"""
normalize-code-blocks.py — Fix inconsistent indentation in training data code blocks.
When code blocks are embedded in JSONL as triple-quoted strings, indentation
accumulates from the surrounding context. This script normalizes code block
content using textwrap.dedent and consistent 4-space indentation.
Usage:
python3 scripts/normalize-code-blocks.py training/data/preference_pairs.jsonl
python3 scripts/normalize-code-blocks.py --dry-run training/data/*.jsonl
python3 scripts/normalize-code-blocks.py --check training/data/*.jsonl # CI mode
"""
import argparse
import json
import re
import sys
import textwrap
from pathlib import Path
# Matches fenced code blocks (```python ... ``` or bare ``` ... ```) that are
# embedded inside string values. Named groups: open / code / close.
CODE_BLOCK_RE = re.compile(
    r"(?P<open>```(?:python|py|bash|sh|javascript|js|typescript|ts|go|rust|ruby)?\s*\n)"
    r"(?P<code>.*?)"
    r"(?P<close>```)",
    re.DOTALL,
)


def normalize_code_block(match: re.Match) -> str:
    """Return the matched fenced block with its code dedented and blank-trimmed."""
    opener = match.group("open")
    body = match.group("code")
    closer = match.group("close")
    # Whitespace-only blocks are left exactly as they were.
    if not body.strip():
        return match.group(0)
    dedented_lines = textwrap.dedent(body).split("\n")
    # Drop blank lines hugging either end of the code.
    while dedented_lines and not dedented_lines[0].strip():
        del dedented_lines[0]
    while dedented_lines and not dedented_lines[-1].strip():
        del dedented_lines[-1]
    return "{}{}\n{}".format(opener, "\n".join(dedented_lines), closer)
def process_line(line: str) -> tuple:
    """Process a single JSONL record; returns (possibly-rewritten line, fix count)."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        # Lines that aren't valid JSON pass through untouched.
        return line, 0
    fixes = 0

    def walk(node):
        # Recursively rewrite every string value; count strings that changed.
        nonlocal fixes
        if isinstance(node, str):
            rewritten = CODE_BLOCK_RE.sub(normalize_code_block, node)
            if rewritten != node:
                fixes += 1
            return rewritten
        if isinstance(node, dict):
            return {key: walk(value) for key, value in node.items()}
        if isinstance(node, list):
            return [walk(entry) for entry in node]
        return node

    cleaned = walk(record)
    return json.dumps(cleaned, ensure_ascii=False) + "\n", fixes
def process_file(filepath: str, dry_run: bool = False) -> dict:
    """Normalize code blocks throughout one JSONL file; returns a stats dict."""
    target = Path(filepath)
    if not target.exists():
        return {"file": str(filepath), "error": "not found", "fixes": 0, "lines": 0}
    raw_lines = target.read_text(encoding="utf-8").splitlines()
    rewritten = []
    fix_count = 0
    for raw in raw_lines:
        # Blank lines are kept verbatim.
        if not raw.strip():
            rewritten.append(raw)
            continue
        updated, n = process_line(raw)
        rewritten.append(updated.rstrip("\n"))
        fix_count += n
    # Only touch the file when something changed and we're not in dry-run mode.
    if fix_count > 0 and not dry_run:
        target.write_text("\n".join(rewritten) + "\n", encoding="utf-8")
    return {
        "file": str(filepath),
        "lines": len(raw_lines),
        "fixes": fix_count,
        "changed": fix_count > 0,
    }
def main():
    """CLI entry point: normalize code-block indentation across JSONL files.

    Exits 1 in --check mode when any file would need fixes.
    """
    parser = argparse.ArgumentParser(
        description="Normalize code block indentation in JSONL training data"
    )
    parser.add_argument("files", nargs="+", help="JSONL files to process")
    parser.add_argument("--dry-run", action="store_true", help="Show changes without writing")
    parser.add_argument("--check", action="store_true", help="CI mode: exit 1 if fixes needed")
    args = parser.parse_args()
    total_fixes = 0
    results = []
    for filepath in args.files:
        # --check implies a dry run: report, never write.
        result = process_file(filepath, dry_run=args.dry_run or args.check)
        results.append(result)
        total_fixes += result["fixes"]
        if result["fixes"] > 0:
            status = "FIXED" if not args.dry_run and not args.check else "WOULD FIX"
            # BUG FIX: filename and count were concatenated with no separator
            # ("file.jsonl3 code blocks normalized"); insert ": " between them.
            print(f" {status}: {result['file']}: {result['fixes']} code blocks normalized")
        else:
            print(f" OK: {result['file']}")
    print(f"\nTotal: {total_fixes} code blocks normalized across {len(results)} files")
    if args.check and total_fixes > 0:
        print("FAIL: Code block indentation issues found. Run without --check to fix.")
        sys.exit(1)


if __name__ == "__main__":
    main()

View File

@@ -1,142 +0,0 @@
#!/usr/bin/env python3
"""
token_budget.py — Daily token budget tracker for pipeline orchestration.
Tracks token usage per pipeline per day, enforces daily limits,
and provides a query interface for the orchestrator.
Data: ~/.hermes/pipeline_budget.json
"""
import json
import os
from datetime import datetime, timezone
from pathlib import Path
BUDGET_FILE = Path.home() / ".hermes" / "pipeline_budget.json"
DEFAULT_DAILY_LIMIT = 500_000
def _load() -> dict:
    """Read the budget JSON from disk; empty dict when missing or corrupt."""
    if not BUDGET_FILE.exists():
        return {}
    try:
        return json.loads(BUDGET_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        # Unreadable or malformed file — treat as no recorded usage.
        return {}
def _save(data: dict):
    """Write the budget JSON, creating ~/.hermes on first use."""
    BUDGET_FILE.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, indent=2)
    BUDGET_FILE.write_text(serialized)
def today_key() -> str:
    """Return today's date in UTC as a YYYY-MM-DD bucket key."""
    utc_now = datetime.now(timezone.utc)
    return utc_now.strftime("%Y-%m-%d")
def get_daily_usage(pipeline: str = None) -> dict:
    """Get token usage for today; a single pipeline's view when one is named."""
    snapshot = _load()
    day = snapshot.get("daily", {}).get(today_key(), {"tokens_used": 0, "pipelines": {}})
    if not pipeline:
        return day
    return {
        "pipeline": pipeline,
        "tokens_used": day.get("pipelines", {}).get(pipeline, 0),
        "daily_total": day.get("tokens_used", 0),
    }
def get_remaining(limit: int = DEFAULT_DAILY_LIMIT) -> int:
    """Tokens still available today under *limit* (never negative)."""
    used = get_daily_usage().get("tokens_used", 0)
    return max(0, limit - used)
def can_afford(tokens: int, limit: int = DEFAULT_DAILY_LIMIT) -> bool:
    """True when today's remaining budget covers a spend of *tokens*."""
    remaining = get_remaining(limit)
    return remaining >= tokens
def record_usage(pipeline: str, input_tokens: int, output_tokens: int) -> dict:
    """
    Record token usage for a pipeline task.
    Called automatically by the orchestrator after each pipeline task completes.
    Returns the updated daily state.
    """
    total = input_tokens + output_tokens
    data = _load()
    today = today_key()
    # Per-day bucket: running total plus a per-pipeline tally.
    daily = data.setdefault("daily", {})
    day = daily.setdefault(today, {"tokens_used": 0, "pipelines": {}})
    day["tokens_used"] = day.get("tokens_used", 0) + total
    pipes = day.setdefault("pipelines", {})
    pipes[pipeline] = pipes.get(pipeline, 0) + total
    # Track breakdown (input/output split and call count per pipeline)
    breakdown = day.setdefault("breakdown", {})
    pb = breakdown.setdefault(pipeline, {"input": 0, "output": 0, "calls": 0})
    pb["input"] += input_tokens
    pb["output"] += output_tokens
    pb["calls"] += 1
    # Track lifetime stats
    # NOTE(review): "total_days" is initialized but never incremented — confirm intent.
    lifetime = data.setdefault("lifetime", {"total_tokens": 0, "total_days": 0})
    lifetime["total_tokens"] = lifetime.get("total_tokens", 0) + total
    _save(data)
    return {
        "pipeline": pipeline,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "total": total,
        "daily_used": day["tokens_used"],
        "daily_remaining": get_remaining(),
    }
def get_report() -> str:
    """Generate a human-readable budget report."""
    state = _load()
    today = today_key()
    day = state.get("daily", {}).get(today, {"tokens_used": 0, "pipelines": {}})
    report = [
        f"Token Budget — {today}",
        f" Daily usage: {day.get('tokens_used', 0):,} / {DEFAULT_DAILY_LIMIT:,}",
        f" Remaining: {get_remaining():,}",
        "",
        " Pipelines:",
    ]
    # Pipelines sorted by output tokens, busiest first.
    breakdown = day.get("breakdown", {})
    for name, stats in sorted(breakdown.items(), key=lambda item: -item[1]["output"]):
        combined = stats["input"] + stats["output"]
        report.append(f" {name}: {combined:,} tokens ({stats['calls']} calls)")
    if not breakdown:
        report.append(" (no pipelines run today)")
    lifetime = state.get("lifetime", {})
    report.append("")
    report.append(f" Lifetime: {lifetime.get('total_tokens', 0):,} total tokens")
    return "\n".join(report)
if __name__ == "__main__":
import sys
if "--report" in sys.argv:
print(get_report())
elif "--remaining" in sys.argv:
print(get_remaining())
elif "--can-afford" in sys.argv:
idx = sys.argv.index("--can-afford")
tokens = int(sys.argv[idx + 1])
print("yes" if can_afford(tokens) else "no")
else:
print(get_report())

View File

@@ -0,0 +1,151 @@
#!/usr/bin/env python3
"""Tests for normalize-code-blocks.py — issue #750"""
import json
import sys
import tempfile
from pathlib import Path
import pytest
# Import from scripts/ — the script file is named "normalize-code-blocks.py"
# (hyphenated), which is not a valid Python module name, so a plain
# `import normalize_code_blocks` can fail. Try the underscore name first,
# then fall back to loading the hyphenated file directly via importlib.
_SCRIPTS_DIR = Path(__file__).resolve().parent.parent / "scripts"
sys.path.insert(0, str(_SCRIPTS_DIR))
try:
    from normalize_code_blocks import normalize_code_block, process_line, process_file, CODE_BLOCK_RE
except ImportError:
    import importlib.util

    _spec = importlib.util.spec_from_file_location(
        "normalize_code_blocks", _SCRIPTS_DIR / "normalize-code-blocks.py"
    )
    _module = importlib.util.module_from_spec(_spec)
    _spec.loader.exec_module(_module)
    sys.modules["normalize_code_blocks"] = _module
    normalize_code_block = _module.normalize_code_block
    process_line = _module.process_line
    process_file = _module.process_file
    CODE_BLOCK_RE = _module.CODE_BLOCK_RE
class TestCodeBlockRegex:
    """CODE_BLOCK_RE should match fenced blocks but not inline backticks."""

    def test_matches_python_block(self):
        assert CODE_BLOCK_RE.search("```python\nprint('hi')\n```") is not None

    def test_matches_plain_block(self):
        assert CODE_BLOCK_RE.search("```\nsome code\n```") is not None

    def test_matches_bash_block(self):
        assert CODE_BLOCK_RE.search("```bash\necho hello\n```") is not None

    def test_ignores_inline_backticks(self):
        assert CODE_BLOCK_RE.search("Use `code` inline") is None

    def test_handles_multiline_code(self):
        sample = "```python\ndef foo():\n return 1\n\ndef bar():\n return 2\n```"
        hit = CODE_BLOCK_RE.search(sample)
        assert hit is not None
        assert "def foo" in hit.group("code")
class TestNormalizeCodeBlock:
    """normalize_code_block should dedent code and trim surrounding blanks."""

    def test_strips_leading_indent(self):
        hit = CODE_BLOCK_RE.search("```python\n print('hi')\n```")
        out = normalize_code_block(hit)
        assert " print" not in out
        assert "print('hi')" in out

    def test_dedents_mixed_indent(self):
        sample = "```python\n def foo():\n return 1\n def bar():\n return 2\n```"
        out = normalize_code_block(CODE_BLOCK_RE.search(sample))
        # First real code line should be flush-left after dedenting.
        body_lines = [ln for ln in out.split("\n") if ln.strip() and not ln.startswith("```")]
        assert body_lines[0].startswith("def foo")

    def test_strips_trailing_blank_lines(self):
        hit = CODE_BLOCK_RE.search("```python\nprint('hi')\n\n\n```")
        assert normalize_code_block(hit).endswith("print('hi')\n```")

    def test_preserves_language_tag(self):
        hit = CODE_BLOCK_RE.search("```python\n x = 1\n```")
        assert normalize_code_block(hit).startswith("```python")

    def test_empty_block_unchanged(self):
        hit = CODE_BLOCK_RE.search("```python\n \n```")
        assert normalize_code_block(hit) == hit.group(0)

    def test_diff_markers_preserved(self):
        sample = "```\n+def new_func():\n+ return 1\n-def old_func():\n- return 0\n```"
        out = normalize_code_block(CODE_BLOCK_RE.search(sample))
        assert "+def new_func" in out
        assert "-def old_func" in out
class TestProcessLine:
    """process_line should fix code blocks inside any string field."""

    def test_valid_json_no_code_blocks(self):
        _, fixes = process_line(json.dumps({"prompt": "hello world"}))
        assert fixes == 0

    def test_valid_json_with_code_block(self):
        raw = json.dumps({"prompt": "Here is code:\n```python\n x = 1\n```"})
        fixed_line, fixes = process_line(raw)
        assert fixes == 1
        reparsed = json.loads(fixed_line)
        assert " x = 1" not in reparsed["prompt"]

    def test_nested_dict_code_blocks(self):
        record = {
            "prompt": "code: ```python\n a = 1\n```",
            "chosen": "```python\n b = 2\n```",
        }
        _, fixes = process_line(json.dumps(record))
        assert fixes == 2

    def test_invalid_json_returned_unchanged(self):
        raw = "{broken json"
        fixed_line, fixes = process_line(raw)
        assert (fixed_line, fixes) == (raw, 0)

    def test_list_field_code_blocks(self):
        raw = json.dumps({"items": ["```python\n x = 1\n```", "no code here"]})
        _, fixes = process_line(raw)
        assert fixes == 1
class TestProcessFile:
    """End-to-end checks for process_file using pytest's tmp_path fixture."""

    def test_fixes_file_in_place(self, tmp_path):
        # One record with an indented code block, one without — only the
        # first should be counted as a fix, and the file must be rewritten.
        f = tmp_path / "test.jsonl"
        lines = [
            json.dumps({"prompt": "```python\n x = 1\n```"}),
            json.dumps({"prompt": "no code"}),
        ]
        f.write_text("\n".join(lines) + "\n")
        result = process_file(str(f))
        assert result["fixes"] == 1
        assert result["lines"] == 2
        # Verify file was actually modified
        content = f.read_text()
        assert " x = 1" not in content

    def test_dry_run_no_write(self, tmp_path):
        # dry_run=True must report fixes without touching the file.
        f = tmp_path / "test.jsonl"
        original = json.dumps({"prompt": "```python\n x = 1\n```"})
        f.write_text(original + "\n")
        result = process_file(str(f), dry_run=True)
        assert result["fixes"] == 1
        # File unchanged
        assert f.read_text().strip() == original

    def test_missing_file(self, tmp_path):
        # Nonexistent paths yield an error dict instead of raising.
        result = process_file(str(tmp_path / "nope.jsonl"))
        assert "error" in result

    def test_clean_file_no_fixes(self, tmp_path):
        # Already-normalized content reports zero fixes.
        f = tmp_path / "clean.jsonl"
        f.write_text(json.dumps({"prompt": "no code blocks here"}) + "\n")
        result = process_file(str(f))
        assert result["fixes"] == 0