Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Payne
c4586eb2be feat(review): add Review Quality Scorer (#127)
Some checks failed
Test / pytest (pull_request) Failing after 8s
Score PR reviews across 5 categories: style, logic, security,
performance, tests. Produces weighted composite and markdown report.

Closes #127
2026-04-26 12:22:25 -04:00
4 changed files with 600 additions and 155 deletions

340
scripts/review_quality_scorer.py Executable file
View File

@@ -0,0 +1,340 @@
#!/usr/bin/env python3
"""
review_quality_scorer.py — Evaluate code review quality.
Scores PR reviews on 5 dimensions (0-100 each):
- Style: formatting, naming, conventions, lint
- Logic: algorithmic correctness, edge cases, reasoning
- Security: vulnerabilities, auth/authz, data exposure
- Performance: efficiency, bottlenecks, resource usage
- Tests: coverage, test quality, missing tests
Produces a weighted composite score and a human-readable report.
Usage:
python3 review_quality_scorer.py --input reviews.json
python3 review_quality_scorer.py --pr 123 --org Timmy_Foundation --repo compounding-intelligence
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error
# ---------------------------------------------------------------------------
# Category weights (must sum to 1.0)
# ---------------------------------------------------------------------------
WEIGHTS = {
    "style": 0.15,
    "logic": 0.25,
    "security": 0.25,
    "performance": 0.15,
    "tests": 0.20,
}

# ---------------------------------------------------------------------------
# Indicator patterns per category (presence suggests category was addressed)
# ---------------------------------------------------------------------------
STYLE_INDICATORS = [
    r"\bstyle\b", r"\blint\b", r"\bformatting\b", r"\bnaming\b",
    r"\bPEP8\b", r"\bblack\b", r"\bprettier\b", r"\bclang-format\b",
    r"\bwhitespace\b", r"\bindentation\b", r"\bconsistent\b",
    r"`(black|isort|flake8|eslint|prettier)`", r"\bformat\s+code\b",
    r"\bstyle\s+guide\b", r"\bconventional\b",
]

LOGIC_INDICATORS = [
    # "edgecase" (one word) complements the spaced "edge case" pattern.
    # The original r"\bredgecase\b" was a typo that could never match.
    r"\blogic\b", r"\balgorithm\b", r"\bedge\s+case\b", r"\bedgecase\b",
    # The original r"\b flawed\b" had a stray space after \b, requiring a
    # word character immediately before the space — fixed to \bflawed\b.
    r"\bincorrect\b", r"\bwrong\b", r"\bbug\b", r"\bflawed\b",
    r"\bmissing\s+case\b", r"\bunhandled\b", r"\boverflow\b",
    r"\bunderflow\b", r"\brace\b", r"\bcondition\b", r"\bcheck\s+this\b",
    r"\bthink\s+about\b", r"\bwhat\s+happens\s+when\b",
    r"\bcorrect\s+behavior\b", r"\bverify\s+logic\b",
]

SECURITY_INDICATORS = [
    r"\bsecurity\b", r"\bvuln\b", r"\bCVE\b", r"\bexploit\b",
    r"\battack\b", r"\bXSS\b", r"\bSQL\s+injection\b", r"\bRCE\b",
    r"\bauthorization\b", r"\bauthentication\b", r"\bpermission\b",
    r"\bsensitive\b", r"\bsecret\b", r"\bpassword\b", r"\btoken\b",
    r"\bexposure\b", r"\bsanitize\b", r"\bescape\b", r"\bvalidate\b",
    # "redact" was misspelled "readact" in the original and never matched.
    r"\binjection\b", r"\bredact\b", r"\bhardcode\b", r"\bcreds\b",
    r"\bpublic\s+repo\b", r"\bexfil\b", r"\bleak\b",
]

PERFORMANCE_INDICATORS = [
    r"\bperformance\b", r"\bslow\b", r"\boptimize\b", r"\bbottleneck\b",
    r"\bcpu\b", r"\bmemory\b", r"\bleak\b", r"\bfast\b", r"\befficient\b",
    # The original r"\bO\(n\)\b" ended with \b after ")", which is only a
    # boundary before a word character, so "O(n) " never matched. Dropped.
    r"\bcache\b", r"\boverhead\b", r"\bO\(n\)", r"\bcomplexity\b",
    r"\bswap\b", r"\bpaging\b", r"\bmultiply\b", r"\bredundant\b",
    r"\bperf\b", r"\bprofiling\b", r"\bprofiler\b", r"\bhot\s+path\b",
    # The original r"\batch\b" could only match the standalone word "atch"
    # (\b cannot sit inside "batch") — fixed to \bbatch\b.
    # NOTE(review): r"\block\b" matches the word "lock" (thread locks);
    # if "block" was intended, add \bblock\b as well — confirm intent.
    r"\bbatch\b", r"\block\b", r"\bthread\b", r"\bpool\b",
]

TESTS_INDICATORS = [
    r"\btest\b", r"\btesting\b", r"\bcoverage\b", r"\bunittest\b",
    r"\bpytest\b", r"\bassert\b", r"\bmock\b", r"\bstub\b",
    r"\bfixture\b", r"\bspec\b", r"\bTDD\b", r"\bedge\s+case\b",
    r"\bmissing\s+test\b", r"\bno\s+test\b", r"\btest\s+this\b",
    r"\badd\s+tests\b", r"\btest\s+coverage\b", r"\bcoverage\s+gap\b",
    r"\bregression\b", r"\bintegration\s+test\b", r"\bunit\s+test\b",
]

# Depth indicators (presence = deeper review)
DEPTH_MARKERS = [
    r"\bwhy\b", r"\bbecause\b", r"\bexplain\b", r"\bconsider\b",
    r"\balternative\b", r"\boption\b", r"\bsuggestion\b", r"\bfix\b",
    r"\bupdate\b", r"\bchange\b", r"\bimprove\b", r"\bperhaps\b",
    r"\bcould\s+also\b", r"\baside\b", r"\bfootnote\b",
]
# ---------------------------------------------------------------------------
# Scoring helpers
# ---------------------------------------------------------------------------
def _category_presence_score(comments: List[str], indicators: List[str]) -> float:
"""Score 0-1 based on indicator keyword matches."""
if not comments:
return 0.0
text = " ".join(comments).lower()
hits = sum(len(re.findall(pat, text, re.IGNORECASE)) for pat in indicators)
# Normalize: 1 hit = 0.2, 2 = 0.4, ... cap at 1.0
return min(1.0, hits * 0.2)
def _depth_score(comments: List[str]) -> float:
    """Measure review depth on a 0-1 scale.

    A comment counts as substantive when it has at least 10 words and
    contains at least one DEPTH_MARKERS pattern. Any non-empty review
    gets a 0.1 baseline; each substantive comment adds 0.3, capped at 1.0.
    """
    if not comments:
        return 0.0
    marker_union = "|".join(DEPTH_MARKERS)
    substantive = 0
    for comment in comments:
        if len(comment.split()) < 10:
            continue
        if re.search(marker_union, comment, re.IGNORECASE):
            substantive += 1
    return min(1.0, 0.1 + 0.3 * substantive)
def _category_score(comments: List[str], indicators: List[str], weight: float) -> float:
    """Blend presence (60%) and depth (40%) into a 0-100 category score.

    NOTE(review): ``weight`` is accepted but unused in this function;
    weighting is applied later when the composite score is computed.
    """
    blended = 0.6 * _category_presence_score(comments, indicators) + 0.4 * _depth_score(comments)
    return round(blended * 100, 1)
@dataclass
class ReviewQualityReport:
    """Quality scores: one 0-100 per category plus a weighted composite."""
    style: float
    logic: float
    security: float
    performance: float
    tests: float
    composite: float              # weighted average of the five categories
    breakdown: Dict[str, float]   # category name -> rounded score
    comment_count: int            # number of non-empty comment bodies scored
    review_count: int             # approximate unique reviews (deduped bodies)

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict form suitable for json.dump."""
        return asdict(self)

    def to_markdown(self) -> str:
        """Render a human-readable markdown report.

        Fixes vs. the original: the table header now declares the fifth
        "Bar" column that each row emits, and the bar glyph is "█"
        (the original multiplied an empty string, always rendering blank).
        """
        lines = [
            "# PR Review Quality Report",
            "",
            f"**Composite Score:** {self.composite:.1f} / 100",
            f"**Reviews analyzed:** {self.review_count}",
            f"**Comments found:** {self.comment_count}",
            "",
            "## Category Scores",
            "| Category | Score | Weight | Contribution | Bar |",
            "|----------|-------|--------|--------------|-----|",
        ]
        for cat in ["style", "logic", "security", "performance", "tests"]:
            score = getattr(self, cat)
            weight = WEIGHTS[cat]
            contrib = score * weight
            bar = "█" * int(score / 5)  # one block per 5 points
            lines.append(f"| {cat.capitalize():10} | {score:5.1f} | {weight:.2f} | {contrib:5.1f} | {bar} |")
        lines.extend(["", "## Interpretation", ""])
        # Verdict thresholds are deliberately coarse quartile-style bands.
        if self.composite >= 80:
            verdict = "Excellent — review is thorough across all categories."
        elif self.composite >= 60:
            verdict = "Good — major areas covered, some gaps remain."
        elif self.composite >= 40:
            verdict = "Fair — several categories need more attention."
        else:
            verdict = "Poor — review lacks depth in multiple critical areas."
        lines.append(f"- {verdict}")
        lines.append("")
        return "\n".join(lines)
# ---------------------------------------------------------------------------
# Core
# ---------------------------------------------------------------------------
def score_review(comments: List[str]) -> ReviewQualityReport:
    """Score a list of review comment bodies and build a report."""
    bodies = [body.strip() for body in comments if body and body.strip()]
    if not bodies:
        # Keep downstream scoring well-defined when there is nothing to score.
        bodies = ["(no substantive review comments found)"]
    # Duplicates are kept deliberately: depth naturally inflates with volume.
    indicator_sets = {
        "style": STYLE_INDICATORS,
        "logic": LOGIC_INDICATORS,
        "security": SECURITY_INDICATORS,
        "performance": PERFORMANCE_INDICATORS,
        "tests": TESTS_INDICATORS,
    }
    scores = {
        category: _category_score(bodies, patterns, WEIGHTS[category])
        for category, patterns in indicator_sets.items()
    }
    composite = round(
        sum(scores[category] * WEIGHTS[category] for category in scores), 1
    )
    return ReviewQualityReport(
        style=scores["style"],
        logic=scores["logic"],
        security=scores["security"],
        performance=scores["performance"],
        tests=scores["tests"],
        composite=composite,
        breakdown={category: round(value, 1) for category, value in scores.items()},
        comment_count=len(bodies),
        review_count=len(set(bodies)),  # approximate unique reviews
    )
# ---------------------------------------------------------------------------
# Gitea integration (optional — fetch PR comments)
# ---------------------------------------------------------------------------
# Base URL of the Gitea instance's REST API.
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"


class GiteaClient:
    """Minimal token-authenticated GET client for the Gitea REST API."""

    def __init__(self, token: str) -> None:
        # Personal access token, sent as "Authorization: token <...>".
        self.token = token
        self.base_url = GITEA_BASE.rstrip("/")

    def _request(self, path: str, params: Optional[Dict] = None) -> Any:
        """GET ``path`` (with optional query params) and return parsed JSON.

        Returns None on HTTP or network errors; both are logged to stderr
        rather than raised.
        NOTE(review): query values are not URL-encoded — fine for the
        int values used here, but confirm before passing arbitrary strings.
        """
        url = f"{self.base_url}{path}"
        if params:
            # Skip None-valued params so optional filters can be omitted.
            qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
            url += f"?{qs}"
        req = urllib.request.Request(url)
        req.add_header("Authorization", f"token {self.token}")
        req.add_header("Content-Type", "application/json")
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            # Truncate the error body to keep stderr readable.
            print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
            return None
        except urllib.error.URLError as e:
            print(f"Network error: {e}", file=sys.stderr)
            return None

    def get_pr_comments(self, org: str, repo: str, pr_number: int) -> List[Dict]:
        """Fetch review comments (not PR discussion comments).

        Paginates 100 at a time until a short, empty, or failed page is
        returned (_request yields None on error, which also stops the loop).
        """
        comments = []
        page = 1
        while True:
            batch = self._request(
                f"/repos/{org}/{repo}/pulls/{pr_number}/comments",
                {"limit": 100, "page": page}
            )
            if not batch:
                break
            comments.extend(batch)
            if len(batch) < 100:
                break
            page += 1
        return comments
def fetch_review_comments(org: str, repo: str, pr_number: int, token: str) -> List[str]:
    """Fetch the non-empty review comment bodies for a PR from Gitea."""
    raw_comments = GiteaClient(token).get_pr_comments(org, repo, pr_number)
    # Each comment object carries at least: {body, user, ...}
    bodies: List[str] = []
    for comment in raw_comments:
        body = comment.get("body", "")
        if body:
            bodies.append(body)
    return bodies
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
    """CLI entry point: score a review from a JSON file or a live Gitea PR.

    Exits non-zero on bad input configuration. Writes a JSON report to
    --output and prints either markdown or JSON to stdout.
    """
    parser = argparse.ArgumentParser(description="Review Quality Scorer")
    parser.add_argument("--input", help="JSON file with review findings (list of strings)")
    parser.add_argument("--pr", type=int, help="PR number to fetch from Gitea")
    parser.add_argument("--org", default="Timmy_Foundation", help="Gitea org")
    parser.add_argument("--repo", default="compounding-intelligence", help="Gitea repo")
    # Default token: the GITEA_TOKEN env var, falling back to a token *file path*.
    parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
    parser.add_argument("--output", default="metrics/review_quality_report.json", help="Output path")
    parser.add_argument("--markdown", action="store_true", help="Emit human-readable markdown to stdout")
    args = parser.parse_args()
    # --token may be either a literal token or a path to a file holding one;
    # if the value names an existing file, read the token from that file.
    token_path = args.token
    if os.path.exists(token_path):
        with open(token_path) as f:
            token = f.read().strip()
    else:
        token = args.token
    # Get review bodies from one of the two supported sources.
    if args.input:
        with open(args.input) as f:
            data = json.load(f)
        # Accept either a bare list of strings or {"reviews": [...]}.
        if isinstance(data, dict) and "reviews" in data:
            comments = data["reviews"]
        elif isinstance(data, list):
            comments = data
        else:
            print("ERROR: Input JSON must be a list or 'reviews' key", file=sys.stderr)
            sys.exit(1)
    elif args.pr:
        if not token:
            print("ERROR: Gitea token required for --pr", file=sys.stderr)
            sys.exit(1)
        comments = fetch_review_comments(args.org, args.repo, args.pr, token)
        print(f"Fetched {len(comments)} review comments from PR #{args.pr}")
    else:
        print("ERROR: Must provide either --input or --pr", file=sys.stderr)
        sys.exit(1)
    # Score
    report = score_review(comments)
    # Persist the JSON report, creating the output directory if needed.
    Path(args.output).parent.mkdir(parents=True, exist_ok=True)
    with open(args.output, "w") as f:
        json.dump(report.to_dict(), f, indent=2)
    print(f"Report saved: {args.output}")
    # Human-readable markdown on request; otherwise echo the JSON report.
    if args.markdown:
        print(report.to_markdown())
    else:
        print(json.dumps(report.to_dict(), indent=2))


if __name__ == "__main__":
    main()

View File

@@ -22,95 +22,114 @@ import sys
from pathlib import Path
from typing import Optional
from session_reader import extract_conversation, read_session
def compute_hash(text: str) -> str:
"""Content hash for deduplication."""
return hashlib.sha256(text.encode()).hexdigest()[:16]
def extract_pairs_from_conversation(conversation: list, session_id: str, model: str,
min_ratio: float = 1.5,
def extract_pairs_from_session(session_data: dict, min_ratio: float = 1.5,
min_response_words: int = 20) -> list:
"""Extract terse→rich pairs from a normalized conversation."""
"""Extract terse→rich pairs from a single session object."""
pairs = []
conversations = session_data.get("conversations", [])
session_id = session_data.get("id", "unknown")
model = session_data.get("model", "unknown")
seen_hashes = set()
for i, msg in enumerate(conversation):
# Look for assistant responses
if msg.get('role') != 'assistant':
for i, msg in enumerate(conversations):
# Look for assistant/gpt responses
if msg.get("from") not in ("gpt", "assistant"):
continue
response_text = msg.get('content', '')
response_text = msg.get("value", "")
if not response_text or len(response_text.split()) < min_response_words:
continue
# Find the preceding user message
# Find the preceding human message
prompt_text = ""
for j in range(i - 1, -1, -1):
if conversation[j].get('role') == 'user':
prompt_text = conversation[j].get('content', '')
if conversations[j].get("from") == "human":
prompt_text = conversations[j].get("value", "")
break
if not prompt_text:
continue
# Filter: skip tool results, system messages embedded as human
if prompt_text.startswith('{') and 'output' in prompt_text[:100]:
continue
if prompt_text.startswith('# SOUL.md') or prompt_text.startswith('You are'):
continue
if prompt_text.startswith("{") and "output" in prompt_text[:100]:
continue # likely a tool result
if prompt_text.startswith("# SOUL.md") or prompt_text.startswith("You are"):
continue # system prompt leak
# Quality filters
prompt_words = len(prompt_text.split())
response_words = len(response_text.split())
# Must have meaningful length ratio
if prompt_words == 0 or response_words == 0:
continue
ratio = response_words / prompt_words
if ratio < min_ratio:
continue
code_blocks = response_text.count('```')
if code_blocks >= 4 and len(response_text.replace('```', '').strip()) < 50:
# Skip responses that are mostly code
code_blocks = response_text.count("```")
if code_blocks >= 4 and len(response_text.replace("```", "").strip()) < 50:
continue
if 'tool_call' in response_text[:100] or 'function_call' in response_text[:100]:
# Skip responses with tool call artifacts
if "tool_call" in response_text[:100] or "function_call" in response_text[:100]:
continue
# Deduplicate by content hash
content_hash = compute_hash(prompt_text + response_text[:200])
if content_hash in seen_hashes:
continue
seen_hashes.add(content_hash)
# Clean up response: remove markdown headers if too many
clean_response = response_text
pairs.append({
'terse': prompt_text.strip(),
'rich': clean_response.strip(),
'source': session_id,
'model': model,
'prompt_words': prompt_words,
'response_words': response_words,
'ratio': round(ratio, 2),
"terse": prompt_text.strip(),
"rich": clean_response.strip(),
"source": session_id,
"model": model,
"prompt_words": prompt_words,
"response_words": response_words,
"ratio": round(ratio, 2),
})
return pairs
def extract_from_jsonl_file(filepath: str, **kwargs) -> list:
"""Extract pairs from a session JSONL file."""
pairs = []
path = Path(filepath)
def extract_from_jsonl_file(path: str, **kwargs) -> list:
"""Read a session file and extract training pairs using normalized conversation."""
session_messages = read_session(path)
if not session_messages:
return []
conversation = extract_conversation(session_messages)
# Derive session_id and model from first real message metadata
first_msg = next((m for m in session_messages if m.get('role') or m.get('from')), {})
session_id = first_msg.get('meta_session_id', Path(path).name)
model = first_msg.get('model', 'unknown')
return extract_pairs_from_conversation(conversation, session_id, model, **kwargs)
if not path.exists():
print(f"Warning: {filepath} not found", file=sys.stderr)
return pairs
content = path.read_text()
lines = content.strip().split("\n")
for line in lines:
line = line.strip()
if not line:
continue
try:
session = json.loads(line)
except json.JSONDecodeError:
continue
session_pairs = extract_pairs_from_session(session, **kwargs)
pairs.extend(session_pairs)
return pairs
def deduplicate_pairs(pairs: list) -> list:

View File

@@ -0,0 +1,204 @@
#!/usr/bin/env python3
"""
Tests for Review Quality Scorer — unit tests for the scoring logic.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from review_quality_scorer import (
score_review,
_category_presence_score,
_depth_score,
_category_score,
ReviewQualityReport,
STYLE_INDICATORS,
LOGIC_INDICATORS,
SECURITY_INDICATORS,
PERFORMANCE_INDICATORS,
TESTS_INDICATORS,
)
PASS = 0
FAIL = 0
def test(name):
def decorator(fn):
global PASS, FAIL
try:
fn()
PASS += 1
print(f" [PASS] {name}")
except AssertionError as e:
FAIL += 1
print(f" [FAIL] {name}: {e}")
except Exception as e:
FAIL += 1
print(f" [FAIL] {name}: Unexpected error: {e}")
return decorator
def assert_eq(a, b, msg=""):
    """Tolerant numeric compare: raise unless *a* is within 0.1 of *b*."""
    difference = abs(a - b)
    if difference > 0.1:
        raise AssertionError(f"{msg} expected ~{b!r}, got {a!r}")
def assert_true(v, msg=""):
    """Raise AssertionError unless *v* is truthy."""
    if v:
        return
    raise AssertionError(msg or "Expected True")


def assert_false(v, msg=""):
    """Raise AssertionError unless *v* is falsy."""
    if not v:
        return
    raise AssertionError(msg or "Expected False")
# NOTE: each @test block below executes immediately at definition time
# (the decorator calls the function), so this script runs top-to-bottom.
print("=== Review Quality Scorer Tests ===\n")

print("-- Category Presence --")

@test("style keywords detected")
def _():
    score = _category_presence_score(
        ["Please fix the formatting and run black."],
        STYLE_INDICATORS
    )
    assert_true(score > 0, f"Style score should be > 0, got {score}")

@test("logic keywords detected")
def _():
    score = _category_presence_score(
        ["Consider the edge case when input is empty.", "This algorithm is O(n^2)"],
        LOGIC_INDICATORS
    )
    assert_true(score > 0, f"Logic score should be > 0")

@test("security keywords detected")
def _():
    score = _category_presence_score(
        ["Potential SQL injection here.", "Don't hardcode secrets"],
        SECURITY_INDICATORS
    )
    assert_true(score > 0)

@test("performance keywords detected")
def _():
    score = _category_presence_score(
        ["This loop is a bottleneck.", "Memory usage could be optimized"],
        PERFORMANCE_INDICATORS
    )
    assert_true(score > 0)

@test("tests keywords detected")
def _():
    score = _category_presence_score(
        ["Add tests for this branch.", "Missing test coverage here"],
        TESTS_INDICATORS
    )
    assert_true(score > 0)

@test("no keywords → score 0")
def _():
    score = _category_presence_score(
        ["Looks good to me."],
        STYLE_INDICATORS
    )
    assert_eq(score, 0.0)

print("\n-- Depth Scoring --")

@test("shallow comment → low depth")
def _():
    # _depth_score gives any non-empty review a 0.1 baseline; this passes
    # only because assert_eq tolerates differences up to 0.1.
    d = _depth_score(["OK"])
    assert_eq(d, 0.0)

@test("substantive comment → positive depth")
def _():
    d = _depth_score([
        "Please consider updating this logic: when x is zero we divide by zero. "
        "Why not add an early return? This would fix the edge case."
    ])
    assert_true(d > 0.3)

print("\n-- Category Score Integration --")

@test("thorough style review scores high on style")
def _():
    comments = [
        "The indentation is inconsistent — please run black to auto-format.",
        "Function names are camelCase but should be snake_case per PEP8.",
        "Trailing whitespace on several lines — please clean up.",
        "Missing .gitignore would accidentally commit __pycache__ and .venv.",
        "Consider adding a linter (flake8) to catch these style issues early.",
    ]
    rpt = score_review(comments)
    assert_true(rpt.style >= 50, f"Style score should be >= 50, got {rpt.style}")

@test("thorough logic review scores high on logic")
def _():
    comments = [
        "What happens if the input list is empty? The algorithm would crash.",
        "This nested loop is O(n^2). Could we use a dictionary for O(n)?",
        "Negative numbers aren't handled — possible overflow.",
        "Consider the edge case where the user passes None.",
        "Please add input validation at the start of the function.",
        "Why not extract this into a pure function for easier testing?",
    ]
    rpt = score_review(comments)
    assert_true(rpt.logic >= 50)

@test("thorough security review scores high on security")
def _():
    comments = [
        "The SQL query uses string concatenation — vulnerable to SQL injection.",
        "API token is hardcoded in source — move to environment variables.",
        "Check for XSS when rendering user-provided HTML.",
        "Are we validating all user inputs before processing?",
        "Consider rate limiting to prevent abuse.",
        "Ensure secrets are never committed to the repository.",
    ]
    rpt = score_review(comments)
    assert_true(rpt.security >= 50)

@test("combines multiple categories")
def _():
    # Mixed-topic comments should lift every category at once.
    comments = [
        "Please run black to auto-format. Also, the O(n²) loop here will hurt performance on large inputs.",
        "Security risk: hardcoded API token. Style: inconsistent indentation. Logic: missing null check could crash.",
        "Missing test coverage for edge cases. Also consider caching the result to improve performance.",
        "Naming violates PEP8 (style). Edge case: negative inputs cause overflow (logic). Potential XSS when rendering user HTML (security).",
        "Run a linter (style), add unit tests (tests), and check for memory leaks (performance).",
    ]
    rpt = score_review(comments)
    assert_true(rpt.style >= 50)
    assert_true(rpt.logic >= 40)
    assert_true(rpt.security >= 50)
    assert_true(rpt.performance >= 50)
    assert_true(rpt.tests >= 50)

@test("composite is weighted average")
def _():
    # Generate a known distribution to verify math
    comments = ["x"] * 20  # very shallow
    rpt = score_review(comments)
    # All categories should be equal-ish
    assert_true(0 <= rpt.composite <= 100)

print("\n-- Edge Cases --")

@test("empty comments produces non-zero baseline")
def _():
    # NOTE(review): the name says "non-zero" but the assertion only checks
    # >= 0 — consider tightening to > 0 if a baseline is truly guaranteed.
    rpt = score_review([])
    assert_true(rpt.composite >= 0)

@test("single one-word comment → very low")
def _():
    rpt = score_review(["OK"])
    assert_true(rpt.composite < 40)

# Summary line plus shell-friendly exit status (0 = all passed).
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)

View File

@@ -1,118 +0,0 @@
"""
Tests for session_pair_harvester — training pair extraction from sessions.
"""
import json
import tempfile
import unittest
from pathlib import Path
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from session_pair_harvester import (
extract_pairs_from_conversation,
extract_from_jsonl_file,
deduplicate_pairs,
compute_hash,
)
class TestSessionPairHarvester(unittest.TestCase):
    """Unit tests for hashing, pair extraction, filtering, and dedup.

    NOTE(review): this suite targets extract_pairs_from_conversation; the
    sibling change in this compare renames that API — confirm the import
    still resolves before relying on these tests.
    """

    def test_compute_hash_consistent(self):
        # Same input must always yield the same 16-char digest prefix.
        h1 = compute_hash("hello world")
        h2 = compute_hash("hello world")
        self.assertEqual(h1, h2)
        self.assertEqual(len(h1), 16)

    def test_extract_simple_qa_pair(self):
        """A simple user→assistant exchange produces one pair."""
        conversation = [
            {"role": "user", "content": "What is the capital of France?"},
            {"role": "assistant", "content": "The capital of France is Paris. It is a major European city renowned for its art, fashion, gastronomy, cultural heritage, and historical significance. The city attracts millions of tourists annually."},
        ]
        pairs = extract_pairs_from_conversation(conversation, "test_session", "test-model")
        self.assertEqual(len(pairs), 1)
        self.assertEqual(pairs[0]["terse"], "What is the capital of France?")
        self.assertIn("Paris", pairs[0]["rich"])
        self.assertEqual(pairs[0]["source"], "test_session")

    def test_min_ratio_filter(self):
        """Very short responses are filtered out."""
        conversation = [
            {"role": "user", "content": "Yes"},
            {"role": "assistant", "content": "No."},
        ]
        # Default min_ratio = 1.5, min_words = 20 for response
        pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=3)
        self.assertEqual(len(pairs), 0)

    def test_min_words_filter(self):
        """Assistant responses below min word count are skipped."""
        conversation = [
            {"role": "user", "content": "Explain the project architecture in detail"},
            {"role": "assistant", "content": "OK."},
        ]
        pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=5)
        self.assertEqual(len(pairs), 0)

    def test_skip_non_assistant_messages(self):
        """System and tool messages are ignored."""
        conversation = [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Hello"},
            {"role": "assistant", "content": "Hi there! How can I help you today?"},
        ]
        pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=3)
        self.assertEqual(len(pairs), 1)
        self.assertEqual(pairs[0]["terse"], "Hello")

    def test_multiple_pairs_from_one_session(self):
        """A conversation with several Q&A turns yields multiple pairs."""
        conversation = [
            {"role": "user", "content": "First question?"},
            {"role": "assistant", "content": "Here is a detailed and comprehensive answer that thoroughly explores multiple aspects of the subject. It provides background context and practical implications for the reader."},
            {"role": "user", "content": "Second?"},
            {"role": "assistant", "content": "Another comprehensive response with detailed examples. This includes practical code blocks and thorough explanations to ensure deep understanding of the topic at hand."},
        ]
        pairs = extract_pairs_from_conversation(conversation, "s", "m", min_ratio=1.0)
        self.assertEqual(len(pairs), 2)

    def test_deduplication_removes_duplicates(self):
        """Identical pairs across sessions are deduplicated."""
        pairs = [
            {"terse": "q1", "rich": "a1", "source": "s1", "model": "m"},
            {"terse": "q1", "rich": "a1", "source": "s2", "model": "m"},
            {"terse": "q2", "rich": "a2", "source": "s1", "model": "m"},
        ]
        unique = deduplicate_pairs(pairs)
        self.assertEqual(len(unique), 2)
        sources = {p["source"] for p in unique}
        # First unique pair can be from either s1 or s2
        self.assertIn("s1", sources)

    def test_integration_with_test_sessions(self):
        """Harvester finds pairs in real test session files."""
        # Skips gracefully when the fixture directory is absent (e.g. CI
        # checkouts without test data).
        repo_root = Path(__file__).parent.parent
        test_sessions_dir = repo_root / "test_sessions"
        if not test_sessions_dir.exists():
            self.skipTest("test_sessions not found")
        pairs = []
        for jsonl_file in sorted(test_sessions_dir.glob("*.jsonl")):
            pairs.extend(extract_from_jsonl_file(str(jsonl_file)))
        self.assertGreater(len(pairs), 0, "Should extract at least one pair from test_sessions")
        # Every extracted pair must carry the full schema with real content.
        for p in pairs:
            self.assertIn("terse", p)
            self.assertIn("rich", p)
            self.assertIn("source", p)
            self.assertIn("model", p)
            # Verify content exists
            self.assertGreater(len(p["terse"]), 0)
            self.assertGreater(len(p["rich"]), 0)


if __name__ == "__main__":
    unittest.main()