feat(crucible): Z3 sidecar MCP verifier -- rebased onto current main
Closes #86. Adds: - bin/crucible_mcp_server.py (schedule, dependency, capacity proofs) - docs/crucible-first-cut.md - playbooks/verified-logic.yaml - config.yaml crucible MCP server entry
This commit is contained in:
459
bin/crucible_mcp_server.py
Normal file
459
bin/crucible_mcp_server.py
Normal file
@@ -0,0 +1,459 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Z3-backed Crucible MCP server for Timmy.
|
||||
|
||||
Sidecar-only. Lives in timmy-config, deploys into ~/.hermes/bin/, and is loaded
|
||||
by Hermes through native MCP tool discovery. No hermes-agent fork required.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from mcp.server import FastMCP
|
||||
from z3 import And, Bool, Distinct, If, Implies, Int, Optimize, Or, Sum, sat, unsat
|
||||
|
||||
mcp = FastMCP(
|
||||
name="crucible",
|
||||
instructions=(
|
||||
"Formal verification sidecar for Timmy. Use these tools for scheduling, "
|
||||
"dependency ordering, and resource/capacity feasibility. Return SAT/UNSAT "
|
||||
"with witness models instead of fuzzy prose."
|
||||
),
|
||||
dependencies=["z3-solver"],
|
||||
)
|
||||
|
||||
|
||||
def _hermes_home() -> Path:
|
||||
return Path(os.path.expanduser(os.getenv("HERMES_HOME", "~/.hermes")))
|
||||
|
||||
|
||||
def _proof_dir() -> Path:
|
||||
path = _hermes_home() / "logs" / "crucible"
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
return path
|
||||
|
||||
|
||||
def _ts() -> str:
|
||||
return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S_%fZ")
|
||||
|
||||
|
||||
def _json_default(value: Any) -> Any:
|
||||
if isinstance(value, Path):
|
||||
return str(value)
|
||||
raise TypeError(f"Unsupported type for JSON serialization: {type(value)!r}")
|
||||
|
||||
|
||||
def _log_proof(tool_name: str, request: dict[str, Any], result: dict[str, Any]) -> str:
|
||||
path = _proof_dir() / f"{_ts()}_{tool_name}.json"
|
||||
payload = {
|
||||
"timestamp": datetime.now(timezone.utc).isoformat(),
|
||||
"tool": tool_name,
|
||||
"request": request,
|
||||
"result": result,
|
||||
}
|
||||
path.write_text(json.dumps(payload, indent=2, default=_json_default))
|
||||
return str(path)
|
||||
|
||||
|
||||
def _ensure_unique(names: list[str], label: str) -> None:
|
||||
if len(set(names)) != len(names):
|
||||
raise ValueError(f"Duplicate {label} names are not allowed: {names}")
|
||||
|
||||
|
||||
def _normalize_dependency(dep: Any) -> tuple[str, str, int]:
|
||||
if isinstance(dep, dict):
|
||||
before = dep.get("before")
|
||||
after = dep.get("after")
|
||||
lag = int(dep.get("lag", 0))
|
||||
if not before or not after:
|
||||
raise ValueError(f"Dependency dict must include before/after: {dep!r}")
|
||||
return str(before), str(after), lag
|
||||
if isinstance(dep, (list, tuple)) and len(dep) in (2, 3):
|
||||
before = str(dep[0])
|
||||
after = str(dep[1])
|
||||
lag = int(dep[2]) if len(dep) == 3 else 0
|
||||
return before, after, lag
|
||||
raise ValueError(f"Unsupported dependency shape: {dep!r}")
|
||||
|
||||
|
||||
def _normalize_task(task: dict[str, Any]) -> dict[str, Any]:
|
||||
name = str(task["name"])
|
||||
duration = int(task["duration"])
|
||||
if duration <= 0:
|
||||
raise ValueError(f"Task duration must be positive: {task!r}")
|
||||
return {"name": name, "duration": duration}
|
||||
|
||||
|
||||
def _normalize_item(item: dict[str, Any]) -> dict[str, Any]:
|
||||
name = str(item["name"])
|
||||
amount = int(item["amount"])
|
||||
value = int(item.get("value", amount))
|
||||
required = bool(item.get("required", False))
|
||||
if amount < 0:
|
||||
raise ValueError(f"Item amount must be non-negative: {item!r}")
|
||||
return {
|
||||
"name": name,
|
||||
"amount": amount,
|
||||
"value": value,
|
||||
"required": required,
|
||||
}
|
||||
|
||||
|
||||
def solve_schedule_tasks(
|
||||
tasks: list[dict[str, Any]],
|
||||
horizon: int,
|
||||
dependencies: list[Any] | None = None,
|
||||
fixed_starts: dict[str, int] | None = None,
|
||||
max_parallel_tasks: int = 1,
|
||||
minimize_makespan: bool = True,
|
||||
) -> dict[str, Any]:
|
||||
tasks = [_normalize_task(task) for task in tasks]
|
||||
dependencies = dependencies or []
|
||||
fixed_starts = fixed_starts or {}
|
||||
horizon = int(horizon)
|
||||
max_parallel_tasks = int(max_parallel_tasks)
|
||||
|
||||
if horizon <= 0:
|
||||
raise ValueError("horizon must be positive")
|
||||
if max_parallel_tasks <= 0:
|
||||
raise ValueError("max_parallel_tasks must be positive")
|
||||
|
||||
names = [task["name"] for task in tasks]
|
||||
_ensure_unique(names, "task")
|
||||
durations = {task["name"]: task["duration"] for task in tasks}
|
||||
|
||||
opt = Optimize()
|
||||
start = {name: Int(f"start_{name}") for name in names}
|
||||
end = {name: Int(f"end_{name}") for name in names}
|
||||
makespan = Int("makespan")
|
||||
|
||||
for name in names:
|
||||
opt.add(start[name] >= 0)
|
||||
opt.add(end[name] == start[name] + durations[name])
|
||||
opt.add(end[name] <= horizon)
|
||||
if name in fixed_starts:
|
||||
opt.add(start[name] == int(fixed_starts[name]))
|
||||
|
||||
for dep in dependencies:
|
||||
before, after, lag = _normalize_dependency(dep)
|
||||
if before not in start or after not in start:
|
||||
raise ValueError(f"Unknown task in dependency {dep!r}")
|
||||
opt.add(start[after] >= end[before] + lag)
|
||||
|
||||
# Discrete resource capacity over integer time slots.
|
||||
for t in range(horizon):
|
||||
active = [If(And(start[name] <= t, t < end[name]), 1, 0) for name in names]
|
||||
opt.add(Sum(active) <= max_parallel_tasks)
|
||||
|
||||
for name in names:
|
||||
opt.add(makespan >= end[name])
|
||||
if minimize_makespan:
|
||||
opt.minimize(makespan)
|
||||
|
||||
result = opt.check()
|
||||
proof: dict[str, Any]
|
||||
if result == sat:
|
||||
model = opt.model()
|
||||
schedule = []
|
||||
for name in sorted(names, key=lambda n: model.eval(start[n]).as_long()):
|
||||
s = model.eval(start[name]).as_long()
|
||||
e = model.eval(end[name]).as_long()
|
||||
schedule.append({
|
||||
"name": name,
|
||||
"start": s,
|
||||
"end": e,
|
||||
"duration": durations[name],
|
||||
})
|
||||
proof = {
|
||||
"status": "sat",
|
||||
"summary": "Schedule proven feasible.",
|
||||
"horizon": horizon,
|
||||
"max_parallel_tasks": max_parallel_tasks,
|
||||
"makespan": model.eval(makespan).as_long(),
|
||||
"schedule": schedule,
|
||||
"dependencies": [
|
||||
{"before": b, "after": a, "lag": lag}
|
||||
for b, a, lag in (_normalize_dependency(dep) for dep in dependencies)
|
||||
],
|
||||
}
|
||||
elif result == unsat:
|
||||
proof = {
|
||||
"status": "unsat",
|
||||
"summary": "Schedule is impossible under the given horizon/dependency/capacity constraints.",
|
||||
"horizon": horizon,
|
||||
"max_parallel_tasks": max_parallel_tasks,
|
||||
"dependencies": [
|
||||
{"before": b, "after": a, "lag": lag}
|
||||
for b, a, lag in (_normalize_dependency(dep) for dep in dependencies)
|
||||
],
|
||||
}
|
||||
else:
|
||||
proof = {
|
||||
"status": "unknown",
|
||||
"summary": "Solver could not prove SAT or UNSAT for this schedule.",
|
||||
"horizon": horizon,
|
||||
"max_parallel_tasks": max_parallel_tasks,
|
||||
}
|
||||
|
||||
proof["proof_log"] = _log_proof(
|
||||
"schedule_tasks",
|
||||
{
|
||||
"tasks": tasks,
|
||||
"horizon": horizon,
|
||||
"dependencies": dependencies,
|
||||
"fixed_starts": fixed_starts,
|
||||
"max_parallel_tasks": max_parallel_tasks,
|
||||
"minimize_makespan": minimize_makespan,
|
||||
},
|
||||
proof,
|
||||
)
|
||||
return proof
|
||||
|
||||
|
||||
def solve_dependency_order(
|
||||
entities: list[str],
|
||||
before: list[Any],
|
||||
fixed_positions: dict[str, int] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
entities = [str(entity) for entity in entities]
|
||||
fixed_positions = fixed_positions or {}
|
||||
_ensure_unique(entities, "entity")
|
||||
|
||||
opt = Optimize()
|
||||
pos = {entity: Int(f"pos_{entity}") for entity in entities}
|
||||
opt.add(Distinct(*pos.values()))
|
||||
for entity in entities:
|
||||
opt.add(pos[entity] >= 0)
|
||||
opt.add(pos[entity] < len(entities))
|
||||
if entity in fixed_positions:
|
||||
opt.add(pos[entity] == int(fixed_positions[entity]))
|
||||
|
||||
normalized = []
|
||||
for dep in before:
|
||||
left, right, _lag = _normalize_dependency(dep)
|
||||
if left not in pos or right not in pos:
|
||||
raise ValueError(f"Unknown entity in ordering constraint: {dep!r}")
|
||||
opt.add(pos[left] < pos[right])
|
||||
normalized.append({"before": left, "after": right})
|
||||
|
||||
result = opt.check()
|
||||
if result == sat:
|
||||
model = opt.model()
|
||||
ordering = sorted(entities, key=lambda entity: model.eval(pos[entity]).as_long())
|
||||
proof = {
|
||||
"status": "sat",
|
||||
"summary": "Dependency ordering is consistent.",
|
||||
"ordering": ordering,
|
||||
"positions": {entity: model.eval(pos[entity]).as_long() for entity in entities},
|
||||
"constraints": normalized,
|
||||
}
|
||||
elif result == unsat:
|
||||
proof = {
|
||||
"status": "unsat",
|
||||
"summary": "Dependency ordering contains a contradiction/cycle.",
|
||||
"constraints": normalized,
|
||||
}
|
||||
else:
|
||||
proof = {
|
||||
"status": "unknown",
|
||||
"summary": "Solver could not prove SAT or UNSAT for this dependency graph.",
|
||||
"constraints": normalized,
|
||||
}
|
||||
|
||||
proof["proof_log"] = _log_proof(
|
||||
"order_dependencies",
|
||||
{
|
||||
"entities": entities,
|
||||
"before": before,
|
||||
"fixed_positions": fixed_positions,
|
||||
},
|
||||
proof,
|
||||
)
|
||||
return proof
|
||||
|
||||
|
||||
def solve_capacity_fit(
|
||||
items: list[dict[str, Any]],
|
||||
capacity: int,
|
||||
maximize_value: bool = True,
|
||||
) -> dict[str, Any]:
|
||||
items = [_normalize_item(item) for item in items]
|
||||
capacity = int(capacity)
|
||||
if capacity < 0:
|
||||
raise ValueError("capacity must be non-negative")
|
||||
|
||||
names = [item["name"] for item in items]
|
||||
_ensure_unique(names, "item")
|
||||
choose = {item["name"]: Bool(f"choose_{item['name']}") for item in items}
|
||||
|
||||
opt = Optimize()
|
||||
for item in items:
|
||||
if item["required"]:
|
||||
opt.add(choose[item["name"]])
|
||||
|
||||
total_amount = Sum([If(choose[item["name"]], item["amount"], 0) for item in items])
|
||||
total_value = Sum([If(choose[item["name"]], item["value"], 0) for item in items])
|
||||
opt.add(total_amount <= capacity)
|
||||
if maximize_value:
|
||||
opt.maximize(total_value)
|
||||
|
||||
result = opt.check()
|
||||
if result == sat:
|
||||
model = opt.model()
|
||||
chosen = [item for item in items if bool(model.eval(choose[item["name"]], model_completion=True))]
|
||||
skipped = [item for item in items if item not in chosen]
|
||||
used = sum(item["amount"] for item in chosen)
|
||||
proof = {
|
||||
"status": "sat",
|
||||
"summary": "Capacity constraints are feasible.",
|
||||
"capacity": capacity,
|
||||
"used": used,
|
||||
"remaining": capacity - used,
|
||||
"chosen": chosen,
|
||||
"skipped": skipped,
|
||||
"total_value": sum(item["value"] for item in chosen),
|
||||
}
|
||||
elif result == unsat:
|
||||
proof = {
|
||||
"status": "unsat",
|
||||
"summary": "Required items exceed available capacity.",
|
||||
"capacity": capacity,
|
||||
"required_items": [item for item in items if item["required"]],
|
||||
}
|
||||
else:
|
||||
proof = {
|
||||
"status": "unknown",
|
||||
"summary": "Solver could not prove SAT or UNSAT for this capacity check.",
|
||||
"capacity": capacity,
|
||||
}
|
||||
|
||||
proof["proof_log"] = _log_proof(
|
||||
"capacity_fit",
|
||||
{
|
||||
"items": items,
|
||||
"capacity": capacity,
|
||||
"maximize_value": maximize_value,
|
||||
},
|
||||
proof,
|
||||
)
|
||||
return proof
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="schedule_tasks",
|
||||
description=(
|
||||
"Crucible template for discrete scheduling. Proves whether integer-duration "
|
||||
"tasks fit within a time horizon under dependency and parallelism constraints."
|
||||
),
|
||||
structured_output=True,
|
||||
)
|
||||
def schedule_tasks(
|
||||
tasks: list[dict[str, Any]],
|
||||
horizon: int,
|
||||
dependencies: list[Any] | None = None,
|
||||
fixed_starts: dict[str, int] | None = None,
|
||||
max_parallel_tasks: int = 1,
|
||||
minimize_makespan: bool = True,
|
||||
) -> dict[str, Any]:
|
||||
return solve_schedule_tasks(
|
||||
tasks=tasks,
|
||||
horizon=horizon,
|
||||
dependencies=dependencies,
|
||||
fixed_starts=fixed_starts,
|
||||
max_parallel_tasks=max_parallel_tasks,
|
||||
minimize_makespan=minimize_makespan,
|
||||
)
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="order_dependencies",
|
||||
description=(
|
||||
"Crucible template for dependency ordering. Proves whether a set of before/after "
|
||||
"constraints is consistent and returns a valid topological order when SAT."
|
||||
),
|
||||
structured_output=True,
|
||||
)
|
||||
def order_dependencies(
|
||||
entities: list[str],
|
||||
before: list[Any],
|
||||
fixed_positions: dict[str, int] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
return solve_dependency_order(
|
||||
entities=entities,
|
||||
before=before,
|
||||
fixed_positions=fixed_positions,
|
||||
)
|
||||
|
||||
|
||||
@mcp.tool(
|
||||
name="capacity_fit",
|
||||
description=(
|
||||
"Crucible template for resource capacity. Proves whether required items fit "
|
||||
"within a capacity budget and chooses an optimal feasible subset of optional items."
|
||||
),
|
||||
structured_output=True,
|
||||
)
|
||||
def capacity_fit(
|
||||
items: list[dict[str, Any]],
|
||||
capacity: int,
|
||||
maximize_value: bool = True,
|
||||
) -> dict[str, Any]:
|
||||
return solve_capacity_fit(items=items, capacity=capacity, maximize_value=maximize_value)
|
||||
|
||||
|
||||
def run_selftest() -> dict[str, Any]:
|
||||
return {
|
||||
"schedule_unsat_single_worker": solve_schedule_tasks(
|
||||
tasks=[
|
||||
{"name": "A", "duration": 2},
|
||||
{"name": "B", "duration": 3},
|
||||
{"name": "C", "duration": 4},
|
||||
],
|
||||
horizon=8,
|
||||
dependencies=[{"before": "A", "after": "B"}],
|
||||
max_parallel_tasks=1,
|
||||
),
|
||||
"schedule_sat_two_workers": solve_schedule_tasks(
|
||||
tasks=[
|
||||
{"name": "A", "duration": 2},
|
||||
{"name": "B", "duration": 3},
|
||||
{"name": "C", "duration": 4},
|
||||
],
|
||||
horizon=8,
|
||||
dependencies=[{"before": "A", "after": "B"}],
|
||||
max_parallel_tasks=2,
|
||||
),
|
||||
"ordering_sat": solve_dependency_order(
|
||||
entities=["fetch", "train", "eval"],
|
||||
before=[
|
||||
{"before": "fetch", "after": "train"},
|
||||
{"before": "train", "after": "eval"},
|
||||
],
|
||||
),
|
||||
"capacity_sat": solve_capacity_fit(
|
||||
items=[
|
||||
{"name": "gpu_job", "amount": 6, "value": 6, "required": True},
|
||||
{"name": "telemetry", "amount": 1, "value": 1, "required": True},
|
||||
{"name": "export", "amount": 2, "value": 4, "required": False},
|
||||
{"name": "viz", "amount": 3, "value": 5, "required": False},
|
||||
],
|
||||
capacity=8,
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
def main() -> int:
|
||||
if len(sys.argv) > 1 and sys.argv[1] == "selftest":
|
||||
print(json.dumps(run_selftest(), indent=2))
|
||||
return 0
|
||||
mcp.run(transport="stdio")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user