1
0
This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Timmy-time-dashboard/src/spark/memory.py

314 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Spark memory — SQLite-backed event capture and memory consolidation.
Captures swarm events (tasks posted, bids, assignments, completions,
failures) and distills them into higher-level memories with importance
scoring. This is the persistence layer for Spark Intelligence.
Tables
------
spark_events — raw event log (every swarm event)
spark_memories — consolidated insights extracted from event patterns
"""
import logging
import sqlite3
import uuid
from collections.abc import Generator
from contextlib import closing, contextmanager
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
logger = logging.getLogger(__name__)
DB_PATH = Path("data/spark.db")
# Importance thresholds
IMPORTANCE_LOW = 0.3
IMPORTANCE_MEDIUM = 0.6
IMPORTANCE_HIGH = 0.8
@dataclass
class SparkEvent:
    """A single captured swarm event (one row of the spark_events table)."""

    id: str  # UUID4 string assigned at record time
    event_type: str  # task_posted, bid, assignment, completion, failure
    agent_id: str | None  # originating agent, if any
    task_id: str | None  # related task, if any
    description: str  # human-readable summary
    data: str  # JSON payload
    importance: float  # 0.0-1.0, see score_importance()
    created_at: str  # ISO-8601 UTC timestamp
@dataclass
class SparkMemory:
    """A consolidated memory distilled from event patterns (spark_memories row)."""

    id: str  # UUID4 string
    memory_type: str  # pattern, insight, anomaly
    subject: str  # agent_id or "system"
    content: str  # Human-readable insight
    confidence: float  # 0.0-1.0
    source_events: int  # How many events contributed
    created_at: str  # ISO-8601 UTC timestamp
    expires_at: str | None  # ISO-8601 expiry, or None for no expiry
@contextmanager
def _get_conn() -> Generator[sqlite3.Connection, None, None]:
    """Yield a SQLite connection with the spark schema ensured.

    Opens a fresh connection per call (closed on exit via ``closing``),
    enables WAL journaling and a 5-second busy timeout, and creates the
    tables/indexes if they do not yet exist, so callers never need a
    separate init step.
    """
    # Make sure the data/ directory exists before sqlite opens the file.
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    with closing(sqlite3.connect(str(DB_PATH))) as conn:
        conn.row_factory = sqlite3.Row  # rows addressable by column name
        conn.execute("PRAGMA journal_mode=WAL")  # readers don't block the writer
        conn.execute("PRAGMA busy_timeout=5000")  # wait up to 5s on a locked DB
        # Raw event log — one row per captured swarm event.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS spark_events (
                id TEXT PRIMARY KEY,
                event_type TEXT NOT NULL,
                agent_id TEXT,
                task_id TEXT,
                description TEXT NOT NULL DEFAULT '',
                data TEXT NOT NULL DEFAULT '{}',
                importance REAL NOT NULL DEFAULT 0.5,
                created_at TEXT NOT NULL
            )
        """)
        # Consolidated insights distilled from event patterns.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS spark_memories (
                id TEXT PRIMARY KEY,
                memory_type TEXT NOT NULL,
                subject TEXT NOT NULL DEFAULT 'system',
                content TEXT NOT NULL,
                confidence REAL NOT NULL DEFAULT 0.5,
                source_events INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL,
                expires_at TEXT
            )
        """)
        # Indexes for the filter columns used by get_events()/get_memories().
        conn.execute("CREATE INDEX IF NOT EXISTS idx_events_type ON spark_events(event_type)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_events_agent ON spark_events(agent_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_events_task ON spark_events(task_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_memories_subject ON spark_memories(subject)")
        conn.commit()
        yield conn
# ── Importance scoring ──────────────────────────────────────────────────────
def score_importance(event_type: str, data: dict) -> float:
    """Compute an importance score in 0.0-1.0 for an event.

    Failures score highest (always worth learning from); routine bids
    score lowest; unrecognized event types fall back to 0.5. A bid above
    80 sats adds a further boost. The result is clamped to 1.0 and
    rounded to two decimals.
    """
    base_by_type = {
        "task_posted": 0.4,
        "bid_submitted": 0.2,
        "task_assigned": 0.5,
        "task_completed": 0.6,
        "task_failed": 0.9,
        "agent_joined": 0.5,
        "prediction_result": 0.7,
    }
    score = base_by_type.get(event_type, 0.5)

    # Failures always get a bump — they are the most valuable to learn from.
    if event_type == "task_failed":
        score += 0.1

    # High-value bids (> 80 sats) are more interesting than routine ones.
    sats = data.get("bid_sats", 0)
    if sats and sats > 80:
        score += 0.15

    return round(min(1.0, score), 2)
# ── Event recording ─────────────────────────────────────────────────────────
def record_event(
    event_type: str,
    description: str,
    agent_id: str | None = None,
    task_id: str | None = None,
    data: str | dict = "{}",
    importance: float | None = None,
) -> str:
    """Record a swarm event. Returns the event id.

    Parameters
    ----------
    event_type : kind of event (task_posted, bid_submitted, ...)
    description : human-readable summary
    agent_id, task_id : optional associations
    data : JSON payload as a string, or a dict (serialized automatically;
        previously a dict would score correctly but fail at insert time)
    importance : 0.0-1.0; computed via score_importance() when omitted
    """
    import json  # local import: only needed here

    event_id = str(uuid.uuid4())
    now = datetime.now(UTC).isoformat()

    # Normalize the payload once: `parsed` (dict) feeds scoring,
    # `payload` (JSON text) is what gets stored.
    if isinstance(data, str):
        payload = data
        try:
            parsed = json.loads(data)
        except json.JSONDecodeError:
            parsed = {}  # malformed JSON → score with no payload hints
    else:
        parsed = data
        payload = json.dumps(data)

    if importance is None:
        importance = score_importance(event_type, parsed)

    with _get_conn() as conn:
        conn.execute(
            """
            INSERT INTO spark_events
            (id, event_type, agent_id, task_id, description, data, importance, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (event_id, event_type, agent_id, task_id, description, payload, importance, now),
        )
        conn.commit()

    # Bridge to the unified event log so all events are queryable from one
    # place. Best-effort: spark must keep working if event_log is unavailable.
    try:
        from swarm.event_log import EventType as _ET
        from swarm.event_log import log_event as _log

        _log(
            _ET.SYSTEM_INFO,
            source=f"spark:{event_type}",
            data={"description": description, "importance": importance, "spark_event_id": event_id},
            task_id=task_id or "",
            agent_id=agent_id or "",
        )
    except Exception as exc:
        logger.debug("Spark event log error: %s", exc)
    return event_id
def get_events(
    event_type: str | None = None,
    agent_id: str | None = None,
    task_id: str | None = None,
    limit: int = 100,
    min_importance: float = 0.0,
) -> list[SparkEvent]:
    """Query events with optional filters, newest first."""
    clauses = ["importance >= ?"]
    params: list = [min_importance]

    # Append an equality clause for each filter the caller supplied.
    for column, value in (
        ("event_type", event_type),
        ("agent_id", agent_id),
        ("task_id", task_id),
    ):
        if value:
            clauses.append(f"{column} = ?")
            params.append(value)

    sql = (
        "SELECT * FROM spark_events WHERE "
        + " AND ".join(clauses)
        + " ORDER BY created_at DESC LIMIT ?"
    )
    params.append(limit)

    with _get_conn() as conn:
        rows = conn.execute(sql, params).fetchall()

    columns = (
        "id", "event_type", "agent_id", "task_id",
        "description", "data", "importance", "created_at",
    )
    return [SparkEvent(**{c: row[c] for c in columns}) for row in rows]
def count_events(event_type: str | None = None) -> int:
    """Count events, optionally filtered by type."""
    if event_type:
        sql = "SELECT COUNT(*) FROM spark_events WHERE event_type = ?"
        args: tuple = (event_type,)
    else:
        sql = "SELECT COUNT(*) FROM spark_events"
        args = ()
    with _get_conn() as conn:
        return conn.execute(sql, args).fetchone()[0]
# ── Memory consolidation ───────────────────────────────────────────────────
def store_memory(
    memory_type: str,
    subject: str,
    content: str,
    confidence: float = 0.5,
    source_events: int = 0,
    expires_at: str | None = None,
) -> str:
    """Store a consolidated memory. Returns the memory id."""
    memory_id = str(uuid.uuid4())
    created = datetime.now(UTC).isoformat()
    row = (
        memory_id, memory_type, subject, content,
        confidence, source_events, created, expires_at,
    )
    with _get_conn() as conn:
        conn.execute(
            """
            INSERT INTO spark_memories
            (id, memory_type, subject, content, confidence, source_events, created_at, expires_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            row,
        )
        conn.commit()
    return memory_id
def get_memories(
    memory_type: str | None = None,
    subject: str | None = None,
    min_confidence: float = 0.0,
    limit: int = 50,
) -> list[SparkMemory]:
    """Query memories with optional filters, newest first."""
    clauses = ["confidence >= ?"]
    params: list = [min_confidence]

    # Append an equality clause for each filter the caller supplied.
    for column, value in (("memory_type", memory_type), ("subject", subject)):
        if value:
            clauses.append(f"{column} = ?")
            params.append(value)

    sql = (
        "SELECT * FROM spark_memories WHERE "
        + " AND ".join(clauses)
        + " ORDER BY created_at DESC LIMIT ?"
    )
    params.append(limit)

    with _get_conn() as conn:
        rows = conn.execute(sql, params).fetchall()

    columns = (
        "id", "memory_type", "subject", "content",
        "confidence", "source_events", "created_at", "expires_at",
    )
    return [SparkMemory(**{c: row[c] for c in columns}) for row in rows]
def count_memories(memory_type: str | None = None) -> int:
    """Count memories, optionally filtered by type."""
    if memory_type:
        sql = "SELECT COUNT(*) FROM spark_memories WHERE memory_type = ?"
        args: tuple = (memory_type,)
    else:
        sql = "SELECT COUNT(*) FROM spark_memories"
        args = ()
    with _get_conn() as conn:
        return conn.execute(sql, args).fetchone()[0]