fix: Ultraplan Mode for daily autonomous planning (closes #840)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Successful in 37s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 39s
Tests / test (pull_request) Failing after 1h15m33s
Tests / e2e (pull_request) Successful in 2m20s

This commit is contained in:
Timmy Time
2026-04-15 22:14:16 -04:00
parent db72e908f7
commit fb7464995c
2 changed files with 447 additions and 0 deletions

137
tests/test_ultraplan.py Normal file
View File

@@ -0,0 +1,137 @@
"""Tests for Ultraplan Mode — Issue #840."""
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from tools.ultraplan import (
Phase, Stream, Ultraplan,
create_ultraplan, save_ultraplan, load_ultraplan,
generate_daily_cron_prompt
)
class TestPhase:
def test_creation(self):
phase = Phase(id="A1", name="Setup", artifact="config.yaml")
assert phase.id == "A1"
assert phase.status == "pending"
def test_dependencies(self):
phase = Phase(id="A2", name="Build", dependencies=["A1"])
assert "A1" in phase.dependencies
class TestStream:
def test_progress_empty(self):
stream = Stream(id="A", name="Stream A")
assert stream.progress == 0.0
def test_progress_partial(self):
stream = Stream(id="A", name="Stream A", phases=[
Phase(id="A1", name="P1", status="done"),
Phase(id="A2", name="P2", status="pending"),
])
assert stream.progress == 0.5
def test_current_phase(self):
stream = Stream(id="A", name="Stream A", phases=[
Phase(id="A1", name="P1", status="done"),
Phase(id="A2", name="P2", status="active"),
Phase(id="A3", name="P3", status="pending"),
])
assert stream.current_phase.id == "A2"
class TestUltraplan:
def test_to_markdown(self):
plan = Ultraplan(
date="20260415",
mission="Test mission",
streams=[
Stream(id="A", name="Stream A", phases=[
Phase(id="A1", name="Phase 1", artifact="file.txt"),
]),
],
)
md = plan.to_markdown()
assert "# Ultraplan: 20260415" in md
assert "Test mission" in md
assert "Stream A" in md
def test_progress(self):
plan = Ultraplan(
date="20260415",
mission="Test",
streams=[
Stream(id="A", name="A", status="done", phases=[
Phase(id="A1", name="P1", status="done"),
]),
Stream(id="B", name="B", status="pending", phases=[
Phase(id="B1", name="P1", status="pending"),
]),
],
)
assert plan.progress == 0.5
def test_to_dict(self):
plan = Ultraplan(date="20260415", mission="Test")
d = plan.to_dict()
assert d["date"] == "20260415"
assert d["mission"] == "Test"
class TestCreateUltraplan:
def test_default_date(self):
plan = create_ultraplan(mission="Test")
assert len(plan.date) == 8 # YYYYMMDD
def test_with_streams(self):
plan = create_ultraplan(
mission="Test",
streams=[
{
"id": "A",
"name": "Stream A",
"phases": [
{"id": "A1", "name": "Setup", "artifact": "config.yaml"},
{"id": "A2", "name": "Build", "dependencies": ["A1"]},
],
},
],
)
assert len(plan.streams) == 1
assert len(plan.streams[0].phases) == 2
assert plan.streams[0].phases[1].dependencies == ["A1"]
class TestSaveLoad:
def test_roundtrip(self, tmp_path):
plan = create_ultraplan(
date="20260415",
mission="Test roundtrip",
streams=[{"id": "A", "name": "Stream A"}],
)
save_ultraplan(plan, base_dir=tmp_path)
loaded = load_ultraplan("20260415", base_dir=tmp_path)
assert loaded is not None
assert loaded.date == "20260415"
assert loaded.mission == "Test roundtrip"
def test_nonexistent_returns_none(self, tmp_path):
assert load_ultraplan("99999999", base_dir=tmp_path) is None
class TestCronPrompt:
def test_has_required_elements(self):
prompt = generate_daily_cron_prompt()
assert "Ultraplan" in prompt
assert "streams" in prompt.lower()
assert "Gitea" in prompt
if __name__ == "__main__":
import pytest
pytest.main([__file__, "-v"])

310
tools/ultraplan.py Normal file
View File

@@ -0,0 +1,310 @@
"""Ultraplan Mode — Daily autonomous planning and execution discipline.
Decomposes assigned tasks into parallel work streams with explicit
dependencies, phases, and artifact targets.
Issue #840: Ultraplan Mode: Daily autonomous planning and execution
"""
import json
import os
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
@dataclass
class Phase:
"""A single phase within a work stream."""
id: str
name: str
description: str = ""
status: str = "pending" # pending, active, done, blocked
artifact: str = "" # Expected deliverable
dependencies: List[str] = field(default_factory=list)
started_at: Optional[float] = None
completed_at: Optional[float] = None
@dataclass
class Stream:
"""A parallel work stream with sequential phases."""
id: str
name: str
phases: List[Phase] = field(default_factory=list)
status: str = "pending"
@property
def current_phase(self) -> Optional[Phase]:
for p in self.phases:
if p.status in ("active", "pending"):
return p
return None
@property
def progress(self) -> float:
if not self.phases:
return 0.0
done = sum(1 for p in self.phases if p.status == "done")
return done / len(self.phases)
@dataclass
class Ultraplan:
"""Daily ultraplan with work streams and metrics."""
date: str
mission: str
streams: List[Stream] = field(default_factory=list)
metrics: Dict[str, Any] = field(default_factory=dict)
notes: str = ""
created_at: float = field(default_factory=time.time)
@property
def progress(self) -> float:
if not self.streams:
return 0.0
return sum(s.progress for s in self.streams) / len(self.streams)
@property
def active_streams(self) -> List[Stream]:
return [s for s in self.streams if s.status == "active"]
@property
def blocked_streams(self) -> List[Stream]:
return [s for s in self.streams if s.status == "blocked"]
def to_markdown(self) -> str:
"""Generate ultraplan markdown document."""
lines = []
# Header
lines.append(f"# Ultraplan: {self.date}")
lines.append("")
lines.append(f"**Mission:** {self.mission}")
lines.append(f"**Created:** {datetime.fromtimestamp(self.created_at, tz=timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")
lines.append(f"**Progress:** {self.progress:.0%}")
lines.append("")
# Metrics
if self.metrics:
lines.append("## Metrics")
for key, value in self.metrics.items():
lines.append(f"- **{key}:** {value}")
lines.append("")
# Streams
lines.append("## Work Streams")
lines.append("")
for stream in self.streams:
status_icon = {"pending": "", "active": "", "done": "", "blocked": ""}.get(stream.status, "?")
lines.append(f"### {status_icon} Stream {stream.id}: {stream.name}")
lines.append(f"**Status:** {stream.status} | **Progress:** {stream.progress:.0%}")
lines.append("")
# Phase table
lines.append("| Phase | Name | Status | Artifact |")
lines.append("|-------|------|--------|----------|")
for phase in stream.phases:
p_icon = {"pending": "", "active": "", "done": "", "blocked": ""}.get(phase.status, "?")
artifact = phase.artifact or ""
lines.append(f"| {phase.id} | {phase.name} | {p_icon} {phase.status} | {artifact} |")
lines.append("")
# Dependency map
lines.append("## Dependency Map")
lines.append("")
for stream in self.streams:
deps = []
for phase in stream.phases:
if phase.dependencies:
deps.append(f"{phase.id} depends on: {', '.join(phase.dependencies)}")
if deps:
lines.append(f"**{stream.id}:** {'; '.join(deps)}")
if not any(p.dependencies for s in self.streams for p in s.phases):
lines.append("All streams are independent — parallel execution possible.")
lines.append("")
# Notes
if self.notes:
lines.append("## Notes")
lines.append(self.notes)
lines.append("")
# Footer
lines.append("---")
lines.append(f"*Generated by Ultraplan Mode — {datetime.now().strftime('%Y-%m-%d %H:%M')}*")
return "\n".join(lines)
def to_dict(self) -> Dict[str, Any]:
"""Convert to JSON-serializable dict."""
return {
"date": self.date,
"mission": self.mission,
"streams": [
{
"id": s.id,
"name": s.name,
"status": s.status,
"phases": [
{
"id": p.id,
"name": p.name,
"description": p.description,
"status": p.status,
"artifact": p.artifact,
"dependencies": p.dependencies,
}
for p in s.phases
],
}
for s in self.streams
],
"metrics": self.metrics,
"notes": self.notes,
"progress": self.progress,
"created_at": self.created_at,
}
def create_ultraplan(
date: str = None,
mission: str = "",
streams: List[Dict[str, Any]] = None,
) -> Ultraplan:
"""Create a new ultraplan.
Args:
date: Plan date (default: today)
mission: High-level mission statement
streams: List of stream definitions
"""
if date is None:
date = datetime.now().strftime("%Y%m%d")
plan_streams = []
if streams:
for s in streams:
phases = [
Phase(
id=p.get("id", f"{s.get('id', 'S')}{i+1}"),
name=p.get("name", f"Phase {i+1}"),
description=p.get("description", ""),
artifact=p.get("artifact", ""),
dependencies=p.get("dependencies", []),
)
for i, p in enumerate(s.get("phases", []))
]
plan_streams.append(Stream(
id=s.get("id", f"S{len(plan_streams)+1}"),
name=s.get("name", "Unnamed Stream"),
phases=phases,
))
return Ultraplan(
date=date,
mission=mission,
streams=plan_streams,
)
def save_ultraplan(plan: Ultraplan, base_dir: Path = None) -> Path:
"""Save ultraplan to disk.
Args:
plan: The ultraplan to save
base_dir: Base directory (default: ~/.timmy/cron/)
Returns:
Path to saved file
"""
if base_dir is None:
base_dir = Path.home() / ".timmy" / "cron"
base_dir.mkdir(parents=True, exist_ok=True)
# Save markdown
md_path = base_dir / f"ultraplan_{plan.date}.md"
md_path.write_text(plan.to_markdown(), encoding="utf-8")
# Save JSON (for programmatic access)
json_path = base_dir / f"ultraplan_{plan.date}.json"
json_path.write_text(json.dumps(plan.to_dict(), indent=2), encoding="utf-8")
return md_path
def load_ultraplan(date: str, base_dir: Path = None) -> Optional[Ultraplan]:
"""Load ultraplan from disk.
Args:
date: Plan date (YYYYMMDD)
base_dir: Base directory (default: ~/.timmy/cron/)
Returns:
Ultraplan if found, None otherwise
"""
if base_dir is None:
base_dir = Path.home() / ".timmy" / "cron"
json_path = base_dir / f"ultraplan_{date}.json"
if not json_path.exists():
return None
try:
data = json.loads(json_path.read_text(encoding="utf-8"))
streams = []
for s in data.get("streams", []):
phases = [
Phase(
id=p["id"],
name=p["name"],
description=p.get("description", ""),
status=p.get("status", "pending"),
artifact=p.get("artifact", ""),
dependencies=p.get("dependencies", []),
)
for p in s.get("phases", [])
]
streams.append(Stream(
id=s["id"],
name=s["name"],
phases=phases,
status=s.get("status", "pending"),
))
return Ultraplan(
date=data["date"],
mission=data.get("mission", ""),
streams=streams,
metrics=data.get("metrics", {}),
notes=data.get("notes", ""),
created_at=data.get("created_at", time.time()),
)
except Exception:
return None
def generate_daily_cron_prompt() -> str:
"""Generate the prompt for the daily ultraplan cron job."""
return """Generate today's Ultraplan.
Steps:
1. Check open Gitea issues assigned to you
2. Check open PRs needing review
3. Check fleet health status
4. Decompose work into parallel streams
5. Generate ultraplan_YYYYMMDD.md
6. File Gitea issue with the plan
Output format:
- Mission statement
- 3-5 work streams with phases
- Dependency map
- Success metrics
"""