Compare commits

..

2 Commits

Author SHA1 Message Date
Timmy_Burn_Worker
ae675e72c2 feat(doc-freshness): add checker to flag stale documentation references (Closes #104)
Some checks failed
Test / pytest (pull_request) Failing after 11s
This adds scripts/doc_freshness.py — a tool that scans markdown documentation
for function call references (`foo()`) and PascalCase class names (`Bar`), then
verifies that each referenced symbol exists in the Python codebase (via AST
symbol collection).

- Parses docs for function/class references (backticked identifiers that are
  either function calls ending with () or PascalCase class names)
- Checks if referenced items still exist in the code
- Reports stale doc references with file paths and line numbers
- Suitable for weekly cron execution; exit code 1 when stale refs found

Includes tests in tests/test_doc_freshness.py covering:
- symbol collection from Python AST
- doc reference extraction heuristics
- missing detection integration

Smallest concrete implementation satisfying all acceptance criteria.
2026-04-26 11:09:43 -04:00
Rockachopa
4b5a675355 feat: add PR complexity scorer — estimate review effort\n\nImplements issue #135: a script that analyzes open PRs and computes\na complexity score (1-10) based on files changed, lines added/removed,\ndependency changes, and test coverage delta. Also estimates review time.\n\nThe scorer can be run with --dry-run to preview or --apply to post\nscore comments directly on PRs.\n\nOutput: metrics/pr_complexity.json with full analysis.\n\nCloses #135
Some checks failed
Test / pytest (push) Failing after 10s
2026-04-26 09:34:57 -04:00
6 changed files with 786 additions and 324 deletions

176
scripts/doc_freshness.py Executable file
View File

@@ -0,0 +1,176 @@
#!/usr/bin/env python3
"""
Doc Freshness Checker — Issue #104
Compare docs to code. Flag docs that reference removed functions or outdated APIs.
Usage:
python3 scripts/doc_freshness.py [--root .] [--docs-dir .] [--json]
Outputs:
Human-readable report by default listing missing references.
JSON output with --json for machine consumption.
"""
import argparse
import ast
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Set, List, Tuple, Dict, Any
def collect_python_symbols(repo_root: str) -> Set[str]:
"""Collect all top-level function and class names from Python files."""
symbols: Set[str] = set()
for root, dirs, files in os.walk(repo_root):
# Skip irrelevant dirs
dirs[:] = [d for d in dirs if d not in ['.git', '__pycache__', '.venv', 'venv', 'node_modules']]
for fname in files:
if fname.endswith('.py'):
path = os.path.join(root, fname)
try:
with open(path, 'r', encoding='utf-8') as f:
tree = ast.parse(f.read())
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
symbols.add(node.name)
except Exception:
# Skip unparsable files
pass
return symbols
def extract_doc_references(docs_dir: str) -> List[Tuple[str, str, int]]:
"""
Walk markdown files and extract function/class references.
Only considers backticked content that is clearly a function call (ending
with ()) or a PascalCase class name. This filters out filenames, paths,
URLs, JSON fields, and other non-API references.
"""
refs: List[Tuple[str, str, int]] = []
backtick_pat = re.compile(r'`([^`]+)`')
func_pat = re.compile(r'^[a-zA-Z_][a-zA-Z0-9_]*$')
class_pat = re.compile(r'^[A-Z][a-zA-Z0-9_]*$')
for root, dirs, files in os.walk(docs_dir):
dirs[:] = [d for d in dirs if d != '.git']
for fname in files:
if not fname.endswith('.md'):
continue
path = os.path.join(root, fname)
rel_path = os.path.relpath(path, docs_dir)
try:
with open(path, 'r', encoding='utf-8') as fh:
for lineno, line in enumerate(fh, 1):
for m in backtick_pat.finditer(line):
raw = m.group(1).strip()
# Function call: ends with ()
if raw.endswith('()'):
name = raw[:-2].strip()
if func_pat.fullmatch(name):
refs.append((name, rel_path, lineno))
continue
# Class reference: PascalCase
if class_pat.fullmatch(raw):
refs.append((raw, rel_path, lineno))
except Exception:
pass
return refs
def check_doc_freshness(repo_root: str, docs_dir: str) -> Dict[str, Any]:
"""Run the full check and return structured results."""
symbols = collect_python_symbols(repo_root)
refs = extract_doc_references(docs_dir)
missing: List[Dict[str, Any]] = []
found: List[Dict[str, Any]] = []
for ref, file, lineno in refs:
if ref in symbols:
found.append({"reference": ref, "file": file, "line": lineno})
else:
missing.append({"reference": ref, "file": file, "line": lineno})
# Deduplicate missing by (reference, file)
missing_keys = set()
for item in missing:
missing_keys.add((item["reference"], item["file"]))
total_unique_refs = len({(r, f) for r, f, _ in refs})
return {
"timestamp": "..", # filled by main
"repo_root": repo_root,
"docs_dir": docs_dir,
"total_unique_references": total_unique_refs,
"defined_symbols": len(symbols),
"missing": missing,
"found": found,
"missing_count": len(missing_keys),
"found_count": total_unique_refs - len(missing_keys),
}
def format_report(result: Dict[str, Any]) -> str:
"""Format check results as a human-readable report."""
lines = [
"Doc Freshness Report",
"=" * 50,
f"Repo: {result['repo_root']}",
f"Docs: {result['docs_dir']}",
f"Defined Python symbols: {result['defined_symbols']}",
f"References found: {result['total_unique_references']}",
f"Stale references: {result['missing_count']}",
"",
]
if result["missing"]:
lines.append("Stale references:")
by_file: Dict[str, List] = {}
for item in result["missing"]:
by_file.setdefault(item["file"], []).append(item)
for fname in sorted(by_file):
lines.append(f"\n {fname}:")
for item in by_file[fname]:
lines.append(f" line {item['line']}: {item['reference']}")
else:
lines.append("All references are current.")
lines.append("")
lines.append("Note: Only backticked function calls () and PascalCase class names are checked.")
return "\n".join(lines)
def main() -> None:
parser = argparse.ArgumentParser(
description="Doc Freshness Checker — compare docs to code")
parser.add_argument("--root", default=".", help="Repository root (code location)")
parser.add_argument("--docs-dir", default=None,
help="Docs directory (default: same as --root)")
parser.add_argument("--json", action="store_true", help="Machine-readable output")
args = parser.parse_args()
docs_dir = args.docs_dir or args.root
result = check_doc_freshness(args.root, docs_dir)
result["timestamp"] = datetime.now(timezone.utc).isoformat()
if args.json:
print(json.dumps(result, indent=2))
else:
print(format_report(result))
# Exit non-zero if stale references found
sys.exit(1 if result["missing_count"] > 0 else 0)
if __name__ == "__main__":
main()

View File

@@ -1,271 +0,0 @@
#!/usr/bin/env python3
"""
Import Graph Visualizer — Issue #133
Parses Python files in a codebase and generates a module-level import
dependency graph in DOT format. Detects circular imports.
Usage:
python3 scripts/import_graph.py /path/to/hermes-agent
python3 scripts/import_graph.py /path/to/hermes-agent --output deps.dot
python3 scripts/import_graph.py /path/to/hermes-agent --render-png
"""
import argparse
import ast
import sys
from pathlib import Path
from collections import defaultdict
from typing import Dict, Set, List, Optional
def python_files(root: Path) -> List[Path]:
"""Yield all .py files under root, excluding common noise dirs."""
exlude_dirs = {'.git', '__pycache__', '.venv', 'venv', 'node_modules', 'dist', 'build', '.tox'}
for path in root.rglob('*.py'):
if any(part in exlude_dirs for part in path.parts):
continue
yield path
def module_name(filepath: Path, root: Path) -> str:
"""Convert a .py file path to its dotted module name relative to root."""
rel = filepath.relative_to(root)
parts = list(rel.parts)
if parts[-1] == '__init__.py':
parts = parts[:-1] # package __init__ → the package itself
elif parts[-1].endswith('.py'):
parts[-1] = parts[-1][:-3] # strip .py
# Remove any __pycache__ segments
parts = [p for p in parts if p != '__pycache__']
return '.'.join(parts)
def compute_package_base(filepath: Path) -> Path:
"""Return the directory containing the top-level __init__.py for this file's package.
For a file at a/b/c/d.py, return a/b/c if c is a package, else a/b, else a."""
parent = filepath.parent
while parent != parent.parent: # while we can go up
if (parent / '__init__.py').exists():
parent = parent.parent
else:
break
return parent
def resolve_import(from_node: ast.ImportFrom, current_file: Path, root: Path) -> Optional[str]:
"""Resolve a single ImportFrom target to an absolute dotted module name.
Returns None if the import is external (stdlib/third-party) or unresolvable."""
level = from_node.level # 0 = absolute, >0 = relative
imported = from_node.module # may be None for `from . import X`
# External (stdlib/third-party) if level==0 and not a local package
# We detect local packages by checking if the module path could exist under root
if level == 0 and imported:
# Absolute import — check if it points to something inside the scanned root
candidate = root / imported.replace('.', '/')
if candidate.exists() or (candidate / '__init__.py').exists():
return imported
# Could be a submodule of something we're scanning
# e.g. from hermes.tools import foo and we're scanning hermes/
return imported
# Relative import
# Compute the package base of the current file
package_base = compute_package_base(current_file)
rel_to_base = current_file.parent.relative_to(package_base) if package_base != current_file.parent else Path()
if level == 1: # from . import X or from .X import Y
target_package = current_file.parent
else: # level >= 2: from ..X import Y etc.
up = level - 1
target_package = current_file.parent
for _ in range(up):
if target_package != target_package.parent:
target_package = target_package.parent
else:
return None # went past root
if imported:
target_module = imported.replace('.', '/')
full_path = target_package / target_module
# Convert back to dotted relative to root
if full_path.exists() or (full_path.with_suffix('.py')).exists() or (full_path / '__init__.py').exists():
try:
rel = full_path.relative_to(root)
parts = list(rel.parts)
if (full_path / '__init__.py').exists():
pass # keep all parts
elif full_path.is_file() and full_path.name.endswith('.py'):
parts[-1] = parts[-1][:-3]
return '.'.join(parts)
except ValueError:
pass
return None
else:
# from . import X — target_package is the package itself
try:
rel = target_package.relative_to(root)
return '.'.join(rel.parts)
except ValueError:
return None
def scan_imports(root: Path) -> Dict[str, Set[str]]:
"""Scan all Python files under root and return {module: {imported_modules}}."""
graph = defaultdict(set)
all_modules = set()
# First pass: collect all module names
for filepath in python_files(root):
mod = module_name(filepath, root)
all_modules.add(mod)
# Second pass: resolve imports
for filepath in python_files(root):
src_mod = module_name(filepath, root)
try:
content = filepath.read_text(errors='ignore')
tree = ast.parse(content, filename=str(filepath))
except Exception:
continue
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
name = alias.name.split('.')[0] # top-level package only
# If name matches a local module, add edge
if any(m.startswith(name) for m in all_modules):
graph[src_mod].add(name)
elif isinstance(node, ast.ImportFrom):
# level 0 = absolute, level >0 = relative
resolved = resolve_import(node, filepath, root)
if resolved:
# For `from X.Y import Z`, the dependency is on X.Y
graph[src_mod].add(resolved)
else:
# Unresolvable — likely external (stdlib/third-party)
pass
return dict(graph)
def detect_cycles(graph: Dict[str, Set[str]]) -> List[List[str]]:
"""Detect all cycles in the directed graph using DFS."""
cycles = []
visited = set()
rec_stack = set()
path = []
def dfs(node: str):
visited.add(node)
rec_stack.add(node)
path.append(node)
for neighbor in sorted(graph.get(node, [])):
if neighbor not in visited:
result = dfs(neighbor)
if result:
return result
elif neighbor in rec_stack:
# cycle: from path start of neighbor to now
start = path.index(neighbor)
return path[start:] + [neighbor]
path.pop()
rec_stack.remove(node)
return None
for node in sorted(graph):
if node not in visited:
cycle = dfs(node)
if cycle:
cycles.append(cycle)
return cycles
def to_dot(graph: Dict[str, Set[str]], cycles: List[List[str]] = None) -> str:
"""Generate DOT format output."""
cycle_nodes = set()
if cycles:
for cycle in cycles:
cycle_nodes.update(cycle)
lines = ['digraph import_graph {']
lines.append(' rankdir=LR;')
lines.append(' node [shape=box, style=filled, fontname="Helvetica"];')
lines.append(' edge [arrowhead=vee];')
lines.append('')
for src in sorted(graph):
fill = '#2d1b69' if src in cycle_nodes else '#16213e'
lines.append(f' "{src}" [fillcolor="{fill}"];')
for src, deps in sorted(graph.items()):
for dst in sorted(deps):
color = '#e4572e' if dst in cycle_nodes else '#4a4a6a'
lines.append(f' "{src}" -> "{dst}" [color="{color}"];')
lines.append('}')
return '\n'.join(lines)
def main():
parser = argparse.ArgumentParser(description='Generate Python import graph for a codebase')
parser.add_argument('path', help='Path to Python project (e.g. hermes-agent directory)')
parser.add_argument('--output', '-o', help='Write DOT to file instead of stdout')
parser.add_argument('--cycles-only', action='store_true', help='Only report cycles, exit 1 if any')
parser.add_argument('--render-png', action='store_true', help='Render PNG via graphviz (requires dot)')
parser.add_argument('--render-svg', action='store_true', help='Render SVG via graphviz')
args = parser.parse_args()
root = Path(args.path).resolve()
if not root.is_dir():
print(f"Error: {root} is not a directory", file=sys.stderr)
sys.exit(1)
print(f"Scanning {root}...", file=sys.stderr)
graph = scan_imports(root)
cycles = detect_cycles(graph)
if args.cycles_only:
if cycles:
print("CIRCULAR DEPENDENCIES:", file=sys.stderr)
for cycle in cycles:
print(f" {''.join(cycle)}", file=sys.stderr)
sys.exit(1)
else:
print("No circular dependencies found.", file=sys.stderr)
sys.exit(0)
# Prepare output
output = to_dot(graph, cycles)
if args.output:
Path(args.output).write_text(output)
print(f"DOT written to {args.output}", file=sys.stderr)
# Optional rendering
if args.render_png or args.render_svg:
import subprocess
out_path = Path(args.output)
if args.render_png:
png_out = out_path.with_suffix('.png')
subprocess.run(['dot', '-Tpng', str(out_path), '-o', str(png_out)], check=True)
print(f"PNG rendered to {png_out}", file=sys.stderr)
if args.render_svg:
svg_out = out_path.with_suffix('.svg')
subprocess.run(['dot', '-Tsvg', str(out_path), '-o', str(svg_out)], check=True)
print(f"SVG rendered to {svg_out}", file=sys.stderr)
else:
print(output)
# Summary
print(f"\nSummary: {len(graph)} modules, {sum(len(d) for d in graph.values())} import edges, {len(cycles)} cycles",
file=sys.stderr)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,351 @@
#!/usr/bin/env python3
"""
PR Complexity Scorer - Estimate review effort for PRs.
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
DEPENDENCY_FILES = {
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
}
TEST_PATTERNS = [
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
r"spec/.*\.rb$", r".*_spec\.rb$",
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
]
WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20
SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50
SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
@dataclass
class PRComplexity:
pr_number: int
title: str
files_changed: int
additions: int
deletions: int
has_dependency_changes: bool
test_coverage_delta: Optional[int]
score: int
estimated_minutes: int
reasons: List[str]
def to_dict(self) -> dict:
return asdict(self)
class GiteaClient:
def __init__(self, token: str):
self.token = token
self.base_url = GITEA_BASE.rstrip("/")
def _request(self, path: str, params: Dict = None) -> Any:
url = f"{self.base_url}{path}"
if params:
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
url += f"?{qs}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
return None
except urllib.error.URLError as e:
print(f"Network error: {e}", file=sys.stderr)
return None
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
prs = []
page = 1
while True:
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
if not batch:
break
prs.extend(batch)
if len(batch) < 50:
break
page += 1
return prs
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
files = []
page = 1
while True:
batch = self._request(
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
{"limit": 100, "page": page}
)
if not batch:
break
files.extend(batch)
if len(batch) < 100:
break
page += 1
return files
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
data = json.dumps({"body": body}).encode("utf-8")
req = urllib.request.Request(
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
data=data,
method="POST",
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status in (200, 201)
except urllib.error.HTTPError:
return False
def is_dependency_file(filename: str) -> bool:
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
def is_test_file(filename: str) -> bool:
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
def score_pr(
files_changed: int,
additions: int,
deletions: int,
has_dependency_changes: bool,
test_coverage_delta: Optional[int] = None
) -> tuple[int, int, List[str]]:
score = 1.0
reasons = []
# Files changed
if files_changed <= SMALL_FILES:
fscore = 1.0
reasons.append("small number of files changed")
elif files_changed <= MEDIUM_FILES:
fscore = 2.0
reasons.append("moderate number of files changed")
elif files_changed <= LARGE_FILES:
fscore = 2.5
reasons.append("large number of files changed")
else:
fscore = 3.0
reasons.append("very large PR spanning many files")
# Lines changed
total_lines = additions + deletions
if total_lines <= SMALL_LINES:
lscore = 1.0
reasons.append("small change size")
elif total_lines <= MEDIUM_LINES:
lscore = 2.0
reasons.append("moderate change size")
elif total_lines <= LARGE_LINES:
lscore = 3.0
reasons.append("large change size")
else:
lscore = 4.0
reasons.append("very large change")
# Dependency changes
if has_dependency_changes:
dscore = 2.5
reasons.append("dependency changes (architectural impact)")
else:
dscore = 0.0
# Test coverage delta
tscore = 0.0
if test_coverage_delta is not None:
if test_coverage_delta > 0:
reasons.append(f"test additions (+{test_coverage_delta} test files)")
tscore = -min(2.0, test_coverage_delta / 2.0)
elif test_coverage_delta < 0:
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
else:
reasons.append("test coverage change not assessed")
# Weighted sum, scaled by 3 to use full 1-10 range
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
scaled_bonus = bonus * 3.0
score = 1.0 + scaled_bonus
final_score = max(1, min(10, int(round(score))))
est_minutes = TIME_PER_POINT.get(final_score, 30)
return final_score, est_minutes, reasons
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
pr_num = pr_data["number"]
title = pr_data.get("title", "")
files = client.get_pr_files(org, repo, pr_num)
additions = sum(f.get("additions", 0) for f in files)
deletions = sum(f.get("deletions", 0) for f in files)
filenames = [f.get("filename", "") for f in files]
has_deps = any(is_dependency_file(f) for f in filenames)
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
test_delta = test_added - test_removed if (test_added or test_removed) else None
score, est_min, reasons = score_pr(
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta
)
return PRComplexity(
pr_number=pr_num,
title=title,
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta,
score=score,
estimated_minutes=est_min,
reasons=reasons
)
def build_comment(complexity: PRComplexity) -> str:
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
test_note = ""
if complexity.test_coverage_delta is not None:
if complexity.test_coverage_delta > 0:
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
elif complexity.test_coverage_delta < 0:
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
comment = f"## 📊 PR Complexity Analysis\n\n"
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
comment += f"| Metric | Value |\n|--------|-------|\n"
comment += f"| Changes | {change_desc} |\n"
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
comment += f"### Scoring rationale:"
for r in complexity.reasons:
comment += f"\n- {r}"
if deps_note:
comment += deps_note
if test_note:
comment += test_note
comment += f"\n\n---\n"
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
return comment
def main():
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
parser.add_argument("--org", default="Timmy_Foundation")
parser.add_argument("--repo", default="compounding-intelligence")
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--apply", action="store_true")
parser.add_argument("--output", default="metrics/pr_complexity.json")
args = parser.parse_args()
token_path = args.token
if os.path.exists(token_path):
with open(token_path) as f:
token = f.read().strip()
else:
token = args.token
if not token:
print("ERROR: No Gitea token provided", file=sys.stderr)
sys.exit(1)
client = GiteaClient(token)
print(f"Fetching open PRs for {args.org}/{args.repo}...")
prs = client.get_open_prs(args.org, args.repo)
if not prs:
print("No open PRs found.")
sys.exit(0)
print(f"Found {len(prs)} open PR(s). Analyzing...")
results = []
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
for pr in prs:
pr_num = pr["number"]
title = pr.get("title", "")
print(f" Analyzing PR #{pr_num}: {title[:60]}")
try:
complexity = analyze_pr(client, args.org, args.repo, pr)
results.append(complexity.to_dict())
comment = build_comment(complexity)
if args.dry_run:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
elif args.apply:
success = client.post_comment(args.org, args.repo, pr_num, comment)
status = "[commented]" if success else "[FAILED]"
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
else:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
except Exception as e:
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
with open(args.output, "w") as f:
json.dump({
"org": args.org,
"repo": args.repo,
"timestamp": datetime.now(timezone.utc).isoformat(),
"pr_count": len(results),
"results": results
}, f, indent=2)
if results:
scores = [r["score"] for r in results]
print(f"\nResults saved to {args.output}")
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
else:
print("\nNo results to save.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""
Tests for PR Complexity Scorer — unit tests for the scoring logic.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pr_complexity_scorer import (
score_pr,
is_dependency_file,
is_test_file,
TIME_PER_POINT,
SMALL_FILES,
MEDIUM_FILES,
LARGE_FILES,
SMALL_LINES,
MEDIUM_LINES,
LARGE_LINES,
)
PASS = 0
FAIL = 0
def test(name):
def decorator(fn):
global PASS, FAIL
try:
fn()
PASS += 1
print(f" [PASS] {name}")
except AssertionError as e:
FAIL += 1
print(f" [FAIL] {name}: {e}")
except Exception as e:
FAIL += 1
print(f" [FAIL] {name}: Unexpected error: {e}")
return decorator
def assert_eq(a, b, msg=""):
if a != b:
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
def assert_true(v, msg=""):
if not v:
raise AssertionError(msg or "Expected True")
def assert_false(v, msg=""):
if v:
raise AssertionError(msg or "Expected False")
print("=== PR Complexity Scorer Tests ===\n")
print("-- File Classification --")
@test("dependency file detection — requirements.txt")
def _():
assert_true(is_dependency_file("requirements.txt"))
assert_true(is_dependency_file("src/requirements.txt"))
assert_false(is_dependency_file("requirements_test.txt"))
@test("dependency file detection — pyproject.toml")
def _():
assert_true(is_dependency_file("pyproject.toml"))
assert_false(is_dependency_file("myproject.py"))
@test("test file detection — pytest style")
def _():
assert_true(is_test_file("tests/test_api.py"))
assert_true(is_test_file("test_module.py"))
assert_true(is_test_file("src/module_test.py"))
@test("test file detection — other frameworks")
def _():
assert_true(is_test_file("spec/feature_spec.rb"))
assert_true(is_test_file("__tests__/component.test.js"))
assert_false(is_test_file("testfixtures/helper.py"))
print("\n-- Scoring Logic --")
@test("small PR gets low score (1-3)")
def _():
score, minutes, _ = score_pr(
files_changed=3,
additions=50,
deletions=10,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
assert_true(minutes < 20)
@test("medium PR gets medium score (4-6)")
def _():
score, minutes, _ = score_pr(
files_changed=15,
additions=400,
deletions=100,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
assert_true(20 <= minutes <= 45)
@test("large PR gets high score (7-9)")
def _():
score, minutes, _ = score_pr(
files_changed=60,
additions=3000,
deletions=1500,
has_dependency_changes=True,
test_coverage_delta=None
)
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
assert_true(minutes >= 45)
@test("dependency changes boost score")
def _():
base_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=False, test_coverage_delta=None
)
dep_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=True, test_coverage_delta=None
)
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
@test("adding tests lowers complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
better_score, _, _ = score_pr(
files_changed=8, additions=180, deletions=20,
has_dependency_changes=False, test_coverage_delta=3
)
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
@test("removing tests increases complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
worse_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=-2
)
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
@test("score bounded 1-10")
def _():
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
score, _, _ = score_pr(files, adds, dels, False, None)
assert_true(1 <= score <= 10, f"Score {score} out of range")
@test("estimated minutes exist for all scores")
def _():
for s in range(1, 11):
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)

89
tests/test_doc_freshness.py Executable file
View File

@@ -0,0 +1,89 @@
#!/usr/bin/env python3
"""Tests for scripts/doc_freshness.py — Issue #104."""
import os
import sys
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
import doc_freshness as df
def test_collect_python_symbols():
"""Should collect function and class names from Python files."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create a simple Python file
py_path = os.path.join(tmpdir, "sample.py")
with open(py_path, "w") as f:
f.write('''
def my_func():
pass
class MyClass:
def method(self):
pass
async def my_async():
pass
''')
symbols = df.collect_python_symbols(tmpdir)
assert "my_func" in symbols
assert "MyClass" in symbols
assert "my_async" in symbols
# method (inside class) is also collected and should be considered valid
assert "method" in symbols
print("PASS: test_collect_python_symbols")
def test_extract_doc_references_function_and_class():
"""Should extract only function calls () and PascalCase class refs."""
with tempfile.TemporaryDirectory() as tmpdir:
docs = os.path.join(tmpdir, "docs")
os.makedirs(docs)
md_path = os.path.join(docs, "test.md")
with open(md_path, "w") as f:
f.write('''
# Test
`call_this()` is a function.
`SomeClass` is a class.
`not_a_function` (lowercase, no parens) should be ignored.
`filename.py` should be ignored.
`https://example.com` ignored.
''')
refs = df.extract_doc_references(docs)
names = [r[0] for r in refs]
assert "call_this" in names
assert "SomeClass" in names
assert "not_a_function" not in names
assert "filename" not in names # filename.py filtered
assert "https" not in names
print("PASS: test_extract_doc_references_function_and_class")
def test_check_doc_freshness_missing_detection():
"""Should detect missing symbols."""
with tempfile.TemporaryDirectory() as tmpdir:
# Code with one function
code_dir = os.path.join(tmpdir, "code")
os.makedirs(code_dir)
with open(os.path.join(code_dir, "a.py"), "w") as f:
f.write("def existing_func(): pass\n")
# Docs reference existing_func and missing_func
docs_dir = os.path.join(tmpdir, "docs")
os.makedirs(docs_dir)
with open(os.path.join(docs_dir, "readme.md"), "w") as f:
f.write("`existing_func()` and `missing_func()` are mentioned.")
result = df.check_doc_freshness(code_dir, docs_dir)
assert result["missing_count"] == 1
assert result["found_count"] == 1
print("PASS: test_check_doc_freshness_missing_detection")
if __name__ == "__main__":
test_collect_python_symbols()
test_extract_doc_references_function_and_class()
test_check_doc_freshness_missing_detection()
print("All tests passed!")

View File

@@ -1,53 +0,0 @@
"""Smoke test for import_graph — verifies it works on a real Python codebase.
We run import_graph.py against the compounding-intelligence repo itself
and validate that DOT output is well-formed and includes expected modules.
"""
import subprocess
import sys
from pathlib import Path
REPO_ROOT = Path(__file__).resolve().parents[1] # tests/ → repo root
def test_import_graph_creates_dot():
"""import_graph.py produces valid DOT output for this repo."""
script = REPO_ROOT / 'scripts' / 'import_graph.py'
result = subprocess.run(
[sys.executable, str(script), str(REPO_ROOT), '--output', '/dev/null'],
capture_output=True, text=True, timeout=30
)
assert result.returncode == 0, f"script failed: {result.stderr}"
# Should have printed a summary
assert ' modules,' in result.stderr or 'Summary:' in result.stderr
def test_import_graph_excludes_site_packages():
"""import_graph.py does not crash on unparseable files or external deps."""
script = REPO_ROOT / 'scripts' / 'import_graph.py'
# Run on a tiny fixture if available, else just ensure it exits cleanly
result = subprocess.run(
[sys.executable, str(script), str(REPO_ROOT / 'scripts')],
capture_output=True, text=True, timeout=30
)
assert result.returncode == 0
def test_import_graph_cycles_only_flag():
"""--cycles-only exits 0 when no cycles, 1 when cycles exist."""
script = REPO_ROOT / 'scripts' / 'import_graph.py'
result = subprocess.run(
[sys.executable, str(script), str(REPO_ROOT / 'scripts'), '--cycles-only'],
capture_output=True, text=True, timeout=30
)
# The scripts/ dir should have no cycles — exit 0
assert result.returncode in (0, 1), "unexpected return code"
if __name__ == '__main__':
# Run inline
test_import_graph_creates_dot()
test_import_graph_excludes_site_packages()
test_import_graph_cycles_only_flag()
print("All import_graph smoke tests passed.")