Compare commits
5 Commits
step35/595
...
fix/650-pi
| Author | SHA1 | Date | |
|---|---|---|---|
| c327521112 | |||
| a50b236c8c | |||
| 3bdc62d8b4 | |||
| df0bef485e | |||
| 097ff176c8 |
5
cron/pipeline-daily-reset.yml
Normal file
5
cron/pipeline-daily-reset.yml
Normal file
# Cron job: clears stale entries from pipeline_state.json once per day so a
# previous day's "complete"/"failed" states don't block the next day's runs.
- name: Pipeline State Daily Reset
  schedule: '0 0 * * *' # Midnight daily
  tasks:
    - name: Reset stale pipeline states
      # Delegates to the standalone script (age thresholds use script defaults).
      shell: "python3 scripts/reset_pipeline_state.py --state-file ~/.hermes/pipeline_state.json"
@@ -66,6 +66,7 @@ def load_state():
|
|||||||
"last_run": None,
|
"last_run": None,
|
||||||
"daily_tokens_used": 0,
|
"daily_tokens_used": 0,
|
||||||
"budget_reset_date": None,
|
"budget_reset_date": None,
|
||||||
|
"pipeline_reset_date": None,
|
||||||
"pipelines": {},
|
"pipelines": {},
|
||||||
"active_sessions": [],
|
"active_sessions": [],
|
||||||
}
|
}
|
||||||
@@ -78,6 +79,31 @@ def save_state(state):
|
|||||||
json.dump(state, f, indent=2)
|
json.dump(state, f, indent=2)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def reset_completed_pipelines(state):
    """Reset pipeline statuses at midnight so they can run again the next day.

    Args:
        state: mutable state dict. ``state["pipeline_reset_date"]`` and the
            entries under ``state["pipelines"]`` are updated in place, then
            persisted via ``save_state``.

    Idempotent per calendar day: bails out when "pipeline_reset_date" already
    equals today's date, so repeated calls within one day are no-ops.
    """
    # NOTE(review): uses the local-time date, matching the original code —
    # confirm local midnight (not UTC) is the intended reset boundary.
    today = datetime.now().strftime("%Y-%m-%d")
    if state.get("pipeline_reset_date") == today:
        return  # Already reset today

    reset_count = 0
    for info in state.get("pipelines", {}).values():
        # The original had two identical branches for "completed" and
        # "failed"; they are folded into one membership test here.
        if info.get("status") in ("completed", "failed"):
            info["status"] = "not_started"
            info.pop("pid", None)  # drop the stale pid from the previous run
            reset_count += 1

    state["pipeline_reset_date"] = today
    save_state(state)  # persist even when nothing was reset, to stamp the date
    if reset_count:
        log(f"Daily reset: {reset_count} pipeline(s) set to not_started for {today}")
|
|
||||||
def check_provider(endpoint):
|
def check_provider(endpoint):
|
||||||
"""Check if an inference provider is available."""
|
"""Check if an inference provider is available."""
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -169,10 +169,16 @@ else:
|
|||||||
|
|
||||||
|
|
||||||
reset_stale_states() {
|
reset_stale_states() {
|
||||||
|
# Delegate to standalone script (fixes #650: inline Python $STATE_FILE escaping bug)
|
||||||
|
local script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||||
|
if [[ -f "$script_dir/reset_pipeline_state.py" ]]; then
|
||||||
|
python3 "$script_dir/reset_pipeline_state.py" --state-file "$STATE_FILE"
|
||||||
|
else
|
||||||
|
# Fallback: inline reset with proper variable expansion
|
||||||
python3 -c "
|
python3 -c "
|
||||||
import json, os
|
import json, os
|
||||||
from datetime import datetime, timezone, timedelta
|
from datetime import datetime, timezone, timedelta
|
||||||
path = '\$STATE_FILE'
|
path = '$STATE_FILE'
|
||||||
if not os.path.exists(path):
|
if not os.path.exists(path):
|
||||||
exit(0)
|
exit(0)
|
||||||
with open(path) as f:
|
with open(path) as f:
|
||||||
@@ -200,6 +206,7 @@ if stale:
|
|||||||
json.dump(d, f, indent=2)
|
json.dump(d, f, indent=2)
|
||||||
print('Reset stale pipelines: ' + ', '.join(stale))
|
print('Reset stale pipelines: ' + ', '.join(stale))
|
||||||
" 2>/dev/null
|
" 2>/dev/null
|
||||||
|
fi
|
||||||
}
|
}
|
||||||
|
|
||||||
check_dependency() {
|
check_dependency() {
|
||||||
|
|||||||
191
scripts/reset_pipeline_state.py
Normal file
191
scripts/reset_pipeline_state.py
Normal file
@@ -0,0 +1,191 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
reset_pipeline_state.py — Daily reset for pipeline_state.json
|
||||||
|
|
||||||
|
Cleans stale pipeline entries so yesterday's "complete" doesn't block
|
||||||
|
today's runs. Called by cron at midnight or by the scheduler before
|
||||||
|
each cycle.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python3 scripts/reset_pipeline_state.py
|
||||||
|
python3 scripts/reset_pipeline_state.py --state-file /path/to/pipeline_state.json
|
||||||
|
python3 scripts/reset_pipeline_state.py --dry-run
|
||||||
|
python3 scripts/reset_pipeline_state.py --max-age-hours 12
|
||||||
|
"""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from datetime import datetime, timezone, timedelta
|
||||||
|
from typing import Dict, Any, List, Tuple
|
||||||
|
|
||||||
|
|
||||||
|
# Default age thresholds
|
||||||
|
DEFAULT_COMPLETE_MAX_AGE_HOURS = 24
|
||||||
|
DEFAULT_FAILED_MAX_AGE_HOURS = 24
|
||||||
|
DEFAULT_RUNNING_MAX_AGE_HOURS = 6
|
||||||
|
DEFAULT_STATE_FILE = os.path.expanduser("~/.hermes/pipeline_state.json")
|
||||||
|
|
||||||
|
|
||||||
|
def parse_timestamp(ts: str) -> datetime:
    """Parse an ISO-8601-ish timestamp into an *aware* UTC-normalizable datetime.

    Handles a trailing "Z" suffix and a few non-standard fallback formats.
    Timestamps without an explicit offset are assumed to be UTC.

    Fixes vs. the original:
    - The ``%z`` fallback called ``.replace(tzinfo=timezone.utc)`` on the
      parsed result, clobbering any explicit offset it had just parsed.
    - ``fromisoformat`` can return a *naive* datetime; returning it unchanged
      made later aware-minus-naive subtraction raise ``TypeError``.
    Both paths now attach UTC only when no tzinfo was parsed.

    Raises:
        ValueError: if none of the known formats match.
    """
    ts = ts.replace("Z", "+00:00")
    parsed = None
    try:
        parsed = datetime.fromisoformat(ts)
    except ValueError:
        # Fallback for non-standard formats
        for fmt in ("%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"):
            try:
                parsed = datetime.strptime(ts, fmt)
                break
            except ValueError:
                continue
    if parsed is None:
        raise ValueError(f"Cannot parse timestamp: {ts}")
    # Assume UTC only when the input carried no offset information.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
||||||
|
|
||||||
|
|
||||||
|
def classify_stale(
    entry: Dict[str, Any],
    now: datetime,
    complete_max_age: timedelta,
    failed_max_age: timedelta,
    running_max_age: timedelta,
) -> Tuple[bool, str]:
    """Check if a pipeline entry is stale. Returns (is_stale, reason).

    Args:
        entry: one pipeline record; reads optional "state" and "updated" keys.
        now: aware reference time to measure age against.
        complete_max_age / failed_max_age / running_max_age: per-state limits.

    Returns:
        (True, reason) for stale entries, (False, "") for fresh ones. States
        other than complete/failed/running are never considered stale here.
    """
    state = entry.get("state", "not_started")
    updated_str = entry.get("updated", "")

    if not updated_str:
        return True, "no_timestamp"

    try:
        # The subtraction lives inside the try so a naive timestamp (which
        # raises TypeError against the aware `now`) is classified as invalid
        # rather than crashing the whole reset pass.
        age = now - parse_timestamp(updated_str)
    except Exception:
        # Original wrote `except (ValueError, Exception)` — the tuple was
        # redundant since Exception already covers ValueError.
        return True, "invalid_timestamp"

    # Per-state age limit, plus the prefix used in the stale-reason string.
    limits = {
        "complete": (complete_max_age, "complete"),
        "failed": (failed_max_age, "failed"),
        "running": (running_max_age, "running_stuck"),
    }
    if state in limits:
        max_age, prefix = limits[state]
        if age > max_age:
            return True, f"{prefix}_{int(age.total_seconds() / 3600)}h_ago"

    return False, ""
||||||
|
|
||||||
|
|
||||||
|
def reset_pipeline_state(
    state_file: str,
    complete_max_age_hours: int = DEFAULT_COMPLETE_MAX_AGE_HOURS,
    failed_max_age_hours: int = DEFAULT_FAILED_MAX_AGE_HOURS,
    running_max_age_hours: int = DEFAULT_RUNNING_MAX_AGE_HOURS,
    dry_run: bool = False,
) -> Tuple[Dict[str, Any], List[str]]:
    """Reset stale pipeline entries.

    Args:
        state_file: path to pipeline_state.json.
        complete_max_age_hours / failed_max_age_hours / running_max_age_hours:
            staleness thresholds per pipeline state.
        dry_run: when True, compute removals but never touch the file.

    Returns:
        (cleaned_state_dict, list_of_removed_names) — each removed name is
        formatted "name(reason)".
    """
    now = datetime.now(timezone.utc)

    complete_max_age = timedelta(hours=complete_max_age_hours)
    failed_max_age = timedelta(hours=failed_max_age_hours)
    running_max_age = timedelta(hours=running_max_age_hours)

    # Missing file: nothing to reset.
    if not os.path.exists(state_file):
        return {}, []

    try:
        with open(state_file) as f:
            state = json.load(f)
    except json.JSONDecodeError:
        # Unparseable JSON gets the same treatment as a non-dict top level:
        # the file is corrupted, reset it entirely. (The original let the
        # exception escape, which would wedge the nightly cron run.)
        state = None

    if not isinstance(state, dict):
        # Corrupted file — reset entirely.
        if not dry_run:
            with open(state_file, "w") as f:
                json.dump({}, f)
        # The original returned `list(state.keys()) if isinstance(state, dict)
        # else ["corrupted"]`, but isinstance is always False on this branch,
        # so the conditional was dead code.
        return {}, ["corrupted"]

    # Classify entries.
    to_remove: List[Tuple[str, str]] = []
    for name, entry in list(state.items()):
        if not isinstance(entry, dict):
            to_remove.append((name, "not_dict"))
            continue
        if "state" not in entry:
            to_remove.append((name, "no_state_field"))
            continue

        stale, reason = classify_stale(
            entry, now, complete_max_age, failed_max_age, running_max_age
        )
        if stale:
            to_remove.append((name, reason))

    # Remove stale entries.
    removed_names = []
    for name, reason in to_remove:
        del state[name]
        removed_names.append(f"{name}({reason})")

    # Write back only when something was removed and we're not in dry-run mode.
    if removed_names and not dry_run:
        os.makedirs(os.path.dirname(state_file) or ".", exist_ok=True)
        with open(state_file, "w") as f:
            json.dump(state, f, indent=2)

    return state, removed_names
||||||
|
|
||||||
|
|
||||||
|
def main():
    """CLI entry point: parse arguments, run the reset, report the outcome."""
    parser = argparse.ArgumentParser(description="Reset stale pipeline state entries")
    parser.add_argument("--state-file", default=DEFAULT_STATE_FILE,
                        help="Path to pipeline_state.json")
    # The three age thresholds share a shape, so they are declared in a loop.
    for flag, fallback, blurb in (
        ("--complete-max-age-hours", DEFAULT_COMPLETE_MAX_AGE_HOURS,
         "Max age in hours for complete entries (default: 24)"),
        ("--failed-max-age-hours", DEFAULT_FAILED_MAX_AGE_HOURS,
         "Max age in hours for failed entries (default: 24)"),
        ("--running-max-age-hours", DEFAULT_RUNNING_MAX_AGE_HOURS,
         "Max age in hours for stuck running entries (default: 6)"),
    ):
        parser.add_argument(flag, type=int, default=fallback, help=blurb)
    parser.add_argument("--dry-run", action="store_true",
                        help="Show what would be removed without modifying")
    parser.add_argument("--json", action="store_true",
                        help="Output as JSON")

    opts = parser.parse_args()

    cleaned, removed = reset_pipeline_state(
        state_file=opts.state_file,
        complete_max_age_hours=opts.complete_max_age_hours,
        failed_max_age_hours=opts.failed_max_age_hours,
        running_max_age_hours=opts.running_max_age_hours,
        dry_run=opts.dry_run,
    )

    # Machine-readable report, then bail — nothing else to print.
    if opts.json:
        print(json.dumps({
            "state_file": opts.state_file,
            "removed": removed,
            "remaining": list(cleaned.keys()),
            "dry_run": opts.dry_run,
        }, indent=2))
        return

    prefix = "[DRY RUN] " if opts.dry_run else ""
    if removed:
        print(f"{prefix}Reset {len(removed)} stale pipeline(s): {', '.join(removed)}")
        return

    print(f"{prefix}No stale pipelines found. {len(cleaned)} active.")
    # List the surviving entries with a best-effort age annotation.
    for name, entry in cleaned.items():
        suffix = ""
        try:
            delta = datetime.now(timezone.utc) - parse_timestamp(entry["updated"])
            suffix = f" ({int(delta.total_seconds() / 3600)}h ago)"
        except Exception:
            pass  # age is cosmetic; never fail the listing over it
        print(f"  {name}: {entry['state']}{suffix}")


if __name__ == "__main__":
    main()
||||||
190
scripts/test_reset_pipeline_state.py
Normal file
190
scripts/test_reset_pipeline_state.py
Normal file
@@ -0,0 +1,190 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Tests for scripts/reset_pipeline_state.py — 10 tests."""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
from datetime import datetime, timezone, timedelta
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||||
|
from reset_pipeline_state import reset_pipeline_state, classify_stale, parse_timestamp
|
||||||
|
|
||||||
|
|
||||||
|
def test_no_state_file():
    """Reset on missing file returns empty."""
    cleaned, gone = reset_pipeline_state("/nonexistent/pipeline_state.json")
    assert cleaned == {}
    assert gone == []
    print("PASS: test_no_state_file")


def test_empty_state():
    """Empty JSON object is untouched."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        handle.write(json.dumps({}))
        path = handle.name
    try:
        cleaned, gone = reset_pipeline_state(path)
        assert cleaned == {}
        assert gone == []
    finally:
        os.unlink(path)
    print("PASS: test_empty_state")
||||||
|
|
||||||
|
|
||||||
|
def test_fresh_complete_kept():
    """Recent complete entry is kept."""
    record = {"state": "complete", "updated": datetime.now(timezone.utc).isoformat()}
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        handle.write(json.dumps({"my-pipeline": record}))
        path = handle.name
    try:
        cleaned, gone = reset_pipeline_state(path)
        assert "my-pipeline" in cleaned
        assert not gone
    finally:
        os.unlink(path)
    print("PASS: test_fresh_complete_kept")
||||||
|
|
||||||
|
|
||||||
|
def test_old_complete_removed():
    """Complete entry older than 24h is removed."""
    stamp = (datetime.now(timezone.utc) - timedelta(hours=30)).isoformat()
    payload = {"old-pipeline": {"state": "complete", "updated": stamp}}
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        handle.write(json.dumps(payload))
        path = handle.name
    try:
        cleaned, gone = reset_pipeline_state(path)
        assert "old-pipeline" not in cleaned
        assert len(gone) == 1
        assert "old-pipeline" in gone[0]
    finally:
        os.unlink(path)
    print("PASS: test_old_complete_removed")
||||||
|
|
||||||
|
|
||||||
|
def test_stuck_running_removed():
    """Running entry older than 6h is treated as stuck and removed."""
    stamp = (datetime.now(timezone.utc) - timedelta(hours=10)).isoformat()
    payload = {"stuck-pipeline": {"state": "running", "updated": stamp}}
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        handle.write(json.dumps(payload))
        path = handle.name
    try:
        cleaned, gone = reset_pipeline_state(path)
        assert "stuck-pipeline" not in cleaned
        assert len(gone) == 1
    finally:
        os.unlink(path)
    print("PASS: test_stuck_running_removed")
||||||
|
|
||||||
|
|
||||||
|
def test_old_failed_removed():
    """Failed entry older than 24h is removed.

    Fix: the original asserted only that the entry vanished from the returned
    state and never checked the `removed` report, so a regression that dropped
    entries silently would still pass. The report is asserted too now,
    mirroring test_old_complete_removed.
    """
    old = (datetime.now(timezone.utc) - timedelta(hours=48)).isoformat()
    entry = {"state": "failed", "updated": old}
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump({"failed-pipeline": entry}, f)
        path = f.name
    try:
        state, removed = reset_pipeline_state(path)
        assert "failed-pipeline" not in state
        assert len(removed) == 1
        assert "failed-pipeline" in removed[0]
    finally:
        os.unlink(path)
    print("PASS: test_old_failed_removed")
||||||
|
|
||||||
|
|
||||||
|
def test_running_kept_if_fresh():
    """Fresh running entry is kept."""
    record = {"state": "running", "updated": datetime.now(timezone.utc).isoformat()}
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        handle.write(json.dumps({"active-pipeline": record}))
        path = handle.name
    try:
        cleaned, gone = reset_pipeline_state(path)
        assert "active-pipeline" in cleaned
        assert not gone
    finally:
        os.unlink(path)
    print("PASS: test_running_kept_if_fresh")
||||||
|
|
||||||
|
|
||||||
|
def test_dry_run_does_not_modify():
    """Dry run reports removals but doesn't change the file."""
    stamp = (datetime.now(timezone.utc) - timedelta(hours=30)).isoformat()
    payload = {"old-pipeline": {"state": "complete", "updated": stamp}}
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        json.dump(payload, handle)
        path = handle.name
    try:
        cleaned, gone = reset_pipeline_state(path, dry_run=True)
        assert "old-pipeline" not in cleaned
        assert len(gone) == 1
        # The on-disk file must be untouched in dry-run mode.
        with open(path) as handle:
            on_disk = json.load(handle)
        assert "old-pipeline" in on_disk
    finally:
        os.unlink(path)
    print("PASS: test_dry_run_does_not_modify")
||||||
|
|
||||||
|
|
||||||
|
def test_mixed_entries():
    """Mix of fresh and stale entries — only stale removed."""
    now = datetime.now(timezone.utc)

    def stamp(hours_ago):
        # ISO timestamp `hours_ago` hours in the past (0 == right now).
        return (now - timedelta(hours=hours_ago)).isoformat()

    payload = {
        "fresh-complete": {"state": "complete", "updated": stamp(0)},
        "stale-complete": {"state": "complete", "updated": stamp(30)},
        "fresh-running": {"state": "running", "updated": stamp(0)},
        "stuck-running": {"state": "running", "updated": stamp(10)},
    }
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as handle:
        handle.write(json.dumps(payload))
        path = handle.name
    try:
        cleaned, gone = reset_pipeline_state(path)
        for kept in ("fresh-complete", "fresh-running"):
            assert kept in cleaned
        for dropped in ("stale-complete", "stuck-running"):
            assert dropped not in cleaned
        assert len(gone) == 2
    finally:
        os.unlink(path)
    print("PASS: test_mixed_entries")
||||||
|
|
||||||
|
|
||||||
|
def test_corrupted_entry_removed():
    """Non-dict entries are removed.

    Fix: the original only checked that the bad keys were gone from the
    returned state; it never asserted that both removals were reported.
    """
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump({"broken": "not_a_dict", "also-bad": 42}, f)
        path = f.name
    try:
        state, removed = reset_pipeline_state(path)
        assert "broken" not in state
        assert "also-bad" not in state
        assert len(removed) == 2
    finally:
        os.unlink(path)
    print("PASS: test_corrupted_entry_removed")
||||||
|
|
||||||
|
|
||||||
|
def run_all():
    """Execute every test in order; any failed assert aborts the run."""
    for check in (
        test_no_state_file,
        test_empty_state,
        test_fresh_complete_kept,
        test_old_complete_removed,
        test_stuck_running_removed,
        test_old_failed_removed,
        test_running_kept_if_fresh,
        test_dry_run_does_not_modify,
        test_mixed_entries,
        test_corrupted_entry_removed,
    ):
        check()
    print("\nAll 10 tests passed!")


if __name__ == "__main__":
    run_all()
||||||
Reference in New Issue
Block a user