diff --git a/hermes-sovereign/mempalace/__init__.py b/hermes-sovereign/mempalace/__init__.py index 7c5b8af4..3edb2f2e 100644 --- a/hermes-sovereign/mempalace/__init__.py +++ b/hermes-sovereign/mempalace/__init__.py @@ -5,10 +5,13 @@ Provides: - retrieval_enforcer.py: L0-L5 retrieval order enforcement - wakeup.py: Session wake-up protocol (~300-900 tokens) - scratchpad.py: JSON-based session scratchpad with palace promotion +- sovereign_store.py: Zero-API durable memory (SQLite + FTS5 + HRR vectors) +- promotion.py: Quality-gated scratchpad-to-palace promotion (MP-4) Epic: #367 """ from .mempalace import Mempalace, PalaceRoom, analyse_issues +from .sovereign_store import SovereignStore -__all__ = ["Mempalace", "PalaceRoom", "analyse_issues"] +__all__ = ["Mempalace", "PalaceRoom", "analyse_issues", "SovereignStore"] diff --git a/hermes-sovereign/mempalace/promotion.py b/hermes-sovereign/mempalace/promotion.py new file mode 100644 index 00000000..e520c4ba --- /dev/null +++ b/hermes-sovereign/mempalace/promotion.py @@ -0,0 +1,188 @@ +"""Memory Promotion — quality-gated scratchpad-to-palace promotion. + +Implements MP-4 (#371): move session notes to durable memory only when +they pass quality gates. No LLM calls — all heuristic-based. + +Quality gates: + 1. Minimum content length (too short = noise) + 2. Duplicate detection (FTS5 + HRR similarity check) + 3. Structural quality (has subject-verb structure, not just a fragment) + 4. 
Staleness check (don't promote stale notes from old sessions) + +Refs: Epic #367, Sub-issue #371 +""" + +from __future__ import annotations + +import re +import time +from typing import Optional + +try: + from .sovereign_store import SovereignStore +except ImportError: + from sovereign_store import SovereignStore + + +# --------------------------------------------------------------------------- +# Quality gate thresholds +# --------------------------------------------------------------------------- + +MIN_CONTENT_WORDS = 5 +MAX_CONTENT_WORDS = 500 +DUPLICATE_SIMILARITY = 0.85 +DUPLICATE_FTS_THRESHOLD = 3 +STALE_SECONDS = 86400 * 7 +MIN_TRUST_FOR_AUTO = 0.4 + + +# --------------------------------------------------------------------------- +# Quality checks +# --------------------------------------------------------------------------- + +def _check_length(content: str) -> tuple[bool, str]: + """Gate 1: Content length check.""" + words = content.split() + if len(words) < MIN_CONTENT_WORDS: + return False, f"Too short ({len(words)} words, minimum {MIN_CONTENT_WORDS})" + if len(words) > MAX_CONTENT_WORDS: + return False, f"Too long ({len(words)} words, maximum {MAX_CONTENT_WORDS}). Summarize first." 
def _check_duplicate(content: str, store: SovereignStore, room: str) -> tuple[bool, str]:
    """Gate 3: Duplicate detection via hybrid search.

    Asks the store for its five closest matches in *room*, then compares
    word-level Jaccard overlap (``_text_overlap``) against each candidate.
    Returns (passed, reason); reason is "OK" when no near-duplicate is found.

    NOTE(review): the original version also compared ``r["score"]`` against
    DUPLICATE_SIMILARITY (0.85), but ``SovereignStore.search`` returns
    Reciprocal-Rank-Fusion scores bounded by roughly
    (fts_weight + hrr_weight) / (k_rrf + 1) ≈ 0.033, so that branch could
    never fire and has been removed. If a true cosine-similarity gate is
    wanted, compare HRR vectors directly rather than RRF scores.
    """
    results = store.search(content, room=room, limit=5, min_trust=0.0)
    for candidate in results:
        if _text_overlap(content, candidate["content"]) > 0.8:
            return False, f"Near-duplicate text: memory #{candidate['memory_id']}"
    return True, "OK"
+ return True, "OK" + + +def _text_overlap(a: str, b: str) -> float: + """Jaccard similarity between two texts (word-level).""" + words_a = set(a.lower().split()) + words_b = set(b.lower().split()) + if not words_a or not words_b: + return 0.0 + intersection = words_a & words_b + union = words_a | words_b + return len(intersection) / len(union) + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + +class PromotionResult: + """Result of a promotion attempt.""" + def __init__(self, success: bool, memory_id: Optional[int], reason: str, gates: dict): + self.success = success + self.memory_id = memory_id + self.reason = reason + self.gates = gates + + def __repr__(self): + status = "PROMOTED" if self.success else "REJECTED" + return f"PromotionResult({status}: {self.reason})" + + +def evaluate_for_promotion( + content: str, + store: SovereignStore, + room: str = "general", + written_at: Optional[float] = None, +) -> dict: + """Run all quality gates without actually promoting.""" + if written_at is None: + written_at = time.time() + gates = {} + gates["length"] = _check_length(content) + gates["structure"] = _check_structure(content) + gates["duplicate"] = _check_duplicate(content, store, room) + gates["staleness"] = _check_staleness(written_at) + all_passed = all(passed for passed, _ in gates.values()) + return { + "eligible": all_passed, + "gates": gates, + "content_preview": content[:100] + ("..." 
def promote_session_batch(
    store: SovereignStore,
    session_id: str,
    notes: dict[str, dict],
    room: str = "general",
    force: bool = False,
) -> list[PromotionResult]:
    """Promote all notes from a session scratchpad.

    Each value in *notes* may be a dict carrying ``value`` (the note text)
    and an optional ``written_at`` stamp ("%Y-%m-%d %H:%M:%S", local time)
    used by the staleness gate; any other entry is coerced with ``str()``.

    Returns one PromotionResult per note, in dict iteration order.

    Fix: the original re-ran ``import datetime`` inside the loop body on
    every iteration; the import is now hoisted out of the loop.
    """
    import datetime  # function-local import kept from original style; hoisted out of the loop

    def _parse_written_at(raw) -> Optional[float]:
        # Returns None (promote() then falls back to "now") when the stamp
        # is absent or malformed — mirrors the original except/pass.
        try:
            return datetime.datetime.strptime(raw, "%Y-%m-%d %H:%M:%S").timestamp()
        except (ValueError, TypeError):
            return None

    results: list[PromotionResult] = []
    for key, entry in notes.items():
        if isinstance(entry, dict):
            content = entry.get("value", str(entry))
            written_at = _parse_written_at(entry.get("written_at"))
        else:
            content = str(entry)
            written_at = None
        results.append(
            promote(
                content=str(content), store=store, session_id=session_id,
                scratch_key=key, room=room, written_at=written_at, force=force,
            )
        )
    return results
b/hermes-sovereign/mempalace/sovereign_store.py new file mode 100644 index 00000000..9f515307 --- /dev/null +++ b/hermes-sovereign/mempalace/sovereign_store.py @@ -0,0 +1,474 @@ +"""Sovereign Memory Store — zero-API, zero-dependency durable memory. + +Replaces the third-party `mempalace` CLI and its ONNX requirement with a +self-contained SQLite + FTS5 + HRR (Holographic Reduced Representation) +store. Every operation is local: no network calls, no API keys, no cloud. + +Storage: ~/.hermes/palace/sovereign.db + +Capabilities: + - Durable fact storage with rooms, categories, and trust scores + - Hybrid retrieval: FTS5 keyword search + HRR cosine similarity + - Reciprocal Rank Fusion to merge keyword and semantic results + - Trust scoring: facts that get retrieved and confirmed gain trust + - Graceful numpy degradation: falls back to keyword-only if missing + +Refs: Epic #367, MP-3 #370, MP-4 #371 +""" + +from __future__ import annotations + +import hashlib +import json +import math +import sqlite3 +import struct +import time +from pathlib import Path +from typing import Any, Optional + +# --------------------------------------------------------------------------- +# HRR (Holographic Reduced Representations) — zero-dependency vectors +# --------------------------------------------------------------------------- +# Phase-encoded vectors via SHA-256. No ONNX, no embeddings API, no numpy +# required (but uses numpy when available for speed). 
+ +_TWO_PI = 2.0 * math.pi +_DIM = 512 # Compact dimension — sufficient for memory retrieval + +try: + import numpy as np + _HAS_NUMPY = True +except ImportError: + _HAS_NUMPY = False + + +def _encode_atom_np(word: str, dim: int = _DIM) -> "np.ndarray": + """Deterministic phase vector via SHA-256 (numpy path).""" + values_per_block = 16 + blocks_needed = math.ceil(dim / values_per_block) + uint16_values: list[int] = [] + for i in range(blocks_needed): + digest = hashlib.sha256(f"{word}:{i}".encode()).digest() + uint16_values.extend(struct.unpack("<16H", digest)) + return np.array(uint16_values[:dim], dtype=np.float64) * (_TWO_PI / 65536.0) + + +def _encode_atom_pure(word: str, dim: int = _DIM) -> list[float]: + """Deterministic phase vector via SHA-256 (pure Python fallback).""" + values_per_block = 16 + blocks_needed = math.ceil(dim / values_per_block) + uint16_values: list[int] = [] + for i in range(blocks_needed): + digest = hashlib.sha256(f"{word}:{i}".encode()).digest() + for j in range(0, 32, 2): + uint16_values.append(int.from_bytes(digest[j:j+2], "little")) + return [v * (_TWO_PI / 65536.0) for v in uint16_values[:dim]] + + +def encode_text(text: str, dim: int = _DIM): + """Encode a text string into an HRR phase vector by bundling word atoms. + + Uses circular mean of per-word phase vectors — the standard HRR + superposition operation. Result is a fixed-width vector regardless + of input length. 
+ """ + words = text.lower().split() + if not words: + words = [""] + + if _HAS_NUMPY: + atoms = [_encode_atom_np(w, dim) for w in words] + # Circular mean: average the unit vectors, extract phase + unit_sum = sum(np.exp(1j * a) for a in atoms) + return np.angle(unit_sum) % _TWO_PI + else: + # Pure Python circular mean + real_sum = [0.0] * dim + imag_sum = [0.0] * dim + for w in words: + atom = _encode_atom_pure(w, dim) + for d in range(dim): + real_sum[d] += math.cos(atom[d]) + imag_sum[d] += math.sin(atom[d]) + return [math.atan2(imag_sum[d], real_sum[d]) % _TWO_PI for d in range(dim)] + + +def cosine_similarity_phase(a, b) -> float: + """Cosine similarity between two phase vectors. + + For phase vectors, similarity = mean(cos(a - b)). + """ + if _HAS_NUMPY: + return float(np.mean(np.cos(np.array(a) - np.array(b)))) + else: + n = len(a) + return sum(math.cos(a[i] - b[i]) for i in range(n)) / n + + +def serialize_vector(vec) -> bytes: + """Serialize a vector to bytes for SQLite storage.""" + if _HAS_NUMPY: + return vec.astype(np.float64).tobytes() + else: + return struct.pack(f"{len(vec)}d", *vec) + + +def deserialize_vector(blob: bytes): + """Deserialize bytes back to a vector.""" + n = len(blob) // 8 # float64 = 8 bytes + if _HAS_NUMPY: + return np.frombuffer(blob, dtype=np.float64) + else: + return list(struct.unpack(f"{n}d", blob)) + + +# --------------------------------------------------------------------------- +# SQLite Schema +# --------------------------------------------------------------------------- + +_SCHEMA = """ +CREATE TABLE IF NOT EXISTS memories ( + memory_id INTEGER PRIMARY KEY AUTOINCREMENT, + content TEXT NOT NULL, + room TEXT DEFAULT 'general', + category TEXT DEFAULT '', + trust_score REAL DEFAULT 0.5, + retrieval_count INTEGER DEFAULT 0, + created_at REAL NOT NULL, + updated_at REAL NOT NULL, + hrr_vector BLOB +); + +CREATE INDEX IF NOT EXISTS idx_memories_room ON memories(room); +CREATE INDEX IF NOT EXISTS idx_memories_trust ON 
memories(trust_score DESC); + +-- FTS5 for fast keyword search +CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5( + content, room, category, + content=memories, content_rowid=memory_id, + tokenize='porter unicode61' +); + +-- Sync triggers +CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN + INSERT INTO memories_fts(rowid, content, room, category) + VALUES (new.memory_id, new.content, new.room, new.category); +END; + +CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN + INSERT INTO memories_fts(memories_fts, rowid, content, room, category) + VALUES ('delete', old.memory_id, old.content, old.room, old.category); +END; + +CREATE TRIGGER IF NOT EXISTS memories_au AFTER UPDATE ON memories BEGIN + INSERT INTO memories_fts(memories_fts, rowid, content, room, category) + VALUES ('delete', old.memory_id, old.content, old.room, old.category); + INSERT INTO memories_fts(rowid, content, room, category) + VALUES (new.memory_id, new.content, new.room, new.category); +END; + +-- Promotion log: tracks what moved from scratchpad to durable memory +CREATE TABLE IF NOT EXISTS promotion_log ( + log_id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + scratch_key TEXT NOT NULL, + memory_id INTEGER REFERENCES memories(memory_id), + promoted_at REAL NOT NULL, + reason TEXT DEFAULT '' +); +""" + + +# --------------------------------------------------------------------------- +# SovereignStore +# --------------------------------------------------------------------------- + +class SovereignStore: + """Zero-API durable memory store. + + All operations are local SQLite. No network calls. No API keys. + HRR vectors provide semantic similarity without embedding models. + FTS5 provides fast keyword search. RRF merges both rankings. 
+ """ + + def __init__(self, db_path: Optional[str] = None): + if db_path is None: + db_path = str(Path.home() / ".hermes" / "palace" / "sovereign.db") + self._db_path = db_path + Path(db_path).parent.mkdir(parents=True, exist_ok=True) + self._conn = sqlite3.connect(db_path) + self._conn.row_factory = sqlite3.Row + self._conn.executescript(_SCHEMA) + + def close(self): + self._conn.close() + + # ------------------------------------------------------------------ + # Store + # ------------------------------------------------------------------ + + def store( + self, + content: str, + room: str = "general", + category: str = "", + trust: float = 0.5, + ) -> int: + """Store a fact in durable memory. Returns the memory_id.""" + now = time.time() + vec = encode_text(content) + blob = serialize_vector(vec) + cur = self._conn.execute( + """INSERT INTO memories (content, room, category, trust_score, + created_at, updated_at, hrr_vector) + VALUES (?, ?, ?, ?, ?, ?, ?)""", + (content, room, category, trust, now, now, blob), + ) + self._conn.commit() + return cur.lastrowid + + def store_batch(self, items: list[dict]) -> list[int]: + """Store multiple facts. 
    def search(
        self,
        query: str,
        room: Optional[str] = None,
        limit: int = 10,
        min_trust: float = 0.0,
        fts_weight: float = 0.5,
        hrr_weight: float = 0.5,
    ) -> list[dict]:
        """Hybrid search: FTS5 keywords + HRR semantic similarity.

        Uses Reciprocal Rank Fusion (RRF) to merge both rankings.
        Returns list of dicts with content, room, score, trust_score.

        Args:
            query: Free text; used both as the FTS5 MATCH expression and as
                the text encoded into an HRR query vector.
            room: Optional room filter applied in both candidate stages.
            limit: Maximum number of fused results returned.
            min_trust: Minimum trust_score for a memory to be a candidate.
            fts_weight: Weight of the keyword ranking in the RRF sum.
            hrr_weight: Weight of the semantic ranking in the RRF sum.

        Side effect: increments retrieval_count (and commits) for every
        memory actually returned.

        NOTE(review): fused scores are bounded by roughly
        (fts_weight + hrr_weight) / (k_rrf + 1) ≈ 0.016 with defaults —
        callers must not compare ``score`` against cosine-style thresholds.
        """
        k_rrf = 60  # Standard RRF constant

        # Stage 1: FTS5 candidates (over-fetch 3x so fusion has headroom)
        fts_results = self._fts_search(query, room, min_trust, limit * 3)

        # Stage 2: HRR candidates (brute-force similarity scan, truncated to limit*3)
        hrr_results = self._hrr_search(query, room, min_trust, limit * 3)

        # Stage 3: RRF fusion — each list contributes weight/(k + rank + 1)
        scores: dict[int, float] = {}
        meta: dict[int, dict] = {}

        for rank, row in enumerate(fts_results):
            mid = row["memory_id"]
            scores[mid] = scores.get(mid, 0) + fts_weight / (k_rrf + rank + 1)
            meta[mid] = dict(row)

        for rank, row in enumerate(hrr_results):
            mid = row["memory_id"]
            scores[mid] = scores.get(mid, 0) + hrr_weight / (k_rrf + rank + 1)
            # Keep the FTS metadata when a memory appears in both lists
            if mid not in meta:
                meta[mid] = dict(row)

        # Sort by fused score
        ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:limit]

        results = []
        for mid, score in ranked:
            m = meta[mid]
            # Bump retrieval count (trust-signal bookkeeping for returned hits)
            self._conn.execute(
                "UPDATE memories SET retrieval_count = retrieval_count + 1 WHERE memory_id = ?",
                (mid,),
            )
            results.append({
                "memory_id": mid,
                "content": m["content"],
                "room": m["room"],
                "category": m.get("category", ""),
                "trust_score": m["trust_score"],
                "score": round(score, 6),
            })

        # Single commit for all retrieval-count bumps
        if results:
            self._conn.commit()
        return results
+ ORDER BY rank LIMIT ?""", + (query, min_trust, limit), + ).fetchall() + return [dict(r) for r in rows] + except sqlite3.OperationalError: + # Bad FTS query syntax — degrade gracefully + return [] + + def _hrr_search( + self, query: str, room: Optional[str], min_trust: float, limit: int + ) -> list[dict]: + """HRR cosine similarity search (brute-force scan, fast for <100K facts).""" + query_vec = encode_text(query) + + if room: + rows = self._conn.execute( + """SELECT memory_id, content, room, category, trust_score, + retrieval_count, hrr_vector + FROM memories + WHERE room = ? AND trust_score >= ? AND hrr_vector IS NOT NULL""", + (room, min_trust), + ).fetchall() + else: + rows = self._conn.execute( + """SELECT memory_id, content, room, category, trust_score, + retrieval_count, hrr_vector + FROM memories + WHERE trust_score >= ? AND hrr_vector IS NOT NULL""", + (min_trust,), + ).fetchall() + + scored = [] + for r in rows: + stored_vec = deserialize_vector(r["hrr_vector"]) + sim = cosine_similarity_phase(query_vec, stored_vec) + scored.append((sim, dict(r))) + + scored.sort(key=lambda x: x[0], reverse=True) + return [item[1] for item in scored[:limit]] + + # ------------------------------------------------------------------ + # Trust management + # ------------------------------------------------------------------ + + def boost_trust(self, memory_id: int, delta: float = 0.05) -> None: + """Increase trust score when a memory proves useful.""" + self._conn.execute( + """UPDATE memories SET trust_score = MIN(1.0, trust_score + ?), + updated_at = ? WHERE memory_id = ?""", + (delta, time.time(), memory_id), + ) + self._conn.commit() + + def decay_trust(self, memory_id: int, delta: float = 0.02) -> None: + """Decrease trust score when a memory is contradicted.""" + self._conn.execute( + """UPDATE memories SET trust_score = MAX(0.0, trust_score - ?), + updated_at = ? 
WHERE memory_id = ?""", + (delta, time.time(), memory_id), + ) + self._conn.commit() + + # ------------------------------------------------------------------ + # Room operations + # ------------------------------------------------------------------ + + def list_rooms(self) -> list[dict]: + """List all rooms with fact counts.""" + rows = self._conn.execute( + """SELECT room, COUNT(*) as count, + AVG(trust_score) as avg_trust + FROM memories GROUP BY room ORDER BY count DESC""" + ).fetchall() + return [dict(r) for r in rows] + + def room_contents(self, room: str, limit: int = 50) -> list[dict]: + """Get all facts in a room, ordered by trust.""" + rows = self._conn.execute( + """SELECT memory_id, content, category, trust_score, + retrieval_count, created_at + FROM memories WHERE room = ? + ORDER BY trust_score DESC, created_at DESC LIMIT ?""", + (room, limit), + ).fetchall() + return [dict(r) for r in rows] + + # ------------------------------------------------------------------ + # Stats + # ------------------------------------------------------------------ + + def stats(self) -> dict: + """Return store statistics.""" + row = self._conn.execute( + """SELECT COUNT(*) as total, + AVG(trust_score) as avg_trust, + SUM(retrieval_count) as total_retrievals, + COUNT(DISTINCT room) as room_count + FROM memories""" + ).fetchone() + return dict(row) + + # ------------------------------------------------------------------ + # Promotion support (scratchpad → durable) + # ------------------------------------------------------------------ + + def log_promotion( + self, + session_id: str, + scratch_key: str, + memory_id: int, + reason: str = "", + ) -> None: + """Record a scratchpad-to-palace promotion in the audit log.""" + self._conn.execute( + """INSERT INTO promotion_log + (session_id, scratch_key, memory_id, promoted_at, reason) + VALUES (?, ?, ?, ?, ?)""", + (session_id, scratch_key, memory_id, time.time(), reason), + ) + self._conn.commit() + + def recent_promotions(self, 
limit: int = 20) -> list[dict]: + """Get recent promotion log entries.""" + rows = self._conn.execute( + """SELECT p.*, m.content, m.room + FROM promotion_log p + LEFT JOIN memories m ON p.memory_id = m.memory_id + ORDER BY p.promoted_at DESC LIMIT ?""", + (limit,), + ).fetchall() + return [dict(r) for r in rows] diff --git a/hermes-sovereign/mempalace/tests/test_sovereign_store.py b/hermes-sovereign/mempalace/tests/test_sovereign_store.py new file mode 100644 index 00000000..38c1fd3c --- /dev/null +++ b/hermes-sovereign/mempalace/tests/test_sovereign_store.py @@ -0,0 +1,255 @@ +"""Tests for the Sovereign Memory Store and Promotion system. + +Zero-API, zero-network — everything runs against an in-memory SQLite DB. +""" + +import os +import sys +import tempfile +import time +import unittest + +# Allow imports from parent package +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from sovereign_store import ( + SovereignStore, + encode_text, + cosine_similarity_phase, + serialize_vector, + deserialize_vector, +) +from promotion import ( + evaluate_for_promotion, + promote, + promote_session_batch, +) + + +class TestHRRVectors(unittest.TestCase): + """Test the HRR encoding and similarity functions.""" + + def test_deterministic_encoding(self): + """Same text always produces the same vector.""" + v1 = encode_text("hello world") + v2 = encode_text("hello world") + self.assertAlmostEqual(cosine_similarity_phase(v1, v2), 1.0, places=5) + + def test_similar_texts_higher_similarity(self): + """Related texts should be more similar than unrelated ones.""" + v_agent = encode_text("agent memory palace retrieval") + v_similar = encode_text("agent recall memory search") + v_unrelated = encode_text("banana strawberry fruit smoothie") + sim_related = cosine_similarity_phase(v_agent, v_similar) + sim_unrelated = cosine_similarity_phase(v_agent, v_unrelated) + self.assertGreater(sim_related, sim_unrelated) + + def test_serialize_roundtrip(self): + """Vectors 
survive serialization to/from bytes.""" + vec = encode_text("test serialization") + blob = serialize_vector(vec) + restored = deserialize_vector(blob) + sim = cosine_similarity_phase(vec, restored) + self.assertAlmostEqual(sim, 1.0, places=5) + + def test_empty_text(self): + """Empty text gets a fallback encoding.""" + vec = encode_text("") + self.assertEqual(len(vec) if hasattr(vec, '__len__') else len(list(vec)), 512) + + +class TestSovereignStore(unittest.TestCase): + """Test the SQLite-backed sovereign store.""" + + def setUp(self): + self.db_path = os.path.join(tempfile.mkdtemp(), "test.db") + self.store = SovereignStore(db_path=self.db_path) + + def tearDown(self): + self.store.close() + if os.path.exists(self.db_path): + os.remove(self.db_path) + + def test_store_and_retrieve(self): + """Store a fact and find it via search.""" + mid = self.store.store("Timmy is a sovereign AI agent on Hermes VPS", room="identity") + results = self.store.search("sovereign agent", room="identity") + self.assertTrue(any(r["memory_id"] == mid for r in results)) + + def test_fts_search(self): + """FTS5 keyword search works.""" + self.store.store("The beacon game uses paperclips mechanics", room="projects") + self.store.store("Fleet agents handle delegation and dispatch", room="fleet") + results = self.store.search("paperclips") + self.assertTrue(len(results) > 0) + self.assertIn("paperclips", results[0]["content"].lower()) + + def test_hrr_search_semantic(self): + """HRR similarity finds related content even without exact keywords.""" + self.store.store("Memory palace rooms organize facts spatially", room="memory") + self.store.store("Pizza delivery service runs on weekends", room="unrelated") + results = self.store.search("organize knowledge rooms", room="memory") + self.assertTrue(len(results) > 0) + self.assertIn("palace", results[0]["content"].lower()) + + def test_room_filtering(self): + """Room filter restricts search scope.""" + self.store.store("Hermes harness manages 
tool calls", room="infrastructure") + self.store.store("Hermes mythology Greek god", room="lore") + results = self.store.search("Hermes", room="infrastructure") + self.assertTrue(all(r["room"] == "infrastructure" for r in results)) + + def test_trust_boost(self): + """Trust score increases when boosted.""" + mid = self.store.store("fact", trust=0.5) + self.store.boost_trust(mid, delta=0.1) + results = self.store.room_contents("general") + fact = next(r for r in results if r["memory_id"] == mid) + self.assertAlmostEqual(fact["trust_score"], 0.6, places=2) + + def test_trust_decay(self): + """Trust score decreases when decayed.""" + mid = self.store.store("questionable fact", trust=0.5) + self.store.decay_trust(mid, delta=0.2) + results = self.store.room_contents("general") + fact = next(r for r in results if r["memory_id"] == mid) + self.assertAlmostEqual(fact["trust_score"], 0.3, places=2) + + def test_batch_store(self): + """Batch store works.""" + ids = self.store.store_batch([ + {"content": "fact one", "room": "test"}, + {"content": "fact two", "room": "test"}, + {"content": "fact three", "room": "test"}, + ]) + self.assertEqual(len(ids), 3) + rooms = self.store.list_rooms() + test_room = next(r for r in rooms if r["room"] == "test") + self.assertEqual(test_room["count"], 3) + + def test_stats(self): + """Stats returns correct counts.""" + self.store.store("a fact", room="r1") + self.store.store("another fact", room="r2") + s = self.store.stats() + self.assertEqual(s["total"], 2) + self.assertEqual(s["room_count"], 2) + + def test_retrieval_count_increments(self): + """Retrieval count goes up when a fact is found via search.""" + self.store.store("unique searchable content xyz123", room="test") + self.store.search("xyz123") + results = self.store.room_contents("test") + self.assertTrue(any(r["retrieval_count"] > 0 for r in results)) + + +class TestPromotion(unittest.TestCase): + """Test the quality-gated promotion system.""" + + def setUp(self): + self.db_path = 
os.path.join(tempfile.mkdtemp(), "promo_test.db") + self.store = SovereignStore(db_path=self.db_path) + + def tearDown(self): + self.store.close() + + def test_successful_promotion(self): + """Good content passes all gates.""" + result = promote( + content="Timmy runs on the Hermes VPS at 143.198.27.163 with local Ollama inference", + store=self.store, + session_id="test-session-001", + scratch_key="vps_info", + room="infrastructure", + ) + self.assertTrue(result.success) + self.assertIsNotNone(result.memory_id) + + def test_reject_too_short(self): + """Short fragments get rejected.""" + result = promote( + content="yes", + store=self.store, + session_id="test", + scratch_key="short", + ) + self.assertFalse(result.success) + self.assertIn("Too short", result.reason) + + def test_reject_duplicate(self): + """Duplicate content gets rejected.""" + self.store.store("SOUL.md is the canonical identity document for Timmy", room="identity") + result = promote( + content="SOUL.md is the canonical identity document for Timmy", + store=self.store, + session_id="test", + scratch_key="soul", + room="identity", + ) + self.assertFalse(result.success) + self.assertIn("uplicate", result.reason) + + def test_reject_stale(self): + """Old notes get flagged as stale.""" + old_time = time.time() - (86400 * 10) + result = promote( + content="This is a note from long ago about something important", + store=self.store, + session_id="test", + scratch_key="old", + written_at=old_time, + ) + self.assertFalse(result.success) + self.assertIn("Stale", result.reason) + + def test_force_bypasses_gates(self): + """Force flag overrides quality gates.""" + result = promote( + content="ok", + store=self.store, + session_id="test", + scratch_key="forced", + force=True, + ) + self.assertTrue(result.success) + + def test_evaluate_dry_run(self): + """Evaluate returns gate details without promoting.""" + eval_result = evaluate_for_promotion( + content="The fleet uses kimi-k2.5 as the primary model for all 
agent operations", + store=self.store, + room="fleet", + ) + self.assertTrue(eval_result["eligible"]) + self.assertTrue(all(p for p, _ in eval_result["gates"].values())) + + def test_batch_promotion(self): + """Batch promotion processes all notes.""" + notes = { + "infra": {"value": "Hermes VPS runs Ubuntu 22.04 with 2 vCPUs and 4GB RAM", "written_at": time.strftime("%Y-%m-%d %H:%M:%S")}, + "short": {"value": "no", "written_at": time.strftime("%Y-%m-%d %H:%M:%S")}, + "model": {"value": "The primary local model is gemma4:latest running on Ollama", "written_at": time.strftime("%Y-%m-%d %H:%M:%S")}, + } + results = promote_session_batch(self.store, "batch-session", notes, room="config") + promoted = [r for r in results if r.success] + rejected = [r for r in results if not r.success] + self.assertEqual(len(promoted), 2) + self.assertEqual(len(rejected), 1) + + def test_promotion_logged(self): + """Successful promotions appear in the audit log.""" + promote( + content="Forge is hosted at forge.alexanderwhitestone.com running Gitea", + store=self.store, + session_id="log-test", + scratch_key="forge", + room="infrastructure", + ) + log = self.store.recent_promotions() + self.assertTrue(len(log) > 0) + self.assertEqual(log[0]["session_id"], "log-test") + self.assertEqual(log[0]["scratch_key"], "forge") + + +if __name__ == "__main__": + unittest.main() diff --git a/hermes-sovereign/orchestrator/orchestrate.sh b/hermes-sovereign/orchestrator/orchestrate.sh new file mode 100755 index 00000000..8af52ebc --- /dev/null +++ b/hermes-sovereign/orchestrator/orchestrate.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash +# orchestrate.sh — Sovereign Orchestrator wrapper +# Sets environment and runs orchestrator.py +# +# Usage: +# ./orchestrate.sh # dry-run (safe default) +# ./orchestrate.sh --once # single live dispatch cycle +# ./orchestrate.sh --daemon # continuous (every 15 min) +# ./orchestrate.sh --dry-run # explicit dry-run + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname 
#!/usr/bin/env bash
# orchestrate.sh — Sovereign Orchestrator wrapper.
# Loads credentials into the environment, runs optional preflight checks,
# then execs orchestrator.py with whatever arguments were given.
#
# Usage:
#   ./orchestrate.sh            # dry-run (safe default)
#   ./orchestrate.sh --once     # single live dispatch cycle
#   ./orchestrate.sh --daemon   # continuous (every 15 min)
#   ./orchestrate.sh --dry-run  # explicit dry-run

set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
HERMES_DIR="${HOME}/.hermes"

# Gitea token: prefer the environment, fall back to the on-disk copy.
# Without it the orchestrator cannot talk to the forge, so this is fatal.
if [[ -z "${GITEA_TOKEN:-}" ]]; then
    if [[ -f "${HERMES_DIR}/gitea_token_vps" ]]; then
        GITEA_TOKEN="$(<"${HERMES_DIR}/gitea_token_vps")"
        export GITEA_TOKEN
    else
        echo "[FATAL] No GITEA_TOKEN and ~/.hermes/gitea_token_vps not found"
        exit 1
    fi
fi

# Telegram token is optional — the orchestrator degrades to console output.
if [[ -z "${TELEGRAM_BOT_TOKEN:-}" ]]; then
    if [[ -f "${HOME}/.config/telegram/special_bot" ]]; then
        TELEGRAM_BOT_TOKEN="$(<"${HOME}/.config/telegram/special_bot")"
        export TELEGRAM_BOT_TOKEN
    fi
fi

# Best-effort preflight; a failure here must not block dispatch.
if [[ -x "${HERMES_DIR}/bin/api-key-preflight.sh" ]]; then
    "${HERMES_DIR}/bin/api-key-preflight.sh" 2>/dev/null || true
fi

# Replace this shell with the orchestrator process.
exec python3 "${SCRIPT_DIR}/orchestrator.py" "$@"
#!/usr/bin/env python3
"""Sovereign Orchestrator v1.

Reads the Gitea backlog, scores/prioritizes issues, dispatches to agents.

Usage:
    python3 orchestrator.py --once      # single dispatch cycle
    python3 orchestrator.py --daemon    # run every 15 min
    python3 orchestrator.py --dry-run   # score and report, no dispatch
"""

import json
import os
import sys
import time
import subprocess
import urllib.request
import urllib.error
import urllib.parse  # NOTE(review): not used in the visible code — confirm before removing
from datetime import datetime, timezone

# ---------------------------------------------------------------------------
# CONFIG
# ---------------------------------------------------------------------------

# Base URL of the Gitea REST API (v1).
GITEA_API = "https://forge.alexanderwhitestone.com/api/v1"
# Organization that owns every orchestrated repository.
GITEA_OWNER = "Timmy_Foundation"
# Repositories whose open issues form the backlog.
REPOS = ["timmy-config", "the-nexus", "timmy-home"]

# Telegram group chat that receives cycle reports.
TELEGRAM_CHAT_ID = "-1003664764329"
# Seconds between cycles in --daemon mode.
DAEMON_INTERVAL = 900  # 15 minutes

# Tags that mark issues we should never auto-dispatch
# (matched against issue titles, brackets stripped — see should_filter()).
FILTER_TAGS = ["[EPIC]", "[DO NOT CLOSE]", "[PERMANENT]", "[PHILOSOPHY]", "[MORNING REPORT]"]

# Known agent usernames on Gitea (for assignee detection)
AGENT_USERNAMES = {"groq", "ezra", "bezalel", "allegro", "timmy", "thetimmyc"}

# ---------------------------------------------------------------------------
# AGENT ROSTER
# ---------------------------------------------------------------------------
# Per-agent fields:
#   type           — "loop": local process; "gateway": remote host over SSH/HTTP
#   endpoint       — chat-completions URL for gateway agents ("local" otherwise)
#   ssh            — user@host used to reach gateway agents (gateway only)
#   strengths      — work-type tags matched against classify_issue() output
#   repos          — repositories the agent is allowed to work in
#   max_concurrent — dispatch cap per cycle

AGENTS = {
    "groq": {
        "type": "loop",
        "endpoint": "local",
        "strengths": ["code", "bug-fix", "small-changes"],
        "repos": ["the-nexus", "hermes-agent", "timmy-config", "timmy-home"],
        "max_concurrent": 1,
    },
    "ezra": {
        "type": "gateway",
        "endpoint": "http://143.198.27.163:8643/v1/chat/completions",
        "ssh": "root@143.198.27.163",
        "strengths": ["research", "architecture", "complex", "multi-file"],
        "repos": ["timmy-config", "the-nexus", "timmy-home"],
        "max_concurrent": 1,
    },
    "bezalel": {
        "type": "gateway",
        "endpoint": "http://159.203.146.185:8643/v1/chat/completions",
        "ssh": "root@159.203.146.185",
        "strengths": ["ci", "infra", "ops", "testing"],
        "repos": ["timmy-config", "hermes-agent", "the-nexus"],
        "max_concurrent": 1,
    },
}
1, + }, +} + +# --------------------------------------------------------------------------- +# CREDENTIALS +# --------------------------------------------------------------------------- + +def load_gitea_token(): + """Read Gitea token from env or file.""" + token = os.environ.get("GITEA_TOKEN", "") + if token: + return token.strip() + token_path = os.path.expanduser("~/.hermes/gitea_token_vps") + try: + with open(token_path) as f: + return f.read().strip() + except FileNotFoundError: + print(f"[FATAL] No GITEA_TOKEN env and {token_path} not found") + sys.exit(1) + + +def load_telegram_token(): + """Read Telegram bot token from file.""" + path = os.path.expanduser("~/.config/telegram/special_bot") + try: + with open(path) as f: + return f.read().strip() + except FileNotFoundError: + return "" + + +GITEA_TOKEN = "" +TELEGRAM_TOKEN = "" + +# --------------------------------------------------------------------------- +# HTTP HELPERS (stdlib only) +# --------------------------------------------------------------------------- + +def gitea_request(path, method="GET", data=None): + """Make an authenticated Gitea API request.""" + url = f"{GITEA_API}{path}" + headers = { + "Authorization": f"token {GITEA_TOKEN}", + "Content-Type": "application/json", + "Accept": "application/json", + } + body = json.dumps(data).encode() if data else None + req = urllib.request.Request(url, data=body, headers=headers, method=method) + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return json.loads(resp.read().decode()) + except urllib.error.HTTPError as e: + body_text = e.read().decode() if e.fp else "" + print(f"[API ERROR] {method} {url} -> {e.code}: {body_text[:200]}") + return None + except Exception as e: + print(f"[API ERROR] {method} {url} -> {e}") + return None + + +def send_telegram(message): + """Send message to Telegram group.""" + if not TELEGRAM_TOKEN: + print("[WARN] No Telegram token, skipping notification") + return False + url = 
def fetch_issues(repo):
    """Fetch all open issues from *repo*, following 50-per-page pagination."""
    collected = []
    page = 0
    while True:
        page += 1
        batch = gitea_request(
            f"/repos/{GITEA_OWNER}/{repo}/issues?state=open&type=issues&limit=50&page={page}"
        )
        if not batch:
            break
        collected.extend(batch)
        if len(batch) < 50:
            # Short page: nothing left on the server.
            break
    return collected


def should_filter(issue):
    """Return True when the issue must never be auto-dispatched."""
    # NOTE(review): brackets are stripped on both sides, so a bare
    # substring like "EPIC" anywhere in a title also matches — confirm
    # this looseness is intended.
    bare_title = issue.get("title", "").upper().replace("[", "").replace("]", "")
    for tag in FILTER_TAGS:
        if tag.upper().replace("[", "").replace("]", "") in bare_title:
            return True
    # Pull requests are surfaced by the issues endpoint too; skip them.
    return bool(issue.get("pull_request"))


def read_backlog():
    """Collect the filtered backlog across all configured repos."""
    backlog = []
    for repo in REPOS:
        print(f"  Fetching {repo}...")
        for issue in fetch_issues(repo):
            if should_filter(issue):
                continue
            backlog.append({
                "repo": repo,
                "number": issue["number"],
                "title": issue["title"],
                "labels": [l.get("name", "") for l in (issue.get("labels") or [])],
                "assignees": [a.get("login", "") for a in (issue.get("assignees") or [])],
                "created_at": issue.get("created_at", ""),
                "comments": issue.get("comments", 0),
                "url": issue.get("html_url", ""),
            })
    print(f"  Total actionable issues: {len(backlog)}")
    return backlog
def score_issue(issue):
    """Attach a 0-100 priority score to *issue* (in place) and return it.

    Scoring is additive over keyword hits in the title/labels plus a few
    structural signals (unassigned, age, activity, repo), capped to [0, 100].
    """
    haystack = issue["title"].upper() + " " + " ".join(l.upper() for l in issue["labels"])

    # (keywords, points) — any keyword hit in the title/labels earns the points.
    keyword_rules = (
        (("CRITICAL", "BUG"), 30),
        (("P0", "URGENT"), 25),
        (("P1",), 15),
        (("OPS", "SECURITY"), 10),
    )
    points = 0
    for keywords, value in keyword_rules:
        if any(word in haystack for word in keywords):
            points += value

    # Unassigned issues need attention sooner.
    if not issue["assignees"]:
        points += 10

    # Age > 7 days: small nudge; malformed timestamps score nothing.
    try:
        opened = datetime.fromisoformat(issue["created_at"].replace("Z", "+00:00"))
        if (datetime.now(timezone.utc) - opened).days > 7:
            points += 5
    except (ValueError, AttributeError):
        pass

    # Discussion implies interest.
    if issue["comments"] > 0:
        points += 5

    # Infrastructure repo gets a small boost.
    if issue["repo"] == "timmy-config":
        points += 5

    # Already in an agent's hands: de-prioritize.
    if any(a.lower() in AGENT_USERNAMES for a in issue["assignees"]):
        points -= 10

    issue["score"] = max(0, min(100, points))
    return issue


def prioritize_backlog(backlog):
    """Score every issue and return them sorted highest-priority first."""
    return sorted(
        (score_issue(issue) for issue in backlog),
        key=lambda issue: issue["score"],
        reverse=True,
    )
def check_process(pattern):
    """Return True when pgrep finds a local process matching *pattern*."""
    try:
        probe = subprocess.run(
            ["pgrep", "-f", pattern],
            capture_output=True, text=True, timeout=5,
        )
    except Exception:
        # pgrep missing or timed out — treat as not running.
        return False
    return probe.returncode == 0


def check_ssh_service(host, service_name):
    """Return True when the named service is active on *host* (via SSH)."""
    try:
        probe = subprocess.run(
            ["ssh", "-o", "ConnectTimeout=5", "-o", "StrictHostKeyChecking=no",
             f"root@{host}",
             f"systemctl is-active {service_name} 2>/dev/null || pgrep -f {service_name}"],
            capture_output=True, text=True, timeout=15,
        )
    except Exception:
        return False
    return probe.returncode == 0


def check_agent_health(name, agent):
    """Probe one agent: local loop process or remote gateway service."""
    kind = agent["type"]
    if kind == "loop":
        return check_process(f"agent-loop.*{name}")
    if kind == "gateway":
        host = agent["ssh"].split("@")[1]
        return check_ssh_service(host, f"hermes-{name}")
    # Unknown agent type — never consider it alive.
    return False


def get_agent_status():
    """Health-check every agent in AGENTS; print and return a status map."""
    status = {}
    for name, agent in AGENTS.items():
        is_up = check_agent_health(name, agent)
        status[name] = {
            "alive": is_up,
            "type": agent["type"],
            "strengths": agent["strengths"],
        }
        print(f" {name}: {'UP' if is_up else 'DOWN'} ({agent['type']})")
    return status
def classify_issue(issue):
    """Infer work-type tags for an issue from keywords in title + labels.

    Returns a non-empty list; ["code"] is the fallback when nothing matches.
    """
    haystack = issue["title"].upper() + " " + " ".join(issue["labels"]).upper()
    # (tag, trigger keywords) — evaluated in order; "ops" appears twice on
    # purpose (infra keywords and security keywords both map to it).
    rules = (
        ("bug-fix", ("BUG", "FIX", "BROKEN", "ERROR", "CRASH")),
        ("ops", ("OPS", "DEPLOY", "CI", "INFRA", "PIPELINE", "MONITOR")),
        ("ops", ("SECURITY", "AUTH", "TOKEN", "CERT")),
        ("research", ("RESEARCH", "AUDIT", "INVESTIGATE", "EXPLORE")),
        ("architecture", ("ARCHITECT", "DESIGN", "REFACTOR", "REWRITE")),
        ("testing", ("TEST", "TESTING", "QA", "VALIDATE")),
        ("code", ("CODE", "IMPLEMENT", "ADD", "CREATE", "BUILD")),
        ("small-changes", ("SMALL", "QUICK", "SIMPLE", "MINOR", "TWEAK")),
        ("complex", ("COMPLEX", "MULTI", "LARGE", "OVERHAUL")),
    )
    tags = [tag for tag, words in rules if any(w in haystack for w in words)]
    return tags or ["code"]


def match_agent(issue, agent_status, dispatched_this_cycle):
    """Pick the best available agent for *issue*, or None when nobody fits.

    An agent qualifies when it is alive, covers the issue's repo, and has
    capacity left this cycle; among qualifiers the one whose strengths
    overlap the issue's inferred types the most wins (first wins on ties).
    """
    wanted = set(classify_issue(issue))
    ranked = []
    for name, agent in AGENTS.items():
        if not agent_status.get(name, {}).get("alive", False):
            continue
        if issue["repo"] not in agent["repos"]:
            continue
        if dispatched_this_cycle.get(name, 0) >= agent["max_concurrent"]:
            continue
        ranked.append((len(wanted & set(agent["strengths"])), name))
    if not ranked:
        return None
    # max() keeps the first maximal entry — same tie-break as a stable sort.
    return max(ranked, key=lambda pair: pair[0])[1]
def assign_issue(repo, number, agent_name):
    """Add *agent_name* to the issue's assignees on Gitea.

    Reads the current assignees first so existing ones are not clobbered.
    Returns True on success or when the agent was already assigned.
    """
    result = gitea_request(f"/repos/{GITEA_OWNER}/{repo}/issues/{number}")
    if not result:
        return False

    # `assignees` can be JSON null — treat it as empty (consistent with
    # read_backlog's handling of the same field).
    current = [a.get("login", "") for a in (result.get("assignees") or [])]
    if agent_name in current:
        print(f" Already assigned to {agent_name}")
        return True

    patch_result = gitea_request(
        f"/repos/{GITEA_OWNER}/{repo}/issues/{number}",
        method="PATCH",
        data={"assignees": current + [agent_name]},
    )
    return patch_result is not None


def dispatch_to_gateway(agent_name, agent, issue):
    """Log a dispatch marker on the gateway host via SSH. Returns True on success.

    SECURITY: issue titles are untrusted text. The previous implementation
    interpolated the title into a shell=True command string, allowing a
    crafted title to execute arbitrary commands. The marker is now
    shell-quoted with shlex.quote and ssh is invoked with a list argv.
    """
    import shlex  # local import: only this function needs it

    marker = (
        f"Dispatched by orchestrator: "
        f"{issue['repo']}#{issue['number']} - {issue['title']}"
    )
    remote_cmd = f"echo {shlex.quote(marker)} >> /tmp/hermes-dispatch.log"
    try:
        proc = subprocess.run(
            ["ssh", "-o", "ConnectTimeout=10", "-o", "StrictHostKeyChecking=no",
             agent["ssh"], remote_cmd],
            timeout=20,
            capture_output=True,
        )
        return proc.returncode == 0
    except Exception as e:
        print(f" [WARN] SSH dispatch to {agent_name} failed: {e}")
        return False


def dispatch_cycle(backlog, agent_status, dry_run=False):
    """Run one dispatch cycle over a prioritized backlog.

    Returns (dispatched, skipped): dispatched is a list of record dicts,
    skipped is a list of (issue, reason) pairs. In dry-run mode the
    matching is performed but no assignment or SSH dispatch happens.
    """
    dispatched = []
    skipped = []
    dispatched_count = {}  # agent_name -> count dispatched this cycle

    for issue in backlog:
        # Never double-dispatch: skip anything already held by an agent.
        if any(a.lower() in AGENT_USERNAMES for a in issue["assignees"]):
            skipped.append((issue, "already assigned to agent"))
            continue

        # Ignore near-zero-priority noise.
        if issue["score"] < 5:
            skipped.append((issue, "score too low"))
            continue

        best_agent = match_agent(issue, agent_status, dispatched_count)
        if not best_agent:
            skipped.append((issue, "no available agent"))
            continue

        if dry_run:
            dispatched.append({
                "agent": best_agent,
                "repo": issue["repo"],
                "number": issue["number"],
                "title": issue["title"],
                "score": issue["score"],
                "dry_run": True,
            })
            dispatched_count[best_agent] = dispatched_count.get(best_agent, 0) + 1
            continue

        # Live dispatch: Gitea assignment is the source of truth; the SSH
        # marker for gateway agents is best-effort on top of it.
        print(f" Dispatching {issue['repo']}#{issue['number']} -> {best_agent}")
        if assign_issue(issue["repo"], issue["number"], best_agent):
            agent = AGENTS[best_agent]
            if agent["type"] == "gateway":
                dispatch_to_gateway(best_agent, agent, issue)
            dispatched.append({
                "agent": best_agent,
                "repo": issue["repo"],
                "number": issue["number"],
                "title": issue["title"],
                "score": issue["score"],
            })
            dispatched_count[best_agent] = dispatched_count.get(best_agent, 0) + 1
        else:
            skipped.append((issue, "assignment failed"))

    return dispatched, skipped
def generate_report(backlog, dispatched, skipped, agent_status, dry_run=False):
    """Build the full-text dispatch-cycle report."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M")
    mode = " [DRY RUN]" if dry_run else ""

    out = [
        f"=== Sovereign Orchestrator Report{mode} ===",
        f"Time: {stamp}",
        f"Total backlog: {len(backlog)} issues",
        "",
        "-- Agent Health --",
    ]
    for name, info in agent_status.items():
        symbol = "UP" if info["alive"] else "DOWN"
        out.append(f" {name}: {symbol} ({info['type']})")
    out.append("")

    out.append(f"-- Dispatched: {len(dispatched)} --")
    for record in dispatched:
        dry = " (dry-run)" if record.get("dry_run") else ""
        out.append(f" [{record['score']}] {record['repo']}#{record['number']} -> {record['agent']}{dry}")
        out.append(f" {record['title'][:60]}")
    out.append("")

    # Collapse skip reasons into counts, most frequent first.
    skip_summary = {}
    for _issue, reason in skipped:
        skip_summary[reason] = skip_summary.get(reason, 0) + 1
    out.append(f"-- Skipped: {len(skipped)} --")
    for reason, count in sorted(skip_summary.items(), key=lambda pair: -pair[1]):
        out.append(f" {reason}: {count}")
    out.append("")

    out.append("-- Top 5 Unassigned (by priority) --")
    for entry in [i for i in backlog if not i["assignees"]][:5]:
        out.append(f" [{entry['score']}] {entry['repo']}#{entry['number']}: {entry['title'][:55]}")
    out.append("")

    return "\n".join(out)


def format_telegram_report(backlog, dispatched, skipped, agent_status, dry_run=False):
    """Build the compact Markdown message for Telegram."""
    mode = " DRY RUN" if dry_run else ""
    stamp = datetime.now().strftime("%H:%M")

    parts = [
        f"*Orchestrator{mode}* ({stamp})",
        f"Backlog: {len(backlog)} | Dispatched: {len(dispatched)} | Skipped: {len(skipped)}",
        " | ".join(f"{'✅' if v['alive'] else '❌'}{k}" for k, v in agent_status.items()),
    ]

    if dispatched:
        parts.append("")
        parts.append("*Dispatched:*")
        for record in dispatched[:5]:
            dry = " 🔍" if record.get("dry_run") else ""
            parts.append(f" `{record['repo']}#{record['number']}` → {record['agent']}{dry}")

    top_unassigned = [i for i in backlog if not i["assignees"]][:3]
    if top_unassigned:
        parts.append("")
        parts.append("*Top unassigned:*")
        for entry in top_unassigned:
            parts.append(f" [{entry['score']}] `{entry['repo']}#{entry['number']}` {entry['title'][:40]}")

    return "\n".join(parts)
def run_cycle(dry_run=False):
    """Execute one full orchestration cycle: read, score, health-check, dispatch.

    Refreshes credentials each cycle so daemon mode picks up rotated tokens.
    Returns (backlog, dispatched, skipped).
    """
    global GITEA_TOKEN, TELEGRAM_TOKEN
    GITEA_TOKEN = load_gitea_token()
    TELEGRAM_TOKEN = load_telegram_token()

    print("\n[1/4] Reading backlog...")
    backlog = read_backlog()

    print("\n[2/4] Scoring and prioritizing...")
    backlog = prioritize_backlog(backlog)
    for i in backlog[:10]:
        print(f" [{i['score']:3d}] {i['repo']}/{i['number']}: {i['title'][:55]}")

    print("\n[3/4] Checking agent health...")
    agent_status = get_agent_status()

    print("\n[4/4] Dispatching...")
    dispatched, skipped = dispatch_cycle(backlog, agent_status, dry_run=dry_run)

    print("\n" + generate_report(backlog, dispatched, skipped, agent_status, dry_run=dry_run))

    # Notify Telegram on every live cycle, and on dry runs that matched work.
    if dispatched or not dry_run:
        send_telegram(format_telegram_report(backlog, dispatched, skipped, agent_status, dry_run=dry_run))

    return backlog, dispatched, skipped


def main():
    """CLI entry point: parse flags and run once or as a daemon."""
    import argparse

    parser = argparse.ArgumentParser(description="Sovereign Orchestrator v1")
    parser.add_argument("--once", action="store_true", help="Single dispatch cycle")
    parser.add_argument("--daemon", action="store_true", help="Run every 15 min")
    parser.add_argument("--dry-run", action="store_true", help="Score/report only, no dispatch")
    parser.add_argument("--interval", type=int, default=DAEMON_INTERVAL,
                        help=f"Daemon interval in seconds (default: {DAEMON_INTERVAL})")
    opts = parser.parse_args()

    # Safe default: never dispatch unless a mode was chosen explicitly.
    if not any([opts.once, opts.daemon, opts.dry_run]):
        opts.dry_run = True
        print("[INFO] No mode specified, defaulting to --dry-run")

    print("=" * 60)
    print(" SOVEREIGN ORCHESTRATOR v1")
    print("=" * 60)

    if not opts.daemon:
        run_cycle(dry_run=opts.dry_run)
        return

    print(f"[DAEMON] Running every {opts.interval}s (Ctrl+C to stop)")
    cycle = 0
    while True:
        cycle += 1
        print(f"\n--- Cycle {cycle} ---")
        try:
            run_cycle(dry_run=opts.dry_run)
        except Exception as e:
            # One bad cycle must not kill the daemon.
            print(f"[ERROR] Cycle failed: {e}")
        print(f"[DAEMON] Sleeping {opts.interval}s...")
        time.sleep(opts.interval)


if __name__ == "__main__":
    main()