Merge pull request #1288 from NousResearch/hermes/hermes-de3d4e49-pr976
fix: reliably notify gateway users when updates finish
This commit is contained in:
132
gateway/run.py
132
gateway/run.py
@@ -17,6 +17,7 @@ import asyncio
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import shlex
|
||||
import sys
|
||||
import signal
|
||||
import threading
|
||||
@@ -674,8 +675,17 @@ class GatewayRunner:
|
||||
except Exception as e:
|
||||
logger.warning("Channel directory build failed: %s", e)
|
||||
|
||||
# Check if we're restarting after a /update command
|
||||
await self._send_update_notification()
|
||||
# Check if we're restarting after a /update command. If the update is
|
||||
# still running, keep watching so we notify once it actually finishes.
|
||||
notified = await self._send_update_notification()
|
||||
if not notified and any(
|
||||
path.exists()
|
||||
for path in (
|
||||
_hermes_home / ".update_pending.json",
|
||||
_hermes_home / ".update_pending.claimed.json",
|
||||
)
|
||||
):
|
||||
self._schedule_update_notification_watch()
|
||||
|
||||
# Start background session expiry watcher for proactive memory flushing
|
||||
asyncio.create_task(self._session_expiry_watcher())
|
||||
@@ -2714,9 +2724,9 @@ class GatewayRunner:
|
||||
"""Handle /update command — update Hermes Agent to the latest version.
|
||||
|
||||
Spawns ``hermes update`` in a separate systemd scope so it survives the
|
||||
gateway restart that ``hermes update`` triggers at the end. A marker
|
||||
file is written so the *new* gateway process can notify the user of the
|
||||
result on startup.
|
||||
gateway restart that ``hermes update`` may trigger at the end. Marker
|
||||
files are written so either the current gateway process or the next one
|
||||
can notify the user when the update finishes.
|
||||
"""
|
||||
import json
|
||||
import shutil
|
||||
@@ -2733,9 +2743,9 @@ class GatewayRunner:
|
||||
if not hermes_bin:
|
||||
return "✗ `hermes` command not found on PATH."
|
||||
|
||||
# Write marker so the restarted gateway can notify this chat
|
||||
pending_path = _hermes_home / ".update_pending.json"
|
||||
output_path = _hermes_home / ".update_output.txt"
|
||||
exit_code_path = _hermes_home / ".update_exit_code"
|
||||
pending = {
|
||||
"platform": event.source.platform.value,
|
||||
"chat_id": event.source.chat_id,
|
||||
@@ -2743,10 +2753,14 @@ class GatewayRunner:
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
pending_path.write_text(json.dumps(pending))
|
||||
exit_code_path.unlink(missing_ok=True)
|
||||
|
||||
# Spawn `hermes update` in a separate cgroup so it survives gateway
|
||||
# restart. systemd-run --user --scope creates a transient scope unit.
|
||||
update_cmd = f"{hermes_bin} update > {output_path} 2>&1"
|
||||
# restart. systemd-run --user --scope creates a transient scope unit.
|
||||
update_cmd = (
|
||||
f"{shlex.quote(hermes_bin)} update > {shlex.quote(str(output_path))} 2>&1; "
|
||||
f"status=$?; printf '%s' \"$status\" > {shlex.quote(str(exit_code_path))}"
|
||||
)
|
||||
try:
|
||||
systemd_run = shutil.which("systemd-run")
|
||||
if systemd_run:
|
||||
@@ -2768,26 +2782,91 @@ class GatewayRunner:
|
||||
)
|
||||
except Exception as e:
|
||||
pending_path.unlink(missing_ok=True)
|
||||
exit_code_path.unlink(missing_ok=True)
|
||||
return f"✗ Failed to start update: {e}"
|
||||
|
||||
self._schedule_update_notification_watch()
|
||||
return "⚕ Starting Hermes update… I'll notify you when it's done."
|
||||
|
||||
async def _send_update_notification(self) -> None:
|
||||
"""If the gateway is starting after a ``/update``, notify the user."""
|
||||
def _schedule_update_notification_watch(self) -> None:
|
||||
"""Ensure a background task is watching for update completion."""
|
||||
existing_task = getattr(self, "_update_notification_task", None)
|
||||
if existing_task and not existing_task.done():
|
||||
return
|
||||
|
||||
try:
|
||||
self._update_notification_task = asyncio.create_task(
|
||||
self._watch_for_update_completion()
|
||||
)
|
||||
except RuntimeError:
|
||||
logger.debug("Skipping update notification watcher: no running event loop")
|
||||
|
||||
async def _watch_for_update_completion(
|
||||
self,
|
||||
poll_interval: float = 2.0,
|
||||
timeout: float = 1800.0,
|
||||
) -> None:
|
||||
"""Wait for ``hermes update`` to finish, then send its notification."""
|
||||
pending_path = _hermes_home / ".update_pending.json"
|
||||
claimed_path = _hermes_home / ".update_pending.claimed.json"
|
||||
exit_code_path = _hermes_home / ".update_exit_code"
|
||||
loop = asyncio.get_running_loop()
|
||||
deadline = loop.time() + timeout
|
||||
|
||||
while (pending_path.exists() or claimed_path.exists()) and loop.time() < deadline:
|
||||
if exit_code_path.exists():
|
||||
await self._send_update_notification()
|
||||
return
|
||||
await asyncio.sleep(poll_interval)
|
||||
|
||||
if (pending_path.exists() or claimed_path.exists()) and not exit_code_path.exists():
|
||||
logger.warning("Update watcher timed out waiting for completion marker")
|
||||
exit_code_path.write_text("124")
|
||||
await self._send_update_notification()
|
||||
|
||||
async def _send_update_notification(self) -> bool:
|
||||
"""If an update finished, notify the user.
|
||||
|
||||
Returns False when the update is still running so a caller can retry
|
||||
later. Returns True after a definitive send/skip decision.
|
||||
"""
|
||||
import json
|
||||
import re as _re
|
||||
|
||||
pending_path = _hermes_home / ".update_pending.json"
|
||||
claimed_path = _hermes_home / ".update_pending.claimed.json"
|
||||
output_path = _hermes_home / ".update_output.txt"
|
||||
exit_code_path = _hermes_home / ".update_exit_code"
|
||||
|
||||
if not pending_path.exists():
|
||||
return
|
||||
if not pending_path.exists() and not claimed_path.exists():
|
||||
return False
|
||||
|
||||
cleanup = True
|
||||
active_pending_path = claimed_path
|
||||
try:
|
||||
pending = json.loads(pending_path.read_text())
|
||||
if pending_path.exists():
|
||||
try:
|
||||
pending_path.replace(claimed_path)
|
||||
except FileNotFoundError:
|
||||
if not claimed_path.exists():
|
||||
return True
|
||||
elif not claimed_path.exists():
|
||||
return True
|
||||
|
||||
pending = json.loads(claimed_path.read_text())
|
||||
platform_str = pending.get("platform")
|
||||
chat_id = pending.get("chat_id")
|
||||
|
||||
if not exit_code_path.exists():
|
||||
logger.info("Update notification deferred: update still running")
|
||||
cleanup = False
|
||||
active_pending_path = pending_path
|
||||
claimed_path.replace(pending_path)
|
||||
return False
|
||||
|
||||
exit_code_raw = exit_code_path.read_text().strip() or "1"
|
||||
exit_code = int(exit_code_raw)
|
||||
|
||||
# Read the captured update output
|
||||
output = ""
|
||||
if output_path.exists():
|
||||
@@ -2801,19 +2880,34 @@ class GatewayRunner:
|
||||
# Strip ANSI escape codes for clean display
|
||||
output = _re.sub(r'\x1b\[[0-9;]*m', '', output).strip()
|
||||
if output:
|
||||
# Truncate if too long for a single message
|
||||
if len(output) > 3500:
|
||||
output = "…" + output[-3500:]
|
||||
msg = f"✅ Hermes update finished — gateway restarted.\n\n```\n{output}\n```"
|
||||
if exit_code == 0:
|
||||
msg = f"✅ Hermes update finished.\n\n```\n{output}\n```"
|
||||
else:
|
||||
msg = f"❌ Hermes update failed.\n\n```\n{output}\n```"
|
||||
else:
|
||||
msg = "✅ Hermes update finished — gateway restarted successfully."
|
||||
if exit_code == 0:
|
||||
msg = "✅ Hermes update finished successfully."
|
||||
else:
|
||||
msg = "❌ Hermes update failed. Check the gateway logs or run `hermes update` manually for details."
|
||||
await adapter.send(chat_id, msg)
|
||||
logger.info("Sent post-update notification to %s:%s", platform_str, chat_id)
|
||||
logger.info(
|
||||
"Sent post-update notification to %s:%s (exit=%s)",
|
||||
platform_str,
|
||||
chat_id,
|
||||
exit_code,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("Post-update notification failed: %s", e)
|
||||
finally:
|
||||
pending_path.unlink(missing_ok=True)
|
||||
output_path.unlink(missing_ok=True)
|
||||
if cleanup:
|
||||
active_pending_path.unlink(missing_ok=True)
|
||||
claimed_path.unlink(missing_ok=True)
|
||||
output_path.unlink(missing_ok=True)
|
||||
exit_code_path.unlink(missing_ok=True)
|
||||
|
||||
return True
|
||||
|
||||
def _set_session_env(self, context: SessionContext) -> None:
|
||||
"""Set environment variables for the current session."""
|
||||
|
||||
@@ -133,6 +133,7 @@ class TestHandleUpdateCommand:
|
||||
assert data["platform"] == "telegram"
|
||||
assert data["chat_id"] == "99999"
|
||||
assert "timestamp" in data
|
||||
assert not (hermes_home / ".update_exit_code").exists()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_spawns_systemd_run(self, tmp_path):
|
||||
@@ -160,6 +161,7 @@ class TestHandleUpdateCommand:
|
||||
call_args = mock_popen.call_args[0][0]
|
||||
assert call_args[0] == "/usr/bin/systemd-run"
|
||||
assert "--scope" in call_args
|
||||
assert ".update_exit_code" in call_args[-1]
|
||||
assert "Starting Hermes update" in result
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -196,6 +198,7 @@ class TestHandleUpdateCommand:
|
||||
call_args = mock_popen.call_args[0][0]
|
||||
assert call_args[0] == "bash"
|
||||
assert "nohup" in call_args[2]
|
||||
assert ".update_exit_code" in call_args[2]
|
||||
assert "Starting Hermes update" in result
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -222,6 +225,7 @@ class TestHandleUpdateCommand:
|
||||
assert "Failed to start update" in result
|
||||
# Pending file should be cleaned up
|
||||
assert not (hermes_home / ".update_pending.json").exists()
|
||||
assert not (hermes_home / ".update_exit_code").exists()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_returns_user_friendly_message(self, tmp_path):
|
||||
@@ -266,6 +270,53 @@ class TestSendUpdateNotification:
|
||||
# Should not raise
|
||||
await runner._send_update_notification()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_defers_notification_while_update_still_running(self, tmp_path):
|
||||
"""Returns False and keeps marker files when the update has not exited yet."""
|
||||
runner = _make_runner()
|
||||
hermes_home = tmp_path / "hermes"
|
||||
hermes_home.mkdir()
|
||||
|
||||
pending_path = hermes_home / ".update_pending.json"
|
||||
pending_path.write_text(json.dumps({
|
||||
"platform": "telegram", "chat_id": "67890", "user_id": "12345",
|
||||
}))
|
||||
(hermes_home / ".update_output.txt").write_text("still running")
|
||||
|
||||
mock_adapter = AsyncMock()
|
||||
runner.adapters = {Platform.TELEGRAM: mock_adapter}
|
||||
|
||||
with patch("gateway.run._hermes_home", hermes_home):
|
||||
result = await runner._send_update_notification()
|
||||
|
||||
assert result is False
|
||||
mock_adapter.send.assert_not_called()
|
||||
assert pending_path.exists()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_recovers_from_claimed_pending_file(self, tmp_path):
|
||||
"""A claimed pending file from a crashed notifier is still deliverable."""
|
||||
runner = _make_runner()
|
||||
hermes_home = tmp_path / "hermes"
|
||||
hermes_home.mkdir()
|
||||
|
||||
claimed_path = hermes_home / ".update_pending.claimed.json"
|
||||
claimed_path.write_text(json.dumps({
|
||||
"platform": "telegram", "chat_id": "67890", "user_id": "12345",
|
||||
}))
|
||||
(hermes_home / ".update_output.txt").write_text("done")
|
||||
(hermes_home / ".update_exit_code").write_text("0")
|
||||
|
||||
mock_adapter = AsyncMock()
|
||||
runner.adapters = {Platform.TELEGRAM: mock_adapter}
|
||||
|
||||
with patch("gateway.run._hermes_home", hermes_home):
|
||||
result = await runner._send_update_notification()
|
||||
|
||||
assert result is True
|
||||
mock_adapter.send.assert_called_once()
|
||||
assert not claimed_path.exists()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sends_notification_with_output(self, tmp_path):
|
||||
"""Sends update output to the correct platform and chat."""
|
||||
@@ -284,6 +335,7 @@ class TestSendUpdateNotification:
|
||||
(hermes_home / ".update_output.txt").write_text(
|
||||
"→ Found 3 new commit(s)\n✓ Code updated!\n✓ Update complete!"
|
||||
)
|
||||
(hermes_home / ".update_exit_code").write_text("0")
|
||||
|
||||
# Mock the adapter
|
||||
mock_adapter = AsyncMock()
|
||||
@@ -310,6 +362,7 @@ class TestSendUpdateNotification:
|
||||
(hermes_home / ".update_output.txt").write_text(
|
||||
"\x1b[32m✓ Code updated!\x1b[0m\n\x1b[1mDone\x1b[0m"
|
||||
)
|
||||
(hermes_home / ".update_exit_code").write_text("0")
|
||||
|
||||
mock_adapter = AsyncMock()
|
||||
runner.adapters = {Platform.TELEGRAM: mock_adapter}
|
||||
@@ -331,6 +384,7 @@ class TestSendUpdateNotification:
|
||||
pending = {"platform": "telegram", "chat_id": "111", "user_id": "222"}
|
||||
(hermes_home / ".update_pending.json").write_text(json.dumps(pending))
|
||||
(hermes_home / ".update_output.txt").write_text("x" * 5000)
|
||||
(hermes_home / ".update_exit_code").write_text("0")
|
||||
|
||||
mock_adapter = AsyncMock()
|
||||
runner.adapters = {Platform.TELEGRAM: mock_adapter}
|
||||
@@ -344,6 +398,29 @@ class TestSendUpdateNotification:
|
||||
# Total message should not be absurdly long
|
||||
assert len(sent_text) < 4500
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sends_failure_message_when_update_fails(self, tmp_path):
|
||||
"""Non-zero exit codes produce a failure notification with captured output."""
|
||||
runner = _make_runner()
|
||||
hermes_home = tmp_path / "hermes"
|
||||
hermes_home.mkdir()
|
||||
|
||||
pending = {"platform": "telegram", "chat_id": "111", "user_id": "222"}
|
||||
(hermes_home / ".update_pending.json").write_text(json.dumps(pending))
|
||||
(hermes_home / ".update_output.txt").write_text("Traceback: boom")
|
||||
(hermes_home / ".update_exit_code").write_text("1")
|
||||
|
||||
mock_adapter = AsyncMock()
|
||||
runner.adapters = {Platform.TELEGRAM: mock_adapter}
|
||||
|
||||
with patch("gateway.run._hermes_home", hermes_home):
|
||||
result = await runner._send_update_notification()
|
||||
|
||||
assert result is True
|
||||
sent_text = mock_adapter.send.call_args[0][1]
|
||||
assert "update failed" in sent_text.lower()
|
||||
assert "Traceback: boom" in sent_text
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sends_generic_message_when_no_output(self, tmp_path):
|
||||
"""Sends a success message even if the output file is missing."""
|
||||
@@ -354,6 +431,7 @@ class TestSendUpdateNotification:
|
||||
pending = {"platform": "telegram", "chat_id": "111", "user_id": "222"}
|
||||
(hermes_home / ".update_pending.json").write_text(json.dumps(pending))
|
||||
# No .update_output.txt created
|
||||
(hermes_home / ".update_exit_code").write_text("0")
|
||||
|
||||
mock_adapter = AsyncMock()
|
||||
runner.adapters = {Platform.TELEGRAM: mock_adapter}
|
||||
@@ -362,7 +440,7 @@ class TestSendUpdateNotification:
|
||||
await runner._send_update_notification()
|
||||
|
||||
sent_text = mock_adapter.send.call_args[0][1]
|
||||
assert "restarted successfully" in sent_text
|
||||
assert "finished successfully" in sent_text
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cleans_up_files_after_notification(self, tmp_path):
|
||||
@@ -373,10 +451,12 @@ class TestSendUpdateNotification:
|
||||
|
||||
pending_path = hermes_home / ".update_pending.json"
|
||||
output_path = hermes_home / ".update_output.txt"
|
||||
exit_code_path = hermes_home / ".update_exit_code"
|
||||
pending_path.write_text(json.dumps({
|
||||
"platform": "telegram", "chat_id": "111", "user_id": "222",
|
||||
}))
|
||||
output_path.write_text("✓ Done")
|
||||
exit_code_path.write_text("0")
|
||||
|
||||
mock_adapter = AsyncMock()
|
||||
runner.adapters = {Platform.TELEGRAM: mock_adapter}
|
||||
@@ -386,6 +466,7 @@ class TestSendUpdateNotification:
|
||||
|
||||
assert not pending_path.exists()
|
||||
assert not output_path.exists()
|
||||
assert not exit_code_path.exists()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cleans_up_on_error(self, tmp_path):
|
||||
@@ -396,10 +477,12 @@ class TestSendUpdateNotification:
|
||||
|
||||
pending_path = hermes_home / ".update_pending.json"
|
||||
output_path = hermes_home / ".update_output.txt"
|
||||
exit_code_path = hermes_home / ".update_exit_code"
|
||||
pending_path.write_text(json.dumps({
|
||||
"platform": "telegram", "chat_id": "111", "user_id": "222",
|
||||
}))
|
||||
output_path.write_text("✓ Done")
|
||||
exit_code_path.write_text("0")
|
||||
|
||||
# Adapter send raises
|
||||
mock_adapter = AsyncMock()
|
||||
@@ -412,6 +495,7 @@ class TestSendUpdateNotification:
|
||||
# Files should still be cleaned up (finally block)
|
||||
assert not pending_path.exists()
|
||||
assert not output_path.exists()
|
||||
assert not exit_code_path.exists()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handles_corrupt_pending_file(self, tmp_path):
|
||||
@@ -440,8 +524,10 @@ class TestSendUpdateNotification:
|
||||
pending = {"platform": "discord", "chat_id": "111", "user_id": "222"}
|
||||
pending_path = hermes_home / ".update_pending.json"
|
||||
output_path = hermes_home / ".update_output.txt"
|
||||
exit_code_path = hermes_home / ".update_exit_code"
|
||||
pending_path.write_text(json.dumps(pending))
|
||||
output_path.write_text("Done")
|
||||
exit_code_path.write_text("0")
|
||||
|
||||
# Only telegram adapter available, but pending says discord
|
||||
mock_adapter = AsyncMock()
|
||||
@@ -454,6 +540,7 @@ class TestSendUpdateNotification:
|
||||
mock_adapter.send.assert_not_called()
|
||||
# Files should still be cleaned up
|
||||
assert not pending_path.exists()
|
||||
assert not exit_code_path.exists()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user