[SYNC] Merge upstream NousResearch/hermes-agent — 499 commits #201

Merged
Rockachopa merged 500 commits from upstream-sync into main 2026-04-07 14:03:16 +00:00
2 changed files with 149 additions and 19 deletions
Showing only changes of commit 2563493466 - Show all commits

View File

@@ -1807,9 +1807,22 @@ class GatewayRunner:
_STALE_TTL = (_raw_stale_timeout + 60) if _raw_stale_timeout > 0 else float("inf")
_stale_ts = self._running_agents_ts.get(_quick_key, 0)
if _quick_key in self._running_agents and _stale_ts and (time.time() - _stale_ts) > _STALE_TTL:
_stale_age = time.time() - _stale_ts
_stale_agent = self._running_agents.get(_quick_key)
_stale_detail = ""
if _stale_agent and hasattr(_stale_agent, "get_activity_summary"):
try:
_sa = _stale_agent.get_activity_summary()
_stale_detail = (
f" | last_activity={_sa.get('last_activity_desc', 'unknown')} "
f"({_sa.get('seconds_since_activity', 0):.0f}s ago) "
f"| iteration={_sa.get('api_call_count', 0)}/{_sa.get('max_iterations', 0)}"
)
except Exception:
pass
logger.warning(
"Evicting stale _running_agents entry for %s (age: %.0fs)",
_quick_key[:30], time.time() - _stale_ts,
"Evicting stale _running_agents entry for %s (age: %.0fs, TTL: %.0fs)%s",
_quick_key[:30], _stale_age, _STALE_TTL, _stale_detail,
)
del self._running_agents[_quick_key]
self._running_agents_ts.pop(_quick_key, None)
@@ -6727,10 +6740,24 @@ class GatewayRunner:
while True:
await asyncio.sleep(_NOTIFY_INTERVAL)
_elapsed_mins = int((time.time() - _notify_start) // 60)
# Include agent activity context if available.
_agent_ref = agent_holder[0]
_status_detail = ""
if _agent_ref and hasattr(_agent_ref, "get_activity_summary"):
try:
_a = _agent_ref.get_activity_summary()
_parts = [f"iteration {_a['api_call_count']}/{_a['max_iterations']}"]
if _a.get("current_tool"):
_parts.append(f"running: {_a['current_tool']}")
else:
_parts.append(_a.get("last_activity_desc", ""))
_status_detail = "" + ", ".join(_parts)
except Exception:
pass
try:
await _notify_adapter.send(
source.chat_id,
f"⏳ Still working... ({_elapsed_mins} minutes elapsed)",
f"⏳ Still working... ({_elapsed_mins} min elapsed{_status_detail})",
metadata=_status_thread_metadata,
)
except Exception as _ne:
@@ -6752,26 +6779,66 @@ class GatewayRunner:
timeout=_agent_timeout,
)
except asyncio.TimeoutError:
# Build a diagnostic summary from the agent's activity tracker.
_timed_out_agent = agent_holder[0]
_activity = {}
if _timed_out_agent and hasattr(_timed_out_agent, "get_activity_summary"):
try:
_activity = _timed_out_agent.get_activity_summary()
except Exception:
pass
_last_desc = _activity.get("last_activity_desc", "unknown")
_secs_ago = _activity.get("seconds_since_activity", 0)
_cur_tool = _activity.get("current_tool")
_iter_n = _activity.get("api_call_count", 0)
_iter_max = _activity.get("max_iterations", 0)
logger.error(
"Agent execution timed out after %.0fs for session %s",
"Agent execution timed out after %.0fs for session %s "
"| last_activity=%.0fs ago (%s) | iteration=%s/%s | tool=%s",
_agent_timeout, session_key,
_secs_ago, _last_desc, _iter_n, _iter_max,
_cur_tool or "none",
)
# Interrupt the agent if it's still running so the thread
# pool worker is freed.
_timed_out_agent = agent_holder[0]
if _timed_out_agent and hasattr(_timed_out_agent, "interrupt"):
_timed_out_agent.interrupt("Execution timed out")
_timeout_mins = int(_agent_timeout // 60)
# Construct a user-facing message with diagnostic context.
_diag_lines = [f"⏱️ Request timed out after {_timeout_mins} minutes."]
if _secs_ago < 30:
_diag_lines.append(
f"The agent was actively working when the timeout fired "
f"(last activity: {_last_desc}, {_secs_ago:.0f}s ago, "
f"iteration {_iter_n}/{_iter_max})."
)
elif _cur_tool:
_diag_lines.append(
f"The agent appears stuck on tool `{_cur_tool}` "
f"({_secs_ago:.0f}s since last activity, "
f"iteration {_iter_n}/{_iter_max})."
)
else:
_diag_lines.append(
f"Last activity: {_last_desc} ({_secs_ago:.0f}s ago, "
f"iteration {_iter_n}/{_iter_max}). "
"The agent may have been waiting on an API response."
)
_diag_lines.append(
"To increase the limit, set HERMES_AGENT_TIMEOUT in your .env "
"(value in seconds, 0 = no limit) and restart the gateway.\n"
"Try again, or use /reset to start fresh."
)
response = {
"final_response": (
f"⏱️ Request timed out after {_timeout_mins} minutes. "
"The agent may have been stuck on a tool or API call.\n"
"To increase the limit, set HERMES_AGENT_TIMEOUT in your .env "
"(value in seconds, 0 = no limit) and restart the gateway.\n"
"Try again, or use /reset to start fresh."
),
"final_response": "\n".join(_diag_lines),
"messages": result_holder[0].get("messages", []) if result_holder[0] else [],
"api_calls": 0,
"api_calls": _iter_n,
"tools": tools_holder[0] or [],
"history_offset": 0,
"failed": True,

View File

@@ -707,6 +707,15 @@ class AIAgent:
# status_callback for gateway platforms. Does NOT inject into messages.
self._context_pressure_warned = False
# Activity tracking — updated on each API call, tool execution, and
# stream chunk. Used by the gateway timeout handler to report what the
# agent was doing when it was killed, and by the "still working"
# notifications to show progress.
self._last_activity_ts: float = time.time()
self._last_activity_desc: str = "initializing"
self._current_tool: str | None = None
self._api_call_count: int = 0
# Persistent error log -- always writes WARNING+ to ~/.hermes/logs/errors.log
# so tool failures, API errors, etc. are inspectable after the fact.
# In gateway mode, each incoming message creates a new AIAgent instance,
@@ -2617,6 +2626,29 @@ class AIAgent:
self._interrupt_message = None
_set_interrupt(False)
def _touch_activity(self, desc: str) -> None:
"""Update the last-activity timestamp and description (thread-safe)."""
self._last_activity_ts = time.time()
self._last_activity_desc = desc
def get_activity_summary(self) -> dict:
"""Return a snapshot of the agent's current activity for diagnostics.
Called by the gateway timeout handler to report what the agent was doing
when it was killed, and by the periodic "still working" notifications.
"""
elapsed = time.time() - self._last_activity_ts
return {
"last_activity_ts": self._last_activity_ts,
"last_activity_desc": self._last_activity_desc,
"seconds_since_activity": round(elapsed, 1),
"current_tool": self._current_tool,
"api_call_count": self._api_call_count,
"max_iterations": self.max_iterations,
"budget_used": self.iteration_budget.used,
"budget_max": self.iteration_budget.max_total,
}
def shutdown_memory_provider(self, messages: list = None) -> None:
"""Shut down the memory provider — call at actual session boundaries.
@@ -4354,6 +4386,7 @@ class AIAgent:
# Reset stale-stream timer so the detector measures from this
# attempt's start, not a previous attempt's last chunk.
last_chunk_time["t"] = time.time()
self._touch_activity("waiting for provider response (streaming)")
stream = request_client_holder["client"].chat.completions.create(**stream_kwargs)
content_parts: list = []
@@ -4374,8 +4407,12 @@ class AIAgent:
# knows whether reasoning was already displayed during streaming.
self._reasoning_deltas_fired = False
_first_chunk_seen = False
for chunk in stream:
last_chunk_time["t"] = time.time()
if not _first_chunk_seen:
_first_chunk_seen = True
self._touch_activity("receiving stream response")
if self._interrupt_requested:
break
@@ -4726,10 +4763,20 @@ class AIAgent:
# Detect stale streams: connections kept alive by SSE pings
# but delivering no real chunks. Kill the client so the
# inner retry loop can start a fresh connection.
if time.time() - last_chunk_time["t"] > _stream_stale_timeout:
_stale_elapsed = time.time() - last_chunk_time["t"]
if _stale_elapsed > _stream_stale_timeout:
_est_ctx = sum(len(str(v)) for v in api_kwargs.get("messages", [])) // 4
logger.warning(
"Stream stale for %.0fs — no chunks received. Killing connection.",
_stream_stale_timeout,
"Stream stale for %.0fs (threshold %.0fs) — no chunks received. "
"model=%s context=~%s tokens. Killing connection.",
_stale_elapsed, _stream_stale_timeout,
api_kwargs.get("model", "unknown"), f"{_est_ctx:,}",
)
self._emit_status(
f"⚠️ No response from provider for {int(_stale_elapsed)}s "
f"(model: {api_kwargs.get('model', 'unknown')}, "
f"context: ~{_est_ctx:,} tokens). "
f"Reconnecting..."
)
try:
rc = request_client_holder.get("client")
@@ -6153,6 +6200,9 @@ class AIAgent:
response_preview = function_result[:self.log_prefix_chars] + "..." if len(function_result) > self.log_prefix_chars else function_result
print(f" ✅ Tool {i+1} completed in {tool_duration:.2f}s - {response_preview}")
self._current_tool = None
self._touch_activity(f"tool completed: {name} ({tool_duration:.1f}s)")
if self.tool_complete_callback:
try:
self.tool_complete_callback(tc.id, name, args, function_result)
@@ -6238,6 +6288,9 @@ class AIAgent:
args_preview = args_str[:self.log_prefix_chars] + "..." if len(args_str) > self.log_prefix_chars else args_str
print(f" 📞 Tool {i}: {function_name}({list(function_args.keys())}) - {args_preview}")
self._current_tool = function_name
self._touch_activity(f"executing tool: {function_name}")
if self.tool_progress_callback:
try:
preview = _build_tool_preview(function_name, function_args)
@@ -6437,6 +6490,9 @@ class AIAgent:
except Exception as cb_err:
logging.debug(f"Tool progress callback error: {cb_err}")
self._current_tool = None
self._touch_activity(f"tool completed: {function_name} ({tool_duration:.1f}s)")
if self.verbose_logging:
logging.debug(f"Tool {function_name} completed in {tool_duration:.2f}s")
logging.debug(f"Tool result ({len(function_result)} chars): {function_result}")
@@ -7033,6 +7089,8 @@ class AIAgent:
break
api_call_count += 1
self._api_call_count = api_call_count
self._touch_activity(f"starting API call #{api_call_count}")
if not self.iteration_budget.consume():
if not self.quiet_mode:
self._safe_print(f"\n⚠️ Iteration budget exhausted ({self.iteration_budget.used}/{self.iteration_budget.max_total} iterations used)")
@@ -7634,6 +7692,7 @@ class AIAgent:
self._vprint(f"{self.log_prefix} 💾 Cache: {cached:,}/{prompt:,} tokens ({hit_pct:.0f}% hit, {written:,} written)")
has_retried_429 = False # Reset on success
self._touch_activity(f"API call #{api_call_count} completed")
break # Success, exit retry loop
except InterruptedError:
@@ -8008,7 +8067,7 @@ class AIAgent:
"error": f"Context length exceeded: max compression attempts ({max_compression_attempts}) reached.",
"partial": True
}
self._vprint(f"{self.log_prefix} 🗜️ Context compression attempt {compression_attempts}/{max_compression_attempts}...")
self._emit_status(f"🗜️ Context too large (~{approx_tokens:,} tokens) — compressing ({compression_attempts}/{max_compression_attempts})...")
original_len = len(messages)
messages, active_system_prompt = self._compress_context(
@@ -8076,6 +8135,10 @@ class AIAgent:
self._dump_api_request_debug(
api_kwargs, reason="non_retryable_client_error", error=api_error,
)
self._emit_status(
f"❌ Non-retryable error (HTTP {status_code}): "
f"{self._summarize_api_error(api_error)}"
)
self._vprint(f"{self.log_prefix}❌ Non-retryable client error (HTTP {status_code}). Aborting.", force=True)
self._vprint(f"{self.log_prefix} 🔌 Provider: {_provider} Model: {_model}", force=True)
self._vprint(f"{self.log_prefix} 🌐 Endpoint: {_base}", force=True)
@@ -8129,9 +8192,9 @@ class AIAgent:
continue
_final_summary = self._summarize_api_error(api_error)
if is_rate_limited:
self._vprint(f"{self.log_prefix}❌ Rate limit persisted after {max_retries} retries. Please try again later.", force=True)
self._emit_status(f"❌ Rate limited after {max_retries} retries{_final_summary}")
else:
self._vprint(f"{self.log_prefix}❌ Max retries ({max_retries}) exceeded. Giving up.", force=True)
self._emit_status(f"❌ API failed after {max_retries} retries — {_final_summary}")
self._vprint(f"{self.log_prefix} 💀 Final error: {_final_summary}", force=True)
# Detect SSE stream-drop pattern (e.g. "Network