Compare commits

..

2 Commits

Author SHA1 Message Date
hermes
7f13f540b3 fix: loop_guard GITEA_API default + queue.json validation guard (#951, #952) 2026-03-22 13:51:59 -04:00
bebbe442b4 feat: WorldInterface + Heartbeat v2 (#871, #872) (#900)
Some checks failed
Tests / lint (push) Has been cancelled
Tests / test (push) Has been cancelled
Co-authored-by: Perplexity Computer <perplexity@tower.local>
Co-committed-by: Perplexity Computer <perplexity@tower.local>
2026-03-22 13:44:49 +00:00
29 changed files with 1275 additions and 318 deletions

View File

@@ -30,7 +30,7 @@ IDLE_STATE_FILE = REPO_ROOT / ".loop" / "idle_state.json"
CYCLE_RESULT_FILE = REPO_ROOT / ".loop" / "cycle_result.json"
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
GITEA_API = os.environ.get("GITEA_API", "http://143.198.27.163:3000/api/v1")
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
# Default cycle duration in seconds (5 min); stale threshold = 2× this

View File

@@ -1,56 +0,0 @@
import json
import urllib.request
import urllib.error
import os
BASE_URL = "http://143.198.27.163:3000/api/v1"
issues = [
{"title": "LHF: Fix 4 broken tests in test_setup_script.py", "body": "Add @pytest.mark.skip_ci or mock fixtures to stop these environment-specific script tests from failing CI."},
{"title": "LHF: Fix xdist and coverage conflict in pyproject.toml", "body": "The -n auto --dist worksteal arguments conflict with --cov flags during make test-cov. Reposition these flags so xdist and coverage play nicely."},
{"title": "LHF: Separate tox unit and integration environments", "body": "They currently alias the same command. Ensure `tox -e unit` uses `-m unit` and `tox -e integration` uses `-m integration`."},
{"title": "LHF: Add duration and coverage threshold strictness to pytest", "body": "Add `--durations=10` and `--cov-fail-under=60` directly to the tool.pytest.ini_options addopts or CI pipeline."},
{"title": "LHF: Enforce coverage threshold in CI workflow", "body": "Update .github/workflows/tests.yml to fail if coverage drops below the 60% floor threshold."},
{"title": "LHF: Extract hardcoded PRAGMA busy_timeout=5000", "body": "Move the SQLite busy_timeout hardcode to pydantic-settings config.py for better environment control."},
{"title": "LHF: Extract hardcoded sats limit in consult_grok()", "body": "The hardcoded sats limit for the grok L402 proxy should be controlled via config.py environment variables."},
{"title": "LHF: Remove bare pass clauses in src/timmy/tools.py", "body": "Logged exceptions should not be followed by bare `pass` clauses if they silently swallow critical tool errors. Refactor to return an error string or raise gracefully."},
{"title": "LHF: Add docstrings to src/dashboard/routes/tasks.py", "body": "Add proper module-level and function-level docstrings to all public methods."},
{"title": "LHF: Add docstrings to src/dashboard/routes/system.py", "body": "Add proper module-level and function-level docstrings to the system configuration endpoints."},
{"title": "LHF: Add docstrings to VoiceTTS setter methods", "body": "Document `set_rate()`, `set_volume()`, and `set_voice()` parameters and bounds."},
{"title": "LHF: DRY up tasks_pending/active/completed in tasks.py", "body": "Refactor and extract the shared filtering logic for these three similar list-filtering functions."},
{"title": "LHF: Add error handling for missing DB in tasks.py", "body": "If swarm.db is locked or missing, tasks.py endpoints currently crash. Add a try/except pattern matching the graceful degradation specs."},
{"title": "LHF: Write unit tests for db_pool.py", "body": "The SQLite connection pool infrastructure needs dedicated unit tests ensuring that connections do not leak and pragmas are applied."},
{"title": "LHF: Write unit tests for health.py", "body": "The health check route needs tests to ensure it correctly aggregates subsystem states (Ollama, Redis, DB) without blocking the event loop."}
]
def main():
token_path = os.path.join(os.getcwd(), ".antigravity_gitea_token")
if not os.path.exists(token_path):
print("Missing token.")
return
with open(token_path, "r") as f:
token = f.read().strip()
repo_owner = "rockachopa"
repo_name = "Timmy-time-dashboard"
count = 0
for i, issue in enumerate(issues):
print(f"Creating LHF issue {i+1}: {issue['title']}")
url = f"{BASE_URL}/repos/{repo_owner}/{repo_name}/issues"
payload = json.dumps(issue).encode("utf-8")
req = urllib.request.Request(url, data=payload, method="POST")
req.add_header("Authorization", f"token {token}")
req.add_header("Content-Type", "application/json")
try:
with urllib.request.urlopen(req) as resp:
if resp.status == 201:
count += 1
print(f" -> Success")
except urllib.error.HTTPError as e:
print(f" -> Failed: {e.code} {e.read().decode('utf-8')}")
print(f"Created {count}/{len(issues)} LHF issues.")
if __name__ == '__main__':
main()

View File

@@ -20,7 +20,7 @@ from datetime import datetime, timezone
from pathlib import Path
# ── Config ──────────────────────────────────────────────────────────────
GITEA_API = os.environ.get("GITEA_API", "http://localhost:3000/api/v1")
GITEA_API = os.environ.get("GITEA_API", "http://143.198.27.163:3000/api/v1")
REPO_SLUG = os.environ.get("REPO_SLUG", "rockachopa/Timmy-time-dashboard")
TOKEN_FILE = Path.home() / ".hermes" / "gitea_token"
REPO_ROOT = Path(__file__).resolve().parent.parent
@@ -327,7 +327,31 @@ def run_triage() -> list[dict]:
not_ready = [s for s in scored if not s["ready"]]
QUEUE_FILE.parent.mkdir(parents=True, exist_ok=True)
QUEUE_FILE.write_text(json.dumps(ready, indent=2) + "\n")
backup_file = QUEUE_FILE.with_suffix(".json.bak")
# Backup existing queue before overwriting
if QUEUE_FILE.exists():
try:
backup_file.write_text(QUEUE_FILE.read_text())
except OSError:
pass
# Write and validate
queue_json = json.dumps(ready, indent=2) + "\n"
QUEUE_FILE.write_text(queue_json)
# Validate by re-reading — restore backup on corruption
try:
validated = json.loads(QUEUE_FILE.read_text())
if not isinstance(validated, list):
raise ValueError("queue.json is not a list")
except (json.JSONDecodeError, ValueError) as e:
print(f"[triage] ERROR: queue.json validation failed: {e}", file=sys.stderr)
if backup_file.exists():
print("[triage] Restoring from backup", file=sys.stderr)
QUEUE_FILE.write_text(backup_file.read_text())
else:
QUEUE_FILE.write_text("[]\n")
# Write retro entry
retro_entry = {

View File

@@ -56,13 +56,11 @@ async def self_modify_queue(request: Request):
@router.get("/swarm/mission-control", response_class=HTMLResponse)
async def mission_control(request: Request):
"""Render the primary swarm mission control terminal."""
return templates.TemplateResponse(request, "mission_control.html", {})
@router.get("/bugs", response_class=HTMLResponse)
async def bugs_page(request: Request):
"""Render the systemic bugs and issue tracking page."""
return templates.TemplateResponse(
request,
"bugs.html",
@@ -77,19 +75,16 @@ async def bugs_page(request: Request):
@router.get("/self-coding", response_class=HTMLResponse)
async def self_coding(request: Request):
"""Render the self-coding and modifications statistics page."""
return templates.TemplateResponse(request, "self_coding.html", {"stats": {}})
@router.get("/hands", response_class=HTMLResponse)
async def hands_page(request: Request):
"""Render the physical 'hands' tracking page for environment-interacting agents."""
return templates.TemplateResponse(request, "hands.html", {"executions": []})
@router.get("/creative/ui", response_class=HTMLResponse)
async def creative_ui(request: Request):
"""Render the creative/producer studio UI for image and media generation."""
return templates.TemplateResponse(request, "creative.html", {})

View File

@@ -104,29 +104,25 @@ class _TaskView:
@router.get("/tasks", response_class=HTMLResponse)
async def tasks_page(request: Request):
"""Render the main task queue page with 3-column layout."""
pending, active, completed = [], [], []
try:
with _get_db() as db:
pending = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('pending_approval') ORDER BY created_at DESC"
).fetchall()
]
active = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
).fetchall()
]
completed = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
).fetchall()
]
except sqlite3.Error as e:
logger.error("Database error rendering tasks_page: %s", e)
with _get_db() as db:
pending = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('pending_approval') ORDER BY created_at DESC"
).fetchall()
]
active = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
).fetchall()
]
completed = [
_TaskView(_row_to_dict(r))
for r in db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
).fetchall()
]
return templates.TemplateResponse(
request,
@@ -147,45 +143,61 @@ async def tasks_page(request: Request):
# ---------------------------------------------------------------------------
def _render_task_list(request: Request, query: str, empty_message: str) -> HTMLResponse:
"""Helper to fetch tasks from DB and render HTML partials safely, handling DB errors."""
try:
with _get_db() as db:
rows = db.execute(query).fetchall()
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
if not tasks:
return HTMLResponse(f'<div class="empty-column">{empty_message}</div>')
parts = [
@router.get("/tasks/pending", response_class=HTMLResponse)
async def tasks_pending(request: Request):
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC"
).fetchall()
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
parts = []
for task in tasks:
parts.append(
templates.TemplateResponse(
request, "partials/task_card.html", {"task": task}
).body.decode()
for task in tasks
]
return HTMLResponse("".join(parts))
except sqlite3.Error as e:
logger.error("Database error fetching tasks: %s", e)
return HTMLResponse('<div class="empty-column error">Database unavailable</div>')
@router.get("/tasks/pending", response_class=HTMLResponse)
async def tasks_pending(request: Request):
"""HTMX partial rendering the list of pending tasks."""
query = "SELECT * FROM tasks WHERE status='pending_approval' ORDER BY created_at DESC"
return _render_task_list(request, query, "No pending tasks")
)
if not parts:
return HTMLResponse('<div class="empty-column">No pending tasks</div>')
return HTMLResponse("".join(parts))
@router.get("/tasks/active", response_class=HTMLResponse)
async def tasks_active(request: Request):
"""HTMX partial rendering the list of active tasks."""
query = "SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
return _render_task_list(request, query, "No active tasks")
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('approved','running','paused') ORDER BY created_at DESC"
).fetchall()
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
parts = []
for task in tasks:
parts.append(
templates.TemplateResponse(
request, "partials/task_card.html", {"task": task}
).body.decode()
)
if not parts:
return HTMLResponse('<div class="empty-column">No active tasks</div>')
return HTMLResponse("".join(parts))
@router.get("/tasks/completed", response_class=HTMLResponse)
async def tasks_completed(request: Request):
"""HTMX partial rendering the list of completed tasks."""
query = "SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
return _render_task_list(request, query, "No completed tasks yet")
with _get_db() as db:
rows = db.execute(
"SELECT * FROM tasks WHERE status IN ('completed','vetoed','failed') ORDER BY completed_at DESC LIMIT 50"
).fetchall()
tasks = [_TaskView(_row_to_dict(r)) for r in rows]
parts = []
for task in tasks:
parts.append(
templates.TemplateResponse(
request, "partials/task_card.html", {"task": task}
).body.decode()
)
if not parts:
return HTMLResponse('<div class="empty-column">No completed tasks yet</div>')
return HTMLResponse("".join(parts))
# ---------------------------------------------------------------------------
@@ -229,31 +241,26 @@ async def create_task_form(
@router.post("/tasks/{task_id}/approve", response_class=HTMLResponse)
async def approve_task(request: Request, task_id: str):
"""Approve a task for execution."""
return await _set_status(request, task_id, "approved")
@router.post("/tasks/{task_id}/veto", response_class=HTMLResponse)
async def veto_task(request: Request, task_id: str):
"""Veto a task to prevent execution."""
return await _set_status(request, task_id, "vetoed")
@router.post("/tasks/{task_id}/pause", response_class=HTMLResponse)
async def pause_task(request: Request, task_id: str):
"""Pause an active task."""
return await _set_status(request, task_id, "paused")
@router.post("/tasks/{task_id}/cancel", response_class=HTMLResponse)
async def cancel_task(request: Request, task_id: str):
"""Cancel a task, moving it to vetoed state."""
return await _set_status(request, task_id, "vetoed")
@router.post("/tasks/{task_id}/retry", response_class=HTMLResponse)
async def retry_task(request: Request, task_id: str):
"""Retry a failed or completed task by re-approving it."""
return await _set_status(request, task_id, "approved")
@@ -264,7 +271,6 @@ async def modify_task(
title: str = Form(...),
description: str = Form(""),
):
"""Modify the title and/or description of a specific task."""
with _get_db() as db:
db.execute(
"UPDATE tasks SET title=?, description=? WHERE id=?",

View File

@@ -16,8 +16,6 @@ from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
@@ -104,7 +102,7 @@ class EventBus:
self._persistence_db_path.parent.mkdir(parents=True, exist_ok=True)
with closing(sqlite3.connect(str(self._persistence_db_path))) as conn:
conn.execute("PRAGMA journal_mode=WAL")
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
conn.execute("PRAGMA busy_timeout=5000")
conn.executescript(_EVENTS_SCHEMA)
conn.commit()
@@ -116,7 +114,7 @@ class EventBus:
return
with closing(sqlite3.connect(str(self._persistence_db_path))) as conn:
conn.row_factory = sqlite3.Row
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
conn.execute("PRAGMA busy_timeout=5000")
yield conn
def _persist_event(self, event: Event) -> None:

View File

@@ -18,8 +18,6 @@ from datetime import UTC, datetime
from enum import StrEnum
from pathlib import Path
from config import settings
logger = logging.getLogger(__name__)
DB_PATH = Path("data/swarm.db")
@@ -70,7 +68,7 @@ def _get_conn() -> Generator[sqlite3.Connection, None, None]:
with closing(sqlite3.connect(str(DB_PATH))) as conn:
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
conn.execute("PRAGMA busy_timeout=5000")
conn.execute("""
CREATE TABLE IF NOT EXISTS custom_models (
name TEXT PRIMARY KEY,

View File

@@ -0,0 +1,29 @@
"""World interface — engine-agnostic adapter pattern for embodied agents.
Provides the ``WorldInterface`` ABC and an adapter registry so Timmy can
observe, act, and speak in any game world (Morrowind, Luanti, Godot, …)
through a single contract.
Quick start::
from infrastructure.world import get_adapter, register_adapter
from infrastructure.world.interface import WorldInterface
register_adapter("mock", MockWorldAdapter)
world = get_adapter("mock")
perception = world.observe()
"""
from infrastructure.world.registry import AdapterRegistry
_registry = AdapterRegistry()
register_adapter = _registry.register
get_adapter = _registry.get
list_adapters = _registry.list_adapters
__all__ = [
"register_adapter",
"get_adapter",
"list_adapters",
]

View File

@@ -0,0 +1 @@
"""Built-in world adapters."""

View File

@@ -0,0 +1,99 @@
"""Mock world adapter — returns canned perception and logs commands.
Useful for testing the heartbeat loop and WorldInterface contract
without a running game server.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import UTC, datetime
from infrastructure.world.interface import WorldInterface
from infrastructure.world.types import (
ActionResult,
ActionStatus,
CommandInput,
PerceptionOutput,
)
logger = logging.getLogger(__name__)
@dataclass
class _ActionLog:
"""Record of an action dispatched to the mock world."""
command: CommandInput
timestamp: datetime
class MockWorldAdapter(WorldInterface):
"""In-memory mock adapter for testing.
* ``observe()`` returns configurable canned perception.
* ``act()`` logs the command and returns success.
* ``speak()`` logs the message.
Inspect ``action_log`` and ``speech_log`` to verify behaviour in tests.
"""
def __init__(
self,
*,
location: str = "Test Chamber",
entities: list[str] | None = None,
events: list[str] | None = None,
) -> None:
self._location = location
self._entities = entities or ["TestNPC"]
self._events = events or []
self._connected = False
self.action_log: list[_ActionLog] = []
self.speech_log: list[dict] = []
# -- lifecycle ---------------------------------------------------------
def connect(self) -> None:
self._connected = True
logger.info("MockWorldAdapter connected")
def disconnect(self) -> None:
self._connected = False
logger.info("MockWorldAdapter disconnected")
@property
def is_connected(self) -> bool:
return self._connected
# -- core contract -----------------------------------------------------
def observe(self) -> PerceptionOutput:
logger.debug("MockWorldAdapter.observe()")
return PerceptionOutput(
timestamp=datetime.now(UTC),
location=self._location,
entities=list(self._entities),
events=list(self._events),
raw={"adapter": "mock"},
)
def act(self, command: CommandInput) -> ActionResult:
logger.debug("MockWorldAdapter.act(%s)", command.action)
self.action_log.append(_ActionLog(command=command, timestamp=datetime.now(UTC)))
return ActionResult(
status=ActionStatus.SUCCESS,
message=f"Mock executed: {command.action}",
data={"adapter": "mock"},
)
def speak(self, message: str, target: str | None = None) -> None:
logger.debug("MockWorldAdapter.speak(%r, target=%r)", message, target)
self.speech_log.append(
{
"message": message,
"target": target,
"timestamp": datetime.now(UTC).isoformat(),
}
)

View File

@@ -0,0 +1,58 @@
"""TES3MP world adapter — stub for Morrowind multiplayer via TES3MP.
This adapter will eventually connect to a TES3MP server and translate
the WorldInterface contract into TES3MP commands. For now every method
raises ``NotImplementedError`` with guidance on what needs wiring up.
Once PR #864 merges, import PerceptionOutput and CommandInput directly
from ``infrastructure.morrowind.schemas`` if their shapes differ from
the canonical types in ``infrastructure.world.types``.
"""
from __future__ import annotations
import logging
from infrastructure.world.interface import WorldInterface
from infrastructure.world.types import ActionResult, CommandInput, PerceptionOutput
logger = logging.getLogger(__name__)
class TES3MPWorldAdapter(WorldInterface):
"""Stub adapter for TES3MP (Morrowind multiplayer).
All core methods raise ``NotImplementedError``.
Implement ``connect()`` first — it should open a socket to the
TES3MP server and authenticate.
"""
def __init__(self, *, host: str = "localhost", port: int = 25565) -> None:
self._host = host
self._port = port
self._connected = False
# -- lifecycle ---------------------------------------------------------
def connect(self) -> None:
raise NotImplementedError("TES3MPWorldAdapter.connect() — wire up TES3MP server socket")
def disconnect(self) -> None:
raise NotImplementedError("TES3MPWorldAdapter.disconnect() — close TES3MP server socket")
@property
def is_connected(self) -> bool:
return self._connected
# -- core contract (stubs) ---------------------------------------------
def observe(self) -> PerceptionOutput:
raise NotImplementedError("TES3MPWorldAdapter.observe() — poll TES3MP for player/NPC state")
def act(self, command: CommandInput) -> ActionResult:
raise NotImplementedError(
"TES3MPWorldAdapter.act() — translate CommandInput to TES3MP packet"
)
def speak(self, message: str, target: str | None = None) -> None:
raise NotImplementedError("TES3MPWorldAdapter.speak() — send chat message via TES3MP")

View File

@@ -0,0 +1,64 @@
"""Abstract WorldInterface — the contract every game-world adapter must fulfil.
Follows a Gymnasium-inspired pattern: observe → act → speak, with each
method returning strongly-typed data structures.
Any future engine (TES3MP, Luanti, Godot, …) plugs in by subclassing
``WorldInterface`` and implementing the three methods.
"""
from __future__ import annotations
from abc import ABC, abstractmethod
from infrastructure.world.types import ActionResult, CommandInput, PerceptionOutput
class WorldInterface(ABC):
"""Engine-agnostic base class for world adapters.
Subclasses must implement:
- ``observe()`` — gather structured perception from the world
- ``act()`` — dispatch a command and return the outcome
- ``speak()`` — send a message to an NPC / player / broadcast
Lifecycle hooks ``connect()`` and ``disconnect()`` are optional.
"""
# -- lifecycle (optional overrides) ------------------------------------
def connect(self) -> None: # noqa: B027
"""Establish connection to the game world.
Default implementation is a no-op. Override to open sockets,
authenticate, etc.
"""
def disconnect(self) -> None: # noqa: B027
"""Tear down the connection.
Default implementation is a no-op.
"""
@property
def is_connected(self) -> bool:
"""Return ``True`` if the adapter has an active connection.
Default returns ``True``. Override for adapters that maintain
persistent connections.
"""
return True
# -- core contract (must implement) ------------------------------------
@abstractmethod
def observe(self) -> PerceptionOutput:
"""Return a structured snapshot of the current world state."""
@abstractmethod
def act(self, command: CommandInput) -> ActionResult:
"""Execute *command* in the world and return the result."""
@abstractmethod
def speak(self, message: str, target: str | None = None) -> None:
"""Send *message* in the world, optionally directed at *target*."""

View File

@@ -0,0 +1,54 @@
"""Adapter registry — register and instantiate world adapters by name.
Usage::
registry = AdapterRegistry()
registry.register("mock", MockWorldAdapter)
adapter = registry.get("mock", some_kwarg="value")
"""
from __future__ import annotations
import logging
from typing import Any
from infrastructure.world.interface import WorldInterface
logger = logging.getLogger(__name__)
class AdapterRegistry:
"""Name → WorldInterface class registry with instantiation."""
def __init__(self) -> None:
self._adapters: dict[str, type[WorldInterface]] = {}
def register(self, name: str, cls: type[WorldInterface]) -> None:
"""Register an adapter class under *name*.
Raises ``TypeError`` if *cls* is not a ``WorldInterface`` subclass.
"""
if not (isinstance(cls, type) and issubclass(cls, WorldInterface)):
raise TypeError(f"{cls!r} is not a WorldInterface subclass")
if name in self._adapters:
logger.warning("Overwriting adapter %r (was %r)", name, self._adapters[name])
self._adapters[name] = cls
logger.info("Registered world adapter: %s%s", name, cls.__name__)
def get(self, name: str, **kwargs: Any) -> WorldInterface:
"""Instantiate and return the adapter registered as *name*.
Raises ``KeyError`` if *name* is not registered.
"""
cls = self._adapters[name]
return cls(**kwargs)
def list_adapters(self) -> list[str]:
"""Return sorted list of registered adapter names."""
return sorted(self._adapters)
def __contains__(self, name: str) -> bool:
return name in self._adapters
def __len__(self) -> int:
return len(self._adapters)

View File

@@ -0,0 +1,71 @@
"""Canonical data types for world interaction.
These mirror the PerceptionOutput / CommandInput types from PR #864's
``morrowind/schemas.py``. When that PR merges, these can be replaced
with re-exports — but until then they serve as the stable contract for
every WorldInterface adapter.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import StrEnum
class ActionStatus(StrEnum):
"""Outcome of an action dispatched to the world."""
SUCCESS = "success"
FAILURE = "failure"
PENDING = "pending"
NOOP = "noop"
@dataclass
class PerceptionOutput:
"""Structured world state returned by ``WorldInterface.observe()``.
Attributes:
timestamp: When the observation was captured.
location: Free-form location descriptor (e.g. "Balmora, Fighters Guild").
entities: List of nearby entity descriptions.
events: Recent game events since last observation.
raw: Optional raw / engine-specific payload for advanced consumers.
"""
timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
location: str = ""
entities: list[str] = field(default_factory=list)
events: list[str] = field(default_factory=list)
raw: dict = field(default_factory=dict)
@dataclass
class CommandInput:
"""Action command sent via ``WorldInterface.act()``.
Attributes:
action: Verb / action name (e.g. "move", "attack", "use_item").
target: Optional target identifier.
parameters: Arbitrary key-value payload for engine-specific params.
"""
action: str
target: str | None = None
parameters: dict = field(default_factory=dict)
@dataclass
class ActionResult:
"""Outcome returned by ``WorldInterface.act()``.
Attributes:
status: Whether the action succeeded, failed, etc.
message: Human-readable description of the outcome.
data: Arbitrary engine-specific result payload.
"""
status: ActionStatus = ActionStatus.SUCCESS
message: str = ""
data: dict = field(default_factory=dict)

286
src/loop/heartbeat.py Normal file
View File

@@ -0,0 +1,286 @@
"""Heartbeat v2 — WorldInterface-driven cognitive loop.
Drives real observe → reason → act → reflect cycles through whatever
``WorldInterface`` adapter is connected. When no adapter is present,
gracefully falls back to the existing ``run_cycle()`` behaviour.
Usage::
heartbeat = Heartbeat(world=adapter, interval=30.0)
await heartbeat.run_once() # single cycle
await heartbeat.start() # background loop
heartbeat.stop() # graceful shutdown
"""
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from loop.phase1_gather import gather
from loop.phase2_reason import reason
from loop.phase3_act import act
from loop.schema import ContextPayload
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Cycle log entry
# ---------------------------------------------------------------------------
@dataclass
class CycleRecord:
"""One observe → reason → act → reflect cycle."""
cycle_id: int
timestamp: str
observation: dict = field(default_factory=dict)
reasoning_summary: str = ""
action_taken: str = ""
action_status: str = ""
reflect_notes: str = ""
duration_ms: int = 0
# ---------------------------------------------------------------------------
# Heartbeat
# ---------------------------------------------------------------------------
class Heartbeat:
"""Manages the recurring cognitive loop with optional world adapter.
Parameters
----------
world:
A ``WorldInterface`` instance (or ``None`` for passive mode).
interval:
Seconds between heartbeat ticks. 30 s for embodied mode,
300 s (5 min) for passive thinking.
on_cycle:
Optional async callback invoked after each cycle with the
``CycleRecord``.
"""
def __init__(
self,
*,
world=None, # WorldInterface | None
interval: float = 30.0,
on_cycle=None, # Callable[[CycleRecord], Awaitable[None]] | None
) -> None:
self._world = world
self._interval = interval
self._on_cycle = on_cycle
self._cycle_count: int = 0
self._running = False
self._task: asyncio.Task | None = None
self.history: list[CycleRecord] = []
# -- properties --------------------------------------------------------
@property
def world(self):
return self._world
@world.setter
def world(self, adapter) -> None:
self._world = adapter
@property
def interval(self) -> float:
return self._interval
@interval.setter
def interval(self, value: float) -> None:
self._interval = max(1.0, value)
@property
def is_running(self) -> bool:
return self._running
@property
def cycle_count(self) -> int:
return self._cycle_count
# -- single cycle ------------------------------------------------------
async def run_once(self) -> CycleRecord:
"""Execute one full heartbeat cycle.
If a world adapter is present:
1. Observe — ``world.observe()``
2. Gather + Reason + Act via the three-phase loop, with the
observation injected into the payload
3. Dispatch the decided action back to ``world.act()``
4. Reflect — log the cycle
Without an adapter the existing loop runs on a timer-sourced
payload (passive thinking).
"""
self._cycle_count += 1
start = time.monotonic()
record = CycleRecord(
cycle_id=self._cycle_count,
timestamp=datetime.now(UTC).isoformat(),
)
if self._world is not None:
record = await self._embodied_cycle(record)
else:
record = await self._passive_cycle(record)
record.duration_ms = int((time.monotonic() - start) * 1000)
self.history.append(record)
# Broadcast via WebSocket (best-effort)
await self._broadcast(record)
if self._on_cycle:
await self._on_cycle(record)
logger.info(
"Heartbeat cycle #%d complete (%d ms) — action=%s status=%s",
record.cycle_id,
record.duration_ms,
record.action_taken or "(passive)",
record.action_status or "n/a",
)
return record
# -- background loop ---------------------------------------------------
async def start(self) -> None:
"""Start the recurring heartbeat loop as a background task."""
if self._running:
logger.warning("Heartbeat already running")
return
self._running = True
self._task = asyncio.current_task() or asyncio.ensure_future(self._loop())
if self._task is not asyncio.current_task():
return
await self._loop()
async def _loop(self) -> None:
logger.info(
"Heartbeat loop started (interval=%.1fs, adapter=%s)",
self._interval,
type(self._world).__name__ if self._world else "None",
)
while self._running:
try:
await self.run_once()
except Exception:
logger.exception("Heartbeat cycle failed")
await asyncio.sleep(self._interval)
def stop(self) -> None:
"""Signal the heartbeat loop to stop after the current cycle."""
self._running = False
logger.info("Heartbeat stop requested")
# -- internal: embodied cycle ------------------------------------------
async def _embodied_cycle(self, record: CycleRecord) -> CycleRecord:
"""Cycle with a live world adapter: observe → reason → act → reflect."""
from infrastructure.world.types import ActionStatus, CommandInput
# 1. Observe
perception = self._world.observe()
record.observation = {
"location": perception.location,
"entities": perception.entities,
"events": perception.events,
}
# 2. Feed observation into the three-phase loop
obs_content = (
f"Location: {perception.location}\n"
f"Entities: {', '.join(perception.entities)}\n"
f"Events: {', '.join(perception.events)}"
)
payload = ContextPayload(
source="world",
content=obs_content,
metadata={"perception": record.observation},
)
gathered = gather(payload)
reasoned = reason(gathered)
acted = act(reasoned)
# Extract action decision from the acted payload
action_name = acted.metadata.get("action", "idle")
action_target = acted.metadata.get("action_target")
action_params = acted.metadata.get("action_params", {})
record.reasoning_summary = acted.metadata.get("reasoning", acted.content[:200])
# 3. Dispatch action to world
if action_name != "idle":
cmd = CommandInput(
action=action_name,
target=action_target,
parameters=action_params,
)
result = self._world.act(cmd)
record.action_taken = action_name
record.action_status = result.status.value
else:
record.action_taken = "idle"
record.action_status = ActionStatus.NOOP.value
# 4. Reflect
record.reflect_notes = (
f"Observed {len(perception.entities)} entities at {perception.location}. "
f"Action: {record.action_taken}{record.action_status}."
)
return record
# -- internal: passive cycle -------------------------------------------
async def _passive_cycle(self, record: CycleRecord) -> CycleRecord:
"""Cycle without a world adapter — existing think_once() behaviour."""
payload = ContextPayload(
source="timer",
content="heartbeat",
metadata={"mode": "passive"},
)
gathered = gather(payload)
reasoned = reason(gathered)
acted = act(reasoned)
record.reasoning_summary = acted.content[:200]
record.action_taken = "think"
record.action_status = "noop"
record.reflect_notes = "Passive thinking cycle — no world adapter connected."
return record
# -- broadcast ---------------------------------------------------------
async def _broadcast(self, record: CycleRecord) -> None:
"""Emit heartbeat cycle data via WebSocket (best-effort)."""
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast(
"heartbeat.cycle",
{
"cycle_id": record.cycle_id,
"timestamp": record.timestamp,
"action": record.action_taken,
"action_status": record.action_status,
"reasoning_summary": record.reasoning_summary[:300],
"observation": record.observation,
"duration_ms": record.duration_ms,
},
)
except (ImportError, AttributeError, ConnectionError, RuntimeError) as exc:
logger.debug("Heartbeat broadcast skipped: %s", exc)

View File

@@ -17,9 +17,9 @@ logger = logging.getLogger(__name__)
def gather(payload: ContextPayload) -> ContextPayload:
"""Accept raw input and return structured context for reasoning.
Stub: tags the payload with phase=gather and logs transit.
Timmy will flesh this out with context selection, memory lookup,
adapter polling, and attention-residual weighting.
When the payload carries a ``perception`` dict in metadata (injected by
the heartbeat loop from a WorldInterface adapter), that observation is
folded into the gathered context. Otherwise behaves as before.
"""
logger.info(
"Phase 1 (Gather) received: source=%s content_len=%d tokens=%d",
@@ -28,7 +28,20 @@ def gather(payload: ContextPayload) -> ContextPayload:
payload.token_count,
)
result = payload.with_metadata(phase="gather", gathered=True)
extra: dict = {"phase": "gather", "gathered": True}
# Enrich with world observation when present
perception = payload.metadata.get("perception")
if perception:
extra["world_observation"] = perception
logger.info(
"Phase 1 (Gather) world observation: location=%s entities=%d events=%d",
perception.get("location", "?"),
len(perception.get("entities", [])),
len(perception.get("events", [])),
)
result = payload.with_metadata(**extra)
logger.info(
"Phase 1 (Gather) produced: metadata_keys=%s",

View File

@@ -22,8 +22,6 @@ from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from config import settings
logger = logging.getLogger(__name__)
DB_PATH = Path("data/spark.db")
@@ -49,7 +47,7 @@ def _get_conn() -> Generator[sqlite3.Connection, None, None]:
with closing(sqlite3.connect(str(DB_PATH))) as conn:
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
conn.execute("PRAGMA busy_timeout=5000")
conn.execute("""
CREATE TABLE IF NOT EXISTS spark_predictions (
id TEXT PRIMARY KEY,

View File

@@ -19,8 +19,6 @@ from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from config import settings
logger = logging.getLogger(__name__)
DB_PATH = Path("data/spark.db")
@@ -65,7 +63,7 @@ def _get_conn() -> Generator[sqlite3.Connection, None, None]:
with closing(sqlite3.connect(str(DB_PATH))) as conn:
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute(f"PRAGMA busy_timeout={settings.db_busy_timeout_ms}")
conn.execute("PRAGMA busy_timeout=5000")
conn.execute("""
CREATE TABLE IF NOT EXISTS spark_events (
id TEXT PRIMARY KEY,

View File

@@ -68,13 +68,11 @@ class VoiceTTS:
logger.error("VoiceTTS: speech failed — %s", exc)
def set_rate(self, rate: int) -> None:
"""Set the speech rate in words per minute."""
self._rate = rate
if self._engine:
self._engine.setProperty("rate", rate)
def set_volume(self, volume: float) -> None:
"""Set the speech volume (0.0 to 1.0)."""
self._volume = max(0.0, min(1.0, volume))
if self._engine:
self._engine.setProperty("volume", self._volume)
@@ -94,7 +92,6 @@ class VoiceTTS:
return []
def set_voice(self, voice_id: str) -> None:
"""Set the active voice by its system ID."""
if self._engine:
self._engine.setProperty("voice", voice_id)

View File

@@ -20,9 +20,6 @@ pytestmark = pytest.mark.skipif(
@pytest.fixture(scope="module", autouse=True)
def setup_prod_env():
"""Ensure a clean environment and run the full installation."""
if not SETUP_SCRIPT_PATH.exists():
pytest.skip(f"Setup script not found at {SETUP_SCRIPT_PATH}")
if PROD_PROJECT_DIR.exists():
shutil.rmtree(PROD_PROJECT_DIR)

View File

View File

@@ -0,0 +1,129 @@
"""Tests for the WorldInterface contract and type system."""
import pytest
from infrastructure.world.interface import WorldInterface
from infrastructure.world.types import (
ActionResult,
ActionStatus,
CommandInput,
PerceptionOutput,
)
# ---------------------------------------------------------------------------
# Type construction
# ---------------------------------------------------------------------------
class TestPerceptionOutput:
def test_defaults(self):
p = PerceptionOutput()
assert p.location == ""
assert p.entities == []
assert p.events == []
assert p.raw == {}
assert p.timestamp is not None
def test_custom_values(self):
p = PerceptionOutput(
location="Balmora",
entities=["Guard", "Merchant"],
events=["door_opened"],
)
assert p.location == "Balmora"
assert len(p.entities) == 2
assert "door_opened" in p.events
class TestCommandInput:
def test_minimal(self):
c = CommandInput(action="move")
assert c.action == "move"
assert c.target is None
assert c.parameters == {}
def test_with_target_and_params(self):
c = CommandInput(action="attack", target="Rat", parameters={"weapon": "sword"})
assert c.target == "Rat"
assert c.parameters["weapon"] == "sword"
class TestActionResult:
def test_defaults(self):
r = ActionResult()
assert r.status == ActionStatus.SUCCESS
assert r.message == ""
def test_failure(self):
r = ActionResult(status=ActionStatus.FAILURE, message="blocked")
assert r.status == ActionStatus.FAILURE
class TestActionStatus:
def test_values(self):
assert ActionStatus.SUCCESS.value == "success"
assert ActionStatus.FAILURE.value == "failure"
assert ActionStatus.PENDING.value == "pending"
assert ActionStatus.NOOP.value == "noop"
# ---------------------------------------------------------------------------
# Abstract contract
# ---------------------------------------------------------------------------
class TestWorldInterfaceContract:
"""Verify the ABC cannot be instantiated directly."""
def test_cannot_instantiate(self):
with pytest.raises(TypeError):
WorldInterface()
def test_subclass_must_implement_observe(self):
class Incomplete(WorldInterface):
def act(self, command):
pass
def speak(self, message, target=None):
pass
with pytest.raises(TypeError):
Incomplete()
def test_subclass_must_implement_act(self):
class Incomplete(WorldInterface):
def observe(self):
return PerceptionOutput()
def speak(self, message, target=None):
pass
with pytest.raises(TypeError):
Incomplete()
def test_subclass_must_implement_speak(self):
class Incomplete(WorldInterface):
def observe(self):
return PerceptionOutput()
def act(self, command):
return ActionResult()
with pytest.raises(TypeError):
Incomplete()
def test_complete_subclass_instantiates(self):
class Complete(WorldInterface):
def observe(self):
return PerceptionOutput()
def act(self, command):
return ActionResult()
def speak(self, message, target=None):
pass
adapter = Complete()
assert adapter.is_connected is True # default
assert isinstance(adapter.observe(), PerceptionOutput)
assert isinstance(adapter.act(CommandInput(action="test")), ActionResult)

View File

@@ -0,0 +1,80 @@
"""Tests for the MockWorldAdapter — full observe/act/speak cycle."""
from infrastructure.world.adapters.mock import MockWorldAdapter
from infrastructure.world.types import ActionStatus, CommandInput, PerceptionOutput
class TestMockWorldAdapter:
def test_observe_returns_perception(self):
adapter = MockWorldAdapter(location="Vivec")
perception = adapter.observe()
assert isinstance(perception, PerceptionOutput)
assert perception.location == "Vivec"
assert perception.raw == {"adapter": "mock"}
def test_observe_entities(self):
adapter = MockWorldAdapter(entities=["Jiub", "Silt Strider"])
perception = adapter.observe()
assert perception.entities == ["Jiub", "Silt Strider"]
def test_act_logs_command(self):
adapter = MockWorldAdapter()
cmd = CommandInput(action="move", target="north")
result = adapter.act(cmd)
assert result.status == ActionStatus.SUCCESS
assert "move" in result.message
assert len(adapter.action_log) == 1
assert adapter.action_log[0].command.action == "move"
def test_act_multiple_commands(self):
adapter = MockWorldAdapter()
adapter.act(CommandInput(action="attack"))
adapter.act(CommandInput(action="defend"))
adapter.act(CommandInput(action="retreat"))
assert len(adapter.action_log) == 3
def test_speak_logs_message(self):
adapter = MockWorldAdapter()
adapter.speak("Hello, traveler!")
assert len(adapter.speech_log) == 1
assert adapter.speech_log[0]["message"] == "Hello, traveler!"
assert adapter.speech_log[0]["target"] is None
def test_speak_with_target(self):
adapter = MockWorldAdapter()
adapter.speak("Die, scum!", target="Cliff Racer")
assert adapter.speech_log[0]["target"] == "Cliff Racer"
def test_lifecycle(self):
adapter = MockWorldAdapter()
assert adapter.is_connected is False
adapter.connect()
assert adapter.is_connected is True
adapter.disconnect()
assert adapter.is_connected is False
def test_full_observe_act_speak_cycle(self):
"""Acceptance criterion: full observe/act/speak cycle passes."""
adapter = MockWorldAdapter(
location="Seyda Neen",
entities=["Fargoth", "Hrisskar"],
events=["quest_started"],
)
adapter.connect()
# Observe
perception = adapter.observe()
assert perception.location == "Seyda Neen"
assert len(perception.entities) == 2
assert "quest_started" in perception.events
# Act
result = adapter.act(CommandInput(action="talk", target="Fargoth"))
assert result.status == ActionStatus.SUCCESS
# Speak
adapter.speak("Where is your ring, Fargoth?", target="Fargoth")
assert len(adapter.speech_log) == 1
adapter.disconnect()
assert adapter.is_connected is False

View File

@@ -0,0 +1,68 @@
"""Tests for the adapter registry."""
import pytest
from infrastructure.world.adapters.mock import MockWorldAdapter
from infrastructure.world.registry import AdapterRegistry
class TestAdapterRegistry:
def test_register_and_get(self):
reg = AdapterRegistry()
reg.register("mock", MockWorldAdapter)
adapter = reg.get("mock")
assert isinstance(adapter, MockWorldAdapter)
def test_register_with_kwargs(self):
reg = AdapterRegistry()
reg.register("mock", MockWorldAdapter)
adapter = reg.get("mock", location="Custom Room")
assert adapter._location == "Custom Room"
def test_get_unknown_raises(self):
reg = AdapterRegistry()
with pytest.raises(KeyError):
reg.get("nonexistent")
def test_register_non_subclass_raises(self):
reg = AdapterRegistry()
with pytest.raises(TypeError):
reg.register("bad", dict)
def test_list_adapters(self):
reg = AdapterRegistry()
reg.register("beta", MockWorldAdapter)
reg.register("alpha", MockWorldAdapter)
assert reg.list_adapters() == ["alpha", "beta"]
def test_contains(self):
reg = AdapterRegistry()
reg.register("mock", MockWorldAdapter)
assert "mock" in reg
assert "other" not in reg
def test_len(self):
reg = AdapterRegistry()
assert len(reg) == 0
reg.register("mock", MockWorldAdapter)
assert len(reg) == 1
def test_overwrite_warns(self, caplog):
import logging
reg = AdapterRegistry()
reg.register("mock", MockWorldAdapter)
with caplog.at_level(logging.WARNING):
reg.register("mock", MockWorldAdapter)
assert "Overwriting" in caplog.text
class TestModuleLevelRegistry:
"""Test the convenience functions in infrastructure.world.__init__."""
def test_register_and_get(self):
from infrastructure.world import get_adapter, register_adapter
register_adapter("test_mock", MockWorldAdapter)
adapter = get_adapter("test_mock")
assert isinstance(adapter, MockWorldAdapter)

View File

@@ -0,0 +1,44 @@
"""Tests for the TES3MP stub adapter."""
import pytest
from infrastructure.world.adapters.tes3mp import TES3MPWorldAdapter
from infrastructure.world.types import CommandInput
class TestTES3MPStub:
"""Acceptance criterion: stub imports cleanly and raises NotImplementedError."""
def test_instantiates(self):
adapter = TES3MPWorldAdapter(host="127.0.0.1", port=25565)
assert adapter._host == "127.0.0.1"
assert adapter._port == 25565
def test_is_connected_default_false(self):
adapter = TES3MPWorldAdapter()
assert adapter.is_connected is False
def test_connect_raises(self):
adapter = TES3MPWorldAdapter()
with pytest.raises(NotImplementedError, match="connect"):
adapter.connect()
def test_disconnect_raises(self):
adapter = TES3MPWorldAdapter()
with pytest.raises(NotImplementedError, match="disconnect"):
adapter.disconnect()
def test_observe_raises(self):
adapter = TES3MPWorldAdapter()
with pytest.raises(NotImplementedError, match="observe"):
adapter.observe()
def test_act_raises(self):
adapter = TES3MPWorldAdapter()
with pytest.raises(NotImplementedError, match="act"):
adapter.act(CommandInput(action="move"))
def test_speak_raises(self):
adapter = TES3MPWorldAdapter()
with pytest.raises(NotImplementedError, match="speak"):
adapter.speak("Hello")

View File

@@ -0,0 +1,176 @@
"""Tests for Heartbeat v2 — WorldInterface-driven cognitive loop.
Acceptance criteria:
- With MockWorldAdapter: heartbeat runs, logs show observe→reason→act→reflect
- Without adapter: existing think_once() behaviour unchanged
- WebSocket broadcasts include current action and reasoning summary
"""
from unittest.mock import AsyncMock, patch
import pytest
from infrastructure.world.adapters.mock import MockWorldAdapter
from infrastructure.world.types import ActionStatus
from loop.heartbeat import CycleRecord, Heartbeat
@pytest.fixture
def mock_adapter():
adapter = MockWorldAdapter(
location="Balmora",
entities=["Guard", "Merchant"],
events=["player_entered"],
)
adapter.connect()
return adapter
class TestHeartbeatWithAdapter:
"""With MockWorldAdapter: heartbeat runs full embodied cycle."""
@pytest.mark.asyncio
async def test_run_once_returns_cycle_record(self, mock_adapter):
hb = Heartbeat(world=mock_adapter)
record = await hb.run_once()
assert isinstance(record, CycleRecord)
assert record.cycle_id == 1
@pytest.mark.asyncio
async def test_observation_populated(self, mock_adapter):
hb = Heartbeat(world=mock_adapter)
record = await hb.run_once()
assert record.observation["location"] == "Balmora"
assert "Guard" in record.observation["entities"]
assert "player_entered" in record.observation["events"]
@pytest.mark.asyncio
async def test_action_dispatched_to_world(self, mock_adapter):
"""Act phase should dispatch to world.act() for non-idle actions."""
hb = Heartbeat(world=mock_adapter)
record = await hb.run_once()
# The default loop phases don't set an explicit action, so it
# falls through to "idle" → NOOP. That's correct behaviour —
# the real LLM-powered reason phase will set action metadata.
assert record.action_status in (
ActionStatus.NOOP.value,
ActionStatus.SUCCESS.value,
)
@pytest.mark.asyncio
async def test_reflect_notes_present(self, mock_adapter):
hb = Heartbeat(world=mock_adapter)
record = await hb.run_once()
assert "Balmora" in record.reflect_notes
@pytest.mark.asyncio
async def test_cycle_count_increments(self, mock_adapter):
hb = Heartbeat(world=mock_adapter)
await hb.run_once()
await hb.run_once()
assert hb.cycle_count == 2
assert len(hb.history) == 2
@pytest.mark.asyncio
async def test_duration_recorded(self, mock_adapter):
hb = Heartbeat(world=mock_adapter)
record = await hb.run_once()
assert record.duration_ms >= 0
@pytest.mark.asyncio
async def test_on_cycle_callback(self, mock_adapter):
received = []
async def callback(record):
received.append(record)
hb = Heartbeat(world=mock_adapter, on_cycle=callback)
await hb.run_once()
assert len(received) == 1
assert received[0].cycle_id == 1
class TestHeartbeatWithoutAdapter:
"""Without adapter: existing think_once() behaviour unchanged."""
@pytest.mark.asyncio
async def test_passive_cycle(self):
hb = Heartbeat(world=None)
record = await hb.run_once()
assert record.action_taken == "think"
assert record.action_status == "noop"
assert "Passive" in record.reflect_notes
@pytest.mark.asyncio
async def test_passive_no_observation(self):
hb = Heartbeat(world=None)
record = await hb.run_once()
assert record.observation == {}
class TestHeartbeatLifecycle:
def test_interval_property(self):
hb = Heartbeat(interval=60.0)
assert hb.interval == 60.0
hb.interval = 10.0
assert hb.interval == 10.0
def test_interval_minimum(self):
hb = Heartbeat()
hb.interval = 0.1
assert hb.interval == 1.0
def test_world_property(self):
hb = Heartbeat()
assert hb.world is None
adapter = MockWorldAdapter()
hb.world = adapter
assert hb.world is adapter
def test_stop_sets_flag(self):
hb = Heartbeat()
assert not hb.is_running
hb.stop()
assert not hb.is_running
class TestHeartbeatBroadcast:
"""WebSocket broadcasts include action and reasoning summary."""
@pytest.mark.asyncio
async def test_broadcast_called(self, mock_adapter):
with patch(
"loop.heartbeat.ws_manager",
create=True,
) as mock_ws:
mock_ws.broadcast = AsyncMock()
# Patch the import inside heartbeat
with patch("infrastructure.ws_manager.handler.ws_manager") as ws_mod:
ws_mod.broadcast = AsyncMock()
hb = Heartbeat(world=mock_adapter)
await hb.run_once()
ws_mod.broadcast.assert_called_once()
call_args = ws_mod.broadcast.call_args
assert call_args[0][0] == "heartbeat.cycle"
data = call_args[0][1]
assert "action" in data
assert "reasoning_summary" in data
assert "observation" in data
class TestHeartbeatLog:
"""Verify logging of observe→reason→act→reflect cycle."""
@pytest.mark.asyncio
async def test_embodied_cycle_logs(self, mock_adapter, caplog):
import logging
with caplog.at_level(logging.INFO):
hb = Heartbeat(world=mock_adapter)
await hb.run_once()
messages = caplog.text
assert "Phase 1 (Gather)" in messages
assert "Phase 2 (Reason)" in messages
assert "Phase 3 (Act)" in messages
assert "Heartbeat cycle #1 complete" in messages

View File

@@ -1,80 +0,0 @@
"""Tests for the thread-local SQLite ConnectionPool."""
import sqlite3
import threading
from pathlib import Path
import pytest
from infrastructure.db_pool import ConnectionPool
pytestmark = pytest.mark.unit
def test_pool_creates_connection(tmp_path: Path):
"""Test that the pool successfully creates a SQLite connection."""
db_file = tmp_path / "test.db"
pool = ConnectionPool(db_file)
conn = pool.get_connection()
assert isinstance(conn, sqlite3.Connection)
cursor = conn.execute("SELECT 1")
assert cursor.fetchone()[0] == 1
def test_pool_reuses_connection_same_thread(tmp_path: Path):
"""Test that multiple calls in the same thread return the same connection."""
db_file = tmp_path / "test.db"
pool = ConnectionPool(db_file)
conn1 = pool.get_connection()
conn2 = pool.get_connection()
assert conn1 is conn2
def test_pool_different_connections_different_threads(tmp_path: Path):
"""Test that different threads receive distinct connections."""
db_file = tmp_path / "test.db"
pool = ConnectionPool(db_file)
conn1 = pool.get_connection()
conn2_list = []
def _worker():
conn2_list.append(pool.get_connection())
thread = threading.Thread(target=_worker)
thread.start()
thread.join()
assert len(conn2_list) == 1
conn2 = conn2_list[0]
assert conn1 is not conn2
def test_pool_close_connection(tmp_path: Path):
"""Test that connection is closed and cleared from thread local."""
db_file = tmp_path / "test.db"
pool = ConnectionPool(db_file)
conn1 = pool.get_connection()
pool.close_connection()
# Getting a new connection should create a new object
conn2 = pool.get_connection()
assert conn1 is not conn2
def test_pool_context_manager(tmp_path: Path):
"""Test that the context manager yields a connection and closes it after."""
db_file = tmp_path / "test.db"
pool = ConnectionPool(db_file)
with pool.connection() as conn1:
assert isinstance(conn1, sqlite3.Connection)
# After exiting the context manager, the connection should be closed implicitly
# resulting in a new connection object for the next request.
conn2 = pool.get_connection()
assert conn1 is not conn2

View File

@@ -1,91 +0,0 @@
"""Tests for the health and sovereignty endpoints."""
from unittest.mock import patch
import pytest
from fastapi.testclient import TestClient
from dashboard.app import app
from dashboard.routes.health import DependencyStatus
pytestmark = pytest.mark.unit
client = TestClient(app)
@pytest.fixture
def mock_ollama_healthy():
with patch("dashboard.routes.health.check_ollama", return_value=True):
yield
@pytest.fixture
def mock_ollama_unavailable():
with patch("dashboard.routes.health.check_ollama", return_value=False):
yield
@pytest.fixture
def mock_check_ollama_sovereignty():
dep = DependencyStatus(
name="Ollama AI",
status="healthy",
sovereignty_score=10,
details={"url": "http://localhost:11434"},
)
with patch("dashboard.routes.health._check_ollama", return_value=dep):
yield
def test_health_check_healthy(mock_ollama_healthy):
"""Test legacy health check endpoint when Ollama is up."""
response = client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
assert data["services"]["ollama"] == "up"
assert data["agents"]["agent"]["status"] == "idle"
def test_health_check_degraded(mock_ollama_unavailable):
"""Test legacy health check endpoint when Ollama is down."""
response = client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "degraded"
assert data["services"]["ollama"] == "down"
assert data["agents"]["agent"]["status"] == "offline"
def test_health_status_panel_healthy(mock_ollama_healthy):
"""Test HTML status panel rendering."""
response = client.get("/health/status")
assert response.status_code == 200
assert "text/html" in response.headers["content-type"]
assert "UP" in response.text
assert "#10b981" in response.text
def test_sovereignty_check(mock_check_ollama_sovereignty):
"""Test comprehensive sovereignty audit report."""
with (
patch("dashboard.routes.health._check_lightning") as mock_lightning,
patch("dashboard.routes.health._check_sqlite") as mock_sqlite,
):
mock_lightning.return_value = DependencyStatus(
name="Lightning", status="unavailable", sovereignty_score=8, details={}
)
mock_sqlite.return_value = DependencyStatus(
name="SQLite", status="healthy", sovereignty_score=10, details={}
)
response = client.get("/health/sovereignty")
assert response.status_code == 200
data = response.json()
# (10 + 8 + 10) / 3 = 9.3
assert data["overall_score"] == 9.3
assert len(data["dependencies"]) == 3
# Ensure recommendations contain note about unavailable dependency
recommendations = " ".join(data["recommendations"])
assert "unavailable" in recommendations.lower()

View File

@@ -50,17 +50,18 @@ commands =
description = Fast tests — excludes e2e, functional, and external services
commands =
pytest tests/ -q --tb=short \
-m "unit" \
--ignore=tests/e2e \
--ignore=tests/functional \
-m "not ollama and not docker and not selenium and not external_api and not skip_ci and not slow" \
-n auto --dist worksteal
[testenv:integration]
description = Integration tests (marked with @pytest.mark.integration)
commands =
pytest tests/ -q --tb=short \
-m "integration" \
-m "integration and not ollama and not docker and not selenium and not external_api and not slow" \
-n auto --dist worksteal
[testenv:functional]
description = Functional tests — real HTTP, no mocking (excl slow + selenium)
commands =