"""System-level dashboard routes (ledger, upgrades, etc.).""" import logging from pathlib import Path from fastapi import APIRouter, Request from fastapi.responses import HTMLResponse, JSONResponse from config import settings from dashboard.templating import templates logger = logging.getLogger(__name__) router = APIRouter(tags=["system"]) @router.get("/lightning/ledger", response_class=HTMLResponse) async def lightning_ledger(request: Request): """Ledger and balance page.""" # Mock data for now, as this seems to be a UI-first feature balance = { "available_sats": 1337, "incoming_total_sats": 2000, "outgoing_total_sats": 663, "fees_paid_sats": 5, "net_sats": 1337, "pending_incoming_sats": 0, "pending_outgoing_sats": 0, } # Mock transactions from collections import namedtuple from enum import Enum class TxType(Enum): incoming = "incoming" outgoing = "outgoing" class TxStatus(Enum): completed = "completed" pending = "pending" Tx = namedtuple( "Tx", ["tx_type", "status", "amount_sats", "payment_hash", "memo", "created_at"] ) transactions = [ Tx( TxType.outgoing, TxStatus.completed, 50, "hash1", "Model inference", "2026-03-04 10:00:00", ), Tx( TxType.incoming, TxStatus.completed, 1000, "hash2", "Manual deposit", "2026-03-03 15:00:00", ), ] return templates.TemplateResponse( request, "ledger.html", { "balance": balance, "transactions": transactions, "tx_types": ["incoming", "outgoing"], "tx_statuses": ["completed", "pending"], "filter_type": None, "filter_status": None, "stats": {}, }, ) @router.get("/self-modify/queue", response_class=HTMLResponse) async def self_modify_queue(request: Request): """Self-modification / upgrade queue page.""" return templates.TemplateResponse( request, "upgrade_queue.html", { "pending_count": 0, "pending": [], "approved": [], "applied": [], "rejected": [], "failed": [], }, ) @router.get("/swarm/mission-control", response_class=HTMLResponse) async def mission_control(request: Request): return templates.TemplateResponse(request, "mission_control.html", {}) @router.get("/bugs", response_class=HTMLResponse) async def bugs_page(request: Request): return templates.TemplateResponse( request, "bugs.html", { "bugs": [], "total": 0, "stats": {}, "filter_status": None, }, ) @router.get("/self-coding", response_class=HTMLResponse) async def self_coding(request: Request): return templates.TemplateResponse(request, "self_coding.html", {"stats": {}}) @router.get("/hands", response_class=HTMLResponse) async def hands_page(request: Request): return templates.TemplateResponse(request, "hands.html", {"executions": []}) @router.get("/creative/ui", response_class=HTMLResponse) async def creative_ui(request: Request): return templates.TemplateResponse(request, "creative.html", {}) @router.get("/api/notifications", response_class=JSONResponse) async def api_notifications(): """Return recent system events for the notification dropdown.""" try: from spark.engine import spark_engine events = spark_engine.get_timeline(limit=20) return JSONResponse( [ { "event_type": e.event_type, "title": getattr(e, "description", e.event_type), "timestamp": str(getattr(e, "timestamp", "")), } for e in events ] ) except Exception as exc: logger.debug("System events fetch error: %s", exc) return JSONResponse([]) @router.get("/api/briefing/status", response_class=JSONResponse) async def api_briefing_status(): """Return briefing status including pending approvals and last generated time.""" from timmy import approvals from timmy.briefing import engine as briefing_engine pending = approvals.list_pending() pending_count = len(pending) last_generated = None try: cached = briefing_engine.get_cached() if cached: last_generated = cached.generated_at.isoformat() except Exception: logger.debug("Failed to read briefing cache") return JSONResponse( { "status": "ok", "pending_approvals": pending_count, "last_generated": last_generated, } ) @router.get("/api/memory/status", response_class=JSONResponse) async def api_memory_status(): """Return memory database status including file info and indexed files count.""" from timmy.memory_system import get_memory_stats db_path = Path(settings.repo_root) / "data" / "memory.db" db_exists = db_path.exists() db_size = db_path.stat().st_size if db_exists else 0 try: stats = get_memory_stats() indexed_files = stats.get("total_entries", 0) except Exception: logger.debug("Failed to get memory stats") indexed_files = 0 return JSONResponse( { "status": "ok", "db_exists": db_exists, "db_size_bytes": db_size, "indexed_files": indexed_files, } ) @router.get("/api/swarm/status", response_class=JSONResponse) async def api_swarm_status(): """Return swarm worker status and pending tasks count.""" from dashboard.routes.tasks import _get_db pending_tasks = 0 try: with _get_db() as db: row = db.execute( "SELECT COUNT(*) as cnt FROM tasks WHERE status IN ('pending_approval','approved')" ).fetchone() pending_tasks = row["cnt"] if row else 0 except Exception: logger.debug("Failed to count pending tasks") return JSONResponse( { "status": "ok", "active_workers": 0, "pending_tasks": pending_tasks, "message": "Swarm monitoring endpoint", } )