Main harvester module that chains: session_reader → extraction prompt → LLM → validate → deduplicate → store Includes: - scripts/harvester.py — main module (reader + prompt + storage pipeline) - scripts/session_reader.py — JSONL transcript parser - scripts/test_harvester_pipeline.py — smoke tests (all passing) Pipeline: 1. Read session JSONL via session_reader 2. Truncate long sessions (first 50 + last 50 messages) 3. Send transcript + extraction prompt to LLM (mimo-v2-pro) 4. Parse structured JSON response (facts/pitfalls/patterns/quirks/questions) 5. Validate fields + confidence threshold 6. Deduplicate against knowledge/index.json (fingerprint + word overlap) 7. Write to knowledge store (index.json + per-repo markdown) CLI: Single: python3 harvester.py --session <path> --output knowledge/ Batch: python3 harvester.py --batch --since 2026-04-01 --limit 100 Dry-run: python3 harvester.py --session <path> --dry-run
143 lines · 4.8 KiB · Python
#!/usr/bin/env python3
|
|
"""
|
|
session_reader.py — Parse Hermes session JSONL transcripts.
|
|
|
|
Each line in a session file is a JSON object representing a message.
|
|
Standard fields: role (user|assistant|system), content (str), timestamp (str).
|
|
Tool calls and tool results are also captured.
|
|
"""
|
|
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Iterator, Optional
|
|
|
|
|
|
def read_session(path: str) -> list[dict]:
    """Load every message from a session JSONL file into a list.

    Blank lines are ignored. Lines that fail to parse as JSON are
    reported on stderr (with their 1-based line number) and skipped,
    so one corrupt record never aborts the whole read.
    """
    parsed: list[dict] = []
    with open(path, 'r', encoding='utf-8') as handle:
        for line_num, raw in enumerate(handle, 1):
            candidate = raw.strip()
            if not candidate:
                continue
            try:
                parsed.append(json.loads(candidate))
            except json.JSONDecodeError as e:
                print(f"WARNING: Skipping malformed JSON at line {line_num}: {e}", file=sys.stderr)
    return parsed
|
|
|
|
|
|
def read_session_iter(path: str) -> Iterator[dict]:
    """Lazily yield messages from a session JSONL file.

    Streaming counterpart of read_session(): one message is parsed per
    line as the caller advances the iterator, so the whole transcript is
    never held in memory. Blank lines are ignored; malformed lines are
    logged to stderr and skipped.
    """
    with open(path, 'r', encoding='utf-8') as handle:
        for line_num, raw in enumerate(handle, 1):
            candidate = raw.strip()
            if not candidate:
                continue
            try:
                message = json.loads(candidate)
            except json.JSONDecodeError as e:
                print(f"WARNING: Skipping malformed JSON at line {line_num}: {e}", file=sys.stderr)
            else:
                yield message
|
|
|
|
|
|
def extract_conversation(messages: list[dict]) -> list[dict]:
    """Return only the user/assistant/system turns that carry text.

    Messages with any other role (e.g. tool results) are dropped, as are
    string-content messages that are empty after stripping. Multimodal
    list-style content is flattened to just its 'text'-typed parts,
    joined with newlines; such a message is kept whenever at least one
    text part exists (even if that part is empty, matching the original
    transcript faithfully).
    """
    conversation = []
    for msg in messages:
        role = msg.get('role', '')
        # Skip anything outside the three standard conversational roles.
        if role not in ('user', 'assistant', 'system'):
            continue
        content = msg.get('content', '')
        if isinstance(content, str):
            text = content.strip()
            if not text:
                continue  # empty turn — nothing to extract
        elif isinstance(content, list):
            # Multimodal content: pull out the text segments only.
            parts = [seg.get('text', '') for seg in content
                     if isinstance(seg, dict) and seg.get('type') == 'text']
            if not parts:
                continue  # no text parts at all (e.g. image-only)
            text = '\n'.join(parts)
        else:
            continue  # unrecognized content shape
        conversation.append({
            'role': role,
            'content': text,
            'timestamp': msg.get('timestamp', ''),
        })
    return conversation
|
|
|
|
|
|
def truncate_for_context(messages: list[dict], head: int = 50, tail: int = 50) -> list[dict]:
    """Truncate long sessions: keep first `head` + last `tail` messages.

    This preserves session start (initial context) and end (final results),
    skipping the messy middle of long debugging sessions. A synthetic
    system message marking how many turns were dropped is inserted at the
    cut point.

    Returns the original list unchanged when it already fits within
    head + tail; otherwise returns a NEW list (the input is not mutated).

    Bug fix: the previous `messages[-tail:]` suffix slice broke for
    tail == 0 — `messages[-0:]` is the WHOLE list, so every message was
    duplicated after the truncation marker.
    """
    if len(messages) <= head + tail:
        return messages

    omitted = len(messages) - head - tail
    truncated = messages[:head]
    truncated.append({
        'role': 'system',
        'content': f'[{omitted} messages truncated]',
        'timestamp': ''
    })
    # Slice by positive start index so tail == 0 yields an empty suffix
    # (a negative-index slice messages[-0:] would return everything).
    truncated.extend(messages[len(messages) - tail:])
    return truncated
|
|
|
|
|
|
def messages_to_text(messages: list[dict]) -> str:
    """Render a message list as plain text for LLM consumption.

    Each turn becomes 'ROLE: content'; turns are separated by blank
    lines. System messages whose content mentions 'truncated' (the
    markers inserted by truncate_for_context) are rendered as
    '--- ... ---' separators instead of ordinary turns.
    """
    rendered = []
    for entry in messages:
        body = entry.get('content', '')
        is_marker = entry.get('role') == 'system' and 'truncated' in body
        if is_marker:
            rendered.append(f'--- {body} ---')
        else:
            rendered.append(f"{entry.get('role', 'unknown').upper()}: {body}")
    return '\n\n'.join(rendered)
|
|
|
|
|
|
def get_session_metadata(path: str) -> dict:
    """Summarize a session file: counts, timestamps, and tool-call presence.

    Loads the whole file via read_session(). An empty or all-malformed
    file yields just the path and a zero message_count.
    """
    messages = read_session(path)
    if not messages:
        return {'path': path, 'message_count': 0}

    # First message often carries session config; last shows how it ended.
    head, tail = messages[0], messages[-1]
    return {
        'path': path,
        'message_count': len(messages),
        'first_timestamp': head.get('timestamp', ''),
        'last_timestamp': tail.get('timestamp', ''),
        'first_role': head.get('role', ''),
        'has_tool_calls': any(m.get('tool_calls') for m in messages),
    }
|
|
|
|
|
|
if __name__ == '__main__':
    # CLI smoke test: summarize a single session transcript.
    # Usage: python3 session_reader.py <session.jsonl>
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <session.jsonl>")
        sys.exit(1)

    path = sys.argv[1]
    # File-level summary (counts, timestamps, tool-call presence).
    meta = get_session_metadata(path)
    print(json.dumps(meta, indent=2))

    # NOTE(review): this re-reads the file — get_session_metadata()
    # already parsed it once. Fine for a smoke test.
    messages = read_session(path)
    conv = extract_conversation(messages)
    print(f"\nConversation: {len(conv)} turns")

    # Apply the default head/tail truncation and show a short preview.
    truncated = truncate_for_context(conv)
    print(f"After truncation: {len(truncated)} turns")
    print(f"\nPreview (first 500 chars):")
    print(messages_to_text(truncated[:5])[:500])