From bca52ea61e7f7b92e45f1e526df72b888496ea7c Mon Sep 17 00:00:00 2001
From: Alexander Whitestone
Date: Wed, 15 Apr 2026 03:27:39 +0000
Subject: [PATCH] fix: add timestamp-based staleness detection for pipeline states

Closes #676, fixes #650

- is_pipeline_complete: stale if >24h old
- is_pipeline_running: stale if >6h old (likely crashed)
- reset_stale_states: purges stale entries at scheduler start
- --status shows (stale) indicator for outdated states

Implementation notes:

- The Python snippets live inside double-quoted bash strings, so
  $STATE_FILE must stay unescaped for bash to expand it; an escaped
  dollar sign would hand Python the literal text instead of the path.
- reset_stale_states treats an 'updated' value that cannot be parsed or
  compared against the current UTC time (for example a timestamp with
  no UTC offset) as stale and drops the entry.
- --status is read-only: it reports stale entries with a "(stale)"
  suffix and does not purge them; purging happens in every other mode.
---
 scripts/nightly-pipeline-scheduler.sh | 99 ++++++++++++++++++++++++---
 1 file changed, 90 insertions(+), 9 deletions(-)

diff --git a/scripts/nightly-pipeline-scheduler.sh b/scripts/nightly-pipeline-scheduler.sh
index 2d2fe8e3..d1b546d8 100644
--- a/scripts/nightly-pipeline-scheduler.sh
+++ b/scripts/nightly-pipeline-scheduler.sh
@@ -117,14 +117,26 @@ is_pipeline_complete() {
     local pipeline="$1"
     python3 -c "
 import json, os
+from datetime import datetime, timezone, timedelta
 path = '$STATE_FILE'
 if not os.path.exists(path):
     print('false')
 else:
     with open(path) as f:
         d = json.load(f)
-    state = d.get('$pipeline', {}).get('state', 'not_started')
-    print('true' if state == 'complete' else 'false')
+    entry = d.get('$pipeline', {})
+    state = entry.get('state', 'not_started')
+    if state != 'complete':
+        print('false')
+    else:
+        # Stale check: a completion older than 24h belongs to a previous run
+        updated = entry.get('updated', '')
+        try:
+            t = datetime.fromisoformat(updated.replace('Z', '+00:00'))
+            is_today = (datetime.now(timezone.utc) - t) < timedelta(hours=24)
+            print('true' if is_today else 'false')
+        except Exception:
+            print('false')
 " 2>/dev/null || echo false
 }
 
@@ -132,17 +144,64 @@ is_pipeline_running() {
     local pipeline="$1"
     python3 -c "
 import json, os
+from datetime import datetime, timezone, timedelta
 path = '$STATE_FILE'
 if not os.path.exists(path):
     print('false')
 else:
     with open(path) as f:
         d = json.load(f)
-    state = d.get('$pipeline', {}).get('state', 'not_started')
-    print('true' if state == 'running' else 'false')
+    entry = d.get('$pipeline', {})
+    state = entry.get('state', 'not_started')
+    if state != 'running':
+        print('false')
+    else:
+        # Running for >6 hours is likely crashed
+        updated = entry.get('updated', '')
+        try:
+            t = datetime.fromisoformat(updated.replace('Z', '+00:00'))
+            stale = (datetime.now(timezone.utc) - t) > timedelta(hours=6)
+            print('false' if stale else 'true')
+        except Exception:
+            print('true')
 " 2>/dev/null || echo false
 }
 
+
+reset_stale_states() {
+    python3 -c "
+import json, os
+from datetime import datetime, timezone, timedelta
+path = '$STATE_FILE'
+if not os.path.exists(path):
+    exit(0)
+with open(path) as f:
+    d = json.load(f)
+stale = []
+for name, entry in list(d.items()):
+    if not isinstance(entry, dict) or 'state' not in entry:
+        continue
+    state = entry['state']
+    updated = entry.get('updated', '')
+    try:
+        t = datetime.fromisoformat(updated.replace('Z', '+00:00'))
+        age = datetime.now(timezone.utc) - t
+    except Exception:
+        stale.append(name)
+        continue
+    if state in ('complete', 'failed') and age > timedelta(hours=24):
+        stale.append(name)
+    elif state == 'running' and age > timedelta(hours=6):
+        stale.append(name)
+for name in stale:
+    del d[name]
+if stale:
+    with open(path, 'w') as f:
+        json.dump(d, f, indent=2)
+    print('Reset stale pipelines: ' + ', '.join(stale))
+" 2>/dev/null
+}
+
 check_dependency() {
     local dep="$1"
     if [[ "$dep" == "none" ]]; then
@@ -310,22 +369,44 @@ main() {
         exit 0
     fi
 
+    # Purge stale pipeline states, except in read-only --status mode
+    [[ "$mode" == "--status" ]] || reset_stale_states
+
     # Check 5: Pipeline status
     if [[ "$mode" == "--status" ]]; then
         echo -e "${CYAN}Pipeline Status:${NC}"
         echo "────────────────────────────────────────────────────"
         for entry in "${PIPELINES[@]}"; do
             IFS='|' read -r name script max_tokens dep <<< "$entry"
-            local state=$(python3 -c "
+            local state_info=$(python3 -c "
 import json, os
+from datetime import datetime, timezone, timedelta
 path = '$STATE_FILE'
 if not os.path.exists(path):
-    print('not_started')
+    print('not_started|')
 else:
     with open(path) as f:
         d = json.load(f)
-    print(d.get('$name', {}).get('state', 'not_started'))
-" 2>/dev/null || echo "not_started")
+    entry = d.get('$name', {})
+    state = entry.get('state', 'not_started')
+    updated = entry.get('updated', '')
+    stale = ''
+    try:
+        t = datetime.fromisoformat(updated.replace('Z', '+00:00'))
+        age = datetime.now(timezone.utc) - t
+        if state in ('complete', 'failed') and age > timedelta(hours=24):
+            stale = 'stale'
+        elif state == 'running' and age > timedelta(hours=6):
+            stale = 'stale'
+    except Exception:
+        pass
+    print(state + '|' + stale)
+" 2>/dev/null || echo "not_started|")
+
+            local state="${state_info%%|*}"
+            local stale="${state_info##*|}"
+            local suffix=""
+            [[ -n "$stale" ]] && suffix=" (stale)"
 
             local color=$NC
             case "$state" in
@@ -333,7 +414,7 @@ else:
                 complete) color=$GREEN ;;
                 failed) color=$RED ;;
             esac
-            printf " %-25s %b%s%b (max: %s tokens, dep: %s)\n" "$name" "$color" "$state" "$NC" "$max_tokens" "$dep"
+            printf " %-25s %b%s%s%b (max: %s tokens, dep: %s)\n" "$name" "$color" "$state" "$suffix" "$NC" "$max_tokens" "$dep"
         done
         echo "────────────────────────────────────────────────────"
         echo " Budget: $budget / $TOKEN_DAILY_LIMIT tokens remaining"