#!/usr/bin/env python3
"""
knowledge_to_training_pairs.py — Convert quality-gated knowledge entries into training pairs.

Reads knowledge/index.json (or a custom JSONL of entries), applies quality
filters, and emits terse→rich training pairs in JSONL format for model
fine-tuning.

Usage:
    python3 scripts/knowledge_to_training_pairs.py \
        --input knowledge/index.json \
        --output training_pairs.jsonl \
        --min-confidence 0.7 \
        --model-filter claude-sonnet,gpt-4 \
        --after 2026-01-01

Input entry format (from index.json facts):
    {
      "id": "hermes-agent:pitfall:001",
      "fact": "deploy-crons.py leaves jobs in mixed model format",
      "category": "pitfall",
      "domain": "hermes-agent",
      "confidence": 0.95,
      ...
    }

Output training pair format:
    {
      "terse": "How do I handle deploy-crons.py mixed model format?",
      "rich": "deploy-crons.py leaves jobs in mixed model format.",
      "domain": "hermes-agent",
      "source_confidence": 0.95,
      "source_model": "unknown"
    }
"""

import argparse
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional


def fact_to_terse(fact: str, category: str, domain: str) -> str:
    """
    Derive a short user query from a knowledge fact.

    Strategy:
    - Pitfalls    → "How do I avoid/handle/fix <excerpt>?"
    - Patterns    → "What's the recommended way to <excerpt>?"
    - Tool quirks → "How does <tool> behave in this context?"
    - Facts       → "What should I know about <excerpt>?"
    - Questions   → "What is the answer to: <excerpt>?"
    """
    fact_lower = fact.lower()

    # Extract a concise excerpt (first sentence, or the first 80 chars)
    excerpt = fact.split('. ')[0] if '. ' in fact else fact[:80]

    if category == "pitfall":
        # Pick the verb based on the fact's wording
        if "trigger" in fact_lower or "cause" in fact_lower:
            verb = "avoid"
        elif "broken" in fact_lower or "fails" in fact_lower:
            verb = "fix"
        else:
            verb = "handle"
        return f"How do I {verb} {excerpt.rstrip('.')}?"
    elif category == "pattern":
        return f"What's the recommended way to {excerpt.rstrip('.')}?"
    elif category == "tool-quirk":
        # Best-effort tool name: first word of the fact, falling back to the domain
        tool = fact.split()[0] if fact.split() else domain
        return f"How does {tool} behave in this context?"
    elif category == "question":
        return f"What is the answer to: {excerpt}?"
    else:
        # "fact" or unknown category
        return f"What should I know about {excerpt.rstrip('.')}?"
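
# Illustrative trace (not executed): the pitfall example from the module docstring,
# run through fact_to_terse. Its wording contains none of the "trigger"/"cause"/
# "broken"/"fails" cues, so the generic "handle" verb is chosen:
#
#   fact_to_terse(
#       "deploy-crons.py leaves jobs in mixed model format",
#       category="pitfall",
#       domain="hermes-agent",
#   )
#   # -> "How do I handle deploy-crons.py leaves jobs in mixed model format?"
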
def parse_date(date_str: Optional[str]) -> Optional[datetime]:
    """Parse an ISO date string to an aware UTC datetime, or return None."""
    if not date_str:
        return None
    try:
        dt = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
    except ValueError:
        return None
    # Normalize to UTC so date-only values compare safely against "Z"-suffixed timestamps
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    return dt


def load_knowledge_index(path: str) -> list[dict]:
    """Load knowledge facts from index.json, a plain JSON array, or JSONL."""
    p = Path(path)
    if not p.exists():
        print(f"ERROR: Knowledge input not found: {path}", file=sys.stderr)
        sys.exit(1)

    text = p.read_text(encoding="utf-8")

    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        # JSONL format: one entry per line
        return [json.loads(line) for line in text.splitlines() if line.strip()]

    # index.json format: {"facts": [...], ...}
    if isinstance(data, dict) and "facts" in data:
        return data["facts"]

    # Plain file with a JSON array of entries
    if isinstance(data, list):
        return data

    print(f"ERROR: Unrecognized input format in {path}", file=sys.stderr)
    sys.exit(1)


def filter_entries(entries: list[dict],
                   min_confidence: float = 0.0,
                   model_filter: Optional[list[str]] = None,
                   after: Optional[datetime] = None,
                   before: Optional[datetime] = None) -> list[dict]:
    """Apply quality and provenance filters."""
    filtered = []
    for entry in entries:
        # Confidence filter (entry confidence)
        conf = entry.get("confidence", 0.0)
        if conf < min_confidence:
            continue

        # Model filter: if specified, the entry's model must be in the list
        if model_filter:
            entry_model = entry.get("model", entry.get("provenance", {}).get("model", "unknown"))
            if entry_model not in model_filter:
                continue

        # Date filter: use last_confirmed, then first_seen, then harvested_at
        entry_date = None
        for field in ("last_confirmed", "first_seen", "harvested_at"):
            if field in entry:
                entry_date = parse_date(entry[field])
                if entry_date:
                    break
        if after and entry_date and entry_date < after:
            continue
        if before and entry_date and entry_date > before:
            continue

        filtered.append(entry)
    return filtered


def entry_to_pair(entry: dict) -> Optional[dict]:
    """Convert a knowledge entry into a training pair, or return None if it has no fact."""
    fact = entry.get("fact", "").strip()
    if not fact:
        return None

    category = entry.get("category", "fact")
    domain = entry.get("domain", "global")

    terse = fact_to_terse(fact, category, domain)
    rich = fact

    source_confidence = round(entry.get("confidence", 0.0), 4)
    source_model = entry.get("model", entry.get("provenance", {}).get("model", "unknown"))

    return {
        "terse": terse,
        "rich": rich,
        "domain": domain,
        "source_confidence": source_confidence,
        "source_model": source_model,
    }
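
# Sketch of entry_to_pair on the docstring's input entry. That entry carries no
# "model" or "provenance" key, so source_model falls back to "unknown":
#
#   entry_to_pair({
#       "id": "hermes-agent:pitfall:001",
#       "fact": "deploy-crons.py leaves jobs in mixed model format",
#       "category": "pitfall",
#       "domain": "hermes-agent",
#       "confidence": 0.95,
#   })
#   # -> {"terse": "How do I handle deploy-crons.py leaves jobs in mixed model format?",
#   #     "rich": "deploy-crons.py leaves jobs in mixed model format",
#   #     "domain": "hermes-agent", "source_confidence": 0.95, "source_model": "unknown"}
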
def main():
    parser = argparse.ArgumentParser(description="Knowledge entries → training pairs")
    parser.add_argument("--input", "-i", default="knowledge/index.json",
                        help="Input knowledge index or JSONL (default: knowledge/index.json)")
    parser.add_argument("--output", "-o", default="training_pairs.jsonl",
                        help="Output JSONL file")
    parser.add_argument("--min-confidence", type=float, default=0.5,
                        help="Minimum entry confidence to include (0.0-1.0, default: 0.5)")
    parser.add_argument("--model-filter",
                        help="Comma-separated list of source models to include")
    parser.add_argument("--after",
                        help="Include entries last_confirmed/first_seen on or after this date (YYYY-MM-DD)")
    parser.add_argument("--before",
                        help="Include entries last_confirmed/first_seen on or before this date (YYYY-MM-DD)")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print sample pairs and stats without writing")
    args = parser.parse_args()

    # Load
    entries = load_knowledge_index(args.input)
    print(f"Loaded {len(entries)} entries from {args.input}", file=sys.stderr)

    # Parse filters
    model_list = args.model_filter.split(",") if args.model_filter else None
    after_dt = parse_date(args.after) if args.after else None
    before_dt = parse_date(args.before) if args.before else None

    # Filter
    kept = filter_entries(
        entries,
        min_confidence=args.min_confidence,
        model_filter=model_list,
        after=after_dt,
        before=before_dt,
    )
    print(f"After filtering: {len(kept)} / {len(entries)} entries", file=sys.stderr)

    # Convert
    pairs = []
    for entry in kept:
        pair = entry_to_pair(entry)
        if pair:
            pairs.append(pair)

    # Stats
    if pairs:
        avg_conf = sum(p["source_confidence"] for p in pairs) / len(pairs)
        domains = {}
        models = {}
        for p in pairs:
            domains[p["domain"]] = domains.get(p["domain"], 0) + 1
            models[p["source_model"]] = models.get(p["source_model"], 0) + 1
    else:
        avg_conf = 0.0
        domains = {}
        models = {}

    stats = {
        "input_entries": len(entries),
        "after_filter": len(kept),
        "pairs_generated": len(pairs),
        "avg_confidence": round(avg_conf, 4),
        "domains": domains,
        "source_models": models,
    }
    print(json.dumps(stats, indent=2), file=sys.stderr)

    if args.dry_run:
        print("\nSample pairs:", file=sys.stderr)
        for p in pairs[:3]:
            print(json.dumps(p, ensure_ascii=False), file=sys.stderr)
        return

    # Write JSONL
    out_path = Path(args.output)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        for pair in pairs:
            f.write(json.dumps(pair, ensure_ascii=False) + "\n")

    print(f"\nWrote {len(pairs)} training pairs to {out_path}", file=sys.stderr)


if __name__ == "__main__":
    main()