Compare commits
3 Commits
step35/138
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 2a4e73aa03 | |||
|
|
4b5a675355 | ||
|
|
b1a728f5f4 |
@@ -1,283 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
conference_summarizer.py — Extract knowledge from conference talk transcripts.
|
||||
|
||||
Reads a plain-text transcript and uses LLM to extract durable knowledge items.
|
||||
Integrates with the knowledge store (index.json + knowledge/conferences/talks.md).
|
||||
|
||||
Usage:
|
||||
python3 conference_summarizer.py --transcript talk.txt --conference "AI拂晓" --domain global
|
||||
python3 conference_summarizer.py --transcript talk.txt --domain the-nexus # talk about that repo
|
||||
python3 conference_summarizer.py --transcript talk.txt --dry-run
|
||||
|
||||
Refs: Issue #138 — 7.6: Conference Talk Summarizer
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import hashlib
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
SCRIPT_DIR = Path(__file__).parent.absolute()
|
||||
REPO_ROOT = SCRIPT_DIR.parent
|
||||
KNOWLEDGE_DIR = REPO_ROOT / "knowledge"
|
||||
|
||||
DEFAULT_API_BASE = "https://api.nousresearch.com/v1"
|
||||
DEFAULT_API_KEY = ""
|
||||
DEFAULT_MODEL = "xiaomi/mimo-v2-pro"
|
||||
|
||||
API_KEY_PATHS = [
|
||||
Path.home() / ".config/nous/key",
|
||||
Path.home() / ".hermes/keymaxxing/active/minimax.key",
|
||||
Path.home() / ".config/openrouter/key",
|
||||
]
|
||||
|
||||
|
||||
def find_api_key() -> str:
|
||||
for path in API_KEY_PATHS:
|
||||
if path.exists():
|
||||
return path.read_text().strip()
|
||||
return ""
|
||||
|
||||
|
||||
def load_prompt() -> str:
|
||||
path = SCRIPT_DIR.parent / "templates" / "conference-summary-prompt.md"
|
||||
if not path.exists():
|
||||
print(f"ERROR: Prompt not found at {path}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
return path.read_text(encoding="utf-8")
|
||||
|
||||
|
||||
def truncate_for_context(text: str, head: int = 120, tail: int = 120) -> str:
|
||||
lines = text.splitlines()
|
||||
if len(lines) <= head + tail:
|
||||
return text
|
||||
return (
|
||||
"\n".join(lines[:head])
|
||||
+ "\n\n... [truncated] ...\n\n"
|
||||
+ "\n".join(lines[-tail:])
|
||||
)
|
||||
|
||||
|
||||
def call_llm(prompt: str, transcript: str, api_base: str, api_key: str, model: str):
|
||||
import urllib.request
|
||||
|
||||
messages = [
|
||||
{"role": "system", "content": prompt},
|
||||
{"role": "user", "content": f"Transcript:\n\n{truncate_for_context(transcript)}"},
|
||||
]
|
||||
payload = json.dumps(
|
||||
{"model": model, "messages": messages, "temperature": 0.1, "max_tokens": 4096}
|
||||
).encode("utf-8")
|
||||
req = urllib.request.Request(
|
||||
f"{api_base}/chat/completions",
|
||||
data=payload,
|
||||
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=60) as resp:
|
||||
data = json.loads(resp.read())
|
||||
content = data["choices"][0]["message"]["content"].strip()
|
||||
# Strip code fences
|
||||
if content.startswith("```"):
|
||||
content = content.split("\n", 1)[1].rsplit("```", 1)[0].strip()
|
||||
return json.loads(content)
|
||||
except Exception as e:
|
||||
print(f"ERROR: LLM extraction failed: {e}", file=sys.stderr)
|
||||
return None
|
||||
|
||||
|
||||
def load_index(knowledge_dir: Path) -> dict:
|
||||
index_path = knowledge_dir / "index.json"
|
||||
if index_path.exists():
|
||||
with open(index_path) as f:
|
||||
return json.load(f)
|
||||
return {"version": 1, "total_facts": 0, "facts": []}
|
||||
|
||||
|
||||
def content_hash(text: str) -> str:
|
||||
normalized = " ".join(text.lower().strip().split())
|
||||
return hashlib.sha256(normalized.encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
def compute_next_sequence(existing_facts: list[dict], domain: str, category: str) -> int:
|
||||
"""Compute next sequence number for (domain, category) based on existing IDs."""
|
||||
max_seq = 0
|
||||
for f in existing_facts:
|
||||
fid = f.get("id", "")
|
||||
parts = fid.split(":")
|
||||
if len(parts) == 3 and parts[0] == domain and parts[1] == category:
|
||||
try:
|
||||
seq = int(parts[2])
|
||||
max_seq = max(max_seq, seq)
|
||||
except ValueError:
|
||||
pass
|
||||
return max_seq + 1
|
||||
|
||||
|
||||
def deduplicate(new_facts: list[dict], existing: list[dict]) -> list[dict]:
|
||||
"""Exact-deduplicate by content hash; near-dedup by token overlap."""
|
||||
existing_hashes = {content_hash(f["fact"]): f for f in existing}
|
||||
existing_texts = [f["fact"].lower() for f in existing]
|
||||
unique = []
|
||||
for fact in new_facts:
|
||||
text = fact.get("fact", "")
|
||||
h = content_hash(text)
|
||||
if h in existing_hashes:
|
||||
continue
|
||||
# Near-dedup: token Jaccard >= 0.8
|
||||
tokens = set(text.lower().split())
|
||||
for ex in existing_texts:
|
||||
ex_tokens = set(ex.split())
|
||||
if tokens and ex_tokens:
|
||||
inter = len(tokens & ex_tokens)
|
||||
union = len(tokens | ex_tokens)
|
||||
if inter / union >= 0.8:
|
||||
break
|
||||
else:
|
||||
unique.append(fact)
|
||||
return unique
|
||||
|
||||
|
||||
def validate_fact(fact: dict) -> bool:
|
||||
required = ["fact", "category", "domain", "confidence"]
|
||||
for field in required:
|
||||
if field not in fact:
|
||||
return False
|
||||
if not isinstance(fact["fact"], str) or not fact["fact"].strip():
|
||||
return False
|
||||
if fact["category"] not in ["fact", "pitfall", "pattern", "tool-quirk", "question"]:
|
||||
return False
|
||||
c = fact.get("confidence", 0)
|
||||
return isinstance(c, (int, float)) and 0.0 <= c <= 1.0
|
||||
|
||||
|
||||
def write_knowledge(index: dict, new_facts: list[dict], knowledge_dir: Path):
|
||||
kdir = knowledge_dir
|
||||
kdir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for fact in new_facts:
|
||||
fact["harvested_at"] = datetime.now(timezone.utc).isoformat()
|
||||
fact["source"] = "conference-talk"
|
||||
|
||||
index["facts"].extend(new_facts)
|
||||
index["total_facts"] = len(index["facts"])
|
||||
index["last_updated"] = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
# index.json
|
||||
with open(kdir / "index.json", "w", encoding="utf-8") as f:
|
||||
json.dump(index, f, indent=2, ensure_ascii=False)
|
||||
|
||||
# conferences/talks.md (human-readable)
|
||||
conf_dir = kdir / "conferences"
|
||||
conf_dir.mkdir(parents=True, exist_ok=True)
|
||||
conf_md = conf_dir / "talks.md"
|
||||
mode = "a" if conf_md.exists() else "w"
|
||||
with open(conf_md, mode, encoding="utf-8") as f:
|
||||
if mode == "w":
|
||||
f.write("# Conference Talk Knowledge\n\n")
|
||||
f.write(
|
||||
f"## {datetime.now(timezone.utc).strftime('%Y-%m-%d')} — {len(new_facts)} items\n\n"
|
||||
)
|
||||
for fact in new_facts:
|
||||
icon = {"fact": "📋", "pitfall": "⚠️", "pattern": "🔄", "tool-quirk": "🔧", "question": "❓"}.get(fact["category"], "•")
|
||||
f.write(f"- {icon} **{fact['category']}** (conf: {fact['confidence']:.1f}): {fact['fact']}\n")
|
||||
if fact.get("evidence"):
|
||||
f.write(f" _Evidence: {fact['evidence']}_\n")
|
||||
f.write("\n")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Summarize conference talks into knowledge store")
|
||||
parser.add_argument("--transcript", required=True, help="Path to transcript text file")
|
||||
parser.add_argument("--conference", default="unknown", help="Conference name")
|
||||
parser.add_argument("--title", default="", help="Talk title")
|
||||
parser.add_argument("--speaker", default="", help="Speaker name(s)")
|
||||
parser.add_argument("--talk-url", default="", help="URL to talk/video")
|
||||
parser.add_argument("--domain", default="global", help="Domain: global or repo/agent name")
|
||||
parser.add_argument("--knowledge-dir", default=str(KNOWLEDGE_DIR), help="Knowledge store directory")
|
||||
parser.add_argument("--api-base", default=DEFAULT_API_BASE, help="LLM API base URL")
|
||||
parser.add_argument("--api-key", default="", help="LLM API key")
|
||||
parser.add_argument("--model", default=DEFAULT_MODEL, help="Model to use")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Preview without writing")
|
||||
parser.add_argument("--min-confidence", type=float, default=0.3, help="Minimum confidence threshold")
|
||||
args = parser.parse_args()
|
||||
|
||||
transcript_path = Path(args.transcript)
|
||||
if not transcript_path.exists():
|
||||
print(f"ERROR: Transcript not found: {transcript_path}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
transcript = transcript_path.read_text(encoding="utf-8", errors="replace")
|
||||
if not transcript.strip():
|
||||
print("ERROR: Transcript is empty", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
api_key = args.api_key or DEFAULT_API_KEY or find_api_key()
|
||||
if not api_key:
|
||||
print("ERROR: No API key. Set HARVESTER_API_KEY or pass --api-key", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
prompt = load_prompt()
|
||||
print(f"Summarizing '{transcript_path.name}' domain={args.domain} conf={args.conference}")
|
||||
start = time.time()
|
||||
extracted = call_llm(prompt, transcript, args.api_base, api_key, args.model)
|
||||
if extracted is None:
|
||||
print("ERROR: LLM extraction failed", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
raw_items = extracted.get("knowledge", [])
|
||||
print(f" Raw items: {len(raw_items)}")
|
||||
valid = [f for f in raw_items if validate_fact(f) and f.get("confidence", 0) >= args.min_confidence]
|
||||
print(f" Valid: {len(valid)}")
|
||||
|
||||
if not valid:
|
||||
print("WARNING: No valid items extracted", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
kdir = Path(args.knowledge_dir)
|
||||
index = load_index(kdir)
|
||||
existing_facts = index.get("facts", [])
|
||||
new_facts = deduplicate(valid, existing_facts)
|
||||
print(f" New (non-duplicate): {len(new_facts)}")
|
||||
|
||||
if not new_facts:
|
||||
print("All items duplicated — nothing to write.")
|
||||
sys.exit(0)
|
||||
|
||||
# Assign IDs per (domain, category) sequence
|
||||
seq_counters = {}
|
||||
# Count existing for this domain
|
||||
for f in existing_facts:
|
||||
if f.get("domain") == args.domain:
|
||||
cat = f.get("category", "fact")
|
||||
key = (args.domain, cat)
|
||||
seq_counters[key] = seq_counters.get(key, 0) + 1
|
||||
# Now next sequence for each category in new_facts
|
||||
for fact in new_facts:
|
||||
cat = fact["category"]
|
||||
key = (args.domain, cat)
|
||||
next_seq = seq_counters.get(key, 0) + 1
|
||||
seq_counters[key] = next_seq
|
||||
fact["id"] = f"{args.domain}:{cat}:{next_seq:03d}"
|
||||
fact["domain"] = args.domain
|
||||
fact.setdefault("tags", []).extend([args.conference, "conference-talk"])
|
||||
fact["first_seen"] = datetime.now(timezone.utc).strftime("%Y-%m-%d")
|
||||
fact["last_confirmed"] = fact["first_seen"]
|
||||
fact["source_count"] = 1
|
||||
fact["talk_meta"] = extracted.get("meta", {})
|
||||
|
||||
if args.dry_run:
|
||||
print("DRY RUN — items that would be added:")
|
||||
for f in new_facts:
|
||||
print(f" [{f['category']}] {f['fact'][:90]}")
|
||||
sys.exit(0)
|
||||
|
||||
write_knowledge(index, new_facts, kdir)
|
||||
print(f"✓ Stored {len(new_facts)} items to knowledge store in {time.time() - start:.1f}s")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
351
scripts/pr_complexity_scorer.py
Normal file
351
scripts/pr_complexity_scorer.py
Normal file
@@ -0,0 +1,351 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
PR Complexity Scorer - Estimate review effort for PRs.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from dataclasses import dataclass, asdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
|
||||
DEPENDENCY_FILES = {
|
||||
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
|
||||
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
|
||||
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
|
||||
}
|
||||
|
||||
TEST_PATTERNS = [
|
||||
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
|
||||
r"spec/.*\.rb$", r".*_spec\.rb$",
|
||||
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
|
||||
]
|
||||
|
||||
WEIGHT_FILES = 0.25
|
||||
WEIGHT_LINES = 0.25
|
||||
WEIGHT_DEPS = 0.30
|
||||
WEIGHT_TEST_COV = 0.20
|
||||
|
||||
SMALL_FILES = 5
|
||||
MEDIUM_FILES = 20
|
||||
LARGE_FILES = 50
|
||||
|
||||
SMALL_LINES = 100
|
||||
MEDIUM_LINES = 500
|
||||
LARGE_LINES = 2000
|
||||
|
||||
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
|
||||
|
||||
|
||||
@dataclass
|
||||
class PRComplexity:
|
||||
pr_number: int
|
||||
title: str
|
||||
files_changed: int
|
||||
additions: int
|
||||
deletions: int
|
||||
has_dependency_changes: bool
|
||||
test_coverage_delta: Optional[int]
|
||||
score: int
|
||||
estimated_minutes: int
|
||||
reasons: List[str]
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
def __init__(self, token: str):
|
||||
self.token = token
|
||||
self.base_url = GITEA_BASE.rstrip("/")
|
||||
|
||||
def _request(self, path: str, params: Dict = None) -> Any:
|
||||
url = f"{self.base_url}{path}"
|
||||
if params:
|
||||
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
|
||||
url += f"?{qs}"
|
||||
|
||||
req = urllib.request.Request(url)
|
||||
req.add_header("Authorization", f"token {self.token}")
|
||||
req.add_header("Content-Type", "application/json")
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
except urllib.error.HTTPError as e:
|
||||
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
|
||||
return None
|
||||
except urllib.error.URLError as e:
|
||||
print(f"Network error: {e}", file=sys.stderr)
|
||||
return None
|
||||
|
||||
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
|
||||
prs = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
|
||||
if not batch:
|
||||
break
|
||||
prs.extend(batch)
|
||||
if len(batch) < 50:
|
||||
break
|
||||
page += 1
|
||||
return prs
|
||||
|
||||
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
|
||||
files = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = self._request(
|
||||
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
|
||||
{"limit": 100, "page": page}
|
||||
)
|
||||
if not batch:
|
||||
break
|
||||
files.extend(batch)
|
||||
if len(batch) < 100:
|
||||
break
|
||||
page += 1
|
||||
return files
|
||||
|
||||
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
|
||||
data = json.dumps({"body": body}).encode("utf-8")
|
||||
req = urllib.request.Request(
|
||||
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
|
||||
data=data,
|
||||
method="POST",
|
||||
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return resp.status in (200, 201)
|
||||
except urllib.error.HTTPError:
|
||||
return False
|
||||
|
||||
|
||||
def is_dependency_file(filename: str) -> bool:
|
||||
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
|
||||
|
||||
|
||||
def is_test_file(filename: str) -> bool:
|
||||
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
|
||||
|
||||
|
||||
def score_pr(
|
||||
files_changed: int,
|
||||
additions: int,
|
||||
deletions: int,
|
||||
has_dependency_changes: bool,
|
||||
test_coverage_delta: Optional[int] = None
|
||||
) -> tuple[int, int, List[str]]:
|
||||
score = 1.0
|
||||
reasons = []
|
||||
|
||||
# Files changed
|
||||
if files_changed <= SMALL_FILES:
|
||||
fscore = 1.0
|
||||
reasons.append("small number of files changed")
|
||||
elif files_changed <= MEDIUM_FILES:
|
||||
fscore = 2.0
|
||||
reasons.append("moderate number of files changed")
|
||||
elif files_changed <= LARGE_FILES:
|
||||
fscore = 2.5
|
||||
reasons.append("large number of files changed")
|
||||
else:
|
||||
fscore = 3.0
|
||||
reasons.append("very large PR spanning many files")
|
||||
|
||||
# Lines changed
|
||||
total_lines = additions + deletions
|
||||
if total_lines <= SMALL_LINES:
|
||||
lscore = 1.0
|
||||
reasons.append("small change size")
|
||||
elif total_lines <= MEDIUM_LINES:
|
||||
lscore = 2.0
|
||||
reasons.append("moderate change size")
|
||||
elif total_lines <= LARGE_LINES:
|
||||
lscore = 3.0
|
||||
reasons.append("large change size")
|
||||
else:
|
||||
lscore = 4.0
|
||||
reasons.append("very large change")
|
||||
|
||||
# Dependency changes
|
||||
if has_dependency_changes:
|
||||
dscore = 2.5
|
||||
reasons.append("dependency changes (architectural impact)")
|
||||
else:
|
||||
dscore = 0.0
|
||||
|
||||
# Test coverage delta
|
||||
tscore = 0.0
|
||||
if test_coverage_delta is not None:
|
||||
if test_coverage_delta > 0:
|
||||
reasons.append(f"test additions (+{test_coverage_delta} test files)")
|
||||
tscore = -min(2.0, test_coverage_delta / 2.0)
|
||||
elif test_coverage_delta < 0:
|
||||
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
|
||||
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
|
||||
else:
|
||||
reasons.append("test coverage change not assessed")
|
||||
|
||||
# Weighted sum, scaled by 3 to use full 1-10 range
|
||||
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
|
||||
scaled_bonus = bonus * 3.0
|
||||
score = 1.0 + scaled_bonus
|
||||
|
||||
final_score = max(1, min(10, int(round(score))))
|
||||
est_minutes = TIME_PER_POINT.get(final_score, 30)
|
||||
|
||||
return final_score, est_minutes, reasons
|
||||
|
||||
|
||||
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
|
||||
pr_num = pr_data["number"]
|
||||
title = pr_data.get("title", "")
|
||||
files = client.get_pr_files(org, repo, pr_num)
|
||||
|
||||
additions = sum(f.get("additions", 0) for f in files)
|
||||
deletions = sum(f.get("deletions", 0) for f in files)
|
||||
filenames = [f.get("filename", "") for f in files]
|
||||
|
||||
has_deps = any(is_dependency_file(f) for f in filenames)
|
||||
|
||||
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
|
||||
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
|
||||
test_delta = test_added - test_removed if (test_added or test_removed) else None
|
||||
|
||||
score, est_min, reasons = score_pr(
|
||||
files_changed=len(files),
|
||||
additions=additions,
|
||||
deletions=deletions,
|
||||
has_dependency_changes=has_deps,
|
||||
test_coverage_delta=test_delta
|
||||
)
|
||||
|
||||
return PRComplexity(
|
||||
pr_number=pr_num,
|
||||
title=title,
|
||||
files_changed=len(files),
|
||||
additions=additions,
|
||||
deletions=deletions,
|
||||
has_dependency_changes=has_deps,
|
||||
test_coverage_delta=test_delta,
|
||||
score=score,
|
||||
estimated_minutes=est_min,
|
||||
reasons=reasons
|
||||
)
|
||||
|
||||
|
||||
def build_comment(complexity: PRComplexity) -> str:
|
||||
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
|
||||
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
|
||||
test_note = ""
|
||||
if complexity.test_coverage_delta is not None:
|
||||
if complexity.test_coverage_delta > 0:
|
||||
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
|
||||
elif complexity.test_coverage_delta < 0:
|
||||
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
|
||||
|
||||
comment = f"## 📊 PR Complexity Analysis\n\n"
|
||||
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
|
||||
comment += f"| Metric | Value |\n|--------|-------|\n"
|
||||
comment += f"| Changes | {change_desc} |\n"
|
||||
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
|
||||
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
|
||||
comment += f"### Scoring rationale:"
|
||||
for r in complexity.reasons:
|
||||
comment += f"\n- {r}"
|
||||
if deps_note:
|
||||
comment += deps_note
|
||||
if test_note:
|
||||
comment += test_note
|
||||
comment += f"\n\n---\n"
|
||||
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
|
||||
return comment
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
|
||||
parser.add_argument("--org", default="Timmy_Foundation")
|
||||
parser.add_argument("--repo", default="compounding-intelligence")
|
||||
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
parser.add_argument("--apply", action="store_true")
|
||||
parser.add_argument("--output", default="metrics/pr_complexity.json")
|
||||
args = parser.parse_args()
|
||||
|
||||
token_path = args.token
|
||||
if os.path.exists(token_path):
|
||||
with open(token_path) as f:
|
||||
token = f.read().strip()
|
||||
else:
|
||||
token = args.token
|
||||
|
||||
if not token:
|
||||
print("ERROR: No Gitea token provided", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
client = GiteaClient(token)
|
||||
|
||||
print(f"Fetching open PRs for {args.org}/{args.repo}...")
|
||||
prs = client.get_open_prs(args.org, args.repo)
|
||||
if not prs:
|
||||
print("No open PRs found.")
|
||||
sys.exit(0)
|
||||
|
||||
print(f"Found {len(prs)} open PR(s). Analyzing...")
|
||||
|
||||
results = []
|
||||
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for pr in prs:
|
||||
pr_num = pr["number"]
|
||||
title = pr.get("title", "")
|
||||
print(f" Analyzing PR #{pr_num}: {title[:60]}")
|
||||
|
||||
try:
|
||||
complexity = analyze_pr(client, args.org, args.repo, pr)
|
||||
results.append(complexity.to_dict())
|
||||
|
||||
comment = build_comment(complexity)
|
||||
|
||||
if args.dry_run:
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
|
||||
elif args.apply:
|
||||
success = client.post_comment(args.org, args.repo, pr_num, comment)
|
||||
status = "[commented]" if success else "[FAILED]"
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
|
||||
else:
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
|
||||
|
||||
except Exception as e:
|
||||
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
|
||||
|
||||
with open(args.output, "w") as f:
|
||||
json.dump({
|
||||
"org": args.org,
|
||||
"repo": args.repo,
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"pr_count": len(results),
|
||||
"results": results
|
||||
}, f, indent=2)
|
||||
|
||||
if results:
|
||||
scores = [r["score"] for r in results]
|
||||
print(f"\nResults saved to {args.output}")
|
||||
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
|
||||
else:
|
||||
print("\nNo results to save.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -22,114 +22,95 @@ import sys
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from session_reader import extract_conversation, read_session
|
||||
|
||||
|
||||
def compute_hash(text: str) -> str:
|
||||
"""Content hash for deduplication."""
|
||||
return hashlib.sha256(text.encode()).hexdigest()[:16]
|
||||
|
||||
|
||||
def extract_pairs_from_session(session_data: dict, min_ratio: float = 1.5,
|
||||
def extract_pairs_from_conversation(conversation: list, session_id: str, model: str,
|
||||
min_ratio: float = 1.5,
|
||||
min_response_words: int = 20) -> list:
|
||||
"""Extract terse→rich pairs from a single session object."""
|
||||
"""Extract terse→rich pairs from a normalized conversation."""
|
||||
pairs = []
|
||||
conversations = session_data.get("conversations", [])
|
||||
session_id = session_data.get("id", "unknown")
|
||||
model = session_data.get("model", "unknown")
|
||||
|
||||
seen_hashes = set()
|
||||
|
||||
for i, msg in enumerate(conversations):
|
||||
# Look for assistant/gpt responses
|
||||
if msg.get("from") not in ("gpt", "assistant"):
|
||||
for i, msg in enumerate(conversation):
|
||||
# Look for assistant responses
|
||||
if msg.get('role') != 'assistant':
|
||||
continue
|
||||
|
||||
response_text = msg.get("value", "")
|
||||
response_text = msg.get('content', '')
|
||||
if not response_text or len(response_text.split()) < min_response_words:
|
||||
continue
|
||||
|
||||
# Find the preceding human message
|
||||
# Find the preceding user message
|
||||
prompt_text = ""
|
||||
for j in range(i - 1, -1, -1):
|
||||
if conversations[j].get("from") == "human":
|
||||
prompt_text = conversations[j].get("value", "")
|
||||
if conversation[j].get('role') == 'user':
|
||||
prompt_text = conversation[j].get('content', '')
|
||||
break
|
||||
|
||||
if not prompt_text:
|
||||
continue
|
||||
|
||||
# Filter: skip tool results, system messages embedded as human
|
||||
if prompt_text.startswith("{") and "output" in prompt_text[:100]:
|
||||
continue # likely a tool result
|
||||
if prompt_text.startswith("# SOUL.md") or prompt_text.startswith("You are"):
|
||||
continue # system prompt leak
|
||||
if prompt_text.startswith('{') and 'output' in prompt_text[:100]:
|
||||
continue
|
||||
if prompt_text.startswith('# SOUL.md') or prompt_text.startswith('You are'):
|
||||
continue
|
||||
|
||||
# Quality filters
|
||||
prompt_words = len(prompt_text.split())
|
||||
response_words = len(response_text.split())
|
||||
|
||||
# Must have meaningful length ratio
|
||||
if prompt_words == 0 or response_words == 0:
|
||||
continue
|
||||
ratio = response_words / prompt_words
|
||||
if ratio < min_ratio:
|
||||
continue
|
||||
|
||||
# Skip responses that are mostly code
|
||||
code_blocks = response_text.count("```")
|
||||
if code_blocks >= 4 and len(response_text.replace("```", "").strip()) < 50:
|
||||
code_blocks = response_text.count('```')
|
||||
if code_blocks >= 4 and len(response_text.replace('```', '').strip()) < 50:
|
||||
continue
|
||||
|
||||
# Skip responses with tool call artifacts
|
||||
if "tool_call" in response_text[:100] or "function_call" in response_text[:100]:
|
||||
if 'tool_call' in response_text[:100] or 'function_call' in response_text[:100]:
|
||||
continue
|
||||
|
||||
# Deduplicate by content hash
|
||||
content_hash = compute_hash(prompt_text + response_text[:200])
|
||||
if content_hash in seen_hashes:
|
||||
continue
|
||||
seen_hashes.add(content_hash)
|
||||
|
||||
# Clean up response: remove markdown headers if too many
|
||||
clean_response = response_text
|
||||
|
||||
pairs.append({
|
||||
"terse": prompt_text.strip(),
|
||||
"rich": clean_response.strip(),
|
||||
"source": session_id,
|
||||
"model": model,
|
||||
"prompt_words": prompt_words,
|
||||
"response_words": response_words,
|
||||
"ratio": round(ratio, 2),
|
||||
'terse': prompt_text.strip(),
|
||||
'rich': clean_response.strip(),
|
||||
'source': session_id,
|
||||
'model': model,
|
||||
'prompt_words': prompt_words,
|
||||
'response_words': response_words,
|
||||
'ratio': round(ratio, 2),
|
||||
})
|
||||
|
||||
return pairs
|
||||
|
||||
|
||||
def extract_from_jsonl_file(filepath: str, **kwargs) -> list:
|
||||
"""Extract pairs from a session JSONL file."""
|
||||
pairs = []
|
||||
path = Path(filepath)
|
||||
|
||||
if not path.exists():
|
||||
print(f"Warning: {filepath} not found", file=sys.stderr)
|
||||
return pairs
|
||||
|
||||
content = path.read_text()
|
||||
lines = content.strip().split("\n")
|
||||
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
try:
|
||||
session = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
|
||||
session_pairs = extract_pairs_from_session(session, **kwargs)
|
||||
pairs.extend(session_pairs)
|
||||
|
||||
return pairs
|
||||
def extract_from_jsonl_file(path: str, **kwargs) -> list:
|
||||
"""Read a session file and extract training pairs using normalized conversation."""
|
||||
session_messages = read_session(path)
|
||||
if not session_messages:
|
||||
return []
|
||||
conversation = extract_conversation(session_messages)
|
||||
# Derive session_id and model from first real message metadata
|
||||
first_msg = next((m for m in session_messages if m.get('role') or m.get('from')), {})
|
||||
session_id = first_msg.get('meta_session_id', Path(path).name)
|
||||
model = first_msg.get('model', 'unknown')
|
||||
return extract_pairs_from_conversation(conversation, session_id, model, **kwargs)
|
||||
|
||||
|
||||
def deduplicate_pairs(pairs: list) -> list:
|
||||
|
||||
170
scripts/test_pr_complexity_scorer.py
Normal file
170
scripts/test_pr_complexity_scorer.py
Normal file
@@ -0,0 +1,170 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for PR Complexity Scorer — unit tests for the scoring logic.
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
|
||||
from pr_complexity_scorer import (
|
||||
score_pr,
|
||||
is_dependency_file,
|
||||
is_test_file,
|
||||
TIME_PER_POINT,
|
||||
SMALL_FILES,
|
||||
MEDIUM_FILES,
|
||||
LARGE_FILES,
|
||||
SMALL_LINES,
|
||||
MEDIUM_LINES,
|
||||
LARGE_LINES,
|
||||
)
|
||||
|
||||
PASS = 0
|
||||
FAIL = 0
|
||||
|
||||
def test(name):
|
||||
def decorator(fn):
|
||||
global PASS, FAIL
|
||||
try:
|
||||
fn()
|
||||
PASS += 1
|
||||
print(f" [PASS] {name}")
|
||||
except AssertionError as e:
|
||||
FAIL += 1
|
||||
print(f" [FAIL] {name}: {e}")
|
||||
except Exception as e:
|
||||
FAIL += 1
|
||||
print(f" [FAIL] {name}: Unexpected error: {e}")
|
||||
return decorator
|
||||
|
||||
def assert_eq(a, b, msg=""):
|
||||
if a != b:
|
||||
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
|
||||
|
||||
def assert_true(v, msg=""):
|
||||
if not v:
|
||||
raise AssertionError(msg or "Expected True")
|
||||
|
||||
def assert_false(v, msg=""):
|
||||
if v:
|
||||
raise AssertionError(msg or "Expected False")
|
||||
|
||||
|
||||
print("=== PR Complexity Scorer Tests ===\n")
|
||||
|
||||
print("-- File Classification --")
|
||||
|
||||
@test("dependency file detection — requirements.txt")
|
||||
def _():
|
||||
assert_true(is_dependency_file("requirements.txt"))
|
||||
assert_true(is_dependency_file("src/requirements.txt"))
|
||||
assert_false(is_dependency_file("requirements_test.txt"))
|
||||
|
||||
@test("dependency file detection — pyproject.toml")
|
||||
def _():
|
||||
assert_true(is_dependency_file("pyproject.toml"))
|
||||
assert_false(is_dependency_file("myproject.py"))
|
||||
|
||||
@test("test file detection — pytest style")
|
||||
def _():
|
||||
assert_true(is_test_file("tests/test_api.py"))
|
||||
assert_true(is_test_file("test_module.py"))
|
||||
assert_true(is_test_file("src/module_test.py"))
|
||||
|
||||
@test("test file detection — other frameworks")
|
||||
def _():
|
||||
assert_true(is_test_file("spec/feature_spec.rb"))
|
||||
assert_true(is_test_file("__tests__/component.test.js"))
|
||||
assert_false(is_test_file("testfixtures/helper.py"))
|
||||
|
||||
|
||||
print("\n-- Scoring Logic --")
|
||||
|
||||
@test("small PR gets low score (1-3)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=3,
|
||||
additions=50,
|
||||
deletions=10,
|
||||
has_dependency_changes=False,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
|
||||
assert_true(minutes < 20)
|
||||
|
||||
@test("medium PR gets medium score (4-6)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=15,
|
||||
additions=400,
|
||||
deletions=100,
|
||||
has_dependency_changes=False,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
|
||||
assert_true(20 <= minutes <= 45)
|
||||
|
||||
@test("large PR gets high score (7-9)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=60,
|
||||
additions=3000,
|
||||
deletions=1500,
|
||||
has_dependency_changes=True,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
|
||||
assert_true(minutes >= 45)
|
||||
|
||||
@test("dependency changes boost score")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=10, additions=200, deletions=50,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
dep_score, _, _ = score_pr(
|
||||
files_changed=10, additions=200, deletions=50,
|
||||
has_dependency_changes=True, test_coverage_delta=None
|
||||
)
|
||||
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
|
||||
|
||||
@test("adding tests lowers complexity")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
better_score, _, _ = score_pr(
|
||||
files_changed=8, additions=180, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=3
|
||||
)
|
||||
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
|
||||
|
||||
@test("removing tests increases complexity")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
worse_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=-2
|
||||
)
|
||||
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
|
||||
|
||||
@test("score bounded 1-10")
|
||||
def _():
|
||||
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
|
||||
score, _, _ = score_pr(files, adds, dels, False, None)
|
||||
assert_true(1 <= score <= 10, f"Score {score} out of range")
|
||||
|
||||
@test("estimated minutes exist for all scores")
|
||||
def _():
|
||||
for s in range(1, 11):
|
||||
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
|
||||
|
||||
|
||||
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
|
||||
sys.exit(0 if FAIL == 0 else 1)
|
||||
@@ -1,72 +0,0 @@
|
||||
# Conference Talk Knowledge Extraction Prompt
|
||||
|
||||
## System Prompt
|
||||
|
||||
You are a knowledge extraction engine specialized in conference talks. You read talk transcripts and output ONLY structured JSON. You extract factual insights, patterns, tool discoveries, and warnings that are durable and actionable for the Timmy Foundation fleet.
|
||||
|
||||
## Prompt
|
||||
|
||||
```
|
||||
TASK: Extract durable knowledge from this conference talk transcript.
|
||||
|
||||
RULES:
|
||||
1. Extract ONLY information explicitly stated or strongly implied in the transcript.
|
||||
2. Do NOT hallucinate, infer unsupported details, or invent quotes.
|
||||
3. Every fact must be grounded in something the speaker actually said.
|
||||
4. Focus on **durable, reusable** knowledge — not specific project details that won't apply elsewhere.
|
||||
5. Prioritize insights that improve: workflows, tool usage, system design, governance, or operational reliability.
|
||||
|
||||
CATEGORIES (assign exactly one per item):
|
||||
- fact: Concrete, verifiable takeaway (technical detail, config, workflow)
|
||||
- pitfall: Mistake, trap, or cost of wrong approach the speaker warned about
|
||||
- pattern: Successful approach, sequence, or template worth reusing
|
||||
- tool-quirk: Unexpected behavior, gotcha, or setup detail for a specific tool/platform
|
||||
- question: Something raised but not fully answered — worth investigating further
|
||||
|
||||
CONFIDENCE:
|
||||
- 0.9–1.0: Explicitly stated by speaker with clear reasoning/evidence
|
||||
- 0.7–0.8: Clearly implied by multiple statements, speaker's expertise
|
||||
- 0.5–0.6: Suggested or hinted, but not directly confirmed
|
||||
- 0.3–0.4: Interpretive, speculative, or single-data-point observation
|
||||
|
||||
TARGET DOMAIN:
|
||||
- If talk is about a specific repo (e.g. hermes-agent, the-nexus), set `domain` to that repo name.
|
||||
- If talk is about general principles, fleet processes, or multiple repos, set `domain` to "global".
|
||||
- If talk is about an agent type (mimo, groq, claude), set `domain` to the agent name.
|
||||
- If talk is about the compounding-intelligence system itself, set `domain` to "compounding-intelligence".
|
||||
|
||||
OUTPUT FORMAT (valid JSON only, no markdown, no explanation):
|
||||
|
||||
{
|
||||
"knowledge": [
|
||||
{
|
||||
"fact": "One specific, actionable sentence of knowledge",
|
||||
"category": "fact|pitfall|pattern|tool-quirk|question",
|
||||
"domain": "global|{repo}|{agent}|compounding-intelligence",
|
||||
"confidence": 0.0-1.0,
|
||||
"tags": ["relevant", "keywords"],
|
||||
"evidence": "Brief paraphrase or quote from the transcript that supports this"
|
||||
}
|
||||
],
|
||||
"meta": {
|
||||
"talk_title": "Title of the talk (if known)",
|
||||
"speaker": "Speaker name(s)",
|
||||
"conference": "Conference name",
|
||||
"talk_url": "URL to talk/video (if available)",
|
||||
"knowledge_count": 0,
|
||||
"extraction_date": "2026-04-26"
|
||||
}
|
||||
}
|
||||
|
||||
TRANSCRIPT:
|
||||
{{transcript}}
|
||||
```
|
||||
|
||||
## Design Notes
|
||||
|
||||
- Keep `fact` field to **one clear sentence**. Avoid run-ons.
|
||||
- `evidence` should be a 1–2 sentence paraphrase, not verbatim paragraph.
|
||||
- `tags` should include: tool names, repo names, agent types, concepts mentioned
|
||||
- Focus on what the fleet can **reuse tomorrow**, not ephemeral project context
|
||||
- If the talk is high-level vision with no concrete details, that's a `question` or low-confidence `fact`
|
||||
|
||||
118
tests/test_session_pair_harvester.py
Normal file
118
tests/test_session_pair_harvester.py
Normal file
@@ -0,0 +1,118 @@
|
||||
"""
|
||||
Tests for session_pair_harvester — training pair extraction from sessions.
|
||||
"""
|
||||
|
||||
import json
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
|
||||
from session_pair_harvester import (
|
||||
extract_pairs_from_conversation,
|
||||
extract_from_jsonl_file,
|
||||
deduplicate_pairs,
|
||||
compute_hash,
|
||||
)
|
||||
|
||||
|
||||
class TestSessionPairHarvester(unittest.TestCase):
|
||||
def test_compute_hash_consistent(self):
|
||||
h1 = compute_hash("hello world")
|
||||
h2 = compute_hash("hello world")
|
||||
self.assertEqual(h1, h2)
|
||||
self.assertEqual(len(h1), 16)
|
||||
|
||||
def test_extract_simple_qa_pair(self):
|
||||
"""A simple user→assistant exchange produces one pair."""
|
||||
conversation = [
|
||||
{"role": "user", "content": "What is the capital of France?"},
|
||||
{"role": "assistant", "content": "The capital of France is Paris. It is a major European city renowned for its art, fashion, gastronomy, cultural heritage, and historical significance. The city attracts millions of tourists annually."},
|
||||
]
|
||||
pairs = extract_pairs_from_conversation(conversation, "test_session", "test-model")
|
||||
self.assertEqual(len(pairs), 1)
|
||||
self.assertEqual(pairs[0]["terse"], "What is the capital of France?")
|
||||
self.assertIn("Paris", pairs[0]["rich"])
|
||||
self.assertEqual(pairs[0]["source"], "test_session")
|
||||
|
||||
def test_min_ratio_filter(self):
|
||||
"""Very short responses are filtered out."""
|
||||
conversation = [
|
||||
{"role": "user", "content": "Yes"},
|
||||
{"role": "assistant", "content": "No."},
|
||||
]
|
||||
# Default min_ratio = 1.5, min_words = 20 for response
|
||||
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=3)
|
||||
self.assertEqual(len(pairs), 0)
|
||||
|
||||
def test_min_words_filter(self):
|
||||
"""Assistant responses below min word count are skipped."""
|
||||
conversation = [
|
||||
{"role": "user", "content": "Explain the project architecture in detail"},
|
||||
{"role": "assistant", "content": "OK."},
|
||||
]
|
||||
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=5)
|
||||
self.assertEqual(len(pairs), 0)
|
||||
|
||||
def test_skip_non_assistant_messages(self):
|
||||
"""System and tool messages are ignored."""
|
||||
conversation = [
|
||||
{"role": "system", "content": "You are a helpful assistant."},
|
||||
{"role": "user", "content": "Hello"},
|
||||
{"role": "assistant", "content": "Hi there! How can I help you today?"},
|
||||
]
|
||||
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_response_words=3)
|
||||
self.assertEqual(len(pairs), 1)
|
||||
self.assertEqual(pairs[0]["terse"], "Hello")
|
||||
|
||||
def test_multiple_pairs_from_one_session(self):
|
||||
"""A conversation with several Q&A turns yields multiple pairs."""
|
||||
conversation = [
|
||||
{"role": "user", "content": "First question?"},
|
||||
{"role": "assistant", "content": "Here is a detailed and comprehensive answer that thoroughly explores multiple aspects of the subject. It provides background context and practical implications for the reader."},
|
||||
{"role": "user", "content": "Second?"},
|
||||
{"role": "assistant", "content": "Another comprehensive response with detailed examples. This includes practical code blocks and thorough explanations to ensure deep understanding of the topic at hand."},
|
||||
]
|
||||
pairs = extract_pairs_from_conversation(conversation, "s", "m", min_ratio=1.0)
|
||||
self.assertEqual(len(pairs), 2)
|
||||
|
||||
def test_deduplication_removes_duplicates(self):
|
||||
"""Identical pairs across sessions are deduplicated."""
|
||||
pairs = [
|
||||
{"terse": "q1", "rich": "a1", "source": "s1", "model": "m"},
|
||||
{"terse": "q1", "rich": "a1", "source": "s2", "model": "m"},
|
||||
{"terse": "q2", "rich": "a2", "source": "s1", "model": "m"},
|
||||
]
|
||||
unique = deduplicate_pairs(pairs)
|
||||
self.assertEqual(len(unique), 2)
|
||||
sources = {p["source"] for p in unique}
|
||||
# First unique pair can be from either s1 or s2
|
||||
self.assertIn("s1", sources)
|
||||
|
||||
def test_integration_with_test_sessions(self):
|
||||
"""Harvester finds pairs in real test session files."""
|
||||
repo_root = Path(__file__).parent.parent
|
||||
test_sessions_dir = repo_root / "test_sessions"
|
||||
if not test_sessions_dir.exists():
|
||||
self.skipTest("test_sessions not found")
|
||||
|
||||
pairs = []
|
||||
for jsonl_file in sorted(test_sessions_dir.glob("*.jsonl")):
|
||||
pairs.extend(extract_from_jsonl_file(str(jsonl_file)))
|
||||
|
||||
self.assertGreater(len(pairs), 0, "Should extract at least one pair from test_sessions")
|
||||
for p in pairs:
|
||||
self.assertIn("terse", p)
|
||||
self.assertIn("rich", p)
|
||||
self.assertIn("source", p)
|
||||
self.assertIn("model", p)
|
||||
# Verify content exists
|
||||
self.assertGreater(len(p["terse"]), 0)
|
||||
self.assertGreater(len(p["rich"]), 0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user