diff --git a/scripts/symbolic_reasoner.py b/scripts/symbolic_reasoner.py new file mode 100644 index 00000000..812be24e --- /dev/null +++ b/scripts/symbolic_reasoner.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python3 +"""symbolic_reasoner.py — Forward-chaining rule engine for the Timmy Foundation fleet. + +A classical GOFAI approach: declarative IF-THEN rules evaluated over a +working-memory of facts. Rules fire until quiescence (no new facts) or +a configurable cycle limit. Designed to sit *beside* the LLM layer so +that hard policy constraints never depend on probabilistic inference. + +Usage: + python symbolic_reasoner.py --rules rules.yaml --facts facts.yaml + python symbolic_reasoner.py --self-test +""" +from __future__ import annotations + +import argparse +import json +import sys +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Callable, Dict, FrozenSet, List, Optional, Set, Tuple + +try: + import yaml +except ImportError: + yaml = None # graceful fallback — JSON-only mode + + +# --------------------------------------------------------------------------- +# Domain types +# --------------------------------------------------------------------------- +Fact = Tuple[str, ...] # e.g. ("agent", "timmy", "role", "infrastructure") + + +@dataclass(frozen=True) +class Rule: + """A single IF-THEN production rule.""" + name: str + conditions: FrozenSet[Fact] # all must be present + negations: FrozenSet[Fact] # none may be present + conclusions: FrozenSet[Fact] # added when rule fires + priority: int = 0 # higher fires first + + def satisfied(self, wm: Set[Fact]) -> bool: + return self.conditions.issubset(wm) and self.negations.isdisjoint(wm) + + +# --------------------------------------------------------------------------- +# Engine +# --------------------------------------------------------------------------- +class SymbolicReasoner: + """Forward-chaining production system.""" + + def __init__(self, rules: List[Rule], *, cycle_limit: int = 200): + self._rules = sorted(rules, key=lambda r: -r.priority) + self._cycle_limit = cycle_limit + self._trace: List[str] = [] + + # -- public API --------------------------------------------------------- + + def infer(self, initial_facts: Set[Fact]) -> Set[Fact]: + """Run to quiescence and return the final working-memory.""" + wm = set(initial_facts) + fired: Set[str] = set() + for cycle in range(self._cycle_limit): + progress = False + for rule in self._rules: + if rule.name in fired: + continue + if rule.satisfied(wm): + new = rule.conclusions - wm + if new: + wm |= new + fired.add(rule.name) + self._trace.append( + f"cycle {cycle}: {rule.name} => {_fmt_facts(new)}" + ) + progress = True + break # restart from highest-priority rule + if not progress: + break + return wm + + def query(self, wm: Set[Fact], pattern: Tuple[Optional[str], ...]) -> List[Fact]: + """Return facts matching *pattern* (None = wildcard).""" + return [ + f for f in wm + if len(f) == len(pattern) + and all(p is None or p == v for p, v in zip(pattern, f)) + ] + + @property + def trace(self) -> List[str]: + return list(self._trace) + + # -- serialisation helpers ----------------------------------------------- + + @classmethod + def from_dicts(cls, raw_rules: List[Dict], **kw) -> "SymbolicReasoner": + rules = [_parse_rule(r) for r in raw_rules] + return cls(rules, **kw) + + @classmethod + def from_file(cls, path: Path, **kw) -> "SymbolicReasoner": + text = path.read_text() + if path.suffix in (".yaml", ".yml"): + if yaml is None: + raise RuntimeError("PyYAML required for .yaml rules") + data = yaml.safe_load(text) + else: + data = json.loads(text) + return cls.from_dicts(data["rules"], **kw) + + +# --------------------------------------------------------------------------- +# Parsing helpers +# --------------------------------------------------------------------------- +def _parse_fact(raw: list | str) -> Fact: + if isinstance(raw, str): + return tuple(raw.split()) + return tuple(str(x) for x in raw) + + +def _parse_rule(d: Dict) -> Rule: + return Rule( + name=d["name"], + conditions=frozenset(_parse_fact(c) for c in d.get("if", [])), + negations=frozenset(_parse_fact(c) for c in d.get("unless", [])), + conclusions=frozenset(_parse_fact(c) for c in d.get("then", [])), + priority=d.get("priority", 0), + ) + + +def _fmt_facts(facts: Set[Fact]) -> str: + return ", ".join(" ".join(f) for f in sorted(facts)) + + +# --------------------------------------------------------------------------- +# Built-in fleet rules (loaded when no --rules file is given) +# --------------------------------------------------------------------------- +DEFAULT_FLEET_RULES: List[Dict] = [ + { + "name": "route-ci-to-timmy", + "if": [["task", "category", "ci"]], + "then": [["assign", "timmy"], ["reason", "timmy", "best-ci-merge-rate"]], + "priority": 10, + }, + { + "name": "route-security-to-timmy", + "if": [["task", "category", "security"]], + "then": [["assign", "timmy"], ["reason", "timmy", "security-specialist"]], + "priority": 10, + }, + { + "name": "route-architecture-to-gemini", + "if": [["task", "category", "architecture"]], + "unless": [["assign", "timmy"]], + "then": [["assign", "gemini"], ["reason", "gemini", "architecture-strength"]], + "priority": 8, + }, + { + "name": "route-review-to-allegro", + "if": [["task", "category", "review"]], + "then": [["assign", "allegro"], ["reason", "allegro", "highest-quality-per-pr"]], + "priority": 9, + }, + { + "name": "route-frontend-to-claude", + "if": [["task", "category", "frontend"]], + "unless": [["task", "repo", "fleet-ops"]], + "then": [["assign", "claude"], ["reason", "claude", "high-volume-frontend"]], + "priority": 5, + }, + { + "name": "block-merge-without-review", + "if": [["pr", "status", "open"], ["pr", "reviews", "0"]], + "then": [["pr", "action", "block-merge"], ["reason", "policy", "no-unreviewed-merges"]], + "priority": 20, + }, + { + "name": "block-merge-ci-failing", + "if": [["pr", "status", "open"], ["pr", "ci", "failing"]], + "then": [["pr", "action", "block-merge"], ["reason", "policy", "ci-must-pass"]], + "priority": 20, + }, + { + "name": "auto-label-hotfix", + "if": [["pr", "title-prefix", "hotfix"]], + "then": [["pr", "label", "hotfix"], ["pr", "priority", "urgent"]], + "priority": 15, + }, +] + + +# --------------------------------------------------------------------------- +# Self-test +# --------------------------------------------------------------------------- +def _self_test() -> bool: + """Verify core behaviour; returns True on success.""" + engine = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES) + + # Scenario 1: CI task should route to Timmy + wm = engine.infer({("task", "category", "ci")}) + assert ("assign", "timmy") in wm, f"expected timmy assignment, got {wm}" + + # Scenario 2: architecture task routes to gemini (not timmy) + engine2 = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES) + wm2 = engine2.infer({("task", "category", "architecture")}) + assert ("assign", "gemini") in wm2, f"expected gemini assignment, got {wm2}" + + # Scenario 3: open PR with no reviews should block merge + engine3 = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES) + wm3 = engine3.infer({("pr", "status", "open"), ("pr", "reviews", "0")}) + assert ("pr", "action", "block-merge") in wm3 + + # Scenario 4: negation — frontend + fleet-ops should NOT assign claude + engine4 = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES) + wm4 = engine4.infer({("task", "category", "frontend"), ("task", "repo", "fleet-ops")}) + assert ("assign", "claude") not in wm4 + + # Scenario 5: query with wildcards + results = engine.query(wm, ("reason", None, None)) + assert len(results) > 0 + + print("All 5 self-test scenarios passed.") + for line in engine.trace: + print(f" {line}") + return True + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- +def main(): + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--rules", type=Path, help="YAML/JSON rule file") + ap.add_argument("--facts", type=Path, help="YAML/JSON initial facts") + ap.add_argument("--self-test", action="store_true") + ap.add_argument("--json", action="store_true", help="output as JSON") + args = ap.parse_args() + + if args.self_test: + ok = _self_test() + sys.exit(0 if ok else 1) + + if args.rules: + engine = SymbolicReasoner.from_file(args.rules) + else: + engine = SymbolicReasoner.from_dicts(DEFAULT_FLEET_RULES) + + if args.facts: + text = args.facts.read_text() + if args.facts.suffix in (".yaml", ".yml"): + raw = yaml.safe_load(text) + else: + raw = json.loads(text) + initial = {_parse_fact(f) for f in raw.get("facts", [])} + else: + initial = set() + print("No --facts provided; running with empty working memory.") + + wm = engine.infer(initial) + + if args.json: + print(json.dumps({"facts": [list(f) for f in sorted(wm)], "trace": engine.trace}, indent=2)) + else: + print(f"Final working memory ({len(wm)} facts):") + for f in sorted(wm): + print(f" {' '.join(f)}") + if engine.trace: + print(f"\nInference trace ({len(engine.trace)} firings):") + for line in engine.trace: + print(f" {line}") + + +if __name__ == "__main__": + main()