Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
a23f54d15f fix: add numbered approval and clarify shortcuts (#956)
All checks were successful
Lint / lint (pull_request) Successful in 21s
2026-04-22 10:44:09 -04:00
Alexander Whitestone
ffb3257cb5 fix: repair auxiliary client syntax corruption 2026-04-22 10:44:00 -04:00
6 changed files with 397 additions and 333 deletions

View File

@@ -1,4 +1,4 @@
from agent.telemetry_logger import log_token_usage\n"""Shared auxiliary client router for side tasks.
"""Shared auxiliary client router for side tasks.
Provides a single resolution chain so every consumer (context compression,
session search, web extraction, vision analysis, browser vision) picks up
@@ -34,6 +34,8 @@ Payment / credit exhaustion fallback:
their OpenRouter balance but has Codex OAuth or another provider available.
"""
from agent.telemetry_logger import log_token_usage
import json
import logging
import os
@@ -396,7 +398,8 @@ class _CodexCompletionsAdapter:
prompt_tokens=getattr(resp_usage, "input_tokens", 0),
completion_tokens=getattr(resp_usage, "output_tokens", 0),
total_tokens=getattr(resp_usage, "total_tokens", 0),
)\n log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
)
log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
except Exception as exc:
logger.debug("Codex auxiliary Responses API call failed: %s", exc)
raise
@@ -529,7 +532,8 @@ class _AnthropicCompletionsAdapter:
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
)\n log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
)
log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
choice = SimpleNamespace(
index=0,

256
cli.py
View File

@@ -589,7 +589,6 @@ from tools.terminal_tool import set_sudo_password_callback, set_approval_callbac
from tools.skills_tool import set_secret_capture_callback
from hermes_cli.callbacks import prompt_for_secret
from tools.browser_tool import _emergency_cleanup_all_sessions as _cleanup_all_browsers
from utils import repair_and_load_json
# Guard to prevent cleanup from running multiple times on exit
_cleanup_done = False
@@ -3570,11 +3569,7 @@ class HermesCLI:
result_json = _asyncio.run(
vision_analyze_tool(image_url=str(img_path), user_prompt=analysis_prompt)
)
result = repair_and_load_json(
result_json,
default={},
context="cli_image_analysis",
) if isinstance(result_json, str) else {}
result = _json.loads(result_json)
if result.get("success"):
description = result.get("analysis", "")
enriched_parts.append(
@@ -4965,14 +4960,7 @@ class HermesCLI:
from tools.cronjob_tools import cronjob as cronjob_tool
def _cron_api(**kwargs):
result = repair_and_load_json(
cronjob_tool(**kwargs),
default=None,
context="cli_cron_command",
)
if isinstance(result, dict):
return result
return {"success": False, "error": "Invalid JSON from cronjob tool"}
return json.loads(cronjob_tool(**kwargs))
def _normalize_skills(values):
normalized = []
@@ -7266,6 +7254,40 @@ class HermesCLI:
"Use your best judgement to make the choice and proceed."
)
def _handle_clarify_selection(self) -> None:
    """Act on whichever clarify option is currently highlighted.

    Does nothing when no clarify prompt is active or when free-text entry is
    already in progress.  A highlighted listed choice is pushed onto the
    prompt's ``response_queue`` and the prompt is cleared; highlighting the
    slot one past the last choice switches the prompt to free-text entry.
    Any other index is ignored.
    """
    prompt = self._clarify_state
    if not prompt or self._clarify_freetext:
        return

    options = prompt.get("choices") or []
    index = prompt.get("selected", 0)
    option_count = len(options)

    if index == option_count:
        # Slot just past the listed choices: let the user type an answer.
        self._clarify_freetext = True
        self._invalidate()
    elif index < option_count:
        # A listed choice: answer the waiting caller and dismiss the prompt.
        prompt["response_queue"].put(options[index])
        self._clarify_state = None
        self._clarify_freetext = False
        self._invalidate()
def _handle_clarify_number_shortcut(self, number: int) -> bool:
    """Pick a clarify option directly from a digit key press.

    ``number`` is 1-based.  Valid values run from 1 up to one more than the
    number of listed choices.  Returns ``True`` when the number was in range
    and the selection handler was invoked; ``False`` when no clarify prompt
    is active, free-text entry is in progress, or the number is out of range.
    """
    prompt = self._clarify_state
    if not prompt or self._clarify_freetext:
        return False

    # Listed choices plus one extra trailing slot.
    slot_count = len(prompt.get("choices") or []) + 1
    if not 1 <= number <= slot_count:
        return False

    prompt["selected"] = number - 1  # stored index is 0-based
    self._handle_clarify_selection()
    return True
def _sudo_password_callback(self) -> str:
"""
Prompt for sudo password through the prompt_toolkit UI.
@@ -7374,6 +7396,20 @@ class HermesCLI:
choices.append("view")
return choices
def _handle_approval_number_shortcut(self, number: int) -> bool:
    """Pick an approval option directly from a digit key press.

    ``number`` is 1-based and must address one of the currently listed
    choices.  Returns ``True`` when the selection handler was invoked, and
    ``False`` when no approval prompt is active or the number is out of range.
    """
    prompt = self._approval_state
    if not prompt:
        return False

    option_count = len(prompt.get("choices") or [])
    if not 1 <= number <= option_count:
        return False

    prompt["selected"] = number - 1  # stored index is 0-based
    self._handle_approval_selection()
    return True
def _handle_approval_selection(self) -> None:
"""Process the currently selected dangerous-command approval choice."""
state = self._approval_state
@@ -7449,8 +7485,9 @@ class HermesCLI:
preview_lines.extend(_wrap_panel_text(cmd_display, 60))
for i, choice in enumerate(choices):
prefix = ' ' if i == selected else ' '
label = f"{i + 1}. {choice_labels.get(choice, choice)}"
preview_lines.extend(_wrap_panel_text(
f"{prefix}{choice_labels.get(choice, choice)}",
f"{prefix}{label}",
60,
subsequent_indent=" ",
))
@@ -7468,7 +7505,7 @@ class HermesCLI:
_append_panel_line(lines, 'class:approval-border', 'class:approval-cmd', wrapped, box_width)
_append_blank_panel_line(lines, 'class:approval-border', box_width)
for i, choice in enumerate(choices):
label = choice_labels.get(choice, choice)
label = f"{i + 1}. {choice_labels.get(choice, choice)}"
style = 'class:approval-selected' if i == selected else 'class:approval-choice'
prefix = ' ' if i == selected else ' '
for wrapped in _wrap_panel_text(f"{prefix}{label}", inner_text_width, subsequent_indent=" "):
@@ -7477,6 +7514,97 @@ class HermesCLI:
lines.append(('class:approval-border', '' + ('' * box_width) + '\n'))
return lines
def _get_clarify_display_fragments(self):
    """Render the clarify panel for the prompt_toolkit UI.

    Returns a list of ``(style_class, text)`` fragments that draw a rounded
    box containing the question, the numbered choices and, when choices
    exist, a trailing numbered "Other" entry.  Returns ``[]`` when no clarify
    prompt is active.

    Every rendered row is ``box_width + 2`` columns wide (one border column on
    each side), as long as no single unbreakable word is wider than the inner
    text width.
    """
    # Function-scope imports keep the method independent of module-level state.
    import shutil
    import textwrap

    state = self._clarify_state
    if not state:
        return []

    title = "Hermes needs your input"
    border = 'class:clarify-border'
    marker_on = "❯ "   # prefix for the highlighted row
    marker_off = "  "  # same width, so the option numbers stay aligned

    def _panel_box_width(title: str, content_lines: list[str], min_width: int = 46, max_width: int = 76) -> int:
        # Fit the longest line, clamped to [min_width, max_width] and to the terminal.
        term_cols = shutil.get_terminal_size((100, 20)).columns
        longest = max([len(title)] + [len(line) for line in content_lines] + [min_width - 4])
        inner = min(max(longest + 4, min_width - 2), max_width - 2, max(24, term_cols - 6))
        return inner + 2

    def _wrap_panel_text(text: str, width: int, subsequent_indent: str = "") -> list[str]:
        wrapped = textwrap.wrap(
            text,
            width=max(8, width),
            break_long_words=False,
            break_on_hyphens=False,
            subsequent_indent=subsequent_indent,
        )
        # textwrap.wrap returns [] for empty text; always emit one row.
        return wrapped or [""]

    def _append_panel_line(lines, border_style: str, content_style: str, text: str, box_width: int) -> None:
        # "│ " + (box_width - 2) columns + " │" == box_width + 2 columns.
        inner_width = max(0, box_width - 2)
        lines.append((border_style, "│ "))
        lines.append((content_style, text.ljust(inner_width)))
        lines.append((border_style, " │\n"))

    def _append_blank_panel_line(lines, border_style: str, box_width: int) -> None:
        lines.append((border_style, "│" + (" " * box_width) + "│\n"))

    question = state["question"]
    choices = state.get("choices") or []
    selected = state.get("selected", 0)
    freetext = self._clarify_freetext
    other_idx = len(choices)
    other_number = other_idx + 1

    def _is_highlighted(index: int) -> bool:
        # Nothing in the list is highlighted while the user is typing free text.
        return index == selected and not freetext

    def _choice_row(index: int, choice) -> str:
        prefix = marker_on if _is_highlighted(index) else marker_off
        return f"{prefix}{index + 1}. {choice}"

    # The "Other" row is computed once and shared by the width preview and
    # the real rendering so the two cannot drift apart.
    if freetext:
        other_style = 'class:clarify-active-other'
        other_label = f"{marker_on}{other_number}. Other (type below)"
    elif selected == other_idx:
        other_style = 'class:clarify-selected'
        other_label = f"{marker_on}{other_number}. Other (type your answer)"
    else:
        other_style = 'class:clarify-choice'
        other_label = f"{marker_off}{other_number}. Other (type your answer)"

    # First pass: wrap at a nominal width only to measure how wide the box must be.
    preview_lines = _wrap_panel_text(question, 60)
    for i, choice in enumerate(choices):
        preview_lines.extend(_wrap_panel_text(_choice_row(i, choice), 60, subsequent_indent=" "))
    preview_lines.extend(_wrap_panel_text(other_label, 60, subsequent_indent=" "))

    box_width = _panel_box_width(title, preview_lines)
    inner_text_width = max(8, box_width - 2)

    lines = []
    # Top border: "╭─ " + title + " " + fill + "╮" == box_width + 2 columns.
    lines.append((border, '╭─ '))
    lines.append(('class:clarify-title', title))
    lines.append((border, ' ' + ('─' * max(0, box_width - len(title) - 3)) + '╮\n'))
    _append_blank_panel_line(lines, border, box_width)

    for wrapped in _wrap_panel_text(question, inner_text_width):
        _append_panel_line(lines, border, 'class:clarify-question', wrapped, box_width)
    _append_blank_panel_line(lines, border, box_width)

    if freetext and not choices:
        guidance = "Type your answer in the prompt below, then press Enter."
        for wrapped in _wrap_panel_text(guidance, inner_text_width):
            _append_panel_line(lines, border, 'class:clarify-choice', wrapped, box_width)
        _append_blank_panel_line(lines, border, box_width)

    if choices:
        for i, choice in enumerate(choices):
            style = 'class:clarify-selected' if _is_highlighted(i) else 'class:clarify-choice'
            for wrapped in _wrap_panel_text(_choice_row(i, choice), inner_text_width, subsequent_indent=" "):
                _append_panel_line(lines, border, style, wrapped, box_width)

        for wrapped in _wrap_panel_text(other_label, inner_text_width, subsequent_indent=" "):
            _append_panel_line(lines, border, other_style, wrapped, box_width)

        _append_blank_panel_line(lines, border, box_width)

    lines.append((border, '╰' + ('─' * box_width) + '╯\n'))
    return lines
def _secret_capture_callback(self, var_name: str, prompt: str, metadata=None) -> dict:
    """Delegate secret capture to ``prompt_for_secret``, passing this CLI instance first."""
    captured = prompt_for_secret(self, var_name, prompt, metadata)
    return captured
@@ -8383,17 +8511,8 @@ class HermesCLI:
# --- Clarify choice mode: confirm the highlighted selection ---
if self._clarify_state and not self._clarify_freetext:
state = self._clarify_state
selected = state["selected"]
choices = state.get("choices") or []
if selected < len(choices):
state["response_queue"].put(choices[selected])
self._clarify_state = None
event.app.invalidate()
else:
# "Other" selected → switch to freetext
self._clarify_freetext = True
event.app.invalidate()
self._handle_clarify_selection()
event.app.invalidate()
return
# --- Normal input routing ---
@@ -8513,6 +8632,19 @@ class HermesCLI:
self._approval_state["selected"] = min(max_idx, self._approval_state["selected"] + 1)
event.app.invalidate()
# --- Numbered shortcuts for clarify / approval modal prompts ---
for _digit in '123456789':
@kb.add(_digit, filter=Condition(lambda: bool(self._approval_state) or (bool(self._clarify_state) and not self._clarify_freetext)))
def _handle_modal_number(event, digit=_digit):
number = int(digit)
handled = False
if self._approval_state:
handled = self._handle_approval_number_shortcut(number)
elif self._clarify_state and not self._clarify_freetext:
handled = self._handle_clarify_number_shortcut(number)
if handled:
event.app.invalidate()
# --- /model picker: arrow-key navigation ---
@kb.add('up', filter=Condition(lambda: bool(self._model_picker_state)))
def model_picker_up(event):
@@ -9007,7 +9139,7 @@ class HermesCLI:
if cli_ref._approval_state:
remaining = max(0, int(cli_ref._approval_deadline - _time.monotonic()))
return [
('class:hint', ' ↑/↓ to select, Enter to confirm'),
('class:hint', ' 1-9 or ↑/↓ to select, Enter to confirm'),
('class:clarify-countdown', f' ({remaining}s)'),
]
@@ -9020,7 +9152,7 @@ class HermesCLI:
('class:clarify-countdown', countdown),
]
return [
('class:hint', ' ↑/↓ to select, Enter to confirm'),
('class:hint', ' 1-9 or ↑/↓ to select, Enter to confirm'),
('class:clarify-countdown', countdown),
]
@@ -9098,71 +9230,7 @@ class HermesCLI:
lines.append((border_style, "" + (" " * box_width) + "\n"))
def _get_clarify_display():
"""Build styled text for the clarify question/choices panel."""
state = cli_ref._clarify_state
if not state:
return []
question = state["question"]
choices = state.get("choices") or []
selected = state.get("selected", 0)
preview_lines = _wrap_panel_text(question, 60)
for i, choice in enumerate(choices):
prefix = " " if i == selected and not cli_ref._clarify_freetext else " "
preview_lines.extend(_wrap_panel_text(f"{prefix}{choice}", 60, subsequent_indent=" "))
other_label = (
" Other (type below)" if cli_ref._clarify_freetext
else " Other (type your answer)" if selected == len(choices)
else " Other (type your answer)"
)
preview_lines.extend(_wrap_panel_text(other_label, 60, subsequent_indent=" "))
box_width = _panel_box_width("Hermes needs your input", preview_lines)
inner_text_width = max(8, box_width - 2)
lines = []
# Box top border
lines.append(('class:clarify-border', '╭─ '))
lines.append(('class:clarify-title', 'Hermes needs your input'))
lines.append(('class:clarify-border', ' ' + ('' * max(0, box_width - len("Hermes needs your input") - 3)) + '\n'))
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
# Question text
for wrapped in _wrap_panel_text(question, inner_text_width):
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-question', wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
if cli_ref._clarify_freetext and not choices:
guidance = "Type your answer in the prompt below, then press Enter."
for wrapped in _wrap_panel_text(guidance, inner_text_width):
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-choice', wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
if choices:
# Multiple-choice mode: show selectable options
for i, choice in enumerate(choices):
style = 'class:clarify-selected' if i == selected and not cli_ref._clarify_freetext else 'class:clarify-choice'
prefix = ' ' if i == selected and not cli_ref._clarify_freetext else ' '
wrapped_lines = _wrap_panel_text(f"{prefix}{choice}", inner_text_width, subsequent_indent=" ")
for wrapped in wrapped_lines:
_append_panel_line(lines, 'class:clarify-border', style, wrapped, box_width)
# "Other" option (5th line, only shown when choices exist)
other_idx = len(choices)
if selected == other_idx and not cli_ref._clarify_freetext:
other_style = 'class:clarify-selected'
other_label = ' Other (type your answer)'
elif cli_ref._clarify_freetext:
other_style = 'class:clarify-active-other'
other_label = ' Other (type below)'
else:
other_style = 'class:clarify-choice'
other_label = ' Other (type your answer)'
for wrapped in _wrap_panel_text(other_label, inner_text_width, subsequent_indent=" "):
_append_panel_line(lines, 'class:clarify-border', other_style, wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
lines.append(('class:clarify-border', '' + ('' * box_width) + '\n'))
return lines
return cli_ref._get_clarify_display_fragments()
clarify_widget = ConditionalContainer(
Window(

View File

@@ -1,62 +0,0 @@
import sys
import types
from unittest.mock import patch
def _stub_auxiliary_client():
    """Register a fake ``agent.auxiliary_client`` module in ``sys.modules``.

    The fake exposes callables that ignore their arguments and return fixed
    values, plus an empty ``_OR_HEADERS`` mapping.
    """
    module_name = "agent.auxiliary_client"
    fake = types.ModuleType(module_name)

    def _always(value):
        # Callable that accepts anything and returns ``value``.
        return lambda *args, **kwargs: value

    fake.call_llm = _always(None)
    fake.resolve_provider_client = _always((None, None))
    fake.get_text_auxiliary_client = _always((None, None))
    fake.async_call_llm = _always(None)
    fake.extract_content_or_reasoning = _always("")
    fake._OR_HEADERS = {}
    fake._get_task_timeout = _always(30)

    sys.modules[module_name] = fake
def _stub_vision_tools(vision_analyze_tool):
    """Register a fake ``tools.vision_tools`` module exposing the given callable."""
    module_name = "tools.vision_tools"
    fake = types.ModuleType(module_name)
    setattr(fake, "vision_analyze_tool", vision_analyze_tool)
    sys.modules[module_name] = fake
def test_preprocess_images_with_vision_repairs_malformed_json(tmp_path):
    """Image preprocessing should still use a vision result that is malformed JSON."""
    # The stub must be registered before ``cli`` is imported.
    _stub_auxiliary_client()
    from cli import HermesCLI

    async def fake_vision(**kwargs):
        # Single quotes and trailing commas: not valid JSON.
        return "{'success': true, 'analysis': 'Recovered image description',}"

    _stub_vision_tools(fake_vision)

    image_path = tmp_path / "test.png"
    image_path.write_bytes(b"fake-image-bytes")

    # Bypass __init__; the method is invoked unbound on a bare instance.
    cli_obj = HermesCLI.__new__(HermesCLI)
    result = HermesCLI._preprocess_images_with_vision(
        cli_obj,
        "Describe this",
        [image_path],
        announce=False,
    )

    for expected in ("Recovered image description", "Describe this", str(image_path)):
        assert expected in result
def test_handle_cron_command_repairs_malformed_json(capsys):
    """``/cron list`` should still print jobs when the tool returns malformed JSON."""
    # The stub must be registered before ``cli`` is imported.
    _stub_auxiliary_client()
    from cli import HermesCLI

    # Single quotes and trailing commas: not valid JSON.
    malformed_result = """{'success': true, 'jobs': [{'job_id': 'job-1234567890ab', 'name': 'Nightly Check', 'state': 'scheduled', 'schedule': 'every 1h', 'repeat': 'forever', 'prompt_preview': 'Check server status', 'skills': ['blogwatcher',], 'next_run_at': '2026-04-22T01:00:00Z',},],}"""

    cli_obj = HermesCLI.__new__(HermesCLI)  # bypass __init__
    with patch("tools.cronjob_tools.cronjob", return_value=malformed_result):
        HermesCLI._handle_cron_command(cli_obj, "/cron list")

    printed = capsys.readouterr().out
    for expected in ("Scheduled Jobs:", "job-1234567890ab", "Nightly Check", "blogwatcher"):
        assert expected in printed

View File

@@ -0,0 +1,172 @@
import queue
import threading
from types import SimpleNamespace
from unittest.mock import MagicMock
from cli import HermesCLI
class _FakeBuffer:
    """Minimal buffer double exposing ``text``, ``cursor_position`` and ``reset()``."""

    def __init__(self, text="", cursor_position=None):
        # With no explicit position the cursor sits at the end of the text.
        if cursor_position is None:
            cursor_position = len(text)
        self.text = text
        self.cursor_position = cursor_position

    def reset(self, append_to_history=False):
        """Clear the text and move the cursor to 0; ``append_to_history`` is ignored."""
        self.text, self.cursor_position = "", 0
def _make_cli_stub():
    """Build a ``HermesCLI`` without running ``__init__`` and seed its modal-prompt state."""
    stub = HermesCLI.__new__(HermesCLI)

    initial_attrs = {
        "_approval_state": None,
        "_approval_deadline": 0,
        "_approval_lock": threading.Lock(),
        "_clarify_state": None,
        "_clarify_freetext": False,
        "_clarify_deadline": 0,
        "_sudo_state": None,
        "_sudo_deadline": 0,
        "_secret_state": None,
        "_secret_deadline": 0,
        "_modal_input_snapshot": None,
        # Redraw hooks are mocked so tests run without a live application.
        "_invalidate": MagicMock(),
        "_app": SimpleNamespace(invalidate=MagicMock(), current_buffer=_FakeBuffer()),
    }
    for attr_name, attr_value in initial_attrs.items():
        setattr(stub, attr_name, attr_value)

    return stub
def test_approval_display_numbers_choices():
    """The approval panel should prefix every choice with its 1-based number."""
    stub = _make_cli_stub()
    stub._approval_state = dict(
        command="sudo rm -rf /tmp/example",
        description="dangerous command",
        choices=["once", "session", "always", "deny"],
        selected=0,
        response_queue=queue.Queue(),
    )

    fragments = stub._get_approval_display_fragments()
    rendered = "".join(text for _style, text in fragments)

    expected_rows = (
        " 1. Allow once",
        "2. Allow for this session",
        "3. Add to permanent allowlist",
        "4. Deny",
    )
    for row in expected_rows:
        assert row in rendered
def test_approval_number_shortcut_submits_choice():
    """Pressing a digit should submit the matching approval choice and close the prompt."""
    answers = queue.Queue()
    stub = _make_cli_stub()
    stub._approval_state = dict(
        command="sudo rm -rf /tmp/example",
        description="dangerous command",
        choices=["once", "session", "always", "deny"],
        selected=0,
        response_queue=answers,
    )

    handled = stub._handle_approval_number_shortcut(2)

    assert handled is True
    assert answers.get_nowait() == "session"
    assert stub._approval_state is None
def test_approval_selection_still_submits_selected_choice():
    """Confirming the highlighted approval row should submit that choice."""
    answers = queue.Queue()
    stub = _make_cli_stub()
    stub._approval_state = dict(
        command="sudo rm -rf /tmp/example",
        description="dangerous command",
        choices=["once", "session", "always", "deny"],
        selected=1,
        response_queue=answers,
    )

    stub._handle_approval_selection()

    assert answers.get_nowait() == "session"
    assert stub._approval_state is None
def test_approval_number_shortcut_handles_view_in_place():
    """Choosing "view" by number should expand the command without answering the prompt."""
    answers = queue.Queue()
    stub = _make_cli_stub()
    stub._approval_state = dict(
        command="sudo dd if=/tmp/in of=/usr/share/keyrings/githubcli-archive-keyring.gpg bs=4M status=progress",
        description="disk copy",
        choices=["once", "session", "always", "deny", "view"],
        selected=0,
        response_queue=answers,
    )

    handled = stub._handle_approval_number_shortcut(5)
    assert handled is True

    # The prompt stays open, with "view" consumed and nothing queued.
    state = stub._approval_state
    assert state is not None
    assert state["show_full"] is True
    assert "view" not in state["choices"]
    assert state["selected"] == 3
    assert answers.empty()
def test_clarify_display_numbers_choices_and_other():
    """The clarify panel should number every choice and the trailing "Other" row."""
    stub = _make_cli_stub()
    stub._clarify_state = dict(
        question="Pick the best option",
        choices=["Alpha", "Beta", "Gamma", "Delta"],
        selected=1,
        response_queue=queue.Queue(),
    )

    fragments = stub._get_clarify_display_fragments()
    rendered = "".join(text for _style, text in fragments)

    expected_rows = (
        "1. Alpha",
        " 2. Beta",
        "3. Gamma",
        "4. Delta",
        "5. Other (type your answer)",
    )
    for row in expected_rows:
        assert row in rendered
def test_clarify_number_shortcut_submits_choice():
    """Pressing a digit should submit the matching clarify choice and close the prompt."""
    answers = queue.Queue()
    stub = _make_cli_stub()
    stub._clarify_state = dict(
        question="Pick the best option",
        choices=["Alpha", "Beta", "Gamma"],
        selected=0,
        response_queue=answers,
    )

    handled = stub._handle_clarify_number_shortcut(3)

    assert handled is True
    assert answers.get_nowait() == "Gamma"
    assert stub._clarify_state is None
    assert stub._clarify_freetext is False
def test_clarify_selection_still_submits_selected_choice():
    """Confirming the highlighted clarify row should submit that choice."""
    answers = queue.Queue()
    stub = _make_cli_stub()
    stub._clarify_state = dict(
        question="Pick the best option",
        choices=["Alpha", "Beta", "Gamma"],
        selected=1,
        response_queue=answers,
    )

    stub._handle_clarify_selection()

    assert answers.get_nowait() == "Beta"
    assert stub._clarify_state is None
    assert stub._clarify_freetext is False
def test_clarify_number_shortcut_activates_other_freetext():
    """The number one past the last choice should switch the prompt to free-text entry."""
    answers = queue.Queue()
    stub = _make_cli_stub()
    stub._clarify_state = dict(
        question="Pick the best option",
        choices=["Alpha", "Beta", "Gamma"],
        selected=0,
        response_queue=answers,
    )

    handled = stub._handle_clarify_number_shortcut(4)
    assert handled is True

    # The prompt stays open in free-text mode and nothing has been answered yet.
    state = stub._clarify_state
    assert state is not None
    assert state["selected"] == 3
    assert stub._clarify_freetext is True
    assert answers.empty()

View File

@@ -1,108 +0,0 @@
import io
import json
import sys
import types
from unittest.mock import MagicMock, patch
def _stub_auxiliary_client():
    """Register a fake ``agent.auxiliary_client`` module in ``sys.modules``.

    The fake exposes callables that ignore their arguments and return fixed
    values, plus an empty ``_OR_HEADERS`` mapping.
    """
    module_name = "agent.auxiliary_client"
    fake = types.ModuleType(module_name)

    def _always(value):
        # Callable that accepts anything and returns ``value``.
        return lambda *args, **kwargs: value

    fake.call_llm = _always(None)
    fake.resolve_provider_client = _always((None, None))
    fake.get_text_auxiliary_client = _always((None, None))
    fake.async_call_llm = _always(None)
    fake.extract_content_or_reasoning = _always("")
    fake._OR_HEADERS = {}
    fake._get_task_timeout = _always(30)

    sys.modules[module_name] = fake
def test_run_browser_command_repairs_malformed_stdout_envelope(tmp_path):
    """``_run_browser_command`` should parse a stdout envelope that is malformed JSON."""
    # The stub must be registered before ``tools.browser_tool`` is imported.
    _stub_auxiliary_client()
    from tools.browser_tool import _run_browser_command

    proc = MagicMock()
    proc.returncode = 0
    proc.wait.return_value = 0

    session_info = {
        "session_name": "test-session",
        "session_id": "test-id",
        "cdp_url": None,
    }
    # Single quotes and trailing commas: not valid JSON.
    malformed_stdout = "{'success': true, 'data': {'url': 'https://example.com',},}"
    captured_output = (
        ("_stdout_navigate", malformed_stdout),
        ("_stderr_navigate", ""),
    )

    def fake_open(path, mode="r", *args, **kwargs):
        # Serve only the command's captured stdout/stderr files.
        path = str(path)
        for suffix, content in captured_output:
            if path.endswith(suffix):
                return io.StringIO(content)
        raise FileNotFoundError(path)

    with (
        patch("tools.browser_tool._find_agent_browser", return_value="/usr/bin/agent-browser"),
        patch("tools.browser_tool._get_session_info", return_value=session_info),
        patch("tools.browser_tool._socket_safe_tmpdir", return_value=str(tmp_path)),
        patch("tools.browser_tool._merge_browser_path", side_effect=lambda p: p),
        patch("tools.interrupt.is_interrupted", return_value=False),
        patch("subprocess.Popen", return_value=proc),
        patch("os.open", return_value=99),
        patch("os.close"),
        patch("os.unlink"),
        patch("builtins.open", side_effect=fake_open),
    ):
        outcome = _run_browser_command("task-1", "navigate", ["https://example.com"])

    assert outcome["success"] is True
    assert outcome["data"]["url"] == "https://example.com"
def test_agent_browser_eval_repairs_malformed_json_result():
    """``_browser_eval`` should turn a malformed JSON result string into structured data."""
    # The stub must be registered before ``tools.browser_tool`` is imported.
    _stub_auxiliary_client()
    from tools.browser_tool import _browser_eval

    command_reply = {"success": True, "data": {"result": "{'items': ['a', 'b',],}"}}
    with patch("tools.browser_tool._run_browser_command", return_value=command_reply):
        payload = json.loads(_browser_eval("document.body.innerText", task_id="test"))

    assert payload["success"] is True
    assert payload["result"] == {"items": ["a", "b"]}
    assert payload["result_type"] == "dict"
def test_camofox_eval_repairs_malformed_json_result():
    """``_camofox_eval`` should turn a malformed JSON result string into structured data."""
    # The stub must be registered before ``tools.browser_tool`` is imported.
    _stub_auxiliary_client()
    from tools.browser_tool import _camofox_eval

    tab_info = {"tab_id": "tab-1", "user_id": "user-1"}
    post_reply = {"result": "{'count': 3,}"}
    with (
        patch("tools.browser_camofox._ensure_tab", return_value=tab_info),
        patch("tools.browser_camofox._post", return_value=post_reply),
    ):
        payload = json.loads(_camofox_eval("2+1", task_id="test"))

    assert payload["success"] is True
    assert payload["result"] == {"count": 3}
    assert payload["result_type"] == "dict"
def test_browser_get_images_repairs_malformed_json_result():
    """``browser_get_images`` should recover the image list from a malformed JSON result."""
    # The stub must be registered before ``tools.browser_tool`` is imported.
    _stub_auxiliary_client()
    from tools.browser_tool import browser_get_images

    command_reply = {
        "success": True,
        "data": {
            # Trailing comma inside the object: not valid JSON.
            "result": "[{\"src\": \"https://example.com/cat.png\", \"alt\": \"cat\",}]"
        },
    }
    with patch("tools.browser_tool._run_browser_command", return_value=command_reply):
        payload = json.loads(browser_get_images(task_id="test"))

    assert payload["success"] is True
    assert payload["count"] == 1
    assert payload["images"] == [{"src": "https://example.com/cat.png", "alt": "cat"}]
    assert "warning" not in payload

View File

@@ -67,7 +67,6 @@ from typing import Dict, Any, Optional, List
from pathlib import Path
from agent.auxiliary_client import call_llm
from hermes_constants import get_hermes_home
from utils import repair_and_load_json
try:
from tools.website_policy import check_website_access
@@ -1172,12 +1171,8 @@ def _run_browser_command(
return {"success": False, "error": f"Browser command '{command}' returned no output"}
if stdout_text:
parsed = repair_and_load_json(
stdout_text,
default=None,
context=f"browser_{command}_stdout",
)
if isinstance(parsed, dict):
try:
parsed = json.loads(stdout_text)
# Warn if snapshot came back empty (common sign of daemon/CDP issues)
if command == "snapshot" and parsed.get("success"):
snap_data = parsed.get("data", {})
@@ -1186,35 +1181,35 @@ def _run_browser_command(
"Possible stale daemon or CDP connection issue. "
"returncode=%s", returncode)
return parsed
except json.JSONDecodeError:
raw = stdout_text[:2000]
logger.warning("browser '%s' returned non-JSON output (rc=%s): %s",
command, returncode, raw[:500])
raw = stdout_text[:2000]
logger.warning("browser '%s' returned non-JSON output (rc=%s): %s",
command, returncode, raw[:500])
if command == "screenshot":
stderr_text = (stderr or "").strip()
combined_text = "\n".join(
part for part in [stdout_text, stderr_text] if part
)
recovered_path = _extract_screenshot_path_from_text(combined_text)
if recovered_path and Path(recovered_path).exists():
logger.info(
"browser 'screenshot' recovered file from non-JSON output: %s",
recovered_path,
if command == "screenshot":
stderr_text = (stderr or "").strip()
combined_text = "\n".join(
part for part in [stdout_text, stderr_text] if part
)
return {
"success": True,
"data": {
"path": recovered_path,
"raw": raw,
},
}
recovered_path = _extract_screenshot_path_from_text(combined_text)
return {
"success": False,
"error": f"Non-JSON output from agent-browser for '{command}': {raw}"
}
if recovered_path and Path(recovered_path).exists():
logger.info(
"browser 'screenshot' recovered file from non-JSON output: %s",
recovered_path,
)
return {
"success": True,
"data": {
"path": recovered_path,
"raw": raw,
},
}
return {
"success": False,
"error": f"Non-JSON output from agent-browser for '{command}': {raw}"
}
# Check for errors
if returncode != 0:
@@ -1782,11 +1777,10 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
# is valid JSON, parse it so the model gets structured data.
parsed = raw_result
if isinstance(raw_result, str):
parsed = repair_and_load_json(
raw_result,
default=raw_result,
context="browser_eval_result",
)
try:
parsed = json.loads(raw_result)
except (json.JSONDecodeError, ValueError):
pass # keep as string
return json.dumps({
"success": True,
@@ -1807,11 +1801,10 @@ def _camofox_eval(expression: str, task_id: Optional[str] = None) -> str:
raw_result = resp.get("result") if isinstance(resp, dict) else resp
parsed = raw_result
if isinstance(raw_result, str):
parsed = repair_and_load_json(
raw_result,
default=raw_result,
context="camofox_eval_result",
)
try:
parsed = json.loads(raw_result)
except (json.JSONDecodeError, ValueError):
pass
return json.dumps({
"success": True,
@@ -1911,29 +1904,26 @@ def browser_get_images(task_id: Optional[str] = None) -> str:
if result.get("success"):
data = result.get("data", {})
raw_result = data.get("result", "[]")
warning = None
if isinstance(raw_result, str):
images = repair_and_load_json(
raw_result,
default=None,
context="browser_get_images_result",
)
else:
images = raw_result
if not isinstance(images, list):
images = []
warning = "Could not parse image data"
payload = {
"success": True,
"images": images,
"count": len(images),
}
if warning:
payload["warning"] = warning
return json.dumps(payload, ensure_ascii=False)
try:
# Parse the JSON string returned by JavaScript
if isinstance(raw_result, str):
images = json.loads(raw_result)
else:
images = raw_result
return json.dumps({
"success": True,
"images": images,
"count": len(images)
}, ensure_ascii=False)
except json.JSONDecodeError:
return json.dumps({
"success": True,
"images": [],
"count": 0,
"warning": "Could not parse image data"
}, ensure_ascii=False)
else:
return json.dumps({
"success": False,