Files
timmy-home/uni-wizard/v3/hermes_bridge.py
Allegro 31026ddcc1 [#76-v4] Final Uni-Wizard Architecture — Production Integration
Complete four-pass evolution to production-ready architecture:

**Pass 1 → Foundation:**
- Tool registry, basic harness, 19 tools
- VPS provisioning, Syncthing mesh
- Health daemon, systemd services

**Pass 2 → Three-House Canon:**
- Timmy (Sovereign), Ezra (Archivist), Bezalel (Artificer)
- Provenance tracking, artifact-flow discipline
- House-aware policy enforcement

**Pass 3 → Self-Improvement:**
- Pattern database with SQLite backend
- Adaptive policies (auto-adjust thresholds)
- Predictive execution (success prediction)
- Hermes bridge for shortest-loop telemetry
- Learning velocity tracking

**Pass 4 → Production Integration:**
- Unified API: `from uni_wizard import Harness, House, Mode`
- Three modes: SIMPLE / INTELLIGENT / SOVEREIGN
- Circuit breaker pattern for fault tolerance
- Async/concurrent execution support
- Production hardening (timeouts, retries)

**Allegro Lane Definition:**
- Narrowed to: Gitea integration, Hermes bridge, redundancy/failover
- Provides: Cloud connectivity, telemetry streaming, issue routing
- Does NOT: Make sovereign decisions, authenticate as Timmy

**Files:**
- v3/: Intelligence engine, adaptive harness, Hermes bridge
- v4/: Unified API, production harness, final architecture

Total: ~25KB architecture documentation + production code
2026-03-30 16:39:42 +00:00

394 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Hermes Telemetry Bridge v3 — Shortest Loop Integration
Streams telemetry from Hermes harness directly into Timmy's intelligence.
Design principle: Hermes session data → Timmy context in <100ms
"""
import json
import sqlite3
import time
from pathlib import Path
from typing import Dict, List, Optional, Generator
from dataclasses import dataclass
from datetime import datetime
import threading
import queue
@dataclass
class HermesSessionEvent:
    """A single Hermes session occurrence flattened into one uniform record."""

    session_id: str
    timestamp: float
    event_type: str  # tool_call, message, completion
    tool_name: Optional[str]
    success: Optional[bool]
    latency_ms: float
    model: str
    provider: str
    token_count: int
    error: Optional[str]

    def to_dict(self):
        """Return every field as a plain ``dict``, keyed in declaration order."""
        # __dataclass_fields__ is keyed by field name in definition order,
        # which is the same key order the hand-written literal would produce.
        return {
            field_name: getattr(self, field_name)
            for field_name in self.__dataclass_fields__
        }
class HermesStateReader:
    """
    Reads from the Hermes state database (SQLite).

    By default the database is looked up at ~/.hermes/state.db.  The queries
    in this class touch three tables -- ``sessions``, ``messages`` and
    ``tool_calls`` -- all filtered or joined on ``session_id``.

    NOTE(review): the schema was previously documented as
    sessions(id, session_id, model, source, started_at, messages, tool_calls)
    while get_recent_sessions() selects ``message_count`` and
    ``tool_call_count`` -- confirm against the real Hermes schema.

    Attributes:
        db_path: Location of the SQLite file.
        last_read_id: Highest ``tool_calls.id`` already yielded by
            stream_new_events(); acts as the resume cursor.
    """

    def __init__(self, db_path: Path = None):
        self.db_path = db_path or Path.home() / ".hermes" / "state.db"
        self.last_read_id = 0

    def is_available(self) -> bool:
        """Check if the Hermes database file exists on disk."""
        return self.db_path.exists()

    def _connect(self) -> sqlite3.Connection:
        """Open a connection whose rows can be converted with ``dict(row)``."""
        conn = sqlite3.connect(str(self.db_path))
        conn.row_factory = sqlite3.Row
        return conn

    @staticmethod
    def _coalesce(value, default):
        """Return *default* when *value* is None (e.g. a SQL NULL)."""
        return default if value is None else value

    def get_recent_sessions(self, limit: int = 10) -> List[Dict]:
        """
        Get the most recently started sessions.

        Args:
            limit: Maximum number of sessions to return.

        Returns:
            A list of row dicts ordered by ``started_at`` descending; an empty
            list when the database is missing or the query fails (the error
            is printed, not raised).
        """
        if not self.is_available():
            return []
        try:
            conn = self._connect()
            try:
                rows = conn.execute(
                    """
                    SELECT id, session_id, model, source, started_at,
                           message_count, tool_call_count
                    FROM sessions
                    ORDER BY started_at DESC
                    LIMIT ?
                    """,
                    (limit,),
                ).fetchall()
            finally:
                # Always release the handle, including when the query raises.
                conn.close()
            return [dict(row) for row in rows]
        except Exception as e:
            print(f"Error reading Hermes state: {e}")
            return []

    def get_session_details(self, session_id: str) -> Optional[Dict]:
        """
        Get one session together with its messages and tool calls.

        Args:
            session_id: Value matched against the ``session_id`` column.

        Returns:
            ``{"session": ..., "messages": [...], "tool_calls": [...]}`` with
            rows as dicts, or None when the database is missing, the session
            does not exist, or a query fails (the error is printed).
        """
        if not self.is_available():
            return None
        try:
            conn = self._connect()
            try:
                session = conn.execute(
                    """
                    SELECT * FROM sessions WHERE session_id = ?
                    """,
                    (session_id,),
                ).fetchone()
                if session is None:
                    return None
                messages = conn.execute(
                    """
                    SELECT * FROM messages WHERE session_id = ?
                    ORDER BY timestamp
                    """,
                    (session_id,),
                ).fetchall()
                tool_calls = conn.execute(
                    """
                    SELECT * FROM tool_calls WHERE session_id = ?
                    ORDER BY timestamp
                    """,
                    (session_id,),
                ).fetchall()
            finally:
                # Runs on the early return and on errors as well.
                conn.close()
            return {
                "session": dict(session),
                "messages": [dict(m) for m in messages],
                "tool_calls": [dict(t) for t in tool_calls],
            }
        except Exception as e:
            print(f"Error reading session details: {e}")
            return None

    def stream_new_events(self, poll_interval: float = 1.0) -> Generator[HermesSessionEvent, None, None]:
        """
        Stream new tool-call events from Hermes as they appear.

        This is the SHORTEST LOOP - real-time telemetry ingestion.  The
        generator never finishes on its own: it polls ``tool_calls`` for rows
        with an id above ``last_read_id``, yields one event per row, then
        sleeps ``poll_interval`` seconds before polling again.

        Args:
            poll_interval: Seconds to sleep between polls (also used while
                the database file does not exist yet).

        Yields:
            HermesSessionEvent with ``event_type="tool_call"``.  NULL columns
            are replaced by the same defaults used for missing columns so the
            non-optional event fields never carry None.
        """
        while True:
            if not self.is_available():
                time.sleep(poll_interval)
                continue

            rows = []
            try:
                conn = self._connect()
                try:
                    rows = conn.execute(
                        """
                        SELECT tc.*, s.model, s.source
                        FROM tool_calls tc
                        JOIN sessions s ON tc.session_id = s.session_id
                        WHERE tc.id > ?
                        ORDER BY tc.id
                        """,
                        (self.last_read_id,),
                    ).fetchall()
                finally:
                    # Close before yielding: the rows are already in memory,
                    # and a consumer that abandons the generator mid-batch
                    # must not leave a connection open.
                    conn.close()
            except Exception as e:
                print(f"Error streaming events: {e}")

            for row in rows:
                record = dict(row)
                self.last_read_id = max(self.last_read_id, record.get("id") or 0)
                error = record.get("error")
                yield HermesSessionEvent(
                    session_id=self._coalesce(record.get("session_id"), "unknown"),
                    timestamp=self._coalesce(record.get("timestamp"), time.time()),
                    event_type="tool_call",
                    tool_name=record.get("tool_name"),
                    success=error is None,
                    latency_ms=self._coalesce(record.get("execution_time_ms"), 0),
                    model=self._coalesce(record.get("model"), "unknown"),
                    provider=self._coalesce(record.get("source"), "unknown"),
                    token_count=self._coalesce(record.get("token_count"), 0),
                    error=error,
                )

            time.sleep(poll_interval)
class TelemetryStreamProcessor:
    """
    Processes Hermes telemetry stream into Timmy's intelligence.

    A background daemon thread iterates a reader's ``stream_new_events()``,
    converts each event into an intelligence record and passes it to
    ``intelligence_engine.db.record_execution``.

    Attributes:
        intelligence: Engine object; only ``.db.record_execution(record)`` is
            used here.
        event_queue: Queue whose depth is reported by get_stats(); nothing in
            this class puts items on it.
        processing_thread: The consumer thread, or None before start().
        running: Flag checked by the consumer each time an event arrives.
        events_processed / events_dropped: Counters of recorded and failed
            events.
        avg_processing_time_ms: Running mean of per-event handling time.
    """

    # Hermes tool name -> uni-wizard tool name.  Unlisted names pass through.
    _TOOL_MAPPING = {
        "terminal": "system_shell",
        "file_read": "file_read",
        "file_write": "file_write",
        "search_files": "file_search",
        "web_search": "web_search",
        "delegate_task": "delegate",
        "execute_code": "code_execute",
    }

    # Ordered (task_type, substrings) rules; the first rule that matches wins.
    _TASK_TYPE_KEYWORDS = (
        ("read", ("read", "get", "list", "status", "info")),
        ("build", ("write", "create", "commit", "push")),
        ("test", ("test", "check", "verify")),
        ("synthesize", ("search", "analyze")),
    )

    def __init__(self, intelligence_engine):
        self.intelligence = intelligence_engine
        self.event_queue = queue.Queue()
        self.processing_thread = None
        self.running = False
        # Metrics
        self.events_processed = 0
        self.events_dropped = 0
        self.avg_processing_time_ms = 0

    def start(self, hermes_reader: HermesStateReader):
        """
        Start processing the stream in a background daemon thread.

        Calling start() while a consumer thread is still alive does not spawn
        a second one (two consumers on one reader would race on its cursor);
        it only re-arms the ``running`` flag.

        Args:
            hermes_reader: Object exposing ``stream_new_events(poll_interval=...)``.
        """
        self.running = True
        if self.processing_thread is not None and self.processing_thread.is_alive():
            return
        self.processing_thread = threading.Thread(
            target=self._process_stream,
            args=(hermes_reader,),
            daemon=True,
        )
        self.processing_thread.start()
        # Thread.ident is a thread identifier, not a process id.
        print(f"Telemetry processor started (thread id: {self.processing_thread.ident})")

    def stop(self):
        """
        Request the consumer to stop and wait up to 5 seconds for it.

        NOTE: the consumer checks ``running`` only when the stream hands it an
        event, so with an idle stream the join can time out and the daemon
        thread keeps waiting until the next event arrives.
        """
        self.running = False
        if self.processing_thread:
            self.processing_thread.join(timeout=5)

    def _process_stream(self, hermes_reader: HermesStateReader):
        """Background thread body: consume events until ``running`` is cleared."""
        for event in hermes_reader.stream_new_events(poll_interval=1.0):
            if not self.running:
                break
            start = time.time()
            try:
                # Convert to intelligence record and persist it
                record = self._convert_event(event)
                self.intelligence.db.record_execution(record)
                self.events_processed += 1
                # Incremental mean: fold this sample into the running average
                proc_time = (time.time() - start) * 1000
                self.avg_processing_time_ms = (
                    (self.avg_processing_time_ms * (self.events_processed - 1) + proc_time)
                    / self.events_processed
                )
            except Exception as e:
                # Best effort: a bad event is counted and skipped, not fatal.
                self.events_dropped += 1
                print(f"Error processing event: {e}")

    def _convert_event(self, event: HermesSessionEvent) -> Dict:
        """
        Convert a Hermes event to an intelligence record.

        Args:
            event: Object with ``session_id``, ``tool_name``, ``success``,
                ``latency_ms``, ``model``, ``token_count`` and ``error``.

        Returns:
            Dict with keys tool, house, model, task_type, success, latency_ms,
            confidence, tokens_in and error_type.
        """
        tool = self._TOOL_MAPPING.get(event.tool_name, event.tool_name or "unknown")

        # Determine house based on context
        # In real implementation, this would come from session metadata
        session_key = event.session_id.lower()
        house = "timmy"  # Default
        if "ezra" in session_key:
            house = "ezra"
        elif "bezalel" in session_key:
            house = "bezalel"

        return {
            "tool": tool,
            "house": house,
            "model": event.model,
            "task_type": self._infer_task_type(tool),
            "success": event.success,
            "latency_ms": event.latency_ms,
            "confidence": 0.8 if event.success else 0.2,
            "tokens_in": event.token_count,
            "error_type": "execution_error" if event.error else None,
        }

    def _infer_task_type(self, tool: str) -> str:
        """Infer a task type from substrings of the tool name; "general" if none match."""
        for task_type, keywords in self._TASK_TYPE_KEYWORDS:
            if any(kw in tool for kw in keywords):
                return task_type
        return "general"

    def get_stats(self) -> Dict:
        """Get processing statistics as a plain dict."""
        return {
            "events_processed": self.events_processed,
            "events_dropped": self.events_dropped,
            "avg_processing_time_ms": round(self.avg_processing_time_ms, 2),
            "queue_depth": self.event_queue.qsize(),
            "running": self.running,
        }
class ShortestLoopIntegrator:
    """
    One-stop integration: Connect Hermes → Timmy Intelligence

    Wires a HermesStateReader to a TelemetryStreamProcessor that feeds the
    given intelligence engine.

    Usage:
        integrator = ShortestLoopIntegrator(intelligence_engine)
        integrator.start()
        # Now all Hermes telemetry flows into Timmy's intelligence
    """

    def __init__(self, intelligence_engine, hermes_db_path: Path = None):
        """
        Args:
            intelligence_engine: Engine handed to the stream processor and
                used by sync_historical() via ``ingest_hermes_session``.
            hermes_db_path: Optional database location forwarded to the
                reader; None lets the reader use its default.
        """
        self.intelligence = intelligence_engine
        self.hermes_reader = HermesStateReader(hermes_db_path)
        self.processor = TelemetryStreamProcessor(intelligence_engine)

    def start(self):
        """
        Start the shortest-loop integration.

        Returns:
            False (after printing a warning) when the Hermes database is not
            available, True once the processor has been started.
        """
        if not self.hermes_reader.is_available():
            print("⚠️ Hermes database not found. Shortest loop disabled.")
            return False
        self.processor.start(self.hermes_reader)
        print("✅ Shortest loop active: Hermes → Timmy Intelligence")
        return True

    def stop(self):
        """Stop the integration by stopping the stream processor."""
        self.processor.stop()
        print("⏹️ Shortest loop stopped")

    def get_status(self) -> Dict:
        """Get integration status: availability, stream flag and processor stats."""
        return {
            "hermes_available": self.hermes_reader.is_available(),
            "stream_active": self.processor.running,
            "processor_stats": self.processor.get_stats(),
        }

    def sync_historical(self, days: int = 7) -> Dict:
        """
        One-time sync of historical Hermes data.

        Use this to bootstrap intelligence with past data.  At most the 1000
        most recent sessions are considered.

        Args:
            days: Size of the look-back window.  Sessions whose ``started_at``
                is a number older than ``now - days`` are skipped.
                NOTE(review): this assumes a numeric ``started_at`` is Unix
                epoch seconds -- confirm against the Hermes schema.  Sessions
                with a missing or non-numeric ``started_at`` are always
                included, since their age cannot be determined here.

        Returns:
            ``{"error": ...}`` when Hermes is unavailable; otherwise
            ``{"sessions_synced": <sessions actually ingested>,
            "executions_synced": <sum of counts returned by the engine>}``.
        """
        if not self.hermes_reader.is_available():
            return {"error": "Hermes not available"}

        cutoff = time.time() - days * 86400
        sessions = self.hermes_reader.get_recent_sessions(limit=1000)

        sessions_synced = 0
        executions_synced = 0
        for session in sessions:
            started_at = session.get("started_at")
            is_number = isinstance(started_at, (int, float)) and not isinstance(started_at, bool)
            if is_number and started_at < cutoff:
                continue  # outside the requested window

            session_id = session.get("session_id")
            details = self.hermes_reader.get_session_details(session_id)
            if not details:
                continue  # nothing to ingest, so do not count it as synced

            count = self.intelligence.ingest_hermes_session({
                "session_id": session_id,
                "model": session.get("model"),
                "messages": details.get("messages", []),
                "started_at": started_at,
            })
            sessions_synced += 1
            # Tolerate an engine that returns None instead of a count.
            executions_synced += count or 0

        return {
            "sessions_synced": sessions_synced,
            "executions_synced": executions_synced,
        }
if __name__ == "__main__":
    # Smoke demo: report whether a Hermes database is present at the default
    # location and, if so, list its five most recent sessions.
    rule = "=" * 60
    print(rule)
    print("HERMES BRIDGE v3 — Shortest Loop Demo")
    print(rule)

    # Check Hermes availability
    reader = HermesStateReader()
    print("\n🔍 Hermes Status:")
    print(f" Database: {reader.db_path}")
    print(f" Available: {reader.is_available()}")

    if reader.is_available():
        sessions = reader.get_recent_sessions(limit=5)
        print("\n📊 Recent Sessions:")
        for entry in sessions:
            # Session ids are shown truncated to their first 16 characters.
            short_id = entry.get('session_id', 'unknown')[:16]
            model_name = entry.get('model', 'unknown')
            tool_total = entry.get('tool_call_count', 0)
            print(f" - {short_id}... ({model_name}) {tool_total} tools")

    print("\n" + "=" * 60)