#!/usr/bin/env python3
"""strips_planner.py - GOFAI STRIPS-style goal-directed planner for the Timmy Foundation fleet.

Implements a classical planner over a STRIPS action representation. Each
action has preconditions, an add-list, and a delete-list. The planner searches
*forward* from the initial state, applying every applicable action, until it
reaches a state in which all goal conditions hold, and returns the linear
action sequence that got there.

No ML, no embeddings - just symbolic state-space search.

Representation:
    State:  frozenset of ground literals, e.g. {'agent_idle', 'task_queued'}
    Action: (name, preconditions, add_effects, delete_effects)
    Goal:   set of literals that must hold in the final state

Algorithm:
    Iterative-deepening DFS (IDDFS) over the forward state space.
    Cycle detection via a visited-state set per path.

Usage (Python API):
    from strips_planner import Action, STRIPSPlanner

    actions = [
        Action('assign_task',
               pre={'agent_idle', 'task_queued'},
               add={'task_running'},
               delete={'agent_idle', 'task_queued'}),
        Action('complete_task',
               pre={'task_running'},
               add={'agent_idle', 'task_done'},
               delete={'task_running'}),
    ]
    planner = STRIPSPlanner(actions)
    plan = planner.solve(
        initial={'agent_idle', 'task_queued'},
        goal={'task_done'},
    )
    # plan -> ['assign_task', 'complete_task']

CLI:
    python strips_planner.py --demo
    python strips_planner.py --max-depth 15
"""

from __future__ import annotations

import argparse
import sys
from dataclasses import dataclass, field
from typing import FrozenSet, List, Optional, Set, Tuple

# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------

Literal = str
State = FrozenSet[Literal]


@dataclass(frozen=True)
class Action:
    """A STRIPS operator with preconditions and add/delete effects.

    The three literal collections may be passed as ordinary sets; they are
    converted to frozensets on construction so that instances are hashable.
    """

    name: str
    pre: FrozenSet[Literal]
    add: FrozenSet[Literal]
    delete: FrozenSet[Literal]

    def __post_init__(self) -> None:
        # Coerce mutable sets to frozensets for hashability. The dataclass is
        # frozen, so normal attribute assignment is blocked and
        # object.__setattr__ has to be used instead.
        object.__setattr__(self, 'pre', frozenset(self.pre))
        object.__setattr__(self, 'add', frozenset(self.add))
        object.__setattr__(self, 'delete', frozenset(self.delete))

    def applicable(self, state: State) -> bool:
        """True if all preconditions hold in *state*."""
        return self.pre <= state

    def apply(self, state: State) -> State:
        """Return the successor state after executing this action.

        Delete effects are removed first, then add effects are inserted, so a
        literal listed in both ends up present. Preconditions are NOT checked
        here; callers are expected to test ``applicable`` first.
        """
        return (state - self.delete) | self.add

    def __str__(self) -> str:
        return self.name


# ---------------------------------------------------------------------------
# Planner
# ---------------------------------------------------------------------------

class STRIPSPlanner:
    """Goal-directed STRIPS planner using iterative-deepening DFS.

    Searches forward from the initial state with an increasing depth limit,
    abandoning a branch once its depth budget is exhausted or it would revisit
    a state already on the current path.
    """

    def __init__(self, actions: List[Action]) -> None:
        # Copy so later mutation of the caller's list does not affect us.
        self.actions = list(actions)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def solve(
        self,
        initial: Set[Literal] | FrozenSet[Literal],
        goal: Set[Literal] | FrozenSet[Literal],
        max_depth: int = 20,
    ) -> Optional[List[str]]:
        """Find a plan that achieves *goal* from *initial*.

        The depth limit grows from 1 up to *max_depth*, so the first plan
        found has the minimal number of steps.

        Args:
            initial: Initial world state (set of ground literals).
            goal: Goal conditions (set of ground literals to achieve).
            max_depth: Maximum plan length to consider.

        Returns:
            Ordered list of action names (empty if the goal already holds in
            *initial*), or None if no plan of at most *max_depth* steps exists.
        """
        start = frozenset(initial)
        target = frozenset(goal)

        if target <= start:
            return []  # goal already satisfied

        for depth in range(1, max_depth + 1):
            # A fresh visited set per iteration: it tracks only the states on
            # the path currently being explored.
            plan = self._dfs(start, target, depth, [], {start})
            if plan is not None:
                return plan

        return None

    # ------------------------------------------------------------------
    # Internal search
    # ------------------------------------------------------------------

    def _dfs(
        self,
        state: State,
        goal: State,
        remaining: int,
        path: List[str],
        visited: Set[State],
    ) -> Optional[List[str]]:
        """Depth-limited forward DFS.

        Args:
            state: State to expand.
            goal: Literals that must all hold in a goal state.
            remaining: Number of further actions that may still be applied.
            path: Action names applied so far to reach *state* (not mutated).
            visited: States on the current path; mutated during the search but
                restored to its incoming contents before returning.

        Returns:
            The full list of action names reaching a goal state, or None if
            none is reachable within *remaining* steps.
        """
        if remaining == 0:
            return None

        for action in self.actions:
            if not action.applicable(state):
                continue

            next_state = action.apply(state)
            if next_state in visited:
                continue  # would loop back onto the current path

            new_path = path + [action.name]
            if goal <= next_state:
                return new_path

            # Mark the state only while its subtree is explored, so sibling
            # branches may still reach it via a different route.
            visited.add(next_state)
            result = self._dfs(next_state, goal, remaining - 1, new_path, visited)
            visited.discard(next_state)

            if result is not None:
                return result

        return None

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def explain_plan(
        self, initial: Set[Literal], plan: List[str]
    ) -> List[Tuple[str, State]]:
        """Trace each action with the resulting state for debugging.

        Actions are looked up by name and applied in order. Preconditions are
        not re-checked, so the trace reflects the raw effects of each step.

        Args:
            initial: State in which the plan starts.
            plan: Ordered action names, e.g. as returned by ``solve``.

        Returns:
            List of (action_name, resulting_state) tuples.

        Raises:
            KeyError: If *plan* names an action this planner does not know.
        """
        state: State = frozenset(initial)
        trace: List[Tuple[str, State]] = []
        # If several actions share a name, the last one in self.actions wins.
        action_map = {a.name: a for a in self.actions}

        for name in plan:
            try:
                action = action_map[name]
            except KeyError:
                raise KeyError(f"unknown action in plan: {name!r}") from None
            state = action.apply(state)
            trace.append((name, state))

        return trace


# ---------------------------------------------------------------------------
# Built-in demo domain: Timmy fleet task lifecycle
# ---------------------------------------------------------------------------

def _fleet_demo_actions() -> List[Action]:
    """Return a small STRIPS domain modelling the Timmy fleet task lifecycle."""
    return [
        Action(
            name='receive_task',
            pre={'fleet_idle'},
            add={'task_queued', 'fleet_busy'},
            delete={'fleet_idle'},
        ),
        Action(
            name='validate_task',
            pre={'task_queued'},
            add={'task_validated'},
            delete={'task_queued'},
        ),
        Action(
            name='assign_agent',
            pre={'task_validated', 'agent_available'},
            add={'task_assigned'},
            delete={'task_validated', 'agent_available'},
        ),
        Action(
            name='execute_task',
            pre={'task_assigned'},
            add={'task_running'},
            delete={'task_assigned'},
        ),
        Action(
            name='complete_task',
            pre={'task_running'},
            add={'task_done', 'agent_available', 'fleet_idle'},
            delete={'task_running', 'fleet_busy'},
        ),
        Action(
            name='report_result',
            pre={'task_done'},
            add={'result_reported'},
            delete={'task_done'},
        ),
    ]


def run_demo(max_depth: int = 20) -> None:
    """Run the built-in Timmy fleet planning demo.

    Prints the problem, the plan found (if any), a step-by-step execution
    trace, and whether the goal holds in the final state.

    Args:
        max_depth: Maximum plan length passed on to the planner.
    """
    planner = STRIPSPlanner(_fleet_demo_actions())

    initial: Set[Literal] = {'fleet_idle', 'agent_available'}
    goal: Set[Literal] = {'result_reported', 'fleet_idle'}

    print("=" * 60)
    print("STRIPS Planner Demo - Timmy Fleet Task Lifecycle")
    print("=" * 60)
    print(f"Initial state : {sorted(initial)}")
    print(f"Goal : {sorted(goal)}")
    print(f"Max depth : {max_depth}")
    print()

    plan = planner.solve(initial, goal, max_depth=max_depth)

    if plan is None:
        print("No plan found within depth limit.")
        return

    print(f"Plan ({len(plan)} steps):")
    for i, step in enumerate(plan, 1):
        print(f" {i:2d}. {step}")

    print()
    print("Execution trace:")
    # Replay the plan once and reuse the trace for both the listing and the
    # final goal check.
    trace = planner.explain_plan(initial, plan)
    for name, resulting_state in trace:
        print(f" -> {name}")
        print(f" state: {sorted(resulting_state)}")
    print()

    # An empty plan means the goal already held in the initial state.
    final_state = trace[-1][1] if trace else frozenset(initial)
    print(f"Goal satisfied: {frozenset(goal) <= final_state}")


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def main(argv: Optional[List[str]] = None) -> None:
    """Command-line entry point: parse options and run the demo.

    The demo is the only mode this script offers, so it runs whether or not
    ``--demo`` is given; the flag is still accepted so existing invocations
    keep working.

    Args:
        argv: Argument list to parse instead of ``sys.argv[1:]`` (useful for
            tests). None means use the real command line.
    """
    parser = argparse.ArgumentParser(
        description="GOFAI STRIPS-style goal-directed planner"
    )
    parser.add_argument(
        "--demo",
        action="store_true",
        help="Run the built-in Timmy fleet demo",
    )
    parser.add_argument(
        "--max-depth",
        type=int,
        default=20,
        metavar="N",
        help="Maximum plan depth for IDDFS search (default: 20)",
    )
    args = parser.parse_args(argv)

    # Previously the demo was skipped unless --demo was passed, because the
    # "no arguments given" test also looked at --max-depth, whose default (20)
    # is always truthy. Plain and `--max-depth N` invocations printed help.
    run_demo(max_depth=args.max_depth)


if __name__ == "__main__":
    main()