Compare commits

..

1 Commits

Author SHA1 Message Date
STEP35 Claude Code
44607f8484 feat: add dependency freshness checker — issue #161
Some checks failed
Test / pytest (pull_request) Failing after 8s
Implements scripts/dependency_freshness.py which compares installed
dependencies against latest PyPI versions and flags packages that are
more than 2 major versions behind. Includes comprehensive tests in
scripts/test_dependency_freshness.py.

Closes #161
2026-04-26 09:58:30 -04:00
6 changed files with 450 additions and 950 deletions

View File

@@ -0,0 +1,271 @@
#!/usr/bin/env python3
"""dependency_freshness.py - Compare installed dependencies against latest PyPI versions.
Identify packages that are more than 2 major versions behind.
Outputs a human-readable report by default or JSON with --json flag.
"""
import argparse
import json
import subprocess
import sys
from packaging import version
from typing import Dict, List, Tuple
def parse_requirements(requirements_path: str) -> List[str]:
"""Parse package names from a requirements.txt file."""
packages = []
try:
with open(requirements_path, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
pkg_name = line
for delim in ['[', '>', '<', '=', '!', ';', '@']:
if delim in pkg_name:
pkg_name = pkg_name.split(delim)[0]
pkg_name = pkg_name.strip()
if pkg_name:
packages.append(pkg_name.lower())
except FileNotFoundError:
print(f"Warning: requirements file not found: {requirements_path}", file=sys.stderr)
return packages
def get_installed_packages() -> Dict[str, str]:
"""Get all installed packages via pip list --format=json."""
try:
result = subprocess.run(
[sys.executable, '-m', 'pip', 'list', '--format=json'],
capture_output=True, text=True, check=True
)
packages = json.loads(result.stdout)
return {pkg['name'].lower(): pkg['version'] for pkg in packages}
except subprocess.CalledProcessError as e:
print(f"Error running pip list: {e}", file=sys.stderr)
sys.exit(1)
except json.JSONDecodeError as e:
print(f"Error parsing pip output: {e}", file=sys.stderr)
sys.exit(1)
def get_outdated_packages() -> Dict[str, dict]:
"""Get outdated packages via pip list --outdated --format=json."""
try:
result = subprocess.run(
[sys.executable, '-m', 'pip', 'list', '--outdated', '--format=json'],
capture_output=True, text=True, check=True
)
outdated_list = json.loads(result.stdout)
outdated = {}
for pkg in outdated_list:
name = pkg['name'].lower()
outdated[name] = {
'installed': pkg.get('version', ''),
'latest': pkg.get('latest_version', ''),
'latest_filetype': pkg.get('latest_filetype', '')
}
return outdated
except subprocess.CalledProcessError as e:
print(f"Error running pip list --outdated: {e}", file=sys.stderr)
sys.exit(1)
except json.JSONDecodeError as e:
print(f"Error parsing pip outdated output: {e}", file=sys.stderr)
sys.exit(1)
def get_major_version(v: str) -> int:
"""Extract major version number from a version string."""
try:
parsed = version.parse(v)
if hasattr(parsed, 'major'):
return int(parsed.major)
parts = str(v).split('.')
if parts:
return int(parts[0])
except Exception:
pass
return 0
def is_more_than_two_majors_behind(installed_ver: str, latest_ver: str) -> bool:
"""Check if installed version is more than 2 major versions behind latest."""
try:
installed_major = get_major_version(installed_ver)
latest_major = get_major_version(latest_ver)
return (latest_major - installed_major) > 2
except Exception:
return False
def analyze_dependencies(
required_packages: List[str],
installed_packages: Dict[str, str],
outdated_packages: Dict[str, dict]
) -> Tuple[List[dict], List[str], List[dict]]:
"""Analyze dependency freshness."""
very_outdated = []
missing = []
outdated_but_not_critical = []
for pkg in required_packages:
if pkg not in installed_packages:
missing.append(pkg)
continue
installed_ver = installed_packages[pkg]
if pkg not in outdated_packages:
continue
latest_ver = outdated_packages[pkg]['latest']
if is_more_than_two_majors_behind(installed_ver, latest_ver):
very_outdated.append({
'package': pkg,
'installed': installed_ver,
'latest': latest_ver,
'major_diff': get_major_version(latest_ver) - get_major_version(installed_ver)
})
else:
outdated_but_not_critical.append({
'package': pkg,
'installed': installed_ver,
'latest': latest_ver,
'major_diff': get_major_version(latest_ver) - get_major_version(installed_ver)
})
return very_outdated, missing, outdated_but_not_critical
def generate_human_report(
very_outdated: List[dict],
missing: List[str],
outdated_but_not_critical: List[dict],
requirements_path: str
) -> str:
"""Generate a human-readable staleness report."""
lines = []
lines.append("=" * 60)
lines.append("DEPENDENCY FRESHNESS REPORT")
lines.append("=" * 60)
lines.append(f"Requirements file: {requirements_path}")
total = len(very_outdated) + len(missing) + len(outdated_but_not_critical)
lines.append(f"Total dependencies checked: {total}")
lines.append(f"Very outdated (>2 major versions behind): {len(very_outdated)}")
lines.append(f"Outdated but within 2 major versions: {len(outdated_but_not_critical)}")
lines.append(f"Missing (not installed): {len(missing)}")
lines.append("")
if very_outdated:
lines.append("!!! VERY OUTDATED PACKAGES (consider updating):")
lines.append("-" * 60)
for pkg_info in very_outdated:
lines.append(f" {pkg_info['package']}")
lines.append(f" Installed: {pkg_info['installed']}")
lines.append(f" Latest: {pkg_info['latest']}")
lines.append(f" Major diff: {pkg_info['major_diff']}")
lines.append("")
else:
lines.append("✓ No packages more than 2 major versions behind.")
lines.append("")
if outdated_but_not_critical:
lines.append(f"Outdated packages (within 2 major versions):")
lines.append("-" * 60)
for pkg_info in outdated_but_not_critical:
lines.append(f" {pkg_info['package']}: {pkg_info['installed']} -> {pkg_info['latest']} (major diff: {pkg_info['major_diff']})")
lines.append("")
if missing:
lines.append(f"Missing packages (not installed):")
lines.append("-" * 60)
for pkg in missing:
lines.append(f" {pkg}")
lines.append("")
lines.append("=" * 60)
lines.append("For full details, run: python3 -m pip list --outdated")
lines.append("=" * 60)
return "\n".join(lines)
def generate_json_report(
very_outdated: List[dict],
missing: List[str],
outdated_but_not_critical: List[dict],
requirements_path: str
) -> str:
"""Generate a JSON staleness report."""
report = {
'requirements_file': requirements_path,
'summary': {
'total_dependencies': len(very_outdated) + len(missing) + len(outdated_but_not_critical),
'very_outdated_count': len(very_outdated),
'outdated_within_threshold_count': len(outdated_but_not_critical),
'missing_count': len(missing)
},
'very_outdated': very_outdated,
'outdated_within_threshold': outdated_but_not_critical,
'missing': missing
}
return json.dumps(report, indent=2)
def main():
parser = argparse.ArgumentParser(
description='Check dependency freshness against PyPI latest versions.'
)
parser.add_argument(
'--requirements', '-r',
default='requirements.txt',
help='Path to requirements.txt file (default: requirements.txt)'
)
parser.add_argument(
'--json',
action='store_true',
help='Output report as JSON instead of human-readable text'
)
parser.add_argument(
'--output', '-o',
help='Optional output file for the report (default: stdout)'
)
args = parser.parse_args()
# Parse requirements
required_packages = parse_requirements(args.requirements)
if not required_packages:
print("No packages found in requirements file.", file=sys.stderr)
sys.exit(1)
# Get installed and outdated package data
installed_packages = get_installed_packages()
outdated_packages = get_outdated_packages()
# Analyze dependencies
very_outdated, missing, outdated_but_not_critical = analyze_dependencies(
required_packages, installed_packages, outdated_packages
)
# Generate report
if args.json:
report = generate_json_report(very_outdated, missing, outdated_but_not_critical, args.requirements)
else:
report = generate_human_report(very_outdated, missing, outdated_but_not_critical, args.requirements)
# Output report
if args.output:
with open(args.output, 'w') as f:
f.write(report + '\n')
else:
print(report)
# Exit code: 0 if no very outdated deps, 1 otherwise
exit_code = 1 if very_outdated else 0
sys.exit(exit_code)
if __name__ == '__main__':
main()

View File

@@ -1,255 +0,0 @@
#!/usr/bin/env python3
"""
knowledge_to_training_pairs.py — Convert quality-gated knowledge entries into training pairs.
Reads knowledge/index.json (or a custom JSONL of entries), applies quality filters,
and emits terse→rich training pairs in JSONL format for model fine-tuning.
Usage:
python3 scripts/knowledge_to_training_pairs.py \
--input knowledge/index.json \
--output training_pairs.jsonl \
--min-confidence 0.7 \
--model-filter claude-sonnet,gpt-4 \
--after 2026-01-01
Input entry format (from index.json facts):
{
"id": "hermes-agent:pitfall:001",
"fact": "deploy-crons.py leaves jobs in mixed model format",
"category": "pitfall",
"domain": "hermes-agent",
"confidence": 0.95,
...
}
Output training pair format:
{
"terse": "How do I handle deploy-crons.py mixed model format?",
"rich": "deploy-crons.py leaves jobs in mixed model format.",
"domain": "hermes-agent",
"source_confidence": 0.95,
"source_model": "unknown"
}
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
def fact_to_terse(fact: str, category: str, domain: str) -> str:
"""
Derive a short user query from a knowledge fact.
Strategy:
- Pitfalls → "How do I avoid/handle/fix <fact excerpt>?"
- Patterns → "What's the recommended way to <pattern core>?"
- Tool quirks → "How does <tool> behave in <context>?"
- Facts → "What should I know about <fact excerpt>?"
- Questions → "What is the answer to: <fact>?"
"""
fact_lower = fact.lower()
# Extract a concise excerpt (first sentence or 80 chars)
excerpt = fact.split('. ')[0] if '. ' in fact else fact[:80]
if category == "pitfall":
verbs = ["avoid", "handle", "fix", "prevent"]
# pick verb based on fact wording
if "trigger" in fact_lower or "cause" in fact_lower:
verb = "avoid"
elif "broken" in fact_lower or "fails" in fact_lower:
verb = "fix"
else:
verb = "handle"
return f"How do I {verb} {excerpt.rstrip('.')}?"
elif category == "pattern":
return f"What's the recommended way to {excerpt.rstrip('.')}?"
elif category == "tool-quirk":
# Try to extract tool name
tool = fact.split()[0] if fact.split() else domain
return f"How does {tool} behave in this context?"
elif category == "question":
return f"What is the answer to: {excerpt}?"
else: # fact or unknown
return f"What should I know about {excerpt.rstrip('.')}?"
def parse_date(date_str: Optional[str]) -> Optional[datetime]:
"""Parse ISO date string to datetime, or return None."""
if not date_str:
return None
try:
return datetime.fromisoformat(date_str.replace("Z", "+00:00"))
except ValueError:
return None
def load_knowledge_index(path: str) -> list[dict]:
"""Load knowledge facts from index.json (or plain JSONL of entries)."""
p = Path(path)
if not p.exists():
print(f"ERROR: Knowledge input not found: {path}", file=sys.stderr)
sys.exit(1)
with open(p) as f:
data = json.load(f)
# index.json format: {"facts": [...], ...}
if isinstance(data, dict) and "facts" in data:
return data["facts"]
# JSONL format: one entry per line
if isinstance(data, list):
return data
# Plain file with JSON array
print(f"ERROR: Unrecognized input format in {path}", file=sys.stderr)
sys.exit(1)
def filter_entries(entries: list[dict],
min_confidence: float = 0.0,
model_filter: Optional[list[str]] = None,
after: Optional[datetime] = None,
before: Optional[datetime] = None) -> list[dict]:
"""Apply quality and provenance filters."""
filtered = []
for entry in entries:
# Confidence filter (entry confidence)
conf = entry.get("confidence", 0.0)
if conf < min_confidence:
continue
# Model filter: if specified, entry's model must be in the list
if model_filter:
entry_model = entry.get("model", entry.get("provenance", {}).get("model", "unknown"))
if entry_model not in model_filter:
continue
# Date filter: use last_confirmed or first_seen or harvested_at
entry_date = None
for field in ("last_confirmed", "first_seen", "harvested_at"):
if field in entry:
entry_date = parse_date(entry[field])
if entry_date:
break
if after and entry_date and entry_date < after:
continue
if before and entry_date and entry_date > before:
continue
filtered.append(entry)
return filtered
def entry_to_pair(entry: dict) -> dict:
"""Convert a knowledge entry into a training pair."""
fact = entry.get("fact", "").strip()
if not fact:
return None
category = entry.get("category", "fact")
domain = entry.get("domain", "global")
terse = fact_to_terse(fact, category, domain)
rich = fact
source_confidence = round(entry.get("confidence", 0.0), 4)
source_model = entry.get("model", entry.get("provenance", {}).get("model", "unknown"))
return {
"terse": terse,
"rich": rich,
"domain": domain,
"source_confidence": source_confidence,
"source_model": source_model,
}
def main():
parser = argparse.ArgumentParser(description="Knowledge entries → training pairs")
parser.add_argument("--input", "-i", default="knowledge/index.json",
help="Input knowledge index or JSONL (default: knowledge/index.json)")
parser.add_argument("--output", "-o", default="training_pairs.jsonl",
help="Output JSONL file")
parser.add_argument("--min-confidence", type=float, default=0.5,
help="Minimum entry confidence to include (0.0-1.0, default: 0.5)")
parser.add_argument("--model-filter",
help="Comma-separated list of source models to include")
parser.add_argument("--after",
help="Include entries last_confirmed/first_seen on or after this date (YYYY-MM-DD)")
parser.add_argument("--before",
help="Include entries last_confirmed/first_seen on or before this date (YYYY-MM-DD)")
parser.add_argument("--dry-run", action="store_true",
help="Print sample pairs and stats without writing")
args = parser.parse_args()
# Load
entries = load_knowledge_index(args.input)
print(f"Loaded {len(entries)} entries from {args.input}", file=sys.stderr)
# Parse filters
model_list = args.model_filter.split(",") if args.model_filter else None
after_dt = parse_date(args.after) if args.after else None
before_dt = parse_date(args.before) if args.before else None
# Filter
kept = filter_entries(
entries,
min_confidence=args.min_confidence,
model_filter=model_list,
after=after_dt,
before=before_dt,
)
print(f"After filtering: {len(kept)} / {len(entries)} entries", file=sys.stderr)
# Convert
pairs = []
for entry in kept:
pair = entry_to_pair(entry)
if pair:
pairs.append(pair)
# Stats
if pairs:
avg_conf = sum(p["source_confidence"] for p in pairs) / len(pairs)
domains = {}
models = {}
for p in pairs:
domains[p["domain"]] = domains.get(p["domain"], 0) + 1
models[p["source_model"]] = models.get(p["source_model"], 0) + 1
else:
avg_conf = 0.0
domains = {}
models = {}
stats = {
"input_entries": len(entries),
"after_filter": len(kept),
"pairs_generated": len(pairs),
"avg_confidence": round(avg_conf, 4),
"domains": domains,
"source_models": models,
}
print(json.dumps(stats, indent=2), file=sys.stderr)
if args.dry_run:
print("\nSample pairs:", file=sys.stderr)
for p in pairs[:3]:
print(json.dumps(p, ensure_ascii=False), file=sys.stderr)
return
# Write JSONL
out_path = Path(args.output)
out_path.parent.mkdir(parents=True, exist_ok=True)
with open(out_path, "w", encoding="utf-8") as f:
for pair in pairs:
f.write(json.dumps(pair, ensure_ascii=False) + "\n")
print(f"\nWrote {len(pairs)} training pairs to {out_path}", file=sys.stderr)
if __name__ == "__main__":
main()

View File

@@ -1,351 +0,0 @@
#!/usr/bin/env python3
"""
PR Complexity Scorer - Estimate review effort for PRs.
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
DEPENDENCY_FILES = {
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
}
TEST_PATTERNS = [
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
r"spec/.*\.rb$", r".*_spec\.rb$",
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
]
WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20
SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50
SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
@dataclass
class PRComplexity:
pr_number: int
title: str
files_changed: int
additions: int
deletions: int
has_dependency_changes: bool
test_coverage_delta: Optional[int]
score: int
estimated_minutes: int
reasons: List[str]
def to_dict(self) -> dict:
return asdict(self)
class GiteaClient:
def __init__(self, token: str):
self.token = token
self.base_url = GITEA_BASE.rstrip("/")
def _request(self, path: str, params: Dict = None) -> Any:
url = f"{self.base_url}{path}"
if params:
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
url += f"?{qs}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
return None
except urllib.error.URLError as e:
print(f"Network error: {e}", file=sys.stderr)
return None
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
prs = []
page = 1
while True:
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
if not batch:
break
prs.extend(batch)
if len(batch) < 50:
break
page += 1
return prs
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
files = []
page = 1
while True:
batch = self._request(
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
{"limit": 100, "page": page}
)
if not batch:
break
files.extend(batch)
if len(batch) < 100:
break
page += 1
return files
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
data = json.dumps({"body": body}).encode("utf-8")
req = urllib.request.Request(
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
data=data,
method="POST",
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status in (200, 201)
except urllib.error.HTTPError:
return False
def is_dependency_file(filename: str) -> bool:
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
def is_test_file(filename: str) -> bool:
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
def score_pr(
files_changed: int,
additions: int,
deletions: int,
has_dependency_changes: bool,
test_coverage_delta: Optional[int] = None
) -> tuple[int, int, List[str]]:
score = 1.0
reasons = []
# Files changed
if files_changed <= SMALL_FILES:
fscore = 1.0
reasons.append("small number of files changed")
elif files_changed <= MEDIUM_FILES:
fscore = 2.0
reasons.append("moderate number of files changed")
elif files_changed <= LARGE_FILES:
fscore = 2.5
reasons.append("large number of files changed")
else:
fscore = 3.0
reasons.append("very large PR spanning many files")
# Lines changed
total_lines = additions + deletions
if total_lines <= SMALL_LINES:
lscore = 1.0
reasons.append("small change size")
elif total_lines <= MEDIUM_LINES:
lscore = 2.0
reasons.append("moderate change size")
elif total_lines <= LARGE_LINES:
lscore = 3.0
reasons.append("large change size")
else:
lscore = 4.0
reasons.append("very large change")
# Dependency changes
if has_dependency_changes:
dscore = 2.5
reasons.append("dependency changes (architectural impact)")
else:
dscore = 0.0
# Test coverage delta
tscore = 0.0
if test_coverage_delta is not None:
if test_coverage_delta > 0:
reasons.append(f"test additions (+{test_coverage_delta} test files)")
tscore = -min(2.0, test_coverage_delta / 2.0)
elif test_coverage_delta < 0:
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
else:
reasons.append("test coverage change not assessed")
# Weighted sum, scaled by 3 to use full 1-10 range
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
scaled_bonus = bonus * 3.0
score = 1.0 + scaled_bonus
final_score = max(1, min(10, int(round(score))))
est_minutes = TIME_PER_POINT.get(final_score, 30)
return final_score, est_minutes, reasons
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
pr_num = pr_data["number"]
title = pr_data.get("title", "")
files = client.get_pr_files(org, repo, pr_num)
additions = sum(f.get("additions", 0) for f in files)
deletions = sum(f.get("deletions", 0) for f in files)
filenames = [f.get("filename", "") for f in files]
has_deps = any(is_dependency_file(f) for f in filenames)
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
test_delta = test_added - test_removed if (test_added or test_removed) else None
score, est_min, reasons = score_pr(
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta
)
return PRComplexity(
pr_number=pr_num,
title=title,
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta,
score=score,
estimated_minutes=est_min,
reasons=reasons
)
def build_comment(complexity: PRComplexity) -> str:
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
test_note = ""
if complexity.test_coverage_delta is not None:
if complexity.test_coverage_delta > 0:
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
elif complexity.test_coverage_delta < 0:
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
comment = f"## 📊 PR Complexity Analysis\n\n"
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
comment += f"| Metric | Value |\n|--------|-------|\n"
comment += f"| Changes | {change_desc} |\n"
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
comment += f"### Scoring rationale:"
for r in complexity.reasons:
comment += f"\n- {r}"
if deps_note:
comment += deps_note
if test_note:
comment += test_note
comment += f"\n\n---\n"
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
return comment
def main():
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
parser.add_argument("--org", default="Timmy_Foundation")
parser.add_argument("--repo", default="compounding-intelligence")
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--apply", action="store_true")
parser.add_argument("--output", default="metrics/pr_complexity.json")
args = parser.parse_args()
token_path = args.token
if os.path.exists(token_path):
with open(token_path) as f:
token = f.read().strip()
else:
token = args.token
if not token:
print("ERROR: No Gitea token provided", file=sys.stderr)
sys.exit(1)
client = GiteaClient(token)
print(f"Fetching open PRs for {args.org}/{args.repo}...")
prs = client.get_open_prs(args.org, args.repo)
if not prs:
print("No open PRs found.")
sys.exit(0)
print(f"Found {len(prs)} open PR(s). Analyzing...")
results = []
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
for pr in prs:
pr_num = pr["number"]
title = pr.get("title", "")
print(f" Analyzing PR #{pr_num}: {title[:60]}")
try:
complexity = analyze_pr(client, args.org, args.repo, pr)
results.append(complexity.to_dict())
comment = build_comment(complexity)
if args.dry_run:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
elif args.apply:
success = client.post_comment(args.org, args.repo, pr_num, comment)
status = "[commented]" if success else "[FAILED]"
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
else:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
except Exception as e:
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
with open(args.output, "w") as f:
json.dump({
"org": args.org,
"repo": args.repo,
"timestamp": datetime.now(timezone.utc).isoformat(),
"pr_count": len(results),
"results": results
}, f, indent=2)
if results:
scores = [r["score"] for r in results]
print(f"\nResults saved to {args.output}")
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
else:
print("\nNo results to save.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,179 @@
#!/usr/bin/env python3
"""Tests for scripts/dependency_freshness.py — 9.7 Dependency Freshness."""
import json
import os
import sys
from unittest.mock import patch, MagicMock
# Import target module
sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util
spec = importlib.util.spec_from_file_location(
"dependency_freshness",
os.path.join(os.path.dirname(__file__) or ".", "dependency_freshness.py")
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
parse_requirements = mod.parse_requirements
get_major_version = mod.get_major_version
is_more_than_two_majors_behind = mod.is_more_than_two_majors_behind
analyze_dependencies = mod.analyze_dependencies
def test_parse_requirements_simple():
"""Parse a simple package line."""
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write("requests\n")
tmp = f.name
try:
pkgs = parse_requirements(tmp)
assert pkgs == ["requests"], f"got {pkgs}"
print("PASS: test_parse_requirements_simple")
finally:
os.unlink(tmp)
def test_parse_requirements_with_specifiers():
"""Parse lines with version specifiers."""
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write("pytest>=8,<9\n")
f.write("aiohttp>=3.8\n")
tmp = f.name
try:
pkgs = parse_requirements(tmp)
assert pkgs == ["pytest", "aiohttp"], f"got {pkgs}"
print("PASS: test_parse_requirements_with_specifiers")
finally:
os.unlink(tmp)
def test_parse_requirements_ignores_comments_and_blanks():
"""Comments and blank lines are skipped."""
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
f.write("# This is a comment\n")
f.write("\n")
f.write(" \n")
f.write("numpy\n")
f.write("# another comment\n")
tmp = f.name
try:
pkgs = parse_requirements(tmp)
assert pkgs == ["numpy"], f"got {pkgs}"
print("PASS: test_parse_requirements_ignores_comments_and_blanks")
finally:
os.unlink(tmp)
def test_get_major_version_normal():
"""Extract major version from typical semantic strings."""
assert get_major_version("1.2.3") == 1
assert get_major_version("3.4.5") == 3
assert get_major_version("0.11.0") == 0
print("PASS: test_get_major_version_normal")
def test_get_major_version_with_rc():
"""Prerelease versions still yield major number."""
assert get_major_version("2.0.0rc1") == 2
assert get_major_version("1.0.0a1") == 1
print("PASS: test_get_major_version_with_rc")
def test_is_more_than_two_majors_behind():
"""Difference >2 triggers True; <=2 triggers False."""
assert is_more_than_two_majors_behind("1.2.3", "4.0.0") is True
assert is_more_than_two_majors_behind("3.9.0", "4.0.0") is False
assert is_more_than_two_majors_behind("2.1.0", "5.2.0") is True
assert is_more_than_two_majors_behind("8.0.0", "9.0.0") is False
assert is_more_than_two_majors_behind("4.0.0", "4.0.0") is False
print("PASS: test_is_more_than_two_majors_behind")
def test_analyze_dependencies_very_outdated():
"""Flag packages more than 2 major versions behind."""
required = ["pkg_a", "pkg_b"]
installed = {"pkg_a": "1.0.0", "pkg_b": "3.5.2"}
outdated = {
"pkg_a": {"installed": "1.0.0", "latest": "4.0.0"},
"pkg_b": {"installed": "3.5.2", "latest": "4.0.0"},
}
very_out, missing, outdated_ok = analyze_dependencies(required, installed, outdated)
assert len(very_out) == 1 and very_out[0]["package"] == "pkg_a"
assert len(missing) == 0
assert len(outdated_ok) == 1 and outdated_ok[0]["package"] == "pkg_b"
print("PASS: test_analyze_dependencies_very_outdated")
def test_analyze_dependencies_missing():
"""Detect packages not installed at all."""
required = ["pkg_a", "pkg_missing"]
installed = {"pkg_a": "2.0.0"}
outdated = {"pkg_a": {"installed": "2.0.0", "latest": "3.0.0"}}
very_out, missing, outdated_ok = analyze_dependencies(required, installed, outdated)
assert "pkg_missing" in missing
assert len(very_out) == 0
assert len(outdated_ok) == 1
print("PASS: test_analyze_dependencies_missing")
def test_analyze_dependencies_up_to_date():
"""Packages up-to-date are not flagged."""
required = ["pkg_good"]
installed = {"pkg_good": "5.0.0"}
outdated = {}
very_out, missing, outdated_ok = analyze_dependencies(required, installed, outdated)
assert len(very_out) == 0
assert len(missing) == 0
assert len(outdated_ok) == 0
print("PASS: test_analyze_dependencies_up_to_date")
def test_generate_human_report_contains_very_outdated():
"""Human report includes very outdated packages."""
very_out = [
{"package": "oldpkg", "installed": "1.0", "latest": "4.0", "major_diff": 3}
]
missing = []
outdated_ok = []
report = mod.generate_human_report(very_out, missing, outdated_ok, "requirements.txt")
assert "oldpkg" in report
assert "Installed: 1.0" in report
assert "Latest: 4.0" in report
assert "Major diff: 3" in report
print("PASS: test_generate_human_report_contains_very_outdated")
def test_generate_json_report_structure():
"""JSON report contains required keys."""
very_out = [{"package": "oldpkg", "installed": "1.0", "latest": "4.0", "major_diff": 3}]
missing = ["missing_pkg"]
outdated_ok = []
report_json = mod.generate_json_report(very_out, missing, outdated_ok, "requirements.txt")
data = json.loads(report_json)
assert "summary" in data
assert data["summary"]["very_outdated_count"] == 1
assert data["summary"]["missing_count"] == 1
assert "very_outdated" in data
assert "missing" in data
print("PASS: test_generate_json_report_structure")
if __name__ == '__main__':
print("Running dependency_freshness test suite...")
test_parse_requirements_simple()
test_parse_requirements_with_specifiers()
test_parse_requirements_ignores_comments_and_blanks()
test_get_major_version_normal()
test_get_major_version_with_rc()
test_is_more_than_two_majors_behind()
test_analyze_dependencies_very_outdated()
test_analyze_dependencies_missing()
test_analyze_dependencies_up_to_date()
test_generate_human_report_contains_very_outdated()
test_generate_json_report_structure()
print("ALL TESTS PASSED.")

View File

@@ -1,170 +0,0 @@
#!/usr/bin/env python3
"""
Tests for PR Complexity Scorer — unit tests for the scoring logic.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pr_complexity_scorer import (
score_pr,
is_dependency_file,
is_test_file,
TIME_PER_POINT,
SMALL_FILES,
MEDIUM_FILES,
LARGE_FILES,
SMALL_LINES,
MEDIUM_LINES,
LARGE_LINES,
)
PASS = 0
FAIL = 0
def test(name):
def decorator(fn):
global PASS, FAIL
try:
fn()
PASS += 1
print(f" [PASS] {name}")
except AssertionError as e:
FAIL += 1
print(f" [FAIL] {name}: {e}")
except Exception as e:
FAIL += 1
print(f" [FAIL] {name}: Unexpected error: {e}")
return decorator
def assert_eq(a, b, msg=""):
if a != b:
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
def assert_true(v, msg=""):
if not v:
raise AssertionError(msg or "Expected True")
def assert_false(v, msg=""):
if v:
raise AssertionError(msg or "Expected False")
print("=== PR Complexity Scorer Tests ===\n")
print("-- File Classification --")
@test("dependency file detection — requirements.txt")
def _():
assert_true(is_dependency_file("requirements.txt"))
assert_true(is_dependency_file("src/requirements.txt"))
assert_false(is_dependency_file("requirements_test.txt"))
@test("dependency file detection — pyproject.toml")
def _():
assert_true(is_dependency_file("pyproject.toml"))
assert_false(is_dependency_file("myproject.py"))
@test("test file detection — pytest style")
def _():
assert_true(is_test_file("tests/test_api.py"))
assert_true(is_test_file("test_module.py"))
assert_true(is_test_file("src/module_test.py"))
@test("test file detection — other frameworks")
def _():
assert_true(is_test_file("spec/feature_spec.rb"))
assert_true(is_test_file("__tests__/component.test.js"))
assert_false(is_test_file("testfixtures/helper.py"))
print("\n-- Scoring Logic --")
@test("small PR gets low score (1-3)")
def _():
score, minutes, _ = score_pr(
files_changed=3,
additions=50,
deletions=10,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
assert_true(minutes < 20)
@test("medium PR gets medium score (4-6)")
def _():
score, minutes, _ = score_pr(
files_changed=15,
additions=400,
deletions=100,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
assert_true(20 <= minutes <= 45)
@test("large PR gets high score (7-9)")
def _():
score, minutes, _ = score_pr(
files_changed=60,
additions=3000,
deletions=1500,
has_dependency_changes=True,
test_coverage_delta=None
)
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
assert_true(minutes >= 45)
@test("dependency changes boost score")
def _():
base_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=False, test_coverage_delta=None
)
dep_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=True, test_coverage_delta=None
)
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
@test("adding tests lowers complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
better_score, _, _ = score_pr(
files_changed=8, additions=180, deletions=20,
has_dependency_changes=False, test_coverage_delta=3
)
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
@test("removing tests increases complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
worse_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=-2
)
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
@test("score bounded 1-10")
def _():
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
score, _, _ = score_pr(files, adds, dels, False, None)
assert_true(1 <= score <= 10, f"Score {score} out of range")
@test("estimated minutes exist for all scores")
def _():
for s in range(1, 11):
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)

View File

@@ -1,174 +0,0 @@
#!/usr/bin/env python3
"""
Smoke tests for knowledge_to_training_pairs.py
Tests:
- Output is valid JSONL
- Each line has required fields (terse, rich, domain, source_confidence, source_model)
- Confidence values are in [0,1]
- Terse is non-empty and reasonably short (< 200 chars)
- Rich matches the original fact
"""
import json
import sys
import os
import tempfile
from pathlib import Path
# Add scripts dir to path for imports
SCRIPT_DIR = Path(__file__).parent.parent / "scripts"
sys.path.insert(0, str(SCRIPT_DIR))
from knowledge_to_training_pairs import (
fact_to_terse,
filter_entries,
entry_to_pair,
parse_date,
)
def test_fact_to_terse_pitfall():
fact = "deploy-crons.py leaves jobs in mixed model format"
category = "pitfall"
domain = "hermes-agent"
terse = fact_to_terse(fact, category, domain)
assert terse.startswith("How do I")
assert "?" in terse
assert len(terse) < 150
print("PASS: test_fact_to_terse_pitfall")
def test_fact_to_terse_fact():
fact = "Python is a high-level programming language"
terse = fact_to_terse(fact, "fact", "global")
assert terse.startswith("What should I know about")
assert "?" in terse
print("PASS: test_fact_to_terse_fact")
def test_fact_to_terse_pattern():
fact = "Use sparse checkout for large repos"
terse = fact_to_terse(fact, "pattern", "devops")
assert "recommended way" in terse or "best way" in terse
print("PASS: test_fact_to_terse_pattern")
def test_entry_to_pair_structure():
entry = {
"id": "test:001",
"fact": "Test fact text.",
"category": "fact",
"domain": "test-domain",
"confidence": 0.85,
"model": "test-model",
}
pair = entry_to_pair(entry)
assert pair is not None
assert "terse" in pair
assert "rich" in pair
assert "domain" in pair
assert "source_confidence" in pair
assert "source_model" in pair
assert pair["rich"] == "Test fact text."
assert pair["domain"] == "test-domain"
assert 0.0 <= pair["source_confidence"] <= 1.0
print("PASS: test_entry_to_pair_structure")
def test_filter_by_confidence():
entries = [
{"fact": "A", "confidence": 0.9},
{"fact": "B", "confidence": 0.4},
{"fact": "C", "confidence": 0.6},
]
filtered = filter_entries(entries, min_confidence=0.5)
assert len(filtered) == 2
assert all(e["confidence"] >= 0.5 for e in filtered)
print("PASS: test_filter_by_confidence")
def test_filter_by_model():
entries = [
{"fact": "A", "model": "claude-sonnet"},
{"fact": "B", "model": "gpt-4"},
{"fact": "C", "model": "unknown"},
]
filtered = filter_entries(entries, model_filter=["claude-sonnet", "gpt-4"])
assert len(filtered) == 2
assert all(e["model"] in ("claude-sonnet", "gpt-4") for e in filtered)
print("PASS: test_filter_by_model")
def test_filter_by_date():
entries = [
{"fact": "A", "last_confirmed": "2026-04-10"},
{"fact": "B", "last_confirmed": "2026-03-01"},
{"fact": "C", "first_seen": "2026-04-15"},
]
after_dt = parse_date("2026-04-01")
filtered = filter_entries(entries, after=after_dt)
assert len(filtered) == 2
print("PASS: test_filter_by_date")
def test_end_to_end_jsonl_output():
"""Integration test: run the script and verify JSONL validity."""
import subprocess
repo_dir = SCRIPT_DIR.parent
result = subprocess.run(
["python3", "scripts/knowledge_to_training_pairs.py", "--dry-run"],
capture_output=True, text=True, cwd=repo_dir
)
assert result.returncode == 0
stderr = result.stderr.strip()
# The stats JSON object is at the top of stderr. Find its bounds via brace matching.
start = stderr.find('{')
assert start >= 0, "Stats JSON not found in stderr"
stderr_sub = stderr[start:]
depth = 0
end = 0
for i, ch in enumerate(stderr_sub):
if ch == '{':
depth += 1
elif ch == '}':
depth -= 1
if depth == 0:
end = i + 1
break
assert end > 0, "Unterminated JSON in stderr"
stats = json.loads(stderr_sub[:end])
assert stats["input_entries"] > 0
assert stats["pairs_generated"] > 0
print("PASS: test_end_to_end_jsonl_output")
def test_terse_length_constraint():
"""Terse should be reasonably short for training."""
# Sample facts from actual knowledge
test_facts = [
("deploy-crons.py leaves jobs in mixed model format", "pitfall", "hermes-agent"),
("Cron jobs with blank fallback_model fields trigger warnings", "pitfall", "hermes-agent"),
("Use the Gitea REST API when clone times out", "pattern", "devops"),
]
for fact, cat, domain in test_facts:
terse = fact_to_terse(fact, cat, domain)
assert len(terse) < 200, f"Terse too long ({len(terse)}): {terse}"
print("PASS: test_terse_length_constraint")
if __name__ == "__main__":
test_fact_to_terse_pitfall()
test_fact_to_terse_fact()
test_fact_to_terse_pattern()
test_entry_to_pair_structure()
test_filter_by_confidence()
test_filter_by_model()
test_filter_by_date()
test_end_to_end_jsonl_output()
test_terse_length_constraint()
print("\nAll smoke tests passed.")