#!/usr/bin/env python3
"""Generate a deterministic GENOME.md for a repository."""

from __future__ import annotations

import argparse
import ast
import os
import re
from pathlib import Path
from typing import NamedTuple


IGNORED_DIRS = {
    ".git",
    ".hg",
    ".svn",
    ".venv",
    "venv",
    "node_modules",
    "__pycache__",
    ".mypy_cache",
    ".pytest_cache",
    "dist",
    "build",
    "coverage",
}

TEXT_SUFFIXES = {
    ".py",
    ".js",
    ".mjs",
    ".cjs",
    ".ts",
    ".tsx",
    ".jsx",
    ".html",
    ".css",
    ".md",
    ".txt",
    ".json",
    ".yaml",
    ".yml",
    ".sh",
    ".ini",
    ".cfg",
    ".toml",
}

SOURCE_SUFFIXES = {".py", ".js", ".mjs", ".cjs", ".ts", ".tsx", ".jsx", ".sh"}

DOC_FILENAMES = {"README.md", "CONTRIBUTING.md", "SOUL.md"}


class RepoFile(NamedTuple):
    """One indexed text file plus the metadata used for classification."""

    path: str
    abs_path: Path
    size_bytes: int
    line_count: int
    kind: str


class RunSummary(NamedTuple):
    """Rendered markdown and headline counts returned by write_genome()."""

    markdown: str
    source_count: int
    test_count: int
    doc_count: int


def _is_text_file(path: Path) -> bool:
    return path.suffix.lower() in TEXT_SUFFIXES or path.name in {"Dockerfile", "Makefile"}


def _file_kind(rel_path: str, path: Path) -> str:
    # First match wins: test > doc > config > script > source.
    suffix = path.suffix.lower()
    if rel_path.startswith("tests/") or path.name.startswith("test_"):
        return "test"
    if rel_path.startswith("docs/") or path.name in DOC_FILENAMES or suffix == ".md":
        return "doc"
    if suffix in {".json", ".yaml", ".yml", ".toml", ".ini", ".cfg"}:
        return "config"
    if suffix == ".sh":
        return "script"
    if rel_path.startswith("scripts/") and suffix == ".py" and path.name != "__init__.py":
        return "script"
    if suffix in SOURCE_SUFFIXES:
        return "source"
    return "other"


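# _file_kind worked examples (paths hypothetical, shown for illustration only):
#   _file_kind("tests/test_app.py", Path("tests/test_app.py"))  -> "test"
#   _file_kind("scripts/deploy.py", Path("scripts/deploy.py"))  -> "script"
#   _file_kind("pkg/core.py", Path("pkg/core.py"))              -> "source"

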
def collect_repo_files(repo_root: str | Path) -> list[RepoFile]:
    """Walk repo_root and return sorted metadata for every recognized text file."""
    root = Path(repo_root).resolve()
    files: list[RepoFile] = []
    for current_root, dirnames, filenames in os.walk(root):
        # Prune ignored directories in place so os.walk never descends into them.
        dirnames[:] = sorted(d for d in dirnames if d not in IGNORED_DIRS)
        base = Path(current_root)
        for filename in sorted(filenames):
            path = base / filename
            if not _is_text_file(path):
                continue
            rel_path = path.relative_to(root).as_posix()
            text = path.read_text(encoding="utf-8", errors="replace")
            files.append(
                RepoFile(
                    path=rel_path,
                    abs_path=path,
                    size_bytes=path.stat().st_size,
                    line_count=max(1, len(text.splitlines())),
                    kind=_file_kind(rel_path, path),
                )
            )
    return sorted(files, key=lambda item: item.path)


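# A minimal usage sketch (paths and kinds illustrative, not guaranteed):
#   files = collect_repo_files(".")
#   [(f.path, f.kind) for f in files[:2]]
#   # e.g. [("README.md", "doc"), ("app.py", "source")]

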
def _safe_text(path: Path) -> str:
    return path.read_text(encoding="utf-8", errors="replace")


def _sanitize_node_id(name: str) -> str:
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", name)
    return cleaned or "node"


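# Mermaid node ids must stay alphanumeric; two worked cases:
#   _sanitize_node_id("my-pkg") -> "my_pkg"
#   _sanitize_node_id("")       -> "node"

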
def _component_name(path: str) -> str:
    if "/" in path:
        return path.split("/", 1)[0]
    return Path(path).stem or path


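# Components are top-level directories, or bare stems for root-level files
# (paths hypothetical):
#   _component_name("pipelines/codebase_genome.py") -> "pipelines"
#   _component_name("app.py")                       -> "app"

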
def _priority_files(files: list[RepoFile], kinds: tuple[str, ...], limit: int = 8) -> list[RepoFile]:
    items = [item for item in files if item.kind in kinds]
    # Root-level files first (no "/" in the path), then longer files, then path order.
    items.sort(key=lambda item: (-int(item.path.count("/") == 0), -item.line_count, item.path))
    return items[:limit]


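# Ordering sketch (file names and line counts invented for illustration):
#   app.py (300 lines) outranks pkg/big.py (900 lines) because root-level
#   paths sort first; among nested files, pkg/big.py outranks pkg/small.py.

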
def _readme_summary(root: Path) -> str:
    readme = root / "README.md"
    if not readme.exists():
        return "Repository-specific overview missing from README.md. Genome generated from code structure and tests."
    paragraphs: list[str] = []
    current: list[str] = []
    for raw_line in _safe_text(readme).splitlines():
        line = raw_line.strip()
        if not line:
            if current:
                paragraphs.append(" ".join(current).strip())
                current = []
            continue
        if line.startswith("#"):
            continue
        current.append(line)
    if current:
        paragraphs.append(" ".join(current).strip())
    return paragraphs[0] if paragraphs else "README.md exists but does not contain a prose overview paragraph."


def _extract_python_imports(text: str) -> set[str]:
    try:
        tree = ast.parse(text)
    except SyntaxError:
        return set()
    imports: set[str] = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                imports.add(alias.name.split(".", 1)[0])
        elif isinstance(node, ast.ImportFrom):
            if node.module:
                imports.add(node.module.split(".", 1)[0])
    return imports


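# Only top-level package names are collected, e.g. (illustrative snippet):
#   _extract_python_imports("import os.path\nfrom foo.bar import baz")
#   -> {"os", "foo"}

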
def _extract_python_symbols(text: str) -> tuple[list[tuple[str, int]], list[tuple[str, int]]]:
    try:
        tree = ast.parse(text)
    except SyntaxError:
        return [], []
    classes: list[tuple[str, int]] = []
    functions: list[tuple[str, int]] = []
    # Only module-level definitions are reported; nested symbols stay hidden.
    for node in tree.body:
        if isinstance(node, ast.ClassDef):
            classes.append((node.name, node.lineno))
        elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            functions.append((node.name, node.lineno))
    return classes, functions


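# Worked example (illustrative source string):
#   _extract_python_symbols("class A:\n    pass\n\ndef run():\n    pass")
#   -> ([("A", 1)], [("run", 4)])

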
def _build_component_edges(files: list[RepoFile]) -> list[tuple[str, str]]:
    known_components = {_component_name(item.path) for item in files if item.kind in {"source", "script", "test"}}
    edges: set[tuple[str, str]] = set()
    for item in files:
        if item.kind not in {"source", "script", "test"} or item.abs_path.suffix.lower() != ".py":
            continue
        src = _component_name(item.path)
        imports = _extract_python_imports(_safe_text(item.abs_path))
        for imported in imports:
            if imported in known_components and imported != src:
                edges.add((src, imported))
    return sorted(edges)


def _render_mermaid(files: list[RepoFile]) -> str:
    components = sorted(
        {
            _component_name(item.path)
            for item in files
            if item.kind in {"source", "script", "test", "config"}
            and not _component_name(item.path).startswith(".")
        }
    )
    edges = _build_component_edges(files)
    lines = ["graph TD"]
    if not components:
        lines.append("    repo[\"repository\"]")
        return "\n".join(lines)

    for component in components[:10]:
        node_id = _sanitize_node_id(component)
        lines.append(f"    {node_id}[\"{component}\"]")

    seen_components = set(components[:10])
    emitted = False
    for src, dst in edges:
        if src in seen_components and dst in seen_components:
            lines.append(f"    {_sanitize_node_id(src)} --> {_sanitize_node_id(dst)}")
            emitted = True
    if not emitted:
        root_id = "repo_root"
        lines.insert(1, f"    {root_id}[\"repo\"]")
        for component in components[:6]:
            lines.append(f"    {root_id} --> {_sanitize_node_id(component)}")
    return "\n".join(lines)


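# Example output for a repo with components "app" and "tests", where tests
# imports app (component names hypothetical):
#   graph TD
#       app["app"]
#       tests["tests"]
#       tests --> app

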
def _entry_points(files: list[RepoFile]) -> list[dict[str, str]]:
    # Detect a __main__ guard written with either quote style.
    main_guard = re.compile(r"if\s+__name__\s*==\s*['\"]__main__['\"]")
    points: list[dict[str, str]] = []
    for item in files:
        text = _safe_text(item.abs_path)
        if item.kind == "script":
            command = f"python3 {item.path}" if item.abs_path.suffix == ".py" else f"bash {item.path}"
            points.append({"path": item.path, "reason": "operational script", "command": command})
            continue
        if item.abs_path.suffix == ".py" and main_guard.search(text):
            points.append({"path": item.path, "reason": "python main guard", "command": f"python3 {item.path}"})
        elif item.path in {"app.py", "server.py", "main.py"}:
            points.append({"path": item.path, "reason": "top-level executable", "command": f"python3 {item.path}"})
    seen: set[str] = set()
    deduped: list[dict[str, str]] = []
    for point in points:
        if point["path"] in seen:
            continue
        seen.add(point["path"])
        deduped.append(point)
    return deduped[:12]


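# Each entry point is a small dict; a representative element (values illustrative):
#   {"path": "scripts/deploy.py", "reason": "operational script",
#    "command": "python3 scripts/deploy.py"}

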
def _test_coverage(files: list[RepoFile]) -> tuple[list[RepoFile], list[RepoFile], list[RepoFile]]:
    source_files = [
        item
        for item in files
        if item.kind in {"source", "script"}
        and item.path not in {"pipelines/codebase-genome.py", "pipelines/codebase_genome.py"}
        and not item.path.endswith("/__init__.py")
    ]
    test_files = [item for item in files if item.kind == "test"]
    combined_test_text = "\n".join(_safe_text(item.abs_path) for item in test_files)
    entry_paths = {point["path"] for point in _entry_points(files)}

    # Stem-matching heuristic: a source file counts as covered when its stem
    # appears anywhere in the concatenated test text.
    gaps: list[RepoFile] = []
    for item in source_files:
        stem = item.abs_path.stem
        if item.path in entry_paths:
            continue
        if stem and stem in combined_test_text:
            continue
        gaps.append(item)
    gaps.sort(key=lambda item: (-item.line_count, item.path))
    return source_files, test_files, gaps


def _security_findings(files: list[RepoFile]) -> list[dict[str, str]]:
    rules = [
        ("high", "shell execution", re.compile(r"shell\s*=\s*True"), "shell=True expands blast radius for command execution"),
        ("high", "dynamic evaluation", re.compile(r"\b(eval|exec)\s*\("), "dynamic evaluation bypasses static guarantees"),
        ("medium", "unsafe deserialization", re.compile(r"pickle\.load\(|yaml\.load\("), "deserialization of untrusted data can execute code"),
        ("medium", "network egress", re.compile(r"urllib\.request\.urlopen\(|requests\.(get|post|put|delete)\("), "outbound network calls create runtime dependency and failure surface"),
        ("medium", "hardcoded http endpoint", re.compile(r"http://[^\s\"']+"), "plaintext or fixed HTTP endpoints can drift or leak across environments"),
    ]
    findings: list[dict[str, str]] = []
    for item in files:
        if item.kind not in {"source", "script", "config"}:
            continue
        for lineno, line in enumerate(_safe_text(item.abs_path).splitlines(), start=1):
            for severity, category, pattern, detail in rules:
                if pattern.search(line):
                    findings.append(
                        {
                            "severity": severity,
                            "category": category,
                            "ref": f"{item.path}:{lineno}",
                            "line": line.strip(),
                            "detail": detail,
                        }
                    )
                    break
            if len(findings) >= 12:
                return findings
    return findings


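# A finding is a flat dict, e.g. (ref and evidence line illustrative):
#   {"severity": "high", "category": "shell execution",
#    "ref": "pkg/run.py:42", "line": "subprocess.run(cmd, shell=True)",
#    "detail": "shell=True expands blast radius for command execution"}

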
def _dead_code_candidates(files: list[RepoFile]) -> list[RepoFile]:
    source_files = [item for item in files if item.kind in {"source", "script"} and item.abs_path.suffix == ".py"]
    imports_by_file = {
        item.path: _extract_python_imports(_safe_text(item.abs_path))
        for item in source_files
    }
    imported_names = {name for imports in imports_by_file.values() for name in imports}
    referenced_by_tests = "\n".join(_safe_text(item.abs_path) for item in files if item.kind == "test")
    entry_paths = {point["path"] for point in _entry_points(files)}

    # A module is a candidate when nothing imports it, no test mentions its
    # stem, and it is neither an entry point nor an __init__/conftest file.
    candidates: list[RepoFile] = []
    for item in source_files:
        stem = item.abs_path.stem
        if item.path in entry_paths:
            continue
        if stem in imported_names:
            continue
        if stem in referenced_by_tests:
            continue
        if stem in {"__init__", "conftest"}:
            continue
        candidates.append(item)
    candidates.sort(key=lambda item: (-item.line_count, item.path))
    return candidates[:10]


def _performance_findings(files: list[RepoFile]) -> list[dict[str, str]]:
    findings: list[dict[str, str]] = []
    for item in files:
        if item.kind in {"source", "script"} and item.line_count >= 350:
            findings.append({
                "ref": item.path,
                "detail": f"large module ({item.line_count} lines) likely hides multiple responsibilities",
            })
    for item in files:
        if item.kind not in {"source", "script"}:
            continue
        text = _safe_text(item.abs_path)
        if "os.walk(" in text or ".rglob(" in text or "glob.glob(" in text:
            findings.append({
                "ref": item.path,
                "detail": "per-run filesystem scan detected; performance scales with repo size",
            })
        if "urllib.request.urlopen(" in text or "requests.get(" in text or "requests.post(" in text:
            findings.append({
                "ref": item.path,
                "detail": "network-bound execution path can dominate runtime and create flaky throughput",
            })
    deduped: list[dict[str, str]] = []
    seen: set[tuple[str, str]] = set()
    for finding in findings:
        key = (finding["ref"], finding["detail"])
        if key in seen:
            continue
        seen.add(key)
        deduped.append(finding)
    return deduped[:10]


def _key_abstractions(files: list[RepoFile]) -> list[dict[str, object]]:
    abstractions: list[dict[str, object]] = []
    for item in _priority_files(files, ("source", "script"), limit=10):
        if item.abs_path.suffix != ".py":
            continue
        classes, functions = _extract_python_symbols(_safe_text(item.abs_path))
        if not classes and not functions:
            continue
        abstractions.append(
            {
                "path": item.path,
                "classes": classes[:4],
                # Keep only public names, then take the first six, so private
                # helpers do not crowd public functions out of the slots.
                "functions": [entry for entry in functions if not entry[0].startswith("_")][:6],
            }
        )
    return abstractions[:8]


def _api_surface(entry_points: list[dict[str, str]], abstractions: list[dict[str, object]]) -> list[str]:
    api_lines: list[str] = []
    for entry in entry_points[:8]:
        api_lines.append(f"- CLI: `{entry['command']}` — {entry['reason']} (`{entry['path']}`)")
    for abstraction in abstractions[:5]:
        for func_name, lineno in abstraction["functions"]:
            api_lines.append(f"- Python: `{func_name}()` from `{abstraction['path']}:{lineno}`")
            if len(api_lines) >= 14:
                return api_lines
    return api_lines


def _data_flow(entry_points: list[dict[str, str]], files: list[RepoFile], gaps: list[RepoFile]) -> list[str]:
    components = sorted(
        {
            _component_name(item.path)
            for item in files
            if item.kind in {"source", "script", "test", "config"} and not _component_name(item.path).startswith(".")
        }
    )
    lines: list[str] = []
    # Joins are built outside the f-strings to avoid nested-quote reuse,
    # which only parses on Python 3.12+.
    if entry_points:
        entry_bits = ", ".join(f"`{item['path']}`" for item in entry_points[:3])
        lines.append(f"1. Operators enter through {entry_bits}.")
    else:
        lines.append("1. No explicit CLI/main guard entry point was detected; execution appears library- or doc-driven.")
    if components:
        component_bits = ", ".join(f"`{name}`" for name in components[:6])
        lines.append(f"2. Core logic fans into top-level components: {component_bits}.")
    if gaps:
        gap_bits = ", ".join(f"`{item.path}`" for item in gaps[:3])
        lines.append(f"3. Validation is incomplete around {gap_bits}, so changes there carry regression risk.")
    else:
        lines.append("3. Tests appear to reference the currently indexed source set, reducing blind spots in the hot path.")
    lines.append("4. Final artifacts land as repository files, docs, or runtime side effects depending on the selected entry point.")
    return lines


def generate_genome_markdown(repo_root: str | Path, repo_name: str | None = None) -> str:
    root = Path(repo_root).resolve()
    files = collect_repo_files(root)
    repo_display = repo_name or root.name
    summary = _readme_summary(root)
    entry_points = _entry_points(files)
    source_files, test_files, coverage_gaps = _test_coverage(files)
    security = _security_findings(files)
    dead_code = _dead_code_candidates(files)
    performance = _performance_findings(files)
    abstractions = _key_abstractions(files)
    api_surface = _api_surface(entry_points, abstractions)
    data_flow = _data_flow(entry_points, files, coverage_gaps)
    mermaid = _render_mermaid(files)

    lines: list[str] = [
        f"# GENOME.md — {repo_display}",
        "",
        "Generated by `pipelines/codebase_genome.py`.",
        "",
        "## Project Overview",
        "",
        summary,
        "",
        f"- Text files indexed: {len(files)}",
        f"- Source and script files: {len(source_files)}",
        f"- Test files: {len(test_files)}",
        f"- Documentation files: {len([item for item in files if item.kind == 'doc'])}",
        "",
        "## Architecture",
        "",
        "```mermaid",
        mermaid,
        "```",
        "",
        "## Entry Points",
        "",
    ]

    if entry_points:
        for item in entry_points:
            lines.append(f"- `{item['path']}` — {item['reason']} (`{item['command']}`)")
    else:
        lines.append("- No explicit entry point detected.")

    lines.extend(["", "## Data Flow", ""])
    lines.extend(data_flow)

    lines.extend(["", "## Key Abstractions", ""])
    if abstractions:
        for abstraction in abstractions:
            path = abstraction["path"]
            classes = abstraction["classes"]
            functions = abstraction["functions"]
            class_bits = ", ".join(f"`{name}`:{lineno}" for name, lineno in classes) or "none detected"
            function_bits = ", ".join(f"`{name}()`:{lineno}" for name, lineno in functions) or "none detected"
            lines.append(f"- `{path}` — classes {class_bits}; functions {function_bits}")
    else:
        lines.append("- No Python classes or top-level functions detected in the highest-priority source files.")

    lines.extend(["", "## API Surface", ""])
    if api_surface:
        lines.extend(api_surface)
    else:
        lines.append("- No obvious public API surface detected.")

    lines.extend(["", "## Test Coverage Report", ""])
    lines.append(f"- Source and script files inspected: {len(source_files)}")
    lines.append(f"- Test files inspected: {len(test_files)}")
    if coverage_gaps:
        lines.append("- Coverage gaps:")
        for item in coverage_gaps[:12]:
            lines.append(f"  - `{item.path}` — no matching test reference detected")
    else:
        lines.append("- No obvious coverage gaps detected by the stem-matching heuristic.")

    lines.extend(["", "## Security Audit Findings", ""])
    if security:
        for finding in security:
            lines.append(
                f"- [{finding['severity']}] `{finding['ref']}` — {finding['category']}: {finding['detail']}. Evidence: `{finding['line']}`"
            )
    else:
        lines.append("- No high-signal security findings detected by the static heuristics in this pass.")

    lines.extend(["", "## Dead Code Candidates", ""])
    if dead_code:
        for item in dead_code:
            lines.append(f"- `{item.path}` — not imported by indexed Python modules and not referenced by tests")
    else:
        lines.append("- No obvious dead-code candidates detected.")

    lines.extend(["", "## Performance Bottleneck Analysis", ""])
    if performance:
        for finding in performance:
            lines.append(f"- `{finding['ref']}` — {finding['detail']}")
    else:
        lines.append("- No obvious performance hotspots detected by the static heuristics in this pass.")

    return "\n".join(lines).rstrip() + "\n"


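# A minimal sketch of direct library use (repo name illustrative):
#   markdown = generate_genome_markdown(".", repo_name="example")
#   markdown.splitlines()[0]  -> "# GENOME.md — example"

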
def write_genome(repo_root: str | Path, repo_name: str | None = None, output_path: str | Path | None = None) -> RunSummary:
    root = Path(repo_root).resolve()
    markdown = generate_genome_markdown(root, repo_name=repo_name)
    out_path = Path(output_path) if output_path else root / "GENOME.md"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(markdown, encoding="utf-8")
    # The repository is walked a second time here to compute the summary counts.
    files = collect_repo_files(root)
    source_files, test_files, _ = _test_coverage(files)
    return RunSummary(
        markdown=markdown,
        source_count=len(source_files),
        test_count=len(test_files),
        doc_count=len([item for item in files if item.kind == "doc"]),
    )


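# Typical CLI invocation, assuming the script lives at the path it names in
# its own generated header:
#   python3 pipelines/codebase_genome.py --repo-root . --output GENOME.md

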
def main() -> None:
    parser = argparse.ArgumentParser(description="Generate a deterministic GENOME.md for a repository")
    parser.add_argument("--repo-root", required=True, help="Path to the repository to analyze")
    parser.add_argument("--repo", dest="repo_name", default=None, help="Optional repo display name")
    # --repo-name is an alias kept alongside --repo; when both are given,
    # --repo-name wins.
    parser.add_argument("--repo-name", dest="repo_name_override", default=None, help="Optional repo display name (overrides --repo)")
    parser.add_argument("--output", default=None, help="Path to write GENOME.md (defaults to <repo-root>/GENOME.md)")
    args = parser.parse_args()

    repo_name = args.repo_name_override or args.repo_name
    summary = write_genome(args.repo_root, repo_name=repo_name, output_path=args.output)
    target = Path(args.output) if args.output else Path(args.repo_root).resolve() / "GENOME.md"
    print(
        f"GENOME.md saved to {target} "
        f"(sources={summary.source_count}, tests={summary.test_count}, docs={summary.doc_count})"
    )


if __name__ == "__main__":
    main()