Compare commits

...

1 Commits

Author SHA1 Message Date
215329146a feat(morrowind): add Perception/Command protocol + SQLite command log (#859, #855)
Implement two foundational infrastructure pieces for the Morrowind integration:

1. Perception/Command Protocol (Issue #859):
   - Formal spec document with JSON schemas, API contracts, versioning strategy
   - Engine-agnostic design following the Falsework Rule
   - Pydantic v2 models (PerceptionOutput, CommandInput) for runtime validation

2. SQLite Command Log + Training Pipeline (Issue #855):
   - SQLAlchemy model for command_log table with full indexing
   - CommandLogger class with log_command(), query(), export_training_data()
   - TrainingExporter with chat-completion, episode, and instruction formats
   - Storage management (rotation/archival) utilities
   - Alembic migration for the new table

Includes 39 passing tests covering schema validation, logging, querying, and export.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-21 22:33:13 +00:00
9 changed files with 1664 additions and 0 deletions


@@ -0,0 +1,312 @@
# Morrowind Perception/Command Protocol Specification
**Version:** 1.0.0
**Status:** Draft
**Authors:** Timmy Infrastructure Team
**Date:** 2026-03-21

---
## 1. Overview
This document defines the **engine-agnostic Perception/Command protocol** used by Timmy's
heartbeat loop to observe the game world and issue commands. The protocol is designed
around the **Falsework Rule**: TES3MP (Morrowind) is scaffolding. If the engine swaps,
only the bridge and perception script change — the heartbeat, reasoning, and journal
remain sovereign.
### 1.1 Design Principles
- **Engine-agnostic**: Schemas reference abstract concepts (cells, entities, quests), not
Morrowind-specific internals.
- **Versioned**: Every payload carries a `protocol_version` so consumers can negotiate
compatibility.
- **Typed at the boundary**: Pydantic v2 models enforce validation on both the producer
(bridge) and consumer (heartbeat) side.
- **Logged by default**: Every command is persisted to the SQLite command log for
training-data extraction (see Issue #855).

---
## 2. Protocol Version Strategy
| Field | Type | Description |
| ------------------ | ------ | ------------------------------------ |
| `protocol_version` | string | SemVer string (e.g. `"1.0.0"`) |
### Compatibility Rules
- **Patch** bump (1.0.x): additive fields with defaults — fully backward-compatible.
- **Minor** bump (1.x.0): new optional endpoints or enum values — old clients still work.
- **Major** bump (x.0.0): breaking schema change — requires coordinated upgrade of bridge
and heartbeat.
Consumers MUST reject payloads whose major version exceeds their own.
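
These compatibility rules reduce to a single mechanical check. A minimal sketch of that check — the helper names here are illustrative, not part of the shipped code:

```python
def parse_semver(version: str) -> tuple[int, int, int]:
    """Split a SemVer string like '1.2.3' into integer components."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def is_compatible(payload_version: str, own_version: str) -> bool:
    """Accept a payload only if its major version does not exceed ours.

    Minor and patch differences are backward-compatible by the rules
    above, so only the major component matters here.
    """
    return parse_semver(payload_version)[0] <= parse_semver(own_version)[0]
```

A consumer would call `is_compatible(payload["protocol_version"], PROTOCOL_VERSION)` before processing and return `SCHEMA_MISMATCH` on failure.
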

---
## 3. Perception Output Schema
Returned by `GET /perception`. Represents a single snapshot of the game world as observed
by the bridge.
```json
{
"protocol_version": "1.0.0",
"timestamp": "2026-03-21T14:30:00Z",
"agent_id": "timmy",
"location": {
"cell": "Balmora",
"x": 1024.5,
"y": -512.3,
"z": 64.0,
"interior": false
},
"health": {
"current": 85,
"max": 100
},
"nearby_entities": [
{
"entity_id": "npc_001",
"name": "Caius Cosades",
"entity_type": "npc",
"distance": 12.5,
"disposition": 65
}
],
"inventory_summary": {
"gold": 150,
"item_count": 23,
"encumbrance_pct": 0.45
},
"active_quests": [
{
"quest_id": "mq_01",
"name": "Report to Caius Cosades",
"stage": 10
}
],
"environment": {
"time_of_day": "afternoon",
"weather": "clear",
"is_combat": false,
"is_dialogue": false
},
"raw_engine_data": {}
}
```
### 3.1 Field Reference
| Field | Type | Required | Description |
| -------------------- | ----------------- | -------- | ------------------------------------------------------------ |
| `protocol_version` | string | yes | Protocol SemVer |
| `timestamp` | ISO 8601 datetime | yes | When the snapshot was taken |
| `agent_id` | string | yes | Which agent this perception belongs to |
| `location.cell` | string | yes | Current cell/zone name |
| `location.x/y/z` | float | yes | World coordinates |
| `location.interior` | bool | yes | Whether the agent is indoors |
| `health.current`     | int (0–max)       | yes      | Current health                                               |
| `health.max` | int (>0) | yes | Maximum health |
| `nearby_entities` | array | yes | Entities within perception radius (may be empty) |
| `inventory_summary` | object | yes | Lightweight inventory overview |
| `active_quests` | array | yes | Currently tracked quests |
| `environment` | object | yes | World-state flags |
| `raw_engine_data` | object | no | Opaque engine-specific blob (not relied upon by heartbeat) |
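
In the implementation, the Pydantic models (see §8) enforce this table at the boundary. As an illustration of what the required-field rule amounts to on the consumer side, here is a stdlib-only sketch (names hypothetical, not the shipped validator):

```python
import json

# Top-level fields marked "yes" in the reference table above.
REQUIRED_FIELDS = {
    "protocol_version", "timestamp", "agent_id", "location",
    "health", "nearby_entities", "inventory_summary",
    "active_quests", "environment",
}


def missing_fields(snapshot_json: str) -> set[str]:
    """Return the required top-level fields absent from a snapshot."""
    data = json.loads(snapshot_json)
    return REQUIRED_FIELDS - set(data)
```

A bridge response with a non-empty `missing_fields` result would be rejected with `SCHEMA_MISMATCH` rather than passed to the heartbeat.
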
### 3.2 Entity Types
The `entity_type` field uses a controlled vocabulary:

| Value | Description |
| ---------- | ------------------------ |
| `npc` | Non-player character |
| `creature` | Hostile or neutral mob |
| `item` | Pickup-able world item |
| `door` | Door or transition |
| `container`| Lootable container |
---
## 4. Command Input Schema
Sent via `POST /command`. Represents a single action the agent wants to take in the world.
```json
{
"protocol_version": "1.0.0",
"timestamp": "2026-03-21T14:30:01Z",
"agent_id": "timmy",
"command": "move_to",
"params": {
"target_cell": "Balmora",
"target_x": 1050.0,
"target_y": -500.0
},
"reasoning": "Moving closer to Caius Cosades to begin the main quest dialogue.",
"episode_id": "ep_20260321_001",
"context": {
"perception_timestamp": "2026-03-21T14:30:00Z",
"heartbeat_cycle": 42
}
}
```
### 4.1 Field Reference
| Field | Type | Required | Description |
| ------------------------------ | ----------------- | -------- | ------------------------------------------------------- |
| `protocol_version` | string | yes | Protocol SemVer |
| `timestamp` | ISO 8601 datetime | yes | When the command was issued |
| `agent_id` | string | yes | Which agent is issuing the command |
| `command` | string (enum) | yes | Command type (see §4.2) |
| `params` | object | yes | Command-specific parameters (may be empty `{}`) |
| `reasoning` | string | yes | Natural-language explanation of *why* this command |
| `episode_id` | string | no | Groups commands into training episodes |
| `context` | object | no | Metadata linking command to its triggering perception |
### 4.2 Command Types
| Command | Description | Key Params |
| --------------- | ---------------------------------------- | ---------------------------------- |
| `move_to` | Navigate to coordinates or entity | `target_cell`, `target_x/y/z` |
| `interact` | Interact with entity (talk, activate) | `entity_id`, `interaction_type` |
| `use_item` | Use an inventory item | `item_id`, `target_entity_id?` |
| `wait` | Wait/idle for a duration | `duration_seconds` |
| `combat_action` | Perform a combat action | `action_type`, `target_entity_id` |
| `dialogue` | Choose a dialogue option | `entity_id`, `topic`, `choice_idx` |
| `journal_note` | Write an internal journal observation | `content`, `tags` |
| `noop` | Heartbeat tick with no action | — |
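
Putting §4.1 and §4.2 together, a well-formed payload can be assembled as follows. This is a hedged stdlib sketch — the real producer uses the Pydantic `CommandInput` model, and `build_command` is illustrative only:

```python
import json
from datetime import datetime, timezone

# The controlled vocabulary from the command-type table above.
COMMAND_TYPES = {
    "move_to", "interact", "use_item", "wait",
    "combat_action", "dialogue", "journal_note", "noop",
}


def build_command(command: str, params: dict, reasoning: str,
                  agent_id: str = "timmy") -> str:
    """Assemble a CommandInput payload as JSON, rejecting unknown types."""
    if command not in COMMAND_TYPES:
        raise ValueError(f"unknown command type: {command}")
    payload = {
        "protocol_version": "1.0.0",
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "agent_id": agent_id,
        "command": command,
        "params": params,
        "reasoning": reasoning,
    }
    return json.dumps(payload)
```

Note that `reasoning` is mandatory by design: without it, the logged command is useless as a training example.
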
---
## 5. API Contracts
### 5.1 `GET /perception`
Returns the latest perception snapshot.
**Response:** `200 OK` with `PerceptionOutput` JSON body.
**Error Responses:**

| Status | Code | Description |
| ------ | ------------------- | ----------------------------------- |
| 503 | `BRIDGE_UNAVAILABLE`| Game bridge is not connected |
| 504 | `PERCEPTION_TIMEOUT`| Bridge did not respond in time |
| 422 | `SCHEMA_MISMATCH` | Bridge returned incompatible schema |
### 5.2 `POST /command`
Submit a command for the agent to execute.
**Request:** `CommandInput` JSON body.
**Response:** `202 Accepted`
```json
{
"status": "accepted",
"command_id": "cmd_abc123",
"logged": true
}
```
**Error Responses:**

| Status | Code | Description |
| ------ | -------------------- | ----------------------------------- |
| 400 | `INVALID_COMMAND` | Command type not recognized |
| 400 | `VALIDATION_ERROR` | Payload fails Pydantic validation |
| 409 | `COMMAND_CONFLICT` | Agent is busy executing another cmd |
| 503 | `BRIDGE_UNAVAILABLE` | Game bridge is not connected |
### 5.3 `GET /morrowind/status`
Health-check endpoint for the Morrowind bridge.
**Response:** `200 OK`
```json
{
"bridge_connected": true,
"engine": "tes3mp",
"protocol_version": "1.0.0",
"uptime_seconds": 3600,
"last_perception_at": "2026-03-21T14:30:00Z"
}
```
---
## 6. Engine-Swap Documentation (The Falsework Rule)
### What Changes
| Component | Changes on Engine Swap? | Notes |
| ---------------------- | ----------------------- | --------------------------------------------- |
| Bridge process | **YES** — replaced | New bridge speaks same protocol to new engine |
| Perception Lua script | **YES** — replaced | New engine's scripting language/API |
| `PerceptionOutput` | NO | Schema is engine-agnostic |
| `CommandInput` | NO | Schema is engine-agnostic |
| Heartbeat loop | NO | Consumes `PerceptionOutput`, emits `Command` |
| Reasoning/LLM layer | NO | Operates on abstract perception data |
| Journal system | NO | Writes `journal_note` commands |
| Command log + training | NO | Logs all commands regardless of engine |
| Dashboard WebSocket | NO | Separate protocol (`src/infrastructure/protocol.py`) |
### Swap Procedure
1. Implement new bridge that serves `GET /perception` and accepts `POST /command`.
2. Update `raw_engine_data` field documentation for the new engine.
3. Extend `entity_type` enum if the new engine has novel entity categories.
4. Bump `protocol_version` minor (or major if schema changes are required).
5. Run integration tests against the new bridge.
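
The swap procedure above implies that the bridge's entire surface is two operations. One way to pin that down — a sketch, since the repository may define this boundary differently — is a structural `Protocol` that any replacement bridge must satisfy:

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class GameBridge(Protocol):
    """Minimal surface a replacement bridge must provide on an engine swap.

    Only implementations of this interface change when the engine swaps;
    the heartbeat, reasoning, and journal layers depend on nothing else.
    """

    def get_perception(self) -> dict[str, Any]:
        """Return the latest PerceptionOutput-shaped snapshot."""
        ...

    def submit_command(self, command: dict[str, Any]) -> dict[str, Any]:
        """Execute a CommandInput-shaped payload and return the ack."""
        ...
```

Because the `Protocol` is structural, a new bridge needs no inheritance relationship with the old one — it just has to expose the same two methods.
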
---
## 7. Error Handling Specification
### 7.1 Error Response Format
All error responses follow a consistent structure:
```json
{
"error": {
"code": "BRIDGE_UNAVAILABLE",
"message": "Human-readable error description",
"details": {},
"timestamp": "2026-03-21T14:30:00Z"
}
}
```
### 7.2 Error Codes
| Code | HTTP Status | Retry? | Description |
| -------------------- | ----------- | ------ | ---------------------------------------- |
| `BRIDGE_UNAVAILABLE` | 503 | yes | Bridge process not connected |
| `PERCEPTION_TIMEOUT` | 504 | yes | Bridge did not respond within deadline |
| `SCHEMA_MISMATCH` | 422 | no | Protocol version incompatibility |
| `INVALID_COMMAND` | 400 | no | Unknown command type |
| `VALIDATION_ERROR` | 400 | no | Pydantic validation failed |
| `COMMAND_CONFLICT` | 409 | yes | Agent busy — retry after current command |
| `INTERNAL_ERROR` | 500 | yes | Unexpected server error |
### 7.3 Retry Policy
Clients SHOULD implement exponential backoff for retryable errors:
- Initial delay: 100ms
- Max delay: 5s
- Max retries: 5
- Jitter: ±50ms
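
The policy translates directly into a delay schedule. A minimal sketch (the function name is illustrative; clients may structure their retry loop however they like):

```python
import random


def backoff_delays(max_retries: int = 5, initial: float = 0.1,
                   max_delay: float = 5.0, jitter: float = 0.05) -> list[float]:
    """Compute exponential backoff delays (in seconds) per the retry policy:
    100 ms initial, doubling each attempt, capped at 5 s, with ±50 ms jitter.
    """
    delays = []
    for attempt in range(max_retries):
        base = min(initial * (2 ** attempt), max_delay)
        # Apply jitter, clamped so a delay is never negative.
        delays.append(max(0.0, base + random.uniform(-jitter, jitter)))
    return delays
```

A client would sleep for each delay in turn between retryable failures (`BRIDGE_UNAVAILABLE`, `PERCEPTION_TIMEOUT`, `COMMAND_CONFLICT`, `INTERNAL_ERROR`) and give up after the list is exhausted.
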
---
## 8. Appendix: Pydantic Model Reference
The canonical Pydantic v2 models live in `src/infrastructure/morrowind/schemas.py`.
These models serve as both runtime validation and living documentation of this spec.
Any change to this spec document MUST be reflected in the Pydantic models, and vice versa.


@@ -20,6 +20,7 @@ if config.config_file_name is not None:
# target_metadata = mymodel.Base.metadata
from src.dashboard.models.database import Base
from src.dashboard.models.calm import Task, JournalEntry
from src.infrastructure.morrowind.command_log import CommandLog # noqa: F401
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,


@@ -0,0 +1,89 @@
"""Create command_log table
Revision ID: a1b2c3d4e5f6
Revises: 0093c15b4bbf
Create Date: 2026-03-21 12:00:00.000000
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "a1b2c3d4e5f6"
down_revision: Union[str, Sequence[str], None] = "0093c15b4bbf"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade schema."""
op.create_table(
"command_log",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("timestamp", sa.DateTime(), nullable=False),
sa.Column("command", sa.String(length=64), nullable=False),
sa.Column("params", sa.Text(), nullable=False, server_default="{}"),
sa.Column("reasoning", sa.Text(), nullable=False, server_default=""),
sa.Column(
"perception_snapshot", sa.Text(), nullable=False, server_default="{}"
),
sa.Column("outcome", sa.Text(), nullable=True),
sa.Column(
"agent_id",
sa.String(length=64),
nullable=False,
server_default="timmy",
),
sa.Column("episode_id", sa.String(length=128), nullable=True),
sa.Column("cell", sa.String(length=255), nullable=True),
sa.Column(
"protocol_version",
sa.String(length=16),
nullable=False,
server_default="1.0.0",
),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_command_log_timestamp"), "command_log", ["timestamp"], unique=False
)
op.create_index(
op.f("ix_command_log_command"), "command_log", ["command"], unique=False
)
op.create_index(
op.f("ix_command_log_agent_id"), "command_log", ["agent_id"], unique=False
)
op.create_index(
op.f("ix_command_log_episode_id"),
"command_log",
["episode_id"],
unique=False,
)
op.create_index(
op.f("ix_command_log_cell"), "command_log", ["cell"], unique=False
)
op.create_index(
"ix_command_log_cmd_cell", "command_log", ["command", "cell"], unique=False
)
op.create_index(
"ix_command_log_episode",
"command_log",
["episode_id", "timestamp"],
unique=False,
)
def downgrade() -> None:
"""Downgrade schema."""
op.drop_index("ix_command_log_episode", table_name="command_log")
op.drop_index("ix_command_log_cmd_cell", table_name="command_log")
op.drop_index(op.f("ix_command_log_cell"), table_name="command_log")
op.drop_index(op.f("ix_command_log_episode_id"), table_name="command_log")
op.drop_index(op.f("ix_command_log_agent_id"), table_name="command_log")
op.drop_index(op.f("ix_command_log_command"), table_name="command_log")
op.drop_index(op.f("ix_command_log_timestamp"), table_name="command_log")
op.drop_table("command_log")


@@ -0,0 +1,18 @@
"""Morrowind engine-agnostic perception/command protocol.
This package implements the Perception/Command protocol defined in
``docs/protocol/morrowind-perception-command-spec.md``. It provides:
- Pydantic v2 schemas for runtime validation (``schemas``)
- SQLite command logging and query interface (``command_log``)
- Training-data export pipeline (``training_export``)
"""
from .schemas import CommandInput, CommandType, EntityType, PerceptionOutput
__all__ = [
"CommandInput",
"CommandType",
"EntityType",
"PerceptionOutput",
]


@@ -0,0 +1,307 @@
"""SQLite command log for the Morrowind Perception/Command protocol.
Every heartbeat cycle is logged — the resulting dataset serves as organic
training data for local model fine-tuning (Phase 7+).
Usage::
from infrastructure.morrowind.command_log import CommandLogger
logger = CommandLogger() # uses project default DB
logger.log_command(command_input, perception_snapshot)
results = logger.query(command_type="move_to", limit=100)
logger.export_training_data("export.jsonl")
"""
from __future__ import annotations
import json
import logging
from datetime import UTC, datetime, timedelta
from pathlib import Path
from typing import Any
from sqlalchemy import (
Column,
DateTime,
Index,
Integer,
String,
Text,
create_engine,
)
from sqlalchemy.orm import Session, sessionmaker
from src.dashboard.models.database import Base
from .schemas import CommandInput, CommandType, PerceptionOutput
logger = logging.getLogger(__name__)
# Default database path — same SQLite file as the rest of the project.
DEFAULT_DB_PATH = Path("./data/timmy_calm.db")
# ---------------------------------------------------------------------------
# SQLAlchemy model
# ---------------------------------------------------------------------------
class CommandLog(Base):
"""Persisted command log entry.
Schema columns mirror the requirements from Issue #855:
timestamp, command, params, reasoning, perception_snapshot,
outcome, episode_id.
"""
__tablename__ = "command_log"
id = Column(Integer, primary_key=True, autoincrement=True)
timestamp = Column(
DateTime, nullable=False, default=lambda: datetime.now(UTC), index=True
)
command = Column(String(64), nullable=False, index=True)
params = Column(Text, nullable=False, default="{}")
reasoning = Column(Text, nullable=False, default="")
perception_snapshot = Column(Text, nullable=False, default="{}")
outcome = Column(Text, nullable=True)
agent_id = Column(String(64), nullable=False, default="timmy", index=True)
episode_id = Column(String(128), nullable=True, index=True)
cell = Column(String(255), nullable=True, index=True)
protocol_version = Column(String(16), nullable=False, default="1.0.0")
created_at = Column(
DateTime, nullable=False, default=lambda: datetime.now(UTC)
)
__table_args__ = (
Index("ix_command_log_cmd_cell", "command", "cell"),
Index("ix_command_log_episode", "episode_id", "timestamp"),
)
# ---------------------------------------------------------------------------
# CommandLogger — high-level API
# ---------------------------------------------------------------------------
class CommandLogger:
"""High-level interface for logging, querying, and exporting commands.
Args:
db_url: SQLAlchemy database URL. Defaults to the project SQLite path.
"""
def __init__(self, db_url: str | None = None) -> None:
if db_url is None:
DEFAULT_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
db_url = f"sqlite:///{DEFAULT_DB_PATH}"
self._engine = create_engine(
db_url, connect_args={"check_same_thread": False}
)
self._SessionLocal = sessionmaker(
autocommit=False, autoflush=False, bind=self._engine
)
# Ensure table exists.
Base.metadata.create_all(bind=self._engine, tables=[CommandLog.__table__])
def _get_session(self) -> Session:
return self._SessionLocal()
# -- Write ---------------------------------------------------------------
def log_command(
self,
command_input: CommandInput,
perception: PerceptionOutput | None = None,
outcome: str | None = None,
) -> int:
"""Persist a command to the log.
Returns the auto-generated row id.
"""
perception_json = perception.model_dump_json() if perception else "{}"
cell = perception.location.cell if perception else None
entry = CommandLog(
timestamp=command_input.timestamp,
command=command_input.command.value,
params=json.dumps(command_input.params),
reasoning=command_input.reasoning,
perception_snapshot=perception_json,
outcome=outcome,
agent_id=command_input.agent_id,
episode_id=command_input.episode_id,
cell=cell,
protocol_version=command_input.protocol_version,
)
session = self._get_session()
try:
session.add(entry)
session.commit()
session.refresh(entry)
row_id: int = entry.id
return row_id
except Exception:
session.rollback()
raise
finally:
session.close()
# -- Read ----------------------------------------------------------------
def query(
self,
*,
command_type: str | CommandType | None = None,
cell: str | None = None,
episode_id: str | None = None,
agent_id: str | None = None,
since: datetime | None = None,
until: datetime | None = None,
limit: int = 100,
offset: int = 0,
) -> list[dict[str, Any]]:
"""Query command log entries with optional filters.
Returns a list of dicts (serialisable to JSON).
"""
session = self._get_session()
try:
q = session.query(CommandLog)
if command_type is not None:
q = q.filter(CommandLog.command == str(command_type))
if cell is not None:
q = q.filter(CommandLog.cell == cell)
if episode_id is not None:
q = q.filter(CommandLog.episode_id == episode_id)
if agent_id is not None:
q = q.filter(CommandLog.agent_id == agent_id)
if since is not None:
q = q.filter(CommandLog.timestamp >= since)
if until is not None:
q = q.filter(CommandLog.timestamp <= until)
q = q.order_by(CommandLog.timestamp.desc())
q = q.offset(offset).limit(limit)
rows = q.all()
return [self._row_to_dict(row) for row in rows]
finally:
session.close()
# -- Export --------------------------------------------------------------
def export_training_data(
self,
output_path: str | Path,
*,
episode_id: str | None = None,
since: datetime | None = None,
until: datetime | None = None,
) -> int:
"""Export command log entries as a JSONL file for fine-tuning.
Each line is a JSON object with ``perception`` (input) and
``command`` + ``reasoning`` (target output).
Returns the number of rows exported.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
session = self._get_session()
try:
q = session.query(CommandLog)
if episode_id is not None:
q = q.filter(CommandLog.episode_id == episode_id)
if since is not None:
q = q.filter(CommandLog.timestamp >= since)
if until is not None:
q = q.filter(CommandLog.timestamp <= until)
q = q.order_by(CommandLog.timestamp.asc())
count = 0
with open(output_path, "w", encoding="utf-8") as fh:
for row in q.yield_per(500):
record = {
"input": {
"perception": json.loads(row.perception_snapshot),
},
"output": {
"command": row.command,
"params": json.loads(row.params),
"reasoning": row.reasoning,
},
"metadata": {
"timestamp": row.timestamp.isoformat() if row.timestamp else None,
"episode_id": row.episode_id,
"cell": row.cell,
"outcome": row.outcome,
},
}
fh.write(json.dumps(record) + "\n")
count += 1
logger.info("Exported %d training records to %s", count, output_path)
return count
finally:
session.close()
# -- Storage management --------------------------------------------------
def rotate(self, max_age_days: int = 90) -> int:
"""Delete command log entries older than *max_age_days*.
Returns the number of rows deleted.
"""
cutoff = datetime.now(UTC) - timedelta(days=max_age_days)
session = self._get_session()
try:
deleted = (
session.query(CommandLog)
.filter(CommandLog.timestamp < cutoff)
.delete(synchronize_session=False)
)
session.commit()
logger.info("Rotated %d command log entries older than %s", deleted, cutoff)
return deleted
except Exception:
session.rollback()
raise
finally:
session.close()
def count(self) -> int:
"""Return the total number of command log entries."""
session = self._get_session()
try:
return session.query(CommandLog).count()
finally:
session.close()
# -- Helpers -------------------------------------------------------------
@staticmethod
def _row_to_dict(row: CommandLog) -> dict[str, Any]:
return {
"id": row.id,
"timestamp": row.timestamp.isoformat() if row.timestamp else None,
"command": row.command,
"params": json.loads(row.params) if row.params else {},
"reasoning": row.reasoning,
"perception_snapshot": json.loads(row.perception_snapshot)
if row.perception_snapshot
else {},
"outcome": row.outcome,
"agent_id": row.agent_id,
"episode_id": row.episode_id,
"cell": row.cell,
"protocol_version": row.protocol_version,
"created_at": row.created_at.isoformat() if row.created_at else None,
}


@@ -0,0 +1,186 @@
"""Pydantic v2 models for the Morrowind Perception/Command protocol.
These models enforce the contract defined in
``docs/protocol/morrowind-perception-command-spec.md`` at runtime.
They are engine-agnostic by design — see the Falsework Rule.
"""
from __future__ import annotations
from datetime import datetime
from enum import StrEnum
from typing import Any
from pydantic import BaseModel, Field, model_validator
PROTOCOL_VERSION = "1.0.0"
# ---------------------------------------------------------------------------
# Enums
# ---------------------------------------------------------------------------
class EntityType(StrEnum):
"""Controlled vocabulary for nearby entity types."""
NPC = "npc"
CREATURE = "creature"
ITEM = "item"
DOOR = "door"
CONTAINER = "container"
class CommandType(StrEnum):
"""All supported command types."""
MOVE_TO = "move_to"
INTERACT = "interact"
USE_ITEM = "use_item"
WAIT = "wait"
COMBAT_ACTION = "combat_action"
DIALOGUE = "dialogue"
JOURNAL_NOTE = "journal_note"
NOOP = "noop"
# ---------------------------------------------------------------------------
# Perception Output sub-models
# ---------------------------------------------------------------------------
class Location(BaseModel):
"""Agent position within the game world."""
cell: str = Field(..., description="Current cell/zone name")
x: float = Field(..., description="World X coordinate")
y: float = Field(..., description="World Y coordinate")
z: float = Field(0.0, description="World Z coordinate")
interior: bool = Field(False, description="Whether the agent is indoors")
class HealthStatus(BaseModel):
"""Agent health information."""
current: int = Field(..., ge=0, description="Current health points")
max: int = Field(..., gt=0, description="Maximum health points")
@model_validator(mode="after")
def current_le_max(self) -> "HealthStatus":
if self.current > self.max:
raise ValueError(
f"current ({self.current}) cannot exceed max ({self.max})"
)
return self
class NearbyEntity(BaseModel):
"""An entity within the agent's perception radius."""
entity_id: str = Field(..., description="Unique entity identifier")
name: str = Field(..., description="Display name")
entity_type: EntityType = Field(..., description="Entity category")
distance: float = Field(..., ge=0, description="Distance from agent")
disposition: int | None = Field(None, description="NPC disposition (0-100)")
class InventorySummary(BaseModel):
"""Lightweight overview of the agent's inventory."""
gold: int = Field(0, ge=0, description="Gold held")
item_count: int = Field(0, ge=0, description="Total items carried")
encumbrance_pct: float = Field(
0.0, ge=0.0, le=1.0, description="Encumbrance as fraction (0.01.0)"
)
class QuestInfo(BaseModel):
"""A currently tracked quest."""
quest_id: str = Field(..., description="Quest identifier")
name: str = Field(..., description="Quest display name")
stage: int = Field(0, ge=0, description="Current quest stage")
class Environment(BaseModel):
"""World-state flags."""
time_of_day: str = Field("unknown", description="Time period (morning, afternoon, etc.)")
weather: str = Field("clear", description="Current weather condition")
is_combat: bool = Field(False, description="Whether the agent is in combat")
is_dialogue: bool = Field(False, description="Whether the agent is in dialogue")
# ---------------------------------------------------------------------------
# Top-level schemas
# ---------------------------------------------------------------------------
class PerceptionOutput(BaseModel):
"""Complete perception snapshot returned by ``GET /perception``.
This is the engine-agnostic view of the game world consumed by the
heartbeat loop and reasoning layer.
"""
protocol_version: str = Field(
default=PROTOCOL_VERSION,
description="Protocol SemVer string",
)
timestamp: datetime = Field(..., description="When the snapshot was taken")
agent_id: str = Field(..., description="Which agent this perception belongs to")
location: Location
health: HealthStatus
nearby_entities: list[NearbyEntity] = Field(default_factory=list)
inventory_summary: InventorySummary = Field(default_factory=InventorySummary)
active_quests: list[QuestInfo] = Field(default_factory=list)
environment: Environment = Field(default_factory=Environment)
raw_engine_data: dict[str, Any] = Field(
default_factory=dict,
description="Opaque engine-specific blob — not relied upon by heartbeat",
)
class CommandContext(BaseModel):
"""Metadata linking a command to its triggering perception."""
perception_timestamp: datetime | None = Field(
None, description="Timestamp of the perception that triggered this command"
)
heartbeat_cycle: int | None = Field(
None, ge=0, description="Heartbeat cycle number"
)
class CommandInput(BaseModel):
"""Command payload sent via ``POST /command``.
Every command includes a ``reasoning`` field so the command log
captures the agent's intent — critical for training-data export.
"""
protocol_version: str = Field(
default=PROTOCOL_VERSION,
description="Protocol SemVer string",
)
timestamp: datetime = Field(..., description="When the command was issued")
agent_id: str = Field(..., description="Which agent is issuing the command")
command: CommandType = Field(..., description="Command type")
params: dict[str, Any] = Field(
default_factory=dict, description="Command-specific parameters"
)
reasoning: str = Field(
...,
min_length=1,
description="Natural-language explanation of why this command was chosen",
)
episode_id: str | None = Field(
None, description="Groups commands into training episodes"
)
context: CommandContext | None = Field(
None, description="Metadata linking command to its triggering perception"
)


@@ -0,0 +1,243 @@
"""Fine-tuning dataset export pipeline for command log data.
Transforms raw command log entries into structured training datasets
suitable for supervised fine-tuning of local models.
Usage::
from infrastructure.morrowind.training_export import TrainingExporter
exporter = TrainingExporter(command_logger)
stats = exporter.export_chat_format("train.jsonl")
stats = exporter.export_episode_sequences("episodes/", min_length=5)
"""
from __future__ import annotations
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
from .command_log import CommandLogger
logger = logging.getLogger(__name__)
@dataclass
class ExportStats:
"""Statistics about an export run."""
total_records: int = 0
episodes_exported: int = 0
skipped_records: int = 0
output_path: str = ""
format: str = ""
exported_at: str = field(default_factory=lambda: datetime.utcnow().isoformat())
class TrainingExporter:
"""Builds fine-tuning datasets from the command log.
Supports multiple output formats used by common fine-tuning
frameworks (chat-completion style, instruction-following, episode
sequences).
Args:
command_logger: A :class:`CommandLogger` instance to read from.
"""
def __init__(self, command_logger: CommandLogger) -> None:
self._logger = command_logger
# -- Chat-completion format ----------------------------------------------
def export_chat_format(
self,
output_path: str | Path,
*,
since: datetime | None = None,
until: datetime | None = None,
max_records: int | None = None,
) -> ExportStats:
"""Export as chat-completion training pairs.
Each line is a JSON object with ``messages`` list containing a
``system`` prompt, ``user`` (perception), and ``assistant``
(command + reasoning) message.
This format is compatible with OpenAI / Llama fine-tuning APIs.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
rows = self._logger.query(
since=since,
until=until,
limit=max_records or 100_000,
)
# query returns newest-first; reverse for chronological export
rows.reverse()
stats = ExportStats(
output_path=str(output_path),
format="chat_completion",
)
with open(output_path, "w", encoding="utf-8") as fh:
for row in rows:
perception = row.get("perception_snapshot", {})
if not perception:
stats.skipped_records += 1
continue
record = {
"messages": [
{
"role": "system",
"content": (
"You are an autonomous agent navigating a game world. "
"Given a perception of the world state, decide what "
"command to execute and explain your reasoning."
),
},
{
"role": "user",
"content": json.dumps(perception),
},
{
"role": "assistant",
"content": json.dumps(
{
"command": row.get("command"),
"params": row.get("params", {}),
"reasoning": row.get("reasoning", ""),
}
),
},
],
}
fh.write(json.dumps(record) + "\n")
stats.total_records += 1
logger.info(
"Exported %d chat-format records to %s (skipped %d)",
stats.total_records,
output_path,
stats.skipped_records,
)
return stats
# -- Episode sequences ---------------------------------------------------
def export_episode_sequences(
self,
output_dir: str | Path,
*,
min_length: int = 3,
since: datetime | None = None,
until: datetime | None = None,
) -> ExportStats:
"""Export command sequences grouped by episode.
Each episode is written as a separate JSONL file in *output_dir*.
Episodes shorter than *min_length* are skipped.
"""
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# Gather all rows (high limit) and group by episode.
rows = self._logger.query(since=since, until=until, limit=1_000_000)
rows.reverse() # chronological
episodes: dict[str, list[dict[str, Any]]] = {}
for row in rows:
ep_id = row.get("episode_id") or "unknown"
episodes.setdefault(ep_id, []).append(row)
stats = ExportStats(
output_path=str(output_dir),
format="episode_sequence",
)
for ep_id, entries in episodes.items():
if len(entries) < min_length:
stats.skipped_records += len(entries)
continue
ep_file = output_dir / f"{ep_id}.jsonl"
with open(ep_file, "w", encoding="utf-8") as fh:
for entry in entries:
fh.write(json.dumps(entry, default=str) + "\n")
stats.total_records += 1
stats.episodes_exported += 1
logger.info(
"Exported %d episodes (%d records) to %s",
stats.episodes_exported,
stats.total_records,
output_dir,
)
return stats
# -- Instruction-following format ----------------------------------------
def export_instruction_format(
self,
output_path: str | Path,
*,
since: datetime | None = None,
until: datetime | None = None,
max_records: int | None = None,
) -> ExportStats:
"""Export as instruction/response pairs (Alpaca-style).
Each line has ``instruction``, ``input``, and ``output`` fields.
"""
output_path = Path(output_path)
output_path.parent.mkdir(parents=True, exist_ok=True)
rows = self._logger.query(
since=since,
until=until,
limit=max_records or 100_000,
)
rows.reverse()
stats = ExportStats(
output_path=str(output_path),
format="instruction",
)
with open(output_path, "w", encoding="utf-8") as fh:
for row in rows:
perception = row.get("perception_snapshot", {})
if not perception:
stats.skipped_records += 1
continue
record = {
"instruction": (
"Given the following game world perception, decide what "
"command to execute. Explain your reasoning."
),
"input": json.dumps(perception),
"output": json.dumps(
{
"command": row.get("command"),
"params": row.get("params", {}),
"reasoning": row.get("reasoning", ""),
}
),
}
fh.write(json.dumps(record) + "\n")
stats.total_records += 1
        logger.info(
            "Exported %d instruction-format records to %s (skipped %d)",
            stats.total_records,
            output_path,
            stats.skipped_records,
        )
        return stats
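The two flat export formats above differ only in framing: the same command/params/reasoning triple is wrapped either as an assistant chat turn or as an Alpaca-style `output` field. A minimal sketch of one record of each kind, using plain `json` and a hypothetical logged row (field names mirror the exporter code above; the row itself is illustrative, not real data):

```python
import json

# Hypothetical logged row, shaped like the dicts CommandLogger.query() returns.
row = {
    "command": "move_to",
    "params": {"target_x": 1050.0},
    "reasoning": "Moving closer to the quest target.",
    "perception_snapshot": {"location": {"cell": "Balmora"}},
}

# One JSONL line in the chat-completion format (cf. export_chat_format).
chat_record = {
    "messages": [
        {
            "role": "system",
            "content": "You are an autonomous agent navigating a game world.",
        },
        {"role": "user", "content": json.dumps(row["perception_snapshot"])},
        {
            "role": "assistant",
            "content": json.dumps(
                {
                    "command": row["command"],
                    "params": row["params"],
                    "reasoning": row["reasoning"],
                }
            ),
        },
    ]
}

# The same row in the Alpaca-style instruction format (cf. export_instruction_format).
instruction_record = {
    "instruction": "Given the following game world perception, decide what command to execute.",
    "input": json.dumps(row["perception_snapshot"]),
    "output": chat_record["messages"][2]["content"],
}

print(json.dumps(chat_record))
print(json.dumps(instruction_record))
```

Note that the perception and command payloads are JSON-encoded *strings* inside the record, so consumers must decode twice: once for the JSONL line, once for the embedded payload.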

tests/test_command_log.py (new file)

@@ -0,0 +1,266 @@
"""Tests for Morrowind command log and training export pipeline."""
from datetime import UTC, datetime, timedelta
from pathlib import Path
import pytest
from src.infrastructure.morrowind.command_log import CommandLog, CommandLogger
from src.infrastructure.morrowind.schemas import (
CommandInput,
CommandType,
PerceptionOutput,
)
from src.infrastructure.morrowind.training_export import TrainingExporter
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
NOW = datetime(2026, 3, 21, 14, 30, 0, tzinfo=UTC)
def _make_perception(**overrides) -> PerceptionOutput:
defaults = {
"timestamp": NOW,
"agent_id": "timmy",
"location": {"cell": "Balmora", "x": 1024.5, "y": -512.3, "z": 64.0},
"health": {"current": 85, "max": 100},
}
defaults.update(overrides)
return PerceptionOutput(**defaults)
def _make_command(**overrides) -> CommandInput:
defaults = {
"timestamp": NOW,
"agent_id": "timmy",
"command": "move_to",
"params": {"target_x": 1050.0},
"reasoning": "Moving closer to quest target.",
}
defaults.update(overrides)
return CommandInput(**defaults)
@pytest.fixture
def logger(tmp_path: Path) -> CommandLogger:
"""CommandLogger backed by an in-memory SQLite DB."""
db_path = tmp_path / "test.db"
return CommandLogger(db_url=f"sqlite:///{db_path}")
@pytest.fixture
def exporter(logger: CommandLogger) -> TrainingExporter:
return TrainingExporter(logger)
# ---------------------------------------------------------------------------
# CommandLogger — log_command
# ---------------------------------------------------------------------------
class TestLogCommand:
def test_basic_log(self, logger: CommandLogger):
cmd = _make_command()
row_id = logger.log_command(cmd)
assert row_id >= 1
def test_log_with_perception(self, logger: CommandLogger):
cmd = _make_command()
perception = _make_perception()
row_id = logger.log_command(cmd, perception=perception)
assert row_id >= 1
results = logger.query(limit=1)
assert len(results) == 1
assert results[0]["cell"] == "Balmora"
assert results[0]["perception_snapshot"]["location"]["cell"] == "Balmora"
def test_log_with_outcome(self, logger: CommandLogger):
cmd = _make_command()
        logger.log_command(cmd, outcome="success: arrived at destination")
results = logger.query(limit=1)
assert results[0]["outcome"] == "success: arrived at destination"
def test_log_preserves_episode_id(self, logger: CommandLogger):
cmd = _make_command(episode_id="ep_test_001")
logger.log_command(cmd)
results = logger.query(episode_id="ep_test_001")
assert len(results) == 1
assert results[0]["episode_id"] == "ep_test_001"
# ---------------------------------------------------------------------------
# CommandLogger — query
# ---------------------------------------------------------------------------
class TestQuery:
def test_filter_by_command_type(self, logger: CommandLogger):
logger.log_command(_make_command(command="move_to"))
logger.log_command(_make_command(command="noop"))
logger.log_command(_make_command(command="move_to"))
results = logger.query(command_type="move_to")
assert len(results) == 2
assert all(r["command"] == "move_to" for r in results)
def test_filter_by_cell(self, logger: CommandLogger):
p1 = _make_perception(location={"cell": "Balmora", "x": 0, "y": 0, "z": 0})
p2 = _make_perception(location={"cell": "Vivec", "x": 0, "y": 0, "z": 0})
logger.log_command(_make_command(), perception=p1)
logger.log_command(_make_command(), perception=p2)
results = logger.query(cell="Vivec")
assert len(results) == 1
assert results[0]["cell"] == "Vivec"
def test_filter_by_time_range(self, logger: CommandLogger):
t1 = NOW - timedelta(hours=2)
t2 = NOW - timedelta(hours=1)
t3 = NOW
logger.log_command(_make_command(timestamp=t1.isoformat()))
logger.log_command(_make_command(timestamp=t2.isoformat()))
logger.log_command(_make_command(timestamp=t3.isoformat()))
results = logger.query(since=NOW - timedelta(hours=1, minutes=30), until=NOW)
assert len(results) == 2
def test_limit_and_offset(self, logger: CommandLogger):
for i in range(5):
logger.log_command(_make_command())
results = logger.query(limit=2, offset=0)
assert len(results) == 2
results = logger.query(limit=10, offset=3)
assert len(results) == 2
def test_empty_query(self, logger: CommandLogger):
results = logger.query()
assert results == []
# ---------------------------------------------------------------------------
# CommandLogger — export_training_data (JSONL)
# ---------------------------------------------------------------------------
class TestExportTrainingData:
def test_basic_export(self, logger: CommandLogger, tmp_path: Path):
perception = _make_perception()
for _ in range(3):
logger.log_command(_make_command(), perception=perception)
output = tmp_path / "train.jsonl"
count = logger.export_training_data(output)
assert count == 3
assert output.exists()
import json
lines = output.read_text().strip().split("\n")
assert len(lines) == 3
record = json.loads(lines[0])
assert "input" in record
assert "output" in record
assert record["output"]["command"] == "move_to"
def test_export_filter_by_episode(self, logger: CommandLogger, tmp_path: Path):
logger.log_command(_make_command(episode_id="ep_a"), perception=_make_perception())
logger.log_command(_make_command(episode_id="ep_b"), perception=_make_perception())
output = tmp_path / "ep_a.jsonl"
count = logger.export_training_data(output, episode_id="ep_a")
assert count == 1
# ---------------------------------------------------------------------------
# CommandLogger — storage management
# ---------------------------------------------------------------------------
class TestStorageManagement:
def test_count(self, logger: CommandLogger):
assert logger.count() == 0
logger.log_command(_make_command())
logger.log_command(_make_command())
assert logger.count() == 2
def test_rotate_old_entries(self, logger: CommandLogger):
old_time = NOW - timedelta(days=100)
logger.log_command(_make_command(timestamp=old_time.isoformat()))
logger.log_command(_make_command(timestamp=NOW.isoformat()))
deleted = logger.rotate(max_age_days=90)
assert deleted == 1
assert logger.count() == 1
def test_rotate_nothing_to_delete(self, logger: CommandLogger):
logger.log_command(_make_command(timestamp=NOW.isoformat()))
deleted = logger.rotate(max_age_days=1)
assert deleted == 0
# ---------------------------------------------------------------------------
# TrainingExporter — chat format
# ---------------------------------------------------------------------------
class TestTrainingExporterChat:
def test_chat_format_export(
self, logger: CommandLogger, exporter: TrainingExporter, tmp_path: Path
):
perception = _make_perception()
for _ in range(3):
logger.log_command(_make_command(), perception=perception)
output = tmp_path / "chat.jsonl"
stats = exporter.export_chat_format(output)
assert stats.total_records == 3
assert stats.format == "chat_completion"
import json
lines = output.read_text().strip().split("\n")
record = json.loads(lines[0])
assert record["messages"][0]["role"] == "system"
assert record["messages"][1]["role"] == "user"
assert record["messages"][2]["role"] == "assistant"
# ---------------------------------------------------------------------------
# TrainingExporter — episode sequences
# ---------------------------------------------------------------------------
class TestTrainingExporterEpisodes:
def test_episode_export(
self, logger: CommandLogger, exporter: TrainingExporter, tmp_path: Path
):
perception = _make_perception()
for i in range(5):
logger.log_command(
_make_command(episode_id="ep_test"),
perception=perception,
)
output_dir = tmp_path / "episodes"
stats = exporter.export_episode_sequences(output_dir, min_length=3)
assert stats.episodes_exported == 1
assert stats.total_records == 5
assert (output_dir / "ep_test.jsonl").exists()
def test_short_episodes_skipped(
self, logger: CommandLogger, exporter: TrainingExporter, tmp_path: Path
):
perception = _make_perception()
logger.log_command(_make_command(episode_id="short"), perception=perception)
output_dir = tmp_path / "episodes"
stats = exporter.export_episode_sequences(output_dir, min_length=3)
assert stats.episodes_exported == 0
assert stats.skipped_records == 1

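The episode-export tests above hinge on a single grouping rule: rows are bucketed by `episode_id` (falling back to `"unknown"`), and buckets shorter than `min_length` are dropped. A self-contained sketch of that rule with plain dicts, no database (the helper name is illustrative, not the project's API):

```python
from collections import defaultdict


def group_episodes(rows: list[dict], min_length: int = 3) -> dict[str, list[dict]]:
    """Bucket chronological rows by episode_id; drop episodes shorter than min_length."""
    episodes: dict[str, list[dict]] = defaultdict(list)
    for row in rows:
        # Missing or empty episode_id falls back to a shared "unknown" bucket.
        episodes[row.get("episode_id") or "unknown"].append(row)
    return {ep: entries for ep, entries in episodes.items() if len(entries) >= min_length}


rows = [{"episode_id": "ep_test", "command": "move_to"}] * 5 + [{"episode_id": "short"}]
kept = group_episodes(rows)
# "ep_test" (5 entries) survives; "short" (1 entry) falls below min_length.
```

This matches the test expectations: one episode exported with five records, and the single-entry episode counted as skipped.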

@@ -0,0 +1,242 @@
"""Tests for Morrowind Perception/Command protocol Pydantic schemas."""
from datetime import UTC, datetime
import pytest
from pydantic import ValidationError
from src.infrastructure.morrowind.schemas import (
PROTOCOL_VERSION,
CommandContext,
CommandInput,
CommandType,
EntityType,
Environment,
HealthStatus,
InventorySummary,
Location,
NearbyEntity,
PerceptionOutput,
QuestInfo,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
NOW = datetime(2026, 3, 21, 14, 30, 0, tzinfo=UTC)
def _make_perception(**overrides) -> PerceptionOutput:
defaults = {
"timestamp": NOW,
"agent_id": "timmy",
"location": {"cell": "Balmora", "x": 1024.5, "y": -512.3, "z": 64.0, "interior": False},
"health": {"current": 85, "max": 100},
}
defaults.update(overrides)
return PerceptionOutput(**defaults)
def _make_command(**overrides) -> CommandInput:
defaults = {
"timestamp": NOW,
"agent_id": "timmy",
"command": "move_to",
"params": {"target_cell": "Balmora", "target_x": 1050.0},
"reasoning": "Moving closer to the quest target.",
}
defaults.update(overrides)
return CommandInput(**defaults)
# ---------------------------------------------------------------------------
# PerceptionOutput tests
# ---------------------------------------------------------------------------
class TestPerceptionOutput:
def test_minimal_valid(self):
p = _make_perception()
assert p.protocol_version == PROTOCOL_VERSION
assert p.agent_id == "timmy"
assert p.location.cell == "Balmora"
assert p.health.current == 85
assert p.nearby_entities == []
assert p.active_quests == []
def test_full_payload(self):
p = _make_perception(
nearby_entities=[
{
"entity_id": "npc_001",
"name": "Caius Cosades",
"entity_type": "npc",
"distance": 12.5,
"disposition": 65,
}
],
inventory_summary={"gold": 150, "item_count": 23, "encumbrance_pct": 0.45},
active_quests=[{"quest_id": "mq_01", "name": "Report to Caius", "stage": 10}],
environment={
"time_of_day": "afternoon",
"weather": "clear",
"is_combat": False,
"is_dialogue": False,
},
raw_engine_data={"tes3mp_version": "0.8.1"},
)
assert len(p.nearby_entities) == 1
assert p.nearby_entities[0].entity_type == EntityType.NPC
assert p.inventory_summary.gold == 150
assert p.active_quests[0].quest_id == "mq_01"
assert p.raw_engine_data["tes3mp_version"] == "0.8.1"
def test_serialization_roundtrip(self):
p = _make_perception()
json_str = p.model_dump_json()
p2 = PerceptionOutput.model_validate_json(json_str)
assert p2.location.cell == p.location.cell
assert p2.health.current == p.health.current
def test_missing_required_fields(self):
with pytest.raises(ValidationError):
PerceptionOutput(timestamp=NOW, agent_id="timmy") # no location/health
def test_default_protocol_version(self):
p = _make_perception()
assert p.protocol_version == "1.0.0"
# ---------------------------------------------------------------------------
# Health validation
# ---------------------------------------------------------------------------
class TestHealthStatus:
def test_current_cannot_exceed_max(self):
with pytest.raises(ValidationError, match="cannot exceed max"):
HealthStatus(current=150, max=100)
def test_max_must_be_positive(self):
with pytest.raises(ValidationError):
HealthStatus(current=0, max=0)
def test_current_can_be_zero(self):
h = HealthStatus(current=0, max=100)
assert h.current == 0
# ---------------------------------------------------------------------------
# Location
# ---------------------------------------------------------------------------
class TestLocation:
def test_defaults(self):
loc = Location(cell="Seyda Neen", x=0.0, y=0.0)
assert loc.z == 0.0
assert loc.interior is False
# ---------------------------------------------------------------------------
# NearbyEntity
# ---------------------------------------------------------------------------
class TestNearbyEntity:
def test_all_entity_types(self):
for et in EntityType:
e = NearbyEntity(entity_id="e1", name="Test", entity_type=et, distance=1.0)
assert e.entity_type == et
def test_invalid_entity_type(self):
with pytest.raises(ValidationError):
NearbyEntity(entity_id="e1", name="Test", entity_type="dragon", distance=1.0)
def test_negative_distance_rejected(self):
with pytest.raises(ValidationError):
NearbyEntity(entity_id="e1", name="Test", entity_type="npc", distance=-5.0)
# ---------------------------------------------------------------------------
# InventorySummary
# ---------------------------------------------------------------------------
class TestInventorySummary:
def test_encumbrance_bounds(self):
with pytest.raises(ValidationError):
InventorySummary(encumbrance_pct=1.5)
with pytest.raises(ValidationError):
InventorySummary(encumbrance_pct=-0.1)
def test_defaults(self):
inv = InventorySummary()
assert inv.gold == 0
assert inv.item_count == 0
assert inv.encumbrance_pct == 0.0
# ---------------------------------------------------------------------------
# CommandInput tests
# ---------------------------------------------------------------------------
class TestCommandInput:
def test_minimal_valid(self):
c = _make_command()
assert c.command == CommandType.MOVE_TO
assert c.reasoning == "Moving closer to the quest target."
assert c.episode_id is None
def test_all_command_types(self):
for ct in CommandType:
c = _make_command(command=ct.value)
assert c.command == ct
def test_invalid_command_type(self):
with pytest.raises(ValidationError):
_make_command(command="fly_to_moon")
def test_reasoning_required(self):
with pytest.raises(ValidationError):
CommandInput(
timestamp=NOW,
agent_id="timmy",
command="noop",
reasoning="", # min_length=1
)
def test_with_episode_and_context(self):
c = _make_command(
episode_id="ep_001",
context={"perception_timestamp": NOW, "heartbeat_cycle": 42},
)
assert c.episode_id == "ep_001"
assert c.context.heartbeat_cycle == 42
def test_serialization_roundtrip(self):
c = _make_command(episode_id="ep_002")
json_str = c.model_dump_json()
c2 = CommandInput.model_validate_json(json_str)
assert c2.command == c.command
assert c2.episode_id == c.episode_id
# ---------------------------------------------------------------------------
# Enum coverage
# ---------------------------------------------------------------------------
class TestEnums:
def test_entity_type_values(self):
assert set(EntityType) == {"npc", "creature", "item", "door", "container"}
def test_command_type_values(self):
expected = {
"move_to", "interact", "use_item", "wait",
"combat_action", "dialogue", "journal_note", "noop",
}
assert set(CommandType) == expected
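The invariants these schema tests pin down (positive `max`, `0 <= current <= max`) do not depend on Pydantic itself. For readers without the project installed, a stdlib-only sketch of the health invariant using a dataclass with `__post_init__` validation (the class name is illustrative, not the project's model):

```python
from dataclasses import dataclass


@dataclass
class HealthSketch:
    current: int
    max: int

    def __post_init__(self) -> None:
        # Mirrors the cross-field rules the tests above assert.
        if self.max <= 0:
            raise ValueError("max must be positive")
        if not (0 <= self.current <= self.max):
            raise ValueError("current cannot exceed max (and cannot be negative)")


HealthSketch(current=85, max=100)  # valid
HealthSketch(current=0, max=100)   # valid: zero health is allowed
# HealthSketch(current=150, max=100) would raise ValueError
```

Pydantic v2 expresses the same rule with a `model_validator(mode="after")`, which is what the real `HealthStatus` model presumably does given the `"cannot exceed max"` match string in the tests.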