Compare commits

..

2 Commits

Author SHA1 Message Date
Rockachopa
ec76e9fec3 test(scanner): unit tests for github_trending_scanner
Some checks failed
Test / pytest (pull_request) Failing after 9s
2026-04-26 11:21:02 +00:00
38c5862737 feat(scanner): add GitHub Trending Scanner CLI for AI/ML repos 2026-04-26 11:20:51 +00:00
7 changed files with 383 additions and 21161 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,258 @@
#!/usr/bin/env python3
"""GitHub Trending Scanner — Scan trending repos in AI/ML.
Extracts: repo description, stars, key features (topics, inferred highlights).
Filters by language and/or topic. Outputs dated JSON for daily scan pipeline.
Usage:
python3 github_trending_scanner.py --language python --topic ai --output metrics/trending
python3 github_trending_scanner.py --topic machine-learning --limit 50
python3 github_trending_scanner.py --language rust --topic artificial-intelligence
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict
import urllib.request
import urllib.parse
import urllib.error
GITHUB_API_BASE = os.environ.get("GITHUB_API_BASE", "https://api.github.com")
DEFAULT_OUTPUT_DIR = os.environ.get("TRENDING_OUTPUT_DIR", "metrics/trending")
DEFAULT_LIMIT = int(os.environ.get("TRENDING_LIMIT", "30"))
DEFAULT_MIN_STARS = int(os.environ.get("TRENDING_MIN_STARS", "1000"))
def fetch_trending_repos(
language: Optional[str] = None,
topic: Optional[str] = None,
min_stars: int = DEFAULT_MIN_STARS,
limit: int = DEFAULT_LIMIT,
) -> List[Dict]:
"""Fetch trending-like repositories from GitHub using the search API.
GitHub's public search API is unauthenticated-rate-limited (60 req/hr).
This function retries on rate-limit backoff and falls back gracefully.
"""
# Build search query: stars threshold + optional language/topic filters
query = f"stars:>{min_stars}"
if language:
query += f" language:{language}"
if topic:
query += f" topic:{topic}"
# Sort by stars descending as a proxy for trending/popular
params = {
"q": query,
"sort": "stars",
"order": "desc",
"per_page": min(limit, 100), # GitHub max per_page is 100
}
url = f"{GITHUB_API_BASE}/search/repositories?{urllib.parse.urlencode(params)}"
headers = {
"Accept": "application/vnd.github.v3+json",
"User-Agent": "Sovereign-Trending-Scanner/1.0",
}
for attempt in range(3):
try:
req = urllib.request.Request(url, headers=headers)
with urllib.request.urlopen(req, timeout=30) as resp:
if resp.status != 200:
raise RuntimeError(f"GitHub API returned {resp.status}")
data = json.loads(resp.read().decode("utf-8"))
return data.get("items", [])[:limit]
except urllib.error.HTTPError as e:
if e.code == 403:
# Check for rate limit message
body = e.read().decode("utf-8", errors="replace").lower()
if "rate limit" in body or "api rate limit exceeded" in body:
reset_ts = int(e.headers.get("X-RateLimit-Reset", 0))
wait_seconds = max(5, reset_ts - int(time.time()) + 5)
print(f"Rate limit exceeded — waiting {wait_seconds}s (attempt {attempt+1}/3)...", file=sys.stderr)
time.sleep(wait_seconds)
continue
print(f"ERROR: GitHub API request failed: {e}{e.read().decode('utf-8', errors='replace')[:200]}", file=sys.stderr)
return []
except Exception as e:
if attempt < 2:
backoff = 2 ** attempt
print(f"WARNING: Fetch attempt {attempt+1} failed: {e} — retrying in {backoff}s", file=sys.stderr)
time.sleep(backoff)
continue
print(f"ERROR: All fetch attempts failed: {e}", file=sys.stderr)
return []
return []
def extract_repo_features(repo_data: Dict) -> Dict:
"""Extract structured fields for a trending repo."""
description = (repo_data.get("description") or "").strip()
topics = repo_data.get("topics", [])
# Infer key features from description and topics
features = infer_features(description, topics)
return {
"name": repo_data.get("full_name", ""),
"description": description,
"stars": repo_data.get("stargazers_count", 0),
"forks": repo_data.get("forks_count", 0),
"open_issues": repo_data.get("open_issues_count", 0),
"language": repo_data.get("language", ""),
"topics": topics,
"url": repo_data.get("html_url", ""),
"created_at": repo_data.get("created_at", ""),
"updated_at": repo_data.get("updated_at", ""),
"key_features": features,
"scanned_at": datetime.now(timezone.utc).isoformat(),
}
def infer_features(description: str, topics: List[str]) -> List[str]:
"""Infer notable capabilities/features from repo metadata.
Looks for AI/ML-relevant capabilities in topics and description.
"""
features = []
text = (description + " " + " ".join(topics)).lower()
# Domain capabilities (keys normalized to lowercase for consistency)
capability_keywords = {
"fine-tuning": ["fine-tun", "finetun"],
"agent framework": ["agent"],
"local/offline": ["local", "on-device", "offline"],
"quantized models": ["quantized", "quantization", "gguf", "gptq"],
"vision": ["vision", "multimodal", "image", "visual"],
"speech/audio": ["speech", "audio", "whisper", "tts"],
"retrieval/rag": ["rag", "retrieval", "embedding", "vector"],
"training": ["train", "training", "sft", "dpo"],
"gui/playground": ["gui", "playground", "webui", "interface"],
"sota": ["state-of-the-art", "sota", "latest"],
}
for label, keywords in capability_keywords.items():
if any(kw in text for kw in keywords):
features.append(label)
# Also include non-generic topics as features
generic_topics = {"ai", "ml", "machine-learning", "deep-learning", "llm", "python", "pytorch", "tensorflow"}
for topic in topics:
if topic.lower() not in generic_topics:
features.append(topic)
# Deduplicate while preserving order, return up to 10
seen = set()
unique = []
for f in features:
key = f.lower()
if key not in seen:
seen.add(key)
unique.append(f)
return unique[:10]
def save_trending(repos: List[Dict], output_dir: str = "metrics/trending") -> str:
"""Save trending results to a dated JSON file.
Returns the path of the written file.
"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
filename = output_path / f"github-trending-{date_str}.json"
output_data = {
"scanned_at": datetime.now(timezone.utc).isoformat(),
"count": len(repos),
"repos": repos,
}
with open(filename, "w") as f:
json.dump(output_data, f, indent=2, ensure_ascii=False)
return str(filename)
def main() -> None:
parser = argparse.ArgumentParser(
description="Scan GitHub trending repositories in AI/ML"
)
parser.add_argument(
"--language",
help="Filter by programming language (e.g., python, rust, go)",
)
parser.add_argument(
"--topic",
help="Filter by GitHub topic (e.g., ai, machine-learning, llm)",
)
parser.add_argument(
"--since",
default="daily",
choices=["daily", "weekly", "monthly"],
help="Trending period (daily/weekly/monthly) — informational only",
)
parser.add_argument(
"--output",
default="metrics/trending",
help="Output directory for results (default: metrics/trending)",
)
parser.add_argument(
"--limit",
type=int,
default=DEFAULT_LIMIT,
help=f"Maximum repos to fetch (default: {DEFAULT_LIMIT})",
)
parser.add_argument(
"--min-stars",
type=int,
default=DEFAULT_MIN_STARS,
help=f"Minimum star count for relevance (default: {DEFAULT_MIN_STARS})",
)
args = parser.parse_args()
print(
f"Fetching trending repos "
f"(language={args.language or 'any'}, topic={args.topic or 'any'}, period={args.since})..."
)
repos_raw = fetch_trending_repos(
language=args.language,
topic=args.topic,
min_stars=args.min_stars,
limit=args.limit,
)
if not repos_raw:
print("WARNING: No repos fetched — check network or rate limits", file=sys.stderr)
repos = [extract_repo_features(r) for r in repos_raw]
output_file = save_trending(repos, args.output)
print(f"Saved {len(repos)} trending repos to {output_file}")
# Brief human-readable summary
if repos:
print("\nTop repos:")
for repo in repos[:5]:
features_preview = ", ".join(repo["key_features"][:3])
print(f"{repo['stars']:>7} {repo['name']}")
if repo["description"]:
desc = repo["description"][:80]
print(f" {desc}{'...' if len(repo['description']) > 80 else ''}")
if features_preview:
print(f" Features: {features_preview}")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,351 +0,0 @@
#!/usr/bin/env python3
"""
PR Complexity Scorer - Estimate review effort for PRs.
"""
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
import urllib.request
import urllib.error
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
DEPENDENCY_FILES = {
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
}
TEST_PATTERNS = [
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
r"spec/.*\.rb$", r".*_spec\.rb$",
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
]
WEIGHT_FILES = 0.25
WEIGHT_LINES = 0.25
WEIGHT_DEPS = 0.30
WEIGHT_TEST_COV = 0.20
SMALL_FILES = 5
MEDIUM_FILES = 20
LARGE_FILES = 50
SMALL_LINES = 100
MEDIUM_LINES = 500
LARGE_LINES = 2000
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
@dataclass
class PRComplexity:
pr_number: int
title: str
files_changed: int
additions: int
deletions: int
has_dependency_changes: bool
test_coverage_delta: Optional[int]
score: int
estimated_minutes: int
reasons: List[str]
def to_dict(self) -> dict:
return asdict(self)
class GiteaClient:
def __init__(self, token: str):
self.token = token
self.base_url = GITEA_BASE.rstrip("/")
def _request(self, path: str, params: Dict = None) -> Any:
url = f"{self.base_url}{path}"
if params:
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
url += f"?{qs}"
req = urllib.request.Request(url)
req.add_header("Authorization", f"token {self.token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
except urllib.error.HTTPError as e:
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
return None
except urllib.error.URLError as e:
print(f"Network error: {e}", file=sys.stderr)
return None
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
prs = []
page = 1
while True:
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
if not batch:
break
prs.extend(batch)
if len(batch) < 50:
break
page += 1
return prs
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
files = []
page = 1
while True:
batch = self._request(
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
{"limit": 100, "page": page}
)
if not batch:
break
files.extend(batch)
if len(batch) < 100:
break
page += 1
return files
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
data = json.dumps({"body": body}).encode("utf-8")
req = urllib.request.Request(
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
data=data,
method="POST",
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return resp.status in (200, 201)
except urllib.error.HTTPError:
return False
def is_dependency_file(filename: str) -> bool:
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
def is_test_file(filename: str) -> bool:
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
def score_pr(
files_changed: int,
additions: int,
deletions: int,
has_dependency_changes: bool,
test_coverage_delta: Optional[int] = None
) -> tuple[int, int, List[str]]:
score = 1.0
reasons = []
# Files changed
if files_changed <= SMALL_FILES:
fscore = 1.0
reasons.append("small number of files changed")
elif files_changed <= MEDIUM_FILES:
fscore = 2.0
reasons.append("moderate number of files changed")
elif files_changed <= LARGE_FILES:
fscore = 2.5
reasons.append("large number of files changed")
else:
fscore = 3.0
reasons.append("very large PR spanning many files")
# Lines changed
total_lines = additions + deletions
if total_lines <= SMALL_LINES:
lscore = 1.0
reasons.append("small change size")
elif total_lines <= MEDIUM_LINES:
lscore = 2.0
reasons.append("moderate change size")
elif total_lines <= LARGE_LINES:
lscore = 3.0
reasons.append("large change size")
else:
lscore = 4.0
reasons.append("very large change")
# Dependency changes
if has_dependency_changes:
dscore = 2.5
reasons.append("dependency changes (architectural impact)")
else:
dscore = 0.0
# Test coverage delta
tscore = 0.0
if test_coverage_delta is not None:
if test_coverage_delta > 0:
reasons.append(f"test additions (+{test_coverage_delta} test files)")
tscore = -min(2.0, test_coverage_delta / 2.0)
elif test_coverage_delta < 0:
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
else:
reasons.append("test coverage change not assessed")
# Weighted sum, scaled by 3 to use full 1-10 range
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
scaled_bonus = bonus * 3.0
score = 1.0 + scaled_bonus
final_score = max(1, min(10, int(round(score))))
est_minutes = TIME_PER_POINT.get(final_score, 30)
return final_score, est_minutes, reasons
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
pr_num = pr_data["number"]
title = pr_data.get("title", "")
files = client.get_pr_files(org, repo, pr_num)
additions = sum(f.get("additions", 0) for f in files)
deletions = sum(f.get("deletions", 0) for f in files)
filenames = [f.get("filename", "") for f in files]
has_deps = any(is_dependency_file(f) for f in filenames)
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
test_delta = test_added - test_removed if (test_added or test_removed) else None
score, est_min, reasons = score_pr(
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta
)
return PRComplexity(
pr_number=pr_num,
title=title,
files_changed=len(files),
additions=additions,
deletions=deletions,
has_dependency_changes=has_deps,
test_coverage_delta=test_delta,
score=score,
estimated_minutes=est_min,
reasons=reasons
)
def build_comment(complexity: PRComplexity) -> str:
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
test_note = ""
if complexity.test_coverage_delta is not None:
if complexity.test_coverage_delta > 0:
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
elif complexity.test_coverage_delta < 0:
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
comment = f"## 📊 PR Complexity Analysis\n\n"
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
comment += f"| Metric | Value |\n|--------|-------|\n"
comment += f"| Changes | {change_desc} |\n"
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
comment += f"### Scoring rationale:"
for r in complexity.reasons:
comment += f"\n- {r}"
if deps_note:
comment += deps_note
if test_note:
comment += test_note
comment += f"\n\n---\n"
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
return comment
def main():
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
parser.add_argument("--org", default="Timmy_Foundation")
parser.add_argument("--repo", default="compounding-intelligence")
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
parser.add_argument("--dry-run", action="store_true")
parser.add_argument("--apply", action="store_true")
parser.add_argument("--output", default="metrics/pr_complexity.json")
args = parser.parse_args()
token_path = args.token
if os.path.exists(token_path):
with open(token_path) as f:
token = f.read().strip()
else:
token = args.token
if not token:
print("ERROR: No Gitea token provided", file=sys.stderr)
sys.exit(1)
client = GiteaClient(token)
print(f"Fetching open PRs for {args.org}/{args.repo}...")
prs = client.get_open_prs(args.org, args.repo)
if not prs:
print("No open PRs found.")
sys.exit(0)
print(f"Found {len(prs)} open PR(s). Analyzing...")
results = []
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
for pr in prs:
pr_num = pr["number"]
title = pr.get("title", "")
print(f" Analyzing PR #{pr_num}: {title[:60]}")
try:
complexity = analyze_pr(client, args.org, args.repo, pr)
results.append(complexity.to_dict())
comment = build_comment(complexity)
if args.dry_run:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
elif args.apply:
success = client.post_comment(args.org, args.repo, pr_num, comment)
status = "[commented]" if success else "[FAILED]"
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
else:
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
except Exception as e:
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
with open(args.output, "w") as f:
json.dump({
"org": args.org,
"repo": args.repo,
"timestamp": datetime.now(timezone.utc).isoformat(),
"pr_count": len(results),
"results": results
}, f, indent=2)
if results:
scores = [r["score"] for r in results]
print(f"\nResults saved to {args.output}")
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
else:
print("\nNo results to save.")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,125 @@
#!/usr/bin/env python3
"""Tests for github_trending_scanner.py — pure function validation.
Tests the feature inference, extraction, and output formatting logic
without relying on external GitHub API calls.
"""
import json
import sys
import tempfile
from pathlib import Path
# Add scripts dir to path for import
sys.path.insert(0, str(Path(__file__).resolve().parent))
from github_trending_scanner import (
extract_repo_features,
infer_features,
save_trending,
)
def test_infer_features_from_description():
"""Feature inference extracts capabilities from description text."""
desc = "A local, quantized LLM framework for fine-tuning and agent-based RAG with vision."
topics = ["ai", "llm"]
features = infer_features(desc, topics)
# Should include relevant capabilities (case-insensitive comparison)
expected_lower = {"fine-tuning", "local/offline", "quantized models", "agent framework", "vision", "retrieval/rag"}
actual_lower = set(f.lower() for f in features)
assert expected_lower.issubset(actual_lower), f"Missing features. Expected subset of {expected_lower}, got {actual_lower}"
print("PASS: infer_features_from_description")
def test_infer_features_from_topics_only():
"""Topics alone can drive feature detection."""
desc = ""
topics = ["computer-vision", "speech", "pytorch"]
features = infer_features(desc, topics)
# Non-generic topics should appear as features (topics preserved as-is)
assert "computer-vision" in features, f"Expected 'computer-vision' in {features}"
assert "speech" in features, f"Expected 'speech' in {features}"
# Generic topics (pytorch) may be filtered
print(f"PASS: infer_features_from_topics_only → {features}")
def test_extract_repo_features_produces_valid_structure():
"""extract_repo_features returns all required fields."""
mock_repo = {
"full_name": "example/repo",
"description": "An example repository",
"stargazers_count": 1234,
"forks_count": 56,
"open_issues_count": 7,
"language": "Python",
"topics": ["ai", "llm"],
"html_url": "https://github.com/example/repo",
"created_at": "2025-01-01T00:00:00Z",
"updated_at": "2026-01-01T00:00:00Z",
}
result = extract_repo_features(mock_repo)
assert result["name"] == "example/repo"
assert result["description"] == "An example repository"
assert result["stars"] == 1234
assert isinstance(result["key_features"], list)
assert "scanned_at" in result
assert result["url"] == "https://github.com/example/repo"
print("PASS: extract_repo_features_structure")
def test_save_trending_creates_dated_json():
"""save_trending writes a valid JSON file with the expected schema."""
repos = [
{
"name": "test/repo",
"description": "Test repository",
"stars": 999,
"language": "Python",
"topics": ["test"],
"key_features": ["testing"],
"scanned_at": "2026-04-26T00:00:00+00:00",
}
]
with tempfile.TemporaryDirectory() as tmp:
output_file = save_trending(repos, output_dir=tmp)
path = Path(output_file)
assert path.exists(), f"Output file not created: {output_file}"
with open(path) as f:
data = json.load(f)
assert "scanned_at" in data
assert data["count"] == 1
assert isinstance(data["repos"], list)
assert data["repos"][0]["name"] == "test/repo"
print(f"PASS: save_trending → {output_file}")
def test_save_trending_respects_output_dir_creation():
"""Output directory is created if it doesn't exist."""
repos = []
with tempfile.TemporaryDirectory() as tmp:
nested = Path(tmp) / "nested" / "trending"
assert not nested.exists()
output_file = save_trending(repos, output_dir=str(nested))
assert nested.exists()
assert Path(output_file).exists()
print("PASS: output_dir_creation")
if __name__ == "__main__":
test_infer_features_from_description()
test_infer_features_from_topics_only()
test_extract_repo_features_produces_valid_structure()
test_save_trending_creates_dated_json()
test_save_trending_respects_output_dir_creation()
print("\nAll github_trending_scanner tests passed.")

View File

@@ -1,170 +0,0 @@
#!/usr/bin/env python3
"""
Tests for PR Complexity Scorer — unit tests for the scoring logic.
"""
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pr_complexity_scorer import (
score_pr,
is_dependency_file,
is_test_file,
TIME_PER_POINT,
SMALL_FILES,
MEDIUM_FILES,
LARGE_FILES,
SMALL_LINES,
MEDIUM_LINES,
LARGE_LINES,
)
PASS = 0
FAIL = 0
def test(name):
def decorator(fn):
global PASS, FAIL
try:
fn()
PASS += 1
print(f" [PASS] {name}")
except AssertionError as e:
FAIL += 1
print(f" [FAIL] {name}: {e}")
except Exception as e:
FAIL += 1
print(f" [FAIL] {name}: Unexpected error: {e}")
return decorator
def assert_eq(a, b, msg=""):
if a != b:
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
def assert_true(v, msg=""):
if not v:
raise AssertionError(msg or "Expected True")
def assert_false(v, msg=""):
if v:
raise AssertionError(msg or "Expected False")
print("=== PR Complexity Scorer Tests ===\n")
print("-- File Classification --")
@test("dependency file detection — requirements.txt")
def _():
assert_true(is_dependency_file("requirements.txt"))
assert_true(is_dependency_file("src/requirements.txt"))
assert_false(is_dependency_file("requirements_test.txt"))
@test("dependency file detection — pyproject.toml")
def _():
assert_true(is_dependency_file("pyproject.toml"))
assert_false(is_dependency_file("myproject.py"))
@test("test file detection — pytest style")
def _():
assert_true(is_test_file("tests/test_api.py"))
assert_true(is_test_file("test_module.py"))
assert_true(is_test_file("src/module_test.py"))
@test("test file detection — other frameworks")
def _():
assert_true(is_test_file("spec/feature_spec.rb"))
assert_true(is_test_file("__tests__/component.test.js"))
assert_false(is_test_file("testfixtures/helper.py"))
print("\n-- Scoring Logic --")
@test("small PR gets low score (1-3)")
def _():
score, minutes, _ = score_pr(
files_changed=3,
additions=50,
deletions=10,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
assert_true(minutes < 20)
@test("medium PR gets medium score (4-6)")
def _():
score, minutes, _ = score_pr(
files_changed=15,
additions=400,
deletions=100,
has_dependency_changes=False,
test_coverage_delta=None
)
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
assert_true(20 <= minutes <= 45)
@test("large PR gets high score (7-9)")
def _():
score, minutes, _ = score_pr(
files_changed=60,
additions=3000,
deletions=1500,
has_dependency_changes=True,
test_coverage_delta=None
)
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
assert_true(minutes >= 45)
@test("dependency changes boost score")
def _():
base_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=False, test_coverage_delta=None
)
dep_score, _, _ = score_pr(
files_changed=10, additions=200, deletions=50,
has_dependency_changes=True, test_coverage_delta=None
)
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
@test("adding tests lowers complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
better_score, _, _ = score_pr(
files_changed=8, additions=180, deletions=20,
has_dependency_changes=False, test_coverage_delta=3
)
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
@test("removing tests increases complexity")
def _():
base_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=None
)
worse_score, _, _ = score_pr(
files_changed=8, additions=150, deletions=20,
has_dependency_changes=False, test_coverage_delta=-2
)
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
@test("score bounded 1-10")
def _():
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
score, _, _ = score_pr(files, adds, dels, False, None)
assert_true(1 <= score <= 10, f"Score {score} out of range")
@test("estimated minutes exist for all scores")
def _():
for s in range(1, 11):
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
sys.exit(0 if FAIL == 0 else 1)

View File

@@ -1,377 +0,0 @@
#!/usr/bin/env python3
"""
transcript_harvester.py — Rule-based knowledge extraction from Hermes session transcripts.
Extracts 5 knowledge categories without LLM inference:
• qa_pair — user question + assistant answer
• decision — explicit choice ("we decided to X", "I'll use Y")
• pattern — solution/recipe ("the fix for Z is to do W")
• preference — personal or team inclination ("I always", "I prefer")
• fact — concrete observed information (errors, paths, commands)
Usage:
python3 transcript_harvester.py --session ~/.hermes/sessions/session_xxx.jsonl
python3 transcript_harvester.py --batch --sessions-dir ~/.hermes/sessions --limit 50
python3 transcript_harvester.py --session session.jsonl --output knowledge/transcripts/
"""
import argparse
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Import session_reader from the same scripts directory
SCRIPT_DIR = Path(__file__).parent.absolute()
sys.path.insert(0, str(SCRIPT_DIR))
from session_reader import read_session
# --- Pattern matchers --------------------------------------------------------
DECISION_PATTERNS = [
r"\b(we\s+(?:decided|chose|agreed|will|are going)\s+to\s+.*)",
r"\b(I\s+will\s+use|I\s+choose|I\s+am going\s+to)\s+.*",
r"\b(let's\s+(?:use|go\s+with|do|try))\s+.*",
r"\b(the\s+(?:decision|choice)\s+is)\s+.*",
r"\b(I'll\s+implement|I'll\s+deploy|I'll\s+create)\s+.*",
]
PATTERN_PATTERNS = [
r"\b(the\s+fix\s+for\s+.*\s+is\s+to\s+.*)",
r"\b(solution:?\s+.*)",
r"\b(approach:?\s+.*)",
r"\b(procedure:?\s+.*)",
r"\b(to\s+resolve\s+this.*?,\s+.*)",
r"\b(used\s+.*\s+to\s+.*)", # "used X to do Y"
r"\b(by\s+doing\s+.*\s+we\s+.*)",
r"\b(Here's\s+the\s+.*\s+process:?)", # "Here's the deployment process:"
r"\b(The\s+steps\s+are:?)",
r"\b(steps\s+to\s+.*:?)",
r"\b(Implementation\s+plan:?)",
r"\b(\d+\.\s+.*\n\d+\.)", # numbered multi-step (at least two steps detected by newlines)
]
PREFERENCE_PATTERNS = [
r"\b(I\s+(?:always|never|prefer|usually|typically|generally)\s+.*)",
r"\b(I\s+like\s+.*)",
r"\b(My\s+preference\s+is\s+.*)",
r"\b(Alexander\s+(?:prefers|always|never).*)",
r"\b(We\s+always\s+.*)",
]
ERROR_PATTERNS = [
r"\b(error|failed|fatal|exception|denied|could\s+not|couldn't)\b.*",
]
# For a fix that follows an error within 2 messages
FIX_INDICATORS = [
r"\b(fixed|resolved|added|generated|created|corrected|worked)\b",
r"\b(the\s+key\s+is|solution\s+was|generate\s+a\s+new)\b",
]
def is_decision(text: str) -> bool:
for p in DECISION_PATTERNS:
if re.search(p, text, re.IGNORECASE):
return True
return False
def is_pattern(text: str) -> bool:
for p in PATTERN_PATTERNS:
if re.search(p, text, re.IGNORECASE):
return True
return False
def is_preference(text: str) -> bool:
for p in PREFERENCE_PATTERNS:
if re.search(p, text, re.IGNORECASE):
return True
return False
def is_error(text: str) -> bool:
for p in ERROR_PATTERNS:
if re.search(p, text, re.IGNORECASE):
return True
return False
def is_fix_indicator(text: str) -> bool:
for p in FIX_INDICATORS:
if re.search(p, text, re.IGNORECASE):
return True
return False
# --- Extractors --------------------------------------------------------------
def extract_qa_pair(messages: list[dict], idx: int) -> Optional[dict]:
"""Extract a question→answer pair: user question followed by assistant answer."""
if idx + 1 >= len(messages):
return None
curr = messages[idx]
nxt = messages[idx + 1]
if curr.get('role') != 'user' or nxt.get('role') != 'assistant':
return None
question = curr.get('content', '').strip()
answer = nxt.get('content', '').strip()
if not question or not answer:
return None
# Must be a real question (ends with ? or starts with WH-)
if not (question.endswith('?') or re.match(r'^(how|what|why|when|where|who|which|can|do|is|are)', question, re.IGNORECASE)):
return None
# Skip very short answers ("OK", "Yes")
if len(answer.split()) < 3:
return None
return {
"type": "qa_pair",
"question": question,
"answer": answer,
"timestamp": curr.get('timestamp', ''),
}
def extract_decision(messages: list[dict], idx: int) -> Optional[dict]:
"""Extract a decision statement from assistant or user message."""
msg = messages[idx]
text = msg.get('content', '').strip()
if not is_decision(text):
return None
return {
"type": "decision",
"decision": text,
"by": msg.get('role', 'unknown'),
"timestamp": msg.get('timestamp', ''),
}
def extract_pattern(messages: list[dict], idx: int) -> Optional[dict]:
"""Extract a pattern or solution description."""
msg = messages[idx]
text = msg.get('content', '').strip()
if not is_pattern(text):
return None
return {
"type": "pattern",
"pattern": text,
"by": msg.get('role', 'unknown'),
"timestamp": msg.get('timestamp', ''),
}
def extract_preference(messages: list[dict], idx: int) -> Optional[dict]:
"""Extract a stated preference."""
msg = messages[idx]
text = msg.get('content', '').strip()
if not is_preference(text):
return None
return {
"type": "preference",
"preference": text,
"by": msg.get('role', 'unknown'),
"timestamp": msg.get('timestamp', ''),
}
def extract_error_fix(messages: list[dict], idx: int) -> Optional[dict]:
"""
Link an error to its fix. Catch two patterns:
1. Error statement followed by explicit fix indicator ("fixed", "resolved")
2. Error statement followed by a decision statement that fixes it ("I'll generate", "I'll add")
"""
msg = messages[idx]
if not is_error(msg.get('content', '')):
return None
error_text = msg.get('content', '').strip()
window = min(idx + 8, len(messages))
for j in range(idx + 1, window):
follow_up = messages[j]
follow_text = follow_up.get('content', '').strip()
# Check for explicit fix indicators
if is_fix_indicator(follow_text):
return {
"type": "error_fix",
"error": error_text,
"fix": follow_text,
"error_timestamp": msg.get('timestamp', ''),
"fix_timestamp": follow_up.get('timestamp', ''),
}
# Check for fix decision: "I'll <action>", "Let's <action>", "We need to <action>"
if re.match(r"^(I'll|I will|Let's|We (will|should|need to))\s+\w+", follow_text, re.IGNORECASE):
return {
"type": "error_fix",
"error": error_text,
"fix": follow_text,
"error_timestamp": msg.get('timestamp', ''),
"fix_timestamp": follow_up.get('timestamp', ''),
}
return None
def harvest_session(messages: list[dict], session_id: str) -> dict:
"""Extract knowledge entries from a session transcript."""
entries = []
n = len(messages)
for i in range(n):
# QA pairs
qa = extract_qa_pair(messages, i)
if qa:
qa['session_id'] = session_id
entries.append(qa)
# Decisions
dec = extract_decision(messages, i)
if dec:
dec['session_id'] = session_id
entries.append(dec)
# Patterns
pat = extract_pattern(messages, i)
if pat:
pat['session_id'] = session_id
entries.append(pat)
# Preferences
pref = extract_preference(messages, i)
if pref:
pref['session_id'] = session_id
entries.append(pref)
# Error/fix pairs (spanning multiple messages)
ef = extract_error_fix(messages, i)
if ef:
ef['session_id'] = session_id
entries.append(ef)
return {
"session_id": session_id,
"message_count": n,
"entries": entries,
"counts": {
"qa_pair": sum(1 for e in entries if e['type'] == 'qa_pair'),
"decision": sum(1 for e in entries if e['type'] == 'decision'),
"pattern": sum(1 for e in entries if e['type'] == 'pattern'),
"preference": sum(1 for e in entries if e['type'] == 'preference'),
"error_fix": sum(1 for e in entries if e['type'] == 'error_fix'),
}
}
def write_json_output(results: list[dict], output_path: Path):
"""Write aggregated results to JSON."""
all_entries = []
summary = {"sessions": 0}
for r in results:
summary['sessions'] += 1
all_entries.extend(r['entries'])
output = {
"harvester": "transcript_harvester",
"generated_at": datetime.now(timezone.utc).isoformat(),
"summary": summary,
"total_entries": len(all_entries),
"entries": all_entries,
}
output_path.write_text(json.dumps(output, indent=2, ensure_ascii=False))
return output
def write_report(results: list[dict], report_path: Path):
"""Write a human-readable markdown report."""
lines = []
lines.append("# Transcript Harvester Report")
lines.append(f"Generated: {datetime.now(timezone.utc).isoformat()}")
lines.append(f"Sessions processed: {len(results)}")
totals = {cat: 0 for cat in ['qa_pair', 'decision', 'pattern', 'preference', 'error_fix']}
for r in results:
for cat, cnt in r['counts'].items():
totals[cat] += cnt # BUG: should be += cnt
lines.append("\n## Extracted Knowledge by Category\n")
for cat, cnt in totals.items():
lines.append(f"- **{cat}**: {cnt}")
lines.append("\n## Sample Entries\n")
for r in results:
for entry in r['entries'][:3]:
lines.append(f"\n### {entry['type'].upper()} ({r['session_id']})\n")
if entry['type'] == 'qa_pair':
lines.append(f"**Q:** {entry['question']}\n")
lines.append(f"**A:** {entry['answer']}\n")
elif entry['type'] == 'decision':
lines.append(f"**Decision:** {entry['decision']}\n")
lines.append(f"By: {entry['by']}\n")
elif entry['type'] == 'pattern':
lines.append(f"**Pattern:** {entry['pattern']}\n")
elif entry['type'] == 'preference':
lines.append(f"**Preference:** {entry['preference']}\n")
elif entry['type'] == 'error_fix':
lines.append(f"**Error:** {entry['error']}\n")
lines.append(f"**Fixed by:** {entry['fix']}\n")
report_path.write_text("\n".join(lines))
def find_recent_sessions(sessions_dir: Path, limit: int = 50) -> list[Path]:
"""Find up to `limit` most recent .jsonl session files."""
sessions = sorted(sessions_dir.glob("*.jsonl"), reverse=True)
return sessions[:limit] if limit > 0 else sessions
def main():
parser = argparse.ArgumentParser(description="Harvest knowledge from session transcripts")
parser.add_argument('--session', help='Single session JSONL file')
parser.add_argument('--batch', action='store_true', help='Batch mode')
parser.add_argument('--sessions-dir', default=str(Path.home() / '.hermes' / 'sessions'),
help='Directory of session files')
parser.add_argument('--output', default='knowledge/transcripts',
help='Output directory (default: knowledge/transcripts)')
parser.add_argument('--limit', type=int, default=50,
help='Max sessions to process in batch (default: 50)')
args = parser.parse_args()
output_dir = Path(args.output)
output_dir.mkdir(parents=True, exist_ok=True)
results = []
if args.session:
messages = read_session(args.session)
session_id = Path(args.session).stem
results.append(harvest_session(messages, session_id))
elif args.batch:
sessions_dir = Path(args.sessions_dir)
sessions = find_recent_sessions(sessions_dir, args.limit)
print(f"Processing {len(sessions)} sessions...")
for sf in sessions:
messages = read_session(str(sf))
results.append(harvest_session(messages, sf.stem))
else:
parser.print_help()
sys.exit(1)
# Write outputs
json_path = output_dir / "transcript_knowledge.json"
report_path = output_dir / "transcript_report.md"
output = write_json_output(results, json_path)
write_report(results, report_path)
print(f"\nDone: {output['total_entries']} entries from {len(results)} sessions")
print(f"Output: {json_path}")
print(f"Report: {report_path}")
# Print category totals
totals = {}
for r in results:
for cat, cnt in r['counts'].items():
totals[cat] = totals.get(cat, 0) + cnt
print("\nCategory counts:")
for cat, cnt in sorted(totals.items()):
print(f" {cat}: {cnt}")
if __name__ == '__main__':
main()