Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fb7464995c |
137
tests/test_ultraplan.py
Normal file
137
tests/test_ultraplan.py
Normal file
@@ -0,0 +1,137 @@
|
|||||||
|
"""Tests for Ultraplan Mode — Issue #840."""
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||||
|
|
||||||
|
from tools.ultraplan import (
|
||||||
|
Phase, Stream, Ultraplan,
|
||||||
|
create_ultraplan, save_ultraplan, load_ultraplan,
|
||||||
|
generate_daily_cron_prompt
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestPhase:
|
||||||
|
def test_creation(self):
|
||||||
|
phase = Phase(id="A1", name="Setup", artifact="config.yaml")
|
||||||
|
assert phase.id == "A1"
|
||||||
|
assert phase.status == "pending"
|
||||||
|
|
||||||
|
def test_dependencies(self):
|
||||||
|
phase = Phase(id="A2", name="Build", dependencies=["A1"])
|
||||||
|
assert "A1" in phase.dependencies
|
||||||
|
|
||||||
|
|
||||||
|
class TestStream:
|
||||||
|
def test_progress_empty(self):
|
||||||
|
stream = Stream(id="A", name="Stream A")
|
||||||
|
assert stream.progress == 0.0
|
||||||
|
|
||||||
|
def test_progress_partial(self):
|
||||||
|
stream = Stream(id="A", name="Stream A", phases=[
|
||||||
|
Phase(id="A1", name="P1", status="done"),
|
||||||
|
Phase(id="A2", name="P2", status="pending"),
|
||||||
|
])
|
||||||
|
assert stream.progress == 0.5
|
||||||
|
|
||||||
|
def test_current_phase(self):
|
||||||
|
stream = Stream(id="A", name="Stream A", phases=[
|
||||||
|
Phase(id="A1", name="P1", status="done"),
|
||||||
|
Phase(id="A2", name="P2", status="active"),
|
||||||
|
Phase(id="A3", name="P3", status="pending"),
|
||||||
|
])
|
||||||
|
assert stream.current_phase.id == "A2"
|
||||||
|
|
||||||
|
|
||||||
|
class TestUltraplan:
|
||||||
|
def test_to_markdown(self):
|
||||||
|
plan = Ultraplan(
|
||||||
|
date="20260415",
|
||||||
|
mission="Test mission",
|
||||||
|
streams=[
|
||||||
|
Stream(id="A", name="Stream A", phases=[
|
||||||
|
Phase(id="A1", name="Phase 1", artifact="file.txt"),
|
||||||
|
]),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
md = plan.to_markdown()
|
||||||
|
assert "# Ultraplan: 20260415" in md
|
||||||
|
assert "Test mission" in md
|
||||||
|
assert "Stream A" in md
|
||||||
|
|
||||||
|
def test_progress(self):
|
||||||
|
plan = Ultraplan(
|
||||||
|
date="20260415",
|
||||||
|
mission="Test",
|
||||||
|
streams=[
|
||||||
|
Stream(id="A", name="A", status="done", phases=[
|
||||||
|
Phase(id="A1", name="P1", status="done"),
|
||||||
|
]),
|
||||||
|
Stream(id="B", name="B", status="pending", phases=[
|
||||||
|
Phase(id="B1", name="P1", status="pending"),
|
||||||
|
]),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
assert plan.progress == 0.5
|
||||||
|
|
||||||
|
def test_to_dict(self):
|
||||||
|
plan = Ultraplan(date="20260415", mission="Test")
|
||||||
|
d = plan.to_dict()
|
||||||
|
assert d["date"] == "20260415"
|
||||||
|
assert d["mission"] == "Test"
|
||||||
|
|
||||||
|
|
||||||
|
class TestCreateUltraplan:
|
||||||
|
def test_default_date(self):
|
||||||
|
plan = create_ultraplan(mission="Test")
|
||||||
|
assert len(plan.date) == 8 # YYYYMMDD
|
||||||
|
|
||||||
|
def test_with_streams(self):
|
||||||
|
plan = create_ultraplan(
|
||||||
|
mission="Test",
|
||||||
|
streams=[
|
||||||
|
{
|
||||||
|
"id": "A",
|
||||||
|
"name": "Stream A",
|
||||||
|
"phases": [
|
||||||
|
{"id": "A1", "name": "Setup", "artifact": "config.yaml"},
|
||||||
|
{"id": "A2", "name": "Build", "dependencies": ["A1"]},
|
||||||
|
],
|
||||||
|
},
|
||||||
|
],
|
||||||
|
)
|
||||||
|
assert len(plan.streams) == 1
|
||||||
|
assert len(plan.streams[0].phases) == 2
|
||||||
|
assert plan.streams[0].phases[1].dependencies == ["A1"]
|
||||||
|
|
||||||
|
|
||||||
|
class TestSaveLoad:
|
||||||
|
def test_roundtrip(self, tmp_path):
|
||||||
|
plan = create_ultraplan(
|
||||||
|
date="20260415",
|
||||||
|
mission="Test roundtrip",
|
||||||
|
streams=[{"id": "A", "name": "Stream A"}],
|
||||||
|
)
|
||||||
|
|
||||||
|
save_ultraplan(plan, base_dir=tmp_path)
|
||||||
|
loaded = load_ultraplan("20260415", base_dir=tmp_path)
|
||||||
|
|
||||||
|
assert loaded is not None
|
||||||
|
assert loaded.date == "20260415"
|
||||||
|
assert loaded.mission == "Test roundtrip"
|
||||||
|
|
||||||
|
def test_nonexistent_returns_none(self, tmp_path):
|
||||||
|
assert load_ultraplan("99999999", base_dir=tmp_path) is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestCronPrompt:
|
||||||
|
def test_has_required_elements(self):
|
||||||
|
prompt = generate_daily_cron_prompt()
|
||||||
|
assert "Ultraplan" in prompt
|
||||||
|
assert "streams" in prompt.lower()
|
||||||
|
assert "Gitea" in prompt
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import pytest
|
||||||
|
pytest.main([__file__, "-v"])
|
||||||
310
tools/ultraplan.py
Normal file
310
tools/ultraplan.py
Normal file
@@ -0,0 +1,310 @@
|
|||||||
|
"""Ultraplan Mode — Daily autonomous planning and execution discipline.
|
||||||
|
|
||||||
|
Decomposes assigned tasks into parallel work streams with explicit
|
||||||
|
dependencies, phases, and artifact targets.
|
||||||
|
|
||||||
|
Issue #840: Ultraplan Mode: Daily autonomous planning and execution
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Dict, List, Optional
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Phase:
|
||||||
|
"""A single phase within a work stream."""
|
||||||
|
id: str
|
||||||
|
name: str
|
||||||
|
description: str = ""
|
||||||
|
status: str = "pending" # pending, active, done, blocked
|
||||||
|
artifact: str = "" # Expected deliverable
|
||||||
|
dependencies: List[str] = field(default_factory=list)
|
||||||
|
started_at: Optional[float] = None
|
||||||
|
completed_at: Optional[float] = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Stream:
|
||||||
|
"""A parallel work stream with sequential phases."""
|
||||||
|
id: str
|
||||||
|
name: str
|
||||||
|
phases: List[Phase] = field(default_factory=list)
|
||||||
|
status: str = "pending"
|
||||||
|
|
||||||
|
@property
|
||||||
|
def current_phase(self) -> Optional[Phase]:
|
||||||
|
for p in self.phases:
|
||||||
|
if p.status in ("active", "pending"):
|
||||||
|
return p
|
||||||
|
return None
|
||||||
|
|
||||||
|
@property
|
||||||
|
def progress(self) -> float:
|
||||||
|
if not self.phases:
|
||||||
|
return 0.0
|
||||||
|
done = sum(1 for p in self.phases if p.status == "done")
|
||||||
|
return done / len(self.phases)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Ultraplan:
|
||||||
|
"""Daily ultraplan with work streams and metrics."""
|
||||||
|
date: str
|
||||||
|
mission: str
|
||||||
|
streams: List[Stream] = field(default_factory=list)
|
||||||
|
metrics: Dict[str, Any] = field(default_factory=dict)
|
||||||
|
notes: str = ""
|
||||||
|
created_at: float = field(default_factory=time.time)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def progress(self) -> float:
|
||||||
|
if not self.streams:
|
||||||
|
return 0.0
|
||||||
|
return sum(s.progress for s in self.streams) / len(self.streams)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def active_streams(self) -> List[Stream]:
|
||||||
|
return [s for s in self.streams if s.status == "active"]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def blocked_streams(self) -> List[Stream]:
|
||||||
|
return [s for s in self.streams if s.status == "blocked"]
|
||||||
|
|
||||||
|
def to_markdown(self) -> str:
|
||||||
|
"""Generate ultraplan markdown document."""
|
||||||
|
lines = []
|
||||||
|
|
||||||
|
# Header
|
||||||
|
lines.append(f"# Ultraplan: {self.date}")
|
||||||
|
lines.append("")
|
||||||
|
lines.append(f"**Mission:** {self.mission}")
|
||||||
|
lines.append(f"**Created:** {datetime.fromtimestamp(self.created_at, tz=timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")
|
||||||
|
lines.append(f"**Progress:** {self.progress:.0%}")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
# Metrics
|
||||||
|
if self.metrics:
|
||||||
|
lines.append("## Metrics")
|
||||||
|
for key, value in self.metrics.items():
|
||||||
|
lines.append(f"- **{key}:** {value}")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
# Streams
|
||||||
|
lines.append("## Work Streams")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
for stream in self.streams:
|
||||||
|
status_icon = {"pending": "○", "active": "●", "done": "✓", "blocked": "✗"}.get(stream.status, "?")
|
||||||
|
lines.append(f"### {status_icon} Stream {stream.id}: {stream.name}")
|
||||||
|
lines.append(f"**Status:** {stream.status} | **Progress:** {stream.progress:.0%}")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
# Phase table
|
||||||
|
lines.append("| Phase | Name | Status | Artifact |")
|
||||||
|
lines.append("|-------|------|--------|----------|")
|
||||||
|
for phase in stream.phases:
|
||||||
|
p_icon = {"pending": "○", "active": "●", "done": "✓", "blocked": "✗"}.get(phase.status, "?")
|
||||||
|
artifact = phase.artifact or "—"
|
||||||
|
lines.append(f"| {phase.id} | {phase.name} | {p_icon} {phase.status} | {artifact} |")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
# Dependency map
|
||||||
|
lines.append("## Dependency Map")
|
||||||
|
lines.append("")
|
||||||
|
for stream in self.streams:
|
||||||
|
deps = []
|
||||||
|
for phase in stream.phases:
|
||||||
|
if phase.dependencies:
|
||||||
|
deps.append(f"{phase.id} depends on: {', '.join(phase.dependencies)}")
|
||||||
|
if deps:
|
||||||
|
lines.append(f"**{stream.id}:** {'; '.join(deps)}")
|
||||||
|
|
||||||
|
if not any(p.dependencies for s in self.streams for p in s.phases):
|
||||||
|
lines.append("All streams are independent — parallel execution possible.")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
# Notes
|
||||||
|
if self.notes:
|
||||||
|
lines.append("## Notes")
|
||||||
|
lines.append(self.notes)
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
# Footer
|
||||||
|
lines.append("---")
|
||||||
|
lines.append(f"*Generated by Ultraplan Mode — {datetime.now().strftime('%Y-%m-%d %H:%M')}*")
|
||||||
|
|
||||||
|
return "\n".join(lines)
|
||||||
|
|
||||||
|
def to_dict(self) -> Dict[str, Any]:
|
||||||
|
"""Convert to JSON-serializable dict."""
|
||||||
|
return {
|
||||||
|
"date": self.date,
|
||||||
|
"mission": self.mission,
|
||||||
|
"streams": [
|
||||||
|
{
|
||||||
|
"id": s.id,
|
||||||
|
"name": s.name,
|
||||||
|
"status": s.status,
|
||||||
|
"phases": [
|
||||||
|
{
|
||||||
|
"id": p.id,
|
||||||
|
"name": p.name,
|
||||||
|
"description": p.description,
|
||||||
|
"status": p.status,
|
||||||
|
"artifact": p.artifact,
|
||||||
|
"dependencies": p.dependencies,
|
||||||
|
}
|
||||||
|
for p in s.phases
|
||||||
|
],
|
||||||
|
}
|
||||||
|
for s in self.streams
|
||||||
|
],
|
||||||
|
"metrics": self.metrics,
|
||||||
|
"notes": self.notes,
|
||||||
|
"progress": self.progress,
|
||||||
|
"created_at": self.created_at,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def create_ultraplan(
|
||||||
|
date: str = None,
|
||||||
|
mission: str = "",
|
||||||
|
streams: List[Dict[str, Any]] = None,
|
||||||
|
) -> Ultraplan:
|
||||||
|
"""Create a new ultraplan.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
date: Plan date (default: today)
|
||||||
|
mission: High-level mission statement
|
||||||
|
streams: List of stream definitions
|
||||||
|
"""
|
||||||
|
if date is None:
|
||||||
|
date = datetime.now().strftime("%Y%m%d")
|
||||||
|
|
||||||
|
plan_streams = []
|
||||||
|
if streams:
|
||||||
|
for s in streams:
|
||||||
|
phases = [
|
||||||
|
Phase(
|
||||||
|
id=p.get("id", f"{s.get('id', 'S')}{i+1}"),
|
||||||
|
name=p.get("name", f"Phase {i+1}"),
|
||||||
|
description=p.get("description", ""),
|
||||||
|
artifact=p.get("artifact", ""),
|
||||||
|
dependencies=p.get("dependencies", []),
|
||||||
|
)
|
||||||
|
for i, p in enumerate(s.get("phases", []))
|
||||||
|
]
|
||||||
|
plan_streams.append(Stream(
|
||||||
|
id=s.get("id", f"S{len(plan_streams)+1}"),
|
||||||
|
name=s.get("name", "Unnamed Stream"),
|
||||||
|
phases=phases,
|
||||||
|
))
|
||||||
|
|
||||||
|
return Ultraplan(
|
||||||
|
date=date,
|
||||||
|
mission=mission,
|
||||||
|
streams=plan_streams,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def save_ultraplan(plan: Ultraplan, base_dir: Path = None) -> Path:
|
||||||
|
"""Save ultraplan to disk.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
plan: The ultraplan to save
|
||||||
|
base_dir: Base directory (default: ~/.timmy/cron/)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Path to saved file
|
||||||
|
"""
|
||||||
|
if base_dir is None:
|
||||||
|
base_dir = Path.home() / ".timmy" / "cron"
|
||||||
|
|
||||||
|
base_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
# Save markdown
|
||||||
|
md_path = base_dir / f"ultraplan_{plan.date}.md"
|
||||||
|
md_path.write_text(plan.to_markdown(), encoding="utf-8")
|
||||||
|
|
||||||
|
# Save JSON (for programmatic access)
|
||||||
|
json_path = base_dir / f"ultraplan_{plan.date}.json"
|
||||||
|
json_path.write_text(json.dumps(plan.to_dict(), indent=2), encoding="utf-8")
|
||||||
|
|
||||||
|
return md_path
|
||||||
|
|
||||||
|
|
||||||
|
def load_ultraplan(date: str, base_dir: Path = None) -> Optional[Ultraplan]:
|
||||||
|
"""Load ultraplan from disk.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
date: Plan date (YYYYMMDD)
|
||||||
|
base_dir: Base directory (default: ~/.timmy/cron/)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Ultraplan if found, None otherwise
|
||||||
|
"""
|
||||||
|
if base_dir is None:
|
||||||
|
base_dir = Path.home() / ".timmy" / "cron"
|
||||||
|
|
||||||
|
json_path = base_dir / f"ultraplan_{date}.json"
|
||||||
|
if not json_path.exists():
|
||||||
|
return None
|
||||||
|
|
||||||
|
try:
|
||||||
|
data = json.loads(json_path.read_text(encoding="utf-8"))
|
||||||
|
|
||||||
|
streams = []
|
||||||
|
for s in data.get("streams", []):
|
||||||
|
phases = [
|
||||||
|
Phase(
|
||||||
|
id=p["id"],
|
||||||
|
name=p["name"],
|
||||||
|
description=p.get("description", ""),
|
||||||
|
status=p.get("status", "pending"),
|
||||||
|
artifact=p.get("artifact", ""),
|
||||||
|
dependencies=p.get("dependencies", []),
|
||||||
|
)
|
||||||
|
for p in s.get("phases", [])
|
||||||
|
]
|
||||||
|
streams.append(Stream(
|
||||||
|
id=s["id"],
|
||||||
|
name=s["name"],
|
||||||
|
phases=phases,
|
||||||
|
status=s.get("status", "pending"),
|
||||||
|
))
|
||||||
|
|
||||||
|
return Ultraplan(
|
||||||
|
date=data["date"],
|
||||||
|
mission=data.get("mission", ""),
|
||||||
|
streams=streams,
|
||||||
|
metrics=data.get("metrics", {}),
|
||||||
|
notes=data.get("notes", ""),
|
||||||
|
created_at=data.get("created_at", time.time()),
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def generate_daily_cron_prompt() -> str:
|
||||||
|
"""Generate the prompt for the daily ultraplan cron job."""
|
||||||
|
return """Generate today's Ultraplan.
|
||||||
|
|
||||||
|
Steps:
|
||||||
|
1. Check open Gitea issues assigned to you
|
||||||
|
2. Check open PRs needing review
|
||||||
|
3. Check fleet health status
|
||||||
|
4. Decompose work into parallel streams
|
||||||
|
5. Generate ultraplan_YYYYMMDD.md
|
||||||
|
6. File Gitea issue with the plan
|
||||||
|
|
||||||
|
Output format:
|
||||||
|
- Mission statement
|
||||||
|
- 3-5 work streams with phases
|
||||||
|
- Dependency map
|
||||||
|
- Success metrics
|
||||||
|
"""
|
||||||
Reference in New Issue
Block a user