Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a23f54d15f | ||
|
|
ffb3257cb5 |
@@ -1,4 +1,4 @@
|
||||
from agent.telemetry_logger import log_token_usage\n"""Shared auxiliary client router for side tasks.
|
||||
"""Shared auxiliary client router for side tasks.
|
||||
|
||||
Provides a single resolution chain so every consumer (context compression,
|
||||
session search, web extraction, vision analysis, browser vision) picks up
|
||||
@@ -34,6 +34,8 @@ Payment / credit exhaustion fallback:
|
||||
their OpenRouter balance but has Codex OAuth or another provider available.
|
||||
"""
|
||||
|
||||
from agent.telemetry_logger import log_token_usage
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
@@ -396,7 +398,8 @@ class _CodexCompletionsAdapter:
|
||||
prompt_tokens=getattr(resp_usage, "input_tokens", 0),
|
||||
completion_tokens=getattr(resp_usage, "output_tokens", 0),
|
||||
total_tokens=getattr(resp_usage, "total_tokens", 0),
|
||||
)\n log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
|
||||
)
|
||||
log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
|
||||
except Exception as exc:
|
||||
logger.debug("Codex auxiliary Responses API call failed: %s", exc)
|
||||
raise
|
||||
@@ -529,7 +532,8 @@ class _AnthropicCompletionsAdapter:
|
||||
prompt_tokens=prompt_tokens,
|
||||
completion_tokens=completion_tokens,
|
||||
total_tokens=total_tokens,
|
||||
)\n log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
|
||||
)
|
||||
log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
|
||||
|
||||
choice = SimpleNamespace(
|
||||
index=0,
|
||||
|
||||
256
cli.py
256
cli.py
@@ -589,7 +589,6 @@ from tools.terminal_tool import set_sudo_password_callback, set_approval_callbac
|
||||
from tools.skills_tool import set_secret_capture_callback
|
||||
from hermes_cli.callbacks import prompt_for_secret
|
||||
from tools.browser_tool import _emergency_cleanup_all_sessions as _cleanup_all_browsers
|
||||
from utils import repair_and_load_json
|
||||
|
||||
# Guard to prevent cleanup from running multiple times on exit
|
||||
_cleanup_done = False
|
||||
@@ -3570,11 +3569,7 @@ class HermesCLI:
|
||||
result_json = _asyncio.run(
|
||||
vision_analyze_tool(image_url=str(img_path), user_prompt=analysis_prompt)
|
||||
)
|
||||
result = repair_and_load_json(
|
||||
result_json,
|
||||
default={},
|
||||
context="cli_image_analysis",
|
||||
) if isinstance(result_json, str) else {}
|
||||
result = _json.loads(result_json)
|
||||
if result.get("success"):
|
||||
description = result.get("analysis", "")
|
||||
enriched_parts.append(
|
||||
@@ -4965,14 +4960,7 @@ class HermesCLI:
|
||||
from tools.cronjob_tools import cronjob as cronjob_tool
|
||||
|
||||
def _cron_api(**kwargs):
|
||||
result = repair_and_load_json(
|
||||
cronjob_tool(**kwargs),
|
||||
default=None,
|
||||
context="cli_cron_command",
|
||||
)
|
||||
if isinstance(result, dict):
|
||||
return result
|
||||
return {"success": False, "error": "Invalid JSON from cronjob tool"}
|
||||
return json.loads(cronjob_tool(**kwargs))
|
||||
|
||||
def _normalize_skills(values):
|
||||
normalized = []
|
||||
@@ -7266,6 +7254,40 @@ class HermesCLI:
|
||||
"Use your best judgement to make the choice and proceed."
|
||||
)
|
||||
|
||||
def _handle_clarify_selection(self) -> None:
|
||||
"""Process the currently selected clarify choice."""
|
||||
state = self._clarify_state
|
||||
if not state or self._clarify_freetext:
|
||||
return
|
||||
|
||||
selected = state.get("selected", 0)
|
||||
choices = state.get("choices") or []
|
||||
if selected < len(choices):
|
||||
state["response_queue"].put(choices[selected])
|
||||
self._clarify_state = None
|
||||
self._clarify_freetext = False
|
||||
self._invalidate()
|
||||
return
|
||||
|
||||
if selected == len(choices):
|
||||
self._clarify_freetext = True
|
||||
self._invalidate()
|
||||
|
||||
def _handle_clarify_number_shortcut(self, number: int) -> bool:
|
||||
"""Select a clarify option by number key."""
|
||||
state = self._clarify_state
|
||||
if not state or self._clarify_freetext:
|
||||
return False
|
||||
|
||||
choices = state.get("choices") or []
|
||||
max_option = len(choices) + 1
|
||||
if number < 1 or number > max_option:
|
||||
return False
|
||||
|
||||
state["selected"] = number - 1
|
||||
self._handle_clarify_selection()
|
||||
return True
|
||||
|
||||
def _sudo_password_callback(self) -> str:
|
||||
"""
|
||||
Prompt for sudo password through the prompt_toolkit UI.
|
||||
@@ -7374,6 +7396,20 @@ class HermesCLI:
|
||||
choices.append("view")
|
||||
return choices
|
||||
|
||||
def _handle_approval_number_shortcut(self, number: int) -> bool:
|
||||
"""Select an approval option by number key."""
|
||||
state = self._approval_state
|
||||
if not state:
|
||||
return False
|
||||
|
||||
choices = state.get("choices") or []
|
||||
if number < 1 or number > len(choices):
|
||||
return False
|
||||
|
||||
state["selected"] = number - 1
|
||||
self._handle_approval_selection()
|
||||
return True
|
||||
|
||||
def _handle_approval_selection(self) -> None:
|
||||
"""Process the currently selected dangerous-command approval choice."""
|
||||
state = self._approval_state
|
||||
@@ -7449,8 +7485,9 @@ class HermesCLI:
|
||||
preview_lines.extend(_wrap_panel_text(cmd_display, 60))
|
||||
for i, choice in enumerate(choices):
|
||||
prefix = '❯ ' if i == selected else ' '
|
||||
label = f"{i + 1}. {choice_labels.get(choice, choice)}"
|
||||
preview_lines.extend(_wrap_panel_text(
|
||||
f"{prefix}{choice_labels.get(choice, choice)}",
|
||||
f"{prefix}{label}",
|
||||
60,
|
||||
subsequent_indent=" ",
|
||||
))
|
||||
@@ -7468,7 +7505,7 @@ class HermesCLI:
|
||||
_append_panel_line(lines, 'class:approval-border', 'class:approval-cmd', wrapped, box_width)
|
||||
_append_blank_panel_line(lines, 'class:approval-border', box_width)
|
||||
for i, choice in enumerate(choices):
|
||||
label = choice_labels.get(choice, choice)
|
||||
label = f"{i + 1}. {choice_labels.get(choice, choice)}"
|
||||
style = 'class:approval-selected' if i == selected else 'class:approval-choice'
|
||||
prefix = '❯ ' if i == selected else ' '
|
||||
for wrapped in _wrap_panel_text(f"{prefix}{label}", inner_text_width, subsequent_indent=" "):
|
||||
@@ -7477,6 +7514,97 @@ class HermesCLI:
|
||||
lines.append(('class:approval-border', '╰' + ('─' * box_width) + '╯\n'))
|
||||
return lines
|
||||
|
||||
def _get_clarify_display_fragments(self):
|
||||
"""Render the clarify panel for the prompt_toolkit UI."""
|
||||
state = self._clarify_state
|
||||
if not state:
|
||||
return []
|
||||
|
||||
def _panel_box_width(title: str, content_lines: list[str], min_width: int = 46, max_width: int = 76) -> int:
|
||||
term_cols = shutil.get_terminal_size((100, 20)).columns
|
||||
longest = max([len(title)] + [len(line) for line in content_lines] + [min_width - 4])
|
||||
inner = min(max(longest + 4, min_width - 2), max_width - 2, max(24, term_cols - 6))
|
||||
return inner + 2
|
||||
|
||||
def _wrap_panel_text(text: str, width: int, subsequent_indent: str = "") -> list[str]:
|
||||
wrapped = textwrap.wrap(
|
||||
text,
|
||||
width=max(8, width),
|
||||
break_long_words=False,
|
||||
break_on_hyphens=False,
|
||||
subsequent_indent=subsequent_indent,
|
||||
)
|
||||
return wrapped or [""]
|
||||
|
||||
def _append_panel_line(lines, border_style: str, content_style: str, text: str, box_width: int) -> None:
|
||||
inner_width = max(0, box_width - 2)
|
||||
lines.append((border_style, "│ "))
|
||||
lines.append((content_style, text.ljust(inner_width)))
|
||||
lines.append((border_style, " │\n"))
|
||||
|
||||
def _append_blank_panel_line(lines, border_style: str, box_width: int) -> None:
|
||||
lines.append((border_style, "│" + (" " * box_width) + "│\n"))
|
||||
|
||||
question = state["question"]
|
||||
choices = state.get("choices") or []
|
||||
selected = state.get("selected", 0)
|
||||
preview_lines = _wrap_panel_text(question, 60)
|
||||
for i, choice in enumerate(choices):
|
||||
prefix = "❯ " if i == selected and not self._clarify_freetext else " "
|
||||
label = f"{i + 1}. {choice}"
|
||||
preview_lines.extend(_wrap_panel_text(f"{prefix}{label}", 60, subsequent_indent=" "))
|
||||
other_number = len(choices) + 1
|
||||
other_label = (
|
||||
f"❯ {other_number}. Other (type below)" if self._clarify_freetext
|
||||
else f"❯ {other_number}. Other (type your answer)" if selected == len(choices)
|
||||
else f" {other_number}. Other (type your answer)"
|
||||
)
|
||||
preview_lines.extend(_wrap_panel_text(other_label, 60, subsequent_indent=" "))
|
||||
box_width = _panel_box_width("Hermes needs your input", preview_lines)
|
||||
inner_text_width = max(8, box_width - 2)
|
||||
|
||||
lines = []
|
||||
lines.append(('class:clarify-border', '╭─ '))
|
||||
lines.append(('class:clarify-title', 'Hermes needs your input'))
|
||||
lines.append(('class:clarify-border', ' ' + ('─' * max(0, box_width - len("Hermes needs your input") - 3)) + '╮\n'))
|
||||
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
|
||||
|
||||
for wrapped in _wrap_panel_text(question, inner_text_width):
|
||||
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-question', wrapped, box_width)
|
||||
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
|
||||
|
||||
if self._clarify_freetext and not choices:
|
||||
guidance = "Type your answer in the prompt below, then press Enter."
|
||||
for wrapped in _wrap_panel_text(guidance, inner_text_width):
|
||||
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-choice', wrapped, box_width)
|
||||
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
|
||||
|
||||
if choices:
|
||||
for i, choice in enumerate(choices):
|
||||
style = 'class:clarify-selected' if i == selected and not self._clarify_freetext else 'class:clarify-choice'
|
||||
prefix = '❯ ' if i == selected and not self._clarify_freetext else ' '
|
||||
label = f"{i + 1}. {choice}"
|
||||
wrapped_lines = _wrap_panel_text(f"{prefix}{label}", inner_text_width, subsequent_indent=" ")
|
||||
for wrapped in wrapped_lines:
|
||||
_append_panel_line(lines, 'class:clarify-border', style, wrapped, box_width)
|
||||
|
||||
other_idx = len(choices)
|
||||
if selected == other_idx and not self._clarify_freetext:
|
||||
other_style = 'class:clarify-selected'
|
||||
other_label = f'❯ {other_number}. Other (type your answer)'
|
||||
elif self._clarify_freetext:
|
||||
other_style = 'class:clarify-active-other'
|
||||
other_label = f'❯ {other_number}. Other (type below)'
|
||||
else:
|
||||
other_style = 'class:clarify-choice'
|
||||
other_label = f' {other_number}. Other (type your answer)'
|
||||
for wrapped in _wrap_panel_text(other_label, inner_text_width, subsequent_indent=" "):
|
||||
_append_panel_line(lines, 'class:clarify-border', other_style, wrapped, box_width)
|
||||
|
||||
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
|
||||
lines.append(('class:clarify-border', '╰' + ('─' * box_width) + '╯\n'))
|
||||
return lines
|
||||
|
||||
def _secret_capture_callback(self, var_name: str, prompt: str, metadata=None) -> dict:
|
||||
return prompt_for_secret(self, var_name, prompt, metadata)
|
||||
|
||||
@@ -8383,17 +8511,8 @@ class HermesCLI:
|
||||
|
||||
# --- Clarify choice mode: confirm the highlighted selection ---
|
||||
if self._clarify_state and not self._clarify_freetext:
|
||||
state = self._clarify_state
|
||||
selected = state["selected"]
|
||||
choices = state.get("choices") or []
|
||||
if selected < len(choices):
|
||||
state["response_queue"].put(choices[selected])
|
||||
self._clarify_state = None
|
||||
event.app.invalidate()
|
||||
else:
|
||||
# "Other" selected → switch to freetext
|
||||
self._clarify_freetext = True
|
||||
event.app.invalidate()
|
||||
self._handle_clarify_selection()
|
||||
event.app.invalidate()
|
||||
return
|
||||
|
||||
# --- Normal input routing ---
|
||||
@@ -8513,6 +8632,19 @@ class HermesCLI:
|
||||
self._approval_state["selected"] = min(max_idx, self._approval_state["selected"] + 1)
|
||||
event.app.invalidate()
|
||||
|
||||
# --- Numbered shortcuts for clarify / approval modal prompts ---
|
||||
for _digit in '123456789':
|
||||
@kb.add(_digit, filter=Condition(lambda: bool(self._approval_state) or (bool(self._clarify_state) and not self._clarify_freetext)))
|
||||
def _handle_modal_number(event, digit=_digit):
|
||||
number = int(digit)
|
||||
handled = False
|
||||
if self._approval_state:
|
||||
handled = self._handle_approval_number_shortcut(number)
|
||||
elif self._clarify_state and not self._clarify_freetext:
|
||||
handled = self._handle_clarify_number_shortcut(number)
|
||||
if handled:
|
||||
event.app.invalidate()
|
||||
|
||||
# --- /model picker: arrow-key navigation ---
|
||||
@kb.add('up', filter=Condition(lambda: bool(self._model_picker_state)))
|
||||
def model_picker_up(event):
|
||||
@@ -9007,7 +9139,7 @@ class HermesCLI:
|
||||
if cli_ref._approval_state:
|
||||
remaining = max(0, int(cli_ref._approval_deadline - _time.monotonic()))
|
||||
return [
|
||||
('class:hint', ' ↑/↓ to select, Enter to confirm'),
|
||||
('class:hint', ' 1-9 or ↑/↓ to select, Enter to confirm'),
|
||||
('class:clarify-countdown', f' ({remaining}s)'),
|
||||
]
|
||||
|
||||
@@ -9020,7 +9152,7 @@ class HermesCLI:
|
||||
('class:clarify-countdown', countdown),
|
||||
]
|
||||
return [
|
||||
('class:hint', ' ↑/↓ to select, Enter to confirm'),
|
||||
('class:hint', ' 1-9 or ↑/↓ to select, Enter to confirm'),
|
||||
('class:clarify-countdown', countdown),
|
||||
]
|
||||
|
||||
@@ -9098,71 +9230,7 @@ class HermesCLI:
|
||||
lines.append((border_style, "│" + (" " * box_width) + "│\n"))
|
||||
|
||||
def _get_clarify_display():
|
||||
"""Build styled text for the clarify question/choices panel."""
|
||||
state = cli_ref._clarify_state
|
||||
if not state:
|
||||
return []
|
||||
|
||||
question = state["question"]
|
||||
choices = state.get("choices") or []
|
||||
selected = state.get("selected", 0)
|
||||
preview_lines = _wrap_panel_text(question, 60)
|
||||
for i, choice in enumerate(choices):
|
||||
prefix = "❯ " if i == selected and not cli_ref._clarify_freetext else " "
|
||||
preview_lines.extend(_wrap_panel_text(f"{prefix}{choice}", 60, subsequent_indent=" "))
|
||||
other_label = (
|
||||
"❯ Other (type below)" if cli_ref._clarify_freetext
|
||||
else "❯ Other (type your answer)" if selected == len(choices)
|
||||
else " Other (type your answer)"
|
||||
)
|
||||
preview_lines.extend(_wrap_panel_text(other_label, 60, subsequent_indent=" "))
|
||||
box_width = _panel_box_width("Hermes needs your input", preview_lines)
|
||||
inner_text_width = max(8, box_width - 2)
|
||||
|
||||
lines = []
|
||||
# Box top border
|
||||
lines.append(('class:clarify-border', '╭─ '))
|
||||
lines.append(('class:clarify-title', 'Hermes needs your input'))
|
||||
lines.append(('class:clarify-border', ' ' + ('─' * max(0, box_width - len("Hermes needs your input") - 3)) + '╮\n'))
|
||||
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
|
||||
|
||||
# Question text
|
||||
for wrapped in _wrap_panel_text(question, inner_text_width):
|
||||
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-question', wrapped, box_width)
|
||||
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
|
||||
|
||||
if cli_ref._clarify_freetext and not choices:
|
||||
guidance = "Type your answer in the prompt below, then press Enter."
|
||||
for wrapped in _wrap_panel_text(guidance, inner_text_width):
|
||||
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-choice', wrapped, box_width)
|
||||
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
|
||||
|
||||
if choices:
|
||||
# Multiple-choice mode: show selectable options
|
||||
for i, choice in enumerate(choices):
|
||||
style = 'class:clarify-selected' if i == selected and not cli_ref._clarify_freetext else 'class:clarify-choice'
|
||||
prefix = '❯ ' if i == selected and not cli_ref._clarify_freetext else ' '
|
||||
wrapped_lines = _wrap_panel_text(f"{prefix}{choice}", inner_text_width, subsequent_indent=" ")
|
||||
for wrapped in wrapped_lines:
|
||||
_append_panel_line(lines, 'class:clarify-border', style, wrapped, box_width)
|
||||
|
||||
# "Other" option (5th line, only shown when choices exist)
|
||||
other_idx = len(choices)
|
||||
if selected == other_idx and not cli_ref._clarify_freetext:
|
||||
other_style = 'class:clarify-selected'
|
||||
other_label = '❯ Other (type your answer)'
|
||||
elif cli_ref._clarify_freetext:
|
||||
other_style = 'class:clarify-active-other'
|
||||
other_label = '❯ Other (type below)'
|
||||
else:
|
||||
other_style = 'class:clarify-choice'
|
||||
other_label = ' Other (type your answer)'
|
||||
for wrapped in _wrap_panel_text(other_label, inner_text_width, subsequent_indent=" "):
|
||||
_append_panel_line(lines, 'class:clarify-border', other_style, wrapped, box_width)
|
||||
|
||||
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
|
||||
lines.append(('class:clarify-border', '╰' + ('─' * box_width) + '╯\n'))
|
||||
return lines
|
||||
return cli_ref._get_clarify_display_fragments()
|
||||
|
||||
clarify_widget = ConditionalContainer(
|
||||
Window(
|
||||
|
||||
@@ -1,62 +0,0 @@
|
||||
import sys
|
||||
import types
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
def _stub_auxiliary_client():
|
||||
stub = types.ModuleType("agent.auxiliary_client")
|
||||
stub.call_llm = lambda *args, **kwargs: None
|
||||
stub.resolve_provider_client = lambda *args, **kwargs: (None, None)
|
||||
stub.get_text_auxiliary_client = lambda *args, **kwargs: (None, None)
|
||||
stub.async_call_llm = lambda *args, **kwargs: None
|
||||
stub.extract_content_or_reasoning = lambda *args, **kwargs: ""
|
||||
stub._OR_HEADERS = {}
|
||||
stub._get_task_timeout = lambda *args, **kwargs: 30
|
||||
sys.modules["agent.auxiliary_client"] = stub
|
||||
|
||||
|
||||
def _stub_vision_tools(vision_analyze_tool):
|
||||
stub = types.ModuleType("tools.vision_tools")
|
||||
stub.vision_analyze_tool = vision_analyze_tool
|
||||
sys.modules["tools.vision_tools"] = stub
|
||||
|
||||
|
||||
def test_preprocess_images_with_vision_repairs_malformed_json(tmp_path):
|
||||
_stub_auxiliary_client()
|
||||
from cli import HermesCLI
|
||||
|
||||
cli_obj = HermesCLI.__new__(HermesCLI)
|
||||
image_path = tmp_path / "test.png"
|
||||
image_path.write_bytes(b"fake-image-bytes")
|
||||
|
||||
async def fake_vision(**kwargs):
|
||||
return "{'success': true, 'analysis': 'Recovered image description',}"
|
||||
|
||||
_stub_vision_tools(fake_vision)
|
||||
result = HermesCLI._preprocess_images_with_vision(
|
||||
cli_obj,
|
||||
"Describe this",
|
||||
[image_path],
|
||||
announce=False,
|
||||
)
|
||||
|
||||
assert "Recovered image description" in result
|
||||
assert "Describe this" in result
|
||||
assert str(image_path) in result
|
||||
|
||||
|
||||
def test_handle_cron_command_repairs_malformed_json(capsys):
|
||||
_stub_auxiliary_client()
|
||||
from cli import HermesCLI
|
||||
|
||||
cli_obj = HermesCLI.__new__(HermesCLI)
|
||||
malformed_result = """{'success': true, 'jobs': [{'job_id': 'job-1234567890ab', 'name': 'Nightly Check', 'state': 'scheduled', 'schedule': 'every 1h', 'repeat': 'forever', 'prompt_preview': 'Check server status', 'skills': ['blogwatcher',], 'next_run_at': '2026-04-22T01:00:00Z',},],}"""
|
||||
|
||||
with patch("tools.cronjob_tools.cronjob", return_value=malformed_result):
|
||||
HermesCLI._handle_cron_command(cli_obj, "/cron list")
|
||||
|
||||
out = capsys.readouterr().out
|
||||
assert "Scheduled Jobs:" in out
|
||||
assert "job-1234567890ab" in out
|
||||
assert "Nightly Check" in out
|
||||
assert "blogwatcher" in out
|
||||
172
tests/cli/test_cli_numbered_prompt_shortcuts.py
Normal file
172
tests/cli/test_cli_numbered_prompt_shortcuts.py
Normal file
@@ -0,0 +1,172 @@
|
||||
import queue
|
||||
import threading
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from cli import HermesCLI
|
||||
|
||||
|
||||
class _FakeBuffer:
|
||||
def __init__(self, text="", cursor_position=None):
|
||||
self.text = text
|
||||
self.cursor_position = len(text) if cursor_position is None else cursor_position
|
||||
|
||||
def reset(self, append_to_history=False):
|
||||
self.text = ""
|
||||
self.cursor_position = 0
|
||||
|
||||
|
||||
def _make_cli_stub():
|
||||
cli = HermesCLI.__new__(HermesCLI)
|
||||
cli._approval_state = None
|
||||
cli._approval_deadline = 0
|
||||
cli._approval_lock = threading.Lock()
|
||||
cli._clarify_state = None
|
||||
cli._clarify_freetext = False
|
||||
cli._clarify_deadline = 0
|
||||
cli._sudo_state = None
|
||||
cli._sudo_deadline = 0
|
||||
cli._secret_state = None
|
||||
cli._secret_deadline = 0
|
||||
cli._modal_input_snapshot = None
|
||||
cli._invalidate = MagicMock()
|
||||
cli._app = SimpleNamespace(invalidate=MagicMock(), current_buffer=_FakeBuffer())
|
||||
return cli
|
||||
|
||||
|
||||
def test_approval_display_numbers_choices():
|
||||
cli = _make_cli_stub()
|
||||
cli._approval_state = {
|
||||
"command": "sudo rm -rf /tmp/example",
|
||||
"description": "dangerous command",
|
||||
"choices": ["once", "session", "always", "deny"],
|
||||
"selected": 0,
|
||||
"response_queue": queue.Queue(),
|
||||
}
|
||||
|
||||
rendered = "".join(text for _style, text in cli._get_approval_display_fragments())
|
||||
|
||||
assert "❯ 1. Allow once" in rendered
|
||||
assert "2. Allow for this session" in rendered
|
||||
assert "3. Add to permanent allowlist" in rendered
|
||||
assert "4. Deny" in rendered
|
||||
|
||||
|
||||
def test_approval_number_shortcut_submits_choice():
|
||||
cli = _make_cli_stub()
|
||||
response_queue = queue.Queue()
|
||||
cli._approval_state = {
|
||||
"command": "sudo rm -rf /tmp/example",
|
||||
"description": "dangerous command",
|
||||
"choices": ["once", "session", "always", "deny"],
|
||||
"selected": 0,
|
||||
"response_queue": response_queue,
|
||||
}
|
||||
|
||||
assert cli._handle_approval_number_shortcut(2) is True
|
||||
assert response_queue.get_nowait() == "session"
|
||||
assert cli._approval_state is None
|
||||
|
||||
|
||||
def test_approval_selection_still_submits_selected_choice():
|
||||
cli = _make_cli_stub()
|
||||
response_queue = queue.Queue()
|
||||
cli._approval_state = {
|
||||
"command": "sudo rm -rf /tmp/example",
|
||||
"description": "dangerous command",
|
||||
"choices": ["once", "session", "always", "deny"],
|
||||
"selected": 1,
|
||||
"response_queue": response_queue,
|
||||
}
|
||||
|
||||
cli._handle_approval_selection()
|
||||
|
||||
assert response_queue.get_nowait() == "session"
|
||||
assert cli._approval_state is None
|
||||
|
||||
|
||||
def test_approval_number_shortcut_handles_view_in_place():
|
||||
cli = _make_cli_stub()
|
||||
response_queue = queue.Queue()
|
||||
cli._approval_state = {
|
||||
"command": "sudo dd if=/tmp/in of=/usr/share/keyrings/githubcli-archive-keyring.gpg bs=4M status=progress",
|
||||
"description": "disk copy",
|
||||
"choices": ["once", "session", "always", "deny", "view"],
|
||||
"selected": 0,
|
||||
"response_queue": response_queue,
|
||||
}
|
||||
|
||||
assert cli._handle_approval_number_shortcut(5) is True
|
||||
assert cli._approval_state is not None
|
||||
assert cli._approval_state["show_full"] is True
|
||||
assert "view" not in cli._approval_state["choices"]
|
||||
assert cli._approval_state["selected"] == 3
|
||||
assert response_queue.empty()
|
||||
|
||||
|
||||
def test_clarify_display_numbers_choices_and_other():
|
||||
cli = _make_cli_stub()
|
||||
cli._clarify_state = {
|
||||
"question": "Pick the best option",
|
||||
"choices": ["Alpha", "Beta", "Gamma", "Delta"],
|
||||
"selected": 1,
|
||||
"response_queue": queue.Queue(),
|
||||
}
|
||||
|
||||
rendered = "".join(text for _style, text in cli._get_clarify_display_fragments())
|
||||
|
||||
assert "1. Alpha" in rendered
|
||||
assert "❯ 2. Beta" in rendered
|
||||
assert "3. Gamma" in rendered
|
||||
assert "4. Delta" in rendered
|
||||
assert "5. Other (type your answer)" in rendered
|
||||
|
||||
|
||||
def test_clarify_number_shortcut_submits_choice():
|
||||
cli = _make_cli_stub()
|
||||
response_queue = queue.Queue()
|
||||
cli._clarify_state = {
|
||||
"question": "Pick the best option",
|
||||
"choices": ["Alpha", "Beta", "Gamma"],
|
||||
"selected": 0,
|
||||
"response_queue": response_queue,
|
||||
}
|
||||
|
||||
assert cli._handle_clarify_number_shortcut(3) is True
|
||||
assert response_queue.get_nowait() == "Gamma"
|
||||
assert cli._clarify_state is None
|
||||
assert cli._clarify_freetext is False
|
||||
|
||||
|
||||
def test_clarify_selection_still_submits_selected_choice():
|
||||
cli = _make_cli_stub()
|
||||
response_queue = queue.Queue()
|
||||
cli._clarify_state = {
|
||||
"question": "Pick the best option",
|
||||
"choices": ["Alpha", "Beta", "Gamma"],
|
||||
"selected": 1,
|
||||
"response_queue": response_queue,
|
||||
}
|
||||
|
||||
cli._handle_clarify_selection()
|
||||
|
||||
assert response_queue.get_nowait() == "Beta"
|
||||
assert cli._clarify_state is None
|
||||
assert cli._clarify_freetext is False
|
||||
|
||||
|
||||
def test_clarify_number_shortcut_activates_other_freetext():
|
||||
cli = _make_cli_stub()
|
||||
response_queue = queue.Queue()
|
||||
cli._clarify_state = {
|
||||
"question": "Pick the best option",
|
||||
"choices": ["Alpha", "Beta", "Gamma"],
|
||||
"selected": 0,
|
||||
"response_queue": response_queue,
|
||||
}
|
||||
|
||||
assert cli._handle_clarify_number_shortcut(4) is True
|
||||
assert cli._clarify_state is not None
|
||||
assert cli._clarify_state["selected"] == 3
|
||||
assert cli._clarify_freetext is True
|
||||
assert response_queue.empty()
|
||||
@@ -1,108 +0,0 @@
|
||||
import io
|
||||
import json
|
||||
import sys
|
||||
import types
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
|
||||
def _stub_auxiliary_client():
|
||||
stub = types.ModuleType("agent.auxiliary_client")
|
||||
stub.call_llm = lambda *args, **kwargs: None
|
||||
stub.resolve_provider_client = lambda *args, **kwargs: (None, None)
|
||||
stub.get_text_auxiliary_client = lambda *args, **kwargs: (None, None)
|
||||
stub.async_call_llm = lambda *args, **kwargs: None
|
||||
stub.extract_content_or_reasoning = lambda *args, **kwargs: ""
|
||||
stub._OR_HEADERS = {}
|
||||
stub._get_task_timeout = lambda *args, **kwargs: 30
|
||||
sys.modules["agent.auxiliary_client"] = stub
|
||||
|
||||
|
||||
def test_run_browser_command_repairs_malformed_stdout_envelope(tmp_path):
|
||||
_stub_auxiliary_client()
|
||||
from tools.browser_tool import _run_browser_command
|
||||
|
||||
mock_proc = MagicMock()
|
||||
mock_proc.returncode = 0
|
||||
mock_proc.wait.return_value = 0
|
||||
fake_session = {
|
||||
"session_name": "test-session",
|
||||
"session_id": "test-id",
|
||||
"cdp_url": None,
|
||||
}
|
||||
malformed_stdout = "{'success': true, 'data': {'url': 'https://example.com',},}"
|
||||
|
||||
def fake_open(path, mode="r", *args, **kwargs):
|
||||
path = str(path)
|
||||
if path.endswith("_stdout_navigate"):
|
||||
return io.StringIO(malformed_stdout)
|
||||
if path.endswith("_stderr_navigate"):
|
||||
return io.StringIO("")
|
||||
raise FileNotFoundError(path)
|
||||
|
||||
with (
|
||||
patch("tools.browser_tool._find_agent_browser", return_value="/usr/bin/agent-browser"),
|
||||
patch("tools.browser_tool._get_session_info", return_value=fake_session),
|
||||
patch("tools.browser_tool._socket_safe_tmpdir", return_value=str(tmp_path)),
|
||||
patch("tools.browser_tool._merge_browser_path", side_effect=lambda p: p),
|
||||
patch("tools.interrupt.is_interrupted", return_value=False),
|
||||
patch("subprocess.Popen", return_value=mock_proc),
|
||||
patch("os.open", return_value=99),
|
||||
patch("os.close"),
|
||||
patch("os.unlink"),
|
||||
patch("builtins.open", side_effect=fake_open),
|
||||
):
|
||||
result = _run_browser_command("task-1", "navigate", ["https://example.com"])
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["data"]["url"] == "https://example.com"
|
||||
|
||||
|
||||
def test_agent_browser_eval_repairs_malformed_json_result():
|
||||
_stub_auxiliary_client()
|
||||
from tools.browser_tool import _browser_eval
|
||||
|
||||
with patch(
|
||||
"tools.browser_tool._run_browser_command",
|
||||
return_value={"success": True, "data": {"result": "{'items': ['a', 'b',],}"}},
|
||||
):
|
||||
result = json.loads(_browser_eval("document.body.innerText", task_id="test"))
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["result"] == {"items": ["a", "b"]}
|
||||
assert result["result_type"] == "dict"
|
||||
|
||||
|
||||
def test_camofox_eval_repairs_malformed_json_result():
|
||||
_stub_auxiliary_client()
|
||||
from tools.browser_tool import _camofox_eval
|
||||
|
||||
with (
|
||||
patch("tools.browser_camofox._ensure_tab", return_value={"tab_id": "tab-1", "user_id": "user-1"}),
|
||||
patch("tools.browser_camofox._post", return_value={"result": "{'count': 3,}"}),
|
||||
):
|
||||
result = json.loads(_camofox_eval("2+1", task_id="test"))
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["result"] == {"count": 3}
|
||||
assert result["result_type"] == "dict"
|
||||
|
||||
|
||||
def test_browser_get_images_repairs_malformed_json_result():
|
||||
_stub_auxiliary_client()
|
||||
from tools.browser_tool import browser_get_images
|
||||
|
||||
with patch(
|
||||
"tools.browser_tool._run_browser_command",
|
||||
return_value={
|
||||
"success": True,
|
||||
"data": {
|
||||
"result": "[{\"src\": \"https://example.com/cat.png\", \"alt\": \"cat\",}]"
|
||||
},
|
||||
},
|
||||
):
|
||||
result = json.loads(browser_get_images(task_id="test"))
|
||||
|
||||
assert result["success"] is True
|
||||
assert result["count"] == 1
|
||||
assert result["images"] == [{"src": "https://example.com/cat.png", "alt": "cat"}]
|
||||
assert "warning" not in result
|
||||
@@ -67,7 +67,6 @@ from typing import Dict, Any, Optional, List
|
||||
from pathlib import Path
|
||||
from agent.auxiliary_client import call_llm
|
||||
from hermes_constants import get_hermes_home
|
||||
from utils import repair_and_load_json
|
||||
|
||||
try:
|
||||
from tools.website_policy import check_website_access
|
||||
@@ -1172,12 +1171,8 @@ def _run_browser_command(
|
||||
return {"success": False, "error": f"Browser command '{command}' returned no output"}
|
||||
|
||||
if stdout_text:
|
||||
parsed = repair_and_load_json(
|
||||
stdout_text,
|
||||
default=None,
|
||||
context=f"browser_{command}_stdout",
|
||||
)
|
||||
if isinstance(parsed, dict):
|
||||
try:
|
||||
parsed = json.loads(stdout_text)
|
||||
# Warn if snapshot came back empty (common sign of daemon/CDP issues)
|
||||
if command == "snapshot" and parsed.get("success"):
|
||||
snap_data = parsed.get("data", {})
|
||||
@@ -1186,35 +1181,35 @@ def _run_browser_command(
|
||||
"Possible stale daemon or CDP connection issue. "
|
||||
"returncode=%s", returncode)
|
||||
return parsed
|
||||
except json.JSONDecodeError:
|
||||
raw = stdout_text[:2000]
|
||||
logger.warning("browser '%s' returned non-JSON output (rc=%s): %s",
|
||||
command, returncode, raw[:500])
|
||||
|
||||
raw = stdout_text[:2000]
|
||||
logger.warning("browser '%s' returned non-JSON output (rc=%s): %s",
|
||||
command, returncode, raw[:500])
|
||||
|
||||
if command == "screenshot":
|
||||
stderr_text = (stderr or "").strip()
|
||||
combined_text = "\n".join(
|
||||
part for part in [stdout_text, stderr_text] if part
|
||||
)
|
||||
recovered_path = _extract_screenshot_path_from_text(combined_text)
|
||||
|
||||
if recovered_path and Path(recovered_path).exists():
|
||||
logger.info(
|
||||
"browser 'screenshot' recovered file from non-JSON output: %s",
|
||||
recovered_path,
|
||||
if command == "screenshot":
|
||||
stderr_text = (stderr or "").strip()
|
||||
combined_text = "\n".join(
|
||||
part for part in [stdout_text, stderr_text] if part
|
||||
)
|
||||
return {
|
||||
"success": True,
|
||||
"data": {
|
||||
"path": recovered_path,
|
||||
"raw": raw,
|
||||
},
|
||||
}
|
||||
recovered_path = _extract_screenshot_path_from_text(combined_text)
|
||||
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Non-JSON output from agent-browser for '{command}': {raw}"
|
||||
}
|
||||
if recovered_path and Path(recovered_path).exists():
|
||||
logger.info(
|
||||
"browser 'screenshot' recovered file from non-JSON output: %s",
|
||||
recovered_path,
|
||||
)
|
||||
return {
|
||||
"success": True,
|
||||
"data": {
|
||||
"path": recovered_path,
|
||||
"raw": raw,
|
||||
},
|
||||
}
|
||||
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Non-JSON output from agent-browser for '{command}': {raw}"
|
||||
}
|
||||
|
||||
# Check for errors
|
||||
if returncode != 0:
|
||||
@@ -1782,11 +1777,10 @@ def _browser_eval(expression: str, task_id: Optional[str] = None) -> str:
|
||||
# is valid JSON, parse it so the model gets structured data.
|
||||
parsed = raw_result
|
||||
if isinstance(raw_result, str):
|
||||
parsed = repair_and_load_json(
|
||||
raw_result,
|
||||
default=raw_result,
|
||||
context="browser_eval_result",
|
||||
)
|
||||
try:
|
||||
parsed = json.loads(raw_result)
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
pass # keep as string
|
||||
|
||||
return json.dumps({
|
||||
"success": True,
|
||||
@@ -1807,11 +1801,10 @@ def _camofox_eval(expression: str, task_id: Optional[str] = None) -> str:
|
||||
raw_result = resp.get("result") if isinstance(resp, dict) else resp
|
||||
parsed = raw_result
|
||||
if isinstance(raw_result, str):
|
||||
parsed = repair_and_load_json(
|
||||
raw_result,
|
||||
default=raw_result,
|
||||
context="camofox_eval_result",
|
||||
)
|
||||
try:
|
||||
parsed = json.loads(raw_result)
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
pass
|
||||
|
||||
return json.dumps({
|
||||
"success": True,
|
||||
@@ -1911,29 +1904,26 @@ def browser_get_images(task_id: Optional[str] = None) -> str:
|
||||
if result.get("success"):
|
||||
data = result.get("data", {})
|
||||
raw_result = data.get("result", "[]")
|
||||
|
||||
warning = None
|
||||
if isinstance(raw_result, str):
|
||||
images = repair_and_load_json(
|
||||
raw_result,
|
||||
default=None,
|
||||
context="browser_get_images_result",
|
||||
)
|
||||
else:
|
||||
images = raw_result
|
||||
|
||||
if not isinstance(images, list):
|
||||
images = []
|
||||
warning = "Could not parse image data"
|
||||
|
||||
payload = {
|
||||
"success": True,
|
||||
"images": images,
|
||||
"count": len(images),
|
||||
}
|
||||
if warning:
|
||||
payload["warning"] = warning
|
||||
return json.dumps(payload, ensure_ascii=False)
|
||||
|
||||
try:
|
||||
# Parse the JSON string returned by JavaScript
|
||||
if isinstance(raw_result, str):
|
||||
images = json.loads(raw_result)
|
||||
else:
|
||||
images = raw_result
|
||||
|
||||
return json.dumps({
|
||||
"success": True,
|
||||
"images": images,
|
||||
"count": len(images)
|
||||
}, ensure_ascii=False)
|
||||
except json.JSONDecodeError:
|
||||
return json.dumps({
|
||||
"success": True,
|
||||
"images": [],
|
||||
"count": 0,
|
||||
"warning": "Could not parse image data"
|
||||
}, ensure_ascii=False)
|
||||
else:
|
||||
return json.dumps({
|
||||
"success": False,
|
||||
|
||||
Reference in New Issue
Block a user