Compare commits
1 Commits
feat/gofai
...
feat/gofai
| Author | SHA1 | Date | |
|---|---|---|---|
| d43deb1d79 |
341
scripts/knowledge_base.py
Normal file
341
scripts/knowledge_base.py
Normal file
@@ -0,0 +1,341 @@
|
||||
#!/usr/bin/env python3
|
||||
"""knowledge_base.py - GOFAI symbolic knowledge base for the Timmy Foundation fleet.
|
||||
|
||||
A classical AI knowledge representation system: stores facts as ground atoms,
|
||||
supports first-order-logic-style queries, and maintains a provenance chain so
|
||||
every belief can be traced back to its source. No neural nets, no embeddings -
|
||||
just structured symbolic reasoning over a typed fact store.
|
||||
|
||||
Usage:
|
||||
kb = KnowledgeBase()
|
||||
kb.assert_fact('agent', 'online', 'timmy')
|
||||
kb.assert_fact('task', 'assigned_to', 'task-42', 'timmy')
|
||||
results = kb.query('task', 'assigned_to', '?x', 'timmy')
|
||||
# results -> [{'?x': 'task-42'}]
|
||||
|
||||
CLI:
|
||||
python knowledge_base.py --assert "agent online hermes"
|
||||
python knowledge_base.py --query "agent online ?who"
|
||||
python knowledge_base.py --dump
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Dict, Iterator, List, Optional, Tuple
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data model
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
VAR_PREFIX = "?"
|
||||
|
||||
|
||||
def is_var(term: str) -> bool:
|
||||
"""Return True if *term* is a logic variable (starts with '?')."""
|
||||
return term.startswith(VAR_PREFIX)
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class Fact:
|
||||
"""An immutable ground atom: (relation, *args)."""
|
||||
|
||||
relation: str
|
||||
args: Tuple[str, ...]
|
||||
source: str = "user"
|
||||
timestamp: float = field(default_factory=time.time)
|
||||
|
||||
def __str__(self) -> str:
|
||||
args_str = " ".join(self.args)
|
||||
return f"({self.relation} {args_str})"
|
||||
|
||||
|
||||
Bindings = Dict[str, str]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Unification
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def unify_term(pattern: str, value: str, bindings: Bindings) -> Optional[Bindings]:
|
||||
"""Unify a single pattern term against a ground value.
|
||||
|
||||
Returns updated bindings on success, or None on failure.
|
||||
"""
|
||||
if is_var(pattern):
|
||||
if pattern in bindings:
|
||||
return bindings if bindings[pattern] == value else None
|
||||
return {**bindings, pattern: value}
|
||||
return bindings if pattern == value else None
|
||||
|
||||
|
||||
def unify_fact(
|
||||
pattern: Tuple[str, ...], fact_args: Tuple[str, ...], bindings: Bindings
|
||||
) -> Optional[Bindings]:
|
||||
"""Unify a full argument tuple, returning final bindings or None."""
|
||||
if len(pattern) != len(fact_args):
|
||||
return None
|
||||
b = bindings
|
||||
for p, v in zip(pattern, fact_args):
|
||||
b = unify_term(p, v, b)
|
||||
if b is None:
|
||||
return None
|
||||
return b
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Knowledge Base
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class KnowledgeBase:
|
||||
"""In-memory symbolic knowledge base with optional JSON persistence."""
|
||||
|
||||
def __init__(self, persist_path: Optional[Path] = None) -> None:
|
||||
self._facts: List[Fact] = []
|
||||
self._persist_path = persist_path
|
||||
if persist_path and persist_path.exists():
|
||||
self._load(persist_path)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Fact management
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def assert_fact(
|
||||
self, relation: str, *args: str, source: str = "user"
|
||||
) -> Fact:
|
||||
"""Add a ground fact to the knowledge base.
|
||||
|
||||
Idempotent: duplicate (relation, args) pairs are not added twice.
|
||||
"""
|
||||
f = Fact(relation=relation, args=tuple(args), source=source, timestamp=time.time())
|
||||
for existing in self._facts:
|
||||
if existing.relation == f.relation and existing.args == f.args:
|
||||
return existing # already known
|
||||
self._facts.append(f)
|
||||
if self._persist_path:
|
||||
self._save(self._persist_path)
|
||||
return f
|
||||
|
||||
def retract_fact(self, relation: str, *args: str) -> int:
|
||||
"""Remove all facts matching (relation, *args). Returns count removed."""
|
||||
before = len(self._facts)
|
||||
self._facts = [
|
||||
f
|
||||
for f in self._facts
|
||||
if not (f.relation == relation and f.args == tuple(args))
|
||||
]
|
||||
removed = before - len(self._facts)
|
||||
if removed and self._persist_path:
|
||||
self._save(self._persist_path)
|
||||
return removed
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Query
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def query(
|
||||
self, relation: str, *pattern_args: str, source_filter: Optional[str] = None
|
||||
) -> List[Bindings]:
|
||||
"""Return all binding dictionaries satisfying the query pattern.
|
||||
|
||||
Variables in *pattern_args* start with '?'. Ground terms must match
|
||||
exactly. An empty binding dict means the fact matched with no
|
||||
variables to bind.
|
||||
|
||||
Args:
|
||||
relation: The relation name to match.
|
||||
*pattern_args: Mixed ground/variable argument tuple.
|
||||
source_filter: Optional provenance filter (e.g. 'scheduler').
|
||||
|
||||
Returns:
|
||||
List of binding dicts, one per matching fact.
|
||||
"""
|
||||
results: List[Bindings] = []
|
||||
for fact in self._facts:
|
||||
if fact.relation != relation:
|
||||
continue
|
||||
if source_filter and fact.source != source_filter:
|
||||
continue
|
||||
b = unify_fact(tuple(pattern_args), fact.args, {})
|
||||
if b is not None:
|
||||
results.append(b)
|
||||
return results
|
||||
|
||||
def query_one(
|
||||
self, relation: str, *pattern_args: str
|
||||
) -> Optional[Bindings]:
|
||||
"""Return the first matching binding dict or None."""
|
||||
for b in self.query(relation, *pattern_args):
|
||||
return b
|
||||
return None
|
||||
|
||||
def facts_for(self, relation: str) -> Iterator[Fact]:
|
||||
"""Iterate over all facts with the given relation."""
|
||||
for f in self._facts:
|
||||
if f.relation == relation:
|
||||
yield f
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Bulk operations
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def all_facts(self) -> List[Fact]:
|
||||
"""Return a snapshot of all stored facts."""
|
||||
return list(self._facts)
|
||||
|
||||
def fact_count(self) -> int:
|
||||
return len(self._facts)
|
||||
|
||||
def clear(self) -> None:
|
||||
"""Remove all facts from memory (does not touch disk)."""
|
||||
self._facts.clear()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Persistence
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _save(self, path: Path) -> None:
|
||||
records = [
|
||||
{
|
||||
"relation": f.relation,
|
||||
"args": list(f.args),
|
||||
"source": f.source,
|
||||
"timestamp": f.timestamp,
|
||||
}
|
||||
for f in self._facts
|
||||
]
|
||||
path.write_text(json.dumps(records, indent=2))
|
||||
|
||||
def _load(self, path: Path) -> None:
|
||||
try:
|
||||
records = json.loads(path.read_text())
|
||||
for r in records:
|
||||
self._facts.append(
|
||||
Fact(
|
||||
relation=r["relation"],
|
||||
args=tuple(r["args"]),
|
||||
source=r.get("source", "persisted"),
|
||||
timestamp=r.get("timestamp", 0.0),
|
||||
)
|
||||
)
|
||||
except (json.JSONDecodeError, KeyError) as exc:
|
||||
print(f"[kb] Warning: could not load {path}: {exc}", file=sys.stderr)
|
||||
|
||||
def save_to(self, path: Path) -> None:
|
||||
"""Explicitly save to a given path."""
|
||||
self._save(path)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Debug / display
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def dump(self, relation_filter: Optional[str] = None) -> None:
|
||||
"""Print all (or filtered) facts to stdout."""
|
||||
for f in self._facts:
|
||||
if relation_filter and f.relation != relation_filter:
|
||||
continue
|
||||
print(f" {f} [source={f.source}]")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _parse_terms(raw: str) -> List[str]:
|
||||
"""Split a whitespace-separated string into terms."""
|
||||
return raw.strip().split()
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="GOFAI symbolic knowledge base CLI"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--db",
|
||||
default="kb.json",
|
||||
help="Path to persistent JSON store (default: kb.json)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--assert",
|
||||
dest="assert_stmt",
|
||||
metavar="RELATION ARG...",
|
||||
help='Assert a fact, e.g. --assert "agent online timmy"',
|
||||
)
|
||||
parser.add_argument(
|
||||
"--retract",
|
||||
dest="retract_stmt",
|
||||
metavar="RELATION ARG...",
|
||||
help='Retract a fact, e.g. --retract "agent online timmy"',
|
||||
)
|
||||
parser.add_argument(
|
||||
"--query",
|
||||
dest="query_stmt",
|
||||
metavar="RELATION ARG...",
|
||||
help='Query the KB, e.g. --query "agent online ?who"',
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dump",
|
||||
action="store_true",
|
||||
help="Dump all facts",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--relation",
|
||||
help="Filter --dump to a specific relation",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
db_path = Path(args.db)
|
||||
kb = KnowledgeBase(persist_path=db_path)
|
||||
|
||||
if args.assert_stmt:
|
||||
terms = _parse_terms(args.assert_stmt)
|
||||
if len(terms) < 2:
|
||||
print("ERROR: --assert requires at least RELATION and one ARG", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
fact = kb.assert_fact(terms[0], *terms[1:], source="cli")
|
||||
print(f"Asserted: {fact}")
|
||||
|
||||
if args.retract_stmt:
|
||||
terms = _parse_terms(args.retract_stmt)
|
||||
if len(terms) < 2:
|
||||
print("ERROR: --retract requires at least RELATION and one ARG", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
n = kb.retract_fact(terms[0], *terms[1:])
|
||||
print(f"Retracted {n} fact(s).")
|
||||
|
||||
if args.query_stmt:
|
||||
terms = _parse_terms(args.query_stmt)
|
||||
if len(terms) < 2:
|
||||
print("ERROR: --query requires at least RELATION and one ARG", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
results = kb.query(terms[0], *terms[1:])
|
||||
if not results:
|
||||
print("No results.")
|
||||
else:
|
||||
for i, b in enumerate(results, 1):
|
||||
if b:
|
||||
bindings_str = ", ".join(f"{k}={v}" for k, v in b.items())
|
||||
print(f" [{i}] {bindings_str}")
|
||||
else:
|
||||
print(f" [{i}] (ground match)")
|
||||
|
||||
if args.dump:
|
||||
count = kb.fact_count()
|
||||
print(f"Knowledge Base — {count} fact(s):")
|
||||
kb.dump(relation_filter=args.relation)
|
||||
|
||||
if not any([args.assert_stmt, args.retract_stmt, args.query_stmt, args.dump]):
|
||||
parser.print_help()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user