Compare commits

..

3 Commits

Author SHA1 Message Date
Timmy Time
ca737412ef Fix #293: Poka-yoke - prevent hardcoded ~/.hermes paths
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 35s
Added error-proofing to prevent hardcoded ~/.hermes paths that break
profile isolation. This is a poka-yoke (mistake-proofing) measure.

Changes:
1. Added .githooks/check_hardcoded_paths.py - pre-commit hook that detects:
   - Path.home() / '.hermes' patterns
   - '~/.hermes' in string literals
   - os.path.expanduser('~/.hermes') patterns
   - os.path.join(expanduser('~'), '.hermes') patterns

2. Updated .githooks/pre-commit.py to run the hardcoded path check

3. Added CI job in .github/workflows/tests.yml to check for hardcoded paths

4. Added comprehensive tests in tests/test_hardcoded_paths.py:
   - Tests for pattern detection
   - Tests for get_hermes_home() and display_hermes_home() functions
   - Tests for profile isolation
   - Integration tests for pre-commit hook

The hook ignores:
- hermes_constants.py (source of truth)
- Test files (can mock/test behavior)
- Documentation files (.md, README, etc.)
- Comments and docstrings

This prevents the recurring pattern of hardcoded paths that break
profile isolation, as mentioned in issue #293.

Fixes #293
2026-04-13 20:37:38 -04:00
5180c172fa Merge pull request 'feat: profile-tagged session isolation (#323)' (#422) from burn/323-1776120221 into main
Some checks failed
Forge CI / smoke-and-build (push) Failing after 43s
feat: profile-tagged session isolation (#323)

Closes #323.
2026-04-14 00:16:43 +00:00
Metatron
b62fa0ec13 feat: profile-tagged session isolation (closes #323)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 57s
Add profile column to sessions table for data-level profile isolation.
All session queries now accept an optional profile filter.

Changes:
- Schema v7: new 'profile' TEXT column + idx_sessions_profile index
- Migration v7: ALTER TABLE + CREATE INDEX on existing DBs
- create_session(): new profile parameter
- ensure_session(): new profile parameter
- list_sessions_rich(): profile filter (WHERE s.profile = ?)
- search_sessions(): profile filter
- session_count(): profile filter

Sessions without a profile (None) remain visible to all queries for
backward compatibility. When a profile is passed, only that profile's
sessions are returned.

Profile agents can no longer see each other's sessions when filtered.
No breaking changes to existing callers.
2026-04-13 18:53:45 -04:00
10 changed files with 515 additions and 366 deletions

View File

@@ -0,0 +1,226 @@
#!/usr/bin/env python3
"""
Pre-commit hook for detecting hardcoded ~/.hermes paths.
This is a poka-yoke (error-proofing) measure to prevent profile isolation
failures. All code should use get_hermes_home() from hermes_constants instead
of hardcoding ~/.hermes or Path.home() / ".hermes".
Installation:
git config core.hooksPath .githooks
To bypass:
git commit --no-verify
"""
from __future__ import annotations
import re
import subprocess
import sys
from pathlib import Path
from typing import Iterable, List
# ANSI color codes
RED = "\033[0;31m"
YELLOW = "\033[1;33m"
GREEN = "\033[0;32m"
NC = "\033[0m"
class Finding:
"""Represents a single hardcoded path finding."""
def __init__(self, filename: str, line: int, message: str, suggestion: str = "") -> None:
self.filename = filename
self.line = line
self.message = message
self.suggestion = suggestion
def __repr__(self) -> str:
return f"Finding({self.filename!r}, {self.line}, {self.message!r})"
# ---------------------------------------------------------------------------
# Regex patterns for hardcoded paths
# ---------------------------------------------------------------------------
# Pattern 1: Path.home() / ".hermes" or Path.home() / '.hermes'
_RE_PATH_HOME_HERMES = re.compile(
r"""Path\.home\(\)\s*/\s*['"]\.hermes['"]"""
)
# Pattern 2: Path.home() / ".hermes" / something
_RE_PATH_HOME_HERMES_SUB = re.compile(
r"""Path\.home\(\)\s*/\s*['"]\.hermes['"]\s*/"""
)
# Pattern 3: ~/.hermes in strings (but not in comments or docs)
_RE_TILDE_HERMES = re.compile(
r"""['"]~/?\.hermes(/|['"])"""
)
# Pattern 4: os.path.expanduser("~/.hermes")
_RE_EXPANDUSER_HERMES = re.compile(
r"""os\.path\.expanduser\(\s*['"]~/?\.hermes"""
)
# Pattern 5: os.path.join(os.path.expanduser("~"), ".hermes")
_RE_JOIN_EXPANDUSER = re.compile(
r"""os\.path\.join\(\s*os\.path\.expanduser\(\s*['"]~['"]\s*\)\s*,\s*['"]\.hermes['"]"""
)
# All patterns combined
_ALL_PATTERNS = [
(_RE_PATH_HOME_HERMES, "Path.home() / '.hermes' — use get_hermes_home() instead"),
(_RE_PATH_HOME_HERMES_SUB, "Path.home() / '.hermes' / ... — use get_hermes_home() / '...' instead"),
(_RE_TILDE_HERMES, "'~/.hermes' — use get_hermes_home() for paths, display_hermes_home() for display"),
(_RE_EXPANDUSER_HERMES, "os.path.expanduser('~/.hermes') — use get_hermes_home() instead"),
(_RE_JOIN_EXPANDUSER, "os.path.join(expanduser('~'), '.hermes') — use get_hermes_home() instead"),
]
# Safe contexts (don't flag these)
_SAFE_CONTEXTS = [
# hermes_constants.py is allowed (it's the source of truth)
"hermes_constants.py",
# Test files can mock/test the behavior
"test_",
"_test.py",
"/tests/",
# Documentation files
".md",
"README",
"CHANGELOG",
"AGENTS.md",
# Example/template files
".example",
"template",
]
def _is_safe_context(filename: str) -> bool:
"""Check if the file is in a safe context where hardcoded paths are OK."""
for safe in _SAFE_CONTEXTS:
if safe in filename:
return True
return False
def _is_comment_or_doc(line: str) -> bool:
"""Check if the line is a comment or documentation."""
stripped = line.strip()
if stripped.startswith("#"):
return True
if stripped.startswith('"""') or stripped.startswith("'''"):
return True
if '"""' in stripped or "'''" in stripped:
return True
return False
def scan_line_for_hardcoded_paths(line: str, filename: str, line_no: int) -> Iterable[Finding]:
"""Scan a single line for hardcoded ~/.hermes paths."""
if _is_safe_context(filename):
return
stripped = line.rstrip("\n")
if not stripped:
return
# Skip comments and docstrings
if _is_comment_or_doc(stripped):
return
for pattern, message in _ALL_PATTERNS:
if pattern.search(stripped):
yield Finding(
filename,
line_no,
message,
"Use get_hermes_home() from hermes_constants for paths, display_hermes_home() for display",
)
return # One finding per line is enough
def get_staged_files() -> List[str]:
"""Get list of staged files in the git index."""
try:
result = subprocess.run(
["git", "diff", "--cached", "--name-only", "--diff-filter=ACM"],
capture_output=True,
text=True,
check=True,
)
return [f.strip() for f in result.stdout.splitlines() if f.strip()]
except subprocess.CalledProcessError:
return []
def get_staged_content(filename: str) -> str:
"""Get the staged content of a file."""
try:
result = subprocess.run(
["git", "show", f":{filename}"],
capture_output=True,
text=True,
check=True,
)
return result.stdout
except subprocess.CalledProcessError:
return ""
def scan_file(filename: str) -> List[Finding]:
"""Scan a file for hardcoded ~/.hermes paths."""
if _is_safe_context(filename):
return []
# Only scan Python files
if not filename.endswith(".py"):
return []
content = get_staged_content(filename)
if not content:
return []
findings = []
for line_no, line in enumerate(content.splitlines(), start=1):
for finding in scan_line_for_hardcoded_paths(line, filename, line_no):
findings.append(finding)
return findings
def main() -> int:
"""Main entry point for the pre-commit hook."""
staged_files = get_staged_files()
if not staged_files:
return 0
all_findings = []
for filename in staged_files:
findings = scan_file(filename)
all_findings.extend(findings)
if not all_findings:
return 0
# Print findings
print(f"\n{RED}✗ Hardcoded ~/.hermes paths detected:{NC}\n")
for finding in all_findings:
print(f" {YELLOW}{finding.filename}:{finding.line}{NC}")
print(f" {finding.message}")
if finding.suggestion:
print(f" {GREEN}Fix: {finding.suggestion}{NC}")
print()
print(f"{RED}Found {len(all_findings)} hardcoded path(s).{NC}")
print(f"{YELLOW}Use get_hermes_home() from hermes_constants for paths.{NC}")
print(f"{YELLOW}Use display_hermes_home() for user-facing display.{NC}")
print(f"\n{YELLOW}To bypass: git commit --no-verify{NC}\n")
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -295,6 +295,22 @@ def main() -> int:
if line.startswith("+") and not line.startswith("+++"):
findings.extend(scan_line(line[1:], "<diff>", line_no))
# Also check for hardcoded ~/.hermes paths
print(f"{GREEN}🔍 Scanning for hardcoded ~/.hermes paths...{NC}")
try:
import subprocess as sp
result = sp.run(
[sys.executable, str(Path(__file__).parent / "check_hardcoded_paths.py")],
capture_output=True,
text=True,
)
if result.returncode != 0:
# Print the output from the hardcoded path check
print(result.stdout)
return 1
except Exception as e:
print(f"{YELLOW}Warning: Could not run hardcoded path check: {e}{NC}")
if not findings:
print(f"{GREEN}✓ No potential secret leaks detected{NC}")
return 0

View File

@@ -12,6 +12,23 @@ concurrency:
cancel-in-progress: true
jobs:
check-hardcoded-paths:
runs-on: ubuntu-latest
timeout-minutes: 5
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python 3.11
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Check for hardcoded ~/.hermes paths
run: |
python .githooks/check_hardcoded_paths.py
# This will fail if any hardcoded paths are found
test:
runs-on: ubuntu-latest
container: catthehacker/ubuntu:act-22.04

View File

@@ -4,7 +4,6 @@ from __future__ import annotations
import os
import re
from datetime import datetime
from typing import Any, Dict, Optional
from utils import is_truthy_value
@@ -183,7 +182,7 @@ def resolve_turn_route(user_message: str, routing_config: Optional[Dict[str, Any
"command": runtime.get("command"),
"args": list(runtime.get("args") or []),
},
"label": f"smart route \u2192 {route.get('model')} ({runtime.get('provider')})",
"label": f"smart route {route.get('model')} ({runtime.get('provider')})",
"signature": (
route.get("model"),
runtime.get("provider"),
@@ -193,151 +192,3 @@ def resolve_turn_route(user_message: str, routing_config: Optional[Dict[str, Any
tuple(runtime.get("args") or ()),
),
}
# ---------------------------------------------------------------------------
# Time-aware cron model routing
# ---------------------------------------------------------------------------
# During peak hours (user active), cron jobs use a cheaper model because the
# user is present to catch and correct errors. During off-peak hours (user
# absent), cron jobs use a stronger model because errors go uncorrected.
#
# Config (under smart_model_routing.cron_time_routing):
# enabled: true
# timezone: "America/New_York" # IANA timezone name (default: UTC)
# peak_hours:
# start: 9 # inclusive, 0-23
# end: 18 # exclusive, 0-23
# peak_model: # model to use during peak hours
# provider: openrouter
# model: xiaomi/mimo-v2-pro
# offpeak_model: # model to use during off-peak hours
# provider: openrouter
# model: anthropic/claude-sonnet-4
def _get_current_hour_in_tz(tz_name: str) -> int:
"""Return the current hour (0-23) in the given IANA timezone."""
try:
from zoneinfo import ZoneInfo
tz = ZoneInfo(tz_name)
except Exception:
try:
import pytz
tz = pytz.timezone(tz_name)
except Exception:
return datetime.utcnow().hour
return datetime.now(tz).hour
def _is_peak_hour(hour: int, peak_start: int, peak_end: int) -> bool:
"""Return True if *hour* falls within [peak_start, peak_end).
Handles wrap-around (e.g. start=22, end=6 means 22:00-05:59 is peak).
"""
if peak_start <= peak_end:
return peak_start <= hour < peak_end
else:
# Wraps midnight: e.g. 22-6 means 22,23,0,1,2,3,4,5
return hour >= peak_start or hour < peak_end
def resolve_cron_time_route(
routing_config: Optional[Dict[str, Any]],
) -> Optional[Dict[str, Any]]:
"""Return a time-aware model override for cron jobs.
Considers the current hour in the configured timezone and picks
between a peak-hours model (cheaper, user present) and an off-peak
model (stronger, user absent, errors go uncorrected).
Returns None when time-aware routing is disabled or misconfigured,
so the caller falls through to normal routing.
"""
cfg = routing_config or {}
cron_cfg = cfg.get("cron_time_routing") or {}
if not _coerce_bool(cron_cfg.get("enabled"), False):
return None
tz_name = str(cron_cfg.get("timezone", "UTC")).strip()
peak = cron_cfg.get("peak_hours") or {}
peak_start = _coerce_int(peak.get("start"), 9)
peak_end = _coerce_int(peak.get("end"), 18)
current_hour = _get_current_hour_in_tz(tz_name)
is_peak = _is_peak_hour(current_hour, peak_start, peak_end)
if is_peak:
model_cfg = cron_cfg.get("peak_model") or {}
reason = "cron_peak_hours"
else:
model_cfg = cron_cfg.get("offpeak_model") or {}
reason = "cron_offpeak_hours"
provider = str(model_cfg.get("provider") or "").strip().lower()
model = str(model_cfg.get("model") or "").strip()
if not provider or not model:
return None
return {
"provider": provider,
"model": model,
"base_url": model_cfg.get("base_url", ""),
"api_key_env": model_cfg.get("api_key_env", ""),
"routing_reason": reason,
"is_peak_hour": is_peak,
"hour": current_hour,
}
def resolve_cron_turn_route(
user_message: str,
routing_config: Optional[Dict[str, Any]],
primary: Dict[str, Any],
) -> Dict[str, Any]:
"""Resolve model route for a cron job turn with time-awareness.
Checks time-aware routing first (cron_time_routing), then falls
back to normal smart routing, then falls back to primary.
"""
# 1. Time-aware cron routing (peak vs off-peak)
time_route = resolve_cron_time_route(routing_config)
if time_route:
from hermes_cli.runtime_provider import resolve_runtime_provider
explicit_api_key = None
api_key_env = str(time_route.get("api_key_env") or "").strip()
if api_key_env:
explicit_api_key = os.getenv(api_key_env) or None
try:
runtime = resolve_runtime_provider(
requested=time_route.get("provider"),
explicit_api_key=explicit_api_key,
explicit_base_url=time_route.get("base_url"),
)
peak_label = "peak" if time_route.get("is_peak_hour") else "off-peak"
return {
"model": time_route.get("model"),
"runtime": {
"api_key": runtime.get("api_key"),
"base_url": runtime.get("base_url"),
"provider": runtime.get("provider"),
"api_mode": runtime.get("api_mode"),
"command": runtime.get("command"),
"args": list(runtime.get("args") or []),
},
"label": f"cron {peak_label} -> {time_route.get('model')} ({runtime.get('provider')})",
"signature": (
time_route.get("model"),
runtime.get("provider"),
runtime.get("base_url"),
runtime.get("api_mode"),
runtime.get("command"),
tuple(runtime.get("args") or ()),
),
}
except Exception:
pass # Fall through to normal routing
# 2. Normal smart routing (simple-turn cheap model)
return resolve_turn_route(user_message, routing_config, primary)

View File

@@ -87,21 +87,6 @@ model:
# cheap_model:
# provider: openrouter
# model: google/gemini-2.5-flash
# # Time-aware cron routing: pick model based on hour of day.
# # Peak hours = user present, cheaper model OK (they catch errors).
# # Off-peak = user absent, stronger model (errors go uncorrected).
# cron_time_routing:
# enabled: true
# timezone: "America/New_York" # IANA timezone (default: UTC)
# peak_hours:
# start: 9 # inclusive, 0-23
# end: 18 # exclusive, 0-23
# peak_model: # model during peak hours (user active)
# provider: openrouter
# model: xiaomi/mimo-v2-pro
# offpeak_model: # model during off-peak (user absent)
# provider: openrouter
# model: anthropic/claude-sonnet-4
# =============================================================================
# Git Worktree Isolation

View File

@@ -762,8 +762,8 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
message = format_runtime_provider_error(exc)
raise RuntimeError(message) from exc
from agent.smart_model_routing import resolve_cron_turn_route
turn_route = resolve_cron_turn_route(
from agent.smart_model_routing import resolve_turn_route
turn_route = resolve_turn_route(
prompt,
smart_routing,
{
@@ -776,8 +776,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
"args": list(runtime.get("args") or []),
},
)
if turn_route.get("label"):
logger.info("Job '%s': %s", job_name, turn_route["label"])
_agent_kwargs = _safe_agent_kwargs({
"model": turn_route["model"],

View File

@@ -299,13 +299,6 @@ DEFAULT_CONFIG = {
"max_simple_chars": 160,
"max_simple_words": 28,
"cheap_model": {},
"cron_time_routing": {
"enabled": False,
"timezone": "UTC",
"peak_hours": {"start": 9, "end": 18},
"peak_model": {},
"offpeak_model": {},
},
},
# Auxiliary model config — provider:model for each side task.

View File

@@ -32,7 +32,7 @@ T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
SCHEMA_VERSION = 6
SCHEMA_VERSION = 7
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@@ -66,6 +66,7 @@ CREATE TABLE IF NOT EXISTS sessions (
cost_source TEXT,
pricing_version TEXT,
title TEXT,
profile TEXT,
FOREIGN KEY (parent_session_id) REFERENCES sessions(id)
);
@@ -86,6 +87,7 @@ CREATE TABLE IF NOT EXISTS messages (
);
CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source);
CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile);
CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
@@ -330,6 +332,19 @@ class SessionDB:
except sqlite3.OperationalError:
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 6")
if current_version < 7:
# v7: add profile column to sessions for profile isolation (#323)
try:
cursor.execute('ALTER TABLE sessions ADD COLUMN "profile" TEXT')
except sqlite3.OperationalError:
pass # Column already exists
try:
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile)"
)
except sqlite3.OperationalError:
pass
cursor.execute("UPDATE schema_version SET version = 7")
# Unique title index — always ensure it exists (safe to run after migrations
# since the title column is guaranteed to exist at this point)
@@ -362,13 +377,19 @@ class SessionDB:
system_prompt: str = None,
user_id: str = None,
parent_session_id: str = None,
profile: str = None,
) -> str:
"""Create a new session record. Returns the session_id."""
"""Create a new session record. Returns the session_id.
Args:
profile: Profile name for session isolation. When set, sessions
are tagged so queries can filter by profile. (#323)
"""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config,
system_prompt, parent_session_id, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
system_prompt, parent_session_id, profile, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
session_id,
source,
@@ -377,6 +398,7 @@ class SessionDB:
json.dumps(model_config) if model_config else None,
system_prompt,
parent_session_id,
profile,
time.time(),
),
)
@@ -505,19 +527,23 @@ class SessionDB:
session_id: str,
source: str = "unknown",
model: str = None,
profile: str = None,
) -> None:
"""Ensure a session row exists, creating it with minimal metadata if absent.
Used by _flush_messages_to_session_db to recover from a failed
create_session() call (e.g. transient SQLite lock at agent startup).
INSERT OR IGNORE is safe to call even when the row already exists.
Args:
profile: Profile name for session isolation. (#323)
"""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions
(id, source, model, started_at)
VALUES (?, ?, ?, ?)""",
(session_id, source, model, time.time()),
(id, source, model, profile, started_at)
VALUES (?, ?, ?, ?, ?)""",
(session_id, source, model, profile, time.time()),
)
self._execute_write(_do)
@@ -788,6 +814,7 @@ class SessionDB:
limit: int = 20,
offset: int = 0,
include_children: bool = False,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions with preview (first user message) and last active timestamp.
@@ -799,6 +826,10 @@ class SessionDB:
By default, child sessions (subagent runs, compression continuations)
are excluded. Pass ``include_children=True`` to include them.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
@@ -813,6 +844,9 @@ class SessionDB:
placeholders = ",".join("?" for _ in exclude_sources)
where_clauses.append(f"s.source NOT IN ({placeholders})")
params.extend(exclude_sources)
if profile:
where_clauses.append("s.profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"""
@@ -1158,34 +1192,52 @@ class SessionDB:
source: str = None,
limit: int = 20,
offset: int = 0,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions, optionally filtered by source."""
"""List sessions, optionally filtered by source and profile.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"SELECT * FROM sessions {where_sql} ORDER BY started_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
with self._lock:
if source:
cursor = self._conn.execute(
"SELECT * FROM sessions WHERE source = ? ORDER BY started_at DESC LIMIT ? OFFSET ?",
(source, limit, offset),
)
else:
cursor = self._conn.execute(
"SELECT * FROM sessions ORDER BY started_at DESC LIMIT ? OFFSET ?",
(limit, offset),
)
cursor = self._conn.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
# =========================================================================
# Utility
# =========================================================================
def session_count(self, source: str = None) -> int:
"""Count sessions, optionally filtered by source."""
def session_count(self, source: str = None, profile: str = None) -> int:
"""Count sessions, optionally filtered by source and profile.
Args:
profile: Filter to this profile name. Pass None to count all. (#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
with self._lock:
if source:
cursor = self._conn.execute(
"SELECT COUNT(*) FROM sessions WHERE source = ?", (source,)
)
else:
cursor = self._conn.execute("SELECT COUNT(*) FROM sessions")
cursor = self._conn.execute(f"SELECT COUNT(*) FROM sessions {where_sql}", params)
return cursor.fetchone()[0]
def message_count(self, session_id: str = None) -> int:

View File

@@ -1,164 +0,0 @@
"""Tests for time-aware cron model routing."""
from agent.smart_model_routing import (
_is_peak_hour,
resolve_cron_time_route,
resolve_cron_turn_route,
)
# ---------------------------------------------------------------------------
# _is_peak_hour
# ---------------------------------------------------------------------------
def test_peak_hour_within_normal_range():
assert _is_peak_hour(10, 9, 18) is True
assert _is_peak_hour(12, 9, 18) is True
assert _is_peak_hour(17, 9, 18) is True
def test_peak_hour_outside_normal_range():
assert _is_peak_hour(8, 9, 18) is False
assert _is_peak_hour(18, 9, 18) is False
assert _is_peak_hour(22, 9, 18) is False
assert _is_peak_hour(0, 9, 18) is False
def test_peak_hour_at_boundaries():
assert _is_peak_hour(9, 9, 18) is True # start inclusive
assert _is_peak_hour(18, 9, 18) is False # end exclusive
def test_peak_hour_wraps_midnight():
# 22-6 means peak from 22:00 to 05:59
assert _is_peak_hour(22, 22, 6) is True
assert _is_peak_hour(23, 22, 6) is True
assert _is_peak_hour(0, 22, 6) is True
assert _is_peak_hour(5, 22, 6) is True
assert _is_peak_hour(6, 22, 6) is False
assert _is_peak_hour(12, 22, 6) is False
assert _is_peak_hour(21, 22, 6) is False
# ---------------------------------------------------------------------------
# resolve_cron_time_route
# ---------------------------------------------------------------------------
_CRON_ROUTING_CFG = {
"cron_time_routing": {
"enabled": True,
"timezone": "UTC",
"peak_hours": {"start": 9, "end": 18},
"peak_model": {
"provider": "openrouter",
"model": "xiaomi/mimo-v2-pro",
},
"offpeak_model": {
"provider": "openrouter",
"model": "anthropic/claude-sonnet-4",
},
},
}
def test_returns_none_when_disabled():
cfg = {"cron_time_routing": {"enabled": False}}
assert resolve_cron_time_route(cfg) is None
def test_returns_none_when_no_config():
assert resolve_cron_time_route(None) is None
assert resolve_cron_time_route({}) is None
def test_returns_none_when_models_missing():
cfg = {
"cron_time_routing": {
"enabled": True,
"peak_model": {"provider": "", "model": ""},
"offpeak_model": {"provider": "", "model": ""},
}
}
assert resolve_cron_time_route(cfg) is None
def test_returns_route_with_hour_injection(monkeypatch):
"""Force hour=14 (peak) via _get_current_hour_in_tz patch."""
monkeypatch.setattr(
"agent.smart_model_routing._get_current_hour_in_tz",
lambda tz: 14,
)
result = resolve_cron_time_route(_CRON_ROUTING_CFG)
assert result is not None
assert result["model"] == "xiaomi/mimo-v2-pro"
assert result["is_peak_hour"] is True
assert result["hour"] == 14
assert result["routing_reason"] == "cron_peak_hours"
def test_returns_offpeak_route(monkeypatch):
monkeypatch.setattr(
"agent.smart_model_routing._get_current_hour_in_tz",
lambda tz: 3,
)
result = resolve_cron_time_route(_CRON_ROUTING_CFG)
assert result is not None
assert result["model"] == "anthropic/claude-sonnet-4"
assert result["is_peak_hour"] is False
assert result["hour"] == 3
assert result["routing_reason"] == "cron_offpeak_hours"
# ---------------------------------------------------------------------------
# resolve_cron_turn_route
# ---------------------------------------------------------------------------
_PRIMARY = {
"model": "anthropic/claude-opus-4",
"provider": "openrouter",
"base_url": "https://openrouter.ai/api/v1",
"api_mode": "chat_completions",
"api_key": "***",
}
def test_cron_turn_route_uses_time_awareness(monkeypatch):
monkeypatch.setattr(
"agent.smart_model_routing._get_current_hour_in_tz",
lambda tz: 2, # off-peak
)
monkeypatch.setattr(
"hermes_cli.runtime_provider.resolve_runtime_provider",
lambda **kw: {
"api_key": "test-key",
"base_url": "https://openrouter.ai/api/v1",
"provider": "openrouter",
"api_mode": "chat_completions",
"command": None,
"args": [],
},
)
result = resolve_cron_turn_route("check status", _CRON_ROUTING_CFG, _PRIMARY)
assert result["model"] == "anthropic/claude-sonnet-4"
assert "cron off-peak" in (result.get("label") or "")
def test_cron_turn_route_falls_back_to_primary_when_no_config():
result = resolve_cron_turn_route("check status", None, _PRIMARY)
assert result["model"] == "anthropic/claude-opus-4"
assert result["label"] is None # no smart routing match
def test_cron_turn_route_falls_back_on_runtime_error(monkeypatch):
"""If time-route runtime resolution fails, fall back to normal routing."""
monkeypatch.setattr(
"agent.smart_model_routing._get_current_hour_in_tz",
lambda tz: 2,
)
monkeypatch.setattr(
"hermes_cli.runtime_provider.resolve_runtime_provider",
lambda **kw: (_ for _ in ()).throw(RuntimeError("bad")),
)
result = resolve_cron_turn_route("check status", _CRON_ROUTING_CFG, _PRIMARY)
# Falls back to primary since the time-route runtime failed
assert result["model"] == "anthropic/claude-opus-4"

View File

@@ -0,0 +1,175 @@
"""
Tests for hardcoded ~/.hermes path detection (poka-yoke).
These tests verify that the pre-commit hook correctly detects hardcoded
paths and that the codebase uses get_hermes_home() correctly.
"""
import os
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
# Import the scanner
import sys
sys.path.insert(0, str(Path(__file__).parent.parent / ".githooks"))
from check_hardcoded_paths import scan_line_for_hardcoded_paths, Finding
class TestHardcodedPathDetection:
"""Test the hardcoded path detection logic."""
def test_detects_path_home_hermes(self):
"""Detect Path.home() / '.hermes' pattern."""
line = ' home = Path.home() / ".hermes"'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
assert "Path.home()" in findings[0].message
def test_detects_path_home_hermes_subpath(self):
"""Detect Path.home() / '.hermes' / 'subdir' pattern."""
line = ' config_dir = Path.home() / ".hermes" / "config"'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
def test_detects_tilde_hermes_in_string(self):
"""Detect '~/.hermes' in string literals."""
line = ' path = "~/.hermes/config.yaml"'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
def test_detects_expanduser_hermes(self):
"""Detect os.path.expanduser('~/.hermes') pattern."""
line = ' home = os.path.expanduser("~/.hermes")'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
def test_detects_join_expanduser(self):
"""Detect os.path.join(expanduser('~'), '.hermes') pattern."""
line = ' home = os.path.join(os.path.expanduser("~"), ".hermes")'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
def test_ignores_comments(self):
"""Ignore hardcoded paths in comments."""
line = ' # This is ~/.hermes in a comment'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 0
def test_ignores_docstrings(self):
"""Ignore hardcoded paths in docstrings."""
line = ' """This mentions ~/.hermes in a docstring."""'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 0
def test_ignores_hermes_constants(self):
"""hermes_constants.py is allowed to have hardcoded paths."""
line = ' return Path.home() / ".hermes"'
findings = list(scan_line_for_hardcoded_paths(line, "hermes_constants.py", 1))
assert len(findings) == 0
def test_ignores_test_files(self):
"""Test files can have hardcoded paths for testing."""
line = ' home = Path.home() / ".hermes"'
findings = list(scan_line_for_hardcoded_paths(line, "test_something.py", 1))
assert len(findings) == 0
def test_ignores_markdown_files(self):
"""Markdown files can have hardcoded paths in examples."""
line = ' home = Path.home() / ".hermes"'
findings = list(scan_line_for_hardcoded_paths(line, "README.md", 1))
assert len(findings) == 0
def test_ignores_empty_lines(self):
"""Empty lines should not produce findings."""
line = ""
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 0
class TestHermesHomeUsage:
"""Test that the codebase uses get_hermes_home() correctly."""
def test_hermes_constants_has_get_hermes_home(self):
"""hermes_constants.py should export get_hermes_home()."""
from hermes_constants import get_hermes_home
assert callable(get_hermes_home)
def test_hermes_constants_has_display_hermes_home(self):
"""hermes_constants.py should export display_hermes_home()."""
from hermes_constants import display_hermes_home
assert callable(display_hermes_home)
def test_get_hermes_home_returns_path(self):
"""get_hermes_home() should return a Path object."""
from hermes_constants import get_hermes_home
result = get_hermes_home()
assert isinstance(result, Path)
def test_get_hermes_home_honors_env_var(self):
"""get_hermes_home() should honor HERMES_HOME env var."""
from hermes_constants import get_hermes_home
with tempfile.TemporaryDirectory() as tmpdir:
with patch.dict(os.environ, {"HERMES_HOME": tmpdir}):
result = get_hermes_home()
assert result == Path(tmpdir)
def test_display_hermes_home_returns_string(self):
"""display_hermes_home() should return a string."""
from hermes_constants import display_hermes_home
result = display_hermes_home()
assert isinstance(result, str)
def test_display_hermes_home_uses_tilde_shorthand(self):
"""display_hermes_home() should use ~/ shorthand for home directory."""
from hermes_constants import display_hermes_home, get_hermes_home
# If HERMES_HOME is under home directory, should use ~/
home = get_hermes_home()
if home.is_relative_to(Path.home()):
result = display_hermes_home()
assert result.startswith("~/")
def test_profile_isolation_with_env_var(self):
"""Each profile should have its own HERMES_HOME."""
from hermes_constants import get_hermes_home
with tempfile.TemporaryDirectory() as tmpdir1, tempfile.TemporaryDirectory() as tmpdir2:
# Profile 1
with patch.dict(os.environ, {"HERMES_HOME": tmpdir1}):
home1 = get_hermes_home()
# Profile 2
with patch.dict(os.environ, {"HERMES_HOME": tmpdir2}):
home2 = get_hermes_home()
assert home1 != home2
assert home1 == Path(tmpdir1)
assert home2 == Path(tmpdir2)
class TestPreCommitHookIntegration:
"""Integration tests for the pre-commit hook."""
def test_hook_script_exists(self):
"""The check_hardcoded_paths.py script should exist."""
hook_path = Path(__file__).parent.parent / ".githooks" / "check_hardcoded_paths.py"
assert hook_path.exists()
def test_hook_script_is_executable(self):
"""The check_hardcoded_paths.py script should be executable."""
hook_path = Path(__file__).parent.parent / ".githooks" / "check_hardcoded_paths.py"
assert hook_path.stat().st_mode & 0o111 # Check executable bits
def test_pre_commit_calls_hardcoded_check(self):
"""pre-commit.py should call the hardcoded path check."""
pre_commit_path = Path(__file__).parent.parent / ".githooks" / "pre-commit.py"
content = pre_commit_path.read_text()
assert "check_hardcoded_paths.py" in content
if __name__ == "__main__":
pytest.main([__file__, "-v"])