Complete four-pass evolution to production-ready architecture: **Pass 1 → Foundation:** - Tool registry, basic harness, 19 tools - VPS provisioning, Syncthing mesh - Health daemon, systemd services **Pass 2 → Three-House Canon:** - Timmy (Sovereign), Ezra (Archivist), Bezalel (Artificer) - Provenance tracking, artifact-flow discipline - House-aware policy enforcement **Pass 3 → Self-Improvement:** - Pattern database with SQLite backend - Adaptive policies (auto-adjust thresholds) - Predictive execution (success prediction) - Hermes bridge for shortest-loop telemetry - Learning velocity tracking **Pass 4 → Production Integration:** - Unified API: `from uni_wizard import Harness, House, Mode` - Three modes: SIMPLE / INTELLIGENT / SOVEREIGN - Circuit breaker pattern for fault tolerance - Async/concurrent execution support - Production hardening (timeouts, retries) **Allegro Lane Definition:** - Narrowed to: Gitea integration, Hermes bridge, redundancy/failover - Provides: Cloud connectivity, telemetry streaming, issue routing - Does NOT: Make sovereign decisions, authenticate as Timmy **Files:** - v3/: Intelligence engine, adaptive harness, Hermes bridge - v4/: Unified API, production harness, final architecture Total: ~25KB architecture documentation + production code
394 lines
13 KiB
Python
394 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Hermes Telemetry Bridge v3 — Shortest Loop Integration
|
|
|
|
Streams telemetry from Hermes harness directly into Timmy's intelligence.
|
|
|
|
Design principle: Hermes session data → Timmy context in <100ms
|
|
"""
|
|
|
|
import json
|
|
import sqlite3
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Generator
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
import threading
|
|
import queue
|
|
|
|
|
|
@dataclass
|
|
class HermesSessionEvent:
|
|
"""Normalized event from Hermes session"""
|
|
session_id: str
|
|
timestamp: float
|
|
event_type: str # tool_call, message, completion
|
|
tool_name: Optional[str]
|
|
success: Optional[bool]
|
|
latency_ms: float
|
|
model: str
|
|
provider: str
|
|
token_count: int
|
|
error: Optional[str]
|
|
|
|
def to_dict(self):
|
|
return {
|
|
"session_id": self.session_id,
|
|
"timestamp": self.timestamp,
|
|
"event_type": self.event_type,
|
|
"tool_name": self.tool_name,
|
|
"success": self.success,
|
|
"latency_ms": self.latency_ms,
|
|
"model": self.model,
|
|
"provider": self.provider,
|
|
"token_count": self.token_count,
|
|
"error": self.error
|
|
}
|
|
|
|
|
|
class HermesStateReader:
|
|
"""
|
|
Reads from Hermes state database.
|
|
|
|
Hermes stores sessions in ~/.hermes/state.db
|
|
Schema: sessions(id, session_id, model, source, started_at, messages, tool_calls)
|
|
"""
|
|
|
|
def __init__(self, db_path: Path = None):
|
|
self.db_path = db_path or Path.home() / ".hermes" / "state.db"
|
|
self.last_read_id = 0
|
|
|
|
def is_available(self) -> bool:
|
|
"""Check if Hermes database is accessible"""
|
|
return self.db_path.exists()
|
|
|
|
def get_recent_sessions(self, limit: int = 10) -> List[Dict]:
|
|
"""Get recent sessions from Hermes"""
|
|
if not self.is_available():
|
|
return []
|
|
|
|
try:
|
|
conn = sqlite3.connect(str(self.db_path))
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
rows = conn.execute("""
|
|
SELECT id, session_id, model, source, started_at,
|
|
message_count, tool_call_count
|
|
FROM sessions
|
|
ORDER BY started_at DESC
|
|
LIMIT ?
|
|
""", (limit,)).fetchall()
|
|
|
|
conn.close()
|
|
|
|
return [dict(row) for row in rows]
|
|
|
|
except Exception as e:
|
|
print(f"Error reading Hermes state: {e}")
|
|
return []
|
|
|
|
def get_session_details(self, session_id: str) -> Optional[Dict]:
|
|
"""Get full session details including messages"""
|
|
if not self.is_available():
|
|
return None
|
|
|
|
try:
|
|
conn = sqlite3.connect(str(self.db_path))
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
# Get session
|
|
session = conn.execute("""
|
|
SELECT * FROM sessions WHERE session_id = ?
|
|
""", (session_id,)).fetchone()
|
|
|
|
if not session:
|
|
conn.close()
|
|
return None
|
|
|
|
# Get messages
|
|
messages = conn.execute("""
|
|
SELECT * FROM messages WHERE session_id = ?
|
|
ORDER BY timestamp
|
|
""", (session_id,)).fetchall()
|
|
|
|
# Get tool calls
|
|
tool_calls = conn.execute("""
|
|
SELECT * FROM tool_calls WHERE session_id = ?
|
|
ORDER BY timestamp
|
|
""", (session_id,)).fetchall()
|
|
|
|
conn.close()
|
|
|
|
return {
|
|
"session": dict(session),
|
|
"messages": [dict(m) for m in messages],
|
|
"tool_calls": [dict(t) for t in tool_calls]
|
|
}
|
|
|
|
except Exception as e:
|
|
print(f"Error reading session details: {e}")
|
|
return None
|
|
|
|
def stream_new_events(self, poll_interval: float = 1.0) -> Generator[HermesSessionEvent, None, None]:
|
|
"""
|
|
Stream new events from Hermes as they occur.
|
|
|
|
This is the SHORTEST LOOP - real-time telemetry ingestion.
|
|
"""
|
|
while True:
|
|
if not self.is_available():
|
|
time.sleep(poll_interval)
|
|
continue
|
|
|
|
try:
|
|
conn = sqlite3.connect(str(self.db_path))
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
# Get new tool calls since last read
|
|
rows = conn.execute("""
|
|
SELECT tc.*, s.model, s.source
|
|
FROM tool_calls tc
|
|
JOIN sessions s ON tc.session_id = s.session_id
|
|
WHERE tc.id > ?
|
|
ORDER BY tc.id
|
|
""", (self.last_read_id,)).fetchall()
|
|
|
|
for row in rows:
|
|
row_dict = dict(row)
|
|
self.last_read_id = max(self.last_read_id, row_dict.get("id", 0))
|
|
|
|
yield HermesSessionEvent(
|
|
session_id=row_dict.get("session_id", "unknown"),
|
|
timestamp=row_dict.get("timestamp", time.time()),
|
|
event_type="tool_call",
|
|
tool_name=row_dict.get("tool_name"),
|
|
success=row_dict.get("error") is None,
|
|
latency_ms=row_dict.get("execution_time_ms", 0),
|
|
model=row_dict.get("model", "unknown"),
|
|
provider=row_dict.get("source", "unknown"),
|
|
token_count=row_dict.get("token_count", 0),
|
|
error=row_dict.get("error")
|
|
)
|
|
|
|
conn.close()
|
|
|
|
except Exception as e:
|
|
print(f"Error streaming events: {e}")
|
|
|
|
time.sleep(poll_interval)
|
|
|
|
|
|
class TelemetryStreamProcessor:
|
|
"""
|
|
Processes Hermes telemetry stream into Timmy's intelligence.
|
|
|
|
Converts Hermes events into intelligence engine records.
|
|
"""
|
|
|
|
def __init__(self, intelligence_engine):
|
|
self.intelligence = intelligence_engine
|
|
self.event_queue = queue.Queue()
|
|
self.processing_thread = None
|
|
self.running = False
|
|
|
|
# Metrics
|
|
self.events_processed = 0
|
|
self.events_dropped = 0
|
|
self.avg_processing_time_ms = 0
|
|
|
|
def start(self, hermes_reader: HermesStateReader):
|
|
"""Start processing stream in background"""
|
|
self.running = True
|
|
self.processing_thread = threading.Thread(
|
|
target=self._process_stream,
|
|
args=(hermes_reader,),
|
|
daemon=True
|
|
)
|
|
self.processing_thread.start()
|
|
print(f"Telemetry processor started (PID: {self.processing_thread.ident})")
|
|
|
|
def stop(self):
|
|
"""Stop processing"""
|
|
self.running = False
|
|
if self.processing_thread:
|
|
self.processing_thread.join(timeout=5)
|
|
|
|
def _process_stream(self, hermes_reader: HermesStateReader):
|
|
"""Background thread: consume Hermes events"""
|
|
for event in hermes_reader.stream_new_events(poll_interval=1.0):
|
|
if not self.running:
|
|
break
|
|
|
|
start = time.time()
|
|
|
|
try:
|
|
# Convert to intelligence record
|
|
record = self._convert_event(event)
|
|
|
|
# Record in intelligence database
|
|
self.intelligence.db.record_execution(record)
|
|
|
|
self.events_processed += 1
|
|
|
|
# Update avg processing time
|
|
proc_time = (time.time() - start) * 1000
|
|
self.avg_processing_time_ms = (
|
|
(self.avg_processing_time_ms * (self.events_processed - 1) + proc_time)
|
|
/ self.events_processed
|
|
)
|
|
|
|
except Exception as e:
|
|
self.events_dropped += 1
|
|
print(f"Error processing event: {e}")
|
|
|
|
def _convert_event(self, event: HermesSessionEvent) -> Dict:
|
|
"""Convert Hermes event to intelligence record"""
|
|
|
|
# Map Hermes tool to uni-wizard tool
|
|
tool_mapping = {
|
|
"terminal": "system_shell",
|
|
"file_read": "file_read",
|
|
"file_write": "file_write",
|
|
"search_files": "file_search",
|
|
"web_search": "web_search",
|
|
"delegate_task": "delegate",
|
|
"execute_code": "code_execute"
|
|
}
|
|
|
|
tool = tool_mapping.get(event.tool_name, event.tool_name or "unknown")
|
|
|
|
# Determine house based on context
|
|
# In real implementation, this would come from session metadata
|
|
house = "timmy" # Default
|
|
if "ezra" in event.session_id.lower():
|
|
house = "ezra"
|
|
elif "bezalel" in event.session_id.lower():
|
|
house = "bezalel"
|
|
|
|
return {
|
|
"tool": tool,
|
|
"house": house,
|
|
"model": event.model,
|
|
"task_type": self._infer_task_type(tool),
|
|
"success": event.success,
|
|
"latency_ms": event.latency_ms,
|
|
"confidence": 0.8 if event.success else 0.2,
|
|
"tokens_in": event.token_count,
|
|
"error_type": "execution_error" if event.error else None
|
|
}
|
|
|
|
def _infer_task_type(self, tool: str) -> str:
|
|
"""Infer task type from tool name"""
|
|
if any(kw in tool for kw in ["read", "get", "list", "status", "info"]):
|
|
return "read"
|
|
if any(kw in tool for kw in ["write", "create", "commit", "push"]):
|
|
return "build"
|
|
if any(kw in tool for kw in ["test", "check", "verify"]):
|
|
return "test"
|
|
if any(kw in tool for kw in ["search", "analyze"]):
|
|
return "synthesize"
|
|
return "general"
|
|
|
|
def get_stats(self) -> Dict:
|
|
"""Get processing statistics"""
|
|
return {
|
|
"events_processed": self.events_processed,
|
|
"events_dropped": self.events_dropped,
|
|
"avg_processing_time_ms": round(self.avg_processing_time_ms, 2),
|
|
"queue_depth": self.event_queue.qsize(),
|
|
"running": self.running
|
|
}
|
|
|
|
|
|
class ShortestLoopIntegrator:
|
|
"""
|
|
One-stop integration: Connect Hermes → Timmy Intelligence
|
|
|
|
Usage:
|
|
integrator = ShortestLoopIntegrator(intelligence_engine)
|
|
integrator.start()
|
|
# Now all Hermes telemetry flows into Timmy's intelligence
|
|
"""
|
|
|
|
def __init__(self, intelligence_engine, hermes_db_path: Path = None):
|
|
self.intelligence = intelligence_engine
|
|
self.hermes_reader = HermesStateReader(hermes_db_path)
|
|
self.processor = TelemetryStreamProcessor(intelligence_engine)
|
|
|
|
def start(self):
|
|
"""Start the shortest-loop integration"""
|
|
if not self.hermes_reader.is_available():
|
|
print("⚠️ Hermes database not found. Shortest loop disabled.")
|
|
return False
|
|
|
|
self.processor.start(self.hermes_reader)
|
|
print("✅ Shortest loop active: Hermes → Timmy Intelligence")
|
|
return True
|
|
|
|
def stop(self):
|
|
"""Stop the integration"""
|
|
self.processor.stop()
|
|
print("⏹️ Shortest loop stopped")
|
|
|
|
def get_status(self) -> Dict:
|
|
"""Get integration status"""
|
|
return {
|
|
"hermes_available": self.hermes_reader.is_available(),
|
|
"stream_active": self.processor.running,
|
|
"processor_stats": self.processor.get_stats()
|
|
}
|
|
|
|
def sync_historical(self, days: int = 7) -> Dict:
|
|
"""
|
|
One-time sync of historical Hermes data.
|
|
|
|
Use this to bootstrap intelligence with past data.
|
|
"""
|
|
if not self.hermes_reader.is_available():
|
|
return {"error": "Hermes not available"}
|
|
|
|
sessions = self.hermes_reader.get_recent_sessions(limit=1000)
|
|
|
|
synced = 0
|
|
for session in sessions:
|
|
session_id = session.get("session_id")
|
|
details = self.hermes_reader.get_session_details(session_id)
|
|
|
|
if details:
|
|
count = self.intelligence.ingest_hermes_session({
|
|
"session_id": session_id,
|
|
"model": session.get("model"),
|
|
"messages": details.get("messages", []),
|
|
"started_at": session.get("started_at")
|
|
})
|
|
synced += count
|
|
|
|
return {
|
|
"sessions_synced": len(sessions),
|
|
"executions_synced": synced
|
|
}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
print("=" * 60)
|
|
print("HERMES BRIDGE v3 — Shortest Loop Demo")
|
|
print("=" * 60)
|
|
|
|
# Check Hermes availability
|
|
reader = HermesStateReader()
|
|
|
|
print(f"\n🔍 Hermes Status:")
|
|
print(f" Database: {reader.db_path}")
|
|
print(f" Available: {reader.is_available()}")
|
|
|
|
if reader.is_available():
|
|
sessions = reader.get_recent_sessions(limit=5)
|
|
print(f"\n📊 Recent Sessions:")
|
|
for s in sessions:
|
|
print(f" - {s.get('session_id', 'unknown')[:16]}... "
|
|
f"({s.get('model', 'unknown')}) "
|
|
f"{s.get('tool_call_count', 0)} tools")
|
|
|
|
print("\n" + "=" * 60)
|