[gemma-4-multimodal] Multimodal Documentation Synthesis #1488
267
scripts/doc_drift_detector.py
Normal file
267
scripts/doc_drift_detector.py
Normal file
@@ -0,0 +1,267 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Architecture Drift Detector — Multimodal Documentation Synthesis
|
||||
================================================================
|
||||
|
||||
Analyzes architecture diagrams (images) and cross-references them with the
|
||||
actual codebase to identify documentation drift. Uses vision analysis on
|
||||
diagrams and file system analysis on code.
|
||||
|
||||
Usage:
|
||||
python scripts/doc_drift_detector.py --diagram docs/architecture.png --src src/
|
||||
python scripts/doc_drift_detector.py --check-readme # Analyze README diagrams
|
||||
python scripts/doc_drift_detector.py --report # Full drift report
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@dataclass
class DiagramComponent:
    """A component extracted from an architecture diagram via vision analysis.

    Positional construction order is ``name, component_type, description,
    connections, source``; only the first two are required.
    """

    # Component label.
    name: str
    # Kind of component: "service", "module", "database", "api", "agent".
    component_type: str
    # Optional free-text description.
    description: str = ""
    # Connected components; a fresh list per instance (default_factory).
    connections: list = field(default_factory=list)
    # Origin of the record: "diagram" or "code".
    source: str = ""
|
||||
|
||||
|
||||
@dataclass
class CodeComponent:
    """A component found in the actual codebase.

    Positional construction order is ``name, path, component_type, imports,
    exports, lines_of_code``; the first three are required.
    """

    # Component name.
    name: str
    # Location of the file, stored as a string.
    path: str
    # Kind of component: "module", "class", "service", "script".
    component_type: str
    # Imported module names; a fresh list per instance (default_factory).
    imports: list = field(default_factory=list)
    # Exported names; a fresh list per instance (default_factory).
    exports: list = field(default_factory=list)
    # Line count for the component; 0 when not measured.
    lines_of_code: int = 0
|
||||
|
||||
|
||||
@dataclass
class DriftReport:
    """Documentation drift analysis results.

    Holds the components seen on each side of the comparison, the components
    found on only one side, any connection mismatches, and a confidence value.
    """

    # Every component taken from the diagram.
    diagram_components: list = field(default_factory=list)
    # Every component found in the code.
    code_components: list = field(default_factory=list)
    # In diagram but not code.
    missing_from_code: list = field(default_factory=list)
    # In code but not diagram.
    missing_from_docs: list = field(default_factory=list)
    # Connection mismatches; each entry is rendered with str formatting.
    connections_drift: list = field(default_factory=list)
    # Rendered as a percentage in summary().
    confidence: float = 0.0

    def summary(self) -> str:
        """Return a multi-line, human-readable drift report.

        The header block with counts is always present. A section is added
        for each non-empty drift list, and a success line is added only when
        all three drift lists are empty.

        Entries of ``missing_from_code`` must expose ``name`` and
        ``component_type``; entries of ``missing_from_docs`` must expose
        ``name`` and ``path``.
        """
        lines = [
            "=== Architecture Drift Report ===",
            f"Diagram components: {len(self.diagram_components)}",
            f"Code components: {len(self.code_components)}",
            f"Missing from code (diagram-only): {len(self.missing_from_code)}",
            f"Missing from docs (code-only): {len(self.missing_from_docs)}",
            f"Connection drift issues: {len(self.connections_drift)}",
            f"Confidence: {self.confidence:.0%}",
            "",
        ]

        if self.missing_from_code:
            lines.append("⚠️ In diagram but NOT found in code:")
            lines.extend(
                f" - {c.name} ({c.component_type})" for c in self.missing_from_code
            )
            lines.append("")

        if self.missing_from_docs:
            lines.append("📝 In code but NOT in diagram:")
            lines.extend(f" - {c.name} at {c.path}" for c in self.missing_from_docs)
            lines.append("")

        if self.connections_drift:
            lines.append("🔗 Connection drift:")
            lines.extend(f" - {c}" for c in self.connections_drift)

        has_drift = (
            self.missing_from_code or self.missing_from_docs or self.connections_drift
        )
        if not has_drift:
            lines.append("✅ No significant drift detected!")

        return "\n".join(lines)

    def to_dict(self) -> dict:
        """Return the report as a plain dict (e.g. for ``json.dumps``).

        Components are converted from their instance attributes. The attribute
        mappings and the ``connections_drift`` list are shallow copies, so
        editing the returned structure does not alter this report.
        """

        def as_dicts(components):
            # dict(vars(c)) copies; bare vars(c) would expose the live __dict__.
            return [dict(vars(c)) for c in components]

        return {
            "diagram_components": as_dicts(self.diagram_components),
            "code_components": as_dicts(self.code_components),
            "missing_from_code": as_dicts(self.missing_from_code),
            "missing_from_docs": as_dicts(self.missing_from_docs),
            "connections_drift": list(self.connections_drift),
            "confidence": self.confidence,
        }
|
||||
|
||||
|
||||
class ArchitectureDriftDetector:
    """Detects drift between architecture diagrams and actual code.

    The detector does not run vision analysis itself: ``analyze_diagram``
    only returns the prompt to hand to a vision-capable agent. Code-side
    components come from ``scan_codebase`` and the two sides are compared by
    ``detect_drift``.
    """

    # Matches ``from x import ...`` / ``import x`` at the start of a line.
    _IMPORT_RE = re.compile(r"^from\s+(\S+)\s+import|^import\s+(\S+)", re.MULTILINE)

    # Prompt returned by analyze_diagram(); ends with a newline after the fence.
    _DIAGRAM_PROMPT = "\n".join((
        "Analyze this architecture diagram and extract all components.",
        "",
        "For each component, identify:",
        "- Name (as shown in diagram)",
        "- Type (service, module, database, api, agent, frontend, etc.)",
        "- Connections to other components",
        "- Any version numbers or labels",
        "",
        "Return as JSON array:",
        "```json",
        "[",
        '{"name": "ComponentName", "type": "service", "connections": ["OtherComponent"]}',
        "]",
        "```",
        "",
    ))

    def __init__(self, src_dir: str = "src"):
        """Remember the source directory to scan (default ``"src"``)."""
        self.src_dir = Path(src_dir)

    def analyze_diagram(self, diagram_path: str) -> str:
        """Return the prompt for extracting components from a diagram.

        The actual analysis is done by the calling agent with a vision tool.

        Args:
            diagram_path: Path of the diagram image. Currently unused: the
                prompt text does not depend on it.

        Returns:
            The prompt text, asking for a JSON array of components.
        """
        return self._DIAGRAM_PROMPT

    @staticmethod
    def _read_source(path: Path) -> str:
        """Return the text of *path*, or ``""`` when it cannot be read."""
        try:
            return path.read_text(errors="replace")
        except OSError:
            # Best effort: an unreadable entry (permissions, a directory that
            # happens to match the glob, ...) counts as an empty file.
            return ""

    @staticmethod
    def _count_loc(content: str, comment_prefix: str) -> int:
        """Count lines that are neither blank nor start with *comment_prefix*."""
        return sum(
            1
            for line in content.split("\n")
            if line.strip() and not line.strip().startswith(comment_prefix)
        )

    def scan_codebase(self) -> list:
        """Scan the codebase to find actual components/modules.

        Returns:
            A list of ``CodeComponent``: Python modules and JS/TS files under
            ``self.src_dir`` (type ``"module"``), followed by YAML/JSON/shell
            files found under the current working directory (type
            ``"config"``). Returns an empty list, with no config scan, when
            ``self.src_dir`` does not exist.
        """
        components = []

        if not self.src_dir.exists():
            return components

        repo_root = self.src_dir.parent

        # Python modules. Files starting with "_" are skipped, except for
        # __init__.py, which is reported under its package directory's name.
        for py_file in self.src_dir.rglob("*.py"):
            if py_file.name.startswith("_") and py_file.name != "__init__.py":
                continue
            name = py_file.parent.name if py_file.stem == "__init__" else py_file.stem

            # Read once. An unreadable file yields "", so the line count is 0
            # and the import list is empty, instead of reusing the previous
            # file's text or hitting an unbound variable.
            content = self._read_source(py_file)
            import_list = [frm or imp for frm, imp in self._IMPORT_RE.findall(content)]

            components.append(CodeComponent(
                name=name,
                path=str(py_file.relative_to(repo_root)),
                component_type="module",
                imports=import_list[:10],  # Top 10
                lines_of_code=self._count_loc(content, "#"),
            ))

        # JavaScript/TypeScript
        for pattern in ("*.js", "*.ts", "*.tsx"):
            for js_file in self.src_dir.rglob(pattern):
                content = self._read_source(js_file)
                # NOTE(review): paths containing "mobile-app" are made relative
                # to one directory higher; the reason is not visible here.
                base = repo_root.parent if "mobile-app" in str(js_file) else repo_root
                components.append(CodeComponent(
                    name=js_file.stem,
                    path=str(js_file.relative_to(base)),
                    component_type="module",
                    lines_of_code=self._count_loc(content, "//"),
                ))

        # Config and scripts. These are searched from the current working
        # directory, not from self.src_dir.
        for pattern in ("*.yaml", "*.yml", "*.json", "*.sh", "*.bash"):
            for cfg in Path(".").rglob(pattern):
                cfg_path = str(cfg)
                # NOTE(review): substring test, so it also skips paths such as
                # ".github/..." - confirm whether that is intended.
                if ".git" in cfg_path or "node_modules" in cfg_path:
                    continue
                components.append(CodeComponent(
                    name=cfg.stem,
                    path=cfg_path,
                    component_type="config",
                ))

        return components

    def detect_drift(
        self,
        diagram_components: list,
        code_components: list
    ) -> "DriftReport":
        """Compare diagram components against codebase.

        Names are compared after lower-casing and dropping every character
        outside ``a-z0-9``. A diagram component counts as present in code on
        an exact normalized match, or when one normalized name contains the
        other. Code components are reported as undocumented only when they
        have no exact match and more than 50 lines of code.

        Args:
            diagram_components: Objects with a ``name`` attribute.
            code_components: Objects with ``name`` and ``lines_of_code``.

        Returns:
            A ``DriftReport``. ``confidence`` is the fraction of distinct
            diagram names that were matched, or 0.5 when there is no diagram
            input. ``connections_drift`` is left empty.
        """
        report = DriftReport()
        report.diagram_components = diagram_components
        report.code_components = code_components

        def normalize(name):
            return re.sub(r"[^a-z0-9]", "", name.lower())

        # Keyed by normalized name; later duplicates replace earlier ones.
        code_names = {normalize(c.name): c for c in code_components}
        diagram_names = {normalize(c.name): c for c in diagram_components}

        # Diagram-only components.
        for norm_name, dc in diagram_names.items():
            if norm_name in code_names:
                continue
            # An empty string is a substring of every string, so empty keys
            # must not take part in containment matching.
            partially_matched = bool(norm_name) and any(
                key and (norm_name in key or key in norm_name)
                for key in code_names
            )
            if not partially_matched:
                report.missing_from_code.append(dc)

        # Code-only components (significant ones only).
        for norm_name, cc in code_names.items():
            if norm_name not in diagram_names and cc.lines_of_code > 50:
                report.missing_from_docs.append(cc)

        # Confidence is the match rate over distinct diagram names. The miss
        # list is built from the same de-duplicated mapping, so both counts
        # use the same population.
        if diagram_names:
            matched = len(diagram_names) - len(report.missing_from_code)
            report.confidence = matched / len(diagram_names)
        else:
            report.confidence = 0.5  # No diagram to compare

        return report
|
||||
|
||||
|
||||
def main():
    """Command-line entry point; returns the process exit code (always 0).

    Flags:
        --diagram PATH  Print the vision-analysis prompt for the diagram.
        --src DIR       Source directory to scan (default "src").
        --report        Scan the codebase even when --diagram is given.
        --json          Print the scanned components as JSON, not a report.

    The codebase scan runs when --report is set or no --diagram is given.

    NOTE(review): the module usage text mentions ``--check-readme``, but no
    such option is defined here, so argparse rejects it.
    """
    parser = argparse.ArgumentParser(description="Architecture Drift Detector")
    parser.add_argument("--diagram", help="Path to architecture diagram image")
    parser.add_argument("--src", default="src", help="Source directory to scan")
    parser.add_argument("--report", action="store_true", help="Generate full report")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    args = parser.parse_args()

    detector = ArchitectureDriftDetector(args.src)

    will_scan = args.report or not args.diagram
    # When a JSON document is going to be printed, stdout must contain nothing
    # else or it cannot be parsed; human-oriented text then goes to stderr.
    info = sys.stderr if (args.json and will_scan) else sys.stdout

    if args.diagram:
        print("Diagram analysis prompt (use with vision_analyze tool):", file=info)
        print(detector.analyze_diagram(args.diagram), file=info)
        print(file=info)

    if will_scan:
        print("Scanning codebase...", file=info)
        code_components = detector.scan_codebase()
        print(f"Found {len(code_components)} components", file=info)

        if args.json:
            print(json.dumps([vars(c) for c in code_components], indent=2))
        else:
            # Show top components by LOC
            by_loc = sorted(
                code_components, key=lambda c: c.lines_of_code, reverse=True
            )[:20]
            print("\nTop components by lines of code:")
            for c in by_loc:
                print(f" {c.lines_of_code:5} {c.path}")

            # Generate drift report with empty diagram (code-only analysis)
            report = detector.detect_drift([], code_components)
            print(f"\n{report.summary()}")

    return 0
|
||||
|
||||
|
||||
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|
||||
Reference in New Issue
Block a user