feat: implement interactive prompts for sudo password and command approval in CLI

- Added methods for handling sudo password and dangerous command approval prompts using a callback mechanism in cli.py.
- Integrated these prompts with the prompt_toolkit UI for improved user experience.
- Updated terminal_tool.py to support callback registration for interactive prompts, enhancing the CLI's interactivity.
- Introduced a background thread for API calls in run_agent.py to allow for interrupt handling during long-running operations.
- Enhanced error handling for interrupted API calls, ensuring graceful degradation of user experience.
This commit is contained in:
teknium1
2026-02-21 12:15:40 -08:00
parent ecb430effe
commit c98ee98525
3 changed files with 409 additions and 39 deletions

357
cli.py
View File

@@ -281,6 +281,7 @@ from cron import create_job, list_jobs, remove_job, get_job, run_daemon as run_c
# Resource cleanup imports for safe shutdown (terminal VMs, browser sessions)
from tools.terminal_tool import cleanup_all_environments as _cleanup_all_terminals
from tools.terminal_tool import set_sudo_password_callback, set_approval_callback
from tools.browser_tool import _emergency_cleanup_all_sessions as _cleanup_all_browsers
# Guard to prevent cleanup from running multiple times on exit
@@ -1582,6 +1583,100 @@ class HermesCLI:
"Use your best judgement to make the choice and proceed."
)
def _sudo_password_callback(self) -> str:
"""
Prompt for sudo password through the prompt_toolkit UI.
Called from the agent thread when a sudo command is encountered.
Uses the same clarify-style mechanism: sets UI state, waits on a
queue for the user's response via the Enter key binding.
"""
import time as _time
timeout = 45
response_queue = queue.Queue()
self._sudo_state = {
"response_queue": response_queue,
}
self._sudo_deadline = _time.monotonic() + timeout
if hasattr(self, '_app') and self._app:
self._app.invalidate()
while True:
try:
result = response_queue.get(timeout=1)
self._sudo_state = None
self._sudo_deadline = 0
if hasattr(self, '_app') and self._app:
self._app.invalidate()
if result:
_cprint(f"\n{_DIM} ✓ Password received (cached for session){_RST}")
else:
_cprint(f"\n{_DIM} ⏭ Skipped{_RST}")
return result
except queue.Empty:
remaining = self._sudo_deadline - _time.monotonic()
if remaining <= 0:
break
if hasattr(self, '_app') and self._app:
self._app.invalidate()
self._sudo_state = None
self._sudo_deadline = 0
if hasattr(self, '_app') and self._app:
self._app.invalidate()
_cprint(f"\n{_DIM} ⏱ Timeout — continuing without sudo{_RST}")
return ""
def _approval_callback(self, command: str, description: str) -> str:
"""
Prompt for dangerous command approval through the prompt_toolkit UI.
Called from the agent thread. Shows a selection UI similar to clarify
with choices: once / session / always / deny.
"""
import time as _time
timeout = 60
response_queue = queue.Queue()
choices = ["once", "session", "always", "deny"]
self._approval_state = {
"command": command,
"description": description,
"choices": choices,
"selected": 0,
"response_queue": response_queue,
}
self._approval_deadline = _time.monotonic() + timeout
if hasattr(self, '_app') and self._app:
self._app.invalidate()
while True:
try:
result = response_queue.get(timeout=1)
self._approval_state = None
self._approval_deadline = 0
if hasattr(self, '_app') and self._app:
self._app.invalidate()
return result
except queue.Empty:
remaining = self._approval_deadline - _time.monotonic()
if remaining <= 0:
break
if hasattr(self, '_app') and self._app:
self._app.invalidate()
self._approval_state = None
self._approval_deadline = 0
if hasattr(self, '_app') and self._app:
self._app.invalidate()
_cprint(f"\n{_DIM} ⏱ Timeout — denying command{_RST}")
return "deny"
def chat(self, message: str) -> Optional[str]:
"""
Send a message to the agent and get a response.
@@ -1724,6 +1819,18 @@ class HermesCLI:
self._clarify_state = None # dict with question, choices, selected, response_queue
self._clarify_freetext = False # True when user chose "Other" and is typing
self._clarify_deadline = 0 # monotonic timestamp when the clarify times out
# Sudo password prompt state (similar mechanism to clarify)
self._sudo_state = None # dict with response_queue when active
self._sudo_deadline = 0
# Dangerous command approval state (similar mechanism to clarify)
self._approval_state = None # dict with command, description, choices, selected, response_queue
self._approval_deadline = 0
# Register callbacks so terminal_tool prompts route through our UI
set_sudo_password_callback(self._sudo_password_callback)
set_approval_callback(self._approval_callback)
# Key bindings for the input area
kb = KeyBindings()
@@ -1732,7 +1839,9 @@ class HermesCLI:
def handle_enter(event):
"""Handle Enter key - submit input.
Routes to the correct queue based on agent state:
Routes to the correct queue based on active UI state:
- Sudo password prompt: password goes to sudo response queue
- Approval selection: selected choice goes to approval response queue
- Clarify freetext mode: answer goes to the clarify response queue
- Clarify choice mode: selected choice goes to the clarify response queue
- Agent running: goes to _interrupt_queue (chat() monitors this)
@@ -1740,6 +1849,26 @@ class HermesCLI:
Commands (starting with /) always go to _pending_input so they're
handled as commands, not sent as interrupt text to the agent.
"""
# --- Sudo password prompt: submit the typed password ---
if self._sudo_state:
text = event.app.current_buffer.text
self._sudo_state["response_queue"].put(text)
self._sudo_state = None
event.app.current_buffer.reset()
event.app.invalidate()
return
# --- Approval selection: confirm the highlighted choice ---
if self._approval_state:
state = self._approval_state
selected = state["selected"]
choices = state["choices"]
if 0 <= selected < len(choices):
state["response_queue"].put(choices[selected])
self._approval_state = None
event.app.invalidate()
return
# --- Clarify freetext mode: user typed their own answer ---
if self._clarify_freetext and self._clarify_state:
text = event.app.current_buffer.text.strip()
@@ -1802,31 +1931,71 @@ class HermesCLI:
max_idx = len(choices) # last index is the "Other" option
self._clarify_state["selected"] = min(max_idx, self._clarify_state["selected"] + 1)
event.app.invalidate()
# --- Dangerous command approval: arrow-key navigation ---
@kb.add('up', filter=Condition(lambda: bool(self._approval_state)))
def approval_up(event):
if self._approval_state:
self._approval_state["selected"] = max(0, self._approval_state["selected"] - 1)
event.app.invalidate()
@kb.add('down', filter=Condition(lambda: bool(self._approval_state)))
def approval_down(event):
if self._approval_state:
max_idx = len(self._approval_state["choices"]) - 1
self._approval_state["selected"] = min(max_idx, self._approval_state["selected"] + 1)
event.app.invalidate()
@kb.add('c-c')
def handle_ctrl_c(event):
"""Handle Ctrl+C - interrupt agent or force exit on double press.
"""Handle Ctrl+C - cancel interactive prompts, interrupt agent, or exit.
First Ctrl+C: interrupt the running agent gracefully.
Second Ctrl+C within 2 seconds (or when agent is idle): force exit.
Priority:
1. Cancel active sudo/approval/clarify prompt
2. Interrupt the running agent (first press)
3. Force exit (second press within 2s, or when idle)
"""
import time as _time
now = _time.time()
# Cancel sudo prompt
if self._sudo_state:
self._sudo_state["response_queue"].put("")
self._sudo_state = None
event.app.current_buffer.reset()
event.app.invalidate()
return
# Cancel approval prompt (deny)
if self._approval_state:
self._approval_state["response_queue"].put("deny")
self._approval_state = None
event.app.invalidate()
return
# Cancel clarify prompt
if self._clarify_state:
self._clarify_state["response_queue"].put(
"The user cancelled. Use your best judgement to proceed."
)
self._clarify_state = None
self._clarify_freetext = False
event.app.current_buffer.reset()
event.app.invalidate()
return
if self._agent_running and self.agent:
# Check for double Ctrl+C (second press within 2 seconds)
if now - self._last_ctrl_c_time < 2.0:
print("\n⚡ Force exiting...")
self._should_exit = True
event.app.exit()
return
# First Ctrl+C: try graceful interrupt
self._last_ctrl_c_time = now
print("\n⚡ Interrupting agent... (press Ctrl+C again to force exit)")
self.agent.interrupt()
else:
# Agent not running, exit immediately
self._should_exit = True
event.app.exit()
@@ -1841,6 +2010,10 @@ class HermesCLI:
cli_ref = self
def get_prompt():
if cli_ref._sudo_state:
return [('class:sudo-prompt', '🔐 ')]
if cli_ref._approval_state:
return [('class:prompt-working', ' ')]
if cli_ref._clarify_freetext:
return [('class:clarify-selected', ' ')]
if cli_ref._clarify_state:
@@ -1861,14 +2034,23 @@ class HermesCLI:
complete_while_typing=True,
)
# Dynamic height: return the exact line count so the TextArea is
# always exactly as tall as its content -- no extra blank space.
# The bottom rule sits directly below the last line of text and
# pushes down only when the user adds a newline.
# Dynamic height: accounts for both explicit newlines AND visual
# wrapping of long lines so the input area always fits its content.
# The prompt characters (" " etc.) consume ~4 columns.
def _input_height():
try:
lines = input_area.buffer.document.line_count
return min(max(lines, 1), 8)
doc = input_area.buffer.document
available_width = (cli_ref.console.width or 80) - 4 # subtract prompt width
if available_width < 10:
available_width = 40
visual_lines = 0
for line in doc.lines:
# Each logical line takes at least 1 visual row; long lines wrap
if len(line) == 0:
visual_lines += 1
else:
visual_lines += max(1, -(-len(line) // available_width)) # ceil division
return min(max(visual_lines, 1), 8)
except Exception:
return 1
@@ -1895,15 +2077,29 @@ class HermesCLI:
input_area.buffer.on_text_changed += _on_text_changed
# Hint line above input: shows placeholder when agent is working
# and the user hasn't typed anything yet. Disappears when idle
# or when the user starts typing.
# Hint line above input: context-sensitive instructions for the
# current UI state (sudo prompt, approval, clarify, interrupt).
def get_hint_text():
if not cli_ref._agent_running:
return []
# When clarify is active, show a different hint with countdown
import time as _time
# Sudo password prompt
if cli_ref._sudo_state:
remaining = max(0, int(cli_ref._sudo_deadline - _time.monotonic()))
return [
('class:hint', ' type password (hidden) and press Enter, or Enter to skip'),
('class:clarify-countdown', f' ({remaining}s)'),
]
# Dangerous command approval
if cli_ref._approval_state:
remaining = max(0, int(cli_ref._approval_deadline - _time.monotonic()))
return [
('class:hint', ' ↑/↓ to select, Enter to confirm'),
('class:clarify-countdown', f' ({remaining}s)'),
]
# Clarify question
if cli_ref._clarify_state:
import time as _time
remaining = max(0, int(cli_ref._clarify_deadline - _time.monotonic()))
countdown = f' ({remaining}s)' if cli_ref._clarify_deadline else ''
if cli_ref._clarify_freetext:
@@ -1915,13 +2111,18 @@ class HermesCLI:
('class:hint', ' ↑/↓ to select, Enter to confirm'),
('class:clarify-countdown', countdown),
]
if not cli_ref._agent_running:
return []
# Agent is running — show interrupt hint only when buffer is empty
buf = input_area.buffer
if buf.text:
return []
return [('class:hint', ' type here to interrupt')]
return [('class:hint', ' press Enter to send interrupt')]
return [('class:hint', ' type a message + Enter to interrupt, or Ctrl+C to cancel')]
def get_hint_height():
if cli_ref._clarify_state:
if cli_ref._sudo_state or cli_ref._approval_state or cli_ref._clarify_state:
return 1
return 1 if cli_ref._agent_running else 0
@@ -1987,7 +2188,84 @@ class HermesCLI:
),
filter=Condition(lambda: cli_ref._clarify_state is not None),
)
# --- Sudo password: display widget ---
def _get_sudo_display():
state = cli_ref._sudo_state
if not state:
return []
lines = []
lines.append(('class:sudo-border', '╭─ '))
lines.append(('class:sudo-title', '🔐 Sudo Password Required'))
lines.append(('class:sudo-border', ' ──────────────────────────╮\n'))
lines.append(('class:sudo-border', '\n'))
lines.append(('class:sudo-border', ''))
lines.append(('class:sudo-text', 'Enter password below (hidden), or press Enter to skip'))
lines.append(('', '\n'))
lines.append(('class:sudo-border', '\n'))
lines.append(('class:sudo-border', '╰──────────────────────────────────────────────────╯\n'))
return lines
sudo_widget = ConditionalContainer(
Window(
FormattedTextControl(_get_sudo_display),
wrap_lines=True,
),
filter=Condition(lambda: cli_ref._sudo_state is not None),
)
# --- Dangerous command approval: display widget ---
def _get_approval_display():
state = cli_ref._approval_state
if not state:
return []
command = state["command"]
description = state["description"]
choices = state["choices"]
selected = state.get("selected", 0)
cmd_display = command[:70] + '...' if len(command) > 70 else command
choice_labels = {
"once": "Allow once",
"session": "Allow for this session",
"always": "Add to permanent allowlist",
"deny": "Deny",
}
lines = []
lines.append(('class:approval-border', '╭─ '))
lines.append(('class:approval-title', '⚠️ Dangerous Command'))
lines.append(('class:approval-border', ' ───────────────────────────────╮\n'))
lines.append(('class:approval-border', '\n'))
lines.append(('class:approval-border', ''))
lines.append(('class:approval-desc', description))
lines.append(('', '\n'))
lines.append(('class:approval-border', ''))
lines.append(('class:approval-cmd', cmd_display))
lines.append(('', '\n'))
lines.append(('class:approval-border', '\n'))
for i, choice in enumerate(choices):
lines.append(('class:approval-border', ''))
label = choice_labels.get(choice, choice)
if i == selected:
lines.append(('class:approval-selected', f' {label}'))
else:
lines.append(('class:approval-choice', f' {label}'))
lines.append(('', '\n'))
lines.append(('class:approval-border', '\n'))
lines.append(('class:approval-border', '╰──────────────────────────────────────────────────────╯\n'))
return lines
approval_widget = ConditionalContainer(
Window(
FormattedTextControl(_get_approval_display),
wrap_lines=True,
),
filter=Condition(lambda: cli_ref._approval_state is not None),
)
# Horizontal rules above and below the input (bronze, 1 line each).
# The bottom rule moves down as the TextArea grows with newlines.
input_rule_top = Window(
@@ -1999,16 +2277,14 @@ class HermesCLI:
height=1,
)
# Layout: spacer + ruled input at bottom, completions below.
# Using inline CompletionsMenu (not a Float) so it reliably appears even
# after agent output has filled the terminal via patch_stdout. Float-based
# menus lose their rendering space in non-full-screen mode once scrollback
# pushes the app area to the very bottom of the terminal.
# The clarify_widget appears above the input area when the agent
# asks a multiple-choice or open-ended question.
# Layout: interactive prompt widgets + ruled input at bottom.
# The sudo, approval, and clarify widgets appear above the input when
# the corresponding interactive prompt is active.
layout = Layout(
HSplit([
Window(height=0),
sudo_widget,
approval_widget,
clarify_widget,
spacer,
input_rule_top,
@@ -2039,6 +2315,18 @@ class HermesCLI:
'clarify-selected': '#FFD700 bold',
'clarify-active-other': '#FFD700 italic',
'clarify-countdown': '#CD7F32',
# Sudo password panel
'sudo-prompt': '#FF6B6B bold',
'sudo-border': '#CD7F32',
'sudo-title': '#FF6B6B bold',
'sudo-text': '#FFF8DC',
# Dangerous command approval panel
'approval-border': '#CD7F32',
'approval-title': '#FF8C00 bold',
'approval-desc': '#FFF8DC bold',
'approval-cmd': '#AAAAAA italic',
'approval-choice': '#AAAAAA',
'approval-selected': '#FFD700 bold',
})
# Create the application
@@ -2126,6 +2414,9 @@ class HermesCLI:
pass
finally:
self._should_exit = True
# Unregister terminal_tool callbacks to avoid dangling references
set_sudo_password_callback(None)
set_approval_callback(None)
# Close session in SQLite
if hasattr(self, '_session_db') and self._session_db and self.agent:
try:

View File

@@ -2285,6 +2285,35 @@ class AIAgent:
if self._memory_store:
self._memory_store.load_from_disk()
def _interruptible_api_call(self, api_kwargs: dict):
"""
Run the API call in a background thread so the main conversation loop
can detect interrupts without waiting for the full HTTP round-trip.
Returns the API response, or raises InterruptedError if the agent was
interrupted while waiting.
"""
result = {"response": None, "error": None}
def _call():
try:
result["response"] = self.client.chat.completions.create(**api_kwargs)
except Exception as e:
result["error"] = e
t = threading.Thread(target=_call, daemon=True)
t.start()
# Poll every 0.3s so interrupts are noticed quickly
while t.is_alive():
t.join(timeout=0.3)
if self._interrupt_requested:
# Can't cancel the HTTP request cleanly, but we can stop
# waiting and let the thread finish in the background.
raise InterruptedError("Agent interrupted during API call")
if result["error"] is not None:
raise result["error"]
return result["response"]
def _build_api_kwargs(self, api_messages: list) -> dict:
"""Build the keyword arguments dict for the chat completions API call."""
provider_preferences = {}
@@ -2778,7 +2807,7 @@ class AIAgent:
if os.getenv("HERMES_DUMP_REQUESTS", "").strip().lower() in {"1", "true", "yes", "on"}:
self._dump_api_request_debug(api_kwargs, reason="preflight")
response = self.client.chat.completions.create(**api_kwargs)
response = self._interruptible_api_call(api_kwargs)
api_duration = time.time() - api_start_time
@@ -2935,6 +2964,16 @@ class AIAgent:
break # Success, exit retry loop
except InterruptedError:
if thinking_spinner:
thinking_spinner.stop("")
thinking_spinner = None
print(f"{self.log_prefix}⚡ Interrupted during API call.")
self._flush_messages_to_session_db(messages, conversation_history)
interrupted = True
final_response = "Operation interrupted."
break
except Exception as api_error:
# Stop spinner before printing error messages
if thinking_spinner:
@@ -3053,6 +3092,10 @@ class AIAgent:
}
time.sleep(0.2) # Check interrupt every 200ms
# If the API call was interrupted, skip response processing
if interrupted:
break
try:
assistant_message = response.choices[0].message

View File

@@ -231,6 +231,26 @@ def _check_disk_usage_warning():
# Session-cached sudo password (persists until CLI exits)
_cached_sudo_password: str = ""
# Optional UI callbacks for interactive prompts. When set, these are called
# instead of the default /dev/tty or input() readers. The CLI registers these
# so prompts route through prompt_toolkit's event loop.
# _sudo_password_callback() -> str (return password or "" to skip)
# _approval_callback(command, description) -> str ("once"/"session"/"always"/"deny")
_sudo_password_callback = None
_approval_callback = None
def set_sudo_password_callback(cb):
"""Register a callback for sudo password prompts (used by CLI)."""
global _sudo_password_callback
_sudo_password_callback = cb
def set_approval_callback(cb):
"""Register a callback for dangerous command approval prompts (used by CLI)."""
global _approval_callback
_approval_callback = cb
# =============================================================================
# Dangerous Command Approval System
# =============================================================================
@@ -319,16 +339,26 @@ def _prompt_dangerous_approval(command: str, description: str, timeout_seconds:
"""
Prompt user to approve a dangerous command (CLI only).
If an _approval_callback is registered (by the CLI), delegates to it so the
prompt integrates with prompt_toolkit's UI. Otherwise falls back to the
raw input() approach (works outside the TUI, e.g. tests).
Returns: 'once', 'session', 'always', or 'deny'
"""
import sys
import threading
# Use the registered callback when available (prompt_toolkit-compatible)
if _approval_callback is not None:
try:
return _approval_callback(command, description)
except Exception:
return "deny"
# Pause spinner if one is running
os.environ["HERMES_SPINNER_PAUSE"] = "1"
try:
# Use simple ASCII art for compatibility (no ANSI color codes)
print()
print(f" ⚠️ DANGEROUS COMMAND: {description}")
print(f" {command[:80]}{'...' if len(command) > 80 else ''}")
@@ -484,12 +514,20 @@ def _prompt_for_sudo_password(timeout_seconds: int = 45) -> str:
- Any error occurs
Only works in interactive mode (HERMES_INTERACTIVE=1).
Reads directly from /dev/tty with echo disabled to avoid conflicts
with prompt_toolkit's patch_stdout / Application input handling.
If a _sudo_password_callback is registered (by the CLI), delegates to it
so the prompt integrates with prompt_toolkit's UI. Otherwise reads
directly from /dev/tty with echo disabled.
"""
import sys
import time as time_module
# Use the registered callback when available (prompt_toolkit-compatible)
if _sudo_password_callback is not None:
try:
return _sudo_password_callback() or ""
except Exception:
return ""
result = {"password": None, "done": False}
def read_password_thread():
@@ -500,11 +538,9 @@ def _prompt_for_sudo_password(timeout_seconds: int = 45) -> str:
import termios
tty_fd = os.open("/dev/tty", os.O_RDONLY)
old_attrs = termios.tcgetattr(tty_fd)
# Disable echo (ECHO) but keep canonical mode (ICANON) for line buffering
new_attrs = termios.tcgetattr(tty_fd)
new_attrs[3] = new_attrs[3] & ~termios.ECHO
termios.tcsetattr(tty_fd, termios.TCSAFLUSH, new_attrs)
# Read one line (up to newline)
chars = []
while True:
b = os.read(tty_fd, 1)