Compare commits

..

1 Commits

Author SHA1 Message Date
55797c8a3e feat: add sampler.py — session value scorer (#17) 2026-04-15 03:02:12 +00:00
5 changed files with 353 additions and 961 deletions

View File

@@ -1,554 +0,0 @@
#!/usr/bin/env python3
"""
Automation Opportunity Finder — Scan fleet for manual processes that could be automated.
Analyzes:
1. Cron jobs — finds manual steps between scheduled tasks
2. Documentation — extracts TODO/FIXME/manual-step patterns
3. Scripts — detects repeated command sequences
4. Session transcripts — finds repeated tool-call patterns
Usage:
python3 scripts/automation_opportunity_finder.py --hermes-home ~/.hermes
python3 scripts/automation_opportunity_finder.py --hermes-home ~/.hermes --json
python3 scripts/automation_opportunity_finder.py --hermes-home ~/.hermes --output proposals.json
"""
import argparse
import json
import os
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
# ---------------------------------------------------------------------------
# Patterns that signal manual work
# ---------------------------------------------------------------------------
MANUAL_STEP_PATTERNS = [
# Explicit manual markers
(r"(?i)\bTODO[:\s]", "todo"),
(r"(?i)\bFIXME[:\s]", "fixme"),
(r"(?i)\bMANUAL[:\s]", "manual_step"),
(r"(?i)\bHACK[:\s]", "hack"),
(r"(?i)\bWORKAROUND[:\s]", "workaround"),
# Step-by-step instructions in docs
(r"(?i)^(\d+)[.\)]\s+(run|execute|ssh|scp|curl|cd|make|docker|ansible|git)", "sequential_step"),
# Explicitly manual operations
(r"(?i)\bmanually\b", "manual_keyword"),
(r"(?i)\bby hand\b", "manual_keyword"),
(r"(?i)\bdon\'?t forget to\b", "manual_keyword"),
(r"(?i)\bremember to\b", "manual_keyword"),
(r"(?i)\bmake sure to\b", "manual_keyword"),
]
# Shell commands that appear frequently in runbooks — signal automatable workflows
SHELL_COMMAND_PATTERNS = [
"ssh ", "scp ", "rsync ", "curl ", "wget ",
"docker ", "docker-compose ", "kubectl ",
"ansible-playbook ", "terraform ", "systemctl ",
"systemctl restart", "systemctl status",
"git push", "git pull", "git merge", "git checkout",
"pip install", "npm install", "cargo build",
]
# Directories to skip during scans — large/uninteresting trees
EXCLUDE_DIRS = frozenset({
"node_modules", "venv", ".venv", "__pycache__", ".git",
"site-packages", "dist", "build", ".tox", ".mypy_cache",
".pytest_cache", "coverage", ".next", "vendor",
"skills", # hermes skills dir is huge
"audio_cache", "skins", "profiles",
})
# Session tool calls that appear repeatedly — candidates for workflow automation
TOOL_SEQUENCE_MIN_OCCURRENCES = 3
# ---------------------------------------------------------------------------
# Analyzers
# ---------------------------------------------------------------------------
def analyze_cron_jobs(hermes_home: str) -> List[Dict[str, Any]]:
"""Analyze cron job definitions for automation gaps."""
proposals = []
cron_dir = Path(hermes_home) / "cron"
jobs_file = cron_dir / "jobs.json"
if not jobs_file.exists():
# Try YAML format
for ext in (".yaml", ".yml"):
alt = cron_dir / f"jobs{ext}"
if alt.exists():
jobs_file = alt
break
if not jobs_file.exists():
return proposals
try:
if jobs_file.suffix == ".json":
with open(jobs_file) as f:
jobs = json.load(f)
else:
try:
import yaml
with open(jobs_file) as f:
jobs = yaml.safe_load(f)
except ImportError:
return proposals
except (json.JSONDecodeError, Exception):
return proposals
if not isinstance(jobs, list):
return proposals
# Look for disabled jobs (someone turned them off — might need a different approach)
disabled = [j for j in jobs if not j.get("enabled", True)]
if disabled:
names = [j.get("name", j.get("id", "?")) for j in disabled[:5]]
proposals.append({
"category": "cron_disabled",
"title": f"{len(disabled)} disabled cron job(s) may need automation rework",
"description": f"These jobs were disabled: {', '.join(names)}. Investigate why and whether a different automation approach is needed.",
"confidence": 0.7,
"impact": "medium",
"sources": [str(jobs_file)],
})
# Look for jobs with high error counts
error_jobs = [j for j in jobs if j.get("last_status") == "error"]
if error_jobs:
names = [j.get("name", j.get("id", "?")) for j in error_jobs[:5]]
proposals.append({
"category": "cron_errors",
"title": f"{len(error_jobs)} cron job(s) failing — may need automation rework",
"description": f"Jobs with errors: {', '.join(names)}. Failure patterns suggest missing dependencies or fragile automation.",
"confidence": 0.8,
"impact": "high",
"sources": [str(jobs_file)],
})
# Look for jobs with delivery errors (platform issues)
delivery_errors = [j for j in jobs if j.get("last_delivery_error")]
if delivery_errors:
proposals.append({
"category": "cron_delivery",
"title": f"{len(delivery_errors)} cron job(s) have delivery failures",
"description": "Delivery failures suggest missing retry logic or platform integration gaps.",
"confidence": 0.75,
"impact": "medium",
"sources": [str(jobs_file)],
})
# Look for jobs on short intervals that could be event-driven
for job in jobs:
schedule = job.get("schedule", "")
# Check for very frequent schedules (every minute, every 5 min)
if isinstance(schedule, str) and re.match(r"^\*\/([1-5])\s", schedule):
proposals.append({
"category": "cron_frequency",
"title": f"Job '{job.get('name', job.get('id', '?'))}' runs every {schedule.split()[0]} — consider event-driven",
"description": f"High-frequency cron ({schedule}) may be better as event-driven or daemon.",
"confidence": 0.6,
"impact": "low",
"sources": [str(jobs_file)],
})
return proposals
def analyze_documents(root_dirs: List[str]) -> List[Dict[str, Any]]:
"""Scan documentation for manual step patterns."""
proposals = []
doc_extensions = {".md", ".txt", ".rst", ".adoc"}
findings_by_category = defaultdict(list)
for root_dir in root_dirs:
root = Path(root_dir)
if not root.exists():
continue
for path in root.rglob("*"):
if path.is_dir():
continue
if path.suffix not in doc_extensions:
continue
# Skip excluded dirs and hidden dirs
parts = path.relative_to(root).parts if root in path.parents or root == path.parent else path.parts
if any(p.startswith(".") or p in EXCLUDE_DIRS for p in parts):
continue
if len(parts) > 8:
continue
try:
content = path.read_text(errors="replace")
except (PermissionError, OSError):
continue
lines = content.split("\n")
for i, line in enumerate(lines):
stripped = line.strip()
if not stripped:
continue
for pattern, category in MANUAL_STEP_PATTERNS:
if re.search(pattern, stripped):
findings_by_category[category].append({
"file": str(path),
"line": i + 1,
"text": stripped[:200],
})
# Generate proposals from findings
for category, findings in findings_by_category.items():
if len(findings) < 2:
continue
file_count = len(set(f["file"] for f in findings))
proposals.append({
"category": f"manual_{category}",
"title": f"{len(findings)} '{category}' markers across {file_count} doc(s)",
"description": f"Found in: {', '.join(set(Path(f['file']).name for f in findings[:5]))}",
"confidence": 0.65,
"impact": "medium",
"sources": list(set(f["file"] for f in findings[:10])),
"details": findings[:5], # sample
})
return proposals
def analyze_scripts(root_dirs: List[str]) -> List[Dict[str, Any]]:
"""Detect repeated command sequences in scripts."""
proposals = []
script_extensions = {".py", ".sh", ".bash", ".zsh"}
command_counter = Counter()
command_locations = defaultdict(list)
for root_dir in root_dirs:
root = Path(root_dir)
if not root.exists():
continue
for path in root.rglob("*"):
if path.is_dir():
continue
if path.suffix not in script_extensions:
continue
parts = path.relative_to(root).parts if root in path.parents or root == path.parent else path.parts
if any(p.startswith(".") or p in EXCLUDE_DIRS for p in parts):
continue
if len(parts) > 8:
continue
try:
content = path.read_text(errors="replace")
except (PermissionError, OSError):
continue
lines = content.split("\n")
for i, line in enumerate(lines):
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
for cmd_prefix in SHELL_COMMAND_PATTERNS:
if cmd_prefix in stripped:
# Normalize the command
normalized = re.sub(r"\s+", " ", stripped)[:120]
command_counter[normalized] += 1
command_locations[normalized].append(f"{path}:{i+1}")
# Proposals for commands appearing 3+ times
for cmd, count in command_counter.most_common(20):
if count < 3:
break
locs = command_locations[cmd]
file_count = len(set(loc.split(":")[0] for loc in locs))
proposals.append({
"category": "repeated_command",
"title": f"Command repeated {count}x across {file_count} file(s): {cmd[:80]}",
"description": f"Locations: {', '.join(locs[:3])}",
"confidence": min(0.5 + (count * 0.1), 0.95),
"impact": "medium",
"sources": list(set(loc.split(":")[0] for loc in locs)),
})
return proposals
def analyze_session_transcripts(session_dirs: List[str]) -> List[Dict[str, Any]]:
"""Find repeated tool-call patterns in session transcripts."""
proposals = []
tool_sequence_counter = Counter()
tool_sequence_examples = {}
for session_dir in session_dirs:
session_path = Path(session_dir)
if not session_path.exists():
continue
for path in session_path.rglob("*.jsonl"):
try:
content = path.read_text(errors="replace")
except (PermissionError, OSError):
continue
# Extract tool calls in sequence
tool_sequence = []
for line in content.split("\n"):
line = line.strip()
if not line:
continue
try:
msg = json.loads(line)
except json.JSONDecodeError:
continue
# Look for tool calls in assistant messages
if msg.get("role") == "assistant" and msg.get("tool_calls"):
for tc in msg["tool_calls"]:
func_name = tc.get("function", {}).get("name", "?")
tool_sequence.append(func_name)
# Find 2-call sequences
for i in range(len(tool_sequence) - 1):
seq = (tool_sequence[i], tool_sequence[i + 1])
tool_sequence_counter[seq] += 1
if seq not in tool_sequence_examples:
tool_sequence_examples[seq] = str(path.name)
# Find 3-call sequences
for i in range(len(tool_sequence) - 2):
seq = (tool_sequence[i], tool_sequence[i + 1], tool_sequence[i + 2])
tool_sequence_counter[seq] += 1
if seq not in tool_sequence_examples:
tool_sequence_examples[seq] = str(path.name)
# Generate proposals for frequently repeated sequences
for seq, count in tool_sequence_counter.most_common(20):
if count < TOOL_SEQUENCE_MIN_OCCURRENCES:
break
seq_str = " -> ".join(seq)
proposals.append({
"category": "tool_sequence",
"title": f"Tool sequence '{seq_str}' repeated {count} times",
"description": f"Consider creating a workflow/skill that automates this sequence.",
"confidence": min(0.5 + (count * 0.05), 0.9),
"impact": "medium",
"sources": [tool_sequence_examples.get(seq, "unknown")],
})
return proposals
def analyze_shell_history(root_dirs: List[str]) -> List[Dict[str, Any]]:
"""Find repeated shell commands from history files."""
proposals = []
command_counter = Counter()
for root_dir in root_dirs:
root = Path(root_dir)
history_files = []
# Look for shell history files
for name in (".bash_history", ".zsh_history", ".python_history"):
p = root / name
if p.exists():
history_files.append(p)
# Also check in hermes home
for p in root.glob("**/*history*"):
if p.is_file() and p.suffix in ("", ".txt", ".log"):
history_files.append(p)
for hf in history_files:
try:
content = hf.read_text(errors="replace")
except (PermissionError, OSError):
continue
for line in content.split("\n"):
stripped = line.strip()
# ZSH history format: ": 1234567890:0;command"
stripped = re.sub(r"^:\s*\d+:\d+;", "", stripped)
if not stripped or len(stripped) < 5:
continue
# Skip trivial commands
if stripped in ("ls", "cd", "pwd", "clear", "exit"):
continue
command_counter[stripped] += 1
for cmd, count in command_counter.most_common(10):
if count < 5:
break
proposals.append({
"category": "shell_repetition",
"title": f"Shell command run {count}+ times: {cmd[:80]}",
"description": "Frequently repeated shell command — candidate for alias, function, or script.",
"confidence": min(0.4 + (count * 0.05), 0.85),
"impact": "low",
"sources": ["shell_history"],
})
return proposals
# ---------------------------------------------------------------------------
# Proposal output
# ---------------------------------------------------------------------------
def deduplicate_proposals(proposals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Remove duplicate proposals based on title similarity."""
seen_titles = set()
unique = []
for p in proposals:
# Normalize title for dedup
key = re.sub(r"\d+", "N", p["title"]).lower()
if key not in seen_titles:
seen_titles.add(key)
unique.append(p)
return unique
def rank_proposals(proposals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Sort proposals by impact * confidence (highest first)."""
impact_weight = {"critical": 1.0, "high": 0.8, "medium": 0.5, "low": 0.2}
return sorted(
proposals,
key=lambda p: impact_weight.get(p.get("impact", "low"), 0.2) * p.get("confidence", 0.5),
reverse=True,
)
def format_text_report(proposals: List[Dict[str, Any]]) -> str:
"""Format proposals as human-readable text."""
if not proposals:
return "No automation opportunities found."
lines = [
"=" * 70,
" AUTOMATION OPPORTUNITY REPORT",
f" Generated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}",
f" Proposals: {len(proposals)}",
"=" * 70,
"",
]
for i, p in enumerate(proposals, 1):
score = p.get("confidence", 0.5) * {"critical": 1.0, "high": 0.8, "medium": 0.5, "low": 0.2}.get(p.get("impact", "low"), 0.2)
lines.append(f"[{i}] {p['title']}")
lines.append(f" Category: {p['category']} | Impact: {p.get('impact','?')} | Confidence: {p.get('confidence',0):.0%} | Score: {score:.2f}")
lines.append(f" {p['description']}")
if p.get("sources"):
lines.append(f" Sources: {', '.join(p['sources'][:3])}")
lines.append("")
# Summary by category
cat_counts = Counter(p["category"] for p in proposals)
lines.append("-" * 70)
lines.append("Summary by category:")
for cat, count in cat_counts.most_common():
lines.append(f" {cat}: {count}")
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="Find automation opportunities across the fleet")
parser.add_argument("--hermes-home", default=os.path.expanduser("~/.hermes"),
help="Path to hermes home directory (default: ~/.hermes)")
parser.add_argument("--scan-dirs", nargs="*",
help="Additional directories to scan (default: hermes-home + cwd)")
parser.add_argument("--session-dirs", nargs="*",
help="Session transcript directories (default: hermes-home/sessions)")
parser.add_argument("--json", action="store_true", help="Output as JSON")
parser.add_argument("--output", "-o", help="Write proposals to file")
parser.add_argument("--min-confidence", type=float, default=0.3,
help="Minimum confidence threshold (default: 0.3)")
parser.add_argument("--categories", nargs="*",
help="Only include these categories (cron, docs, scripts, sessions, shell)")
args = parser.parse_args()
hermes_home = os.path.expanduser(args.hermes_home)
# Default scan directories
scan_dirs = [hermes_home, "."]
if args.scan_dirs:
scan_dirs.extend(args.scan_dirs)
session_dirs = [os.path.join(hermes_home, "sessions")]
if args.session_dirs:
session_dirs.extend(args.session_dirs)
# Also check common session locations
for subdir in ("transcripts", "session-db"):
p = os.path.join(hermes_home, subdir)
if os.path.isdir(p):
session_dirs.append(p)
categories = set(args.categories) if args.categories else {"cron", "docs", "scripts", "sessions", "shell"}
# Run analyzers
all_proposals = []
if "cron" in categories:
all_proposals.extend(analyze_cron_jobs(hermes_home))
if "docs" in categories:
all_proposals.extend(analyze_documents(scan_dirs))
if "scripts" in categories:
all_proposals.extend(analyze_scripts(scan_dirs))
if "sessions" in categories:
all_proposals.extend(analyze_session_transcripts(session_dirs))
if "shell" in categories:
all_proposals.extend(analyze_shell_history(scan_dirs))
# Deduplicate and rank
all_proposals = deduplicate_proposals(all_proposals)
all_proposals = rank_proposals(all_proposals)
# Filter by confidence
all_proposals = [p for p in all_proposals if p.get("confidence", 0) >= args.min_confidence]
# Output
output = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"hermes_home": hermes_home,
"scan_dirs": scan_dirs,
"total_proposals": len(all_proposals),
"proposals": all_proposals,
}
if args.json:
result = json.dumps(output, indent=2)
else:
result = format_text_report(all_proposals)
if args.output:
with open(args.output, "w") as f:
if args.json:
json.dump(output, f, indent=2)
else:
f.write(result)
print(f"Written to {args.output}", file=sys.stderr)
else:
print(result)
return 0 if all_proposals else 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,131 +0,0 @@
#!/usr/bin/env python3
"""
Knowledge Store Staleness Detector — Detect stale knowledge entries by comparing source file hashes.
Usage:
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix
"""
import argparse
import hashlib
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional
def compute_file_hash(filepath: str) -> Optional[str]:
"""Compute SHA-256 hash of a file. Returns None if file doesn't exist."""
try:
with open(filepath, "rb") as f:
return "sha256:" + hashlib.sha256(f.read()).hexdigest()
except (FileNotFoundError, IsADirectoryError, PermissionError):
return None
def check_staleness(index_path: str, repo_root: str = ".") -> List[Dict[str, Any]]:
"""Check all entries in knowledge index for staleness.
Returns list of entries with staleness info:
- status: "fresh" | "stale" | "missing_source" | "no_hash"
- current_hash: computed hash (if source exists)
- stored_hash: hash from index
"""
with open(index_path) as f:
data = json.load(f)
facts = data.get("facts", [])
results = []
for entry in facts:
source_file = entry.get("source_file")
stored_hash = entry.get("source_hash")
if not source_file:
results.append({**entry, "status": "no_source", "current_hash": None})
continue
full_path = os.path.join(repo_root, source_file)
current_hash = compute_file_hash(full_path)
if current_hash is None:
results.append({**entry, "status": "missing_source", "current_hash": None})
elif not stored_hash:
results.append({**entry, "status": "no_hash", "current_hash": current_hash})
elif current_hash != stored_hash:
results.append({**entry, "status": "stale", "current_hash": current_hash})
else:
results.append({**entry, "status": "fresh", "current_hash": current_hash})
return results
def fix_hashes(index_path: str, repo_root: str = ".") -> int:
"""Add hashes to entries missing them. Returns count of fixed entries."""
with open(index_path) as f:
data = json.load(f)
fixed = 0
for entry in data.get("facts", []):
if entry.get("source_hash"):
continue
source_file = entry.get("source_file")
if not source_file:
continue
full_path = os.path.join(repo_root, source_file)
h = compute_file_hash(full_path)
if h:
entry["source_hash"] = h
fixed += 1
with open(index_path, "w") as f:
json.dump(data, f, indent=2)
return fixed
def main():
parser = argparse.ArgumentParser(description="Check knowledge store staleness")
parser.add_argument("--index", required=True, help="Path to knowledge/index.json")
parser.add_argument("--repo", default=".", help="Repo root for source file resolution")
parser.add_argument("--json", action="store_true", help="Output as JSON")
parser.add_argument("--fix", action="store_true", help="Add hashes to entries missing them")
args = parser.parse_args()
if args.fix:
fixed = fix_hashes(args.index, args.repo)
print(f"Fixed {fixed} entries with missing hashes.")
return
results = check_staleness(args.index, args.repo)
if args.json:
print(json.dumps(results, indent=2))
else:
stale = [r for r in results if r["status"] != "fresh"]
fresh = [r for r in results if r["status"] == "fresh"]
print(f"Knowledge Store Staleness Check")
print(f" Total entries: {len(results)}")
print(f" Fresh: {len(fresh)}")
print(f" Stale/Issues: {len(stale)}")
print()
if stale:
print("Issues found:")
for r in stale:
status = r["status"]
fact = r.get("fact", "?")[:60]
source = r.get("source_file", "?")
print(f" [{status}] {source}: {fact}")
else:
print("All entries are fresh!")
if __name__ == "__main__":
main()

353
scripts/sampler.py Normal file
View File

@@ -0,0 +1,353 @@
#!/usr/bin/env python3
"""
sampler.py — Score and rank sessions by harvest value.
With 20k+ sessions on disk, we can't harvest all at once. This script
scores each session by how likely it is to contain valuable knowledge,
so the harvester processes the best ones first.
Scoring strategy:
- Recency: last 7d=3pts, last 30d=2pts, older=1pt
- Length: >50 messages=3pts, >20=2pts, <20=1pt
- Repo uniqueness: first session for a repo=5pts, otherwise=1pt
- Outcome: failure=3pts (most to learn), success=2pts, unknown=1pt
- Tool calls: >10 tool invocations=2pts (complex sessions)
Usage:
python3 sampler.py --count 100 # Top 100 sessions
python3 sampler.py --repo the-nexus --count 20 # Top 20 for a repo
python3 sampler.py --since 2026-04-01 # All sessions since date
python3 sampler.py --count 50 --min-score 8 # Only high-value sessions
python3 sampler.py --count 100 --output sample.json # Save to file
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
# --- Fast session scanning (no full parse) ---
def scan_session_fast(path: str) -> dict:
"""Extract scoring metadata from a session without parsing the full JSONL.
Reads only: first line, last ~20 lines, and line count. This processes
20k sessions in seconds instead of minutes.
"""
meta = {
'path': path,
'message_count': 0,
'has_tool_calls': False,
'tool_call_count': 0,
'first_timestamp': '',
'last_timestamp': '',
'is_failure': False,
'repos_mentioned': set(),
'first_role': '',
'last_content_preview': '',
}
try:
file_size = os.path.getsize(path)
if file_size == 0:
return meta
with open(path, 'r', encoding='utf-8', errors='replace') as f:
# Read first line for timestamp + role
first_line = f.readline().strip()
if first_line:
try:
first_msg = json.loads(first_line)
meta['first_timestamp'] = first_msg.get('timestamp', '')
meta['first_role'] = first_msg.get('role', '')
except json.JSONDecodeError:
pass
# Fast line count + collect tail lines
# For the tail, seek to near end of file
tail_lines = []
line_count = 1 # already read first
if file_size > 8192:
# Seek to last 8KB for tail sampling
f.seek(max(0, file_size - 8192))
f.readline() # skip partial line
for line in f:
line = line.strip()
if line:
tail_lines.append(line)
line_count += 1
# We lost the exact count for big files — estimate from file size
# Average JSONL line is ~500 bytes
if line_count < 100:
line_count = max(line_count, file_size // 500)
else:
# Small file — read all
for line in f:
line = line.strip()
if line:
tail_lines.append(line)
line_count += 1
meta['message_count'] = line_count
# Parse tail lines for outcome, tool calls, repos
for line in tail_lines[-30:]: # last 30 non-empty lines
try:
msg = json.loads(line)
# Track last timestamp
ts = msg.get('timestamp', '')
if ts:
meta['last_timestamp'] = ts
# Count tool calls
if msg.get('tool_calls'):
meta['has_tool_calls'] = True
meta['tool_call_count'] += len(msg['tool_calls'])
# Detect failure signals in content
content = ''
if isinstance(msg.get('content'), str):
content = msg['content'].lower()
elif isinstance(msg.get('content'), list):
for part in msg['content']:
if isinstance(part, dict) and part.get('type') == 'text':
content += part.get('text', '').lower()
if content:
meta['last_content_preview'] = content[:200]
failure_signals = ['error', 'failed', 'cannot', 'unable',
'exception', 'traceback', 'rejected', 'denied']
if any(sig in content for sig in failure_signals):
meta['is_failure'] = True
# Extract repo references from tool call arguments
if msg.get('tool_calls'):
for tc in msg['tool_calls']:
args = tc.get('function', {}).get('arguments', '')
if isinstance(args, str):
# Look for repo patterns
for pattern in ['Timmy_Foundation/', 'Rockachopa/', 'compounding-intelligence', 'the-nexus', 'timmy-home', 'hermes-agent', 'the-beacon', 'the-door']:
if pattern in args:
repo = pattern.rstrip('/')
meta['repos_mentioned'].add(repo)
except json.JSONDecodeError:
continue
except (IOError, OSError):
pass
meta['repos_mentioned'] = list(meta['repos_mentioned'])
return meta
# --- Filename timestamp parsing ---
def parse_session_timestamp(filename: str) -> Optional[datetime]:
"""Parse timestamp from session filename.
Common formats:
session_20260413_123456_hash.jsonl
20260413_123456_hash.jsonl
"""
stem = Path(filename).stem
parts = stem.split('_')
# Try session_YYYYMMDD_HHMMSS format
for i, part in enumerate(parts):
if len(part) == 8 and part.isdigit():
date_part = part
time_part = parts[i + 1] if i + 1 < len(parts) and len(parts[i + 1]) == 6 else '000000'
try:
return datetime.strptime(f"{date_part}_{time_part}", '%Y%m%d_%H%M%S').replace(tzinfo=timezone.utc)
except ValueError:
continue
# Fallback: use file modification time
return None
# --- Scoring ---
def score_session(meta: dict, now: datetime, seen_repos: set) -> tuple[int, dict]:
"""Score a session for harvest value. Returns (score, breakdown)."""
score = 0
breakdown = {}
# 1. Recency
ts = parse_session_timestamp(os.path.basename(meta['path']))
if ts is None:
# Fallback to mtime
try:
ts = datetime.fromtimestamp(os.path.getmtime(meta['path']), tz=timezone.utc)
except OSError:
ts = now - timedelta(days=365)
age_days = (now - ts).days
if age_days <= 7:
recency = 3
elif age_days <= 30:
recency = 2
else:
recency = 1
score += recency
breakdown['recency'] = recency
# 2. Length
count = meta['message_count']
if count > 50:
length = 3
elif count > 20:
length = 2
else:
length = 1
score += length
breakdown['length'] = length
# 3. Repo uniqueness (first session mentioning a repo gets bonus)
repo_score = 0
for repo in meta.get('repos_mentioned', []):
if repo not in seen_repos:
seen_repos.add(repo)
repo_score = max(repo_score, 5)
else:
repo_score = max(repo_score, 1)
score += repo_score
breakdown['repo_unique'] = repo_score
# 4. Outcome
if meta.get('is_failure'):
outcome = 3
elif meta.get('last_content_preview', '').strip():
outcome = 2 # has some content = likely completed
else:
outcome = 1
score += outcome
breakdown['outcome'] = outcome
# 5. Tool calls
if meta.get('tool_call_count', 0) > 10:
tool = 2
else:
tool = 0
score += tool
breakdown['tool_calls'] = tool
return score, breakdown
# --- Main ---
def main():
parser = argparse.ArgumentParser(description="Score and rank sessions for harvesting")
parser.add_argument('--sessions-dir', default=os.path.expanduser('~/.hermes/sessions'),
help='Directory containing session files')
parser.add_argument('--count', type=int, default=100, help='Number of top sessions to return')
parser.add_argument('--repo', default='', help='Filter to sessions mentioning this repo')
parser.add_argument('--since', default='', help='Only score sessions after this date (YYYY-MM-DD)')
parser.add_argument('--min-score', type=int, default=0, help='Minimum score threshold')
parser.add_argument('--output', default='', help='Output file (JSON). Default: stdout')
parser.add_argument('--format', choices=['json', 'paths', 'table'], default='table',
help='Output format: json (full), paths (one per line), table (human)')
parser.add_argument('--top-percent', type=float, default=0, help='Return top N%% instead of --count')
args = parser.parse_args()
sessions_dir = Path(args.sessions_dir)
if not sessions_dir.is_dir():
print(f"ERROR: Sessions directory not found: {sessions_dir}", file=sys.stderr)
sys.exit(1)
# Find all JSONL files
print(f"Scanning {sessions_dir}...", file=sys.stderr)
t0 = time.time()
session_files = list(sessions_dir.glob('*.jsonl'))
total = len(session_files)
print(f"Found {total} session files", file=sys.stderr)
# Parse since date
since_dt = None
if args.since:
since_dt = datetime.strptime(args.since, '%Y-%m-%d').replace(tzinfo=timezone.utc)
# Score all sessions
now = datetime.now(timezone.utc)
seen_repos = set() # Track repos for uniqueness scoring
scored = []
for i, sf in enumerate(session_files):
# Date filter (fast path: check filename first)
if since_dt:
ts = parse_session_timestamp(sf.name)
if ts and ts < since_dt:
continue
meta = scan_session_fast(str(sf))
# Repo filter
if args.repo:
repos = meta.get('repos_mentioned', [])
if args.repo.lower() not in [r.lower() for r in repos]:
# Also check filename
if args.repo.lower() not in sf.name.lower():
continue
score, breakdown = score_session(meta, now, seen_repos)
if score >= args.min_score:
scored.append({
'path': str(sf),
'filename': sf.name,
'score': score,
'breakdown': breakdown,
'message_count': meta['message_count'],
'repos': meta['repos_mentioned'],
'is_failure': meta['is_failure'],
})
if (i + 1) % 5000 == 0:
elapsed = time.time() - t0
print(f" Scanned {i + 1}/{total} ({elapsed:.1f}s)", file=sys.stderr)
elapsed = time.time() - t0
print(f"Scored {len(scored)} sessions in {elapsed:.1f}s", file=sys.stderr)
# Sort by score descending
scored.sort(key=lambda x: x['score'], reverse=True)
# Apply count or percent
if args.top_percent > 0:
count = max(1, int(len(scored) * args.top_percent / 100))
else:
count = args.count
scored = scored[:count]
# Output
if args.output:
with open(args.output, 'w', encoding='utf-8') as f:
json.dump(scored, f, indent=2)
print(f"Wrote {len(scored)} sessions to {args.output}", file=sys.stderr)
elif args.format == 'json':
json.dump(scored, sys.stdout, indent=2)
elif args.format == 'paths':
for s in scored:
print(s['path'])
else: # table
print(f"{'SCORE':>5} {'MSGS':>5} {'REPOS':<25} {'FILE'}")
print(f"{'-'*5} {'-'*5} {'-'*25} {'-'*40}")
for s in scored:
repos = ', '.join(s['repos'][:2]) if s['repos'] else '-'
fail = ' FAIL' if s['is_failure'] else ''
print(f"{s['score']:>5} {s['message_count']:>5} {repos:<25} {s['filename'][:40]}{fail}")
if __name__ == '__main__':
main()

View File

@@ -1,147 +0,0 @@
#!/usr/bin/env python3
"""Tests for scripts/automation_opportunity_finder.py — 8 tests."""
import json
import os
import sys
import tempfile
sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util
spec = importlib.util.spec_from_file_location(
"aof",
os.path.join(os.path.dirname(__file__) or ".", "automation_opportunity_finder.py"),
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
def test_analyze_cron_jobs_no_file():
"""Returns empty list when no cron jobs file exists."""
with tempfile.TemporaryDirectory() as tmpdir:
result = mod.analyze_cron_jobs(tmpdir)
assert result == []
print("PASS: test_analyze_cron_jobs_no_file")
def test_analyze_cron_jobs_disabled():
"""Detects disabled cron jobs."""
with tempfile.TemporaryDirectory() as tmpdir:
cron_dir = os.path.join(tmpdir, "cron")
os.makedirs(cron_dir)
jobs = [
{"id": "j1", "name": "backup", "enabled": False, "schedule": "0 * * * *"},
{"id": "j2", "name": "health", "enabled": True, "schedule": "*/5 * * * *"},
]
with open(os.path.join(cron_dir, "jobs.json"), "w") as f:
json.dump(jobs, f)
result = mod.analyze_cron_jobs(tmpdir)
assert any(p["category"] == "cron_disabled" for p in result)
print("PASS: test_analyze_cron_jobs_disabled")
def test_analyze_cron_jobs_errors():
"""Detects cron jobs with error status."""
with tempfile.TemporaryDirectory() as tmpdir:
cron_dir = os.path.join(tmpdir, "cron")
os.makedirs(cron_dir)
jobs = [
{"id": "j1", "name": "broken", "enabled": True, "last_status": "error", "schedule": "0 * * * *"},
]
with open(os.path.join(cron_dir, "jobs.json"), "w") as f:
json.dump(jobs, f)
result = mod.analyze_cron_jobs(tmpdir)
assert any(p["category"] == "cron_errors" for p in result)
print("PASS: test_analyze_cron_jobs_errors")
def test_analyze_documents_finds_todos():
"""Detects TODO markers in documents."""
with tempfile.TemporaryDirectory() as tmpdir:
docs_dir = os.path.join(tmpdir, "docs")
os.makedirs(docs_dir)
for i in range(3):
with open(os.path.join(docs_dir, f"guide{i}.md"), "w") as f:
f.write(f"# Guide {i}\n\nTODO: Automate this step\n")
result = mod.analyze_documents([tmpdir])
assert any(p["category"] == "manual_todo" for p in result)
todo_proposals = [p for p in result if p["category"] == "manual_todo"]
assert todo_proposals[0]["details"].__len__() == 3
print("PASS: test_analyze_documents_finds_todos")
def test_analyze_scripts_repeated_commands():
"""Detects repeated shell commands across scripts."""
with tempfile.TemporaryDirectory() as tmpdir:
scripts_dir = os.path.join(tmpdir, "scripts")
os.makedirs(scripts_dir)
repeated_cmd = "docker restart myapp"
for i in range(4):
with open(os.path.join(scripts_dir, f"deploy{i}.sh"), "w") as f:
f.write(f"#!/bin/bash\n{repeated_cmd}\n")
result = mod.analyze_scripts([tmpdir])
assert any(p["category"] == "repeated_command" for p in result)
print("PASS: test_analyze_scripts_repeated_commands")
def test_analyze_session_transcripts():
"""Detects repeated tool-call sequences."""
with tempfile.TemporaryDirectory() as tmpdir:
sessions_dir = os.path.join(tmpdir, "sessions")
os.makedirs(sessions_dir)
for i in range(4):
with open(os.path.join(sessions_dir, f"session{i}.jsonl"), "w") as f:
f.write(json.dumps({"role": "user", "content": f"task {i}"}) + "\n")
f.write(json.dumps({
"role": "assistant",
"content": "working",
"tool_calls": [
{"function": {"name": "read_file"}},
{"function": {"name": "write_file"}},
]
}) + "\n")
result = mod.analyze_session_transcripts([sessions_dir])
assert any(p["category"] == "tool_sequence" for p in result)
seq_proposals = [p for p in result if p["category"] == "tool_sequence"]
assert any("read_file" in p["title"] and "write_file" in p["title"] for p in seq_proposals)
print("PASS: test_analyze_session_transcripts")
def test_deduplicate_proposals():
"""Deduplicates proposals with similar titles."""
proposals = [
{"title": "TODO found 3 times", "category": "manual_todo", "confidence": 0.7, "impact": "medium", "description": "x", "sources": []},
{"title": "TODO found 3 times", "category": "manual_todo", "confidence": 0.7, "impact": "medium", "description": "x", "sources": []},
{"title": "FIXME found 5 times", "category": "manual_fixme", "confidence": 0.8, "impact": "medium", "description": "y", "sources": []},
]
result = mod.deduplicate_proposals(proposals)
assert len(result) == 2
print("PASS: test_deduplicate_proposals")
def test_rank_proposals():
"""Ranks proposals by impact * confidence."""
proposals = [
{"title": "low", "category": "x", "confidence": 0.9, "impact": "low", "description": "", "sources": []},
{"title": "high", "category": "x", "confidence": 0.8, "impact": "high", "description": "", "sources": []},
{"title": "med", "category": "x", "confidence": 0.7, "impact": "medium", "description": "", "sources": []},
]
result = mod.rank_proposals(proposals)
assert result[0]["title"] == "high"
assert result[-1]["title"] == "low"
print("PASS: test_rank_proposals")
if __name__ == "__main__":
tests = [v for k, v in globals().items() if k.startswith("test_")]
passed = 0
failed = 0
for t in tests:
try:
t()
passed += 1
except Exception as e:
print(f"FAIL: {t.__name__}: {e}")
failed += 1
print(f"\n{passed}/{passed+failed} tests passed")
sys.exit(1 if failed else 0)

View File

@@ -1,129 +0,0 @@
#!/usr/bin/env python3
"""Tests for scripts/knowledge_staleness_check.py — 8 tests."""
import json
import os
import sys
import tempfile
sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util
spec = importlib.util.spec_from_file_location("ks", os.path.join(os.path.dirname(__file__) or ".", "knowledge_staleness_check.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
check_staleness = mod.check_staleness
fix_hashes = mod.fix_hashes
compute_file_hash = mod.compute_file_hash
def test_fresh_entry():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("print('hello')")
h = compute_file_hash(src)
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "hello", "source_file": "source.py", "source_hash": h}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "fresh"
print("PASS: test_fresh_entry")
def test_stale_entry():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("original content")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "old", "source_file": "source.py", "source_hash": "sha256:wrong"}]}, f)
# Now change the source
with open(src, "w") as f:
f.write("modified content")
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "stale"
print("PASS: test_stale_entry")
def test_missing_source():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "gone", "source_file": "nonexistent.py", "source_hash": "sha256:abc"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "missing_source"
print("PASS: test_missing_source")
def test_no_hash():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("content")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "no hash", "source_file": "source.py"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "no_hash"
assert results[0]["current_hash"].startswith("sha256:")
print("PASS: test_no_hash")
def test_no_source_field():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "orphan"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "no_source"
print("PASS: test_no_source_field")
def test_fix_hashes():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("content for hashing")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "needs hash", "source_file": "source.py"}]}, f)
fixed = fix_hashes(idx, tmpdir)
assert fixed == 1
# Verify hash was added
with open(idx) as f:
data = json.load(f)
assert data["facts"][0]["source_hash"].startswith("sha256:")
print("PASS: test_fix_hashes")
def test_empty_index():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": []}, f)
results = check_staleness(idx, tmpdir)
assert results == []
print("PASS: test_empty_index")
def test_compute_hash_nonexistent():
h = compute_file_hash("/nonexistent/path/file.py")
assert h is None
print("PASS: test_compute_hash_nonexistent")
def run_all():
test_fresh_entry()
test_stale_entry()
test_missing_source()
test_no_hash()
test_no_source_field()
test_fix_hashes()
test_empty_index()
test_compute_hash_nonexistent()
print("\nAll 8 tests passed!")
if __name__ == "__main__":
run_all()