forked from Rockachopa/Timmy-time-dashboard
279 lines
9.3 KiB
Python
279 lines
9.3 KiB
Python
"""Background scheduler coroutines for the Timmy dashboard."""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
from config import settings
|
|
from timmy.workshop_state import PRESENCE_FILE
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
__all__ = [
|
|
"_BRIEFING_INTERVAL_HOURS",
|
|
"_briefing_scheduler",
|
|
"_thinking_scheduler",
|
|
"_hermes_scheduler",
|
|
"_loop_qa_scheduler",
|
|
"_PRESENCE_POLL_SECONDS",
|
|
"_PRESENCE_INITIAL_DELAY",
|
|
"_SYNTHESIZED_STATE",
|
|
"_presence_watcher",
|
|
"_start_chat_integrations_background",
|
|
"_discord_token_watcher",
|
|
]
|
|
|
|
_BRIEFING_INTERVAL_HOURS = 6
|
|
|
|
|
|
async def _briefing_scheduler() -> None:
|
|
"""Background task: regenerate Timmy's briefing every 6 hours."""
|
|
from infrastructure.notifications.push import notify_briefing_ready
|
|
from timmy.briefing import engine as briefing_engine
|
|
|
|
await asyncio.sleep(2)
|
|
|
|
while True:
|
|
try:
|
|
if briefing_engine.needs_refresh():
|
|
logger.info("Generating morning briefing…")
|
|
briefing = briefing_engine.generate()
|
|
await notify_briefing_ready(briefing)
|
|
else:
|
|
logger.info("Briefing is fresh; skipping generation.")
|
|
except Exception as exc:
|
|
logger.error("Briefing scheduler error: %s", exc)
|
|
|
|
await asyncio.sleep(_BRIEFING_INTERVAL_HOURS * 3600)
|
|
|
|
|
|
async def _thinking_scheduler() -> None:
|
|
"""Background task: execute Timmy's thinking cycle every N seconds."""
|
|
from timmy.thinking import thinking_engine
|
|
|
|
await asyncio.sleep(5) # Stagger after briefing scheduler
|
|
|
|
while True:
|
|
try:
|
|
if settings.thinking_enabled:
|
|
await asyncio.wait_for(
|
|
thinking_engine.think_once(),
|
|
timeout=settings.thinking_timeout_seconds,
|
|
)
|
|
except TimeoutError:
|
|
logger.warning(
|
|
"Thinking cycle timed out after %ds — Ollama may be unresponsive",
|
|
settings.thinking_timeout_seconds,
|
|
)
|
|
except asyncio.CancelledError:
|
|
raise
|
|
except Exception as exc:
|
|
logger.error("Thinking scheduler error: %s", exc)
|
|
|
|
await asyncio.sleep(settings.thinking_interval_seconds)
|
|
|
|
|
|
async def _hermes_scheduler() -> None:
|
|
"""Background task: Hermes system health monitor, runs every 5 minutes.
|
|
|
|
Checks memory, disk, Ollama, processes, and network.
|
|
Auto-resolves what it can; fires push notifications when human help is needed.
|
|
"""
|
|
from infrastructure.hermes.monitor import hermes_monitor
|
|
|
|
await asyncio.sleep(20) # Stagger after other schedulers
|
|
|
|
while True:
|
|
try:
|
|
if settings.hermes_enabled:
|
|
report = await hermes_monitor.run_cycle()
|
|
if report.has_issues:
|
|
logger.warning(
|
|
"Hermes health issues detected — overall: %s",
|
|
report.overall.value,
|
|
)
|
|
except asyncio.CancelledError:
|
|
raise
|
|
except Exception as exc:
|
|
logger.error("Hermes scheduler error: %s", exc)
|
|
|
|
await asyncio.sleep(settings.hermes_interval_seconds)
|
|
|
|
|
|
async def _loop_qa_scheduler() -> None:
|
|
"""Background task: run capability self-tests on a separate timer.
|
|
|
|
Independent of the thinking loop — runs every N thinking ticks
|
|
to probe subsystems and detect degradation.
|
|
"""
|
|
from timmy.loop_qa import loop_qa_orchestrator
|
|
|
|
await asyncio.sleep(10) # Stagger after thinking scheduler
|
|
|
|
while True:
|
|
try:
|
|
if settings.loop_qa_enabled:
|
|
result = await asyncio.wait_for(
|
|
loop_qa_orchestrator.run_next_test(),
|
|
timeout=settings.thinking_timeout_seconds,
|
|
)
|
|
if result:
|
|
status = "PASS" if result["success"] else "FAIL"
|
|
logger.info(
|
|
"Loop QA [%s]: %s — %s",
|
|
result["capability"],
|
|
status,
|
|
result.get("details", "")[:80],
|
|
)
|
|
except TimeoutError:
|
|
logger.warning(
|
|
"Loop QA test timed out after %ds",
|
|
settings.thinking_timeout_seconds,
|
|
)
|
|
except asyncio.CancelledError:
|
|
raise
|
|
except Exception as exc:
|
|
logger.error("Loop QA scheduler error: %s", exc)
|
|
|
|
interval = settings.thinking_interval_seconds * settings.loop_qa_interval_ticks
|
|
await asyncio.sleep(interval)
|
|
|
|
|
|
_PRESENCE_POLL_SECONDS = 30
|
|
_PRESENCE_INITIAL_DELAY = 3
|
|
|
|
_SYNTHESIZED_STATE: dict = {
|
|
"version": 1,
|
|
"liveness": None,
|
|
"current_focus": "",
|
|
"mood": "idle",
|
|
"active_threads": [],
|
|
"recent_events": [],
|
|
"concerns": [],
|
|
}
|
|
|
|
|
|
async def _presence_watcher() -> None:
|
|
"""Background task: watch ~/.timmy/presence.json and broadcast changes via WS.
|
|
|
|
Polls the file every 30 seconds (matching Timmy's write cadence).
|
|
If the file doesn't exist, broadcasts a synthesised idle state.
|
|
"""
|
|
from infrastructure.ws_manager.handler import ws_manager as ws_mgr
|
|
|
|
await asyncio.sleep(_PRESENCE_INITIAL_DELAY) # Stagger after other schedulers
|
|
|
|
last_mtime: float = 0.0
|
|
|
|
while True:
|
|
try:
|
|
if PRESENCE_FILE.exists():
|
|
mtime = PRESENCE_FILE.stat().st_mtime
|
|
if mtime != last_mtime:
|
|
last_mtime = mtime
|
|
raw = await asyncio.to_thread(PRESENCE_FILE.read_text)
|
|
state = json.loads(raw)
|
|
await ws_mgr.broadcast("timmy_state", state)
|
|
else:
|
|
# File absent — broadcast synthesised state once per cycle
|
|
if last_mtime != -1.0:
|
|
last_mtime = -1.0
|
|
await ws_mgr.broadcast("timmy_state", _SYNTHESIZED_STATE)
|
|
except json.JSONDecodeError as exc:
|
|
logger.warning("presence.json parse error: %s", exc)
|
|
except Exception as exc:
|
|
logger.warning("Presence watcher error: %s", exc)
|
|
|
|
await asyncio.sleep(_PRESENCE_POLL_SECONDS)
|
|
|
|
|
|
async def _start_chat_integrations_background() -> None:
|
|
"""Background task: start chat integrations without blocking startup."""
|
|
from integrations.chat_bridge.registry import platform_registry
|
|
from integrations.chat_bridge.vendors.discord import discord_bot
|
|
from integrations.telegram_bot.bot import telegram_bot
|
|
|
|
await asyncio.sleep(0.5)
|
|
|
|
# Register Discord in the platform registry
|
|
platform_registry.register(discord_bot)
|
|
|
|
if settings.telegram_token:
|
|
try:
|
|
await telegram_bot.start()
|
|
logger.info("Telegram bot started")
|
|
except Exception as exc:
|
|
logger.warning("Failed to start Telegram bot: %s", exc)
|
|
else:
|
|
logger.debug("Telegram: no token configured, skipping")
|
|
|
|
if settings.discord_token or discord_bot.load_token():
|
|
try:
|
|
await discord_bot.start()
|
|
logger.info("Discord bot started")
|
|
except Exception as exc:
|
|
logger.warning("Failed to start Discord bot: %s", exc)
|
|
else:
|
|
logger.debug("Discord: no token configured, skipping")
|
|
|
|
# If Discord isn't connected yet, start a watcher that polls for the
|
|
# token to appear in the environment or .env file.
|
|
if discord_bot.state.name != "CONNECTED":
|
|
asyncio.create_task(_discord_token_watcher())
|
|
|
|
|
|
async def _discord_token_watcher() -> None:
|
|
"""Poll for DISCORD_TOKEN appearing in env or .env and auto-start Discord bot."""
|
|
from integrations.chat_bridge.vendors.discord import discord_bot
|
|
|
|
# Don't poll if discord.py isn't even installed
|
|
try:
|
|
import discord as _discord_check # noqa: F401
|
|
except ImportError:
|
|
logger.debug("discord.py not installed — token watcher exiting")
|
|
return
|
|
|
|
while True:
|
|
await asyncio.sleep(30)
|
|
|
|
if discord_bot.state.name == "CONNECTED":
|
|
return # Already running — stop watching
|
|
|
|
# 1. Check settings (pydantic-settings reads env on instantiation;
|
|
# hot-reload is handled by re-reading .env below)
|
|
token = settings.discord_token
|
|
|
|
# 2. Re-read .env file for hot-reload
|
|
if not token:
|
|
try:
|
|
from dotenv import dotenv_values
|
|
|
|
env_path = Path(settings.repo_root) / ".env"
|
|
if env_path.exists():
|
|
vals = dotenv_values(env_path)
|
|
token = vals.get("DISCORD_TOKEN", "")
|
|
except ImportError:
|
|
pass # python-dotenv not installed
|
|
|
|
# 3. Check state file (written by /discord/setup)
|
|
if not token:
|
|
token = discord_bot.load_token() or ""
|
|
|
|
if token:
|
|
try:
|
|
logger.info(
|
|
"Discord watcher: token found, attempting start (state=%s)",
|
|
discord_bot.state.name,
|
|
)
|
|
success = await discord_bot.start(token=token)
|
|
if success:
|
|
logger.info("Discord bot auto-started (token detected)")
|
|
return # Done — stop watching
|
|
logger.warning(
|
|
"Discord watcher: start() returned False (state=%s)",
|
|
discord_bot.state.name,
|
|
)
|
|
except Exception as exc:
|
|
logger.warning("Discord auto-start failed: %s", exc)
|