#!/usr/bin/env python3
"""
Automation Opportunity Finder — Scan fleet for manual processes that could be automated.

Analyzes:
1. Cron jobs — finds manual steps between scheduled tasks
2. Documentation — extracts TODO/FIXME/manual-step patterns
3. Scripts — detects repeated command sequences
4. Session transcripts — finds repeated tool-call patterns

Usage:
    python3 scripts/automation_opportunity_finder.py --hermes-home ~/.hermes
    python3 scripts/automation_opportunity_finder.py --hermes-home ~/.hermes --json
    python3 scripts/automation_opportunity_finder.py --hermes-home ~/.hermes --output proposals.json
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
from collections import Counter, defaultdict
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Patterns that signal manual work
# ---------------------------------------------------------------------------

# Each entry is (regex, category). Regexes are applied with re.search() to
# individual stripped lines of documentation files (see analyze_documents).
MANUAL_STEP_PATTERNS = [
    # Explicit manual markers
    (r"(?i)\bTODO[:\s]", "todo"),
    (r"(?i)\bFIXME[:\s]", "fixme"),
    (r"(?i)\bMANUAL[:\s]", "manual_step"),
    (r"(?i)\bHACK[:\s]", "hack"),
    (r"(?i)\bWORKAROUND[:\s]", "workaround"),
    # Step-by-step instructions in docs (numbered lines starting with a command)
    (r"(?i)^(\d+)[.\)]\s+(run|execute|ssh|scp|curl|cd|make|docker|ansible|git)", "sequential_step"),
    # Explicitly manual operations
    (r"(?i)\bmanually\b", "manual_keyword"),
    (r"(?i)\bby hand\b", "manual_keyword"),
    (r"(?i)\bdon\'?t forget to\b", "manual_keyword"),
    (r"(?i)\bremember to\b", "manual_keyword"),
    (r"(?i)\bmake sure to\b", "manual_keyword"),
]

# Shell commands that appear frequently in runbooks — signal automatable workflows.
# Matched as substrings of script lines; note some prefixes overlap
# (e.g. "systemctl " also matches lines containing "systemctl restart").
SHELL_COMMAND_PATTERNS = [
    "ssh ", "scp ", "rsync ", "curl ", "wget ",
    "docker ", "docker-compose ", "kubectl ",
    "ansible-playbook ", "terraform ", "systemctl ",
    "systemctl restart", "systemctl status",
    "git push", "git pull", "git merge", "git checkout",
    "pip install", "npm install", "cargo build",
]

# Directories to skip during scans — large/uninteresting trees
EXCLUDE_DIRS = frozenset({
    "node_modules", "venv", ".venv", "__pycache__", ".git",
    "site-packages", "dist", "build", ".tox", ".mypy_cache",
    ".pytest_cache", "coverage", ".next", "vendor",
    "skills",  # hermes skills dir is huge
    "audio_cache", "skins", "profiles",
})

# Minimum times a tool-call sequence must repeat across session transcripts
# before it is proposed as a workflow-automation candidate.
TOOL_SEQUENCE_MIN_OCCURRENCES = 3
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Analyzers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def analyze_cron_jobs(hermes_home: str) -> List[Dict[str, Any]]:
    """Analyze cron job definitions for automation gaps.

    Reads <hermes_home>/cron/jobs.json (falling back to jobs.yaml/jobs.yml)
    and emits proposals for: disabled jobs, jobs whose last run errored,
    jobs with delivery failures, and very high-frequency schedules that
    might be better served by an event-driven design.

    Args:
        hermes_home: Path to the hermes home directory.

    Returns:
        List of proposal dicts. Empty when the jobs file is missing,
        unreadable, unparsable, or not a list.
    """
    proposals: List[Dict[str, Any]] = []
    cron_dir = Path(hermes_home) / "cron"
    jobs_file = cron_dir / "jobs.json"

    if not jobs_file.exists():
        # Try YAML format
        for ext in (".yaml", ".yml"):
            alt = cron_dir / f"jobs{ext}"
            if alt.exists():
                jobs_file = alt
                break

    if not jobs_file.exists():
        return proposals

    try:
        if jobs_file.suffix == ".json":
            with open(jobs_file) as f:
                jobs = json.load(f)
        else:
            try:
                import yaml  # optional dependency; no YAML support => no proposals
                with open(jobs_file) as f:
                    jobs = yaml.safe_load(f)
            except ImportError:
                return proposals
    except Exception:
        # BUG FIX: was `except (json.JSONDecodeError, Exception)` — listing
        # JSONDecodeError alongside its ancestor Exception is redundant.
        # Behavior is unchanged: any parse/IO failure is best-effort and
        # simply yields no proposals.
        return proposals

    if not isinstance(jobs, list):
        return proposals

    # Look for disabled jobs (someone turned them off — might need a different approach)
    disabled = [j for j in jobs if not j.get("enabled", True)]
    if disabled:
        names = [j.get("name", j.get("id", "?")) for j in disabled[:5]]
        proposals.append({
            "category": "cron_disabled",
            "title": f"{len(disabled)} disabled cron job(s) may need automation rework",
            "description": f"These jobs were disabled: {', '.join(names)}. Investigate why and whether a different automation approach is needed.",
            "confidence": 0.7,
            "impact": "medium",
            "sources": [str(jobs_file)],
        })

    # Look for jobs whose last run failed
    error_jobs = [j for j in jobs if j.get("last_status") == "error"]
    if error_jobs:
        names = [j.get("name", j.get("id", "?")) for j in error_jobs[:5]]
        proposals.append({
            "category": "cron_errors",
            "title": f"{len(error_jobs)} cron job(s) failing — may need automation rework",
            "description": f"Jobs with errors: {', '.join(names)}. Failure patterns suggest missing dependencies or fragile automation.",
            "confidence": 0.8,
            "impact": "high",
            "sources": [str(jobs_file)],
        })

    # Look for jobs with delivery errors (platform issues)
    delivery_errors = [j for j in jobs if j.get("last_delivery_error")]
    if delivery_errors:
        proposals.append({
            "category": "cron_delivery",
            "title": f"{len(delivery_errors)} cron job(s) have delivery failures",
            "description": "Delivery failures suggest missing retry logic or platform integration gaps.",
            "confidence": 0.75,
            "impact": "medium",
            "sources": [str(jobs_file)],
        })

    # Look for jobs on short intervals that could be event-driven
    for job in jobs:
        schedule = job.get("schedule", "")
        # Flag cron expressions firing every 1–5 minutes ("*/N * * * *").
        if isinstance(schedule, str) and re.match(r"^\*\/([1-5])\s", schedule):
            proposals.append({
                "category": "cron_frequency",
                "title": f"Job '{job.get('name', job.get('id', '?'))}' runs every {schedule.split()[0]} — consider event-driven",
                "description": f"High-frequency cron ({schedule}) may be better as event-driven or daemon.",
                "confidence": 0.6,
                "impact": "low",
                "sources": [str(jobs_file)],
            })

    return proposals
|
|
|
|
|
|
def analyze_documents(root_dirs: List[str]) -> List[Dict[str, Any]]:
    """Scan documentation trees for manual-step patterns.

    Walks every directory in *root_dirs*, reads .md/.txt/.rst/.adoc files,
    and matches each non-empty line against MANUAL_STEP_PATTERNS. Any
    category with at least two findings becomes one proposal.

    Args:
        root_dirs: Directories to scan recursively.

    Returns:
        List of proposal dicts; each carries a sample of matched lines
        under "details".
    """
    proposals: List[Dict[str, Any]] = []
    doc_extensions = {".md", ".txt", ".rst", ".adoc"}
    findings_by_category: Dict[str, List[Dict[str, Any]]] = defaultdict(list)

    for root_dir in root_dirs:
        root = Path(root_dir)
        if not root.exists():
            continue

        for path in root.rglob("*"):
            if path.is_dir():
                continue
            if path.suffix not in doc_extensions:
                continue
            # FIX: rglob() only yields descendants of root, so relative_to()
            # always succeeds — the original's `if root in path.parents or
            # root == path.parent else path.parts` guard was always true and
            # its else branch dead code.
            parts = path.relative_to(root).parts
            # Skip hidden/excluded directories and overly deep trees.
            if any(p.startswith(".") or p in EXCLUDE_DIRS for p in parts):
                continue
            if len(parts) > 8:
                continue

            try:
                content = path.read_text(errors="replace")
            except (PermissionError, OSError):
                continue

            for i, line in enumerate(content.split("\n")):
                stripped = line.strip()
                if not stripped:
                    continue
                for pattern, category in MANUAL_STEP_PATTERNS:
                    if re.search(pattern, stripped):
                        findings_by_category[category].append({
                            "file": str(path),
                            "line": i + 1,  # 1-based for human-readable reports
                            "text": stripped[:200],
                        })

    # Generate one proposal per category with at least two findings.
    for category, findings in findings_by_category.items():
        if len(findings) < 2:
            continue

        file_count = len(set(f["file"] for f in findings))
        proposals.append({
            "category": f"manual_{category}",
            "title": f"{len(findings)} '{category}' markers across {file_count} doc(s)",
            "description": f"Found in: {', '.join(set(Path(f['file']).name for f in findings[:5]))}",
            "confidence": 0.65,
            "impact": "medium",
            "sources": list(set(f["file"] for f in findings[:10])),
            "details": findings[:5],  # sample of matched lines
        })

    return proposals
|
|
|
|
|
|
def analyze_scripts(root_dirs: List[str]) -> List[Dict[str, Any]]:
    """Detect repeated shell commands embedded in scripts.

    Walks every directory in *root_dirs*, reads .py/.sh/.bash/.zsh files,
    and counts non-comment lines containing one of SHELL_COMMAND_PATTERNS.
    Normalized commands appearing three or more times become proposals.

    Args:
        root_dirs: Directories to scan recursively.

    Returns:
        List of proposal dicts (at most 20, highest counts first).
    """
    proposals: List[Dict[str, Any]] = []
    script_extensions = {".py", ".sh", ".bash", ".zsh"}
    command_counter: Counter = Counter()
    command_locations: Dict[str, List[str]] = defaultdict(list)

    for root_dir in root_dirs:
        root = Path(root_dir)
        if not root.exists():
            continue

        for path in root.rglob("*"):
            if path.is_dir():
                continue
            if path.suffix not in script_extensions:
                continue
            # rglob() only yields descendants of root, so relative_to() is
            # always valid (the original's extra guard was always true).
            parts = path.relative_to(root).parts
            if any(p.startswith(".") or p in EXCLUDE_DIRS for p in parts):
                continue
            if len(parts) > 8:
                continue

            try:
                content = path.read_text(errors="replace")
            except (PermissionError, OSError):
                continue

            for i, line in enumerate(content.split("\n")):
                stripped = line.strip()
                if not stripped or stripped.startswith("#"):
                    continue

                for cmd_prefix in SHELL_COMMAND_PATTERNS:
                    if cmd_prefix in stripped:
                        # Normalize whitespace and cap length for grouping.
                        normalized = re.sub(r"\s+", " ", stripped)[:120]
                        command_counter[normalized] += 1
                        command_locations[normalized].append(f"{path}:{i + 1}")
                        # BUG FIX: stop after the first match. Overlapping
                        # patterns (e.g. "systemctl " and "systemctl restart")
                        # used to count the same line twice, inflating counts.
                        break

    # Proposals for commands appearing 3+ times.
    for cmd, count in command_counter.most_common(20):
        if count < 3:
            break  # most_common is sorted; remaining counts are lower
        locs = command_locations[cmd]
        file_count = len(set(loc.split(":")[0] for loc in locs))
        proposals.append({
            "category": "repeated_command",
            "title": f"Command repeated {count}x across {file_count} file(s): {cmd[:80]}",
            "description": f"Locations: {', '.join(locs[:3])}",
            "confidence": min(0.5 + (count * 0.1), 0.95),
            "impact": "medium",
            "sources": list(set(loc.split(":")[0] for loc in locs)),
        })

    return proposals
|
|
|
|
|
|
def analyze_session_transcripts(session_dirs: List[str]) -> List[Dict[str, Any]]:
    """Find repeated tool-call patterns in session transcripts.

    Reads every *.jsonl transcript under each directory in *session_dirs*,
    collects the ordered tool names invoked by assistant messages, and counts
    every sliding 2- and 3-call window. Windows seen at least
    TOOL_SEQUENCE_MIN_OCCURRENCES times become automation proposals.

    Args:
        session_dirs: Directories containing .jsonl session transcripts.

    Returns:
        List of proposal dicts (at most 20, highest counts first).
    """
    counts: Counter = Counter()
    first_seen: Dict[tuple, str] = {}

    for directory in session_dirs:
        base = Path(directory)
        if not base.exists():
            continue

        for transcript in base.rglob("*.jsonl"):
            try:
                raw = transcript.read_text(errors="replace")
            except (PermissionError, OSError):
                continue

            # Ordered tool names invoked within this transcript.
            calls: List[str] = []
            for raw_line in raw.split("\n"):
                raw_line = raw_line.strip()
                if not raw_line:
                    continue
                try:
                    record = json.loads(raw_line)
                except json.JSONDecodeError:
                    continue
                # Only assistant messages carry tool calls we care about.
                if record.get("role") == "assistant" and record.get("tool_calls"):
                    calls.extend(
                        tc.get("function", {}).get("name", "?")
                        for tc in record["tool_calls"]
                    )

            # Every sliding 2-call window, then every 3-call window.
            windows = list(zip(calls, calls[1:])) + list(zip(calls, calls[1:], calls[2:]))
            for window in windows:
                counts[window] += 1
                first_seen.setdefault(window, str(transcript.name))

    # Generate proposals for frequently repeated sequences.
    proposals: List[Dict[str, Any]] = []
    for window, count in counts.most_common(20):
        if count < TOOL_SEQUENCE_MIN_OCCURRENCES:
            break
        arrow_chain = " -> ".join(window)
        proposals.append({
            "category": "tool_sequence",
            "title": f"Tool sequence '{arrow_chain}' repeated {count} times",
            "description": "Consider creating a workflow/skill that automates this sequence.",
            "confidence": min(0.5 + (count * 0.05), 0.9),
            "impact": "medium",
            "sources": [first_seen.get(window, "unknown")],
        })

    return proposals
|
|
|
|
|
|
def analyze_shell_history(root_dirs: List[str]) -> List[Dict[str, Any]]:
    """Find heavily repeated shell commands in history files.

    Collects candidate history files under each root (well-known dotfiles
    plus anything matching *history* with a plain/.txt/.log suffix), counts
    non-trivial commands, and proposes an alias/function/script for commands
    run five or more times.

    Args:
        root_dirs: Directories whose history files should be scanned.

    Returns:
        List of proposal dicts (at most 10, highest counts first).
    """
    proposals: List[Dict[str, Any]] = []
    command_counter: Counter = Counter()

    for root_dir in root_dirs:
        root = Path(root_dir)

        # Gather candidates: well-known names first, then a recursive glob.
        candidates = []
        for name in (".bash_history", ".zsh_history", ".python_history"):
            p = root / name
            if p.exists():
                candidates.append(p)
        for p in root.glob("**/*history*"):
            if p.is_file() and p.suffix in ("", ".txt", ".log"):
                candidates.append(p)

        # BUG FIX: pathlib's glob matches dotfiles, so "**/*history*" also
        # matches the well-known files collected above — the same file could
        # be read and counted twice. Deduplicate by resolved path.
        history_files = []
        seen = set()
        for p in candidates:
            key = p.resolve()
            if key not in seen:
                seen.add(key)
                history_files.append(p)

        for hf in history_files:
            try:
                content = hf.read_text(errors="replace")
            except (PermissionError, OSError):
                continue

            for line in content.split("\n"):
                stripped = line.strip()
                # Strip the zsh extended-history prefix ": 1234567890:0;command".
                stripped = re.sub(r"^:\s*\d+:\d+;", "", stripped)
                if not stripped or len(stripped) < 5:
                    continue
                # Skip trivial commands
                if stripped in ("ls", "cd", "pwd", "clear", "exit"):
                    continue
                command_counter[stripped] += 1

    for cmd, count in command_counter.most_common(10):
        if count < 5:
            break  # most_common is sorted; nothing below reaches the bar
        proposals.append({
            "category": "shell_repetition",
            "title": f"Shell command run {count}+ times: {cmd[:80]}",
            "description": "Frequently repeated shell command — candidate for alias, function, or script.",
            "confidence": min(0.4 + (count * 0.05), 0.85),
            "impact": "low",
            "sources": ["shell_history"],
        })

    return proposals
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Proposal output
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def deduplicate_proposals(proposals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop proposals whose titles collapse to an already-seen form.

    Titles are fingerprinted by replacing every digit run with "N" and
    lowercasing, so "5 jobs failing" and "7 jobs failing" deduplicate to
    one entry; the first occurrence wins.
    """
    kept: List[Dict[str, Any]] = []
    seen_fingerprints: set = set()
    for proposal in proposals:
        fingerprint = re.sub(r"\d+", "N", proposal["title"]).lower()
        if fingerprint in seen_fingerprints:
            continue
        seen_fingerprints.add(fingerprint)
        kept.append(proposal)
    return kept
|
|
|
|
|
|
def rank_proposals(proposals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Sort proposals by impact-weight x confidence, highest first.

    Unknown impact values fall back to the "low" weight; missing
    confidence defaults to 0.5. The sort is stable, so equal scores keep
    their input order.
    """
    weights = {"critical": 1.0, "high": 0.8, "medium": 0.5, "low": 0.2}

    def score(proposal: Dict[str, Any]) -> float:
        weight = weights.get(proposal.get("impact", "low"), 0.2)
        return weight * proposal.get("confidence", 0.5)

    return sorted(proposals, key=score, reverse=True)
|
|
|
|
|
|
def format_text_report(proposals: List[Dict[str, Any]]) -> str:
    """Format proposals as a human-readable text report.

    Returns a fixed message when there are no proposals; otherwise a header
    with timestamp and count, one numbered entry per proposal, and a
    trailing per-category summary.

    Args:
        proposals: Ranked proposal dicts (see the analyze_* functions).

    Returns:
        The full report as a single newline-joined string.
    """
    if not proposals:
        return "No automation opportunities found."

    # Same weighting as rank_proposals(); hoisted here instead of rebuilding
    # the dict literal on every loop iteration as the original did.
    impact_weight = {"critical": 1.0, "high": 0.8, "medium": 0.5, "low": 0.2}

    lines = [
        "=" * 70,
        " AUTOMATION OPPORTUNITY REPORT",
        f" Generated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}",
        f" Proposals: {len(proposals)}",
        "=" * 70,
        "",
    ]

    for i, p in enumerate(proposals, 1):
        score = p.get("confidence", 0.5) * impact_weight.get(p.get("impact", "low"), 0.2)
        lines.append(f"[{i}] {p['title']}")
        lines.append(f"    Category: {p['category']} | Impact: {p.get('impact','?')} | Confidence: {p.get('confidence',0):.0%} | Score: {score:.2f}")
        lines.append(f"    {p['description']}")
        if p.get("sources"):
            lines.append(f"    Sources: {', '.join(p['sources'][:3])}")
        lines.append("")

    # Summary by category
    cat_counts = Counter(p["category"] for p in proposals)
    lines.append("-" * 70)
    lines.append("Summary by category:")
    for cat, count in cat_counts.most_common():
        lines.append(f"  {cat}: {count}")

    return "\n".join(lines)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
    """CLI entry point: run the selected analyzers, rank results, emit a report.

    Returns:
        0 when at least one proposal survived filtering, 1 otherwise
        (suitable as a shell exit status).
    """
    parser = argparse.ArgumentParser(description="Find automation opportunities across the fleet")
    parser.add_argument("--hermes-home", default=os.path.expanduser("~/.hermes"),
                        help="Path to hermes home directory (default: ~/.hermes)")
    parser.add_argument("--scan-dirs", nargs="*",
                        help="Additional directories to scan (default: hermes-home + cwd)")
    parser.add_argument("--session-dirs", nargs="*",
                        help="Session transcript directories (default: hermes-home/sessions)")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--output", "-o", help="Write proposals to file")
    parser.add_argument("--min-confidence", type=float, default=0.3,
                        help="Minimum confidence threshold (default: 0.3)")
    parser.add_argument("--categories", nargs="*",
                        help="Only include these categories (cron, docs, scripts, sessions, shell)")
    args = parser.parse_args()

    hermes_home = os.path.expanduser(args.hermes_home)

    # Default scan directories: hermes home plus the current working dir;
    # any --scan-dirs are added on top (not replacements).
    scan_dirs = [hermes_home, "."]
    if args.scan_dirs:
        scan_dirs.extend(args.scan_dirs)

    session_dirs = [os.path.join(hermes_home, "sessions")]
    if args.session_dirs:
        session_dirs.extend(args.session_dirs)

    # Also check common session locations under hermes home.
    for subdir in ("transcripts", "session-db"):
        p = os.path.join(hermes_home, subdir)
        if os.path.isdir(p):
            session_dirs.append(p)

    # With no --categories flag, every analyzer runs.
    categories = set(args.categories) if args.categories else {"cron", "docs", "scripts", "sessions", "shell"}

    # Run analyzers
    all_proposals = []

    if "cron" in categories:
        all_proposals.extend(analyze_cron_jobs(hermes_home))

    if "docs" in categories:
        all_proposals.extend(analyze_documents(scan_dirs))

    if "scripts" in categories:
        all_proposals.extend(analyze_scripts(scan_dirs))

    if "sessions" in categories:
        all_proposals.extend(analyze_session_transcripts(session_dirs))

    if "shell" in categories:
        all_proposals.extend(analyze_shell_history(scan_dirs))

    # Deduplicate and rank
    all_proposals = deduplicate_proposals(all_proposals)
    all_proposals = rank_proposals(all_proposals)

    # Filter by confidence (after dedup/rank so ordering stays stable).
    all_proposals = [p for p in all_proposals if p.get("confidence", 0) >= args.min_confidence]

    # Envelope used for JSON output; text mode only uses the proposals list.
    output = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "hermes_home": hermes_home,
        "scan_dirs": scan_dirs,
        "total_proposals": len(all_proposals),
        "proposals": all_proposals,
    }

    if args.json:
        result = json.dumps(output, indent=2)
    else:
        result = format_text_report(all_proposals)

    if args.output:
        with open(args.output, "w") as f:
            if args.json:
                json.dump(output, f, indent=2)
            else:
                f.write(result)
        # Status note goes to stderr so stdout stays clean for piping.
        print(f"Written to {args.output}", file=sys.stderr)
    else:
        print(result)

    return 0 if all_proposals else 1
|
|
|
|
|
|
# Propagate main()'s return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|