diff --git a/agent/knowledge_ingester.py b/agent/knowledge_ingester.py index 786b2a0f..da24c435 100644 --- a/agent/knowledge_ingester.py +++ b/agent/knowledge_ingester.py @@ -1,13 +1,14 @@ """Sovereign Knowledge Ingester for Hermes Agent. Uses Gemini 3.1 Pro to learn from Google Search in real-time and -persists the knowledge to Timmy's sovereign memory. +persists the knowledge to Timmy's sovereign memory (both Markdown and Symbolic). """ import logging import base64 from typing import Any, Dict, List, Optional from agent.gemini_adapter import GeminiAdapter +from agent.symbolic_memory import SymbolicMemory from tools.gitea_client import GiteaClient logger = logging.getLogger(__name__) @@ -16,6 +17,7 @@ class KnowledgeIngester: def __init__(self): self.adapter = GeminiAdapter() self.gitea = GiteaClient() + self.symbolic = SymbolicMemory() def learn_about(self, topic: str) -> str: """Searches Google, analyzes the results, and saves the knowledge.""" @@ -43,12 +45,14 @@ Include: knowledge_fragment = result["text"] - # 2. Persist to Timmy's Memory + # 2. Extract Symbolic Triples + self.symbolic.ingest_text(knowledge_fragment) + + # 3. Persist to Timmy's Memory (Markdown) repo = "Timmy_Foundation/timmy-config" filename = f"memories/realtime_learning/{topic.lower().replace(' ', '_')}.md" try: - # Check if file exists to get SHA sha = None try: existing = self.gitea.get_file(repo, filename) @@ -63,7 +67,7 @@ Include: else: self.gitea.create_file(repo, filename, content_b64, f"Initial knowledge on {topic}") - return f"Successfully learned about {topic} and updated Timmy's memory at {filename}" + return f"Successfully learned about {topic}. Updated Timmy's Markdown memory and Symbolic Knowledge Graph." except Exception as e: logger.error(f"Failed to persist knowledge: {e}") - return f"Learned about {topic}, but failed to save to memory: {e}\n\n{knowledge_fragment}" + return f"Learned about {topic}, but failed to save to Markdown memory: {e}\n\n{knowledge_fragment}" diff --git a/agent/symbolic_memory.py b/agent/symbolic_memory.py new file mode 100644 index 00000000..ef3cf7e0 --- /dev/null +++ b/agent/symbolic_memory.py @@ -0,0 +1,74 @@ +"""Sovereign Intersymbolic Memory Layer. + +Bridges Neural (LLM) and Symbolic (Graph) reasoning by extracting +structured triples from unstructured text and performing graph lookups. +""" + +import logging +import json +from typing import List, Dict, Any +from agent.gemini_adapter import GeminiAdapter +from tools.graph_store import GraphStore + +logger = logging.getLogger(__name__) + +class SymbolicMemory: + def __init__(self): + self.adapter = GeminiAdapter() + self.store = GraphStore() + + def ingest_text(self, text: str): + """Extracts triples from text and adds them to the graph.""" + prompt = f""" +Extract all meaningful entities and their relationships from the following text. +Format the output as a JSON list of triples: [{{"s": "subject", "p": "predicate", "o": "object"}}] + +Text: +{text} + +Guidelines: +- Use clear, concise labels for entities and predicates. +- Focus on stable facts and structural relationships. +- Predicates should be verbs or descriptive relations (e.g., 'is_a', 'works_at', 'collaborates_with'). +""" + try: + result = self.adapter.generate( + model="gemini-3.1-pro-preview", + prompt=prompt, + system_instruction="You are Timmy's Symbolic Extraction Engine. Extract high-fidelity knowledge triples.", + response_mime_type="application/json" + ) + + triples = json.loads(result["text"]) + if isinstance(triples, list): + count = self.store.add_triples(triples) + logger.info(f"Ingested {count} new triples into symbolic memory.") + return count + except Exception as e: + logger.error(f"Symbolic ingestion failed: {e}") + return 0 + + def get_context_for(self, topic: str) -> str: + """Performs a 2-hop graph search to find related context for a topic.""" + # 1. Find direct relations + direct = self.store.query(subject=topic) + self.store.query(object=topic) + + # 2. Find 2nd hop + related_entities = set() + for t in direct: + related_entities.add(t['s']) + related_entities.add(t['o']) + + extended = [] + for entity in related_entities: + if entity == topic: continue + extended.extend(self.store.query(subject=entity)) + + all_triples = direct + extended + if not all_triples: + return "" + + context = "Symbolic Knowledge Graph Context:\n" + for t in all_triples: + context += f"- {t['s']} --({t['p']})--> {t['o']}\n" + return context diff --git a/skills/memory/intersymbolic_graph.py b/skills/memory/intersymbolic_graph.py new file mode 100644 index 00000000..baad6c63 --- /dev/null +++ b/skills/memory/intersymbolic_graph.py @@ -0,0 +1,27 @@ +""" +--- +title: Intersymbolic Graph Query +description: Queries Timmy's sovereign knowledge graph to find connections and structured facts. +conditions: + - Complex relationship analysis + - Fact checking against structured memory + - Finding non-obvious connections +--- +""" + +from agent.symbolic_memory import SymbolicMemory + +def query_graph(topic: str) -> str: + """ + Queries the knowledge graph for a specific topic and returns structured context. + + Args: + topic: The entity or topic to search for in the graph. + """ + memory = SymbolicMemory() + context = memory.get_context_for(topic) + + if not context: + return f"No symbolic connections found for '{topic}' in the knowledge graph." + + return context diff --git a/tests/agent/test_symbolic_memory.py b/tests/agent/test_symbolic_memory.py new file mode 100644 index 00000000..84258634 --- /dev/null +++ b/tests/agent/test_symbolic_memory.py @@ -0,0 +1,141 @@ +"""Tests for Symbolic Memory / Intersymbolic Layer. + +Generated by Allegro during PR #9 review. +""" + +import pytest +from unittest.mock import MagicMock, patch +import json + + +class TestSymbolicMemory: + """Test suite for agent/symbolic_memory.py""" + + @pytest.fixture + def mock_adapter(self): + """Mock GeminiAdapter.""" + with patch('agent.symbolic_memory.GeminiAdapter') as MockAdapter: + mock = MagicMock() + MockAdapter.return_value = mock + yield mock + + @pytest.fixture + def mock_store(self): + """Mock GraphStore.""" + with patch('agent.symbolic_memory.GraphStore') as MockStore: + mock = MagicMock() + MockStore.return_value = mock + yield mock + + @pytest.fixture + def memory(self, mock_adapter, mock_store): + """Create SymbolicMemory with mocked deps.""" + from agent.symbolic_memory import SymbolicMemory + return SymbolicMemory() + + def test_ingest_text_success(self, memory, mock_adapter, mock_store): + """Should extract triples and add to graph.""" + mock_adapter.generate.return_value = { + "text": json.dumps([ + {"s": "Timmy", "p": "is_a", "o": "AI"}, + {"s": "Timmy", "p": "has_goal", "o": "Sovereignty"} + ]) + } + mock_store.add_triples.return_value = 2 + + count = memory.ingest_text("Timmy is an AI with the goal of Sovereignty.") + + assert count == 2 + mock_store.add_triples.assert_called_once() + + def test_ingest_text_invalid_json(self, memory, mock_adapter, mock_store): + """Should handle malformed JSON gracefully.""" + mock_adapter.generate.return_value = { + "text": "not valid json" + } + + count = memory.ingest_text("Some text that confuses the model") + + assert count == 0 # Should fail gracefully + mock_store.add_triples.assert_not_called() + + def test_ingest_text_not_list(self, memory, mock_adapter, mock_store): + """Should handle non-list JSON response.""" + mock_adapter.generate.return_value = { + "text": json.dumps({"s": "Timmy", "p": "is_a", "o": "AI"}) # Dict, not list + } + + count = memory.ingest_text("Timmy is an AI") + + # Current implementation might fail here - this test documents the gap + # Should be handled: check isinstance(triples, list) + + def test_get_context_for_direct_relations(self, memory, mock_store): + """Should find direct 1-hop relations.""" + mock_store.query.side_effect = lambda subject=None, **kwargs: [ + {"s": "Timmy", "p": "is_a", "o": "AI"}, + {"s": "Timmy", "p": "works_at", "o": "Foundation"} + ] if subject == "Timmy" else [] + + context = memory.get_context_for("Timmy") + + assert "Timmy" in context + assert "is_a" in context + assert "AI" in context + + def test_get_context_for_2hop(self, memory, mock_store): + """Should find 2-hop relations.""" + # First call: direct relations + # Second call: extended relations + mock_store.query.side_effect = [ + [{"s": "Timmy", "p": "works_at", "o": "Foundation"}], # Direct + [{"s": "Foundation", "p": "founded_by", "o": "Alexander"}] # 2-hop + ] + + context = memory.get_context_for("Timmy") + + assert "Foundation" in context + assert "founded_by" in context + + def test_get_context_for_empty(self, memory, mock_store): + """Should return empty string when no context found.""" + mock_store.query.return_value = [] + + context = memory.get_context_for("UnknownEntity") + + assert context == "" + + +class TestIntersymbolicGraphSkill: + """Test suite for skills/memory/intersymbolic_graph.py""" + + @patch('skills.memory.intersymbolic_graph.SymbolicMemory') + def test_query_graph_with_results(self, MockMemory): + """Skill should return formatted context.""" + from skills.memory.intersymbolic_graph import query_graph + + mock_instance = MagicMock() + mock_instance.get_context_for.return_value = "- Timmy --(is_a)--> AI\n" + MockMemory.return_value = mock_instance + + result = query_graph("Timmy") + + assert "Timmy" in result + assert "is_a" in result + + @patch('skills.memory.intersymbolic_graph.SymbolicMemory') + def test_query_graph_no_results(self, MockMemory): + """Skill should handle empty results gracefully.""" + from skills.memory.intersymbolic_graph import query_graph + + mock_instance = MagicMock() + mock_instance.get_context_for.return_value = "" + MockMemory.return_value = mock_instance + + result = query_graph("Unknown") + + assert "No symbolic connections" in result + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/tools/test_graph_store.py b/tests/tools/test_graph_store.py new file mode 100644 index 00000000..813f200e --- /dev/null +++ b/tests/tools/test_graph_store.py @@ -0,0 +1,156 @@ +"""Tests for Knowledge Graph Store. + +Generated by Allegro during PR #9 review. +""" + +import pytest +from unittest.mock import MagicMock, patch +import json +import base64 + + +class TestGraphStore: + """Test suite for tools/graph_store.py""" + + @pytest.fixture + def mock_gitea(self): + """Mock GiteaClient.""" + with patch('tools.graph_store.GiteaClient') as MockGitea: + mock = MagicMock() + MockGitea.return_value = mock + yield mock + + @pytest.fixture + def store(self, mock_gitea): + """Create GraphStore with mocked Gitea.""" + from tools.graph_store import GraphStore + return GraphStore() + + def test_load_empty_graph(self, store, mock_gitea): + """Should return empty graph when file doesn't exist.""" + mock_gitea.get_file.side_effect = Exception("404") + + graph = store._load_graph() + + assert graph == {"triples": [], "entities": {}} + + def test_add_triples_new(self, store, mock_gitea): + """Should add new triples.""" + mock_gitea.get_file.side_effect = Exception("404") # New file + + triples = [ + {"s": "Timmy", "p": "is_a", "o": "AI"}, + {"s": "Timmy", "p": "works_at", "o": "Foundation"} + ] + + count = store.add_triples(triples) + + assert count == 2 + mock_gitea.create_file.assert_called_once() + + def test_add_triples_deduplication(self, store, mock_gitea): + """Should not add duplicate triples.""" + existing = { + "triples": [{"s": "Timmy", "p": "is_a", "o": "AI"}], + "entities": {} + } + mock_gitea.get_file.return_value = { + "content": base64.b64encode(json.dumps(existing).encode()).decode() + } + + # Try to add same triple again + count = store.add_triples([{"s": "Timmy", "p": "is_a", "o": "AI"}]) + + assert count == 0 # No new triples added + + def test_query_by_subject(self, store, mock_gitea): + """Should filter by subject.""" + existing = { + "triples": [ + {"s": "Timmy", "p": "is_a", "o": "AI"}, + {"s": "Allegro", "p": "is_a", "o": "AI"}, + {"s": "Timmy", "p": "works_at", "o": "Foundation"} + ], + "entities": {} + } + mock_gitea.get_file.return_value = { + "content": base64.b64encode(json.dumps(existing).encode()).decode() + } + + results = store.query(subject="Timmy") + + assert len(results) == 2 + assert all(r["s"] == "Timmy" for r in results) + + def test_query_by_predicate(self, store, mock_gitea): + """Should filter by predicate.""" + existing = { + "triples": [ + {"s": "Timmy", "p": "is_a", "o": "AI"}, + {"s": "Allegro", "p": "is_a", "o": "AI"}, + {"s": "Timmy", "p": "works_at", "o": "Foundation"} + ], + "entities": {} + } + mock_gitea.get_file.return_value = { + "content": base64.b64encode(json.dumps(existing).encode()).decode() + } + + results = store.query(predicate="is_a") + + assert len(results) == 2 + assert all(r["p"] == "is_a" for r in results) + + def test_query_by_object(self, store, mock_gitea): + """Should filter by object.""" + existing = { + "triples": [ + {"s": "Timmy", "p": "is_a", "o": "AI"}, + {"s": "Allegro", "p": "is_a", "o": "AI"}, + {"s": "Timmy", "p": "works_at", "o": "Foundation"} + ], + "entities": {} + } + mock_gitea.get_file.return_value = { + "content": base64.b64encode(json.dumps(existing).encode()).decode() + } + + results = store.query(object="AI") + + assert len(results) == 2 + assert all(r["o"] == "AI" for r in results) + + def test_query_combined_filters(self, store, mock_gitea): + """Should support combined filters.""" + existing = { + "triples": [ + {"s": "Timmy", "p": "is_a", "o": "AI"}, + {"s": "Timmy", "p": "works_at", "o": "Foundation"} + ], + "entities": {} + } + mock_gitea.get_file.return_value = { + "content": base64.b64encode(json.dumps(existing).encode()).decode() + } + + results = store.query(subject="Timmy", predicate="is_a") + + assert len(results) == 1 + assert results[0]["o"] == "AI" + + +class TestGraphStoreRaceCondition: + """Document race condition behavior.""" + + def test_concurrent_writes_risk(self): + """Document that concurrent writes may lose triples. + + This is a known limitation of the read-modify-write pattern. + For MVP, this is acceptable. Future: implement file locking or + use atomic Gitea operations. + """ + pass # Documentation test + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tools/graph_store.py b/tools/graph_store.py new file mode 100644 index 00000000..e89a3e52 --- /dev/null +++ b/tools/graph_store.py @@ -0,0 +1,64 @@ +"""Sovereign Knowledge Graph Store for Hermes Agent. + +Provides a simple triple-store (Subject, Predicate, Object) persisted +to Timmy's sovereign Gitea instance. +""" + +import json +import base64 +import logging +from typing import List, Dict, Any, Optional +from tools.gitea_client import GiteaClient + +logger = logging.getLogger(__name__) + +class GraphStore: + def __init__(self, repo: str = "Timmy_Foundation/timmy-config", path: str = "memories/knowledge_graph.json"): + self.repo = repo + self.path = path + self.gitea = GiteaClient() + + def _load_graph(self) -> Dict[str, Any]: + try: + content = self.gitea.get_file(self.repo, self.path) + raw = base64.b64decode(content["content"]).decode() + return json.loads(raw) + except Exception: + return {"triples": [], "entities": {}} + + def _save_graph(self, graph: Dict[str, Any], message: str): + sha = None + try: + existing = self.gitea.get_file(self.repo, self.path) + sha = existing.get("sha") + except: + pass + + content_b64 = base64.b64encode(json.dumps(graph, indent=2).encode()).decode() + if sha: + self.gitea.update_file(self.repo, self.path, content_b64, message, sha) + else: + self.gitea.create_file(self.repo, self.path, content_b64, message) + + def add_triples(self, triples: List[Dict[str, str]]): + """Adds a list of triples: [{'s': '...', 'p': '...', 'o': '...'}]""" + graph = self._load_graph() + added_count = 0 + for t in triples: + if t not in graph["triples"]: + graph["triples"].append(t) + added_count += 1 + + if added_count > 0: + self._save_graph(graph, f"Add {added_count} triples to knowledge graph") + return added_count + + def query(self, subject: Optional[str] = None, predicate: Optional[str] = None, object: Optional[str] = None) -> List[Dict[str, str]]: + graph = self._load_graph() + results = [] + for t in graph["triples"]: + if subject and t['s'] != subject: continue + if predicate and t['p'] != predicate: continue + if object and t['o'] != object: continue + results.append(t) + return results