322 lines
11 KiB
Python
322 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""constraint_planner.py — Constraint-satisfaction planner for the Timmy Foundation fleet.
|
|
|
|
A GOFAI approach to agent scheduling: model tasks and agents as variables
|
|
with domains, apply constraints (capacity, skill-match, conflict-avoidance),
|
|
and solve via chronological backtracking. No gradient descent, no
|
|
embeddings — just logic.
|
|
|
|
Usage:
|
|
python constraint_planner.py --tasks tasks.yaml
|
|
python constraint_planner.py --self-test
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
|
|
|
|
try:
|
|
import yaml
|
|
except ImportError:
|
|
yaml = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Domain types
|
|
# ---------------------------------------------------------------------------
|
|
@dataclass
class Agent:
    """A fleet member that tasks can be assigned to."""

    # Identifier; the planner indexes agents by this name.
    name: str
    # Skill tags the agent offers, intersected with Task.required_skills.
    skills: Set[str] = field(default_factory=set)
    # Maximum number of tasks the agent may hold at once.
    max_concurrent: int = 3
    # NOTE(review): presumably the number of tasks already in flight;
    # semantics inferred from the name only - confirm with the owner.
    current_load: int = 0
|
|
|
|
|
|
@dataclass
class Task:
    """A unit of work that the planner must place on exactly one agent."""

    # Identifier; the planner indexes tasks by this name.
    name: str
    # Skill tags the task asks for; an empty set means any agent qualifies.
    required_skills: Set[str] = field(default_factory=set)
    # Larger values are placed earlier by the planner.
    priority: int = 0
    # NOTE(review): presumably the repository the task belongs to; it is
    # carried as data only and not read by the solver in this file.
    repo: str = ""
    conflicts_with: Set[str] = field(default_factory=set)  # task names
|
|
|
|
|
|
@dataclass
class Assignment:
    """One explained line of a finished plan: which agent got which task."""

    # Name of the assigned task.
    task: str
    # Name of the agent the task was given to.
    agent: str
    # Short human-readable justification for the pairing.
    reason: str
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Constraint definitions
|
|
# ---------------------------------------------------------------------------
|
|
# Signature shared by every constraint predicate:
#   (partial assignment {task_name: agent_name}, candidate task name,
#    candidate agent name) -> True when the pairing is acceptable.
Constraint = Callable[[Dict[str, str], str, str], bool]
|
|
|
|
|
|
def skill_constraint(agents: Dict[str, Agent], tasks: Dict[str, Task]) -> Constraint:
    """Build a predicate requiring the agent to cover a needed skill.

    The returned check accepts a pairing when the task lists no required
    skills, or when the agent offers at least one of them. The partial
    assignment is ignored.
    """
    def check(assignment: Dict[str, str], task_name: str, agent_name: str) -> bool:
        needed = tasks[task_name].required_skills
        offered = agents[agent_name].skills
        if needed:
            # One shared skill is enough; full coverage is not demanded.
            return len(needed & offered) > 0
        return True

    return check
|
|
|
|
|
|
def capacity_constraint(agents: Dict[str, Agent]) -> Constraint:
    """Build a predicate that keeps an agent within ``max_concurrent``.

    The returned check counts the tasks the partial assignment already gives
    to the agent, adds the agent's ``current_load``, and accepts the candidate
    only if that total is still strictly below ``max_concurrent`` (so there is
    room for one more task).

    Args:
        agents: Mapping of agent name to agent.

    Returns:
        A constraint ``(assignment, task_name, agent_name) -> bool``.
    """
    def check(assignment: Dict[str, str], task_name: str, agent_name: str) -> bool:
        agent = agents[agent_name]
        planned = sum(1 for holder in assignment.values() if holder == agent_name)
        # Work the agent already carries uses up capacity too; with the
        # default current_load of 0 this reduces to counting planned tasks.
        return agent.current_load + planned < agent.max_concurrent

    return check
|
|
|
|
|
|
def conflict_constraint(tasks: Dict[str, Task]) -> Constraint:
    """Build a predicate that keeps conflicting tasks on different agents.

    A conflict is honoured no matter which of the two tasks declares it in
    ``conflicts_with``. Checking only the candidate's own list would miss a
    conflict whenever the declaring task had already been assigned.

    Args:
        tasks: Mapping of task name to task.

    Returns:
        A constraint ``(assignment, task_name, agent_name) -> bool``.
    """
    def check(assignment: Dict[str, str], task_name: str, agent_name: str) -> bool:
        declared = tasks[task_name].conflicts_with
        for other_name, holder in assignment.items():
            if holder != agent_name:
                continue
            # Conflict declared by the candidate task itself.
            if other_name in declared:
                return False
            # Conflict declared by the task already sitting on this agent.
            other = tasks.get(other_name)
            if other is not None and task_name in other.conflicts_with:
                return False
        return True

    return check
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Solver
|
|
# ---------------------------------------------------------------------------
|
|
class ConstraintPlanner:
    """Backtracking solver that places every task on exactly one agent.

    Tasks are tried in descending priority order (ties keep input order) and
    agents in the order they were supplied. A candidate pairing is accepted
    only if every constraint accepts it against the partial assignment built
    so far. No look-ahead pruning of later tasks is performed.
    """

    def __init__(
        self,
        agents: List[Agent],
        tasks: List[Task],
        extra_constraints: Optional[List[Constraint]] = None,
    ):
        """Index agents and tasks by name and install the constraints.

        Args:
            agents: Candidate agents; their order is the order they are tried.
            tasks: Tasks to place; reordered by descending priority.
            extra_constraints: Optional additional predicates, evaluated
                after the built-in skill, capacity and conflict checks.
        """
        self._agents = {a.name: a for a in agents}
        # sorted() is stable, so equal-priority tasks keep their input order.
        self._tasks = {t.name: t for t in sorted(tasks, key=lambda t: -t.priority)}
        self._constraints: List[Constraint] = [
            skill_constraint(self._agents, self._tasks),
            capacity_constraint(self._agents),
            conflict_constraint(self._tasks),
        ]
        if extra_constraints:
            self._constraints.extend(extra_constraints)
        self._trace: List[str] = []

    def solve(self) -> Optional[Dict[str, str]]:
        """Return {task_name: agent_name} or None if unsatisfiable.

        The solver trace is reset at the start of each call, so ``trace``
        always describes the most recent run only.
        """
        self._trace = []
        task_names = list(self._tasks)
        agent_names = list(self._agents)
        assignment: Dict[str, str] = {}
        if self._backtrack(assignment, task_names, agent_names, 0):
            return assignment
        return None

    def _backtrack(
        self,
        assignment: Dict[str, str],
        task_names: List[str],
        agent_names: List[str],
        idx: int,
    ) -> bool:
        """Try to place ``task_names[idx:]``; mutate ``assignment`` in place.

        Returns True once every remaining task has been placed. On failure
        the assignment is restored to the state it had on entry.
        """
        if idx == len(task_names):
            return True

        task_name = task_names[idx]
        for agent_name in agent_names:
            if not self._consistent(assignment, task_name, agent_name):
                continue
            assignment[task_name] = agent_name
            self._trace.append(f"assign {task_name} -> {agent_name}")
            if self._backtrack(assignment, task_names, agent_names, idx + 1):
                return True
            # Dead end further down: undo this choice and try the next agent.
            del assignment[task_name]
            self._trace.append(f"backtrack {task_name} x {agent_name}")
        return False

    def _consistent(
        self, assignment: Dict[str, str], task_name: str, agent_name: str
    ) -> bool:
        """Return True when every constraint accepts the candidate pairing."""
        return all(
            c(assignment, task_name, agent_name) for c in self._constraints
        )

    def explain(self, assignment: Dict[str, str]) -> List[Assignment]:
        """Generate human-readable explanations for each assignment.

        The reason lists the skills shared by task and agent, or
        ``no-skill-required`` when they share none.
        """
        results = []
        for task_name, agent_name in assignment.items():
            task = self._tasks[task_name]
            agent = self._agents[agent_name]
            overlap = task.required_skills & agent.skills
            reason = f"skills={','.join(sorted(overlap))}" if overlap else "no-skill-required"
            results.append(Assignment(task=task_name, agent=agent_name, reason=reason))
        return results

    @property
    def trace(self) -> List[str]:
        """A copy of the assign/backtrack steps recorded by the last solve."""
        return list(self._trace)

    # -- serialisation helpers -----------------------------------------------

    @classmethod
    def from_dicts(cls, data: Dict) -> "ConstraintPlanner":
        """Build a planner from a ``{"agents": [...], "tasks": [...]}`` mapping.

        Missing or null sections and list fields are treated as empty, so an
        empty document (``None``) yields a planner with nothing to schedule
        instead of failing on attribute access.
        """
        data = data or {}
        agents = [
            Agent(
                name=a["name"],
                skills=set(a.get("skills") or []),
                max_concurrent=a.get("max_concurrent", 3),
                current_load=a.get("current_load", 0),
            )
            for a in (data.get("agents") or [])
        ]
        tasks = [
            Task(
                name=t["name"],
                required_skills=set(t.get("required_skills") or []),
                priority=t.get("priority", 0),
                repo=t.get("repo", ""),
                conflicts_with=set(t.get("conflicts_with") or []),
            )
            for t in (data.get("tasks") or [])
        ]
        return cls(agents, tasks)

    @classmethod
    def from_file(cls, path: Path) -> "ConstraintPlanner":
        """Load a planner from a YAML (``.yaml``/``.yml``) or JSON file.

        Any suffix other than the YAML ones is parsed as JSON.

        Raises:
            RuntimeError: if a YAML file is given but PyYAML is not installed.
        """
        path = Path(path)
        # Decode explicitly so the result does not depend on the locale.
        text = path.read_text(encoding="utf-8")
        if path.suffix.lower() in (".yaml", ".yml"):
            if yaml is None:
                raise RuntimeError("PyYAML required for .yaml")
            data = yaml.safe_load(text)
        else:
            data = json.loads(text)
        return cls.from_dicts(data)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Built-in fleet configuration
|
|
# ---------------------------------------------------------------------------
|
|
# Built-in problem used when no --tasks file is given and by the self-test.
# Same schema as the YAML/JSON input accepted by ConstraintPlanner.from_dicts.
DEFAULT_FLEET = {
    "agents": [
        {
            "name": "timmy",
            "skills": ["ci", "security", "infra", "hotfix"],
            "max_concurrent": 4,
        },
        {
            "name": "gemini",
            "skills": ["architecture", "sovereignty", "review"],
            "max_concurrent": 3,
        },
        {
            "name": "allegro",
            "skills": ["review", "docs", "quality"],
            "max_concurrent": 2,
        },
        {
            "name": "claude",
            "skills": ["frontend", "3d", "volume"],
            "max_concurrent": 5,
        },
        {
            "name": "rockachopa",
            "skills": ["sovereignty", "design"],
            "max_concurrent": 2,
        },
    ],
    "tasks": [
        {
            "name": "fix-ci-pipeline",
            "required_skills": ["ci"],
            "priority": 10,
        },
        {
            "name": "security-audit",
            "required_skills": ["security"],
            "priority": 9,
        },
        {
            "name": "nexus-ui-overhaul",
            "required_skills": ["frontend", "3d"],
            "priority": 5,
        },
        {
            "name": "sovereign-orchestrator",
            "required_skills": ["sovereignty", "architecture"],
            "priority": 8,
        },
        {
            "name": "review-pr-backlog",
            "required_skills": ["review"],
            "priority": 7,
        },
        {
            "name": "fleet-ops-hardening",
            "required_skills": ["infra"],
            "priority": 6,
        },
        {
            "name": "mnemosyne-persistence",
            "required_skills": ["frontend"],
            "priority": 4,
            "conflicts_with": ["nexus-ui-overhaul"],
        },
    ],
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Self-test
|
|
# ---------------------------------------------------------------------------
|
|
def _self_test() -> bool:
    """Solve the built-in fleet and verify the plan against every constraint.

    Also confirms that an over-subscribed single-agent problem is reported
    as unsatisfiable. Failures are raised explicitly instead of using
    ``assert`` statements, so the checks still run under ``python -O``
    (which strips asserts and would make the self-test pass vacuously).

    Returns:
        True when every check passes.

    Raises:
        AssertionError: if any check fails.
    """
    from collections import Counter

    def require(condition: bool, message: str) -> None:
        """Raise AssertionError(message) unless condition holds."""
        if not condition:
            raise AssertionError(message)

    planner = ConstraintPlanner.from_dicts(DEFAULT_FLEET)
    result = planner.solve()
    if result is None:
        raise AssertionError("expected a solution")

    # Verify skill constraints
    for task_name, agent_name in result.items():
        task = planner._tasks[task_name]
        agent = planner._agents[agent_name]
        if task.required_skills:
            require(
                bool(task.required_skills & agent.skills),
                f"{agent_name} lacks skills for {task_name}",
            )

    # Verify capacity constraints
    counts = Counter(result.values())
    for agent_name, count in counts.items():
        agent = planner._agents[agent_name]
        require(
            count <= agent.max_concurrent,
            f"{agent_name} overloaded: {count} > {agent.max_concurrent}",
        )

    # Verify conflict constraints
    for task_name, agent_name in result.items():
        task = planner._tasks[task_name]
        for conflict in task.conflicts_with:
            if conflict in result:
                require(
                    result[conflict] != agent_name,
                    f"{task_name} and {conflict} both assigned to {agent_name}",
                )

    # Verify unsatisfiable detection: two tasks, one agent with one slot.
    impossible = {
        "agents": [{"name": "solo", "skills": ["a"], "max_concurrent": 1}],
        "tasks": [
            {"name": "t1", "required_skills": ["a"], "priority": 1},
            {"name": "t2", "required_skills": ["a"], "priority": 1},
        ],
    }
    p2 = ConstraintPlanner.from_dicts(impossible)
    require(p2.solve() is None, "expected None for unsatisfiable problem")

    print("All self-test scenarios passed.")
    explanations = planner.explain(result)
    for e in explanations:
        print(f" {e.task} -> {e.agent} ({e.reason})")
    return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
def main():
    """Command-line entry point: parse flags, solve, and print the plan.

    With ``--self-test`` the built-in checks run and the process exits with
    status 0 if they return true, 1 otherwise. Without it the problem is
    loaded from ``--tasks`` when given, else from ``DEFAULT_FLEET``. An
    unsatisfiable problem prints a message and exits with status 1.
    ``--json`` emits the assignments and solver trace as JSON instead of the
    plain-text report.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--tasks", type=Path, help="YAML/JSON task+agent file")
    parser.add_argument("--self-test", action="store_true")
    parser.add_argument("--json", action="store_true")
    opts = parser.parse_args()

    if opts.self_test:
        sys.exit(0 if _self_test() else 1)

    planner = (
        ConstraintPlanner.from_file(opts.tasks)
        if opts.tasks
        else ConstraintPlanner.from_dicts(DEFAULT_FLEET)
    )

    plan = planner.solve()
    if plan is None:
        print("No valid assignment found. Constraints are unsatisfiable.")
        sys.exit(1)

    rows = planner.explain(plan)

    if opts.json:
        payload = {
            "assignments": [
                {"task": row.task, "agent": row.agent, "reason": row.reason}
                for row in rows
            ],
            "trace": planner.trace,
        }
        print(json.dumps(payload, indent=2))
        return

    print(f"Plan ({len(plan)} assignments):")
    for row in rows:
        print(f" {row.task} -> {row.agent} ({row.reason})")

    steps = planner.trace
    if steps:
        print(f"\nSolver trace ({len(steps)} steps):")
        for step in steps:
            print(f" {step}")
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|