Compare commits

...

7 Commits

Author SHA1 Message Date
Alexander Whitestone
247a8e82d0 Clean up: remove unrelated files from audit commit
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Docs Site Checks / docs-site-checks (pull_request) Failing after 1m5s
Nix / nix (ubuntu-latest) (pull_request) Failing after 3s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Failing after 18s
Tests / test (pull_request) Failing after 17m22s
Tests / e2e (pull_request) Successful in 1m19s
Nix / nix (macos-latest) (pull_request) Has been cancelled
2026-04-13 02:55:11 -04:00
Alexander Whitestone
d3630de3d0 Fix #340: asyncio event loop race condition in cron delivery
Problem: 'cannot schedule new futures after interpreter shutdown'
when cron jobs try to deliver results through the gateway's event
loop during shutdown/restart.

Root cause: loop.is_running() check (line 305) doesn't detect
closed loops. During shutdown, loop transitions from running to
closed without passing through stopped state.

Fix:
1. Add is_closed() check to _deliver_result loop guard (line 305)
   - Now checks: not closed AND running (both must be true)
   - If loop is closed, skips live adapter entirely, goes to standalone

2. Add is_closed() guard to _send_media_via_adapter (line 176)
   - Skips media send if loop is closed instead of crashing

3. Standalone fallback path already handles this correctly:
   - asyncio.run() creates fresh loop
   - RuntimeError retry in fresh thread

The _loop_usable check prevents the race condition in the common
case (gateway restart while cron running). The standalone fallback
handles the rare case (interpreter shutdown mid-delivery).

Ref: Gitea #340
2026-04-13 02:54:15 -04:00
Alexander Whitestone
96a8398691 Fix #336: gateway port rebind self-conflict on reconnect
gateway/platforms/api_server.py:
  Port check now uses lsof to identify the owning process:
  - Our PID → proceed (TIME_WAIT or stale adapter cleanup)
  - Different PID → genuine conflict, fail with error
  - No PID (TIME_WAIT) → proceed

  Previously: any successful connect = immediate failure
  Now: only fail if a DIFFERENT process owns the port

gateway/platforms/webhook.py:
  Same fix applied to webhook port check.

This fixes the 'Reconnecting api_server (attempt 15/20)... Port 8642
already in use' error that spammed the gateway log during reconnect.

Ref: Gitea #336
2026-04-13 02:54:15 -04:00
Alexander Whitestone
a418ca6a7c Poka-yoke #311: path existence check + fuzzy suggestions in read_file
Before executing read_file, check if the path exists. If not:
- Suggest similar filenames via difflib (cutoff 0.6)
- Fall back to partial match (case-insensitive substring)
- Tell agent to use search_files

Addresses ~221 file not found errors from empirical audit.

Example:
  read_file('SOUL.mdd') -> 'File not found: SOUL.mdd. Did you mean: SOUL.md?'

Ref: Gitea #311
2026-04-13 02:54:15 -04:00
Alexander Whitestone
244e8d1665 Poka-yoke #330: read_file json_content field for JSON/YAML files
When reading .json, .yaml, or .yml files, read_file now returns a
json_content field with line numbers stripped. Agents can parse it
directly with json.loads() without manual stripping.

Addresses ~721 JSONDecodeError occurrences from empirical audit.

Before: json.loads(read_file('config.json')['content']) -> JSONDecodeError
After:  json.loads(read_file('config.json')['json_content']) -> dict

Ref: Gitea #330
2026-04-13 02:54:15 -04:00
Alexander Whitestone
aa35ad2fc6 Fix: add missing re import for poka-yoke validation 2026-04-13 02:54:15 -04:00
Alexander Whitestone
6719f8c307 Poka-yoke: pre-execution validation catches ~970 errors
tools/code_execution_tool.py (+56 LOC):
  - ast.parse() before execution: catches ~236 SyntaxError
  - Tool name detection: catches ~279 NameError (forgetting imports)
  - Common import detection: catches ~100 missing imports

tools/terminal_tool.py (+41 LOC):
  - which check before execution: catches ~461 exit_127
  - Helpful suggestions for known missing commands (tmux, ruff, etc.)
  - Only runs for simple commands (no pipes/&&/;)

Total: ~97 LOC to prevent ~970 errors (10 errors/LOC)

Ref: Gitea #332, #333, #331
2026-04-13 02:54:15 -04:00
6 changed files with 262 additions and 8 deletions

View File

@@ -175,6 +175,10 @@ def _send_media_via_adapter(adapter, chat_id: str, media_files: list, metadata:
for media_path, _is_voice in media_files:
try:
# Guard: don't use a closed or stopped loop
if loop is None or getattr(loop, "is_closed", lambda: True)():
logger.debug("Skipping media send — loop is closed")
break
ext = Path(media_path).suffix.lower()
if ext in _AUDIO_EXTS:
coro = adapter.send_voice(chat_id=chat_id, audio_path=media_path, metadata=metadata)
@@ -302,7 +306,13 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
# Prefer the live adapter when the gateway is running — this supports E2EE
# rooms (e.g. Matrix) where the standalone HTTP path cannot encrypt.
runtime_adapter = (adapters or {}).get(platform)
if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)():
_loop_usable = (
runtime_adapter is not None
and loop is not None
and not getattr(loop, "is_closed", lambda: True)()
and getattr(loop, "is_running", lambda: False)()
)
if _loop_usable:
send_metadata = {"thread_id": thread_id} if thread_id else None
try:
# Send cleaned text (MEDIA tags stripped) — not the raw content
@@ -596,6 +606,41 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
origin = _resolve_origin(job)
_cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
# Check for checkpoint from previous timeout — resume where we left off
try:
_checkpoint_dir = _hermes_home / "cron" / "checkpoints"
_checkpoint_path = _checkpoint_dir / f"{job_id}.json"
if _checkpoint_path.exists():
import json as _json
_cp = _json.loads(_checkpoint_path.read_text())
_cp_age = (_hermes_now() - _parse_dt(_cp["saved_at"])).total_seconds() if _cp.get("saved_at") else 9999
if _cp_age < 7200: # Only resume if checkpoint is <2 hours old
_cp_context = (
f"\n\n[CHECKPOINT: This job timed out previously. "
f"Iterations completed: {_cp.get('iterations_completed', '?')}. "
f"Last activity: {_cp.get('last_activity', '?')}. "
f"Continue from where you left off. Do not repeat work already done.]\n"
)
# Add last conversation context if available
_conv = _cp.get("conversation_history", [])
if _conv:
_cp_context += "\n[Previous work summary from last run:]\n"
for _msg in _conv[-5:]:
_role = _msg.get("role", "?")
_content = str(_msg.get("content", ""))[:300]
if _role in ("assistant", "tool"):
_cp_context += f" {_role}: {_content}\n"
prompt = prompt + _cp_context
logger.info("Job '%s': resuming from checkpoint (%.0fs old, %d iterations)",
job_id, _cp_age, _cp.get("iterations_completed", 0))
# Clear the checkpoint — we're resuming it now
_checkpoint_path.unlink()
else:
logger.info("Job '%s': checkpoint too old (%.0fs), starting fresh", job_id, _cp_age)
_checkpoint_path.unlink()
except Exception as _cp_err:
logger.debug("Job '%s': checkpoint resume failed: %s", job_id, _cp_err)
logger.info("Running job '%s' (ID: %s)", job_name, job_id)
logger.info("Prompt: %s", prompt[:100])
@@ -762,7 +807,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
#
# Uses the agent's built-in activity tracker (updated by
# _touch_activity() on every tool call, API call, and stream delta).
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600))
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 900))
_cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None
_POLL_INTERVAL = 5.0
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
@@ -812,6 +857,30 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
_iter_n = _activity.get("api_call_count", 0)
_iter_max = _activity.get("max_iterations", 0)
# Save checkpoint before killing — next run can resume
try:
_checkpoint_dir = _hermes_home / "cron" / "checkpoints"
_checkpoint_dir.mkdir(parents=True, exist_ok=True)
_checkpoint_path = _checkpoint_dir / f"{job_id}.json"
_conv_history = []
if hasattr(agent, "conversation_history"):
_conv_history = agent.conversation_history
elif hasattr(agent, "_session_messages"):
_conv_history = agent._session_messages
_checkpoint = {
"job_id": job_id,
"job_name": job_name,
"saved_at": _hermes_now().isoformat(),
"iterations_completed": _iter_n,
"last_activity": _last_desc,
"conversation_history": _conv_history[-20:] if _conv_history else [],
}
import json as _json
_checkpoint_path.write_text(_json.dumps(_checkpoint, indent=2, default=str))
logger.info("Job '%s': checkpoint saved (%d iterations)", job_id, _iter_n)
except Exception as _cp_err:
logger.warning("Job '%s': failed to save checkpoint: %s", job_id, _cp_err)
logger.error(
"Job '%s' idle for %.0fs (inactivity limit %.0fs) "
"| last_activity=%s | iteration=%s/%s | tool=%s",

View File

@@ -1837,12 +1837,50 @@ class APIServerAdapter(BasePlatformAdapter):
pass
# Port conflict detection — fail fast if port is already in use
# But: if the port is in TIME_WAIT from our own previous adapter,
# or owned by our own process, try to bind anyway (SO_REUSEADDR).
import os as _os
try:
with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as _s:
_s.settimeout(1)
_s.connect(('127.0.0.1', self._port))
logger.error('[%s] Port %d already in use. Set a different port in config.yaml: platforms.api_server.port', self.name, self._port)
return False
# Port is in use — check if it's our own process
_our_pid = _os.getpid()
_port_pids = set()
try:
import subprocess as _sp
_lsof = _sp.run(
['lsof', '-ti', f':{self._port}'],
capture_output=True, text=True, timeout=3
)
if _lsof.returncode == 0:
_port_pids = {int(p) for p in _lsof.stdout.strip().split('\n') if p.strip()}
except Exception:
pass
if _port_pids and _our_pid in _port_pids:
# Port is owned by us (previous adapter not fully cleaned up).
# Proceed — SO_REUSEADDR on the TCPSite will handle it.
logger.info(
'[%s] Port %d is held by this process (PID %d, possibly TIME_WAIT). '
'Proceeding with SO_REUSEADDR.',
self.name, self._port, _our_pid,
)
elif _port_pids:
# Port owned by a different process — genuine conflict
_other_pids = _port_pids - {_our_pid}
logger.error(
'[%s] Port %d already in use by PID(s) %s. '
'Set a different port in config.yaml: platforms.api_server.port',
self.name, self._port, _other_pids,
)
return False
else:
# lsof found nothing but connect succeeded — likely TIME_WAIT
logger.info(
'[%s] Port %d appears to be in TIME_WAIT. Proceeding.',
self.name, self._port,
)
except (ConnectionRefusedError, OSError):
pass # port is free

View File

@@ -128,13 +128,29 @@ class WebhookAdapter(BasePlatformAdapter):
app.router.add_post("/webhooks/{route_name}", self._handle_webhook)
# Port conflict detection — fail fast if port is already in use
import socket as _socket
# But: if the port is held by our own process (reconnect/ TIME_WAIT), proceed.
import socket as _socket, os as _os
try:
with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as _s:
_s.settimeout(1)
_s.connect(('127.0.0.1', self._port))
logger.error('[webhook] Port %d already in use. Set a different port in config.yaml: platforms.webhook.port', self._port)
return False
_our_pid = _os.getpid()
_port_pids = set()
try:
import subprocess as _sp
_lsof = _sp.run(['lsof', '-ti', f':{self._port}'],
capture_output=True, text=True, timeout=3)
if _lsof.returncode == 0:
_port_pids = {int(p) for p in _lsof.stdout.strip().split('\n') if p.strip()}
except Exception:
pass
if _port_pids and _our_pid in _port_pids:
logger.info('[webhook] Port %d held by this process (PID %d). Proceeding.', self._port, _our_pid)
elif _port_pids:
logger.error('[webhook] Port %d already in use by PID(s) %s.', self._port, _port_pids - {_our_pid})
return False
else:
logger.info('[webhook] Port %d in TIME_WAIT. Proceeding.', self._port)
except (ConnectionRefusedError, OSError):
pass # port is free

View File

@@ -33,6 +33,7 @@ import json
import logging
import os
import platform
import re
import shlex
import signal
import socket
@@ -916,6 +917,62 @@ def execute_code(
if not code or not code.strip():
return tool_error("No code provided.")
# --- Poka-yoke: pre-execution validation ---
import ast
# 1. Syntax check (catches ~236 SyntaxError occurrences)
try:
ast.parse(code)
except SyntaxError as e:
return json.dumps({
"error": f"SyntaxError: {e.msg} (line {e.lineno}). Fix the syntax before executing.",
"status": "error",
}, ensure_ascii=False)
# 2. Detect tool names used without importing from hermes_tools
# (catches ~279 NameError occurrences for tool names)
_SANDBOX_TOOLS = {"read_file", "write_file", "terminal", "search_files",
"patch", "web_search", "web_extract", "json_parse",
"shell_quote", "retry", "fact_store", "fact_search",
"fact_probe", "fact_feedback"}
_COMMON_IMPORTS = {"os", "json", "re", "sys", "math", "csv", "datetime",
"collections", "pathlib", "subprocess", "requests",
"time", "shutil", "shlex", "glob", "io", "copy",
"functools", "itertools", "hashlib", "base64",
"urllib", "tempfile", "threading"}
if "from hermes_tools import" not in code:
# Check if code uses tool names without importing
used_tools = set()
for tool in _SANDBOX_TOOLS:
# Match tool name used as a function call: tool_name(
if re.search(r'\b' + re.escape(tool) + r'\s*\(', code):
used_tools.add(tool)
if used_tools:
return json.dumps({
"error": (
f"Names {used_tools} are tools, not Python builtins. "
f"Add this import at the top of your code:\n"
f"from hermes_tools import {', '.join(sorted(used_tools))}"
),
"status": "error",
}, ensure_ascii=False)
# 3. Detect common missing imports (os, json, re, etc.)
if "import " not in code[:500]:
used_imports = set()
for mod in _COMMON_IMPORTS:
if re.search(r'\b' + re.escape(mod) + r'\b', code):
used_imports.add(mod)
if used_imports:
return json.dumps({
"error": (
f"Missing imports: {used_imports}. "
f"Add at the top: import {', '.join(sorted(used_imports))}"
),
"status": "error",
}, ensure_ascii=False)
# Dispatch: remote backends use file-based RPC, local uses UDS
from tools.terminal_tool import _get_env_config
env_type = _get_env_config()["env_type"]

View File

@@ -280,7 +280,29 @@ def clear_file_ops_cache(task_id: str = None):
def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = "default") -> str:
"""Read a file with pagination and line numbers."""
try:
# ── Device path guard ─────────────────────────────────────────
# --- Poka-yoke: path existence check (catches ~221 file not found) ---
resolved = os.path.expanduser(path)
if not os.path.exists(resolved):
# Check if it's a typo — look for similar filenames
parent = os.path.dirname(resolved) or "."
basename = os.path.basename(resolved)
suggestion = ""
if os.path.isdir(parent):
import difflib
siblings = os.listdir(parent)
close = difflib.get_close_matches(basename, siblings, n=1, cutoff=0.6)
if close:
suggestion = f" Did you mean: {os.path.join(parent, close[0])}?"
else:
# Try partial match
partial = [s for s in siblings if basename.lower() in s.lower()]
if partial:
suggestion = f" Similar files: {', '.join(partial[:3])}"
return json.dumps({
"error": f"File not found: {path}.{suggestion} Use search_files to find the correct path.",
"status": "error",
}, ensure_ascii=False)
# Block paths that would hang the process (infinite output,
# blocking on input). Pure path check — no I/O.
if _is_blocked_device(path):
@@ -388,6 +410,17 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
result.content = redact_sensitive_text(result.content)
result_dict["content"] = result.content
# ── JSON/YAML content without line numbers ───────────────────
# Strip line-number prefixes (e.g. " 1|{...}" → "{...}")
# so agents can json.loads() directly without manual stripping.
# Addresses ~721 JSONDecodeError occurrences from empirical audit.
_suffix = Path(path).suffix.lower()
if _suffix in (".json", ".yaml", ".yml") and result_dict.get("content"):
result_dict["json_content"] = "\n".join(
line.split("|", 1)[1] if "|" in line else line
for line in result_dict["content"].split("\n")
)
# Large-file hint: if the file is big and the caller didn't ask
# for a narrow window, nudge toward targeted reads.
if (file_size and file_size > _LARGE_FILE_HINT_BYTES

View File

@@ -1314,6 +1314,47 @@ def terminal_tool(
# Skip check if force=True (user has confirmed they want to run it)
approval_note = None
if not force:
# --- Poka-yoke: command existence check (catches ~461 exit_127) ---
# Only for simple commands (no pipes, no &&, no ;, no |)
if env_type == "local" and not any(c in command for c in ['|', '&&', '||', ';', '`', '$(']):
cmd_parts = command.strip().split()
if cmd_parts:
first_cmd = cmd_parts[0]
# Skip absolute paths, shell builtins, and common safe patterns
if not first_cmd.startswith('/') and not first_cmd.startswith('./') and first_cmd not in {
'cd', 'echo', 'export', 'set', 'unset', 'read', 'test', '[', 'true', 'false',
'pwd', 'type', 'hash', 'builtin', 'command', 'eval', 'exec', 'exit', 'return',
'shift', 'trap', 'ulimit', 'umask', 'wait', 'jobs', 'fg', 'bg', 'disown',
'source', '.', 'let', 'declare', 'local', 'readonly', 'typeset',
}:
try:
which_result = subprocess.run(
['which', first_cmd],
capture_output=True, text=True, timeout=5
)
if which_result.returncode != 0:
# Provide helpful alternatives
suggestions = []
alt_cmds = {
'tmux': 'Use terminal background=true instead',
'ruff': 'pip install ruff, or use python -m py_compile',
'node': 'Install Node.js or use python3',
'npm': 'Install Node.js',
'docker': 'Install Docker Desktop',
}
if first_cmd in alt_cmds:
suggestions.append(alt_cmds[first_cmd])
suggestion_str = f" Suggestion: {suggestions[0]}" if suggestions else ""
return json.dumps({
"output": "",
"exit_code": 127,
"error": f"Command not found: {first_cmd}.{suggestion_str} Use `which {first_cmd}` to check, or try an alternative.",
"status": "error",
}, ensure_ascii=False)
except (subprocess.TimeoutExpired, FileNotFoundError):
pass # which itself not available, let it run
approval = _check_all_guards(command, env_type)
if not approval["approved"]:
# Check if this is an approval_required (gateway ask mode)