From c23928d089a35ce6b6ea72785a85aee9d301ffca Mon Sep 17 00:00:00 2001 From: 0xbyt4 <35742124+0xbyt4@users.noreply.github.com> Date: Tue, 3 Mar 2026 18:00:31 +0300 Subject: [PATCH] fix: improve voice mode robustness and add integration tests - Show TTS errors to user instead of silently logging - Improve markdown stripping: code blocks, URLs, links, horizontal rules - Fix stripping order: process markdown links before removing URLs - Add threading.Lock for voice state variables (cross-thread safety) - Add 14 CLI integration tests (markdown stripping, command parsing, thread safety) - Total: 47 voice-related tests --- cli.py | 64 +++++---- tests/tools/test_voice_cli_integration.py | 151 ++++++++++++++++++++++ 2 files changed, 192 insertions(+), 23 deletions(-) create mode 100644 tests/tools/test_voice_cli_integration.py diff --git a/cli.py b/cli.py index 66bf1b3e1..3b3032c40 100755 --- a/cli.py +++ b/cli.py @@ -3540,7 +3540,8 @@ class HermesCLI: self._voice_recorder = AudioRecorder() self._voice_recorder.start() - self._voice_recording = True + with self._voice_lock: + self._voice_recording = True _cprint(f"\n{_GOLD}● Recording...{_RST} {_DIM}(Ctrl+R to stop, Ctrl+C to cancel){_RST}") def _voice_stop_and_transcribe(self): @@ -3550,13 +3551,15 @@ class HermesCLI: return wav_path = self._voice_recorder.stop() - self._voice_recording = False + with self._voice_lock: + self._voice_recording = False if wav_path is None: _cprint(f"{_DIM}No speech detected (recording too short).{_RST}") return - self._voice_processing = True + with self._voice_lock: + self._voice_processing = True if hasattr(self, '_app') and self._app: self._app.invalidate() _cprint(f"{_DIM}Transcribing...{_RST}") @@ -3585,7 +3588,8 @@ class HermesCLI: except Exception as e: _cprint(f"\n{_DIM}Voice processing error: {e}{_RST}") finally: - self._voice_processing = False + with self._voice_lock: + self._voice_processing = False if hasattr(self, '_app') and self._app: self._app.invalidate() # Clean up temp file @@ -3605,13 +3609,21 @@ class HermesCLI: import json import re - # Strip markdown formatting for cleaner TTS + # Strip markdown and non-speech content for cleaner TTS tts_text = text[:4000] if len(text) > 4000 else text + tts_text = re.sub(r'```[\s\S]*?```', ' ', tts_text) # fenced code blocks + tts_text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', tts_text) # [text](url) -> text + tts_text = re.sub(r'https?://\S+', '', tts_text) # URLs tts_text = re.sub(r'\*\*(.+?)\*\*', r'\1', tts_text) # bold tts_text = re.sub(r'\*(.+?)\*', r'\1', tts_text) # italic - tts_text = re.sub(r'`(.+?)`', r'\1', tts_text) # code + tts_text = re.sub(r'`(.+?)`', r'\1', tts_text) # inline code tts_text = re.sub(r'^#+\s*', '', tts_text, flags=re.MULTILINE) # headers tts_text = re.sub(r'^\s*[-*]\s+', '', tts_text, flags=re.MULTILINE) # list items + tts_text = re.sub(r'---+', '', tts_text) # horizontal rules + tts_text = re.sub(r'\n{3,}', '\n\n', tts_text) # excessive newlines + tts_text = tts_text.strip() + if not tts_text: + return # Use MP3 output for CLI playback (afplay doesn't handle OGG well). # The TTS tool may auto-convert MP3->OGG, but the original MP3 remains. @@ -3635,7 +3647,8 @@ class HermesCLI: except OSError: pass except Exception as e: - logger.debug("Voice TTS playback failed: %s", e) + logger.warning("Voice TTS playback failed: %s", e) + _cprint(f"{_DIM}TTS playback failed: {e}{_RST}") def _handle_voice_command(self, command: str): """Handle /voice [on|off|tts|status] command.""" @@ -3674,14 +3687,16 @@ class HermesCLI: _cprint(f" {_DIM}Or: pip install hermes-agent[voice]{_RST}") return - self._voice_mode = True + with self._voice_lock: + self._voice_mode = True # Check config for auto_tts try: from hermes_cli.config import load_config voice_config = load_config().get("voice", {}) if voice_config.get("auto_tts", False): - self._voice_tts = True + with self._voice_lock: + self._voice_tts = True except Exception: pass @@ -3693,12 +3708,12 @@ class HermesCLI: def _disable_voice_mode(self): """Disable voice mode and cancel any active recording.""" - if self._voice_recording and self._voice_recorder: - self._voice_recorder.cancel() - self._voice_recording = False - - self._voice_mode = False - self._voice_tts = False + with self._voice_lock: + if self._voice_recording and self._voice_recorder: + self._voice_recorder.cancel() + self._voice_recording = False + self._voice_mode = False + self._voice_tts = False _cprint(f"\n{_DIM}Voice mode disabled.{_RST}") def _toggle_voice_tts(self): @@ -3707,7 +3722,8 @@ class HermesCLI: _cprint(f"{_DIM}Enable voice mode first: /voice on{_RST}") return - self._voice_tts = not self._voice_tts + with self._voice_lock: + self._voice_tts = not self._voice_tts status = "enabled" if self._voice_tts else "disabled" if self._voice_tts: @@ -4308,7 +4324,8 @@ class HermesCLI: self._attached_images: list[Path] = [] self._image_counter = 0 - # Voice mode state + # Voice mode state (protected by _voice_lock for cross-thread access) + self._voice_lock = threading.Lock() self._voice_mode = False # Whether voice mode is enabled self._voice_tts = False # Whether TTS output is enabled self._voice_recorder = None # AudioRecorder instance (lazy init) @@ -4508,12 +4525,13 @@ class HermesCLI: now = _time.time() # Cancel active voice recording - if cli_ref._voice_recording and cli_ref._voice_recorder: - cli_ref._voice_recorder.cancel() - cli_ref._voice_recording = False - _cprint(f"\n{_DIM}Recording cancelled.{_RST}") - event.app.invalidate() - return + with cli_ref._voice_lock: + if cli_ref._voice_recording and cli_ref._voice_recorder: + cli_ref._voice_recorder.cancel() + cli_ref._voice_recording = False + _cprint(f"\n{_DIM}Recording cancelled.{_RST}") + event.app.invalidate() + return # Cancel sudo prompt if self._sudo_state: diff --git a/tests/tools/test_voice_cli_integration.py b/tests/tools/test_voice_cli_integration.py new file mode 100644 index 000000000..7bb78e66c --- /dev/null +++ b/tests/tools/test_voice_cli_integration.py @@ -0,0 +1,151 @@ +"""Tests for CLI voice mode integration -- command parsing, markdown stripping, state management.""" + +import re +import threading + +import pytest + + +# ============================================================================ +# Markdown stripping (same logic as _voice_speak_response) +# ============================================================================ + +def _strip_markdown_for_tts(text: str) -> str: + """Replicate the markdown stripping logic from cli._voice_speak_response.""" + tts_text = text[:4000] if len(text) > 4000 else text + tts_text = re.sub(r'```[\s\S]*?```', ' ', tts_text) # fenced code blocks + tts_text = re.sub(r'\[([^\]]+)\]\([^)]+\)', r'\1', tts_text) # [text](url) -> text + tts_text = re.sub(r'https?://\S+', '', tts_text) # URLs + tts_text = re.sub(r'\*\*(.+?)\*\*', r'\1', tts_text) # bold + tts_text = re.sub(r'\*(.+?)\*', r'\1', tts_text) # italic + tts_text = re.sub(r'`(.+?)`', r'\1', tts_text) # inline code + tts_text = re.sub(r'^#+\s*', '', tts_text, flags=re.MULTILINE) # headers + tts_text = re.sub(r'^\s*[-*]\s+', '', tts_text, flags=re.MULTILINE) # list items + tts_text = re.sub(r'---+', '', tts_text) # horizontal rules + tts_text = re.sub(r'\n{3,}', '\n\n', tts_text) # excessive newlines + return tts_text.strip() + + +class TestMarkdownStripping: + def test_strips_bold(self): + assert _strip_markdown_for_tts("This is **bold** text") == "This is bold text" + + def test_strips_italic(self): + assert _strip_markdown_for_tts("This is *italic* text") == "This is italic text" + + def test_strips_inline_code(self): + assert _strip_markdown_for_tts("Run `pip install foo`") == "Run pip install foo" + + def test_strips_fenced_code_blocks(self): + text = "Here is code:\n```python\nprint('hello')\n```\nDone." + result = _strip_markdown_for_tts(text) + assert "print" not in result + assert "Done." in result + + def test_strips_headers(self): + assert _strip_markdown_for_tts("## Summary\nSome text") == "Summary\nSome text" + + def test_strips_list_markers(self): + text = "- item one\n- item two\n* item three" + result = _strip_markdown_for_tts(text) + assert "item one" in result + assert "- " not in result + assert "* " not in result + + def test_strips_urls(self): + text = "Visit https://example.com for details" + result = _strip_markdown_for_tts(text) + assert "https://" not in result + assert "Visit" in result + + def test_strips_markdown_links(self): + text = "See [the docs](https://example.com/docs) for info" + result = _strip_markdown_for_tts(text) + assert "the docs" in result + assert "https://" not in result + assert "[" not in result + + def test_strips_horizontal_rules(self): + text = "Part one\n---\nPart two" + result = _strip_markdown_for_tts(text) + assert "---" not in result + assert "Part one" in result + assert "Part two" in result + + def test_empty_after_stripping_returns_empty(self): + text = "```python\nprint('hello')\n```" + result = _strip_markdown_for_tts(text) + assert result == "" + + def test_truncates_long_text(self): + text = "a" * 5000 + result = _strip_markdown_for_tts(text) + assert len(result) <= 4000 + + def test_complex_response(self): + text = ( + "## Answer\n\n" + "Here's how to do it:\n\n" + "```python\ndef hello():\n print('hi')\n```\n\n" + "Run it with `python main.py`. " + "See [docs](https://example.com) for more.\n\n" + "- Step one\n- Step two\n\n" + "---\n\n" + "**Good luck!**" + ) + result = _strip_markdown_for_tts(text) + assert "```" not in result + assert "https://" not in result + assert "**" not in result + assert "---" not in result + assert "Answer" in result + assert "Good luck!" in result + assert "docs" in result + + +# ============================================================================ +# Voice command parsing +# ============================================================================ + +class TestVoiceCommandParsing: + """Test _handle_voice_command logic without full CLI setup.""" + + def test_parse_subcommands(self): + """Verify subcommand extraction from /voice commands.""" + test_cases = [ + ("/voice on", "on"), + ("/voice off", "off"), + ("/voice tts", "tts"), + ("/voice status", "status"), + ("/voice", ""), + ("/voice ON ", "on"), + ] + for command, expected in test_cases: + parts = command.strip().split(maxsplit=1) + subcommand = parts[1].lower().strip() if len(parts) > 1 else "" + assert subcommand == expected, f"Failed for {command!r}: got {subcommand!r}" + + +# ============================================================================ +# Voice state thread safety +# ============================================================================ + +class TestVoiceStateLock: + def test_lock_protects_state(self): + """Verify that concurrent state changes don't corrupt state.""" + lock = threading.Lock() + state = {"recording": False, "count": 0} + + def toggle_many(n): + for _ in range(n): + with lock: + state["recording"] = not state["recording"] + state["count"] += 1 + + threads = [threading.Thread(target=toggle_many, args=(1000,)) for _ in range(4)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert state["count"] == 4000