feat: automation opportunity finder (#170)
Analyzes cron jobs, docs, scripts, session transcripts, and shell history to find manual processes that could be automated. Outputs ranked proposals with confidence scores and impact ratings.
This commit is contained in:
543
scripts/automation_opportunity_finder.py
Normal file
543
scripts/automation_opportunity_finder.py
Normal file
@@ -0,0 +1,543 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Automation Opportunity Finder — Scan fleet for manual processes that could be automated.
|
||||
|
||||
Analyzes:
|
||||
1. Cron jobs — finds manual steps between scheduled tasks
|
||||
2. Documentation — extracts TODO/FIXME/manual-step patterns
|
||||
3. Scripts — detects repeated command sequences
|
||||
4. Session transcripts — finds repeated tool-call patterns
5. Shell history — finds frequently repeated shell commands
|
||||
|
||||
Usage:
|
||||
python3 scripts/automation_opportunity_finder.py --hermes-home ~/.hermes
|
||||
python3 scripts/automation_opportunity_finder.py --hermes-home ~/.hermes --json
|
||||
python3 scripts/automation_opportunity_finder.py --hermes-home ~/.hermes --output proposals.json
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from collections import Counter, defaultdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Patterns that signal manual work
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# (regex, category) pairs. Each regex is applied with re.search to a single
# whitespace-stripped line of documentation.
MANUAL_STEP_PATTERNS = [
    # Explicit manual markers
    (r"(?i)\bTODO[:\s]", "todo"),
    (r"(?i)\bFIXME[:\s]", "fixme"),
    (r"(?i)\bMANUAL[:\s]", "manual_step"),
    (r"(?i)\bHACK[:\s]", "hack"),
    (r"(?i)\bWORKAROUND[:\s]", "workaround"),
    # Step-by-step instructions in docs
    (r"(?i)^(\d+)[.\)]\s+(run|execute|ssh|scp|curl|cd|make|docker|ansible|git)", "sequential_step"),
    # Explicitly manual operations
    (r"(?i)\bmanually\b", "manual_keyword"),
    (r"(?i)\bby hand\b", "manual_keyword"),
    (r"(?i)\bdon\'?t forget to\b", "manual_keyword"),
    (r"(?i)\bremember to\b", "manual_keyword"),
    (r"(?i)\bmake sure to\b", "manual_keyword"),
]

# Shell commands that appear frequently in runbooks — signal automatable workflows.
# Entries are matched as plain substrings, so no entry may contain another one:
# "systemctl " already covers "systemctl restart" / "systemctl status", and
# listing those as well made a single line count once per overlapping entry.
SHELL_COMMAND_PATTERNS = [
    "ssh ", "scp ", "rsync ", "curl ", "wget ",
    "docker ", "docker-compose ", "kubectl ",
    "ansible-playbook ", "terraform ", "systemctl ",
    "git push", "git pull", "git merge", "git checkout",
    "pip install", "npm install", "cargo build",
]

# Session tool calls that appear repeatedly — candidates for workflow automation
TOOL_SEQUENCE_MIN_OCCURRENCES = 3
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Analyzers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def analyze_cron_jobs(hermes_home: str) -> List[Dict[str, Any]]:
    """Analyze cron job definitions for automation gaps.

    Reads ``cron/jobs.json`` under *hermes_home*, falling back to
    ``jobs.yaml`` / ``jobs.yml``, and emits proposals for:

    * jobs whose ``enabled`` flag is falsy,
    * jobs whose ``last_status`` equals ``"error"``,
    * jobs with a truthy ``last_delivery_error``,
    * jobs whose ``schedule`` string starts with ``*/1`` .. ``*/5``.

    Args:
        hermes_home: Path to the hermes home directory.

    Returns:
        A list of proposal dicts. The list is empty when no job file exists,
        the file cannot be read or parsed, or its top-level value is not a
        list. Entries of the list that are not dicts are ignored.
    """
    proposals: List[Dict[str, Any]] = []
    cron_dir = Path(hermes_home) / "cron"

    # JSON is preferred; the YAML spellings are only fallbacks.
    jobs_file = next(
        (
            candidate
            for candidate in (cron_dir / "jobs.json", cron_dir / "jobs.yaml", cron_dir / "jobs.yml")
            if candidate.exists()
        ),
        None,
    )
    if jobs_file is None:
        return proposals

    loaded = _load_cron_jobs(jobs_file)
    if not isinstance(loaded, list):
        return proposals

    # A stray string/number/null in the list must not abort the whole analysis.
    jobs = [job for job in loaded if isinstance(job, dict)]

    # Look for disabled jobs (someone turned them off — might need a different approach)
    disabled = [job for job in jobs if not job.get("enabled", True)]
    if disabled:
        names = [_cron_job_label(job) for job in disabled[:5]]
        proposals.append({
            "category": "cron_disabled",
            "title": f"{len(disabled)} disabled cron job(s) may need automation rework",
            "description": f"These jobs were disabled: {', '.join(names)}. Investigate why and whether a different automation approach is needed.",
            "confidence": 0.7,
            "impact": "medium",
            "sources": [str(jobs_file)],
        })

    # Look for jobs whose last run ended in an error
    error_jobs = [job for job in jobs if job.get("last_status") == "error"]
    if error_jobs:
        names = [_cron_job_label(job) for job in error_jobs[:5]]
        proposals.append({
            "category": "cron_errors",
            "title": f"{len(error_jobs)} cron job(s) failing — may need automation rework",
            "description": f"Jobs with errors: {', '.join(names)}. Failure patterns suggest missing dependencies or fragile automation.",
            "confidence": 0.8,
            "impact": "high",
            "sources": [str(jobs_file)],
        })

    # Look for jobs with delivery errors (platform issues)
    delivery_errors = [job for job in jobs if job.get("last_delivery_error")]
    if delivery_errors:
        proposals.append({
            "category": "cron_delivery",
            "title": f"{len(delivery_errors)} cron job(s) have delivery failures",
            "description": "Delivery failures suggest missing retry logic or platform integration gaps.",
            "confidence": 0.75,
            "impact": "medium",
            "sources": [str(jobs_file)],
        })

    # Look for jobs on short intervals that could be event-driven
    for job in jobs:
        schedule = job.get("schedule", "")
        # Only string schedules whose first field is */1 .. */5 are flagged.
        if isinstance(schedule, str) and re.match(r"^\*\/([1-5])\s", schedule):
            proposals.append({
                "category": "cron_frequency",
                "title": f"Job '{_cron_job_label(job)}' runs every {schedule.split()[0]} — consider event-driven",
                "description": f"High-frequency cron ({schedule}) may be better as event-driven or daemon.",
                "confidence": 0.6,
                "impact": "low",
                "sources": [str(jobs_file)],
            })

    return proposals


def _cron_job_label(job: Dict[str, Any]) -> str:
    """Return a printable label for a job: its name, else its id, else '?'."""
    # str() keeps ', '.join(...) from failing on numeric ids or null names.
    return str(job.get("name", job.get("id", "?")))


def _load_cron_jobs(jobs_file: Path) -> Any:
    """Parse a JSON or YAML job file; return None when it cannot be loaded."""
    if jobs_file.suffix == ".json":
        try:
            with open(jobs_file, encoding="utf-8") as f:
                return json.load(f)
        except (OSError, ValueError):
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            return None

    try:
        import yaml  # optional dependency; YAML job files are skipped without it
    except ImportError:
        return None
    try:
        with open(jobs_file, encoding="utf-8") as f:
            return yaml.safe_load(f)
    except (OSError, ValueError, yaml.YAMLError):
        return None
|
||||
|
||||
|
||||
def analyze_documents(root_dirs: List[str]) -> List[Dict[str, Any]]:
    """Scan documentation for manual step patterns.

    Walks every directory in *root_dirs* recursively, reads files with a
    documentation extension (``.md``, ``.txt``, ``.rst``, ``.adoc``) and
    matches each non-blank line against ``MANUAL_STEP_PATTERNS``.

    Args:
        root_dirs: Directories to scan. Missing directories are skipped.

    Returns:
        One proposal per pattern category that has at least two findings.
        Each proposal carries up to five sample findings under ``details``.
    """
    proposals: List[Dict[str, Any]] = []
    doc_extensions = {".md", ".txt", ".rst", ".adoc"}
    findings_by_category = defaultdict(list)
    # Real paths already scanned, so overlapping roots (e.g. hermes home and
    # cwd being the same directory) do not double every count.
    seen_files = set()

    for root_dir in root_dirs:
        root = Path(root_dir)
        if not root.exists():
            continue

        for path in root.rglob("*"):
            if path.is_dir():
                continue
            if path.suffix not in doc_extensions:
                continue

            # Filter on the path *below* the root. Checking the full path
            # would reject every file when the root itself lives under a
            # hidden directory such as ~/.hermes.
            relative = path.relative_to(root)
            if any(part.startswith(".") for part in relative.parts):
                continue
            relative_text = str(relative)
            if "node_modules" in relative_text or "venv" in relative_text:
                continue

            real_path = os.path.realpath(path)
            if real_path in seen_files:
                continue
            seen_files.add(real_path)

            try:
                content = path.read_text(errors="replace")
            except (PermissionError, OSError):
                continue

            for line_number, line in enumerate(content.split("\n"), 1):
                stripped = line.strip()
                if not stripped:
                    continue

                # Several patterns share a category (e.g. "manually" and
                # "by hand"); record a given line once per category.
                matched_categories = set()
                for pattern, category in MANUAL_STEP_PATTERNS:
                    if category in matched_categories:
                        continue
                    if re.search(pattern, stripped):
                        matched_categories.add(category)
                        findings_by_category[category].append({
                            "file": str(path),
                            "line": line_number,
                            "text": stripped[:200],
                        })

    # Generate proposals from findings
    for category, findings in findings_by_category.items():
        if len(findings) < 2:
            continue

        file_count = len({f["file"] for f in findings})
        # sorted() keeps the report stable between runs; set order is not.
        sample_names = sorted({Path(f["file"]).name for f in findings[:5]})
        proposals.append({
            "category": f"manual_{category}",
            "title": f"{len(findings)} '{category}' markers across {file_count} doc(s)",
            "description": f"Found in: {', '.join(sample_names)}",
            "confidence": 0.65,
            "impact": "medium",
            "sources": sorted({f["file"] for f in findings[:10]}),
            "details": findings[:5],  # sample
        })

    return proposals
|
||||
|
||||
|
||||
def analyze_scripts(root_dirs: List[str]) -> List[Dict[str, Any]]:
    """Detect repeated command sequences in scripts.

    Walks every directory in *root_dirs* recursively, reads files with a
    script extension (``.py``, ``.sh``, ``.bash``, ``.zsh``) and counts
    non-comment lines that contain any entry of ``SHELL_COMMAND_PATTERNS``.
    Lines are compared after collapsing whitespace and truncating to 120
    characters.

    Args:
        root_dirs: Directories to scan. Missing directories are skipped.

    Returns:
        One proposal per command line seen at least three times, limited to
        the 20 most frequent lines.
    """
    proposals: List[Dict[str, Any]] = []
    script_extensions = {".py", ".sh", ".bash", ".zsh"}
    skipped_fragments = ("node_modules", "venv", "__pycache__")
    command_counter = Counter()
    # normalized command -> [(file, line number), ...]
    command_locations = defaultdict(list)
    # Real paths already scanned, so overlapping roots are not counted twice.
    seen_files = set()

    for root_dir in root_dirs:
        root = Path(root_dir)
        if not root.exists():
            continue

        for path in root.rglob("*"):
            if path.is_dir():
                continue
            if path.suffix not in script_extensions:
                continue

            # Filter on the path *below* the root. Checking the full path
            # would reject every file when the root itself lives under a
            # hidden directory such as ~/.hermes.
            relative = path.relative_to(root)
            if any(part.startswith(".") for part in relative.parts):
                continue
            relative_text = str(relative)
            if any(fragment in relative_text for fragment in skipped_fragments):
                continue

            real_path = os.path.realpath(path)
            if real_path in seen_files:
                continue
            seen_files.add(real_path)

            try:
                content = path.read_text(errors="replace")
            except (PermissionError, OSError):
                continue

            for line_number, line in enumerate(content.split("\n"), 1):
                stripped = line.strip()
                if not stripped or stripped.startswith("#"):
                    continue

                # Count the line once even when it contains several known
                # commands (e.g. "ssh host 'docker ps'"); counting per
                # pattern inflated totals past the reporting threshold.
                if any(cmd_prefix in stripped for cmd_prefix in SHELL_COMMAND_PATTERNS):
                    normalized = re.sub(r"\s+", " ", stripped)[:120]
                    command_counter[normalized] += 1
                    command_locations[normalized].append((str(path), line_number))

    # Proposals for commands appearing 3+ times
    for cmd, count in command_counter.most_common(20):
        if count < 3:
            break
        locations = command_locations[cmd]
        # File names are kept separately from line numbers, so paths that
        # contain ':' are not truncated.
        files = sorted({file_name for file_name, _ in locations})
        shown = ", ".join(f"{file_name}:{line_number}" for file_name, line_number in locations[:3])
        proposals.append({
            "category": "repeated_command",
            "title": f"Command repeated {count}x across {len(files)} file(s): {cmd[:80]}",
            "description": f"Locations: {shown}",
            "confidence": min(0.5 + (count * 0.1), 0.95),
            "impact": "medium",
            "sources": files,
        })

    return proposals
|
||||
|
||||
|
||||
def analyze_session_transcripts(session_dirs: List[str]) -> List[Dict[str, Any]]:
|
||||
"""Find repeated tool-call patterns in session transcripts."""
|
||||
proposals = []
|
||||
tool_sequence_counter = Counter()
|
||||
tool_sequence_examples = {}
|
||||
|
||||
for session_dir in session_dirs:
|
||||
session_path = Path(session_dir)
|
||||
if not session_path.exists():
|
||||
continue
|
||||
|
||||
for path in session_path.rglob("*.jsonl"):
|
||||
try:
|
||||
content = path.read_text(errors="replace")
|
||||
except (PermissionError, OSError):
|
||||
continue
|
||||
|
||||
# Extract tool calls in sequence
|
||||
tool_sequence = []
|
||||
for line in content.split("\n"):
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
msg = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
# Look for tool calls in assistant messages
|
||||
if msg.get("role") == "assistant" and msg.get("tool_calls"):
|
||||
for tc in msg["tool_calls"]:
|
||||
func_name = tc.get("function", {}).get("name", "?")
|
||||
tool_sequence.append(func_name)
|
||||
|
||||
# Find 2-call sequences
|
||||
for i in range(len(tool_sequence) - 1):
|
||||
seq = (tool_sequence[i], tool_sequence[i + 1])
|
||||
tool_sequence_counter[seq] += 1
|
||||
if seq not in tool_sequence_examples:
|
||||
tool_sequence_examples[seq] = str(path.name)
|
||||
|
||||
# Find 3-call sequences
|
||||
for i in range(len(tool_sequence) - 2):
|
||||
seq = (tool_sequence[i], tool_sequence[i + 1], tool_sequence[i + 2])
|
||||
tool_sequence_counter[seq] += 1
|
||||
if seq not in tool_sequence_examples:
|
||||
tool_sequence_examples[seq] = str(path.name)
|
||||
|
||||
# Generate proposals for frequently repeated sequences
|
||||
for seq, count in tool_sequence_counter.most_common(20):
|
||||
if count < TOOL_SEQUENCE_MIN_OCCURRENCES:
|
||||
break
|
||||
seq_str = " -> ".join(seq)
|
||||
proposals.append({
|
||||
"category": "tool_sequence",
|
||||
"title": f"Tool sequence '{seq_str}' repeated {count} times",
|
||||
"description": f"Consider creating a workflow/skill that automates this sequence.",
|
||||
"confidence": min(0.5 + (count * 0.05), 0.9),
|
||||
"impact": "medium",
|
||||
"sources": [tool_sequence_examples.get(seq, "unknown")],
|
||||
})
|
||||
|
||||
return proposals
|
||||
|
||||
|
||||
def analyze_shell_history(root_dirs: List[str]) -> List[Dict[str, Any]]:
    """Find repeated shell commands from history files.

    For each directory in *root_dirs* this reads ``.bash_history``,
    ``.zsh_history`` and ``.python_history`` directly inside it, plus any
    file below it whose name contains ``history`` and whose suffix is empty,
    ``.txt`` or ``.log``. Each physical file is read once.

    Args:
        root_dirs: Directories to search for history files.

    Returns:
        One proposal per command seen at least five times, limited to the
        10 most frequent commands.
    """
    proposals: List[Dict[str, Any]] = []
    command_counter = Counter()
    well_known_names = (".bash_history", ".zsh_history", ".python_history")
    trivial_commands = {"ls", "cd", "pwd", "clear", "exit"}
    # The glob below also matches the well-known dotfiles, and roots may
    # overlap; tracking real paths stops every command from being counted
    # two or more times.
    seen_files = set()

    for root_dir in root_dirs:
        root = Path(root_dir)

        # Look for shell history files
        history_files = [root / name for name in well_known_names if (root / name).exists()]

        # Also check anywhere below the root
        history_files.extend(
            candidate
            for candidate in root.glob("**/*history*")
            if candidate.is_file() and candidate.suffix in ("", ".txt", ".log")
        )

        for history_file in history_files:
            real_path = os.path.realpath(history_file)
            if real_path in seen_files:
                continue
            seen_files.add(real_path)

            try:
                content = history_file.read_text(errors="replace")
            except (PermissionError, OSError):
                continue

            for line in content.split("\n"):
                # ZSH history format: ": 1234567890:0;command"
                command = re.sub(r"^:\s*\d+:\d+;", "", line.strip())
                if not command or len(command) < 5:
                    continue
                # Skip trivial commands
                if command in trivial_commands:
                    continue
                command_counter[command] += 1

    for cmd, count in command_counter.most_common(10):
        if count < 5:
            break
        proposals.append({
            "category": "shell_repetition",
            "title": f"Shell command run {count}+ times: {cmd[:80]}",
            "description": "Frequently repeated shell command — candidate for alias, function, or script.",
            "confidence": min(0.4 + (count * 0.05), 0.85),
            "impact": "low",
            "sources": ["shell_history"],
        })

    return proposals
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Proposal output
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def deduplicate_proposals(proposals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Drop proposals whose normalized title has already been seen.

    Titles are compared case-insensitively with every run of digits replaced
    by ``N``, so two titles that differ only in their numbers count as the
    same proposal. The first proposal for each normalized title is kept, and
    the input order is preserved.
    """
    first_by_key: Dict[str, Dict[str, Any]] = {}
    for proposal in proposals:
        normalized_title = re.sub(r"\d+", "N", proposal["title"]).lower()
        # setdefault keeps the earliest proposal; dicts preserve insertion order.
        first_by_key.setdefault(normalized_title, proposal)
    return list(first_by_key.values())
|
||||
|
||||
|
||||
def rank_proposals(proposals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return a new list of proposals ordered by descending score.

    The score is the impact weight multiplied by the confidence. A missing or
    unrecognized impact is weighted 0.2 and a missing confidence counts as
    0.5. Proposals with equal scores keep their relative input order.
    """
    weights = {"critical": 1.0, "high": 0.8, "medium": 0.5, "low": 0.2}

    def score(proposal: Dict[str, Any]) -> float:
        weight = weights.get(proposal.get("impact", "low"), 0.2)
        return weight * proposal.get("confidence", 0.5)

    ranked = list(proposals)
    ranked.sort(key=score, reverse=True)
    return ranked
|
||||
|
||||
|
||||
def format_text_report(proposals: List[Dict[str, Any]]) -> str:
    """Render proposals as a plain-text report.

    The report has a banner with the UTC generation time and proposal count,
    one numbered entry per proposal (title, category / impact / confidence /
    score, description, and up to three sources), and a closing per-category
    tally. An empty input yields a single "nothing found" sentence.
    """
    if not proposals:
        return "No automation opportunities found."

    impact_weights = {"critical": 1.0, "high": 0.8, "medium": 0.5, "low": 0.2}
    banner_rule = "=" * 70
    generated = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')

    report = [
        banner_rule,
        " AUTOMATION OPPORTUNITY REPORT",
        f" Generated: {generated}",
        f" Proposals: {len(proposals)}",
        banner_rule,
        "",
    ]

    for number, proposal in enumerate(proposals, 1):
        weight = impact_weights.get(proposal.get("impact", "low"), 0.2)
        score = proposal.get("confidence", 0.5) * weight
        impact_label = proposal.get('impact', '?')
        # The displayed confidence falls back to 0, unlike the 0.5 used for the score.
        shown_confidence = proposal.get('confidence', 0)

        report.append(f"[{number}] {proposal['title']}")
        report.append(f" Category: {proposal['category']} | Impact: {impact_label} | Confidence: {shown_confidence:.0%} | Score: {score:.2f}")
        report.append(f" {proposal['description']}")
        if proposal.get("sources"):
            leading_sources = proposal['sources'][:3]
            report.append(f" Sources: {', '.join(leading_sources)}")
        report.append("")

    # Summary by category
    report.append("-" * 70)
    report.append("Summary by category:")
    tally = Counter(proposal["category"] for proposal in proposals)
    report.extend(f" {category}: {total}" for category, total in tally.most_common())

    return "\n".join(report)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main():
    """Command-line entry point: run the selected analyzers and emit a report.

    Parses the command line, runs each analyzer whose category is selected,
    then deduplicates, ranks, and confidence-filters the proposals. The
    result is printed to stdout, or written to ``--output`` when given, as
    text or (with ``--json``) as JSON.

    Returns:
        0 when at least one proposal survived filtering, otherwise 1.
    """
    known_categories = ("cron", "docs", "scripts", "sessions", "shell")

    parser = argparse.ArgumentParser(description="Find automation opportunities across the fleet")
    parser.add_argument("--hermes-home", default=os.path.expanduser("~/.hermes"),
                        help="Path to hermes home directory (default: ~/.hermes)")
    parser.add_argument("--scan-dirs", nargs="*",
                        help="Additional directories to scan (default: hermes-home + cwd)")
    parser.add_argument("--session-dirs", nargs="*",
                        help="Session transcript directories (default: hermes-home/sessions)")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--output", "-o", help="Write proposals to file")
    parser.add_argument("--min-confidence", type=float, default=0.3,
                        help="Minimum confidence threshold (default: 0.3)")
    parser.add_argument("--categories", nargs="*",
                        help="Only include these categories (cron, docs, scripts, sessions, shell)")
    args = parser.parse_args()

    hermes_home = os.path.expanduser(args.hermes_home)

    # Default scan directories, plus any the caller added
    scan_dirs = [hermes_home, "."]
    if args.scan_dirs:
        scan_dirs.extend(args.scan_dirs)

    session_dirs = [os.path.join(hermes_home, "sessions")]
    if args.session_dirs:
        session_dirs.extend(args.session_dirs)

    # Also check common session locations
    for subdir in ("transcripts", "session-db"):
        candidate = os.path.join(hermes_home, subdir)
        if os.path.isdir(candidate):
            session_dirs.append(candidate)

    # The same directory listed twice (e.g. cwd == hermes home) would be
    # scanned twice and inflate every repetition count.
    scan_dirs = _unique_dirs(scan_dirs)
    session_dirs = _unique_dirs(session_dirs)

    categories = set(args.categories) if args.categories else set(known_categories)
    unknown = sorted(categories.difference(known_categories))
    if unknown:
        # Unknown names select no analyzer; say so instead of silently
        # producing an empty report.
        print(f"Warning: ignoring unknown categories: {', '.join(unknown)}", file=sys.stderr)

    # Run analyzers
    all_proposals = []
    if "cron" in categories:
        all_proposals.extend(analyze_cron_jobs(hermes_home))
    if "docs" in categories:
        all_proposals.extend(analyze_documents(scan_dirs))
    if "scripts" in categories:
        all_proposals.extend(analyze_scripts(scan_dirs))
    if "sessions" in categories:
        all_proposals.extend(analyze_session_transcripts(session_dirs))
    if "shell" in categories:
        all_proposals.extend(analyze_shell_history(scan_dirs))

    # Deduplicate and rank
    all_proposals = deduplicate_proposals(all_proposals)
    all_proposals = rank_proposals(all_proposals)

    # Filter by confidence
    all_proposals = [p for p in all_proposals if p.get("confidence", 0) >= args.min_confidence]

    # Output
    output = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "hermes_home": hermes_home,
        "scan_dirs": scan_dirs,
        "total_proposals": len(all_proposals),
        "proposals": all_proposals,
    }

    if args.json:
        result = json.dumps(output, indent=2)
    else:
        result = format_text_report(all_proposals)

    if args.output:
        # Report titles contain non-ASCII punctuation, so do not depend on
        # the locale's default encoding.
        with open(args.output, "w", encoding="utf-8") as f:
            f.write(result)
        print(f"Written to {args.output}", file=sys.stderr)
    else:
        print(result)

    return 0 if all_proposals else 1


def _unique_dirs(dirs: List[str]) -> List[str]:
    """Return *dirs* without entries that resolve to an already-listed directory."""
    seen = set()
    unique = []
    for directory in dirs:
        resolved = os.path.realpath(directory)
        if resolved not in seen:
            seen.add(resolved)
            unique.append(directory)
    return unique


if __name__ == "__main__":
    sys.exit(main())
|
||||
Reference in New Issue
Block a user