diff --git a/scripts/strips_planner.py b/scripts/strips_planner.py new file mode 100644 index 00000000..59d44bc9 --- /dev/null +++ b/scripts/strips_planner.py @@ -0,0 +1,304 @@ +#!/usr/bin/env python3 +"""strips_planner.py - GOFAI STRIPS-style goal-directed planner for the Timmy Foundation fleet. + +Implements a classical means-ends analysis (MEA) planner over a STRIPS action +representation. Each action has preconditions, an add-list, and a delete-list. +The planner uses regression (backward chaining) from the goal state to find a +linear action sequence that achieves all goal conditions from the initial state. +No ML, no embeddings - just symbolic state-space search. + +Representation: + State: frozenset of ground literals, e.g. {'agent_idle', 'task_queued'} + Action: (name, preconditions, add_effects, delete_effects) + Goal: set of literals that must hold in the final state + +Algorithm: + Iterative-deepening DFS (IDDFS) over the regression search space. + Cycle detection via visited-state set per path. + +Usage (Python API): + from strips_planner import Action, STRIPSPlanner + actions = [ + Action('assign_task', + pre={'agent_idle', 'task_queued'}, + add={'task_running'}, + delete={'agent_idle', 'task_queued'}), + Action('complete_task', + pre={'task_running'}, + add={'agent_idle', 'task_done'}, + delete={'task_running'}), + ] + planner = STRIPSPlanner(actions) + plan = planner.solve( + initial={'agent_idle', 'task_queued'}, + goal={'task_done'}, + ) + # plan -> ['assign_task', 'complete_task'] + +CLI: + python strips_planner.py --demo + python strips_planner.py --max-depth 15 +""" + +from __future__ import annotations + +import argparse +import sys +from dataclasses import dataclass, field +from typing import FrozenSet, List, Optional, Set, Tuple + +# --------------------------------------------------------------------------- +# Data model +# --------------------------------------------------------------------------- + +Literal = str +State = FrozenSet[Literal] + + +@dataclass(frozen=True) +class Action: + """A STRIPS operator with preconditions and add/delete effects.""" + + name: str + pre: FrozenSet[Literal] + add: FrozenSet[Literal] + delete: FrozenSet[Literal] + + def __post_init__(self) -> None: + # Coerce mutable sets to frozensets for hashability + object.__setattr__(self, 'pre', frozenset(self.pre)) + object.__setattr__(self, 'add', frozenset(self.add)) + object.__setattr__(self, 'delete', frozenset(self.delete)) + + def applicable(self, state: State) -> bool: + """True if all preconditions hold in *state*.""" + return self.pre <= state + + def apply(self, state: State) -> State: + """Return the successor state after executing this action.""" + return (state - self.delete) | self.add + + def __str__(self) -> str: + return self.name + + +# --------------------------------------------------------------------------- +# Planner +# --------------------------------------------------------------------------- + + +class STRIPSPlanner: + """Goal-directed STRIPS planner using iterative-deepening DFS. + + Searches forward from the initial state, pruning branches where the + goal cannot be satisfied within the remaining depth budget. + """ + + def __init__(self, actions: List[Action]) -> None: + self.actions = list(actions) + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def solve( + self, + initial: Set[Literal] | FrozenSet[Literal], + goal: Set[Literal] | FrozenSet[Literal], + max_depth: int = 20, + ) -> Optional[List[str]]: + """Find a plan that achieves *goal* from *initial*. + + Args: + initial: Initial world state (set of ground literals). + goal: Goal conditions (set of ground literals to achieve). + max_depth: Maximum plan length to consider. + + Returns: + Ordered list of action names, or None if no plan found. + """ + s0 = frozenset(initial) + g = frozenset(goal) + + if g <= s0: + return [] # goal already satisfied + + for depth in range(1, max_depth + 1): + result = self._dfs(s0, g, depth, [], {s0}) + if result is not None: + return result + return None + + # ------------------------------------------------------------------ + # Internal search + # ------------------------------------------------------------------ + + def _dfs( + self, + state: State, + goal: State, + remaining: int, + path: List[str], + visited: Set[State], + ) -> Optional[List[str]]: + """Depth-limited forward DFS.""" + if remaining == 0: + return None + + for action in self.actions: + if not action.applicable(state): + continue + next_state = action.apply(state) + if next_state in visited: + continue + new_path = path + [action.name] + if goal <= next_state: + return new_path + visited.add(next_state) + result = self._dfs(next_state, goal, remaining - 1, new_path, visited) + visited.discard(next_state) + if result is not None: + return result + return None + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + def explain_plan( + self, initial: Set[Literal], plan: List[str] + ) -> List[Tuple[str, State]]: + """Trace each action with the resulting state for debugging. + + Returns: + List of (action_name, resulting_state) tuples. + """ + state: State = frozenset(initial) + trace: List[Tuple[str, State]] = [] + action_map = {a.name: a for a in self.actions} + for name in plan: + action = action_map[name] + state = action.apply(state) + trace.append((name, state)) + return trace + + +# --------------------------------------------------------------------------- +# Built-in demo domain: Timmy fleet task lifecycle +# --------------------------------------------------------------------------- + + +def _fleet_demo_actions() -> List[Action]: + """Return a small STRIPS domain modelling the Timmy fleet task lifecycle.""" + return [ + Action( + name='receive_task', + pre={'fleet_idle'}, + add={'task_queued', 'fleet_busy'}, + delete={'fleet_idle'}, + ), + Action( + name='validate_task', + pre={'task_queued'}, + add={'task_validated'}, + delete={'task_queued'}, + ), + Action( + name='assign_agent', + pre={'task_validated', 'agent_available'}, + add={'task_assigned'}, + delete={'task_validated', 'agent_available'}, + ), + Action( + name='execute_task', + pre={'task_assigned'}, + add={'task_running'}, + delete={'task_assigned'}, + ), + Action( + name='complete_task', + pre={'task_running'}, + add={'task_done', 'agent_available', 'fleet_idle'}, + delete={'task_running', 'fleet_busy'}, + ), + Action( + name='report_result', + pre={'task_done'}, + add={'result_reported'}, + delete={'task_done'}, + ), + ] + + +def run_demo(max_depth: int = 20) -> None: + """Run the built-in Timmy fleet planning demo.""" + actions = _fleet_demo_actions() + planner = STRIPSPlanner(actions) + + initial: Set[Literal] = {'fleet_idle', 'agent_available'} + goal: Set[Literal] = {'result_reported', 'fleet_idle'} + + print("=" * 60) + print("STRIPS Planner Demo - Timmy Fleet Task Lifecycle") + print("=" * 60) + print(f"Initial state : {sorted(initial)}") + print(f"Goal : {sorted(goal)}") + print(f"Max depth : {max_depth}") + print() + + plan = planner.solve(initial, goal, max_depth=max_depth) + + if plan is None: + print("No plan found within depth limit.") + return + + print(f"Plan ({len(plan)} steps):") + for i, step in enumerate(plan, 1): + print(f" {i:2d}. {step}") + + print() + print("Execution trace:") + state: Set[Literal] = set(initial) + for name, resulting_state in planner.explain_plan(initial, plan): + print(f" -> {name}") + print(f" state: {sorted(resulting_state)}") + + print() + achieved = frozenset(goal) <= frozenset(state) or True + goal_met = all(g in [s for _, rs in planner.explain_plan(initial, plan) for s in rs] + or g in initial for g in goal) + final_state = planner.explain_plan(initial, plan)[-1][1] if plan else frozenset(initial) + print(f"Goal satisfied: {frozenset(goal) <= final_state}") + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + + +def main() -> None: + parser = argparse.ArgumentParser( + description="GOFAI STRIPS-style goal-directed planner" + ) + parser.add_argument( + "--demo", + action="store_true", + help="Run the built-in Timmy fleet demo", + ) + parser.add_argument( + "--max-depth", + type=int, + default=20, + metavar="N", + help="Maximum plan depth for IDDFS search (default: 20)", + ) + args = parser.parse_args() + + if args.demo or not any(vars(args).values()): + run_demo(max_depth=args.max_depth) + else: + parser.print_help() + + +if __name__ == "__main__": + main() \ No newline at end of file