Compare commits
7 Commits
fix/673
...
fix/empiri
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
247a8e82d0 | ||
|
|
d3630de3d0 | ||
|
|
96a8398691 | ||
|
|
a418ca6a7c | ||
|
|
244e8d1665 | ||
|
|
aa35ad2fc6 | ||
|
|
6719f8c307 |
@@ -175,6 +175,10 @@ def _send_media_via_adapter(adapter, chat_id: str, media_files: list, metadata:
|
||||
|
||||
for media_path, _is_voice in media_files:
|
||||
try:
|
||||
# Guard: don't use a closed or stopped loop
|
||||
if loop is None or getattr(loop, "is_closed", lambda: True)():
|
||||
logger.debug("Skipping media send — loop is closed")
|
||||
break
|
||||
ext = Path(media_path).suffix.lower()
|
||||
if ext in _AUDIO_EXTS:
|
||||
coro = adapter.send_voice(chat_id=chat_id, audio_path=media_path, metadata=metadata)
|
||||
@@ -302,7 +306,13 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
|
||||
# Prefer the live adapter when the gateway is running — this supports E2EE
|
||||
# rooms (e.g. Matrix) where the standalone HTTP path cannot encrypt.
|
||||
runtime_adapter = (adapters or {}).get(platform)
|
||||
if runtime_adapter is not None and loop is not None and getattr(loop, "is_running", lambda: False)():
|
||||
_loop_usable = (
|
||||
runtime_adapter is not None
|
||||
and loop is not None
|
||||
and not getattr(loop, "is_closed", lambda: True)()
|
||||
and getattr(loop, "is_running", lambda: False)()
|
||||
)
|
||||
if _loop_usable:
|
||||
send_metadata = {"thread_id": thread_id} if thread_id else None
|
||||
try:
|
||||
# Send cleaned text (MEDIA tags stripped) — not the raw content
|
||||
@@ -596,6 +606,41 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
origin = _resolve_origin(job)
|
||||
_cron_session_id = f"cron_{job_id}_{_hermes_now().strftime('%Y%m%d_%H%M%S')}"
|
||||
|
||||
# Check for checkpoint from previous timeout — resume where we left off
|
||||
try:
|
||||
_checkpoint_dir = _hermes_home / "cron" / "checkpoints"
|
||||
_checkpoint_path = _checkpoint_dir / f"{job_id}.json"
|
||||
if _checkpoint_path.exists():
|
||||
import json as _json
|
||||
_cp = _json.loads(_checkpoint_path.read_text())
|
||||
_cp_age = (_hermes_now() - _parse_dt(_cp["saved_at"])).total_seconds() if _cp.get("saved_at") else 9999
|
||||
if _cp_age < 7200: # Only resume if checkpoint is <2 hours old
|
||||
_cp_context = (
|
||||
f"\n\n[CHECKPOINT: This job timed out previously. "
|
||||
f"Iterations completed: {_cp.get('iterations_completed', '?')}. "
|
||||
f"Last activity: {_cp.get('last_activity', '?')}. "
|
||||
f"Continue from where you left off. Do not repeat work already done.]\n"
|
||||
)
|
||||
# Add last conversation context if available
|
||||
_conv = _cp.get("conversation_history", [])
|
||||
if _conv:
|
||||
_cp_context += "\n[Previous work summary from last run:]\n"
|
||||
for _msg in _conv[-5:]:
|
||||
_role = _msg.get("role", "?")
|
||||
_content = str(_msg.get("content", ""))[:300]
|
||||
if _role in ("assistant", "tool"):
|
||||
_cp_context += f" {_role}: {_content}\n"
|
||||
prompt = prompt + _cp_context
|
||||
logger.info("Job '%s': resuming from checkpoint (%.0fs old, %d iterations)",
|
||||
job_id, _cp_age, _cp.get("iterations_completed", 0))
|
||||
# Clear the checkpoint — we're resuming it now
|
||||
_checkpoint_path.unlink()
|
||||
else:
|
||||
logger.info("Job '%s': checkpoint too old (%.0fs), starting fresh", job_id, _cp_age)
|
||||
_checkpoint_path.unlink()
|
||||
except Exception as _cp_err:
|
||||
logger.debug("Job '%s': checkpoint resume failed: %s", job_id, _cp_err)
|
||||
|
||||
logger.info("Running job '%s' (ID: %s)", job_name, job_id)
|
||||
logger.info("Prompt: %s", prompt[:100])
|
||||
|
||||
@@ -762,7 +807,7 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
#
|
||||
# Uses the agent's built-in activity tracker (updated by
|
||||
# _touch_activity() on every tool call, API call, and stream delta).
|
||||
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 600))
|
||||
_cron_timeout = float(os.getenv("HERMES_CRON_TIMEOUT", 900))
|
||||
_cron_inactivity_limit = _cron_timeout if _cron_timeout > 0 else None
|
||||
_POLL_INTERVAL = 5.0
|
||||
_cron_pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
|
||||
@@ -812,6 +857,30 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
|
||||
_iter_n = _activity.get("api_call_count", 0)
|
||||
_iter_max = _activity.get("max_iterations", 0)
|
||||
|
||||
# Save checkpoint before killing — next run can resume
|
||||
try:
|
||||
_checkpoint_dir = _hermes_home / "cron" / "checkpoints"
|
||||
_checkpoint_dir.mkdir(parents=True, exist_ok=True)
|
||||
_checkpoint_path = _checkpoint_dir / f"{job_id}.json"
|
||||
_conv_history = []
|
||||
if hasattr(agent, "conversation_history"):
|
||||
_conv_history = agent.conversation_history
|
||||
elif hasattr(agent, "_session_messages"):
|
||||
_conv_history = agent._session_messages
|
||||
_checkpoint = {
|
||||
"job_id": job_id,
|
||||
"job_name": job_name,
|
||||
"saved_at": _hermes_now().isoformat(),
|
||||
"iterations_completed": _iter_n,
|
||||
"last_activity": _last_desc,
|
||||
"conversation_history": _conv_history[-20:] if _conv_history else [],
|
||||
}
|
||||
import json as _json
|
||||
_checkpoint_path.write_text(_json.dumps(_checkpoint, indent=2, default=str))
|
||||
logger.info("Job '%s': checkpoint saved (%d iterations)", job_id, _iter_n)
|
||||
except Exception as _cp_err:
|
||||
logger.warning("Job '%s': failed to save checkpoint: %s", job_id, _cp_err)
|
||||
|
||||
logger.error(
|
||||
"Job '%s' idle for %.0fs (inactivity limit %.0fs) "
|
||||
"| last_activity=%s | iteration=%s/%s | tool=%s",
|
||||
|
||||
@@ -1837,12 +1837,50 @@ class APIServerAdapter(BasePlatformAdapter):
|
||||
pass
|
||||
|
||||
# Port conflict detection — fail fast if port is already in use
|
||||
# But: if the port is in TIME_WAIT from our own previous adapter,
|
||||
# or owned by our own process, try to bind anyway (SO_REUSEADDR).
|
||||
import os as _os
|
||||
try:
|
||||
with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as _s:
|
||||
_s.settimeout(1)
|
||||
_s.connect(('127.0.0.1', self._port))
|
||||
logger.error('[%s] Port %d already in use. Set a different port in config.yaml: platforms.api_server.port', self.name, self._port)
|
||||
return False
|
||||
# Port is in use — check if it's our own process
|
||||
_our_pid = _os.getpid()
|
||||
_port_pids = set()
|
||||
try:
|
||||
import subprocess as _sp
|
||||
_lsof = _sp.run(
|
||||
['lsof', '-ti', f':{self._port}'],
|
||||
capture_output=True, text=True, timeout=3
|
||||
)
|
||||
if _lsof.returncode == 0:
|
||||
_port_pids = {int(p) for p in _lsof.stdout.strip().split('\n') if p.strip()}
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if _port_pids and _our_pid in _port_pids:
|
||||
# Port is owned by us (previous adapter not fully cleaned up).
|
||||
# Proceed — SO_REUSEADDR on the TCPSite will handle it.
|
||||
logger.info(
|
||||
'[%s] Port %d is held by this process (PID %d, possibly TIME_WAIT). '
|
||||
'Proceeding with SO_REUSEADDR.',
|
||||
self.name, self._port, _our_pid,
|
||||
)
|
||||
elif _port_pids:
|
||||
# Port owned by a different process — genuine conflict
|
||||
_other_pids = _port_pids - {_our_pid}
|
||||
logger.error(
|
||||
'[%s] Port %d already in use by PID(s) %s. '
|
||||
'Set a different port in config.yaml: platforms.api_server.port',
|
||||
self.name, self._port, _other_pids,
|
||||
)
|
||||
return False
|
||||
else:
|
||||
# lsof found nothing but connect succeeded — likely TIME_WAIT
|
||||
logger.info(
|
||||
'[%s] Port %d appears to be in TIME_WAIT. Proceeding.',
|
||||
self.name, self._port,
|
||||
)
|
||||
except (ConnectionRefusedError, OSError):
|
||||
pass # port is free
|
||||
|
||||
|
||||
@@ -128,13 +128,29 @@ class WebhookAdapter(BasePlatformAdapter):
|
||||
app.router.add_post("/webhooks/{route_name}", self._handle_webhook)
|
||||
|
||||
# Port conflict detection — fail fast if port is already in use
|
||||
import socket as _socket
|
||||
# But: if the port is held by our own process (reconnect/ TIME_WAIT), proceed.
|
||||
import socket as _socket, os as _os
|
||||
try:
|
||||
with _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM) as _s:
|
||||
_s.settimeout(1)
|
||||
_s.connect(('127.0.0.1', self._port))
|
||||
logger.error('[webhook] Port %d already in use. Set a different port in config.yaml: platforms.webhook.port', self._port)
|
||||
return False
|
||||
_our_pid = _os.getpid()
|
||||
_port_pids = set()
|
||||
try:
|
||||
import subprocess as _sp
|
||||
_lsof = _sp.run(['lsof', '-ti', f':{self._port}'],
|
||||
capture_output=True, text=True, timeout=3)
|
||||
if _lsof.returncode == 0:
|
||||
_port_pids = {int(p) for p in _lsof.stdout.strip().split('\n') if p.strip()}
|
||||
except Exception:
|
||||
pass
|
||||
if _port_pids and _our_pid in _port_pids:
|
||||
logger.info('[webhook] Port %d held by this process (PID %d). Proceeding.', self._port, _our_pid)
|
||||
elif _port_pids:
|
||||
logger.error('[webhook] Port %d already in use by PID(s) %s.', self._port, _port_pids - {_our_pid})
|
||||
return False
|
||||
else:
|
||||
logger.info('[webhook] Port %d in TIME_WAIT. Proceeding.', self._port)
|
||||
except (ConnectionRefusedError, OSError):
|
||||
pass # port is free
|
||||
|
||||
|
||||
@@ -33,6 +33,7 @@ import json
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import re
|
||||
import shlex
|
||||
import signal
|
||||
import socket
|
||||
@@ -916,6 +917,62 @@ def execute_code(
|
||||
if not code or not code.strip():
|
||||
return tool_error("No code provided.")
|
||||
|
||||
# --- Poka-yoke: pre-execution validation ---
|
||||
import ast
|
||||
|
||||
# 1. Syntax check (catches ~236 SyntaxError occurrences)
|
||||
try:
|
||||
ast.parse(code)
|
||||
except SyntaxError as e:
|
||||
return json.dumps({
|
||||
"error": f"SyntaxError: {e.msg} (line {e.lineno}). Fix the syntax before executing.",
|
||||
"status": "error",
|
||||
}, ensure_ascii=False)
|
||||
|
||||
# 2. Detect tool names used without importing from hermes_tools
|
||||
# (catches ~279 NameError occurrences for tool names)
|
||||
_SANDBOX_TOOLS = {"read_file", "write_file", "terminal", "search_files",
|
||||
"patch", "web_search", "web_extract", "json_parse",
|
||||
"shell_quote", "retry", "fact_store", "fact_search",
|
||||
"fact_probe", "fact_feedback"}
|
||||
_COMMON_IMPORTS = {"os", "json", "re", "sys", "math", "csv", "datetime",
|
||||
"collections", "pathlib", "subprocess", "requests",
|
||||
"time", "shutil", "shlex", "glob", "io", "copy",
|
||||
"functools", "itertools", "hashlib", "base64",
|
||||
"urllib", "tempfile", "threading"}
|
||||
|
||||
if "from hermes_tools import" not in code:
|
||||
# Check if code uses tool names without importing
|
||||
used_tools = set()
|
||||
for tool in _SANDBOX_TOOLS:
|
||||
# Match tool name used as a function call: tool_name(
|
||||
if re.search(r'\b' + re.escape(tool) + r'\s*\(', code):
|
||||
used_tools.add(tool)
|
||||
if used_tools:
|
||||
return json.dumps({
|
||||
"error": (
|
||||
f"Names {used_tools} are tools, not Python builtins. "
|
||||
f"Add this import at the top of your code:\n"
|
||||
f"from hermes_tools import {', '.join(sorted(used_tools))}"
|
||||
),
|
||||
"status": "error",
|
||||
}, ensure_ascii=False)
|
||||
|
||||
# 3. Detect common missing imports (os, json, re, etc.)
|
||||
if "import " not in code[:500]:
|
||||
used_imports = set()
|
||||
for mod in _COMMON_IMPORTS:
|
||||
if re.search(r'\b' + re.escape(mod) + r'\b', code):
|
||||
used_imports.add(mod)
|
||||
if used_imports:
|
||||
return json.dumps({
|
||||
"error": (
|
||||
f"Missing imports: {used_imports}. "
|
||||
f"Add at the top: import {', '.join(sorted(used_imports))}"
|
||||
),
|
||||
"status": "error",
|
||||
}, ensure_ascii=False)
|
||||
|
||||
# Dispatch: remote backends use file-based RPC, local uses UDS
|
||||
from tools.terminal_tool import _get_env_config
|
||||
env_type = _get_env_config()["env_type"]
|
||||
|
||||
@@ -280,7 +280,29 @@ def clear_file_ops_cache(task_id: str = None):
|
||||
def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = "default") -> str:
|
||||
"""Read a file with pagination and line numbers."""
|
||||
try:
|
||||
# ── Device path guard ─────────────────────────────────────────
|
||||
# --- Poka-yoke: path existence check (catches ~221 file not found) ---
|
||||
resolved = os.path.expanduser(path)
|
||||
if not os.path.exists(resolved):
|
||||
# Check if it's a typo — look for similar filenames
|
||||
parent = os.path.dirname(resolved) or "."
|
||||
basename = os.path.basename(resolved)
|
||||
suggestion = ""
|
||||
if os.path.isdir(parent):
|
||||
import difflib
|
||||
siblings = os.listdir(parent)
|
||||
close = difflib.get_close_matches(basename, siblings, n=1, cutoff=0.6)
|
||||
if close:
|
||||
suggestion = f" Did you mean: {os.path.join(parent, close[0])}?"
|
||||
else:
|
||||
# Try partial match
|
||||
partial = [s for s in siblings if basename.lower() in s.lower()]
|
||||
if partial:
|
||||
suggestion = f" Similar files: {', '.join(partial[:3])}"
|
||||
return json.dumps({
|
||||
"error": f"File not found: {path}.{suggestion} Use search_files to find the correct path.",
|
||||
"status": "error",
|
||||
}, ensure_ascii=False)
|
||||
|
||||
# Block paths that would hang the process (infinite output,
|
||||
# blocking on input). Pure path check — no I/O.
|
||||
if _is_blocked_device(path):
|
||||
@@ -388,6 +410,17 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
|
||||
result.content = redact_sensitive_text(result.content)
|
||||
result_dict["content"] = result.content
|
||||
|
||||
# ── JSON/YAML content without line numbers ───────────────────
|
||||
# Strip line-number prefixes (e.g. " 1|{...}" → "{...}")
|
||||
# so agents can json.loads() directly without manual stripping.
|
||||
# Addresses ~721 JSONDecodeError occurrences from empirical audit.
|
||||
_suffix = Path(path).suffix.lower()
|
||||
if _suffix in (".json", ".yaml", ".yml") and result_dict.get("content"):
|
||||
result_dict["json_content"] = "\n".join(
|
||||
line.split("|", 1)[1] if "|" in line else line
|
||||
for line in result_dict["content"].split("\n")
|
||||
)
|
||||
|
||||
# Large-file hint: if the file is big and the caller didn't ask
|
||||
# for a narrow window, nudge toward targeted reads.
|
||||
if (file_size and file_size > _LARGE_FILE_HINT_BYTES
|
||||
|
||||
@@ -1314,6 +1314,47 @@ def terminal_tool(
|
||||
# Skip check if force=True (user has confirmed they want to run it)
|
||||
approval_note = None
|
||||
if not force:
|
||||
|
||||
# --- Poka-yoke: command existence check (catches ~461 exit_127) ---
|
||||
# Only for simple commands (no pipes, no &&, no ;, no |)
|
||||
if env_type == "local" and not any(c in command for c in ['|', '&&', '||', ';', '`', '$(']):
|
||||
cmd_parts = command.strip().split()
|
||||
if cmd_parts:
|
||||
first_cmd = cmd_parts[0]
|
||||
# Skip absolute paths, shell builtins, and common safe patterns
|
||||
if not first_cmd.startswith('/') and not first_cmd.startswith('./') and first_cmd not in {
|
||||
'cd', 'echo', 'export', 'set', 'unset', 'read', 'test', '[', 'true', 'false',
|
||||
'pwd', 'type', 'hash', 'builtin', 'command', 'eval', 'exec', 'exit', 'return',
|
||||
'shift', 'trap', 'ulimit', 'umask', 'wait', 'jobs', 'fg', 'bg', 'disown',
|
||||
'source', '.', 'let', 'declare', 'local', 'readonly', 'typeset',
|
||||
}:
|
||||
try:
|
||||
which_result = subprocess.run(
|
||||
['which', first_cmd],
|
||||
capture_output=True, text=True, timeout=5
|
||||
)
|
||||
if which_result.returncode != 0:
|
||||
# Provide helpful alternatives
|
||||
suggestions = []
|
||||
alt_cmds = {
|
||||
'tmux': 'Use terminal background=true instead',
|
||||
'ruff': 'pip install ruff, or use python -m py_compile',
|
||||
'node': 'Install Node.js or use python3',
|
||||
'npm': 'Install Node.js',
|
||||
'docker': 'Install Docker Desktop',
|
||||
}
|
||||
if first_cmd in alt_cmds:
|
||||
suggestions.append(alt_cmds[first_cmd])
|
||||
suggestion_str = f" Suggestion: {suggestions[0]}" if suggestions else ""
|
||||
return json.dumps({
|
||||
"output": "",
|
||||
"exit_code": 127,
|
||||
"error": f"Command not found: {first_cmd}.{suggestion_str} Use `which {first_cmd}` to check, or try an alternative.",
|
||||
"status": "error",
|
||||
}, ensure_ascii=False)
|
||||
except (subprocess.TimeoutExpired, FileNotFoundError):
|
||||
pass # which itself not available, let it run
|
||||
|
||||
approval = _check_all_guards(command, env_type)
|
||||
if not approval["approved"]:
|
||||
# Check if this is an approval_required (gateway ask mode)
|
||||
|
||||
Reference in New Issue
Block a user