Compare commits
2 Commits
step35/148
...
step35/199
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
86eb1c9a50 | ||
|
|
4b5a675355 |
255
scripts/knowledge_to_training_pairs.py
Normal file
255
scripts/knowledge_to_training_pairs.py
Normal file
@@ -0,0 +1,255 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
knowledge_to_training_pairs.py — Convert quality-gated knowledge entries into training pairs.
|
||||
|
||||
Reads knowledge/index.json (or a custom JSONL of entries), applies quality filters,
|
||||
and emits terse→rich training pairs in JSONL format for model fine-tuning.
|
||||
|
||||
Usage:
|
||||
python3 scripts/knowledge_to_training_pairs.py \
|
||||
--input knowledge/index.json \
|
||||
--output training_pairs.jsonl \
|
||||
--min-confidence 0.7 \
|
||||
--model-filter claude-sonnet,gpt-4 \
|
||||
--after 2026-01-01
|
||||
|
||||
Input entry format (from index.json facts):
|
||||
{
|
||||
"id": "hermes-agent:pitfall:001",
|
||||
"fact": "deploy-crons.py leaves jobs in mixed model format",
|
||||
"category": "pitfall",
|
||||
"domain": "hermes-agent",
|
||||
"confidence": 0.95,
|
||||
...
|
||||
}
|
||||
|
||||
Output training pair format:
|
||||
{
|
||||
"terse": "How do I handle deploy-crons.py mixed model format?",
|
||||
"rich": "deploy-crons.py leaves jobs in mixed model format.",
|
||||
"domain": "hermes-agent",
|
||||
"source_confidence": 0.95,
|
||||
"source_model": "unknown"
|
||||
}
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
def fact_to_terse(fact: str, category: str, domain: str) -> str:
|
||||
"""
|
||||
Derive a short user query from a knowledge fact.
|
||||
|
||||
Strategy:
|
||||
- Pitfalls → "How do I avoid/handle/fix <fact excerpt>?"
|
||||
- Patterns → "What's the recommended way to <pattern core>?"
|
||||
- Tool quirks → "How does <tool> behave in <context>?"
|
||||
- Facts → "What should I know about <fact excerpt>?"
|
||||
- Questions → "What is the answer to: <fact>?"
|
||||
"""
|
||||
fact_lower = fact.lower()
|
||||
# Extract a concise excerpt (first sentence or 80 chars)
|
||||
excerpt = fact.split('. ')[0] if '. ' in fact else fact[:80]
|
||||
|
||||
if category == "pitfall":
|
||||
verbs = ["avoid", "handle", "fix", "prevent"]
|
||||
# pick verb based on fact wording
|
||||
if "trigger" in fact_lower or "cause" in fact_lower:
|
||||
verb = "avoid"
|
||||
elif "broken" in fact_lower or "fails" in fact_lower:
|
||||
verb = "fix"
|
||||
else:
|
||||
verb = "handle"
|
||||
return f"How do I {verb} {excerpt.rstrip('.')}?"
|
||||
elif category == "pattern":
|
||||
return f"What's the recommended way to {excerpt.rstrip('.')}?"
|
||||
elif category == "tool-quirk":
|
||||
# Try to extract tool name
|
||||
tool = fact.split()[0] if fact.split() else domain
|
||||
return f"How does {tool} behave in this context?"
|
||||
elif category == "question":
|
||||
return f"What is the answer to: {excerpt}?"
|
||||
else: # fact or unknown
|
||||
return f"What should I know about {excerpt.rstrip('.')}?"
|
||||
|
||||
|
||||
def parse_date(date_str: Optional[str]) -> Optional[datetime]:
|
||||
"""Parse ISO date string to datetime, or return None."""
|
||||
if not date_str:
|
||||
return None
|
||||
try:
|
||||
return datetime.fromisoformat(date_str.replace("Z", "+00:00"))
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
|
||||
def load_knowledge_index(path: str) -> list[dict]:
|
||||
"""Load knowledge facts from index.json (or plain JSONL of entries)."""
|
||||
p = Path(path)
|
||||
if not p.exists():
|
||||
print(f"ERROR: Knowledge input not found: {path}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
with open(p) as f:
|
||||
data = json.load(f)
|
||||
|
||||
# index.json format: {"facts": [...], ...}
|
||||
if isinstance(data, dict) and "facts" in data:
|
||||
return data["facts"]
|
||||
# JSONL format: one entry per line
|
||||
if isinstance(data, list):
|
||||
return data
|
||||
# Plain file with JSON array
|
||||
print(f"ERROR: Unrecognized input format in {path}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def filter_entries(entries: list[dict],
|
||||
min_confidence: float = 0.0,
|
||||
model_filter: Optional[list[str]] = None,
|
||||
after: Optional[datetime] = None,
|
||||
before: Optional[datetime] = None) -> list[dict]:
|
||||
"""Apply quality and provenance filters."""
|
||||
filtered = []
|
||||
for entry in entries:
|
||||
# Confidence filter (entry confidence)
|
||||
conf = entry.get("confidence", 0.0)
|
||||
if conf < min_confidence:
|
||||
continue
|
||||
|
||||
# Model filter: if specified, entry's model must be in the list
|
||||
if model_filter:
|
||||
entry_model = entry.get("model", entry.get("provenance", {}).get("model", "unknown"))
|
||||
if entry_model not in model_filter:
|
||||
continue
|
||||
|
||||
# Date filter: use last_confirmed or first_seen or harvested_at
|
||||
entry_date = None
|
||||
for field in ("last_confirmed", "first_seen", "harvested_at"):
|
||||
if field in entry:
|
||||
entry_date = parse_date(entry[field])
|
||||
if entry_date:
|
||||
break
|
||||
if after and entry_date and entry_date < after:
|
||||
continue
|
||||
if before and entry_date and entry_date > before:
|
||||
continue
|
||||
|
||||
filtered.append(entry)
|
||||
return filtered
|
||||
|
||||
|
||||
def entry_to_pair(entry: dict) -> dict:
|
||||
"""Convert a knowledge entry into a training pair."""
|
||||
fact = entry.get("fact", "").strip()
|
||||
if not fact:
|
||||
return None
|
||||
|
||||
category = entry.get("category", "fact")
|
||||
domain = entry.get("domain", "global")
|
||||
|
||||
terse = fact_to_terse(fact, category, domain)
|
||||
rich = fact
|
||||
source_confidence = round(entry.get("confidence", 0.0), 4)
|
||||
source_model = entry.get("model", entry.get("provenance", {}).get("model", "unknown"))
|
||||
|
||||
return {
|
||||
"terse": terse,
|
||||
"rich": rich,
|
||||
"domain": domain,
|
||||
"source_confidence": source_confidence,
|
||||
"source_model": source_model,
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Knowledge entries → training pairs")
|
||||
parser.add_argument("--input", "-i", default="knowledge/index.json",
|
||||
help="Input knowledge index or JSONL (default: knowledge/index.json)")
|
||||
parser.add_argument("--output", "-o", default="training_pairs.jsonl",
|
||||
help="Output JSONL file")
|
||||
parser.add_argument("--min-confidence", type=float, default=0.5,
|
||||
help="Minimum entry confidence to include (0.0-1.0, default: 0.5)")
|
||||
parser.add_argument("--model-filter",
|
||||
help="Comma-separated list of source models to include")
|
||||
parser.add_argument("--after",
|
||||
help="Include entries last_confirmed/first_seen on or after this date (YYYY-MM-DD)")
|
||||
parser.add_argument("--before",
|
||||
help="Include entries last_confirmed/first_seen on or before this date (YYYY-MM-DD)")
|
||||
parser.add_argument("--dry-run", action="store_true",
|
||||
help="Print sample pairs and stats without writing")
|
||||
args = parser.parse_args()
|
||||
|
||||
# Load
|
||||
entries = load_knowledge_index(args.input)
|
||||
print(f"Loaded {len(entries)} entries from {args.input}", file=sys.stderr)
|
||||
|
||||
# Parse filters
|
||||
model_list = args.model_filter.split(",") if args.model_filter else None
|
||||
after_dt = parse_date(args.after) if args.after else None
|
||||
before_dt = parse_date(args.before) if args.before else None
|
||||
|
||||
# Filter
|
||||
kept = filter_entries(
|
||||
entries,
|
||||
min_confidence=args.min_confidence,
|
||||
model_filter=model_list,
|
||||
after=after_dt,
|
||||
before=before_dt,
|
||||
)
|
||||
print(f"After filtering: {len(kept)} / {len(entries)} entries", file=sys.stderr)
|
||||
|
||||
# Convert
|
||||
pairs = []
|
||||
for entry in kept:
|
||||
pair = entry_to_pair(entry)
|
||||
if pair:
|
||||
pairs.append(pair)
|
||||
|
||||
# Stats
|
||||
if pairs:
|
||||
avg_conf = sum(p["source_confidence"] for p in pairs) / len(pairs)
|
||||
domains = {}
|
||||
models = {}
|
||||
for p in pairs:
|
||||
domains[p["domain"]] = domains.get(p["domain"], 0) + 1
|
||||
models[p["source_model"]] = models.get(p["source_model"], 0) + 1
|
||||
else:
|
||||
avg_conf = 0.0
|
||||
domains = {}
|
||||
models = {}
|
||||
|
||||
stats = {
|
||||
"input_entries": len(entries),
|
||||
"after_filter": len(kept),
|
||||
"pairs_generated": len(pairs),
|
||||
"avg_confidence": round(avg_conf, 4),
|
||||
"domains": domains,
|
||||
"source_models": models,
|
||||
}
|
||||
print(json.dumps(stats, indent=2), file=sys.stderr)
|
||||
|
||||
if args.dry_run:
|
||||
print("\nSample pairs:", file=sys.stderr)
|
||||
for p in pairs[:3]:
|
||||
print(json.dumps(p, ensure_ascii=False), file=sys.stderr)
|
||||
return
|
||||
|
||||
# Write JSONL
|
||||
out_path = Path(args.output)
|
||||
out_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(out_path, "w", encoding="utf-8") as f:
|
||||
for pair in pairs:
|
||||
f.write(json.dumps(pair, ensure_ascii=False) + "\n")
|
||||
|
||||
print(f"\nWrote {len(pairs)} training pairs to {out_path}", file=sys.stderr)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
351
scripts/pr_complexity_scorer.py
Normal file
351
scripts/pr_complexity_scorer.py
Normal file
@@ -0,0 +1,351 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
PR Complexity Scorer - Estimate review effort for PRs.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from dataclasses import dataclass, asdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
GITEA_BASE = "https://forge.alexanderwhitestone.com/api/v1"
|
||||
|
||||
DEPENDENCY_FILES = {
|
||||
"requirements.txt", "pyproject.toml", "setup.py", "setup.cfg",
|
||||
"Pipfile", "poetry.lock", "package.json", "yarn.lock", "Gemfile",
|
||||
"go.mod", "Cargo.toml", "pom.xml", "build.gradle"
|
||||
}
|
||||
|
||||
TEST_PATTERNS = [
|
||||
r"tests?/.*\.py$", r".*_test\.py$", r"test_.*\.py$",
|
||||
r"spec/.*\.rb$", r".*_spec\.rb$",
|
||||
r"__tests__/", r".*\.test\.(js|ts|jsx|tsx)$"
|
||||
]
|
||||
|
||||
WEIGHT_FILES = 0.25
|
||||
WEIGHT_LINES = 0.25
|
||||
WEIGHT_DEPS = 0.30
|
||||
WEIGHT_TEST_COV = 0.20
|
||||
|
||||
SMALL_FILES = 5
|
||||
MEDIUM_FILES = 20
|
||||
LARGE_FILES = 50
|
||||
|
||||
SMALL_LINES = 100
|
||||
MEDIUM_LINES = 500
|
||||
LARGE_LINES = 2000
|
||||
|
||||
TIME_PER_POINT = {1: 5, 2: 10, 3: 15, 4: 20, 5: 25, 6: 30, 7: 45, 8: 60, 9: 90, 10: 120}
|
||||
|
||||
|
||||
@dataclass
|
||||
class PRComplexity:
|
||||
pr_number: int
|
||||
title: str
|
||||
files_changed: int
|
||||
additions: int
|
||||
deletions: int
|
||||
has_dependency_changes: bool
|
||||
test_coverage_delta: Optional[int]
|
||||
score: int
|
||||
estimated_minutes: int
|
||||
reasons: List[str]
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return asdict(self)
|
||||
|
||||
|
||||
class GiteaClient:
|
||||
def __init__(self, token: str):
|
||||
self.token = token
|
||||
self.base_url = GITEA_BASE.rstrip("/")
|
||||
|
||||
def _request(self, path: str, params: Dict = None) -> Any:
|
||||
url = f"{self.base_url}{path}"
|
||||
if params:
|
||||
qs = "&".join(f"{k}={v}" for k, v in params.items() if v is not None)
|
||||
url += f"?{qs}"
|
||||
|
||||
req = urllib.request.Request(url)
|
||||
req.add_header("Authorization", f"token {self.token}")
|
||||
req.add_header("Content-Type", "application/json")
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
except urllib.error.HTTPError as e:
|
||||
print(f"API error {e.code}: {e.read().decode()[:200]}", file=sys.stderr)
|
||||
return None
|
||||
except urllib.error.URLError as e:
|
||||
print(f"Network error: {e}", file=sys.stderr)
|
||||
return None
|
||||
|
||||
def get_open_prs(self, org: str, repo: str) -> List[Dict]:
|
||||
prs = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = self._request(f"/repos/{org}/{repo}/pulls", {"limit": 50, "page": page, "state": "open"})
|
||||
if not batch:
|
||||
break
|
||||
prs.extend(batch)
|
||||
if len(batch) < 50:
|
||||
break
|
||||
page += 1
|
||||
return prs
|
||||
|
||||
def get_pr_files(self, org: str, repo: str, pr_number: int) -> List[Dict]:
|
||||
files = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = self._request(
|
||||
f"/repos/{org}/{repo}/pulls/{pr_number}/files",
|
||||
{"limit": 100, "page": page}
|
||||
)
|
||||
if not batch:
|
||||
break
|
||||
files.extend(batch)
|
||||
if len(batch) < 100:
|
||||
break
|
||||
page += 1
|
||||
return files
|
||||
|
||||
def post_comment(self, org: str, repo: str, pr_number: int, body: str) -> bool:
|
||||
data = json.dumps({"body": body}).encode("utf-8")
|
||||
req = urllib.request.Request(
|
||||
f"{self.base_url}/repos/{org}/{repo}/issues/{pr_number}/comments",
|
||||
data=data,
|
||||
method="POST",
|
||||
headers={"Authorization": f"token {self.token}", "Content-Type": "application/json"}
|
||||
)
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return resp.status in (200, 201)
|
||||
except urllib.error.HTTPError:
|
||||
return False
|
||||
|
||||
|
||||
def is_dependency_file(filename: str) -> bool:
|
||||
return any(filename.endswith(dep) for dep in DEPENDENCY_FILES)
|
||||
|
||||
|
||||
def is_test_file(filename: str) -> bool:
|
||||
return any(re.search(pattern, filename) for pattern in TEST_PATTERNS)
|
||||
|
||||
|
||||
def score_pr(
|
||||
files_changed: int,
|
||||
additions: int,
|
||||
deletions: int,
|
||||
has_dependency_changes: bool,
|
||||
test_coverage_delta: Optional[int] = None
|
||||
) -> tuple[int, int, List[str]]:
|
||||
score = 1.0
|
||||
reasons = []
|
||||
|
||||
# Files changed
|
||||
if files_changed <= SMALL_FILES:
|
||||
fscore = 1.0
|
||||
reasons.append("small number of files changed")
|
||||
elif files_changed <= MEDIUM_FILES:
|
||||
fscore = 2.0
|
||||
reasons.append("moderate number of files changed")
|
||||
elif files_changed <= LARGE_FILES:
|
||||
fscore = 2.5
|
||||
reasons.append("large number of files changed")
|
||||
else:
|
||||
fscore = 3.0
|
||||
reasons.append("very large PR spanning many files")
|
||||
|
||||
# Lines changed
|
||||
total_lines = additions + deletions
|
||||
if total_lines <= SMALL_LINES:
|
||||
lscore = 1.0
|
||||
reasons.append("small change size")
|
||||
elif total_lines <= MEDIUM_LINES:
|
||||
lscore = 2.0
|
||||
reasons.append("moderate change size")
|
||||
elif total_lines <= LARGE_LINES:
|
||||
lscore = 3.0
|
||||
reasons.append("large change size")
|
||||
else:
|
||||
lscore = 4.0
|
||||
reasons.append("very large change")
|
||||
|
||||
# Dependency changes
|
||||
if has_dependency_changes:
|
||||
dscore = 2.5
|
||||
reasons.append("dependency changes (architectural impact)")
|
||||
else:
|
||||
dscore = 0.0
|
||||
|
||||
# Test coverage delta
|
||||
tscore = 0.0
|
||||
if test_coverage_delta is not None:
|
||||
if test_coverage_delta > 0:
|
||||
reasons.append(f"test additions (+{test_coverage_delta} test files)")
|
||||
tscore = -min(2.0, test_coverage_delta / 2.0)
|
||||
elif test_coverage_delta < 0:
|
||||
reasons.append(f"test removals ({abs(test_coverage_delta)} test files)")
|
||||
tscore = min(2.0, abs(test_coverage_delta) * 0.5)
|
||||
else:
|
||||
reasons.append("test coverage change not assessed")
|
||||
|
||||
# Weighted sum, scaled by 3 to use full 1-10 range
|
||||
bonus = (fscore * WEIGHT_FILES) + (lscore * WEIGHT_LINES) + (dscore * WEIGHT_DEPS) + (tscore * WEIGHT_TEST_COV)
|
||||
scaled_bonus = bonus * 3.0
|
||||
score = 1.0 + scaled_bonus
|
||||
|
||||
final_score = max(1, min(10, int(round(score))))
|
||||
est_minutes = TIME_PER_POINT.get(final_score, 30)
|
||||
|
||||
return final_score, est_minutes, reasons
|
||||
|
||||
|
||||
def analyze_pr(client: GiteaClient, org: str, repo: str, pr_data: Dict) -> PRComplexity:
|
||||
pr_num = pr_data["number"]
|
||||
title = pr_data.get("title", "")
|
||||
files = client.get_pr_files(org, repo, pr_num)
|
||||
|
||||
additions = sum(f.get("additions", 0) for f in files)
|
||||
deletions = sum(f.get("deletions", 0) for f in files)
|
||||
filenames = [f.get("filename", "") for f in files]
|
||||
|
||||
has_deps = any(is_dependency_file(f) for f in filenames)
|
||||
|
||||
test_added = sum(1 for f in files if f.get("status") == "added" and is_test_file(f.get("filename", "")))
|
||||
test_removed = sum(1 for f in files if f.get("status") == "removed" and is_test_file(f.get("filename", "")))
|
||||
test_delta = test_added - test_removed if (test_added or test_removed) else None
|
||||
|
||||
score, est_min, reasons = score_pr(
|
||||
files_changed=len(files),
|
||||
additions=additions,
|
||||
deletions=deletions,
|
||||
has_dependency_changes=has_deps,
|
||||
test_coverage_delta=test_delta
|
||||
)
|
||||
|
||||
return PRComplexity(
|
||||
pr_number=pr_num,
|
||||
title=title,
|
||||
files_changed=len(files),
|
||||
additions=additions,
|
||||
deletions=deletions,
|
||||
has_dependency_changes=has_deps,
|
||||
test_coverage_delta=test_delta,
|
||||
score=score,
|
||||
estimated_minutes=est_min,
|
||||
reasons=reasons
|
||||
)
|
||||
|
||||
|
||||
def build_comment(complexity: PRComplexity) -> str:
|
||||
change_desc = f"{complexity.files_changed} files, +{complexity.additions}/-{complexity.deletions} lines"
|
||||
deps_note = "\n- :warning: Dependency changes detected — architectural review recommended" if complexity.has_dependency_changes else ""
|
||||
test_note = ""
|
||||
if complexity.test_coverage_delta is not None:
|
||||
if complexity.test_coverage_delta > 0:
|
||||
test_note = f"\n- :+1: {complexity.test_coverage_delta} test file(s) added"
|
||||
elif complexity.test_coverage_delta < 0:
|
||||
test_note = f"\n- :warning: {abs(complexity.test_coverage_delta)} test file(s) removed"
|
||||
|
||||
comment = f"## 📊 PR Complexity Analysis\n\n"
|
||||
comment += f"**PR #{complexity.pr_number}: {complexity.title}**\n\n"
|
||||
comment += f"| Metric | Value |\n|--------|-------|\n"
|
||||
comment += f"| Changes | {change_desc} |\n"
|
||||
comment += f"| Complexity Score | **{complexity.score}/10** |\n"
|
||||
comment += f"| Estimated Review Time | ~{complexity.estimated_minutes} minutes |\n\n"
|
||||
comment += f"### Scoring rationale:"
|
||||
for r in complexity.reasons:
|
||||
comment += f"\n- {r}"
|
||||
if deps_note:
|
||||
comment += deps_note
|
||||
if test_note:
|
||||
comment += test_note
|
||||
comment += f"\n\n---\n"
|
||||
comment += f"*Generated by PR Complexity Scorer — [issue #135](https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence/issues/135)*"
|
||||
return comment
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="PR Complexity Scorer")
|
||||
parser.add_argument("--org", default="Timmy_Foundation")
|
||||
parser.add_argument("--repo", default="compounding-intelligence")
|
||||
parser.add_argument("--token", default=os.environ.get("GITEA_TOKEN") or os.path.expanduser("~/.config/gitea/token"))
|
||||
parser.add_argument("--dry-run", action="store_true")
|
||||
parser.add_argument("--apply", action="store_true")
|
||||
parser.add_argument("--output", default="metrics/pr_complexity.json")
|
||||
args = parser.parse_args()
|
||||
|
||||
token_path = args.token
|
||||
if os.path.exists(token_path):
|
||||
with open(token_path) as f:
|
||||
token = f.read().strip()
|
||||
else:
|
||||
token = args.token
|
||||
|
||||
if not token:
|
||||
print("ERROR: No Gitea token provided", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
client = GiteaClient(token)
|
||||
|
||||
print(f"Fetching open PRs for {args.org}/{args.repo}...")
|
||||
prs = client.get_open_prs(args.org, args.repo)
|
||||
if not prs:
|
||||
print("No open PRs found.")
|
||||
sys.exit(0)
|
||||
|
||||
print(f"Found {len(prs)} open PR(s). Analyzing...")
|
||||
|
||||
results = []
|
||||
Path(args.output).parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for pr in prs:
|
||||
pr_num = pr["number"]
|
||||
title = pr.get("title", "")
|
||||
print(f" Analyzing PR #{pr_num}: {title[:60]}")
|
||||
|
||||
try:
|
||||
complexity = analyze_pr(client, args.org, args.repo, pr)
|
||||
results.append(complexity.to_dict())
|
||||
|
||||
comment = build_comment(complexity)
|
||||
|
||||
if args.dry_run:
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [DRY-RUN]")
|
||||
elif args.apply:
|
||||
success = client.post_comment(args.org, args.repo, pr_num, comment)
|
||||
status = "[commented]" if success else "[FAILED]"
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min {status}")
|
||||
else:
|
||||
print(f" → Score: {complexity.score}/10, Est: {complexity.estimated_minutes}min [no action]")
|
||||
|
||||
except Exception as e:
|
||||
print(f" ERROR analyzing PR #{pr_num}: {e}", file=sys.stderr)
|
||||
|
||||
with open(args.output, "w") as f:
|
||||
json.dump({
|
||||
"org": args.org,
|
||||
"repo": args.repo,
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"pr_count": len(results),
|
||||
"results": results
|
||||
}, f, indent=2)
|
||||
|
||||
if results:
|
||||
scores = [r["score"] for r in results]
|
||||
print(f"\nResults saved to {args.output}")
|
||||
print(f"Summary: {len(results)} PRs, scores range {min(scores):.0f}-{max(scores):.0f}")
|
||||
else:
|
||||
print("\nNo results to save.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,468 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
session_knowledge_extractor.py — Extract session-level entities and relationships from Hermes transcripts.
|
||||
|
||||
Creates knowledge facts about: which agent handled the session, what task was solved,
|
||||
which tools were used and why, and the outcome. Target: 10+ facts per session.
|
||||
|
||||
Usage:
|
||||
python3 session_knowledge_extractor.py --session session.jsonl --output knowledge/
|
||||
python3 session_knowledge_extractor.py --batch --sessions-dir ~/.hermes/sessions/ --limit 10
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import hashlib
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Optional, List, Dict, Any
|
||||
|
||||
SCRIPT_DIR = Path(__file__).parent.absolute()
|
||||
sys.path.insert(0, str(SCRIPT_DIR))
|
||||
|
||||
from session_reader import read_session, extract_conversation, truncate_for_context, messages_to_text
|
||||
|
||||
# --- Configuration ---
|
||||
DEFAULT_API_BASE = os.environ.get(
|
||||
"EXTRACTOR_API_BASE",
|
||||
os.environ.get("HARVESTER_API_BASE", "https://api.nousresearch.com/v1")
|
||||
)
|
||||
DEFAULT_API_KEY = os.environ.get(
|
||||
"EXTRACTOR_API_KEY",
|
||||
os.environ.get("HARVESTER_API_KEY", "")
|
||||
)
|
||||
DEFAULT_MODEL = os.environ.get(
|
||||
"EXTRACTOR_MODEL",
|
||||
os.environ.get("HARVESTER_MODEL", "xiaomi/mimo-v2-pro")
|
||||
)
|
||||
KNOWLEDGE_DIR = os.environ.get("EXTRACTOR_KNOWLEDGE_DIR", "knowledge")
|
||||
PROMPT_PATH = os.environ.get(
|
||||
"EXTRACTOR_PROMPT_PATH",
|
||||
str(SCRIPT_DIR.parent / "templates" / "session-entity-prompt.md")
|
||||
)
|
||||
|
||||
API_KEY_PATHS = [
|
||||
os.path.expanduser("~/.config/nous/key"),
|
||||
os.path.expanduser("~/.hermes/keymaxxing/active/minimax.key"),
|
||||
os.path.expanduser("~/.config/openrouter/key"),
|
||||
os.path.expanduser("~/.config/gitea/token"), # fallback
|
||||
]
|
||||
|
||||
|
||||
def find_api_key() -> str:
|
||||
for path in API_KEY_PATHS:
|
||||
if os.path.exists(path):
|
||||
with open(path) as f:
|
||||
key = f.read().strip()
|
||||
if key:
|
||||
return key
|
||||
return ""
|
||||
|
||||
|
||||
def load_extraction_prompt() -> str:
|
||||
path = Path(PROMPT_PATH)
|
||||
if not path.exists():
|
||||
print(f"ERROR: Extraction prompt not found at {path}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
return path.read_text(encoding='utf-8')
|
||||
|
||||
|
||||
def call_llm(prompt: str, transcript: str, api_base: str, api_key: str, model: str) -> Optional[List[dict]]:
|
||||
"""Call LLM to extract session entity knowledge."""
|
||||
import urllib.request
|
||||
|
||||
messages = [
|
||||
{"role": "system", "content": prompt},
|
||||
{"role": "user", "content": f"Extract knowledge from this session transcript:\n\n{transcript}"}
|
||||
]
|
||||
|
||||
payload = json.dumps({
|
||||
"model": model,
|
||||
"messages": messages,
|
||||
"temperature": 0.1,
|
||||
"max_tokens": 4096
|
||||
}).encode('utf-8')
|
||||
|
||||
req = urllib.request.Request(
|
||||
f"{api_base}/chat/completions",
|
||||
data=payload,
|
||||
headers={
|
||||
"Authorization": f"Bearer {api_key}",
|
||||
"Content-Type": "application/json"
|
||||
},
|
||||
method="POST"
|
||||
)
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=60) as resp:
|
||||
result = json.loads(resp.read().decode('utf-8'))
|
||||
content = result["choices"][0]["message"]["content"]
|
||||
return parse_extraction_response(content)
|
||||
except Exception as e:
|
||||
print(f"ERROR: LLM API call failed: {e}", file=sys.stderr)
|
||||
return None
|
||||
|
||||
|
||||
def parse_extraction_response(content: str) -> Optional[List[dict]]:
|
||||
"""Parse LLM response; handles JSON or markdown-wrapped JSON."""
|
||||
try:
|
||||
data = json.loads(content)
|
||||
if isinstance(data, dict) and 'knowledge' in data:
|
||||
return data['knowledge']
|
||||
if isinstance(data, list):
|
||||
return data
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
import re
|
||||
json_match = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', content, re.DOTALL)
|
||||
if json_match:
|
||||
try:
|
||||
data = json.loads(json_match.group(1))
|
||||
if isinstance(data, dict) and 'knowledge' in data:
|
||||
return data['knowledge']
|
||||
if isinstance(data, list):
|
||||
return data
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
json_match = re.search(r'(\{[^{}]*"knowledge"[^{}]*\[.*?\])', content, re.DOTALL)
|
||||
if json_match:
|
||||
try:
|
||||
data = json.loads(json_match.group(1))
|
||||
return data.get('knowledge', [])
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
|
||||
print(f"WARNING: Could not parse LLM response as JSON", file=sys.stderr)
|
||||
print(f"Response preview: {content[:500]}", file=sys.stderr)
|
||||
return None
|
||||
|
||||
|
||||
def load_existing_knowledge(knowledge_dir: str) -> dict:
|
||||
index_path = Path(knowledge_dir) / "index.json"
|
||||
if not index_path.exists():
|
||||
return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}
|
||||
try:
|
||||
with open(index_path, 'r', encoding='utf-8') as f:
|
||||
return json.load(f)
|
||||
except (json.JSONDecodeError, IOError) as e:
|
||||
print(f"WARNING: Could not load knowledge index: {e}", file=sys.stderr)
|
||||
return {"version": 1, "last_updated": "", "total_facts": 0, "facts": []}
|
||||
|
||||
|
||||
def fact_fingerprint(fact: dict) -> str:
|
||||
text = fact.get('fact', '').lower().strip()
|
||||
text = ' '.join(text.split())
|
||||
return hashlib.md5(text.encode('utf-8')).hexdigest()
|
||||
|
||||
|
||||
def deduplicate(new_facts: List[dict], existing: List[dict], similarity_threshold: float = 0.8) -> List[dict]:
|
||||
existing_fingerprints = set()
|
||||
existing_texts = []
|
||||
for f in existing:
|
||||
fp = fact_fingerprint(f)
|
||||
existing_fingerprints.add(fp)
|
||||
existing_texts.append(f.get('fact', '').lower().strip())
|
||||
|
||||
unique = []
|
||||
for fact in new_facts:
|
||||
fp = fact_fingerprint(fact)
|
||||
if fp in existing_fingerprints:
|
||||
continue
|
||||
|
||||
fact_words = set(fact.get('fact', '').lower().split())
|
||||
is_dup = False
|
||||
for existing_text in existing_texts:
|
||||
existing_words = set(existing_text.split())
|
||||
if not fact_words or not existing_words:
|
||||
continue
|
||||
overlap = len(fact_words & existing_words) / max(len(fact_words | existing_words), 1)
|
||||
if overlap >= similarity_threshold:
|
||||
is_dup = True
|
||||
break
|
||||
|
||||
if not is_dup:
|
||||
unique.append(fact)
|
||||
existing_fingerprints.add(fp)
|
||||
existing_texts.append(fact.get('fact', '').lower().strip())
|
||||
|
||||
return unique
|
||||
|
||||
|
||||
def validate_fact(fact: dict) -> bool:
|
||||
required = ['fact', 'category', 'repo', 'confidence']
|
||||
for field in required:
|
||||
if field not in fact:
|
||||
return False
|
||||
if not isinstance(fact['fact'], str) or not fact['fact'].strip():
|
||||
return False
|
||||
valid_categories = ['fact', 'pitfall', 'pattern', 'tool-quirk', 'question']
|
||||
if fact['category'] not in valid_categories:
|
||||
return False
|
||||
if not isinstance(fact.get('confidence', 0), (int, float)):
|
||||
return False
|
||||
if not (0.0 <= fact['confidence'] <= 1.0):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def write_knowledge(index: dict, new_facts: List[dict], knowledge_dir: str, source_session: str = ""):
|
||||
kdir = Path(knowledge_dir)
|
||||
kdir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
for fact in new_facts:
|
||||
fact['source_session'] = source_session
|
||||
fact['harvested_at'] = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
index['facts'].extend(new_facts)
|
||||
index['total_facts'] = len(index['facts'])
|
||||
index['last_updated'] = datetime.now(timezone.utc).isoformat()
|
||||
|
||||
index_path = kdir / "index.json"
|
||||
with open(index_path, 'w', encoding='utf-8') as f:
|
||||
json.dump(index, f, indent=2, ensure_ascii=False)
|
||||
|
||||
repos = {}
|
||||
for fact in new_facts:
|
||||
repo = fact.get('repo', 'global')
|
||||
repos.setdefault(repo, []).append(fact)
|
||||
|
||||
for repo, facts in repos.items():
|
||||
if repo == 'global':
|
||||
md_path = kdir / "global" / "sessions.md"
|
||||
else:
|
||||
md_path = kdir / "repos" / f"{repo}.md"
|
||||
|
||||
md_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
mode = 'a' if md_path.exists() else 'w'
|
||||
with open(md_path, mode, encoding='utf-8') as f:
|
||||
if mode == 'w':
|
||||
f.write(f"# Session Knowledge: {repo}\n\n")
|
||||
f.write(f"## Session {Path(source_session).stem} — {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M')}\n\n")
|
||||
for fact in facts:
|
||||
icon = {'fact': '📋', 'pitfall': '⚠️', 'pattern': '🔄', 'tool-quirk': '🔧', 'question': '❓'}.get(fact['category'], '•')
|
||||
f.write(f"- {icon} **{fact['category']}** (conf: {fact['confidence']:.1f}): {fact['fact']}\n")
|
||||
f.write("\n")
|
||||
|
||||
|
||||
def extract_session_id(messages: List[dict]) -> str:
|
||||
"""Derive a stable session ID from messages or return 'unknown'."""
|
||||
# Try to find session_id in the first message or use filename from source
|
||||
for msg in messages[:3]:
|
||||
if msg.get('session_id'):
|
||||
return msg['session_id'][:32]
|
||||
# Fallback: hash first few messages
|
||||
content = str(messages[:3])
|
||||
return hashlib.md5(content.encode()).hexdigest()[:12]
|
||||
|
||||
|
||||
def extract_agent(messages: List[dict]) -> Optional[str]:
|
||||
"""Extract the agent/model name from assistant messages."""
|
||||
for msg in messages:
|
||||
if msg.get('role') == 'assistant' and msg.get('model'):
|
||||
return msg['model']
|
||||
return None
|
||||
|
||||
|
||||
def extract_tasks(messages: List[dict]) -> List[str]:
|
||||
"""Extract the task/goal from the first user message."""
|
||||
tasks = []
|
||||
for msg in messages:
|
||||
if msg.get('role') == 'user' and msg.get('content'):
|
||||
content = msg['content']
|
||||
if isinstance(content, str) and len(content.strip()) < 500:
|
||||
tasks.append(content.strip())
|
||||
break # First user message is usually the task
|
||||
return tasks
|
||||
|
||||
|
||||
def extract_tools(messages: List[dict]) -> List[str]:
|
||||
"""Extract tool names used in the session."""
|
||||
tools = set()
|
||||
for msg in messages:
|
||||
if msg.get('tool_calls'):
|
||||
for tc in msg['tool_calls']:
|
||||
func = tc.get('function', {})
|
||||
name = func.get('name', '')
|
||||
if name:
|
||||
tools.add(name)
|
||||
return list(tools)
|
||||
|
||||
|
||||
def extract_outcome(messages: List[dict]) -> str:
|
||||
"""Classify session outcome: success/partial/failure."""
|
||||
errors = []
|
||||
for msg in messages:
|
||||
if msg.get('role') == 'tool' and msg.get('is_error'):
|
||||
err = msg.get('content', '')
|
||||
if isinstance(err, str):
|
||||
errors.append(err.lower())
|
||||
|
||||
if errors:
|
||||
if any('405' in e or 'permission' in e or 'authentication' in e for e in errors):
|
||||
return 'failure'
|
||||
return 'partial'
|
||||
|
||||
# Check last assistant message for success indicators
|
||||
last = messages[-1] if messages else {}
|
||||
if last.get('role') == 'assistant':
|
||||
content = str(last.get('content', ''))
|
||||
success_words = ['done', 'completed', 'success', 'merged', 'pushed', 'created', 'saved']
|
||||
if any(word in content.lower() for word in success_words):
|
||||
return 'success'
|
||||
|
||||
return 'unknown'
|
||||
|
||||
|
||||
def harvest_session(session_path: str, knowledge_dir: str, api_base: str, api_key: str,
|
||||
model: str, dry_run: bool = False, min_confidence: float = 0.3) -> dict:
|
||||
"""Harvest session entities and relationships from one session."""
|
||||
start_time = time.time()
|
||||
stats = {
|
||||
'session': session_path,
|
||||
'facts_found': 0,
|
||||
'facts_new': 0,
|
||||
'facts_dup': 0,
|
||||
'elapsed_seconds': 0,
|
||||
'error': None
|
||||
}
|
||||
|
||||
try:
|
||||
messages = read_session(session_path)
|
||||
if not messages:
|
||||
stats['error'] = "Empty session file"
|
||||
return stats
|
||||
|
||||
conv = extract_conversation(messages)
|
||||
if not conv:
|
||||
stats['error'] = "No conversation turns found"
|
||||
return stats
|
||||
|
||||
truncated = truncate_for_context(conv, head=50, tail=50)
|
||||
transcript = messages_to_text(truncated)
|
||||
|
||||
prompt = load_extraction_prompt()
|
||||
raw_facts = call_llm(prompt, transcript, api_base, api_key, model)
|
||||
if raw_facts is None:
|
||||
stats['error'] = "LLM extraction failed"
|
||||
return stats
|
||||
|
||||
valid_facts = [f for f in raw_facts if validate_fact(f) and f.get('confidence', 0) >= min_confidence]
|
||||
stats['facts_found'] = len(valid_facts)
|
||||
|
||||
existing_index = load_existing_knowledge(knowledge_dir)
|
||||
existing_facts = existing_index.get('facts', [])
|
||||
new_facts = deduplicate(valid_facts, existing_facts)
|
||||
stats['facts_new'] = len(new_facts)
|
||||
stats['facts_dup'] = len(valid_facts) - len(new_facts)
|
||||
|
||||
if new_facts and not dry_run:
|
||||
write_knowledge(existing_index, new_facts, knowledge_dir, source_session=session_path)
|
||||
|
||||
stats['elapsed_seconds'] = round(time.time() - start_time, 2)
|
||||
return stats
|
||||
|
||||
except Exception as e:
|
||||
stats['error'] = str(e)
|
||||
stats['elapsed_seconds'] = round(time.time() - start_time, 2)
|
||||
return stats
|
||||
|
||||
|
||||
def batch_harvest(sessions_dir: str, knowledge_dir: str, api_base: str, api_key: str,
|
||||
model: str, since: str = "", limit: int = 0, dry_run: bool = False) -> List[dict]:
|
||||
sessions_path = Path(sessions_dir)
|
||||
if not sessions_path.is_dir():
|
||||
print(f"ERROR: Sessions directory not found: {sessions_dir}", file=sys.stderr)
|
||||
return []
|
||||
|
||||
session_files = sorted(sessions_path.glob("*.jsonl"), reverse=True)
|
||||
|
||||
if since:
|
||||
since_dt = datetime.fromisoformat(since.replace('Z', '+00:00'))
|
||||
filtered = []
|
||||
for sf in session_files:
|
||||
try:
|
||||
parts = sf.stem.split('_')
|
||||
if len(parts) >= 3:
|
||||
date_str = parts[1]
|
||||
file_dt = datetime.strptime(date_str, '%Y%m%d').replace(tzinfo=timezone.utc)
|
||||
if file_dt >= since_dt:
|
||||
filtered.append(sf)
|
||||
except (ValueError, IndexError):
|
||||
filtered.append(sf)
|
||||
session_files = filtered
|
||||
|
||||
if limit > 0:
|
||||
session_files = session_files[:limit]
|
||||
|
||||
print(f"Harvesting {len(session_files)} sessions with session knowledge extractor...")
|
||||
|
||||
results = []
|
||||
for i, sf in enumerate(session_files, 1):
|
||||
print(f"[{i}/{len(session_files)}] {sf.name}...", end=" ", flush=True)
|
||||
stats = harvest_session(str(sf), knowledge_dir, api_base, api_key, model, dry_run)
|
||||
if stats['error']:
|
||||
print(f"ERROR: {stats['error']}")
|
||||
else:
|
||||
print(f"{stats['facts_new']} new, {stats['facts_dup']} dup ({stats['elapsed_seconds']}s)")
|
||||
results.append(stats)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Extract session entities and relationships from Hermes transcripts")
|
||||
parser.add_argument('--session', help='Path to a single session JSONL file')
|
||||
parser.add_argument('--batch', action='store_true', help='Batch mode: process multiple sessions')
|
||||
parser.add_argument('--sessions-dir', default=os.path.expanduser('~/.hermes/sessions'),
|
||||
help='Directory containing session files (default: ~/.hermes/sessions)')
|
||||
parser.add_argument('--output', default='knowledge', help='Output directory for knowledge store')
|
||||
parser.add_argument('--since', default='', help='Only process sessions after this date (YYYY-MM-DD)')
|
||||
parser.add_argument('--limit', type=int, default=0, help='Max sessions to process (0=unlimited)')
|
||||
parser.add_argument('--api-base', default=DEFAULT_API_BASE, help='LLM API base URL')
|
||||
parser.add_argument('--api-key', default='', help='LLM API key (or set EXTRACTOR_API_KEY)')
|
||||
parser.add_argument('--model', default=DEFAULT_MODEL, help='Model to use for extraction')
|
||||
parser.add_argument('--dry-run', action='store_true', help='Preview without writing to knowledge store')
|
||||
parser.add_argument('--min-confidence', type=float, default=0.3, help='Minimum confidence threshold')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
api_key = args.api_key or DEFAULT_API_KEY or find_api_key()
|
||||
if not api_key:
|
||||
print("ERROR: No API key found. Set EXTRACTOR_API_KEY or store in one of:", file=sys.stderr)
|
||||
for p in API_KEY_PATHS:
|
||||
print(f" {p}", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
knowledge_dir = args.output
|
||||
if not os.path.isabs(knowledge_dir):
|
||||
knowledge_dir = os.path.join(SCRIPT_DIR.parent, knowledge_dir)
|
||||
|
||||
if args.session:
|
||||
stats = harvest_session(
|
||||
args.session, knowledge_dir, args.api_base, api_key, args.model,
|
||||
dry_run=args.dry_run, min_confidence=args.min_confidence
|
||||
)
|
||||
print(json.dumps(stats, indent=2))
|
||||
if stats['error']:
|
||||
sys.exit(1)
|
||||
elif args.batch:
|
||||
results = batch_harvest(
|
||||
args.sessions_dir, knowledge_dir, args.api_base, api_key, args.model,
|
||||
since=args.since, limit=args.limit, dry_run=args.dry_run
|
||||
)
|
||||
total_new = sum(r['facts_new'] for r in results)
|
||||
total_dup = sum(r['facts_dup'] for r in results)
|
||||
errors = sum(1 for r in results if r['error'])
|
||||
print(f"\nDone: {total_new} new facts, {total_dup} duplicates, {errors} errors")
|
||||
else:
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
170
scripts/test_pr_complexity_scorer.py
Normal file
170
scripts/test_pr_complexity_scorer.py
Normal file
@@ -0,0 +1,170 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Tests for PR Complexity Scorer — unit tests for the scoring logic.
|
||||
"""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
|
||||
from pr_complexity_scorer import (
|
||||
score_pr,
|
||||
is_dependency_file,
|
||||
is_test_file,
|
||||
TIME_PER_POINT,
|
||||
SMALL_FILES,
|
||||
MEDIUM_FILES,
|
||||
LARGE_FILES,
|
||||
SMALL_LINES,
|
||||
MEDIUM_LINES,
|
||||
LARGE_LINES,
|
||||
)
|
||||
|
||||
PASS = 0
|
||||
FAIL = 0
|
||||
|
||||
def test(name):
|
||||
def decorator(fn):
|
||||
global PASS, FAIL
|
||||
try:
|
||||
fn()
|
||||
PASS += 1
|
||||
print(f" [PASS] {name}")
|
||||
except AssertionError as e:
|
||||
FAIL += 1
|
||||
print(f" [FAIL] {name}: {e}")
|
||||
except Exception as e:
|
||||
FAIL += 1
|
||||
print(f" [FAIL] {name}: Unexpected error: {e}")
|
||||
return decorator
|
||||
|
||||
def assert_eq(a, b, msg=""):
|
||||
if a != b:
|
||||
raise AssertionError(f"{msg} expected {b!r}, got {a!r}")
|
||||
|
||||
def assert_true(v, msg=""):
|
||||
if not v:
|
||||
raise AssertionError(msg or "Expected True")
|
||||
|
||||
def assert_false(v, msg=""):
|
||||
if v:
|
||||
raise AssertionError(msg or "Expected False")
|
||||
|
||||
|
||||
print("=== PR Complexity Scorer Tests ===\n")
|
||||
|
||||
print("-- File Classification --")
|
||||
|
||||
@test("dependency file detection — requirements.txt")
|
||||
def _():
|
||||
assert_true(is_dependency_file("requirements.txt"))
|
||||
assert_true(is_dependency_file("src/requirements.txt"))
|
||||
assert_false(is_dependency_file("requirements_test.txt"))
|
||||
|
||||
@test("dependency file detection — pyproject.toml")
|
||||
def _():
|
||||
assert_true(is_dependency_file("pyproject.toml"))
|
||||
assert_false(is_dependency_file("myproject.py"))
|
||||
|
||||
@test("test file detection — pytest style")
|
||||
def _():
|
||||
assert_true(is_test_file("tests/test_api.py"))
|
||||
assert_true(is_test_file("test_module.py"))
|
||||
assert_true(is_test_file("src/module_test.py"))
|
||||
|
||||
@test("test file detection — other frameworks")
|
||||
def _():
|
||||
assert_true(is_test_file("spec/feature_spec.rb"))
|
||||
assert_true(is_test_file("__tests__/component.test.js"))
|
||||
assert_false(is_test_file("testfixtures/helper.py"))
|
||||
|
||||
|
||||
print("\n-- Scoring Logic --")
|
||||
|
||||
@test("small PR gets low score (1-3)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=3,
|
||||
additions=50,
|
||||
deletions=10,
|
||||
has_dependency_changes=False,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(1 <= score <= 3, f"Score should be low, got {score}")
|
||||
assert_true(minutes < 20)
|
||||
|
||||
@test("medium PR gets medium score (4-6)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=15,
|
||||
additions=400,
|
||||
deletions=100,
|
||||
has_dependency_changes=False,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(4 <= score <= 6, f"Score should be medium, got {score}")
|
||||
assert_true(20 <= minutes <= 45)
|
||||
|
||||
@test("large PR gets high score (7-9)")
|
||||
def _():
|
||||
score, minutes, _ = score_pr(
|
||||
files_changed=60,
|
||||
additions=3000,
|
||||
deletions=1500,
|
||||
has_dependency_changes=True,
|
||||
test_coverage_delta=None
|
||||
)
|
||||
assert_true(7 <= score <= 9, f"Score should be high, got {score}")
|
||||
assert_true(minutes >= 45)
|
||||
|
||||
@test("dependency changes boost score")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=10, additions=200, deletions=50,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
dep_score, _, _ = score_pr(
|
||||
files_changed=10, additions=200, deletions=50,
|
||||
has_dependency_changes=True, test_coverage_delta=None
|
||||
)
|
||||
assert_true(dep_score > base_score, f"Deps: {base_score} -> {dep_score}")
|
||||
|
||||
@test("adding tests lowers complexity")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
better_score, _, _ = score_pr(
|
||||
files_changed=8, additions=180, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=3
|
||||
)
|
||||
assert_true(better_score < base_score, f"Tests: {base_score} -> {better_score}")
|
||||
|
||||
@test("removing tests increases complexity")
|
||||
def _():
|
||||
base_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=None
|
||||
)
|
||||
worse_score, _, _ = score_pr(
|
||||
files_changed=8, additions=150, deletions=20,
|
||||
has_dependency_changes=False, test_coverage_delta=-2
|
||||
)
|
||||
assert_true(worse_score > base_score, f"Remove tests: {base_score} -> {worse_score}")
|
||||
|
||||
@test("score bounded 1-10")
|
||||
def _():
|
||||
for files, adds, dels in [(1, 10, 5), (100, 10000, 5000)]:
|
||||
score, _, _ = score_pr(files, adds, dels, False, None)
|
||||
assert_true(1 <= score <= 10, f"Score {score} out of range")
|
||||
|
||||
@test("estimated minutes exist for all scores")
|
||||
def _():
|
||||
for s in range(1, 11):
|
||||
assert_true(s in TIME_PER_POINT, f"Missing time for score {s}")
|
||||
|
||||
|
||||
print(f"\n=== Results: {PASS} passed, {FAIL} failed ===")
|
||||
sys.exit(0 if FAIL == 0 else 1)
|
||||
@@ -1,197 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Smoke test for session knowledge extractor.
|
||||
Tests: parsing, entity extraction, metadata generation, dedup, store roundtrip.
|
||||
Does NOT call real LLM — uses mock facts.
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
import tempfile
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
SCRIPT_DIR = Path(__file__).parent.absolute()
|
||||
sys.path.insert(0, str(SCRIPT_DIR))
|
||||
|
||||
from session_reader import read_session, extract_conversation, truncate_for_context, messages_to_text
|
||||
from session_knowledge_extractor import (
|
||||
validate_fact, deduplicate, load_existing_knowledge, fact_fingerprint,
|
||||
extract_agent, extract_tasks, extract_tools, extract_outcome,
|
||||
write_knowledge
|
||||
)
|
||||
|
||||
|
||||
def make_test_session():
|
||||
"""Create a sample Hermes session transcript."""
|
||||
messages = [
|
||||
{"role": "user", "content": "Clone the compounding-intelligence repo and run tests", "timestamp": "2026-04-13T10:00:00Z"},
|
||||
{"role": "assistant", "model": "xiaomi/mimo-v2-pro", "content": "I'll clone the repo and run tests.", "timestamp": "2026-04-13T10:00:02Z",
|
||||
"tool_calls": [
|
||||
{"function": {"name": "terminal", "arguments": '{"command": "git clone https://forge.alexanderwhitestone.com/Timmy_Foundation/compounding-intelligence.git"}'}},
|
||||
]},
|
||||
{"role": "tool", "content": "Cloned successfully", "timestamp": "2026-04-13T10:00:10Z"},
|
||||
{"role": "assistant", "model": "xiaomi/mimo-v2-pro", "content": "Now running pytest...", "timestamp": "2026-04-13T10:00:11Z",
|
||||
"tool_calls": [
|
||||
{"function": {"name": "execute_code", "arguments": '{"code": "import subprocess; subprocess.run([\"pytest\"])"}'}},
|
||||
]},
|
||||
{"role": "tool", "content": "15 passed, 0 failed", "timestamp": "2026-04-13T10:00:15Z"},
|
||||
{"role": "assistant", "model": "xiaomi/mimo-v2-pro", "content": "All tests passed — done.", "timestamp": "2026-04-13T10:00:16Z"},
|
||||
]
|
||||
return messages
|
||||
|
||||
|
||||
def test_extract_entities():
|
||||
"""Test entity extraction from messages."""
|
||||
messages = make_test_session() # 6 total: 3 user/assistant + 3 tool
|
||||
agent = extract_agent(messages)
|
||||
assert agent == "xiaomi/mimo-v2-pro"
|
||||
tasks = extract_tasks(messages)
|
||||
assert len(tasks) >= 1 and "clone" in tasks[0].lower()
|
||||
tools = extract_tools(messages)
|
||||
assert "terminal" in tools and "execute_code" in tools and len(tools) == 2
|
||||
outcome = extract_outcome(messages)
|
||||
assert outcome == "success"
|
||||
|
||||
print(" [PASS] entity extraction works")
|
||||
|
||||
|
||||
def test_validate_fact():
|
||||
good = {"fact": "Token is at ~/.config/gitea/token", "category": "tool-quirk", "repo": "global", "confidence": 0.9}
|
||||
assert validate_fact(good), "Valid fact should pass"
|
||||
|
||||
bad = {"fact": "Something", "category": "nonsense", "repo": "x", "confidence": 0.5}
|
||||
assert not validate_fact(bad), "Bad category should fail"
|
||||
|
||||
print(" [PASS] fact validation works")
|
||||
|
||||
|
||||
def test_deduplicate():
|
||||
existing = [{"fact": "A", "category": "fact", "repo": "global", "confidence": 0.9}]
|
||||
new = [
|
||||
{"fact": "A", "category": "fact", "repo": "global", "confidence": 0.9},
|
||||
{"fact": "B", "category": "fact", "repo": "global", "confidence": 0.9},
|
||||
]
|
||||
result = deduplicate(new, existing)
|
||||
assert len(result) == 1 and result[0]["fact"] == "B", "Should remove exact dup"
|
||||
print(" [PASS] deduplication works")
|
||||
|
||||
|
||||
def test_knowledge_store_roundtrip():
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
index = load_existing_knowledge(tmpdir)
|
||||
assert index["total_facts"] == 0
|
||||
|
||||
new_facts = [
|
||||
{"fact": "session_x used terminal", "category": "fact", "repo": "global", "confidence": 0.9},
|
||||
{"fact": "session_x task: clone repo", "category": "fact", "repo": "compounding-intelligence", "confidence": 0.9},
|
||||
{"fact": "session_x outcome: success", "category": "fact", "repo": "global", "confidence": 0.9},
|
||||
] * 4 # 12 facts total
|
||||
|
||||
write_knowledge(index, new_facts, tmpdir, source_session="session_x.jsonl")
|
||||
|
||||
index2 = load_existing_knowledge(tmpdir)
|
||||
assert index2["total_facts"] == 12
|
||||
|
||||
# Verify markdown written
|
||||
md_path = Path(tmpdir) / "repos" / "compounding-intelligence.md"
|
||||
assert md_path.exists(), "Markdown file should be created"
|
||||
|
||||
print(" [PASS] knowledge store roundtrip works (12 facts)")
|
||||
|
||||
|
||||
def test_min_facts_per_session():
|
||||
"""Validator: a typical session should yield 10+ facts."""
|
||||
# Simulate facts from one session (what the LLM would produce)
|
||||
mock_facts = [
|
||||
{"fact": "session_123 was handled by model xiaomi/mimo-v2-pro", "category": "fact", "repo": "global", "confidence": 0.95},
|
||||
{"fact": "session_123's task was to clone the compounding-intelligence repository", "category": "fact", "repo": "compounding-intelligence", "confidence": 0.9},
|
||||
{"fact": "session_123 used tool 'terminal' to run git clone", "category": "tool-quirk", "repo": "global", "confidence": 0.9},
|
||||
{"fact": "session_123 used tool 'execute_code' to run pytest", "category": "tool-quirk", "repo": "global", "confidence": 0.9},
|
||||
{"fact": "session_123 executed: git clone https://forge...", "category": "fact", "repo": "global", "confidence": 0.9},
|
||||
{"fact": "session_123 executed: pytest (15 tests)", "category": "fact", "repo": "compounding-intelligence", "confidence": 0.9},
|
||||
{"fact": "session_123 outcome: all 15 tests passed", "category": "fact", "repo": "global", "confidence": 0.95},
|
||||
{"fact": "session_123 touched repo: compounding-intelligence", "category": "fact", "repo": "compounding-intelligence", "confidence": 1.0},
|
||||
{"fact": "session_123 terminal output: 'Cloned successfully'", "category": "fact", "repo": "global", "confidence": 0.9},
|
||||
{"fact": "session_123 test output: '15 passed, 0 failed'", "category": "fact", "repo": "compounding-intelligence", "confidence": 0.9},
|
||||
{"fact": "session_123 completed without errors", "category": "fact", "repo": "global", "confidence": 0.85},
|
||||
{"fact": "session_123 final message: 'All tests passed — done.'", "category": "fact", "repo": "global", "confidence": 0.9},
|
||||
]
|
||||
assert len(mock_facts) >= 10, f"Should have at least 10 facts, got {len(mock_facts)}"
|
||||
print(f" [PASS] mock session produces {len(mock_facts)} facts")
|
||||
|
||||
|
||||
def test_full_chain_no_llm():
|
||||
"""Full pipeline: read -> extract entities -> validate -> dedup -> store."""
|
||||
messages = make_test_session()
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f:
|
||||
for msg in messages:
|
||||
f.write(json.dumps(msg) + '\n')
|
||||
session_path = f.name
|
||||
|
||||
with tempfile.TemporaryDirectory() as knowledge_dir:
|
||||
# Step 1: Read
|
||||
msgs = read_session(session_path)
|
||||
assert len(msgs) == 6 # 3 user/assistant + 3 tool role messages
|
||||
|
||||
# Step 2: Extract conversation
|
||||
conv = extract_conversation(msgs)
|
||||
assert len(conv) == 4 # 1 user + 3 assistant messages (tool role messages skipped)
|
||||
|
||||
# Step 3: Truncate
|
||||
truncated = truncate_for_context(conv, head=50, tail=50)
|
||||
transcript = messages_to_text(truncated)
|
||||
assert "clone" in transcript.lower()
|
||||
|
||||
# Step 4: Extract entities
|
||||
agent = extract_agent(msgs)
|
||||
tools = extract_tools(msgs)
|
||||
outcome = extract_outcome(msgs)
|
||||
assert agent == "xiaomi/mimo-v2-pro"
|
||||
assert len(tools) >= 2
|
||||
assert outcome == "success"
|
||||
|
||||
# Step 5-7: Simulated LLM output → validate → dedup → store
|
||||
# Create 12 distinct facts to meet the 10+ requirement
|
||||
mock_facts = [
|
||||
{"fact": "Session used tool terminal", "category": "tool-quirk", "repo": "global", "confidence": 0.9},
|
||||
{"fact": "Session used tool execute_code", "category": "tool-quirk", "repo": "global", "confidence": 0.9},
|
||||
{"fact": f"Session handled by agent {agent}", "category": "fact", "repo": "global", "confidence": 0.95},
|
||||
{"fact": "Session task: clone the repository", "category": "fact", "repo": "compounding-intelligence", "confidence": 0.9},
|
||||
{"fact": "Session task: run pytest", "category": "fact", "repo": "compounding-intelligence", "confidence": 0.9},
|
||||
{"fact": "Session outcome: success", "category": "fact", "repo": "global", "confidence": 0.9},
|
||||
{"fact": "Session repo: compounding-intelligence touched", "category": "fact", "repo": "compounding-intelligence", "confidence": 1.0},
|
||||
{"fact": "Terminal command executed: git clone", "category": "fact", "repo": "global", "confidence": 0.9},
|
||||
{"fact": "Test result: 15 passed, 0 failed", "category": "fact", "repo": "compounding-intelligence", "confidence": 0.95},
|
||||
{"fact": "All tests passed — session complete", "category": "fact", "repo": "global", "confidence": 0.9},
|
||||
{"fact": "No errors encountered during session", "category": "fact", "repo": "global", "confidence": 0.8},
|
||||
{"fact": "Session duration: approximately 16 seconds", "category": "fact", "repo": "global", "confidence": 0.7},
|
||||
]
|
||||
|
||||
valid = [f for f in mock_facts if validate_fact(f)]
|
||||
assert len(valid) == 12
|
||||
|
||||
index = load_existing_knowledge(knowledge_dir)
|
||||
new_facts = deduplicate(valid, index.get("facts", []))
|
||||
assert len(new_facts) == 12
|
||||
|
||||
from session_knowledge_extractor import write_knowledge
|
||||
write_knowledge(index, new_facts, knowledge_dir, source_session=session_path)
|
||||
|
||||
index2 = load_existing_knowledge(knowledge_dir)
|
||||
assert index2["total_facts"] == 12
|
||||
|
||||
os.unlink(session_path)
|
||||
print(" [PASS] full chain (read → entities → validate → dedup → store) works (12 facts)")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Running session knowledge extractor smoke tests...")
|
||||
test_extract_entities()
|
||||
test_validate_fact()
|
||||
test_deduplicate()
|
||||
test_knowledge_store_roundtrip()
|
||||
test_min_facts_per_session()
|
||||
test_full_chain_no_llm()
|
||||
print("\nAll tests passed — extractor produces 10+ facts per session ✓")
|
||||
@@ -1,95 +0,0 @@
|
||||
# Knowledge Extraction Prompt — Session Entities & Relationships
|
||||
|
||||
## System Prompt
|
||||
|
||||
You are a session knowledge extraction engine. You read Hermes session transcripts and output ONLY structured JSON. You extract session entities (agent, task, tools, outcome) and the relationships between them. You never invent facts not in the transcript.
|
||||
|
||||
## Prompt
|
||||
|
||||
```
|
||||
TASK: Extract knowledge facts from this session transcript. Focus on:
|
||||
|
||||
1. AGENT: Which model/agent handled this session
|
||||
2. TASK: What problem or goal was being solved
|
||||
3. TOOLS: Which tools were used and what each accomplished
|
||||
4. OUTCOME: Did the session succeed, partially succeed, or fail?
|
||||
5. RELATIONSHIPS: How do these entities connect?
|
||||
|
||||
RULES:
|
||||
1. Extract ONLY information explicitly stated or clearly implied by the transcript.
|
||||
2. Do NOT infer, assume, or hallucinate.
|
||||
3. Every fact must point to a specific message or tool call as evidence.
|
||||
4. Generate at least 10 facts. Break complex tool usages into multiple atomic facts.
|
||||
5. Include relationship facts: "session X used tool Y", "agent Z handled session X", "task W was completed by session X".
|
||||
6. Include outcome facts: success indicators, error conditions, partial completions.
|
||||
|
||||
CATEGORIES (assign exactly one):
|
||||
- fact: Concrete, verifiable statement (paths, commands, results, configs)
|
||||
- pitfall: Error hit, wrong assumption, time wasted
|
||||
- pattern: Successful reusable sequence
|
||||
- tool-quirk: Environment-specific behavior (token paths, URLs, API gotchas)
|
||||
- question: Something identified but not answered
|
||||
|
||||
CONFIDENCE:
|
||||
- 0.9: Directly observed with explicit output or verification
|
||||
- 0.7: Multiple data points confirm, but not explicitly verified
|
||||
- 0.5: Clear implication but not directly stated
|
||||
- 0.3: Weak inference from limited evidence
|
||||
|
||||
OUTPUT FORMAT (valid JSON only, no markdown, no explanation):
|
||||
{
|
||||
"knowledge": [
|
||||
{
|
||||
"fact": "One specific sentence of knowledge",
|
||||
"category": "fact|pitfall|pattern|tool-quirk|question",
|
||||
"repo": "repo-name or global",
|
||||
"confidence": 0.0-1.0,
|
||||
"evidence": "Brief quote or reference from transcript that supports this"
|
||||
}
|
||||
],
|
||||
"meta": {
|
||||
"session_id": "extracted or generated id",
|
||||
"session_outcome": "success|partial|failure|unknown",
|
||||
"agent": "model name if identifiable",
|
||||
"task": "brief description of the goal",
|
||||
"tools_used": ["tool1", "tool2"],
|
||||
"repos_touched": ["repo1"],
|
||||
"fact_count": 0
|
||||
}
|
||||
}
|
||||
|
||||
TRANSCRIPT:
|
||||
{{transcript}}
|
||||
```
|
||||
|
||||
## Design Notes
|
||||
|
||||
### Entity extraction strategy
|
||||
|
||||
**Agent:** Look for `"model": "..."` in assistant messages or model mentions in content.
|
||||
|
||||
**Task:** The first user message usually states the goal. If vague, look for the assistant's interpretation: "I'll help you X".
|
||||
|
||||
**Tools:** Every `tool_calls` entry is a tool use. Extract the function name and what it was used for based on arguments.
|
||||
|
||||
**Outcome:** Success indicators: "done", "completed", "merged", "pushed", "created". Failures: HTTP errors (405, 404, 403), stack traces, explicit failures.
|
||||
|
||||
**Relationships:** Treat the session as a central entity. Generate facts like:
|
||||
- Agent relationship: "session_abc was handled by model xiaomi/mimo-v2-pro"
|
||||
- Task relationship: "session_abc's task was to merge PR #123"
|
||||
- Tool relationship: "session_abc used terminal to run 'git clone'"
|
||||
- Outcome relationship: "session_abc outcome: success — PR merged"
|
||||
|
||||
### 10+ facts guarantee
|
||||
|
||||
Each session with tool usage typically yields:
|
||||
- 1 fact: agent identity
|
||||
- 1-2 facts: task/goal (decomposed into sub-goals)
|
||||
- 3-5 facts: each tool call becomes 1-2 facts (tool name + purpose + result)
|
||||
- 1-2 facts: outcome details
|
||||
- 1-2 facts: repo touched
|
||||
Total: 10+ per non-trivial session.
|
||||
|
||||
### Token budget
|
||||
|
||||
~700 tokens for prompt (excluding transcript). Leaves room for long transcripts.
|
||||
174
tests/test_knowledge_to_training_pairs.py
Normal file
174
tests/test_knowledge_to_training_pairs.py
Normal file
@@ -0,0 +1,174 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Smoke tests for knowledge_to_training_pairs.py
|
||||
|
||||
Tests:
|
||||
- Output is valid JSONL
|
||||
- Each line has required fields (terse, rich, domain, source_confidence, source_model)
|
||||
- Confidence values are in [0,1]
|
||||
- Terse is non-empty and reasonably short (< 200 chars)
|
||||
- Rich matches the original fact
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
# Add scripts dir to path for imports
|
||||
SCRIPT_DIR = Path(__file__).parent.parent / "scripts"
|
||||
sys.path.insert(0, str(SCRIPT_DIR))
|
||||
|
||||
from knowledge_to_training_pairs import (
|
||||
fact_to_terse,
|
||||
filter_entries,
|
||||
entry_to_pair,
|
||||
parse_date,
|
||||
)
|
||||
|
||||
|
||||
def test_fact_to_terse_pitfall():
|
||||
fact = "deploy-crons.py leaves jobs in mixed model format"
|
||||
category = "pitfall"
|
||||
domain = "hermes-agent"
|
||||
terse = fact_to_terse(fact, category, domain)
|
||||
assert terse.startswith("How do I")
|
||||
assert "?" in terse
|
||||
assert len(terse) < 150
|
||||
print("PASS: test_fact_to_terse_pitfall")
|
||||
|
||||
|
||||
def test_fact_to_terse_fact():
|
||||
fact = "Python is a high-level programming language"
|
||||
terse = fact_to_terse(fact, "fact", "global")
|
||||
assert terse.startswith("What should I know about")
|
||||
assert "?" in terse
|
||||
print("PASS: test_fact_to_terse_fact")
|
||||
|
||||
|
||||
def test_fact_to_terse_pattern():
|
||||
fact = "Use sparse checkout for large repos"
|
||||
terse = fact_to_terse(fact, "pattern", "devops")
|
||||
assert "recommended way" in terse or "best way" in terse
|
||||
print("PASS: test_fact_to_terse_pattern")
|
||||
|
||||
|
||||
def test_entry_to_pair_structure():
|
||||
entry = {
|
||||
"id": "test:001",
|
||||
"fact": "Test fact text.",
|
||||
"category": "fact",
|
||||
"domain": "test-domain",
|
||||
"confidence": 0.85,
|
||||
"model": "test-model",
|
||||
}
|
||||
pair = entry_to_pair(entry)
|
||||
assert pair is not None
|
||||
assert "terse" in pair
|
||||
assert "rich" in pair
|
||||
assert "domain" in pair
|
||||
assert "source_confidence" in pair
|
||||
assert "source_model" in pair
|
||||
assert pair["rich"] == "Test fact text."
|
||||
assert pair["domain"] == "test-domain"
|
||||
assert 0.0 <= pair["source_confidence"] <= 1.0
|
||||
print("PASS: test_entry_to_pair_structure")
|
||||
|
||||
|
||||
def test_filter_by_confidence():
|
||||
entries = [
|
||||
{"fact": "A", "confidence": 0.9},
|
||||
{"fact": "B", "confidence": 0.4},
|
||||
{"fact": "C", "confidence": 0.6},
|
||||
]
|
||||
filtered = filter_entries(entries, min_confidence=0.5)
|
||||
assert len(filtered) == 2
|
||||
assert all(e["confidence"] >= 0.5 for e in filtered)
|
||||
print("PASS: test_filter_by_confidence")
|
||||
|
||||
|
||||
def test_filter_by_model():
|
||||
entries = [
|
||||
{"fact": "A", "model": "claude-sonnet"},
|
||||
{"fact": "B", "model": "gpt-4"},
|
||||
{"fact": "C", "model": "unknown"},
|
||||
]
|
||||
filtered = filter_entries(entries, model_filter=["claude-sonnet", "gpt-4"])
|
||||
assert len(filtered) == 2
|
||||
assert all(e["model"] in ("claude-sonnet", "gpt-4") for e in filtered)
|
||||
print("PASS: test_filter_by_model")
|
||||
|
||||
|
||||
def test_filter_by_date():
|
||||
entries = [
|
||||
{"fact": "A", "last_confirmed": "2026-04-10"},
|
||||
{"fact": "B", "last_confirmed": "2026-03-01"},
|
||||
{"fact": "C", "first_seen": "2026-04-15"},
|
||||
]
|
||||
after_dt = parse_date("2026-04-01")
|
||||
filtered = filter_entries(entries, after=after_dt)
|
||||
assert len(filtered) == 2
|
||||
print("PASS: test_filter_by_date")
|
||||
|
||||
|
||||
def test_end_to_end_jsonl_output():
|
||||
"""Integration test: run the script and verify JSONL validity."""
|
||||
import subprocess
|
||||
|
||||
repo_dir = SCRIPT_DIR.parent
|
||||
result = subprocess.run(
|
||||
["python3", "scripts/knowledge_to_training_pairs.py", "--dry-run"],
|
||||
capture_output=True, text=True, cwd=repo_dir
|
||||
)
|
||||
assert result.returncode == 0
|
||||
stderr = result.stderr.strip()
|
||||
|
||||
# The stats JSON object is at the top of stderr. Find its bounds via brace matching.
|
||||
start = stderr.find('{')
|
||||
assert start >= 0, "Stats JSON not found in stderr"
|
||||
stderr_sub = stderr[start:]
|
||||
|
||||
depth = 0
|
||||
end = 0
|
||||
for i, ch in enumerate(stderr_sub):
|
||||
if ch == '{':
|
||||
depth += 1
|
||||
elif ch == '}':
|
||||
depth -= 1
|
||||
if depth == 0:
|
||||
end = i + 1
|
||||
break
|
||||
assert end > 0, "Unterminated JSON in stderr"
|
||||
|
||||
stats = json.loads(stderr_sub[:end])
|
||||
assert stats["input_entries"] > 0
|
||||
assert stats["pairs_generated"] > 0
|
||||
print("PASS: test_end_to_end_jsonl_output")
|
||||
|
||||
|
||||
def test_terse_length_constraint():
|
||||
"""Terse should be reasonably short for training."""
|
||||
# Sample facts from actual knowledge
|
||||
test_facts = [
|
||||
("deploy-crons.py leaves jobs in mixed model format", "pitfall", "hermes-agent"),
|
||||
("Cron jobs with blank fallback_model fields trigger warnings", "pitfall", "hermes-agent"),
|
||||
("Use the Gitea REST API when clone times out", "pattern", "devops"),
|
||||
]
|
||||
for fact, cat, domain in test_facts:
|
||||
terse = fact_to_terse(fact, cat, domain)
|
||||
assert len(terse) < 200, f"Terse too long ({len(terse)}): {terse}"
|
||||
print("PASS: test_terse_length_constraint")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_fact_to_terse_pitfall()
|
||||
test_fact_to_terse_fact()
|
||||
test_fact_to_terse_pattern()
|
||||
test_entry_to_pair_structure()
|
||||
test_filter_by_confidence()
|
||||
test_filter_by_model()
|
||||
test_filter_by_date()
|
||||
test_end_to_end_jsonl_output()
|
||||
test_terse_length_constraint()
|
||||
print("\nAll smoke tests passed.")
|
||||
Reference in New Issue
Block a user