[SYNC] Merge upstream NousResearch/hermes-agent — 499 commits #201

Merged
Rockachopa merged 500 commits from upstream-sync into main 2026-04-07 14:03:16 +00:00
6 changed files with 996 additions and 18 deletions
Showing only changes of commit 0c54da8aaf - Show all commits

View File

@@ -1932,6 +1932,37 @@ class DiscordAdapter(BasePlatformAdapter):
except Exception as e:
return SendResult(success=False, error=str(e))
async def send_update_prompt(
self, chat_id: str, prompt: str, default: str = "",
session_key: str = "",
) -> SendResult:
"""Send an interactive button-based update prompt (Yes / No).
Used by the gateway ``/update`` watcher when ``hermes update --gateway``
needs user input (stash restore, config migration).
"""
if not self._client or not DISCORD_AVAILABLE:
return SendResult(success=False, error="Not connected")
try:
channel = self._client.get_channel(int(chat_id))
if not channel:
channel = await self._client.fetch_channel(int(chat_id))
default_hint = f" (default: {default})" if default else ""
embed = discord.Embed(
title="⚕ Update Needs Your Input",
description=f"{prompt}{default_hint}",
color=discord.Color.gold(),
)
view = UpdatePromptView(
session_key=session_key,
allowed_user_ids=self._allowed_user_ids,
)
msg = await channel.send(embed=embed, view=view)
return SendResult(success=True, message_id=str(msg.id))
except Exception as e:
return SendResult(success=False, error=str(e))
def _get_parent_channel_id(self, channel: Any) -> Optional[str]:
"""Return the parent channel ID for a Discord thread-like channel, if present."""
parent = getattr(channel, "parent", None)
@@ -2344,3 +2375,82 @@ if DISCORD_AVAILABLE:
self.resolved = True
for child in self.children:
child.disabled = True
class UpdatePromptView(discord.ui.View):
"""Interactive Yes/No buttons for ``hermes update`` prompts.
Clicking a button writes the answer to ``.update_response`` so the
detached update process can pick it up. Only authorized users can
click. Times out after 5 minutes (the update process also has a
5-minute timeout on its side).
"""
def __init__(self, session_key: str, allowed_user_ids: set):
super().__init__(timeout=300)
self.session_key = session_key
self.allowed_user_ids = allowed_user_ids
self.resolved = False
def _check_auth(self, interaction: discord.Interaction) -> bool:
if not self.allowed_user_ids:
return True
return str(interaction.user.id) in self.allowed_user_ids
async def _respond(
self, interaction: discord.Interaction, answer: str,
color: discord.Color, label: str,
):
if self.resolved:
await interaction.response.send_message(
"Already answered~", ephemeral=True
)
return
if not self._check_auth(interaction):
await interaction.response.send_message(
"You're not authorized~", ephemeral=True
)
return
self.resolved = True
# Update embed
embed = interaction.message.embeds[0] if interaction.message.embeds else None
if embed:
embed.color = color
embed.set_footer(text=f"{label} by {interaction.user.display_name}")
for child in self.children:
child.disabled = True
await interaction.response.edit_message(embed=embed, view=self)
# Write response file
try:
from hermes_constants import get_hermes_home
home = get_hermes_home()
response_path = home / ".update_response"
tmp = response_path.with_suffix(".tmp")
tmp.write_text(answer)
tmp.replace(response_path)
logger.info(
"Discord update prompt answered '%s' by %s",
answer, interaction.user.display_name,
)
except Exception as exc:
logger.error("Failed to write update response: %s", exc)
@discord.ui.button(label="Yes", style=discord.ButtonStyle.green, emoji="")
async def yes_btn(
self, interaction: discord.Interaction, button: discord.ui.Button
):
await self._respond(interaction, "y", discord.Color.green(), "Yes")
@discord.ui.button(label="No", style=discord.ButtonStyle.red, emoji="")
async def no_btn(
self, interaction: discord.Interaction, button: discord.ui.Button
):
await self._respond(interaction, "n", discord.Color.red(), "No")
async def on_timeout(self):
self.resolved = True
for child in self.children:
child.disabled = True

View File

@@ -17,10 +17,11 @@ from typing import Dict, List, Optional, Any
logger = logging.getLogger(__name__)
try:
from telegram import Update, Bot, Message
from telegram import Update, Bot, Message, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application,
CommandHandler,
CallbackQueryHandler,
MessageHandler as TelegramMessageHandler,
ContextTypes,
filters,
@@ -33,8 +34,11 @@ except ImportError:
Update = Any
Bot = Any
Message = Any
InlineKeyboardButton = Any
InlineKeyboardMarkup = Any
Application = Any
CommandHandler = Any
CallbackQueryHandler = Any
TelegramMessageHandler = Any
HTTPXRequest = Any
filters = None
@@ -543,6 +547,8 @@ class TelegramAdapter(BasePlatformAdapter):
filters.PHOTO | filters.VIDEO | filters.AUDIO | filters.VOICE | filters.Document.ALL | filters.Sticker.ALL,
self._handle_media_message
))
# Handle inline keyboard button callbacks (update prompts)
self._app.add_handler(CallbackQueryHandler(self._handle_callback_query))
# Start polling — retry initialize() for transient TLS resets
try:
@@ -950,6 +956,72 @@ class TelegramAdapter(BasePlatformAdapter):
)
return SendResult(success=False, error=str(e))
async def send_update_prompt(
self, chat_id: str, prompt: str, default: str = "",
session_key: str = "",
) -> SendResult:
"""Send an inline-keyboard update prompt (Yes / No buttons).
Used by the gateway ``/update`` watcher when ``hermes update --gateway``
needs user input (stash restore, config migration).
"""
if not self._bot:
return SendResult(success=False, error="Not connected")
try:
default_hint = f" (default: {default})" if default else ""
text = f"⚕ *Update needs your input:*\n\n{prompt}{default_hint}"
keyboard = InlineKeyboardMarkup([
[
InlineKeyboardButton("✓ Yes", callback_data="update_prompt:y"),
InlineKeyboardButton("✗ No", callback_data="update_prompt:n"),
]
])
msg = await self._bot.send_message(
chat_id=int(chat_id),
text=text,
parse_mode=ParseMode.MARKDOWN,
reply_markup=keyboard,
)
return SendResult(success=True, message_id=str(msg.message_id))
except Exception as e:
logger.warning("[%s] send_update_prompt failed: %s", self.name, e)
return SendResult(success=False, error=str(e))
async def _handle_callback_query(
self, update: "Update", context: "ContextTypes.DEFAULT_TYPE"
) -> None:
"""Handle inline keyboard button clicks (update prompts)."""
query = update.callback_query
if not query or not query.data:
return
data = query.data
if not data.startswith("update_prompt:"):
return
answer = data.split(":", 1)[1] # "y" or "n"
await query.answer(text=f"Sent '{answer}' to the update process.")
# Edit the message to show the choice and remove buttons
label = "Yes" if answer == "y" else "No"
try:
await query.edit_message_text(
text=f"⚕ Update prompt answered: *{label}*",
parse_mode=ParseMode.MARKDOWN,
reply_markup=None,
)
except Exception:
pass # non-fatal if edit fails
# Write the response file
try:
from hermes_constants import get_hermes_home
home = get_hermes_home()
response_path = home / ".update_response"
tmp = response_path.with_suffix(".tmp")
tmp.write_text(answer)
tmp.replace(response_path)
logger.info("Telegram update prompt answered '%s' by user %s",
answer, getattr(query.from_user, "id", "unknown"))
except Exception as exc:
logger.error("Failed to write update response from callback: %s", exc)
async def send_voice(
self,
chat_id: str,

View File

@@ -517,6 +517,10 @@ class GatewayRunner:
# Key: Platform enum, Value: {"config": platform_config, "attempts": int, "next_retry": float}
self._failed_platforms: Dict[Platform, Dict[str, Any]] = {}
# Track pending /update prompt responses per session.
# Key: session_key, Value: True when a prompt is waiting for user input.
self._update_prompt_pending: Dict[str, bool] = {}
# Persistent Honcho managers keyed by gateway session key.
# This preserves write_frequency="session" semantics across short-lived
# per-message AIAgent instances.
@@ -1737,6 +1741,35 @@ class GatewayRunner:
self.pairing_store._record_rate_limit(platform_name, source.user_id)
return None
# Intercept messages that are responses to a pending /update prompt.
# The update process (detached) wrote .update_prompt.json; the watcher
# forwarded it to the user; now the user's reply goes back via
# .update_response so the update process can continue.
_quick_key = self._session_key_for_source(source)
_update_prompts = getattr(self, "_update_prompt_pending", {})
if _update_prompts.get(_quick_key):
raw = (event.text or "").strip()
# Accept /approve and /deny as shorthand for yes/no
cmd = event.get_command()
if cmd in ("approve", "yes"):
response_text = "y"
elif cmd in ("deny", "no"):
response_text = "n"
else:
response_text = raw
if response_text:
response_path = _hermes_home / ".update_response"
try:
tmp = response_path.with_suffix(".tmp")
tmp.write_text(response_text)
tmp.replace(response_path)
except OSError as e:
logger.warning("Failed to write update response: %s", e)
return f"✗ Failed to send response to update process: {e}"
_update_prompts.pop(_quick_key, None)
label = response_text if len(response_text) <= 20 else response_text[:20] + ""
return f"✓ Sent `{label}` to the update process."
# PRIORITY handling when an agent is already running for this session.
# Default behavior is to interrupt immediately so user text/stop messages
# are handled with minimal latency.
@@ -1744,7 +1777,6 @@ class GatewayRunner:
# Special case: Telegram/photo bursts often arrive as multiple near-
# simultaneous updates. Do NOT interrupt for photo-only follow-ups here;
# let the adapter-level batching/queueing logic absorb them.
_quick_key = self._session_key_for_source(source)
# Staleness eviction: if an entry has been in _running_agents for
# longer than the agent timeout, it's a leaked lock from a hung or
@@ -4929,6 +4961,15 @@ class GatewayRunner:
logger.info("User denied %d dangerous command(s) via /deny", count)
return f"❌ Command{'s' if count > 1 else ''} denied{count_msg}."
# Platforms where /update is allowed. ACP, API server, and webhooks are
# programmatic interfaces that should not trigger system updates.
_UPDATE_ALLOWED_PLATFORMS = frozenset({
Platform.TELEGRAM, Platform.DISCORD, Platform.SLACK, Platform.WHATSAPP,
Platform.SIGNAL, Platform.MATTERMOST, Platform.MATRIX,
Platform.HOMEASSISTANT, Platform.EMAIL, Platform.SMS, Platform.DINGTALK,
Platform.FEISHU, Platform.WECOM, Platform.LOCAL,
})
async def _handle_update_command(self, event: MessageEvent) -> str:
"""Handle /update command — update Hermes Agent to the latest version.
@@ -4943,6 +4984,11 @@ class GatewayRunner:
from datetime import datetime
from hermes_cli.config import is_managed, format_managed_message
# Block non-messaging platforms (API server, webhooks, ACP)
platform = event.source.platform
if platform not in self._UPDATE_ALLOWED_PLATFORMS:
return "✗ /update is only available from messaging platforms. Run `hermes update` from the terminal."
if is_managed():
return f"{format_managed_message('update Hermes Agent')}"
@@ -4964,10 +5010,12 @@ class GatewayRunner:
pending_path = _hermes_home / ".update_pending.json"
output_path = _hermes_home / ".update_output.txt"
exit_code_path = _hermes_home / ".update_exit_code"
session_key = self._session_key_for_source(event.source)
pending = {
"platform": event.source.platform.value,
"chat_id": event.source.chat_id,
"user_id": event.source.user_id,
"session_key": session_key,
"timestamp": datetime.now().isoformat(),
}
_tmp_pending = pending_path.with_suffix(".tmp")
@@ -4975,12 +5023,18 @@ class GatewayRunner:
_tmp_pending.replace(pending_path)
exit_code_path.unlink(missing_ok=True)
# Spawn `hermes update` detached so it survives gateway restart.
# Spawn `hermes update --gateway` detached so it survives gateway restart.
# --gateway enables file-based IPC for interactive prompts (stash
# restore, config migration) so the gateway can forward them to the
# user instead of silently skipping them.
# Use setsid for portable session detach (works under system services
# where systemd-run --user fails due to missing D-Bus session).
# PYTHONUNBUFFERED ensures output is flushed line-by-line so the
# gateway can stream it to the messenger in near-real-time.
hermes_cmd_str = " ".join(shlex.quote(part) for part in hermes_cmd)
update_cmd = (
f"{hermes_cmd_str} update > {shlex.quote(str(output_path))} 2>&1; "
f"PYTHONUNBUFFERED=1 {hermes_cmd_str} update --gateway"
f" > {shlex.quote(str(output_path))} 2>&1; "
f"status=$?; printf '%s' \"$status\" > {shlex.quote(str(exit_code_path))}"
)
try:
@@ -5007,7 +5061,7 @@ class GatewayRunner:
return f"✗ Failed to start update: {e}"
self._schedule_update_notification_watch()
return "⚕ Starting Hermes update… I'll notify you when it's done."
return "⚕ Starting Hermes update… I'll stream progress here."
def _schedule_update_notification_watch(self) -> None:
"""Ensure a background task is watching for update completion."""
@@ -5017,39 +5071,210 @@ class GatewayRunner:
try:
self._update_notification_task = asyncio.create_task(
self._watch_for_update_completion()
self._watch_update_progress()
)
except RuntimeError:
logger.debug("Skipping update notification watcher: no running event loop")
async def _watch_for_update_completion(
async def _watch_update_progress(
self,
poll_interval: float = 2.0,
stream_interval: float = 4.0,
timeout: float = 1800.0,
) -> None:
"""Wait for ``hermes update`` to finish, then send its notification."""
"""Watch ``hermes update --gateway``, streaming output + forwarding prompts.
Polls ``.update_output.txt`` for new content and sends chunks to the
user periodically. Detects ``.update_prompt.json`` (written by the
update process when it needs user input) and forwards the prompt to
the messenger. The user's next message is intercepted by
``_handle_message`` and written to ``.update_response``.
"""
import json
import re as _re
pending_path = _hermes_home / ".update_pending.json"
claimed_path = _hermes_home / ".update_pending.claimed.json"
output_path = _hermes_home / ".update_output.txt"
exit_code_path = _hermes_home / ".update_exit_code"
prompt_path = _hermes_home / ".update_prompt.json"
loop = asyncio.get_running_loop()
deadline = loop.time() + timeout
while (pending_path.exists() or claimed_path.exists()) and loop.time() < deadline:
if exit_code_path.exists():
# Resolve the adapter and chat_id for sending messages
adapter = None
chat_id = None
session_key = None
for path in (claimed_path, pending_path):
if path.exists():
try:
pending = json.loads(path.read_text())
platform_str = pending.get("platform")
chat_id = pending.get("chat_id")
session_key = pending.get("session_key")
if platform_str and chat_id:
platform = Platform(platform_str)
adapter = self.adapters.get(platform)
# Fallback session key if not stored (old pending files)
if not session_key:
session_key = f"{platform_str}:{chat_id}"
break
except Exception:
pass
if not adapter or not chat_id:
logger.warning("Update watcher: cannot resolve adapter/chat_id, falling back to completion-only")
# Fall back to old behavior: wait for exit code and send final notification
while (pending_path.exists() or claimed_path.exists()) and loop.time() < deadline:
if exit_code_path.exists():
await self._send_update_notification()
return
await asyncio.sleep(poll_interval)
if (pending_path.exists() or claimed_path.exists()) and not exit_code_path.exists():
exit_code_path.write_text("124")
await self._send_update_notification()
return
def _strip_ansi(text: str) -> str:
return _re.sub(r'\x1b\[[0-9;]*[A-Za-z]', '', text)
bytes_sent = 0
last_stream_time = loop.time()
buffer = ""
async def _flush_buffer() -> None:
"""Send buffered output to the user."""
nonlocal buffer, last_stream_time
if not buffer.strip():
buffer = ""
return
# Chunk to fit message limits (Telegram: 4096, others: generous)
clean = _strip_ansi(buffer).strip()
buffer = ""
last_stream_time = loop.time()
if not clean:
return
# Split into chunks if too long
max_chunk = 3500
chunks = [clean[i:i + max_chunk] for i in range(0, len(clean), max_chunk)]
for chunk in chunks:
try:
await adapter.send(chat_id, f"```\n{chunk}\n```")
except Exception as e:
logger.debug("Update stream send failed: %s", e)
while loop.time() < deadline:
# Check for completion
if exit_code_path.exists():
# Read any remaining output
if output_path.exists():
try:
content = output_path.read_text()
if len(content) > bytes_sent:
buffer += content[bytes_sent:]
bytes_sent = len(content)
except OSError:
pass
await _flush_buffer()
# Send final status
try:
exit_code_raw = exit_code_path.read_text().strip() or "1"
exit_code = int(exit_code_raw)
if exit_code == 0:
await adapter.send(chat_id, "✅ Hermes update finished.")
else:
await adapter.send(chat_id, "❌ Hermes update failed (exit code {}).".format(exit_code))
logger.info("Update finished (exit=%s), notified %s", exit_code, session_key)
except Exception as e:
logger.warning("Update final notification failed: %s", e)
# Cleanup
for p in (pending_path, claimed_path, output_path,
exit_code_path, prompt_path):
p.unlink(missing_ok=True)
(_hermes_home / ".update_response").unlink(missing_ok=True)
self._update_prompt_pending.pop(session_key, None)
return
# Check for new output
if output_path.exists():
try:
content = output_path.read_text()
if len(content) > bytes_sent:
buffer += content[bytes_sent:]
bytes_sent = len(content)
except OSError:
pass
# Flush buffer periodically
if buffer.strip() and (loop.time() - last_stream_time) >= stream_interval:
await _flush_buffer()
# Check for prompts
if prompt_path.exists() and session_key:
try:
prompt_data = json.loads(prompt_path.read_text())
prompt_text = prompt_data.get("prompt", "")
default = prompt_data.get("default", "")
if prompt_text:
# Flush any buffered output first so the user sees
# context before the prompt
await _flush_buffer()
# Try platform-native buttons first (Discord, Telegram)
sent_buttons = False
if getattr(type(adapter), "send_update_prompt", None) is not None:
try:
await adapter.send_update_prompt(
chat_id=chat_id,
prompt=prompt_text,
default=default,
session_key=session_key,
)
sent_buttons = True
except Exception as btn_err:
logger.debug("Button-based update prompt failed: %s", btn_err)
if not sent_buttons:
default_hint = f" (default: {default})" if default else ""
await adapter.send(
chat_id,
f"⚕ **Update needs your input:**\n\n"
f"{prompt_text}{default_hint}\n\n"
f"Reply `/approve` (yes) or `/deny` (no), "
f"or type your answer directly."
)
self._update_prompt_pending[session_key] = True
logger.info("Forwarded update prompt to %s: %s", session_key, prompt_text[:80])
except (json.JSONDecodeError, OSError) as e:
logger.debug("Failed to read update prompt: %s", e)
await asyncio.sleep(poll_interval)
if (pending_path.exists() or claimed_path.exists()) and not exit_code_path.exists():
logger.warning("Update watcher timed out waiting for completion marker")
# Timeout
if not exit_code_path.exists():
logger.warning("Update watcher timed out after %.0fs", timeout)
exit_code_path.write_text("124")
await self._send_update_notification()
await _flush_buffer()
try:
await adapter.send(chat_id, "❌ Hermes update timed out after 30 minutes.")
except Exception:
pass
for p in (pending_path, claimed_path, output_path,
exit_code_path, prompt_path):
p.unlink(missing_ok=True)
(_hermes_home / ".update_response").unlink(missing_ok=True)
self._update_prompt_pending.pop(session_key, None)
async def _send_update_notification(self) -> bool:
"""If an update finished, notify the user.
Returns False when the update is still running so a caller can retry
later. Returns True after a definitive send/skip decision.
This is the legacy notification path used when the streaming watcher
cannot resolve the adapter (e.g. after a gateway restart where the
platform hasn't reconnected yet).
"""
import json
import re as _re

View File

@@ -2554,6 +2554,57 @@ def _clear_bytecode_cache(root: Path) -> int:
return removed
def _gateway_prompt(prompt_text: str, default: str = "", timeout: float = 300.0) -> str:
"""File-based IPC prompt for gateway mode.
Writes a prompt marker file so the gateway can forward the question to the
user, then polls for a response file. Falls back to *default* on timeout.
Used by ``hermes update --gateway`` so interactive prompts (stash restore,
config migration) are forwarded to the messenger instead of being silently
skipped.
"""
import json as _json
import uuid as _uuid
from hermes_constants import get_hermes_home
home = get_hermes_home()
prompt_path = home / ".update_prompt.json"
response_path = home / ".update_response"
# Clean any stale response file
response_path.unlink(missing_ok=True)
payload = {
"prompt": prompt_text,
"default": default,
"id": str(_uuid.uuid4()),
}
tmp = prompt_path.with_suffix(".tmp")
tmp.write_text(_json.dumps(payload))
tmp.replace(prompt_path)
# Poll for response
import time as _time
deadline = _time.monotonic() + timeout
while _time.monotonic() < deadline:
if response_path.exists():
try:
answer = response_path.read_text().strip()
response_path.unlink(missing_ok=True)
prompt_path.unlink(missing_ok=True)
return answer if answer else default
except (OSError, ValueError):
pass
_time.sleep(0.5)
# Timeout — clean up and use default
prompt_path.unlink(missing_ok=True)
response_path.unlink(missing_ok=True)
print(f" (no response after {int(timeout)}s, using default: {default!r})")
return default
def _update_via_zip(args):
"""Update Hermes Agent by downloading a ZIP archive.
@@ -2747,6 +2798,7 @@ def _restore_stashed_changes(
cwd: Path,
stash_ref: str,
prompt_user: bool = False,
input_fn=None,
) -> bool:
if prompt_user:
print()
@@ -2754,7 +2806,10 @@ def _restore_stashed_changes(
print(" Restoring them may reapply local customizations onto the updated codebase.")
print(" Review the result afterward if Hermes behaves unexpectedly.")
print("Restore local changes now? [Y/n]")
response = input().strip().lower()
if input_fn is not None:
response = input_fn("Restore local changes now? [Y/n]", "y")
else:
response = input().strip().lower()
if response not in ("", "y", "yes"):
print("Skipped restoring local changes.")
print("Your changes are still preserved in git stash.")
@@ -3185,6 +3240,10 @@ def cmd_update(args):
if is_managed():
managed_error("update Hermes Agent")
return
gateway_mode = getattr(args, "gateway", False)
# In gateway mode, use file-based IPC for prompts instead of stdin
gw_input_fn = (lambda prompt, default="": _gateway_prompt(prompt, default)) if gateway_mode else None
print("⚕ Updating Hermes Agent...")
print()
@@ -3281,7 +3340,9 @@ def cmd_update(args):
else:
auto_stash_ref = _stash_local_changes_if_needed(git_cmd, PROJECT_ROOT)
prompt_for_restore = auto_stash_ref is not None and sys.stdin.isatty() and sys.stdout.isatty()
prompt_for_restore = auto_stash_ref is not None and (
gateway_mode or (sys.stdin.isatty() and sys.stdout.isatty())
)
# Check if there are updates
result = subprocess.run(
@@ -3300,6 +3361,7 @@ def cmd_update(args):
_restore_stashed_changes(
git_cmd, PROJECT_ROOT, auto_stash_ref,
prompt_user=prompt_for_restore,
input_fn=gw_input_fn,
)
if current_branch not in ("main", "HEAD"):
subprocess.run(
@@ -3351,6 +3413,7 @@ def cmd_update(args):
PROJECT_ROOT,
auto_stash_ref,
prompt_user=prompt_for_restore,
input_fn=gw_input_fn,
)
_invalidate_update_cache()
@@ -3490,7 +3553,11 @@ def cmd_update(args):
print(f" {len(missing_config)} new config option(s) available")
print()
if not (sys.stdin.isatty() and sys.stdout.isatty()):
if gateway_mode:
response = _gateway_prompt(
"Would you like to configure new options now? [Y/n]", "n"
).strip().lower()
elif not (sys.stdin.isatty() and sys.stdout.isatty()):
print(" Non-interactive session — skipping config migration prompt.")
print(" Run 'hermes config migrate' later to apply any new config/env options.")
response = "n"
@@ -3502,11 +3569,15 @@ def cmd_update(args):
if response in ('', 'y', 'yes'):
print()
results = migrate_config(interactive=True, quiet=False)
# In gateway mode, run auto-migrations only (no input() prompts
# for API keys which would hang the detached process).
results = migrate_config(interactive=not gateway_mode, quiet=False)
if results["env_added"] or results["config_added"]:
print()
print("✓ Configuration updated!")
if gateway_mode and missing_env:
print(" API keys require manual entry: hermes config migrate")
else:
print()
print("Skipped. Run 'hermes config migrate' later to configure.")
@@ -5247,6 +5318,10 @@ For more help on a command:
help="Update Hermes Agent to the latest version",
description="Pull the latest changes from git and reinstall dependencies"
)
update_parser.add_argument(
"--gateway", action="store_true", default=False,
help="Gateway mode: use file-based IPC for prompts instead of stdin (used internally by /update)"
)
update_parser.set_defaults(func=cmd_update)
# =========================================================================

View File

@@ -330,7 +330,7 @@ class TestHandleUpdateCommand:
patch("subprocess.Popen"):
result = await runner._handle_update_command(event)
assert "notify you when it's done" in result
assert "stream progress" in result
# ---------------------------------------------------------------------------

View File

@@ -0,0 +1,496 @@
"""Tests for /update live streaming, prompt forwarding, and gateway IPC.
Tests the new --gateway mode for hermes update, including:
- _gateway_prompt() file-based IPC
- _watch_update_progress() output streaming and prompt detection
- Message interception for update prompt responses
- _restore_stashed_changes() with input_fn parameter
"""
import json
import os
import time
import asyncio
from pathlib import Path
from unittest.mock import patch, MagicMock, AsyncMock
import pytest
from gateway.config import Platform
from gateway.platforms.base import MessageEvent
from gateway.session import SessionSource
def _make_event(text="/update", platform=Platform.TELEGRAM,
user_id="12345", chat_id="67890"):
"""Build a MessageEvent for testing."""
source = SessionSource(
platform=platform,
user_id=user_id,
chat_id=chat_id,
user_name="testuser",
)
return MessageEvent(text=text, source=source)
def _make_runner(hermes_home=None):
"""Create a bare GatewayRunner without calling __init__."""
from gateway.run import GatewayRunner
runner = object.__new__(GatewayRunner)
runner.adapters = {}
runner._voice_mode = {}
runner._update_prompt_pending = {}
runner._running_agents = {}
runner._running_agents_ts = {}
runner._pending_messages = {}
runner._pending_approvals = {}
runner._failed_platforms = {}
return runner
# ---------------------------------------------------------------------------
# _gateway_prompt (file-based IPC in main.py)
# ---------------------------------------------------------------------------
class TestGatewayPrompt:
"""Tests for _gateway_prompt() function."""
def test_writes_prompt_file_and_reads_response(self, tmp_path):
"""Writes .update_prompt.json, reads .update_response, returns answer."""
import threading
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
# Simulate the response arriving after a short delay
def write_response():
time.sleep(0.3)
(hermes_home / ".update_response").write_text("y")
thread = threading.Thread(target=write_response)
thread.start()
with patch.dict(os.environ, {"HERMES_HOME": str(hermes_home)}):
from hermes_cli.main import _gateway_prompt
result = _gateway_prompt("Restore? [Y/n]", "y", timeout=5.0)
thread.join()
assert result == "y"
# Both files should be cleaned up
assert not (hermes_home / ".update_prompt.json").exists()
assert not (hermes_home / ".update_response").exists()
def test_prompt_file_content(self, tmp_path):
"""Verifies the prompt JSON structure."""
import threading
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
prompt_data = None
def capture_and_respond():
nonlocal prompt_data
prompt_path = hermes_home / ".update_prompt.json"
for _ in range(20):
if prompt_path.exists():
prompt_data = json.loads(prompt_path.read_text())
(hermes_home / ".update_response").write_text("n")
return
time.sleep(0.1)
thread = threading.Thread(target=capture_and_respond)
thread.start()
with patch.dict(os.environ, {"HERMES_HOME": str(hermes_home)}):
from hermes_cli.main import _gateway_prompt
_gateway_prompt("Configure now? [Y/n]", "n", timeout=5.0)
thread.join()
assert prompt_data is not None
assert prompt_data["prompt"] == "Configure now? [Y/n]"
assert prompt_data["default"] == "n"
assert "id" in prompt_data
def test_timeout_returns_default(self, tmp_path):
"""Returns default when no response within timeout."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
with patch.dict(os.environ, {"HERMES_HOME": str(hermes_home)}):
from hermes_cli.main import _gateway_prompt
result = _gateway_prompt("test?", "default_val", timeout=0.5)
assert result == "default_val"
def test_empty_response_returns_default(self, tmp_path):
"""Empty response file returns default."""
hermes_home = tmp_path / ".hermes"
hermes_home.mkdir()
(hermes_home / ".update_response").write_text("")
# Write prompt file so the function starts polling
with patch.dict(os.environ, {"HERMES_HOME": str(hermes_home)}):
from hermes_cli.main import _gateway_prompt
# Pre-create the response
result = _gateway_prompt("test?", "default_val", timeout=2.0)
assert result == "default_val"
# ---------------------------------------------------------------------------
# _restore_stashed_changes with input_fn
# ---------------------------------------------------------------------------
class TestRestoreStashWithInputFn:
"""Tests for _restore_stashed_changes with the input_fn parameter."""
def test_uses_input_fn_when_provided(self, tmp_path):
"""When input_fn is provided, it's called instead of input()."""
from hermes_cli.main import _restore_stashed_changes
captured_args = []
def fake_input_fn(prompt, default=""):
captured_args.append((prompt, default))
return "n"
with patch("subprocess.run") as mock_run:
mock_run.return_value = MagicMock(
returncode=0, stdout="", stderr=""
)
result = _restore_stashed_changes(
["git"], tmp_path, "abc123",
prompt_user=True,
input_fn=fake_input_fn,
)
assert len(captured_args) == 1
assert "Restore" in captured_args[0][0]
assert result is False # user declined
def test_input_fn_yes_proceeds_with_restore(self, tmp_path):
"""When input_fn returns 'y', stash apply is attempted."""
from hermes_cli.main import _restore_stashed_changes
call_count = [0]
def fake_run(*args, **kwargs):
call_count[0] += 1
mock = MagicMock()
mock.returncode = 0
mock.stdout = ""
mock.stderr = ""
return mock
with patch("subprocess.run", side_effect=fake_run):
_restore_stashed_changes(
["git"], tmp_path, "abc123",
prompt_user=True,
input_fn=lambda p, d="": "y",
)
# Should have called git stash apply + git diff --name-only
assert call_count[0] >= 2
# ---------------------------------------------------------------------------
# Update command spawns --gateway flag
# ---------------------------------------------------------------------------
class TestUpdateCommandGatewayFlag:
"""Verify the gateway spawns hermes update --gateway."""
@pytest.mark.asyncio
async def test_spawns_with_gateway_flag(self, tmp_path):
"""The spawned update command includes --gateway and PYTHONUNBUFFERED."""
runner = _make_runner()
event = _make_event()
fake_root = tmp_path / "project"
fake_root.mkdir()
(fake_root / ".git").mkdir()
(fake_root / "gateway").mkdir()
(fake_root / "gateway" / "run.py").touch()
fake_file = str(fake_root / "gateway" / "run.py")
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
mock_popen = MagicMock()
with patch("gateway.run._hermes_home", hermes_home), \
patch("gateway.run.__file__", fake_file), \
patch("shutil.which", side_effect=lambda x: f"/usr/bin/{x}"), \
patch("subprocess.Popen", mock_popen):
result = await runner._handle_update_command(event)
# Check the bash command string contains --gateway and PYTHONUNBUFFERED
call_args = mock_popen.call_args[0][0]
cmd_string = call_args[-1] if isinstance(call_args, list) else str(call_args)
assert "--gateway" in cmd_string
assert "PYTHONUNBUFFERED" in cmd_string
assert "stream progress" in result
# ---------------------------------------------------------------------------
# _watch_update_progress — output streaming
# ---------------------------------------------------------------------------
class TestWatchUpdateProgress:
"""Tests for _watch_update_progress() streaming output."""
@pytest.mark.asyncio
async def test_streams_output_to_adapter(self, tmp_path):
"""New output is sent to the adapter periodically."""
runner = _make_runner()
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
pending = {"platform": "telegram", "chat_id": "111", "user_id": "222",
"session_key": "agent:main:telegram:dm:111"}
(hermes_home / ".update_pending.json").write_text(json.dumps(pending))
# Write output
(hermes_home / ".update_output.txt").write_text("→ Fetching updates...\n")
mock_adapter = AsyncMock()
runner.adapters = {Platform.TELEGRAM: mock_adapter}
# Write exit code after a brief delay
async def write_exit_code():
await asyncio.sleep(0.3)
(hermes_home / ".update_output.txt").write_text(
"→ Fetching updates...\n✓ Code updated!\n"
)
(hermes_home / ".update_exit_code").write_text("0")
with patch("gateway.run._hermes_home", hermes_home):
task = asyncio.create_task(write_exit_code())
await runner._watch_update_progress(
poll_interval=0.1,
stream_interval=0.2,
timeout=5.0,
)
await task
# Should have sent at least the output and a success message
assert mock_adapter.send.call_count >= 1
all_sent = " ".join(str(c) for c in mock_adapter.send.call_args_list)
assert "update finished" in all_sent.lower()
@pytest.mark.asyncio
async def test_detects_and_forwards_prompt(self, tmp_path):
"""Detects .update_prompt.json and sends it to the user."""
runner = _make_runner()
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
pending = {"platform": "telegram", "chat_id": "111", "user_id": "222",
"session_key": "agent:main:telegram:dm:111"}
(hermes_home / ".update_pending.json").write_text(json.dumps(pending))
(hermes_home / ".update_output.txt").write_text("output\n")
mock_adapter = AsyncMock()
runner.adapters = {Platform.TELEGRAM: mock_adapter}
# Write a prompt, then respond and finish
async def simulate_prompt_cycle():
await asyncio.sleep(0.3)
prompt = {"prompt": "Restore local changes? [Y/n]", "default": "y", "id": "test1"}
(hermes_home / ".update_prompt.json").write_text(json.dumps(prompt))
# Simulate user responding
await asyncio.sleep(0.5)
(hermes_home / ".update_response").write_text("y")
(hermes_home / ".update_prompt.json").unlink(missing_ok=True)
await asyncio.sleep(0.3)
(hermes_home / ".update_exit_code").write_text("0")
with patch("gateway.run._hermes_home", hermes_home):
task = asyncio.create_task(simulate_prompt_cycle())
await runner._watch_update_progress(
poll_interval=0.1,
stream_interval=0.2,
timeout=10.0,
)
await task
# Check that the prompt was forwarded
all_sent = [str(c) for c in mock_adapter.send.call_args_list]
prompt_found = any("Restore local changes" in s for s in all_sent)
assert prompt_found, f"Prompt not forwarded. Sent: {all_sent}"
# Check session was marked as having pending prompt
# (may be cleared by the time we check since update finished)
@pytest.mark.asyncio
async def test_cleans_up_on_completion(self, tmp_path):
"""All marker files are cleaned up when update finishes."""
runner = _make_runner()
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
pending = {"platform": "telegram", "chat_id": "111", "user_id": "222",
"session_key": "agent:main:telegram:dm:111"}
pending_path = hermes_home / ".update_pending.json"
output_path = hermes_home / ".update_output.txt"
exit_code_path = hermes_home / ".update_exit_code"
pending_path.write_text(json.dumps(pending))
output_path.write_text("done\n")
exit_code_path.write_text("0")
mock_adapter = AsyncMock()
runner.adapters = {Platform.TELEGRAM: mock_adapter}
with patch("gateway.run._hermes_home", hermes_home):
await runner._watch_update_progress(
poll_interval=0.1,
stream_interval=0.2,
timeout=5.0,
)
assert not pending_path.exists()
assert not output_path.exists()
assert not exit_code_path.exists()
@pytest.mark.asyncio
async def test_failure_exit_code(self, tmp_path):
"""Non-zero exit code sends failure message."""
runner = _make_runner()
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
pending = {"platform": "telegram", "chat_id": "111", "user_id": "222",
"session_key": "agent:main:telegram:dm:111"}
(hermes_home / ".update_pending.json").write_text(json.dumps(pending))
(hermes_home / ".update_output.txt").write_text("error occurred\n")
(hermes_home / ".update_exit_code").write_text("1")
mock_adapter = AsyncMock()
runner.adapters = {Platform.TELEGRAM: mock_adapter}
with patch("gateway.run._hermes_home", hermes_home):
await runner._watch_update_progress(
poll_interval=0.1,
stream_interval=0.2,
timeout=5.0,
)
all_sent = " ".join(str(c) for c in mock_adapter.send.call_args_list)
assert "failed" in all_sent.lower()
@pytest.mark.asyncio
async def test_falls_back_when_adapter_unavailable(self, tmp_path):
"""Falls back to legacy notification when adapter can't be resolved."""
runner = _make_runner()
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
# Platform doesn't match any adapter
pending = {"platform": "discord", "chat_id": "111", "user_id": "222"}
(hermes_home / ".update_pending.json").write_text(json.dumps(pending))
(hermes_home / ".update_output.txt").write_text("done\n")
(hermes_home / ".update_exit_code").write_text("0")
# Only telegram adapter available
mock_adapter = AsyncMock()
runner.adapters = {Platform.TELEGRAM: mock_adapter}
with patch("gateway.run._hermes_home", hermes_home):
await runner._watch_update_progress(
poll_interval=0.1,
stream_interval=0.2,
timeout=5.0,
)
# Should not crash; legacy notification handles this case
# ---------------------------------------------------------------------------
# Message interception for update prompts
# ---------------------------------------------------------------------------
class TestUpdatePromptInterception:
"""Tests for update prompt response interception in _handle_message."""
@pytest.mark.asyncio
async def test_intercepts_response_when_prompt_pending(self, tmp_path):
"""When _update_prompt_pending is set, the next message writes .update_response."""
runner = _make_runner()
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
event = _make_event(text="y", chat_id="67890")
# The session key uses the full format from build_session_key
session_key = "agent:main:telegram:dm:67890"
runner._update_prompt_pending[session_key] = True
# Mock authorization and _session_key_for_source
runner._is_user_authorized = MagicMock(return_value=True)
runner._session_key_for_source = MagicMock(return_value=session_key)
with patch("gateway.run._hermes_home", hermes_home):
result = await runner._handle_message(event)
assert result is not None
assert "Sent" in result
response_path = hermes_home / ".update_response"
assert response_path.exists()
assert response_path.read_text() == "y"
# Should clear the pending flag
assert session_key not in runner._update_prompt_pending
@pytest.mark.asyncio
async def test_normal_message_when_no_prompt_pending(self, tmp_path):
"""Messages pass through normally when no prompt is pending."""
runner = _make_runner()
hermes_home = tmp_path / "hermes"
hermes_home.mkdir()
event = _make_event(text="hello", chat_id="67890")
# No pending prompt
runner._is_user_authorized = MagicMock(return_value=True)
# The message should flow through to normal processing;
# we just verify it doesn't get intercepted
session_key = "agent:main:telegram:dm:67890"
assert session_key not in runner._update_prompt_pending
# ---------------------------------------------------------------------------
# cmd_update --gateway flag
# ---------------------------------------------------------------------------
class TestCmdUpdateGatewayMode:
"""Tests for cmd_update with --gateway flag."""
def test_gateway_flag_enables_gateway_prompt_for_stash(self, tmp_path):
"""With --gateway, stash restore uses _gateway_prompt instead of input()."""
from hermes_cli.main import _restore_stashed_changes
# Use input_fn to verify the gateway path is taken
calls = []
def fake_input(prompt, default=""):
calls.append(prompt)
return "n"
with patch("subprocess.run") as mock_run:
mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="")
_restore_stashed_changes(
["git"], tmp_path, "abc123",
prompt_user=True,
input_fn=fake_input,
)
assert len(calls) == 1
assert "Restore" in calls[0]
def test_gateway_flag_parsed(self):
"""The --gateway flag is accepted by the update subparser."""
# Verify the argparse parser accepts --gateway by checking cmd_update
# receives gateway=True when the flag is set
from types import SimpleNamespace
args = SimpleNamespace(gateway=True)
assert args.gateway is True