#!/usr/bin/env python3
"""Loop guard — idle detection + exponential backoff for the dev loop.

Checks .loop/queue.json for ready items before spawning hermes.
When the queue is empty, applies exponential backoff (60s → 600s max)
instead of burning empty cycles every 3 seconds.

Usage (called by the dev loop before each cycle):

    python3 scripts/loop_guard.py           # exits 0 if ready, 1 if idle
    python3 scripts/loop_guard.py --wait    # same, but sleeps the backoff first
    python3 scripts/loop_guard.py --status  # print current idle state

Exit codes:

    0 — queue has work, proceed with cycle
    1 — queue empty, idle backoff applied (skip cycle)
"""
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
REPO_ROOT = Path(__file__).resolve().parent.parent
|
|
QUEUE_FILE = REPO_ROOT / ".loop" / "queue.json"
|
|
IDLE_STATE_FILE = REPO_ROOT / ".loop" / "idle_state.json"
|
|
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
|
|
|
|
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
|
|
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
|
|
|
|
# Backoff sequence: 60s, 120s, 240s, 600s max
|
|
BACKOFF_BASE = 60
|
|
BACKOFF_MAX = 600
|
|
BACKOFF_MULTIPLIER = 2
|
|
|
|
|
|
def _get_token() -> str:
    """Return the Gitea API token.

    Prefers the GITEA_TOKEN environment variable; falls back to the
    ~/.hermes/gitea_token file. Returns an empty string when neither
    source yields a token.
    """
    env_token = os.environ.get("GITEA_TOKEN", "").strip()
    if env_token:
        return env_token
    if TOKEN_FILE.exists():
        return TOKEN_FILE.read_text().strip()
    return ""
def _fetch_open_issue_numbers() -> set[int] | None:
    """Collect the numbers of all open issues from the Gitea API.

    Paginates 50 issues at a time. Returns None when no token is
    configured or on any network/parse failure, so the caller can skip
    the closed-issue filtering step instead of trusting partial data.
    """
    token = _get_token()
    if not token:
        return None
    headers = {
        "Authorization": f"token {token}",
        "Accept": "application/json",
    }
    open_numbers: set[int] = set()
    page = 1
    try:
        while True:
            url = (
                f"{GITEA_API}/repos/{REPO_SLUG}/issues"
                f"?state=open&type=issues&limit=50&page={page}"
            )
            request = urllib.request.Request(url, headers=headers)
            with urllib.request.urlopen(request, timeout=10) as response:
                batch = json.loads(response.read())
            if not batch:
                return open_numbers
            open_numbers.update(issue["number"] for issue in batch)
            if len(batch) < 50:
                # Short page — no further pages to fetch.
                return open_numbers
            page += 1
    except Exception:
        # Best effort: auth, network, or JSON failures all disable filtering.
        return None
def load_queue() -> list[dict]:
    """Load queue.json and return ready items, filtering out closed issues.

    Returns [] when the file is missing, unreadable, malformed, or holds
    no ready entries. When the Gitea API is reachable, entries whose
    issue is no longer open are dropped and the cleaned queue is
    persisted back to disk.
    """
    if not QUEUE_FILE.exists():
        return []
    # Keep the try narrow: only the read/parse can raise these.
    try:
        data = json.loads(QUEUE_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        return []
    if not isinstance(data, list):
        return []
    # Guard against malformed (non-dict) entries so one bad item can't
    # crash the loop with an AttributeError on .get().
    ready = [item for item in data
             if isinstance(item, dict) and item.get("ready")]
    if not ready:
        return []

    # Filter out issues that are no longer open (auto-hygiene)
    open_numbers = _fetch_open_issue_numbers()
    if open_numbers is not None:
        before = len(ready)
        ready = [item for item in ready if item.get("issue") in open_numbers]
        removed = before - len(ready)
        if removed > 0:
            print(f"[loop-guard] Filtered {removed} closed issue(s) from queue")
            # Persist the cleaned queue so stale entries don't recur
            _save_cleaned_queue(data, open_numbers)
    return ready
def _save_cleaned_queue(full_queue: list[dict], open_numbers: set[int]) -> None:
    """Rewrite queue.json keeping only entries whose issue is still open."""
    kept = [entry for entry in full_queue if entry.get("issue") in open_numbers]
    try:
        QUEUE_FILE.write_text(json.dumps(kept, indent=2) + "\n")
    except OSError:
        # Best effort — a failed rewrite only means stale entries linger.
        pass
def load_idle_state() -> dict:
    """Return the persisted idle state, or a zeroed default when absent/corrupt."""
    default_state = {"consecutive_idle": 0, "last_idle_at": 0}
    if not IDLE_STATE_FILE.exists():
        return default_state
    try:
        return json.loads(IDLE_STATE_FILE.read_text())
    except (json.JSONDecodeError, OSError):
        return default_state
def save_idle_state(state: dict) -> None:
    """Write the idle state to .loop/idle_state.json, creating .loop/ if needed."""
    IDLE_STATE_FILE.parent.mkdir(parents=True, exist_ok=True)
    payload = json.dumps(state, indent=2) + "\n"
    IDLE_STATE_FILE.write_text(payload)
def compute_backoff(consecutive_idle: int) -> int:
    """Return the doubling backoff in seconds (60, 120, 240, ...), capped at BACKOFF_MAX."""
    uncapped = BACKOFF_BASE * BACKOFF_MULTIPLIER ** consecutive_idle
    return uncapped if uncapped < BACKOFF_MAX else BACKOFF_MAX
def main() -> int:
    """Guard entry point.

    Modes (via sys.argv flags):
      --status : print queue/idle state as JSON and exit 0.
      --wait   : on idle, sleep the backoff before returning.

    Returns 0 when the queue has ready work (proceed with the cycle),
    1 when the queue is empty (idle backoff recorded).
    """
    wait_mode = "--wait" in sys.argv
    status_mode = "--status" in sys.argv

    state = load_idle_state()

    if status_mode:
        ready = load_queue()
        backoff = compute_backoff(state["consecutive_idle"])
        print(json.dumps({
            "queue_ready": len(ready),
            "consecutive_idle": state["consecutive_idle"],
            "next_backoff_seconds": backoff if not ready else 0,
        }, indent=2))
        return 0

    ready = load_queue()

    if ready:
        # Queue has work — proceed. Only touch the state file when there
        # is actually a non-zero idle counter to clear, so active 3-second
        # cycles don't rewrite idle_state.json every pass.
        if state["consecutive_idle"] > 0:
            print(f"[loop-guard] Queue active ({len(ready)} ready) — "
                  f"resuming after {state['consecutive_idle']} idle cycles")
            state["consecutive_idle"] = 0
            state["last_idle_at"] = 0
            save_idle_state(state)
        return 0

    # Queue empty — record another idle cycle and apply backoff
    backoff = compute_backoff(state["consecutive_idle"])
    state["consecutive_idle"] += 1
    state["last_idle_at"] = time.time()
    save_idle_state(state)

    print(f"[loop-guard] Queue empty — idle #{state['consecutive_idle']}, "
          f"backoff {backoff}s")

    if wait_mode:
        time.sleep(backoff)

    return 1
if __name__ == "__main__":
    # Script entry point: propagate main()'s return value as the exit code.
    raise SystemExit(main())