#!/usr/bin/env python3 """ Gemini Harness — Hermes/OpenClaw harness backed by Gemini 3.1 Pro A harness instance on Timmy's sovereign network, same pattern as Ezra, Bezalel, and Allegro. Timmy is sovereign; Gemini is a worker. Architecture: Timmy (sovereign) ├── Ezra (harness) ├── Bezalel (harness) ├── Allegro (harness) └── Gemini (harness — this module) Features: - Text generation, multimodal (image/video), code generation - Streaming responses - Context caching for project context - Model fallback: 3.1 Pro → 3 Pro → Flash - Latency, token, and cost telemetry - Hermes WebSocket registration - HTTP endpoint for network access Usage: # As a standalone harness server: python -m nexus.gemini_harness --serve # Or imported: from nexus.gemini_harness import GeminiHarness harness = GeminiHarness() response = harness.generate("Hello Timmy") print(response.text) Environment Variables: GOOGLE_API_KEY — Gemini API key (from aistudio.google.com) HERMES_WS_URL — Hermes WebSocket URL (default: ws://localhost:8000/ws) GEMINI_MODEL — Override default model """ from __future__ import annotations import asyncio import json import logging import os import time import uuid from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any, AsyncIterator, Iterator, Optional, Union import requests log = logging.getLogger("gemini") logging.basicConfig( level=logging.INFO, format="%(asctime)s [gemini] %(message)s", datefmt="%H:%M:%S", ) # ═══════════════════════════════════════════════════════════════════════════ # MODEL CONFIGURATION # ═══════════════════════════════════════════════════════════════════════════ # Model fallback chain: primary → secondary → tertiary GEMINI_MODEL_PRIMARY = "gemini-2.5-pro-preview-03-25" GEMINI_MODEL_SECONDARY = "gemini-2.0-pro" GEMINI_MODEL_TERTIARY = "gemini-2.0-flash" MODEL_FALLBACK_CHAIN = [ GEMINI_MODEL_PRIMARY, GEMINI_MODEL_SECONDARY, GEMINI_MODEL_TERTIARY, ] # Gemini API (OpenAI-compatible endpoint for drop-in compatibility) GEMINI_OPENAI_COMPAT_BASE = ( "https://generativelanguage.googleapis.com/v1beta/openai" ) GEMINI_NATIVE_BASE = "https://generativelanguage.googleapis.com/v1beta" # Approximate cost per 1M tokens (USD) — used for cost logging only # Prices current as of April 2026; verify at ai.google.dev/gemini-api/docs/pricing COST_PER_1M_INPUT = { GEMINI_MODEL_PRIMARY: 3.50, GEMINI_MODEL_SECONDARY: 2.00, GEMINI_MODEL_TERTIARY: 0.10, } COST_PER_1M_OUTPUT = { GEMINI_MODEL_PRIMARY: 10.50, GEMINI_MODEL_SECONDARY: 8.00, GEMINI_MODEL_TERTIARY: 0.40, } DEFAULT_HERMES_WS_URL = os.environ.get("HERMES_WS_URL", "ws://localhost:8000/ws") HARNESS_ID = "gemini" HARNESS_NAME = "Gemini Harness" # ═══════════════════════════════════════════════════════════════════════════ # DATA CLASSES # ═══════════════════════════════════════════════════════════════════════════ @dataclass class GeminiResponse: """Response from a Gemini generate call.""" text: str = "" model: str = "" input_tokens: int = 0 output_tokens: int = 0 latency_ms: float = 0.0 cost_usd: float = 0.0 cached: bool = False error: Optional[str] = None timestamp: str = field( default_factory=lambda: datetime.now(timezone.utc).isoformat() ) def to_dict(self) -> dict: return { "text": self.text, "model": self.model, "input_tokens": self.input_tokens, "output_tokens": self.output_tokens, "latency_ms": self.latency_ms, "cost_usd": self.cost_usd, "cached": self.cached, "error": self.error, "timestamp": self.timestamp, } @dataclass class ContextCache: """In-memory context cache for project context.""" cache_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8]) content: str = "" created_at: float = field(default_factory=time.time) hit_count: int = 0 ttl_seconds: float = 3600.0 # 1 hour default def is_valid(self) -> bool: return (time.time() - self.created_at) < self.ttl_seconds def touch(self): self.hit_count += 1 # ═══════════════════════════════════════════════════════════════════════════ # GEMINI HARNESS # ═══════════════════════════════════════════════════════════════════════════ class GeminiHarness: """ Gemini harness for Timmy's sovereign network. Acts as a Hermes/OpenClaw harness worker backed by the Gemini API. Registers itself on the network at startup; accepts text, code, and multimodal generation requests. All calls flow through the fallback chain (3.1 Pro → 3 Pro → Flash) and emit latency/token/cost telemetry to Hermes. """ def __init__( self, api_key: Optional[str] = None, model: Optional[str] = None, hermes_ws_url: str = DEFAULT_HERMES_WS_URL, context_ttl: float = 3600.0, ): self.api_key = api_key or os.environ.get("GOOGLE_API_KEY", "") self.model = model or os.environ.get("GEMINI_MODEL", GEMINI_MODEL_PRIMARY) self.hermes_ws_url = hermes_ws_url self.context_ttl = context_ttl # Context cache (project context stored here to avoid re-sending) self._context_cache: Optional[ContextCache] = None # Session bookkeeping self.session_id = str(uuid.uuid4())[:8] self.request_count = 0 self.total_input_tokens = 0 self.total_output_tokens = 0 self.total_cost_usd = 0.0 # WebSocket connection (lazy — created on first telemetry send) self._ws = None self._ws_connected = False if not self.api_key: log.warning( "GOOGLE_API_KEY not set — calls will fail. " "Set it via environment variable or pass api_key=." ) # ═══ LIFECYCLE ═══════════════════════════════════════════════════════ async def start(self): """Register harness on the network via Hermes WebSocket.""" log.info("=" * 50) log.info(f"{HARNESS_NAME} — STARTING") log.info(f" Session: {self.session_id}") log.info(f" Model: {self.model}") log.info(f" Hermes: {self.hermes_ws_url}") log.info("=" * 50) await self._connect_hermes() await self._send_telemetry({ "type": "harness_register", "harness_id": HARNESS_ID, "session_id": self.session_id, "model": self.model, "fallback_chain": MODEL_FALLBACK_CHAIN, "capabilities": ["text", "code", "multimodal", "streaming"], }) log.info("Harness registered on network") async def stop(self): """Deregister and disconnect.""" await self._send_telemetry({ "type": "harness_deregister", "harness_id": HARNESS_ID, "session_id": self.session_id, "stats": self._session_stats(), }) if self._ws: try: await self._ws.close() except Exception: pass self._ws_connected = False log.info(f"{HARNESS_NAME} stopped. {self._session_stats()}") # ═══ CORE GENERATION ═════════════════════════════════════════════════ def generate( self, prompt: Union[str, list[dict]], *, system: Optional[str] = None, use_cache: bool = True, stream: bool = False, max_tokens: Optional[int] = None, temperature: Optional[float] = None, ) -> GeminiResponse: """ Generate a response from Gemini. Tries the model fallback chain: primary → secondary → tertiary. Injects cached context if available and use_cache=True. Args: prompt: String prompt or list of message dicts (OpenAI-style: [{"role": "user", "content": "..."}]) system: Optional system instruction use_cache: Prepend cached project context if set stream: Return streaming response (prints to stdout) max_tokens: Override default max output tokens temperature: Sampling temperature (0.0–2.0) Returns: GeminiResponse with text, token counts, latency, cost """ if not self.api_key: return GeminiResponse(error="GOOGLE_API_KEY not set") messages = self._build_messages(prompt, system=system, use_cache=use_cache) for model in MODEL_FALLBACK_CHAIN: response = self._call_api( model=model, messages=messages, stream=stream, max_tokens=max_tokens, temperature=temperature, ) if response.error is None: self._record(response) return response log.warning(f"Model {model} failed: {response.error} — trying next") # All models failed final = GeminiResponse(error="All models in fallback chain failed") self._record(final) return final def generate_code( self, task: str, language: str = "python", context: Optional[str] = None, ) -> GeminiResponse: """ Specialized code generation call. Args: task: Natural language description of what to code language: Target programming language context: Optional code context (existing code, interfaces, etc.) """ system = ( f"You are an expert {language} programmer. " "Produce clean, well-structured code. " "Return only the code block, no explanation unless asked." ) if context: prompt = f"Context:\n```{language}\n{context}\n```\n\nTask: {task}" else: prompt = f"Task: {task}" return self.generate(prompt, system=system) def generate_multimodal( self, text: str, images: Optional[list[dict]] = None, system: Optional[str] = None, ) -> GeminiResponse: """ Multimodal generation with text + images. Args: text: Text prompt images: List of image dicts: [{"type": "base64", "data": "...", "mime": "image/png"}] or [{"type": "url", "url": "..."}] system: Optional system instruction """ # Build content parts parts: list[dict] = [{"type": "text", "text": text}] if images: for img in images: if img.get("type") == "base64": parts.append({ "type": "image_url", "image_url": { "url": f"data:{img.get('mime', 'image/png')};base64,{img['data']}" }, }) elif img.get("type") == "url": parts.append({ "type": "image_url", "image_url": {"url": img["url"]}, }) messages = [{"role": "user", "content": parts}] if system: messages = [{"role": "system", "content": system}] + messages for model in MODEL_FALLBACK_CHAIN: response = self._call_api(model=model, messages=messages) if response.error is None: self._record(response) return response log.warning(f"Multimodal: model {model} failed: {response.error}") return GeminiResponse(error="All models failed for multimodal request") def stream_generate( self, prompt: Union[str, list[dict]], system: Optional[str] = None, use_cache: bool = True, ) -> Iterator[str]: """ Stream text chunks from Gemini. Yields string chunks as they arrive. Logs final telemetry when done. Usage: for chunk in harness.stream_generate("Tell me about Timmy"): print(chunk, end="", flush=True) """ messages = self._build_messages(prompt, system=system, use_cache=use_cache) for model in MODEL_FALLBACK_CHAIN: try: yield from self._stream_api(model=model, messages=messages) return except Exception as e: log.warning(f"Stream: model {model} failed: {e}") log.error("Stream: all models in fallback chain failed") # ═══ CONTEXT CACHING ═════════════════════════════════════════════════ def set_context(self, content: str, ttl_seconds: float = 3600.0): """ Cache project context to prepend on future calls. Args: content: Context text (project docs, code, instructions) ttl_seconds: Cache TTL (default: 1 hour) """ self._context_cache = ContextCache( content=content, ttl_seconds=ttl_seconds, ) log.info( f"Context cached ({len(content)} chars, " f"TTL={ttl_seconds}s, id={self._context_cache.cache_id})" ) def clear_context(self): """Clear the cached project context.""" self._context_cache = None log.info("Context cache cleared") def context_status(self) -> dict: """Return cache status info.""" if not self._context_cache: return {"cached": False} return { "cached": True, "cache_id": self._context_cache.cache_id, "valid": self._context_cache.is_valid(), "hit_count": self._context_cache.hit_count, "age_seconds": time.time() - self._context_cache.created_at, "content_length": len(self._context_cache.content), } # ═══ INTERNAL: API CALLS ═════════════════════════════════════════════ def _call_api( self, model: str, messages: list[dict], stream: bool = False, max_tokens: Optional[int] = None, temperature: Optional[float] = None, ) -> GeminiResponse: """Make a single (non-streaming) call to the Gemini OpenAI-compat API.""" url = f"{GEMINI_OPENAI_COMPAT_BASE}/chat/completions" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", } payload: dict[str, Any] = { "model": model, "messages": messages, "stream": False, } if max_tokens is not None: payload["max_tokens"] = max_tokens if temperature is not None: payload["temperature"] = temperature t0 = time.time() try: r = requests.post(url, json=payload, headers=headers, timeout=120) latency_ms = (time.time() - t0) * 1000 if r.status_code != 200: return GeminiResponse( model=model, latency_ms=latency_ms, error=f"HTTP {r.status_code}: {r.text[:200]}", ) data = r.json() choice = data.get("choices", [{}])[0] text = choice.get("message", {}).get("content", "") usage = data.get("usage", {}) input_tokens = usage.get("prompt_tokens", 0) output_tokens = usage.get("completion_tokens", 0) cost = self._estimate_cost(model, input_tokens, output_tokens) return GeminiResponse( text=text, model=model, input_tokens=input_tokens, output_tokens=output_tokens, latency_ms=latency_ms, cost_usd=cost, ) except requests.Timeout: return GeminiResponse( model=model, latency_ms=(time.time() - t0) * 1000, error="Request timed out (120s)", ) except Exception as e: return GeminiResponse( model=model, latency_ms=(time.time() - t0) * 1000, error=str(e), ) def _stream_api( self, model: str, messages: list[dict], max_tokens: Optional[int] = None, temperature: Optional[float] = None, ) -> Iterator[str]: """Stream tokens from the Gemini OpenAI-compat API.""" url = f"{GEMINI_OPENAI_COMPAT_BASE}/chat/completions" headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json", } payload: dict[str, Any] = { "model": model, "messages": messages, "stream": True, } if max_tokens is not None: payload["max_tokens"] = max_tokens if temperature is not None: payload["temperature"] = temperature t0 = time.time() input_tokens = 0 output_tokens = 0 with requests.post( url, json=payload, headers=headers, stream=True, timeout=120 ) as r: r.raise_for_status() for raw_line in r.iter_lines(): if not raw_line: continue line = raw_line.decode("utf-8") if isinstance(raw_line, bytes) else raw_line if not line.startswith("data: "): continue payload_str = line[6:] if payload_str.strip() == "[DONE]": break try: chunk = json.loads(payload_str) delta = chunk.get("choices", [{}])[0].get("delta", {}) content = delta.get("content", "") if content: output_tokens += 1 # rough estimate yield content # Capture usage if present in final chunk usage = chunk.get("usage", {}) if usage: input_tokens = usage.get("prompt_tokens", input_tokens) output_tokens = usage.get("completion_tokens", output_tokens) except json.JSONDecodeError: pass latency_ms = (time.time() - t0) * 1000 cost = self._estimate_cost(model, input_tokens, output_tokens) resp = GeminiResponse( model=model, input_tokens=input_tokens, output_tokens=output_tokens, latency_ms=latency_ms, cost_usd=cost, ) self._record(resp) # ═══ INTERNAL: HELPERS ═══════════════════════════════════════════════ def _build_messages( self, prompt: Union[str, list[dict]], system: Optional[str] = None, use_cache: bool = True, ) -> list[dict]: """Build the messages list, injecting cached context if applicable.""" messages: list[dict] = [] # System instruction if system: messages.append({"role": "system", "content": system}) # Cached context prepended as assistant memory if use_cache and self._context_cache and self._context_cache.is_valid(): self._context_cache.touch() messages.append({ "role": "system", "content": f"[Project Context]\n{self._context_cache.content}", }) # User message if isinstance(prompt, str): messages.append({"role": "user", "content": prompt}) else: messages.extend(prompt) return messages @staticmethod def _estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float: """Estimate USD cost from token counts.""" in_rate = COST_PER_1M_INPUT.get(model, 3.50) out_rate = COST_PER_1M_OUTPUT.get(model, 10.50) return (input_tokens * in_rate + output_tokens * out_rate) / 1_000_000 def _record(self, response: GeminiResponse): """Update session stats and emit telemetry for a completed response.""" self.request_count += 1 self.total_input_tokens += response.input_tokens self.total_output_tokens += response.output_tokens self.total_cost_usd += response.cost_usd log.info( f"[{response.model}] {response.latency_ms:.0f}ms | " f"in={response.input_tokens} out={response.output_tokens} | " f"${response.cost_usd:.6f}" ) # Fire-and-forget telemetry (don't block the caller) try: asyncio.get_event_loop().create_task( self._send_telemetry({ "type": "gemini_response", "harness_id": HARNESS_ID, "session_id": self.session_id, "model": response.model, "latency_ms": response.latency_ms, "input_tokens": response.input_tokens, "output_tokens": response.output_tokens, "cost_usd": response.cost_usd, "cached": response.cached, "error": response.error, }) ) except RuntimeError: # No event loop running (sync context) — skip async telemetry pass def _session_stats(self) -> dict: return { "session_id": self.session_id, "request_count": self.request_count, "total_input_tokens": self.total_input_tokens, "total_output_tokens": self.total_output_tokens, "total_cost_usd": round(self.total_cost_usd, 6), } # ═══ HERMES WEBSOCKET ════════════════════════════════════════════════ async def _connect_hermes(self): """Connect to Hermes WebSocket for telemetry.""" try: import websockets # type: ignore self._ws = await websockets.connect(self.hermes_ws_url) self._ws_connected = True log.info(f"Connected to Hermes: {self.hermes_ws_url}") except Exception as e: log.warning(f"Hermes connection failed (telemetry disabled): {e}") self._ws_connected = False async def _send_telemetry(self, data: dict): """Send a telemetry event to Hermes.""" if not self._ws_connected or not self._ws: return try: await self._ws.send(json.dumps(data)) except Exception as e: log.warning(f"Telemetry send failed: {e}") self._ws_connected = False # ═══ SOVEREIGN ORCHESTRATION REGISTRATION ════════════════════════════ def register_in_orchestration( self, orchestration_url: str = "http://localhost:8000/api/v1/workers/register", ) -> bool: """ Register this harness as an available worker in sovereign orchestration. Sends a POST to the orchestration endpoint with harness metadata. Returns True on success. """ payload = { "worker_id": HARNESS_ID, "name": HARNESS_NAME, "session_id": self.session_id, "model": self.model, "fallback_chain": MODEL_FALLBACK_CHAIN, "capabilities": ["text", "code", "multimodal", "streaming"], "transport": "http+ws", "registered_at": datetime.now(timezone.utc).isoformat(), } try: r = requests.post(orchestration_url, json=payload, timeout=10) if r.status_code in (200, 201): log.info(f"Registered in orchestration: {orchestration_url}") return True log.warning( f"Orchestration registration returned {r.status_code}: {r.text[:100]}" ) return False except Exception as e: log.warning(f"Orchestration registration failed: {e}") return False # ═══════════════════════════════════════════════════════════════════════════ # HTTP SERVER — expose harness to the network # ═══════════════════════════════════════════════════════════════════════════ def create_app(harness: GeminiHarness): """ Create a minimal HTTP app that exposes the harness to the network. Endpoints: POST /generate — text/code generation POST /generate/stream — streaming text generation POST /generate/code — code generation GET /health — health check GET /status — session stats + cache status POST /context — set project context cache DELETE /context — clear context cache """ try: from http.server import BaseHTTPRequestHandler, HTTPServer except ImportError: raise RuntimeError("http.server not available") class GeminiHandler(BaseHTTPRequestHandler): def log_message(self, fmt, *args): log.info(f"HTTP {fmt % args}") def _read_body(self) -> dict: length = int(self.headers.get("Content-Length", 0)) raw = self.rfile.read(length) if length else b"{}" return json.loads(raw) def _send_json(self, data: dict, status: int = 200): body = json.dumps(data).encode() self.send_response(status) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def do_GET(self): if self.path == "/health": self._send_json({"status": "ok", "harness": HARNESS_ID}) elif self.path == "/status": self._send_json({ **harness._session_stats(), "model": harness.model, "context": harness.context_status(), }) else: self._send_json({"error": "Not found"}, 404) def do_POST(self): body = self._read_body() if self.path == "/generate": prompt = body.get("prompt", "") system = body.get("system") use_cache = body.get("use_cache", True) response = harness.generate( prompt, system=system, use_cache=use_cache ) self._send_json(response.to_dict()) elif self.path == "/generate/code": task = body.get("task", "") language = body.get("language", "python") context = body.get("context") response = harness.generate_code(task, language=language, context=context) self._send_json(response.to_dict()) elif self.path == "/context": content = body.get("content", "") ttl = float(body.get("ttl_seconds", 3600.0)) harness.set_context(content, ttl_seconds=ttl) self._send_json({"status": "cached", **harness.context_status()}) else: self._send_json({"error": "Not found"}, 404) def do_DELETE(self): if self.path == "/context": harness.clear_context() self._send_json({"status": "cleared"}) else: self._send_json({"error": "Not found"}, 404) return HTTPServer, GeminiHandler # ═══════════════════════════════════════════════════════════════════════════ # CLI ENTRYPOINT # ═══════════════════════════════════════════════════════════════════════════ async def _async_start(harness: GeminiHarness): await harness.start() def main(): import argparse parser = argparse.ArgumentParser( description=f"{HARNESS_NAME} — Timmy's Gemini harness worker", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python -m nexus.gemini_harness "What is the meaning of sovereignty?" python -m nexus.gemini_harness --model gemini-2.0-flash "Quick test" python -m nexus.gemini_harness --serve --port 9300 python -m nexus.gemini_harness --code "Write a fizzbuzz in Python" Environment Variables: GOOGLE_API_KEY — required for all API calls HERMES_WS_URL — Hermes telemetry endpoint GEMINI_MODEL — override default model """, ) parser.add_argument( "prompt", nargs="?", default=None, help="Prompt to send (omit to use --serve mode)", ) parser.add_argument( "--model", default=None, help=f"Model to use (default: {GEMINI_MODEL_PRIMARY})", ) parser.add_argument( "--serve", action="store_true", help="Start HTTP server to expose harness on the network", ) parser.add_argument( "--port", type=int, default=9300, help="HTTP server port (default: 9300)", ) parser.add_argument( "--hermes-ws", default=DEFAULT_HERMES_WS_URL, help=f"Hermes WebSocket URL (default: {DEFAULT_HERMES_WS_URL})", ) parser.add_argument( "--code", metavar="TASK", help="Generate code for TASK instead of plain text", ) parser.add_argument( "--stream", action="store_true", help="Stream response chunks to stdout", ) args = parser.parse_args() harness = GeminiHarness( model=args.model, hermes_ws_url=args.hermes_ws, ) if args.serve: # Start harness registration then serve HTTP asyncio.run(_async_start(harness)) HTTPServer, GeminiHandler = create_app(harness) server = HTTPServer(("0.0.0.0", args.port), GeminiHandler) log.info(f"Serving on http://0.0.0.0:{args.port}") log.info("Endpoints: /generate /generate/code /health /status /context") try: server.serve_forever() except KeyboardInterrupt: log.info("Shutting down server") asyncio.run(harness.stop()) return if args.code: response = harness.generate_code(args.code) elif args.prompt: if args.stream: for chunk in harness.stream_generate(args.prompt): print(chunk, end="", flush=True) print() return else: response = harness.generate(args.prompt) else: parser.print_help() return if response.error: print(f"ERROR: {response.error}") else: print(response.text) print( f"\n[{response.model}] {response.latency_ms:.0f}ms | " f"tokens: {response.input_tokens}→{response.output_tokens} | " f"${response.cost_usd:.6f}", flush=True, ) if __name__ == "__main__": main()