Compare commits


5 Commits

Author SHA1 Message Date
b3592e14ad test: add tests for Performance Bottleneck Finder (Refs #171) 2026-04-15 14:48:59 +00:00
be805a1b4c feat: add Performance Bottleneck Finder (#171) 2026-04-15 14:47:27 +00:00
    Analyzes: slow tests, build artifacts, CI workflows, heavy imports.
    Outputs: markdown report or JSON. Designed for weekly cron.
    Closes #171
e6f1b07f16 Merge pull request 'feat: Knowledge store staleness detector (closes #179)' (#185) from feat/179-staleness-check into main 2026-04-15 06:09:14 +00:00
81c02f6709 feat: Add staleness detector tests (closes #179) 2026-04-15 04:00:46 +00:00
c2c3c6a3b9 feat: Add knowledge staleness detector (closes #179) 2026-04-15 04:00:12 +00:00
5 changed files with 1130 additions and 282 deletions


@@ -1,282 +0,0 @@
#!/usr/bin/env python3
"""
Dead Code Detector for Python Codebases
AST-based analysis to find defined but never-called functions and classes.
Excludes entry points, plugin hooks, __init__ exports.
Usage:
python3 scripts/dead_code_detector.py /path/to/repo/
python3 scripts/dead_code_detector.py hermes-agent/ --format json
python3 scripts/dead_code_detector.py . --exclude tests/,venv/
Output: file:line, function/class name, last git author (if available)
"""
import argparse
import ast
import json
import os
import subprocess
from pathlib import Path
from typing import Optional
# Names that are expected to be unused (entry points, protocol methods, etc.)
SAFE_UNUSED_PATTERNS = {
# Python dunders
"__init__", "__str__", "__repr__", "__eq__", "__hash__", "__len__",
"__getitem__", "__setitem__", "__contains__", "__iter__", "__next__",
"__enter__", "__exit__", "__call__", "__bool__", "__del__",
"__post_init__", "__class_getitem__",
# Common entry points
"main", "app", "handler", "setup", "teardown", "fixture",
# pytest
"conftest", "test_", "pytest_", # prefix patterns
# Protocols / abstract
"abstractmethod", "abc_",
}
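# Note: is_safe_unused() matches these via startswith(), so entries such as
# "test_" and "pytest_" exempt whole name families, not just exact matches.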
def is_safe_unused(name: str, filepath: str) -> bool:
"""Check if an unused name is expected to be unused."""
# Test files are exempt
if "test" in filepath.lower():
return True
# Known patterns
for pattern in SAFE_UNUSED_PATTERNS:
if name.startswith(pattern) or name == pattern:
return True
# __init__.py exports are often unused internally
if filepath.endswith("__init__.py"):
return True
return False
def get_git_blame(filepath: str, lineno: int) -> Optional[str]:
"""Get last author of a line via git blame."""
try:
result = subprocess.run(
["git", "blame", "-L", f"{lineno},{lineno}", "--porcelain", filepath],
capture_output=True, text=True, timeout=5
)
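        # --porcelain output includes header lines such as "author Jane Doe";
        # return the value after the "author " prefix.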
for line in result.stdout.split("\n"):
if line.startswith("author "):
return line[7:]
    except (subprocess.SubprocessError, OSError):
        pass
return None
class DefinitionCollector(ast.NodeVisitor):
"""Collect all function and class definitions."""
def __init__(self):
        self.definitions = []  # (name, type, lineno)
def visit_FunctionDef(self, node):
self.definitions.append((node.name, "function", node.lineno))
self.generic_visit(node)
def visit_AsyncFunctionDef(self, node):
self.definitions.append((node.name, "async_function", node.lineno))
self.generic_visit(node)
def visit_ClassDef(self, node):
self.definitions.append((node.name, "class", node.lineno))
self.generic_visit(node)
class NameUsageCollector(ast.NodeVisitor):
"""Collect all name references (calls, imports, attribute access)."""
def __init__(self):
self.names = set()
self.calls = set()
self.imports = set()
def visit_Name(self, node):
self.names.add(node.id)
self.generic_visit(node)
def visit_Attribute(self, node):
if isinstance(node.value, ast.Name):
self.names.add(node.value.id)
self.generic_visit(node)
def visit_Call(self, node):
if isinstance(node.func, ast.Name):
self.calls.add(node.func.id)
elif isinstance(node.func, ast.Attribute):
if isinstance(node.func.value, ast.Name):
self.names.add(node.func.value.id)
self.calls.add(node.func.attr)
self.generic_visit(node)
def visit_Import(self, node):
for alias in node.names:
self.imports.add(alias.asname or alias.name)
self.generic_visit(node)
def visit_ImportFrom(self, node):
for alias in node.names:
self.imports.add(alias.asname or alias.name)
self.generic_visit(node)
def analyze_file(filepath: str) -> dict:
"""Analyze a single Python file for dead code."""
path = Path(filepath)
try:
content = path.read_text()
tree = ast.parse(content, filename=str(filepath))
except (SyntaxError, UnicodeDecodeError):
return {"error": f"Could not parse {filepath}"}
# Collect definitions
def_collector = DefinitionCollector()
def_collector.visit(tree)
definitions = def_collector.definitions
# Collect usage
usage_collector = NameUsageCollector()
usage_collector.visit(tree)
used_names = usage_collector.names | usage_collector.calls | usage_collector.imports
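    # Caveat: analysis is purely syntactic; names reached via getattr(),
    # string lookups, or __all__ re-exports will not appear here and can be
    # flagged as dead even though they are used.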
# Also scan the entire repo for references to this file's definitions
# (this is done at the repo level, not file level)
dead = []
for name, def_type, lineno in definitions:
        # Single-underscore "private" names get no special-casing here: they
        # might be used externally, but are checked like any other definition.
if name not in used_names:
if not is_safe_unused(name, filepath):
dead.append({
"name": name,
"type": def_type,
"file": filepath,
"line": lineno,
})
return {"definitions": len(definitions), "dead": dead}
def scan_repo(repo_path: str, exclude_patterns: list = None) -> dict:
"""Scan an entire repo for dead code."""
path = Path(repo_path)
exclude = exclude_patterns or ["venv", ".venv", "node_modules", "__pycache__",
".git", "dist", "build", ".tox", "vendor"]
all_definitions = {} # name -> [{file, line, type}]
all_files = []
dead_code = []
# First pass: collect all definitions across repo
for fpath in path.rglob("*.py"):
parts = fpath.parts
if any(ex in parts for ex in exclude):
continue
if fpath.name.startswith("."):
continue
try:
content = fpath.read_text(errors="ignore")
tree = ast.parse(content, filename=str(fpath))
        except (SyntaxError, ValueError, OSError):
continue
all_files.append(str(fpath))
collector = DefinitionCollector()
collector.visit(tree)
for name, def_type, lineno in collector.definitions:
rel_path = str(fpath.relative_to(path))
if name not in all_definitions:
all_definitions[name] = []
all_definitions[name].append({
"file": rel_path,
"line": lineno,
"type": def_type,
})
# Second pass: check each name for usage across entire repo
all_used_names = set()
for fpath_str in all_files:
try:
content = Path(fpath_str).read_text(errors="ignore")
tree = ast.parse(content)
        except (SyntaxError, ValueError, OSError):
continue
usage = NameUsageCollector()
usage.visit(tree)
all_used_names.update(usage.names)
all_used_names.update(usage.calls)
all_used_names.update(usage.imports)
# Find dead code
for name, locations in all_definitions.items():
if name not in all_used_names:
for loc in locations:
if not is_safe_unused(name, loc["file"]):
dead_code.append({
"name": name,
"type": loc["type"],
"file": loc["file"],
"line": loc["line"],
})
return {
"repo": path.name,
"files_scanned": len(all_files),
"total_definitions": sum(len(v) for v in all_definitions.values()),
"dead_code_count": len(dead_code),
"dead_code": sorted(dead_code, key=lambda x: (x["file"], x["line"])),
}
def main():
parser = argparse.ArgumentParser(description="Find dead code in Python codebases")
parser.add_argument("repo", help="Repository path to scan")
parser.add_argument("--format", choices=["text", "json"], default="text")
parser.add_argument("--exclude", help="Comma-separated patterns to exclude")
parser.add_argument("--git-blame", action="store_true", help="Include git blame info")
args = parser.parse_args()
exclude = args.exclude.split(",") if args.exclude else None
result = scan_repo(args.repo, exclude)
if args.format == "json":
print(json.dumps(result, indent=2))
else:
print(f"Dead Code Report: {result['repo']}")
print(f"Files scanned: {result['files_scanned']}")
print(f"Total definitions: {result['total_definitions']}")
print(f"Dead code found: {result['dead_code_count']}")
print()
if result["dead_code"]:
print(f"{'File':<45} {'Line':>4} {'Type':<10} {'Name'}")
print("-" * 85)
for item in result["dead_code"]:
author = ""
if args.git_blame:
author = get_git_blame(
os.path.join(args.repo, item["file"]),
item["line"]
) or ""
author = f" ({author})" if author else ""
print(f"{item['file']:<45} {item['line']:>4} {item['type']:<10} {item['name']}{author}")
else:
print("No dead code detected!")
if __name__ == "__main__":
main()


@@ -0,0 +1,131 @@
#!/usr/bin/env python3
"""
Knowledge Store Staleness Detector — Detect stale knowledge entries by comparing source file hashes.
Usage:
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix
"""
import argparse
import hashlib
import json
import os
from typing import Dict, List, Any, Optional
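# Expected index shape, inferred from the fields read below (illustrative):
# {
#   "facts": [
#     {"fact": "...", "source_file": "relative/path.py", "source_hash": "sha256:..."}
#   ]
# }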
def compute_file_hash(filepath: str) -> Optional[str]:
"""Compute SHA-256 hash of a file. Returns None if file doesn't exist."""
try:
with open(filepath, "rb") as f:
return "sha256:" + hashlib.sha256(f.read()).hexdigest()
except (FileNotFoundError, IsADirectoryError, PermissionError):
return None
def check_staleness(index_path: str, repo_root: str = ".") -> List[Dict[str, Any]]:
"""Check all entries in knowledge index for staleness.
Returns list of entries with staleness info:
    - status: "fresh" | "stale" | "missing_source" | "no_hash" | "no_source"
- current_hash: computed hash (if source exists)
- stored_hash: hash from index
"""
with open(index_path) as f:
data = json.load(f)
facts = data.get("facts", [])
results = []
for entry in facts:
source_file = entry.get("source_file")
stored_hash = entry.get("source_hash")
if not source_file:
results.append({**entry, "status": "no_source", "current_hash": None})
continue
full_path = os.path.join(repo_root, source_file)
current_hash = compute_file_hash(full_path)
if current_hash is None:
results.append({**entry, "status": "missing_source", "current_hash": None})
elif not stored_hash:
results.append({**entry, "status": "no_hash", "current_hash": current_hash})
elif current_hash != stored_hash:
results.append({**entry, "status": "stale", "current_hash": current_hash})
else:
results.append({**entry, "status": "fresh", "current_hash": current_hash})
return results
def fix_hashes(index_path: str, repo_root: str = ".") -> int:
"""Add hashes to entries missing them. Returns count of fixed entries."""
with open(index_path) as f:
data = json.load(f)
fixed = 0
for entry in data.get("facts", []):
if entry.get("source_hash"):
continue
source_file = entry.get("source_file")
if not source_file:
continue
full_path = os.path.join(repo_root, source_file)
h = compute_file_hash(full_path)
if h:
entry["source_hash"] = h
fixed += 1
with open(index_path, "w") as f:
json.dump(data, f, indent=2)
return fixed
def main():
parser = argparse.ArgumentParser(description="Check knowledge store staleness")
parser.add_argument("--index", required=True, help="Path to knowledge/index.json")
parser.add_argument("--repo", default=".", help="Repo root for source file resolution")
parser.add_argument("--json", action="store_true", help="Output as JSON")
parser.add_argument("--fix", action="store_true", help="Add hashes to entries missing them")
args = parser.parse_args()
if args.fix:
fixed = fix_hashes(args.index, args.repo)
print(f"Fixed {fixed} entries with missing hashes.")
return
results = check_staleness(args.index, args.repo)
if args.json:
print(json.dumps(results, indent=2))
else:
stale = [r for r in results if r["status"] != "fresh"]
fresh = [r for r in results if r["status"] == "fresh"]
print(f"Knowledge Store Staleness Check")
print(f" Total entries: {len(results)}")
print(f" Fresh: {len(fresh)}")
print(f" Stale/Issues: {len(stale)}")
print()
if stale:
print("Issues found:")
for r in stale:
status = r["status"]
fact = r.get("fact", "?")[:60]
source = r.get("source_file", "?")
print(f" [{status}] {source}: {fact}")
else:
print("All entries are fresh!")
if __name__ == "__main__":
main()


@@ -0,0 +1,551 @@
#!/usr/bin/env python3
"""
Performance Bottleneck Finder — Identify slow tests, builds, and CI steps.
Analyzes:
1. Pytest output for slow tests
2. Build logs for slow steps
3. CI workflow durations
4. File system for large/slow artifacts
Usage:
python3 scripts/perf_bottleneck_finder.py --repo /path/to/repo
python3 scripts/perf_bottleneck_finder.py --repo /path/to/repo --json
python3 scripts/perf_bottleneck_finder.py --repo /path/to/repo --report metrics/perf_report.md
Weekly cron:
0 9 * * 1 cd /path/to/compounding-intelligence && python3 scripts/perf_bottleneck_finder.py --repo /path/to/target --report metrics/perf_report.md
"""
import argparse
import json
import os
import re
import subprocess
import sys
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
# ── Configuration ──────────────────────────────────────────────────
SLOW_TEST_THRESHOLD_S = 2.0 # Tests slower than this are flagged
SLOW_BUILD_STEP_THRESHOLD_S = 10.0
TOP_N_BOTTLENECKS = 10 # Report top N bottlenecks
PYTEST_DURATIONS_COUNT = 20 # Number of slow tests to collect
LOG_EXTENSIONS = {".log", ".txt"}  # not referenced below yet
@dataclass
class Bottleneck:
"""A single performance bottleneck."""
category: str # "test", "build", "ci", "artifact", "import"
name: str # What's slow
duration_s: float # How long it takes
severity: str # "critical", "warning", "info"
recommendation: str # How to fix
file_path: Optional[str] = None
line_number: Optional[int] = None
@dataclass
class PerfReport:
"""Full performance report."""
timestamp: str
repo_path: str
bottlenecks: List[Bottleneck] = field(default_factory=list)
summary: Dict[str, Any] = field(default_factory=dict)
test_stats: Dict[str, Any] = field(default_factory=dict)
build_stats: Dict[str, Any] = field(default_factory=dict)
ci_stats: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict:
d = asdict(self)
return d
# ── Test Analysis ──────────────────────────────────────────────────
def find_slow_tests_pytest(repo_path: str) -> List[Bottleneck]:
"""Run pytest --durations and parse slow tests."""
bottlenecks = []
# Try to run pytest with durations
try:
result = subprocess.run(
["python3", "-m", "pytest", "--co", "-q", "--durations=0"],
cwd=repo_path, capture_output=True, text=True, timeout=30
)
# If tests exist, try to get durations from last run
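        # Note: core pytest does not normally persist per-test durations here;
        # the file typically exists only if a plugin wrote it, hence the
        # existence check below.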
durations_file = os.path.join(repo_path, ".pytest_cache", "v", "cache", "durations")
if os.path.exists(durations_file):
with open(durations_file) as f:
for line in f:
parts = line.strip().split()
if len(parts) >= 2:
try:
duration = float(parts[0])
test_name = " ".join(parts[1:])
if duration > SLOW_TEST_THRESHOLD_S:
severity = "critical" if duration > 10 else "warning"
bottlenecks.append(Bottleneck(
category="test",
name=test_name,
duration_s=duration,
severity=severity,
recommendation=f"Test takes {duration:.1f}s. Consider mocking slow I/O, using fixtures, or marking with @pytest.mark.slow."
))
except ValueError:
continue
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
return bottlenecks
def find_slow_tests_by_scan(repo_path: str) -> List[Bottleneck]:
"""Scan test files for patterns that indicate slow tests."""
bottlenecks = []
test_patterns = [
(r"time\.sleep\((\d+(?:\.\d+)?)\)", "Contains time.sleep() — consider using mock or async wait"),
(r"subprocess\.run\(.*timeout=(\d+)", "Subprocess with timeout — may block test"),
(r"requests\.(get|post|put|delete)\(", "Real HTTP call — mock with responses or httpretty"),
(r"open\([^)]*['"]w['"]", "File I/O in test — use tmp_path fixture"),
]
for root, dirs, files in os.walk(repo_path):
# Skip hidden and cache dirs
        dirs[:] = [d for d in dirs if not d.startswith(('.', '__pycache__', 'node_modules'))]
for fname in files:
if not (fname.startswith("test_") or fname.endswith("_test.py")):
continue
if not fname.endswith(".py"):
continue
fpath = os.path.join(root, fname)
rel_path = os.path.relpath(fpath, repo_path)
try:
with open(fpath) as f:
lines = f.readlines()
except (PermissionError, UnicodeDecodeError):
continue
for i, line in enumerate(lines):
for pattern, recommendation in test_patterns:
match = re.search(pattern, line)
if match:
duration = 1.0 # Default estimate
if "sleep" in pattern:
try:
duration = float(match.group(1))
except (ValueError, IndexError):
duration = 1.0
elif "timeout" in pattern:
try:
duration = float(match.group(1))
except (ValueError, IndexError):
duration = 10.0
else:
duration = 2.0 # Estimated
bottlenecks.append(Bottleneck(
category="test",
name=f"{rel_path}:{i+1}",
duration_s=duration,
severity="warning" if duration < 5 else "critical",
recommendation=recommendation,
file_path=rel_path,
line_number=i + 1
))
return bottlenecks
# ── Build Analysis ─────────────────────────────────────────────────
def analyze_build_artifacts(repo_path: str) -> List[Bottleneck]:
"""Find large build artifacts that slow down builds."""
bottlenecks = []
large_dirs = {
"node_modules": "Consider using npm ci --production or yarn --production",
"__pycache__": "Consider .gitignore and cleaning before builds",
".tox": "Consider caching tox environments",
".pytest_cache": "Consider cleaning between CI runs",
"dist": "Check if dist/ artifacts are being rebuilt unnecessarily",
"build": "Check if build/ artifacts are being rebuilt unnecessarily",
".next": "Next.js cache — consider incremental builds",
"venv": "Virtual env in repo — move outside or use Docker",
}
for dirname, recommendation in large_dirs.items():
dirpath = os.path.join(repo_path, dirname)
if os.path.isdir(dirpath):
total_size = 0
file_count = 0
for root, dirs, files in os.walk(dirpath):
for f in files:
try:
fpath = os.path.join(root, f)
total_size += os.path.getsize(fpath)
file_count += 1
except OSError:
pass
if total_size > 10 * 1024 * 1024: # > 10MB
size_mb = total_size / (1024 * 1024)
bottlenecks.append(Bottleneck(
category="build",
name=f"{dirname}/ ({size_mb:.1f}MB, {file_count} files)",
duration_s=size_mb * 0.5, # Rough estimate
severity="critical" if size_mb > 100 else "warning",
recommendation=recommendation
))
return bottlenecks
def analyze_makefile_targets(repo_path: str) -> List[Bottleneck]:
"""Analyze Makefile for potentially slow targets."""
bottlenecks = []
makefiles = []
for root, dirs, files in os.walk(repo_path):
dirs[:] = [d for d in dirs if not d.startswith(('.', '__pycache__'))]
for f in files:
if f in ("Makefile", "makefile", "GNUmakefile"):
makefiles.append(os.path.join(root, f))
slow_patterns = [
(r"pip install", "pip install without --no-deps or constraints"),
(r"npm install(?!.*--production)", "npm install without --production flag"),
(r"docker build", "Docker build — consider multi-stage and layer caching"),
(r"pytest(?!.*-x|--maxfail)", "pytest without early exit on failure"),
(r"mypy|mypy --strict", "Type checking — consider incremental mode"),
]
for mfile in makefiles:
rel_path = os.path.relpath(mfile, repo_path)
try:
with open(mfile) as f:
content = f.read()
except (PermissionError, UnicodeDecodeError):
continue
for pattern, recommendation in slow_patterns:
if re.search(pattern, content):
bottlenecks.append(Bottleneck(
category="build",
name=f"{rel_path}: {pattern}",
duration_s=5.0,
severity="info",
recommendation=recommendation,
file_path=rel_path
))
return bottlenecks
# ── CI Analysis ────────────────────────────────────────────────────
def analyze_github_actions(repo_path: str) -> List[Bottleneck]:
"""Analyze GitHub Actions workflow files for inefficiencies."""
bottlenecks = []
workflow_dir = os.path.join(repo_path, ".github", "workflows")
if not os.path.isdir(workflow_dir):
return bottlenecks
slow_patterns = [
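        # (regex to match in the workflow file, rough estimated savings in seconds, advice)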
(r"runs-on:\s*ubuntu-latest", 0, "Consider caching dependencies between runs"),
(r"npm install", 2, "Use npm ci instead of npm install for reproducible builds"),
(r"pip install(?!.*--cache-dir)", 2, "Add --cache-dir or use pip cache action"),
(r"docker build(?!.*--cache-from)", 5, "Use Docker layer caching"),
(r"python -m pytest(?!.*-n|--numprocesses)", 3, "Consider pytest-xdist for parallel test execution"),
]
for fname in os.listdir(workflow_dir):
if not fname.endswith(('.yml', '.yaml')):
continue
fpath = os.path.join(workflow_dir, fname)
try:
with open(fpath) as f:
content = f.read()
except (PermissionError, UnicodeDecodeError):
continue
for pattern, est_savings, recommendation in slow_patterns:
if re.search(pattern, content):
bottlenecks.append(Bottleneck(
category="ci",
name=f"{fname}: {pattern}",
duration_s=est_savings,
severity="info",
recommendation=recommendation,
file_path=f".github/workflows/{fname}"
))
return bottlenecks
def analyze_gitea_ci(repo_path: str) -> List[Bottleneck]:
"""Analyze Gitea/Drone CI config files."""
bottlenecks = []
ci_files = [".gitea/workflows", ".drone.yml", ".woodpecker.yml"]
for ci_path in ci_files:
full_path = os.path.join(repo_path, ci_path)
if os.path.isfile(full_path):
try:
with open(full_path) as f:
content = f.read()
except (PermissionError, UnicodeDecodeError):
continue
if "pip install" in content and "--cache-dir" not in content:
bottlenecks.append(Bottleneck(
category="ci",
name=f"{ci_path}: pip without cache",
duration_s=5.0,
severity="warning",
recommendation="Add --cache-dir or mount pip cache volume",
file_path=ci_path
))
elif os.path.isdir(full_path):
for fname in os.listdir(full_path):
if not fname.endswith(('.yml', '.yaml')):
continue
fpath = os.path.join(full_path, fname)
try:
with open(fpath) as f:
content = f.read()
except (PermissionError, UnicodeDecodeError):
continue
if "pip install" in content and "--cache-dir" not in content:
bottlenecks.append(Bottleneck(
category="ci",
name=f"{ci_path}/{fname}: pip without cache",
duration_s=5.0,
severity="warning",
recommendation="Add --cache-dir or mount pip cache volume",
file_path=f"{ci_path}/{fname}"
))
return bottlenecks
# ── Import Analysis ────────────────────────────────────────────────
def find_slow_imports(repo_path: str) -> List[Bottleneck]:
"""Find Python files with heavy import chains."""
bottlenecks = []
heavy_imports = {
"pandas": 0.5,
"numpy": 0.3,
"torch": 2.0,
"tensorflow": 3.0,
"scipy": 0.5,
"matplotlib": 0.8,
"sklearn": 0.5,
"transformers": 1.5,
}
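    # Rough, machine-dependent import-time estimates in seconds, used only to
    # rank findings; nothing is measured at runtime.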
for root, dirs, files in os.walk(repo_path):
dirs[:] = [d for d in dirs if not d.startswith(('.', '__pycache__', 'node_modules'))]
for fname in files:
if not fname.endswith(".py"):
continue
fpath = os.path.join(root, fname)
rel_path = os.path.relpath(fpath, repo_path)
try:
with open(fpath) as f:
lines = f.readlines()
except (PermissionError, UnicodeDecodeError):
continue
for i, line in enumerate(lines):
stripped = line.strip()
if stripped.startswith("import ") or stripped.startswith("from "):
for heavy, est_time in heavy_imports.items():
if heavy in stripped:
bottlenecks.append(Bottleneck(
category="import",
name=f"{rel_path}:{i+1}: import {heavy}",
duration_s=est_time,
severity="info" if est_time < 1.0 else "warning",
recommendation=f"Heavy import ({heavy} ~{est_time}s). Consider lazy import or conditional import.",
file_path=rel_path,
line_number=i + 1
))
return bottlenecks
# ── Report Generation ──────────────────────────────────────────────
def severity_sort_key(b: Bottleneck) -> Tuple[int, float]:
"""Sort by severity then duration."""
sev_order = {"critical": 0, "warning": 1, "info": 2}
return (sev_order.get(b.severity, 3), -b.duration_s)
def generate_report(repo_path: str) -> PerfReport:
"""Run all analyses and generate a performance report."""
report = PerfReport(
timestamp=datetime.now(timezone.utc).isoformat(),
repo_path=os.path.abspath(repo_path)
)
# Collect all bottlenecks
all_bottlenecks = []
print("Scanning for slow tests (pytest cache)...")
all_bottlenecks.extend(find_slow_tests_pytest(repo_path))
print("Scanning for slow test patterns...")
all_bottlenecks.extend(find_slow_tests_by_scan(repo_path))
print("Analyzing build artifacts...")
all_bottlenecks.extend(analyze_build_artifacts(repo_path))
print("Analyzing Makefiles...")
all_bottlenecks.extend(analyze_makefile_targets(repo_path))
print("Analyzing CI workflows...")
all_bottlenecks.extend(analyze_github_actions(repo_path))
all_bottlenecks.extend(analyze_gitea_ci(repo_path))
print("Scanning for heavy imports...")
all_bottlenecks.extend(find_slow_imports(repo_path))
# Sort by severity and duration
all_bottlenecks.sort(key=severity_sort_key)
report.bottlenecks = all_bottlenecks[:TOP_N_BOTTLENECKS * 2] # Keep more for stats
# Compute summary
by_category = defaultdict(list)
for b in all_bottlenecks:
by_category[b.category].append(b)
report.summary = {
"total_bottlenecks": len(all_bottlenecks),
"critical": sum(1 for b in all_bottlenecks if b.severity == "critical"),
"warning": sum(1 for b in all_bottlenecks if b.severity == "warning"),
"info": sum(1 for b in all_bottlenecks if b.severity == "info"),
"estimated_total_slowdown_s": sum(b.duration_s for b in all_bottlenecks),
"by_category": {cat: len(items) for cat, items in by_category.items()},
}
report.test_stats = {
"slow_tests": len(by_category.get("test", [])),
"total_estimated_s": sum(b.duration_s for b in by_category.get("test", [])),
}
report.build_stats = {
"build_issues": len(by_category.get("build", [])),
"total_estimated_s": sum(b.duration_s for b in by_category.get("build", [])),
}
report.ci_stats = {
"ci_issues": len(by_category.get("ci", [])),
"total_estimated_s": sum(b.duration_s for b in by_category.get("ci", [])),
}
return report
def format_markdown(report: PerfReport) -> str:
"""Format report as markdown."""
lines = []
lines.append(f"# Performance Bottleneck Report")
lines.append(f"")
lines.append(f"Generated: {report.timestamp}")
lines.append(f"Repository: {report.repo_path}")
lines.append(f"")
# Summary
s = report.summary
lines.append(f"## Summary")
lines.append(f"")
lines.append(f"- **Total bottlenecks:** {s['total_bottlenecks']}")
lines.append(f"- **Critical:** {s['critical']} | **Warning:** {s['warning']} | **Info:** {s['info']}")
lines.append(f"- **Estimated total slowdown:** {s['estimated_total_slowdown_s']:.1f}s")
lines.append(f"- **By category:** {', '.join(f'{k}: {v}' for k, v in s['by_category'].items())}")
lines.append(f"")
# Top bottlenecks
lines.append(f"## Top {min(TOP_N_BOTTLENECKS, len(report.bottlenecks))} Bottlenecks")
lines.append(f"")
for i, b in enumerate(report.bottlenecks[:TOP_N_BOTTLENECKS], 1):
icon = {"critical": "🔴", "warning": "🟡", "info": "🔵"}.get(b.severity, "")
loc = f" ({b.file_path}:{b.line_number})" if b.file_path else ""
lines.append(f"{i}. {icon} **{b.category}** — {b.name}{loc}")
lines.append(f" - Duration: ~{b.duration_s:.1f}s | Severity: {b.severity}")
lines.append(f" - Fix: {b.recommendation}")
lines.append(f"")
# Category breakdowns
for cat in ["test", "build", "ci", "import"]:
items = [b for b in report.bottlenecks if b.category == cat]
if items:
lines.append(f"## {cat.title()} Bottlenecks")
lines.append(f"")
for b in items:
icon = {"critical": "🔴", "warning": "🟡", "info": "🔵"}.get(b.severity, "")
loc = f" ({b.file_path}:{b.line_number})" if b.file_path else ""
lines.append(f"- {icon} {b.name}{loc} — ~{b.duration_s:.1f}s — {b.recommendation}")
lines.append(f"")
return "
".join(lines)
# ── Main ───────────────────────────────────────────────────────────
def main():
    # Declared before any use of the name so the module-level threshold can be
    # reassigned from the CLI flag (a global statement must precede all uses).
    global SLOW_TEST_THRESHOLD_S
    parser = argparse.ArgumentParser(description="Performance Bottleneck Finder")
    parser.add_argument("--repo", default=".", help="Path to repository to analyze")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--report", help="Write markdown report to file")
    parser.add_argument("--threshold", type=float, default=SLOW_TEST_THRESHOLD_S,
                        help="Slow test threshold in seconds")
    args = parser.parse_args()
    SLOW_TEST_THRESHOLD_S = args.threshold
if not os.path.isdir(args.repo):
print(f"Error: {args.repo} is not a directory", file=sys.stderr)
sys.exit(1)
report = generate_report(args.repo)
if args.json:
print(json.dumps(report.to_dict(), indent=2))
else:
md = format_markdown(report)
if args.report:
os.makedirs(os.path.dirname(args.report) or ".", exist_ok=True)
with open(args.report, "w") as f:
f.write(md)
print(f"Report written to {args.report}")
else:
print(md)
# Exit code: 1 if critical bottlenecks found
if report.summary.get("critical", 0) > 0:
sys.exit(1)
if __name__ == "__main__":
main()


@@ -0,0 +1,129 @@
#!/usr/bin/env python3
"""Tests for scripts/knowledge_staleness_check.py — 8 tests."""
import json
import os
import sys
import tempfile
sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util
spec = importlib.util.spec_from_file_location("ks", os.path.join(os.path.dirname(__file__) or ".", "knowledge_staleness_check.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
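# The module is loaded from its file path via importlib so these tests run
# without installing the script as a package.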
check_staleness = mod.check_staleness
fix_hashes = mod.fix_hashes
compute_file_hash = mod.compute_file_hash
def test_fresh_entry():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("print('hello')")
h = compute_file_hash(src)
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "hello", "source_file": "source.py", "source_hash": h}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "fresh"
print("PASS: test_fresh_entry")
def test_stale_entry():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("original content")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "old", "source_file": "source.py", "source_hash": "sha256:wrong"}]}, f)
# Now change the source
with open(src, "w") as f:
f.write("modified content")
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "stale"
print("PASS: test_stale_entry")
def test_missing_source():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "gone", "source_file": "nonexistent.py", "source_hash": "sha256:abc"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "missing_source"
print("PASS: test_missing_source")
def test_no_hash():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("content")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "no hash", "source_file": "source.py"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "no_hash"
assert results[0]["current_hash"].startswith("sha256:")
print("PASS: test_no_hash")
def test_no_source_field():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "orphan"}]}, f)
results = check_staleness(idx, tmpdir)
assert results[0]["status"] == "no_source"
print("PASS: test_no_source_field")
def test_fix_hashes():
with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f:
f.write("content for hashing")
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": [{"fact": "needs hash", "source_file": "source.py"}]}, f)
fixed = fix_hashes(idx, tmpdir)
assert fixed == 1
# Verify hash was added
with open(idx) as f:
data = json.load(f)
assert data["facts"][0]["source_hash"].startswith("sha256:")
print("PASS: test_fix_hashes")
def test_empty_index():
with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f:
json.dump({"facts": []}, f)
results = check_staleness(idx, tmpdir)
assert results == []
print("PASS: test_empty_index")
def test_compute_hash_nonexistent():
h = compute_file_hash("/nonexistent/path/file.py")
assert h is None
print("PASS: test_compute_hash_nonexistent")
def run_all():
test_fresh_entry()
test_stale_entry()
test_missing_source()
test_no_hash()
test_no_source_field()
test_fix_hashes()
test_empty_index()
test_compute_hash_nonexistent()
print("\nAll 8 tests passed!")
if __name__ == "__main__":
run_all()


@@ -0,0 +1,319 @@
#!/usr/bin/env python3
"""
Tests for Performance Bottleneck Finder.
"""
import os
import sys
import textwrap
import pytest
# Add scripts to path so the module under test can be imported directly
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "scripts"))
from perf_bottleneck_finder import (
Bottleneck,
PerfReport,
find_slow_tests_by_scan,
analyze_build_artifacts,
analyze_makefile_targets,
find_slow_imports,
generate_report,
format_markdown,
severity_sort_key,
)
class TestBottleneck:
"""Test Bottleneck dataclass."""
def test_creation(self):
b = Bottleneck(
category="test",
name="test_foo",
duration_s=5.0,
severity="warning",
recommendation="Mock it"
)
assert b.category == "test"
assert b.name == "test_foo"
assert b.duration_s == 5.0
assert b.severity == "warning"
assert b.recommendation == "Mock it"
assert b.file_path is None
assert b.line_number is None
def test_with_location(self):
b = Bottleneck(
category="test",
name="test_bar",
duration_s=2.0,
severity="info",
recommendation="Consider",
file_path="tests/test_bar.py",
line_number=42
)
assert b.file_path == "tests/test_bar.py"
assert b.line_number == 42
def test_to_dict(self):
b = Bottleneck("test", "x", 1.0, "info", "y")
d = b.__dict__
assert "category" in d
assert "duration_s" in d
class TestPerfReport:
"""Test PerfReport dataclass."""
def test_creation(self):
report = PerfReport(
timestamp="2026-01-01T00:00:00Z",
repo_path="/tmp/repo"
)
assert report.timestamp == "2026-01-01T00:00:00Z"
assert report.bottlenecks == []
assert report.summary == {}
def test_to_dict(self):
report = PerfReport(
timestamp="2026-01-01T00:00:00Z",
repo_path="/tmp/repo",
bottlenecks=[Bottleneck("test", "x", 1.0, "info", "y")]
)
d = report.to_dict()
assert "bottlenecks" in d
assert len(d["bottlenecks"]) == 1
class TestSeveritySort:
"""Test severity sorting."""
def test_critical_first(self):
items = [
Bottleneck("test", "a", 1.0, "info", ""),
Bottleneck("test", "b", 0.5, "critical", ""),
Bottleneck("test", "c", 2.0, "warning", ""),
]
items.sort(key=severity_sort_key)
assert items[0].severity == "critical"
assert items[1].severity == "warning"
assert items[2].severity == "info"
def test_duration_within_severity(self):
items = [
Bottleneck("test", "slow", 10.0, "warning", ""),
Bottleneck("test", "fast", 1.0, "warning", ""),
]
items.sort(key=severity_sort_key)
assert items[0].name == "slow" # Higher duration first within same severity
class TestSlowTestScan:
"""Test slow test pattern scanning."""
def test_finds_sleep(self, tmp_path):
test_file = tmp_path / "test_sleepy.py"
test_file.write_text(textwrap.dedent('''
import time
def test_slow():
time.sleep(5)
assert True
'''))
bottlenecks = find_slow_tests_by_scan(str(tmp_path))
assert len(bottlenecks) >= 1
assert any("sleep" in b.recommendation.lower() for b in bottlenecks)
def test_finds_http_calls(self, tmp_path):
test_file = tmp_path / "test_http.py"
test_file.write_text(textwrap.dedent('''
import requests
def test_api():
resp = requests.get("https://example.com")
assert resp.status_code == 200
'''))
bottlenecks = find_slow_tests_by_scan(str(tmp_path))
assert len(bottlenecks) >= 1
assert any("HTTP" in b.recommendation or "mock" in b.recommendation.lower() for b in bottlenecks)
def test_skips_non_test_files(self, tmp_path):
src_file = tmp_path / "main.py"
src_file.write_text("import time\ntime.sleep(10)\n")
bottlenecks = find_slow_tests_by_scan(str(tmp_path))
assert len(bottlenecks) == 0
def test_handles_missing_dir(self):
bottlenecks = find_slow_tests_by_scan("/nonexistent/path")
assert bottlenecks == []
def test_file_path_populated(self, tmp_path):
test_file = tmp_path / "test_example.py"
test_file.write_text("import time\n\ndef test_it():\n time.sleep(2)\n")
bottlenecks = find_slow_tests_by_scan(str(tmp_path))
assert len(bottlenecks) >= 1
assert bottlenecks[0].file_path is not None
assert bottlenecks[0].line_number is not None
class TestBuildArtifacts:
"""Test build artifact analysis."""
def test_finds_large_node_modules(self, tmp_path):
nm = tmp_path / "node_modules"
nm.mkdir()
# Create a file > 10MB
big_file = nm / "big.txt"
big_file.write_bytes(b"x" * (11 * 1024 * 1024))
bottlenecks = analyze_build_artifacts(str(tmp_path))
assert len(bottlenecks) >= 1
assert any("node_modules" in b.name for b in bottlenecks)
def test_ignores_small_dirs(self, tmp_path):
nm = tmp_path / "node_modules"
nm.mkdir()
small_file = nm / "small.txt"
small_file.write_bytes(b"x" * 100)
bottlenecks = analyze_build_artifacts(str(tmp_path))
assert not any("node_modules" in b.name for b in bottlenecks)
def test_finds_pycache(self, tmp_path):
cache = tmp_path / "__pycache__"
cache.mkdir()
big_file = cache / "big.pyc"
big_file.write_bytes(b"x" * (11 * 1024 * 1024))
bottlenecks = analyze_build_artifacts(str(tmp_path))
assert any("__pycache__" in b.name for b in bottlenecks)
class TestMakefileAnalysis:
"""Test Makefile analysis."""
def test_finds_pip_install(self, tmp_path):
makefile = tmp_path / "Makefile"
makefile.write_text(textwrap.dedent('''
install:
pip install -r requirements.txt
test:
pytest
'''))
bottlenecks = analyze_makefile_targets(str(tmp_path))
assert len(bottlenecks) >= 1
def test_no_makefile(self, tmp_path):
bottlenecks = analyze_makefile_targets(str(tmp_path))
assert bottlenecks == []
class TestImportAnalysis:
"""Test heavy import detection."""
def test_finds_pandas(self, tmp_path):
src = tmp_path / "analysis.py"
src.write_text("import pandas as pd\n")
bottlenecks = find_slow_imports(str(tmp_path))
assert len(bottlenecks) >= 1
assert any("pandas" in b.name for b in bottlenecks)
def test_finds_torch(self, tmp_path):
src = tmp_path / "model.py"
src.write_text("import torch\n")
bottlenecks = find_slow_imports(str(tmp_path))
assert any("torch" in b.name for b in bottlenecks)
def test_skips_light_imports(self, tmp_path):
src = tmp_path / "utils.py"
src.write_text("import json\nimport os\nimport sys\n")
bottlenecks = find_slow_imports(str(tmp_path))
assert len(bottlenecks) == 0
class TestGenerateReport:
"""Test full report generation."""
def test_empty_repo(self, tmp_path):
report = generate_report(str(tmp_path))
assert report.summary["total_bottlenecks"] >= 0
assert "critical" in report.summary
assert "warning" in report.summary
def test_with_findings(self, tmp_path):
# Create a test file with issues
test_file = tmp_path / "test_slow.py"
test_file.write_text(textwrap.dedent('''
import time
import requests
def test_sleepy():
time.sleep(3)
def test_http():
requests.get("https://example.com")
'''))
report = generate_report(str(tmp_path))
assert report.summary["total_bottlenecks"] >= 2
assert len(report.bottlenecks) > 0
def test_summary_categories(self, tmp_path):
report = generate_report(str(tmp_path))
assert "by_category" in report.summary
class TestMarkdownReport:
"""Test markdown output."""
def test_format(self):
report = PerfReport(
timestamp="2026-01-01T00:00:00Z",
repo_path="/tmp/repo",
bottlenecks=[
Bottleneck("test", "slow_test", 5.0, "critical", "Fix it")
],
summary={
"total_bottlenecks": 1,
"critical": 1,
"warning": 0,
"info": 0,
"estimated_total_slowdown_s": 5.0,
"by_category": {"test": 1},
}
)
md = format_markdown(report)
assert "# Performance Bottleneck Report" in md
assert "slow_test" in md
assert "🔴" in md
assert "Fix it" in md
def test_empty_report(self):
report = PerfReport(
timestamp="2026-01-01T00:00:00Z",
repo_path="/tmp/repo",
summary={
"total_bottlenecks": 0,
"critical": 0,
"warning": 0,
"info": 0,
"estimated_total_slowdown_s": 0,
"by_category": {},
}
)
md = format_markdown(report)
assert "Total bottlenecks:** 0" in md