#!/usr/bin/env python3
"""
Automation Opportunity Finder — Scan fleet for manual processes that could be automated.

Analyzes:
  1. Cron jobs — flags disabled, failing, undeliverable, or very frequent jobs
  2. Documentation — extracts TODO/FIXME/manual-step patterns
  3. Scripts — detects repeated command sequences
  4. Session transcripts — finds repeated tool-call patterns
  5. Shell history — finds frequently repeated commands

Usage:
    python3 scripts/automation_opportunity_finder.py --hermes-home ~/.hermes
    python3 scripts/automation_opportunity_finder.py --hermes-home ~/.hermes --json
    python3 scripts/automation_opportunity_finder.py --hermes-home ~/.hermes --output proposals.json
"""

import argparse
import json
import os
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple

# ---------------------------------------------------------------------------
# Patterns that signal manual work
# ---------------------------------------------------------------------------

MANUAL_STEP_PATTERNS = [
    # Explicit manual markers
    (r"(?i)\bTODO[:\s]", "todo"),
    (r"(?i)\bFIXME[:\s]", "fixme"),
    (r"(?i)\bMANUAL[:\s]", "manual_step"),
    (r"(?i)\bHACK[:\s]", "hack"),
    (r"(?i)\bWORKAROUND[:\s]", "workaround"),
    # Step-by-step instructions in docs
    (r"(?i)^(\d+)[.\)]\s+(run|execute|ssh|scp|curl|cd|make|docker|ansible|git)", "sequential_step"),
    # Explicitly manual operations
    (r"(?i)\bmanually\b", "manual_keyword"),
    (r"(?i)\bby hand\b", "manual_keyword"),
    (r"(?i)\bdon\'?t forget to\b", "manual_keyword"),
    (r"(?i)\bremember to\b", "manual_keyword"),
    (r"(?i)\bmake sure to\b", "manual_keyword"),
]

# Shell commands that appear frequently in runbooks — signal automatable workflows
SHELL_COMMAND_PATTERNS = [
    "ssh ", "scp ", "rsync ", "curl ", "wget ",
    "docker ", "docker-compose ", "kubectl ",
    "ansible-playbook ", "terraform ",
    "systemctl ", "systemctl restart", "systemctl status",
    "git push", "git pull", "git merge", "git checkout",
    "pip install", "npm install", "cargo build",
]

# Directories to skip during scans — large/uninteresting trees
EXCLUDE_DIRS = frozenset({
    "node_modules", "venv", ".venv", "__pycache__", ".git",
    "site-packages", "dist", "build", ".tox", ".mypy_cache",
    ".pytest_cache", "coverage", ".next", "vendor",
    "skills",  # hermes skills dir is huge
    "audio_cache", "skins", "profiles",
})

# Session tool calls that appear repeatedly — candidates for workflow automation
TOOL_SEQUENCE_MIN_OCCURRENCES = 3

# Files whose path relative to the scan root has more parts than this are skipped.
_MAX_PATH_PARTS = 8

# Thresholds below which a repeated script command / shell command is not reported.
_REPEATED_COMMAND_MIN_OCCURRENCES = 3
_SHELL_REPETITION_MIN_OCCURRENCES = 5

_DOC_EXTENSIONS = frozenset({".md", ".txt", ".rst", ".adoc"})
_SCRIPT_EXTENSIONS = frozenset({".py", ".sh", ".bash", ".zsh"})
_HISTORY_FILE_NAMES = (".bash_history", ".zsh_history", ".python_history")
_HISTORY_SUFFIXES = ("", ".txt", ".log")
_TRIVIAL_COMMANDS = frozenset({"ls", "cd", "pwd", "clear", "exit"})
_ALL_CATEGORIES = frozenset({"cron", "docs", "scripts", "sessions", "shell"})

# Weight applied to a proposal's confidence when ranking; unknown impacts count as "low".
_IMPACT_WEIGHT = {"critical": 1.0, "high": 0.8, "medium": 0.5, "low": 0.2}

# Compiled once at import time instead of on every scanned line.
_COMPILED_MANUAL_STEP_PATTERNS = [
    (re.compile(pattern), category) for pattern, category in MANUAL_STEP_PATTERNS
]
_FREQUENT_CRON_RE = re.compile(r"^\*\/([1-5])\s")
_ZSH_HISTORY_PREFIX_RE = re.compile(r"^:\s*\d+:\d+;")


# ---------------------------------------------------------------------------
# Shared helpers
# ---------------------------------------------------------------------------

def _path_key(path: Path) -> str:
    """Return a canonical string for *path*, used to avoid reading a file twice."""
    try:
        return str(path.resolve())
    except (OSError, RuntimeError):
        # resolve() can fail on symlink loops; fall back to the literal path.
        return str(path)


def _is_excluded_name(name: str) -> bool:
    """Return True for hidden entries and for names listed in EXCLUDE_DIRS."""
    return name.startswith(".") or name in EXCLUDE_DIRS


def _read_text(path: Path) -> Optional[str]:
    """Read *path* as UTF-8 (undecodable bytes replaced); return None if unreadable."""
    try:
        return path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return None


def _iter_scan_files(root_dirs: Iterable[str], extensions: Iterable[str]) -> Iterator[Path]:
    """Yield each file under *root_dirs* whose suffix is in *extensions*, once.

    Hidden and excluded directories are pruned while walking, so large trees
    such as ``node_modules`` are never descended into. Files reachable from
    several (overlapping) roots are yielded only for the first root. Traversal
    order is sorted so results are deterministic.
    """
    wanted = frozenset(extensions)
    seen: Set[str] = set()
    for root_dir in root_dirs:
        root = Path(root_dir)
        if not root.is_dir():
            continue
        for dirpath, dirnames, filenames in os.walk(root):
            current = Path(dirpath)
            depth = len(current.relative_to(root).parts)
            # A file directly inside a directory at depth d has d + 1 path parts.
            if depth + 1 > _MAX_PATH_PARTS:
                dirnames[:] = []
                continue
            # In-place assignment is what tells os.walk not to descend.
            dirnames[:] = sorted(d for d in dirnames if not _is_excluded_name(d))
            for filename in sorted(filenames):
                if _is_excluded_name(filename):
                    continue
                path = current / filename
                if path.suffix.lower() not in wanted:
                    continue
                key = _path_key(path)
                if key in seen:
                    continue
                seen.add(key)
                yield path


def _proposal_score(proposal: Dict[str, Any]) -> float:
    """Return impact weight times confidence for *proposal*."""
    weight = _IMPACT_WEIGHT.get(proposal.get("impact", "low"), 0.2)
    return weight * proposal.get("confidence", 0.5)


# ---------------------------------------------------------------------------
# Analyzers
# ---------------------------------------------------------------------------

def _load_cron_jobs(jobs_file: Path) -> Any:
    """Parse *jobs_file* as JSON or YAML; return None when it cannot be loaded."""
    if jobs_file.suffix == ".json":
        try:
            with open(jobs_file, encoding="utf-8") as f:
                return json.load(f)
        except (OSError, ValueError):  # JSONDecodeError is a ValueError
            return None
    try:
        import yaml  # optional dependency; YAML job files are skipped without it
    except ImportError:
        return None
    try:
        with open(jobs_file, encoding="utf-8") as f:
            return yaml.safe_load(f)
    except (OSError, ValueError, yaml.YAMLError):
        return None


def _job_label(job: Dict[str, Any]) -> str:
    """Return a printable name for *job*: its name, else its id, else '?'."""
    return str(job.get("name", job.get("id", "?")))


def analyze_cron_jobs(hermes_home: str) -> List[Dict[str, Any]]:
    """Analyze cron job definitions for automation gaps.

    Reads ``<hermes_home>/cron/jobs.json`` (or ``jobs.yaml`` / ``jobs.yml``)
    and emits proposals for disabled jobs, jobs whose ``last_status`` is
    ``"error"``, jobs with a ``last_delivery_error``, and jobs scheduled
    every 1-5 minutes. Returns an empty list when no job file can be loaded.
    """
    proposals: List[Dict[str, Any]] = []
    cron_dir = Path(hermes_home) / "cron"
    candidates = (cron_dir / name for name in ("jobs.json", "jobs.yaml", "jobs.yml"))
    jobs_file = next((c for c in candidates if c.is_file()), None)
    if jobs_file is None:
        return proposals

    jobs = _load_cron_jobs(jobs_file)
    # NOTE(review): some job stores wrap the list as {"jobs": [...]}; accept
    # that shape as well as a bare list — confirm against the writer's schema.
    if isinstance(jobs, dict) and isinstance(jobs.get("jobs"), list):
        jobs = jobs["jobs"]
    if not isinstance(jobs, list):
        return proposals
    # Ignore malformed entries instead of crashing on `.get`.
    jobs = [job for job in jobs if isinstance(job, dict)]
    source = [str(jobs_file)]

    # Look for disabled jobs (someone turned them off — might need a different approach)
    disabled = [j for j in jobs if not j.get("enabled", True)]
    if disabled:
        names = [_job_label(j) for j in disabled[:5]]
        proposals.append({
            "category": "cron_disabled",
            "title": f"{len(disabled)} disabled cron job(s) may need automation rework",
            "description": f"These jobs were disabled: {', '.join(names)}. Investigate why and whether a different automation approach is needed.",
            "confidence": 0.7,
            "impact": "medium",
            "sources": list(source),
        })

    # Look for jobs whose last run failed
    error_jobs = [j for j in jobs if j.get("last_status") == "error"]
    if error_jobs:
        names = [_job_label(j) for j in error_jobs[:5]]
        proposals.append({
            "category": "cron_errors",
            "title": f"{len(error_jobs)} cron job(s) failing — may need automation rework",
            "description": f"Jobs with errors: {', '.join(names)}. Failure patterns suggest missing dependencies or fragile automation.",
            "confidence": 0.8,
            "impact": "high",
            "sources": list(source),
        })

    # Look for jobs with delivery errors (platform issues)
    delivery_errors = [j for j in jobs if j.get("last_delivery_error")]
    if delivery_errors:
        proposals.append({
            "category": "cron_delivery",
            "title": f"{len(delivery_errors)} cron job(s) have delivery failures",
            "description": "Delivery failures suggest missing retry logic or platform integration gaps.",
            "confidence": 0.75,
            "impact": "medium",
            "sources": list(source),
        })

    # Look for jobs on short intervals that could be event-driven
    for job in jobs:
        schedule = job.get("schedule", "")
        # Only string cron expressions starting with */1 .. */5 are considered.
        if isinstance(schedule, str) and _FREQUENT_CRON_RE.match(schedule):
            proposals.append({
                "category": "cron_frequency",
                "title": f"Job '{_job_label(job)}' runs every {schedule.split()[0]} — consider event-driven",
                "description": f"High-frequency cron ({schedule}) may be better as event-driven or daemon.",
                "confidence": 0.6,
                "impact": "low",
                "sources": list(source),
            })

    return proposals


def analyze_documents(root_dirs: List[str]) -> List[Dict[str, Any]]:
    """Scan documentation for manual step patterns.

    Every non-blank line of each ``.md``/``.txt``/``.rst``/``.adoc`` file is
    matched against MANUAL_STEP_PATTERNS. A line contributes at most one
    finding per category, even if several patterns of that category match.
    Categories with fewer than two findings are not reported.
    """
    proposals: List[Dict[str, Any]] = []
    findings_by_category: Dict[str, List[Dict[str, Any]]] = defaultdict(list)

    for path in _iter_scan_files(root_dirs, _DOC_EXTENSIONS):
        content = _read_text(path)
        if content is None:
            continue
        for line_no, line in enumerate(content.split("\n"), 1):
            stripped = line.strip()
            if not stripped:
                continue
            matched: Set[str] = set()
            for regex, category in _COMPILED_MANUAL_STEP_PATTERNS:
                if category in matched or not regex.search(stripped):
                    continue
                matched.add(category)
                findings_by_category[category].append({
                    "file": str(path),
                    "line": line_no,
                    "text": stripped[:200],
                })

    # Generate proposals from findings
    for category, findings in findings_by_category.items():
        if len(findings) < 2:
            continue
        file_count = len({f["file"] for f in findings})
        sample_names = sorted({Path(f["file"]).name for f in findings[:5]})
        proposals.append({
            "category": f"manual_{category}",
            "title": f"{len(findings)} '{category}' markers across {file_count} doc(s)",
            "description": f"Found in: {', '.join(sample_names)}",
            "confidence": 0.65,
            "impact": "medium",
            "sources": sorted({f["file"] for f in findings[:10]}),
            "details": findings[:5],  # sample
        })

    return proposals


def analyze_scripts(root_dirs: List[str]) -> List[Dict[str, Any]]:
    """Detect repeated command sequences in scripts.

    Counts whitespace-normalized lines (truncated to 120 characters) of
    ``.py``/``.sh``/``.bash``/``.zsh`` files that contain any entry of
    SHELL_COMMAND_PATTERNS. Each source line is counted once regardless of
    how many patterns it contains. Lines seen three or more times among the
    20 most common are reported.
    """
    proposals: List[Dict[str, Any]] = []
    command_counter: Counter = Counter()
    command_locations: Dict[str, List[Tuple[str, int]]] = defaultdict(list)

    for path in _iter_scan_files(root_dirs, _SCRIPT_EXTENSIONS):
        content = _read_text(path)
        if content is None:
            continue
        for line_no, line in enumerate(content.split("\n"), 1):
            stripped = line.strip()
            if not stripped or stripped.startswith("#"):
                continue
            if not any(prefix in stripped for prefix in SHELL_COMMAND_PATTERNS):
                continue
            # Normalize the command
            normalized = re.sub(r"\s+", " ", stripped)[:120]
            command_counter[normalized] += 1
            # Keep file and line separate so paths containing ':' stay intact.
            command_locations[normalized].append((str(path), line_no))

    for cmd, count in command_counter.most_common(20):
        if count < _REPEATED_COMMAND_MIN_OCCURRENCES:
            break
        locations = command_locations[cmd]
        files = sorted({file for file, _ in locations})
        shown = ", ".join(f"{file}:{line}" for file, line in locations[:3])
        proposals.append({
            "category": "repeated_command",
            "title": f"Command repeated {count}x across {len(files)} file(s): {cmd[:80]}",
            "description": f"Locations: {shown}",
            "confidence": min(0.5 + (count * 0.1), 0.95),
            "impact": "medium",
            "sources": files,
        })

    return proposals


def _tool_names(message: Any) -> List[str]:
    """Return the tool-call function names of an assistant message, else []."""
    if not isinstance(message, dict) or message.get("role") != "assistant":
        return []
    calls = message.get("tool_calls")
    if not isinstance(calls, list):
        return []
    names = []
    for call in calls:
        if not isinstance(call, dict):
            continue  # malformed entry
        function = call.get("function")
        name = function.get("name", "?") if isinstance(function, dict) else "?"
        names.append(str(name))
    return names


def analyze_session_transcripts(session_dirs: List[str]) -> List[Dict[str, Any]]:
    """Find repeated tool-call patterns in session transcripts.

    Each ``*.jsonl`` file under *session_dirs* is read line by line; lines
    that are not JSON objects are ignored. Consecutive runs of two and three
    tool calls are counted across all files, and runs seen at least
    TOOL_SEQUENCE_MIN_OCCURRENCES times (among the 20 most common) are
    reported with the name of the first file they appeared in.
    """
    proposals: List[Dict[str, Any]] = []
    sequence_counter: Counter = Counter()
    sequence_examples: Dict[Tuple[str, ...], str] = {}
    seen: Set[str] = set()

    for session_dir in session_dirs:
        session_path = Path(session_dir)
        if not session_path.exists():
            continue
        for path in sorted(session_path.rglob("*.jsonl")):
            key = _path_key(path)
            if key in seen:  # session dirs may overlap
                continue
            seen.add(key)
            content = _read_text(path)
            if content is None:
                continue

            # Extract tool calls in sequence
            tool_sequence: List[str] = []
            for line in content.split("\n"):
                line = line.strip()
                if not line:
                    continue
                try:
                    message = json.loads(line)
                except ValueError:
                    continue
                tool_sequence.extend(_tool_names(message))

            # Count every window of 2, then of 3, consecutive calls.
            for size in (2, 3):
                for start in range(len(tool_sequence) - size + 1):
                    window = tuple(tool_sequence[start:start + size])
                    sequence_counter[window] += 1
                    sequence_examples.setdefault(window, str(path.name))

    # Generate proposals for frequently repeated sequences
    for seq, count in sequence_counter.most_common(20):
        if count < TOOL_SEQUENCE_MIN_OCCURRENCES:
            break
        seq_str = " -> ".join(seq)
        proposals.append({
            "category": "tool_sequence",
            "title": f"Tool sequence '{seq_str}' repeated {count} times",
            "description": "Consider creating a workflow/skill that automates this sequence.",
            "confidence": min(0.5 + (count * 0.05), 0.9),
            "impact": "medium",
            "sources": [sequence_examples.get(seq, "unknown")],
        })

    return proposals


def analyze_shell_history(root_dirs: List[str]) -> List[Dict[str, Any]]:
    """Find repeated shell commands from history files.

    Looks for well-known history files directly in each root plus any file
    whose name contains ``history`` (suffix ``""``, ``.txt`` or ``.log``)
    anywhere below it. Each file is read once even when it is matched by
    both lookups or reachable from several roots. Commands seen five or
    more times among the 10 most common are reported.
    """
    proposals: List[Dict[str, Any]] = []
    command_counter: Counter = Counter()
    seen: Set[str] = set()

    for root_dir in root_dirs:
        root = Path(root_dir)
        candidates = [root / name for name in _HISTORY_FILE_NAMES]
        # Also pick up history-like files anywhere below the root.
        candidates.extend(
            p for p in root.glob("**/*history*") if p.suffix in _HISTORY_SUFFIXES
        )

        for history_file in candidates:
            if not history_file.is_file():
                continue
            key = _path_key(history_file)
            if key in seen:
                continue
            seen.add(key)
            content = _read_text(history_file)
            if content is None:
                continue
            for line in content.split("\n"):
                # ZSH history format: ": 1234567890:0;command"
                stripped = _ZSH_HISTORY_PREFIX_RE.sub("", line.strip())
                if not stripped or len(stripped) < 5:
                    continue
                # Skip trivial commands
                if stripped in _TRIVIAL_COMMANDS:
                    continue
                command_counter[stripped] += 1

    for cmd, count in command_counter.most_common(10):
        if count < _SHELL_REPETITION_MIN_OCCURRENCES:
            break
        proposals.append({
            "category": "shell_repetition",
            "title": f"Shell command run {count}+ times: {cmd[:80]}",
            "description": "Frequently repeated shell command — candidate for alias, function, or script.",
            "confidence": min(0.4 + (count * 0.05), 0.85),
            "impact": "low",
            "sources": ["shell_history"],
        })

    return proposals


# ---------------------------------------------------------------------------
# Proposal output
# ---------------------------------------------------------------------------

def deduplicate_proposals(proposals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Remove duplicate proposals based on title similarity.

    Two titles are considered the same when they are equal after lowercasing
    and replacing every run of digits with ``N``. The first proposal with a
    given normalized title is kept; input order is preserved.
    """
    seen_titles: Set[str] = set()
    unique = []
    for proposal in proposals:
        key = re.sub(r"\d+", "N", proposal["title"]).lower()
        if key in seen_titles:
            continue
        seen_titles.add(key)
        unique.append(proposal)
    return unique


def rank_proposals(proposals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Sort proposals by impact * confidence (highest first).

    Returns a new list; ties keep their input order.
    """
    return sorted(proposals, key=_proposal_score, reverse=True)


def format_text_report(proposals: List[Dict[str, Any]]) -> str:
    """Format proposals as human-readable text.

    Each proposal must carry ``title``, ``category`` and ``description``;
    ``impact``, ``confidence`` and ``sources`` are optional. At most three
    sources are listed per proposal, followed by a per-category summary.
    """
    if not proposals:
        return "No automation opportunities found."

    lines = [
        "=" * 70,
        " AUTOMATION OPPORTUNITY REPORT",
        f" Generated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}",
        f" Proposals: {len(proposals)}",
        "=" * 70,
        "",
    ]

    for i, p in enumerate(proposals, 1):
        score = _proposal_score(p)
        lines.append(f"[{i}] {p['title']}")
        lines.append(f" Category: {p['category']} | Impact: {p.get('impact','?')} | Confidence: {p.get('confidence',0):.0%} | Score: {score:.2f}")
        lines.append(f" {p['description']}")
        if p.get("sources"):
            lines.append(f" Sources: {', '.join(p['sources'][:3])}")
        lines.append("")

    # Summary by category
    cat_counts = Counter(p["category"] for p in proposals)
    lines.append("-" * 70)
    lines.append("Summary by category:")
    for cat, count in cat_counts.most_common():
        lines.append(f" {cat}: {count}")

    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main(argv: Optional[List[str]] = None) -> int:
    """Run the selected analyzers and print or write the resulting report.

    Args:
        argv: Command-line arguments without the program name; defaults to
            ``sys.argv[1:]`` when None.

    Returns:
        0 when at least one proposal survives filtering, otherwise 1.
    """
    parser = argparse.ArgumentParser(description="Find automation opportunities across the fleet")
    parser.add_argument("--hermes-home", default=os.path.expanduser("~/.hermes"),
                        help="Path to hermes home directory (default: ~/.hermes)")
    parser.add_argument("--scan-dirs", nargs="*",
                        help="Additional directories to scan (default: hermes-home + cwd)")
    parser.add_argument("--session-dirs", nargs="*",
                        help="Session transcript directories (default: hermes-home/sessions)")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--output", "-o", help="Write proposals to file")
    parser.add_argument("--min-confidence", type=float, default=0.3,
                        help="Minimum confidence threshold (default: 0.3)")
    parser.add_argument("--categories", nargs="*",
                        help="Only include these categories (cron, docs, scripts, sessions, shell)")
    args = parser.parse_args(argv)

    hermes_home = os.path.expanduser(args.hermes_home)

    # Default scan directories; --scan-dirs adds to them.
    scan_dirs = [hermes_home, "."]
    if args.scan_dirs:
        scan_dirs.extend(args.scan_dirs)

    session_dirs = [os.path.join(hermes_home, "sessions")]
    if args.session_dirs:
        session_dirs.extend(args.session_dirs)
    # Also check common session locations
    for subdir in ("transcripts", "session-db"):
        candidate = os.path.join(hermes_home, subdir)
        if os.path.isdir(candidate):
            session_dirs.append(candidate)

    categories = set(args.categories) if args.categories else set(_ALL_CATEGORIES)
    unknown = categories - _ALL_CATEGORIES
    if unknown:
        # A typo would otherwise silently produce an empty report.
        print(f"Warning: ignoring unknown categories: {', '.join(sorted(unknown))}",
              file=sys.stderr)

    # Run analyzers in a fixed order so deduplication is deterministic.
    analyzers = (
        ("cron", analyze_cron_jobs, hermes_home),
        ("docs", analyze_documents, scan_dirs),
        ("scripts", analyze_scripts, scan_dirs),
        ("sessions", analyze_session_transcripts, session_dirs),
        ("shell", analyze_shell_history, scan_dirs),
    )
    all_proposals: List[Dict[str, Any]] = []
    for category, analyzer, target in analyzers:
        if category in categories:
            all_proposals.extend(analyzer(target))

    # Deduplicate and rank
    all_proposals = deduplicate_proposals(all_proposals)
    all_proposals = rank_proposals(all_proposals)

    # Filter by confidence
    all_proposals = [p for p in all_proposals if p.get("confidence", 0) >= args.min_confidence]

    output = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "hermes_home": hermes_home,
        "scan_dirs": scan_dirs,
        "total_proposals": len(all_proposals),
        "proposals": all_proposals,
    }

    if args.json:
        result = json.dumps(output, indent=2)
    else:
        result = format_text_report(all_proposals)

    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            f.write(result)
        print(f"Written to {args.output}", file=sys.stderr)
    else:
        print(result)

    return 0 if all_proposals else 1


if __name__ == "__main__":
    sys.exit(main())