1
0

Integrate Celery task queue for background task processing (#129)

This commit is contained in:
Alexander Whitestone
2026-03-05 12:09:51 -05:00
committed by GitHub
parent b8ff534ad8
commit f2dacf4ee0
15 changed files with 1181 additions and 1 deletions

View File

@@ -0,0 +1,22 @@
"""Celery task queue integration — optional background task processing.
Gracefully degrades when Redis or Celery are unavailable.
"""
from infrastructure.celery.app import celery_app
from infrastructure.celery.client import (
get_active_tasks,
get_task_status,
revoke_task,
submit_chat_task,
submit_tool_task,
)
__all__ = [
"celery_app",
"get_active_tasks",
"get_task_status",
"revoke_task",
"submit_chat_task",
"submit_tool_task",
]

View File

@@ -0,0 +1,44 @@
"""Celery application factory with graceful degradation.
When Redis is unavailable or Celery is not installed, ``celery_app`` is set
to ``None`` and all task submissions become safe no-ops.
"""
import logging
import os
logger = logging.getLogger(__name__)
celery_app = None
_TEST_MODE = os.environ.get("TIMMY_TEST_MODE") == "1"
if not _TEST_MODE:
try:
from celery import Celery
from config import settings
if not settings.celery_enabled:
logger.info("Celery disabled via settings (celery_enabled=False)")
else:
celery_app = Celery("timmy")
celery_app.conf.update(
broker_url=settings.redis_url,
result_backend=settings.redis_url,
task_serializer="json",
result_serializer="json",
accept_content=["json"],
result_expires=3600,
task_track_started=True,
worker_hijack_root_logger=False,
)
# Autodiscover tasks in the celery package
celery_app.autodiscover_tasks(["infrastructure.celery"])
logger.info("Celery app configured (broker=%s)", settings.redis_url)
except ImportError:
logger.info("Celery not installed — background tasks disabled")
except Exception as exc:
logger.warning("Celery setup failed (Redis down?): %s", exc)
celery_app = None
else:
logger.debug("Celery disabled in test mode")

View File

@@ -0,0 +1,150 @@
"""Client API for submitting and querying Celery tasks.
All functions gracefully return None/empty when Celery is unavailable.
"""
import logging
from typing import Any
logger = logging.getLogger(__name__)
def _get_app():
"""Get the Celery app instance."""
from infrastructure.celery.app import celery_app
return celery_app
def submit_chat_task(
prompt: str,
agent_id: str = "timmy",
session_id: str = "celery",
) -> str | None:
"""Submit a chat task to the Celery queue.
Returns:
Task ID string, or None if Celery is unavailable.
"""
app = _get_app()
if app is None:
logger.debug("Celery unavailable — chat task not submitted")
return None
try:
from infrastructure.celery.tasks import run_agent_chat
result = run_agent_chat.delay(prompt, agent_id=agent_id, session_id=session_id)
logger.info("Submitted chat task %s for %s", result.id, agent_id)
return result.id
except Exception as exc:
logger.warning("Failed to submit chat task: %s", exc)
return None
def submit_tool_task(
tool_name: str,
kwargs: dict | None = None,
agent_id: str = "timmy",
) -> str | None:
"""Submit a tool execution task to the Celery queue.
Returns:
Task ID string, or None if Celery is unavailable.
"""
app = _get_app()
if app is None:
logger.debug("Celery unavailable — tool task not submitted")
return None
try:
from infrastructure.celery.tasks import execute_tool
result = execute_tool.delay(tool_name, kwargs=kwargs or {}, agent_id=agent_id)
logger.info("Submitted tool task %s: %s", result.id, tool_name)
return result.id
except Exception as exc:
logger.warning("Failed to submit tool task: %s", exc)
return None
def get_task_status(task_id: str) -> dict[str, Any] | None:
"""Get status of a Celery task.
Returns:
Dict with state, result, etc., or None if Celery unavailable.
"""
app = _get_app()
if app is None:
return None
try:
result = app.AsyncResult(task_id)
data: dict[str, Any] = {
"task_id": task_id,
"state": result.state,
"ready": result.ready(),
}
if result.ready():
data["result"] = result.result
if result.failed():
data["error"] = str(result.result)
return data
except Exception as exc:
logger.warning("Failed to get task status: %s", exc)
return None
def get_active_tasks() -> list[dict[str, Any]]:
"""List currently active/reserved tasks.
Returns:
List of task dicts, or empty list if Celery unavailable.
"""
app = _get_app()
if app is None:
return []
try:
inspector = app.control.inspect()
active = inspector.active() or {}
reserved = inspector.reserved() or {}
tasks = []
for worker_tasks in active.values():
for t in worker_tasks:
tasks.append({
"task_id": t.get("id"),
"name": t.get("name"),
"state": "STARTED",
"args": t.get("args"),
"worker": t.get("hostname"),
})
for worker_tasks in reserved.values():
for t in worker_tasks:
tasks.append({
"task_id": t.get("id"),
"name": t.get("name"),
"state": "PENDING",
"args": t.get("args"),
})
return tasks
except Exception as exc:
logger.warning("Failed to inspect active tasks: %s", exc)
return []
def revoke_task(task_id: str) -> bool:
"""Revoke (cancel) a pending or running task.
Returns:
True if revoke was sent, False if Celery unavailable.
"""
app = _get_app()
if app is None:
return False
try:
app.control.revoke(task_id, terminate=True)
logger.info("Revoked task %s", task_id)
return True
except Exception as exc:
logger.warning("Failed to revoke task: %s", exc)
return False

View File

@@ -0,0 +1,147 @@
"""Celery task definitions for background processing.
Tasks:
- run_agent_chat: Execute a chat prompt via Timmy's session
- execute_tool: Run a specific tool function asynchronously
- run_thinking_cycle: Execute one thinking engine cycle
"""
import logging
from datetime import datetime, timezone
logger = logging.getLogger(__name__)
def _get_app():
"""Get the Celery app (lazy import to avoid circular deps)."""
from infrastructure.celery.app import celery_app
return celery_app
_app = _get_app()
if _app is not None:
@_app.task(bind=True, name="infrastructure.celery.tasks.run_agent_chat")
def run_agent_chat(self, prompt, agent_id="timmy", session_id="celery"):
"""Execute a chat prompt against Timmy's agent session.
Args:
prompt: The message to send to the agent.
agent_id: Agent identifier (currently only "timmy" supported).
session_id: Chat session ID for context continuity.
Returns:
Dict with agent_id, response, and completed_at.
"""
logger.info("Celery task [%s]: chat prompt for %s", self.request.id, agent_id)
try:
from timmy.session import chat
response = chat(prompt, session_id=session_id)
result = {
"agent_id": agent_id,
"prompt": prompt[:200],
"response": response,
"completed_at": datetime.now(timezone.utc).isoformat(),
}
_log_completion("run_agent_chat", self.request.id, success=True)
return result
except Exception as exc:
logger.error("Celery chat task failed: %s", exc)
_log_completion("run_agent_chat", self.request.id, success=False)
return {
"agent_id": agent_id,
"prompt": prompt[:200],
"error": str(exc),
"completed_at": datetime.now(timezone.utc).isoformat(),
}
@_app.task(bind=True, name="infrastructure.celery.tasks.execute_tool")
def execute_tool(self, tool_name, kwargs=None, agent_id="timmy"):
"""Run a specific tool function asynchronously.
Args:
tool_name: Name of the tool to execute (e.g., "web_search").
kwargs: Dict of keyword arguments for the tool.
agent_id: Agent requesting the tool execution.
Returns:
Dict with tool_name, result, and success flag.
"""
kwargs = kwargs or {}
logger.info("Celery task [%s]: tool=%s for %s", self.request.id, tool_name, agent_id)
try:
from timmy.tools import create_full_toolkit
toolkit = create_full_toolkit()
if toolkit is None:
return {"tool_name": tool_name, "error": "Toolkit unavailable", "success": False}
# Find and call the tool function
tool_fn = None
for fn in toolkit.functions.values():
if fn.name == tool_name:
tool_fn = fn
break
if tool_fn is None:
return {"tool_name": tool_name, "error": f"Tool '{tool_name}' not found", "success": False}
result = tool_fn.entrypoint(**kwargs)
_log_completion("execute_tool", self.request.id, success=True)
return {
"tool_name": tool_name,
"result": str(result)[:5000],
"success": True,
"completed_at": datetime.now(timezone.utc).isoformat(),
}
except Exception as exc:
logger.error("Celery tool task failed: %s", exc)
_log_completion("execute_tool", self.request.id, success=False)
return {"tool_name": tool_name, "error": str(exc), "success": False}
@_app.task(bind=True, name="infrastructure.celery.tasks.run_thinking_cycle")
def run_thinking_cycle(self):
"""Execute one thinking engine cycle in the background.
Returns:
Dict with thought data or None if thinking is disabled.
"""
import asyncio
logger.info("Celery task [%s]: thinking cycle", self.request.id)
try:
from timmy.thinking import thinking_engine
# Run the async think_once in a sync context
loop = asyncio.new_event_loop()
try:
thought = loop.run_until_complete(thinking_engine.think_once())
finally:
loop.close()
if thought:
_log_completion("run_thinking_cycle", self.request.id, success=True)
return {
"thought_id": thought.id,
"content": thought.content,
"seed_type": thought.seed_type,
"created_at": thought.created_at,
}
return None
except Exception as exc:
logger.error("Celery thinking task failed: %s", exc)
_log_completion("run_thinking_cycle", self.request.id, success=False)
return None
def _log_completion(task_name, task_id, success=True):
"""Log task completion to the Spark engine if available."""
try:
from spark.engine import spark_engine
spark_engine.on_tool_executed(
agent_id="celery-worker",
tool_name=f"celery.{task_name}",
success=success,
)
except Exception:
pass