browser_tool.py registered SIGINT/SIGTERM handlers that called sys.exit() at module import time. When a signal arrived during a lock acquisition (e.g. AudioRecorder._lock in voice mode), SystemExit was raised inside prompt_toolkit's async event loop, corrupting coroutine state and making the process unkillable (required SIGKILL). atexit handler already ensures browser sessions are cleaned up on any normal exit path, so the signal handlers were redundant and harmful.
704 lines
27 KiB
Python
704 lines
27 KiB
Python
"""Tests for CLI voice mode integration -- command parsing, markdown stripping,
|
|
state management, streaming TTS activation, voice message prefix, _vprint."""
|
|
|
|
import ast
|
|
import re
|
|
import threading
|
|
from types import SimpleNamespace
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
|
|
# ============================================================================
|
|
# Markdown stripping (same logic as _voice_speak_response)
|
|
# ============================================================================
|
|
|
|
def _strip_markdown_for_tts(text: str) -> str:
|
|
"""Replicate the markdown stripping logic from cli._voice_speak_response."""
|
|
tts_text = text[:4000] if len(text) > 4000 else text
|
|
tts_text = re.sub(r'```[\s\S]*?```', ' ', tts_text) # fenced code blocks
|
|
tts_text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', tts_text) # [text](url) -> text
|
|
tts_text = re.sub(r'https?://\S+', '', tts_text) # URLs
|
|
tts_text = re.sub(r'\*\*(.+?)\*\*', r'\1', tts_text) # bold
|
|
tts_text = re.sub(r'\*(.+?)\*', r'\1', tts_text) # italic
|
|
tts_text = re.sub(r'`(.+?)`', r'\1', tts_text) # inline code
|
|
tts_text = re.sub(r'^#+\s*', '', tts_text, flags=re.MULTILINE) # headers
|
|
tts_text = re.sub(r'^\s*[-*]\s+', '', tts_text, flags=re.MULTILINE) # list items
|
|
tts_text = re.sub(r'---+', '', tts_text) # horizontal rules
|
|
tts_text = re.sub(r'\n{3,}', '\n\n', tts_text) # excessive newlines
|
|
return tts_text.strip()
|
|
|
|
|
|
class TestMarkdownStripping:
|
|
def test_strips_bold(self):
|
|
assert _strip_markdown_for_tts("This is **bold** text") == "This is bold text"
|
|
|
|
def test_strips_italic(self):
|
|
assert _strip_markdown_for_tts("This is *italic* text") == "This is italic text"
|
|
|
|
def test_strips_inline_code(self):
|
|
assert _strip_markdown_for_tts("Run `pip install foo`") == "Run pip install foo"
|
|
|
|
def test_strips_fenced_code_blocks(self):
|
|
text = "Here is code:\n```python\nprint('hello')\n```\nDone."
|
|
result = _strip_markdown_for_tts(text)
|
|
assert "print" not in result
|
|
assert "Done." in result
|
|
|
|
def test_strips_headers(self):
|
|
assert _strip_markdown_for_tts("## Summary\nSome text") == "Summary\nSome text"
|
|
|
|
def test_strips_list_markers(self):
|
|
text = "- item one\n- item two\n* item three"
|
|
result = _strip_markdown_for_tts(text)
|
|
assert "item one" in result
|
|
assert "- " not in result
|
|
assert "* " not in result
|
|
|
|
def test_strips_urls(self):
|
|
text = "Visit https://example.com for details"
|
|
result = _strip_markdown_for_tts(text)
|
|
assert "https://" not in result
|
|
assert "Visit" in result
|
|
|
|
def test_strips_markdown_links(self):
|
|
text = "See [the docs](https://example.com/docs) for info"
|
|
result = _strip_markdown_for_tts(text)
|
|
assert "the docs" in result
|
|
assert "https://" not in result
|
|
assert "[" not in result
|
|
|
|
def test_strips_horizontal_rules(self):
|
|
text = "Part one\n---\nPart two"
|
|
result = _strip_markdown_for_tts(text)
|
|
assert "---" not in result
|
|
assert "Part one" in result
|
|
assert "Part two" in result
|
|
|
|
def test_empty_after_stripping_returns_empty(self):
|
|
text = "```python\nprint('hello')\n```"
|
|
result = _strip_markdown_for_tts(text)
|
|
assert result == ""
|
|
|
|
def test_truncates_long_text(self):
|
|
text = "a" * 5000
|
|
result = _strip_markdown_for_tts(text)
|
|
assert len(result) <= 4000
|
|
|
|
def test_complex_response(self):
|
|
text = (
|
|
"## Answer\n\n"
|
|
"Here's how to do it:\n\n"
|
|
"```python\ndef hello():\n print('hi')\n```\n\n"
|
|
"Run it with `python main.py`. "
|
|
"See [docs](https://example.com) for more.\n\n"
|
|
"- Step one\n- Step two\n\n"
|
|
"---\n\n"
|
|
"**Good luck!**"
|
|
)
|
|
result = _strip_markdown_for_tts(text)
|
|
assert "```" not in result
|
|
assert "https://" not in result
|
|
assert "**" not in result
|
|
assert "---" not in result
|
|
assert "Answer" in result
|
|
assert "Good luck!" in result
|
|
assert "docs" in result
|
|
|
|
|
|
# ============================================================================
|
|
# Voice command parsing
|
|
# ============================================================================
|
|
|
|
class TestVoiceCommandParsing:
|
|
"""Test _handle_voice_command logic without full CLI setup."""
|
|
|
|
def test_parse_subcommands(self):
|
|
"""Verify subcommand extraction from /voice commands."""
|
|
test_cases = [
|
|
("/voice on", "on"),
|
|
("/voice off", "off"),
|
|
("/voice tts", "tts"),
|
|
("/voice status", "status"),
|
|
("/voice", ""),
|
|
("/voice ON ", "on"),
|
|
]
|
|
for command, expected in test_cases:
|
|
parts = command.strip().split(maxsplit=1)
|
|
subcommand = parts[1].lower().strip() if len(parts) > 1 else ""
|
|
assert subcommand == expected, f"Failed for {command!r}: got {subcommand!r}"
|
|
|
|
|
|
# ============================================================================
|
|
# Voice state thread safety
|
|
# ============================================================================
|
|
|
|
class TestVoiceStateLock:
|
|
def test_lock_protects_state(self):
|
|
"""Verify that concurrent state changes don't corrupt state."""
|
|
lock = threading.Lock()
|
|
state = {"recording": False, "count": 0}
|
|
|
|
def toggle_many(n):
|
|
for _ in range(n):
|
|
with lock:
|
|
state["recording"] = not state["recording"]
|
|
state["count"] += 1
|
|
|
|
threads = [threading.Thread(target=toggle_many, args=(1000,)) for _ in range(4)]
|
|
for t in threads:
|
|
t.start()
|
|
for t in threads:
|
|
t.join()
|
|
|
|
assert state["count"] == 4000
|
|
|
|
|
|
# ============================================================================
|
|
# Streaming TTS lazy import activation (Bug A fix)
|
|
# ============================================================================
|
|
|
|
class TestStreamingTTSActivation:
|
|
"""Verify streaming TTS uses lazy imports to check availability."""
|
|
|
|
def test_activates_when_elevenlabs_and_sounddevice_available(self):
|
|
"""use_streaming_tts should be True when provider is elevenlabs
|
|
and both lazy imports succeed."""
|
|
use_streaming_tts = False
|
|
try:
|
|
from tools.tts_tool import (
|
|
_load_tts_config as _load_tts_cfg,
|
|
_get_provider as _get_prov,
|
|
_import_elevenlabs,
|
|
_import_sounddevice,
|
|
)
|
|
assert callable(_import_elevenlabs)
|
|
assert callable(_import_sounddevice)
|
|
except ImportError:
|
|
pytest.skip("tools.tts_tool not available")
|
|
|
|
with patch("tools.tts_tool._load_tts_config") as mock_cfg, \
|
|
patch("tools.tts_tool._get_provider", return_value="elevenlabs"), \
|
|
patch("tools.tts_tool._import_elevenlabs") as mock_el, \
|
|
patch("tools.tts_tool._import_sounddevice") as mock_sd:
|
|
mock_cfg.return_value = {"provider": "elevenlabs"}
|
|
mock_el.return_value = MagicMock()
|
|
mock_sd.return_value = MagicMock()
|
|
|
|
from tools.tts_tool import (
|
|
_load_tts_config as load_cfg,
|
|
_get_provider as get_prov,
|
|
_import_elevenlabs as import_el,
|
|
_import_sounddevice as import_sd,
|
|
)
|
|
cfg = load_cfg()
|
|
if get_prov(cfg) == "elevenlabs":
|
|
import_el()
|
|
import_sd()
|
|
use_streaming_tts = True
|
|
|
|
assert use_streaming_tts is True
|
|
|
|
def test_does_not_activate_when_elevenlabs_missing(self):
|
|
"""use_streaming_tts stays False when elevenlabs import fails."""
|
|
use_streaming_tts = False
|
|
with patch("tools.tts_tool._load_tts_config", return_value={"provider": "elevenlabs"}), \
|
|
patch("tools.tts_tool._get_provider", return_value="elevenlabs"), \
|
|
patch("tools.tts_tool._import_elevenlabs", side_effect=ImportError("no elevenlabs")):
|
|
try:
|
|
from tools.tts_tool import (
|
|
_load_tts_config as load_cfg,
|
|
_get_provider as get_prov,
|
|
_import_elevenlabs as import_el,
|
|
_import_sounddevice as import_sd,
|
|
)
|
|
cfg = load_cfg()
|
|
if get_prov(cfg) == "elevenlabs":
|
|
import_el()
|
|
import_sd()
|
|
use_streaming_tts = True
|
|
except (ImportError, OSError):
|
|
pass
|
|
|
|
assert use_streaming_tts is False
|
|
|
|
def test_does_not_activate_when_sounddevice_missing(self):
|
|
"""use_streaming_tts stays False when sounddevice import fails."""
|
|
use_streaming_tts = False
|
|
with patch("tools.tts_tool._load_tts_config", return_value={"provider": "elevenlabs"}), \
|
|
patch("tools.tts_tool._get_provider", return_value="elevenlabs"), \
|
|
patch("tools.tts_tool._import_elevenlabs", return_value=MagicMock()), \
|
|
patch("tools.tts_tool._import_sounddevice", side_effect=OSError("no PortAudio")):
|
|
try:
|
|
from tools.tts_tool import (
|
|
_load_tts_config as load_cfg,
|
|
_get_provider as get_prov,
|
|
_import_elevenlabs as import_el,
|
|
_import_sounddevice as import_sd,
|
|
)
|
|
cfg = load_cfg()
|
|
if get_prov(cfg) == "elevenlabs":
|
|
import_el()
|
|
import_sd()
|
|
use_streaming_tts = True
|
|
except (ImportError, OSError):
|
|
pass
|
|
|
|
assert use_streaming_tts is False
|
|
|
|
def test_does_not_activate_for_non_elevenlabs_provider(self):
|
|
"""use_streaming_tts stays False when provider is not elevenlabs."""
|
|
use_streaming_tts = False
|
|
with patch("tools.tts_tool._load_tts_config", return_value={"provider": "edge"}), \
|
|
patch("tools.tts_tool._get_provider", return_value="edge"):
|
|
try:
|
|
from tools.tts_tool import (
|
|
_load_tts_config as load_cfg,
|
|
_get_provider as get_prov,
|
|
_import_elevenlabs as import_el,
|
|
_import_sounddevice as import_sd,
|
|
)
|
|
cfg = load_cfg()
|
|
if get_prov(cfg) == "elevenlabs":
|
|
import_el()
|
|
import_sd()
|
|
use_streaming_tts = True
|
|
except (ImportError, OSError):
|
|
pass
|
|
|
|
assert use_streaming_tts is False
|
|
|
|
def test_stale_boolean_imports_no_longer_exist(self):
|
|
"""Confirm _HAS_ELEVENLABS and _HAS_AUDIO are not in tts_tool module."""
|
|
import tools.tts_tool as tts_mod
|
|
assert not hasattr(tts_mod, "_HAS_ELEVENLABS"), \
|
|
"_HAS_ELEVENLABS should not exist -- lazy imports replaced it"
|
|
assert not hasattr(tts_mod, "_HAS_AUDIO"), \
|
|
"_HAS_AUDIO should not exist -- lazy imports replaced it"
|
|
|
|
|
|
# ============================================================================
|
|
# Voice mode user message prefix (Bug B fix)
|
|
# ============================================================================
|
|
|
|
class TestVoiceMessagePrefix:
|
|
"""Voice mode should inject instruction via user message prefix,
|
|
not by modifying the system prompt (which breaks prompt cache)."""
|
|
|
|
def test_prefix_added_when_voice_mode_active(self):
|
|
"""When voice mode is active and message is str, agent_message
|
|
should have the voice instruction prefix."""
|
|
voice_mode = True
|
|
message = "What's the weather like?"
|
|
|
|
agent_message = message
|
|
if voice_mode and isinstance(message, str):
|
|
agent_message = (
|
|
"[Voice input — respond concisely and conversationally, "
|
|
"2-3 sentences max. No code blocks or markdown.] "
|
|
+ message
|
|
)
|
|
|
|
assert agent_message.startswith("[Voice input")
|
|
assert "What's the weather like?" in agent_message
|
|
|
|
def test_no_prefix_when_voice_mode_inactive(self):
|
|
"""When voice mode is off, message passes through unchanged."""
|
|
voice_mode = False
|
|
message = "What's the weather like?"
|
|
|
|
agent_message = message
|
|
if voice_mode and isinstance(message, str):
|
|
agent_message = (
|
|
"[Voice input — respond concisely and conversationally, "
|
|
"2-3 sentences max. No code blocks or markdown.] "
|
|
+ message
|
|
)
|
|
|
|
assert agent_message == message
|
|
|
|
def test_no_prefix_for_multimodal_content(self):
|
|
"""When message is a list (multimodal), no prefix is added."""
|
|
voice_mode = True
|
|
message = [{"type": "text", "text": "describe this"}, {"type": "image_url"}]
|
|
|
|
agent_message = message
|
|
if voice_mode and isinstance(message, str):
|
|
agent_message = (
|
|
"[Voice input — respond concisely and conversationally, "
|
|
"2-3 sentences max. No code blocks or markdown.] "
|
|
+ message
|
|
)
|
|
|
|
assert agent_message is message
|
|
|
|
def test_history_stays_clean(self):
|
|
"""conversation_history should contain the original message,
|
|
not the prefixed version."""
|
|
voice_mode = True
|
|
message = "Hello there"
|
|
conversation_history = []
|
|
|
|
conversation_history.append({"role": "user", "content": message})
|
|
|
|
agent_message = message
|
|
if voice_mode and isinstance(message, str):
|
|
agent_message = (
|
|
"[Voice input — respond concisely and conversationally, "
|
|
"2-3 sentences max. No code blocks or markdown.] "
|
|
+ message
|
|
)
|
|
|
|
assert conversation_history[-1]["content"] == "Hello there"
|
|
assert agent_message.startswith("[Voice input")
|
|
assert agent_message != conversation_history[-1]["content"]
|
|
|
|
def test_enable_voice_mode_does_not_modify_system_prompt(self):
|
|
"""_enable_voice_mode should NOT modify self.system_prompt or
|
|
agent.ephemeral_system_prompt -- the system prompt must stay
|
|
stable to preserve prompt cache."""
|
|
cli = SimpleNamespace(
|
|
_voice_mode=False,
|
|
_voice_tts=False,
|
|
_voice_lock=threading.Lock(),
|
|
system_prompt="You are helpful",
|
|
agent=SimpleNamespace(ephemeral_system_prompt="You are helpful"),
|
|
)
|
|
|
|
original_system = cli.system_prompt
|
|
original_ephemeral = cli.agent.ephemeral_system_prompt
|
|
|
|
cli._voice_mode = True
|
|
|
|
assert cli.system_prompt == original_system
|
|
assert cli.agent.ephemeral_system_prompt == original_ephemeral
|
|
|
|
|
|
# ============================================================================
|
|
# _vprint force parameter (Minor fix)
|
|
# ============================================================================
|
|
|
|
class TestVprintForceParameter:
|
|
"""_vprint should suppress output during streaming TTS unless force=True."""
|
|
|
|
def _make_agent_with_stream(self, stream_active: bool):
|
|
"""Create a minimal agent-like object with _vprint."""
|
|
agent = SimpleNamespace(
|
|
_stream_callback=MagicMock() if stream_active else None,
|
|
)
|
|
|
|
def _vprint(*args, force=False, **kwargs):
|
|
if not force and getattr(agent, "_stream_callback", None) is not None:
|
|
return
|
|
print(*args, **kwargs)
|
|
|
|
agent._vprint = _vprint
|
|
return agent
|
|
|
|
def test_suppressed_during_streaming(self, capsys):
|
|
"""Normal _vprint output is suppressed when streaming TTS is active."""
|
|
agent = self._make_agent_with_stream(stream_active=True)
|
|
agent._vprint("should be hidden")
|
|
captured = capsys.readouterr()
|
|
assert captured.out == ""
|
|
|
|
def test_shown_when_not_streaming(self, capsys):
|
|
"""Normal _vprint output is shown when streaming is not active."""
|
|
agent = self._make_agent_with_stream(stream_active=False)
|
|
agent._vprint("should be shown")
|
|
captured = capsys.readouterr()
|
|
assert "should be shown" in captured.out
|
|
|
|
def test_force_shown_during_streaming(self, capsys):
|
|
"""force=True bypasses the streaming suppression."""
|
|
agent = self._make_agent_with_stream(stream_active=True)
|
|
agent._vprint("critical error!", force=True)
|
|
captured = capsys.readouterr()
|
|
assert "critical error!" in captured.out
|
|
|
|
def test_force_shown_when_not_streaming(self, capsys):
|
|
"""force=True works normally when not streaming (no regression)."""
|
|
agent = self._make_agent_with_stream(stream_active=False)
|
|
agent._vprint("normal message", force=True)
|
|
captured = capsys.readouterr()
|
|
assert "normal message" in captured.out
|
|
|
|
def test_error_messages_use_force_in_run_agent(self):
|
|
"""Verify that critical error _vprint calls in run_agent.py
|
|
include force=True."""
|
|
with open("run_agent.py", "r") as f:
|
|
source = f.read()
|
|
|
|
tree = ast.parse(source)
|
|
|
|
forced_error_count = 0
|
|
unforced_error_count = 0
|
|
|
|
for node in ast.walk(tree):
|
|
if not isinstance(node, ast.Call):
|
|
continue
|
|
func = node.func
|
|
if not (isinstance(func, ast.Attribute) and func.attr == "_vprint"):
|
|
continue
|
|
has_fatal = False
|
|
for arg in node.args:
|
|
if isinstance(arg, ast.JoinedStr):
|
|
for val in arg.values:
|
|
if isinstance(val, ast.Constant) and isinstance(val.value, str):
|
|
if "\u274c" in val.value:
|
|
has_fatal = True
|
|
break
|
|
|
|
if not has_fatal:
|
|
continue
|
|
|
|
has_force = any(
|
|
kw.arg == "force"
|
|
and isinstance(kw.value, ast.Constant)
|
|
and kw.value.value is True
|
|
for kw in node.keywords
|
|
)
|
|
|
|
if has_force:
|
|
forced_error_count += 1
|
|
else:
|
|
unforced_error_count += 1
|
|
|
|
assert forced_error_count > 0, \
|
|
"Expected at least one _vprint with force=True for error messages"
|
|
assert unforced_error_count == 0, \
|
|
f"Found {unforced_error_count} critical error _vprint calls without force=True"
|
|
|
|
|
|
# ============================================================================
|
|
# Bug fix regression tests
|
|
# ============================================================================
|
|
|
|
class TestEdgeTTSLazyImport:
|
|
"""Bug #3: _generate_edge_tts must use lazy import, not bare module name."""
|
|
|
|
def test_generate_edge_tts_calls_lazy_import(self):
|
|
"""AST check: _generate_edge_tts must call _import_edge_tts(), not
|
|
reference bare 'edge_tts' module name."""
|
|
import ast as _ast
|
|
|
|
with open("tools/tts_tool.py") as f:
|
|
tree = _ast.parse(f.read())
|
|
|
|
for node in _ast.walk(tree):
|
|
if isinstance(node, _ast.AsyncFunctionDef) and node.name == "_generate_edge_tts":
|
|
# Collect all Name references (bare identifiers)
|
|
bare_refs = [
|
|
n.id for n in _ast.walk(node)
|
|
if isinstance(n, _ast.Name) and n.id == "edge_tts"
|
|
]
|
|
assert bare_refs == [], (
|
|
f"_generate_edge_tts uses bare 'edge_tts' name — "
|
|
f"should use _import_edge_tts() lazy helper"
|
|
)
|
|
|
|
# Must have a call to _import_edge_tts
|
|
lazy_calls = [
|
|
n for n in _ast.walk(node)
|
|
if isinstance(n, _ast.Call)
|
|
and isinstance(n.func, _ast.Name)
|
|
and n.func.id == "_import_edge_tts"
|
|
]
|
|
assert len(lazy_calls) >= 1, (
|
|
"_generate_edge_tts must call _import_edge_tts()"
|
|
)
|
|
break
|
|
else:
|
|
pytest.fail("_generate_edge_tts not found in tts_tool.py")
|
|
|
|
|
|
class TestStreamingTTSOutputStreamCleanup:
|
|
"""Bug #7: output_stream must be closed in finally block."""
|
|
|
|
def test_output_stream_closed_in_finally(self):
|
|
"""AST check: stream_tts_to_speaker's finally block must close
|
|
output_stream even on exception."""
|
|
import ast as _ast
|
|
|
|
with open("tools/tts_tool.py") as f:
|
|
tree = _ast.parse(f.read())
|
|
|
|
for node in _ast.walk(tree):
|
|
if isinstance(node, _ast.FunctionDef) and node.name == "stream_tts_to_speaker":
|
|
# Find the outermost try that has a finally with tts_done_event.set()
|
|
for child in _ast.walk(node):
|
|
if isinstance(child, _ast.Try) and child.finalbody:
|
|
finally_text = "\n".join(
|
|
_ast.dump(n) for n in child.finalbody
|
|
)
|
|
if "tts_done_event" in finally_text:
|
|
assert "output_stream" in finally_text, (
|
|
"finally block must close output_stream"
|
|
)
|
|
return
|
|
pytest.fail("No finally block with tts_done_event found")
|
|
|
|
|
|
class TestCtrlCResetsContinuousMode:
|
|
"""Bug #4: Ctrl+C cancel must reset _voice_continuous."""
|
|
|
|
def test_ctrl_c_handler_resets_voice_continuous(self):
|
|
"""Source check: Ctrl+C voice cancel block must set
|
|
_voice_continuous = False."""
|
|
with open("cli.py") as f:
|
|
source = f.read()
|
|
|
|
# Find the Ctrl+C handler's voice cancel block
|
|
lines = source.split("\n")
|
|
in_cancel_block = False
|
|
found_continuous_reset = False
|
|
for i, line in enumerate(lines):
|
|
if "Cancel active voice recording" in line:
|
|
in_cancel_block = True
|
|
if in_cancel_block:
|
|
if "_voice_continuous = False" in line:
|
|
found_continuous_reset = True
|
|
break
|
|
# Block ends at next comment section or return
|
|
if "return" in line and in_cancel_block:
|
|
break
|
|
|
|
assert found_continuous_reset, (
|
|
"Ctrl+C voice cancel block must set _voice_continuous = False"
|
|
)
|
|
|
|
|
|
class TestDisableVoiceModeStopsTTS:
|
|
"""Bug #5: _disable_voice_mode must stop active TTS playback."""
|
|
|
|
def test_disable_voice_mode_calls_stop_playback(self):
|
|
"""Source check: _disable_voice_mode must call stop_playback()."""
|
|
with open("cli.py") as f:
|
|
source = f.read()
|
|
|
|
# Extract _disable_voice_mode method body
|
|
lines = source.split("\n")
|
|
in_method = False
|
|
method_lines = []
|
|
for line in lines:
|
|
if "def _disable_voice_mode" in line:
|
|
in_method = True
|
|
elif in_method:
|
|
if line.strip() and not line.startswith(" ") and not line.startswith("\t"):
|
|
break
|
|
if line.strip().startswith("def "):
|
|
break
|
|
method_lines.append(line)
|
|
|
|
method_body = "\n".join(method_lines)
|
|
assert "stop_playback" in method_body, (
|
|
"_disable_voice_mode must call stop_playback()"
|
|
)
|
|
assert "_voice_tts_done.set()" in method_body, (
|
|
"_disable_voice_mode must set _voice_tts_done"
|
|
)
|
|
|
|
|
|
class TestVoiceStatusUsesConfigKey:
|
|
"""Bug #8: _show_voice_status must read record key from config."""
|
|
|
|
def test_show_voice_status_not_hardcoded(self):
|
|
"""Source check: _show_voice_status must not hardcode Ctrl+B."""
|
|
with open("cli.py") as f:
|
|
source = f.read()
|
|
|
|
lines = source.split("\n")
|
|
in_method = False
|
|
for line in lines:
|
|
if "def _show_voice_status" in line:
|
|
in_method = True
|
|
elif in_method and line.strip().startswith("def "):
|
|
break
|
|
elif in_method:
|
|
assert 'Record key: Ctrl+B"' not in line, (
|
|
"_show_voice_status hardcodes 'Ctrl+B' — "
|
|
"should read from config"
|
|
)
|
|
|
|
def test_show_voice_status_reads_config(self):
|
|
"""Source check: _show_voice_status must use load_config()."""
|
|
with open("cli.py") as f:
|
|
source = f.read()
|
|
|
|
lines = source.split("\n")
|
|
in_method = False
|
|
method_lines = []
|
|
for line in lines:
|
|
if "def _show_voice_status" in line:
|
|
in_method = True
|
|
elif in_method and line.strip().startswith("def "):
|
|
break
|
|
elif in_method:
|
|
method_lines.append(line)
|
|
|
|
method_body = "\n".join(method_lines)
|
|
assert "load_config" in method_body or "record_key" in method_body, (
|
|
"_show_voice_status should read record_key from config"
|
|
)
|
|
|
|
|
|
class TestChatTTSCleanupOnException:
|
|
"""Bug #2: chat() must clean up streaming TTS resources on exception."""
|
|
|
|
def test_chat_has_finally_for_tts_cleanup(self):
|
|
"""AST check: chat() method must have a finally block that cleans up
|
|
text_queue, stop_event, and tts_thread."""
|
|
import ast as _ast
|
|
|
|
with open("cli.py") as f:
|
|
tree = _ast.parse(f.read())
|
|
|
|
for node in _ast.walk(tree):
|
|
if isinstance(node, _ast.FunctionDef) and node.name == "chat":
|
|
# Find Try nodes with finally blocks
|
|
for child in _ast.walk(node):
|
|
if isinstance(child, _ast.Try) and child.finalbody:
|
|
finally_text = "\n".join(
|
|
_ast.dump(n) for n in child.finalbody
|
|
)
|
|
if "text_queue" in finally_text:
|
|
assert "stop_event" in finally_text, (
|
|
"finally must also handle stop_event"
|
|
)
|
|
assert "tts_thread" in finally_text, (
|
|
"finally must also handle tts_thread"
|
|
)
|
|
return
|
|
pytest.fail(
|
|
"chat() must have a finally block cleaning up "
|
|
"text_queue/stop_event/tts_thread"
|
|
)
|
|
|
|
|
|
class TestBrowserToolSignalHandlerRemoved:
|
|
"""browser_tool.py must NOT register SIGINT/SIGTERM handlers that call
|
|
sys.exit() — this conflicts with prompt_toolkit's event loop and causes
|
|
the process to become unkillable during voice mode."""
|
|
|
|
def test_no_signal_handler_registration(self):
|
|
"""Source check: browser_tool.py must not call signal.signal()
|
|
for SIGINT or SIGTERM."""
|
|
with open("tools/browser_tool.py") as f:
|
|
source = f.read()
|
|
|
|
lines = source.split("\n")
|
|
for i, line in enumerate(lines, 1):
|
|
stripped = line.strip()
|
|
# Skip comments
|
|
if stripped.startswith("#"):
|
|
continue
|
|
assert "signal.signal(signal.SIGINT" not in stripped, (
|
|
f"browser_tool.py:{i} registers SIGINT handler — "
|
|
f"use atexit instead to avoid prompt_toolkit conflicts"
|
|
)
|
|
assert "signal.signal(signal.SIGTERM" not in stripped, (
|
|
f"browser_tool.py:{i} registers SIGTERM handler — "
|
|
f"use atexit instead to avoid prompt_toolkit conflicts"
|
|
)
|