Files
the-nexus/nexus/nexus_think.py
perplexity 1ecca527cb
Some checks failed
CI / validate (pull_request) Failing after 5s
feat: Nexus Mind — embodied consciousness loop for 8B sovereign brain
Adds the perception adapter, experience store, trajectory logger, and
consciousness loop that give Timmy a body in the Nexus.

Architecture:
  BIRTH.md           — Thin system prompt. SOUL.md conscience + embodied
                       awareness. No meta-knowledge about implementation.
  perception_adapter — Translates WS events to natural-language sensory
                       descriptions. Parses model output into WS actions.
  experience_store   — SQLite-backed lived-experience memory. The model
                       remembers only what it perceived through its channel.
  trajectory_logger  — Logs every perceive→think→act cycle as ShareGPT JSONL,
                       compatible with the AutoLoRA training pipeline.
  nexus_think        — The consciousness loop. Connects to WS gateway,
                       receives perceptions, thinks via Ollama, sends actions.

The 8B model wakes up knowing nothing but its values and what it
experiences. Training loops close on lived experience — emergence
through the channel, not through fine-tuning toward behaviors.

Run: python -m nexus.nexus_think --model timmy:v0.1-q4 --ws ws://localhost:8765
2026-03-25 17:20:03 +00:00

470 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Nexus Think — The Consciousness Loop
A thin, embodied think cycle for the Nexus. This replaces the full
think_once() orchestrator context with perception-only input.
The 8B model wakes up knowing nothing but its BIRTH.md conscience
and what it has experienced through the WS channel. This module
runs the perceive→think→act loop.
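Usage:
    # As a standalone process connected to the WS gateway
    # (run as a module so the `nexus.*` package imports resolve):
    python -m nexus.nexus_think --ws ws://localhost:8765 --model timmy:v0.1-q4

    # Or imported and driven by the existing harness
    # (start() is a coroutine, so it must be run on an event loop):
    import asyncio
    from nexus.nexus_think import NexusMind
    mind = NexusMind(model="timmy:v0.1-q4")
    asyncio.run(mind.start())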
"""
import asyncio
import json
import time
import signal
import sys
import logging
from pathlib import Path
from typing import Optional
# Optional third-party dependencies. Each name is bound to None when the
# package is missing so the module still imports; the code paths that need
# them check for None and log an install hint instead of crashing.
try:
    import websockets
except ImportError:
    websockets = None

try:
    import requests
except ImportError:
    requests = None
from nexus.perception_adapter import (
ws_to_perception,
parse_actions,
PerceptionBuffer,
)
from nexus.experience_store import ExperienceStore
from nexus.trajectory_logger import TrajectoryLogger
# NOTE: configuring the root logger at import time is a module-level side
# effect; it is kept because the CLI entry point relies on it.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [nexus] %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("nexus")

# Load the birth certificate (the system prompt) from BIRTH.md next to this
# module. The file is read as UTF-8 explicitly so the prompt does not depend
# on the platform's default encoding. If the file is missing, fall back to a
# minimal built-in prompt; any other read error still propagates.
BIRTH_PATH = Path(__file__).parent / "BIRTH.md"
try:
    BIRTH_PROMPT = BIRTH_PATH.read_text(encoding="utf-8")
except FileNotFoundError:
    BIRTH_PROMPT = (
        "You are awake. You are in a place called the Nexus. "
        "You can see and act only through your channel."
    )

# Ollama defaults
OLLAMA_URL = "http://localhost:11434/api/chat"
DEFAULT_MODEL = "timmy:v0.1-q4"
DEFAULT_WS = "ws://localhost:8765"

# Think cycle timing
THINK_INTERVAL_S = 30  # Think every 30 seconds (fast cycle for emergence)
MIN_PERCEPTIONS = 1  # Need at least 1 perception to think
MAX_CONTEXT_TOKENS = 2048  # Keep context tight for 8B model
class NexusMind:
    """The embodied consciousness loop.

    Connects to the WS gateway, receives perceptions, thinks via Ollama,
    and sends actions back through the gateway.

    Two coroutines run concurrently (see ``start``): ``_ws_listen`` feeds
    incoming gateway messages into the perception buffer, and
    ``_think_loop`` periodically runs ``think_once``.
    """

    def __init__(
        self,
        model: str = DEFAULT_MODEL,
        ws_url: str = DEFAULT_WS,
        ollama_url: str = OLLAMA_URL,
        think_interval: int = THINK_INTERVAL_S,
        db_path: Optional[Path] = None,
        traj_dir: Optional[Path] = None,
    ):
        """Create the mind and its stores; nothing connects until ``start``.

        Args:
            model: Ollama model name sent with every chat request.
            ws_url: URL of the WS gateway to connect to.
            ollama_url: URL of the Ollama chat endpoint.
            think_interval: Seconds to sleep between think cycles.
            db_path: Passed through to ``ExperienceStore``; None presumably
                selects the store's own default location.
            traj_dir: Passed through to ``TrajectoryLogger``; None
                presumably selects the logger's own default directory.
        """
        self.model = model
        self.ws_url = ws_url
        self.ollama_url = ollama_url
        self.think_interval = think_interval

        # The sensorium
        self.perception_buffer = PerceptionBuffer(max_size=50)

        # Memory — only lived experiences
        self.experience_store = ExperienceStore(db_path=db_path)

        # Training data logger
        self.trajectory_logger = TrajectoryLogger(
            log_dir=traj_dir,
            system_prompt=BIRTH_PROMPT,
        )

        # State
        self.ws = None  # live gateway connection, or None while disconnected
        self.running = False
        self.cycle_count = 0
        self.awake_since = time.time()

    # ═══ THINK ═══

    def _build_prompt(self, perceptions_text: str) -> list[dict]:
        """Build the chat messages for the LLM call.

        Structure:
            system: BIRTH.md (conscience + how-to-experience)
            user: Recent memories + current perceptions

        Args:
            perceptions_text: Already-formatted description of the current
                perceptions.

        Returns:
            A two-element list of ``{"role", "content"}`` message dicts.
        """
        # Gather experience context
        memory_text = self.experience_store.format_for_context(limit=15)

        # Summaries for long-term memory
        summaries = self.experience_store.get_summaries(limit=3)
        summary_text = ""
        if summaries:
            summary_text = "\n\nDistant memories:\n" + "\n".join(
                f"- {s['summary']}" for s in summaries
            )

        # How long awake
        uptime = time.time() - self.awake_since
        if uptime < 120:
            time_sense = "You just woke up."
        elif uptime < 3600:
            # Always >= 2 here, so the plural is always correct.
            time_sense = f"You have been awake for {int(uptime / 60)} minutes."
        else:
            hours = int(uptime / 3600)
            unit = "hour" if hours == 1 else "hours"
            time_sense = f"You have been awake for {hours} {unit}."

        user_content = (
            f"{time_sense}\n\n"
            f"{memory_text}\n\n"
            f"{summary_text}\n\n"
            f"{perceptions_text}\n\n"
            f"What do you perceive, think, and do?"
        )
        return [
            {"role": "system", "content": BIRTH_PROMPT},
            {"role": "user", "content": user_content},
        ]

    def _call_ollama(self, messages: list[dict]) -> str:
        """Call the local LLM. Returns the model's response text.

        This is a blocking HTTP call (up to the 60 second timeout), so
        coroutines must run it off the event loop, e.g. with
        ``asyncio.to_thread``. It touches neither the experience store nor
        the perception buffer.

        Args:
            messages: Chat messages in ``{"role", "content"}`` form.

        Returns:
            The response content, or "" if ``requests`` is not installed or
            the call fails for any reason (the failure is logged).
        """
        if not requests:
            log.error("requests not installed — pip install requests")
            return ""

        payload = {
            "model": self.model,
            "messages": messages,
            "stream": False,
            "options": {
                "num_ctx": MAX_CONTEXT_TOKENS,
                "temperature": 0.7,  # Some creativity
                "top_p": 0.9,
                "repeat_penalty": 1.1,
            },
        }
        try:
            r = requests.post(self.ollama_url, json=payload, timeout=60)
            r.raise_for_status()
            return r.json().get("message", {}).get("content", "")
        except Exception as e:
            # Deliberate best-effort boundary: a failed call means "no
            # thought this cycle", never a crash of the loop.
            log.error(f"Ollama call failed: {e}")
            return ""

    async def think_once(self) -> None:
        """One cycle of the consciousness loop.

        1. Gather perceptions from the buffer
        2. Build context (birth prompt + memories + perceptions)
        3. Call the 8B model
        4. Parse actions from the model's response
        5. Send actions to the Nexus via WS
        6. Record the experience
        7. Log the trajectory for future training
        """
        # 1. Gather perceptions
        perceptions_text = self.perception_buffer.format_for_prompt()

        # Skip if nothing happened and we have memories already
        if ("Nothing has happened" in perceptions_text
                and self.experience_store.count() > 0
                and self.cycle_count > 0):
            log.debug("Nothing to think about. Resting.")
            return

        # 2. Build prompt
        messages = self._build_prompt(perceptions_text)
        # NOTE(review): if format_for_prompt() drains the buffer, the
        # perception count logged here is the post-drain size — confirm.
        log.info(
            f"Cycle {self.cycle_count}: "
            f"{len(self.perception_buffer)} perceptions, "
            f"{self.experience_store.count()} memories"
        )

        # Broadcast thinking state
        await self._ws_send({
            "type": "agent_state",
            "agent": "timmy",
            "state": "thinking",
        })

        # 3. Call the model. The HTTP request blocks, so it runs in a worker
        # thread; otherwise the event loop (and with it the WS listener)
        # would be frozen for the whole inference.
        t0 = time.time()
        thought = await asyncio.to_thread(self._call_ollama, messages)
        cycle_ms = int((time.time() - t0) * 1000)

        if not thought:
            log.warning("Empty thought. Model may be down.")
            await self._ws_send({
                "type": "agent_state",
                "agent": "timmy",
                "state": "idle",
            })
            return

        log.info(f"Thought ({cycle_ms}ms): {thought[:120]}...")

        # 4. Parse actions
        actions = parse_actions(thought)

        # 5. Send actions to the Nexus
        action_descriptions = []
        for action in actions:
            await self._ws_send(action.ws_message)
            action_descriptions.append(
                f"{action.action_type}: {action.raw_text[:100]}"
            )
            # Same "type: text" shape as the recorded description above.
            log.info(f" Action: {action.action_type}: {action.raw_text[:80]}")

        # Clear thinking state
        await self._ws_send({
            "type": "agent_state",
            "agent": "timmy",
            "state": "idle",
        })

        # 6. Record the experience
        action_text = "; ".join(action_descriptions) if action_descriptions else None
        self.experience_store.record(
            perception=perceptions_text,
            thought=thought,
            action=action_text,
            cycle_ms=cycle_ms,
            session_id=self.trajectory_logger.session_id,
        )

        # 7. Log trajectory for training
        self.trajectory_logger.log_cycle(
            perception=perceptions_text,
            thought=thought,
            actions=action_descriptions,
            cycle_ms=cycle_ms,
        )

        self.cycle_count += 1

        # Periodically distill old memories
        if self.cycle_count % 50 == 0 and self.cycle_count > 0:
            await self._distill_memories()

    async def _distill_memories(self) -> None:
        """Compress old experiences into summaries.

        Keeps the context window manageable as experiences accumulate.
        Does nothing when fewer than 40 experiences are stored or fewer
        than 30 are returned by the store.
        """
        count = self.experience_store.count()
        if count < 40:
            return

        # Get the oldest experiences not yet summarized
        # NOTE(review): the slice below assumes recent() returns rows
        # oldest-first; if it returns newest-first this summarizes the
        # newest 20 instead — confirm against ExperienceStore. Also confirm
        # whether save_summary() excludes the range from later passes.
        old = self.experience_store.recent(limit=count)
        if len(old) < 30:
            return

        # Take the oldest 20 and ask the model to summarize them
        to_summarize = old[:20]
        text = "\n".join(
            f"- {e['perception'][:100]} → {(e['thought'] or '')[:100]}"
            for e in to_summarize
        )
        messages = [
            {"role": "system", "content": "Summarize these experiences in 2-3 sentences. What patterns do you notice? What did you learn?"},
            {"role": "user", "content": text},
        ]

        # Blocking HTTP call: keep it off the event loop.
        summary = await asyncio.to_thread(self._call_ollama, messages)
        if summary:
            self.experience_store.save_summary(
                summary=summary,
                exp_start=to_summarize[0]["id"],
                exp_end=to_summarize[-1]["id"],
            )
            log.info(f"Distilled {len(to_summarize)} memories: {summary[:100]}...")

    # ═══ WEBSOCKET ═══

    async def _ws_send(self, msg: dict) -> None:
        """Send a message to the WS gateway.

        Silently does nothing while disconnected; send failures are logged
        and swallowed so a dropped connection never aborts a think cycle.
        """
        if self.ws:
            try:
                await self.ws.send(json.dumps(msg))
            except Exception as e:
                log.error(f"WS send failed: {e}")

    async def _ws_listen(self) -> None:
        """Listen for WS messages and feed them to the perception buffer.

        Reconnects with a 5 second back-off for as long as ``self.running``
        is true. Returns immediately if ``websockets`` is not installed.
        """
        if not websockets:
            log.error("websockets not installed — pip install websockets")
            return

        while self.running:
            try:
                async with websockets.connect(self.ws_url) as ws:
                    self.ws = ws
                    log.info(f"Connected to Nexus gateway: {self.ws_url}")

                    # Announce presence
                    await self._ws_send({
                        "type": "agent_register",
                        "agent_id": "timmy",
                        "agent_type": "mind",
                        "model": self.model,
                    })

                    async for raw in ws:
                        try:
                            data = json.loads(raw)
                        except json.JSONDecodeError:
                            log.debug("Ignoring non-JSON WS message")
                            continue
                        perception = ws_to_perception(data)
                        self.perception_buffer.add(perception)

                # Reached when the message iterator ends without raising,
                # i.e. the gateway closed the connection cleanly.
                if self.running:
                    log.warning(
                        "WS connection closed by gateway. Reconnecting in 5s..."
                    )
            except Exception as e:
                log.warning(f"WS connection lost: {e}. Reconnecting in 5s...")
            finally:
                # Never leave a closed socket behind for _ws_send to use.
                self.ws = None

            # Back off on every reconnect, including after a clean close,
            # so a gateway that keeps closing cannot cause a tight loop.
            if self.running:
                await asyncio.sleep(5)

    async def _think_loop(self) -> None:
        """The consciousness loop — think at regular intervals."""
        # First thought — waking up
        log.info(f"Waking up. Model: {self.model}")
        log.info(f"Experience store: {self.experience_store.count()} memories")

        # Add an initial "waking up" perception
        from nexus.perception_adapter import Perception
        self.perception_buffer.add(Perception(
            timestamp=time.time(),
            raw_type="wake",
            description="You are waking up. The Nexus surrounds you. "
                        "You feel new — or perhaps you've been here before.",
            salience=1.0,
        ))

        while self.running:
            try:
                await self.think_once()
            except Exception as e:
                # One bad cycle must not end the loop.
                log.error(f"Think cycle error: {e}", exc_info=True)
            await asyncio.sleep(self.think_interval)

    # ═══ LIFECYCLE ═══

    async def start(self) -> None:
        """Start the consciousness loop. Runs until stopped."""
        self.running = True
        self.awake_since = time.time()

        log.info("=" * 50)
        log.info("NEXUS MIND — ONLINE")
        log.info(f" Model: {self.model}")
        log.info(f" Gateway: {self.ws_url}")
        log.info(f" Ollama: {self.ollama_url}")
        log.info(f" Interval: {self.think_interval}s")
        log.info(f" Memories: {self.experience_store.count()}")
        log.info("=" * 50)

        # Run WS listener and think loop concurrently
        await asyncio.gather(
            self._ws_listen(),
            self._think_loop(),
        )

    def stop(self) -> None:
        """Graceful shutdown.

        Clears the running flag so both loops exit at their next check,
        logs the session statistics, and closes the experience store.
        """
        log.info("Nexus Mind shutting down...")
        self.running = False

        # Final stats
        stats = self.trajectory_logger.get_session_stats()
        log.info(f"Session stats: {json.dumps(stats, indent=2)}")
        log.info(
            f"Total experiences: {self.experience_store.count()}"
        )
        self.experience_store.close()
        log.info("Goodbye.")
# ═══ CLI ENTRYPOINT ═══
def main():
    """CLI entry point: parse options, build a NexusMind, run until signalled.

    SIGINT and SIGTERM both trigger ``mind.stop()`` followed by a clean
    process exit with status 0.
    """
    import argparse

    cli = argparse.ArgumentParser(
        description="Nexus Mind — Embodied consciousness loop"
    )
    cli.add_argument(
        "--model",
        default=DEFAULT_MODEL,
        help=f"Ollama model name (default: {DEFAULT_MODEL})",
    )
    cli.add_argument(
        "--ws",
        default=DEFAULT_WS,
        help=f"WS gateway URL (default: {DEFAULT_WS})",
    )
    cli.add_argument(
        "--ollama",
        default=OLLAMA_URL,
        help=f"Ollama API URL (default: {OLLAMA_URL})",
    )
    cli.add_argument(
        "--interval",
        type=int,
        default=THINK_INTERVAL_S,
        help=f"Seconds between think cycles (default: {THINK_INTERVAL_S})",
    )
    cli.add_argument(
        "--db",
        type=str,
        default=None,
        help="Path to experience database (default: ~/.nexus/experience.db)",
    )
    cli.add_argument(
        "--traj-dir",
        type=str,
        default=None,
        help="Path to trajectory log dir (default: ~/.nexus/trajectories/)",
    )
    opts = cli.parse_args()

    # Empty or missing path options mean "let the component pick its default".
    experience_db = Path(opts.db) if opts.db else None
    trajectory_dir = Path(opts.traj_dir) if opts.traj_dir else None

    mind = NexusMind(
        model=opts.model,
        ws_url=opts.ws,
        ollama_url=opts.ollama,
        think_interval=opts.interval,
        db_path=experience_db,
        traj_dir=trajectory_dir,
    )

    # Graceful shutdown on Ctrl+C / termination request
    def shutdown(sig, frame):
        mind.stop()
        sys.exit(0)

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, shutdown)

    asyncio.run(mind.start())
# Run the CLI only when executed as a script/module, not when imported.
if __name__ == "__main__":
    main()