Extract background scheduler coroutines into dashboard/schedulers.py (278 lines) and lifecycle management into dashboard/startup.py (205 lines). app.py reduced from 780 to 343 lines — thin entry point with app creation, middleware config, router registration, and route handlers. Net reduction: -437 lines from app.py, +46 lines overhead from module boilerplate.
206 lines
6.2 KiB
Python
206 lines
6.2 KiB
Python
"""Application lifecycle management — startup, shutdown, and background task orchestration."""
|
|
|
|
import asyncio
|
|
import logging
|
|
from contextlib import asynccontextmanager
|
|
from pathlib import Path
|
|
|
|
from fastapi import FastAPI
|
|
|
|
from config import settings
|
|
from dashboard.schedulers import (
|
|
_briefing_scheduler,
|
|
_hermes_scheduler,
|
|
_loop_qa_scheduler,
|
|
_presence_watcher,
|
|
_start_chat_integrations_background,
|
|
_thinking_scheduler,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _startup_init() -> None:
|
|
"""Validate config and enable event persistence."""
|
|
from config import validate_startup
|
|
|
|
validate_startup()
|
|
|
|
from infrastructure.events.bus import init_event_bus_persistence
|
|
|
|
init_event_bus_persistence()
|
|
|
|
from spark.engine import get_spark_engine
|
|
|
|
if get_spark_engine().enabled:
|
|
logger.info("Spark Intelligence active — event capture enabled")
|
|
|
|
|
|
def _startup_background_tasks() -> list[asyncio.Task]:
|
|
"""Spawn all recurring background tasks (non-blocking)."""
|
|
bg_tasks = [
|
|
asyncio.create_task(_briefing_scheduler()),
|
|
asyncio.create_task(_thinking_scheduler()),
|
|
asyncio.create_task(_loop_qa_scheduler()),
|
|
asyncio.create_task(_presence_watcher()),
|
|
asyncio.create_task(_start_chat_integrations_background()),
|
|
asyncio.create_task(_hermes_scheduler()),
|
|
]
|
|
try:
|
|
from timmy.paperclip import start_paperclip_poller
|
|
|
|
bg_tasks.append(asyncio.create_task(start_paperclip_poller()))
|
|
logger.info("Paperclip poller started")
|
|
except ImportError:
|
|
logger.debug("Paperclip module not found, skipping poller")
|
|
|
|
return bg_tasks
|
|
|
|
|
|
def _try_prune(label: str, prune_fn, days: int) -> None:
|
|
"""Run a prune function, log results, swallow errors."""
|
|
try:
|
|
pruned = prune_fn()
|
|
if pruned:
|
|
logger.info(
|
|
"%s auto-prune: removed %d entries older than %d days",
|
|
label,
|
|
pruned,
|
|
days,
|
|
)
|
|
except Exception as exc:
|
|
logger.debug("%s auto-prune skipped: %s", label, exc)
|
|
|
|
|
|
def _check_vault_size() -> None:
|
|
"""Warn if the memory vault exceeds the configured size limit."""
|
|
try:
|
|
vault_path = Path(settings.repo_root) / "memory" / "notes"
|
|
if vault_path.exists():
|
|
total_bytes = sum(f.stat().st_size for f in vault_path.rglob("*") if f.is_file())
|
|
total_mb = total_bytes / (1024 * 1024)
|
|
if total_mb > settings.memory_vault_max_mb:
|
|
logger.warning(
|
|
"Memory vault (%.1f MB) exceeds limit (%d MB) — consider archiving old notes",
|
|
total_mb,
|
|
settings.memory_vault_max_mb,
|
|
)
|
|
except Exception as exc:
|
|
logger.debug("Vault size check skipped: %s", exc)
|
|
|
|
|
|
def _startup_pruning() -> None:
|
|
"""Auto-prune old memories, thoughts, and events on startup."""
|
|
if settings.memory_prune_days > 0:
|
|
from timmy.memory_system import prune_memories
|
|
|
|
_try_prune(
|
|
"Memory",
|
|
lambda: prune_memories(
|
|
older_than_days=settings.memory_prune_days,
|
|
keep_facts=settings.memory_prune_keep_facts,
|
|
),
|
|
settings.memory_prune_days,
|
|
)
|
|
|
|
if settings.thoughts_prune_days > 0:
|
|
from timmy.thinking import thinking_engine
|
|
|
|
_try_prune(
|
|
"Thought",
|
|
lambda: thinking_engine.prune_old_thoughts(
|
|
keep_days=settings.thoughts_prune_days,
|
|
keep_min=settings.thoughts_prune_keep_min,
|
|
),
|
|
settings.thoughts_prune_days,
|
|
)
|
|
|
|
if settings.events_prune_days > 0:
|
|
from swarm.event_log import prune_old_events
|
|
|
|
_try_prune(
|
|
"Event",
|
|
lambda: prune_old_events(
|
|
keep_days=settings.events_prune_days,
|
|
keep_min=settings.events_prune_keep_min,
|
|
),
|
|
settings.events_prune_days,
|
|
)
|
|
|
|
if settings.memory_vault_max_mb > 0:
|
|
_check_vault_size()
|
|
|
|
|
|
async def _shutdown_cleanup(
|
|
bg_tasks: list[asyncio.Task],
|
|
workshop_heartbeat,
|
|
) -> None:
|
|
"""Stop chat bots, MCP sessions, heartbeat, and cancel background tasks."""
|
|
from integrations.chat_bridge.vendors.discord import discord_bot
|
|
from integrations.telegram_bot.bot import telegram_bot
|
|
|
|
await discord_bot.stop()
|
|
await telegram_bot.stop()
|
|
|
|
try:
|
|
from timmy.mcp_tools import close_mcp_sessions
|
|
|
|
await close_mcp_sessions()
|
|
except Exception as exc:
|
|
logger.debug("MCP shutdown: %s", exc)
|
|
|
|
await workshop_heartbeat.stop()
|
|
|
|
for task in bg_tasks:
|
|
task.cancel()
|
|
try:
|
|
await task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
|
|
@asynccontextmanager
|
|
async def lifespan(app: FastAPI):
|
|
"""Application lifespan manager with non-blocking startup."""
|
|
_startup_init()
|
|
bg_tasks = _startup_background_tasks()
|
|
_startup_pruning()
|
|
|
|
# Start Workshop presence heartbeat with WS relay
|
|
from dashboard.routes.world import broadcast_world_state
|
|
from timmy.workshop_state import WorkshopHeartbeat
|
|
|
|
workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
|
|
await workshop_heartbeat.start()
|
|
|
|
# Register session logger with error capture
|
|
try:
|
|
from infrastructure.error_capture import register_error_recorder
|
|
from timmy.session_logger import get_session_logger
|
|
|
|
register_error_recorder(get_session_logger().record_error)
|
|
except Exception:
|
|
logger.debug("Failed to register error recorder")
|
|
|
|
# Mark session start for sovereignty duration tracking
|
|
try:
|
|
from timmy.sovereignty import mark_session_start
|
|
|
|
mark_session_start()
|
|
except Exception:
|
|
logger.debug("Failed to mark sovereignty session start")
|
|
|
|
logger.info("✓ Dashboard ready for requests")
|
|
|
|
yield
|
|
|
|
await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
|
|
|
|
# Generate and commit sovereignty session report
|
|
try:
|
|
from timmy.sovereignty import generate_and_commit_report
|
|
|
|
await generate_and_commit_report()
|
|
except Exception as exc:
|
|
logger.warning("Sovereignty report generation failed at shutdown: %s", exc)
|