#!/usr/bin/env python3
"""
Timmy Bridge Monitor - Complete monitoring system for Local Timmy
Tracks heartbeat, artifacts, and performance metrics
"""

import asyncio
import json
import re
import sqlite3
import time
import os
from datetime import datetime, timedelta
from pathlib import Path
from dataclasses import dataclass
from typing import Optional, List, Dict

try:
    import websockets
except ImportError:
    # Only TimmyMonitor.listen() talks to the relay.  Deferring the failure
    # keeps the database and report code usable without the client library;
    # listen() raises the same ImportError when the package is missing.
    websockets = None

DB_PATH = Path(os.environ.get('TIMMY_DB', '/root/allegro/timmy_metrics.db'))
RELAY_URL = os.environ.get('TIMMY_RELAY', 'ws://167.99.126.228:3334')

# Heartbeat content is searched for the first "<digits>ms" token.
_LATENCY_RE = re.compile(r'(\d+)ms')
# Marker inside a "t" tag value that introduces the artifact type.
_ARTIFACT_TYPE_MARKER = "artifact-type:"


@dataclass
class HeartbeatEvent:
    """Plain record describing one heartbeat."""
    timestamp: str
    pubkey: str
    event_id: str
    content: str
    latency_ms: Optional[int] = None


@dataclass
class ArtifactEvent:
    """Plain record describing one artifact."""
    timestamp: str
    pubkey: str
    artifact_type: str
    reference: str
    size_bytes: int
    description: str


class TimmyMonitor:
    """Monitors Local Timmy via Nostr relay"""

    def __init__(self, db_path: Path = DB_PATH, relay_url: str = RELAY_URL):
        """Remember the configuration and open/initialize the database."""
        self.db_path = db_path
        self.relay_url = relay_url
        self.db = None
        self.connect_time = None
        self.events_received = 0
        self.init_db()

    def init_db(self):
        """Initialize SQLite database with full schema"""
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self.db = sqlite3.connect(self.db_path)
        cursor = self.db.cursor()

        cursor.executescript('''
            CREATE TABLE IF NOT EXISTS heartbeats (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                timmy_pubkey TEXT NOT NULL,
                event_id TEXT UNIQUE,
                content_preview TEXT,
                latency_ms INTEGER,
                response_time_ms INTEGER,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            );
            CREATE INDEX IF NOT EXISTS idx_heartbeats_time ON heartbeats(timestamp);
            CREATE INDEX IF NOT EXISTS idx_heartbeats_pubkey ON heartbeats(timmy_pubkey);

            CREATE TABLE IF NOT EXISTS artifacts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                timmy_pubkey TEXT NOT NULL,
                artifact_type TEXT,
                reference TEXT,
                size_bytes INTEGER,
                description TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            );
            CREATE INDEX IF NOT EXISTS idx_artifacts_time ON artifacts(timestamp);
            CREATE INDEX IF NOT EXISTS idx_artifacts_type ON artifacts(artifact_type);

            CREATE TABLE IF NOT EXISTS conversations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id TEXT UNIQUE,
                started_at TEXT,
                ended_at TEXT,
                turn_count INTEGER DEFAULT 0,
                total_latency_ms INTEGER,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            );
            CREATE INDEX IF NOT EXISTS idx_conversations_session ON conversations(session_id);

            CREATE TABLE IF NOT EXISTS metrics (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                metric_type TEXT NOT NULL,
                value REAL,
                timestamp TEXT DEFAULT CURRENT_TIMESTAMP,
                metadata TEXT
            );
            CREATE INDEX IF NOT EXISTS idx_metrics_type_time ON metrics(metric_type, timestamp);
        ''')

        self.db.commit()
        print(f"[Monitor] Database initialized: {self.db_path}")

    def close(self):
        """Close the database connection; safe to call more than once."""
        if self.db is not None:
            self.db.close()
            self.db = None

    async def listen(self):
        """Main WebSocket listener loop with auto-reconnect

        Raises:
            ImportError: if the ``websockets`` package is not installed.
        """
        if websockets is None:
            raise ImportError("pip install websockets")

        while True:
            try:
                print(f"[Monitor] Connecting to {self.relay_url}")
                async with websockets.connect(self.relay_url) as ws:
                    self.connect_time = datetime.now()
                    print(f"[Monitor] Connected at {self.connect_time}")

                    # Subscribe to all events
                    sub_id = f"timmy-monitor-{int(time.time())}"
                    req = ["REQ", sub_id, {}]
                    await ws.send(json.dumps(req))
                    print(f"[Monitor] Subscribed with ID: {sub_id}")

                    while True:
                        msg = await ws.recv()
                        try:
                            data = json.loads(msg)
                        except ValueError as e:
                            # One undecodable frame must not tear down a
                            # healthy connection; drop it and keep reading.
                            print(f"[Monitor] Ignoring malformed message: {e}")
                            continue
                        await self.handle_message(data)

            except websockets.exceptions.ConnectionClosed:
                print("[Monitor] Connection closed, reconnecting in 5s...")
                await asyncio.sleep(5)
            except Exception as e:
                print(f"[Monitor] Error: {e}, reconnecting in 10s...")
                await asyncio.sleep(10)

    async def handle_message(self, data: List):
        """Process incoming Nostr messages

        ``data`` is the decoded JSON frame; anything that is not a list of
        at least two items is ignored.
        """
        if not isinstance(data, list) or len(data) < 2:
            return

        msg_type = data[0]

        if msg_type == "EVENT" and len(data) >= 3:
            await self.handle_event(data[2])
        elif msg_type == "EOSE":
            print(f"[Monitor] End of stored events: {data[1]}")
        elif msg_type == "NOTICE":
            print(f"[Monitor] Relay notice: {data[1]}")

    async def handle_event(self, event: Dict):
        """Process Nostr events

        Kind 1 is logged as a heartbeat, kind 30078 as an artifact, and
        kind 4 is only printed.  Rows are stamped with the event's own
        ``created_at`` time.  Missing or oddly typed fields are tolerated;
        a payload that is not a dict is ignored without being counted.
        """
        if not isinstance(event, dict):
            return

        kind = event.get("kind")
        pubkey = event.get("pubkey")
        event_id = event.get("id")

        content = event.get("content") or ""
        if not isinstance(content, str):
            content = str(content)

        tags = event.get("tags")
        if not isinstance(tags, list):
            tags = []

        timestamp = self._event_timestamp(event.get("created_at"))
        # Printable key prefix that cannot fail on a missing/non-string key.
        who = str(pubkey or "unknown")[:16]

        if kind == 1:  # Short text note - heartbeat
            latency = self._extract_latency(content)
            self.log_heartbeat(pubkey, event_id, content[:200], latency,
                               timestamp=timestamp)
            print(f"[Heartbeat] {timestamp} - {who}...")

        elif kind == 30078:  # Artifact event
            artifact_type = self._extract_artifact_type(tags)
            reference = self._extract_reference(tags) or content[:64]
            # The column is size_bytes, so measure the encoded payload
            # rather than the number of characters.
            size = len(content.encode("utf-8", errors="surrogatepass"))
            self.log_artifact(pubkey, artifact_type, reference, size,
                              content[:200], timestamp=timestamp)
            print(f"[Artifact] {timestamp} - {artifact_type}")

        elif kind == 4:  # Encrypted DM
            print(f"[DM] {timestamp} - {who}...")

        self.events_received += 1

    @staticmethod
    def _event_timestamp(created_at) -> str:
        """Return ``created_at`` (epoch seconds) as a local ISO string.

        Falls back to the current local time when the value is missing,
        zero, or cannot be converted.
        """
        if created_at:
            try:
                return datetime.fromtimestamp(created_at).isoformat()
            except (TypeError, ValueError, OverflowError, OSError):
                pass
        return datetime.now().isoformat()

    def _extract_latency(self, content: str) -> Optional[int]:
        """Extract latency from heartbeat content

        Returns the integer in front of the first ``ms`` token, or None.
        """
        match = _LATENCY_RE.search(content)
        return int(match.group(1)) if match else None

    def _extract_artifact_type(self, tags: List) -> str:
        """Extract artifact type from tags

        Looks for the first ``["t", "...artifact-type:<value>"]`` tag and
        returns everything after the marker (colons in the value are kept).
        Returns "unknown" when no such tag exists.
        """
        for tag in tags:
            if not isinstance(tag, (list, tuple)) or len(tag) < 2:
                continue
            value = tag[1]
            if (tag[0] == "t" and isinstance(value, str)
                    and _ARTIFACT_TYPE_MARKER in value):
                return value.partition(_ARTIFACT_TYPE_MARKER)[2]
        return "unknown"

    def _extract_reference(self, tags: List) -> Optional[str]:
        """Extract reference from tags

        Returns the value of the first ``["r", <value>]`` tag, or None.
        """
        for tag in tags:
            if (isinstance(tag, (list, tuple)) and len(tag) >= 2
                    and tag[0] == "r"):
                return tag[1]
        return None

    def log_heartbeat(self, pubkey: str, event_id: str, content: str,
                      latency: Optional[int],
                      timestamp: Optional[str] = None):
        """Log heartbeat to database

        Args:
            pubkey: author key stored in ``timmy_pubkey``.
            event_id: unique event id; a repeated id is silently ignored.
            content: preview text to store.
            latency: latency in milliseconds, or None.
            timestamp: ISO timestamp for the row; defaults to the current
                local time.

        Database failures are printed, not raised.
        """
        row = (timestamp or datetime.now().isoformat(),
               pubkey, event_id, content, latency)
        try:
            # The connection context manager commits on success and rolls
            # back on error.
            with self.db:
                self.db.execute('''
                    INSERT OR IGNORE INTO heartbeats
                        (timestamp, timmy_pubkey, event_id, content_preview, latency_ms)
                    VALUES (?, ?, ?, ?, ?)
                ''', row)
        except Exception as e:
            print(f"[Monitor] DB error (heartbeat): {e}")

    def log_artifact(self, pubkey: str, artifact_type: str, reference: str,
                     size: int, description: str,
                     timestamp: Optional[str] = None):
        """Log artifact to database

        Args:
            pubkey: author key stored in ``timmy_pubkey``.
            artifact_type: type label for the artifact.
            reference: reference string for the artifact.
            size: payload size stored in ``size_bytes``.
            description: preview text to store.
            timestamp: ISO timestamp for the row; defaults to the current
                local time.

        A row with the same timestamp, pubkey, type and reference is not
        inserted twice: the artifacts table has no unique key, so an event
        replayed by the relay would otherwise be counted again.
        Database failures are printed, not raised.
        """
        row = (timestamp or datetime.now().isoformat(),
               pubkey, artifact_type, reference, size, description)
        try:
            with self.db:
                already_logged = self.db.execute('''
                    SELECT 1 FROM artifacts
                    WHERE timestamp = ? AND timmy_pubkey = ?
                      AND artifact_type = ? AND reference = ?
                    LIMIT 1
                ''', row[:4]).fetchone()
                if already_logged is None:
                    self.db.execute('''
                        INSERT INTO artifacts
                            (timestamp, timmy_pubkey, artifact_type, reference, size_bytes, description)
                        VALUES (?, ?, ?, ?, ?, ?)
                    ''', row)
        except Exception as e:
            print(f"[Monitor] DB error (artifact): {e}")

    def generate_report(self, hours: int = 24) -> str:
        """Generate comprehensive retrospective report

        Args:
            hours: size of the look-back window.

        Returns:
            A Markdown report covering heartbeats, latency, artifacts and
            recommendations for the window.
        """
        cursor = self.db.cursor()

        # Rows hold local ISO-8601 strings, so the cutoff is built the same
        # way; SQLite's datetime('now') is UTC with a space separator and
        # does not compare correctly against them as text.
        cutoff = (datetime.now() - timedelta(hours=hours)).isoformat()

        # Heartbeat metrics
        cursor.execute('''
            SELECT COUNT(*), AVG(latency_ms), MIN(timestamp), MAX(timestamp)
            FROM heartbeats
            WHERE timestamp > ?
        ''', (cutoff,))
        hb_count, avg_latency, first_hb, last_hb = cursor.fetchone()

        # Artifact metrics
        cursor.execute('''
            SELECT COUNT(*), artifact_type, SUM(size_bytes)
            FROM artifacts
            WHERE timestamp > ?
            GROUP BY artifact_type
        ''', (cutoff,))
        artifacts = cursor.fetchall()

        # Uptime calculation: the first 13 characters of an ISO timestamp
        # are "YYYY-MM-DDTHH", i.e. one bucket per clock hour.
        cursor.execute('''
            SELECT COUNT(DISTINCT substr(timestamp, 1, 13))
            FROM heartbeats
            WHERE timestamp > ?
        ''', (cutoff,))
        active_hours = cursor.fetchone()[0]

        # A window of N hours can touch N+1 clock-hour buckets; cap at 100.
        uptime_pct = min((active_hours / hours) * 100, 100.0) if hours > 0 else 0

        status = '✓ ACTIVE' if hb_count else '✗ NO ACTIVITY'
        latency_text = f"{avg_latency:.1f}" if avg_latency is not None else 'N/A'
        if artifacts:
            artifact_text = "\n".join(
                f"- {count} {atype} ({size or 0} bytes)"
                for count, atype, size in artifacts
            )
        else:
            artifact_text = "- None recorded"

        sections = [
            "# Timmy Retrospective Report",
            "",
            f"Generated: {datetime.now().isoformat()}",
            f"Period: Last {hours} hours",
            "",
            "## Executive Summary",
            "",
            status,
            f"- Uptime: {uptime_pct:.1f}%",
            f"- Heartbeats: {hb_count or 0}",
            f"- First: {first_hb or 'N/A'}",
            f"- Last: {last_hb or 'N/A'}",
            "",
            "## Performance Metrics",
            "",
            f"- Average latency: {latency_text} ms",
            f"- Active hours: {active_hours}/{hours}",
            "",
            "## Artifacts Created",
            "",
            artifact_text,
            "",
            "## Recommendations",
            "",
            self._generate_recommendations(hb_count, avg_latency, uptime_pct),
        ]
        return "\n".join(sections)

    def _generate_recommendations(self, hb_count, avg_latency, uptime_pct) -> str:
        """Generate actionable recommendations

        Returns one Markdown bullet per finding, joined by newlines.
        """
        recs = []

        if not hb_count:
            recs.append("- ⚠️ No heartbeats detected - check Timmy client connectivity")
        elif hb_count < 12:  # Fewer than a dozen heartbeats in the window
            recs.append("- Consider reducing heartbeat interval to 3 minutes for better visibility")

        if avg_latency is not None and avg_latency > 500:
            recs.append(f"- High latency detected ({avg_latency:.0f}ms) - investigate network or MLX load")

        if uptime_pct < 80:
            recs.append(f"- Low uptime ({uptime_pct:.1f}%) - check relay stability or client errors")

        if not recs:
            recs.append("- ✓ System operating within normal parameters")

        recs.append("- Consider adding more artifact types for richer telemetry")

        return "\n".join(recs)


async def main():
    """Run the monitor until interrupted, then print the report."""
    monitor = TimmyMonitor()

    try:
        await monitor.listen()
    except (KeyboardInterrupt, asyncio.CancelledError):
        # Under asyncio.run() a Ctrl-C reaches this coroutine as a
        # cancellation, so both forms are treated as a shutdown request.
        print("\n[Monitor] Shutting down gracefully...")
        print(monitor.generate_report())
    finally:
        monitor.close()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # The report has already been printed by main(); exit quietly.
        pass