This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Timmy-time-dashboard/scripts/loop_guard.py
2026-03-21 13:53:11 +00:00

272 lines
8.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Loop guard — idle detection + exponential backoff for the dev loop.
Checks .loop/queue.json for ready items before spawning hermes.
When the queue is empty, applies exponential backoff (60s → 600s max)
instead of burning empty cycles every 3 seconds.
Usage (called by the dev loop before each cycle):
python3 scripts/loop_guard.py # exits 0 if ready, 1 if idle
python3 scripts/loop_guard.py --wait # same, but sleeps the backoff first
python3 scripts/loop_guard.py --status # print current idle state
Exit codes:
0 — queue has work, proceed with cycle
1 — queue empty, idle backoff applied (skip cycle)
"""
from __future__ import annotations
import json
import os
import sys
import time
import urllib.request
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parent.parent
QUEUE_FILE = REPO_ROOT / ".loop" / "queue.json"
IDLE_STATE_FILE = REPO_ROOT / ".loop" / "idle_state.json"
CYCLE_RESULT_FILE = REPO_ROOT / ".loop" / "cycle_result.json"
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
# Default cycle duration in seconds (5 min); stale threshold = 2× this
CYCLE_DURATION = int(os.environ.get("CYCLE_DURATION", "300"))
# Backoff sequence: 60s, 120s, 240s, 600s max
BACKOFF_BASE = 60
BACKOFF_MAX = 600
BACKOFF_MULTIPLIER = 2
def _get_token() -> str:
"""Read Gitea token from env or file."""
token = os.environ.get("GITEA_TOKEN", "").strip()
if not token and TOKEN_FILE.exists():
token = TOKEN_FILE.read_text().strip()
return token
def _fetch_open_issue_numbers() -> set[int] | None:
"""Fetch open issue numbers from Gitea. Returns None on failure."""
token = _get_token()
if not token:
return None
try:
numbers: set[int] = set()
page = 1
while True:
url = (
f"{GITEA_API}/repos/{REPO_SLUG}/issues"
f"?state=open&type=issues&limit=50&page={page}"
)
req = urllib.request.Request(url, headers={
"Authorization": f"token {token}",
"Accept": "application/json",
})
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read())
if not data:
break
for issue in data:
numbers.add(issue["number"])
if len(data) < 50:
break
page += 1
return numbers
except Exception:
return None
def _load_cycle_result() -> dict:
"""Read cycle_result.json, handling markdown-fenced JSON."""
if not CYCLE_RESULT_FILE.exists():
return {}
try:
raw = CYCLE_RESULT_FILE.read_text().strip()
if raw.startswith("```"):
lines = raw.splitlines()
lines = [ln for ln in lines if not ln.startswith("```")]
raw = "\n".join(lines)
return json.loads(raw)
except (json.JSONDecodeError, OSError):
return {}
def _is_issue_open(issue_number: int) -> bool | None:
"""Check if a single issue is open. Returns None on API failure."""
token = _get_token()
if not token:
return None
try:
url = f"{GITEA_API}/repos/{REPO_SLUG}/issues/{issue_number}"
req = urllib.request.Request(
url,
headers={
"Authorization": f"token {token}",
"Accept": "application/json",
},
)
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read())
return data.get("state") == "open"
except Exception:
return None
def validate_cycle_result() -> bool:
"""Pre-cycle validation: remove stale or invalid cycle_result.json.
Checks:
1. Age — if older than 2× CYCLE_DURATION, delete it.
2. Issue — if the referenced issue is closed, delete it.
Returns True if the file was removed, False otherwise.
"""
if not CYCLE_RESULT_FILE.exists():
return False
# Age check
try:
age = time.time() - CYCLE_RESULT_FILE.stat().st_mtime
except OSError:
return False
stale_threshold = CYCLE_DURATION * 2
if age > stale_threshold:
print(
f"[loop-guard] cycle_result.json is {int(age)}s old "
f"(threshold {stale_threshold}s) — removing stale file"
)
CYCLE_RESULT_FILE.unlink(missing_ok=True)
return True
# Issue check
cr = _load_cycle_result()
issue_num = cr.get("issue")
if issue_num is not None:
try:
issue_num = int(issue_num)
except (ValueError, TypeError):
return False
is_open = _is_issue_open(issue_num)
if is_open is False:
print(
f"[loop-guard] cycle_result.json references closed "
f"issue #{issue_num} — removing"
)
CYCLE_RESULT_FILE.unlink(missing_ok=True)
return True
# is_open is None (API failure) or True — keep file
return False
def load_queue() -> list[dict]:
"""Load queue.json and return ready items, filtering out closed issues."""
if not QUEUE_FILE.exists():
return []
try:
data = json.loads(QUEUE_FILE.read_text())
if not isinstance(data, list):
return []
ready = [item for item in data if item.get("ready")]
if not ready:
return []
# Filter out issues that are no longer open (auto-hygiene)
open_numbers = _fetch_open_issue_numbers()
if open_numbers is not None:
before = len(ready)
ready = [item for item in ready if item.get("issue") in open_numbers]
removed = before - len(ready)
if removed > 0:
print(f"[loop-guard] Filtered {removed} closed issue(s) from queue")
# Persist the cleaned queue so stale entries don't recur
_save_cleaned_queue(data, open_numbers)
return ready
except (json.JSONDecodeError, OSError):
return []
def _save_cleaned_queue(full_queue: list[dict], open_numbers: set[int]) -> None:
"""Rewrite queue.json without closed issues."""
cleaned = [item for item in full_queue if item.get("issue") in open_numbers]
try:
QUEUE_FILE.write_text(json.dumps(cleaned, indent=2) + "\n")
except OSError:
pass
def load_idle_state() -> dict:
"""Load persistent idle state."""
if not IDLE_STATE_FILE.exists():
return {"consecutive_idle": 0, "last_idle_at": 0}
try:
return json.loads(IDLE_STATE_FILE.read_text())
except (json.JSONDecodeError, OSError):
return {"consecutive_idle": 0, "last_idle_at": 0}
def save_idle_state(state: dict) -> None:
"""Persist idle state."""
IDLE_STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
IDLE_STATE_FILE.write_text(json.dumps(state, indent=2) + "\n")
def compute_backoff(consecutive_idle: int) -> int:
"""Exponential backoff: 60, 120, 240, 600 (capped)."""
return min(BACKOFF_BASE * (BACKOFF_MULTIPLIER ** consecutive_idle), BACKOFF_MAX)
def main() -> int:
wait_mode = "--wait" in sys.argv
status_mode = "--status" in sys.argv
state = load_idle_state()
if status_mode:
ready = load_queue()
backoff = compute_backoff(state["consecutive_idle"])
print(json.dumps({
"queue_ready": len(ready),
"consecutive_idle": state["consecutive_idle"],
"next_backoff_seconds": backoff if not ready else 0,
}, indent=2))
return 0
# Pre-cycle validation: remove stale cycle_result.json
validate_cycle_result()
ready = load_queue()
if ready:
# Queue has work — reset idle state, proceed
if state["consecutive_idle"] > 0:
print(f"[loop-guard] Queue active ({len(ready)} ready) — "
f"resuming after {state['consecutive_idle']} idle cycles")
state["consecutive_idle"] = 0
state["last_idle_at"] = 0
save_idle_state(state)
return 0
# Queue empty — apply backoff
backoff = compute_backoff(state["consecutive_idle"])
state["consecutive_idle"] += 1
state["last_idle_at"] = time.time()
save_idle_state(state)
print(f"[loop-guard] Queue empty — idle #{state['consecutive_idle']}, "
f"backoff {backoff}s")
if wait_mode:
time.sleep(backoff)
return 1
if __name__ == "__main__":
sys.exit(main())