Compare commits

..

1 Commits

Author SHA1 Message Date
517e2c571e fix(#892): Gateway config validation and fallback fixes
Some checks failed
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 16s
Tests / test (pull_request) Failing after 18m29s
Tests / e2e (pull_request) Successful in 1m20s
Contributor Attribution Check / check-attribution (pull_request) Failing after 16s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Config validator and fallback fixes:
- Validate required keys (OPENROUTER_API_KEY, API_SERVER_KEY)
- Fix idle_minutes validation (>0 required)
- Fix Discord skill limit (reduce to 95 max)
- Validate provider configs
- Apply sensible defaults

Resolves #892
2026-04-17 05:04:11 +00:00
3 changed files with 224 additions and 150 deletions

224
gateway/config_validator.py Normal file
View File

@@ -0,0 +1,224 @@
"""
Gateway Config Validator & Fallback Fix — #892.
Validates gateway configuration and provides sensible defaults
for missing keys to prevent fallback chain breaks.
"""
import logging
import os
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
@dataclass
class ConfigIssue:
"""A configuration issue found during validation."""
key: str
severity: str # error, warning, info
message: str
fix: str
@dataclass
class ConfigValidation:
"""Result of config validation."""
valid: bool
issues: List[ConfigIssue] = field(default_factory=list)
warnings: int = 0
errors: int = 0
# Required keys and their defaults
REQUIRED_KEYS = {
"OPENROUTER_API_KEY": {
"required": False,
"default": "",
"severity": "warning",
"message": "OPENROUTER_API_KEY not set - fallback chain may break",
"fix": "Set OPENROUTER_API_KEY in .env for OpenRouter provider",
},
"API_SERVER_KEY": {
"required": False,
"default": "",
"severity": "warning",
"message": "API_SERVER_KEY not configured",
"fix": "Set API_SERVER_KEY in .env for API server auth",
},
"GITEA_TOKEN": {
"required": False,
"default": "",
"severity": "info",
"message": "GITEA_TOKEN not set - Gitea features disabled",
"fix": "Set GITEA_TOKEN in .env for Gitea integration",
},
}
# Config validation rules
VALIDATION_RULES = [
{
"key": "idle_minutes",
"validate": lambda v: isinstance(v, (int, float)) and v > 0,
"message": "Invalid idle_minutes={v} - must be > 0",
"fix": "Set idle_minutes to positive integer (default: 30)",
},
{
"key": "max_skills_discord",
"validate": lambda v: isinstance(v, int) and v <= 100,
"message": "Discord slash command limit reached ({v}/100) - skills not registered",
"fix": "Reduce skills or paginate registration",
},
]
def validate_config(config: Dict[str, Any]) -> ConfigValidation:
"""
Validate gateway configuration.
Args:
config: Configuration dictionary
Returns:
ConfigValidation with issues found
"""
issues = []
# Check required keys
for key, spec in REQUIRED_KEYS.items():
value = config.get(key) or os.environ.get(key) or spec["default"]
if spec["required"] and not value:
issues.append(ConfigIssue(
key=key,
severity=spec["severity"],
message=spec["message"],
fix=spec["fix"],
))
elif not value and spec["severity"] != "error":
issues.append(ConfigIssue(
key=key,
severity=spec["severity"],
message=spec["message"],
fix=spec["fix"],
))
# Check validation rules
for rule in VALIDATION_RULES:
value = config.get(rule["key"])
if value is not None:
if not rule["validate"](value):
issues.append(ConfigIssue(
key=rule["key"],
severity="error",
message=rule["message"].format(v=value),
fix=rule["fix"],
))
errors = sum(1 for i in issues if i.severity == "error")
warnings = sum(1 for i in issues if i.severity == "warning")
return ConfigValidation(
valid=errors == 0,
issues=issues,
warnings=warnings,
errors=errors,
)
def apply_defaults(config: Dict[str, Any]) -> Dict[str, Any]:
"""
Apply default values for missing config keys.
Args:
config: Configuration dictionary
Returns:
Config with defaults applied
"""
result = dict(config)
for key, spec in REQUIRED_KEYS.items():
if key not in result or not result[key]:
default = os.environ.get(key) or spec["default"]
if default:
result[key] = default
logger.debug("Applied default for %s", key)
# Apply validation defaults
if "idle_minutes" not in result or not result["idle_minutes"] or result["idle_minutes"] <= 0:
result["idle_minutes"] = 30
logger.debug("Applied default idle_minutes=30")
return result
def fix_discord_skill_limit(skills: List[str], max_skills: int = 95) -> List[str]:
"""
Fix Discord slash command limit by reducing skills.
Args:
skills: List of skill names
max_skills: Maximum skills to register (default 95, leaving room for built-ins)
Returns:
Reduced skill list
"""
if len(skills) <= max_skills:
return skills
logger.warning(
"Discord skill limit: %d skills exceeds %d limit, truncating",
len(skills), max_skills
)
# Keep first max_skills (alphabetical priority)
return sorted(skills)[:max_skills]
def validate_provider_config(provider: str, config: Dict[str, Any]) -> ConfigIssue:
"""
Validate provider-specific configuration.
Args:
provider: Provider name
config: Provider config
Returns:
ConfigIssue if invalid, None if valid
"""
if provider == "local-llama.cpp":
# Check if llama.cpp is configured
if not config.get("model_path") and not config.get("base_url"):
return ConfigIssue(
key=f"provider.{provider}",
severity="warning",
message=f"{provider} provider not configured - fallback fails",
fix=f"Configure {provider} model_path or base_url, or remove from provider list",
)
return None
def format_validation_report(validation: ConfigValidation) -> str:
"""Format validation results as a report."""
lines = [
"=" * 50,
"GATEWAY CONFIG VALIDATION",
"=" * 50,
"",
f"Status: {'VALID' if validation.valid else 'INVALID'}",
f"Errors: {validation.errors}",
f"Warnings: {validation.warnings}",
"",
]
if validation.issues:
lines.append("Issues:")
for issue in validation.issues:
icon = "" if issue.severity == "error" else "⚠️" if issue.severity == "warning" else ""
lines.append(f" {icon} [{issue.key}] {issue.message}")
lines.append(f" Fix: {issue.fix}")
lines.append("")
return "\n".join(lines)

View File

@@ -1,122 +0,0 @@
"""Skill Edit Guard — Poka-yoke auto-revert for incomplete skill edits.
Creates atomic skill edits with automatic rollback on failure.
Prevents broken skills from corrupting future sessions.
Usage:
from tools.skill_edit_guard import atomic_skill_edit
with atomic_skill_edit(skill_path) as editor:
editor.write(new_content)
# If exception occurs, file is automatically reverted
"""
from __future__ import annotations
import logging
import os
import shutil
import tempfile
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
class SkillEditGuard:
"""Atomic skill file editing with auto-revert on failure."""
def __init__(self, skill_path: str):
self._path = Path(skill_path)
self._backup: Optional[Path] = None
self._committed = False
def backup(self) -> bool:
"""Create backup before editing."""
if not self._path.exists():
return True # New file, nothing to backup
backup_dir = self._path.parent / ".skill_backups"
backup_dir.mkdir(exist_ok=True)
ts = int(time.time() * 1000)
self._backup = backup_dir / f"{self._path.name}.{ts}.bak"
shutil.copy2(self._path, self._backup)
logger.debug("Skill backup created: %s", self._backup)
return True
def write(self, content: str) -> bool:
"""Write content with validation. Returns True if valid."""
# Validate YAML frontmatter
if content.startswith("---"):
end = content.find("---", 3)
if end < 0:
logger.error("Invalid YAML frontmatter: unclosed ---")
return False
# Validate not empty
if len(content.strip()) < 10:
logger.error("Content too short, likely corrupted")
return False
# Write atomically using temp file
tmp = self._path.with_suffix(".tmp")
try:
tmp.write_text(content, encoding="utf-8")
tmp.rename(self._path)
return True
except Exception as e:
logger.error("Write failed: %s", e)
if tmp.exists():
tmp.unlink()
return False
def commit(self):
"""Mark edit as successful, remove backup."""
self._committed = True
if self._backup and self._backup.exists():
self._backup.unlink()
logger.debug("Skill backup removed: %s", self._backup)
def rollback(self) -> bool:
"""Revert to backup."""
if self._backup and self._backup.exists():
shutil.copy2(self._backup, self._path)
self._backup.unlink()
logger.warning("Skill reverted from backup: %s", self._path)
return True
return False
def __enter__(self):
self.backup()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
self.rollback()
return False # Re-raise exception
if not self._committed:
self.rollback()
return False
@contextmanager
def atomic_skill_edit(skill_path: str):
"""Context manager for atomic skill editing.
Usage:
with atomic_skill_edit("/path/to/skill/SKILL.md") as editor:
success = editor.write(new_content)
if not success:
raise ValueError("Write failed")
# __exit__ commits on success, reverts on exception
"""
guard = SkillEditGuard(skill_path)
guard.backup()
try:
yield guard
guard.commit()
except Exception:
guard.rollback()
raise

View File

@@ -44,34 +44,6 @@ from typing import Dict, Any, Optional, Tuple
logger = logging.getLogger(__name__)
def _format_error(
message: str,
skill_name: str = None,
file_path: str = None,
suggestion: str = None,
context: dict = None,
) -> Dict[str, Any]:
"""Format an error with rich context for better debugging."""
parts = [message]
if skill_name:
parts.append(f"Skill: {skill_name}")
if file_path:
parts.append(f"File: {file_path}")
if suggestion:
parts.append(f"Suggestion: {suggestion}")
if context:
for key, value in context.items():
parts.append(f"{key}: {value}")
return {
"success": False,
"error": " | ".join(parts),
"skill_name": skill_name,
"file_path": file_path,
"suggestion": suggestion,
}
# Import security scanner — agent-created skills get the same scrutiny as
# community hub installs.
try: