#!/usr/bin/env python3 """knowledge_base.py - GOFAI symbolic knowledge base for the Timmy Foundation fleet. A classical AI knowledge representation system: stores facts as ground atoms, supports first-order-logic-style queries, and maintains a provenance chain so every belief can be traced back to its source. No neural nets, no embeddings - just structured symbolic reasoning over a typed fact store. Usage: kb = KnowledgeBase() kb.assert_fact('agent', 'online', 'timmy') kb.assert_fact('task', 'assigned_to', 'task-42', 'timmy') results = kb.query('task', 'assigned_to', '?x', 'timmy') # results -> [{'?x': 'task-42'}] CLI: python knowledge_base.py --assert "agent online hermes" python knowledge_base.py --query "agent online ?who" python knowledge_base.py --dump """ from __future__ import annotations import argparse import json import os import sys import time from dataclasses import dataclass, field from pathlib import Path from typing import Dict, Iterator, List, Optional, Tuple # --------------------------------------------------------------------------- # Data model # --------------------------------------------------------------------------- VAR_PREFIX = "?" def is_var(term: str) -> bool: """Return True if *term* is a logic variable (starts with '?').""" return term.startswith(VAR_PREFIX) @dataclass(frozen=True) class Fact: """An immutable ground atom: (relation, *args).""" relation: str args: Tuple[str, ...] source: str = "user" timestamp: float = field(default_factory=time.time) def __str__(self) -> str: args_str = " ".join(self.args) return f"({self.relation} {args_str})" Bindings = Dict[str, str] # --------------------------------------------------------------------------- # Unification # --------------------------------------------------------------------------- def unify_term(pattern: str, value: str, bindings: Bindings) -> Optional[Bindings]: """Unify a single pattern term against a ground value. Returns updated bindings on success, or None on failure. """ if is_var(pattern): if pattern in bindings: return bindings if bindings[pattern] == value else None return {**bindings, pattern: value} return bindings if pattern == value else None def unify_fact( pattern: Tuple[str, ...], fact_args: Tuple[str, ...], bindings: Bindings ) -> Optional[Bindings]: """Unify a full argument tuple, returning final bindings or None.""" if len(pattern) != len(fact_args): return None b = bindings for p, v in zip(pattern, fact_args): b = unify_term(p, v, b) if b is None: return None return b # --------------------------------------------------------------------------- # Knowledge Base # --------------------------------------------------------------------------- class KnowledgeBase: """In-memory symbolic knowledge base with optional JSON persistence.""" def __init__(self, persist_path: Optional[Path] = None) -> None: self._facts: List[Fact] = [] self._persist_path = persist_path if persist_path and persist_path.exists(): self._load(persist_path) # ------------------------------------------------------------------ # Fact management # ------------------------------------------------------------------ def assert_fact( self, relation: str, *args: str, source: str = "user" ) -> Fact: """Add a ground fact to the knowledge base. Idempotent: duplicate (relation, args) pairs are not added twice. """ f = Fact(relation=relation, args=tuple(args), source=source, timestamp=time.time()) for existing in self._facts: if existing.relation == f.relation and existing.args == f.args: return existing # already known self._facts.append(f) if self._persist_path: self._save(self._persist_path) return f def retract_fact(self, relation: str, *args: str) -> int: """Remove all facts matching (relation, *args). Returns count removed.""" before = len(self._facts) self._facts = [ f for f in self._facts if not (f.relation == relation and f.args == tuple(args)) ] removed = before - len(self._facts) if removed and self._persist_path: self._save(self._persist_path) return removed # ------------------------------------------------------------------ # Query # ------------------------------------------------------------------ def query( self, relation: str, *pattern_args: str, source_filter: Optional[str] = None ) -> List[Bindings]: """Return all binding dictionaries satisfying the query pattern. Variables in *pattern_args* start with '?'. Ground terms must match exactly. An empty binding dict means the fact matched with no variables to bind. Args: relation: The relation name to match. *pattern_args: Mixed ground/variable argument tuple. source_filter: Optional provenance filter (e.g. 'scheduler'). Returns: List of binding dicts, one per matching fact. """ results: List[Bindings] = [] for fact in self._facts: if fact.relation != relation: continue if source_filter and fact.source != source_filter: continue b = unify_fact(tuple(pattern_args), fact.args, {}) if b is not None: results.append(b) return results def query_one( self, relation: str, *pattern_args: str ) -> Optional[Bindings]: """Return the first matching binding dict or None.""" for b in self.query(relation, *pattern_args): return b return None def facts_for(self, relation: str) -> Iterator[Fact]: """Iterate over all facts with the given relation.""" for f in self._facts: if f.relation == relation: yield f # ------------------------------------------------------------------ # Bulk operations # ------------------------------------------------------------------ def all_facts(self) -> List[Fact]: """Return a snapshot of all stored facts.""" return list(self._facts) def fact_count(self) -> int: return len(self._facts) def clear(self) -> None: """Remove all facts from memory (does not touch disk).""" self._facts.clear() # ------------------------------------------------------------------ # Persistence # ------------------------------------------------------------------ def _save(self, path: Path) -> None: records = [ { "relation": f.relation, "args": list(f.args), "source": f.source, "timestamp": f.timestamp, } for f in self._facts ] path.write_text(json.dumps(records, indent=2)) def _load(self, path: Path) -> None: try: records = json.loads(path.read_text()) for r in records: self._facts.append( Fact( relation=r["relation"], args=tuple(r["args"]), source=r.get("source", "persisted"), timestamp=r.get("timestamp", 0.0), ) ) except (json.JSONDecodeError, KeyError) as exc: print(f"[kb] Warning: could not load {path}: {exc}", file=sys.stderr) def save_to(self, path: Path) -> None: """Explicitly save to a given path.""" self._save(path) # ------------------------------------------------------------------ # Debug / display # ------------------------------------------------------------------ def dump(self, relation_filter: Optional[str] = None) -> None: """Print all (or filtered) facts to stdout.""" for f in self._facts: if relation_filter and f.relation != relation_filter: continue print(f" {f} [source={f.source}]") # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def _parse_terms(raw: str) -> List[str]: """Split a whitespace-separated string into terms.""" return raw.strip().split() def main() -> None: parser = argparse.ArgumentParser( description="GOFAI symbolic knowledge base CLI" ) parser.add_argument( "--db", default="kb.json", help="Path to persistent JSON store (default: kb.json)", ) parser.add_argument( "--assert", dest="assert_stmt", metavar="RELATION ARG...", help='Assert a fact, e.g. --assert "agent online timmy"', ) parser.add_argument( "--retract", dest="retract_stmt", metavar="RELATION ARG...", help='Retract a fact, e.g. --retract "agent online timmy"', ) parser.add_argument( "--query", dest="query_stmt", metavar="RELATION ARG...", help='Query the KB, e.g. --query "agent online ?who"', ) parser.add_argument( "--dump", action="store_true", help="Dump all facts", ) parser.add_argument( "--relation", help="Filter --dump to a specific relation", ) args = parser.parse_args() db_path = Path(args.db) kb = KnowledgeBase(persist_path=db_path) if args.assert_stmt: terms = _parse_terms(args.assert_stmt) if len(terms) < 2: print("ERROR: --assert requires at least RELATION and one ARG", file=sys.stderr) sys.exit(1) fact = kb.assert_fact(terms[0], *terms[1:], source="cli") print(f"Asserted: {fact}") if args.retract_stmt: terms = _parse_terms(args.retract_stmt) if len(terms) < 2: print("ERROR: --retract requires at least RELATION and one ARG", file=sys.stderr) sys.exit(1) n = kb.retract_fact(terms[0], *terms[1:]) print(f"Retracted {n} fact(s).") if args.query_stmt: terms = _parse_terms(args.query_stmt) if len(terms) < 2: print("ERROR: --query requires at least RELATION and one ARG", file=sys.stderr) sys.exit(1) results = kb.query(terms[0], *terms[1:]) if not results: print("No results.") else: for i, b in enumerate(results, 1): if b: bindings_str = ", ".join(f"{k}={v}" for k, v in b.items()) print(f" [{i}] {bindings_str}") else: print(f" [{i}] (ground match)") if args.dump: count = kb.fact_count() print(f"Knowledge Base — {count} fact(s):") kb.dump(relation_filter=args.relation) if not any([args.assert_stmt, args.retract_stmt, args.query_stmt, args.dump]): parser.print_help() if __name__ == "__main__": main()