diff --git a/scripts/sampler.py b/scripts/sampler.py
new file mode 100644
index 0000000..114d71c
--- /dev/null
+++ b/scripts/sampler.py
@@ -0,0 +1,353 @@
+#!/usr/bin/env python3
+"""
+sampler.py — Score and rank sessions by harvest value.
+
+With 20k+ sessions on disk, we can't harvest them all at once. This script
+scores each session by how likely it is to contain valuable knowledge,
+so the harvester processes the best ones first.
+
+Scoring strategy:
+  - Recency: last 7d=3pts, last 30d=2pts, older=1pt
+  - Length: >50 messages=3pts, >20=2pts, otherwise=1pt
+  - Repo uniqueness: first session for a repo=5pts, otherwise=1pt
+  - Outcome: failure=3pts (most to learn), success=2pts, unknown=1pt
+  - Tool calls: >10 tool invocations=2pts (complex sessions)
+
+Usage:
+  python3 sampler.py --count 100                        # Top 100 sessions
+  python3 sampler.py --repo the-nexus --count 20        # Top 20 for a repo
+  python3 sampler.py --since 2026-04-01                 # All sessions since date
+  python3 sampler.py --count 50 --min-score 8           # Only high-value sessions
+  python3 sampler.py --count 100 --output sample.json   # Save to file
+"""
+
+import argparse
+import json
+import os
+import sys
+import time
+from datetime import datetime, timedelta, timezone
+from pathlib import Path
+from typing import Optional
+
+
+# --- Fast session scanning (no full parse) ---
+
+def scan_session_fast(path: str) -> dict:
+    """Extract scoring metadata from a session without parsing the full JSONL.
+
+    Reads only the first line and roughly the last 8KB of each file, and
+    estimates the line count for large files. This processes 20k sessions
+    in seconds instead of minutes.
+    """
+    meta = {
+        'path': path,
+        'message_count': 0,
+        'has_tool_calls': False,
+        'tool_call_count': 0,
+        'first_timestamp': '',
+        'last_timestamp': '',
+        'is_failure': False,
+        'repos_mentioned': set(),
+        'first_role': '',
+        'last_content_preview': '',
+    }
+
+    try:
+        file_size = os.path.getsize(path)
+        if file_size == 0:
+            return meta
+
+        with open(path, 'r', encoding='utf-8', errors='replace') as f:
+            # Read first line for timestamp + role
+            first_line = f.readline().strip()
+            if first_line:
+                try:
+                    first_msg = json.loads(first_line)
+                    meta['first_timestamp'] = first_msg.get('timestamp', '')
+                    meta['first_role'] = first_msg.get('role', '')
+                except json.JSONDecodeError:
+                    pass
+
+            # Fast line count + collect tail lines.
+            # For the tail, seek to near the end of the file.
+            tail_lines = []
+            line_count = 1  # already read the first line
+
+            if file_size > 8192:
+                # Seek to the last 8KB for tail sampling
+                f.seek(max(0, file_size - 8192))
+                f.readline()  # skip the (likely partial) line we landed in
+                for line in f:
+                    line = line.strip()
+                    if line:
+                        tail_lines.append(line)
+                        line_count += 1
+                # We lost the exact count for big files — estimate from file size
+                # (the average JSONL line is ~500 bytes)
+                if line_count < 100:
+                    line_count = max(line_count, file_size // 500)
+            else:
+                # Small file — read it all
+                for line in f:
+                    line = line.strip()
+                    if line:
+                        tail_lines.append(line)
+                        line_count += 1
+
+            meta['message_count'] = line_count
+
+            # Parse tail lines for outcome, tool calls, repos
+            for line in tail_lines[-30:]:  # last 30 non-empty lines
+                try:
+                    msg = json.loads(line)
+
+                    # Track last timestamp
+                    ts = msg.get('timestamp', '')
+                    if ts:
+                        meta['last_timestamp'] = ts
+
+                    # Count tool calls
+                    if msg.get('tool_calls'):
+                        meta['has_tool_calls'] = True
+                        meta['tool_call_count'] += len(msg['tool_calls'])
+
+                    # Detect failure signals in content
+                    content = ''
+                    if isinstance(msg.get('content'), str):
+                        content = msg['content'].lower()
+                    elif isinstance(msg.get('content'), list):
+                        for part in msg['content']:
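+                            # Structured content: assumed to be chat-API-style parts like
+                            # {'type': 'text', 'text': ...}; only the text parts feed the
+                            # failure-signal check below.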
+                            if isinstance(part, dict) and part.get('type') == 'text':
+                                content += part.get('text', '').lower()
+
+                    if content:
+                        meta['last_content_preview'] = content[:200]
+                        failure_signals = ['error', 'failed', 'cannot', 'unable',
+                                           'exception', 'traceback', 'rejected', 'denied']
+                        if any(sig in content for sig in failure_signals):
+                            meta['is_failure'] = True
+
+                    # Extract repo references from tool call arguments
+                    if msg.get('tool_calls'):
+                        for tc in msg['tool_calls']:
+                            args = tc.get('function', {}).get('arguments', '')
+                            if isinstance(args, str):
+                                # Look for repo patterns
+                                for pattern in ['Timmy_Foundation/', 'Rockachopa/',
+                                                'compounding-intelligence', 'the-nexus',
+                                                'timmy-home', 'hermes-agent',
+                                                'the-beacon', 'the-door']:
+                                    if pattern in args:
+                                        repo = pattern.rstrip('/')
+                                        meta['repos_mentioned'].add(repo)
+
+                except json.JSONDecodeError:
+                    continue
+
+    except (IOError, OSError):
+        pass
+
+    meta['repos_mentioned'] = list(meta['repos_mentioned'])
+    return meta
+
+
+# --- Filename timestamp parsing ---
+
+def parse_session_timestamp(filename: str) -> Optional[datetime]:
+    """Parse timestamp from session filename.
+
+    Common formats:
+        session_20260413_123456_hash.jsonl
+        20260413_123456_hash.jsonl
+    """
+    stem = Path(filename).stem
+    parts = stem.split('_')
+
+    # Try session_YYYYMMDD_HHMMSS format
+    for i, part in enumerate(parts):
+        if len(part) == 8 and part.isdigit():
+            date_part = part
+            time_part = parts[i + 1] if i + 1 < len(parts) and len(parts[i + 1]) == 6 else '000000'
+            try:
+                return datetime.strptime(f"{date_part}_{time_part}", '%Y%m%d_%H%M%S').replace(tzinfo=timezone.utc)
+            except ValueError:
+                continue
+
+    # No timestamp in the filename; the caller falls back to file mtime
+    return None
+
+
+# --- Scoring ---
+
+def score_session(meta: dict, now: datetime, seen_repos: set) -> tuple[int, dict]:
+    """Score a session for harvest value. Returns (score, breakdown)."""
+    score = 0
+    breakdown = {}
+
+    # 1. Recency
+    ts = parse_session_timestamp(os.path.basename(meta['path']))
+    if ts is None:
+        # Fallback to mtime
+        try:
+            ts = datetime.fromtimestamp(os.path.getmtime(meta['path']), tz=timezone.utc)
+        except OSError:
+            ts = now - timedelta(days=365)
+
+    age_days = (now - ts).days
+    if age_days <= 7:
+        recency = 3
+    elif age_days <= 30:
+        recency = 2
+    else:
+        recency = 1
+    score += recency
+    breakdown['recency'] = recency
+
+    # 2. Length
+    count = meta['message_count']
+    if count > 50:
+        length = 3
+    elif count > 20:
+        length = 2
+    else:
+        length = 1
+    score += length
+    breakdown['length'] = length
+
+    # 3. Repo uniqueness (first session mentioning a repo gets bonus)
+    repo_score = 0
+    for repo in meta.get('repos_mentioned', []):
+        if repo not in seen_repos:
+            seen_repos.add(repo)
+            repo_score = max(repo_score, 5)
+        else:
+            repo_score = max(repo_score, 1)
+    score += repo_score
+    breakdown['repo_unique'] = repo_score
+
+    # 4. Outcome
+    if meta.get('is_failure'):
+        outcome = 3
+    elif meta.get('last_content_preview', '').strip():
+        outcome = 2  # has some content = likely completed
+    else:
+        outcome = 1
+    score += outcome
+    breakdown['outcome'] = outcome
+
+    # 5. Tool calls
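+    # Flat +2 bonus: heavy tool use is a rough proxy for complex, multi-step sessions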
+    if meta.get('tool_call_count', 0) > 10:
+        tool = 2
+    else:
+        tool = 0
+    score += tool
+    breakdown['tool_calls'] = tool
+
+    return score, breakdown
+
+
+# --- Main ---
+
+def main():
+    parser = argparse.ArgumentParser(description="Score and rank sessions for harvesting")
+    parser.add_argument('--sessions-dir', default=os.path.expanduser('~/.hermes/sessions'),
+                        help='Directory containing session files')
+    parser.add_argument('--count', type=int, default=100, help='Number of top sessions to return')
+    parser.add_argument('--repo', default='', help='Filter to sessions mentioning this repo')
+    parser.add_argument('--since', default='', help='Only score sessions after this date (YYYY-MM-DD)')
+    parser.add_argument('--min-score', type=int, default=0, help='Minimum score threshold')
+    parser.add_argument('--output', default='', help='Output file (JSON). Default: stdout')
+    parser.add_argument('--format', choices=['json', 'paths', 'table'], default='table',
+                        help='Output format: json (full), paths (one per line), table (human)')
+    parser.add_argument('--top-percent', type=float, default=0, help='Return top N%% instead of --count')
+
+    args = parser.parse_args()
+
+    sessions_dir = Path(args.sessions_dir)
+    if not sessions_dir.is_dir():
+        print(f"ERROR: Sessions directory not found: {sessions_dir}", file=sys.stderr)
+        sys.exit(1)
+
+    # Find all JSONL files
+    print(f"Scanning {sessions_dir}...", file=sys.stderr)
+    t0 = time.time()
+
+    session_files = list(sessions_dir.glob('*.jsonl'))
+    total = len(session_files)
+    print(f"Found {total} session files", file=sys.stderr)
+
+    # Parse since date
+    since_dt = None
+    if args.since:
+        since_dt = datetime.strptime(args.since, '%Y-%m-%d').replace(tzinfo=timezone.utc)
+
+    # Score all sessions
+    now = datetime.now(timezone.utc)
+    seen_repos = set()  # Track repos for uniqueness scoring
+    scored = []
+
+    for i, sf in enumerate(session_files):
+        # Date filter (fast path: check filename first)
+        if since_dt:
+            ts = parse_session_timestamp(sf.name)
+            if ts and ts < since_dt:
+                continue
+
+        meta = scan_session_fast(str(sf))
+
+        # Repo filter
+        if args.repo:
+            repos = meta.get('repos_mentioned', [])
+            if args.repo.lower() not in [r.lower() for r in repos]:
+                # Also check filename
+                if args.repo.lower() not in sf.name.lower():
+                    continue
+
+        score, breakdown = score_session(meta, now, seen_repos)
+
+        if score >= args.min_score:
+            scored.append({
+                'path': str(sf),
+                'filename': sf.name,
+                'score': score,
+                'breakdown': breakdown,
+                'message_count': meta['message_count'],
+                'repos': meta['repos_mentioned'],
+                'is_failure': meta['is_failure'],
+            })
+
+        if (i + 1) % 5000 == 0:
+            elapsed = time.time() - t0
+            print(f"  Scanned {i + 1}/{total} ({elapsed:.1f}s)", file=sys.stderr)
+
+    elapsed = time.time() - t0
+    print(f"Scored {len(scored)} sessions in {elapsed:.1f}s", file=sys.stderr)
+
+    # Sort by score descending
+    scored.sort(key=lambda x: x['score'], reverse=True)
+
+    # Apply count or percent
+    if args.top_percent > 0:
+        count = max(1, int(len(scored) * args.top_percent / 100))
+    else:
+        count = args.count
+    scored = scored[:count]
+
+    # Output
+    if args.output:
+        with open(args.output, 'w', encoding='utf-8') as f:
+            json.dump(scored, f, indent=2)
+        print(f"Wrote {len(scored)} sessions to {args.output}", file=sys.stderr)
+    elif args.format == 'json':
+        json.dump(scored, sys.stdout, indent=2)
+    elif args.format == 'paths':
+        for s in scored:
+            print(s['path'])
+    else:  # table
+        print(f"{'SCORE':>5} {'MSGS':>5} {'REPOS':<25} {'FILE'}")
+        print(f"{'-'*5} {'-'*5} {'-'*25} {'-'*40}")
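+        # Rows go to stdout (progress messages go to stderr), so the table can be piped cleanly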
+        for s in scored:
+            repos = ', '.join(s['repos'][:2]) if s['repos'] else '-'
+            fail = ' FAIL' if s['is_failure'] else ''
+            print(f"{s['score']:>5} {s['message_count']:>5} {repos:<25} {s['filename'][:40]}{fail}")
+
+
+if __name__ == '__main__':
+    main()
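For context, a minimal sketch of how a downstream consumer might use the sampler's output, assuming the `--output sample.json` flow from the Usage section; `harvest_session()` is a hypothetical harvester entry point and not part of this diff:

    import json
    import subprocess

    # Rank the top 100 sessions and write them to sample.json
    # (flags and path match the diff above).
    subprocess.run(
        ['python3', 'scripts/sampler.py', '--count', '100', '--output', 'sample.json'],
        check=True,
    )

    with open('sample.json', encoding='utf-8') as f:
        ranked = json.load(f)  # list of dicts: path, filename, score, breakdown, ...

    for entry in ranked:
        harvest_session(entry['path'])  # hypothetical harvester entry point

Processing in this order means the harvester reaches recent, failure-heavy, and repo-unique sessions first, which is the point of the scoring scheme.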