Compare commits

...

5 Commits

Author SHA1 Message Date
c327521112 feat: Add pipeline daily reset cron (closes #650) 2026-04-17 05:22:58 +00:00
a50b236c8c fix: reset_stale_states delegates to standalone script (closes #650) 2026-04-17 05:22:03 +00:00
3bdc62d8b4 feat: Add pipeline reset tests (closes #650) 2026-04-17 05:19:33 +00:00
df0bef485e feat: Add pipeline state daily reset (closes #650) 2026-04-17 05:17:26 +00:00
097ff176c8 fix(#650): reset pipeline states daily at midnight
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 6s
PR Checklist / pr-checklist (pull_request) Failing after 1m10s
Smoke Test / smoke (pull_request) Failing after 6s
Validate Config / YAML Lint (pull_request) Failing after 4s
Validate Config / JSON Validate (pull_request) Successful in 4s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 12s
Validate Config / Shell Script Lint (pull_request) Failing after 13s
Validate Config / Cron Syntax Check (pull_request) Successful in 3s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 3s
Validate Config / Playbook Schema Validation (pull_request) Successful in 6s
Architecture Lint / Lint Repository (pull_request) Has been cancelled
Validate Config / Python Test Suite (pull_request) Has been cancelled
Problem: Once a pipeline completes (Monday), its 'completed' status
persists forever. Tuesday's run sees it as 'completed' and skips it.

Fix: Added reset_completed_pipelines() that:
- Tracks pipeline_reset_date in state
- On new day, resets 'completed' and 'failed' pipelines to 'not_started'
- Preserves 'running' pipelines (don't kill active work)
- Called at start of every run_scheduler() invocation

Also added pipeline_reset_date to default state dict.
2026-04-17 05:15:25 +00:00
5 changed files with 421 additions and 2 deletions
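
The diffs below add the reset function and the standalone script, but the call site the commit message describes (the top of run_scheduler()) is not part of this compare view. As a rough sketch of how that hook-up plausibly looks (run_scheduler() and launch_pipeline() are assumed names, not shown in the diff):

def run_scheduler():
    # Sketch only: the real run_scheduler() body is not in this compare view.
    state = load_state()
    # Clear yesterday's 'completed'/'failed' markers before scheduling;
    # this is a no-op when pipeline_reset_date is already today.
    reset_completed_pipelines(state)
    for name, info in state.get("pipelines", {}).items():
        if info.get("status") == "not_started":
            launch_pipeline(name)  # hypothetical dispatch helper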

View File

@@ -0,0 +1,5 @@
- name: Pipeline State Daily Reset
  schedule: '0 0 * * *'  # Midnight daily
  tasks:
    - name: Reset stale pipeline states
      shell: "python3 scripts/reset_pipeline_state.py --state-file ~/.hermes/pipeline_state.json"
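
If the same job were installed as a plain crontab entry instead of a playbook task, the equivalent line would look roughly like this (interpreter and paths taken from the task above; this is an illustration, not a file in the PR):

# Hypothetical crontab equivalent of the playbook entry above
0 0 * * * python3 scripts/reset_pipeline_state.py --state-file ~/.hermes/pipeline_state.json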

View File

@@ -66,6 +66,7 @@ def load_state():
        "last_run": None,
        "daily_tokens_used": 0,
        "budget_reset_date": None,
        "pipeline_reset_date": None,
        "pipelines": {},
        "active_sessions": [],
    }
@@ -78,6 +79,31 @@ def save_state(state):
        json.dump(state, f, indent=2)
def reset_completed_pipelines(state):
    """Reset pipeline statuses at midnight so they can run again the next day."""
    today = datetime.now().strftime("%Y-%m-%d")
    if state.get("pipeline_reset_date") == today:
        return  # Already reset today
    pipelines = state.get("pipelines", {})
    reset_count = 0
    for name, info in pipelines.items():
        if info.get("status") == "completed":
            info["status"] = "not_started"
            info.pop("pid", None)
            reset_count += 1
        elif info.get("status") == "failed":
            info["status"] = "not_started"
            info.pop("pid", None)
            reset_count += 1
    state["pipeline_reset_date"] = today
    save_state(state)
    if reset_count:
        log(f"Daily reset: {reset_count} pipeline(s) set to not_started for {today}")
def check_provider(endpoint):
    """Check if an inference provider is available."""
    try:
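
Taken together, the two hunks above give the state file roughly this shape. The top-level keys come from the diff; the per-pipeline 'status'/'pid' keys are inferred from reset_completed_pipelines(), and all concrete values are invented for illustration:

# Illustrative Python literal of the on-disk JSON state; values are made up.
state = {
    "last_run": None,
    "daily_tokens_used": 0,
    "budget_reset_date": None,
    "pipeline_reset_date": "2026-04-17",  # stamped by reset_completed_pipelines()
    "pipelines": {
        "nightly-ingest": {"status": "not_started"},          # was 'completed' yesterday
        "report-build": {"status": "running", "pid": 41237},  # untouched while running
    },
    "active_sessions": [],
}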

View File

@@ -169,10 +169,16 @@ else:
reset_stale_states() {
    # Delegate to standalone script (fixes #650: inline Python $STATE_FILE escaping bug)
    local script_dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
    if [[ -f "$script_dir/reset_pipeline_state.py" ]]; then
        python3 "$script_dir/reset_pipeline_state.py" --state-file "$STATE_FILE"
    else
        # Fallback: inline reset with proper variable expansion
        python3 -c "
import json, os
from datetime import datetime, timezone, timedelta
path = '$STATE_FILE'
if not os.path.exists(path):
    exit(0)
with open(path) as f:
@@ -200,6 +206,7 @@ if stale:
        json.dump(d, f, indent=2)
    print('Reset stale pipelines: ' + ', '.join(stale))
" 2>/dev/null
    fi
}
check_dependency() {
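
The escaping bug this hunk fixes is easy to reproduce. Inside a double-quoted bash string, \$ suppresses expansion, so the old inline Python received the literal text $STATE_FILE as its path rather than the expanded value; the new code drops the backslash and prefers the standalone script entirely. A standalone demonstration, not part of the repository:

# Run in any bash shell to see the difference the fix makes.
STATE_FILE=~/.hermes/pipeline_state.json
python3 -c "print('\$STATE_FILE')"   # old behavior: prints the literal text $STATE_FILE
python3 -c "print('$STATE_FILE')"    # fixed behavior: prints the expanded path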

View File

@@ -0,0 +1,191 @@
#!/usr/bin/env python3
"""
reset_pipeline_state.py — Daily reset for pipeline_state.json

Cleans stale pipeline entries so yesterday's "complete" doesn't block
today's runs. Called by cron at midnight or by the scheduler before
each cycle.

Usage:
    python3 scripts/reset_pipeline_state.py
    python3 scripts/reset_pipeline_state.py --state-file /path/to/pipeline_state.json
    python3 scripts/reset_pipeline_state.py --dry-run
    python3 scripts/reset_pipeline_state.py --running-max-age-hours 12
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone, timedelta
from typing import Dict, Any, List, Tuple

# Default age thresholds
DEFAULT_COMPLETE_MAX_AGE_HOURS = 24
DEFAULT_FAILED_MAX_AGE_HOURS = 24
DEFAULT_RUNNING_MAX_AGE_HOURS = 6
DEFAULT_STATE_FILE = os.path.expanduser("~/.hermes/pipeline_state.json")
def parse_timestamp(ts: str) -> datetime:
    """Parse ISO timestamp, handling Z suffix and missing tzinfo."""
    ts = ts.replace("Z", "+00:00")
    try:
        parsed = datetime.fromisoformat(ts)
    except ValueError:
        # Fallback for non-standard formats
        parsed = None
        for fmt in ("%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"):
            try:
                parsed = datetime.strptime(ts, fmt)
                break
            except ValueError:
                continue
        if parsed is None:
            raise ValueError(f"Cannot parse timestamp: {ts}")
    # Treat naive timestamps as UTC so age arithmetic never mixes aware
    # and naive datetimes.
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed

def classify_stale(
    entry: Dict[str, Any],
    now: datetime,
    complete_max_age: timedelta,
    failed_max_age: timedelta,
    running_max_age: timedelta,
) -> Tuple[bool, str]:
    """Check if a pipeline entry is stale. Returns (is_stale, reason)."""
    state = entry.get("state", "not_started")
    updated_str = entry.get("updated", "")
    if not updated_str:
        return True, "no_timestamp"
    try:
        updated = parse_timestamp(updated_str)
    except ValueError:
        return True, "invalid_timestamp"
    age = now - updated
    if state == "complete" and age > complete_max_age:
        return True, f"complete_{int(age.total_seconds() / 3600)}h_ago"
    elif state == "failed" and age > failed_max_age:
        return True, f"failed_{int(age.total_seconds() / 3600)}h_ago"
    elif state == "running" and age > running_max_age:
        return True, f"running_stuck_{int(age.total_seconds() / 3600)}h_ago"
    return False, ""

def reset_pipeline_state(
    state_file: str,
    complete_max_age_hours: int = DEFAULT_COMPLETE_MAX_AGE_HOURS,
    failed_max_age_hours: int = DEFAULT_FAILED_MAX_AGE_HOURS,
    running_max_age_hours: int = DEFAULT_RUNNING_MAX_AGE_HOURS,
    dry_run: bool = False,
) -> Tuple[Dict[str, Any], List[str]]:
    """Reset stale pipeline entries.

    Returns (cleaned_state_dict, list_of_removed_names).
    """
    now = datetime.now(timezone.utc)
    complete_max_age = timedelta(hours=complete_max_age_hours)
    failed_max_age = timedelta(hours=failed_max_age_hours)
    running_max_age = timedelta(hours=running_max_age_hours)

    # Load state
    if not os.path.exists(state_file):
        return {}, []
    with open(state_file) as f:
        state = json.load(f)
    if not isinstance(state, dict):
        # Corrupted file — reset entirely
        if not dry_run:
            with open(state_file, "w") as f:
                json.dump({}, f)
        return {}, ["corrupted"]

    # Classify entries
    to_remove = []
    for name, entry in list(state.items()):
        if not isinstance(entry, dict):
            to_remove.append((name, "not_dict"))
            continue
        if "state" not in entry:
            to_remove.append((name, "no_state_field"))
            continue
        stale, reason = classify_stale(
            entry, now, complete_max_age, failed_max_age, running_max_age
        )
        if stale:
            to_remove.append((name, reason))

    # Remove stale entries
    removed_names = []
    for name, reason in to_remove:
        del state[name]
        removed_names.append(f"{name}({reason})")

    # Write back
    if removed_names and not dry_run:
        os.makedirs(os.path.dirname(state_file) or ".", exist_ok=True)
        with open(state_file, "w") as f:
            json.dump(state, f, indent=2)
    return state, removed_names

def main():
    parser = argparse.ArgumentParser(description="Reset stale pipeline state entries")
    parser.add_argument("--state-file", default=DEFAULT_STATE_FILE,
                        help="Path to pipeline_state.json")
    parser.add_argument("--complete-max-age-hours", type=int, default=DEFAULT_COMPLETE_MAX_AGE_HOURS,
                        help="Max age in hours for complete entries (default: 24)")
    parser.add_argument("--failed-max-age-hours", type=int, default=DEFAULT_FAILED_MAX_AGE_HOURS,
                        help="Max age in hours for failed entries (default: 24)")
    parser.add_argument("--running-max-age-hours", type=int, default=DEFAULT_RUNNING_MAX_AGE_HOURS,
                        help="Max age in hours for stuck running entries (default: 6)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Show what would be removed without modifying")
    parser.add_argument("--json", action="store_true",
                        help="Output as JSON")
    args = parser.parse_args()

    state, removed = reset_pipeline_state(
        state_file=args.state_file,
        complete_max_age_hours=args.complete_max_age_hours,
        failed_max_age_hours=args.failed_max_age_hours,
        running_max_age_hours=args.running_max_age_hours,
        dry_run=args.dry_run,
    )

    if args.json:
        result = {
            "state_file": args.state_file,
            "removed": removed,
            "remaining": list(state.keys()),
            "dry_run": args.dry_run,
        }
        print(json.dumps(result, indent=2))
    else:
        prefix = "[DRY RUN] " if args.dry_run else ""
        if removed:
            print(f"{prefix}Reset {len(removed)} stale pipeline(s): {', '.join(removed)}")
        else:
            print(f"{prefix}No stale pipelines found. {len(state)} active.")
        if state:
            for name, entry in state.items():
                age_h = ""
                try:
                    updated = parse_timestamp(entry["updated"])
                    age = datetime.now(timezone.utc) - updated
                    age_h = f" ({int(age.total_seconds() / 3600)}h ago)"
                except Exception:
                    pass
                print(f"  {name}: {entry['state']}{age_h}")


if __name__ == "__main__":
    main()
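
From the print statements in main(), a dry run against a state file holding one stale and one fresh entry would emit output along these lines (pipeline names and ages invented for illustration):

$ python3 scripts/reset_pipeline_state.py --dry-run
[DRY RUN] Reset 1 stale pipeline(s): nightly-ingest(complete_30h_ago)
  report-build: running (1h ago)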

View File

@@ -0,0 +1,190 @@
#!/usr/bin/env python3
"""Tests for scripts/reset_pipeline_state.py — 10 tests."""
import json
import os
import sys
import tempfile
from datetime import datetime, timezone, timedelta

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from reset_pipeline_state import reset_pipeline_state, classify_stale, parse_timestamp


def test_no_state_file():
    """Reset on missing file returns empty."""
    state, removed = reset_pipeline_state("/nonexistent/pipeline_state.json")
    assert state == {}
    assert removed == []
    print("PASS: test_no_state_file")


def test_empty_state():
    """Empty JSON object is untouched."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump({}, f)
        path = f.name
    try:
        state, removed = reset_pipeline_state(path)
        assert state == {}
        assert removed == []
    finally:
        os.unlink(path)
    print("PASS: test_empty_state")


def test_fresh_complete_kept():
    """Recent complete entry is kept."""
    now = datetime.now(timezone.utc)
    entry = {"state": "complete", "updated": now.isoformat()}
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump({"my-pipeline": entry}, f)
        path = f.name
    try:
        state, removed = reset_pipeline_state(path)
        assert "my-pipeline" in state
        assert removed == []
    finally:
        os.unlink(path)
    print("PASS: test_fresh_complete_kept")


def test_old_complete_removed():
    """Complete entry older than 24h is removed."""
    old = (datetime.now(timezone.utc) - timedelta(hours=30)).isoformat()
    entry = {"state": "complete", "updated": old}
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump({"old-pipeline": entry}, f)
        path = f.name
    try:
        state, removed = reset_pipeline_state(path)
        assert "old-pipeline" not in state
        assert len(removed) == 1
        assert "old-pipeline" in removed[0]
    finally:
        os.unlink(path)
    print("PASS: test_old_complete_removed")


def test_stuck_running_removed():
    """Running entry older than 6h is treated as stuck and removed."""
    old = (datetime.now(timezone.utc) - timedelta(hours=10)).isoformat()
    entry = {"state": "running", "updated": old}
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump({"stuck-pipeline": entry}, f)
        path = f.name
    try:
        state, removed = reset_pipeline_state(path)
        assert "stuck-pipeline" not in state
        assert len(removed) == 1
    finally:
        os.unlink(path)
    print("PASS: test_stuck_running_removed")


def test_old_failed_removed():
    """Failed entry older than 24h is removed."""
    old = (datetime.now(timezone.utc) - timedelta(hours=48)).isoformat()
    entry = {"state": "failed", "updated": old}
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump({"failed-pipeline": entry}, f)
        path = f.name
    try:
        state, removed = reset_pipeline_state(path)
        assert "failed-pipeline" not in state
    finally:
        os.unlink(path)
    print("PASS: test_old_failed_removed")


def test_running_kept_if_fresh():
    """Fresh running entry is kept."""
    now = datetime.now(timezone.utc)
    entry = {"state": "running", "updated": now.isoformat()}
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump({"active-pipeline": entry}, f)
        path = f.name
    try:
        state, removed = reset_pipeline_state(path)
        assert "active-pipeline" in state
        assert removed == []
    finally:
        os.unlink(path)
    print("PASS: test_running_kept_if_fresh")


def test_dry_run_does_not_modify():
    """Dry run reports removals but doesn't change the file."""
    old = (datetime.now(timezone.utc) - timedelta(hours=30)).isoformat()
    content = json.dumps({"old-pipeline": {"state": "complete", "updated": old}})
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        f.write(content)
        path = f.name
    try:
        state, removed = reset_pipeline_state(path, dry_run=True)
        assert "old-pipeline" not in state
        assert len(removed) == 1
        # File should be unchanged
        with open(path) as f:
            file_state = json.load(f)
        assert "old-pipeline" in file_state
    finally:
        os.unlink(path)
    print("PASS: test_dry_run_does_not_modify")


def test_mixed_entries():
    """Mix of fresh and stale entries — only stale removed."""
    now = datetime.now(timezone.utc)
    old = (now - timedelta(hours=30)).isoformat()
    state_data = {
        "fresh-complete": {"state": "complete", "updated": now.isoformat()},
        "stale-complete": {"state": "complete", "updated": old},
        "fresh-running": {"state": "running", "updated": now.isoformat()},
        "stuck-running": {"state": "running", "updated": (now - timedelta(hours=10)).isoformat()},
    }
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump(state_data, f)
        path = f.name
    try:
        state, removed = reset_pipeline_state(path)
        assert "fresh-complete" in state
        assert "fresh-running" in state
        assert "stale-complete" not in state
        assert "stuck-running" not in state
        assert len(removed) == 2
    finally:
        os.unlink(path)
    print("PASS: test_mixed_entries")


def test_corrupted_entry_removed():
    """Non-dict entries are removed."""
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
        json.dump({"broken": "not_a_dict", "also-bad": 42}, f)
        path = f.name
    try:
        state, removed = reset_pipeline_state(path)
        assert "broken" not in state
        assert "also-bad" not in state
    finally:
        os.unlink(path)
    print("PASS: test_corrupted_entry_removed")


def run_all():
    test_no_state_file()
    test_empty_state()
    test_fresh_complete_kept()
    test_old_complete_removed()
    test_stuck_running_removed()
    test_old_failed_removed()
    test_running_kept_if_fresh()
    test_dry_run_does_not_modify()
    test_mixed_entries()
    test_corrupted_entry_removed()
    print("\nAll 10 tests passed!")


if __name__ == "__main__":
    run_all()
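
The suite uses bare asserts with print markers, so no test runner is needed. Assuming the file sits next to reset_pipeline_state.py under scripts/ (its path is not shown in this view), a passing run ends like this:

$ python3 scripts/test_reset_pipeline_state.py
PASS: test_no_state_file
...
PASS: test_corrupted_entry_removed

All 10 tests passed!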