diff --git a/docker-compose.yml b/docker-compose.yml index 3a31b8b6..f4106281 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -45,6 +45,8 @@ services: GROK_ENABLED: "${GROK_ENABLED:-false}" XAI_API_KEY: "${XAI_API_KEY:-}" GROK_DEFAULT_MODEL: "${GROK_DEFAULT_MODEL:-grok-3-fast}" + # Celery/Redis — background task queue + REDIS_URL: "redis://redis:6379/0" # Taskosaur API — dashboard can reach it on the internal network TASKOSAUR_API_URL: "http://taskosaur:3000/api" extra_hosts: @@ -131,6 +133,30 @@ services: retries: 5 start_period: 5s + # ── Celery Worker — background task processing ────────────────────────── + celery-worker: + build: . + image: timmy-time:latest + container_name: timmy-celery-worker + user: "0:0" + command: ["celery", "-A", "infrastructure.celery.app", "worker", "--loglevel=info", "--concurrency=2"] + volumes: + - timmy-data:/app/data + - ./src:/app/src + environment: + REDIS_URL: "redis://redis:6379/0" + OLLAMA_URL: "${OLLAMA_URL:-http://host.docker.internal:11434}" + extra_hosts: + - "host.docker.internal:host-gateway" + depends_on: + redis: + condition: service_healthy + networks: + - timmy-net + restart: unless-stopped + profiles: + - celery + # ── OpenFang — vendored agent runtime sidecar ──────────────────────────── openfang: build: diff --git a/poetry.lock b/poetry.lock index 78d7ec2c..d5f2fc40 100644 --- a/poetry.lock +++ b/poetry.lock @@ -391,6 +391,22 @@ torch = "*" tqdm = "*" transformers = "*" +[[package]] +name = "amqp" +version = "5.3.1" +description = "Low-level AMQP client for Python (fork of amqplib)." +optional = true +python-versions = ">=3.6" +groups = ["main"] +markers = "extra == \"celery\"" +files = [ + {file = "amqp-5.3.1-py3-none-any.whl", hash = "sha256:43b3319e1b4e7d1251833a93d672b4af1e40f3d632d479b98661a95f117880a2"}, + {file = "amqp-5.3.1.tar.gz", hash = "sha256:cddc00c725449522023bad949f70fff7b48f0b1ade74d170a6f10ab044739432"}, +] + +[package.dependencies] +vine = ">=5.0.0,<6.0.0" + [[package]] name = "annotated-doc" version = "0.0.4" @@ -520,6 +536,77 @@ files = [ {file = "audioop_lts-0.2.2.tar.gz", hash = "sha256:64d0c62d88e67b98a1a5e71987b7aa7b5bcffc7dcee65b635823dbdd0a8dbbd0"}, ] +[[package]] +name = "billiard" +version = "4.2.4" +description = "Python multiprocessing fork with improvements and bugfixes" +optional = true +python-versions = ">=3.7" +groups = ["main"] +markers = "extra == \"celery\"" +files = [ + {file = "billiard-4.2.4-py3-none-any.whl", hash = "sha256:525b42bdec68d2b983347ac312f892db930858495db601b5836ac24e6477cde5"}, + {file = "billiard-4.2.4.tar.gz", hash = "sha256:55f542c371209e03cd5862299b74e52e4fbcba8250ba611ad94276b369b6a85f"}, +] + +[[package]] +name = "celery" +version = "5.3.1" +description = "Distributed Task Queue." +optional = true +python-versions = ">=3.8" +groups = ["main"] +markers = "extra == \"celery\"" +files = [ + {file = "celery-5.3.1-py3-none-any.whl", hash = "sha256:27f8f3f3b58de6e0ab4f174791383bbd7445aff0471a43e99cfd77727940753f"}, + {file = "celery-5.3.1.tar.gz", hash = "sha256:f84d1c21a1520c116c2b7d26593926581191435a03aa74b77c941b93ca1c6210"}, +] + +[package.dependencies] +billiard = ">=4.1.0,<5.0" +click = ">=8.1.2,<9.0" +click-didyoumean = ">=0.3.0" +click-plugins = ">=1.1.1" +click-repl = ">=0.2.0" +kombu = ">=5.3.1,<6.0" +python-dateutil = ">=2.8.2" +redis = {version = ">=4.5.2,<4.5.5 || >4.5.5", optional = true, markers = "extra == \"redis\""} +tzdata = ">=2022.7" +vine = ">=5.0.0,<6.0" + +[package.extras] +arangodb = ["pyArango (>=2.0.1)"] +auth = ["cryptography (==41.0.1)"] +azureblockblob = ["azure-storage-blob (>=12.15.0)"] +brotli = ["brotli (>=1.0.0) ; platform_python_implementation == \"CPython\"", "brotlipy (>=0.7.0) ; platform_python_implementation == \"PyPy\""] +cassandra = ["cassandra-driver (>=3.25.0,<4)"] +consul = ["python-consul2 (==0.1.5)"] +cosmosdbsql = ["pydocumentdb (==2.3.5)"] +couchbase = ["couchbase (>=3.0.0) ; platform_python_implementation != \"PyPy\" and (platform_system != \"Windows\" or python_version < \"3.10\")"] +couchdb = ["pycouchdb (==1.14.2)"] +django = ["Django (>=2.2.28)"] +dynamodb = ["boto3 (>=1.26.143)"] +elasticsearch = ["elasticsearch (<8.0)"] +eventlet = ["eventlet (>=0.32.0) ; python_version < \"3.10\""] +gevent = ["gevent (>=1.5.0)"] +librabbitmq = ["librabbitmq (>=2.0.0) ; python_version < \"3.11\""] +memcache = ["pylibmc (==1.6.3) ; platform_system != \"Windows\""] +mongodb = ["pymongo[srv] (>=4.0.2)"] +msgpack = ["msgpack (==1.0.5)"] +pymemcache = ["python-memcached (==1.59)"] +pyro = ["pyro4 (==4.82) ; python_version < \"3.11\""] +pytest = ["pytest-celery (==0.0.0)"] +redis = ["redis (>=4.5.2,!=4.5.5)"] +s3 = ["boto3 (>=1.26.143)"] +slmq = ["softlayer-messaging (>=1.0.3)"] +solar = ["ephem (==4.1.4) ; platform_python_implementation != \"PyPy\""] +sqlalchemy = ["sqlalchemy (>=1.4.48,<2.1)"] +sqs = ["boto3 (>=1.26.143)", "kombu[sqs] (>=5.3.0)", "pycurl (>=7.43.0.5) ; sys_platform != \"win32\" and platform_python_implementation == \"CPython\"", "urllib3 (>=1.26.16)"] +tblib = ["tblib (>=1.3.0) ; python_version < \"3.8.0\"", "tblib (>=1.5.0) ; python_version >= \"3.8.0\""] +yaml = ["PyYAML (>=3.10)"] +zookeeper = ["kazoo (>=1.3.1)"] +zstd = ["zstandard (==0.21.0)"] + [[package]] name = "certifi" version = "2026.2.25" @@ -768,6 +855,61 @@ files = [ [package.dependencies] colorama = {version = "*", markers = "platform_system == \"Windows\""} +[[package]] +name = "click-didyoumean" +version = "0.3.1" +description = "Enables git-like *did-you-mean* feature in click" +optional = true +python-versions = ">=3.6.2" +groups = ["main"] +markers = "extra == \"celery\"" +files = [ + {file = "click_didyoumean-0.3.1-py3-none-any.whl", hash = "sha256:5c4bb6007cfea5f2fd6583a2fb6701a22a41eb98957e63d0fac41c10e7c3117c"}, + {file = "click_didyoumean-0.3.1.tar.gz", hash = "sha256:4f82fdff0dbe64ef8ab2279bd6aa3f6a99c3b28c05aa09cbfc07c9d7fbb5a463"}, +] + +[package.dependencies] +click = ">=7" + +[[package]] +name = "click-plugins" +version = "1.1.1.2" +description = "An extension module for click to enable registering CLI commands via setuptools entry-points." +optional = true +python-versions = "*" +groups = ["main"] +markers = "extra == \"celery\"" +files = [ + {file = "click_plugins-1.1.1.2-py2.py3-none-any.whl", hash = "sha256:008d65743833ffc1f5417bf0e78e8d2c23aab04d9745ba817bd3e71b0feb6aa6"}, + {file = "click_plugins-1.1.1.2.tar.gz", hash = "sha256:d7af3984a99d243c131aa1a828331e7630f4a88a9741fd05c927b204bcf92261"}, +] + +[package.dependencies] +click = ">=4.0" + +[package.extras] +dev = ["coveralls", "pytest (>=3.6)", "pytest-cov", "wheel"] + +[[package]] +name = "click-repl" +version = "0.3.0" +description = "REPL plugin for Click" +optional = true +python-versions = ">=3.6" +groups = ["main"] +markers = "extra == \"celery\"" +files = [ + {file = "click-repl-0.3.0.tar.gz", hash = "sha256:17849c23dba3d667247dc4defe1757fff98694e90fe37474f3feebb69ced26a9"}, + {file = "click_repl-0.3.0-py3-none-any.whl", hash = "sha256:fb7e06deb8da8de86180a33a9da97ac316751c094c6899382da7feeeeb51b812"}, +] + +[package.dependencies] +click = ">=7.0" +prompt-toolkit = ">=3.0.36" + +[package.extras] +testing = ["pytest (>=7.2.1)", "pytest-cov (>=4.0.0)", "tox (>=4.4.3)"] + [[package]] name = "colorama" version = "0.4.6" @@ -1762,6 +1904,43 @@ files = [ {file = "joblib-1.5.3.tar.gz", hash = "sha256:8561a3269e6801106863fd0d6d84bb737be9e7631e33aaed3fb9ce5953688da3"}, ] +[[package]] +name = "kombu" +version = "5.6.2" +description = "Messaging library for Python." +optional = true +python-versions = ">=3.9" +groups = ["main"] +markers = "extra == \"celery\"" +files = [ + {file = "kombu-5.6.2-py3-none-any.whl", hash = "sha256:efcfc559da324d41d61ca311b0c64965ea35b4c55cc04ee36e55386145dace93"}, + {file = "kombu-5.6.2.tar.gz", hash = "sha256:8060497058066c6f5aed7c26d7cd0d3b574990b09de842a8c5aaed0b92cc5a55"}, +] + +[package.dependencies] +amqp = ">=5.1.1,<6.0.0" +packaging = "*" +tzdata = ">=2025.2" +vine = "5.1.0" + +[package.extras] +azureservicebus = ["azure-servicebus (>=7.10.0)"] +azurestoragequeues = ["azure-identity (>=1.12.0)", "azure-storage-queue (>=12.6.0)"] +confluentkafka = ["confluent-kafka (>=2.2.0)"] +consul = ["python-consul2 (==0.1.5)"] +gcpubsub = ["google-cloud-monitoring (>=2.16.0)", "google-cloud-pubsub (>=2.18.4)", "grpcio (==1.75.1)", "protobuf (==6.32.1)"] +librabbitmq = ["librabbitmq (>=2.0.0) ; python_version < \"3.11\""] +mongodb = ["pymongo (==4.15.3)"] +msgpack = ["msgpack (==1.1.2)"] +pyro = ["pyro4 (==4.82)"] +qpid = ["qpid-python (==1.36.0.post1)", "qpid-tools (==1.36.0.post1)"] +redis = ["redis (>=4.5.2,!=4.5.5,!=5.0.2,<6.5)"] +slmq = ["softlayer_messaging (>=1.0.3)"] +sqlalchemy = ["sqlalchemy (>=1.4.48,<2.1)"] +sqs = ["boto3 (>=1.26.143)", "pycurl (>=7.43.0.5) ; sys_platform != \"win32\" and platform_python_implementation == \"CPython\"", "urllib3 (>=1.26.16)"] +yaml = ["PyYAML (>=3.10)"] +zookeeper = ["kazoo (>=2.8.0)"] + [[package]] name = "markdown-it-py" version = "4.0.0" @@ -2694,6 +2873,22 @@ files = [ [package.dependencies] tqdm = "*" +[[package]] +name = "prompt-toolkit" +version = "3.0.52" +description = "Library for building powerful interactive command lines in Python" +optional = true +python-versions = ">=3.8" +groups = ["main"] +markers = "extra == \"celery\"" +files = [ + {file = "prompt_toolkit-3.0.52-py3-none-any.whl", hash = "sha256:9aac639a3bbd33284347de5ad8d68ecc044b91a762dc39b7c21095fcd6a19955"}, + {file = "prompt_toolkit-3.0.52.tar.gz", hash = "sha256:28cde192929c8e7321de85de1ddbe736f1375148b02f2e17edd840042b1be855"}, +] + +[package.dependencies] +wcwidth = "*" + [[package]] name = "propcache" version = "0.4.1" @@ -6771,6 +6966,22 @@ psutil = ["psutil (>=3.0)"] setproctitle = ["setproctitle"] testing = ["filelock"] +[[package]] +name = "python-dateutil" +version = "2.9.0.post0" +description = "Extensions to the standard Python datetime module" +optional = true +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" +groups = ["main"] +markers = "extra == \"celery\"" +files = [ + {file = "python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3"}, + {file = "python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427"}, +] + +[package.dependencies] +six = ">=1.5" + [[package]] name = "python-dotenv" version = "1.2.1" @@ -7420,6 +7631,19 @@ files = [ {file = "shellingham-1.5.4.tar.gz", hash = "sha256:8dbca0739d487e5bd35ab3ca4b36e11c4078f3a234bfce294b0a0291363404de"}, ] +[[package]] +name = "six" +version = "1.17.0" +description = "Python 2 and 3 compatibility utilities" +optional = true +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7" +groups = ["main"] +markers = "extra == \"celery\"" +files = [ + {file = "six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274"}, + {file = "six-1.17.0.tar.gz", hash = "sha256:ff70335d468e7eb6ec65b95b99d3a2836546063f63acc5171de367e834932a81"}, +] + [[package]] name = "smmap" version = "5.0.2" @@ -7912,6 +8136,19 @@ files = [ [package.dependencies] typing-extensions = ">=4.12.0" +[[package]] +name = "tzdata" +version = "2025.3" +description = "Provider of IANA time zone data" +optional = true +python-versions = ">=2" +groups = ["main"] +markers = "extra == \"celery\"" +files = [ + {file = "tzdata-2025.3-py2.py3-none-any.whl", hash = "sha256:06a47e5700f3081aab02b2e513160914ff0694bce9947d6b76ebd6bf57cfc5d1"}, + {file = "tzdata-2025.3.tar.gz", hash = "sha256:de39c2ca5dc7b0344f2eba86f49d614019d29f060fc4ebc8a417896a620b56a7"}, +] + [[package]] name = "urllib3" version = "2.6.3" @@ -8024,6 +8261,19 @@ dev = ["Cython (>=3.0,<4.0)", "setuptools (>=60)"] docs = ["Sphinx (>=4.1.2,<4.2.0)", "sphinx_rtd_theme (>=0.5.2,<0.6.0)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)"] test = ["aiohttp (>=3.10.5)", "flake8 (>=6.1,<7.0)", "mypy (>=0.800)", "psutil", "pyOpenSSL (>=25.3.0,<25.4.0)", "pycodestyle (>=2.11.0,<2.12.0)"] +[[package]] +name = "vine" +version = "5.1.0" +description = "Python promises." +optional = true +python-versions = ">=3.6" +groups = ["main"] +markers = "extra == \"celery\"" +files = [ + {file = "vine-5.1.0-py3-none-any.whl", hash = "sha256:40fdf3c48b2cfe1c38a49e9ae2da6fda88e4794c810050a728bd7413811fb1dc"}, + {file = "vine-5.1.0.tar.gz", hash = "sha256:8b62e981d35c41049211cf62a0a1242d8c1ee9bd15bb196ce38aefd6799e61e0"}, +] + [[package]] name = "watchfiles" version = "1.1.1" @@ -8146,6 +8396,19 @@ files = [ [package.dependencies] anyio = ">=3.0.0" +[[package]] +name = "wcwidth" +version = "0.6.0" +description = "Measures the displayed width of unicode strings in a terminal" +optional = true +python-versions = ">=3.8" +groups = ["main"] +markers = "extra == \"celery\"" +files = [ + {file = "wcwidth-0.6.0-py3-none-any.whl", hash = "sha256:1a3a1e510b553315f8e146c54764f4fb6264ffad731b3d78088cdb1478ffbdad"}, + {file = "wcwidth-0.6.0.tar.gz", hash = "sha256:cdc4e4262d6ef9a1a57e018384cbeb1208d8abbc64176027e2c2455c81313159"}, +] + [[package]] name = "websocket-client" version = "1.9.0" @@ -8399,6 +8662,7 @@ propcache = ">=0.2.1" [extras] bigbrain = ["airllm"] +celery = ["celery"] dev = ["pytest", "pytest-asyncio", "pytest-cov", "pytest-randomly", "pytest-timeout", "pytest-xdist", "selenium"] discord = ["discord.py"] telegram = ["python-telegram-bot"] @@ -8407,4 +8671,4 @@ voice = ["pyttsx3"] [metadata] lock-version = "2.1" python-versions = ">=3.11,<4" -content-hash = "c4a7adbe5b16d5ea5b0d8425ca9373dfa8b20f0bc1b3a9ad90581e0a005e7acd" +content-hash = "337367c3d31512dfd2600ed1994b4c42a8c961d4eea2ced02a5492dbddd70faf" diff --git a/pyproject.toml b/pyproject.toml index ebb3c75a..176909d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ sentence-transformers = ">=2.0.0" # Local embeddings for brain numpy = ">=1.24.0" # Optional extras redis = { version = ">=5.0.0", optional = true } +celery = { version = ">=5.3.0", extras = ["redis"], optional = true } python-telegram-bot = { version = ">=21.0", optional = true } "discord.py" = { version = ">=2.3.0", optional = true } airllm = { version = ">=2.9.0", optional = true } @@ -58,6 +59,7 @@ telegram = ["python-telegram-bot"] discord = ["discord.py"] bigbrain = ["airllm"] voice = ["pyttsx3"] +celery = ["celery"] dev = ["pytest", "pytest-asyncio", "pytest-cov", "pytest-timeout", "pytest-randomly", "pytest-xdist", "selenium"] [tool.poetry.group.dev.dependencies] diff --git a/src/config.py b/src/config.py index 40bd0772..573edd8e 100644 --- a/src/config.py +++ b/src/config.py @@ -23,6 +23,10 @@ class Settings(BaseSettings): # Discord bot token — set via DISCORD_TOKEN env var or the /discord/setup endpoint discord_token: str = "" + # ── Celery / Redis ────────────────────────────────────────────────────── + redis_url: str = "redis://localhost:6379/0" + celery_enabled: bool = True + # ── AirLLM / backend selection ─────────────────────────────────────────── # "ollama" — always use Ollama (default, safe everywhere) # "airllm" — always use AirLLM (requires pip install ".[bigbrain]") diff --git a/src/dashboard/app.py b/src/dashboard/app.py index 48999733..6fd6894b 100644 --- a/src/dashboard/app.py +++ b/src/dashboard/app.py @@ -41,6 +41,7 @@ from dashboard.routes.thinking import router as thinking_router from dashboard.routes.calm import router as calm_router from dashboard.routes.swarm import router as swarm_router from dashboard.routes.system import router as system_router +from dashboard.routes.tasks_celery import router as celery_router from infrastructure.router.api import router as cascade_router # Import dedicated middleware @@ -306,6 +307,7 @@ app.include_router(thinking_router) app.include_router(calm_router) app.include_router(swarm_router) app.include_router(system_router) +app.include_router(celery_router) app.include_router(cascade_router) diff --git a/src/dashboard/routes/tasks_celery.py b/src/dashboard/routes/tasks_celery.py new file mode 100644 index 00000000..c2b6c390 --- /dev/null +++ b/src/dashboard/routes/tasks_celery.py @@ -0,0 +1,128 @@ +"""Celery task queue routes — view and manage background tasks. + +GET /celery — render the Celery task queue page +GET /celery/api — JSON list of tasks +POST /celery/api — submit a new background task +GET /celery/api/{id} — get status of a specific task +POST /celery/api/{id}/revoke — cancel a running task +""" + +import logging +from pathlib import Path + +from fastapi import APIRouter, Request +from fastapi.responses import HTMLResponse, JSONResponse +from fastapi.templating import Jinja2Templates + +logger = logging.getLogger(__name__) + +router = APIRouter(prefix="/celery", tags=["celery"]) +templates = Jinja2Templates(directory=str(Path(__file__).parent.parent / "templates")) + +# In-memory record of submitted task IDs for the dashboard display. +# In production this would use the Celery result backend directly, +# but this lightweight list keeps the UI functional without Redis. +_submitted_tasks: list[dict] = [] +_MAX_TASK_HISTORY = 100 + + +@router.get("", response_class=HTMLResponse) +async def tasks_page(request: Request): + """Render the Celery task queue page.""" + from infrastructure.celery.app import celery_app + + celery_available = celery_app is not None + tasks = _get_tasks_with_status() + return templates.TemplateResponse( + request, + "celery_tasks.html", + {"tasks": tasks, "celery_available": celery_available}, + ) + + +@router.get("/api", response_class=JSONResponse) +async def tasks_api(): + """Return task list as JSON with current status.""" + return _get_tasks_with_status() + + +@router.post("/api", response_class=JSONResponse) +async def submit_task_api(request: Request): + """Submit a new background task. + + Body: {"prompt": "...", "agent_id": "timmy"} + """ + from infrastructure.celery.client import submit_chat_task + + try: + body = await request.json() + except Exception: + return JSONResponse({"error": "Invalid JSON body"}, status_code=400) + + prompt = body.get("prompt", "").strip() + if not prompt: + return JSONResponse({"error": "prompt is required"}, status_code=400) + + agent_id = body.get("agent_id", "timmy") + task_id = submit_chat_task(prompt=prompt, agent_id=agent_id) + + if task_id is None: + return JSONResponse( + {"error": "Celery is not available. Start Redis and a Celery worker."}, + status_code=503, + ) + + task_record = { + "task_id": task_id, + "prompt": prompt[:200], + "agent_id": agent_id, + "state": "PENDING", + } + _submitted_tasks.append(task_record) + if len(_submitted_tasks) > _MAX_TASK_HISTORY: + _submitted_tasks.pop(0) + + return JSONResponse(task_record, status_code=202) + + +@router.get("/api/{task_id}", response_class=JSONResponse) +async def task_status_api(task_id: str): + """Get status of a specific task.""" + from infrastructure.celery.client import get_task_status + + status = get_task_status(task_id) + if status is None: + return JSONResponse( + {"error": "Celery is not available or task not found"}, + status_code=503, + ) + return status + + +@router.post("/api/{task_id}/revoke", response_class=JSONResponse) +async def revoke_task_api(task_id: str): + """Cancel a pending or running task.""" + from infrastructure.celery.client import revoke_task + + success = revoke_task(task_id) + if not success: + return JSONResponse( + {"error": "Failed to revoke task (Celery unavailable)"}, + status_code=503, + ) + return {"task_id": task_id, "status": "revoked"} + + +def _get_tasks_with_status() -> list[dict]: + """Enrich submitted tasks with current Celery status.""" + from infrastructure.celery.client import get_task_status + + enriched = [] + for record in reversed(_submitted_tasks): + status = get_task_status(record["task_id"]) + if status: + record_copy = {**record, **status} + else: + record_copy = {**record, "state": "UNKNOWN"} + enriched.append(record_copy) + return enriched diff --git a/src/dashboard/templates/celery_tasks.html b/src/dashboard/templates/celery_tasks.html new file mode 100644 index 00000000..eb1f563e --- /dev/null +++ b/src/dashboard/templates/celery_tasks.html @@ -0,0 +1,221 @@ +{% extends "base.html" %} + +{% block title %}Timmy Time — Background Tasks{% endblock %} + +{% block extra_styles %} + +{% endblock %} + +{% block content %} +