diff --git a/nexus/allegro_harness.py b/nexus/allegro_harness.py
new file mode 100644
index 00000000..b33ca686
--- /dev/null
+++ b/nexus/allegro_harness.py
@@ -0,0 +1,901 @@
+#!/usr/bin/env python3
+"""
+Allegro Harness — Hermes/OpenClaw harness backed by Kimi K2
+
+A harness instance on Timmy's sovereign network, same pattern as Ezra,
+Bezalel, and Gemini. Timmy is sovereign; Kimi is a worker.
+
+Architecture:
+    Timmy (sovereign)
+    ├── Ezra (harness)
+    ├── Bezalel (harness)
+    ├── Allegro (harness — this module)
+    └── Gemini (harness)
+
+Features:
+- Text generation, code generation
+- Streaming responses
+- Context caching for project context
+- Model fallback: kimi-k2 → moonshot-v1-32k → moonshot-v1-8k
+- Latency, token, and cost telemetry
+- Hermes WebSocket registration
+- HTTP endpoint for network access
+
+Usage:
+    # As a standalone harness server:
+    python -m nexus.allegro_harness --serve
+
+    # Or imported:
+    from nexus.allegro_harness import AllegroHarness
+    harness = AllegroHarness()
+    response = harness.generate("Hello Timmy")
+    print(response.text)
+
+Environment Variables:
+    KIMI_API_KEY  — Kimi/Moonshot API key (from platform.moonshot.cn)
+    HERMES_WS_URL — Hermes WebSocket URL (default: ws://localhost:8000/ws)
+    KIMI_MODEL    — Override default model
+"""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import logging
+import os
+import time
+import uuid
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from typing import Any, Iterator, Optional, Union
+
+import requests
+
+log = logging.getLogger("allegro")
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [allegro] %(message)s",
+    datefmt="%H:%M:%S",
+)
+
+# ═══════════════════════════════════════════════════════════════════════════
+# MODEL CONFIGURATION
+# ═══════════════════════════════════════════════════════════════════════════
+
+# Model fallback chain: primary → secondary → tertiary
+KIMI_MODEL_PRIMARY = "kimi-k2"
+KIMI_MODEL_SECONDARY = "moonshot-v1-32k"
+KIMI_MODEL_TERTIARY = "moonshot-v1-8k"
+MODEL_FALLBACK_CHAIN = [
+    KIMI_MODEL_PRIMARY,
+    KIMI_MODEL_SECONDARY,
+    KIMI_MODEL_TERTIARY,
+]
+
+# Kimi/Moonshot API — OpenAI-compatible endpoint
+KIMI_API_BASE = "https://api.moonshot.cn/v1"
+
+# Approximate cost per 1M tokens (USD) — used for cost logging only
+# Prices current as of April 2026; verify at platform.moonshot.cn
+COST_PER_1M_INPUT = {
+    KIMI_MODEL_PRIMARY: 0.60,
+    KIMI_MODEL_SECONDARY: 0.24,
+    KIMI_MODEL_TERTIARY: 0.12,
+}
+COST_PER_1M_OUTPUT = {
+    KIMI_MODEL_PRIMARY: 2.50,
+    KIMI_MODEL_SECONDARY: 0.24,
+    KIMI_MODEL_TERTIARY: 0.12,
+}
+
+DEFAULT_HERMES_WS_URL = os.environ.get("HERMES_WS_URL", "ws://localhost:8000/ws")
+HARNESS_ID = "allegro"
+HARNESS_NAME = "Allegro Harness"
+
+# Default HTTP server port for the Allegro gateway
+DEFAULT_PORT = 9400
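+
+# Worked example of the tables above (numbers illustrative, not a quote):
+# a kimi-k2 call with 10,000 input and 2,000 output tokens costs about
+#   (10_000 * 0.60 + 2_000 * 2.50) / 1_000_000 = $0.011
+# _estimate_cost() below applies this same formula per response.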
+ "latency_ms": self.latency_ms, + "cost_usd": self.cost_usd, + "cached": self.cached, + "error": self.error, + "timestamp": self.timestamp, + } + + +@dataclass +class ContextCache: + """In-memory context cache for project context.""" + cache_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8]) + content: str = "" + created_at: float = field(default_factory=time.time) + hit_count: int = 0 + ttl_seconds: float = 3600.0 # 1 hour default + + def is_valid(self) -> bool: + return (time.time() - self.created_at) < self.ttl_seconds + + def touch(self): + self.hit_count += 1 + + +# ═══════════════════════════════════════════════════════════════════════════ +# ALLEGRO HARNESS +# ═══════════════════════════════════════════════════════════════════════════ + +class AllegroHarness: + """ + Allegro harness for Timmy's sovereign network. + + Acts as a Hermes/OpenClaw harness worker backed by the Kimi API. + Registers itself on the network at startup; accepts text and code + generation requests. + + All calls flow through the fallback chain (kimi-k2 → moonshot-v1-32k → + moonshot-v1-8k) and emit latency/token/cost telemetry to Hermes. + + Allegro is the top code performer in the fleet — optimized for bulk + code execution, PR generation, and BURN issue throughput. + """ + + def __init__( + self, + api_key: Optional[str] = None, + model: Optional[str] = None, + hermes_ws_url: str = DEFAULT_HERMES_WS_URL, + context_ttl: float = 3600.0, + ): + self.api_key = api_key or os.environ.get("KIMI_API_KEY", "") + self.model = model or os.environ.get("KIMI_MODEL", KIMI_MODEL_PRIMARY) + self.hermes_ws_url = hermes_ws_url + self.context_ttl = context_ttl + + # Context cache (project context stored here to avoid re-sending) + self._context_cache: Optional[ContextCache] = None + + # Session bookkeeping + self.session_id = str(uuid.uuid4())[:8] + self.request_count = 0 + self.total_input_tokens = 0 + self.total_output_tokens = 0 + self.total_cost_usd = 0.0 + + # WebSocket connection (lazy — created on first telemetry send) + self._ws = None + self._ws_connected = False + + if not self.api_key: + log.warning( + "KIMI_API_KEY not set — calls will fail. " + "Set it via environment variable or pass api_key=." + ) + + # ═══ LIFECYCLE ═══════════════════════════════════════════════════════ + + async def start(self): + """Register harness on the network via Hermes WebSocket.""" + log.info("=" * 50) + log.info(f"{HARNESS_NAME} — STARTING") + log.info(f" Session: {self.session_id}") + log.info(f" Model: {self.model}") + log.info(f" Hermes: {self.hermes_ws_url}") + log.info("=" * 50) + + await self._connect_hermes() + await self._send_telemetry({ + "type": "harness_register", + "harness_id": HARNESS_ID, + "session_id": self.session_id, + "model": self.model, + "fallback_chain": MODEL_FALLBACK_CHAIN, + "capabilities": ["text", "code", "streaming"], + }) + log.info("Harness registered on network") + + async def stop(self): + """Deregister and disconnect.""" + await self._send_telemetry({ + "type": "harness_deregister", + "harness_id": HARNESS_ID, + "session_id": self.session_id, + "stats": self._session_stats(), + }) + if self._ws: + try: + await self._ws.close() + except Exception: + pass + self._ws_connected = False + log.info(f"{HARNESS_NAME} stopped. 
+
+    # ═══ CORE GENERATION ═════════════════════════════════════════════════
+
+    def generate(
+        self,
+        prompt: Union[str, list[dict]],
+        *,
+        system: Optional[str] = None,
+        use_cache: bool = True,
+        stream: bool = False,
+        max_tokens: Optional[int] = None,
+        temperature: Optional[float] = None,
+    ) -> AllegroResponse:
+        """
+        Generate a response from Kimi.
+
+        Tries the model fallback chain: kimi-k2 → moonshot-v1-32k → moonshot-v1-8k.
+        Injects cached context if available and use_cache=True.
+
+        Args:
+            prompt: String prompt or list of message dicts
+                (OpenAI-style: [{"role": "user", "content": "..."}])
+            system: Optional system instruction
+            use_cache: Prepend cached project context if set
+            stream: Not used by this method; call stream_generate() for streaming
+            max_tokens: Override default max output tokens
+            temperature: Sampling temperature (0.0–1.0)
+
+        Returns:
+            AllegroResponse with text, token counts, latency, cost
+        """
+        if not self.api_key:
+            return AllegroResponse(error="KIMI_API_KEY not set")
+
+        # Record whether valid cached context will be injected, so the
+        # response's `cached` flag reflects it.
+        cache_hit = bool(
+            use_cache and self._context_cache and self._context_cache.is_valid()
+        )
+        messages = self._build_messages(prompt, system=system, use_cache=use_cache)
+
+        for model in MODEL_FALLBACK_CHAIN:
+            response = self._call_api(
+                model=model,
+                messages=messages,
+                max_tokens=max_tokens,
+                temperature=temperature,
+            )
+            if response.error is None:
+                response.cached = cache_hit
+                self._record(response)
+                return response
+            log.warning(f"Model {model} failed: {response.error} — trying next")
+
+        # All models failed
+        final = AllegroResponse(error="All models in fallback chain failed")
+        self._record(final)
+        return final
+
+    def generate_code(
+        self,
+        task: str,
+        language: str = "python",
+        context: Optional[str] = None,
+    ) -> AllegroResponse:
+        """
+        Specialized code generation call.
+
+        Args:
+            task: Natural language description of what to code
+            language: Target programming language
+            context: Optional code context (existing code, interfaces, etc.)
+        """
+        system = (
+            f"You are an expert {language} programmer. "
+            "Produce clean, well-structured code. "
+            "Return only the code block, no explanation unless asked."
+        )
+        if context:
+            prompt = f"Context:\n```{language}\n{context}\n```\n\nTask: {task}"
+        else:
+            prompt = f"Task: {task}"
+
+        return self.generate(prompt, system=system)
+
+    def stream_generate(
+        self,
+        prompt: Union[str, list[dict]],
+        system: Optional[str] = None,
+        use_cache: bool = True,
+    ) -> Iterator[str]:
+        """
+        Stream text chunks from Kimi.
+
+        Yields string chunks as they arrive. Logs final telemetry when done.
+
+        Usage:
+            for chunk in harness.stream_generate("Tell me about Timmy"):
+                print(chunk, end="", flush=True)
+        """
+        if not self.api_key:
+            log.error("Stream: KIMI_API_KEY not set")
+            return
+
+        messages = self._build_messages(prompt, system=system, use_cache=use_cache)
+
+        for model in MODEL_FALLBACK_CHAIN:
+            try:
+                yield from self._stream_api(model=model, messages=messages)
+                return
+            except Exception as e:
+                # Chunks already yielded before a mid-stream failure are not
+                # retracted; the next model restarts from the beginning.
+                log.warning(f"Stream: model {model} failed: {e}")
+
+        log.error("Stream: all models in fallback chain failed")
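+
+    # Prompt forms accepted by generate() (sketch; roles follow the
+    # OpenAI-compatible schema the Moonshot endpoint expects):
+    #
+    #     harness.generate("Summarize the BURN queue")         # plain string
+    #     harness.generate([
+    #         {"role": "user", "content": "Review this diff"},
+    #         {"role": "assistant", "content": "Looks clean."},
+    #         {"role": "user", "content": "Now write tests."},
+    #     ])                                                   # message list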
+ """ + if not self.api_key: + return {"ok": False, "error": "KIMI_API_KEY not set"} + + url = f"{KIMI_API_BASE}/chat/completions" + headers = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + } + payload = { + "model": KIMI_MODEL_TERTIARY, # cheapest model for the probe + "messages": [{"role": "user", "content": "ping"}], + "max_tokens": 1, + } + try: + r = requests.post(url, json=payload, headers=headers, timeout=15) + if r.status_code == 200: + log.info("Kimi API key valid — health check passed") + return {"ok": True, "model": KIMI_MODEL_TERTIARY} + elif r.status_code == 401: + log.error("Kimi API key invalid (401 Unauthorized)") + return {"ok": False, "error": f"Invalid API key (HTTP 401)"} + else: + return {"ok": False, "error": f"HTTP {r.status_code}: {r.text[:100]}"} + except requests.Timeout: + return {"ok": False, "error": "Health check timed out (15s)"} + except Exception as e: + return {"ok": False, "error": str(e)} + + # ═══ CONTEXT CACHING ═════════════════════════════════════════════════ + + def set_context(self, content: str, ttl_seconds: float = 3600.0): + """ + Cache project context to prepend on future calls. + + Args: + content: Context text (project docs, code, instructions) + ttl_seconds: Cache TTL (default: 1 hour) + """ + self._context_cache = ContextCache( + content=content, + ttl_seconds=ttl_seconds, + ) + log.info( + f"Context cached ({len(content)} chars, " + f"TTL={ttl_seconds}s, id={self._context_cache.cache_id})" + ) + + def clear_context(self): + """Clear the cached project context.""" + self._context_cache = None + log.info("Context cache cleared") + + def context_status(self) -> dict: + """Return cache status info.""" + if not self._context_cache: + return {"cached": False} + return { + "cached": True, + "cache_id": self._context_cache.cache_id, + "valid": self._context_cache.is_valid(), + "hit_count": self._context_cache.hit_count, + "age_seconds": time.time() - self._context_cache.created_at, + "content_length": len(self._context_cache.content), + } + + # ═══ INTERNAL: API CALLS ═════════════════════════════════════════════ + + def _call_api( + self, + model: str, + messages: list[dict], + max_tokens: Optional[int] = None, + temperature: Optional[float] = None, + ) -> AllegroResponse: + """Make a single (non-streaming) call to the Kimi OpenAI-compat API.""" + url = f"{KIMI_API_BASE}/chat/completions" + headers = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + } + payload: dict[str, Any] = { + "model": model, + "messages": messages, + "stream": False, + } + if max_tokens is not None: + payload["max_tokens"] = max_tokens + if temperature is not None: + payload["temperature"] = temperature + + t0 = time.time() + try: + r = requests.post(url, json=payload, headers=headers, timeout=120) + latency_ms = (time.time() - t0) * 1000 + + if r.status_code != 200: + return AllegroResponse( + model=model, + latency_ms=latency_ms, + error=f"HTTP {r.status_code}: {r.text[:200]}", + ) + + data = r.json() + choice = data.get("choices", [{}])[0] + text = choice.get("message", {}).get("content", "") + usage = data.get("usage", {}) + input_tokens = usage.get("prompt_tokens", 0) + output_tokens = usage.get("completion_tokens", 0) + cost = self._estimate_cost(model, input_tokens, output_tokens) + + return AllegroResponse( + text=text, + model=model, + input_tokens=input_tokens, + output_tokens=output_tokens, + latency_ms=latency_ms, + cost_usd=cost, + ) + + except requests.Timeout: + return 
+
+    # ═══ INTERNAL: API CALLS ═════════════════════════════════════════════
+
+    def _call_api(
+        self,
+        model: str,
+        messages: list[dict],
+        max_tokens: Optional[int] = None,
+        temperature: Optional[float] = None,
+    ) -> AllegroResponse:
+        """Make a single (non-streaming) call to the Kimi OpenAI-compat API."""
+        url = f"{KIMI_API_BASE}/chat/completions"
+        headers = {
+            "Authorization": f"Bearer {self.api_key}",
+            "Content-Type": "application/json",
+        }
+        payload: dict[str, Any] = {
+            "model": model,
+            "messages": messages,
+            "stream": False,
+        }
+        if max_tokens is not None:
+            payload["max_tokens"] = max_tokens
+        if temperature is not None:
+            payload["temperature"] = temperature
+
+        t0 = time.time()
+        try:
+            r = requests.post(url, json=payload, headers=headers, timeout=120)
+            latency_ms = (time.time() - t0) * 1000
+
+            if r.status_code != 200:
+                return AllegroResponse(
+                    model=model,
+                    latency_ms=latency_ms,
+                    error=f"HTTP {r.status_code}: {r.text[:200]}",
+                )
+
+            data = r.json()
+            choice = data.get("choices", [{}])[0]
+            text = choice.get("message", {}).get("content", "")
+            usage = data.get("usage", {})
+            input_tokens = usage.get("prompt_tokens", 0)
+            output_tokens = usage.get("completion_tokens", 0)
+            cost = self._estimate_cost(model, input_tokens, output_tokens)
+
+            return AllegroResponse(
+                text=text,
+                model=model,
+                input_tokens=input_tokens,
+                output_tokens=output_tokens,
+                latency_ms=latency_ms,
+                cost_usd=cost,
+            )
+
+        except requests.Timeout:
+            return AllegroResponse(
+                model=model,
+                latency_ms=(time.time() - t0) * 1000,
+                error="Request timed out (120s)",
+            )
+        except Exception as e:
+            return AllegroResponse(
+                model=model,
+                latency_ms=(time.time() - t0) * 1000,
+                error=str(e),
+            )
+
+    def _stream_api(
+        self,
+        model: str,
+        messages: list[dict],
+        max_tokens: Optional[int] = None,
+        temperature: Optional[float] = None,
+    ) -> Iterator[str]:
+        """Stream tokens from the Kimi OpenAI-compat API."""
+        url = f"{KIMI_API_BASE}/chat/completions"
+        headers = {
+            "Authorization": f"Bearer {self.api_key}",
+            "Content-Type": "application/json",
+        }
+        payload: dict[str, Any] = {
+            "model": model,
+            "messages": messages,
+            "stream": True,
+        }
+        if max_tokens is not None:
+            payload["max_tokens"] = max_tokens
+        if temperature is not None:
+            payload["temperature"] = temperature
+
+        t0 = time.time()
+        input_tokens = 0
+        output_tokens = 0
+
+        with requests.post(
+            url, json=payload, headers=headers, stream=True, timeout=120
+        ) as r:
+            r.raise_for_status()
+            for raw_line in r.iter_lines():
+                if not raw_line:
+                    continue
+                line = raw_line.decode("utf-8") if isinstance(raw_line, bytes) else raw_line
+                if not line.startswith("data: "):
+                    continue
+                payload_str = line[6:]
+                if payload_str.strip() == "[DONE]":
+                    break
+                try:
+                    chunk = json.loads(payload_str)
+                    delta = chunk.get("choices", [{}])[0].get("delta", {})
+                    content = delta.get("content", "")
+                    if content:
+                        output_tokens += 1  # rough estimate: one chunk ≈ one token
+                        yield content
+                    # Capture usage if present in final chunk
+                    usage = chunk.get("usage", {})
+                    if usage:
+                        input_tokens = usage.get("prompt_tokens", input_tokens)
+                        output_tokens = usage.get("completion_tokens", output_tokens)
+                except json.JSONDecodeError:
+                    pass
+
+        latency_ms = (time.time() - t0) * 1000
+        cost = self._estimate_cost(model, input_tokens, output_tokens)
+        resp = AllegroResponse(
+            model=model,
+            input_tokens=input_tokens,
+            output_tokens=output_tokens,
+            latency_ms=latency_ms,
+            cost_usd=cost,
+        )
+        self._record(resp)
+
+    # ═══ INTERNAL: HELPERS ═══════════════════════════════════════════════
+
+    def _build_messages(
+        self,
+        prompt: Union[str, list[dict]],
+        system: Optional[str] = None,
+        use_cache: bool = True,
+    ) -> list[dict]:
+        """Build the messages list, injecting cached context if applicable."""
+        messages: list[dict] = []
+
+        # System instruction
+        if system:
+            messages.append({"role": "system", "content": system})
+
+        # Cached context prepended as system memory
+        if use_cache and self._context_cache and self._context_cache.is_valid():
+            self._context_cache.touch()
+            messages.append({
+                "role": "system",
+                "content": f"[Project Context]\n{self._context_cache.content}",
+            })
+
+        # User message
+        if isinstance(prompt, str):
+            messages.append({"role": "user", "content": prompt})
+        else:
+            messages.extend(prompt)
+
+        return messages
+
+    @staticmethod
+    def _estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
+        """Estimate USD cost from token counts."""
+        in_rate = COST_PER_1M_INPUT.get(model, 0.60)
+        out_rate = COST_PER_1M_OUTPUT.get(model, 2.50)
+        return (input_tokens * in_rate + output_tokens * out_rate) / 1_000_000
+
+    def _record(self, response: AllegroResponse):
+        """Update session stats and emit telemetry for a completed response."""
+        self.request_count += 1
+        self.total_input_tokens += response.input_tokens
+        self.total_output_tokens += response.output_tokens
+        self.total_cost_usd += response.cost_usd
+
+        log.info(
+            f"[{response.model}] {response.latency_ms:.0f}ms | "
+            f"in={response.input_tokens} out={response.output_tokens} | "
+            f"${response.cost_usd:.6f}"
+        )
+
+        # Fire-and-forget telemetry (don't block the caller)
+        try:
+            asyncio.get_running_loop().create_task(
+                self._send_telemetry({
+                    "type": "allegro_response",
+                    "harness_id": HARNESS_ID,
+                    "session_id": self.session_id,
+                    "model": response.model,
+                    "latency_ms": response.latency_ms,
+                    "input_tokens": response.input_tokens,
+                    "output_tokens": response.output_tokens,
+                    "cost_usd": response.cost_usd,
+                    "cached": response.cached,
+                    "error": response.error,
+                })
+            )
+        except RuntimeError:
+            # No event loop running (sync context) — skip async telemetry
+            pass
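+
+    # Shape of the telemetry event emitted above (values illustrative):
+    #
+    #     {"type": "allegro_response", "harness_id": "allegro",
+    #      "session_id": "3f2a9c1d", "model": "kimi-k2",
+    #      "latency_ms": 842.0, "input_tokens": 512, "output_tokens": 128,
+    #      "cost_usd": 0.000627, "cached": false, "error": null}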
+
+    def _session_stats(self) -> dict:
+        return {
+            "session_id": self.session_id,
+            "request_count": self.request_count,
+            "total_input_tokens": self.total_input_tokens,
+            "total_output_tokens": self.total_output_tokens,
+            "total_cost_usd": round(self.total_cost_usd, 6),
+        }
+
+    # ═══ HERMES WEBSOCKET ════════════════════════════════════════════════
+
+    async def _connect_hermes(self):
+        """Connect to Hermes WebSocket for telemetry."""
+        try:
+            import websockets  # type: ignore
+            self._ws = await websockets.connect(self.hermes_ws_url)
+            self._ws_connected = True
+            log.info(f"Connected to Hermes: {self.hermes_ws_url}")
+        except Exception as e:
+            log.warning(f"Hermes connection failed (telemetry disabled): {e}")
+            self._ws_connected = False
+
+    async def _send_telemetry(self, data: dict):
+        """Send a telemetry event to Hermes."""
+        if not self._ws_connected or not self._ws:
+            return
+        try:
+            await self._ws.send(json.dumps(data))
+        except Exception as e:
+            log.warning(f"Telemetry send failed: {e}")
+            self._ws_connected = False
+
+    # ═══ SOVEREIGN ORCHESTRATION REGISTRATION ════════════════════════════
+
+    def register_in_orchestration(
+        self,
+        orchestration_url: str = "http://localhost:8000/api/v1/workers/register",
+    ) -> bool:
+        """
+        Register this harness as an available worker in sovereign orchestration.
+
+        Sends a POST to the orchestration endpoint with harness metadata.
+        Returns True on success.
+        """
+        payload = {
+            "worker_id": HARNESS_ID,
+            "name": HARNESS_NAME,
+            "session_id": self.session_id,
+            "model": self.model,
+            "fallback_chain": MODEL_FALLBACK_CHAIN,
+            "capabilities": ["text", "code", "streaming"],
+            "transport": "http+ws",
+            "registered_at": datetime.now(timezone.utc).isoformat(),
+        }
+        try:
+            r = requests.post(orchestration_url, json=payload, timeout=10)
+            if r.status_code in (200, 201):
+                log.info(f"Registered in orchestration: {orchestration_url}")
+                return True
+            log.warning(
+                f"Orchestration registration returned {r.status_code}: {r.text[:100]}"
+            )
+            return False
+        except Exception as e:
+            log.warning(f"Orchestration registration failed: {e}")
+            return False
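+
+    # Registration sketch (hypothetical host; the default targets localhost):
+    #
+    #     ok = AllegroHarness().register_in_orchestration(
+    #         "http://sovereign.local:8000/api/v1/workers/register"
+    #     )
+    #     # True on HTTP 200/201, False otherwise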
+
+
+# ═══════════════════════════════════════════════════════════════════════════
+# HTTP SERVER — expose harness to the network
+# ═══════════════════════════════════════════════════════════════════════════
+
+def create_app(harness: AllegroHarness):
+    """
+    Create a minimal HTTP app that exposes the harness to the network.
+
+    Endpoints:
+        POST   /generate       — text/code generation
+        POST   /generate/code  — code generation
+        GET    /health         — health check (also validates Kimi API key)
+        GET    /status         — session stats + cache status
+        POST   /context        — set project context cache
+        DELETE /context        — clear context cache
+    """
+    from http.server import BaseHTTPRequestHandler, HTTPServer
+
+    class AllegroHandler(BaseHTTPRequestHandler):
+        def log_message(self, fmt, *args):
+            log.info(f"HTTP {fmt % args}")
+
+        def _read_body(self) -> dict:
+            length = int(self.headers.get("Content-Length", 0))
+            raw = self.rfile.read(length) if length else b"{}"
+            return json.loads(raw)
+
+        def _send_json(self, data: dict, status: int = 200):
+            body = json.dumps(data).encode()
+            self.send_response(status)
+            self.send_header("Content-Type", "application/json")
+            self.send_header("Content-Length", str(len(body)))
+            self.end_headers()
+            self.wfile.write(body)
+
+        def do_GET(self):
+            if self.path == "/health":
+                result = harness.health_check()
+                status = 200 if result["ok"] else 503
+                self._send_json({**result, "harness": HARNESS_ID}, status)
+            elif self.path == "/status":
+                self._send_json({
+                    **harness._session_stats(),
+                    "model": harness.model,
+                    "context": harness.context_status(),
+                })
+            else:
+                self._send_json({"error": "Not found"}, 404)
+
+        def do_POST(self):
+            body = self._read_body()
+
+            if self.path == "/generate":
+                prompt = body.get("prompt", "")
+                system = body.get("system")
+                use_cache = body.get("use_cache", True)
+                response = harness.generate(
+                    prompt, system=system, use_cache=use_cache
+                )
+                self._send_json(response.to_dict())
+
+            elif self.path == "/generate/code":
+                task = body.get("task", "")
+                language = body.get("language", "python")
+                context = body.get("context")
+                response = harness.generate_code(task, language=language, context=context)
+                self._send_json(response.to_dict())
+
+            elif self.path == "/context":
+                content = body.get("content", "")
+                ttl = float(body.get("ttl_seconds", 3600.0))
+                harness.set_context(content, ttl_seconds=ttl)
+                self._send_json({"status": "cached", **harness.context_status()})
+
+            else:
+                self._send_json({"error": "Not found"}, 404)
+
+        def do_DELETE(self):
+            if self.path == "/context":
+                harness.clear_context()
+                self._send_json({"status": "cleared"})
+            else:
+                self._send_json({"error": "Not found"}, 404)
+
+    return HTTPServer, AllegroHandler
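+
+
+# HTTP client sketch (assumes a harness started with --serve on the default
+# port 9400; host and prompt are illustrative):
+#
+#     import requests
+#     r = requests.post(
+#         "http://localhost:9400/generate",
+#         json={"prompt": "Summarize the fallback chain"},
+#         timeout=120,
+#     )
+#     print(r.json()["text"])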
+
+
+# ═══════════════════════════════════════════════════════════════════════════
+# CLI ENTRYPOINT
+# ═══════════════════════════════════════════════════════════════════════════
+
+async def _async_start(harness: AllegroHarness):
+    await harness.start()
+
+
+def main():
+    import argparse
+
+    parser = argparse.ArgumentParser(
+        description=f"{HARNESS_NAME} — Timmy's Kimi harness worker",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog="""
+Examples:
+    python -m nexus.allegro_harness "What is the meaning of sovereignty?"
+    python -m nexus.allegro_harness --model moonshot-v1-8k "Quick test"
+    python -m nexus.allegro_harness --serve --port 9400
+    python -m nexus.allegro_harness --code "Write a fizzbuzz in Python"
+    python -m nexus.allegro_harness --health
+
+Environment Variables:
+    KIMI_API_KEY  — required for all API calls
+    HERMES_WS_URL — Hermes telemetry endpoint
+    KIMI_MODEL    — override default model
+""",
+    )
+    parser.add_argument(
+        "prompt",
+        nargs="?",
+        default=None,
+        help="Prompt to send (omit to use --serve mode)",
+    )
+    parser.add_argument(
+        "--model",
+        default=None,
+        help=f"Model to use (default: {KIMI_MODEL_PRIMARY})",
+    )
+    parser.add_argument(
+        "--serve",
+        action="store_true",
+        help="Start HTTP server to expose harness on the network",
+    )
+    parser.add_argument(
+        "--port",
+        type=int,
+        default=DEFAULT_PORT,
+        help=f"HTTP server port (default: {DEFAULT_PORT})",
+    )
+    parser.add_argument(
+        "--hermes-ws",
+        default=DEFAULT_HERMES_WS_URL,
+        help=f"Hermes WebSocket URL (default: {DEFAULT_HERMES_WS_URL})",
+    )
+    parser.add_argument(
+        "--code",
+        metavar="TASK",
+        help="Generate code for TASK instead of plain text",
+    )
+    parser.add_argument(
+        "--stream",
+        action="store_true",
+        help="Stream response chunks to stdout",
+    )
+    parser.add_argument(
+        "--health",
+        action="store_true",
+        help="Run health check (verify Kimi API key) and exit",
+    )
+    args = parser.parse_args()
+
+    harness = AllegroHarness(
+        model=args.model,
+        hermes_ws_url=args.hermes_ws,
+    )
+
+    if args.health:
+        result = harness.health_check()
+        if result["ok"]:
+            print(f"OK — Kimi API key valid, model={result.get('model')}")
+            raise SystemExit(0)
+        else:
+            print(f"FAIL — {result.get('error')}")
+            raise SystemExit(1)
+
+    if args.serve:
+        # Start harness registration, then serve HTTP. The registration loop
+        # exits here, so Hermes telemetry from the sync request handlers is
+        # best-effort (see _record).
+        asyncio.run(_async_start(harness))
+        HTTPServer, AllegroHandler = create_app(harness)
+        server = HTTPServer(("0.0.0.0", args.port), AllegroHandler)
+        log.info(f"Serving on http://0.0.0.0:{args.port}")
+        log.info("Endpoints: /generate /generate/code /health /status /context")
+        try:
+            server.serve_forever()
+        except KeyboardInterrupt:
+            log.info("Shutting down server")
+            asyncio.run(harness.stop())
+        return
+
+    if args.code:
+        response = harness.generate_code(args.code)
+    elif args.prompt:
+        if args.stream:
+            for chunk in harness.stream_generate(args.prompt):
+                print(chunk, end="", flush=True)
+            print()
+            return
+        else:
+            response = harness.generate(args.prompt)
+    else:
+        parser.print_help()
+        return
+
+    if response.error:
+        print(f"ERROR: {response.error}")
+    else:
+        print(response.text)
+        print(
+            f"\n[{response.model}] {response.latency_ms:.0f}ms | "
+            f"tokens: {response.input_tokens}→{response.output_tokens} | "
+            f"${response.cost_usd:.6f}",
+            flush=True,
+        )
+
+
+if __name__ == "__main__":
+    main()