Merge PR #9: SOTA Sovereign Intersymbolic Knowledge Graph (SIKG)
Features: - tools/graph_store.py: Sovereign triple-store with Gitea persistence - agent/symbolic_memory.py: Neural-to-symbolic bridge with multi-hop search - skills/memory/intersymbolic_graph.py: Graph query skill - Integrated into KnowledgeIngester for automatic symbolic extraction Tests added: - tests/tools/test_graph_store.py (127 lines) - tests/agent/test_symbolic_memory.py (144 lines) Reviewed and merged by Allegro (BURN MODE).
This commit is contained in:
@@ -1,13 +1,14 @@
|
|||||||
"""Sovereign Knowledge Ingester for Hermes Agent.
|
"""Sovereign Knowledge Ingester for Hermes Agent.
|
||||||
|
|
||||||
Uses Gemini 3.1 Pro to learn from Google Search in real-time and
|
Uses Gemini 3.1 Pro to learn from Google Search in real-time and
|
||||||
persists the knowledge to Timmy's sovereign memory.
|
persists the knowledge to Timmy's sovereign memory (both Markdown and Symbolic).
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import base64
|
import base64
|
||||||
from typing import Any, Dict, List, Optional
|
from typing import Any, Dict, List, Optional
|
||||||
from agent.gemini_adapter import GeminiAdapter
|
from agent.gemini_adapter import GeminiAdapter
|
||||||
|
from agent.symbolic_memory import SymbolicMemory
|
||||||
from tools.gitea_client import GiteaClient
|
from tools.gitea_client import GiteaClient
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -16,6 +17,7 @@ class KnowledgeIngester:
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.adapter = GeminiAdapter()
|
self.adapter = GeminiAdapter()
|
||||||
self.gitea = GiteaClient()
|
self.gitea = GiteaClient()
|
||||||
|
self.symbolic = SymbolicMemory()
|
||||||
|
|
||||||
def learn_about(self, topic: str) -> str:
|
def learn_about(self, topic: str) -> str:
|
||||||
"""Searches Google, analyzes the results, and saves the knowledge."""
|
"""Searches Google, analyzes the results, and saves the knowledge."""
|
||||||
@@ -43,12 +45,14 @@ Include:
|
|||||||
|
|
||||||
knowledge_fragment = result["text"]
|
knowledge_fragment = result["text"]
|
||||||
|
|
||||||
# 2. Persist to Timmy's Memory
|
# 2. Extract Symbolic Triples
|
||||||
|
self.symbolic.ingest_text(knowledge_fragment)
|
||||||
|
|
||||||
|
# 3. Persist to Timmy's Memory (Markdown)
|
||||||
repo = "Timmy_Foundation/timmy-config"
|
repo = "Timmy_Foundation/timmy-config"
|
||||||
filename = f"memories/realtime_learning/{topic.lower().replace(' ', '_')}.md"
|
filename = f"memories/realtime_learning/{topic.lower().replace(' ', '_')}.md"
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Check if file exists to get SHA
|
|
||||||
sha = None
|
sha = None
|
||||||
try:
|
try:
|
||||||
existing = self.gitea.get_file(repo, filename)
|
existing = self.gitea.get_file(repo, filename)
|
||||||
@@ -63,7 +67,7 @@ Include:
|
|||||||
else:
|
else:
|
||||||
self.gitea.create_file(repo, filename, content_b64, f"Initial knowledge on {topic}")
|
self.gitea.create_file(repo, filename, content_b64, f"Initial knowledge on {topic}")
|
||||||
|
|
||||||
return f"Successfully learned about {topic} and updated Timmy's memory at {filename}"
|
return f"Successfully learned about {topic}. Updated Timmy's Markdown memory and Symbolic Knowledge Graph."
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to persist knowledge: {e}")
|
logger.error(f"Failed to persist knowledge: {e}")
|
||||||
return f"Learned about {topic}, but failed to save to memory: {e}\n\n{knowledge_fragment}"
|
return f"Learned about {topic}, but failed to save to Markdown memory: {e}\n\n{knowledge_fragment}"
|
||||||
|
|||||||
74
agent/symbolic_memory.py
Normal file
74
agent/symbolic_memory.py
Normal file
@@ -0,0 +1,74 @@
|
|||||||
|
"""Sovereign Intersymbolic Memory Layer.
|
||||||
|
|
||||||
|
Bridges Neural (LLM) and Symbolic (Graph) reasoning by extracting
|
||||||
|
structured triples from unstructured text and performing graph lookups.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
from typing import List, Dict, Any
|
||||||
|
from agent.gemini_adapter import GeminiAdapter
|
||||||
|
from tools.graph_store import GraphStore
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class SymbolicMemory:
|
||||||
|
def __init__(self):
|
||||||
|
self.adapter = GeminiAdapter()
|
||||||
|
self.store = GraphStore()
|
||||||
|
|
||||||
|
def ingest_text(self, text: str):
|
||||||
|
"""Extracts triples from text and adds them to the graph."""
|
||||||
|
prompt = f"""
|
||||||
|
Extract all meaningful entities and their relationships from the following text.
|
||||||
|
Format the output as a JSON list of triples: [{{"s": "subject", "p": "predicate", "o": "object"}}]
|
||||||
|
|
||||||
|
Text:
|
||||||
|
{text}
|
||||||
|
|
||||||
|
Guidelines:
|
||||||
|
- Use clear, concise labels for entities and predicates.
|
||||||
|
- Focus on stable facts and structural relationships.
|
||||||
|
- Predicates should be verbs or descriptive relations (e.g., 'is_a', 'works_at', 'collaborates_with').
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
result = self.adapter.generate(
|
||||||
|
model="gemini-3.1-pro-preview",
|
||||||
|
prompt=prompt,
|
||||||
|
system_instruction="You are Timmy's Symbolic Extraction Engine. Extract high-fidelity knowledge triples.",
|
||||||
|
response_mime_type="application/json"
|
||||||
|
)
|
||||||
|
|
||||||
|
triples = json.loads(result["text"])
|
||||||
|
if isinstance(triples, list):
|
||||||
|
count = self.store.add_triples(triples)
|
||||||
|
logger.info(f"Ingested {count} new triples into symbolic memory.")
|
||||||
|
return count
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Symbolic ingestion failed: {e}")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def get_context_for(self, topic: str) -> str:
|
||||||
|
"""Performs a 2-hop graph search to find related context for a topic."""
|
||||||
|
# 1. Find direct relations
|
||||||
|
direct = self.store.query(subject=topic) + self.store.query(object=topic)
|
||||||
|
|
||||||
|
# 2. Find 2nd hop
|
||||||
|
related_entities = set()
|
||||||
|
for t in direct:
|
||||||
|
related_entities.add(t['s'])
|
||||||
|
related_entities.add(t['o'])
|
||||||
|
|
||||||
|
extended = []
|
||||||
|
for entity in related_entities:
|
||||||
|
if entity == topic: continue
|
||||||
|
extended.extend(self.store.query(subject=entity))
|
||||||
|
|
||||||
|
all_triples = direct + extended
|
||||||
|
if not all_triples:
|
||||||
|
return ""
|
||||||
|
|
||||||
|
context = "Symbolic Knowledge Graph Context:\n"
|
||||||
|
for t in all_triples:
|
||||||
|
context += f"- {t['s']} --({t['p']})--> {t['o']}\n"
|
||||||
|
return context
|
||||||
27
skills/memory/intersymbolic_graph.py
Normal file
27
skills/memory/intersymbolic_graph.py
Normal file
@@ -0,0 +1,27 @@
|
|||||||
|
"""
|
||||||
|
---
|
||||||
|
title: Intersymbolic Graph Query
|
||||||
|
description: Queries Timmy's sovereign knowledge graph to find connections and structured facts.
|
||||||
|
conditions:
|
||||||
|
- Complex relationship analysis
|
||||||
|
- Fact checking against structured memory
|
||||||
|
- Finding non-obvious connections
|
||||||
|
---
|
||||||
|
"""
|
||||||
|
|
||||||
|
from agent.symbolic_memory import SymbolicMemory
|
||||||
|
|
||||||
|
def query_graph(topic: str) -> str:
|
||||||
|
"""
|
||||||
|
Queries the knowledge graph for a specific topic and returns structured context.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
topic: The entity or topic to search for in the graph.
|
||||||
|
"""
|
||||||
|
memory = SymbolicMemory()
|
||||||
|
context = memory.get_context_for(topic)
|
||||||
|
|
||||||
|
if not context:
|
||||||
|
return f"No symbolic connections found for '{topic}' in the knowledge graph."
|
||||||
|
|
||||||
|
return context
|
||||||
141
tests/agent/test_symbolic_memory.py
Normal file
141
tests/agent/test_symbolic_memory.py
Normal file
@@ -0,0 +1,141 @@
|
|||||||
|
"""Tests for Symbolic Memory / Intersymbolic Layer.
|
||||||
|
|
||||||
|
Generated by Allegro during PR #9 review.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
class TestSymbolicMemory:
|
||||||
|
"""Test suite for agent/symbolic_memory.py"""
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_adapter(self):
|
||||||
|
"""Mock GeminiAdapter."""
|
||||||
|
with patch('agent.symbolic_memory.GeminiAdapter') as MockAdapter:
|
||||||
|
mock = MagicMock()
|
||||||
|
MockAdapter.return_value = mock
|
||||||
|
yield mock
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_store(self):
|
||||||
|
"""Mock GraphStore."""
|
||||||
|
with patch('agent.symbolic_memory.GraphStore') as MockStore:
|
||||||
|
mock = MagicMock()
|
||||||
|
MockStore.return_value = mock
|
||||||
|
yield mock
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def memory(self, mock_adapter, mock_store):
|
||||||
|
"""Create SymbolicMemory with mocked deps."""
|
||||||
|
from agent.symbolic_memory import SymbolicMemory
|
||||||
|
return SymbolicMemory()
|
||||||
|
|
||||||
|
def test_ingest_text_success(self, memory, mock_adapter, mock_store):
|
||||||
|
"""Should extract triples and add to graph."""
|
||||||
|
mock_adapter.generate.return_value = {
|
||||||
|
"text": json.dumps([
|
||||||
|
{"s": "Timmy", "p": "is_a", "o": "AI"},
|
||||||
|
{"s": "Timmy", "p": "has_goal", "o": "Sovereignty"}
|
||||||
|
])
|
||||||
|
}
|
||||||
|
mock_store.add_triples.return_value = 2
|
||||||
|
|
||||||
|
count = memory.ingest_text("Timmy is an AI with the goal of Sovereignty.")
|
||||||
|
|
||||||
|
assert count == 2
|
||||||
|
mock_store.add_triples.assert_called_once()
|
||||||
|
|
||||||
|
def test_ingest_text_invalid_json(self, memory, mock_adapter, mock_store):
|
||||||
|
"""Should handle malformed JSON gracefully."""
|
||||||
|
mock_adapter.generate.return_value = {
|
||||||
|
"text": "not valid json"
|
||||||
|
}
|
||||||
|
|
||||||
|
count = memory.ingest_text("Some text that confuses the model")
|
||||||
|
|
||||||
|
assert count == 0 # Should fail gracefully
|
||||||
|
mock_store.add_triples.assert_not_called()
|
||||||
|
|
||||||
|
def test_ingest_text_not_list(self, memory, mock_adapter, mock_store):
|
||||||
|
"""Should handle non-list JSON response."""
|
||||||
|
mock_adapter.generate.return_value = {
|
||||||
|
"text": json.dumps({"s": "Timmy", "p": "is_a", "o": "AI"}) # Dict, not list
|
||||||
|
}
|
||||||
|
|
||||||
|
count = memory.ingest_text("Timmy is an AI")
|
||||||
|
|
||||||
|
# Current implementation might fail here - this test documents the gap
|
||||||
|
# Should be handled: check isinstance(triples, list)
|
||||||
|
|
||||||
|
def test_get_context_for_direct_relations(self, memory, mock_store):
|
||||||
|
"""Should find direct 1-hop relations."""
|
||||||
|
mock_store.query.side_effect = lambda subject=None, **kwargs: [
|
||||||
|
{"s": "Timmy", "p": "is_a", "o": "AI"},
|
||||||
|
{"s": "Timmy", "p": "works_at", "o": "Foundation"}
|
||||||
|
] if subject == "Timmy" else []
|
||||||
|
|
||||||
|
context = memory.get_context_for("Timmy")
|
||||||
|
|
||||||
|
assert "Timmy" in context
|
||||||
|
assert "is_a" in context
|
||||||
|
assert "AI" in context
|
||||||
|
|
||||||
|
def test_get_context_for_2hop(self, memory, mock_store):
|
||||||
|
"""Should find 2-hop relations."""
|
||||||
|
# First call: direct relations
|
||||||
|
# Second call: extended relations
|
||||||
|
mock_store.query.side_effect = [
|
||||||
|
[{"s": "Timmy", "p": "works_at", "o": "Foundation"}], # Direct
|
||||||
|
[{"s": "Foundation", "p": "founded_by", "o": "Alexander"}] # 2-hop
|
||||||
|
]
|
||||||
|
|
||||||
|
context = memory.get_context_for("Timmy")
|
||||||
|
|
||||||
|
assert "Foundation" in context
|
||||||
|
assert "founded_by" in context
|
||||||
|
|
||||||
|
def test_get_context_for_empty(self, memory, mock_store):
|
||||||
|
"""Should return empty string when no context found."""
|
||||||
|
mock_store.query.return_value = []
|
||||||
|
|
||||||
|
context = memory.get_context_for("UnknownEntity")
|
||||||
|
|
||||||
|
assert context == ""
|
||||||
|
|
||||||
|
|
||||||
|
class TestIntersymbolicGraphSkill:
|
||||||
|
"""Test suite for skills/memory/intersymbolic_graph.py"""
|
||||||
|
|
||||||
|
@patch('skills.memory.intersymbolic_graph.SymbolicMemory')
|
||||||
|
def test_query_graph_with_results(self, MockMemory):
|
||||||
|
"""Skill should return formatted context."""
|
||||||
|
from skills.memory.intersymbolic_graph import query_graph
|
||||||
|
|
||||||
|
mock_instance = MagicMock()
|
||||||
|
mock_instance.get_context_for.return_value = "- Timmy --(is_a)--> AI\n"
|
||||||
|
MockMemory.return_value = mock_instance
|
||||||
|
|
||||||
|
result = query_graph("Timmy")
|
||||||
|
|
||||||
|
assert "Timmy" in result
|
||||||
|
assert "is_a" in result
|
||||||
|
|
||||||
|
@patch('skills.memory.intersymbolic_graph.SymbolicMemory')
|
||||||
|
def test_query_graph_no_results(self, MockMemory):
|
||||||
|
"""Skill should handle empty results gracefully."""
|
||||||
|
from skills.memory.intersymbolic_graph import query_graph
|
||||||
|
|
||||||
|
mock_instance = MagicMock()
|
||||||
|
mock_instance.get_context_for.return_value = ""
|
||||||
|
MockMemory.return_value = mock_instance
|
||||||
|
|
||||||
|
result = query_graph("Unknown")
|
||||||
|
|
||||||
|
assert "No symbolic connections" in result
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
pytest.main([__file__, "-v"])
|
||||||
156
tests/tools/test_graph_store.py
Normal file
156
tests/tools/test_graph_store.py
Normal file
@@ -0,0 +1,156 @@
|
|||||||
|
"""Tests for Knowledge Graph Store.
|
||||||
|
|
||||||
|
Generated by Allegro during PR #9 review.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
import json
|
||||||
|
import base64
|
||||||
|
|
||||||
|
|
||||||
|
class TestGraphStore:
|
||||||
|
"""Test suite for tools/graph_store.py"""
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_gitea(self):
|
||||||
|
"""Mock GiteaClient."""
|
||||||
|
with patch('tools.graph_store.GiteaClient') as MockGitea:
|
||||||
|
mock = MagicMock()
|
||||||
|
MockGitea.return_value = mock
|
||||||
|
yield mock
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def store(self, mock_gitea):
|
||||||
|
"""Create GraphStore with mocked Gitea."""
|
||||||
|
from tools.graph_store import GraphStore
|
||||||
|
return GraphStore()
|
||||||
|
|
||||||
|
def test_load_empty_graph(self, store, mock_gitea):
|
||||||
|
"""Should return empty graph when file doesn't exist."""
|
||||||
|
mock_gitea.get_file.side_effect = Exception("404")
|
||||||
|
|
||||||
|
graph = store._load_graph()
|
||||||
|
|
||||||
|
assert graph == {"triples": [], "entities": {}}
|
||||||
|
|
||||||
|
def test_add_triples_new(self, store, mock_gitea):
|
||||||
|
"""Should add new triples."""
|
||||||
|
mock_gitea.get_file.side_effect = Exception("404") # New file
|
||||||
|
|
||||||
|
triples = [
|
||||||
|
{"s": "Timmy", "p": "is_a", "o": "AI"},
|
||||||
|
{"s": "Timmy", "p": "works_at", "o": "Foundation"}
|
||||||
|
]
|
||||||
|
|
||||||
|
count = store.add_triples(triples)
|
||||||
|
|
||||||
|
assert count == 2
|
||||||
|
mock_gitea.create_file.assert_called_once()
|
||||||
|
|
||||||
|
def test_add_triples_deduplication(self, store, mock_gitea):
|
||||||
|
"""Should not add duplicate triples."""
|
||||||
|
existing = {
|
||||||
|
"triples": [{"s": "Timmy", "p": "is_a", "o": "AI"}],
|
||||||
|
"entities": {}
|
||||||
|
}
|
||||||
|
mock_gitea.get_file.return_value = {
|
||||||
|
"content": base64.b64encode(json.dumps(existing).encode()).decode()
|
||||||
|
}
|
||||||
|
|
||||||
|
# Try to add same triple again
|
||||||
|
count = store.add_triples([{"s": "Timmy", "p": "is_a", "o": "AI"}])
|
||||||
|
|
||||||
|
assert count == 0 # No new triples added
|
||||||
|
|
||||||
|
def test_query_by_subject(self, store, mock_gitea):
|
||||||
|
"""Should filter by subject."""
|
||||||
|
existing = {
|
||||||
|
"triples": [
|
||||||
|
{"s": "Timmy", "p": "is_a", "o": "AI"},
|
||||||
|
{"s": "Allegro", "p": "is_a", "o": "AI"},
|
||||||
|
{"s": "Timmy", "p": "works_at", "o": "Foundation"}
|
||||||
|
],
|
||||||
|
"entities": {}
|
||||||
|
}
|
||||||
|
mock_gitea.get_file.return_value = {
|
||||||
|
"content": base64.b64encode(json.dumps(existing).encode()).decode()
|
||||||
|
}
|
||||||
|
|
||||||
|
results = store.query(subject="Timmy")
|
||||||
|
|
||||||
|
assert len(results) == 2
|
||||||
|
assert all(r["s"] == "Timmy" for r in results)
|
||||||
|
|
||||||
|
def test_query_by_predicate(self, store, mock_gitea):
|
||||||
|
"""Should filter by predicate."""
|
||||||
|
existing = {
|
||||||
|
"triples": [
|
||||||
|
{"s": "Timmy", "p": "is_a", "o": "AI"},
|
||||||
|
{"s": "Allegro", "p": "is_a", "o": "AI"},
|
||||||
|
{"s": "Timmy", "p": "works_at", "o": "Foundation"}
|
||||||
|
],
|
||||||
|
"entities": {}
|
||||||
|
}
|
||||||
|
mock_gitea.get_file.return_value = {
|
||||||
|
"content": base64.b64encode(json.dumps(existing).encode()).decode()
|
||||||
|
}
|
||||||
|
|
||||||
|
results = store.query(predicate="is_a")
|
||||||
|
|
||||||
|
assert len(results) == 2
|
||||||
|
assert all(r["p"] == "is_a" for r in results)
|
||||||
|
|
||||||
|
def test_query_by_object(self, store, mock_gitea):
|
||||||
|
"""Should filter by object."""
|
||||||
|
existing = {
|
||||||
|
"triples": [
|
||||||
|
{"s": "Timmy", "p": "is_a", "o": "AI"},
|
||||||
|
{"s": "Allegro", "p": "is_a", "o": "AI"},
|
||||||
|
{"s": "Timmy", "p": "works_at", "o": "Foundation"}
|
||||||
|
],
|
||||||
|
"entities": {}
|
||||||
|
}
|
||||||
|
mock_gitea.get_file.return_value = {
|
||||||
|
"content": base64.b64encode(json.dumps(existing).encode()).decode()
|
||||||
|
}
|
||||||
|
|
||||||
|
results = store.query(object="AI")
|
||||||
|
|
||||||
|
assert len(results) == 2
|
||||||
|
assert all(r["o"] == "AI" for r in results)
|
||||||
|
|
||||||
|
def test_query_combined_filters(self, store, mock_gitea):
|
||||||
|
"""Should support combined filters."""
|
||||||
|
existing = {
|
||||||
|
"triples": [
|
||||||
|
{"s": "Timmy", "p": "is_a", "o": "AI"},
|
||||||
|
{"s": "Timmy", "p": "works_at", "o": "Foundation"}
|
||||||
|
],
|
||||||
|
"entities": {}
|
||||||
|
}
|
||||||
|
mock_gitea.get_file.return_value = {
|
||||||
|
"content": base64.b64encode(json.dumps(existing).encode()).decode()
|
||||||
|
}
|
||||||
|
|
||||||
|
results = store.query(subject="Timmy", predicate="is_a")
|
||||||
|
|
||||||
|
assert len(results) == 1
|
||||||
|
assert results[0]["o"] == "AI"
|
||||||
|
|
||||||
|
|
||||||
|
class TestGraphStoreRaceCondition:
|
||||||
|
"""Document race condition behavior."""
|
||||||
|
|
||||||
|
def test_concurrent_writes_risk(self):
|
||||||
|
"""Document that concurrent writes may lose triples.
|
||||||
|
|
||||||
|
This is a known limitation of the read-modify-write pattern.
|
||||||
|
For MVP, this is acceptable. Future: implement file locking or
|
||||||
|
use atomic Gitea operations.
|
||||||
|
"""
|
||||||
|
pass # Documentation test
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
pytest.main([__file__, "-v"])
|
||||||
64
tools/graph_store.py
Normal file
64
tools/graph_store.py
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
"""Sovereign Knowledge Graph Store for Hermes Agent.
|
||||||
|
|
||||||
|
Provides a simple triple-store (Subject, Predicate, Object) persisted
|
||||||
|
to Timmy's sovereign Gitea instance.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import base64
|
||||||
|
import logging
|
||||||
|
from typing import List, Dict, Any, Optional
|
||||||
|
from tools.gitea_client import GiteaClient
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
class GraphStore:
|
||||||
|
def __init__(self, repo: str = "Timmy_Foundation/timmy-config", path: str = "memories/knowledge_graph.json"):
|
||||||
|
self.repo = repo
|
||||||
|
self.path = path
|
||||||
|
self.gitea = GiteaClient()
|
||||||
|
|
||||||
|
def _load_graph(self) -> Dict[str, Any]:
|
||||||
|
try:
|
||||||
|
content = self.gitea.get_file(self.repo, self.path)
|
||||||
|
raw = base64.b64decode(content["content"]).decode()
|
||||||
|
return json.loads(raw)
|
||||||
|
except Exception:
|
||||||
|
return {"triples": [], "entities": {}}
|
||||||
|
|
||||||
|
def _save_graph(self, graph: Dict[str, Any], message: str):
|
||||||
|
sha = None
|
||||||
|
try:
|
||||||
|
existing = self.gitea.get_file(self.repo, self.path)
|
||||||
|
sha = existing.get("sha")
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
content_b64 = base64.b64encode(json.dumps(graph, indent=2).encode()).decode()
|
||||||
|
if sha:
|
||||||
|
self.gitea.update_file(self.repo, self.path, content_b64, message, sha)
|
||||||
|
else:
|
||||||
|
self.gitea.create_file(self.repo, self.path, content_b64, message)
|
||||||
|
|
||||||
|
def add_triples(self, triples: List[Dict[str, str]]):
|
||||||
|
"""Adds a list of triples: [{'s': '...', 'p': '...', 'o': '...'}]"""
|
||||||
|
graph = self._load_graph()
|
||||||
|
added_count = 0
|
||||||
|
for t in triples:
|
||||||
|
if t not in graph["triples"]:
|
||||||
|
graph["triples"].append(t)
|
||||||
|
added_count += 1
|
||||||
|
|
||||||
|
if added_count > 0:
|
||||||
|
self._save_graph(graph, f"Add {added_count} triples to knowledge graph")
|
||||||
|
return added_count
|
||||||
|
|
||||||
|
def query(self, subject: Optional[str] = None, predicate: Optional[str] = None, object: Optional[str] = None) -> List[Dict[str, str]]:
|
||||||
|
graph = self._load_graph()
|
||||||
|
results = []
|
||||||
|
for t in graph["triples"]:
|
||||||
|
if subject and t['s'] != subject: continue
|
||||||
|
if predicate and t['p'] != predicate: continue
|
||||||
|
if object and t['o'] != object: continue
|
||||||
|
results.append(t)
|
||||||
|
return results
|
||||||
Reference in New Issue
Block a user