diff --git a/cron/pipeline-daily-reset.yml b/cron/pipeline-daily-reset.yml
new file mode 100644
index 00000000..e7216f67
--- /dev/null
+++ b/cron/pipeline-daily-reset.yml
@@ -0,0 +1,5 @@
+- name: Pipeline State Daily Reset
+  schedule: '0 0 * * *'  # Midnight daily
+  tasks:
+    - name: Reset stale pipeline states
+      shell: "python3 scripts/reset_pipeline_state.py --state-file ~/.hermes/pipeline_state.json"
diff --git a/scripts/nightly-pipeline-scheduler.sh b/scripts/nightly-pipeline-scheduler.sh
index d1b546d8..07895392 100644
--- a/scripts/nightly-pipeline-scheduler.sh
+++ b/scripts/nightly-pipeline-scheduler.sh
@@ -169,10 +169,16 @@ else:
 reset_stale_states() {
-    python3 -c "
+    # Delegate to standalone script (fixes #650: inline Python $STATE_FILE escaping bug)
+    local script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+    if [[ -f "$script_dir/reset_pipeline_state.py" ]]; then
+        python3 "$script_dir/reset_pipeline_state.py" --state-file "$STATE_FILE"
+    else
+        # Fallback: inline reset with proper variable expansion
+        python3 -c "
 import json, os
 from datetime import datetime, timezone, timedelta
-path = '\$STATE_FILE'
+path = '$STATE_FILE'
 if not os.path.exists(path):
     exit(0)
 with open(path) as f:
@@ -200,6 +206,7 @@ if stale:
     json.dump(d, f, indent=2)
     print('Reset stale pipelines: ' + ', '.join(stale))
 " 2>/dev/null
+    fi
 }
 
 check_dependency() {
diff --git a/scripts/reset_pipeline_state.py b/scripts/reset_pipeline_state.py
new file mode 100644
index 00000000..db8f8901
--- /dev/null
+++ b/scripts/reset_pipeline_state.py
@@ -0,0 +1,193 @@
+#!/usr/bin/env python3
+"""
+reset_pipeline_state.py — Daily reset for pipeline_state.json
+
+Cleans stale pipeline entries so yesterday's "complete" doesn't block
+today's runs. Called by cron at midnight or by the scheduler before
+each cycle.
+
+Usage:
+    python3 scripts/reset_pipeline_state.py
+    python3 scripts/reset_pipeline_state.py --state-file /path/to/pipeline_state.json
+    python3 scripts/reset_pipeline_state.py --dry-run
+    python3 scripts/reset_pipeline_state.py --running-max-age-hours 12
+"""
+
+import argparse
+import json
+import os
+import sys
+from datetime import datetime, timezone, timedelta
+from typing import Dict, Any, List, Tuple
+
+
+# Default age thresholds
+DEFAULT_COMPLETE_MAX_AGE_HOURS = 24
+DEFAULT_FAILED_MAX_AGE_HOURS = 24
+DEFAULT_RUNNING_MAX_AGE_HOURS = 6
+DEFAULT_STATE_FILE = os.path.expanduser("~/.hermes/pipeline_state.json")
+
+
+def parse_timestamp(ts: str) -> datetime:
+    """Parse ISO timestamp, handling Z suffix and missing tzinfo.
+
+    Always returns an AWARE datetime: naive inputs are assumed UTC so
+    callers can safely subtract from datetime.now(timezone.utc).
+    """
+    ts = ts.replace("Z", "+00:00")
+    try:
+        dt = datetime.fromisoformat(ts)
+        # Naive timestamps are assumed UTC; returning a naive datetime
+        # would make `now - updated` raise TypeError in classify_stale().
+        return dt if dt.tzinfo else dt.replace(tzinfo=timezone.utc)
+    except ValueError:
+        # Fallback for non-standard formats
+        for fmt in ("%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"):
+            try:
+                return datetime.strptime(ts, fmt).replace(tzinfo=timezone.utc)
+            except ValueError:
+                continue
+        raise ValueError(f"Cannot parse timestamp: {ts}")
+
+
+def classify_stale(
+    entry: Dict[str, Any],
+    now: datetime,
+    complete_max_age: timedelta,
+    failed_max_age: timedelta,
+    running_max_age: timedelta,
+) -> Tuple[bool, str]:
+    """Check if a pipeline entry is stale. Returns (is_stale, reason)."""
+    state = entry.get("state", "not_started")
+    updated_str = entry.get("updated", "")
+
+    if not updated_str:
+        return True, "no_timestamp"
+
+    try:
+        updated = parse_timestamp(updated_str)
+    except ValueError:
+        # parse_timestamp only ever raises ValueError.
+        return True, "invalid_timestamp"
+
+    age = now - updated
+
+    if state == "complete" and age > complete_max_age:
+        return True, f"complete_{int(age.total_seconds() / 3600)}h_ago"
+    elif state == "failed" and age > failed_max_age:
+        return True, f"failed_{int(age.total_seconds() / 3600)}h_ago"
+    elif state == "running" and age > running_max_age:
+        return True, f"running_stuck_{int(age.total_seconds() / 3600)}h_ago"
+
+    return False, ""
+
+
+def reset_pipeline_state(
+    state_file: str,
+    complete_max_age_hours: int = DEFAULT_COMPLETE_MAX_AGE_HOURS,
+    failed_max_age_hours: int = DEFAULT_FAILED_MAX_AGE_HOURS,
+    running_max_age_hours: int = DEFAULT_RUNNING_MAX_AGE_HOURS,
+    dry_run: bool = False,
+) -> Tuple[Dict[str, Any], List[str]]:
+    """Reset stale pipeline entries.
+
+    Returns (cleaned_state_dict, list_of_removed_names).
+    """
+    now = datetime.now(timezone.utc)
+
+    complete_max_age = timedelta(hours=complete_max_age_hours)
+    failed_max_age = timedelta(hours=failed_max_age_hours)
+    running_max_age = timedelta(hours=running_max_age_hours)
+
+    # Load state
+    if not os.path.exists(state_file):
+        return {}, []
+
+    with open(state_file) as f:
+        state = json.load(f)
+
+    if not isinstance(state, dict):
+        # Corrupted file — reset entirely
+        if not dry_run:
+            with open(state_file, "w") as f:
+                json.dump({}, f)
+        # state is known not to be a dict in this branch.
+        return {}, ["corrupted"]
+
+    # Classify entries
+    to_remove = []
+    for name, entry in list(state.items()):
+        if not isinstance(entry, dict):
+            to_remove.append((name, "not_dict"))
+            continue
+        if "state" not in entry:
+            to_remove.append((name, "no_state_field"))
+            continue
+
+        stale, reason = classify_stale(
+            entry, now, complete_max_age, failed_max_age, running_max_age
+        )
+        if stale:
+            to_remove.append((name, reason))
+
+    # Remove stale entries
+    removed_names = []
+    for name, reason in to_remove:
+        del state[name]
+        removed_names.append(f"{name}({reason})")
+
+    # Write back
+    if removed_names and not dry_run:
+        os.makedirs(os.path.dirname(state_file) or ".", exist_ok=True)
+        with open(state_file, "w") as f:
+            json.dump(state, f, indent=2)
+
+    return state, removed_names
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Reset stale pipeline state entries")
+    parser.add_argument("--state-file", default=DEFAULT_STATE_FILE,
+                        help="Path to pipeline_state.json")
+    parser.add_argument("--complete-max-age-hours", type=int, default=DEFAULT_COMPLETE_MAX_AGE_HOURS,
+                        help="Max age in hours for complete entries (default: 24)")
+    parser.add_argument("--failed-max-age-hours", type=int, default=DEFAULT_FAILED_MAX_AGE_HOURS,
+                        help="Max age in hours for failed entries (default: 24)")
+    parser.add_argument("--running-max-age-hours", type=int, default=DEFAULT_RUNNING_MAX_AGE_HOURS,
+                        help="Max age in hours for stuck running entries (default: 6)")
+    parser.add_argument("--dry-run", action="store_true",
+                        help="Show what would be removed without modifying")
+    parser.add_argument("--json", action="store_true",
+                        help="Output as JSON")
+
+    args = parser.parse_args()
+
+    state, removed = reset_pipeline_state(
+        state_file=args.state_file,
+        complete_max_age_hours=args.complete_max_age_hours,
+        failed_max_age_hours=args.failed_max_age_hours,
+        running_max_age_hours=args.running_max_age_hours,
+        dry_run=args.dry_run,
+    )
+
+    if args.json:
+        result = {
+            "state_file": args.state_file,
+            "removed": removed,
+            "remaining": list(state.keys()),
+            "dry_run": args.dry_run,
+        }
+        print(json.dumps(result, indent=2))
+    else:
+        prefix = "[DRY RUN] " if args.dry_run else ""
+        if removed:
+            print(f"{prefix}Reset {len(removed)} stale pipeline(s): {', '.join(removed)}")
+        else:
+            print(f"{prefix}No stale pipelines found. {len(state)} active.")
+        if state:
+            for name, entry in state.items():
+                age_h = ""
+                try:
+                    updated = parse_timestamp(entry["updated"])
+                    age = datetime.now(timezone.utc) - updated
+                    age_h = f" ({int(age.total_seconds() / 3600)}h ago)"
+                except Exception:
+                    pass
+                print(f"  {name}: {entry['state']}{age_h}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/test_reset_pipeline_state.py b/scripts/test_reset_pipeline_state.py
new file mode 100644
index 00000000..83567b9a
--- /dev/null
+++ b/scripts/test_reset_pipeline_state.py
@@ -0,0 +1,190 @@
+#!/usr/bin/env python3
+"""Tests for scripts/reset_pipeline_state.py — 10 tests."""
+
+import json
+import os
+import sys
+import tempfile
+from datetime import datetime, timezone, timedelta
+
+sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
+from reset_pipeline_state import reset_pipeline_state, classify_stale, parse_timestamp
+
+
+def test_no_state_file():
+    """Reset on missing file returns empty."""
+    state, removed = reset_pipeline_state("/nonexistent/pipeline_state.json")
+    assert state == {}
+    assert removed == []
+    print("PASS: test_no_state_file")
+
+
+def test_empty_state():
+    """Empty JSON object is untouched."""
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
+        json.dump({}, f)
+        path = f.name
+    try:
+        state, removed = reset_pipeline_state(path)
+        assert state == {}
+        assert removed == []
+    finally:
+        os.unlink(path)
+    print("PASS: test_empty_state")
+
+
+def test_fresh_complete_kept():
+    """Recent complete entry is kept."""
+    now = datetime.now(timezone.utc)
+    entry = {"state": "complete", "updated": now.isoformat()}
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
+        json.dump({"my-pipeline": entry}, f)
+        path = f.name
+    try:
+        state, removed = reset_pipeline_state(path)
+        assert "my-pipeline" in state
+        assert removed == []
+    finally:
+        os.unlink(path)
+    print("PASS: test_fresh_complete_kept")
+
+
+def test_old_complete_removed():
+    """Complete entry older than 24h is removed."""
+    old = (datetime.now(timezone.utc) - timedelta(hours=30)).isoformat()
+    entry = {"state": "complete", "updated": old}
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
+        json.dump({"old-pipeline": entry}, f)
+        path = f.name
+    try:
+        state, removed = reset_pipeline_state(path)
+        assert "old-pipeline" not in state
+        assert len(removed) == 1
+        assert "old-pipeline" in removed[0]
+    finally:
+        os.unlink(path)
+    print("PASS: test_old_complete_removed")
+
+
+def test_stuck_running_removed():
+    """Running entry older than 6h is treated as stuck and removed."""
+    old = (datetime.now(timezone.utc) - timedelta(hours=10)).isoformat()
+    entry = {"state": "running", "updated": old}
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
+        json.dump({"stuck-pipeline": entry}, f)
+        path = f.name
+    try:
+        state, removed = reset_pipeline_state(path)
+        assert "stuck-pipeline" not in state
+        assert len(removed) == 1
+    finally:
+        os.unlink(path)
+    print("PASS: test_stuck_running_removed")
+
+
+def test_old_failed_removed():
+    """Failed entry older than 24h is removed."""
+    old = (datetime.now(timezone.utc) - timedelta(hours=48)).isoformat()
+    entry = {"state": "failed", "updated": old}
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
+        json.dump({"failed-pipeline": entry}, f)
+        path = f.name
+    try:
+        state, removed = reset_pipeline_state(path)
+        assert "failed-pipeline" not in state
+    finally:
+        os.unlink(path)
+    print("PASS: test_old_failed_removed")
+
+
+def test_running_kept_if_fresh():
+    """Fresh running entry is kept."""
+    now = datetime.now(timezone.utc)
+    entry = {"state": "running", "updated": now.isoformat()}
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
+        json.dump({"active-pipeline": entry}, f)
+        path = f.name
+    try:
+        state, removed = reset_pipeline_state(path)
+        assert "active-pipeline" in state
+        assert removed == []
+    finally:
+        os.unlink(path)
+    print("PASS: test_running_kept_if_fresh")
+
+
+def test_dry_run_does_not_modify():
+    """Dry run reports removals but doesn't change the file."""
+    old = (datetime.now(timezone.utc) - timedelta(hours=30)).isoformat()
+    content = json.dumps({"old-pipeline": {"state": "complete", "updated": old}})
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
+        f.write(content)
+        path = f.name
+    try:
+        state, removed = reset_pipeline_state(path, dry_run=True)
+        assert "old-pipeline" not in state
+        assert len(removed) == 1
+        # File should be unchanged
+        with open(path) as f:
+            file_state = json.load(f)
+        assert "old-pipeline" in file_state
+    finally:
+        os.unlink(path)
+    print("PASS: test_dry_run_does_not_modify")
+
+
+def test_mixed_entries():
+    """Mix of fresh and stale entries — only stale removed."""
+    now = datetime.now(timezone.utc)
+    old = (now - timedelta(hours=30)).isoformat()
+    state_data = {
+        "fresh-complete": {"state": "complete", "updated": now.isoformat()},
+        "stale-complete": {"state": "complete", "updated": old},
+        "fresh-running": {"state": "running", "updated": now.isoformat()},
+        "stuck-running": {"state": "running", "updated": (now - timedelta(hours=10)).isoformat()},
+    }
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
+        json.dump(state_data, f)
+        path = f.name
+    try:
+        state, removed = reset_pipeline_state(path)
+        assert "fresh-complete" in state
+        assert "fresh-running" in state
+        assert "stale-complete" not in state
+        assert "stuck-running" not in state
+        assert len(removed) == 2
+    finally:
+        os.unlink(path)
+    print("PASS: test_mixed_entries")
+
+
+def test_corrupted_entry_removed():
+    """Non-dict entries are removed."""
+    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
+        json.dump({"broken": "not_a_dict", "also-bad": 42}, f)
+        path = f.name
+    try:
+        state, removed = reset_pipeline_state(path)
+        assert "broken" not in state
+        assert "also-bad" not in state
+    finally:
+        os.unlink(path)
+    print("PASS: test_corrupted_entry_removed")
+
+
+def run_all():
+    test_no_state_file()
+    test_empty_state()
+    test_fresh_complete_kept()
+    test_old_complete_removed()
+    test_stuck_running_removed()
+    test_old_failed_removed()
+    test_running_kept_if_fresh()
+    test_dry_run_does_not_modify()
+    test_mixed_entries()
+    test_corrupted_entry_removed()
+    print("\nAll 10 tests passed!")
+
+
+if __name__ == "__main__":
+    run_all()