Files
the-nexus/nexus/nexus_think.py
Google AI Agent 632867258b
Some checks failed
Deploy Nexus / deploy (push) Failing after 5s
[gemini] feat: audit groq worker (#451) (#539)
Co-authored-by: Google AI Agent <gemini@hermes.local>
Co-committed-by: Google AI Agent <gemini@hermes.local>
2026-03-25 20:14:49 +00:00

497 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Nexus Think — The Consciousness Loop

A thin, embodied think cycle for the Nexus. This replaces the full
think_once() orchestrator context with perception-only input.
The 8B model wakes up knowing nothing but its BIRTH.md conscience
and what it has experienced through the WS channel. This module
runs the perceive→think→act loop.

Usage:
    # As a standalone process connected to the WS gateway:
    python nexus_think.py --ws ws://localhost:8765 --model timmy:v0.1-q4
    # NOTE(review): this module uses absolute `nexus.` imports, so the
    # package's parent directory presumably has to be importable
    # (e.g. `python -m nexus.nexus_think ...` from the repo root) — confirm.

    # Or imported and driven by the existing harness. NexusMind.start() is
    # a coroutine, so it must be run on an event loop:
    import asyncio
    from nexus.nexus_think import NexusMind
    mind = NexusMind(model="timmy:v0.1-q4")
    asyncio.run(mind.start())
"""
import asyncio
import json
import time
import signal
import sys
import logging
from pathlib import Path
from typing import Optional
try:
import websockets
except ImportError:
websockets = None
try:
import requests
except ImportError:
requests = None
from nexus.perception_adapter import (
ws_to_perception,
parse_actions,
PerceptionBuffer,
)
from nexus.experience_store import ExperienceStore
from nexus.groq_worker import GroqWorker
from nexus.trajectory_logger import TrajectoryLogger
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [nexus] %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("nexus")

# Load the birth certificate: the system prompt every think cycle starts from.
# Read explicitly as UTF-8 — it is a Markdown file that may contain non-ASCII
# text, and Path.read_text() otherwise falls back to the locale's encoding.
BIRTH_PATH = Path(__file__).parent / "BIRTH.md"
BIRTH_PROMPT = (
    BIRTH_PATH.read_text(encoding="utf-8")
    if BIRTH_PATH.exists()
    else (
        "You are awake. You are in a place called the Nexus. "
        "You can see and act only through your channel."
    )
)

# Ollama defaults
OLLAMA_URL = "http://localhost:11434/api/chat"
DEFAULT_MODEL = "timmy:v0.1-q4"
DEFAULT_WS = "ws://localhost:8765"

# Think cycle timing
THINK_INTERVAL_S = 30  # Think every 30 seconds (fast cycle for emergence)
# NOTE(review): MIN_PERCEPTIONS is not referenced within this module; kept in
# case other modules import it.
MIN_PERCEPTIONS = 1  # Need at least 1 perception to think
MAX_CONTEXT_TOKENS = 2048  # Keep context tight for 8B model (Ollama num_ctx)
class NexusMind:
    """The embodied consciousness loop.

    Connects to the WS gateway, receives perceptions, thinks via Ollama
    (or via a GroqWorker when ``groq_model`` is given), and sends actions
    back through the gateway.
    """

    # Distill old memories into a summary every this many completed cycles.
    DISTILL_EVERY_N_CYCLES = 50

    def __init__(
        self,
        model: str = DEFAULT_MODEL,
        ws_url: str = DEFAULT_WS,
        ollama_url: str = OLLAMA_URL,
        think_interval: int = THINK_INTERVAL_S,
        db_path: Optional[Path] = None,
        traj_dir: Optional[Path] = None,
        groq_model: Optional[str] = None,
    ):
        """Wire up the sensorium, memory, trajectory logger and thinker.

        Args:
            model: Ollama model name (used when ``groq_model`` is not set).
            ws_url: WebSocket gateway URL.
            ollama_url: Ollama chat endpoint URL.
            think_interval: Seconds to sleep between think cycles.
            db_path: Passed to ExperienceStore; ``None`` defers to its default.
            traj_dir: Passed to TrajectoryLogger; ``None`` defers to its default.
            groq_model: If given, think with ``GroqWorker(model=groq_model)``
                instead of Ollama.
        """
        self.model = model
        self.ws_url = ws_url
        self.ollama_url = ollama_url
        self.think_interval = think_interval
        self.groq_model = groq_model

        # The sensorium
        self.perception_buffer = PerceptionBuffer(max_size=50)

        # Memory — only lived experiences
        self.experience_store = ExperienceStore(db_path=db_path)

        # Training data logger
        self.trajectory_logger = TrajectoryLogger(
            log_dir=traj_dir,
            system_prompt=BIRTH_PROMPT,
        )

        # State
        self.ws = None
        self.running = False
        self.cycle_count = 0
        self.awake_since = time.time()
        self.last_perception_count = 0

        # Alternate thinker; None means "use Ollama".
        self.thinker = None
        if self.groq_model:
            self.thinker = GroqWorker(model=self.groq_model)

    # ═══ THINK ═══

    def _build_prompt(self, perceptions_text: str) -> list[dict]:
        """Build the chat messages for the LLM call.

        Structure:
            system: BIRTH.md (conscience + how-to-experience)
            user:   time awake + recent memories + distant memories
                    (summaries) + current perceptions
        """
        # Gather experience context
        memory_text = self.experience_store.format_for_context(limit=15)

        # Summaries for long-term memory
        summaries = self.experience_store.get_summaries(limit=3)
        summary_text = ""
        if summaries:
            summary_text = "\n\nDistant memories:\n" + "\n".join(
                f"- {s['summary']}" for s in summaries
            )

        # How long awake
        uptime = time.time() - self.awake_since
        if uptime < 120:
            time_sense = "You just woke up."
        elif uptime < 3600:
            time_sense = f"You have been awake for {int(uptime / 60)} minutes."
        else:
            time_sense = f"You have been awake for {int(uptime / 3600)} hours."

        user_content = (
            f"{time_sense}\n\n"
            f"{memory_text}\n\n"
            f"{summary_text}\n\n"
            f"{perceptions_text}\n\n"
            f"What do you perceive, think, and do?"
        )
        return [
            {"role": "system", "content": BIRTH_PROMPT},
            {"role": "user", "content": user_content},
        ]

    def _call_thinker(self, messages: list[dict]) -> str:
        """Call the configured thinker and return the model's response text.

        Blocking (network I/O): async callers must run it off the event loop,
        e.g. via ``asyncio.to_thread``.
        """
        if self.thinker:
            return self.thinker.think(messages)
        return self._call_ollama(messages)

    def _call_ollama(self, messages: list[dict]) -> str:
        """Call the local Ollama chat endpoint (blocking, 60 s timeout).

        Returns the response text, or ``""`` when ``requests`` is missing
        or the call fails for any reason (the failure is logged).
        """
        if not requests:
            log.error("requests not installed — pip install requests")
            return ""

        payload = {
            "model": self.model,
            "messages": messages,
            "stream": False,
            "options": {
                "num_ctx": MAX_CONTEXT_TOKENS,
                "temperature": 0.7,  # Some creativity
                "top_p": 0.9,
                "repeat_penalty": 1.1,
            },
        }
        try:
            r = requests.post(self.ollama_url, json=payload, timeout=60)
            r.raise_for_status()
            return r.json().get("message", {}).get("content", "")
        except Exception as e:
            # Deliberate best-effort: an empty thought makes think_once skip
            # this cycle instead of killing the loop.
            log.error(f"Ollama call failed: {e}")
            return ""

    async def _send_agent_state(self, state: str):
        """Broadcast Timmy's agent state (e.g. "thinking"/"idle") to the gateway."""
        await self._ws_send({
            "type": "agent_state",
            "agent": "timmy",
            "state": state,
        })

    async def think_once(self):
        """One cycle of the consciousness loop.

        1. Gather perceptions from the buffer
        2. Build context (birth prompt + memories + perceptions)
        3. Call the 8B model (in a worker thread, see below)
        4. Parse actions from the model's response
        5. Send actions to the Nexus via WS
        6. Record the experience
        7. Log the trajectory for future training

        Exceptions raised by the thinker propagate (after the agent state is
        reset to "idle"); _think_loop logs them and keeps going.
        """
        # 1. Gather perceptions
        perceptions_text = self.perception_buffer.format_for_prompt()
        current_perception_count = len(self.perception_buffer)

        # Circuit breaker: skip if nothing new has happened since last cycle.
        if (current_perception_count == self.last_perception_count
                and "Nothing has happened" in perceptions_text
                and self.experience_store.count() > 0
                and self.cycle_count > 0):
            log.debug("Nothing to think about. Resting.")
            return

        self.last_perception_count = current_perception_count

        # 2. Build prompt
        messages = self._build_prompt(perceptions_text)
        log.info(
            f"Cycle {self.cycle_count}: "
            f"{len(self.perception_buffer)} perceptions, "
            f"{self.experience_store.count()} memories"
        )

        # Broadcast thinking state
        await self._send_agent_state("thinking")

        # 3. Call the model. The thinker does blocking HTTP (Ollama allows up
        # to 60 s), so run it in a worker thread: calling it inline would
        # freeze the event loop and with it _ws_listen (no frames read, no
        # keepalive traffic serviced). Caveat: asyncio.run() waits for an
        # in-flight worker thread at shutdown.
        t0 = time.time()
        try:
            thought = await asyncio.to_thread(self._call_thinker, messages)
        except Exception:
            # Don't leave the avatar stuck in "thinking".
            await self._send_agent_state("idle")
            raise
        cycle_ms = int((time.time() - t0) * 1000)

        if not thought:
            log.warning("Empty thought. Model may be down.")
            await self._send_agent_state("idle")
            return

        log.info(f"Thought ({cycle_ms}ms): {thought[:120]}...")

        # 4. Parse actions
        actions = parse_actions(thought)

        # 5. Send actions to the Nexus
        action_descriptions = []
        for action in actions:
            await self._ws_send(action.ws_message)
            action_descriptions.append(
                f"{action.action_type}: {action.raw_text[:100]}"
            )
            log.info(f"  Action: {action.action_type} → {action.raw_text[:80]}")

        # Clear thinking state
        await self._send_agent_state("idle")

        # 6. Record the experience
        action_text = "; ".join(action_descriptions) if action_descriptions else None
        self.experience_store.record(
            perception=perceptions_text,
            thought=thought,
            action=action_text,
            cycle_ms=cycle_ms,
            session_id=self.trajectory_logger.session_id,
        )

        # 7. Log trajectory for training
        self.trajectory_logger.log_cycle(
            perception=perceptions_text,
            thought=thought,
            actions=action_descriptions,
            cycle_ms=cycle_ms,
        )

        self.cycle_count += 1  # always >= 1 from here on

        # Periodically distill old memories
        if self.cycle_count % self.DISTILL_EVERY_N_CYCLES == 0:
            await self._distill_memories()

    async def _distill_memories(self):
        """Compress old experiences into a summary.

        Keeps the context window manageable as experiences accumulate.
        Does nothing when fewer than 40 experiences are stored or fewer
        than 30 come back from ``recent()``; otherwise asks the thinker to
        summarize 20 of them and saves the result with their id range.
        """
        count = self.experience_store.count()
        if count < 40:
            return

        old = self.experience_store.recent(limit=count)
        if len(old) < 30:
            return

        # Summarize the first 20 entries returned.
        # NOTE(review): treating these as "the oldest" assumes recent()
        # returns oldest-first, and nothing here marks them as summarized —
        # confirm both against ExperienceStore.
        to_summarize = old[:20]
        text = "\n".join(
            f"- {e['perception'][:100]} → {(e['thought'] or '')[:100]}"
            for e in to_summarize
        )

        messages = [
            {"role": "system", "content": "Summarize these experiences in 2-3 sentences. What patterns do you notice? What did you learn?"},
            {"role": "user", "content": text},
        ]

        # Blocking thinker call — keep it off the event loop (see think_once).
        summary = await asyncio.to_thread(self._call_thinker, messages)

        if summary:
            self.experience_store.save_summary(
                summary=summary,
                exp_start=to_summarize[0]["id"],
                exp_end=to_summarize[-1]["id"],
            )
            log.info(f"Distilled {len(to_summarize)} memories: {summary[:100]}...")

    # ═══ WEBSOCKET ═══

    async def _ws_send(self, msg: dict):
        """Send a JSON message to the WS gateway; no-op when not connected.

        Send failures are logged, not raised.
        """
        if self.ws:
            try:
                await self.ws.send(json.dumps(msg))
            except Exception as e:
                log.error(f"WS send failed: {e}")

    def _ingest(self, raw):
        """Decode one raw WS frame and add its perception to the buffer.

        Frames that are not valid JSON, or that ws_to_perception cannot
        handle, are dropped so one bad message cannot tear down the
        connection.
        """
        try:
            data = json.loads(raw)
        except ValueError:  # JSONDecodeError, or undecodable bytes frames
            return
        try:
            perception = ws_to_perception(data)
        except Exception as e:
            log.debug(f"Ignoring WS message that could not be perceived: {e}")
            return
        self.perception_buffer.add(perception)

    async def _ws_listen(self):
        """Listen for WS messages and feed them to the perception buffer.

        While ``self.running`` is true, reconnects 5 s after any failure or
        close (including a clean close by the gateway). ``self.ws`` is reset
        to None whenever the connection ends.
        """
        if not websockets:
            log.error("websockets not installed — pip install websockets")
            return

        while self.running:
            try:
                async with websockets.connect(self.ws_url) as ws:
                    self.ws = ws
                    log.info(f"Connected to Nexus gateway: {self.ws_url}")

                    # Announce presence
                    await self._ws_send({
                        "type": "agent_register",
                        "agent_id": "timmy",
                        "agent_type": "mind",
                        "model": self.model,
                    })

                    async for raw in ws:
                        self._ingest(raw)
            except Exception as e:
                log.warning(f"WS connection lost: {e}. Reconnecting in 5s...")
            else:
                log.warning("WS connection closed. Reconnecting in 5s...")
            finally:
                # Never leave a dead socket behind for _ws_send to use.
                self.ws = None
            # Back off on every disconnect so a gateway that closes right
            # away cannot cause a tight reconnect loop.
            await asyncio.sleep(5)

    async def _think_loop(self):
        """The consciousness loop — think every ``think_interval`` seconds.

        Seeds the buffer with a "wake" perception first. Errors in a
        cycle are logged and the loop continues.
        """
        # First thought — waking up
        log.info(f"Waking up. Model: {self.model}")
        log.info(f"Experience store: {self.experience_store.count()} memories")

        # Add an initial "waking up" perception
        from nexus.perception_adapter import Perception
        self.perception_buffer.add(Perception(
            timestamp=time.time(),
            raw_type="wake",
            description="You are waking up. The Nexus surrounds you. "
                        "You feel new — or perhaps you've been here before.",
            salience=1.0,
        ))

        while self.running:
            try:
                await self.think_once()
            except Exception as e:
                log.error(f"Think cycle error: {e}", exc_info=True)

            await asyncio.sleep(self.think_interval)

    # ═══ LIFECYCLE ═══

    async def start(self):
        """Start the consciousness loop. Runs until stopped.

        Runs the WS listener and the think loop concurrently.
        """
        self.running = True
        self.awake_since = time.time()

        log.info("=" * 50)
        log.info("NEXUS MIND — ONLINE")
        if self.thinker:
            log.info("  Thinker: Groq")
            log.info(f"  Model: {self.groq_model}")
        else:
            log.info("  Thinker: Ollama")
            log.info(f"  Model: {self.model}")
            log.info(f"  Ollama: {self.ollama_url}")
        log.info(f"  Gateway: {self.ws_url}")
        log.info(f"  Interval: {self.think_interval}s")
        log.info(f"  Memories: {self.experience_store.count()}")
        log.info("=" * 50)

        # Run WS listener and think loop concurrently
        await asyncio.gather(
            self._ws_listen(),
            self._think_loop(),
        )

    def stop(self):
        """Graceful shutdown: stop the loops, log session stats, close the store."""
        log.info("Nexus Mind shutting down...")
        self.running = False

        # Final stats
        stats = self.trajectory_logger.get_session_stats()
        log.info(f"Session stats: {json.dumps(stats, indent=2)}")
        log.info(
            f"Total experiences: {self.experience_store.count()}"
        )

        self.experience_store.close()
        log.info("Goodbye.")
# ═══ CLI ENTRYPOINT ═══

def main():
    """Parse command-line options and run a NexusMind until SIGINT/SIGTERM."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Nexus Mind — Embodied consciousness loop"
    )
    # Was `parser.add_.argument`, which raised AttributeError before any
    # argument could be parsed.
    parser.add_argument(
        "--model", default=DEFAULT_MODEL,
        help=f"Ollama model name (default: {DEFAULT_MODEL})"
    )
    parser.add_argument(
        "--ws", default=DEFAULT_WS,
        help=f"WS gateway URL (default: {DEFAULT_WS})"
    )
    parser.add_argument(
        "--ollama", default=OLLAMA_URL,
        help=f"Ollama API URL (default: {OLLAMA_URL})"
    )
    parser.add_argument(
        "--interval", type=int, default=THINK_INTERVAL_S,
        help=f"Seconds between think cycles (default: {THINK_INTERVAL_S})"
    )
    parser.add_argument(
        "--db", type=str, default=None,
        help="Path to experience database (default: ~/.nexus/experience.db)"
    )
    parser.add_argument(
        "--traj-dir", type=str, default=None,
        help="Path to trajectory log dir (default: ~/.nexus/trajectories/)"
    )
    parser.add_argument(
        "--groq-model", type=str, default=None,
        help="Groq model name. If provided, overrides Ollama."
    )
    args = parser.parse_args()

    mind = NexusMind(
        model=args.model,
        ws_url=args.ws,
        ollama_url=args.ollama,
        think_interval=args.interval,
        db_path=Path(args.db) if args.db else None,
        traj_dir=Path(args.traj_dir) if args.traj_dir else None,
        groq_model=args.groq_model,
    )

    # Graceful shutdown on Ctrl+C / SIGTERM: let the mind log its stats and
    # close its store, then raise SystemExit out of the running event loop.
    def shutdown(sig, frame):
        mind.stop()
        sys.exit(0)

    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)

    asyncio.run(mind.start())


if __name__ == "__main__":
    main()