From 49365c64d2eeb456dedc60a62d55c2cda3aaae76 Mon Sep 17 00:00:00 2001 From: Alexander Whitestone Date: Wed, 15 Apr 2026 14:53:43 +0000 Subject: [PATCH] test: automation opportunity finder tests (#170) --- scripts/test_automation_opportunity_finder.py | 147 ++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 scripts/test_automation_opportunity_finder.py diff --git a/scripts/test_automation_opportunity_finder.py b/scripts/test_automation_opportunity_finder.py new file mode 100644 index 0000000..728fd60 --- /dev/null +++ b/scripts/test_automation_opportunity_finder.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +"""Tests for scripts/automation_opportunity_finder.py — 8 tests.""" + +import json +import os +import sys +import tempfile + +sys.path.insert(0, os.path.dirname(__file__) or ".") +import importlib.util +spec = importlib.util.spec_from_file_location( + "aof", + os.path.join(os.path.dirname(__file__) or ".", "automation_opportunity_finder.py"), +) +mod = importlib.util.module_from_spec(spec) +spec.loader.exec_module(mod) + + +def test_analyze_cron_jobs_no_file(): + """Returns empty list when no cron jobs file exists.""" + with tempfile.TemporaryDirectory() as tmpdir: + result = mod.analyze_cron_jobs(tmpdir) + assert result == [] + print("PASS: test_analyze_cron_jobs_no_file") + + +def test_analyze_cron_jobs_disabled(): + """Detects disabled cron jobs.""" + with tempfile.TemporaryDirectory() as tmpdir: + cron_dir = os.path.join(tmpdir, "cron") + os.makedirs(cron_dir) + jobs = [ + {"id": "j1", "name": "backup", "enabled": False, "schedule": "0 * * * *"}, + {"id": "j2", "name": "health", "enabled": True, "schedule": "*/5 * * * *"}, + ] + with open(os.path.join(cron_dir, "jobs.json"), "w") as f: + json.dump(jobs, f) + result = mod.analyze_cron_jobs(tmpdir) + assert any(p["category"] == "cron_disabled" for p in result) + print("PASS: test_analyze_cron_jobs_disabled") + + +def test_analyze_cron_jobs_errors(): + """Detects cron jobs with error status.""" + with tempfile.TemporaryDirectory() as tmpdir: + cron_dir = os.path.join(tmpdir, "cron") + os.makedirs(cron_dir) + jobs = [ + {"id": "j1", "name": "broken", "enabled": True, "last_status": "error", "schedule": "0 * * * *"}, + ] + with open(os.path.join(cron_dir, "jobs.json"), "w") as f: + json.dump(jobs, f) + result = mod.analyze_cron_jobs(tmpdir) + assert any(p["category"] == "cron_errors" for p in result) + print("PASS: test_analyze_cron_jobs_errors") + + +def test_analyze_documents_finds_todos(): + """Detects TODO markers in documents.""" + with tempfile.TemporaryDirectory() as tmpdir: + docs_dir = os.path.join(tmpdir, "docs") + os.makedirs(docs_dir) + for i in range(3): + with open(os.path.join(docs_dir, f"guide{i}.md"), "w") as f: + f.write(f"# Guide {i}\n\nTODO: Automate this step\n") + result = mod.analyze_documents([tmpdir]) + assert any(p["category"] == "manual_todo" for p in result) + todo_proposals = [p for p in result if p["category"] == "manual_todo"] + assert todo_proposals[0]["details"].__len__() == 3 + print("PASS: test_analyze_documents_finds_todos") + + +def test_analyze_scripts_repeated_commands(): + """Detects repeated shell commands across scripts.""" + with tempfile.TemporaryDirectory() as tmpdir: + scripts_dir = os.path.join(tmpdir, "scripts") + os.makedirs(scripts_dir) + repeated_cmd = "docker restart myapp" + for i in range(4): + with open(os.path.join(scripts_dir, f"deploy{i}.sh"), "w") as f: + f.write(f"#!/bin/bash\n{repeated_cmd}\n") + result = mod.analyze_scripts([tmpdir]) + assert any(p["category"] == "repeated_command" for p in result) + print("PASS: test_analyze_scripts_repeated_commands") + + +def test_analyze_session_transcripts(): + """Detects repeated tool-call sequences.""" + with tempfile.TemporaryDirectory() as tmpdir: + sessions_dir = os.path.join(tmpdir, "sessions") + os.makedirs(sessions_dir) + for i in range(4): + with open(os.path.join(sessions_dir, f"session{i}.jsonl"), "w") as f: + f.write(json.dumps({"role": "user", "content": f"task {i}"}) + "\n") + f.write(json.dumps({ + "role": "assistant", + "content": "working", + "tool_calls": [ + {"function": {"name": "read_file"}}, + {"function": {"name": "write_file"}}, + ] + }) + "\n") + result = mod.analyze_session_transcripts([sessions_dir]) + assert any(p["category"] == "tool_sequence" for p in result) + seq_proposals = [p for p in result if p["category"] == "tool_sequence"] + assert any("read_file" in p["title"] and "write_file" in p["title"] for p in seq_proposals) + print("PASS: test_analyze_session_transcripts") + + +def test_deduplicate_proposals(): + """Deduplicates proposals with similar titles.""" + proposals = [ + {"title": "TODO found 3 times", "category": "manual_todo", "confidence": 0.7, "impact": "medium", "description": "x", "sources": []}, + {"title": "TODO found 3 times", "category": "manual_todo", "confidence": 0.7, "impact": "medium", "description": "x", "sources": []}, + {"title": "FIXME found 5 times", "category": "manual_fixme", "confidence": 0.8, "impact": "medium", "description": "y", "sources": []}, + ] + result = mod.deduplicate_proposals(proposals) + assert len(result) == 2 + print("PASS: test_deduplicate_proposals") + + +def test_rank_proposals(): + """Ranks proposals by impact * confidence.""" + proposals = [ + {"title": "low", "category": "x", "confidence": 0.9, "impact": "low", "description": "", "sources": []}, + {"title": "high", "category": "x", "confidence": 0.8, "impact": "high", "description": "", "sources": []}, + {"title": "med", "category": "x", "confidence": 0.7, "impact": "medium", "description": "", "sources": []}, + ] + result = mod.rank_proposals(proposals) + assert result[0]["title"] == "high" + assert result[-1]["title"] == "low" + print("PASS: test_rank_proposals") + + +if __name__ == "__main__": + tests = [v for k, v in globals().items() if k.startswith("test_")] + passed = 0 + failed = 0 + for t in tests: + try: + t() + passed += 1 + except Exception as e: + print(f"FAIL: {t.__name__}: {e}") + failed += 1 + print(f"\n{passed}/{passed+failed} tests passed") + sys.exit(1 if failed else 0)