"""Heartbeat v2 — WorldInterface-driven cognitive loop. Drives real observe → reason → act → reflect cycles through whatever ``WorldInterface`` adapter is connected. When no adapter is present, gracefully falls back to the existing ``run_cycle()`` behaviour. Usage:: heartbeat = Heartbeat(world=adapter, interval=30.0) await heartbeat.run_once() # single cycle await heartbeat.start() # background loop heartbeat.stop() # graceful shutdown """ from __future__ import annotations import asyncio import logging import time from dataclasses import dataclass, field from datetime import UTC, datetime from loop.phase1_gather import gather from loop.phase2_reason import reason from loop.phase3_act import act from loop.schema import ContextPayload logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Cycle log entry # --------------------------------------------------------------------------- @dataclass class CycleRecord: """One observe → reason → act → reflect cycle.""" cycle_id: int timestamp: str observation: dict = field(default_factory=dict) reasoning_summary: str = "" action_taken: str = "" action_status: str = "" reflect_notes: str = "" duration_ms: int = 0 # --------------------------------------------------------------------------- # Heartbeat # --------------------------------------------------------------------------- class Heartbeat: """Manages the recurring cognitive loop with optional world adapter. Parameters ---------- world: A ``WorldInterface`` instance (or ``None`` for passive mode). interval: Seconds between heartbeat ticks. 30 s for embodied mode, 300 s (5 min) for passive thinking. on_cycle: Optional async callback invoked after each cycle with the ``CycleRecord``. """ def __init__( self, *, world=None, # WorldInterface | None interval: float = 30.0, on_cycle=None, # Callable[[CycleRecord], Awaitable[None]] | None ) -> None: self._world = world self._interval = interval self._on_cycle = on_cycle self._cycle_count: int = 0 self._running = False self._task: asyncio.Task | None = None self.history: list[CycleRecord] = [] # -- properties -------------------------------------------------------- @property def world(self): return self._world @world.setter def world(self, adapter) -> None: self._world = adapter @property def interval(self) -> float: return self._interval @interval.setter def interval(self, value: float) -> None: self._interval = max(1.0, value) @property def is_running(self) -> bool: return self._running @property def cycle_count(self) -> int: return self._cycle_count # -- single cycle ------------------------------------------------------ async def run_once(self) -> CycleRecord: """Execute one full heartbeat cycle. If a world adapter is present: 1. Observe — ``world.observe()`` 2. Gather + Reason + Act via the three-phase loop, with the observation injected into the payload 3. Dispatch the decided action back to ``world.act()`` 4. Reflect — log the cycle Without an adapter the existing loop runs on a timer-sourced payload (passive thinking). """ self._cycle_count += 1 start = time.monotonic() record = CycleRecord( cycle_id=self._cycle_count, timestamp=datetime.now(UTC).isoformat(), ) if self._world is not None: record = await self._embodied_cycle(record) else: record = await self._passive_cycle(record) record.duration_ms = int((time.monotonic() - start) * 1000) self.history.append(record) # Broadcast via WebSocket (best-effort) await self._broadcast(record) if self._on_cycle: await self._on_cycle(record) logger.info( "Heartbeat cycle #%d complete (%d ms) — action=%s status=%s", record.cycle_id, record.duration_ms, record.action_taken or "(passive)", record.action_status or "n/a", ) return record # -- background loop --------------------------------------------------- async def start(self) -> None: """Start the recurring heartbeat loop as a background task.""" if self._running: logger.warning("Heartbeat already running") return self._running = True self._task = asyncio.current_task() or asyncio.ensure_future(self._loop()) if self._task is not asyncio.current_task(): return await self._loop() async def _loop(self) -> None: logger.info( "Heartbeat loop started (interval=%.1fs, adapter=%s)", self._interval, type(self._world).__name__ if self._world else "None", ) while self._running: try: await self.run_once() except Exception: logger.exception("Heartbeat cycle failed") await asyncio.sleep(self._interval) def stop(self) -> None: """Signal the heartbeat loop to stop after the current cycle.""" self._running = False logger.info("Heartbeat stop requested") # -- internal: embodied cycle ------------------------------------------ async def _embodied_cycle(self, record: CycleRecord) -> CycleRecord: """Cycle with a live world adapter: observe → reason → act → reflect.""" from infrastructure.world.types import ActionStatus, CommandInput # 1. Observe perception = self._world.observe() record.observation = { "location": perception.location, "entities": perception.entities, "events": perception.events, } # 2. Feed observation into the three-phase loop obs_content = ( f"Location: {perception.location}\n" f"Entities: {', '.join(perception.entities)}\n" f"Events: {', '.join(perception.events)}" ) payload = ContextPayload( source="world", content=obs_content, metadata={"perception": record.observation}, ) gathered = gather(payload) reasoned = reason(gathered) acted = act(reasoned) # Extract action decision from the acted payload action_name = acted.metadata.get("action", "idle") action_target = acted.metadata.get("action_target") action_params = acted.metadata.get("action_params", {}) record.reasoning_summary = acted.metadata.get("reasoning", acted.content[:200]) # 3. Dispatch action to world if action_name != "idle": cmd = CommandInput( action=action_name, target=action_target, parameters=action_params, ) result = self._world.act(cmd) record.action_taken = action_name record.action_status = result.status.value else: record.action_taken = "idle" record.action_status = ActionStatus.NOOP.value # 4. Reflect record.reflect_notes = ( f"Observed {len(perception.entities)} entities at {perception.location}. " f"Action: {record.action_taken} → {record.action_status}." ) return record # -- internal: passive cycle ------------------------------------------- async def _passive_cycle(self, record: CycleRecord) -> CycleRecord: """Cycle without a world adapter — existing think_once() behaviour.""" payload = ContextPayload( source="timer", content="heartbeat", metadata={"mode": "passive"}, ) gathered = gather(payload) reasoned = reason(gathered) acted = act(reasoned) record.reasoning_summary = acted.content[:200] record.action_taken = "think" record.action_status = "noop" record.reflect_notes = "Passive thinking cycle — no world adapter connected." return record # -- broadcast --------------------------------------------------------- async def _broadcast(self, record: CycleRecord) -> None: """Emit heartbeat cycle data via WebSocket (best-effort).""" try: from infrastructure.ws_manager.handler import ws_manager await ws_manager.broadcast( "heartbeat.cycle", { "cycle_id": record.cycle_id, "timestamp": record.timestamp, "action": record.action_taken, "action_status": record.action_status, "reasoning_summary": record.reasoning_summary[:300], "observation": record.observation, "duration_ms": record.duration_ms, }, ) except (ImportError, AttributeError, ConnectionError, RuntimeError) as exc: logger.debug("Heartbeat broadcast skipped: %s", exc)