"""LLM backends — Grok (xAI) and Claude (Anthropic). Provides drop-in replacements for the Agno Agent that expose the same run(message, stream) → RunResult interface used by the dashboard and the print_response(message, stream) interface used by the CLI. Backends: - GrokBackend: xAI Grok API via OpenAI-compatible SDK (opt-in premium) - ClaudeBackend: Anthropic Claude API — lightweight cloud fallback No cloud by default. No telemetry. Sats are sovereignty, boss. """ import logging import platform import time from dataclasses import dataclass from timmy.prompts import get_system_prompt logger = logging.getLogger(__name__) @dataclass class RunResult: """Minimal Agno-compatible run result — carries the model's response text.""" content: str confidence: float | None = None def is_apple_silicon() -> bool: """Return True when running on an M-series Mac (arm64 Darwin).""" return platform.system() == "Darwin" and platform.machine() == "arm64" # ── Grok (xAI) Backend ───────────────────────────────────────────────────── # Premium cloud augmentation — opt-in only, never the default path. # Available Grok models (configurable via GROK_DEFAULT_MODEL) GROK_MODELS: dict[str, str] = { "grok-3-fast": "grok-3-fast", "grok-3": "grok-3", "grok-3-mini": "grok-3-mini", "grok-3-mini-fast": "grok-3-mini-fast", } @dataclass class GrokUsageStats: """Tracks Grok API usage for cost monitoring and Spark logging.""" total_requests: int = 0 total_prompt_tokens: int = 0 total_completion_tokens: int = 0 total_latency_ms: float = 0.0 errors: int = 0 last_request_at: float | None = None @property def estimated_cost_sats(self) -> int: """Rough cost estimate in sats based on token usage.""" # ~$5/1M input tokens, ~$15/1M output tokens for Grok # At ~$100k/BTC, 1 sat ≈ $0.001 input_cost = (self.total_prompt_tokens / 1_000_000) * 5 output_cost = (self.total_completion_tokens / 1_000_000) * 15 total_usd = input_cost + output_cost return int(total_usd / 0.001) # Convert to sats class GrokBackend: """xAI Grok backend — premium cloud augmentation for frontier reasoning. Uses the OpenAI-compatible SDK to connect to xAI's API. Only activated when GROK_ENABLED=true and XAI_API_KEY is set. Exposes the same interface as Agno Agent: run(message, stream) → RunResult [dashboard] print_response(message, stream) → None [CLI] health_check() → dict [monitoring] """ def __init__( self, api_key: str | None = None, model: str | None = None, ) -> None: from config import settings self._api_key = api_key if api_key is not None else settings.xai_api_key self._model = model or settings.grok_default_model self._history: list[dict[str, str]] = [] self.stats = GrokUsageStats() if not self._api_key: logger.warning( "GrokBackend created without XAI_API_KEY — calls will fail until key is configured" ) def _get_client(self): """Create OpenAI client configured for xAI endpoint.""" from config import settings import httpx from openai import OpenAI return OpenAI( api_key=self._api_key, base_url=settings.xai_base_url, timeout=httpx.Timeout(300.0), ) async def _get_async_client(self): """Create async OpenAI client configured for xAI endpoint.""" from config import settings import httpx from openai import AsyncOpenAI return AsyncOpenAI( api_key=self._api_key, base_url=settings.xai_base_url, timeout=httpx.Timeout(300.0), ) # ── Public interface (mirrors Agno Agent) ───────────────────────────── def run(self, message: str, *, stream: bool = False) -> RunResult: """Synchronous inference via Grok API. Args: message: User prompt stream: Accepted for API compat; Grok returns full response Returns: RunResult with response content """ if not self._api_key: return RunResult(content="Grok is not configured. Set XAI_API_KEY to enable.") start = time.time() messages = self._build_messages(message) try: client = self._get_client() response = client.chat.completions.create( model=self._model, messages=messages, temperature=0.7, ) content = response.choices[0].message.content or "" latency_ms = (time.time() - start) * 1000 # Track usage self.stats.total_requests += 1 self.stats.total_latency_ms += latency_ms self.stats.last_request_at = time.time() if response.usage: self.stats.total_prompt_tokens += response.usage.prompt_tokens self.stats.total_completion_tokens += response.usage.completion_tokens # Update conversation history self._history.append({"role": "user", "content": message}) self._history.append({"role": "assistant", "content": content}) # Keep last 10 turns if len(self._history) > 20: self._history = self._history[-20:] logger.info( "Grok response: %d tokens in %.0fms (model=%s)", response.usage.completion_tokens if response.usage else 0, latency_ms, self._model, ) return RunResult(content=content) except Exception as exc: self.stats.errors += 1 logger.error("Grok API error: %s", exc) return RunResult(content=f"Grok temporarily unavailable: {exc}") async def arun(self, message: str) -> RunResult: """Async inference via Grok API — used by cascade router and tools.""" if not self._api_key: return RunResult(content="Grok is not configured. Set XAI_API_KEY to enable.") start = time.time() messages = self._build_messages(message) try: client = await self._get_async_client() response = await client.chat.completions.create( model=self._model, messages=messages, temperature=0.7, ) content = response.choices[0].message.content or "" latency_ms = (time.time() - start) * 1000 # Track usage self.stats.total_requests += 1 self.stats.total_latency_ms += latency_ms self.stats.last_request_at = time.time() if response.usage: self.stats.total_prompt_tokens += response.usage.prompt_tokens self.stats.total_completion_tokens += response.usage.completion_tokens # Update conversation history self._history.append({"role": "user", "content": message}) self._history.append({"role": "assistant", "content": content}) if len(self._history) > 20: self._history = self._history[-20:] logger.info( "Grok async response: %d tokens in %.0fms (model=%s)", response.usage.completion_tokens if response.usage else 0, latency_ms, self._model, ) return RunResult(content=content) except Exception as exc: self.stats.errors += 1 logger.error("Grok async API error: %s", exc) return RunResult(content=f"Grok temporarily unavailable: {exc}") def print_response(self, message: str, *, stream: bool = True) -> None: """Run inference and render the response to stdout (CLI interface).""" result = self.run(message, stream=stream) try: from rich.console import Console from rich.markdown import Markdown Console().print(Markdown(result.content)) except ImportError: print(result.content) def health_check(self) -> dict: """Check Grok API connectivity and return status.""" if not self._api_key: return { "ok": False, "error": "XAI_API_KEY not configured", "backend": "grok", "model": self._model, } try: client = self._get_client() # Lightweight check — list models client.models.list() return { "ok": True, "error": None, "backend": "grok", "model": self._model, "stats": { "total_requests": self.stats.total_requests, "estimated_cost_sats": self.stats.estimated_cost_sats, }, } except Exception as exc: return { "ok": False, "error": str(exc), "backend": "grok", "model": self._model, } @property def estimated_cost(self) -> int: """Return estimated cost in sats for all requests so far.""" return self.stats.estimated_cost_sats # ── Private helpers ─────────────────────────────────────────────────── def _build_messages(self, message: str) -> list[dict[str, str]]: """Build the messages array for the API call.""" messages = [ {"role": "system", "content": get_system_prompt(tools_enabled=True, session_id="grok")} ] # Include conversation history for context messages.extend(self._history[-10:]) messages.append({"role": "user", "content": message}) return messages # ── Module-level Grok singleton ───────────────────────────────────────────── _grok_backend: GrokBackend | None = None def get_grok_backend() -> GrokBackend: """Get or create the Grok backend singleton.""" global _grok_backend if _grok_backend is None: _grok_backend = GrokBackend() return _grok_backend def grok_available() -> bool: """Return True when Grok is enabled and API key is configured.""" try: from config import settings return settings.grok_enabled and bool(settings.xai_api_key) except Exception as exc: logger.warning("Backend check failed (grok_available): %s", exc) return False # ── Claude (Anthropic) Backend ───────────────────────────────────────────── # Lightweight cloud fallback — used when Ollama is offline and the user # has set ANTHROPIC_API_KEY. Follows the same sovereign-first philosophy: # never the default, only activated explicitly or as a last-resort fallback. CLAUDE_MODELS: dict[str, str] = { "haiku": "claude-haiku-4-5-20251001", "sonnet": "claude-sonnet-4-20250514", "opus": "claude-opus-4-20250514", } class ClaudeBackend: """Anthropic Claude backend — cloud fallback when local models are offline. Uses the official Anthropic SDK. Same interface as GrokBackend: run(message, stream) → RunResult [dashboard] print_response(message, stream) → None [CLI] health_check() → dict [monitoring] """ def __init__( self, api_key: str | None = None, model: str | None = None, ) -> None: from config import settings self._api_key = api_key or settings.anthropic_api_key raw_model = model or settings.claude_model # Allow short names like "haiku" / "sonnet" / "opus" self._model = CLAUDE_MODELS.get(raw_model, raw_model) self._history: list[dict[str, str]] = [] if not self._api_key: logger.warning( "ClaudeBackend created without ANTHROPIC_API_KEY — " "calls will fail until key is configured" ) def _get_client(self): """Create Anthropic client.""" import anthropic return anthropic.Anthropic(api_key=self._api_key) # ── Public interface (mirrors Agno Agent) ───────────────────────────── def run(self, message: str, *, stream: bool = False, **kwargs) -> RunResult: """Synchronous inference via Claude API.""" if not self._api_key: return RunResult(content="Claude is not configured. Set ANTHROPIC_API_KEY to enable.") start = time.time() messages = self._build_messages(message) try: client = self._get_client() response = client.messages.create( model=self._model, max_tokens=1024, system=get_system_prompt(tools_enabled=True, session_id="claude"), messages=messages, ) content = response.content[0].text if response.content else "" latency_ms = (time.time() - start) * 1000 # Update conversation history self._history.append({"role": "user", "content": message}) self._history.append({"role": "assistant", "content": content}) if len(self._history) > 20: self._history = self._history[-20:] logger.info( "Claude response: %d chars in %.0fms (model=%s)", len(content), latency_ms, self._model, ) return RunResult(content=content) except Exception as exc: logger.error("Claude API error: %s", exc) return RunResult(content=f"Claude temporarily unavailable: {exc}") def print_response(self, message: str, *, stream: bool = True) -> None: """Run inference and render the response to stdout (CLI interface).""" result = self.run(message, stream=stream) try: from rich.console import Console from rich.markdown import Markdown Console().print(Markdown(result.content)) except ImportError: print(result.content) def health_check(self) -> dict: """Check Claude API connectivity.""" if not self._api_key: return { "ok": False, "error": "ANTHROPIC_API_KEY not configured", "backend": "claude", "model": self._model, } try: client = self._get_client() # Lightweight ping — tiny completion client.messages.create( model=self._model, max_tokens=4, messages=[{"role": "user", "content": "ping"}], ) return {"ok": True, "error": None, "backend": "claude", "model": self._model} except Exception as exc: return {"ok": False, "error": str(exc), "backend": "claude", "model": self._model} # ── Private helpers ─────────────────────────────────────────────────── def _build_messages(self, message: str) -> list[dict[str, str]]: """Build the messages array for the API call.""" messages = list(self._history[-10:]) messages.append({"role": "user", "content": message}) return messages # ── Module-level Claude singleton ────────────────────────────────────────── _claude_backend: ClaudeBackend | None = None def get_claude_backend() -> ClaudeBackend: """Get or create the Claude backend singleton.""" global _claude_backend if _claude_backend is None: _claude_backend = ClaudeBackend() return _claude_backend def claude_available() -> bool: """Return True when Anthropic API key is configured.""" try: from config import settings return bool(settings.anthropic_api_key) except Exception as exc: logger.warning("Backend check failed (claude_available): %s", exc) return False