Compare commits

..

1 Commits

Author SHA1 Message Date
TERRA
fe4a04145a feat: add diff analyzer for PR change categorization (closes #118) 2026-04-14 23:25:15 -04:00
3 changed files with 463 additions and 353 deletions

239
scripts/diff_analyzer.py Normal file
View File

@@ -0,0 +1,239 @@
"""
Diff Analyzer — Pipeline 6.1
Reads PR diffs and categorizes changes: new code, deleted code, modified code, moved code
(the "moved" category is declared, but the hunk classifier in this module never assigns it).
Produces a change summary with line counts per category.
Usage:
from diff_analyzer import DiffAnalyzer
analyzer = DiffAnalyzer()
summary = analyzer.analyze(diff_text)
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional
class ChangeCategory(Enum):
    """Label attached to a diff hunk describing the kind of change it holds."""

    # Hunk has "+" lines and no "-" lines.
    ADDED = "added"
    # Hunk has "-" lines and no "+" lines.
    DELETED = "deleted"
    # Hunk has both "+" and "-" lines.
    MODIFIED = "modified"
    # Declared for moved code; the classifier in this module never assigns it.
    MOVED = "moved"
    # Hunk has neither "+" nor "-" lines; also the default for a fresh Hunk.
    CONTEXT = "context"
@dataclass
class Hunk:
    """One ``@@ ... @@`` section of a unified diff, plus its tallies."""

    # The raw "@@ -a,b +c,d @@ ..." line that opened the hunk.
    header: str
    # Start line / line count on the old side, as parsed from the header.
    old_start: int
    old_count: int
    # Start line / line count on the new side, as parsed from the header.
    new_start: int
    new_count: int
    # Every line seen after the header, prefix character included.
    lines: List[str] = field(default_factory=list)
    # Set by DiffAnalyzer._classify_hunk once the hunk is complete.
    category: ChangeCategory = ChangeCategory.CONTEXT
    # Number of "-" lines counted in the hunk body.
    old_lines: int = 0
    # Number of "+" lines counted in the hunk body.
    new_lines: int = 0
@dataclass
class FileChange:
    """Everything recorded about one file section of a diff."""

    # The b/ side path from the "diff --git" header.
    path: str
    # The a/ side path from the same header; differs from `path` for renames.
    old_path: Optional[str] = None
    # Flags raised by "new file mode", "deleted file mode" and
    # "rename from/to" header lines respectively.
    is_new: bool = False
    is_deleted: bool = False
    is_renamed: bool = False
    # Hunks belonging to this file, in the order they were closed.
    hunks: List[Hunk] = field(default_factory=list)
    # Per-file tallies of "+", "-" and " " prefixed hunk lines.
    added_lines: int = 0
    deleted_lines: int = 0
    context_lines: int = 0
@dataclass
class ChangeSummary:
    """Aggregate counters for a whole diff plus the per-file records."""

    # File-level counters.
    files_changed: int = 0
    files_added: int = 0
    files_deleted: int = 0
    files_renamed: int = 0
    files_modified: int = 0
    # Line-level counters summed over all files.
    total_added: int = 0
    total_deleted: int = 0
    total_context: int = 0
    # Hunk-level counters, one per ChangeCategory other than CONTEXT.
    hunks_added: int = 0
    hunks_deleted: int = 0
    hunks_modified: int = 0
    hunks_moved: int = 0
    # The FileChange records the counters were derived from.
    file_changes: List[FileChange] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return a plain-dict view of the summary suitable for JSON output.

        The scalar counters are copied under their own names; the per-file
        records are emitted under the key ``"files"`` with shortened
        ``added`` / ``deleted`` / ``context`` keys. Hunks are not included.
        """
        counter_names = (
            "files_changed",
            "files_added",
            "files_deleted",
            "files_renamed",
            "files_modified",
            "total_added",
            "total_deleted",
            "total_context",
            "hunks_added",
            "hunks_deleted",
            "hunks_modified",
            "hunks_moved",
        )
        payload = {name: getattr(self, name) for name in counter_names}

        per_file = []
        for entry in self.file_changes:
            per_file.append(
                {
                    "path": entry.path,
                    "old_path": entry.old_path,
                    "is_new": entry.is_new,
                    "is_deleted": entry.is_deleted,
                    "is_renamed": entry.is_renamed,
                    "added": entry.added_lines,
                    "deleted": entry.deleted_lines,
                    "context": entry.context_lines,
                }
            )
        payload["files"] = per_file
        return payload
# Compiled patterns for the header lines of a git-style unified diff.

# "@@ -old_start[,old_count] +new_start[,new_count] @@ trailing text"
# Groups: 1=old_start, 2=old_count (optional), 3=new_start,
# 4=new_count (optional), 5=whatever follows the closing "@@".
_HUNK_RE = re.compile(r"^@@\s+-(\d+)(?:,(\d+))?\s+\+(\d+)(?:,(\d+))?\s+@@(.*)$")

# "diff --git a/<old path> b/<new path>" -> groups 1 and 2.
_FILE_HEADER_RE = re.compile(r"^diff --git a/(.*) b/(.*)$")

# "rename from <path>" fills group 1, "rename to <path>" fills group 2.
_RENAME_RE = re.compile(r"^rename from (.+)$|^rename to (.+)$")
class DiffAnalyzer:
    """Parse git-style unified diffs and categorize what changed.

    Hunks are labelled ADDED, DELETED, MODIFIED or CONTEXT from their "+" and
    "-" line counts. ``ChangeCategory.MOVED`` is never assigned here, so
    ``hunks_moved`` stays 0.
    """

    def analyze(self, diff_text: str) -> ChangeSummary:
        """Analyze a unified diff string and return a ChangeSummary.

        Args:
            diff_text: Diff text containing ``diff --git a/... b/...`` file
                headers. Anything before the first such header is ignored.

        Returns:
            A ChangeSummary holding one FileChange per file header plus the
            aggregate file, line and hunk counters.
        """
        summary = ChangeSummary()
        current_file: Optional[FileChange] = None
        current_hunk: Optional[Hunk] = None

        for line in diff_text.splitlines():
            file_match = _FILE_HEADER_RE.match(line)
            if file_match:
                # Close the previous file *together with its open hunk*.
                # Dropping the hunk here would lose the last hunk of every
                # file except the final one.
                self._close_file(summary, current_file, current_hunk)
                current_file = FileChange(
                    path=file_match.group(2), old_path=file_match.group(1)
                )
                current_hunk = None
                continue

            if current_file is None:
                continue

            # Extended headers only occur between the file header and the
            # first hunk; once inside a hunk every line is hunk content.
            if current_hunk is None and self._read_extended_header(current_file, line):
                continue

            hunk_match = _HUNK_RE.match(line)
            if hunk_match:
                self._close_hunk(current_file, current_hunk)
                current_hunk = Hunk(
                    header=line,
                    old_start=int(hunk_match.group(1)),
                    # An omitted count means 1; an explicit "0" stays 0
                    # because the non-empty string "0" is truthy.
                    old_count=int(hunk_match.group(2) or 1),
                    new_start=int(hunk_match.group(3)),
                    new_count=int(hunk_match.group(4) or 1),
                )
                continue

            if current_hunk is None:
                # "---" / "+++" and other pre-hunk lines carry no counts.
                continue

            current_hunk.lines.append(line)
            if line.startswith("+"):
                current_hunk.new_lines += 1
                current_file.added_lines += 1
            elif line.startswith("-"):
                current_hunk.old_lines += 1
                current_file.deleted_lines += 1
            elif line.startswith(" "):
                current_file.context_lines += 1

        self._close_file(summary, current_file, current_hunk)
        self._aggregate(summary)
        return summary

    @staticmethod
    def _read_extended_header(fc: FileChange, line: str) -> bool:
        """Set new/deleted/renamed flags from *line*; return True if it was one."""
        if line.startswith("new file mode"):
            fc.is_new = True
        elif line.startswith("deleted file mode"):
            fc.is_deleted = True
        elif _RENAME_RE.match(line):
            fc.is_renamed = True
        else:
            return False
        return True

    def _close_hunk(self, fc: FileChange, hunk: Optional[Hunk]) -> None:
        """Classify *hunk* and attach it to *fc*; no-op when there is no hunk."""
        if hunk is None:
            return
        self._classify_hunk(hunk)
        fc.hunks.append(hunk)

    def _close_file(
        self,
        summary: ChangeSummary,
        fc: Optional[FileChange],
        hunk: Optional[Hunk],
    ) -> None:
        """Finish *fc* (and its open hunk) and record it on *summary*."""
        if fc is None:
            return
        self._close_hunk(fc, hunk)
        self._classify_file(fc)
        summary.file_changes.append(fc)

    @staticmethod
    def _aggregate(summary: ChangeSummary) -> None:
        """Fill the summary counters from its collected FileChange records."""
        summary.files_changed = len(summary.file_changes)
        for fc in summary.file_changes:
            summary.total_added += fc.added_lines
            summary.total_deleted += fc.deleted_lines
            summary.total_context += fc.context_lines

            # A file lands in exactly one bucket; new/deleted take
            # precedence over renamed.
            if fc.is_new:
                summary.files_added += 1
            elif fc.is_deleted:
                summary.files_deleted += 1
            elif fc.is_renamed:
                summary.files_renamed += 1
            else:
                summary.files_modified += 1

            for h in fc.hunks:
                if h.category == ChangeCategory.ADDED:
                    summary.hunks_added += 1
                elif h.category == ChangeCategory.DELETED:
                    summary.hunks_deleted += 1
                elif h.category == ChangeCategory.MODIFIED:
                    summary.hunks_modified += 1
                elif h.category == ChangeCategory.MOVED:
                    summary.hunks_moved += 1

    def _classify_hunk(self, hunk: Hunk) -> None:
        """Set ``hunk.category`` from its "+" (new) and "-" (old) line counts."""
        if hunk.new_lines > 0 and hunk.old_lines == 0:
            hunk.category = ChangeCategory.ADDED
        elif hunk.old_lines > 0 and hunk.new_lines == 0:
            hunk.category = ChangeCategory.DELETED
        elif hunk.new_lines > 0 and hunk.old_lines > 0:
            hunk.category = ChangeCategory.MODIFIED
        else:
            hunk.category = ChangeCategory.CONTEXT

    def _classify_file(self, fc: FileChange) -> None:
        """Hook for final per-file classification; currently does nothing.

        The new/deleted/renamed flags are already set while the extended
        header lines are read.
        """
        pass

View File

@@ -1,353 +0,0 @@
#!/usr/bin/env python3
"""
sampler.py — Score and rank sessions by harvest value.
With 20k+ sessions on disk, we can't harvest all at once. This script
scores each session by how likely it is to contain valuable knowledge,
so the harvester processes the best ones first.
Scoring strategy:
- Recency: last 7d=3pts, last 30d=2pts, older=1pt
- Length: >50 messages=3pts, >20=2pts, <=20=1pt
- Repo uniqueness: first session for a repo=5pts, repo already seen=1pt, no repo mentioned=0pts
- Outcome: failure=3pts (most to learn), success=2pts, unknown=1pt
- Tool calls: >10 tool invocations=2pts (complex sessions)
Usage:
python3 sampler.py --count 100 # Top 100 sessions
python3 sampler.py --repo the-nexus --count 20 # Top 20 for a repo
python3 sampler.py --since 2026-04-01 # All sessions since date
python3 sampler.py --count 50 --min-score 8 # Only high-value sessions
python3 sampler.py --count 100 --output sample.json # Save to file
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
# --- Fast session scanning (no full parse) ---
def scan_session_fast(path: str) -> dict:
    """Extract scoring metadata from a session without parsing the full JSONL.

    Reads the first line (timestamp and role), then the non-empty lines of
    the remainder of the file; for files larger than 8 KB only the last 8 KB
    is read. At most the final 30 of those lines are JSON-decoded. The first
    line itself is never part of that tail analysis.

    Args:
        path: Path to a JSONL session file.

    Returns:
        A dict with keys ``path``, ``message_count``, ``has_tool_calls``,
        ``tool_call_count``, ``first_timestamp``, ``last_timestamp``,
        ``is_failure``, ``repos_mentioned`` (always a sorted list, so the
        result is JSON-serializable), ``first_role`` and
        ``last_content_preview``. Empty or unreadable files yield the
        defaults. For large files ``message_count`` is an estimate.
    """
    failure_signals = ('error', 'failed', 'cannot', 'unable',
                       'exception', 'traceback', 'rejected', 'denied')
    repo_patterns = ('Timmy_Foundation/', 'Rockachopa/', 'compounding-intelligence',
                     'the-nexus', 'timmy-home', 'hermes-agent', 'the-beacon', 'the-door')
    tail_window = 8192

    meta = {
        'path': path,
        'message_count': 0,
        'has_tool_calls': False,
        'tool_call_count': 0,
        'first_timestamp': '',
        'last_timestamp': '',
        'is_failure': False,
        # Kept as a list from the start so the early return below is
        # JSON-serializable too.
        'repos_mentioned': [],
        'first_role': '',
        'last_content_preview': '',
    }
    repos = set()

    try:
        file_size = os.path.getsize(path)
        if file_size == 0:
            return meta

        # Binary mode: seeking a text-mode stream to an arbitrary byte offset
        # is undefined, so decode each line by hand instead.
        with open(path, 'rb') as f:
            first_line = f.readline().decode('utf-8', errors='replace').strip()
            if first_line:
                try:
                    first_msg = json.loads(first_line)
                except json.JSONDecodeError:
                    first_msg = None
                if isinstance(first_msg, dict):
                    meta['first_timestamp'] = first_msg.get('timestamp', '')
                    meta['first_role'] = first_msg.get('role', '')

            is_large = file_size > tail_window
            if is_large:
                f.seek(file_size - tail_window)
                f.readline()  # skip the partial line we landed in

            tail_lines = []
            for raw in f:
                text = raw.decode('utf-8', errors='replace').strip()
                if text:
                    tail_lines.append(text)

        line_count = 1 + len(tail_lines)  # +1 for the first line already read
        if is_large and line_count < 100:
            # The exact count is unknown for big files; estimate from size,
            # assuming an average JSONL line of ~500 bytes.
            line_count = max(line_count, file_size // 500)
        meta['message_count'] = line_count

        for line in tail_lines[-30:]:
            try:
                msg = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(msg, dict):
                continue  # valid JSON, but not a message object

            ts = msg.get('timestamp', '')
            if ts:
                meta['last_timestamp'] = ts

            tool_calls = msg.get('tool_calls')
            if not isinstance(tool_calls, list):
                tool_calls = []
            if tool_calls:
                meta['has_tool_calls'] = True
                meta['tool_call_count'] += len(tool_calls)

            # Content is either a plain string or a list of typed parts.
            raw_content = msg.get('content')
            content = ''
            if isinstance(raw_content, str):
                content = raw_content.lower()
            elif isinstance(raw_content, list):
                content = ''.join(
                    part['text'].lower()
                    for part in raw_content
                    if isinstance(part, dict)
                    and part.get('type') == 'text'
                    and isinstance(part.get('text'), str)
                )
            if content:
                meta['last_content_preview'] = content[:200]
                if any(sig in content for sig in failure_signals):
                    meta['is_failure'] = True

            # Repo references are looked up in the raw tool-call arguments.
            for tc in tool_calls:
                function = tc.get('function') if isinstance(tc, dict) else None
                args = function.get('arguments', '') if isinstance(function, dict) else ''
                if isinstance(args, str):
                    repos.update(p.rstrip('/') for p in repo_patterns if p in args)
    except (IOError, OSError):
        # Best effort: an unreadable file keeps whatever was gathered so far.
        pass

    meta['repos_mentioned'] = sorted(repos)
    return meta
# --- Filename timestamp parsing ---
def parse_session_timestamp(filename: str) -> Optional[datetime]:
    """Parse a UTC timestamp from a session filename.

    Recognized shapes (underscore-separated tokens in the file stem):
        session_20260413_123456_hash.jsonl
        20260413_123456_hash.jsonl

    The first 8-digit token that parses as YYYYMMDD is the date. The token
    right after it is used as HHMMSS only when it is exactly six digits;
    otherwise (e.g. a 6-character hash) midnight is assumed, so a valid date
    is not thrown away.

    Args:
        filename: File name or path; only the stem is inspected.

    Returns:
        A timezone-aware UTC datetime, or None when no token yields a valid
        date. This function does not look at the file's modification time;
        any such fallback is up to the caller.
    """
    tokens = Path(filename).stem.split('_')
    for idx, token in enumerate(tokens):
        if len(token) != 8 or not token.isdigit():
            continue
        follower = tokens[idx + 1] if idx + 1 < len(tokens) else ''
        time_part = follower if len(follower) == 6 and follower.isdigit() else '000000'
        try:
            parsed = datetime.strptime(f"{token}_{time_part}", '%Y%m%d_%H%M%S')
        except ValueError:
            # Eight digits that are not a real date/time: try the next token.
            continue
        return parsed.replace(tzinfo=timezone.utc)
    return None
# --- Scoring ---
def score_session(meta: dict, now: datetime, seen_repos: set) -> tuple[int, dict]:
    """Rate one scanned session; return ``(total, per-criterion breakdown)``.

    Args:
        meta: Metadata dict for the session; ``path`` and ``message_count``
            are required, the other keys are read with defaults.
        now: Reference time used for the age calculation.
        seen_repos: Repos already credited by earlier calls. Mutated in
            place: every repo this session mentions is added.

    Returns:
        The summed score and a dict with the keys ``recency``, ``length``,
        ``repo_unique``, ``outcome`` and ``tool_calls``.
    """
    # Recency: filename timestamp first, then mtime, else treat as a year old.
    stamp = parse_session_timestamp(os.path.basename(meta['path']))
    if stamp is None:
        try:
            stamp = datetime.fromtimestamp(os.path.getmtime(meta['path']), tz=timezone.utc)
        except OSError:
            stamp = now - timedelta(days=365)
    age_in_days = (now - stamp).days
    recency_pts = 3 if age_in_days <= 7 else (2 if age_in_days <= 30 else 1)

    # Length: more messages, more points.
    n_messages = meta['message_count']
    length_pts = 3 if n_messages > 50 else (2 if n_messages > 20 else 1)

    # Repo uniqueness: 5 if any mentioned repo is new, 1 if all were already
    # seen, 0 when the session mentions no repo at all.
    repo_pts = 0
    for name in meta.get('repos_mentioned', []):
        if name in seen_repos:
            repo_pts = max(repo_pts, 1)
        else:
            seen_repos.add(name)
            repo_pts = max(repo_pts, 5)

    # Outcome: failures rank highest; any trailing content counts as
    # "likely completed".
    if meta.get('is_failure'):
        outcome_pts = 3
    elif meta.get('last_content_preview', '').strip():
        outcome_pts = 2
    else:
        outcome_pts = 1

    # Tool calls: bonus only for sessions with more than ten invocations.
    tool_pts = 2 if meta.get('tool_call_count', 0) > 10 else 0

    breakdown = {
        'recency': recency_pts,
        'length': length_pts,
        'repo_unique': repo_pts,
        'outcome': outcome_pts,
        'tool_calls': tool_pts,
    }
    return sum(breakdown.values()), breakdown
# --- Main ---
def main():
    """Command-line entry point: scan, score, rank and emit session files.

    Progress and diagnostics go to stderr. The ranked result goes to the
    ``--output`` file as JSON when given, otherwise to stdout in the chosen
    ``--format``. Exits with status 1 when the sessions directory is missing.
    """
    cli = argparse.ArgumentParser(description="Score and rank sessions for harvesting")
    cli.add_argument('--sessions-dir', default=os.path.expanduser('~/.hermes/sessions'),
                     help='Directory containing session files')
    cli.add_argument('--count', type=int, default=100, help='Number of top sessions to return')
    cli.add_argument('--repo', default='', help='Filter to sessions mentioning this repo')
    cli.add_argument('--since', default='', help='Only score sessions after this date (YYYY-MM-DD)')
    cli.add_argument('--min-score', type=int, default=0, help='Minimum score threshold')
    cli.add_argument('--output', default='', help='Output file (JSON). Default: stdout')
    cli.add_argument('--format', choices=['json', 'paths', 'table'], default='table',
                     help='Output format: json (full), paths (one per line), table (human)')
    cli.add_argument('--top-percent', type=float, default=0, help='Return top N%% instead of --count')
    args = cli.parse_args()

    sessions_dir = Path(args.sessions_dir)
    if not sessions_dir.is_dir():
        print(f"ERROR: Sessions directory not found: {sessions_dir}", file=sys.stderr)
        sys.exit(1)

    # Collect candidate files.
    print(f"Scanning {sessions_dir}...", file=sys.stderr)
    t0 = time.time()
    session_files = list(sessions_dir.glob('*.jsonl'))
    total = len(session_files)
    print(f"Found {total} session files", file=sys.stderr)

    since_dt = (
        datetime.strptime(args.since, '%Y-%m-%d').replace(tzinfo=timezone.utc)
        if args.since
        else None
    )
    wanted_repo = args.repo.lower()

    now = datetime.now(timezone.utc)
    seen_repos = set()  # shared across sessions for the uniqueness bonus
    scored = []
    for i, sf in enumerate(session_files):
        # Cheap date filter on the filename; files without a parseable
        # timestamp are not filtered out here.
        if since_dt:
            stamp = parse_session_timestamp(sf.name)
            if stamp and stamp < since_dt:
                continue

        meta = scan_session_fast(str(sf))

        # Repo filter: match a mentioned repo exactly, or the filename loosely.
        if wanted_repo:
            mentioned = [r.lower() for r in meta.get('repos_mentioned', [])]
            if wanted_repo not in mentioned and wanted_repo not in sf.name.lower():
                continue

        score, breakdown = score_session(meta, now, seen_repos)
        if score >= args.min_score:
            scored.append({
                'path': str(sf),
                'filename': sf.name,
                'score': score,
                'breakdown': breakdown,
                'message_count': meta['message_count'],
                'repos': meta['repos_mentioned'],
                'is_failure': meta['is_failure'],
            })

        if (i + 1) % 5000 == 0:
            elapsed = time.time() - t0
            print(f" Scanned {i + 1}/{total} ({elapsed:.1f}s)", file=sys.stderr)

    elapsed = time.time() - t0
    print(f"Scored {len(scored)} sessions in {elapsed:.1f}s", file=sys.stderr)

    # Highest score first, then keep the requested share.
    scored.sort(key=lambda x: x['score'], reverse=True)
    count = (
        max(1, int(len(scored) * args.top_percent / 100))
        if args.top_percent > 0
        else args.count
    )
    scored = scored[:count]

    # An output file always receives JSON, whatever --format says.
    if args.output:
        with open(args.output, 'w', encoding='utf-8') as f:
            json.dump(scored, f, indent=2)
        print(f"Wrote {len(scored)} sessions to {args.output}", file=sys.stderr)
        return

    if args.format == 'json':
        json.dump(scored, sys.stdout, indent=2)
        return

    if args.format == 'paths':
        for s in scored:
            print(s['path'])
        return

    # Default: human-readable table.
    print(f"{'SCORE':>5} {'MSGS':>5} {'REPOS':<25} {'FILE'}")
    print(f"{'-'*5} {'-'*5} {'-'*25} {'-'*40}")
    for s in scored:
        repos = ', '.join(s['repos'][:2]) if s['repos'] else '-'
        fail = ' FAIL' if s['is_failure'] else ''
        print(f"{s['score']:>5} {s['message_count']:>5} {repos:<25} {s['filename'][:40]}{fail}")


if __name__ == '__main__':
    main()

224
tests/test_diff_analyzer.py Normal file
View File

@@ -0,0 +1,224 @@
"""Tests for diff_analyzer module."""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'scripts'))
from diff_analyzer import DiffAnalyzer, ChangeCategory
def test_parse_simple_addition():
    """A brand-new file: all lines are additions inside a single added hunk."""
    patch = (
        "diff --git a/foo.py b/foo.py\n"
        "new file mode 100644\n"
        "--- /dev/null\n"
        "+++ b/foo.py\n"
        "@@ -0,0 +1,3 @@\n"
        "+def hello():\n"
        '+ return "world"\n'
        "+# end\n"
    )
    result = DiffAnalyzer().analyze(patch)

    assert result.files_changed == 1
    assert result.files_added == 1
    assert result.files_modified == 0
    assert result.total_added == 3
    assert result.total_deleted == 0
    assert result.hunks_added == 1
    assert len(result.file_changes) == 1
    only_file = result.file_changes[0]
    assert only_file.is_new is True
    assert only_file.path == "foo.py"
def test_parse_simple_deletion():
    """A removed file: all lines are deletions inside a single deleted hunk."""
    patch = (
        "diff --git a/old.py b/old.py\n"
        "deleted file mode 100644\n"
        "--- a/old.py\n"
        "+++ /dev/null\n"
        "@@ -1,2 +0,0 @@\n"
        "-x = 1\n"
        "-y = 2\n"
    )
    result = DiffAnalyzer().analyze(patch)

    assert result.files_changed == 1
    assert result.files_deleted == 1
    assert result.total_deleted == 2
    assert result.total_added == 0
    assert result.hunks_deleted == 1
    assert result.file_changes[0].is_deleted is True
def test_parse_modification():
    """One hunk mixing additions, a deletion and two context lines."""
    # Context lines in a unified diff start with a single space. The fixture
    # must carry that prefix or the analyzer cannot count them as context.
    diff = (
        "diff --git a/bar.py b/bar.py\n"
        "--- a/bar.py\n"
        "+++ b/bar.py\n"
        "@@ -10,3 +10,4 @@ def foo():\n"
        " existing()\n"
        "- old_call()\n"
        "+ new_call()\n"
        "+ extra_step()\n"
        " return\n"
    )
    analyzer = DiffAnalyzer()
    summary = analyzer.analyze(diff)
    assert summary.files_changed == 1
    assert summary.files_modified == 1
    assert summary.total_added == 2  # +new_call(), +extra_step()
    assert summary.total_deleted == 1  # -old_call()
    assert summary.total_context == 2  # 2 context lines
    assert summary.hunks_modified == 1
def test_parse_multiple_files():
    """Three files in one diff: one modified, one added, one deleted."""
    # The context line in a.py carries its leading space, as the unified
    # diff format requires, so the context tally can be checked as well.
    diff = (
        "diff --git a/a.py b/a.py\n"
        "--- a/a.py\n"
        "+++ b/a.py\n"
        "@@ -1,1 +1,2 @@\n"
        " existing\n"
        "+added\n"
        "diff --git a/b.py b/b.py\n"
        "new file mode 100644\n"
        "--- /dev/null\n"
        "+++ b/b.py\n"
        "@@ -0,0 +1,1 @@\n"
        "+new file\n"
        "diff --git a/c.py b/c.py\n"
        "deleted file mode 100644\n"
        "--- a/c.py\n"
        "+++ /dev/null\n"
        "@@ -1,1 +0,0 @@\n"
        "-gone\n"
    )
    analyzer = DiffAnalyzer()
    summary = analyzer.analyze(diff)
    assert summary.files_changed == 3
    assert summary.files_added == 1
    assert summary.files_deleted == 1
    assert summary.files_modified == 1
    assert summary.total_added == 2
    assert summary.total_deleted == 1
    assert summary.total_context == 1
def test_parse_rename():
    """A renamed file keeps both paths and is counted as renamed."""
    patch = (
        "diff --git a/old_name.py b/new_name.py\n"
        "rename from old_name.py\n"
        "rename to new_name.py\n"
        "--- a/old_name.py\n"
        "+++ b/new_name.py\n"
        "@@ -1,1 +1,1 @@\n"
        "-old_func()\n"
        "+new_func()\n"
    )
    result = DiffAnalyzer().analyze(patch)

    assert result.files_changed == 1
    assert result.files_renamed == 1
    renamed = result.file_changes[0]
    assert renamed.is_renamed is True
    assert renamed.old_path == "old_name.py"
    assert renamed.path == "new_name.py"
def test_parse_mixed_hunks():
    """A file with one add hunk and one delete hunk."""
    patch = (
        "diff --git a/mixed.py b/mixed.py\n"
        "--- a/mixed.py\n"
        "+++ b/mixed.py\n"
        "@@ -5,0 +6,2 @@\n"
        "+new_line_1\n"
        "+new_line_2\n"
        "@@ -20,2 +22,0 @@\n"
        "-removed_1\n"
        "-removed_2\n"
    )
    result = DiffAnalyzer().analyze(patch)

    assert result.files_changed == 1
    assert result.hunks_added == 1
    assert result.hunks_deleted == 1
    assert result.total_added == 2
    assert result.total_deleted == 2
def test_empty_diff():
    """An empty input produces an all-zero summary."""
    result = DiffAnalyzer().analyze("")

    assert result.files_changed == 0
    assert result.total_added == 0
    assert result.total_deleted == 0
def test_to_dict():
    """The dict form exposes the counters and one entry per file."""
    patch = (
        "diff --git a/test.py b/test.py\n"
        "new file mode 100644\n"
        "--- /dev/null\n"
        "+++ b/test.py\n"
        "@@ -0,0 +1,2 @@\n"
        "+line1\n"
        "+line2\n"
    )
    as_dict = DiffAnalyzer().analyze(patch).to_dict()

    assert as_dict["files_changed"] == 1
    assert as_dict["files_added"] == 1
    assert as_dict["total_added"] == 2
    assert as_dict["total_deleted"] == 0
    assert len(as_dict["files"]) == 1
    first = as_dict["files"][0]
    assert first["path"] == "test.py"
    assert first["is_new"] is True
def test_context_only_hunk():
    """A hunk with only context lines (rare but possible)."""
    # Each context line starts with the single space the unified diff format
    # uses to mark it; without it the analyzer counts nothing.
    diff = (
        "diff --git a/noop.py b/noop.py\n"
        "--- a/noop.py\n"
        "+++ b/noop.py\n"
        "@@ -5,3 +5,3 @@\n"
        " context1\n"
        " context2\n"
        " context3\n"
    )
    analyzer = DiffAnalyzer()
    summary = analyzer.analyze(diff)
    assert summary.total_context == 3
    assert summary.total_added == 0
    assert summary.total_deleted == 0
def test_binary_files_skipped():
    """Binary file diffs have no content lines, only headers."""
    patch = (
        "diff --git a/image.png b/image.png\n"
        "--- a/image.png\n"
        "+++ b/image.png\n"
        "Binary files a/image.png and b/image.png differ\n"
    )
    result = DiffAnalyzer().analyze(patch)

    assert result.files_changed == 1
    assert result.total_added == 0
    assert result.total_deleted == 0
if __name__ == "__main__":
    # Minimal runner so the file can be executed directly, without pytest.
    for case in (
        test_parse_simple_addition,
        test_parse_simple_deletion,
        test_parse_modification,
        test_parse_multiple_files,
        test_parse_rename,
        test_parse_mixed_hunks,
        test_empty_diff,
        test_to_dict,
        test_context_only_hunk,
        test_binary_files_skipped,
    ):
        case()
    print("All 10 tests passed.")