forked from Rockachopa/Timmy-time-dashboard
Compare commits
2 Commits
kimi/issue
...
kimi/issue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3de7db770f | ||
| 9f2728f529 |
@@ -329,21 +329,35 @@ async def _discord_token_watcher() -> None:
|
||||
logger.warning("Discord auto-start failed: %s", exc)
|
||||
|
||||
|
||||
def _init_services() -> None:
|
||||
"""Validate config, enable event persistence, and init Spark engine."""
|
||||
def _startup_init() -> None:
|
||||
"""Validate config and enable event persistence."""
|
||||
from config import validate_startup
|
||||
from infrastructure.events.bus import init_event_bus_persistence
|
||||
from spark.engine import get_spark_engine
|
||||
|
||||
validate_startup()
|
||||
|
||||
from infrastructure.events.bus import init_event_bus_persistence
|
||||
|
||||
init_event_bus_persistence()
|
||||
|
||||
from spark.engine import get_spark_engine
|
||||
|
||||
if get_spark_engine().enabled:
|
||||
logger.info("Spark Intelligence active — event capture enabled")
|
||||
|
||||
|
||||
def _auto_prune() -> None:
|
||||
"""Run startup housekeeping: prune memories, thoughts, events, and check vault size."""
|
||||
def _startup_background_tasks() -> list[asyncio.Task]:
|
||||
"""Spawn all recurring background tasks (non-blocking)."""
|
||||
return [
|
||||
asyncio.create_task(_briefing_scheduler()),
|
||||
asyncio.create_task(_thinking_scheduler()),
|
||||
asyncio.create_task(_loop_qa_scheduler()),
|
||||
asyncio.create_task(_presence_watcher()),
|
||||
asyncio.create_task(_start_chat_integrations_background()),
|
||||
]
|
||||
|
||||
|
||||
def _startup_pruning() -> None:
|
||||
"""Auto-prune old memories, thoughts, and events on startup."""
|
||||
if settings.memory_prune_days > 0:
|
||||
try:
|
||||
from timmy.memory_system import prune_memories
|
||||
@@ -411,22 +425,11 @@ def _auto_prune() -> None:
|
||||
logger.debug("Vault size check skipped: %s", exc)
|
||||
|
||||
|
||||
def _register_error_recorder() -> None:
|
||||
"""Wire the session logger into the error-capture system."""
|
||||
try:
|
||||
from infrastructure.error_capture import register_error_recorder
|
||||
from timmy.session_logger import get_session_logger
|
||||
|
||||
register_error_recorder(get_session_logger().record_error)
|
||||
except Exception:
|
||||
logger.debug("Failed to register error recorder")
|
||||
|
||||
|
||||
async def _shutdown(
|
||||
tasks: list[asyncio.Task],
|
||||
workshop_heartbeat: object,
|
||||
async def _shutdown_cleanup(
|
||||
bg_tasks: list[asyncio.Task],
|
||||
workshop_heartbeat,
|
||||
) -> None:
|
||||
"""Stop integrations, close sessions, and cancel background tasks."""
|
||||
"""Stop chat bots, MCP sessions, heartbeat, and cancel background tasks."""
|
||||
from integrations.chat_bridge.vendors.discord import discord_bot
|
||||
from integrations.telegram_bot.bot import telegram_bot
|
||||
|
||||
@@ -440,9 +443,9 @@ async def _shutdown(
|
||||
except Exception as exc:
|
||||
logger.debug("MCP shutdown: %s", exc)
|
||||
|
||||
await workshop_heartbeat.stop() # type: ignore[union-attr]
|
||||
await workshop_heartbeat.stop()
|
||||
|
||||
for task in tasks:
|
||||
for task in bg_tasks:
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
@@ -453,16 +456,9 @@ async def _shutdown(
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
"""Application lifespan manager with non-blocking startup."""
|
||||
_init_services()
|
||||
_auto_prune()
|
||||
|
||||
# Create all background tasks without waiting for them
|
||||
bg_tasks = [
|
||||
asyncio.create_task(_briefing_scheduler()),
|
||||
asyncio.create_task(_thinking_scheduler()),
|
||||
asyncio.create_task(_loop_qa_scheduler()),
|
||||
asyncio.create_task(_presence_watcher()),
|
||||
]
|
||||
_startup_init()
|
||||
bg_tasks = _startup_background_tasks()
|
||||
_startup_pruning()
|
||||
|
||||
# Start Workshop presence heartbeat with WS relay
|
||||
from dashboard.routes.world import broadcast_world_state
|
||||
@@ -471,15 +467,20 @@ async def lifespan(app: FastAPI):
|
||||
workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
|
||||
await workshop_heartbeat.start()
|
||||
|
||||
# Start chat integrations in background
|
||||
bg_tasks.append(asyncio.create_task(_start_chat_integrations_background()))
|
||||
# Register session logger with error capture
|
||||
try:
|
||||
from infrastructure.error_capture import register_error_recorder
|
||||
from timmy.session_logger import get_session_logger
|
||||
|
||||
register_error_recorder(get_session_logger().record_error)
|
||||
except Exception:
|
||||
logger.debug("Failed to register error recorder")
|
||||
|
||||
_register_error_recorder()
|
||||
logger.info("✓ Dashboard ready for requests")
|
||||
|
||||
yield
|
||||
|
||||
await _shutdown(bg_tasks, workshop_heartbeat)
|
||||
await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
|
||||
|
||||
|
||||
app = FastAPI(
|
||||
|
||||
@@ -257,6 +257,28 @@ class ThinkingEngine:
|
||||
)
|
||||
return None
|
||||
|
||||
content, seed_type = await self._generate_thought(prompt)
|
||||
if not content:
|
||||
return None
|
||||
|
||||
thought = self._store_thought(content, seed_type)
|
||||
self._last_thought_id = thought.id
|
||||
|
||||
await self._finalize_thought(thought)
|
||||
return thought
|
||||
|
||||
async def _generate_thought(self, prompt: str | None = None) -> tuple[str | None, str]:
|
||||
"""Generate novel thought content via the dedup retry loop.
|
||||
|
||||
Gathers context, builds the LLM prompt, calls the agent, and
|
||||
retries with a fresh seed if the result is too similar to recent
|
||||
thoughts.
|
||||
|
||||
Returns:
|
||||
A (content, seed_type) tuple. *content* is ``None`` when the
|
||||
cycle should be skipped (agent failure, empty response, or
|
||||
all retries exhausted).
|
||||
"""
|
||||
memory_context = self._load_memory_context()
|
||||
system_context = self._gather_system_snapshot()
|
||||
recent_thoughts = self.get_recent_thoughts(limit=5)
|
||||
@@ -284,11 +306,11 @@ class ThinkingEngine:
|
||||
raw = await self._call_agent(full_prompt)
|
||||
except Exception as exc:
|
||||
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
|
||||
return None
|
||||
return None, seed_type
|
||||
|
||||
if not raw or not raw.strip():
|
||||
logger.debug("Thinking cycle produced empty response, skipping")
|
||||
return None
|
||||
return None, seed_type
|
||||
|
||||
content = raw.strip()
|
||||
|
||||
@@ -308,48 +330,28 @@ class ThinkingEngine:
|
||||
"Thought still repetitive after %d retries, discarding",
|
||||
self._MAX_DEDUP_RETRIES + 1,
|
||||
)
|
||||
return None
|
||||
return None, seed_type
|
||||
|
||||
if not content:
|
||||
return None
|
||||
return content, seed_type
|
||||
|
||||
thought = self._store_thought(content, seed_type)
|
||||
self._last_thought_id = thought.id
|
||||
|
||||
# Post-hook: check memory status periodically
|
||||
async def _finalize_thought(self, thought: Thought) -> None:
|
||||
"""Run post-hooks, log, journal, and broadcast a stored thought."""
|
||||
self._maybe_check_memory()
|
||||
|
||||
# Post-hook: distill facts from recent thoughts periodically
|
||||
await self._maybe_distill()
|
||||
|
||||
# Post-hook: file Gitea issues for actionable observations
|
||||
await self._maybe_file_issues()
|
||||
|
||||
# Post-hook: check workspace for new messages from Hermes
|
||||
await self._check_workspace()
|
||||
|
||||
# Post-hook: proactive memory status audit
|
||||
self._maybe_check_memory_status()
|
||||
|
||||
# Post-hook: update MEMORY.md with latest reflection
|
||||
self._update_memory(thought)
|
||||
|
||||
# Log to swarm event system
|
||||
self._log_event(thought)
|
||||
|
||||
# Append to daily journal file
|
||||
self._write_journal(thought)
|
||||
|
||||
# Broadcast to WebSocket clients
|
||||
await self._broadcast(thought)
|
||||
|
||||
logger.info(
|
||||
"Thought [%s] (%s): %s",
|
||||
thought.id[:8],
|
||||
seed_type,
|
||||
thought.seed_type,
|
||||
thought.content[:80],
|
||||
)
|
||||
return thought
|
||||
|
||||
def get_recent_thoughts(self, limit: int = 20) -> list[Thought]:
|
||||
"""Retrieve the most recent thoughts."""
|
||||
|
||||
@@ -250,6 +250,99 @@ def test_continuity_includes_recent(tmp_path):
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _generate_thought helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_thought_returns_content_and_seed_type(tmp_path):
|
||||
"""_generate_thought should return (content, seed_type) on success."""
|
||||
from timmy.thinking import SEED_TYPES
|
||||
|
||||
engine = _make_engine(tmp_path)
|
||||
|
||||
with patch.object(engine, "_call_agent", return_value="A novel idea."):
|
||||
content, seed_type = await engine._generate_thought()
|
||||
|
||||
assert content == "A novel idea."
|
||||
assert seed_type in SEED_TYPES
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_thought_with_prompt(tmp_path):
|
||||
"""_generate_thought(prompt=...) should use 'prompted' seed type."""
|
||||
engine = _make_engine(tmp_path)
|
||||
|
||||
with patch.object(engine, "_call_agent", return_value="A prompted idea."):
|
||||
content, seed_type = await engine._generate_thought(prompt="Reflect on joy")
|
||||
|
||||
assert content == "A prompted idea."
|
||||
assert seed_type == "prompted"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_thought_returns_none_on_agent_failure(tmp_path):
|
||||
"""_generate_thought should return (None, ...) when the agent fails."""
|
||||
engine = _make_engine(tmp_path)
|
||||
|
||||
with patch.object(engine, "_call_agent", side_effect=Exception("Ollama down")):
|
||||
content, seed_type = await engine._generate_thought()
|
||||
|
||||
assert content is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_generate_thought_returns_none_on_empty(tmp_path):
|
||||
"""_generate_thought should return (None, ...) when agent returns empty."""
|
||||
engine = _make_engine(tmp_path)
|
||||
|
||||
with patch.object(engine, "_call_agent", return_value=" "):
|
||||
content, seed_type = await engine._generate_thought()
|
||||
|
||||
assert content is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _finalize_thought helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_finalize_thought_calls_all_hooks(tmp_path):
|
||||
"""_finalize_thought should call all post-hooks, log, journal, and broadcast."""
|
||||
engine = _make_engine(tmp_path)
|
||||
thought = engine._store_thought("Test finalize.", "freeform")
|
||||
|
||||
with (
|
||||
patch.object(engine, "_maybe_check_memory") as m_mem,
|
||||
patch.object(engine, "_maybe_distill", new_callable=AsyncMock) as m_distill,
|
||||
patch.object(engine, "_maybe_file_issues", new_callable=AsyncMock) as m_issues,
|
||||
patch.object(engine, "_check_workspace", new_callable=AsyncMock) as m_ws,
|
||||
patch.object(engine, "_maybe_check_memory_status") as m_status,
|
||||
patch.object(engine, "_update_memory") as m_update,
|
||||
patch.object(engine, "_log_event") as m_log,
|
||||
patch.object(engine, "_write_journal") as m_journal,
|
||||
patch.object(engine, "_broadcast", new_callable=AsyncMock) as m_broadcast,
|
||||
):
|
||||
await engine._finalize_thought(thought)
|
||||
|
||||
m_mem.assert_called_once()
|
||||
m_distill.assert_awaited_once()
|
||||
m_issues.assert_awaited_once()
|
||||
m_ws.assert_awaited_once()
|
||||
m_status.assert_called_once()
|
||||
m_update.assert_called_once_with(thought)
|
||||
m_log.assert_called_once_with(thought)
|
||||
m_journal.assert_called_once_with(thought)
|
||||
m_broadcast.assert_awaited_once_with(thought)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# think_once (async)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_think_once_stores_thought(tmp_path):
|
||||
"""think_once should store a thought in the DB."""
|
||||
|
||||
Reference in New Issue
Block a user