From 8db40621f4e7efc85dc327e5cefee1bd7858263a Mon Sep 17 00:00:00 2001 From: Perplexity Computer Date: Fri, 27 Mar 2026 18:14:42 +0000 Subject: [PATCH] =?UTF-8?q?Add=20pilot.py=20=E2=80=94=20Layer=201=20determ?= =?UTF-8?q?inistic=20motor=20control?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Behavior tree: approach NPCs, enter doors, explore, unstick. Optional reflex model integration. Goal file for Timmy. Trajectory logging to JSONL for DPO training. Refs: #660, #661 --- morrowind/pilot.py | 459 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 459 insertions(+) create mode 100644 morrowind/pilot.py diff --git a/morrowind/pilot.py b/morrowind/pilot.py new file mode 100644 index 0000000..ee3f3d7 --- /dev/null +++ b/morrowind/pilot.py @@ -0,0 +1,459 @@ +#!/usr/bin/env python3 +""" +Morrowind Pilot — Layer 1: Deterministic motor control. + +Three-layer architecture: + Layer 3: Timmy (hermes4 14B) → strategy via current_goal.txt + Layer 2: Reflex (1-3B model) → tactics via reflex.py (optional) + Layer 1: Pilot (this script) → motor control, no LLM + +Reuses existing perception + action infrastructure from mcp_server.py +and local_brain.py. Runs standalone alongside Hermes TUI. + +Usage: + python3 ~/.timmy/morrowind/pilot.py + python3 ~/.timmy/morrowind/pilot.py --interval 0.5 --cycles 0 # 0 = infinite + +Logs every perception-action pair to trajectories/ for DPO training. 
"""Layer-1 pilot: configuration, perception parsing, and CGEvent motor actions."""

import argparse
import json
import math
import os
import random
import re
import signal
import sys
import time

# ═══════════════════════════════════════
# CONFIG
# ═══════════════════════════════════════

# OpenMW writes its log here on macOS; the in-game perception mod appends
# "=== TIMMY PERCEPTION ===" blocks that parse_latest_perception() reads.
OPENMW_LOG = os.path.expanduser("~/Library/Preferences/openmw/openmw.log")
MORROWIND_DIR = os.path.expanduser("~/.timmy/morrowind")
GOAL_FILE = os.path.join(MORROWIND_DIR, "current_goal.txt")      # written by Timmy (Layer 3)
ESCALATION_FILE = os.path.join(MORROWIND_DIR, "escalation.txt")  # read by Timmy (Layer 3)
TRAJECTORY_DIR = os.path.join(MORROWIND_DIR, "trajectories")     # JSONL logs for DPO training
REFLEX_URL = os.environ.get("REFLEX_URL", "http://localhost:8082/v1/chat/completions")

os.makedirs(TRAJECTORY_DIR, exist_ok=True)

# macOS CGEvent virtual key codes (Carbon kVK_ANSI_* values, from local_brain.py).
KEYCODES = {
    "w": 13, "a": 0, "s": 1, "d": 2,
    "space": 49, "escape": 53, "return": 36,
    # FIX: "t" was 20, which is kVK_ANSI_3 (the "3" key); kVK_ANSI_T is 17.
    "e": 14, "f": 3, "q": 12, "j": 38, "t": 17,
    "f5": 96, "f9": 101,
    "left": 123, "right": 124, "up": 126, "down": 125,
}

# ═══════════════════════════════════════
# PERCEPTION — Reused from mcp_server.py
# ═══════════════════════════════════════

# Compiled once; the log is re-scanned every pilot cycle.
_PERCEPTION_RE = re.compile(
    r"=== TIMMY PERCEPTION ===(.*?)=== END PERCEPTION ===", re.DOTALL
)


def parse_latest_perception():
    """Parse the most recent perception block from the OpenMW log.

    Returns a dict with scalar fields (cell, position, yaw, health,
    magicka, fatigue, mode, game_time) and list fields (npcs, doors,
    items), or None when the log file is missing or contains no
    perception block yet.
    """
    try:
        with open(OPENMW_LOG, "r") as f:
            content = f.read()
    except FileNotFoundError:
        return None

    blocks = _PERCEPTION_RE.findall(content)
    if not blocks:
        return None

    # Only the newest block matters; earlier ones are stale frames.
    block = blocks[-1]
    state = {"npcs": [], "doors": [], "items": []}

    for line in block.strip().split("\n"):
        line = line.strip()
        # Strip the OpenMW log prefix ("[...]:\t") when present.
        if "]:\t" in line:
            line = line.split("]:\t", 1)[1]

        if line.startswith("Cell:"):
            state["cell"] = line.split(":", 1)[1].strip()
        elif line.startswith("Pos:"):
            state["position"] = line.split(":", 1)[1].strip()
        elif line.startswith("Yaw:"):
            state["yaw"] = line.split(":", 1)[1].strip()
        elif line.startswith("HP:"):
            state["health"] = line.split(":", 1)[1].strip()
        elif line.startswith("MP:"):
            state["magicka"] = line.split(":", 1)[1].strip()
        elif line.startswith("FT:"):
            state["fatigue"] = line.split(":", 1)[1].strip()
        elif line.startswith("Mode:"):
            state["mode"] = line.split(":", 1)[1].strip()
        elif line.startswith("Time:"):
            state["game_time"] = line.split(":", 1)[1].strip()
        elif line.startswith("NPC:"):
            state["npcs"].append(line[4:].strip())
        elif line.startswith("Door:"):
            state["doors"].append(line[5:].strip())
        elif line.startswith("Item:"):
            state["items"].append(line[5:].strip())

    return state


# ═══════════════════════════════════════
# ACTIONS — CGEvent keypresses
# ═══════════════════════════════════════

def send_key(keycode, duration=0.0, shift=False):
    """Send a keypress (down, optional hold, up) to the game via CGEvent.

    Quartz is imported lazily so the module can be imported on hosts
    without pyobjc (e.g. for offline trajectory analysis).
    """
    import Quartz

    flags = Quartz.kCGEventFlagMaskShift if shift else 0
    down = Quartz.CGEventCreateKeyboardEvent(None, keycode, True)
    Quartz.CGEventSetFlags(down, flags)
    Quartz.CGEventPost(Quartz.kCGHIDEventTap, down)

    if duration > 0:
        time.sleep(duration)

    up = Quartz.CGEventCreateKeyboardEvent(None, keycode, False)
    Quartz.CGEventSetFlags(up, 0)
    Quartz.CGEventPost(Quartz.kCGHIDEventTap, up)


def move(direction, duration=1.0, run=False):
    """Move the character; shift (run) is held for the whole press."""
    key_map = {
        "forward": "w", "backward": "s",
        "left": "a", "right": "d",
        "turn_left": "left", "turn_right": "right",
    }
    key = key_map.get(direction, "w")  # unknown directions default to forward
    send_key(KEYCODES[key], duration=duration, shift=run)
    return f"move:{direction}:{duration:.1f}s" + (":run" if run else "")


def activate():
    """Interact with whatever is in front of us."""
    send_key(KEYCODES["space"], duration=0.15)
    return "activate"


def jump():
    # NOTE(review): same key as activate(), distinguished only by a shorter
    # press — confirm the in-game binding actually jumps on space.
    send_key(KEYCODES["space"], duration=0.05)
    return "jump"


def attack():
    send_key(KEYCODES["f"], duration=0.3)
    return "attack"


def quicksave():
    send_key(KEYCODES["f5"], duration=0.1)
    return "quicksave"


# ═══════════════════════════════════════
# GOAL — Set by Timmy (Layer 3)
# ═══════════════════════════════════════
def read_goal():
    """Read the current strategic goal from Timmy; default to 'explore'."""
    try:
        with open(GOAL_FILE, "r") as f:
            return f.read().strip()
    except FileNotFoundError:
        return "explore"


def escalate(situation):
    """Append an escalation entry (one JSON object per line) for Timmy."""
    entry = {
        "situation": situation,
        "timestamp": time.time(),
        "iso": time.strftime("%Y-%m-%dT%H:%M:%S"),
    }
    with open(ESCALATION_FILE, "a") as f:
        f.write(json.dumps(entry) + "\n")
    print(f" ⬆ ESCALATED: {situation[:80]}")


# ═══════════════════════════════════════
# REFLEX — Optional Layer 2 (1-3B model)
# ═══════════════════════════════════════

def ask_reflex(perception, goal):
    """Query the reflex model for a tactical decision.

    Returns (action, "reflex") when the model answers with a known action
    within the 2-second timeout; (None, None) otherwise so the caller
    falls back to the deterministic behavior tree.
    """
    try:
        import requests
        prompt = (
            f"Game: {perception.get('cell', '?')}. "
            f"NPCs: {perception.get('npcs', [])}. "
            f"Doors: {perception.get('doors', [])}. "
            f"Items: {perception.get('items', [])}. "
            f"Goal: {goal}. "
            f"Pick ONE: explore, approach_npc, enter_door, pickup_item, fight, flee, activate, wait"
        )
        resp = requests.post(REFLEX_URL, json={
            "model": "reflex",
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 10,
            "temperature": 0.3,
        }, timeout=2)
        if resp.status_code == 200:
            text = resp.json()["choices"][0]["message"]["content"].strip().lower()
            # Extract first word that matches a known action
            for action in ["explore", "approach_npc", "enter_door", "pickup_item", "fight", "flee", "activate", "wait"]:
                if action in text:
                    return action, "reflex"
    except Exception:
        pass  # Reflex not running — fall through to behavior tree

    return None, None


# ═══════════════════════════════════════
# BEHAVIOR TREE — Deterministic fallback
# ═══════════════════════════════════════

def parse_distance(entity_str):
    """Extract distance from entity string like 'Jobasha (dist:145.2)'.

    Returns 9999.0 when no 'dist:' tag is present so untagged entities
    sort last under min(key=parse_distance).
    """
    m = re.search(r'dist:([\d.]+)', entity_str)
    return float(m.group(1)) if m else 9999.0


def decide(perception, goal, history):
    """Deterministic behavior tree. Returns (action_name, action_func).

    Priorities: NPC > item > door > unstick > random exploration.
    The returned zero-arg callable performs the motor action when invoked.
    """
    npcs = perception.get("npcs", [])
    doors = perception.get("doors", [])
    items = perception.get("items", [])

    # Stuck detection — same position for 4+ cycles
    positions = [h.get("position", "") for h in history[-4:]]
    stuck = len(positions) >= 4 and len(set(positions)) == 1 and positions[0] != ""

    # Priority 1: If there's an NPC nearby, approach and activate
    if npcs:
        closest = min(npcs, key=parse_distance)
        dist = parse_distance(closest)
        name = closest.split("(")[0].strip()
        if dist < 150:
            # Close enough to interact — tell Layer 3 a conversation may start.
            escalate(f"NPC '{name}' is right in front of us (dist:{dist:.0f}). Approaching to interact.")
            return "approach_npc", lambda: activate()
        elif dist < 500:
            return "approach_npc", lambda: move("forward", duration=min(dist / 200, 3.0))

    # Priority 2: If there's an item nearby, approach and pick up
    if items:
        closest = min(items, key=parse_distance)
        dist = parse_distance(closest)
        if dist < 200:
            return "pickup_item", lambda: activate()
        elif dist < 500:
            return "pickup_item", lambda: move("forward", duration=min(dist / 200, 2.0))

    # Priority 3: If there's a door nearby, go through it
    if doors:
        closest = min(doors, key=parse_distance)
        dist = parse_distance(closest)
        if dist < 200:
            return "enter_door", lambda: activate()
        elif dist < 600:
            return "enter_door", lambda: move("forward", duration=min(dist / 200, 3.0))

    # Priority 4: If stuck, turn randomly and try to unstick
    if stuck:
        direction = random.choice(["turn_left", "turn_right"])
        duration = random.uniform(0.5, 1.5)
        return "unstick", lambda: (move(direction, duration), move("forward", 2.0))

    # Priority 5: Explore — walk forward with occasional turns
    r = random.random()
    if r < 0.65:
        # Walk forward
        duration = random.uniform(1.5, 4.0)
        return "explore", lambda: move("forward", duration)
    elif r < 0.80:
        # Turn and walk
        direction = random.choice(["turn_left", "turn_right"])
        return "explore", lambda: (move(direction, random.uniform(0.3, 1.0)), move("forward", random.uniform(1.0, 3.0)))
    elif r < 0.90:
        # Run forward (cover ground faster)
        return "explore", lambda: move("forward", random.uniform(2.0, 5.0), run=True)
    else:
        # Quicksave periodically
        return "quicksave", lambda: quicksave()


# ═══════════════════════════════════════
# TRAJECTORY LOGGING
# ═══════════════════════════════════════

def log_trajectory(cycle, perception, goal, action_name, source, latency_ms):
    """Log one perception→action pair as a JSONL entry (one file per day)."""
    today = time.strftime("%Y%m%d")
    path = os.path.join(TRAJECTORY_DIR, f"{today}.jsonl")

    entry = {
        "cycle": cycle,
        "timestamp": time.time(),
        "iso": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "layer": source,  # "pilot" | "reflex" | "timmy"
        "perception": {
            "cell": perception.get("cell", "?"),
            "position": perception.get("position", "?"),
            "yaw": perception.get("yaw", "?"),
            "health": perception.get("health", "?"),
            "npcs": perception.get("npcs", []),
            "doors": perception.get("doors", []),
            "items": perception.get("items", []),
            "mode": perception.get("mode", "?"),
        },
        "goal": goal,
        "action": action_name,
        "latency_ms": latency_ms,
    }

    with open(path, "a") as f:
        f.write(json.dumps(entry) + "\n")


# ═══════════════════════════════════════
# MAIN LOOP
# ═══════════════════════════════════════

running = True

def handle_sigint(sig, frame):
    """SIGINT handler: request a clean stop at the next loop check."""
    global running
    print("\n\n🛑 Pilot stopping gracefully...")
    running = False

signal.signal(signal.SIGINT, handle_sigint)


def main():
    parser = argparse.ArgumentParser(description="Morrowind Pilot — Layer 1 Motor Control")
    parser.add_argument("--interval", type=float, default=0.5, help="Seconds between cycles (default: 0.5)")
    parser.add_argument("--cycles", type=int, default=0, help="Max cycles, 0=infinite (default: 0)")
    parser.add_argument("--reflex", action="store_true", help="Try reflex model (layer 2) before behavior tree")
    parser.add_argument("--quiet", action="store_true", help="Minimal output")
    args = parser.parse_args()

    print("═══════════════════════════════════════")
    print(" MORROWIND PILOT — Layer 1")
    print(" Motor control + trajectory logging")
    print(f" Interval: {args.interval}s")
    print(f" Reflex: {'enabled' if args.reflex else 'disabled (behavior tree only)'}")
    print(f" Goal: {read_goal()}")
    print(f" Logs: {TRAJECTORY_DIR}/")
    print("═══════════════════════════════════════")
    print()
    print(" Ctrl+C to stop cleanly")
    print(" Write goals to: ~/.timmy/morrowind/current_goal.txt")
    print(" Escalations at: ~/.timmy/morrowind/escalation.txt")
    print()

    history = []
    cycle = 0
    autosave_counter = 0

    while running:
        # FIX: check the budget before incrementing, so --cycles N really
        # runs N cycles and the final report doesn't overcount by one.
        if args.cycles > 0 and cycle >= args.cycles:
            break
        cycle += 1

        t0 = time.time()

        # Perceive
        perception = parse_latest_perception()
        if not perception:
            if not args.quiet:
                print(f" [{cycle}] No perception data. Game running?")
            time.sleep(args.interval * 2)
            continue

        # Read goal from Timmy
        goal = read_goal()

        # Track history for stuck detection
        history.append({"position": perception.get("position", ""), "cycle": cycle})
        if len(history) > 10:
            history.pop(0)

        # Decide — try reflex first, fall back to behavior tree
        source = "pilot"
        if args.reflex:
            reflex_action, reflex_source = ask_reflex(perception, goal)
            if reflex_action:
                action_name = reflex_action
                source = reflex_source  # "reflex", as reported by ask_reflex
                # Map reflex action to motor commands
                action_map = {
                    "explore": lambda: move("forward", random.uniform(1.5, 3.0)),
                    "approach_npc": lambda: move("forward", 1.5),
                    "enter_door": lambda: (move("forward", 1.0), activate()),
                    "pickup_item": lambda: activate(),
                    "fight": lambda: attack(),
                    "flee": lambda: (move("turn_left", 1.0), move("forward", 3.0, run=True)),
                    "activate": lambda: activate(),
                    "wait": lambda: None,
                }
                action_fn = action_map.get(reflex_action, lambda: move("forward", 1.0))
            else:
                action_name, action_fn = decide(perception, goal, history)
        else:
            action_name, action_fn = decide(perception, goal, history)

        # Execute (return value is only descriptive; nothing consumes it)
        action_fn()

        latency_ms = int((time.time() - t0) * 1000)

        # Log
        if not args.quiet:
            cell = perception.get("cell", "?")[:25]
            n_npcs = len(perception.get("npcs", []))
            n_doors = len(perception.get("doors", []))
            n_items = len(perception.get("items", []))
            print(f" [{cycle}] {cell} | {action_name} ({source}) | NPCs:{n_npcs} Doors:{n_doors} Items:{n_items} | {latency_ms}ms")

        log_trajectory(cycle, perception, goal, action_name, source, latency_ms)

        # Autosave every 100 cycles
        autosave_counter += 1
        if autosave_counter >= 100:
            quicksave()
            autosave_counter = 0
            if not args.quiet:
                print(f" [{cycle}] ⚡ Autosaved")

        # Wait out the remainder of the cycle interval
        elapsed = time.time() - t0
        sleep_time = max(0, args.interval - elapsed)
        time.sleep(sleep_time)

    # Final stats
    today = time.strftime("%Y%m%d")
    traj_file = os.path.join(TRAJECTORY_DIR, f"{today}.jsonl")
    traj_count = 0
    if os.path.exists(traj_file):
        with open(traj_file) as f:
            traj_count = sum(1 for _ in f)

    print(f"\n═══════════════════════════════════════")
    print(f" Pilot stopped after {cycle} cycles")
    print(f" Trajectories logged: {traj_count}")
    print(f" Log: {traj_file}")
    print(f"═══════════════════════════════════════")


if __name__ == "__main__":
    main()