"""Retry logic and error recovery for burn-mode operations.

Provides: retry decorator, cycle state tracking, dead letter queue.
"""

import functools
import json
import os
import time
import traceback
from datetime import datetime, timezone
from pathlib import Path

# --- Configuration ---
STATE_DIR = Path(os.path.expanduser("~/.local/timmy/burn-state"))
STATE_FILE = STATE_DIR / "cycle-state.json"
DEAD_LETTER_FILE = STATE_DIR / "dead-letter.json"
MAX_RETRIES = 3
BASE_DELAY = 2  # seconds


def _ensure_dir():
    """Create the state directory (and any missing parents) if needed."""
    STATE_DIR.mkdir(parents=True, exist_ok=True)


def _utc_now():
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _write_json(path, payload):
    """Serialize *payload* to *path* without ever leaving a half-written file.

    The data is written to a sibling temp file and then renamed over the
    target, so a crash mid-write leaves the previous contents intact.
    """
    tmp_path = path.with_name(path.name + ".tmp")
    tmp_path.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    os.replace(tmp_path, path)


def _read_dead_letters():
    """Return the stored dead letter queue, or None if the file is unusable.

    "Unusable" means the file is not valid UTF-8 JSON, or it is valid JSON
    whose top-level value is not a list.
    """
    try:
        data = json.loads(DEAD_LETTER_FILE.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError):
        return None
    return data if isinstance(data, list) else None


def retry(max_retries=MAX_RETRIES, base_delay=BASE_DELAY, exceptions=(Exception,)):
    """Retry decorator with exponential backoff.

    Args:
        max_retries: Total number of attempts. If it is less than 1 the
            wrapped function is never called and the wrapper returns None.
        base_delay: Seconds to wait after the first failure; the wait
            doubles after each subsequent failure.
        exceptions: Exception types that trigger a retry. Anything else
            propagates to the caller immediately.

    Returns:
        A decorator. The decorated function returns the wrapped function's
        result on success, or None once every attempt has failed; in that
        case the final failure is recorded via ``dead_letter`` (positional
        arguments only) and the exception is NOT re-raised.
    """
    def decorator(fn):
        @functools.wraps(fn)  # keep fn's __name__/__doc__ on the wrapper
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except exceptions as exc:
                    if attempt < max_retries:
                        delay = base_delay * (2 ** (attempt - 1))
                        print(f" [RETRY] {fn.__name__} attempt {attempt}/{max_retries} failed: {exc}")
                        print(f" [RETRY] waiting {delay}s...")
                        time.sleep(delay)
                    else:
                        print(f" [FAIL] {fn.__name__} failed after {max_retries} attempts: {exc}")
                        dead_letter(fn.__name__, args, exc)
            return None  # All retries exhausted
        return wrapper
    return decorator


def dead_letter(fn_name, args, exc):
    """Record a failed action to the dead letter queue.

    Args:
        fn_name: Name of the function that failed.
        args: Its positional arguments; stored as ``str(args)`` truncated
            to 500 characters.
        exc: The exception that caused the failure.

    An existing queue file that cannot be parsed as a JSON list is
    discarded and replaced by a queue holding only the new entry.
    """
    _ensure_dir()
    # Format from the exception object rather than traceback.format_exc(),
    # so the traceback is correct even when called outside an except block.
    formatted_tb = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )
    entry = {
        "function": fn_name,
        "args": str(args)[:500],
        "error": str(exc),
        "traceback": formatted_tb[:1000],
        "timestamp": _utc_now(),
    }
    queue = []
    if DEAD_LETTER_FILE.exists():
        queue = _read_dead_letters() or []
    queue.append(entry)
    _write_json(DEAD_LETTER_FILE, queue)


def save_checkpoint(action, repo=None, issue=None, detail=None):
    """Save the current cycle action for crash recovery.

    Overwrites the state file with an "in-progress" record holding
    *action*, *repo*, *issue*, *detail* (empty string when falsy) and a
    UTC timestamp.
    """
    _ensure_dir()
    state = {
        "action": action,
        "repo": repo,
        "issue": issue,
        "detail": detail or "",
        "timestamp": _utc_now(),
        "status": "in-progress",
    }
    _write_json(STATE_FILE, state)


def clear_checkpoint():
    """Clear the checkpoint after successful completion.

    The state file is not deleted; it is overwritten with a "complete"
    record whose action is None.
    """
    _ensure_dir()
    state = {
        "action": None,
        "timestamp": _utc_now(),
        "status": "complete",
    }
    _write_json(STATE_FILE, state)


def load_checkpoint():
    """Load the last checkpoint for crash recovery.

    Returns:
        The parsed state, or None when the state file is missing or does
        not contain valid JSON.
    """
    if not STATE_FILE.exists():
        return None
    try:
        return json.loads(STATE_FILE.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError):
        return None


def get_dead_letter_summary():
    """Return a human-readable summary of the dead letter queue.

    Reports "empty" when the queue file does not exist, "corrupt" when it
    is not a JSON list, and otherwise a count line followed by one line
    for each of the 10 most recent entries.
    """
    if not DEAD_LETTER_FILE.exists():
        return "Dead letter queue: empty"
    queue = _read_dead_letters()
    if queue is None:
        return "Dead letter queue: corrupt"
    lines = [f"Dead letter queue: {len(queue)} failed actions"]
    lines.extend(
        f" - {entry['function']}: {entry['error'][:100]} at {entry['timestamp']}"
        for entry in queue[-10:]  # Show last 10
    )
    return "\n".join(lines)