Behavior tree: approach NPCs, enter doors, explore, unstick. Optional reflex-model integration, goal file for Timmy, and trajectory logging to JSONL for DPO training. Refs: #660, #661
#!/usr/bin/env python3
|
|
"""
|
|
Morrowind Pilot — Layer 1: Deterministic motor control.
|
|
|
|
Three-layer architecture:
|
|
Layer 3: Timmy (hermes4 14B) → strategy via current_goal.txt
|
|
Layer 2: Reflex (1-3B model) → tactics via reflex.py (optional)
|
|
Layer 1: Pilot (this script) → motor control, no LLM
|
|
|
|
Reuses existing perception + action infrastructure from mcp_server.py
|
|
and local_brain.py. Runs standalone alongside Hermes TUI.
|
|
|
|
Usage:
|
|
python3 ~/.timmy/morrowind/pilot.py
|
|
python3 ~/.timmy/morrowind/pilot.py --interval 0.5 --cycles 0 # 0 = infinite
|
|
|
|
Logs every perception-action pair to trajectories/ for DPO training.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import math
|
|
import os
|
|
import random
|
|
import re
|
|
import signal
|
|
import sys
|
|
import time
|
|
|
|
# ═══════════════════════════════════════
# CONFIG
# ═══════════════════════════════════════

# Perception source: OpenMW writes structured "TIMMY PERCEPTION" blocks here.
OPENMW_LOG = os.path.expanduser("~/Library/Preferences/openmw/openmw.log")
# Working directory shared with the other layers (goals, escalations, logs).
MORROWIND_DIR = os.path.expanduser("~/.timmy/morrowind")
GOAL_FILE = os.path.join(MORROWIND_DIR, "current_goal.txt")
ESCALATION_FILE = os.path.join(MORROWIND_DIR, "escalation.txt")
TRAJECTORY_DIR = os.path.join(MORROWIND_DIR, "trajectories")
# Layer-2 reflex model endpoint (OpenAI-compatible chat completions API).
REFLEX_URL = os.environ.get("REFLEX_URL", "http://localhost:8082/v1/chat/completions")

os.makedirs(TRAJECTORY_DIR, exist_ok=True)

# macOS CGEvent virtual key codes (mirrored from local_brain.py).
KEYCODES = {
    "w": 13,
    "a": 0,
    "s": 1,
    "d": 2,
    "space": 49,
    "escape": 53,
    "return": 36,
    "e": 14,
    "f": 3,
    "q": 12,
    "j": 38,
    "t": 20,
    "f5": 96,
    "f9": 101,
    "left": 123,
    "right": 124,
    "up": 126,
    "down": 125,
}
|
|
|
|
# ═══════════════════════════════════════
|
|
# PERCEPTION — Reused from mcp_server.py
|
|
# ═══════════════════════════════════════
|
|
|
|
def parse_latest_perception(log_path=None):
    """Parse the most recent perception block from the OpenMW log.

    Args:
        log_path: Log file to scan; defaults to OPENMW_LOG. Parameterized
            so the parser can be exercised against any log file.

    Returns:
        A dict always containing "npcs"/"doors"/"items" lists, plus any
        scalar fields present in the block (cell, position, yaw, health,
        magicka, fatigue, mode, game_time). Returns None when the log is
        missing or contains no perception block yet.
    """
    if log_path is None:
        log_path = OPENMW_LOG
    try:
        with open(log_path, "r") as f:
            content = f.read()
    except FileNotFoundError:
        return None

    blocks = re.findall(
        r"=== TIMMY PERCEPTION ===(.*?)=== END PERCEPTION ===",
        content, re.DOTALL
    )
    if not blocks:
        return None

    # Line-prefix dispatch tables replace the long elif chain.
    scalar_fields = {
        "Cell:": "cell",
        "Pos:": "position",
        "Yaw:": "yaw",
        "HP:": "health",
        "MP:": "magicka",
        "FT:": "fatigue",
        "Mode:": "mode",
        "Time:": "game_time",
    }
    list_fields = {
        "NPC:": "npcs",
        "Door:": "doors",
        "Item:": "items",
    }

    state = {"npcs": [], "doors": [], "items": []}

    # Only the last (most recent) block matters.
    for line in blocks[-1].strip().split("\n"):
        line = line.strip()
        # Strip the OpenMW log prefix ("[timestamp]:\t") when present.
        if "]:\t" in line:
            line = line.split("]:\t", 1)[1]

        for prefix, key in list_fields.items():
            if line.startswith(prefix):
                state[key].append(line[len(prefix):].strip())
                break
        else:
            for prefix, key in scalar_fields.items():
                if line.startswith(prefix):
                    # Value is everything after the first colon.
                    state[key] = line.split(":", 1)[1].strip()
                    break

    return state
|
|
|
|
|
|
# ═══════════════════════════════════════
|
|
# ACTIONS — CGEvent keypresses
|
|
# ═══════════════════════════════════════
|
|
|
|
def send_key(keycode, duration=0.0, shift=False):
    """Post a key-down/key-up pair to the OS via Quartz CGEvent.

    Args:
        keycode: macOS virtual key code (see KEYCODES).
        duration: Seconds to hold the key down before releasing.
        shift: Hold the shift modifier during the key-down event.
    """
    import Quartz

    modifier = Quartz.kCGEventFlagMaskShift if shift else 0

    # Press...
    press = Quartz.CGEventCreateKeyboardEvent(None, keycode, True)
    Quartz.CGEventSetFlags(press, modifier)
    Quartz.CGEventPost(Quartz.kCGHIDEventTap, press)

    # ...hold for the requested duration...
    if duration > 0:
        time.sleep(duration)

    # ...then release with modifiers cleared.
    release = Quartz.CGEventCreateKeyboardEvent(None, keycode, False)
    Quartz.CGEventSetFlags(release, 0)
    Quartz.CGEventPost(Quartz.kCGHIDEventTap, release)
|
|
|
|
|
|
def move(direction, duration=1.0, run=False):
    """Move or turn the character by holding a movement key.

    Unknown directions fall back to walking forward. Returns a short
    descriptor string (e.g. "move:forward:1.5s:run") for logging.
    """
    direction_keys = {
        "forward": "w",
        "backward": "s",
        "left": "a",
        "right": "d",
        "turn_left": "left",
        "turn_right": "right",
    }
    key_name = direction_keys.get(direction, "w")
    send_key(KEYCODES[key_name], duration=duration, shift=run)
    suffix = ":run" if run else ""
    return f"move:{direction}:{duration:.1f}s{suffix}"
|
|
|
|
|
|
def activate():
    """Press the interact key (space) to use whatever is targeted.

    Returns the action descriptor string for logging.
    """
    send_key(KEYCODES["space"], duration=0.15)
    return "activate"
|
|
|
|
|
|
def jump():
    """Tap the space key briefly to jump. Returns the descriptor string."""
    send_key(KEYCODES["space"], duration=0.05)
    return "jump"
|
|
|
|
|
|
def attack():
    """Hold the attack key for a full swing. Returns the descriptor string."""
    send_key(KEYCODES["f"], duration=0.3)
    return "attack"
|
|
|
|
|
|
def quicksave():
    """Trigger the game's quicksave (F5). Returns the descriptor string."""
    send_key(KEYCODES["f5"], duration=0.1)
    return "quicksave"
|
|
|
|
|
|
# ═══════════════════════════════════════
|
|
# GOAL — Set by Timmy (Layer 3)
|
|
# ═══════════════════════════════════════
|
|
|
|
def read_goal(path=None):
    """Read the current strategic goal written by Timmy (Layer 3).

    Args:
        path: Goal file to read; defaults to GOAL_FILE.

    Returns:
        The stripped goal text. Falls back to "explore" when the file is
        missing OR empty, so the pilot always has an actionable goal
        (the original returned "" for an empty file).
    """
    if path is None:
        path = GOAL_FILE
    try:
        with open(path, "r") as f:
            goal = f.read().strip()
    except FileNotFoundError:
        return "explore"
    return goal or "explore"
|
|
|
|
|
|
def escalate(situation):
    """Append a situation report for Timmy (Layer 3) to pick up.

    Each line in the escalation file is a standalone JSON object with
    the situation text and both epoch and ISO timestamps.
    """
    record = {
        "situation": situation,
        "timestamp": time.time(),
        "iso": time.strftime("%Y-%m-%dT%H:%M:%S"),
    }
    with open(ESCALATION_FILE, "a") as f:
        f.write(json.dumps(record) + "\n")
    print(f" ⬆ ESCALATED: {situation[:80]}")
|
|
|
|
|
|
# ═══════════════════════════════════════
|
|
# REFLEX — Optional Layer 2 (1-3B model)
|
|
# ═══════════════════════════════════════
|
|
|
|
def ask_reflex(perception, goal):
    """Query the reflex model (Layer 2) for a tactical decision.

    Returns (action, "reflex") when the model answers with a recognized
    action keyword, or (None, None) when the model is unreachable or
    unhelpful — the caller then falls back to the behavior tree.
    """
    known_actions = ("explore", "approach_npc", "enter_door", "pickup_item",
                     "fight", "flee", "activate", "wait")
    try:
        import requests

        prompt = (
            f"Game: {perception.get('cell', '?')}. "
            f"NPCs: {perception.get('npcs', [])}. "
            f"Doors: {perception.get('doors', [])}. "
            f"Items: {perception.get('items', [])}. "
            f"Goal: {goal}. "
            f"Pick ONE: explore, approach_npc, enter_door, pickup_item, fight, flee, activate, wait"
        )
        payload = {
            "model": "reflex",
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 10,
            "temperature": 0.3,
        }
        resp = requests.post(REFLEX_URL, json=payload, timeout=2)
        if resp.status_code == 200:
            text = resp.json()["choices"][0]["message"]["content"].strip().lower()
            # First recognized action mentioned in the reply wins.
            for action in known_actions:
                if action in text:
                    return action, "reflex"
    except Exception:
        pass  # Reflex not running — fall through to behavior tree

    return None, None
|
|
|
|
|
|
# ═══════════════════════════════════════
|
|
# BEHAVIOR TREE — Deterministic fallback
|
|
# ═══════════════════════════════════════
|
|
|
|
# Compiled once at import time; parse_distance runs for every entity on
# every cycle.
_DIST_RE = re.compile(r'dist:([\d.]+)')


def parse_distance(entity_str):
    """Extract the distance from an entity string like 'Jobasha (dist:145.2)'.

    Returns 9999.0 (treated as "very far") when no distance is present
    or the captured text is not a valid number.
    """
    m = _DIST_RE.search(entity_str)
    if not m:
        return 9999.0
    try:
        return float(m.group(1))
    except ValueError:
        # [\d.]+ also matches dot-only garbage such as "dist:..." — the
        # original crashed with ValueError here; treat it as unparseable.
        return 9999.0
|
|
|
|
|
|
def _closest_entity(entities):
    """Return (entity_string, distance) for the nearest entry in *entities*."""
    # Computes parse_distance once per entity plus once for the winner;
    # the original re-parsed the winner a second time at every call site.
    closest = min(entities, key=parse_distance)
    return closest, parse_distance(closest)


def decide(perception, goal, history):
    """Deterministic behavior tree. Returns (action_name, action_callable).

    Priority order: nearby NPC > nearby item > nearby door > unstick >
    random exploration. *history* is a list of recent {"position": ...}
    dicts used for stuck detection; *goal* is informational for now.
    """
    npcs = perception.get("npcs", [])
    doors = perception.get("doors", [])
    items = perception.get("items", [])

    # Stuck detection — identical non-empty position for 4+ cycles.
    positions = [h.get("position", "") for h in history[-4:]]
    stuck = len(positions) >= 4 and len(set(positions)) == 1 and positions[0] != ""

    # Priority 1: If there's an NPC nearby, approach and activate.
    if npcs:
        closest, dist = _closest_entity(npcs)
        name = closest.split("(")[0].strip()
        if dist < 150:
            # Close enough to interact — tell Timmy (Layer 3) about it.
            escalate(f"NPC '{name}' is right in front of us (dist:{dist:.0f}). Approaching to interact.")
            return "approach_npc", lambda: activate()
        elif dist < 500:
            return "approach_npc", lambda: move("forward", duration=min(dist / 200, 3.0))

    # Priority 2: If there's an item nearby, approach and pick up.
    if items:
        _, dist = _closest_entity(items)
        if dist < 200:
            return "pickup_item", lambda: activate()
        elif dist < 500:
            return "pickup_item", lambda: move("forward", duration=min(dist / 200, 2.0))

    # Priority 3: If there's a door nearby, go through it.
    if doors:
        _, dist = _closest_entity(doors)
        if dist < 200:
            return "enter_door", lambda: activate()
        elif dist < 600:
            return "enter_door", lambda: move("forward", duration=min(dist / 200, 3.0))

    # Priority 4: If stuck, turn a random amount and push forward.
    if stuck:
        direction = random.choice(["turn_left", "turn_right"])
        duration = random.uniform(0.5, 1.5)
        return "unstick", lambda: (move(direction, duration), move("forward", 2.0))

    # Priority 5: Explore — mostly forward, with occasional turns/runs/saves.
    r = random.random()
    if r < 0.65:
        # Walk forward.
        duration = random.uniform(1.5, 4.0)
        return "explore", lambda: move("forward", duration)
    elif r < 0.80:
        # Turn, then walk.
        direction = random.choice(["turn_left", "turn_right"])
        return "explore", lambda: (move(direction, random.uniform(0.3, 1.0)), move("forward", random.uniform(1.0, 3.0)))
    elif r < 0.90:
        # Run forward to cover ground faster.
        return "explore", lambda: move("forward", random.uniform(2.0, 5.0), run=True)
    else:
        # Quicksave periodically so exploration progress is never lost.
        return "quicksave", lambda: quicksave()
|
|
|
|
|
|
# ═══════════════════════════════════════
|
|
# TRAJECTORY LOGGING
|
|
# ═══════════════════════════════════════
|
|
|
|
def log_trajectory(cycle, perception, goal, action_name, source, latency_ms):
    """Append one perception→action record to today's JSONL trajectory file.

    These records are the raw material for DPO training; one file per
    calendar day, one JSON object per line.
    """
    path = os.path.join(TRAJECTORY_DIR, time.strftime("%Y%m%d") + ".jsonl")

    # Key order is preserved from the original so log lines stay
    # byte-compatible for downstream tooling.
    snapshot = {
        "cell": perception.get("cell", "?"),
        "position": perception.get("position", "?"),
        "yaw": perception.get("yaw", "?"),
        "health": perception.get("health", "?"),
        "npcs": perception.get("npcs", []),
        "doors": perception.get("doors", []),
        "items": perception.get("items", []),
        "mode": perception.get("mode", "?"),
    }
    record = {
        "cycle": cycle,
        "timestamp": time.time(),
        "iso": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "layer": source,  # "pilot" | "reflex" | "timmy"
        "perception": snapshot,
        "goal": goal,
        "action": action_name,
        "latency_ms": latency_ms,
    }

    with open(path, "a") as f:
        f.write(json.dumps(record) + "\n")
|
|
|
|
|
|
# ═══════════════════════════════════════
# MAIN LOOP
# ═══════════════════════════════════════

# Loop flag flipped by the SIGINT handler so the main loop can finish
# its current cycle and shut down cleanly.
running = True


def handle_sigint(sig, frame):
    """SIGINT handler: request a graceful stop instead of dying mid-action."""
    global running
    print("\n\n🛑 Pilot stopping gracefully...")
    running = False


# Installed at import time so Ctrl+C is caught even before main() starts.
signal.signal(signal.SIGINT, handle_sigint)
|
|
|
|
|
|
def main():
    """Entry point: perceive → decide → act → log, once per interval.

    Runs until SIGINT flips `running`, or --cycles is exhausted. Each
    cycle reads the latest perception block, consults the reflex model
    (if --reflex) or the behavior tree, executes the chosen motor
    action, and appends a trajectory record for DPO training.
    """
    parser = argparse.ArgumentParser(description="Morrowind Pilot — Layer 1 Motor Control")
    parser.add_argument("--interval", type=float, default=0.5, help="Seconds between cycles (default: 0.5)")
    parser.add_argument("--cycles", type=int, default=0, help="Max cycles, 0=infinite (default: 0)")
    parser.add_argument("--reflex", action="store_true", help="Try reflex model (layer 2) before behavior tree")
    parser.add_argument("--quiet", action="store_true", help="Minimal output")
    args = parser.parse_args()

    print("═══════════════════════════════════════")
    print(" MORROWIND PILOT — Layer 1")
    print(" Motor control + trajectory logging")
    print(f" Interval: {args.interval}s")
    print(f" Reflex: {'enabled' if args.reflex else 'disabled (behavior tree only)'}")
    print(f" Goal: {read_goal()}")
    print(f" Logs: {TRAJECTORY_DIR}/")
    print("═══════════════════════════════════════")
    print()
    print(" Ctrl+C to stop cleanly")
    print(" Write goals to: ~/.timmy/morrowind/current_goal.txt")
    print(" Escalations at: ~/.timmy/morrowind/escalation.txt")
    print()

    # Motor mapping for reflex (Layer 2) decisions. Built once here —
    # the original rebuilt this dict inside the loop on every cycle.
    reflex_action_map = {
        "explore": lambda: move("forward", random.uniform(1.5, 3.0)),
        "approach_npc": lambda: move("forward", 1.5),
        "enter_door": lambda: (move("forward", 1.0), activate()),
        "pickup_item": lambda: activate(),
        "fight": lambda: attack(),
        "flee": lambda: (move("turn_left", 1.0), move("forward", 3.0, run=True)),
        "activate": lambda: activate(),
        "wait": lambda: None,
    }

    history = []
    cycle = 0
    autosave_counter = 0

    while running:
        cycle += 1
        if args.cycles > 0 and cycle > args.cycles:
            break

        t0 = time.time()

        # Perceive
        perception = parse_latest_perception()
        if not perception:
            if not args.quiet:
                print(f" [{cycle}] No perception data. Game running?")
            # Back off a little harder while the game isn't producing data.
            time.sleep(args.interval * 2)
            continue

        # Read goal from Timmy (Layer 3)
        goal = read_goal()

        # Track recent positions for stuck detection (bounded to 10).
        history.append({"position": perception.get("position", ""), "cycle": cycle})
        if len(history) > 10:
            history.pop(0)

        # Decide — try reflex first, fall back to behavior tree.
        source = "pilot"
        action_fn = None
        action_name = None
        if args.reflex:
            reflex_action, reflex_source = ask_reflex(perception, goal)
            if reflex_action:
                action_name = reflex_action
                # Use the tag ask_reflex returned (the original discarded
                # it and hard-coded "reflex").
                source = reflex_source
                action_fn = reflex_action_map.get(reflex_action, lambda: move("forward", 1.0))
        if action_fn is None:
            action_name, action_fn = decide(perception, goal, history)

        # Execute (return value is only a descriptor string; not needed here)
        action_fn()

        latency_ms = int((time.time() - t0) * 1000)

        # Log
        if not args.quiet:
            cell = perception.get("cell", "?")[:25]
            n_npcs = len(perception.get("npcs", []))
            n_doors = len(perception.get("doors", []))
            n_items = len(perception.get("items", []))
            print(f" [{cycle}] {cell} | {action_name} ({source}) | NPCs:{n_npcs} Doors:{n_doors} Items:{n_items} | {latency_ms}ms")

        log_trajectory(cycle, perception, goal, action_name, source, latency_ms)

        # Autosave every 100 cycles
        autosave_counter += 1
        if autosave_counter >= 100:
            quicksave()
            autosave_counter = 0
            if not args.quiet:
                print(f" [{cycle}] ⚡ Autosaved")

        # Sleep out whatever remains of this cycle's interval.
        elapsed = time.time() - t0
        time.sleep(max(0, args.interval - elapsed))

    # Final stats
    today = time.strftime("%Y%m%d")
    traj_file = os.path.join(TRAJECTORY_DIR, f"{today}.jsonl")
    traj_count = 0
    if os.path.exists(traj_file):
        with open(traj_file) as f:
            traj_count = sum(1 for _ in f)

    print(f"\n═══════════════════════════════════════")
    print(f" Pilot stopped after {cycle} cycles")
    print(f" Trajectories logged: {traj_count}")
    print(f" Log: {traj_file}")
    print(f"═══════════════════════════════════════")


if __name__ == "__main__":
    main()
|