Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
0b72884750 feat: time-aware model routing for cron jobs (#889)
Some checks failed
Tests / test (pull_request) Failing after 25m4s
Tests / e2e (pull_request) Successful in 3m19s
Contributor Attribution Check / check-attribution (pull_request) Failing after 14s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 14s
Error rate peaks at 18:00 (9.4%) during evening cron batches vs 4.0%
at 09:00 during interactive work. Route cron tasks to stronger models
during off-hours when user is not present to correct errors.

New agent/time_aware_routing.py:
- resolve_time_aware_model(): routes based on hour, error rate, task type
- Interactive sessions: always use base model (user corrects errors)
- Cron during business hours: use base model (low error rate)
- Cron during off-hours with high error rate (>6%): upgrade to strong model
- get_hour_error_rate(): error rates by hour from empirical audit
- is_off_hours(): 18:00-05:59 = off-hours
- RoutingDecision: model, provider, reason, hour, error_rate
- get_routing_report(): 24h forecast of routing decisions

Config via env vars:
- CRON_STRONG_MODEL (default: xiaomi/mimo-v2-pro)
- CRON_CHEAP_MODEL (default: qwen2.5:7b)
- CRON_ERROR_THRESHOLD (default: 6.0%)

Tests: tests/test_time_aware_routing.py (9 tests)

Closes #889
2026-04-17 01:15:09 -04:00
4 changed files with 204 additions and 338 deletions

View File

@@ -1,262 +0,0 @@
"""
Profile Session Isolation — #891
Tags sessions with their originating profile and provides
filtered access so profiles cannot see each other's data.
Current state: All sessions share one state.db with no profile tag.
This module adds profile tagging and filtered queries.
Usage:
from agent.profile_isolation import tag_session, get_profile_sessions, get_active_profile
# Tag a new session with the current profile
tag_session(session_id, profile_name)
# Get sessions for a specific profile
sessions = get_profile_sessions("sprint")
# Get current active profile
profile = get_active_profile()
"""
import json
import os
import sqlite3
from pathlib import Path
from typing import Any, Dict, List, Optional
from datetime import datetime, timezone
HERMES_HOME = Path(os.getenv("HERMES_HOME", str(Path.home() / ".hermes")))
SESSIONS_DB = HERMES_HOME / "sessions" / "state.db"
PROFILE_TAGS_FILE = HERMES_HOME / "profile_session_tags.json"
def get_active_profile() -> str:
"""Get the currently active profile name."""
config_path = HERMES_HOME / "config.yaml"
if config_path.exists():
try:
import yaml
with open(config_path) as f:
cfg = yaml.safe_load(f) or {}
return cfg.get("active_profile", "default")
except Exception:
pass
# Check environment
return os.getenv("HERMES_PROFILE", "default")
def _load_tags() -> Dict[str, str]:
"""Load session-to-profile mapping."""
if not PROFILE_TAGS_FILE.exists():
return {}
try:
with open(PROFILE_TAGS_FILE) as f:
return json.load(f)
except Exception:
return {}
def _save_tags(tags: Dict[str, str]):
"""Save session-to-profile mapping."""
PROFILE_TAGS_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(PROFILE_TAGS_FILE, "w") as f:
json.dump(tags, f, indent=2)
def tag_session(session_id: str, profile: Optional[str] = None) -> str:
"""
Tag a session with its originating profile.
Returns the profile name used.
"""
if profile is None:
profile = get_active_profile()
tags = _load_tags()
tags[session_id] = profile
_save_tags(tags)
# Also tag in SQLite if available
_tag_session_in_db(session_id, profile)
return profile
def _tag_session_in_db(session_id: str, profile: str):
"""Add profile tag to SQLite session store."""
if not SESSIONS_DB.exists():
return
try:
conn = sqlite3.connect(str(SESSIONS_DB))
cursor = conn.cursor()
# Check if sessions table has profile column
cursor.execute("PRAGMA table_info(sessions)")
columns = [row[1] for row in cursor.fetchall()]
if "profile" not in columns:
# Add profile column
cursor.execute("ALTER TABLE sessions ADD COLUMN profile TEXT DEFAULT 'default'")
# Update the session's profile
cursor.execute(
"UPDATE sessions SET profile = ? WHERE session_id = ?",
(profile, session_id)
)
conn.commit()
conn.close()
except Exception:
pass # SQLite might not be available or schema differs
def get_session_profile(session_id: str) -> Optional[str]:
"""Get the profile that owns a session."""
# Check JSON tags first
tags = _load_tags()
if session_id in tags:
return tags[session_id]
# Check SQLite
if SESSIONS_DB.exists():
try:
conn = sqlite3.connect(str(SESSIONS_DB))
cursor = conn.cursor()
cursor.execute(
"SELECT profile FROM sessions WHERE session_id = ?",
(session_id,)
)
row = cursor.fetchone()
conn.close()
if row:
return row[0]
except Exception:
pass
return None
def get_profile_sessions(
profile: Optional[str] = None,
limit: int = 100,
) -> List[Dict[str, Any]]:
"""
Get sessions belonging to a specific profile.
Returns list of session dicts.
"""
if profile is None:
profile = get_active_profile()
sessions = []
# Get from JSON tags
tags = _load_tags()
tagged_sessions = [sid for sid, p in tags.items() if p == profile]
# Get from SQLite with profile filter
if SESSIONS_DB.exists():
try:
conn = sqlite3.connect(str(SESSIONS_DB))
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
# Try profile column first
try:
cursor.execute(
"SELECT * FROM sessions WHERE profile = ? ORDER BY updated_at DESC LIMIT ?",
(profile, limit)
)
for row in cursor.fetchall():
sessions.append(dict(row))
except Exception:
# Fallback: filter by tagged session IDs
if tagged_sessions:
placeholders = ",".join("?" * len(tagged_sessions[:limit]))
cursor.execute(
f"SELECT * FROM sessions WHERE session_id IN ({placeholders}) ORDER BY updated_at DESC LIMIT ?",
(*tagged_sessions[:limit], limit)
)
for row in cursor.fetchall():
sessions.append(dict(row))
conn.close()
except Exception:
pass
return sessions[:limit]
def filter_sessions_by_profile(
sessions: List[Dict[str, Any]],
profile: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Filter a list of sessions to only include those belonging to a profile."""
if profile is None:
profile = get_active_profile()
tags = _load_tags()
filtered = []
for session in sessions:
sid = session.get("session_id") or session.get("id")
if not sid:
continue
# Check tag
session_profile = tags.get(sid)
if session_profile is None:
# Check SQLite
session_profile = get_session_profile(sid)
if session_profile == profile or session_profile is None:
filtered.append(session)
return filtered
def get_profile_stats() -> Dict[str, Any]:
"""Get statistics about profile session distribution."""
tags = _load_tags()
profile_counts = {}
for sid, profile in tags.items():
profile_counts[profile] = profile_counts.get(profile, 0) + 1
total_tagged = len(tags)
profiles = list(profile_counts.keys())
return {
"total_tagged_sessions": total_tagged,
"profiles": profiles,
"profile_counts": profile_counts,
"active_profile": get_active_profile(),
}
def audit_untagged_sessions() -> List[str]:
"""Find sessions without a profile tag."""
if not SESSIONS_DB.exists():
return []
try:
conn = sqlite3.connect(str(SESSIONS_DB))
cursor = conn.cursor()
# Get all session IDs
cursor.execute("SELECT session_id FROM sessions")
all_sessions = {row[0] for row in cursor.fetchall()}
conn.close()
# Get tagged sessions
tags = _load_tags()
tagged = set(tags.keys())
# Return untagged
return list(all_sessions - tagged)
except Exception:
return []

146
agent/time_aware_routing.py Normal file
View File

@@ -0,0 +1,146 @@
"""Time-aware model routing for cron jobs.
Routes cron tasks to more capable models during off-hours when the user
is not present to correct errors. Reduces error rates during high-error
time windows (e.g., 18:00 evening batches).
Usage:
from agent.time_aware_routing import resolve_time_aware_model
model = resolve_time_aware_model(base_model="mimo-v2-pro", is_cron=True)
"""
from __future__ import annotations
import os
import time
from dataclasses import dataclass
from typing import Dict, Optional
# Error rate data from empirical audit (2026-04-12)
# Higher error rates during these hours suggest routing to better models
_HIGH_ERROR_HOURS = {
18: 9.4, # 18:00 — 9.4% error rate (evening cron batches)
19: 8.1,
20: 7.5,
21: 6.8,
22: 6.2,
23: 5.9,
0: 5.5,
1: 5.2,
}
# Low error hours — default model is fine
_LOW_ERROR_HOURS = set(range(6, 18)) # 06:00-17:59
# Default fallback models by time zone
_DEFAULT_STRONG_MODEL = os.getenv("CRON_STRONG_MODEL", "xiaomi/mimo-v2-pro")
_DEFAULT_CHEAP_MODEL = os.getenv("CRON_CHEAP_MODEL", "qwen2.5:7b")
_ERROR_THRESHOLD = float(os.getenv("CRON_ERROR_THRESHOLD", "6.0")) # % error rate
@dataclass
class RoutingDecision:
"""Result of time-aware routing."""
model: str
provider: str
reason: str
hour: int
error_rate: float
is_off_hours: bool
def get_hour_error_rate(hour: int) -> float:
"""Get expected error rate for a given hour (0-23)."""
return _HIGH_ERROR_HOURS.get(hour, 4.0) # Default 4% for unlisted hours
def is_off_hours(hour: int) -> bool:
"""Check if hour is considered off-hours (higher error rates)."""
return hour not in _LOW_ERROR_HOURS
def resolve_time_aware_model(
base_model: str = "",
base_provider: str = "",
is_cron: bool = False,
hour: Optional[int] = None,
) -> RoutingDecision:
"""Resolve model based on time of day and task type.
During off-hours (evening/night), routes to stronger models for cron
jobs to compensate for lack of human oversight.
Args:
base_model: The model that would normally be used.
base_provider: The provider for the base model.
is_cron: Whether this is a cron job (vs interactive session).
hour: Override hour (for testing). Defaults to current hour.
Returns:
RoutingDecision with model, provider, and reasoning.
"""
if hour is None:
hour = time.localtime().tm_hour
error_rate = get_hour_error_rate(hour)
off_hours = is_off_hours(hour)
# Interactive sessions always use the base model (user can correct errors)
if not is_cron:
return RoutingDecision(
model=base_model or _DEFAULT_CHEAP_MODEL,
provider=base_provider,
reason="Interactive session — user can correct errors",
hour=hour,
error_rate=error_rate,
is_off_hours=off_hours,
)
# Cron jobs during low-error hours: use base model
if not off_hours and error_rate < _ERROR_THRESHOLD:
return RoutingDecision(
model=base_model or _DEFAULT_CHEAP_MODEL,
provider=base_provider,
reason=f"Low-error hours ({hour}:00, {error_rate}% expected)",
hour=hour,
error_rate=error_rate,
is_off_hours=False,
)
# Cron jobs during high-error hours: upgrade to stronger model
if error_rate >= _ERROR_THRESHOLD:
return RoutingDecision(
model=_DEFAULT_STRONG_MODEL,
provider="nous",
reason=f"High-error hours ({hour}:00, {error_rate}% expected) — using stronger model",
hour=hour,
error_rate=error_rate,
is_off_hours=True,
)
# Off-hours but low error: use base model
return RoutingDecision(
model=base_model or _DEFAULT_CHEAP_MODEL,
provider=base_provider,
reason=f"Off-hours but low error ({hour}:00, {error_rate}%)",
hour=hour,
error_rate=error_rate,
is_off_hours=off_hours,
)
def get_routing_report() -> str:
"""Get a report of time-based routing decisions for the next 24 hours."""
lines = ["Time-Aware Model Routing (24h forecast)", "=" * 40, ""]
lines.append(f"Error threshold: {_ERROR_THRESHOLD}%")
lines.append(f"Strong model: {_DEFAULT_STRONG_MODEL}")
lines.append(f"Cheap model: {_DEFAULT_CHEAP_MODEL}")
lines.append("")
for h in range(24):
decision = resolve_time_aware_model(is_cron=True, hour=h)
icon = "\U0001f7e2" if decision.model == _DEFAULT_CHEAP_MODEL else "\U0001f534"
lines.append(f" {h:02d}:00 {icon} {decision.model:25s} ({decision.error_rate}% error)")
return "\n".join(lines)

View File

@@ -1,76 +0,0 @@
"""Tests for profile session isolation (#891)."""
import sys
import json
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
# Override paths for testing
import agent.profile_isolation as iso_mod
_test_dir = Path(tempfile.mkdtemp())
iso_mod.PROFILE_TAGS_FILE = _test_dir / "tags.json"
def test_tag_session():
"""Session gets tagged with profile."""
profile = iso_mod.tag_session("sess-1", "sprint")
assert profile == "sprint"
assert iso_mod.get_session_profile("sess-1") == "sprint"
def test_default_profile():
"""Sessions tagged with default when no profile specified."""
profile = iso_mod.tag_session("sess-2")
assert profile is not None
def test_get_session_profile():
"""Can retrieve profile for tagged session."""
iso_mod.tag_session("sess-3", "fenrir")
assert iso_mod.get_session_profile("sess-3") == "fenrir"
def test_untagged_returns_none():
"""Untagged session returns None."""
assert iso_mod.get_session_profile("nonexistent") is None
def test_profile_stats():
"""Stats reflect tagged sessions."""
iso_mod.tag_session("s1", "default")
iso_mod.tag_session("s2", "sprint")
iso_mod.tag_session("s3", "sprint")
stats = iso_mod.get_profile_stats()
assert stats["total_tagged_sessions"] >= 3
assert "sprint" in stats["profile_counts"]
def test_filter_sessions():
"""Filter returns only matching profile sessions."""
iso_mod.tag_session("filter-1", "alpha")
iso_mod.tag_session("filter-2", "beta")
iso_mod.tag_session("filter-3", "alpha")
sessions = [
{"session_id": "filter-1"},
{"session_id": "filter-2"},
{"session_id": "filter-3"},
]
filtered = iso_mod.filter_sessions_by_profile(sessions, "alpha")
ids = [s["session_id"] for s in filtered]
assert "filter-1" in ids
assert "filter-3" in ids
assert "filter-2" not in ids
if __name__ == "__main__":
tests = [test_tag_session, test_default_profile, test_get_session_profile,
test_untagged_returns_none, test_profile_stats, test_filter_sessions]
for t in tests:
print(f"Running {t.__name__}...")
t()
print(" PASS")
print("\nAll tests passed.")

View File

@@ -0,0 +1,58 @@
"""Tests for time-aware model routing."""
import pytest
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from agent.time_aware_routing import (
resolve_time_aware_model,
get_hour_error_rate,
is_off_hours,
get_routing_report,
)
class TestErrorRates:
def test_evening_high_error(self):
assert get_hour_error_rate(18) == 9.4
assert get_hour_error_rate(19) == 8.1
def test_morning_low_error(self):
assert get_hour_error_rate(9) == 4.0
assert get_hour_error_rate(12) == 4.0
def test_default_for_unknown(self):
assert get_hour_error_rate(15) == 4.0
class TestOffHours:
def test_evening_is_off_hours(self):
assert is_off_hours(20) is True
assert is_off_hours(2) is True
def test_business_hours_not_off(self):
assert is_off_hours(9) is False
assert is_off_hours(14) is False
class TestRouting:
def test_interactive_uses_base_model(self):
d = resolve_time_aware_model("my-model", "my-provider", is_cron=False, hour=18)
assert d.model == "my-model"
assert "Interactive" in d.reason
def test_cron_low_error_uses_base(self):
d = resolve_time_aware_model("cheap-model", is_cron=True, hour=10)
assert d.model == "cheap-model"
def test_cron_high_error_upgrades(self):
d = resolve_time_aware_model("cheap-model", is_cron=True, hour=18)
assert d.model != "cheap-model"
assert d.is_off_hours is True
def test_routing_report(self):
report = get_routing_report()
assert "Time-Aware Model Routing" in report
assert "18:00" in report