Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
a23f54d15f fix: add numbered approval and clarify shortcuts (#956)
All checks were successful
Lint / lint (pull_request) Successful in 21s
2026-04-22 10:44:09 -04:00
Alexander Whitestone
ffb3257cb5 fix: repair auxiliary client syntax corruption 2026-04-22 10:44:00 -04:00
6 changed files with 339 additions and 241 deletions

View File

@@ -1,4 +1,4 @@
from agent.telemetry_logger import log_token_usage\n"""Shared auxiliary client router for side tasks.
"""Shared auxiliary client router for side tasks.
Provides a single resolution chain so every consumer (context compression,
session search, web extraction, vision analysis, browser vision) picks up
@@ -34,6 +34,8 @@ Payment / credit exhaustion fallback:
their OpenRouter balance but has Codex OAuth or another provider available.
"""
from agent.telemetry_logger import log_token_usage
import json
import logging
import os
@@ -396,7 +398,8 @@ class _CodexCompletionsAdapter:
prompt_tokens=getattr(resp_usage, "input_tokens", 0),
completion_tokens=getattr(resp_usage, "output_tokens", 0),
total_tokens=getattr(resp_usage, "total_tokens", 0),
)\n log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
)
log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
except Exception as exc:
logger.debug("Codex auxiliary Responses API call failed: %s", exc)
raise
@@ -529,7 +532,8 @@ class _AnthropicCompletionsAdapter:
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
)\n log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
)
log_token_usage(usage.prompt_tokens, usage.completion_tokens, model)
choice = SimpleNamespace(
index=0,

240
cli.py
View File

@@ -7254,6 +7254,40 @@ class HermesCLI:
"Use your best judgement to make the choice and proceed."
)
def _handle_clarify_selection(self) -> None:
"""Process the currently selected clarify choice."""
state = self._clarify_state
if not state or self._clarify_freetext:
return
selected = state.get("selected", 0)
choices = state.get("choices") or []
if selected < len(choices):
state["response_queue"].put(choices[selected])
self._clarify_state = None
self._clarify_freetext = False
self._invalidate()
return
if selected == len(choices):
self._clarify_freetext = True
self._invalidate()
def _handle_clarify_number_shortcut(self, number: int) -> bool:
"""Select a clarify option by number key."""
state = self._clarify_state
if not state or self._clarify_freetext:
return False
choices = state.get("choices") or []
max_option = len(choices) + 1
if number < 1 or number > max_option:
return False
state["selected"] = number - 1
self._handle_clarify_selection()
return True
def _sudo_password_callback(self) -> str:
"""
Prompt for sudo password through the prompt_toolkit UI.
@@ -7362,6 +7396,20 @@ class HermesCLI:
choices.append("view")
return choices
def _handle_approval_number_shortcut(self, number: int) -> bool:
"""Select an approval option by number key."""
state = self._approval_state
if not state:
return False
choices = state.get("choices") or []
if number < 1 or number > len(choices):
return False
state["selected"] = number - 1
self._handle_approval_selection()
return True
def _handle_approval_selection(self) -> None:
"""Process the currently selected dangerous-command approval choice."""
state = self._approval_state
@@ -7437,8 +7485,9 @@ class HermesCLI:
preview_lines.extend(_wrap_panel_text(cmd_display, 60))
for i, choice in enumerate(choices):
prefix = ' ' if i == selected else ' '
label = f"{i + 1}. {choice_labels.get(choice, choice)}"
preview_lines.extend(_wrap_panel_text(
f"{prefix}{choice_labels.get(choice, choice)}",
f"{prefix}{label}",
60,
subsequent_indent=" ",
))
@@ -7456,7 +7505,7 @@ class HermesCLI:
_append_panel_line(lines, 'class:approval-border', 'class:approval-cmd', wrapped, box_width)
_append_blank_panel_line(lines, 'class:approval-border', box_width)
for i, choice in enumerate(choices):
label = choice_labels.get(choice, choice)
label = f"{i + 1}. {choice_labels.get(choice, choice)}"
style = 'class:approval-selected' if i == selected else 'class:approval-choice'
prefix = ' ' if i == selected else ' '
for wrapped in _wrap_panel_text(f"{prefix}{label}", inner_text_width, subsequent_indent=" "):
@@ -7465,6 +7514,97 @@ class HermesCLI:
lines.append(('class:approval-border', '' + ('' * box_width) + '\n'))
return lines
def _get_clarify_display_fragments(self):
"""Render the clarify panel for the prompt_toolkit UI."""
state = self._clarify_state
if not state:
return []
def _panel_box_width(title: str, content_lines: list[str], min_width: int = 46, max_width: int = 76) -> int:
term_cols = shutil.get_terminal_size((100, 20)).columns
longest = max([len(title)] + [len(line) for line in content_lines] + [min_width - 4])
inner = min(max(longest + 4, min_width - 2), max_width - 2, max(24, term_cols - 6))
return inner + 2
def _wrap_panel_text(text: str, width: int, subsequent_indent: str = "") -> list[str]:
wrapped = textwrap.wrap(
text,
width=max(8, width),
break_long_words=False,
break_on_hyphens=False,
subsequent_indent=subsequent_indent,
)
return wrapped or [""]
def _append_panel_line(lines, border_style: str, content_style: str, text: str, box_width: int) -> None:
inner_width = max(0, box_width - 2)
lines.append((border_style, ""))
lines.append((content_style, text.ljust(inner_width)))
lines.append((border_style, "\n"))
def _append_blank_panel_line(lines, border_style: str, box_width: int) -> None:
lines.append((border_style, "" + (" " * box_width) + "\n"))
question = state["question"]
choices = state.get("choices") or []
selected = state.get("selected", 0)
preview_lines = _wrap_panel_text(question, 60)
for i, choice in enumerate(choices):
prefix = " " if i == selected and not self._clarify_freetext else " "
label = f"{i + 1}. {choice}"
preview_lines.extend(_wrap_panel_text(f"{prefix}{label}", 60, subsequent_indent=" "))
other_number = len(choices) + 1
other_label = (
f" {other_number}. Other (type below)" if self._clarify_freetext
else f" {other_number}. Other (type your answer)" if selected == len(choices)
else f" {other_number}. Other (type your answer)"
)
preview_lines.extend(_wrap_panel_text(other_label, 60, subsequent_indent=" "))
box_width = _panel_box_width("Hermes needs your input", preview_lines)
inner_text_width = max(8, box_width - 2)
lines = []
lines.append(('class:clarify-border', '╭─ '))
lines.append(('class:clarify-title', 'Hermes needs your input'))
lines.append(('class:clarify-border', ' ' + ('' * max(0, box_width - len("Hermes needs your input") - 3)) + '\n'))
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
for wrapped in _wrap_panel_text(question, inner_text_width):
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-question', wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
if self._clarify_freetext and not choices:
guidance = "Type your answer in the prompt below, then press Enter."
for wrapped in _wrap_panel_text(guidance, inner_text_width):
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-choice', wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
if choices:
for i, choice in enumerate(choices):
style = 'class:clarify-selected' if i == selected and not self._clarify_freetext else 'class:clarify-choice'
prefix = ' ' if i == selected and not self._clarify_freetext else ' '
label = f"{i + 1}. {choice}"
wrapped_lines = _wrap_panel_text(f"{prefix}{label}", inner_text_width, subsequent_indent=" ")
for wrapped in wrapped_lines:
_append_panel_line(lines, 'class:clarify-border', style, wrapped, box_width)
other_idx = len(choices)
if selected == other_idx and not self._clarify_freetext:
other_style = 'class:clarify-selected'
other_label = f' {other_number}. Other (type your answer)'
elif self._clarify_freetext:
other_style = 'class:clarify-active-other'
other_label = f' {other_number}. Other (type below)'
else:
other_style = 'class:clarify-choice'
other_label = f' {other_number}. Other (type your answer)'
for wrapped in _wrap_panel_text(other_label, inner_text_width, subsequent_indent=" "):
_append_panel_line(lines, 'class:clarify-border', other_style, wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
lines.append(('class:clarify-border', '' + ('' * box_width) + '\n'))
return lines
def _secret_capture_callback(self, var_name: str, prompt: str, metadata=None) -> dict:
return prompt_for_secret(self, var_name, prompt, metadata)
@@ -8371,17 +8511,8 @@ class HermesCLI:
# --- Clarify choice mode: confirm the highlighted selection ---
if self._clarify_state and not self._clarify_freetext:
state = self._clarify_state
selected = state["selected"]
choices = state.get("choices") or []
if selected < len(choices):
state["response_queue"].put(choices[selected])
self._clarify_state = None
event.app.invalidate()
else:
# "Other" selected → switch to freetext
self._clarify_freetext = True
event.app.invalidate()
self._handle_clarify_selection()
event.app.invalidate()
return
# --- Normal input routing ---
@@ -8501,6 +8632,19 @@ class HermesCLI:
self._approval_state["selected"] = min(max_idx, self._approval_state["selected"] + 1)
event.app.invalidate()
# --- Numbered shortcuts for clarify / approval modal prompts ---
for _digit in '123456789':
@kb.add(_digit, filter=Condition(lambda: bool(self._approval_state) or (bool(self._clarify_state) and not self._clarify_freetext)))
def _handle_modal_number(event, digit=_digit):
number = int(digit)
handled = False
if self._approval_state:
handled = self._handle_approval_number_shortcut(number)
elif self._clarify_state and not self._clarify_freetext:
handled = self._handle_clarify_number_shortcut(number)
if handled:
event.app.invalidate()
# --- /model picker: arrow-key navigation ---
@kb.add('up', filter=Condition(lambda: bool(self._model_picker_state)))
def model_picker_up(event):
@@ -8995,7 +9139,7 @@ class HermesCLI:
if cli_ref._approval_state:
remaining = max(0, int(cli_ref._approval_deadline - _time.monotonic()))
return [
('class:hint', ' ↑/↓ to select, Enter to confirm'),
('class:hint', ' 1-9 or ↑/↓ to select, Enter to confirm'),
('class:clarify-countdown', f' ({remaining}s)'),
]
@@ -9008,7 +9152,7 @@ class HermesCLI:
('class:clarify-countdown', countdown),
]
return [
('class:hint', ' ↑/↓ to select, Enter to confirm'),
('class:hint', ' 1-9 or ↑/↓ to select, Enter to confirm'),
('class:clarify-countdown', countdown),
]
@@ -9086,71 +9230,7 @@ class HermesCLI:
lines.append((border_style, "" + (" " * box_width) + "\n"))
def _get_clarify_display():
"""Build styled text for the clarify question/choices panel."""
state = cli_ref._clarify_state
if not state:
return []
question = state["question"]
choices = state.get("choices") or []
selected = state.get("selected", 0)
preview_lines = _wrap_panel_text(question, 60)
for i, choice in enumerate(choices):
prefix = " " if i == selected and not cli_ref._clarify_freetext else " "
preview_lines.extend(_wrap_panel_text(f"{prefix}{choice}", 60, subsequent_indent=" "))
other_label = (
" Other (type below)" if cli_ref._clarify_freetext
else " Other (type your answer)" if selected == len(choices)
else " Other (type your answer)"
)
preview_lines.extend(_wrap_panel_text(other_label, 60, subsequent_indent=" "))
box_width = _panel_box_width("Hermes needs your input", preview_lines)
inner_text_width = max(8, box_width - 2)
lines = []
# Box top border
lines.append(('class:clarify-border', '╭─ '))
lines.append(('class:clarify-title', 'Hermes needs your input'))
lines.append(('class:clarify-border', ' ' + ('' * max(0, box_width - len("Hermes needs your input") - 3)) + '\n'))
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
# Question text
for wrapped in _wrap_panel_text(question, inner_text_width):
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-question', wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
if cli_ref._clarify_freetext and not choices:
guidance = "Type your answer in the prompt below, then press Enter."
for wrapped in _wrap_panel_text(guidance, inner_text_width):
_append_panel_line(lines, 'class:clarify-border', 'class:clarify-choice', wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
if choices:
# Multiple-choice mode: show selectable options
for i, choice in enumerate(choices):
style = 'class:clarify-selected' if i == selected and not cli_ref._clarify_freetext else 'class:clarify-choice'
prefix = ' ' if i == selected and not cli_ref._clarify_freetext else ' '
wrapped_lines = _wrap_panel_text(f"{prefix}{choice}", inner_text_width, subsequent_indent=" ")
for wrapped in wrapped_lines:
_append_panel_line(lines, 'class:clarify-border', style, wrapped, box_width)
# "Other" option (5th line, only shown when choices exist)
other_idx = len(choices)
if selected == other_idx and not cli_ref._clarify_freetext:
other_style = 'class:clarify-selected'
other_label = ' Other (type your answer)'
elif cli_ref._clarify_freetext:
other_style = 'class:clarify-active-other'
other_label = ' Other (type below)'
else:
other_style = 'class:clarify-choice'
other_label = ' Other (type your answer)'
for wrapped in _wrap_panel_text(other_label, inner_text_width, subsequent_indent=" "):
_append_panel_line(lines, 'class:clarify-border', other_style, wrapped, box_width)
_append_blank_panel_line(lines, 'class:clarify-border', box_width)
lines.append(('class:clarify-border', '' + ('' * box_width) + '\n'))
return lines
return cli_ref._get_clarify_display_fragments()
clarify_widget = ConditionalContainer(
Window(

View File

@@ -211,43 +211,6 @@ class PluginContext:
}
logger.debug("Plugin %s registered CLI command: %s", self.manifest.name, name)
# -- memory provider registration ----------------------------------------
def register_memory_provider(self, provider) -> None:
"""Register a memory provider supplied by this plugin.
The provider must be an instance of ``agent.memory_provider.MemoryProvider``.
Only one plugin-registered memory provider is accepted; a second
attempt is rejected with a warning.
The registered provider is retrievable via
``get_plugin_memory_provider()`` and is picked up by ``run_agent.py``
when ``memory.provider`` in *config.yaml* matches the provider's
``name`` property.
"""
from agent.memory_provider import MemoryProvider
if not isinstance(provider, MemoryProvider):
logger.warning(
"Plugin '%s' tried to register a memory provider that does not "
"inherit from MemoryProvider. Ignoring.",
self.manifest.name,
)
return
if self._manager._plugin_memory_provider is not None:
logger.warning(
"Plugin '%s' tried to register a memory provider, but one is "
"already registered by another plugin. Only one plugin-supplied "
"memory provider is allowed at a time.",
self.manifest.name,
)
return
self._manager._plugin_memory_provider = provider
logger.info(
"Plugin '%s' registered memory provider: %s",
self.manifest.name, provider.name,
)
# -- context engine registration -----------------------------------------
def register_context_engine(self, engine) -> None:
@@ -360,7 +323,6 @@ class PluginManager:
self._plugin_tool_names: Set[str] = set()
self._cli_commands: Dict[str, dict] = {}
self._context_engine = None # Set by a plugin via register_context_engine()
self._plugin_memory_provider = None # Set by a plugin via register_memory_provider()
self._discovered: bool = False
self._cli_ref = None # Set by CLI after plugin discovery
# Plugin skill registry: qualified name → metadata dict.
@@ -737,11 +699,6 @@ def get_plugin_context_engine():
return get_plugin_manager()._context_engine
def get_plugin_memory_provider():
"""Return the plugin-registered memory provider, or None."""
return get_plugin_manager()._plugin_memory_provider
def get_plugin_toolsets() -> List[tuple]:
"""Return plugin toolsets as ``(key, label, description)`` tuples.

View File

@@ -1193,18 +1193,6 @@ class AIAgent:
from plugins.memory import load_memory_provider as _load_mem
self._memory_manager = _MemoryManager()
_mp = _load_mem(_mem_provider_name)
# Fall back to a user plugin that called register_memory_provider()
if _mp is None:
try:
from hermes_cli.plugins import get_plugin_memory_provider as _gpm
_candidate = _gpm()
if _candidate and _candidate.name == _mem_provider_name:
_mp = _candidate
except Exception as _gpm_err:
logger.debug(
"get_plugin_memory_provider() failed during fallback lookup: %s",
_gpm_err,
)
if _mp and _mp.is_available():
self._memory_manager.add_provider(_mp)
if self._memory_manager.providers:

View File

@@ -0,0 +1,172 @@
import queue
import threading
from types import SimpleNamespace
from unittest.mock import MagicMock
from cli import HermesCLI
class _FakeBuffer:
def __init__(self, text="", cursor_position=None):
self.text = text
self.cursor_position = len(text) if cursor_position is None else cursor_position
def reset(self, append_to_history=False):
self.text = ""
self.cursor_position = 0
def _make_cli_stub():
cli = HermesCLI.__new__(HermesCLI)
cli._approval_state = None
cli._approval_deadline = 0
cli._approval_lock = threading.Lock()
cli._clarify_state = None
cli._clarify_freetext = False
cli._clarify_deadline = 0
cli._sudo_state = None
cli._sudo_deadline = 0
cli._secret_state = None
cli._secret_deadline = 0
cli._modal_input_snapshot = None
cli._invalidate = MagicMock()
cli._app = SimpleNamespace(invalidate=MagicMock(), current_buffer=_FakeBuffer())
return cli
def test_approval_display_numbers_choices():
cli = _make_cli_stub()
cli._approval_state = {
"command": "sudo rm -rf /tmp/example",
"description": "dangerous command",
"choices": ["once", "session", "always", "deny"],
"selected": 0,
"response_queue": queue.Queue(),
}
rendered = "".join(text for _style, text in cli._get_approval_display_fragments())
assert " 1. Allow once" in rendered
assert "2. Allow for this session" in rendered
assert "3. Add to permanent allowlist" in rendered
assert "4. Deny" in rendered
def test_approval_number_shortcut_submits_choice():
cli = _make_cli_stub()
response_queue = queue.Queue()
cli._approval_state = {
"command": "sudo rm -rf /tmp/example",
"description": "dangerous command",
"choices": ["once", "session", "always", "deny"],
"selected": 0,
"response_queue": response_queue,
}
assert cli._handle_approval_number_shortcut(2) is True
assert response_queue.get_nowait() == "session"
assert cli._approval_state is None
def test_approval_selection_still_submits_selected_choice():
cli = _make_cli_stub()
response_queue = queue.Queue()
cli._approval_state = {
"command": "sudo rm -rf /tmp/example",
"description": "dangerous command",
"choices": ["once", "session", "always", "deny"],
"selected": 1,
"response_queue": response_queue,
}
cli._handle_approval_selection()
assert response_queue.get_nowait() == "session"
assert cli._approval_state is None
def test_approval_number_shortcut_handles_view_in_place():
cli = _make_cli_stub()
response_queue = queue.Queue()
cli._approval_state = {
"command": "sudo dd if=/tmp/in of=/usr/share/keyrings/githubcli-archive-keyring.gpg bs=4M status=progress",
"description": "disk copy",
"choices": ["once", "session", "always", "deny", "view"],
"selected": 0,
"response_queue": response_queue,
}
assert cli._handle_approval_number_shortcut(5) is True
assert cli._approval_state is not None
assert cli._approval_state["show_full"] is True
assert "view" not in cli._approval_state["choices"]
assert cli._approval_state["selected"] == 3
assert response_queue.empty()
def test_clarify_display_numbers_choices_and_other():
cli = _make_cli_stub()
cli._clarify_state = {
"question": "Pick the best option",
"choices": ["Alpha", "Beta", "Gamma", "Delta"],
"selected": 1,
"response_queue": queue.Queue(),
}
rendered = "".join(text for _style, text in cli._get_clarify_display_fragments())
assert "1. Alpha" in rendered
assert " 2. Beta" in rendered
assert "3. Gamma" in rendered
assert "4. Delta" in rendered
assert "5. Other (type your answer)" in rendered
def test_clarify_number_shortcut_submits_choice():
cli = _make_cli_stub()
response_queue = queue.Queue()
cli._clarify_state = {
"question": "Pick the best option",
"choices": ["Alpha", "Beta", "Gamma"],
"selected": 0,
"response_queue": response_queue,
}
assert cli._handle_clarify_number_shortcut(3) is True
assert response_queue.get_nowait() == "Gamma"
assert cli._clarify_state is None
assert cli._clarify_freetext is False
def test_clarify_selection_still_submits_selected_choice():
cli = _make_cli_stub()
response_queue = queue.Queue()
cli._clarify_state = {
"question": "Pick the best option",
"choices": ["Alpha", "Beta", "Gamma"],
"selected": 1,
"response_queue": response_queue,
}
cli._handle_clarify_selection()
assert response_queue.get_nowait() == "Beta"
assert cli._clarify_state is None
assert cli._clarify_freetext is False
def test_clarify_number_shortcut_activates_other_freetext():
cli = _make_cli_stub()
response_queue = queue.Queue()
cli._clarify_state = {
"question": "Pick the best option",
"choices": ["Alpha", "Beta", "Gamma"],
"selected": 0,
"response_queue": response_queue,
}
assert cli._handle_clarify_number_shortcut(4) is True
assert cli._clarify_state is not None
assert cli._clarify_state["selected"] == 3
assert cli._clarify_freetext is True
assert response_queue.empty()

View File

@@ -19,7 +19,6 @@ from hermes_cli.plugins import (
PluginManifest,
get_plugin_manager,
get_pre_tool_call_block_message,
get_plugin_memory_provider,
discover_plugins,
invoke_hook,
)
@@ -610,105 +609,3 @@ class TestPreLlmCallTargetRouting:
# in PluginContext (hermes_cli/plugins.py). The tests referenced _plugin_commands,
# commands_registered, get_plugin_command_handler, and GATEWAY_KNOWN_COMMANDS
# integration — all of which are unimplemented features.
# ── TestRegisterMemoryProvider ─────────────────────────────────────────────
class TestRegisterMemoryProvider:
"""Regression tests for PluginContext.register_memory_provider() — issue #990.
The MemPalace plugin (and any user plugin following the developer guide)
calls ``ctx.register_memory_provider(provider)`` inside ``register(ctx)``.
Before the fix, PluginContext had no such method and the plugin failed to
load with: 'PluginContext' object has no attribute 'register_memory_provider'.
"""
def _make_memory_plugin(self, plugins_dir: "Path", name: str) -> None:
"""Write a minimal user plugin that registers a stub MemoryProvider."""
from agent.memory_provider import MemoryProvider
plugin_dir = plugins_dir / name
plugin_dir.mkdir(parents=True, exist_ok=True)
(plugin_dir / "plugin.yaml").write_text(
f"name: {name}\nversion: 0.1.0\ndescription: Stub memory plugin\n"
)
# The register() body imports and calls register_memory_provider — this
# is the exact pattern documented in memory-provider-plugin.md and used
# by third-party plugins such as MemPalace.
(plugin_dir / "__init__.py").write_text(
"from agent.memory_provider import MemoryProvider\n"
"\n"
"class _StubProvider(MemoryProvider):\n"
" @property\n"
f" def name(self): return '{name}'\n"
" def is_available(self): return True\n"
" def initialize(self, session_id, **kw): pass\n"
" def get_tool_schemas(self): return []\n"
"\n"
"def register(ctx):\n"
" ctx.register_memory_provider(_StubProvider())\n"
)
def test_register_memory_provider_succeeds(self, tmp_path, monkeypatch):
"""A user plugin calling register_memory_provider() loads without error."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
self._make_memory_plugin(plugins_dir, "mempalace")
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
assert "mempalace" in mgr._plugins
assert mgr._plugins["mempalace"].enabled, (
mgr._plugins["mempalace"].error
)
def test_plugin_memory_provider_stored(self, tmp_path, monkeypatch):
"""The provider instance is accessible via get_plugin_memory_provider()."""
import hermes_cli.plugins as plugins_mod
plugins_dir = tmp_path / "hermes_test" / "plugins"
self._make_memory_plugin(plugins_dir, "mempalace")
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
# Swap the singleton so get_plugin_memory_provider() sees our manager
monkeypatch.setattr(plugins_mod, "_plugin_manager", mgr)
mgr.discover_and_load()
provider = get_plugin_memory_provider()
assert provider is not None
assert provider.name == "mempalace"
def test_second_registration_rejected(self, tmp_path, monkeypatch):
"""Only one plugin-registered memory provider is accepted."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
self._make_memory_plugin(plugins_dir, "first_provider")
self._make_memory_plugin(plugins_dir, "second_provider")
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
# The manager should hold exactly one provider
assert mgr._plugin_memory_provider is not None
assert mgr._plugin_memory_provider.name in {"first_provider", "second_provider"}
def test_non_provider_rejected(self, tmp_path, monkeypatch):
"""Passing a non-MemoryProvider object logs a warning and is ignored."""
plugins_dir = tmp_path / "hermes_test" / "plugins"
plugin_dir = plugins_dir / "bad_provider"
plugin_dir.mkdir(parents=True, exist_ok=True)
(plugin_dir / "plugin.yaml").write_text("name: bad_provider\n")
(plugin_dir / "__init__.py").write_text(
"def register(ctx):\n"
" ctx.register_memory_provider('not-a-provider')\n"
)
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes_test"))
mgr = PluginManager()
mgr.discover_and_load()
# Plugin still loads (warning only), but no provider is stored
assert mgr._plugin_memory_provider is None