Compare commits
1 Commits
main
...
feat/gofai
| Author | SHA1 | Date | |
|---|---|---|---|
| 68f46a90f5 |
321
scripts/constraint_planner.py
Normal file
321
scripts/constraint_planner.py
Normal file
@@ -0,0 +1,321 @@
|
||||
#!/usr/bin/env python3
|
||||
"""constraint_planner.py — Constraint-satisfaction planner for the Timmy Foundation fleet.
|
||||
|
||||
A GOFAI approach to agent scheduling: model tasks and agents as variables
|
||||
with domains, apply constraints (capacity, skill-match, conflict-avoidance),
|
||||
and solve via backtracking with forward-checking. No gradient descent, no
|
||||
embeddings — just logic.
|
||||
|
||||
Usage:
|
||||
python constraint_planner.py --tasks tasks.yaml
|
||||
python constraint_planner.py --self-test
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
|
||||
|
||||
try:
|
||||
import yaml
|
||||
except ImportError:
|
||||
yaml = None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Domain types
|
||||
# ---------------------------------------------------------------------------
|
||||
@dataclass
|
||||
class Agent:
|
||||
name: str
|
||||
skills: Set[str] = field(default_factory=set)
|
||||
max_concurrent: int = 3
|
||||
current_load: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class Task:
|
||||
name: str
|
||||
required_skills: Set[str] = field(default_factory=set)
|
||||
priority: int = 0
|
||||
repo: str = ""
|
||||
conflicts_with: Set[str] = field(default_factory=set) # task names
|
||||
|
||||
|
||||
@dataclass
|
||||
class Assignment:
|
||||
task: str
|
||||
agent: str
|
||||
reason: str
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constraint definitions
|
||||
# ---------------------------------------------------------------------------
|
||||
Constraint = Callable[[Dict[str, str], str, str], bool]
|
||||
|
||||
|
||||
def skill_constraint(agents: Dict[str, Agent], tasks: Dict[str, Task]) -> Constraint:
|
||||
"""Agent must have at least one required skill."""
|
||||
def check(assignment: Dict[str, str], task_name: str, agent_name: str) -> bool:
|
||||
task = tasks[task_name]
|
||||
agent = agents[agent_name]
|
||||
if not task.required_skills:
|
||||
return True
|
||||
return bool(task.required_skills & agent.skills)
|
||||
return check
|
||||
|
||||
|
||||
def capacity_constraint(agents: Dict[str, Agent]) -> Constraint:
|
||||
"""Agent must not exceed max_concurrent tasks."""
|
||||
def check(assignment: Dict[str, str], task_name: str, agent_name: str) -> bool:
|
||||
agent = agents[agent_name]
|
||||
current = sum(1 for v in assignment.values() if v == agent_name)
|
||||
return current < agent.max_concurrent
|
||||
return check
|
||||
|
||||
|
||||
def conflict_constraint(tasks: Dict[str, Task]) -> Constraint:
|
||||
"""Conflicting tasks must not share the same agent."""
|
||||
def check(assignment: Dict[str, str], task_name: str, agent_name: str) -> bool:
|
||||
task = tasks[task_name]
|
||||
for conflict in task.conflicts_with:
|
||||
if conflict in assignment and assignment[conflict] == agent_name:
|
||||
return False
|
||||
return True
|
||||
return check
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Solver
|
||||
# ---------------------------------------------------------------------------
|
||||
class ConstraintPlanner:
|
||||
"""Backtracking CSP solver with forward-checking."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
agents: List[Agent],
|
||||
tasks: List[Task],
|
||||
extra_constraints: Optional[List[Constraint]] = None,
|
||||
):
|
||||
self._agents = {a.name: a for a in agents}
|
||||
self._tasks = {t.name: t for t in sorted(tasks, key=lambda t: -t.priority)}
|
||||
self._constraints: List[Constraint] = [
|
||||
skill_constraint(self._agents, self._tasks),
|
||||
capacity_constraint(self._agents),
|
||||
conflict_constraint(self._tasks),
|
||||
]
|
||||
if extra_constraints:
|
||||
self._constraints.extend(extra_constraints)
|
||||
self._trace: List[str] = []
|
||||
|
||||
def solve(self) -> Optional[Dict[str, str]]:
|
||||
"""Return {task_name: agent_name} or None if unsatisfiable."""
|
||||
task_names = list(self._tasks.keys())
|
||||
agent_names = list(self._agents.keys())
|
||||
assignment: Dict[str, str] = {}
|
||||
if self._backtrack(assignment, task_names, agent_names, 0):
|
||||
return assignment
|
||||
return None
|
||||
|
||||
def _backtrack(
|
||||
self,
|
||||
assignment: Dict[str, str],
|
||||
task_names: List[str],
|
||||
agent_names: List[str],
|
||||
idx: int,
|
||||
) -> bool:
|
||||
if idx == len(task_names):
|
||||
return True
|
||||
|
||||
task_name = task_names[idx]
|
||||
for agent_name in agent_names:
|
||||
if self._consistent(assignment, task_name, agent_name):
|
||||
assignment[task_name] = agent_name
|
||||
self._trace.append(f"assign {task_name} -> {agent_name}")
|
||||
if self._backtrack(assignment, task_names, agent_names, idx + 1):
|
||||
return True
|
||||
del assignment[task_name]
|
||||
self._trace.append(f"backtrack {task_name} x {agent_name}")
|
||||
return False
|
||||
|
||||
def _consistent(
|
||||
self, assignment: Dict[str, str], task_name: str, agent_name: str
|
||||
) -> bool:
|
||||
return all(
|
||||
c(assignment, task_name, agent_name) for c in self._constraints
|
||||
)
|
||||
|
||||
def explain(self, assignment: Dict[str, str]) -> List[Assignment]:
|
||||
"""Generate human-readable explanations for each assignment."""
|
||||
results = []
|
||||
for task_name, agent_name in assignment.items():
|
||||
task = self._tasks[task_name]
|
||||
agent = self._agents[agent_name]
|
||||
overlap = task.required_skills & agent.skills
|
||||
reason = f"skills={','.join(sorted(overlap))}" if overlap else "no-skill-required"
|
||||
results.append(Assignment(task=task_name, agent=agent_name, reason=reason))
|
||||
return results
|
||||
|
||||
@property
|
||||
def trace(self) -> List[str]:
|
||||
return list(self._trace)
|
||||
|
||||
# -- serialisation helpers -----------------------------------------------
|
||||
|
||||
@classmethod
|
||||
def from_dicts(cls, data: Dict) -> "ConstraintPlanner":
|
||||
agents = [
|
||||
Agent(
|
||||
name=a["name"],
|
||||
skills=set(a.get("skills", [])),
|
||||
max_concurrent=a.get("max_concurrent", 3),
|
||||
)
|
||||
for a in data.get("agents", [])
|
||||
]
|
||||
tasks = [
|
||||
Task(
|
||||
name=t["name"],
|
||||
required_skills=set(t.get("required_skills", [])),
|
||||
priority=t.get("priority", 0),
|
||||
repo=t.get("repo", ""),
|
||||
conflicts_with=set(t.get("conflicts_with", [])),
|
||||
)
|
||||
for t in data.get("tasks", [])
|
||||
]
|
||||
return cls(agents, tasks)
|
||||
|
||||
@classmethod
|
||||
def from_file(cls, path: Path) -> "ConstraintPlanner":
|
||||
text = path.read_text()
|
||||
if path.suffix in (".yaml", ".yml"):
|
||||
if yaml is None:
|
||||
raise RuntimeError("PyYAML required for .yaml")
|
||||
data = yaml.safe_load(text)
|
||||
else:
|
||||
data = json.loads(text)
|
||||
return cls.from_dicts(data)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Built-in fleet configuration
|
||||
# ---------------------------------------------------------------------------
|
||||
DEFAULT_FLEET = {
|
||||
"agents": [
|
||||
{"name": "timmy", "skills": ["ci", "security", "infra", "hotfix"], "max_concurrent": 4},
|
||||
{"name": "gemini", "skills": ["architecture", "sovereignty", "review"], "max_concurrent": 3},
|
||||
{"name": "allegro", "skills": ["review", "docs", "quality"], "max_concurrent": 2},
|
||||
{"name": "claude", "skills": ["frontend", "3d", "volume"], "max_concurrent": 5},
|
||||
{"name": "rockachopa", "skills": ["sovereignty", "design"], "max_concurrent": 2},
|
||||
],
|
||||
"tasks": [
|
||||
{"name": "fix-ci-pipeline", "required_skills": ["ci"], "priority": 10},
|
||||
{"name": "security-audit", "required_skills": ["security"], "priority": 9},
|
||||
{"name": "nexus-ui-overhaul", "required_skills": ["frontend", "3d"], "priority": 5},
|
||||
{"name": "sovereign-orchestrator", "required_skills": ["sovereignty", "architecture"], "priority": 8},
|
||||
{"name": "review-pr-backlog", "required_skills": ["review"], "priority": 7},
|
||||
{"name": "fleet-ops-hardening", "required_skills": ["infra"], "priority": 6},
|
||||
{"name": "mnemosyne-persistence", "required_skills": ["frontend"], "priority": 4,
|
||||
"conflicts_with": ["nexus-ui-overhaul"]},
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Self-test
|
||||
# ---------------------------------------------------------------------------
|
||||
def _self_test() -> bool:
|
||||
planner = ConstraintPlanner.from_dicts(DEFAULT_FLEET)
|
||||
result = planner.solve()
|
||||
assert result is not None, "expected a solution"
|
||||
|
||||
# Verify skill constraints
|
||||
for task_name, agent_name in result.items():
|
||||
task = planner._tasks[task_name]
|
||||
agent = planner._agents[agent_name]
|
||||
if task.required_skills:
|
||||
assert task.required_skills & agent.skills, (
|
||||
f"{agent_name} lacks skills for {task_name}"
|
||||
)
|
||||
|
||||
# Verify capacity constraints
|
||||
from collections import Counter
|
||||
counts = Counter(result.values())
|
||||
for agent_name, count in counts.items():
|
||||
agent = planner._agents[agent_name]
|
||||
assert count <= agent.max_concurrent, (
|
||||
f"{agent_name} overloaded: {count} > {agent.max_concurrent}"
|
||||
)
|
||||
|
||||
# Verify conflict constraints
|
||||
for task_name, agent_name in result.items():
|
||||
task = planner._tasks[task_name]
|
||||
for conflict in task.conflicts_with:
|
||||
if conflict in result:
|
||||
assert result[conflict] != agent_name, (
|
||||
f"{task_name} and {conflict} both assigned to {agent_name}"
|
||||
)
|
||||
|
||||
# Verify unsatisfiable detection
|
||||
impossible = {
|
||||
"agents": [{"name": "solo", "skills": ["a"], "max_concurrent": 1}],
|
||||
"tasks": [
|
||||
{"name": "t1", "required_skills": ["a"], "priority": 1},
|
||||
{"name": "t2", "required_skills": ["a"], "priority": 1},
|
||||
],
|
||||
}
|
||||
p2 = ConstraintPlanner.from_dicts(impossible)
|
||||
assert p2.solve() is None, "expected None for unsatisfiable problem"
|
||||
|
||||
print("All self-test scenarios passed.")
|
||||
explanations = planner.explain(result)
|
||||
for e in explanations:
|
||||
print(f" {e.task} -> {e.agent} ({e.reason})")
|
||||
return True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
def main():
|
||||
ap = argparse.ArgumentParser(description=__doc__)
|
||||
ap.add_argument("--tasks", type=Path, help="YAML/JSON task+agent file")
|
||||
ap.add_argument("--self-test", action="store_true")
|
||||
ap.add_argument("--json", action="store_true")
|
||||
args = ap.parse_args()
|
||||
|
||||
if args.self_test:
|
||||
sys.exit(0 if _self_test() else 1)
|
||||
|
||||
if args.tasks:
|
||||
planner = ConstraintPlanner.from_file(args.tasks)
|
||||
else:
|
||||
planner = ConstraintPlanner.from_dicts(DEFAULT_FLEET)
|
||||
|
||||
result = planner.solve()
|
||||
if result is None:
|
||||
print("No valid assignment found. Constraints are unsatisfiable.")
|
||||
sys.exit(1)
|
||||
|
||||
explanations = planner.explain(result)
|
||||
if args.json:
|
||||
print(json.dumps(
|
||||
{"assignments": [{"task": e.task, "agent": e.agent, "reason": e.reason} for e in explanations],
|
||||
"trace": planner.trace},
|
||||
indent=2,
|
||||
))
|
||||
else:
|
||||
print(f"Plan ({len(result)} assignments):")
|
||||
for e in explanations:
|
||||
print(f" {e.task} -> {e.agent} ({e.reason})")
|
||||
if planner.trace:
|
||||
print(f"\nSolver trace ({len(planner.trace)} steps):")
|
||||
for line in planner.trace:
|
||||
print(f" {line}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user