diff --git a/scripts/knowledge_gap_identifier.py b/scripts/knowledge_gap_identifier.py
new file mode 100644
--- /dev/null
+++ b/scripts/knowledge_gap_identifier.py
@@ -0,0 +1,396 @@
+"""
+Knowledge Gap Identifier — Pipeline 10.7
+
+Cross-references code, docs, and tests to find gaps:
+- Undocumented functions/classes
+- Untested code paths
+- Documented but missing implementations
+- Test files without corresponding source
+
+Produces a gap report with severity and suggestions.
+"""
+
+from __future__ import annotations
+
+import ast
+import os
+import re
+from dataclasses import dataclass, field
+from enum import Enum
+from pathlib import Path
+from typing import Dict, List, Optional, Set
+
+# Directories that never hold first-party source, docs, or tests.
+_SKIP_DIRS = frozenset({
+    ".git", "venv", "env", ".venv", "node_modules",
+    "__pycache__", ".tox", ".mypy_cache",
+})
+
+# Packaging/test/doc-build plumbing that is not analyzed as source.
+_NON_SOURCE_FILES = frozenset({"setup.py", "conftest.py", "conf.py"})
+
+# Module stems that are not expected to have a dedicated test file.
+_UNTESTED_EXEMPT = frozenset({"__init__", "main", "config"})
+
+# Words that show up in backticks/headings without naming a code symbol.
+_NON_CODE_TERMS = frozenset({
+    "todo", "fixme", "note", "example", "usage", "api",
+    "install", "setup", "config", "license", "contributing",
+    "changelog", "readme", "python", "bash", "json", "yaml",
+    "http", "url", "cli", "gui", "ui", "rest",
+})
+
+# Backtick-quoted identifiers: `ClassName`, `func_name`, `func()`.
+_BACKTICK_SYMBOL_RE = re.compile(r"`([A-Za-z_]\w+)(?:\(\))?`")
+# First word of a markdown heading, levels 1-4: "## ClassName".
+_HEADING_SYMBOL_RE = re.compile(r"^#{1,4}\s+(\w+)", re.MULTILINE)
+
+
+class GapSeverity(Enum):
+    """How urgently a gap should be addressed."""
+
+    INFO = "info"
+    WARNING = "warning"
+    ERROR = "error"
+
+
+class GapType(Enum):
+    """Category of mismatch between code, docs, and tests."""
+
+    UNDOCUMENTED = "undocumented"
+    UNTESTED = "untested"
+    MISSING_IMPLEMENTATION = "missing_implementation"
+    ORPHAN_TEST = "orphan_test"
+    STALE_DOC = "stale_doc"
+
+
+@dataclass
+class Gap:
+    """A single knowledge gap."""
+
+    gap_type: GapType
+    severity: GapSeverity
+    file: str
+    line: Optional[int]
+    name: str
+    description: str
+    suggestion: str
+
+
+@dataclass
+class GapReport:
+    """Full gap analysis report."""
+
+    repo_path: str
+    gaps: List[Gap] = field(default_factory=list)
+    stats: Dict[str, int] = field(default_factory=dict)
+
+    def _count_by_type(self) -> Dict[str, int]:
+        """Return gap counts keyed by gap-type value, in GapType order."""
+        counts: Dict[str, int] = {}
+        for gap in self.gaps:
+            key = gap.gap_type.value
+            counts[key] = counts.get(key, 0) + 1
+        # Re-key in enum declaration order; types with no gaps are omitted.
+        return {gt.value: counts[gt.value] for gt in GapType if gt.value in counts}
+
+    def summary(self) -> str:
+        """Render a human-readable report and refresh ``self.stats``.
+
+        Gaps are grouped by type (sorted by type name); each entry shows
+        severity, symbol name, location, and description.
+        """
+        lines = [f"Gap Report for {self.repo_path}", "=" * 40]
+        by_type: Dict[str, List[Gap]] = {}
+        for gap in self.gaps:
+            by_type.setdefault(gap.gap_type.value, []).append(gap)
+
+        for gtype, items in sorted(by_type.items()):
+            lines.append(f"\n{gtype.upper()} ({len(items)}):")
+            for gap in items:
+                # Test against None explicitly rather than relying on truthiness.
+                loc = gap.file if gap.line is None else f"{gap.file}:{gap.line}"
+                lines.append(f"  [{gap.severity.value}] {gap.name} @ {loc}")
+                lines.append(f"    {gap.description}")
+
+        lines.append(f"\nTotal gaps: {len(self.gaps)}")
+        self.stats = self._count_by_type()
+        return "\n".join(lines)
+
+    def to_dict(self) -> dict:
+        """Return a JSON-serializable representation of the report.
+
+        ``stats`` is keyed by gap-type value (a string): enum members are
+        not valid JSON object keys and would make ``json.dumps`` raise.
+        """
+        return {
+            "repo_path": self.repo_path,
+            "total_gaps": len(self.gaps),
+            "stats": self._count_by_type(),
+            "gaps": [
+                {
+                    "type": g.gap_type.value,
+                    "severity": g.severity.value,
+                    "file": g.file,
+                    "line": g.line,
+                    "name": g.name,
+                    "description": g.description,
+                    "suggestion": g.suggestion,
+                }
+                for g in self.gaps
+            ],
+        }
+
+
+def _collect_python_files(root: Path) -> List[Path]:
+    """Collect .py files under *root*, skipping vendored/cache directories.
+
+    Directory and file names are visited in sorted order so that reports
+    are deterministic across filesystems.
+    """
+    files: List[Path] = []
+    for dirpath, dirnames, filenames in os.walk(root):
+        # Prune in place so os.walk never descends into skipped directories.
+        dirnames[:] = sorted(d for d in dirnames if d not in _SKIP_DIRS)
+        files.extend(
+            Path(dirpath) / name
+            for name in sorted(filenames)
+            if name.endswith(".py")
+        )
+    return files
+
+
+def _extract_python_symbols(filepath: Path) -> Set[str]:
+    """Extract top-level function and class names from a Python file.
+
+    Returns an empty set when the file cannot be read or parsed, so a
+    single broken file never aborts the whole analysis.
+    """
+    try:
+        source = filepath.read_text(encoding="utf-8", errors="replace")
+        tree = ast.parse(source, filename=str(filepath))
+    except (OSError, SyntaxError, ValueError):
+        # OSError: unreadable file. ValueError: e.g. NUL bytes in the source
+        # (also covers UnicodeDecodeError, which subclasses ValueError).
+        return set()
+
+    return {
+        node.name
+        for node in ast.iter_child_nodes(tree)
+        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef))
+    }
+
+
+def _extract_doc_symbols(filepath: Path) -> Set[str]:
+    """Extract function/class names mentioned in a markdown document.
+
+    Collects backtick-quoted identifiers and the first word of each
+    level 1-4 heading. Returns an empty set if the file cannot be read.
+    """
+    try:
+        text = filepath.read_text(encoding="utf-8", errors="replace")
+    except (UnicodeDecodeError, OSError):
+        return set()
+
+    symbols = {m.group(1) for m in _BACKTICK_SYMBOL_RE.finditer(text)}
+    symbols.update(m.group(1) for m in _HEADING_SYMBOL_RE.finditer(text))
+    return symbols
+
+
+def _collect_test_files(root: Path) -> Dict[str, Path]:
+    """Map tested module names to test file paths (test_foo.py -> "foo").
+
+    If several test files share a name, the last one visited wins.
+    """
+    test_map: Dict[str, Path] = {}
+    for dirpath, dirnames, filenames in os.walk(root):
+        # Same skip list as source collection, for consistency.
+        dirnames[:] = sorted(d for d in dirnames if d not in _SKIP_DIRS)
+        for name in sorted(filenames):
+            if name.startswith("test_") and name.endswith(".py"):
+                test_map[name[len("test_"):-len(".py")]] = Path(dirpath) / name
+    return test_map
+
+
+def _is_test_path(rel: Path) -> bool:
+    """Return True if repo-relative *rel* looks like test code.
+
+    Matches files inside any ``tests`` directory, files named ``test_*``
+    at any depth, and anything under a top-level ``test_*`` directory.
+    Path components are compared, so the check is separator-independent.
+    """
+    return (
+        "tests" in rel.parts[:-1]
+        or rel.name.startswith("test_")
+        or rel.parts[0].startswith("test_")
+    )
+
+
+def _has_matching_source(test_mod: str, source_paths: List[str]) -> bool:
+    """Return True if *test_mod* plausibly corresponds to a source file.
+
+    A match is the module name appearing in a source file's stem, or a
+    loose variant of it: underscores removed (in the stem) or replaced by
+    dashes (anywhere in the path).
+    """
+    dashed = test_mod.replace("_", "-")
+    squashed = test_mod.replace("_", "")
+    for path in source_paths:
+        stem = Path(path).stem
+        if test_mod in stem or squashed in stem or dashed in path:
+            return True
+    return False
+
+
+class KnowledgeGapIdentifier:
+    """Analyzes a repo for knowledge gaps between code, docs, and tests."""
+
+    def analyze(self, repo_path: str) -> GapReport:
+        """Scan *repo_path* and return a report of every gap found.
+
+        Four checks run in order: undocumented symbols, untested modules,
+        documented-but-missing symbols, and orphan test files. If
+        *repo_path* is not a directory the report holds a single ERROR gap.
+        """
+        root = Path(repo_path).resolve()
+        report = GapReport(repo_path=str(root))
+
+        if not root.is_dir():
+            # Reported as UNDOCUMENTED for backward compatibility: GapType
+            # has no dedicated "invalid input" member.
+            report.gaps.append(Gap(
+                gap_type=GapType.UNDOCUMENTED,
+                severity=GapSeverity.ERROR,
+                file=str(root),
+                line=None,
+                name="repo",
+                description="Path is not a directory",
+                suggestion="Provide a valid repo directory",
+            ))
+            return report
+
+        # Collect artifacts
+        doc_files = list(root.glob("docs/**/*.md")) + list(root.glob("*.md"))
+        tests_dir = root / "tests"
+        test_map = _collect_test_files(tests_dir) if tests_dir.is_dir() else {}
+
+        # Top-level symbols per source file, keyed by repo-relative POSIX path.
+        source_symbols: Dict[str, Set[str]] = {}
+        # Every non-test source path, including files that define no symbols,
+        # so tests for constant-only modules are not reported as orphans.
+        source_paths: List[str] = []
+        for pf in _collect_python_files(root):
+            rel = pf.relative_to(root)
+            if _is_test_path(rel) or pf.name in _NON_SOURCE_FILES:
+                continue
+            rel_str = rel.as_posix()
+            source_paths.append(rel_str)
+            syms = _extract_python_symbols(pf)
+            if syms:
+                source_symbols[rel_str] = syms
+
+        all_source_symbols: Set[str] = set().union(*source_symbols.values())
+
+        doc_symbols: Set[str] = set()
+        for df in doc_files:
+            doc_symbols.update(_extract_doc_symbols(df))
+
+        report.gaps.extend(self._undocumented_gaps(source_symbols, doc_symbols))
+        report.gaps.extend(self._untested_gaps(source_symbols, set(test_map)))
+        report.gaps.extend(
+            self._missing_implementation_gaps(doc_symbols, all_source_symbols)
+        )
+        report.gaps.extend(self._orphan_test_gaps(test_map, source_paths, root))
+        return report
+
+    @staticmethod
+    def _undocumented_gaps(
+        source_symbols: Dict[str, Set[str]], doc_symbols: Set[str]
+    ) -> List[Gap]:
+        """Public source symbols that no markdown file mentions."""
+        gaps: List[Gap] = []
+        for rel_path, syms in source_symbols.items():
+            for sym in sorted(syms):
+                # Single-underscore names are private; dunders stay in scope.
+                if sym.startswith("_") and not sym.startswith("__"):
+                    continue
+                if sym in doc_symbols:
+                    continue
+                gaps.append(Gap(
+                    gap_type=GapType.UNDOCUMENTED,
+                    severity=GapSeverity.WARNING,
+                    file=rel_path,
+                    line=None,
+                    name=sym,
+                    description=f"{sym} defined in {rel_path} but not referenced in any docs",
+                    suggestion=f"Add documentation for {sym} in a .md file",
+                ))
+        return gaps
+
+    @staticmethod
+    def _untested_gaps(
+        source_symbols: Dict[str, Set[str]], tested_modules: Set[str]
+    ) -> List[Gap]:
+        """Source modules with no ``test_<module>.py`` counterpart."""
+        gaps: List[Gap] = []
+        for rel_path in source_symbols:
+            module_name = Path(rel_path).stem
+            if module_name in tested_modules or module_name in _UNTESTED_EXEMPT:
+                continue
+            gaps.append(Gap(
+                gap_type=GapType.UNTESTED,
+                severity=GapSeverity.ERROR,
+                file=rel_path,
+                line=None,
+                name=module_name,
+                description=f"No test file found for {rel_path}",
+                suggestion=f"Create tests/test_{module_name}.py",
+            ))
+        return gaps
+
+    @staticmethod
+    def _missing_implementation_gaps(
+        doc_symbols: Set[str], all_source_symbols: Set[str]
+    ) -> List[Gap]:
+        """Names the docs mention that no source file defines."""
+        gaps: List[Gap] = []
+        for sym in sorted(doc_symbols - all_source_symbols):
+            # Drop common prose words and names too short to be meaningful.
+            if len(sym) < 3 or sym.lower() in _NON_CODE_TERMS:
+                continue
+            gaps.append(Gap(
+                gap_type=GapType.MISSING_IMPLEMENTATION,
+                severity=GapSeverity.INFO,
+                file="(docs)",
+                line=None,
+                name=sym,
+                description=f"{sym} referenced in docs but not found in source code",
+                suggestion=f"Verify if {sym} should be implemented or update docs",
+            ))
+        return gaps
+
+    @staticmethod
+    def _orphan_test_gaps(
+        test_map: Dict[str, Path], source_paths: List[str], root: Path
+    ) -> List[Gap]:
+        """Test files whose module name matches no source file.
+
+        The match is made against source files, not against the test map
+        itself; comparing test names with the set of test names would be
+        true for every test and no orphan could ever be reported.
+        """
+        gaps: List[Gap] = []
+        for test_mod, test_path in test_map.items():
+            if _has_matching_source(test_mod, source_paths):
+                continue
+            rel = test_path.relative_to(root).as_posix()
+            gaps.append(Gap(
+                gap_type=GapType.ORPHAN_TEST,
+                severity=GapSeverity.WARNING,
+                file=rel,
+                line=None,
+                name=test_mod,
+                description=f"Test file {rel} exists but no matching source module found",
+                suggestion="Verify if the source was renamed or removed",
+            ))
+        return gaps
diff --git a/tests/test_knowledge_gap_identifier.py b/tests/test_knowledge_gap_identifier.py
new file mode 100644
--- /dev/null
+++ b/tests/test_knowledge_gap_identifier.py
@@ -0,0 +1,171 @@
+"""Tests for knowledge_gap_identifier module."""
+
+import json
+import sys
+import os
+import tempfile
+import shutil
+from pathlib import Path
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'scripts'))
+
+from knowledge_gap_identifier import KnowledgeGapIdentifier, GapType, GapSeverity
+
+
+def _make_repo(tmpdir, structure):
+    """Create a test repo from a dict of {path: content}."""
+    for rel_path, content in structure.items():
+        p = Path(tmpdir) / rel_path
+        p.parent.mkdir(parents=True, exist_ok=True)
+        p.write_text(content, encoding="utf-8")
+
+
+def test_undocumented_symbol():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        _make_repo(tmpdir, {
+            "src/calculator.py": "def add(a, b):\n    return a + b\n",
+            "README.md": "# Calculator\n",
+        })
+        report = KnowledgeGapIdentifier().analyze(tmpdir)
+        undocumented = [g for g in report.gaps if g.gap_type == GapType.UNDOCUMENTED]
+        assert any(g.name == "add" for g in undocumented), "add should be undocumented"
+
+
+def test_documented_symbol_no_gap():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        _make_repo(tmpdir, {
+            "src/calculator.py": "def add(a, b):\n    return a + b\n",
+            "README.md": "# Calculator\nUse `add()` to add numbers.\n",
+        })
+        report = KnowledgeGapIdentifier().analyze(tmpdir)
+        undocumented = [g for g in report.gaps
+                        if g.gap_type == GapType.UNDOCUMENTED and g.name == "add"]
+        assert len(undocumented) == 0, "add is documented, should not be flagged"
+
+
+def test_untested_module():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        _make_repo(tmpdir, {
+            "src/calculator.py": "def add(a, b):\n    return a + b\n",
+            "src/helper.py": "def format(x):\n    return str(x)\n",
+            "tests/test_calculator.py": "from src.calculator import add\nassert add(1,2) == 3\n",
+        })
+        report = KnowledgeGapIdentifier().analyze(tmpdir)
+        untested = [g for g in report.gaps if g.gap_type == GapType.UNTESTED]
+        assert any("helper" in g.name for g in untested), "helper should be untested"
+
+
+def test_tested_module_no_gap():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        _make_repo(tmpdir, {
+            "src/calculator.py": "def add(a, b):\n    return a + b\n",
+            "tests/test_calculator.py": "def test_add():\n    assert True\n",
+        })
+        report = KnowledgeGapIdentifier().analyze(tmpdir)
+        untested = [g for g in report.gaps
+                    if g.gap_type == GapType.UNTESTED and "calculator" in g.name]
+        assert len(untested) == 0, "calculator has tests, should not be flagged"
+
+
+def test_missing_implementation():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        _make_repo(tmpdir, {
+            "src/app.py": "def run():\n    pass\n",
+            "docs/api.md": "# API\nUse `NonExistentClass` to do things.\n",
+        })
+        report = KnowledgeGapIdentifier().analyze(tmpdir)
+        missing = [g for g in report.gaps if g.gap_type == GapType.MISSING_IMPLEMENTATION]
+        assert any(g.name == "NonExistentClass" for g in missing)
+
+
+def test_private_symbols_skipped():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        _make_repo(tmpdir, {
+            "src/app.py": "def _internal():\n    pass\ndef public():\n    pass\n",
+            "README.md": "# App\n",
+        })
+        report = KnowledgeGapIdentifier().analyze(tmpdir)
+        undocumented_names = [g.name for g in report.gaps if g.gap_type == GapType.UNDOCUMENTED]
+        assert "_internal" not in undocumented_names, "Private symbols should be skipped"
+        assert "public" in undocumented_names
+
+
+def test_empty_repo():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        report = KnowledgeGapIdentifier().analyze(tmpdir)
+        assert len(report.gaps) == 0
+
+
+def test_invalid_path():
+    report = KnowledgeGapIdentifier().analyze("/nonexistent/path/xyz")
+    assert len(report.gaps) == 1
+    assert report.gaps[0].severity == GapSeverity.ERROR
+
+
+def test_report_summary():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        _make_repo(tmpdir, {
+            "src/app.py": "class MyService:\n    def handle(self):\n        pass\n",
+            "README.md": "# App\n",
+        })
+        report = KnowledgeGapIdentifier().analyze(tmpdir)
+        summary = report.summary()
+        assert "UNDOCUMENTED" in summary
+        assert "MyService" in summary
+
+
+def test_report_to_dict():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        _make_repo(tmpdir, {
+            "src/app.py": "def hello():\n    pass\n",
+            "README.md": "# App\n",
+        })
+        report = KnowledgeGapIdentifier().analyze(tmpdir)
+        d = report.to_dict()
+        assert "total_gaps" in d
+        assert "gaps" in d
+        assert isinstance(d["gaps"], list)
+        assert d["total_gaps"] > 0
+
+
+def test_report_to_dict_is_json_serializable():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        _make_repo(tmpdir, {
+            "src/app.py": "def hello():\n    pass\n",
+            "README.md": "# App\n",
+        })
+        d = KnowledgeGapIdentifier().analyze(tmpdir).to_dict()
+        # Enum-keyed stats would make json.dumps raise TypeError.
+        assert json.loads(json.dumps(d))["stats"]["undocumented"] == 1
+
+
+def test_orphan_test_detected():
+    with tempfile.TemporaryDirectory() as tmpdir:
+        _make_repo(tmpdir, {
+            "src/calculator.py": "def add(a, b):\n    return a + b\n",
+            "tests/test_calculator.py": "def test_add():\n    assert True\n",
+            "tests/test_ghost.py": "def test_ghost():\n    assert True\n",
+        })
+        report = KnowledgeGapIdentifier().analyze(tmpdir)
+        orphans = [g.name for g in report.gaps if g.gap_type == GapType.ORPHAN_TEST]
+        assert orphans == ["ghost"], "only test_ghost.py lacks a source module"
+
+
+if __name__ == "__main__":
+    _TESTS = [
+        test_undocumented_symbol,
+        test_documented_symbol_no_gap,
+        test_untested_module,
+        test_tested_module_no_gap,
+        test_missing_implementation,
+        test_private_symbols_skipped,
+        test_empty_repo,
+        test_invalid_path,
+        test_report_summary,
+        test_report_to_dict,
+        test_report_to_dict_is_json_serializable,
+        test_orphan_test_detected,
+    ]
+    for _test in _TESTS:
+        _test()
+    print(f"All {len(_TESTS)} tests passed.")