Compare commits

...

3 Commits

Author SHA1 Message Date
Alexander Whitestone
b39aee90b4 fix(pipelines): resume checkpoints and atomically claim jobs (#621)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 28s
Smoke Test / smoke (pull_request) Failing after 26s
Validate Config / YAML Lint (pull_request) Failing after 18s
Validate Config / JSON Validate (pull_request) Successful in 22s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m3s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 1m6s
Validate Config / Cron Syntax Check (pull_request) Successful in 13s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 14s
Validate Config / Playbook Schema Validation (pull_request) Successful in 30s
Architecture Lint / Lint Repository (pull_request) Failing after 24s
PR Checklist / pr-checklist (pull_request) Successful in 6m22s
Load checkpoints from the checkpoint table when resuming paused jobs and
atomically claim pending work so parallel runs do not process the same job twice.
2026-04-22 11:33:03 -04:00
ae8c1d46ae Merge pull request 'feat(#407): Phase progression tracker with auto-eval, Telegram daily post, and blockers' (#857) from fix/407 into main
Some checks failed
Architecture Lint / Linter Tests (push) Successful in 28s
Smoke Test / smoke (push) Failing after 21s
Validate Config / YAML Lint (push) Failing after 9s
Validate Config / JSON Validate (push) Successful in 12s
Validate Config / Python Syntax & Import Check (push) Failing after 35s
Validate Config / Python Test Suite (push) Has been skipped
Validate Config / Shell Script Lint (push) Failing after 38s
Validate Config / Cron Syntax Check (push) Successful in 7s
Validate Config / Deploy Script Dry Run (push) Successful in 7s
Validate Config / Playbook Schema Validation (push) Successful in 16s
Architecture Lint / Lint Repository (push) Failing after 20s
2026-04-22 07:36:26 +00:00
Alexander Whitestone
508441acb4 feat(#407): Phase progression tracker with auto-eval, Telegram daily post, and blockers
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 25s
Smoke Test / smoke (pull_request) Failing after 23s
Validate Config / YAML Lint (pull_request) Failing after 16s
Validate Config / JSON Validate (pull_request) Successful in 19s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m2s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 1m6s
Validate Config / Cron Syntax Check (pull_request) Successful in 14s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 14s
Validate Config / Playbook Schema Validation (pull_request) Successful in 28s
Architecture Lint / Lint Repository (pull_request) Failing after 27s
PR Checklist / pr-checklist (pull_request) Failing after 11m41s
2026-04-22 03:34:36 -04:00
2 changed files with 382 additions and 128 deletions

View File

@@ -235,6 +235,33 @@ class OrchestratorDB:
conn = sqlite3.connect(str(self.db_path))
conn.row_factory = sqlite3.Row
return conn
def _job_from_row(self, row: sqlite3.Row) -> Job:
"""Hydrate a Job from a DB row, loading checkpoints from either storage path."""
checkpoint = None
if row['checkpoint']:
checkpoint = JobCheckpoint.from_dict(json.loads(row['checkpoint']))
else:
checkpoint = self.get_checkpoint(row['id'])
return Job(
id=row['id'],
pipeline=row['pipeline'],
task=json.loads(row['task']),
status=JobStatus(row['status']),
priority=JobPriority(row['priority']),
token_budget=row['token_budget'],
token_usage=TokenUsage.from_dict(json.loads(row['token_usage'])),
retry_count=row['retry_count'],
max_retries=row['max_retries'],
created_at=row['created_at'],
started_at=row['started_at'],
completed_at=row['completed_at'],
error=row['error'],
result=json.loads(row['result']) if row['result'] else None,
checkpoint=checkpoint,
metadata=json.loads(row['metadata']) if row['metadata'] else {}
)
def save_job(self, job: Job):
"""Save or update a job."""
@@ -265,24 +292,7 @@ class OrchestratorDB:
if not row:
return None
return Job(
id=row['id'],
pipeline=row['pipeline'],
task=json.loads(row['task']),
status=JobStatus(row['status']),
priority=JobPriority(row['priority']),
token_budget=row['token_budget'],
token_usage=TokenUsage.from_dict(json.loads(row['token_usage'])),
retry_count=row['retry_count'],
max_retries=row['max_retries'],
created_at=row['created_at'],
started_at=row['started_at'],
completed_at=row['completed_at'],
error=row['error'],
result=json.loads(row['result']) if row['result'] else None,
checkpoint=JobCheckpoint.from_dict(json.loads(row['checkpoint'])) if row['checkpoint'] else None,
metadata=json.loads(row['metadata']) if row['metadata'] else {}
)
return self._job_from_row(row)
def get_next_job(self, pipeline: Optional[str] = None) -> Optional[Job]:
"""Get next pending job (highest priority first)."""
@@ -303,24 +313,34 @@ class OrchestratorDB:
if not row:
return None
return Job(
id=row['id'],
pipeline=row['pipeline'],
task=json.loads(row['task']),
status=JobStatus(row['status']),
priority=JobPriority(row['priority']),
token_budget=row['token_budget'],
token_usage=TokenUsage.from_dict(json.loads(row['token_usage'])),
retry_count=row['retry_count'],
max_retries=row['max_retries'],
created_at=row['created_at'],
started_at=row['started_at'],
completed_at=row['completed_at'],
error=row['error'],
result=json.loads(row['result']) if row['result'] else None,
checkpoint=JobCheckpoint.from_dict(json.loads(row['checkpoint'])) if row['checkpoint'] else None,
metadata=json.loads(row['metadata']) if row['metadata'] else {}
)
return self._job_from_row(row)
def claim_next_job(self, pipeline: Optional[str] = None) -> Optional[Job]:
"""Atomically claim the next pending job for execution."""
conn = self._get_conn()
try:
conn.execute("BEGIN IMMEDIATE")
query = "SELECT id FROM jobs WHERE status = 'pending'"
params = []
if pipeline:
query += " AND pipeline = ?"
params.append(pipeline)
query += " ORDER BY priority DESC, created_at ASC LIMIT 1"
row = conn.execute(query, params).fetchone()
if not row:
conn.commit()
return None
started_at = time.time()
conn.execute(
"UPDATE jobs SET status = ?, started_at = ? WHERE id = ?",
(JobStatus.RUNNING.value, started_at, row['id'])
)
claimed = conn.execute("SELECT * FROM jobs WHERE id = ?", (row['id'],)).fetchone()
conn.commit()
return self._job_from_row(claimed)
finally:
conn.close()
def get_jobs_by_status(self, status: JobStatus, pipeline: Optional[str] = None) -> List[Job]:
"""Get all jobs with given status."""
@@ -338,27 +358,7 @@ class OrchestratorDB:
rows = conn.execute(query, params).fetchall()
conn.close()
return [
Job(
id=row['id'],
pipeline=row['pipeline'],
task=json.loads(row['task']),
status=JobStatus(row['status']),
priority=JobPriority(row['priority']),
token_budget=row['token_budget'],
token_usage=TokenUsage.from_dict(json.loads(row['token_usage'])),
retry_count=row['retry_count'],
max_retries=row['max_retries'],
created_at=row['created_at'],
started_at=row['started_at'],
completed_at=row['completed_at'],
error=row['error'],
result=json.loads(row['result']) if row['result'] else None,
checkpoint=JobCheckpoint.from_dict(json.loads(row['checkpoint'])) if row['checkpoint'] else None,
metadata=json.loads(row['metadata']) if row['metadata'] else {}
)
for row in rows
]
return [self._job_from_row(row) for row in rows]
def save_checkpoint(self, job_id: str, checkpoint: JobCheckpoint):
"""Save a checkpoint for a job."""
@@ -612,7 +612,7 @@ class PipelineOrchestrator:
continue
# Get next job
job = self.db.get_next_job(pipeline)
job = self.db.claim_next_job(pipeline)
if not job:
if not futures:

View File

@@ -4,111 +4,365 @@
Part of the Gemini Sovereign Infrastructure Suite.
Tracks the fleet's progress through the Paperclips-inspired evolution arc.
Usage:
python3 scripts/phase_tracker.py status # Show current state
python3 scripts/phase_tracker.py evaluate # Auto-evaluate checkable milestones
python3 scripts/phase_tracker.py complete M4 # Mark milestone complete
python3 scripts/phase_tracker.py telegram # Post daily update to Telegram
python3 scripts/phase_tracker.py daily # evaluate + telegram
"""
import os
import sys
import json
import re
import argparse
import urllib.request
import subprocess
from pathlib import Path
from datetime import datetime, timezone, timedelta
MILESTONES_FILE = "fleet/milestones.md"
COMPLETED_FILE = "fleet/completed_milestones.json"
LOG_DIR = Path(os.path.expanduser("~/.local/timmy/fleet-health"))
UPTIME_FILE = LOG_DIR / "uptime.json"
TELEGRAM_TOKEN_PATHS = [
Path.home() / ".config" / "timmy" / "telegram_bot_token",
Path.home() / ".hermes" / "telegram_bot_token",
Path.home() / ".hermes" / "telegram_token",
]
TELEGRAM_CHAT = os.environ.get("TELEGRAM_HOME_CHANNEL", "-1003664764329")
HOSTS = {
"ezra": {"ip": "143.198.27.163"},
"allegro": {"ip": "167.99.126.228"},
"bezalel": {"ip": "159.203.146.185"},
}
def _find_repo_root() -> Path:
script_dir = Path(__file__).resolve().parent
return script_dir.parent
def _read_token() -> str | None:
for p in TELEGRAM_TOKEN_PATHS:
if p.exists():
return p.read_text().strip()
return os.environ.get("TELEGRAM_BOT_TOKEN") or None
def telegram_send(text: str) -> bool:
token = _read_token()
if not token:
print("[WARN] No Telegram token found.", file=sys.stderr)
return False
url = f"https://api.telegram.org/bot{token}/sendMessage"
body = json.dumps({"chat_id": TELEGRAM_CHAT, "text": text, "parse_mode": "HTML"}).encode()
req = urllib.request.Request(url, data=body, headers={"Content-Type": "application/json"})
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status == 200
except Exception as e:
print(f"[WARN] Telegram send failed: {e}", file=sys.stderr)
return False
class Milestone:
def __init__(self, m_id: str, title: str, trigger: str, message: str):
self.id = m_id
self.title = title
self.trigger = trigger
self.message = message
class Phase:
def __init__(self, name: str, number: int, unlock_condition: str | None):
self.name = name
self.number = number
self.unlock_condition = unlock_condition
self.milestones: list[Milestone] = []
class PhaseTracker:
def __init__(self):
# Find files relative to repo root
script_dir = os.path.dirname(os.path.abspath(__file__))
repo_root = os.path.dirname(script_dir)
self.milestones_path = os.path.join(repo_root, MILESTONES_FILE)
self.completed_path = os.path.join(repo_root, COMPLETED_FILE)
self.milestones = self.parse_milestones()
self.completed = self.load_completed()
self.repo_root = _find_repo_root()
self.milestones_path = self.repo_root / MILESTONES_FILE
self.completed_path = self.repo_root / COMPLETED_FILE
self.phases: list[Phase] = self._parse_milestones()
self.completed: set[str] = self._load_completed()
def _parse_milestones(self) -> list[Phase]:
if not self.milestones_path.exists():
return []
content = self.milestones_path.read_text()
phases: list[Phase] = []
current_phase: Phase | None = None
for line in content.splitlines():
phase_match = re.match(r"##\s*Phase\s*(\d+):\s*(.+?)\s*(?:\(([^)]+)\))?\s*$", line)
if phase_match:
num = int(phase_match.group(1))
name = phase_match.group(2).strip()
unlock = phase_match.group(3)
current_phase = Phase(name, num, unlock)
phases.append(current_phase)
continue
m_match = re.match(r"###\s*(M\d+):\s*(.+)$", line)
if m_match and current_phase is not None:
m_id = m_match.group(1)
title = m_match.group(2).strip()
current_phase.milestones.append(Milestone(m_id, title, "", ""))
continue
if line.startswith("**Trigger:**") and current_phase and current_phase.milestones:
current_phase.milestones[-1].trigger = line.replace("**Trigger:**", "").strip()
continue
if line.startswith("**Message:**") and current_phase and current_phase.milestones:
current_phase.milestones[-1].message = line.replace("**Message:**", "").strip().strip('"')
continue
def parse_milestones(self):
if not os.path.exists(self.milestones_path):
return {}
with open(self.milestones_path, "r") as f:
content = f.read()
phases = {}
current_phase = None
for line in content.split("\n"):
if line.startswith("## Phase"):
current_phase = line.replace("## ", "").strip()
phases[current_phase] = []
elif line.startswith("### M"):
m_id = line.split(":")[0].replace("### ", "").strip()
title = line.split(":")[1].strip()
phases[current_phase].append({"id": m_id, "title": title})
return phases
def load_completed(self):
if os.path.exists(self.completed_path):
with open(self.completed_path, "r") as f:
try:
return json.load(f)
except:
return []
return []
def _load_completed(self) -> set[str]:
if self.completed_path.exists():
try:
data = json.loads(self.completed_path.read_text())
if isinstance(data, list):
return set(data)
except Exception:
pass
return set()
def save_completed(self):
with open(self.completed_path, "w") as f:
json.dump(self.completed, f, indent=2)
self.completed_path.write_text(json.dumps(sorted(self.completed), indent=2))
def show_progress(self):
print("--- Fleet Phase Progression Tracker ---")
total_milestones = 0
total_completed = 0
if not self.milestones:
print("[ERROR] No milestones found in fleet/milestones.md")
return
for phase, ms in self.milestones.items():
print(f"\n{phase}")
for m in ms:
total_milestones += 1
done = m["id"] in self.completed
if done:
total_completed += 1
status = "" if done else ""
print(f" {status} {m['id']}: {m['title']}")
percent = (total_completed / total_milestones) * 100 if total_milestones > 0 else 0
print(f"\nOverall Progress: {total_completed}/{total_milestones} ({percent:.1f}%)")
def mark_complete(self, m_id: str):
def mark_complete(self, m_id: str) -> bool:
m_id = m_id.upper()
exists = any(m.id == m_id for p in self.phases for m in p.milestones)
if not exists:
print(f"[ERROR] Unknown milestone: {m_id}")
return False
if m_id not in self.completed:
self.completed.append(m_id)
self.completed.add(m_id)
self.save_completed()
print(f"[SUCCESS] Marked {m_id} as complete.")
return True
print(f"[INFO] {m_id} is already complete.")
return True
def _get_phase_state(self) -> tuple[int, float, list[str], list[str]]:
"""Returns (current_phase_number, decimal_progress, blockers, next_milestones)."""
blockers = []
next_milestones = []
for phase in self.phases:
phase_completed = sum(1 for m in phase.milestones if m.id in self.completed)
phase_total = len(phase.milestones)
if phase_total == 0:
continue
if phase_completed < phase_total:
progress = phase_completed / phase_total
decimal = phase.number + progress
# Find next incomplete milestone
for m in phase.milestones:
if m.id not in self.completed:
next_milestones.append(f"{m.id}: {m.title}")
if m.trigger:
blockers.append(f"{m.id}: {m.trigger}")
break
# Phase unlock condition as blocker if near end
if phase_completed == phase_total - 1 and phase.unlock_condition:
blockers.append(f"Unlock Phase {phase.number + 1}: {phase.unlock_condition}")
return phase.number, decimal, blockers, next_milestones
# All done
last = self.phases[-1] if self.phases else None
if last:
return last.number, float(last.number) + 1.0, ["All phases complete."], []
return 0, 0.0, ["No milestones defined."], []
def show_progress(self):
phase_num, decimal, blockers, next_ms = self._get_phase_state()
total_ms = sum(len(p.milestones) for p in self.phases)
total_completed = len(self.completed)
overall_pct = (total_completed / total_ms * 100) if total_ms else 0
print("=" * 50)
print(" Fleet Phase Progression Tracker")
print("=" * 50)
print(f"\nCurrent Phase: Phase {phase_num}{self.phases[phase_num - 1].name if phase_num <= len(self.phases) else 'Complete'}")
print(f"Decimal Progress: Phase {decimal:.1f}")
print(f"Overall: {total_completed}/{total_ms} milestones ({overall_pct:.1f}%)")
print("\n--- Milestones ---")
for phase in self.phases:
done = sum(1 for m in phase.milestones if m.id in self.completed)
total = len(phase.milestones)
status = "" if done == total else ""
print(f"\n{status} Phase {phase.number}: {phase.name} ({done}/{total})")
for m in phase.milestones:
mark = "" if m.id in self.completed else ""
print(f" {mark} {m.id}: {m.title}")
print("\n--- Next Up ---")
for nm in next_ms[:3]:
print(f"{nm}")
print("\n--- Blockers ---")
for b in blockers[:5]:
print(f" ⚠️ {b}")
if not blockers:
print(" 🚀 Nothing blocking.")
print()
def summary_text(self) -> str:
phase_num, decimal, blockers, next_ms = self._get_phase_state()
total_ms = sum(len(p.milestones) for p in self.phases)
total_completed = len(self.completed)
overall_pct = (total_completed / total_ms * 100) if total_ms else 0
phase_name = self.phases[phase_num - 1].name if phase_num <= len(self.phases) else "Complete"
next_phase = phase_num + 1 if phase_num < len(self.phases) else phase_num
progress_to_next = (decimal - phase_num) * 100
lines = [
f"Fleet: Phase {decimal:.1f} ({progress_to_next:.0f}% to Phase {next_phase})",
f"Phase: {phase_num}{phase_name}",
f"Overall: {total_completed}/{total_ms} milestones ({overall_pct:.1f}%)",
]
if next_ms:
lines.append(f"Next: {next_ms[0]}")
if blockers and blockers[0] != "All phases complete.":
lines.append(f"Blocker: {blockers[0]}")
return "\n".join(lines)
# === Auto-evaluation heuristics ===
def _eval_file_exists(self, path: str) -> bool:
return (self.repo_root / path).exists()
def _eval_command(self, cmd: str) -> bool:
try:
result = subprocess.run(cmd, shell=True, capture_output=True, timeout=10)
return result.returncode == 0
except Exception:
return False
def _eval_uptime(self, target: float) -> bool:
if not UPTIME_FILE.exists():
return False
try:
data = json.loads(UPTIME_FILE.read_text())
uptime = data.get("uptime_30d_percent", 0.0)
return uptime >= target
except Exception:
return False
def _eval_local_model_multi(self) -> bool:
count = 0
for host in HOSTS:
if self._eval_command(f"ssh -o ConnectTimeout=5 {host} 'pgrep -f ollama >/dev/null 2>&1'"):
count += 1
return count >= 2
def _eval_zero_manual_restarts(self, days: int = 7) -> bool:
log = LOG_DIR / "auto_restart.log"
if not log.exists():
return False
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
try:
with open(log) as f:
for line in f:
if "manual restart" in line.lower():
# crude timestamp parse
try:
ts = datetime.fromisoformat(line[:19])
if ts > cutoff:
return False
except Exception:
continue
return True
except Exception:
return False
def evaluate(self):
"""Auto-check milestones where we have heuristics."""
print("[EVAL] Running automatic milestone checks...\n")
checks = [
("M1", self._eval_command, "python3 fleet/health_check.py --dry-run 2>/dev/null || python3 fleet/health_check.py 2>&1 | head -1 >/dev/null"),
("M2", self._eval_command, "test -f ~/.local/timmy/fleet-health/auto_restart.log && grep -q 'restarted' ~/.local/timmy/fleet-health/auto_restart.log"),
("M3", self._eval_command, "test -d ~/.local/timmy/backups && ls ~/.local/timmy/backups | grep -q ."),
("M4", self._eval_uptime, 95.0),
("M5", self._eval_uptime, 97.0),
("M6", self._eval_zero_manual_restarts, 7),
("M9", self._eval_uptime, 98.0),
("M11", self._eval_local_model_multi, None),
]
newly_found = []
for m_id, check_fn, arg in checks:
if m_id in self.completed:
continue
result = check_fn(arg) if arg is not None else check_fn()
if result:
print(f"{m_id} appears satisfied — marking complete.")
self.completed.add(m_id)
newly_found.append(m_id)
else:
print(f"{m_id} not yet satisfied.")
if newly_found:
self.save_completed()
print(f"\n[SUCCESS] Auto-completed {len(newly_found)} milestone(s): {', '.join(newly_found)}")
else:
print(f"[INFO] {m_id} is already complete.")
print("\n[INFO] No new milestones auto-detected.")
def daily(self):
self.evaluate()
text = self.summary_text()
print(text)
ok = telegram_send(text)
if ok:
print("\n[TELEGRAM] Daily update sent.")
else:
print("\n[TELEGRAM] Failed to send update.")
def main():
parser = argparse.ArgumentParser(description="Gemini Phase Tracker")
parser = argparse.ArgumentParser(description="Fleet Phase Progression Tracker")
subparsers = parser.add_subparsers(dest="command")
subparsers.add_parser("status", help="Show current progress")
subparsers.add_parser("evaluate", help="Auto-evaluate checkable milestones")
subparsers.add_parser("telegram", help="Post summary to Telegram")
subparsers.add_parser("daily", help="Evaluate then post to Telegram")
complete_parser = subparsers.add_parser("complete", help="Mark a milestone as complete")
complete_parser.add_argument("id", help="Milestone ID (e.g. M1)")
args = parser.parse_args()
tracker = PhaseTracker()
if args.command == "status":
tracker.show_progress()
elif args.command == "evaluate":
tracker.evaluate()
elif args.command == "telegram":
ok = telegram_send(tracker.summary_text())
sys.exit(0 if ok else 1)
elif args.command == "daily":
tracker.daily()
elif args.command == "complete":
tracker.mark_complete(args.id)
ok = tracker.mark_complete(args.id)
sys.exit(0 if ok else 1)
else:
parser.print_help()
if __name__ == "__main__":
main()