[gemma-4-multimodal] Multimodal Documentation Synthesis #1488

Open
Rockachopa wants to merge 1 commits from gemma4-worker-20260409-105326-1483 into main

View File

@@ -0,0 +1,267 @@
#!/usr/bin/env python3
"""
Architecture Drift Detector — Multimodal Documentation Synthesis
================================================================
Analyzes architecture diagrams (images) and cross-references them with the
actual codebase to identify documentation drift. Uses vision analysis on
diagrams and file system analysis on code.
Usage:
python scripts/doc_drift_detector.py --diagram docs/architecture.png --src src/
python scripts/doc_drift_detector.py --check-readme # Analyze README diagrams (planned: this flag is not implemented by the argument parser yet)
python scripts/doc_drift_detector.py --report # Full drift report
"""
import argparse
import json
import os
import re
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
@dataclass
class DiagramComponent:
    """One component reported for an architecture diagram.

    Only ``name`` and ``component_type`` are required; every other field
    falls back to an empty value.
    """

    # Label of the component.
    name: str
    # Kind of component, e.g. "service", "module", "database", "api", "agent".
    component_type: str
    # Optional free-text description.
    description: str = ""
    # Connected components; each instance gets its own fresh list.
    connections: list = field(default_factory=list)
    # Where the record came from: "diagram" or "code".
    source: str = ""
@dataclass
class CodeComponent:
    """One component discovered in the codebase.

    ``name``, ``path`` and ``component_type`` are required; the list fields
    default to fresh empty lists and ``lines_of_code`` to zero.
    """

    # Short identifier of the component.
    name: str
    # Location of the file, stored as a string.
    path: str
    # Kind of component, e.g. "module", "class", "service", "script".
    component_type: str
    # Names this component imports.
    imports: list = field(default_factory=list)
    # Names this component exports.
    exports: list = field(default_factory=list)
    # Size of the component in lines.
    lines_of_code: int = 0
@dataclass
class DriftReport:
    """Documentation drift analysis results.

    Holds the two component inventories that were compared plus the
    differences found between them, and can render itself either as a
    human-readable summary or as a plain dictionary.
    """

    # Components taken from the diagram.
    diagram_components: list = field(default_factory=list)
    # Components found in the codebase.
    code_components: list = field(default_factory=list)
    # In diagram but not code.
    missing_from_code: list = field(default_factory=list)
    # In code but not diagram.
    missing_from_docs: list = field(default_factory=list)
    # Connection mismatches; each entry is rendered with str() in summary().
    connections_drift: list = field(default_factory=list)
    # Match rate, formatted as a percentage in summary().
    confidence: float = 0.0

    def summary(self) -> str:
        """Return a multi-line, human-readable description of the report.

        The header always lists the counts and the confidence. One section
        follows for every non-empty drift list; when all three drift lists
        are empty a single "no drift" line is emitted instead.
        """
        lines = [
            "=== Architecture Drift Report ===",
            f"Diagram components: {len(self.diagram_components)}",
            f"Code components: {len(self.code_components)}",
            f"Missing from code (diagram-only): {len(self.missing_from_code)}",
            f"Missing from docs (code-only): {len(self.missing_from_docs)}",
            f"Connection drift issues: {len(self.connections_drift)}",
            f"Confidence: {self.confidence:.0%}",
            "",
        ]
        if self.missing_from_code:
            lines.append("⚠️ In diagram but NOT found in code:")
            lines.extend(
                f" - {c.name} ({c.component_type})" for c in self.missing_from_code
            )
            lines.append("")
        if self.missing_from_docs:
            lines.append("📝 In code but NOT in diagram:")
            lines.extend(f" - {c.name} at {c.path}" for c in self.missing_from_docs)
            lines.append("")
        if self.connections_drift:
            lines.append("🔗 Connection drift:")
            lines.extend(f" - {c}" for c in self.connections_drift)
        if not (self.missing_from_code or self.missing_from_docs or self.connections_drift):
            lines.append("✅ No significant drift detected!")
        return "\n".join(lines)

    def to_dict(self) -> dict:
        """Return the report as a dictionary of plain containers.

        Every component is exported as a copy of its attribute dictionary.
        The result is a detached snapshot: ``vars(obj)`` hands out the
        object's live ``__dict__``, so without copying, any edit made to the
        exported data would silently change the components (and the
        ``connections_drift`` list) held by this report.
        """
        # Function-scope import keeps this block self-contained.
        import copy

        def snapshot(components):
            return [copy.deepcopy(vars(c)) for c in components]

        return {
            "diagram_components": snapshot(self.diagram_components),
            "code_components": snapshot(self.code_components),
            "missing_from_code": snapshot(self.missing_from_code),
            "missing_from_docs": snapshot(self.missing_from_docs),
            "connections_drift": copy.deepcopy(self.connections_drift),
            "confidence": self.confidence,
        }
class ArchitectureDriftDetector:
    """Detects drift between architecture diagrams and actual code.

    The detector supplies the prompt used to extract components from a
    diagram image (``analyze_diagram``), inventories files on disk
    (``scan_codebase``) and compares two component lists by name
    (``detect_drift``).
    """

    # Directory names whose contents are never reported as config/scripts.
    _SKIPPED_DIRS = frozenset({".git", "node_modules"})
    # A code component needs more counted lines than this to be reported as
    # missing from the documentation.
    _MIN_SIGNIFICANT_LOC = 50

    def __init__(self, src_dir: str = "src"):
        """Remember *src_dir* (converted to a ``Path``) as the scan root."""
        self.src_dir = Path(src_dir)

    def analyze_diagram(self, diagram_path: str) -> str:
        """Return the prompt to use for vision analysis of a diagram.

        The image itself is not opened here: *diagram_path* is currently
        unused and the actual analysis is left to the calling agent.

        Note that the prompt asks for a JSON key called "type"; mapping it
        onto ``DiagramComponent.component_type`` is up to the caller.
        """
        prompt_lines = (
            "Analyze this architecture diagram and extract all components.",
            "For each component, identify:",
            "- Name (as shown in diagram)",
            "- Type (service, module, database, api, agent, frontend, etc.)",
            "- Connections to other components",
            "- Any version numbers or labels",
            "Return as JSON array:",
            "```json",
            "[",
            '{"name": "ComponentName", "type": "service", "connections": ["OtherComponent"]}',
            "]",
            "```",
            "",  # the prompt ends with a newline
        )
        return "\n".join(prompt_lines)

    @staticmethod
    def _read_source(path: Path) -> str:
        """Return the text of *path*, or "" when it cannot be read."""
        try:
            return path.read_text(encoding="utf-8", errors="replace")
        except OSError:
            # Best effort: an unreadable entry (permissions, a directory
            # whose name matches the glob, a broken symlink) is treated as
            # an empty file instead of aborting the whole scan.
            return ""

    @staticmethod
    def _count_loc(content: str, comment_prefix: str) -> int:
        """Count lines that are neither blank nor start with *comment_prefix*."""
        return sum(
            1
            for line in content.split("\n")
            if line.strip() and not line.strip().startswith(comment_prefix)
        )

    def scan_codebase(self) -> list:
        """Scan the codebase to find actual components/modules.

        Returns a list of ``CodeComponent`` objects, in this order:

        1. Python files under ``src_dir`` (files starting with "_" are
           skipped, except ``__init__.py`` which is named after its
           directory), with line count and up to ten imported names.
        2. ``*.js`` / ``*.ts`` / ``*.tsx`` files under ``src_dir`` with
           their line count.
        3. ``*.yaml`` / ``*.yml`` / ``*.json`` / ``*.sh`` / ``*.bash`` files
           under the *current working directory* (not ``src_dir``),
           excluding anything inside a ``.git`` or ``node_modules``
           directory.

        An empty list is returned when ``src_dir`` does not exist; in that
        case the config scan of step 3 is skipped as well.
        """
        components = []
        if not self.src_dir.exists():
            return components

        # Scan Python modules
        for py_file in self.src_dir.rglob("*.py"):
            if py_file.name.startswith("_") and py_file.name != "__init__.py":
                continue
            name = py_file.stem
            if name == "__init__":
                name = py_file.parent.name
            # Always derive imports from THIS file's text; a failed read
            # yields "" rather than reusing a previous file's content.
            content = self._read_source(py_file)
            matches = re.findall(
                r"^from\s+(\S+)\s+import|^import\s+(\S+)", content, re.MULTILINE
            )
            import_list = [from_mod or plain_mod for from_mod, plain_mod in matches]
            components.append(CodeComponent(
                name=name,
                path=str(py_file.relative_to(self.src_dir.parent)),
                component_type="module",
                imports=import_list[:10],  # Top 10
                lines_of_code=self._count_loc(content, "#"),
            ))

        # Scan JavaScript/TypeScript
        for pattern in ("*.js", "*.ts", "*.tsx"):
            for js_file in self.src_dir.rglob(pattern):
                content = self._read_source(js_file)
                # NOTE(review): paths containing "mobile-app" are reported
                # relative to the grandparent of src_dir instead of its
                # parent. Kept as-is; confirm this special case is intended.
                if "mobile-app" in str(js_file):
                    base = self.src_dir.parent.parent
                else:
                    base = self.src_dir.parent
                components.append(CodeComponent(
                    name=js_file.stem,
                    path=str(js_file.relative_to(base)),
                    component_type="module",
                    lines_of_code=self._count_loc(content, "//"),
                ))

        # Scan config and scripts
        for pattern in ("*.yaml", "*.yml", "*.json", "*.sh", "*.bash"):
            for cfg in Path(".").rglob(pattern):
                # Compare whole path components: a substring test on the
                # path would also discard ".github/", ".gitea/" or any file
                # whose name merely contains ".git".
                if self._SKIPPED_DIRS.intersection(cfg.parts):
                    continue
                components.append(CodeComponent(
                    name=cfg.stem,
                    path=str(cfg),
                    component_type="config",
                ))
        return components

    def detect_drift(
        self,
        diagram_components: list,
        code_components: list
    ) -> DriftReport:
        """Compare diagram components against codebase components.

        Names are lower-cased and stripped of everything but ``a-z0-9``
        before comparison. Two names match when they are equal or when one
        (non-empty) name is contained in the other; the same rule is used in
        both directions so a component is never matched on one side and
        reported missing on the other.

        Args:
            diagram_components: objects with a ``name`` attribute.
            code_components: objects with ``name`` and ``lines_of_code``
                attributes.

        Returns:
            A ``DriftReport`` with ``missing_from_code``,
            ``missing_from_docs`` (only components with more than 50
            counted lines) and ``confidence`` filled in. ``confidence`` is
            the share of diagram components that were matched, or 0.5 when
            no diagram components were given. ``connections_drift`` is not
            populated by this method.
        """
        report = DriftReport()
        report.diagram_components = diagram_components
        report.code_components = code_components

        # Normalize names for matching
        def normalize(name):
            return re.sub(r'[^a-z0-9]', '', name.lower())

        def overlaps(left, right):
            # An empty string is a substring of every string, so names that
            # normalize to "" must never count as a partial match.
            return bool(left) and bool(right) and (left in right or right in left)

        code_names = {normalize(c.name): c for c in code_components}
        diagram_names = {normalize(c.name): c for c in diagram_components}

        # Find diagram-only components
        for norm_name, dc in diagram_names.items():
            if norm_name in code_names:
                continue
            if not any(overlaps(norm_name, key) for key in code_names):
                report.missing_from_code.append(dc)

        # Find code-only components (significant ones only)
        for norm_name, cc in code_names.items():
            if cc.lines_of_code <= self._MIN_SIGNIFICANT_LOC:
                continue
            if norm_name in diagram_names:
                continue
            if any(overlaps(norm_name, key) for key in diagram_names):
                continue
            report.missing_from_docs.append(cc)

        # Confidence based on match rate
        if diagram_components:
            matched = len(diagram_components) - len(report.missing_from_code)
            report.confidence = matched / len(diagram_components)
        else:
            report.confidence = 0.5  # No diagram to compare
        return report
def main() -> int:
    """Command-line entry point; returns the process exit code (always 0).

    With ``--diagram`` the vision-analysis prompt is printed. With
    ``--report``, or when no diagram is given, the codebase is scanned and
    either the components are dumped as JSON (``--json``) or the largest
    components and a code-only drift summary are printed.
    """
    parser = argparse.ArgumentParser(description="Architecture Drift Detector")
    parser.add_argument("--diagram", help="Path to architecture diagram image")
    parser.add_argument("--src", default="src", help="Source directory to scan")
    parser.add_argument("--report", action="store_true", help="Generate full report")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    args = parser.parse_args()

    detector = ArchitectureDriftDetector(args.src)
    run_scan = args.report or not args.diagram

    # When a JSON document is going to be written, stdout must contain
    # nothing else or the output cannot be parsed/piped; human-oriented
    # messages are sent to stderr in that case.
    info = sys.stderr if (args.json and run_scan) else sys.stdout

    if args.diagram:
        print("Diagram analysis prompt (use with vision_analyze tool):", file=info)
        print(detector.analyze_diagram(args.diagram), file=info)
        print(file=info)

    if run_scan:
        print("Scanning codebase...", file=info)
        code_components = detector.scan_codebase()
        print(f"Found {len(code_components)} components", file=info)
        if args.json:
            print(json.dumps([vars(c) for c in code_components], indent=2))
        else:
            # Show top components by LOC
            by_loc = sorted(code_components, key=lambda c: c.lines_of_code, reverse=True)[:20]
            print("\nTop components by lines of code:")
            for c in by_loc:
                print(f" {c.lines_of_code:5} {c.path}")
            # Generate drift report with empty diagram (code-only analysis)
            report = detector.detect_drift([], code_components)
            print(f"\n{report.summary()}")
    return 0
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())