This repository was archived on 2026-03-24. You can view files and clone it, but you cannot open issues or pull requests, or push commits.
Files
Timmy-time-dashboard/src/dashboard/routes/system.py

229 lines
6.3 KiB
Python

"""System-level dashboard routes (ledger, upgrades, etc.)."""
import logging
from pathlib import Path
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse, JSONResponse
from config import settings
from dashboard.templating import templates
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Router on which every route in this module is registered, tagged "system".
router = APIRouter(tags=["system"])
@router.get("/lightning/ledger", response_class=HTMLResponse)
async def lightning_ledger(request: Request):
    """Render the ledger and balance page from hard-coded placeholder data.

    The balance figures and the two transactions are mock values defined
    inline; this handler reads nothing from a wallet or a database.
    """
    from collections import namedtuple
    from enum import Enum

    class TxType(Enum):
        incoming = "incoming"
        outgoing = "outgoing"

    class TxStatus(Enum):
        completed = "completed"
        pending = "pending"

    # Lightweight record type for one ledger row.
    Tx = namedtuple(
        "Tx",
        "tx_type status amount_sats payment_hash memo created_at",
    )

    mock_transactions = [
        Tx(
            tx_type=TxType.outgoing,
            status=TxStatus.completed,
            amount_sats=50,
            payment_hash="hash1",
            memo="Model inference",
            created_at="2026-03-04 10:00:00",
        ),
        Tx(
            tx_type=TxType.incoming,
            status=TxStatus.completed,
            amount_sats=1000,
            payment_hash="hash2",
            memo="Manual deposit",
            created_at="2026-03-03 15:00:00",
        ),
    ]

    mock_balance = dict(
        available_sats=1337,
        incoming_total_sats=2000,
        outgoing_total_sats=663,
        fees_paid_sats=5,
        net_sats=1337,
        pending_incoming_sats=0,
        pending_outgoing_sats=0,
    )

    page_context = {
        "balance": mock_balance,
        "transactions": mock_transactions,
        # Enum members iterate in definition order, giving the filter
        # options as plain strings: incoming/outgoing, completed/pending.
        "tx_types": [kind.value for kind in TxType],
        "tx_statuses": [state.value for state in TxStatus],
        "filter_type": None,
        "filter_status": None,
        "stats": {},
    }
    return templates.TemplateResponse(request, "ledger.html", page_context)
@router.get("/self-modify/queue", response_class=HTMLResponse)
async def self_modify_queue(request: Request):
    """Render the upgrade queue page with a zero count and empty buckets."""
    queue_context = {"pending_count": 0}
    # Each bucket gets its own fresh empty list.
    for bucket in ("pending", "approved", "applied", "rejected", "failed"):
        queue_context[bucket] = []
    return templates.TemplateResponse(
        request, "upgrade_queue.html", queue_context
    )
@router.get("/swarm/mission-control", response_class=HTMLResponse)
async def mission_control(request: Request):
    """Render ``mission_control.html`` with an empty template context."""
    empty_context = {}
    return templates.TemplateResponse(
        request, "mission_control.html", empty_context
    )
@router.get("/bugs", response_class=HTMLResponse)
async def bugs_page(request: Request):
    """Render ``bugs.html`` with no bugs, a zero total and no status filter."""
    bugs_context = dict(bugs=[], total=0, stats={}, filter_status=None)
    return templates.TemplateResponse(request, "bugs.html", bugs_context)
@router.get("/self-coding", response_class=HTMLResponse)
async def self_coding(request: Request):
    """Render ``self_coding.html`` with an empty ``stats`` mapping."""
    page_context = dict(stats={})
    return templates.TemplateResponse(request, "self_coding.html", page_context)
@router.get("/hands", response_class=HTMLResponse)
async def hands_page(request: Request):
    """Render ``hands.html`` with an empty ``executions`` list."""
    page_context = dict(executions=[])
    return templates.TemplateResponse(request, "hands.html", page_context)
@router.get("/creative/ui", response_class=HTMLResponse)
async def creative_ui(request: Request):
    """Render ``creative.html`` with an empty template context."""
    empty_context = {}
    return templates.TemplateResponse(request, "creative.html", empty_context)
@router.get("/api/notifications", response_class=JSONResponse)
async def api_notifications():
    """Return recent system events as JSON for the notification dropdown.

    The timeline is requested with ``limit=20``. Each item carries
    ``event_type``, ``title`` (the event's ``description`` attribute when
    present, otherwise its ``event_type``) and ``timestamp`` (stringified;
    empty when the event has no such attribute).

    Best-effort: any exception, including a failed import of the engine,
    is logged at debug level and an empty JSON list is returned.
    """
    try:
        from spark.engine import spark_engine

        payload = []
        for event in spark_engine.get_timeline(limit=20):
            kind = event.event_type
            payload.append(
                {
                    "event_type": kind,
                    "title": getattr(event, "description", kind),
                    "timestamp": str(getattr(event, "timestamp", "")),
                }
            )
        return JSONResponse(payload)
    except Exception as exc:
        logger.debug("System events fetch error: %s", exc)
        return JSONResponse([])
@router.get("/api/briefing/status", response_class=JSONResponse)
async def api_briefing_status():
    """Return briefing status: pending approval count and last generation time.

    Returns:
        A JSON object with ``status`` (always ``"ok"``),
        ``pending_approvals`` (length of ``approvals.list_pending()``) and
        ``last_generated`` (ISO-8601 string from the cached briefing's
        ``generated_at``, or ``None`` when nothing is cached or the cache
        cannot be read).

    Errors from ``approvals.list_pending()`` are not caught here and
    propagate to the framework.
    """
    from timmy import approvals
    from timmy.briefing import engine as briefing_engine

    pending_count = len(approvals.list_pending())

    last_generated = None
    try:
        cached = briefing_engine.get_cached()
        if cached:
            last_generated = cached.generated_at.isoformat()
    except Exception as exc:
        # Best-effort: a broken cache must not fail the status endpoint,
        # but record the cause so the failure can be diagnosed.
        logger.debug("Failed to read briefing cache: %s", exc)

    return JSONResponse(
        {
            "status": "ok",
            "pending_approvals": pending_count,
            "last_generated": last_generated,
        }
    )
@router.get("/api/memory/status", response_class=JSONResponse)
async def api_memory_status():
    """Return memory database status: file presence, size and entry count.

    Returns:
        A JSON object with ``status`` (always ``"ok"``), ``db_exists``,
        ``db_size_bytes`` (0 when the file cannot be stat'ed) and
        ``indexed_files`` (the ``total_entries`` value reported by
        ``get_memory_stats()``, or 0 when that call fails or omits the key).
    """
    from timmy.memory_system import get_memory_stats

    db_path = Path(settings.repo_root) / "data" / "memory.db"

    # A single stat() call answers both "does it exist" and "how big is it".
    # Checking exists() first and then calling stat() would raise if the
    # file vanished between the two calls.
    try:
        db_size = db_path.stat().st_size
        db_exists = True
    except FileNotFoundError:
        db_exists, db_size = False, 0
    except OSError as exc:
        # Any other stat failure is reported as "not present" rather than
        # failing the status endpoint; log it so the cause is not lost.
        logger.debug("Failed to stat memory db: %s", exc)
        db_exists, db_size = False, 0

    try:
        indexed_files = get_memory_stats().get("total_entries", 0)
    except Exception as exc:
        # Best-effort: the stats backend being unavailable degrades to 0.
        logger.debug("Failed to get memory stats: %s", exc)
        indexed_files = 0

    return JSONResponse(
        {
            "status": "ok",
            "db_exists": db_exists,
            "db_size_bytes": db_size,
            "indexed_files": indexed_files,
        }
    )
@router.get("/api/swarm/status", response_class=JSONResponse)
async def api_swarm_status():
    """Return swarm worker status and the count of pending tasks.

    Returns:
        A JSON object with ``status`` (always ``"ok"``), ``active_workers``
        (hard-coded to 0 in this handler), ``pending_tasks`` (rows in
        ``tasks`` whose status is ``pending_approval`` or ``approved``;
        0 when the query fails) and a fixed ``message``.
    """
    from dashboard.routes.tasks import _get_db

    pending_tasks = 0
    try:
        with _get_db() as db:
            row = db.execute(
                "SELECT COUNT(*) as cnt FROM tasks "
                "WHERE status IN ('pending_approval','approved')"
            ).fetchone()
            pending_tasks = row["cnt"] if row else 0
    except Exception as exc:
        # Best-effort: a database problem degrades to a zero count, but the
        # cause is logged so it can be diagnosed.
        logger.debug("Failed to count pending tasks: %s", exc)

    return JSONResponse(
        {
            "status": "ok",
            "active_workers": 0,
            "pending_tasks": pending_tasks,
            "message": "Swarm monitoring endpoint",
        }
    )