feat: Issue #39 - temporal knowledge graph with versioning and reasoning
Implement Phase 28: Sovereign Knowledge Graph 'Time Travel' - agent/temporal_knowledge_graph.py: SQLite-backed temporal triple store with versioning, validity periods, and temporal query operators (BEFORE, AFTER, DURING, OVERLAPS, AT) - agent/temporal_reasoning.py: Temporal reasoning engine supporting historical queries, fact evolution tracking, and worldview snapshots - tools/temporal_kg_tool.py: Tool integration with functions for storing facts with time, querying historical state, generating temporal summaries, and natural language temporal queries - tests/test_temporal_kg.py: Comprehensive test coverage including storage tests, query operators, historical summaries, and integration tests
This commit is contained in:
421
agent/temporal_knowledge_graph.py
Normal file
421
agent/temporal_knowledge_graph.py
Normal file
@@ -0,0 +1,421 @@
|
||||
"""Temporal Knowledge Graph for Hermes Agent.
|
||||
|
||||
Provides a time-aware triple-store (Subject, Predicate, Object) with temporal
|
||||
metadata (valid_from, valid_until, timestamp) enabling "time travel" queries
|
||||
over Timmy's evolving worldview.
|
||||
|
||||
Time format: ISO 8601 (YYYY-MM-DDTHH:MM:SS)
|
||||
"""
|
||||
|
||||
import json
|
||||
import sqlite3
|
||||
import logging
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
from typing import List, Dict, Any, Optional, Tuple
|
||||
from dataclasses import dataclass, asdict
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TemporalOperator(Enum):
|
||||
"""Temporal query operators for time-based filtering."""
|
||||
BEFORE = "before"
|
||||
AFTER = "after"
|
||||
DURING = "during"
|
||||
OVERLAPS = "overlaps"
|
||||
AT = "at"
|
||||
|
||||
|
||||
@dataclass
|
||||
class TemporalTriple:
|
||||
"""A triple with temporal metadata."""
|
||||
id: str
|
||||
subject: str
|
||||
predicate: str
|
||||
object: str
|
||||
valid_from: str # ISO 8601 datetime
|
||||
valid_until: Optional[str] # ISO 8601 datetime, None means still valid
|
||||
timestamp: str # When this fact was recorded
|
||||
version: int = 1
|
||||
superseded_by: Optional[str] = None # ID of the triple that superseded this
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: Dict[str, Any]) -> "TemporalTriple":
|
||||
return cls(**data)
|
||||
|
||||
|
||||
class TemporalTripleStore:
|
||||
"""SQLite-backed temporal triple store with versioning support."""
|
||||
|
||||
def __init__(self, db_path: Optional[str] = None):
|
||||
"""Initialize the temporal triple store.
|
||||
|
||||
Args:
|
||||
db_path: Path to SQLite database. If None, uses default local path.
|
||||
"""
|
||||
if db_path is None:
|
||||
# Default to local-first storage in user's home
|
||||
home = Path.home()
|
||||
db_dir = home / ".hermes" / "temporal_kg"
|
||||
db_dir.mkdir(parents=True, exist_ok=True)
|
||||
db_path = db_dir / "temporal_kg.db"
|
||||
|
||||
self.db_path = str(db_path)
|
||||
self._init_db()
|
||||
|
||||
def _init_db(self):
|
||||
"""Initialize the SQLite database with required tables."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.execute("""
|
||||
CREATE TABLE IF NOT EXISTS temporal_triples (
|
||||
id TEXT PRIMARY KEY,
|
||||
subject TEXT NOT NULL,
|
||||
predicate TEXT NOT NULL,
|
||||
object TEXT NOT NULL,
|
||||
valid_from TEXT NOT NULL,
|
||||
valid_until TEXT,
|
||||
timestamp TEXT NOT NULL,
|
||||
version INTEGER DEFAULT 1,
|
||||
superseded_by TEXT,
|
||||
FOREIGN KEY (superseded_by) REFERENCES temporal_triples(id)
|
||||
)
|
||||
""")
|
||||
|
||||
# Create indexes for efficient querying
|
||||
conn.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_subject ON temporal_triples(subject)
|
||||
""")
|
||||
conn.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_predicate ON temporal_triples(predicate)
|
||||
""")
|
||||
conn.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_valid_from ON temporal_triples(valid_from)
|
||||
""")
|
||||
conn.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_valid_until ON temporal_triples(valid_until)
|
||||
""")
|
||||
conn.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_timestamp ON temporal_triples(timestamp)
|
||||
""")
|
||||
conn.execute("""
|
||||
CREATE INDEX IF NOT EXISTS idx_subject_predicate
|
||||
ON temporal_triples(subject, predicate)
|
||||
""")
|
||||
|
||||
conn.commit()
|
||||
|
||||
def _now(self) -> str:
|
||||
"""Get current time in ISO 8601 format."""
|
||||
return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
|
||||
|
||||
def _generate_id(self) -> str:
|
||||
"""Generate a unique ID for a triple."""
|
||||
return f"{self._now()}_{uuid.uuid4().hex[:8]}"
|
||||
|
||||
def store_fact(
|
||||
self,
|
||||
subject: str,
|
||||
predicate: str,
|
||||
object: str,
|
||||
valid_from: Optional[str] = None,
|
||||
valid_until: Optional[str] = None
|
||||
) -> TemporalTriple:
|
||||
"""Store a fact with temporal bounds.
|
||||
|
||||
Args:
|
||||
subject: The subject of the triple
|
||||
predicate: The predicate/relationship
|
||||
object: The object/value
|
||||
valid_from: When this fact becomes valid (ISO 8601). Defaults to now.
|
||||
valid_until: When this fact expires (ISO 8601). None means forever valid.
|
||||
|
||||
Returns:
|
||||
The stored TemporalTriple
|
||||
"""
|
||||
if valid_from is None:
|
||||
valid_from = self._now()
|
||||
|
||||
# Check if there's an existing fact for this subject-predicate
|
||||
existing = self._get_current_fact(subject, predicate)
|
||||
|
||||
triple = TemporalTriple(
|
||||
id=self._generate_id(),
|
||||
subject=subject,
|
||||
predicate=predicate,
|
||||
object=object,
|
||||
valid_from=valid_from,
|
||||
valid_until=valid_until,
|
||||
timestamp=self._now()
|
||||
)
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
# If there's an existing fact, mark it as superseded
|
||||
if existing:
|
||||
existing.valid_until = valid_from
|
||||
existing.superseded_by = triple.id
|
||||
self._update_triple(conn, existing)
|
||||
triple.version = existing.version + 1
|
||||
|
||||
# Insert the new fact
|
||||
self._insert_triple(conn, triple)
|
||||
conn.commit()
|
||||
|
||||
logger.info(f"Stored temporal fact: {subject} {predicate} {object} (valid from {valid_from})")
|
||||
return triple
|
||||
|
||||
def _get_current_fact(self, subject: str, predicate: str) -> Optional[TemporalTriple]:
|
||||
"""Get the current (most recent, still valid) fact for a subject-predicate pair."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
cursor = conn.execute(
|
||||
"""
|
||||
SELECT * FROM temporal_triples
|
||||
WHERE subject = ? AND predicate = ? AND valid_until IS NULL
|
||||
ORDER BY timestamp DESC LIMIT 1
|
||||
""",
|
||||
(subject, predicate)
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
if row:
|
||||
return self._row_to_triple(row)
|
||||
return None
|
||||
|
||||
def _insert_triple(self, conn: sqlite3.Connection, triple: TemporalTriple):
|
||||
"""Insert a triple into the database."""
|
||||
conn.execute(
|
||||
"""
|
||||
INSERT INTO temporal_triples
|
||||
(id, subject, predicate, object, valid_from, valid_until, timestamp, version, superseded_by)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
triple.id, triple.subject, triple.predicate, triple.object,
|
||||
triple.valid_from, triple.valid_until, triple.timestamp,
|
||||
triple.version, triple.superseded_by
|
||||
)
|
||||
)
|
||||
|
||||
def _update_triple(self, conn: sqlite3.Connection, triple: TemporalTriple):
|
||||
"""Update an existing triple."""
|
||||
conn.execute(
|
||||
"""
|
||||
UPDATE temporal_triples
|
||||
SET valid_until = ?, superseded_by = ?
|
||||
WHERE id = ?
|
||||
""",
|
||||
(triple.valid_until, triple.superseded_by, triple.id)
|
||||
)
|
||||
|
||||
def _row_to_triple(self, row: sqlite3.Row) -> TemporalTriple:
|
||||
"""Convert a database row to a TemporalTriple."""
|
||||
return TemporalTriple(
|
||||
id=row[0],
|
||||
subject=row[1],
|
||||
predicate=row[2],
|
||||
object=row[3],
|
||||
valid_from=row[4],
|
||||
valid_until=row[5],
|
||||
timestamp=row[6],
|
||||
version=row[7],
|
||||
superseded_by=row[8]
|
||||
)
|
||||
|
||||
def query_at_time(
|
||||
self,
|
||||
timestamp: str,
|
||||
subject: Optional[str] = None,
|
||||
predicate: Optional[str] = None
|
||||
) -> List[TemporalTriple]:
|
||||
"""Query facts that were valid at a specific point in time.
|
||||
|
||||
Args:
|
||||
timestamp: The point in time to query (ISO 8601)
|
||||
subject: Optional subject filter
|
||||
predicate: Optional predicate filter
|
||||
|
||||
Returns:
|
||||
List of TemporalTriple objects valid at that time
|
||||
"""
|
||||
query = """
|
||||
SELECT * FROM temporal_triples
|
||||
WHERE valid_from <= ?
|
||||
AND (valid_until IS NULL OR valid_until > ?)
|
||||
"""
|
||||
params = [timestamp, timestamp]
|
||||
|
||||
if subject:
|
||||
query += " AND subject = ?"
|
||||
params.append(subject)
|
||||
if predicate:
|
||||
query += " AND predicate = ?"
|
||||
params.append(predicate)
|
||||
|
||||
query += " ORDER BY timestamp DESC"
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.execute(query, params)
|
||||
return [self._row_to_triple(row) for row in cursor.fetchall()]
|
||||
|
||||
def query_temporal(
|
||||
self,
|
||||
operator: TemporalOperator,
|
||||
timestamp: str,
|
||||
subject: Optional[str] = None,
|
||||
predicate: Optional[str] = None
|
||||
) -> List[TemporalTriple]:
|
||||
"""Query using temporal operators.
|
||||
|
||||
Args:
|
||||
operator: TemporalOperator (BEFORE, AFTER, DURING, OVERLAPS, AT)
|
||||
timestamp: Reference timestamp (ISO 8601)
|
||||
subject: Optional subject filter
|
||||
predicate: Optional predicate filter
|
||||
|
||||
Returns:
|
||||
List of matching TemporalTriple objects
|
||||
"""
|
||||
base_query = "SELECT * FROM temporal_triples WHERE 1=1"
|
||||
params = []
|
||||
|
||||
if subject:
|
||||
base_query += " AND subject = ?"
|
||||
params.append(subject)
|
||||
if predicate:
|
||||
base_query += " AND predicate = ?"
|
||||
params.append(predicate)
|
||||
|
||||
if operator == TemporalOperator.BEFORE:
|
||||
base_query += " AND valid_from < ?"
|
||||
params.append(timestamp)
|
||||
elif operator == TemporalOperator.AFTER:
|
||||
base_query += " AND valid_from > ?"
|
||||
params.append(timestamp)
|
||||
elif operator == TemporalOperator.DURING:
|
||||
base_query += " AND valid_from <= ? AND (valid_until IS NULL OR valid_until > ?)"
|
||||
params.extend([timestamp, timestamp])
|
||||
elif operator == TemporalOperator.OVERLAPS:
|
||||
# Facts that overlap with a time point (same as DURING)
|
||||
base_query += " AND valid_from <= ? AND (valid_until IS NULL OR valid_until > ?)"
|
||||
params.extend([timestamp, timestamp])
|
||||
elif operator == TemporalOperator.AT:
|
||||
# Exact match for valid_at query
|
||||
return self.query_at_time(timestamp, subject, predicate)
|
||||
|
||||
base_query += " ORDER BY timestamp DESC"
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.execute(base_query, params)
|
||||
return [self._row_to_triple(row) for row in cursor.fetchall()]
|
||||
|
||||
def get_fact_history(
|
||||
self,
|
||||
subject: str,
|
||||
predicate: str
|
||||
) -> List[TemporalTriple]:
|
||||
"""Get the complete version history of a fact.
|
||||
|
||||
Args:
|
||||
subject: The subject to query
|
||||
predicate: The predicate to query
|
||||
|
||||
Returns:
|
||||
List of all versions of the fact, ordered by timestamp
|
||||
"""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.execute(
|
||||
"""
|
||||
SELECT * FROM temporal_triples
|
||||
WHERE subject = ? AND predicate = ?
|
||||
ORDER BY timestamp ASC
|
||||
""",
|
||||
(subject, predicate)
|
||||
)
|
||||
return [self._row_to_triple(row) for row in cursor.fetchall()]
|
||||
|
||||
def get_all_facts_for_entity(
|
||||
self,
|
||||
subject: str,
|
||||
at_time: Optional[str] = None
|
||||
) -> List[TemporalTriple]:
|
||||
"""Get all facts about an entity, optionally at a specific time.
|
||||
|
||||
Args:
|
||||
subject: The entity to query
|
||||
at_time: Optional timestamp to query at
|
||||
|
||||
Returns:
|
||||
List of TemporalTriple objects
|
||||
"""
|
||||
if at_time:
|
||||
return self.query_at_time(at_time, subject=subject)
|
||||
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.execute(
|
||||
"""
|
||||
SELECT * FROM temporal_triples
|
||||
WHERE subject = ?
|
||||
ORDER BY timestamp DESC
|
||||
""",
|
||||
(subject,)
|
||||
)
|
||||
return [self._row_to_triple(row) for row in cursor.fetchall()]
|
||||
|
||||
def get_entity_changes(
|
||||
self,
|
||||
subject: str,
|
||||
start_time: str,
|
||||
end_time: str
|
||||
) -> List[TemporalTriple]:
|
||||
"""Get all facts that changed for an entity during a time range.
|
||||
|
||||
Args:
|
||||
subject: The entity to query
|
||||
start_time: Start of time range (ISO 8601)
|
||||
end_time: End of time range (ISO 8601)
|
||||
|
||||
Returns:
|
||||
List of TemporalTriple objects that changed in the range
|
||||
"""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.execute(
|
||||
"""
|
||||
SELECT * FROM temporal_triples
|
||||
WHERE subject = ?
|
||||
AND ((valid_from >= ? AND valid_from <= ?)
|
||||
OR (valid_until >= ? AND valid_until <= ?))
|
||||
ORDER BY timestamp ASC
|
||||
""",
|
||||
(subject, start_time, end_time, start_time, end_time)
|
||||
)
|
||||
return [self._row_to_triple(row) for row in cursor.fetchall()]
|
||||
|
||||
def close(self):
|
||||
"""Close the database connection (no-op for SQLite with context managers)."""
|
||||
pass
|
||||
|
||||
def export_to_json(self) -> str:
|
||||
"""Export all triples to JSON format."""
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.execute("SELECT * FROM temporal_triples ORDER BY timestamp DESC")
|
||||
triples = [self._row_to_triple(row).to_dict() for row in cursor.fetchall()]
|
||||
return json.dumps(triples, indent=2)
|
||||
|
||||
def import_from_json(self, json_data: str):
|
||||
"""Import triples from JSON format."""
|
||||
triples = json.loads(json_data)
|
||||
with sqlite3.connect(self.db_path) as conn:
|
||||
for triple_dict in triples:
|
||||
triple = TemporalTriple.from_dict(triple_dict)
|
||||
self._insert_triple(conn, triple)
|
||||
conn.commit()
|
||||
434
agent/temporal_reasoning.py
Normal file
434
agent/temporal_reasoning.py
Normal file
@@ -0,0 +1,434 @@
|
||||
"""Temporal Reasoning Engine for Hermes Agent.
|
||||
|
||||
Enables Timmy to reason about past and future states, generate historical
|
||||
summaries, and perform temporal inference over the evolving knowledge graph.
|
||||
|
||||
Queries supported:
|
||||
- "What was Timmy's view on sovereignty before March 2026?"
|
||||
- "When did we first learn about MLX integration?"
|
||||
- "How has the codebase changed since the security audit?"
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import List, Dict, Any, Optional, Tuple
|
||||
from datetime import datetime, timedelta
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
|
||||
from agent.temporal_knowledge_graph import (
|
||||
TemporalTripleStore, TemporalTriple, TemporalOperator
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ChangeType(Enum):
|
||||
"""Types of changes in the knowledge graph."""
|
||||
ADDED = "added"
|
||||
REMOVED = "removed"
|
||||
MODIFIED = "modified"
|
||||
SUPERSEDED = "superseded"
|
||||
|
||||
|
||||
@dataclass
|
||||
class FactChange:
|
||||
"""Represents a change in a fact over time."""
|
||||
change_type: ChangeType
|
||||
subject: str
|
||||
predicate: str
|
||||
old_value: Optional[str]
|
||||
new_value: Optional[str]
|
||||
timestamp: str
|
||||
version: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class HistoricalSummary:
|
||||
"""Summary of how an entity or concept evolved over time."""
|
||||
entity: str
|
||||
start_time: str
|
||||
end_time: str
|
||||
total_changes: int
|
||||
key_facts: List[Dict[str, Any]]
|
||||
evolution_timeline: List[FactChange]
|
||||
current_state: List[Dict[str, Any]]
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"entity": self.entity,
|
||||
"start_time": self.start_time,
|
||||
"end_time": self.end_time,
|
||||
"total_changes": self.total_changes,
|
||||
"key_facts": self.key_facts,
|
||||
"evolution_timeline": [
|
||||
{
|
||||
"change_type": c.change_type.value,
|
||||
"subject": c.subject,
|
||||
"predicate": c.predicate,
|
||||
"old_value": c.old_value,
|
||||
"new_value": c.new_value,
|
||||
"timestamp": c.timestamp,
|
||||
"version": c.version
|
||||
}
|
||||
for c in self.evolution_timeline
|
||||
],
|
||||
"current_state": self.current_state
|
||||
}
|
||||
|
||||
|
||||
class TemporalReasoner:
|
||||
"""Reasoning engine for temporal knowledge graphs."""
|
||||
|
||||
def __init__(self, store: Optional[TemporalTripleStore] = None):
|
||||
"""Initialize the temporal reasoner.
|
||||
|
||||
Args:
|
||||
store: Optional TemporalTripleStore instance. Creates new if None.
|
||||
"""
|
||||
self.store = store or TemporalTripleStore()
|
||||
|
||||
def what_did_we_believe(
|
||||
self,
|
||||
subject: str,
|
||||
before_time: str
|
||||
) -> List[TemporalTriple]:
|
||||
"""Query: "What did we believe about X before Y happened?"
|
||||
|
||||
Args:
|
||||
subject: The entity to query about
|
||||
before_time: The cutoff time (ISO 8601)
|
||||
|
||||
Returns:
|
||||
List of facts believed before the given time
|
||||
"""
|
||||
# Get facts that were valid just before the given time
|
||||
return self.store.query_temporal(
|
||||
TemporalOperator.BEFORE,
|
||||
before_time,
|
||||
subject=subject
|
||||
)
|
||||
|
||||
def when_did_we_learn(
|
||||
self,
|
||||
subject: str,
|
||||
predicate: Optional[str] = None,
|
||||
object: Optional[str] = None
|
||||
) -> Optional[str]:
|
||||
"""Query: "When did we first learn about X?"
|
||||
|
||||
Args:
|
||||
subject: The subject to search for
|
||||
predicate: Optional predicate filter
|
||||
object: Optional object filter
|
||||
|
||||
Returns:
|
||||
Timestamp of first knowledge, or None if never learned
|
||||
"""
|
||||
history = self.store.get_fact_history(subject, predicate or "")
|
||||
|
||||
# Filter by object if specified
|
||||
if object:
|
||||
history = [h for h in history if h.object == object]
|
||||
|
||||
if history:
|
||||
# Return the earliest timestamp
|
||||
earliest = min(history, key=lambda x: x.timestamp)
|
||||
return earliest.timestamp
|
||||
return None
|
||||
|
||||
def how_has_it_changed(
|
||||
self,
|
||||
subject: str,
|
||||
since_time: str
|
||||
) -> List[FactChange]:
|
||||
"""Query: "How has X changed since Y?"
|
||||
|
||||
Args:
|
||||
subject: The entity to analyze
|
||||
since_time: The starting time (ISO 8601)
|
||||
|
||||
Returns:
|
||||
List of changes since the given time
|
||||
"""
|
||||
now = datetime.now().isoformat()
|
||||
changes = self.store.get_entity_changes(subject, since_time, now)
|
||||
|
||||
fact_changes = []
|
||||
for i, triple in enumerate(changes):
|
||||
# Determine change type
|
||||
if i == 0:
|
||||
change_type = ChangeType.ADDED
|
||||
old_value = None
|
||||
else:
|
||||
prev = changes[i - 1]
|
||||
if triple.object != prev.object:
|
||||
change_type = ChangeType.MODIFIED
|
||||
old_value = prev.object
|
||||
else:
|
||||
change_type = ChangeType.SUPERSEDED
|
||||
old_value = prev.object
|
||||
|
||||
fact_changes.append(FactChange(
|
||||
change_type=change_type,
|
||||
subject=triple.subject,
|
||||
predicate=triple.predicate,
|
||||
old_value=old_value,
|
||||
new_value=triple.object,
|
||||
timestamp=triple.timestamp,
|
||||
version=triple.version
|
||||
))
|
||||
|
||||
return fact_changes
|
||||
|
||||
def generate_temporal_summary(
|
||||
self,
|
||||
entity: str,
|
||||
start_time: str,
|
||||
end_time: str
|
||||
) -> HistoricalSummary:
|
||||
"""Generate a historical summary of an entity's evolution.
|
||||
|
||||
Args:
|
||||
entity: The entity to summarize
|
||||
start_time: Start of the time range (ISO 8601)
|
||||
end_time: End of the time range (ISO 8601)
|
||||
|
||||
Returns:
|
||||
HistoricalSummary containing the entity's evolution
|
||||
"""
|
||||
# Get all facts for the entity in the time range
|
||||
initial_state = self.store.query_at_time(start_time, subject=entity)
|
||||
final_state = self.store.query_at_time(end_time, subject=entity)
|
||||
changes = self.store.get_entity_changes(entity, start_time, end_time)
|
||||
|
||||
# Build evolution timeline
|
||||
evolution_timeline = []
|
||||
seen_predicates = set()
|
||||
|
||||
for triple in changes:
|
||||
if triple.predicate not in seen_predicates:
|
||||
seen_predicates.add(triple.predicate)
|
||||
evolution_timeline.append(FactChange(
|
||||
change_type=ChangeType.ADDED,
|
||||
subject=triple.subject,
|
||||
predicate=triple.predicate,
|
||||
old_value=None,
|
||||
new_value=triple.object,
|
||||
timestamp=triple.timestamp,
|
||||
version=triple.version
|
||||
))
|
||||
else:
|
||||
# Find previous value
|
||||
prev = [t for t in changes
|
||||
if t.predicate == triple.predicate
|
||||
and t.timestamp < triple.timestamp]
|
||||
old_value = prev[-1].object if prev else None
|
||||
|
||||
evolution_timeline.append(FactChange(
|
||||
change_type=ChangeType.MODIFIED,
|
||||
subject=triple.subject,
|
||||
predicate=triple.predicate,
|
||||
old_value=old_value,
|
||||
new_value=triple.object,
|
||||
timestamp=triple.timestamp,
|
||||
version=triple.version
|
||||
))
|
||||
|
||||
# Extract key facts (predicates that changed most)
|
||||
key_facts = []
|
||||
predicate_changes = {}
|
||||
for change in evolution_timeline:
|
||||
predicate_changes[change.predicate] = (
|
||||
predicate_changes.get(change.predicate, 0) + 1
|
||||
)
|
||||
|
||||
top_predicates = sorted(
|
||||
predicate_changes.items(),
|
||||
key=lambda x: x[1],
|
||||
reverse=True
|
||||
)[:5]
|
||||
|
||||
for pred, count in top_predicates:
|
||||
current = [t for t in final_state if t.predicate == pred]
|
||||
if current:
|
||||
key_facts.append({
|
||||
"predicate": pred,
|
||||
"current_value": current[0].object,
|
||||
"changes": count
|
||||
})
|
||||
|
||||
# Build current state
|
||||
current_state = [
|
||||
{
|
||||
"predicate": t.predicate,
|
||||
"object": t.object,
|
||||
"valid_from": t.valid_from,
|
||||
"valid_until": t.valid_until
|
||||
}
|
||||
for t in final_state
|
||||
]
|
||||
|
||||
return HistoricalSummary(
|
||||
entity=entity,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
total_changes=len(evolution_timeline),
|
||||
key_facts=key_facts,
|
||||
evolution_timeline=evolution_timeline,
|
||||
current_state=current_state
|
||||
)
|
||||
|
||||
def infer_temporal_relationship(
|
||||
self,
|
||||
fact_a: TemporalTriple,
|
||||
fact_b: TemporalTriple
|
||||
) -> Optional[str]:
|
||||
"""Infer temporal relationship between two facts.
|
||||
|
||||
Args:
|
||||
fact_a: First fact
|
||||
fact_b: Second fact
|
||||
|
||||
Returns:
|
||||
Description of temporal relationship, or None
|
||||
"""
|
||||
a_start = datetime.fromisoformat(fact_a.valid_from)
|
||||
a_end = datetime.fromisoformat(fact_a.valid_until) if fact_a.valid_until else None
|
||||
b_start = datetime.fromisoformat(fact_b.valid_from)
|
||||
b_end = datetime.fromisoformat(fact_b.valid_until) if fact_b.valid_until else None
|
||||
|
||||
# Check if A happened before B
|
||||
if a_end and a_end <= b_start:
|
||||
return "A happened before B"
|
||||
|
||||
# Check if B happened before A
|
||||
if b_end and b_end <= a_start:
|
||||
return "B happened before A"
|
||||
|
||||
# Check if they overlap
|
||||
if a_end and b_end:
|
||||
if a_start <= b_end and b_start <= a_end:
|
||||
return "A and B overlap in time"
|
||||
|
||||
# Check if one supersedes the other
|
||||
if fact_a.superseded_by == fact_b.id:
|
||||
return "B supersedes A"
|
||||
if fact_b.superseded_by == fact_a.id:
|
||||
return "A supersedes B"
|
||||
|
||||
return "A and B are temporally unrelated"
|
||||
|
||||
def get_worldview_at_time(
|
||||
self,
|
||||
timestamp: str,
|
||||
subjects: Optional[List[str]] = None
|
||||
) -> Dict[str, List[Dict[str, Any]]]:
|
||||
"""Get Timmy's complete worldview at a specific point in time.
|
||||
|
||||
Args:
|
||||
timestamp: The point in time (ISO 8601)
|
||||
subjects: Optional list of subjects to include. If None, includes all.
|
||||
|
||||
Returns:
|
||||
Dictionary mapping subjects to their facts at that time
|
||||
"""
|
||||
worldview = {}
|
||||
|
||||
if subjects:
|
||||
for subject in subjects:
|
||||
facts = self.store.query_at_time(timestamp, subject=subject)
|
||||
if facts:
|
||||
worldview[subject] = [
|
||||
{
|
||||
"predicate": f.predicate,
|
||||
"object": f.object,
|
||||
"version": f.version
|
||||
}
|
||||
for f in facts
|
||||
]
|
||||
else:
|
||||
# Get all facts at that time
|
||||
all_facts = self.store.query_at_time(timestamp)
|
||||
for fact in all_facts:
|
||||
if fact.subject not in worldview:
|
||||
worldview[fact.subject] = []
|
||||
worldview[fact.subject].append({
|
||||
"predicate": fact.predicate,
|
||||
"object": fact.object,
|
||||
"version": fact.version
|
||||
})
|
||||
|
||||
return worldview
|
||||
|
||||
def find_knowledge_gaps(
|
||||
self,
|
||||
subject: str,
|
||||
expected_predicates: List[str]
|
||||
) -> List[str]:
|
||||
"""Find predicates that are missing or have expired for a subject.
|
||||
|
||||
Args:
|
||||
subject: The entity to check
|
||||
expected_predicates: List of predicates that should exist
|
||||
|
||||
Returns:
|
||||
List of missing predicate names
|
||||
"""
|
||||
now = datetime.now().isoformat()
|
||||
current_facts = self.store.query_at_time(now, subject=subject)
|
||||
current_predicates = {f.predicate for f in current_facts}
|
||||
|
||||
return [
|
||||
pred for pred in expected_predicates
|
||||
if pred not in current_predicates
|
||||
]
|
||||
|
||||
def export_reasoning_report(
|
||||
self,
|
||||
entity: str,
|
||||
start_time: str,
|
||||
end_time: str
|
||||
) -> str:
|
||||
"""Generate a human-readable reasoning report.
|
||||
|
||||
Args:
|
||||
entity: The entity to report on
|
||||
start_time: Start of the time range
|
||||
end_time: End of the time range
|
||||
|
||||
Returns:
|
||||
Formatted report string
|
||||
"""
|
||||
summary = self.generate_temporal_summary(entity, start_time, end_time)
|
||||
|
||||
report = f"""
|
||||
# Temporal Reasoning Report: {entity}
|
||||
|
||||
## Time Range
|
||||
- From: {start_time}
|
||||
- To: {end_time}
|
||||
|
||||
## Summary
|
||||
- Total Changes: {summary.total_changes}
|
||||
- Key Facts Tracked: {len(summary.key_facts)}
|
||||
|
||||
## Key Facts
|
||||
"""
|
||||
for fact in summary.key_facts:
|
||||
report += f"- **{fact['predicate']}**: {fact['current_value']} ({fact['changes']} changes)\n"
|
||||
|
||||
report += "\n## Evolution Timeline\n"
|
||||
for change in summary.evolution_timeline[:10]: # Show first 10
|
||||
report += f"- [{change.timestamp}] {change.change_type.value}: {change.predicate}\n"
|
||||
if change.old_value:
|
||||
report += f" - Changed from: {change.old_value}\n"
|
||||
report += f" - Changed to: {change.new_value}\n"
|
||||
|
||||
if len(summary.evolution_timeline) > 10:
|
||||
report += f"\n... and {len(summary.evolution_timeline) - 10} more changes\n"
|
||||
|
||||
report += "\n## Current State\n"
|
||||
for state in summary.current_state:
|
||||
report += f"- {state['predicate']}: {state['object']}\n"
|
||||
|
||||
return report
|
||||
473
tests/test_temporal_kg.py
Normal file
473
tests/test_temporal_kg.py
Normal file
@@ -0,0 +1,473 @@
|
||||
"""Tests for Temporal Knowledge Graph implementation.
|
||||
|
||||
Tests cover:
|
||||
- Temporal storage tests
|
||||
- Query operator tests (BEFORE, AFTER, DURING, OVERLAPS)
|
||||
- Historical summary tests
|
||||
- Integration with tools
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import tempfile
|
||||
import os
|
||||
from datetime import datetime, timedelta
|
||||
from agent.temporal_knowledge_graph import (
|
||||
TemporalTripleStore, TemporalTriple, TemporalOperator
|
||||
)
|
||||
from agent.temporal_reasoning import (
|
||||
TemporalReasoner, ChangeType, HistoricalSummary
|
||||
)
|
||||
from tools.temporal_kg_tool import (
|
||||
store_fact_with_time,
|
||||
query_historical_state,
|
||||
get_fact_history,
|
||||
generate_temporal_summary,
|
||||
when_did_we_learn,
|
||||
how_has_it_changed,
|
||||
query_with_temporal_operator,
|
||||
get_worldview_at_time
|
||||
)
|
||||
|
||||
|
||||
class TestTemporalTripleStore:
|
||||
"""Tests for the TemporalTripleStore class."""
|
||||
|
||||
@pytest.fixture
|
||||
def store(self):
|
||||
"""Create a temporary store for testing."""
|
||||
with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
|
||||
db_path = f.name
|
||||
store = TemporalTripleStore(db_path)
|
||||
yield store
|
||||
# Cleanup
|
||||
os.unlink(db_path)
|
||||
|
||||
def test_store_fact(self, store):
|
||||
"""Test storing a basic fact."""
|
||||
triple = store.store_fact("Timmy", "has_feature", "sovereignty")
|
||||
|
||||
assert triple.subject == "Timmy"
|
||||
assert triple.predicate == "has_feature"
|
||||
assert triple.object == "sovereignty"
|
||||
assert triple.version == 1
|
||||
assert triple.valid_until is None
|
||||
|
||||
def test_store_fact_with_validity_period(self, store):
|
||||
"""Test storing a fact with validity bounds."""
|
||||
valid_from = "2026-01-01T00:00:00"
|
||||
valid_until = "2026-12-31T23:59:59"
|
||||
|
||||
triple = store.store_fact(
|
||||
"Hermes",
|
||||
"status",
|
||||
"active",
|
||||
valid_from=valid_from,
|
||||
valid_until=valid_until
|
||||
)
|
||||
|
||||
assert triple.valid_from == valid_from
|
||||
assert triple.valid_until == valid_until
|
||||
|
||||
def test_fact_versioning(self, store):
|
||||
"""Test that facts are properly versioned."""
|
||||
# Store initial fact
|
||||
triple1 = store.store_fact("Timmy", "version", "1.0")
|
||||
assert triple1.version == 1
|
||||
|
||||
# Store updated fact
|
||||
triple2 = store.store_fact("Timmy", "version", "2.0")
|
||||
assert triple2.version == 2
|
||||
|
||||
# Check that first fact was superseded
|
||||
history = store.get_fact_history("Timmy", "version")
|
||||
assert len(history) == 2
|
||||
assert history[0].superseded_by == triple2.id
|
||||
|
||||
def test_query_at_time(self, store):
|
||||
"""Test querying facts at a specific time."""
|
||||
# Store facts at different times
|
||||
store.store_fact("Timmy", "status", "alpha", valid_from="2026-01-01T00:00:00")
|
||||
store.store_fact("Timmy", "status", "beta", valid_from="2026-03-01T00:00:00")
|
||||
store.store_fact("Timmy", "status", "stable", valid_from="2026-06-01T00:00:00")
|
||||
|
||||
# Query at different points
|
||||
feb_facts = store.query_at_time("2026-02-01T00:00:00", subject="Timmy")
|
||||
assert len(feb_facts) == 1
|
||||
assert feb_facts[0].object == "alpha"
|
||||
|
||||
may_facts = store.query_at_time("2026-05-01T00:00:00", subject="Timmy")
|
||||
assert len(may_facts) == 1
|
||||
assert may_facts[0].object == "beta"
|
||||
|
||||
jul_facts = store.query_at_time("2026-07-01T00:00:00", subject="Timmy")
|
||||
assert len(jul_facts) == 1
|
||||
assert jul_facts[0].object == "stable"
|
||||
|
||||
def test_query_temporal_operators(self, store):
|
||||
"""Test temporal query operators."""
|
||||
# Store some facts
|
||||
store.store_fact("A", "rel", "1", valid_from="2026-01-01T00:00:00")
|
||||
store.store_fact("B", "rel", "2", valid_from="2026-03-01T00:00:00")
|
||||
store.store_fact("C", "rel", "3", valid_from="2026-06-01T00:00:00")
|
||||
|
||||
# Test BEFORE
|
||||
before_april = store.query_temporal(
|
||||
TemporalOperator.BEFORE, "2026-04-01T00:00:00"
|
||||
)
|
||||
assert len(before_april) == 2 # A and B
|
||||
|
||||
# Test AFTER
|
||||
after_feb = store.query_temporal(
|
||||
TemporalOperator.AFTER, "2026-02-01T00:00:00"
|
||||
)
|
||||
assert len(after_feb) == 2 # B and C
|
||||
|
||||
# Test DURING (at a specific time)
|
||||
during_may = store.query_temporal(
|
||||
TemporalOperator.DURING, "2026-05-01T00:00:00"
|
||||
)
|
||||
assert len(during_may) == 1 # Only B is valid in May
|
||||
assert during_may[0].object == "2"
|
||||
|
||||
def test_get_fact_history(self, store):
|
||||
"""Test retrieving fact version history."""
|
||||
# Create multiple versions
|
||||
store.store_fact("Feature", "status", "planned", valid_from="2026-01-01T00:00:00")
|
||||
store.store_fact("Feature", "status", "in_progress", valid_from="2026-02-01T00:00:00")
|
||||
store.store_fact("Feature", "status", "completed", valid_from="2026-03-01T00:00:00")
|
||||
|
||||
history = store.get_fact_history("Feature", "status")
|
||||
|
||||
assert len(history) == 3
|
||||
assert history[0].object == "planned"
|
||||
assert history[1].object == "in_progress"
|
||||
assert history[2].object == "completed"
|
||||
|
||||
# Check versions
|
||||
assert history[0].version == 1
|
||||
assert history[1].version == 2
|
||||
assert history[2].version == 3
|
||||
|
||||
def test_get_entity_changes(self, store):
|
||||
"""Test getting entity changes in a time range."""
|
||||
store.store_fact("Codebase", "feature", "auth", valid_from="2026-01-01T00:00:00")
|
||||
store.store_fact("Codebase", "feature", "logging", valid_from="2026-02-01T00:00:00")
|
||||
store.store_fact("Codebase", "feature", "metrics", valid_from="2026-03-01T00:00:00")
|
||||
|
||||
changes = store.get_entity_changes(
|
||||
"Codebase",
|
||||
"2026-01-15T00:00:00",
|
||||
"2026-03-15T00:00:00"
|
||||
)
|
||||
|
||||
# Should include logging and metrics
|
||||
assert len(changes) >= 2
|
||||
|
||||
def test_export_import(self, store):
|
||||
"""Test exporting and importing data."""
|
||||
# Store some data
|
||||
store.store_fact("Test", "data", "value1")
|
||||
store.store_fact("Test", "data", "value2")
|
||||
|
||||
# Export
|
||||
json_data = store.export_to_json()
|
||||
assert "Test" in json_data
|
||||
assert "value1" in json_data
|
||||
assert "value2" in json_data
|
||||
|
||||
# Create new store and import
|
||||
with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
|
||||
db_path2 = f.name
|
||||
|
||||
try:
|
||||
store2 = TemporalTripleStore(db_path2)
|
||||
store2.import_from_json(json_data)
|
||||
|
||||
# Verify imported data
|
||||
facts = store2.query_at_time(datetime.now().isoformat(), subject="Test")
|
||||
assert len(facts) >= 1
|
||||
finally:
|
||||
os.unlink(db_path2)
|
||||
|
||||
|
||||
class TestTemporalReasoner:
|
||||
"""Tests for the TemporalReasoner class."""
|
||||
|
||||
@pytest.fixture
|
||||
def reasoner(self):
|
||||
"""Create a temporary reasoner for testing."""
|
||||
with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
|
||||
db_path = f.name
|
||||
store = TemporalTripleStore(db_path)
|
||||
reasoner = TemporalReasoner(store)
|
||||
yield reasoner
|
||||
os.unlink(db_path)
|
||||
|
||||
def test_what_did_we_believe(self, reasoner):
|
||||
"""Test "what did we believe" queries."""
|
||||
# Set up facts
|
||||
reasoner.store.store_fact("Timmy", "view", "optimistic", valid_from="2026-01-01T00:00:00")
|
||||
reasoner.store.store_fact("Timmy", "view", "cautious", valid_from="2026-03-01T00:00:00")
|
||||
|
||||
# Query before March
|
||||
beliefs = reasoner.what_did_we_believe("Timmy", "2026-02-15T00:00:00")
|
||||
assert len(beliefs) == 1
|
||||
assert beliefs[0].object == "optimistic"
|
||||
|
||||
def test_when_did_we_learn(self, reasoner):
|
||||
"""Test "when did we learn" queries."""
|
||||
timestamp = "2026-02-15T10:30:00"
|
||||
reasoner.store.store_fact(
|
||||
"MLX",
|
||||
"integrated_with",
|
||||
"Hermes",
|
||||
valid_from=timestamp
|
||||
)
|
||||
|
||||
when = reasoner.when_did_we_learn("MLX", "integrated_with")
|
||||
assert when == timestamp
|
||||
|
||||
def test_how_has_it_changed(self, reasoner):
|
||||
"""Test "how has it changed" queries."""
|
||||
reasoner.store.store_fact("Security", "level", "low", valid_from="2026-01-01T00:00:00")
|
||||
reasoner.store.store_fact("Security", "level", "medium", valid_from="2026-02-01T00:00:00")
|
||||
reasoner.store.store_fact("Security", "level", "high", valid_from="2026-03-01T00:00:00")
|
||||
|
||||
changes = reasoner.how_has_it_changed("Security", "2026-01-15T00:00:00")
|
||||
|
||||
assert len(changes) >= 2
|
||||
# Check that changes are properly categorized
|
||||
change_types = [c.change_type for c in changes]
|
||||
assert ChangeType.MODIFIED in change_types or ChangeType.ADDED in change_types
|
||||
|
||||
def test_generate_temporal_summary(self, reasoner):
|
||||
"""Test generating historical summaries."""
|
||||
# Create a history of changes
|
||||
reasoner.store.store_fact("Project", "status", "planning", valid_from="2026-01-01T00:00:00")
|
||||
reasoner.store.store_fact("Project", "status", "development", valid_from="2026-02-01T00:00:00")
|
||||
reasoner.store.store_fact("Project", "milestone", "alpha", valid_from="2026-02-15T00:00:00")
|
||||
reasoner.store.store_fact("Project", "status", "testing", valid_from="2026-03-01T00:00:00")
|
||||
|
||||
summary = reasoner.generate_temporal_summary(
|
||||
"Project",
|
||||
"2026-01-01T00:00:00",
|
||||
"2026-04-01T00:00:00"
|
||||
)
|
||||
|
||||
assert summary.entity == "Project"
|
||||
assert summary.total_changes >= 3
|
||||
assert len(summary.evolution_timeline) >= 3
|
||||
assert len(summary.current_state) >= 1
|
||||
|
||||
def test_get_worldview_at_time(self, reasoner):
|
||||
"""Test getting complete worldview at a time."""
|
||||
reasoner.store.store_fact("Timmy", "mood", "happy", valid_from="2026-01-01T00:00:00")
|
||||
reasoner.store.store_fact("Timmy", "task", "coding", valid_from="2026-01-01T00:00:00")
|
||||
reasoner.store.store_fact("Hermes", "status", "active", valid_from="2026-01-01T00:00:00")
|
||||
|
||||
worldview = reasoner.get_worldview_at_time("2026-01-15T00:00:00")
|
||||
|
||||
assert "Timmy" in worldview
|
||||
assert "Hermes" in worldview
|
||||
assert len(worldview["Timmy"]) == 2
|
||||
|
||||
def test_infer_temporal_relationship(self, reasoner):
|
||||
"""Test temporal relationship inference."""
|
||||
triple_a = reasoner.store.store_fact("A", "rel", "1", valid_from="2026-01-01T00:00:00")
|
||||
triple_a.valid_until = "2026-02-01T00:00:00"
|
||||
|
||||
triple_b = reasoner.store.store_fact("B", "rel", "2", valid_from="2026-02-15T00:00:00")
|
||||
|
||||
rel = reasoner.infer_temporal_relationship(triple_a, triple_b)
|
||||
assert "before" in rel.lower()
|
||||
|
||||
|
||||
class TestTemporalKGTools:
|
||||
"""Tests for the temporal KG tool functions."""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def reset_singleton(self):
|
||||
"""Reset singleton instances before each test."""
|
||||
import tools.temporal_kg_tool as tool_module
|
||||
tool_module._store = None
|
||||
tool_module._reasoner = None
|
||||
yield
|
||||
tool_module._store = None
|
||||
tool_module._reasoner = None
|
||||
|
||||
def test_store_fact_with_time(self):
|
||||
"""Test the store_fact_with_time tool function."""
|
||||
result = store_fact_with_time(
|
||||
subject="Hermes Agent",
|
||||
predicate="has_feature",
|
||||
object="input_sanitizer",
|
||||
valid_from="2026-04-01T01:00:00"
|
||||
)
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["triple"]["subject"] == "Hermes Agent"
|
||||
assert result["triple"]["predicate"] == "has_feature"
|
||||
assert result["triple"]["object"] == "input_sanitizer"
|
||||
|
||||
def test_query_historical_state(self):
|
||||
"""Test the query_historical_state tool function."""
|
||||
# Store a fact first
|
||||
store_fact_with_time(
|
||||
subject="Timmy",
|
||||
predicate="view_on_sovereignty",
|
||||
object="strong",
|
||||
valid_from="2026-02-01T00:00:00"
|
||||
)
|
||||
|
||||
# Query it
|
||||
result = query_historical_state("Timmy", "2026-03-01T00:00:00")
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["subject"] == "Timmy"
|
||||
assert result["fact_count"] == 1
|
||||
assert result["facts"][0]["object"] == "strong"
|
||||
|
||||
def test_get_fact_history(self):
|
||||
"""Test the get_fact_history tool function."""
|
||||
# Create version history
|
||||
store_fact_with_time("Feature", "status", "planned", valid_from="2026-01-01T00:00:00")
|
||||
store_fact_with_time("Feature", "status", "done", valid_from="2026-02-01T00:00:00")
|
||||
|
||||
result = get_fact_history("Feature", "status")
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["version_count"] == 2
|
||||
assert len(result["versions"]) == 2
|
||||
|
||||
def test_when_did_we_learn(self):
|
||||
"""Test the when_did_we_learn tool function."""
|
||||
store_fact_with_time(
|
||||
"MLX",
|
||||
"integrated_with",
|
||||
"Hermes",
|
||||
valid_from="2026-03-15T12:00:00"
|
||||
)
|
||||
|
||||
result = when_did_we_learn("MLX", "integrated_with")
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["first_known"] == "2026-03-15T12:00:00"
|
||||
|
||||
def test_how_has_it_changed(self):
|
||||
"""Test the how_has_it_changed tool function."""
|
||||
store_fact_with_time("Codebase", "feature_count", "10", valid_from="2026-01-01T00:00:00")
|
||||
store_fact_with_time("Codebase", "feature_count", "20", valid_from="2026-02-01T00:00:00")
|
||||
|
||||
result = how_has_it_changed("Codebase", "2026-01-15T00:00:00")
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["change_count"] >= 1
|
||||
|
||||
def test_query_with_temporal_operator(self):
|
||||
"""Test the query_with_temporal_operator tool function."""
|
||||
store_fact_with_time("A", "rel", "1", valid_from="2026-01-01T00:00:00")
|
||||
store_fact_with_time("B", "rel", "2", valid_from="2026-03-01T00:00:00")
|
||||
|
||||
result = query_with_temporal_operator("BEFORE", "2026-02-01T00:00:00")
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["fact_count"] == 1
|
||||
assert result["facts"][0]["subject"] == "A"
|
||||
|
||||
def test_get_worldview_at_time(self):
|
||||
"""Test the get_worldview_at_time tool function."""
|
||||
store_fact_with_time("Timmy", "mood", "good", valid_from="2026-01-01T00:00:00")
|
||||
store_fact_with_time("Hermes", "status", "running", valid_from="2026-01-01T00:00:00")
|
||||
|
||||
result = get_worldview_at_time("2026-01-15T00:00:00")
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["entity_count"] == 2
|
||||
|
||||
def test_generate_temporal_summary(self):
|
||||
"""Test the generate_temporal_summary tool function."""
|
||||
store_fact_with_time("Security", "level", "low", valid_from="2026-01-01T00:00:00")
|
||||
store_fact_with_time("Security", "level", "high", valid_from="2026-03-01T00:00:00")
|
||||
|
||||
result = generate_temporal_summary("Security", "2026-01-01T00:00:00", "2026-04-01T00:00:00")
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["entity"] == "Security"
|
||||
assert result["summary"]["total_changes"] >= 1
|
||||
|
||||
|
||||
class TestIntegration:
|
||||
"""Integration tests for the complete temporal KG system."""
|
||||
|
||||
@pytest.fixture
|
||||
def system(self):
|
||||
"""Create a complete temporal KG system."""
|
||||
with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
|
||||
db_path = f.name
|
||||
|
||||
store = TemporalTripleStore(db_path)
|
||||
reasoner = TemporalReasoner(store)
|
||||
|
||||
yield {"store": store, "reasoner": reasoner}
|
||||
|
||||
os.unlink(db_path)
|
||||
|
||||
def test_full_workflow(self, system):
|
||||
"""Test a complete temporal knowledge workflow."""
|
||||
store = system["store"]
|
||||
reasoner = system["reasoner"]
|
||||
|
||||
# 1. Store initial facts about a security audit
|
||||
store.store_fact("SecurityAudit", "status", "scheduled", valid_from="2026-01-01T00:00:00")
|
||||
store.store_fact("SecurityAudit", "auditor", "ExternalFirm", valid_from="2026-01-01T00:00:00")
|
||||
|
||||
# 2. Update as audit progresses
|
||||
store.store_fact("SecurityAudit", "status", "in_progress", valid_from="2026-02-01T00:00:00")
|
||||
store.store_fact("SecurityAudit", "findings", "none_yet", valid_from="2026-02-01T00:00:00")
|
||||
|
||||
# 3. Complete audit
|
||||
store.store_fact("SecurityAudit", "status", "completed", valid_from="2026-03-01T00:00:00")
|
||||
store.store_fact("SecurityAudit", "findings", "5_minor_issues", valid_from="2026-03-01T00:00:00")
|
||||
store.store_fact("SecurityAudit", "recommendation", "address_within_30_days", valid_from="2026-03-01T00:00:00")
|
||||
|
||||
# 4. Query historical state
|
||||
jan_state = reasoner.get_worldview_at_time("2026-01-15T00:00:00", ["SecurityAudit"])
|
||||
assert jan_state["SecurityAudit"][0]["predicate"] == "status"
|
||||
assert jan_state["SecurityAudit"][0]["object"] == "scheduled"
|
||||
|
||||
feb_state = reasoner.get_worldview_at_time("2026-02-15T00:00:00", ["SecurityAudit"])
|
||||
status_fact = [f for f in feb_state["SecurityAudit"] if f["predicate"] == "status"][0]
|
||||
assert status_fact["object"] == "in_progress"
|
||||
|
||||
# 5. Generate summary
|
||||
summary = reasoner.generate_temporal_summary(
|
||||
"SecurityAudit",
|
||||
"2026-01-01T00:00:00",
|
||||
"2026-04-01T00:00:00"
|
||||
)
|
||||
|
||||
assert summary.total_changes >= 5
|
||||
assert any(f["predicate"] == "status" for f in summary.key_facts)
|
||||
|
||||
# 6. Check when we learned about findings
|
||||
when = reasoner.when_did_we_learn("SecurityAudit", "findings")
|
||||
assert when is not None
|
||||
|
||||
def test_temporal_inference(self, system):
|
||||
"""Test temporal inference capabilities."""
|
||||
store = system["store"]
|
||||
reasoner = system["reasoner"]
|
||||
|
||||
# Store facts with temporal relationships
|
||||
triple_a = store.store_fact("EventA", "happened", "yes", valid_from="2026-01-01T00:00:00")
|
||||
triple_a.valid_until = "2026-01-31T23:59:59"
|
||||
|
||||
triple_b = store.store_fact("EventB", "happened", "yes", valid_from="2026-02-01T00:00:00")
|
||||
|
||||
# Infer relationship
|
||||
rel = reasoner.infer_temporal_relationship(triple_a, triple_b)
|
||||
assert "before" in rel.lower()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
491
tools/temporal_kg_tool.py
Normal file
491
tools/temporal_kg_tool.py
Normal file
@@ -0,0 +1,491 @@
|
||||
"""Temporal Knowledge Graph Tool for Hermes Agent.
|
||||
|
||||
Provides tool functions for storing and querying temporal facts,
|
||||
enabling Timmy to track how knowledge evolves over time.
|
||||
|
||||
Functions:
|
||||
- store_fact_with_time: Store a fact with temporal bounds
|
||||
- query_historical_state: Query facts valid at a specific time
|
||||
- get_fact_history: Get the version history of a fact
|
||||
- generate_temporal_summary: Generate a historical summary
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import List, Dict, Any, Optional
|
||||
from datetime import datetime
|
||||
|
||||
from agent.temporal_knowledge_graph import TemporalTripleStore, TemporalOperator
|
||||
from agent.temporal_reasoning import TemporalReasoner
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Global instances (singleton pattern)
|
||||
_store: Optional[TemporalTripleStore] = None
|
||||
_reasoner: Optional[TemporalReasoner] = None
|
||||
|
||||
|
||||
def _get_store() -> TemporalTripleStore:
|
||||
"""Get or create the temporal triple store singleton."""
|
||||
global _store
|
||||
if _store is None:
|
||||
_store = TemporalTripleStore()
|
||||
return _store
|
||||
|
||||
|
||||
def _get_reasoner() -> TemporalReasoner:
|
||||
"""Get or create the temporal reasoner singleton."""
|
||||
global _reasoner
|
||||
if _reasoner is None:
|
||||
_reasoner = TemporalReasoner(_get_store())
|
||||
return _reasoner
|
||||
|
||||
|
||||
def store_fact_with_time(
|
||||
subject: str,
|
||||
predicate: str,
|
||||
object: str,
|
||||
valid_from: Optional[str] = None,
|
||||
valid_until: Optional[str] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""Store a fact with temporal metadata.
|
||||
|
||||
Args:
|
||||
subject: The subject of the fact (e.g., "Hermes Agent")
|
||||
predicate: The predicate/relationship (e.g., "has_feature")
|
||||
object: The object/value (e.g., "input_sanitizer")
|
||||
valid_from: When this fact becomes valid (ISO 8601). Defaults to now.
|
||||
valid_until: When this fact expires (ISO 8601). None means still valid.
|
||||
|
||||
Returns:
|
||||
Dictionary containing the stored triple details
|
||||
|
||||
Example:
|
||||
>>> store_fact_with_time(
|
||||
... subject="Hermes Agent",
|
||||
... predicate="has_feature",
|
||||
... object="input_sanitizer",
|
||||
... valid_from="2026-04-01T01:00:00"
|
||||
... )
|
||||
"""
|
||||
try:
|
||||
store = _get_store()
|
||||
triple = store.store_fact(subject, predicate, object, valid_from, valid_until)
|
||||
|
||||
logger.info(f"Stored temporal fact: {subject} {predicate} {object}")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"triple": {
|
||||
"id": triple.id,
|
||||
"subject": triple.subject,
|
||||
"predicate": triple.predicate,
|
||||
"object": triple.object,
|
||||
"valid_from": triple.valid_from,
|
||||
"valid_until": triple.valid_until,
|
||||
"timestamp": triple.timestamp,
|
||||
"version": triple.version
|
||||
}
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to store temporal fact: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
|
||||
def query_historical_state(
|
||||
subject: str,
|
||||
timestamp: str,
|
||||
predicate: Optional[str] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""Query what was known about a subject at a specific point in time.
|
||||
|
||||
Args:
|
||||
subject: The entity to query (e.g., "Timmy")
|
||||
timestamp: The point in time (ISO 8601, e.g., "2026-03-01T00:00:00")
|
||||
predicate: Optional predicate filter
|
||||
|
||||
Returns:
|
||||
Dictionary containing the facts valid at that time
|
||||
|
||||
Example:
|
||||
>>> query_historical_state("Timmy", "2026-03-01T00:00:00")
|
||||
# Returns facts valid at that time
|
||||
"""
|
||||
try:
|
||||
store = _get_store()
|
||||
facts = store.query_at_time(timestamp, subject=subject, predicate=predicate)
|
||||
|
||||
logger.info(f"Queried historical state for {subject} at {timestamp}: {len(facts)} facts")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"subject": subject,
|
||||
"timestamp": timestamp,
|
||||
"fact_count": len(facts),
|
||||
"facts": [
|
||||
{
|
||||
"predicate": f.predicate,
|
||||
"object": f.object,
|
||||
"valid_from": f.valid_from,
|
||||
"valid_until": f.valid_until,
|
||||
"version": f.version
|
||||
}
|
||||
for f in facts
|
||||
]
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to query historical state: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
|
||||
def get_fact_history(
|
||||
subject: str,
|
||||
predicate: str
|
||||
) -> Dict[str, Any]:
|
||||
"""Get the complete version history of a fact.
|
||||
|
||||
Args:
|
||||
subject: The subject to query
|
||||
predicate: The predicate to query
|
||||
|
||||
Returns:
|
||||
Dictionary containing the version history
|
||||
|
||||
Example:
|
||||
>>> get_fact_history("Timmy", "view_on_sovereignty")
|
||||
# Returns all versions of this fact
|
||||
"""
|
||||
try:
|
||||
store = _get_store()
|
||||
history = store.get_fact_history(subject, predicate)
|
||||
|
||||
logger.info(f"Retrieved history for {subject} {predicate}: {len(history)} versions")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"subject": subject,
|
||||
"predicate": predicate,
|
||||
"version_count": len(history),
|
||||
"versions": [
|
||||
{
|
||||
"object": h.object,
|
||||
"valid_from": h.valid_from,
|
||||
"valid_until": h.valid_until,
|
||||
"timestamp": h.timestamp,
|
||||
"version": h.version,
|
||||
"superseded_by": h.superseded_by
|
||||
}
|
||||
for h in history
|
||||
]
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get fact history: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
|
||||
def generate_temporal_summary(
|
||||
entity: str,
|
||||
start_time: str,
|
||||
end_time: str
|
||||
) -> Dict[str, Any]:
|
||||
"""Generate a historical summary of an entity's evolution.
|
||||
|
||||
Args:
|
||||
entity: The entity to summarize (e.g., "security_audit")
|
||||
start_time: Start of time range (ISO 8601)
|
||||
end_time: End of time range (ISO 8601)
|
||||
|
||||
Returns:
|
||||
Dictionary containing the historical summary
|
||||
|
||||
Example:
|
||||
>>> generate_temporal_summary("security_audit", "2026-03-01", "2026-04-01")
|
||||
# Returns evolution of security posture
|
||||
"""
|
||||
try:
|
||||
reasoner = _get_reasoner()
|
||||
summary = reasoner.generate_temporal_summary(entity, start_time, end_time)
|
||||
|
||||
logger.info(f"Generated temporal summary for {entity}: {summary.total_changes} changes")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"entity": entity,
|
||||
"start_time": start_time,
|
||||
"end_time": end_time,
|
||||
"summary": summary.to_dict()
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to generate temporal summary: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
|
||||
def when_did_we_learn(
|
||||
subject: str,
|
||||
predicate: Optional[str] = None,
|
||||
object: Optional[str] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""Query when we first learned about something.
|
||||
|
||||
Args:
|
||||
subject: The subject to search for
|
||||
predicate: Optional predicate filter
|
||||
object: Optional object filter
|
||||
|
||||
Returns:
|
||||
Dictionary containing the timestamp of first knowledge
|
||||
|
||||
Example:
|
||||
>>> when_did_we_learn("MLX", predicate="integrated_with")
|
||||
# Returns when MLX integration was first recorded
|
||||
"""
|
||||
try:
|
||||
reasoner = _get_reasoner()
|
||||
timestamp = reasoner.when_did_we_learn(subject, predicate, object)
|
||||
|
||||
if timestamp:
|
||||
logger.info(f"Found first knowledge of {subject} at {timestamp}")
|
||||
return {
|
||||
"success": True,
|
||||
"subject": subject,
|
||||
"predicate": predicate,
|
||||
"object": object,
|
||||
"first_known": timestamp
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"success": True,
|
||||
"subject": subject,
|
||||
"predicate": predicate,
|
||||
"object": object,
|
||||
"first_known": None,
|
||||
"message": "No knowledge found for this subject"
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to query when we learned: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
|
||||
def how_has_it_changed(
|
||||
subject: str,
|
||||
since_time: str
|
||||
) -> Dict[str, Any]:
|
||||
"""Query how something has changed since a specific time.
|
||||
|
||||
Args:
|
||||
subject: The entity to analyze
|
||||
since_time: The starting time (ISO 8601)
|
||||
|
||||
Returns:
|
||||
Dictionary containing the list of changes
|
||||
|
||||
Example:
|
||||
>>> how_has_it_changed("codebase", "2026-03-01T00:00:00")
|
||||
# Returns changes since the security audit
|
||||
"""
|
||||
try:
|
||||
reasoner = _get_reasoner()
|
||||
changes = reasoner.how_has_it_changed(subject, since_time)
|
||||
|
||||
logger.info(f"Found {len(changes)} changes for {subject} since {since_time}")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"subject": subject,
|
||||
"since_time": since_time,
|
||||
"change_count": len(changes),
|
||||
"changes": [
|
||||
{
|
||||
"change_type": c.change_type.value,
|
||||
"predicate": c.predicate,
|
||||
"old_value": c.old_value,
|
||||
"new_value": c.new_value,
|
||||
"timestamp": c.timestamp,
|
||||
"version": c.version
|
||||
}
|
||||
for c in changes
|
||||
]
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to query changes: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
|
||||
def query_with_temporal_operator(
|
||||
operator: str,
|
||||
timestamp: str,
|
||||
subject: Optional[str] = None,
|
||||
predicate: Optional[str] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""Query using temporal operators (BEFORE, AFTER, DURING, OVERLAPS).
|
||||
|
||||
Args:
|
||||
operator: Temporal operator (BEFORE, AFTER, DURING, OVERLAPS, AT)
|
||||
timestamp: Reference timestamp (ISO 8601)
|
||||
subject: Optional subject filter
|
||||
predicate: Optional predicate filter
|
||||
|
||||
Returns:
|
||||
Dictionary containing matching facts
|
||||
|
||||
Example:
|
||||
>>> query_with_temporal_operator("BEFORE", "2026-04-01T00:00:00", subject="Timmy")
|
||||
# Returns facts about Timmy before April 2026
|
||||
"""
|
||||
try:
|
||||
store = _get_store()
|
||||
|
||||
# Map string to enum
|
||||
op_map = {
|
||||
"BEFORE": TemporalOperator.BEFORE,
|
||||
"AFTER": TemporalOperator.AFTER,
|
||||
"DURING": TemporalOperator.DURING,
|
||||
"OVERLAPS": TemporalOperator.OVERLAPS,
|
||||
"AT": TemporalOperator.AT
|
||||
}
|
||||
|
||||
if operator.upper() not in op_map:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Invalid operator: {operator}. Use BEFORE, AFTER, DURING, OVERLAPS, or AT"
|
||||
}
|
||||
|
||||
op = op_map[operator.upper()]
|
||||
facts = store.query_temporal(op, timestamp, subject, predicate)
|
||||
|
||||
logger.info(f"Queried with operator {operator}: {len(facts)} facts")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"operator": operator,
|
||||
"timestamp": timestamp,
|
||||
"subject": subject,
|
||||
"predicate": predicate,
|
||||
"fact_count": len(facts),
|
||||
"facts": [
|
||||
{
|
||||
"subject": f.subject,
|
||||
"predicate": f.predicate,
|
||||
"object": f.object,
|
||||
"valid_from": f.valid_from,
|
||||
"valid_until": f.valid_until,
|
||||
"version": f.version
|
||||
}
|
||||
for f in facts
|
||||
]
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to query with temporal operator: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
|
||||
def get_worldview_at_time(
|
||||
timestamp: str,
|
||||
subjects: Optional[List[str]] = None
|
||||
) -> Dict[str, Any]:
|
||||
"""Get Timmy's complete worldview at a specific point in time.
|
||||
|
||||
Args:
|
||||
timestamp: The point in time (ISO 8601)
|
||||
subjects: Optional list of subjects to include. If None, includes all.
|
||||
|
||||
Returns:
|
||||
Dictionary mapping subjects to their facts at that time
|
||||
|
||||
Example:
|
||||
>>> get_worldview_at_time("2026-03-01T00:00:00", ["Timmy", "Hermes"])
|
||||
"""
|
||||
try:
|
||||
reasoner = _get_reasoner()
|
||||
worldview = reasoner.get_worldview_at_time(timestamp, subjects)
|
||||
|
||||
logger.info(f"Retrieved worldview at {timestamp}: {len(worldview)} entities")
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"timestamp": timestamp,
|
||||
"entity_count": len(worldview),
|
||||
"worldview": worldview
|
||||
}
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get worldview: {e}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
|
||||
# Convenience function for natural language queries
|
||||
def ask_temporal_question(question: str, **kwargs) -> Dict[str, Any]:
|
||||
"""Parse and answer a temporal question.
|
||||
|
||||
This is a higher-level interface that can parse simple temporal questions
|
||||
and route them to the appropriate function.
|
||||
|
||||
Args:
|
||||
question: Natural language temporal question
|
||||
**kwargs: Additional context parameters
|
||||
|
||||
Returns:
|
||||
Dictionary containing the answer
|
||||
|
||||
Example:
|
||||
>>> ask_temporal_question("What was Timmy's view on sovereignty before March 2026?")
|
||||
"""
|
||||
question_lower = question.lower()
|
||||
|
||||
# Simple pattern matching for common question types
|
||||
if "what did we believe" in question_lower or "what was" in question_lower:
|
||||
if "before" in question_lower:
|
||||
# Extract subject and time
|
||||
subject = kwargs.get("subject")
|
||||
before_time = kwargs.get("before_time")
|
||||
if subject and before_time:
|
||||
return query_historical_state(subject, before_time)
|
||||
|
||||
elif "when did we first learn" in question_lower or "when did we learn" in question_lower:
|
||||
subject = kwargs.get("subject")
|
||||
predicate = kwargs.get("predicate")
|
||||
if subject:
|
||||
return when_did_we_learn(subject, predicate)
|
||||
|
||||
elif "how has" in question_lower and "changed" in question_lower:
|
||||
subject = kwargs.get("subject")
|
||||
since_time = kwargs.get("since_time")
|
||||
if subject and since_time:
|
||||
return how_has_it_changed(subject, since_time)
|
||||
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Could not parse temporal question. Use specific function calls instead.",
|
||||
"available_functions": [
|
||||
"store_fact_with_time",
|
||||
"query_historical_state",
|
||||
"get_fact_history",
|
||||
"generate_temporal_summary",
|
||||
"when_did_we_learn",
|
||||
"how_has_it_changed",
|
||||
"query_with_temporal_operator",
|
||||
"get_worldview_at_time"
|
||||
]
|
||||
}
|
||||
Reference in New Issue
Block a user