Some checks failed
Validate Config / Deploy Script Dry Run (pull_request) Successful in 5s
Validate Config / Playbook Schema Validation (pull_request) Successful in 8s
Architecture Lint / Lint Repository (pull_request) Failing after 6s
Architecture Lint / Linter Tests (pull_request) Successful in 7s
PR Checklist / pr-checklist (pull_request) Failing after 1m9s
Smoke Test / smoke (pull_request) Failing after 7s
Validate Config / YAML Lint (pull_request) Failing after 6s
Validate Config / JSON Validate (pull_request) Successful in 6s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 8s
Validate Config / Shell Script Lint (pull_request) Successful in 14s
Validate Config / Cron Syntax Check (pull_request) Successful in 5s
304 lines
9.5 KiB
Python
304 lines
9.5 KiB
Python
#!/usr/bin/env python3
|
|
"""strips_planner.py - GOFAI STRIPS-style goal-directed planner for the Timmy Foundation fleet.
|
|
|
|
Implements a classical goal-directed planner over a STRIPS action
representation. Each action has preconditions, an add-list, and a delete-list.
The planner searches forward (progression) from the initial state to find a
linear action sequence that reaches a state satisfying every goal condition.
|
|
No ML, no embeddings - just symbolic state-space search.
|
|
|
|
Representation:
|
|
State: frozenset of ground literals, e.g. {'agent_idle', 'task_queued'}
|
|
Action: (name, preconditions, add_effects, delete_effects)
|
|
Goal: set of literals that must hold in the final state
|
|
|
|
Algorithm:
|
|
Iterative-deepening DFS (IDDFS) over the forward state space.
|
|
Cycle detection via visited-state set per path.
|
|
|
|
Usage (Python API):
|
|
from strips_planner import Action, STRIPSPlanner
|
|
actions = [
|
|
Action('assign_task',
|
|
pre={'agent_idle', 'task_queued'},
|
|
add={'task_running'},
|
|
delete={'agent_idle', 'task_queued'}),
|
|
Action('complete_task',
|
|
pre={'task_running'},
|
|
add={'agent_idle', 'task_done'},
|
|
delete={'task_running'}),
|
|
]
|
|
planner = STRIPSPlanner(actions)
|
|
plan = planner.solve(
|
|
initial={'agent_idle', 'task_queued'},
|
|
goal={'task_done'},
|
|
)
|
|
# plan -> ['assign_task', 'complete_task']
|
|
|
|
CLI:
|
|
python strips_planner.py --demo
|
|
python strips_planner.py --max-depth 15
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from typing import FrozenSet, List, Optional, Set, Tuple
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data model
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# A ground literal is represented as a plain string, e.g. 'agent_idle'.
Literal = str
# A world state is an immutable set of the literals that currently hold.
State = FrozenSet[Literal]
|
|
|
|
|
|
@dataclass(frozen=True)
class Action:
    """STRIPS operator: a named step guarded by preconditions.

    Executing the action removes the ``delete`` literals from a state and then
    inserts the ``add`` literals.
    """

    # Identifier reported in plans and by ``str()``.
    name: str
    # Literals that must all be present for the action to be applicable.
    pre: FrozenSet[Literal]
    # Literals inserted into the state by the action.
    add: FrozenSet[Literal]
    # Literals removed from the state by the action.
    delete: FrozenSet[Literal]

    def __post_init__(self) -> None:
        # Callers may hand in ordinary sets; freeze each collection so the
        # instance stays hashable.  The dataclass is frozen, so assignment has
        # to go through object.__setattr__.
        for attr in ('pre', 'add', 'delete'):
            object.__setattr__(self, attr, frozenset(getattr(self, attr)))

    def applicable(self, state: State) -> bool:
        """Report whether *state* contains every precondition."""
        return state >= self.pre

    def apply(self, state: State) -> State:
        """Compute the state reached by executing this action in *state*."""
        survivors = state - self.delete
        return survivors | self.add

    def __str__(self) -> str:
        return self.name
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Planner
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class STRIPSPlanner:
    """Forward STRIPS planner driven by iterative-deepening DFS.

    Each deepening round runs a depth-limited search from the initial state
    and stops at the first successor state that contains every goal literal.
    """

    def __init__(self, actions: List[Action]) -> None:
        # Keep a private copy of the operator list.
        self.actions = [*actions]

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def solve(
        self,
        initial: Set[Literal] | FrozenSet[Literal],
        goal: Set[Literal] | FrozenSet[Literal],
        max_depth: int = 20,
    ) -> Optional[List[str]]:
        """Search for an action sequence leading from *initial* to *goal*.

        Args:
            initial: Literals that hold in the starting state.
            goal: Literals that must all hold in the final state.
            max_depth: Largest plan length that is tried.

        Returns:
            The action names in execution order, an empty list when the goal
            already holds in *initial*, or None when no plan of at most
            *max_depth* steps exists.
        """
        start = frozenset(initial)
        target = frozenset(goal)

        if start >= target:
            return []  # nothing to do

        # Lazily run one depth-limited search per limit (1, 2, ...) and take
        # the first one that yields a plan.
        rounds = (
            self._dfs(start, target, limit, [], {start})
            for limit in range(1, max_depth + 1)
        )
        return next((plan for plan in rounds if plan is not None), None)

    # ------------------------------------------------------------------
    # Internal search
    # ------------------------------------------------------------------

    def _dfs(
        self,
        state: State,
        goal: State,
        remaining: int,
        path: List[str],
        visited: Set[State],
    ) -> Optional[List[str]]:
        """Depth-limited forward DFS from *state*.

        *visited* holds the states on the current path; it is extended before
        descending and restored afterwards so sibling branches are unaffected.
        """
        if remaining == 0:
            return None

        usable = (op for op in self.actions if op.applicable(state))
        for op in usable:
            successor = op.apply(state)
            if successor in visited:
                continue  # would revisit a state already on this path
            extended = [*path, op.name]
            if successor >= goal:
                return extended
            visited.add(successor)
            found = self._dfs(successor, goal, remaining - 1, extended, visited)
            visited.discard(successor)
            if found is not None:
                return found
        return None

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def explain_plan(
        self, initial: Set[Literal], plan: List[str]
    ) -> List[Tuple[str, State]]:
        """Replay *plan* from *initial*, recording the state after each step.

        Actions are looked up by name and applied without checking their
        preconditions; a name that matches no action raises KeyError.

        Returns:
            List of (action_name, resulting_state) tuples, one per plan step.
        """
        current: State = frozenset(initial)
        steps: List[Tuple[str, State]] = []
        by_name = {op.name: op for op in self.actions}
        for step_name in plan:
            current = by_name[step_name].apply(current)
            steps.append((step_name, current))
        return steps
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Built-in demo domain: Timmy fleet task lifecycle
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _fleet_demo_actions() -> List[Action]:
    """Build the demo STRIPS domain for the Timmy fleet task lifecycle."""
    # One row per operator: (name, preconditions, add-list, delete-list).
    table = (
        ('receive_task',
         {'fleet_idle'},
         {'task_queued', 'fleet_busy'},
         {'fleet_idle'}),
        ('validate_task',
         {'task_queued'},
         {'task_validated'},
         {'task_queued'}),
        ('assign_agent',
         {'task_validated', 'agent_available'},
         {'task_assigned'},
         {'task_validated', 'agent_available'}),
        ('execute_task',
         {'task_assigned'},
         {'task_running'},
         {'task_assigned'}),
        ('complete_task',
         {'task_running'},
         {'task_done', 'agent_available', 'fleet_idle'},
         {'task_running', 'fleet_busy'}),
        ('report_result',
         {'task_done'},
         {'result_reported'},
         {'task_done'}),
    )
    return [
        Action(name=label, pre=needs, add=gains, delete=drops)
        for label, needs, gains, drops in table
    ]
|
|
|
|
|
|
def run_demo(max_depth: int = 20) -> None:
    """Run the built-in Timmy fleet planning demo and print the outcome.

    Solves a fixed problem over the demo domain, then prints the plan, the
    state after each step, and whether the final state satisfies the goal.

    Args:
        max_depth: Maximum plan length handed to the planner.
    """
    planner = STRIPSPlanner(_fleet_demo_actions())

    initial: Set[Literal] = {'fleet_idle', 'agent_available'}
    goal: Set[Literal] = {'result_reported', 'fleet_idle'}

    print("=" * 60)
    print("STRIPS Planner Demo - Timmy Fleet Task Lifecycle")
    print("=" * 60)
    print(f"Initial state : {sorted(initial)}")
    print(f"Goal : {sorted(goal)}")
    print(f"Max depth : {max_depth}")
    print()

    plan = planner.solve(initial, goal, max_depth=max_depth)

    if plan is None:
        print("No plan found within depth limit.")
        return

    print(f"Plan ({len(plan)} steps):")
    for i, step in enumerate(plan, 1):
        print(f" {i:2d}. {step}")

    print()
    print("Execution trace:")
    # Replay the plan once and reuse the trace for both the per-step listing
    # and the final goal check.
    trace = planner.explain_plan(initial, plan)
    for name, resulting_state in trace:
        print(f" -> {name}")
        print(f" state: {sorted(resulting_state)}")

    print()
    # An empty plan means the goal already held, so the final state is the
    # initial state itself.
    final_state = trace[-1][1] if trace else frozenset(initial)
    print(f"Goal satisfied: {frozenset(goal) <= final_state}")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def main() -> None:
    """Parse command-line options and run the built-in fleet demo.

    Options:
        --demo: Accepted for explicitness; the demo runs with or without it.
        --max-depth N: Maximum plan depth passed to the demo (default 20).
    """
    parser = argparse.ArgumentParser(
        description="GOFAI STRIPS-style goal-directed planner"
    )
    parser.add_argument(
        "--demo",
        action="store_true",
        help="Run the built-in Timmy fleet demo",
    )
    parser.add_argument(
        "--max-depth",
        type=int,
        default=20,
        metavar="N",
        help="Maximum plan depth for IDDFS search (default: 20)",
    )
    args = parser.parse_args()

    # The demo is the only mode this CLI offers, so run it unconditionally.
    # Testing `not any(vars(args).values())` to detect "no options given"
    # cannot work: max_depth defaults to a truthy 20, so a bare invocation or
    # `--max-depth N` on its own would fall through to the help text.
    run_demo(max_depth=args.max_depth)
|
|
|
|
|
|
# Run the CLI only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()