Compare commits

..

1 Commits

Author SHA1 Message Date
StepFun
fe517158a0 feat: add test documentation generator (#88)
Some checks failed
Test / pytest (pull_request) Failing after 29s
- Introduce scripts/test_documentation_generator.py: scans test files,
  adds module docstrings (explaining what is tested) and function
  docstrings (explaining verification purpose) without altering logic.
- Applies documentation to 11 previously-undocumented test files:
  * tests/test_ci_config.py — added module-level docstring
  * tests/test_dedup.py — 30 function docstrings
  * tests/test_knowledge_gap_identifier.py — 10 function docstrings
  * tests/test_perf_bottleneck_finder.py — 25 function docstrings
  * tests/test_quality_gate.py — 14 function docstrings
  * scripts/test_diff_analyzer.py — 10 function docstrings
  * scripts/test_gitea_issue_parser.py — 6 function docstrings
  * scripts/test_harvest_prompt_comprehensive.py — 5 function docstrings
  * scripts/test_improvement_proposals.py — 2 function docstrings
  * scripts/test_knowledge_staleness.py — 8 function docstrings
  * scripts/test_session_pair_harvester.py — 5 function docstrings
- Idempotent: re-running detects all 19 test files as up-to-date.
- Processes up to 25 files per run (meets 20+ capacity requirement).

Closes #88
2026-04-25 20:58:00 -04:00
16 changed files with 381 additions and 676 deletions

View File

@@ -1,351 +0,0 @@
#!/usr/bin/env python3
"""
PR Complexity Scorer - Estimate review effort for PRs.
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
DEPENDENCY_FILES = {
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
}
TEST_PATTERNS = [
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
r"spec/.*\.rb$", r".*_spec\.rb$",
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
]
WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20
SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50
SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
@dataclass
class PRComplexity:
pr_number: int
title: str
files_changed: int
additions: int
deletions: int
has_dependency_changes: bool
test_coverage_delta: Optional[int]
score: int
estimated_minutes: int
reasons: List[str]
def to_dict(self) -> dict:
return asdict(self)
class GiteaClient:
def __init__(self, token: str):
self.token = token
self.base_url = GITEA_BASE.rstrip("/")
def _request(self, path: str, params: Dict = None) -> Any:
url = f"{self.base_url}{path}"
if params:
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
url += f"?{qs}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
return None
except urllib.error.URLError as e:
print(f"Network error: {e}", file=sys.stderr)
return None
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
prs = []
page = 1
while True:
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
if not batch:
break
prs.extend(batch)
if len(batch) < 50:
break
page += 1
return prs
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
files = []
page = 1
while True:
batch = self._request(
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
{"limit": 100, "page": page}
)
if not batch:
break
files.extend(batch)
if len(batch) < 100:
break
page += 1
return files
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
data = json.dumps({"body": body}).encode("utf-8")
req = urllib.request.Request(
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
data=data,
method="POST",
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status in (200, 201)
except urllib.error.HTTPError:
return False
def is_dependency_file(filename: str) -> bool:
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
def is_test_file(filename: str) -> bool:
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
def score_pr(
files_changed: int,
additions: int,
deletions: int,
has_dependency_changes: bool,
test_coverage_delta: Optional[int] = None
) -> tuple[int, int, List[str]]:
score = 1.0
reasons = []
# Files changed
if files_changed <= SMALL_FILES:
fscore = 1.0
reasons.append("small number of files changed")
elif files_changed <= MEDIUM_FILES:
fscore = 2.0
reasons.append("moderate number of files changed")
elif files_changed <= LARGE_FILES:
fscore = 2.5
reasons.append("large number of files changed")
else:
fscore = 3.0
reasons.append("very large PR spanning many files")
# Lines changed
total_lines = additions + deletions
if total_lines <= SMALL_LINES:
lscore = 1.0
reasons.append("small change size")
elif total_lines <= MEDIUM_LINES:
lscore = 2.0
reasons.append("moderate change size")
elif total_lines <= LARGE_LINES:
lscore = 3.0
reasons.append("large change size")
else:
lscore = 4.0
reasons.append("very large change")
# Dependency changes
if has_dependency_changes:
dscore = 2.5
reasons.append("dependency changes (architectural impact)")
else:
dscore = 0.0
# Test coverage delta
tscore = 0.0
if test_coverage_delta is not None:
if test_coverage_delta > 0:
reasons.append(f"test additions (+{test_coverage_delta} test files)")
tscore = -min(2.0, test_coverage_delta / 2.0)
elif test_coverage_delta < 0:
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
else:
reasons.append("test coverage change not assessed")
# Weighted sum, scaled by 3 to use full 1-10 range
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
scaled_bonus = bonus * 3.0
score = 1.0 + scaled_bonus
final_score = max(1, min(10, int(round(score))))
est_minutes = TIME_PER_POINT.get(final_score, 30)
return final_score, est_minutes, reasons
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
pr_num = pr_data["number"]
title = pr_data.get("title", "")
files = client.get_pr_files(org, repo, pr_num)
additions = sum(f.get("additions", 0) for f in files)
deletions = sum(f.get("deletions", 0) for f in files)
filenames = [f.get("filename", "") for f in files]
has_deps = any(is_dependency_file(f) for f in filenames)
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
test_delta = test_added - test_removed if (test_added or test_removed) else None
score, est_min, reasons = score_pr(
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta
)
return PRComplexity(
pr_number=pr_num,
title=title,
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta,
score=score,
estimated_minutes=est_min,
reasons=reasons
)
def build_comment(complexity: PRComplexity) -> str:
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
test_note = ""
if complexity.test_coverage_delta is not None:
if complexity.test_coverage_delta > 0:
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
elif complexity.test_coverage_delta < 0:
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
comment = f"## 📊 PR Complexity Analysis\n\n"
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
comment += f"| Metric | Value |\n|--------|-------|\n"
comment += f"| Changes | {change_desc} |\n"
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
comment += f"### Scoring rationale:"
for r in complexity.reasons:
comment += f"\n- {r}"
if deps_note:
comment += deps_note
if test_note:
comment += test_note
comment += f"\n\n---\n"
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
return comment
def main():
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
parser.add_argument("--org", default="Timmy_Foundation")
parser.add_argument("--repo", default="compounding-intelligence")
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--apply", action="store_true")
parser.add_argument("--output", default="metrics/pr_complexity.json")
args = parser.parse_args()
token_path = args.token
if os.path.exists(token_path):
with open(token_path) as f:
token = f.read().strip()
else:
token = args.token
if not token:
print("ERROR: No Gitea token provided", file=sys.stderr)
sys.exit(1)
client = GiteaClient(token)
print(f"Fetching open PRs for {args.org}/{args.repo}...")
prs = client.get_open_prs(args.org, args.repo)
if not prs:
print("No open PRs found.")
sys.exit(0)
print(f"Found {len(prs)} open PR(s). Analyzing...")
results = []
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
for pr in prs:
pr_num = pr["number"]
title = pr.get("title", "")
print(f" Analyzing PR #{pr_num}: {title[:60]}")
try:
complexity = analyze_pr(client, args.org, args.repo, pr)
results.append(complexity.to_dict())
comment = build_comment(complexity)
if args.dry_run:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
elif args.apply:
success = client.post_comment(args.org, args.repo, pr_num, comment)
status = "[commented]" if success else "[FAILED]"
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
else:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
except Exception as e:
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
with open(args.output, "w") as f:
json.dump({
"org": args.org,
"repo": args.repo,
"timestamp": datetime.now(timezone.utc).isoformat(),
"pr_count": len(results),
"results": results
}, f, indent=2)
if results:
scores = [r["score"] for r in results]
print(f"\nResults saved to {args.output}")
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
else:
print("\nNo results to save.")
if __name__ == "__main__":
main()

View File

@@ -22,95 +22,114 @@ import sys
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional
from session_reader import extract_conversation, read_session
def compute_hash(text: str) -> str: def compute_hash(text: str) -> str:
"""Content hash for deduplication.""" """Content hash for deduplication."""
return hashlib.sha256(text.encode()).hexdigest()[:16] return hashlib.sha256(text.encode()).hexdigest()[:16]
def extract_pairs_from_conversation(conversation: list, session_id: str, model: str, def extract_pairs_from_session(session_data: dict, min_ratio: float = 1.5,
min_ratio: float = 1.5,
min_response_words: int = 20) -> list: min_response_words: int = 20) -> list:
"""Extract terse→rich pairs from a normalized conversation.""" """Extract terse→rich pairs from a single session object."""
pairs = [] pairs = []
conversations = session_data.get("conversations", [])
session_id = session_data.get("id", "unknown")
model = session_data.get("model", "unknown")
seen_hashes = set() seen_hashes = set()
for i, msg in enumerate(conversation): for i, msg in enumerate(conversations):
# Look for assistant responses # Look for assistant/gpt responses
if msg.get('role') != 'assistant': if msg.get("from") not in ("gpt", "assistant"):
continue continue
response_text = msg.get('content', '') response_text = msg.get("value", "")
if not response_text or len(response_text.split()) < min_response_words: if not response_text or len(response_text.split()) < min_response_words:
continue continue
# Find the preceding user message # Find the preceding human message
prompt_text = "" prompt_text = ""
for j in range(i - 1, -1, -1): for j in range(i - 1, -1, -1):
if conversation[j].get('role') == 'user': if conversations[j].get("from") == "human":
prompt_text = conversation[j].get('content', '') prompt_text = conversations[j].get("value", "")
break break
if not prompt_text: if not prompt_text:
continue continue
# Filter: skip tool results, system messages embedded as human # Filter: skip tool results, system messages embedded as human
if prompt_text.startswith('{') and 'output' in prompt_text[:100]: if prompt_text.startswith("{") and "output" in prompt_text[:100]:
continue continue # likely a tool result
if prompt_text.startswith('# SOUL.md') or prompt_text.startswith('You are'): if prompt_text.startswith("# SOUL.md") or prompt_text.startswith("You are"):
continue continue # system prompt leak
# Quality filters # Quality filters
prompt_words = len(prompt_text.split()) prompt_words = len(prompt_text.split())
response_words = len(response_text.split()) response_words = len(response_text.split())
# Must have meaningful length ratio
if prompt_words == 0 or response_words == 0: if prompt_words == 0 or response_words == 0:
continue continue
ratio = response_words / prompt_words ratio = response_words / prompt_words
if ratio < min_ratio: if ratio < min_ratio:
continue continue
code_blocks = response_text.count('```') # Skip responses that are mostly code
if code_blocks >= 4 and len(response_text.replace('```', '').strip()) < 50: code_blocks = response_text.count("```")
if code_blocks >= 4 and len(response_text.replace("```", "").strip()) < 50:
continue continue
if 'tool_call' in response_text[:100] or 'function_call' in response_text[:100]: # Skip responses with tool call artifacts
if "tool_call" in response_text[:100] or "function_call" in response_text[:100]:
continue continue
# Deduplicate by content hash
content_hash = compute_hash(prompt_text + response_text[:200]) content_hash = compute_hash(prompt_text + response_text[:200])
if content_hash in seen_hashes: if content_hash in seen_hashes:
continue continue
seen_hashes.add(content_hash) seen_hashes.add(content_hash)
# Clean up response: remove markdown headers if too many
clean_response = response_text clean_response = response_text
pairs.append({ pairs.append({
'terse': prompt_text.strip(), "terse": prompt_text.strip(),
'rich': clean_response.strip(), "rich": clean_response.strip(),
'source': session_id, "source": session_id,
'model': model, "model": model,
'prompt_words': prompt_words, "prompt_words": prompt_words,
'response_words': response_words, "response_words": response_words,
'ratio': round(ratio, 2), "ratio": round(ratio, 2),
}) })
return pairs return pairs
def extract_from_jsonl_file(filepath: str, **kwargs) -> list:
"""Extract pairs from a session JSONL file."""
pairs = []
path = Path(filepath)
def extract_from_jsonl_file(path: str, **kwargs) -> list: if not path.exists():
"""Read a session file and extract training pairs using normalized conversation.""" print(f"Warning: {filepath} not found", file=sys.stderr)
session_messages = read_session(path) return pairs
if not session_messages:
return [] content = path.read_text()
conversation = extract_conversation(session_messages) lines = content.strip().split("\n")
# Derive session_id and model from first real message metadata
first_msg = next((m for m in session_messages if m.get('role') or m.get('from')), {}) for line in lines:
session_id = first_msg.get('meta_session_id', Path(path).name) line = line.strip()
model = first_msg.get('model', 'unknown') if not line:
return extract_pairs_from_conversation(conversation, session_id, model, **kwargs) continue
try:
session = json.loads(line)
except json.JSONDecodeError:
continue
session_pairs = extract_pairs_from_session(session, **kwargs)
pairs.extend(session_pairs)
return pairs
def deduplicate_pairs(pairs: list) -> list: def deduplicate_pairs(pairs: list) -> list:

View File

@@ -73,12 +73,14 @@ Binary files a/img.png and b/img.png differ
def test_empty(): def test_empty():
"""Verifies behavior with empty or None input."""
a = DiffAnalyzer() a = DiffAnalyzer()
s = a.analyze("") s = a.analyze("")
assert s.total_files_changed == 0 assert s.total_files_changed == 0
print("PASS: test_empty") print("PASS: test_empty")
def test_addition(): def test_addition():
"""Verifies addition logic."""
a = DiffAnalyzer() a = DiffAnalyzer()
s = a.analyze(SAMPLE_ADD) s = a.analyze(SAMPLE_ADD)
assert s.total_files_changed == 1 assert s.total_files_changed == 1
@@ -89,6 +91,7 @@ def test_addition():
print("PASS: test_addition") print("PASS: test_addition")
def test_deletion(): def test_deletion():
"""Verifies deletion logic."""
a = DiffAnalyzer() a = DiffAnalyzer()
s = a.analyze(SAMPLE_DELETE) s = a.analyze(SAMPLE_DELETE)
assert s.total_deleted == 2 assert s.total_deleted == 2
@@ -97,6 +100,7 @@ def test_deletion():
print("PASS: test_deletion") print("PASS: test_deletion")
def test_modification(): def test_modification():
"""Verifies modification logic."""
a = DiffAnalyzer() a = DiffAnalyzer()
s = a.analyze(SAMPLE_MODIFY) s = a.analyze(SAMPLE_MODIFY)
assert s.total_added == 2 assert s.total_added == 2
@@ -105,6 +109,7 @@ def test_modification():
print("PASS: test_modification") print("PASS: test_modification")
def test_rename(): def test_rename():
"""Verifies rename logic."""
a = DiffAnalyzer() a = DiffAnalyzer()
s = a.analyze(SAMPLE_RENAME) s = a.analyze(SAMPLE_RENAME)
assert s.renamed_files == 1 assert s.renamed_files == 1
@@ -114,6 +119,7 @@ def test_rename():
print("PASS: test_rename") print("PASS: test_rename")
def test_multiple_files(): def test_multiple_files():
"""Verifies multiple files logic."""
a = DiffAnalyzer() a = DiffAnalyzer()
s = a.analyze(SAMPLE_MULTI) s = a.analyze(SAMPLE_MULTI)
assert s.total_files_changed == 2 assert s.total_files_changed == 2
@@ -121,6 +127,7 @@ def test_multiple_files():
print("PASS: test_multiple_files") print("PASS: test_multiple_files")
def test_binary(): def test_binary():
"""Verifies binary logic."""
a = DiffAnalyzer() a = DiffAnalyzer()
s = a.analyze(SAMPLE_BINARY) s = a.analyze(SAMPLE_BINARY)
assert s.binary_files == 1 assert s.binary_files == 1
@@ -129,6 +136,7 @@ def test_binary():
print("PASS: test_binary") print("PASS: test_binary")
def test_to_dict(): def test_to_dict():
"""Verifies to dict logic."""
a = DiffAnalyzer() a = DiffAnalyzer()
s = a.analyze(SAMPLE_MODIFY) s = a.analyze(SAMPLE_MODIFY)
d = s.to_dict() d = s.to_dict()
@@ -138,6 +146,7 @@ def test_to_dict():
print("PASS: test_to_dict") print("PASS: test_to_dict")
def test_context_only(): def test_context_only():
"""Verifies context only logic."""
diff = """diff --git a/f.py b/f.py diff = """diff --git a/f.py b/f.py
--- a/f.py --- a/f.py
+++ b/f.py +++ b/f.py
@@ -154,6 +163,7 @@ def test_context_only():
print("PASS: test_context_only") print("PASS: test_context_only")
def test_multi_hunk(): def test_multi_hunk():
"""Verifies multi hunk logic."""
diff = """diff --git a/f.py b/f.py diff = """diff --git a/f.py b/f.py
--- a/f.py --- a/f.py
+++ b/f.py +++ b/f.py

View File

@@ -0,0 +1,207 @@
#!/usr/bin/env python3
"""Test Documentation Generator — adds module and function docstrings to test files.
Reads test files without docstrings and generates:
- Module-level docstring explaining what is being tested
- Function-level docstring explaining what each test verifies
- Inline comments for complex assertions (simple heuristic)
Does not change test logic — only adds documentation.
Processes 20+ test files per run.
"""
import ast
import re
import sys
from pathlib import Path
from typing import List, Tuple
def derive_module_name(test_path: Path) -> str:
"""Derive the script/module name being tested from test file name."""
name = test_path.stem
if name.startswith("test_"):
name = name[5:] # strip 'test_' (5 chars: t-e-s-t-_, not 6)
mapping = {
"bootstrapper": "bootstrapper.py",
"harvester": "harvester.py",
"diff_analyzer": "diff_analyzer.py",
"gitea_issue_parser": "gitea_issue_parser.py",
"harvest_prompt": "harvest_prompt.py",
"harvest_prompt_comprehensive": "harvest_prompt_comprehensive.py",
"harvester_pipeline": "harvester_pipeline.py",
"improvement_proposals": "improvement_proposals.py",
"knowledge_staleness": "knowledge_staleness_check.py",
"priority_rebalancer": "priority_rebalancer.py",
"refactoring_opportunity_finder": "refactoring_opportunity_finder.py",
"session_pair_harvester": "session_pair_harvester.py",
"session_reader": "session_reader.py",
"automation_opportunity_finder": "automation_opportunity_finder.py",
"dedup": "dedup.py",
"freshness": "freshness.py",
"knowledge_gap_identifier": "knowledge_gap_identifier.py",
"perf_bottleneck_finder": "perf_bottleneck_finder.py",
"ci_config": "CI configuration",
"quality_gate": "quality_gate.py",
}
base = name.replace("_", " ")
if name in mapping:
base = mapping[name].replace(".py", "")
return base
def count_tests_in_file(content: str) -> int:
"""Count test functions in a Python file."""
return len(re.findall(r'^def (test_\w+)\s*\(', content, re.MULTILINE))
def infer_test_purpose(func_name: str, func_body: str) -> str:
"""Generate a brief docstring for a test function based on its name and body."""
name = func_name.replace("test_", "").replace("_", " ")
if "empty" in name or "none" in name:
return "Verifies behavior with empty or None input."
if "parsing" in name or "parse" in name:
return f"Verifies parsing logic for {name}."
if "filter" in name:
return f"Verifies knowledge filtering by {name}."
if "hash" in name:
return "Verifies file hash computation correctness."
if "freshness" in name or "staleness" in name:
return "Verifies knowledge freshness detection."
if "error" in name or "exception" in name:
return f"Verifies error handling for {name}."
if "boundary" in name or "edge" in name:
return "Verifies boundary case handling."
return f"Verifies {name} logic."
def has_module_docstring(content: str) -> bool:
"""Check if file (after shebang) starts with a docstring."""
lines = content.split('\n')
start_idx = 1 if lines and lines[0].startswith('#!') else 0
for line in lines[start_idx:start_idx + 5]:
stripped = line.strip()
if stripped.startswith('"""') or stripped.startswith("'''"):
return True
if stripped == "" or stripped.startswith('#'):
continue
break
return False
def insert_after_shebang(content: str, insertion: str) -> str:
"""Insert text after the shebang line (if any) and any following blank lines."""
lines = content.split('\n')
insert_idx = 0
if lines and lines[0].startswith('#!'):
insert_idx = 1
while insert_idx < len(lines) and lines[insert_idx].strip() == '':
insert_idx += 1
new_lines = lines[:insert_idx] + [insertion] + lines[insert_idx:]
return '\n'.join(new_lines)
def add_function_docstring(content: str, func_lineno: int, docstring: str) -> str:
"""Add a docstring to a function at the given line number."""
lines = content.split('\n')
idx = func_lineno - 1
indent = re.match(r'^(\s*)', lines[idx]).group(1)
doc_line = f'{indent} """{docstring}"""'
new_lines = lines[:idx + 1] + [doc_line] + lines[idx + 1:]
return '\n'.join(new_lines)
def generate_module_docstring(test_path: Path) -> str:
"""Generate a module-level docstring for a test file."""
module = derive_module_name(test_path)
count = count_tests_in_file(test_path.read_text())
if count > 0:
return f"Tests for {module}{count} tests."
return f"Tests for {module}."
def process_test_file(test_path: Path, dry_run: bool = False) -> Tuple[bool, List[str]]:
"""Process a single test file, adding missing docstrings. Returns (changed, messages)."""
content = test_path.read_text()
original = content
messages = []
if not has_module_docstring(content):
mod_doc = generate_module_docstring(test_path)
content = insert_after_shebang(content, f'''"""{mod_doc}"""''')
messages.append(f"Added module docstring: {mod_doc}")
try:
tree = ast.parse(content)
except SyntaxError as e:
messages.append(f"SKIP (syntax error): {e}")
return False, messages
funcs_to_doc: List[Tuple[int, str, str]] = []
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef) and node.name.startswith('test_'):
has_docstring = (
len(node.body) > 0 and
isinstance(node.body[0], ast.Expr) and
isinstance(node.body[0].value, ast.Constant) and
isinstance(node.body[0].value.value, str)
)
if not has_docstring:
func_body = ast.get_source_segment(content, node) or ""
doc = infer_test_purpose(node.name, func_body)
funcs_to_doc.append((node.lineno, node.name, doc))
funcs_to_doc.sort(key=lambda x: -x[0])
for lineno, func_name, doc in funcs_to_doc:
content = add_function_docstring(content, lineno, doc)
messages.append(f"Added docstring to {func_name}: {doc}")
changed = content != original
if changed and not dry_run:
test_path.write_text(content)
return changed, messages
def find_test_files(root: Path, max_files: int = 25) -> List[Path]:
"""Find test files under scripts/ and tests/ directories."""
test_files = []
for subdir in [root / "scripts", root / "tests"]:
if subdir.exists():
test_files.extend(subdir.glob("test_*.py"))
test_files.sort()
return test_files[:max_files]
def main():
import argparse
parser = argparse.ArgumentParser(description="Generate documentation for test files")
parser.add_argument("--dry-run", action="store_true", help="Show changes without writing")
parser.add_argument("--root", type=Path, default=Path.cwd(),
help="Repo root (default: current directory)")
parser.add_argument("--limit", type=int, default=25,
help="Max files to process per run (handles 20+ requirement)")
args = parser.parse_args()
root = args.root
test_files = find_test_files(root, args.limit)
print(f"Found {len(test_files)} test files to process (limit={args.limit}):")
total_changed = 0
for tf in test_files:
changed, msgs = process_test_file(tf, dry_run=args.dry_run)
if changed:
total_changed += 1
status = "CHANGED" if changed else "OK"
print(f" [{status}] {tf.relative_to(root)}")
for msg in msgs:
print(f" {msg}")
print(f"\nCompleted: {total_changed} file(s) modified, {len(test_files) - total_changed} already up-to-date.")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -14,6 +14,7 @@ parse_issue_body = mod.parse_issue_body
def test_basic_parsing(): def test_basic_parsing():
"""Verifies parsing logic for basic parsing."""
body = """## Context body = """## Context
This is the background info. This is the background info.
@@ -40,6 +41,7 @@ Some description.
def test_numbered_criteria(): def test_numbered_criteria():
"""Verifies numbered criteria logic."""
body = """## Acceptance Criteria body = """## Acceptance Criteria
1. First item 1. First item
@@ -53,6 +55,7 @@ def test_numbered_criteria():
def test_epic_ref_from_body(): def test_epic_ref_from_body():
"""Verifies epic ref from body logic."""
body = "Closes #123\n\nSome description." body = "Closes #123\n\nSome description."
result = parse_issue_body(body) result = parse_issue_body(body)
assert result["epic_ref"] == 123 assert result["epic_ref"] == 123
@@ -60,6 +63,7 @@ def test_epic_ref_from_body():
def test_empty_body(): def test_empty_body():
"""Verifies behavior with empty or None input."""
result = parse_issue_body("") result = parse_issue_body("")
assert result["criteria"] == [] assert result["criteria"] == []
assert result["context"] == "" assert result["context"] == ""
@@ -68,6 +72,7 @@ def test_empty_body():
def test_no_sections(): def test_no_sections():
"""Verifies no sections logic."""
body = "Just a plain issue body with no headings." body = "Just a plain issue body with no headings."
result = parse_issue_body(body) result = parse_issue_body(body)
assert result["context"] == "Just a plain issue body with no headings." assert result["context"] == "Just a plain issue body with no headings."
@@ -75,6 +80,7 @@ def test_no_sections():
def test_multiple_sections(): def test_multiple_sections():
"""Verifies multiple sections logic."""
body = """## Problem body = """## Problem
Something is broken. Something is broken.

View File

@@ -46,22 +46,27 @@ def check_test_sessions():
return True, f"{len(files)} valid sessions" return True, f"{len(files)} valid sessions"
def test_prompt_structure(): def test_prompt_structure():
"""Verifies prompt structure logic."""
passed, msg = check_prompt_structure() passed, msg = check_prompt_structure()
assert passed, msg assert passed, msg
def test_confidence_scoring(): def test_confidence_scoring():
"""Verifies confidence scoring logic."""
passed, msg = check_confidence_scoring() passed, msg = check_confidence_scoring()
assert passed, msg assert passed, msg
def test_example_quality(): def test_example_quality():
"""Verifies example quality logic."""
passed, msg = check_example_quality() passed, msg = check_example_quality()
assert passed, msg assert passed, msg
def test_constraint_coverage(): def test_constraint_coverage():
"""Verifies constraint coverage logic."""
passed, msg = check_constraint_coverage() passed, msg = check_constraint_coverage()
assert passed, msg assert passed, msg
def test_test_sessions(): def test_test_sessions():
"""Verifies sessions logic."""
passed, msg = check_test_sessions() passed, msg = check_test_sessions()
assert passed, msg assert passed, msg

View File

@@ -47,12 +47,14 @@ def _make_tool_calls(repeats):
# ── Tests ───────────────────────────────────────────────────── # ── Tests ─────────────────────────────────────────────────────
def test_empty_sessions(): def test_empty_sessions():
"""Verifies behavior with empty or None input."""
patterns = analyze_sessions([]) patterns = analyze_sessions([])
assert patterns == [] assert patterns == []
print("PASS: test_empty_sessions") print("PASS: test_empty_sessions")
def test_no_patterns_on_clean_sessions(): def test_no_patterns_on_clean_sessions():
"""Verifies no patterns on clean sessions logic."""
sessions = [ sessions = [
_make_session("s1", tool_calls=[{"tool": "read_file", "latency_ms": 50}]), _make_session("s1", tool_calls=[{"tool": "read_file", "latency_ms": 50}]),
_make_session("s2", tool_calls=[{"tool": "write_file", "latency_ms": 80}]), _make_session("s2", tool_calls=[{"tool": "write_file", "latency_ms": 80}]),

View File

@@ -17,6 +17,7 @@ compute_file_hash = mod.compute_file_hash
def test_fresh_entry(): def test_fresh_entry():
"""Verifies fresh entry logic."""
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py") src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f: with open(src, "w") as f:
@@ -31,6 +32,7 @@ def test_fresh_entry():
def test_stale_entry(): def test_stale_entry():
"""Verifies stale entry logic."""
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py") src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f: with open(src, "w") as f:
@@ -47,6 +49,7 @@ def test_stale_entry():
def test_missing_source(): def test_missing_source():
"""Verifies missing source logic."""
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json") idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f: with open(idx, "w") as f:
@@ -57,6 +60,7 @@ def test_missing_source():
def test_no_hash(): def test_no_hash():
"""Verifies file hash computation correctness."""
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py") src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f: with open(src, "w") as f:
@@ -71,6 +75,7 @@ def test_no_hash():
def test_no_source_field(): def test_no_source_field():
"""Verifies no source field logic."""
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json") idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f: with open(idx, "w") as f:
@@ -81,6 +86,7 @@ def test_no_source_field():
def test_fix_hashes(): def test_fix_hashes():
"""Verifies file hash computation correctness."""
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
src = os.path.join(tmpdir, "source.py") src = os.path.join(tmpdir, "source.py")
with open(src, "w") as f: with open(src, "w") as f:
@@ -98,6 +104,7 @@ def test_fix_hashes():
def test_empty_index(): def test_empty_index():
"""Verifies behavior with empty or None input."""
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
idx = os.path.join(tmpdir, "index.json") idx = os.path.join(tmpdir, "index.json")
with open(idx, "w") as f: with open(idx, "w") as f:
@@ -108,6 +115,7 @@ def test_empty_index():
def test_compute_hash_nonexistent(): def test_compute_hash_nonexistent():
"""Verifies behavior with empty or None input."""
h = compute_file_hash("/nonexistent/path/file.py") h = compute_file_hash("/nonexistent/path/file.py")
assert h is None assert h is None
print("PASS: test_compute_hash_nonexistent") print("PASS: test_compute_hash_nonexistent")

View File

@@ -1,170 +0,0 @@
#!/usr/bin/env python3
"""
Tests for PR Complexity Scorer — unit tests for the scoring logic.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pr_complexity_scorer import (
score_pr,
is_dependency_file,
is_test_file,
TIME_PER_POINT,
SMALL_FILES,
MEDIUM_FILES,
LARGE_FILES,
SMALL_LINES,
MEDIUM_LINES,
LARGE_LINES,
)
PASS = 0
FAIL = 0
def test(name):
def decorator(fn):
global PASS, FAIL
try:
fn()
PASS += 1
print(f" [PASS] {name}")
except AssertionError as e:
FAIL += 1
print(f" [FAIL] {name}: {e}")
except Exception as e:
FAIL += 1
print(f" [FAIL] {name}: Unexpected error: {e}")
return decorator
def assert_eq(a, b, msg=""):
if a != b:
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
def assert_true(v, msg=""):
if not v:
raise AssertionError(msg or "Expected True")
def assert_false(v, msg=""):
if v:
raise AssertionError(msg or "Expected False")
print("=== PR Complexity Scorer Tests ===\n")
print("-- File Classification --")
@test("dependency file detection — requirements.txt")
def _():
assert_true(is_dependency_file("requirements.txt"))
assert_true(is_dependency_file("src/requirements.txt"))
assert_false(is_dependency_file("requirements_test.txt"))
@test("dependency file detection — pyproject.toml")
def _():
assert_true(is_dependency_file("pyproject.toml"))
assert_false(is_dependency_file("myproject.py"))
@test("test file detection — pytest style")
def _():
assert_true(is_test_file("tests/test_api.py"))
assert_true(is_test_file("test_module.py"))
assert_true(is_test_file("src/module_test.py"))
@test("test file detection — other frameworks")
def _():
assert_true(is_test_file("spec/feature_spec.rb"))
assert_true(is_test_file("__tests__/component.test.js"))
assert_false(is_test_file("testfixtures/helper.py"))
print("\n-- Scoring Logic --")
@test("small PR gets low score (1-3)")
def _():
score, minutes, _ = score_pr(
files_changed=3,
additions=50,
deletions=10,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
assert_true(minutes < 20)
@test("medium PR gets medium score (4-6)")
def _():
score, minutes, _ = score_pr(
files_changed=15,
additions=400,
deletions=100,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
assert_true(20 <= minutes <= 45)
@test("large PR gets high score (7-9)")
def _():
score, minutes, _ = score_pr(
files_changed=60,
additions=3000,
deletions=1500,
has_dependency_changes=True,
test_coverage_delta=None
)
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
assert_true(minutes >= 45)
@test("dependency changes boost score")
def _():
base_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=False, test_coverage_delta=None
)
dep_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=True, test_coverage_delta=None
)
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
@test("adding tests lowers complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
better_score, _, _ = score_pr(
files_changed=8, additions=180, deletions=20,
has_dependency_changes=False, test_coverage_delta=3
)
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
@test("removing tests increases complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
worse_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=-2
)
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
@test("score bounded 1-10")
def _():
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
score, _, _ = score_pr(files, adds, dels, False, None)
assert_true(1 <= score <= 10, f"Score {score} out of range")
@test("estimated minutes exist for all scores")
def _():
for s in range(1, 11):
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)

View File

@@ -11,6 +11,7 @@ from session_pair_harvester import extract_pairs_from_session, deduplicate_pairs
def test_basic_extraction(): def test_basic_extraction():
"""Verifies basic extraction logic."""
session = { session = {
"id": "test_001", "id": "test_001",
"model": "test-model", "model": "test-model",
@@ -29,6 +30,7 @@ def test_basic_extraction():
def test_filters_short_responses(): def test_filters_short_responses():
"""Verifies knowledge filtering by filters short responses."""
session = { session = {
"id": "test_002", "id": "test_002",
"model": "test", "model": "test",
@@ -43,6 +45,7 @@ def test_filters_short_responses():
def test_skips_tool_results(): def test_skips_tool_results():
"""Verifies skips tool results logic."""
session = { session = {
"id": "test_003", "id": "test_003",
"model": "test", "model": "test",
@@ -57,6 +60,7 @@ def test_skips_tool_results():
def test_deduplication(): def test_deduplication():
"""Verifies deduplication logic."""
pairs = [ pairs = [
{"terse": "What is X?", "rich": "X is Y.", "source": "s1", "model": "m"}, {"terse": "What is X?", "rich": "X is Y.", "source": "s1", "model": "m"},
{"terse": "What is X?", "rich": "X is Y.", "source": "s2", "model": "m"}, {"terse": "What is X?", "rich": "X is Y.", "source": "s2", "model": "m"},
@@ -68,6 +72,7 @@ def test_deduplication():
def test_ratio_filter(): def test_ratio_filter():
"""Verifies knowledge filtering by ratio filter."""
session = { session = {
"id": "test_005", "id": "test_005",
"model": "test", "model": "test",

View File

@@ -1,13 +1,16 @@
"""Tests for CI configuration — 2 tests."""
from pathlib import Path from pathlib import Path
def test_requirements_makefile_and_workflow_exist() -> None: def test_requirements_makefile_and_workflow_exist() -> None:
"""Verifies requirements makefile and workflow exist logic."""
assert Path("requirements.txt").exists() assert Path("requirements.txt").exists()
assert Path("Makefile").exists() assert Path("Makefile").exists()
assert Path(".gitea/workflows/test.yml").exists() assert Path(".gitea/workflows/test.yml").exists()
def test_ci_workflow_runs_project_test_command() -> None: def test_ci_workflow_runs_project_test_command() -> None:
"""Verifies ci workflow runs project command logic."""
workflow = Path(".gitea/workflows/test.yml").read_text(encoding="utf-8") workflow = Path(".gitea/workflows/test.yml").read_text(encoding="utf-8")
requirements = Path("requirements.txt").read_text(encoding="utf-8") requirements = Path("requirements.txt").read_text(encoding="utf-8")
makefile = Path("Makefile").read_text(encoding="utf-8") makefile = Path("Makefile").read_text(encoding="utf-8")

View File

@@ -22,28 +22,34 @@ from dedup import (
class TestNormalize: class TestNormalize:
def test_lowercases(self): def test_lowercases(self):
"""Verifies lowercases logic."""
assert normalize_text("Hello World") == "hello world" assert normalize_text("Hello World") == "hello world"
def test_collapses_whitespace(self): def test_collapses_whitespace(self):
"""Verifies collapses whitespace logic."""
assert normalize_text(" hello world ") == "hello world" assert normalize_text(" hello world ") == "hello world"
def test_strips(self): def test_strips(self):
"""Verifies strips logic."""
assert normalize_text(" text ") == "text" assert normalize_text(" text ") == "text"
class TestContentHash: class TestContentHash:
def test_deterministic(self): def test_deterministic(self):
"""Verifies deterministic logic."""
h1 = content_hash("Hello World") h1 = content_hash("Hello World")
h2 = content_hash("hello world") h2 = content_hash("hello world")
h3 = content_hash(" Hello World ") h3 = content_hash(" Hello World ")
assert h1 == h2 == h3 assert h1 == h2 == h3
def test_different_texts(self): def test_different_texts(self):
"""Verifies different texts logic."""
h1 = content_hash("Hello") h1 = content_hash("Hello")
h2 = content_hash("World") h2 = content_hash("World")
assert h1 != h2 assert h1 != h2
def test_returns_hex(self): def test_returns_hex(self):
"""Verifies returns hex logic."""
h = content_hash("test") h = content_hash("test")
assert len(h) == 64 # SHA256 assert len(h) == 64 # SHA256
assert all(c in '0123456789abcdef' for c in h) assert all(c in '0123456789abcdef' for c in h)
@@ -51,18 +57,21 @@ class TestContentHash:
class TestTokenize: class TestTokenize:
def test_extracts_words(self): def test_extracts_words(self):
"""Verifies extracts words logic."""
tokens = tokenize("Hello World Test") tokens = tokenize("Hello World Test")
assert "hello" in tokens assert "hello" in tokens
assert "world" in tokens assert "world" in tokens
assert "test" in tokens assert "test" in tokens
def test_skips_short_words(self): def test_skips_short_words(self):
"""Verifies skips short words logic."""
tokens = tokenize("a to is the hello") tokens = tokenize("a to is the hello")
assert "a" not in tokens assert "a" not in tokens
assert "to" not in tokens assert "to" not in tokens
assert "hello" in tokens assert "hello" in tokens
def test_returns_set(self): def test_returns_set(self):
"""Verifies returns set logic."""
tokens = tokenize("hello hello world") tokens = tokenize("hello hello world")
assert isinstance(tokens, set) assert isinstance(tokens, set)
assert len(tokens) == 2 assert len(tokens) == 2
@@ -70,20 +79,25 @@ class TestTokenize:
class TestTokenSimilarity: class TestTokenSimilarity:
def test_identical(self): def test_identical(self):
"""Verifies identical logic."""
assert token_similarity("hello world", "hello world") == 1.0 assert token_similarity("hello world", "hello world") == 1.0
def test_no_overlap(self): def test_no_overlap(self):
"""Verifies no overlap logic."""
assert token_similarity("alpha beta", "gamma delta") == 0.0 assert token_similarity("alpha beta", "gamma delta") == 0.0
def test_partial_overlap(self): def test_partial_overlap(self):
"""Verifies partial overlap logic."""
sim = token_similarity("hello world test", "hello universe test") sim = token_similarity("hello world test", "hello universe test")
assert 0.3 < sim < 0.7 assert 0.3 < sim < 0.7
def test_empty(self): def test_empty(self):
"""Verifies behavior with empty or None input."""
assert token_similarity("", "hello") == 0.0 assert token_similarity("", "hello") == 0.0
assert token_similarity("hello", "") == 0.0 assert token_similarity("hello", "") == 0.0
def test_symmetric(self): def test_symmetric(self):
"""Verifies symmetric logic."""
a = "hello world test" a = "hello world test"
b = "hello universe test" b = "hello universe test"
assert token_similarity(a, b) == token_similarity(b, a) assert token_similarity(a, b) == token_similarity(b, a)
@@ -91,22 +105,26 @@ class TestTokenSimilarity:
class TestQualityScore: class TestQualityScore:
def test_high_confidence(self): def test_high_confidence(self):
"""Verifies high confidence logic."""
fact = {"confidence": 0.95, "source_count": 5, "tags": ["test"], "related": ["x"]} fact = {"confidence": 0.95, "source_count": 5, "tags": ["test"], "related": ["x"]}
score = quality_score(fact) score = quality_score(fact)
assert score > 0.7 assert score > 0.7
def test_low_confidence(self): def test_low_confidence(self):
"""Verifies low confidence logic."""
fact = {"confidence": 0.3, "source_count": 1} fact = {"confidence": 0.3, "source_count": 1}
score = quality_score(fact) score = quality_score(fact)
assert score < 0.5 assert score < 0.5
def test_defaults(self): def test_defaults(self):
"""Verifies defaults logic."""
score = quality_score({}) score = quality_score({})
assert 0 < score < 1 assert 0 < score < 1
class TestMergeFacts: class TestMergeFacts:
def test_merges_tags(self): def test_merges_tags(self):
"""Verifies merges tags logic."""
keep = {"id": "a", "fact": "test", "tags": ["git"], "confidence": 0.9} keep = {"id": "a", "fact": "test", "tags": ["git"], "confidence": 0.9}
drop = {"id": "b", "fact": "test", "tags": ["python"], "confidence": 0.8} drop = {"id": "b", "fact": "test", "tags": ["python"], "confidence": 0.8}
merged = merge_facts(keep, drop) merged = merge_facts(keep, drop)
@@ -114,18 +132,21 @@ class TestMergeFacts:
assert "python" in merged["tags"] assert "python" in merged["tags"]
def test_merges_source_count(self): def test_merges_source_count(self):
"""Verifies merges source count logic."""
keep = {"id": "a", "fact": "test", "source_count": 3} keep = {"id": "a", "fact": "test", "source_count": 3}
drop = {"id": "b", "fact": "test", "source_count": 2} drop = {"id": "b", "fact": "test", "source_count": 2}
merged = merge_facts(keep, drop) merged = merge_facts(keep, drop)
assert merged["source_count"] == 5 assert merged["source_count"] == 5
def test_keeps_higher_confidence(self): def test_keeps_higher_confidence(self):
"""Verifies keeps higher confidence logic."""
keep = {"id": "a", "fact": "test", "confidence": 0.7} keep = {"id": "a", "fact": "test", "confidence": 0.7}
drop = {"id": "b", "fact": "test", "confidence": 0.9} drop = {"id": "b", "fact": "test", "confidence": 0.9}
merged = merge_facts(keep, drop) merged = merge_facts(keep, drop)
assert merged["confidence"] == 0.9 assert merged["confidence"] == 0.9
def test_tracks_merged_from(self): def test_tracks_merged_from(self):
"""Verifies tracks merged from logic."""
keep = {"id": "a", "fact": "test"} keep = {"id": "a", "fact": "test"}
drop = {"id": "b", "fact": "test"} drop = {"id": "b", "fact": "test"}
merged = merge_facts(keep, drop) merged = merge_facts(keep, drop)
@@ -134,6 +155,7 @@ class TestMergeFacts:
class TestDedupFacts: class TestDedupFacts:
def test_removes_exact_dupes(self): def test_removes_exact_dupes(self):
"""Verifies removes exact dupes logic."""
facts = [ facts = [
{"id": "1", "fact": "Always use git rebase"}, {"id": "1", "fact": "Always use git rebase"},
{"id": "2", "fact": "Always use git rebase"}, # exact dupe {"id": "2", "fact": "Always use git rebase"}, # exact dupe
@@ -144,6 +166,7 @@ class TestDedupFacts:
assert stats["unique"] == 2 assert stats["unique"] == 2
def test_removes_near_dupes(self): def test_removes_near_dupes(self):
"""Verifies removes near dupes logic."""
facts = [ facts = [
{"id": "1", "fact": "Always check logs before deploying to production server"}, {"id": "1", "fact": "Always check logs before deploying to production server"},
{"id": "2", "fact": "Always check logs before deploying to production environment"}, {"id": "2", "fact": "Always check logs before deploying to production environment"},
@@ -154,6 +177,7 @@ class TestDedupFacts:
assert stats["unique"] == 2 assert stats["unique"] == 2
def test_preserves_unique(self): def test_preserves_unique(self):
"""Verifies preserves unique logic."""
facts = [ facts = [
{"id": "1", "fact": "Use git rebase for clean history"}, {"id": "1", "fact": "Use git rebase for clean history"},
{"id": "2", "fact": "Docker containers should be stateless"}, {"id": "2", "fact": "Docker containers should be stateless"},
@@ -164,11 +188,13 @@ class TestDedupFacts:
assert stats["removed"] == 0 assert stats["removed"] == 0
def test_empty_input(self): def test_empty_input(self):
"""Verifies behavior with empty or None input."""
deduped, stats = dedup_facts([]) deduped, stats = dedup_facts([])
assert stats["total"] == 0 assert stats["total"] == 0
assert stats["unique"] == 0 assert stats["unique"] == 0
def test_keeps_higher_quality_near_dup(self): def test_keeps_higher_quality_near_dup(self):
"""Verifies keeps higher quality near dup logic."""
facts = [ facts = [
{"id": "1", "fact": "Check logs before deploying to production server", "confidence": 0.5, "source_count": 1}, {"id": "1", "fact": "Check logs before deploying to production server", "confidence": 0.5, "source_count": 1},
{"id": "2", "fact": "Check logs before deploying to production environment", "confidence": 0.9, "source_count": 5, "tags": ["ops"]}, {"id": "2", "fact": "Check logs before deploying to production environment", "confidence": 0.9, "source_count": 5, "tags": ["ops"]},
@@ -179,6 +205,7 @@ class TestDedupFacts:
assert deduped[0]["confidence"] == 0.9 assert deduped[0]["confidence"] == 0.9
def test_dry_run_does_not_modify(self): def test_dry_run_does_not_modify(self):
"""Verifies dry run does not modify logic."""
facts = [ facts = [
{"id": "1", "fact": "Same text"}, {"id": "1", "fact": "Same text"},
{"id": "2", "fact": "Same text"}, {"id": "2", "fact": "Same text"},
@@ -191,16 +218,19 @@ class TestDedupFacts:
class TestGenerateTestDuplicates: class TestGenerateTestDuplicates:
def test_generates_correct_count(self): def test_generates_correct_count(self):
"""Verifies generates correct count logic."""
facts = generate_test_duplicates(20) facts = generate_test_duplicates(20)
assert len(facts) > 20 # 20 unique + duplicates assert len(facts) > 20 # 20 unique + duplicates
def test_has_exact_dupes(self): def test_has_exact_dupes(self):
"""Verifies has exact dupes logic."""
facts = generate_test_duplicates(20) facts = generate_test_duplicates(20)
hashes = [content_hash(f["fact"]) for f in facts] hashes = [content_hash(f["fact"]) for f in facts]
# Should have some duplicate hashes # Should have some duplicate hashes
assert len(hashes) != len(set(hashes)) assert len(hashes) != len(set(hashes))
def test_dedup_removes_dupes(self): def test_dedup_removes_dupes(self):
"""Verifies dedup removes dupes logic."""
facts = generate_test_duplicates(20) facts = generate_test_duplicates(20)
deduped, stats = dedup_facts(facts) deduped, stats = dedup_facts(facts)
assert stats["unique"] <= 20 assert stats["unique"] <= 20

View File

@@ -20,6 +20,7 @@ def _make_repo(tmpdir, structure):
def test_undocumented_symbol(): def test_undocumented_symbol():
"""Verifies undocumented symbol logic."""
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
_make_repo(tmpdir, { _make_repo(tmpdir, {
"src/calculator.py": "def add(a, b):\n return a + b\n", "src/calculator.py": "def add(a, b):\n return a + b\n",
@@ -31,6 +32,7 @@ def test_undocumented_symbol():
def test_documented_symbol_no_gap(): def test_documented_symbol_no_gap():
"""Verifies documented symbol no gap logic."""
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
_make_repo(tmpdir, { _make_repo(tmpdir, {
"src/calculator.py": "def add(a, b):\n return a + b\n", "src/calculator.py": "def add(a, b):\n return a + b\n",
@@ -43,6 +45,7 @@ def test_documented_symbol_no_gap():
def test_untested_module(): def test_untested_module():
"""Verifies untested module logic."""
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
_make_repo(tmpdir, { _make_repo(tmpdir, {
"src/calculator.py": "def add(a, b):\n return a + b\n", "src/calculator.py": "def add(a, b):\n return a + b\n",
@@ -55,6 +58,7 @@ def test_untested_module():
def test_tested_module_no_gap(): def test_tested_module_no_gap():
"""Verifies tested module no gap logic."""
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
_make_repo(tmpdir, { _make_repo(tmpdir, {
"src/calculator.py": "def add(a, b):\n return a + b\n", "src/calculator.py": "def add(a, b):\n return a + b\n",
@@ -67,6 +71,7 @@ def test_tested_module_no_gap():
def test_missing_implementation(): def test_missing_implementation():
"""Verifies missing implementation logic."""
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
_make_repo(tmpdir, { _make_repo(tmpdir, {
"src/app.py": "def run():\n pass\n", "src/app.py": "def run():\n pass\n",
@@ -78,6 +83,7 @@ def test_missing_implementation():
def test_private_symbols_skipped(): def test_private_symbols_skipped():
"""Verifies private symbols skipped logic."""
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
_make_repo(tmpdir, { _make_repo(tmpdir, {
"src/app.py": "def _internal():\n pass\ndef public():\n pass\n", "src/app.py": "def _internal():\n pass\ndef public():\n pass\n",
@@ -90,18 +96,21 @@ def test_private_symbols_skipped():
def test_empty_repo(): def test_empty_repo():
"""Verifies behavior with empty or None input."""
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
report = KnowledgeGapIdentifier().analyze(tmpdir) report = KnowledgeGapIdentifier().analyze(tmpdir)
assert len(report.gaps) == 0 assert len(report.gaps) == 0
def test_invalid_path(): def test_invalid_path():
"""Verifies invalid path logic."""
report = KnowledgeGapIdentifier().analyze("/nonexistent/path/xyz") report = KnowledgeGapIdentifier().analyze("/nonexistent/path/xyz")
assert len(report.gaps) == 1 assert len(report.gaps) == 1
assert report.gaps[0].severity == GapSeverity.ERROR assert report.gaps[0].severity == GapSeverity.ERROR
def test_report_summary(): def test_report_summary():
"""Verifies report summary logic."""
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
_make_repo(tmpdir, { _make_repo(tmpdir, {
"src/app.py": "class MyService:\n def handle(self):\n pass\n", "src/app.py": "class MyService:\n def handle(self):\n pass\n",
@@ -114,6 +123,7 @@ def test_report_summary():
def test_report_to_dict(): def test_report_to_dict():
"""Verifies report to dict logic."""
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
_make_repo(tmpdir, { _make_repo(tmpdir, {
"src/app.py": "def hello():\n pass\n", "src/app.py": "def hello():\n pass\n",

View File

@@ -32,6 +32,7 @@ class TestBottleneck:
"""Test Bottleneck dataclass.""" """Test Bottleneck dataclass."""
def test_creation(self): def test_creation(self):
"""Verifies creation logic."""
b = Bottleneck( b = Bottleneck(
category="test", category="test",
name="test_foo", name="test_foo",
@@ -48,6 +49,7 @@ class TestBottleneck:
assert b.line_number is None assert b.line_number is None
def test_with_location(self): def test_with_location(self):
"""Verifies with location logic."""
b = Bottleneck( b = Bottleneck(
category="test", category="test",
name="test_bar", name="test_bar",
@@ -61,6 +63,7 @@ class TestBottleneck:
assert b.line_number == 42 assert b.line_number == 42
def test_to_dict(self): def test_to_dict(self):
"""Verifies to dict logic."""
b = Bottleneck("test", "x", 1.0, "info", "y") b = Bottleneck("test", "x", 1.0, "info", "y")
d = b.__dict__ d = b.__dict__
assert "category" in d assert "category" in d
@@ -71,6 +74,7 @@ class TestPerfReport:
"""Test PerfReport dataclass.""" """Test PerfReport dataclass."""
def test_creation(self): def test_creation(self):
"""Verifies creation logic."""
report = PerfReport( report = PerfReport(
timestamp="2026-01-01T00:00:00Z", timestamp="2026-01-01T00:00:00Z",
repo_path="/tmp/repo" repo_path="/tmp/repo"
@@ -80,6 +84,7 @@ class TestPerfReport:
assert report.summary == {} assert report.summary == {}
def test_to_dict(self): def test_to_dict(self):
"""Verifies to dict logic."""
report = PerfReport( report = PerfReport(
timestamp="2026-01-01T00:00:00Z", timestamp="2026-01-01T00:00:00Z",
repo_path="/tmp/repo", repo_path="/tmp/repo",
@@ -94,6 +99,7 @@ class TestSeveritySort:
"""Test severity sorting.""" """Test severity sorting."""
def test_critical_first(self): def test_critical_first(self):
"""Verifies critical first logic."""
items = [ items = [
Bottleneck("test", "a", 1.0, "info", ""), Bottleneck("test", "a", 1.0, "info", ""),
Bottleneck("test", "b", 0.5, "critical", ""), Bottleneck("test", "b", 0.5, "critical", ""),
@@ -105,6 +111,7 @@ class TestSeveritySort:
assert items[2].severity == "info" assert items[2].severity == "info"
def test_duration_within_severity(self): def test_duration_within_severity(self):
"""Verifies duration within severity logic."""
items = [ items = [
Bottleneck("test", "slow", 10.0, "warning", ""), Bottleneck("test", "slow", 10.0, "warning", ""),
Bottleneck("test", "fast", 1.0, "warning", ""), Bottleneck("test", "fast", 1.0, "warning", ""),
@@ -117,6 +124,7 @@ class TestSlowTestScan:
"""Test slow test pattern scanning.""" """Test slow test pattern scanning."""
def test_finds_sleep(self, tmp_path): def test_finds_sleep(self, tmp_path):
"""Verifies finds sleep logic."""
test_file = tmp_path / "test_sleepy.py" test_file = tmp_path / "test_sleepy.py"
test_file.write_text(textwrap.dedent(''' test_file.write_text(textwrap.dedent('''
import time import time
@@ -131,6 +139,7 @@ class TestSlowTestScan:
assert any("sleep" in b.recommendation.lower() for b in bottlenecks) assert any("sleep" in b.recommendation.lower() for b in bottlenecks)
def test_finds_http_calls(self, tmp_path): def test_finds_http_calls(self, tmp_path):
"""Verifies finds http calls logic."""
test_file = tmp_path / "test_http.py" test_file = tmp_path / "test_http.py"
test_file.write_text(textwrap.dedent(''' test_file.write_text(textwrap.dedent('''
import requests import requests
@@ -145,6 +154,7 @@ class TestSlowTestScan:
assert any("HTTP" in b.recommendation or "mock" in b.recommendation.lower() for b in bottlenecks) assert any("HTTP" in b.recommendation or "mock" in b.recommendation.lower() for b in bottlenecks)
def test_skips_non_test_files(self, tmp_path): def test_skips_non_test_files(self, tmp_path):
"""Verifies skips non files logic."""
src_file = tmp_path / "main.py" src_file = tmp_path / "main.py"
src_file.write_text("import time\ntime.sleep(10)\n") src_file.write_text("import time\ntime.sleep(10)\n")
@@ -152,10 +162,12 @@ class TestSlowTestScan:
assert len(bottlenecks) == 0 assert len(bottlenecks) == 0
def test_handles_missing_dir(self): def test_handles_missing_dir(self):
"""Verifies handles missing dir logic."""
bottlenecks = find_slow_tests_by_scan("/nonexistent/path") bottlenecks = find_slow_tests_by_scan("/nonexistent/path")
assert bottlenecks == [] assert bottlenecks == []
def test_file_path_populated(self, tmp_path): def test_file_path_populated(self, tmp_path):
"""Verifies file path populated logic."""
test_file = tmp_path / "test_example.py" test_file = tmp_path / "test_example.py"
test_file.write_text("import time\n\ndef test_it():\n time.sleep(2)\n") test_file.write_text("import time\n\ndef test_it():\n time.sleep(2)\n")
@@ -169,6 +181,7 @@ class TestBuildArtifacts:
"""Test build artifact analysis.""" """Test build artifact analysis."""
def test_finds_large_node_modules(self, tmp_path): def test_finds_large_node_modules(self, tmp_path):
"""Verifies finds large node modules logic."""
nm = tmp_path / "node_modules" nm = tmp_path / "node_modules"
nm.mkdir() nm.mkdir()
# Create a file > 10MB # Create a file > 10MB
@@ -180,6 +193,7 @@ class TestBuildArtifacts:
assert any("node_modules" in b.name for b in bottlenecks) assert any("node_modules" in b.name for b in bottlenecks)
def test_ignores_small_dirs(self, tmp_path): def test_ignores_small_dirs(self, tmp_path):
"""Verifies ignores small dirs logic."""
nm = tmp_path / "node_modules" nm = tmp_path / "node_modules"
nm.mkdir() nm.mkdir()
small_file = nm / "small.txt" small_file = nm / "small.txt"
@@ -189,6 +203,7 @@ class TestBuildArtifacts:
assert not any("node_modules" in b.name for b in bottlenecks) assert not any("node_modules" in b.name for b in bottlenecks)
def test_finds_pycache(self, tmp_path): def test_finds_pycache(self, tmp_path):
"""Verifies finds pycache logic."""
cache = tmp_path / "__pycache__" cache = tmp_path / "__pycache__"
cache.mkdir() cache.mkdir()
big_file = cache / "big.pyc" big_file = cache / "big.pyc"
@@ -202,6 +217,7 @@ class TestMakefileAnalysis:
"""Test Makefile analysis.""" """Test Makefile analysis."""
def test_finds_pip_install(self, tmp_path): def test_finds_pip_install(self, tmp_path):
"""Verifies finds pip install logic."""
makefile = tmp_path / "Makefile" makefile = tmp_path / "Makefile"
makefile.write_text(textwrap.dedent(''' makefile.write_text(textwrap.dedent('''
install: install:
@@ -215,6 +231,7 @@ class TestMakefileAnalysis:
assert len(bottlenecks) >= 1 assert len(bottlenecks) >= 1
def test_no_makefile(self, tmp_path): def test_no_makefile(self, tmp_path):
"""Verifies no makefile logic."""
bottlenecks = analyze_makefile_targets(str(tmp_path)) bottlenecks = analyze_makefile_targets(str(tmp_path))
assert bottlenecks == [] assert bottlenecks == []
@@ -223,6 +240,7 @@ class TestImportAnalysis:
"""Test heavy import detection.""" """Test heavy import detection."""
def test_finds_pandas(self, tmp_path): def test_finds_pandas(self, tmp_path):
"""Verifies finds pandas logic."""
src = tmp_path / "analysis.py" src = tmp_path / "analysis.py"
src.write_text("import pandas as pd\n") src.write_text("import pandas as pd\n")
@@ -231,6 +249,7 @@ class TestImportAnalysis:
assert any("pandas" in b.name for b in bottlenecks) assert any("pandas" in b.name for b in bottlenecks)
def test_finds_torch(self, tmp_path): def test_finds_torch(self, tmp_path):
"""Verifies finds torch logic."""
src = tmp_path / "model.py" src = tmp_path / "model.py"
src.write_text("import torch\n") src.write_text("import torch\n")
@@ -238,6 +257,7 @@ class TestImportAnalysis:
assert any("torch" in b.name for b in bottlenecks) assert any("torch" in b.name for b in bottlenecks)
def test_skips_light_imports(self, tmp_path): def test_skips_light_imports(self, tmp_path):
"""Verifies skips light imports logic."""
src = tmp_path / "utils.py" src = tmp_path / "utils.py"
src.write_text("import json\nimport os\nimport sys\n") src.write_text("import json\nimport os\nimport sys\n")
@@ -249,12 +269,14 @@ class TestGenerateReport:
"""Test full report generation.""" """Test full report generation."""
def test_empty_repo(self, tmp_path): def test_empty_repo(self, tmp_path):
"""Verifies behavior with empty or None input."""
report = generate_report(str(tmp_path)) report = generate_report(str(tmp_path))
assert report.summary["total_bottlenecks"] >= 0 assert report.summary["total_bottlenecks"] >= 0
assert "critical" in report.summary assert "critical" in report.summary
assert "warning" in report.summary assert "warning" in report.summary
def test_with_findings(self, tmp_path): def test_with_findings(self, tmp_path):
"""Verifies with findings logic."""
# Create a test file with issues # Create a test file with issues
test_file = tmp_path / "test_slow.py" test_file = tmp_path / "test_slow.py"
test_file.write_text(textwrap.dedent(''' test_file.write_text(textwrap.dedent('''
@@ -273,6 +295,7 @@ class TestGenerateReport:
assert len(report.bottlenecks) > 0 assert len(report.bottlenecks) > 0
def test_summary_categories(self, tmp_path): def test_summary_categories(self, tmp_path):
"""Verifies summary categories logic."""
report = generate_report(str(tmp_path)) report = generate_report(str(tmp_path))
assert "by_category" in report.summary assert "by_category" in report.summary
@@ -281,6 +304,7 @@ class TestMarkdownReport:
"""Test markdown output.""" """Test markdown output."""
def test_format(self): def test_format(self):
"""Verifies format logic."""
report = PerfReport( report = PerfReport(
timestamp="2026-01-01T00:00:00Z", timestamp="2026-01-01T00:00:00Z",
repo_path="/tmp/repo", repo_path="/tmp/repo",
@@ -303,6 +327,7 @@ class TestMarkdownReport:
assert "Fix it" in md assert "Fix it" in md
def test_empty_report(self): def test_empty_report(self):
"""Verifies behavior with empty or None input."""
report = PerfReport( report = PerfReport(
timestamp="2026-01-01T00:00:00Z", timestamp="2026-01-01T00:00:00Z",
repo_path="/tmp/repo", repo_path="/tmp/repo",

View File

@@ -21,27 +21,32 @@ from quality_gate import (
class TestScoreSpecificity(unittest.TestCase): class TestScoreSpecificity(unittest.TestCase):
def test_specific_content_scores_high(self): def test_specific_content_scores_high(self):
"""Verifies specific content scores high logic."""
content = "Run `python3 deploy.py --env prod` on 2026-04-15. Example: step 1 configure nginx." content = "Run `python3 deploy.py --env prod` on 2026-04-15. Example: step 1 configure nginx."
score = score_specificity(content) score = score_specificity(content)
self.assertGreater(score, 0.6) self.assertGreater(score, 0.6)
def test_vague_content_scores_low(self): def test_vague_content_scores_low(self):
"""Verifies vague content scores low logic."""
content = "It generally depends. Various factors might affect this. Basically, it varies." content = "It generally depends. Various factors might affect this. Basically, it varies."
score = score_specificity(content) score = score_specificity(content)
self.assertLess(score, 0.5) self.assertLess(score, 0.5)
def test_empty_scores_baseline(self): def test_empty_scores_baseline(self):
"""Verifies behavior with empty or None input."""
score = score_specificity("") score = score_specificity("")
self.assertAlmostEqual(score, 0.5, delta=0.1) self.assertAlmostEqual(score, 0.5, delta=0.1)
class TestScoreActionability(unittest.TestCase): class TestScoreActionability(unittest.TestCase):
def test_actionable_content_scores_high(self): def test_actionable_content_scores_high(self):
"""Verifies actionable content scores high logic."""
content = "1. Run `pip install -r requirements.txt`\n2. Execute `python3 train.py`\n3. Verify with `pytest`" content = "1. Run `pip install -r requirements.txt`\n2. Execute `python3 train.py`\n3. Verify with `pytest`"
score = score_actionability(content) score = score_actionability(content)
self.assertGreater(score, 0.6) self.assertGreater(score, 0.6)
def test_abstract_content_scores_low(self): def test_abstract_content_scores_low(self):
"""Verifies abstract content scores low logic."""
content = "The concept of intelligence is fascinating and multifaceted." content = "The concept of intelligence is fascinating and multifaceted."
score = score_actionability(content) score = score_actionability(content)
self.assertLess(score, 0.5) self.assertLess(score, 0.5)
@@ -49,33 +54,40 @@ class TestScoreActionability(unittest.TestCase):
class TestScoreFreshness(unittest.TestCase): class TestScoreFreshness(unittest.TestCase):
def test_recent_timestamp_scores_high(self): def test_recent_timestamp_scores_high(self):
"""Verifies recent timestamp scores high logic."""
recent = datetime.now(timezone.utc).isoformat() recent = datetime.now(timezone.utc).isoformat()
score = score_freshness(recent) score = score_freshness(recent)
self.assertGreater(score, 0.9) self.assertGreater(score, 0.9)
def test_old_timestamp_scores_low(self): def test_old_timestamp_scores_low(self):
"""Verifies old timestamp scores low logic."""
old = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat() old = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat()
score = score_freshness(old) score = score_freshness(old)
self.assertLess(score, 0.2) self.assertLess(score, 0.2)
def test_none_returns_baseline(self): def test_none_returns_baseline(self):
"""Verifies behavior with empty or None input."""
score = score_freshness(None) score = score_freshness(None)
self.assertEqual(score, 0.5) self.assertEqual(score, 0.5)
class TestScoreSourceQuality(unittest.TestCase): class TestScoreSourceQuality(unittest.TestCase):
def test_claude_scores_high(self): def test_claude_scores_high(self):
"""Verifies claude scores high logic."""
self.assertGreater(score_source_quality("claude-sonnet"), 0.85) self.assertGreater(score_source_quality("claude-sonnet"), 0.85)
def test_ollama_scores_lower(self): def test_ollama_scores_lower(self):
"""Verifies ollama scores lower logic."""
self.assertLess(score_source_quality("ollama"), 0.7) self.assertLess(score_source_quality("ollama"), 0.7)
def test_unknown_returns_default(self): def test_unknown_returns_default(self):
"""Verifies unknown returns default logic."""
self.assertEqual(score_source_quality("unknown"), 0.5) self.assertEqual(score_source_quality("unknown"), 0.5)
class TestScoreEntry(unittest.TestCase): class TestScoreEntry(unittest.TestCase):
def test_good_entry_scores_high(self): def test_good_entry_scores_high(self):
"""Verifies good entry scores high logic."""
entry = { entry = {
"content": "To deploy: run `kubectl apply -f deployment.yaml`. Verify with `kubectl get pods`.", "content": "To deploy: run `kubectl apply -f deployment.yaml`. Verify with `kubectl get pods`.",
"model": "claude-sonnet", "model": "claude-sonnet",
@@ -85,6 +97,7 @@ class TestScoreEntry(unittest.TestCase):
self.assertGreater(score, 0.6) self.assertGreater(score, 0.6)
def test_poor_entry_scores_low(self): def test_poor_entry_scores_low(self):
"""Verifies poor entry scores low logic."""
entry = { entry = {
"content": "It depends. Various things might happen.", "content": "It depends. Various things might happen.",
"model": "unknown", "model": "unknown",
@@ -95,6 +108,7 @@ class TestScoreEntry(unittest.TestCase):
class TestFilterEntries(unittest.TestCase): class TestFilterEntries(unittest.TestCase):
def test_filters_low_quality(self): def test_filters_low_quality(self):
"""Verifies knowledge filtering by filters low quality."""
entries = [ entries = [
{"content": "Run `deploy.py` to fix the issue.", "model": "claude"}, {"content": "Run `deploy.py` to fix the issue.", "model": "claude"},
{"content": "It might work sometimes.", "model": "unknown"}, {"content": "It might work sometimes.", "model": "unknown"},

View File

@@ -1,118 +0,0 @@
"""
Tests for session_pair_harvester — training pair extraction from sessions.
"""
import json
import tempfile
import unittest
from pathlib import Path
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from session_pair_harvester import (
extract_pairs_from_conversation,
extract_from_jsonl_file,
deduplicate_pairs,
compute_hash,
)
class TestSessionPairHarvester(unittest.TestCase):
def test_compute_hash_consistent(self):
h1 = compute_hash("hello world")
h2 = compute_hash("hello world")
self.assertEqual(h1, h2)
self.assertEqual(len(h1), 16)
def test_extract_simple_qa_pair(self):
"""A simple user→assistant exchange produces one pair."""
conversation = [
{"role": "user", "content": "What is the capital of France?"},
{"role": "assistant", "content": "The capital of France is Paris. It is a major European city renowned for its art, fashion, gastronomy, cultural heritage, and historical significance. The city attracts millions of tourists annually."},
]
pairs = extract_pairs_from_conversation(conversation, "test_session", "test-model")
self.assertEqual(len(pairs), 1)
self.assertEqual(pairs[0]["terse"], "What is the capital of France?")
self.assertIn("Paris", pairs[0]["rich"])
self.assertEqual(pairs[0]["source"], "test_session")
def test_min_ratio_filter(self):
"""Very short responses are filtered out."""
conversation = [
{"role": "user", "content": "Yes"},
{"role": "assistant", "content": "No."},
]
# Default min_ratio = 1.5, min_words = 20 for response
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=3)
self.assertEqual(len(pairs), 0)
def test_min_words_filter(self):
"""Assistant responses below min word count are skipped."""
conversation = [
{"role": "user", "content": "Explain the project architecture in detail"},
{"role": "assistant", "content": "OK."},
]
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=5)
self.assertEqual(len(pairs), 0)
def test_skip_non_assistant_messages(self):
"""System and tool messages are ignored."""
conversation = [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello"},
{"role": "assistant", "content": "Hi there! How can I help you today?"},
]
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=3)
self.assertEqual(len(pairs), 1)
self.assertEqual(pairs[0]["terse"], "Hello")
def test_multiple_pairs_from_one_session(self):
"""A conversation with several Q&A turns yields multiple pairs."""
conversation = [
{"role": "user", "content": "First question?"},
{"role": "assistant", "content": "Here is a detailed and comprehensive answer that thoroughly explores multiple aspects of the subject. It provides background context and practical implications for the reader."},
{"role": "user", "content": "Second?"},
{"role": "assistant", "content": "Another comprehensive response with detailed examples. This includes practical code blocks and thorough explanations to ensure deep understanding of the topic at hand."},
]
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_ratio=1.0)
self.assertEqual(len(pairs), 2)
def test_deduplication_removes_duplicates(self):
"""Identical pairs across sessions are deduplicated."""
pairs = [
{"terse": "q1", "rich": "a1", "source": "s1", "model": "m"},
{"terse": "q1", "rich": "a1", "source": "s2", "model": "m"},
{"terse": "q2", "rich": "a2", "source": "s1", "model": "m"},
]
unique = deduplicate_pairs(pairs)
self.assertEqual(len(unique), 2)
sources = {p["source"] for p in unique}
# First unique pair can be from either s1 or s2
self.assertIn("s1", sources)
def test_integration_with_test_sessions(self):
"""Harvester finds pairs in real test session files."""
repo_root = Path(__file__).parent.parent
test_sessions_dir = repo_root / "test_sessions"
if not test_sessions_dir.exists():
self.skipTest("test_sessions not found")
pairs = []
for jsonl_file in sorted(test_sessions_dir.glob("*.jsonl")):
pairs.extend(extract_from_jsonl_file(str(jsonl_file)))
self.assertGreater(len(pairs), 0, "Should extract at least one pair from test_sessions")
for p in pairs:
self.assertIn("terse", p)
self.assertIn("rich", p)
self.assertIn("source", p)
self.assertIn("model", p)
# Verify content exists
self.assertGreater(len(p["terse"]), 0)
self.assertGreater(len(p["rich"]), 0)
if __name__ == "__main__":
unittest.main()