- fleet/fleet.sh: cross-VPS health, status, restart, deploy
- nexus/retry_helper.py: retry decorator, dead letter queue, checkpoints
- nexus/morning_report.py: automated 06:00 overnight activity report
- fleet/allegro/archived-scripts/README.md: burn script archive placeholder

Fixes #910 Fixes #896 Fixes #897 Fixes #898

115 lines · 3.6 KiB · Python
"""Retry logic and error recovery for burn-mode operations.

Provides: retry decorator, cycle state tracking, dead letter queue.
"""
|
|
|
|
import functools
import json
import os
import time
import traceback
from datetime import datetime, timezone
from pathlib import Path
|
|
|
|
# --- Configuration ---
# Per-user directory holding all burn-mode persistent state.
STATE_DIR = Path(os.path.expanduser("~/.local/timmy/burn-state"))
# Crash-recovery checkpoint written by save_checkpoint()/clear_checkpoint().
STATE_FILE = STATE_DIR / "cycle-state.json"
# JSON list of failed actions appended to by dead_letter().
DEAD_LETTER_FILE = STATE_DIR / "dead-letter.json"
# Default total attempts per action used by the retry() decorator.
MAX_RETRIES = 3
BASE_DELAY = 2  # seconds; doubles after each failed attempt (2, 4, 8, ...)
|
|
|
|
|
|
def _ensure_dir():
    """Create the burn-state directory (and parents) if it does not exist."""
    STATE_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def retry(max_retries=MAX_RETRIES, base_delay=BASE_DELAY, exceptions=(Exception,)):
    """Retry decorator with exponential backoff.

    Args:
        max_retries: Total number of attempts before giving up.
        base_delay: Delay in seconds before the second attempt; doubles
            after each subsequent failure (base, 2*base, 4*base, ...).
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately.

    Returns:
        A decorator. The wrapped function returns its normal result on
        success, or None once all retries are exhausted — the final
        failure is recorded via dead_letter().
    """
    def decorator(fn):
        # functools.wraps preserves fn.__name__/__doc__ so logs,
        # debuggers, and stacked decorators see the real function.
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return fn(*args, **kwargs)
                except exceptions as exc:
                    if attempt < max_retries:
                        delay = base_delay * (2 ** (attempt - 1))
                        print(f" [RETRY] {fn.__name__} attempt {attempt}/{max_retries} failed: {exc}")
                        print(f" [RETRY] waiting {delay}s...")
                        time.sleep(delay)
                    else:
                        print(f" [FAIL] {fn.__name__} failed after {max_retries} attempts: {exc}")
                        dead_letter(fn.__name__, args, exc)
            return None  # All retries exhausted
        return wrapper
    return decorator
|
|
|
|
|
|
def dead_letter(fn_name, args, exc):
    """Record a failed action to the dead letter queue.

    Appends a JSON entry (function name, truncated args, error text,
    truncated traceback, UTC timestamp) to DEAD_LETTER_FILE. A missing,
    corrupt, or wrongly-shaped queue file is reset rather than crashing —
    this runs on the failure path and must not raise.

    Args:
        fn_name: Name of the function that failed.
        args: Positional arguments it was called with (stringified).
        exc: The exception that exhausted the retries.
    """
    _ensure_dir()
    entry = {
        "function": fn_name,
        "args": str(args)[:500],  # truncate: args may be arbitrarily large
        "error": str(exc),
        "traceback": traceback.format_exc()[:1000],
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    dlq = []
    if DEAD_LETTER_FILE.exists():
        try:
            dlq = json.loads(DEAD_LETTER_FILE.read_text())
        except json.JSONDecodeError:
            dlq = []
        # Valid JSON but not a list (e.g. a dict) would make .append()
        # raise AttributeError — treat it as corrupt and start fresh.
        if not isinstance(dlq, list):
            dlq = []
    dlq.append(entry)
    DEAD_LETTER_FILE.write_text(json.dumps(dlq, indent=2))
|
|
|
|
|
|
def save_checkpoint(action, repo=None, issue=None, detail=None):
    """Persist the current cycle action so a crash can be recovered from.

    Writes an 'in-progress' record to STATE_FILE containing the action
    name, optional repo/issue/detail context, and a UTC timestamp.

    Args:
        action: Name of the action being started.
        repo: Optional repository the action targets.
        issue: Optional issue number/identifier.
        detail: Optional free-form context (empty string when omitted).
    """
    _ensure_dir()
    stamped_at = datetime.now(timezone.utc).isoformat()
    checkpoint = dict(
        action=action,
        repo=repo,
        issue=issue,
        detail=detail or "",
        timestamp=stamped_at,
        status="in-progress",
    )
    STATE_FILE.write_text(json.dumps(checkpoint, indent=2))
|
|
|
|
|
|
def clear_checkpoint():
    """Mark the cycle as complete by overwriting the checkpoint file.

    Replaces STATE_FILE with a 'complete' record (no action) so a later
    restart knows nothing was in flight.
    """
    _ensure_dir()
    payload = json.dumps(
        {
            "action": None,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "status": "complete",
        },
        indent=2,
    )
    STATE_FILE.write_text(payload)
|
|
|
|
|
|
def load_checkpoint():
    """Load the last checkpoint for crash recovery.

    Returns:
        The parsed checkpoint dict, or None when the state file is
        missing, unreadable, or contains invalid JSON.
    """
    if not STATE_FILE.exists():
        return None
    try:
        return json.loads(STATE_FILE.read_text())
    # OSError too: an unreadable state file (permissions, I/O error)
    # must not crash the recovery path — treat it as "no checkpoint".
    except (OSError, json.JSONDecodeError):
        return None
|
|
|
|
|
|
def get_dead_letter_summary():
    """Return a human-readable summary of the dead letter queue.

    Returns:
        'Dead letter queue: empty' when no file exists, a per-entry
        listing of the last 10 failures otherwise, or
        'Dead letter queue: corrupt' when the file cannot be parsed.
        Never raises — this feeds status reports.
    """
    if not DEAD_LETTER_FILE.exists():
        return "Dead letter queue: empty"
    try:
        dlq = json.loads(DEAD_LETTER_FILE.read_text())
    # OSError too: an unreadable file should degrade, not crash a report.
    except (OSError, json.JSONDecodeError):
        return "Dead letter queue: corrupt"
    if not isinstance(dlq, list):
        # Valid JSON of the wrong shape would crash len()/slicing below.
        return "Dead letter queue: corrupt"
    lines = [f"Dead letter queue: {len(dlq)} failed actions"]
    for entry in dlq[-10:]:  # Show last 10
        # Tolerate entries missing keys (hand-edited or older format).
        fn = entry.get("function", "?") if isinstance(entry, dict) else "?"
        err = str(entry.get("error", ""))[:100] if isinstance(entry, dict) else str(entry)[:100]
        ts = entry.get("timestamp", "?") if isinstance(entry, dict) else "?"
        lines.append(f" - {fn}: {err} at {ts}")
    return "\n".join(lines)
|