From 9f2728f529b1af352f4a41237707cc83d930f5c4 Mon Sep 17 00:00:00 2001 From: Kimi Agent Date: Thu, 19 Mar 2026 19:30:32 -0400 Subject: [PATCH] refactor: break up lifespan() into testable helpers (#515) Co-authored-by: Kimi Agent Co-committed-by: Kimi Agent --- src/dashboard/app.py | 99 ++++++++++++++++++++++++-------------------- 1 file changed, 54 insertions(+), 45 deletions(-) diff --git a/src/dashboard/app.py b/src/dashboard/app.py index db7e115..137ded5 100644 --- a/src/dashboard/app.py +++ b/src/dashboard/app.py @@ -329,33 +329,35 @@ async def _discord_token_watcher() -> None: logger.warning("Discord auto-start failed: %s", exc) -@asynccontextmanager -async def lifespan(app: FastAPI): - """Application lifespan manager with non-blocking startup.""" - - # Validate security config (no-op in test mode) +def _startup_init() -> None: + """Validate config and enable event persistence.""" from config import validate_startup validate_startup() - # Enable event persistence (unified EventBus + swarm event_log) from infrastructure.events.bus import init_event_bus_persistence init_event_bus_persistence() - # Create all background tasks without waiting for them - briefing_task = asyncio.create_task(_briefing_scheduler()) - thinking_task = asyncio.create_task(_thinking_scheduler()) - loop_qa_task = asyncio.create_task(_loop_qa_scheduler()) - presence_task = asyncio.create_task(_presence_watcher()) - - # Initialize Spark Intelligence engine from spark.engine import get_spark_engine if get_spark_engine().enabled: logger.info("Spark Intelligence active — event capture enabled") - # Auto-prune old vector store memories on startup + +def _startup_background_tasks() -> list[asyncio.Task]: + """Spawn all recurring background tasks (non-blocking).""" + return [ + asyncio.create_task(_briefing_scheduler()), + asyncio.create_task(_thinking_scheduler()), + asyncio.create_task(_loop_qa_scheduler()), + asyncio.create_task(_presence_watcher()), + asyncio.create_task(_start_chat_integrations_background()), + ] + + +def _startup_pruning() -> None: + """Auto-prune old memories, thoughts, and events on startup.""" if settings.memory_prune_days > 0: try: from timmy.memory_system import prune_memories @@ -373,7 +375,6 @@ async def lifespan(app: FastAPI): except Exception as exc: logger.debug("Memory auto-prune skipped: %s", exc) - # Auto-prune old thoughts on startup if settings.thoughts_prune_days > 0: try: from timmy.thinking import thinking_engine @@ -391,7 +392,6 @@ async def lifespan(app: FastAPI): except Exception as exc: logger.debug("Thought auto-prune skipped: %s", exc) - # Auto-prune old system events on startup if settings.events_prune_days > 0: try: from swarm.event_log import prune_old_events @@ -409,7 +409,6 @@ async def lifespan(app: FastAPI): except Exception as exc: logger.debug("Event auto-prune skipped: %s", exc) - # Warn if memory vault exceeds size limit if settings.memory_vault_max_mb > 0: try: vault_path = Path(settings.repo_root) / "memory" / "notes" @@ -425,6 +424,42 @@ async def lifespan(app: FastAPI): except Exception as exc: logger.debug("Vault size check skipped: %s", exc) + +async def _shutdown_cleanup( + bg_tasks: list[asyncio.Task], + workshop_heartbeat, +) -> None: + """Stop chat bots, MCP sessions, heartbeat, and cancel background tasks.""" + from integrations.chat_bridge.vendors.discord import discord_bot + from integrations.telegram_bot.bot import telegram_bot + + await discord_bot.stop() + await telegram_bot.stop() + + try: + from timmy.mcp_tools import close_mcp_sessions + + await close_mcp_sessions() + except Exception as exc: + logger.debug("MCP shutdown: %s", exc) + + await workshop_heartbeat.stop() + + for task in bg_tasks: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application lifespan manager with non-blocking startup.""" + _startup_init() + bg_tasks = _startup_background_tasks() + _startup_pruning() + # Start Workshop presence heartbeat with WS relay from dashboard.routes.world import broadcast_world_state from timmy.workshop_state import WorkshopHeartbeat @@ -432,10 +467,7 @@ async def lifespan(app: FastAPI): workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state) await workshop_heartbeat.start() - # Start chat integrations in background - chat_task = asyncio.create_task(_start_chat_integrations_background()) - - # Register session logger with error capture (breaks infrastructure → timmy circular dep) + # Register session logger with error capture try: from infrastructure.error_capture import register_error_recorder from timmy.session_logger import get_session_logger @@ -448,30 +480,7 @@ async def lifespan(app: FastAPI): yield - # Cleanup on shutdown - from integrations.chat_bridge.vendors.discord import discord_bot - from integrations.telegram_bot.bot import telegram_bot - - await discord_bot.stop() - await telegram_bot.stop() - - # Close MCP tool server sessions - try: - from timmy.mcp_tools import close_mcp_sessions - - await close_mcp_sessions() - except Exception as exc: - logger.debug("MCP shutdown: %s", exc) - - await workshop_heartbeat.stop() - - for task in [briefing_task, thinking_task, chat_task, loop_qa_task, presence_task]: - if task: - task.cancel() - try: - await task - except asyncio.CancelledError: - pass + await _shutdown_cleanup(bg_tasks, workshop_heartbeat) app = FastAPI(