#!/usr/bin/env python3
"""
session_metadata.py - Extract structured metadata from Hermes session transcripts.

Works alongside session_reader.py to provide higher-level session analysis.
"""

import json
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any

# Import from session_reader (the canonical reader)
from session_reader import read_session


# Repo patterns to look for in message content, tried in order. Patterns with a
# capture group report the group; the first pattern reports the whole match.
# NOTE: inside a raw string the whitespace class is spelled "\s"; writing
# "\\s" would instead exclude a literal backslash and the letter "s".
_REPO_PATTERNS = [
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r"(?:the-nexus|compounding-intelligence|timmy-config|hermes-agent)",
        r"(?:forge\.alexanderwhitestone\.com/([^/]+/[^/\s]+))",
        r"(?:github\.com/([^/]+/[^/\s]+))",
        r"(?:Timmy_Foundation/([^/\s]+))",
    )
]

# Timestamp layouts understood when estimating the session duration.
_TIMESTAMP_FORMATS = (
    "%Y-%m-%dT%H:%M:%S.%fZ",
    "%Y-%m-%dT%H:%M:%SZ",
    "%Y-%m-%d %H:%M:%S",
)

# Substrings of the final assistant message that are taken to signal success.
_SUCCESS_INDICATORS = ("done", "completed", "success", "merged", "pushed")

# Reporting limits applied to the summary lists.
_MAX_ERROR_LENGTH = 200
_MAX_KEY_ACTIONS = 10
_MAX_ERRORS = 5


@dataclass
class SessionSummary:
    """Structured summary of a Hermes session transcript.

    Attributes:
        session_id: Stem of the transcript file name.
        model: Last non-empty ``model`` value seen on an assistant message,
            or ``"unknown"``.
        repo: Repository detected in message content, or ``"unknown"``.
        outcome: One of ``"success"``, ``"partial"``, ``"failure"`` or
            ``"unknown"``.
        message_count: Total number of transcript entries.
        tool_calls: Total number of tool calls across all entries.
        duration_estimate: Whole minutes between the first and last timestamp
            formatted like ``"12m"``, or ``"unknown"`` when it cannot be computed.
        key_actions: Distinct tool function names in first-seen order
            (at most 10).
        errors_encountered: Distinct tool error messages, each truncated to
            200 characters (at most 5).
        start_time: Timestamp of the first entry carrying one, as stored in
            the transcript.
        end_time: Timestamp of the last entry carrying one, as stored in the
            transcript.
        total_tokens_estimate: Whitespace-separated word count of all message
            content (a rough proxy, not a tokenizer count).
        user_messages: Number of entries with role ``"user"``.
        assistant_messages: Number of entries with role ``"assistant"``.
        tool_outputs: Number of entries with role ``"tool"``.
    """

    session_id: str
    model: str
    repo: str
    outcome: str
    message_count: int
    tool_calls: int
    duration_estimate: str
    key_actions: List[str]
    errors_encountered: List[str]
    start_time: Optional[str] = None
    end_time: Optional[str] = None
    total_tokens_estimate: int = 0
    user_messages: int = 0
    assistant_messages: int = 0
    tool_outputs: int = 0


def _count_words(content) -> int:
    """Return the whitespace-separated word count of a message's content.

    Handles plain strings and lists of ``{"text": ...}`` parts; anything else
    counts as zero.
    """
    if isinstance(content, str):
        return len(content.split())
    if isinstance(content, list):
        return sum(
            len(item["text"].split())
            for item in content
            if isinstance(item, dict) and isinstance(item.get("text"), str)
        )
    return 0


def _find_repo(text: str) -> Optional[str]:
    """Return the repo named by the first matching pattern, or None."""
    for pattern in _REPO_PATTERNS:
        match = pattern.search(text)
        if match:
            # Patterns with a capture group report just the captured path.
            return match.group(1) if match.groups() else match.group(0)
    return None


def _parse_timestamp(value) -> Optional[datetime]:
    """Parse *value* with the first matching known format, or return None."""
    for fmt in _TIMESTAMP_FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except (TypeError, ValueError):
            # TypeError: non-string timestamp; ValueError: layout mismatch.
            continue
    return None


def _estimate_duration(start_time, end_time) -> str:
    """Return the span between two timestamps as ``"<minutes>m"`` or ``"unknown"``."""
    if not (start_time and end_time):
        return "unknown"
    # Parse each end independently so the two may use different layouts.
    start_dt = _parse_timestamp(start_time)
    end_dt = _parse_timestamp(end_time)
    if start_dt is None or end_dt is None:
        return "unknown"
    minutes = (end_dt - start_dt).total_seconds() / 60
    return f"{minutes:.0f}m"


def _classify_outcome(messages, error_texts: List[str]) -> str:
    """Classify a session as success, partial, failure or unknown.

    Any tool error makes the session ``"failure"`` when one of them looks fatal
    (mentions 405, permission or authentication) and ``"partial"`` otherwise.
    Without errors, the session is ``"success"`` when the final message is an
    assistant string containing a success indicator.
    """
    if error_texts:
        fatal = any(
            "405" in text
            or "permission" in text.lower()
            or "authentication" in text.lower()
            for text in error_texts
        )
        return "failure" if fatal else "partial"

    if messages:
        last_msg = messages[-1]
        if last_msg.get("role") == "assistant":
            content = last_msg.get("content", "")
            if isinstance(content, str):
                lowered = content.lower()
                if any(indicator in lowered for indicator in _SUCCESS_INDICATORS):
                    return "success"
    return "unknown"


def extract_session_metadata(file_path: str) -> SessionSummary:
    """
    Extract structured metadata from a Hermes session JSONL transcript.

    Uses session_reader.read_session() for file reading.

    Args:
        file_path: Path to the session transcript.

    Returns:
        A SessionSummary. When the file does not exist, a summary with
        outcome ``"failure"`` and a single "File not found" error is returned
        instead of raising.
    """
    session_id = Path(file_path).stem

    try:
        # Use the canonical reader from session_reader.py
        messages = read_session(file_path)
    except FileNotFoundError:
        return SessionSummary(
            session_id=session_id,
            model="unknown",
            repo="unknown",
            outcome="failure",
            message_count=0,
            tool_calls=0,
            duration_estimate="0m",
            key_actions=[],
            errors_encountered=[f"File not found: {file_path}"],
        )

    model = "unknown"
    repo = "unknown"
    tool_calls_count = 0
    key_actions: List[str] = []
    error_texts: List[str] = []  # full, untruncated tool error messages
    start_time = None
    end_time = None
    total_tokens = 0

    for entry in messages:
        role = entry.get("role")

        # The last assistant message that names a model wins.
        if role == "assistant" and entry.get("model"):
            model = entry["model"]

        # First timestamp seen is the start; the latest seen is the end.
        timestamp = entry.get("timestamp")
        if timestamp:
            if start_time is None:
                start_time = timestamp
            end_time = timestamp

        tool_calls = entry.get("tool_calls")
        if tool_calls:
            tool_calls_count += len(tool_calls)
            for call in tool_calls:
                name = (call.get("function") or {}).get("name")
                if name:
                    action = str(name)
                    if action not in key_actions:
                        key_actions.append(action)

        content = entry.get("content", "")
        total_tokens += _count_words(content)

        # A later message that mentions a repo overrides an earlier one.
        if content:
            found = _find_repo(str(content))
            if found is not None:
                repo = found

        if role == "tool" and entry.get("is_error"):
            error_msg = entry.get("content", "Unknown error")
            # Keep long errors too: they are truncated for the report below
            # rather than dropped, so they still influence the outcome.
            if isinstance(error_msg, str):
                error_texts.append(error_msg)

    user_messages = sum(1 for m in messages if m.get("role") == "user")
    assistant_messages = sum(1 for m in messages if m.get("role") == "assistant")
    tool_outputs = sum(1 for m in messages if m.get("role") == "tool")

    # Classify on the full error text so a fatal marker late in a long
    # message (e.g. the last line of a traceback) is not lost to truncation.
    outcome = _classify_outcome(messages, error_texts)

    # key_actions is already unique; dict.fromkeys dedupes errors in order.
    unique_actions = key_actions[:_MAX_KEY_ACTIONS]
    unique_errors = list(
        dict.fromkeys(text[:_MAX_ERROR_LENGTH] for text in error_texts)
    )[:_MAX_ERRORS]

    return SessionSummary(
        session_id=session_id,
        model=model,
        repo=repo,
        outcome=outcome,
        message_count=len(messages),
        tool_calls=tool_calls_count,
        duration_estimate=_estimate_duration(start_time, end_time),
        key_actions=unique_actions,
        errors_encountered=unique_errors,
        start_time=start_time,
        end_time=end_time,
        total_tokens_estimate=total_tokens,
        user_messages=user_messages,
        assistant_messages=assistant_messages,
        tool_outputs=tool_outputs,
    )


def process_session_directory(directory_path: str, output_file: Optional[str] = None) -> List[SessionSummary]:
    """
    Process all JSONL files in a directory.

    Args:
        directory_path: Directory searched (non-recursively) for ``*.jsonl``.
        output_file: When given, the summaries are also written there as a
            JSON array.

    Returns:
        One SessionSummary per transcript, in sorted file order. An empty list
        is returned (with a message on stderr) when the directory is missing
        or contains no JSONL files.
    """
    directory = Path(directory_path)
    if not directory.exists():
        print(f"Error: Directory {directory_path} does not exist", file=sys.stderr)
        return []

    jsonl_files = sorted(directory.glob("*.jsonl"))
    if not jsonl_files:
        print(f"Warning: No JSONL files found in {directory_path}", file=sys.stderr)
        return []

    summaries = []
    for jsonl_file in jsonl_files:
        # Progress goes to stderr so stdout stays clean for JSON output.
        print(f"Processing {jsonl_file.name}...", file=sys.stderr)
        summaries.append(extract_session_metadata(str(jsonl_file)))

    if output_file:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump([asdict(s) for s in summaries], f, indent=2)
        print(f"Wrote {len(summaries)} summaries to {output_file}", file=sys.stderr)

    return summaries


def main():
    """CLI entry point.

    Summarizes a single transcript file or every ``*.jsonl`` file in a
    directory, writing JSON to ``--output`` or to stdout. Exits with status 1
    when the path is neither a file nor a directory.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Extract metadata from Hermes session JSONL transcripts")
    parser.add_argument("path", help="Path to JSONL file or directory of session files")
    parser.add_argument("-o", "--output", help="Output JSON file (default: stdout)")
    # NOTE: --verbose is accepted but currently has no effect.
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    args = parser.parse_args()

    path = Path(args.path)

    if path.is_file():
        summary = extract_session_metadata(str(path))
        if args.output:
            # Same explicit encoding as the directory code path.
            with open(args.output, 'w', encoding='utf-8') as f:
                json.dump(asdict(summary), f, indent=2)
            print(f"Wrote summary to {args.output}", file=sys.stderr)
        else:
            print(json.dumps(asdict(summary), indent=2))
    elif path.is_dir():
        # process_session_directory writes the output file itself when given.
        summaries = process_session_directory(str(path), args.output)
        if not args.output:
            print(json.dumps([asdict(s) for s in summaries], indent=2))
    else:
        print(f"Error: {args.path} is not a file or directory", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()