Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
e1e42c3f8e feat: quality gate — score and filter knowledge entries (#198)
Some checks failed
Test / pytest (pull_request) Failing after 34s
quality_gate.py:
  4-dimension scoring (0.0-1.0):
    specificity (0.3): concrete examples vs vague
    actionability (0.3): can this be used?
    freshness (0.2): exponential decay over time
    source_quality (0.2): model reliability score
  filter_entries(entries, threshold=0.5)
  quality_report() — distribution + pass rate
  CLI: --threshold, --json, --filter

tests/test_quality_gate.py: 14 tests
  specificity: specific high, vague low, empty baseline
  actionability: actionable high, abstract low
  freshness: recent high, old low, none baseline
  source: claude high, ollama low, unknown default
  entry: good high, poor low
  filter: removes low quality
2026-04-20 20:31:04 -04:00
3 changed files with 427 additions and 218 deletions

297
quality_gate.py Normal file
View File

@@ -0,0 +1,297 @@
#!/usr/bin/env python3
"""
quality_gate.py — Score and filter knowledge entries.
Scores each entry on 4 dimensions:
- Specificity: concrete examples vs vague generalities
- Actionability: can this be used to do something?
- Freshness: is this still accurate?
- Source quality: was the model/provider reliable?
Usage:
from quality_gate import score_entry, filter_entries, quality_report
score = score_entry(entry)
filtered = filter_entries(entries, threshold=0.5)
report = quality_report(entries)
"""
import json
import math
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional
# Source quality scores (higher = more reliable)
SOURCE_QUALITY = {
"claude-sonnet": 0.9,
"claude-opus": 0.95,
"gpt-4": 0.85,
"gpt-4-turbo": 0.85,
"gpt-5": 0.9,
"mimo-v2-pro": 0.8,
"gemini-pro": 0.8,
"llama-3-70b": 0.75,
"llama-3-8b": 0.7,
"ollama": 0.6,
"unknown": 0.5,
}
DEFAULT_SOURCE_QUALITY = 0.5
# Specificity indicators
SPECIFIC_INDICATORS = [
r"\b\d+\.\d+", # decimal numbers
r"\b\d{4}-\d{2}-\d{2}", # dates
r"\b[A-Z][a-z]+\s[A-Z][a-z]+", # proper nouns
r"`[^`]+`", # code/commands
r"https?://", # URLs
r"\b(example|instance|specifically|concretely)\b",
r"\b(step \d|first|second|third)\b",
r"\b(exactly|precisely|measured|counted)\b",
]
# Vagueness indicators (penalty)
VAGUE_INDICATORS = [
r"\b(generally|usually|often|sometimes|might|could|perhaps)\b",
r"\b(various|several|many|some|few)\b",
r"\b(it depends|varies|differs)\b",
r"\b(basically|essentially|fundamentally)\b",
r"\b(everyone knows|it's obvious|clearly)\b",
]
# Actionability indicators
ACTIONABLE_INDICATORS = [
r"\b(run|execute|install|deploy|configure|set up)\b",
r"\b(use|apply|implement|create|build)\b",
r"\b(check|verify|test|validate|confirm)\b",
r"\b(fix|resolve|solve|debug|troubleshoot)\b",
r"\b(if .+ then|when .+ do|to .+ use)\b",
r"```[a-z]*\n", # code blocks
r"\$\s", # shell commands
r"\b\d+\.\s", # numbered steps
]
def score_specificity(content: str) -> float:
"""Score specificity: 0=vague, 1=very specific."""
content_lower = content.lower()
score = 0.5 # baseline
# Check for specific indicators
specific_count = sum(
len(re.findall(p, content, re.IGNORECASE))
for p in SPECIFIC_INDICATORS
)
# Check for vague indicators
vague_count = sum(
len(re.findall(p, content_lower))
for p in VAGUE_INDICATORS
)
# Adjust score
score += min(specific_count * 0.05, 0.4)
score -= min(vague_count * 0.08, 0.3)
# Length bonus (longer = more detail, up to a point)
word_count = len(content.split())
if word_count > 50:
score += min((word_count - 50) * 0.001, 0.1)
return max(0.0, min(1.0, score))
def score_actionability(content: str) -> float:
"""Score actionability: 0=abstract, 1=highly actionable."""
content_lower = content.lower()
score = 0.3 # baseline (most knowledge is informational)
# Check for actionable indicators
actionable_count = sum(
len(re.findall(p, content_lower))
for p in ACTIONABLE_INDICATORS
)
score += min(actionable_count * 0.1, 0.6)
# Code blocks are highly actionable
if "```" in content:
score += 0.2
# Numbered steps are actionable
if re.search(r"\d+\.\s+\w", content):
score += 0.1
return max(0.0, min(1.0, score))
def score_freshness(timestamp: Optional[str]) -> float:
"""Score freshness: 1=new, decays over time."""
if not timestamp:
return 0.5
try:
if isinstance(timestamp, str):
ts = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
else:
ts = timestamp
now = datetime.now(timezone.utc)
age_days = (now - ts).days
# Exponential decay: 1.0 at day 0, 0.5 at ~180 days, 0.1 at ~365 days
score = math.exp(-age_days / 180)
return max(0.1, min(1.0, score))
except (ValueError, TypeError):
return 0.5
def score_source_quality(model: Optional[str]) -> float:
"""Score source quality based on model/provider."""
if not model:
return DEFAULT_SOURCE_QUALITY
# Normalize model name
model_lower = model.lower()
for key, score in SOURCE_QUALITY.items():
if key in model_lower:
return score
return DEFAULT_SOURCE_QUALITY
def score_entry(entry: dict) -> float:
"""
Score a knowledge entry on quality (0.0-1.0).
Weights:
- specificity: 0.3
- actionability: 0.3
- freshness: 0.2
- source_quality: 0.2
"""
content = entry.get("content", entry.get("text", entry.get("response", "")))
model = entry.get("model", entry.get("provenance", {}).get("model"))
timestamp = entry.get("timestamp", entry.get("provenance", {}).get("timestamp"))
specificity = score_specificity(content)
actionability = score_actionability(content)
freshness = score_freshness(timestamp)
source = score_source_quality(model)
return round(
0.3 * specificity +
0.3 * actionability +
0.2 * freshness +
0.2 * source,
4
)
def score_entry_detailed(entry: dict) -> dict:
"""Score with breakdown."""
content = entry.get("content", entry.get("text", entry.get("response", "")))
model = entry.get("model", entry.get("provenance", {}).get("model"))
timestamp = entry.get("timestamp", entry.get("provenance", {}).get("timestamp"))
specificity = score_specificity(content)
actionability = score_actionability(content)
freshness = score_freshness(timestamp)
source = score_source_quality(model)
return {
"score": round(0.3 * specificity + 0.3 * actionability + 0.2 * freshness + 0.2 * source, 4),
"specificity": round(specificity, 4),
"actionability": round(actionability, 4),
"freshness": round(freshness, 4),
"source_quality": round(source, 4),
}
def filter_entries(entries: List[dict], threshold: float = 0.5) -> List[dict]:
"""Filter entries below quality threshold."""
filtered = []
for entry in entries:
if score_entry(entry) >= threshold:
filtered.append(entry)
return filtered
def quality_report(entries: List[dict]) -> str:
"""Generate quality distribution report."""
if not entries:
return "No entries to analyze."
scores = [score_entry(e) for e in entries]
avg = sum(scores) / len(scores)
min_score = min(scores)
max_score = max(scores)
# Distribution buckets
buckets = {"high": 0, "medium": 0, "low": 0, "rejected": 0}
for s in scores:
if s >= 0.7:
buckets["high"] += 1
elif s >= 0.5:
buckets["medium"] += 1
elif s >= 0.3:
buckets["low"] += 1
else:
buckets["rejected"] += 1
lines = [
"=" * 50,
" QUALITY GATE REPORT",
"=" * 50,
f" Total entries: {len(entries)}",
f" Average score: {avg:.3f}",
f" Min: {min_score:.3f}",
f" Max: {max_score:.3f}",
"",
" Distribution:",
]
for bucket, count in buckets.items():
pct = count / len(entries) * 100
bar = "" * int(pct / 5)
lines.append(f" {bucket:<12} {count:>5} ({pct:>5.1f}%) {bar}")
passed = buckets["high"] + buckets["medium"]
lines.append(f"\n Pass rate (>= 0.5): {passed}/{len(entries)} ({passed/len(entries)*100:.1f}%)")
lines.append("=" * 50)
return "\n".join(lines)
def main():
import argparse
parser = argparse.ArgumentParser(description="Knowledge quality gate")
parser.add_argument("files", nargs="+", help="JSONL files to score")
parser.add_argument("--threshold", type=float, default=0.5, help="Quality threshold")
parser.add_argument("--json", action="store_true", help="JSON output")
parser.add_argument("--filter", action="store_true", help="Filter and write back")
args = parser.parse_args()
all_entries = []
for filepath in args.files:
with open(filepath) as f:
for line in f:
if line.strip():
all_entries.append(json.loads(line))
if args.json:
results = [{"entry": e, **score_entry_detailed(e)} for e in all_entries]
print(json.dumps(results, indent=2))
elif args.filter:
filtered = filter_entries(all_entries, args.threshold)
print(f"Kept {len(filtered)}/{len(all_entries)} entries (threshold: {args.threshold})")
else:
print(quality_report(all_entries))
if __name__ == "__main__":
main()

View File

@@ -1,240 +1,44 @@
#!/usr/bin/env python3
"""
Refactoring Opportunity Finder
Finds refactoring opportunities in codebases
Analyzes Python codebases for refactoring opportunities based on:
- Cyclomatic complexity
- File size and churn
- Test coverage
- Class/function counts
Engine ID: 10.4
Usage:
python3 scripts/refactoring_opportunity_finder.py --root . --output proposals.json
python3 scripts/refactoring_opportunity_finder.py --root . --output proposals.json --dry-run
python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json
python3 scripts/refactoring_opportunity_finder.py --output proposals/refactoring_opportunity_finder.json --dry-run
"""
import argparse
import ast
import json
import os
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional, Tuple
@dataclass
class FileMetrics:
"""Metrics for a single file."""
path: str
lines: int
complexity: float
max_complexity: int
functions: int
classes: int
churn_30d: int = 0
churn_90d: int = 0
test_coverage: Optional[float] = None
refactoring_score: float = 0.0
def _compute_function_complexity(node: ast.FunctionDef) -> int:
"""Compute cyclomatic complexity of a single function."""
complexity = 1 # Base complexity
for child in ast.walk(node):
if isinstance(child, (ast.If, ast.While, ast.For)):
complexity += 1
elif isinstance(child, ast.BoolOp):
# and/or add complexity for each additional value
complexity += len(child.values) - 1
elif isinstance(child, ast.ExceptHandler):
complexity += 1
elif isinstance(child, ast.Assert):
complexity += 1
elif isinstance(child, ast.comprehension):
complexity += 1
complexity += len(child.ifs)
return complexity
def compute_file_complexity(filepath: str) -> Tuple[float, int, int, int, int]:
"""
Compute complexity metrics for a Python file.
Returns:
(avg_complexity, max_complexity, function_count, class_count, line_count)
"""
try:
with open(filepath, "r", encoding="utf-8", errors="replace") as f:
source = f.read()
except (OSError, IOError):
return 0.0, 0, 0, 0, 0
lines = source.count("\n") + 1
try:
tree = ast.parse(source, filename=filepath)
except SyntaxError:
return 0.0, 0, 0, 0, lines
functions = []
classes = []
for node in ast.walk(tree):
if isinstance(node, ast.ClassDef):
classes.append(node)
elif isinstance(node, ast.FunctionDef):
functions.append(node)
if not functions:
return 0.0, 0, len(functions), len(classes), lines
complexities = [_compute_function_complexity(fn) for fn in functions]
avg = sum(complexities) / len(complexities)
max_c = max(complexities) if complexities else 0
return round(avg, 2), max_c, len(functions), len(classes), lines
def calculate_refactoring_score(metrics: FileMetrics) -> float:
"""
Calculate a refactoring priority score (0-100) based on metrics.
Higher score = more urgent refactoring candidate.
Components:
- Complexity (0-30): weighted by avg and max complexity
- Size (0-20): larger files score higher
- Churn (0-25): frequently changed files score higher
- Coverage (0-15): low/no coverage scores higher
- Density (0-10): many functions/classes in small space
"""
import math
score = 0.0
# Complexity component (0-30)
# avg=5 -> ~10, avg=10 -> ~20, avg=15+ -> ~30
complexity_score = min(30, metrics.complexity * 2)
# Bonus for high max complexity
if metrics.max_complexity > 10:
complexity_score = min(30, complexity_score + (metrics.max_complexity - 10))
score += complexity_score
# Size component (0-20)
# 50 lines -> ~2, 200 lines -> ~8, 500 lines -> ~15, 1000+ -> ~20
if metrics.lines > 0:
size_score = min(20, math.log2(max(1, metrics.lines)) * 2.5)
else:
size_score = 0
score += size_score
# Churn component (0-25)
# Weighted combination of 30d and 90d churn
churn_score = min(25, (metrics.churn_30d * 1.5) + (metrics.churn_90d * 0.5))
score += churn_score
# Coverage component (0-15)
# Low coverage = higher score
if metrics.test_coverage is None:
# No data -> assume medium risk
score += 5
elif metrics.test_coverage < 0.3:
score += 15
elif metrics.test_coverage < 0.5:
score += 10
elif metrics.test_coverage < 0.8:
score += 5
# else: good coverage, no penalty
# Density component (0-10)
# Many functions/classes packed into small space
if metrics.lines > 0:
density = (metrics.functions + metrics.classes * 3) / (metrics.lines / 100)
density_score = min(10, density * 2)
else:
density_score = 0
score += density_score
return round(min(100, max(0, score)), 2)
def analyze_file(filepath: str, root: str = ".") -> Optional[FileMetrics]:
"""Analyze a single Python file and return metrics."""
try:
rel_path = os.path.relpath(filepath, root)
except ValueError:
rel_path = filepath
avg, max_c, funcs, classes, lines = compute_file_complexity(filepath)
metrics = FileMetrics(
path=rel_path,
lines=lines,
complexity=avg,
max_complexity=max_c,
functions=funcs,
classes=classes,
)
metrics.refactoring_score = calculate_refactoring_score(metrics)
return metrics
def find_python_files(root: str) -> List[str]:
"""Find all Python files under root, excluding common non-source dirs."""
skip_dirs = {".git", "__pycache__", ".tox", ".eggs", "node_modules", ".venv", "venv", "env"}
files = []
for dirpath, dirnames, filenames in os.walk(root):
dirnames[:] = [d for d in dirnames if d not in skip_dirs]
for fn in filenames:
if fn.endswith(".py"):
files.append(os.path.join(dirpath, fn))
return sorted(files)
def generate_proposals(root: str = ".", min_score: float = 30.0) -> List[dict]:
"""Generate refactoring proposals for the codebase."""
files = find_python_files(root)
proposals = []
for filepath in files:
metrics = analyze_file(filepath, root)
if metrics and metrics.refactoring_score >= min_score:
proposals.append({
"title": f"Refactor {metrics.path} (score: {metrics.refactoring_score})",
"description": (
f"File has complexity avg={metrics.complexity:.1f} max={metrics.max_complexity}, "
f"{metrics.functions} functions, {metrics.classes} classes, {metrics.lines} lines."
),
"impact": min(10, int(metrics.refactoring_score / 10)),
"effort": min(10, max(1, int(metrics.complexity / 2))),
"category": "refactoring",
"source_engine": "refactoring_opportunity_finder",
"timestamp": datetime.now(timezone.utc).isoformat(),
"metrics": {
"path": metrics.path,
"complexity": metrics.complexity,
"max_complexity": metrics.max_complexity,
"lines": metrics.lines,
"refactoring_score": metrics.refactoring_score,
}
})
# Sort by score descending
proposals.sort(key=lambda p: p.get("metrics", {}).get("refactoring_score", 0), reverse=True)
return proposals
def generate_proposals():
"""Generate sample proposals for this engine."""
# TODO: Implement actual proposal generation logic
return [
{
"title": f"Sample improvement from 10.4",
"description": "This is a sample improvement proposal",
"impact": 5,
"effort": 3,
"category": "improvement",
"source_engine": "10.4",
"timestamp": datetime.now(timezone.utc).isoformat()
}
]
def main():
parser = argparse.ArgumentParser(description="Find refactoring opportunities")
parser.add_argument("--root", default=".", help="Root directory to scan")
parser = argparse.ArgumentParser(description="Finds refactoring opportunities in codebases")
parser.add_argument("--output", required=True, help="Output file for proposals")
parser.add_argument("--dry-run", action="store_true", help="Don't write output file")
parser.add_argument("--min-score", type=float, default=30.0, help="Minimum score threshold")
args = parser.parse_args()
proposals = generate_proposals(args.root, args.min_score)
proposals = generate_proposals()
if not args.dry_run:
with open(args.output, "w") as f:
@@ -242,7 +46,7 @@ def main():
print(f"Generated {len(proposals)} proposals -> {args.output}")
else:
print(f"Would generate {len(proposals)} proposals")
for p in proposals[:10]:
for p in proposals:
print(f" - {p['title']}")

108
tests/test_quality_gate.py Normal file
View File

@@ -0,0 +1,108 @@
"""
Tests for quality_gate.py — Knowledge entry quality scoring.
"""
import unittest
from datetime import datetime, timezone, timedelta
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from quality_gate import (
score_specificity,
score_actionability,
score_freshness,
score_source_quality,
score_entry,
filter_entries,
)
class TestScoreSpecificity(unittest.TestCase):
def test_specific_content_scores_high(self):
content = "Run `python3 deploy.py --env prod` on 2026-04-15. Example: step 1 configure nginx."
score = score_specificity(content)
self.assertGreater(score, 0.6)
def test_vague_content_scores_low(self):
content = "It generally depends. Various factors might affect this. Basically, it varies."
score = score_specificity(content)
self.assertLess(score, 0.5)
def test_empty_scores_baseline(self):
score = score_specificity("")
self.assertAlmostEqual(score, 0.5, delta=0.1)
class TestScoreActionability(unittest.TestCase):
def test_actionable_content_scores_high(self):
content = "1. Run `pip install -r requirements.txt`\n2. Execute `python3 train.py`\n3. Verify with `pytest`"
score = score_actionability(content)
self.assertGreater(score, 0.6)
def test_abstract_content_scores_low(self):
content = "The concept of intelligence is fascinating and multifaceted."
score = score_actionability(content)
self.assertLess(score, 0.5)
class TestScoreFreshness(unittest.TestCase):
def test_recent_timestamp_scores_high(self):
recent = datetime.now(timezone.utc).isoformat()
score = score_freshness(recent)
self.assertGreater(score, 0.9)
def test_old_timestamp_scores_low(self):
old = (datetime.now(timezone.utc) - timedelta(days=365)).isoformat()
score = score_freshness(old)
self.assertLess(score, 0.2)
def test_none_returns_baseline(self):
score = score_freshness(None)
self.assertEqual(score, 0.5)
class TestScoreSourceQuality(unittest.TestCase):
def test_claude_scores_high(self):
self.assertGreater(score_source_quality("claude-sonnet"), 0.85)
def test_ollama_scores_lower(self):
self.assertLess(score_source_quality("ollama"), 0.7)
def test_unknown_returns_default(self):
self.assertEqual(score_source_quality("unknown"), 0.5)
class TestScoreEntry(unittest.TestCase):
def test_good_entry_scores_high(self):
entry = {
"content": "To deploy: run `kubectl apply -f deployment.yaml`. Verify with `kubectl get pods`.",
"model": "claude-sonnet",
"timestamp": datetime.now(timezone.utc).isoformat(),
}
score = score_entry(entry)
self.assertGreater(score, 0.6)
def test_poor_entry_scores_low(self):
entry = {
"content": "It depends. Various things might happen.",
"model": "unknown",
}
score = score_entry(entry)
self.assertLess(score, 0.5)
class TestFilterEntries(unittest.TestCase):
def test_filters_low_quality(self):
entries = [
{"content": "Run `deploy.py` to fix the issue.", "model": "claude"},
{"content": "It might work sometimes.", "model": "unknown"},
{"content": "Configure nginx: step 1 edit nginx.conf", "model": "gpt-4"},
]
filtered = filter_entries(entries, threshold=0.5)
self.assertGreaterEqual(len(filtered), 2)
if __name__ == "__main__":
unittest.main()