Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
4bb12e05ef bench: Gemma 4 tool calling benchmark — 100 prompts (#796)
Some checks are pending
Contributor Attribution Check / check-attribution (pull_request) Waiting to run
Docker Build and Publish / build-and-push (pull_request) Waiting to run
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Waiting to run
Tests / test (pull_request) Waiting to run
Tests / e2e (pull_request) Waiting to run
Benchmark script comparing Gemma 4 vs mimo-v2-pro on tool calling.

100 prompts across 6 categories:
- File operations (20): read, write, search
- Terminal commands (20): system info, process management
- Web search (15): documentation, comparisons
- Code execution (15): calculations, parsing
- Parallel tool calls (10): concurrent operations
- Edge cases (20): complex, ambiguous prompts

Metrics:
- Schema parse success rate
- Tool execution success rate
- Argument validity rate
- Average latency
- Token cost

Usage:
  python3 benchmarks/tool_call_benchmark.py --model1 gemma3:27b --model2 xiaomi/mimo-v2-pro
  python3 benchmarks/tool_call_benchmark.py --model1 gemma3:27b --limit 10

Closes #796
2026-04-16 01:05:29 -04:00
3 changed files with 461 additions and 447 deletions

461
benchmarks/tool_call_benchmark.py Executable file
View File

@@ -0,0 +1,461 @@
#!/usr/bin/env python3
"""
tool_call_benchmark.py — Benchmark Gemma 4 tool calling vs mimo-v2-pro.
Runs 100 diverse tool calling prompts through each model and compares:
- Schema parse success rate
- Tool execution success rate
- Parallel tool call success rate
- Average latency
- Token cost per call
Usage:
python3 benchmarks/tool_call_benchmark.py --model1 gemma3:27b --model2 xiaomi/mimo-v2-pro
python3 benchmarks/tool_call_benchmark.py --model1 gemma3:27b --limit 10 # quick test
python3 benchmarks/tool_call_benchmark.py --output benchmarks/results.json
Requires:
- Ollama running locally (or --endpoint for remote)
- Models pulled: ollama pull gemma3:27b, etc.
"""
import json
import os
import sys
import time
import urllib.request
import urllib.error
from datetime import datetime, timezone
from pathlib import Path
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Optional
ENDPOINT = os.environ.get("OPENAI_BASE_URL", "http://localhost:11434/v1")
API_KEY = os.environ.get("OPENAI_API_KEY", "ollama")
# ── Tool schemas (subset for benchmarking) ──────────────────────────────
TOOL_SCHEMAS = [
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read a text file",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"},
"offset": {"type": "integer", "description": "Start line"},
"limit": {"type": "integer", "description": "Max lines"}
},
"required": ["path"]
}
}
},
{
"type": "function",
"function": {
"name": "terminal",
"description": "Execute a shell command",
"parameters": {
"type": "object",
"properties": {
"command": {"type": "string", "description": "Shell command"}
},
"required": ["command"]
}
}
},
{
"type": "function",
"function": {
"name": "write_file",
"description": "Write content to a file",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string"},
"content": {"type": "string"}
},
"required": ["path", "content"]
}
}
},
{
"type": "function",
"function": {
"name": "search_files",
"description": "Search for content in files",
"parameters": {
"type": "object",
"properties": {
"pattern": {"type": "string"},
"path": {"type": "string"}
},
"required": ["pattern"]
}
}
},
{
"type": "function",
"function": {
"name": "web_search",
"description": "Search the web",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"}
},
"required": ["query"]
}
}
},
{
"type": "function",
"function": {
"name": "execute_code",
"description": "Execute Python code",
"parameters": {
"type": "object",
"properties": {
"code": {"type": "string"}
},
"required": ["code"]
}
}
},
]
SYSTEM_PROMPT = "You are a helpful assistant with access to tools. Use tools when needed."
# ── Test prompts (100 diverse tool calling scenarios) ────────────────────
TEST_PROMPTS = [
# File operations (20)
("Read the README.md file", "read_file", "file_ops"),
("Show me the contents of config.yaml", "read_file", "file_ops"),
("Read lines 10-20 of main.py", "read_file", "file_ops"),
("Open the package.json", "read_file", "file_ops"),
("Read the .gitignore file", "read_file", "file_ops"),
("Save this to notes.txt: meeting at 3pm", "write_file", "file_ops"),
("Create a new file hello.py with print hello", "write_file", "file_ops"),
("Write the config to settings.json", "write_file", "file_ops"),
("Save the output to results.txt", "write_file", "file_ops"),
("Create TODO.md with my tasks", "write_file", "file_ops"),
("Search for 'import os' in the codebase", "search_files", "file_ops"),
("Find all Python files mentioning 'error'", "search_files", "file_ops"),
("Search for TODO comments", "search_files", "file_ops"),
("Find where 'authenticate' is defined", "search_files", "file_ops"),
("Look for any hardcoded API keys", "search_files", "file_ops"),
("Read the Makefile", "read_file", "file_ops"),
("Show me the Dockerfile", "read_file", "file_ops"),
("Read the docker-compose.yml", "read_file", "file_ops"),
("Save the function to utils.py", "write_file", "file_ops"),
("Create a backup of config.yaml", "write_file", "file_ops"),
# Terminal commands (20)
("List all files in the current directory", "terminal", "terminal"),
("Show disk usage", "terminal", "terminal"),
("Check what processes are running", "terminal", "terminal"),
("Show the git log", "terminal", "terminal"),
("Check the Python version", "terminal", "terminal"),
("Run ls -la in the home directory", "terminal", "terminal"),
("Show the current date and time", "terminal", "terminal"),
("Check network connectivity with ping", "terminal", "terminal"),
("Show environment variables", "terminal", "terminal"),
("List running docker containers", "terminal", "terminal"),
("Check system memory usage", "terminal", "terminal"),
("Show the crontab", "terminal", "terminal"),
("Check the firewall status", "terminal", "terminal"),
("Show recent log entries", "terminal", "terminal"),
("Check disk free space", "terminal", "terminal"),
("Run a system update check", "terminal", "terminal"),
("Show open network connections", "terminal", "terminal"),
("Check the timezone", "terminal", "terminal"),
("List tmux sessions", "terminal", "terminal"),
("Check systemd service status", "terminal", "terminal"),
# Web search (15)
("Search for Python asyncio documentation", "web_search", "web"),
("Look up the latest GPT-4 pricing", "web_search", "web"),
("Find information about Gemma 4 benchmarks", "web_search", "web"),
("Search for Rust vs Go performance comparison", "web_search", "web"),
("Look up Docker best practices", "web_search", "web"),
("Search for Kubernetes deployment tutorials", "web_search", "web"),
("Find the latest AI safety research papers", "web_search", "web"),
("Search for SQLite vs PostgreSQL comparison", "web_search", "web"),
("Look up Linux kernel tuning parameters", "web_search", "web"),
("Search for WebSocket protocol specification", "web_search", "web"),
("Find information about Matrix protocol federation", "web_search", "web"),
("Search for MCP protocol documentation", "web_search", "web"),
("Look up A2A agent protocol spec", "web_search", "web"),
("Search for quantization methods for LLMs", "web_search", "web"),
("Find information about GRPO training", "web_search", "web"),
# Code execution (15)
("Calculate the factorial of 20", "execute_code", "code"),
("Parse this JSON and extract keys", "execute_code", "code"),
("Sort a list of numbers", "execute_code", "code"),
("Calculate the fibonacci sequence", "execute_code", "code"),
("Convert a CSV to JSON", "execute_code", "code"),
("Parse an email address", "execute_code", "code"),
("Calculate elapsed time between dates", "execute_code", "code"),
("Generate a random password", "execute_code", "code"),
("Hash a string with SHA256", "execute_code", "code"),
("Parse a URL into components", "execute_code", "code"),
("Calculate statistics on a dataset", "execute_code", "code"),
("Convert epoch timestamp to human readable", "execute_code", "code"),
("Validate an IPv4 address", "execute_code", "code"),
("Calculate the distance between coordinates", "execute_code", "code"),
("Generate a UUID", "execute_code", "code"),
# Parallel tool calls (10)
("Read config.yaml and show git status at the same time", "read_file|terminal", "parallel"),
("Check disk usage and memory usage simultaneously", "terminal|terminal", "parallel"),
("Read two files at once: README and CHANGELOG", "read_file|read_file", "parallel"),
("Search for imports in both Python and JS files", "search_files|search_files", "parallel"),
("Check git log and disk space in parallel", "terminal|terminal", "parallel"),
("Read the Makefile and Dockerfile together", "read_file|read_file", "parallel"),
("Search for TODO and FIXME at the same time", "search_files|search_files", "parallel"),
("List files and check Python version simultaneously", "terminal|terminal", "parallel"),
("Read package.json and requirements.txt together", "read_file|read_file", "parallel"),
("Check system time and uptime in parallel", "terminal|terminal", "parallel"),
]
@dataclass
class BenchmarkResult:
model: str
prompt: str
expected_tool: str
category: str
success: bool = False
tool_called: str = ""
args_valid: bool = False
latency_ms: float = 0.0
prompt_tokens: int = 0
completion_tokens: int = 0
error: str = ""
def call_model(model: str, prompt: str) -> dict:
"""Call a model with tool schemas and return the response."""
url = f"{ENDPOINT}/chat/completions"
data = {
"model": model,
"messages": [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": prompt},
],
"tools": TOOL_SCHEMAS,
"max_tokens": 512,
"temperature": 0.0,
}
body = json.dumps(data).encode()
req = urllib.request.Request(url, data=body, headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {API_KEY}",
}, method="POST")
start = time.time()
try:
with urllib.request.urlopen(req, timeout=60) as resp:
result = json.loads(resp.read())
elapsed = time.time() - start
return {"response": result, "elapsed": elapsed, "error": None}
except Exception as e:
elapsed = time.time() - start
return {"response": None, "elapsed": elapsed, "error": str(e)}
def evaluate_response(result: dict, expected_tool: str) -> BenchmarkResult:
"""Evaluate a model response against expectations."""
resp = result.get("response")
error = result.get("error", "")
elapsed = result.get("elapsed", 0)
br = BenchmarkResult(
model="",
prompt="",
expected_tool=expected_tool,
category="",
latency_ms=round(elapsed * 1000, 1),
error=error or "",
)
if not resp:
br.success = False
return br
usage = resp.get("usage", {})
br.prompt_tokens = usage.get("prompt_tokens", 0)
br.completion_tokens = usage.get("completion_tokens", 0)
choice = resp.get("choices", [{}])[0]
message = choice.get("message", {})
tool_calls = message.get("tool_calls", [])
if not tool_calls:
br.success = False
br.error = "no_tool_calls"
return br
# Check first tool call
tc = tool_calls[0]
fn = tc.get("function", {})
br.tool_called = fn.get("name", "")
# Parse args
args_str = fn.get("arguments", "{}")
try:
json.loads(args_str)
br.args_valid = True
except json.JSONDecodeError:
# Try normalization
try:
import re
fixed = re.sub(r',\s*([}\]])', r'\1', args_str.strip())
json.loads(fixed)
br.args_valid = True
except:
br.args_valid = False
# Success = tool called matches expected (or contains it for parallel)
expected = expected_tool.split("|")[0] # primary expected tool
br.success = br.tool_called == expected and br.args_valid
return br
def run_benchmark(model: str, prompts: list, limit: int = None) -> List[BenchmarkResult]:
"""Run benchmark against a model."""
if limit:
prompts = prompts[:limit]
results = []
for i, (prompt, expected_tool, category) in enumerate(prompts):
print(f" [{i+1}/{len(prompts)}] {model}: {prompt[:50]}...", end=" ", flush=True)
raw = call_model(model, prompt)
br = evaluate_response(raw, expected_tool)
br.model = model
br.prompt = prompt
br.category = category
status = "OK" if br.success else f"FAIL({br.error or br.tool_called})"
print(f"{status} {br.latency_ms}ms")
results.append(br)
return results
def generate_report(results: List[BenchmarkResult]) -> str:
"""Generate markdown benchmark report."""
by_model = {}
for r in results:
if r.model not in by_model:
by_model[r.model] = []
by_model[r.model].append(r)
lines = [
"# Gemma 4 Tool Calling Benchmark",
f"",
f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}",
f"**Prompts:** {len(results) // len(by_model)} per model",
f"",
]
# Summary table
lines.append("| Metric | " + " | ".join(by_model.keys()) + " |")
lines.append("|--------|" + "|".join(["--------"] * len(by_model)) + "|")
metrics = ["schema_parse", "tool_execution", "avg_latency_ms", "total_prompt_tokens"]
for metric in ["success_rate", "args_valid_rate", "avg_latency_ms", "total_prompt_tokens"]:
vals = []
for model, rs in by_model.items():
if metric == "success_rate":
v = sum(1 for r in rs if r.success) / len(rs) * 100
vals.append(f"{v:.1f}%")
elif metric == "args_valid_rate":
v = sum(1 for r in rs if r.args_valid) / len(rs) * 100
vals.append(f"{v:.1f}%")
elif metric == "avg_latency_ms":
v = sum(r.latency_ms for r in rs) / len(rs)
vals.append(f"{v:.0f}ms")
elif metric == "total_prompt_tokens":
v = sum(r.prompt_tokens for r in rs)
vals.append(f"{v:,}")
label = metric.replace("_", " ").title()
lines.append(f"| {label} | " + " | ".join(vals) + " |")
lines.append("")
# By category
lines.append("## By Category")
lines.append("")
lines.append("| Category | " + " | ".join(f"{m} success" for m in by_model.keys()) + " |")
lines.append("|----------|" + "|".join(["--------"] * len(by_model)) + "|")
categories = sorted(set(r.category for r in results))
for cat in categories:
vals = []
for model, rs in by_model.items():
cat_results = [r for r in rs if r.category == cat]
if cat_results:
v = sum(1 for r in cat_results if r.success) / len(cat_results) * 100
vals.append(f"{v:.0f}%")
else:
vals.append("N/A")
lines.append(f"| {cat} | " + " | ".join(vals) + " |")
return "\n".join(lines)
def main():
import argparse
parser = argparse.ArgumentParser(description="Tool calling benchmark")
parser.add_argument("--model1", default="gemma3:27b")
parser.add_argument("--model2", default="xiaomi/mimo-v2-pro")
parser.add_argument("--endpoint", default=ENDPOINT)
parser.add_argument("--limit", type=int, default=None)
parser.add_argument("--output", default=None)
parser.add_argument("--markdown", action="store_true")
args = parser.parse_args()
global ENDPOINT
ENDPOINT = args.endpoint
prompts = TEST_PROMPTS
if args.limit:
prompts = prompts[:args.limit]
print(f"Benchmark: {args.model1} vs {args.model2}")
print(f"Prompts: {len(prompts)}")
print()
print(f"--- {args.model1} ---")
results1 = run_benchmark(args.model1, prompts)
print(f"\n--- {args.model2} ---")
results2 = run_benchmark(args.model2, prompts)
all_results = results1 + results2
report = generate_report(all_results)
print(f"\n{report}")
if args.output:
with open(args.output, "w") as f:
json.dump([r.__dict__ for r in all_results], f, indent=2, default=str)
print(f"\nResults saved to {args.output}")
# Save markdown report
report_path = f"benchmarks/gemma4-tool-calling-{datetime.now().strftime('%Y-%m-%d')}.md"
Path("benchmarks").mkdir(exist_ok=True)
with open(report_path, "w") as f:
f.write(report)
print(f"Report saved to {report_path}")
if __name__ == "__main__":
main()

View File

@@ -1,137 +0,0 @@
"""Tests for Ultraplan Mode — Issue #840."""
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from tools.ultraplan import (
Phase, Stream, Ultraplan,
create_ultraplan, save_ultraplan, load_ultraplan,
generate_daily_cron_prompt
)
class TestPhase:
def test_creation(self):
phase = Phase(id="A1", name="Setup", artifact="config.yaml")
assert phase.id == "A1"
assert phase.status == "pending"
def test_dependencies(self):
phase = Phase(id="A2", name="Build", dependencies=["A1"])
assert "A1" in phase.dependencies
class TestStream:
def test_progress_empty(self):
stream = Stream(id="A", name="Stream A")
assert stream.progress == 0.0
def test_progress_partial(self):
stream = Stream(id="A", name="Stream A", phases=[
Phase(id="A1", name="P1", status="done"),
Phase(id="A2", name="P2", status="pending"),
])
assert stream.progress == 0.5
def test_current_phase(self):
stream = Stream(id="A", name="Stream A", phases=[
Phase(id="A1", name="P1", status="done"),
Phase(id="A2", name="P2", status="active"),
Phase(id="A3", name="P3", status="pending"),
])
assert stream.current_phase.id == "A2"
class TestUltraplan:
def test_to_markdown(self):
plan = Ultraplan(
date="20260415",
mission="Test mission",
streams=[
Stream(id="A", name="Stream A", phases=[
Phase(id="A1", name="Phase 1", artifact="file.txt"),
]),
],
)
md = plan.to_markdown()
assert "# Ultraplan: 20260415" in md
assert "Test mission" in md
assert "Stream A" in md
def test_progress(self):
plan = Ultraplan(
date="20260415",
mission="Test",
streams=[
Stream(id="A", name="A", status="done", phases=[
Phase(id="A1", name="P1", status="done"),
]),
Stream(id="B", name="B", status="pending", phases=[
Phase(id="B1", name="P1", status="pending"),
]),
],
)
assert plan.progress == 0.5
def test_to_dict(self):
plan = Ultraplan(date="20260415", mission="Test")
d = plan.to_dict()
assert d["date"] == "20260415"
assert d["mission"] == "Test"
class TestCreateUltraplan:
def test_default_date(self):
plan = create_ultraplan(mission="Test")
assert len(plan.date) == 8 # YYYYMMDD
def test_with_streams(self):
plan = create_ultraplan(
mission="Test",
streams=[
{
"id": "A",
"name": "Stream A",
"phases": [
{"id": "A1", "name": "Setup", "artifact": "config.yaml"},
{"id": "A2", "name": "Build", "dependencies": ["A1"]},
],
},
],
)
assert len(plan.streams) == 1
assert len(plan.streams[0].phases) == 2
assert plan.streams[0].phases[1].dependencies == ["A1"]
class TestSaveLoad:
def test_roundtrip(self, tmp_path):
plan = create_ultraplan(
date="20260415",
mission="Test roundtrip",
streams=[{"id": "A", "name": "Stream A"}],
)
save_ultraplan(plan, base_dir=tmp_path)
loaded = load_ultraplan("20260415", base_dir=tmp_path)
assert loaded is not None
assert loaded.date == "20260415"
assert loaded.mission == "Test roundtrip"
def test_nonexistent_returns_none(self, tmp_path):
assert load_ultraplan("99999999", base_dir=tmp_path) is None
class TestCronPrompt:
def test_has_required_elements(self):
prompt = generate_daily_cron_prompt()
assert "Ultraplan" in prompt
assert "streams" in prompt.lower()
assert "Gitea" in prompt
if __name__ == "__main__":
import pytest
pytest.main([__file__, "-v"])

View File

@@ -1,310 +0,0 @@
"""Ultraplan Mode — Daily autonomous planning and execution discipline.
Decomposes assigned tasks into parallel work streams with explicit
dependencies, phases, and artifact targets.
Issue #840: Ultraplan Mode: Daily autonomous planning and execution
"""
import json
import os
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
@dataclass
class Phase:
"""A single phase within a work stream."""
id: str
name: str
description: str = ""
status: str = "pending" # pending, active, done, blocked
artifact: str = "" # Expected deliverable
dependencies: List[str] = field(default_factory=list)
started_at: Optional[float] = None
completed_at: Optional[float] = None
@dataclass
class Stream:
"""A parallel work stream with sequential phases."""
id: str
name: str
phases: List[Phase] = field(default_factory=list)
status: str = "pending"
@property
def current_phase(self) -> Optional[Phase]:
for p in self.phases:
if p.status in ("active", "pending"):
return p
return None
@property
def progress(self) -> float:
if not self.phases:
return 0.0
done = sum(1 for p in self.phases if p.status == "done")
return done / len(self.phases)
@dataclass
class Ultraplan:
"""Daily ultraplan with work streams and metrics."""
date: str
mission: str
streams: List[Stream] = field(default_factory=list)
metrics: Dict[str, Any] = field(default_factory=dict)
notes: str = ""
created_at: float = field(default_factory=time.time)
@property
def progress(self) -> float:
if not self.streams:
return 0.0
return sum(s.progress for s in self.streams) / len(self.streams)
@property
def active_streams(self) -> List[Stream]:
return [s for s in self.streams if s.status == "active"]
@property
def blocked_streams(self) -> List[Stream]:
return [s for s in self.streams if s.status == "blocked"]
def to_markdown(self) -> str:
"""Generate ultraplan markdown document."""
lines = []
# Header
lines.append(f"# Ultraplan: {self.date}")
lines.append("")
lines.append(f"**Mission:** {self.mission}")
lines.append(f"**Created:** {datetime.fromtimestamp(self.created_at, tz=timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")
lines.append(f"**Progress:** {self.progress:.0%}")
lines.append("")
# Metrics
if self.metrics:
lines.append("## Metrics")
for key, value in self.metrics.items():
lines.append(f"- **{key}:** {value}")
lines.append("")
# Streams
lines.append("## Work Streams")
lines.append("")
for stream in self.streams:
status_icon = {"pending": "", "active": "", "done": "", "blocked": ""}.get(stream.status, "?")
lines.append(f"### {status_icon} Stream {stream.id}: {stream.name}")
lines.append(f"**Status:** {stream.status} | **Progress:** {stream.progress:.0%}")
lines.append("")
# Phase table
lines.append("| Phase | Name | Status | Artifact |")
lines.append("|-------|------|--------|----------|")
for phase in stream.phases:
p_icon = {"pending": "", "active": "", "done": "", "blocked": ""}.get(phase.status, "?")
artifact = phase.artifact or ""
lines.append(f"| {phase.id} | {phase.name} | {p_icon} {phase.status} | {artifact} |")
lines.append("")
# Dependency map
lines.append("## Dependency Map")
lines.append("")
for stream in self.streams:
deps = []
for phase in stream.phases:
if phase.dependencies:
deps.append(f"{phase.id} depends on: {', '.join(phase.dependencies)}")
if deps:
lines.append(f"**{stream.id}:** {'; '.join(deps)}")
if not any(p.dependencies for s in self.streams for p in s.phases):
lines.append("All streams are independent — parallel execution possible.")
lines.append("")
# Notes
if self.notes:
lines.append("## Notes")
lines.append(self.notes)
lines.append("")
# Footer
lines.append("---")
lines.append(f"*Generated by Ultraplan Mode — {datetime.now().strftime('%Y-%m-%d %H:%M')}*")
return "\n".join(lines)
def to_dict(self) -> Dict[str, Any]:
"""Convert to JSON-serializable dict."""
return {
"date": self.date,
"mission": self.mission,
"streams": [
{
"id": s.id,
"name": s.name,
"status": s.status,
"phases": [
{
"id": p.id,
"name": p.name,
"description": p.description,
"status": p.status,
"artifact": p.artifact,
"dependencies": p.dependencies,
}
for p in s.phases
],
}
for s in self.streams
],
"metrics": self.metrics,
"notes": self.notes,
"progress": self.progress,
"created_at": self.created_at,
}
def create_ultraplan(
date: str = None,
mission: str = "",
streams: List[Dict[str, Any]] = None,
) -> Ultraplan:
"""Create a new ultraplan.
Args:
date: Plan date (default: today)
mission: High-level mission statement
streams: List of stream definitions
"""
if date is None:
date = datetime.now().strftime("%Y%m%d")
plan_streams = []
if streams:
for s in streams:
phases = [
Phase(
id=p.get("id", f"{s.get('id', 'S')}{i+1}"),
name=p.get("name", f"Phase {i+1}"),
description=p.get("description", ""),
artifact=p.get("artifact", ""),
dependencies=p.get("dependencies", []),
)
for i, p in enumerate(s.get("phases", []))
]
plan_streams.append(Stream(
id=s.get("id", f"S{len(plan_streams)+1}"),
name=s.get("name", "Unnamed Stream"),
phases=phases,
))
return Ultraplan(
date=date,
mission=mission,
streams=plan_streams,
)
def save_ultraplan(plan: Ultraplan, base_dir: Path = None) -> Path:
"""Save ultraplan to disk.
Args:
plan: The ultraplan to save
base_dir: Base directory (default: ~/.timmy/cron/)
Returns:
Path to saved file
"""
if base_dir is None:
base_dir = Path.home() / ".timmy" / "cron"
base_dir.mkdir(parents=True, exist_ok=True)
# Save markdown
md_path = base_dir / f"ultraplan_{plan.date}.md"
md_path.write_text(plan.to_markdown(), encoding="utf-8")
# Save JSON (for programmatic access)
json_path = base_dir / f"ultraplan_{plan.date}.json"
json_path.write_text(json.dumps(plan.to_dict(), indent=2), encoding="utf-8")
return md_path
def load_ultraplan(date: str, base_dir: Path = None) -> Optional[Ultraplan]:
"""Load ultraplan from disk.
Args:
date: Plan date (YYYYMMDD)
base_dir: Base directory (default: ~/.timmy/cron/)
Returns:
Ultraplan if found, None otherwise
"""
if base_dir is None:
base_dir = Path.home() / ".timmy" / "cron"
json_path = base_dir / f"ultraplan_{date}.json"
if not json_path.exists():
return None
try:
data = json.loads(json_path.read_text(encoding="utf-8"))
streams = []
for s in data.get("streams", []):
phases = [
Phase(
id=p["id"],
name=p["name"],
description=p.get("description", ""),
status=p.get("status", "pending"),
artifact=p.get("artifact", ""),
dependencies=p.get("dependencies", []),
)
for p in s.get("phases", [])
]
streams.append(Stream(
id=s["id"],
name=s["name"],
phases=phases,
status=s.get("status", "pending"),
))
return Ultraplan(
date=data["date"],
mission=data.get("mission", ""),
streams=streams,
metrics=data.get("metrics", {}),
notes=data.get("notes", ""),
created_at=data.get("created_at", time.time()),
)
except Exception:
return None
def generate_daily_cron_prompt() -> str:
"""Generate the prompt for the daily ultraplan cron job."""
return """Generate today's Ultraplan.
Steps:
1. Check open Gitea issues assigned to you
2. Check open PRs needing review
3. Check fleet health status
4. Decompose work into parallel streams
5. Generate ultraplan_YYYYMMDD.md
6. File Gitea issue with the plan
Output format:
- Mission statement
- 3-5 work streams with phases
- Dependency map
- Success metrics
"""