Closes #86. Adds:
- bin/crucible_mcp_server.py (schedule, dependency, capacity proofs)
- docs/crucible-first-cut.md
- playbooks/verified-logic.yaml
- config.yaml crucible MCP server entry

460 lines · 14 KiB · Python
#!/usr/bin/env python3
|
|
"""Z3-backed Crucible MCP server for Timmy.
|
|
|
|
Sidecar-only. Lives in timmy-config, deploys into ~/.hermes/bin/, and is loaded
|
|
by Hermes through native MCP tool discovery. No hermes-agent fork required.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from mcp.server import FastMCP
|
|
from z3 import And, Bool, Distinct, If, Implies, Int, Optimize, Or, Sum, sat, unsat
|
|
|
|
# Tool guidance shown to the model; hoisted so the FastMCP construction
# below stays on one line.
_SERVER_INSTRUCTIONS = (
    "Formal verification sidecar for Timmy. Use these tools for scheduling, "
    "dependency ordering, and resource/capacity feasibility. Return SAT/UNSAT "
    "with witness models instead of fuzzy prose."
)

# MCP server handle; Hermes discovers the @mcp.tool functions below through
# native MCP tool discovery.
mcp = FastMCP(name="crucible", instructions=_SERVER_INSTRUCTIONS, dependencies=["z3-solver"])
|
|
|
|
|
|
def _hermes_home() -> Path:
|
|
return Path(os.path.expanduser(os.getenv("HERMES_HOME", "~/.hermes")))
|
|
|
|
|
|
def _proof_dir() -> Path:
    """Return the proof-log directory under Hermes home, creating it if needed."""
    proof_path = _hermes_home() / "logs" / "crucible"
    proof_path.mkdir(parents=True, exist_ok=True)
    return proof_path
|
|
|
|
|
|
def _ts() -> str:
|
|
return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S_%fZ")
|
|
|
|
|
|
def _json_default(value: Any) -> Any:
|
|
if isinstance(value, Path):
|
|
return str(value)
|
|
raise TypeError(f"Unsupported type for JSON serialization: {type(value)!r}")
|
|
|
|
|
|
def _log_proof(tool_name: str, request: dict[str, Any], result: dict[str, Any]) -> str:
    """Persist one proof artifact (request + result) as JSON; return its path."""
    record = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "tool": tool_name,
        "request": request,
        "result": result,
    }
    out_path = _proof_dir() / f"{_ts()}_{tool_name}.json"
    out_path.write_text(json.dumps(record, indent=2, default=_json_default))
    return str(out_path)
|
|
|
|
|
|
def _ensure_unique(names: list[str], label: str) -> None:
|
|
if len(set(names)) != len(names):
|
|
raise ValueError(f"Duplicate {label} names are not allowed: {names}")
|
|
|
|
|
|
def _normalize_dependency(dep: Any) -> tuple[str, str, int]:
|
|
if isinstance(dep, dict):
|
|
before = dep.get("before")
|
|
after = dep.get("after")
|
|
lag = int(dep.get("lag", 0))
|
|
if not before or not after:
|
|
raise ValueError(f"Dependency dict must include before/after: {dep!r}")
|
|
return str(before), str(after), lag
|
|
if isinstance(dep, (list, tuple)) and len(dep) in (2, 3):
|
|
before = str(dep[0])
|
|
after = str(dep[1])
|
|
lag = int(dep[2]) if len(dep) == 3 else 0
|
|
return before, after, lag
|
|
raise ValueError(f"Unsupported dependency shape: {dep!r}")
|
|
|
|
|
|
def _normalize_task(task: dict[str, Any]) -> dict[str, Any]:
|
|
name = str(task["name"])
|
|
duration = int(task["duration"])
|
|
if duration <= 0:
|
|
raise ValueError(f"Task duration must be positive: {task!r}")
|
|
return {"name": name, "duration": duration}
|
|
|
|
|
|
def _normalize_item(item: dict[str, Any]) -> dict[str, Any]:
|
|
name = str(item["name"])
|
|
amount = int(item["amount"])
|
|
value = int(item.get("value", amount))
|
|
required = bool(item.get("required", False))
|
|
if amount < 0:
|
|
raise ValueError(f"Item amount must be non-negative: {item!r}")
|
|
return {
|
|
"name": name,
|
|
"amount": amount,
|
|
"value": value,
|
|
"required": required,
|
|
}
|
|
|
|
|
|
def solve_schedule_tasks(
    tasks: list[dict[str, Any]],
    horizon: int,
    dependencies: list[Any] | None = None,
    fixed_starts: dict[str, int] | None = None,
    max_parallel_tasks: int = 1,
    minimize_makespan: bool = True,
) -> dict[str, Any]:
    """Prove whether integer-duration tasks fit inside a discrete horizon.

    Args:
        tasks: Dicts with "name" and positive integer "duration".
        horizon: Upper bound on every task's end time (positive).
        dependencies: Before/after constraints (dicts or 2/3-sequences with
            optional lag) enforcing start[after] >= end[before] + lag.
        fixed_starts: Optional map of task name -> pinned start slot.
        max_parallel_tasks: Capacity per integer time slot (>= 1).
        minimize_makespan: When True, Z3 minimizes the overall completion time.

    Returns:
        Proof dict with "status" of "sat"/"unsat"/"unknown"; on sat it carries
        a witness schedule and makespan. "proof_log" holds the JSON log path.

    Raises:
        ValueError: On non-positive horizon/parallelism, duplicate task names,
            malformed specs, or dependencies naming unknown tasks.
    """
    tasks = [_normalize_task(task) for task in tasks]
    dependencies = dependencies or []
    fixed_starts = fixed_starts or {}
    horizon = int(horizon)
    max_parallel_tasks = int(max_parallel_tasks)

    if horizon <= 0:
        raise ValueError("horizon must be positive")
    if max_parallel_tasks <= 0:
        raise ValueError("max_parallel_tasks must be positive")

    names = [task["name"] for task in tasks]
    _ensure_unique(names, "task")
    durations = {task["name"]: task["duration"] for task in tasks}

    # Normalize each dependency exactly once; previously the list was parsed
    # for constraint generation and then parsed again per proof payload.
    norm_deps = [_normalize_dependency(dep) for dep in dependencies]
    dep_dicts = [{"before": b, "after": a, "lag": lag} for b, a, lag in norm_deps]

    opt = Optimize()
    start = {name: Int(f"start_{name}") for name in names}
    end = {name: Int(f"end_{name}") for name in names}
    makespan = Int("makespan")

    for name in names:
        opt.add(start[name] >= 0)
        opt.add(end[name] == start[name] + durations[name])
        opt.add(end[name] <= horizon)
        if name in fixed_starts:
            opt.add(start[name] == int(fixed_starts[name]))

    for dep, (before, after, lag) in zip(dependencies, norm_deps):
        if before not in start or after not in start:
            raise ValueError(f"Unknown task in dependency {dep!r}")
        opt.add(start[after] >= end[before] + lag)

    # Discrete resource capacity over integer time slots.
    for t in range(horizon):
        active = [If(And(start[name] <= t, t < end[name]), 1, 0) for name in names]
        opt.add(Sum(active) <= max_parallel_tasks)

    # makespan is only lower-bounded; it is tight only under minimization.
    for name in names:
        opt.add(makespan >= end[name])
    if minimize_makespan:
        opt.minimize(makespan)

    result = opt.check()
    proof: dict[str, Any]
    if result == sat:
        model = opt.model()
        starts = {name: model.eval(start[name]).as_long() for name in names}
        ends = {name: model.eval(end[name]).as_long() for name in names}
        schedule = [
            {
                "name": name,
                "start": starts[name],
                "end": ends[name],
                "duration": durations[name],
            }
            for name in sorted(names, key=starts.__getitem__)
        ]
        proof = {
            "status": "sat",
            "summary": "Schedule proven feasible.",
            "horizon": horizon,
            "max_parallel_tasks": max_parallel_tasks,
            # Report the true completion time from the witness. Reading the
            # raw makespan variable was wrong when minimize_makespan=False:
            # Z3 may then assign it any value >= the real maximum end time.
            "makespan": max(ends.values(), default=0),
            "schedule": schedule,
            "dependencies": dep_dicts,
        }
    elif result == unsat:
        proof = {
            "status": "unsat",
            "summary": "Schedule is impossible under the given horizon/dependency/capacity constraints.",
            "horizon": horizon,
            "max_parallel_tasks": max_parallel_tasks,
            "dependencies": dep_dicts,
        }
    else:
        proof = {
            "status": "unknown",
            "summary": "Solver could not prove SAT or UNSAT for this schedule.",
            "horizon": horizon,
            "max_parallel_tasks": max_parallel_tasks,
        }

    proof["proof_log"] = _log_proof(
        "schedule_tasks",
        {
            "tasks": tasks,
            "horizon": horizon,
            "dependencies": dependencies,
            "fixed_starts": fixed_starts,
            "max_parallel_tasks": max_parallel_tasks,
            "minimize_makespan": minimize_makespan,
        },
        proof,
    )
    return proof
|
|
|
|
|
|
def solve_dependency_order(
    entities: list[str],
    before: list[Any],
    fixed_positions: dict[str, int] | None = None,
) -> dict[str, Any]:
    """Prove whether the before/after constraints admit a total order.

    Each entity is assigned a distinct position in [0, len(entities));
    every constraint forces pos[before] < pos[after]. Returns a proof dict
    (sat/unsat/unknown) and writes it to the proof log.
    """
    entities = [str(entity) for entity in entities]
    fixed_positions = fixed_positions or {}
    _ensure_unique(entities, "entity")

    opt = Optimize()
    pos = {entity: Int(f"pos_{entity}") for entity in entities}
    opt.add(Distinct(*pos.values()))
    slot_count = len(entities)
    for entity in entities:
        opt.add(pos[entity] >= 0)
        opt.add(pos[entity] < slot_count)
        if entity in fixed_positions:
            opt.add(pos[entity] == int(fixed_positions[entity]))

    normalized = []
    for dep in before:
        left, right, _lag = _normalize_dependency(dep)
        if left not in pos or right not in pos:
            raise ValueError(f"Unknown entity in ordering constraint: {dep!r}")
        opt.add(pos[left] < pos[right])
        normalized.append({"before": left, "after": right})

    result = opt.check()
    if result == sat:
        model = opt.model()
        resolved = {entity: model.eval(pos[entity]).as_long() for entity in entities}
        proof = {
            "status": "sat",
            "summary": "Dependency ordering is consistent.",
            "ordering": sorted(entities, key=resolved.__getitem__),
            "positions": resolved,
            "constraints": normalized,
        }
    elif result == unsat:
        proof = {
            "status": "unsat",
            "summary": "Dependency ordering contains a contradiction/cycle.",
            "constraints": normalized,
        }
    else:
        proof = {
            "status": "unknown",
            "summary": "Solver could not prove SAT or UNSAT for this dependency graph.",
            "constraints": normalized,
        }

    proof["proof_log"] = _log_proof(
        "order_dependencies",
        {
            "entities": entities,
            "before": before,
            "fixed_positions": fixed_positions,
        },
        proof,
    )
    return proof
|
|
|
|
|
|
def solve_capacity_fit(
    items: list[dict[str, Any]],
    capacity: int,
    maximize_value: bool = True,
) -> dict[str, Any]:
    """Prove whether required items fit a capacity budget.

    Optional items may be included while the summed amount stays within
    capacity; when maximize_value is set, Z3 maximizes the total value of
    the chosen subset. Returns a proof dict and writes it to the proof log.
    """
    items = [_normalize_item(item) for item in items]
    capacity = int(capacity)
    if capacity < 0:
        raise ValueError("capacity must be non-negative")

    names = [item["name"] for item in items]
    _ensure_unique(names, "item")
    choose = {item["name"]: Bool(f"choose_{item['name']}") for item in items}

    opt = Optimize()
    # Required items are forced into the selection unconditionally.
    for item in items:
        if item["required"]:
            opt.add(choose[item["name"]])

    flagged = [(choose[item["name"]], item) for item in items]
    total_amount = Sum([If(flag, item["amount"], 0) for flag, item in flagged])
    total_value = Sum([If(flag, item["value"], 0) for flag, item in flagged])
    opt.add(total_amount <= capacity)
    if maximize_value:
        opt.maximize(total_value)

    result = opt.check()
    if result == sat:
        model = opt.model()
        chosen: list[dict[str, Any]] = []
        skipped: list[dict[str, Any]] = []
        for item in items:
            picked = model.eval(choose[item["name"]], model_completion=True)
            (chosen if bool(picked) else skipped).append(item)
        used = sum(item["amount"] for item in chosen)
        proof = {
            "status": "sat",
            "summary": "Capacity constraints are feasible.",
            "capacity": capacity,
            "used": used,
            "remaining": capacity - used,
            "chosen": chosen,
            "skipped": skipped,
            "total_value": sum(item["value"] for item in chosen),
        }
    elif result == unsat:
        proof = {
            "status": "unsat",
            "summary": "Required items exceed available capacity.",
            "capacity": capacity,
            "required_items": [item for item in items if item["required"]],
        }
    else:
        proof = {
            "status": "unknown",
            "summary": "Solver could not prove SAT or UNSAT for this capacity check.",
            "capacity": capacity,
        }

    proof["proof_log"] = _log_proof(
        "capacity_fit",
        {
            "items": items,
            "capacity": capacity,
            "maximize_value": maximize_value,
        },
        proof,
    )
    return proof
|
|
|
|
|
|
_SCHEDULE_TASKS_DESC = (
    "Crucible template for discrete scheduling. Proves whether integer-duration "
    "tasks fit within a time horizon under dependency and parallelism constraints."
)


@mcp.tool(name="schedule_tasks", description=_SCHEDULE_TASKS_DESC, structured_output=True)
def schedule_tasks(
    tasks: list[dict[str, Any]],
    horizon: int,
    dependencies: list[Any] | None = None,
    fixed_starts: dict[str, int] | None = None,
    max_parallel_tasks: int = 1,
    minimize_makespan: bool = True,
) -> dict[str, Any]:
    """MCP entry point; delegates straight to solve_schedule_tasks."""
    return solve_schedule_tasks(
        tasks,
        horizon,
        dependencies=dependencies,
        fixed_starts=fixed_starts,
        max_parallel_tasks=max_parallel_tasks,
        minimize_makespan=minimize_makespan,
    )
|
|
|
|
|
|
_ORDER_DEPENDENCIES_DESC = (
    "Crucible template for dependency ordering. Proves whether a set of before/after "
    "constraints is consistent and returns a valid topological order when SAT."
)


@mcp.tool(name="order_dependencies", description=_ORDER_DEPENDENCIES_DESC, structured_output=True)
def order_dependencies(
    entities: list[str],
    before: list[Any],
    fixed_positions: dict[str, int] | None = None,
) -> dict[str, Any]:
    """MCP entry point; delegates straight to solve_dependency_order."""
    return solve_dependency_order(entities, before, fixed_positions=fixed_positions)
|
|
|
|
|
|
_CAPACITY_FIT_DESC = (
    "Crucible template for resource capacity. Proves whether required items fit "
    "within a capacity budget and chooses an optimal feasible subset of optional items."
)


@mcp.tool(name="capacity_fit", description=_CAPACITY_FIT_DESC, structured_output=True)
def capacity_fit(
    items: list[dict[str, Any]],
    capacity: int,
    maximize_value: bool = True,
) -> dict[str, Any]:
    """MCP entry point; delegates straight to solve_capacity_fit."""
    return solve_capacity_fit(items, capacity, maximize_value=maximize_value)
|
|
|
|
|
|
def run_selftest() -> dict[str, Any]:
    """Exercise every solver template end-to-end and return the raw proofs.

    The two schedule cases share one task list and dependency chain: with a
    single worker the chain cannot fit the horizon, with two workers it can.
    The solvers never mutate their inputs, so the fixtures are safely reused.
    """
    three_tasks = [
        {"name": "A", "duration": 2},
        {"name": "B", "duration": 3},
        {"name": "C", "duration": 4},
    ]
    a_before_b = [{"before": "A", "after": "B"}]
    return {
        "schedule_unsat_single_worker": solve_schedule_tasks(
            tasks=three_tasks,
            horizon=8,
            dependencies=a_before_b,
            max_parallel_tasks=1,
        ),
        "schedule_sat_two_workers": solve_schedule_tasks(
            tasks=three_tasks,
            horizon=8,
            dependencies=a_before_b,
            max_parallel_tasks=2,
        ),
        "ordering_sat": solve_dependency_order(
            entities=["fetch", "train", "eval"],
            before=[
                {"before": "fetch", "after": "train"},
                {"before": "train", "after": "eval"},
            ],
        ),
        "capacity_sat": solve_capacity_fit(
            items=[
                {"name": "gpu_job", "amount": 6, "value": 6, "required": True},
                {"name": "telemetry", "amount": 1, "value": 1, "required": True},
                {"name": "export", "amount": 2, "value": 4, "required": False},
                {"name": "viz", "amount": 3, "value": 5, "required": False},
            ],
            capacity=8,
        ),
    }
|
|
|
|
|
|
def main() -> int:
    """CLI entry: `selftest` prints proofs as JSON, otherwise serve MCP on stdio."""
    if sys.argv[1:2] == ["selftest"]:
        print(json.dumps(run_selftest(), indent=2))
    else:
        mcp.run(transport="stdio")
    return 0
|
|
|
|
|
|
# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
    raise SystemExit(main())
|