Some checks failed
Validate Config / Playbook Schema Validation (pull_request) Successful in 9s
PR Checklist / pr-checklist (pull_request) Failing after 1m12s
Validate Config / JSON Validate (pull_request) Successful in 6s
Smoke Test / smoke (pull_request) Failing after 6s
Validate Config / YAML Lint (pull_request) Failing after 5s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 7s
Validate Config / Shell Script Lint (pull_request) Successful in 14s
Validate Config / Cron Syntax Check (pull_request) Successful in 5s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 5s
Architecture Lint / Lint Repository (pull_request) Failing after 8s
Architecture Lint / Linter Tests (pull_request) Successful in 9s
341 lines
11 KiB
Python
341 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
"""knowledge_base.py - GOFAI symbolic knowledge base for the Timmy Foundation fleet.
|
|
|
|
A classical AI knowledge representation system: stores facts as ground atoms,
|
|
supports first-order-logic-style queries, and maintains a provenance chain so
|
|
every belief can be traced back to its source. No neural nets, no embeddings -
|
|
just structured symbolic reasoning over a typed fact store.
|
|
|
|
Usage:
|
|
kb = KnowledgeBase()
|
|
kb.assert_fact('agent', 'online', 'timmy')
|
|
kb.assert_fact('task', 'assigned_to', 'task-42', 'timmy')
|
|
results = kb.query('task', 'assigned_to', '?x', 'timmy')
|
|
# results -> [{'?x': 'task-42'}]
|
|
|
|
CLI:
|
|
python knowledge_base.py --assert "agent online hermes"
|
|
python knowledge_base.py --query "agent online ?who"
|
|
python knowledge_base.py --dump
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Dict, Iterator, List, Optional, Tuple
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data model
|
|
# ---------------------------------------------------------------------------
|
|
|
|
VAR_PREFIX = "?"
|
|
|
|
|
|
def is_var(term: str) -> bool:
|
|
"""Return True if *term* is a logic variable (starts with '?')."""
|
|
return term.startswith(VAR_PREFIX)
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class Fact:
|
|
"""An immutable ground atom: (relation, *args)."""
|
|
|
|
relation: str
|
|
args: Tuple[str, ...]
|
|
source: str = "user"
|
|
timestamp: float = field(default_factory=time.time)
|
|
|
|
def __str__(self) -> str:
|
|
args_str = " ".join(self.args)
|
|
return f"({self.relation} {args_str})"
|
|
|
|
|
|
Bindings = Dict[str, str]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Unification
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def unify_term(pattern: str, value: str, bindings: Bindings) -> Optional[Bindings]:
|
|
"""Unify a single pattern term against a ground value.
|
|
|
|
Returns updated bindings on success, or None on failure.
|
|
"""
|
|
if is_var(pattern):
|
|
if pattern in bindings:
|
|
return bindings if bindings[pattern] == value else None
|
|
return {**bindings, pattern: value}
|
|
return bindings if pattern == value else None
|
|
|
|
|
|
def unify_fact(
|
|
pattern: Tuple[str, ...], fact_args: Tuple[str, ...], bindings: Bindings
|
|
) -> Optional[Bindings]:
|
|
"""Unify a full argument tuple, returning final bindings or None."""
|
|
if len(pattern) != len(fact_args):
|
|
return None
|
|
b = bindings
|
|
for p, v in zip(pattern, fact_args):
|
|
b = unify_term(p, v, b)
|
|
if b is None:
|
|
return None
|
|
return b
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Knowledge Base
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class KnowledgeBase:
|
|
"""In-memory symbolic knowledge base with optional JSON persistence."""
|
|
|
|
def __init__(self, persist_path: Optional[Path] = None) -> None:
|
|
self._facts: List[Fact] = []
|
|
self._persist_path = persist_path
|
|
if persist_path and persist_path.exists():
|
|
self._load(persist_path)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Fact management
|
|
# ------------------------------------------------------------------
|
|
|
|
def assert_fact(
|
|
self, relation: str, *args: str, source: str = "user"
|
|
) -> Fact:
|
|
"""Add a ground fact to the knowledge base.
|
|
|
|
Idempotent: duplicate (relation, args) pairs are not added twice.
|
|
"""
|
|
f = Fact(relation=relation, args=tuple(args), source=source, timestamp=time.time())
|
|
for existing in self._facts:
|
|
if existing.relation == f.relation and existing.args == f.args:
|
|
return existing # already known
|
|
self._facts.append(f)
|
|
if self._persist_path:
|
|
self._save(self._persist_path)
|
|
return f
|
|
|
|
def retract_fact(self, relation: str, *args: str) -> int:
|
|
"""Remove all facts matching (relation, *args). Returns count removed."""
|
|
before = len(self._facts)
|
|
self._facts = [
|
|
f
|
|
for f in self._facts
|
|
if not (f.relation == relation and f.args == tuple(args))
|
|
]
|
|
removed = before - len(self._facts)
|
|
if removed and self._persist_path:
|
|
self._save(self._persist_path)
|
|
return removed
|
|
|
|
# ------------------------------------------------------------------
|
|
# Query
|
|
# ------------------------------------------------------------------
|
|
|
|
def query(
|
|
self, relation: str, *pattern_args: str, source_filter: Optional[str] = None
|
|
) -> List[Bindings]:
|
|
"""Return all binding dictionaries satisfying the query pattern.
|
|
|
|
Variables in *pattern_args* start with '?'. Ground terms must match
|
|
exactly. An empty binding dict means the fact matched with no
|
|
variables to bind.
|
|
|
|
Args:
|
|
relation: The relation name to match.
|
|
*pattern_args: Mixed ground/variable argument tuple.
|
|
source_filter: Optional provenance filter (e.g. 'scheduler').
|
|
|
|
Returns:
|
|
List of binding dicts, one per matching fact.
|
|
"""
|
|
results: List[Bindings] = []
|
|
for fact in self._facts:
|
|
if fact.relation != relation:
|
|
continue
|
|
if source_filter and fact.source != source_filter:
|
|
continue
|
|
b = unify_fact(tuple(pattern_args), fact.args, {})
|
|
if b is not None:
|
|
results.append(b)
|
|
return results
|
|
|
|
def query_one(
|
|
self, relation: str, *pattern_args: str
|
|
) -> Optional[Bindings]:
|
|
"""Return the first matching binding dict or None."""
|
|
for b in self.query(relation, *pattern_args):
|
|
return b
|
|
return None
|
|
|
|
def facts_for(self, relation: str) -> Iterator[Fact]:
|
|
"""Iterate over all facts with the given relation."""
|
|
for f in self._facts:
|
|
if f.relation == relation:
|
|
yield f
|
|
|
|
# ------------------------------------------------------------------
|
|
# Bulk operations
|
|
# ------------------------------------------------------------------
|
|
|
|
def all_facts(self) -> List[Fact]:
|
|
"""Return a snapshot of all stored facts."""
|
|
return list(self._facts)
|
|
|
|
def fact_count(self) -> int:
|
|
return len(self._facts)
|
|
|
|
def clear(self) -> None:
|
|
"""Remove all facts from memory (does not touch disk)."""
|
|
self._facts.clear()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Persistence
|
|
# ------------------------------------------------------------------
|
|
|
|
def _save(self, path: Path) -> None:
|
|
records = [
|
|
{
|
|
"relation": f.relation,
|
|
"args": list(f.args),
|
|
"source": f.source,
|
|
"timestamp": f.timestamp,
|
|
}
|
|
for f in self._facts
|
|
]
|
|
path.write_text(json.dumps(records, indent=2))
|
|
|
|
def _load(self, path: Path) -> None:
|
|
try:
|
|
records = json.loads(path.read_text())
|
|
for r in records:
|
|
self._facts.append(
|
|
Fact(
|
|
relation=r["relation"],
|
|
args=tuple(r["args"]),
|
|
source=r.get("source", "persisted"),
|
|
timestamp=r.get("timestamp", 0.0),
|
|
)
|
|
)
|
|
except (json.JSONDecodeError, KeyError) as exc:
|
|
print(f"[kb] Warning: could not load {path}: {exc}", file=sys.stderr)
|
|
|
|
def save_to(self, path: Path) -> None:
|
|
"""Explicitly save to a given path."""
|
|
self._save(path)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Debug / display
|
|
# ------------------------------------------------------------------
|
|
|
|
def dump(self, relation_filter: Optional[str] = None) -> None:
|
|
"""Print all (or filtered) facts to stdout."""
|
|
for f in self._facts:
|
|
if relation_filter and f.relation != relation_filter:
|
|
continue
|
|
print(f" {f} [source={f.source}]")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _parse_terms(raw: str) -> List[str]:
|
|
"""Split a whitespace-separated string into terms."""
|
|
return raw.strip().split()
|
|
|
|
|
|
def main() -> None:
|
|
parser = argparse.ArgumentParser(
|
|
description="GOFAI symbolic knowledge base CLI"
|
|
)
|
|
parser.add_argument(
|
|
"--db",
|
|
default="kb.json",
|
|
help="Path to persistent JSON store (default: kb.json)",
|
|
)
|
|
parser.add_argument(
|
|
"--assert",
|
|
dest="assert_stmt",
|
|
metavar="RELATION ARG...",
|
|
help='Assert a fact, e.g. --assert "agent online timmy"',
|
|
)
|
|
parser.add_argument(
|
|
"--retract",
|
|
dest="retract_stmt",
|
|
metavar="RELATION ARG...",
|
|
help='Retract a fact, e.g. --retract "agent online timmy"',
|
|
)
|
|
parser.add_argument(
|
|
"--query",
|
|
dest="query_stmt",
|
|
metavar="RELATION ARG...",
|
|
help='Query the KB, e.g. --query "agent online ?who"',
|
|
)
|
|
parser.add_argument(
|
|
"--dump",
|
|
action="store_true",
|
|
help="Dump all facts",
|
|
)
|
|
parser.add_argument(
|
|
"--relation",
|
|
help="Filter --dump to a specific relation",
|
|
)
|
|
args = parser.parse_args()
|
|
|
|
db_path = Path(args.db)
|
|
kb = KnowledgeBase(persist_path=db_path)
|
|
|
|
if args.assert_stmt:
|
|
terms = _parse_terms(args.assert_stmt)
|
|
if len(terms) < 2:
|
|
print("ERROR: --assert requires at least RELATION and one ARG", file=sys.stderr)
|
|
sys.exit(1)
|
|
fact = kb.assert_fact(terms[0], *terms[1:], source="cli")
|
|
print(f"Asserted: {fact}")
|
|
|
|
if args.retract_stmt:
|
|
terms = _parse_terms(args.retract_stmt)
|
|
if len(terms) < 2:
|
|
print("ERROR: --retract requires at least RELATION and one ARG", file=sys.stderr)
|
|
sys.exit(1)
|
|
n = kb.retract_fact(terms[0], *terms[1:])
|
|
print(f"Retracted {n} fact(s).")
|
|
|
|
if args.query_stmt:
|
|
terms = _parse_terms(args.query_stmt)
|
|
if len(terms) < 2:
|
|
print("ERROR: --query requires at least RELATION and one ARG", file=sys.stderr)
|
|
sys.exit(1)
|
|
results = kb.query(terms[0], *terms[1:])
|
|
if not results:
|
|
print("No results.")
|
|
else:
|
|
for i, b in enumerate(results, 1):
|
|
if b:
|
|
bindings_str = ", ".join(f"{k}={v}" for k, v in b.items())
|
|
print(f" [{i}] {bindings_str}")
|
|
else:
|
|
print(f" [{i}] (ground match)")
|
|
|
|
if args.dump:
|
|
count = kb.fact_count()
|
|
print(f"Knowledge Base — {count} fact(s):")
|
|
kb.dump(relation_filter=args.relation)
|
|
|
|
if not any([args.assert_stmt, args.retract_stmt, args.query_stmt, args.dump]):
|
|
parser.print_help()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |