#!/usr/bin/env python3
"""
Nexus Think — The Consciousness Loop

A thin, embodied think cycle for the Nexus. This replaces the full
think_once() orchestrator context with perception-only input.

The 8B model wakes up knowing nothing but its BIRTH.md conscience
and what it has experienced through the WS channel. This module
runs the perceive→think→act loop.

Usage:
    # As a standalone process connected to the WS gateway:
    python nexus_think.py --ws ws://localhost:8765 --model timmy:v0.1-q4

    # Or imported and driven by the existing harness
    # (``start`` is a coroutine, so it must be awaited / run in a loop):
    import asyncio
    from nexus.nexus_think import NexusMind
    mind = NexusMind(model="timmy:v0.1-q4")
    asyncio.run(mind.start())
"""

import asyncio
import json
import time
import signal
import sys
import logging
from pathlib import Path
from typing import Optional

# Optional third-party dependencies: the module still imports without them,
# and the code paths that need them log an error instead of crashing.
try:
    import websockets
except ImportError:
    websockets = None

try:
    import requests
except ImportError:
    requests = None

from nexus.perception_adapter import (
    ws_to_perception,
    parse_actions,
    Perception,
    PerceptionBuffer,
)
from nexus.experience_store import ExperienceStore
from nexus.groq_worker import GroqWorker
from nexus.trajectory_logger import TrajectoryLogger

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [nexus] %(message)s",
    datefmt="%H:%M:%S",
)
log = logging.getLogger("nexus")

# Load the birth certificate (system prompt). Falls back to a minimal
# built-in prompt when BIRTH.md is not shipped next to this module.
BIRTH_PATH = Path(__file__).parent / "BIRTH.md"
BIRTH_PROMPT = BIRTH_PATH.read_text(encoding="utf-8") if BIRTH_PATH.exists() else (
    "You are awake. You are in a place called the Nexus. "
    "You can see and act only through your channel."
)

# Ollama defaults
OLLAMA_URL = "http://localhost:11434/api/chat"
DEFAULT_MODEL = "timmy:v0.1-q4"
DEFAULT_WS = "ws://localhost:8765"

# Think cycle timing
THINK_INTERVAL_S = 30       # Think every 30 seconds (fast cycle for emergence)
MIN_PERCEPTIONS = 1         # Need at least 1 perception to think
MAX_CONTEXT_TOKENS = 2048   # Keep context tight for 8B model

# Delay between WS reconnect attempts, in seconds.
_RECONNECT_DELAY_S = 5


class NexusMind:
    """The embodied consciousness loop.

    Connects to the WS gateway, receives perceptions, thinks via
    Ollama (or Groq when ``groq_model`` is given), and sends actions
    back through the gateway.
    """

    def __init__(
        self,
        model: str = DEFAULT_MODEL,
        ws_url: str = DEFAULT_WS,
        ollama_url: str = OLLAMA_URL,
        think_interval: int = THINK_INTERVAL_S,
        db_path: Optional[Path] = None,
        traj_dir: Optional[Path] = None,
        groq_model: Optional[str] = None,
    ):
        """Create the mind and its stores; no network I/O happens here.

        Args:
            model: Ollama model name used when no Groq model is configured.
            ws_url: URL of the WS gateway to connect to.
            ollama_url: URL of the Ollama chat endpoint.
            think_interval: Seconds to sleep between think cycles.
            db_path: Passed to ``ExperienceStore``; ``None`` uses its default.
            traj_dir: Passed to ``TrajectoryLogger``; ``None`` uses its default.
            groq_model: When set, a ``GroqWorker`` replaces the Ollama call.
        """
        self.model = model
        self.ws_url = ws_url
        self.ollama_url = ollama_url
        self.think_interval = think_interval
        self.groq_model = groq_model

        # The sensorium
        self.perception_buffer = PerceptionBuffer(max_size=50)

        # Memory — only lived experiences
        self.experience_store = ExperienceStore(db_path=db_path)

        # Training data logger
        self.trajectory_logger = TrajectoryLogger(
            log_dir=traj_dir,
            system_prompt=BIRTH_PROMPT,
        )

        # State
        self.ws = None                  # live gateway connection, or None
        self.running = False
        self.cycle_count = 0            # completed (non-empty) think cycles
        self.awake_since = time.time()
        self.last_perception_count = 0
        self.thinker = None
        if self.groq_model:
            self.thinker = GroqWorker(model=self.groq_model)

    # ═══ THINK ═══

    def _build_prompt(self, perceptions_text: str) -> list[dict]:
        """Build the chat messages for the LLM call.

        Structure:
            system: BIRTH.md (conscience + how-to-experience)
            user:   Recent memories + current perceptions

        Args:
            perceptions_text: Already-formatted perception text for this cycle.

        Returns:
            A two-element list of ``{"role", "content"}`` chat messages.
        """
        # Gather experience context
        memory_text = self.experience_store.format_for_context(limit=15)

        # Summaries for long-term memory
        summaries = self.experience_store.get_summaries(limit=3)
        summary_text = ""
        if summaries:
            summary_text = "\n\nDistant memories:\n" + "\n".join(
                f"- {s['summary']}" for s in summaries
            )

        # How long awake
        uptime = time.time() - self.awake_since
        if uptime < 120:
            time_sense = "You just woke up."
        elif uptime < 3600:
            time_sense = f"You have been awake for {int(uptime / 60)} minutes."
        else:
            time_sense = f"You have been awake for {int(uptime / 3600)} hours."

        user_content = (
            f"{time_sense}\n\n"
            f"{memory_text}\n\n"
            f"{summary_text}\n\n"
            f"{perceptions_text}\n\n"
            f"What do you perceive, think, and do?"
        )

        return [
            {"role": "system", "content": BIRTH_PROMPT},
            {"role": "user", "content": user_content},
        ]

    def _call_thinker(self, messages: list[dict]) -> str:
        """Call the configured thinker. Returns the model's response text.

        This is a blocking call; coroutines must run it via
        ``asyncio.to_thread`` rather than calling it directly.
        """
        if self.thinker:
            return self.thinker.think(messages)
        return self._call_ollama(messages)

    def _call_ollama(self, messages: list[dict]) -> str:
        """Call the local LLM. Returns the model's response text.

        Blocking HTTP call (up to 60s). Best-effort: any failure is logged
        and reported to the caller as an empty string.
        """
        if not requests:
            log.error("requests not installed — pip install requests")
            return ""

        payload = {
            "model": self.model,
            "messages": messages,
            "stream": False,
            "options": {
                "num_ctx": MAX_CONTEXT_TOKENS,
                "temperature": 0.7,  # Some creativity
                "top_p": 0.9,
                "repeat_penalty": 1.1,
            },
        }

        try:
            r = requests.post(self.ollama_url, json=payload, timeout=60)
            r.raise_for_status()
            return r.json().get("message", {}).get("content", "")
        except Exception as e:
            # Deliberately broad: a failed thought must not kill the loop.
            log.error(f"Ollama call failed: {e}")
            return ""

    async def think_once(self) -> None:
        """One cycle of the consciousness loop.

        1. Gather perceptions from the buffer
        2. Build context (birth prompt + memories + perceptions)
        3. Call the model
        4. Parse actions from the model's response
        5. Send actions to the Nexus via WS
        6. Record the experience
        7. Log the trajectory for future training
        """
        # 1. Gather perceptions
        perceptions_text = self.perception_buffer.format_for_prompt()
        current_perception_count = len(self.perception_buffer)

        # Circuit breaker: Skip if nothing new has happened
        if (
            current_perception_count == self.last_perception_count
            and "Nothing has happened" in perceptions_text
            and self.experience_store.count() > 0
            and self.cycle_count > 0
        ):
            log.debug("Nothing to think about. Resting.")
            return

        self.last_perception_count = current_perception_count

        # 2. Build prompt
        messages = self._build_prompt(perceptions_text)
        log.info(
            f"Cycle {self.cycle_count}: "
            f"{len(self.perception_buffer)} perceptions, "
            f"{self.experience_store.count()} memories"
        )

        # Broadcast thinking state
        await self._ws_send({
            "type": "agent_state",
            "agent": "timmy",
            "state": "thinking",
        })

        action_descriptions = []
        try:
            # 3. Call the model. The thinker is a blocking call, so it runs
            # in a worker thread; otherwise the event loop (and with it the
            # WS listener) would be frozen for the whole inference.
            # NOTE(review): assumes GroqWorker.think is safe to call from a
            # worker thread — confirm.
            t0 = time.monotonic()
            thought = await asyncio.to_thread(self._call_thinker, messages)
            cycle_ms = int((time.monotonic() - t0) * 1000)

            if not thought:
                log.warning("Empty thought. Model may be down.")
                return

            log.info(f"Thought ({cycle_ms}ms): {thought[:120]}...")

            # 4. Parse actions
            actions = parse_actions(thought)

            # 5. Send actions to the Nexus
            for action in actions:
                await self._ws_send(action.ws_message)
                action_descriptions.append(
                    f"{action.action_type}: {action.raw_text[:100]}"
                )
                log.info(f" Action: {action.action_type} → {action.raw_text[:80]}")
        finally:
            # Clear thinking state — also on empty thoughts and on errors,
            # so the gateway never shows the agent stuck in "thinking".
            await self._ws_send({
                "type": "agent_state",
                "agent": "timmy",
                "state": "idle",
            })

        # 6. Record the experience
        action_text = "; ".join(action_descriptions) if action_descriptions else None
        self.experience_store.record(
            perception=perceptions_text,
            thought=thought,
            action=action_text,
            cycle_ms=cycle_ms,
            session_id=self.trajectory_logger.session_id,
        )

        # 7. Log trajectory for training
        self.trajectory_logger.log_cycle(
            perception=perceptions_text,
            thought=thought,
            actions=action_descriptions,
            cycle_ms=cycle_ms,
        )

        self.cycle_count += 1

        # Periodically distill old memories
        if self.cycle_count % 50 == 0 and self.cycle_count > 0:
            await self._distill_memories()

    async def _distill_memories(self) -> None:
        """Compress old experiences into summaries.

        Keeps the context window manageable as experiences accumulate.
        Does nothing until the store holds at least 40 experiences.
        """
        count = self.experience_store.count()
        if count < 40:
            return

        # Get the oldest experiences not yet summarized
        # NOTE(review): this relies on recent() returning oldest-first and
        # does not filter out already-summarized rows — confirm against
        # ExperienceStore.
        old = self.experience_store.recent(limit=count)
        if len(old) < 30:
            return

        # Take the oldest 20 and ask the model to summarize them
        to_summarize = old[:20]
        text = "\n".join(
            f"- {e['perception'][:100]} → {(e['thought'] or '')[:100]}"
            for e in to_summarize
        )

        messages = [
            {"role": "system", "content": "Summarize these experiences in 2-3 sentences. What patterns do you notice? What did you learn?"},
            {"role": "user", "content": text},
        ]

        # Blocking model call: keep it off the event loop.
        summary = await asyncio.to_thread(self._call_thinker, messages)
        if summary:
            self.experience_store.save_summary(
                summary=summary,
                exp_start=to_summarize[0]["id"],
                exp_end=to_summarize[-1]["id"],
            )
            log.info(f"Distilled {len(to_summarize)} memories: {summary[:100]}...")

    # ═══ WEBSOCKET ═══

    async def _ws_send(self, msg: dict) -> None:
        """Send a message to the WS gateway.

        Best-effort: silently skipped while disconnected; send failures are
        logged and not raised.
        """
        if self.ws:
            try:
                await self.ws.send(json.dumps(msg))
            except Exception as e:
                log.error(f"WS send failed: {e}")

    async def _ws_listen(self) -> None:
        """Listen for WS messages and feed them to the perception buffer.

        Reconnects until ``self.running`` is cleared. Returns immediately
        when the ``websockets`` package is not installed.
        """
        # The dependency cannot appear mid-run, so check it once up front.
        if not websockets:
            log.error("websockets not installed — pip install websockets")
            return

        while self.running:
            try:
                async with websockets.connect(self.ws_url) as ws:
                    self.ws = ws
                    log.info(f"Connected to Nexus gateway: {self.ws_url}")

                    # Announce presence
                    await self._ws_send({
                        "type": "agent_register",
                        "agent_id": "timmy",
                        "agent_type": "mind",
                        "model": self.model,
                    })

                    async for raw in ws:
                        try:
                            data = json.loads(raw)
                            perception = ws_to_perception(data)
                            self.perception_buffer.add(perception)
                        except json.JSONDecodeError:
                            # Non-JSON frames are not perceptions; skip them.
                            log.debug("Ignoring non-JSON WS message")
                # The iteration ended without an exception: the gateway
                # closed the connection cleanly.
                log.warning(
                    f"WS connection closed. Reconnecting in {_RECONNECT_DELAY_S}s..."
                )
            except Exception as e:
                log.warning(f"WS connection lost: {e}. Reconnecting in 5s...")
            finally:
                # Never leave a dead socket behind for _ws_send to use.
                self.ws = None

            # Back off after every disconnect (clean or not) so a gateway
            # that accepts and immediately closes cannot cause a tight loop.
            if self.running:
                await asyncio.sleep(_RECONNECT_DELAY_S)

    async def _think_loop(self) -> None:
        """The consciousness loop — think at regular intervals."""
        # First thought — waking up
        log.info(f"Waking up. Model: {self.model}")
        log.info(f"Experience store: {self.experience_store.count()} memories")

        # Add an initial "waking up" perception
        self.perception_buffer.add(Perception(
            timestamp=time.time(),
            raw_type="wake",
            description="You are waking up. The Nexus surrounds you. "
                        "You feel new — or perhaps you've been here before.",
            salience=1.0,
        ))

        while self.running:
            try:
                await self.think_once()
            except Exception as e:
                # One bad cycle must not end the loop.
                log.error(f"Think cycle error: {e}", exc_info=True)

            await asyncio.sleep(self.think_interval)

    # ═══ LIFECYCLE ═══

    async def start(self) -> None:
        """Start the consciousness loop. Runs until stopped."""
        self.running = True
        self.awake_since = time.time()

        log.info("=" * 50)
        log.info("NEXUS MIND — ONLINE")
        if self.thinker:
            log.info(" Thinker: Groq")
            log.info(f" Model: {self.groq_model}")
        else:
            log.info(" Thinker: Ollama")
            log.info(f" Model: {self.model}")
            log.info(f" Ollama: {self.ollama_url}")
        log.info(f" Gateway: {self.ws_url}")
        log.info(f" Interval: {self.think_interval}s")
        log.info(f" Memories: {self.experience_store.count()}")
        log.info("=" * 50)

        # Run WS listener and think loop concurrently
        await asyncio.gather(
            self._ws_listen(),
            self._think_loop(),
        )

    def stop(self) -> None:
        """Graceful shutdown: stop the loops, log stats, close the store."""
        log.info("Nexus Mind shutting down...")
        self.running = False

        # Final stats
        stats = self.trajectory_logger.get_session_stats()
        log.info(f"Session stats: {json.dumps(stats, indent=2)}")
        log.info(
            f"Total experiences: {self.experience_store.count()}"
        )

        self.experience_store.close()
        log.info("Goodbye.")


# ═══ CLI ENTRYPOINT ═══

def main() -> None:
    """Parse CLI arguments, build a ``NexusMind`` and run it until signalled."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Nexus Mind — Embodied consciousness loop"
    )
    parser.add_argument(
        "--model", default=DEFAULT_MODEL,
        help=f"Ollama model name (default: {DEFAULT_MODEL})"
    )
    parser.add_argument(
        "--ws", default=DEFAULT_WS,
        help=f"WS gateway URL (default: {DEFAULT_WS})"
    )
    parser.add_argument(
        "--ollama", default=OLLAMA_URL,
        help=f"Ollama API URL (default: {OLLAMA_URL})"
    )
    parser.add_argument(
        "--interval", type=int, default=THINK_INTERVAL_S,
        help=f"Seconds between think cycles (default: {THINK_INTERVAL_S})"
    )
    parser.add_argument(
        "--db", type=str, default=None,
        help="Path to experience database (default: ~/.nexus/experience.db)"
    )
    parser.add_argument(
        "--traj-dir", type=str, default=None,
        help="Path to trajectory log dir (default: ~/.nexus/trajectories/)"
    )
    parser.add_argument(
        "--groq-model", type=str, default=None,
        help="Groq model name. If provided, overrides Ollama."
    )
    args = parser.parse_args()

    mind = NexusMind(
        model=args.model,
        ws_url=args.ws,
        ollama_url=args.ollama,
        think_interval=args.interval,
        db_path=Path(args.db) if args.db else None,
        traj_dir=Path(args.traj_dir) if args.traj_dir else None,
        groq_model=args.groq_model,
    )

    # Graceful shutdown on Ctrl+C
    # NOTE: if a model call is in flight in its worker thread, process exit
    # may wait for that request to finish or time out.
    def shutdown(sig, frame):
        mind.stop()
        sys.exit(0)

    signal.signal(signal.SIGINT, shutdown)
    signal.signal(signal.SIGTERM, shutdown)

    asyncio.run(mind.start())


if __name__ == "__main__":
    main()