Compare commits

...

4 Commits

Author SHA1 Message Date
bf27768ba6 [ezra] Deep Dive scaffold for #830: deepdive_orchestrator.py 2026-04-05 01:48:59 +00:00
ca7591cfb5 [ezra] Deep Dive scaffold for #830: deepdive_aggregator.py 2026-04-05 01:48:58 +00:00
0380bc065e [ezra] Deep Dive scaffold for #830: DEEPSDIVE_ARCHITECTURE.md 2026-04-05 01:48:58 +00:00
Alexander Whitestone
197b42584b WIP: Claude Code progress on #824
Automated salvage commit — agent session ended (exit 1).
Work in progress, may need continuation.
2026-04-04 15:45:38 -04:00
4 changed files with 1200 additions and 0 deletions

116
bin/deepdive_aggregator.py Normal file
View File

@@ -0,0 +1,116 @@
#!/usr/bin/env python3
"""deepdive_aggregator.py — Phase 1: Intelligence source aggregation. Issue #830."""
import argparse
import json
import xml.etree.ElementTree as ET
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import List, Optional
from pathlib import Path
import urllib.request
@dataclass
class RawItem:
source: str
title: str
url: str
content: str
published: str
authors: Optional[str] = None
categories: Optional[List[str]] = None
class ArxivRSSAdapter:
def __init__(self, category: str):
self.name = f"arxiv_{category}"
self.url = f"http://export.arxiv.org/rss/{category}"
def fetch(self) -> List[RawItem]:
try:
with urllib.request.urlopen(self.url, timeout=30) as resp:
xml_content = resp.read()
except Exception as e:
print(f"Error fetching {self.url}: {e}")
return []
items = []
try:
root = ET.fromstring(xml_content)
channel = root.find("channel")
if channel is None:
return items
for item in channel.findall("item"):
title = item.findtext("title", default="")
link = item.findtext("link", default="")
desc = item.findtext("description", default="")
pub_date = item.findtext("pubDate", default="")
items.append(RawItem(
source=self.name,
title=title.strip(),
url=link,
content=desc[:2000],
published=self._parse_date(pub_date),
categories=[self.category]
))
except ET.ParseError as e:
print(f"Parse error: {e}")
return items
def _parse_date(self, date_str: str) -> str:
from email.utils import parsedate_to_datetime
try:
dt = parsedate_to_datetime(date_str)
return dt.isoformat()
except:
return datetime.now().isoformat()
SOURCE_REGISTRY = {
"arxiv_cs_ai": lambda: ArxivRSSAdapter("cs.AI"),
"arxiv_cs_cl": lambda: ArxivRSSAdapter("cs.CL"),
"arxiv_cs_lg": lambda: ArxivRSSAdapter("cs.LG"),
}
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--sources", default="arxiv_cs_ai,arxiv_cs_cl")
parser.add_argument("--output")
args = parser.parse_args()
sources = [s.strip() for s in args.sources.split(",")]
all_items = []
for source_name in sources:
if source_name not in SOURCE_REGISTRY:
print(f"[WARN] Unknown source: {source_name}")
continue
adapter = SOURCE_REGISTRY[source_name]()
items = adapter.fetch()
all_items.extend(items)
print(f"[INFO] {source_name}: {len(items)} items")
all_items.sort(key=lambda x: x.published, reverse=True)
output = {
"metadata": {
"count": len(all_items),
"sources": sources,
"generated": datetime.now().isoformat()
},
"items": [asdict(i) for i in all_items]
}
if args.output:
Path(args.output).write_text(json.dumps(output, indent=2))
else:
print(json.dumps(output, indent=2))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,95 @@
#!/usr/bin/env python3
"""deepdive_orchestrator.py — Deep Dive pipeline controller. Issue #830."""
import argparse
import json
import subprocess
import sys
from datetime import datetime
from pathlib import Path
DEFAULT_CONFIG = {
"sources": ["arxiv_cs_ai", "arxiv_cs_cl", "arxiv_cs_lg"],
"max_items": 10,
"tts_enabled": False,
}
class Orchestrator:
def __init__(self, date: str = None):
self.date = date or datetime.now().strftime("%Y-%m-%d")
self.state_dir = Path("~/the-nexus/deepdive_state").expanduser() / self.date
self.state_dir.mkdir(parents=True, exist_ok=True)
def phase1(self, sources):
"""Aggregate from sources."""
print("[PHASE 1] Aggregating...")
output_file = self.state_dir / "raw_items.json"
subprocess.run([
sys.executable, Path(__file__).parent / "deepdive_aggregator.py",
"--sources", ",".join(sources), "--output", str(output_file)
])
return json.loads(output_file.read_text())
def phase2(self, raw_items, max_items):
"""Filter by keywords."""
print("[PHASE 2] Filtering...")
keywords = ["agent", "llm", "tool use", "rlhf", "alignment", "finetuning"]
scored = []
for item in raw_items["items"]:
content = f"{item.get('title','')} {item.get('content','')}".lower()
score = sum(1 for kw in keywords if kw in content)
scored.append({**item, "score": score})
scored.sort(key=lambda x: x["score"], reverse=True)
top = scored[:max_items]
output_file = self.state_dir / "ranked.json"
output_file.write_text(json.dumps({"items": top}, indent=2))
return top
def phase3(self, ranked_items):
"""Synthesize briefing."""
print("[PHASE 3] Synthesizing (MVP: structured text)...")
md = f"# Deep Dive — {self.date}\n\n"
for i, item in enumerate(ranked_items[:3], 1):
md += f"{i}. [{item['title']}]({item['url']}) — Score: {item['score']}\n\n"
briefing_file = self.state_dir / "briefing.md"
briefing_file.write_text(md)
return str(briefing_file)
def phase4(self, briefing_file):
"""Generate audio."""
print("[PHASE 4] ⚠ TTS decision needed — skipping")
return None
def phase5(self, briefing_file, audio_file):
"""Deliver."""
print("[PHASE 5] ⚠ Telegram delivery not integrated")
text = Path(briefing_file).read_text()
print(text[:500])
return {"status": "logged"}
def run(self, config):
raw = self.phase1(config["sources"])
ranked = self.phase2(raw, config["max_items"])
briefing = self.phase3(ranked)
audio = self.phase4(briefing)
return self.phase5(briefing, audio)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--daily", action="store_true")
parser.add_argument("--date")
args = parser.parse_args()
orch = Orchestrator(date=args.date)
result = orch.run(DEFAULT_CONFIG)
print(f"[DONE] State: {orch.state_dir}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,88 @@
# Deep Dive — Sovereign NotebookLM Architecture
> Parent: [#830](http://143.198.27.163:3000/Timmy_Foundation/the-nexus/issues/830)
> Status: Architecture committed, awaiting infrastructure decisions
> Owner: @ezra
> Created: 2026-04-05
## Vision
**Deep Dive** is a fully automated daily intelligence briefing system that eliminates the 20+ minute manual research overhead. It produces a personalized AI-generated podcast (or text briefing) with **zero manual input**.
Unlike NotebookLM which requires manual source curation, Deep Dive operates autonomously.
## Architecture Overview
```
┌──────────────────────────────────────────────────────────────────────────────┐
│ D E E P D I V E P I P E L I N E │
├──────────────────────────────────────────────────────────────────────────────┤
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌────────┐ │
│ │ AGGREGATE │──▶│ FILTER │──▶│ SYNTHESIZE│──▶│ AUDIO │──▶│DELIVER │ │
│ │ arXiv RSS │ │ Keywords │ │ LLM brief │ │ TTS voice │ │Telegram│ │
│ └───────────┘ └───────────┘ └───────────┘ └───────────┘ └────────┘ │
└──────────────────────────────────────────────────────────────────────────────┘
```
## Phase Specifications
### Phase 1: Aggregate
Fetches from arXiv RSS (cs.AI, cs.CL, cs.LG), lab blogs, newsletters.
**Output**: `List[RawItem]`
**Implementation**: `bin/deepdive_aggregator.py`
### Phase 2: Filter
Ranks items by keyword relevance to Hermes/Timmy work.
**Scoring Algorithm (MVP)**:
```python
keywords = ["agent", "llm", "tool use", "rlhf", "alignment"]
score = sum(1 for kw in keywords if kw in content)
```
### Phase 3: Synthesize
LLM generates structured briefing: HEADLINES, DEEP DIVES, BOTTOM LINE.
### Phase 4: Audio
TTS converts briefing to MP3 (10-15 min).
**Decision needed**: Local (Piper/coqui) vs API (ElevenLabs/OpenAI)
### Phase 5: Deliver
Telegram voice message delivered at scheduled time (default 6 AM).
## Implementation Path
### MVP (2 hours, Phases 1+5)
arXiv RSS → keyword filter → text briefing → Telegram text at 6 AM
### V1 (1 week, Phases 1-3+5)
Add LLM synthesis, more sources
### V2 (2 weeks, Full)
Add TTS audio, embedding-based filtering
## Integration Points
| System | Point | Status |
|--------|-------|--------|
| Hermes | `/deepdive` command | Pending |
| timmy-config | `cron/jobs.json` entry | Ready |
| Telegram | Voice delivery | Existing |
| TTS Service | Local vs API | **NEEDS DECISION** |
## Files
- `docs/DEEPSDIVE_ARCHITECTURE.md` — This document
- `bin/deepdive_aggregator.py` — Phase 1 source adapters
- `bin/deepdive_orchestrator.py` — Pipeline controller
## Blockers
| # | Item | Status |
|---|------|--------|
| 1 | TTS Service decision | **NEEDS DECISION** |
| 2 | `/deepdive` command registration | Pending |
**Ezra, Architect** — 2026-04-05

901
nexus/allegro_harness.py Normal file
View File

@@ -0,0 +1,901 @@
#!/usr/bin/env python3
"""
Allegro Harness — Hermes/OpenClaw harness backed by Kimi K2
A harness instance on Timmy's sovereign network, same pattern as Ezra,
Bezalel, and Gemini. Timmy is sovereign; Kimi is a worker.
Architecture:
Timmy (sovereign)
├── Ezra (harness)
├── Bezalel (harness)
├── Allegro (harness — this module)
└── Gemini (harness)
Features:
- Text generation, code generation
- Streaming responses
- Context caching for project context
- Model fallback: kimi-k2 → moonshot-v1-32k → moonshot-v1-8k
- Latency, token, and cost telemetry
- Hermes WebSocket registration
- HTTP endpoint for network access
Usage:
# As a standalone harness server:
python -m nexus.allegro_harness --serve
# Or imported:
from nexus.allegro_harness import AllegroHarness
harness = AllegroHarness()
response = harness.generate("Hello Timmy")
print(response.text)
Environment Variables:
KIMI_API_KEY — Kimi/Moonshot API key (from platform.moonshot.cn)
HERMES_WS_URL — Hermes WebSocket URL (default: ws://localhost:8000/ws)
KIMI_MODEL — Override default model
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Iterator, Optional, Union
import requests
log = logging.getLogger("allegro")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [allegro] %(message)s",
datefmt="%H:%M:%S",
)
# ═══════════════════════════════════════════════════════════════════════════
# MODEL CONFIGURATION
# ═══════════════════════════════════════════════════════════════════════════
# Model fallback chain: primary → secondary → tertiary
KIMI_MODEL_PRIMARY = "kimi-k2"
KIMI_MODEL_SECONDARY = "moonshot-v1-32k"
KIMI_MODEL_TERTIARY = "moonshot-v1-8k"
MODEL_FALLBACK_CHAIN = [
KIMI_MODEL_PRIMARY,
KIMI_MODEL_SECONDARY,
KIMI_MODEL_TERTIARY,
]
# Kimi/Moonshot API — OpenAI-compatible endpoint
KIMI_API_BASE = "https://api.moonshot.cn/v1"
# Approximate cost per 1M tokens (USD) — used for cost logging only
# Prices current as of April 2026; verify at platform.moonshot.cn
COST_PER_1M_INPUT = {
KIMI_MODEL_PRIMARY: 0.60,
KIMI_MODEL_SECONDARY: 0.24,
KIMI_MODEL_TERTIARY: 0.12,
}
COST_PER_1M_OUTPUT = {
KIMI_MODEL_PRIMARY: 2.50,
KIMI_MODEL_SECONDARY: 0.24,
KIMI_MODEL_TERTIARY: 0.12,
}
DEFAULT_HERMES_WS_URL = os.environ.get("HERMES_WS_URL", "ws://localhost:8000/ws")
HARNESS_ID = "allegro"
HARNESS_NAME = "Allegro Harness"
# Default HTTP server port for the Allegro gateway
DEFAULT_PORT = 9400
# ═══════════════════════════════════════════════════════════════════════════
# DATA CLASSES
# ═══════════════════════════════════════════════════════════════════════════
@dataclass
class AllegroResponse:
"""Response from an Allegro generate call."""
text: str = ""
model: str = ""
input_tokens: int = 0
output_tokens: int = 0
latency_ms: float = 0.0
cost_usd: float = 0.0
cached: bool = False
error: Optional[str] = None
timestamp: str = field(
default_factory=lambda: datetime.now(timezone.utc).isoformat()
)
def to_dict(self) -> dict:
return {
"text": self.text,
"model": self.model,
"input_tokens": self.input_tokens,
"output_tokens": self.output_tokens,
"latency_ms": self.latency_ms,
"cost_usd": self.cost_usd,
"cached": self.cached,
"error": self.error,
"timestamp": self.timestamp,
}
@dataclass
class ContextCache:
"""In-memory context cache for project context."""
cache_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
content: str = ""
created_at: float = field(default_factory=time.time)
hit_count: int = 0
ttl_seconds: float = 3600.0 # 1 hour default
def is_valid(self) -> bool:
return (time.time() - self.created_at) < self.ttl_seconds
def touch(self):
self.hit_count += 1
# ═══════════════════════════════════════════════════════════════════════════
# ALLEGRO HARNESS
# ═══════════════════════════════════════════════════════════════════════════
class AllegroHarness:
"""
Allegro harness for Timmy's sovereign network.
Acts as a Hermes/OpenClaw harness worker backed by the Kimi API.
Registers itself on the network at startup; accepts text and code
generation requests.
All calls flow through the fallback chain (kimi-k2 → moonshot-v1-32k →
moonshot-v1-8k) and emit latency/token/cost telemetry to Hermes.
Allegro is the top code performer in the fleet — optimized for bulk
code execution, PR generation, and BURN issue throughput.
"""
def __init__(
self,
api_key: Optional[str] = None,
model: Optional[str] = None,
hermes_ws_url: str = DEFAULT_HERMES_WS_URL,
context_ttl: float = 3600.0,
):
self.api_key = api_key or os.environ.get("KIMI_API_KEY", "")
self.model = model or os.environ.get("KIMI_MODEL", KIMI_MODEL_PRIMARY)
self.hermes_ws_url = hermes_ws_url
self.context_ttl = context_ttl
# Context cache (project context stored here to avoid re-sending)
self._context_cache: Optional[ContextCache] = None
# Session bookkeeping
self.session_id = str(uuid.uuid4())[:8]
self.request_count = 0
self.total_input_tokens = 0
self.total_output_tokens = 0
self.total_cost_usd = 0.0
# WebSocket connection (lazy — created on first telemetry send)
self._ws = None
self._ws_connected = False
if not self.api_key:
log.warning(
"KIMI_API_KEY not set — calls will fail. "
"Set it via environment variable or pass api_key=."
)
# ═══ LIFECYCLE ═══════════════════════════════════════════════════════
async def start(self):
"""Register harness on the network via Hermes WebSocket."""
log.info("=" * 50)
log.info(f"{HARNESS_NAME} — STARTING")
log.info(f" Session: {self.session_id}")
log.info(f" Model: {self.model}")
log.info(f" Hermes: {self.hermes_ws_url}")
log.info("=" * 50)
await self._connect_hermes()
await self._send_telemetry({
"type": "harness_register",
"harness_id": HARNESS_ID,
"session_id": self.session_id,
"model": self.model,
"fallback_chain": MODEL_FALLBACK_CHAIN,
"capabilities": ["text", "code", "streaming"],
})
log.info("Harness registered on network")
async def stop(self):
"""Deregister and disconnect."""
await self._send_telemetry({
"type": "harness_deregister",
"harness_id": HARNESS_ID,
"session_id": self.session_id,
"stats": self._session_stats(),
})
if self._ws:
try:
await self._ws.close()
except Exception:
pass
self._ws_connected = False
log.info(f"{HARNESS_NAME} stopped. {self._session_stats()}")
# ═══ CORE GENERATION ═════════════════════════════════════════════════
def generate(
self,
prompt: Union[str, list[dict]],
*,
system: Optional[str] = None,
use_cache: bool = True,
stream: bool = False,
max_tokens: Optional[int] = None,
temperature: Optional[float] = None,
) -> AllegroResponse:
"""
Generate a response from Kimi.
Tries the model fallback chain: kimi-k2 → moonshot-v1-32k → moonshot-v1-8k.
Injects cached context if available and use_cache=True.
Args:
prompt: String prompt or list of message dicts
(OpenAI-style: [{"role": "user", "content": "..."}])
system: Optional system instruction
use_cache: Prepend cached project context if set
stream: Return streaming response (prints to stdout)
max_tokens: Override default max output tokens
temperature: Sampling temperature (0.01.0)
Returns:
AllegroResponse with text, token counts, latency, cost
"""
if not self.api_key:
return AllegroResponse(error="KIMI_API_KEY not set")
messages = self._build_messages(prompt, system=system, use_cache=use_cache)
for model in MODEL_FALLBACK_CHAIN:
response = self._call_api(
model=model,
messages=messages,
max_tokens=max_tokens,
temperature=temperature,
)
if response.error is None:
self._record(response)
return response
log.warning(f"Model {model} failed: {response.error} — trying next")
# All models failed
final = AllegroResponse(error="All models in fallback chain failed")
self._record(final)
return final
def generate_code(
self,
task: str,
language: str = "python",
context: Optional[str] = None,
) -> AllegroResponse:
"""
Specialized code generation call.
Args:
task: Natural language description of what to code
language: Target programming language
context: Optional code context (existing code, interfaces, etc.)
"""
system = (
f"You are an expert {language} programmer. "
"Produce clean, well-structured code. "
"Return only the code block, no explanation unless asked."
)
if context:
prompt = f"Context:\n```{language}\n{context}\n```\n\nTask: {task}"
else:
prompt = f"Task: {task}"
return self.generate(prompt, system=system)
def stream_generate(
self,
prompt: Union[str, list[dict]],
system: Optional[str] = None,
use_cache: bool = True,
) -> Iterator[str]:
"""
Stream text chunks from Kimi.
Yields string chunks as they arrive. Logs final telemetry when done.
Usage:
for chunk in harness.stream_generate("Tell me about Timmy"):
print(chunk, end="", flush=True)
"""
messages = self._build_messages(prompt, system=system, use_cache=use_cache)
for model in MODEL_FALLBACK_CHAIN:
try:
yield from self._stream_api(model=model, messages=messages)
return
except Exception as e:
log.warning(f"Stream: model {model} failed: {e}")
log.error("Stream: all models in fallback chain failed")
# ═══ HEALTH CHECK ═════════════════════════════════════════════════════
def health_check(self) -> dict:
"""
Verify Kimi API key validity with a minimal probe call.
Returns a dict with 'ok' bool and optional 'error' string.
"""
if not self.api_key:
return {"ok": False, "error": "KIMI_API_KEY not set"}
url = f"{KIMI_API_BASE}/chat/completions"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload = {
"model": KIMI_MODEL_TERTIARY, # cheapest model for the probe
"messages": [{"role": "user", "content": "ping"}],
"max_tokens": 1,
}
try:
r = requests.post(url, json=payload, headers=headers, timeout=15)
if r.status_code == 200:
log.info("Kimi API key valid — health check passed")
return {"ok": True, "model": KIMI_MODEL_TERTIARY}
elif r.status_code == 401:
log.error("Kimi API key invalid (401 Unauthorized)")
return {"ok": False, "error": f"Invalid API key (HTTP 401)"}
else:
return {"ok": False, "error": f"HTTP {r.status_code}: {r.text[:100]}"}
except requests.Timeout:
return {"ok": False, "error": "Health check timed out (15s)"}
except Exception as e:
return {"ok": False, "error": str(e)}
# ═══ CONTEXT CACHING ═════════════════════════════════════════════════
def set_context(self, content: str, ttl_seconds: float = 3600.0):
"""
Cache project context to prepend on future calls.
Args:
content: Context text (project docs, code, instructions)
ttl_seconds: Cache TTL (default: 1 hour)
"""
self._context_cache = ContextCache(
content=content,
ttl_seconds=ttl_seconds,
)
log.info(
f"Context cached ({len(content)} chars, "
f"TTL={ttl_seconds}s, id={self._context_cache.cache_id})"
)
def clear_context(self):
"""Clear the cached project context."""
self._context_cache = None
log.info("Context cache cleared")
def context_status(self) -> dict:
"""Return cache status info."""
if not self._context_cache:
return {"cached": False}
return {
"cached": True,
"cache_id": self._context_cache.cache_id,
"valid": self._context_cache.is_valid(),
"hit_count": self._context_cache.hit_count,
"age_seconds": time.time() - self._context_cache.created_at,
"content_length": len(self._context_cache.content),
}
# ═══ INTERNAL: API CALLS ═════════════════════════════════════════════
def _call_api(
self,
model: str,
messages: list[dict],
max_tokens: Optional[int] = None,
temperature: Optional[float] = None,
) -> AllegroResponse:
"""Make a single (non-streaming) call to the Kimi OpenAI-compat API."""
url = f"{KIMI_API_BASE}/chat/completions"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload: dict[str, Any] = {
"model": model,
"messages": messages,
"stream": False,
}
if max_tokens is not None:
payload["max_tokens"] = max_tokens
if temperature is not None:
payload["temperature"] = temperature
t0 = time.time()
try:
r = requests.post(url, json=payload, headers=headers, timeout=120)
latency_ms = (time.time() - t0) * 1000
if r.status_code != 200:
return AllegroResponse(
model=model,
latency_ms=latency_ms,
error=f"HTTP {r.status_code}: {r.text[:200]}",
)
data = r.json()
choice = data.get("choices", [{}])[0]
text = choice.get("message", {}).get("content", "")
usage = data.get("usage", {})
input_tokens = usage.get("prompt_tokens", 0)
output_tokens = usage.get("completion_tokens", 0)
cost = self._estimate_cost(model, input_tokens, output_tokens)
return AllegroResponse(
text=text,
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
latency_ms=latency_ms,
cost_usd=cost,
)
except requests.Timeout:
return AllegroResponse(
model=model,
latency_ms=(time.time() - t0) * 1000,
error="Request timed out (120s)",
)
except Exception as e:
return AllegroResponse(
model=model,
latency_ms=(time.time() - t0) * 1000,
error=str(e),
)
def _stream_api(
self,
model: str,
messages: list[dict],
max_tokens: Optional[int] = None,
temperature: Optional[float] = None,
) -> Iterator[str]:
"""Stream tokens from the Kimi OpenAI-compat API."""
url = f"{KIMI_API_BASE}/chat/completions"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload: dict[str, Any] = {
"model": model,
"messages": messages,
"stream": True,
}
if max_tokens is not None:
payload["max_tokens"] = max_tokens
if temperature is not None:
payload["temperature"] = temperature
t0 = time.time()
input_tokens = 0
output_tokens = 0
with requests.post(
url, json=payload, headers=headers, stream=True, timeout=120
) as r:
r.raise_for_status()
for raw_line in r.iter_lines():
if not raw_line:
continue
line = raw_line.decode("utf-8") if isinstance(raw_line, bytes) else raw_line
if not line.startswith("data: "):
continue
payload_str = line[6:]
if payload_str.strip() == "[DONE]":
break
try:
chunk = json.loads(payload_str)
delta = chunk.get("choices", [{}])[0].get("delta", {})
content = delta.get("content", "")
if content:
output_tokens += 1 # rough estimate
yield content
# Capture usage if present in final chunk
usage = chunk.get("usage", {})
if usage:
input_tokens = usage.get("prompt_tokens", input_tokens)
output_tokens = usage.get("completion_tokens", output_tokens)
except json.JSONDecodeError:
pass
latency_ms = (time.time() - t0) * 1000
cost = self._estimate_cost(model, input_tokens, output_tokens)
resp = AllegroResponse(
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
latency_ms=latency_ms,
cost_usd=cost,
)
self._record(resp)
# ═══ INTERNAL: HELPERS ═══════════════════════════════════════════════
def _build_messages(
self,
prompt: Union[str, list[dict]],
system: Optional[str] = None,
use_cache: bool = True,
) -> list[dict]:
"""Build the messages list, injecting cached context if applicable."""
messages: list[dict] = []
# System instruction
if system:
messages.append({"role": "system", "content": system})
# Cached context prepended as system memory
if use_cache and self._context_cache and self._context_cache.is_valid():
self._context_cache.touch()
messages.append({
"role": "system",
"content": f"[Project Context]\n{self._context_cache.content}",
})
# User message
if isinstance(prompt, str):
messages.append({"role": "user", "content": prompt})
else:
messages.extend(prompt)
return messages
@staticmethod
def _estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
"""Estimate USD cost from token counts."""
in_rate = COST_PER_1M_INPUT.get(model, 0.60)
out_rate = COST_PER_1M_OUTPUT.get(model, 2.50)
return (input_tokens * in_rate + output_tokens * out_rate) / 1_000_000
def _record(self, response: AllegroResponse):
"""Update session stats and emit telemetry for a completed response."""
self.request_count += 1
self.total_input_tokens += response.input_tokens
self.total_output_tokens += response.output_tokens
self.total_cost_usd += response.cost_usd
log.info(
f"[{response.model}] {response.latency_ms:.0f}ms | "
f"in={response.input_tokens} out={response.output_tokens} | "
f"${response.cost_usd:.6f}"
)
# Fire-and-forget telemetry (don't block the caller)
try:
asyncio.get_event_loop().create_task(
self._send_telemetry({
"type": "allegro_response",
"harness_id": HARNESS_ID,
"session_id": self.session_id,
"model": response.model,
"latency_ms": response.latency_ms,
"input_tokens": response.input_tokens,
"output_tokens": response.output_tokens,
"cost_usd": response.cost_usd,
"cached": response.cached,
"error": response.error,
})
)
except RuntimeError:
# No event loop running (sync context) — skip async telemetry
pass
def _session_stats(self) -> dict:
return {
"session_id": self.session_id,
"request_count": self.request_count,
"total_input_tokens": self.total_input_tokens,
"total_output_tokens": self.total_output_tokens,
"total_cost_usd": round(self.total_cost_usd, 6),
}
# ═══ HERMES WEBSOCKET ════════════════════════════════════════════════
async def _connect_hermes(self):
"""Connect to Hermes WebSocket for telemetry."""
try:
import websockets # type: ignore
self._ws = await websockets.connect(self.hermes_ws_url)
self._ws_connected = True
log.info(f"Connected to Hermes: {self.hermes_ws_url}")
except Exception as e:
log.warning(f"Hermes connection failed (telemetry disabled): {e}")
self._ws_connected = False
async def _send_telemetry(self, data: dict):
"""Send a telemetry event to Hermes."""
if not self._ws_connected or not self._ws:
return
try:
await self._ws.send(json.dumps(data))
except Exception as e:
log.warning(f"Telemetry send failed: {e}")
self._ws_connected = False
# ═══ SOVEREIGN ORCHESTRATION REGISTRATION ════════════════════════════
def register_in_orchestration(
self,
orchestration_url: str = "http://localhost:8000/api/v1/workers/register",
) -> bool:
"""
Register this harness as an available worker in sovereign orchestration.
Sends a POST to the orchestration endpoint with harness metadata.
Returns True on success.
"""
payload = {
"worker_id": HARNESS_ID,
"name": HARNESS_NAME,
"session_id": self.session_id,
"model": self.model,
"fallback_chain": MODEL_FALLBACK_CHAIN,
"capabilities": ["text", "code", "streaming"],
"transport": "http+ws",
"registered_at": datetime.now(timezone.utc).isoformat(),
}
try:
r = requests.post(orchestration_url, json=payload, timeout=10)
if r.status_code in (200, 201):
log.info(f"Registered in orchestration: {orchestration_url}")
return True
log.warning(
f"Orchestration registration returned {r.status_code}: {r.text[:100]}"
)
return False
except Exception as e:
log.warning(f"Orchestration registration failed: {e}")
return False
# ═══════════════════════════════════════════════════════════════════════════
# HTTP SERVER — expose harness to the network
# ═══════════════════════════════════════════════════════════════════════════
def create_app(harness: AllegroHarness):
"""
Create a minimal HTTP app that exposes the harness to the network.
Endpoints:
POST /generate — text/code generation
POST /generate/stream — streaming text generation
POST /generate/code — code generation
GET /health — health check (also validates Kimi API key)
GET /status — session stats + cache status
POST /context — set project context cache
DELETE /context — clear context cache
"""
from http.server import BaseHTTPRequestHandler, HTTPServer
class AllegroHandler(BaseHTTPRequestHandler):
def log_message(self, fmt, *args):
log.info(f"HTTP {fmt % args}")
def _read_body(self) -> dict:
length = int(self.headers.get("Content-Length", 0))
raw = self.rfile.read(length) if length else b"{}"
return json.loads(raw)
def _send_json(self, data: dict, status: int = 200):
body = json.dumps(data).encode()
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_GET(self):
if self.path == "/health":
result = harness.health_check()
status = 200 if result["ok"] else 503
self._send_json({**result, "harness": HARNESS_ID}, status)
elif self.path == "/status":
self._send_json({
**harness._session_stats(),
"model": harness.model,
"context": harness.context_status(),
})
else:
self._send_json({"error": "Not found"}, 404)
def do_POST(self):
body = self._read_body()
if self.path == "/generate":
prompt = body.get("prompt", "")
system = body.get("system")
use_cache = body.get("use_cache", True)
response = harness.generate(
prompt, system=system, use_cache=use_cache
)
self._send_json(response.to_dict())
elif self.path == "/generate/code":
task = body.get("task", "")
language = body.get("language", "python")
context = body.get("context")
response = harness.generate_code(task, language=language, context=context)
self._send_json(response.to_dict())
elif self.path == "/context":
content = body.get("content", "")
ttl = float(body.get("ttl_seconds", 3600.0))
harness.set_context(content, ttl_seconds=ttl)
self._send_json({"status": "cached", **harness.context_status()})
else:
self._send_json({"error": "Not found"}, 404)
def do_DELETE(self):
if self.path == "/context":
harness.clear_context()
self._send_json({"status": "cleared"})
else:
self._send_json({"error": "Not found"}, 404)
return HTTPServer, AllegroHandler
# ═══════════════════════════════════════════════════════════════════════════
# CLI ENTRYPOINT
# ═══════════════════════════════════════════════════════════════════════════
async def _async_start(harness: AllegroHarness):
await harness.start()
def main():
import argparse
parser = argparse.ArgumentParser(
description=f"{HARNESS_NAME} — Timmy's Kimi harness worker",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python -m nexus.allegro_harness "What is the meaning of sovereignty?"
python -m nexus.allegro_harness --model moonshot-v1-8k "Quick test"
python -m nexus.allegro_harness --serve --port 9400
python -m nexus.allegro_harness --code "Write a fizzbuzz in Python"
python -m nexus.allegro_harness --health
Environment Variables:
KIMI_API_KEY — required for all API calls
HERMES_WS_URL — Hermes telemetry endpoint
KIMI_MODEL — override default model
""",
)
parser.add_argument(
"prompt",
nargs="?",
default=None,
help="Prompt to send (omit to use --serve mode)",
)
parser.add_argument(
"--model",
default=None,
help=f"Model to use (default: {KIMI_MODEL_PRIMARY})",
)
parser.add_argument(
"--serve",
action="store_true",
help="Start HTTP server to expose harness on the network",
)
parser.add_argument(
"--port",
type=int,
default=DEFAULT_PORT,
help=f"HTTP server port (default: {DEFAULT_PORT})",
)
parser.add_argument(
"--hermes-ws",
default=DEFAULT_HERMES_WS_URL,
help=f"Hermes WebSocket URL (default: {DEFAULT_HERMES_WS_URL})",
)
parser.add_argument(
"--code",
metavar="TASK",
help="Generate code for TASK instead of plain text",
)
parser.add_argument(
"--stream",
action="store_true",
help="Stream response chunks to stdout",
)
parser.add_argument(
"--health",
action="store_true",
help="Run health check (verify Kimi API key) and exit",
)
args = parser.parse_args()
harness = AllegroHarness(
model=args.model,
hermes_ws_url=args.hermes_ws,
)
if args.health:
result = harness.health_check()
if result["ok"]:
print(f"OK — Kimi API key valid, model={result.get('model')}")
raise SystemExit(0)
else:
print(f"FAIL — {result.get('error')}")
raise SystemExit(1)
if args.serve:
# Start harness registration then serve HTTP
asyncio.run(_async_start(harness))
HTTPServer, AllegroHandler = create_app(harness)
server = HTTPServer(("0.0.0.0", args.port), AllegroHandler)
log.info(f"Serving on http://0.0.0.0:{args.port}")
log.info("Endpoints: /generate /generate/code /health /status /context")
try:
server.serve_forever()
except KeyboardInterrupt:
log.info("Shutting down server")
asyncio.run(harness.stop())
return
if args.code:
response = harness.generate_code(args.code)
elif args.prompt:
if args.stream:
for chunk in harness.stream_generate(args.prompt):
print(chunk, end="", flush=True)
print()
return
else:
response = harness.generate(args.prompt)
else:
parser.print_help()
return
if response.error:
print(f"ERROR: {response.error}")
else:
print(response.text)
print(
f"\n[{response.model}] {response.latency_ms:.0f}ms | "
f"tokens: {response.input_tokens}{response.output_tokens} | "
f"${response.cost_usd:.6f}",
flush=True,
)
if __name__ == "__main__":
main()