@@ -125,6 +221,16 @@ function connect() {
}
function handleMessage(message) {
+ // Handle activity feed events (from event_log broadcaster)
+ if (message.type === 'event' && message.payload) {
+ addActivityEvent(message.payload);
+ // Also add to log
+ var evt = message.payload;
+ var logMsg = evt.event_type + ': ' + (evt.source || '');
+ addLog(logMsg, 'info');
+ return;
+ }
+
if (message.type === 'initial_state' || message.type === 'state_update') {
var data = message.data;
document.getElementById('stat-agents').textContent = data.agents.total;
@@ -158,6 +264,87 @@ function handleMessage(message) {
}
}
+// Activity Feed Functions
+const EVENT_ICONS = {
+ 'task.created': '📝',
+ 'task.bidding': '⏳',
+ 'task.assigned': '👤',
+ 'task.started': '▶️',
+ 'task.completed': '✅',
+ 'task.failed': '❌',
+ 'agent.joined': '🟢',
+ 'agent.left': '🔴',
+ 'bid.submitted': '💰',
+ 'auction.closed': '🏁',
+ 'tool.called': '🔧',
+ 'system.error': '⚠️',
+};
+
+const EVENT_LABELS = {
+ 'task.created': 'New task',
+ 'task.assigned': 'Task assigned',
+ 'task.completed': 'Task completed',
+ 'task.failed': 'Task failed',
+ 'agent.joined': 'Agent joined',
+ 'agent.left': 'Agent left',
+ 'bid.submitted': 'Bid submitted',
+};
+
+function addActivityEvent(evt) {
+ var container = document.getElementById('activity-feed');
+
+ // Remove empty message if present
+ var empty = container.querySelector('.activity-empty');
+ if (empty) empty.remove();
+
+ // Create activity item
+ var item = document.createElement('div');
+ item.className = 'activity-item';
+
+ var icon = EVENT_ICONS[evt.event_type] || '•';
+ var label = EVENT_LABELS[evt.event_type] || evt.event_type;
+ var time = evt.timestamp ? evt.timestamp.split('T')[1].slice(0, 8) : '--:--:--';
+
+ // Build description from data
+ var desc = '';
+ if (evt.data) {
+ try {
+ var data = typeof evt.data === 'string' ? JSON.parse(evt.data) : evt.data;
+ if (data.description) desc = data.description.slice(0, 50);
+ else if (data.reason) desc = data.reason.slice(0, 50);
+ } catch(e) {}
+ }
+
+ item.innerHTML = `
+
${icon}
+
+
${label}
+ ${desc ? `
${desc}
` : ''}
+
+ ${time}
+ ${evt.source || 'system'}
+
+
+ `;
+
+ // Add to top
+ container.insertBefore(item, container.firstChild);
+
+ // Keep only last 50 items
+ while (container.children.length > 50) {
+ container.removeChild(container.lastChild);
+ }
+
+ // Update badge
+ var badge = document.getElementById('activity-badge');
+ if (badge) {
+ badge.style.background = '#28a745';
+ setTimeout(() => {
+ badge.style.background = '';
+ }, 500);
+ }
+}
+
function refreshStats() {
fetch('/swarm').then(function(r) { return r.json(); }).then(function(data) {
document.getElementById('stat-agents').textContent = data.agents || 0;
diff --git a/src/dashboard/templates/upgrade_queue.html b/src/dashboard/templates/upgrade_queue.html
new file mode 100644
index 00000000..6f617db7
--- /dev/null
+++ b/src/dashboard/templates/upgrade_queue.html
@@ -0,0 +1,290 @@
+{% extends "base.html" %}
+
+{% block title %}Upgrade Queue - Timmy Time{% endblock %}
+
+{% block content %}
+
+
+
+
+
+
+ Pending Upgrades
+ {% if pending_count > 0 %}
+ {{ pending_count }}
+ {% endif %}
+
+
+ {% if pending %}
+
+ {% for upgrade in pending %}
+
+
+
+
+ Branch: {{ upgrade.branch_name }}
+ Proposed: {{ upgrade.proposed_at[11:16] }}
+
+
+
+ Files: {{ upgrade.files_changed|join(', ') }}
+
+
+
+ {% if upgrade.test_passed %}
+ ✓ Tests passed
+ {% else %}
+ ✗ Tests failed
+ {% endif %}
+
+
+
+
+ {% endfor %}
+
+ {% else %}
+
+
No pending upgrades.
+
Proposed modifications will appear here for review.
+
+ {% endif %}
+
+
+
+ {% if approved %}
+
+
Approved (Ready to Apply)
+
+ {% for upgrade in approved %}
+
+ {% endfor %}
+
+
+ {% endif %}
+
+
+
+
History
+
+ {% if applied %}
+
Applied
+
+ {% for upgrade in applied %}
+
+ {{ upgrade.description }}
+ APPLIED
+ {{ upgrade.applied_at[11:16] if upgrade.applied_at else '' }}
+
+ {% endfor %}
+
+ {% endif %}
+
+ {% if rejected %}
+
Rejected
+
+ {% for upgrade in rejected %}
+
+ {{ upgrade.description }}
+ REJECTED
+
+ {% endfor %}
+
+ {% endif %}
+
+ {% if failed %}
+
Failed
+
+ {% for upgrade in failed %}
+
+ {{ upgrade.description }}
+ FAILED
+ ⚠️
+
+ {% endfor %}
+
+ {% endif %}
+
+
+
+
+
+
+{% endblock %}
diff --git a/src/events/broadcaster.py b/src/events/broadcaster.py
new file mode 100644
index 00000000..d03f79c3
--- /dev/null
+++ b/src/events/broadcaster.py
@@ -0,0 +1,186 @@
+"""Event Broadcaster - bridges event_log to WebSocket clients.
+
+When events are logged, they are broadcast to all connected dashboard clients
+via WebSocket for real-time activity feed updates.
+"""
+
+import asyncio
+import json
+import logging
+from typing import Optional
+
+from swarm.event_log import EventLogEntry
+
+logger = logging.getLogger(__name__)
+
+
+class EventBroadcaster:
+ """Broadcasts events to WebSocket clients.
+
+ Usage:
+ from events.broadcaster import event_broadcaster
+ event_broadcaster.broadcast(event)
+ """
+
+ def __init__(self) -> None:
+ self._ws_manager: Optional = None
+
+ def _get_ws_manager(self):
+ """Lazy import to avoid circular deps."""
+ if self._ws_manager is None:
+ try:
+ from ws_manager.handler import ws_manager
+ self._ws_manager = ws_manager
+ except Exception as exc:
+ logger.debug("WebSocket manager not available: %s", exc)
+ return self._ws_manager
+
+ async def broadcast(self, event: EventLogEntry) -> int:
+ """Broadcast an event to all connected WebSocket clients.
+
+ Args:
+ event: The event to broadcast
+
+ Returns:
+ Number of clients notified
+ """
+ ws_manager = self._get_ws_manager()
+ if not ws_manager:
+ return 0
+
+ # Build message payload
+ payload = {
+ "type": "event",
+ "payload": {
+ "id": event.id,
+ "event_type": event.event_type.value,
+ "source": event.source,
+ "task_id": event.task_id,
+ "agent_id": event.agent_id,
+ "timestamp": event.timestamp,
+ "data": event.data,
+ }
+ }
+
+ try:
+ # Broadcast to all connected clients
+ count = await ws_manager.broadcast_json(payload)
+ logger.debug("Broadcasted event %s to %d clients", event.id[:8], count)
+ return count
+ except Exception as exc:
+ logger.error("Failed to broadcast event: %s", exc)
+ return 0
+
+ def broadcast_sync(self, event: EventLogEntry) -> None:
+ """Synchronous wrapper for broadcast.
+
+ Use this from synchronous code - it schedules the async broadcast
+ in the event loop if one is running.
+ """
+ try:
+ loop = asyncio.get_running_loop()
+ # Schedule in background, don't wait
+ asyncio.create_task(self.broadcast(event))
+ except RuntimeError:
+ # No event loop running, skip broadcast
+ pass
+
+
+# Global singleton
+event_broadcaster = EventBroadcaster()
+
+
+# Event type to icon/emoji mapping
+EVENT_ICONS = {
+ "task.created": "📝",
+ "task.bidding": "⏳",
+ "task.assigned": "👤",
+ "task.started": "▶️",
+ "task.completed": "✅",
+ "task.failed": "❌",
+ "agent.joined": "🟢",
+ "agent.left": "🔴",
+ "agent.status_changed": "🔄",
+ "bid.submitted": "💰",
+ "auction.closed": "🏁",
+ "tool.called": "🔧",
+ "tool.completed": "⚙️",
+ "tool.failed": "💥",
+ "system.error": "⚠️",
+ "system.warning": "🔶",
+ "system.info": "ℹ️",
+}
+
+EVENT_LABELS = {
+ "task.created": "New task",
+ "task.bidding": "Bidding open",
+ "task.assigned": "Task assigned",
+ "task.started": "Task started",
+ "task.completed": "Task completed",
+ "task.failed": "Task failed",
+ "agent.joined": "Agent joined",
+ "agent.left": "Agent left",
+ "agent.status_changed": "Status changed",
+ "bid.submitted": "Bid submitted",
+ "auction.closed": "Auction closed",
+ "tool.called": "Tool called",
+ "tool.completed": "Tool completed",
+ "tool.failed": "Tool failed",
+ "system.error": "Error",
+ "system.warning": "Warning",
+ "system.info": "Info",
+}
+
+
+def get_event_icon(event_type: str) -> str:
+ """Get emoji icon for event type."""
+ return EVENT_ICONS.get(event_type, "•")
+
+
+def get_event_label(event_type: str) -> str:
+ """Get human-readable label for event type."""
+ return EVENT_LABELS.get(event_type, event_type)
+
+
+def format_event_for_display(event: EventLogEntry) -> dict:
+ """Format event for display in activity feed.
+
+ Returns dict with display-friendly fields.
+ """
+ data = event.data or {}
+
+ # Build description based on event type
+ description = ""
+ if event.event_type.value == "task.created":
+ desc = data.get("description", "")
+ description = desc[:60] + "..." if len(desc) > 60 else desc
+ elif event.event_type.value == "task.assigned":
+ agent = event.agent_id[:8] if event.agent_id else "unknown"
+ bid = data.get("bid_sats", "?")
+ description = f"to {agent} ({bid} sats)"
+ elif event.event_type.value == "bid.submitted":
+ bid = data.get("bid_sats", "?")
+ description = f"{bid} sats"
+ elif event.event_type.value == "agent.joined":
+ persona = data.get("persona_id", "")
+ description = f"Persona: {persona}" if persona else "New agent"
+ else:
+ # Generic: use any string data
+ for key in ["message", "reason", "description"]:
+ if key in data:
+ val = str(data[key])
+ description = val[:60] + "..." if len(val) > 60 else val
+ break
+
+ return {
+ "id": event.id,
+ "icon": get_event_icon(event.event_type.value),
+ "label": get_event_label(event.event_type.value),
+ "type": event.event_type.value,
+ "source": event.source,
+ "description": description,
+ "timestamp": event.timestamp,
+ "time_short": event.timestamp[11:19] if event.timestamp else "",
+ "task_id": event.task_id,
+ "agent_id": event.agent_id,
+ }
diff --git a/src/lightning/ledger.py b/src/lightning/ledger.py
new file mode 100644
index 00000000..6e9763e8
--- /dev/null
+++ b/src/lightning/ledger.py
@@ -0,0 +1,488 @@
+"""Lightning Network transaction ledger.
+
+Tracks all Lightning payments in SQLite for audit, accounting, and dashboard display.
+"""
+
+import sqlite3
+import uuid
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from enum import Enum
+from pathlib import Path
+from typing import Optional
+
+DB_PATH = Path("data/swarm.db")
+
+
+class TransactionType(str, Enum):
+ """Types of Lightning transactions."""
+ INCOMING = "incoming" # Invoice created (we're receiving)
+ OUTGOING = "outgoing" # Payment sent (we're paying)
+
+
+class TransactionStatus(str, Enum):
+ """Status of a transaction."""
+ PENDING = "pending"
+ SETTLED = "settled"
+ FAILED = "failed"
+ EXPIRED = "expired"
+
+
+@dataclass
+class LedgerEntry:
+ """A Lightning transaction record."""
+ id: str = field(default_factory=lambda: str(uuid.uuid4()))
+ tx_type: TransactionType = TransactionType.INCOMING
+ status: TransactionStatus = TransactionStatus.PENDING
+ payment_hash: str = "" # Lightning payment hash
+ amount_sats: int = 0
+ memo: str = "" # Description/purpose
+ invoice: Optional[str] = None # BOLT11 invoice string
+ preimage: Optional[str] = None # Payment preimage (proof of payment)
+ source: str = "" # Component that created the transaction
+ task_id: Optional[str] = None # Associated task, if any
+ agent_id: Optional[str] = None # Associated agent, if any
+ created_at: str = field(
+ default_factory=lambda: datetime.now(timezone.utc).isoformat()
+ )
+ settled_at: Optional[str] = None
+ fee_sats: int = 0 # Routing fee paid
+
+
+def _get_conn() -> sqlite3.Connection:
+ DB_PATH.parent.mkdir(parents=True, exist_ok=True)
+ conn = sqlite3.connect(str(DB_PATH))
+ conn.row_factory = sqlite3.Row
+ conn.execute(
+ """
+ CREATE TABLE IF NOT EXISTS ledger (
+ id TEXT PRIMARY KEY,
+ tx_type TEXT NOT NULL,
+ status TEXT NOT NULL DEFAULT 'pending',
+ payment_hash TEXT UNIQUE NOT NULL,
+ amount_sats INTEGER NOT NULL,
+ memo TEXT,
+ invoice TEXT,
+ preimage TEXT,
+ source TEXT NOT NULL,
+ task_id TEXT,
+ agent_id TEXT,
+ created_at TEXT NOT NULL,
+ settled_at TEXT,
+ fee_sats INTEGER DEFAULT 0
+ )
+ """
+ )
+ # Create indexes for common queries
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_ledger_status ON ledger(status)"
+ )
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_ledger_hash ON ledger(payment_hash)"
+ )
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_ledger_task ON ledger(task_id)"
+ )
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_ledger_agent ON ledger(agent_id)"
+ )
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_ledger_created ON ledger(created_at)"
+ )
+ conn.commit()
+ return conn
+
+
+def create_invoice_entry(
+ payment_hash: str,
+ amount_sats: int,
+ memo: str = "",
+ invoice: Optional[str] = None,
+ source: str = "system",
+ task_id: Optional[str] = None,
+ agent_id: Optional[str] = None,
+) -> LedgerEntry:
+ """Record a new incoming invoice (we're receiving payment).
+
+ Args:
+ payment_hash: Lightning payment hash
+ amount_sats: Invoice amount in satoshis
+ memo: Payment description
+ invoice: Full BOLT11 invoice string
+ source: Component that created the invoice
+ task_id: Associated task ID
+ agent_id: Associated agent ID
+
+ Returns:
+ The created LedgerEntry
+ """
+ entry = LedgerEntry(
+ tx_type=TransactionType.INCOMING,
+ status=TransactionStatus.PENDING,
+ payment_hash=payment_hash,
+ amount_sats=amount_sats,
+ memo=memo,
+ invoice=invoice,
+ source=source,
+ task_id=task_id,
+ agent_id=agent_id,
+ )
+
+ conn = _get_conn()
+ conn.execute(
+ """
+ INSERT INTO ledger (id, tx_type, status, payment_hash, amount_sats,
+ memo, invoice, source, task_id, agent_id, created_at)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """,
+ (
+ entry.id,
+ entry.tx_type.value,
+ entry.status.value,
+ entry.payment_hash,
+ entry.amount_sats,
+ entry.memo,
+ entry.invoice,
+ entry.source,
+ entry.task_id,
+ entry.agent_id,
+ entry.created_at,
+ ),
+ )
+ conn.commit()
+ conn.close()
+ return entry
+
+
+def record_outgoing_payment(
+ payment_hash: str,
+ amount_sats: int,
+ memo: str = "",
+ invoice: Optional[str] = None,
+ source: str = "system",
+ task_id: Optional[str] = None,
+ agent_id: Optional[str] = None,
+) -> LedgerEntry:
+ """Record an outgoing payment (we're paying someone).
+
+ Args:
+ payment_hash: Lightning payment hash
+ amount_sats: Payment amount in satoshis
+ memo: Payment description
+ invoice: BOLT11 invoice we paid
+ source: Component that initiated payment
+ task_id: Associated task ID
+ agent_id: Associated agent ID
+
+ Returns:
+ The created LedgerEntry
+ """
+ entry = LedgerEntry(
+ tx_type=TransactionType.OUTGOING,
+ status=TransactionStatus.PENDING,
+ payment_hash=payment_hash,
+ amount_sats=amount_sats,
+ memo=memo,
+ invoice=invoice,
+ source=source,
+ task_id=task_id,
+ agent_id=agent_id,
+ )
+
+ conn = _get_conn()
+ conn.execute(
+ """
+ INSERT INTO ledger (id, tx_type, status, payment_hash, amount_sats,
+ memo, invoice, source, task_id, agent_id, created_at)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """,
+ (
+ entry.id,
+ entry.tx_type.value,
+ entry.status.value,
+ entry.payment_hash,
+ entry.amount_sats,
+ entry.memo,
+ entry.invoice,
+ entry.source,
+ entry.task_id,
+ entry.agent_id,
+ entry.created_at,
+ ),
+ )
+ conn.commit()
+ conn.close()
+ return entry
+
+
+def mark_settled(
+ payment_hash: str,
+ preimage: Optional[str] = None,
+ fee_sats: int = 0,
+) -> Optional[LedgerEntry]:
+ """Mark a transaction as settled (payment received or sent successfully).
+
+ Args:
+ payment_hash: Lightning payment hash
+ preimage: Payment preimage (proof of payment)
+ fee_sats: Routing fee paid (for outgoing payments)
+
+ Returns:
+ Updated LedgerEntry or None if not found
+ """
+ settled_at = datetime.now(timezone.utc).isoformat()
+
+ conn = _get_conn()
+ cursor = conn.execute(
+ """
+ UPDATE ledger
+ SET status = ?, preimage = ?, settled_at = ?, fee_sats = ?
+ WHERE payment_hash = ?
+ """,
+ (TransactionStatus.SETTLED.value, preimage, settled_at, fee_sats, payment_hash),
+ )
+ conn.commit()
+
+ if cursor.rowcount == 0:
+ conn.close()
+ return None
+
+ # Fetch and return updated entry
+ entry = get_by_hash(payment_hash)
+ conn.close()
+ return entry
+
+
+def mark_failed(payment_hash: str, reason: str = "") -> Optional[LedgerEntry]:
+ """Mark a transaction as failed.
+
+ Args:
+ payment_hash: Lightning payment hash
+ reason: Failure reason (stored in memo)
+
+ Returns:
+ Updated LedgerEntry or None if not found
+ """
+ conn = _get_conn()
+ cursor = conn.execute(
+ """
+ UPDATE ledger
+ SET status = ?, memo = memo || ' [FAILED: ' || ? || ']'
+ WHERE payment_hash = ?
+ """,
+ (TransactionStatus.FAILED.value, reason, payment_hash),
+ )
+ conn.commit()
+
+ if cursor.rowcount == 0:
+ conn.close()
+ return None
+
+ entry = get_by_hash(payment_hash)
+ conn.close()
+ return entry
+
+
+def get_by_hash(payment_hash: str) -> Optional[LedgerEntry]:
+ """Get a transaction by payment hash."""
+ conn = _get_conn()
+ row = conn.execute(
+ "SELECT * FROM ledger WHERE payment_hash = ?", (payment_hash,)
+ ).fetchone()
+ conn.close()
+
+ if row is None:
+ return None
+
+ return LedgerEntry(
+ id=row["id"],
+ tx_type=TransactionType(row["tx_type"]),
+ status=TransactionStatus(row["status"]),
+ payment_hash=row["payment_hash"],
+ amount_sats=row["amount_sats"],
+ memo=row["memo"],
+ invoice=row["invoice"],
+ preimage=row["preimage"],
+ source=row["source"],
+ task_id=row["task_id"],
+ agent_id=row["agent_id"],
+ created_at=row["created_at"],
+ settled_at=row["settled_at"],
+ fee_sats=row["fee_sats"],
+ )
+
+
+def list_transactions(
+ tx_type: Optional[TransactionType] = None,
+ status: Optional[TransactionStatus] = None,
+ task_id: Optional[str] = None,
+ agent_id: Optional[str] = None,
+ limit: int = 100,
+ offset: int = 0,
+) -> list[LedgerEntry]:
+ """List transactions with optional filtering.
+
+ Returns:
+ List of LedgerEntry objects, newest first
+ """
+ conn = _get_conn()
+
+ conditions = []
+ params = []
+
+ if tx_type:
+ conditions.append("tx_type = ?")
+ params.append(tx_type.value)
+ if status:
+ conditions.append("status = ?")
+ params.append(status.value)
+ if task_id:
+ conditions.append("task_id = ?")
+ params.append(task_id)
+ if agent_id:
+ conditions.append("agent_id = ?")
+ params.append(agent_id)
+
+ where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
+
+ query = f"""
+ SELECT * FROM ledger
+ {where_clause}
+ ORDER BY created_at DESC
+ LIMIT ? OFFSET ?
+ """
+ params.extend([limit, offset])
+
+ rows = conn.execute(query, params).fetchall()
+ conn.close()
+
+ return [
+ LedgerEntry(
+ id=r["id"],
+ tx_type=TransactionType(r["tx_type"]),
+ status=TransactionStatus(r["status"]),
+ payment_hash=r["payment_hash"],
+ amount_sats=r["amount_sats"],
+ memo=r["memo"],
+ invoice=r["invoice"],
+ preimage=r["preimage"],
+ source=r["source"],
+ task_id=r["task_id"],
+ agent_id=r["agent_id"],
+ created_at=r["created_at"],
+ settled_at=r["settled_at"],
+ fee_sats=r["fee_sats"],
+ )
+ for r in rows
+ ]
+
+
+def get_balance() -> dict:
+ """Get current balance summary.
+
+ Returns:
+ Dict with incoming, outgoing, pending, and available balances
+ """
+ conn = _get_conn()
+
+ # Incoming (invoices we created that are settled)
+ incoming = conn.execute(
+ """
+ SELECT COALESCE(SUM(amount_sats), 0) as total
+ FROM ledger
+ WHERE tx_type = ? AND status = ?
+ """,
+ (TransactionType.INCOMING.value, TransactionStatus.SETTLED.value),
+ ).fetchone()["total"]
+
+ # Outgoing (payments we sent that are settled)
+ outgoing_result = conn.execute(
+ """
+ SELECT COALESCE(SUM(amount_sats), 0) as total,
+ COALESCE(SUM(fee_sats), 0) as fees
+ FROM ledger
+ WHERE tx_type = ? AND status = ?
+ """,
+ (TransactionType.OUTGOING.value, TransactionStatus.SETTLED.value),
+ ).fetchone()
+ outgoing = outgoing_result["total"]
+ fees = outgoing_result["fees"]
+
+ # Pending incoming
+ pending_incoming = conn.execute(
+ """
+ SELECT COALESCE(SUM(amount_sats), 0) as total
+ FROM ledger
+ WHERE tx_type = ? AND status = ?
+ """,
+ (TransactionType.INCOMING.value, TransactionStatus.PENDING.value),
+ ).fetchone()["total"]
+
+ # Pending outgoing
+ pending_outgoing = conn.execute(
+ """
+ SELECT COALESCE(SUM(amount_sats), 0) as total
+ FROM ledger
+ WHERE tx_type = ? AND status = ?
+ """,
+ (TransactionType.OUTGOING.value, TransactionStatus.PENDING.value),
+ ).fetchone()["total"]
+
+ conn.close()
+
+ return {
+ "incoming_total_sats": incoming,
+ "outgoing_total_sats": outgoing,
+ "fees_paid_sats": fees,
+ "net_sats": incoming - outgoing - fees,
+ "pending_incoming_sats": pending_incoming,
+ "pending_outgoing_sats": pending_outgoing,
+ "available_sats": incoming - outgoing - fees - pending_outgoing,
+ }
+
+
+def get_transaction_stats(days: int = 30) -> dict:
+ """Get transaction statistics for the last N days.
+
+ Returns:
+ Dict with daily transaction counts and volumes
+ """
+ conn = _get_conn()
+
+ from datetime import timedelta
+ cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).isoformat()
+
+ rows = conn.execute(
+ """
+ SELECT
+ date(created_at) as date,
+ tx_type,
+ status,
+ COUNT(*) as count,
+ SUM(amount_sats) as volume
+ FROM ledger
+ WHERE created_at > ?
+ GROUP BY date(created_at), tx_type, status
+ ORDER BY date DESC
+ """,
+ (cutoff,),
+ ).fetchall()
+
+ conn.close()
+
+ stats = {}
+ for r in rows:
+ date = r["date"]
+ if date not in stats:
+ stats[date] = {"incoming": {"count": 0, "volume": 0},
+ "outgoing": {"count": 0, "volume": 0}}
+
+ tx_type = r["tx_type"]
+ if tx_type == TransactionType.INCOMING.value:
+ stats[date]["incoming"]["count"] += r["count"]
+ stats[date]["incoming"]["volume"] += r["volume"]
+ else:
+ stats[date]["outgoing"]["count"] += r["count"]
+ stats[date]["outgoing"]["volume"] += r["volume"]
+
+ return stats
diff --git a/src/memory/vector_store.py b/src/memory/vector_store.py
new file mode 100644
index 00000000..638233a2
--- /dev/null
+++ b/src/memory/vector_store.py
@@ -0,0 +1,483 @@
+"""Vector store for semantic memory using sqlite-vss.
+
+Provides embedding-based similarity search for the Echo agent
+to retrieve relevant context from conversation history.
+"""
+
+import json
+import sqlite3
+import uuid
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Optional
+
+DB_PATH = Path("data/swarm.db")
+
+# Simple embedding function using sentence-transformers if available,
+# otherwise fall back to keyword-based "pseudo-embeddings"
+try:
+ from sentence_transformers import SentenceTransformer
+ _model = SentenceTransformer('all-MiniLM-L6-v2')
+ _has_embeddings = True
+except ImportError:
+ _has_embeddings = False
+ _model = None
+
+
+def _get_embedding_dimension() -> int:
+ """Get the dimension of embeddings."""
+ if _has_embeddings and _model:
+ return _model.get_sentence_embedding_dimension()
+ return 384 # Default for all-MiniLM-L6-v2
+
+
+def _compute_embedding(text: str) -> list[float]:
+ """Compute embedding vector for text.
+
+ Uses sentence-transformers if available, otherwise returns
+ a simple hash-based vector for basic similarity.
+ """
+ if _has_embeddings and _model:
+ return _model.encode(text).tolist()
+
+ # Fallback: simple character n-gram hash embedding
+ # Not as good but allows the system to work without heavy deps
+ dim = 384
+ vec = [0.0] * dim
+ text = text.lower()
+
+ # Generate character trigram features
+ for i in range(len(text) - 2):
+ trigram = text[i:i+3]
+ hash_val = hash(trigram) % dim
+ vec[hash_val] += 1.0
+
+ # Normalize
+ norm = sum(x*x for x in vec) ** 0.5
+ if norm > 0:
+ vec = [x/norm for x in vec]
+
+ return vec
+
+
+@dataclass
+class MemoryEntry:
+ """A memory entry with vector embedding."""
+ id: str = field(default_factory=lambda: str(uuid.uuid4()))
+ content: str = "" # The actual text content
+ source: str = "" # Where it came from (agent, user, system)
+ context_type: str = "conversation" # conversation, document, fact, etc.
+ agent_id: Optional[str] = None
+ task_id: Optional[str] = None
+ session_id: Optional[str] = None
+ metadata: Optional[dict] = None
+ embedding: Optional[list[float]] = None
+ timestamp: str = field(
+ default_factory=lambda: datetime.now(timezone.utc).isoformat()
+ )
+ relevance_score: Optional[float] = None # Set during search
+
+
+def _get_conn() -> sqlite3.Connection:
+ """Get database connection with vector extension."""
+ DB_PATH.parent.mkdir(parents=True, exist_ok=True)
+ conn = sqlite3.connect(str(DB_PATH))
+ conn.row_factory = sqlite3.Row
+
+ # Try to load sqlite-vss extension
+ try:
+ conn.enable_load_extension(True)
+ conn.load_extension("vector0")
+ conn.load_extension("vss0")
+ _has_vss = True
+ except Exception:
+ _has_vss = False
+
+ # Create tables
+ conn.execute(
+ """
+ CREATE TABLE IF NOT EXISTS memory_entries (
+ id TEXT PRIMARY KEY,
+ content TEXT NOT NULL,
+ source TEXT NOT NULL,
+ context_type TEXT NOT NULL DEFAULT 'conversation',
+ agent_id TEXT,
+ task_id TEXT,
+ session_id TEXT,
+ metadata TEXT,
+ embedding TEXT, -- JSON array of floats
+ timestamp TEXT NOT NULL
+ )
+ """
+ )
+
+ # Create indexes
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_memory_agent ON memory_entries(agent_id)"
+ )
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_memory_task ON memory_entries(task_id)"
+ )
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_memory_session ON memory_entries(session_id)"
+ )
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_memory_time ON memory_entries(timestamp)"
+ )
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_memory_type ON memory_entries(context_type)"
+ )
+
+ conn.commit()
+ return conn
+
+
+def store_memory(
+ content: str,
+ source: str,
+ context_type: str = "conversation",
+ agent_id: Optional[str] = None,
+ task_id: Optional[str] = None,
+ session_id: Optional[str] = None,
+ metadata: Optional[dict] = None,
+ compute_embedding: bool = True,
+) -> MemoryEntry:
+ """Store a memory entry with optional embedding.
+
+ Args:
+ content: The text content to store
+ source: Source of the memory (agent name, user, system)
+ context_type: Type of context (conversation, document, fact)
+ agent_id: Associated agent ID
+ task_id: Associated task ID
+ session_id: Session identifier
+ metadata: Additional structured data
+ compute_embedding: Whether to compute vector embedding
+
+ Returns:
+ The stored MemoryEntry
+ """
+ embedding = None
+ if compute_embedding:
+ embedding = _compute_embedding(content)
+
+ entry = MemoryEntry(
+ content=content,
+ source=source,
+ context_type=context_type,
+ agent_id=agent_id,
+ task_id=task_id,
+ session_id=session_id,
+ metadata=metadata,
+ embedding=embedding,
+ )
+
+ conn = _get_conn()
+ conn.execute(
+ """
+ INSERT INTO memory_entries
+ (id, content, source, context_type, agent_id, task_id, session_id,
+ metadata, embedding, timestamp)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """,
+ (
+ entry.id,
+ entry.content,
+ entry.source,
+ entry.context_type,
+ entry.agent_id,
+ entry.task_id,
+ entry.session_id,
+ json.dumps(metadata) if metadata else None,
+ json.dumps(embedding) if embedding else None,
+ entry.timestamp,
+ ),
+ )
+ conn.commit()
+ conn.close()
+
+ return entry
+
+
+def search_memories(
+ query: str,
+ limit: int = 10,
+ context_type: Optional[str] = None,
+ agent_id: Optional[str] = None,
+ session_id: Optional[str] = None,
+ min_relevance: float = 0.0,
+) -> list[MemoryEntry]:
+ """Search for memories by semantic similarity.
+
+ Args:
+ query: Search query text
+ limit: Maximum results
+ context_type: Filter by context type
+ agent_id: Filter by agent
+ session_id: Filter by session
+ min_relevance: Minimum similarity score (0-1)
+
+ Returns:
+ List of MemoryEntry objects sorted by relevance
+ """
+ query_embedding = _compute_embedding(query)
+
+ conn = _get_conn()
+
+ # Build query with filters
+ conditions = []
+ params = []
+
+ if context_type:
+ conditions.append("context_type = ?")
+ params.append(context_type)
+ if agent_id:
+ conditions.append("agent_id = ?")
+ params.append(agent_id)
+ if session_id:
+ conditions.append("session_id = ?")
+ params.append(session_id)
+
+ where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
+
+ # Fetch candidates (we'll do in-memory similarity for now)
+ # For production with sqlite-vss, this would use vector similarity index
+ query_sql = f"""
+ SELECT * FROM memory_entries
+ {where_clause}
+ ORDER BY timestamp DESC
+ LIMIT ?
+ """
+ params.append(limit * 3) # Get more candidates for ranking
+
+ rows = conn.execute(query_sql, params).fetchall()
+ conn.close()
+
+ # Compute similarity scores
+ results = []
+ for row in rows:
+ entry = MemoryEntry(
+ id=row["id"],
+ content=row["content"],
+ source=row["source"],
+ context_type=row["context_type"],
+ agent_id=row["agent_id"],
+ task_id=row["task_id"],
+ session_id=row["session_id"],
+ metadata=json.loads(row["metadata"]) if row["metadata"] else None,
+ embedding=json.loads(row["embedding"]) if row["embedding"] else None,
+ timestamp=row["timestamp"],
+ )
+
+ if entry.embedding:
+ # Cosine similarity
+ score = _cosine_similarity(query_embedding, entry.embedding)
+ entry.relevance_score = score
+ if score >= min_relevance:
+ results.append(entry)
+ else:
+ # Fallback: check for keyword overlap
+ score = _keyword_overlap(query, entry.content)
+ entry.relevance_score = score
+ if score >= min_relevance:
+ results.append(entry)
+
+ # Sort by relevance and return top results
+ results.sort(key=lambda x: x.relevance_score or 0, reverse=True)
+ return results[:limit]
+
+
+def _cosine_similarity(a: list[float], b: list[float]) -> float:
+ """Compute cosine similarity between two vectors."""
+ dot = sum(x*y for x, y in zip(a, b))
+ norm_a = sum(x*x for x in a) ** 0.5
+ norm_b = sum(x*x for x in b) ** 0.5
+ if norm_a == 0 or norm_b == 0:
+ return 0.0
+ return dot / (norm_a * norm_b)
+
+
+def _keyword_overlap(query: str, content: str) -> float:
+ """Simple keyword overlap score as fallback."""
+ query_words = set(query.lower().split())
+ content_words = set(content.lower().split())
+ if not query_words:
+ return 0.0
+ overlap = len(query_words & content_words)
+ return overlap / len(query_words)
+
+
+def get_memory_context(
+ query: str,
+ max_tokens: int = 2000,
+ **filters
+) -> str:
+ """Get relevant memory context as formatted text for LLM prompts.
+
+ Args:
+ query: Search query
+ max_tokens: Approximate maximum tokens to return
+ **filters: Additional filters (agent_id, session_id, etc.)
+
+ Returns:
+ Formatted context string for inclusion in prompts
+ """
+ memories = search_memories(query, limit=20, **filters)
+
+ context_parts = []
+ total_chars = 0
+ max_chars = max_tokens * 4 # Rough approximation
+
+ for mem in memories:
+ formatted = f"[{mem.source}]: {mem.content}"
+ if total_chars + len(formatted) > max_chars:
+ break
+ context_parts.append(formatted)
+ total_chars += len(formatted)
+
+ if not context_parts:
+ return ""
+
+ return "Relevant context from memory:\n" + "\n\n".join(context_parts)
+
+
+def recall_personal_facts(agent_id: Optional[str] = None) -> list[str]:
+ """Recall personal facts about the user or system.
+
+ Args:
+ agent_id: Optional agent filter
+
+ Returns:
+ List of fact strings
+ """
+ conn = _get_conn()
+
+ if agent_id:
+ rows = conn.execute(
+ """
+ SELECT content FROM memory_entries
+ WHERE context_type = 'fact' AND agent_id = ?
+ ORDER BY timestamp DESC
+ LIMIT 100
+ """,
+ (agent_id,),
+ ).fetchall()
+ else:
+ rows = conn.execute(
+ """
+ SELECT content FROM memory_entries
+ WHERE context_type = 'fact'
+ ORDER BY timestamp DESC
+ LIMIT 100
+ """,
+ ).fetchall()
+
+ conn.close()
+ return [r["content"] for r in rows]
+
+
+def store_personal_fact(fact: str, agent_id: Optional[str] = None) -> MemoryEntry:
+ """Store a personal fact about the user or system.
+
+ Args:
+ fact: The fact to store
+ agent_id: Associated agent
+
+ Returns:
+ The stored MemoryEntry
+ """
+ return store_memory(
+ content=fact,
+ source="system",
+ context_type="fact",
+ agent_id=agent_id,
+ metadata={"auto_extracted": False},
+ )
+
+
+def delete_memory(memory_id: str) -> bool:
+ """Delete a memory entry by ID.
+
+ Returns:
+ True if deleted, False if not found
+ """
+ conn = _get_conn()
+ cursor = conn.execute(
+ "DELETE FROM memory_entries WHERE id = ?",
+ (memory_id,),
+ )
+ conn.commit()
+ deleted = cursor.rowcount > 0
+ conn.close()
+ return deleted
+
+
+def get_memory_stats() -> dict:
+ """Get statistics about the memory store.
+
+ Returns:
+ Dict with counts by type, total entries, etc.
+ """
+ conn = _get_conn()
+
+ total = conn.execute(
+ "SELECT COUNT(*) as count FROM memory_entries"
+ ).fetchone()["count"]
+
+ by_type = {}
+ rows = conn.execute(
+ "SELECT context_type, COUNT(*) as count FROM memory_entries GROUP BY context_type"
+ ).fetchall()
+ for row in rows:
+ by_type[row["context_type"]] = row["count"]
+
+ with_embeddings = conn.execute(
+ "SELECT COUNT(*) as count FROM memory_entries WHERE embedding IS NOT NULL"
+ ).fetchone()["count"]
+
+ conn.close()
+
+ return {
+ "total_entries": total,
+ "by_type": by_type,
+ "with_embeddings": with_embeddings,
+ "has_embedding_model": _has_embeddings,
+ }
+
+
+def prune_memories(older_than_days: int = 90, keep_facts: bool = True) -> int:
+ """Delete old memories to manage storage.
+
+ Args:
+ older_than_days: Delete memories older than this
+ keep_facts: Whether to preserve fact-type memories
+
+ Returns:
+ Number of entries deleted
+ """
+ from datetime import timedelta
+
+ cutoff = (datetime.now(timezone.utc) - timedelta(days=older_than_days)).isoformat()
+
+ conn = _get_conn()
+
+ if keep_facts:
+ cursor = conn.execute(
+ """
+ DELETE FROM memory_entries
+ WHERE timestamp < ? AND context_type != 'fact'
+ """,
+ (cutoff,),
+ )
+ else:
+ cursor = conn.execute(
+ "DELETE FROM memory_entries WHERE timestamp < ?",
+ (cutoff,),
+ )
+
+ deleted = cursor.rowcount
+ conn.commit()
+ conn.close()
+
+ return deleted
diff --git a/src/swarm/coordinator.py b/src/swarm/coordinator.py
index 940e7b1e..04b0c0cb 100644
--- a/src/swarm/coordinator.py
+++ b/src/swarm/coordinator.py
@@ -28,6 +28,10 @@ from swarm.tasks import (
list_tasks,
update_task,
)
+from swarm.event_log import (
+ EventType,
+ log_event,
+)
# Spark Intelligence integration — lazy import to avoid circular deps
def _get_spark():
@@ -92,6 +96,14 @@ class SwarmCoordinator:
aid = agent_id or str(__import__("uuid").uuid4())
node = PersonaNode(persona_id=persona_id, agent_id=aid, comms=self.comms)
+
+ # Log agent join event
+ log_event(
+ EventType.AGENT_JOINED,
+ source="coordinator",
+ agent_id=aid,
+ data={"persona_id": persona_id, "name": node.name},
+ )
def _bid_and_register(msg):
task_id = msg.data.get("task_id")
@@ -209,6 +221,18 @@ class SwarmCoordinator:
self.auctions.open_auction(task.id)
self.comms.post_task(task.id, description)
logger.info("Task posted: %s (%s)", task.id, description[:50])
+ # Log task creation event
+ log_event(
+ EventType.TASK_CREATED,
+ source="coordinator",
+ task_id=task.id,
+ data={"description": description[:200]},
+ )
+ log_event(
+ EventType.TASK_BIDDING,
+ source="coordinator",
+ task_id=task.id,
+ )
# Broadcast task posted via WebSocket
self._broadcast(self._broadcast_task_posted, task.id, description)
# Spark: capture task-posted event with candidate agents
@@ -280,6 +304,14 @@ class SwarmCoordinator:
"Task %s assigned to %s at %d sats",
task_id, winner.agent_id, winner.bid_sats,
)
+ # Log task assignment event
+ log_event(
+ EventType.TASK_ASSIGNED,
+ source="coordinator",
+ task_id=task_id,
+ agent_id=winner.agent_id,
+ data={"bid_sats": winner.bid_sats},
+ )
# Broadcast task assigned via WebSocket
self._broadcast(self._broadcast_task_assigned, task_id, winner.agent_id)
# Spark: capture assignment
@@ -289,6 +321,13 @@ class SwarmCoordinator:
else:
update_task(task_id, status=TaskStatus.FAILED)
logger.warning("Task %s: no bids received, marked as failed", task_id)
+ # Log task failure event
+ log_event(
+ EventType.TASK_FAILED,
+ source="coordinator",
+ task_id=task_id,
+ data={"reason": "no bids received"},
+ )
return winner
def complete_task(self, task_id: str, result: str) -> Optional[Task]:
@@ -308,6 +347,14 @@ class SwarmCoordinator:
self.comms.complete_task(task_id, task.assigned_agent, result)
# Record success in learner
swarm_learner.record_task_result(task_id, task.assigned_agent, succeeded=True)
+ # Log task completion event
+ log_event(
+ EventType.TASK_COMPLETED,
+ source="coordinator",
+ task_id=task_id,
+ agent_id=task.assigned_agent,
+ data={"result_preview": result[:500]},
+ )
# Broadcast task completed via WebSocket
self._broadcast(
self._broadcast_task_completed,
@@ -335,6 +382,14 @@ class SwarmCoordinator:
registry.update_status(task.assigned_agent, "idle")
# Record failure in learner
swarm_learner.record_task_result(task_id, task.assigned_agent, succeeded=False)
+ # Log task failure event
+ log_event(
+ EventType.TASK_FAILED,
+ source="coordinator",
+ task_id=task_id,
+ agent_id=task.assigned_agent,
+ data={"reason": reason},
+ )
# Spark: capture failure
spark = _get_spark()
if spark:
diff --git a/src/swarm/event_log.py b/src/swarm/event_log.py
new file mode 100644
index 00000000..bdac7ca8
--- /dev/null
+++ b/src/swarm/event_log.py
@@ -0,0 +1,329 @@
+"""Event logging for swarm system.
+
+All agent actions, task lifecycle events, and system events are logged
+to SQLite for audit, debugging, and analytics.
+"""
+
+import sqlite3
+import uuid
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from enum import Enum
+from pathlib import Path
+from typing import Optional
+
+DB_PATH = Path("data/swarm.db")
+
+
+class EventType(str, Enum):
+ """Types of events logged."""
+ # Task lifecycle
+ TASK_CREATED = "task.created"
+ TASK_BIDDING = "task.bidding"
+ TASK_ASSIGNED = "task.assigned"
+ TASK_STARTED = "task.started"
+ TASK_COMPLETED = "task.completed"
+ TASK_FAILED = "task.failed"
+
+ # Agent lifecycle
+ AGENT_JOINED = "agent.joined"
+ AGENT_LEFT = "agent.left"
+ AGENT_STATUS_CHANGED = "agent.status_changed"
+
+ # Bidding
+ BID_SUBMITTED = "bid.submitted"
+ AUCTION_CLOSED = "auction.closed"
+
+ # Tool execution
+ TOOL_CALLED = "tool.called"
+ TOOL_COMPLETED = "tool.completed"
+ TOOL_FAILED = "tool.failed"
+
+ # System
+ SYSTEM_ERROR = "system.error"
+ SYSTEM_WARNING = "system.warning"
+ SYSTEM_INFO = "system.info"
+
+
+@dataclass
+class EventLogEntry:
+ """A logged event."""
+ id: str = field(default_factory=lambda: str(uuid.uuid4()))
+ event_type: EventType = EventType.SYSTEM_INFO
+ source: str = "" # Agent or component that emitted the event
+ task_id: Optional[str] = None
+ agent_id: Optional[str] = None
+ data: Optional[str] = None # JSON string of additional data
+ timestamp: str = field(
+ default_factory=lambda: datetime.now(timezone.utc).isoformat()
+ )
+
+
+def _get_conn() -> sqlite3.Connection:
+ DB_PATH.parent.mkdir(parents=True, exist_ok=True)
+ conn = sqlite3.connect(str(DB_PATH))
+ conn.row_factory = sqlite3.Row
+ conn.execute(
+ """
+ CREATE TABLE IF NOT EXISTS event_log (
+ id TEXT PRIMARY KEY,
+ event_type TEXT NOT NULL,
+ source TEXT NOT NULL,
+ task_id TEXT,
+ agent_id TEXT,
+ data TEXT,
+ timestamp TEXT NOT NULL
+ )
+ """
+ )
+ # Create indexes for common queries
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_event_log_task ON event_log(task_id)"
+ )
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_event_log_agent ON event_log(agent_id)"
+ )
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_event_log_type ON event_log(event_type)"
+ )
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_event_log_time ON event_log(timestamp)"
+ )
+ conn.commit()
+ return conn
+
+
+def log_event(
+ event_type: EventType,
+ source: str,
+ task_id: Optional[str] = None,
+ agent_id: Optional[str] = None,
+ data: Optional[dict] = None,
+) -> EventLogEntry:
+ """Log an event to the database.
+
+ Args:
+ event_type: Type of event
+ source: Component or agent that emitted the event
+ task_id: Optional associated task ID
+ agent_id: Optional associated agent ID
+ data: Optional dictionary of additional data (will be JSON serialized)
+
+ Returns:
+ The created EventLogEntry
+ """
+ import json
+
+ entry = EventLogEntry(
+ event_type=event_type,
+ source=source,
+ task_id=task_id,
+ agent_id=agent_id,
+ data=json.dumps(data) if data else None,
+ )
+
+ conn = _get_conn()
+ conn.execute(
+ """
+ INSERT INTO event_log (id, event_type, source, task_id, agent_id, data, timestamp)
+ VALUES (?, ?, ?, ?, ?, ?, ?)
+ """,
+ (
+ entry.id,
+ entry.event_type.value,
+ entry.source,
+ entry.task_id,
+ entry.agent_id,
+ entry.data,
+ entry.timestamp,
+ ),
+ )
+ conn.commit()
+ conn.close()
+
+ # Broadcast to WebSocket clients for real-time activity feed
+ try:
+ from events.broadcaster import event_broadcaster
+ event_broadcaster.broadcast_sync(entry)
+ except Exception:
+ # Don't fail if broadcaster unavailable
+ pass
+
+ return entry
+
+
+def get_event(event_id: str) -> Optional[EventLogEntry]:
+ """Get a single event by ID."""
+ conn = _get_conn()
+ row = conn.execute(
+ "SELECT * FROM event_log WHERE id = ?", (event_id,)
+ ).fetchone()
+ conn.close()
+
+ if row is None:
+ return None
+
+ return EventLogEntry(
+ id=row["id"],
+ event_type=EventType(row["event_type"]),
+ source=row["source"],
+ task_id=row["task_id"],
+ agent_id=row["agent_id"],
+ data=row["data"],
+ timestamp=row["timestamp"],
+ )
+
+
+def list_events(
+ event_type: Optional[EventType] = None,
+ task_id: Optional[str] = None,
+ agent_id: Optional[str] = None,
+ source: Optional[str] = None,
+ limit: int = 100,
+ offset: int = 0,
+) -> list[EventLogEntry]:
+ """List events with optional filtering.
+
+ Args:
+ event_type: Filter by event type
+ task_id: Filter by associated task
+ agent_id: Filter by associated agent
+ source: Filter by source component
+ limit: Maximum number of events to return
+ offset: Number of events to skip (for pagination)
+
+ Returns:
+ List of EventLogEntry objects, newest first
+ """
+ conn = _get_conn()
+
+ conditions = []
+ params = []
+
+ if event_type:
+ conditions.append("event_type = ?")
+ params.append(event_type.value)
+ if task_id:
+ conditions.append("task_id = ?")
+ params.append(task_id)
+ if agent_id:
+ conditions.append("agent_id = ?")
+ params.append(agent_id)
+ if source:
+ conditions.append("source = ?")
+ params.append(source)
+
+ where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
+
+ query = f"""
+ SELECT * FROM event_log
+ {where_clause}
+ ORDER BY timestamp DESC
+ LIMIT ? OFFSET ?
+ """
+ params.extend([limit, offset])
+
+ rows = conn.execute(query, params).fetchall()
+ conn.close()
+
+ return [
+ EventLogEntry(
+ id=r["id"],
+ event_type=EventType(r["event_type"]),
+ source=r["source"],
+ task_id=r["task_id"],
+ agent_id=r["agent_id"],
+ data=r["data"],
+ timestamp=r["timestamp"],
+ )
+ for r in rows
+ ]
+
+
+def get_task_events(task_id: str) -> list[EventLogEntry]:
+ """Get all events for a specific task."""
+ return list_events(task_id=task_id, limit=1000)
+
+
+def get_agent_events(agent_id: str) -> list[EventLogEntry]:
+ """Get all events for a specific agent."""
+ return list_events(agent_id=agent_id, limit=1000)
+
+
+def get_recent_events(minutes: int = 60) -> list[EventLogEntry]:
+ """Get events from the last N minutes."""
+ conn = _get_conn()
+
+ from datetime import timedelta
+ cutoff = (datetime.now(timezone.utc) - timedelta(minutes=minutes)).isoformat()
+
+ rows = conn.execute(
+ """
+ SELECT * FROM event_log
+ WHERE timestamp > ?
+ ORDER BY timestamp DESC
+ """,
+ (cutoff,),
+ ).fetchall()
+ conn.close()
+
+ return [
+ EventLogEntry(
+ id=r["id"],
+ event_type=EventType(r["event_type"]),
+ source=r["source"],
+ task_id=r["task_id"],
+ agent_id=r["agent_id"],
+ data=r["data"],
+ timestamp=r["timestamp"],
+ )
+ for r in rows
+ ]
+
+
+def get_event_summary(minutes: int = 60) -> dict:
+ """Get a summary of recent events by type.
+
+ Returns:
+ Dict mapping event types to counts
+ """
+ conn = _get_conn()
+
+ from datetime import timedelta
+ cutoff = (datetime.now(timezone.utc) - timedelta(minutes=minutes)).isoformat()
+
+ rows = conn.execute(
+ """
+ SELECT event_type, COUNT(*) as count
+ FROM event_log
+ WHERE timestamp > ?
+ GROUP BY event_type
+ ORDER BY count DESC
+ """,
+ (cutoff,),
+ ).fetchall()
+ conn.close()
+
+ return {r["event_type"]: r["count"] for r in rows}
+
+
+def prune_events(older_than_days: int = 30) -> int:
+ """Delete events older than specified days.
+
+ Returns:
+ Number of events deleted
+ """
+ conn = _get_conn()
+
+ from datetime import timedelta
+ cutoff = (datetime.now(timezone.utc) - timedelta(days=older_than_days)).isoformat()
+
+ cursor = conn.execute(
+ "DELETE FROM event_log WHERE timestamp < ?",
+ (cutoff,),
+ )
+ deleted = cursor.rowcount
+ conn.commit()
+ conn.close()
+
+ return deleted
diff --git a/src/timmy/cascade_adapter.py b/src/timmy/cascade_adapter.py
new file mode 100644
index 00000000..59984648
--- /dev/null
+++ b/src/timmy/cascade_adapter.py
@@ -0,0 +1,137 @@
+"""Cascade Router adapter for Timmy agent.
+
+Provides automatic failover between LLM providers with:
+- Circuit breaker pattern for failing providers
+- Metrics tracking per provider
+- Priority-based routing (local first, then APIs)
+"""
+
+import logging
+from dataclasses import dataclass
+from typing import Optional
+
+from router.cascade import CascadeRouter
+from timmy.prompts import TIMMY_SYSTEM_PROMPT
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass
+class TimmyResponse:
+ """Response from Timmy via Cascade Router."""
+ content: str
+ provider_used: str
+ latency_ms: float
+ fallback_used: bool = False
+
+
+class TimmyCascadeAdapter:
+ """Adapter that routes Timmy requests through Cascade Router.
+
+ Usage:
+ adapter = TimmyCascadeAdapter()
+ response = await adapter.chat("Hello")
+ print(f"Response: {response.content}")
+ print(f"Provider: {response.provider_used}")
+ """
+
+ def __init__(self, router: Optional[CascadeRouter] = None) -> None:
+ """Initialize adapter with Cascade Router.
+
+ Args:
+ router: CascadeRouter instance. If None, creates default.
+ """
+ self.router = router or CascadeRouter()
+ logger.info("TimmyCascadeAdapter initialized with %d providers",
+ len(self.router.providers))
+
+ async def chat(self, message: str, context: Optional[str] = None) -> TimmyResponse:
+ """Send message through cascade router with automatic failover.
+
+ Args:
+ message: User message
+ context: Optional conversation context
+
+ Returns:
+ TimmyResponse with content and metadata
+ """
+ # Build messages array
+ messages = []
+ if context:
+ messages.append({"role": "system", "content": context})
+ messages.append({"role": "user", "content": message})
+
+ # Route through cascade
+ import time
+ start = time.time()
+
+ try:
+ result = await self.router.complete(
+ messages=messages,
+ system_prompt=TIMMY_SYSTEM_PROMPT,
+ )
+
+ latency = (time.time() - start) * 1000
+
+ # Determine if fallback was used
+ primary = self.router.providers[0] if self.router.providers else None
+ fallback_used = primary and primary.status.value != "healthy"
+
+ return TimmyResponse(
+ content=result.content,
+ provider_used=result.provider_name,
+ latency_ms=latency,
+ fallback_used=fallback_used,
+ )
+
+ except Exception as exc:
+ logger.error("All providers failed: %s", exc)
+ raise
+
+ def get_provider_status(self) -> list[dict]:
+ """Get status of all providers.
+
+ Returns:
+ List of provider status dicts
+ """
+ return [
+ {
+ "name": p.name,
+ "type": p.type,
+ "status": p.status.value,
+ "circuit_state": p.circuit_state.value,
+ "metrics": {
+ "total": p.metrics.total_requests,
+ "success": p.metrics.successful_requests,
+ "failed": p.metrics.failed_requests,
+ "avg_latency_ms": round(p.metrics.avg_latency_ms, 1),
+ "error_rate": round(p.metrics.error_rate, 3),
+ },
+ "priority": p.priority,
+ "enabled": p.enabled,
+ }
+ for p in self.router.providers
+ ]
+
+ def get_preferred_provider(self) -> Optional[str]:
+ """Get name of highest-priority healthy provider.
+
+ Returns:
+ Provider name or None if all unhealthy
+ """
+ for provider in self.router.providers:
+ if provider.status.value == "healthy" and provider.enabled:
+ return provider.name
+ return None
+
+
+# Global singleton for reuse
+_cascade_adapter: Optional[TimmyCascadeAdapter] = None
+
+
+def get_cascade_adapter() -> TimmyCascadeAdapter:
+ """Get or create global cascade adapter singleton."""
+ global _cascade_adapter
+ if _cascade_adapter is None:
+ _cascade_adapter = TimmyCascadeAdapter()
+ return _cascade_adapter
diff --git a/src/timmy_serve/payment_handler.py b/src/timmy_serve/payment_handler.py
index dd42f730..2233ca03 100644
--- a/src/timmy_serve/payment_handler.py
+++ b/src/timmy_serve/payment_handler.py
@@ -5,6 +5,8 @@ The actual backend (mock or LND) is selected via LIGHTNING_BACKEND env var.
For backward compatibility, the PaymentHandler class and payment_handler
singleton are preserved, but they delegate to the lightning backend.
+
+All transactions are logged to the ledger for audit and accounting.
"""
import logging
@@ -13,6 +15,12 @@ from typing import Optional
# Import from the new lightning module
from lightning import get_backend, Invoice
from lightning.base import LightningBackend
+from lightning.ledger import (
+ create_invoice_entry,
+ mark_settled,
+ get_balance,
+ list_transactions,
+)
logger = logging.getLogger(__name__)
@@ -42,22 +50,66 @@ class PaymentHandler:
self._backend = backend or get_backend()
logger.info("PaymentHandler initialized — backend: %s", self._backend.name)
- def create_invoice(self, amount_sats: int, memo: str = "") -> Invoice:
- """Create a new Lightning invoice."""
+ def create_invoice(
+ self,
+ amount_sats: int,
+ memo: str = "",
+ source: str = "payment_handler",
+ task_id: Optional[str] = None,
+ agent_id: Optional[str] = None,
+ ) -> Invoice:
+ """Create a new Lightning invoice.
+
+ Args:
+ amount_sats: Invoice amount in satoshis
+ memo: Payment description
+ source: Component creating the invoice
+ task_id: Associated task ID
+ agent_id: Associated agent ID
+ """
invoice = self._backend.create_invoice(amount_sats, memo)
logger.info(
"Invoice created: %d sats — %s (hash: %s…)",
amount_sats, memo, invoice.payment_hash[:12],
)
+
+ # Log to ledger
+ create_invoice_entry(
+ payment_hash=invoice.payment_hash,
+ amount_sats=amount_sats,
+ memo=memo,
+ invoice=invoice.bolt11 if hasattr(invoice, 'bolt11') else None,
+ source=source,
+ task_id=task_id,
+ agent_id=agent_id,
+ )
+
return invoice
def check_payment(self, payment_hash: str) -> bool:
- """Check whether an invoice has been paid."""
- return self._backend.check_payment(payment_hash)
+ """Check whether an invoice has been paid.
+
+ If paid, updates the ledger entry.
+ """
+ is_paid = self._backend.check_payment(payment_hash)
+
+ if is_paid:
+ # Update ledger entry
+ mark_settled(payment_hash)
+
+ return is_paid
def settle_invoice(self, payment_hash: str, preimage: str) -> bool:
- """Manually settle an invoice with a preimage (for testing)."""
- return self._backend.settle_invoice(payment_hash, preimage)
+ """Manually settle an invoice with a preimage (for testing).
+
+ Also updates the ledger entry.
+ """
+ result = self._backend.settle_invoice(payment_hash, preimage)
+
+ if result:
+ mark_settled(payment_hash, preimage=preimage)
+
+ return result
def get_invoice(self, payment_hash: str) -> Optional[Invoice]:
"""Get invoice details by payment hash."""
@@ -75,6 +127,26 @@ class PaymentHandler:
def backend_name(self) -> str:
"""Get the name of the current backend."""
return self._backend.name
+
+ def get_balance(self) -> dict:
+ """Get current balance summary from ledger.
+
+ Returns:
+ Dict with incoming, outgoing, pending, and available balances
+ """
+ return get_balance()
+
+ def list_transactions(self, limit: int = 100, **filters) -> list:
+ """List transactions from ledger.
+
+ Args:
+ limit: Maximum number of transactions
+ **filters: Optional filters (tx_type, status, task_id, agent_id)
+
+ Returns:
+ List of LedgerEntry objects
+ """
+ return list_transactions(limit=limit, **filters)
# Module-level singleton
diff --git a/src/upgrades/models.py b/src/upgrades/models.py
new file mode 100644
index 00000000..ef67e2f4
--- /dev/null
+++ b/src/upgrades/models.py
@@ -0,0 +1,331 @@
+"""Database models for Self-Upgrade Approval Queue."""
+
+import json
+import sqlite3
+import uuid
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from enum import Enum
+from pathlib import Path
+from typing import Optional
+
+DB_PATH = Path("data/swarm.db")
+
+
+class UpgradeStatus(str, Enum):
+ """Status of an upgrade proposal."""
+ PROPOSED = "proposed"
+ APPROVED = "approved"
+ REJECTED = "rejected"
+ APPLIED = "applied"
+ FAILED = "failed"
+ EXPIRED = "expired"
+
+
+@dataclass
+class Upgrade:
+ """A self-modification upgrade proposal."""
+ id: str = field(default_factory=lambda: str(uuid.uuid4()))
+ status: UpgradeStatus = UpgradeStatus.PROPOSED
+
+ # Timestamps
+ proposed_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
+ approved_at: Optional[str] = None
+ applied_at: Optional[str] = None
+ rejected_at: Optional[str] = None
+
+ # Proposal details
+ branch_name: str = ""
+ description: str = ""
+ files_changed: list[str] = field(default_factory=list)
+ diff_preview: str = ""
+
+ # Test results
+ test_passed: bool = False
+ test_output: str = ""
+
+ # Execution results
+ error_message: Optional[str] = None
+ approved_by: Optional[str] = None
+
+
+def _get_conn() -> sqlite3.Connection:
+ """Get database connection with schema initialized."""
+ DB_PATH.parent.mkdir(parents=True, exist_ok=True)
+ conn = sqlite3.connect(str(DB_PATH))
+ conn.row_factory = sqlite3.Row
+
+ conn.execute(
+ """
+ CREATE TABLE IF NOT EXISTS upgrades (
+ id TEXT PRIMARY KEY,
+ status TEXT NOT NULL DEFAULT 'proposed',
+ proposed_at TEXT NOT NULL,
+ approved_at TEXT,
+ applied_at TEXT,
+ rejected_at TEXT,
+ branch_name TEXT NOT NULL,
+ description TEXT NOT NULL,
+ files_changed TEXT, -- JSON array
+ diff_preview TEXT,
+ test_passed INTEGER DEFAULT 0,
+ test_output TEXT,
+ error_message TEXT,
+ approved_by TEXT
+ )
+ """
+ )
+
+ # Indexes
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_upgrades_status ON upgrades(status)")
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_upgrades_proposed ON upgrades(proposed_at)")
+
+ conn.commit()
+ return conn
+
+
+def create_upgrade(
+ branch_name: str,
+ description: str,
+ files_changed: list[str],
+ diff_preview: str,
+ test_passed: bool = False,
+ test_output: str = "",
+) -> Upgrade:
+ """Create a new upgrade proposal.
+
+ Args:
+ branch_name: Git branch name for the upgrade
+ description: Human-readable description
+ files_changed: List of files that would be modified
+ diff_preview: Short diff preview for review
+ test_passed: Whether tests passed on the branch
+ test_output: Test output text
+
+ Returns:
+ The created Upgrade
+ """
+ upgrade = Upgrade(
+ branch_name=branch_name,
+ description=description,
+ files_changed=files_changed,
+ diff_preview=diff_preview,
+ test_passed=test_passed,
+ test_output=test_output,
+ )
+
+ conn = _get_conn()
+ conn.execute(
+ """
+ INSERT INTO upgrades (id, status, proposed_at, branch_name, description,
+ files_changed, diff_preview, test_passed, test_output)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """,
+ (
+ upgrade.id,
+ upgrade.status.value,
+ upgrade.proposed_at,
+ upgrade.branch_name,
+ upgrade.description,
+ json.dumps(files_changed),
+ upgrade.diff_preview,
+ int(test_passed),
+ test_output,
+ ),
+ )
+ conn.commit()
+ conn.close()
+
+ return upgrade
+
+
+def get_upgrade(upgrade_id: str) -> Optional[Upgrade]:
+ """Get upgrade by ID."""
+ conn = _get_conn()
+ row = conn.execute(
+ "SELECT * FROM upgrades WHERE id = ?", (upgrade_id,)
+ ).fetchone()
+ conn.close()
+
+ if not row:
+ return None
+
+ return Upgrade(
+ id=row["id"],
+ status=UpgradeStatus(row["status"]),
+ proposed_at=row["proposed_at"],
+ approved_at=row["approved_at"],
+ applied_at=row["applied_at"],
+ rejected_at=row["rejected_at"],
+ branch_name=row["branch_name"],
+ description=row["description"],
+ files_changed=json.loads(row["files_changed"]) if row["files_changed"] else [],
+ diff_preview=row["diff_preview"] or "",
+ test_passed=bool(row["test_passed"]),
+ test_output=row["test_output"] or "",
+ error_message=row["error_message"],
+ approved_by=row["approved_by"],
+ )
+
+
+def list_upgrades(
+ status: Optional[UpgradeStatus] = None,
+ limit: int = 100,
+) -> list[Upgrade]:
+ """List upgrades, optionally filtered by status."""
+ conn = _get_conn()
+
+ if status:
+ rows = conn.execute(
+ "SELECT * FROM upgrades WHERE status = ? ORDER BY proposed_at DESC LIMIT ?",
+ (status.value, limit),
+ ).fetchall()
+ else:
+ rows = conn.execute(
+ "SELECT * FROM upgrades ORDER BY proposed_at DESC LIMIT ?",
+ (limit,),
+ ).fetchall()
+
+ conn.close()
+
+ return [
+ Upgrade(
+ id=r["id"],
+ status=UpgradeStatus(r["status"]),
+ proposed_at=r["proposed_at"],
+ approved_at=r["approved_at"],
+ applied_at=r["applied_at"],
+ rejected_at=r["rejected_at"],
+ branch_name=r["branch_name"],
+ description=r["description"],
+ files_changed=json.loads(r["files_changed"]) if r["files_changed"] else [],
+ diff_preview=r["diff_preview"] or "",
+ test_passed=bool(r["test_passed"]),
+ test_output=r["test_output"] or "",
+ error_message=r["error_message"],
+ approved_by=r["approved_by"],
+ )
+ for r in rows
+ ]
+
+
+def approve_upgrade(upgrade_id: str, approved_by: str = "dashboard") -> Optional[Upgrade]:
+ """Approve an upgrade proposal."""
+ now = datetime.now(timezone.utc).isoformat()
+
+ conn = _get_conn()
+ cursor = conn.execute(
+ """
+ UPDATE upgrades
+ SET status = ?, approved_at = ?, approved_by = ?
+ WHERE id = ? AND status = ?
+ """,
+ (UpgradeStatus.APPROVED.value, now, approved_by, upgrade_id, UpgradeStatus.PROPOSED.value),
+ )
+ conn.commit()
+ updated = cursor.rowcount > 0
+ conn.close()
+
+ if not updated:
+ return None
+
+ return get_upgrade(upgrade_id)
+
+
+def reject_upgrade(upgrade_id: str) -> Optional[Upgrade]:
+ """Reject an upgrade proposal."""
+ now = datetime.now(timezone.utc).isoformat()
+
+ conn = _get_conn()
+ cursor = conn.execute(
+ """
+ UPDATE upgrades
+ SET status = ?, rejected_at = ?
+ WHERE id = ? AND status = ?
+ """,
+ (UpgradeStatus.REJECTED.value, now, upgrade_id, UpgradeStatus.PROPOSED.value),
+ )
+ conn.commit()
+ updated = cursor.rowcount > 0
+ conn.close()
+
+ if not updated:
+ return None
+
+ return get_upgrade(upgrade_id)
+
+
+def mark_applied(upgrade_id: str) -> Optional[Upgrade]:
+ """Mark upgrade as successfully applied."""
+ now = datetime.now(timezone.utc).isoformat()
+
+ conn = _get_conn()
+ cursor = conn.execute(
+ """
+ UPDATE upgrades
+ SET status = ?, applied_at = ?
+ WHERE id = ? AND status = ?
+ """,
+ (UpgradeStatus.APPLIED.value, now, upgrade_id, UpgradeStatus.APPROVED.value),
+ )
+ conn.commit()
+ updated = cursor.rowcount > 0
+ conn.close()
+
+ if not updated:
+ return None
+
+ return get_upgrade(upgrade_id)
+
+
+def mark_failed(upgrade_id: str, error_message: str) -> Optional[Upgrade]:
+ """Mark upgrade as failed."""
+ conn = _get_conn()
+ cursor = conn.execute(
+ """
+ UPDATE upgrades
+ SET status = ?, error_message = ?
+ WHERE id = ? AND status = ?
+ """,
+ (UpgradeStatus.FAILED.value, error_message, upgrade_id, UpgradeStatus.APPROVED.value),
+ )
+ conn.commit()
+ updated = cursor.rowcount > 0
+ conn.close()
+
+ if not updated:
+ return None
+
+ return get_upgrade(upgrade_id)
+
+
+def get_pending_count() -> int:
+ """Get count of pending (proposed) upgrades."""
+ conn = _get_conn()
+ row = conn.execute(
+ "SELECT COUNT(*) as count FROM upgrades WHERE status = ?",
+ (UpgradeStatus.PROPOSED.value,),
+ ).fetchone()
+ conn.close()
+ return row["count"]
+
+
+def prune_old_upgrades(older_than_days: int = 30) -> int:
+ """Delete old completed upgrades."""
+ from datetime import timedelta
+
+ cutoff = (datetime.now(timezone.utc) - timedelta(days=older_than_days)).isoformat()
+
+ conn = _get_conn()
+ cursor = conn.execute(
+ """
+ DELETE FROM upgrades
+ WHERE proposed_at < ? AND status IN ('applied', 'rejected', 'failed')
+ """,
+ (cutoff,),
+ )
+ deleted = cursor.rowcount
+ conn.commit()
+ conn.close()
+
+ return deleted
diff --git a/src/upgrades/queue.py b/src/upgrades/queue.py
new file mode 100644
index 00000000..8b80ef68
--- /dev/null
+++ b/src/upgrades/queue.py
@@ -0,0 +1,285 @@
+"""Upgrade Queue management - bridges self-modify loop with approval workflow."""
+
+import logging
+import subprocess
+from pathlib import Path
+from typing import Optional
+
+from upgrades.models import (
+ Upgrade,
+ UpgradeStatus,
+ create_upgrade,
+ get_upgrade,
+ approve_upgrade,
+ reject_upgrade,
+ mark_applied,
+ mark_failed,
+)
+
+logger = logging.getLogger(__name__)
+
+PROJECT_ROOT = Path(__file__).parent.parent.parent
+
+
+class UpgradeQueue:
+ """Manages the upgrade approval and application workflow."""
+
+ @staticmethod
+ def propose(
+ branch_name: str,
+ description: str,
+ files_changed: list[str],
+ diff_preview: str,
+ test_passed: bool = False,
+ test_output: str = "",
+ ) -> Upgrade:
+ """Propose a new upgrade for approval.
+
+ This is called by the self-modify loop when it generates changes.
+ The upgrade is created in 'proposed' state and waits for human approval.
+
+ Args:
+ branch_name: Git branch with the changes
+ description: What the upgrade does
+ files_changed: List of modified files
+ diff_preview: Short diff for review
+ test_passed: Whether tests passed
+ test_output: Test output
+
+ Returns:
+ The created Upgrade proposal
+ """
+ upgrade = create_upgrade(
+ branch_name=branch_name,
+ description=description,
+ files_changed=files_changed,
+ diff_preview=diff_preview,
+ test_passed=test_passed,
+ test_output=test_output,
+ )
+
+ logger.info(
+ "Upgrade proposed: %s (%s) - %d files",
+ upgrade.id[:8],
+ branch_name,
+ len(files_changed),
+ )
+
+ # Log to event log
+ try:
+ from swarm.event_log import log_event, EventType
+ log_event(
+ EventType.SYSTEM_INFO,
+ source="upgrade_queue",
+ data={
+ "upgrade_id": upgrade.id,
+ "branch": branch_name,
+ "description": description,
+ "test_passed": test_passed,
+ },
+ )
+ except Exception:
+ pass
+
+ return upgrade
+
+ @staticmethod
+ def approve(upgrade_id: str, approved_by: str = "dashboard") -> Optional[Upgrade]:
+ """Approve an upgrade proposal.
+
+ Called from dashboard when user clicks "Approve".
+ Does NOT apply the upgrade - that happens separately.
+
+ Args:
+ upgrade_id: The upgrade to approve
+ approved_by: Who approved it (for audit)
+
+ Returns:
+ Updated Upgrade or None if not found/not in proposed state
+ """
+ upgrade = approve_upgrade(upgrade_id, approved_by)
+
+ if upgrade:
+ logger.info("Upgrade approved: %s by %s", upgrade_id[:8], approved_by)
+
+ return upgrade
+
+ @staticmethod
+ def reject(upgrade_id: str) -> Optional[Upgrade]:
+ """Reject an upgrade proposal.
+
+ Called from dashboard when user clicks "Reject".
+ Cleans up the branch.
+
+ Args:
+ upgrade_id: The upgrade to reject
+
+ Returns:
+ Updated Upgrade or None
+ """
+ upgrade = reject_upgrade(upgrade_id)
+
+ if upgrade:
+ logger.info("Upgrade rejected: %s", upgrade_id[:8])
+
+ # Clean up branch
+ try:
+ subprocess.run(
+ ["git", "branch", "-D", upgrade.branch_name],
+ cwd=PROJECT_ROOT,
+ capture_output=True,
+ check=False,
+ )
+ except Exception as exc:
+ logger.warning("Failed to delete branch %s: %s", upgrade.branch_name, exc)
+
+ return upgrade
+
+ @staticmethod
+ def apply(upgrade_id: str) -> tuple[bool, str]:
+ """Apply an approved upgrade.
+
+ This is the critical operation that actually modifies the codebase:
+ 1. Checks out the branch
+ 2. Runs tests
+ 3. If tests pass: merges to main
+ 4. Updates upgrade status
+
+ Args:
+ upgrade_id: The approved upgrade to apply
+
+ Returns:
+ (success, message) tuple
+ """
+ upgrade = get_upgrade(upgrade_id)
+
+ if not upgrade:
+ return False, "Upgrade not found"
+
+ if upgrade.status != UpgradeStatus.APPROVED:
+ return False, f"Upgrade not approved (status: {upgrade.status.value})"
+
+ logger.info("Applying upgrade: %s (%s)", upgrade_id[:8], upgrade.branch_name)
+
+ try:
+ # 1. Checkout branch
+ result = subprocess.run(
+ ["git", "checkout", upgrade.branch_name],
+ cwd=PROJECT_ROOT,
+ capture_output=True,
+ text=True,
+ )
+ if result.returncode != 0:
+ mark_failed(upgrade_id, f"Checkout failed: {result.stderr}")
+ return False, f"Failed to checkout branch: {result.stderr}"
+
+ # 2. Run tests
+ result = subprocess.run(
+ ["python", "-m", "pytest", "tests/", "-x", "-q"],
+ cwd=PROJECT_ROOT,
+ capture_output=True,
+ text=True,
+ timeout=120,
+ )
+
+ if result.returncode != 0:
+ mark_failed(upgrade_id, f"Tests failed: {result.stdout}\n{result.stderr}")
+ # Switch back to main
+ subprocess.run(["git", "checkout", "main"], cwd=PROJECT_ROOT, check=False)
+ return False, "Tests failed"
+
+ # 3. Merge to main
+ result = subprocess.run(
+ ["git", "checkout", "main"],
+ cwd=PROJECT_ROOT,
+ capture_output=True,
+ text=True,
+ )
+ if result.returncode != 0:
+ mark_failed(upgrade_id, f"Failed to checkout main: {result.stderr}")
+ return False, "Failed to checkout main"
+
+ result = subprocess.run(
+ ["git", "merge", "--no-ff", upgrade.branch_name, "-m", f"Apply upgrade: {upgrade.description}"],
+ cwd=PROJECT_ROOT,
+ capture_output=True,
+ text=True,
+ )
+ if result.returncode != 0:
+ mark_failed(upgrade_id, f"Merge failed: {result.stderr}")
+ return False, "Merge failed"
+
+ # 4. Mark as applied
+ mark_applied(upgrade_id)
+
+ # 5. Clean up branch
+ subprocess.run(
+ ["git", "branch", "-d", upgrade.branch_name],
+ cwd=PROJECT_ROOT,
+ capture_output=True,
+ check=False,
+ )
+
+ logger.info("Upgrade applied successfully: %s", upgrade_id[:8])
+ return True, "Upgrade applied successfully"
+
+ except subprocess.TimeoutExpired:
+ mark_failed(upgrade_id, "Tests timed out")
+ subprocess.run(["git", "checkout", "main"], cwd=PROJECT_ROOT, check=False)
+ return False, "Tests timed out"
+
+ except Exception as exc:
+ error_msg = str(exc)
+ mark_failed(upgrade_id, error_msg)
+ subprocess.run(["git", "checkout", "main"], cwd=PROJECT_ROOT, check=False)
+ return False, f"Error: {error_msg}"
+
+ @staticmethod
+ def get_full_diff(upgrade_id: str) -> str:
+ """Get full git diff for an upgrade.
+
+ Args:
+ upgrade_id: The upgrade to get diff for
+
+ Returns:
+ Git diff output
+ """
+ upgrade = get_upgrade(upgrade_id)
+ if not upgrade:
+ return "Upgrade not found"
+
+ try:
+ result = subprocess.run(
+ ["git", "diff", "main..." + upgrade.branch_name],
+ cwd=PROJECT_ROOT,
+ capture_output=True,
+ text=True,
+ )
+ return result.stdout if result.returncode == 0 else result.stderr
+ except Exception as exc:
+ return f"Error getting diff: {exc}"
+
+
+# Convenience functions for self-modify loop
+def propose_upgrade_from_loop(
+ branch_name: str,
+ description: str,
+ files_changed: list[str],
+ diff: str,
+ test_output: str = "",
+) -> Upgrade:
+ """Called by self-modify loop to propose an upgrade.
+
+ Tests are expected to have been run by the loop before calling this.
+ """
+ # Check if tests passed from output
+ test_passed = "passed" in test_output.lower() or " PASSED " in test_output
+
+ return UpgradeQueue.propose(
+ branch_name=branch_name,
+ description=description,
+ files_changed=files_changed,
+ diff_preview=diff[:2000], # First 2000 chars
+ test_passed=test_passed,
+ test_output=test_output,
+ )
diff --git a/src/ws_manager/handler.py b/src/ws_manager/handler.py
index 304f9b05..5435b0f2 100644
--- a/src/ws_manager/handler.py
+++ b/src/ws_manager/handler.py
@@ -119,6 +119,34 @@ class WebSocketManager:
def connection_count(self) -> int:
return len(self._connections)
+ async def broadcast_json(self, data: dict) -> int:
+ """Broadcast raw JSON data to all connected clients.
+
+ Args:
+ data: Dictionary to send as JSON
+
+ Returns:
+ Number of clients notified
+ """
+ import json
+
+ message = json.dumps(data)
+ disconnected = []
+ count = 0
+
+ for ws in self._connections:
+ try:
+ await ws.send_text(message)
+ count += 1
+ except Exception:
+ disconnected.append(ws)
+
+ # Clean up dead connections
+ for ws in disconnected:
+ self.disconnect(ws)
+
+ return count
+
@property
def event_history(self) -> list[WSEvent]:
return list(self._event_history)
diff --git a/tests/functional/conftest.py b/tests/functional/conftest.py
index bc5cd663..3eb1d866 100644
--- a/tests/functional/conftest.py
+++ b/tests/functional/conftest.py
@@ -1,185 +1,96 @@
-"""Functional test fixtures — real services, no mocking.
-
-These fixtures provide:
-- TestClient hitting the real FastAPI app (singletons, SQLite, etc.)
-- Typer CliRunner for CLI commands
-- Real temporary SQLite for swarm state
-- Real payment handler with mock lightning backend (LIGHTNING_BACKEND=mock)
-- Docker compose lifecycle for container-level tests
-"""
+"""Shared fixtures for functional/E2E tests."""
import os
import subprocess
import sys
import time
-from pathlib import Path
-from unittest.mock import MagicMock
+import urllib.request
import pytest
-from fastapi.testclient import TestClient
-# ── Stub heavy optional deps (same as root conftest) ─────────────────────────
-# These aren't mocks — they're import compatibility shims for packages
-# not installed in the test environment. The code under test handles
-# their absence via try/except ImportError.
-for _mod in [
- "agno", "agno.agent", "agno.models", "agno.models.ollama",
- "agno.db", "agno.db.sqlite",
- "airllm",
- "telegram", "telegram.ext",
-]:
- sys.modules.setdefault(_mod, MagicMock())
-
-os.environ["TIMMY_TEST_MODE"] = "1"
+# Default dashboard URL - override with DASHBOARD_URL env var
+DASHBOARD_URL = os.environ.get("DASHBOARD_URL", "http://localhost:8000")
-# ── Isolation: fresh coordinator state per test ───────────────────────────────
-
-@pytest.fixture(autouse=True)
-def _isolate_state():
- """Reset all singleton state between tests so they can't leak."""
- from dashboard.store import message_log
- message_log.clear()
- yield
- message_log.clear()
- from swarm.coordinator import coordinator
- coordinator.auctions._auctions.clear()
- coordinator.comms._listeners.clear()
- coordinator._in_process_nodes.clear()
- coordinator.manager.stop_all()
+def is_server_running():
+ """Check if dashboard is already running."""
try:
- from swarm import routing
- routing.routing_engine._manifests.clear()
+ urllib.request.urlopen(f"{DASHBOARD_URL}/health", timeout=2)
+ return True
except Exception:
- pass
-
-
-# ── TestClient with real app, no patches ──────────────────────────────────────
-
-@pytest.fixture
-def app_client(tmp_path):
- """TestClient wrapping the real dashboard app.
-
- Uses a tmp_path for swarm SQLite so tests don't pollute each other.
- No mocking — Ollama is offline (graceful degradation), singletons are real.
- """
- data_dir = tmp_path / "data"
- data_dir.mkdir()
-
- import swarm.tasks as tasks_mod
- import swarm.registry as registry_mod
- original_tasks_db = tasks_mod.DB_PATH
- original_reg_db = registry_mod.DB_PATH
-
- tasks_mod.DB_PATH = data_dir / "swarm.db"
- registry_mod.DB_PATH = data_dir / "swarm.db"
-
- from dashboard.app import app
- with TestClient(app) as c:
- yield c
-
- tasks_mod.DB_PATH = original_tasks_db
- registry_mod.DB_PATH = original_reg_db
-
-
-# ── Timmy-serve TestClient ────────────────────────────────────────────────────
-
-@pytest.fixture
-def serve_client():
- """TestClient wrapping the timmy-serve L402 app.
-
- Uses real mock-lightning backend (LIGHTNING_BACKEND=mock).
- """
- from timmy_serve.app import create_timmy_serve_app
-
- app = create_timmy_serve_app(price_sats=100)
- with TestClient(app) as c:
- yield c
-
-
-# ── CLI runners ───────────────────────────────────────────────────────────────
-
-@pytest.fixture
-def timmy_runner():
- """Typer CliRunner + app for the `timmy` CLI."""
- from typer.testing import CliRunner
- from timmy.cli import app
- return CliRunner(), app
-
-
-@pytest.fixture
-def serve_runner():
- """Typer CliRunner + app for the `timmy-serve` CLI."""
- from typer.testing import CliRunner
- from timmy_serve.cli import app
- return CliRunner(), app
-
-
-@pytest.fixture
-def tdd_runner():
- """Typer CliRunner + app for the `self-tdd` CLI."""
- from typer.testing import CliRunner
- from self_tdd.watchdog import app
- return CliRunner(), app
-
-
-# ── Docker compose lifecycle ──────────────────────────────────────────────────
-
-PROJECT_ROOT = Path(__file__).parent.parent.parent
-COMPOSE_TEST = PROJECT_ROOT / "docker-compose.test.yml"
-
-
-def _compose(*args, timeout=60):
- """Run a docker compose command against the test compose file."""
- cmd = ["docker", "compose", "-f", str(COMPOSE_TEST), "-p", "timmy-test", *args]
- return subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, cwd=str(PROJECT_ROOT))
-
-
-def _wait_for_healthy(url: str, retries=30, interval=2):
- """Poll a URL until it returns 200 or we run out of retries."""
- import httpx
- for i in range(retries):
- try:
- r = httpx.get(url, timeout=5)
- if r.status_code == 200:
- return True
- except Exception:
- pass
- time.sleep(interval)
- return False
+ return False
@pytest.fixture(scope="session")
-def docker_stack():
- """Spin up the test compose stack once per session.
-
- Yields a base URL (http://localhost:18000) to hit the dashboard.
- Tears down after all tests complete.
-
- Skipped unless FUNCTIONAL_DOCKER=1 is set.
+def live_server():
+ """Start the real Timmy server for E2E tests.
+
+ Yields the base URL (http://localhost:8000).
+ Kills the server after tests complete.
"""
- if not COMPOSE_TEST.exists():
- pytest.skip("docker-compose.test.yml not found")
- if os.environ.get("FUNCTIONAL_DOCKER") != "1":
- pytest.skip("Set FUNCTIONAL_DOCKER=1 to run Docker tests")
-
- # Verify Docker daemon is reachable before attempting build
- docker_check = subprocess.run(
- ["docker", "info"], capture_output=True, text=True, timeout=10,
+ # Check if server already running
+ if is_server_running():
+ print(f"\n📡 Using existing server at {DASHBOARD_URL}")
+ yield DASHBOARD_URL
+ return
+
+ # Start server in subprocess
+ print(f"\n🚀 Starting server on {DASHBOARD_URL}...")
+
+ env = os.environ.copy()
+ env["PYTHONPATH"] = "src"
+ env["TIMMY_ENV"] = "test" # Use test config if available
+
+ # Determine project root
+ project_root = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
+
+ proc = subprocess.Popen(
+ [sys.executable, "-m", "uvicorn", "dashboard.app:app",
+ "--host", "127.0.0.1", "--port", "8000",
+ "--log-level", "warning"],
+ cwd=project_root,
+ env=env,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
)
- if docker_check.returncode != 0:
- pytest.skip(f"Docker daemon not available: {docker_check.stderr.strip()}")
+
+ # Wait for server to start
+ max_retries = 30
+ for i in range(max_retries):
+ if is_server_running():
+ print(f"✅ Server ready!")
+ break
+ time.sleep(1)
+ print(f"⏳ Waiting for server... ({i+1}/{max_retries})")
+ else:
+ proc.terminate()
+ proc.wait()
+ raise RuntimeError("Server failed to start")
+
+ yield DASHBOARD_URL
+
+ # Cleanup
+ print("\n🛑 Stopping server...")
+ proc.terminate()
+ try:
+ proc.wait(timeout=5)
+ except subprocess.TimeoutExpired:
+ proc.kill()
+ proc.wait()
+ print("✅ Server stopped")
- result = _compose("up", "-d", "--build", "--wait", timeout=300)
- if result.returncode != 0:
- pytest.fail(f"docker compose up failed:\n{result.stderr}")
- base_url = "http://localhost:18000"
- if not _wait_for_healthy(f"{base_url}/health"):
- logs = _compose("logs")
- _compose("down", "-v")
- pytest.fail(f"Dashboard never became healthy:\n{logs.stdout}")
+# Add custom pytest option for headed mode
+def pytest_addoption(parser):
+ parser.addoption(
+ "--headed",
+ action="store_true",
+ default=False,
+ help="Run browser in non-headless mode (visible)",
+ )
- yield base_url
- _compose("down", "-v", timeout=60)
+@pytest.fixture
+def headed_mode(request):
+ """Check if --headed flag was passed."""
+ return request.config.getoption("--headed")
diff --git a/tests/functional/test_activity_feed_e2e.py b/tests/functional/test_activity_feed_e2e.py
new file mode 100644
index 00000000..23b2725d
--- /dev/null
+++ b/tests/functional/test_activity_feed_e2e.py
@@ -0,0 +1,211 @@
+"""E2E tests for Real-Time Activity Feed.
+
+RUN: pytest tests/functional/test_activity_feed_e2e.py -v --headed
+"""
+
+import os
+import time
+
+import pytest
+from selenium import webdriver
+from selenium.webdriver.chrome.options import Options
+from selenium.webdriver.common.by import By
+from selenium.webdriver.support import expected_conditions as EC
+from selenium.webdriver.support.ui import WebDriverWait
+import httpx
+
+from .conftest import DASHBOARD_URL
+
+
+@pytest.fixture
+def driver():
+ """Non-headless Chrome so you can watch."""
+ opts = Options()
+ opts.add_argument("--no-sandbox")
+ opts.add_argument("--disable-dev-shm-usage")
+ opts.add_argument("--window-size=1400,900")
+
+ d = webdriver.Chrome(options=opts)
+ d.implicitly_wait(5)
+ yield d
+ d.quit()
+
+
+class TestActivityFeedUI:
+ """Real-time activity feed on dashboard."""
+
+ def test_activity_feed_exists_on_swarm_live(self, driver):
+ """Swarm live page has activity feed panel."""
+ driver.get(f"{DASHBOARD_URL}/swarm/live")
+
+ # Look for activity feed
+ feed = driver.find_elements(
+ By.CSS_SELECTOR, ".activity-feed, .live-feed, .events-feed"
+ )
+
+ # Or look for activity header
+ headers = driver.find_elements(
+ By.XPATH, "//*[contains(text(), 'Activity') or contains(text(), 'Live')]"
+ )
+
+ assert feed or headers, "Should have activity feed panel"
+
+ def test_activity_feed_shows_events(self, driver):
+ """Activity feed displays events."""
+ driver.get(f"{DASHBOARD_URL}/swarm/live")
+
+ time.sleep(2) # Let feed load
+
+ # Look for event items
+ events = driver.find_elements(By.CSS_SELECTOR, ".event-item, .activity-item")
+
+ # Or empty state
+ empty = driver.find_elements(By.XPATH, "//*[contains(text(), 'No activity')]")
+
+ assert events or empty, "Should show events or empty state"
+
+ def test_activity_feed_updates_in_realtime(self, driver):
+ """Creating a task shows up in activity feed immediately.
+
+ This tests the WebSocket real-time update.
+ """
+ driver.get(f"{DASHBOARD_URL}/swarm/live")
+
+ # Get initial event count
+ initial = len(driver.find_elements(By.CSS_SELECTOR, ".event-item"))
+
+ # Create a task via API (this should trigger event)
+ task_desc = f"Activity test {time.time()}"
+ try:
+ httpx.post(
+ f"{DASHBOARD_URL}/swarm/tasks",
+ data={"description": task_desc},
+ timeout=5
+ )
+ except Exception:
+ pass # Task may not complete, but event should still fire
+
+ # Wait for WebSocket update
+ time.sleep(3)
+
+ # Check for new event
+ current = len(driver.find_elements(By.CSS_SELECTOR, ".event-item"))
+
+ # Or check for task-related text
+ page_text = driver.find_element(By.TAG_NAME, "body").text.lower()
+ has_task_event = "task" in page_text and "created" in page_text
+
+ assert current > initial or has_task_event, "Should see new activity"
+
+ def test_activity_feed_shows_task_events(self, driver):
+ """Task lifecycle events appear in feed."""
+ driver.get(f"{DASHBOARD_URL}/swarm/live")
+
+ time.sleep(2)
+
+ page_text = driver.find_element(By.TAG_NAME, "body").text.lower()
+
+ # Should see task-related events if any exist
+ task_related = any(x in page_text for x in [
+ "task.created", "task assigned", "task completed", "new task"
+ ])
+
+ # Not a failure if no tasks exist, just check the feed is there
+ feed_exists = driver.find_elements(By.CSS_SELECTOR, ".activity-feed")
+ assert feed_exists, "Activity feed should exist"
+
+ def test_activity_feed_shows_agent_events(self, driver):
+ """Agent join/leave events appear in feed."""
+ driver.get(f"{DASHBOARD_URL}/swarm/live")
+
+ time.sleep(2)
+
+ page_text = driver.find_element(By.TAG_NAME, "body").text.lower()
+
+ # Should see agent-related events if any exist
+ agent_related = any(x in page_text for x in [
+ "agent joined", "agent left", "agent status"
+ ])
+
+ # Feed should exist regardless
+ feed = driver.find_elements(By.CSS_SELECTOR, ".activity-feed, .live-feed")
+
+ def test_activity_feed_shows_bid_events(self, driver):
+ """Bid events appear in feed."""
+ driver.get(f"{DASHBOARD_URL}/swarm/live")
+
+ time.sleep(2)
+
+ page_text = driver.find_element(By.TAG_NAME, "body").text.lower()
+
+ # Look for bid-related text
+ bid_related = any(x in page_text for x in [
+ "bid", "sats", "auction"
+ ])
+
+ def test_activity_feed_timestamps(self, driver):
+ """Events show timestamps."""
+ driver.get(f"{DASHBOARD_URL}/swarm/live")
+
+ time.sleep(2)
+
+ # Look for time patterns
+ page_text = driver.find_element(By.TAG_NAME, "body").text
+
+ # Should have timestamps (HH:MM format)
+ import re
+ time_pattern = re.search(r'\d{1,2}:\d{2}', page_text)
+
+ # If there are events, they should have timestamps
+ events = driver.find_elements(By.CSS_SELECTOR, ".event-item")
+ if events:
+ assert time_pattern, "Events should have timestamps"
+
+ def test_activity_feed_icons(self, driver):
+ """Different event types have different icons."""
+ driver.get(f"{DASHBOARD_URL}/swarm/live")
+
+ time.sleep(2)
+
+ # Look for icons or visual indicators
+ icons = driver.find_elements(By.CSS_SELECTOR, ".event-icon, .activity-icon, .icon")
+
+ # Not required but nice to have
+
+
+class TestActivityFeedIntegration:
+ """Activity feed integration with other features."""
+
+ def test_activity_appears_in_event_log(self, driver):
+ """Activity feed events are also in event log page."""
+ # Create a task
+ try:
+ httpx.post(
+ f"{DASHBOARD_URL}/swarm/tasks",
+ data={"description": "Integration test task"},
+ timeout=5
+ )
+ except Exception:
+ pass
+
+ time.sleep(2)
+
+ # Check event log
+ driver.get(f"{DASHBOARD_URL}/swarm/events")
+
+ page_text = driver.find_element(By.TAG_NAME, "body").text.lower()
+ assert "task" in page_text, "Event log should show task events"
+
+ def test_nav_to_swarm_live(self, driver):
+ """Can navigate to swarm live page."""
+ driver.get(DASHBOARD_URL)
+
+ # Look for swarm/live link
+ live_link = driver.find_elements(
+ By.XPATH, "//a[contains(@href, '/swarm/live') or contains(text(), 'Live')]"
+ )
+
+ if live_link:
+ live_link[0].click()
+ time.sleep(1)
+ assert "/swarm/live" in driver.current_url
diff --git a/tests/functional/test_cascade_router_e2e.py b/tests/functional/test_cascade_router_e2e.py
new file mode 100644
index 00000000..af4623f3
--- /dev/null
+++ b/tests/functional/test_cascade_router_e2e.py
@@ -0,0 +1,133 @@
+"""E2E tests for Cascade Router Integration.
+
+RUN: pytest tests/functional/test_cascade_router_e2e.py -v --headed
+"""
+
+import os
+import time
+
+import pytest
+from selenium import webdriver
+from selenium.webdriver.chrome.options import Options
+from selenium.webdriver.common.by import By
+from selenium.webdriver.support import expected_conditions as EC
+from selenium.webdriver.support.ui import WebDriverWait
+
+from .conftest import DASHBOARD_URL
+
+
+@pytest.fixture
+def driver():
+ """Non-headless Chrome so you can watch."""
+ opts = Options()
+ # NO --headless - you will see the browser!
+ opts.add_argument("--no-sandbox")
+ opts.add_argument("--disable-dev-shm-usage")
+ opts.add_argument("--window-size=1400,900")
+
+ d = webdriver.Chrome(options=opts)
+ d.implicitly_wait(5)
+ yield d
+ d.quit()
+
+
+class TestCascadeRouterUI:
+ """Cascade Router dashboard and failover behavior."""
+
+ def test_router_status_page_exists(self, driver):
+ """Router status page loads at /router/status."""
+ driver.get(f"{DASHBOARD_URL}/router/status")
+
+ header = WebDriverWait(driver, 10).until(
+ EC.presence_of_element_located((By.TAG_NAME, "h1"))
+ )
+ assert "router" in header.text.lower() or "provider" in header.text.lower()
+
+ # Should show provider list
+ providers = driver.find_elements(By.CSS_SELECTOR, ".provider-card, .provider-row")
+ assert len(providers) >= 1, "Should show at least one provider"
+
+ def test_router_shows_ollama_provider(self, driver):
+ """Ollama provider is listed as priority 1."""
+ driver.get(f"{DASHBOARD_URL}/router/status")
+
+ # Look for Ollama
+ page_text = driver.find_element(By.TAG_NAME, "body").text.lower()
+ assert "ollama" in page_text, "Should show Ollama provider"
+
+ def test_router_shows_provider_health(self, driver):
+ """Each provider shows health status (healthy/degraded/unhealthy)."""
+ driver.get(f"{DASHBOARD_URL}/router/status")
+
+ # Look for health indicators
+ health_badges = driver.find_elements(
+ By.CSS_SELECTOR, ".health-badge, .status-healthy, .status-degraded, .status-unhealthy"
+ )
+ assert len(health_badges) >= 1, "Should show health status"
+
+ def test_router_shows_metrics(self, driver):
+ """Providers show request counts, latency, error rates."""
+ driver.get(f"{DASHBOARD_URL}/router/status")
+
+ # Look for metrics
+ page_text = driver.find_element(By.TAG_NAME, "body").text
+
+ # Should show some metrics
+ has_requests = "request" in page_text.lower()
+ has_latency = "ms" in page_text.lower() or "latency" in page_text.lower()
+
+ assert has_requests or has_latency, "Should show provider metrics"
+
+ def test_chat_uses_cascade_router(self, driver):
+ """Sending chat message routes through cascade (may show provider used)."""
+ driver.get(DASHBOARD_URL)
+
+ # Wait for chat to load
+ chat_input = WebDriverWait(driver, 10).until(
+ EC.presence_of_element_located((By.CSS_SELECTOR, "input[name='message']"))
+ )
+
+ # Send a message
+ chat_input.send_keys("test cascade routing")
+ chat_input.send_keys(Keys.RETURN)
+
+ # Wait for response
+ time.sleep(5)
+
+ # Should get some response (even if error)
+ messages = driver.find_elements(By.CSS_SELECTOR, ".chat-message")
+ assert len(messages) >= 2, "Should have user message and response"
+
+ def test_nav_link_to_router(self, driver):
+ """Navigation menu has link to router status."""
+ driver.get(DASHBOARD_URL)
+
+ # Look for router link
+ router_link = driver.find_elements(
+ By.XPATH, "//a[contains(@href, '/router') or contains(text(), 'Router')]"
+ )
+
+ if router_link:
+ router_link[0].click()
+ time.sleep(1)
+ assert "/router" in driver.current_url
+
+
+class TestCascadeFailover:
+ """Router failover behavior (if we can simulate failures)."""
+
+ def test_fallback_to_next_provider_on_failure(self, driver):
+ """If primary fails, automatically uses secondary."""
+ # This is hard to test in E2E without actually breaking Ollama
+ # We'll just verify the router has multiple providers configured
+
+ driver.get(f"{DASHBOARD_URL}/router/status")
+
+ # Count providers
+ providers = driver.find_elements(By.CSS_SELECTOR, ".provider-card, .provider-row")
+
+ # If multiple providers, failover is possible
+ if len(providers) >= 2:
+ # Look for priority numbers
+ page_text = driver.find_element(By.TAG_NAME, "body").text
+ assert "priority" in page_text.lower() or "1" in page_text or "2" in page_text
diff --git a/tests/functional/test_new_features_e2e.py b/tests/functional/test_new_features_e2e.py
new file mode 100644
index 00000000..0d650b04
--- /dev/null
+++ b/tests/functional/test_new_features_e2e.py
@@ -0,0 +1,289 @@
+"""E2E tests for new features: Event Log, Ledger, Memory.
+
+REQUIRES: Dashboard running at http://localhost:8000
+RUN: SELENIUM_UI=1 pytest tests/functional/test_new_features_e2e.py -v
+
+These tests verify the new features through the actual UI:
+1. Event Log - viewable in dashboard
+2. Lightning Ledger - balance and transactions visible
+3. Semantic Memory - searchable memory browser
+"""
+
+import os
+import time
+
+import pytest
+from selenium import webdriver
+from selenium.webdriver.chrome.options import Options
+from selenium.webdriver.common.by import By
+from selenium.webdriver.common.keys import Keys
+from selenium.webdriver.support import expected_conditions as EC
+from selenium.webdriver.support.ui import WebDriverWait
+
+pytestmark = pytest.mark.skipif(
+ os.environ.get("SELENIUM_UI") != "1",
+ reason="Set SELENIUM_UI=1 to run Selenium UI tests",
+)
+
+@pytest.fixture(scope="module")
+def driver():
+ """Headless Chrome WebDriver."""
+ opts = Options()
+ opts.add_argument("--headless=new")
+ opts.add_argument("--no-sandbox")
+ opts.add_argument("--disable-dev-shm-usage")
+ opts.add_argument("--disable-gpu")
+ opts.add_argument("--window-size=1280,900")
+
+ d = webdriver.Chrome(options=opts)
+ d.implicitly_wait(5)
+ yield d
+ d.quit()
+
+
+@pytest.fixture(scope="module")
+def dashboard_url(live_server):
+ """Base URL for dashboard (from live_server fixture)."""
+ return live_server
+
+
+def _wait_for_element(driver, selector, timeout=10):
+ """Wait for element to appear."""
+ return WebDriverWait(driver, timeout).until(
+ EC.presence_of_element_located((By.CSS_SELECTOR, selector))
+ )
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# EVENT LOG E2E TESTS
+# ═══════════════════════════════════════════════════════════════════════════════
+
+class TestEventLogUI:
+ """Event Log feature - viewable through dashboard."""
+
+ def test_event_log_page_exists(self, driver):
+ """Event log page loads at /swarm/events."""
+ driver.get(f"{dashboard_url}/swarm/events")
+ header = _wait_for_element(driver, "h1, h2, .page-title", timeout=10)
+ assert "event" in header.text.lower() or "log" in header.text.lower()
+
+ def test_event_log_shows_recent_events(self, driver):
+ """Event log displays events table with timestamp, type, source."""
+ driver.get(f"{dashboard_url}/swarm/events")
+
+ # Should show events table or "no events" message
+ table = driver.find_elements(By.CSS_SELECTOR, ".events-table, table")
+ no_events = driver.find_elements(By.XPATH, "//*[contains(text(), 'no events') or contains(text(), 'No events')]")
+
+ assert table or no_events, "Should show events table or 'no events' message"
+
+ def test_event_log_filters_by_type(self, driver):
+ """Can filter events by type (task, agent, system)."""
+ driver.get(f"{dashboard_url}/swarm/events")
+
+ # Look for filter dropdown or buttons
+ filters = driver.find_elements(By.CSS_SELECTOR, "select[name='type'], .filter-btn, [data-filter]")
+
+ # If filters exist, test them
+ if filters:
+ # Select 'task' filter
+ filter_select = driver.find_element(By.CSS_SELECTOR, "select[name='type']")
+ filter_select.click()
+ driver.find_element(By.CSS_SELECTOR, "option[value='task']").click()
+
+ # Wait for filtered results
+ time.sleep(1)
+
+ # Check URL changed or content updated
+ events = driver.find_elements(By.CSS_SELECTOR, ".event-row, tr")
+ # Just verify no error occurred
+
+ def test_event_log_shows_task_events_after_task_created(self, driver):
+ """Creating a task generates visible event log entries."""
+ # First create a task via API
+ import httpx
+ task_desc = f"E2E test task {time.time()}"
+ httpx.post(f"{dashboard_url}/swarm/tasks", data={"description": task_desc})
+
+ time.sleep(1) # Wait for event to be logged
+
+ # Now check event log
+ driver.get(f"{dashboard_url}/swarm/events")
+
+ # Should see the task creation event
+ page_text = driver.find_element(By.TAG_NAME, "body").text
+ assert "task.created" in page_text.lower() or "task created" in page_text.lower()
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# LIGHTNING LEDGER E2E TESTS
+# ═══════════════════════════════════════════════════════════════════════════════
+
+class TestLedgerUI:
+ """Lightning Ledger - balance and transactions visible in dashboard."""
+
+ def test_ledger_page_exists(self, driver):
+ """Ledger page loads at /lightning/ledger."""
+ driver.get(f"{dashboard_url}/lightning/ledger")
+ header = _wait_for_element(driver, "h1, h2, .page-title", timeout=10)
+ assert "ledger" in header.text.lower() or "transaction" in header.text.lower()
+
+ def test_ledger_shows_balance(self, driver):
+ """Ledger displays current balance."""
+ driver.get(f"{dashboard_url}/lightning/ledger")
+
+ # Look for balance display
+ balance = driver.find_elements(By.CSS_SELECTOR, ".balance, .sats-balance, [class*='balance']")
+ balance_text = driver.find_elements(By.XPATH, "//*[contains(text(), 'sats') or contains(text(), 'SATS')]")
+
+ assert balance or balance_text, "Should show balance in sats"
+
+ def test_ledger_shows_transactions(self, driver):
+ """Ledger displays transaction history."""
+ driver.get(f"{dashboard_url}/lightning/ledger")
+
+ # Should show transactions table or "no transactions" message
+ table = driver.find_elements(By.CSS_SELECTOR, ".transactions-table, table")
+ empty = driver.find_elements(By.XPATH, "//*[contains(text(), 'no transaction') or contains(text(), 'No transaction')]")
+
+ assert table or empty, "Should show transactions or empty state"
+
+ def test_ledger_transaction_has_required_fields(self, driver):
+ """Each transaction shows: hash, amount, status, timestamp."""
+ driver.get(f"{dashboard_url}/lightning/ledger")
+
+ rows = driver.find_elements(By.CSS_SELECTOR, ".transaction-row, tbody tr")
+
+ if rows:
+ # Check first row has expected fields
+ first_row = rows[0]
+ text = first_row.text.lower()
+
+ # Should have some of these indicators
+ has_amount = any(x in text for x in ["sats", "sat", "000"])
+ has_status = any(x in text for x in ["pending", "settled", "failed"])
+
+ assert has_amount, "Transaction should show amount"
+ assert has_status, "Transaction should show status"
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# SEMANTIC MEMORY E2E TESTS
+# ═══════════════════════════════════════════════════════════════════════════════
+
+class TestMemoryUI:
+ """Semantic Memory - searchable memory browser."""
+
+ def test_memory_page_exists(self, driver):
+ """Memory browser loads at /memory."""
+ driver.get(f"{dashboard_url}/memory")
+ header = _wait_for_element(driver, "h1, h2, .page-title", timeout=10)
+ assert "memory" in header.text.lower()
+
+ def test_memory_has_search_box(self, driver):
+ """Memory page has search input."""
+ driver.get(f"{dashboard_url}/memory")
+
+ search = driver.find_elements(By.CSS_SELECTOR, "input[type='search'], input[name='query'], .search-input")
+ assert search, "Should have search input"
+
+ def test_memory_search_returns_results(self, driver):
+ """Search returns memory entries with relevance scores."""
+ driver.get(f"{dashboard_url}/memory")
+
+ search_input = driver.find_element(By.CSS_SELECTOR, "input[type='search'], input[name='query']")
+ search_input.send_keys("test query")
+ search_input.send_keys(Keys.RETURN)
+
+ time.sleep(2) # Wait for search results
+
+ # Should show results or "no results"
+ results = driver.find_elements(By.CSS_SELECTOR, ".memory-entry, .search-result")
+ no_results = driver.find_elements(By.XPATH, "//*[contains(text(), 'no results') or contains(text(), 'No results')]")
+
+ assert results or no_results, "Should show search results or 'no results'"
+
+ def test_memory_shows_entry_content(self, driver):
+ """Memory entries show content, source, and timestamp."""
+ driver.get(f"{dashboard_url}/memory")
+
+ entries = driver.find_elements(By.CSS_SELECTOR, ".memory-entry")
+
+ if entries:
+ first = entries[0]
+ text = first.text
+
+ # Should have content and source
+ has_source = any(x in text.lower() for x in ["source:", "from", "by"])
+ has_time = any(x in text.lower() for x in ["202", ":", "ago"])
+
+ assert len(text) > 10, "Entry should have content"
+
+ def test_memory_add_fact_button(self, driver):
+ """Can add personal fact through UI."""
+ driver.get(f"{dashboard_url}/memory")
+
+ # Look for add fact button or form
+ add_btn = driver.find_elements(By.XPATH, "//button[contains(text(), 'Add') or contains(text(), 'New')]")
+ add_form = driver.find_elements(By.CSS_SELECTOR, "form[action*='memory'], .add-memory-form")
+
+ assert add_btn or add_form, "Should have way to add new memory"
+
+
+# ═══════════════════════════════════════════════════════════════════════════════
+# INTEGRATION E2E TESTS
+# ═══════════════════════════════════════════════════════════════════════════════
+
+class TestFeatureIntegration:
+ """Integration tests - features work together."""
+
+ def test_creating_task_creates_event_and_appears_in_log(self, driver):
+ """Full flow: Create task → event logged → visible in event log UI."""
+ import httpx
+
+ # Create task via API
+ task_desc = f"Integration test {time.time()}"
+ response = httpx.post(
+ f"{dashboard_url}/swarm/tasks",
+ data={"description": task_desc}
+ )
+ assert response.status_code == 200
+
+ time.sleep(1) # Wait for event log
+
+ # Check event log UI
+ driver.get(f"{dashboard_url}/swarm/events")
+ page_text = driver.find_element(By.TAG_NAME, "body").text
+
+ # Should see task creation
+ assert "task" in page_text.lower()
+
+ def test_swarm_live_page_shows_agent_events(self, driver):
+ """Swarm live page shows real-time agent activity."""
+ driver.get(f"{dashboard_url}/swarm/live")
+
+ # Should show activity feed or status
+ feed = driver.find_elements(By.CSS_SELECTOR, ".activity-feed, .events-list, .live-feed")
+ agents = driver.find_elements(By.CSS_SELECTOR, ".agent-status, .swarm-status")
+
+ assert feed or agents, "Should show activity feed or agent status"
+
+ def test_navigation_between_new_features(self, driver):
+ """Can navigate between Event Log, Ledger, and Memory pages."""
+ # Start at home
+ driver.get(dashboard_url)
+
+ # Find and click link to events
+ event_links = driver.find_elements(By.XPATH, "//a[contains(@href, '/swarm/events') or contains(text(), 'Events')]")
+ if event_links:
+ event_links[0].click()
+ time.sleep(1)
+ assert "/swarm/events" in driver.current_url
+
+ # Navigate to ledger
+ driver.get(f"{dashboard_url}/lightning/ledger")
+ assert "/lightning/ledger" in driver.current_url
+
+ # Navigate to memory
+ driver.get(f"{dashboard_url}/memory")
+ assert "/memory" in driver.current_url
diff --git a/tests/functional/test_upgrade_queue_e2e.py b/tests/functional/test_upgrade_queue_e2e.py
new file mode 100644
index 00000000..c13f687d
--- /dev/null
+++ b/tests/functional/test_upgrade_queue_e2e.py
@@ -0,0 +1,190 @@
+"""E2E tests for Self-Upgrade Approval Queue.
+
+RUN: pytest tests/functional/test_upgrade_queue_e2e.py -v --headed
+"""
+
+import os
+import time
+
+import pytest
+from selenium import webdriver
+from selenium.webdriver.chrome.options import Options
+from selenium.webdriver.common.by import By
+from selenium.webdriver.support import expected_conditions as EC
+from selenium.webdriver.support.ui import WebDriverWait
+
+from .conftest import DASHBOARD_URL
+
+
+@pytest.fixture
+def driver():
+ """Non-headless Chrome so you can watch."""
+ opts = Options()
+ opts.add_argument("--no-sandbox")
+ opts.add_argument("--disable-dev-shm-usage")
+ opts.add_argument("--window-size=1400,900")
+
+ d = webdriver.Chrome(options=opts)
+ d.implicitly_wait(5)
+ yield d
+ d.quit()
+
+
+class TestUpgradeQueueUI:
+ """Upgrade queue dashboard functionality."""
+
+ def test_upgrade_queue_page_exists(self, driver):
+ """Upgrade queue loads at /self-modify/queue."""
+ driver.get(f"{DASHBOARD_URL}/self-modify/queue")
+
+ header = WebDriverWait(driver, 10).until(
+ EC.presence_of_element_located((By.TAG_NAME, "h1"))
+ )
+ assert "upgrade" in header.text.lower() or "queue" in header.text.lower()
+
+ def test_queue_shows_pending_upgrades(self, driver):
+ """Queue shows pending upgrades with status."""
+ driver.get(f"{DASHBOARD_URL}/self-modify/queue")
+
+ # Should show either pending upgrades or empty state
+ pending = driver.find_elements(By.CSS_SELECTOR, ".upgrade-pending, .upgrade-card")
+ empty = driver.find_elements(By.XPATH, "//*[contains(text(), 'No pending') or contains(text(), 'empty')]")
+
+ assert pending or empty, "Should show pending upgrades or empty state"
+
+ def test_queue_shows_upgrade_details(self, driver):
+ """Each upgrade shows description, files changed, test status."""
+ driver.get(f"{DASHBOARD_URL}/self-modify/queue")
+
+ upgrades = driver.find_elements(By.CSS_SELECTOR, ".upgrade-card")
+
+ if upgrades:
+ first = upgrades[0]
+ text = first.text.lower()
+
+ # Should have description
+ assert len(text) > 20, "Should show upgrade description"
+
+ # Should show status
+ has_status = any(x in text for x in ["pending", "proposed", "waiting"])
+ assert has_status, "Should show upgrade status"
+
+ def test_approve_button_exists(self, driver):
+ """Pending upgrades have approve button."""
+ driver.get(f"{DASHBOARD_URL}/self-modify/queue")
+
+ approve_btns = driver.find_elements(
+ By.XPATH, "//button[contains(text(), 'Approve') or contains(text(), 'APPROVE')]"
+ )
+
+ # If there are pending upgrades, there should be approve buttons
+ pending = driver.find_elements(By.CSS_SELECTOR, ".upgrade-pending")
+ if pending:
+ assert len(approve_btns) >= 1, "Should have approve buttons for pending upgrades"
+
+ def test_reject_button_exists(self, driver):
+ """Pending upgrades have reject button."""
+ driver.get(f"{DASHBOARD_URL}/self-modify/queue")
+
+ reject_btns = driver.find_elements(
+ By.XPATH, "//button[contains(text(), 'Reject') or contains(text(), 'REJECT')]"
+ )
+
+ pending = driver.find_elements(By.CSS_SELECTOR, ".upgrade-pending")
+ if pending:
+ assert len(reject_btns) >= 1, "Should have reject buttons for pending upgrades"
+
+ def test_upgrade_history_section(self, driver):
+ """Queue page shows history of past upgrades."""
+ driver.get(f"{DASHBOARD_URL}/self-modify/queue")
+
+ # Look for history section
+ history = driver.find_elements(
+ By.XPATH, "//*[contains(text(), 'History') or contains(text(), 'Past')]"
+ )
+
+ # Or look for applied/rejected upgrades
+ past = driver.find_elements(By.CSS_SELECTOR, ".upgrade-applied, .upgrade-rejected, .upgrade-failed")
+
+ assert history or past, "Should show upgrade history section or past upgrades"
+
+ def test_view_diff_button(self, driver):
+ """Can view diff for an upgrade."""
+ driver.get(f"{DASHBOARD_URL}/self-modify/queue")
+
+ view_btns = driver.find_elements(
+ By.XPATH, "//button[contains(text(), 'View') or contains(text(), 'Diff')]"
+ )
+
+ upgrades = driver.find_elements(By.CSS_SELECTOR, ".upgrade-card")
+ if upgrades and view_btns:
+ # Click view
+ view_btns[0].click()
+ time.sleep(1)
+
+ # Should show diff or modal
+ diff = driver.find_elements(By.CSS_SELECTOR, ".diff, .code-block, pre")
+ assert diff or "diff" in driver.page_source.lower(), "Should show diff view"
+
+ def test_nav_link_to_queue(self, driver):
+ """Navigation has link to upgrade queue."""
+ driver.get(DASHBOARD_URL)
+
+ queue_link = driver.find_elements(
+ By.XPATH, "//a[contains(@href, 'self-modify') or contains(text(), 'Upgrade')]"
+ )
+
+ if queue_link:
+ queue_link[0].click()
+ time.sleep(1)
+ assert "self-modify" in driver.current_url or "upgrade" in driver.current_url
+
+
+class TestUpgradeWorkflow:
+ """Full upgrade approval workflow."""
+
+ def test_full_approve_workflow(self, driver):
+ """Propose → Review → Approve → Applied.
+
+ This test requires a pre-existing pending upgrade.
+ """
+ driver.get(f"{DASHBOARD_URL}/self-modify/queue")
+
+ # Find first pending upgrade
+ pending = driver.find_elements(By.CSS_SELECTOR, ".upgrade-pending")
+
+ if not pending:
+ pytest.skip("No pending upgrades to test workflow")
+
+ # Click approve
+ approve_btn = driver.find_element(
+ By.XPATH, "(//button[contains(text(), 'Approve')])[1]"
+ )
+ approve_btn.click()
+
+ # Wait for confirmation or status change
+ time.sleep(2)
+
+ # Should show success or status change
+ page_text = driver.find_element(By.TAG_NAME, "body").text.lower()
+ assert any(x in page_text for x in ["approved", "applied", "success"])
+
+ def test_full_reject_workflow(self, driver):
+ """Propose → Review → Reject."""
+ driver.get(f"{DASHBOARD_URL}/self-modify/queue")
+
+ pending = driver.find_elements(By.CSS_SELECTOR, ".upgrade-pending")
+
+ if not pending:
+ pytest.skip("No pending upgrades to test workflow")
+
+ # Click reject
+ reject_btn = driver.find_element(
+ By.XPATH, "(//button[contains(text(), 'Reject')])[1]"
+ )
+ reject_btn.click()
+
+ time.sleep(2)
+
+ page_text = driver.find_element(By.TAG_NAME, "body").text.lower()
+ assert "rejected" in page_text or "removed" in page_text
diff --git a/tests/test_event_log.py b/tests/test_event_log.py
new file mode 100644
index 00000000..9936d8f3
--- /dev/null
+++ b/tests/test_event_log.py
@@ -0,0 +1,169 @@
+"""Tests for swarm event logging system."""
+
+import pytest
+from datetime import datetime, timezone
+from swarm.event_log import (
+ EventType,
+ log_event,
+ get_event,
+ list_events,
+ get_task_events,
+ get_agent_events,
+ get_recent_events,
+ get_event_summary,
+ prune_events,
+)
+
+
+class TestEventLog:
+ """Test suite for event logging functionality."""
+
+ def test_log_simple_event(self):
+ """Test logging a basic event."""
+ event = log_event(
+ event_type=EventType.SYSTEM_INFO,
+ source="test",
+ data={"message": "test event"},
+ )
+
+ assert event.event_type == EventType.SYSTEM_INFO
+ assert event.source == "test"
+ assert event.data is not None
+
+ # Verify we can retrieve it
+ retrieved = get_event(event.id)
+ assert retrieved is not None
+ assert retrieved.source == "test"
+
+ def test_log_task_event(self):
+ """Test logging a task lifecycle event."""
+ task_id = "task-123"
+ agent_id = "agent-456"
+
+ event = log_event(
+ event_type=EventType.TASK_ASSIGNED,
+ source="coordinator",
+ task_id=task_id,
+ agent_id=agent_id,
+ data={"bid_sats": 100},
+ )
+
+ assert event.task_id == task_id
+ assert event.agent_id == agent_id
+
+ # Verify filtering by task works
+ task_events = get_task_events(task_id)
+ assert len(task_events) >= 1
+ assert any(e.id == event.id for e in task_events)
+
+ def test_log_agent_event(self):
+ """Test logging agent lifecycle events."""
+ agent_id = "agent-test-001"
+
+ event = log_event(
+ event_type=EventType.AGENT_JOINED,
+ source="coordinator",
+ agent_id=agent_id,
+ data={"persona_id": "forge"},
+ )
+
+ # Verify filtering by agent works
+ agent_events = get_agent_events(agent_id)
+ assert len(agent_events) >= 1
+ assert any(e.id == event.id for e in agent_events)
+
+ def test_list_events_filtering(self):
+ """Test filtering events by type."""
+ # Create events of different types
+ log_event(EventType.TASK_CREATED, source="test")
+ log_event(EventType.TASK_COMPLETED, source="test")
+ log_event(EventType.SYSTEM_INFO, source="test")
+
+ # Filter by type
+ task_events = list_events(event_type=EventType.TASK_CREATED, limit=10)
+ assert all(e.event_type == EventType.TASK_CREATED for e in task_events)
+
+ # Filter by source
+ source_events = list_events(source="test", limit=10)
+ assert all(e.source == "test" for e in source_events)
+
+ def test_get_recent_events(self):
+ """Test retrieving recent events."""
+ # Log an event
+ log_event(EventType.SYSTEM_INFO, source="recent_test")
+
+ # Get events from last minute
+ recent = get_recent_events(minutes=1)
+ assert any(e.source == "recent_test" for e in recent)
+
+ def test_event_summary(self):
+ """Test event summary statistics."""
+ # Create some events
+ log_event(EventType.TASK_CREATED, source="summary_test")
+ log_event(EventType.TASK_CREATED, source="summary_test")
+ log_event(EventType.TASK_COMPLETED, source="summary_test")
+
+ # Get summary
+ summary = get_event_summary(minutes=1)
+ assert "task.created" in summary or "task.completed" in summary
+
+ def test_prune_events(self):
+ """Test pruning old events."""
+ # This test just verifies the function doesn't error
+ # (we don't want to delete real data in tests)
+ count = prune_events(older_than_days=365)
+ # Result depends on database state, just verify no exception
+ assert isinstance(count, int)
+
+ def test_event_data_serialization(self):
+ """Test that complex data is properly serialized."""
+ complex_data = {
+ "nested": {"key": "value"},
+ "list": [1, 2, 3],
+ "number": 42.5,
+ }
+
+ event = log_event(
+ EventType.TOOL_CALLED,
+ source="test",
+ data=complex_data,
+ )
+
+ retrieved = get_event(event.id)
+ # Data should be stored as JSON string
+ assert retrieved.data is not None
+
+
+class TestEventTypes:
+ """Test that all event types can be logged."""
+
+ @pytest.mark.parametrize("event_type", [
+ EventType.TASK_CREATED,
+ EventType.TASK_BIDDING,
+ EventType.TASK_ASSIGNED,
+ EventType.TASK_STARTED,
+ EventType.TASK_COMPLETED,
+ EventType.TASK_FAILED,
+ EventType.AGENT_JOINED,
+ EventType.AGENT_LEFT,
+ EventType.AGENT_STATUS_CHANGED,
+ EventType.BID_SUBMITTED,
+ EventType.AUCTION_CLOSED,
+ EventType.TOOL_CALLED,
+ EventType.TOOL_COMPLETED,
+ EventType.TOOL_FAILED,
+ EventType.SYSTEM_ERROR,
+ EventType.SYSTEM_WARNING,
+ EventType.SYSTEM_INFO,
+ ])
+ def test_all_event_types(self, event_type):
+ """Verify all event types can be logged and retrieved."""
+ event = log_event(
+ event_type=event_type,
+ source="type_test",
+ data={"test": True},
+ )
+
+ retrieved = get_event(event.id)
+ assert retrieved is not None
+ assert retrieved.event_type == event_type
diff --git a/tests/test_ledger.py b/tests/test_ledger.py
new file mode 100644
index 00000000..6e1ad9a1
--- /dev/null
+++ b/tests/test_ledger.py
@@ -0,0 +1,211 @@
+"""Tests for Lightning ledger system."""
+
+import pytest
+from lightning.ledger import (
+ TransactionType,
+ TransactionStatus,
+ create_invoice_entry,
+ record_outgoing_payment,
+ mark_settled,
+ mark_failed,
+ get_by_hash,
+ list_transactions,
+ get_balance,
+ get_transaction_stats,
+)
+
+
+class TestLedger:
+ """Test suite for Lightning ledger functionality."""
+
+ def test_create_invoice_entry(self):
+ """Test creating an incoming invoice entry."""
+ entry = create_invoice_entry(
+ payment_hash="test_hash_001",
+ amount_sats=1000,
+ memo="Test invoice",
+ invoice="lnbc10u1...",
+ source="test",
+ task_id="task-123",
+ agent_id="agent-456",
+ )
+
+ assert entry.tx_type == TransactionType.INCOMING
+ assert entry.status == TransactionStatus.PENDING
+ assert entry.amount_sats == 1000
+ assert entry.payment_hash == "test_hash_001"
+ assert entry.memo == "Test invoice"
+ assert entry.task_id == "task-123"
+ assert entry.agent_id == "agent-456"
+
+ def test_record_outgoing_payment(self):
+ """Test recording an outgoing payment."""
+ entry = record_outgoing_payment(
+ payment_hash="test_hash_002",
+ amount_sats=500,
+ memo="Test payment",
+ source="test",
+ task_id="task-789",
+ )
+
+ assert entry.tx_type == TransactionType.OUTGOING
+ assert entry.status == TransactionStatus.PENDING
+ assert entry.amount_sats == 500
+ assert entry.payment_hash == "test_hash_002"
+
+ def test_mark_settled(self):
+ """Test marking a transaction as settled."""
+ # Create invoice
+ entry = create_invoice_entry(
+ payment_hash="test_hash_settle",
+ amount_sats=100,
+ memo="To be settled",
+ )
+ assert entry.status == TransactionStatus.PENDING
+
+ # Mark as settled
+ settled = mark_settled(
+ payment_hash="test_hash_settle",
+ preimage="preimage123",
+ fee_sats=1,
+ )
+
+ assert settled is not None
+ assert settled.status == TransactionStatus.SETTLED
+ assert settled.preimage == "preimage123"
+ assert settled.fee_sats == 1
+ assert settled.settled_at is not None
+
+ # Verify retrieval
+ retrieved = get_by_hash("test_hash_settle")
+ assert retrieved.status == TransactionStatus.SETTLED
+
+ def test_mark_failed(self):
+ """Test marking a transaction as failed."""
+ # Create invoice
+ entry = create_invoice_entry(
+ payment_hash="test_hash_fail",
+ amount_sats=200,
+ memo="To fail",
+ )
+
+ # Mark as failed
+ failed = mark_failed("test_hash_fail", reason="Timeout")
+
+ assert failed is not None
+ assert failed.status == TransactionStatus.FAILED
+ assert "Timeout" in failed.memo
+
+ def test_get_by_hash_not_found(self):
+ """Test retrieving non-existent transaction."""
+ result = get_by_hash("nonexistent_hash")
+ assert result is None
+
+ def test_list_transactions_filtering(self):
+ """Test filtering transactions."""
+ # Create various transactions
+ create_invoice_entry("filter_test_1", 100, source="filter_test")
+ create_invoice_entry("filter_test_2", 200, source="filter_test")
+
+ # Filter by type
+ incoming = list_transactions(
+ tx_type=TransactionType.INCOMING,
+ limit=10,
+ )
+ assert all(t.tx_type == TransactionType.INCOMING for t in incoming)
+
+ # Filter by status
+ pending = list_transactions(
+ status=TransactionStatus.PENDING,
+ limit=10,
+ )
+ assert all(t.status == TransactionStatus.PENDING for t in pending)
+
+ def test_get_balance(self):
+ """Test balance calculation."""
+ # Get initial balance
+ balance = get_balance()
+
+ assert "incoming_total_sats" in balance
+ assert "outgoing_total_sats" in balance
+ assert "net_sats" in balance
+ assert isinstance(balance["incoming_total_sats"], int)
+ assert isinstance(balance["outgoing_total_sats"], int)
+
+ def test_transaction_stats(self):
+ """Test transaction statistics."""
+ # Create some transactions
+ create_invoice_entry("stats_test_1", 100, source="stats_test")
+ create_invoice_entry("stats_test_2", 200, source="stats_test")
+
+ # Get stats
+ stats = get_transaction_stats(days=1)
+
+ # Should return dict with dates
+ assert isinstance(stats, dict)
+ # Stats structure depends on current date, just verify it's a dict
+
+ def test_unique_payment_hash(self):
+ """Test that payment hashes must be unique."""
+ import sqlite3
+
+ hash_value = "unique_hash_test"
+
+ # First creation should succeed
+ create_invoice_entry(hash_value, 100)
+
+ # Second creation with same hash should fail with IntegrityError
+ with pytest.raises(sqlite3.IntegrityError):
+ create_invoice_entry(hash_value, 200)
+
+
+class TestLedgerIntegration:
+ """Integration tests for ledger workflow."""
+
+ def test_full_invoice_lifecycle(self):
+ """Test complete invoice lifecycle: create -> settle."""
+ # Create invoice
+ entry = create_invoice_entry(
+ payment_hash="lifecycle_test",
+ amount_sats=5000,
+ memo="Full lifecycle test",
+ source="integration_test",
+ )
+
+ assert entry.status == TransactionStatus.PENDING
+
+ # Mark as settled
+ settled = mark_settled("lifecycle_test", preimage="secret_preimage")
+
+ assert settled.status == TransactionStatus.SETTLED
+ assert settled.preimage == "secret_preimage"
+
+ # Verify in list
+ transactions = list_transactions(limit=100)
+ assert any(t.payment_hash == "lifecycle_test" for t in transactions)
+
+ # Verify balance reflects it
+ balance = get_balance()
+ # Balance should include this settled invoice
+
+ def test_outgoing_payment_lifecycle(self):
+ """Test complete outgoing payment lifecycle."""
+ # Record outgoing payment
+ entry = record_outgoing_payment(
+ payment_hash="outgoing_test",
+ amount_sats=300,
+ memo="Outgoing payment",
+ source="integration_test",
+ )
+
+ assert entry.tx_type == TransactionType.OUTGOING
+
+ # Mark as settled (payment completed)
+ settled = mark_settled(
+ "outgoing_test",
+ preimage="payment_proof",
+ fee_sats=3,
+ )
+
+ assert settled.fee_sats == 3
+ assert settled.status == TransactionStatus.SETTLED
diff --git a/tests/test_vector_store.py b/tests/test_vector_store.py
new file mode 100644
index 00000000..9b4b6f6e
--- /dev/null
+++ b/tests/test_vector_store.py
@@ -0,0 +1,262 @@
+"""Tests for vector store (semantic memory) system."""
+
+import pytest
+from memory.vector_store import (
+ store_memory,
+ search_memories,
+ get_memory_context,
+ recall_personal_facts,
+ store_personal_fact,
+ delete_memory,
+ get_memory_stats,
+ prune_memories,
+ _cosine_similarity,
+ _keyword_overlap,
+)
+
+
+class TestVectorStore:
+ """Test suite for vector store functionality."""
+
+ def test_store_simple_memory(self):
+ """Test storing a basic memory entry."""
+ entry = store_memory(
+ content="This is a test memory",
+ source="test_agent",
+ context_type="conversation",
+ )
+
+ assert entry.content == "This is a test memory"
+ assert entry.source == "test_agent"
+ assert entry.context_type == "conversation"
+ assert entry.id is not None
+ assert entry.timestamp is not None
+
+ def test_store_memory_with_metadata(self):
+ """Test storing memory with metadata."""
+ entry = store_memory(
+ content="Memory with metadata",
+ source="user",
+ context_type="fact",
+ agent_id="agent-001",
+ task_id="task-123",
+ session_id="session-456",
+ metadata={"importance": "high", "tags": ["test"]},
+ )
+
+ assert entry.agent_id == "agent-001"
+ assert entry.task_id == "task-123"
+ assert entry.session_id == "session-456"
+ assert entry.metadata == {"importance": "high", "tags": ["test"]}
+
+ def test_search_memories_basic(self):
+ """Test basic memory search."""
+ # Store some memories
+ store_memory("Bitcoin is a decentralized currency", source="user")
+ store_memory("Lightning Network enables fast payments", source="user")
+ store_memory("Python is a programming language", source="user")
+
+ # Search for Bitcoin-related memories
+ results = search_memories("cryptocurrency", limit=5)
+
+ # Should find at least one relevant result
+ assert len(results) > 0
+ # Check that results have relevance scores
+ assert all(r.relevance_score is not None for r in results)
+
+ def test_search_with_filters(self):
+ """Test searching with filters."""
+ # Store memories with different types
+ store_memory(
+ "Conversation about AI",
+ source="user",
+ context_type="conversation",
+ agent_id="agent-1",
+ )
+ store_memory(
+ "Fact: AI stands for artificial intelligence",
+ source="system",
+ context_type="fact",
+ agent_id="agent-1",
+ )
+ store_memory(
+ "Another conversation",
+ source="user",
+ context_type="conversation",
+ agent_id="agent-2",
+ )
+
+ # Filter by context type
+ facts = search_memories("AI", context_type="fact", limit=5)
+ assert all(f.context_type == "fact" for f in facts)
+
+ # Filter by agent
+ agent1_memories = search_memories("conversation", agent_id="agent-1", limit=5)
+ assert all(m.agent_id == "agent-1" for m in agent1_memories)
+
+ def test_get_memory_context(self):
+ """Test getting formatted memory context."""
+ # Store memories
+ store_memory("Important fact about the project", source="user")
+ store_memory("Another relevant detail", source="agent")
+
+ # Get context
+ context = get_memory_context("project details", max_tokens=500)
+
+ assert isinstance(context, str)
+ assert len(context) > 0
+ assert "Relevant context from memory:" in context
+
+ def test_personal_facts(self):
+ """Test storing and recalling personal facts."""
+ # Store a personal fact
+ fact = store_personal_fact("User prefers dark mode", agent_id="agent-1")
+
+ assert fact.context_type == "fact"
+ assert fact.content == "User prefers dark mode"
+
+ # Recall facts
+ facts = recall_personal_facts(agent_id="agent-1")
+ assert "User prefers dark mode" in facts
+
+ def test_delete_memory(self):
+ """Test deleting a memory entry."""
+ # Create a memory
+ entry = store_memory("To be deleted", source="test")
+
+ # Delete it
+ deleted = delete_memory(entry.id)
+ assert deleted is True
+
+ # Verify it's gone (search shouldn't find it)
+ results = search_memories("To be deleted", limit=10)
+ assert not any(r.id == entry.id for r in results)
+
+ # Deleting non-existent should return False
+ deleted_again = delete_memory(entry.id)
+ assert deleted_again is False
+
+ def test_get_memory_stats(self):
+ """Test memory statistics."""
+ stats = get_memory_stats()
+
+ assert "total_entries" in stats
+ assert "by_type" in stats
+ assert "with_embeddings" in stats
+ assert "has_embedding_model" in stats
+ assert isinstance(stats["total_entries"], int)
+
+ def test_prune_memories(self):
+ """Test pruning old memories."""
+ # This just verifies the function works without error
+ # (we don't want to delete test data)
+ count = prune_memories(older_than_days=365, keep_facts=True)
+ assert isinstance(count, int)
+
+
+class TestVectorStoreUtils:
+ """Test utility functions."""
+
+ def test_cosine_similarity_identical(self):
+ """Test cosine similarity of identical vectors."""
+ vec = [1.0, 0.0, 0.0]
+ similarity = _cosine_similarity(vec, vec)
+ assert similarity == pytest.approx(1.0)
+
+ def test_cosine_similarity_orthogonal(self):
+ """Test cosine similarity of orthogonal vectors."""
+ vec1 = [1.0, 0.0, 0.0]
+ vec2 = [0.0, 1.0, 0.0]
+ similarity = _cosine_similarity(vec1, vec2)
+ assert similarity == pytest.approx(0.0)
+
+ def test_cosine_similarity_opposite(self):
+ """Test cosine similarity of opposite vectors."""
+ vec1 = [1.0, 0.0, 0.0]
+ vec2 = [-1.0, 0.0, 0.0]
+ similarity = _cosine_similarity(vec1, vec2)
+ assert similarity == pytest.approx(-1.0)
+
+ def test_cosine_similarity_zero_vector(self):
+ """Test cosine similarity with zero vector."""
+ vec1 = [1.0, 0.0, 0.0]
+ vec2 = [0.0, 0.0, 0.0]
+ similarity = _cosine_similarity(vec1, vec2)
+ assert similarity == 0.0
+
+ def test_keyword_overlap_exact(self):
+ """Test keyword overlap with exact match."""
+ query = "bitcoin lightning"
+ content = "bitcoin lightning network"
+ overlap = _keyword_overlap(query, content)
+ assert overlap == 1.0
+
+ def test_keyword_overlap_partial(self):
+ """Test keyword overlap with partial match."""
+ query = "bitcoin lightning"
+ content = "bitcoin is great"
+ overlap = _keyword_overlap(query, content)
+ assert overlap == 0.5
+
+ def test_keyword_overlap_none(self):
+ """Test keyword overlap with no match."""
+ query = "bitcoin"
+ content = "completely different topic"
+ overlap = _keyword_overlap(query, content)
+ assert overlap == 0.0
+
+
+class TestVectorStoreIntegration:
+ """Integration tests for vector store workflow."""
+
+ def test_memory_workflow(self):
+ """Test complete memory workflow: store -> search -> retrieve."""
+ # Store memories
+ store_memory(
+ "The project deadline is next Friday",
+ source="user",
+ context_type="fact",
+ session_id="session-1",
+ )
+ store_memory(
+ "We need to implement the payment system",
+ source="user",
+ context_type="conversation",
+ session_id="session-1",
+ )
+ store_memory(
+ "The database schema needs updating",
+ source="agent",
+ context_type="conversation",
+ session_id="session-1",
+ )
+
+ # Search for deadline-related memories
+ results = search_memories("when is the deadline", limit=5)
+
+ # Should find the deadline memory
+ assert len(results) > 0
+ # Check that the most relevant result contains "deadline"
+ assert any("deadline" in r.content.lower() for r in results[:3])
+
+ # Get context for a prompt
+ context = get_memory_context("project timeline", session_id="session-1")
+ assert "deadline" in context.lower() or "implement" in context.lower()
+
+ def test_embedding_vs_keyword_fallback(self):
+ """Test that the system works with or without embedding model."""
+ stats = get_memory_stats()
+
+ # Store a memory
+ entry = store_memory(
+ "Testing embedding functionality",
+ source="test",
+ compute_embedding=True,
+ )
+
+ # Should have embedding (even if it's fallback)
+ assert entry.embedding is not None
+
+ # Search should work regardless
+ results = search_memories("embedding test", limit=5)
+ assert len(results) > 0