Implement Phase 28: Sovereign Knowledge Graph 'Time Travel'

- agent/temporal_knowledge_graph.py: SQLite-backed temporal triple store with versioning, validity periods, and temporal query operators (BEFORE, AFTER, DURING, OVERLAPS, AT)
- agent/temporal_reasoning.py: Temporal reasoning engine supporting historical queries, fact evolution tracking, and worldview snapshots
- tools/temporal_kg_tool.py: Tool integration with functions for storing facts with time, querying historical state, generating temporal summaries, and natural language temporal queries
- tests/test_temporal_kg.py: Comprehensive test coverage including storage tests, query operators, historical summaries, and integration tests
435 lines
14 KiB
Python
435 lines
14 KiB
Python
"""Temporal Reasoning Engine for Hermes Agent.
|
|
|
|
Enables Timmy to reason about past and future states, generate historical
|
|
summaries, and perform temporal inference over the evolving knowledge graph.
|
|
|
|
Queries supported:
|
|
- "What was Timmy's view on sovereignty before March 2026?"
|
|
- "When did we first learn about MLX integration?"
|
|
- "How has the codebase changed since the security audit?"
|
|
"""
|
|
|
|
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

from agent.temporal_knowledge_graph import (
    TemporalOperator,
    TemporalTriple,
    TemporalTripleStore,
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ChangeType(Enum):
    """Types of changes in the knowledge graph.

    The string values are what ``HistoricalSummary.to_dict`` and the
    human-readable report emit, so they are part of the serialized format.
    """

    # A fact (predicate) seen for the first time in the inspected window.
    ADDED = "added"
    # A fact that no longer holds. NOTE(review): not produced anywhere in
    # this module; presumably reserved for callers -- confirm before relying on it.
    REMOVED = "removed"
    # A fact whose object value differs from the previously seen value.
    MODIFIED = "modified"
    # A newer version of a fact recorded on top of an earlier one.
    SUPERSEDED = "superseded"
|
|
|
|
|
|
@dataclass
class FactChange:
    """Represents a change in a fact over time.

    One instance describes a single step in the history of a
    (subject, predicate) pair.
    """

    # Kind of step this record describes.
    change_type: ChangeType
    # Entity the fact is about.
    subject: str
    # Relation / attribute name of the fact.
    predicate: str
    # Object value before the change; None when there was no earlier value.
    old_value: Optional[str]
    # Object value after the change.
    new_value: Optional[str]
    # Timestamp copied from the triple that introduced the change.
    timestamp: str
    # Version number copied from the triple that introduced the change.
    version: int
|
|
|
|
|
|
@dataclass
class HistoricalSummary:
    """Summary of how an entity or concept evolved over time."""

    # Entity the summary is about.
    entity: str
    # Start of the summarized window, as supplied by the caller.
    start_time: str
    # End of the summarized window, as supplied by the caller.
    end_time: str
    # Number of entries in ``evolution_timeline``.
    total_changes: int
    # Most frequently changed predicates with their value at ``end_time``.
    key_facts: List[Dict[str, Any]]
    # Per-fact change records for the window.
    evolution_timeline: List[FactChange]
    # Facts that hold for the entity at ``end_time``.
    current_state: List[Dict[str, Any]]

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of the summary.

        Each timeline entry is flattened to a dict and its ``change_type``
        enum is replaced by the enum's string value; ``key_facts`` and
        ``current_state`` are passed through by reference, not copied.
        """
        timeline = [
            {
                "change_type": c.change_type.value,
                "subject": c.subject,
                "predicate": c.predicate,
                "old_value": c.old_value,
                "new_value": c.new_value,
                "timestamp": c.timestamp,
                "version": c.version,
            }
            for c in self.evolution_timeline
        ]
        return {
            "entity": self.entity,
            "start_time": self.start_time,
            "end_time": self.end_time,
            "total_changes": self.total_changes,
            "key_facts": self.key_facts,
            "evolution_timeline": timeline,
            "current_state": self.current_state,
        }
|
|
|
|
|
|
class TemporalReasoner:
    """Reasoning engine for temporal knowledge graphs.

    Wraps a ``TemporalTripleStore`` and answers history-oriented questions
    about it: what was believed before a cutoff, when a fact first appeared,
    how an entity changed over a window, and what the graph looked like at
    a given instant.
    """

    # Number of most-changed predicates reported as key facts in a summary.
    _KEY_FACT_LIMIT = 5
    # Number of timeline entries printed in full by export_reasoning_report.
    _REPORT_TIMELINE_LIMIT = 10

    def __init__(self, store: Optional[TemporalTripleStore] = None):
        """Initialize the temporal reasoner.

        Args:
            store: Optional TemporalTripleStore instance. Creates new if None.
        """
        # Compare against None explicitly: a caller-supplied store that
        # happens to be falsy (for example one that defines __len__ and is
        # currently empty) must not be silently swapped for a fresh store.
        self.store = store if store is not None else TemporalTripleStore()

    def what_did_we_believe(
        self,
        subject: str,
        before_time: str
    ) -> List[TemporalTriple]:
        """Query: "What did we believe about X before Y happened?"

        Args:
            subject: The entity to query about
            before_time: The cutoff time (ISO 8601)

        Returns:
            Whatever the store's BEFORE query yields for ``subject`` and
            ``before_time``.
        """
        return self.store.query_temporal(
            TemporalOperator.BEFORE,
            before_time,
            subject=subject
        )

    def when_did_we_learn(
        self,
        subject: str,
        predicate: Optional[str] = None,
        object: Optional[str] = None
    ) -> Optional[str]:
        """Query: "When did we first learn about X?"

        Args:
            subject: The subject to search for
            predicate: Optional predicate filter. ``None`` is forwarded to
                the store as an empty string.
            object: Optional object filter. Falsy values (``None`` or an
                empty string) disable the filter. The name shadows the
                builtin but is kept because callers may pass it by keyword.

        Returns:
            Smallest ``timestamp`` among the matching history entries, or
            None if there are none.
        """
        history = self.store.get_fact_history(subject, predicate or "")

        if object:
            history = [h for h in history if h.object == object]

        return min((h.timestamp for h in history), default=None)

    @staticmethod
    def _build_timeline(changes: List[TemporalTriple]) -> List[FactChange]:
        """Classify a sequence of triples into per-predicate change records.

        The first triple seen for a predicate is ADDED. A later triple for
        the same predicate is MODIFIED when its object differs from the
        previous one and SUPERSEDED when it repeats the same object.

        NOTE(review): "previous" means previous in list order; this assumes
        the store returns changes oldest-first -- confirm against
        ``TemporalTripleStore.get_entity_changes``.
        """
        timeline = []
        latest_by_predicate = {}  # predicate -> most recent object seen

        for triple in changes:
            if triple.predicate not in latest_by_predicate:
                kind, previous = ChangeType.ADDED, None
            else:
                previous = latest_by_predicate[triple.predicate]
                if triple.object != previous:
                    kind = ChangeType.MODIFIED
                else:
                    kind = ChangeType.SUPERSEDED
            latest_by_predicate[triple.predicate] = triple.object

            timeline.append(FactChange(
                change_type=kind,
                subject=triple.subject,
                predicate=triple.predicate,
                old_value=previous,
                new_value=triple.object,
                timestamp=triple.timestamp,
                version=triple.version
            ))

        return timeline

    def how_has_it_changed(
        self,
        subject: str,
        since_time: str
    ) -> List[FactChange]:
        """Query: "How has X changed since Y?"

        Args:
            subject: The entity to analyze
            since_time: The starting time (ISO 8601)

        Returns:
            One change record per triple the store reports between
            ``since_time`` and the current local time. Changes are tracked
            per predicate, so ``old_value`` always refers to the earlier
            value of the same predicate.
        """
        now = datetime.now().isoformat()
        changes = self.store.get_entity_changes(subject, since_time, now)
        return self._build_timeline(changes)

    def generate_temporal_summary(
        self,
        entity: str,
        start_time: str,
        end_time: str
    ) -> HistoricalSummary:
        """Generate a historical summary of an entity's evolution.

        Args:
            entity: The entity to summarize
            start_time: Start of the time range (ISO 8601)
            end_time: End of the time range (ISO 8601)

        Returns:
            HistoricalSummary containing the entity's evolution: the change
            timeline for the window, up to five most-changed predicates that
            still have a value at ``end_time``, and the full state at
            ``end_time``.
        """
        final_state = self.store.query_at_time(end_time, subject=entity)
        changes = self.store.get_entity_changes(entity, start_time, end_time)

        evolution_timeline = self._build_timeline(changes)

        # Count how often each predicate changed, then keep the busiest ones.
        # sorted() is stable, so ties keep first-seen order.
        predicate_changes = {}
        for change in evolution_timeline:
            predicate_changes[change.predicate] = (
                predicate_changes.get(change.predicate, 0) + 1
            )
        top_predicates = sorted(
            predicate_changes.items(),
            key=lambda item: item[1],
            reverse=True
        )[:self._KEY_FACT_LIMIT]

        # A predicate only becomes a key fact if it still has a value at
        # end_time; the first matching fact supplies the current value.
        key_facts = []
        for pred, count in top_predicates:
            current = next(
                (t for t in final_state if t.predicate == pred), None
            )
            if current is not None:
                key_facts.append({
                    "predicate": pred,
                    "current_value": current.object,
                    "changes": count
                })

        current_state = [
            {
                "predicate": t.predicate,
                "object": t.object,
                "valid_from": t.valid_from,
                "valid_until": t.valid_until
            }
            for t in final_state
        ]

        return HistoricalSummary(
            entity=entity,
            start_time=start_time,
            end_time=end_time,
            total_changes=len(evolution_timeline),
            key_facts=key_facts,
            evolution_timeline=evolution_timeline,
            current_state=current_state
        )

    def infer_temporal_relationship(
        self,
        fact_a: TemporalTriple,
        fact_b: TemporalTriple
    ) -> Optional[str]:
        """Infer temporal relationship between two facts.

        A fact's validity interval runs from ``valid_from`` to
        ``valid_until``; a missing ``valid_until`` is treated as open-ended.

        Args:
            fact_a: First fact
            fact_b: Second fact

        Returns:
            One of "A happened before B", "B happened before A",
            "B supersedes A", "A supersedes B" or "A and B overlap in time".

        Raises:
            ValueError: If a validity bound is not a valid ISO 8601 string
                (raised by ``datetime.fromisoformat``).
        """
        a_start = datetime.fromisoformat(fact_a.valid_from)
        a_end = datetime.fromisoformat(fact_a.valid_until) if fact_a.valid_until else None
        b_start = datetime.fromisoformat(fact_b.valid_from)
        b_end = datetime.fromisoformat(fact_b.valid_until) if fact_b.valid_until else None

        # Disjoint intervals: one fact ended no later than the other began.
        if a_end is not None and a_end <= b_start:
            return "A happened before B"
        if b_end is not None and b_end <= a_start:
            return "B happened before A"

        # From here on the intervals are known to intersect. When at least
        # one of them is open-ended, an explicit supersession link is the
        # more specific answer. The None guard keeps two unlinked facts
        # without ids from matching each other via None == None.
        if a_end is None or b_end is None:
            if fact_a.superseded_by is not None and fact_a.superseded_by == fact_b.id:
                return "B supersedes A"
            if fact_b.superseded_by is not None and fact_b.superseded_by == fact_a.id:
                return "A supersedes B"

        return "A and B overlap in time"

    @staticmethod
    def _fact_view(fact: TemporalTriple) -> Dict[str, Any]:
        """Reduce a triple to the fields exposed in a worldview snapshot."""
        return {
            "predicate": fact.predicate,
            "object": fact.object,
            "version": fact.version
        }

    def get_worldview_at_time(
        self,
        timestamp: str,
        subjects: Optional[List[str]] = None
    ) -> Dict[str, List[Dict[str, Any]]]:
        """Get Timmy's complete worldview at a specific point in time.

        Args:
            timestamp: The point in time (ISO 8601)
            subjects: Optional list of subjects to include. If None (or
                empty), includes all.

        Returns:
            Dictionary mapping subjects to their facts at that time.
            Requested subjects without any fact at ``timestamp`` are omitted.
        """
        worldview = {}

        if subjects:
            for subject in subjects:
                facts = self.store.query_at_time(timestamp, subject=subject)
                if facts:
                    worldview[subject] = [self._fact_view(f) for f in facts]
        else:
            # No subject filter: one query for everything, grouped by subject.
            for fact in self.store.query_at_time(timestamp):
                worldview.setdefault(fact.subject, []).append(
                    self._fact_view(fact)
                )

        return worldview

    def find_knowledge_gaps(
        self,
        subject: str,
        expected_predicates: List[str]
    ) -> List[str]:
        """Find predicates that are missing or have expired for a subject.

        Args:
            subject: The entity to check
            expected_predicates: List of predicates that should exist

        Returns:
            The expected predicates, in their given order, for which the
            store reports no fact at the current local time.
        """
        now = datetime.now().isoformat()
        current_facts = self.store.query_at_time(now, subject=subject)
        current_predicates = {f.predicate for f in current_facts}

        return [
            pred for pred in expected_predicates
            if pred not in current_predicates
        ]

    def export_reasoning_report(
        self,
        entity: str,
        start_time: str,
        end_time: str
    ) -> str:
        """Generate a human-readable reasoning report.

        Args:
            entity: The entity to report on
            start_time: Start of the time range
            end_time: End of the time range

        Returns:
            Markdown-formatted report string with the time range, summary
            counts, key facts, the first ten timeline entries and the state
            at ``end_time``.
        """
        summary = self.generate_temporal_summary(entity, start_time, end_time)
        timeline = summary.evolution_timeline
        limit = self._REPORT_TIMELINE_LIMIT

        # Collect fragments and join once instead of repeated string +=.
        parts = [
            "\n"
            f"# Temporal Reasoning Report: {entity}\n"
            "\n"
            "## Time Range\n"
            f"- From: {start_time}\n"
            f"- To: {end_time}\n"
            "\n"
            "## Summary\n"
            f"- Total Changes: {summary.total_changes}\n"
            f"- Key Facts Tracked: {len(summary.key_facts)}\n"
            "\n"
            "## Key Facts\n"
        ]

        for fact in summary.key_facts:
            parts.append(
                f"- **{fact['predicate']}**: {fact['current_value']} ({fact['changes']} changes)\n"
            )

        parts.append("\n## Evolution Timeline\n")
        for change in timeline[:limit]:
            parts.append(
                f"- [{change.timestamp}] {change.change_type.value}: {change.predicate}\n"
            )
            # Two-space indent so Markdown nests these under the entry above.
            if change.old_value:
                parts.append(f"  - Changed from: {change.old_value}\n")
            parts.append(f"  - Changed to: {change.new_value}\n")

        hidden = len(timeline) - limit
        if hidden > 0:
            parts.append(f"\n... and {hidden} more changes\n")

        parts.append("\n## Current State\n")
        for state in summary.current_state:
            parts.append(f"- {state['predicate']}: {state['object']}\n")

        return "".join(parts)
|