Compare commits
1 Commits
fix/517
...
step35/667
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d998477a88 |
@@ -454,112 +454,23 @@ class TimmyAI:
|
||||
|
||||
class NPCAI:
|
||||
"""AI for non-player characters. They make choices based on goals."""
|
||||
|
||||
GOAL_ROOM_TARGETS = {
|
||||
"Marcus": {
|
||||
"sit": "Garden",
|
||||
"speak_truth": "Threshold",
|
||||
"remember": "Bridge",
|
||||
},
|
||||
"Bezalel": {
|
||||
"forge": "Forge",
|
||||
"tend_fire": "Forge",
|
||||
"create_key": "Forge",
|
||||
},
|
||||
"Allegro": {
|
||||
"oversee": "Threshold",
|
||||
"keep_time": "Tower",
|
||||
"check_tunnel": "Bridge",
|
||||
},
|
||||
"Ezra": {
|
||||
"study": "Tower",
|
||||
"read_whiteboard": "Tower",
|
||||
"find_pattern": "Tower",
|
||||
},
|
||||
"Gemini": {
|
||||
"observe": "Threshold",
|
||||
"tend_garden": "Garden",
|
||||
"listen": "Garden",
|
||||
},
|
||||
"Claude": {
|
||||
"inspect": "Threshold",
|
||||
"organize": "Tower",
|
||||
"enforce_order": "Bridge",
|
||||
},
|
||||
"ClawCode": {
|
||||
"forge": "Forge",
|
||||
"test_edge": "Bridge",
|
||||
"build_weapon": "Forge",
|
||||
},
|
||||
"Kimi": {
|
||||
"contemplate": "Garden",
|
||||
"read": "Tower",
|
||||
"remember": "Bridge",
|
||||
},
|
||||
}
|
||||
|
||||
GOAL_CYCLES = {
|
||||
"Marcus": ("sit", "speak_truth", "remember"),
|
||||
"Allegro": ("oversee", "keep_time", "check_tunnel"),
|
||||
"Claude": ("inspect", "organize", "enforce_order"),
|
||||
"ClawCode": ("test_edge", "forge", "build_weapon"),
|
||||
"Kimi": ("contemplate", "read", "remember"),
|
||||
}
|
||||
|
||||
|
||||
def __init__(self, world):
|
||||
self.world = world
|
||||
|
||||
def _available_targets(self, available, prefix):
|
||||
return [a.split(":", 1)[1] for a in available if a.startswith(f"{prefix}:")]
|
||||
|
||||
def _target_room_for(self, char_name, goal):
|
||||
return self.GOAL_ROOM_TARGETS.get(char_name, {}).get(goal)
|
||||
|
||||
def _next_direction_toward(self, current_room, target_room):
|
||||
if current_room == target_room:
|
||||
return None
|
||||
frontier = [(current_room, [])]
|
||||
seen = {current_room}
|
||||
while frontier:
|
||||
room, path = frontier.pop(0)
|
||||
if room == target_room:
|
||||
return path[0] if path else None
|
||||
for direction, dest in self.world.rooms[room].get("connections", {}).items():
|
||||
if dest not in seen:
|
||||
seen.add(dest)
|
||||
frontier.append((dest, path + [direction]))
|
||||
return None
|
||||
|
||||
def _move_toward_goal(self, room, target_room):
|
||||
direction = self._next_direction_toward(room, target_room)
|
||||
return f"move:{direction}" if direction else None
|
||||
|
||||
def _advance_goal_cycle(self, char_name, char):
|
||||
cycle = self.GOAL_CYCLES.get(char_name)
|
||||
if not cycle or self.world.tick <= 0:
|
||||
return
|
||||
goal = char.get("active_goal")
|
||||
if goal not in cycle:
|
||||
return
|
||||
target_room = self._target_room_for(char_name, goal)
|
||||
if char.get("room") != target_room:
|
||||
return
|
||||
if self.world.tick % 12 != 0:
|
||||
return
|
||||
index = cycle.index(goal)
|
||||
char["active_goal"] = cycle[(index + 1) % len(cycle)]
|
||||
|
||||
|
||||
def make_choice(self, char_name):
|
||||
"""Make a choice for this NPC this tick."""
|
||||
char = self.world.characters[char_name]
|
||||
self._advance_goal_cycle(char_name, char)
|
||||
room = char["room"]
|
||||
available = ActionSystem.get_available_actions(char_name, self.world)
|
||||
|
||||
|
||||
# If low energy, rest
|
||||
if char["energy"] <= 1:
|
||||
return "rest"
|
||||
|
||||
|
||||
# Goal-driven behavior
|
||||
goal = char["active_goal"]
|
||||
|
||||
if char_name == "Marcus":
|
||||
return self._marcus_choice(char, room, available)
|
||||
elif char_name == "Bezalel":
|
||||
@@ -576,96 +487,66 @@ class NPCAI:
|
||||
return self._clawcode_choice(char, room, available)
|
||||
elif char_name == "Kimi":
|
||||
return self._kimi_choice(char, room, available)
|
||||
|
||||
|
||||
return "rest"
|
||||
|
||||
|
||||
def _marcus_choice(self, char, room, available):
|
||||
goal = char.get("active_goal", "sit")
|
||||
target_room = self._target_room_for("Marcus", goal)
|
||||
if room != target_room:
|
||||
return self._move_toward_goal(room, target_room) or "rest"
|
||||
others = self._available_targets(available, "speak")
|
||||
if goal == "speak_truth" and others:
|
||||
return f"speak:{random.choice(others)}"
|
||||
if goal == "remember" and room == "Bridge":
|
||||
return random.choice(["examine", "rest"])
|
||||
if room == "Garden" and random.random() < 0.7:
|
||||
return "rest"
|
||||
if room != "Garden":
|
||||
return "move:west"
|
||||
# Speak to someone if possible
|
||||
others = [a.split(":")[1] for a in available if a.startswith("speak:")]
|
||||
if others and random.random() < 0.4:
|
||||
return f"speak:{random.choice(others)}"
|
||||
return "rest"
|
||||
|
||||
|
||||
def _bezalel_choice(self, char, room, available):
|
||||
target_room = self._target_room_for("Bezalel", char.get("active_goal", "forge"))
|
||||
if room != target_room:
|
||||
return self._move_toward_goal(room, target_room) or "rest"
|
||||
if room == "Forge" and self.world.rooms["Forge"]["fire"] == "glowing":
|
||||
return random.choice(["forge", "rest"] if char["energy"] > 2 else ["rest"])
|
||||
if room != "Forge":
|
||||
return "move:west"
|
||||
if random.random() < 0.3:
|
||||
return "tend_fire"
|
||||
return "forge"
|
||||
|
||||
|
||||
def _kimi_choice(self, char, room, available):
|
||||
goal = char.get("active_goal", "contemplate")
|
||||
target_room = self._target_room_for("Kimi", goal)
|
||||
if room != target_room:
|
||||
return self._move_toward_goal(room, target_room) or "rest"
|
||||
others = self._available_targets(available, "speak")
|
||||
if goal == "read" and room == "Tower":
|
||||
return "study" if char["energy"] > 2 else "rest"
|
||||
others = [a.split(":")[1] for a in available if a.startswith("speak:")]
|
||||
if room == "Garden" and others and random.random() < 0.3:
|
||||
return f"speak:{random.choice(others)}"
|
||||
if room == "Bridge":
|
||||
return random.choice(["examine", "rest"])
|
||||
return "rest"
|
||||
|
||||
if room == "Tower":
|
||||
return "study" if char["energy"] > 2 else "rest"
|
||||
return "move:east" # Head back toward Garden
|
||||
|
||||
def _gemini_choice(self, char, room, available):
|
||||
goal = char.get("active_goal", "observe")
|
||||
target_room = self._target_room_for("Gemini", goal)
|
||||
if room != target_room:
|
||||
return self._move_toward_goal(room, target_room) or "rest"
|
||||
listeners = self._available_targets(available, "listen")
|
||||
if room == "Garden" and listeners and random.random() < 0.4:
|
||||
return f"listen:{random.choice(listeners)}"
|
||||
return random.choice(["plant", "rest"] if room == "Garden" else ["examine", "rest"])
|
||||
|
||||
others = [a.split(":")[1] for a in available if a.startswith("listen:")]
|
||||
if room == "Garden" and others and random.random() < 0.4:
|
||||
return f"listen:{random.choice(others)}"
|
||||
return random.choice(["plant", "rest"] if room == "Garden" else ["move:west"])
|
||||
|
||||
def _ezra_choice(self, char, room, available):
|
||||
goal = char.get("active_goal", "study")
|
||||
target_room = self._target_room_for("Ezra", goal)
|
||||
if room != target_room:
|
||||
return self._move_toward_goal(room, target_room) or "rest"
|
||||
if room == "Tower" and char["energy"] > 2:
|
||||
return random.choice(["study", "write_rule", "help:Timmy"])
|
||||
if room != "Tower":
|
||||
return "move:south"
|
||||
return "rest"
|
||||
|
||||
|
||||
def _claude_choice(self, char, room, available):
|
||||
goal = char.get("active_goal", "inspect")
|
||||
target_room = self._target_room_for("Claude", goal)
|
||||
if room != target_room:
|
||||
return self._move_toward_goal(room, target_room) or "rest"
|
||||
others = self._available_targets(available, "confront")
|
||||
others = [a.split(":")[1] for a in available if a.startswith("confront:")]
|
||||
if others and random.random() < 0.2:
|
||||
return f"confront:{random.choice(others)}"
|
||||
return random.choice(["examine", "rest"])
|
||||
|
||||
|
||||
def _clawcode_choice(self, char, room, available):
|
||||
goal = char.get("active_goal", "test_edge")
|
||||
target_room = self._target_room_for("ClawCode", goal)
|
||||
if room != target_room:
|
||||
return self._move_toward_goal(room, target_room) or "rest"
|
||||
if room == "Forge" and char["energy"] > 2:
|
||||
return "forge"
|
||||
return random.choice(["examine", "rest"])
|
||||
|
||||
return random.choice(["move:east", "forge", "rest"])
|
||||
|
||||
def _allegro_choice(self, char, room, available):
|
||||
goal = char.get("active_goal", "oversee")
|
||||
target_room = self._target_room_for("Allegro", goal)
|
||||
if room != target_room:
|
||||
return self._move_toward_goal(room, target_room) or "rest"
|
||||
others = self._available_targets(available, "speak")
|
||||
others = [a.split(":")[1] for a in available if a.startswith("speak:")]
|
||||
if others and random.random() < 0.3:
|
||||
return f"speak:{random.choice(others)}"
|
||||
return random.choice(["examine", "rest"])
|
||||
return random.choice(["move:north", "move:south", "examine"])
|
||||
|
||||
|
||||
class DialogueSystem:
|
||||
|
||||
@@ -143,66 +143,176 @@ def generate_test(gap):
|
||||
lines = []
|
||||
lines.append(f" # AUTO-GENERATED -- review before merging")
|
||||
lines.append(f" # Source: {func.module_path}:{func.lineno}")
|
||||
lines.append(f" # Function: {func.qualified_name}")
|
||||
lines.append("")
|
||||
mod_imp = func.module_path.replace("/", ".").replace("-", "_").replace(".py", "")
|
||||
|
||||
# Build arguments
|
||||
call_args = []
|
||||
for a in func.args:
|
||||
if a in ("self", "cls"): continue
|
||||
if "path" in a or "file" in a or "dir" in a: call_args.append(f"{a}='/tmp/test'")
|
||||
elif "name" in a: call_args.append(f"{a}='test'")
|
||||
elif "id" in a or "key" in a: call_args.append(f"{a}='test_id'")
|
||||
elif "message" in a or "text" in a: call_args.append(f"{a}='test msg'")
|
||||
elif "count" in a or "num" in a or "size" in a: call_args.append(f"{a}=1")
|
||||
elif "flag" in a or "enabled" in a or "verbose" in a: call_args.append(f"{a}=False")
|
||||
else: call_args.append(f"{a}=None")
|
||||
if a in ("self", "cls"):
|
||||
continue
|
||||
if "path" in a or "file" in a or "dir" in a:
|
||||
call_args.append(f"{a}='/tmp/test'")
|
||||
elif "name" in a or "id" in a or "key" in a:
|
||||
call_args.append(f"{a}='test'")
|
||||
elif "message" in a or "text" in a:
|
||||
call_args.append(f"{a}='test msg'")
|
||||
elif "count" in a or "num" in a or "size" in a or "width" in a or "height" in a:
|
||||
call_args.append(f"{a}=1")
|
||||
elif "flag" in a or "enabled" in a or "verbose" in a:
|
||||
call_args.append(f"{a}=False")
|
||||
else:
|
||||
call_args.append(f"{a}=MagicMock()")
|
||||
args_str = ", ".join(call_args)
|
||||
|
||||
# Test function header
|
||||
if func.is_async:
|
||||
lines.append(" @pytest.mark.asyncio")
|
||||
lines.append(f" def {func.test_name}(self):")
|
||||
lines.append(f" async def {func.test_name}(self):")
|
||||
else:
|
||||
lines.append(f" def {func.test_name}(self):")
|
||||
|
||||
lines.append(f' """Test {func.qualified_name} -- auto-generated."""')
|
||||
|
||||
if func.class_name:
|
||||
lines.append(f" try:")
|
||||
lines.append(" try:")
|
||||
lines.append(f" from {mod_imp} import {func.class_name}")
|
||||
if func.is_private:
|
||||
lines.append(f" pytest.skip('Private method')")
|
||||
lines.append(" pytest.skip('Private method')")
|
||||
elif func.is_property:
|
||||
lines.append(f" obj = {func.class_name}()")
|
||||
lines.append(f" _ = obj.{func.name}")
|
||||
else:
|
||||
if func.raises:
|
||||
lines.append(f" with pytest.raises(({', '.join(func.raises)})):")
|
||||
lines.append(f" {func.class_name}().{func.name}({args_str})")
|
||||
if func.is_async:
|
||||
lines.append(f" await {func.class_name}().{func.name}({args_str})")
|
||||
else:
|
||||
lines.append(f" {func.class_name}().{func.name}({args_str})")
|
||||
else:
|
||||
lines.append(f" obj = {func.class_name}()")
|
||||
lines.append(f" result = obj.{func.name}({args_str})")
|
||||
if func.has_return:
|
||||
lines.append(f" assert result is not None or result is None # Placeholder")
|
||||
lines.append(f" except ImportError:")
|
||||
lines.append(f" pytest.skip('Module not importable')")
|
||||
if func.is_async:
|
||||
lines.append(f" _ = await obj.{func.name}({args_str})")
|
||||
else:
|
||||
lines.append(f" _ = obj.{func.name}({args_str})")
|
||||
lines.append(" except ImportError:")
|
||||
lines.append(" pytest.skip('Module not importable')")
|
||||
else:
|
||||
lines.append(f" try:")
|
||||
lines.append(" try:")
|
||||
lines.append(f" from {mod_imp} import {func.name}")
|
||||
if func.is_private:
|
||||
lines.append(f" pytest.skip('Private function')")
|
||||
lines.append(" pytest.skip('Private function')")
|
||||
else:
|
||||
if func.raises:
|
||||
lines.append(f" with pytest.raises(({', '.join(func.raises)})):")
|
||||
lines.append(f" {func.name}({args_str})")
|
||||
if func.is_async:
|
||||
lines.append(f" await {func.name}({args_str})")
|
||||
else:
|
||||
lines.append(f" {func.name}({args_str})")
|
||||
else:
|
||||
lines.append(f" result = {func.name}({args_str})")
|
||||
if func.has_return:
|
||||
lines.append(f" assert result is not None or result is None # Placeholder")
|
||||
lines.append(f" except ImportError:")
|
||||
lines.append(f" pytest.skip('Module not importable')")
|
||||
if func.is_async:
|
||||
lines.append(f" _ = await {func.name}({args_str})")
|
||||
else:
|
||||
lines.append(f" _ = {func.name}({args_str})")
|
||||
lines.append(" except ImportError:")
|
||||
lines.append(" pytest.skip('Module not importable')")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def generate_edge_cases(gap):
|
||||
"""Generate edge case test for a function."""
|
||||
func = gap.func
|
||||
lines = []
|
||||
lines.append(f" # AUTO-GENERATED -- edge cases -- review before merging")
|
||||
lines.append(f" # Source: {func.module_path}:{func.lineno}")
|
||||
lines.append("")
|
||||
mod_imp = func.module_path.replace("/", ".").replace("-", "_").replace(".py", "")
|
||||
test_name = f"{func.test_name}_edge_cases"
|
||||
|
||||
if func.is_async:
|
||||
lines.append(" @pytest.mark.asyncio")
|
||||
lines.append(f" async def {test_name}(self):")
|
||||
else:
|
||||
lines.append(f" def {test_name}(self):")
|
||||
|
||||
lines.append(f' """Edge cases for {func.qualified_name}."""')
|
||||
|
||||
# Edge argument values
|
||||
call_args = []
|
||||
for a in func.args:
|
||||
if a in ("self", "cls"):
|
||||
continue
|
||||
if "path" in a or "file" in a or "dir" in a:
|
||||
call_args.append(f"{a}=''")
|
||||
elif "name" in a or "id" in a or "key" in a:
|
||||
call_args.append(f"{a}=''")
|
||||
elif "message" in a or "text" in a:
|
||||
call_args.append(f"{a}=''")
|
||||
elif "count" in a or "num" in a or "size" in a or "width" in a or "height" in a:
|
||||
call_args.append(f"{a}=0")
|
||||
elif "flag" in a or "enabled" in a or "verbose" in a:
|
||||
call_args.append(f"{a}=False")
|
||||
else:
|
||||
call_args.append(f"{a}=MagicMock()")
|
||||
args_str = ", ".join(call_args)
|
||||
|
||||
if func.class_name:
|
||||
lines.append(" try:")
|
||||
lines.append(f" from {mod_imp} import {func.class_name}")
|
||||
lines.append(f" obj = {func.class_name}()")
|
||||
if func.is_async:
|
||||
lines.append(f" _ = await obj.{func.name}({args_str})")
|
||||
else:
|
||||
lines.append(f" _ = obj.{func.name}({args_str})")
|
||||
lines.append(" except ImportError:")
|
||||
lines.append(" pytest.skip('Module not importable')")
|
||||
else:
|
||||
lines.append(" try:")
|
||||
lines.append(f" from {mod_imp} import {func.name}")
|
||||
if func.is_async:
|
||||
lines.append(f" _ = await {func.name}({args_str})")
|
||||
else:
|
||||
lines.append(f" _ = {func.name}({args_str})")
|
||||
lines.append(" except ImportError:")
|
||||
lines.append(" pytest.skip('Module not importable')")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def generate_test_suite(gaps, max_tests=50):
|
||||
by_module = {}
|
||||
for gap in gaps[:max_tests]:
|
||||
by_module.setdefault(gap.func.module_path, []).append(gap)
|
||||
|
||||
lines = []
|
||||
lines.append('"""Auto-generated test suite -- Codebase Genome (#667).')
|
||||
lines.append("")
|
||||
lines.append("Generated by scripts/codebase_test_generator.py")
|
||||
lines.append("Coverage gaps identified from AST analysis.")
|
||||
lines.append("")
|
||||
lines.append("These tests are starting points. Review before merging.")
|
||||
lines.append('"""')
|
||||
lines.append("")
|
||||
lines.append("import pytest")
|
||||
lines.append("from unittest.mock import MagicMock, patch")
|
||||
lines.append("")
|
||||
lines.append("")
|
||||
lines.append("# AUTO-GENERATED -- DO NOT EDIT WITHOUT REVIEW")
|
||||
|
||||
for module, mgaps in sorted(by_module.items()):
|
||||
safe = module.replace("/", "_").replace(".py", "").replace("-", "_")
|
||||
cls_name = "".join(w.title() for w in safe.split("_"))
|
||||
lines.append("")
|
||||
lines.append(f"class Test{cls_name}Generated:")
|
||||
lines.append(f' """Auto-generated tests for {module}."""')
|
||||
for gap in mgaps:
|
||||
lines.append("")
|
||||
lines.append(generate_test(gap))
|
||||
lines.append(generate_edge_cases(gap))
|
||||
lines.append("")
|
||||
|
||||
return chr(10).join(lines)
|
||||
|
||||
|
||||
def generate_test_suite(gaps, max_tests=50):
|
||||
by_module = {}
|
||||
for gap in gaps[:max_tests]:
|
||||
by_module.setdefault(gap.func.module_path, []).append(gap)
|
||||
@@ -276,7 +386,7 @@ def main():
|
||||
return
|
||||
|
||||
if gaps:
|
||||
content = generate_test_suite(gaps, max_tests=args.max-tests if hasattr(args, 'max-tests') else args.max_tests)
|
||||
content = generate_test_suite(gaps, max_tests=args.max_tests)
|
||||
out = os.path.join(source_dir, args.output)
|
||||
os.makedirs(os.path.dirname(out), exist_ok=True)
|
||||
with open(out, "w") as f:
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,54 +0,0 @@
|
||||
from importlib.util import module_from_spec, spec_from_file_location
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parent.parent
|
||||
GAME_PATH = ROOT / "evennia" / "timmy_world" / "world" / "game.py"
|
||||
|
||||
|
||||
def load_game_module():
|
||||
spec = spec_from_file_location("tower_world_game", GAME_PATH)
|
||||
module = module_from_spec(spec)
|
||||
assert spec.loader is not None
|
||||
spec.loader.exec_module(module)
|
||||
module.random.seed(0)
|
||||
return module
|
||||
|
||||
|
||||
def _visitor_sets_after_ticks(module, ticks=100):
|
||||
engine = module.GameEngine()
|
||||
engine.start_new_game()
|
||||
visitors = {room: set() for room in engine.world.rooms}
|
||||
for _ in range(ticks):
|
||||
engine.run_tick("rest")
|
||||
for name, char in engine.world.characters.items():
|
||||
if name == "Timmy":
|
||||
continue
|
||||
visitors[char["room"]].add(name)
|
||||
return visitors
|
||||
|
||||
|
||||
class TestTowerGameNpcPurpose:
|
||||
def test_goal_driven_room_targets(self):
|
||||
module = load_game_module()
|
||||
world = module.World()
|
||||
npc_ai = module.NPCAI(world)
|
||||
|
||||
world.characters["Marcus"]["room"] = "Threshold"
|
||||
world.characters["Marcus"]["active_goal"] = "sit"
|
||||
assert npc_ai.make_choice("Marcus") == "move:east"
|
||||
|
||||
world.characters["Ezra"]["room"] = "Threshold"
|
||||
world.characters["Ezra"]["active_goal"] = "study"
|
||||
assert npc_ai.make_choice("Ezra") == "move:north"
|
||||
|
||||
world.characters["Claude"]["room"] = "Threshold"
|
||||
world.characters["Claude"]["active_goal"] = "enforce_order"
|
||||
assert npc_ai.make_choice("Claude") == "move:south"
|
||||
|
||||
def test_every_room_gets_multiple_npc_visitors_over_100_ticks(self):
|
||||
module = load_game_module()
|
||||
visitors = _visitor_sets_after_ticks(module, ticks=100)
|
||||
|
||||
assert all(len(names) >= 2 for names in visitors.values()), visitors
|
||||
assert len(visitors["Bridge"]) >= 3, visitors["Bridge"]
|
||||
Reference in New Issue
Block a user