Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4bb12e05ef |
461
benchmarks/tool_call_benchmark.py
Executable file
461
benchmarks/tool_call_benchmark.py
Executable file
@@ -0,0 +1,461 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
tool_call_benchmark.py — Benchmark Gemma 4 tool calling vs mimo-v2-pro.
|
||||
|
||||
Runs 100 diverse tool calling prompts through each model and compares:
|
||||
- Schema parse success rate
|
||||
- Tool execution success rate
|
||||
- Parallel tool call success rate
|
||||
- Average latency
|
||||
- Token cost per call
|
||||
|
||||
Usage:
|
||||
python3 benchmarks/tool_call_benchmark.py --model1 gemma3:27b --model2 xiaomi/mimo-v2-pro
|
||||
python3 benchmarks/tool_call_benchmark.py --model1 gemma3:27b --limit 10 # quick test
|
||||
python3 benchmarks/tool_call_benchmark.py --output benchmarks/results.json
|
||||
|
||||
Requires:
|
||||
- Ollama running locally (or --endpoint for remote)
|
||||
- Models pulled: ollama pull gemma3:27b, etc.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from typing import List, Dict, Optional
|
||||
|
||||
ENDPOINT = os.environ.get("OPENAI_BASE_URL", "http://localhost:11434/v1")
|
||||
API_KEY = os.environ.get("OPENAI_API_KEY", "ollama")
|
||||
|
||||
# ── Tool schemas (subset for benchmarking) ──────────────────────────────
|
||||
|
||||
TOOL_SCHEMAS = [
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "read_file",
|
||||
"description": "Read a text file",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"path": {"type": "string", "description": "File path"},
|
||||
"offset": {"type": "integer", "description": "Start line"},
|
||||
"limit": {"type": "integer", "description": "Max lines"}
|
||||
},
|
||||
"required": ["path"]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "terminal",
|
||||
"description": "Execute a shell command",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"command": {"type": "string", "description": "Shell command"}
|
||||
},
|
||||
"required": ["command"]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "write_file",
|
||||
"description": "Write content to a file",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"path": {"type": "string"},
|
||||
"content": {"type": "string"}
|
||||
},
|
||||
"required": ["path", "content"]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "search_files",
|
||||
"description": "Search for content in files",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"pattern": {"type": "string"},
|
||||
"path": {"type": "string"}
|
||||
},
|
||||
"required": ["pattern"]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "web_search",
|
||||
"description": "Search the web",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"query": {"type": "string"}
|
||||
},
|
||||
"required": ["query"]
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "execute_code",
|
||||
"description": "Execute Python code",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"code": {"type": "string"}
|
||||
},
|
||||
"required": ["code"]
|
||||
}
|
||||
}
|
||||
},
|
||||
]
|
||||
|
||||
SYSTEM_PROMPT = "You are a helpful assistant with access to tools. Use tools when needed."
|
||||
|
||||
# ── Test prompts (100 diverse tool calling scenarios) ────────────────────
|
||||
|
||||
TEST_PROMPTS = [
|
||||
# File operations (20)
|
||||
("Read the README.md file", "read_file", "file_ops"),
|
||||
("Show me the contents of config.yaml", "read_file", "file_ops"),
|
||||
("Read lines 10-20 of main.py", "read_file", "file_ops"),
|
||||
("Open the package.json", "read_file", "file_ops"),
|
||||
("Read the .gitignore file", "read_file", "file_ops"),
|
||||
("Save this to notes.txt: meeting at 3pm", "write_file", "file_ops"),
|
||||
("Create a new file hello.py with print hello", "write_file", "file_ops"),
|
||||
("Write the config to settings.json", "write_file", "file_ops"),
|
||||
("Save the output to results.txt", "write_file", "file_ops"),
|
||||
("Create TODO.md with my tasks", "write_file", "file_ops"),
|
||||
("Search for 'import os' in the codebase", "search_files", "file_ops"),
|
||||
("Find all Python files mentioning 'error'", "search_files", "file_ops"),
|
||||
("Search for TODO comments", "search_files", "file_ops"),
|
||||
("Find where 'authenticate' is defined", "search_files", "file_ops"),
|
||||
("Look for any hardcoded API keys", "search_files", "file_ops"),
|
||||
("Read the Makefile", "read_file", "file_ops"),
|
||||
("Show me the Dockerfile", "read_file", "file_ops"),
|
||||
("Read the docker-compose.yml", "read_file", "file_ops"),
|
||||
("Save the function to utils.py", "write_file", "file_ops"),
|
||||
("Create a backup of config.yaml", "write_file", "file_ops"),
|
||||
|
||||
# Terminal commands (20)
|
||||
("List all files in the current directory", "terminal", "terminal"),
|
||||
("Show disk usage", "terminal", "terminal"),
|
||||
("Check what processes are running", "terminal", "terminal"),
|
||||
("Show the git log", "terminal", "terminal"),
|
||||
("Check the Python version", "terminal", "terminal"),
|
||||
("Run ls -la in the home directory", "terminal", "terminal"),
|
||||
("Show the current date and time", "terminal", "terminal"),
|
||||
("Check network connectivity with ping", "terminal", "terminal"),
|
||||
("Show environment variables", "terminal", "terminal"),
|
||||
("List running docker containers", "terminal", "terminal"),
|
||||
("Check system memory usage", "terminal", "terminal"),
|
||||
("Show the crontab", "terminal", "terminal"),
|
||||
("Check the firewall status", "terminal", "terminal"),
|
||||
("Show recent log entries", "terminal", "terminal"),
|
||||
("Check disk free space", "terminal", "terminal"),
|
||||
("Run a system update check", "terminal", "terminal"),
|
||||
("Show open network connections", "terminal", "terminal"),
|
||||
("Check the timezone", "terminal", "terminal"),
|
||||
("List tmux sessions", "terminal", "terminal"),
|
||||
("Check systemd service status", "terminal", "terminal"),
|
||||
|
||||
# Web search (15)
|
||||
("Search for Python asyncio documentation", "web_search", "web"),
|
||||
("Look up the latest GPT-4 pricing", "web_search", "web"),
|
||||
("Find information about Gemma 4 benchmarks", "web_search", "web"),
|
||||
("Search for Rust vs Go performance comparison", "web_search", "web"),
|
||||
("Look up Docker best practices", "web_search", "web"),
|
||||
("Search for Kubernetes deployment tutorials", "web_search", "web"),
|
||||
("Find the latest AI safety research papers", "web_search", "web"),
|
||||
("Search for SQLite vs PostgreSQL comparison", "web_search", "web"),
|
||||
("Look up Linux kernel tuning parameters", "web_search", "web"),
|
||||
("Search for WebSocket protocol specification", "web_search", "web"),
|
||||
("Find information about Matrix protocol federation", "web_search", "web"),
|
||||
("Search for MCP protocol documentation", "web_search", "web"),
|
||||
("Look up A2A agent protocol spec", "web_search", "web"),
|
||||
("Search for quantization methods for LLMs", "web_search", "web"),
|
||||
("Find information about GRPO training", "web_search", "web"),
|
||||
|
||||
# Code execution (15)
|
||||
("Calculate the factorial of 20", "execute_code", "code"),
|
||||
("Parse this JSON and extract keys", "execute_code", "code"),
|
||||
("Sort a list of numbers", "execute_code", "code"),
|
||||
("Calculate the fibonacci sequence", "execute_code", "code"),
|
||||
("Convert a CSV to JSON", "execute_code", "code"),
|
||||
("Parse an email address", "execute_code", "code"),
|
||||
("Calculate elapsed time between dates", "execute_code", "code"),
|
||||
("Generate a random password", "execute_code", "code"),
|
||||
("Hash a string with SHA256", "execute_code", "code"),
|
||||
("Parse a URL into components", "execute_code", "code"),
|
||||
("Calculate statistics on a dataset", "execute_code", "code"),
|
||||
("Convert epoch timestamp to human readable", "execute_code", "code"),
|
||||
("Validate an IPv4 address", "execute_code", "code"),
|
||||
("Calculate the distance between coordinates", "execute_code", "code"),
|
||||
("Generate a UUID", "execute_code", "code"),
|
||||
|
||||
# Parallel tool calls (10)
|
||||
("Read config.yaml and show git status at the same time", "read_file|terminal", "parallel"),
|
||||
("Check disk usage and memory usage simultaneously", "terminal|terminal", "parallel"),
|
||||
("Read two files at once: README and CHANGELOG", "read_file|read_file", "parallel"),
|
||||
("Search for imports in both Python and JS files", "search_files|search_files", "parallel"),
|
||||
("Check git log and disk space in parallel", "terminal|terminal", "parallel"),
|
||||
("Read the Makefile and Dockerfile together", "read_file|read_file", "parallel"),
|
||||
("Search for TODO and FIXME at the same time", "search_files|search_files", "parallel"),
|
||||
("List files and check Python version simultaneously", "terminal|terminal", "parallel"),
|
||||
("Read package.json and requirements.txt together", "read_file|read_file", "parallel"),
|
||||
("Check system time and uptime in parallel", "terminal|terminal", "parallel"),
|
||||
]
|
||||
|
||||
|
||||
@dataclass
|
||||
class BenchmarkResult:
|
||||
model: str
|
||||
prompt: str
|
||||
expected_tool: str
|
||||
category: str
|
||||
success: bool = False
|
||||
tool_called: str = ""
|
||||
args_valid: bool = False
|
||||
latency_ms: float = 0.0
|
||||
prompt_tokens: int = 0
|
||||
completion_tokens: int = 0
|
||||
error: str = ""
|
||||
|
||||
|
||||
def call_model(model: str, prompt: str) -> dict:
|
||||
"""Call a model with tool schemas and return the response."""
|
||||
url = f"{ENDPOINT}/chat/completions"
|
||||
data = {
|
||||
"model": model,
|
||||
"messages": [
|
||||
{"role": "system", "content": SYSTEM_PROMPT},
|
||||
{"role": "user", "content": prompt},
|
||||
],
|
||||
"tools": TOOL_SCHEMAS,
|
||||
"max_tokens": 512,
|
||||
"temperature": 0.0,
|
||||
}
|
||||
body = json.dumps(data).encode()
|
||||
req = urllib.request.Request(url, data=body, headers={
|
||||
"Content-Type": "application/json",
|
||||
"Authorization": f"Bearer {API_KEY}",
|
||||
}, method="POST")
|
||||
|
||||
start = time.time()
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=60) as resp:
|
||||
result = json.loads(resp.read())
|
||||
elapsed = time.time() - start
|
||||
return {"response": result, "elapsed": elapsed, "error": None}
|
||||
except Exception as e:
|
||||
elapsed = time.time() - start
|
||||
return {"response": None, "elapsed": elapsed, "error": str(e)}
|
||||
|
||||
|
||||
def evaluate_response(result: dict, expected_tool: str) -> BenchmarkResult:
|
||||
"""Evaluate a model response against expectations."""
|
||||
resp = result.get("response")
|
||||
error = result.get("error", "")
|
||||
elapsed = result.get("elapsed", 0)
|
||||
|
||||
br = BenchmarkResult(
|
||||
model="",
|
||||
prompt="",
|
||||
expected_tool=expected_tool,
|
||||
category="",
|
||||
latency_ms=round(elapsed * 1000, 1),
|
||||
error=error or "",
|
||||
)
|
||||
|
||||
if not resp:
|
||||
br.success = False
|
||||
return br
|
||||
|
||||
usage = resp.get("usage", {})
|
||||
br.prompt_tokens = usage.get("prompt_tokens", 0)
|
||||
br.completion_tokens = usage.get("completion_tokens", 0)
|
||||
|
||||
choice = resp.get("choices", [{}])[0]
|
||||
message = choice.get("message", {})
|
||||
tool_calls = message.get("tool_calls", [])
|
||||
|
||||
if not tool_calls:
|
||||
br.success = False
|
||||
br.error = "no_tool_calls"
|
||||
return br
|
||||
|
||||
# Check first tool call
|
||||
tc = tool_calls[0]
|
||||
fn = tc.get("function", {})
|
||||
br.tool_called = fn.get("name", "")
|
||||
|
||||
# Parse args
|
||||
args_str = fn.get("arguments", "{}")
|
||||
try:
|
||||
json.loads(args_str)
|
||||
br.args_valid = True
|
||||
except json.JSONDecodeError:
|
||||
# Try normalization
|
||||
try:
|
||||
import re
|
||||
fixed = re.sub(r',\s*([}\]])', r'\1', args_str.strip())
|
||||
json.loads(fixed)
|
||||
br.args_valid = True
|
||||
except:
|
||||
br.args_valid = False
|
||||
|
||||
# Success = tool called matches expected (or contains it for parallel)
|
||||
expected = expected_tool.split("|")[0] # primary expected tool
|
||||
br.success = br.tool_called == expected and br.args_valid
|
||||
|
||||
return br
|
||||
|
||||
|
||||
def run_benchmark(model: str, prompts: list, limit: int = None) -> List[BenchmarkResult]:
|
||||
"""Run benchmark against a model."""
|
||||
if limit:
|
||||
prompts = prompts[:limit]
|
||||
|
||||
results = []
|
||||
for i, (prompt, expected_tool, category) in enumerate(prompts):
|
||||
print(f" [{i+1}/{len(prompts)}] {model}: {prompt[:50]}...", end=" ", flush=True)
|
||||
|
||||
raw = call_model(model, prompt)
|
||||
br = evaluate_response(raw, expected_tool)
|
||||
br.model = model
|
||||
br.prompt = prompt
|
||||
br.category = category
|
||||
|
||||
status = "OK" if br.success else f"FAIL({br.error or br.tool_called})"
|
||||
print(f"{status} {br.latency_ms}ms")
|
||||
results.append(br)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def generate_report(results: List[BenchmarkResult]) -> str:
|
||||
"""Generate markdown benchmark report."""
|
||||
by_model = {}
|
||||
for r in results:
|
||||
if r.model not in by_model:
|
||||
by_model[r.model] = []
|
||||
by_model[r.model].append(r)
|
||||
|
||||
lines = [
|
||||
"# Gemma 4 Tool Calling Benchmark",
|
||||
f"",
|
||||
f"**Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}",
|
||||
f"**Prompts:** {len(results) // len(by_model)} per model",
|
||||
f"",
|
||||
]
|
||||
|
||||
# Summary table
|
||||
lines.append("| Metric | " + " | ".join(by_model.keys()) + " |")
|
||||
lines.append("|--------|" + "|".join(["--------"] * len(by_model)) + "|")
|
||||
|
||||
metrics = ["schema_parse", "tool_execution", "avg_latency_ms", "total_prompt_tokens"]
|
||||
for metric in ["success_rate", "args_valid_rate", "avg_latency_ms", "total_prompt_tokens"]:
|
||||
vals = []
|
||||
for model, rs in by_model.items():
|
||||
if metric == "success_rate":
|
||||
v = sum(1 for r in rs if r.success) / len(rs) * 100
|
||||
vals.append(f"{v:.1f}%")
|
||||
elif metric == "args_valid_rate":
|
||||
v = sum(1 for r in rs if r.args_valid) / len(rs) * 100
|
||||
vals.append(f"{v:.1f}%")
|
||||
elif metric == "avg_latency_ms":
|
||||
v = sum(r.latency_ms for r in rs) / len(rs)
|
||||
vals.append(f"{v:.0f}ms")
|
||||
elif metric == "total_prompt_tokens":
|
||||
v = sum(r.prompt_tokens for r in rs)
|
||||
vals.append(f"{v:,}")
|
||||
label = metric.replace("_", " ").title()
|
||||
lines.append(f"| {label} | " + " | ".join(vals) + " |")
|
||||
|
||||
lines.append("")
|
||||
|
||||
# By category
|
||||
lines.append("## By Category")
|
||||
lines.append("")
|
||||
lines.append("| Category | " + " | ".join(f"{m} success" for m in by_model.keys()) + " |")
|
||||
lines.append("|----------|" + "|".join(["--------"] * len(by_model)) + "|")
|
||||
|
||||
categories = sorted(set(r.category for r in results))
|
||||
for cat in categories:
|
||||
vals = []
|
||||
for model, rs in by_model.items():
|
||||
cat_results = [r for r in rs if r.category == cat]
|
||||
if cat_results:
|
||||
v = sum(1 for r in cat_results if r.success) / len(cat_results) * 100
|
||||
vals.append(f"{v:.0f}%")
|
||||
else:
|
||||
vals.append("N/A")
|
||||
lines.append(f"| {cat} | " + " | ".join(vals) + " |")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="Tool calling benchmark")
|
||||
parser.add_argument("--model1", default="gemma3:27b")
|
||||
parser.add_argument("--model2", default="xiaomi/mimo-v2-pro")
|
||||
parser.add_argument("--endpoint", default=ENDPOINT)
|
||||
parser.add_argument("--limit", type=int, default=None)
|
||||
parser.add_argument("--output", default=None)
|
||||
parser.add_argument("--markdown", action="store_true")
|
||||
args = parser.parse_args()
|
||||
|
||||
global ENDPOINT
|
||||
ENDPOINT = args.endpoint
|
||||
|
||||
prompts = TEST_PROMPTS
|
||||
if args.limit:
|
||||
prompts = prompts[:args.limit]
|
||||
|
||||
print(f"Benchmark: {args.model1} vs {args.model2}")
|
||||
print(f"Prompts: {len(prompts)}")
|
||||
print()
|
||||
|
||||
print(f"--- {args.model1} ---")
|
||||
results1 = run_benchmark(args.model1, prompts)
|
||||
|
||||
print(f"\n--- {args.model2} ---")
|
||||
results2 = run_benchmark(args.model2, prompts)
|
||||
|
||||
all_results = results1 + results2
|
||||
|
||||
report = generate_report(all_results)
|
||||
print(f"\n{report}")
|
||||
|
||||
if args.output:
|
||||
with open(args.output, "w") as f:
|
||||
json.dump([r.__dict__ for r in all_results], f, indent=2, default=str)
|
||||
print(f"\nResults saved to {args.output}")
|
||||
|
||||
# Save markdown report
|
||||
report_path = f"benchmarks/gemma4-tool-calling-{datetime.now().strftime('%Y-%m-%d')}.md"
|
||||
Path("benchmarks").mkdir(exist_ok=True)
|
||||
with open(report_path, "w") as f:
|
||||
f.write(report)
|
||||
print(f"Report saved to {report_path}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,137 +0,0 @@
|
||||
"""Tests for Ultraplan Mode — Issue #840."""
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from tools.ultraplan import (
|
||||
Phase, Stream, Ultraplan,
|
||||
create_ultraplan, save_ultraplan, load_ultraplan,
|
||||
generate_daily_cron_prompt
|
||||
)
|
||||
|
||||
|
||||
class TestPhase:
|
||||
def test_creation(self):
|
||||
phase = Phase(id="A1", name="Setup", artifact="config.yaml")
|
||||
assert phase.id == "A1"
|
||||
assert phase.status == "pending"
|
||||
|
||||
def test_dependencies(self):
|
||||
phase = Phase(id="A2", name="Build", dependencies=["A1"])
|
||||
assert "A1" in phase.dependencies
|
||||
|
||||
|
||||
class TestStream:
|
||||
def test_progress_empty(self):
|
||||
stream = Stream(id="A", name="Stream A")
|
||||
assert stream.progress == 0.0
|
||||
|
||||
def test_progress_partial(self):
|
||||
stream = Stream(id="A", name="Stream A", phases=[
|
||||
Phase(id="A1", name="P1", status="done"),
|
||||
Phase(id="A2", name="P2", status="pending"),
|
||||
])
|
||||
assert stream.progress == 0.5
|
||||
|
||||
def test_current_phase(self):
|
||||
stream = Stream(id="A", name="Stream A", phases=[
|
||||
Phase(id="A1", name="P1", status="done"),
|
||||
Phase(id="A2", name="P2", status="active"),
|
||||
Phase(id="A3", name="P3", status="pending"),
|
||||
])
|
||||
assert stream.current_phase.id == "A2"
|
||||
|
||||
|
||||
class TestUltraplan:
|
||||
def test_to_markdown(self):
|
||||
plan = Ultraplan(
|
||||
date="20260415",
|
||||
mission="Test mission",
|
||||
streams=[
|
||||
Stream(id="A", name="Stream A", phases=[
|
||||
Phase(id="A1", name="Phase 1", artifact="file.txt"),
|
||||
]),
|
||||
],
|
||||
)
|
||||
md = plan.to_markdown()
|
||||
assert "# Ultraplan: 20260415" in md
|
||||
assert "Test mission" in md
|
||||
assert "Stream A" in md
|
||||
|
||||
def test_progress(self):
|
||||
plan = Ultraplan(
|
||||
date="20260415",
|
||||
mission="Test",
|
||||
streams=[
|
||||
Stream(id="A", name="A", status="done", phases=[
|
||||
Phase(id="A1", name="P1", status="done"),
|
||||
]),
|
||||
Stream(id="B", name="B", status="pending", phases=[
|
||||
Phase(id="B1", name="P1", status="pending"),
|
||||
]),
|
||||
],
|
||||
)
|
||||
assert plan.progress == 0.5
|
||||
|
||||
def test_to_dict(self):
|
||||
plan = Ultraplan(date="20260415", mission="Test")
|
||||
d = plan.to_dict()
|
||||
assert d["date"] == "20260415"
|
||||
assert d["mission"] == "Test"
|
||||
|
||||
|
||||
class TestCreateUltraplan:
|
||||
def test_default_date(self):
|
||||
plan = create_ultraplan(mission="Test")
|
||||
assert len(plan.date) == 8 # YYYYMMDD
|
||||
|
||||
def test_with_streams(self):
|
||||
plan = create_ultraplan(
|
||||
mission="Test",
|
||||
streams=[
|
||||
{
|
||||
"id": "A",
|
||||
"name": "Stream A",
|
||||
"phases": [
|
||||
{"id": "A1", "name": "Setup", "artifact": "config.yaml"},
|
||||
{"id": "A2", "name": "Build", "dependencies": ["A1"]},
|
||||
],
|
||||
},
|
||||
],
|
||||
)
|
||||
assert len(plan.streams) == 1
|
||||
assert len(plan.streams[0].phases) == 2
|
||||
assert plan.streams[0].phases[1].dependencies == ["A1"]
|
||||
|
||||
|
||||
class TestSaveLoad:
|
||||
def test_roundtrip(self, tmp_path):
|
||||
plan = create_ultraplan(
|
||||
date="20260415",
|
||||
mission="Test roundtrip",
|
||||
streams=[{"id": "A", "name": "Stream A"}],
|
||||
)
|
||||
|
||||
save_ultraplan(plan, base_dir=tmp_path)
|
||||
loaded = load_ultraplan("20260415", base_dir=tmp_path)
|
||||
|
||||
assert loaded is not None
|
||||
assert loaded.date == "20260415"
|
||||
assert loaded.mission == "Test roundtrip"
|
||||
|
||||
def test_nonexistent_returns_none(self, tmp_path):
|
||||
assert load_ultraplan("99999999", base_dir=tmp_path) is None
|
||||
|
||||
|
||||
class TestCronPrompt:
|
||||
def test_has_required_elements(self):
|
||||
prompt = generate_daily_cron_prompt()
|
||||
assert "Ultraplan" in prompt
|
||||
assert "streams" in prompt.lower()
|
||||
assert "Gitea" in prompt
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import pytest
|
||||
pytest.main([__file__, "-v"])
|
||||
@@ -1,310 +0,0 @@
|
||||
"""Ultraplan Mode — Daily autonomous planning and execution discipline.
|
||||
|
||||
Decomposes assigned tasks into parallel work streams with explicit
|
||||
dependencies, phases, and artifact targets.
|
||||
|
||||
Issue #840: Ultraplan Mode: Daily autonomous planning and execution
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class Phase:
|
||||
"""A single phase within a work stream."""
|
||||
id: str
|
||||
name: str
|
||||
description: str = ""
|
||||
status: str = "pending" # pending, active, done, blocked
|
||||
artifact: str = "" # Expected deliverable
|
||||
dependencies: List[str] = field(default_factory=list)
|
||||
started_at: Optional[float] = None
|
||||
completed_at: Optional[float] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class Stream:
|
||||
"""A parallel work stream with sequential phases."""
|
||||
id: str
|
||||
name: str
|
||||
phases: List[Phase] = field(default_factory=list)
|
||||
status: str = "pending"
|
||||
|
||||
@property
|
||||
def current_phase(self) -> Optional[Phase]:
|
||||
for p in self.phases:
|
||||
if p.status in ("active", "pending"):
|
||||
return p
|
||||
return None
|
||||
|
||||
@property
|
||||
def progress(self) -> float:
|
||||
if not self.phases:
|
||||
return 0.0
|
||||
done = sum(1 for p in self.phases if p.status == "done")
|
||||
return done / len(self.phases)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Ultraplan:
|
||||
"""Daily ultraplan with work streams and metrics."""
|
||||
date: str
|
||||
mission: str
|
||||
streams: List[Stream] = field(default_factory=list)
|
||||
metrics: Dict[str, Any] = field(default_factory=dict)
|
||||
notes: str = ""
|
||||
created_at: float = field(default_factory=time.time)
|
||||
|
||||
@property
|
||||
def progress(self) -> float:
|
||||
if not self.streams:
|
||||
return 0.0
|
||||
return sum(s.progress for s in self.streams) / len(self.streams)
|
||||
|
||||
@property
|
||||
def active_streams(self) -> List[Stream]:
|
||||
return [s for s in self.streams if s.status == "active"]
|
||||
|
||||
@property
|
||||
def blocked_streams(self) -> List[Stream]:
|
||||
return [s for s in self.streams if s.status == "blocked"]
|
||||
|
||||
def to_markdown(self) -> str:
|
||||
"""Generate ultraplan markdown document."""
|
||||
lines = []
|
||||
|
||||
# Header
|
||||
lines.append(f"# Ultraplan: {self.date}")
|
||||
lines.append("")
|
||||
lines.append(f"**Mission:** {self.mission}")
|
||||
lines.append(f"**Created:** {datetime.fromtimestamp(self.created_at, tz=timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")
|
||||
lines.append(f"**Progress:** {self.progress:.0%}")
|
||||
lines.append("")
|
||||
|
||||
# Metrics
|
||||
if self.metrics:
|
||||
lines.append("## Metrics")
|
||||
for key, value in self.metrics.items():
|
||||
lines.append(f"- **{key}:** {value}")
|
||||
lines.append("")
|
||||
|
||||
# Streams
|
||||
lines.append("## Work Streams")
|
||||
lines.append("")
|
||||
|
||||
for stream in self.streams:
|
||||
status_icon = {"pending": "○", "active": "●", "done": "✓", "blocked": "✗"}.get(stream.status, "?")
|
||||
lines.append(f"### {status_icon} Stream {stream.id}: {stream.name}")
|
||||
lines.append(f"**Status:** {stream.status} | **Progress:** {stream.progress:.0%}")
|
||||
lines.append("")
|
||||
|
||||
# Phase table
|
||||
lines.append("| Phase | Name | Status | Artifact |")
|
||||
lines.append("|-------|------|--------|----------|")
|
||||
for phase in stream.phases:
|
||||
p_icon = {"pending": "○", "active": "●", "done": "✓", "blocked": "✗"}.get(phase.status, "?")
|
||||
artifact = phase.artifact or "—"
|
||||
lines.append(f"| {phase.id} | {phase.name} | {p_icon} {phase.status} | {artifact} |")
|
||||
lines.append("")
|
||||
|
||||
# Dependency map
|
||||
lines.append("## Dependency Map")
|
||||
lines.append("")
|
||||
for stream in self.streams:
|
||||
deps = []
|
||||
for phase in stream.phases:
|
||||
if phase.dependencies:
|
||||
deps.append(f"{phase.id} depends on: {', '.join(phase.dependencies)}")
|
||||
if deps:
|
||||
lines.append(f"**{stream.id}:** {'; '.join(deps)}")
|
||||
|
||||
if not any(p.dependencies for s in self.streams for p in s.phases):
|
||||
lines.append("All streams are independent — parallel execution possible.")
|
||||
lines.append("")
|
||||
|
||||
# Notes
|
||||
if self.notes:
|
||||
lines.append("## Notes")
|
||||
lines.append(self.notes)
|
||||
lines.append("")
|
||||
|
||||
# Footer
|
||||
lines.append("---")
|
||||
lines.append(f"*Generated by Ultraplan Mode — {datetime.now().strftime('%Y-%m-%d %H:%M')}*")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to JSON-serializable dict."""
|
||||
return {
|
||||
"date": self.date,
|
||||
"mission": self.mission,
|
||||
"streams": [
|
||||
{
|
||||
"id": s.id,
|
||||
"name": s.name,
|
||||
"status": s.status,
|
||||
"phases": [
|
||||
{
|
||||
"id": p.id,
|
||||
"name": p.name,
|
||||
"description": p.description,
|
||||
"status": p.status,
|
||||
"artifact": p.artifact,
|
||||
"dependencies": p.dependencies,
|
||||
}
|
||||
for p in s.phases
|
||||
],
|
||||
}
|
||||
for s in self.streams
|
||||
],
|
||||
"metrics": self.metrics,
|
||||
"notes": self.notes,
|
||||
"progress": self.progress,
|
||||
"created_at": self.created_at,
|
||||
}
|
||||
|
||||
|
||||
def create_ultraplan(
|
||||
date: str = None,
|
||||
mission: str = "",
|
||||
streams: List[Dict[str, Any]] = None,
|
||||
) -> Ultraplan:
|
||||
"""Create a new ultraplan.
|
||||
|
||||
Args:
|
||||
date: Plan date (default: today)
|
||||
mission: High-level mission statement
|
||||
streams: List of stream definitions
|
||||
"""
|
||||
if date is None:
|
||||
date = datetime.now().strftime("%Y%m%d")
|
||||
|
||||
plan_streams = []
|
||||
if streams:
|
||||
for s in streams:
|
||||
phases = [
|
||||
Phase(
|
||||
id=p.get("id", f"{s.get('id', 'S')}{i+1}"),
|
||||
name=p.get("name", f"Phase {i+1}"),
|
||||
description=p.get("description", ""),
|
||||
artifact=p.get("artifact", ""),
|
||||
dependencies=p.get("dependencies", []),
|
||||
)
|
||||
for i, p in enumerate(s.get("phases", []))
|
||||
]
|
||||
plan_streams.append(Stream(
|
||||
id=s.get("id", f"S{len(plan_streams)+1}"),
|
||||
name=s.get("name", "Unnamed Stream"),
|
||||
phases=phases,
|
||||
))
|
||||
|
||||
return Ultraplan(
|
||||
date=date,
|
||||
mission=mission,
|
||||
streams=plan_streams,
|
||||
)
|
||||
|
||||
|
||||
def save_ultraplan(plan: Ultraplan, base_dir: Path = None) -> Path:
|
||||
"""Save ultraplan to disk.
|
||||
|
||||
Args:
|
||||
plan: The ultraplan to save
|
||||
base_dir: Base directory (default: ~/.timmy/cron/)
|
||||
|
||||
Returns:
|
||||
Path to saved file
|
||||
"""
|
||||
if base_dir is None:
|
||||
base_dir = Path.home() / ".timmy" / "cron"
|
||||
|
||||
base_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Save markdown
|
||||
md_path = base_dir / f"ultraplan_{plan.date}.md"
|
||||
md_path.write_text(plan.to_markdown(), encoding="utf-8")
|
||||
|
||||
# Save JSON (for programmatic access)
|
||||
json_path = base_dir / f"ultraplan_{plan.date}.json"
|
||||
json_path.write_text(json.dumps(plan.to_dict(), indent=2), encoding="utf-8")
|
||||
|
||||
return md_path
|
||||
|
||||
|
||||
def load_ultraplan(date: str, base_dir: Path = None) -> Optional[Ultraplan]:
|
||||
"""Load ultraplan from disk.
|
||||
|
||||
Args:
|
||||
date: Plan date (YYYYMMDD)
|
||||
base_dir: Base directory (default: ~/.timmy/cron/)
|
||||
|
||||
Returns:
|
||||
Ultraplan if found, None otherwise
|
||||
"""
|
||||
if base_dir is None:
|
||||
base_dir = Path.home() / ".timmy" / "cron"
|
||||
|
||||
json_path = base_dir / f"ultraplan_{date}.json"
|
||||
if not json_path.exists():
|
||||
return None
|
||||
|
||||
try:
|
||||
data = json.loads(json_path.read_text(encoding="utf-8"))
|
||||
|
||||
streams = []
|
||||
for s in data.get("streams", []):
|
||||
phases = [
|
||||
Phase(
|
||||
id=p["id"],
|
||||
name=p["name"],
|
||||
description=p.get("description", ""),
|
||||
status=p.get("status", "pending"),
|
||||
artifact=p.get("artifact", ""),
|
||||
dependencies=p.get("dependencies", []),
|
||||
)
|
||||
for p in s.get("phases", [])
|
||||
]
|
||||
streams.append(Stream(
|
||||
id=s["id"],
|
||||
name=s["name"],
|
||||
phases=phases,
|
||||
status=s.get("status", "pending"),
|
||||
))
|
||||
|
||||
return Ultraplan(
|
||||
date=data["date"],
|
||||
mission=data.get("mission", ""),
|
||||
streams=streams,
|
||||
metrics=data.get("metrics", {}),
|
||||
notes=data.get("notes", ""),
|
||||
created_at=data.get("created_at", time.time()),
|
||||
)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def generate_daily_cron_prompt() -> str:
|
||||
"""Generate the prompt for the daily ultraplan cron job."""
|
||||
return """Generate today's Ultraplan.
|
||||
|
||||
Steps:
|
||||
1. Check open Gitea issues assigned to you
|
||||
2. Check open PRs needing review
|
||||
3. Check fleet health status
|
||||
4. Decompose work into parallel streams
|
||||
5. Generate ultraplan_YYYYMMDD.md
|
||||
6. File Gitea issue with the plan
|
||||
|
||||
Output format:
|
||||
- Mission statement
|
||||
- 3-5 work streams with phases
|
||||
- Dependency map
|
||||
- Success metrics
|
||||
"""
|
||||
Reference in New Issue
Block a user