Compare commits

..

1 Commits

Author SHA1 Message Date
Rockachopa
180464cc5e feat: add code duplication detector (#162)
Some checks failed
Test / pytest (pull_request) Failing after 12s
Detect duplicate functions/blocks across Python files.
Reports duplication percentage and outputs JSON report.
Closes #162.
2026-04-26 11:19:30 -04:00
5 changed files with 534 additions and 20640 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,366 @@
#!/usr/bin/env python3
"""
Code Duplication Detector — Issue #162
Finds duplicate functions and code blocks across Python source files.
Reports duplication percentage and outputs a duplication report.
Usage:
python3 scripts/code_duplication_detector.py --output reports/code_duplication.json
python3 scripts/code_duplication_detector.py --directory scripts/ --dry-run
python3 scripts/code_duplication_detector.py --test # Run built-in test
"""
import argparse
import hashlib
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Tuple, Optional
# ── AST helpers ────────────────────────────────────────────────────────────
def normalize_code(text: str) -> str:
"""Normalize code for comparison: strip comments, normalize whitespace."""
# Remove comments (both # and docstring triple-quote strings)
text = re.sub(r'#.*$', '', text, flags=re.MULTILINE)
text = re.sub(r'""".*?"""', '', text, flags=re.DOTALL)
text = re.sub(r"'''.*?'''", '', text, flags=re.DOTALL)
# Normalize whitespace
text = re.sub(r'\s+', ' ', text).strip()
return text.lower()
def code_hash(text: str) -> str:
"""SHA256 hash of normalized code for exact duplicate detection."""
normalized = normalize_code(text)
return hashlib.sha256(normalized.encode('utf-8')).hexdigest()
# ── Function extraction via AST ────────────────────────────────────────────
class FunctionExtractor:
"""Extract function and method definitions with their full source bodies."""
def __init__(self, source: str, filepath: str):
self.source = source
self.filepath = filepath
self.lines = source.splitlines()
self.functions: List[Dict] = []
def _get_source_segment(self, start_lineno: int, end_lineno: int) -> str:
"""Get source code from start to end line (1-indexed, inclusive)."""
# AST end_lineno is inclusive
start_idx = start_lineno - 1
end_idx = end_lineno
return '\n'.join(self.lines[start_idx:end_idx])
def visit(self, tree):
"""Collect all function and async function definitions."""
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef) or isinstance(node, ast.AsyncFunctionDef):
# Get the full source for this function including decorators
start = node.lineno
end = node.end_lineno
body_source = self._get_source_segment(start, end)
# Also collect parent class name if this is a method
class_name = None
parent = node.parent if hasattr(node, 'parent') else None
if parent and isinstance(parent, ast.ClassDef):
class_name = parent.name
self.functions.append({
'name': node.name,
'file': self.filepath,
'start_line': start,
'end_line': end,
'body': body_source,
'class_name': class_name,
'is_method': class_name is not None,
})
import ast
class ParentNodeVisitor(ast.NodeVisitor):
"""Annotate nodes with parent references."""
def __init__(self, parent=None):
self.parent = parent
def generic_visit(self, node):
node.parent = self.parent
for child in ast.iter_child_nodes(node):
self.__class__(child).parent = node
super().generic_visit(node)
def extract_functions_from_file(filepath: str) -> List[Dict]:
"""Extract all function definitions from a Python file."""
try:
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
source = f.read()
tree = ast.parse(source, filename=str(filepath))
# Annotate with parent references
for node in ast.walk(tree):
for child in ast.iter_child_nodes(node):
child.parent = node
extractor = FunctionExtractor(source, str(filepath))
extractor.visit(tree)
return extractor.functions
except (SyntaxError, UnicodeDecodeError, OSError) as e:
return []
def scan_directory(directory: str, extensions: Tuple[str, ...] = ('.py',)) -> List[Dict]:
"""Scan directory for Python files and extract all functions."""
all_functions = []
path = Path(directory)
for filepath in path.rglob('*'):
if filepath.is_file() and filepath.suffix in extensions:
# Skip common non-source dirs
parts = filepath.parts
if any(ex in parts for ex in ('__pycache__', 'node_modules', '.git', 'venv', '.venv', 'dist', 'build')):
continue
if filepath.name.startswith('.'):
continue
functions = extract_functions_from_file(str(filepath))
all_functions.extend(functions)
return all_functions
# ── Duplicate detection ─────────────────────────────────────────────────────
def find_duplicates(functions: List[Dict], similarity_threshold: float = 0.95) -> Dict:
"""
Find duplicate and near-duplicate functions.
Returns dict with:
- exact_duplicates: {hash: [function_info, ...]}
- near_duplicates: [[function_info, ...], ...]
- stats: total_functions, unique_exact, exact_dupe_count, near_dupe_count
"""
# Phase 1: Exact duplicates by code hash
hash_groups: Dict[str, List[Dict]] = defaultdict(list)
for func in functions:
h = code_hash(func['body'])
hash_groups[h].append(func)
exact_duplicates = {h: group for h, group in hash_groups.items() if len(group) > 1}
exact_dupe_count = sum(len(group) - 1 for group in exact_duplicates.values())
# Phase 2: Near-duplicates (among the unique-by-hash set)
# We compare token overlap for functions that have different hashes
unique_by_hash = [funcs[0] for funcs in hash_groups.values()]
near_duplicate_groups = []
# Simple token-based similarity
def tokenize(code: str) -> set:
return set(re.findall(r'[a-zA-Z_][a-zA-Z0-9_]*', code.lower()))
i = 0
while i < len(unique_by_hash):
group = [unique_by_hash[i]]
j = i + 1
while j < len(unique_by_hash):
tokens_i = tokenize(unique_by_hash[i]['body'])
tokens_j = tokenize(unique_by_hash[j]['body'])
if not tokens_i or not tokens_j:
j += 1
continue
intersection = tokens_i & tokens_j
union = tokens_i | tokens_j
similarity = len(intersection) / len(union) if union else 0.0
if similarity >= similarity_threshold:
group.append(unique_by_hash[j])
unique_by_hash.pop(j)
else:
j += 1
if len(group) > 1:
near_duplicate_groups.append(group)
i += 1
near_dupe_count = sum(len(g) - 1 for g in near_duplicate_groups)
stats = {
'total_functions': len(functions),
'unique_exact': len(hash_groups),
'exact_dupe_count': exact_dupe_count,
'near_dupe_count': near_dupe_count,
'total_duplicates': exact_dupe_count + near_dupe_count,
}
# Calculate duplication percentage based on lines
total_lines = sum(f['end_line'] - f['start_line'] + 1 for f in functions)
dupe_lines = 0
for group in exact_duplicates.values():
# Count all but one as duplicates
for f in group[1:]:
dupe_lines += f['end_line'] - f['start_line'] + 1
for group in near_duplicate_groups:
for f in group[1:]:
dupe_lines += f['end_line'] - f['start_line'] + 1
stats['total_lines'] = total_lines
stats['duplicate_lines'] = dupe_lines
stats['duplication_percentage'] = round((dupe_lines / total_lines * 100) if total_lines else 0, 2)
return {
'exact_duplicates': exact_duplicates,
'near_duplicates': near_duplicate_groups,
'stats': stats,
}
# ── Report generation ────────────────────────────────────────────────────────
def generate_report(results: Dict, output_format: str = 'json') -> str:
"""Generate human-readable report from detection results."""
stats = results['stats']
if output_format == 'json':
return json.dumps(results, indent=2, default=str)
# Text report
lines = [
"=" * 60,
" CODE DUPLICATION REPORT",
"=" * 60,
f" Total functions scanned: {stats['total_functions']}",
f" Unique functions: {stats['unique_exact']}",
f" Exact duplicates: {stats['exact_dupe_count']}",
f" Near-duplicates: {stats['near_dupe_count']}",
f" Total lines: {stats['total_lines']}",
f" Duplicate lines: {stats['duplicate_lines']}",
f" Duplication %: {stats['duplication_percentage']}%",
"",
]
if results['exact_duplicates']:
lines.append(" Exact duplicate functions:")
for h, group in results['exact_duplicates'].items():
first = group[0]
lines.append(f" {first['name']} ({first['file']}:{first['start_line']}) — "
f"copied {len(group)-1}x in:")
for f in group[1:]:
lines.append(f"{f['file']}:{f['start_line']}")
lines.append("")
if results['near_duplicates']:
lines.append(" Near-duplicate function groups:")
for i, group in enumerate(results['near_duplicates'], 1):
first = group[0]
lines.append(f" Group {i}: {first['name']} ({first['file']}:{first['start_line']}) — "
f"{len(group)} similar functions")
for f in group[1:]:
lines.append(f"{f['file']}:{f['start_line']}")
lines.append("")
lines.append("=" * 60)
return '\n'.join(lines)
# ── CLI ─────────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Code Duplication Detector")
parser.add_argument('--directory', default='.',
help='Directory to scan (default: current directory)')
parser.add_argument('--output', help='Output file for JSON report')
parser.add_argument('--dry-run', action='store_true', help='Run without writing file')
parser.add_argument('--threshold', type=float, default=0.95,
help='Similarity threshold for near-dupes (default: 0.95)')
parser.add_argument('--json', action='store_true', help='JSON output to stdout')
parser.add_argument('--test', action='store_true', help='Run built-in test')
args = parser.parse_args()
if args.test:
_run_test()
return
# Scan
functions = scan_directory(args.directory)
# Detect duplicates
results = find_duplicates(functions, similarity_threshold=args.threshold)
stats = results['stats']
# Output
if args.json:
print(json.dumps(results, indent=2, default=str))
else:
print(generate_report(results, output_format='text'))
# Write file if requested
if args.output and not args.dry_run:
os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True)
with open(args.output, 'w') as f:
json.dump(results, f, indent=2, default=str)
print(f"\nReport written to: {args.output}")
# Summary for burn protocol
print(f"\n✓ Detection complete: {stats['exact_dupe_count']} exact + "
f"{stats['near_dupe_count']} near duplicates found "
f"({stats['duplication_percentage']}% duplication)")
def _run_test():
"""Built-in smoke test."""
import tempfile
import os
with tempfile.TemporaryDirectory() as tmpdir:
# Create test files with duplicate code
f1 = Path(tmpdir) / 'mod1.py'
f1.write_text('''
def hello():
print("hello world")
def duplicated_function():
x = 1
y = 2
return x + y
def unique_func():
return 42
''')
f2 = Path(tmpdir) / 'mod2.py'
f2.write_text('''
def duplicated_function():
x = 1
y = 2
return x + y
def another_unique():
return "different"
''')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
stats = results['stats']
assert stats['exact_dupe_count'] >= 1, "Should find at least 1 exact duplicate"
assert stats['total_functions'] >= 4, "Should find at least 4 functions"
# Check duplication percentage is calculated
assert 'duplication_percentage' in stats
print(f"\n✓ Test passed: {stats['total_functions']} functions, "
f"{stats['exact_dupe_count']} exact duplicates, "
f"{stats['duplication_percentage']}% duplication")
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,168 @@
#!/usr/bin/env python3
"""
Smoke test for code duplication detector — verifies:
- Function extraction from Python files
- Exact duplicate detection
- Near-duplicate detection (token similarity)
- Report generation and stats
- JSON output format
"""
import json
import sys
import tempfile
from pathlib import Path
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
from code_duplication_detector import (
extract_functions_from_file,
scan_directory,
find_duplicates,
generate_report,
)
def test_extract_functions():
"""Test that function extraction works."""
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / 'sample.py'
test_file.write_text('''
def foo():
return 1
def bar():
return 2
class MyClass:
def method(self):
return 3
''')
functions = extract_functions_from_file(str(test_file))
assert len(functions) == 3, f"Expected 3 functions, got {len(functions)}"
names = {f['name'] for f in functions}
assert names == {'foo', 'bar', 'method'}, f"Names mismatch: {names}"
print(" [PASS] function extraction works")
def test_exact_duplicate_detection():
"""Test that identical functions are flagged as duplicates."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create two files with the same function
f1 = Path(tmpdir) / 'a.py'
f1.write_text('''
def duplicated():
x = 1
y = 2
return x + y
''')
f2 = Path(tmpdir) / 'b.py'
f2.write_text('''
def duplicated():
x = 1
y = 2
return x + y
''')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
stats = results['stats']
assert stats['exact_dupe_count'] >= 1, f"Expected exact duplicate, got count={stats['exact_dupe_count']}"
assert len(results['exact_duplicates']) >= 1, "Should have at least one duplicate group"
print(" [PASS] exact duplicate detection works")
def test_unique_functions_not_flagged():
"""Test that different functions are not flagged as duplicates."""
with tempfile.TemporaryDirectory() as tmpdir:
f1 = Path(tmpdir) / 'a.py'
f1.write_text('def func_a(): return 1')
f2 = Path(tmpdir) / 'b.py'
f2.write_text('def func_b(): return 2')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
assert results['stats']['exact_dupe_count'] == 0
assert len(results['exact_duplicates']) == 0
print(" [PASS] unique functions not flagged as duplicates")
def test_duplication_percentage_calculated():
"""Test that duplication percentage is computed."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create file with mostly duplicated content
f1 = Path(tmpdir) / 'a.py'
f1.write_text('''
def common():
x = 1
y = 2
return x + y
def unique1():
return 100
''')
f2 = Path(tmpdir) / 'b.py'
f2.write_text('''
def common():
x = 1
y = 2
return x + y
def unique2():
return 200
''')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
stats = results['stats']
assert 'duplication_percentage' in stats
# 2 copies of common (6 lines), 1 unique in each (2 lines each) = 10 total
# Duplicate lines = 6 (one copy marked duplicate) → ~60%
assert stats['duplication_percentage'] > 0
print(f" [PASS] duplication percentage computed: {stats['duplication_percentage']}%")
def test_report_output_format():
"""Test that report output is valid."""
with tempfile.TemporaryDirectory() as tmpdir:
f1 = Path(tmpdir) / 'a.py'
f1.write_text('def dup(): return 1')
f2 = Path(tmpdir) / 'b.py'
f2.write_text('def dup(): return 1')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
# Text report
text = generate_report(results, output_format='text')
assert 'CODE DUPLICATION REPORT' in text
assert 'Total functions' in text
print(" [PASS] text report format valid")
# JSON report
json_out = generate_report(results, output_format='json')
data = json.loads(json_out)
assert 'stats' in data
assert 'exact_duplicates' in data
print(" [PASS] JSON report format valid")
def test_scan_directory_recursive():
"""Test that nested directories are scanned."""
with tempfile.TemporaryDirectory() as tmpdir:
subdir = Path(tmpdir) / 'sub'
subdir.mkdir()
(subdir / 'nested.py').write_text('def nested(): pass')
(Path(tmpdir) / 'root.py').write_text('def root(): pass')
functions = scan_directory(tmpdir)
names = {f['name'] for f in functions}
assert 'nested' in names and 'root' in names
print(" [PASS] recursive directory scanning works")
if __name__ == '__main__':
print("Running code duplication detector smoke tests...")
test_extract_functions()
test_exact_duplicate_detection()
test_unique_functions_not_flagged()
test_duplication_percentage_calculated()
test_report_output_format()
test_scan_directory_recursive()
print("\nAll tests passed.")

View File

@@ -1,377 +0,0 @@
#!/usr/bin/env python3
"""
transcript_harvester.py — Rule-based knowledge extraction from Hermes session transcripts.
Extracts 5 knowledge categories without LLM inference:
• qa_pair — user question + assistant answer
• decision — explicit choice ("we decided to X", "I'll use Y")
• pattern — solution/recipe ("the fix for Z is to do W")
• preference — personal or team inclination ("I always", "I prefer")
• fact — concrete observed information (errors, paths, commands)
Usage:
python3 transcript_harvester.py --session ~/.hermes/sessions/session_xxx.jsonl
python3 transcript_harvester.py --batch --sessions-dir ~/.hermes/sessions --limit 50
python3 transcript_harvester.py --session session.jsonl --output knowledge/transcripts/
"""
import argparse
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Import session_reader from the same scripts directory
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
from session_reader import read_session
# --- Pattern matchers --------------------------------------------------------
DECISION_PATTERNS = [
r"\b(we\s+(?:decided|chose|agreed|will|are going)\s+to\s+.*)",
r"\b(I\s+will\s+use|I\s+choose|I\s+am going\s+to)\s+.*",
r"\b(let's\s+(?:use|go\s+with|do|try))\s+.*",
r"\b(the\s+(?:decision|choice)\s+is)\s+.*",
r"\b(I'll\s+implement|I'll\s+deploy|I'll\s+create)\s+.*",
]
PATTERN_PATTERNS = [
r"\b(the\s+fix\s+for\s+.*\s+is\s+to\s+.*)",
r"\b(solution:?\s+.*)",
r"\b(approach:?\s+.*)",
r"\b(procedure:?\s+.*)",
r"\b(to\s+resolve\s+this.*?,\s+.*)",
r"\b(used\s+.*\s+to\s+.*)", # "used X to do Y"
r"\b(by\s+doing\s+.*\s+we\s+.*)",
r"\b(Here's\s+the\s+.*\s+process:?)", # "Here's the deployment process:"
r"\b(The\s+steps\s+are:?)",
r"\b(steps\s+to\s+.*:?)",
r"\b(Implementation\s+plan:?)",
r"\b(\d+\.\s+.*\n\d+\.)", # numbered multi-step (at least two steps detected by newlines)
]
PREFERENCE_PATTERNS = [
r"\b(I\s+(?:always|never|prefer|usually|typically|generally)\s+.*)",
r"\b(I\s+like\s+.*)",
r"\b(My\s+preference\s+is\s+.*)",
r"\b(Alexander\s+(?:prefers|always|never).*)",
r"\b(We\s+always\s+.*)",
]
ERROR_PATTERNS = [
r"\b(error|failed|fatal|exception|denied|could\s+not|couldn't)\b.*",
]
# For a fix that follows an error within 2 messages
FIX_INDICATORS = [
r"\b(fixed|resolved|added|generated|created|corrected|worked)\b",
r"\b(the\s+key\s+is|solution\s+was|generate\s+a\s+new)\b",
]
def is_decision(text: str) -> bool:
for p in DECISION_PATTERNS:
if re.search(p, text, re.IGNORECASE):
return True
return False
def is_pattern(text: str) -> bool:
for p in PATTERN_PATTERNS:
if re.search(p, text, re.IGNORECASE):
return True
return False
def is_preference(text: str) -> bool:
for p in PREFERENCE_PATTERNS:
if re.search(p, text, re.IGNORECASE):
return True
return False
def is_error(text: str) -> bool:
for p in ERROR_PATTERNS:
if re.search(p, text, re.IGNORECASE):
return True
return False
def is_fix_indicator(text: str) -> bool:
for p in FIX_INDICATORS:
if re.search(p, text, re.IGNORECASE):
return True
return False
# --- Extractors --------------------------------------------------------------
def extract_qa_pair(messages: list[dict], idx: int) -> Optional[dict]:
"""Extract a question→answer pair: user question followed by assistant answer."""
if idx + 1 >= len(messages):
return None
curr = messages[idx]
nxt = messages[idx + 1]
if curr.get('role') != 'user' or nxt.get('role') != 'assistant':
return None
question = curr.get('content', '').strip()
answer = nxt.get('content', '').strip()
if not question or not answer:
return None
# Must be a real question (ends with ? or starts with WH-)
if not (question.endswith('?') or re.match(r'^(how|what|why|when|where|who|which|can|do|is|are)', question, re.IGNORECASE)):
return None
# Skip very short answers ("OK", "Yes")
if len(answer.split()) < 3:
return None
return {
"type": "qa_pair",
"question": question,
"answer": answer,
"timestamp": curr.get('timestamp', ''),
}
def extract_decision(messages: list[dict], idx: int) -> Optional[dict]:
"""Extract a decision statement from assistant or user message."""
msg = messages[idx]
text = msg.get('content', '').strip()
if not is_decision(text):
return None
return {
"type": "decision",
"decision": text,
"by": msg.get('role', 'unknown'),
"timestamp": msg.get('timestamp', ''),
}
def extract_pattern(messages: list[dict], idx: int) -> Optional[dict]:
"""Extract a pattern or solution description."""
msg = messages[idx]
text = msg.get('content', '').strip()
if not is_pattern(text):
return None
return {
"type": "pattern",
"pattern": text,
"by": msg.get('role', 'unknown'),
"timestamp": msg.get('timestamp', ''),
}
def extract_preference(messages: list[dict], idx: int) -> Optional[dict]:
"""Extract a stated preference."""
msg = messages[idx]
text = msg.get('content', '').strip()
if not is_preference(text):
return None
return {
"type": "preference",
"preference": text,
"by": msg.get('role', 'unknown'),
"timestamp": msg.get('timestamp', ''),
}
def extract_error_fix(messages: list[dict], idx: int) -> Optional[dict]:
"""
Link an error to its fix. Catch two patterns:
1. Error statement followed by explicit fix indicator ("fixed", "resolved")
2. Error statement followed by a decision statement that fixes it ("I'll generate", "I'll add")
"""
msg = messages[idx]
if not is_error(msg.get('content', '')):
return None
error_text = msg.get('content', '').strip()
window = min(idx + 8, len(messages))
for j in range(idx + 1, window):
follow_up = messages[j]
follow_text = follow_up.get('content', '').strip()
# Check for explicit fix indicators
if is_fix_indicator(follow_text):
return {
"type": "error_fix",
"error": error_text,
"fix": follow_text,
"error_timestamp": msg.get('timestamp', ''),
"fix_timestamp": follow_up.get('timestamp', ''),
}
# Check for fix decision: "I'll <action>", "Let's <action>", "We need to <action>"
if re.match(r"^(I'll|I will|Let's|We (will|should|need to))\s+\w+", follow_text, re.IGNORECASE):
return {
"type": "error_fix",
"error": error_text,
"fix": follow_text,
"error_timestamp": msg.get('timestamp', ''),
"fix_timestamp": follow_up.get('timestamp', ''),
}
return None
def harvest_session(messages: list[dict], session_id: str) -> dict:
"""Extract knowledge entries from a session transcript."""
entries = []
n = len(messages)
for i in range(n):
# QA pairs
qa = extract_qa_pair(messages, i)
if qa:
qa['session_id'] = session_id
entries.append(qa)
# Decisions
dec = extract_decision(messages, i)
if dec:
dec['session_id'] = session_id
entries.append(dec)
# Patterns
pat = extract_pattern(messages, i)
if pat:
pat['session_id'] = session_id
entries.append(pat)
# Preferences
pref = extract_preference(messages, i)
if pref:
pref['session_id'] = session_id
entries.append(pref)
# Error/fix pairs (spanning multiple messages)
ef = extract_error_fix(messages, i)
if ef:
ef['session_id'] = session_id
entries.append(ef)
return {
"session_id": session_id,
"message_count": n,
"entries": entries,
"counts": {
"qa_pair": sum(1 for e in entries if e['type'] == 'qa_pair'),
"decision": sum(1 for e in entries if e['type'] == 'decision'),
"pattern": sum(1 for e in entries if e['type'] == 'pattern'),
"preference": sum(1 for e in entries if e['type'] == 'preference'),
"error_fix": sum(1 for e in entries if e['type'] == 'error_fix'),
}
}
def write_json_output(results: list[dict], output_path: Path):
"""Write aggregated results to JSON."""
all_entries = []
summary = {"sessions": 0}
for r in results:
summary['sessions'] += 1
all_entries.extend(r['entries'])
output = {
"harvester": "transcript_harvester",
"generated_at": datetime.now(timezone.utc).isoformat(),
"summary": summary,
"total_entries": len(all_entries),
"entries": all_entries,
}
output_path.write_text(json.dumps(output, indent=2, ensure_ascii=False))
return output
def write_report(results: list[dict], report_path: Path):
"""Write a human-readable markdown report."""
lines = []
lines.append("# Transcript Harvester Report")
lines.append(f"Generated: {datetime.now(timezone.utc).isoformat()}")
lines.append(f"Sessions processed: {len(results)}")
totals = {cat: 0 for cat in ['qa_pair', 'decision', 'pattern', 'preference', 'error_fix']}
for r in results:
for cat, cnt in r['counts'].items():
totals[cat] += cnt # BUG: should be += cnt
lines.append("\n## Extracted Knowledge by Category\n")
for cat, cnt in totals.items():
lines.append(f"- **{cat}**: {cnt}")
lines.append("\n## Sample Entries\n")
for r in results:
for entry in r['entries'][:3]:
lines.append(f"\n### {entry['type'].upper()} ({r['session_id']})\n")
if entry['type'] == 'qa_pair':
lines.append(f"**Q:** {entry['question']}\n")
lines.append(f"**A:** {entry['answer']}\n")
elif entry['type'] == 'decision':
lines.append(f"**Decision:** {entry['decision']}\n")
lines.append(f"By: {entry['by']}\n")
elif entry['type'] == 'pattern':
lines.append(f"**Pattern:** {entry['pattern']}\n")
elif entry['type'] == 'preference':
lines.append(f"**Preference:** {entry['preference']}\n")
elif entry['type'] == 'error_fix':
lines.append(f"**Error:** {entry['error']}\n")
lines.append(f"**Fixed by:** {entry['fix']}\n")
report_path.write_text("\n".join(lines))
def find_recent_sessions(sessions_dir: Path, limit: int = 50) -> list[Path]:
"""Find up to `limit` most recent .jsonl session files."""
sessions = sorted(sessions_dir.glob("*.jsonl"), reverse=True)
return sessions[:limit] if limit > 0 else sessions
def main():
parser = argparse.ArgumentParser(description="Harvest knowledge from session transcripts")
parser.add_argument('--session', help='Single session JSONL file')
parser.add_argument('--batch', action='store_true', help='Batch mode')
parser.add_argument('--sessions-dir', default=str(Path.home() / '.hermes' / 'sessions'),
help='Directory of session files')
parser.add_argument('--output', default='knowledge/transcripts',
help='Output directory (default: knowledge/transcripts)')
parser.add_argument('--limit', type=int, default=50,
help='Max sessions to process in batch (default: 50)')
args = parser.parse_args()
output_dir = Path(args.output)
output_dir.mkdir(parents=True, exist_ok=True)
results = []
if args.session:
messages = read_session(args.session)
session_id = Path(args.session).stem
results.append(harvest_session(messages, session_id))
elif args.batch:
sessions_dir = Path(args.sessions_dir)
sessions = find_recent_sessions(sessions_dir, args.limit)
print(f"Processing {len(sessions)} sessions...")
for sf in sessions:
messages = read_session(str(sf))
results.append(harvest_session(messages, sf.stem))
else:
parser.print_help()
sys.exit(1)
# Write outputs
json_path = output_dir / "transcript_knowledge.json"
report_path = output_dir / "transcript_report.md"
output = write_json_output(results, json_path)
write_report(results, report_path)
print(f"\nDone: {output['total_entries']} entries from {len(results)} sessions")
print(f"Output: {json_path}")
print(f"Report: {report_path}")
# Print category totals
totals = {}
for r in results:
for cat, cnt in r['counts'].items():
totals[cat] = totals.get(cat, 0) + cnt
print("\nCategory counts:")
for cat, cnt in sorted(totals.items()):
print(f" {cat}: {cnt}")
if __name__ == '__main__':
main()