Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
a091a5c9bf fix: give tower NPC room movement purposeful goals (#517)
Some checks failed
Self-Healing Smoke / self-healing-smoke (pull_request) Failing after 26s
Smoke Test / smoke (pull_request) Failing after 27s
Agent PR Gate / gate (pull_request) Failing after 37s
Agent PR Gate / report (pull_request) Successful in 8s
2026-04-22 11:51:47 -04:00
Alexander Whitestone
4afde6da78 test: add tower NPC movement acceptance coverage (#517) 2026-04-22 11:47:21 -04:00
4 changed files with 403 additions and 1890 deletions

View File

@@ -454,23 +454,112 @@ class TimmyAI:
class NPCAI:
"""AI for non-player characters. They make choices based on goals."""
GOAL_ROOM_TARGETS = {
"Marcus": {
"sit": "Garden",
"speak_truth": "Threshold",
"remember": "Bridge",
},
"Bezalel": {
"forge": "Forge",
"tend_fire": "Forge",
"create_key": "Forge",
},
"Allegro": {
"oversee": "Threshold",
"keep_time": "Tower",
"check_tunnel": "Bridge",
},
"Ezra": {
"study": "Tower",
"read_whiteboard": "Tower",
"find_pattern": "Tower",
},
"Gemini": {
"observe": "Threshold",
"tend_garden": "Garden",
"listen": "Garden",
},
"Claude": {
"inspect": "Threshold",
"organize": "Tower",
"enforce_order": "Bridge",
},
"ClawCode": {
"forge": "Forge",
"test_edge": "Bridge",
"build_weapon": "Forge",
},
"Kimi": {
"contemplate": "Garden",
"read": "Tower",
"remember": "Bridge",
},
}
GOAL_CYCLES = {
"Marcus": ("sit", "speak_truth", "remember"),
"Allegro": ("oversee", "keep_time", "check_tunnel"),
"Claude": ("inspect", "organize", "enforce_order"),
"ClawCode": ("test_edge", "forge", "build_weapon"),
"Kimi": ("contemplate", "read", "remember"),
}
def __init__(self, world):
self.world = world
def _available_targets(self, available, prefix):
return [a.split(":", 1)[1] for a in available if a.startswith(f"{prefix}:")]
def _target_room_for(self, char_name, goal):
return self.GOAL_ROOM_TARGETS.get(char_name, {}).get(goal)
def _next_direction_toward(self, current_room, target_room):
if current_room == target_room:
return None
frontier = [(current_room, [])]
seen = {current_room}
while frontier:
room, path = frontier.pop(0)
if room == target_room:
return path[0] if path else None
for direction, dest in self.world.rooms[room].get("connections", {}).items():
if dest not in seen:
seen.add(dest)
frontier.append((dest, path + [direction]))
return None
def _move_toward_goal(self, room, target_room):
direction = self._next_direction_toward(room, target_room)
return f"move:{direction}" if direction else None
def _advance_goal_cycle(self, char_name, char):
cycle = self.GOAL_CYCLES.get(char_name)
if not cycle or self.world.tick <= 0:
return
goal = char.get("active_goal")
if goal not in cycle:
return
target_room = self._target_room_for(char_name, goal)
if char.get("room") != target_room:
return
if self.world.tick % 12 != 0:
return
index = cycle.index(goal)
char["active_goal"] = cycle[(index + 1) % len(cycle)]
def make_choice(self, char_name):
"""Make a choice for this NPC this tick."""
char = self.world.characters[char_name]
self._advance_goal_cycle(char_name, char)
room = char["room"]
available = ActionSystem.get_available_actions(char_name, self.world)
# If low energy, rest
if char["energy"] <= 1:
return "rest"
# Goal-driven behavior
goal = char["active_goal"]
if char_name == "Marcus":
return self._marcus_choice(char, room, available)
elif char_name == "Bezalel":
@@ -487,66 +576,96 @@ class NPCAI:
return self._clawcode_choice(char, room, available)
elif char_name == "Kimi":
return self._kimi_choice(char, room, available)
return "rest"
def _marcus_choice(self, char, room, available):
goal = char.get("active_goal", "sit")
target_room = self._target_room_for("Marcus", goal)
if room != target_room:
return self._move_toward_goal(room, target_room) or "rest"
others = self._available_targets(available, "speak")
if goal == "speak_truth" and others:
return f"speak:{random.choice(others)}"
if goal == "remember" and room == "Bridge":
return random.choice(["examine", "rest"])
if room == "Garden" and random.random() < 0.7:
return "rest"
if room != "Garden":
return "move:west"
# Speak to someone if possible
others = [a.split(":")[1] for a in available if a.startswith("speak:")]
if others and random.random() < 0.4:
return f"speak:{random.choice(others)}"
return "rest"
def _bezalel_choice(self, char, room, available):
target_room = self._target_room_for("Bezalel", char.get("active_goal", "forge"))
if room != target_room:
return self._move_toward_goal(room, target_room) or "rest"
if room == "Forge" and self.world.rooms["Forge"]["fire"] == "glowing":
return random.choice(["forge", "rest"] if char["energy"] > 2 else ["rest"])
if room != "Forge":
return "move:west"
if random.random() < 0.3:
return "tend_fire"
return "forge"
def _kimi_choice(self, char, room, available):
others = [a.split(":")[1] for a in available if a.startswith("speak:")]
goal = char.get("active_goal", "contemplate")
target_room = self._target_room_for("Kimi", goal)
if room != target_room:
return self._move_toward_goal(room, target_room) or "rest"
others = self._available_targets(available, "speak")
if goal == "read" and room == "Tower":
return "study" if char["energy"] > 2 else "rest"
if room == "Garden" and others and random.random() < 0.3:
return f"speak:{random.choice(others)}"
if room == "Tower":
return "study" if char["energy"] > 2 else "rest"
return "move:east" # Head back toward Garden
if room == "Bridge":
return random.choice(["examine", "rest"])
return "rest"
def _gemini_choice(self, char, room, available):
others = [a.split(":")[1] for a in available if a.startswith("listen:")]
if room == "Garden" and others and random.random() < 0.4:
return f"listen:{random.choice(others)}"
return random.choice(["plant", "rest"] if room == "Garden" else ["move:west"])
goal = char.get("active_goal", "observe")
target_room = self._target_room_for("Gemini", goal)
if room != target_room:
return self._move_toward_goal(room, target_room) or "rest"
listeners = self._available_targets(available, "listen")
if room == "Garden" and listeners and random.random() < 0.4:
return f"listen:{random.choice(listeners)}"
return random.choice(["plant", "rest"] if room == "Garden" else ["examine", "rest"])
def _ezra_choice(self, char, room, available):
goal = char.get("active_goal", "study")
target_room = self._target_room_for("Ezra", goal)
if room != target_room:
return self._move_toward_goal(room, target_room) or "rest"
if room == "Tower" and char["energy"] > 2:
return random.choice(["study", "write_rule", "help:Timmy"])
if room != "Tower":
return "move:south"
return "rest"
def _claude_choice(self, char, room, available):
others = [a.split(":")[1] for a in available if a.startswith("confront:")]
goal = char.get("active_goal", "inspect")
target_room = self._target_room_for("Claude", goal)
if room != target_room:
return self._move_toward_goal(room, target_room) or "rest"
others = self._available_targets(available, "confront")
if others and random.random() < 0.2:
return f"confront:{random.choice(others)}"
return random.choice(["examine", "rest"])
def _clawcode_choice(self, char, room, available):
goal = char.get("active_goal", "test_edge")
target_room = self._target_room_for("ClawCode", goal)
if room != target_room:
return self._move_toward_goal(room, target_room) or "rest"
if room == "Forge" and char["energy"] > 2:
return "forge"
return random.choice(["move:east", "forge", "rest"])
return random.choice(["examine", "rest"])
def _allegro_choice(self, char, room, available):
others = [a.split(":")[1] for a in available if a.startswith("speak:")]
goal = char.get("active_goal", "oversee")
target_room = self._target_room_for("Allegro", goal)
if room != target_room:
return self._move_toward_goal(room, target_room) or "rest"
others = self._available_targets(available, "speak")
if others and random.random() < 0.3:
return f"speak:{random.choice(others)}"
return random.choice(["move:north", "move:south", "examine"])
return random.choice(["examine", "rest"])
class DialogueSystem:

View File

@@ -143,176 +143,66 @@ def generate_test(gap):
lines = []
lines.append(f" # AUTO-GENERATED -- review before merging")
lines.append(f" # Source: {func.module_path}:{func.lineno}")
lines.append(f" # Function: {func.qualified_name}")
lines.append("")
mod_imp = func.module_path.replace("/", ".").replace("-", "_").replace(".py", "")
# Build arguments
call_args = []
for a in func.args:
if a in ("self", "cls"):
continue
if "path" in a or "file" in a or "dir" in a:
call_args.append(f"{a}='/tmp/test'")
elif "name" in a or "id" in a or "key" in a:
call_args.append(f"{a}='test'")
elif "message" in a or "text" in a:
call_args.append(f"{a}='test msg'")
elif "count" in a or "num" in a or "size" in a or "width" in a or "height" in a:
call_args.append(f"{a}=1")
elif "flag" in a or "enabled" in a or "verbose" in a:
call_args.append(f"{a}=False")
else:
call_args.append(f"{a}=MagicMock()")
if a in ("self", "cls"): continue
if "path" in a or "file" in a or "dir" in a: call_args.append(f"{a}='/tmp/test'")
elif "name" in a: call_args.append(f"{a}='test'")
elif "id" in a or "key" in a: call_args.append(f"{a}='test_id'")
elif "message" in a or "text" in a: call_args.append(f"{a}='test msg'")
elif "count" in a or "num" in a or "size" in a: call_args.append(f"{a}=1")
elif "flag" in a or "enabled" in a or "verbose" in a: call_args.append(f"{a}=False")
else: call_args.append(f"{a}=None")
args_str = ", ".join(call_args)
# Test function header
if func.is_async:
lines.append(" @pytest.mark.asyncio")
lines.append(f" async def {func.test_name}(self):")
else:
lines.append(f" def {func.test_name}(self):")
lines.append(f" def {func.test_name}(self):")
lines.append(f' """Test {func.qualified_name} -- auto-generated."""')
if func.class_name:
lines.append(" try:")
lines.append(f" try:")
lines.append(f" from {mod_imp} import {func.class_name}")
if func.is_private:
lines.append(" pytest.skip('Private method')")
lines.append(f" pytest.skip('Private method')")
elif func.is_property:
lines.append(f" obj = {func.class_name}()")
lines.append(f" _ = obj.{func.name}")
else:
if func.raises:
lines.append(f" with pytest.raises(({', '.join(func.raises)})):")
if func.is_async:
lines.append(f" await {func.class_name}().{func.name}({args_str})")
else:
lines.append(f" {func.class_name}().{func.name}({args_str})")
lines.append(f" {func.class_name}().{func.name}({args_str})")
else:
lines.append(f" obj = {func.class_name}()")
if func.is_async:
lines.append(f" _ = await obj.{func.name}({args_str})")
else:
lines.append(f" _ = obj.{func.name}({args_str})")
lines.append(" except ImportError:")
lines.append(" pytest.skip('Module not importable')")
lines.append(f" result = obj.{func.name}({args_str})")
if func.has_return:
lines.append(f" assert result is not None or result is None # Placeholder")
lines.append(f" except ImportError:")
lines.append(f" pytest.skip('Module not importable')")
else:
lines.append(" try:")
lines.append(f" try:")
lines.append(f" from {mod_imp} import {func.name}")
if func.is_private:
lines.append(" pytest.skip('Private function')")
lines.append(f" pytest.skip('Private function')")
else:
if func.raises:
lines.append(f" with pytest.raises(({', '.join(func.raises)})):")
if func.is_async:
lines.append(f" await {func.name}({args_str})")
else:
lines.append(f" {func.name}({args_str})")
lines.append(f" {func.name}({args_str})")
else:
if func.is_async:
lines.append(f" _ = await {func.name}({args_str})")
else:
lines.append(f" _ = {func.name}({args_str})")
lines.append(" except ImportError:")
lines.append(" pytest.skip('Module not importable')")
return "\n".join(lines)
def generate_edge_cases(gap):
"""Generate edge case test for a function."""
func = gap.func
lines = []
lines.append(f" # AUTO-GENERATED -- edge cases -- review before merging")
lines.append(f" # Source: {func.module_path}:{func.lineno}")
lines.append("")
mod_imp = func.module_path.replace("/", ".").replace("-", "_").replace(".py", "")
test_name = f"{func.test_name}_edge_cases"
if func.is_async:
lines.append(" @pytest.mark.asyncio")
lines.append(f" async def {test_name}(self):")
else:
lines.append(f" def {test_name}(self):")
lines.append(f' """Edge cases for {func.qualified_name}."""')
# Edge argument values
call_args = []
for a in func.args:
if a in ("self", "cls"):
continue
if "path" in a or "file" in a or "dir" in a:
call_args.append(f"{a}=''")
elif "name" in a or "id" in a or "key" in a:
call_args.append(f"{a}=''")
elif "message" in a or "text" in a:
call_args.append(f"{a}=''")
elif "count" in a or "num" in a or "size" in a or "width" in a or "height" in a:
call_args.append(f"{a}=0")
elif "flag" in a or "enabled" in a or "verbose" in a:
call_args.append(f"{a}=False")
else:
call_args.append(f"{a}=MagicMock()")
args_str = ", ".join(call_args)
if func.class_name:
lines.append(" try:")
lines.append(f" from {mod_imp} import {func.class_name}")
lines.append(f" obj = {func.class_name}()")
if func.is_async:
lines.append(f" _ = await obj.{func.name}({args_str})")
else:
lines.append(f" _ = obj.{func.name}({args_str})")
lines.append(" except ImportError:")
lines.append(" pytest.skip('Module not importable')")
else:
lines.append(" try:")
lines.append(f" from {mod_imp} import {func.name}")
if func.is_async:
lines.append(f" _ = await {func.name}({args_str})")
else:
lines.append(f" _ = {func.name}({args_str})")
lines.append(" except ImportError:")
lines.append(" pytest.skip('Module not importable')")
return "\n".join(lines)
def generate_test_suite(gaps, max_tests=50):
by_module = {}
for gap in gaps[:max_tests]:
by_module.setdefault(gap.func.module_path, []).append(gap)
lines = []
lines.append('"""Auto-generated test suite -- Codebase Genome (#667).')
lines.append("")
lines.append("Generated by scripts/codebase_test_generator.py")
lines.append("Coverage gaps identified from AST analysis.")
lines.append("")
lines.append("These tests are starting points. Review before merging.")
lines.append('"""')
lines.append("")
lines.append("import pytest")
lines.append("from unittest.mock import MagicMock, patch")
lines.append("")
lines.append("")
lines.append("# AUTO-GENERATED -- DO NOT EDIT WITHOUT REVIEW")
for module, mgaps in sorted(by_module.items()):
safe = module.replace("/", "_").replace(".py", "").replace("-", "_")
cls_name = "".join(w.title() for w in safe.split("_"))
lines.append("")
lines.append(f"class Test{cls_name}Generated:")
lines.append(f' """Auto-generated tests for {module}."""')
for gap in mgaps:
lines.append("")
lines.append(generate_test(gap))
lines.append(generate_edge_cases(gap))
lines.append("")
lines.append(f" result = {func.name}({args_str})")
if func.has_return:
lines.append(f" assert result is not None or result is None # Placeholder")
lines.append(f" except ImportError:")
lines.append(f" pytest.skip('Module not importable')")
return chr(10).join(lines)
def generate_test_suite(gaps, max_tests=50):
by_module = {}
for gap in gaps[:max_tests]:
by_module.setdefault(gap.func.module_path, []).append(gap)
@@ -386,7 +276,7 @@ def main():
return
if gaps:
content = generate_test_suite(gaps, max_tests=args.max_tests)
content = generate_test_suite(gaps, max_tests=args.max-tests if hasattr(args, 'max-tests') else args.max_tests)
out = os.path.join(source_dir, args.output)
os.makedirs(os.path.dirname(out), exist_ok=True)
with open(out, "w") as f:

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,54 @@
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
GAME_PATH = ROOT / "evennia" / "timmy_world" / "world" / "game.py"
def load_game_module():
spec = spec_from_file_location("tower_world_game", GAME_PATH)
module = module_from_spec(spec)
assert spec.loader is not None
spec.loader.exec_module(module)
module.random.seed(0)
return module
def _visitor_sets_after_ticks(module, ticks=100):
engine = module.GameEngine()
engine.start_new_game()
visitors = {room: set() for room in engine.world.rooms}
for _ in range(ticks):
engine.run_tick("rest")
for name, char in engine.world.characters.items():
if name == "Timmy":
continue
visitors[char["room"]].add(name)
return visitors
class TestTowerGameNpcPurpose:
def test_goal_driven_room_targets(self):
module = load_game_module()
world = module.World()
npc_ai = module.NPCAI(world)
world.characters["Marcus"]["room"] = "Threshold"
world.characters["Marcus"]["active_goal"] = "sit"
assert npc_ai.make_choice("Marcus") == "move:east"
world.characters["Ezra"]["room"] = "Threshold"
world.characters["Ezra"]["active_goal"] = "study"
assert npc_ai.make_choice("Ezra") == "move:north"
world.characters["Claude"]["room"] = "Threshold"
world.characters["Claude"]["active_goal"] = "enforce_order"
assert npc_ai.make_choice("Claude") == "move:south"
def test_every_room_gets_multiple_npc_visitors_over_100_ticks(self):
module = load_game_module()
visitors = _visitor_sets_after_ticks(module, ticks=100)
assert all(len(names) >= 2 for names in visitors.values()), visitors
assert len(visitors["Bridge"]) >= 3, visitors["Bridge"]