diff --git a/scripts/knowledge_to_training_pairs.py b/scripts/knowledge_to_training_pairs.py
new file mode 100644
index 0000000..f692858
--- /dev/null
+++ b/scripts/knowledge_to_training_pairs.py
@@ -0,0 +1,267 @@
+#!/usr/bin/env python3
+"""
+knowledge_to_training_pairs.py — Convert quality-gated knowledge entries into training pairs.
+
+Reads knowledge/index.json (or a custom JSONL of entries), applies quality filters,
+and emits terse→rich training pairs in JSONL format for model fine-tuning.
+
+Usage:
+    python3 scripts/knowledge_to_training_pairs.py \
+        --input knowledge/index.json \
+        --output training_pairs.jsonl \
+        --min-confidence 0.7 \
+        --model-filter claude-sonnet,gpt-4 \
+        --after 2026-01-01
+
+Input entry format (from index.json facts):
+    {
+      "id": "hermes-agent:pitfall:001",
+      "fact": "deploy-crons.py leaves jobs in mixed model format",
+      "category": "pitfall",
+      "domain": "hermes-agent",
+      "confidence": 0.95,
+      ...
+    }
+
+Output training pair format:
+    {
+      "terse": "How do I handle deploy-crons.py mixed model format?",
+      "rich": "deploy-crons.py leaves jobs in mixed model format.",
+      "domain": "hermes-agent",
+      "source_confidence": 0.95,
+      "source_model": "unknown"
+    }
+"""
+
+import argparse
+import json
+import sys
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Optional
+
+
+def fact_to_terse(fact: str, category: str, domain: str) -> str:
+    """
+    Derive a short user query from a knowledge fact.
+
+    Strategy:
+      - Pitfalls → "How do I avoid/handle/fix <fact>?"
+      - Patterns → "What's the recommended way to <fact>?"
+      - Tool quirks → "How does <tool> behave in <context>?"
+      - Facts → "What should I know about <fact>?"
+      - Questions → "What is the answer to: <fact>?"
+    """
+    fact_lower = fact.lower()
+    # Concise excerpt: first sentence if one exists, otherwise the first 80 chars.
+    excerpt = fact.split('. ')[0] if '. ' in fact else fact[:80]
+
+    if category == "pitfall":
+        # Pick the verb based on the fact's wording.
+        if "trigger" in fact_lower or "cause" in fact_lower:
+            verb = "avoid"
+        elif "broken" in fact_lower or "fails" in fact_lower:
+            verb = "fix"
+        else:
+            verb = "handle"
+        return f"How do I {verb} {excerpt.rstrip('.')}?"
+    elif category == "pattern":
+        return f"What's the recommended way to {excerpt.rstrip('.')}?"
+    elif category == "tool-quirk":
+        # Best-effort tool name: first whitespace-delimited token of the fact.
+        tool = fact.split()[0] if fact.split() else domain
+        return f"How does {tool} behave in this context?"
+    elif category == "question":
+        return f"What is the answer to: {excerpt}?"
+    else:  # fact or unknown category
+        return f"What should I know about {excerpt.rstrip('.')}?"
+
+
+def parse_date(date_str: Optional[str]) -> Optional[datetime]:
+    """Parse an ISO date string into an aware UTC datetime, or return None.
+
+    Naive inputs (e.g. "2026-04-01") are assumed to be UTC so callers never
+    compare naive and aware datetimes (mixing the two raises TypeError).
+    """
+    if not date_str:
+        return None
+    try:
+        dt = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
+    except ValueError:
+        return None
+    if dt.tzinfo is None:
+        dt = dt.replace(tzinfo=timezone.utc)
+    return dt
+
+
+def load_knowledge_index(path: str) -> list[dict]:
+    """Load knowledge facts from index.json ({"facts": [...]}), a JSON array, or JSONL."""
+    p = Path(path)
+    if not p.exists():
+        print(f"ERROR: Knowledge input not found: {path}", file=sys.stderr)
+        sys.exit(1)
+
+    text = p.read_text(encoding="utf-8")
+    try:
+        data = json.loads(text)
+    except json.JSONDecodeError:
+        # JSONL fallback: one JSON entry per non-blank line.
+        try:
+            return [json.loads(line) for line in text.splitlines() if line.strip()]
+        except json.JSONDecodeError:
+            print(f"ERROR: Unrecognized input format in {path}", file=sys.stderr)
+            sys.exit(1)
+
+    # index.json format: {"facts": [...], ...}
+    if isinstance(data, dict) and "facts" in data:
+        return data["facts"]
+    # Plain JSON array of entries.
+    if isinstance(data, list):
+        return data
+    print(f"ERROR: Unrecognized input format in {path}", file=sys.stderr)
+    sys.exit(1)
+
+
+def filter_entries(entries: list[dict],
+                   min_confidence: float = 0.0,
+                   model_filter: Optional[list[str]] = None,
+                   after: Optional[datetime] = None,
+                   before: Optional[datetime] = None) -> list[dict]:
+    """Apply quality and provenance filters; returns the surviving entries."""
+    filtered = []
+    for entry in entries:
+        # Confidence filter (entry confidence).
+        conf = entry.get("confidence", 0.0)
+        if conf < min_confidence:
+            continue
+
+        # Model filter: if specified, entry's model must be in the list.
+        if model_filter:
+            entry_model = entry.get("model", entry.get("provenance", {}).get("model", "unknown"))
+            if entry_model not in model_filter:
+                continue
+
+        # Date filter: prefer last_confirmed, then first_seen, then harvested_at.
+        entry_date = None
+        for field in ("last_confirmed", "first_seen", "harvested_at"):
+            if field in entry:
+                entry_date = parse_date(entry[field])
+                if entry_date:
+                    break
+        if after and entry_date and entry_date < after:
+            continue
+        if before and entry_date and entry_date > before:
+            continue
+
+        filtered.append(entry)
+    return filtered
+
+
+def entry_to_pair(entry: dict) -> Optional[dict]:
+    """Convert a knowledge entry into a training pair, or None if it has no fact text."""
+    fact = entry.get("fact", "").strip()
+    if not fact:
+        return None
+
+    category = entry.get("category", "fact")
+    domain = entry.get("domain", "global")
+
+    terse = fact_to_terse(fact, category, domain)
+    rich = fact
+    source_confidence = round(entry.get("confidence", 0.0), 4)
+    source_model = entry.get("model", entry.get("provenance", {}).get("model", "unknown"))
+
+    return {
+        "terse": terse,
+        "rich": rich,
+        "domain": domain,
+        "source_confidence": source_confidence,
+        "source_model": source_model,
+    }
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Knowledge entries → training pairs")
+    parser.add_argument("--input", "-i", default="knowledge/index.json",
+                        help="Input knowledge index or JSONL (default: knowledge/index.json)")
+    parser.add_argument("--output", "-o", default="training_pairs.jsonl",
+                        help="Output JSONL file")
+    parser.add_argument("--min-confidence", type=float, default=0.5,
+                        help="Minimum entry confidence to include (0.0-1.0, default: 0.5)")
+    parser.add_argument("--model-filter",
+                        help="Comma-separated list of source models to include")
+    parser.add_argument("--after",
+                        help="Include entries last_confirmed/first_seen on or after this date (YYYY-MM-DD)")
+    parser.add_argument("--before",
+                        help="Include entries last_confirmed/first_seen on or before this date (YYYY-MM-DD)")
+    parser.add_argument("--dry-run", action="store_true",
+                        help="Print sample pairs and stats without writing")
+    args = parser.parse_args()
+
+    # Load
+    entries = load_knowledge_index(args.input)
+    print(f"Loaded {len(entries)} entries from {args.input}", file=sys.stderr)
+
+    # Parse filters
+    model_list = args.model_filter.split(",") if args.model_filter else None
+    after_dt = parse_date(args.after) if args.after else None
+    before_dt = parse_date(args.before) if args.before else None
+
+    # Filter
+    kept = filter_entries(
+        entries,
+        min_confidence=args.min_confidence,
+        model_filter=model_list,
+        after=after_dt,
+        before=before_dt,
+    )
+    print(f"After filtering: {len(kept)} / {len(entries)} entries", file=sys.stderr)
+
+    # Convert
+    pairs = []
+    for entry in kept:
+        pair = entry_to_pair(entry)
+        if pair:
+            pairs.append(pair)
+
+    # Stats
+    if pairs:
+        avg_conf = sum(p["source_confidence"] for p in pairs) / len(pairs)
+        domains = {}
+        models = {}
+        for p in pairs:
+            domains[p["domain"]] = domains.get(p["domain"], 0) + 1
+            models[p["source_model"]] = models.get(p["source_model"], 0) + 1
+    else:
+        avg_conf = 0.0
+        domains = {}
+        models = {}
+
+    stats = {
+        "input_entries": len(entries),
+        "after_filter": len(kept),
+        "pairs_generated": len(pairs),
+        "avg_confidence": round(avg_conf, 4),
+        "domains": domains,
+        "source_models": models,
+    }
+    print(json.dumps(stats, indent=2), file=sys.stderr)
+
+    if args.dry_run:
+        print("\nSample pairs:", file=sys.stderr)
+        for p in pairs[:3]:
+            print(json.dumps(p, ensure_ascii=False), file=sys.stderr)
+        return
+
+    # Write JSONL
+    out_path = Path(args.output)
+    out_path.parent.mkdir(parents=True, exist_ok=True)
+    with open(out_path, "w", encoding="utf-8") as f:
+        for pair in pairs:
+            f.write(json.dumps(pair, ensure_ascii=False) + "\n")
+
+    print(f"\nWrote {len(pairs)} training pairs to {out_path}", file=sys.stderr)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/test_knowledge_to_training_pairs.py b/tests/test_knowledge_to_training_pairs.py
new file mode 100644
index 0000000..6c72070
--- /dev/null
+++ b/tests/test_knowledge_to_training_pairs.py
@@ -0,0 +1,172 @@
+#!/usr/bin/env python3
+"""
+Smoke tests for knowledge_to_training_pairs.py
+
+Tests:
+  - Output is valid JSONL
+  - Each line has required fields (terse, rich, domain, source_confidence, source_model)
+  - Confidence values are in [0,1]
+  - Terse is non-empty and reasonably short (< 200 chars)
+  - Rich matches the original fact
+"""
+
+import json
+import sys
+from pathlib import Path
+
+# Add scripts dir to path for imports
+SCRIPT_DIR = Path(__file__).parent.parent / "scripts"
+sys.path.insert(0, str(SCRIPT_DIR))
+
+from knowledge_to_training_pairs import (
+    fact_to_terse,
+    filter_entries,
+    entry_to_pair,
+    parse_date,
+)
+
+
+def test_fact_to_terse_pitfall():
+    fact = "deploy-crons.py leaves jobs in mixed model format"
+    category = "pitfall"
+    domain = "hermes-agent"
+    terse = fact_to_terse(fact, category, domain)
+    assert terse.startswith("How do I")
+    assert "?" in terse
+    assert len(terse) < 150
+    print("PASS: test_fact_to_terse_pitfall")
+
+
+def test_fact_to_terse_fact():
+    fact = "Python is a high-level programming language"
+    terse = fact_to_terse(fact, "fact", "global")
+    assert terse.startswith("What should I know about")
+    assert "?" in terse
+    print("PASS: test_fact_to_terse_fact")
+
+
+def test_fact_to_terse_pattern():
+    fact = "Use sparse checkout for large repos"
+    terse = fact_to_terse(fact, "pattern", "devops")
+    assert "recommended way" in terse or "best way" in terse
+    print("PASS: test_fact_to_terse_pattern")
+
+
+def test_entry_to_pair_structure():
+    entry = {
+        "id": "test:001",
+        "fact": "Test fact text.",
+        "category": "fact",
+        "domain": "test-domain",
+        "confidence": 0.85,
+        "model": "test-model",
+    }
+    pair = entry_to_pair(entry)
+    assert pair is not None
+    assert "terse" in pair
+    assert "rich" in pair
+    assert "domain" in pair
+    assert "source_confidence" in pair
+    assert "source_model" in pair
+    assert pair["rich"] == "Test fact text."
+    assert pair["domain"] == "test-domain"
+    assert 0.0 <= pair["source_confidence"] <= 1.0
+    print("PASS: test_entry_to_pair_structure")
+
+
+def test_filter_by_confidence():
+    entries = [
+        {"fact": "A", "confidence": 0.9},
+        {"fact": "B", "confidence": 0.4},
+        {"fact": "C", "confidence": 0.6},
+    ]
+    filtered = filter_entries(entries, min_confidence=0.5)
+    assert len(filtered) == 2
+    assert all(e["confidence"] >= 0.5 for e in filtered)
+    print("PASS: test_filter_by_confidence")
+
+
+def test_filter_by_model():
+    entries = [
+        {"fact": "A", "model": "claude-sonnet"},
+        {"fact": "B", "model": "gpt-4"},
+        {"fact": "C", "model": "unknown"},
+    ]
+    filtered = filter_entries(entries, model_filter=["claude-sonnet", "gpt-4"])
+    assert len(filtered) == 2
+    assert all(e["model"] in ("claude-sonnet", "gpt-4") for e in filtered)
+    print("PASS: test_filter_by_model")
+
+
+def test_filter_by_date():
+    entries = [
+        {"fact": "A", "last_confirmed": "2026-04-10"},
+        {"fact": "B", "last_confirmed": "2026-03-01"},
+        {"fact": "C", "first_seen": "2026-04-15"},
+    ]
+    after_dt = parse_date("2026-04-01")
+    filtered = filter_entries(entries, after=after_dt)
+    assert len(filtered) == 2
+    print("PASS: test_filter_by_date")
+
+
+def test_end_to_end_jsonl_output():
+    """Integration test: run the script and verify JSONL validity."""
+    import subprocess
+
+    repo_dir = SCRIPT_DIR.parent
+    result = subprocess.run(
+        ["python3", "scripts/knowledge_to_training_pairs.py", "--dry-run"],
+        capture_output=True, text=True, cwd=repo_dir
+    )
+    assert result.returncode == 0
+    stderr = result.stderr.strip()
+
+    # The stats JSON object is somewhere in stderr (log lines precede it).
+    # Find its bounds via brace matching.
+    start = stderr.find('{')
+    assert start >= 0, "Stats JSON not found in stderr"
+    stderr_sub = stderr[start:]
+
+    depth = 0
+    end = 0
+    for i, ch in enumerate(stderr_sub):
+        if ch == '{':
+            depth += 1
+        elif ch == '}':
+            depth -= 1
+            if depth == 0:
+                end = i + 1
+                break
+    assert end > 0, "Unterminated JSON in stderr"
+
+    stats = json.loads(stderr_sub[:end])
+    assert stats["input_entries"] > 0
+    assert stats["pairs_generated"] > 0
+    print("PASS: test_end_to_end_jsonl_output")
+
+
+def test_terse_length_constraint():
+    """Terse should be reasonably short for training."""
+    # Sample facts from actual knowledge
+    test_facts = [
+        ("deploy-crons.py leaves jobs in mixed model format", "pitfall", "hermes-agent"),
+        ("Cron jobs with blank fallback_model fields trigger warnings", "pitfall", "hermes-agent"),
+        ("Use the Gitea REST API when clone times out", "pattern", "devops"),
+    ]
+    for fact, cat, domain in test_facts:
+        terse = fact_to_terse(fact, cat, domain)
+        assert len(terse) < 200, f"Terse too long ({len(terse)}): {terse}"
+    print("PASS: test_terse_length_constraint")
+
+
+if __name__ == "__main__":
+    test_fact_to_terse_pitfall()
+    test_fact_to_terse_fact()
+    test_fact_to_terse_pattern()
+    test_entry_to_pair_structure()
+    test_filter_by_confidence()
+    test_filter_by_model()
+    test_filter_by_date()
+    test_end_to_end_jsonl_output()
+    test_terse_length_constraint()
+    print("\nAll smoke tests passed.")