Compare commits: step35/103 ... step35/140
2 commits: c75bd5094f, 4b5a675355
knowledge/global/citations.yaml (new file, 16 lines)
@@ -0,0 +1,16 @@
# Key Papers to Track
# Configuration for citation_tracker.py
# Each paper needs a Semantic Scholar ID (s2_id) and title

papers:
  - s2_id: "CorpusId:215715652"
    title: "Attention Is All You Need"
    notes: "Foundational transformer paper by Vaswani et al. (2017)"

  - s2_id: "CorpusId:643390714"
    title: "Language Models are Few-Shot Learners"
    notes: "GPT-3 paper by Brown et al. (2020)"

  - s2_id: "arXiv:2303.18247"
    title: "Sovereign Intelligence: Local-First AI Agents"
    notes: "Timmy architecture paper (placeholder - update when published)"
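
For reference, a minimal sketch of how this config is consumed (it mirrors load_key_papers in scripts/citation_tracker.py below; PyYAML is assumed to be installed, since yaml is not in the standard library):

    import yaml  # PyYAML, third-party

    with open("knowledge/global/citations.yaml") as f:
        cfg = yaml.safe_load(f)

    # Each entry requires s2_id; title and notes are optional.
    for entry in cfg.get("papers", []):
        print(entry["s2_id"], entry.get("title", "Unknown"))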

scripts/citation_tracker.py (new executable file, 235 lines)
@@ -0,0 +1,235 @@
#!/usr/bin/env python3
"""
Citation Tracker — Monitor citations of key papers.
Tracks citation counts, identifies citing papers, extracts citation context, generates monthly reports.

Issue: #140 (7.8)
Categories: fact, pattern
"""

import argparse
import json
import sys
import urllib.request
import urllib.error
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Any, Optional

SCRIPT_DIR = Path(__file__).parent.absolute()
KNOWLEDGE_DIR = SCRIPT_DIR.parent / "knowledge"
METRICS_DIR = SCRIPT_DIR.parent / "metrics"
INDEX_PATH = KNOWLEDGE_DIR / "index.json"

# Semantic Scholar API (free, no key required for basic lookups)
S2_API_BASE = "https://api.semanticscholar.org/graph/v1"

def fetch_paper(s2_id: str) -> Optional[Dict]:
    """Fetch paper metadata from Semantic Scholar."""
    url = f"{S2_API_BASE}/paper/{s2_id}?fields=title,year,citationCount,externalIds,publicationVenue,publicationTypes"
    try:
        with urllib.request.urlopen(url, timeout=10) as resp:
            return json.loads(resp.read())
    except (urllib.error.HTTPError, urllib.error.URLError) as e:
        print(f"Warning: Failed to fetch {s2_id}: {e}", file=sys.stderr)
        return None

def fetch_citations(s2_id: str, limit: int = 50) -> List[Dict]:
    """Fetch recent citing papers from Semantic Scholar."""
    url = f"{S2_API_BASE}/paper/{s2_id}/citations?fields=title,year,authors,publicationVenue,publicationTypes&limit={limit}"
    try:
        with urllib.request.urlopen(url, timeout=15) as resp:
            data = json.loads(resp.read())
            return [c["citingPaper"] for c in data.get("data", [])]
    except (urllib.error.HTTPError, urllib.error.URLError) as e:
        print(f"Warning: Failed to fetch citations for {s2_id}: {e}", file=sys.stderr)
        return []

def load_key_papers() -> List[Dict]:
    """Load key papers list from citations.yaml."""
    config_path = KNOWLEDGE_DIR / "global" / "citations.yaml"
    if not config_path.exists():
        print(f"Error: {config_path} not found. Create it with key papers list.", file=sys.stderr)
        sys.exit(1)

    import yaml
    with open(config_path) as f:
        data = yaml.safe_load(f)

    papers = []
    for entry in data.get("papers", []):
        papers.append({
            "id": entry["s2_id"],
            "title": entry.get("title", "Unknown"),
            "notes": entry.get("notes", "")
        })
    return papers

def load_index() -> Dict:
    """Load or initialize knowledge index."""
    if INDEX_PATH.exists():
        with open(INDEX_PATH) as f:
            return json.load(f)
    return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}

def save_index(index: Dict) -> None:
    """Save knowledge index."""
    KNOWLEDGE_DIR.mkdir(parents=True, exist_ok=True)
    with open(INDEX_PATH, "w") as f:
        json.dump(index, f, indent=2)

def add_citation_fact(index: Dict, fact: str, repo: str, confidence: float,
                      tags: List[str], source_count: int = 1) -> None:
    """Add a new citation fact to the index."""
    # Determine next sequence number for citation facts in this domain
    domain = "global"
    category = "fact"
    prefix = f"{domain}:{category}:"
    seq_nums = []
    for f in index["facts"]:
        if f["id"].startswith(prefix):
            try:
                seq = int(f["id"].split(":")[-1])
                seq_nums.append(seq)
            except ValueError:
                continue
    next_seq = max(seq_nums, default=0) + 1
    new_id = f"{domain}:{category}:{next_seq:03d}"

    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    fact_entry = {
        "id": new_id,
        "fact": fact,
        "category": category,
        "domain": domain,
        "confidence": confidence,
        "tags": tags,
        "source_count": source_count,
        "first_seen": today,
        "last_confirmed": today
    }
    index["facts"].append(fact_entry)
    index["total_facts"] = len(index["facts"])
    index["last_updated"] = datetime.now(timezone.utc).isoformat()

def update_citation_data() -> None:
    """Update citation counts and facts for all key papers."""
    papers = load_key_papers()
    index = load_index()
    updated = 0

    for paper in papers:
        s2_id = paper["id"]
        title = paper["title"]

        # Fetch current paper data
        data = fetch_paper(s2_id)
        if not data:
            continue

        citation_count = data.get("citationCount", 0)
        external_ids = data.get("externalIds", {})
        arxiv_id = external_ids.get("ArXiv") if external_ids else None

        # Add citation count fact (high confidence - directly from API)
        count_fact = f"Paper '{title}' (S2:{s2_id}) has {citation_count} citations as of {datetime.now(timezone.utc).strftime('%Y-%m-%d')}"
        if arxiv_id:
            count_fact += f" [arXiv:{arxiv_id}]"

        add_citation_fact(
            index=index,
            fact=count_fact,
            repo="compounding-intelligence",
            confidence=0.95,
            tags=["citation", "tracking", "paper", s2_id],
            source_count=1
        )
        updated += 1

        # Fetch recent citations (context extraction - limited batch)
        citations = fetch_citations(s2_id, limit=20)
        for citation in citations:
            citing_title = citation.get("title", "Unknown")
            citing_year = citation.get("year", "Unknown year")
            authors = citation.get("authors", [])
            author_names = [a.get("name", "") for a in authors[:3]]
            if len(authors) > 3:
                author_names.append("et al.")

            cite_fact = f"Paper '{citing_title}' ({', '.join(author_names)}, {citing_year}) cites '{title}'"
            add_citation_fact(
                index=index,
                fact=cite_fact,
                repo="compounding-intelligence",
                confidence=0.8,
                tags=["citation", "citing-paper", s2_id],
                source_count=1
            )

        print(f"Updated: {title} — {citation_count} citations, {len(citations)} recent")

    save_index(index)
    print(f"\nUpdated {updated} papers. Total facts in index: {index['total_facts']}")

def generate_monthly_report(month: Optional[str] = None) -> str:
    """Generate a monthly citation report."""
    target_month = month or datetime.now(timezone.utc).strftime("%Y-%m")
    year, mon = map(int, target_month.split("-"))

    index = load_index()
    monthly_facts = []

    for fact in index["facts"]:
        last_confirmed = fact.get("last_confirmed", "")
        if last_confirmed.startswith(f"{year}-{mon:02d}"):
            monthly_facts.append(fact)

    # Build report
    lines = []
    lines.append(f"# Citation Tracker Monthly Report — {target_month}")
    lines.append("")
    lines.append(f"Generated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}")
    lines.append(f"Total citation facts this month: {len(monthly_facts)}")
    lines.append("")

    # Group by full fact text (each distinct fact gets its own section)
    from collections import defaultdict
    by_paper = defaultdict(list)
    for fact in monthly_facts:
        text = fact["fact"]
        by_paper[text].append(fact)

    for paper_title, facts in by_paper.items():
        lines.append(f"## {paper_title}")
        for f in facts:
            lines.append(f"- {f['fact']} (confidence: {f['confidence']})")
        lines.append("")

    report = "\n".join(lines)

    # Save report
    METRICS_DIR.mkdir(parents=True, exist_ok=True)
    report_path = METRICS_DIR / f"citation_report_{target_month}.md"
    with open(report_path, "w") as f:
        f.write(report)

    print(f"Monthly report saved to: {report_path}")
    return report

def main() -> None:
    parser = argparse.ArgumentParser(description="Citation Tracker — Monitor key paper citations")
    parser.add_argument("--update", action="store_true", help="Fetch latest citation data")
    parser.add_argument("--report", action="store_true", help="Generate monthly report")
    parser.add_argument("--month", type=str, help="Month for report (YYYY-MM), defaults to current")
    args = parser.parse_args()

    if args.update:
        update_citation_data()
    elif args.report:
        generate_monthly_report(args.month)
    else:
        parser.print_help()

if __name__ == "__main__":
    main()
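
As a quick illustration of the fact-ID scheme implemented by add_citation_fact above, here is a self-contained sketch of the sequencing logic; the index contents are hypothetical:

    # Hypothetical index with two existing facts in the global:fact: namespace.
    index = {"facts": [
        {"id": "global:fact:001"},
        {"id": "global:fact:002"},
        {"id": "global:pattern:001"},  # different category, ignored by the prefix filter
    ]}

    prefix = "global:fact:"
    seq_nums = [int(f["id"].split(":")[-1])
                for f in index["facts"] if f["id"].startswith(prefix)]
    next_seq = max(seq_nums, default=0) + 1
    print(f"{prefix}{next_seq:03d}")  # -> global:fact:003

In practice the script is driven from the command line: --update fetches counts and citing papers, and --report (optionally with --month YYYY-MM) writes the report under metrics/.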

scripts/pr_complexity_scorer.py (new file, 351 lines)
@@ -0,0 +1,351 @@
#!/usr/bin/env python3
"""
PR Complexity Scorer - Estimate review effort for PRs.
"""

import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error

GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"

DEPENDENCY_FILES = {
    "requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
    "Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
    "go.mod", "Cargo.toml", "pom.xml", "build.gradle"
}

TEST_PATTERNS = [
    r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
    r"spec/.*\.rb$", r".*_spec\.rb$",
    r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
]

WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20

SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50

SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000

TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}

@dataclass
class PRComplexity:
    pr_number: int
    title: str
    files_changed: int
    additions: int
    deletions: int
    has_dependency_changes: bool
    test_coverage_delta: Optional[int]
    score: int
    estimated_minutes: int
    reasons: List[str]

    def to_dict(self) -> dict:
        return asdict(self)

class GiteaClient:
    def __init__(self, token: str):
        self.token = token
        self.base_url = GITEA_BASE.rstrip("/")

    def _request(self, path: str, params: Optional[Dict] = None) -> Any:
        url = f"{self.base_url}{path}"
        if params:
            qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
            url += f"?{qs}"

        req = urllib.request.Request(url)
        req.add_header("Authorization", f"token {self.token}")
        req.add_header("Content-Type", "application/json")

        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return json.loads(resp.read().decode())
        except urllib.error.HTTPError as e:
            print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
            return None
        except urllib.error.URLError as e:
            print(f"Network error: {e}", file=sys.stderr)
            return None

    def get_open_prs(self, org: str, repo: str) -> List[Dict]:
        prs = []
        page = 1
        while True:
            batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
            if not batch:
                break
            prs.extend(batch)
            if len(batch) < 50:
                break
            page += 1
        return prs

    def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
        files = []
        page = 1
        while True:
            batch = self._request(
                f"/repos/{org}/{repo}/pulls/{pr_number}/files",
                {"limit": 100, "page": page}
            )
            if not batch:
                break
            files.extend(batch)
            if len(batch) < 100:
                break
            page += 1
        return files

    def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
        data = json.dumps({"body": body}).encode("utf-8")
        req = urllib.request.Request(
            f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
            data=data,
            method="POST",
            headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
        )
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                return resp.status in (200, 201)
        except urllib.error.HTTPError:
            return False

def is_dependency_file(filename: str) -> bool:
    return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)


def is_test_file(filename: str) -> bool:
    return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)

def score_pr(
    files_changed: int,
    additions: int,
    deletions: int,
    has_dependency_changes: bool,
    test_coverage_delta: Optional[int] = None
) -> tuple[int, int, List[str]]:
    score = 1.0
    reasons = []

    # Files changed
    if files_changed <= SMALL_FILES:
        fscore = 1.0
        reasons.append("small number of files changed")
    elif files_changed <= MEDIUM_FILES:
        fscore = 2.0
        reasons.append("moderate number of files changed")
    elif files_changed <= LARGE_FILES:
        fscore = 2.5
        reasons.append("large number of files changed")
    else:
        fscore = 3.0
        reasons.append("very large PR spanning many files")

    # Lines changed
    total_lines = additions + deletions
    if total_lines <= SMALL_LINES:
        lscore = 1.0
        reasons.append("small change size")
    elif total_lines <= MEDIUM_LINES:
        lscore = 2.0
        reasons.append("moderate change size")
    elif total_lines <= LARGE_LINES:
        lscore = 3.0
        reasons.append("large change size")
    else:
        lscore = 4.0
        reasons.append("very large change")

    # Dependency changes
    if has_dependency_changes:
        dscore = 2.5
        reasons.append("dependency changes (architectural impact)")
    else:
        dscore = 0.0

    # Test coverage delta
    tscore = 0.0
    if test_coverage_delta is not None:
        if test_coverage_delta > 0:
            reasons.append(f"test additions (+{test_coverage_delta} test files)")
            tscore = -min(2.0, test_coverage_delta / 2.0)
        elif test_coverage_delta < 0:
            reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
            tscore = min(2.0, abs(test_coverage_delta) * 0.5)
    else:
        reasons.append("test coverage change not assessed")

    # Weighted sum, scaled by 3 to use full 1-10 range
    bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
    scaled_bonus = bonus * 3.0
    score = 1.0 + scaled_bonus

    final_score = max(1, min(10, int(round(score))))
    est_minutes = TIME_PER_POINT.get(final_score, 30)

    return final_score, est_minutes, reasons

def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
    pr_num = pr_data["number"]
    title = pr_data.get("title", "")
    files = client.get_pr_files(org, repo, pr_num)

    additions = sum(f.get("additions", 0) for f in files)
    deletions = sum(f.get("deletions", 0) for f in files)
    filenames = [f.get("filename", "") for f in files]

    has_deps = any(is_dependency_file(f) for f in filenames)

    test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
    test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
    test_delta = test_added - test_removed if (test_added or test_removed) else None

    score, est_min, reasons = score_pr(
        files_changed=len(files),
        additions=additions,
        deletions=deletions,
        has_dependency_changes=has_deps,
        test_coverage_delta=test_delta
    )

    return PRComplexity(
        pr_number=pr_num,
        title=title,
        files_changed=len(files),
        additions=additions,
        deletions=deletions,
        has_dependency_changes=has_deps,
        test_coverage_delta=test_delta,
        score=score,
        estimated_minutes=est_min,
        reasons=reasons
    )

def build_comment(complexity: PRComplexity) -> str:
    change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
    deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
    test_note = ""
    if complexity.test_coverage_delta is not None:
        if complexity.test_coverage_delta > 0:
            test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
        elif complexity.test_coverage_delta < 0:
            test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"

    comment = "## 📊 PR Complexity Analysis\n\n"
    comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
    comment += "| Metric | Value |\n|--------|-------|\n"
    comment += f"| Changes | {change_desc} |\n"
    comment += f"| Complexity Score | **{complexity.score}/10** |\n"
    comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
    comment += "### Scoring rationale:"
    for r in complexity.reasons:
        comment += f"\n- {r}"
    if deps_note:
        comment += deps_note
    if test_note:
        comment += test_note
    comment += "\n\n---\n"
    comment += "*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
    return comment

def main():
    parser = argparse.ArgumentParser(description="PR Complexity Scorer")
    parser.add_argument("--org", default="Timmy_Foundation")
    parser.add_argument("--repo", default="compounding-intelligence")
    parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--apply", action="store_true")
    parser.add_argument("--output", default="metrics/pr_complexity.json")
    args = parser.parse_args()

    token_path = args.token
    if os.path.exists(token_path):
        with open(token_path) as f:
            token = f.read().strip()
    else:
        token = args.token

    if not token:
        print("ERROR: No Gitea token provided", file=sys.stderr)
        sys.exit(1)

    client = GiteaClient(token)

    print(f"Fetching open PRs for {args.org}/{args.repo}...")
    prs = client.get_open_prs(args.org, args.repo)
    if not prs:
        print("No open PRs found.")
        sys.exit(0)

    print(f"Found {len(prs)} open PR(s). Analyzing...")

    results = []
    Path(args.output).parent.mkdir(parents=True, exist_ok=True)

    for pr in prs:
        pr_num = pr["number"]
        title = pr.get("title", "")
        print(f"  Analyzing PR #{pr_num}: {title[:60]}")

        try:
            complexity = analyze_pr(client, args.org, args.repo, pr)
            results.append(complexity.to_dict())

            comment = build_comment(complexity)

            if args.dry_run:
                print(f"    → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
            elif args.apply:
                success = client.post_comment(args.org, args.repo, pr_num, comment)
                status = "[commented]" if success else "[FAILED]"
                print(f"    → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
            else:
                print(f"    → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")

        except Exception as e:
            print(f"  ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)

    with open(args.output, "w") as f:
        json.dump({
            "org": args.org,
            "repo": args.repo,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "pr_count": len(results),
            "results": results
        }, f, indent=2)

    if results:
        scores = [r["score"] for r in results]
        print(f"\nResults saved to {args.output}")
        print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
    else:
        print("\nNo results to save.")


if __name__ == "__main__":
    main()
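
To make the weighting concrete, here is the arithmetic for a mid-sized PR; the inputs are illustrative and match the "medium PR" unit test below (the import assumes scripts/ is on sys.path, as the test file arranges):

    from pr_complexity_scorer import score_pr

    # 15 files (<= MEDIUM_FILES=20) -> fscore 2.0; 400+100=500 lines (<= MEDIUM_LINES=500) -> lscore 2.0.
    # bonus = 2.0*0.25 + 2.0*0.25 + 0.0*0.30 + 0.0*0.20 = 1.0
    # score = 1.0 + 3.0*bonus = 4.0 -> rounded and clamped to 4; TIME_PER_POINT[4] = 20 minutes.
    score, minutes, reasons = score_pr(
        files_changed=15, additions=400, deletions=100,
        has_dependency_changes=False, test_coverage_delta=None,
    )
    assert (score, minutes) == (4, 20)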

scripts/test_citation_tracker.py (new executable file, 31 lines)
@@ -0,0 +1,31 @@
#!/usr/bin/env python3
import sys
sys.path.insert(0, "/Users/apayne/burn-clone/STEP35-compounding-intelligence-140/scripts")
import yaml
from pathlib import Path

KNOWLEDGE_DIR = Path("/Users/apayne/burn-clone/STEP35-compounding-intelligence-140/knowledge")
config_path = KNOWLEDGE_DIR / "global" / "citations.yaml"

with open(config_path) as f:
    data = yaml.safe_load(f)

papers = data.get("papers", [])
print(f"Loaded {len(papers)} key papers:")
for p in papers:
    print(f"  - {p['s2_id']}: {p['title']}")

# Test that citation_tracker module loads
import importlib.util
spec = importlib.util.spec_from_file_location("citation_tracker",
    "/Users/apayne/burn-clone/STEP35-compounding-intelligence-140/scripts/citation_tracker.py")
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
print("Module loaded successfully")

# Test fetch functions (with mock/real API)
result = mod.fetch_paper("CorpusId:215715652")  # Attention Is All You Need
if result:
    print(f"Paper fetched: {result.get('title')} — {result.get('citationCount')} citations")
else:
    print("Paper fetch failed (may be network issue)")
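
The absolute /Users/apayne/... paths above pin this smoke test to one local checkout. A portable variant could resolve everything relative to the script itself; the following is a sketch of the same checks, not part of the committed file:

    import sys
    from pathlib import Path

    import yaml  # PyYAML, third-party

    SCRIPTS_DIR = Path(__file__).resolve().parent       # .../scripts
    KNOWLEDGE_DIR = SCRIPTS_DIR.parent / "knowledge"    # sibling knowledge/ directory

    with open(KNOWLEDGE_DIR / "global" / "citations.yaml") as f:
        papers = yaml.safe_load(f).get("papers", [])
    print(f"Loaded {len(papers)} key papers")

    sys.path.insert(0, str(SCRIPTS_DIR))
    import citation_tracker  # import after path setup
    print("Module loaded successfully")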

scripts/test_pr_complexity_scorer.py (new file, 170 lines)
@@ -0,0 +1,170 @@
#!/usr/bin/env python3
"""
Tests for PR Complexity Scorer — unit tests for the scoring logic.
"""

import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))

from pr_complexity_scorer import (
    score_pr,
    is_dependency_file,
    is_test_file,
    TIME_PER_POINT,
    SMALL_FILES,
    MEDIUM_FILES,
    LARGE_FILES,
    SMALL_LINES,
    MEDIUM_LINES,
    LARGE_LINES,
)

PASS = 0
FAIL = 0

def test(name):
    def decorator(fn):
        global PASS, FAIL
        try:
            fn()
            PASS += 1
            print(f"  [PASS] {name}")
        except AssertionError as e:
            FAIL += 1
            print(f"  [FAIL] {name}: {e}")
        except Exception as e:
            FAIL += 1
            print(f"  [FAIL] {name}: Unexpected error: {e}")
    return decorator

def assert_eq(a, b, msg=""):
    if a != b:
        raise AssertionError(f"{msg} expected {b!r}, got {a!r}")

def assert_true(v, msg=""):
    if not v:
        raise AssertionError(msg or "Expected True")

def assert_false(v, msg=""):
    if v:
        raise AssertionError(msg or "Expected False")


print("=== PR Complexity Scorer Tests ===\n")

print("-- File Classification --")

@test("dependency file detection — requirements.txt")
def _():
    assert_true(is_dependency_file("requirements.txt"))
    assert_true(is_dependency_file("src/requirements.txt"))
    assert_false(is_dependency_file("requirements_test.txt"))

@test("dependency file detection — pyproject.toml")
def _():
    assert_true(is_dependency_file("pyproject.toml"))
    assert_false(is_dependency_file("myproject.py"))

@test("test file detection — pytest style")
def _():
    assert_true(is_test_file("tests/test_api.py"))
    assert_true(is_test_file("test_module.py"))
    assert_true(is_test_file("src/module_test.py"))

@test("test file detection — other frameworks")
def _():
    assert_true(is_test_file("spec/feature_spec.rb"))
    assert_true(is_test_file("__tests__/component.test.js"))
    assert_false(is_test_file("testfixtures/helper.py"))


print("\n-- Scoring Logic --")

@test("small PR gets low score (1-3)")
def _():
    score, minutes, _ = score_pr(
        files_changed=3,
        additions=50,
        deletions=10,
        has_dependency_changes=False,
        test_coverage_delta=None
    )
    assert_true(1 <= score <= 3, f"Score should be low, got {score}")
    assert_true(minutes < 20)

@test("medium PR gets medium score (4-6)")
def _():
    score, minutes, _ = score_pr(
        files_changed=15,
        additions=400,
        deletions=100,
        has_dependency_changes=False,
        test_coverage_delta=None
    )
    assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
    assert_true(20 <= minutes <= 45)

@test("large PR gets high score (7-9)")
def _():
    score, minutes, _ = score_pr(
        files_changed=60,
        additions=3000,
        deletions=1500,
        has_dependency_changes=True,
        test_coverage_delta=None
    )
    assert_true(7 <= score <= 9, f"Score should be high, got {score}")
    assert_true(minutes >= 45)

@test("dependency changes boost score")
def _():
    base_score, _, _ = score_pr(
        files_changed=10, additions=200, deletions=50,
        has_dependency_changes=False, test_coverage_delta=None
    )
    dep_score, _, _ = score_pr(
        files_changed=10, additions=200, deletions=50,
        has_dependency_changes=True, test_coverage_delta=None
    )
    assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")

@test("adding tests lowers complexity")
def _():
    base_score, _, _ = score_pr(
        files_changed=8, additions=150, deletions=20,
        has_dependency_changes=False, test_coverage_delta=None
    )
    better_score, _, _ = score_pr(
        files_changed=8, additions=180, deletions=20,
        has_dependency_changes=False, test_coverage_delta=3
    )
    assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")

@test("removing tests increases complexity")
def _():
    base_score, _, _ = score_pr(
        files_changed=8, additions=150, deletions=20,
        has_dependency_changes=False, test_coverage_delta=None
    )
    worse_score, _, _ = score_pr(
        files_changed=8, additions=150, deletions=20,
        has_dependency_changes=False, test_coverage_delta=-2
    )
    assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")

@test("score bounded 1-10")
def _():
    for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
        score, _, _ = score_pr(files, adds, dels, False, None)
        assert_true(1 <= score <= 10, f"Score {score} out of range")

@test("estimated minutes exist for all scores")
def _():
    for s in range(1, 11):
        assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")


print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)
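
Because the suite has no pytest dependency and exits nonzero when any test fails, it can gate automation directly; a minimal sketch of wiring it into a Python-driven check (the wrapper below is illustrative, not part of the repo):

    import subprocess
    import sys

    # The test script exits 0 iff FAIL == 0, so its return code is the verdict.
    result = subprocess.run([sys.executable, "scripts/test_pr_complexity_scorer.py"])
    sys.exit(result.returncode)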