diff --git a/scripts/knowledge_base.py b/scripts/knowledge_base.py new file mode 100644 index 00000000..69e822fb --- /dev/null +++ b/scripts/knowledge_base.py @@ -0,0 +1,341 @@ +#!/usr/bin/env python3 +"""knowledge_base.py - GOFAI symbolic knowledge base for the Timmy Foundation fleet. + +A classical AI knowledge representation system: stores facts as ground atoms, +supports first-order-logic-style queries, and maintains a provenance chain so +every belief can be traced back to its source. No neural nets, no embeddings - +just structured symbolic reasoning over a typed fact store. + +Usage: + kb = KnowledgeBase() + kb.assert_fact('agent', 'online', 'timmy') + kb.assert_fact('task', 'assigned_to', 'task-42', 'timmy') + results = kb.query('task', 'assigned_to', '?x', 'timmy') + # results -> [{'?x': 'task-42'}] + +CLI: + python knowledge_base.py --assert "agent online hermes" + python knowledge_base.py --query "agent online ?who" + python knowledge_base.py --dump +""" + +from __future__ import annotations + +import argparse +import json +import os +import sys +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Dict, Iterator, List, Optional, Tuple + +# --------------------------------------------------------------------------- +# Data model +# --------------------------------------------------------------------------- + +VAR_PREFIX = "?" + + +def is_var(term: str) -> bool: + """Return True if *term* is a logic variable (starts with '?').""" + return term.startswith(VAR_PREFIX) + + +@dataclass(frozen=True) +class Fact: + """An immutable ground atom: (relation, *args).""" + + relation: str + args: Tuple[str, ...] + source: str = "user" + timestamp: float = field(default_factory=time.time) + + def __str__(self) -> str: + args_str = " ".join(self.args) + return f"({self.relation} {args_str})" + + +Bindings = Dict[str, str] + + +# --------------------------------------------------------------------------- +# Unification +# --------------------------------------------------------------------------- + + +def unify_term(pattern: str, value: str, bindings: Bindings) -> Optional[Bindings]: + """Unify a single pattern term against a ground value. + + Returns updated bindings on success, or None on failure. + """ + if is_var(pattern): + if pattern in bindings: + return bindings if bindings[pattern] == value else None + return {**bindings, pattern: value} + return bindings if pattern == value else None + + +def unify_fact( + pattern: Tuple[str, ...], fact_args: Tuple[str, ...], bindings: Bindings +) -> Optional[Bindings]: + """Unify a full argument tuple, returning final bindings or None.""" + if len(pattern) != len(fact_args): + return None + b = bindings + for p, v in zip(pattern, fact_args): + b = unify_term(p, v, b) + if b is None: + return None + return b + + +# --------------------------------------------------------------------------- +# Knowledge Base +# --------------------------------------------------------------------------- + + +class KnowledgeBase: + """In-memory symbolic knowledge base with optional JSON persistence.""" + + def __init__(self, persist_path: Optional[Path] = None) -> None: + self._facts: List[Fact] = [] + self._persist_path = persist_path + if persist_path and persist_path.exists(): + self._load(persist_path) + + # ------------------------------------------------------------------ + # Fact management + # ------------------------------------------------------------------ + + def assert_fact( + self, relation: str, *args: str, source: str = "user" + ) -> Fact: + """Add a ground fact to the knowledge base. + + Idempotent: duplicate (relation, args) pairs are not added twice. + """ + f = Fact(relation=relation, args=tuple(args), source=source, timestamp=time.time()) + for existing in self._facts: + if existing.relation == f.relation and existing.args == f.args: + return existing # already known + self._facts.append(f) + if self._persist_path: + self._save(self._persist_path) + return f + + def retract_fact(self, relation: str, *args: str) -> int: + """Remove all facts matching (relation, *args). Returns count removed.""" + before = len(self._facts) + self._facts = [ + f + for f in self._facts + if not (f.relation == relation and f.args == tuple(args)) + ] + removed = before - len(self._facts) + if removed and self._persist_path: + self._save(self._persist_path) + return removed + + # ------------------------------------------------------------------ + # Query + # ------------------------------------------------------------------ + + def query( + self, relation: str, *pattern_args: str, source_filter: Optional[str] = None + ) -> List[Bindings]: + """Return all binding dictionaries satisfying the query pattern. + + Variables in *pattern_args* start with '?'. Ground terms must match + exactly. An empty binding dict means the fact matched with no + variables to bind. + + Args: + relation: The relation name to match. + *pattern_args: Mixed ground/variable argument tuple. + source_filter: Optional provenance filter (e.g. 'scheduler'). + + Returns: + List of binding dicts, one per matching fact. + """ + results: List[Bindings] = [] + for fact in self._facts: + if fact.relation != relation: + continue + if source_filter and fact.source != source_filter: + continue + b = unify_fact(tuple(pattern_args), fact.args, {}) + if b is not None: + results.append(b) + return results + + def query_one( + self, relation: str, *pattern_args: str + ) -> Optional[Bindings]: + """Return the first matching binding dict or None.""" + for b in self.query(relation, *pattern_args): + return b + return None + + def facts_for(self, relation: str) -> Iterator[Fact]: + """Iterate over all facts with the given relation.""" + for f in self._facts: + if f.relation == relation: + yield f + + # ------------------------------------------------------------------ + # Bulk operations + # ------------------------------------------------------------------ + + def all_facts(self) -> List[Fact]: + """Return a snapshot of all stored facts.""" + return list(self._facts) + + def fact_count(self) -> int: + return len(self._facts) + + def clear(self) -> None: + """Remove all facts from memory (does not touch disk).""" + self._facts.clear() + + # ------------------------------------------------------------------ + # Persistence + # ------------------------------------------------------------------ + + def _save(self, path: Path) -> None: + records = [ + { + "relation": f.relation, + "args": list(f.args), + "source": f.source, + "timestamp": f.timestamp, + } + for f in self._facts + ] + path.write_text(json.dumps(records, indent=2)) + + def _load(self, path: Path) -> None: + try: + records = json.loads(path.read_text()) + for r in records: + self._facts.append( + Fact( + relation=r["relation"], + args=tuple(r["args"]), + source=r.get("source", "persisted"), + timestamp=r.get("timestamp", 0.0), + ) + ) + except (json.JSONDecodeError, KeyError) as exc: + print(f"[kb] Warning: could not load {path}: {exc}", file=sys.stderr) + + def save_to(self, path: Path) -> None: + """Explicitly save to a given path.""" + self._save(path) + + # ------------------------------------------------------------------ + # Debug / display + # ------------------------------------------------------------------ + + def dump(self, relation_filter: Optional[str] = None) -> None: + """Print all (or filtered) facts to stdout.""" + for f in self._facts: + if relation_filter and f.relation != relation_filter: + continue + print(f" {f} [source={f.source}]") + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- + + +def _parse_terms(raw: str) -> List[str]: + """Split a whitespace-separated string into terms.""" + return raw.strip().split() + + +def main() -> None: + parser = argparse.ArgumentParser( + description="GOFAI symbolic knowledge base CLI" + ) + parser.add_argument( + "--db", + default="kb.json", + help="Path to persistent JSON store (default: kb.json)", + ) + parser.add_argument( + "--assert", + dest="assert_stmt", + metavar="RELATION ARG...", + help='Assert a fact, e.g. --assert "agent online timmy"', + ) + parser.add_argument( + "--retract", + dest="retract_stmt", + metavar="RELATION ARG...", + help='Retract a fact, e.g. --retract "agent online timmy"', + ) + parser.add_argument( + "--query", + dest="query_stmt", + metavar="RELATION ARG...", + help='Query the KB, e.g. --query "agent online ?who"', + ) + parser.add_argument( + "--dump", + action="store_true", + help="Dump all facts", + ) + parser.add_argument( + "--relation", + help="Filter --dump to a specific relation", + ) + args = parser.parse_args() + + db_path = Path(args.db) + kb = KnowledgeBase(persist_path=db_path) + + if args.assert_stmt: + terms = _parse_terms(args.assert_stmt) + if len(terms) < 2: + print("ERROR: --assert requires at least RELATION and one ARG", file=sys.stderr) + sys.exit(1) + fact = kb.assert_fact(terms[0], *terms[1:], source="cli") + print(f"Asserted: {fact}") + + if args.retract_stmt: + terms = _parse_terms(args.retract_stmt) + if len(terms) < 2: + print("ERROR: --retract requires at least RELATION and one ARG", file=sys.stderr) + sys.exit(1) + n = kb.retract_fact(terms[0], *terms[1:]) + print(f"Retracted {n} fact(s).") + + if args.query_stmt: + terms = _parse_terms(args.query_stmt) + if len(terms) < 2: + print("ERROR: --query requires at least RELATION and one ARG", file=sys.stderr) + sys.exit(1) + results = kb.query(terms[0], *terms[1:]) + if not results: + print("No results.") + else: + for i, b in enumerate(results, 1): + if b: + bindings_str = ", ".join(f"{k}={v}" for k, v in b.items()) + print(f" [{i}] {bindings_str}") + else: + print(f" [{i}] (ground match)") + + if args.dump: + count = kb.fact_count() + print(f"Knowledge Base — {count} fact(s):") + kb.dump(relation_filter=args.relation) + + if not any([args.assert_stmt, args.retract_stmt, args.query_stmt, args.dump]): + parser.print_help() + + +if __name__ == "__main__": + main() \ No newline at end of file