Compare commits

..

1 Commits

Author SHA1 Message Date
Step35
86eb1c9a50 feat: training data pipeline — knowledge entries → JSONL training pairs
Some checks failed
Test / pytest (pull_request) Failing after 7s
Add scripts/knowledge_to_training_pairs.py which reads quality-gated
knowledge entries from knowledge/index.json and emits terse→rich
training pairs in JSONL format.

Features:
- Derives terse queries from facts via category-aware heuristics
- Configurable quality filters: min-confidence, model-filter, date range
- Output includes domain, source_confidence, source_model
- Smoke tests added in tests/test_knowledge_to_training_pairs.py

Deliverables for #199:
1. Pipeline script: scripts/knowledge_to_training_pairs.py
2. End-to-end: knowledge/index.json → training_pairs.jsonl (or custom JSONL)
3. Config: min-confidence, model-filter, after/before date filters
4. Test: 9 smoke tests covering conversion, filtering, and end-to-end run

Closes #199
2026-04-26 13:03:06 -04:00
4 changed files with 429 additions and 534 deletions

View File

@@ -1,366 +0,0 @@
#!/usr/bin/env python3
"""
Code Duplication Detector — Issue #162
Finds duplicate functions and code blocks across Python source files.
Reports duplication percentage and outputs a duplication report.
Usage:
python3 scripts/code_duplication_detector.py --output reports/code_duplication.json
python3 scripts/code_duplication_detector.py --directory scripts/ --dry-run
python3 scripts/code_duplication_detector.py --test # Run built-in test
"""
import argparse
import hashlib
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Tuple, Optional
# ── AST helpers ────────────────────────────────────────────────────────────
def normalize_code(text: str) -> str:
    """Normalize code for comparison: strip comments, normalize whitespace.

    Triple-quoted strings and ``#`` comments are removed with regular
    expressions (no real parsing), whitespace runs are collapsed to a single
    space, and the result is lower-cased.

    Args:
        text: Raw source text of a function or code block.

    Returns:
        The normalized, lower-cased, single-line form of ``text``.
    """
    # Strip triple-quoted strings BEFORE '#' comments. Doing it the other way
    # round truncates a one-line docstring such as """Fix issue #1.""" at the
    # '#', which removes its closing quotes and makes the docstring pattern
    # swallow real code up to the next triple quote.
    text = re.sub(r'""".*?"""', '', text, flags=re.DOTALL)
    text = re.sub(r"'''.*?'''", '', text, flags=re.DOTALL)
    # Remove '#' comments through end of line.
    text = re.sub(r'#.*$', '', text, flags=re.MULTILINE)
    # Collapse all whitespace so formatting differences do not matter.
    text = re.sub(r'\s+', ' ', text).strip()
    return text.lower()
def code_hash(text: str) -> str:
    """Return the hex SHA-256 digest of the normalized form of ``text``.

    Two snippets that normalize to the same string (see ``normalize_code``)
    produce the same digest, which is how exact duplicates are detected.
    """
    digest = hashlib.sha256()
    digest.update(normalize_code(text).encode('utf-8'))
    return digest.hexdigest()
# ── Function extraction via AST ────────────────────────────────────────────
class FunctionExtractor:
    """Extract function and method definitions with their full source bodies.

    After ``visit(tree)`` runs, ``self.functions`` holds one dict per
    ``def`` / ``async def`` found anywhere in the tree (including nested
    functions), with keys ``name``, ``file``, ``start_line``, ``end_line``,
    ``body``, ``class_name`` and ``is_method``.
    """

    def __init__(self, source: str, filepath: str):
        """Store the source text and the path it is reported under."""
        self.source = source
        self.filepath = filepath
        self.lines = source.splitlines()
        self.functions: List[Dict] = []

    def _get_source_segment(self, start_lineno: int, end_lineno: int) -> str:
        """Get source code from start to end line (1-indexed, inclusive)."""
        # Line numbers are 1-indexed and inclusive; slices are 0-indexed and
        # end-exclusive, so only the start needs adjusting.
        return '\n'.join(self.lines[start_lineno - 1:end_lineno])

    def visit(self, tree):
        """Collect all function and async function definitions in ``tree``.

        ``class_name`` is filled in only when the node carries a ``parent``
        attribute that is an ``ast.ClassDef``; the caller must annotate the
        tree with parent references beforehand.
        """
        for node in ast.walk(tree):
            if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                continue
            # node.lineno is the line of the `def` keyword, so decorator
            # lines above the definition are not part of the captured body.
            start, end = node.lineno, node.end_lineno
            parent = getattr(node, 'parent', None)
            class_name = parent.name if isinstance(parent, ast.ClassDef) else None
            self.functions.append({
                'name': node.name,
                'file': self.filepath,
                'start_line': start,
                'end_line': end,
                'body': self._get_source_segment(start, end),
                'class_name': class_name,
                'is_method': class_name is not None,
            })
import ast
class ParentNodeVisitor(ast.NodeVisitor):
    """Annotate nodes with parent references.

    After ``ParentNodeVisitor(parent).visit(tree)``, every node reachable
    from ``tree`` has a ``parent`` attribute pointing at its direct AST
    parent. The root receives the ``parent`` passed to the constructor
    (``None`` by default) unless it already has a ``parent`` attribute.
    """

    def __init__(self, parent=None):
        """Remember the parent to assign to the root node being visited."""
        self.parent = parent

    def generic_visit(self, node):
        """Set ``child.parent`` for each direct child, then recurse."""
        # Only the root lacks a parent at this point: every other node was
        # annotated by its own parent before the recursion reached it, so it
        # must not be overwritten with the constructor default.
        if not hasattr(node, 'parent'):
            node.parent = self.parent
        for child in ast.iter_child_nodes(node):
            child.parent = node
        super().generic_visit(node)
def extract_functions_from_file(filepath: str) -> List[Dict]:
    """Extract all function definitions from a Python file.

    Args:
        filepath: Path of the file to read and parse.

    Returns:
        The list of function-info dicts produced by ``FunctionExtractor``,
        or an empty list when the file cannot be read or parsed. Skipping
        unparseable files is deliberate, so one bad file does not abort a
        directory scan.
    """
    try:
        # errors='replace' keeps undecodable bytes from aborting the read.
        with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
            source = f.read()
        tree = ast.parse(source, filename=str(filepath))
    except (SyntaxError, ValueError, RecursionError, OSError):
        # ValueError: NUL bytes in the source on Python < 3.12 (and it is the
        # base class of UnicodeDecodeError). RecursionError: pathologically
        # nested source.
        return []

    # Annotate with parent references so methods can be told from functions.
    for node in ast.walk(tree):
        for child in ast.iter_child_nodes(node):
            child.parent = node

    extractor = FunctionExtractor(source, str(filepath))
    extractor.visit(tree)
    return extractor.functions
def scan_directory(directory: str, extensions: Tuple[str, ...] = ('.py',)) -> List[Dict]:
    """Scan a directory tree for source files and extract all functions.

    Args:
        directory: Root directory to walk recursively.
        extensions: File suffixes to include (default: ``('.py',)``).

    Returns:
        The concatenated function-info dicts of every scanned file, in
        sorted path order so repeated runs give the same report.
    """
    excluded = ('__pycache__', 'node_modules', '.git', 'venv', '.venv', 'dist', 'build')
    root = Path(directory)
    all_functions: List[Dict] = []
    for filepath in sorted(root.rglob('*')):
        if not filepath.is_file() or filepath.suffix not in extensions:
            continue
        # Test only the path *below* the scan root. Checking the absolute
        # parts would skip everything when the root itself lives under a
        # directory called e.g. "build" or "venv".
        relative_parts = filepath.relative_to(root).parts
        if any(name in relative_parts for name in excluded):
            continue
        # Skip hidden files.
        if filepath.name.startswith('.'):
            continue
        all_functions.extend(extract_functions_from_file(str(filepath)))
    return all_functions
# ── Duplicate detection ─────────────────────────────────────────────────────
def find_duplicates(functions: List[Dict], similarity_threshold: float = 0.95) -> Dict:
    """
    Find duplicate and near-duplicate functions.

    Args:
        functions: Function-info dicts with at least ``body``,
            ``start_line`` and ``end_line`` keys.
        similarity_threshold: Minimum Jaccard similarity of identifier-token
            sets for two functions to count as near-duplicates.

    Returns dict with:
    - exact_duplicates: {hash: [function_info, ...]}
    - near_duplicates: [[function_info, ...], ...]
    - stats: total_functions, unique_exact, exact_dupe_count, near_dupe_count,
      total_duplicates, total_lines, duplicate_lines, duplication_percentage
    """
    def tokenize(code: str) -> set:
        # Set of lower-cased identifier-like tokens.
        return set(re.findall(r'[a-zA-Z_][a-zA-Z0-9_]*', code.lower()))

    def line_count(func: Dict) -> int:
        return func['end_line'] - func['start_line'] + 1

    # Phase 1: exact duplicates, grouped by hash of the normalized body.
    hash_groups: Dict[str, List[Dict]] = defaultdict(list)
    for func in functions:
        hash_groups[code_hash(func['body'])].append(func)
    exact_duplicates = {h: group for h, group in hash_groups.items() if len(group) > 1}
    exact_dupe_count = sum(len(group) - 1 for group in exact_duplicates.values())

    # Phase 2: near-duplicates among one representative per hash group.
    representatives = [group[0] for group in hash_groups.values()]
    # Tokenize each representative once instead of once per comparison.
    token_sets = [tokenize(func['body']) for func in representatives]
    # absorbed[j] marks a function already placed in an earlier group, so it
    # neither joins another group nor anchors its own.
    absorbed = [False] * len(representatives)
    near_duplicate_groups = []
    for i, anchor in enumerate(representatives):
        if absorbed[i]:
            continue
        tokens_i = token_sets[i]
        group = [anchor]
        if tokens_i:
            for j in range(i + 1, len(representatives)):
                tokens_j = token_sets[j]
                if absorbed[j] or not tokens_j:
                    continue
                similarity = len(tokens_i & tokens_j) / len(tokens_i | tokens_j)
                if similarity >= similarity_threshold:
                    group.append(representatives[j])
                    absorbed[j] = True
        if len(group) > 1:
            near_duplicate_groups.append(group)
    near_dupe_count = sum(len(group) - 1 for group in near_duplicate_groups)

    # Line-based duplication percentage: in every group the first member is
    # the "original" and the remaining members count as duplicated lines.
    total_lines = sum(line_count(func) for func in functions)
    dupe_lines = sum(
        line_count(func)
        for groups in (exact_duplicates.values(), near_duplicate_groups)
        for group in groups
        for func in group[1:]
    )

    stats = {
        'total_functions': len(functions),
        'unique_exact': len(hash_groups),
        'exact_dupe_count': exact_dupe_count,
        'near_dupe_count': near_dupe_count,
        'total_duplicates': exact_dupe_count + near_dupe_count,
        'total_lines': total_lines,
        'duplicate_lines': dupe_lines,
        'duplication_percentage': round((dupe_lines / total_lines * 100) if total_lines else 0, 2),
    }
    return {
        'exact_duplicates': exact_duplicates,
        'near_duplicates': near_duplicate_groups,
        'stats': stats,
    }
# ── Report generation ────────────────────────────────────────────────────────
def generate_report(results: Dict, output_format: str = 'json') -> str:
    """Render detection results as JSON (default) or as a plain-text report.

    Args:
        results: Dict with ``stats``, ``exact_duplicates`` and
            ``near_duplicates`` keys, as returned by ``find_duplicates``.
        output_format: ``'json'`` for a JSON dump; any other value produces
            the text report.

    Returns:
        The report as a single string.
    """
    summary = results['stats']
    if output_format == 'json':
        return json.dumps(results, indent=2, default=str)

    rule = "=" * 60

    def location(func):
        # "<file>:<start line>" reference used throughout the text report.
        return f"{func['file']}:{func['start_line']}"

    report = [rule, " CODE DUPLICATION REPORT", rule]
    report += [
        f" Total functions scanned: {summary['total_functions']}",
        f" Unique functions: {summary['unique_exact']}",
        f" Exact duplicates: {summary['exact_dupe_count']}",
        f" Near-duplicates: {summary['near_dupe_count']}",
        f" Total lines: {summary['total_lines']}",
        f" Duplicate lines: {summary['duplicate_lines']}",
        f" Duplication %: {summary['duplication_percentage']}%",
        "",
    ]

    exact = results['exact_duplicates']
    if exact:
        report.append(" Exact duplicate functions:")
        for members in exact.values():
            head, copies = members[0], members[1:]
            report.append(f" {head['name']} ({location(head)}) — copied {len(members)-1}x in:")
            report.extend(location(copy) for copy in copies)
        report.append("")

    near = results['near_duplicates']
    if near:
        report.append(" Near-duplicate function groups:")
        for number, members in enumerate(near, 1):
            head, others = members[0], members[1:]
            report.append(f" Group {number}: {head['name']} ({location(head)}) — {len(members)} similar functions")
            report.extend(location(other) for other in others)
        report.append("")

    report.append(rule)
    return '\n'.join(report)
# ── CLI ─────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point: scan, detect duplicates, print and save.

    Prints either the text report or (with ``--json``) the raw results to
    stdout, optionally writes the JSON results to ``--output`` (skipped with
    ``--dry-run``), and finishes with a one-line summary.
    """
    parser = argparse.ArgumentParser(description="Code Duplication Detector")
    parser.add_argument('--directory', default='.',
                        help='Directory to scan (default: current directory)')
    parser.add_argument('--output', help='Output file for JSON report')
    parser.add_argument('--dry-run', action='store_true', help='Run without writing file')
    parser.add_argument('--threshold', type=float, default=0.95,
                        help='Similarity threshold for near-dupes (default: 0.95)')
    parser.add_argument('--json', action='store_true', help='JSON output to stdout')
    parser.add_argument('--test', action='store_true', help='Run built-in test')
    args = parser.parse_args()

    if args.test:
        _run_test()
        return

    # Scan and detect.
    functions = scan_directory(args.directory)
    results = find_duplicates(functions, similarity_threshold=args.threshold)
    stats = results['stats']

    # With --json, stdout must carry nothing but the JSON document so it can
    # be piped into another tool; status lines then go to stderr instead.
    status = sys.stderr if args.json else sys.stdout

    if args.json:
        print(json.dumps(results, indent=2, default=str))
    else:
        print(generate_report(results, output_format='text'))

    # Write file if requested.
    if args.output and not args.dry_run:
        os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True)
        with open(args.output, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, default=str)
        print(f"\nReport written to: {args.output}", file=status)

    # Summary for burn protocol
    print(f"\n✓ Detection complete: {stats['exact_dupe_count']} exact + "
          f"{stats['near_dupe_count']} near duplicates found "
          f"({stats['duplication_percentage']}% duplication)", file=status)
def _run_test():
    """Built-in smoke test.

    Writes two small modules sharing one identical function into a temporary
    directory, runs the scan and detection on it, and raises
    ``AssertionError`` if the basic expectations are not met.
    """
    import tempfile

    def check(condition, message):
        # Explicit check instead of `assert`, so `--test` still verifies
        # something when Python runs with -O.
        if not condition:
            raise AssertionError(message)

    with tempfile.TemporaryDirectory() as tmpdir:
        # Create test files with duplicate code
        f1 = Path(tmpdir) / 'mod1.py'
        f1.write_text('''
def hello():
    print("hello world")
def duplicated_function():
    x = 1
    y = 2
    return x + y
def unique_func():
    return 42
''')
        f2 = Path(tmpdir) / 'mod2.py'
        f2.write_text('''
def duplicated_function():
    x = 1
    y = 2
    return x + y
def another_unique():
    return "different"
''')
        functions = scan_directory(tmpdir)
        results = find_duplicates(functions)
        stats = results['stats']
        check(stats['exact_dupe_count'] >= 1, "Should find at least 1 exact duplicate")
        check(stats['total_functions'] >= 4, "Should find at least 4 functions")
        # Check duplication percentage is calculated
        check('duplication_percentage' in stats, "duplication_percentage missing from stats")
        print(f"\n✓ Test passed: {stats['total_functions']} functions, "
              f"{stats['exact_dupe_count']} exact duplicates, "
              f"{stats['duplication_percentage']}% duplication")
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,255 @@
#!/usr/bin/env python3
"""
knowledge_to_training_pairs.py — Convert quality-gated knowledge entries into training pairs.
Reads knowledge/index.json (or a custom JSONL of entries), applies quality filters,
and emits terse→rich training pairs in JSONL format for model fine-tuning.
Usage:
python3 scripts/knowledge_to_training_pairs.py \
--input knowledge/index.json \
--output training_pairs.jsonl \
--min-confidence 0.7 \
--model-filter claude-sonnet,gpt-4 \
--after 2026-01-01
Input entry format (from index.json facts):
{
"id": "hermes-agent:pitfall:001",
"fact": "deploy-crons.py leaves jobs in mixed model format",
"category": "pitfall",
"domain": "hermes-agent",
"confidence": 0.95,
...
}
Output training pair format:
{
"terse": "How do I handle deploy-crons.py leaves jobs in mixed model format?",
"rich": "deploy-crons.py leaves jobs in mixed model format",
"domain": "hermes-agent",
"source_confidence": 0.95,
"source_model": "unknown"
}
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
def fact_to_terse(fact: str, category: str, domain: str,
                  max_excerpt_len: int = 80) -> str:
    """
    Derive a short user query from a knowledge fact.

    Strategy:
    - Pitfalls → "How do I avoid/handle/fix <fact excerpt>?"
    - Patterns → "What's the recommended way to <fact excerpt>?"
    - Tool quirks → "How does <tool> behave in this context?"
    - Questions → "What is the answer to: <fact excerpt>?"
    - Facts / unknown categories → "What should I know about <fact excerpt>?"

    Args:
        fact: The knowledge fact text.
        category: Entry category ("pitfall", "pattern", "tool-quirk",
            "question"; anything else is treated as a plain fact).
        domain: Entry domain; used as the tool name for tool quirks when the
            fact has no words.
        max_excerpt_len: Maximum number of characters of the fact quoted in
            the query (default 80).

    Returns:
        A one-line question ending in "?".
    """
    fact_lower = fact.lower()

    # Excerpt = first sentence, capped at max_excerpt_len. The cap applies to
    # the first sentence as well, otherwise one long sentence would produce
    # an arbitrarily long "terse" query.
    first_sentence = fact.split('. ')[0]
    if len(first_sentence) > max_excerpt_len:
        excerpt = first_sentence[:max_excerpt_len].rstrip()
    else:
        excerpt = first_sentence
    core = excerpt.rstrip('.')

    if category == "pitfall":
        # Pick the verb from the wording of the fact.
        if "trigger" in fact_lower or "cause" in fact_lower:
            verb = "avoid"
        elif "broken" in fact_lower or "fails" in fact_lower:
            verb = "fix"
        else:
            verb = "handle"
        return f"How do I {verb} {core}?"
    if category == "pattern":
        return f"What's the recommended way to {core}?"
    if category == "tool-quirk":
        # First word of the fact is taken as the tool name.
        words = fact.split()
        tool = words[0] if words else domain
        return f"How does {tool} behave in this context?"
    if category == "question":
        # Drop trailing punctuation so the result never ends in "??" or ".?".
        return f"What is the answer to: {excerpt.rstrip('?. ')}?"
    # fact or unknown
    return f"What should I know about {core}?"
def parse_date(date_str: Optional[str]) -> Optional[datetime]:
    """Parse an ISO date string to a datetime, or return None.

    A trailing "Z" is accepted as UTC. The result is timezone-aware only
    when the input carries an offset; date-only strings yield a naive
    midnight datetime.

    Args:
        date_str: ISO 8601 date or datetime string, or None.

    Returns:
        The parsed datetime, or None when the value is empty, not a string,
        or not valid ISO 8601.
    """
    # Values come from JSON entries, so they may be numbers or other
    # non-string types; treat those as "no usable date" rather than crashing.
    if not date_str or not isinstance(date_str, str):
        return None
    try:
        return datetime.fromisoformat(date_str.strip().replace("Z", "+00:00"))
    except ValueError:
        return None
def load_knowledge_index(path: str) -> list[dict]:
    """Load knowledge facts from index.json (or plain JSONL of entries).

    Accepted layouts:
    - ``{"facts": [...], ...}`` — the index.json format
    - a JSON array of entries
    - JSONL, one JSON entry per non-blank line
    - a single JSON entry object with a ``fact`` key

    Args:
        path: Path of the input file (read as UTF-8).

    Returns:
        The list of entries.

    Exits the process with status 1 (after printing an error to stderr)
    when the file is missing or its content matches none of the layouts.
    """
    p = Path(path)
    if not p.exists():
        print(f"ERROR: Knowledge input not found: {path}", file=sys.stderr)
        sys.exit(1)
    text = p.read_text(encoding="utf-8")

    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        # Not one JSON document: fall back to JSONL, which a single
        # json.load() call cannot parse.
        try:
            data = [json.loads(line) for line in text.splitlines() if line.strip()]
        except json.JSONDecodeError as err:
            print(f"ERROR: Unrecognized input format in {path}: {err}", file=sys.stderr)
            sys.exit(1)

    if isinstance(data, dict):
        # index.json format: {"facts": [...], ...}
        if "facts" in data:
            return data["facts"]
        # A one-line JSONL file parses as a single entry object.
        if "fact" in data:
            return [data]
    # Plain JSON array, or the list built from JSONL lines.
    if isinstance(data, list):
        return data

    print(f"ERROR: Unrecognized input format in {path}", file=sys.stderr)
    sys.exit(1)
def filter_entries(entries: list[dict],
                   min_confidence: float = 0.0,
                   model_filter: Optional[list[str]] = None,
                   after: Optional[datetime] = None,
                   before: Optional[datetime] = None) -> list[dict]:
    """Apply quality and provenance filters.

    Args:
        entries: Knowledge entries (dicts).
        min_confidence: Entries whose ``confidence`` is below this are
            dropped; a missing or null confidence counts as 0.0.
        model_filter: If given, keep only entries whose ``model`` (or
            ``provenance.model``, else "unknown") is in this list.
        after: Drop entries dated strictly before this datetime.
        before: Drop entries dated strictly after this datetime.

    Returns:
        The entries that pass every filter, in input order. Entries with no
        parseable date are never dropped by the date filters.
    """
    def as_utc(moment: Optional[datetime]) -> Optional[datetime]:
        # Naive datetimes are taken to be UTC so that naive and aware values
        # can be compared; Python raises TypeError when they are mixed.
        if moment is not None and moment.tzinfo is None:
            return moment.replace(tzinfo=timezone.utc)
        return moment

    after = as_utc(after)
    before = as_utc(before)

    filtered = []
    for entry in entries:
        # Confidence filter; a null confidence is treated like a missing one.
        conf = entry.get("confidence")
        if (conf if conf is not None else 0.0) < min_confidence:
            continue

        # Model filter: if specified, entry's model must be in the list.
        if model_filter:
            provenance = entry.get("provenance")
            fallback = provenance.get("model", "unknown") if isinstance(provenance, dict) else "unknown"
            if entry.get("model", fallback) not in model_filter:
                continue

        # Date filter: first parseable of last_confirmed, first_seen,
        # harvested_at.
        entry_date = None
        for field in ("last_confirmed", "first_seen", "harvested_at"):
            if field in entry:
                entry_date = as_utc(parse_date(entry[field]))
                if entry_date:
                    break
        if after and entry_date and entry_date < after:
            continue
        if before and entry_date and entry_date > before:
            continue

        filtered.append(entry)
    return filtered
def entry_to_pair(entry: dict) -> Optional[dict]:
    """Convert a knowledge entry into a training pair.

    Args:
        entry: Knowledge entry; ``fact`` is required, ``category``,
            ``domain``, ``confidence``, ``model`` and ``provenance`` are
            optional.

    Returns:
        A dict with ``terse``, ``rich``, ``domain``, ``source_confidence``
        and ``source_model`` keys, or None when the entry has no usable
        (non-empty string) fact.
    """
    raw_fact = entry.get("fact")
    # A null or non-string fact is as unusable as an empty one.
    if not isinstance(raw_fact, str):
        return None
    fact = raw_fact.strip()
    if not fact:
        return None

    category = entry.get("category", "fact")
    domain = entry.get("domain", "global")

    # A null confidence is treated like a missing one.
    confidence = entry.get("confidence")
    source_confidence = round(confidence if confidence is not None else 0.0, 4)

    # Prefer the entry's own model; fall back to provenance.model, tolerating
    # a provenance value that is not a dict.
    provenance = entry.get("provenance")
    fallback = provenance.get("model", "unknown") if isinstance(provenance, dict) else "unknown"
    source_model = entry.get("model", fallback)

    return {
        "terse": fact_to_terse(fact, category, domain),
        "rich": fact,
        "domain": domain,
        "source_confidence": source_confidence,
        "source_model": source_model,
    }
def main():
    """Command-line entry point: load, filter, convert, report, write.

    Loads the knowledge entries, applies the confidence / model / date
    filters, converts the survivors to training pairs, prints statistics to
    stderr and, unless ``--dry-run`` is given, writes the pairs as JSONL.
    """
    parser = argparse.ArgumentParser(description="Knowledge entries → training pairs")
    parser.add_argument("--input", "-i", default="knowledge/index.json",
                        help="Input knowledge index or JSONL (default: knowledge/index.json)")
    parser.add_argument("--output", "-o", default="training_pairs.jsonl",
                        help="Output JSONL file")
    parser.add_argument("--min-confidence", type=float, default=0.5,
                        help="Minimum entry confidence to include (0.0-1.0, default: 0.5)")
    parser.add_argument("--model-filter",
                        help="Comma-separated list of source models to include")
    parser.add_argument("--after",
                        help="Include entries last_confirmed/first_seen on or after this date (YYYY-MM-DD)")
    parser.add_argument("--before",
                        help="Include entries last_confirmed/first_seen on or before this date (YYYY-MM-DD)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print sample pairs and stats without writing")
    args = parser.parse_args()

    def cli_date(flag, raw, end_of_day=False):
        # Parse a date option. An unparseable value is a usage error: silently
        # ignoring it would run the pipeline with the filter switched off.
        if not raw:
            return None
        parsed = parse_date(raw)
        if parsed is None:
            parser.error(f"{flag}: invalid date {raw!r} (expected YYYY-MM-DD or ISO 8601)")
        # A date-only --before means "through the end of that day"; otherwise
        # entries timestamped later on the same day would be excluded.
        if end_of_day and ":" not in raw and "T" not in raw.upper():
            parsed = parsed.replace(hour=23, minute=59, second=59, microsecond=999999)
        return parsed

    # Parse filters
    model_list = None
    if args.model_filter:
        # Tolerate spaces around the commas ("a, b").
        model_list = [name.strip() for name in args.model_filter.split(",") if name.strip()] or None
    after_dt = cli_date("--after", args.after)
    before_dt = cli_date("--before", args.before, end_of_day=True)

    # Load
    entries = load_knowledge_index(args.input)
    print(f"Loaded {len(entries)} entries from {args.input}", file=sys.stderr)

    # Filter
    kept = filter_entries(
        entries,
        min_confidence=args.min_confidence,
        model_filter=model_list,
        after=after_dt,
        before=before_dt,
    )
    print(f"After filtering: {len(kept)} / {len(entries)} entries", file=sys.stderr)

    # Convert; entries without a usable fact yield no pair.
    pairs = [pair for pair in map(entry_to_pair, kept) if pair]

    # Stats
    avg_conf = sum(p["source_confidence"] for p in pairs) / len(pairs) if pairs else 0.0
    domains = {}
    models = {}
    for p in pairs:
        domains[p["domain"]] = domains.get(p["domain"], 0) + 1
        models[p["source_model"]] = models.get(p["source_model"], 0) + 1
    stats = {
        "input_entries": len(entries),
        "after_filter": len(kept),
        "pairs_generated": len(pairs),
        "avg_confidence": round(avg_conf, 4),
        "domains": domains,
        "source_models": models,
    }
    print(json.dumps(stats, indent=2), file=sys.stderr)

    if args.dry_run:
        print("\nSample pairs:", file=sys.stderr)
        for p in pairs[:3]:
            print(json.dumps(p, ensure_ascii=False), file=sys.stderr)
        return

    # Write JSONL
    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        for pair in pairs:
            f.write(json.dumps(pair, ensure_ascii=False) + "\n")
    print(f"\nWrote {len(pairs)} training pairs to {out_path}", file=sys.stderr)
# Run the pipeline only when executed as a script, not when imported.
if __name__ == '__main__':
    main()

View File

@@ -1,168 +0,0 @@
#!/usr/bin/env python3
"""
Smoke test for code duplication detector — verifies:
- Function extraction from Python files
- Exact duplicate detection
- Near-duplicate detection (token similarity)
- Report generation and stats
- JSON output format
"""
import json
import sys
import tempfile
from pathlib import Path
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
from code_duplication_detector import (
extract_functions_from_file,
scan_directory,
find_duplicates,
generate_report,
)
def test_extract_functions():
    """Two top-level functions and one method are all extracted."""
    sample_source = '''
def foo():
    return 1
def bar():
    return 2
class MyClass:
    def method(self):
        return 3
'''
    with tempfile.TemporaryDirectory() as tmpdir:
        sample_path = Path(tmpdir) / 'sample.py'
        sample_path.write_text(sample_source)
        functions = extract_functions_from_file(str(sample_path))
        assert len(functions) == 3, f"Expected 3 functions, got {len(functions)}"
        names = {f['name'] for f in functions}
        assert names == {'foo', 'bar', 'method'}, f"Names mismatch: {names}"
        print(" [PASS] function extraction works")
def test_exact_duplicate_detection():
    """The same function in two files is reported as an exact duplicate."""
    shared_source = '''
def duplicated():
    x = 1
    y = 2
    return x + y
'''
    with tempfile.TemporaryDirectory() as tmpdir:
        # Same function body written to two separate modules.
        for filename in ('a.py', 'b.py'):
            (Path(tmpdir) / filename).write_text(shared_source)
        results = find_duplicates(scan_directory(tmpdir))
        stats = results['stats']
        assert stats['exact_dupe_count'] >= 1, f"Expected exact duplicate, got count={stats['exact_dupe_count']}"
        assert len(results['exact_duplicates']) >= 1, "Should have at least one duplicate group"
        print(" [PASS] exact duplicate detection works")
def test_unique_functions_not_flagged():
    """Functions with different bodies produce no exact-duplicate groups."""
    sources = {
        'a.py': 'def func_a(): return 1',
        'b.py': 'def func_b(): return 2',
    }
    with tempfile.TemporaryDirectory() as tmpdir:
        for filename, text in sources.items():
            (Path(tmpdir) / filename).write_text(text)
        results = find_duplicates(scan_directory(tmpdir))
        assert results['stats']['exact_dupe_count'] == 0
        assert len(results['exact_duplicates']) == 0
        print(" [PASS] unique functions not flagged as duplicates")
def test_duplication_percentage_calculated():
    """Duplication percentage is computed from function line counts."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Two files that share `common` and each have one unique function.
        f1 = Path(tmpdir) / 'a.py'
        f1.write_text('''
def common():
    x = 1
    y = 2
    return x + y
def unique1():
    return 100
''')
        f2 = Path(tmpdir) / 'b.py'
        f2.write_text('''
def common():
    x = 1
    y = 2
    return x + y
def unique2():
    return 200
''')
        functions = scan_directory(tmpdir)
        results = find_duplicates(functions)
        stats = results['stats']
        assert 'duplication_percentage' in stats
        # common: 4 lines x 2 copies; unique1/unique2: 2 lines each -> 12 lines.
        # Only the second copy of `common` counts as duplicated: 4 of 12.
        assert stats['total_lines'] == 12, stats
        assert stats['duplicate_lines'] == 4, stats
        assert abs(stats['duplication_percentage'] - 33.33) < 0.01, stats
        print(f" [PASS] duplication percentage computed: {stats['duplication_percentage']}%")
def test_report_output_format():
    """Both the text report and the JSON report have the expected shape."""
    with tempfile.TemporaryDirectory() as tmpdir:
        for filename in ('a.py', 'b.py'):
            (Path(tmpdir) / filename).write_text('def dup(): return 1')
        results = find_duplicates(scan_directory(tmpdir))

        # Text report
        text_report = generate_report(results, output_format='text')
        for expected in ('CODE DUPLICATION REPORT', 'Total functions'):
            assert expected in text_report
        print(" [PASS] text report format valid")

        # JSON report
        parsed = json.loads(generate_report(results, output_format='json'))
        for key in ('stats', 'exact_duplicates'):
            assert key in parsed
        print(" [PASS] JSON report format valid")
def test_scan_directory_recursive():
    """Files in nested directories are picked up by the scan."""
    with tempfile.TemporaryDirectory() as tmpdir:
        root = Path(tmpdir)
        nested_dir = root / 'sub'
        nested_dir.mkdir()
        (nested_dir / 'nested.py').write_text('def nested(): pass')
        (root / 'root.py').write_text('def root(): pass')
        found_names = {f['name'] for f in scan_directory(tmpdir)}
        assert 'nested' in found_names and 'root' in found_names
        print(" [PASS] recursive directory scanning works")
# Run every smoke test in order when executed directly; the first failing
# assertion aborts the run.
if __name__ == '__main__':
    print("Running code duplication detector smoke tests...")
    for smoke_test in (
        test_extract_functions,
        test_exact_duplicate_detection,
        test_unique_functions_not_flagged,
        test_duplication_percentage_calculated,
        test_report_output_format,
        test_scan_directory_recursive,
    ):
        smoke_test()
    print("\nAll tests passed.")

View File

@@ -0,0 +1,174 @@
#!/usr/bin/env python3
"""
Smoke tests for knowledge_to_training_pairs.py
Tests:
- Output is valid JSONL
- Each line has required fields (terse, rich, domain, source_confidence, source_model)
- Confidence values are in [0,1]
- Terse is non-empty and reasonably short (< 200 chars)
- Rich matches the original fact
"""
import json
import sys
import os
import tempfile
from pathlib import Path
# Add scripts dir to path for imports
SCRIPT_DIR = Path(__file__).parent.parent / "scripts"
sys.path.insert(0, str(SCRIPT_DIR))
from knowledge_to_training_pairs import (
fact_to_terse,
filter_entries,
entry_to_pair,
parse_date,
)
def test_fact_to_terse_pitfall():
    """A pitfall fact becomes a short "How do I ...?" question."""
    terse = fact_to_terse(
        "deploy-crons.py leaves jobs in mixed model format",
        "pitfall",
        "hermes-agent",
    )
    assert terse.startswith("How do I")
    assert "?" in terse
    assert len(terse) < 150
    print("PASS: test_fact_to_terse_pitfall")
def test_fact_to_terse_fact():
    """A plain fact becomes a "What should I know about ...?" question."""
    terse = fact_to_terse("Python is a high-level programming language", "fact", "global")
    assert terse.startswith("What should I know about")
    assert "?" in terse
    print("PASS: test_fact_to_terse_fact")
def test_fact_to_terse_pattern():
    """A pattern fact is phrased as a recommended-way question."""
    terse = fact_to_terse("Use sparse checkout for large repos", "pattern", "devops")
    accepted_phrases = ("recommended way", "best way")
    assert any(phrase in terse for phrase in accepted_phrases)
    print("PASS: test_fact_to_terse_pattern")
def test_entry_to_pair_structure():
    """A complete entry converts to a pair with all required fields."""
    pair = entry_to_pair({
        "id": "test:001",
        "fact": "Test fact text.",
        "category": "fact",
        "domain": "test-domain",
        "confidence": 0.85,
        "model": "test-model",
    })
    assert pair is not None
    for required_key in ("terse", "rich", "domain", "source_confidence", "source_model"):
        assert required_key in pair
    assert pair["rich"] == "Test fact text."
    assert pair["domain"] == "test-domain"
    assert 0.0 <= pair["source_confidence"] <= 1.0
    print("PASS: test_entry_to_pair_structure")
def test_filter_by_confidence():
    """Entries below the confidence threshold are dropped."""
    threshold = 0.5
    sample = [{"fact": name, "confidence": score}
              for name, score in (("A", 0.9), ("B", 0.4), ("C", 0.6))]
    survivors = filter_entries(sample, min_confidence=threshold)
    assert len(survivors) == 2
    assert all(e["confidence"] >= 0.5 for e in survivors)
    print("PASS: test_filter_by_confidence")
def test_filter_by_model():
    """Only entries whose model is in the allow-list are kept."""
    allowed = ["claude-sonnet", "gpt-4"]
    sample = [{"fact": name, "model": model}
              for name, model in (("A", "claude-sonnet"), ("B", "gpt-4"), ("C", "unknown"))]
    survivors = filter_entries(sample, model_filter=allowed)
    assert len(survivors) == 2
    assert all(e["model"] in ("claude-sonnet", "gpt-4") for e in survivors)
    print("PASS: test_filter_by_model")
def test_filter_by_date():
    """Entries dated before the `after` cutoff are dropped."""
    cutoff = parse_date("2026-04-01")
    sample = [
        {"fact": "A", "last_confirmed": "2026-04-10"},
        {"fact": "B", "last_confirmed": "2026-03-01"},
        {"fact": "C", "first_seen": "2026-04-15"},
    ]
    survivors = filter_entries(sample, after=cutoff)
    assert len(survivors) == 2
    print("PASS: test_filter_by_date")
def test_end_to_end_jsonl_output():
    """Integration test: run the script and verify JSONL validity.

    Runs the pipeline as a subprocess against a small index written to a
    temporary directory, so the test does not depend on the repository's
    real knowledge/index.json, then checks both the stats printed to stderr
    and every line of the JSONL output file.
    """
    import subprocess

    index = {"facts": [
        {"id": "t:1", "fact": "deploy-crons.py leaves jobs in mixed model format",
         "category": "pitfall", "domain": "hermes-agent", "confidence": 0.95},
        # Below the script's default --min-confidence of 0.5: filtered out.
        {"id": "t:2", "fact": "Unverified rumour", "category": "fact",
         "domain": "global", "confidence": 0.1},
    ]}
    required = {"terse", "rich", "domain", "source_confidence", "source_model"}

    with tempfile.TemporaryDirectory() as tmpdir:
        in_path = Path(tmpdir) / "index.json"
        out_path = Path(tmpdir) / "pairs.jsonl"
        in_path.write_text(json.dumps(index), encoding="utf-8")

        # sys.executable guarantees the same interpreter as the test run;
        # a bare "python3" may be missing or be a different installation.
        result = subprocess.run(
            [sys.executable, str(SCRIPT_DIR / "knowledge_to_training_pairs.py"),
             "--input", str(in_path), "--output", str(out_path)],
            capture_output=True, text=True,
        )
        assert result.returncode == 0, result.stderr

        # The stats JSON follows the "Loaded ..." / "After filtering ..."
        # lines on stderr. raw_decode parses one JSON value and ignores what
        # comes after it, and is not confused by braces inside strings.
        start = result.stderr.find("{")
        assert start >= 0, "Stats JSON not found in stderr"
        stats, _ = json.JSONDecoder().raw_decode(result.stderr[start:])
        assert stats["input_entries"] == 2
        assert stats["pairs_generated"] == 1

        # Every output line must be a JSON object with the required fields.
        lines = out_path.read_text(encoding="utf-8").splitlines()
        assert len(lines) == stats["pairs_generated"]
        for line in lines:
            pair = json.loads(line)
            assert required <= set(pair), f"Missing fields in {pair}"
            assert pair["rich"] == index["facts"][0]["fact"]
            assert 0.0 <= pair["source_confidence"] <= 1.0
    print("PASS: test_end_to_end_jsonl_output")
def test_terse_length_constraint():
    """Generated terse queries stay under 200 characters."""
    # Sample facts from actual knowledge
    samples = (
        ("deploy-crons.py leaves jobs in mixed model format", "pitfall", "hermes-agent"),
        ("Cron jobs with blank fallback_model fields trigger warnings", "pitfall", "hermes-agent"),
        ("Use the Gitea REST API when clone times out", "pattern", "devops"),
    )
    for sample in samples:
        terse = fact_to_terse(*sample)
        assert len(terse) < 200, f"Terse too long ({len(terse)}): {terse}"
    print("PASS: test_terse_length_constraint")
# Run every smoke test in order when executed directly; the first failing
# assertion aborts the run.
if __name__ == '__main__':
    for smoke_test in (
        test_fact_to_terse_pitfall,
        test_fact_to_terse_fact,
        test_fact_to_terse_pattern,
        test_entry_to_pair_structure,
        test_filter_by_confidence,
        test_filter_by_model,
        test_filter_by_date,
        test_end_to_end_jsonl_output,
        test_terse_length_constraint,
    ):
        smoke_test()
    print("\nAll smoke tests passed.")