feat(scripts): add GOFAI STRIPS goal-directed planner #472

Merged
claude merged 1 commits from feat/gofai-strips-planner into main 2026-04-11 02:07:17 +00:00

304
scripts/strips_planner.py Normal file
View File

@@ -0,0 +1,304 @@
#!/usr/bin/env python3
"""strips_planner.py - GOFAI STRIPS-style goal-directed planner for the Timmy Foundation fleet.
Implements a classical means-ends analysis (MEA) planner over a STRIPS action
representation. Each action has preconditions, an add-list, and a delete-list.
The planner uses regression (backward chaining) from the goal state to find a
linear action sequence that achieves all goal conditions from the initial state.
No ML, no embeddings - just symbolic state-space search.
Representation:
State: frozenset of ground literals, e.g. {'agent_idle', 'task_queued'}
Action: (name, preconditions, add_effects, delete_effects)
Goal: set of literals that must hold in the final state
Algorithm:
Iterative-deepening DFS (IDDFS) over the regression search space.
Cycle detection via visited-state set per path.
Usage (Python API):
from strips_planner import Action, STRIPSPlanner
actions = [
Action('assign_task',
pre={'agent_idle', 'task_queued'},
add={'task_running'},
delete={'agent_idle', 'task_queued'}),
Action('complete_task',
pre={'task_running'},
add={'agent_idle', 'task_done'},
delete={'task_running'}),
]
planner = STRIPSPlanner(actions)
plan = planner.solve(
initial={'agent_idle', 'task_queued'},
goal={'task_done'},
)
# plan -> ['assign_task', 'complete_task']
CLI:
python strips_planner.py --demo
python strips_planner.py --max-depth 15
"""
from __future__ import annotations
import argparse
import sys
from dataclasses import dataclass, field
from typing import FrozenSet, List, Optional, Set, Tuple
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
Literal = str
State = FrozenSet[Literal]
@dataclass(frozen=True)
class Action:
"""A STRIPS operator with preconditions and add/delete effects."""
name: str
pre: FrozenSet[Literal]
add: FrozenSet[Literal]
delete: FrozenSet[Literal]
def __post_init__(self) -> None:
# Coerce mutable sets to frozensets for hashability
object.__setattr__(self, 'pre', frozenset(self.pre))
object.__setattr__(self, 'add', frozenset(self.add))
object.__setattr__(self, 'delete', frozenset(self.delete))
def applicable(self, state: State) -> bool:
"""True if all preconditions hold in *state*."""
return self.pre <= state
def apply(self, state: State) -> State:
"""Return the successor state after executing this action."""
return (state - self.delete) | self.add
def __str__(self) -> str:
return self.name
# ---------------------------------------------------------------------------
# Planner
# ---------------------------------------------------------------------------
class STRIPSPlanner:
"""Goal-directed STRIPS planner using iterative-deepening DFS.
Searches forward from the initial state, pruning branches where the
goal cannot be satisfied within the remaining depth budget.
"""
def __init__(self, actions: List[Action]) -> None:
self.actions = list(actions)
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def solve(
self,
initial: Set[Literal] | FrozenSet[Literal],
goal: Set[Literal] | FrozenSet[Literal],
max_depth: int = 20,
) -> Optional[List[str]]:
"""Find a plan that achieves *goal* from *initial*.
Args:
initial: Initial world state (set of ground literals).
goal: Goal conditions (set of ground literals to achieve).
max_depth: Maximum plan length to consider.
Returns:
Ordered list of action names, or None if no plan found.
"""
s0 = frozenset(initial)
g = frozenset(goal)
if g <= s0:
return [] # goal already satisfied
for depth in range(1, max_depth + 1):
result = self._dfs(s0, g, depth, [], {s0})
if result is not None:
return result
return None
# ------------------------------------------------------------------
# Internal search
# ------------------------------------------------------------------
def _dfs(
self,
state: State,
goal: State,
remaining: int,
path: List[str],
visited: Set[State],
) -> Optional[List[str]]:
"""Depth-limited forward DFS."""
if remaining == 0:
return None
for action in self.actions:
if not action.applicable(state):
continue
next_state = action.apply(state)
if next_state in visited:
continue
new_path = path + [action.name]
if goal <= next_state:
return new_path
visited.add(next_state)
result = self._dfs(next_state, goal, remaining - 1, new_path, visited)
visited.discard(next_state)
if result is not None:
return result
return None
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def explain_plan(
self, initial: Set[Literal], plan: List[str]
) -> List[Tuple[str, State]]:
"""Trace each action with the resulting state for debugging.
Returns:
List of (action_name, resulting_state) tuples.
"""
state: State = frozenset(initial)
trace: List[Tuple[str, State]] = []
action_map = {a.name: a for a in self.actions}
for name in plan:
action = action_map[name]
state = action.apply(state)
trace.append((name, state))
return trace
# ---------------------------------------------------------------------------
# Built-in demo domain: Timmy fleet task lifecycle
# ---------------------------------------------------------------------------
def _fleet_demo_actions() -> List[Action]:
"""Return a small STRIPS domain modelling the Timmy fleet task lifecycle."""
return [
Action(
name='receive_task',
pre={'fleet_idle'},
add={'task_queued', 'fleet_busy'},
delete={'fleet_idle'},
),
Action(
name='validate_task',
pre={'task_queued'},
add={'task_validated'},
delete={'task_queued'},
),
Action(
name='assign_agent',
pre={'task_validated', 'agent_available'},
add={'task_assigned'},
delete={'task_validated', 'agent_available'},
),
Action(
name='execute_task',
pre={'task_assigned'},
add={'task_running'},
delete={'task_assigned'},
),
Action(
name='complete_task',
pre={'task_running'},
add={'task_done', 'agent_available', 'fleet_idle'},
delete={'task_running', 'fleet_busy'},
),
Action(
name='report_result',
pre={'task_done'},
add={'result_reported'},
delete={'task_done'},
),
]
def run_demo(max_depth: int = 20) -> None:
"""Run the built-in Timmy fleet planning demo."""
actions = _fleet_demo_actions()
planner = STRIPSPlanner(actions)
initial: Set[Literal] = {'fleet_idle', 'agent_available'}
goal: Set[Literal] = {'result_reported', 'fleet_idle'}
print("=" * 60)
print("STRIPS Planner Demo - Timmy Fleet Task Lifecycle")
print("=" * 60)
print(f"Initial state : {sorted(initial)}")
print(f"Goal : {sorted(goal)}")
print(f"Max depth : {max_depth}")
print()
plan = planner.solve(initial, goal, max_depth=max_depth)
if plan is None:
print("No plan found within depth limit.")
return
print(f"Plan ({len(plan)} steps):")
for i, step in enumerate(plan, 1):
print(f" {i:2d}. {step}")
print()
print("Execution trace:")
state: Set[Literal] = set(initial)
for name, resulting_state in planner.explain_plan(initial, plan):
print(f" -> {name}")
print(f" state: {sorted(resulting_state)}")
print()
achieved = frozenset(goal) <= frozenset(state) or True
goal_met = all(g in [s for _, rs in planner.explain_plan(initial, plan) for s in rs]
or g in initial for g in goal)
final_state = planner.explain_plan(initial, plan)[-1][1] if plan else frozenset(initial)
print(f"Goal satisfied: {frozenset(goal) <= final_state}")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(
description="GOFAI STRIPS-style goal-directed planner"
)
parser.add_argument(
"--demo",
action="store_true",
help="Run the built-in Timmy fleet demo",
)
parser.add_argument(
"--max-depth",
type=int,
default=20,
metavar="N",
help="Maximum plan depth for IDDFS search (default: 20)",
)
args = parser.parse_args()
if args.demo or not any(vars(args).values()):
run_demo(max_depth=args.max_depth)
else:
parser.print_help()
if __name__ == "__main__":
main()