Merge pull request 'fix: pipeline_state.json daily reset' (#805) from fix/650-pipeline-daily-reset-v2 into main
This commit is contained in:
@@ -169,10 +169,16 @@ else:
|
||||
|
||||
|
||||
reset_stale_states() {
|
||||
python3 -c "
|
||||
# Delegate to standalone script (fixes #650: inline Python $STATE_FILE escaping bug)
|
||||
local script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
if [[ -f "$script_dir/reset_pipeline_state.py" ]]; then
|
||||
python3 "$script_dir/reset_pipeline_state.py" --state-file "$STATE_FILE"
|
||||
else
|
||||
# Fallback: inline reset with proper variable expansion
|
||||
python3 -c "
|
||||
import json, os
|
||||
from datetime import datetime, timezone, timedelta
|
||||
path = '\$STATE_FILE'
|
||||
path = '$STATE_FILE'
|
||||
if not os.path.exists(path):
|
||||
exit(0)
|
||||
with open(path) as f:
|
||||
@@ -200,6 +206,7 @@ if stale:
|
||||
json.dump(d, f, indent=2)
|
||||
print('Reset stale pipelines: ' + ', '.join(stale))
|
||||
" 2>/dev/null
|
||||
fi
|
||||
}
|
||||
|
||||
check_dependency() {
|
||||
|
||||
191
scripts/reset_pipeline_state.py
Normal file
191
scripts/reset_pipeline_state.py
Normal file
@@ -0,0 +1,191 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
reset_pipeline_state.py — Daily reset for pipeline_state.json
|
||||
|
||||
Cleans stale pipeline entries so yesterday's "complete" doesn't block
|
||||
today's runs. Called by cron at midnight or by the scheduler before
|
||||
each cycle.
|
||||
|
||||
Usage:
|
||||
python3 scripts/reset_pipeline_state.py
|
||||
python3 scripts/reset_pipeline_state.py --state-file /path/to/pipeline_state.json
|
||||
python3 scripts/reset_pipeline_state.py --dry-run
|
||||
python3 scripts/reset_pipeline_state.py --max-age-hours 12
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from typing import Dict, Any, List, Tuple
|
||||
|
||||
|
||||
# Default age thresholds (hours) for each pipeline state; override via CLI flags.
DEFAULT_COMPLETE_MAX_AGE_HOURS = 24  # "complete" entries older than this are cleared
DEFAULT_FAILED_MAX_AGE_HOURS = 24    # "failed" entries older than this are cleared
DEFAULT_RUNNING_MAX_AGE_HOURS = 6    # "running" entries older than this count as stuck
# Default location of the state file; ~ expands to the invoking user's home.
DEFAULT_STATE_FILE = os.path.expanduser("~/.hermes/pipeline_state.json")
|
||||
|
||||
|
||||
def parse_timestamp(ts: str) -> datetime:
    """Parse an ISO timestamp into an *aware* datetime.

    Handles a trailing "Z" suffix (not accepted by ``fromisoformat`` before
    Python 3.11) and naive timestamps, which are assumed to be UTC.

    Args:
        ts: Timestamp string, e.g. "2024-01-02T03:04:05Z".

    Returns:
        A timezone-aware datetime. An explicit offset in the input is
        preserved; UTC is assumed only when no offset is present.

    Raises:
        ValueError: if the string matches none of the known formats.
    """
    ts = ts.replace("Z", "+00:00")
    try:
        parsed = datetime.fromisoformat(ts)
    except ValueError:
        # Fallback for non-standard formats (e.g. space separator, or
        # numeric offsets without a colon on Pythons older than 3.11).
        parsed = None
        for fmt in ("%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"):
            try:
                parsed = datetime.strptime(ts, fmt)
                break
            except ValueError:
                continue
        if parsed is None:
            raise ValueError(f"Cannot parse timestamp: {ts}")
    # Bug fixes vs the original:
    #  - the fromisoformat path could return a naive datetime, which later
    #    breaks `aware_now - naive_updated` arithmetic in callers;
    #  - the %z fallback stamped UTC over an already-parsed offset, shifting
    #    the instant by the offset amount.
    # Assume UTC only when no tzinfo was parsed.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
|
||||
|
||||
|
||||
def classify_stale(
    entry: Dict[str, Any],
    now: datetime,
    complete_max_age: timedelta,
    failed_max_age: timedelta,
    running_max_age: timedelta,
) -> Tuple[bool, str]:
    """Check if a pipeline entry is stale.

    Args:
        entry: One pipeline record; expected to carry "state" and "updated".
        now: Current aware (UTC) time to measure age against.
        complete_max_age: Max age for "complete" entries.
        failed_max_age: Max age for "failed" entries.
        running_max_age: Max age before a "running" entry counts as stuck.

    Returns:
        (is_stale, reason) — reason is "" when the entry is fresh.
    """
    state = entry.get("state", "not_started")
    updated_str = entry.get("updated", "")

    # Missing timestamp: cannot prove freshness, so treat as stale.
    if not updated_str:
        return True, "no_timestamp"

    try:
        updated = parse_timestamp(updated_str)
    except ValueError:
        # Narrowed from `except (ValueError, Exception)` — that tuple was an
        # accidental catch-all that could mask unrelated bugs.
        return True, "invalid_timestamp"

    # Defensive: if the parser handed back a naive datetime, assume UTC so
    # the subtraction against the aware `now` cannot raise TypeError.
    if updated.tzinfo is None:
        updated = updated.replace(tzinfo=timezone.utc)

    age = now - updated

    if state == "complete" and age > complete_max_age:
        return True, f"complete_{int(age.total_seconds() / 3600)}h_ago"
    if state == "failed" and age > failed_max_age:
        return True, f"failed_{int(age.total_seconds() / 3600)}h_ago"
    if state == "running" and age > running_max_age:
        return True, f"running_stuck_{int(age.total_seconds() / 3600)}h_ago"

    return False, ""
|
||||
|
||||
|
||||
def reset_pipeline_state(
    state_file: str,
    complete_max_age_hours: int = DEFAULT_COMPLETE_MAX_AGE_HOURS,
    failed_max_age_hours: int = DEFAULT_FAILED_MAX_AGE_HOURS,
    running_max_age_hours: int = DEFAULT_RUNNING_MAX_AGE_HOURS,
    dry_run: bool = False,
) -> Tuple[Dict[str, Any], List[str]]:
    """Reset stale pipeline entries.

    Args:
        state_file: Path to pipeline_state.json; a missing file is a no-op.
        complete_max_age_hours: Threshold for "complete" entries.
        failed_max_age_hours: Threshold for "failed" entries.
        running_max_age_hours: Threshold for stuck "running" entries.
        dry_run: When True, report removals without writing the file.

    Returns:
        (cleaned_state_dict, list_of_removed_names); each removed name is
        formatted "name(reason)".
    """
    now = datetime.now(timezone.utc)

    complete_max_age = timedelta(hours=complete_max_age_hours)
    failed_max_age = timedelta(hours=failed_max_age_hours)
    running_max_age = timedelta(hours=running_max_age_hours)

    # Load state
    if not os.path.exists(state_file):
        return {}, []

    try:
        with open(state_file) as f:
            state = json.load(f)
    except json.JSONDecodeError:
        # Fix: an unparseable file previously propagated as an uncaught
        # exception; fall through to the corrupted-file reset below.
        state = None

    if not isinstance(state, dict):
        # Corrupted file — reset entirely
        if not dry_run:
            with open(state_file, "w") as f:
                json.dump({}, f)
        # The original `list(state.keys()) if isinstance(state, dict) else
        # ["corrupted"]` was dead code: state is never a dict on this branch.
        return {}, ["corrupted"]

    # Classify entries; iterate a snapshot because stale ones are deleted below.
    to_remove = []
    for name, entry in list(state.items()):
        if not isinstance(entry, dict):
            to_remove.append((name, "not_dict"))
            continue
        if "state" not in entry:
            to_remove.append((name, "no_state_field"))
            continue

        stale, reason = classify_stale(
            entry, now, complete_max_age, failed_max_age, running_max_age
        )
        if stale:
            to_remove.append((name, reason))

    # Remove stale entries
    removed_names = []
    for name, reason in to_remove:
        del state[name]
        removed_names.append(f"{name}({reason})")

    # Write back only when something changed and this isn't a dry run.
    if removed_names and not dry_run:
        os.makedirs(os.path.dirname(state_file) or ".", exist_ok=True)
        with open(state_file, "w") as f:
            json.dump(state, f, indent=2)

    return state, removed_names
|
||||
|
||||
|
||||
def main():
    """Command-line entry point: parse flags, run the reset, print a report."""
    parser = argparse.ArgumentParser(description="Reset stale pipeline state entries")
    parser.add_argument("--state-file", default=DEFAULT_STATE_FILE,
                        help="Path to pipeline_state.json")
    parser.add_argument("--complete-max-age-hours", type=int, default=DEFAULT_COMPLETE_MAX_AGE_HOURS,
                        help="Max age in hours for complete entries (default: 24)")
    parser.add_argument("--failed-max-age-hours", type=int, default=DEFAULT_FAILED_MAX_AGE_HOURS,
                        help="Max age in hours for failed entries (default: 24)")
    parser.add_argument("--running-max-age-hours", type=int, default=DEFAULT_RUNNING_MAX_AGE_HOURS,
                        help="Max age in hours for stuck running entries (default: 6)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Show what would be removed without modifying")
    parser.add_argument("--json", action="store_true",
                        help="Output as JSON")

    args = parser.parse_args()

    state, removed = reset_pipeline_state(
        state_file=args.state_file,
        complete_max_age_hours=args.complete_max_age_hours,
        failed_max_age_hours=args.failed_max_age_hours,
        running_max_age_hours=args.running_max_age_hours,
        dry_run=args.dry_run,
    )

    if args.json:
        # Machine-readable summary for scripting/cron consumers.
        print(json.dumps({
            "state_file": args.state_file,
            "removed": removed,
            "remaining": list(state.keys()),
            "dry_run": args.dry_run,
        }, indent=2))
        return

    prefix = "[DRY RUN] " if args.dry_run else ""
    if removed:
        print(f"{prefix}Reset {len(removed)} stale pipeline(s): {', '.join(removed)}")
        return

    # Nothing removed: list the surviving entries with a best-effort age.
    print(f"{prefix}No stale pipelines found. {len(state)} active.")
    for name, entry in state.items():
        age_h = ""
        try:
            delta = datetime.now(timezone.utc) - parse_timestamp(entry["updated"])
            age_h = f" ({int(delta.total_seconds() / 3600)}h ago)"
        except Exception:
            pass  # No parseable timestamp: omit the age suffix.
        print(f"  {name}: {entry['state']}{age_h}")
|
||||
|
||||
|
||||
# Allow direct invocation: `python3 scripts/reset_pipeline_state.py [...]`.
if __name__ == "__main__":
    main()
|
||||
190
scripts/test_reset_pipeline_state.py
Normal file
190
scripts/test_reset_pipeline_state.py
Normal file
@@ -0,0 +1,190 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for scripts/reset_pipeline_state.py — 10 tests."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
from datetime import datetime, timezone, timedelta
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
from reset_pipeline_state import reset_pipeline_state, classify_stale, parse_timestamp
|
||||
|
||||
|
||||
def test_no_state_file():
    """Reset on missing file returns empty."""
    cleaned, dropped = reset_pipeline_state("/nonexistent/pipeline_state.json")
    assert cleaned == {}
    assert dropped == []
    print("PASS: test_no_state_file")
|
||||
|
||||
|
||||
def test_empty_state():
    """Empty JSON object is untouched."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        handle.write("{}")
        path = handle.name
    try:
        cleaned, dropped = reset_pipeline_state(path)
        assert cleaned == {}
        assert dropped == []
    finally:
        os.unlink(path)
    print("PASS: test_empty_state")
|
||||
|
||||
|
||||
def test_fresh_complete_kept():
    """Recent complete entry is kept."""
    payload = {
        "my-pipeline": {
            "state": "complete",
            "updated": datetime.now(timezone.utc).isoformat(),
        }
    }
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        json.dump(payload, handle)
        path = handle.name
    try:
        cleaned, dropped = reset_pipeline_state(path)
        assert "my-pipeline" in cleaned
        assert dropped == []
    finally:
        os.unlink(path)
    print("PASS: test_fresh_complete_kept")
|
||||
|
||||
|
||||
def test_old_complete_removed():
    """Complete entry older than 24h is removed."""
    stamp = (datetime.now(timezone.utc) - timedelta(hours=30)).isoformat()
    payload = {"old-pipeline": {"state": "complete", "updated": stamp}}
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        json.dump(payload, handle)
        path = handle.name
    try:
        cleaned, dropped = reset_pipeline_state(path)
        assert "old-pipeline" not in cleaned
        assert len(dropped) == 1
        assert "old-pipeline" in dropped[0]
    finally:
        os.unlink(path)
    print("PASS: test_old_complete_removed")
|
||||
|
||||
|
||||
def test_stuck_running_removed():
    """Running entry older than 6h is treated as stuck and removed."""
    stamp = (datetime.now(timezone.utc) - timedelta(hours=10)).isoformat()
    payload = {"stuck-pipeline": {"state": "running", "updated": stamp}}
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        json.dump(payload, handle)
        path = handle.name
    try:
        cleaned, dropped = reset_pipeline_state(path)
        assert "stuck-pipeline" not in cleaned
        assert len(dropped) == 1
    finally:
        os.unlink(path)
    print("PASS: test_stuck_running_removed")
|
||||
|
||||
|
||||
def test_old_failed_removed():
    """Failed entry older than 24h is removed."""
    stamp = (datetime.now(timezone.utc) - timedelta(hours=48)).isoformat()
    payload = {"failed-pipeline": {"state": "failed", "updated": stamp}}
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        json.dump(payload, handle)
        path = handle.name
    try:
        cleaned, _ = reset_pipeline_state(path)
        assert "failed-pipeline" not in cleaned
    finally:
        os.unlink(path)
    print("PASS: test_old_failed_removed")
|
||||
|
||||
|
||||
def test_running_kept_if_fresh():
    """Fresh running entry is kept."""
    payload = {
        "active-pipeline": {
            "state": "running",
            "updated": datetime.now(timezone.utc).isoformat(),
        }
    }
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        json.dump(payload, handle)
        path = handle.name
    try:
        cleaned, dropped = reset_pipeline_state(path)
        assert "active-pipeline" in cleaned
        assert dropped == []
    finally:
        os.unlink(path)
    print("PASS: test_running_kept_if_fresh")
|
||||
|
||||
|
||||
def test_dry_run_does_not_modify():
    """Dry run reports removals but doesn't change the file."""
    stamp = (datetime.now(timezone.utc) - timedelta(hours=30)).isoformat()
    payload = {"old-pipeline": {"state": "complete", "updated": stamp}}
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        json.dump(payload, handle)
        path = handle.name
    try:
        cleaned, dropped = reset_pipeline_state(path, dry_run=True)
        assert "old-pipeline" not in cleaned
        assert len(dropped) == 1
        # The on-disk copy must still contain the stale entry.
        with open(path) as handle:
            on_disk = json.load(handle)
        assert "old-pipeline" in on_disk
    finally:
        os.unlink(path)
    print("PASS: test_dry_run_does_not_modify")
|
||||
|
||||
|
||||
def test_mixed_entries():
    """Mix of fresh and stale entries — only stale removed."""
    current = datetime.now(timezone.utc)
    iso_now = current.isoformat()
    payload = {
        "fresh-complete": {"state": "complete", "updated": iso_now},
        "stale-complete": {"state": "complete", "updated": (current - timedelta(hours=30)).isoformat()},
        "fresh-running": {"state": "running", "updated": iso_now},
        "stuck-running": {"state": "running", "updated": (current - timedelta(hours=10)).isoformat()},
    }
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        json.dump(payload, handle)
        path = handle.name
    try:
        cleaned, dropped = reset_pipeline_state(path)
        for kept in ("fresh-complete", "fresh-running"):
            assert kept in cleaned
        for gone in ("stale-complete", "stuck-running"):
            assert gone not in cleaned
        assert len(dropped) == 2
    finally:
        os.unlink(path)
    print("PASS: test_mixed_entries")
|
||||
|
||||
|
||||
def test_corrupted_entry_removed():
    """Non-dict entries are removed."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        json.dump({"broken": "not_a_dict", "also-bad": 42}, handle)
        path = handle.name
    try:
        cleaned, _ = reset_pipeline_state(path)
        assert "broken" not in cleaned
        assert "also-bad" not in cleaned
    finally:
        os.unlink(path)
    print("PASS: test_corrupted_entry_removed")
|
||||
|
||||
|
||||
def run_all():
    """Run every test in order; any assertion failure aborts the run."""
    cases = (
        test_no_state_file,
        test_empty_state,
        test_fresh_complete_kept,
        test_old_complete_removed,
        test_stuck_running_removed,
        test_old_failed_removed,
        test_running_kept_if_fresh,
        test_dry_run_does_not_modify,
        test_mixed_entries,
        test_corrupted_entry_removed,
    )
    for case in cases:
        case()
    print("\nAll 10 tests passed!")
|
||||
|
||||
|
||||
# Allow direct invocation: `python3 scripts/test_reset_pipeline_state.py`.
if __name__ == "__main__":
    run_all()
|
||||
Reference in New Issue
Block a user