#!/usr/bin/env python3
"""
session_metadata.py - Extract structured metadata from Hermes session transcripts.
Works alongside session_reader.py to provide higher-level session analysis.
"""

import json
import re
import sys
from dataclasses import dataclass, asdict
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any

# Import from session_reader (the canonical reader)
from session_reader import read_session

# Repo-detection patterns, compiled once at import time. Raw strings avoid
# the invalid "\." escape sequences (SyntaxWarning on modern Python) that
# plain string literals would produce.
_REPO_PATTERNS = [
    re.compile(p, re.IGNORECASE)
    for p in (
        r"(?:the-nexus|compounding-intelligence|timmy-config|hermes-agent)",
        r"forge\.alexanderwhitestone\.com/([^/]+/[^/\s]+)",
        r"github\.com/([^/]+/[^/\s]+)",
        r"Timmy_Foundation/([^/\s]+)",
    )
]

# Timestamp layouts observed in transcripts; tried in order.
_TIMESTAMP_FORMATS = (
    "%Y-%m-%dT%H:%M:%S.%fZ",
    "%Y-%m-%dT%H:%M:%SZ",
    "%Y-%m-%d %H:%M:%S",
)


@dataclass
class SessionSummary:
    """Structured summary of a Hermes session transcript."""
    session_id: str
    model: str
    repo: str
    outcome: str                      # "success" | "partial" | "failure" | "unknown"
    message_count: int
    tool_calls: int
    duration_estimate: str            # e.g. "12m"; "0m" on read failure, else "unknown"
    key_actions: List[str]            # distinct tool-function names, capped at 10
    errors_encountered: List[str]     # distinct error snippets, capped at 5
    start_time: Optional[str] = None
    end_time: Optional[str] = None
    total_tokens_estimate: int = 0    # whitespace-token count, not real model tokens
    user_messages: int = 0
    assistant_messages: int = 0
    tool_outputs: int = 0


def _parse_timestamp(ts: str) -> Optional[datetime]:
    """Parse *ts* against each known format; return None if none match.

    Each value is parsed independently. (The previous inline loop coupled
    start/end parsing: a format that raised on the start time was never
    attempted for the end time within the same iteration.)
    """
    for fmt in _TIMESTAMP_FORMATS:
        try:
            return datetime.strptime(ts, fmt)
        except ValueError:
            continue
    return None


def _unique_prefix(items: List[str], limit: int) -> List[str]:
    """Return up to *limit* items in first-seen order, dropping duplicates."""
    unique: List[str] = []
    for item in items:
        if item not in unique:
            unique.append(item)
            if len(unique) >= limit:
                break
    return unique


def extract_session_metadata(file_path: str) -> SessionSummary:
    """
    Extract structured metadata from a Hermes session JSONL transcript.

    Uses session_reader.read_session() for file reading.

    Args:
        file_path: Path to a .jsonl transcript file.

    Returns:
        A SessionSummary. A missing file yields an outcome="failure"
        summary rather than raising.
    """
    session_id = Path(file_path).stem
    model = "unknown"
    repo = "unknown"
    tool_calls_count = 0
    key_actions: List[str] = []
    errors: List[str] = []
    start_time: Optional[str] = None
    end_time: Optional[str] = None
    total_tokens = 0

    try:
        # Use the canonical reader from session_reader.py
        messages = read_session(file_path)
    except FileNotFoundError:
        return SessionSummary(
            session_id=session_id,
            model="unknown",
            repo="unknown",
            outcome="failure",
            message_count=0,
            tool_calls=0,
            duration_estimate="0m",
            key_actions=[],
            errors_encountered=[f"File not found: {file_path}"],
        )

    # Process messages for metadata.
    for entry in messages:
        # Model name is only recorded on assistant messages.
        if entry.get("role") == "assistant" and entry.get("model"):
            model = entry["model"]

        # First timestamp seen becomes the start; the latest seen, the end.
        if entry.get("timestamp"):
            ts = entry["timestamp"]
            if start_time is None:
                start_time = ts
            end_time = ts

        # Count tool calls and record each distinct function name.
        for tc in entry.get("tool_calls") or []:
            tool_calls_count += 1
            name = tc.get("function", {}).get("name")
            if name and name not in key_actions:
                key_actions.append(name)

        # Estimate tokens from whitespace-separated words in the content.
        content = entry.get("content", "")
        if isinstance(content, str):
            total_tokens += len(content.split())
        elif isinstance(content, list):
            for item in content:
                if isinstance(item, dict) and "text" in item:
                    total_tokens += len(item["text"].split())

        # Look for repo mentions in content (last match across messages wins).
        if entry.get("content"):
            content_str = str(entry["content"])
            for pattern in _REPO_PATTERNS:
                match = pattern.search(content_str)
                if match:
                    repo = match.group(1) if match.groups() else match.group(0)
                    break

        # Collect error messages from failed tool outputs.
        if entry.get("role") == "tool" and entry.get("is_error"):
            error_msg = entry.get("content", "Unknown error")
            if isinstance(error_msg, str):
                # Truncate long errors to 200 chars. The old guard
                # (`len < 200`) silently dropped anything at or above
                # the limit instead of truncating it.
                errors.append(error_msg[:200])

    # Count message types.
    user_messages = sum(1 for m in messages if m.get("role") == "user")
    assistant_messages = sum(1 for m in messages if m.get("role") == "assistant")
    tool_outputs = sum(1 for m in messages if m.get("role") == "tool")

    # Calculate duration estimate from the first/last parseable timestamps.
    duration_estimate = "unknown"
    if start_time and end_time:
        start_dt = _parse_timestamp(start_time)
        end_dt = _parse_timestamp(end_time)
        if start_dt and end_dt:
            minutes = (end_dt - start_dt).total_seconds() / 60
            duration_estimate = f"{minutes:.0f}m"

    # Classify outcome: errors dominate; otherwise look at the final
    # assistant message for success indicators.
    outcome = "unknown"
    if errors:
        # HTTP-405 / permission / authentication errors are treated as fatal.
        fatal_errors = any(
            "405" in e or "permission" in e.lower() or "authentication" in e.lower()
            for e in errors
        )
        outcome = "failure" if fatal_errors else "partial"
    elif messages:
        last_msg = messages[-1]
        if last_msg.get("role") == "assistant":
            content = last_msg.get("content", "")
            if isinstance(content, str):
                success_indicators = ("done", "completed", "success", "merged", "pushed")
                if any(word in content.lower() for word in success_indicators):
                    outcome = "success"

    return SessionSummary(
        session_id=session_id,
        model=model,
        repo=repo,
        outcome=outcome,
        message_count=len(messages),
        tool_calls=tool_calls_count,
        duration_estimate=duration_estimate,
        key_actions=_unique_prefix(key_actions, 10),
        errors_encountered=_unique_prefix(errors, 5),
        start_time=start_time,
        end_time=end_time,
        total_tokens_estimate=total_tokens,
        user_messages=user_messages,
        assistant_messages=assistant_messages,
        tool_outputs=tool_outputs,
    )


def process_session_directory(directory_path: str, output_file: Optional[str] = None) -> List[SessionSummary]:
    """
    Process all JSONL files in a directory.

    Args:
        directory_path: Directory to scan for *.jsonl transcripts.
        output_file: Optional path; when given, all summaries are written
            there as a JSON array.

    Returns:
        List of SessionSummary objects, in sorted filename order
        (empty if the directory is missing or holds no JSONL files).
    """
    directory = Path(directory_path)
    if not directory.exists():
        print(f"Error: Directory {directory_path} does not exist", file=sys.stderr)
        return []

    jsonl_files = list(directory.glob("*.jsonl"))
    if not jsonl_files:
        print(f"Warning: No JSONL files found in {directory_path}", file=sys.stderr)
        return []

    summaries = []
    for jsonl_file in sorted(jsonl_files):
        print(f"Processing {jsonl_file.name}...", file=sys.stderr)
        summaries.append(extract_session_metadata(str(jsonl_file)))

    if output_file:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump([asdict(s) for s in summaries], f, indent=2)
        print(f"Wrote {len(summaries)} summaries to {output_file}", file=sys.stderr)

    return summaries


def main():
    """CLI entry point: summarize a single transcript or a directory of them."""
    import argparse

    parser = argparse.ArgumentParser(description="Extract metadata from Hermes session JSONL transcripts")
    parser.add_argument("path", help="Path to JSONL file or directory of session files")
    parser.add_argument("-o", "--output", help="Output JSON file (default: stdout)")
    # NOTE(review): -v is accepted for interface compatibility but currently unused.
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")

    args = parser.parse_args()

    path = Path(args.path)

    if path.is_file():
        summary = extract_session_metadata(str(path))
        if args.output:
            # encoding added for consistency with process_session_directory().
            with open(args.output, 'w', encoding='utf-8') as f:
                json.dump(asdict(summary), f, indent=2)
            print(f"Wrote summary to {args.output}", file=sys.stderr)
        else:
            print(json.dumps(asdict(summary), indent=2))

    elif path.is_dir():
        summaries = process_session_directory(str(path), args.output)
        if not args.output:
            print(json.dumps([asdict(s) for s in summaries], indent=2))

    else:
        print(f"Error: {args.path} is not a file or directory", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()