This commit was merged in pull request #1455.
This commit is contained in:
@@ -7,11 +7,8 @@ Key improvements:
|
|||||||
4. Security and logging handled by dedicated middleware
|
4. Security and logging handled by dedicated middleware
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import json
|
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
from contextlib import asynccontextmanager
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
from fastapi import FastAPI, Request, WebSocket
|
from fastapi import FastAPI, Request, WebSocket
|
||||||
@@ -66,7 +63,11 @@ from dashboard.routes.voice import router as voice_router
|
|||||||
from dashboard.routes.work_orders import router as work_orders_router
|
from dashboard.routes.work_orders import router as work_orders_router
|
||||||
from dashboard.routes.world import matrix_router
|
from dashboard.routes.world import matrix_router
|
||||||
from dashboard.routes.world import router as world_router
|
from dashboard.routes.world import router as world_router
|
||||||
from timmy.workshop_state import PRESENCE_FILE
|
from dashboard.schedulers import ( # noqa: F401 — re-export for backward compat
|
||||||
|
_SYNTHESIZED_STATE,
|
||||||
|
_presence_watcher,
|
||||||
|
)
|
||||||
|
from dashboard.startup import lifespan
|
||||||
|
|
||||||
|
|
||||||
class _ColorFormatter(logging.Formatter):
|
class _ColorFormatter(logging.Formatter):
|
||||||
@@ -139,444 +140,6 @@ logger = logging.getLogger(__name__)
|
|||||||
BASE_DIR = Path(__file__).parent
|
BASE_DIR = Path(__file__).parent
|
||||||
PROJECT_ROOT = BASE_DIR.parent.parent
|
PROJECT_ROOT = BASE_DIR.parent.parent
|
||||||
|
|
||||||
_BRIEFING_INTERVAL_HOURS = 6
|
|
||||||
|
|
||||||
|
|
||||||
async def _briefing_scheduler() -> None:
|
|
||||||
"""Background task: regenerate Timmy's briefing every 6 hours."""
|
|
||||||
from infrastructure.notifications.push import notify_briefing_ready
|
|
||||||
from timmy.briefing import engine as briefing_engine
|
|
||||||
|
|
||||||
await asyncio.sleep(2)
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
if briefing_engine.needs_refresh():
|
|
||||||
logger.info("Generating morning briefing…")
|
|
||||||
briefing = briefing_engine.generate()
|
|
||||||
await notify_briefing_ready(briefing)
|
|
||||||
else:
|
|
||||||
logger.info("Briefing is fresh; skipping generation.")
|
|
||||||
except Exception as exc:
|
|
||||||
logger.error("Briefing scheduler error: %s", exc)
|
|
||||||
|
|
||||||
await asyncio.sleep(_BRIEFING_INTERVAL_HOURS * 3600)
|
|
||||||
|
|
||||||
|
|
||||||
async def _thinking_scheduler() -> None:
|
|
||||||
"""Background task: execute Timmy's thinking cycle every N seconds."""
|
|
||||||
from timmy.thinking import thinking_engine
|
|
||||||
|
|
||||||
await asyncio.sleep(5) # Stagger after briefing scheduler
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
if settings.thinking_enabled:
|
|
||||||
await asyncio.wait_for(
|
|
||||||
thinking_engine.think_once(),
|
|
||||||
timeout=settings.thinking_timeout_seconds,
|
|
||||||
)
|
|
||||||
except TimeoutError:
|
|
||||||
logger.warning(
|
|
||||||
"Thinking cycle timed out after %ds — Ollama may be unresponsive",
|
|
||||||
settings.thinking_timeout_seconds,
|
|
||||||
)
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
raise
|
|
||||||
except Exception as exc:
|
|
||||||
logger.error("Thinking scheduler error: %s", exc)
|
|
||||||
|
|
||||||
await asyncio.sleep(settings.thinking_interval_seconds)
|
|
||||||
|
|
||||||
|
|
||||||
async def _hermes_scheduler() -> None:
|
|
||||||
"""Background task: Hermes system health monitor, runs every 5 minutes.
|
|
||||||
|
|
||||||
Checks memory, disk, Ollama, processes, and network.
|
|
||||||
Auto-resolves what it can; fires push notifications when human help is needed.
|
|
||||||
"""
|
|
||||||
from infrastructure.hermes.monitor import hermes_monitor
|
|
||||||
|
|
||||||
await asyncio.sleep(20) # Stagger after other schedulers
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
if settings.hermes_enabled:
|
|
||||||
report = await hermes_monitor.run_cycle()
|
|
||||||
if report.has_issues:
|
|
||||||
logger.warning(
|
|
||||||
"Hermes health issues detected — overall: %s",
|
|
||||||
report.overall.value,
|
|
||||||
)
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
raise
|
|
||||||
except Exception as exc:
|
|
||||||
logger.error("Hermes scheduler error: %s", exc)
|
|
||||||
|
|
||||||
await asyncio.sleep(settings.hermes_interval_seconds)
|
|
||||||
|
|
||||||
|
|
||||||
async def _loop_qa_scheduler() -> None:
|
|
||||||
"""Background task: run capability self-tests on a separate timer.
|
|
||||||
|
|
||||||
Independent of the thinking loop — runs every N thinking ticks
|
|
||||||
to probe subsystems and detect degradation.
|
|
||||||
"""
|
|
||||||
from timmy.loop_qa import loop_qa_orchestrator
|
|
||||||
|
|
||||||
await asyncio.sleep(10) # Stagger after thinking scheduler
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
if settings.loop_qa_enabled:
|
|
||||||
result = await asyncio.wait_for(
|
|
||||||
loop_qa_orchestrator.run_next_test(),
|
|
||||||
timeout=settings.thinking_timeout_seconds,
|
|
||||||
)
|
|
||||||
if result:
|
|
||||||
status = "PASS" if result["success"] else "FAIL"
|
|
||||||
logger.info(
|
|
||||||
"Loop QA [%s]: %s — %s",
|
|
||||||
result["capability"],
|
|
||||||
status,
|
|
||||||
result.get("details", "")[:80],
|
|
||||||
)
|
|
||||||
except TimeoutError:
|
|
||||||
logger.warning(
|
|
||||||
"Loop QA test timed out after %ds",
|
|
||||||
settings.thinking_timeout_seconds,
|
|
||||||
)
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
raise
|
|
||||||
except Exception as exc:
|
|
||||||
logger.error("Loop QA scheduler error: %s", exc)
|
|
||||||
|
|
||||||
interval = settings.thinking_interval_seconds * settings.loop_qa_interval_ticks
|
|
||||||
await asyncio.sleep(interval)
|
|
||||||
|
|
||||||
|
|
||||||
_PRESENCE_POLL_SECONDS = 30
|
|
||||||
_PRESENCE_INITIAL_DELAY = 3
|
|
||||||
|
|
||||||
_SYNTHESIZED_STATE: dict = {
|
|
||||||
"version": 1,
|
|
||||||
"liveness": None,
|
|
||||||
"current_focus": "",
|
|
||||||
"mood": "idle",
|
|
||||||
"active_threads": [],
|
|
||||||
"recent_events": [],
|
|
||||||
"concerns": [],
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
async def _presence_watcher() -> None:
|
|
||||||
"""Background task: watch ~/.timmy/presence.json and broadcast changes via WS.
|
|
||||||
|
|
||||||
Polls the file every 30 seconds (matching Timmy's write cadence).
|
|
||||||
If the file doesn't exist, broadcasts a synthesised idle state.
|
|
||||||
"""
|
|
||||||
from infrastructure.ws_manager.handler import ws_manager as ws_mgr
|
|
||||||
|
|
||||||
await asyncio.sleep(_PRESENCE_INITIAL_DELAY) # Stagger after other schedulers
|
|
||||||
|
|
||||||
last_mtime: float = 0.0
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
if PRESENCE_FILE.exists():
|
|
||||||
mtime = PRESENCE_FILE.stat().st_mtime
|
|
||||||
if mtime != last_mtime:
|
|
||||||
last_mtime = mtime
|
|
||||||
raw = await asyncio.to_thread(PRESENCE_FILE.read_text)
|
|
||||||
state = json.loads(raw)
|
|
||||||
await ws_mgr.broadcast("timmy_state", state)
|
|
||||||
else:
|
|
||||||
# File absent — broadcast synthesised state once per cycle
|
|
||||||
if last_mtime != -1.0:
|
|
||||||
last_mtime = -1.0
|
|
||||||
await ws_mgr.broadcast("timmy_state", _SYNTHESIZED_STATE)
|
|
||||||
except json.JSONDecodeError as exc:
|
|
||||||
logger.warning("presence.json parse error: %s", exc)
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning("Presence watcher error: %s", exc)
|
|
||||||
|
|
||||||
await asyncio.sleep(_PRESENCE_POLL_SECONDS)
|
|
||||||
|
|
||||||
|
|
||||||
async def _start_chat_integrations_background() -> None:
|
|
||||||
"""Background task: start chat integrations without blocking startup."""
|
|
||||||
from integrations.chat_bridge.registry import platform_registry
|
|
||||||
from integrations.chat_bridge.vendors.discord import discord_bot
|
|
||||||
from integrations.telegram_bot.bot import telegram_bot
|
|
||||||
|
|
||||||
await asyncio.sleep(0.5)
|
|
||||||
|
|
||||||
# Register Discord in the platform registry
|
|
||||||
platform_registry.register(discord_bot)
|
|
||||||
|
|
||||||
if settings.telegram_token:
|
|
||||||
try:
|
|
||||||
await telegram_bot.start()
|
|
||||||
logger.info("Telegram bot started")
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning("Failed to start Telegram bot: %s", exc)
|
|
||||||
else:
|
|
||||||
logger.debug("Telegram: no token configured, skipping")
|
|
||||||
|
|
||||||
if settings.discord_token or discord_bot.load_token():
|
|
||||||
try:
|
|
||||||
await discord_bot.start()
|
|
||||||
logger.info("Discord bot started")
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning("Failed to start Discord bot: %s", exc)
|
|
||||||
else:
|
|
||||||
logger.debug("Discord: no token configured, skipping")
|
|
||||||
|
|
||||||
# If Discord isn't connected yet, start a watcher that polls for the
|
|
||||||
# token to appear in the environment or .env file.
|
|
||||||
if discord_bot.state.name != "CONNECTED":
|
|
||||||
asyncio.create_task(_discord_token_watcher())
|
|
||||||
|
|
||||||
|
|
||||||
async def _discord_token_watcher() -> None:
|
|
||||||
"""Poll for DISCORD_TOKEN appearing in env or .env and auto-start Discord bot."""
|
|
||||||
from integrations.chat_bridge.vendors.discord import discord_bot
|
|
||||||
|
|
||||||
# Don't poll if discord.py isn't even installed
|
|
||||||
try:
|
|
||||||
import discord as _discord_check # noqa: F401
|
|
||||||
except ImportError:
|
|
||||||
logger.debug("discord.py not installed — token watcher exiting")
|
|
||||||
return
|
|
||||||
|
|
||||||
while True:
|
|
||||||
await asyncio.sleep(30)
|
|
||||||
|
|
||||||
if discord_bot.state.name == "CONNECTED":
|
|
||||||
return # Already running — stop watching
|
|
||||||
|
|
||||||
# 1. Check settings (pydantic-settings reads env on instantiation;
|
|
||||||
# hot-reload is handled by re-reading .env below)
|
|
||||||
token = settings.discord_token
|
|
||||||
|
|
||||||
# 2. Re-read .env file for hot-reload
|
|
||||||
if not token:
|
|
||||||
try:
|
|
||||||
from dotenv import dotenv_values
|
|
||||||
|
|
||||||
env_path = Path(settings.repo_root) / ".env"
|
|
||||||
if env_path.exists():
|
|
||||||
vals = dotenv_values(env_path)
|
|
||||||
token = vals.get("DISCORD_TOKEN", "")
|
|
||||||
except ImportError:
|
|
||||||
pass # python-dotenv not installed
|
|
||||||
|
|
||||||
# 3. Check state file (written by /discord/setup)
|
|
||||||
if not token:
|
|
||||||
token = discord_bot.load_token() or ""
|
|
||||||
|
|
||||||
if token:
|
|
||||||
try:
|
|
||||||
logger.info(
|
|
||||||
"Discord watcher: token found, attempting start (state=%s)",
|
|
||||||
discord_bot.state.name,
|
|
||||||
)
|
|
||||||
success = await discord_bot.start(token=token)
|
|
||||||
if success:
|
|
||||||
logger.info("Discord bot auto-started (token detected)")
|
|
||||||
return # Done — stop watching
|
|
||||||
logger.warning(
|
|
||||||
"Discord watcher: start() returned False (state=%s)",
|
|
||||||
discord_bot.state.name,
|
|
||||||
)
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning("Discord auto-start failed: %s", exc)
|
|
||||||
|
|
||||||
|
|
||||||
def _startup_init() -> None:
|
|
||||||
"""Validate config and enable event persistence."""
|
|
||||||
from config import validate_startup
|
|
||||||
|
|
||||||
validate_startup()
|
|
||||||
|
|
||||||
from infrastructure.events.bus import init_event_bus_persistence
|
|
||||||
|
|
||||||
init_event_bus_persistence()
|
|
||||||
|
|
||||||
from spark.engine import get_spark_engine
|
|
||||||
|
|
||||||
if get_spark_engine().enabled:
|
|
||||||
logger.info("Spark Intelligence active — event capture enabled")
|
|
||||||
|
|
||||||
|
|
||||||
def _startup_background_tasks() -> list[asyncio.Task]:
|
|
||||||
"""Spawn all recurring background tasks (non-blocking)."""
|
|
||||||
bg_tasks = [
|
|
||||||
asyncio.create_task(_briefing_scheduler()),
|
|
||||||
asyncio.create_task(_thinking_scheduler()),
|
|
||||||
asyncio.create_task(_loop_qa_scheduler()),
|
|
||||||
asyncio.create_task(_presence_watcher()),
|
|
||||||
asyncio.create_task(_start_chat_integrations_background()),
|
|
||||||
asyncio.create_task(_hermes_scheduler()),
|
|
||||||
]
|
|
||||||
try:
|
|
||||||
from timmy.paperclip import start_paperclip_poller
|
|
||||||
|
|
||||||
bg_tasks.append(asyncio.create_task(start_paperclip_poller()))
|
|
||||||
logger.info("Paperclip poller started")
|
|
||||||
except ImportError:
|
|
||||||
logger.debug("Paperclip module not found, skipping poller")
|
|
||||||
|
|
||||||
return bg_tasks
|
|
||||||
|
|
||||||
|
|
||||||
def _try_prune(label: str, prune_fn, days: int) -> None:
|
|
||||||
"""Run a prune function, log results, swallow errors."""
|
|
||||||
try:
|
|
||||||
pruned = prune_fn()
|
|
||||||
if pruned:
|
|
||||||
logger.info(
|
|
||||||
"%s auto-prune: removed %d entries older than %d days",
|
|
||||||
label,
|
|
||||||
pruned,
|
|
||||||
days,
|
|
||||||
)
|
|
||||||
except Exception as exc:
|
|
||||||
logger.debug("%s auto-prune skipped: %s", label, exc)
|
|
||||||
|
|
||||||
|
|
||||||
def _check_vault_size() -> None:
|
|
||||||
"""Warn if the memory vault exceeds the configured size limit."""
|
|
||||||
try:
|
|
||||||
vault_path = Path(settings.repo_root) / "memory" / "notes"
|
|
||||||
if vault_path.exists():
|
|
||||||
total_bytes = sum(f.stat().st_size for f in vault_path.rglob("*") if f.is_file())
|
|
||||||
total_mb = total_bytes / (1024 * 1024)
|
|
||||||
if total_mb > settings.memory_vault_max_mb:
|
|
||||||
logger.warning(
|
|
||||||
"Memory vault (%.1f MB) exceeds limit (%d MB) — consider archiving old notes",
|
|
||||||
total_mb,
|
|
||||||
settings.memory_vault_max_mb,
|
|
||||||
)
|
|
||||||
except Exception as exc:
|
|
||||||
logger.debug("Vault size check skipped: %s", exc)
|
|
||||||
|
|
||||||
|
|
||||||
def _startup_pruning() -> None:
|
|
||||||
"""Auto-prune old memories, thoughts, and events on startup."""
|
|
||||||
if settings.memory_prune_days > 0:
|
|
||||||
from timmy.memory_system import prune_memories
|
|
||||||
|
|
||||||
_try_prune(
|
|
||||||
"Memory",
|
|
||||||
lambda: prune_memories(
|
|
||||||
older_than_days=settings.memory_prune_days,
|
|
||||||
keep_facts=settings.memory_prune_keep_facts,
|
|
||||||
),
|
|
||||||
settings.memory_prune_days,
|
|
||||||
)
|
|
||||||
|
|
||||||
if settings.thoughts_prune_days > 0:
|
|
||||||
from timmy.thinking import thinking_engine
|
|
||||||
|
|
||||||
_try_prune(
|
|
||||||
"Thought",
|
|
||||||
lambda: thinking_engine.prune_old_thoughts(
|
|
||||||
keep_days=settings.thoughts_prune_days,
|
|
||||||
keep_min=settings.thoughts_prune_keep_min,
|
|
||||||
),
|
|
||||||
settings.thoughts_prune_days,
|
|
||||||
)
|
|
||||||
|
|
||||||
if settings.events_prune_days > 0:
|
|
||||||
from swarm.event_log import prune_old_events
|
|
||||||
|
|
||||||
_try_prune(
|
|
||||||
"Event",
|
|
||||||
lambda: prune_old_events(
|
|
||||||
keep_days=settings.events_prune_days,
|
|
||||||
keep_min=settings.events_prune_keep_min,
|
|
||||||
),
|
|
||||||
settings.events_prune_days,
|
|
||||||
)
|
|
||||||
|
|
||||||
if settings.memory_vault_max_mb > 0:
|
|
||||||
_check_vault_size()
|
|
||||||
|
|
||||||
|
|
||||||
async def _shutdown_cleanup(
|
|
||||||
bg_tasks: list[asyncio.Task],
|
|
||||||
workshop_heartbeat,
|
|
||||||
) -> None:
|
|
||||||
"""Stop chat bots, MCP sessions, heartbeat, and cancel background tasks."""
|
|
||||||
from integrations.chat_bridge.vendors.discord import discord_bot
|
|
||||||
from integrations.telegram_bot.bot import telegram_bot
|
|
||||||
|
|
||||||
await discord_bot.stop()
|
|
||||||
await telegram_bot.stop()
|
|
||||||
|
|
||||||
try:
|
|
||||||
from timmy.mcp_tools import close_mcp_sessions
|
|
||||||
|
|
||||||
await close_mcp_sessions()
|
|
||||||
except Exception as exc:
|
|
||||||
logger.debug("MCP shutdown: %s", exc)
|
|
||||||
|
|
||||||
await workshop_heartbeat.stop()
|
|
||||||
|
|
||||||
for task in bg_tasks:
|
|
||||||
task.cancel()
|
|
||||||
try:
|
|
||||||
await task
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
|
||||||
async def lifespan(app: FastAPI):
|
|
||||||
"""Application lifespan manager with non-blocking startup."""
|
|
||||||
_startup_init()
|
|
||||||
bg_tasks = _startup_background_tasks()
|
|
||||||
_startup_pruning()
|
|
||||||
|
|
||||||
# Start Workshop presence heartbeat with WS relay
|
|
||||||
from dashboard.routes.world import broadcast_world_state
|
|
||||||
from timmy.workshop_state import WorkshopHeartbeat
|
|
||||||
|
|
||||||
workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
|
|
||||||
await workshop_heartbeat.start()
|
|
||||||
|
|
||||||
# Register session logger with error capture
|
|
||||||
try:
|
|
||||||
from infrastructure.error_capture import register_error_recorder
|
|
||||||
from timmy.session_logger import get_session_logger
|
|
||||||
|
|
||||||
register_error_recorder(get_session_logger().record_error)
|
|
||||||
except Exception:
|
|
||||||
logger.debug("Failed to register error recorder")
|
|
||||||
|
|
||||||
# Mark session start for sovereignty duration tracking
|
|
||||||
try:
|
|
||||||
from timmy.sovereignty import mark_session_start
|
|
||||||
|
|
||||||
mark_session_start()
|
|
||||||
except Exception:
|
|
||||||
logger.debug("Failed to mark sovereignty session start")
|
|
||||||
|
|
||||||
logger.info("✓ Dashboard ready for requests")
|
|
||||||
|
|
||||||
yield
|
|
||||||
|
|
||||||
await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
|
|
||||||
|
|
||||||
# Generate and commit sovereignty session report
|
|
||||||
try:
|
|
||||||
from timmy.sovereignty import generate_and_commit_report
|
|
||||||
|
|
||||||
await generate_and_commit_report()
|
|
||||||
except Exception as exc:
|
|
||||||
logger.warning("Sovereignty report generation failed at shutdown: %s", exc)
|
|
||||||
|
|
||||||
|
|
||||||
app = FastAPI(
|
app = FastAPI(
|
||||||
title="Mission Control",
|
title="Mission Control",
|
||||||
|
|||||||
278
src/dashboard/schedulers.py
Normal file
278
src/dashboard/schedulers.py
Normal file
@@ -0,0 +1,278 @@
|
|||||||
|
"""Background scheduler coroutines for the Timmy dashboard."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from config import settings
|
||||||
|
from timmy.workshop_state import PRESENCE_FILE
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"_BRIEFING_INTERVAL_HOURS",
|
||||||
|
"_briefing_scheduler",
|
||||||
|
"_thinking_scheduler",
|
||||||
|
"_hermes_scheduler",
|
||||||
|
"_loop_qa_scheduler",
|
||||||
|
"_PRESENCE_POLL_SECONDS",
|
||||||
|
"_PRESENCE_INITIAL_DELAY",
|
||||||
|
"_SYNTHESIZED_STATE",
|
||||||
|
"_presence_watcher",
|
||||||
|
"_start_chat_integrations_background",
|
||||||
|
"_discord_token_watcher",
|
||||||
|
]
|
||||||
|
|
||||||
|
_BRIEFING_INTERVAL_HOURS = 6
|
||||||
|
|
||||||
|
|
||||||
|
async def _briefing_scheduler() -> None:
|
||||||
|
"""Background task: regenerate Timmy's briefing every 6 hours."""
|
||||||
|
from infrastructure.notifications.push import notify_briefing_ready
|
||||||
|
from timmy.briefing import engine as briefing_engine
|
||||||
|
|
||||||
|
await asyncio.sleep(2)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
if briefing_engine.needs_refresh():
|
||||||
|
logger.info("Generating morning briefing…")
|
||||||
|
briefing = briefing_engine.generate()
|
||||||
|
await notify_briefing_ready(briefing)
|
||||||
|
else:
|
||||||
|
logger.info("Briefing is fresh; skipping generation.")
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("Briefing scheduler error: %s", exc)
|
||||||
|
|
||||||
|
await asyncio.sleep(_BRIEFING_INTERVAL_HOURS * 3600)
|
||||||
|
|
||||||
|
|
||||||
|
async def _thinking_scheduler() -> None:
|
||||||
|
"""Background task: execute Timmy's thinking cycle every N seconds."""
|
||||||
|
from timmy.thinking import thinking_engine
|
||||||
|
|
||||||
|
await asyncio.sleep(5) # Stagger after briefing scheduler
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
if settings.thinking_enabled:
|
||||||
|
await asyncio.wait_for(
|
||||||
|
thinking_engine.think_once(),
|
||||||
|
timeout=settings.thinking_timeout_seconds,
|
||||||
|
)
|
||||||
|
except TimeoutError:
|
||||||
|
logger.warning(
|
||||||
|
"Thinking cycle timed out after %ds — Ollama may be unresponsive",
|
||||||
|
settings.thinking_timeout_seconds,
|
||||||
|
)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("Thinking scheduler error: %s", exc)
|
||||||
|
|
||||||
|
await asyncio.sleep(settings.thinking_interval_seconds)
|
||||||
|
|
||||||
|
|
||||||
|
async def _hermes_scheduler() -> None:
|
||||||
|
"""Background task: Hermes system health monitor, runs every 5 minutes.
|
||||||
|
|
||||||
|
Checks memory, disk, Ollama, processes, and network.
|
||||||
|
Auto-resolves what it can; fires push notifications when human help is needed.
|
||||||
|
"""
|
||||||
|
from infrastructure.hermes.monitor import hermes_monitor
|
||||||
|
|
||||||
|
await asyncio.sleep(20) # Stagger after other schedulers
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
if settings.hermes_enabled:
|
||||||
|
report = await hermes_monitor.run_cycle()
|
||||||
|
if report.has_issues:
|
||||||
|
logger.warning(
|
||||||
|
"Hermes health issues detected — overall: %s",
|
||||||
|
report.overall.value,
|
||||||
|
)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("Hermes scheduler error: %s", exc)
|
||||||
|
|
||||||
|
await asyncio.sleep(settings.hermes_interval_seconds)
|
||||||
|
|
||||||
|
|
||||||
|
async def _loop_qa_scheduler() -> None:
|
||||||
|
"""Background task: run capability self-tests on a separate timer.
|
||||||
|
|
||||||
|
Independent of the thinking loop — runs every N thinking ticks
|
||||||
|
to probe subsystems and detect degradation.
|
||||||
|
"""
|
||||||
|
from timmy.loop_qa import loop_qa_orchestrator
|
||||||
|
|
||||||
|
await asyncio.sleep(10) # Stagger after thinking scheduler
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
if settings.loop_qa_enabled:
|
||||||
|
result = await asyncio.wait_for(
|
||||||
|
loop_qa_orchestrator.run_next_test(),
|
||||||
|
timeout=settings.thinking_timeout_seconds,
|
||||||
|
)
|
||||||
|
if result:
|
||||||
|
status = "PASS" if result["success"] else "FAIL"
|
||||||
|
logger.info(
|
||||||
|
"Loop QA [%s]: %s — %s",
|
||||||
|
result["capability"],
|
||||||
|
status,
|
||||||
|
result.get("details", "")[:80],
|
||||||
|
)
|
||||||
|
except TimeoutError:
|
||||||
|
logger.warning(
|
||||||
|
"Loop QA test timed out after %ds",
|
||||||
|
settings.thinking_timeout_seconds,
|
||||||
|
)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
except Exception as exc:
|
||||||
|
logger.error("Loop QA scheduler error: %s", exc)
|
||||||
|
|
||||||
|
interval = settings.thinking_interval_seconds * settings.loop_qa_interval_ticks
|
||||||
|
await asyncio.sleep(interval)
|
||||||
|
|
||||||
|
|
||||||
|
_PRESENCE_POLL_SECONDS = 30
|
||||||
|
_PRESENCE_INITIAL_DELAY = 3
|
||||||
|
|
||||||
|
_SYNTHESIZED_STATE: dict = {
|
||||||
|
"version": 1,
|
||||||
|
"liveness": None,
|
||||||
|
"current_focus": "",
|
||||||
|
"mood": "idle",
|
||||||
|
"active_threads": [],
|
||||||
|
"recent_events": [],
|
||||||
|
"concerns": [],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def _presence_watcher() -> None:
|
||||||
|
"""Background task: watch ~/.timmy/presence.json and broadcast changes via WS.
|
||||||
|
|
||||||
|
Polls the file every 30 seconds (matching Timmy's write cadence).
|
||||||
|
If the file doesn't exist, broadcasts a synthesised idle state.
|
||||||
|
"""
|
||||||
|
from infrastructure.ws_manager.handler import ws_manager as ws_mgr
|
||||||
|
|
||||||
|
await asyncio.sleep(_PRESENCE_INITIAL_DELAY) # Stagger after other schedulers
|
||||||
|
|
||||||
|
last_mtime: float = 0.0
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
if PRESENCE_FILE.exists():
|
||||||
|
mtime = PRESENCE_FILE.stat().st_mtime
|
||||||
|
if mtime != last_mtime:
|
||||||
|
last_mtime = mtime
|
||||||
|
raw = await asyncio.to_thread(PRESENCE_FILE.read_text)
|
||||||
|
state = json.loads(raw)
|
||||||
|
await ws_mgr.broadcast("timmy_state", state)
|
||||||
|
else:
|
||||||
|
# File absent — broadcast synthesised state once per cycle
|
||||||
|
if last_mtime != -1.0:
|
||||||
|
last_mtime = -1.0
|
||||||
|
await ws_mgr.broadcast("timmy_state", _SYNTHESIZED_STATE)
|
||||||
|
except json.JSONDecodeError as exc:
|
||||||
|
logger.warning("presence.json parse error: %s", exc)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Presence watcher error: %s", exc)
|
||||||
|
|
||||||
|
await asyncio.sleep(_PRESENCE_POLL_SECONDS)
|
||||||
|
|
||||||
|
|
||||||
|
async def _start_chat_integrations_background() -> None:
|
||||||
|
"""Background task: start chat integrations without blocking startup."""
|
||||||
|
from integrations.chat_bridge.registry import platform_registry
|
||||||
|
from integrations.chat_bridge.vendors.discord import discord_bot
|
||||||
|
from integrations.telegram_bot.bot import telegram_bot
|
||||||
|
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
|
||||||
|
# Register Discord in the platform registry
|
||||||
|
platform_registry.register(discord_bot)
|
||||||
|
|
||||||
|
if settings.telegram_token:
|
||||||
|
try:
|
||||||
|
await telegram_bot.start()
|
||||||
|
logger.info("Telegram bot started")
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Failed to start Telegram bot: %s", exc)
|
||||||
|
else:
|
||||||
|
logger.debug("Telegram: no token configured, skipping")
|
||||||
|
|
||||||
|
if settings.discord_token or discord_bot.load_token():
|
||||||
|
try:
|
||||||
|
await discord_bot.start()
|
||||||
|
logger.info("Discord bot started")
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Failed to start Discord bot: %s", exc)
|
||||||
|
else:
|
||||||
|
logger.debug("Discord: no token configured, skipping")
|
||||||
|
|
||||||
|
# If Discord isn't connected yet, start a watcher that polls for the
|
||||||
|
# token to appear in the environment or .env file.
|
||||||
|
if discord_bot.state.name != "CONNECTED":
|
||||||
|
asyncio.create_task(_discord_token_watcher())
|
||||||
|
|
||||||
|
|
||||||
|
async def _discord_token_watcher() -> None:
|
||||||
|
"""Poll for DISCORD_TOKEN appearing in env or .env and auto-start Discord bot."""
|
||||||
|
from integrations.chat_bridge.vendors.discord import discord_bot
|
||||||
|
|
||||||
|
# Don't poll if discord.py isn't even installed
|
||||||
|
try:
|
||||||
|
import discord as _discord_check # noqa: F401
|
||||||
|
except ImportError:
|
||||||
|
logger.debug("discord.py not installed — token watcher exiting")
|
||||||
|
return
|
||||||
|
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(30)
|
||||||
|
|
||||||
|
if discord_bot.state.name == "CONNECTED":
|
||||||
|
return # Already running — stop watching
|
||||||
|
|
||||||
|
# 1. Check settings (pydantic-settings reads env on instantiation;
|
||||||
|
# hot-reload is handled by re-reading .env below)
|
||||||
|
token = settings.discord_token
|
||||||
|
|
||||||
|
# 2. Re-read .env file for hot-reload
|
||||||
|
if not token:
|
||||||
|
try:
|
||||||
|
from dotenv import dotenv_values
|
||||||
|
|
||||||
|
env_path = Path(settings.repo_root) / ".env"
|
||||||
|
if env_path.exists():
|
||||||
|
vals = dotenv_values(env_path)
|
||||||
|
token = vals.get("DISCORD_TOKEN", "")
|
||||||
|
except ImportError:
|
||||||
|
pass # python-dotenv not installed
|
||||||
|
|
||||||
|
# 3. Check state file (written by /discord/setup)
|
||||||
|
if not token:
|
||||||
|
token = discord_bot.load_token() or ""
|
||||||
|
|
||||||
|
if token:
|
||||||
|
try:
|
||||||
|
logger.info(
|
||||||
|
"Discord watcher: token found, attempting start (state=%s)",
|
||||||
|
discord_bot.state.name,
|
||||||
|
)
|
||||||
|
success = await discord_bot.start(token=token)
|
||||||
|
if success:
|
||||||
|
logger.info("Discord bot auto-started (token detected)")
|
||||||
|
return # Done — stop watching
|
||||||
|
logger.warning(
|
||||||
|
"Discord watcher: start() returned False (state=%s)",
|
||||||
|
discord_bot.state.name,
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Discord auto-start failed: %s", exc)
|
||||||
205
src/dashboard/startup.py
Normal file
205
src/dashboard/startup.py
Normal file
@@ -0,0 +1,205 @@
|
|||||||
|
"""Application lifecycle management — startup, shutdown, and background task orchestration."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from fastapi import FastAPI
|
||||||
|
|
||||||
|
from config import settings
|
||||||
|
from dashboard.schedulers import (
|
||||||
|
_briefing_scheduler,
|
||||||
|
_hermes_scheduler,
|
||||||
|
_loop_qa_scheduler,
|
||||||
|
_presence_watcher,
|
||||||
|
_start_chat_integrations_background,
|
||||||
|
_thinking_scheduler,
|
||||||
|
)
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def _startup_init() -> None:
|
||||||
|
"""Validate config and enable event persistence."""
|
||||||
|
from config import validate_startup
|
||||||
|
|
||||||
|
validate_startup()
|
||||||
|
|
||||||
|
from infrastructure.events.bus import init_event_bus_persistence
|
||||||
|
|
||||||
|
init_event_bus_persistence()
|
||||||
|
|
||||||
|
from spark.engine import get_spark_engine
|
||||||
|
|
||||||
|
if get_spark_engine().enabled:
|
||||||
|
logger.info("Spark Intelligence active — event capture enabled")
|
||||||
|
|
||||||
|
|
||||||
|
def _startup_background_tasks() -> list[asyncio.Task]:
|
||||||
|
"""Spawn all recurring background tasks (non-blocking)."""
|
||||||
|
bg_tasks = [
|
||||||
|
asyncio.create_task(_briefing_scheduler()),
|
||||||
|
asyncio.create_task(_thinking_scheduler()),
|
||||||
|
asyncio.create_task(_loop_qa_scheduler()),
|
||||||
|
asyncio.create_task(_presence_watcher()),
|
||||||
|
asyncio.create_task(_start_chat_integrations_background()),
|
||||||
|
asyncio.create_task(_hermes_scheduler()),
|
||||||
|
]
|
||||||
|
try:
|
||||||
|
from timmy.paperclip import start_paperclip_poller
|
||||||
|
|
||||||
|
bg_tasks.append(asyncio.create_task(start_paperclip_poller()))
|
||||||
|
logger.info("Paperclip poller started")
|
||||||
|
except ImportError:
|
||||||
|
logger.debug("Paperclip module not found, skipping poller")
|
||||||
|
|
||||||
|
return bg_tasks
|
||||||
|
|
||||||
|
|
||||||
|
def _try_prune(label: str, prune_fn, days: int) -> None:
|
||||||
|
"""Run a prune function, log results, swallow errors."""
|
||||||
|
try:
|
||||||
|
pruned = prune_fn()
|
||||||
|
if pruned:
|
||||||
|
logger.info(
|
||||||
|
"%s auto-prune: removed %d entries older than %d days",
|
||||||
|
label,
|
||||||
|
pruned,
|
||||||
|
days,
|
||||||
|
)
|
||||||
|
except Exception as exc:
|
||||||
|
logger.debug("%s auto-prune skipped: %s", label, exc)
|
||||||
|
|
||||||
|
|
||||||
|
def _check_vault_size() -> None:
    """Warn if the memory vault exceeds the configured size limit.

    Walks every file under ``<repo_root>/memory/notes`` and logs a warning
    when the total size crosses ``settings.memory_vault_max_mb``.
    """
    try:
        notes_dir = Path(settings.repo_root) / "memory" / "notes"
        if not notes_dir.exists():
            return
        size_bytes = sum(
            entry.stat().st_size for entry in notes_dir.rglob("*") if entry.is_file()
        )
        size_mb = size_bytes / (1024 * 1024)
        limit_mb = settings.memory_vault_max_mb
        if size_mb > limit_mb:
            logger.warning(
                "Memory vault (%.1f MB) exceeds limit (%d MB) — consider archiving old notes",
                size_mb,
                limit_mb,
            )
    except Exception as exc:
        # Size auditing is advisory; filesystem/config hiccups are non-fatal.
        logger.debug("Vault size check skipped: %s", exc)
|
||||||
|
|
||||||
|
|
||||||
|
def _startup_pruning() -> None:
    """Auto-prune old memories, thoughts, and events on startup.

    Each prune is gated on its retention setting being positive and routed
    through _try_prune, so a failure in one subsystem never blocks the
    others. Finishes with an advisory vault-size check when a size limit
    is configured.
    """
    memory_days = settings.memory_prune_days
    if memory_days > 0:
        from timmy.memory_system import prune_memories

        _try_prune(
            "Memory",
            lambda: prune_memories(
                older_than_days=memory_days,
                keep_facts=settings.memory_prune_keep_facts,
            ),
            memory_days,
        )

    thought_days = settings.thoughts_prune_days
    if thought_days > 0:
        from timmy.thinking import thinking_engine

        _try_prune(
            "Thought",
            lambda: thinking_engine.prune_old_thoughts(
                keep_days=thought_days,
                keep_min=settings.thoughts_prune_keep_min,
            ),
            thought_days,
        )

    event_days = settings.events_prune_days
    if event_days > 0:
        from swarm.event_log import prune_old_events

        _try_prune(
            "Event",
            lambda: prune_old_events(
                keep_days=event_days,
                keep_min=settings.events_prune_keep_min,
            ),
            event_days,
        )

    if settings.memory_vault_max_mb > 0:
        _check_vault_size()
|
||||||
|
|
||||||
|
|
||||||
|
async def _shutdown_cleanup(
    bg_tasks: list[asyncio.Task],
    workshop_heartbeat,
) -> None:
    """Stop chat bots, MCP sessions, heartbeat, and cancel background tasks.

    Every step is guarded independently: previously an exception from an
    early ``stop()`` (or an ImportError from the bot imports) skipped all
    remaining shutdown work, leaking the background tasks. Each failure is
    now logged at debug level and the cleanup continues.

    Args:
        bg_tasks: Tasks spawned by _startup_background_tasks to cancel.
        workshop_heartbeat: Heartbeat object exposing an async ``stop()``.
    """
    try:
        from integrations.chat_bridge.vendors.discord import discord_bot

        await discord_bot.stop()
    except Exception as exc:
        logger.debug("Discord shutdown: %s", exc)

    try:
        from integrations.telegram_bot.bot import telegram_bot

        await telegram_bot.stop()
    except Exception as exc:
        logger.debug("Telegram shutdown: %s", exc)

    try:
        from timmy.mcp_tools import close_mcp_sessions

        await close_mcp_sessions()
    except Exception as exc:
        logger.debug("MCP shutdown: %s", exc)

    try:
        await workshop_heartbeat.stop()
    except Exception as exc:
        logger.debug("Workshop heartbeat shutdown: %s", exc)

    # Cancel each recurring task and wait for it to unwind; a task that
    # ended with its own error must not abort the remaining cancellations.
    for task in bg_tasks:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception as exc:
            logger.debug("Background task ended with error: %s", exc)
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager with non-blocking startup.

    Startup sequence (order matters): synchronous init (config validation,
    event persistence), then background-task spawning, then best-effort
    pruning, then the Workshop heartbeat and optional session hooks.
    Everything after ``yield`` runs at shutdown. ``app`` is required by
    FastAPI's lifespan protocol but is not used here.
    """
    _startup_init()
    bg_tasks = _startup_background_tasks()
    _startup_pruning()

    # Start Workshop presence heartbeat with WS relay; state changes are
    # pushed to connected clients via broadcast_world_state.
    from dashboard.routes.world import broadcast_world_state
    from timmy.workshop_state import WorkshopHeartbeat

    workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
    await workshop_heartbeat.start()

    # Register session logger with error capture (best-effort: startup
    # proceeds even if the recorder cannot be wired up).
    try:
        from infrastructure.error_capture import register_error_recorder
        from timmy.session_logger import get_session_logger

        register_error_recorder(get_session_logger().record_error)
    except Exception:
        logger.debug("Failed to register error recorder")

    # Mark session start for sovereignty duration tracking (best-effort).
    try:
        from timmy.sovereignty import mark_session_start

        mark_session_start()
    except Exception:
        logger.debug("Failed to mark sovereignty session start")

    logger.info("✓ Dashboard ready for requests")

    # Application serves requests while suspended here.
    yield

    await _shutdown_cleanup(bg_tasks, workshop_heartbeat)

    # Generate and commit sovereignty session report; failure is logged
    # but never blocks shutdown.
    try:
        from timmy.sovereignty import generate_and_commit_report

        await generate_and_commit_report()
    except Exception as exc:
        logger.warning("Sovereignty report generation failed at shutdown: %s", exc)
|
||||||
Reference in New Issue
Block a user