Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a23f54d15f | ||
|
|
ffb3257cb5 |
@@ -1,4 +1,4 @@
|
||||
from agent.telemetry_logger import log_token_usage\n"""Shared auxiliary client router for side tasks.
|
||||
"""Shared auxiliary client router for side tasks.
|
||||
|
||||
Provides a single resolution chain so every consumer (context compression,
|
||||
session search, web extraction, vision analysis, browser vision) picks up
|
||||
@@ -34,6 +34,8 @@ Payment / credit exhaustion fallback:
|
||||
their OpenRouter balance but has Codex OAuth or another provider available.
|
||||
"""
|
||||
|
||||
from agent.telemetry_logger import log_token_usage
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
@@ -396,7 +398,8 @@ class _CodexCompletionsAdapter:
|
||||
prompt_tokens=getattr(resp_usage, "input_tokens", 0),
|
||||
completion_tokens=getattr(resp_usage, "output_tokens", 0),
|
||||
total_tokens=getattr(resp_usage, "total_tokens", 0),
|
||||
)\n log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
|
||||
)
|
||||
log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
|
||||
except Exception as exc:
|
||||
logger.debug("Codex auxiliary Responses API call failed: %s", exc)
|
||||
raise
|
||||
@@ -529,7 +532,8 @@ class _AnthropicCompletionsAdapter:
|
||||
prompt_tokens=prompt_tokens,
|
||||
completion_tokens=completion_tokens,
|
||||
total_tokens=total_tokens,
|
||||
)\n log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
|
||||
)
|
||||
log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
|
||||
|
||||
choice = SimpleNamespace(
|
||||
index=0,
|
||||
|
||||
240
cli.py
240
cli.py
@@ -7254,6 +7254,40 @@ class HermesCLI:
|
||||
"Use your best judgement to make the choice and proceed."
|
||||
)
|
||||
|
||||
def _handle_clarify_selection(self) -> None:
|
||||
"""Process the currently selected clarify choice."""
|
||||
state = self._clarify_state
|
||||
if not state or self._clarify_freetext:
|
||||
return
|
||||
|
||||
selected = state.get("selected", 0)
|
||||
choices = state.get("choices") or []
|
||||
if selected < len(choices):
|
||||
state["response_queue"].put(choices[selected])
|
||||
self._clarify_state = None
|
||||
self._clarify_freetext = False
|
||||
self._invalidate()
|
||||
return
|
||||
|
||||
if selected == len(choices):
|
||||
self._clarify_freetext = True
|
||||
self._invalidate()
|
||||
|
||||
def _handle_clarify_number_shortcut(self, number: int) -> bool:
|
||||
"""Select a clarify option by number key."""
|
||||
state = self._clarify_state
|
||||
if not state or self._clarify_freetext:
|
||||
return False
|
||||
|
||||
choices = state.get("choices") or []
|
||||
max_option = len(choices) + 1
|
||||
if number < 1 or number > max_option:
|
||||
return False
|
||||
|
||||
state["selected"] = number - 1
|
||||
self._handle_clarify_selection()
|
||||
return True
|
||||
|
||||
def _sudo_password_callback(self) -> str:
|
||||
"""
|
||||
Prompt for sudo password through the prompt_toolkit UI.
|
||||
@@ -7362,6 +7396,20 @@ class HermesCLI:
|
||||
choices.append("view")
|
||||
return choices
|
||||
|
||||
def _handle_approval_number_shortcut(self, number: int) -> bool:
|
||||
"""Select an approval option by number key."""
|
||||
state = self._approval_state
|
||||
if not state:
|
||||
return False
|
||||
|
||||
choices = state.get("choices") or []
|
||||
if number < 1 or number > len(choices):
|
||||
return False
|
||||
|
||||
state["selected"] = number - 1
|
||||
self._handle_approval_selection()
|
||||
return True
|
||||
|
||||
def _handle_approval_selection(self) -> None:
|
||||
"""Process the currently selected dangerous-command approval choice."""
|
||||
state = self._approval_state
|
||||
@@ -7437,8 +7485,9 @@ class HermesCLI:
|
||||
preview_lines.extend(_wrap_panel_text(cmd_display, 60))
|
||||
for i, choice in enumerate(choices):
|
||||
prefix = '❯ ' if i == selected else ' '
|
||||
label = f"{i + 1}. {choice_labels.get(choice, choice)}"
|
||||
preview_lines.extend(_wrap_panel_text(
|
||||
f"{prefix}{choice_labels.get(choice, choice)}",
|
||||
f"{prefix}{label}",
|
||||
60,
|
||||
subsequent_indent=" ",
|
||||
))
|
||||
@@ -7456,7 +7505,7 @@ class HermesCLI:
|
||||
_append_panel_line(lines, 'class:approval-border', 'class:approval-cmd', wrapped, box_width)
|
||||
_append_blank_panel_line(lines, 'class:approval-border', box_width)
|
||||
for i, choice in enumerate(choices):
|
||||
label = choice_labels.get(choice, choice)
|
||||
label = f"{i + 1}. {choice_labels.get(choice, choice)}"
|
||||
style = 'class:approval-selected' if i == selected else 'class:approval-choice'
|
||||
prefix = '❯ ' if i == selected else ' '
|
||||
for wrapped in _wrap_panel_text(f"{prefix}{label}", inner_text_width, subsequent_indent=" "):
|
||||
@@ -7465,6 +7514,97 @@ class HermesCLI:
|
||||
lines.append(('class:approval-border', '╰' + ('─' * box_width) + '╯\n'))
|
||||
return lines
|
||||
|
||||
def _get_clarify_display_fragments(self):
|
||||
"""Render the clarify panel for the prompt_toolkit UI."""
|
||||
state = self._clarify_state
|
||||
if not state:
|
||||
return []
|
||||
|
||||
def _panel_box_width(title: str, content_lines: list[str], min_width: int = 46, max_width: int = 76) -> int:
|
||||
term_cols = shutil.get_terminal_size((100, 20)).columns
|
||||
longest = max([len(title)] + [len(line) for line in content_lines] + [min_width - 4])
|
||||
inner = min(max(longest + 4, min_width - 2), max_width - 2, max(24, term_cols - 6))
|
||||
return inner + 2
|
||||
|
||||
def _wrap_panel_text(text: str, width: int, subsequent_indent: str = "") -> list[str]:
|
||||
wrapped = textwrap.wrap(
|
||||
text,
|
||||
width=max(8, width),
|
||||
break_long_words=False,
|
||||
break_on_hyphens=False,
|
||||
subsequent_indent=subsequent_indent,
|
||||
)
|
||||
return wrapped or [""]
|
||||
|
||||
def _append_panel_line(lines, border_style: str, content_style: str, text: str, box_width: int) -> None:
|
||||
inner_width = max(0, box_width - 2)
|
||||
lines.append((border_style, "│ "))
|
||||
lines.append((content_style, text.ljust(inner_width)))
|
||||
lines.append((border_style, " │\n"))
|
||||
|
||||
def _append_blank_panel_line(lines, border_style: str, box_width: int) -> None:
|
||||
lines.append((border_style, "│" + (" " * box_width) + "│\n"))
|
||||
|
||||
question = state["question"]
|
||||
choices = state.get("choices") or []
|
||||
selected = state.get("selected", 0)
|
||||
preview_lines = _wrap_panel_text(question, 60)
|
||||
for i, choice in enumerate(choices):
|
||||
prefix = "❯ " if i == selected and not self._clarify_freetext else " "
|
||||
label = f"{i + 1}. {choice}"
|
||||
preview_lines.extend(_wrap_panel_text(f"{prefix}{label}", 60, subsequent_indent=" "))
|
||||
other_number = len(choices) + 1
|
||||
other_label = (
|
||||
f"❯ {other_number}. Other (type below)" if self._clarify_freetext
|
||||
else f"❯ {other_number}. Other (type your answer)" if selected == len(choices)
|
||||
else f" {other_number}. Other (type your answer)"
|
||||
)
|
||||
preview_lines.extend(_wrap_panel_text(other_label, 60, subsequent_indent=" "))
|
||||
box_width = _panel_box_width("Hermes needs your input", preview_lines)
|
||||
inner_text_width = max(8, box_width - 2)
|
||||
|
||||
lines = []
|
||||
lines.append(('class:clarify-border', '╭─ '))
|
||||
lines.append(('class:clarify-title', 'Hermes needs your input'))
|
||||
lines.append(('class:clarify-border', ' ' + ('─' * max(0, box_width - len("Hermes needs your input") - 3)) + '╮\n'))
|
||||
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
|
||||
|
||||
for wrapped in _wrap_panel_text(question, inner_text_width):
|
||||
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-question', wrapped, box_width)
|
||||
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
|
||||
|
||||
if self._clarify_freetext and not choices:
|
||||
guidance = "Type your answer in the prompt below, then press Enter."
|
||||
for wrapped in _wrap_panel_text(guidance, inner_text_width):
|
||||
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-choice', wrapped, box_width)
|
||||
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
|
||||
|
||||
if choices:
|
||||
for i, choice in enumerate(choices):
|
||||
style = 'class:clarify-selected' if i == selected and not self._clarify_freetext else 'class:clarify-choice'
|
||||
prefix = '❯ ' if i == selected and not self._clarify_freetext else ' '
|
||||
label = f"{i + 1}. {choice}"
|
||||
wrapped_lines = _wrap_panel_text(f"{prefix}{label}", inner_text_width, subsequent_indent=" ")
|
||||
for wrapped in wrapped_lines:
|
||||
_append_panel_line(lines, 'class:clarify-border', style, wrapped, box_width)
|
||||
|
||||
other_idx = len(choices)
|
||||
if selected == other_idx and not self._clarify_freetext:
|
||||
other_style = 'class:clarify-selected'
|
||||
other_label = f'❯ {other_number}. Other (type your answer)'
|
||||
elif self._clarify_freetext:
|
||||
other_style = 'class:clarify-active-other'
|
||||
other_label = f'❯ {other_number}. Other (type below)'
|
||||
else:
|
||||
other_style = 'class:clarify-choice'
|
||||
other_label = f' {other_number}. Other (type your answer)'
|
||||
for wrapped in _wrap_panel_text(other_label, inner_text_width, subsequent_indent=" "):
|
||||
_append_panel_line(lines, 'class:clarify-border', other_style, wrapped, box_width)
|
||||
|
||||
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
|
||||
lines.append(('class:clarify-border', '╰' + ('─' * box_width) + '╯\n'))
|
||||
return lines
|
||||
|
||||
def _secret_capture_callback(self, var_name: str, prompt: str, metadata=None) -> dict:
|
||||
return prompt_for_secret(self, var_name, prompt, metadata)
|
||||
|
||||
@@ -8371,17 +8511,8 @@ class HermesCLI:
|
||||
|
||||
# --- Clarify choice mode: confirm the highlighted selection ---
|
||||
if self._clarify_state and not self._clarify_freetext:
|
||||
state = self._clarify_state
|
||||
selected = state["selected"]
|
||||
choices = state.get("choices") or []
|
||||
if selected < len(choices):
|
||||
state["response_queue"].put(choices[selected])
|
||||
self._clarify_state = None
|
||||
event.app.invalidate()
|
||||
else:
|
||||
# "Other" selected → switch to freetext
|
||||
self._clarify_freetext = True
|
||||
event.app.invalidate()
|
||||
self._handle_clarify_selection()
|
||||
event.app.invalidate()
|
||||
return
|
||||
|
||||
# --- Normal input routing ---
|
||||
@@ -8501,6 +8632,19 @@ class HermesCLI:
|
||||
self._approval_state["selected"] = min(max_idx, self._approval_state["selected"] + 1)
|
||||
event.app.invalidate()
|
||||
|
||||
# --- Numbered shortcuts for clarify / approval modal prompts ---
|
||||
for _digit in '123456789':
|
||||
@kb.add(_digit, filter=Condition(lambda: bool(self._approval_state) or (bool(self._clarify_state) and not self._clarify_freetext)))
|
||||
def _handle_modal_number(event, digit=_digit):
|
||||
number = int(digit)
|
||||
handled = False
|
||||
if self._approval_state:
|
||||
handled = self._handle_approval_number_shortcut(number)
|
||||
elif self._clarify_state and not self._clarify_freetext:
|
||||
handled = self._handle_clarify_number_shortcut(number)
|
||||
if handled:
|
||||
event.app.invalidate()
|
||||
|
||||
# --- /model picker: arrow-key navigation ---
|
||||
@kb.add('up', filter=Condition(lambda: bool(self._model_picker_state)))
|
||||
def model_picker_up(event):
|
||||
@@ -8995,7 +9139,7 @@ class HermesCLI:
|
||||
if cli_ref._approval_state:
|
||||
remaining = max(0, int(cli_ref._approval_deadline - _time.monotonic()))
|
||||
return [
|
||||
('class:hint', ' ↑/↓ to select, Enter to confirm'),
|
||||
('class:hint', ' 1-9 or ↑/↓ to select, Enter to confirm'),
|
||||
('class:clarify-countdown', f' ({remaining}s)'),
|
||||
]
|
||||
|
||||
@@ -9008,7 +9152,7 @@ class HermesCLI:
|
||||
('class:clarify-countdown', countdown),
|
||||
]
|
||||
return [
|
||||
('class:hint', ' ↑/↓ to select, Enter to confirm'),
|
||||
('class:hint', ' 1-9 or ↑/↓ to select, Enter to confirm'),
|
||||
('class:clarify-countdown', countdown),
|
||||
]
|
||||
|
||||
@@ -9086,71 +9230,7 @@ class HermesCLI:
|
||||
lines.append((border_style, "│" + (" " * box_width) + "│\n"))
|
||||
|
||||
def _get_clarify_display():
|
||||
"""Build styled text for the clarify question/choices panel."""
|
||||
state = cli_ref._clarify_state
|
||||
if not state:
|
||||
return []
|
||||
|
||||
question = state["question"]
|
||||
choices = state.get("choices") or []
|
||||
selected = state.get("selected", 0)
|
||||
preview_lines = _wrap_panel_text(question, 60)
|
||||
for i, choice in enumerate(choices):
|
||||
prefix = "❯ " if i == selected and not cli_ref._clarify_freetext else " "
|
||||
preview_lines.extend(_wrap_panel_text(f"{prefix}{choice}", 60, subsequent_indent=" "))
|
||||
other_label = (
|
||||
"❯ Other (type below)" if cli_ref._clarify_freetext
|
||||
else "❯ Other (type your answer)" if selected == len(choices)
|
||||
else " Other (type your answer)"
|
||||
)
|
||||
preview_lines.extend(_wrap_panel_text(other_label, 60, subsequent_indent=" "))
|
||||
box_width = _panel_box_width("Hermes needs your input", preview_lines)
|
||||
inner_text_width = max(8, box_width - 2)
|
||||
|
||||
lines = []
|
||||
# Box top border
|
||||
lines.append(('class:clarify-border', '╭─ '))
|
||||
lines.append(('class:clarify-title', 'Hermes needs your input'))
|
||||
lines.append(('class:clarify-border', ' ' + ('─' * max(0, box_width - len("Hermes needs your input") - 3)) + '╮\n'))
|
||||
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
|
||||
|
||||
# Question text
|
||||
for wrapped in _wrap_panel_text(question, inner_text_width):
|
||||
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-question', wrapped, box_width)
|
||||
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
|
||||
|
||||
if cli_ref._clarify_freetext and not choices:
|
||||
guidance = "Type your answer in the prompt below, then press Enter."
|
||||
for wrapped in _wrap_panel_text(guidance, inner_text_width):
|
||||
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-choice', wrapped, box_width)
|
||||
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
|
||||
|
||||
if choices:
|
||||
# Multiple-choice mode: show selectable options
|
||||
for i, choice in enumerate(choices):
|
||||
style = 'class:clarify-selected' if i == selected and not cli_ref._clarify_freetext else 'class:clarify-choice'
|
||||
prefix = '❯ ' if i == selected and not cli_ref._clarify_freetext else ' '
|
||||
wrapped_lines = _wrap_panel_text(f"{prefix}{choice}", inner_text_width, subsequent_indent=" ")
|
||||
for wrapped in wrapped_lines:
|
||||
_append_panel_line(lines, 'class:clarify-border', style, wrapped, box_width)
|
||||
|
||||
# "Other" option (5th line, only shown when choices exist)
|
||||
other_idx = len(choices)
|
||||
if selected == other_idx and not cli_ref._clarify_freetext:
|
||||
other_style = 'class:clarify-selected'
|
||||
other_label = '❯ Other (type your answer)'
|
||||
elif cli_ref._clarify_freetext:
|
||||
other_style = 'class:clarify-active-other'
|
||||
other_label = '❯ Other (type below)'
|
||||
else:
|
||||
other_style = 'class:clarify-choice'
|
||||
other_label = ' Other (type your answer)'
|
||||
for wrapped in _wrap_panel_text(other_label, inner_text_width, subsequent_indent=" "):
|
||||
_append_panel_line(lines, 'class:clarify-border', other_style, wrapped, box_width)
|
||||
|
||||
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
|
||||
lines.append(('class:clarify-border', '╰' + ('─' * box_width) + '╯\n'))
|
||||
return lines
|
||||
return cli_ref._get_clarify_display_fragments()
|
||||
|
||||
clarify_widget = ConditionalContainer(
|
||||
Window(
|
||||
|
||||
152
run_agent.py
152
run_agent.py
@@ -20,7 +20,6 @@ Usage:
|
||||
response = agent.run_conversation("Tell me about the latest Python updates")
|
||||
"""
|
||||
|
||||
import ast
|
||||
import asyncio
|
||||
import base64
|
||||
import concurrent.futures
|
||||
@@ -3329,119 +3328,6 @@ class AIAgent:
|
||||
|
||||
_VALID_API_ROLES = frozenset({"system", "user", "assistant", "tool", "function", "developer"})
|
||||
|
||||
@staticmethod
|
||||
def _normalize_tool_call_arguments(arguments: Any) -> tuple[str, bool]:
|
||||
"""Return ``(normalized_text, is_complete)`` for tool-call arguments.
|
||||
|
||||
Conservative by design: repairs harmless formatting quirks common in
|
||||
Gemma 4 / Ollama output (whitespace, trailing commas, Python-style
|
||||
single-quoted dicts, bare key/value pairs) but does NOT auto-close
|
||||
truncated JSON objects. Truly incomplete fragments must remain marked
|
||||
incomplete so the agent can retry instead of silently dropping fields.
|
||||
"""
|
||||
if isinstance(arguments, (dict, list)):
|
||||
return json.dumps(arguments, ensure_ascii=False, separators=(",", ":")), True
|
||||
if arguments is None:
|
||||
return "{}", True
|
||||
if not isinstance(arguments, str):
|
||||
arguments = str(arguments)
|
||||
|
||||
text = arguments.strip()
|
||||
if not text:
|
||||
return "{}", True
|
||||
|
||||
def _parse_candidate(candidate: str):
|
||||
try:
|
||||
return json.loads(candidate)
|
||||
except (json.JSONDecodeError, TypeError, ValueError):
|
||||
pass
|
||||
try:
|
||||
return ast.literal_eval(candidate)
|
||||
except (SyntaxError, ValueError):
|
||||
return None
|
||||
|
||||
candidates: list[str] = [text]
|
||||
|
||||
trimmed_trailing_commas = re.sub(r",\s*([}\]])", r"\1", text)
|
||||
if trimmed_trailing_commas != text:
|
||||
candidates.append(trimmed_trailing_commas)
|
||||
|
||||
if ":" in text and not text.startswith(("{", "[")):
|
||||
wrapped = "{" + text + "}"
|
||||
candidates.append(wrapped)
|
||||
quoted_keys = re.sub(
|
||||
r'([\{,]\s*)([A-Za-z_][A-Za-z0-9_\-]*)(\s*:)',
|
||||
r'\1"\2"\3',
|
||||
wrapped,
|
||||
)
|
||||
if quoted_keys != wrapped:
|
||||
candidates.append(quoted_keys)
|
||||
trimmed_quoted_keys = re.sub(r",\s*([}\]])", r"\1", quoted_keys)
|
||||
if trimmed_quoted_keys != quoted_keys:
|
||||
candidates.append(trimmed_quoted_keys)
|
||||
|
||||
seen: set[str] = set()
|
||||
for candidate in candidates:
|
||||
if candidate in seen:
|
||||
continue
|
||||
seen.add(candidate)
|
||||
parsed = _parse_candidate(candidate)
|
||||
if isinstance(parsed, (dict, list)):
|
||||
return json.dumps(parsed, ensure_ascii=False, separators=(",", ":")), True
|
||||
|
||||
return text, False
|
||||
|
||||
@staticmethod
|
||||
def _merge_consecutive_assistant_tool_call_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||
"""Merge adjacent assistant messages that each carry tool_calls.
|
||||
|
||||
Some providers emit parallel tool calls as multiple consecutive assistant
|
||||
messages instead of a single assistant message with multiple tool calls.
|
||||
Merge only adjacent assistant/tool-call messages; any non-assistant
|
||||
boundary flushes the current batch.
|
||||
"""
|
||||
merged: List[Dict[str, Any]] = []
|
||||
pending: Optional[Dict[str, Any]] = None
|
||||
|
||||
def _flush_pending() -> None:
|
||||
nonlocal pending
|
||||
if pending is not None:
|
||||
merged.append(pending)
|
||||
pending = None
|
||||
|
||||
for msg in messages:
|
||||
if not isinstance(msg, dict):
|
||||
_flush_pending()
|
||||
merged.append(msg)
|
||||
continue
|
||||
|
||||
role = msg.get("role")
|
||||
tool_calls = msg.get("tool_calls")
|
||||
if role == "assistant" and isinstance(tool_calls, list) and tool_calls:
|
||||
if pending is None:
|
||||
pending = copy.deepcopy(msg)
|
||||
continue
|
||||
|
||||
pending_tool_calls = pending.get("tool_calls")
|
||||
if not isinstance(pending_tool_calls, list):
|
||||
pending_tool_calls = []
|
||||
pending["tool_calls"] = pending_tool_calls
|
||||
pending_tool_calls.extend(copy.deepcopy(tool_calls))
|
||||
|
||||
pending_content = pending.get("content") or ""
|
||||
current_content = msg.get("content") or ""
|
||||
if pending_content and current_content:
|
||||
pending["content"] = pending_content + "\n" + current_content
|
||||
elif current_content:
|
||||
pending["content"] = current_content
|
||||
continue
|
||||
|
||||
_flush_pending()
|
||||
merged.append(msg)
|
||||
|
||||
_flush_pending()
|
||||
return merged
|
||||
|
||||
@staticmethod
|
||||
def _sanitize_api_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||
"""Fix orphaned tool_call / tool_result pairs before every LLM call.
|
||||
@@ -3461,7 +3347,7 @@ class AIAgent:
|
||||
)
|
||||
continue
|
||||
filtered.append(msg)
|
||||
messages = AIAgent._merge_consecutive_assistant_tool_call_messages(filtered)
|
||||
messages = filtered
|
||||
|
||||
surviving_call_ids: set = set()
|
||||
for msg in messages:
|
||||
@@ -5368,9 +5254,12 @@ class AIAgent:
|
||||
mock_tool_calls = []
|
||||
for idx in sorted(tool_calls_acc):
|
||||
tc = tool_calls_acc[idx]
|
||||
arguments, is_complete = self._normalize_tool_call_arguments(tc["function"]["arguments"])
|
||||
if not is_complete:
|
||||
has_truncated_tool_args = True
|
||||
arguments = tc["function"]["arguments"]
|
||||
if arguments and arguments.strip():
|
||||
try:
|
||||
json.loads(arguments)
|
||||
except json.JSONDecodeError:
|
||||
has_truncated_tool_args = True
|
||||
mock_tool_calls.append(SimpleNamespace(
|
||||
id=tc["id"],
|
||||
type=tc["type"],
|
||||
@@ -6674,7 +6563,6 @@ class AIAgent:
|
||||
response_item_id if isinstance(response_item_id, str) else None,
|
||||
)
|
||||
|
||||
normalized_args, _ = self._normalize_tool_call_arguments(tool_call.function.arguments)
|
||||
tc_dict = {
|
||||
"id": call_id,
|
||||
"call_id": call_id,
|
||||
@@ -6682,7 +6570,7 @@ class AIAgent:
|
||||
"type": tool_call.type,
|
||||
"function": {
|
||||
"name": tool_call.function.name,
|
||||
"arguments": normalized_args,
|
||||
"arguments": tool_call.function.arguments
|
||||
},
|
||||
}
|
||||
# Preserve extra_content (e.g. Gemini thought_signature) so it
|
||||
@@ -10143,15 +10031,21 @@ class AIAgent:
|
||||
# Handle empty strings as empty objects (common model quirk)
|
||||
invalid_json_args = []
|
||||
for tc in assistant_message.tool_calls:
|
||||
normalized_args, is_complete = self._normalize_tool_call_arguments(tc.function.arguments)
|
||||
tc.function.arguments = normalized_args
|
||||
if not is_complete:
|
||||
try:
|
||||
json.loads(normalized_args)
|
||||
except json.JSONDecodeError as e:
|
||||
invalid_json_args.append((tc.function.name, str(e)))
|
||||
except Exception as e:
|
||||
invalid_json_args.append((tc.function.name, str(e)))
|
||||
args = tc.function.arguments
|
||||
if isinstance(args, (dict, list)):
|
||||
tc.function.arguments = json.dumps(args)
|
||||
continue
|
||||
if args is not None and not isinstance(args, str):
|
||||
tc.function.arguments = str(args)
|
||||
args = tc.function.arguments
|
||||
# Treat empty/whitespace strings as empty object
|
||||
if not args or not args.strip():
|
||||
tc.function.arguments = "{}"
|
||||
continue
|
||||
try:
|
||||
json.loads(args)
|
||||
except json.JSONDecodeError as e:
|
||||
invalid_json_args.append((tc.function.name, str(e)))
|
||||
|
||||
if invalid_json_args:
|
||||
# Check if the invalid JSON is due to truncation rather
|
||||
|
||||
172
tests/cli/test_cli_numbered_prompt_shortcuts.py
Normal file
172
tests/cli/test_cli_numbered_prompt_shortcuts.py
Normal file
@@ -0,0 +1,172 @@
|
||||
import queue
|
||||
import threading
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from cli import HermesCLI
|
||||
|
||||
|
||||
class _FakeBuffer:
|
||||
def __init__(self, text="", cursor_position=None):
|
||||
self.text = text
|
||||
self.cursor_position = len(text) if cursor_position is None else cursor_position
|
||||
|
||||
def reset(self, append_to_history=False):
|
||||
self.text = ""
|
||||
self.cursor_position = 0
|
||||
|
||||
|
||||
def _make_cli_stub():
|
||||
cli = HermesCLI.__new__(HermesCLI)
|
||||
cli._approval_state = None
|
||||
cli._approval_deadline = 0
|
||||
cli._approval_lock = threading.Lock()
|
||||
cli._clarify_state = None
|
||||
cli._clarify_freetext = False
|
||||
cli._clarify_deadline = 0
|
||||
cli._sudo_state = None
|
||||
cli._sudo_deadline = 0
|
||||
cli._secret_state = None
|
||||
cli._secret_deadline = 0
|
||||
cli._modal_input_snapshot = None
|
||||
cli._invalidate = MagicMock()
|
||||
cli._app = SimpleNamespace(invalidate=MagicMock(), current_buffer=_FakeBuffer())
|
||||
return cli
|
||||
|
||||
|
||||
def test_approval_display_numbers_choices():
|
||||
cli = _make_cli_stub()
|
||||
cli._approval_state = {
|
||||
"command": "sudo rm -rf /tmp/example",
|
||||
"description": "dangerous command",
|
||||
"choices": ["once", "session", "always", "deny"],
|
||||
"selected": 0,
|
||||
"response_queue": queue.Queue(),
|
||||
}
|
||||
|
||||
rendered = "".join(text for _style, text in cli._get_approval_display_fragments())
|
||||
|
||||
assert "❯ 1. Allow once" in rendered
|
||||
assert "2. Allow for this session" in rendered
|
||||
assert "3. Add to permanent allowlist" in rendered
|
||||
assert "4. Deny" in rendered
|
||||
|
||||
|
||||
def test_approval_number_shortcut_submits_choice():
|
||||
cli = _make_cli_stub()
|
||||
response_queue = queue.Queue()
|
||||
cli._approval_state = {
|
||||
"command": "sudo rm -rf /tmp/example",
|
||||
"description": "dangerous command",
|
||||
"choices": ["once", "session", "always", "deny"],
|
||||
"selected": 0,
|
||||
"response_queue": response_queue,
|
||||
}
|
||||
|
||||
assert cli._handle_approval_number_shortcut(2) is True
|
||||
assert response_queue.get_nowait() == "session"
|
||||
assert cli._approval_state is None
|
||||
|
||||
|
||||
def test_approval_selection_still_submits_selected_choice():
|
||||
cli = _make_cli_stub()
|
||||
response_queue = queue.Queue()
|
||||
cli._approval_state = {
|
||||
"command": "sudo rm -rf /tmp/example",
|
||||
"description": "dangerous command",
|
||||
"choices": ["once", "session", "always", "deny"],
|
||||
"selected": 1,
|
||||
"response_queue": response_queue,
|
||||
}
|
||||
|
||||
cli._handle_approval_selection()
|
||||
|
||||
assert response_queue.get_nowait() == "session"
|
||||
assert cli._approval_state is None
|
||||
|
||||
|
||||
def test_approval_number_shortcut_handles_view_in_place():
|
||||
cli = _make_cli_stub()
|
||||
response_queue = queue.Queue()
|
||||
cli._approval_state = {
|
||||
"command": "sudo dd if=/tmp/in of=/usr/share/keyrings/githubcli-archive-keyring.gpg bs=4M status=progress",
|
||||
"description": "disk copy",
|
||||
"choices": ["once", "session", "always", "deny", "view"],
|
||||
"selected": 0,
|
||||
"response_queue": response_queue,
|
||||
}
|
||||
|
||||
assert cli._handle_approval_number_shortcut(5) is True
|
||||
assert cli._approval_state is not None
|
||||
assert cli._approval_state["show_full"] is True
|
||||
assert "view" not in cli._approval_state["choices"]
|
||||
assert cli._approval_state["selected"] == 3
|
||||
assert response_queue.empty()
|
||||
|
||||
|
||||
def test_clarify_display_numbers_choices_and_other():
|
||||
cli = _make_cli_stub()
|
||||
cli._clarify_state = {
|
||||
"question": "Pick the best option",
|
||||
"choices": ["Alpha", "Beta", "Gamma", "Delta"],
|
||||
"selected": 1,
|
||||
"response_queue": queue.Queue(),
|
||||
}
|
||||
|
||||
rendered = "".join(text for _style, text in cli._get_clarify_display_fragments())
|
||||
|
||||
assert "1. Alpha" in rendered
|
||||
assert "❯ 2. Beta" in rendered
|
||||
assert "3. Gamma" in rendered
|
||||
assert "4. Delta" in rendered
|
||||
assert "5. Other (type your answer)" in rendered
|
||||
|
||||
|
||||
def test_clarify_number_shortcut_submits_choice():
|
||||
cli = _make_cli_stub()
|
||||
response_queue = queue.Queue()
|
||||
cli._clarify_state = {
|
||||
"question": "Pick the best option",
|
||||
"choices": ["Alpha", "Beta", "Gamma"],
|
||||
"selected": 0,
|
||||
"response_queue": response_queue,
|
||||
}
|
||||
|
||||
assert cli._handle_clarify_number_shortcut(3) is True
|
||||
assert response_queue.get_nowait() == "Gamma"
|
||||
assert cli._clarify_state is None
|
||||
assert cli._clarify_freetext is False
|
||||
|
||||
|
||||
def test_clarify_selection_still_submits_selected_choice():
|
||||
cli = _make_cli_stub()
|
||||
response_queue = queue.Queue()
|
||||
cli._clarify_state = {
|
||||
"question": "Pick the best option",
|
||||
"choices": ["Alpha", "Beta", "Gamma"],
|
||||
"selected": 1,
|
||||
"response_queue": response_queue,
|
||||
}
|
||||
|
||||
cli._handle_clarify_selection()
|
||||
|
||||
assert response_queue.get_nowait() == "Beta"
|
||||
assert cli._clarify_state is None
|
||||
assert cli._clarify_freetext is False
|
||||
|
||||
|
||||
def test_clarify_number_shortcut_activates_other_freetext():
|
||||
cli = _make_cli_stub()
|
||||
response_queue = queue.Queue()
|
||||
cli._clarify_state = {
|
||||
"question": "Pick the best option",
|
||||
"choices": ["Alpha", "Beta", "Gamma"],
|
||||
"selected": 0,
|
||||
"response_queue": response_queue,
|
||||
}
|
||||
|
||||
assert cli._handle_clarify_number_shortcut(4) is True
|
||||
assert cli._clarify_state is not None
|
||||
assert cli._clarify_state["selected"] == 3
|
||||
assert cli._clarify_freetext is True
|
||||
assert response_queue.empty()
|
||||
@@ -1037,138 +1037,6 @@ class TestBuildAssistantMessage:
|
||||
result = agent._build_assistant_message(msg, "tool_calls")
|
||||
assert "extra_content" not in result["tool_calls"][0]
|
||||
|
||||
def test_tool_call_arguments_normalized_from_gemma4_whitespace(self, agent):
|
||||
tc = _mock_tool_call(
|
||||
name="read_file",
|
||||
arguments=' \n {"path": "README.md"} \n ',
|
||||
call_id="c4",
|
||||
)
|
||||
msg = _mock_assistant_msg(content="", tool_calls=[tc])
|
||||
result = agent._build_assistant_message(msg, "tool_calls")
|
||||
assert result["tool_calls"][0]["function"]["arguments"] == '{"path":"README.md"}'
|
||||
|
||||
def test_tool_call_arguments_normalized_from_single_quotes_and_trailing_comma(self, agent):
|
||||
tc = _mock_tool_call(
|
||||
name="read_file",
|
||||
arguments="{'path': 'README.md',}",
|
||||
call_id="c5",
|
||||
)
|
||||
msg = _mock_assistant_msg(content="", tool_calls=[tc])
|
||||
result = agent._build_assistant_message(msg, "tool_calls")
|
||||
assert result["tool_calls"][0]["function"]["arguments"] == '{"path":"README.md"}'
|
||||
|
||||
|
||||
class TestNormalizeToolCallArguments:
|
||||
@pytest.mark.parametrize(
|
||||
("raw_args", "expected"),
|
||||
[
|
||||
('{"q":"test"}', '{"q":"test"}'),
|
||||
(' \n {"q": "test"} \n ', '{"q":"test"}'),
|
||||
('{"q": "test",}', '{"q":"test"}'),
|
||||
("{'q': 'test'}", '{"q":"test"}'),
|
||||
("{'path': 'README.md', 'mode': 'read'}", '{"path":"README.md","mode":"read"}'),
|
||||
('"path": "README.md"', '{"path":"README.md"}'),
|
||||
('path: "README.md"', '{"path":"README.md"}'),
|
||||
('path: "README.md", mode: "read"', '{"path":"README.md","mode":"read"}'),
|
||||
({"path": "README.md"}, '{"path":"README.md"}'),
|
||||
(["README.md", "docs.md"], '["README.md","docs.md"]'),
|
||||
('\t\n ', '{}'),
|
||||
('{"nested": {"path": "README.md"}}', '{"nested":{"path":"README.md"}}'),
|
||||
],
|
||||
)
|
||||
def test_complete_args_are_normalized(self, raw_args, expected):
|
||||
normalized, is_complete = AIAgent._normalize_tool_call_arguments(raw_args)
|
||||
assert is_complete is True
|
||||
assert normalized == expected
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"raw_args",
|
||||
[
|
||||
'{"path": "README.md"',
|
||||
'{"a": 1, "b"',
|
||||
'{"path": [1, 2}',
|
||||
"{'path': 'README.md'",
|
||||
'path: "README.md", mode:',
|
||||
'{"command": "echo hello",',
|
||||
],
|
||||
)
|
||||
def test_incomplete_args_are_not_marked_complete(self, raw_args):
|
||||
normalized, is_complete = AIAgent._normalize_tool_call_arguments(raw_args)
|
||||
assert is_complete is False
|
||||
assert isinstance(normalized, str)
|
||||
assert normalized == raw_args.strip()
|
||||
|
||||
|
||||
class TestSanitizeApiMessages:
|
||||
def test_merges_consecutive_assistant_tool_call_messages(self):
|
||||
messages = [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "first",
|
||||
"tool_calls": [{"id": "c1", "type": "function", "function": {"name": "read_file", "arguments": '{"path":"a.py"}'}}],
|
||||
},
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "second",
|
||||
"tool_calls": [{"id": "c2", "type": "function", "function": {"name": "search_files", "arguments": '{"pattern":"TODO"}'}}],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "c1", "content": "a.py"},
|
||||
{"role": "tool", "tool_call_id": "c2", "content": "matches"},
|
||||
]
|
||||
|
||||
sanitized = AIAgent._sanitize_api_messages(messages)
|
||||
|
||||
assert len(sanitized) == 3
|
||||
assert sanitized[0]["role"] == "assistant"
|
||||
assert [tc["id"] for tc in sanitized[0]["tool_calls"]] == ["c1", "c2"]
|
||||
assert sanitized[0]["content"] == "first\nsecond"
|
||||
|
||||
def test_does_not_merge_assistant_tool_call_messages_across_non_assistant_boundary(self):
|
||||
messages = [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "",
|
||||
"tool_calls": [{"id": "c1", "type": "function", "function": {"name": "read_file", "arguments": '{"path":"a.py"}'}}],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "c1", "content": "a.py"},
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "",
|
||||
"tool_calls": [{"id": "c2", "type": "function", "function": {"name": "read_file", "arguments": '{"path":"b.py"}'}}],
|
||||
},
|
||||
{"role": "tool", "tool_call_id": "c2", "content": "b.py"},
|
||||
]
|
||||
|
||||
sanitized = AIAgent._sanitize_api_messages(messages)
|
||||
|
||||
assistant_msgs = [m for m in sanitized if m.get("role") == "assistant"]
|
||||
assert len(assistant_msgs) == 2
|
||||
assert assistant_msgs[0]["tool_calls"][0]["id"] == "c1"
|
||||
assert assistant_msgs[1]["tool_calls"][0]["id"] == "c2"
|
||||
|
||||
def test_merge_preserves_tool_call_order(self):
|
||||
messages = [
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "",
|
||||
"tool_calls": [{"id": "c1", "type": "function", "function": {"name": "read_file", "arguments": '{"path":"a.py"}'}}],
|
||||
},
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "",
|
||||
"tool_calls": [{"id": "c2", "type": "function", "function": {"name": "read_file", "arguments": '{"path":"b.py"}'}}],
|
||||
},
|
||||
{
|
||||
"role": "assistant",
|
||||
"content": "",
|
||||
"tool_calls": [{"id": "c3", "type": "function", "function": {"name": "read_file", "arguments": '{"path":"c.py"}'}}],
|
||||
},
|
||||
]
|
||||
|
||||
sanitized = AIAgent._sanitize_api_messages(messages)
|
||||
|
||||
assert [tc["id"] for tc in sanitized[0]["tool_calls"]] == ["c1", "c2", "c3"]
|
||||
|
||||
|
||||
class TestFormatToolsForSystemMessage:
|
||||
def test_no_tools_returns_empty_array(self, agent):
|
||||
@@ -3599,59 +3467,6 @@ class TestStreamingApiCall:
|
||||
assert tc[0].function.arguments == '{"path":"x.txt","content":"hel'
|
||||
assert resp.choices[0].finish_reason == "length"
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("raw_arguments", "expected"),
|
||||
[
|
||||
(' \n {"path": "x.txt"} \n ', '{"path":"x.txt"}'),
|
||||
("{'path': 'x.txt',}", '{"path":"x.txt"}'),
|
||||
('path: "x.txt", mode: "read"', '{"path":"x.txt","mode":"read"}'),
|
||||
],
|
||||
)
|
||||
def test_repairable_tool_call_args_do_not_upgrade_finish_reason_to_length(self, agent, raw_arguments, expected):
|
||||
chunks = [
|
||||
_make_chunk(tool_calls=[_make_tc_delta(0, "call_1", "read_file", raw_arguments)]),
|
||||
_make_chunk(finish_reason="tool_calls"),
|
||||
]
|
||||
agent.client.chat.completions.create.return_value = iter(chunks)
|
||||
|
||||
resp = agent._interruptible_streaming_api_call({"messages": []})
|
||||
|
||||
tc = resp.choices[0].message.tool_calls
|
||||
assert len(tc) == 1
|
||||
assert tc[0].function.name == "read_file"
|
||||
assert tc[0].function.arguments == expected
|
||||
assert resp.choices[0].finish_reason == "tool_calls"
|
||||
|
||||
def test_streamed_tool_call_args_single_quotes_across_chunks_normalized(self, agent):
|
||||
chunks = [
|
||||
_make_chunk(tool_calls=[_make_tc_delta(0, "call_1", "read_file", "{'path':")]),
|
||||
_make_chunk(tool_calls=[_make_tc_delta(0, None, None, " 'x.txt',}")]),
|
||||
_make_chunk(finish_reason="tool_calls"),
|
||||
]
|
||||
agent.client.chat.completions.create.return_value = iter(chunks)
|
||||
|
||||
resp = agent._interruptible_streaming_api_call({"messages": []})
|
||||
|
||||
tc = resp.choices[0].message.tool_calls
|
||||
assert len(tc) == 1
|
||||
assert tc[0].function.arguments == '{"path":"x.txt"}'
|
||||
assert resp.choices[0].finish_reason == "tool_calls"
|
||||
|
||||
def test_streamed_split_json_chunks_still_reassemble(self, agent):
|
||||
chunks = [
|
||||
_make_chunk(tool_calls=[_make_tc_delta(0, "call_1", "read_file", '{"path":')]),
|
||||
_make_chunk(tool_calls=[_make_tc_delta(0, None, None, ' "x.txt"}')]),
|
||||
_make_chunk(finish_reason="tool_calls"),
|
||||
]
|
||||
agent.client.chat.completions.create.return_value = iter(chunks)
|
||||
|
||||
resp = agent._interruptible_streaming_api_call({"messages": []})
|
||||
|
||||
tc = resp.choices[0].message.tool_calls
|
||||
assert len(tc) == 1
|
||||
assert tc[0].function.arguments == '{"path":"x.txt"}'
|
||||
assert resp.choices[0].finish_reason == "tool_calls"
|
||||
|
||||
def test_ollama_reused_index_separate_tool_calls(self, agent):
|
||||
"""Ollama sends every tool call at index 0 with different ids.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user