hermes-agent/.plans/streaming-support.md

Streaming LLM Response Support for Hermes Agent

Overview

Add token-by-token streaming of LLM responses across all platforms. When enabled, users see the response typing out live instead of waiting for the full generation. Streaming is opt-in via config, defaults to off, and all existing non-streaming code paths remain intact as the default.

Design Principles

  1. Feature-flagged: streaming.enabled: true in config.yaml. Off by default. When off, all existing code paths are unchanged — zero risk to current behavior.
  2. Callback-based: A simple stream_callback(text_delta: str) function injected into AIAgent. The agent doesn't know or care what the consumer does with tokens.
  3. Graceful degradation: If the provider doesn't support streaming, or streaming fails for any reason, silently fall back to the non-streaming path.
  4. Platform-agnostic core: The streaming mechanism in AIAgent works the same regardless of whether the consumer is CLI, Telegram, Discord, or the API server.

Architecture

                          stream_callback(delta)
                                    │
  ┌─────────────┐    ┌──────────────▼──────────────┐
  │  LLM API    │    │        queue.Queue()        │
  │  (stream)   │───►│ thread-safe bridge between  │
  │             │    │   agent thread & consumer   │
  └─────────────┘    └──────────────┬──────────────┘
                                    │
                     ┌──────────────┼──────────────┐
                     │              │              │
               ┌─────▼─────┐  ┌─────▼─────┐  ┌─────▼─────┐
               │    CLI    │  │  Gateway  │  │ API Server│
               │ print to  │  │ edit msg  │  │ SSE event │
               │ terminal  │  │ on Tg/Dc  │  │ to client │
               └───────────┘  └───────────┘  └───────────┘

The agent runs in a thread. The callback puts tokens into a thread-safe queue. Each consumer reads the queue in its own context (async task, main thread, etc.).
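
This bridge can be sketched as a small callback/queue pair (illustrative only; `make_stream_bridge` and the surrounding names are hypothetical, not from the codebase):

```python
import queue
import threading

def make_stream_bridge():
    """Create a (callback, reader) pair bridging agent thread and consumer.

    The callback is handed to AIAgent; the reader generator yields deltas on
    the consumer side and stops at the None end-of-stream sentinel.
    """
    q = queue.Queue()

    def stream_callback(delta):
        q.put(delta)  # Queue.put is thread-safe; called from the agent thread

    def read_deltas():
        while True:
            delta = q.get()
            if delta is None:  # end-of-stream signal (see Phase 1e)
                return
            yield delta

    return stream_callback, read_deltas

# Simulate the agent thread emitting two tokens, then the sentinel
cb, reader = make_stream_bridge()
t = threading.Thread(target=lambda: [cb(tok) for tok in ["Hel", "lo", None]])
t.start()
print("".join(reader()))  # -> Hello
t.join()
```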


Configuration

config.yaml

streaming:
  enabled: false          # Master switch. Default off.
  # Per-platform overrides (optional):
  # cli: true             # Override for CLI only
  # telegram: true        # Override for Telegram only
  # discord: false        # Keep Discord non-streaming
  # api_server: true      # Override for API server

Environment variables

HERMES_STREAMING_ENABLED=true    # Master switch via env

How the flag is read

  • CLI: load_cli_config() reads streaming.enabled, sets env var. AIAgent checks at init time.
  • Gateway: _run_agent() reads config, decides whether to pass stream_callback to the AIAgent constructor.
  • API server: For Chat Completions stream=true requests, always uses streaming regardless of config (the client is explicitly requesting it). For non-stream requests, uses config.

Precedence

  1. API server: client's stream field overrides everything
  2. Per-platform config override (e.g., streaming.telegram: true)
  3. Master streaming.enabled flag
  4. Default: off
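
A minimal resolver for this precedence order might look like the following (hypothetical helper; the real config plumbing may differ):

```python
def resolve_streaming(cfg, platform, client_stream=None):
    """Resolve the effective streaming flag per the precedence above.

    cfg is the `streaming:` section as a dict; client_stream is the API
    client's `stream` field (None outside the API server).
    """
    if client_stream is not None:            # 1. API client's stream field
        return bool(client_stream)
    override = cfg.get(platform)             # 2. per-platform override
    if override is not None:
        return bool(override)
    return bool(cfg.get("enabled", False))   # 3. master switch, 4. default off

cfg = {"enabled": False, "telegram": True}
print(resolve_streaming(cfg, "telegram"))          # -> True
print(resolve_streaming(cfg, "discord"))           # -> False
print(resolve_streaming(cfg, "api_server", True))  # -> True
```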

Implementation Plan

Phase 1: Core streaming infrastructure in AIAgent

File: run_agent.py

1a. Add stream_callback parameter to init (~5 lines)

def __init__(self, ..., stream_callback: Optional[Callable[[Optional[str]], None]] = None, ...):
    # Called with text deltas during streaming, then once with None at end of stream.
    self.stream_callback = stream_callback

No other init changes. The callback is optional — when None, everything works exactly as before.

1b. Add _run_streaming_chat_completion() method (~65 lines)

New method for Chat Completions API streaming:

def _run_streaming_chat_completion(self, api_kwargs: dict):
    """Stream a chat completion, emitting text tokens via stream_callback.
    
    Returns a fake response object compatible with the non-streaming code path.
    Falls back to non-streaming on any error.
    """
    stream_kwargs = dict(api_kwargs)
    stream_kwargs["stream"] = True
    stream_kwargs["stream_options"] = {"include_usage": True}
    
    accumulated_content = []
    accumulated_tool_calls = {}  # index -> {id, name, arguments}
    final_usage = None
    
    try:
        stream = self.client.chat.completions.create(**stream_kwargs)
        
        for chunk in stream:
            if not chunk.choices:
                # Usage-only chunk (final)
                if chunk.usage:
                    final_usage = chunk.usage
                continue
            
            delta = chunk.choices[0].delta
            
            # Text content — emit via callback
            if delta.content:
                accumulated_content.append(delta.content)
                if self.stream_callback:
                    try:
                        self.stream_callback(delta.content)
                    except Exception:
                        pass
            
            # Tool call deltas — accumulate silently
            if delta.tool_calls:
                for tc_delta in delta.tool_calls:
                    idx = tc_delta.index
                    if idx not in accumulated_tool_calls:
                        accumulated_tool_calls[idx] = {
                            "id": tc_delta.id or "",
                            "name": "", "arguments": ""
                        }
                    if tc_delta.function:
                        if tc_delta.function.name:
                            accumulated_tool_calls[idx]["name"] = tc_delta.function.name
                        if tc_delta.function.arguments:
                            accumulated_tool_calls[idx]["arguments"] += tc_delta.function.arguments
        
        # Build a fake response compatible with the existing code path
        # (assumes `from types import SimpleNamespace` at module top)
        tool_calls = []
        for idx in sorted(accumulated_tool_calls):
            tc = accumulated_tool_calls[idx]
            if tc["name"]:
                tool_calls.append(SimpleNamespace(
                    id=tc["id"], type="function",
                    function=SimpleNamespace(name=tc["name"], arguments=tc["arguments"]),
                ))
        
        return SimpleNamespace(
            choices=[SimpleNamespace(
                message=SimpleNamespace(
                    content="".join(accumulated_content) or "",
                    tool_calls=tool_calls or None,
                    role="assistant",
                ),
                finish_reason="tool_calls" if tool_calls else "stop",
            )],
            usage=final_usage,
            model=self.model,
        )
    
    except Exception as e:
        logger.debug("Streaming failed, falling back to non-streaming: %s", e)
        return self.client.chat.completions.create(**api_kwargs)

1c. Modify _run_codex_stream() for Responses API (~10 lines)

The method already iterates the stream. Add callback emission:

def _run_codex_stream(self, api_kwargs: dict):
    with self.client.responses.stream(**api_kwargs) as stream:
        for event in stream:
            # Emit text deltas if streaming callback is set
            if self.stream_callback and hasattr(event, 'type'):
                if event.type == 'response.output_text.delta':
                    try:
                        self.stream_callback(event.delta)
                    except Exception:
                        pass
        return stream.get_final_response()

1d. Modify _interruptible_api_call() (~5 lines)

Add the streaming branch:

def _call():
    try:
        if self.api_mode == "codex_responses":
            result["response"] = self._run_codex_stream(api_kwargs)
        elif self.stream_callback is not None:
            result["response"] = self._run_streaming_chat_completion(api_kwargs)
        else:
            result["response"] = self.client.chat.completions.create(**api_kwargs)
    except Exception as e:
        result["error"] = e

1e. Signal end-of-stream to consumers (~5 lines)

After the API call returns, signal the callback that streaming is done so consumers can finalize (remove cursor, close SSE, etc.):

# In run_conversation(), after _interruptible_api_call returns:
if self.stream_callback:
    try:
        self.stream_callback(None)  # None = end of stream signal
    except Exception:
        pass

Consumers check: if delta is None: finalize()

Tests for Phase 1: (~150 lines)

  • Test _run_streaming_chat_completion with mocked stream
  • Test fallback to non-streaming on error
  • Test tool_call accumulation during streaming
  • Test stream_callback receives correct deltas
  • Test None signal at end of stream
  • Test streaming disabled when callback is None

Phase 2: Gateway consumers (Telegram, Discord, etc.)

File: gateway/run.py

2a. Read streaming config (~15 lines)

In _run_agent(), before creating the AIAgent:

# Read streaming config
_streaming_enabled = False
try:
    # Check per-platform override first
    platform_key = source.platform.value if source.platform else ""
    _stream_cfg = {}  # loaded from config.yaml streaming section
    if _stream_cfg.get(platform_key) is not None:
        _streaming_enabled = bool(_stream_cfg[platform_key])
    else:
        _streaming_enabled = bool(_stream_cfg.get("enabled", False))
except Exception:
    pass
# Env var override
if os.getenv("HERMES_STREAMING_ENABLED", "").lower() in ("true", "1", "yes"):
    _streaming_enabled = True

2b. Set up queue + callback (~15 lines)

_stream_q = None
_stream_done = None
_stream_msg_id = [None]  # mutable ref for the async task

if _streaming_enabled:
    import queue
    _stream_q = queue.Queue()
    _stream_done = threading.Event()
    
    def _on_token(delta):
        if delta is None:
            _stream_done.set()
        else:
            _stream_q.put(delta)

Pass stream_callback=_on_token to the AIAgent constructor.

2c. Telegram/Discord stream preview task (~50 lines)

async def stream_preview():
    """Progressively edit a message with streaming tokens."""
    if not _stream_q:
        return
    adapter = self.adapters.get(source.platform)
    if not adapter:
        return
    
    accumulated = []
    token_count = 0
    last_edit = 0.0
    MIN_TOKENS = 20          # Don't show until enough context
    EDIT_INTERVAL = 1.5      # Respect Telegram rate limits
    
    try:
        while not _stream_done.is_set():
            try:
                chunk = _stream_q.get(timeout=0.1)
                accumulated.append(chunk)
                token_count += 1
            except queue.Empty:
                continue
            
            now = time.monotonic()
            if token_count >= MIN_TOKENS and (now - last_edit) >= EDIT_INTERVAL:
                preview = "".join(accumulated) + " ▌"
                if _stream_msg_id[0] is None:
                    r = await adapter.send(
                        chat_id=source.chat_id,
                        content=preview,
                        metadata=_thread_metadata,
                    )
                    if r.success and r.message_id:
                        _stream_msg_id[0] = r.message_id
                else:
                    await adapter.edit_message(
                        chat_id=source.chat_id,
                        message_id=_stream_msg_id[0],
                        content=preview,
                    )
                last_edit = now
        
        # Drain remaining tokens
        while not _stream_q.empty():
            accumulated.append(_stream_q.get_nowait())
        
        # Final edit — remove cursor, show complete text
        if _stream_msg_id[0] and accumulated:
            await adapter.edit_message(
                chat_id=source.chat_id,
                message_id=_stream_msg_id[0],
                content="".join(accumulated),
            )
    
    except asyncio.CancelledError:
        # Clean up on cancel
        if _stream_msg_id[0] and accumulated:
            try:
                await adapter.edit_message(
                    chat_id=source.chat_id,
                    message_id=_stream_msg_id[0],
                    content="".join(accumulated),
                )
            except Exception:
                pass
    except Exception as e:
        logger.debug("stream_preview error: %s", e)

2d. Skip final send if already streamed (~10 lines)

In _process_message_background() (base.py), after getting the response, if streaming was active and _stream_msg_id[0] is set, the final response was already delivered via progressive edits. Skip the normal self.send() call to avoid duplicating the message.

This is the most delicate integration point — we need to communicate from the gateway's _run_agent back to the base adapter's response sender that the response was already delivered. Options:

  • Option A: Return a special marker in the result dict: result["_streamed_msg_id"] = _stream_msg_id[0] The base adapter checks this and skips send().

  • Option B: Edit the already-sent message with the final response (which may differ slightly from accumulated tokens due to think-block stripping, etc.) and don't send a new one.

  • Option C: The stream preview task handles the FULL final response (including any post-processing), and the handler returns None to skip the normal send path.

Recommended: Option A — cleanest separation. The result dict already carries metadata; adding one more field is low-risk.
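
A sketch of the Option A check on the base-adapter side (the `should_send_final` helper and the exact result-dict shape are assumptions, not existing code):

```python
def should_send_final(result):
    """Decide delivery per Option A: (send_new_message, edit_target).

    If the gateway streamed the response, result carries _streamed_msg_id
    and the base adapter should edit that message with the post-processed
    text instead of calling send().
    """
    streamed_id = result.get("_streamed_msg_id")
    if streamed_id is not None:
        return False, streamed_id  # already delivered via progressive edits
    return True, None              # normal send() path

print(should_send_final({"_streamed_msg_id": 42}))  # -> (False, 42)
print(should_send_final({}))                        # -> (True, None)
```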

Platform-specific considerations:

Platform        Edit support        Rate limits               Streaming approach
Telegram        edit_message_text   ~20 edits/min             Edit every 1.5s
Discord         message.edit        5 edits/5s per message    Edit every 1.2s
Slack           chat.update         Tier 3 (~50/min)          Edit every 1.5s
WhatsApp        no edit support     N/A                       Skip streaming, use normal path
HomeAssistant   no edit             N/A                       Skip streaming
API Server      SSE native          No limit                  Real SSE events

WhatsApp and HomeAssistant fall back to non-streaming automatically because they don't support message editing.
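
The capability check could be as simple as the following sketch (the `SUPPORTS_EDIT` table and `streaming_allowed` helper are hypothetical; real adapters would more likely expose an attribute such as `supports_edit`):

```python
# Hypothetical capability table, mirroring the platform table above.
SUPPORTS_EDIT = {
    "telegram": True,
    "discord": True,
    "slack": True,
    "whatsapp": False,
    "homeassistant": False,
}

def streaming_allowed(platform, enabled):
    """Progressive streaming needs message edits; otherwise fall back."""
    return enabled and SUPPORTS_EDIT.get(platform, False)

print(streaming_allowed("telegram", True))   # -> True
print(streaming_allowed("whatsapp", True))   # -> False
```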

Tests for Phase 2: (~100 lines)

  • Test stream_preview sends/edits correctly
  • Test skip-final-send when streaming delivered
  • Test WhatsApp/HA graceful fallback
  • Test streaming disabled per-platform config
  • Test thread_id metadata forwarded in stream messages

Phase 3: CLI streaming

File: cli.py

3a. Set up callback in the CLI chat loop (~20 lines)

In _chat_once() or wherever the agent is invoked:

if streaming_enabled:
    _stream_q = queue.Queue()
    _stream_done = threading.Event()
    
    def _cli_stream_callback(delta):
        if delta is None:
            _stream_done.set()
        else:
            _stream_q.put(delta)
    
    agent.stream_callback = _cli_stream_callback

3b. Token display thread/task (~30 lines)

Start a thread that reads the queue and prints tokens:

def _stream_display():
    """Print tokens to terminal as they arrive."""
    first_token = True
    while not _stream_done.is_set():
        try:
            delta = _stream_q.get(timeout=0.1)
        except queue.Empty:
            continue
        if first_token:
            # Print response box top border
            _cprint(f"\n{top}")
            first_token = False
        sys.stdout.write(delta)
        sys.stdout.flush()
    # Drain remaining
    while not _stream_q.empty():
        sys.stdout.write(_stream_q.get_nowait())
    sys.stdout.flush()
    # Print bottom border
    _cprint(f"\n\n{bot}")

Integration challenge: prompt_toolkit

The CLI uses prompt_toolkit which controls the terminal. Writing directly to stdout while prompt_toolkit is active can cause display corruption. The existing KawaiiSpinner already solves this by using prompt_toolkit's patch_stdout context. The streaming display would need to do the same.

Alternative: use _cprint() for each token chunk (routes through prompt_toolkit's renderer). But this might be slow for individual tokens.

Recommended approach: accumulate tokens in small batches (e.g., every 50ms) and _cprint() the batch. This balances display responsiveness with prompt_toolkit compatibility.
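
A sketch of that batching loop, where `cprint` stands in for the real _cprint helper and all names are illustrative:

```python
import queue
import threading
import time

def batched_display(stream_q, stream_done, cprint, flush_interval=0.05):
    """Drain the token queue and emit batches via cprint.

    Tokens accumulate for up to flush_interval seconds before printing, so
    the prompt_toolkit renderer runs once per batch rather than per token.
    """
    pending = []
    last_flush = time.monotonic()
    while not stream_done.is_set() or not stream_q.empty():
        try:
            pending.append(stream_q.get(timeout=flush_interval))
        except queue.Empty:
            pass
        if pending and (time.monotonic() - last_flush) >= flush_interval:
            cprint("".join(pending))
            pending.clear()
            last_flush = time.monotonic()
    if pending:
        cprint("".join(pending))  # flush the final partial batch

# Demo: tokens already queued, stream already finished
q = queue.Queue()
done = threading.Event()
for tok in ["Hel", "lo ", "world"]:
    q.put(tok)
done.set()
out = []
batched_display(q, done, out.append)
print("".join(out))  # -> Hello world
```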

Tests for Phase 3: (~50 lines)

  • Test CLI streaming callback setup
  • Test response box borders with streaming
  • Test fallback when streaming disabled

Phase 4: API Server real streaming

File: gateway/platforms/api_server.py

Replace the pseudo-streaming _write_sse_chat_completion() with real token-by-token SSE when the agent supports it.

4a. Wire streaming callback for stream=true requests (~20 lines)

if stream:
    _stream_q = queue.Queue()
    
    def _api_stream_callback(delta):
        _stream_q.put(delta)  # None = done
    
    # Pass callback to _run_agent
    result, usage = await self._run_agent(
        ..., stream_callback=_api_stream_callback,
    )

4b. Real SSE writer (~40 lines)

async def _write_real_sse(self, request, completion_id, model, stream_q):
    response = web.StreamResponse(
        headers={"Content-Type": "text/event-stream", "Cache-Control": "no-cache"},
    )
    await response.prepare(request)
    
    # Role chunk
    await response.write(...)
    
    # Stream content chunks as they arrive
    while True:
        try:
            delta = await asyncio.get_running_loop().run_in_executor(
                None, lambda: stream_q.get(timeout=0.1)
            )
        except queue.Empty:
            continue
        
        if delta is None:  # End of stream
            break
        
        chunk = {"id": completion_id, "object": "chat.completion.chunk", ...
                 "choices": [{"delta": {"content": delta}, ...}]}
        await response.write(f"data: {json.dumps(chunk)}\n\n".encode())
    
    # Finish + [DONE]
    await response.write(...)
    await response.write(b"data: [DONE]\n\n")
    return response

Challenge: concurrent execution

The agent runs in a thread executor. SSE writing happens in the async event loop. The queue bridges them. But _run_agent() currently awaits the full result before returning. For real streaming, we need to start the agent in the background and stream tokens while it runs:

# Start agent in background
agent_task = asyncio.create_task(self._run_agent_async(...))

# Stream tokens while agent runs
await self._write_real_sse(request, ..., stream_q)

# Agent is done by now (stream_q received None)
result, usage = await agent_task

This requires splitting _run_agent into an async version that doesn't block waiting for the result, or running it in a separate task.

Responses API SSE format:

For /v1/responses with stream=true, the SSE events are different:

event: response.output_text.delta
data: {"type":"response.output_text.delta","delta":"Hello"}

event: response.completed  
data: {"type":"response.completed","response":{...}}

This needs a separate SSE writer that emits Responses API format events.
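
Formatting those Responses-format events is mechanical; a minimal sketch (helper name is an assumption):

```python
import json

def responses_sse_event(event_type, payload):
    """Format one Responses-API SSE event: named event line + JSON data."""
    data = {"type": event_type, **payload}
    return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"

print(responses_sse_event("response.output_text.delta", {"delta": "Hello"}), end="")
```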

Tests for Phase 4: (~80 lines)

  • Test real SSE streaming with mocked agent
  • Test SSE event format (Chat Completions vs Responses)
  • Test client disconnect during streaming
  • Test fallback to pseudo-streaming when callback not available

Integration Issues & Edge Cases

1. Tool calls during streaming

When the model returns tool calls instead of text, no text tokens are emitted. The stream_callback is simply never called with text. After tools execute, the next API call may produce the final text response — streaming picks up again.

The stream preview task needs to handle this: if no tokens arrive during a tool-call round, don't send/edit any message. The tool progress messages continue working as before.

2. Duplicate messages

The biggest risk: the agent sends the final response normally (via the existing send path) AND the stream preview already showed it. The user sees the response twice.

Prevention: when streaming is active and tokens were delivered, the final response send must be suppressed. The result["_streamed_msg_id"] marker tells the base adapter to skip its normal send.

3. Response post-processing

The final response may differ from the accumulated streamed tokens:

  • Think block stripping (<think>...</think> removed)
  • Trailing whitespace cleanup
  • Tool result media tag appending

The stream preview shows raw tokens. The final edit should use the post-processed version. This means the final edit (removing the cursor) should use the post-processed final_response, not just the accumulated stream text.
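
A minimal sketch of that post-processing step, assuming just think-block stripping plus whitespace cleanup (the real pipeline also handles media tags etc.):

```python
import re

def postprocess(raw):
    """Strip <think> blocks and surrounding whitespace from streamed text."""
    text = re.sub(r"<think>.*?</think>", "", raw, flags=re.DOTALL)
    return text.strip()

streamed = "<think>plan the reply</think>Hello there!  "
print(postprocess(streamed))  # -> Hello there!
```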

4. Context compression during streaming

If the agent triggers context compression mid-conversation, the streaming tokens from BEFORE compression are from a different context than those after. This isn't a problem in practice — compression happens between API calls, not during streaming.

5. Interrupt during streaming

User sends a new message while streaming → interrupt. The stream is killed (HTTP connection closed), accumulated tokens are shown as-is (no cursor), and the interrupt message is processed normally. This is already handled by _interruptible_api_call closing the client.

6. Multi-model / fallback

If the primary model fails and the agent falls back to a different model, streaming state resets. The fallback call may or may not support streaming. The graceful fallback in _run_streaming_chat_completion handles this.

7. Rate limiting on edits

  • Telegram: ~20 edits/minute (~1 every 3 seconds to be safe)
  • Discord: 5 edits per 5 seconds per message
  • Slack: ~50 API calls/minute

The 1.5s edit interval is conservative enough for all platforms. If we get 429 rate limit errors on edits, just skip that edit cycle and try next time.
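
A sketch of a rate-limit-tolerant edit helper along those lines (all names, and the injected `now` parameter, are illustrative):

```python
import time

def try_edit(edit_fn, content, state, min_interval=1.5, now=None):
    """Attempt one progressive edit; skip the cycle on errors or rate limits.

    edit_fn performs the platform edit and may raise on HTTP 429; we skip
    silently and let the next cycle retry with fresher accumulated text.
    state holds last_edit (monotonic seconds).
    """
    now = time.monotonic() if now is None else now
    if now - state["last_edit"] < min_interval:
        return False           # too soon; wait for the next cycle
    try:
        edit_fn(content)
    except Exception:          # includes 429-style rate-limit errors
        return False           # skipped; retry on the next cycle
    state["last_edit"] = now
    return True

state = {"last_edit": 0.0}
edits = []
print(try_edit(edits.append, "partial ▌", state, now=2.0))  # -> True
print(try_edit(edits.append, "more ▌", state, now=2.5))     # -> False
```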


Files Changed Summary

File                              Phase  Changes
run_agent.py                      1      +stream_callback param, +_run_streaming_chat_completion(), modify _run_codex_stream(), modify _interruptible_api_call()
gateway/run.py                    2      +streaming config reader, +queue/callback setup, +stream_preview task, +skip-final-send logic
gateway/platforms/base.py         2      +check for _streamed_msg_id in response handler
cli.py                            3      +streaming setup, +token display, +response box integration
gateway/platforms/api_server.py   4      +real SSE writer, +streaming callback wiring
hermes_cli/config.py              1      +streaming config defaults
cli-config.yaml.example           1      +streaming section
tests/test_streaming.py           1-4    NEW (~380 lines of tests)

Total new code: ~500 lines across all phases
Total test code: ~380 lines


Rollout Plan

  1. Phase 1 (core): Merge to main. Streaming disabled by default. Zero impact on existing behavior. Can be tested with env var.

  2. Phase 2 (gateway): Merge to main. Test on Telegram manually. Enable per-platform: streaming.telegram: true in config.

  3. Phase 3 (CLI): Merge to main. Test in terminal. Enable: streaming.cli: true or streaming.enabled: true.

  4. Phase 4 (API server): Merge to main. Test with Open WebUI. Auto-enabled when client sends stream: true.

Each phase is independently mergeable and testable. Streaming stays off by default throughout. Once all phases are stable, consider changing the default to enabled.


Config Reference (final state)

# config.yaml
streaming:
  enabled: false          # Master switch (default: off)
  cli: true               # Per-platform override
  telegram: true
  discord: true
  slack: true
  api_server: true        # API server always streams when client requests it
  edit_interval: 1.5      # Seconds between message edits (default: 1.5)
  min_tokens: 20          # Tokens before first display (default: 20)
# Environment variable override
HERMES_STREAMING_ENABLED=true