Add pilot.py — Layer 1 deterministic motor control

Behavior tree: approach NPCs, enter doors, explore, unstick.
Optional reflex model integration. Goal file for Timmy.
Trajectory logging to JSONL for DPO training.

Refs: #660, #661
This commit is contained in:
2026-03-27 18:14:42 +00:00
parent 0d64d8e559
commit 8db40621f4

459
morrowind/pilot.py Normal file
View File

@@ -0,0 +1,459 @@
#!/usr/bin/env python3
"""
Morrowind Pilot — Layer 1: Deterministic motor control.
Three-layer architecture:
Layer 3: Timmy (hermes4 14B) → strategy via current_goal.txt
Layer 2: Reflex (1-3B model) → tactics via reflex.py (optional)
Layer 1: Pilot (this script) → motor control, no LLM
Reuses existing perception + action infrastructure from mcp_server.py
and local_brain.py. Runs standalone alongside Hermes TUI.
Usage:
python3 ~/.timmy/morrowind/pilot.py
python3 ~/.timmy/morrowind/pilot.py --interval 0.5 --cycles 0 # 0 = infinite
Logs every perception-action pair to trajectories/ for DPO training.
"""
import argparse
import json
import math
import os
import random
import re
import signal
import sys
import time
# ═══════════════════════════════════════
# CONFIG
# ═══════════════════════════════════════
# OpenMW log location on macOS — perception blocks are appended here by the game side.
OPENMW_LOG = os.path.expanduser("~/Library/Preferences/openmw/openmw.log")
# Working directory shared with the other layers (Timmy, reflex).
MORROWIND_DIR = os.path.expanduser("~/.timmy/morrowind")
# Strategic goal written by Timmy (Layer 3); re-read every cycle.
GOAL_FILE = os.path.join(MORROWIND_DIR, "current_goal.txt")
# Append-only JSONL of situations the pilot escalates to Layer 3.
ESCALATION_FILE = os.path.join(MORROWIND_DIR, "escalation.txt")
# Perception→action records for DPO training (one JSONL file per day).
TRAJECTORY_DIR = os.path.join(MORROWIND_DIR, "trajectories")
# Optional Layer-2 reflex model endpoint (OpenAI-compatible chat completions API).
REFLEX_URL = os.environ.get("REFLEX_URL", "http://localhost:8082/v1/chat/completions")
os.makedirs(TRAJECTORY_DIR, exist_ok=True)

# CGEvent key codes (from local_brain.py) — macOS virtual key codes used by send_key().
KEYCODES = {
    "w": 13, "a": 0, "s": 1, "d": 2,
    "space": 49, "escape": 53, "return": 36,
    "e": 14, "f": 3, "q": 12, "j": 38, "t": 20,
    "f5": 96, "f9": 101,
    "left": 123, "right": 124, "up": 126, "down": 125,
}
# ═══════════════════════════════════════
# PERCEPTION — Reused from mcp_server.py
# ═══════════════════════════════════════
def parse_latest_perception(log_path=None):
    """Parse the most recent perception block from the OpenMW log.

    Args:
        log_path: Optional path override (useful for tests); defaults
            to the module-level OPENMW_LOG location.

    Returns:
        A state dict with scalar fields ("cell", "position", "yaw",
        "health", "magicka", "fatigue", "mode", "game_time" — present
        only when the block contains them) plus list fields "npcs",
        "doors", "items" (always present), or None when the log file
        is missing or contains no complete perception block.
    """
    path = OPENMW_LOG if log_path is None else log_path
    try:
        with open(path, "r") as f:
            content = f.read()
    except FileNotFoundError:
        return None
    # Locate only the LAST complete block by scanning from the end of
    # the file — the log grows without bound over a play session, so
    # regex-matching every historical block each cycle is wasted work.
    start_marker = "=== TIMMY PERCEPTION ==="
    end_marker = "=== END PERCEPTION ==="
    end = content.rfind(end_marker)
    if end == -1:
        return None
    start = content.rfind(start_marker, 0, end)
    if start == -1:
        return None
    block = content[start + len(start_marker):end]
    state = {"npcs": [], "doors": [], "items": []}
    # Prefix → state-key dispatch tables: scalar lines keep the text
    # after the first colon; list lines keep everything after the prefix.
    scalar_fields = {
        "Cell:": "cell", "Pos:": "position", "Yaw:": "yaw",
        "HP:": "health", "MP:": "magicka", "FT:": "fatigue",
        "Mode:": "mode", "Time:": "game_time",
    }
    list_fields = {"NPC:": "npcs", "Door:": "doors", "Item:": "items"}
    for line in block.strip().split("\n"):
        line = line.strip()
        # Drop an OpenMW logger prefix like "[HH:MM:SS I]:\t" if present.
        if "]:\t" in line:
            line = line.split("]:\t", 1)[1]
        for prefix, key in scalar_fields.items():
            if line.startswith(prefix):
                state[key] = line.split(":", 1)[1].strip()
                break
        else:
            for prefix, key in list_fields.items():
                if line.startswith(prefix):
                    state[key].append(line[len(prefix):].strip())
                    break
    return state
# ═══════════════════════════════════════
# ACTIONS — CGEvent keypresses
# ═══════════════════════════════════════
def send_key(keycode, duration=0.0, shift=False):
    """Post a key-down/key-up pair to the system HID event tap.

    Quartz is imported lazily so the module stays importable on hosts
    where only the parsing/logging helpers are needed.
    """
    import Quartz
    modifier_flags = 0
    if shift:
        modifier_flags = Quartz.kCGEventFlagMaskShift
    key_down = Quartz.CGEventCreateKeyboardEvent(None, keycode, True)
    Quartz.CGEventSetFlags(key_down, modifier_flags)
    Quartz.CGEventPost(Quartz.kCGHIDEventTap, key_down)
    # Hold the key for the requested duration before releasing it.
    if duration > 0:
        time.sleep(duration)
    key_up = Quartz.CGEventCreateKeyboardEvent(None, keycode, False)
    Quartz.CGEventSetFlags(key_up, 0)
    Quartz.CGEventPost(Quartz.kCGHIDEventTap, key_up)
def move(direction, duration=1.0, run=False):
    """Hold a movement key for *duration* seconds; shift makes it a run.

    Returns a short string describing the motor command, e.g.
    "move:forward:1.5s:run".
    """
    direction_keys = {
        "forward": "w",
        "backward": "s",
        "left": "a",
        "right": "d",
        "turn_left": "left",
        "turn_right": "right",
    }
    # Unknown directions fall back to walking forward.
    key_name = direction_keys.get(direction, "w")
    send_key(KEYCODES[key_name], duration=duration, shift=run)
    suffix = ":run" if run else ""
    return f"move:{direction}:{duration:.1f}s" + suffix
def activate():
    """Tap the activate key to interact with whatever is targeted."""
    tap_code = KEYCODES["space"]
    send_key(tap_code, duration=0.15)
    return "activate"
def jump():
    """Very short space tap — brief enough to register as a jump."""
    tap_code = KEYCODES["space"]
    send_key(tap_code, duration=0.05)
    return "jump"
def attack():
    """Hold the attack key long enough to swing."""
    attack_code = KEYCODES["f"]
    send_key(attack_code, duration=0.3)
    return "attack"
def quicksave():
    """Tap F5 to quicksave the game."""
    save_code = KEYCODES["f5"]
    send_key(save_code, duration=0.1)
    return "quicksave"
# ═══════════════════════════════════════
# GOAL — Set by Timmy (Layer 3)
# ═══════════════════════════════════════
def read_goal(goal_file=None):
    """Read the current strategic goal written by Timmy (Layer 3).

    Args:
        goal_file: Optional path override (useful for tests); defaults
            to the module-level GOAL_FILE location.

    Returns:
        The stripped goal text, or "explore" when no goal file exists
        yet — exploring is the safe default behavior.
    """
    path = GOAL_FILE if goal_file is None else goal_file
    try:
        with open(path, "r") as f:
            return f.read().strip()
    except FileNotFoundError:
        return "explore"
def escalate(situation):
    """Append a situation report for Timmy (Layer 3) to pick up.

    Each entry is a single JSON line in ESCALATION_FILE carrying both
    a unix timestamp and a human-readable ISO string.
    """
    record = {
        "situation": situation,
        "timestamp": time.time(),
        "iso": time.strftime("%Y-%m-%dT%H:%M:%S"),
    }
    with open(ESCALATION_FILE, "a") as handle:
        handle.write(json.dumps(record) + "\n")
    # Console breadcrumb, truncated so long situations stay on one line.
    print(f" ⬆ ESCALATED: {situation[:80]}")
# ═══════════════════════════════════════
# REFLEX — Optional Layer 2 (1-3B model)
# ═══════════════════════════════════════
def ask_reflex(perception, goal):
    """Ask the optional Layer-2 reflex model for a tactical action.

    Returns (action, "reflex") when the model's reply mentions one of
    the known actions, otherwise (None, None) so the caller falls back
    to the deterministic behavior tree. Any failure (server down,
    timeout, malformed response) is treated as "no answer".
    """
    known_actions = [
        "explore", "approach_npc", "enter_door", "pickup_item",
        "fight", "flee", "activate", "wait",
    ]
    try:
        import requests
        prompt = (
            f"Game: {perception.get('cell', '?')}. "
            f"NPCs: {perception.get('npcs', [])}. "
            f"Doors: {perception.get('doors', [])}. "
            f"Items: {perception.get('items', [])}. "
            f"Goal: {goal}. "
            f"Pick ONE: explore, approach_npc, enter_door, pickup_item, fight, flee, activate, wait"
        )
        payload = {
            "model": "reflex",
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 10,
            "temperature": 0.3,
        }
        resp = requests.post(REFLEX_URL, json=payload, timeout=2)
        if resp.status_code == 200:
            answer = resp.json()["choices"][0]["message"]["content"].strip().lower()
            # First known action mentioned anywhere in the reply wins.
            for candidate in known_actions:
                if candidate in answer:
                    return candidate, "reflex"
    except Exception:
        pass  # Reflex not running — fall through to behavior tree
    return None, None
# ═══════════════════════════════════════
# BEHAVIOR TREE — Deterministic fallback
# ═══════════════════════════════════════
def parse_distance(entity_str):
    """Extract the numeric distance from a string like 'Jobasha (dist:145.2)'.

    Returns 9999.0 (effectively "very far") when no dist tag is found,
    so untagged entities sort last under min(..., key=parse_distance).
    """
    match = re.search(r"dist:([\d.]+)", entity_str)
    if match is None:
        return 9999.0
    return float(match.group(1))
def _nearest(entities):
    """Return (entity_string, distance) for the closest entity by dist tag."""
    closest = min(entities, key=parse_distance)
    return closest, parse_distance(closest)


def decide(perception, goal, history):
    """Deterministic behavior tree. Returns (action_name, action_func).

    Priority order: interact with nearby NPCs, pick up nearby items,
    enter nearby doors, unstick when the position hasn't changed, and
    otherwise explore with randomized walking/turning/running. The
    returned action_func is a zero-argument callable that performs the
    motor commands when invoked.

    Args:
        perception: Parsed state dict from parse_latest_perception().
        goal: Current strategic goal string (informational; the tree
            itself does not branch on it).
        history: List of {"position", "cycle"} dicts, most recent last,
            used for stuck detection.
    """
    npcs = perception.get("npcs", [])
    doors = perception.get("doors", [])
    items = perception.get("items", [])

    # Stuck detection — identical non-empty position for 4+ cycles.
    positions = [h.get("position", "") for h in history[-4:]]
    stuck = len(positions) >= 4 and len(set(positions)) == 1 and positions[0] != ""

    # Priority 1: If there's an NPC nearby, approach and activate.
    if npcs:
        closest, dist = _nearest(npcs)
        name = closest.split("(")[0].strip()
        if dist < 150:
            # Close enough to interact — tell Layer 3 a conversation
            # opportunity is in front of us.
            escalate(f"NPC '{name}' is right in front of us (dist:{dist:.0f}). Approaching to interact.")
            return "approach_npc", lambda: activate()
        elif dist < 500:
            return "approach_npc", lambda: move("forward", duration=min(dist / 200, 3.0))

    # Priority 2: If there's an item nearby, approach and pick up.
    if items:
        _, dist = _nearest(items)
        if dist < 200:
            return "pickup_item", lambda: activate()
        elif dist < 500:
            return "pickup_item", lambda: move("forward", duration=min(dist / 200, 2.0))

    # Priority 3: If there's a door nearby, go through it.
    if doors:
        _, dist = _nearest(doors)
        if dist < 200:
            return "enter_door", lambda: activate()
        elif dist < 600:
            return "enter_door", lambda: move("forward", duration=min(dist / 200, 3.0))

    # Priority 4: If stuck, turn a random amount and push forward.
    if stuck:
        direction = random.choice(["turn_left", "turn_right"])
        duration = random.uniform(0.5, 1.5)
        return "unstick", lambda: (move(direction, duration), move("forward", 2.0))

    # Priority 5: Explore — mostly walk forward, sometimes turn or run,
    # occasionally quicksave.
    r = random.random()
    if r < 0.65:
        # Walk forward
        duration = random.uniform(1.5, 4.0)
        return "explore", lambda: move("forward", duration)
    elif r < 0.80:
        # Turn and walk
        direction = random.choice(["turn_left", "turn_right"])
        return "explore", lambda: (move(direction, random.uniform(0.3, 1.0)), move("forward", random.uniform(1.0, 3.0)))
    elif r < 0.90:
        # Run forward (cover ground faster)
        return "explore", lambda: move("forward", random.uniform(2.0, 5.0), run=True)
    else:
        # Quicksave periodically
        return "quicksave", lambda: quicksave()
# ═══════════════════════════════════════
# TRAJECTORY LOGGING
# ═══════════════════════════════════════
def log_trajectory(cycle, perception, goal, action_name, source, latency_ms, out_dir=None):
    """Append one perception→action record to today's JSONL trajectory file.

    Args:
        cycle: Loop-iteration counter from main().
        perception: Parsed state dict from parse_latest_perception().
        goal: Current strategic goal string.
        action_name: Name of the action that was executed.
        source: Which layer picked the action ("pilot" | "reflex" | "timmy").
        latency_ms: Perceive→act latency for this cycle, in milliseconds.
        out_dir: Optional directory override (useful for tests);
            defaults to the module-level TRAJECTORY_DIR.
    """
    directory = TRAJECTORY_DIR if out_dir is None else out_dir
    # One file per day, named YYYYMMDD.jsonl.
    today = time.strftime("%Y%m%d")
    path = os.path.join(directory, f"{today}.jsonl")
    entry = {
        "cycle": cycle,
        "timestamp": time.time(),
        "iso": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "layer": source,  # "pilot" | "reflex" | "timmy"
        "perception": {
            "cell": perception.get("cell", "?"),
            "position": perception.get("position", "?"),
            "yaw": perception.get("yaw", "?"),
            "health": perception.get("health", "?"),
            "npcs": perception.get("npcs", []),
            "doors": perception.get("doors", []),
            "items": perception.get("items", []),
            "mode": perception.get("mode", "?"),
        },
        "goal": goal,
        "action": action_name,
        "latency_ms": latency_ms,
    }
    with open(path, "a") as f:
        f.write(json.dumps(entry) + "\n")
# ═══════════════════════════════════════
# MAIN LOOP
# ═══════════════════════════════════════
# Global run flag — flipped to False by the SIGINT handler so the main
# loop can finish its current cycle and exit cleanly.
running = True
def handle_sigint(sig, frame):
    """SIGINT (Ctrl+C) handler: request a graceful stop of the main loop."""
    global running
    print("\n\n🛑 Pilot stopping gracefully...")
    running = False
signal.signal(signal.SIGINT, handle_sigint)
def main():
    """Run the perceive→decide→act loop until SIGINT or --cycles is hit.

    Each cycle: parse the latest perception block from the OpenMW log,
    read the current goal from Timmy, pick an action (reflex model
    first when --reflex is set, behavior tree otherwise), execute it,
    and append a trajectory record for later DPO training.
    """
    parser = argparse.ArgumentParser(description="Morrowind Pilot — Layer 1 Motor Control")
    parser.add_argument("--interval", type=float, default=0.5, help="Seconds between cycles (default: 0.5)")
    parser.add_argument("--cycles", type=int, default=0, help="Max cycles, 0=infinite (default: 0)")
    parser.add_argument("--reflex", action="store_true", help="Try reflex model (layer 2) before behavior tree")
    parser.add_argument("--quiet", action="store_true", help="Minimal output")
    args = parser.parse_args()
    # Startup banner: current configuration and the file-based control surface.
    print("═══════════════════════════════════════")
    print(" MORROWIND PILOT — Layer 1")
    print(" Motor control + trajectory logging")
    print(f" Interval: {args.interval}s")
    print(f" Reflex: {'enabled' if args.reflex else 'disabled (behavior tree only)'}")
    print(f" Goal: {read_goal()}")
    print(f" Logs: {TRAJECTORY_DIR}/")
    print("═══════════════════════════════════════")
    print()
    print(" Ctrl+C to stop cleanly")
    print(" Write goals to: ~/.timmy/morrowind/current_goal.txt")
    print(" Escalations at: ~/.timmy/morrowind/escalation.txt")
    print()
    history = []  # recent {"position", "cycle"} entries for stuck detection
    cycle = 0
    autosave_counter = 0
    while running:
        cycle += 1
        if args.cycles > 0 and cycle > args.cycles:
            break
        t0 = time.time()
        # Perceive
        perception = parse_latest_perception()
        if not perception:
            if not args.quiet:
                print(f" [{cycle}] No perception data. Game running?")
            # Back off harder while the game/log isn't producing data.
            time.sleep(args.interval * 2)
            continue
        # Read goal from Timmy
        goal = read_goal()
        # Track history for stuck detection (bounded to the last 10 cycles)
        history.append({"position": perception.get("position", ""), "cycle": cycle})
        if len(history) > 10:
            history.pop(0)
        # Decide — try reflex first, fall back to behavior tree
        source = "pilot"
        if args.reflex:
            reflex_action, reflex_source = ask_reflex(perception, goal)
            if reflex_action:
                action_name = reflex_action
                source = "reflex"
                # Map reflex action names to concrete motor commands
                action_map = {
                    "explore": lambda: move("forward", random.uniform(1.5, 3.0)),
                    "approach_npc": lambda: move("forward", 1.5),
                    "enter_door": lambda: (move("forward", 1.0), activate()),
                    "pickup_item": lambda: activate(),
                    "fight": lambda: attack(),
                    "flee": lambda: (move("turn_left", 1.0), move("forward", 3.0, run=True)),
                    "activate": lambda: activate(),
                    "wait": lambda: None,
                }
                action_fn = action_map.get(reflex_action, lambda: move("forward", 1.0))
            else:
                action_name, action_fn = decide(perception, goal, history)
        else:
            action_name, action_fn = decide(perception, goal, history)
        # Execute — the motor side effects happen here; the return value is unused.
        result = action_fn()
        latency_ms = int((time.time() - t0) * 1000)
        # Log
        if not args.quiet:
            cell = perception.get("cell", "?")[:25]
            n_npcs = len(perception.get("npcs", []))
            n_doors = len(perception.get("doors", []))
            n_items = len(perception.get("items", []))
            print(f" [{cycle}] {cell} | {action_name} ({source}) | NPCs:{n_npcs} Doors:{n_doors} Items:{n_items} | {latency_ms}ms")
        log_trajectory(cycle, perception, goal, action_name, source, latency_ms)
        # Autosave every 100 cycles (in addition to random quicksaves from decide())
        autosave_counter += 1
        if autosave_counter >= 100:
            quicksave()
            autosave_counter = 0
            if not args.quiet:
                print(f" [{cycle}] ⚡ Autosaved")
        # Wait out the remainder of the cycle interval
        elapsed = time.time() - t0
        sleep_time = max(0, args.interval - elapsed)
        time.sleep(sleep_time)
    # Final stats — count trajectories logged to today's file
    today = time.strftime("%Y%m%d")
    traj_file = os.path.join(TRAJECTORY_DIR, f"{today}.jsonl")
    traj_count = 0
    if os.path.exists(traj_file):
        with open(traj_file) as f:
            traj_count = sum(1 for _ in f)
    print(f"\n═══════════════════════════════════════")
    print(f" Pilot stopped after {cycle} cycles")
    print(f" Trajectories logged: {traj_count}")
    print(f" Log: {traj_file}")
    print(f"═══════════════════════════════════════")
# Script entry point — run the pilot loop.
if __name__ == "__main__":
    main()