Files
timmy-config/bin/crucible_mcp_server.py

460 lines
14 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""Z3-backed Crucible MCP server for Timmy.
Sidecar-only. Lives in timmy-config, deploys into ~/.hermes/bin/, and is loaded
by Hermes through native MCP tool discovery. No hermes-agent fork required.
"""
from __future__ import annotations
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from mcp.server import FastMCP
from z3 import And, Bool, Distinct, If, Implies, Int, Optimize, Or, Sum, sat, unsat
mcp = FastMCP(
name="crucible",
instructions=(
"Formal verification sidecar for Timmy. Use these tools for scheduling, "
"dependency ordering, and resource/capacity feasibility. Return SAT/UNSAT "
"with witness models instead of fuzzy prose."
),
dependencies=["z3-solver"],
)
def _hermes_home() -> Path:
return Path(os.path.expanduser(os.getenv("HERMES_HOME", "~/.hermes")))
def _proof_dir() -> Path:
path = _hermes_home() / "logs" / "crucible"
path.mkdir(parents=True, exist_ok=True)
return path
def _ts() -> str:
return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S_%fZ")
def _json_default(value: Any) -> Any:
if isinstance(value, Path):
return str(value)
raise TypeError(f"Unsupported type for JSON serialization: {type(value)!r}")
def _log_proof(tool_name: str, request: dict[str, Any], result: dict[str, Any]) -> str:
path = _proof_dir() / f"{_ts()}_{tool_name}.json"
payload = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"tool": tool_name,
"request": request,
"result": result,
}
path.write_text(json.dumps(payload, indent=2, default=_json_default))
return str(path)
def _ensure_unique(names: list[str], label: str) -> None:
if len(set(names)) != len(names):
raise ValueError(f"Duplicate {label} names are not allowed: {names}")
def _normalize_dependency(dep: Any) -> tuple[str, str, int]:
if isinstance(dep, dict):
before = dep.get("before")
after = dep.get("after")
lag = int(dep.get("lag", 0))
if not before or not after:
raise ValueError(f"Dependency dict must include before/after: {dep!r}")
return str(before), str(after), lag
if isinstance(dep, (list, tuple)) and len(dep) in (2, 3):
before = str(dep[0])
after = str(dep[1])
lag = int(dep[2]) if len(dep) == 3 else 0
return before, after, lag
raise ValueError(f"Unsupported dependency shape: {dep!r}")
def _normalize_task(task: dict[str, Any]) -> dict[str, Any]:
name = str(task["name"])
duration = int(task["duration"])
if duration <= 0:
raise ValueError(f"Task duration must be positive: {task!r}")
return {"name": name, "duration": duration}
def _normalize_item(item: dict[str, Any]) -> dict[str, Any]:
name = str(item["name"])
amount = int(item["amount"])
value = int(item.get("value", amount))
required = bool(item.get("required", False))
if amount < 0:
raise ValueError(f"Item amount must be non-negative: {item!r}")
return {
"name": name,
"amount": amount,
"value": value,
"required": required,
}
def solve_schedule_tasks(
tasks: list[dict[str, Any]],
horizon: int,
dependencies: list[Any] | None = None,
fixed_starts: dict[str, int] | None = None,
max_parallel_tasks: int = 1,
minimize_makespan: bool = True,
) -> dict[str, Any]:
tasks = [_normalize_task(task) for task in tasks]
dependencies = dependencies or []
fixed_starts = fixed_starts or {}
horizon = int(horizon)
max_parallel_tasks = int(max_parallel_tasks)
if horizon <= 0:
raise ValueError("horizon must be positive")
if max_parallel_tasks <= 0:
raise ValueError("max_parallel_tasks must be positive")
names = [task["name"] for task in tasks]
_ensure_unique(names, "task")
durations = {task["name"]: task["duration"] for task in tasks}
opt = Optimize()
start = {name: Int(f"start_{name}") for name in names}
end = {name: Int(f"end_{name}") for name in names}
makespan = Int("makespan")
for name in names:
opt.add(start[name] >= 0)
opt.add(end[name] == start[name] + durations[name])
opt.add(end[name] <= horizon)
if name in fixed_starts:
opt.add(start[name] == int(fixed_starts[name]))
for dep in dependencies:
before, after, lag = _normalize_dependency(dep)
if before not in start or after not in start:
raise ValueError(f"Unknown task in dependency {dep!r}")
opt.add(start[after] >= end[before] + lag)
# Discrete resource capacity over integer time slots.
for t in range(horizon):
active = [If(And(start[name] <= t, t < end[name]), 1, 0) for name in names]
opt.add(Sum(active) <= max_parallel_tasks)
for name in names:
opt.add(makespan >= end[name])
if minimize_makespan:
opt.minimize(makespan)
result = opt.check()
proof: dict[str, Any]
if result == sat:
model = opt.model()
schedule = []
for name in sorted(names, key=lambda n: model.eval(start[n]).as_long()):
s = model.eval(start[name]).as_long()
e = model.eval(end[name]).as_long()
schedule.append({
"name": name,
"start": s,
"end": e,
"duration": durations[name],
})
proof = {
"status": "sat",
"summary": "Schedule proven feasible.",
"horizon": horizon,
"max_parallel_tasks": max_parallel_tasks,
"makespan": model.eval(makespan).as_long(),
"schedule": schedule,
"dependencies": [
{"before": b, "after": a, "lag": lag}
for b, a, lag in (_normalize_dependency(dep) for dep in dependencies)
],
}
elif result == unsat:
proof = {
"status": "unsat",
"summary": "Schedule is impossible under the given horizon/dependency/capacity constraints.",
"horizon": horizon,
"max_parallel_tasks": max_parallel_tasks,
"dependencies": [
{"before": b, "after": a, "lag": lag}
for b, a, lag in (_normalize_dependency(dep) for dep in dependencies)
],
}
else:
proof = {
"status": "unknown",
"summary": "Solver could not prove SAT or UNSAT for this schedule.",
"horizon": horizon,
"max_parallel_tasks": max_parallel_tasks,
}
proof["proof_log"] = _log_proof(
"schedule_tasks",
{
"tasks": tasks,
"horizon": horizon,
"dependencies": dependencies,
"fixed_starts": fixed_starts,
"max_parallel_tasks": max_parallel_tasks,
"minimize_makespan": minimize_makespan,
},
proof,
)
return proof
def solve_dependency_order(
entities: list[str],
before: list[Any],
fixed_positions: dict[str, int] | None = None,
) -> dict[str, Any]:
entities = [str(entity) for entity in entities]
fixed_positions = fixed_positions or {}
_ensure_unique(entities, "entity")
opt = Optimize()
pos = {entity: Int(f"pos_{entity}") for entity in entities}
opt.add(Distinct(*pos.values()))
for entity in entities:
opt.add(pos[entity] >= 0)
opt.add(pos[entity] < len(entities))
if entity in fixed_positions:
opt.add(pos[entity] == int(fixed_positions[entity]))
normalized = []
for dep in before:
left, right, _lag = _normalize_dependency(dep)
if left not in pos or right not in pos:
raise ValueError(f"Unknown entity in ordering constraint: {dep!r}")
opt.add(pos[left] < pos[right])
normalized.append({"before": left, "after": right})
result = opt.check()
if result == sat:
model = opt.model()
ordering = sorted(entities, key=lambda entity: model.eval(pos[entity]).as_long())
proof = {
"status": "sat",
"summary": "Dependency ordering is consistent.",
"ordering": ordering,
"positions": {entity: model.eval(pos[entity]).as_long() for entity in entities},
"constraints": normalized,
}
elif result == unsat:
proof = {
"status": "unsat",
"summary": "Dependency ordering contains a contradiction/cycle.",
"constraints": normalized,
}
else:
proof = {
"status": "unknown",
"summary": "Solver could not prove SAT or UNSAT for this dependency graph.",
"constraints": normalized,
}
proof["proof_log"] = _log_proof(
"order_dependencies",
{
"entities": entities,
"before": before,
"fixed_positions": fixed_positions,
},
proof,
)
return proof
def solve_capacity_fit(
items: list[dict[str, Any]],
capacity: int,
maximize_value: bool = True,
) -> dict[str, Any]:
items = [_normalize_item(item) for item in items]
capacity = int(capacity)
if capacity < 0:
raise ValueError("capacity must be non-negative")
names = [item["name"] for item in items]
_ensure_unique(names, "item")
choose = {item["name"]: Bool(f"choose_{item['name']}") for item in items}
opt = Optimize()
for item in items:
if item["required"]:
opt.add(choose[item["name"]])
total_amount = Sum([If(choose[item["name"]], item["amount"], 0) for item in items])
total_value = Sum([If(choose[item["name"]], item["value"], 0) for item in items])
opt.add(total_amount <= capacity)
if maximize_value:
opt.maximize(total_value)
result = opt.check()
if result == sat:
model = opt.model()
chosen = [item for item in items if bool(model.eval(choose[item["name"]], model_completion=True))]
skipped = [item for item in items if item not in chosen]
used = sum(item["amount"] for item in chosen)
proof = {
"status": "sat",
"summary": "Capacity constraints are feasible.",
"capacity": capacity,
"used": used,
"remaining": capacity - used,
"chosen": chosen,
"skipped": skipped,
"total_value": sum(item["value"] for item in chosen),
}
elif result == unsat:
proof = {
"status": "unsat",
"summary": "Required items exceed available capacity.",
"capacity": capacity,
"required_items": [item for item in items if item["required"]],
}
else:
proof = {
"status": "unknown",
"summary": "Solver could not prove SAT or UNSAT for this capacity check.",
"capacity": capacity,
}
proof["proof_log"] = _log_proof(
"capacity_fit",
{
"items": items,
"capacity": capacity,
"maximize_value": maximize_value,
},
proof,
)
return proof
@mcp.tool(
name="schedule_tasks",
description=(
"Crucible template for discrete scheduling. Proves whether integer-duration "
"tasks fit within a time horizon under dependency and parallelism constraints."
),
structured_output=True,
)
def schedule_tasks(
tasks: list[dict[str, Any]],
horizon: int,
dependencies: list[Any] | None = None,
fixed_starts: dict[str, int] | None = None,
max_parallel_tasks: int = 1,
minimize_makespan: bool = True,
) -> dict[str, Any]:
return solve_schedule_tasks(
tasks=tasks,
horizon=horizon,
dependencies=dependencies,
fixed_starts=fixed_starts,
max_parallel_tasks=max_parallel_tasks,
minimize_makespan=minimize_makespan,
)
@mcp.tool(
name="order_dependencies",
description=(
"Crucible template for dependency ordering. Proves whether a set of before/after "
"constraints is consistent and returns a valid topological order when SAT."
),
structured_output=True,
)
def order_dependencies(
entities: list[str],
before: list[Any],
fixed_positions: dict[str, int] | None = None,
) -> dict[str, Any]:
return solve_dependency_order(
entities=entities,
before=before,
fixed_positions=fixed_positions,
)
@mcp.tool(
name="capacity_fit",
description=(
"Crucible template for resource capacity. Proves whether required items fit "
"within a capacity budget and chooses an optimal feasible subset of optional items."
),
structured_output=True,
)
def capacity_fit(
items: list[dict[str, Any]],
capacity: int,
maximize_value: bool = True,
) -> dict[str, Any]:
return solve_capacity_fit(items=items, capacity=capacity, maximize_value=maximize_value)
def run_selftest() -> dict[str, Any]:
return {
"schedule_unsat_single_worker": solve_schedule_tasks(
tasks=[
{"name": "A", "duration": 2},
{"name": "B", "duration": 3},
{"name": "C", "duration": 4},
],
horizon=8,
dependencies=[{"before": "A", "after": "B"}],
max_parallel_tasks=1,
),
"schedule_sat_two_workers": solve_schedule_tasks(
tasks=[
{"name": "A", "duration": 2},
{"name": "B", "duration": 3},
{"name": "C", "duration": 4},
],
horizon=8,
dependencies=[{"before": "A", "after": "B"}],
max_parallel_tasks=2,
),
"ordering_sat": solve_dependency_order(
entities=["fetch", "train", "eval"],
before=[
{"before": "fetch", "after": "train"},
{"before": "train", "after": "eval"},
],
),
"capacity_sat": solve_capacity_fit(
items=[
{"name": "gpu_job", "amount": 6, "value": 6, "required": True},
{"name": "telemetry", "amount": 1, "value": 1, "required": True},
{"name": "export", "amount": 2, "value": 4, "required": False},
{"name": "viz", "amount": 3, "value": 5, "required": False},
],
capacity=8,
),
}
def main() -> int:
if len(sys.argv) > 1 and sys.argv[1] == "selftest":
print(json.dumps(run_selftest(), indent=2))
return 0
mcp.run(transport="stdio")
return 0
if __name__ == "__main__":
raise SystemExit(main())