Compare commits

..

2 Commits

Author SHA1 Message Date
kimi
3de7db770f refactor: break up think_once() into _generate_thought() and _finalize_thought()
Extracts the dedup retry loop and post-hook/broadcast pipeline into
focused helpers, reducing think_once() from 118 lines to ~20.

Fixes #513

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 19:41:28 -04:00
9f2728f529 refactor: break up lifespan() into testable helpers (#515)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 19:30:32 -04:00
3 changed files with 160 additions and 64 deletions

View File

@@ -329,21 +329,35 @@ async def _discord_token_watcher() -> None:
logger.warning("Discord auto-start failed: %s", exc)
def _init_services() -> None:
"""Validate config, enable event persistence, and init Spark engine."""
def _startup_init() -> None:
"""Validate config and enable event persistence."""
from config import validate_startup
from infrastructure.events.bus import init_event_bus_persistence
from spark.engine import get_spark_engine
validate_startup()
from infrastructure.events.bus import init_event_bus_persistence
init_event_bus_persistence()
from spark.engine import get_spark_engine
if get_spark_engine().enabled:
logger.info("Spark Intelligence active — event capture enabled")
def _auto_prune() -> None:
"""Run startup housekeeping: prune memories, thoughts, events, and check vault size."""
def _startup_background_tasks() -> list[asyncio.Task]:
"""Spawn all recurring background tasks (non-blocking)."""
return [
asyncio.create_task(_briefing_scheduler()),
asyncio.create_task(_thinking_scheduler()),
asyncio.create_task(_loop_qa_scheduler()),
asyncio.create_task(_presence_watcher()),
asyncio.create_task(_start_chat_integrations_background()),
]
def _startup_pruning() -> None:
"""Auto-prune old memories, thoughts, and events on startup."""
if settings.memory_prune_days > 0:
try:
from timmy.memory_system import prune_memories
@@ -411,22 +425,11 @@ def _auto_prune() -> None:
logger.debug("Vault size check skipped: %s", exc)
def _register_error_recorder() -> None:
"""Wire the session logger into the error-capture system."""
try:
from infrastructure.error_capture import register_error_recorder
from timmy.session_logger import get_session_logger
register_error_recorder(get_session_logger().record_error)
except Exception:
logger.debug("Failed to register error recorder")
async def _shutdown(
tasks: list[asyncio.Task],
workshop_heartbeat: object,
async def _shutdown_cleanup(
bg_tasks: list[asyncio.Task],
workshop_heartbeat,
) -> None:
"""Stop integrations, close sessions, and cancel background tasks."""
"""Stop chat bots, MCP sessions, heartbeat, and cancel background tasks."""
from integrations.chat_bridge.vendors.discord import discord_bot
from integrations.telegram_bot.bot import telegram_bot
@@ -440,9 +443,9 @@ async def _shutdown(
except Exception as exc:
logger.debug("MCP shutdown: %s", exc)
await workshop_heartbeat.stop() # type: ignore[union-attr]
await workshop_heartbeat.stop()
for task in tasks:
for task in bg_tasks:
task.cancel()
try:
await task
@@ -453,16 +456,9 @@ async def _shutdown(
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager with non-blocking startup."""
_init_services()
_auto_prune()
# Create all background tasks without waiting for them
bg_tasks = [
asyncio.create_task(_briefing_scheduler()),
asyncio.create_task(_thinking_scheduler()),
asyncio.create_task(_loop_qa_scheduler()),
asyncio.create_task(_presence_watcher()),
]
_startup_init()
bg_tasks = _startup_background_tasks()
_startup_pruning()
# Start Workshop presence heartbeat with WS relay
from dashboard.routes.world import broadcast_world_state
@@ -471,15 +467,20 @@ async def lifespan(app: FastAPI):
workshop_heartbeat = WorkshopHeartbeat(on_change=broadcast_world_state)
await workshop_heartbeat.start()
# Start chat integrations in background
bg_tasks.append(asyncio.create_task(_start_chat_integrations_background()))
# Register session logger with error capture
try:
from infrastructure.error_capture import register_error_recorder
from timmy.session_logger import get_session_logger
register_error_recorder(get_session_logger().record_error)
except Exception:
logger.debug("Failed to register error recorder")
_register_error_recorder()
logger.info("✓ Dashboard ready for requests")
yield
await _shutdown(bg_tasks, workshop_heartbeat)
await _shutdown_cleanup(bg_tasks, workshop_heartbeat)
app = FastAPI(

View File

@@ -257,6 +257,28 @@ class ThinkingEngine:
)
return None
content, seed_type = await self._generate_thought(prompt)
if not content:
return None
thought = self._store_thought(content, seed_type)
self._last_thought_id = thought.id
await self._finalize_thought(thought)
return thought
async def _generate_thought(self, prompt: str | None = None) -> tuple[str | None, str]:
"""Generate novel thought content via the dedup retry loop.
Gathers context, builds the LLM prompt, calls the agent, and
retries with a fresh seed if the result is too similar to recent
thoughts.
Returns:
A (content, seed_type) tuple. *content* is ``None`` when the
cycle should be skipped (agent failure, empty response, or
all retries exhausted).
"""
memory_context = self._load_memory_context()
system_context = self._gather_system_snapshot()
recent_thoughts = self.get_recent_thoughts(limit=5)
@@ -284,11 +306,11 @@ class ThinkingEngine:
raw = await self._call_agent(full_prompt)
except Exception as exc:
logger.warning("Thinking cycle failed (Ollama likely down): %s", exc)
return None
return None, seed_type
if not raw or not raw.strip():
logger.debug("Thinking cycle produced empty response, skipping")
return None
return None, seed_type
content = raw.strip()
@@ -308,48 +330,28 @@ class ThinkingEngine:
"Thought still repetitive after %d retries, discarding",
self._MAX_DEDUP_RETRIES + 1,
)
return None
return None, seed_type
if not content:
return None
return content, seed_type
thought = self._store_thought(content, seed_type)
self._last_thought_id = thought.id
# Post-hook: check memory status periodically
async def _finalize_thought(self, thought: Thought) -> None:
"""Run post-hooks, log, journal, and broadcast a stored thought."""
self._maybe_check_memory()
# Post-hook: distill facts from recent thoughts periodically
await self._maybe_distill()
# Post-hook: file Gitea issues for actionable observations
await self._maybe_file_issues()
# Post-hook: check workspace for new messages from Hermes
await self._check_workspace()
# Post-hook: proactive memory status audit
self._maybe_check_memory_status()
# Post-hook: update MEMORY.md with latest reflection
self._update_memory(thought)
# Log to swarm event system
self._log_event(thought)
# Append to daily journal file
self._write_journal(thought)
# Broadcast to WebSocket clients
await self._broadcast(thought)
logger.info(
"Thought [%s] (%s): %s",
thought.id[:8],
seed_type,
thought.seed_type,
thought.content[:80],
)
return thought
def get_recent_thoughts(self, limit: int = 20) -> list[Thought]:
"""Retrieve the most recent thoughts."""

View File

@@ -250,6 +250,99 @@ def test_continuity_includes_recent(tmp_path):
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# _generate_thought helper
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_generate_thought_returns_content_and_seed_type(tmp_path):
"""_generate_thought should return (content, seed_type) on success."""
from timmy.thinking import SEED_TYPES
engine = _make_engine(tmp_path)
with patch.object(engine, "_call_agent", return_value="A novel idea."):
content, seed_type = await engine._generate_thought()
assert content == "A novel idea."
assert seed_type in SEED_TYPES
@pytest.mark.asyncio
async def test_generate_thought_with_prompt(tmp_path):
"""_generate_thought(prompt=...) should use 'prompted' seed type."""
engine = _make_engine(tmp_path)
with patch.object(engine, "_call_agent", return_value="A prompted idea."):
content, seed_type = await engine._generate_thought(prompt="Reflect on joy")
assert content == "A prompted idea."
assert seed_type == "prompted"
@pytest.mark.asyncio
async def test_generate_thought_returns_none_on_agent_failure(tmp_path):
"""_generate_thought should return (None, ...) when the agent fails."""
engine = _make_engine(tmp_path)
with patch.object(engine, "_call_agent", side_effect=Exception("Ollama down")):
content, seed_type = await engine._generate_thought()
assert content is None
@pytest.mark.asyncio
async def test_generate_thought_returns_none_on_empty(tmp_path):
"""_generate_thought should return (None, ...) when agent returns empty."""
engine = _make_engine(tmp_path)
with patch.object(engine, "_call_agent", return_value=" "):
content, seed_type = await engine._generate_thought()
assert content is None
# ---------------------------------------------------------------------------
# _finalize_thought helper
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_finalize_thought_calls_all_hooks(tmp_path):
"""_finalize_thought should call all post-hooks, log, journal, and broadcast."""
engine = _make_engine(tmp_path)
thought = engine._store_thought("Test finalize.", "freeform")
with (
patch.object(engine, "_maybe_check_memory") as m_mem,
patch.object(engine, "_maybe_distill", new_callable=AsyncMock) as m_distill,
patch.object(engine, "_maybe_file_issues", new_callable=AsyncMock) as m_issues,
patch.object(engine, "_check_workspace", new_callable=AsyncMock) as m_ws,
patch.object(engine, "_maybe_check_memory_status") as m_status,
patch.object(engine, "_update_memory") as m_update,
patch.object(engine, "_log_event") as m_log,
patch.object(engine, "_write_journal") as m_journal,
patch.object(engine, "_broadcast", new_callable=AsyncMock) as m_broadcast,
):
await engine._finalize_thought(thought)
m_mem.assert_called_once()
m_distill.assert_awaited_once()
m_issues.assert_awaited_once()
m_ws.assert_awaited_once()
m_status.assert_called_once()
m_update.assert_called_once_with(thought)
m_log.assert_called_once_with(thought)
m_journal.assert_called_once_with(thought)
m_broadcast.assert_awaited_once_with(thought)
# ---------------------------------------------------------------------------
# think_once (async)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_think_once_stores_thought(tmp_path):
"""think_once should store a thought in the DB."""