diff --git a/src/dashboard/app.py b/src/dashboard/app.py index 61dd99be..7c33f6d1 100644 --- a/src/dashboard/app.py +++ b/src/dashboard/app.py @@ -7,11 +7,8 @@ Key improvements: 4. Security and logging handled by dedicated middleware """ -import asyncio -import json import logging import re -from contextlib import asynccontextmanager from pathlib import Path from fastapi import FastAPI, Request, WebSocket @@ -66,7 +63,11 @@ from dashboard.routes.voice import router as voice_router from dashboard.routes.work_orders import router as work_orders_router from dashboard.routes.world import matrix_router from dashboard.routes.world import router as world_router -from timmy.workshop_state import PRESENCE_FILE +from dashboard.schedulers import ( # noqa: F401 — re-export for backward compat + _SYNTHESIZED_STATE, + _presence_watcher, +) +from dashboard.startup import lifespan class _ColorFormatter(logging.Formatter): @@ -139,444 +140,6 @@ logger = logging.getLogger(__name__) BASE_DIR = Path(__file__).parent PROJECT_ROOT = BASE_DIR.parent.parent -_BRIEFING_INTERVAL_HOURS = 6 - - -async def _briefing_scheduler() -> None: - """Background task: regenerate Timmy's briefing every 6 hours.""" - from infrastructure.notifications.push import notify_briefing_ready - from timmy.briefing import engine as briefing_engine - - await asyncio.sleep(2) - - while True: - try: - if briefing_engine.needs_refresh(): - logger.info("Generating morning briefing…") - briefing = briefing_engine.generate() - await notify_briefing_ready(briefing) - else: - logger.info("Briefing is fresh; skipping generation.") - except Exception as exc: - logger.error("Briefing scheduler error: %s", exc) - - await asyncio.sleep(_BRIEFING_INTERVAL_HOURS * 3600) - - -async def _thinking_scheduler() -> None: - """Background task: execute Timmy's thinking cycle every N seconds.""" - from timmy.thinking import thinking_engine - - await asyncio.sleep(5) # Stagger after briefing scheduler - - while True: - try: - if settings.thinking_enabled: - await asyncio.wait_for( - thinking_engine.think_once(), - timeout=settings.thinking_timeout_seconds, - ) - except TimeoutError: - logger.warning( - "Thinking cycle timed out after %ds — Ollama may be unresponsive", - settings.thinking_timeout_seconds, - ) - except asyncio.CancelledError: - raise - except Exception as exc: - logger.error("Thinking scheduler error: %s", exc) - - await asyncio.sleep(settings.thinking_interval_seconds) - - -async def _hermes_scheduler() -> None: - """Background task: Hermes system health monitor, runs every 5 minutes. - - Checks memory, disk, Ollama, processes, and network. - Auto-resolves what it can; fires push notifications when human help is needed. - """ - from infrastructure.hermes.monitor import hermes_monitor - - await asyncio.sleep(20) # Stagger after other schedulers - - while True: - try: - if settings.hermes_enabled: - report = await hermes_monitor.run_cycle() - if report.has_issues: - logger.warning( - "Hermes health issues detected — overall: %s", - report.overall.value, - ) - except asyncio.CancelledError: - raise - except Exception as exc: - logger.error("Hermes scheduler error: %s", exc) - - await asyncio.sleep(settings.hermes_interval_seconds) - - -async def _loop_qa_scheduler() -> None: - """Background task: run capability self-tests on a separate timer. - - Independent of the thinking loop — runs every N thinking ticks - to probe subsystems and detect degradation. - """ - from timmy.loop_qa import loop_qa_orchestrator - - await asyncio.sleep(10) # Stagger after thinking scheduler - - while True: - try: - if settings.loop_qa_enabled: - result = await asyncio.wait_for( - loop_qa_orchestrator.run_next_test(), - timeout=settings.thinking_timeout_seconds, - ) - if result: - status = "PASS" if result["success"] else "FAIL" - logger.info( - "Loop QA [%s]: %s — %s", - result["capability"], - status, - result.get("details", "")[:80], - ) - except TimeoutError: - logger.warning( - "Loop QA test timed out after %ds", - settings.thinking_timeout_seconds, - ) - except asyncio.CancelledError: - raise - except Exception as exc: - logger.error("Loop QA scheduler error: %s", exc) - - interval = settings.thinking_interval_seconds * settings.loop_qa_interval_ticks - await asyncio.sleep(interval) - - -_PRESENCE_POLL_SECONDS = 30 -_PRESENCE_INITIAL_DELAY = 3 - -_SYNTHESIZED_STATE: dict = { - "version": 1, - "liveness": None, - "current_focus": "", - "mood": "idle", - "active_threads": [], - "recent_events": [], - "concerns": [], -} - - -async def _presence_watcher() -> None: - """Background task: watch ~/.timmy/presence.json and broadcast changes via WS. - - Polls the file every 30 seconds (matching Timmy's write cadence). - If the file doesn't exist, broadcasts a synthesised idle state. - """ - from infrastructure.ws_manager.handler import ws_manager as ws_mgr - - await asyncio.sleep(_PRESENCE_INITIAL_DELAY) # Stagger after other schedulers - - last_mtime: float = 0.0 - - while True: - try: - if PRESENCE_FILE.exists(): - mtime = PRESENCE_FILE.stat().st_mtime - if mtime != last_mtime: - last_mtime = mtime - raw = await asyncio.to_thread(PRESENCE_FILE.read_text) - state = json.loads(raw) - await ws_mgr.broadcast("timmy_state", state) - else: - # File absent — broadcast synthesised state once per cycle - if last_mtime != -1.0: - last_mtime = -1.0 - await ws_mgr.broadcast("timmy_state", _SYNTHESIZED_STATE) - except json.JSONDecodeError as exc: - logger.warning("presence.json parse error: %s", exc) - except Exception as exc: - logger.warning("Presence watcher error: %s", exc) - - await asyncio.sleep(_PRESENCE_POLL_SECONDS) - - -async def _start_chat_integrations_background() -> None: - """Background task: start chat integrations without blocking startup.""" - from integrations.chat_bridge.registry import platform_registry - from integrations.chat_bridge.vendors.discord import discord_bot - from integrations.telegram_bot.bot import telegram_bot - - await asyncio.sleep(0.5) - - # Register Discord in the platform registry - platform_registry.register(discord_bot) - - if settings.telegram_token: - try: - await telegram_bot.start() - logger.info("Telegram bot started") - except Exception as exc: - logger.warning("Failed to start Telegram bot: %s", exc) - else: - logger.debug("Telegram: no token configured, skipping") - - if settings.discord_token or discord_bot.load_token(): - try: - await discord_bot.start() - logger.info("Discord bot started") - except Exception as exc: - logger.warning("Failed to start Discord bot: %s", exc) - else: - logger.debug("Discord: no token configured, skipping") - - # If Discord isn't connected yet, start a watcher that polls for the - # token to appear in the environment or .env file. - if discord_bot.state.name != "CONNECTED": - asyncio.create_task(_discord_token_watcher()) - - -async def _discord_token_watcher() -> None: - """Poll for DISCORD_TOKEN appearing in env or .env and auto-start Discord bot.""" - from integrations.chat_bridge.vendors.discord import discord_bot - - # Don't poll if discord.py isn't even installed - try: - import discord as _discord_check # noqa: F401 - except ImportError: - logger.debug("discord.py not installed — token watcher exiting") - return - - while True: - await asyncio.sleep(30) - - if discord_bot.state.name == "CONNECTED": - return # Already running — stop watching - - # 1. Check settings (pydantic-settings reads env on instantiation; - # hot-reload is handled by re-reading .env below) - token = settings.discord_token - - # 2. Re-read .env file for hot-reload - if not token: - try: - from dotenv import dotenv_values - - env_path = Path(settings.repo_root) / ".env" - if env_path.exists(): - vals = dotenv_values(env_path) - token = vals.get("DISCORD_TOKEN", "") - except ImportError: - pass # python-dotenv not installed - - # 3. Check state file (written by /discord/setup) - if not token: - token = discord_bot.load_token() or "" - - if token: - try: - logger.info( - "Discord watcher: token found, attempting start (state=%s)", - discord_bot.state.name, - ) - success = await discord_bot.start(token=token) - if success: - logger.info("Discord bot auto-started (token detected)") - return # Done — stop watching - logger.warning( - "Discord watcher: start() returned False (state=%s)", - discord_bot.state.name, - ) - except Exception as exc: - logger.warning("Discord auto-start failed: %s", exc) - - -def _startup_init() -> None: - """Validate config and enable event persistence.""" - from config import validate_startup - - validate_startup() - - from infrastructure.events.bus import init_event_bus_persistence - - init_event_bus_persistence() - - from spark.engine import get_spark_engine - - if get_spark_engine().enabled: - logger.info("Spark Intelligence active — event capture enabled") - - -def _startup_background_tasks() -> list[asyncio.Task]: - """Spawn all recurring background tasks (non-blocking).""" - bg_tasks = [ - asyncio.create_task(_briefing_scheduler()), - asyncio.create_task(_thinking_scheduler()), - asyncio.create_task(_loop_qa_scheduler()), - asyncio.create_task(_presence_watcher()), - asyncio.create_task(_start_chat_integrations_background()), - asyncio.create_task(_hermes_scheduler()), - ] - try: - from timmy.paperclip import start_paperclip_poller - - bg_tasks.append(asyncio.create_task(start_paperclip_poller())) - logger.info("Paperclip poller started") - except ImportError: - logger.debug("Paperclip module not found, skipping poller") - - return bg_tasks - - -def _try_prune(label: str, prune_fn, days: int) -> None: - """Run a prune function, log results, swallow errors.""" - try: - pruned = prune_fn() - if pruned: - logger.info( - "%s auto-prune: removed %d entries older than %d days", - label, - pruned, - days, - ) - except Exception as exc: - logger.debug("%s auto-prune skipped: %s", label, exc) - - -def _check_vault_size() -> None: - """Warn if the memory vault exceeds the configured size limit.""" - try: - vault_path = Path(settings.repo_root) / "memory" / "notes" - if vault_path.exists(): - total_bytes = sum(f.stat().st_size for f in vault_path.rglob("*") if f.is_file()) - total_mb = total_bytes / (1024 * 1024) - if total_mb > settings.memory_vault_max_mb: - logger.warning( - "Memory vault (%.1f MB) exceeds limit (%d MB) — consider archiving old notes", - total_mb, - settings.memory_vault_max_mb, - ) - except Exception as exc: - logger.debug("Vault size check skipped: %s", exc) - - -def _startup_pruning() -> None: - """Auto-prune old memories, thoughts, and events on startup.""" - if settings.memory_prune_days > 0: - from timmy.memory_system import prune_memories - - _try_prune( - "Memory", - lambda: prune_memories( - older_than_days=settings.memory_prune_days, - keep_facts=settings.memory_prune_keep_facts, - ), - settings.memory_prune_days, - ) - - if settings.thoughts_prune_days > 0: - from timmy.thinking import thinking_engine - - _try_prune( - "Thought", - lambda: thinking_engine.prune_old_thoughts( - keep_days=settings.thoughts_prune_days, - keep_min=settings.thoughts_prune_keep_min, - ), - settings.thoughts_prune_days, - ) - - if settings.events_prune_days > 0: - from swarm.event_log import prune_old_events - - _try_prune( - "Event", - lambda: prune_old_events( - keep_days=settings.events_prune_days, - keep_min=settings.events_prune_keep_min, - ), - settings.events_prune_days, - ) - - if settings.memory_vault_max_mb > 0: - _check_vault_size() - - -async def _shutdown_cleanup( - bg_tasks: list[asyncio.Task], - workshop_heartbeat, -) -> None: - """Stop chat bots, MCP sessions, heartbeat, and cancel background tasks.""" - from integrations.chat_bridge.vendors.discord import discord_bot - from integrations.telegram_bot.bot import telegram_bot - - await discord_bot.stop() - await telegram_bot.stop() - - try: - from timmy.mcp_tools import close_mcp_sessions - - await close_mcp_sessions() - except Exception as exc: - logger.debug("MCP shutdown: %s", exc) - - await workshop_heartbeat.stop() - - for task in bg_tasks: - task.cancel() - try: - await task - except asyncio.CancelledError: - pass - - -@asynccontextmanager -async def lifespan(app: FastAPI): - """Application lifespan manager with non-blocking startup.""" - _startup_init() - bg_tasks = _startup_background_tasks() - _startup_pruning() - - # Start Workshop presence heartbeat with WS relay - from dashboard.routes.world import broadcast_world_state - from timmy.workshop_state import WorkshopHeartbeat - - workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state) - await workshop_heartbeat.start() - - # Register session logger with error capture - try: - from infrastructure.error_capture import register_error_recorder - from timmy.session_logger import get_session_logger - - register_error_recorder(get_session_logger().record_error) - except Exception: - logger.debug("Failed to register error recorder") - - # Mark session start for sovereignty duration tracking - try: - from timmy.sovereignty import mark_session_start - - mark_session_start() - except Exception: - logger.debug("Failed to mark sovereignty session start") - - logger.info("✓ Dashboard ready for requests") - - yield - - await _shutdown_cleanup(bg_tasks, workshop_heartbeat) - - # Generate and commit sovereignty session report - try: - from timmy.sovereignty import generate_and_commit_report - - await generate_and_commit_report() - except Exception as exc: - logger.warning("Sovereignty report generation failed at shutdown: %s", exc) - app = FastAPI( title="Mission Control", diff --git a/src/dashboard/schedulers.py b/src/dashboard/schedulers.py new file mode 100644 index 00000000..ad3f9981 --- /dev/null +++ b/src/dashboard/schedulers.py @@ -0,0 +1,278 @@ +"""Background scheduler coroutines for the Timmy dashboard.""" + +import asyncio +import json +import logging +from pathlib import Path + +from config import settings +from timmy.workshop_state import PRESENCE_FILE + +logger = logging.getLogger(__name__) + +__all__ = [ + "_BRIEFING_INTERVAL_HOURS", + "_briefing_scheduler", + "_thinking_scheduler", + "_hermes_scheduler", + "_loop_qa_scheduler", + "_PRESENCE_POLL_SECONDS", + "_PRESENCE_INITIAL_DELAY", + "_SYNTHESIZED_STATE", + "_presence_watcher", + "_start_chat_integrations_background", + "_discord_token_watcher", +] + +_BRIEFING_INTERVAL_HOURS = 6 + + +async def _briefing_scheduler() -> None: + """Background task: regenerate Timmy's briefing every 6 hours.""" + from infrastructure.notifications.push import notify_briefing_ready + from timmy.briefing import engine as briefing_engine + + await asyncio.sleep(2) + + while True: + try: + if briefing_engine.needs_refresh(): + logger.info("Generating morning briefing…") + briefing = briefing_engine.generate() + await notify_briefing_ready(briefing) + else: + logger.info("Briefing is fresh; skipping generation.") + except Exception as exc: + logger.error("Briefing scheduler error: %s", exc) + + await asyncio.sleep(_BRIEFING_INTERVAL_HOURS * 3600) + + +async def _thinking_scheduler() -> None: + """Background task: execute Timmy's thinking cycle every N seconds.""" + from timmy.thinking import thinking_engine + + await asyncio.sleep(5) # Stagger after briefing scheduler + + while True: + try: + if settings.thinking_enabled: + await asyncio.wait_for( + thinking_engine.think_once(), + timeout=settings.thinking_timeout_seconds, + ) + except TimeoutError: + logger.warning( + "Thinking cycle timed out after %ds — Ollama may be unresponsive", + settings.thinking_timeout_seconds, + ) + except asyncio.CancelledError: + raise + except Exception as exc: + logger.error("Thinking scheduler error: %s", exc) + + await asyncio.sleep(settings.thinking_interval_seconds) + + +async def _hermes_scheduler() -> None: + """Background task: Hermes system health monitor, runs every 5 minutes. + + Checks memory, disk, Ollama, processes, and network. + Auto-resolves what it can; fires push notifications when human help is needed. + """ + from infrastructure.hermes.monitor import hermes_monitor + + await asyncio.sleep(20) # Stagger after other schedulers + + while True: + try: + if settings.hermes_enabled: + report = await hermes_monitor.run_cycle() + if report.has_issues: + logger.warning( + "Hermes health issues detected — overall: %s", + report.overall.value, + ) + except asyncio.CancelledError: + raise + except Exception as exc: + logger.error("Hermes scheduler error: %s", exc) + + await asyncio.sleep(settings.hermes_interval_seconds) + + +async def _loop_qa_scheduler() -> None: + """Background task: run capability self-tests on a separate timer. + + Independent of the thinking loop — runs every N thinking ticks + to probe subsystems and detect degradation. + """ + from timmy.loop_qa import loop_qa_orchestrator + + await asyncio.sleep(10) # Stagger after thinking scheduler + + while True: + try: + if settings.loop_qa_enabled: + result = await asyncio.wait_for( + loop_qa_orchestrator.run_next_test(), + timeout=settings.thinking_timeout_seconds, + ) + if result: + status = "PASS" if result["success"] else "FAIL" + logger.info( + "Loop QA [%s]: %s — %s", + result["capability"], + status, + result.get("details", "")[:80], + ) + except TimeoutError: + logger.warning( + "Loop QA test timed out after %ds", + settings.thinking_timeout_seconds, + ) + except asyncio.CancelledError: + raise + except Exception as exc: + logger.error("Loop QA scheduler error: %s", exc) + + interval = settings.thinking_interval_seconds * settings.loop_qa_interval_ticks + await asyncio.sleep(interval) + + +_PRESENCE_POLL_SECONDS = 30 +_PRESENCE_INITIAL_DELAY = 3 + +_SYNTHESIZED_STATE: dict = { + "version": 1, + "liveness": None, + "current_focus": "", + "mood": "idle", + "active_threads": [], + "recent_events": [], + "concerns": [], +} + + +async def _presence_watcher() -> None: + """Background task: watch ~/.timmy/presence.json and broadcast changes via WS. + + Polls the file every 30 seconds (matching Timmy's write cadence). + If the file doesn't exist, broadcasts a synthesised idle state. + """ + from infrastructure.ws_manager.handler import ws_manager as ws_mgr + + await asyncio.sleep(_PRESENCE_INITIAL_DELAY) # Stagger after other schedulers + + last_mtime: float = 0.0 + + while True: + try: + if PRESENCE_FILE.exists(): + mtime = PRESENCE_FILE.stat().st_mtime + if mtime != last_mtime: + last_mtime = mtime + raw = await asyncio.to_thread(PRESENCE_FILE.read_text) + state = json.loads(raw) + await ws_mgr.broadcast("timmy_state", state) + else: + # File absent — broadcast synthesised state once per cycle + if last_mtime != -1.0: + last_mtime = -1.0 + await ws_mgr.broadcast("timmy_state", _SYNTHESIZED_STATE) + except json.JSONDecodeError as exc: + logger.warning("presence.json parse error: %s", exc) + except Exception as exc: + logger.warning("Presence watcher error: %s", exc) + + await asyncio.sleep(_PRESENCE_POLL_SECONDS) + + +async def _start_chat_integrations_background() -> None: + """Background task: start chat integrations without blocking startup.""" + from integrations.chat_bridge.registry import platform_registry + from integrations.chat_bridge.vendors.discord import discord_bot + from integrations.telegram_bot.bot import telegram_bot + + await asyncio.sleep(0.5) + + # Register Discord in the platform registry + platform_registry.register(discord_bot) + + if settings.telegram_token: + try: + await telegram_bot.start() + logger.info("Telegram bot started") + except Exception as exc: + logger.warning("Failed to start Telegram bot: %s", exc) + else: + logger.debug("Telegram: no token configured, skipping") + + if settings.discord_token or discord_bot.load_token(): + try: + await discord_bot.start() + logger.info("Discord bot started") + except Exception as exc: + logger.warning("Failed to start Discord bot: %s", exc) + else: + logger.debug("Discord: no token configured, skipping") + + # If Discord isn't connected yet, start a watcher that polls for the + # token to appear in the environment or .env file. + if discord_bot.state.name != "CONNECTED": + asyncio.create_task(_discord_token_watcher()) + + +async def _discord_token_watcher() -> None: + """Poll for DISCORD_TOKEN appearing in env or .env and auto-start Discord bot.""" + from integrations.chat_bridge.vendors.discord import discord_bot + + # Don't poll if discord.py isn't even installed + try: + import discord as _discord_check # noqa: F401 + except ImportError: + logger.debug("discord.py not installed — token watcher exiting") + return + + while True: + await asyncio.sleep(30) + + if discord_bot.state.name == "CONNECTED": + return # Already running — stop watching + + # 1. Check settings (pydantic-settings reads env on instantiation; + # hot-reload is handled by re-reading .env below) + token = settings.discord_token + + # 2. Re-read .env file for hot-reload + if not token: + try: + from dotenv import dotenv_values + + env_path = Path(settings.repo_root) / ".env" + if env_path.exists(): + vals = dotenv_values(env_path) + token = vals.get("DISCORD_TOKEN", "") + except ImportError: + pass # python-dotenv not installed + + # 3. Check state file (written by /discord/setup) + if not token: + token = discord_bot.load_token() or "" + + if token: + try: + logger.info( + "Discord watcher: token found, attempting start (state=%s)", + discord_bot.state.name, + ) + success = await discord_bot.start(token=token) + if success: + logger.info("Discord bot auto-started (token detected)") + return # Done — stop watching + logger.warning( + "Discord watcher: start() returned False (state=%s)", + discord_bot.state.name, + ) + except Exception as exc: + logger.warning("Discord auto-start failed: %s", exc) diff --git a/src/dashboard/startup.py b/src/dashboard/startup.py new file mode 100644 index 00000000..ecd8cac1 --- /dev/null +++ b/src/dashboard/startup.py @@ -0,0 +1,205 @@ +"""Application lifecycle management — startup, shutdown, and background task orchestration.""" + +import asyncio +import logging +from contextlib import asynccontextmanager +from pathlib import Path + +from fastapi import FastAPI + +from config import settings +from dashboard.schedulers import ( + _briefing_scheduler, + _hermes_scheduler, + _loop_qa_scheduler, + _presence_watcher, + _start_chat_integrations_background, + _thinking_scheduler, +) + +logger = logging.getLogger(__name__) + + +def _startup_init() -> None: + """Validate config and enable event persistence.""" + from config import validate_startup + + validate_startup() + + from infrastructure.events.bus import init_event_bus_persistence + + init_event_bus_persistence() + + from spark.engine import get_spark_engine + + if get_spark_engine().enabled: + logger.info("Spark Intelligence active — event capture enabled") + + +def _startup_background_tasks() -> list[asyncio.Task]: + """Spawn all recurring background tasks (non-blocking).""" + bg_tasks = [ + asyncio.create_task(_briefing_scheduler()), + asyncio.create_task(_thinking_scheduler()), + asyncio.create_task(_loop_qa_scheduler()), + asyncio.create_task(_presence_watcher()), + asyncio.create_task(_start_chat_integrations_background()), + asyncio.create_task(_hermes_scheduler()), + ] + try: + from timmy.paperclip import start_paperclip_poller + + bg_tasks.append(asyncio.create_task(start_paperclip_poller())) + logger.info("Paperclip poller started") + except ImportError: + logger.debug("Paperclip module not found, skipping poller") + + return bg_tasks + + +def _try_prune(label: str, prune_fn, days: int) -> None: + """Run a prune function, log results, swallow errors.""" + try: + pruned = prune_fn() + if pruned: + logger.info( + "%s auto-prune: removed %d entries older than %d days", + label, + pruned, + days, + ) + except Exception as exc: + logger.debug("%s auto-prune skipped: %s", label, exc) + + +def _check_vault_size() -> None: + """Warn if the memory vault exceeds the configured size limit.""" + try: + vault_path = Path(settings.repo_root) / "memory" / "notes" + if vault_path.exists(): + total_bytes = sum(f.stat().st_size for f in vault_path.rglob("*") if f.is_file()) + total_mb = total_bytes / (1024 * 1024) + if total_mb > settings.memory_vault_max_mb: + logger.warning( + "Memory vault (%.1f MB) exceeds limit (%d MB) — consider archiving old notes", + total_mb, + settings.memory_vault_max_mb, + ) + except Exception as exc: + logger.debug("Vault size check skipped: %s", exc) + + +def _startup_pruning() -> None: + """Auto-prune old memories, thoughts, and events on startup.""" + if settings.memory_prune_days > 0: + from timmy.memory_system import prune_memories + + _try_prune( + "Memory", + lambda: prune_memories( + older_than_days=settings.memory_prune_days, + keep_facts=settings.memory_prune_keep_facts, + ), + settings.memory_prune_days, + ) + + if settings.thoughts_prune_days > 0: + from timmy.thinking import thinking_engine + + _try_prune( + "Thought", + lambda: thinking_engine.prune_old_thoughts( + keep_days=settings.thoughts_prune_days, + keep_min=settings.thoughts_prune_keep_min, + ), + settings.thoughts_prune_days, + ) + + if settings.events_prune_days > 0: + from swarm.event_log import prune_old_events + + _try_prune( + "Event", + lambda: prune_old_events( + keep_days=settings.events_prune_days, + keep_min=settings.events_prune_keep_min, + ), + settings.events_prune_days, + ) + + if settings.memory_vault_max_mb > 0: + _check_vault_size() + + +async def _shutdown_cleanup( + bg_tasks: list[asyncio.Task], + workshop_heartbeat, +) -> None: + """Stop chat bots, MCP sessions, heartbeat, and cancel background tasks.""" + from integrations.chat_bridge.vendors.discord import discord_bot + from integrations.telegram_bot.bot import telegram_bot + + await discord_bot.stop() + await telegram_bot.stop() + + try: + from timmy.mcp_tools import close_mcp_sessions + + await close_mcp_sessions() + except Exception as exc: + logger.debug("MCP shutdown: %s", exc) + + await workshop_heartbeat.stop() + + for task in bg_tasks: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application lifespan manager with non-blocking startup.""" + _startup_init() + bg_tasks = _startup_background_tasks() + _startup_pruning() + + # Start Workshop presence heartbeat with WS relay + from dashboard.routes.world import broadcast_world_state + from timmy.workshop_state import WorkshopHeartbeat + + workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state) + await workshop_heartbeat.start() + + # Register session logger with error capture + try: + from infrastructure.error_capture import register_error_recorder + from timmy.session_logger import get_session_logger + + register_error_recorder(get_session_logger().record_error) + except Exception: + logger.debug("Failed to register error recorder") + + # Mark session start for sovereignty duration tracking + try: + from timmy.sovereignty import mark_session_start + + mark_session_start() + except Exception: + logger.debug("Failed to mark sovereignty session start") + + logger.info("✓ Dashboard ready for requests") + + yield + + await _shutdown_cleanup(bg_tasks, workshop_heartbeat) + + # Generate and commit sovereignty session report + try: + from timmy.sovereignty import generate_and_commit_report + + await generate_and_commit_report() + except Exception as exc: + logger.warning("Sovereignty report generation failed at shutdown: %s", exc)