Compare commits

..

3 Commits

| SHA1 | Message | Date |
|------|---------|------|
| c8186b1d11 | test: add cron audit tests (#902) | 2026-04-17 05:13:10 +00:00 |
| db69ef8e3f | docs: cron audit documentation (#902) | 2026-04-17 05:13:09 +00:00 |
| 64dd3fda01 | feat: add cron audit and cleanup script (#902) | 2026-04-17 05:11:40 +00:00 |

Checks on c8186b1d11 (some failed):
- Contributor Attribution Check / check-attribution (pull_request): failing after 16s
- Docker Build and Publish / build-and-push (pull_request): skipped
- Supply Chain Audit / Scan PR for supply chain risks (pull_request): successful in 18s
- Tests / test (pull_request): failing after 18m39s
- Tests / e2e (pull_request): successful in 1m30s
5 changed files with 472 additions and 96 deletions

docs/cron-audit-890.md (new file, +50 lines)

@@ -0,0 +1,50 @@
# Cron Audit & Cleanup
Identifies and removes dead/stale cron jobs that waste scheduler cycles.
## Problem
Of 69 cron jobs, 9 had zero completions — running on schedule but never producing results. Each execution costs API tokens and scheduler cycles.
## Dead Jobs Found (2026-04-14)
| Job | Schedule | Completions |
|-----|----------|-------------|
| exp-swarm-pipeline | every 10 min | 0 |
| exp-music-generator | every 2h | 0 |
| exp-paper-citations | every 3h | 0 |
| exp-gbrain-patterns | every 2h | 0 |
| exp-infra-hardening | every 2h | 0 |
| gemma4-multimodal-burn | every 1h | 0 |
| morning-paper-report | daily | 0 |
| overnight-collector | every 15 min | 0 |
| morning-experiment-report | daily | 0 |
## Usage
```bash
# Show dead jobs (zero completions)
python3 scripts/cron_audit.py
# Show all jobs with status
python3 scripts/cron_audit.py --all
# Find stale jobs (no runs in 7 days)
python3 scripts/cron_audit.py --older-than 7
# Disable dead jobs (pause them)
python3 scripts/cron_audit.py --disable
# Delete dead jobs permanently
python3 scripts/cron_audit.py --delete
# JSON output for automation
python3 scripts/cron_audit.py --json
```
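For automation, the `--json` report is a single dict keyed by category. A minimal consumer sketch (report shape taken from `analyze_jobs` in `scripts/cron_audit.py`; the alerting policy here is illustrative):
```python
import json
import subprocess

# Run the audit and capture its JSON report.
out = subprocess.run(
    ["python3", "scripts/cron_audit.py", "--json"],
    capture_output=True, text=True, check=True,
)
report = json.loads(out.stdout)  # keys: dead, stale, healthy, disabled, completed

# Illustrative policy: list dead jobs for human review.
dead = [job["name"] for job in report["dead"]]
if dead:
    print(f"{len(dead)} dead cron jobs: {', '.join(dead)}")
```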
## Job Categories
- **Healthy**: enabled, has completions, ran recently
- **Dead**: enabled, zero completions, has existed for N+ days
- **Stale**: enabled, no successful run in N days
- **Disabled**: paused or completed
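The same rules, distilled into a standalone sketch (decision order mirrors `analyze_jobs` in `scripts/cron_audit.py`; `min_age_days` corresponds to `--min-age`, `stale_days` to `--older-than`):
```python
from datetime import datetime, timezone

def categorize(job: dict, min_age_days: int = 0, stale_days: int = 0) -> str:
    """Mirror of analyze_jobs' ordering: disabled wins, then dead,
    then stale, else healthy. Assumes created_at is present (the
    script itself tolerates its absence)."""
    now = datetime.now(timezone.utc)
    if not job.get("enabled", True) or job.get("state") == "completed":
        return "disabled"
    created = datetime.fromisoformat(job["created_at"].replace("Z", "+00:00"))
    completed = (job.get("repeat") or {}).get("completed", 0)
    if completed == 0 and (now - created).days >= min_age_days:
        return "dead"
    last = job.get("last_run_at")
    if stale_days > 0 and last:
        last_dt = datetime.fromisoformat(last.replace("Z", "+00:00"))
        if (now - last_dt).days >= stale_days:
            return "stale"
    return "healthy"
```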

(deleted file, 68 lines)

@@ -1,68 +0,0 @@
# Tool Investigation Report: Top 5 Recommendations from awesome-ai-tools
**Generated:** 2026-04-20 | **Source:** [formatho/awesome-ai-tools](https://github.com/formatho/awesome-ai-tools)
---
## Methodology
Scanned 795 tools across 10 categories from the awesome-ai-tools repository. Evaluated each tool against Hermes Agent's architecture and needs:
- **Memory/Context**: Persistent memory, conversation history, knowledge graphs
- **Inference Optimization**: Token efficiency, local model serving, routing
- **Agent Orchestration**: Multi-agent coordination, fleet management
- **Workflow Automation**: Task decomposition, scheduling, pipelines
- **Retrieval/RAG**: Semantic search, document understanding, context injection
Each tool scored on: GitHub stars, development activity (freshness), integration potential, and impact on Hermes.
---
## Top 5 Recommended Tools
| Rank | Tool | Stars | Category | Integration Effort | Impact | Why It Fits Hermes |
|------|------|-------|----------|-------------------|--------|---------------------|
| 1 | **[LiteLLM](https://github.com/BerriAI/litellm)** | 76k+ | Inference Optimization | 2/5 | 5/5 | Unified API gateway for 100+ LLM providers with cost tracking, guardrails, load balancing, and logging. Hermes already routes through multiple providers — LiteLLM could replace custom provider routing with battle-tested load balancing and automatic fallback. Direct drop-in for `provider` abstraction layer. Native support for Bedrock, Azure, OpenAI, VertexAI, Anthropic, Ollama, vLLM. Would reduce Hermes's provider management code by ~60%. |
| 2 | **[Mem0](https://github.com/mem0ai/mem0)** | 53k+ | Memory/Context | 3/5 | 5/5 | Universal memory layer for AI agents with persistent, searchable memory across sessions. Hermes has session memory but lacks a structured long-term memory system. Mem0 provides automatic memory extraction from conversations, semantic search over memories, and memory decay/pruning. Could replace/enhance the current memory tool with a purpose-built agent memory infrastructure. Supports Pinecone, Qdrant, ChromaDB backends. |
| 3 | **[RAGFlow](https://github.com/infiniflow/ragflow)** | 77k+ | Retrieval/RAG | 4/5 | 4/5 | Open-source RAG engine with deep document understanding, OCR, and agent capabilities. Hermes's current retrieval is limited to web search and file reading. RAGFlow adds visual document parsing (PDF/Word/PPT with tables, charts, formulas), chunk-level citation, and configurable retrieval strategies. Would massively upgrade Hermes's document processing capabilities. Docker-deployable, compatible with local models. |
| 4 | **[LiteRT-LM](https://github.com/google-ai-edge/LiteRT-LM)** | 3.7k | Inference Optimization | 3/5 | 4/5 | C++ implementation of Google's LiteRT for efficient on-device language model inference. Hermes supports local models via Ollama but lacks optimized on-device inference for edge/mobile. LiteRT-LM provides sub-second inference on commodity hardware with minimal memory footprint. Could power a "Hermes lite" mode for offline/edge deployments. Active development (Fresh status), backed by Google AI Edge team. |
| 5 | **[Claude-Mem](https://github.com/thedotmack/claude-mem)** | 61k+ | Memory/Context | 2/5 | 3/5 | Automatic session capture and context injection for coding agents. Compresses session history with AI and injects relevant context into future sessions. Pattern directly applicable to Hermes's cross-session persistence problem. Uses agent SDK for intelligent compression — could enhance Hermes's session_search with automatic relevance-weighted recall. Lightweight integration, focused on the exact pain point of context loss between sessions. |
---
## Category Coverage Analysis
| Category | Tools Scanned | Top Pick | Coverage Gap |
|----------|--------------|----------|-------------|
| Memory/Context | 45+ | Mem0 (53k⭐) | Hermes lacks structured long-term memory — Mem0 or Claude-Mem would fill this |
| Inference Optimization | 80+ | LiteLLM (76k⭐) | Provider routing is custom-built; LiteLLM standardizes it |
| Agent Orchestration | 120+ | langgraph (29k⭐) | Hermes's fleet model is unique — langgraph patterns could improve DAG workflows |
| Workflow Automation | 90+ | n8n (183k⭐) | Cron system exists but n8n patterns could improve visual pipeline design |
| Retrieval/RAG | 60+ | RAGFlow (77k⭐) | Document processing is weak; RAGFlow adds OCR + visual parsing |
---
## Implementation Priority
**Phase 1 (Immediate):** LiteLLM integration — highest impact, lowest effort. Replace custom provider routing with LiteLLM's unified API. Estimated: 2-3 days.
**Phase 2 (Short-term):** Mem0 memory layer — critical for agent maturity. Add structured memory extraction and retrieval. Estimated: 1 week.
**Phase 3 (Medium-term):** RAGFlow document engine — significant capability upgrade. Requires Docker setup and integration with existing file tools. Estimated: 1-2 weeks.
---
## Honorable Mentions
- **[GPTCache](https://github.com/zilliztech/GPTCache)** (8k⭐): Semantic cache for LLMs — could reduce API costs by 30-50% for repeated queries
- **[promptfoo](https://github.com/promptfoo/promptfoo)** (20k⭐): LLM testing/evaluation framework — essential for quality assurance
- **[PageIndex](https://github.com/VectifyAI/PageIndex)** (25k⭐): Vectorless reasoning-based RAG — next-gen retrieval without embeddings
- **[rtk](https://github.com/rtk-ai/rtk)** (28k⭐): CLI proxy that reduces token consumption 60-90% — directly relevant to cost optimization
---
## Data Sources
- Repository: https://github.com/formatho/awesome-ai-tools
- Total tools cataloged: 795
- Categories analyzed: Agents & Automation, Developer Tools, LLMs & Chatbots, Research & Data, Productivity
- Freshness filter: Prioritized tools with Fresh (≤7d) or Recent (≤30d) status

scripts/cron_audit.py (new file, +279 lines)

@@ -0,0 +1,279 @@
#!/usr/bin/env python3
"""
Cron Audit & Cleanup — find and remove dead/stale cron jobs.
Identifies jobs that waste scheduler cycles:
- Dead jobs: zero completions despite running for days
- Stale jobs: no successful run in N days
- Error jobs: high error ratio
Usage:
# Show dead jobs (zero completions)
python3 scripts/cron_audit.py
# Show stale jobs (no runs in 7 days)
python3 scripts/cron_audit.py --older-than 7
# Show all jobs with status
python3 scripts/cron_audit.py --all
# Disable dead jobs (sets enabled=False, state=paused)
python3 scripts/cron_audit.py --disable
# Delete dead jobs permanently
python3 scripts/cron_audit.py --delete
# Custom threshold: dead = 0 completions after N days
python3 scripts/cron_audit.py --min-age 3
# JSON output
python3 scripts/cron_audit.py --json
"""
import argparse
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
# hermes cron is a sibling module
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
try:
from cron.jobs import (
list_jobs,
load_jobs,
save_jobs,
update_job,
remove_job,
)
except ImportError:
# Fallback: direct file access
JOBS_FILE = Path.home() / ".hermes" / "cron" / "jobs.json"
def load_jobs() -> list:
if JOBS_FILE.exists():
return json.loads(JOBS_FILE.read_text())
return []
def save_jobs(jobs: list):
JOBS_FILE.parent.mkdir(parents=True, exist_ok=True)
JOBS_FILE.write_text(json.dumps(jobs, indent=2))
def list_jobs(include_disabled=False):
jobs = load_jobs()
if not include_disabled:
jobs = [j for j in jobs if j.get("enabled", True)]
return jobs
def update_job(job_id, updates):
jobs = load_jobs()
for job in jobs:
if job["id"] == job_id:
job.update(updates)
save_jobs(jobs)
return job
return None
def remove_job(job_id):
jobs = load_jobs()
original = len(jobs)
jobs = [j for j in jobs if j["id"] != job_id]
if len(jobs) < original:
save_jobs(jobs)
return True
return False
# ── Analysis ──────────────────────────────────────────────────────────────
def parse_schedule_display(job: dict) -> str:
"""Get human-readable schedule from job."""
sched = job.get("schedule", {})
if isinstance(sched, dict):
return sched.get("display", job.get("schedule_display", "?"))
return str(sched)
def get_last_run_age(job: dict) -> Optional[timedelta]:
"""Get time since last run."""
last = job.get("last_run_at")
if not last:
return None
try:
if isinstance(last, str):
last_dt = datetime.fromisoformat(last.replace("Z", "+00:00"))
else:
return None
return datetime.now(timezone.utc) - last_dt
except (ValueError, TypeError):
return None
def analyze_jobs(min_age_days: int = 0, stale_days: int = 0) -> Dict[str, List[dict]]:
"""Analyze all jobs and categorize them.
Returns dict with keys: dead, stale, healthy, disabled, completed.
"""
all_jobs = list_jobs(include_disabled=True)
now = datetime.now(timezone.utc)
result = {"dead": [], "stale": [], "healthy": [], "disabled": [], "completed": []}
for job in all_jobs:
job_id = job.get("id", "?")
name = job.get("name", job_id)
enabled = job.get("enabled", True)
state = job.get("state", "scheduled")
        completed = (job.get("repeat") or {}).get("completed", 0)
schedule = parse_schedule_display(job)
last_run = job.get("last_run_at")
last_status = job.get("last_status", "never")
last_error = job.get("last_error")
created = job.get("created_at", "")
# Calculate age
age_days = 0
if created:
try:
created_dt = datetime.fromisoformat(created.replace("Z", "+00:00"))
age_days = (now - created_dt).days
except (ValueError, TypeError):
pass
last_age = get_last_run_age(job)
entry = {
"id": job_id,
"name": name,
"schedule": schedule,
"enabled": enabled,
"state": state,
"completed": completed,
"last_run_at": last_run,
"last_status": last_status,
"last_error": last_error,
"age_days": age_days,
"last_run_age_days": last_age.days if last_age else None,
}
if not enabled or state == "completed":
result["disabled"].append(entry)
elif completed == 0 and age_days >= min_age_days:
result["dead"].append(entry)
elif stale_days > 0 and last_age and last_age.days >= stale_days:
result["stale"].append(entry)
else:
result["healthy"].append(entry)
return result
# ── Actions ───────────────────────────────────────────────────────────────
def disable_jobs(jobs: List[dict]) -> int:
"""Disable dead/stale jobs (pause them)."""
count = 0
for j in jobs:
result = update_job(j["id"], {"enabled": False, "state": "paused"})
if result:
count += 1
print(f" DISABLED: {j['name']} ({j['schedule']})")
return count
def delete_jobs(jobs: List[dict]) -> int:
"""Permanently delete jobs."""
count = 0
for j in jobs:
if remove_job(j["id"]):
count += 1
print(f" DELETED: {j['name']} ({j['schedule']})")
return count
# ── Report ────────────────────────────────────────────────────────────────
def print_table(jobs: List[dict], title: str):
"""Print a table of jobs."""
if not jobs:
return
print(f"
{title} ({len(jobs)}):")
print(f" {'Name':<35} {'Schedule':<15} {'Completed':<10} {'Last Run':<15} {'Status'}")
print(f" {'-'*35} {'-'*15} {'-'*10} {'-'*15} {'-'*10}")
for j in jobs:
last_run = "never"
if j["last_run_age_days"] is not None:
last_run = f"{j['last_run_age_days']}d ago"
elif j["last_run_at"]:
last_run = j["last_run_at"][:10]
status = j["last_status"] or "never"
print(f" {j['name']:<35} {j['schedule']:<15} {j['completed']:<10} {last_run:<15} {status}")
# ── CLI ───────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Cron audit & cleanup")
parser.add_argument("--all", action="store_true", help="Show all jobs including healthy")
parser.add_argument("--older-than", type=int, default=0,
help="Stale threshold in days (jobs with no runs in N days)")
parser.add_argument("--min-age", type=int, default=0,
help="Minimum job age in days to be considered dead")
parser.add_argument("--disable", action="store_true", help="Disable dead jobs")
parser.add_argument("--delete", action="store_true", help="Delete dead jobs permanently")
parser.add_argument("--json", dest="json_output", action="store_true", help="JSON output")
args = parser.parse_args()
analysis = analyze_jobs(min_age_days=args.min_age, stale_days=args.older_than)
if args.json_output:
print(json.dumps(analysis, indent=2))
return
# Summary
total = sum(len(v) for v in analysis.values())
print(f"Cron Audit — {total} total jobs")
print(f" Healthy: {len(analysis['healthy'])}")
print(f" Dead: {len(analysis['dead'])}")
print(f" Stale: {len(analysis['stale'])}")
print(f" Disabled: {len(analysis['disabled'])}")
print(f" Completed: {len(analysis['completed'])}")
if args.all:
print_table(analysis["healthy"], "HEALTHY")
print_table(analysis["dead"], "DEAD (zero completions)")
print_table(analysis["stale"], "STALE (no recent runs)")
if not args.disable and not args.delete:
if analysis["dead"] or analysis["stale"]:
print(f"
To clean up: --disable (pause) or --delete (permanent)")
return
targets = analysis["dead"] + analysis["stale"]
if not targets:
print("
Nothing to clean up.")
return
if args.delete:
confirm = input(f"
Delete {len(targets)} jobs permanently? [y/N] ")
if confirm.lower() != "y":
print("Aborted.")
return
count = delete_jobs(targets)
print(f"
Deleted {count} jobs.")
elif args.disable:
count = disable_jobs(targets)
print(f"
Disabled {count} jobs.")
if __name__ == "__main__":
main()
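For reference, a job record carrying every field the auditor reads. The shape is inferred from the accessors above; real entries in `~/.hermes/cron/jobs.json` may carry more keys, which the audit ignores:
```python
# A minimal record in the shape cron_audit.py expects. Values mirror a
# dead job from the table in docs/cron-audit-890.md.
example_job = {
    "id": "job-overnight-collector",
    "name": "overnight-collector",
    "enabled": True,
    "state": "scheduled",
    "schedule": {"display": "every 15 min"},
    "repeat": {"completed": 0},   # zero completions: "dead" once past --min-age
    "last_run_at": None,          # never ran successfully
    "last_status": "never",
    "created_at": "2026-04-10T00:00:00+00:00",
}
```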

tests/test_cron_audit.py (new file, +143 lines)

@@ -0,0 +1,143 @@
"""Tests for cron audit script."""
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "scripts"))
# ---------------------------------------------------------------------------
# Test data
# ---------------------------------------------------------------------------
def _make_job(name, job_id=None, completed=0, enabled=True, state="scheduled",
last_run=None, last_status=None, created=None, schedule="every 1h"):
return {
"id": job_id or f"job-{name}",
"name": name,
"enabled": enabled,
"state": state,
"schedule": {"display": schedule} if isinstance(schedule, str) else schedule,
"repeat": {"completed": completed},
"last_run_at": last_run,
"last_status": last_status,
"created_at": created or datetime.now(timezone.utc).isoformat(),
}
# ---------------------------------------------------------------------------
# analyze_jobs
# ---------------------------------------------------------------------------
class TestAnalyzeJobs:
@patch("cron_audit.list_jobs")
def test_dead_job_detected(self, mock_list):
"""Job with 0 completions and old creation date is dead."""
old_date = (datetime.now(timezone.utc) - timedelta(days=5)).isoformat()
mock_list.return_value = [
_make_job("dead-job", completed=0, created=old_date),
]
from cron_audit import analyze_jobs
result = analyze_jobs(min_age_days=1)
assert len(result["dead"]) == 1
assert result["dead"][0]["name"] == "dead-job"
@patch("cron_audit.list_jobs")
def test_healthy_job_not_dead(self, mock_list):
"""Job with completions is healthy."""
mock_list.return_value = [
_make_job("good-job", completed=42, last_run=datetime.now(timezone.utc).isoformat()),
]
from cron_audit import analyze_jobs
result = analyze_jobs()
assert len(result["dead"]) == 0
assert len(result["healthy"]) == 1
@patch("cron_audit.list_jobs")
def test_disabled_job_categorized(self, mock_list):
"""Disabled job goes to disabled category."""
mock_list.return_value = [
_make_job("paused-job", enabled=False, state="paused"),
]
from cron_audit import analyze_jobs
result = analyze_jobs()
assert len(result["disabled"]) == 1
@patch("cron_audit.list_jobs")
def test_stale_job_detected(self, mock_list):
"""Job with last run > N days ago is stale."""
old_run = (datetime.now(timezone.utc) - timedelta(days=10)).isoformat()
mock_list.return_value = [
_make_job("stale-job", completed=5, last_run=old_run),
]
from cron_audit import analyze_jobs
result = analyze_jobs(stale_days=7)
assert len(result["stale"]) == 1
@patch("cron_audit.list_jobs")
def test_completed_job_disabled(self, mock_list):
"""Job with state=completed goes to disabled."""
mock_list.return_value = [
_make_job("done-job", completed=100, state="completed", enabled=False),
]
from cron_audit import analyze_jobs
result = analyze_jobs()
assert len(result["disabled"]) == 1
@patch("cron_audit.list_jobs")
def test_empty_jobs(self, mock_list):
"""No jobs returns empty categories."""
mock_list.return_value = []
from cron_audit import analyze_jobs
result = analyze_jobs()
for v in result.values():
assert len(v) == 0
# ---------------------------------------------------------------------------
# get_last_run_age
# ---------------------------------------------------------------------------
class TestGetLastRunAge:
def test_returns_timedelta(self):
from cron_audit import get_last_run_age
recent = (datetime.now(timezone.utc) - timedelta(hours=2)).isoformat()
age = get_last_run_age({"last_run_at": recent})
assert age is not None
assert age.days == 0
assert age.seconds >= 7000 # ~2 hours
def test_returns_none_for_no_run(self):
from cron_audit import get_last_run_age
assert get_last_run_age({"last_run_at": None}) is None
assert get_last_run_age({}) is None
# ---------------------------------------------------------------------------
# disable_jobs / delete_jobs
# ---------------------------------------------------------------------------
class TestJobActions:
@patch("cron_audit.update_job")
def test_disable_calls_update(self, mock_update):
mock_update.return_value = {"id": "x"}
from cron_audit import disable_jobs
jobs = [{"id": "j1", "name": "test", "schedule": "1h"}]
count = disable_jobs(jobs)
assert count == 1
mock_update.assert_called_once_with("j1", {"enabled": False, "state": "paused"})
@patch("cron_audit.remove_job")
def test_delete_calls_remove(self, mock_remove):
mock_remove.return_value = True
from cron_audit import delete_jobs
jobs = [{"id": "j1", "name": "test", "schedule": "1h"}]
count = delete_jobs(jobs)
assert count == 1
mock_remove.assert_called_once_with("j1")
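The suite patches `cron_audit.list_jobs`, `update_job`, and `remove_job`, so no jobs file is touched. A direct invocation sketch, assuming pytest is installed:
```python
# Equivalent to: python3 -m pytest tests/test_cron_audit.py -v
import sys

import pytest

sys.exit(pytest.main(["tests/test_cron_audit.py", "-v"]))
```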

(modified file, 28 lines removed)

@@ -44,34 +44,6 @@ from typing import Dict, Any, Optional, Tuple
logger = logging.getLogger(__name__)
def _format_error(
message: str,
skill_name: str = None,
file_path: str = None,
suggestion: str = None,
context: dict = None,
) -> Dict[str, Any]:
"""Format an error with rich context for better debugging."""
parts = [message]
if skill_name:
parts.append(f"Skill: {skill_name}")
if file_path:
parts.append(f"File: {file_path}")
if suggestion:
parts.append(f"Suggestion: {suggestion}")
if context:
for key, value in context.items():
parts.append(f"{key}: {value}")
return {
"success": False,
"error": " | ".join(parts),
"skill_name": skill_name,
"file_path": file_path,
"suggestion": suggestion,
}
# Import security scanner — agent-created skills get the same scrutiny as
# community hub installs.
try: