#!/usr/bin/env python3 """Generate a deterministic GENOME.md for a repository.""" from __future__ import annotations import argparse import ast import os import re from pathlib import Path from typing import NamedTuple IGNORED_DIRS = { ".git", ".hg", ".svn", ".venv", "venv", "node_modules", "__pycache__", ".mypy_cache", ".pytest_cache", "dist", "build", "coverage", } TEXT_SUFFIXES = { ".py", ".js", ".mjs", ".cjs", ".ts", ".tsx", ".jsx", ".html", ".css", ".md", ".txt", ".json", ".yaml", ".yml", ".sh", ".ini", ".cfg", ".toml", } SOURCE_SUFFIXES = {".py", ".js", ".mjs", ".cjs", ".ts", ".tsx", ".jsx", ".sh"} DOC_FILENAMES = {"README.md", "CONTRIBUTING.md", "SOUL.md"} class RepoFile(NamedTuple): path: str abs_path: Path size_bytes: int line_count: int kind: str class RunSummary(NamedTuple): markdown: str source_count: int test_count: int doc_count: int def _is_text_file(path: Path) -> bool: return path.suffix.lower() in TEXT_SUFFIXES or path.name in {"Dockerfile", "Makefile"} def _file_kind(rel_path: str, path: Path) -> str: suffix = path.suffix.lower() if rel_path.startswith("tests/") or path.name.startswith("test_"): return "test" if rel_path.startswith("docs/") or path.name in DOC_FILENAMES or suffix == ".md": return "doc" if suffix in {".json", ".yaml", ".yml", ".toml", ".ini", ".cfg"}: return "config" if suffix == ".sh": return "script" if rel_path.startswith("scripts/") and suffix == ".py" and path.name != "__init__.py": return "script" if suffix in SOURCE_SUFFIXES: return "source" return "other" def collect_repo_files(repo_root: str | Path) -> list[RepoFile]: root = Path(repo_root).resolve() files: list[RepoFile] = [] for current_root, dirnames, filenames in os.walk(root): dirnames[:] = sorted(d for d in dirnames if d not in IGNORED_DIRS) base = Path(current_root) for filename in sorted(filenames): path = base / filename if not _is_text_file(path): continue rel_path = path.relative_to(root).as_posix() text = path.read_text(encoding="utf-8", errors="replace") files.append( RepoFile( path=rel_path, abs_path=path, size_bytes=path.stat().st_size, line_count=max(1, len(text.splitlines())), kind=_file_kind(rel_path, path), ) ) return sorted(files, key=lambda item: item.path) def _safe_text(path: Path) -> str: return path.read_text(encoding="utf-8", errors="replace") def _sanitize_node_id(name: str) -> str: cleaned = re.sub(r"[^A-Za-z0-9_]", "_", name) return cleaned or "node" def _component_name(path: str) -> str: if "/" in path: return path.split("/", 1)[0] return Path(path).stem or path def _priority_files(files: list[RepoFile], kinds: tuple[str, ...], limit: int = 8) -> list[RepoFile]: items = [item for item in files if item.kind in kinds] items.sort(key=lambda item: (-int(item.path.count("/") == 0), -item.line_count, item.path)) return items[:limit] def _readme_summary(root: Path) -> str: readme = root / "README.md" if not readme.exists(): return "Repository-specific overview missing from README.md. Genome generated from code structure and tests." paragraphs: list[str] = [] current: list[str] = [] for raw_line in _safe_text(readme).splitlines(): line = raw_line.strip() if not line: if current: paragraphs.append(" ".join(current).strip()) current = [] continue if line.startswith("#"): continue current.append(line) if current: paragraphs.append(" ".join(current).strip()) return paragraphs[0] if paragraphs else "README.md exists but does not contain a prose overview paragraph." def _extract_python_imports(text: str) -> set[str]: try: tree = ast.parse(text) except SyntaxError: return set() imports: set[str] = set() for node in ast.walk(tree): if isinstance(node, ast.Import): for alias in node.names: imports.add(alias.name.split(".", 1)[0]) elif isinstance(node, ast.ImportFrom): if node.module: imports.add(node.module.split(".", 1)[0]) return imports def _extract_python_symbols(text: str) -> tuple[list[tuple[str, int]], list[tuple[str, int]]]: try: tree = ast.parse(text) except SyntaxError: return [], [] classes: list[tuple[str, int]] = [] functions: list[tuple[str, int]] = [] for node in tree.body: if isinstance(node, ast.ClassDef): classes.append((node.name, node.lineno)) elif isinstance(node, ast.FunctionDef): functions.append((node.name, node.lineno)) return classes, functions def _build_component_edges(files: list[RepoFile]) -> list[tuple[str, str]]: known_components = {_component_name(item.path) for item in files if item.kind in {"source", "script", "test"}} edges: set[tuple[str, str]] = set() for item in files: if item.kind not in {"source", "script", "test"} or item.abs_path.suffix.lower() != ".py": continue src = _component_name(item.path) imports = _extract_python_imports(_safe_text(item.abs_path)) for imported in imports: if imported in known_components and imported != src: edges.add((src, imported)) return sorted(edges) def _render_mermaid(files: list[RepoFile]) -> str: components = sorted( { _component_name(item.path) for item in files if item.kind in {"source", "script", "test", "config"} and not _component_name(item.path).startswith(".") } ) edges = _build_component_edges(files) lines = ["graph TD"] if not components: lines.append(" repo[\"repository\"]") return "\n".join(lines) for component in components[:10]: node_id = _sanitize_node_id(component) lines.append(f" {node_id}[\"{component}\"]") seen_components = set(components[:10]) emitted = False for src, dst in edges: if src in seen_components and dst in seen_components: lines.append(f" {_sanitize_node_id(src)} --> {_sanitize_node_id(dst)}") emitted = True if not emitted: root_id = "repo_root" lines.insert(1, f" {root_id}[\"repo\"]") for component in components[:6]: lines.append(f" {root_id} --> {_sanitize_node_id(component)}") return "\n".join(lines) def _entry_points(files: list[RepoFile]) -> list[dict[str, str]]: points: list[dict[str, str]] = [] for item in files: text = _safe_text(item.abs_path) if item.kind == "script": points.append({"path": item.path, "reason": "operational script", "command": f"python3 {item.path}" if item.abs_path.suffix == ".py" else f"bash {item.path}"}) continue if item.abs_path.suffix == ".py" and "if __name__ == '__main__':" in text: points.append({"path": item.path, "reason": "python main guard", "command": f"python3 {item.path}"}) elif item.path in {"app.py", "server.py", "main.py"}: points.append({"path": item.path, "reason": "top-level executable", "command": f"python3 {item.path}"}) seen: set[str] = set() deduped: list[dict[str, str]] = [] for point in points: if point["path"] in seen: continue seen.add(point["path"]) deduped.append(point) return deduped[:12] def _test_coverage(files: list[RepoFile]) -> tuple[list[RepoFile], list[RepoFile], list[RepoFile]]: source_files = [ item for item in files if item.kind in {"source", "script"} and item.path not in {"pipelines/codebase-genome.py", "pipelines/codebase_genome.py"} and not item.path.endswith("/__init__.py") ] test_files = [item for item in files if item.kind == "test"] combined_test_text = "\n".join(_safe_text(item.abs_path) for item in test_files) entry_paths = {point["path"] for point in _entry_points(files)} gaps: list[RepoFile] = [] for item in source_files: stem = item.abs_path.stem if item.path in entry_paths: continue if stem and stem in combined_test_text: continue gaps.append(item) gaps.sort(key=lambda item: (-item.line_count, item.path)) return source_files, test_files, gaps def _security_findings(files: list[RepoFile]) -> list[dict[str, str]]: rules = [ ("high", "shell execution", re.compile(r"shell\s*=\s*True"), "shell=True expands blast radius for command execution"), ("high", "dynamic evaluation", re.compile(r"\b(eval|exec)\s*\("), "dynamic evaluation bypasses static guarantees"), ("medium", "unsafe deserialization", re.compile(r"pickle\.load\(|yaml\.load\("), "deserialization of untrusted data can execute code"), ("medium", "network egress", re.compile(r"urllib\.request\.urlopen\(|requests\.(get|post|put|delete)\("), "outbound network calls create runtime dependency and failure surface"), ("medium", "hardcoded http endpoint", re.compile(r"http://[^\s\"']+"), "plaintext or fixed HTTP endpoints can drift or leak across environments"), ] findings: list[dict[str, str]] = [] for item in files: if item.kind not in {"source", "script", "config"}: continue for lineno, line in enumerate(_safe_text(item.abs_path).splitlines(), start=1): for severity, category, pattern, detail in rules: if pattern.search(line): findings.append( { "severity": severity, "category": category, "ref": f"{item.path}:{lineno}", "line": line.strip(), "detail": detail, } ) break if len(findings) >= 12: return findings return findings def _dead_code_candidates(files: list[RepoFile]) -> list[RepoFile]: source_files = [item for item in files if item.kind in {"source", "script"} and item.abs_path.suffix == ".py"] imports_by_file = { item.path: _extract_python_imports(_safe_text(item.abs_path)) for item in source_files } imported_names = {name for imports in imports_by_file.values() for name in imports} referenced_by_tests = "\n".join(_safe_text(item.abs_path) for item in files if item.kind == "test") entry_paths = {point["path"] for point in _entry_points(files)} candidates: list[RepoFile] = [] for item in source_files: stem = item.abs_path.stem if item.path in entry_paths: continue if stem in imported_names: continue if stem in referenced_by_tests: continue if stem in {"__init__", "conftest"}: continue candidates.append(item) candidates.sort(key=lambda item: (-item.line_count, item.path)) return candidates[:10] def _performance_findings(files: list[RepoFile]) -> list[dict[str, str]]: findings: list[dict[str, str]] = [] for item in files: if item.kind in {"source", "script"} and item.line_count >= 350: findings.append({ "ref": item.path, "detail": f"large module ({item.line_count} lines) likely hides multiple responsibilities", }) for item in files: if item.kind not in {"source", "script"}: continue text = _safe_text(item.abs_path) if "os.walk(" in text or ".rglob(" in text or "glob.glob(" in text: findings.append({ "ref": item.path, "detail": "per-run filesystem scan detected; performance scales with repo size", }) if "urllib.request.urlopen(" in text or "requests.get(" in text or "requests.post(" in text: findings.append({ "ref": item.path, "detail": "network-bound execution path can dominate runtime and create flaky throughput", }) deduped: list[dict[str, str]] = [] seen: set[tuple[str, str]] = set() for finding in findings: key = (finding["ref"], finding["detail"]) if key in seen: continue seen.add(key) deduped.append(finding) return deduped[:10] def _key_abstractions(files: list[RepoFile]) -> list[dict[str, object]]: abstractions: list[dict[str, object]] = [] for item in _priority_files(files, ("source", "script"), limit=10): if item.abs_path.suffix != ".py": continue classes, functions = _extract_python_symbols(_safe_text(item.abs_path)) if not classes and not functions: continue abstractions.append( { "path": item.path, "classes": classes[:4], "functions": [entry for entry in functions[:6] if not entry[0].startswith("_")], } ) return abstractions[:8] def _api_surface(entry_points: list[dict[str, str]], abstractions: list[dict[str, object]]) -> list[str]: api_lines: list[str] = [] for entry in entry_points[:8]: api_lines.append(f"- CLI: `{entry['command']}` — {entry['reason']} (`{entry['path']}`)") for abstraction in abstractions[:5]: for func_name, lineno in abstraction["functions"]: api_lines.append(f"- Python: `{func_name}()` from `{abstraction['path']}:{lineno}`") if len(api_lines) >= 14: return api_lines return api_lines def _data_flow(entry_points: list[dict[str, str]], files: list[RepoFile], gaps: list[RepoFile]) -> list[str]: components = sorted( { _component_name(item.path) for item in files if item.kind in {"source", "script", "test", "config"} and not _component_name(item.path).startswith(".") } ) lines = [] if entry_points: lines.append(f"1. Operators enter through {', '.join(f'`{item['path']}`' for item in entry_points[:3])}.") else: lines.append("1. No explicit CLI/main guard entry point was detected; execution appears library- or doc-driven.") if components: lines.append(f"2. Core logic fans into top-level components: {', '.join(f'`{name}`' for name in components[:6])}.") if gaps: lines.append(f"3. Validation is incomplete around {', '.join(f'`{item.path}`' for item in gaps[:3])}, so changes there carry regression risk.") else: lines.append("3. Tests appear to reference the currently indexed source set, reducing blind spots in the hot path.") lines.append("4. Final artifacts land as repository files, docs, or runtime side effects depending on the selected entry point.") return lines def generate_genome_markdown(repo_root: str | Path, repo_name: str | None = None) -> str: root = Path(repo_root).resolve() files = collect_repo_files(root) repo_display = repo_name or root.name summary = _readme_summary(root) entry_points = _entry_points(files) source_files, test_files, coverage_gaps = _test_coverage(files) security = _security_findings(files) dead_code = _dead_code_candidates(files) performance = _performance_findings(files) abstractions = _key_abstractions(files) api_surface = _api_surface(entry_points, abstractions) data_flow = _data_flow(entry_points, files, coverage_gaps) mermaid = _render_mermaid(files) lines: list[str] = [ f"# GENOME.md — {repo_display}", "", "Generated by `pipelines/codebase_genome.py`.", "", "## Project Overview", "", summary, "", f"- Text files indexed: {len(files)}", f"- Source and script files: {len(source_files)}", f"- Test files: {len(test_files)}", f"- Documentation files: {len([item for item in files if item.kind == 'doc'])}", "", "## Architecture", "", "```mermaid", mermaid, "```", "", "## Entry Points", "", ] if entry_points: for item in entry_points: lines.append(f"- `{item['path']}` — {item['reason']} (`{item['command']}`)") else: lines.append("- No explicit entry point detected.") lines.extend(["", "## Data Flow", ""]) lines.extend(data_flow) lines.extend(["", "## Key Abstractions", ""]) if abstractions: for abstraction in abstractions: path = abstraction["path"] classes = abstraction["classes"] functions = abstraction["functions"] class_bits = ", ".join(f"`{name}`:{lineno}" for name, lineno in classes) or "none detected" function_bits = ", ".join(f"`{name}()`:{lineno}" for name, lineno in functions) or "none detected" lines.append(f"- `{path}` — classes {class_bits}; functions {function_bits}") else: lines.append("- No Python classes or top-level functions detected in the highest-priority source files.") lines.extend(["", "## API Surface", ""]) if api_surface: lines.extend(api_surface) else: lines.append("- No obvious public API surface detected.") lines.extend(["", "## Test Coverage Report", ""]) lines.append(f"- Source and script files inspected: {len(source_files)}") lines.append(f"- Test files inspected: {len(test_files)}") if coverage_gaps: lines.append("- Coverage gaps:") for item in coverage_gaps[:12]: lines.append(f" - `{item.path}` — no matching test reference detected") else: lines.append("- No obvious coverage gaps detected by the stem-matching heuristic.") lines.extend(["", "## Security Audit Findings", ""]) if security: for finding in security: lines.append( f"- [{finding['severity']}] `{finding['ref']}` — {finding['category']}: {finding['detail']}. Evidence: `{finding['line']}`" ) else: lines.append("- No high-signal security findings detected by the static heuristics in this pass.") lines.extend(["", "## Dead Code Candidates", ""]) if dead_code: for item in dead_code: lines.append(f"- `{item.path}` — not imported by indexed Python modules and not referenced by tests") else: lines.append("- No obvious dead-code candidates detected.") lines.extend(["", "## Performance Bottleneck Analysis", ""]) if performance: for finding in performance: lines.append(f"- `{finding['ref']}` — {finding['detail']}") else: lines.append("- No obvious performance hotspots detected by the static heuristics in this pass.") return "\n".join(lines).rstrip() + "\n" def write_genome(repo_root: str | Path, repo_name: str | None = None, output_path: str | Path | None = None) -> RunSummary: root = Path(repo_root).resolve() markdown = generate_genome_markdown(root, repo_name=repo_name) out_path = Path(output_path) if output_path else root / "GENOME.md" out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text(markdown, encoding="utf-8") files = collect_repo_files(root) source_files, test_files, _ = _test_coverage(files) return RunSummary( markdown=markdown, source_count=len(source_files), test_count=len(test_files), doc_count=len([item for item in files if item.kind == "doc"]), ) def main() -> None: parser = argparse.ArgumentParser(description="Generate a deterministic GENOME.md for a repository") parser.add_argument("--repo-root", required=True, help="Path to the repository to analyze") parser.add_argument("--repo", dest="repo_name", default=None, help="Optional repo display name") parser.add_argument("--repo-name", dest="repo_name_override", default=None, help="Optional repo display name") parser.add_argument("--output", default=None, help="Path to write GENOME.md (defaults to /GENOME.md)") args = parser.parse_args() repo_name = args.repo_name_override or args.repo_name summary = write_genome(args.repo_root, repo_name=repo_name, output_path=args.output) target = Path(args.output) if args.output else Path(args.repo_root).resolve() / "GENOME.md" print( f"GENOME.md saved to {target} " f"(sources={summary.source_count}, tests={summary.test_count}, docs={summary.doc_count})" ) if __name__ == "__main__": main()