Compare commits

..

2 Commits

Author SHA1 Message Date
d7cfda8f03 test: sync refactoring opportunity finder tests (#210)
Some checks failed
Test / pytest (pull_request) Failing after 49s
2026-04-21 11:30:59 +00:00
b172e720e4 feat: implement refactoring opportunity finder — AST complexity + scoring (#210) 2026-04-21 11:30:58 +00:00
4 changed files with 147 additions and 575 deletions

View File

@@ -1,297 +0,0 @@
#!/usr/bin/env python3
"""
quality_gate.py — Score and filter knowledge entries.
Scores each entry on 4 dimensions:
- Specificity: concrete examples vs vague generalities
- Actionability: can this be used to do something?
- Freshness: is this still accurate?
- Source quality: was the model/provider reliable?
Usage:
from quality_gate import score_entry, filter_entries, quality_report
score = score_entry(entry)
filtered = filter_entries(entries, threshold=0.5)
report = quality_report(entries)
"""
import json
import math
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional
# Source quality scores (higher = more reliable)
SOURCE_QUALITY = {
"claude-sonnet": 0.9,
"claude-opus": 0.95,
"gpt-4": 0.85,
"gpt-4-turbo": 0.85,
"gpt-5": 0.9,
"mimo-v2-pro": 0.8,
"gemini-pro": 0.8,
"llama-3-70b": 0.75,
"llama-3-8b": 0.7,
"ollama": 0.6,
"unknown": 0.5,
}
DEFAULT_SOURCE_QUALITY = 0.5
# Specificity indicators
SPECIFIC_INDICATORS = [
r"\b\d+\.\d+", # decimal numbers
r"\b\d{4}-\d{2}-\d{2}", # dates
r"\b[A-Z][a-z]+\s[A-Z][a-z]+", # proper nouns
r"`[^`]+`", # code/commands
r"https?://", # URLs
r"\b(example|instance|specifically|concretely)\b",
r"\b(step \d|first|second|third)\b",
r"\b(exactly|precisely|measured|counted)\b",
]
# Vagueness indicators (penalty)
VAGUE_INDICATORS = [
r"\b(generally|usually|often|sometimes|might|could|perhaps)\b",
r"\b(various|several|many|some|few)\b",
r"\b(it depends|varies|differs)\b",
r"\b(basically|essentially|fundamentally)\b",
r"\b(everyone knows|it's obvious|clearly)\b",
]
# Actionability indicators
ACTIONABLE_INDICATORS = [
r"\b(run|execute|install|deploy|configure|set up)\b",
r"\b(use|apply|implement|create|build)\b",
r"\b(check|verify|test|validate|confirm)\b",
r"\b(fix|resolve|solve|debug|troubleshoot)\b",
r"\b(if .+ then|when .+ do|to .+ use)\b",
r"```[a-z]*\n", # code blocks
r"\$\s", # shell commands
r"\b\d+\.\s", # numbered steps
]
def score_specificity(content: str) -> float:
"""Score specificity: 0=vague, 1=very specific."""
content_lower = content.lower()
score = 0.5 # baseline
# Check for specific indicators
specific_count = sum(
len(re.findall(p, content, re.IGNORECASE))
for p in SPECIFIC_INDICATORS
)
# Check for vague indicators
vague_count = sum(
len(re.findall(p, content_lower))
for p in VAGUE_INDICATORS
)
# Adjust score
score += min(specific_count * 0.05, 0.4)
score -= min(vague_count * 0.08, 0.3)
# Length bonus (longer = more detail, up to a point)
word_count = len(content.split())
if word_count > 50:
score += min((word_count - 50) * 0.001, 0.1)
return max(0.0, min(1.0, score))
def score_actionability(content: str) -> float:
"""Score actionability: 0=abstract, 1=highly actionable."""
content_lower = content.lower()
score = 0.3 # baseline (most knowledge is informational)
# Check for actionable indicators
actionable_count = sum(
len(re.findall(p, content_lower))
for p in ACTIONABLE_INDICATORS
)
score += min(actionable_count * 0.1, 0.6)
# Code blocks are highly actionable
if "```" in content:
score += 0.2
# Numbered steps are actionable
if re.search(r"\d+\.\s+\w", content):
score += 0.1
return max(0.0, min(1.0, score))
def score_freshness(timestamp: Optional[str]) -> float:
"""Score freshness: 1=new, decays over time."""
if not timestamp:
return 0.5
try:
if isinstance(timestamp, str):
ts = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
else:
ts = timestamp
now = datetime.now(timezone.utc)
age_days = (now - ts).days
# Exponential decay: 1.0 at day 0, 0.5 at ~180 days, 0.1 at ~365 days
score = math.exp(-age_days / 180)
return max(0.1, min(1.0, score))
except (ValueError, TypeError):
return 0.5
def score_source_quality(model: Optional[str]) -> float:
"""Score source quality based on model/provider."""
if not model:
return DEFAULT_SOURCE_QUALITY
# Normalize model name
model_lower = model.lower()
for key, score in SOURCE_QUALITY.items():
if key in model_lower:
return score
return DEFAULT_SOURCE_QUALITY
def score_entry(entry: dict) -> float:
"""
Score a knowledge entry on quality (0.0-1.0).
Weights:
- specificity: 0.3
- actionability: 0.3
- freshness: 0.2
- source_quality: 0.2
"""
content = entry.get("content", entry.get("text", entry.get("response", "")))
model = entry.get("model", entry.get("provenance", {}).get("model"))
timestamp = entry.get("timestamp", entry.get("provenance", {}).get("timestamp"))
specificity = score_specificity(content)
actionability = score_actionability(content)
freshness = score_freshness(timestamp)
source = score_source_quality(model)
return round(
0.3 * specificity +
0.3 * actionability +
0.2 * freshness +
0.2 * source,
4
)
def score_entry_detailed(entry: dict) -> dict:
"""Score with breakdown."""
content = entry.get("content", entry.get("text", entry.get("response", "")))
model = entry.get("model", entry.get("provenance", {}).get("model"))
timestamp = entry.get("timestamp", entry.get("provenance", {}).get("timestamp"))
specificity = score_specificity(content)
actionability = score_actionability(content)
freshness = score_freshness(timestamp)
source = score_source_quality(model)
return {
"score": round(0.3 * specificity + 0.3 * actionability + 0.2 * freshness + 0.2 * source, 4),
"specificity": round(specificity, 4),
"actionability": round(actionability, 4),
"freshness": round(freshness, 4),
"source_quality": round(source, 4),
}
def filter_entries(entries: List[dict], threshold: float = 0.5) -> List[dict]:
"""Filter entries below quality threshold."""
filtered = []
for entry in entries:
if score_entry(entry) >= threshold:
filtered.append(entry)
return filtered
def quality_report(entries: List[dict]) -> str:
"""Generate quality distribution report."""
if not entries:
return "No entries to analyze."
scores = [score_entry(e) for e in entries]
avg = sum(scores) / len(scores)
min_score = min(scores)
max_score = max(scores)
# Distribution buckets
buckets = {"high": 0, "medium": 0, "low": 0, "rejected": 0}
for s in scores:
if s >= 0.7:
buckets["high"] += 1
elif s >= 0.5:
buckets["medium"] += 1
elif s >= 0.3:
buckets["low"] += 1
else:
buckets["rejected"] += 1
lines = [
"=" * 50,
" QUALITY GATE REPORT",
"=" * 50,
f" Total entries: {len(entries)}",
f" Average score: {avg:.3f}",
f" Min: {min_score:.3f}",
f" Max: {max_score:.3f}",
"",
" Distribution:",
]
for bucket, count in buckets.items():
pct = count / len(entries) * 100
bar = "" * int(pct / 5)
lines.append(f" {bucket:<12} {count:>5} ({pct:>5.1f}%) {bar}")
passed = buckets["high"] + buckets["medium"]
lines.append(f"\n Pass rate (>= 0.5): {passed}/{len(entries)} ({passed/len(entries)*100:.1f}%)")
lines.append("=" * 50)
return "\n".join(lines)
def main():
import argparse
parser = argparse.ArgumentParser(description="Knowledge quality gate")
parser.add_argument("files", nargs="+", help="JSONL files to score")
parser.add_argument("--threshold", type=float, default=0.5, help="Quality threshold")
parser.add_argument("--json", action="store_true", help="JSON output")
parser.add_argument("--filter", action="store_true", help="Filter and write back")
args = parser.parse_args()
all_entries = []
for filepath in args.files:
with open(filepath) as f:
for line in f:
if line.strip():
all_entries.append(json.loads(line))
if args.json:
results = [{"entry": e, **score_entry_detailed(e)} for e in all_entries]
print(json.dumps(results, indent=2))
elif args.filter:
filtered = filter_entries(all_entries, args.threshold)
print(f"Kept {len(filtered)}/{len(all_entries)} entries (threshold: {args.threshold})")
else:
print(quality_report(all_entries))
if __name__ == "__main__":
main()

View File

@@ -1,54 +1,144 @@
#!/usr/bin/env python3
"""
Finds refactoring opportunities in codebases
Engine ID: 10.4
Usage:
python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json
python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json --dry-run
Refactoring Opportunity Finder — Engine 10.4
"""
import argparse
import ast
import json
import os
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import List, Optional, Tuple
def generate_proposals():
"""Generate sample proposals for this engine."""
# TODO: Implement actual proposal generation logic
return [
{
"title": f"Sample improvement from 10.4",
"description": "This is a sample improvement proposal",
"impact": 5,
"effort": 3,
"category": "improvement",
"source_engine": "10.4",
"timestamp": datetime.now(timezone.utc).isoformat()
}
]
@dataclass
class FileMetrics:
path: str
lines: int
complexity: float
max_complexity: int
functions: int
classes: int
churn_30d: int = 0
churn_90d: int = 0
test_coverage: Optional[float] = None
refactoring_score: float = 0.0
class ComplexityVisitor(ast.NodeVisitor):
def __init__(self):
self.functions = []
self.classes = 0
def visit_FunctionDef(self, node):
complexity = 1
for child in ast.walk(node):
if isinstance(child, (ast.If, ast.While, ast.For)):
complexity += 1
elif isinstance(child, ast.BoolOp):
complexity += len(child.values) - 1
elif isinstance(child, ast.ExceptHandler):
complexity += 1
elif isinstance(child, ast.Assert):
complexity += 1
self.functions.append((node.name, complexity, node.lineno))
self.generic_visit(node)
def visit_AsyncFunctionDef(self, node):
self.visit_FunctionDef(node)
def visit_ClassDef(self, node):
self.classes += 1
self.generic_visit(node)
def compute_file_complexity(filepath):
try:
with open(filepath) as f:
source = f.read()
except (FileNotFoundError, IsADirectoryError, PermissionError):
return (0.0, 0, 0, 0, 0)
try:
tree = ast.parse(source)
except SyntaxError:
return (0.0, 0, 0, 0, 0)
lines = source.count("\n") + 1 if source.strip() else 0
visitor = ComplexityVisitor()
visitor.visit(tree)
if not visitor.functions:
return (0.0, 0, 0, visitor.classes, lines)
complexities = [c for _, c, _ in visitor.functions]
avg = sum(complexities) / len(complexities)
max_c = max(complexities)
return (round(avg, 1), max_c, len(visitor.functions), visitor.classes, lines)
def calculate_refactoring_score(metrics):
score = 0.0
complexity_score = min(40, metrics.complexity * 4)
if metrics.max_complexity > 10:
complexity_score = min(40, complexity_score + (metrics.max_complexity - 10))
score += complexity_score
if metrics.lines <= 0:
pass
elif metrics.lines <= 100:
score += metrics.lines * 0.1
elif metrics.lines <= 500:
score += 10 + (metrics.lines - 100) * 0.0125
else:
score += min(20, 15 + (metrics.lines - 500) * 0.01)
churn_score = (metrics.churn_30d * 2) + (metrics.churn_90d * 0.5)
score += min(30, churn_score)
if metrics.test_coverage is None:
score += 5
elif metrics.test_coverage < 0.3:
score += 10
elif metrics.test_coverage < 0.6:
score += 7
elif metrics.test_coverage < 0.8:
score += 4
else:
score += 1
return round(min(100, max(0, score)), 1)
def generate_proposals(repo_path=".", threshold=30.0):
proposals = []
for root, dirs, files in os.walk(repo_path):
dirs[:] = [d for d in dirs if not d.startswith((".", "__pycache__", "node_modules", ".git", "venv"))]
for fname in files:
if not fname.endswith(".py"):
continue
filepath = os.path.join(root, fname)
relpath = os.path.relpath(filepath, repo_path)
avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)
if funcs == 0 and classes == 0:
continue
metrics = FileMetrics(path=relpath, lines=lines, complexity=avg, max_complexity=max_c, functions=funcs, classes=classes)
score = calculate_refactoring_score(metrics)
metrics.refactoring_score = score
if score >= threshold:
proposals.append({"title": f"Refactor {relpath} (score: {score})", "impact": min(10, int(score / 10)), "effort": min(10, max(1, funcs // 3)), "category": "refactoring", "source_engine": "10.4", "timestamp": datetime.now(timezone.utc).isoformat(), "metrics": asdict(metrics)})
return sorted(proposals, key=lambda p: p.get("metrics", {}).get("refactoring_score", 0), reverse=True)
def main():
parser = argparse.ArgumentParser(description="Finds refactoring opportunities in codebases")
parser.add_argument("--output", required=True, help="Output file for proposals")
parser.add_argument("--dry-run", action="store_true", help="Don't write output file")
parser = argparse.ArgumentParser(description="Find refactoring opportunities")
parser.add_argument("--repo", default=".")
parser.add_argument("--output", required=True)
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--threshold", type=float, default=30.0)
args = parser.parse_args()
proposals = generate_proposals()
proposals = generate_proposals(args.repo, args.threshold)
if not args.dry_run:
with open(args.output, "w") as f:
json.dump({"proposals": proposals}, f, indent=2)
print(f"Generated {len(proposals)} proposals -> {args.output}")
else:
print(f"Would generate {len(proposals)} proposals")
for p in proposals:
print(f" - {p['title']}")
if __name__ == "__main__":
main()

View File

@@ -19,208 +19,95 @@ FileMetrics = mod.FileMetrics
def test_complexity_simple_function():
"""Simple function should have low complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("""
def simple():
return 42
""")
f.write("\ndef simple():\n return 42\n")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert avg == 1.0, f"Expected 1.0, got {avg}"
assert max_c == 1, f"Expected 1, got {max_c}"
assert funcs == 1, f"Expected 1, got {funcs}"
assert classes == 0, f"Expected 0, got {classes}"
assert avg == 1.0
assert max_c == 1
assert funcs == 1
assert classes == 0
os.unlink(f.name)
print("PASS: test_complexity_simple_function")
def test_complexity_with_conditionals():
"""Function with if/else should have higher complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("""
def complex_func(x):
if x > 0:
if x > 10:
return "big"
else:
return "small"
elif x < 0:
return "negative"
else:
return "zero"
""")
f.write("\ndef complex_func(x):\n if x > 0:\n if x > 10:\n return 'big'\n else:\n return 'small'\n elif x < 0:\n return 'negative'\n else:\n return 'zero'\n")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
# Base 1 + 3 if/elif + 1 nested if = 5
assert max_c >= 4, f"Expected max_c >= 4, got {max_c}"
assert funcs == 1, f"Expected 1, got {funcs}"
assert max_c >= 4
assert funcs == 1
os.unlink(f.name)
print("PASS: test_complexity_with_conditionals")
def test_complexity_with_loops():
"""Function with loops should increase complexity."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("""
def loop_func(items):
result = []
for item in items:
if item > 0:
result.append(item)
while len(result) > 10:
result.pop()
return result
""")
f.write("\ndef loop_func(items):\n result = []\n for item in items:\n if item > 0:\n result.append(item)\n while len(result) > 10:\n result.pop()\n return result\n")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
# Base 1 + 1 for + 1 if + 1 while = 4
assert max_c >= 3, f"Expected max_c >= 3, got {max_c}"
assert max_c >= 3
os.unlink(f.name)
print("PASS: test_complexity_with_loops")
def test_complexity_with_class():
"""Class with methods should count both."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("""
class MyClass:
def method1(self):
if True:
pass
def method2(self):
for i in range(10):
pass
""")
f.write("\nclass MyClass:\n def method1(self):\n if True:\n pass\n def method2(self):\n for i in range(10):\n pass\n")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert classes == 1, f"Expected 1 class, got {classes}"
assert funcs == 2, f"Expected 2 functions, got {funcs}"
assert classes == 1
assert funcs == 2
os.unlink(f.name)
print("PASS: test_complexity_with_class")
def test_complexity_syntax_error():
"""File with syntax error should return zeros."""
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write("def broken(:\n pass")
f.flush()
avg, max_c, funcs, classes, lines = compute_file_complexity(f.name)
assert avg == 0.0, f"Expected 0.0, got {avg}"
assert funcs == 0, f"Expected 0, got {funcs}"
assert avg == 0.0
assert funcs == 0
os.unlink(f.name)
print("PASS: test_complexity_syntax_error")
def test_refactoring_score_high_complexity():
"""High complexity should give high score."""
metrics = FileMetrics(
path="test.py",
lines=200,
complexity=15.0,
max_complexity=25,
functions=10,
classes=2,
churn_30d=5,
churn_90d=15,
test_coverage=0.3,
refactoring_score=0.0
)
metrics = FileMetrics(path="test.py", lines=200, complexity=15.0, max_complexity=25, functions=10, classes=2, churn_30d=5, churn_90d=15, test_coverage=0.3, refactoring_score=0.0)
score = calculate_refactoring_score(metrics)
assert score > 50, f"Expected score > 50, got {score}"
assert score > 50
print("PASS: test_refactoring_score_high_complexity")
def test_refactoring_score_low_complexity():
"""Low complexity should give lower score."""
metrics = FileMetrics(
path="test.py",
lines=50,
complexity=2.0,
max_complexity=3,
functions=3,
classes=0,
churn_30d=0,
churn_90d=1,
test_coverage=0.9,
refactoring_score=0.0
)
metrics = FileMetrics(path="test.py", lines=50, complexity=2.0, max_complexity=3, functions=3, classes=0, churn_30d=0, churn_90d=1, test_coverage=0.9, refactoring_score=0.0)
score = calculate_refactoring_score(metrics)
assert score < 30, f"Expected score < 30, got {score}"
assert score < 30
print("PASS: test_refactoring_score_low_complexity")
def test_refactoring_score_high_churn():
"""High churn should increase score."""
metrics = FileMetrics(
path="test.py",
lines=100,
complexity=5.0,
max_complexity=8,
functions=5,
classes=0,
churn_30d=10,
churn_90d=20,
test_coverage=0.5,
refactoring_score=0.0
)
metrics = FileMetrics(path="test.py", lines=100, complexity=5.0, max_complexity=8, functions=5, classes=0, churn_30d=10, churn_90d=20, test_coverage=0.5, refactoring_score=0.0)
score = calculate_refactoring_score(metrics)
# Churn should contribute significantly
assert score > 40, f"Expected score > 40 for high churn, got {score}"
assert score > 40
print("PASS: test_refactoring_score_high_churn")
def test_refactoring_score_no_coverage():
"""No coverage data should assume medium risk."""
metrics = FileMetrics(
path="test.py",
lines=100,
complexity=5.0,
max_complexity=8,
functions=5,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=None,
refactoring_score=0.0
)
metrics = FileMetrics(path="test.py", lines=100, complexity=5.0, max_complexity=8, functions=5, classes=0, churn_30d=1, churn_90d=2, test_coverage=None, refactoring_score=0.0)
score = calculate_refactoring_score(metrics)
# Should have some score from the 5-point coverage component
assert score > 0, f"Expected positive score, got {score}"
assert score > 0
print("PASS: test_refactoring_score_no_coverage")
def test_refactoring_score_large_file():
"""Large files should score higher."""
metrics_small = FileMetrics(
path="small.py",
lines=50,
complexity=5.0,
max_complexity=8,
functions=3,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=0.8,
refactoring_score=0.0
)
metrics_large = FileMetrics(
path="large.py",
lines=1000,
complexity=5.0,
max_complexity=8,
functions=3,
classes=0,
churn_30d=1,
churn_90d=2,
test_coverage=0.8,
refactoring_score=0.0
)
metrics_small = FileMetrics(path="small.py", lines=50, complexity=5.0, max_complexity=8, functions=3, classes=0, churn_30d=1, churn_90d=2, test_coverage=0.8, refactoring_score=0.0)
metrics_large = FileMetrics(path="large.py", lines=1000, complexity=5.0, max_complexity=8, functions=3, classes=0, churn_30d=1, churn_90d=2, test_coverage=0.8, refactoring_score=0.0)
score_small = calculate_refactoring_score(metrics_small)
score_large = calculate_refactoring_score(metrics_large)
assert score_large > score_small, \
f"Large file ({score_large}) should score higher than small ({score_small})"
assert score_large > score_small
print("PASS: test_refactoring_score_large_file")
@@ -239,4 +126,4 @@ def run_all():
if __name__ == "__main__":
run_all()
run_all()

View File

@@ -1,108 +0,0 @@
"""
Tests for quality_gate.py — Knowledge entry quality scoring.
"""
import unittest
from datetime import datetime, timezone, timedelta
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from quality_gate import (
score_specificity,
score_actionability,
score_freshness,
score_source_quality,
score_entry,
filter_entries,
)
class TestScoreSpecificity(unittest.TestCase):
def test_specific_content_scores_high(self):
content = "Run `python3 deploy.py --env prod` on 2026-04-15. Example: step 1 configure nginx."
score = score_specificity(content)
self.assertGreater(score, 0.6)
def test_vague_content_scores_low(self):
content = "It generally depends. Various factors might affect this. Basically, it varies."
score = score_specificity(content)
self.assertLess(score, 0.5)
def test_empty_scores_baseline(self):
score = score_specificity("")
self.assertAlmostEqual(score, 0.5, delta=0.1)
class TestScoreActionability(unittest.TestCase):
def test_actionable_content_scores_high(self):
content = "1. Run `pip install -r requirements.txt`\n2. Execute `python3 train.py`\n3. Verify with `pytest`"
score = score_actionability(content)
self.assertGreater(score, 0.6)
def test_abstract_content_scores_low(self):
content = "The concept of intelligence is fascinating and multifaceted."
score = score_actionability(content)
self.assertLess(score, 0.5)
class TestScoreFreshness(unittest.TestCase):
def test_recent_timestamp_scores_high(self):
recent = datetime.now(timezone.utc).isoformat()
score = score_freshness(recent)
self.assertGreater(score, 0.9)
def test_old_timestamp_scores_low(self):
old = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat()
score = score_freshness(old)
self.assertLess(score, 0.2)
def test_none_returns_baseline(self):
score = score_freshness(None)
self.assertEqual(score, 0.5)
class TestScoreSourceQuality(unittest.TestCase):
def test_claude_scores_high(self):
self.assertGreater(score_source_quality("claude-sonnet"), 0.85)
def test_ollama_scores_lower(self):
self.assertLess(score_source_quality("ollama"), 0.7)
def test_unknown_returns_default(self):
self.assertEqual(score_source_quality("unknown"), 0.5)
class TestScoreEntry(unittest.TestCase):
def test_good_entry_scores_high(self):
entry = {
"content": "To deploy: run `kubectl apply -f deployment.yaml`. Verify with `kubectl get pods`.",
"model": "claude-sonnet",
"timestamp": datetime.now(timezone.utc).isoformat(),
}
score = score_entry(entry)
self.assertGreater(score, 0.6)
def test_poor_entry_scores_low(self):
entry = {
"content": "It depends. Various things might happen.",
"model": "unknown",
}
score = score_entry(entry)
self.assertLess(score, 0.5)
class TestFilterEntries(unittest.TestCase):
def test_filters_low_quality(self):
entries = [
{"content": "Run `deploy.py` to fix the issue.", "model": "claude"},
{"content": "It might work sometimes.", "model": "unknown"},
{"content": "Configure nginx: step 1 edit nginx.conf", "model": "gpt-4"},
]
filtered = filter_entries(entries, threshold=0.5)
self.assertGreaterEqual(len(filtered), 2)
if __name__ == "__main__":
unittest.main()