Implement Phase 28: Sovereign Knowledge Graph 'Time Travel'

- agent/temporal_knowledge_graph.py: SQLite-backed temporal triple store with versioning, validity periods, and temporal query operators (BEFORE, AFTER, DURING, OVERLAPS, AT)
- agent/temporal_reasoning.py: Temporal reasoning engine supporting historical queries, fact evolution tracking, and worldview snapshots
- tools/temporal_kg_tool.py: Tool integration with functions for storing facts with time, querying historical state, generating temporal summaries, and natural language temporal queries
- tests/test_temporal_kg.py: Comprehensive test coverage including storage tests, query operators, historical summaries, and integration tests
422 lines
15 KiB
Python
422 lines
15 KiB
Python
"""Temporal Knowledge Graph for Hermes Agent.

Provides a time-aware triple-store (Subject, Predicate, Object) with temporal
metadata (valid_from, valid_until, timestamp) enabling "time travel" queries
over Timmy's evolving worldview.

Time format: ISO 8601 (YYYY-MM-DDTHH:MM:SS)
"""
|
|
|
|
import json
|
|
import sqlite3
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
from dataclasses import dataclass, asdict
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TemporalOperator(Enum):
    """Relations a temporal query can require between a fact and a reference time."""

    # Fact's valid_from is strictly earlier than the reference time.
    BEFORE = "before"

    # Fact's valid_from is strictly later than the reference time.
    AFTER = "after"

    # Fact's validity interval contains the reference time.
    DURING = "during"

    # Filtered the same way as DURING when the reference is a single instant.
    OVERLAPS = "overlaps"

    # Point-in-time lookup: facts valid at exactly the reference time.
    AT = "at"
|
|
|
|
|
|
@dataclass
class TemporalTriple:
    """One (subject, predicate, object) statement plus its temporal metadata.

    Instances are plain mutable records; ``to_dict`` / ``from_dict`` convert
    to and from a flat dictionary keyed by field name.
    """

    # Unique identifier of this record.
    id: str
    # The three parts of the statement itself.
    subject: str
    predicate: str
    object: str
    # Start of the validity interval (ISO 8601 datetime).
    valid_from: str
    # End of the validity interval (ISO 8601 datetime); None = still valid.
    valid_until: Optional[str]
    # When this fact was recorded.
    timestamp: str
    # Version number of the fact; the first version is 1.
    version: int = 1
    # ID of the triple that superseded this one, if any.
    superseded_by: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return every field of this triple as a ``{field name: value}`` dict."""
        as_mapping = asdict(self)
        return as_mapping

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "TemporalTriple":
        """Build a triple from a mapping whose keys are the field names.

        Fields with defaults (``version``, ``superseded_by``) may be omitted;
        keys that are not field names make the constructor raise ``TypeError``.
        """
        return cls(**data)
|
|
|
|
|
|
class TemporalTripleStore:
    """SQLite-backed temporal triple store with versioning support.

    Every fact is a (subject, predicate, object) triple with a validity
    interval ``[valid_from, valid_until)``; a ``valid_until`` of ``None``
    means the fact is still valid. Storing a new fact for a subject/predicate
    pair that already has an open-ended fact closes the old one and links it
    to its successor through ``superseded_by``.

    All times are ISO 8601 strings (``YYYY-MM-DDTHH:MM:SS``). SQLite compares
    them as plain text, so callers must use that exact format consistently
    for comparisons to be chronological.

    Each operation opens its own connection and closes it before returning,
    so no database handle is kept open between calls.
    """

    # Explicit column order shared by every SELECT/INSERT so the positional
    # decoding in ``_row_to_triple`` cannot drift from the table definition.
    _COLUMNS = (
        "id, subject, predicate, object, valid_from, valid_until, "
        "timestamp, version, superseded_by"
    )
    _SELECT = "SELECT " + _COLUMNS + " FROM temporal_triples"

    # Half-open interval test: valid_from <= t < valid_until (or open-ended).
    _VALID_AT = "valid_from <= ? AND (valid_until IS NULL OR valid_until > ?)"

    # (index name, indexed columns) pairs created by ``_init_db``.
    _INDEXES = (
        ("idx_subject", "subject"),
        ("idx_predicate", "predicate"),
        ("idx_valid_from", "valid_from"),
        ("idx_valid_until", "valid_until"),
        ("idx_timestamp", "timestamp"),
        ("idx_subject_predicate", "subject, predicate"),
    )

    def __init__(self, db_path: Optional[str] = None):
        """Initialize the temporal triple store.

        Args:
            db_path: Path to the SQLite database file. If None, the store
                uses ``~/.hermes/temporal_kg/temporal_kg.db`` and creates the
                directory when needed. Because every operation opens a fresh
                connection, ``":memory:"`` is not usable here: each
                connection would see its own empty in-memory database.
        """
        if db_path is None:
            # Default to local-first storage in the user's home directory.
            db_dir = Path.home() / ".hermes" / "temporal_kg"
            db_dir.mkdir(parents=True, exist_ok=True)
            db_path = db_dir / "temporal_kg.db"

        self.db_path = str(db_path)
        self._init_db()

    def _connect(self) -> sqlite3.Connection:
        """Open a new connection to the store's database file.

        The caller owns the connection and must close it. ``with conn:`` only
        commits or rolls back the transaction; it does not close the handle.
        """
        return sqlite3.connect(self.db_path)

    def _init_db(self):
        """Create the ``temporal_triples`` table and its indexes if missing."""
        conn = self._connect()
        try:
            with conn:  # commit on success, roll back on error
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS temporal_triples (
                        id TEXT PRIMARY KEY,
                        subject TEXT NOT NULL,
                        predicate TEXT NOT NULL,
                        object TEXT NOT NULL,
                        valid_from TEXT NOT NULL,
                        valid_until TEXT,
                        timestamp TEXT NOT NULL,
                        version INTEGER DEFAULT 1,
                        superseded_by TEXT,
                        FOREIGN KEY (superseded_by) REFERENCES temporal_triples(id)
                    )
                """)

                # Indexes for efficient querying. Names and columns come from
                # the class constant above, never from caller input.
                for index_name, columns in self._INDEXES:
                    conn.execute(
                        f"CREATE INDEX IF NOT EXISTS {index_name} "
                        f"ON temporal_triples({columns})"
                    )
        finally:
            conn.close()

    def _now(self) -> str:
        """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SS``."""
        return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")

    def _generate_id(self) -> str:
        """Generate a unique triple ID: current time plus 8 random hex chars."""
        return f"{self._now()}_{uuid.uuid4().hex[:8]}"

    def store_fact(
        self,
        subject: str,
        predicate: str,
        object: str,
        valid_from: Optional[str] = None,
        valid_until: Optional[str] = None
    ) -> TemporalTriple:
        """Store a fact with temporal bounds.

        If an open-ended fact (``valid_until IS NULL``) already exists for the
        same subject and predicate, it is closed at ``valid_from``, linked to
        the new fact via ``superseded_by``, and the new fact receives the next
        version number. Both changes are committed together.

        Args:
            subject: The subject of the triple
            predicate: The predicate/relationship
            object: The object/value
            valid_from: When this fact becomes valid (ISO 8601). Defaults to now.
            valid_until: When this fact expires (ISO 8601). None means forever valid.

        Returns:
            The stored TemporalTriple
        """
        if valid_from is None:
            valid_from = self._now()

        triple = TemporalTriple(
            id=self._generate_id(),
            subject=subject,
            predicate=predicate,
            object=object,
            valid_from=valid_from,
            valid_until=valid_until,
            timestamp=self._now(),
        )

        conn = self._connect()
        try:
            with conn:
                # Look up the predecessor on the same connection that writes.
                existing = self._select_current_fact(conn, subject, predicate)
                if existing is not None:
                    triple.version = existing.version + 1

                # Insert the successor first so the row referenced by
                # ``superseded_by`` exists when the predecessor is updated.
                self._insert_triple(conn, triple)

                if existing is not None:
                    existing.valid_until = valid_from
                    existing.superseded_by = triple.id
                    self._update_triple(conn, existing)
        finally:
            conn.close()

        logger.info(
            "Stored temporal fact: %s %s %s (valid from %s)",
            subject, predicate, object, valid_from,
        )
        return triple

    def _select_current_fact(
        self,
        conn: sqlite3.Connection,
        subject: str,
        predicate: str
    ) -> Optional[TemporalTriple]:
        """Fetch the newest open-ended fact for a subject/predicate via ``conn``."""
        row = conn.execute(
            # ``version`` breaks ties between rows recorded in the same
            # second, since timestamps only have one-second resolution.
            self._SELECT
            + " WHERE subject = ? AND predicate = ? AND valid_until IS NULL"
            + " ORDER BY timestamp DESC, version DESC LIMIT 1",
            (subject, predicate),
        ).fetchone()
        return self._row_to_triple(row) if row else None

    def _get_current_fact(self, subject: str, predicate: str) -> Optional[TemporalTriple]:
        """Get the current (most recent, still valid) fact for a subject-predicate pair."""
        conn = self._connect()
        try:
            return self._select_current_fact(conn, subject, predicate)
        finally:
            conn.close()

    def _insert_triple(self, conn: sqlite3.Connection, triple: TemporalTriple):
        """Insert a triple using ``conn``; the caller commits."""
        conn.execute(
            f"INSERT INTO temporal_triples ({self._COLUMNS}) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
            (
                triple.id, triple.subject, triple.predicate, triple.object,
                triple.valid_from, triple.valid_until, triple.timestamp,
                triple.version, triple.superseded_by,
            ),
        )

    def _update_triple(self, conn: sqlite3.Connection, triple: TemporalTriple):
        """Persist ``valid_until`` and ``superseded_by`` of an existing triple.

        Only these two columns are written; the row is matched by ``triple.id``.
        The caller commits.
        """
        conn.execute(
            """
            UPDATE temporal_triples
            SET valid_until = ?, superseded_by = ?
            WHERE id = ?
            """,
            (triple.valid_until, triple.superseded_by, triple.id),
        )

    def _row_to_triple(self, row: sqlite3.Row) -> TemporalTriple:
        """Convert a database row (columns in table order) to a TemporalTriple."""
        return TemporalTriple(
            id=row[0],
            subject=row[1],
            predicate=row[2],
            object=row[3],
            valid_from=row[4],
            valid_until=row[5],
            timestamp=row[6],
            version=row[7],
            superseded_by=row[8],
        )

    def _fetch_triples(self, query: str, params: Tuple[Any, ...] = ()) -> List[TemporalTriple]:
        """Run a read-only query and return its rows as TemporalTriple objects."""
        conn = self._connect()
        try:
            return [self._row_to_triple(row) for row in conn.execute(query, params)]
        finally:
            conn.close()

    def query_at_time(
        self,
        timestamp: str,
        subject: Optional[str] = None,
        predicate: Optional[str] = None
    ) -> List[TemporalTriple]:
        """Query facts that were valid at a specific point in time.

        A fact matches when ``valid_from <= timestamp`` and it is either
        open-ended or ``valid_until > timestamp``.

        Args:
            timestamp: The point in time to query (ISO 8601)
            subject: Optional subject filter (ignored when empty or None)
            predicate: Optional predicate filter (ignored when empty or None)

        Returns:
            List of TemporalTriple objects valid at that time, newest
            recording first
        """
        query = f"{self._SELECT} WHERE {self._VALID_AT}"
        params: List[Any] = [timestamp, timestamp]

        if subject:
            query += " AND subject = ?"
            params.append(subject)
        if predicate:
            query += " AND predicate = ?"
            params.append(predicate)

        query += " ORDER BY timestamp DESC"
        return self._fetch_triples(query, tuple(params))

    def query_temporal(
        self,
        operator: TemporalOperator,
        timestamp: str,
        subject: Optional[str] = None,
        predicate: Optional[str] = None
    ) -> List[TemporalTriple]:
        """Query using temporal operators.

        BEFORE / AFTER compare a fact's ``valid_from`` strictly against
        ``timestamp``. DURING, OVERLAPS and AT all select facts whose validity
        interval contains ``timestamp``.

        Args:
            operator: TemporalOperator (BEFORE, AFTER, DURING, OVERLAPS, AT);
                the operator's string value (e.g. ``"before"``) is accepted too
            timestamp: Reference timestamp (ISO 8601)
            subject: Optional subject filter (ignored when empty or None)
            predicate: Optional predicate filter (ignored when empty or None)

        Returns:
            List of matching TemporalTriple objects, newest recording first

        Raises:
            ValueError: If ``operator`` is not a known temporal operator.
        """
        # Normalise the operator. Without this, a non-member value would match
        # no branch below and the query would silently return unfiltered rows.
        try:
            operator = TemporalOperator(operator)
        except ValueError:
            raise ValueError(f"Unsupported temporal operator: {operator!r}") from None

        if operator is TemporalOperator.AT:
            return self.query_at_time(timestamp, subject, predicate)

        conditions: List[str] = []
        params: List[Any] = []

        if subject:
            conditions.append("subject = ?")
            params.append(subject)
        if predicate:
            conditions.append("predicate = ?")
            params.append(predicate)

        if operator is TemporalOperator.BEFORE:
            conditions.append("valid_from < ?")
            params.append(timestamp)
        elif operator is TemporalOperator.AFTER:
            conditions.append("valid_from > ?")
            params.append(timestamp)
        else:
            # DURING and OVERLAPS: for a single reference instant both mean
            # "the validity interval contains that instant".
            conditions.append(self._VALID_AT)
            params.extend([timestamp, timestamp])

        query = (
            f"{self._SELECT} WHERE {' AND '.join(conditions)}"
            " ORDER BY timestamp DESC"
        )
        return self._fetch_triples(query, tuple(params))

    def get_fact_history(
        self,
        subject: str,
        predicate: str
    ) -> List[TemporalTriple]:
        """Get the complete version history of a fact.

        Args:
            subject: The subject to query
            predicate: The predicate to query

        Returns:
            List of all versions of the fact, ordered by recording timestamp
            (oldest first); versions recorded in the same second are ordered
            by version number.
        """
        return self._fetch_triples(
            self._SELECT
            + " WHERE subject = ? AND predicate = ?"
            + " ORDER BY timestamp ASC, version ASC",
            (subject, predicate),
        )

    def get_all_facts_for_entity(
        self,
        subject: str,
        at_time: Optional[str] = None
    ) -> List[TemporalTriple]:
        """Get all facts about an entity, optionally at a specific time.

        Args:
            subject: The entity to query
            at_time: Optional timestamp; when given, only facts valid at that
                time are returned, otherwise every recorded fact is returned

        Returns:
            List of TemporalTriple objects, newest recording first
        """
        if at_time:
            return self.query_at_time(at_time, subject=subject)

        return self._fetch_triples(
            self._SELECT + " WHERE subject = ? ORDER BY timestamp DESC",
            (subject,),
        )

    def get_entity_changes(
        self,
        subject: str,
        start_time: str,
        end_time: str
    ) -> List[TemporalTriple]:
        """Get all facts that changed for an entity during a time range.

        A fact counts as changed when its ``valid_from`` or its ``valid_until``
        falls inside ``[start_time, end_time]`` (both bounds inclusive).

        Args:
            subject: The entity to query
            start_time: Start of time range (ISO 8601)
            end_time: End of time range (ISO 8601)

        Returns:
            List of TemporalTriple objects that changed in the range, oldest
            recording first
        """
        return self._fetch_triples(
            self._SELECT
            + " WHERE subject = ?"
            + " AND ((valid_from >= ? AND valid_from <= ?)"
            + " OR (valid_until >= ? AND valid_until <= ?))"
            + " ORDER BY timestamp ASC",
            (subject, start_time, end_time, start_time, end_time),
        )

    def close(self):
        """No-op: every operation already closes its own connection."""
        pass

    def export_to_json(self) -> str:
        """Export all triples as a JSON array of objects, newest recording first."""
        triples = self._fetch_triples(self._SELECT + " ORDER BY timestamp DESC")
        return json.dumps([triple.to_dict() for triple in triples], indent=2)

    def import_from_json(self, json_data: str):
        """Import triples from JSON produced by ``export_to_json``.

        All records are parsed before anything is written, and the inserts
        run in one transaction, so a failure (for example an ID that already
        exists) leaves the database unchanged.

        Args:
            json_data: JSON array of objects keyed by TemporalTriple field names
        """
        triples = [TemporalTriple.from_dict(item) for item in json.loads(json_data)]

        conn = self._connect()
        try:
            with conn:
                for triple in triples:
                    self._insert_triple(conn, triple)
        finally:
            conn.close()
|