Compare commits

..

1 Commits

Author SHA1 Message Date
bdd0f2709b feat: provider preflight validation before session start (#924)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 47s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 52s
Tests / test (pull_request) Failing after 30m48s
Tests / e2e (pull_request) Successful in 2m9s
2026-04-21 04:48:57 +00:00
2 changed files with 146 additions and 122 deletions

146
agent/provider_preflight.py Normal file
View File

@@ -0,0 +1,146 @@
"""Provider Preflight — Poka-yoke validation of provider/model config.
Validates provider and model configuration before session start.
Prevents wasted context on misconfigured providers.
Usage:
from agent.provider_preflight import preflight_check
result = preflight_check(provider="openrouter", model="xiaomi/mimo-v2-pro")
if not result["valid"]:
print(result["error"])
"""
from __future__ import annotations
import logging
import os
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
# Provider -> required env var
PROVIDER_KEYS = {
"openrouter": "OPENROUTER_API_KEY",
"anthropic": "ANTHROPIC_API_KEY",
"openai": "OPENAI_API_KEY",
"nous": "NOUS_API_KEY",
"ollama": None, # Local, no key needed
"local": None,
}
def check_provider_key(provider: str) -> Dict[str, Any]:
"""Check if provider has a valid API key configured."""
provider_lower = provider.lower().strip()
env_var = None
for known, key in PROVIDER_KEYS.items():
if known in provider_lower:
env_var = key
break
if env_var is None:
# Unknown provider — assume OK (custom/local)
return {"valid": True, "provider": provider, "key_status": "unknown"}
if env_var is None:
# Local provider, no key needed
return {"valid": True, "provider": provider, "key_status": "not_required"}
key_value = os.getenv(env_var, "").strip()
if not key_value:
return {
"valid": False,
"provider": provider,
"key_status": "missing",
"error": f"{env_var} is not set. Provider '{provider}' will fail.",
"fix": f"Set {env_var} in ~/.hermes/.env",
}
if len(key_value) < 10:
return {
"valid": False,
"provider": provider,
"key_status": "too_short",
"error": f"{env_var} is suspiciously short ({len(key_value)} chars). May be invalid.",
"fix": f"Verify {env_var} value in ~/.hermes/.env",
}
return {"valid": True, "provider": provider, "key_status": "set"}
def check_model_availability(model: str, provider: str) -> Dict[str, Any]:
"""Check if model is likely available for provider."""
if not model:
return {"valid": False, "error": "No model specified"}
# Basic sanity checks
model_lower = model.lower()
# Anthropic models should use anthropic provider
if "claude" in model_lower and "anthropic" not in provider.lower():
return {
"valid": True, # Allow but warn
"warning": f"Model '{model}' usually runs on Anthropic provider, not '{provider}'",
}
# Ollama models
ollama_indicators = ["llama", "mistral", "qwen", "gemma", "phi", "hermes"]
if any(x in model_lower for x in ollama_indicators) and ":" not in model:
return {
"valid": True,
"warning": f"Model '{model}' may need a version tag for Ollama (e.g., {model}:latest)",
}
return {"valid": True}
def preflight_check(
provider: str = "",
model: str = "",
fallback_provider: str = "",
fallback_model: str = "",
) -> Dict[str, Any]:
"""Full pre-flight check for provider/model configuration.
Returns:
Dict with valid (bool), errors (list), warnings (list).
"""
errors = []
warnings = []
# Check primary provider
if provider:
result = check_provider_key(provider)
if not result["valid"]:
errors.append(result.get("error", f"Provider {provider} invalid"))
# Check primary model
if model:
result = check_model_availability(model, provider)
if not result["valid"]:
errors.append(result.get("error", f"Model {model} invalid"))
elif result.get("warning"):
warnings.append(result["warning"])
# Check fallback
if fallback_provider:
result = check_provider_key(fallback_provider)
if not result["valid"]:
warnings.append(f"Fallback provider {fallback_provider} also invalid: {result.get('error','')}")
if fallback_model:
result = check_model_availability(fallback_model, fallback_provider)
if not result["valid"]:
warnings.append(f"Fallback model {fallback_model} invalid")
elif result.get("warning"):
warnings.append(result["warning"])
return {
"valid": len(errors) == 0,
"errors": errors,
"warnings": warnings,
"provider": provider,
"model": model,
}

View File

@@ -1,122 +0,0 @@
"""Skill Edit Guard — Poka-yoke auto-revert for incomplete skill edits.
Creates atomic skill edits with automatic rollback on failure.
Prevents broken skills from corrupting future sessions.
Usage:
from tools.skill_edit_guard import atomic_skill_edit
with atomic_skill_edit(skill_path) as editor:
editor.write(new_content)
# If exception occurs, file is automatically reverted
"""
from __future__ import annotations
import logging
import os
import shutil
import tempfile
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
class SkillEditGuard:
"""Atomic skill file editing with auto-revert on failure."""
def __init__(self, skill_path: str):
self._path = Path(skill_path)
self._backup: Optional[Path] = None
self._committed = False
def backup(self) -> bool:
"""Create backup before editing."""
if not self._path.exists():
return True # New file, nothing to backup
backup_dir = self._path.parent / ".skill_backups"
backup_dir.mkdir(exist_ok=True)
ts = int(time.time() * 1000)
self._backup = backup_dir / f"{self._path.name}.{ts}.bak"
shutil.copy2(self._path, self._backup)
logger.debug("Skill backup created: %s", self._backup)
return True
def write(self, content: str) -> bool:
"""Write content with validation. Returns True if valid."""
# Validate YAML frontmatter
if content.startswith("---"):
end = content.find("---", 3)
if end < 0:
logger.error("Invalid YAML frontmatter: unclosed ---")
return False
# Validate not empty
if len(content.strip()) < 10:
logger.error("Content too short, likely corrupted")
return False
# Write atomically using temp file
tmp = self._path.with_suffix(".tmp")
try:
tmp.write_text(content, encoding="utf-8")
tmp.rename(self._path)
return True
except Exception as e:
logger.error("Write failed: %s", e)
if tmp.exists():
tmp.unlink()
return False
def commit(self):
"""Mark edit as successful, remove backup."""
self._committed = True
if self._backup and self._backup.exists():
self._backup.unlink()
logger.debug("Skill backup removed: %s", self._backup)
def rollback(self) -> bool:
"""Revert to backup."""
if self._backup and self._backup.exists():
shutil.copy2(self._backup, self._path)
self._backup.unlink()
logger.warning("Skill reverted from backup: %s", self._path)
return True
return False
def __enter__(self):
self.backup()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
self.rollback()
return False # Re-raise exception
if not self._committed:
self.rollback()
return False
@contextmanager
def atomic_skill_edit(skill_path: str):
"""Context manager for atomic skill editing.
Usage:
with atomic_skill_edit("/path/to/skill/SKILL.md") as editor:
success = editor.write(new_content)
if not success:
raise ValueError("Write failed")
# __exit__ commits on success, reverts on exception
"""
guard = SkillEditGuard(skill_path)
guard.backup()
try:
yield guard
guard.commit()
except Exception:
guard.rollback()
raise