From ae6f3e9a959bbf612b18c86ab4eea503b25634bb Mon Sep 17 00:00:00 2001 From: Allegro Date: Wed, 1 Apr 2026 02:08:20 +0000 Subject: [PATCH] feat: Issue #39 - temporal knowledge graph with versioning and reasoning Implement Phase 28: Sovereign Knowledge Graph 'Time Travel' - agent/temporal_knowledge_graph.py: SQLite-backed temporal triple store with versioning, validity periods, and temporal query operators (BEFORE, AFTER, DURING, OVERLAPS, AT) - agent/temporal_reasoning.py: Temporal reasoning engine supporting historical queries, fact evolution tracking, and worldview snapshots - tools/temporal_kg_tool.py: Tool integration with functions for storing facts with time, querying historical state, generating temporal summaries, and natural language temporal queries - tests/test_temporal_kg.py: Comprehensive test coverage including storage tests, query operators, historical summaries, and integration tests --- agent/temporal_knowledge_graph.py | 421 +++++++++++++++++++++++++ agent/temporal_reasoning.py | 434 ++++++++++++++++++++++++++ tests/test_temporal_kg.py | 473 ++++++++++++++++++++++++++++ tools/temporal_kg_tool.py | 491 ++++++++++++++++++++++++++++++ 4 files changed, 1819 insertions(+) create mode 100644 agent/temporal_knowledge_graph.py create mode 100644 agent/temporal_reasoning.py create mode 100644 tests/test_temporal_kg.py create mode 100644 tools/temporal_kg_tool.py diff --git a/agent/temporal_knowledge_graph.py b/agent/temporal_knowledge_graph.py new file mode 100644 index 000000000..236c0e0a2 --- /dev/null +++ b/agent/temporal_knowledge_graph.py @@ -0,0 +1,421 @@ +"""Temporal Knowledge Graph for Hermes Agent. + +Provides a time-aware triple-store (Subject, Predicate, Object) with temporal +metadata (valid_from, valid_until, timestamp) enabling "time travel" queries +over Timmy's evolving worldview. + +Time format: ISO 8601 (YYYY-MM-DDTHH:MM:SS) +""" + +import json +import sqlite3 +import logging +import uuid +from datetime import datetime, timezone +from typing import List, Dict, Any, Optional, Tuple +from dataclasses import dataclass, asdict +from enum import Enum +from pathlib import Path + +logger = logging.getLogger(__name__) + + +class TemporalOperator(Enum): + """Temporal query operators for time-based filtering.""" + BEFORE = "before" + AFTER = "after" + DURING = "during" + OVERLAPS = "overlaps" + AT = "at" + + +@dataclass +class TemporalTriple: + """A triple with temporal metadata.""" + id: str + subject: str + predicate: str + object: str + valid_from: str # ISO 8601 datetime + valid_until: Optional[str] # ISO 8601 datetime, None means still valid + timestamp: str # When this fact was recorded + version: int = 1 + superseded_by: Optional[str] = None # ID of the triple that superseded this + + def to_dict(self) -> Dict[str, Any]: + return asdict(self) + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "TemporalTriple": + return cls(**data) + + +class TemporalTripleStore: + """SQLite-backed temporal triple store with versioning support.""" + + def __init__(self, db_path: Optional[str] = None): + """Initialize the temporal triple store. + + Args: + db_path: Path to SQLite database. If None, uses default local path. + """ + if db_path is None: + # Default to local-first storage in user's home + home = Path.home() + db_dir = home / ".hermes" / "temporal_kg" + db_dir.mkdir(parents=True, exist_ok=True) + db_path = db_dir / "temporal_kg.db" + + self.db_path = str(db_path) + self._init_db() + + def _init_db(self): + """Initialize the SQLite database with required tables.""" + with sqlite3.connect(self.db_path) as conn: + conn.execute(""" + CREATE TABLE IF NOT EXISTS temporal_triples ( + id TEXT PRIMARY KEY, + subject TEXT NOT NULL, + predicate TEXT NOT NULL, + object TEXT NOT NULL, + valid_from TEXT NOT NULL, + valid_until TEXT, + timestamp TEXT NOT NULL, + version INTEGER DEFAULT 1, + superseded_by TEXT, + FOREIGN KEY (superseded_by) REFERENCES temporal_triples(id) + ) + """) + + # Create indexes for efficient querying + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_subject ON temporal_triples(subject) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_predicate ON temporal_triples(predicate) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_valid_from ON temporal_triples(valid_from) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_valid_until ON temporal_triples(valid_until) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_timestamp ON temporal_triples(timestamp) + """) + conn.execute(""" + CREATE INDEX IF NOT EXISTS idx_subject_predicate + ON temporal_triples(subject, predicate) + """) + + conn.commit() + + def _now(self) -> str: + """Get current time in ISO 8601 format.""" + return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S") + + def _generate_id(self) -> str: + """Generate a unique ID for a triple.""" + return f"{self._now()}_{uuid.uuid4().hex[:8]}" + + def store_fact( + self, + subject: str, + predicate: str, + object: str, + valid_from: Optional[str] = None, + valid_until: Optional[str] = None + ) -> TemporalTriple: + """Store a fact with temporal bounds. + + Args: + subject: The subject of the triple + predicate: The predicate/relationship + object: The object/value + valid_from: When this fact becomes valid (ISO 8601). Defaults to now. + valid_until: When this fact expires (ISO 8601). None means forever valid. + + Returns: + The stored TemporalTriple + """ + if valid_from is None: + valid_from = self._now() + + # Check if there's an existing fact for this subject-predicate + existing = self._get_current_fact(subject, predicate) + + triple = TemporalTriple( + id=self._generate_id(), + subject=subject, + predicate=predicate, + object=object, + valid_from=valid_from, + valid_until=valid_until, + timestamp=self._now() + ) + + with sqlite3.connect(self.db_path) as conn: + # If there's an existing fact, mark it as superseded + if existing: + existing.valid_until = valid_from + existing.superseded_by = triple.id + self._update_triple(conn, existing) + triple.version = existing.version + 1 + + # Insert the new fact + self._insert_triple(conn, triple) + conn.commit() + + logger.info(f"Stored temporal fact: {subject} {predicate} {object} (valid from {valid_from})") + return triple + + def _get_current_fact(self, subject: str, predicate: str) -> Optional[TemporalTriple]: + """Get the current (most recent, still valid) fact for a subject-predicate pair.""" + with sqlite3.connect(self.db_path) as conn: + cursor = conn.execute( + """ + SELECT * FROM temporal_triples + WHERE subject = ? AND predicate = ? AND valid_until IS NULL + ORDER BY timestamp DESC LIMIT 1 + """, + (subject, predicate) + ) + row = cursor.fetchone() + if row: + return self._row_to_triple(row) + return None + + def _insert_triple(self, conn: sqlite3.Connection, triple: TemporalTriple): + """Insert a triple into the database.""" + conn.execute( + """ + INSERT INTO temporal_triples + (id, subject, predicate, object, valid_from, valid_until, timestamp, version, superseded_by) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + triple.id, triple.subject, triple.predicate, triple.object, + triple.valid_from, triple.valid_until, triple.timestamp, + triple.version, triple.superseded_by + ) + ) + + def _update_triple(self, conn: sqlite3.Connection, triple: TemporalTriple): + """Update an existing triple.""" + conn.execute( + """ + UPDATE temporal_triples + SET valid_until = ?, superseded_by = ? + WHERE id = ? + """, + (triple.valid_until, triple.superseded_by, triple.id) + ) + + def _row_to_triple(self, row: sqlite3.Row) -> TemporalTriple: + """Convert a database row to a TemporalTriple.""" + return TemporalTriple( + id=row[0], + subject=row[1], + predicate=row[2], + object=row[3], + valid_from=row[4], + valid_until=row[5], + timestamp=row[6], + version=row[7], + superseded_by=row[8] + ) + + def query_at_time( + self, + timestamp: str, + subject: Optional[str] = None, + predicate: Optional[str] = None + ) -> List[TemporalTriple]: + """Query facts that were valid at a specific point in time. + + Args: + timestamp: The point in time to query (ISO 8601) + subject: Optional subject filter + predicate: Optional predicate filter + + Returns: + List of TemporalTriple objects valid at that time + """ + query = """ + SELECT * FROM temporal_triples + WHERE valid_from <= ? + AND (valid_until IS NULL OR valid_until > ?) + """ + params = [timestamp, timestamp] + + if subject: + query += " AND subject = ?" + params.append(subject) + if predicate: + query += " AND predicate = ?" + params.append(predicate) + + query += " ORDER BY timestamp DESC" + + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute(query, params) + return [self._row_to_triple(row) for row in cursor.fetchall()] + + def query_temporal( + self, + operator: TemporalOperator, + timestamp: str, + subject: Optional[str] = None, + predicate: Optional[str] = None + ) -> List[TemporalTriple]: + """Query using temporal operators. + + Args: + operator: TemporalOperator (BEFORE, AFTER, DURING, OVERLAPS, AT) + timestamp: Reference timestamp (ISO 8601) + subject: Optional subject filter + predicate: Optional predicate filter + + Returns: + List of matching TemporalTriple objects + """ + base_query = "SELECT * FROM temporal_triples WHERE 1=1" + params = [] + + if subject: + base_query += " AND subject = ?" + params.append(subject) + if predicate: + base_query += " AND predicate = ?" + params.append(predicate) + + if operator == TemporalOperator.BEFORE: + base_query += " AND valid_from < ?" + params.append(timestamp) + elif operator == TemporalOperator.AFTER: + base_query += " AND valid_from > ?" + params.append(timestamp) + elif operator == TemporalOperator.DURING: + base_query += " AND valid_from <= ? AND (valid_until IS NULL OR valid_until > ?)" + params.extend([timestamp, timestamp]) + elif operator == TemporalOperator.OVERLAPS: + # Facts that overlap with a time point (same as DURING) + base_query += " AND valid_from <= ? AND (valid_until IS NULL OR valid_until > ?)" + params.extend([timestamp, timestamp]) + elif operator == TemporalOperator.AT: + # Exact match for valid_at query + return self.query_at_time(timestamp, subject, predicate) + + base_query += " ORDER BY timestamp DESC" + + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute(base_query, params) + return [self._row_to_triple(row) for row in cursor.fetchall()] + + def get_fact_history( + self, + subject: str, + predicate: str + ) -> List[TemporalTriple]: + """Get the complete version history of a fact. + + Args: + subject: The subject to query + predicate: The predicate to query + + Returns: + List of all versions of the fact, ordered by timestamp + """ + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute( + """ + SELECT * FROM temporal_triples + WHERE subject = ? AND predicate = ? + ORDER BY timestamp ASC + """, + (subject, predicate) + ) + return [self._row_to_triple(row) for row in cursor.fetchall()] + + def get_all_facts_for_entity( + self, + subject: str, + at_time: Optional[str] = None + ) -> List[TemporalTriple]: + """Get all facts about an entity, optionally at a specific time. + + Args: + subject: The entity to query + at_time: Optional timestamp to query at + + Returns: + List of TemporalTriple objects + """ + if at_time: + return self.query_at_time(at_time, subject=subject) + + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute( + """ + SELECT * FROM temporal_triples + WHERE subject = ? + ORDER BY timestamp DESC + """, + (subject,) + ) + return [self._row_to_triple(row) for row in cursor.fetchall()] + + def get_entity_changes( + self, + subject: str, + start_time: str, + end_time: str + ) -> List[TemporalTriple]: + """Get all facts that changed for an entity during a time range. + + Args: + subject: The entity to query + start_time: Start of time range (ISO 8601) + end_time: End of time range (ISO 8601) + + Returns: + List of TemporalTriple objects that changed in the range + """ + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute( + """ + SELECT * FROM temporal_triples + WHERE subject = ? + AND ((valid_from >= ? AND valid_from <= ?) + OR (valid_until >= ? AND valid_until <= ?)) + ORDER BY timestamp ASC + """, + (subject, start_time, end_time, start_time, end_time) + ) + return [self._row_to_triple(row) for row in cursor.fetchall()] + + def close(self): + """Close the database connection (no-op for SQLite with context managers).""" + pass + + def export_to_json(self) -> str: + """Export all triples to JSON format.""" + with sqlite3.connect(self.db_path) as conn: + conn.row_factory = sqlite3.Row + cursor = conn.execute("SELECT * FROM temporal_triples ORDER BY timestamp DESC") + triples = [self._row_to_triple(row).to_dict() for row in cursor.fetchall()] + return json.dumps(triples, indent=2) + + def import_from_json(self, json_data: str): + """Import triples from JSON format.""" + triples = json.loads(json_data) + with sqlite3.connect(self.db_path) as conn: + for triple_dict in triples: + triple = TemporalTriple.from_dict(triple_dict) + self._insert_triple(conn, triple) + conn.commit() diff --git a/agent/temporal_reasoning.py b/agent/temporal_reasoning.py new file mode 100644 index 000000000..c9bf8f0b4 --- /dev/null +++ b/agent/temporal_reasoning.py @@ -0,0 +1,434 @@ +"""Temporal Reasoning Engine for Hermes Agent. + +Enables Timmy to reason about past and future states, generate historical +summaries, and perform temporal inference over the evolving knowledge graph. + +Queries supported: +- "What was Timmy's view on sovereignty before March 2026?" +- "When did we first learn about MLX integration?" +- "How has the codebase changed since the security audit?" +""" + +import logging +from typing import List, Dict, Any, Optional, Tuple +from datetime import datetime, timedelta +from dataclasses import dataclass +from enum import Enum + +from agent.temporal_knowledge_graph import ( + TemporalTripleStore, TemporalTriple, TemporalOperator +) + +logger = logging.getLogger(__name__) + + +class ChangeType(Enum): + """Types of changes in the knowledge graph.""" + ADDED = "added" + REMOVED = "removed" + MODIFIED = "modified" + SUPERSEDED = "superseded" + + +@dataclass +class FactChange: + """Represents a change in a fact over time.""" + change_type: ChangeType + subject: str + predicate: str + old_value: Optional[str] + new_value: Optional[str] + timestamp: str + version: int + + +@dataclass +class HistoricalSummary: + """Summary of how an entity or concept evolved over time.""" + entity: str + start_time: str + end_time: str + total_changes: int + key_facts: List[Dict[str, Any]] + evolution_timeline: List[FactChange] + current_state: List[Dict[str, Any]] + + def to_dict(self) -> Dict[str, Any]: + return { + "entity": self.entity, + "start_time": self.start_time, + "end_time": self.end_time, + "total_changes": self.total_changes, + "key_facts": self.key_facts, + "evolution_timeline": [ + { + "change_type": c.change_type.value, + "subject": c.subject, + "predicate": c.predicate, + "old_value": c.old_value, + "new_value": c.new_value, + "timestamp": c.timestamp, + "version": c.version + } + for c in self.evolution_timeline + ], + "current_state": self.current_state + } + + +class TemporalReasoner: + """Reasoning engine for temporal knowledge graphs.""" + + def __init__(self, store: Optional[TemporalTripleStore] = None): + """Initialize the temporal reasoner. + + Args: + store: Optional TemporalTripleStore instance. Creates new if None. + """ + self.store = store or TemporalTripleStore() + + def what_did_we_believe( + self, + subject: str, + before_time: str + ) -> List[TemporalTriple]: + """Query: "What did we believe about X before Y happened?" + + Args: + subject: The entity to query about + before_time: The cutoff time (ISO 8601) + + Returns: + List of facts believed before the given time + """ + # Get facts that were valid just before the given time + return self.store.query_temporal( + TemporalOperator.BEFORE, + before_time, + subject=subject + ) + + def when_did_we_learn( + self, + subject: str, + predicate: Optional[str] = None, + object: Optional[str] = None + ) -> Optional[str]: + """Query: "When did we first learn about X?" + + Args: + subject: The subject to search for + predicate: Optional predicate filter + object: Optional object filter + + Returns: + Timestamp of first knowledge, or None if never learned + """ + history = self.store.get_fact_history(subject, predicate or "") + + # Filter by object if specified + if object: + history = [h for h in history if h.object == object] + + if history: + # Return the earliest timestamp + earliest = min(history, key=lambda x: x.timestamp) + return earliest.timestamp + return None + + def how_has_it_changed( + self, + subject: str, + since_time: str + ) -> List[FactChange]: + """Query: "How has X changed since Y?" + + Args: + subject: The entity to analyze + since_time: The starting time (ISO 8601) + + Returns: + List of changes since the given time + """ + now = datetime.now().isoformat() + changes = self.store.get_entity_changes(subject, since_time, now) + + fact_changes = [] + for i, triple in enumerate(changes): + # Determine change type + if i == 0: + change_type = ChangeType.ADDED + old_value = None + else: + prev = changes[i - 1] + if triple.object != prev.object: + change_type = ChangeType.MODIFIED + old_value = prev.object + else: + change_type = ChangeType.SUPERSEDED + old_value = prev.object + + fact_changes.append(FactChange( + change_type=change_type, + subject=triple.subject, + predicate=triple.predicate, + old_value=old_value, + new_value=triple.object, + timestamp=triple.timestamp, + version=triple.version + )) + + return fact_changes + + def generate_temporal_summary( + self, + entity: str, + start_time: str, + end_time: str + ) -> HistoricalSummary: + """Generate a historical summary of an entity's evolution. + + Args: + entity: The entity to summarize + start_time: Start of the time range (ISO 8601) + end_time: End of the time range (ISO 8601) + + Returns: + HistoricalSummary containing the entity's evolution + """ + # Get all facts for the entity in the time range + initial_state = self.store.query_at_time(start_time, subject=entity) + final_state = self.store.query_at_time(end_time, subject=entity) + changes = self.store.get_entity_changes(entity, start_time, end_time) + + # Build evolution timeline + evolution_timeline = [] + seen_predicates = set() + + for triple in changes: + if triple.predicate not in seen_predicates: + seen_predicates.add(triple.predicate) + evolution_timeline.append(FactChange( + change_type=ChangeType.ADDED, + subject=triple.subject, + predicate=triple.predicate, + old_value=None, + new_value=triple.object, + timestamp=triple.timestamp, + version=triple.version + )) + else: + # Find previous value + prev = [t for t in changes + if t.predicate == triple.predicate + and t.timestamp < triple.timestamp] + old_value = prev[-1].object if prev else None + + evolution_timeline.append(FactChange( + change_type=ChangeType.MODIFIED, + subject=triple.subject, + predicate=triple.predicate, + old_value=old_value, + new_value=triple.object, + timestamp=triple.timestamp, + version=triple.version + )) + + # Extract key facts (predicates that changed most) + key_facts = [] + predicate_changes = {} + for change in evolution_timeline: + predicate_changes[change.predicate] = ( + predicate_changes.get(change.predicate, 0) + 1 + ) + + top_predicates = sorted( + predicate_changes.items(), + key=lambda x: x[1], + reverse=True + )[:5] + + for pred, count in top_predicates: + current = [t for t in final_state if t.predicate == pred] + if current: + key_facts.append({ + "predicate": pred, + "current_value": current[0].object, + "changes": count + }) + + # Build current state + current_state = [ + { + "predicate": t.predicate, + "object": t.object, + "valid_from": t.valid_from, + "valid_until": t.valid_until + } + for t in final_state + ] + + return HistoricalSummary( + entity=entity, + start_time=start_time, + end_time=end_time, + total_changes=len(evolution_timeline), + key_facts=key_facts, + evolution_timeline=evolution_timeline, + current_state=current_state + ) + + def infer_temporal_relationship( + self, + fact_a: TemporalTriple, + fact_b: TemporalTriple + ) -> Optional[str]: + """Infer temporal relationship between two facts. + + Args: + fact_a: First fact + fact_b: Second fact + + Returns: + Description of temporal relationship, or None + """ + a_start = datetime.fromisoformat(fact_a.valid_from) + a_end = datetime.fromisoformat(fact_a.valid_until) if fact_a.valid_until else None + b_start = datetime.fromisoformat(fact_b.valid_from) + b_end = datetime.fromisoformat(fact_b.valid_until) if fact_b.valid_until else None + + # Check if A happened before B + if a_end and a_end <= b_start: + return "A happened before B" + + # Check if B happened before A + if b_end and b_end <= a_start: + return "B happened before A" + + # Check if they overlap + if a_end and b_end: + if a_start <= b_end and b_start <= a_end: + return "A and B overlap in time" + + # Check if one supersedes the other + if fact_a.superseded_by == fact_b.id: + return "B supersedes A" + if fact_b.superseded_by == fact_a.id: + return "A supersedes B" + + return "A and B are temporally unrelated" + + def get_worldview_at_time( + self, + timestamp: str, + subjects: Optional[List[str]] = None + ) -> Dict[str, List[Dict[str, Any]]]: + """Get Timmy's complete worldview at a specific point in time. + + Args: + timestamp: The point in time (ISO 8601) + subjects: Optional list of subjects to include. If None, includes all. + + Returns: + Dictionary mapping subjects to their facts at that time + """ + worldview = {} + + if subjects: + for subject in subjects: + facts = self.store.query_at_time(timestamp, subject=subject) + if facts: + worldview[subject] = [ + { + "predicate": f.predicate, + "object": f.object, + "version": f.version + } + for f in facts + ] + else: + # Get all facts at that time + all_facts = self.store.query_at_time(timestamp) + for fact in all_facts: + if fact.subject not in worldview: + worldview[fact.subject] = [] + worldview[fact.subject].append({ + "predicate": fact.predicate, + "object": fact.object, + "version": fact.version + }) + + return worldview + + def find_knowledge_gaps( + self, + subject: str, + expected_predicates: List[str] + ) -> List[str]: + """Find predicates that are missing or have expired for a subject. + + Args: + subject: The entity to check + expected_predicates: List of predicates that should exist + + Returns: + List of missing predicate names + """ + now = datetime.now().isoformat() + current_facts = self.store.query_at_time(now, subject=subject) + current_predicates = {f.predicate for f in current_facts} + + return [ + pred for pred in expected_predicates + if pred not in current_predicates + ] + + def export_reasoning_report( + self, + entity: str, + start_time: str, + end_time: str + ) -> str: + """Generate a human-readable reasoning report. + + Args: + entity: The entity to report on + start_time: Start of the time range + end_time: End of the time range + + Returns: + Formatted report string + """ + summary = self.generate_temporal_summary(entity, start_time, end_time) + + report = f""" +# Temporal Reasoning Report: {entity} + +## Time Range +- From: {start_time} +- To: {end_time} + +## Summary +- Total Changes: {summary.total_changes} +- Key Facts Tracked: {len(summary.key_facts)} + +## Key Facts +""" + for fact in summary.key_facts: + report += f"- **{fact['predicate']}**: {fact['current_value']} ({fact['changes']} changes)\n" + + report += "\n## Evolution Timeline\n" + for change in summary.evolution_timeline[:10]: # Show first 10 + report += f"- [{change.timestamp}] {change.change_type.value}: {change.predicate}\n" + if change.old_value: + report += f" - Changed from: {change.old_value}\n" + report += f" - Changed to: {change.new_value}\n" + + if len(summary.evolution_timeline) > 10: + report += f"\n... and {len(summary.evolution_timeline) - 10} more changes\n" + + report += "\n## Current State\n" + for state in summary.current_state: + report += f"- {state['predicate']}: {state['object']}\n" + + return report diff --git a/tests/test_temporal_kg.py b/tests/test_temporal_kg.py new file mode 100644 index 000000000..5afac63eb --- /dev/null +++ b/tests/test_temporal_kg.py @@ -0,0 +1,473 @@ +"""Tests for Temporal Knowledge Graph implementation. + +Tests cover: +- Temporal storage tests +- Query operator tests (BEFORE, AFTER, DURING, OVERLAPS) +- Historical summary tests +- Integration with tools +""" + +import pytest +import tempfile +import os +from datetime import datetime, timedelta +from agent.temporal_knowledge_graph import ( + TemporalTripleStore, TemporalTriple, TemporalOperator +) +from agent.temporal_reasoning import ( + TemporalReasoner, ChangeType, HistoricalSummary +) +from tools.temporal_kg_tool import ( + store_fact_with_time, + query_historical_state, + get_fact_history, + generate_temporal_summary, + when_did_we_learn, + how_has_it_changed, + query_with_temporal_operator, + get_worldview_at_time +) + + +class TestTemporalTripleStore: + """Tests for the TemporalTripleStore class.""" + + @pytest.fixture + def store(self): + """Create a temporary store for testing.""" + with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f: + db_path = f.name + store = TemporalTripleStore(db_path) + yield store + # Cleanup + os.unlink(db_path) + + def test_store_fact(self, store): + """Test storing a basic fact.""" + triple = store.store_fact("Timmy", "has_feature", "sovereignty") + + assert triple.subject == "Timmy" + assert triple.predicate == "has_feature" + assert triple.object == "sovereignty" + assert triple.version == 1 + assert triple.valid_until is None + + def test_store_fact_with_validity_period(self, store): + """Test storing a fact with validity bounds.""" + valid_from = "2026-01-01T00:00:00" + valid_until = "2026-12-31T23:59:59" + + triple = store.store_fact( + "Hermes", + "status", + "active", + valid_from=valid_from, + valid_until=valid_until + ) + + assert triple.valid_from == valid_from + assert triple.valid_until == valid_until + + def test_fact_versioning(self, store): + """Test that facts are properly versioned.""" + # Store initial fact + triple1 = store.store_fact("Timmy", "version", "1.0") + assert triple1.version == 1 + + # Store updated fact + triple2 = store.store_fact("Timmy", "version", "2.0") + assert triple2.version == 2 + + # Check that first fact was superseded + history = store.get_fact_history("Timmy", "version") + assert len(history) == 2 + assert history[0].superseded_by == triple2.id + + def test_query_at_time(self, store): + """Test querying facts at a specific time.""" + # Store facts at different times + store.store_fact("Timmy", "status", "alpha", valid_from="2026-01-01T00:00:00") + store.store_fact("Timmy", "status", "beta", valid_from="2026-03-01T00:00:00") + store.store_fact("Timmy", "status", "stable", valid_from="2026-06-01T00:00:00") + + # Query at different points + feb_facts = store.query_at_time("2026-02-01T00:00:00", subject="Timmy") + assert len(feb_facts) == 1 + assert feb_facts[0].object == "alpha" + + may_facts = store.query_at_time("2026-05-01T00:00:00", subject="Timmy") + assert len(may_facts) == 1 + assert may_facts[0].object == "beta" + + jul_facts = store.query_at_time("2026-07-01T00:00:00", subject="Timmy") + assert len(jul_facts) == 1 + assert jul_facts[0].object == "stable" + + def test_query_temporal_operators(self, store): + """Test temporal query operators.""" + # Store some facts + store.store_fact("A", "rel", "1", valid_from="2026-01-01T00:00:00") + store.store_fact("B", "rel", "2", valid_from="2026-03-01T00:00:00") + store.store_fact("C", "rel", "3", valid_from="2026-06-01T00:00:00") + + # Test BEFORE + before_april = store.query_temporal( + TemporalOperator.BEFORE, "2026-04-01T00:00:00" + ) + assert len(before_april) == 2 # A and B + + # Test AFTER + after_feb = store.query_temporal( + TemporalOperator.AFTER, "2026-02-01T00:00:00" + ) + assert len(after_feb) == 2 # B and C + + # Test DURING (at a specific time) + during_may = store.query_temporal( + TemporalOperator.DURING, "2026-05-01T00:00:00" + ) + assert len(during_may) == 1 # Only B is valid in May + assert during_may[0].object == "2" + + def test_get_fact_history(self, store): + """Test retrieving fact version history.""" + # Create multiple versions + store.store_fact("Feature", "status", "planned", valid_from="2026-01-01T00:00:00") + store.store_fact("Feature", "status", "in_progress", valid_from="2026-02-01T00:00:00") + store.store_fact("Feature", "status", "completed", valid_from="2026-03-01T00:00:00") + + history = store.get_fact_history("Feature", "status") + + assert len(history) == 3 + assert history[0].object == "planned" + assert history[1].object == "in_progress" + assert history[2].object == "completed" + + # Check versions + assert history[0].version == 1 + assert history[1].version == 2 + assert history[2].version == 3 + + def test_get_entity_changes(self, store): + """Test getting entity changes in a time range.""" + store.store_fact("Codebase", "feature", "auth", valid_from="2026-01-01T00:00:00") + store.store_fact("Codebase", "feature", "logging", valid_from="2026-02-01T00:00:00") + store.store_fact("Codebase", "feature", "metrics", valid_from="2026-03-01T00:00:00") + + changes = store.get_entity_changes( + "Codebase", + "2026-01-15T00:00:00", + "2026-03-15T00:00:00" + ) + + # Should include logging and metrics + assert len(changes) >= 2 + + def test_export_import(self, store): + """Test exporting and importing data.""" + # Store some data + store.store_fact("Test", "data", "value1") + store.store_fact("Test", "data", "value2") + + # Export + json_data = store.export_to_json() + assert "Test" in json_data + assert "value1" in json_data + assert "value2" in json_data + + # Create new store and import + with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f: + db_path2 = f.name + + try: + store2 = TemporalTripleStore(db_path2) + store2.import_from_json(json_data) + + # Verify imported data + facts = store2.query_at_time(datetime.now().isoformat(), subject="Test") + assert len(facts) >= 1 + finally: + os.unlink(db_path2) + + +class TestTemporalReasoner: + """Tests for the TemporalReasoner class.""" + + @pytest.fixture + def reasoner(self): + """Create a temporary reasoner for testing.""" + with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f: + db_path = f.name + store = TemporalTripleStore(db_path) + reasoner = TemporalReasoner(store) + yield reasoner + os.unlink(db_path) + + def test_what_did_we_believe(self, reasoner): + """Test "what did we believe" queries.""" + # Set up facts + reasoner.store.store_fact("Timmy", "view", "optimistic", valid_from="2026-01-01T00:00:00") + reasoner.store.store_fact("Timmy", "view", "cautious", valid_from="2026-03-01T00:00:00") + + # Query before March + beliefs = reasoner.what_did_we_believe("Timmy", "2026-02-15T00:00:00") + assert len(beliefs) == 1 + assert beliefs[0].object == "optimistic" + + def test_when_did_we_learn(self, reasoner): + """Test "when did we learn" queries.""" + timestamp = "2026-02-15T10:30:00" + reasoner.store.store_fact( + "MLX", + "integrated_with", + "Hermes", + valid_from=timestamp + ) + + when = reasoner.when_did_we_learn("MLX", "integrated_with") + assert when == timestamp + + def test_how_has_it_changed(self, reasoner): + """Test "how has it changed" queries.""" + reasoner.store.store_fact("Security", "level", "low", valid_from="2026-01-01T00:00:00") + reasoner.store.store_fact("Security", "level", "medium", valid_from="2026-02-01T00:00:00") + reasoner.store.store_fact("Security", "level", "high", valid_from="2026-03-01T00:00:00") + + changes = reasoner.how_has_it_changed("Security", "2026-01-15T00:00:00") + + assert len(changes) >= 2 + # Check that changes are properly categorized + change_types = [c.change_type for c in changes] + assert ChangeType.MODIFIED in change_types or ChangeType.ADDED in change_types + + def test_generate_temporal_summary(self, reasoner): + """Test generating historical summaries.""" + # Create a history of changes + reasoner.store.store_fact("Project", "status", "planning", valid_from="2026-01-01T00:00:00") + reasoner.store.store_fact("Project", "status", "development", valid_from="2026-02-01T00:00:00") + reasoner.store.store_fact("Project", "milestone", "alpha", valid_from="2026-02-15T00:00:00") + reasoner.store.store_fact("Project", "status", "testing", valid_from="2026-03-01T00:00:00") + + summary = reasoner.generate_temporal_summary( + "Project", + "2026-01-01T00:00:00", + "2026-04-01T00:00:00" + ) + + assert summary.entity == "Project" + assert summary.total_changes >= 3 + assert len(summary.evolution_timeline) >= 3 + assert len(summary.current_state) >= 1 + + def test_get_worldview_at_time(self, reasoner): + """Test getting complete worldview at a time.""" + reasoner.store.store_fact("Timmy", "mood", "happy", valid_from="2026-01-01T00:00:00") + reasoner.store.store_fact("Timmy", "task", "coding", valid_from="2026-01-01T00:00:00") + reasoner.store.store_fact("Hermes", "status", "active", valid_from="2026-01-01T00:00:00") + + worldview = reasoner.get_worldview_at_time("2026-01-15T00:00:00") + + assert "Timmy" in worldview + assert "Hermes" in worldview + assert len(worldview["Timmy"]) == 2 + + def test_infer_temporal_relationship(self, reasoner): + """Test temporal relationship inference.""" + triple_a = reasoner.store.store_fact("A", "rel", "1", valid_from="2026-01-01T00:00:00") + triple_a.valid_until = "2026-02-01T00:00:00" + + triple_b = reasoner.store.store_fact("B", "rel", "2", valid_from="2026-02-15T00:00:00") + + rel = reasoner.infer_temporal_relationship(triple_a, triple_b) + assert "before" in rel.lower() + + +class TestTemporalKGTools: + """Tests for the temporal KG tool functions.""" + + @pytest.fixture(autouse=True) + def reset_singleton(self): + """Reset singleton instances before each test.""" + import tools.temporal_kg_tool as tool_module + tool_module._store = None + tool_module._reasoner = None + yield + tool_module._store = None + tool_module._reasoner = None + + def test_store_fact_with_time(self): + """Test the store_fact_with_time tool function.""" + result = store_fact_with_time( + subject="Hermes Agent", + predicate="has_feature", + object="input_sanitizer", + valid_from="2026-04-01T01:00:00" + ) + + assert result["success"] is True + assert result["triple"]["subject"] == "Hermes Agent" + assert result["triple"]["predicate"] == "has_feature" + assert result["triple"]["object"] == "input_sanitizer" + + def test_query_historical_state(self): + """Test the query_historical_state tool function.""" + # Store a fact first + store_fact_with_time( + subject="Timmy", + predicate="view_on_sovereignty", + object="strong", + valid_from="2026-02-01T00:00:00" + ) + + # Query it + result = query_historical_state("Timmy", "2026-03-01T00:00:00") + + assert result["success"] is True + assert result["subject"] == "Timmy" + assert result["fact_count"] == 1 + assert result["facts"][0]["object"] == "strong" + + def test_get_fact_history(self): + """Test the get_fact_history tool function.""" + # Create version history + store_fact_with_time("Feature", "status", "planned", valid_from="2026-01-01T00:00:00") + store_fact_with_time("Feature", "status", "done", valid_from="2026-02-01T00:00:00") + + result = get_fact_history("Feature", "status") + + assert result["success"] is True + assert result["version_count"] == 2 + assert len(result["versions"]) == 2 + + def test_when_did_we_learn(self): + """Test the when_did_we_learn tool function.""" + store_fact_with_time( + "MLX", + "integrated_with", + "Hermes", + valid_from="2026-03-15T12:00:00" + ) + + result = when_did_we_learn("MLX", "integrated_with") + + assert result["success"] is True + assert result["first_known"] == "2026-03-15T12:00:00" + + def test_how_has_it_changed(self): + """Test the how_has_it_changed tool function.""" + store_fact_with_time("Codebase", "feature_count", "10", valid_from="2026-01-01T00:00:00") + store_fact_with_time("Codebase", "feature_count", "20", valid_from="2026-02-01T00:00:00") + + result = how_has_it_changed("Codebase", "2026-01-15T00:00:00") + + assert result["success"] is True + assert result["change_count"] >= 1 + + def test_query_with_temporal_operator(self): + """Test the query_with_temporal_operator tool function.""" + store_fact_with_time("A", "rel", "1", valid_from="2026-01-01T00:00:00") + store_fact_with_time("B", "rel", "2", valid_from="2026-03-01T00:00:00") + + result = query_with_temporal_operator("BEFORE", "2026-02-01T00:00:00") + + assert result["success"] is True + assert result["fact_count"] == 1 + assert result["facts"][0]["subject"] == "A" + + def test_get_worldview_at_time(self): + """Test the get_worldview_at_time tool function.""" + store_fact_with_time("Timmy", "mood", "good", valid_from="2026-01-01T00:00:00") + store_fact_with_time("Hermes", "status", "running", valid_from="2026-01-01T00:00:00") + + result = get_worldview_at_time("2026-01-15T00:00:00") + + assert result["success"] is True + assert result["entity_count"] == 2 + + def test_generate_temporal_summary(self): + """Test the generate_temporal_summary tool function.""" + store_fact_with_time("Security", "level", "low", valid_from="2026-01-01T00:00:00") + store_fact_with_time("Security", "level", "high", valid_from="2026-03-01T00:00:00") + + result = generate_temporal_summary("Security", "2026-01-01T00:00:00", "2026-04-01T00:00:00") + + assert result["success"] is True + assert result["entity"] == "Security" + assert result["summary"]["total_changes"] >= 1 + + +class TestIntegration: + """Integration tests for the complete temporal KG system.""" + + @pytest.fixture + def system(self): + """Create a complete temporal KG system.""" + with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f: + db_path = f.name + + store = TemporalTripleStore(db_path) + reasoner = TemporalReasoner(store) + + yield {"store": store, "reasoner": reasoner} + + os.unlink(db_path) + + def test_full_workflow(self, system): + """Test a complete temporal knowledge workflow.""" + store = system["store"] + reasoner = system["reasoner"] + + # 1. Store initial facts about a security audit + store.store_fact("SecurityAudit", "status", "scheduled", valid_from="2026-01-01T00:00:00") + store.store_fact("SecurityAudit", "auditor", "ExternalFirm", valid_from="2026-01-01T00:00:00") + + # 2. Update as audit progresses + store.store_fact("SecurityAudit", "status", "in_progress", valid_from="2026-02-01T00:00:00") + store.store_fact("SecurityAudit", "findings", "none_yet", valid_from="2026-02-01T00:00:00") + + # 3. Complete audit + store.store_fact("SecurityAudit", "status", "completed", valid_from="2026-03-01T00:00:00") + store.store_fact("SecurityAudit", "findings", "5_minor_issues", valid_from="2026-03-01T00:00:00") + store.store_fact("SecurityAudit", "recommendation", "address_within_30_days", valid_from="2026-03-01T00:00:00") + + # 4. Query historical state + jan_state = reasoner.get_worldview_at_time("2026-01-15T00:00:00", ["SecurityAudit"]) + assert jan_state["SecurityAudit"][0]["predicate"] == "status" + assert jan_state["SecurityAudit"][0]["object"] == "scheduled" + + feb_state = reasoner.get_worldview_at_time("2026-02-15T00:00:00", ["SecurityAudit"]) + status_fact = [f for f in feb_state["SecurityAudit"] if f["predicate"] == "status"][0] + assert status_fact["object"] == "in_progress" + + # 5. Generate summary + summary = reasoner.generate_temporal_summary( + "SecurityAudit", + "2026-01-01T00:00:00", + "2026-04-01T00:00:00" + ) + + assert summary.total_changes >= 5 + assert any(f["predicate"] == "status" for f in summary.key_facts) + + # 6. Check when we learned about findings + when = reasoner.when_did_we_learn("SecurityAudit", "findings") + assert when is not None + + def test_temporal_inference(self, system): + """Test temporal inference capabilities.""" + store = system["store"] + reasoner = system["reasoner"] + + # Store facts with temporal relationships + triple_a = store.store_fact("EventA", "happened", "yes", valid_from="2026-01-01T00:00:00") + triple_a.valid_until = "2026-01-31T23:59:59" + + triple_b = store.store_fact("EventB", "happened", "yes", valid_from="2026-02-01T00:00:00") + + # Infer relationship + rel = reasoner.infer_temporal_relationship(triple_a, triple_b) + assert "before" in rel.lower() + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tools/temporal_kg_tool.py b/tools/temporal_kg_tool.py new file mode 100644 index 000000000..6c9d0eb8e --- /dev/null +++ b/tools/temporal_kg_tool.py @@ -0,0 +1,491 @@ +"""Temporal Knowledge Graph Tool for Hermes Agent. + +Provides tool functions for storing and querying temporal facts, +enabling Timmy to track how knowledge evolves over time. + +Functions: +- store_fact_with_time: Store a fact with temporal bounds +- query_historical_state: Query facts valid at a specific time +- get_fact_history: Get the version history of a fact +- generate_temporal_summary: Generate a historical summary +""" + +import logging +from typing import List, Dict, Any, Optional +from datetime import datetime + +from agent.temporal_knowledge_graph import TemporalTripleStore, TemporalOperator +from agent.temporal_reasoning import TemporalReasoner + +logger = logging.getLogger(__name__) + +# Global instances (singleton pattern) +_store: Optional[TemporalTripleStore] = None +_reasoner: Optional[TemporalReasoner] = None + + +def _get_store() -> TemporalTripleStore: + """Get or create the temporal triple store singleton.""" + global _store + if _store is None: + _store = TemporalTripleStore() + return _store + + +def _get_reasoner() -> TemporalReasoner: + """Get or create the temporal reasoner singleton.""" + global _reasoner + if _reasoner is None: + _reasoner = TemporalReasoner(_get_store()) + return _reasoner + + +def store_fact_with_time( + subject: str, + predicate: str, + object: str, + valid_from: Optional[str] = None, + valid_until: Optional[str] = None +) -> Dict[str, Any]: + """Store a fact with temporal metadata. + + Args: + subject: The subject of the fact (e.g., "Hermes Agent") + predicate: The predicate/relationship (e.g., "has_feature") + object: The object/value (e.g., "input_sanitizer") + valid_from: When this fact becomes valid (ISO 8601). Defaults to now. + valid_until: When this fact expires (ISO 8601). None means still valid. + + Returns: + Dictionary containing the stored triple details + + Example: + >>> store_fact_with_time( + ... subject="Hermes Agent", + ... predicate="has_feature", + ... object="input_sanitizer", + ... valid_from="2026-04-01T01:00:00" + ... ) + """ + try: + store = _get_store() + triple = store.store_fact(subject, predicate, object, valid_from, valid_until) + + logger.info(f"Stored temporal fact: {subject} {predicate} {object}") + + return { + "success": True, + "triple": { + "id": triple.id, + "subject": triple.subject, + "predicate": triple.predicate, + "object": triple.object, + "valid_from": triple.valid_from, + "valid_until": triple.valid_until, + "timestamp": triple.timestamp, + "version": triple.version + } + } + except Exception as e: + logger.error(f"Failed to store temporal fact: {e}") + return { + "success": False, + "error": str(e) + } + + +def query_historical_state( + subject: str, + timestamp: str, + predicate: Optional[str] = None +) -> Dict[str, Any]: + """Query what was known about a subject at a specific point in time. + + Args: + subject: The entity to query (e.g., "Timmy") + timestamp: The point in time (ISO 8601, e.g., "2026-03-01T00:00:00") + predicate: Optional predicate filter + + Returns: + Dictionary containing the facts valid at that time + + Example: + >>> query_historical_state("Timmy", "2026-03-01T00:00:00") + # Returns facts valid at that time + """ + try: + store = _get_store() + facts = store.query_at_time(timestamp, subject=subject, predicate=predicate) + + logger.info(f"Queried historical state for {subject} at {timestamp}: {len(facts)} facts") + + return { + "success": True, + "subject": subject, + "timestamp": timestamp, + "fact_count": len(facts), + "facts": [ + { + "predicate": f.predicate, + "object": f.object, + "valid_from": f.valid_from, + "valid_until": f.valid_until, + "version": f.version + } + for f in facts + ] + } + except Exception as e: + logger.error(f"Failed to query historical state: {e}") + return { + "success": False, + "error": str(e) + } + + +def get_fact_history( + subject: str, + predicate: str +) -> Dict[str, Any]: + """Get the complete version history of a fact. + + Args: + subject: The subject to query + predicate: The predicate to query + + Returns: + Dictionary containing the version history + + Example: + >>> get_fact_history("Timmy", "view_on_sovereignty") + # Returns all versions of this fact + """ + try: + store = _get_store() + history = store.get_fact_history(subject, predicate) + + logger.info(f"Retrieved history for {subject} {predicate}: {len(history)} versions") + + return { + "success": True, + "subject": subject, + "predicate": predicate, + "version_count": len(history), + "versions": [ + { + "object": h.object, + "valid_from": h.valid_from, + "valid_until": h.valid_until, + "timestamp": h.timestamp, + "version": h.version, + "superseded_by": h.superseded_by + } + for h in history + ] + } + except Exception as e: + logger.error(f"Failed to get fact history: {e}") + return { + "success": False, + "error": str(e) + } + + +def generate_temporal_summary( + entity: str, + start_time: str, + end_time: str +) -> Dict[str, Any]: + """Generate a historical summary of an entity's evolution. + + Args: + entity: The entity to summarize (e.g., "security_audit") + start_time: Start of time range (ISO 8601) + end_time: End of time range (ISO 8601) + + Returns: + Dictionary containing the historical summary + + Example: + >>> generate_temporal_summary("security_audit", "2026-03-01", "2026-04-01") + # Returns evolution of security posture + """ + try: + reasoner = _get_reasoner() + summary = reasoner.generate_temporal_summary(entity, start_time, end_time) + + logger.info(f"Generated temporal summary for {entity}: {summary.total_changes} changes") + + return { + "success": True, + "entity": entity, + "start_time": start_time, + "end_time": end_time, + "summary": summary.to_dict() + } + except Exception as e: + logger.error(f"Failed to generate temporal summary: {e}") + return { + "success": False, + "error": str(e) + } + + +def when_did_we_learn( + subject: str, + predicate: Optional[str] = None, + object: Optional[str] = None +) -> Dict[str, Any]: + """Query when we first learned about something. + + Args: + subject: The subject to search for + predicate: Optional predicate filter + object: Optional object filter + + Returns: + Dictionary containing the timestamp of first knowledge + + Example: + >>> when_did_we_learn("MLX", predicate="integrated_with") + # Returns when MLX integration was first recorded + """ + try: + reasoner = _get_reasoner() + timestamp = reasoner.when_did_we_learn(subject, predicate, object) + + if timestamp: + logger.info(f"Found first knowledge of {subject} at {timestamp}") + return { + "success": True, + "subject": subject, + "predicate": predicate, + "object": object, + "first_known": timestamp + } + else: + return { + "success": True, + "subject": subject, + "predicate": predicate, + "object": object, + "first_known": None, + "message": "No knowledge found for this subject" + } + except Exception as e: + logger.error(f"Failed to query when we learned: {e}") + return { + "success": False, + "error": str(e) + } + + +def how_has_it_changed( + subject: str, + since_time: str +) -> Dict[str, Any]: + """Query how something has changed since a specific time. + + Args: + subject: The entity to analyze + since_time: The starting time (ISO 8601) + + Returns: + Dictionary containing the list of changes + + Example: + >>> how_has_it_changed("codebase", "2026-03-01T00:00:00") + # Returns changes since the security audit + """ + try: + reasoner = _get_reasoner() + changes = reasoner.how_has_it_changed(subject, since_time) + + logger.info(f"Found {len(changes)} changes for {subject} since {since_time}") + + return { + "success": True, + "subject": subject, + "since_time": since_time, + "change_count": len(changes), + "changes": [ + { + "change_type": c.change_type.value, + "predicate": c.predicate, + "old_value": c.old_value, + "new_value": c.new_value, + "timestamp": c.timestamp, + "version": c.version + } + for c in changes + ] + } + except Exception as e: + logger.error(f"Failed to query changes: {e}") + return { + "success": False, + "error": str(e) + } + + +def query_with_temporal_operator( + operator: str, + timestamp: str, + subject: Optional[str] = None, + predicate: Optional[str] = None +) -> Dict[str, Any]: + """Query using temporal operators (BEFORE, AFTER, DURING, OVERLAPS). + + Args: + operator: Temporal operator (BEFORE, AFTER, DURING, OVERLAPS, AT) + timestamp: Reference timestamp (ISO 8601) + subject: Optional subject filter + predicate: Optional predicate filter + + Returns: + Dictionary containing matching facts + + Example: + >>> query_with_temporal_operator("BEFORE", "2026-04-01T00:00:00", subject="Timmy") + # Returns facts about Timmy before April 2026 + """ + try: + store = _get_store() + + # Map string to enum + op_map = { + "BEFORE": TemporalOperator.BEFORE, + "AFTER": TemporalOperator.AFTER, + "DURING": TemporalOperator.DURING, + "OVERLAPS": TemporalOperator.OVERLAPS, + "AT": TemporalOperator.AT + } + + if operator.upper() not in op_map: + return { + "success": False, + "error": f"Invalid operator: {operator}. Use BEFORE, AFTER, DURING, OVERLAPS, or AT" + } + + op = op_map[operator.upper()] + facts = store.query_temporal(op, timestamp, subject, predicate) + + logger.info(f"Queried with operator {operator}: {len(facts)} facts") + + return { + "success": True, + "operator": operator, + "timestamp": timestamp, + "subject": subject, + "predicate": predicate, + "fact_count": len(facts), + "facts": [ + { + "subject": f.subject, + "predicate": f.predicate, + "object": f.object, + "valid_from": f.valid_from, + "valid_until": f.valid_until, + "version": f.version + } + for f in facts + ] + } + except Exception as e: + logger.error(f"Failed to query with temporal operator: {e}") + return { + "success": False, + "error": str(e) + } + + +def get_worldview_at_time( + timestamp: str, + subjects: Optional[List[str]] = None +) -> Dict[str, Any]: + """Get Timmy's complete worldview at a specific point in time. + + Args: + timestamp: The point in time (ISO 8601) + subjects: Optional list of subjects to include. If None, includes all. + + Returns: + Dictionary mapping subjects to their facts at that time + + Example: + >>> get_worldview_at_time("2026-03-01T00:00:00", ["Timmy", "Hermes"]) + """ + try: + reasoner = _get_reasoner() + worldview = reasoner.get_worldview_at_time(timestamp, subjects) + + logger.info(f"Retrieved worldview at {timestamp}: {len(worldview)} entities") + + return { + "success": True, + "timestamp": timestamp, + "entity_count": len(worldview), + "worldview": worldview + } + except Exception as e: + logger.error(f"Failed to get worldview: {e}") + return { + "success": False, + "error": str(e) + } + + +# Convenience function for natural language queries +def ask_temporal_question(question: str, **kwargs) -> Dict[str, Any]: + """Parse and answer a temporal question. + + This is a higher-level interface that can parse simple temporal questions + and route them to the appropriate function. + + Args: + question: Natural language temporal question + **kwargs: Additional context parameters + + Returns: + Dictionary containing the answer + + Example: + >>> ask_temporal_question("What was Timmy's view on sovereignty before March 2026?") + """ + question_lower = question.lower() + + # Simple pattern matching for common question types + if "what did we believe" in question_lower or "what was" in question_lower: + if "before" in question_lower: + # Extract subject and time + subject = kwargs.get("subject") + before_time = kwargs.get("before_time") + if subject and before_time: + return query_historical_state(subject, before_time) + + elif "when did we first learn" in question_lower or "when did we learn" in question_lower: + subject = kwargs.get("subject") + predicate = kwargs.get("predicate") + if subject: + return when_did_we_learn(subject, predicate) + + elif "how has" in question_lower and "changed" in question_lower: + subject = kwargs.get("subject") + since_time = kwargs.get("since_time") + if subject and since_time: + return how_has_it_changed(subject, since_time) + + return { + "success": False, + "error": "Could not parse temporal question. Use specific function calls instead.", + "available_functions": [ + "store_fact_with_time", + "query_historical_state", + "get_fact_history", + "generate_temporal_summary", + "when_did_we_learn", + "how_has_it_changed", + "query_with_temporal_operator", + "get_worldview_at_time" + ] + }