Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
73984ca72f feat: Add queue health check script
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 29s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 31s
Tests / e2e (pull_request) Successful in 2m13s
Tests / test (pull_request) Failing after 28m10s
2026-04-17 01:26:07 -04:00
3 changed files with 147 additions and 204 deletions

View File

@@ -1,146 +0,0 @@
"""Time-aware model routing for cron jobs.
Routes cron tasks to more capable models during off-hours when the user
is not present to correct errors. Reduces error rates during high-error
time windows (e.g., 18:00 evening batches).
Usage:
from agent.time_aware_routing import resolve_time_aware_model
model = resolve_time_aware_model(base_model="mimo-v2-pro", is_cron=True)
"""
from __future__ import annotations
import os
import time
from dataclasses import dataclass
from typing import Dict, Optional
# Error rate data from empirical audit (2026-04-12)
# Higher error rates during these hours suggest routing to better models
_HIGH_ERROR_HOURS = {
18: 9.4, # 18:00 — 9.4% error rate (evening cron batches)
19: 8.1,
20: 7.5,
21: 6.8,
22: 6.2,
23: 5.9,
0: 5.5,
1: 5.2,
}
# Low error hours — default model is fine
_LOW_ERROR_HOURS = set(range(6, 18)) # 06:00-17:59
# Default fallback models by time zone
_DEFAULT_STRONG_MODEL = os.getenv("CRON_STRONG_MODEL", "xiaomi/mimo-v2-pro")
_DEFAULT_CHEAP_MODEL = os.getenv("CRON_CHEAP_MODEL", "qwen2.5:7b")
_ERROR_THRESHOLD = float(os.getenv("CRON_ERROR_THRESHOLD", "6.0")) # % error rate
@dataclass
class RoutingDecision:
"""Result of time-aware routing."""
model: str
provider: str
reason: str
hour: int
error_rate: float
is_off_hours: bool
def get_hour_error_rate(hour: int) -> float:
"""Get expected error rate for a given hour (0-23)."""
return _HIGH_ERROR_HOURS.get(hour, 4.0) # Default 4% for unlisted hours
def is_off_hours(hour: int) -> bool:
"""Check if hour is considered off-hours (higher error rates)."""
return hour not in _LOW_ERROR_HOURS
def resolve_time_aware_model(
base_model: str = "",
base_provider: str = "",
is_cron: bool = False,
hour: Optional[int] = None,
) -> RoutingDecision:
"""Resolve model based on time of day and task type.
During off-hours (evening/night), routes to stronger models for cron
jobs to compensate for lack of human oversight.
Args:
base_model: The model that would normally be used.
base_provider: The provider for the base model.
is_cron: Whether this is a cron job (vs interactive session).
hour: Override hour (for testing). Defaults to current hour.
Returns:
RoutingDecision with model, provider, and reasoning.
"""
if hour is None:
hour = time.localtime().tm_hour
error_rate = get_hour_error_rate(hour)
off_hours = is_off_hours(hour)
# Interactive sessions always use the base model (user can correct errors)
if not is_cron:
return RoutingDecision(
model=base_model or _DEFAULT_CHEAP_MODEL,
provider=base_provider,
reason="Interactive session — user can correct errors",
hour=hour,
error_rate=error_rate,
is_off_hours=off_hours,
)
# Cron jobs during low-error hours: use base model
if not off_hours and error_rate < _ERROR_THRESHOLD:
return RoutingDecision(
model=base_model or _DEFAULT_CHEAP_MODEL,
provider=base_provider,
reason=f"Low-error hours ({hour}:00, {error_rate}% expected)",
hour=hour,
error_rate=error_rate,
is_off_hours=False,
)
# Cron jobs during high-error hours: upgrade to stronger model
if error_rate >= _ERROR_THRESHOLD:
return RoutingDecision(
model=_DEFAULT_STRONG_MODEL,
provider="nous",
reason=f"High-error hours ({hour}:00, {error_rate}% expected) — using stronger model",
hour=hour,
error_rate=error_rate,
is_off_hours=True,
)
# Off-hours but low error: use base model
return RoutingDecision(
model=base_model or _DEFAULT_CHEAP_MODEL,
provider=base_provider,
reason=f"Off-hours but low error ({hour}:00, {error_rate}%)",
hour=hour,
error_rate=error_rate,
is_off_hours=off_hours,
)
def get_routing_report() -> str:
"""Get a report of time-based routing decisions for the next 24 hours."""
lines = ["Time-Aware Model Routing (24h forecast)", "=" * 40, ""]
lines.append(f"Error threshold: {_ERROR_THRESHOLD}%")
lines.append(f"Strong model: {_DEFAULT_STRONG_MODEL}")
lines.append(f"Cheap model: {_DEFAULT_CHEAP_MODEL}")
lines.append("")
for h in range(24):
decision = resolve_time_aware_model(is_cron=True, hour=h)
icon = "\U0001f7e2" if decision.model == _DEFAULT_CHEAP_MODEL else "\U0001f534"
lines.append(f" {h:02d}:00 {icon} {decision.model:25s} ({decision.error_rate}% error)")
return "\n".join(lines)

147
scripts/queue_health_check.py Executable file
View File

@@ -0,0 +1,147 @@
#!/usr/bin/env python3
"""
Queue Health Check — Verify dispatch queue is operational.
Checks:
1. Queue file exists and is readable
2. Queue has pending items
3. Queue is not stuck (items processing)
4. Queue age (stale items)
Usage:
python scripts/queue_health_check.py
python scripts/queue_health_check.py --json
"""
import json
import sys
from datetime import datetime, timedelta
from pathlib import Path
def check_queue_health(queue_path: str = "~/.hermes/queue.json") -> dict:
"""Check queue health status."""
path = Path(queue_path).expanduser()
result = {
"healthy": True,
"checks": {},
"warnings": [],
"errors": []
}
# Check 1: File exists
if not path.exists():
result["healthy"] = False
result["errors"].append(f"Queue file not found: {path}")
result["checks"]["file_exists"] = False
return result
result["checks"]["file_exists"] = True
# Check 2: File is readable
try:
with open(path, "r") as f:
data = json.load(f)
except Exception as e:
result["healthy"] = False
result["errors"].append(f"Cannot read queue: {e}")
result["checks"]["readable"] = False
return result
result["checks"]["readable"] = True
# Check 3: Queue structure
if not isinstance(data, dict):
result["healthy"] = False
result["errors"].append("Queue is not a dict")
result["checks"]["valid_structure"] = False
return result
result["checks"]["valid_structure"] = True
# Check 4: Pending items
pending = data.get("pending", [])
processing = data.get("processing", [])
completed = data.get("completed", [])
result["checks"]["pending_count"] = len(pending)
result["checks"]["processing_count"] = len(processing)
result["checks"]["completed_count"] = len(completed)
if len(pending) == 0 and len(processing) == 0:
result["warnings"].append("Queue is empty")
# Check 5: Stale processing items
now = datetime.now()
stale_threshold = timedelta(hours=1)
for item in processing:
started = item.get("started_at")
if started:
try:
started_time = datetime.fromisoformat(started.replace("Z", "+00:00"))
if now - started_time > stale_threshold:
result["warnings"].append(f"Stale item: {item.get('id', 'unknown')} (started {started})")
except:
pass
# Check 6: Queue age
if pending:
oldest = min(pending, key=lambda x: x.get("added_at", ""))
added = oldest.get("added_at")
if added:
try:
added_time = datetime.fromisoformat(added.replace("Z", "+00:00"))
age = now - added_time
if age > timedelta(hours=24):
result["warnings"].append(f"Old item in queue: {oldest.get('id', 'unknown')} (added {added})")
except:
pass
return result
def main():
"""Main function."""
import argparse
parser = argparse.ArgumentParser(description="Queue health check")
parser.add_argument("--queue", default="~/.hermes/queue.json", help="Queue file path")
parser.add_argument("--json", action="store_true", help="Output as JSON")
args = parser.parse_args()
result = check_queue_health(args.queue)
if args.json:
print(json.dumps(result, indent=2))
else:
print("Queue Health Check")
print("=" * 50)
print(f"Healthy: {'' if result['healthy'] else ''}")
print()
print("Checks:")
for check, value in result["checks"].items():
if isinstance(value, bool):
print(f" {check}: {'' if value else ''}")
else:
print(f" {check}: {value}")
if result["warnings"]:
print()
print("Warnings:")
for warning in result["warnings"]:
print(f"{warning}")
if result["errors"]:
print()
print("Errors:")
for error in result["errors"]:
print(f"{error}")
sys.exit(0 if result["healthy"] else 1)
if __name__ == "__main__":
main()

View File

@@ -1,58 +0,0 @@
"""Tests for time-aware model routing."""
import pytest
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from agent.time_aware_routing import (
resolve_time_aware_model,
get_hour_error_rate,
is_off_hours,
get_routing_report,
)
class TestErrorRates:
def test_evening_high_error(self):
assert get_hour_error_rate(18) == 9.4
assert get_hour_error_rate(19) == 8.1
def test_morning_low_error(self):
assert get_hour_error_rate(9) == 4.0
assert get_hour_error_rate(12) == 4.0
def test_default_for_unknown(self):
assert get_hour_error_rate(15) == 4.0
class TestOffHours:
def test_evening_is_off_hours(self):
assert is_off_hours(20) is True
assert is_off_hours(2) is True
def test_business_hours_not_off(self):
assert is_off_hours(9) is False
assert is_off_hours(14) is False
class TestRouting:
def test_interactive_uses_base_model(self):
d = resolve_time_aware_model("my-model", "my-provider", is_cron=False, hour=18)
assert d.model == "my-model"
assert "Interactive" in d.reason
def test_cron_low_error_uses_base(self):
d = resolve_time_aware_model("cheap-model", is_cron=True, hour=10)
assert d.model == "cheap-model"
def test_cron_high_error_upgrades(self):
d = resolve_time_aware_model("cheap-model", is_cron=True, hour=18)
assert d.model != "cheap-model"
assert d.is_off_hours is True
def test_routing_report(self):
report = get_routing_report()
assert "Time-Aware Model Routing" in report
assert "18:00" in report