Compare commits

..

2 Commits

Author SHA1 Message Date
bdd0f2709b feat: provider preflight validation before session start (#924)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 47s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 52s
Tests / test (pull_request) Failing after 30m48s
Tests / e2e (pull_request) Successful in 2m9s
2026-04-21 04:48:57 +00:00
c6f2855745 fix: restore _format_error helper for test compatibility (#916)
Some checks failed
Docker Build and Publish / build-and-push (push) Has been skipped
Nix / nix (ubuntu-latest) (push) Failing after 2s
Tests / e2e (push) Successful in 2m47s
Tests / test (push) Failing after 27m41s
Build Skills Index / build-index (push) Has been skipped
Build Skills Index / deploy-with-index (push) Has been skipped
Nix / nix (macos-latest) (push) Has been cancelled
fix: restore _format_error helper for test compatibility (#916)
2026-04-20 23:56:27 +00:00
4 changed files with 174 additions and 232 deletions

146
agent/provider_preflight.py Normal file
View File

@@ -0,0 +1,146 @@
"""Provider Preflight — Poka-yoke validation of provider/model config.
Validates provider and model configuration before session start.
Prevents wasted context on misconfigured providers.
Usage:
from agent.provider_preflight import preflight_check
result = preflight_check(provider="openrouter", model="xiaomi/mimo-v2-pro")
if not result["valid"]:
print(result["error"])
"""
from __future__ import annotations
import logging
import os
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
# Provider -> required env var
PROVIDER_KEYS = {
"openrouter": "OPENROUTER_API_KEY",
"anthropic": "ANTHROPIC_API_KEY",
"openai": "OPENAI_API_KEY",
"nous": "NOUS_API_KEY",
"ollama": None, # Local, no key needed
"local": None,
}
def check_provider_key(provider: str) -> Dict[str, Any]:
"""Check if provider has a valid API key configured."""
provider_lower = provider.lower().strip()
env_var = None
for known, key in PROVIDER_KEYS.items():
if known in provider_lower:
env_var = key
break
if env_var is None:
# Unknown provider — assume OK (custom/local)
return {"valid": True, "provider": provider, "key_status": "unknown"}
if env_var is None:
# Local provider, no key needed
return {"valid": True, "provider": provider, "key_status": "not_required"}
key_value = os.getenv(env_var, "").strip()
if not key_value:
return {
"valid": False,
"provider": provider,
"key_status": "missing",
"error": f"{env_var} is not set. Provider '{provider}' will fail.",
"fix": f"Set {env_var} in ~/.hermes/.env",
}
if len(key_value) < 10:
return {
"valid": False,
"provider": provider,
"key_status": "too_short",
"error": f"{env_var} is suspiciously short ({len(key_value)} chars). May be invalid.",
"fix": f"Verify {env_var} value in ~/.hermes/.env",
}
return {"valid": True, "provider": provider, "key_status": "set"}
def check_model_availability(model: str, provider: str) -> Dict[str, Any]:
"""Check if model is likely available for provider."""
if not model:
return {"valid": False, "error": "No model specified"}
# Basic sanity checks
model_lower = model.lower()
# Anthropic models should use anthropic provider
if "claude" in model_lower and "anthropic" not in provider.lower():
return {
"valid": True, # Allow but warn
"warning": f"Model '{model}' usually runs on Anthropic provider, not '{provider}'",
}
# Ollama models
ollama_indicators = ["llama", "mistral", "qwen", "gemma", "phi", "hermes"]
if any(x in model_lower for x in ollama_indicators) and ":" not in model:
return {
"valid": True,
"warning": f"Model '{model}' may need a version tag for Ollama (e.g., {model}:latest)",
}
return {"valid": True}
def preflight_check(
provider: str = "",
model: str = "",
fallback_provider: str = "",
fallback_model: str = "",
) -> Dict[str, Any]:
"""Full pre-flight check for provider/model configuration.
Returns:
Dict with valid (bool), errors (list), warnings (list).
"""
errors = []
warnings = []
# Check primary provider
if provider:
result = check_provider_key(provider)
if not result["valid"]:
errors.append(result.get("error", f"Provider {provider} invalid"))
# Check primary model
if model:
result = check_model_availability(model, provider)
if not result["valid"]:
errors.append(result.get("error", f"Model {model} invalid"))
elif result.get("warning"):
warnings.append(result["warning"])
# Check fallback
if fallback_provider:
result = check_provider_key(fallback_provider)
if not result["valid"]:
warnings.append(f"Fallback provider {fallback_provider} also invalid: {result.get('error','')}")
if fallback_model:
result = check_model_availability(fallback_model, fallback_provider)
if not result["valid"]:
warnings.append(f"Fallback model {fallback_model} invalid")
elif result.get("warning"):
warnings.append(result["warning"])
return {
"valid": len(errors) == 0,
"errors": errors,
"warnings": warnings,
"provider": provider,
"model": model,
}

View File

@@ -1,156 +0,0 @@
"""Tool fixation detection — break repetitive tool calling loops.
Detects when the agent latches onto one tool and calls it repeatedly
without making progress. Injects a nudge prompt to break the loop.
Usage:
from agent.tool_fixation_detector import ToolFixationDetector
detector = ToolFixationDetector()
nudge = detector.record("execute_code")
if nudge:
# Inject nudge into conversation
messages.append({"role": "system", "content": nudge})
"""
from __future__ import annotations
import os
from dataclasses import dataclass, field
from typing import Dict, List, Optional
# Default thresholds
_DEFAULT_THRESHOLD = int(os.getenv("TOOL_FIXATION_THRESHOLD", "5"))
_DEFAULT_WINDOW = int(os.getenv("TOOL_FIXATION_WINDOW", "10"))
@dataclass
class FixationEvent:
"""Record of a fixation detection."""
tool_name: str
streak_length: int
threshold: int
nudge_sent: bool = False
class ToolFixationDetector:
"""Detects and breaks tool fixation loops.
Tracks the sequence of tool calls and detects when the same tool
is called N times consecutively. When detected, returns a nudge
prompt to inject into the conversation.
"""
def __init__(self, threshold: int = 0, window: int = 0):
self.threshold = threshold or _DEFAULT_THRESHOLD
self.window = window or _DEFAULT_WINDOW
self._history: List[str] = []
self._current_streak: str = ""
self._streak_count: int = 0
self._nudges_sent: int = 0
self._events: List[FixationEvent] = []
@property
def nudges_sent(self) -> int:
return self._nudges_sent
@property
def events(self) -> List[FixationEvent]:
return list(self._events)
def record(self, tool_name: str) -> Optional[str]:
"""Record a tool call and return nudge prompt if fixation detected.
Args:
tool_name: Name of the tool that was called.
Returns:
Nudge prompt string if fixation detected, None otherwise.
"""
self._history.append(tool_name)
# Trim history to window
if len(self._history) > self.window:
self._history = self._history[-self.window:]
# Update streak
if tool_name == self._current_streak:
self._streak_count += 1
else:
self._current_streak = tool_name
self._streak_count = 1
# Check for fixation
if self._streak_count >= self.threshold:
event = FixationEvent(
tool_name=tool_name,
streak_length=self._streak_count,
threshold=self.threshold,
nudge_sent=True,
)
self._events.append(event)
self._nudges_sent += 1
return self._build_nudge(tool_name, self._streak_count)
return None
def _build_nudge(self, tool_name: str, count: int) -> str:
"""Build a nudge prompt to break the fixation loop."""
return (
f"[SYSTEM: You have called `{tool_name}` {count} times in a row "
f"without switching tools. This suggests a fixation loop. "
f"Consider:\n"
f"1. Is the tool returning an error? Read the error carefully.\n"
f"2. Is there a different tool that could help?\n"
f"3. Should you ask the user for clarification?\n"
f"4. Is the task actually complete?\n"
f"Break the loop by trying a different approach.]"
)
def reset(self) -> None:
"""Reset the detector state."""
self._history.clear()
self._current_streak = ""
self._streak_count = 0
def get_streak_info(self) -> dict:
"""Get current streak information."""
return {
"current_tool": self._current_streak,
"streak_count": self._streak_count,
"threshold": self.threshold,
"at_threshold": self._streak_count >= self.threshold,
"nudges_sent": self._nudges_sent,
}
def format_report(self) -> str:
"""Format fixation events as a report."""
if not self._events:
return "No tool fixation detected."
lines = [
f"Tool Fixation Report ({len(self._events)} events)",
"=" * 40,
]
for e in self._events:
lines.append(f" {e.tool_name}: {e.streak_length} consecutive calls (threshold: {e.threshold})")
return "\n".join(lines)
# Singleton
_detector: Optional[ToolFixationDetector] = None
def get_fixation_detector() -> ToolFixationDetector:
"""Get or create the singleton detector."""
global _detector
if _detector is None:
_detector = ToolFixationDetector()
return _detector
def reset_fixation_detector() -> None:
"""Reset the singleton."""
global _detector
_detector = None

View File

@@ -1,76 +0,0 @@
"""Tests for tool fixation detection."""
import pytest
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from agent.tool_fixation_detector import ToolFixationDetector, get_fixation_detector
class TestFixationDetection:
def test_no_fixation_below_threshold(self):
d = ToolFixationDetector(threshold=5)
for i in range(4):
assert d.record("execute_code") is None
def test_fixation_at_threshold(self):
d = ToolFixationDetector(threshold=3)
d.record("execute_code")
d.record("execute_code")
nudge = d.record("execute_code")
assert nudge is not None
assert "execute_code" in nudge
assert "3 times" in nudge
def test_fixation_above_threshold(self):
d = ToolFixationDetector(threshold=3)
d.record("execute_code")
d.record("execute_code")
d.record("execute_code") # threshold hit
nudge = d.record("execute_code") # still nudging
assert nudge is not None
def test_streak_resets_on_different_tool(self):
d = ToolFixationDetector(threshold=3)
d.record("execute_code")
d.record("execute_code")
d.record("terminal") # breaks streak
assert d._streak_count == 1
assert d._current_streak == "terminal"
def test_nudges_sent_counter(self):
d = ToolFixationDetector(threshold=2)
d.record("a")
d.record("a") # nudge 1
d.record("a") # nudge 2
assert d.nudges_sent == 2
def test_events_recorded(self):
d = ToolFixationDetector(threshold=2)
d.record("x")
d.record("x")
assert len(d.events) == 1
assert d.events[0].tool_name == "x"
assert d.events[0].streak_length == 2
def test_report(self):
d = ToolFixationDetector(threshold=2)
d.record("x")
d.record("x")
report = d.format_report()
assert "x" in report
def test_reset(self):
d = ToolFixationDetector(threshold=2)
d.record("x")
d.record("x")
d.reset()
assert d._streak_count == 0
assert d._current_streak == ""
def test_singleton(self):
d1 = get_fixation_detector()
d2 = get_fixation_detector()
assert d1 is d2

View File

@@ -44,6 +44,34 @@ from typing import Dict, Any, Optional, Tuple
logger = logging.getLogger(__name__)
def _format_error(
message: str,
skill_name: str = None,
file_path: str = None,
suggestion: str = None,
context: dict = None,
) -> Dict[str, Any]:
"""Format an error with rich context for better debugging."""
parts = [message]
if skill_name:
parts.append(f"Skill: {skill_name}")
if file_path:
parts.append(f"File: {file_path}")
if suggestion:
parts.append(f"Suggestion: {suggestion}")
if context:
for key, value in context.items():
parts.append(f"{key}: {value}")
return {
"success": False,
"error": " | ".join(parts),
"skill_name": skill_name,
"file_path": file_path,
"suggestion": suggestion,
}
# Import security scanner — agent-created skills get the same scrutiny as
# community hub installs.
try: