Compare commits
3 Commits
fix/922
...
fix/902-cr
| Author | SHA1 | Date | |
|---|---|---|---|
| c8186b1d11 | |||
| db69ef8e3f | |||
| 64dd3fda01 |
50
docs/cron-audit-890.md
Normal file
50
docs/cron-audit-890.md
Normal file
@@ -0,0 +1,50 @@
|
||||
# Cron Audit & Cleanup
|
||||
|
||||
Identifies and removes dead/stale cron jobs that waste scheduler cycles.
|
||||
|
||||
## Problem
|
||||
|
||||
Of 69 cron jobs, 9 had zero completions — running on schedule but never producing results. Each execution costs API tokens and scheduler cycles.
|
||||
|
||||
## Dead Jobs Found (2026-04-14)
|
||||
|
||||
| Job | Schedule | Completions |
|
||||
|-----|----------|-------------|
|
||||
| exp-swarm-pipeline | every 10 min | 0 |
|
||||
| exp-music-generator | every 2h | 0 |
|
||||
| exp-paper-citations | every 3h | 0 |
|
||||
| exp-gbrain-patterns | every 2h | 0 |
|
||||
| exp-infra-hardening | every 2h | 0 |
|
||||
| gemma4-multimodal-burn | every 1h | 0 |
|
||||
| morning-paper-report | daily | 0 |
|
||||
| overnight-collector | every 15 min | 0 |
|
||||
| morning-experiment-report | daily | 0 |
|
||||
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
# Show dead jobs (zero completions)
|
||||
python3 scripts/cron_audit.py
|
||||
|
||||
# Show all jobs with status
|
||||
python3 scripts/cron_audit.py --all
|
||||
|
||||
# Find stale jobs (no runs in 7 days)
|
||||
python3 scripts/cron_audit.py --older-than 7
|
||||
|
||||
# Disable dead jobs (pause them)
|
||||
python3 scripts/cron_audit.py --disable
|
||||
|
||||
# Delete dead jobs permanently
|
||||
python3 scripts/cron_audit.py --delete
|
||||
|
||||
# JSON output for automation
|
||||
python3 scripts/cron_audit.py --json
|
||||
```
|
||||
|
||||
## Job Categories
|
||||
|
||||
- **Healthy**: enabled, has completions, ran recently
|
||||
- **Dead**: enabled, zero completions, exists for N+ days
|
||||
- **Stale**: enabled, no successful run in N days
|
||||
- **Disabled**: paused or completed
|
||||
279
scripts/cron_audit.py
Normal file
279
scripts/cron_audit.py
Normal file
@@ -0,0 +1,279 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Cron Audit & Cleanup — find and remove dead/stale cron jobs.
|
||||
|
||||
Identifies jobs that waste scheduler cycles:
|
||||
- Dead jobs: zero completions despite running for days
|
||||
- Stale jobs: no successful run in N days
|
||||
- Error jobs: high error ratio
|
||||
|
||||
Usage:
|
||||
# Show dead jobs (zero completions)
|
||||
python3 scripts/cron_audit.py
|
||||
|
||||
# Show stale jobs (no runs in 7 days)
|
||||
python3 scripts/cron_audit.py --older-than 7
|
||||
|
||||
# Show all jobs with status
|
||||
python3 scripts/cron_audit.py --all
|
||||
|
||||
# Disable dead jobs (sets enabled=False, state=paused)
|
||||
python3 scripts/cron_audit.py --disable
|
||||
|
||||
# Delete dead jobs permanently
|
||||
python3 scripts/cron_audit.py --delete
|
||||
|
||||
# Custom threshold: dead = 0 completions after N days
|
||||
python3 scripts/cron_audit.py --min-age 3
|
||||
|
||||
# JSON output
|
||||
python3 scripts/cron_audit.py --json
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
# hermes cron is a sibling module
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
|
||||
|
||||
try:
    from cron.jobs import (
        list_jobs,
        load_jobs,
        save_jobs,
        update_job,
        remove_job,
    )
except ImportError:
    # Fallback: operate on the jobs file directly when the cron package
    # is unavailable (e.g. running this script standalone).
    JOBS_FILE = Path.home() / ".hermes" / "cron" / "jobs.json"

    def load_jobs() -> list:
        """Read the job list from disk; empty list when the file is missing."""
        return json.loads(JOBS_FILE.read_text()) if JOBS_FILE.exists() else []

    def save_jobs(jobs: list):
        """Write the job list to disk, creating parent directories as needed."""
        JOBS_FILE.parent.mkdir(parents=True, exist_ok=True)
        JOBS_FILE.write_text(json.dumps(jobs, indent=2))

    def list_jobs(include_disabled=False):
        """Return all jobs, optionally filtering out disabled ones."""
        jobs = load_jobs()
        if include_disabled:
            return jobs
        return [j for j in jobs if j.get("enabled", True)]

    def update_job(job_id, updates):
        """Merge *updates* into the job with *job_id*; return it, or None if absent."""
        jobs = load_jobs()
        for job in jobs:
            if job["id"] != job_id:
                continue
            job.update(updates)
            save_jobs(jobs)
            return job
        return None

    def remove_job(job_id):
        """Delete the job with *job_id*; return True if something was removed."""
        jobs = load_jobs()
        remaining = [j for j in jobs if j["id"] != job_id]
        if len(remaining) == len(jobs):
            return False
        save_jobs(remaining)
        return True
|
||||
|
||||
|
||||
# ── Analysis ──────────────────────────────────────────────────────────────
|
||||
|
||||
def parse_schedule_display(job: dict) -> str:
    """Get human-readable schedule from job."""
    schedule = job.get("schedule", {})
    if not isinstance(schedule, dict):
        return str(schedule)
    # Prefer the embedded display string; fall back to the legacy
    # top-level field, then to a placeholder.
    return schedule.get("display", job.get("schedule_display", "?"))
|
||||
|
||||
|
||||
def get_last_run_age(job: dict) -> Optional[timedelta]:
    """Return the time elapsed since the job's last run, or None if unknown."""
    raw = job.get("last_run_at")
    # Only ISO-8601 strings are understood; anything else means "unknown".
    if not raw or not isinstance(raw, str):
        return None
    try:
        last_dt = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        # Subtraction stays inside the try: a naive timestamp would raise
        # TypeError against the aware "now", and that also means "unknown".
        return datetime.now(timezone.utc) - last_dt
    except (ValueError, TypeError):
        return None
|
||||
|
||||
|
||||
def analyze_jobs(min_age_days: int = 0, stale_days: int = 0) -> Dict[str, List[dict]]:
    """Analyze all jobs and categorize them.

    Args:
        min_age_days: a job must be at least this many days old before
            zero completions marks it dead.
        stale_days: when > 0, jobs whose last run is at least this many
            days old are flagged stale.

    Returns:
        Dict with keys: dead, stale, healthy, disabled, completed.
        NOTE(review): the "completed" bucket is never populated here —
        jobs with state == "completed" land in "disabled".
    """
    now = datetime.now(timezone.utc)
    buckets: Dict[str, List[dict]] = {
        "dead": [], "stale": [], "healthy": [], "disabled": [], "completed": []
    }

    for job in list_jobs(include_disabled=True):
        job_id = job.get("id", "?")
        enabled = job.get("enabled", True)
        state = job.get("state", "scheduled")
        completions = job.get("repeat", {}).get("completed", 0)
        created = job.get("created_at", "")

        # Age since creation in days; unparsable timestamps count as age 0.
        age_days = 0
        if created:
            try:
                created_dt = datetime.fromisoformat(created.replace("Z", "+00:00"))
                age_days = (now - created_dt).days
            except (ValueError, TypeError):
                pass

        last_age = get_last_run_age(job)

        entry = {
            "id": job_id,
            "name": job.get("name", job_id),
            "schedule": parse_schedule_display(job),
            "enabled": enabled,
            "state": state,
            "completed": completions,
            "last_run_at": job.get("last_run_at"),
            "last_status": job.get("last_status", "never"),
            "last_error": job.get("last_error"),
            "age_days": age_days,
            "last_run_age_days": last_age.days if last_age else None,
        }

        # Categorize; first matching rule wins, so disabled/completed jobs
        # are never reported as dead or stale.
        if not enabled or state == "completed":
            buckets["disabled"].append(entry)
        elif completions == 0 and age_days >= min_age_days:
            buckets["dead"].append(entry)
        elif stale_days > 0 and last_age and last_age.days >= stale_days:
            buckets["stale"].append(entry)
        else:
            buckets["healthy"].append(entry)

    return buckets
|
||||
|
||||
|
||||
# ── Actions ───────────────────────────────────────────────────────────────
|
||||
|
||||
def disable_jobs(jobs: List[dict]) -> int:
    """Disable dead/stale jobs (pause them); return how many were disabled."""
    disabled = 0
    for job in jobs:
        # update_job returns None when the id no longer exists.
        if update_job(job["id"], {"enabled": False, "state": "paused"}):
            disabled += 1
            print(f" DISABLED: {job['name']} ({job['schedule']})")
    return disabled
|
||||
|
||||
|
||||
def delete_jobs(jobs: List[dict]) -> int:
    """Permanently delete the given jobs; return how many were removed."""
    deleted = 0
    for job in jobs:
        if not remove_job(job["id"]):
            continue
        deleted += 1
        print(f" DELETED: {job['name']} ({job['schedule']})")
    return deleted
|
||||
|
||||
|
||||
# ── Report ────────────────────────────────────────────────────────────────
|
||||
|
||||
def print_table(jobs: List[dict], title: str):
    """Print a formatted table of *jobs* under *title*; no-op for an empty list.

    Fix: the section-header f-strings contained a raw line break inside the
    string literal (a syntax error); they now use an explicit "\n" escape.
    """
    if not jobs:
        return
    # Blank line separates sections in the report.
    print(f"\n{title} ({len(jobs)}):")
    print(f" {'Name':<35} {'Schedule':<15} {'Completed':<10} {'Last Run':<15} {'Status'}")
    print(f" {'-'*35} {'-'*15} {'-'*10} {'-'*15} {'-'*10}")
    for j in jobs:
        # Prefer the relative age; fall back to the raw date, then "never".
        last_run = "never"
        if j["last_run_age_days"] is not None:
            last_run = f"{j['last_run_age_days']}d ago"
        elif j["last_run_at"]:
            last_run = j["last_run_at"][:10]
        status = j["last_status"] or "never"
        print(f" {j['name']:<35} {j['schedule']:<15} {j['completed']:<10} {last_run:<15} {status}")
|
||||
|
||||
|
||||
# ── CLI ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
    """CLI entry point: analyze cron jobs, report, and optionally clean up.

    Fix: several f-strings contained raw line breaks inside the literal
    (syntax errors as committed); they now use explicit "\n" escapes.
    """
    parser = argparse.ArgumentParser(description="Cron audit & cleanup")
    parser.add_argument("--all", action="store_true", help="Show all jobs including healthy")
    parser.add_argument("--older-than", type=int, default=0,
                        help="Stale threshold in days (jobs with no runs in N days)")
    parser.add_argument("--min-age", type=int, default=0,
                        help="Minimum job age in days to be considered dead")
    parser.add_argument("--disable", action="store_true", help="Disable dead jobs")
    parser.add_argument("--delete", action="store_true", help="Delete dead jobs permanently")
    parser.add_argument("--json", dest="json_output", action="store_true", help="JSON output")
    args = parser.parse_args()

    analysis = analyze_jobs(min_age_days=args.min_age, stale_days=args.older_than)

    # Machine-readable mode: dump and exit.
    if args.json_output:
        print(json.dumps(analysis, indent=2))
        return

    # Summary counts per category.
    total = sum(len(v) for v in analysis.values())
    print(f"Cron Audit — {total} total jobs")
    print(f" Healthy: {len(analysis['healthy'])}")
    print(f" Dead: {len(analysis['dead'])}")
    print(f" Stale: {len(analysis['stale'])}")
    print(f" Disabled: {len(analysis['disabled'])}")
    print(f" Completed: {len(analysis['completed'])}")

    if args.all:
        print_table(analysis["healthy"], "HEALTHY")
    print_table(analysis["dead"], "DEAD (zero completions)")
    print_table(analysis["stale"], "STALE (no recent runs)")

    # Report-only mode: hint at the cleanup flags and stop.
    if not args.disable and not args.delete:
        if analysis["dead"] or analysis["stale"]:
            print("\nTo clean up: --disable (pause) or --delete (permanent)")
        return

    targets = analysis["dead"] + analysis["stale"]
    if not targets:
        print("\nNothing to clean up.")
        return

    if args.delete:
        # Destructive path requires interactive confirmation.
        confirm = input(f"\nDelete {len(targets)} jobs permanently? [y/N] ")
        if confirm.lower() != "y":
            print("Aborted.")
            return
        count = delete_jobs(targets)
        print(f"\nDeleted {count} jobs.")
    elif args.disable:
        count = disable_jobs(targets)
        print(f"\nDisabled {count} jobs.")


if __name__ == "__main__":
    main()
|
||||
143
tests/test_cron_audit.py
Normal file
143
tests/test_cron_audit.py
Normal file
@@ -0,0 +1,143 @@
|
||||
"""Tests for cron audit script."""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "scripts"))
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test data
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_job(name, job_id=None, completed=0, enabled=True, state="scheduled",
|
||||
last_run=None, last_status=None, created=None, schedule="every 1h"):
|
||||
return {
|
||||
"id": job_id or f"job-{name}",
|
||||
"name": name,
|
||||
"enabled": enabled,
|
||||
"state": state,
|
||||
"schedule": {"display": schedule} if isinstance(schedule, str) else schedule,
|
||||
"repeat": {"completed": completed},
|
||||
"last_run_at": last_run,
|
||||
"last_status": last_status,
|
||||
"created_at": created or datetime.now(timezone.utc).isoformat(),
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# analyze_jobs
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestAnalyzeJobs:
    # Categorization behavior of analyze_jobs; list_jobs is mocked so no
    # jobs file is touched.

    @patch("cron_audit.list_jobs")
    def test_dead_job_detected(self, mock_list):
        """Job with 0 completions and old creation date is dead."""
        old_date = (datetime.now(timezone.utc) - timedelta(days=5)).isoformat()
        mock_list.return_value = [
            _make_job("dead-job", completed=0, created=old_date),
        ]
        from cron_audit import analyze_jobs
        result = analyze_jobs(min_age_days=1)
        assert len(result["dead"]) == 1
        assert result["dead"][0]["name"] == "dead-job"

    @patch("cron_audit.list_jobs")
    def test_healthy_job_not_dead(self, mock_list):
        """Job with completions is healthy."""
        mock_list.return_value = [
            _make_job("good-job", completed=42, last_run=datetime.now(timezone.utc).isoformat()),
        ]
        from cron_audit import analyze_jobs
        result = analyze_jobs()
        assert len(result["dead"]) == 0
        assert len(result["healthy"]) == 1

    @patch("cron_audit.list_jobs")
    def test_disabled_job_categorized(self, mock_list):
        """Disabled job goes to disabled category."""
        mock_list.return_value = [
            _make_job("paused-job", enabled=False, state="paused"),
        ]
        from cron_audit import analyze_jobs
        result = analyze_jobs()
        assert len(result["disabled"]) == 1

    @patch("cron_audit.list_jobs")
    def test_stale_job_detected(self, mock_list):
        """Job with last run > N days ago is stale."""
        old_run = (datetime.now(timezone.utc) - timedelta(days=10)).isoformat()
        mock_list.return_value = [
            _make_job("stale-job", completed=5, last_run=old_run),
        ]
        from cron_audit import analyze_jobs
        result = analyze_jobs(stale_days=7)
        assert len(result["stale"]) == 1

    @patch("cron_audit.list_jobs")
    def test_completed_job_disabled(self, mock_list):
        """Job with state=completed goes to disabled."""
        # Note: analyze_jobs files completed jobs under "disabled", not
        # under its (empty) "completed" bucket.
        mock_list.return_value = [
            _make_job("done-job", completed=100, state="completed", enabled=False),
        ]
        from cron_audit import analyze_jobs
        result = analyze_jobs()
        assert len(result["disabled"]) == 1

    @patch("cron_audit.list_jobs")
    def test_empty_jobs(self, mock_list):
        """No jobs returns empty categories."""
        mock_list.return_value = []
        from cron_audit import analyze_jobs
        result = analyze_jobs()
        for v in result.values():
            assert len(v) == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# get_last_run_age
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestGetLastRunAge:
    # get_last_run_age for present, absent, and missing timestamps.

    def test_returns_timedelta(self):
        # A recent ISO timestamp yields a small positive timedelta.
        from cron_audit import get_last_run_age
        recent = (datetime.now(timezone.utc) - timedelta(hours=2)).isoformat()
        age = get_last_run_age({"last_run_at": recent})
        assert age is not None
        assert age.days == 0
        assert age.seconds >= 7000  # ~2 hours

    def test_returns_none_for_no_run(self):
        # Both an explicit None and a missing key mean "never ran".
        from cron_audit import get_last_run_age
        assert get_last_run_age({"last_run_at": None}) is None
        assert get_last_run_age({}) is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# disable_jobs / delete_jobs
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestJobActions:
    # disable_jobs/delete_jobs delegate to update_job/remove_job; both are
    # mocked so no jobs file is touched.

    @patch("cron_audit.update_job")
    def test_disable_calls_update(self, mock_update):
        mock_update.return_value = {"id": "x"}
        from cron_audit import disable_jobs
        jobs = [{"id": "j1", "name": "test", "schedule": "1h"}]
        count = disable_jobs(jobs)
        assert count == 1
        mock_update.assert_called_once_with("j1", {"enabled": False, "state": "paused"})

    @patch("cron_audit.remove_job")
    def test_delete_calls_remove(self, mock_remove):
        mock_remove.return_value = True
        from cron_audit import delete_jobs
        jobs = [{"id": "j1", "name": "test", "schedule": "1h"}]
        count = delete_jobs(jobs)
        assert count == 1
        mock_remove.assert_called_once_with("j1")
|
||||
@@ -1,67 +0,0 @@
|
||||
"""
|
||||
Tests for tool hallucination detection (#922).
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from tools.tool_validator import ToolHallucinationDetector, ValidationSeverity
|
||||
|
||||
|
||||
class TestToolHallucinationDetector:
    # Exercises the validation firewall with a single registered tool
    # ("read_file" with required "path" and optional "encoding").

    def setup_method(self):
        self.detector = ToolHallucinationDetector()
        self.detector.register_tool("read_file", {
            "parameters": {
                "type": "object",
                "properties": {
                    "path": {"type": "string"},
                    "encoding": {"type": "string"},
                },
                "required": ["path"]
            }
        })

    def test_valid_tool_call(self):
        # Known tool, required param present, correct type: passes cleanly.
        result = self.detector.validate_tool_call("read_file", {"path": "/tmp/file.txt"})
        assert result.valid is True
        assert len(result.blocking_issues) == 0

    def test_unknown_tool(self):
        # Hallucinated tool name must be blocked.
        result = self.detector.validate_tool_call("hallucinated_tool", {})
        assert result.valid is False
        assert any(i.code == "UNKNOWN_TOOL" for i in result.issues)

    def test_missing_required_param(self):
        result = self.detector.validate_tool_call("read_file", {})
        assert result.valid is False
        assert any(i.code == "MISSING_REQUIRED" for i in result.issues)

    def test_wrong_type(self):
        # "path" is declared as string; an int must be blocked.
        result = self.detector.validate_tool_call("read_file", {"path": 123})
        assert result.valid is False
        assert any(i.code == "WRONG_TYPE" for i in result.issues)

    def test_unknown_param_warning(self):
        result = self.detector.validate_tool_call("read_file", {"path": "/tmp/file.txt", "unknown": "value"})
        assert result.valid is True  # Warning, not blocking
        assert any(i.code == "UNKNOWN_PARAM" for i in result.issues)

    def test_placeholder_detection(self):
        # "<placeholder>"-style values are flagged (non-blocking).
        result = self.detector.validate_tool_call("read_file", {"path": "<placeholder>"})
        assert any(i.code == "PLACEHOLDER_VALUE" for i in result.issues)

    def test_rejection_stats(self):
        # Two blocked calls show up in the aggregate rejection stats.
        self.detector.validate_tool_call("unknown_tool", {})
        self.detector.validate_tool_call("read_file", {})
        stats = self.detector.get_rejection_stats()
        assert stats["total"] >= 2

    def test_rejection_response(self):
        from tools.tool_validator import create_rejection_response
        result = self.detector.validate_tool_call("unknown_tool", {})
        response = create_rejection_response(result)
        assert response["role"] == "tool"
        assert "rejected" in response["content"].lower()
||||
|
||||
if __name__ == "__main__":
    # Allow running this test module directly without the pytest CLI.
    pytest.main([__file__])
|
||||
@@ -44,34 +44,6 @@ from typing import Dict, Any, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _format_error(
|
||||
message: str,
|
||||
skill_name: str = None,
|
||||
file_path: str = None,
|
||||
suggestion: str = None,
|
||||
context: dict = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Format an error with rich context for better debugging."""
|
||||
parts = [message]
|
||||
if skill_name:
|
||||
parts.append(f"Skill: {skill_name}")
|
||||
if file_path:
|
||||
parts.append(f"File: {file_path}")
|
||||
if suggestion:
|
||||
parts.append(f"Suggestion: {suggestion}")
|
||||
if context:
|
||||
for key, value in context.items():
|
||||
parts.append(f"{key}: {value}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": " | ".join(parts),
|
||||
"skill_name": skill_name,
|
||||
"file_path": file_path,
|
||||
"suggestion": suggestion,
|
||||
}
|
||||
|
||||
|
||||
# Import security scanner — agent-created skills get the same scrutiny as
|
||||
# community hub installs.
|
||||
try:
|
||||
|
||||
@@ -1,312 +0,0 @@
|
||||
"""
|
||||
Poka-Yoke: Tool Hallucination Detection — #922.
|
||||
|
||||
Validation firewall between LLM tool-call output and actual execution.
|
||||
|
||||
Detects and blocks:
|
||||
1. Unknown tool names (hallucinated tools)
|
||||
2. Malformed parameters (wrong types)
|
||||
3. Missing required arguments
|
||||
4. Extra unknown parameters
|
||||
|
||||
Poka-Yoke Type: Detection (catches errors at the boundary before harm)
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional, Set
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ValidationSeverity(Enum):
    """Severity of validation failure."""
    BLOCK = "block"  # Must block execution
    WARN = "warn"    # Warning, may proceed
    INFO = "info"    # Informational
||||
|
||||
|
||||
@dataclass
class ValidationIssue:
    """A single validation issue found in a tool call."""
    severity: ValidationSeverity  # BLOCK / WARN / INFO
    code: str                     # machine-readable code, e.g. "UNKNOWN_TOOL"
    message: str                  # human-readable explanation
    tool_name: str                # tool the issue applies to
    parameter: Optional[str] = None  # offending parameter name, if any
    expected: Optional[str] = None   # expected JSON Schema type, for type issues
    actual: Optional[Any] = None     # actual value's type name, for type issues
||||
|
||||
|
||||
@dataclass
class ValidationResult:
    """Result of tool call validation."""
    valid: bool  # False when any blocking issue was found
    tool_name: str
    issues: List[ValidationIssue] = field(default_factory=list)
    # NOTE(review): corrected_args is never set in this module — presumably
    # reserved for an auto-correction pass; confirm before relying on it.
    corrected_args: Optional[Dict[str, Any]] = None

    @property
    def blocking_issues(self) -> List[ValidationIssue]:
        """Issues with severity BLOCK (these made the call invalid)."""
        return [i for i in self.issues if i.severity == ValidationSeverity.BLOCK]

    @property
    def warnings(self) -> List[ValidationIssue]:
        """Issues with severity WARN (the call may still proceed)."""
        return [i for i in self.issues if i.severity == ValidationSeverity.WARN]
||||
|
||||
|
||||
class ToolHallucinationDetector:
    """
    Poka-yoke detector for tool hallucinations.

    Validates tool calls against registered schemas before execution.
    Blocking issues (unknown tool, missing required args, wrong types)
    mark the call invalid; softer signals (unknown params, placeholder
    values, oversized strings) are recorded as warnings.
    """

    def __init__(self, tool_registry: Optional[Dict] = None):
        """
        Initialize detector.

        Args:
            tool_registry: Dict of tool_name -> tool_schema (JSON Schema
                under the schema's "parameters" key).
        """
        self.registry = tool_registry or {}
        # Bounded in-memory log of rejected calls, for get_rejection_stats().
        self._rejection_log: List[Dict] = []

    def register_tool(self, name: str, schema: Dict):
        """Register a tool with its JSON Schema."""
        self.registry[name] = schema

    def register_tools(self, tools: Dict[str, Dict]):
        """Register multiple tools."""
        self.registry.update(tools)

    def validate_tool_call(
        self,
        tool_name: str,
        arguments: Dict[str, Any],
        model: str = "unknown",
    ) -> ValidationResult:
        """
        Validate a tool call against the registry.

        Args:
            tool_name: Name of the tool being called
            arguments: Arguments passed to the tool
            model: Model that generated the call (for logging)

        Returns:
            ValidationResult with validation status
        """
        issues = []

        # 1. Unknown tool name is an immediate block — nothing else to check.
        if tool_name not in self.registry:
            issues.append(ValidationIssue(
                severity=ValidationSeverity.BLOCK,
                code="UNKNOWN_TOOL",
                message=f"Tool '{tool_name}' does not exist. Available: {', '.join(sorted(self.registry.keys())[:10])}...",
                tool_name=tool_name,
            ))
            self._log_rejection(tool_name, arguments, model, "UNKNOWN_TOOL")
            return ValidationResult(valid=False, tool_name=tool_name, issues=issues)

        schema = self.registry[tool_name]
        params_schema = schema.get("parameters", {}).get("properties", {})
        required = set(schema.get("parameters", {}).get("required", []))

        # 2. Missing required parameters (blocking).
        for param in required:
            if param not in arguments:
                issues.append(ValidationIssue(
                    severity=ValidationSeverity.BLOCK,
                    code="MISSING_REQUIRED",
                    message=f"Missing required parameter: {param}",
                    tool_name=tool_name,
                    parameter=param,
                ))

        # 3. Parameter types: unknown params warn, wrong types block.
        for param_name, param_value in arguments.items():
            if param_name not in params_schema:
                issues.append(ValidationIssue(
                    severity=ValidationSeverity.WARN,
                    code="UNKNOWN_PARAM",
                    message=f"Unknown parameter: {param_name}",
                    tool_name=tool_name,
                    parameter=param_name,
                ))
                continue

            expected_type = params_schema[param_name].get("type")
            if expected_type and not self._check_type(param_value, expected_type):
                issues.append(ValidationIssue(
                    severity=ValidationSeverity.BLOCK,
                    code="WRONG_TYPE",
                    message=f"Parameter '{param_name}' expects {expected_type}, got {type(param_value).__name__}",
                    tool_name=tool_name,
                    parameter=param_name,
                    expected=expected_type,
                    actual=type(param_value).__name__,
                ))

        # 4. Heuristic hallucination patterns (warnings only).
        issues.extend(self._detect_hallucination_patterns(tool_name, arguments))

        has_blocking = any(i.severity == ValidationSeverity.BLOCK for i in issues)
        if has_blocking:
            self._log_rejection(
                tool_name, arguments, model,
                "; ".join(i.code for i in issues if i.severity == ValidationSeverity.BLOCK),
            )

        return ValidationResult(
            valid=not has_blocking,
            tool_name=tool_name,
            issues=issues,
        )

    def _check_type(self, value: Any, expected_type: str) -> bool:
        """Check if value matches the expected JSON Schema type.

        Fix: bool is a subclass of int in Python, so True/False previously
        passed as "integer"/"number". JSON Schema treats booleans as a
        distinct type, so they are now rejected for numeric schemas.
        """
        if isinstance(value, bool) and expected_type in ("integer", "number"):
            return False

        type_map = {
            "string": str,
            "number": (int, float),
            "integer": int,
            "boolean": bool,
            "array": list,
            "object": dict,
        }
        expected = type_map.get(expected_type)
        if expected is None:
            return True  # Unknown type, assume OK
        return isinstance(value, expected)

    def _detect_hallucination_patterns(self, tool_name: str, arguments: Dict) -> List[ValidationIssue]:
        """Detect common hallucination patterns; all findings are warnings."""
        issues = []

        # Pattern 1: Placeholder values the model forgot to fill in.
        placeholder_patterns = [
            r"^<.*>$",             # <placeholder>
            r"^\[.*\]$",           # [placeholder]
            r"^TODO$|^FIXME$",     # TODO/FIXME
            r"^example\.com$",     # example.com
            r"^127\.0\.0\.1$",     # localhost
        ]

        for param_name, param_value in arguments.items():
            if isinstance(param_value, str):
                for pattern in placeholder_patterns:
                    if re.match(pattern, param_value, re.IGNORECASE):
                        issues.append(ValidationIssue(
                            severity=ValidationSeverity.WARN,
                            code="PLACEHOLDER_VALUE",
                            message=f"Parameter '{param_name}' contains placeholder: {param_value}",
                            tool_name=tool_name,
                            parameter=param_name,
                        ))

        # Pattern 2: Suspiciously long strings (might be hallucinated content).
        for param_name, param_value in arguments.items():
            if isinstance(param_value, str) and len(param_value) > 10000:
                issues.append(ValidationIssue(
                    severity=ValidationSeverity.WARN,
                    code="SUSPICIOUS_LENGTH",
                    message=f"Parameter '{param_name}' is unusually long ({len(param_value)} chars)",
                    tool_name=tool_name,
                    parameter=param_name,
                ))

        return issues

    def _log_rejection(self, tool_name: str, arguments: Dict, model: str, reason: str):
        """Log a rejected tool call for later analysis."""
        import time

        entry = {
            "timestamp": time.time(),
            # Truncate argument values so the log stays small.
            "arguments": {k: str(v)[:100] for k, v in arguments.items()},
            "tool_name": tool_name,
            "model": model,
            "reason": reason,
        }
        self._rejection_log.append(entry)

        # Keep log bounded.
        if len(self._rejection_log) > 1000:
            self._rejection_log = self._rejection_log[-500:]

        logger.warning(
            "Tool hallucination blocked: tool=%s, model=%s, reason=%s",
            tool_name, model, reason
        )

    def get_rejection_stats(self) -> Dict:
        """Get statistics on rejected tool calls (totals by reason and tool)."""
        if not self._rejection_log:
            return {"total": 0, "by_reason": {}, "by_tool": {}}

        by_reason = {}
        by_tool = {}
        for entry in self._rejection_log:
            reason = entry["reason"]
            tool = entry["tool_name"]
            by_reason[reason] = by_reason.get(reason, 0) + 1
            by_tool[tool] = by_tool.get(tool, 0) + 1

        return {
            "total": len(self._rejection_log),
            "by_reason": by_reason,
            "by_tool": by_tool,
        }

    def format_validation_report(self, result: ValidationResult) -> str:
        """Format validation result as a human-readable report."""
        if result.valid:
            return f"✅ {result.tool_name}: valid"

        lines = [f"❌ {result.tool_name}: BLOCKED"]
        for issue in result.blocking_issues:
            lines.append(f" [{issue.code}] {issue.message}")
        for issue in result.warnings:
            lines.append(f" ⚠️ [{issue.code}] {issue.message}")
        return "\n".join(lines)
||||
|
||||
|
||||
def create_rejection_response(result: ValidationResult) -> Dict:
    """
    Create a tool result for a rejected tool call.

    This allows the agent to see the rejection and self-correct.
    """
    # One bullet per blocking issue.
    issues_text = "\n".join(f"- [{i.code}] {i.message}" for i in result.blocking_issues)
    content = f"""Tool call rejected: {result.tool_name}

Issues found:
{issues_text}

Please check the tool name and parameters, then try again with valid arguments."""
    return {"role": "tool", "content": content}
|
||||
Reference in New Issue
Block a user