268 lines
9.7 KiB
Python
268 lines
9.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Architecture Drift Detector — Multimodal Documentation Synthesis
|
|
================================================================
|
|
|
|
Analyzes architecture diagrams (images) and cross-references them with the
|
|
actual codebase to identify documentation drift. Uses vision analysis on
|
|
diagrams and file system analysis on code.
|
|
|
|
Usage:
|
|
python scripts/doc_drift_detector.py --diagram docs/architecture.png --src src/
|
|
python scripts/doc_drift_detector.py --check-readme # Analyze README diagrams (planned: main() does not define this flag yet)
|
|
python scripts/doc_drift_detector.py --report # Full drift report
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
|
|
@dataclass
class DiagramComponent:
    """One box/node read off an architecture diagram by the vision step.

    Attributes:
        name: Label of the component.
        component_type: Kind of component, e.g. "service", "module",
            "database", "api" or "agent".
        description: Optional free-text description; empty by default.
        connections: Connected components; a fresh empty list per instance.
        source: Origin marker, "diagram" or "code"; empty by default.
    """

    # Required fields come first so positional construction keeps working.
    name: str
    component_type: str

    # Optional details.
    description: str = ""
    connections: list = field(default_factory=list)
    source: str = ""
|
|
|
|
|
|
@dataclass
class CodeComponent:
    """One file-level component discovered while scanning the codebase.

    Attributes:
        name: Short component name.
        path: Location of the component as a string path.
        component_type: Kind of component, e.g. "module", "class",
            "service" or "script".
        imports: Names this component imports; a fresh empty list per instance.
        exports: Names this component exports; a fresh empty list per instance.
        lines_of_code: Size of the component in lines; 0 by default.
    """

    # Required identification fields (positional order matters to callers).
    name: str
    path: str
    component_type: str

    # Optional details gathered during the scan.
    imports: list = field(default_factory=list)
    exports: list = field(default_factory=list)
    lines_of_code: int = 0
|
|
|
|
|
|
@dataclass
class DriftReport:
    """Result of comparing diagram components with scanned code components.

    Attributes:
        diagram_components: Every component taken from the diagram.
        code_components: Every component found in the codebase.
        missing_from_code: Components present in the diagram but not in code.
        missing_from_docs: Components present in code but not in the diagram.
        connections_drift: Descriptions of connection mismatches.
        confidence: Score rendered as a percentage by ``summary``.
    """

    diagram_components: list = field(default_factory=list)
    code_components: list = field(default_factory=list)
    missing_from_code: list = field(default_factory=list)  # diagram-only
    missing_from_docs: list = field(default_factory=list)  # code-only
    connections_drift: list = field(default_factory=list)  # mismatched links
    confidence: float = 0.0

    def summary(self) -> str:
        """Render the report as human-readable, newline-joined text.

        The header with counts is always present; each findings section is
        emitted only when it has entries, and an all-clear line is appended
        when every findings list is empty.
        """
        out = [
            "=== Architecture Drift Report ===",
            f"Diagram components: {len(self.diagram_components)}",
            f"Code components: {len(self.code_components)}",
            f"Missing from code (diagram-only): {len(self.missing_from_code)}",
            f"Missing from docs (code-only): {len(self.missing_from_docs)}",
            f"Connection drift issues: {len(self.connections_drift)}",
            f"Confidence: {self.confidence:.0%}",
            "",
        ]

        if self.missing_from_code:
            out.append("⚠️ In diagram but NOT found in code:")
            out.extend(
                f" - {item.name} ({item.component_type})"
                for item in self.missing_from_code
            )
            out.append("")

        if self.missing_from_docs:
            out.append("📝 In code but NOT in diagram:")
            out.extend(
                f" - {item.name} at {item.path}"
                for item in self.missing_from_docs
            )
            out.append("")

        if self.connections_drift:
            out.append("🔗 Connection drift:")
            out.extend(f" - {issue}" for issue in self.connections_drift)

        has_findings = (
            self.missing_from_code
            or self.missing_from_docs
            or self.connections_drift
        )
        if not has_findings:
            out.append("✅ No significant drift detected!")

        return "\n".join(out)

    def to_dict(self) -> dict:
        """Return the report as a plain dict of lists and scalars.

        Component objects are converted with ``vars``; ``connections_drift``
        and ``confidence`` are passed through unchanged.
        """

        def as_dicts(items):
            return [vars(item) for item in items]

        payload = {}
        payload["diagram_components"] = as_dicts(self.diagram_components)
        payload["code_components"] = as_dicts(self.code_components)
        payload["missing_from_code"] = as_dicts(self.missing_from_code)
        payload["missing_from_docs"] = as_dicts(self.missing_from_docs)
        payload["connections_drift"] = self.connections_drift
        payload["confidence"] = self.confidence
        return payload
|
|
|
|
|
|
class ArchitectureDriftDetector:
    """Detects drift between architecture diagrams and actual code.

    The detector never reads the image itself: ``analyze_diagram`` only
    builds the prompt for an external vision step, ``scan_codebase``
    inventories files on disk, and ``detect_drift`` compares two component
    lists by normalized name.
    """

    # Directories whose contents are never reported as config components.
    _IGNORED_DIRS = frozenset({".git", "node_modules"})

    def __init__(self, src_dir: str = "src"):
        """Remember the source tree to scan.

        Args:
            src_dir: Root directory searched for Python and JS/TS sources.
        """
        self.src_dir = Path(src_dir)

    def analyze_diagram(self, diagram_path: str) -> str:
        """Build the vision-analysis prompt for an architecture diagram.

        The image is not opened here; the calling agent is expected to run
        the returned prompt against the diagram.

        Args:
            diagram_path: Path of the diagram. Currently unused; kept so
                callers can pass it without change.

        Returns:
            The prompt text asking for a JSON array of components.
        """
        prompt_lines = [
            "Analyze this architecture diagram and extract all components.",
            "",
            "For each component, identify:",
            "- Name (as shown in diagram)",
            "- Type (service, module, database, api, agent, frontend, etc.)",
            "- Connections to other components",
            "- Any version numbers or labels",
            "",
            "Return as JSON array:",
            "```json",
            "[",
            '{"name": "ComponentName", "type": "service", "connections": ["OtherComponent"]}',
            "]",
            "```",
            "",  # keeps the trailing newline of the prompt
        ]
        return "\n".join(prompt_lines)

    @staticmethod
    def _read_source(path: Path) -> str:
        """Return the text of *path*, or "" when the file cannot be read."""
        try:
            return path.read_text(errors="replace")
        except OSError:
            # Best effort: an unreadable file is still listed, with 0 lines.
            return ""

    @staticmethod
    def _count_code_lines(text: str, comment_prefix: str) -> int:
        """Count lines that are neither blank nor start with *comment_prefix*."""
        stripped = (line.strip() for line in text.split("\n"))
        return sum(
            1 for line in stripped
            if line and not line.startswith(comment_prefix)
        )

    def scan_codebase(self) -> list:
        """Scan the codebase to find actual components/modules.

        Python and JS/TS files are collected from ``src_dir``; config and
        shell files are collected from the current working directory.

        Returns:
            A list of ``CodeComponent`` objects; empty when ``src_dir`` does
            not exist (in which case config files are not scanned either).
        """
        components = []
        if not self.src_dir.exists():
            return components

        repo_root = self.src_dir.parent

        # Python modules. Private files (leading underscore) are skipped,
        # except package initializers, which are named after their package.
        for py_file in self.src_dir.rglob("*.py"):
            if py_file.name.startswith("_") and py_file.name != "__init__.py":
                continue
            if not py_file.is_file():
                continue  # a directory that merely ends in ".py"

            name = py_file.parent.name if py_file.stem == "__init__" else py_file.stem

            # Read once; on failure the text is "" so neither the line count
            # nor the import list can pick up data from a previous file.
            content = self._read_source(py_file)
            found = re.findall(
                r"^from\s+(\S+)\s+import|^import\s+(\S+)",
                content,
                re.MULTILINE,
            )
            import_list = [from_mod or plain_mod for from_mod, plain_mod in found]

            components.append(CodeComponent(
                name=name,
                path=str(py_file.relative_to(repo_root)),
                component_type="module",
                imports=import_list[:10],  # Top 10
                lines_of_code=self._count_code_lines(content, "#"),
            ))

        # JavaScript/TypeScript sources.
        for pattern in ("*.js", "*.ts", "*.tsx"):
            for js_file in self.src_dir.rglob(pattern):
                if not js_file.is_file():
                    continue
                # Paths under "mobile-app" are reported one level higher up.
                base = repo_root.parent if "mobile-app" in str(js_file) else repo_root
                components.append(CodeComponent(
                    name=js_file.stem,
                    path=str(js_file.relative_to(base)),
                    component_type="module",
                    lines_of_code=self._count_code_lines(
                        self._read_source(js_file), "//"
                    ),
                ))

        # Config and scripts, searched from the current working directory.
        for pattern in ("*.yaml", "*.yml", "*.json", "*.sh", "*.bash"):
            for cfg in Path(".").rglob(pattern):
                # Compare whole directory names: a substring test would also
                # drop ".github/" workflows and ".gitlab-ci.yml".
                if self._IGNORED_DIRS.intersection(cfg.parent.parts):
                    continue
                components.append(CodeComponent(
                    name=cfg.stem,
                    path=str(cfg),
                    component_type="config",
                ))

        return components

    def detect_drift(
        self,
        diagram_components: list,
        code_components: list,
    ) -> "DriftReport":
        """Compare diagram components against codebase components.

        Names are lower-cased and stripped to ``[a-z0-9]`` before matching.
        A diagram component counts as present when its normalized name equals
        a code name, or when one non-empty name contains the other. A code
        component is reported as undocumented only on an exact-name miss and
        only when it has more than 50 lines of code.

        Args:
            diagram_components: Objects with ``name`` (diagram side).
            code_components: Objects with ``name`` and ``lines_of_code``.

        Returns:
            A ``DriftReport`` whose ``confidence`` is the share of diagram
            components not reported missing, or 0.5 without any diagram
            components.
        """
        report = DriftReport()
        report.diagram_components = diagram_components
        report.code_components = code_components

        def normalize(name):
            return re.sub(r"[^a-z0-9]", "", name.lower())

        # Later components win when two share a normalized name.
        code_names = {normalize(c.name): c for c in code_components}
        diagram_names = {normalize(c.name): c for c in diagram_components}

        # Diagram-only components.
        for norm_name, diagram_component in diagram_names.items():
            if norm_name in code_names:
                continue
            # An empty string is a substring of everything, so empty names
            # on either side must not count as a partial match.
            has_partial = bool(norm_name) and any(
                key and (norm_name in key or key in norm_name)
                for key in code_names
            )
            if not has_partial:
                report.missing_from_code.append(diagram_component)

        # Code-only components (significant ones only).
        for norm_name, code_component in code_names.items():
            if norm_name not in diagram_names and code_component.lines_of_code > 50:
                report.missing_from_docs.append(code_component)

        # Confidence based on match rate.
        if diagram_components:
            matched = len(diagram_components) - len(report.missing_from_code)
            report.confidence = matched / len(diagram_components)
        else:
            report.confidence = 0.5  # No diagram to compare

        return report
|
|
|
|
|
|
def main(argv: Optional[list] = None) -> int:
    """Command-line entry point for the drift detector.

    With ``--diagram`` the vision-analysis prompt is printed. The codebase is
    scanned when ``--report`` is given or when no diagram was supplied; the
    result is printed either as JSON (``--json``) or as a top-20 listing by
    lines of code followed by a code-only drift summary.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        Process exit status, always 0.
    """
    parser = argparse.ArgumentParser(description="Architecture Drift Detector")
    parser.add_argument("--diagram", help="Path to architecture diagram image")
    parser.add_argument("--src", default="src", help="Source directory to scan")
    parser.add_argument("--report", action="store_true", help="Generate full report")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    args = parser.parse_args(argv)

    detector = ArchitectureDriftDetector(args.src)

    should_scan = args.report or not args.diagram
    # When JSON is going to be written, stdout must contain nothing but the
    # JSON document, so human-oriented text is diverted to stderr.
    emit_json = args.json and should_scan
    status = sys.stderr if emit_json else sys.stdout

    if args.diagram:
        print("Diagram analysis prompt (use with vision_analyze tool):", file=status)
        print(detector.analyze_diagram(args.diagram), file=status)
        print(file=status)

    if should_scan:
        print("Scanning codebase...", file=status)
        code_components = detector.scan_codebase()
        print(f"Found {len(code_components)} components", file=status)

        if args.json:
            print(json.dumps([vars(c) for c in code_components], indent=2))
        else:
            # Show top components by LOC
            by_loc = sorted(
                code_components,
                key=lambda c: c.lines_of_code,
                reverse=True,
            )[:20]
            print("\nTop components by lines of code:")
            for c in by_loc:
                print(f" {c.lines_of_code:5} {c.path}")

            # Generate drift report with empty diagram (code-only analysis)
            report = detector.detect_drift([], code_components)
            print(f"\n{report.summary()}")

    return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the CLI and hand main()'s return value to the interpreter as the
    # process exit status.
    raise SystemExit(main())
|