Compare commits

..

2 Commits

Author SHA1 Message Date
244dae106d test: Add dependency resolver tests (#754)
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 33s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 27s
Tests / e2e (pull_request) Successful in 3m19s
Tests / test (pull_request) Failing after 59m22s
2026-04-15 03:46:10 +00:00
a606660c9d feat: Add skill dependency resolver (#754) 2026-04-15 03:45:53 +00:00
4 changed files with 266 additions and 233 deletions

View File

@@ -1,41 +0,0 @@
"""
Tests for cost estimator tool (#745).
"""
import pytest
from tools.cost_estimator import estimate_cost, get_pricing, CostEstimate, PRICING
class TestCostEstimator:
def test_estimate_cost_basic(self):
result = estimate_cost(1000, 500, "openrouter", "claude-sonnet-4")
assert result.input_tokens == 1000
assert result.output_tokens == 500
assert result.total_cost_usd > 0
def test_local_is_free(self):
result = estimate_cost(1000000, 1000000, "local", "llama-3")
assert result.total_cost_usd == 0.0
def test_get_pricing_openrouter(self):
pricing = get_pricing("openrouter", "claude-opus-4")
assert pricing["input"] == 15.0
assert pricing["output"] == 75.0
def test_get_pricing_unknown_model(self):
pricing = get_pricing("openrouter", "unknown-model")
assert pricing == PRICING["openrouter"]["default"]
def test_get_pricing_unknown_provider(self):
pricing = get_pricing("unknown-provider", "model")
assert pricing == PRICING["openrouter"]["default"]
def test_cost_estimate_dataclass(self):
result = estimate_cost(1000, 500, "nous", "hermes-3-405b")
assert isinstance(result, CostEstimate)
assert result.provider == "nous"
assert result.model == "hermes-3-405b"
if __name__ == "__main__":
pytest.main([__file__])

81
tests/test_skill_deps.py Normal file
View File

@@ -0,0 +1,81 @@
"""
Tests for skill dependency resolver
Issue: #754
"""
import unittest
from unittest.mock import patch, MagicMock
from tools.skill_deps import (
parse_requires,
check_dependency,
check_dependencies,
resolve_dependencies,
)
class TestParseRequires(unittest.TestCase):
def test_list(self):
fm = {"requires": ["pkg1", "pkg2"]}
self.assertEqual(parse_requires(fm), ["pkg1", "pkg2"])
def test_string(self):
fm = {"requires": "pkg1"}
self.assertEqual(parse_requires(fm), ["pkg1"])
def test_empty(self):
fm = {}
self.assertEqual(parse_requires(fm), [])
def test_none(self):
fm = {"requires": None}
self.assertEqual(parse_requires(fm), [])
class TestCheckDependency(unittest.TestCase):
def test_installed(self):
installed, version = check_dependency("json")
self.assertTrue(installed)
def test_not_installed(self):
installed, version = check_dependency("nonexistent_package_xyz_123")
self.assertFalse(installed)
class TestCheckDependencies(unittest.TestCase):
def test_all_installed(self):
installed, missing = check_dependencies(["json", "os"])
self.assertEqual(len(missing), 0)
def test_some_missing(self):
installed, missing = check_dependencies(["json", "nonexistent_xyz"])
self.assertIn("json", installed)
self.assertIn("nonexistent_xyz", missing)
class TestResolveDependencies(unittest.TestCase):
def test_no_requires(self):
satisfied, installed, missing = resolve_dependencies({})
self.assertTrue(satisfied)
def test_all_satisfied(self):
satisfied, installed, missing = resolve_dependencies(
{"requires": ["json"]}, auto_install=False
)
self.assertTrue(satisfied)
def test_missing_no_auto(self):
satisfied, installed, missing = resolve_dependencies(
{"requires": ["nonexistent_xyz"]}, auto_install=False
)
self.assertFalse(satisfied)
self.assertIn("nonexistent_xyz", missing)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,192 +0,0 @@
"""
Provider Cost Estimator — Estimate API costs from token counts.
Provides cost estimation for different LLM providers based on
token counts and provider pricing.
"""
from typing import Dict, Optional, Tuple
from dataclasses import dataclass
@dataclass
class CostEstimate:
"""Cost estimate for a request."""
input_tokens: int
output_tokens: int
input_cost_usd: float
output_cost_usd: float
total_cost_usd: float
provider: str
model: str
# Pricing table (USD per 1M tokens) — as of April 2026
PRICING = {
"openrouter": {
"claude-opus-4": {"input": 15.0, "output": 75.0},
"claude-sonnet-4": {"input": 3.0, "output": 15.0},
"claude-haiku-3.5": {"input": 0.80, "output": 4.0},
"gpt-4o": {"input": 2.50, "output": 10.0},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"gemini-2.5-pro": {"input": 1.25, "output": 10.0},
"gemini-2.5-flash": {"input": 0.15, "output": 0.60},
"llama-4-scout": {"input": 0.20, "output": 0.80},
"llama-4-maverick": {"input": 0.50, "output": 2.0},
"default": {"input": 1.0, "output": 3.0},
},
"nous": {
"hermes-3-405b": {"input": 5.0, "output": 5.0},
"mixtral-8x22b": {"input": 2.0, "output": 2.0},
"hermes-2-mixtral-8x7b": {"input": 0.90, "output": 0.90},
"default": {"input": 2.0, "output": 2.0},
},
"anthropic": {
"claude-opus-4": {"input": 15.0, "output": 75.0},
"claude-sonnet-4": {"input": 3.0, "output": 15.0},
"claude-haiku-3.5": {"input": 0.80, "output": 4.0},
"default": {"input": 3.0, "output": 15.0},
},
"local": {
# Local models are free (electricity only)
"default": {"input": 0.0, "output": 0.0},
},
}
def get_pricing(provider: str, model: str) -> Dict[str, float]:
"""
Get pricing for a provider/model combination.
Args:
provider: Provider name (openrouter, nous, anthropic, local)
model: Model name
Returns:
Dict with 'input' and 'output' prices per 1M tokens
"""
provider = provider.lower().strip()
model = model.lower().strip()
provider_pricing = PRICING.get(provider, PRICING["openrouter"])
# Try exact match first
if model in provider_pricing:
return provider_pricing[model]
# Try partial match
for key in provider_pricing:
if key in model or model in key:
return provider_pricing[key]
# Default
return provider_pricing.get("default", {"input": 1.0, "output": 3.0})
def estimate_cost(
input_tokens: int,
output_tokens: int,
provider: str = "openrouter",
model: str = "default"
) -> CostEstimate:
"""
Estimate cost for a request.
Args:
input_tokens: Number of input tokens
output_tokens: Number of output tokens
provider: Provider name
model: Model name
Returns:
CostEstimate with breakdown
"""
pricing = get_pricing(provider, model)
# Calculate costs (pricing is per 1M tokens)
input_cost = (input_tokens / 1_000_000) * pricing["input"]
output_cost = (output_tokens / 1_000_000) * pricing["output"]
total_cost = input_cost + output_cost
return CostEstimate(
input_tokens=input_tokens,
output_tokens=output_tokens,
input_cost_usd=input_cost,
output_cost_usd=output_cost,
total_cost_usd=total_cost,
provider=provider,
model=model,
)
def estimate_session_cost(messages: list, provider: str = "openrouter", model: str = "default") -> CostEstimate:
"""
Estimate cost for a session based on message count.
Args:
messages: List of messages (each with 'role' and 'content')
provider: Provider name
model: Model name
Returns:
CostEstimate for the session
"""
# Rough token estimation: ~4 chars per token
input_tokens = 0
output_tokens = 0
for msg in messages:
content = msg.get("content", "")
if isinstance(content, str):
tokens = len(content) // 4
if msg.get("role") == "user":
input_tokens += tokens
elif msg.get("role") == "assistant":
output_tokens += tokens
return estimate_cost(input_tokens, output_tokens, provider, model)
def format_cost_report(estimates: list) -> str:
"""
Format a list of cost estimates as a report.
Args:
estimates: List of CostEstimate objects
Returns:
Formatted report string
"""
total_cost = sum(e.total_cost_usd for e in estimates)
total_input = sum(e.input_tokens for e in estimates)
total_output = sum(e.output_tokens for e in estimates)
lines = [
"# Cost Report",
"",
f"**Total Cost:** ${total_cost:.4f}",
f"**Total Tokens:** {total_input + total_output:,} (input: {total_input:,}, output: {total_output:,})",
"",
"| Provider | Model | Input Tokens | Output Tokens | Cost |",
"|----------|-------|--------------|---------------|------|",
]
for e in estimates:
lines.append(f"| {e.provider} | {e.model} | {e.input_tokens:,} | {e.output_tokens:,} | ${e.total_cost_usd:.4f} |")
lines.append("")
lines.append(f"*Generated by cost_estimator.py*")
return "\n".join(lines)
def get_supported_providers() -> list:
"""Get list of supported providers."""
return list(PRICING.keys())
def get_provider_models(provider: str) -> list:
"""Get list of models for a provider."""
provider = provider.lower().strip()
provider_pricing = PRICING.get(provider, {})
return [k for k in provider_pricing.keys() if k != "default"]

185
tools/skill_deps.py Normal file
View File

@@ -0,0 +1,185 @@
"""
Skill Dependency Resolver — Auto-install missing dependencies
Checks skill frontmatter for `requires` field and ensures
dependencies are installed before loading the skill.
Issue: #754
"""
import importlib
import logging
import subprocess
import sys
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
def parse_requires(frontmatter: Dict[str, Any]) -> List[str]:
"""
Parse the `requires` field from skill frontmatter.
Supports:
- requires: [package1, package2]
- requires: package1
- requires:
- package1
- package2
"""
requires = frontmatter.get("requires", [])
if isinstance(requires, str):
return [requires]
if isinstance(requires, list):
return [str(r) for r in requires if r]
return []
def check_dependency(package: str) -> Tuple[bool, str]:
"""
Check if a Python package is installed.
Returns:
Tuple of (is_installed, version_or_error)
"""
# Handle pip package names (e.g., "matrix-nio[e2e]")
import_name = package.split("[")[0].replace("-", "_")
try:
mod = importlib.import_module(import_name)
version = getattr(mod, "__version__", "installed")
return True, version
except ImportError:
return False, "not installed"
def check_dependencies(requires: List[str]) -> Tuple[List[str], List[str]]:
"""
Check which dependencies are missing.
Returns:
Tuple of (installed, missing)
"""
installed = []
missing = []
for pkg in requires:
is_installed, _ = check_dependency(pkg)
if is_installed:
installed.append(pkg)
else:
missing.append(pkg)
return installed, missing
def install_dependency(package: str, quiet: bool = False) -> Tuple[bool, str]:
"""
Install a Python package via pip.
Returns:
Tuple of (success, output_or_error)
"""
try:
cmd = [sys.executable, "-m", "pip", "install", package]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=120
)
if result.returncode == 0:
if not quiet:
logger.info("Installed %s", package)
return True, result.stdout
else:
logger.error("Failed to install %s: %s", package, result.stderr)
return False, result.stderr
except subprocess.TimeoutExpired:
return False, "Installation timed out"
except Exception as e:
return False, str(e)
def resolve_dependencies(
frontmatter: Dict[str, Any],
auto_install: bool = False,
quiet: bool = False
) -> Tuple[bool, List[str], List[str]]:
"""
Resolve skill dependencies.
Args:
frontmatter: Skill frontmatter dict
auto_install: If True, install missing deps without asking
quiet: If True, suppress output
Returns:
Tuple of (all_satisfied, installed_now, still_missing)
"""
requires = parse_requires(frontmatter)
if not requires:
return True, [], []
installed, missing = check_dependencies(requires)
if not missing:
if not quiet:
logger.debug("All dependencies satisfied: %s", installed)
return True, [], []
if not auto_install:
if not quiet:
logger.warning("Missing dependencies: %s", missing)
return False, [], missing
# Auto-install missing dependencies
installed_now = []
still_missing = []
for pkg in missing:
if not quiet:
logger.info("Installing missing dependency: %s", pkg)
success, output = install_dependency(pkg, quiet=quiet)
if success:
installed_now.append(pkg)
else:
still_missing.append(pkg)
logger.error("Failed to install %s: %s", pkg, output[:200])
all_satisfied = len(still_missing) == 0
return all_satisfied, installed_now, still_missing
def check_skill_dependencies(skill_dir) -> Dict[str, Any]:
"""
Check dependencies for a skill directory.
Returns:
Dict with dependency status
"""
from pathlib import Path
skill_md = Path(skill_dir) / "SKILL.md"
if not skill_md.exists():
return {"requires": [], "installed": [], "missing": [], "satisfied": True}
try:
content = skill_md.read_text()
from agent.skill_utils import parse_frontmatter
frontmatter, _ = parse_frontmatter(content)
except Exception:
return {"requires": [], "installed": [], "missing": [], "satisfied": True}
requires = parse_requires(frontmatter)
installed, missing = check_dependencies(requires)
return {
"requires": requires,
"installed": installed,
"missing": missing,
"satisfied": len(missing) == 0
}