Some checks failed
Validate Config / Playbook Schema Validation (pull_request) Successful in 7s
Architecture Lint / Linter Tests (pull_request) Successful in 10s
Architecture Lint / Lint Repository (pull_request) Failing after 7s
PR Checklist / pr-checklist (pull_request) Failing after 1m15s
Validate Config / JSON Validate (pull_request) Successful in 5s
Smoke Test / smoke (pull_request) Failing after 6s
Validate Config / YAML Lint (pull_request) Failing after 5s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 6s
Validate Config / Shell Script Lint (pull_request) Successful in 14s
Validate Config / Cron Syntax Check (pull_request) Successful in 6s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 6s
277 lines
9.8 KiB
Python
277 lines
9.8 KiB
Python
#!/usr/bin/env python3
|
|
"""symbolic_reasoner.py — Forward-chaining rule engine for the Timmy Foundation fleet.
|
|
|
|
A classical GOFAI approach: declarative IF-THEN rules evaluated over a
|
|
working-memory of facts. Rules fire until quiescence (no new facts) or
|
|
a configurable cycle limit. Designed to sit *beside* the LLM layer so
|
|
that hard policy constraints never depend on probabilistic inference.
|
|
|
|
Usage:
|
|
python symbolic_reasoner.py --rules rules.yaml --facts facts.yaml
|
|
python symbolic_reasoner.py --self-test
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Dict, FrozenSet, List, Optional, Set, Tuple
|
|
|
|
try:
|
|
import yaml
|
|
except ImportError:
|
|
yaml = None # graceful fallback — JSON-only mode
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Domain types
|
|
# ---------------------------------------------------------------------------
|
|
Fact = Tuple[str, ...] # e.g. ("agent", "timmy", "role", "infrastructure")
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class Rule:
|
|
"""A single IF-THEN production rule."""
|
|
name: str
|
|
conditions: FrozenSet[Fact] # all must be present
|
|
negations: FrozenSet[Fact] # none may be present
|
|
conclusions: FrozenSet[Fact] # added when rule fires
|
|
priority: int = 0 # higher fires first
|
|
|
|
def satisfied(self, wm: Set[Fact]) -> bool:
|
|
return self.conditions.issubset(wm) and self.negations.isdisjoint(wm)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Engine
|
|
# ---------------------------------------------------------------------------
|
|
class SymbolicReasoner:
|
|
"""Forward-chaining production system."""
|
|
|
|
def __init__(self, rules: List[Rule], *, cycle_limit: int = 200):
|
|
self._rules = sorted(rules, key=lambda r: -r.priority)
|
|
self._cycle_limit = cycle_limit
|
|
self._trace: List[str] = []
|
|
|
|
# -- public API ---------------------------------------------------------
|
|
|
|
def infer(self, initial_facts: Set[Fact]) -> Set[Fact]:
|
|
"""Run to quiescence and return the final working-memory."""
|
|
wm = set(initial_facts)
|
|
fired: Set[str] = set()
|
|
for cycle in range(self._cycle_limit):
|
|
progress = False
|
|
for rule in self._rules:
|
|
if rule.name in fired:
|
|
continue
|
|
if rule.satisfied(wm):
|
|
new = rule.conclusions - wm
|
|
if new:
|
|
wm |= new
|
|
fired.add(rule.name)
|
|
self._trace.append(
|
|
f"cycle {cycle}: {rule.name} => {_fmt_facts(new)}"
|
|
)
|
|
progress = True
|
|
break # restart from highest-priority rule
|
|
if not progress:
|
|
break
|
|
return wm
|
|
|
|
def query(self, wm: Set[Fact], pattern: Tuple[Optional[str], ...]) -> List[Fact]:
|
|
"""Return facts matching *pattern* (None = wildcard)."""
|
|
return [
|
|
f for f in wm
|
|
if len(f) == len(pattern)
|
|
and all(p is None or p == v for p, v in zip(pattern, f))
|
|
]
|
|
|
|
@property
|
|
def trace(self) -> List[str]:
|
|
return list(self._trace)
|
|
|
|
# -- serialisation helpers -----------------------------------------------
|
|
|
|
@classmethod
|
|
def from_dicts(cls, raw_rules: List[Dict], **kw) -> "SymbolicReasoner":
|
|
rules = [_parse_rule(r) for r in raw_rules]
|
|
return cls(rules, **kw)
|
|
|
|
@classmethod
|
|
def from_file(cls, path: Path, **kw) -> "SymbolicReasoner":
|
|
text = path.read_text()
|
|
if path.suffix in (".yaml", ".yml"):
|
|
if yaml is None:
|
|
raise RuntimeError("PyYAML required for .yaml rules")
|
|
data = yaml.safe_load(text)
|
|
else:
|
|
data = json.loads(text)
|
|
return cls.from_dicts(data["rules"], **kw)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Parsing helpers
|
|
# ---------------------------------------------------------------------------
|
|
def _parse_fact(raw: list | str) -> Fact:
|
|
if isinstance(raw, str):
|
|
return tuple(raw.split())
|
|
return tuple(str(x) for x in raw)
|
|
|
|
|
|
def _parse_rule(d: Dict) -> Rule:
|
|
return Rule(
|
|
name=d["name"],
|
|
conditions=frozenset(_parse_fact(c) for c in d.get("if", [])),
|
|
negations=frozenset(_parse_fact(c) for c in d.get("unless", [])),
|
|
conclusions=frozenset(_parse_fact(c) for c in d.get("then", [])),
|
|
priority=d.get("priority", 0),
|
|
)
|
|
|
|
|
|
def _fmt_facts(facts: Set[Fact]) -> str:
|
|
return ", ".join(" ".join(f) for f in sorted(facts))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Built-in fleet rules (loaded when no --rules file is given)
|
|
# ---------------------------------------------------------------------------
|
|
DEFAULT_FLEET_RULES: List[Dict] = [
|
|
{
|
|
"name": "route-ci-to-timmy",
|
|
"if": [["task", "category", "ci"]],
|
|
"then": [["assign", "timmy"], ["reason", "timmy", "best-ci-merge-rate"]],
|
|
"priority": 10,
|
|
},
|
|
{
|
|
"name": "route-security-to-timmy",
|
|
"if": [["task", "category", "security"]],
|
|
"then": [["assign", "timmy"], ["reason", "timmy", "security-specialist"]],
|
|
"priority": 10,
|
|
},
|
|
{
|
|
"name": "route-architecture-to-gemini",
|
|
"if": [["task", "category", "architecture"]],
|
|
"unless": [["assign", "timmy"]],
|
|
"then": [["assign", "gemini"], ["reason", "gemini", "architecture-strength"]],
|
|
"priority": 8,
|
|
},
|
|
{
|
|
"name": "route-review-to-allegro",
|
|
"if": [["task", "category", "review"]],
|
|
"then": [["assign", "allegro"], ["reason", "allegro", "highest-quality-per-pr"]],
|
|
"priority": 9,
|
|
},
|
|
{
|
|
"name": "route-frontend-to-claude",
|
|
"if": [["task", "category", "frontend"]],
|
|
"unless": [["task", "repo", "fleet-ops"]],
|
|
"then": [["assign", "claude"], ["reason", "claude", "high-volume-frontend"]],
|
|
"priority": 5,
|
|
},
|
|
{
|
|
"name": "block-merge-without-review",
|
|
"if": [["pr", "status", "open"], ["pr", "reviews", "0"]],
|
|
"then": [["pr", "action", "block-merge"], ["reason", "policy", "no-unreviewed-merges"]],
|
|
"priority": 20,
|
|
},
|
|
{
|
|
"name": "block-merge-ci-failing",
|
|
"if": [["pr", "status", "open"], ["pr", "ci", "failing"]],
|
|
"then": [["pr", "action", "block-merge"], ["reason", "policy", "ci-must-pass"]],
|
|
"priority": 20,
|
|
},
|
|
{
|
|
"name": "auto-label-hotfix",
|
|
"if": [["pr", "title-prefix", "hotfix"]],
|
|
"then": [["pr", "label", "hotfix"], ["pr", "priority", "urgent"]],
|
|
"priority": 15,
|
|
},
|
|
]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Self-test
|
|
# ---------------------------------------------------------------------------
|
|
def _self_test() -> bool:
|
|
"""Verify core behaviour; returns True on success."""
|
|
engine = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES)
|
|
|
|
# Scenario 1: CI task should route to Timmy
|
|
wm = engine.infer({("task", "category", "ci")})
|
|
assert ("assign", "timmy") in wm, f"expected timmy assignment, got {wm}"
|
|
|
|
# Scenario 2: architecture task routes to gemini (not timmy)
|
|
engine2 = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES)
|
|
wm2 = engine2.infer({("task", "category", "architecture")})
|
|
assert ("assign", "gemini") in wm2, f"expected gemini assignment, got {wm2}"
|
|
|
|
# Scenario 3: open PR with no reviews should block merge
|
|
engine3 = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES)
|
|
wm3 = engine3.infer({("pr", "status", "open"), ("pr", "reviews", "0")})
|
|
assert ("pr", "action", "block-merge") in wm3
|
|
|
|
# Scenario 4: negation — frontend + fleet-ops should NOT assign claude
|
|
engine4 = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES)
|
|
wm4 = engine4.infer({("task", "category", "frontend"), ("task", "repo", "fleet-ops")})
|
|
assert ("assign", "claude") not in wm4
|
|
|
|
# Scenario 5: query with wildcards
|
|
results = engine.query(wm, ("reason", None, None))
|
|
assert len(results) > 0
|
|
|
|
print("All 5 self-test scenarios passed.")
|
|
for line in engine.trace:
|
|
print(f" {line}")
|
|
return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
def main():
|
|
ap = argparse.ArgumentParser(description=__doc__)
|
|
ap.add_argument("--rules", type=Path, help="YAML/JSON rule file")
|
|
ap.add_argument("--facts", type=Path, help="YAML/JSON initial facts")
|
|
ap.add_argument("--self-test", action="store_true")
|
|
ap.add_argument("--json", action="store_true", help="output as JSON")
|
|
args = ap.parse_args()
|
|
|
|
if args.self_test:
|
|
ok = _self_test()
|
|
sys.exit(0 if ok else 1)
|
|
|
|
if args.rules:
|
|
engine = SymbolicReasoner.from_file(args.rules)
|
|
else:
|
|
engine = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES)
|
|
|
|
if args.facts:
|
|
text = args.facts.read_text()
|
|
if args.facts.suffix in (".yaml", ".yml"):
|
|
raw = yaml.safe_load(text)
|
|
else:
|
|
raw = json.loads(text)
|
|
initial = {_parse_fact(f) for f in raw.get("facts", [])}
|
|
else:
|
|
initial = set()
|
|
print("No --facts provided; running with empty working memory.")
|
|
|
|
wm = engine.infer(initial)
|
|
|
|
if args.json:
|
|
print(json.dumps({"facts": [list(f) for f in sorted(wm)], "trace": engine.trace}, indent=2))
|
|
else:
|
|
print(f"Final working memory ({len(wm)} facts):")
|
|
for f in sorted(wm):
|
|
print(f" {' '.join(f)}")
|
|
if engine.trace:
|
|
print(f"\nInference trace ({len(engine.trace)} firings):")
|
|
for line in engine.trace:
|
|
print(f" {line}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|