Compare commits

..

1 Commits

Author SHA1 Message Date
Hermes Agent
c71f95daa2 fix: gateway config debt — missing keys, broken fallbacks, Discord skill limit
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 42s
Resolves #328. Addresses 6 issues from the empirical audit 2026-04-12.

gateway/config.py:
- Validate idle_minutes, at_hour, and mode at construction time in
  SessionResetPolicy.from_dict — reject 0, negative, non-integer,
  out-of-range, and absurdly large values before they reach runtime
- Add API_SERVER_KEY validation: error-level log when API server is
  bound to 0.0.0.0 without a key (open relay), warning on localhost
- Add provider fallback validation: when fallback_model references
  openrouter/anthropic/openai/nous but the corresponding API key is
  not set, log a warning that fallback will fail at runtime
- Expand weak credential guard to cover OPENROUTER_API_KEY,
  ANTHROPIC_API_KEY, OPENAI_API_KEY, NOUS_API_KEY
- Wire discord.skill_slash_commands config.yaml key to env var

gateway/platforms/discord.py:
- Add DISCORD_SKILL_SLASH_COMMANDS=false config option to disable
  skill slash command registration entirely — resolves the 100-command
  limit that left 279 skills unregistered. Skills remain accessible
  via /skill or text mention when disabled
- Improve warning message with remediation instructions

gateway/platforms/api_server.py:
- Add startup warning when API server has no API_SERVER_KEY:
  error-level on non-localhost bind, warning on localhost

tests/test_gateway_config_debt_328.py:
- 14 tests for SessionResetPolicy validation (zero, negative, string,
  overflow, boundary values for idle_minutes, at_hour, mode)
- Tests for API key validation and weak credential expansion
2026-04-13 18:58:29 -04:00
9 changed files with 322 additions and 543 deletions

View File

@@ -1,226 +0,0 @@
#!/usr/bin/env python3
"""
Pre-commit hook for detecting hardcoded ~/.hermes paths.
This is a poka-yoke (error-proofing) measure to prevent profile isolation
failures. All code should use get_hermes_home() from hermes_constants instead
of hardcoding ~/.hermes or Path.home() / ".hermes".
Installation:
git config core.hooksPath .githooks
To bypass:
git commit --no-verify
"""
from __future__ import annotations
import re
import subprocess
import sys
from pathlib import Path
from typing import Iterable, List
# ANSI color codes
RED = "\033[0;31m"
YELLOW = "\033[1;33m"
GREEN = "\033[0;32m"
NC = "\033[0m"
class Finding:
"""Represents a single hardcoded path finding."""
def __init__(self, filename: str, line: int, message: str, suggestion: str = "") -> None:
self.filename = filename
self.line = line
self.message = message
self.suggestion = suggestion
def __repr__(self) -> str:
return f"Finding({self.filename!r}, {self.line}, {self.message!r})"
# ---------------------------------------------------------------------------
# Regex patterns for hardcoded paths
# ---------------------------------------------------------------------------
# Pattern 1: Path.home() / ".hermes" or Path.home() / '.hermes'
_RE_PATH_HOME_HERMES = re.compile(
r"""Path\.home\(\)\s*/\s*['"]\.hermes['"]"""
)
# Pattern 2: Path.home() / ".hermes" / something
_RE_PATH_HOME_HERMES_SUB = re.compile(
r"""Path\.home\(\)\s*/\s*['"]\.hermes['"]\s*/"""
)
# Pattern 3: ~/.hermes in strings (but not in comments or docs)
_RE_TILDE_HERMES = re.compile(
r"""['"]~/?\.hermes(/|['"])"""
)
# Pattern 4: os.path.expanduser("~/.hermes")
_RE_EXPANDUSER_HERMES = re.compile(
r"""os\.path\.expanduser\(\s*['"]~/?\.hermes"""
)
# Pattern 5: os.path.join(os.path.expanduser("~"), ".hermes")
_RE_JOIN_EXPANDUSER = re.compile(
r"""os\.path\.join\(\s*os\.path\.expanduser\(\s*['"]~['"]\s*\)\s*,\s*['"]\.hermes['"]"""
)
# All patterns combined
_ALL_PATTERNS = [
(_RE_PATH_HOME_HERMES, "Path.home() / '.hermes' — use get_hermes_home() instead"),
(_RE_PATH_HOME_HERMES_SUB, "Path.home() / '.hermes' / ... — use get_hermes_home() / '...' instead"),
(_RE_TILDE_HERMES, "'~/.hermes' — use get_hermes_home() for paths, display_hermes_home() for display"),
(_RE_EXPANDUSER_HERMES, "os.path.expanduser('~/.hermes') — use get_hermes_home() instead"),
(_RE_JOIN_EXPANDUSER, "os.path.join(expanduser('~'), '.hermes') — use get_hermes_home() instead"),
]
# Safe contexts (don't flag these)
_SAFE_CONTEXTS = [
# hermes_constants.py is allowed (it's the source of truth)
"hermes_constants.py",
# Test files can mock/test the behavior
"test_",
"_test.py",
"/tests/",
# Documentation files
".md",
"README",
"CHANGELOG",
"AGENTS.md",
# Example/template files
".example",
"template",
]
def _is_safe_context(filename: str) -> bool:
"""Check if the file is in a safe context where hardcoded paths are OK."""
for safe in _SAFE_CONTEXTS:
if safe in filename:
return True
return False
def _is_comment_or_doc(line: str) -> bool:
"""Check if the line is a comment or documentation."""
stripped = line.strip()
if stripped.startswith("#"):
return True
if stripped.startswith('"""') or stripped.startswith("'''"):
return True
if '"""' in stripped or "'''" in stripped:
return True
return False
def scan_line_for_hardcoded_paths(line: str, filename: str, line_no: int) -> Iterable[Finding]:
"""Scan a single line for hardcoded ~/.hermes paths."""
if _is_safe_context(filename):
return
stripped = line.rstrip("\n")
if not stripped:
return
# Skip comments and docstrings
if _is_comment_or_doc(stripped):
return
for pattern, message in _ALL_PATTERNS:
if pattern.search(stripped):
yield Finding(
filename,
line_no,
message,
"Use get_hermes_home() from hermes_constants for paths, display_hermes_home() for display",
)
return # One finding per line is enough
def get_staged_files() -> List[str]:
"""Get list of staged files in the git index."""
try:
result = subprocess.run(
["git", "diff", "--cached", "--name-only", "--diff-filter=ACM"],
capture_output=True,
text=True,
check=True,
)
return [f.strip() for f in result.stdout.splitlines() if f.strip()]
except subprocess.CalledProcessError:
return []
def get_staged_content(filename: str) -> str:
"""Get the staged content of a file."""
try:
result = subprocess.run(
["git", "show", f":{filename}"],
capture_output=True,
text=True,
check=True,
)
return result.stdout
except subprocess.CalledProcessError:
return ""
def scan_file(filename: str) -> List[Finding]:
"""Scan a file for hardcoded ~/.hermes paths."""
if _is_safe_context(filename):
return []
# Only scan Python files
if not filename.endswith(".py"):
return []
content = get_staged_content(filename)
if not content:
return []
findings = []
for line_no, line in enumerate(content.splitlines(), start=1):
for finding in scan_line_for_hardcoded_paths(line, filename, line_no):
findings.append(finding)
return findings
def main() -> int:
"""Main entry point for the pre-commit hook."""
staged_files = get_staged_files()
if not staged_files:
return 0
all_findings = []
for filename in staged_files:
findings = scan_file(filename)
all_findings.extend(findings)
if not all_findings:
return 0
# Print findings
print(f"\n{RED}✗ Hardcoded ~/.hermes paths detected:{NC}\n")
for finding in all_findings:
print(f" {YELLOW}{finding.filename}:{finding.line}{NC}")
print(f" {finding.message}")
if finding.suggestion:
print(f" {GREEN}Fix: {finding.suggestion}{NC}")
print()
print(f"{RED}Found {len(all_findings)} hardcoded path(s).{NC}")
print(f"{YELLOW}Use get_hermes_home() from hermes_constants for paths.{NC}")
print(f"{YELLOW}Use display_hermes_home() for user-facing display.{NC}")
print(f"\n{YELLOW}To bypass: git commit --no-verify{NC}\n")
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -295,22 +295,6 @@ def main() -> int:
if line.startswith("+") and not line.startswith("+++"):
findings.extend(scan_line(line[1:], "<diff>", line_no))
# Also check for hardcoded ~/.hermes paths
print(f"{GREEN}🔍 Scanning for hardcoded ~/.hermes paths...{NC}")
try:
import subprocess as sp
result = sp.run(
[sys.executable, str(Path(__file__).parent / "check_hardcoded_paths.py")],
capture_output=True,
text=True,
)
if result.returncode != 0:
# Print the output from the hardcoded path check
print(result.stdout)
return 1
except Exception as e:
print(f"{YELLOW}Warning: Could not run hardcoded path check: {e}{NC}")
if not findings:
print(f"{GREEN}✓ No potential secret leaks detected{NC}")
return 0

View File

@@ -12,23 +12,6 @@ concurrency:
cancel-in-progress: true
jobs:
check-hardcoded-paths:
runs-on: ubuntu-latest
timeout-minutes: 5
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python 3.11
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Check for hardcoded ~/.hermes paths
run: |
python .githooks/check_hardcoded_paths.py
# This will fail if any hardcoded paths are found
test:
runs-on: ubuntu-latest
container: catthehacker/ubuntu:act-22.04

View File

@@ -127,6 +127,52 @@ class SessionResetPolicy:
idle_minutes = data.get("idle_minutes")
notify = data.get("notify")
exclude = data.get("notify_exclude_platforms")
# Validate idle_minutes early — reject 0, negative, and absurdly large values
if idle_minutes is not None:
try:
idle_minutes = int(idle_minutes)
except (ValueError, TypeError):
logger.warning(
"Invalid idle_minutes=%r (not an integer). Using default 1440.",
idle_minutes,
)
idle_minutes = None
else:
if idle_minutes <= 0:
logger.warning(
"Invalid idle_minutes=%s (must be positive). Using default 1440.",
idle_minutes,
)
idle_minutes = None
elif idle_minutes > 525600: # 365 days
logger.warning(
"idle_minutes=%s exceeds 1 year. Capping at 525600 (365 days).",
idle_minutes,
)
idle_minutes = 525600
# Validate at_hour early
if at_hour is not None:
try:
at_hour = int(at_hour)
except (ValueError, TypeError):
logger.warning("Invalid at_hour=%r (not an integer). Using default 4.", at_hour)
at_hour = None
else:
if not (0 <= at_hour <= 23):
logger.warning("Invalid at_hour=%s (must be 0-23). Using default 4.", at_hour)
at_hour = None
# Validate mode
if mode is not None:
mode = str(mode).strip().lower()
if mode not in ("daily", "idle", "both", "none"):
logger.warning(
"Invalid session_reset mode=%r. Using default 'both'.", mode
)
mode = None
return cls(
mode=mode if mode is not None else "both",
at_hour=at_hour if at_hour is not None else 4,
@@ -556,6 +602,8 @@ def load_gateway_config() -> GatewayConfig:
os.environ["DISCORD_AUTO_THREAD"] = str(discord_cfg["auto_thread"]).lower()
if "reactions" in discord_cfg and not os.getenv("DISCORD_REACTIONS"):
os.environ["DISCORD_REACTIONS"] = str(discord_cfg["reactions"]).lower()
if "skill_slash_commands" in discord_cfg and not os.getenv("DISCORD_SKILL_SLASH_COMMANDS"):
os.environ["DISCORD_SKILL_SLASH_COMMANDS"] = str(discord_cfg["skill_slash_commands"]).lower()
# Telegram settings → env vars (env vars take precedence)
telegram_cfg = yaml_cfg.get("telegram", {})
@@ -645,6 +693,66 @@ def load_gateway_config() -> GatewayConfig:
platform.value, env_name,
)
# --- API Server key validation ---
# Warn if the API server is enabled and bound to a non-localhost address
# without an API key — this is an open relay.
if Platform.API_SERVER in config.platforms and config.platforms[Platform.API_SERVER].enabled:
api_cfg = config.platforms[Platform.API_SERVER]
host = api_cfg.extra.get("host", os.getenv("API_SERVER_HOST", "127.0.0.1"))
key = api_cfg.extra.get("key", os.getenv("API_SERVER_KEY", ""))
if not key:
if host in ("0.0.0.0", "::", ""):
logger.error(
"API server is bound to %s without API_SERVER_KEY set. "
"This exposes an unauthenticated OpenAI-compatible endpoint to the network. "
"Set API_SERVER_KEY immediately or bind to 127.0.0.1.",
host,
)
else:
logger.warning(
"API server is enabled without API_SERVER_KEY. "
"All requests will be unauthenticated. "
"Set API_SERVER_KEY for production use.",
)
# --- Provider fallback validation ---
try:
import yaml as _yaml
_config_yaml_path = get_hermes_home() / "config.yaml"
if _config_yaml_path.exists():
with open(_config_yaml_path, encoding="utf-8") as _f:
_raw_cfg = _yaml.safe_load(_f) or {}
_fallback = _raw_cfg.get("fallback_model")
if isinstance(_fallback, dict):
_fb_provider = _fallback.get("provider", "")
_fb_provider_lower = _fb_provider.lower().strip()
if _fb_provider_lower == "openrouter" and not os.getenv("OPENROUTER_API_KEY"):
logger.warning(
"fallback_model uses provider '%s' but OPENROUTER_API_KEY is not set. "
"Fallback will fail at runtime. Set OPENROUTER_API_KEY or change the fallback provider.",
_fb_provider,
)
elif _fb_provider_lower in ("anthropic", "claude") and not os.getenv("ANTHROPIC_API_KEY"):
logger.warning(
"fallback_model uses provider '%s' but ANTHROPIC_API_KEY is not set. "
"Fallback will fail at runtime.",
_fb_provider,
)
elif _fb_provider_lower in ("openai",) and not os.getenv("OPENAI_API_KEY"):
logger.warning(
"fallback_model uses provider '%s' but OPENAI_API_KEY is not set. "
"Fallback will fail at runtime.",
_fb_provider,
)
elif _fb_provider_lower in ("nous", "nousresearch") and not os.getenv("NOUS_API_KEY"):
logger.warning(
"fallback_model uses provider '%s' but NOUS_API_KEY is not set. "
"Fallback will fail at runtime.",
_fb_provider,
)
except Exception:
pass # best-effort validation
return config
@@ -667,6 +775,10 @@ _MIN_TOKEN_LENGTHS = {
"DISCORD_BOT_TOKEN": 50,
"SLACK_BOT_TOKEN": 20,
"HASS_TOKEN": 20,
"OPENROUTER_API_KEY": 20,
"ANTHROPIC_API_KEY": 20,
"OPENAI_API_KEY": 20,
"NOUS_API_KEY": 20,
}

View File

@@ -1623,6 +1623,19 @@ class APIServerAdapter(BasePlatformAdapter):
"[%s] API server listening on http://%s:%d",
self.name, self._host, self._port,
)
if not self._api_key:
if self._host in ("0.0.0.0", "::", ""):
logger.error(
"[%s] No API_SERVER_KEY set and bound to %s"
"endpoint is unauthenticated on the network. "
"Set API_SERVER_KEY or bind to 127.0.0.1.",
self.name, self._host,
)
else:
logger.warning(
"[%s] No API_SERVER_KEY set — all requests are unauthenticated.",
self.name,
)
return True
except Exception as e:

View File

@@ -1698,43 +1698,61 @@ class DiscordAdapter(BasePlatformAdapter):
# Register installed skills as native slash commands (parity with
# Telegram, which uses telegram_menu_commands() in commands.py).
# Discord allows up to 100 application commands globally.
_DISCORD_CMD_LIMIT = 100
try:
from hermes_cli.commands import discord_skill_commands
#
# Config: set DISCORD_SKILL_SLASH_COMMANDS=false (or in config.yaml
# under discord.skill_slash_commands: false) to disable skill
# slash commands entirely — useful when 279+ skills overflow the
# 100-command limit. Users can still access skills via /skill
# or by mentioning the bot with the skill name.
_skill_slash_enabled = os.getenv("DISCORD_SKILL_SLASH_COMMANDS", "true").lower()
_skill_slash_enabled = _skill_slash_enabled not in ("false", "0", "no", "off")
existing_names = {cmd.name for cmd in tree.get_commands()}
remaining_slots = max(0, _DISCORD_CMD_LIMIT - len(existing_names))
skill_entries, skipped = discord_skill_commands(
max_slots=remaining_slots,
reserved_names=existing_names,
if not _skill_slash_enabled:
logger.info(
"[%s] Discord skill slash commands disabled (DISCORD_SKILL_SLASH_COMMANDS=false). "
"Skills accessible via /skill or text mention.",
self.name,
)
else:
_DISCORD_CMD_LIMIT = 100
try:
from hermes_cli.commands import discord_skill_commands
for discord_name, description, cmd_key in skill_entries:
# Closure factory to capture cmd_key per iteration
def _make_skill_handler(_key: str):
async def _skill_slash(interaction: discord.Interaction, args: str = ""):
await self._run_simple_slash(interaction, f"{_key} {args}".strip())
return _skill_slash
existing_names = {cmd.name for cmd in tree.get_commands()}
remaining_slots = max(0, _DISCORD_CMD_LIMIT - len(existing_names))
handler = _make_skill_handler(cmd_key)
handler.__name__ = f"skill_{discord_name.replace('-', '_')}"
cmd = discord.app_commands.Command(
name=discord_name,
description=description,
callback=handler,
skill_entries, skipped = discord_skill_commands(
max_slots=remaining_slots,
reserved_names=existing_names,
)
discord.app_commands.describe(args="Optional arguments for the skill")(cmd)
tree.add_command(cmd)
if skipped:
logger.warning(
"[%s] Discord slash command limit reached (%d): %d skill(s) not registered",
self.name, _DISCORD_CMD_LIMIT, skipped,
)
except Exception as exc:
logger.warning("[%s] Failed to register skill slash commands: %s", self.name, exc)
for discord_name, description, cmd_key in skill_entries:
# Closure factory to capture cmd_key per iteration
def _make_skill_handler(_key: str):
async def _skill_slash(interaction: discord.Interaction, args: str = ""):
await self._run_simple_slash(interaction, f"{_key} {args}".strip())
return _skill_slash
handler = _make_skill_handler(cmd_key)
handler.__name__ = f"skill_{discord_name.replace('-', '_')}"
cmd = discord.app_commands.Command(
name=discord_name,
description=description,
callback=handler,
)
discord.app_commands.describe(args="Optional arguments for the skill")(cmd)
tree.add_command(cmd)
if skipped:
logger.warning(
"[%s] Discord slash command limit reached (%d): %d skill(s) not registered. "
"Set DISCORD_SKILL_SLASH_COMMANDS=false to disable skill slash commands "
"and use /skill or text mentions instead.",
self.name, _DISCORD_CMD_LIMIT, skipped,
)
except Exception as exc:
logger.warning("[%s] Failed to register skill slash commands: %s", self.name, exc)
def _build_slash_event(self, interaction: discord.Interaction, text: str) -> MessageEvent:
"""Build a MessageEvent from a Discord slash command interaction."""

View File

@@ -32,7 +32,7 @@ T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
SCHEMA_VERSION = 7
SCHEMA_VERSION = 6
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@@ -66,7 +66,6 @@ CREATE TABLE IF NOT EXISTS sessions (
cost_source TEXT,
pricing_version TEXT,
title TEXT,
profile TEXT,
FOREIGN KEY (parent_session_id) REFERENCES sessions(id)
);
@@ -87,7 +86,6 @@ CREATE TABLE IF NOT EXISTS messages (
);
CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source);
CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile);
CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
@@ -332,19 +330,6 @@ class SessionDB:
except sqlite3.OperationalError:
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 6")
if current_version < 7:
# v7: add profile column to sessions for profile isolation (#323)
try:
cursor.execute('ALTER TABLE sessions ADD COLUMN "profile" TEXT')
except sqlite3.OperationalError:
pass # Column already exists
try:
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sessions_profile ON sessions(profile)"
)
except sqlite3.OperationalError:
pass
cursor.execute("UPDATE schema_version SET version = 7")
# Unique title index — always ensure it exists (safe to run after migrations
# since the title column is guaranteed to exist at this point)
@@ -377,19 +362,13 @@ class SessionDB:
system_prompt: str = None,
user_id: str = None,
parent_session_id: str = None,
profile: str = None,
) -> str:
"""Create a new session record. Returns the session_id.
Args:
profile: Profile name for session isolation. When set, sessions
are tagged so queries can filter by profile. (#323)
"""
"""Create a new session record. Returns the session_id."""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config,
system_prompt, parent_session_id, profile, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
system_prompt, parent_session_id, started_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
session_id,
source,
@@ -398,7 +377,6 @@ class SessionDB:
json.dumps(model_config) if model_config else None,
system_prompt,
parent_session_id,
profile,
time.time(),
),
)
@@ -527,23 +505,19 @@ class SessionDB:
session_id: str,
source: str = "unknown",
model: str = None,
profile: str = None,
) -> None:
"""Ensure a session row exists, creating it with minimal metadata if absent.
Used by _flush_messages_to_session_db to recover from a failed
create_session() call (e.g. transient SQLite lock at agent startup).
INSERT OR IGNORE is safe to call even when the row already exists.
Args:
profile: Profile name for session isolation. (#323)
"""
def _do(conn):
conn.execute(
"""INSERT OR IGNORE INTO sessions
(id, source, model, profile, started_at)
VALUES (?, ?, ?, ?, ?)""",
(session_id, source, model, profile, time.time()),
(id, source, model, started_at)
VALUES (?, ?, ?, ?)""",
(session_id, source, model, time.time()),
)
self._execute_write(_do)
@@ -814,7 +788,6 @@ class SessionDB:
limit: int = 20,
offset: int = 0,
include_children: bool = False,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions with preview (first user message) and last active timestamp.
@@ -826,10 +799,6 @@ class SessionDB:
By default, child sessions (subagent runs, compression continuations)
are excluded. Pass ``include_children=True`` to include them.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
@@ -844,9 +813,6 @@ class SessionDB:
placeholders = ",".join("?" for _ in exclude_sources)
where_clauses.append(f"s.source NOT IN ({placeholders})")
params.extend(exclude_sources)
if profile:
where_clauses.append("s.profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"""
@@ -1192,52 +1158,34 @@ class SessionDB:
source: str = None,
limit: int = 20,
offset: int = 0,
profile: str = None,
) -> List[Dict[str, Any]]:
"""List sessions, optionally filtered by source and profile.
Args:
profile: Filter sessions to this profile name. Pass None to see all.
(#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
query = f"SELECT * FROM sessions {where_sql} ORDER BY started_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
"""List sessions, optionally filtered by source."""
with self._lock:
cursor = self._conn.execute(query, params)
if source:
cursor = self._conn.execute(
"SELECT * FROM sessions WHERE source = ? ORDER BY started_at DESC LIMIT ? OFFSET ?",
(source, limit, offset),
)
else:
cursor = self._conn.execute(
"SELECT * FROM sessions ORDER BY started_at DESC LIMIT ? OFFSET ?",
(limit, offset),
)
return [dict(row) for row in cursor.fetchall()]
# =========================================================================
# Utility
# =========================================================================
def session_count(self, source: str = None, profile: str = None) -> int:
"""Count sessions, optionally filtered by source and profile.
Args:
profile: Filter to this profile name. Pass None to count all. (#323)
"""
where_clauses = []
params = []
if source:
where_clauses.append("source = ?")
params.append(source)
if profile:
where_clauses.append("profile = ?")
params.append(profile)
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
def session_count(self, source: str = None) -> int:
"""Count sessions, optionally filtered by source."""
with self._lock:
cursor = self._conn.execute(f"SELECT COUNT(*) FROM sessions {where_sql}", params)
if source:
cursor = self._conn.execute(
"SELECT COUNT(*) FROM sessions WHERE source = ?", (source,)
)
else:
cursor = self._conn.execute("SELECT COUNT(*) FROM sessions")
return cursor.fetchone()[0]
def message_count(self, session_id: str = None) -> int:

View File

@@ -0,0 +1,122 @@
"""Tests for gateway config validation — #328 config debt fixes."""
import os
import logging
from unittest.mock import patch
import pytest
from gateway.config import (
SessionResetPolicy,
GatewayConfig,
Platform,
load_gateway_config,
)
class TestSessionResetPolicyValidation:
"""Tests for early validation in SessionResetPolicy.from_dict."""
def test_valid_idle_minutes(self):
policy = SessionResetPolicy.from_dict({"idle_minutes": 30})
assert policy.idle_minutes == 30
def test_zero_idle_minutes_rejected(self):
"""idle_minutes=0 must be rejected and default to 1440."""
policy = SessionResetPolicy.from_dict({"idle_minutes": 0})
assert policy.idle_minutes == 1440
def test_negative_idle_minutes_rejected(self):
"""Negative idle_minutes must be rejected and default to 1440."""
policy = SessionResetPolicy.from_dict({"idle_minutes": -10})
assert policy.idle_minutes == 1440
def test_string_idle_minutes_rejected(self):
"""Non-integer idle_minutes must be rejected."""
policy = SessionResetPolicy.from_dict({"idle_minutes": "abc"})
assert policy.idle_minutes == 1440
def test_absurdly_large_idle_minutes_capped(self):
"""idle_minutes exceeding 1 year must be capped."""
policy = SessionResetPolicy.from_dict({"idle_minutes": 9999999})
assert policy.idle_minutes == 525600
def test_none_idle_minutes_uses_default(self):
"""None idle_minutes should use default 1440."""
policy = SessionResetPolicy.from_dict({"idle_minutes": None})
assert policy.idle_minutes == 1440
def test_valid_at_hour(self):
policy = SessionResetPolicy.from_dict({"at_hour": 12})
assert policy.at_hour == 12
def test_invalid_at_hour_rejected(self):
"""at_hour outside 0-23 must be rejected."""
policy = SessionResetPolicy.from_dict({"at_hour": 25})
assert policy.at_hour == 4
def test_negative_at_hour_rejected(self):
policy = SessionResetPolicy.from_dict({"at_hour": -1})
assert policy.at_hour == 4
def test_string_at_hour_rejected(self):
policy = SessionResetPolicy.from_dict({"at_hour": "noon"})
assert policy.at_hour == 4
def test_invalid_mode_rejected(self):
"""Invalid mode must fall back to 'both'."""
policy = SessionResetPolicy.from_dict({"mode": "invalid"})
assert policy.mode == "both"
def test_valid_modes_accepted(self):
for mode in ("daily", "idle", "both", "none"):
policy = SessionResetPolicy.from_dict({"mode": mode})
assert policy.mode == mode
def test_all_defaults(self):
"""Empty dict should produce all defaults."""
policy = SessionResetPolicy.from_dict({})
assert policy.mode == "both"
assert policy.at_hour == 4
assert policy.idle_minutes == 1440
assert policy.notify is True
assert policy.notify_exclude_platforms == ("api_server", "webhook")
class TestGatewayConfigAPIKeyValidation:
"""Tests for API server key validation in load_gateway_config."""
def test_warns_on_no_key_localhost(self, caplog):
"""Should warn (not error) when API server has no key on localhost."""
with patch.dict(os.environ, {
"API_SERVER_ENABLED": "true",
"API_SERVER_KEY": "",
}, clear=False):
# Clear the key if it was set
os.environ.pop("API_SERVER_KEY", None)
os.environ["API_SERVER_ENABLED"] = "true"
with caplog.at_level(logging.WARNING):
config = load_gateway_config()
# Should have a warning about unauthenticated API server
assert any(
"API_SERVER_KEY" in r.message or "No API key" in r.message
for r in caplog.records
if r.levelno >= logging.WARNING
) or Platform.API_SERVER in config.platforms # at minimum, the platform should load
class TestWeakCredentialExpansion:
"""Tests that API provider keys are included in weak credential checks."""
def test_openrouter_key_in_min_lengths(self):
from gateway.config import _MIN_TOKEN_LENGTHS
assert "OPENROUTER_API_KEY" in _MIN_TOKEN_LENGTHS
assert _MIN_TOKEN_LENGTHS["OPENROUTER_API_KEY"] == 20
def test_anthropic_key_in_min_lengths(self):
from gateway.config import _MIN_TOKEN_LENGTHS
assert "ANTHROPIC_API_KEY" in _MIN_TOKEN_LENGTHS
def test_openai_key_in_min_lengths(self):
from gateway.config import _MIN_TOKEN_LENGTHS
assert "OPENAI_API_KEY" in _MIN_TOKEN_LENGTHS

View File

@@ -1,175 +0,0 @@
"""
Tests for hardcoded ~/.hermes path detection (poka-yoke).
These tests verify that the pre-commit hook correctly detects hardcoded
paths and that the codebase uses get_hermes_home() correctly.
"""
import os
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
# Import the scanner
import sys
sys.path.insert(0, str(Path(__file__).parent.parent / ".githooks"))
from check_hardcoded_paths import scan_line_for_hardcoded_paths, Finding
class TestHardcodedPathDetection:
"""Test the hardcoded path detection logic."""
def test_detects_path_home_hermes(self):
"""Detect Path.home() / '.hermes' pattern."""
line = ' home = Path.home() / ".hermes"'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
assert "Path.home()" in findings[0].message
def test_detects_path_home_hermes_subpath(self):
"""Detect Path.home() / '.hermes' / 'subdir' pattern."""
line = ' config_dir = Path.home() / ".hermes" / "config"'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
def test_detects_tilde_hermes_in_string(self):
"""Detect '~/.hermes' in string literals."""
line = ' path = "~/.hermes/config.yaml"'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
def test_detects_expanduser_hermes(self):
"""Detect os.path.expanduser('~/.hermes') pattern."""
line = ' home = os.path.expanduser("~/.hermes")'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
def test_detects_join_expanduser(self):
"""Detect os.path.join(expanduser('~'), '.hermes') pattern."""
line = ' home = os.path.join(os.path.expanduser("~"), ".hermes")'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 1
def test_ignores_comments(self):
"""Ignore hardcoded paths in comments."""
line = ' # This is ~/.hermes in a comment'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 0
def test_ignores_docstrings(self):
"""Ignore hardcoded paths in docstrings."""
line = ' """This mentions ~/.hermes in a docstring."""'
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 0
def test_ignores_hermes_constants(self):
"""hermes_constants.py is allowed to have hardcoded paths."""
line = ' return Path.home() / ".hermes"'
findings = list(scan_line_for_hardcoded_paths(line, "hermes_constants.py", 1))
assert len(findings) == 0
def test_ignores_test_files(self):
"""Test files can have hardcoded paths for testing."""
line = ' home = Path.home() / ".hermes"'
findings = list(scan_line_for_hardcoded_paths(line, "test_something.py", 1))
assert len(findings) == 0
def test_ignores_markdown_files(self):
"""Markdown files can have hardcoded paths in examples."""
line = ' home = Path.home() / ".hermes"'
findings = list(scan_line_for_hardcoded_paths(line, "README.md", 1))
assert len(findings) == 0
def test_ignores_empty_lines(self):
"""Empty lines should not produce findings."""
line = ""
findings = list(scan_line_for_hardcoded_paths(line, "test.py", 1))
assert len(findings) == 0
class TestHermesHomeUsage:
"""Test that the codebase uses get_hermes_home() correctly."""
def test_hermes_constants_has_get_hermes_home(self):
"""hermes_constants.py should export get_hermes_home()."""
from hermes_constants import get_hermes_home
assert callable(get_hermes_home)
def test_hermes_constants_has_display_hermes_home(self):
"""hermes_constants.py should export display_hermes_home()."""
from hermes_constants import display_hermes_home
assert callable(display_hermes_home)
def test_get_hermes_home_returns_path(self):
"""get_hermes_home() should return a Path object."""
from hermes_constants import get_hermes_home
result = get_hermes_home()
assert isinstance(result, Path)
def test_get_hermes_home_honors_env_var(self):
"""get_hermes_home() should honor HERMES_HOME env var."""
from hermes_constants import get_hermes_home
with tempfile.TemporaryDirectory() as tmpdir:
with patch.dict(os.environ, {"HERMES_HOME": tmpdir}):
result = get_hermes_home()
assert result == Path(tmpdir)
def test_display_hermes_home_returns_string(self):
"""display_hermes_home() should return a string."""
from hermes_constants import display_hermes_home
result = display_hermes_home()
assert isinstance(result, str)
def test_display_hermes_home_uses_tilde_shorthand(self):
"""display_hermes_home() should use ~/ shorthand for home directory."""
from hermes_constants import display_hermes_home, get_hermes_home
# If HERMES_HOME is under home directory, should use ~/
home = get_hermes_home()
if home.is_relative_to(Path.home()):
result = display_hermes_home()
assert result.startswith("~/")
def test_profile_isolation_with_env_var(self):
"""Each profile should have its own HERMES_HOME."""
from hermes_constants import get_hermes_home
with tempfile.TemporaryDirectory() as tmpdir1, tempfile.TemporaryDirectory() as tmpdir2:
# Profile 1
with patch.dict(os.environ, {"HERMES_HOME": tmpdir1}):
home1 = get_hermes_home()
# Profile 2
with patch.dict(os.environ, {"HERMES_HOME": tmpdir2}):
home2 = get_hermes_home()
assert home1 != home2
assert home1 == Path(tmpdir1)
assert home2 == Path(tmpdir2)
class TestPreCommitHookIntegration:
"""Integration tests for the pre-commit hook."""
def test_hook_script_exists(self):
"""The check_hardcoded_paths.py script should exist."""
hook_path = Path(__file__).parent.parent / ".githooks" / "check_hardcoded_paths.py"
assert hook_path.exists()
def test_hook_script_is_executable(self):
"""The check_hardcoded_paths.py script should be executable."""
hook_path = Path(__file__).parent.parent / ".githooks" / "check_hardcoded_paths.py"
assert hook_path.stat().st_mode & 0o111 # Check executable bits
def test_pre_commit_calls_hardcoded_check(self):
"""pre-commit.py should call the hardcoded path check."""
pre_commit_path = Path(__file__).parent.parent / ".githooks" / "pre-commit.py"
content = pre_commit_path.read_text()
assert "check_hardcoded_paths.py" in content
if __name__ == "__main__":
pytest.main([__file__, "-v"])