Files
timmy-config/scripts/knowledge_base.py
Alexander Whitestone 1c82a2a560
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 23s
Smoke Test / smoke (pull_request) Failing after 15s
Validate Config / YAML Lint (pull_request) Failing after 15s
Validate Config / JSON Validate (pull_request) Successful in 17s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 37s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Cron Syntax Check (pull_request) Successful in 11s
Validate Config / Shell Script Lint (pull_request) Failing after 49s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 11s
Validate Config / Playbook Schema Validation (pull_request) Successful in 20s
Validate Training Data / validate (pull_request) Failing after 15s
Architecture Lint / Lint Repository (pull_request) Failing after 17s
PR Checklist / pr-checklist (pull_request) Failing after 2m53s
fix: add python3 shebangs to all Python scripts (#681)
Added #!/usr/bin/env python3 to 74 Python scripts across:
- bin/ (16 scripts)
- scripts/ (48 scripts)
- pipeline/ (3 scripts)
- training/scripts/ (3 scripts)
- hermes-sovereign/ (1 script)

Also added executable permissions to fixed scripts.
Test files excluded (run by pytest, not directly).

Closes #681
2026-04-20 19:17:19 -04:00

389 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
#!/usr/bin/env python3
"""knowledge_base.py - GOFAI symbolic knowledge base for the Timmy Foundation fleet.
A classical AI knowledge representation system: stores facts as ground atoms,
supports first-order-logic-style queries, and maintains a provenance chain so
every belief can be traced back to its source. No neural nets, no embeddings -
just structured symbolic reasoning over a typed fact store.
Usage:
kb = KnowledgeBase()
kb.assert_fact('agent', 'online', 'timmy')
kb.assert_fact('task', 'assigned_to', 'task-42', 'timmy')
results = kb.query('task', 'assigned_to', '?x', 'timmy')
# results -> [{'?x': 'task-42'}]
CLI:
python knowledge_base.py --assert "agent online hermes"
python knowledge_base.py --query "agent online ?who"
python knowledge_base.py --dump
"""
from __future__ import annotations
import argparse
import ast
import json
import os
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Tuple
# ---------------------------------------------------------------------------
# Data model
# ---------------------------------------------------------------------------
VAR_PREFIX = "?"
def is_var(term: str) -> bool:
"""Return True if *term* is a logic variable (starts with '?')."""
return term.startswith(VAR_PREFIX)
@dataclass(frozen=True)
class Fact:
"""An immutable ground atom: (relation, *args)."""
relation: str
args: Tuple[str, ...]
source: str = "user"
timestamp: float = field(default_factory=time.time)
def __str__(self) -> str:
args_str = " ".join(self.args)
return f"({self.relation} {args_str})"
Bindings = Dict[str, str]
# ---------------------------------------------------------------------------
# Unification
# ---------------------------------------------------------------------------
def unify_term(pattern: str, value: str, bindings: Bindings) -> Optional[Bindings]:
"""Unify a single pattern term against a ground value.
Returns updated bindings on success, or None on failure.
"""
if is_var(pattern):
if pattern in bindings:
return bindings if bindings[pattern] == value else None
return {**bindings, pattern: value}
return bindings if pattern == value else None
def unify_fact(
pattern: Tuple[str, ...], fact_args: Tuple[str, ...], bindings: Bindings
) -> Optional[Bindings]:
"""Unify a full argument tuple, returning final bindings or None."""
if len(pattern) != len(fact_args):
return None
b = bindings
for p, v in zip(pattern, fact_args):
b = unify_term(p, v, b)
if b is None:
return None
return b
# ---------------------------------------------------------------------------
# Knowledge Base
# ---------------------------------------------------------------------------
class KnowledgeBase:
"""In-memory symbolic knowledge base with optional JSON persistence."""
def __init__(self, persist_path: Optional[Path] = None) -> None:
self._facts: List[Fact] = []
self._persist_path = persist_path
if persist_path and persist_path.exists():
self._load(persist_path)
# ------------------------------------------------------------------
# Fact management
# ------------------------------------------------------------------
def assert_fact(
self, relation: str, *args: str, source: str = "user"
) -> Fact:
"""Add a ground fact to the knowledge base.
Idempotent: duplicate (relation, args) pairs are not added twice.
"""
f = Fact(relation=relation, args=tuple(args), source=source, timestamp=time.time())
for existing in self._facts:
if existing.relation == f.relation and existing.args == f.args:
return existing # already known
self._facts.append(f)
if self._persist_path:
self._save(self._persist_path)
return f
def retract_fact(self, relation: str, *args: str) -> int:
"""Remove all facts matching (relation, *args). Returns count removed."""
before = len(self._facts)
self._facts = [
f
for f in self._facts
if not (f.relation == relation and f.args == tuple(args))
]
removed = before - len(self._facts)
if removed and self._persist_path:
self._save(self._persist_path)
return removed
def ingest_python_file(
self, path: Path, *, module_name: Optional[str] = None, source: str = "ast"
) -> List[Fact]:
"""Parse a Python file with ``ast`` and assert symbolic structure facts."""
tree = ast.parse(path.read_text(), filename=str(path))
module = module_name or path.stem
fact_source = f"{source}:{path.name}"
added: List[Fact] = []
def add(relation: str, *args: str) -> None:
added.append(self.assert_fact(relation, *args, source=fact_source))
for node in tree.body:
if isinstance(node, ast.Import):
for alias in node.names:
add("imports", module, alias.name)
elif isinstance(node, ast.ImportFrom):
prefix = f"{node.module}." if node.module else ""
for alias in node.names:
add("imports", module, f"{prefix}{alias.name}")
elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
add("defines_function", module, node.name)
elif isinstance(node, ast.ClassDef):
add("defines_class", module, node.name)
for child in node.body:
if isinstance(child, (ast.FunctionDef, ast.AsyncFunctionDef)):
add("defines_method", node.name, child.name)
elif isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id.isupper():
add("defines_constant", module, target.id)
elif isinstance(node, ast.AnnAssign) and isinstance(node.target, ast.Name) and node.target.id.isupper():
add("defines_constant", module, node.target.id)
return added
# ------------------------------------------------------------------
# Query
# ------------------------------------------------------------------
def query(
self, relation: str, *pattern_args: str, source_filter: Optional[str] = None
) -> List[Bindings]:
"""Return all binding dictionaries satisfying the query pattern.
Variables in *pattern_args* start with '?'. Ground terms must match
exactly. An empty binding dict means the fact matched with no
variables to bind.
Args:
relation: The relation name to match.
*pattern_args: Mixed ground/variable argument tuple.
source_filter: Optional provenance filter (e.g. 'scheduler').
Returns:
List of binding dicts, one per matching fact.
"""
results: List[Bindings] = []
for fact in self._facts:
if fact.relation != relation:
continue
if source_filter and fact.source != source_filter:
continue
b = unify_fact(tuple(pattern_args), fact.args, {})
if b is not None:
results.append(b)
return results
def query_one(
self, relation: str, *pattern_args: str
) -> Optional[Bindings]:
"""Return the first matching binding dict or None."""
for b in self.query(relation, *pattern_args):
return b
return None
def facts_for(self, relation: str) -> Iterator[Fact]:
"""Iterate over all facts with the given relation."""
for f in self._facts:
if f.relation == relation:
yield f
# ------------------------------------------------------------------
# Bulk operations
# ------------------------------------------------------------------
def all_facts(self) -> List[Fact]:
"""Return a snapshot of all stored facts."""
return list(self._facts)
def fact_count(self) -> int:
return len(self._facts)
def clear(self) -> None:
"""Remove all facts from memory (does not touch disk)."""
self._facts.clear()
# ------------------------------------------------------------------
# Persistence
# ------------------------------------------------------------------
def _save(self, path: Path) -> None:
records = [
{
"relation": f.relation,
"args": list(f.args),
"source": f.source,
"timestamp": f.timestamp,
}
for f in self._facts
]
path.write_text(json.dumps(records, indent=2))
def _load(self, path: Path) -> None:
try:
records = json.loads(path.read_text())
for r in records:
self._facts.append(
Fact(
relation=r["relation"],
args=tuple(r["args"]),
source=r.get("source", "persisted"),
timestamp=r.get("timestamp", 0.0),
)
)
except (json.JSONDecodeError, KeyError) as exc:
print(f"[kb] Warning: could not load {path}: {exc}", file=sys.stderr)
def save_to(self, path: Path) -> None:
"""Explicitly save to a given path."""
self._save(path)
# ------------------------------------------------------------------
# Debug / display
# ------------------------------------------------------------------
def dump(self, relation_filter: Optional[str] = None) -> None:
"""Print all (or filtered) facts to stdout."""
for f in self._facts:
if relation_filter and f.relation != relation_filter:
continue
print(f" {f} [source={f.source}]")
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def _parse_terms(raw: str) -> List[str]:
"""Split a whitespace-separated string into terms."""
return raw.strip().split()
def main() -> None:
parser = argparse.ArgumentParser(
description="GOFAI symbolic knowledge base CLI"
)
parser.add_argument(
"--db",
default="kb.json",
help="Path to persistent JSON store (default: kb.json)",
)
parser.add_argument(
"--assert",
dest="assert_stmt",
metavar="RELATION ARG...",
help='Assert a fact, e.g. --assert "agent online timmy"',
)
parser.add_argument(
"--retract",
dest="retract_stmt",
metavar="RELATION ARG...",
help='Retract a fact, e.g. --retract "agent online timmy"',
)
parser.add_argument(
"--query",
dest="query_stmt",
metavar="RELATION ARG...",
help='Query the KB, e.g. --query "agent online ?who"',
)
parser.add_argument(
"--dump",
action="store_true",
help="Dump all facts",
)
parser.add_argument(
"--ingest-python",
dest="ingest_python",
type=Path,
help="Parse a Python file with AST and assert symbolic structure facts",
)
parser.add_argument(
"--relation",
help="Filter --dump to a specific relation",
)
args = parser.parse_args()
db_path = Path(args.db)
kb = KnowledgeBase(persist_path=db_path)
if args.assert_stmt:
terms = _parse_terms(args.assert_stmt)
if len(terms) < 2:
print("ERROR: --assert requires at least RELATION and one ARG", file=sys.stderr)
sys.exit(1)
fact = kb.assert_fact(terms[0], *terms[1:], source="cli")
print(f"Asserted: {fact}")
if args.ingest_python:
added = kb.ingest_python_file(args.ingest_python, source="cli-ast")
print(f"Ingested {len(added)} AST fact(s) from {args.ingest_python}")
if args.retract_stmt:
terms = _parse_terms(args.retract_stmt)
if len(terms) < 2:
print("ERROR: --retract requires at least RELATION and one ARG", file=sys.stderr)
sys.exit(1)
n = kb.retract_fact(terms[0], *terms[1:])
print(f"Retracted {n} fact(s).")
if args.query_stmt:
terms = _parse_terms(args.query_stmt)
if len(terms) < 2:
print("ERROR: --query requires at least RELATION and one ARG", file=sys.stderr)
sys.exit(1)
results = kb.query(terms[0], *terms[1:])
if not results:
print("No results.")
else:
for i, b in enumerate(results, 1):
if b:
bindings_str = ", ".join(f"{k}={v}" for k, v in b.items())
print(f" [{i}] {bindings_str}")
else:
print(f" [{i}] (ground match)")
if args.dump:
count = kb.fact_count()
print(f"Knowledge Base — {count} fact(s):")
kb.dump(relation_filter=args.relation)
if not any([args.assert_stmt, args.retract_stmt, args.query_stmt, args.dump]):
parser.print_help()
if __name__ == "__main__":
main()