From e8359cf10a5db484672c27d454d3a6edde9679f9 Mon Sep 17 00:00:00 2001
From: Alexander Whitestone
Date: Wed, 15 Apr 2026 14:51:29 +0000
Subject: [PATCH] feat: automation opportunity finder (#170)

Analyzes cron jobs, docs, scripts, session transcripts, and shell history
to find manual processes that could be automated. Outputs ranked proposals
with confidence scores and impact ratings.
---
 scripts/automation_opportunity_finder.py | 557 +++++++++++++++++++++++
 1 file changed, 557 insertions(+)
 create mode 100644 scripts/automation_opportunity_finder.py

diff --git a/scripts/automation_opportunity_finder.py b/scripts/automation_opportunity_finder.py
new file mode 100644
index 0000000..dbacc3e
--- /dev/null
+++ b/scripts/automation_opportunity_finder.py
@@ -0,0 +1,557 @@
+#!/usr/bin/env python3
+"""
+Automation Opportunity Finder — Scan fleet for manual processes that could be automated.
+
+Analyzes:
+1. Cron jobs — finds manual steps between scheduled tasks
+2. Documentation — extracts TODO/FIXME/manual-step patterns
+3. Scripts — detects repeated command sequences
+4. Session transcripts — finds repeated tool-call patterns
+
+Usage:
+    python3 scripts/automation_opportunity_finder.py --hermes-home ~/.hermes
+    python3 scripts/automation_opportunity_finder.py --hermes-home ~/.hermes --json
+    python3 scripts/automation_opportunity_finder.py --hermes-home ~/.hermes --output proposals.json
+"""
+
+import argparse
+import json
+import os
+import re
+import sys
+from collections import Counter, defaultdict
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+
+# ---------------------------------------------------------------------------
+# Patterns that signal manual work
+# ---------------------------------------------------------------------------
+
+MANUAL_STEP_PATTERNS = [
+    # Explicit manual markers
+    (r"(?i)\bTODO[:\s]", "todo"),
+    (r"(?i)\bFIXME[:\s]", "fixme"),
+    (r"(?i)\bMANUAL[:\s]", "manual_step"),
+    (r"(?i)\bHACK[:\s]", "hack"),
+    (r"(?i)\bWORKAROUND[:\s]", "workaround"),
+    # Step-by-step instructions in docs
+    (r"(?i)^(\d+)[.\)]\s+(run|execute|ssh|scp|curl|cd|make|docker|ansible|git)", "sequential_step"),
+    # Explicitly manual operations
+    (r"(?i)\bmanually\b", "manual_keyword"),
+    (r"(?i)\bby hand\b", "manual_keyword"),
+    (r"(?i)\bdon\'?t forget to\b", "manual_keyword"),
+    (r"(?i)\bremember to\b", "manual_keyword"),
+    (r"(?i)\bmake sure to\b", "manual_keyword"),
+]
+
+# Shell commands that appear frequently in runbooks — signal automatable workflows
+SHELL_COMMAND_PATTERNS = [
+    "ssh ", "scp ", "rsync ", "curl ", "wget ",
+    "docker ", "docker-compose ", "kubectl ",
+    "ansible-playbook ", "terraform ", "systemctl ",
+    "systemctl restart", "systemctl status",
+    "git push", "git pull", "git merge", "git checkout",
+    "pip install", "npm install", "cargo build",
+]
+
+# Session tool calls that appear repeatedly — candidates for workflow automation
+TOOL_SEQUENCE_MIN_OCCURRENCES = 3
+
+# Relative weight of each impact rating; shared by ranking and reporting so
+# report scores match the sort order.
+IMPACT_WEIGHT = {"critical": 1.0, "high": 0.8, "medium": 0.5, "low": 0.2}
+
+
+# ---------------------------------------------------------------------------
+# Analyzers
+# ---------------------------------------------------------------------------
+
+def analyze_cron_jobs(hermes_home: str) -> List[Dict[str, Any]]:
+    """Analyze cron job definitions for automation gaps.
+
+    Best-effort: returns an empty list when the jobs file is missing,
+    unreadable, or malformed.
+    """
+    proposals = []
+    cron_dir = Path(hermes_home) / "cron"
+    jobs_file = cron_dir / "jobs.json"
+
+    if not jobs_file.exists():
+        # Fall back to YAML job definitions.
+        for ext in (".yaml", ".yml"):
+            alt = cron_dir / f"jobs{ext}"
+            if alt.exists():
+                jobs_file = alt
+                break
+
+    if not jobs_file.exists():
+        return proposals
+
+    try:
+        if jobs_file.suffix == ".json":
+            with open(jobs_file) as f:
+                jobs = json.load(f)
+        else:
+            try:
+                import yaml
+                with open(jobs_file) as f:
+                    jobs = yaml.safe_load(f)
+            except ImportError:
+                # PyYAML unavailable — skip YAML jobs rather than crash.
+                return proposals
+    except Exception:
+        # Malformed or unreadable jobs file: analysis is best-effort.
+        return proposals
+
+    if not isinstance(jobs, list):
+        return proposals
+
+    # Disabled jobs: someone turned them off — might need a different approach.
+    disabled = [j for j in jobs if not j.get("enabled", True)]
+    if disabled:
+        names = [j.get("name", j.get("id", "?")) for j in disabled[:5]]
+        proposals.append({
+            "category": "cron_disabled",
+            "title": f"{len(disabled)} disabled cron job(s) may need automation rework",
+            "description": f"These jobs were disabled: {', '.join(names)}. Investigate why and whether a different automation approach is needed.",
+            "confidence": 0.7,
+            "impact": "medium",
+            "sources": [str(jobs_file)],
+        })
+
+    # Jobs whose last run ended in error.
+    error_jobs = [j for j in jobs if j.get("last_status") == "error"]
+    if error_jobs:
+        names = [j.get("name", j.get("id", "?")) for j in error_jobs[:5]]
+        proposals.append({
+            "category": "cron_errors",
+            "title": f"{len(error_jobs)} cron job(s) failing — may need automation rework",
+            "description": f"Jobs with errors: {', '.join(names)}. Failure patterns suggest missing dependencies or fragile automation.",
+            "confidence": 0.8,
+            "impact": "high",
+            "sources": [str(jobs_file)],
+        })
+
+    # Delivery failures point at platform-integration gaps.
+    delivery_errors = [j for j in jobs if j.get("last_delivery_error")]
+    if delivery_errors:
+        proposals.append({
+            "category": "cron_delivery",
+            "title": f"{len(delivery_errors)} cron job(s) have delivery failures",
+            "description": "Delivery failures suggest missing retry logic or platform integration gaps.",
+            "confidence": 0.75,
+            "impact": "medium",
+            "sources": [str(jobs_file)],
+        })
+
+    # Very frequent schedules (every 1-5 minutes) could be event-driven.
+    for job in jobs:
+        schedule = job.get("schedule", "")
+        if isinstance(schedule, str) and re.match(r"^\*\/([1-5])\s", schedule):
+            proposals.append({
+                "category": "cron_frequency",
+                "title": f"Job '{job.get('name', job.get('id', '?'))}' runs every {schedule.split()[0]} — consider event-driven",
+                "description": f"High-frequency cron ({schedule}) may be better as event-driven or daemon.",
+                "confidence": 0.6,
+                "impact": "low",
+                "sources": [str(jobs_file)],
+            })
+
+    return proposals
+
+
+def analyze_documents(root_dirs: List[str]) -> List[Dict[str, Any]]:
+    """Scan documentation trees for manual-step markers.
+
+    Emits one proposal per marker category that appears at least twice.
+    """
+    proposals = []
+    doc_extensions = {".md", ".txt", ".rst", ".adoc"}
+    findings_by_category = defaultdict(list)
+
+    for root_dir in root_dirs:
+        root = Path(root_dir)
+        if not root.exists():
+            continue
+
+        for path in root.rglob("*"):
+            if path.is_dir():
+                continue
+            if path.suffix not in doc_extensions:
+                continue
+            # Skip hidden dirs and common non-docs
+            if any(part.startswith(".") for part in path.parts):
+                continue
+            if "node_modules" in str(path) or "venv" in str(path):
+                continue
+
+            try:
+                content = path.read_text(errors="replace")
+            except (PermissionError, OSError):
+                continue
+
+            lines = content.split("\n")
+            for i, line in enumerate(lines):
+                stripped = line.strip()
+                if not stripped:
+                    continue
+
+                for pattern, category in MANUAL_STEP_PATTERNS:
+                    if re.search(pattern, stripped):
+                        findings_by_category[category].append({
+                            "file": str(path),
+                            "line": i + 1,
+                            "text": stripped[:200],
+                        })
+
+    # Generate proposals from findings
+    for category, findings in findings_by_category.items():
+        if len(findings) < 2:
+            continue
+
+        file_count = len(set(f["file"] for f in findings))
+        proposals.append({
+            "category": f"manual_{category}",
+            "title": f"{len(findings)} '{category}' markers across {file_count} doc(s)",
+            "description": f"Found in: {', '.join(set(Path(f['file']).name for f in findings[:5]))}",
+            "confidence": 0.65,
+            "impact": "medium",
+            "sources": list(set(f["file"] for f in findings[:10])),
+            "details": findings[:5],  # sample
+        })
+
+    return proposals
+
+
+def analyze_scripts(root_dirs: List[str]) -> List[Dict[str, Any]]:
+    """Detect repeated shell-style commands in scripts.
+
+    A command line counts once per occurrence, even when it matches several
+    overlapping prefixes in SHELL_COMMAND_PATTERNS.
+    """
+    proposals = []
+    script_extensions = {".py", ".sh", ".bash", ".zsh"}
+    command_counter = Counter()
+    command_locations = defaultdict(list)
+
+    for root_dir in root_dirs:
+        root = Path(root_dir)
+        if not root.exists():
+            continue
+
+        for path in root.rglob("*"):
+            if path.is_dir():
+                continue
+            if path.suffix not in script_extensions:
+                continue
+            if any(part.startswith(".") for part in path.parts):
+                continue
+            if "node_modules" in str(path) or "venv" in str(path) or "__pycache__" in str(path):
+                continue
+
+            try:
+                content = path.read_text(errors="replace")
+            except (PermissionError, OSError):
+                continue
+
+            lines = content.split("\n")
+            for i, line in enumerate(lines):
+                stripped = line.strip()
+                if not stripped or stripped.startswith("#"):
+                    continue
+
+                for cmd_prefix in SHELL_COMMAND_PATTERNS:
+                    if cmd_prefix in stripped:
+                        # Normalize whitespace so cosmetic variants collapse.
+                        normalized = re.sub(r"\s+", " ", stripped)[:120]
+                        command_counter[normalized] += 1
+                        command_locations[normalized].append(f"{path}:{i+1}")
+                        # Overlapping patterns ("systemctl " also matches
+                        # "systemctl restart ...") must not double-count.
+                        break
+
+    # Proposals for commands appearing 3+ times
+    for cmd, count in command_counter.most_common(20):
+        if count < 3:
+            break
+        locs = command_locations[cmd]
+        file_count = len(set(loc.rsplit(":", 1)[0] for loc in locs))
+        proposals.append({
+            "category": "repeated_command",
+            "title": f"Command repeated {count}x across {file_count} file(s): {cmd[:80]}",
+            "description": f"Locations: {', '.join(locs[:3])}",
+            "confidence": min(0.5 + (count * 0.1), 0.95),
+            "impact": "medium",
+            "sources": list(set(loc.rsplit(":", 1)[0] for loc in locs)),
+        })
+
+    return proposals
+
+
+def analyze_session_transcripts(session_dirs: List[str]) -> List[Dict[str, Any]]:
+    """Find repeated tool-call patterns in session transcripts (*.jsonl)."""
+    proposals = []
+    tool_sequence_counter = Counter()
+    tool_sequence_examples = {}
+
+    for session_dir in session_dirs:
+        session_path = Path(session_dir)
+        if not session_path.exists():
+            continue
+
+        for path in session_path.rglob("*.jsonl"):
+            try:
+                content = path.read_text(errors="replace")
+            except (PermissionError, OSError):
+                continue
+
+            # Extract tool calls in transcript order.
+            tool_sequence = []
+            for line in content.split("\n"):
+                line = line.strip()
+                if not line:
+                    continue
+                try:
+                    msg = json.loads(line)
+                except json.JSONDecodeError:
+                    continue
+
+                # Tool calls only appear on assistant messages.
+                if msg.get("role") == "assistant" and msg.get("tool_calls"):
+                    for tc in msg["tool_calls"]:
+                        func_name = tc.get("function", {}).get("name", "?")
+                        tool_sequence.append(func_name)
+
+            # Count sliding windows of 2 and 3 consecutive calls.
+            for width in (2, 3):
+                for i in range(len(tool_sequence) - width + 1):
+                    seq = tuple(tool_sequence[i:i + width])
+                    tool_sequence_counter[seq] += 1
+                    if seq not in tool_sequence_examples:
+                        tool_sequence_examples[seq] = str(path.name)
+
+    # Generate proposals for frequently repeated sequences
+    for seq, count in tool_sequence_counter.most_common(20):
+        if count < TOOL_SEQUENCE_MIN_OCCURRENCES:
+            break
+        seq_str = " -> ".join(seq)
+        proposals.append({
+            "category": "tool_sequence",
+            "title": f"Tool sequence '{seq_str}' repeated {count} times",
+            "description": "Consider creating a workflow/skill that automates this sequence.",
+            "confidence": min(0.5 + (count * 0.05), 0.9),
+            "impact": "medium",
+            "sources": [tool_sequence_examples.get(seq, "unknown")],
+        })
+
+    return proposals
+
+
+def analyze_shell_history(root_dirs: List[str]) -> List[Dict[str, Any]]:
+    """Find repeated shell commands in history files under root_dirs."""
+    proposals = []
+    command_counter = Counter()
+
+    for root_dir in root_dirs:
+        root = Path(root_dir)
+        # Collect candidate history files, deduplicated: the glob below can
+        # re-find the well-known dotfiles, which would double-count commands.
+        history_files = {}
+
+        for name in (".bash_history", ".zsh_history", ".python_history"):
+            p = root / name
+            if p.exists():
+                history_files[str(p)] = p
+
+        # Also sweep for anything history-like under the root.
+        for p in root.glob("**/*history*"):
+            if p.is_file() and p.suffix in ("", ".txt", ".log"):
+                history_files[str(p)] = p
+
+        for hf in history_files.values():
+            try:
+                content = hf.read_text(errors="replace")
+            except (PermissionError, OSError):
+                continue
+
+            for line in content.split("\n"):
+                stripped = line.strip()
+                # ZSH history format: ": 1234567890:0;command"
+                stripped = re.sub(r"^:\s*\d+:\d+;", "", stripped)
+                if not stripped or len(stripped) < 5:
+                    continue
+                # Skip trivial commands
+                if stripped in ("ls", "cd", "pwd", "clear", "exit"):
+                    continue
+                command_counter[stripped] += 1
+
+    for cmd, count in command_counter.most_common(10):
+        if count < 5:
+            break
+        proposals.append({
+            "category": "shell_repetition",
+            "title": f"Shell command run {count}+ times: {cmd[:80]}",
+            "description": "Frequently repeated shell command — candidate for alias, function, or script.",
+            "confidence": min(0.4 + (count * 0.05), 0.85),
+            "impact": "low",
+            "sources": ["shell_history"],
+        })
+
+    return proposals
+
+
+# ---------------------------------------------------------------------------
+# Proposal output
+# ---------------------------------------------------------------------------
+
+def deduplicate_proposals(proposals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+    """Drop proposals whose titles differ only in the embedded numbers."""
+    seen_titles = set()
+    unique = []
+    for p in proposals:
+        # Normalize digits so "3 jobs" and "4 jobs" dedupe to one entry.
+        key = re.sub(r"\d+", "N", p["title"]).lower()
+        if key not in seen_titles:
+            seen_titles.add(key)
+            unique.append(p)
+    return unique
+
+
+def rank_proposals(proposals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+    """Sort proposals by impact weight * confidence (highest first)."""
+    return sorted(
+        proposals,
+        key=lambda p: IMPACT_WEIGHT.get(p.get("impact", "low"), 0.2) * p.get("confidence", 0.5),
+        reverse=True,
+    )
+
+
+def format_text_report(proposals: List[Dict[str, Any]]) -> str:
+    """Format proposals as a human-readable text report."""
+    if not proposals:
+        return "No automation opportunities found."
+
+    lines = [
+        "=" * 70,
+        " AUTOMATION OPPORTUNITY REPORT",
+        f" Generated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}",
+        f" Proposals: {len(proposals)}",
+        "=" * 70,
+        "",
+    ]
+
+    for i, p in enumerate(proposals, 1):
+        # Same scoring formula as rank_proposals, so the report order is self-explanatory.
+        score = p.get("confidence", 0.5) * IMPACT_WEIGHT.get(p.get("impact", "low"), 0.2)
+        lines.append(f"[{i}] {p['title']}")
+        lines.append(f"    Category: {p['category']} | Impact: {p.get('impact','?')} | Confidence: {p.get('confidence',0):.0%} | Score: {score:.2f}")
+        lines.append(f"    {p['description']}")
+        if p.get("sources"):
+            lines.append(f"    Sources: {', '.join(p['sources'][:3])}")
+        lines.append("")
+
+    # Summary by category
+    cat_counts = Counter(p["category"] for p in proposals)
+    lines.append("-" * 70)
+    lines.append("Summary by category:")
+    for cat, count in cat_counts.most_common():
+        lines.append(f"  {cat}: {count}")
+
+    return "\n".join(lines)
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
+def main():
+    """Parse CLI args, run the selected analyzers, and emit the report."""
+    parser = argparse.ArgumentParser(description="Find automation opportunities across the fleet")
+    parser.add_argument("--hermes-home", default=os.path.expanduser("~/.hermes"),
+                        help="Path to hermes home directory (default: ~/.hermes)")
+    parser.add_argument("--scan-dirs", nargs="*",
+                        help="Additional directories to scan (default: hermes-home + cwd)")
+    parser.add_argument("--session-dirs", nargs="*",
+                        help="Session transcript directories (default: hermes-home/sessions)")
+    parser.add_argument("--json", action="store_true", help="Output as JSON")
+    parser.add_argument("--output", "-o", help="Write proposals to file")
+    parser.add_argument("--min-confidence", type=float, default=0.3,
+                        help="Minimum confidence threshold (default: 0.3)")
+    parser.add_argument("--categories", nargs="*",
+                        help="Only include these categories (cron, docs, scripts, sessions, shell)")
+    args = parser.parse_args()
+
+    hermes_home = os.path.expanduser(args.hermes_home)
+
+    # Default scan directories
+    scan_dirs = [hermes_home, "."]
+    if args.scan_dirs:
+        scan_dirs.extend(args.scan_dirs)
+
+    session_dirs = [os.path.join(hermes_home, "sessions")]
+    if args.session_dirs:
+        session_dirs.extend(args.session_dirs)
+
+    # Also check common session locations
+    for subdir in ("transcripts", "session-db"):
+        p = os.path.join(hermes_home, subdir)
+        if os.path.isdir(p):
+            session_dirs.append(p)
+
+    categories = set(args.categories) if args.categories else {"cron", "docs", "scripts", "sessions", "shell"}
+
+    # Run analyzers
+    all_proposals = []
+
+    if "cron" in categories:
+        all_proposals.extend(analyze_cron_jobs(hermes_home))
+
+    if "docs" in categories:
+        all_proposals.extend(analyze_documents(scan_dirs))
+
+    if "scripts" in categories:
+        all_proposals.extend(analyze_scripts(scan_dirs))
+
+    if "sessions" in categories:
+        all_proposals.extend(analyze_session_transcripts(session_dirs))
+
+    if "shell" in categories:
+        all_proposals.extend(analyze_shell_history(scan_dirs))
+
+    # Deduplicate and rank
+    all_proposals = deduplicate_proposals(all_proposals)
+    all_proposals = rank_proposals(all_proposals)
+
+    # Filter by confidence
+    all_proposals = [p for p in all_proposals if p.get("confidence", 0) >= args.min_confidence]
+
+    # Output
+    output = {
+        "generated_at": datetime.now(timezone.utc).isoformat(),
+        "hermes_home": hermes_home,
+        "scan_dirs": scan_dirs,
+        "total_proposals": len(all_proposals),
+        "proposals": all_proposals,
+    }
+
+    if args.json:
+        result = json.dumps(output, indent=2)
+    else:
+        result = format_text_report(all_proposals)
+
+    if args.output:
+        # `result` already holds the chosen rendering; write it verbatim.
+        with open(args.output, "w", encoding="utf-8") as f:
+            f.write(result)
+        print(f"Written to {args.output}", file=sys.stderr)
+    else:
+        print(result)
+
+    # Exit 0 when opportunities were found, 1 otherwise (useful in pipelines).
+    return 0 if all_proposals else 1
+
+
+if __name__ == "__main__":
+    sys.exit(main())