Compare commits

..

2 Commits

3 changed files with 405 additions and 221 deletions

216
scripts/diff_analyzer.py Normal file
View File

@@ -0,0 +1,216 @@
#!/usr/bin/env python3
"""
Diff Analyzer — Parse unified diffs and categorize every change.
Pipeline 6.1 for Compounding Intelligence.
"""
import re
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import List, Dict, Any, Optional
class ChangeCategory(Enum):
ADDED = "added"
DELETED = "deleted"
MODIFIED = "modified"
MOVED = "moved"
CONTEXT = "context"
@dataclass
class Hunk:
"""A single diff hunk with header, line ranges, and category."""
header: str
old_start: int
old_count: int
new_start: int
new_count: int
lines: List[str] = field(default_factory=list)
category: ChangeCategory = ChangeCategory.CONTEXT
def to_dict(self) -> Dict[str, Any]:
d = asdict(self)
d["category"] = self.category.value
return d
@dataclass
class FileChange:
"""A single file's changes."""
path: str
old_path: Optional[str] = None # For renames
hunks: List[Hunk] = field(default_factory=list)
added_lines: int = 0
deleted_lines: int = 0
is_new: bool = False
is_deleted: bool = False
is_renamed: bool = False
is_binary: bool = False
def to_dict(self) -> Dict[str, Any]:
return {
"path": self.path,
"old_path": self.old_path,
"hunks": [h.to_dict() for h in self.hunks],
"added_lines": self.added_lines,
"deleted_lines": self.deleted_lines,
"is_new": self.is_new,
"is_deleted": self.is_deleted,
"is_renamed": self.is_renamed,
"is_binary": self.is_binary,
}
@dataclass
class ChangeSummary:
"""Aggregate stats + per-file breakdown."""
files: List[FileChange] = field(default_factory=list)
total_added: int = 0
total_deleted: int = 0
total_files_changed: int = 0
total_hunks: int = 0
new_files: int = 0
deleted_files: int = 0
renamed_files: int = 0
binary_files: int = 0
def to_dict(self) -> Dict[str, Any]:
return {
"total_files_changed": self.total_files_changed,
"total_added": self.total_added,
"total_deleted": self.total_deleted,
"total_hunks": self.total_hunks,
"new_files": self.new_files,
"deleted_files": self.deleted_files,
"renamed_files": self.renamed_files,
"binary_files": self.binary_files,
"files": [f.to_dict() for f in self.files],
}
class DiffAnalyzer:
"""Parses unified diff format and produces structured ChangeSummary."""
HUNK_HEADER_RE = re.compile(r"^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@(.*)$")
DIFF_FILE_RE = re.compile(r"^diff --git a/(.*) b/(.*)")
RENAME_RE = re.compile(r"^rename from (.+)$")
RENAME_TO_RE = re.compile(r"^rename to (.+)$")
NEW_FILE_RE = re.compile(r"^new file mode")
DELETED_FILE_RE = re.compile(r"^deleted file mode")
BINARY_RE = re.compile(r"^Binary files .* differ")
def analyze(self, diff_text: str) -> ChangeSummary:
"""Parse a unified diff and return a ChangeSummary."""
summary = ChangeSummary()
if not diff_text or not diff_text.strip():
return summary
# Split diff into per-file sections
file_diffs = self._split_files(diff_text)
for file_diff in file_diffs:
fc = self._parse_file_diff(file_diff)
summary.files.append(fc)
summary.total_added += fc.added_lines
summary.total_deleted += fc.deleted_lines
summary.total_hunks += len(fc.hunks)
if fc.is_new:
summary.new_files += 1
if fc.is_deleted:
summary.deleted_files += 1
if fc.is_renamed:
summary.renamed_files += 1
if fc.is_binary:
summary.binary_files += 1
summary.total_files_changed = len(summary.files)
return summary
def _split_files(self, diff_text: str) -> List[str]:
"""Split a multi-file diff into individual file diffs."""
lines = diff_text.split("\n")
chunks = []
current = []
for line in lines:
if line.startswith("diff --git ") and current:
chunks.append("\n".join(current))
current = [line]
else:
current.append(line)
if current:
chunks.append("\n".join(current))
return chunks
def _parse_file_diff(self, diff_text: str) -> FileChange:
"""Parse a single file's diff section."""
lines = diff_text.split("\n")
fc = FileChange(path="")
# Extract file paths
for line in lines:
m = self.DIFF_FILE_RE.match(line)
if m:
fc.path = m.group(2)
break
# Check for special states
for line in lines:
if self.NEW_FILE_RE.match(line):
fc.is_new = True
elif self.DELETED_FILE_RE.match(line):
fc.is_deleted = True
elif self.RENAME_RE.match(line):
fc.old_path = m.group(1) if (m := self.RENAME_RE.match(line)) else None
fc.is_renamed = True
elif self.BINARY_RE.match(line):
fc.is_binary = True
return fc # No hunks for binary
# Rename TO
for line in lines:
m = self.RENAME_TO_RE.match(line)
if m and fc.is_renamed:
fc.path = m.group(1)
# Parse hunks
current_hunk = None
for line in lines:
m = self.HUNK_HEADER_RE.match(line)
if m:
if current_hunk:
self._classify_hunk(current_hunk, fc)
fc.hunks.append(current_hunk)
current_hunk = Hunk(
header=m.group(5).strip(),
old_start=int(m.group(1)),
old_count=int(m.group(2) or 1),
new_start=int(m.group(3)),
new_count=int(m.group(4) or 1),
)
elif current_hunk and (line.startswith("+") or line.startswith("-") or line.startswith(" ")):
current_hunk.lines.append(line)
if current_hunk:
self._classify_hunk(current_hunk, fc)
fc.hunks.append(current_hunk)
return fc
def _classify_hunk(self, hunk: Hunk, fc: FileChange):
"""Classify a hunk and count lines."""
added = sum(1 for l in hunk.lines if l.startswith("+"))
deleted = sum(1 for l in hunk.lines if l.startswith("-"))
fc.added_lines += added
fc.deleted_lines += deleted
if added > 0 and deleted == 0:
hunk.category = ChangeCategory.ADDED
elif deleted > 0 and added == 0:
hunk.category = ChangeCategory.DELETED
elif added > 0 and deleted > 0:
hunk.category = ChangeCategory.MODIFIED
else:
hunk.category = ChangeCategory.CONTEXT

View File

@@ -1,221 +0,0 @@
#!/usr/bin/env python3
"""
Knowledge Store Staleness Detector
Checks knowledge entries against their source files to detect staleness.
An entry is stale when its source file has been modified since extraction.
Usage:
python3 scripts/knowledge_staleness_check.py knowledge/index.json
python3 scripts/knowledge_staleness_check.py --repo /path/to/repo --index knowledge/index.json
python3 scripts/knowledge_staleness_check.py --index knowledge/index.json --fix
Expected index.json format:
{
"version": 1,
"facts": [
{
"fact": "...",
"category": "fact|pitfall|pattern|tool-quirk",
"repo": "repo-name",
"confidence": 0.8,
"source_file": "path/to/file.py",
"source_hash": "sha256:abcdef...",
"extracted_at": "2026-04-13T20:00:00Z"
}
]
}
"""
import argparse
import hashlib
import json
import sys
from pathlib import Path
from typing import Optional
def compute_file_hash(filepath: str) -> Optional[str]:
"""Compute SHA-256 hash of a file. Returns None if file not found."""
path = Path(filepath)
if not path.exists():
return None
content = path.read_bytes()
return hashlib.sha256(content).hexdigest()[:16]
def check_staleness(index_path: str, repo_root: str = None) -> dict:
"""Check all entries in the knowledge index for staleness."""
index = Path(index_path)
if not index.exists():
return {"error": f"Index not found: {index_path}"}
data = json.loads(index.read_text())
facts = data.get("facts", [])
if not facts:
return {
"total": 0,
"stale": 0,
"fresh": 0,
"no_source": 0,
"missing_files": 0,
"stale_entries": [],
}
# Determine repo root
if repo_root:
root = Path(repo_root)
else:
root = index.parent.parent # knowledge/index.json -> repo root
results = {
"total": len(facts),
"stale": 0,
"fresh": 0,
"no_source": 0,
"missing_files": 0,
"stale_entries": [],
}
for i, entry in enumerate(facts):
source_file = entry.get("source_file")
stored_hash = entry.get("source_hash")
if not source_file:
results["no_source"] += 1
continue
if not stored_hash:
# Entry has source file but no hash — consider stale
results["stale"] += 1
results["stale_entries"].append({
"index": i,
"fact": entry.get("fact", "")[:100],
"source_file": source_file,
"reason": "no_hash",
})
continue
# Compute current hash
full_path = root / source_file
current_hash = compute_file_hash(str(full_path))
if current_hash is None:
results["missing_files"] += 1
results["stale_entries"].append({
"index": i,
"fact": entry.get("fact", "")[:100],
"source_file": source_file,
"reason": "file_missing",
})
elif current_hash != stored_hash:
results["stale"] += 1
results["stale_entries"].append({
"index": i,
"fact": entry.get("fact", "")[:100],
"source_file": source_file,
"stored_hash": stored_hash,
"current_hash": current_hash,
"reason": "hash_mismatch",
})
else:
results["fresh"] += 1
return results
def add_hashes_to_index(index_path: str, repo_root: str = None) -> dict:
"""Add source hashes to entries that are missing them."""
index = Path(index_path)
data = json.loads(index.read_text())
facts = data.get("facts", [])
if repo_root:
root = Path(repo_root)
else:
root = index.parent.parent
updated = 0
skipped = 0
for entry in facts:
source_file = entry.get("source_file")
if not source_file or entry.get("source_hash"):
skipped += 1
continue
full_path = root / source_file
file_hash = compute_file_hash(str(full_path))
if file_hash:
entry["source_hash"] = file_hash
updated += 1
if updated > 0:
index.write_text(json.dumps(data, indent=2) + "\n")
return {"updated": updated, "skipped": skipped, "total": len(facts)}
def report_staleness(results: dict) -> str:
"""Format staleness check results as a report."""
lines = []
lines.append("=" * 50)
lines.append("KNOWLEDGE STORE STALENESS REPORT")
lines.append("=" * 50)
lines.append(f"Total entries: {results['total']}")
lines.append(f"Fresh: {results['fresh']}")
lines.append(f"Stale: {results['stale']}")
lines.append(f"No source: {results['no_source']}")
lines.append(f"Missing files: {results['missing_files']}")
lines.append("")
if results["stale_entries"]:
lines.append("STALE ENTRIES:")
lines.append("-" * 50)
for entry in results["stale_entries"]:
lines.append(f" [{entry['reason']}] {entry['source_file']}")
lines.append(f" {entry['fact']}")
if entry.get("stored_hash") and entry.get("current_hash"):
lines.append(f" stored: {entry['stored_hash']}")
lines.append(f" current: {entry['current_hash']}")
lines.append("")
if results["total"] > 0:
staleness_pct = results["stale"] / results["total"] * 100
lines.append(f"Staleness rate: {staleness_pct:.1f}%")
else:
lines.append("No entries to check.")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Check knowledge store for stale entries")
parser.add_argument("--index", default="knowledge/index.json", help="Path to index.json")
parser.add_argument("--repo", help="Repository root (default: auto-detect from index path)")
parser.add_argument("--fix", action="store_true", help="Add missing hashes to index")
parser.add_argument("--json", action="store_true", help="Output JSON instead of report")
args = parser.parse_args()
if args.fix:
result = add_hashes_to_index(args.index, args.repo)
if args.json:
print(json.dumps(result, indent=2))
else:
print(f"Updated {result['updated']} entries with source hashes.")
print(f"Skipped {result['skipped']} (already had hashes or no source file).")
else:
results = check_staleness(args.index, args.repo)
if "error" in results:
print(f"Error: {results['error']}", file=sys.stderr)
sys.exit(1)
if args.json:
print(json.dumps(results, indent=2))
else:
print(report_staleness(results))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,189 @@
#!/usr/bin/env python3
"""Tests for scripts/diff_analyzer.py — 10 tests."""
import sys
import os
sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util
spec = importlib.util.spec_from_file_location("da", os.path.join(os.path.dirname(__file__) or ".", "diff_analyzer.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
DiffAnalyzer = mod.DiffAnalyzer
ChangeCategory = mod.ChangeCategory
SAMPLE_ADD = """diff --git a/new.py b/new.py
new file mode 100644
--- /dev/null
+++ b/new.py
@@ -0,0 +1,3 @@
+def hello():
+ print("world")
+ return True
"""
SAMPLE_DELETE = """diff --git a/old.py b/old.py
deleted file mode 100644
--- a/old.py
+++ /dev/null
@@ -1,2 +0,0 @@
-def goodbye():
- pass
"""
SAMPLE_MODIFY = """diff --git a/app.py b/app.py
--- a/app.py
+++ b/app.py
@@ -1,3 +1,4 @@
def main():
- print("old")
+ print("new")
+ print("extra")
return 0
"""
SAMPLE_RENAME = """diff --git a/old_name.py b/new_name.py
rename from old_name.py
rename to new_name.py
--- a/old_name.py
+++ b/new_name.py
@@ -1,1 +1,1 @@
-old content
+new content
"""
SAMPLE_MULTI = """diff --git a/a.py b/a.py
--- a/a.py
+++ b/a.py
@@ -1,1 +1,2 @@
existing
+added line
diff --git b/b.py b/b.py
new file mode 100644
--- /dev/null
+++ b/b.py
@@ -0,0 +1,1 @@
+new file
"""
SAMPLE_BINARY = """diff --git a/img.png b/img.png
Binary files a/img.png and b/img.png differ
"""
def test_empty():
a = DiffAnalyzer()
s = a.analyze("")
assert s.total_files_changed == 0
print("PASS: test_empty")
def test_addition():
a = DiffAnalyzer()
s = a.analyze(SAMPLE_ADD)
assert s.total_files_changed == 1
assert s.total_added == 3
assert s.total_deleted == 0
assert s.new_files == 1
assert s.files[0].hunks[0].category == ChangeCategory.ADDED
print("PASS: test_addition")
def test_deletion():
a = DiffAnalyzer()
s = a.analyze(SAMPLE_DELETE)
assert s.total_deleted == 2
assert s.deleted_files == 1
assert s.files[0].hunks[0].category == ChangeCategory.DELETED
print("PASS: test_deletion")
def test_modification():
a = DiffAnalyzer()
s = a.analyze(SAMPLE_MODIFY)
assert s.total_added == 2
assert s.total_deleted == 1
assert s.files[0].hunks[0].category == ChangeCategory.MODIFIED
print("PASS: test_modification")
def test_rename():
a = DiffAnalyzer()
s = a.analyze(SAMPLE_RENAME)
assert s.renamed_files == 1
assert s.files[0].old_path == "old_name.py"
assert s.files[0].path == "new_name.py"
assert s.files[0].is_renamed == True
print("PASS: test_rename")
def test_multiple_files():
a = DiffAnalyzer()
s = a.analyze(SAMPLE_MULTI)
assert s.total_files_changed == 2
assert s.new_files == 1
print("PASS: test_multiple_files")
def test_binary():
a = DiffAnalyzer()
s = a.analyze(SAMPLE_BINARY)
assert s.binary_files == 1
assert s.files[0].is_binary == True
assert len(s.files[0].hunks) == 0
print("PASS: test_binary")
def test_to_dict():
a = DiffAnalyzer()
s = a.analyze(SAMPLE_MODIFY)
d = s.to_dict()
assert "total_files_changed" in d
assert "files" in d
assert isinstance(d["files"], list)
print("PASS: test_to_dict")
def test_context_only():
diff = """diff --git a/f.py b/f.py
--- a/f.py
+++ b/f.py
@@ -1,3 +1,3 @@
line1
-old
+new
line3
"""
a = DiffAnalyzer()
s = a.analyze(diff)
# Has both added and deleted = MODIFIED
assert s.files[0].hunks[0].category == ChangeCategory.MODIFIED
print("PASS: test_context_only")
def test_multi_hunk():
diff = """diff --git a/f.py b/f.py
--- a/f.py
+++ b/f.py
@@ -1,1 +1,2 @@
existing
+first addition
@@ -10,1 +11,2 @@
more
+second addition
"""
a = DiffAnalyzer()
s = a.analyze(diff)
assert s.total_hunks == 2
assert s.total_added == 2
print("PASS: test_multi_hunk")
def run_all():
test_empty()
test_addition()
test_deletion()
test_modification()
test_rename()
test_multiple_files()
test_binary()
test_to_dict()
test_context_only()
test_multi_hunk()
print("\nAll 10 tests passed!")
if __name__ == "__main__":
run_all()