Compare commits
2 Commits
fix/840
...
feat/822-a
| Author | SHA1 | Date | |
|---|---|---|---|
| 4c8d63a5c9 | |||
| 6bc10419b1 |
257
hermes_cli/a2a_health.py
Normal file
257
hermes_cli/a2a_health.py
Normal file
@@ -0,0 +1,257 @@
|
||||
"""
|
||||
A2A Health Monitor — Fleet Agent Heartbeat (#822)
|
||||
|
||||
Pings each fleet agent's A2A endpoint and tracks health status.
|
||||
Persists state to ~/.hermes/a2a_health.json.
|
||||
|
||||
Usage:
|
||||
from hermes_cli.a2a_health import check_fleet_health, check_agent_health
|
||||
|
||||
report = check_fleet_health()
|
||||
for agent in report["agents"]:
|
||||
print(f"{agent['name']}: {agent['status']} ({agent['response_ms']}ms)")
|
||||
"""
|
||||
|
||||
import json
|
||||
import time
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
HERMES_HOME = Path.home() / ".hermes"
|
||||
FLEET_CONFIG = HERMES_HOME / "fleet_agents.json"
|
||||
HEALTH_STATE = HERMES_HOME / "a2a_health.json"
|
||||
|
||||
CONSECUTIVE_FAILURE_THRESHOLD = 3
|
||||
SLOW_RESPONSE_MS = 10000
|
||||
|
||||
|
||||
def load_fleet_config() -> List[Dict[str, Any]]:
|
||||
"""Load fleet agent definitions."""
|
||||
if not FLEET_CONFIG.exists():
|
||||
return []
|
||||
try:
|
||||
with open(FLEET_CONFIG) as f:
|
||||
data = json.load(f)
|
||||
return data.get("agents", [])
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
|
||||
def load_health_state() -> Dict[str, Any]:
|
||||
"""Load persisted health state."""
|
||||
if not HEALTH_STATE.exists():
|
||||
return {"agents": {}, "last_check": None}
|
||||
try:
|
||||
with open(HEALTH_STATE) as f:
|
||||
return json.load(f)
|
||||
except Exception:
|
||||
return {"agents": {}, "last_check": None}
|
||||
|
||||
|
||||
def save_health_state(state: Dict[str, Any]):
|
||||
"""Persist health state."""
|
||||
HEALTH_STATE.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(HEALTH_STATE, "w") as f:
|
||||
json.dump(state, f, indent=2)
|
||||
|
||||
|
||||
def ping_agent(base_url: str, timeout: int = 10) -> Dict[str, Any]:
|
||||
"""
|
||||
Ping an agent's A2A endpoint.
|
||||
|
||||
Tries /health first, falls back to /.well-known/agent-card.json.
|
||||
"""
|
||||
start = time.monotonic()
|
||||
endpoints = ["/health", "/.well-known/agent-card.json"]
|
||||
|
||||
for endpoint in endpoints:
|
||||
url = f"{base_url.rstrip('/')}{endpoint}"
|
||||
try:
|
||||
req = urllib.request.Request(url, method="GET")
|
||||
req.add_header("User-Agent", "hermes-a2a-health/1.0")
|
||||
with urllib.request.urlopen(req, timeout=timeout) as resp:
|
||||
elapsed = (time.monotonic() - start) * 1000
|
||||
body = resp.read(1024).decode("utf-8", errors="replace")
|
||||
|
||||
result = {
|
||||
"alive": True,
|
||||
"status_code": resp.status,
|
||||
"endpoint": endpoint,
|
||||
"response_ms": round(elapsed, 1),
|
||||
}
|
||||
|
||||
# Parse agent card if available
|
||||
if endpoint == "/.well-known/agent-card.json":
|
||||
try:
|
||||
card = json.loads(body)
|
||||
result["agent_card"] = {
|
||||
"name": card.get("name", "unknown"),
|
||||
"tools_count": len(card.get("skills", [])),
|
||||
}
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return result
|
||||
except urllib.error.URLError:
|
||||
continue
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
elapsed = (time.monotonic() - start) * 1000
|
||||
return {
|
||||
"alive": False,
|
||||
"error": "All endpoints unreachable",
|
||||
"response_ms": round(elapsed, 1),
|
||||
}
|
||||
|
||||
|
||||
def check_agent_health(agent: Dict[str, Any], prev_state: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Check health of a single agent."""
|
||||
name = agent.get("name", "unknown")
|
||||
base_url = ""
|
||||
|
||||
# Get URL from agent config
|
||||
interfaces = agent.get("supportedInterfaces", [])
|
||||
if interfaces:
|
||||
base_url = interfaces[0].get("url", "")
|
||||
if not base_url:
|
||||
base_url = agent.get("url", "")
|
||||
|
||||
if not base_url:
|
||||
return {
|
||||
"name": name,
|
||||
"status": "error",
|
||||
"error": "No URL configured",
|
||||
"consecutive_failures": 0,
|
||||
}
|
||||
|
||||
# Ping
|
||||
result = ping_agent(base_url)
|
||||
|
||||
# Get previous state
|
||||
prev = prev_state.get("agents", {}).get(name, {})
|
||||
prev_failures = prev.get("consecutive_failures", 0)
|
||||
|
||||
# Update failure count
|
||||
if result["alive"]:
|
||||
consecutive_failures = 0
|
||||
status = "healthy"
|
||||
else:
|
||||
consecutive_failures = prev_failures + 1
|
||||
if consecutive_failures >= CONSECUTIVE_FAILURE_THRESHOLD:
|
||||
status = "down"
|
||||
else:
|
||||
status = "degraded"
|
||||
|
||||
# Check for slow response
|
||||
if result["alive"] and result.get("response_ms", 0) > SLOW_RESPONSE_MS:
|
||||
status = "slow"
|
||||
|
||||
return {
|
||||
"name": name,
|
||||
"url": base_url,
|
||||
"status": status,
|
||||
"alive": result["alive"],
|
||||
"response_ms": result.get("response_ms"),
|
||||
"endpoint": result.get("endpoint"),
|
||||
"status_code": result.get("status_code"),
|
||||
"agent_card": result.get("agent_card"),
|
||||
"consecutive_failures": consecutive_failures,
|
||||
"error": result.get("error"),
|
||||
"checked_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
|
||||
}
|
||||
|
||||
|
||||
def check_fleet_health(
|
||||
agent_name: Optional[str] = None,
|
||||
timeout: int = 10,
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Check health of all (or one) fleet agent.
|
||||
|
||||
Returns report dict with agents list and summary.
|
||||
"""
|
||||
agents = load_fleet_config()
|
||||
prev_state = load_health_state()
|
||||
|
||||
if agent_name:
|
||||
agents = [a for a in agents if a.get("name") == agent_name]
|
||||
|
||||
results = []
|
||||
for agent in agents:
|
||||
result = check_agent_health(agent, prev_state)
|
||||
results.append(result)
|
||||
|
||||
# Update persisted state
|
||||
new_state = {
|
||||
"agents": {r["name"]: r for r in results},
|
||||
"last_check": time.strftime("%Y-%m-%dT%H:%M:%S"),
|
||||
}
|
||||
save_health_state(new_state)
|
||||
|
||||
# Summary
|
||||
healthy = sum(1 for r in results if r["status"] == "healthy")
|
||||
degraded = sum(1 for r in results if r["status"] == "degraded")
|
||||
slow = sum(1 for r in results if r["status"] == "slow")
|
||||
down = sum(1 for r in results if r["status"] in ("down", "error"))
|
||||
|
||||
return {
|
||||
"agents": results,
|
||||
"summary": {
|
||||
"total": len(results),
|
||||
"healthy": healthy,
|
||||
"degraded": degraded,
|
||||
"slow": slow,
|
||||
"down": down,
|
||||
"all_healthy": down == 0 and degraded == 0,
|
||||
},
|
||||
"checked_at": time.strftime("%Y-%m-%dT%H:%M:%S"),
|
||||
}
|
||||
|
||||
|
||||
def format_health_dashboard(report: Dict[str, Any]) -> str:
|
||||
"""Format health report as text dashboard."""
|
||||
lines = []
|
||||
summary = report["summary"]
|
||||
|
||||
# Header
|
||||
if summary["all_healthy"]:
|
||||
lines.append("\u2705 All fleet agents healthy")
|
||||
elif summary["down"] > 0:
|
||||
lines.append(f"\u274c {summary['down']} agent(s) DOWN")
|
||||
else:
|
||||
lines.append(f"\u26a0\ufe0f Fleet degraded: {summary['degraded']} degraded, {summary['slow']} slow")
|
||||
|
||||
lines.append(f"Checked: {report['checked_at']}")
|
||||
lines.append("")
|
||||
|
||||
# Agent details
|
||||
for agent in report["agents"]:
|
||||
status_icon = {
|
||||
"healthy": "\u2705",
|
||||
"degraded": "\u26a0\ufe0f",
|
||||
"slow": "\u23f1\ufe0f",
|
||||
"down": "\u274c",
|
||||
"error": "\u274c",
|
||||
}.get(agent["status"], "\u2753")
|
||||
|
||||
name = agent["name"]
|
||||
ms = agent.get("response_ms", "?")
|
||||
failures = agent.get("consecutive_failures", 0)
|
||||
|
||||
line = f" {status_icon} {name}"
|
||||
if agent.get("alive"):
|
||||
line += f" — {ms}ms"
|
||||
if agent.get("agent_card"):
|
||||
tools = agent["agent_card"].get("tools_count", 0)
|
||||
line += f" — {tools} tools"
|
||||
else:
|
||||
line += f" — {agent.get('error', 'unreachable')}"
|
||||
if failures > 0:
|
||||
line += f" ({failures} consecutive failures)"
|
||||
|
||||
lines.append(line)
|
||||
|
||||
return "\n".join(lines)
|
||||
80
tests/test_a2a_health.py
Normal file
80
tests/test_a2a_health.py
Normal file
@@ -0,0 +1,80 @@
|
||||
"""Tests for A2A health monitor (#822)."""
|
||||
|
||||
import sys
|
||||
import json
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from hermes_cli.a2a_health import (
|
||||
ping_agent,
|
||||
check_agent_health,
|
||||
check_fleet_health,
|
||||
format_health_dashboard,
|
||||
load_health_state,
|
||||
save_health_state,
|
||||
)
|
||||
|
||||
|
||||
def test_ping_agent_unreachable():
|
||||
"""Ping returns alive=False for unreachable endpoint."""
|
||||
result = ping_agent("http://192.0.2.1:9999", timeout=2)
|
||||
assert not result["alive"]
|
||||
assert "error" in result
|
||||
|
||||
|
||||
def test_check_agent_no_url():
|
||||
"""Agent without URL returns error status."""
|
||||
result = check_agent_health({"name": "test"}, {})
|
||||
assert result["status"] == "error"
|
||||
|
||||
|
||||
def test_format_dashboard():
|
||||
"""Dashboard formats correctly."""
|
||||
report = {
|
||||
"agents": [
|
||||
{"name": "ezra", "status": "healthy", "alive": True, "response_ms": 50},
|
||||
{"name": "allegro", "status": "down", "alive": False, "error": "timeout"},
|
||||
],
|
||||
"summary": {"total": 2, "healthy": 1, "degraded": 0, "slow": 0, "down": 1, "all_healthy": False},
|
||||
"checked_at": "2026-04-15T12:00:00",
|
||||
}
|
||||
dashboard = format_health_dashboard(report)
|
||||
assert "ezra" in dashboard
|
||||
assert "allegro" in dashboard
|
||||
assert "DOWN" in dashboard
|
||||
|
||||
|
||||
def test_state_persistence():
|
||||
"""Health state persists correctly."""
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
state_file = Path(tmpdir) / "health.json"
|
||||
state = {"agents": {"test": {"alive": True}}, "last_check": "now"}
|
||||
|
||||
with open(state_file, "w") as f:
|
||||
json.dump(state, f)
|
||||
|
||||
with open(state_file) as f:
|
||||
loaded = json.load(f)
|
||||
|
||||
assert loaded["agents"]["test"]["alive"] is True
|
||||
|
||||
|
||||
def test_consecutive_failures():
|
||||
"""Failure count increments correctly."""
|
||||
prev = {"agents": {"test": {"consecutive_failures": 2}}}
|
||||
agent = {"name": "test", "url": "http://192.0.2.1:9999"}
|
||||
result = check_agent_health(agent, prev)
|
||||
assert result["consecutive_failures"] == 3
|
||||
assert result["status"] == "down"
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
tests = [test_ping_agent_unreachable, test_check_agent_no_url,
|
||||
test_format_dashboard, test_state_persistence, test_consecutive_failures]
|
||||
for t in tests:
|
||||
print(f"Running {t.__name__}...")
|
||||
t()
|
||||
print(" PASS")
|
||||
print("\nAll tests passed.")
|
||||
@@ -1,137 +0,0 @@
|
||||
"""Tests for Ultraplan Mode — Issue #840."""
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from tools.ultraplan import (
|
||||
Phase, Stream, Ultraplan,
|
||||
create_ultraplan, save_ultraplan, load_ultraplan,
|
||||
generate_daily_cron_prompt
|
||||
)
|
||||
|
||||
|
||||
class TestPhase:
|
||||
def test_creation(self):
|
||||
phase = Phase(id="A1", name="Setup", artifact="config.yaml")
|
||||
assert phase.id == "A1"
|
||||
assert phase.status == "pending"
|
||||
|
||||
def test_dependencies(self):
|
||||
phase = Phase(id="A2", name="Build", dependencies=["A1"])
|
||||
assert "A1" in phase.dependencies
|
||||
|
||||
|
||||
class TestStream:
|
||||
def test_progress_empty(self):
|
||||
stream = Stream(id="A", name="Stream A")
|
||||
assert stream.progress == 0.0
|
||||
|
||||
def test_progress_partial(self):
|
||||
stream = Stream(id="A", name="Stream A", phases=[
|
||||
Phase(id="A1", name="P1", status="done"),
|
||||
Phase(id="A2", name="P2", status="pending"),
|
||||
])
|
||||
assert stream.progress == 0.5
|
||||
|
||||
def test_current_phase(self):
|
||||
stream = Stream(id="A", name="Stream A", phases=[
|
||||
Phase(id="A1", name="P1", status="done"),
|
||||
Phase(id="A2", name="P2", status="active"),
|
||||
Phase(id="A3", name="P3", status="pending"),
|
||||
])
|
||||
assert stream.current_phase.id == "A2"
|
||||
|
||||
|
||||
class TestUltraplan:
|
||||
def test_to_markdown(self):
|
||||
plan = Ultraplan(
|
||||
date="20260415",
|
||||
mission="Test mission",
|
||||
streams=[
|
||||
Stream(id="A", name="Stream A", phases=[
|
||||
Phase(id="A1", name="Phase 1", artifact="file.txt"),
|
||||
]),
|
||||
],
|
||||
)
|
||||
md = plan.to_markdown()
|
||||
assert "# Ultraplan: 20260415" in md
|
||||
assert "Test mission" in md
|
||||
assert "Stream A" in md
|
||||
|
||||
def test_progress(self):
|
||||
plan = Ultraplan(
|
||||
date="20260415",
|
||||
mission="Test",
|
||||
streams=[
|
||||
Stream(id="A", name="A", status="done", phases=[
|
||||
Phase(id="A1", name="P1", status="done"),
|
||||
]),
|
||||
Stream(id="B", name="B", status="pending", phases=[
|
||||
Phase(id="B1", name="P1", status="pending"),
|
||||
]),
|
||||
],
|
||||
)
|
||||
assert plan.progress == 0.5
|
||||
|
||||
def test_to_dict(self):
|
||||
plan = Ultraplan(date="20260415", mission="Test")
|
||||
d = plan.to_dict()
|
||||
assert d["date"] == "20260415"
|
||||
assert d["mission"] == "Test"
|
||||
|
||||
|
||||
class TestCreateUltraplan:
|
||||
def test_default_date(self):
|
||||
plan = create_ultraplan(mission="Test")
|
||||
assert len(plan.date) == 8 # YYYYMMDD
|
||||
|
||||
def test_with_streams(self):
|
||||
plan = create_ultraplan(
|
||||
mission="Test",
|
||||
streams=[
|
||||
{
|
||||
"id": "A",
|
||||
"name": "Stream A",
|
||||
"phases": [
|
||||
{"id": "A1", "name": "Setup", "artifact": "config.yaml"},
|
||||
{"id": "A2", "name": "Build", "dependencies": ["A1"]},
|
||||
],
|
||||
},
|
||||
],
|
||||
)
|
||||
assert len(plan.streams) == 1
|
||||
assert len(plan.streams[0].phases) == 2
|
||||
assert plan.streams[0].phases[1].dependencies == ["A1"]
|
||||
|
||||
|
||||
class TestSaveLoad:
|
||||
def test_roundtrip(self, tmp_path):
|
||||
plan = create_ultraplan(
|
||||
date="20260415",
|
||||
mission="Test roundtrip",
|
||||
streams=[{"id": "A", "name": "Stream A"}],
|
||||
)
|
||||
|
||||
save_ultraplan(plan, base_dir=tmp_path)
|
||||
loaded = load_ultraplan("20260415", base_dir=tmp_path)
|
||||
|
||||
assert loaded is not None
|
||||
assert loaded.date == "20260415"
|
||||
assert loaded.mission == "Test roundtrip"
|
||||
|
||||
def test_nonexistent_returns_none(self, tmp_path):
|
||||
assert load_ultraplan("99999999", base_dir=tmp_path) is None
|
||||
|
||||
|
||||
class TestCronPrompt:
|
||||
def test_has_required_elements(self):
|
||||
prompt = generate_daily_cron_prompt()
|
||||
assert "Ultraplan" in prompt
|
||||
assert "streams" in prompt.lower()
|
||||
assert "Gitea" in prompt
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import pytest
|
||||
pytest.main([__file__, "-v"])
|
||||
@@ -1,310 +0,0 @@
|
||||
"""Ultraplan Mode — Daily autonomous planning and execution discipline.
|
||||
|
||||
Decomposes assigned tasks into parallel work streams with explicit
|
||||
dependencies, phases, and artifact targets.
|
||||
|
||||
Issue #840: Ultraplan Mode: Daily autonomous planning and execution
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class Phase:
|
||||
"""A single phase within a work stream."""
|
||||
id: str
|
||||
name: str
|
||||
description: str = ""
|
||||
status: str = "pending" # pending, active, done, blocked
|
||||
artifact: str = "" # Expected deliverable
|
||||
dependencies: List[str] = field(default_factory=list)
|
||||
started_at: Optional[float] = None
|
||||
completed_at: Optional[float] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class Stream:
|
||||
"""A parallel work stream with sequential phases."""
|
||||
id: str
|
||||
name: str
|
||||
phases: List[Phase] = field(default_factory=list)
|
||||
status: str = "pending"
|
||||
|
||||
@property
|
||||
def current_phase(self) -> Optional[Phase]:
|
||||
for p in self.phases:
|
||||
if p.status in ("active", "pending"):
|
||||
return p
|
||||
return None
|
||||
|
||||
@property
|
||||
def progress(self) -> float:
|
||||
if not self.phases:
|
||||
return 0.0
|
||||
done = sum(1 for p in self.phases if p.status == "done")
|
||||
return done / len(self.phases)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Ultraplan:
|
||||
"""Daily ultraplan with work streams and metrics."""
|
||||
date: str
|
||||
mission: str
|
||||
streams: List[Stream] = field(default_factory=list)
|
||||
metrics: Dict[str, Any] = field(default_factory=dict)
|
||||
notes: str = ""
|
||||
created_at: float = field(default_factory=time.time)
|
||||
|
||||
@property
|
||||
def progress(self) -> float:
|
||||
if not self.streams:
|
||||
return 0.0
|
||||
return sum(s.progress for s in self.streams) / len(self.streams)
|
||||
|
||||
@property
|
||||
def active_streams(self) -> List[Stream]:
|
||||
return [s for s in self.streams if s.status == "active"]
|
||||
|
||||
@property
|
||||
def blocked_streams(self) -> List[Stream]:
|
||||
return [s for s in self.streams if s.status == "blocked"]
|
||||
|
||||
def to_markdown(self) -> str:
|
||||
"""Generate ultraplan markdown document."""
|
||||
lines = []
|
||||
|
||||
# Header
|
||||
lines.append(f"# Ultraplan: {self.date}")
|
||||
lines.append("")
|
||||
lines.append(f"**Mission:** {self.mission}")
|
||||
lines.append(f"**Created:** {datetime.fromtimestamp(self.created_at, tz=timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")
|
||||
lines.append(f"**Progress:** {self.progress:.0%}")
|
||||
lines.append("")
|
||||
|
||||
# Metrics
|
||||
if self.metrics:
|
||||
lines.append("## Metrics")
|
||||
for key, value in self.metrics.items():
|
||||
lines.append(f"- **{key}:** {value}")
|
||||
lines.append("")
|
||||
|
||||
# Streams
|
||||
lines.append("## Work Streams")
|
||||
lines.append("")
|
||||
|
||||
for stream in self.streams:
|
||||
status_icon = {"pending": "○", "active": "●", "done": "✓", "blocked": "✗"}.get(stream.status, "?")
|
||||
lines.append(f"### {status_icon} Stream {stream.id}: {stream.name}")
|
||||
lines.append(f"**Status:** {stream.status} | **Progress:** {stream.progress:.0%}")
|
||||
lines.append("")
|
||||
|
||||
# Phase table
|
||||
lines.append("| Phase | Name | Status | Artifact |")
|
||||
lines.append("|-------|------|--------|----------|")
|
||||
for phase in stream.phases:
|
||||
p_icon = {"pending": "○", "active": "●", "done": "✓", "blocked": "✗"}.get(phase.status, "?")
|
||||
artifact = phase.artifact or "—"
|
||||
lines.append(f"| {phase.id} | {phase.name} | {p_icon} {phase.status} | {artifact} |")
|
||||
lines.append("")
|
||||
|
||||
# Dependency map
|
||||
lines.append("## Dependency Map")
|
||||
lines.append("")
|
||||
for stream in self.streams:
|
||||
deps = []
|
||||
for phase in stream.phases:
|
||||
if phase.dependencies:
|
||||
deps.append(f"{phase.id} depends on: {', '.join(phase.dependencies)}")
|
||||
if deps:
|
||||
lines.append(f"**{stream.id}:** {'; '.join(deps)}")
|
||||
|
||||
if not any(p.dependencies for s in self.streams for p in s.phases):
|
||||
lines.append("All streams are independent — parallel execution possible.")
|
||||
lines.append("")
|
||||
|
||||
# Notes
|
||||
if self.notes:
|
||||
lines.append("## Notes")
|
||||
lines.append(self.notes)
|
||||
lines.append("")
|
||||
|
||||
# Footer
|
||||
lines.append("---")
|
||||
lines.append(f"*Generated by Ultraplan Mode — {datetime.now().strftime('%Y-%m-%d %H:%M')}*")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to JSON-serializable dict."""
|
||||
return {
|
||||
"date": self.date,
|
||||
"mission": self.mission,
|
||||
"streams": [
|
||||
{
|
||||
"id": s.id,
|
||||
"name": s.name,
|
||||
"status": s.status,
|
||||
"phases": [
|
||||
{
|
||||
"id": p.id,
|
||||
"name": p.name,
|
||||
"description": p.description,
|
||||
"status": p.status,
|
||||
"artifact": p.artifact,
|
||||
"dependencies": p.dependencies,
|
||||
}
|
||||
for p in s.phases
|
||||
],
|
||||
}
|
||||
for s in self.streams
|
||||
],
|
||||
"metrics": self.metrics,
|
||||
"notes": self.notes,
|
||||
"progress": self.progress,
|
||||
"created_at": self.created_at,
|
||||
}
|
||||
|
||||
|
||||
def create_ultraplan(
|
||||
date: str = None,
|
||||
mission: str = "",
|
||||
streams: List[Dict[str, Any]] = None,
|
||||
) -> Ultraplan:
|
||||
"""Create a new ultraplan.
|
||||
|
||||
Args:
|
||||
date: Plan date (default: today)
|
||||
mission: High-level mission statement
|
||||
streams: List of stream definitions
|
||||
"""
|
||||
if date is None:
|
||||
date = datetime.now().strftime("%Y%m%d")
|
||||
|
||||
plan_streams = []
|
||||
if streams:
|
||||
for s in streams:
|
||||
phases = [
|
||||
Phase(
|
||||
id=p.get("id", f"{s.get('id', 'S')}{i+1}"),
|
||||
name=p.get("name", f"Phase {i+1}"),
|
||||
description=p.get("description", ""),
|
||||
artifact=p.get("artifact", ""),
|
||||
dependencies=p.get("dependencies", []),
|
||||
)
|
||||
for i, p in enumerate(s.get("phases", []))
|
||||
]
|
||||
plan_streams.append(Stream(
|
||||
id=s.get("id", f"S{len(plan_streams)+1}"),
|
||||
name=s.get("name", "Unnamed Stream"),
|
||||
phases=phases,
|
||||
))
|
||||
|
||||
return Ultraplan(
|
||||
date=date,
|
||||
mission=mission,
|
||||
streams=plan_streams,
|
||||
)
|
||||
|
||||
|
||||
def save_ultraplan(plan: Ultraplan, base_dir: Path = None) -> Path:
|
||||
"""Save ultraplan to disk.
|
||||
|
||||
Args:
|
||||
plan: The ultraplan to save
|
||||
base_dir: Base directory (default: ~/.timmy/cron/)
|
||||
|
||||
Returns:
|
||||
Path to saved file
|
||||
"""
|
||||
if base_dir is None:
|
||||
base_dir = Path.home() / ".timmy" / "cron"
|
||||
|
||||
base_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Save markdown
|
||||
md_path = base_dir / f"ultraplan_{plan.date}.md"
|
||||
md_path.write_text(plan.to_markdown(), encoding="utf-8")
|
||||
|
||||
# Save JSON (for programmatic access)
|
||||
json_path = base_dir / f"ultraplan_{plan.date}.json"
|
||||
json_path.write_text(json.dumps(plan.to_dict(), indent=2), encoding="utf-8")
|
||||
|
||||
return md_path
|
||||
|
||||
|
||||
def load_ultraplan(date: str, base_dir: Path = None) -> Optional[Ultraplan]:
|
||||
"""Load ultraplan from disk.
|
||||
|
||||
Args:
|
||||
date: Plan date (YYYYMMDD)
|
||||
base_dir: Base directory (default: ~/.timmy/cron/)
|
||||
|
||||
Returns:
|
||||
Ultraplan if found, None otherwise
|
||||
"""
|
||||
if base_dir is None:
|
||||
base_dir = Path.home() / ".timmy" / "cron"
|
||||
|
||||
json_path = base_dir / f"ultraplan_{date}.json"
|
||||
if not json_path.exists():
|
||||
return None
|
||||
|
||||
try:
|
||||
data = json.loads(json_path.read_text(encoding="utf-8"))
|
||||
|
||||
streams = []
|
||||
for s in data.get("streams", []):
|
||||
phases = [
|
||||
Phase(
|
||||
id=p["id"],
|
||||
name=p["name"],
|
||||
description=p.get("description", ""),
|
||||
status=p.get("status", "pending"),
|
||||
artifact=p.get("artifact", ""),
|
||||
dependencies=p.get("dependencies", []),
|
||||
)
|
||||
for p in s.get("phases", [])
|
||||
]
|
||||
streams.append(Stream(
|
||||
id=s["id"],
|
||||
name=s["name"],
|
||||
phases=phases,
|
||||
status=s.get("status", "pending"),
|
||||
))
|
||||
|
||||
return Ultraplan(
|
||||
date=data["date"],
|
||||
mission=data.get("mission", ""),
|
||||
streams=streams,
|
||||
metrics=data.get("metrics", {}),
|
||||
notes=data.get("notes", ""),
|
||||
created_at=data.get("created_at", time.time()),
|
||||
)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def generate_daily_cron_prompt() -> str:
|
||||
"""Generate the prompt for the daily ultraplan cron job."""
|
||||
return """Generate today's Ultraplan.
|
||||
|
||||
Steps:
|
||||
1. Check open Gitea issues assigned to you
|
||||
2. Check open PRs needing review
|
||||
3. Check fleet health status
|
||||
4. Decompose work into parallel streams
|
||||
5. Generate ultraplan_YYYYMMDD.md
|
||||
6. File Gitea issue with the plan
|
||||
|
||||
Output format:
|
||||
- Mission statement
|
||||
- 3-5 work streams with phases
|
||||
- Dependency map
|
||||
- Success metrics
|
||||
"""
|
||||
Reference in New Issue
Block a user