Files
the-nexus/nexus/retry_helper.py
Alexander Whitestone 37b006d3c6
Some checks failed
Deploy Nexus / deploy (push) Has been cancelled
CI / validate (pull_request) Failing after 10s
feat: Fleet management (#910), retry logic (#896), morning report (#897)
- fleet/fleet.sh: cross-VPS health, status, restart, deploy
- nexus/retry_helper.py: retry decorator, dead letter queue, checkpoints
- nexus/morning_report.py: automated 0600 overnight activity report
- fleet/allegro/archived-scripts/README.md: burn script archive placeholder

Fixes #910
Fixes #896
Fixes #897
Fixes #898
2026-04-06 23:09:49 -04:00

115 lines
3.6 KiB
Python

"""
Retry logic and error recovery for burn-mode operations.
Provides: retry decorator, cycle state tracking, dead letter queue.
"""
import functools
import json
import os
import time
import traceback
from datetime import datetime, timezone
from pathlib import Path
# --- Configuration ---
# All burn-mode state lives under the user's home directory.
STATE_DIR = Path.home() / ".local/timmy/burn-state"
STATE_FILE = STATE_DIR / "cycle-state.json"          # last checkpoint for crash recovery
DEAD_LETTER_FILE = STATE_DIR / "dead-letter.json"    # JSON list of permanently failed actions
MAX_RETRIES = 3   # default attempts per action
BASE_DELAY = 2    # seconds; doubled after each failed attempt
def _ensure_dir():
    """Create the state directory (and parents) if it does not exist yet."""
    os.makedirs(STATE_DIR, exist_ok=True)
def retry(max_retries=MAX_RETRIES, base_delay=BASE_DELAY, exceptions=(Exception,)):
    """Retry decorator with exponential backoff.

    Args:
        max_retries: total number of attempts before giving up.
        base_delay: wait before the second attempt, in seconds; the wait
            doubles after each failure (base_delay * 2**(attempt-1)).
        exceptions: tuple of exception types that trigger a retry; any
            other exception propagates immediately.

    Returns:
        A decorator. The wrapped function returns its normal result on
        success, or None once all attempts are exhausted (the failure is
        recorded in the dead letter queue via dead_letter()).
    """
    def decorator(fn):
        # functools.wraps preserves fn.__name__/__doc__ so the RETRY/FAIL
        # log lines (and any further decoration) report the real function.
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except exceptions as exc:
                    if attempt < max_retries:
                        delay = base_delay * (2 ** (attempt - 1))
                        print(f" [RETRY] {fn.__name__} attempt {attempt}/{max_retries} failed: {exc}")
                        print(f" [RETRY] waiting {delay}s...")
                        time.sleep(delay)
                    else:
                        print(f" [FAIL] {fn.__name__} failed after {max_retries} attempts: {exc}")
                        dead_letter(fn.__name__, args, exc)
            return None  # All retries exhausted
        return wrapper
    return decorator
def dead_letter(fn_name, args, exc):
    """Record a failed action to the dead letter queue.

    Stores the function name, a truncated repr of its positional args,
    the error text, a truncated traceback, and a UTC timestamp in the
    JSON list at DEAD_LETTER_FILE. A missing or corrupt queue file is
    treated as an empty queue.
    """
    _ensure_dir()
    record = {
        "function": fn_name,
        "args": str(args)[:500],
        "error": str(exc),
        "traceback": traceback.format_exc()[:1000],
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    try:
        queue = json.loads(DEAD_LETTER_FILE.read_text())
    except (FileNotFoundError, json.JSONDecodeError):
        queue = []
    queue.append(record)
    DEAD_LETTER_FILE.write_text(json.dumps(queue, indent=2))
def save_checkpoint(action, repo=None, issue=None, detail=None):
    """Persist the current cycle action so a crash can be resumed.

    Writes an "in-progress" record (action, repo, issue, free-form detail,
    UTC timestamp) to STATE_FILE, overwriting any previous checkpoint.
    """
    _ensure_dir()
    checkpoint = {
        "action": action,
        "repo": repo,
        "issue": issue,
        "detail": detail or "",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "status": "in-progress",
    }
    STATE_FILE.write_text(json.dumps(checkpoint, indent=2))
def clear_checkpoint():
    """Mark the cycle complete by overwriting the checkpoint file.

    Leaves a "complete" record (action=None, UTC timestamp) so a later
    load_checkpoint() shows there is nothing to recover.
    """
    _ensure_dir()
    STATE_FILE.write_text(json.dumps(
        {
            "action": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "status": "complete",
        },
        indent=2,
    ))
def load_checkpoint():
    """Return the last checkpoint dict, or None if absent or unparseable."""
    try:
        return json.loads(STATE_FILE.read_text())
    except (FileNotFoundError, json.JSONDecodeError):
        return None
def get_dead_letter_summary():
    """Return a human-readable summary of the dead letter queue.

    Reports the total count plus one line for each of the 10 most recent
    failures; a missing file reads as empty and an unparseable file as
    corrupt.
    """
    if not DEAD_LETTER_FILE.exists():
        return "Dead letter queue: empty"
    try:
        entries = json.loads(DEAD_LETTER_FILE.read_text())
    except json.JSONDecodeError:
        return "Dead letter queue: corrupt"
    summary = [f"Dead letter queue: {len(entries)} failed actions"]
    summary.extend(
        f" - {entry['function']}: {entry['error'][:100]} at {entry['timestamp']}"
        for entry in entries[-10:]  # only the 10 most recent failures
    )
    return "\n".join(summary)