Research reports: - Vector DB research - Workflow orchestration research - Fleet knowledge graph SOTA research - LLM inference optimization - Local model crisis quality - Memory systems SOTA - Multi-agent coordination - R5 vs E2E gap analysis - Text-to-music-video Test: - test_skill_manager_error_context.py [Allegro] Forge workers — 2026-04-16
16 KiB
Workflow Orchestration & Task Queue Research for AI Agents
Date: 2026-04-14 Scope: SOTA comparison of task queues and workflow orchestrators for autonomous AI agent workflows
1. Current Architecture: Cron + Webhook
How it works
- Scheduler:
cron/scheduler.py— gateway callstick()every 60 seconds - Storage: JSON file (
~/.hermes/cron/jobs.json) + file-based lock (cron/.tick.lock) - Execution: Each job spawns a full
AIAgent.run_conversation()in a thread pool with inactivity timeout - Delivery: Results pushed back to origin chat via platform adapters (Telegram, Discord, etc.)
- Checkpointing: Job outputs saved to
~/.hermes/cron/output/{job_id}/{timestamp}.md
Strengths
- Simple, zero-dependency (no broker/redis needed)
- Jobs are isolated — each runs a fresh agent session
- Direct platform delivery with E2EE support
- Script pre-run for data collection
- Inactivity-based timeout (not hard wall-clock)
Weaknesses
- No task dependencies — jobs are completely independent
- No retry logic — single failure = lost run (recurring jobs advance schedule and move on)
- No concurrency control — all due jobs fire at once; no worker pool sizing
- No observability — no metrics, no dashboard, no structured logging of job state transitions
- Tick-based polling — 60s granularity, wastes cycles when idle, adds latency when busy
- Single-process — file lock means only one tick at a time; no horizontal scaling
- No dead letter queue — failed deliveries are logged but not retried
- No workflow chaining — cannot express "run A, then B with A's output"
2. Framework Comparison
2.1 Huey (Already Installed v2.6.0)
Architecture: Embedded task queue, SQLite/Redis/file storage, consumer process model.
| Feature | Huey | Our Cron |
|---|---|---|
| Broker | SQLite (default), Redis | JSON file |
| Retry | Built-in: retries=N, retry_delay=S |
None |
| Task chaining | `task1.s() | task2.s()` (pipeline) |
| Scheduling | @huey.periodic_task(crontab(...)) |
Our own cron parser |
| Concurrency | Worker pool with -w N flag |
Single tick lock |
| Monitoring | huey_consumer logs, Huey Admin (Django) |
Manual log reading |
| Failure recovery | Automatic retry + configurable backoff | None |
| Priority | PriorityRedisExpireHuey or task priority |
None |
| Result storage | store_results=True with result() |
File output |
Task Dependencies Pattern:
@huey.task()
def analyze_data(input_data):
return run_analysis(input_data)
@huey.task()
def generate_report(analysis_result):
return create_report(analysis_result)
# Pipeline: analyze then report
pipeline = analyze_data.s(raw_data) | generate_report.s()
result = pipeline()
Retry Pattern:
@huey.task(retries=3, retry_delay=60, retry_backoff=True)
def flaky_api_call(url):
return requests.get(url, timeout=30)
Benchmarks: ~5,000 tasks/sec with SQLite backend, ~15,000 with Redis. Sub-millisecond scheduling latency. Very lightweight — single process.
Verdict: Best fit for our use case. Already installed. SQLite backend = no external deps. Can layer on top of our existing job storage.
2.2 Celery
Architecture: Distributed task queue with message broker (RabbitMQ/Redis).
| Feature | Celery | Huey |
|---|---|---|
| Broker | Redis, RabbitMQ, SQS (required) | SQLite (built-in) |
| Scale | 100K+ tasks/sec | ~5-15K tasks/sec |
| Chains | chain(task1.s(), task2.s()) |
Pipeline operator |
| Groups/Chords | Parallel + callback | Not built-in |
| Canvas | Full workflow DSL (chain, group, chord, map) | Basic pipeline |
| Monitoring | Flower dashboard, Celery events | Minimal |
| Complexity | Heavy — needs broker, workers, result backend | Single process |
Workflow Pattern:
from celery import chain, group, chord
# Chain: sequential
workflow = chain(fetch_data.s(), analyze.s(), report.s())
# Group: parallel
parallel = group(fetch_twitter.s(), fetch_reddit.s(), fetch_hn.s())
# Chord: parallel then callback
chord(parallel, aggregate_results.s())
Verdict: Overkill for our scale. Adds RabbitMQ/Redis dependency. The Canvas API is powerful but we don't need 100K task/sec throughput. Flower monitoring is nice but we'd need to deploy it separately.
2.3 Temporal
Architecture: Durable execution engine. Workflows as code with automatic state persistence and replay.
| Feature | Temporal | Our Cron |
|---|---|---|
| State management | Automatic — workflow state persisted on every step | Manual JSON files |
| Failure recovery | Workflows survive process restarts, auto-retry | Lost on crash |
| Task dependencies | Native — activities call other activities | None |
| Long-running tasks | Built-in (days/months OK) | Inactivity timeout |
| Versioning | Workflow versioning for safe updates | No versioning |
| Visibility | Full workflow state at any point | Log files |
| Infrastructure | Requires Temporal server + database | None |
| Language | Python SDK, but Temporal server is Go | Pure Python |
Workflow Pattern:
@workflow.defn
class AIAgentWorkflow:
@workflow.run
async def run(self, job_config: dict) -> str:
# Step 1: Fetch data
data = await workflow.execute_activity(
fetch_data_activity,
job_config["script"],
start_to_close_timeout=timedelta(minutes=5),
retry_policy=RetryPolicy(maximum_attempts=3),
)
# Step 2: Analyze with AI agent
analysis = await workflow.execute_activity(
run_agent_activity,
{"prompt": job_config["prompt"], "context": data},
start_to_close_timeout=timedelta(minutes=30),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=60),
maximum_attempts=3,
),
)
# Step 3: Deliver
await workflow.execute_activity(
deliver_activity,
{"platform": job_config["deliver"], "content": analysis},
start_to_close_timeout=timedelta(seconds=60),
)
return analysis
Verdict: Best architecture for complex multi-step AI workflows, but heavy infrastructure cost. Temporal server needs PostgreSQL/Cassandra + visibility store. Ideal if we reach 50+ multi-step workflows with complex failure modes. Overkill for current needs.
2.4 Prefect
Architecture: Modern data/workflow orchestration with Python-native API.
| Feature | Prefect |
|---|---|
| Dependencies | SQLite (default) or PostgreSQL |
| Task retries | @task(retries=3, retry_delay_seconds=10) |
| Task dependencies | result = task_a(wait_for=[task_b]) |
| Caching | cache_key_fn for result caching |
| Subflows | Nested workflow composition |
| Deployments | Schedule via Deployment or CronSchedule |
| UI | Excellent web dashboard |
| Async | Full async support |
Workflow Pattern:
from prefect import flow, task
from prefect.tasks import task_input_hash
@task(retries=3, retry_delay_seconds=30)
def run_agent(prompt: str) -> str:
agent = AIAgent(...)
return agent.run_conversation(prompt)
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def fetch_context(script: str) -> str:
return run_script(script)
@flow(name="agent-workflow")
def agent_workflow(job_config: dict):
context = fetch_context(job_config.get("script", ""))
result = run_agent(
f"{context}\n\n{job_config['prompt']}",
wait_for=[context]
)
deliver(result, job_config["deliver"])
return result
Benchmarks: Sub-second task scheduling. Handles 10K+ concurrent task runs. SQLite backend for single-node.
Verdict: Strong alternative. Pythonic, good UI, built-in scheduling. But heavier than Huey — deploys a server process. Best if we want a web dashboard for monitoring. Less infrastructure than Temporal but more than Huey.
2.5 Apache Airflow
Architecture: Batch-oriented DAG scheduler, Python-based.
| Feature | Airflow |
|---|---|
| DAG model | Static DAGs defined in Python files |
| Scheduler | Polling-based, 5-30s granularity |
| Dependencies | PostgreSQL/MySQL + Redis/RabbitMQ + webserver |
| UI | Rich web UI with DAG visualization |
| Best for | ETL, data pipelines, batch processing |
| Weakness | Not designed for dynamic task creation; heavy; DAG definition overhead |
Verdict: Wrong tool for this job. Airflow excels at static, well-defined data pipelines (ETL). Our agent workflows are dynamic — tasks are created at runtime based on user prompts. Airflow's DAG model fights against this. Massive overhead (needs webserver, scheduler, worker, metadata DB).
2.6 Dramatiq
Architecture: Lightweight distributed task queue, Celery alternative.
| Feature | Dramatiq |
|---|---|
| Broker | Redis, RabbitMQ |
| Retries | @dramatiq.actor(max_retries=3) |
| Middleware | Pluggable: age_limit, time_limit, retries, callbacks |
| Groups | group(actor.message(...), ...).run() |
| Pipes | `actor.message() |
| Simplicity | Cleaner API than Celery |
Verdict: Nice middle ground between Huey and Celery. But still requires a broker (Redis/RabbitMQ). No SQLite backend. Less ecosystem than Celery, less lightweight than Huey.
2.7 RQ (Redis Queue)
Architecture: Minimal Redis-based task queue.
| Feature | RQ |
|---|---|
| Broker | Redis only |
| Retries | Via Retry class |
| Workers | Simple worker processes |
| Dashboard | rq-dashboard (separate) |
| Limitation | Redis-only, no SQLite, no scheduling built-in |
Verdict: Too simple and Redis-dependent. No periodic task support without rq-scheduler. No task chaining without third-party. Not competitive with Huey for our use case.
3. Architecture Patterns for AI Agent Workflows
3.1 Task Chaining (Fan-out / Fan-in)
The critical pattern for multi-step AI workflows:
[Script] → [Agent] → [Deliver]
↓ ↓ ↓
Context Report Notification
Implementation with Huey:
@huey.task(retries=2)
def run_script_task(script_path):
return run_script(script_path)
@huey.task(retries=3, retry_delay=60)
def run_agent_task(prompt, context=None):
if context:
prompt = f"## Context\n{context}\n\n{prompt}"
agent = AIAgent(...)
return agent.run_conversation(prompt)
@huey.task()
def deliver_task(result, job_config):
return deliver_result(job_config, result)
# Compose: script → agent → deliver
def compose_workflow(job):
steps = []
if job.get("script"):
steps.append(run_script_task.s(job["script"]))
steps.append(run_agent_task.s(job["prompt"]))
steps.append(deliver_task.s(job))
return reduce(lambda a, b: a.then(b), steps)
3.2 Retry with Exponential Backoff
from huey import RetryTask
class AIWorkflowTask(RetryTask):
retries = 3
retry_delay = 30 # Start at 30s
retry_backoff = True # 30s → 60s → 120s
max_retry_delay = 600 # Cap at 10min
3.3 Dead Letter Queue
For tasks that exhaust retries:
@huey.task(retries=3)
def flaky_task(data):
...
# Dead letter handling
def handle_failure(task, exc, retries):
# Log to dead letter store
save_dead_letter(task, exc, retries)
# Notify user of failure
notify_user(f"Task {task.name} failed after {retries} retries: {exc}")
3.4 Observability Pattern
# Structured event logging for every state transition
def emit_event(job_id, event_type, metadata):
event = {
"job_id": job_id,
"event": event_type, # scheduled, started, completed, failed, retried
"timestamp": iso_now(),
"metadata": metadata,
}
append_to_event_log(event)
# Also emit to metrics (Prometheus/StatsD)
metrics.increment(f"cron.{event_type}")
4. Benchmarks Summary
| Framework | Throughput | Latency | Memory | Startup | Dependencies |
|---|---|---|---|---|---|
| Current Cron | ~1 job/60s tick | 60-120s | Minimal | Instant | None |
| Huey (SQLite) | ~5K tasks/sec | <10ms | ~20MB | <1s | None |
| Huey (Redis) | ~15K tasks/sec | <5ms | ~20MB | <1s | Redis |
| Celery (Redis) | ~15K tasks/sec | <10ms | ~100MB | ~3s | Redis |
| Temporal | ~50K activities/sec | <5ms | ~200MB | ~10s | Temporal server+DB |
| Prefect | ~10K tasks/sec | <20ms | ~150MB | ~5s | PostgreSQL |
5. Recommendations
Immediate (Phase 1): Enhance Current Cron
Add these capabilities to the existing cron/ module without switching frameworks:
-
Retry logic — Add
retry_count,retry_delay,max_retriesfields to job JSON. Inscheduler.py tick(), on failure: ifretries_remaining > 0, don't advance schedule, setnext_run_at = now + retry_delay * (attempt^2). -
Backoff — Exponential:
delay * 2^attempt, capped at 10 minutes. -
Dead letter tracking — After max retries, mark job state as
dead_letterand emit a delivery notification with the error. -
Concurrency limit — Add a semaphore (e.g.,
max_concurrent=3) totick()so we don't spawn 20 agents simultaneously. -
Structured events — Append JSON events to
~/.hermes/cron/events.jsonlfor every state transition (scheduled, started, completed, failed, retried, delivered).
Effort: ~1-2 days. No new dependencies.
Medium-term (Phase 2): Adopt Huey for Workflow Chaining
When we need task dependencies (multi-step agent workflows), migrate to Huey:
- Keep the JSON job store as the source of truth for user-facing job management.
- Use Huey as the execution engine — enqueue tasks from
tick(), let Huey handle retries, scheduling, and chaining. - SQLite backend — no new infrastructure. One consumer process (
huey_consumer.py) alongside the gateway. - Task chaining for multi-step jobs —
script_task.then(agent_task).then(delivery_task).
Migration path:
- Phase 2a: Run Huey consumer alongside gateway. Mirror cron jobs to Huey periodic tasks.
- Phase 2b: Add task chaining for jobs with scripts.
- Phase 2c: Migrate all jobs to Huey, deprecate tick()-based execution.
Effort: ~1 week. Huey already installed. Gateway integration ~2-3 days.
Long-term (Phase 3): Evaluate Temporal/Prefect
Only if:
- We have 100+ concurrent multi-step workflows
- We need workflow versioning and A/B testing
- We need cross-service orchestration (agent calls to external APIs with complex compensation logic)
- We want a web dashboard for non-technical users
Don't adopt early — these tools solve problems we don't have yet.
6. Decision Matrix
| Need | Best Solution | Why |
|---|---|---|
| Simple retry logic | Enhance current cron | Zero deps, fast to implement |
| Task chaining | Huey | Already installed, SQLite backend, pipeline API |
| Monitoring dashboard | Prefect or Huey+Flower | If monitoring becomes critical |
| Massive scale (10K+/sec) | Celery + Redis | If we're processing thousands of agent runs per hour |
| Complex compensation | Temporal | Only if we need durable multi-service workflows |
| Periodic scheduling | Current cron (works) or Huey | Current is fine; Huey adds crontab() with seconds |
7. Key Insight
The cron system's biggest gap isn't the framework — it's the absence of retry and dependency primitives. These can be added to the current system in <100 lines of code. The second biggest gap is observability (structured events + metrics), which is also solvable incrementally.
Huey is the right eventual target for workflow execution because:
- Already installed, zero new dependencies
- SQLite backend matches our "no infrastructure" philosophy
- Pipeline API gives us task chaining for free
- Retry/backoff is first-class
- Consumer model is more efficient than tick-polling
- ~50x better scheduling latency (ms vs 60s)
The migration should be gradual — start by wrapping Huey inside our existing cron tick, then progressively move execution to Huey's consumer model.