#!/usr/bin/env python3
"""Tests for scripts/automation_opportunity_finder.py — 8 tests."""

import json
import os
import sys
import tempfile

# Ensure sibling modules are importable when this file runs from elsewhere.
sys.path.insert(0, os.path.dirname(__file__) or ".")

import importlib.util

# Load the module under test directly from its file path so the tests work
# regardless of whether the scripts directory is an installed package.
_here = os.path.dirname(__file__) or "."
spec = importlib.util.spec_from_file_location(
    "aof", os.path.join(_here, "automation_opportunity_finder.py")
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)


def test_analyze_cron_jobs_no_file():
    """Returns empty list when no cron jobs file exists."""
    with tempfile.TemporaryDirectory() as workspace:
        # No cron/ subdirectory at all, so the analyzer has nothing to read.
        proposals = mod.analyze_cron_jobs(workspace)
        assert proposals == []
        print("PASS: test_analyze_cron_jobs_no_file")


def test_analyze_cron_jobs_disabled():
    """Detects disabled cron jobs."""
    with tempfile.TemporaryDirectory() as workspace:
        cron_dir = os.path.join(workspace, "cron")
        os.makedirs(cron_dir)
        # One disabled and one enabled job; only the former should be flagged.
        job_specs = [
            ("j1", "backup", False, "0 * * * *"),
            ("j2", "health", True, "*/5 * * * *"),
        ]
        jobs = [
            {"id": jid, "name": name, "enabled": on, "schedule": sched}
            for jid, name, on, sched in job_specs
        ]
        with open(os.path.join(cron_dir, "jobs.json"), "w") as fh:
            fh.write(json.dumps(jobs))
        proposals = mod.analyze_cron_jobs(workspace)
        categories = {p["category"] for p in proposals}
        assert "cron_disabled" in categories
        print("PASS: test_analyze_cron_jobs_disabled")


def test_analyze_cron_jobs_errors():
    """Detects cron jobs with error status."""
    with tempfile.TemporaryDirectory() as workspace:
        cron_dir = os.path.join(workspace, "cron")
        os.makedirs(cron_dir)
        # A single enabled job whose last run errored out.
        broken_job = {
            "id": "j1",
            "name": "broken",
            "enabled": True,
            "last_status": "error",
            "schedule": "0 * * * *",
        }
        with open(os.path.join(cron_dir, "jobs.json"), "w") as fh:
            json.dump([broken_job], fh)
        proposals = mod.analyze_cron_jobs(workspace)
        assert any(item["category"] == "cron_errors" for item in proposals)
        print("PASS: test_analyze_cron_jobs_errors")


def test_analyze_documents_finds_todos():
    """Detects TODO markers in documents.

    Writes three markdown files, each containing one TODO line, and checks
    that a ``manual_todo`` proposal is produced with one detail entry per
    file found.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        docs_dir = os.path.join(tmpdir, "docs")
        os.makedirs(docs_dir)
        for i in range(3):
            with open(os.path.join(docs_dir, f"guide{i}.md"), "w") as f:
                f.write(f"# Guide {i}\n\nTODO: Automate this step\n")
        result = mod.analyze_documents([tmpdir])
        assert any(p["category"] == "manual_todo" for p in result)
        todo_proposals = [p for p in result if p["category"] == "manual_todo"]
        # Use the idiomatic len() builtin rather than calling __len__ directly.
        assert len(todo_proposals[0]["details"]) == 3
        print("PASS: test_analyze_documents_finds_todos")


def test_analyze_scripts_repeated_commands():
    """Detects repeated shell commands across scripts."""
    with tempfile.TemporaryDirectory() as workspace:
        scripts_dir = os.path.join(workspace, "scripts")
        os.makedirs(scripts_dir)
        repeated_cmd = "docker restart myapp"
        # The same command appearing in four different scripts should be flagged.
        for idx in range(4):
            script_path = os.path.join(scripts_dir, f"deploy{idx}.sh")
            with open(script_path, "w") as fh:
                fh.write(f"#!/bin/bash\n{repeated_cmd}\n")
        proposals = mod.analyze_scripts([workspace])
        assert any(p["category"] == "repeated_command" for p in proposals)
        print("PASS: test_analyze_scripts_repeated_commands")


def test_analyze_session_transcripts():
    """Detects repeated tool-call sequences."""
    with tempfile.TemporaryDirectory() as workspace:
        sessions_dir = os.path.join(workspace, "sessions")
        os.makedirs(sessions_dir)
        assistant_turn = {
            "role": "assistant",
            "content": "working",
            "tool_calls": [
                {"function": {"name": "read_file"}},
                {"function": {"name": "write_file"}},
            ],
        }
        # Four transcripts that all repeat the same read -> write tool sequence.
        for idx in range(4):
            transcript = os.path.join(sessions_dir, f"session{idx}.jsonl")
            with open(transcript, "w") as fh:
                fh.write(json.dumps({"role": "user", "content": f"task {idx}"}) + "\n")
                fh.write(json.dumps(assistant_turn) + "\n")
        proposals = mod.analyze_session_transcripts([sessions_dir])
        seq_proposals = [p for p in proposals if p["category"] == "tool_sequence"]
        assert seq_proposals
        assert any(
            "read_file" in p["title"] and "write_file" in p["title"]
            for p in seq_proposals
        )
        print("PASS: test_analyze_session_transcripts")


def test_deduplicate_proposals():
    """Deduplicates proposals with similar titles."""
    def make(title, category, confidence, description):
        return {
            "title": title,
            "category": category,
            "confidence": confidence,
            "impact": "medium",
            "description": description,
            "sources": [],
        }

    proposals = [
        make("TODO found 3 times", "manual_todo", 0.7, "x"),
        make("TODO found 3 times", "manual_todo", 0.7, "x"),
        make("FIXME found 5 times", "manual_fixme", 0.8, "y"),
    ]
    deduped = mod.deduplicate_proposals(proposals)
    # The two identical TODO proposals collapse into one.
    assert len(deduped) == 2
    print("PASS: test_deduplicate_proposals")


def test_rank_proposals():
    """Ranks proposals by impact * confidence."""
    def proposal(title, confidence, impact):
        return {
            "title": title,
            "category": "x",
            "confidence": confidence,
            "impact": impact,
            "description": "",
            "sources": [],
        }

    ranked = mod.rank_proposals([
        proposal("low", 0.9, "low"),
        proposal("high", 0.8, "high"),
        proposal("med", 0.7, "medium"),
    ])
    # Highest impact*confidence score first, lowest last.
    assert ranked[0]["title"] == "high"
    assert ranked[-1]["title"] == "low"
    print("PASS: test_rank_proposals")


if __name__ == "__main__":
    # Collect every module-level test_* callable and run each in turn.
    tests = [fn for name, fn in globals().items() if name.startswith("test_")]
    passed = 0
    failed = 0
    for test_fn in tests:
        try:
            test_fn()
        except Exception as exc:
            print(f"FAIL: {test_fn.__name__}: {exc}")
            failed += 1
        else:
            passed += 1
    print(f"\n{passed}/{passed+failed} tests passed")
    # Non-zero exit status signals any failure to CI.
    sys.exit(1 if failed else 0)