Compare commits

..

2 Commits

7 changed files with 1055 additions and 700 deletions

View File

@@ -0,0 +1,366 @@
#!/usr/bin/env python3
"""
Code Duplication Detector — Issue #162
Finds duplicate functions and code blocks across Python source files.
Reports duplication percentage and outputs a duplication report.
Usage:
python3 scripts/code_duplication_detector.py --output reports/code_duplication.json
python3 scripts/code_duplication_detector.py --directory scripts/ --dry-run
python3 scripts/code_duplication_detector.py --test # Run built-in test
"""
import argparse
import ast
import hashlib
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Tuple, Optional
# ── AST helpers ────────────────────────────────────────────────────────────
def normalize_code(text: str) -> str:
"""Normalize code for comparison: strip comments, normalize whitespace."""
# Remove comments (both # and docstring triple-quote strings)
text = re.sub(r'#.*$', '', text, flags=re.MULTILINE)
text = re.sub(r'""".*?"""', '', text, flags=re.DOTALL)
text = re.sub(r"'''.*?'''", '', text, flags=re.DOTALL)
# Normalize whitespace
text = re.sub(r'\s+', ' ', text).strip()
return text.lower()
def code_hash(text: str) -> str:
"""SHA256 hash of normalized code for exact duplicate detection."""
normalized = normalize_code(text)
return hashlib.sha256(normalized.encode('utf-8')).hexdigest()
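# Illustrative note (not called by the detector itself): because normalize_code strips
# comments and collapses whitespace, formatting variants hash identically, e.g.
#   code_hash("def f():\n    return 1  # note") == code_hash("def f():\n\treturn 1")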
# ── Function extraction via AST ────────────────────────────────────────────
class FunctionExtractor:
"""Extract function and method definitions with their full source bodies."""
def __init__(self, source: str, filepath: str):
self.source = source
self.filepath = filepath
self.lines = source.splitlines()
self.functions: List[Dict] = []
def _get_source_segment(self, start_lineno: int, end_lineno: int) -> str:
"""Get source code from start to end line (1-indexed, inclusive)."""
# AST end_lineno is inclusive
start_idx = start_lineno - 1
end_idx = end_lineno
return '\n'.join(self.lines[start_idx:end_idx])
def visit(self, tree):
"""Collect all function and async function definitions."""
for node in ast.walk(tree):
if isinstance(node, ast.FunctionDef) or isinstance(node, ast.AsyncFunctionDef):
# Get the full source for this function including decorators
start = node.lineno
end = node.end_lineno
body_source = self._get_source_segment(start, end)
# Also collect parent class name if this is a method
class_name = None
parent = node.parent if hasattr(node, 'parent') else None
if parent and isinstance(parent, ast.ClassDef):
class_name = parent.name
self.functions.append({
'name': node.name,
'file': self.filepath,
'start_line': start,
'end_line': end,
'body': body_source,
'class_name': class_name,
'is_method': class_name is not None,
})
class ParentNodeVisitor(ast.NodeVisitor):
    """Annotate every node in a tree with a .parent reference."""
    def generic_visit(self, node):
        for child in ast.iter_child_nodes(node):
            child.parent = node
        super().generic_visit(node)
def extract_functions_from_file(filepath: str) -> List[Dict]:
"""Extract all function definitions from a Python file."""
try:
with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
source = f.read()
tree = ast.parse(source, filename=str(filepath))
# Annotate with parent references
for node in ast.walk(tree):
for child in ast.iter_child_nodes(node):
child.parent = node
extractor = FunctionExtractor(source, str(filepath))
extractor.visit(tree)
return extractor.functions
except (SyntaxError, UnicodeDecodeError, OSError) as e:
return []
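# Hypothetical usage sketch (the path below is an assumption, not a real file):
#   funcs = extract_functions_from_file("scripts/example.py")
#   for fn in funcs:
#       print(f"{fn['file']}:{fn['start_line']} {fn['name']} (method={fn['is_method']})")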
def scan_directory(directory: str, extensions: Tuple[str, ...] = ('.py',)) -> List[Dict]:
"""Scan directory for Python files and extract all functions."""
all_functions = []
path = Path(directory)
for filepath in path.rglob('*'):
if filepath.is_file() and filepath.suffix in extensions:
# Skip common non-source dirs
parts = filepath.parts
if any(ex in parts for ex in ('__pycache__', 'node_modules', '.git', 'venv', '.venv', 'dist', 'build')):
continue
if filepath.name.startswith('.'):
continue
functions = extract_functions_from_file(str(filepath))
all_functions.extend(functions)
return all_functions
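# Hypothetical usage sketch for a whole-tree scan:
#   all_funcs = scan_directory("scripts")
#   print(f"extracted {len(all_funcs)} functions")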
# ── Duplicate detection ─────────────────────────────────────────────────────
def find_duplicates(functions: List[Dict], similarity_threshold: float = 0.95) -> Dict:
"""
Find duplicate and near-duplicate functions.
Returns dict with:
- exact_duplicates: {hash: [function_info, ...]}
- near_duplicates: [[function_info, ...], ...]
- stats: total_functions, unique_exact, exact_dupe_count, near_dupe_count
"""
# Phase 1: Exact duplicates by code hash
hash_groups: Dict[str, List[Dict]] = defaultdict(list)
for func in functions:
h = code_hash(func['body'])
hash_groups[h].append(func)
exact_duplicates = {h: group for h, group in hash_groups.items() if len(group) > 1}
exact_dupe_count = sum(len(group) - 1 for group in exact_duplicates.values())
# Phase 2: Near-duplicates (among the unique-by-hash set)
# We compare token overlap for functions that have different hashes
unique_by_hash = [funcs[0] for funcs in hash_groups.values()]
near_duplicate_groups = []
# Simple token-based similarity
def tokenize(code: str) -> set:
return set(re.findall(r'[a-zA-Z_][a-zA-Z0-9_]*', code.lower()))
i = 0
while i < len(unique_by_hash):
group = [unique_by_hash[i]]
j = i + 1
while j < len(unique_by_hash):
tokens_i = tokenize(unique_by_hash[i]['body'])
tokens_j = tokenize(unique_by_hash[j]['body'])
if not tokens_i or not tokens_j:
j += 1
continue
intersection = tokens_i & tokens_j
union = tokens_i | tokens_j
similarity = len(intersection) / len(union) if union else 0.0
if similarity >= similarity_threshold:
group.append(unique_by_hash[j])
unique_by_hash.pop(j)
else:
j += 1
if len(group) > 1:
near_duplicate_groups.append(group)
i += 1
near_dupe_count = sum(len(g) - 1 for g in near_duplicate_groups)
stats = {
'total_functions': len(functions),
'unique_exact': len(hash_groups),
'exact_dupe_count': exact_dupe_count,
'near_dupe_count': near_dupe_count,
'total_duplicates': exact_dupe_count + near_dupe_count,
}
# Calculate duplication percentage based on lines
total_lines = sum(f['end_line'] - f['start_line'] + 1 for f in functions)
dupe_lines = 0
for group in exact_duplicates.values():
# Count all but one as duplicates
for f in group[1:]:
dupe_lines += f['end_line'] - f['start_line'] + 1
for group in near_duplicate_groups:
for f in group[1:]:
dupe_lines += f['end_line'] - f['start_line'] + 1
stats['total_lines'] = total_lines
stats['duplicate_lines'] = dupe_lines
stats['duplication_percentage'] = round((dupe_lines / total_lines * 100) if total_lines else 0, 2)
return {
'exact_duplicates': exact_duplicates,
'near_duplicates': near_duplicate_groups,
'stats': stats,
}
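# Worked example of the token-overlap (Jaccard) similarity used above, with assumed token sets:
#   tokens_i = {"def", "add", "return", "x", "y"}
#   tokens_j = {"def", "sum", "return", "x", "y"}
#   similarity = 4 / 6 ≈ 0.67, below the 0.95 default threshold, so the pair is not grouped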
# ── Report generation ────────────────────────────────────────────────────────
def generate_report(results: Dict, output_format: str = 'json') -> str:
"""Generate human-readable report from detection results."""
stats = results['stats']
if output_format == 'json':
return json.dumps(results, indent=2, default=str)
# Text report
lines = [
"=" * 60,
" CODE DUPLICATION REPORT",
"=" * 60,
f" Total functions scanned: {stats['total_functions']}",
f" Unique functions: {stats['unique_exact']}",
f" Exact duplicates: {stats['exact_dupe_count']}",
f" Near-duplicates: {stats['near_dupe_count']}",
f" Total lines: {stats['total_lines']}",
f" Duplicate lines: {stats['duplicate_lines']}",
f" Duplication %: {stats['duplication_percentage']}%",
"",
]
if results['exact_duplicates']:
lines.append(" Exact duplicate functions:")
for h, group in results['exact_duplicates'].items():
first = group[0]
lines.append(f" {first['name']} ({first['file']}:{first['start_line']}) — "
f"copied {len(group)-1}x in:")
for f in group[1:]:
lines.append(f"{f['file']}:{f['start_line']}")
lines.append("")
if results['near_duplicates']:
lines.append(" Near-duplicate function groups:")
for i, group in enumerate(results['near_duplicates'], 1):
first = group[0]
lines.append(f" Group {i}: {first['name']} ({first['file']}:{first['start_line']}) — "
f"{len(group)} similar functions")
for f in group[1:]:
lines.append(f"{f['file']}:{f['start_line']}")
lines.append("")
lines.append("=" * 60)
return '\n'.join(lines)
# ── CLI ─────────────────────────────────────────────────────────────────────
def main():
parser = argparse.ArgumentParser(description="Code Duplication Detector")
parser.add_argument('--directory', default='.',
help='Directory to scan (default: current directory)')
parser.add_argument('--output', help='Output file for JSON report')
parser.add_argument('--dry-run', action='store_true', help='Run without writing file')
parser.add_argument('--threshold', type=float, default=0.95,
help='Similarity threshold for near-dupes (default: 0.95)')
parser.add_argument('--json', action='store_true', help='JSON output to stdout')
parser.add_argument('--test', action='store_true', help='Run built-in test')
args = parser.parse_args()
if args.test:
_run_test()
return
# Scan
functions = scan_directory(args.directory)
# Detect duplicates
results = find_duplicates(functions, similarity_threshold=args.threshold)
stats = results['stats']
# Output
if args.json:
print(json.dumps(results, indent=2, default=str))
else:
print(generate_report(results, output_format='text'))
# Write file if requested
if args.output and not args.dry_run:
os.makedirs(os.path.dirname(args.output) or '.', exist_ok=True)
with open(args.output, 'w') as f:
json.dump(results, f, indent=2, default=str)
print(f"\nReport written to: {args.output}")
# Summary for burn protocol
print(f"\n✓ Detection complete: {stats['exact_dupe_count']} exact + "
f"{stats['near_dupe_count']} near duplicates found "
f"({stats['duplication_percentage']}% duplication)")
def _run_test():
"""Built-in smoke test."""
import tempfile
import os
with tempfile.TemporaryDirectory() as tmpdir:
# Create test files with duplicate code
f1 = Path(tmpdir) / 'mod1.py'
f1.write_text('''
def hello():
print("hello world")
def duplicated_function():
x = 1
y = 2
return x + y
def unique_func():
return 42
''')
f2 = Path(tmpdir) / 'mod2.py'
f2.write_text('''
def duplicated_function():
x = 1
y = 2
return x + y
def another_unique():
return "different"
''')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
stats = results['stats']
assert stats['exact_dupe_count'] >= 1, "Should find at least 1 exact duplicate"
assert stats['total_functions'] >= 4, "Should find at least 4 functions"
# Check duplication percentage is calculated
assert 'duplication_percentage' in stats
print(f"\n✓ Test passed: {stats['total_functions']} functions, "
f"{stats['exact_dupe_count']} exact duplicates, "
f"{stats['duplication_percentage']}% duplication")
if __name__ == '__main__':
main()

View File

@@ -1,418 +0,0 @@
#!/usr/bin/env python3
"""
knowledge_synthesizer.py — Zero-shot knowledge synthesis for compounding intelligence.
Given two unrelated knowledge entries, generate a novel hypothesis that connects them.
Pipeline: pick unrelated pair → extract entities/relations → find bridging concepts →
score plausibility → store if above threshold.
Usage:
python3 scripts/knowledge_synthesizer.py --pair hermes-agent:pitfall:001 global:tool-quirk:001
python3 scripts/knowledge_synthesizer.py --auto --threshold 0.75
python3 scripts/knowledge_synthesizer.py --dry-run # show candidate pair without synthesizing
"""
import argparse
import json
import os
import sys
import time
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Tuple, List, Dict
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
REPO_ROOT = SCRIPT_DIR.parent
KNOWLEDGE_DIR = REPO_ROOT / "knowledge"
TEMPLATE_PATH = SCRIPT_DIR.parent / "templates" / "synthesis-prompt.md"
# Default API configuration
DEFAULT_API_BASE = os.environ.get(
"SYNTHESIS_API_BASE",
os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
)
DEFAULT_API_KEY = os.environ.get("SYNTHESIS_API_KEY", "")
DEFAULT_MODEL = os.environ.get(
"SYNTHESIS_MODEL",
os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
)
# Places to look for API keys if not in env
API_KEY_PATHS = [
os.path.expanduser("~/.config/nous/key"),
os.path.expanduser("~/.hermes/keymaxxing/active/minimax.key"),
os.path.expanduser("~/.config/openrouter/key"),
]
def find_api_key() -> str:
for path in API_KEY_PATHS:
if os.path.exists(path):
with open(path) as f:
key = f.read().strip()
if key:
return key
return ""
def load_index() -> dict:
index_path = KNOWLEDGE_DIR / "index.json"
if not index_path.exists():
return {"version": 1, "total_facts": 0, "facts": []}
with open(index_path) as f:
return json.load(f)
def save_index(index: dict) -> None:
KNOWLEDGE_DIR.mkdir(parents=True, exist_ok=True)
index_path = KNOWLEDGE_DIR / "index.json"
with open(index_path, 'w', encoding='utf-8') as f:
json.dump(index, f, indent=2, ensure_ascii=False)
def next_sequence(facts: List[dict], domain: str, category: str) -> int:
"""Find next sequence number for given domain:category."""
prefix = f"{domain}:{category}:"
max_seq = 0
for fact in facts:
fid = fact.get('id', '')
if fid.startswith(prefix):
try:
seq = int(fid.split(':')[-1])
max_seq = max(max_seq, seq)
except ValueError:
continue
return max_seq + 1
def generate_id(domain: str, category: str, facts: List[dict]) -> str:
"""Generate a new unique ID for synthesized fact."""
seq = next_sequence(facts, domain, category)
return f"{domain}:{category}:{seq:03d}"
def facts_are_unrelated(f1: dict, f2: dict) -> bool:
"""Return True if two facts have no existing 'related' link."""
id1, id2 = f1['id'], f2['id']
rel1 = set(f1.get('related', []))
rel2 = set(f2.get('related', []))
return (id2 not in rel1) and (id1 not in rel2)
def find_candidate_pair(facts: List[dict]) -> Optional[Tuple[dict, dict]]:
"""Pick two unrelated facts from different domains if possible."""
# Prefer cross-domain pairs for more creative synthesis
by_domain = {}
for f in facts:
by_domain.setdefault(f['domain'], []).append(f)
domains = list(by_domain.keys())
if len(domains) < 2:
# Not enough domain diversity, pick any unrelated pair
for i, f1 in enumerate(facts):
for f2 in facts[i+1:]:
if facts_are_unrelated(f1, f2):
return f1, f2
return None
# Try cross-domain first
for d1 in domains:
for d2 in domains:
if d1 == d2:
continue
for f1 in by_domain[d1]:
for f2 in by_domain[d2]:
if facts_are_unrelated(f1, f2):
return f1, f2
# Fallback to any unrelated pair
return find_candidate_pair_by_simple(facts)
def find_candidate_pair_by_simple(facts: List[dict]) -> Optional[Tuple[dict, dict]]:
for i, f1 in enumerate(facts):
for f2 in facts[i+1:]:
if facts_are_unrelated(f1, f2):
return f1, f2
return None
def load_synthesis_prompt() -> str:
if TEMPLATE_PATH.exists():
return TEMPLATE_PATH.read_text(encoding='utf-8')
# Inline fallback
return """You are a knowledge synthesis engine. Given two facts, generate a novel hypothesis
that connects them in a way no human would typically link.
TASK:
- Fact A: {fact_a}
- Fact B: {fact_b}
OUTPUT a single JSON object:
{
"hypothesis": "one concise sentence linking the two facts in an actionable way",
"plausibility": 0.0-1.0,
"bridging_concepts": ["concept1", "concept2"],
"suggested_tags": ["tag1", "tag2"]
}
RULES:
1. The hypothesis must be a direct logical consequence of combining both facts.
2. Do NOT restate either fact — produce a new insight.
3. Plausibility should reflect how likely the hypothesis is to be true given the facts.
4. If no meaningful connection exists, return {"hypothesis":"","plausibility":0.0}.
5. Output ONLY valid JSON, no markdown.
"""
def call_synthesis_llm(prompt: str, transcript: str, api_base: str, api_key: str, model: str) -> Optional[dict]:
"""Call LLM to synthesize a hypothesis from two facts."""
import urllib.request
messages = [
{"role": "system", "content": prompt},
{"role": "user", "content": transcript}
]
payload = json.dumps({
"model": model,
"messages": messages,
"temperature": 0.7, # More creative for synthesis
"max_tokens": 512
}).encode('utf-8')
req = urllib.request.Request(
f"{api_base}/chat/completions",
data=payload,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
},
method="POST"
)
try:
with urllib.request.urlopen(req, timeout=60) as resp:
result = json.loads(resp.read().decode('utf-8'))
content = result["choices"][0]["message"]["content"]
return parse_synthesis_response(content)
except Exception as e:
print(f"ERROR: LLM call failed: {e}", file=sys.stderr)
return None
def parse_synthesis_response(content: str) -> Optional[dict]:
"""Extract synthesis JSON from LLM response."""
try:
data = json.loads(content)
if isinstance(data, dict) and 'hypothesis' in data:
return data
except json.JSONDecodeError:
pass
import re
json_match = re.search(r'```(?:json)?\s*({.*?})\s*```', content, re.DOTALL)
if json_match:
try:
data = json.loads(json_match.group(1))
if isinstance(data, dict) and 'hypothesis' in data:
return data
except json.JSONDecodeError:
pass
# Try finding any JSON object
json_match = re.search(r'(\{.*"hypothesis".*\})', content, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(1))
except json.JSONDecodeError:
pass
return None
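# Sketch of the three parse paths handled above (inputs are assumed examples):
#   parse_synthesis_response('{"hypothesis": "h", "plausibility": 0.7}')        # raw JSON
#   parse_synthesis_response('```json\n{"hypothesis": "h"}\n```')               # fenced block
#   parse_synthesis_response('Note: {"hypothesis": "h", "plausibility": 0.7}')  # embedded object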
def heuristic_synthesis(f1: dict, f2: dict) -> dict:
"""Rule-based fallback synthesis when no LLM available."""
# Simple bridging: combine tags and domains
tags = list(set(f1.get('tags', []) + f2.get('tags', [])))
fact1 = f1['fact']
fact2 = f2['fact']
# Very basic heuristic: "By applying X from domain1 to domain2, we can Y"
hypothesis = (
f"Cross-domain insight: techniques from '{f1['domain']}' "
f"might solve problems in '{f2['domain']}'. "
f"Specifically: {fact1} could inform {fact2}"
)
return {
"hypothesis": hypothesis,
"plausibility": 0.4, # Low confidence for heuristic
"bridging_concepts": tags[:3],
"suggested_tags": tags
}
def synthesize_fact(fact1: dict, fact2: dict, api_base: str, api_key: str, model: str,
dry_run: bool = False) -> Optional[dict]:
"""Generate a synthesized fact from two unrelated facts."""
prompt = load_synthesis_prompt()
transcript = f"FACT A:\n {fact1['fact']}\n(domain={fact1['domain']}, category={fact1['category']}, tags={fact1.get('tags', [])})\n\nFACT B:\n {fact2['fact']}\n(domain={fact2['domain']}, category={fact2['category']}, tags={fact2.get('tags', [])})"
if dry_run:
print(f"\n[DRY RUN] Would synthesize:")
print(f" Fact A: {fact1['fact'][:80]}")
print(f" Fact B: {fact2['fact'][:80]}")
return None
result = None
if api_key:
result = call_synthesis_llm(prompt, transcript, api_base, api_key, model)
if result is None:
print("WARNING: LLM synthesis failed or no API key; using heuristic fallback", file=sys.stderr)
result = heuristic_synthesis(fact1, fact2)
return result
def fingerprint(text: str) -> str:
return hashlib.md5(text.lower().strip().encode('utf-8')).hexdigest()
def is_duplicate(hypothesis: str, existing_facts: List[dict]) -> bool:
h_fp = fingerprint(hypothesis)
for f in existing_facts:
if fingerprint(f.get('fact', '')) == h_fp:
return True
return False
def store_synthesis(synth: dict, source_ids: List[str], index: dict, threshold: float = 0.5) -> bool:
"""Store synthesized fact if plausibility exceeds threshold."""
plaus = synth.get('plausibility', 0.0)
if plaus < threshold:
print(f"Skipped: plausibility {plaus:.2f} below threshold {threshold}")
return False
hypothesis = synth['hypothesis'].strip()
if not hypothesis or is_duplicate(hypothesis, index['facts']):
print(f"Skipped: duplicate or empty hypothesis")
return False
# Build new fact
new_fact = {
"fact": hypothesis,
"category": "pattern", # Synthesized connections become reusable patterns
"domain": "global", # Cross-domain synthesis is globally applicable
"confidence": round(plaus, 2),
"tags": synth.get('suggested_tags', []),
"related": source_ids,
"first_seen": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
"last_confirmed": datetime.now(timezone.utc).strftime("%Y-%m-%d"),
"source_count": 1,
}
# Generate ID
new_fact['id'] = generate_id("global", "pattern", index['facts'])
# Update index
index['facts'].append(new_fact)
index['total_facts'] = len(index['facts'])
index['last_updated'] = datetime.now(timezone.utc).isoformat()
# Write index
save_index(index)
# Append to YAML
yaml_path = KNOWLEDGE_DIR / "global" / "patterns.yaml"
yaml_path.parent.mkdir(parents=True, exist_ok=True)
mode = 'a' if yaml_path.exists() else 'w'
with open(yaml_path, mode, encoding='utf-8') as f:
if mode == 'w':
f.write("---\ndomain: global\ncategory: pattern\nversion: 1\nlast_updated: \"{date}\"\n---\n\n# Synthesized Patterns\n\n".format(date=datetime.now(timezone.utc).strftime("%Y-%m-%d")))
f.write(f"\n- id: {new_fact['id']}\n")
f.write(f" fact: \"{hypothesis}\"\n")
f.write(f" confidence: {plaus}\n")
if new_fact['tags']:
f.write(f" tags: {json.dumps(new_fact['tags'])}\n")
f.write(f" related: {json.dumps(source_ids)}\n")
f.write(f" first_seen: \"{new_fact['first_seen']}\"\n")
f.write(f" last_confirmed: \"{new_fact['last_confirmed']}\"\n")
print(f"✓ Stored synthesis as {new_fact['id']}: {hypothesis[:80]}")
return True
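# Sketch of the YAML entry appended above (all values are illustrative):
# - id: global:pattern:004
#   fact: "Cross-domain insight linking fact A and fact B"
#   confidence: 0.8
#   tags: ["tag1", "tag2"]
#   related: ["global:pitfall:001", "hermes-agent:pitfall:001"]
#   first_seen: "2026-04-13"
#   last_confirmed: "2026-04-13"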
def main():
parser = argparse.ArgumentParser(description="Zero-shot knowledge synthesis")
parser.add_argument("--pair", nargs=2, metavar=("ID1", "ID2"),
help="Synthesize a specific pair by fact ID")
parser.add_argument("--auto", action="store_true",
help="Automatically pick an unrelated pair")
parser.add_argument("--threshold", type=float, default=0.6,
help="Plausibility threshold for storage (default: 0.6)")
parser.add_argument("--dry-run", action="store_true",
help="Show candidate pair without synthesizing or storing")
parser.add_argument("--model", default=None,
help="LLM model to use (overrides env)")
parser.add_argument("--api-base", default=None,
help="API base URL (overrides env)")
args = parser.parse_args()
# Resolve API credentials
api_base = args.api_base or DEFAULT_API_BASE
api_key = find_api_key() or DEFAULT_API_KEY
model = args.model or DEFAULT_MODEL
if not args.dry_run and not args.pair and not args.auto:
print("ERROR: Must specify either --pair ID1 ID2 or --auto", file=sys.stderr)
parser.print_help()
sys.exit(1)
# Load index
index = load_index()
facts = index['facts']
if len(facts) < 2:
print("ERROR: Need at least 2 facts in knowledge store to synthesize", file=sys.stderr)
sys.exit(1)
# Select facts
f1, f2 = None, None
if args.pair:
id1, id2 = args.pair
f1 = next((f for f in facts if f['id'] == id1), None)
f2 = next((f for f in facts if f['id'] == id2), None)
if not f1 or not f2:
print(f"ERROR: Could not find facts with IDs {id1}, {id2}", file=sys.stderr)
sys.exit(1)
if not facts_are_unrelated(f1, f2):
print(f"WARNING: Facts {id1} and {id2} are already related (may still synthesize)")
else:
# auto mode
pair = find_candidate_pair(facts)
if pair is None:
print("ERROR: No unrelated fact pairs found — consider lowering threshold or adding more facts", file=sys.stderr)
sys.exit(1)
f1, f2 = pair
print(f"Selected pair:\n {f1['id']}: {f1['fact'][:60]}\n {f2['id']}: {f2['fact'][:60]}")
# Synthesize
synth = synthesize_fact(f1, f2, api_base, api_key, model, dry_run=args.dry_run)
if synth is None:
sys.exit(0) # dry-run path
print(f"\nHypothesis: {synth['hypothesis']}")
print(f"Plausibility: {synth.get('plausibility', 0.0):.2f}")
print(f"Bridging concepts: {synth.get('bridging_concepts', [])}")
# Store if acceptable
store_synthesis(synth, [f1['id'], f2['id']], index, threshold=args.threshold)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,351 @@
#!/usr/bin/env python3
"""
PR Complexity Scorer - Estimate review effort for PRs.
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
DEPENDENCY_FILES = {
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
}
TEST_PATTERNS = [
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
r"spec/.*\.rb$", r".*_spec\.rb$",
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
]
WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20
SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50
SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
@dataclass
class PRComplexity:
pr_number: int
title: str
files_changed: int
additions: int
deletions: int
has_dependency_changes: bool
test_coverage_delta: Optional[int]
score: int
estimated_minutes: int
reasons: List[str]
def to_dict(self) -> dict:
return asdict(self)
class GiteaClient:
def __init__(self, token: str):
self.token = token
self.base_url = GITEA_BASE.rstrip("/")
def _request(self, path: str, params: Dict = None) -> Any:
url = f"{self.base_url}{path}"
if params:
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
url += f"?{qs}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
return None
except urllib.error.URLError as e:
print(f"Network error: {e}", file=sys.stderr)
return None
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
prs = []
page = 1
while True:
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
if not batch:
break
prs.extend(batch)
if len(batch) < 50:
break
page += 1
return prs
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
files = []
page = 1
while True:
batch = self._request(
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
{"limit": 100, "page": page}
)
if not batch:
break
files.extend(batch)
if len(batch) < 100:
break
page += 1
return files
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
data = json.dumps({"body": body}).encode("utf-8")
req = urllib.request.Request(
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
data=data,
method="POST",
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status in (200, 201)
except urllib.error.HTTPError:
return False
def is_dependency_file(filename: str) -> bool:
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
def is_test_file(filename: str) -> bool:
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
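# Illustrative classifications (paths are assumptions):
#   is_dependency_file("src/requirements.txt")  -> True
#   is_dependency_file("docs/readme.md")        -> False
#   is_test_file("tests/test_api.py")           -> True
#   is_test_file("scripts/deploy.py")           -> False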
def score_pr(
files_changed: int,
additions: int,
deletions: int,
has_dependency_changes: bool,
test_coverage_delta: Optional[int] = None
) -> tuple[int, int, List[str]]:
score = 1.0
reasons = []
# Files changed
if files_changed <= SMALL_FILES:
fscore = 1.0
reasons.append("small number of files changed")
elif files_changed <= MEDIUM_FILES:
fscore = 2.0
reasons.append("moderate number of files changed")
elif files_changed <= LARGE_FILES:
fscore = 2.5
reasons.append("large number of files changed")
else:
fscore = 3.0
reasons.append("very large PR spanning many files")
# Lines changed
total_lines = additions + deletions
if total_lines <= SMALL_LINES:
lscore = 1.0
reasons.append("small change size")
elif total_lines <= MEDIUM_LINES:
lscore = 2.0
reasons.append("moderate change size")
elif total_lines <= LARGE_LINES:
lscore = 3.0
reasons.append("large change size")
else:
lscore = 4.0
reasons.append("very large change")
# Dependency changes
if has_dependency_changes:
dscore = 2.5
reasons.append("dependency changes (architectural impact)")
else:
dscore = 0.0
# Test coverage delta
tscore = 0.0
if test_coverage_delta is not None:
if test_coverage_delta > 0:
reasons.append(f"test additions (+{test_coverage_delta} test files)")
tscore = -min(2.0, test_coverage_delta / 2.0)
elif test_coverage_delta < 0:
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
else:
reasons.append("test coverage change not assessed")
# Weighted sum, scaled by 3 to use full 1-10 range
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
scaled_bonus = bonus * 3.0
score = 1.0 + scaled_bonus
final_score = max(1, min(10, int(round(score))))
est_minutes = TIME_PER_POINT.get(final_score, 30)
return final_score, est_minutes, reasons
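# Worked example (assumed inputs): 15 files changed, +400/-100 lines, no dependency edits,
# test coverage not assessed:
#   fscore = 2.0, lscore = 2.0, dscore = 0.0, tscore = 0.0
#   bonus = 2.0*0.25 + 2.0*0.25 = 1.0 -> score = 1.0 + 1.0*3.0 = 4.0 -> 4/10, ~20 minutes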
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
pr_num = pr_data["number"]
title = pr_data.get("title", "")
files = client.get_pr_files(org, repo, pr_num)
additions = sum(f.get("additions", 0) for f in files)
deletions = sum(f.get("deletions", 0) for f in files)
filenames = [f.get("filename", "") for f in files]
has_deps = any(is_dependency_file(f) for f in filenames)
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
test_delta = test_added - test_removed if (test_added or test_removed) else None
score, est_min, reasons = score_pr(
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta
)
return PRComplexity(
pr_number=pr_num,
title=title,
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta,
score=score,
estimated_minutes=est_min,
reasons=reasons
)
def build_comment(complexity: PRComplexity) -> str:
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
test_note = ""
if complexity.test_coverage_delta is not None:
if complexity.test_coverage_delta > 0:
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
elif complexity.test_coverage_delta < 0:
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
comment = f"## 📊 PR Complexity Analysis\n\n"
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
comment += f"| Metric | Value |\n|--------|-------|\n"
comment += f"| Changes | {change_desc} |\n"
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
comment += f"### Scoring rationale:"
for r in complexity.reasons:
comment += f"\n- {r}"
if deps_note:
comment += deps_note
if test_note:
comment += test_note
comment += f"\n\n---\n"
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
return comment
def main():
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
parser.add_argument("--org", default="Timmy_Foundation")
parser.add_argument("--repo", default="compounding-intelligence")
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--apply", action="store_true")
parser.add_argument("--output", default="metrics/pr_complexity.json")
args = parser.parse_args()
token_path = args.token
if os.path.exists(token_path):
with open(token_path) as f:
token = f.read().strip()
else:
token = args.token
if not token:
print("ERROR: No Gitea token provided", file=sys.stderr)
sys.exit(1)
client = GiteaClient(token)
print(f"Fetching open PRs for {args.org}/{args.repo}...")
prs = client.get_open_prs(args.org, args.repo)
if not prs:
print("No open PRs found.")
sys.exit(0)
print(f"Found {len(prs)} open PR(s). Analyzing...")
results = []
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
for pr in prs:
pr_num = pr["number"]
title = pr.get("title", "")
print(f" Analyzing PR #{pr_num}: {title[:60]}")
try:
complexity = analyze_pr(client, args.org, args.repo, pr)
results.append(complexity.to_dict())
comment = build_comment(complexity)
if args.dry_run:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
elif args.apply:
success = client.post_comment(args.org, args.repo, pr_num, comment)
status = "[commented]" if success else "[FAILED]"
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
else:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
except Exception as e:
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
with open(args.output, "w") as f:
json.dump({
"org": args.org,
"repo": args.repo,
"timestamp": datetime.now(timezone.utc).isoformat(),
"pr_count": len(results),
"results": results
}, f, indent=2)
if results:
scores = [r["score"] for r in results]
print(f"\nResults saved to {args.output}")
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
else:
print("\nNo results to save.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,168 @@
#!/usr/bin/env python3
"""
Smoke test for code duplication detector — verifies:
- Function extraction from Python files
- Exact duplicate detection
- Near-duplicate detection (token similarity)
- Report generation and stats
- JSON output format
"""
import json
import sys
import tempfile
from pathlib import Path
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
from code_duplication_detector import (
extract_functions_from_file,
scan_directory,
find_duplicates,
generate_report,
)
def test_extract_functions():
"""Test that function extraction works."""
with tempfile.TemporaryDirectory() as tmpdir:
test_file = Path(tmpdir) / 'sample.py'
test_file.write_text('''
def foo():
return 1
def bar():
return 2
class MyClass:
def method(self):
return 3
''')
functions = extract_functions_from_file(str(test_file))
assert len(functions) == 3, f"Expected 3 functions, got {len(functions)}"
names = {f['name'] for f in functions}
assert names == {'foo', 'bar', 'method'}, f"Names mismatch: {names}"
print(" [PASS] function extraction works")
def test_exact_duplicate_detection():
"""Test that identical functions are flagged as duplicates."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create two files with the same function
f1 = Path(tmpdir) / 'a.py'
f1.write_text('''
def duplicated():
x = 1
y = 2
return x + y
''')
f2 = Path(tmpdir) / 'b.py'
f2.write_text('''
def duplicated():
x = 1
y = 2
return x + y
''')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
stats = results['stats']
assert stats['exact_dupe_count'] >= 1, f"Expected exact duplicate, got count={stats['exact_dupe_count']}"
assert len(results['exact_duplicates']) >= 1, "Should have at least one duplicate group"
print(" [PASS] exact duplicate detection works")
def test_unique_functions_not_flagged():
"""Test that different functions are not flagged as duplicates."""
with tempfile.TemporaryDirectory() as tmpdir:
f1 = Path(tmpdir) / 'a.py'
f1.write_text('def func_a(): return 1')
f2 = Path(tmpdir) / 'b.py'
f2.write_text('def func_b(): return 2')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
assert results['stats']['exact_dupe_count'] == 0
assert len(results['exact_duplicates']) == 0
print(" [PASS] unique functions not flagged as duplicates")
def test_duplication_percentage_calculated():
"""Test that duplication percentage is computed."""
with tempfile.TemporaryDirectory() as tmpdir:
# Create file with mostly duplicated content
f1 = Path(tmpdir) / 'a.py'
f1.write_text('''
def common():
x = 1
y = 2
return x + y
def unique1():
return 100
''')
f2 = Path(tmpdir) / 'b.py'
f2.write_text('''
def common():
x = 1
y = 2
return x + y
def unique2():
return 200
''')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
stats = results['stats']
assert 'duplication_percentage' in stats
        # One copy of `common` is counted as duplicate (4 of the 12 function lines), so the
        # computed percentage should be comfortably above zero (about 33%)
assert stats['duplication_percentage'] > 0
print(f" [PASS] duplication percentage computed: {stats['duplication_percentage']}%")
def test_report_output_format():
"""Test that report output is valid."""
with tempfile.TemporaryDirectory() as tmpdir:
f1 = Path(tmpdir) / 'a.py'
f1.write_text('def dup(): return 1')
f2 = Path(tmpdir) / 'b.py'
f2.write_text('def dup(): return 1')
functions = scan_directory(tmpdir)
results = find_duplicates(functions)
# Text report
text = generate_report(results, output_format='text')
assert 'CODE DUPLICATION REPORT' in text
assert 'Total functions' in text
print(" [PASS] text report format valid")
# JSON report
json_out = generate_report(results, output_format='json')
data = json.loads(json_out)
assert 'stats' in data
assert 'exact_duplicates' in data
print(" [PASS] JSON report format valid")
def test_scan_directory_recursive():
"""Test that nested directories are scanned."""
with tempfile.TemporaryDirectory() as tmpdir:
subdir = Path(tmpdir) / 'sub'
subdir.mkdir()
(subdir / 'nested.py').write_text('def nested(): pass')
(Path(tmpdir) / 'root.py').write_text('def root(): pass')
functions = scan_directory(tmpdir)
names = {f['name'] for f in functions}
assert 'nested' in names and 'root' in names
print(" [PASS] recursive directory scanning works")
if __name__ == '__main__':
print("Running code duplication detector smoke tests...")
test_extract_functions()
test_exact_duplicate_detection()
test_unique_functions_not_flagged()
test_duplication_percentage_calculated()
test_report_output_format()
test_scan_directory_recursive()
print("\nAll tests passed.")

View File

@@ -1,235 +0,0 @@
#!/usr/bin/env python3
"""
Tests for knowledge_synthesizer.py — zero-shot knowledge synthesis pipeline.
Run with: python3 scripts/test_knowledge_synthesizer.py
Or via pytest: pytest scripts/test_knowledge_synthesizer.py
"""
import json
import os
import sys
import tempfile
from pathlib import Path
# Add scripts dir to path for importing sibling module
SCRIPT_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(SCRIPT_DIR))
import importlib.util
spec = importlib.util.spec_from_file_location(
"ks", os.path.join(str(SCRIPT_DIR), "knowledge_synthesizer.py")
)
ks = importlib.util.module_from_spec(spec)
spec.loader.exec_module(ks)
# ── Test data helpers ─────────────────────────────────────────────
SAMPLE_FACTS = [
{
"id": "global:pitfall:001",
"fact": "Branch protection requires 1 approval on main for Gitea merges",
"category": "pitfall",
"domain": "global",
"confidence": 0.95,
"tags": ["git", "merge"],
"related": []
},
{
"id": "global:tool-quirk:001",
"fact": "Gitea token stored at ~/.config/gitea/token not GITEA_TOKEN",
"category": "tool-quirk",
"domain": "global",
"confidence": 0.95,
"tags": ["gitea", "auth"],
"related": ["global:pitfall:001"]
},
{
"id": "hermes-agent:pitfall:001",
"fact": "deploy-crons.py leaves jobs in mixed model format",
"category": "pitfall",
"domain": "hermes-agent",
"confidence": 0.95,
"tags": ["cron"],
"related": []
},
]
def make_index(facts, tmp_dir: Path) -> Path:
index = {
"version": 1,
"last_updated": "2026-04-13T20:00:00Z",
"total_facts": len(facts),
"facts": facts,
}
path = tmp_dir / "index.json"
with open(path, "w") as f:
json.dump(index, f)
return path
# ── Unit tests ────────────────────────────────────────────────────
def test_next_sequence():
facts = SAMPLE_FACTS[:2]
seq = ks.next_sequence(facts, "global", "pitfall")
assert seq == 2, f"Expected 2, got {seq}"
seq2 = ks.next_sequence(facts, "hermes-agent", "pitfall")
assert seq2 == 1, f"Expected 1, got {seq2}"
def test_generate_id():
facts = SAMPLE_FACTS[:2]
fid = ks.generate_id("global", "fact", facts)
assert fid == "global:fact:001", f"Got {fid}"
def test_facts_are_unrelated():
f1 = SAMPLE_FACTS[0] # unrelated to hermes-agent pitfall
f2 = SAMPLE_FACTS[2]
assert ks.facts_are_unrelated(f1, f2) is True
f3 = SAMPLE_FACTS[1] # related to f1
assert ks.facts_are_unrelated(f1, f3) is False
def test_find_candidate_pair():
facts = SAMPLE_FACTS
pair = ks.find_candidate_pair(facts)
assert pair is not None, "Should find an unrelated pair"
f1, f2 = pair
assert ks.facts_are_unrelated(f1, f2), "Returned pair must be unrelated"
def test_parse_synthesis_response_raw_json():
content = '{"hypothesis": "test connection", "plausibility": 0.8, "bridging_concepts": ["x"], "suggested_tags": ["a"]}'
result = ks.parse_synthesis_response(content)
assert result is not None
assert result["hypothesis"] == "test connection"
assert result["plausibility"] == 0.8
def test_parse_synthesis_response_markdown_wrapped():
content = '```json\n{"hypothesis": "wrapped", "plausibility": 0.5}\n```'
result = ks.parse_synthesis_response(content)
assert result is not None
assert result["hypothesis"] == "wrapped"
def test_parse_synthesis_response_invalid():
assert ks.parse_synthesis_response("not json") is None
assert ks.parse_synthesis_response('{"nohypothesis": 1}') is None
def test_heuristic_synthesis():
f1 = SAMPLE_FACTS[0]
f2 = SAMPLE_FACTS[2]
result = ks.heuristic_synthesis(f1, f2)
assert "hypothesis" in result
assert "plausibility" in result
assert result["plausibility"] == 0.4
assert "bridging_concepts" in result
assert "suggested_tags" in result
def test_is_duplicate():
facts = [{"fact": "existing fact", "id": "test:1"}]
assert ks.is_duplicate("existing fact", facts) is True
assert ks.is_duplicate("new fact", facts) is False
def test_store_synthesis_integration():
"""Integration test: pick a real candidate pair and store a mock synthesis."""
with tempfile.TemporaryDirectory() as tmp:
tmp_path = Path(tmp)
# Create fake knowledge dir with index
kdir = tmp_path / "knowledge"
kdir.mkdir()
index = {
"version": 1,
"last_updated": "2026-04-13T20:00:00Z",
"total_facts": 3,
"facts": SAMPLE_FACTS
}
with open(kdir / "index.json", "w") as f:
json.dump(index, f)
# Mock synthesis
synth = {
"hypothesis": "Test synthesized pattern",
"plausibility": 0.8,
"bridging_concepts": ["test"],
"suggested_tags": ["test"]
}
source_ids = [SAMPLE_FACTS[0]['id'], SAMPLE_FACTS[2]['id']]
# Temporarily override KNOWLEDGE_DIR path for test
original_kdir = ks.KNOWLEDGE_DIR
ks.KNOWLEDGE_DIR = kdir
try:
stored = ks.store_synthesis(synth, source_ids, index, threshold=0.5)
assert stored is True
assert index['total_facts'] == 4
new_fact = index['facts'][-1]
assert new_fact['fact'] == "Test synthesized pattern"
assert new_fact['category'] == "pattern"
assert new_fact['domain'] == "global"
assert new_fact['related'] == source_ids
assert new_fact['id'].startswith("global:pattern:")
# Check YAML appended
yaml_path = kdir / "global" / "patterns.yaml"
assert yaml_path.exists()
content = yaml_path.read_text()
assert "Test synthesized pattern" in content
finally:
ks.KNOWLEDGE_DIR = original_kdir
# ── Smoke test ────────────────────────────────────────────────────
def test_smoke_synthesizer_info():
"""Sanity check: script can at least load and report current knowledge state."""
index = ks.load_index()
total = index.get('total_facts', 0)
facts = index.get('facts', [])
print(f"\nKnowledge store contains {total} facts across {len(set(f['domain'] for f in facts))} domains")
assert total >= 0
if __name__ == "__main__":
print("Running knowledge_synthesizer tests...\n")
passed = 0
failed = 0
tests = [
test_next_sequence,
test_generate_id,
test_facts_are_unrelated,
test_find_candidate_pair,
test_parse_synthesis_response_raw_json,
test_parse_synthesis_response_markdown_wrapped,
test_parse_synthesis_response_invalid,
test_heuristic_synthesis,
test_is_duplicate,
test_store_synthesis_integration,
test_smoke_synthesizer_info,
]
    for test in tests:
        try:
            test()
            print(f"[PASS] {test.__name__}")
            passed += 1
        except Exception as e:
            import traceback
            traceback.print_exc()
            print(f"[FAIL] {test.__name__}: {e}")
            failed += 1
print(f"\n{passed} passed, {failed} failed")
sys.exit(0 if failed == 0 else 1)

View File

@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""
Tests for PR Complexity Scorer — unit tests for the scoring logic.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pr_complexity_scorer import (
score_pr,
is_dependency_file,
is_test_file,
TIME_PER_POINT,
SMALL_FILES,
MEDIUM_FILES,
LARGE_FILES,
SMALL_LINES,
MEDIUM_LINES,
LARGE_LINES,
)
PASS = 0
FAIL = 0
def test(name):
def decorator(fn):
global PASS, FAIL
try:
fn()
PASS += 1
print(f" [PASS] {name}")
except AssertionError as e:
FAIL += 1
print(f" [FAIL] {name}: {e}")
except Exception as e:
FAIL += 1
print(f" [FAIL] {name}: Unexpected error: {e}")
return decorator
def assert_eq(a, b, msg=""):
if a != b:
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
def assert_true(v, msg=""):
if not v:
raise AssertionError(msg or "Expected True")
def assert_false(v, msg=""):
if v:
raise AssertionError(msg or "Expected False")
print("=== PR Complexity Scorer Tests ===\n")
print("-- File Classification --")
@test("dependency file detection — requirements.txt")
def _():
assert_true(is_dependency_file("requirements.txt"))
assert_true(is_dependency_file("src/requirements.txt"))
assert_false(is_dependency_file("requirements_test.txt"))
@test("dependency file detection — pyproject.toml")
def _():
assert_true(is_dependency_file("pyproject.toml"))
assert_false(is_dependency_file("myproject.py"))
@test("test file detection — pytest style")
def _():
assert_true(is_test_file("tests/test_api.py"))
assert_true(is_test_file("test_module.py"))
assert_true(is_test_file("src/module_test.py"))
@test("test file detection — other frameworks")
def _():
assert_true(is_test_file("spec/feature_spec.rb"))
assert_true(is_test_file("__tests__/component.test.js"))
assert_false(is_test_file("testfixtures/helper.py"))
print("\n-- Scoring Logic --")
@test("small PR gets low score (1-3)")
def _():
score, minutes, _ = score_pr(
files_changed=3,
additions=50,
deletions=10,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
assert_true(minutes < 20)
@test("medium PR gets medium score (4-6)")
def _():
score, minutes, _ = score_pr(
files_changed=15,
additions=400,
deletions=100,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
assert_true(20 <= minutes <= 45)
@test("large PR gets high score (7-9)")
def _():
score, minutes, _ = score_pr(
files_changed=60,
additions=3000,
deletions=1500,
has_dependency_changes=True,
test_coverage_delta=None
)
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
assert_true(minutes >= 45)
@test("dependency changes boost score")
def _():
base_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=False, test_coverage_delta=None
)
dep_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=True, test_coverage_delta=None
)
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
@test("adding tests lowers complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
better_score, _, _ = score_pr(
files_changed=8, additions=180, deletions=20,
has_dependency_changes=False, test_coverage_delta=3
)
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
@test("removing tests increases complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
worse_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=-2
)
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
@test("score bounded 1-10")
def _():
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
score, _, _ = score_pr(files, adds, dels, False, None)
assert_true(1 <= score <= 10, f"Score {score} out of range")
@test("estimated minutes exist for all scores")
def _():
for s in range(1, 11):
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)

View File

@@ -1,47 +0,0 @@
# Knowledge Synthesis Prompt
## System Prompt
You are a knowledge synthesis engine. Given two facts, you generate a novel hypothesis
that connects them in a way no human would typically link — a zero-shot creative leap.
## Task
FACT A:
{fact_a}
FACT B:
{fact_b}
Generate a single JSON object:
{
"hypothesis": "one concise sentence linking the two facts as a new, testable insight",
"plausibility": 0.0-1.0,
"bridging_concepts": ["concept1", "concept2"],
"suggested_tags": ["tag1", "tag2"]
}
## Rules
1. The hypothesis must be a logical consequence of combining both facts.
2. DO NOT restate either fact — produce genuinely new insight.
3. Plausibility should reflect confidence given only these two facts.
4. If no meaningful connection exists, return {"hypothesis":"","plausibility":0.0}.
5. Output ONLY valid JSON — no markdown, no explanation.
## Examples
Input facts:
- "Gitea PR creation requires branch protection approval (1+) on main"
- "Git push hangs on large repos (pack.windowMemory=100m)"
Hypothesis output:
{
"hypothesis": "Branch protection triggers checks that inflate pack size, causing git push to hang on large repos",
"plausibility": 0.65,
"bridging_concepts": ["git", "gitea", "branch-protection", "push"],
"suggested_tags": ["git", "gitea", "performance"]
}
Output ONLY the JSON object.