Files
timmy-config/scripts/knowledge_base.py

388 lines
13 KiB
Python

#!/usr/bin/env python3
"""knowledge_base.py - GOFAI symbolic knowledge base for the Timmy Foundation fleet.
A classical AI knowledge representation system: stores facts as ground atoms,
supports first-order-logic-style queries, and maintains a provenance chain so
every belief can be traced back to its source. No neural nets, no embeddings -
just structured symbolic reasoning over a typed fact store.
Usage:
    kb = KnowledgeBase()
    kb.assert_fact('agent', 'online', 'timmy')
    kb.assert_fact('task', 'assigned_to', 'task-42', 'timmy')
    results = kb.query('task', 'assigned_to', '?x', 'timmy')
    # results -> [{'?x': 'task-42'}]

CLI:
    python knowledge_base.py --assert "agent online hermes"
    python knowledge_base.py --query "agent online ?who"
    python knowledge_base.py --dump
"""
from __future__ import annotations
import argparse
import ast
import json
import os
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Tuple
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
VAR_PREFIX = "?"


def is_var(term: str) -> bool:
    """Tell whether *term* denotes a logic variable.

    A term counts as a variable exactly when it begins with ``VAR_PREFIX``;
    every other term (including the empty string) is treated as ground.
    """
    prefix_len = len(VAR_PREFIX)
    # Comparing the leading slice is equivalent to a prefix test on a string.
    return term[:prefix_len] == VAR_PREFIX
@dataclass(frozen=True)
class Fact:
    """A stored atom ``(relation, *args)`` plus where and when it was recorded.

    The dataclass is frozen, so instances cannot be modified after creation.
    """

    # Name of the relation (predicate) this fact belongs to.
    relation: str
    # Argument terms of the atom, in order.
    args: Tuple[str, ...]
    # Provenance label; "user" unless the creator says otherwise.
    source: str = "user"
    # Creation time; filled from time.time() when not supplied.
    timestamp: float = field(default_factory=time.time)

    def __str__(self) -> str:
        """Render the fact as ``(relation arg1 arg2 ...)``."""
        joined_args = " ".join(self.args)
        return "({} {})".format(self.relation, joined_args)
# Maps a variable term (e.g. "?x") to the value it has been bound to
# while unifying a query pattern against a stored fact.
Bindings = Dict[str, str]
# ---------------------------------------------------------------------------
# Unification
# ---------------------------------------------------------------------------
def unify_term(pattern: str, value: str, bindings: Bindings) -> Optional[Bindings]:
    """Match one pattern term against one ground value.

    Args:
        pattern: A ground term or a variable (see ``is_var``).
        value: The ground value taken from a stored fact.
        bindings: Variable bindings accumulated so far; never mutated.

    Returns:
        The bindings to continue with on success (a new dict when a variable
        gets bound for the first time, otherwise the dict passed in), or
        None when the terms cannot be unified.
    """
    if not is_var(pattern):
        # Ground term: only an exact match succeeds.
        return bindings if pattern == value else None

    if pattern not in bindings:
        # First sighting of this variable: bind it in a fresh copy.
        extended = dict(bindings)
        extended[pattern] = value
        return extended

    # Variable already bound: it must agree with the earlier binding.
    if bindings[pattern] == value:
        return bindings
    return None
def unify_fact(
    pattern: Tuple[str, ...], fact_args: Tuple[str, ...], bindings: Bindings
) -> Optional[Bindings]:
    """Unify a whole pattern tuple with a fact's argument tuple.

    Terms are unified left to right with ``unify_term``, threading the
    bindings through. Tuples of different length never unify.

    Returns:
        The final bindings when every position unifies, otherwise None.
    """
    if len(pattern) != len(fact_args):
        return None

    current: Optional[Bindings] = bindings
    for term, ground in zip(pattern, fact_args):
        current = unify_term(term, ground, current)
        if current is None:
            # One mismatch is enough to reject the whole fact.
            break
    return current
# ---------------------------------------------------------------------------
# Knowledge Base
# ---------------------------------------------------------------------------
class KnowledgeBase:
    """In-memory symbolic knowledge base with optional JSON persistence.

    Facts are kept in insertion order in a plain list. When a persist path is
    configured, the complete fact list is rewritten to that file whenever
    ``assert_fact``, ``retract_fact`` or ``ingest_python_file`` changes it.
    """

    def __init__(self, persist_path: Optional[Path] = None) -> None:
        """Create a knowledge base, loading *persist_path* if it exists.

        Args:
            persist_path: JSON file used for persistence, or None for a purely
                in-memory store. A plain string path is accepted as well.
        """
        self._facts: List[Fact] = []
        # Normalise to Path so a caller passing a str does not fail on .exists().
        self._persist_path: Optional[Path] = (
            Path(persist_path) if persist_path else None
        )
        if self._persist_path is not None and self._persist_path.exists():
            self._load(self._persist_path)

    # ------------------------------------------------------------------
    # Fact management
    # ------------------------------------------------------------------
    def assert_fact(self, relation: str, *args: str, source: str = "user") -> Fact:
        """Add a fact to the knowledge base.

        Idempotent: when a fact with the same (relation, args) is already
        stored, that existing fact is returned unchanged (its original source
        and timestamp are kept) and nothing is written to disk.

        Args:
            relation: Relation name of the fact.
            *args: Argument terms of the fact.
            source: Provenance label recorded on a newly created fact.

        Returns:
            The stored fact - either the new one or the pre-existing match.
        """
        for existing in self._facts:
            if existing.relation == relation and existing.args == args:
                return existing  # already known
        fact = Fact(relation=relation, args=args, source=source, timestamp=time.time())
        self._facts.append(fact)
        if self._persist_path:
            self._save(self._persist_path)
        return fact

    def retract_fact(self, relation: str, *args: str) -> int:
        """Remove every fact equal to (relation, *args).

        Matching is exact; variables are not interpreted here. The store is
        persisted only when something was actually removed.

        Returns:
            The number of facts removed.
        """
        before = len(self._facts)
        self._facts = [
            f for f in self._facts if not (f.relation == relation and f.args == args)
        ]
        removed = before - len(self._facts)
        if removed and self._persist_path:
            self._save(self._persist_path)
        return removed

    def ingest_python_file(
        self, path: Path, *, module_name: Optional[str] = None, source: str = "ast"
    ) -> List[Fact]:
        """Parse a Python file with ``ast`` and assert symbolic structure facts.

        Only top-level statements of the module are inspected (plus the
        methods directly inside top-level classes). Asserted relations:
        ``imports``, ``defines_function``, ``defines_class``,
        ``defines_method`` and ``defines_constant`` (upper-case names only).

        Args:
            path: The Python source file to analyse.
            module_name: Name used as the subject of module-level facts;
                defaults to the file's stem.
            source: Provenance prefix; facts are tagged ``"<source>:<file name>"``.

        Returns:
            One fact per discovered item, in discovery order. Items that were
            already known yield the previously stored fact.

        Raises:
            OSError: If the file cannot be read.
            SyntaxError: If the file is not valid Python.
        """
        path = Path(path)
        # Hand raw bytes to the parser so the source encoding is determined
        # the way Python itself does it (UTF-8 default, BOM, coding cookie)
        # rather than by the platform's locale encoding.
        tree = ast.parse(path.read_bytes(), filename=str(path))
        module = module_name or path.stem
        fact_source = f"{source}:{path.name}"

        count_before = len(self._facts)
        # Suspend per-fact persistence: rewriting the whole store once per
        # fact is wasteful, so save a single time after ingestion instead.
        persist_path, self._persist_path = self._persist_path, None
        try:
            return [
                self.assert_fact(relation, subject, obj, source=fact_source)
                for relation, subject, obj in self._python_structure_facts(tree, module)
            ]
        finally:
            self._persist_path = persist_path
            if persist_path and len(self._facts) != count_before:
                self._save(persist_path)

    @staticmethod
    def _python_structure_facts(
        tree: ast.Module, module: str
    ) -> Iterator[Tuple[str, str, str]]:
        """Yield ``(relation, subject, object)`` triples for *tree*'s top level."""
        function_nodes = (ast.FunctionDef, ast.AsyncFunctionDef)
        for node in tree.body:
            if isinstance(node, ast.Import):
                for alias in node.names:
                    yield ("imports", module, alias.name)
            elif isinstance(node, ast.ImportFrom):
                # Keep the leading dots so relative imports ("from . import x",
                # "from ..pkg import y") stay distinguishable from absolute ones.
                origin = "." * (node.level or 0) + (node.module or "")
                joiner = "." if node.module else ""
                for alias in node.names:
                    yield ("imports", module, f"{origin}{joiner}{alias.name}")
            elif isinstance(node, function_nodes):
                yield ("defines_function", module, node.name)
            elif isinstance(node, ast.ClassDef):
                yield ("defines_class", module, node.name)
                for child in node.body:
                    if isinstance(child, function_nodes):
                        yield ("defines_method", node.name, child.name)
            elif isinstance(node, ast.Assign):
                for target in node.targets:
                    if isinstance(target, ast.Name) and target.id.isupper():
                        yield ("defines_constant", module, target.id)
            elif (
                isinstance(node, ast.AnnAssign)
                and isinstance(node.target, ast.Name)
                and node.target.id.isupper()
            ):
                yield ("defines_constant", module, node.target.id)

    # ------------------------------------------------------------------
    # Query
    # ------------------------------------------------------------------
    def query(
        self, relation: str, *pattern_args: str, source_filter: Optional[str] = None
    ) -> List[Bindings]:
        """Return all binding dictionaries satisfying the query pattern.

        Variables in *pattern_args* start with '?'. Ground terms must match
        exactly. An empty binding dict means the fact matched with no
        variables to bind.

        Args:
            relation: The relation name to match.
            *pattern_args: Mixed ground/variable argument tuple.
            source_filter: Optional provenance filter (e.g. 'scheduler');
                an empty value means no filtering.

        Returns:
            List of binding dicts, one per matching fact, in storage order.
        """
        matches: List[Bindings] = []
        for fact in self._facts:
            if fact.relation != relation:
                continue
            if source_filter and fact.source != source_filter:
                continue
            bound = unify_fact(pattern_args, fact.args, {})
            if bound is not None:
                matches.append(bound)
        return matches

    def query_one(
        self, relation: str, *pattern_args: str, source_filter: Optional[str] = None
    ) -> Optional[Bindings]:
        """Return the first matching binding dict, or None when nothing matches.

        Accepts the same arguments as ``query``, including *source_filter*.
        """
        matches = self.query(relation, *pattern_args, source_filter=source_filter)
        return matches[0] if matches else None

    def facts_for(self, relation: str) -> Iterator[Fact]:
        """Iterate over all facts with the given relation, in storage order."""
        for fact in self._facts:
            if fact.relation == relation:
                yield fact

    # ------------------------------------------------------------------
    # Bulk operations
    # ------------------------------------------------------------------
    def all_facts(self) -> List[Fact]:
        """Return a snapshot (shallow copy) of all stored facts."""
        return list(self._facts)

    def fact_count(self) -> int:
        """Return the number of facts currently stored."""
        return len(self._facts)

    def clear(self) -> None:
        """Remove all facts from memory (does not touch disk)."""
        self._facts.clear()

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------
    def _save(self, path: Path) -> None:
        """Write every stored fact to *path* as a JSON array of records."""
        records = [
            {
                "relation": f.relation,
                "args": list(f.args),
                "source": f.source,
                "timestamp": f.timestamp,
            }
            for f in self._facts
        ]
        path.write_text(json.dumps(records, indent=2), encoding="utf-8")

    def _load(self, path: Path) -> None:
        """Append the facts stored in *path* to the in-memory list.

        Loading is best-effort: an unreadable file, invalid JSON or a record
        of the wrong shape produces a warning on stderr instead of an
        exception. Records read before the problem was hit are kept.
        """
        try:
            records = json.loads(path.read_text(encoding="utf-8"))
            for record in records:
                # Pull the required fields out first so a malformed record is
                # rejected before any fact object is built.
                relation = record["relation"]
                args = tuple(record["args"])
                self._facts.append(
                    Fact(
                        relation=relation,
                        args=args,
                        source=record.get("source", "persisted"),
                        timestamp=record.get("timestamp", 0.0),
                    )
                )
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
        # TypeError/AttributeError cover valid JSON that is not a list of
        # objects (e.g. a top-level dict or number).
        except (OSError, ValueError, KeyError, TypeError, AttributeError) as exc:
            print(f"[kb] Warning: could not load {path}: {exc}", file=sys.stderr)

    def save_to(self, path: Path) -> None:
        """Explicitly save to a given path."""
        self._save(Path(path))

    # ------------------------------------------------------------------
    # Debug / display
    # ------------------------------------------------------------------
    def dump(self, relation_filter: Optional[str] = None) -> None:
        """Print all facts (or only those of *relation_filter*) to stdout."""
        for fact in self._facts:
            if relation_filter and fact.relation != relation_filter:
                continue
            print(f" {fact} [source={fact.source}]")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _parse_terms(raw: str) -> List[str]:
"""Split a whitespace-separated string into terms."""
return raw.strip().split()
def _terms_or_exit(flag: str, raw: str) -> List[str]:
    """Split *raw* into terms; exit with status 1 unless it has RELATION and an ARG."""
    terms = _parse_terms(raw)
    if len(terms) < 2:
        print(f"ERROR: {flag} requires at least RELATION and one ARG", file=sys.stderr)
        sys.exit(1)
    return terms


def main() -> None:
    """Command-line entry point for the knowledge base.

    Opens (or creates on first write) the JSON store given by ``--db`` and
    then runs the requested actions in a fixed order: assert, ingest-python,
    retract, query, dump. When no action is requested the help text is
    printed. Invalid input exits with status 1 after an ``ERROR:`` message
    on stderr.
    """
    parser = argparse.ArgumentParser(
        description="GOFAI symbolic knowledge base CLI"
    )
    parser.add_argument(
        "--db",
        default="kb.json",
        help="Path to persistent JSON store (default: kb.json)",
    )
    parser.add_argument(
        "--assert",
        dest="assert_stmt",
        metavar="RELATION ARG...",
        help='Assert a fact, e.g. --assert "agent online timmy"',
    )
    parser.add_argument(
        "--retract",
        dest="retract_stmt",
        metavar="RELATION ARG...",
        help='Retract a fact, e.g. --retract "agent online timmy"',
    )
    parser.add_argument(
        "--query",
        dest="query_stmt",
        metavar="RELATION ARG...",
        help='Query the KB, e.g. --query "agent online ?who"',
    )
    parser.add_argument(
        "--dump",
        action="store_true",
        help="Dump all facts",
    )
    parser.add_argument(
        "--ingest-python",
        dest="ingest_python",
        type=Path,
        help="Parse a Python file with AST and assert symbolic structure facts",
    )
    parser.add_argument(
        "--relation",
        help="Filter --dump to a specific relation",
    )
    args = parser.parse_args()

    kb = KnowledgeBase(persist_path=Path(args.db))

    if args.assert_stmt:
        terms = _terms_or_exit("--assert", args.assert_stmt)
        fact = kb.assert_fact(terms[0], *terms[1:], source="cli")
        print(f"Asserted: {fact}")

    if args.ingest_python is not None:
        try:
            added = kb.ingest_python_file(args.ingest_python, source="cli-ast")
        except (OSError, SyntaxError, ValueError) as exc:
            # Report unreadable or unparsable input like the other CLI
            # errors instead of dumping a traceback.
            print(
                f"ERROR: could not ingest {args.ingest_python}: {exc}",
                file=sys.stderr,
            )
            sys.exit(1)
        print(f"Ingested {len(added)} AST fact(s) from {args.ingest_python}")

    if args.retract_stmt:
        terms = _terms_or_exit("--retract", args.retract_stmt)
        n = kb.retract_fact(terms[0], *terms[1:])
        print(f"Retracted {n} fact(s).")

    if args.query_stmt:
        terms = _terms_or_exit("--query", args.query_stmt)
        results = kb.query(terms[0], *terms[1:])
        if not results:
            print("No results.")
        else:
            for i, b in enumerate(results, 1):
                if b:
                    bindings_str = ", ".join(f"{k}={v}" for k, v in b.items())
                    print(f" [{i}] {bindings_str}")
                else:
                    print(f" [{i}] (ground match)")

    if args.dump:
        count = kb.fact_count()
        print(f"Knowledge Base — {count} fact(s):")
        kb.dump(relation_filter=args.relation)

    # Show the help only when no action at all was requested. --ingest-python
    # counts as an action, so it must be part of this check.
    requested = (
        args.assert_stmt,
        args.ingest_python,
        args.retract_stmt,
        args.query_stmt,
        args.dump,
    )
    if not any(requested):
        parser.print_help()


if __name__ == "__main__":
    main()