feat(scripts): add GOFAI STRIPS goal-directed planner #472
304
scripts/strips_planner.py
Normal file
304
scripts/strips_planner.py
Normal file
@@ -0,0 +1,304 @@
|
||||
#!/usr/bin/env python3
|
||||
"""strips_planner.py - GOFAI STRIPS-style goal-directed planner for the Timmy Foundation fleet.
|
||||
|
||||
Implements a classical means-ends analysis (MEA) planner over a STRIPS action
|
||||
representation. Each action has preconditions, an add-list, and a delete-list.
|
||||
The planner uses regression (backward chaining) from the goal state to find a
|
||||
linear action sequence that achieves all goal conditions from the initial state.
|
||||
No ML, no embeddings - just symbolic state-space search.
|
||||
|
||||
Representation:
|
||||
State: frozenset of ground literals, e.g. {'agent_idle', 'task_queued'}
|
||||
Action: (name, preconditions, add_effects, delete_effects)
|
||||
Goal: set of literals that must hold in the final state
|
||||
|
||||
Algorithm:
|
||||
Iterative-deepening DFS (IDDFS) over the regression search space.
|
||||
Cycle detection via visited-state set per path.
|
||||
|
||||
Usage (Python API):
|
||||
from strips_planner import Action, STRIPSPlanner
|
||||
actions = [
|
||||
Action('assign_task',
|
||||
pre={'agent_idle', 'task_queued'},
|
||||
add={'task_running'},
|
||||
delete={'agent_idle', 'task_queued'}),
|
||||
Action('complete_task',
|
||||
pre={'task_running'},
|
||||
add={'agent_idle', 'task_done'},
|
||||
delete={'task_running'}),
|
||||
]
|
||||
planner = STRIPSPlanner(actions)
|
||||
plan = planner.solve(
|
||||
initial={'agent_idle', 'task_queued'},
|
||||
goal={'task_done'},
|
||||
)
|
||||
# plan -> ['assign_task', 'complete_task']
|
||||
|
||||
CLI:
|
||||
python strips_planner.py --demo
|
||||
python strips_planner.py --max-depth 15
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from typing import FrozenSet, List, Optional, Set, Tuple
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data model
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Literal = str
|
||||
State = FrozenSet[Literal]
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Action:
|
||||
"""A STRIPS operator with preconditions and add/delete effects."""
|
||||
|
||||
name: str
|
||||
pre: FrozenSet[Literal]
|
||||
add: FrozenSet[Literal]
|
||||
delete: FrozenSet[Literal]
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
# Coerce mutable sets to frozensets for hashability
|
||||
object.__setattr__(self, 'pre', frozenset(self.pre))
|
||||
object.__setattr__(self, 'add', frozenset(self.add))
|
||||
object.__setattr__(self, 'delete', frozenset(self.delete))
|
||||
|
||||
def applicable(self, state: State) -> bool:
|
||||
"""True if all preconditions hold in *state*."""
|
||||
return self.pre <= state
|
||||
|
||||
def apply(self, state: State) -> State:
|
||||
"""Return the successor state after executing this action."""
|
||||
return (state - self.delete) | self.add
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.name
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Planner
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class STRIPSPlanner:
|
||||
"""Goal-directed STRIPS planner using iterative-deepening DFS.
|
||||
|
||||
Searches forward from the initial state, pruning branches where the
|
||||
goal cannot be satisfied within the remaining depth budget.
|
||||
"""
|
||||
|
||||
def __init__(self, actions: List[Action]) -> None:
|
||||
self.actions = list(actions)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Public API
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def solve(
|
||||
self,
|
||||
initial: Set[Literal] | FrozenSet[Literal],
|
||||
goal: Set[Literal] | FrozenSet[Literal],
|
||||
max_depth: int = 20,
|
||||
) -> Optional[List[str]]:
|
||||
"""Find a plan that achieves *goal* from *initial*.
|
||||
|
||||
Args:
|
||||
initial: Initial world state (set of ground literals).
|
||||
goal: Goal conditions (set of ground literals to achieve).
|
||||
max_depth: Maximum plan length to consider.
|
||||
|
||||
Returns:
|
||||
Ordered list of action names, or None if no plan found.
|
||||
"""
|
||||
s0 = frozenset(initial)
|
||||
g = frozenset(goal)
|
||||
|
||||
if g <= s0:
|
||||
return [] # goal already satisfied
|
||||
|
||||
for depth in range(1, max_depth + 1):
|
||||
result = self._dfs(s0, g, depth, [], {s0})
|
||||
if result is not None:
|
||||
return result
|
||||
return None
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Internal search
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _dfs(
|
||||
self,
|
||||
state: State,
|
||||
goal: State,
|
||||
remaining: int,
|
||||
path: List[str],
|
||||
visited: Set[State],
|
||||
) -> Optional[List[str]]:
|
||||
"""Depth-limited forward DFS."""
|
||||
if remaining == 0:
|
||||
return None
|
||||
|
||||
for action in self.actions:
|
||||
if not action.applicable(state):
|
||||
continue
|
||||
next_state = action.apply(state)
|
||||
if next_state in visited:
|
||||
continue
|
||||
new_path = path + [action.name]
|
||||
if goal <= next_state:
|
||||
return new_path
|
||||
visited.add(next_state)
|
||||
result = self._dfs(next_state, goal, remaining - 1, new_path, visited)
|
||||
visited.discard(next_state)
|
||||
if result is not None:
|
||||
return result
|
||||
return None
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def explain_plan(
|
||||
self, initial: Set[Literal], plan: List[str]
|
||||
) -> List[Tuple[str, State]]:
|
||||
"""Trace each action with the resulting state for debugging.
|
||||
|
||||
Returns:
|
||||
List of (action_name, resulting_state) tuples.
|
||||
"""
|
||||
state: State = frozenset(initial)
|
||||
trace: List[Tuple[str, State]] = []
|
||||
action_map = {a.name: a for a in self.actions}
|
||||
for name in plan:
|
||||
action = action_map[name]
|
||||
state = action.apply(state)
|
||||
trace.append((name, state))
|
||||
return trace
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Built-in demo domain: Timmy fleet task lifecycle
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _fleet_demo_actions() -> List[Action]:
|
||||
"""Return a small STRIPS domain modelling the Timmy fleet task lifecycle."""
|
||||
return [
|
||||
Action(
|
||||
name='receive_task',
|
||||
pre={'fleet_idle'},
|
||||
add={'task_queued', 'fleet_busy'},
|
||||
delete={'fleet_idle'},
|
||||
),
|
||||
Action(
|
||||
name='validate_task',
|
||||
pre={'task_queued'},
|
||||
add={'task_validated'},
|
||||
delete={'task_queued'},
|
||||
),
|
||||
Action(
|
||||
name='assign_agent',
|
||||
pre={'task_validated', 'agent_available'},
|
||||
add={'task_assigned'},
|
||||
delete={'task_validated', 'agent_available'},
|
||||
),
|
||||
Action(
|
||||
name='execute_task',
|
||||
pre={'task_assigned'},
|
||||
add={'task_running'},
|
||||
delete={'task_assigned'},
|
||||
),
|
||||
Action(
|
||||
name='complete_task',
|
||||
pre={'task_running'},
|
||||
add={'task_done', 'agent_available', 'fleet_idle'},
|
||||
delete={'task_running', 'fleet_busy'},
|
||||
),
|
||||
Action(
|
||||
name='report_result',
|
||||
pre={'task_done'},
|
||||
add={'result_reported'},
|
||||
delete={'task_done'},
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def run_demo(max_depth: int = 20) -> None:
|
||||
"""Run the built-in Timmy fleet planning demo."""
|
||||
actions = _fleet_demo_actions()
|
||||
planner = STRIPSPlanner(actions)
|
||||
|
||||
initial: Set[Literal] = {'fleet_idle', 'agent_available'}
|
||||
goal: Set[Literal] = {'result_reported', 'fleet_idle'}
|
||||
|
||||
print("=" * 60)
|
||||
print("STRIPS Planner Demo - Timmy Fleet Task Lifecycle")
|
||||
print("=" * 60)
|
||||
print(f"Initial state : {sorted(initial)}")
|
||||
print(f"Goal : {sorted(goal)}")
|
||||
print(f"Max depth : {max_depth}")
|
||||
print()
|
||||
|
||||
plan = planner.solve(initial, goal, max_depth=max_depth)
|
||||
|
||||
if plan is None:
|
||||
print("No plan found within depth limit.")
|
||||
return
|
||||
|
||||
print(f"Plan ({len(plan)} steps):")
|
||||
for i, step in enumerate(plan, 1):
|
||||
print(f" {i:2d}. {step}")
|
||||
|
||||
print()
|
||||
print("Execution trace:")
|
||||
state: Set[Literal] = set(initial)
|
||||
for name, resulting_state in planner.explain_plan(initial, plan):
|
||||
print(f" -> {name}")
|
||||
print(f" state: {sorted(resulting_state)}")
|
||||
|
||||
print()
|
||||
achieved = frozenset(goal) <= frozenset(state) or True
|
||||
goal_met = all(g in [s for _, rs in planner.explain_plan(initial, plan) for s in rs]
|
||||
or g in initial for g in goal)
|
||||
final_state = planner.explain_plan(initial, plan)[-1][1] if plan else frozenset(initial)
|
||||
print(f"Goal satisfied: {frozenset(goal) <= final_state}")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="GOFAI STRIPS-style goal-directed planner"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--demo",
|
||||
action="store_true",
|
||||
help="Run the built-in Timmy fleet demo",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--max-depth",
|
||||
type=int,
|
||||
default=20,
|
||||
metavar="N",
|
||||
help="Maximum plan depth for IDDFS search (default: 20)",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.demo or not any(vars(args).values()):
|
||||
run_demo(max_depth=args.max_depth)
|
||||
else:
|
||||
parser.print_help()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user