#!/usr/bin/env python3
"""Loop guard — idle detection + exponential backoff for the dev loop.

Checks .loop/queue.json for ready items before spawning hermes.
When the queue is empty, applies exponential backoff (60s → 600s max)
instead of burning empty cycles every 3 seconds.

Usage (called by the dev loop before each cycle):
    python3 scripts/loop_guard.py           # exits 0 if ready, 1 if idle
    python3 scripts/loop_guard.py --wait    # same, but sleeps the backoff when idle
    python3 scripts/loop_guard.py --pick    # same, and prints the top ready issue number
    python3 scripts/loop_guard.py --status  # print current idle state

Exit codes:
    0 — queue has work, proceed with cycle
    1 — queue empty, idle backoff applied (skip cycle)
"""

from __future__ import annotations

import json
import os
import sys
import time
import urllib.request
from pathlib import Path

REPO_ROOT = Path(__file__).resolve().parent.parent
QUEUE_FILE = REPO_ROOT / ".loop" / "queue.json"
IDLE_STATE_FILE = REPO_ROOT / ".loop" / "idle_state.json"
CYCLE_RESULT_FILE = REPO_ROOT / ".loop" / "cycle_result.json"

TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"


def _get_gitea_api() -> str:
    """Return the Gitea API base URL.

    Lookup order: the TIMMY_GITEA_API env var, the GITEA_API env var,
    the ~/.hermes/gitea_api file, then a hard-coded default. A missing,
    unreadable or empty file falls through to the default instead of
    failing at import time or yielding an empty base URL.
    """
    # Check env vars first (TIMMY_GITEA_API is preferred, GITEA_API for compatibility)
    api_url = os.environ.get("TIMMY_GITEA_API") or os.environ.get("GITEA_API")
    if api_url:
        return api_url

    # Check ~/.hermes/gitea_api file
    api_file = Path.home() / ".hermes" / "gitea_api"
    try:
        file_url = api_file.read_text().strip()
    except (OSError, ValueError):
        # ValueError covers UnicodeDecodeError from an undecodable file.
        file_url = ""
    if file_url:
        return file_url

    # Default fallback
    return "http://143.198.27.163:3000/api/v1"


GITEA_API = _get_gitea_api()
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")

# Default cycle duration in seconds (5 min); stale threshold = 2× this
CYCLE_DURATION = int(os.environ.get("CYCLE_DURATION", "300"))

# Backoff sequence: 60s, 120s, 240s, 480s, then capped at 600s
BACKOFF_BASE = 60
BACKOFF_MAX = 600
BACKOFF_MULTIPLIER = 2


def _get_token() -> str:
    """Read the Gitea token from the GITEA_TOKEN env var, else TOKEN_FILE.

    Returns an empty string when no token is available. An unreadable
    token file is treated as "no token" so callers degrade to their
    API-unavailable path instead of raising.
    """
    token = os.environ.get("GITEA_TOKEN", "").strip()
    if token:
        return token
    try:
        return TOKEN_FILE.read_text().strip()
    except (OSError, ValueError):
        return ""


def _fetch_open_issue_numbers() -> set[int] | None:
    """Fetch the numbers of all open issues from Gitea.

    Returns:
        The set of open issue numbers, or None when no token is
        available or any request/parse step fails.
    """
    token = _get_token()
    if not token:
        return None

    headers = {
        "Authorization": f"token {token}",
        "Accept": "application/json",
    }
    numbers: set[int] = set()
    page = 1
    try:
        while True:
            url = (
                f"{GITEA_API}/repos/{REPO_SLUG}/issues"
                f"?state=open&type=issues&limit=50&page={page}"
            )
            req = urllib.request.Request(url, headers=headers)
            with urllib.request.urlopen(req, timeout=10) as resp:
                data = json.loads(resp.read())
            if not data:
                break
            known_before = len(numbers)
            numbers.update(issue["number"] for issue in data)
            # A page shorter than the requested limit does not prove it is
            # the last one (the server may cap the page size), so keep
            # paging until a page is empty or contributes nothing new. The
            # latter also guards against a server that ignores `page`.
            if len(numbers) == known_before:
                break
            page += 1
    except Exception:
        # Deliberately broad: any network, HTTP, JSON or schema problem
        # means the open set is unknown, and callers must not treat a
        # partial result as authoritative.
        return None
    return numbers


def _load_cycle_result() -> dict:
    """Read cycle_result.json, handling markdown-fenced JSON.

    Returns:
        The parsed JSON object, or an empty dict when the file is
        missing, unreadable, not valid JSON, or not a JSON object.
    """
    try:
        raw = CYCLE_RESULT_FILE.read_text().strip()
        if raw.startswith("```"):
            # Drop the fence lines and keep only the payload between them.
            raw = "\n".join(
                ln for ln in raw.splitlines() if not ln.startswith("```")
            )
        parsed = json.loads(raw)
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
        # OSError covers a missing or unreadable file.
        return {}
    return parsed if isinstance(parsed, dict) else {}


def _is_issue_open(issue_number: int) -> bool | None:
    """Check whether a single issue is open.

    Returns:
        True or False according to the issue's reported state, or None
        when no token is available or the API call fails.
    """
    token = _get_token()
    if not token:
        return None
    try:
        url = f"{GITEA_API}/repos/{REPO_SLUG}/issues/{issue_number}"
        req = urllib.request.Request(
            url,
            headers={
                "Authorization": f"token {token}",
                "Accept": "application/json",
            },
        )
        with urllib.request.urlopen(req, timeout=10) as resp:
            data = json.loads(resp.read())
        return data.get("state") == "open"
    except Exception:
        # Deliberately broad: "unknown" must never be reported as "closed".
        return None


def validate_cycle_result() -> bool:
    """Pre-cycle validation: remove stale or invalid cycle_result.json.

    Checks:
      1. Age — if older than 2× CYCLE_DURATION, delete it.
      2. Issue — if the referenced issue is closed, delete it.

    Returns True if the file was removed, False otherwise.
    """
    if not CYCLE_RESULT_FILE.exists():
        return False

    # Age check
    try:
        age = time.time() - CYCLE_RESULT_FILE.stat().st_mtime
    except OSError:
        return False
    stale_threshold = CYCLE_DURATION * 2
    if age > stale_threshold:
        print(
            f"[loop-guard] cycle_result.json is {int(age)}s old "
            f"(threshold {stale_threshold}s) — removing stale file"
        )
        CYCLE_RESULT_FILE.unlink(missing_ok=True)
        return True

    # Issue check
    cr = _load_cycle_result()
    issue_num = cr.get("issue")
    if issue_num is not None:
        try:
            issue_num = int(issue_num)
        except (ValueError, TypeError):
            return False
        is_open = _is_issue_open(issue_num)
        if is_open is False:
            print(
                f"[loop-guard] cycle_result.json references closed "
                f"issue #{issue_num} — removing"
            )
            CYCLE_RESULT_FILE.unlink(missing_ok=True)
            return True
        # is_open is None (API failure) or True — keep file
    return False


def _has_open_issue(item: object, open_numbers: set[int]) -> bool:
    """Return True if *item* is a queue entry whose "issue" is in *open_numbers*."""
    if not isinstance(item, dict):
        return False
    try:
        return item.get("issue") in open_numbers
    except TypeError:
        # An unhashable "issue" value (e.g. a list) cannot match any number.
        return False


def load_queue() -> list[dict]:
    """Load queue.json and return ready items, filtering out closed issues.

    Entries that are not JSON objects are ignored. When the open-issue
    set cannot be fetched, the ready items are returned unfiltered. When
    filtering removes at least one ready item, the cleaned queue is
    written back to queue.json.
    """
    if not QUEUE_FILE.exists():
        return []
    try:
        data = json.loads(QUEUE_FILE.read_text())
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        print(f"[loop-guard] WARNING: Corrupt queue.json ({exc}) — returning empty queue")
        return []
    except OSError as exc:
        print(f"[loop-guard] WARNING: Cannot read queue.json ({exc}) — returning empty queue")
        return []

    if not isinstance(data, list):
        return []
    ready = [item for item in data if isinstance(item, dict) and item.get("ready")]
    if not ready:
        return []

    # Filter out issues that are no longer open (auto-hygiene)
    open_numbers = _fetch_open_issue_numbers()
    if open_numbers is None:
        return ready
    still_open = [item for item in ready if _has_open_issue(item, open_numbers)]
    removed = len(ready) - len(still_open)
    if removed > 0:
        print(f"[loop-guard] Filtered {removed} closed issue(s) from queue")
        # Persist the cleaned queue so stale entries don't recur
        _save_cleaned_queue(data, open_numbers)
    return still_open


def _save_cleaned_queue(full_queue: list[dict], open_numbers: set[int]) -> None:
    """Rewrite queue.json keeping only entries whose issue is still open.

    Best-effort: a write failure is reported but never raised.
    """
    cleaned = [item for item in full_queue if _has_open_issue(item, open_numbers)]
    try:
        QUEUE_FILE.write_text(json.dumps(cleaned, indent=2) + "\n")
    except OSError as exc:
        print(f"[loop-guard] WARNING: Could not rewrite queue.json: {exc}")


def load_idle_state() -> dict:
    """Load persistent idle state.

    Returns:
        A dict that always has an int "consecutive_idle" >= 0 and a
        numeric "last_idle_at". A missing, unreadable or malformed file
        yields the defaults; ill-typed values for those two keys are
        reset to 0. Any other keys found in the file are preserved.
    """
    state: dict = {"consecutive_idle": 0, "last_idle_at": 0}
    try:
        loaded = json.loads(IDLE_STATE_FILE.read_text())
    except (ValueError, OSError):
        return state
    if not isinstance(loaded, dict):
        return state
    state.update(loaded)

    # bool is a subclass of int, so reject it explicitly.
    idle = state["consecutive_idle"]
    if isinstance(idle, bool) or not isinstance(idle, int) or idle < 0:
        state["consecutive_idle"] = 0
    last = state["last_idle_at"]
    if isinstance(last, bool) or not isinstance(last, (int, float)):
        state["last_idle_at"] = 0
    return state


def save_idle_state(state: dict) -> None:
    """Persist idle state, creating the parent directory if needed.

    Raises:
        OSError: If the directory or file cannot be written.
    """
    IDLE_STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    IDLE_STATE_FILE.write_text(json.dumps(state, indent=2) + "\n")


def compute_backoff(consecutive_idle: int) -> int:
    """Exponential backoff: 60, 120, 240, 480, 600 (capped).

    Args:
        consecutive_idle: Number of idle cycles already observed;
            negative values are treated as 0.

    Returns:
        BACKOFF_BASE * BACKOFF_MULTIPLIER ** consecutive_idle, capped at
        BACKOFF_MAX.
    """
    # Clamp so a corrupted negative counter cannot produce a fractional
    # delay below the base.
    exponent = max(consecutive_idle, 0)
    return min(BACKOFF_BASE * (BACKOFF_MULTIPLIER ** exponent), BACKOFF_MAX)


def seed_cycle_result(item: dict) -> None:
    """Pre-seed cycle_result.json with the top queue item.

    Only writes if cycle_result.json does not already exist — never
    overwrites agent-written data. The seed carries the item's "issue"
    and "type" (defaulting to "unknown") so that downstream tooling can
    resolve the issue number even when the dispatcher does not write
    cycle_result.json itself.
    """
    seed = {
        "issue": item.get("issue"),
        "type": item.get("type", "unknown"),
    }
    try:
        CYCLE_RESULT_FILE.parent.mkdir(parents=True, exist_ok=True)
        # Exclusive create: the existence check and the write are a single
        # step, so a result written concurrently is never clobbered.
        with CYCLE_RESULT_FILE.open("x") as fh:
            fh.write(json.dumps(seed) + "\n")
    except FileExistsError:
        return  # Agent already wrote its own result — leave it alone
    except OSError as exc:
        print(f"[loop-guard] WARNING: Could not seed cycle_result.json: {exc}")
        return
    print(f"[loop-guard] Seeded cycle_result.json with issue #{seed['issue']}")


def main() -> int:
    """Run the guard once and return the process exit code.

    Flags are read from sys.argv:
      --status  print queue/idle state as JSON and return 0
      --wait    sleep for the computed backoff before returning 1
      --pick    print the top ready issue number when returning 0

    Returns:
        0 when the queue has ready work (or for --status), 1 when idle.
    """
    wait_mode = "--wait" in sys.argv
    status_mode = "--status" in sys.argv
    pick_mode = "--pick" in sys.argv

    state = load_idle_state()

    if status_mode:
        ready = load_queue()
        backoff = compute_backoff(state["consecutive_idle"])
        print(json.dumps({
            "queue_ready": len(ready),
            "consecutive_idle": state["consecutive_idle"],
            "next_backoff_seconds": backoff if not ready else 0,
        }, indent=2))
        return 0

    # Pre-cycle validation: remove stale cycle_result.json
    validate_cycle_result()

    ready = load_queue()

    if ready:
        # Queue has work — reset idle state, proceed
        if state["consecutive_idle"] > 0:
            print(f"[loop-guard] Queue active ({len(ready)} ready) — "
                  f"resuming after {state['consecutive_idle']} idle cycles")
        state["consecutive_idle"] = 0
        state["last_idle_at"] = 0
        save_idle_state(state)

        # Pre-seed cycle_result.json so cycle_retro.py can resolve issue=
        # even when the dispatcher doesn't write the file itself.
        seed_cycle_result(ready[0])

        if pick_mode:
            # Emit the top issue number to stdout for shell script capture.
            issue = ready[0].get("issue")
            if issue is not None:
                print(issue)

        return 0

    # Queue empty — apply backoff. The delay is computed from the count
    # *before* this cycle, so the first idle cycle waits BACKOFF_BASE.
    backoff = compute_backoff(state["consecutive_idle"])
    state["consecutive_idle"] += 1
    state["last_idle_at"] = time.time()
    save_idle_state(state)

    print(f"[loop-guard] Queue empty — idle #{state['consecutive_idle']}, "
          f"backoff {backoff}s")

    if wait_mode:
        time.sleep(backoff)

    return 1


if __name__ == "__main__":
    sys.exit(main())