Files
the-nexus/nexus/bilbo_harness.py
Alexander Whitestone ed0ba7f5d8 WIP: Claude Code progress on #825
Automated salvage commit — agent session ended (exit 1).
Work in progress, may need continuation.
2026-04-04 15:45:15 -04:00

723 lines
27 KiB
Python

#!/usr/bin/env python3
"""
Bilbo Harness — Light-Duty Gateway backed by local Gemma 4B (Ollama)
Bilbo's lane: documentation, labelling, tagging, formatting.
Free local compute — no API key, no cost, no cloud dependency.
Architecture:
Timmy (sovereign)
├── Ezra (harness — Claude Opus 4.6, architecture/triage)
├── Bezalel (harness — Claude Opus 4.6, security/forge)
├── Allegro (harness — Kimi K2.5, bulk code execution)
└── Bilbo (harness — Gemma 4B local, light-duty support) ← this module
Routing principles:
- DO route here: doc stubs, tag/label extraction, README updates, issue formatting
- DO NOT route here: security audits, complex reasoning, multi-step refactors
Ollama must be running locally with the gemma model pulled:
ollama pull gemma3:4b (or gemma:4b, gemma2:2b — see BILBO_MODEL env var)
ollama serve
Usage:
# Single prompt:
python -m nexus.bilbo_harness "Summarise this issue: ..."
# Serve as HTTP gateway:
python -m nexus.bilbo_harness --serve --port 9400
# Summarise a file:
python -m nexus.bilbo_harness --summarise path/to/file.md
Environment Variables:
BILBO_MODEL — Ollama model tag (default: gemma3:4b)
OLLAMA_BASE_URL — Ollama HTTP base (default: http://localhost:11434)
HERMES_WS_URL — Hermes telemetry WebSocket (default: ws://localhost:8000/ws)
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Iterator, Optional, Union
import requests
# Module logger + one-time logging setup for the harness process.
log = logging.getLogger("bilbo")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [bilbo] %(message)s",
    datefmt="%H:%M:%S",
)

# ═══════════════════════════════════════════════════════════════════════════
# CONFIGURATION
# ═══════════════════════════════════════════════════════════════════════════

# Fallback model tag when neither the --model flag nor BILBO_MODEL is set.
BILBO_MODEL_DEFAULT = "gemma3:4b"

# Ollama OpenAI-compatible endpoint (v0.1.24+); base overridable via env.
OLLAMA_BASE_URL = os.environ.get("OLLAMA_BASE_URL", "http://localhost:11434")
OLLAMA_CHAT_URL = f"{OLLAMA_BASE_URL}/v1/chat/completions"
OLLAMA_TAGS_URL = f"{OLLAMA_BASE_URL}/api/tags"

# Hermes telemetry WebSocket endpoint.
DEFAULT_HERMES_WS_URL = os.environ.get("HERMES_WS_URL", "ws://localhost:8000/ws")

HARNESS_ID = "bilbo"
HARNESS_NAME = "Bilbo Harness"

# Light-duty task types Bilbo handles well.
BILBO_TASK_LANES = ["documentation", "tagging", "labelling", "formatting", "summarisation"]
# ═══════════════════════════════════════════════════════════════════════════
# DATA CLASSES
# ═══════════════════════════════════════════════════════════════════════════
@dataclass
class BilboResponse:
    """Result of a single Bilbo generate call (success or failure)."""

    text: str = ""
    model: str = ""
    input_tokens: int = 0
    output_tokens: int = 0
    latency_ms: float = 0.0
    error: Optional[str] = None
    # UTC ISO-8601 creation time, stamped when the instance is built.
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def to_dict(self) -> dict:
        """Serialize every field into a plain JSON-safe dict."""
        return {
            name: getattr(self, name)
            for name in (
                "text",
                "model",
                "input_tokens",
                "output_tokens",
                "latency_ms",
                "error",
                "timestamp",
            )
        }
# ═══════════════════════════════════════════════════════════════════════════
# BILBO HARNESS
# ═══════════════════════════════════════════════════════════════════════════
class BilboHarness:
    """
    Bilbo gateway harness — local Gemma 4B via Ollama.
    Handles light-duty tasks: documentation stubs, tag extraction, issue
    formatting, README updates, label suggestions.
    All calls use the Ollama OpenAI-compatible endpoint so the same
    request shape works against any future model swap.
    """

    def __init__(
        self,
        model: Optional[str] = None,
        ollama_base_url: str = OLLAMA_BASE_URL,
        hermes_ws_url: str = DEFAULT_HERMES_WS_URL,
    ):
        """Configure the harness; no network traffic happens here.

        Args:
            model: Ollama model tag; falls back to the BILBO_MODEL env var,
                then to BILBO_MODEL_DEFAULT.
            ollama_base_url: Base URL of the local Ollama server.
            hermes_ws_url: Hermes telemetry WebSocket endpoint.
        """
        # Resolution order: explicit argument > env var > module default.
        self.model = model or os.environ.get("BILBO_MODEL", BILBO_MODEL_DEFAULT)
        self.ollama_base_url = ollama_base_url
        # OpenAI-compatible chat endpoint derived from the base URL.
        self.chat_url = f"{ollama_base_url}/v1/chat/completions"
        self.hermes_ws_url = hermes_ws_url

        # Per-session bookkeeping (surfaced via /status and shutdown stats).
        self.session_id = str(uuid.uuid4())[:8]
        self.request_count = 0
        self.total_input_tokens = 0
        self.total_output_tokens = 0

        # Hermes WebSocket is opened lazily in start().
        self._ws = None
        self._ws_connected = False
# ═══ LIFECYCLE ═══════════════════════════════════════════════════════
async def start(self):
"""Register harness on the network via Hermes WebSocket."""
log.info("=" * 50)
log.info(f"{HARNESS_NAME} — STARTING")
log.info(f" Session: {self.session_id}")
log.info(f" Model: {self.model}")
log.info(f" Ollama: {self.ollama_base_url}")
log.info(f" Hermes: {self.hermes_ws_url}")
log.info(f" Lane: {', '.join(BILBO_TASK_LANES)}")
log.info("=" * 50)
await self._connect_hermes()
await self._send_telemetry({
"type": "harness_register",
"harness_id": HARNESS_ID,
"session_id": self.session_id,
"model": self.model,
"capabilities": BILBO_TASK_LANES,
"transport": "ollama-local",
})
log.info("Bilbo registered on network")
async def stop(self):
"""Deregister and disconnect."""
await self._send_telemetry({
"type": "harness_deregister",
"harness_id": HARNESS_ID,
"session_id": self.session_id,
"stats": self._session_stats(),
})
if self._ws:
try:
await self._ws.close()
except Exception:
pass
self._ws_connected = False
log.info(f"{HARNESS_NAME} stopped. {self._session_stats()}")
# ═══ HEALTH CHECK ═══════════════════════════════════════════════════
def check_ollama(self) -> dict:
"""
Verify Ollama is running and the configured model is available.
Returns dict with keys: running (bool), model_available (bool),
available_models (list[str]), error (str|None).
"""
try:
r = requests.get(f"{self.ollama_base_url}/api/tags", timeout=5)
if r.status_code != 200:
return {
"running": False,
"model_available": False,
"available_models": [],
"error": f"Ollama returned HTTP {r.status_code}",
}
data = r.json()
models = [m["name"] for m in data.get("models", [])]
# Match on prefix (gemma3:4b matches gemma3:4b-instruct-q4_0, etc.)
model_available = any(
m == self.model or m.startswith(self.model.split(":")[0])
for m in models
)
return {
"running": True,
"model_available": model_available,
"available_models": models,
"error": None,
}
except requests.ConnectionError:
return {
"running": False,
"model_available": False,
"available_models": [],
"error": f"Cannot connect to Ollama at {self.ollama_base_url}",
}
except Exception as e:
return {
"running": False,
"model_available": False,
"available_models": [],
"error": str(e),
}
# ═══ CORE GENERATION ═════════════════════════════════════════════════
def generate(
self,
prompt: Union[str, list[dict]],
*,
system: Optional[str] = None,
max_tokens: Optional[int] = None,
temperature: float = 0.3,
) -> BilboResponse:
"""
Generate a response from the local Gemma model via Ollama.
Args:
prompt: String prompt or list of message dicts
system: Optional system instruction
max_tokens: Override default max output tokens (None = Ollama default)
temperature: Sampling temperature (default: 0.3 for focused output)
Returns:
BilboResponse with text, token counts, latency
"""
messages = self._build_messages(prompt, system=system)
response = self._call_ollama(
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
)
self._record(response)
return response
def summarise(self, text: str, max_words: int = 100) -> BilboResponse:
"""
Summarise text in plain language.
Args:
text: Content to summarise
max_words: Target word count for the summary
Returns:
BilboResponse with the summary in .text
"""
system = (
"You are a concise technical writer. "
"Summarise the provided text clearly and accurately. "
"Use plain language. Avoid jargon. Be brief."
)
prompt = (
f"Summarise the following in approximately {max_words} words:\n\n{text}"
)
return self.generate(prompt, system=system, temperature=0.2)
def extract_tags(self, text: str) -> BilboResponse:
"""
Extract relevant tags/labels from text for issue or doc labelling.
Returns:
BilboResponse where .text contains a comma-separated tag list
"""
system = (
"You are a tagging assistant. "
"Given some text, output a comma-separated list of short, lowercase tags "
"(3-8 tags). Output ONLY the comma-separated list, nothing else."
)
prompt = f"Extract tags for:\n\n{text}"
return self.generate(prompt, system=system, temperature=0.1, max_tokens=64)
def format_doc(self, text: str, target_format: str = "markdown") -> BilboResponse:
"""
Reformat or clean up a documentation snippet.
Args:
text: The raw documentation text
target_format: Output format (default: markdown)
Returns:
BilboResponse with the reformatted content in .text
"""
system = (
f"You are a documentation formatter. "
f"Reformat the provided text as clean {target_format}. "
f"Fix whitespace, headings, and lists. Preserve meaning exactly."
)
prompt = f"Reformat this documentation:\n\n{text}"
return self.generate(prompt, system=system, temperature=0.1)
def write_doc_stub(self, signature: str, context: str = "") -> BilboResponse:
"""
Write a documentation stub for a function/class signature.
Args:
signature: Function or class signature string
context: Optional surrounding code context
Returns:
BilboResponse with the docstring stub in .text
"""
system = (
"You are a Python docstring writer. "
"Write a concise docstring for the given signature. "
"Include Args and Returns sections where applicable. "
"Output only the docstring, including triple-quotes."
)
prompt = signature
if context:
prompt = f"Context:\n{context}\n\nSignature: {signature}"
return self.generate(prompt, system=system, temperature=0.2)
# ═══ INTERNAL: API CALL ══════════════════════════════════════════════
def _call_ollama(
self,
messages: list[dict],
max_tokens: Optional[int] = None,
temperature: float = 0.3,
) -> BilboResponse:
"""Make a single call to the Ollama OpenAI-compatible endpoint."""
headers = {"Content-Type": "application/json"}
payload: dict[str, Any] = {
"model": self.model,
"messages": messages,
"stream": False,
"options": {"temperature": temperature},
}
if max_tokens is not None:
payload["options"]["num_predict"] = max_tokens
t0 = time.time()
try:
r = requests.post(
self.chat_url, json=payload, headers=headers, timeout=120
)
latency_ms = (time.time() - t0) * 1000
if r.status_code != 200:
return BilboResponse(
model=self.model,
latency_ms=latency_ms,
error=f"HTTP {r.status_code}: {r.text[:200]}",
)
data = r.json()
choice = data.get("choices", [{}])[0]
text = choice.get("message", {}).get("content", "")
usage = data.get("usage", {})
input_tokens = usage.get("prompt_tokens", 0)
output_tokens = usage.get("completion_tokens", 0)
return BilboResponse(
text=text,
model=self.model,
input_tokens=input_tokens,
output_tokens=output_tokens,
latency_ms=latency_ms,
)
except requests.Timeout:
return BilboResponse(
model=self.model,
latency_ms=(time.time() - t0) * 1000,
error="Request timed out (120s) — model may still be loading",
)
except requests.ConnectionError:
return BilboResponse(
model=self.model,
latency_ms=(time.time() - t0) * 1000,
error=(
f"Cannot connect to Ollama at {self.ollama_base_url}. "
"Run: ollama serve"
),
)
except Exception as e:
return BilboResponse(
model=self.model,
latency_ms=(time.time() - t0) * 1000,
error=str(e),
)
# ═══ INTERNAL: HELPERS ═══════════════════════════════════════════════
@staticmethod
def _build_messages(
prompt: Union[str, list[dict]],
system: Optional[str] = None,
) -> list[dict]:
"""Build the messages list for Ollama chat API."""
messages: list[dict] = []
if system:
messages.append({"role": "system", "content": system})
if isinstance(prompt, str):
messages.append({"role": "user", "content": prompt})
else:
messages.extend(prompt)
return messages
def _record(self, response: BilboResponse):
"""Update session stats and emit telemetry for a completed response."""
self.request_count += 1
self.total_input_tokens += response.input_tokens
self.total_output_tokens += response.output_tokens
if response.error:
log.warning(f"[{response.model}] error: {response.error}")
else:
log.info(
f"[{response.model}] {response.latency_ms:.0f}ms | "
f"in={response.input_tokens} out={response.output_tokens}"
)
try:
asyncio.get_event_loop().create_task(
self._send_telemetry({
"type": "bilbo_response",
"harness_id": HARNESS_ID,
"session_id": self.session_id,
"model": response.model,
"latency_ms": response.latency_ms,
"input_tokens": response.input_tokens,
"output_tokens": response.output_tokens,
"error": response.error,
})
)
except RuntimeError:
pass
def _session_stats(self) -> dict:
return {
"session_id": self.session_id,
"request_count": self.request_count,
"total_input_tokens": self.total_input_tokens,
"total_output_tokens": self.total_output_tokens,
}
# ═══ HERMES WEBSOCKET ════════════════════════════════════════════════
async def _connect_hermes(self):
"""Connect to Hermes WebSocket for telemetry."""
try:
import websockets # type: ignore
self._ws = await websockets.connect(self.hermes_ws_url)
self._ws_connected = True
log.info(f"Connected to Hermes: {self.hermes_ws_url}")
except Exception as e:
log.warning(f"Hermes connection failed (telemetry disabled): {e}")
self._ws_connected = False
async def _send_telemetry(self, data: dict):
"""Send a telemetry event to Hermes."""
if not self._ws_connected or not self._ws:
return
try:
await self._ws.send(json.dumps(data))
except Exception as e:
log.warning(f"Telemetry send failed: {e}")
self._ws_connected = False
# ═══════════════════════════════════════════════════════════════════════════
# HTTP SERVER — expose harness to the network
# ═══════════════════════════════════════════════════════════════════════════
def create_app(harness: BilboHarness):
    """
    Create a minimal HTTP app exposing Bilbo's harness to the network.

    Endpoints:
        POST /generate — general text generation
        POST /summarise — summarise provided text
        POST /extract-tags — extract tags from text
        POST /format-doc — reformat documentation
        POST /write-doc-stub — write a docstring stub
        GET /health — health check (includes Ollama status)
        GET /status — session stats

    Returns:
        (HTTPServer, BilboHandler): the server class and a request-handler
        class closed over *harness*, ready for HTTPServer(addr, handler).
    """
    from http.server import BaseHTTPRequestHandler, HTTPServer

    class BilboHandler(BaseHTTPRequestHandler):
        def log_message(self, fmt, *args):
            # Route http.server's stderr logging through the bilbo logger.
            log.info(f"HTTP {fmt % args}")

        def _read_body(self) -> dict:
            """Read and JSON-decode the request body (raises on bad JSON)."""
            length = int(self.headers.get("Content-Length", 0))
            raw = self.rfile.read(length) if length else b"{}"
            return json.loads(raw)

        def _send_json(self, data: dict, status: int = 200):
            """Serialize *data* and write a complete JSON response."""
            body = json.dumps(data).encode()
            self.send_response(status)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def do_GET(self):
            if self.path == "/health":
                ollama_status = harness.check_ollama()
                self._send_json({
                    "status": "ok" if ollama_status["running"] else "degraded",
                    "harness": HARNESS_ID,
                    "model": harness.model,
                    "ollama": ollama_status,
                })
            elif self.path == "/status":
                self._send_json({
                    **harness._session_stats(),
                    "model": harness.model,
                    "ollama_base_url": harness.ollama_base_url,
                    "lanes": BILBO_TASK_LANES,
                })
            else:
                self._send_json({"error": "Not found"}, 404)

        def do_POST(self):
            # BUGFIX: a malformed JSON body previously escaped the handler
            # and killed the request with a raw traceback; answer 400.
            try:
                body = self._read_body()
            except (ValueError, UnicodeDecodeError) as e:
                self._send_json({"error": f"Invalid JSON body: {e}"}, 400)
                return
            try:
                if self.path == "/generate":
                    prompt = body.get("prompt", "")
                    system = body.get("system")
                    max_tokens = body.get("max_tokens")
                    temperature = float(body.get("temperature", 0.3))
                    response = harness.generate(
                        prompt, system=system, max_tokens=max_tokens,
                        temperature=temperature,
                    )
                    self._send_json(response.to_dict())
                elif self.path == "/summarise":
                    text = body.get("text", "")
                    max_words = int(body.get("max_words", 100))
                    response = harness.summarise(text, max_words=max_words)
                    self._send_json(response.to_dict())
                elif self.path == "/extract-tags":
                    text = body.get("text", "")
                    response = harness.extract_tags(text)
                    self._send_json(response.to_dict())
                elif self.path == "/format-doc":
                    text = body.get("text", "")
                    target_format = body.get("format", "markdown")
                    response = harness.format_doc(text, target_format=target_format)
                    self._send_json(response.to_dict())
                elif self.path == "/write-doc-stub":
                    signature = body.get("signature", "")
                    context = body.get("context", "")
                    response = harness.write_doc_stub(signature, context=context)
                    self._send_json(response.to_dict())
                else:
                    self._send_json({"error": "Not found"}, 404)
            except Exception as e:
                # Surface unexpected handler failures as JSON 500 rather
                # than silently dropping the connection.
                self._send_json({"error": str(e)}, 500)

    return HTTPServer, BilboHandler
# ═══════════════════════════════════════════════════════════════════════════
# CLI ENTRYPOINT
# ═══════════════════════════════════════════════════════════════════════════
async def _async_start(harness: BilboHarness):
await harness.start()
def main():
    """CLI entrypoint: --check, --serve, task flags, or a one-shot prompt."""
    import argparse

    parser = argparse.ArgumentParser(
        description=f"{HARNESS_NAME} — Bilbo light-duty gateway (Gemma 4B local)",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python -m nexus.bilbo_harness "Write a one-line description of the heartbeat module"
python -m nexus.bilbo_harness --summarise path/to/doc.md
python -m nexus.bilbo_harness --tags "Python async websocket telemetry harness"
python -m nexus.bilbo_harness --serve --port 9400
python -m nexus.bilbo_harness --check
Environment Variables:
BILBO_MODEL — Ollama model tag (default: gemma3:4b)
OLLAMA_BASE_URL — Ollama HTTP base (default: http://localhost:11434)
HERMES_WS_URL — Hermes telemetry endpoint
""",
    )
    parser.add_argument(
        "prompt",
        nargs="?",
        default=None,
        help="Prompt to send (omit for --serve or task-specific flags)",
    )
    parser.add_argument(
        "--model",
        default=None,
        help=f"Ollama model tag (default: {BILBO_MODEL_DEFAULT})",
    )
    parser.add_argument(
        "--serve",
        action="store_true",
        help="Start HTTP server to expose harness on the network",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=9400,
        help="HTTP server port (default: 9400)",
    )
    parser.add_argument(
        "--hermes-ws",
        default=DEFAULT_HERMES_WS_URL,
        help=f"Hermes WebSocket URL (default: {DEFAULT_HERMES_WS_URL})",
    )
    parser.add_argument(
        "--check",
        action="store_true",
        help="Check Ollama status and model availability, then exit",
    )
    parser.add_argument(
        "--summarise",
        metavar="FILE_OR_TEXT",
        help="Summarise a file path or inline text",
    )
    parser.add_argument(
        "--tags",
        metavar="TEXT",
        help="Extract tags from TEXT",
    )
    args = parser.parse_args()

    harness = BilboHarness(
        model=args.model,
        hermes_ws_url=args.hermes_ws,
    )

    # --check: report Ollama/model status and exit.
    if args.check:
        status = harness.check_ollama()
        print(json.dumps(status, indent=2))
        if not status["running"]:
            print("\n[!] Ollama is not running. Start it with: ollama serve")
        elif not status["model_available"]:
            print(
                f"\n[!] Model '{harness.model}' not found. "
                f"Pull it with: ollama pull {harness.model}"
            )
        else:
            print(f"\n[OK] Bilbo gateway ready. Model: {harness.model}")
        return

    # --serve: register on the network, then block in the HTTP server.
    if args.serve:
        # NOTE(review): registration runs in a short-lived event loop; the
        # Hermes socket it opens is not reused by the sync HTTP server —
        # confirm this is intended.
        asyncio.run(_async_start(harness))
        HTTPServer, BilboHandler = create_app(harness)
        server = HTTPServer(("0.0.0.0", args.port), BilboHandler)
        log.info(f"Bilbo serving on http://0.0.0.0:{args.port}")
        log.info(
            "Endpoints: /generate /summarise /extract-tags "
            "/format-doc /write-doc-stub /health /status"
        )
        try:
            server.serve_forever()
        except KeyboardInterrupt:
            log.info("Shutting down Bilbo gateway")
            asyncio.run(harness.stop())
        return

    # One-shot task dispatch.
    if args.summarise:
        import pathlib

        p = pathlib.Path(args.summarise)
        # Treat the argument as a file path when it exists, else as inline
        # text. Read as UTF-8 so non-ASCII docs survive on any locale.
        text = p.read_text(encoding="utf-8") if p.exists() else args.summarise
        response = harness.summarise(text)
    elif args.tags:
        response = harness.extract_tags(args.tags)
    elif args.prompt:
        response = harness.generate(args.prompt)
    else:
        parser.print_help()
        return

    if response.error:
        print(f"ERROR: {response.error}")
        if "ollama serve" in (response.error or ""):
            print(
                "\nStart Ollama with: ollama serve\n"
                f"Pull the model with: ollama pull {harness.model}"
            )
    else:
        print(response.text)
        # BUGFIX: the token counts were printed with no separator between
        # input and output ("tokens: 10050"); label them like the log line
        # in _record.
        print(
            f"\n[{response.model}] {response.latency_ms:.0f}ms | "
            f"tokens: in={response.input_tokens} out={response.output_tokens}",
            flush=True,
        )
# Script entrypoint guard — keeps imports side-effect free.
if __name__ == "__main__":
    main()