Implement Phase 28: Sovereign Knowledge Graph 'Time Travel' - agent/temporal_knowledge_graph.py: SQLite-backed temporal triple store with versioning, validity periods, and temporal query operators (BEFORE, AFTER, DURING, OVERLAPS, AT) - agent/temporal_reasoning.py: Temporal reasoning engine supporting historical queries, fact evolution tracking, and worldview snapshots - tools/temporal_kg_tool.py: Tool integration with functions for storing facts with time, querying historical state, generating temporal summaries, and natural language temporal queries - tests/test_temporal_kg.py: Comprehensive test coverage including storage tests, query operators, historical summaries, and integration tests
492 lines
15 KiB
Python
492 lines
15 KiB
Python
"""Temporal Knowledge Graph Tool for Hermes Agent.
|
|
|
|
Provides tool functions for storing and querying temporal facts,
|
|
enabling Timmy to track how knowledge evolves over time.
|
|
|
|
Functions:
|
|
- store_fact_with_time: Store a fact with temporal bounds
|
|
- query_historical_state: Query facts valid at a specific time
|
|
- get_fact_history: Get the version history of a fact
|
|
- generate_temporal_summary: Generate a historical summary
|
|
"""
|
|
|
|
import logging
|
|
from typing import List, Dict, Any, Optional
|
|
from datetime import datetime
|
|
|
|
from agent.temporal_knowledge_graph import TemporalTripleStore, TemporalOperator
|
|
from agent.temporal_reasoning import TemporalReasoner
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Global instances (singleton pattern)
|
|
_store: Optional[TemporalTripleStore] = None
|
|
_reasoner: Optional[TemporalReasoner] = None
|
|
|
|
|
|
def _get_store() -> TemporalTripleStore:
    """Return the shared temporal triple store, building it on first use.

    The instance is cached in the module-level ``_store`` variable, so every
    later call hands back the same object.
    """
    global _store
    if _store is not None:
        return _store
    # First call: construct the store with its default arguments and cache it.
    _store = TemporalTripleStore()
    return _store
|
|
|
|
|
|
def _get_reasoner() -> TemporalReasoner:
    """Return the shared temporal reasoner, building it on first use.

    The reasoner is constructed around the store returned by ``_get_store()``
    and cached in the module-level ``_reasoner`` variable.
    """
    global _reasoner
    if _reasoner is not None:
        return _reasoner
    # First call: bind the reasoner to the shared store and cache it.
    _reasoner = TemporalReasoner(_get_store())
    return _reasoner
|
|
|
|
|
|
def store_fact_with_time(
    subject: str,
    predicate: str,
    object: str,
    valid_from: Optional[str] = None,
    valid_until: Optional[str] = None,
) -> Dict[str, Any]:
    """Record a fact together with its validity window.

    All five arguments are handed unchanged to the shared store's
    ``store_fact`` method; the stored triple it returns is flattened into a
    plain dictionary for the caller.

    Args:
        subject: The subject of the fact (e.g., "Hermes Agent").
        predicate: The predicate/relationship (e.g., "has_feature").
        object: The object/value (e.g., "input_sanitizer").
        valid_from: Start of validity (ISO 8601). ``None`` leaves the choice
            to the store (documented as defaulting to now).
        valid_until: End of validity (ISO 8601). ``None`` is documented as
            meaning the fact is still valid.

    Returns:
        ``{"success": True, "triple": {...}}`` with the triple's id, subject,
        predicate, object, valid_from, valid_until, timestamp and version on
        success; ``{"success": False, "error": <message>}`` if anything raised.

    Example:
        >>> store_fact_with_time(
        ...     subject="Hermes Agent",
        ...     predicate="has_feature",
        ...     object="input_sanitizer",
        ...     valid_from="2026-04-01T01:00:00"
        ... )
    """
    try:
        stored = _get_store().store_fact(
            subject, predicate, object, valid_from, valid_until
        )

        logger.info(f"Stored temporal fact: {subject} {predicate} {object}")

        # Copy the triple's attributes into a plain dict for the caller.
        triple_payload = {
            "id": stored.id,
            "subject": stored.subject,
            "predicate": stored.predicate,
            "object": stored.object,
            "valid_from": stored.valid_from,
            "valid_until": stored.valid_until,
            "timestamp": stored.timestamp,
            "version": stored.version,
        }
        return {"success": True, "triple": triple_payload}
    except Exception as e:
        # Tool boundary: report the failure as data instead of raising.
        logger.error(f"Failed to store temporal fact: {e}")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
def query_historical_state(
    subject: str,
    timestamp: str,
    predicate: Optional[str] = None,
) -> Dict[str, Any]:
    """Look up what was recorded about a subject at one point in time.

    Delegates to the shared store's ``query_at_time`` and converts each
    returned fact into a plain dictionary.

    Args:
        subject: The entity to query (e.g., "Timmy").
        timestamp: The point in time (ISO 8601, e.g., "2026-03-01T00:00:00").
        predicate: Optional predicate filter, forwarded to the store.

    Returns:
        On success, a dictionary with ``success``, ``subject``, ``timestamp``,
        ``fact_count`` and ``facts`` (each entry holding predicate, object,
        valid_from, valid_until and version). On failure,
        ``{"success": False, "error": <message>}``.

    Example:
        >>> query_historical_state("Timmy", "2026-03-01T00:00:00")
        # Returns facts valid at that time
    """
    try:
        facts = _get_store().query_at_time(
            timestamp, subject=subject, predicate=predicate
        )

        logger.info(f"Queried historical state for {subject} at {timestamp}: {len(facts)} facts")

        # Flatten each fact object into a plain dict.
        fact_rows = []
        for fact in facts:
            fact_rows.append(
                {
                    "predicate": fact.predicate,
                    "object": fact.object,
                    "valid_from": fact.valid_from,
                    "valid_until": fact.valid_until,
                    "version": fact.version,
                }
            )

        return {
            "success": True,
            "subject": subject,
            "timestamp": timestamp,
            "fact_count": len(facts),
            "facts": fact_rows,
        }
    except Exception as e:
        # Tool boundary: report the failure as data instead of raising.
        logger.error(f"Failed to query historical state: {e}")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
def get_fact_history(
    subject: str,
    predicate: str,
) -> Dict[str, Any]:
    """Fetch every recorded version of a (subject, predicate) fact.

    Delegates to the shared store's ``get_fact_history`` and converts each
    returned version into a plain dictionary.

    Args:
        subject: The subject to query.
        predicate: The predicate to query.

    Returns:
        On success, a dictionary with ``success``, ``subject``, ``predicate``,
        ``version_count`` and ``versions`` (each entry holding object,
        valid_from, valid_until, timestamp, version and superseded_by).
        On failure, ``{"success": False, "error": <message>}``.

    Example:
        >>> get_fact_history("Timmy", "view_on_sovereignty")
        # Returns all versions of this fact
    """
    try:
        history = _get_store().get_fact_history(subject, predicate)

        logger.info(f"Retrieved history for {subject} {predicate}: {len(history)} versions")

        # Flatten each version object into a plain dict.
        version_rows = [
            {
                "object": entry.object,
                "valid_from": entry.valid_from,
                "valid_until": entry.valid_until,
                "timestamp": entry.timestamp,
                "version": entry.version,
                "superseded_by": entry.superseded_by,
            }
            for entry in history
        ]

        return {
            "success": True,
            "subject": subject,
            "predicate": predicate,
            "version_count": len(history),
            "versions": version_rows,
        }
    except Exception as e:
        # Tool boundary: report the failure as data instead of raising.
        logger.error(f"Failed to get fact history: {e}")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
def generate_temporal_summary(
    entity: str,
    start_time: str,
    end_time: str,
) -> Dict[str, Any]:
    """Summarize how an entity evolved across a time range.

    Delegates to the shared reasoner's ``generate_temporal_summary`` and
    embeds the result of the summary's ``to_dict()`` in the response.

    Args:
        entity: The entity to summarize (e.g., "security_audit").
        start_time: Start of time range (ISO 8601).
        end_time: End of time range (ISO 8601).

    Returns:
        On success, a dictionary with ``success``, ``entity``, ``start_time``,
        ``end_time`` and ``summary``. On failure,
        ``{"success": False, "error": <message>}``.

    Example:
        >>> generate_temporal_summary("security_audit", "2026-03-01", "2026-04-01")
        # Returns evolution of security posture
    """
    try:
        summary = _get_reasoner().generate_temporal_summary(
            entity, start_time, end_time
        )

        logger.info(f"Generated temporal summary for {entity}: {summary.total_changes} changes")

        # Echo the request parameters back alongside the serialized summary.
        response: Dict[str, Any] = {"success": True}
        response["entity"] = entity
        response["start_time"] = start_time
        response["end_time"] = end_time
        response["summary"] = summary.to_dict()
        return response
    except Exception as e:
        # Tool boundary: report the failure as data instead of raising.
        logger.error(f"Failed to generate temporal summary: {e}")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
def when_did_we_learn(
    subject: str,
    predicate: Optional[str] = None,
    object: Optional[str] = None,
) -> Dict[str, Any]:
    """Report when knowledge matching the given filters was first recorded.

    Delegates to the shared reasoner's ``when_did_we_learn``. A falsy result
    is reported as "nothing found" rather than as a failure.

    Args:
        subject: The subject to search for.
        predicate: Optional predicate filter.
        object: Optional object filter.

    Returns:
        On success, a dictionary with ``success``, ``subject``, ``predicate``,
        ``object`` and ``first_known``; when nothing was found,
        ``first_known`` is ``None`` and a ``message`` key is added.
        On failure, ``{"success": False, "error": <message>}``.

    Example:
        >>> when_did_we_learn("MLX", predicate="integrated_with")
        # Returns when MLX integration was first recorded
    """
    try:
        timestamp = _get_reasoner().when_did_we_learn(subject, predicate, object)

        # Fields shared by both the "found" and "not found" responses.
        outcome: Dict[str, Any] = {
            "success": True,
            "subject": subject,
            "predicate": predicate,
            "object": object,
        }

        if not timestamp:
            outcome["first_known"] = None
            outcome["message"] = "No knowledge found for this subject"
            return outcome

        logger.info(f"Found first knowledge of {subject} at {timestamp}")
        outcome["first_known"] = timestamp
        return outcome
    except Exception as e:
        # Tool boundary: report the failure as data instead of raising.
        logger.error(f"Failed to query when we learned: {e}")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
def how_has_it_changed(
    subject: str,
    since_time: str,
) -> Dict[str, Any]:
    """List the changes recorded for a subject since a given time.

    Delegates to the shared reasoner's ``how_has_it_changed`` and converts
    each returned change into a plain dictionary.

    Args:
        subject: The entity to analyze.
        since_time: The starting time (ISO 8601).

    Returns:
        On success, a dictionary with ``success``, ``subject``, ``since_time``,
        ``change_count`` and ``changes`` (each entry holding change_type,
        predicate, old_value, new_value, timestamp and version).
        On failure, ``{"success": False, "error": <message>}``.

    Example:
        >>> how_has_it_changed("codebase", "2026-03-01T00:00:00")
        # Returns changes since the security audit
    """
    try:
        changes = _get_reasoner().how_has_it_changed(subject, since_time)

        logger.info(f"Found {len(changes)} changes for {subject} since {since_time}")

        change_rows = []
        for change in changes:
            change_rows.append(
                {
                    # change_type is reported via its .value attribute.
                    "change_type": change.change_type.value,
                    "predicate": change.predicate,
                    "old_value": change.old_value,
                    "new_value": change.new_value,
                    "timestamp": change.timestamp,
                    "version": change.version,
                }
            )

        return {
            "success": True,
            "subject": subject,
            "since_time": since_time,
            "change_count": len(changes),
            "changes": change_rows,
        }
    except Exception as e:
        # Tool boundary: report the failure as data instead of raising.
        logger.error(f"Failed to query changes: {e}")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
def query_with_temporal_operator(
    operator: str,
    timestamp: str,
    subject: Optional[str] = None,
    predicate: Optional[str] = None,
) -> Dict[str, Any]:
    """Query facts relative to a timestamp using a named temporal operator.

    The operator name is matched case-insensitively against BEFORE, AFTER,
    DURING, OVERLAPS and AT, translated to the corresponding
    ``TemporalOperator`` member, and passed to the shared store's
    ``query_temporal``.

    Args:
        operator: Temporal operator (BEFORE, AFTER, DURING, OVERLAPS, AT).
        timestamp: Reference timestamp (ISO 8601).
        subject: Optional subject filter.
        predicate: Optional predicate filter.

    Returns:
        On success, a dictionary with ``success``, ``operator`` (as supplied
        by the caller), ``timestamp``, ``subject``, ``predicate``,
        ``fact_count`` and ``facts`` (each entry holding subject, predicate,
        object, valid_from, valid_until and version). For an unknown operator
        or any raised exception, ``{"success": False, "error": <message>}``.

    Example:
        >>> query_with_temporal_operator("BEFORE", "2026-04-01T00:00:00", subject="Timmy")
        # Returns facts about Timmy before April 2026
    """
    try:
        backend = _get_store()

        # Map string to enum
        op_map = {
            "BEFORE": TemporalOperator.BEFORE,
            "AFTER": TemporalOperator.AFTER,
            "DURING": TemporalOperator.DURING,
            "OVERLAPS": TemporalOperator.OVERLAPS,
            "AT": TemporalOperator.AT,
        }

        # Normalize once; an unknown name yields None and is rejected below.
        selected = op_map.get(operator.upper())
        if selected is None:
            return {
                "success": False,
                "error": f"Invalid operator: {operator}. Use BEFORE, AFTER, DURING, OVERLAPS, or AT",
            }

        facts = backend.query_temporal(selected, timestamp, subject, predicate)

        logger.info(f"Queried with operator {operator}: {len(facts)} facts")

        fact_rows = []
        for fact in facts:
            fact_rows.append(
                {
                    "subject": fact.subject,
                    "predicate": fact.predicate,
                    "object": fact.object,
                    "valid_from": fact.valid_from,
                    "valid_until": fact.valid_until,
                    "version": fact.version,
                }
            )

        return {
            "success": True,
            "operator": operator,
            "timestamp": timestamp,
            "subject": subject,
            "predicate": predicate,
            "fact_count": len(facts),
            "facts": fact_rows,
        }
    except Exception as e:
        # Tool boundary: report the failure as data instead of raising.
        logger.error(f"Failed to query with temporal operator: {e}")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
def get_worldview_at_time(
    timestamp: str,
    subjects: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Snapshot Timmy's worldview at a specific point in time.

    Delegates to the shared reasoner's ``get_worldview_at_time`` and returns
    its result unmodified under the ``worldview`` key.

    Args:
        timestamp: The point in time (ISO 8601).
        subjects: Optional list of subjects to include. ``None`` is forwarded
            as-is (documented as meaning "include all").

    Returns:
        On success, a dictionary with ``success``, ``timestamp``,
        ``entity_count`` (``len`` of the reasoner's result) and ``worldview``
        (documented as a mapping from subject to its facts at that time).
        On failure, ``{"success": False, "error": <message>}``.

    Example:
        >>> get_worldview_at_time("2026-03-01T00:00:00", ["Timmy", "Hermes"])
    """
    try:
        worldview = _get_reasoner().get_worldview_at_time(timestamp, subjects)

        logger.info(f"Retrieved worldview at {timestamp}: {len(worldview)} entities")

        response: Dict[str, Any] = {"success": True}
        response["timestamp"] = timestamp
        response["entity_count"] = len(worldview)
        response["worldview"] = worldview
        return response
    except Exception as e:
        # Tool boundary: report the failure as data instead of raising.
        logger.error(f"Failed to get worldview: {e}")
        return {"success": False, "error": str(e)}
|
|
|
|
|
|
# Convenience function for natural language queries
|
|
def ask_temporal_question(question: str, **kwargs) -> Dict[str, Any]:
    """Route a simple natural language temporal question to a tool function.

    No entities or dates are extracted from the question text. The text is
    only scanned (case-insensitively) for a few fixed phrases to pick a
    route; the actual query parameters must be supplied through ``kwargs``.
    Routes are tried in the order below, and the first one whose phrase
    matches AND whose required kwargs are present handles the question:

    1. "what did we believe" / "what was" together with "before"
       -> ``query_historical_state(subject, before_time, predicate)``
       (requires ``subject`` and ``before_time``).
    2. "when did we first learn" / "when did we learn"
       -> ``when_did_we_learn(subject, predicate, object)``
       (requires ``subject``).
    3. "how has" together with "changed"
       -> ``how_has_it_changed(subject, since_time)``
       (requires ``subject`` and ``since_time``).

    Args:
        question: Natural language temporal question. A non-string value is
            treated as unparseable instead of raising.
        **kwargs: Query parameters. Recognized keys: ``subject``,
            ``predicate``, ``object``, ``before_time``, ``since_time``.

    Returns:
        The dictionary produced by the selected tool function, or
        ``{"success": False, "error": ..., "available_functions": [...]}``
        when no route can handle the question.

    Example:
        >>> ask_temporal_question(
        ...     "What was Timmy's view on sovereignty before March 2026?",
        ...     subject="Timmy",
        ...     before_time="2026-03-01T00:00:00",
        ... )
    """
    # A non-string question cannot match any phrase; fall through to the
    # error dict like the other tool functions do, rather than raising.
    question_lower = question.lower() if isinstance(question, str) else ""

    subject = kwargs.get("subject")
    predicate = kwargs.get("predicate")

    # Each route is an independent check so that a phrase match with missing
    # kwargs does not block a later route that could answer the question.
    asks_past_state = (
        "what did we believe" in question_lower or "what was" in question_lower
    )
    if asks_past_state and "before" in question_lower:
        before_time = kwargs.get("before_time")
        if subject and before_time:
            # Forward the optional predicate filter instead of dropping it.
            return query_historical_state(subject, before_time, predicate)

    if (
        "when did we first learn" in question_lower
        or "when did we learn" in question_lower
    ):
        if subject:
            # Forward the optional object filter instead of dropping it.
            return when_did_we_learn(subject, predicate, kwargs.get("object"))

    if "how has" in question_lower and "changed" in question_lower:
        since_time = kwargs.get("since_time")
        if subject and since_time:
            return how_has_it_changed(subject, since_time)

    return {
        "success": False,
        "error": "Could not parse temporal question. Use specific function calls instead.",
        "available_functions": [
            "store_fact_with_time",
            "query_historical_state",
            "get_fact_history",
            "generate_temporal_summary",
            "when_did_we_learn",
            "how_has_it_changed",
            "query_with_temporal_operator",
            "get_worldview_at_time",
        ],
    }
|