#!/usr/bin/env python3
"""
Session Transcript → Training Pair Harvester

Scans Hermes session JSONL files for Q&A patterns and extracts
terse→rich training pairs. Outputs JSONL matching the timmy-config
training pairs spec.

Usage:
    python3 scripts/session_pair_harvester.py ~/.hermes/sessions/
    python3 scripts/session_pair_harvester.py session.jsonl --output pairs.jsonl
    python3 scripts/session_pair_harvester.py --dir ~/.hermes/sessions/ --min-ratio 2.0

Output format:
    {"terse": "user short prompt", "rich": "ai detailed response", "source": "session_id", "model": "..."}
"""

import argparse
import hashlib
import json
import re
import sys
from pathlib import Path
from typing import Optional

# Values of a message's "from" field that mark a model response.
_ASSISTANT_ROLES = ("gpt", "assistant")

# Prompt prefixes that indicate a system prompt recorded as a human turn.
_SYSTEM_PROMPT_PREFIXES = ("# SOUL.md", "You are")

# A complete fenced code block: opening fence, anything (lazily), closing fence.
_FENCED_CODE_RE = re.compile(r"```.*?```", re.DOTALL)


def compute_hash(text: str) -> str:
    """Return a 16-hex-character SHA-256 prefix of *text*, used for deduplication."""
    return hashlib.sha256(text.encode()).hexdigest()[:16]


def _message_text(msg: dict) -> str:
    """Return the message's "value" if it is a string, otherwise ""."""
    value = msg.get("value", "")
    return value if isinstance(value, str) else ""


def _preceding_human_prompt(conversations: list, index: int) -> str:
    """Return the text of the closest "human" message before *index*, or ""."""
    for prior in reversed(conversations[:index]):
        if isinstance(prior, dict) and prior.get("from") == "human":
            return _message_text(prior)
    return ""


def _prose_outside_code(text: str) -> str:
    """Return *text* with fenced code blocks and stray fence markers removed."""
    return _FENCED_CODE_RE.sub("", text).replace("```", "").strip()


def extract_pairs_from_session(session_data: dict, min_ratio: float = 1.5,
                               min_response_words: int = 20) -> list:
    """Extract terse→rich pairs from a single session object.

    Each assistant/gpt message is paired with the closest preceding human
    message. A pair is kept only when it passes every filter below.

    Args:
        session_data: Parsed session object. Reads the "conversations" list
            (messages with "from" and "value" keys), plus "id" and "model".
        min_ratio: Minimum response-words / prompt-words ratio.
        min_response_words: Minimum whitespace-separated words in a response.

    Returns:
        A list of dicts with keys "terse", "rich", "source", "model",
        "prompt_words", "response_words" and "ratio". Malformed input
        (non-dict session, non-list conversations, non-dict messages,
        non-string values) yields no pairs instead of raising.
    """
    if not isinstance(session_data, dict):
        return []
    conversations = session_data.get("conversations", [])
    if not isinstance(conversations, list):
        return []

    session_id = session_data.get("id", "unknown")
    model = session_data.get("model", "unknown")

    pairs = []
    seen_hashes = set()

    for i, msg in enumerate(conversations):
        # Only assistant/gpt messages can be the "rich" half of a pair.
        if not isinstance(msg, dict) or msg.get("from") not in _ASSISTANT_ROLES:
            continue

        response_text = _message_text(msg)
        response_words = len(response_text.split())
        if not response_text or response_words < min_response_words:
            continue

        prompt_text = _preceding_human_prompt(conversations, i)
        if not prompt_text:
            continue

        # The stored prompt is stripped, so evaluate the prefix filters on the
        # same text; otherwise leading whitespace would slip past them.
        prompt_head = prompt_text.lstrip()
        if prompt_head.startswith("{") and "output" in prompt_head[:100]:
            continue  # likely a tool result
        if prompt_head.startswith(_SYSTEM_PROMPT_PREFIXES):
            continue  # system prompt leak

        prompt_words = len(prompt_text.split())
        if prompt_words == 0 or response_words == 0:
            continue

        # Must have a meaningful length ratio.
        ratio = response_words / prompt_words
        if ratio < min_ratio:
            continue

        # Skip responses that are mostly code: two or more fenced blocks
        # (>= 4 fence markers) with under 50 characters of prose around them.
        if response_text.count("```") >= 4 and len(_prose_outside_code(response_text)) < 50:
            continue

        # Skip responses with tool call artifacts near the start.
        response_head = response_text[:100]
        if "tool_call" in response_head or "function_call" in response_head:
            continue

        # Deduplicate within the session by prompt + response prefix.
        content_hash = compute_hash(prompt_text + response_text[:200])
        if content_hash in seen_hashes:
            continue
        seen_hashes.add(content_hash)

        pairs.append({
            "terse": prompt_text.strip(),
            "rich": response_text.strip(),
            "source": session_id,
            "model": model,
            "prompt_words": prompt_words,
            "response_words": response_words,
            "ratio": round(ratio, 2),
        })

    return pairs


def extract_from_jsonl_file(filepath: str, **kwargs) -> list:
    """Extract pairs from a session JSONL file.

    The file is streamed line by line. Blank lines, lines that are not valid
    JSON, and JSON values that are not objects are skipped.

    Args:
        filepath: Path to a JSONL file with one session object per line.
        **kwargs: Forwarded to extract_pairs_from_session.

    Returns:
        All pairs found in the file. A missing file yields an empty list and
        a warning on stderr.
    """
    pairs = []
    path = Path(filepath)

    if not path.exists():
        print(f"Warning: {filepath} not found", file=sys.stderr)
        return pairs

    # utf-8-sig tolerates a leading BOM; errors="replace" keeps one bad byte
    # from aborting a whole directory scan.
    with path.open(encoding="utf-8-sig", errors="replace") as handle:
        for raw_line in handle:
            line = raw_line.strip()
            if not line:
                continue
            try:
                session = json.loads(line)
            except json.JSONDecodeError:
                continue
            if not isinstance(session, dict):
                continue  # valid JSON, but not a session object
            pairs.extend(extract_pairs_from_session(session, **kwargs))

    return pairs


def deduplicate_pairs(pairs: list) -> list:
    """Remove duplicate pairs across files, keeping the first occurrence.

    Two pairs are duplicates when their "terse" text and the first 200
    characters of their "rich" text hash to the same value.
    """
    seen = set()
    unique = []
    for pair in pairs:
        key = compute_hash(pair["terse"] + pair["rich"][:200])
        if key in seen:
            continue
        seen.add(key)
        unique.append(pair)
    return unique


def _mean(values: list) -> float:
    """Return the arithmetic mean of *values*, or 0 for an empty list."""
    return sum(values) / len(values) if values else 0


def main(argv: Optional[list] = None) -> None:
    """Command-line entry point.

    Args:
        argv: Argument list to parse; None means sys.argv[1:].

    Exits with status 1 when no input is given or the input path does not
    exist. The existence check runs before any output is written, so a
    mistyped path cannot truncate an existing output file.
    """
    parser = argparse.ArgumentParser(description="Harvest training pairs from session transcripts")
    parser.add_argument("input", nargs="?", help="Session JSONL file or directory")
    parser.add_argument("--dir", "-d", help="Directory to scan for session files")
    parser.add_argument("--output", "-o", default="harvested_pairs.jsonl", help="Output file")
    parser.add_argument("--min-ratio", type=float, default=1.5, help="Min response/prompt word ratio")
    parser.add_argument("--min-words", type=int, default=20, help="Min response word count")
    parser.add_argument("--dry-run", action="store_true", help="Print stats without writing")
    args = parser.parse_args(argv)

    scan_dir = args.dir or args.input
    if not scan_dir:
        parser.print_help()
        sys.exit(1)

    scan_path = Path(scan_dir)
    if not scan_path.exists():
        print(f"Error: {scan_dir} not found", file=sys.stderr)
        sys.exit(1)

    if scan_path.is_dir():
        # rglob can also match directories named *.jsonl; keep regular files only.
        jsonl_files = sorted(p for p in scan_path.rglob("*.jsonl") if p.is_file())
        print(f"Scanning {len(jsonl_files)} files in {scan_dir}...", file=sys.stderr)
    else:
        jsonl_files = [scan_path]

    all_pairs = []
    for fpath in jsonl_files:
        all_pairs.extend(
            extract_from_jsonl_file(
                str(fpath),
                min_ratio=args.min_ratio,
                min_response_words=args.min_words,
            )
        )
    files_scanned = len(jsonl_files)

    # Deduplicate across all files.
    unique_pairs = deduplicate_pairs(all_pairs)

    stats = {
        "files_scanned": files_scanned,
        "raw_pairs": len(all_pairs),
        "unique_pairs": len(unique_pairs),
        "duplicates_removed": len(all_pairs) - len(unique_pairs),
        "avg_prompt_words": round(_mean([p["prompt_words"] for p in unique_pairs]), 1),
        "avg_response_words": round(_mean([p["response_words"] for p in unique_pairs]), 1),
        "avg_ratio": round(_mean([p["ratio"] for p in unique_pairs]), 2),
    }

    print(json.dumps(stats, indent=2), file=sys.stderr)

    if args.dry_run:
        # Print sample pairs instead of writing the output file.
        for pair in unique_pairs[:3]:
            print(f"\n--- Source: {pair['source']} (ratio: {pair['ratio']}) ---", file=sys.stderr)
            print(f"TERSE: {pair['terse'][:100]}...", file=sys.stderr)
            print(f"RICH: {pair['rich'][:150]}...", file=sys.stderr)
        return

    # Write output, dropping the internal stats fields from each pair.
    output_path = Path(args.output)
    with open(output_path, "w", encoding="utf-8") as f:
        for pair in unique_pairs:
            output = {
                "terse": pair["terse"],
                "rich": pair["rich"],
                "source": pair["source"],
                "model": pair["model"],
            }
            f.write(json.dumps(output) + "\n")

    print(f"\nWrote {len(unique_pairs)} pairs to {output_path}", file=sys.stderr)


if __name__ == "__main__":
    main()