Files
timmy-home/scripts/codebase_test_generator.py
Alexander Whitestone 840214c8c0
Some checks failed
Agent PR Gate / gate (pull_request) Failing after 17s
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 8s
Smoke Test / smoke (pull_request) Failing after 6s
Agent PR Gate / report (pull_request) Has been cancelled
fix: harden codebase test generator output (#667)
2026-04-17 02:38:33 -04:00

364 lines
14 KiB
Python
Executable File

#!/usr/bin/env python3
"""Codebase Test Generator — Fill Coverage Gaps (#667)."""
import ast
import os
import argparse
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class FunctionInfo:
    """Static description of one function or method discovered via the AST.

    Instances are plain records: nothing here imports or executes the code
    being described.
    """

    name: str  # bare function/method name as written in the source
    module_path: str  # path of the defining file (relative to the scan root)
    class_name: Optional[str] = None  # enclosing class name, None for module-level functions
    lineno: int = 0  # line number reported by the AST node
    args: List[str] = field(default_factory=list)  # parameter names, self/cls removed
    is_async: bool = False  # True for ``async def``
    is_private: bool = False  # single leading underscore (dunder names are not private)
    is_property: bool = False  # decorated with ``property``
    docstring: Optional[str] = None  # docstring text, if the function has one
    has_return: bool = False  # body contains a ``return`` that carries a value
    raises: List[str] = field(default_factory=list)  # names of exceptions raised in the body
    decorators: List[str] = field(default_factory=list)  # decorator names (last dotted component)
    calls: List[str] = field(default_factory=list)  # names of callables invoked in the body

    @property
    def qualified_name(self):
        """Return ``Class.name`` for methods and plain ``name`` otherwise."""
        if self.class_name:
            return f"{self.class_name}.{self.name}"
        return self.name

    @property
    def test_name(self):
        """Return the ``test_<module>_<class>_<name>`` identifier for this function.

        The module path is flattened exactly as before (``/`` and ``-`` become
        ``_``, ``.py`` is dropped). Any character that still could not appear
        in a Python identifier -- a backslash from a Windows path, a dot in a
        directory name, a space -- is then replaced by ``_`` so the result is
        always usable as a function name.
        """
        flattened = self.module_path.replace("/", "_").replace(".py", "").replace("-", "_")
        safe_mod = "".join(
            ch if ch == "_" or (ch.isascii() and ch.isalnum()) else "_"
            for ch in flattened
        )
        safe_cls = self.class_name + "_" if self.class_name else ""
        return f"test_{safe_mod}_{safe_cls}{self.name}"
@dataclass
class CoverageGap:
    """One function that has no matching test, plus its scheduling rank."""

    # The function or method that lacks a test.
    func: FunctionInfo
    # Short human-readable explanation of why this counts as a gap.
    reason: str
    # Sort rank for the gap; gaps are ordered ascending on this value
    # (1 is reported as "High", 2 as "Medium", 3 as "Low").
    test_priority: int
class SourceAnalyzer(ast.NodeVisitor):
def __init__(self, module_path: str):
self.module_path = module_path
self.functions: List[FunctionInfo] = []
self._class_stack: List[str] = []
def visit_ClassDef(self, node):
self._class_stack.append(node.name)
self.generic_visit(node)
self._class_stack.pop()
def visit_FunctionDef(self, node):
self._collect(node, False)
self.generic_visit(node)
def visit_AsyncFunctionDef(self, node):
self._collect(node, True)
self.generic_visit(node)
def _collect(self, node, is_async):
cls = self._class_stack[-1] if self._class_stack else None
args = [a.arg for a in node.args.args if a.arg not in ("self", "cls")]
has_ret = any(isinstance(c, ast.Return) and c.value for c in ast.walk(node))
raises = []
calls = []
for c in ast.walk(node):
if isinstance(c, ast.Raise) and c.exc:
if isinstance(c.exc, ast.Call) and isinstance(c.exc.func, ast.Name):
raises.append(c.exc.func.id)
if isinstance(c, ast.Call):
if isinstance(c.func, ast.Name):
calls.append(c.func.id)
elif isinstance(c.func, ast.Attribute):
calls.append(c.func.attr)
decos = []
for d in node.decorator_list:
if isinstance(d, ast.Name):
decos.append(d.id)
elif isinstance(d, ast.Attribute):
decos.append(d.attr)
self.functions.append(
FunctionInfo(
name=node.name,
module_path=self.module_path,
class_name=cls,
lineno=node.lineno,
args=args,
is_async=is_async,
is_private=node.name.startswith("_") and not node.name.startswith("__"),
is_property="property" in decos,
docstring=ast.get_docstring(node),
has_return=has_ret,
raises=raises,
decorators=decos,
calls=sorted(set(calls)),
)
)
def analyze_file(filepath, base_dir):
    """Parse one Python file and return the functions found in it.

    Args:
        filepath: Path of the source file to analyse.
        base_dir: Directory the recorded module path is made relative to.

    Returns:
        A list of ``FunctionInfo`` records, or ``[]`` when the file cannot be
        read or parsed.
    """
    module_path = os.path.relpath(filepath, base_dir)
    try:
        # Python source is UTF-8 by default, so decode explicitly instead of
        # relying on the locale. ``utf-8-sig`` also strips a leading BOM,
        # which the parser rejects in str input. Undecodable bytes are
        # replaced rather than treated as fatal.
        with open(filepath, "r", encoding="utf-8-sig", errors="replace") as f:
            tree = ast.parse(f.read(), filename=filepath)
    except (SyntaxError, ValueError, OSError):
        # ValueError: older interpreters raise it for NUL bytes in the source
        # (it also covers UnicodeDecodeError). OSError: unreadable file or
        # dangling symlink. Either way skip this file and keep scanning.
        return []
    analyzer = SourceAnalyzer(module_path)
    analyzer.visit(tree)
    return analyzer.functions
def find_source_files(source_dir):
    """Return the sorted paths of all non-test ``.py`` files under *source_dir*.

    Directories named in the skip set, and any directory whose name starts
    with a dot, are not descended into. ``__init__.py`` and files named
    ``test_*`` are left out of the result.
    """
    skipped_dirs = {"__pycache__", ".git", "venv", ".venv", "node_modules", ".tox", "build", "dist"}
    found = []
    for current, subdirs, names in os.walk(source_dir):
        # Prune in place so os.walk never enters the skipped directories.
        subdirs[:] = [
            sub for sub in subdirs
            if not (sub in skipped_dirs or sub.startswith("."))
        ]
        found.extend(
            os.path.join(current, name)
            for name in names
            if name.endswith(".py")
            and name != "__init__.py"
            and not name.startswith("test_")
        )
    found.sort()
    return found
def find_existing_tests(test_dir):
existing = set()
if not os.path.isdir(test_dir):
return existing
for root, _, fs in os.walk(test_dir):
for f in fs:
if f.startswith("test_") and f.endswith(".py"):
try:
with open(os.path.join(root, f)) as fh:
tree = ast.parse(fh.read())
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef) and node.name.startswith("test_"):
existing.add(node.name)
except (SyntaxError, UnicodeDecodeError):
pass
return existing
def identify_gaps(functions, existing_tests):
    """Return a ``CoverageGap`` for every function no existing test mentions.

    A function counts as covered when its name occurs as a substring of at
    least one existing test name. Dunder methods other than ``__init__`` are
    never reported.

    Args:
        functions: Iterable of ``FunctionInfo``-like objects.
        existing_tests: Iterable of existing test function names.

    Returns:
        Gaps sorted by ``(test_priority, module_path, name)``. Priority is
        3 for private functions, 1 for functions that raise or return a
        value, and 2 otherwise.
    """
    # Join the names once with a separator that cannot occur inside an
    # identifier. One substring search per function then matches only inside
    # a real test name, never the punctuation of a container repr (the old
    # ``str(set())`` text made a function called ``set`` look covered).
    haystack = "\n".join(existing_tests)
    gaps = []
    for func in functions:
        if func.name.startswith("__") and func.name != "__init__":
            continue
        if func.name in haystack:
            continue
        priority = 3 if func.is_private else (1 if (func.raises or func.has_return) else 2)
        gaps.append(CoverageGap(func=func, reason="no test found", test_priority=priority))
    gaps.sort(key=lambda g: (g.test_priority, g.func.module_path, g.func.name))
    return gaps
def _format_arg_value(arg: str) -> str:
    """Guess a placeholder source expression for a parameter from its name.

    The name is lower-cased, checked against a table of exact names, and
    then against an ordered list of substring rules; the first matching rule
    wins. Names that match nothing get ``"None"``. The return value is
    Python source text, not a live object.
    """
    key = arg.lower()

    # Exact-name rules. None of these names contains a token from the
    # substring rules below, so looking them up first preserves precedence.
    exact = {
        "args": "type('Args', (), {'files': []})()",
        "kwargs": "{}",
        "options": "{}",
        "params": "{}",
        "history": "[]",
        "width": "120",
        "height": "40",
        "n": "1",
    }
    if key in exact:
        return exact[key]

    # Ordered substring rules: earlier rows take priority over later ones.
    substring_rules = (
        (("dict", "data", "config", "report", "perception", "action"), "{}"),
        (("filepath", "file_path"), "str(Path(__file__))"),
        (("path", "file", "dir"), "Path(__file__)"),
        (("root",), "Path(__file__).resolve().parent"),
        (("response", "cmd", "entity", "message", "text", "content", "query", "name", "key", "label"), "'test'"),
        (("session", "user"), "'test'"),
        (("count", "num", "size", "index", "port", "timeout", "wait"), "1"),
        (("flag", "enabled", "verbose", "quiet", "force", "debug", "dry_run"), "False"),
    )
    for tokens, placeholder in substring_rules:
        for token in tokens:
            if token in key:
                return placeholder
    return "None"
def _call_args(func: FunctionInfo) -> str:
    """Render ``name=value`` keyword arguments for calling *func*.

    Each parameter gets the placeholder chosen by ``_format_arg_value``;
    ``self`` and ``cls`` are skipped. Returns an empty string when there is
    nothing to pass.
    """
    rendered = []
    for param in func.args:
        if param in ("self", "cls"):
            continue
        rendered.append(f"{param}={_format_arg_value(param)}")
    return ", ".join(rendered)
def _strict_runtime_exception_expected(func: FunctionInfo) -> bool:
    """Tell whether the generated test should wrap the call in ``pytest.raises``.

    True only for a fixed list of function names; every other function gets
    an ordinary call in its generated test.
    """
    must_raise = frozenset(
        (
            "tmux",
            "send_key",
            "send_text",
            "keypress",
            "type_and_observe",
            "cmd_classify_risk",
        )
    )
    if func.name in must_raise:
        return True
    return False
def _path_returning(func: FunctionInfo) -> bool:
    """Tell whether *func*'s name ends in ``_path``.

    The test generator uses this to decide whether to assert that the
    result is a ``Path``.
    """
    suffix = "_path"
    function_name = func.name
    return function_name.endswith(suffix)
def generate_test(gap):
    """Render the source text of one generated test method for *gap*.

    The returned text is indented to sit inside a test class (4 spaces for
    the method, deeper for its body). The method:

    * loads the target via ``_load_symbol`` (instantiating the owning class
      with no arguments for methods and properties);
    * calls it with placeholder keyword arguments, inside ``pytest.raises``
      for the functions flagged by ``_strict_runtime_exception_expected``;
    * asserts something about the result, chosen from the function's name
      and whether it returns a value;
    * turns common runtime and import errors into ``pytest.skip``.

    Args:
        gap: Object whose ``func`` attribute describes the function to test.

    Returns:
        The method source as one newline-joined string (no trailing newline).
    """
    func = gap.func

    # Every emitted line carries an explicit indent: the nested try/with
    # blocks only parse when each level is deeper than its parent.
    unit = "    "
    member, body, outer, inner, nested = (unit * depth for depth in range(1, 6))

    lines = [
        f"{member}# AUTO-GENERATED -- review before merging",
        f"{member}# Source: {func.module_path}:{func.lineno}",
        f"{member}# Function: {func.qualified_name}",
        "",
    ]

    signature = "async def" if func.is_async else "def"
    if func.is_async:
        lines.append(f"{member}@pytest.mark.asyncio")
    lines.append(f"{member}{signature} {func.test_name}(self):")
    lines.append(f'{body}"""Test {func.qualified_name} -- auto-generated."""')
    lines.append(f"{body}try:")
    lines.append(f"{outer}try:")

    if func.class_name:
        lines.append(f"{inner}owner = _load_symbol({func.module_path!r}, {func.class_name!r})")
        lines.append(f"{inner}target = owner()")
        if func.is_property:
            # Reading the attribute *is* the call for a property.
            lines.append(f"{inner}result = target.{func.name}")
        else:
            lines.append(f"{inner}target = target.{func.name}")
    else:
        lines.append(f"{inner}target = _load_symbol({func.module_path!r}, {func.name!r})")

    args_str = _call_args(func)
    call_expr = f"target({args_str})" if not func.is_property else "result"
    invocation = f"await {call_expr}" if func.is_async else call_expr

    if _strict_runtime_exception_expected(func):
        lines.append(f"{inner}with pytest.raises((RuntimeError, ValueError, TypeError)):")
        lines.append(f"{nested}{invocation}")
    else:
        if not func.is_property:
            lines.append(f"{inner}result = {invocation}")
        if _path_returning(func):
            lines.append(f"{inner}assert isinstance(result, Path)")
        elif func.name.startswith(("has_", "is_")):
            lines.append(f"{inner}assert isinstance(result, bool)")
        elif func.name.startswith("list_"):
            lines.append(f"{inner}assert isinstance(result, (list, tuple, set, dict, str))")
        elif func.has_return:
            lines.append(f"{inner}assert result is not NotImplemented")
        else:
            lines.append(f"{inner}assert True  # smoke: reached without exception")

    lines.append(
        f"{outer}except (RuntimeError, ValueError, TypeError, AttributeError, "
        "FileNotFoundError, OSError, KeyError) as exc:"
    )
    lines.append(f"{inner}pytest.skip(f'Auto-generated stub needs richer fixture: {{exc}}')")
    lines.append(f"{body}except (ImportError, ModuleNotFoundError) as exc:")
    lines.append(f"{outer}pytest.skip(f'Module not importable: {{exc}}')")
    return "\n".join(lines)
def generate_test_suite(gaps, max_tests=50):
    """Render a complete pytest module covering the first *max_tests* gaps.

    The module consists of a header docstring, imports, a ``_load_symbol``
    helper that imports a source file by path, and one
    ``Test<Module>Generated`` class per source module containing the methods
    produced by ``generate_test``.

    The emitted helper resolves source files relative to
    ``Path(__file__).resolve().parents[1]``, so the generated file is
    expected to live one directory below the scanned source root.

    Args:
        gaps: Sequence of coverage gaps, already in priority order.
        max_tests: Upper bound on the number of gaps rendered.

    Returns:
        The module source as a single string ending in a newline.
    """
    by_module = {}
    for gap in gaps[:max_tests]:
        by_module.setdefault(gap.func.module_path, []).append(gap)

    lines = [
        '"""Auto-generated test suite -- Codebase Genome (#667).',
        "",
        "Generated by scripts/codebase_test_generator.py",
        "Coverage gaps identified from AST analysis.",
        "",
        "These tests are starting points. Review before merging.",
        '"""',
        "",
        "import importlib.util",
        "import sys",
        "from pathlib import Path",
        "import pytest",
        "from unittest.mock import MagicMock, patch",
        "",
        "",
        "def _load_symbol(relative_path, symbol):",
        "    module_path = Path(__file__).resolve().parents[1] / relative_path",
        "    if not module_path.exists():",
        "        pytest.skip(f'Module file not found: {module_path}')",
        "    spec_name = 'autogen_' + str(relative_path).replace('/', '_').replace('-', '_').replace('.', '_')",
        "    spec = importlib.util.spec_from_file_location(spec_name, module_path)",
        "    module = importlib.util.module_from_spec(spec)",
        # Register before executing, as the importlib recipe does: code that
        # looks itself up through sys.modules during import (for example
        # dataclasses resolving string annotations) otherwise fails.
        "    sys.modules[spec_name] = module",
        "    try:",
        "        spec.loader.exec_module(module)",
        "    except Exception as exc:",
        "        sys.modules.pop(spec_name, None)",
        "        pytest.skip(f'Module not importable: {exc}')",
        "    return getattr(module, symbol)",
        "",
        "",
        "# AUTO-GENERATED -- DO NOT EDIT WITHOUT REVIEW",
    ]

    for module, mgaps in sorted(by_module.items()):
        flattened = module.replace("/", "_").replace(".py", "").replace("-", "_")
        # Anything that still is not an identifier character (backslash,
        # dot, space, ...) becomes a word break so the class name is valid.
        safe = "".join(
            ch if ch == "_" or (ch.isascii() and ch.isalnum()) else "_"
            for ch in flattened
        )
        cls_name = "".join(w.title() for w in safe.split("_"))
        # Backslashes would act as escape sequences inside the emitted
        # docstring, so show the path with forward slashes.
        shown = module.replace("\\", "/")
        lines.append("")
        lines.append(f"class Test{cls_name}Generated:")
        lines.append(f'    """Auto-generated tests for {shown}."""')
        for gap in mgaps:
            lines.append("")
            lines.append(generate_test(gap))
    lines.append("")
    return "\n".join(lines)
def main():
    """Command-line entry point: scan a tree and write tests for its gaps.

    Steps: parse options, collect source files under ``--source``, analyse
    their functions, collect existing test names from ``<source>/tests``,
    compute the coverage gaps (dropping private functions unless
    ``--include-private`` is given) and print a summary. With ``--dry-run``
    only the first ten gaps are listed; otherwise the generated suite is
    written to ``--output``, resolved relative to the source directory.
    """
    parser = argparse.ArgumentParser(description="Codebase Test Generator")
    parser.add_argument("--source", default=".")
    parser.add_argument("--output", default="tests/test_genome_generated.py")
    parser.add_argument("--max-tests", type=int, default=50)
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--include-private", action="store_true")
    args = parser.parse_args()

    source_dir = os.path.abspath(args.source)
    test_dir = os.path.join(source_dir, "tests")
    print(f"Scanning: {source_dir}")

    source_files = find_source_files(source_dir)
    print(f"Source files: {len(source_files)}")

    all_funcs = []
    for f in source_files:
        all_funcs.extend(analyze_file(f, source_dir))
    print(f"Functions/methods: {len(all_funcs)}")

    existing = find_existing_tests(test_dir)
    print(f"Existing tests: {len(existing)}")

    gaps = identify_gaps(all_funcs, existing)
    if not args.include_private:
        gaps = [g for g in gaps if not g.func.is_private]
    print(f"Coverage gaps: {len(gaps)}")

    by_pri = {1: 0, 2: 0, 3: 0}
    for g in gaps:
        by_pri[g.test_priority] += 1
    print(f" High: {by_pri[1]}, Medium: {by_pri[2]}, Low: {by_pri[3]}")

    if args.dry_run:
        for g in gaps[:10]:
            print(f" {g.func.module_path}:{g.func.lineno} {g.func.qualified_name}")
        return

    if gaps:
        content = generate_test_suite(gaps, max_tests=args.max_tests)
        out = os.path.join(source_dir, args.output)
        os.makedirs(os.path.dirname(out), exist_ok=True)
        # The output is Python source, which the interpreter reads as UTF-8;
        # writing with the locale encoding could corrupt non-ASCII module
        # paths or names embedded in the generated code.
        with open(out, "w", encoding="utf-8") as f:
            f.write(content)
        print(f"Generated {min(len(gaps), args.max_tests)} tests -> {args.output}")
    else:
        print("No gaps found!")
# Run the generator only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()