fix: guard all print() calls against OSError with _SafeWriter

When hermes-agent runs as a systemd service, Docker container, or
headless daemon, the stdout pipe can become unavailable (idle timeout,
buffer exhaustion, socket reset). Any print() call then raises
OSError: [Errno 5] Input/output error, crashing run_conversation()
and causing cron jobs to fail.

Rather than wrapping individual print() calls (68 in run_conversation
alone), this adds a transparent _SafeWriter wrapper installed once at
the start of run_conversation(). It delegates all writes to the real
stdout and silently catches OSError. Zero overhead on the happy path,
comprehensive coverage of all print calls including future ones.

Fixes #845

Co-authored-by: J0hnLawMississippi <J0hnLawMississippi@users.noreply.github.com>
This commit is contained in:
teknium1
2026-03-11 09:19:10 -07:00
parent b66c8b409c
commit a8409a161f
2 changed files with 130 additions and 0 deletions

View File

@@ -99,6 +99,51 @@ from agent.trajectory import (
)
class _SafeWriter:
"""Transparent stdout wrapper that catches OSError from broken pipes.
When hermes-agent runs as a systemd service, Docker container, or headless
daemon, the stdout pipe can become unavailable (idle timeout, buffer
exhaustion, socket reset). Any print() call then raises
``OSError: [Errno 5] Input/output error``, which can crash
run_conversation() — especially via double-fault when the except handler
also tries to print.
This wrapper delegates all writes to the underlying stream and silently
catches OSError. It is installed once at the start of run_conversation()
and is transparent when stdout is healthy (zero overhead on the happy path).
"""
__slots__ = ("_inner",)
def __init__(self, inner):
object.__setattr__(self, "_inner", inner)
def write(self, data):
try:
return self._inner.write(data)
except OSError:
return len(data) if isinstance(data, str) else 0
def flush(self):
try:
self._inner.flush()
except OSError:
pass
def fileno(self):
return self._inner.fileno()
def isatty(self):
try:
return self._inner.isatty()
except OSError:
return False
def __getattr__(self, name):
return getattr(self._inner, name)
class IterationBudget:
"""Thread-safe shared iteration counter for parent and child agents.
@@ -3157,6 +3202,11 @@ class AIAgent:
Returns:
Dict: Complete conversation result with final response and message history
"""
# Guard stdout against OSError from broken pipes (systemd/headless/daemon).
# Installed once, transparent when stdout is healthy, prevents crash on write.
if not isinstance(sys.stdout, _SafeWriter):
sys.stdout = _SafeWriter(sys.stdout)
# Generate unique task_id if not provided to isolate VMs between concurrent tasks
effective_task_id = task_id or str(uuid.uuid4())

View File

@@ -1283,3 +1283,83 @@ class TestBudgetPressure:
messages[-1]["content"] = last_content + f"\n\n{warning}"
assert "plain text result" in messages[-1]["content"]
assert "BUDGET WARNING" in messages[-1]["content"]
class TestSafeWriter:
"""Verify _SafeWriter guards stdout against OSError (broken pipes)."""
def test_write_delegates_normally(self):
"""When stdout is healthy, _SafeWriter is transparent."""
from run_agent import _SafeWriter
from io import StringIO
inner = StringIO()
writer = _SafeWriter(inner)
writer.write("hello")
assert inner.getvalue() == "hello"
def test_write_catches_oserror(self):
"""OSError on write is silently caught, returns len(data)."""
from run_agent import _SafeWriter
from unittest.mock import MagicMock
inner = MagicMock()
inner.write.side_effect = OSError(5, "Input/output error")
writer = _SafeWriter(inner)
result = writer.write("hello")
assert result == 5 # len("hello")
def test_flush_catches_oserror(self):
"""OSError on flush is silently caught."""
from run_agent import _SafeWriter
from unittest.mock import MagicMock
inner = MagicMock()
inner.flush.side_effect = OSError(5, "Input/output error")
writer = _SafeWriter(inner)
writer.flush() # should not raise
def test_print_survives_broken_stdout(self, monkeypatch):
"""print() through _SafeWriter doesn't crash on broken pipe."""
import sys
from run_agent import _SafeWriter
from unittest.mock import MagicMock
broken = MagicMock()
broken.write.side_effect = OSError(5, "Input/output error")
original = sys.stdout
sys.stdout = _SafeWriter(broken)
try:
print("this should not crash") # would raise without _SafeWriter
finally:
sys.stdout = original
def test_installed_in_run_conversation(self, agent):
"""run_conversation installs _SafeWriter on sys.stdout."""
import sys
from run_agent import _SafeWriter
resp = _mock_response(content="Done", finish_reason="stop")
agent.client.chat.completions.create.return_value = resp
original = sys.stdout
try:
with (
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
agent.run_conversation("test")
assert isinstance(sys.stdout, _SafeWriter)
finally:
sys.stdout = original
def test_double_wrap_prevented(self):
"""Wrapping an already-wrapped stream doesn't add layers."""
import sys
from run_agent import _SafeWriter
from io import StringIO
inner = StringIO()
wrapped = _SafeWriter(inner)
# isinstance check should prevent double-wrapping
assert isinstance(wrapped, _SafeWriter)
# The guard in run_conversation checks isinstance before wrapping
if not isinstance(wrapped, _SafeWriter):
wrapped = _SafeWriter(wrapped)
# Still just one layer
wrapped.write("test")
assert inner.getvalue() == "test"