#!/usr/bin/env python3
"""
Hermes Session JSONL Transcript Parser

Parses JSONL session transcripts and extracts structured data.
Part of the compounding-intelligence harvester pipeline.
"""

import json
import re
import sys
import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, asdict

# Repo patterns, tried in order; the first pattern that matches wins.
# Patterns with a capture group yield the group, the others the whole match.
# Note: inside a raw string the whitespace class is written ``\s``; writing
# ``\\s`` would instead exclude a literal backslash and the letter "s".
_REPO_PATTERNS = [
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r"(?:the-nexus|compounding-intelligence|timmy-config|hermes-agent)",
        r"(?:forge\.alexanderwhitestone\.com/([^/]+/[^/\s]+))",
        r"(?:github\.com/([^/]+/[^/\s]+))",
        r"(?:Timmy_Foundation/([^/\s]+))",
    )
]

# Timestamp layouts understood when estimating the session duration.
_TIMESTAMP_FORMATS = (
    "%Y-%m-%dT%H:%M:%S.%fZ",
    "%Y-%m-%dT%H:%M:%SZ",
    "%Y-%m-%d %H:%M:%S",
)

# Substrings of the final assistant message that mark a successful session.
_SUCCESS_INDICATORS = ("done", "completed", "success", "merged", "pushed")

_MAX_ERROR_LENGTH = 200  # recorded tool errors are truncated to this length
_MAX_KEY_ACTIONS = 10    # cap on distinct tool names reported
_MAX_ERRORS = 5          # cap on distinct errors reported


@dataclass
class SessionSummary:
    """Structured summary of a Hermes session transcript."""
    session_id: str                # file name of the transcript without suffix
    model: str                     # last model seen on an assistant message
    repo: str                      # last repo mention found in message content
    outcome: str                   # "success", "partial", "failure" or "unknown"
    message_count: int             # number of JSON objects parsed from the file
    tool_calls: int                # total number of tool call entries
    duration_estimate: str         # "<minutes>m", "0m" or "unknown"
    key_actions: List[str]         # distinct tool names, first-seen order
    errors_encountered: List[str]  # distinct parse/tool errors, first-seen order
    start_time: Optional[str] = None
    end_time: Optional[str] = None
    total_tokens_estimate: int = 0  # whitespace-separated word count
    user_messages: int = 0
    assistant_messages: int = 0
    tool_outputs: int = 0


def _parse_timestamp(value: Any) -> Optional[datetime]:
    """Return *value* parsed with the first matching known format, else None."""
    if not isinstance(value, str):
        return None
    for fmt in _TIMESTAMP_FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    return None


def _estimate_duration(start_time: Any, end_time: Any) -> str:
    """Return the span between two timestamps as ``"<minutes>m"`` or ``"unknown"``."""
    if not (start_time and end_time):
        return "unknown"
    # Each timestamp is parsed on its own so that a session whose first and
    # last entries use different layouts still gets a duration.
    start_dt = _parse_timestamp(start_time)
    end_dt = _parse_timestamp(end_time)
    if start_dt is None or end_dt is None:
        return "unknown"
    minutes = (end_dt - start_dt).total_seconds() / 60
    return f"{minutes:.0f}m"


def _estimate_tokens(content: Any) -> int:
    """Return a word-count token estimate for a message ``content`` value."""
    if isinstance(content, str):
        return len(content.split())
    if isinstance(content, list):
        return sum(
            len(item["text"].split())
            for item in content
            if isinstance(item, dict) and isinstance(item.get("text"), str)
        )
    return 0


def _detect_repo(text: str) -> Optional[str]:
    """Return the repo named in *text* by the first matching pattern, if any."""
    for pattern in _REPO_PATTERNS:
        match = pattern.search(text)
        if match:
            return match.group(1) if match.groups() else match.group(0)
    return None


def _tool_names(tool_calls: List[Any]) -> List[str]:
    """Return the function names of well-formed entries in *tool_calls*."""
    names = []
    for call in tool_calls:
        if not isinstance(call, dict):
            continue
        function = call.get("function")
        if isinstance(function, dict) and function.get("name"):
            names.append(str(function["name"]))
    return names


def _classify_outcome(errors: List[str], messages: List[Dict[str, Any]]) -> str:
    """Classify a session as success / partial / failure / unknown."""
    if errors:
        fatal = any(
            "405" in err
            or "permission" in err.lower()
            or "authentication" in err.lower()
            for err in errors
        )
        return "failure" if fatal else "partial"

    if messages:
        last_msg = messages[-1]
        if last_msg.get("role") == "assistant":
            content = last_msg.get("content", "")
            if isinstance(content, str):
                lowered = content.lower()
                if any(word in lowered for word in _SUCCESS_INDICATORS):
                    return "success"
    return "unknown"


def parse_jsonl_session(file_path: str) -> SessionSummary:
    """
    Parse a Hermes session JSONL transcript and extract structured data.

    Blank lines are ignored. Lines that are not valid JSON, or that are valid
    JSON but not an object, are recorded in ``errors_encountered`` and skipped.

    Args:
        file_path: Path to the JSONL session file

    Returns:
        SessionSummary with extracted data. If the file does not exist, a
        summary with outcome "failure" and a "File not found" error is
        returned instead of raising.
    """
    session_id = Path(file_path).stem
    messages: List[Dict[str, Any]] = []
    model = "unknown"
    repo = "unknown"
    tool_calls_count = 0
    key_actions: List[str] = []
    errors: List[str] = []
    start_time = None
    end_time = None
    total_tokens = 0

    try:
        with open(file_path, "r", encoding="utf-8") as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue

                try:
                    entry = json.loads(line)
                except json.JSONDecodeError as e:
                    errors.append(f"Line {line_num}: Invalid JSON - {e}")
                    continue

                # Scalars and arrays are valid JSON but not messages.
                if not isinstance(entry, dict):
                    errors.append(
                        f"Line {line_num}: Expected JSON object, "
                        f"got {type(entry).__name__}"
                    )
                    continue

                messages.append(entry)

                # Extract model from assistant messages (last one wins)
                if entry.get("role") == "assistant" and entry.get("model"):
                    model = entry["model"]

                # First timestamp seen is the start, last one seen is the end
                ts = entry.get("timestamp")
                if ts:
                    if start_time is None:
                        start_time = ts
                    end_time = ts

                # Count tool calls and collect the tool names
                tool_calls = entry.get("tool_calls")
                if isinstance(tool_calls, list) and tool_calls:
                    tool_calls_count += len(tool_calls)
                    key_actions.extend(_tool_names(tool_calls))

                # Estimate tokens from content length
                content = entry.get("content", "")
                total_tokens += _estimate_tokens(content)

                # Look for repo mentions in content (last mention wins)
                if content:
                    found = _detect_repo(str(content))
                    if found is not None:
                        repo = found

                # Record tool errors; long messages are truncated, not dropped,
                # so they still count towards the outcome classification.
                if entry.get("role") == "tool" and entry.get("is_error"):
                    error_msg = entry.get("content", "Unknown error")
                    if isinstance(error_msg, str):
                        errors.append(error_msg[:_MAX_ERROR_LENGTH])
    except FileNotFoundError:
        return SessionSummary(
            session_id=session_id,
            model="unknown",
            repo="unknown",
            outcome="failure",
            message_count=0,
            tool_calls=0,
            duration_estimate="0m",
            key_actions=[],
            errors_encountered=[f"File not found: {file_path}"],
        )

    # Count message types
    roles = [m.get("role") for m in messages]

    # The outcome is classified on every recorded error, before capping.
    outcome = _classify_outcome(errors, messages)

    # dict.fromkeys keeps first-seen order while dropping duplicates.
    unique_actions = list(dict.fromkeys(key_actions))[:_MAX_KEY_ACTIONS]
    unique_errors = list(dict.fromkeys(errors))[:_MAX_ERRORS]

    return SessionSummary(
        session_id=session_id,
        model=model,
        repo=repo,
        outcome=outcome,
        message_count=len(messages),
        tool_calls=tool_calls_count,
        duration_estimate=_estimate_duration(start_time, end_time),
        key_actions=unique_actions,
        errors_encountered=unique_errors,
        start_time=start_time,
        end_time=end_time,
        total_tokens_estimate=total_tokens,
        user_messages=roles.count("user"),
        assistant_messages=roles.count("assistant"),
        tool_outputs=roles.count("tool"),
    )


def process_session_directory(directory_path: str, output_file: Optional[str] = None) -> List[SessionSummary]:
    """
    Process all JSONL files in a directory.

    Only files matching ``session_*.jsonl`` directly inside the directory are
    read, in sorted order. Progress and problems are reported on stderr.

    Args:
        directory_path: Path to directory containing session JSONL files
        output_file: Optional path to write JSON output

    Returns:
        List of SessionSummary objects (empty if the directory is missing or
        contains no matching files)
    """
    directory = Path(directory_path)
    if not directory.exists():
        print(f"Error: Directory {directory_path} does not exist", file=sys.stderr)
        return []

    jsonl_files = list(directory.glob("session_*.jsonl"))
    if not jsonl_files:
        print(f"Warning: No session_*.jsonl files found in {directory_path}", file=sys.stderr)
        return []

    summaries = []
    for jsonl_file in sorted(jsonl_files):
        print(f"Processing {jsonl_file.name}...", file=sys.stderr)
        summaries.append(parse_jsonl_session(str(jsonl_file)))

    if output_file:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump([asdict(s) for s in summaries], f, indent=2)
        print(f"Wrote {len(summaries)} summaries to {output_file}", file=sys.stderr)

    return summaries


def main():
    """CLI entry point."""
    import argparse

    parser = argparse.ArgumentParser(description="Parse Hermes session JSONL transcripts")
    parser.add_argument("path", help="Path to JSONL file or directory of session files")
    parser.add_argument("-o", "--output", help="Output JSON file (default: stdout)")
    # NOTE: --verbose is accepted but nothing below reads it yet.
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")

    args = parser.parse_args()

    path = Path(args.path)

    if path.is_file():
        summary = parse_jsonl_session(str(path))
        if args.output:
            # Same encoding as the directory path so output does not depend
            # on the platform's default locale.
            with open(args.output, 'w', encoding='utf-8') as f:
                json.dump(asdict(summary), f, indent=2)
            print(f"Wrote summary to {args.output}", file=sys.stderr)
        else:
            print(json.dumps(asdict(summary), indent=2))
    elif path.is_dir():
        summaries = process_session_directory(str(path), args.output)
        if not args.output:
            print(json.dumps([asdict(s) for s in summaries], indent=2))
    else:
        print(f"Error: {args.path} is not a file or directory", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()