Compare commits

..

3 Commits

Author SHA1 Message Date
c8186b1d11 test: add cron audit tests (#902)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 16s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 18s
Tests / test (pull_request) Failing after 18m39s
Tests / e2e (pull_request) Successful in 1m30s
2026-04-17 05:13:10 +00:00
db69ef8e3f docs: cron audit documentation (#902) 2026-04-17 05:13:09 +00:00
64dd3fda01 feat: add cron audit and cleanup script (#902) 2026-04-17 05:11:40 +00:00
6 changed files with 472 additions and 407 deletions

50
docs/cron-audit-890.md Normal file
View File

@@ -0,0 +1,50 @@
# Cron Audit & Cleanup
Identifies and removes dead/stale cron jobs that waste scheduler cycles.
## Problem
Of 69 cron jobs, 9 had zero completions — running on schedule but never producing results. Each execution costs API tokens and scheduler cycles.
## Dead Jobs Found (2026-04-14)
| Job | Schedule | Completions |
|-----|----------|-------------|
| exp-swarm-pipeline | every 10 min | 0 |
| exp-music-generator | every 2h | 0 |
| exp-paper-citations | every 3h | 0 |
| exp-gbrain-patterns | every 2h | 0 |
| exp-infra-hardening | every 2h | 0 |
| gemma4-multimodal-burn | every 1h | 0 |
| morning-paper-report | daily | 0 |
| overnight-collector | every 15 min | 0 |
| morning-experiment-report | daily | 0 |
## Usage
```bash
# Show dead jobs (zero completions)
python3 scripts/cron_audit.py
# Show all jobs with status
python3 scripts/cron_audit.py --all
# Find stale jobs (no runs in 7 days)
python3 scripts/cron_audit.py --older-than 7
# Disable dead jobs (pause them)
python3 scripts/cron_audit.py --disable
# Delete dead jobs permanently
python3 scripts/cron_audit.py --delete
# JSON output for automation
python3 scripts/cron_audit.py --json
```
## Job Categories
- **Healthy**: enabled, has completions, ran recently
- **Dead**: enabled, zero completions, exists for N+ days
- **Stale**: enabled, no successful run in N days
- **Disabled**: paused or completed

279
scripts/cron_audit.py Normal file
View File

@@ -0,0 +1,279 @@
#!/usr/bin/env python3
"""
Cron Audit & Cleanup — find and remove dead/stale cron jobs.
Identifies jobs that waste scheduler cycles:
- Dead jobs: zero completions despite running for days
- Stale jobs: no successful run in N days
- Error jobs: high error ratio
Usage:
# Show dead jobs (zero completions)
python3 scripts/cron_audit.py
# Show stale jobs (no runs in 7 days)
python3 scripts/cron_audit.py --older-than 7
# Show all jobs with status
python3 scripts/cron_audit.py --all
# Disable dead jobs (sets enabled=False, state=paused)
python3 scripts/cron_audit.py --disable
# Delete dead jobs permanently
python3 scripts/cron_audit.py --delete
# Custom threshold: dead = 0 completions after N days
python3 scripts/cron_audit.py --min-age 3
# JSON output
python3 scripts/cron_audit.py --json
"""
import argparse
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
# hermes cron is a sibling module
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
try:
from cron.jobs import (
list_jobs,
load_jobs,
save_jobs,
update_job,
remove_job,
)
except ImportError:
# Fallback: direct file access
JOBS_FILE = Path.home() / ".hermes" / "cron" / "jobs.json"
def load_jobs() -> list:
if JOBS_FILE.exists():
return json.loads(JOBS_FILE.read_text())
return []
def save_jobs(jobs: list):
JOBS_FILE.parent.mkdir(parents=True, exist_ok=True)
JOBS_FILE.write_text(json.dumps(jobs, indent=2))
def list_jobs(include_disabled=False):
jobs = load_jobs()
if not include_disabled:
jobs = [j for j in jobs if j.get("enabled", True)]
return jobs
def update_job(job_id, updates):
jobs = load_jobs()
for job in jobs:
if job["id"] == job_id:
job.update(updates)
save_jobs(jobs)
return job
return None
def remove_job(job_id):
jobs = load_jobs()
original = len(jobs)
jobs = [j for j in jobs if j["id"] != job_id]
if len(jobs) < original:
save_jobs(jobs)
return True
return False
# ── Analysis ──────────────────────────────────────────────────────────────
def parse_schedule_display(job: dict) -> str:
"""Get human-readable schedule from job."""
sched = job.get("schedule", {})
if isinstance(sched, dict):
return sched.get("display", job.get("schedule_display", "?"))
return str(sched)
def get_last_run_age(job: dict) -> Optional[timedelta]:
"""Get time since last run."""
last = job.get("last_run_at")
if not last:
return None
try:
if isinstance(last, str):
last_dt = datetime.fromisoformat(last.replace("Z", "+00:00"))
else:
return None
return datetime.now(timezone.utc) - last_dt
except (ValueError, TypeError):
return None
def analyze_jobs(min_age_days: int = 0, stale_days: int = 0) -> Dict[str, List[dict]]:
"""Analyze all jobs and categorize them.
Returns dict with keys: dead, stale, healthy, disabled, completed.
"""
all_jobs = list_jobs(include_disabled=True)
now = datetime.now(timezone.utc)
result = {"dead": [], "stale": [], "healthy": [], "disabled": [], "completed": []}
for job in all_jobs:
job_id = job.get("id", "?")
name = job.get("name", job_id)
enabled = job.get("enabled", True)
state = job.get("state", "scheduled")
completed = job.get("repeat", {}).get("completed", 0)
schedule = parse_schedule_display(job)
last_run = job.get("last_run_at")
last_status = job.get("last_status", "never")
last_error = job.get("last_error")
created = job.get("created_at", "")
# Calculate age
age_days = 0
if created:
try:
created_dt = datetime.fromisoformat(created.replace("Z", "+00:00"))
age_days = (now - created_dt).days
except (ValueError, TypeError):
pass
last_age = get_last_run_age(job)
entry = {
"id": job_id,
"name": name,
"schedule": schedule,
"enabled": enabled,
"state": state,
"completed": completed,
"last_run_at": last_run,
"last_status": last_status,
"last_error": last_error,
"age_days": age_days,
"last_run_age_days": last_age.days if last_age else None,
}
if not enabled or state == "completed":
result["disabled"].append(entry)
elif completed == 0 and age_days >= min_age_days:
result["dead"].append(entry)
elif stale_days > 0 and last_age and last_age.days >= stale_days:
result["stale"].append(entry)
else:
result["healthy"].append(entry)
return result
# ── Actions ───────────────────────────────────────────────────────────────
def disable_jobs(jobs: List[dict]) -> int:
"""Disable dead/stale jobs (pause them)."""
count = 0
for j in jobs:
result = update_job(j["id"], {"enabled": False, "state": "paused"})
if result:
count += 1
print(f" DISABLED: {j['name']} ({j['schedule']})")
return count
def delete_jobs(jobs: List[dict]) -> int:
"""Permanently delete jobs."""
count = 0
for j in jobs:
if remove_job(j["id"]):
count += 1
print(f" DELETED: {j['name']} ({j['schedule']})")
return count
# ── Report ────────────────────────────────────────────────────────────────
def print_table(jobs: List[dict], title: str):
"""Print a table of jobs."""
if not jobs:
return
print(f"
{title} ({len(jobs)}):")
print(f" {'Name':<35} {'Schedule':<15} {'Completed':<10} {'Last Run':<15} {'Status'}")
print(f" {'-'*35} {'-'*15} {'-'*10} {'-'*15} {'-'*10}")
for j in jobs:
last_run = "never"
if j["last_run_age_days"] is not None:
last_run = f"{j['last_run_age_days']}d ago"
elif j["last_run_at"]:
last_run = j["last_run_at"][:10]
status = j["last_status"] or "never"
print(f" {j['name']:<35} {j['schedule']:<15} {j['completed']:<10} {last_run:<15} {status}")
# ── CLI ───────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Cron audit & cleanup")
parser.add_argument("--all", action="store_true", help="Show all jobs including healthy")
parser.add_argument("--older-than", type=int, default=0,
help="Stale threshold in days (jobs with no runs in N days)")
parser.add_argument("--min-age", type=int, default=0,
help="Minimum job age in days to be considered dead")
parser.add_argument("--disable", action="store_true", help="Disable dead jobs")
parser.add_argument("--delete", action="store_true", help="Delete dead jobs permanently")
parser.add_argument("--json", dest="json_output", action="store_true", help="JSON output")
args = parser.parse_args()
analysis = analyze_jobs(min_age_days=args.min_age, stale_days=args.older_than)
if args.json_output:
print(json.dumps(analysis, indent=2))
return
# Summary
total = sum(len(v) for v in analysis.values())
print(f"Cron Audit — {total} total jobs")
print(f" Healthy: {len(analysis['healthy'])}")
print(f" Dead: {len(analysis['dead'])}")
print(f" Stale: {len(analysis['stale'])}")
print(f" Disabled: {len(analysis['disabled'])}")
print(f" Completed: {len(analysis['completed'])}")
if args.all:
print_table(analysis["healthy"], "HEALTHY")
print_table(analysis["dead"], "DEAD (zero completions)")
print_table(analysis["stale"], "STALE (no recent runs)")
if not args.disable and not args.delete:
if analysis["dead"] or analysis["stale"]:
print(f"
To clean up: --disable (pause) or --delete (permanent)")
return
targets = analysis["dead"] + analysis["stale"]
if not targets:
print("
Nothing to clean up.")
return
if args.delete:
confirm = input(f"
Delete {len(targets)} jobs permanently? [y/N] ")
if confirm.lower() != "y":
print("Aborted.")
return
count = delete_jobs(targets)
print(f"
Deleted {count} jobs.")
elif args.disable:
count = disable_jobs(targets)
print(f"
Disabled {count} jobs.")
if __name__ == "__main__":
main()

143
tests/test_cron_audit.py Normal file
View File

@@ -0,0 +1,143 @@
"""Tests for cron audit script."""
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "scripts"))
# ---------------------------------------------------------------------------
# Test data
# ---------------------------------------------------------------------------
def _make_job(name, job_id=None, completed=0, enabled=True, state="scheduled",
last_run=None, last_status=None, created=None, schedule="every 1h"):
return {
"id": job_id or f"job-{name}",
"name": name,
"enabled": enabled,
"state": state,
"schedule": {"display": schedule} if isinstance(schedule, str) else schedule,
"repeat": {"completed": completed},
"last_run_at": last_run,
"last_status": last_status,
"created_at": created or datetime.now(timezone.utc).isoformat(),
}
# ---------------------------------------------------------------------------
# analyze_jobs
# ---------------------------------------------------------------------------
class TestAnalyzeJobs:
@patch("cron_audit.list_jobs")
def test_dead_job_detected(self, mock_list):
"""Job with 0 completions and old creation date is dead."""
old_date = (datetime.now(timezone.utc) - timedelta(days=5)).isoformat()
mock_list.return_value = [
_make_job("dead-job", completed=0, created=old_date),
]
from cron_audit import analyze_jobs
result = analyze_jobs(min_age_days=1)
assert len(result["dead"]) == 1
assert result["dead"][0]["name"] == "dead-job"
@patch("cron_audit.list_jobs")
def test_healthy_job_not_dead(self, mock_list):
"""Job with completions is healthy."""
mock_list.return_value = [
_make_job("good-job", completed=42, last_run=datetime.now(timezone.utc).isoformat()),
]
from cron_audit import analyze_jobs
result = analyze_jobs()
assert len(result["dead"]) == 0
assert len(result["healthy"]) == 1
@patch("cron_audit.list_jobs")
def test_disabled_job_categorized(self, mock_list):
"""Disabled job goes to disabled category."""
mock_list.return_value = [
_make_job("paused-job", enabled=False, state="paused"),
]
from cron_audit import analyze_jobs
result = analyze_jobs()
assert len(result["disabled"]) == 1
@patch("cron_audit.list_jobs")
def test_stale_job_detected(self, mock_list):
"""Job with last run > N days ago is stale."""
old_run = (datetime.now(timezone.utc) - timedelta(days=10)).isoformat()
mock_list.return_value = [
_make_job("stale-job", completed=5, last_run=old_run),
]
from cron_audit import analyze_jobs
result = analyze_jobs(stale_days=7)
assert len(result["stale"]) == 1
@patch("cron_audit.list_jobs")
def test_completed_job_disabled(self, mock_list):
"""Job with state=completed goes to disabled."""
mock_list.return_value = [
_make_job("done-job", completed=100, state="completed", enabled=False),
]
from cron_audit import analyze_jobs
result = analyze_jobs()
assert len(result["disabled"]) == 1
@patch("cron_audit.list_jobs")
def test_empty_jobs(self, mock_list):
"""No jobs returns empty categories."""
mock_list.return_value = []
from cron_audit import analyze_jobs
result = analyze_jobs()
for v in result.values():
assert len(v) == 0
# ---------------------------------------------------------------------------
# get_last_run_age
# ---------------------------------------------------------------------------
class TestGetLastRunAge:
def test_returns_timedelta(self):
from cron_audit import get_last_run_age
recent = (datetime.now(timezone.utc) - timedelta(hours=2)).isoformat()
age = get_last_run_age({"last_run_at": recent})
assert age is not None
assert age.days == 0
assert age.seconds >= 7000 # ~2 hours
def test_returns_none_for_no_run(self):
from cron_audit import get_last_run_age
assert get_last_run_age({"last_run_at": None}) is None
assert get_last_run_age({}) is None
# ---------------------------------------------------------------------------
# disable_jobs / delete_jobs
# ---------------------------------------------------------------------------
class TestJobActions:
@patch("cron_audit.update_job")
def test_disable_calls_update(self, mock_update):
mock_update.return_value = {"id": "x"}
from cron_audit import disable_jobs
jobs = [{"id": "j1", "name": "test", "schedule": "1h"}]
count = disable_jobs(jobs)
assert count == 1
mock_update.assert_called_once_with("j1", {"enabled": False, "state": "paused"})
@patch("cron_audit.remove_job")
def test_delete_calls_remove(self, mock_remove):
mock_remove.return_value = True
from cron_audit import delete_jobs
jobs = [{"id": "j1", "name": "test", "schedule": "1h"}]
count = delete_jobs(jobs)
assert count == 1
mock_remove.assert_called_once_with("j1")

View File

@@ -1,67 +0,0 @@
"""
Tests for tool hallucination detection (#922).
"""
import pytest
from tools.tool_validator import ToolHallucinationDetector, ValidationSeverity
class TestToolHallucinationDetector:
def setup_method(self):
self.detector = ToolHallucinationDetector()
self.detector.register_tool("read_file", {
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string"},
"encoding": {"type": "string"},
},
"required": ["path"]
}
})
def test_valid_tool_call(self):
result = self.detector.validate_tool_call("read_file", {"path": "/tmp/file.txt"})
assert result.valid is True
assert len(result.blocking_issues) == 0
def test_unknown_tool(self):
result = self.detector.validate_tool_call("hallucinated_tool", {})
assert result.valid is False
assert any(i.code == "UNKNOWN_TOOL" for i in result.issues)
def test_missing_required_param(self):
result = self.detector.validate_tool_call("read_file", {})
assert result.valid is False
assert any(i.code == "MISSING_REQUIRED" for i in result.issues)
def test_wrong_type(self):
result = self.detector.validate_tool_call("read_file", {"path": 123})
assert result.valid is False
assert any(i.code == "WRONG_TYPE" for i in result.issues)
def test_unknown_param_warning(self):
result = self.detector.validate_tool_call("read_file", {"path": "/tmp/file.txt", "unknown": "value"})
assert result.valid is True # Warning, not blocking
assert any(i.code == "UNKNOWN_PARAM" for i in result.issues)
def test_placeholder_detection(self):
result = self.detector.validate_tool_call("read_file", {"path": "<placeholder>"})
assert any(i.code == "PLACEHOLDER_VALUE" for i in result.issues)
def test_rejection_stats(self):
self.detector.validate_tool_call("unknown_tool", {})
self.detector.validate_tool_call("read_file", {})
stats = self.detector.get_rejection_stats()
assert stats["total"] >= 2
def test_rejection_response(self):
from tools.tool_validator import create_rejection_response
result = self.detector.validate_tool_call("unknown_tool", {})
response = create_rejection_response(result)
assert response["role"] == "tool"
assert "rejected" in response["content"].lower()
if __name__ == "__main__":
pytest.main([__file__])

View File

@@ -44,34 +44,6 @@ from typing import Dict, Any, Optional, Tuple
logger = logging.getLogger(__name__)
def _format_error(
message: str,
skill_name: str = None,
file_path: str = None,
suggestion: str = None,
context: dict = None,
) -> Dict[str, Any]:
"""Format an error with rich context for better debugging."""
parts = [message]
if skill_name:
parts.append(f"Skill: {skill_name}")
if file_path:
parts.append(f"File: {file_path}")
if suggestion:
parts.append(f"Suggestion: {suggestion}")
if context:
for key, value in context.items():
parts.append(f"{key}: {value}")
return {
"success": False,
"error": " | ".join(parts),
"skill_name": skill_name,
"file_path": file_path,
"suggestion": suggestion,
}
# Import security scanner — agent-created skills get the same scrutiny as
# community hub installs.
try:

View File

@@ -1,312 +0,0 @@
"""
Poka-Yoke: Tool Hallucination Detection — #922.
Validation firewall between LLM tool-call output and actual execution.
Detects and blocks:
1. Unknown tool names (hallucinated tools)
2. Malformed parameters (wrong types)
3. Missing required arguments
4. Extra unknown parameters
Poka-Yoke Type: Detection (catches errors at the boundary before harm)
"""
import json
import logging
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Set
logger = logging.getLogger(__name__)
class ValidationSeverity(Enum):
"""Severity of validation failure."""
BLOCK = "block" # Must block execution
WARN = "warn" # Warning, may proceed
INFO = "info" # Informational
@dataclass
class ValidationIssue:
"""A validation issue found."""
severity: ValidationSeverity
code: str
message: str
tool_name: str
parameter: Optional[str] = None
expected: Optional[str] = None
actual: Optional[Any] = None
@dataclass
class ValidationResult:
"""Result of tool call validation."""
valid: bool
tool_name: str
issues: List[ValidationIssue] = field(default_factory=list)
corrected_args: Optional[Dict[str, Any]] = None
@property
def blocking_issues(self) -> List[ValidationIssue]:
return [i for i in self.issues if i.severity == ValidationSeverity.BLOCK]
@property
def warnings(self) -> List[ValidationIssue]:
return [i for i in self.issues if i.severity == ValidationSeverity.WARN]
class ToolHallucinationDetector:
"""
Poka-yoke detector for tool hallucinations.
Validates tool calls against registered schemas before execution.
"""
def __init__(self, tool_registry: Optional[Dict] = None):
"""
Initialize detector.
Args:
tool_registry: Dict of tool_name -> tool_schema
"""
self.registry = tool_registry or {}
self._rejection_log: List[Dict] = []
def register_tool(self, name: str, schema: Dict):
"""Register a tool with its JSON Schema."""
self.registry[name] = schema
def register_tools(self, tools: Dict[str, Dict]):
"""Register multiple tools."""
self.registry.update(tools)
def validate_tool_call(
self,
tool_name: str,
arguments: Dict[str, Any],
model: str = "unknown",
) -> ValidationResult:
"""
Validate a tool call against the registry.
Args:
tool_name: Name of the tool being called
arguments: Arguments passed to the tool
model: Model that generated the call (for logging)
Returns:
ValidationResult with validation status
"""
issues = []
# 1. Check if tool exists
if tool_name not in self.registry:
issue = ValidationIssue(
severity=ValidationSeverity.BLOCK,
code="UNKNOWN_TOOL",
message=f"Tool '{tool_name}' does not exist. Available: {', '.join(sorted(self.registry.keys())[:10])}...",
tool_name=tool_name,
)
issues.append(issue)
self._log_rejection(tool_name, arguments, model, "UNKNOWN_TOOL")
return ValidationResult(valid=False, tool_name=tool_name, issues=issues)
schema = self.registry[tool_name]
params_schema = schema.get("parameters", {}).get("properties", {})
required = set(schema.get("parameters", {}).get("required", []))
# 2. Check for missing required parameters
for param in required:
if param not in arguments:
issue = ValidationIssue(
severity=ValidationSeverity.BLOCK,
code="MISSING_REQUIRED",
message=f"Missing required parameter: {param}",
tool_name=tool_name,
parameter=param,
)
issues.append(issue)
# 3. Check parameter types
for param_name, param_value in arguments.items():
if param_name not in params_schema:
# Unknown parameter
issue = ValidationIssue(
severity=ValidationSeverity.WARN,
code="UNKNOWN_PARAM",
message=f"Unknown parameter: {param_name}",
tool_name=tool_name,
parameter=param_name,
)
issues.append(issue)
continue
param_schema = params_schema[param_name]
expected_type = param_schema.get("type")
if expected_type and not self._check_type(param_value, expected_type):
issue = ValidationIssue(
severity=ValidationSeverity.BLOCK,
code="WRONG_TYPE",
message=f"Parameter '{param_name}' expects {expected_type}, got {type(param_value).__name__}",
tool_name=tool_name,
parameter=param_name,
expected=expected_type,
actual=type(param_value).__name__,
)
issues.append(issue)
# 4. Check for common hallucination patterns
hallucination_issues = self._detect_hallucination_patterns(tool_name, arguments)
issues.extend(hallucination_issues)
# Determine validity
has_blocking = any(i.severity == ValidationSeverity.BLOCK for i in issues)
if has_blocking:
self._log_rejection(tool_name, arguments, model,
"; ".join(i.code for i in issues if i.severity == ValidationSeverity.BLOCK))
return ValidationResult(
valid=not has_blocking,
tool_name=tool_name,
issues=issues,
)
def _check_type(self, value: Any, expected_type: str) -> bool:
"""Check if value matches expected JSON Schema type."""
type_map = {
"string": str,
"number": (int, float),
"integer": int,
"boolean": bool,
"array": list,
"object": dict,
}
expected = type_map.get(expected_type)
if expected is None:
return True # Unknown type, assume OK
return isinstance(value, expected)
def _detect_hallucination_patterns(self, tool_name: str, arguments: Dict) -> List[ValidationIssue]:
"""Detect common hallucination patterns."""
issues = []
# Pattern 1: Placeholder values
placeholder_patterns = [
r"^<.*>$", # <placeholder>
r"^\[.*\]$", # [placeholder]
r"^TODO$|^FIXME$", # TODO/FIXME
r"^example\.com$", # example.com
r"^127\.0\.0\.1$", # localhost
]
for param_name, param_value in arguments.items():
if isinstance(param_value, str):
for pattern in placeholder_patterns:
if re.match(pattern, param_value, re.IGNORECASE):
issues.append(ValidationIssue(
severity=ValidationSeverity.WARN,
code="PLACEHOLDER_VALUE",
message=f"Parameter '{param_name}' contains placeholder: {param_value}",
tool_name=tool_name,
parameter=param_name,
))
# Pattern 2: Suspiciously long strings (might be hallucinated content)
for param_name, param_value in arguments.items():
if isinstance(param_value, str) and len(param_value) > 10000:
issues.append(ValidationIssue(
severity=ValidationSeverity.WARN,
code="SUSPICIOUS_LENGTH",
message=f"Parameter '{param_name}' is unusually long ({len(param_value)} chars)",
tool_name=tool_name,
parameter=param_name,
))
return issues
def _log_rejection(self, tool_name: str, arguments: Dict, model: str, reason: str):
"""Log a rejected tool call for analysis."""
import time
entry = {
"timestamp": time.time(),
"tool_name": tool_name,
"arguments": {k: str(v)[:100] for k, v in arguments.items()},
"model": model,
"reason": reason,
}
self._rejection_log.append(entry)
# Keep log bounded
if len(self._rejection_log) > 1000:
self._rejection_log = self._rejection_log[-500:]
logger.warning(
"Tool hallucination blocked: tool=%s, model=%s, reason=%s",
tool_name, model, reason
)
def get_rejection_stats(self) -> Dict:
"""Get statistics on rejected tool calls."""
if not self._rejection_log:
return {"total": 0, "by_reason": {}, "by_tool": {}}
by_reason = {}
by_tool = {}
for entry in self._rejection_log:
reason = entry["reason"]
tool = entry["tool_name"]
by_reason[reason] = by_reason.get(reason, 0) + 1
by_tool[tool] = by_tool.get(tool, 0) + 1
return {
"total": len(self._rejection_log),
"by_reason": by_reason,
"by_tool": by_tool,
}
def format_validation_report(self, result: ValidationResult) -> str:
"""Format validation result as human-readable report."""
if result.valid:
return f"{result.tool_name}: valid"
lines = [f"{result.tool_name}: BLOCKED"]
for issue in result.blocking_issues:
lines.append(f" [{issue.code}] {issue.message}")
for issue in result.warnings:
lines.append(f" ⚠️ [{issue.code}] {issue.message}")
return "\n".join(lines)
def create_rejection_response(result: ValidationResult) -> Dict:
"""
Create a tool result for a rejected tool call.
This allows the agent to see the rejection and self-correct.
"""
issues_text = "\n".join(
f"- [{i.code}] {i.message}"
for i in result.blocking_issues
)
return {
"role": "tool",
"content": f"""Tool call rejected: {result.tool_name}
Issues found:
{issues_text}
Please check the tool name and parameters, then try again with valid arguments.""",
}