diff --git a/cli.py b/cli.py index face0e0e..95834959 100755 --- a/cli.py +++ b/cli.py @@ -3830,7 +3830,7 @@ class HermesCLI: _cprint(f" {_DIM}/voice off to disable voice mode{_RST}") def _disable_voice_mode(self): - """Disable voice mode and cancel any active recording.""" + """Disable voice mode, cancel any active recording, and stop TTS.""" with self._voice_lock: if self._voice_recording and self._voice_recorder: self._voice_recorder.cancel() @@ -3839,6 +3839,14 @@ class HermesCLI: self._voice_tts = False self._voice_continuous = False + # Stop any active TTS playback + try: + from tools.voice_mode import stop_playback + stop_playback() + except Exception: + pass + self._voice_tts_done.set() + _cprint(f"\n{_DIM}Voice mode disabled.{_RST}") def _toggle_voice_tts(self): @@ -3868,7 +3876,9 @@ class HermesCLI: _cprint(f" Mode: {'ON' if self._voice_mode else 'OFF'}") _cprint(f" TTS: {'ON' if self._voice_tts else 'OFF'}") _cprint(f" Recording: {'YES' if self._voice_recording else 'no'}") - _cprint(f" Record key: Ctrl+B") + _raw_key = load_config().get("voice", {}).get("record_key", "ctrl+b") + _display_key = _raw_key.replace("ctrl+", "Ctrl+").upper() if "ctrl+" in _raw_key.lower() else _raw_key + _cprint(f" Record key: {_display_key}") _cprint(f"\n {_BOLD}Requirements:{_RST}") for line in reqs["details"].split("\n"): _cprint(f" {line}") @@ -4368,6 +4378,20 @@ class HermesCLI: except Exception as e: print(f"Error: {e}") return None + finally: + # Ensure streaming TTS resources are cleaned up even on error. + # Normal path sends the sentinel at line ~3568; this is a safety + # net for exception paths that skip it. Duplicate sentinels are + # harmless — stream_tts_to_speaker exits on the first None. + if text_queue is not None: + try: + text_queue.put_nowait(None) + except Exception: + pass + if stop_event is not None: + stop_event.set() + if tts_thread is not None and tts_thread.is_alive(): + tts_thread.join(timeout=5) def _print_exit_summary(self): """Print session resume info on exit, similar to Claude Code.""" @@ -4763,6 +4787,7 @@ class HermesCLI: if cli_ref._voice_recording and cli_ref._voice_recorder: cli_ref._voice_recorder.cancel() cli_ref._voice_recording = False + cli_ref._voice_continuous = False _cprint(f"\n{_DIM}Recording cancelled.{_RST}") event.app.invalidate() return diff --git a/tests/tools/test_voice_cli_integration.py b/tests/tools/test_voice_cli_integration.py index e42c3fc7..38a947b6 100644 --- a/tests/tools/test_voice_cli_integration.py +++ b/tests/tools/test_voice_cli_integration.py @@ -469,3 +469,208 @@ class TestVprintForceParameter: "Expected at least one _vprint with force=True for error messages" assert unforced_error_count == 0, \ f"Found {unforced_error_count} critical error _vprint calls without force=True" + + +# ============================================================================ +# Bug fix regression tests +# ============================================================================ + +class TestEdgeTTSLazyImport: + """Bug #3: _generate_edge_tts must use lazy import, not bare module name.""" + + def test_generate_edge_tts_calls_lazy_import(self): + """AST check: _generate_edge_tts must call _import_edge_tts(), not + reference bare 'edge_tts' module name.""" + import ast as _ast + + with open("tools/tts_tool.py") as f: + tree = _ast.parse(f.read()) + + for node in _ast.walk(tree): + if isinstance(node, _ast.AsyncFunctionDef) and node.name == "_generate_edge_tts": + # Collect all Name references (bare identifiers) + bare_refs = [ + n.id for n in _ast.walk(node) + if isinstance(n, _ast.Name) and n.id == "edge_tts" + ] + assert bare_refs == [], ( + f"_generate_edge_tts uses bare 'edge_tts' name — " + f"should use _import_edge_tts() lazy helper" + ) + + # Must have a call to _import_edge_tts + lazy_calls = [ + n for n in _ast.walk(node) + if isinstance(n, _ast.Call) + and isinstance(n.func, _ast.Name) + and n.func.id == "_import_edge_tts" + ] + assert len(lazy_calls) >= 1, ( + "_generate_edge_tts must call _import_edge_tts()" + ) + break + else: + pytest.fail("_generate_edge_tts not found in tts_tool.py") + + +class TestStreamingTTSOutputStreamCleanup: + """Bug #7: output_stream must be closed in finally block.""" + + def test_output_stream_closed_in_finally(self): + """AST check: stream_tts_to_speaker's finally block must close + output_stream even on exception.""" + import ast as _ast + + with open("tools/tts_tool.py") as f: + tree = _ast.parse(f.read()) + + for node in _ast.walk(tree): + if isinstance(node, _ast.FunctionDef) and node.name == "stream_tts_to_speaker": + # Find the outermost try that has a finally with tts_done_event.set() + for child in _ast.walk(node): + if isinstance(child, _ast.Try) and child.finalbody: + finally_text = "\n".join( + _ast.dump(n) for n in child.finalbody + ) + if "tts_done_event" in finally_text: + assert "output_stream" in finally_text, ( + "finally block must close output_stream" + ) + return + pytest.fail("No finally block with tts_done_event found") + + +class TestCtrlCResetsContinuousMode: + """Bug #4: Ctrl+C cancel must reset _voice_continuous.""" + + def test_ctrl_c_handler_resets_voice_continuous(self): + """Source check: Ctrl+C voice cancel block must set + _voice_continuous = False.""" + with open("cli.py") as f: + source = f.read() + + # Find the Ctrl+C handler's voice cancel block + lines = source.split("\n") + in_cancel_block = False + found_continuous_reset = False + for i, line in enumerate(lines): + if "Cancel active voice recording" in line: + in_cancel_block = True + if in_cancel_block: + if "_voice_continuous = False" in line: + found_continuous_reset = True + break + # Block ends at next comment section or return + if "return" in line and in_cancel_block: + break + + assert found_continuous_reset, ( + "Ctrl+C voice cancel block must set _voice_continuous = False" + ) + + +class TestDisableVoiceModeStopsTTS: + """Bug #5: _disable_voice_mode must stop active TTS playback.""" + + def test_disable_voice_mode_calls_stop_playback(self): + """Source check: _disable_voice_mode must call stop_playback().""" + with open("cli.py") as f: + source = f.read() + + # Extract _disable_voice_mode method body + lines = source.split("\n") + in_method = False + method_lines = [] + for line in lines: + if "def _disable_voice_mode" in line: + in_method = True + elif in_method: + if line.strip() and not line.startswith(" ") and not line.startswith("\t"): + break + if line.strip().startswith("def "): + break + method_lines.append(line) + + method_body = "\n".join(method_lines) + assert "stop_playback" in method_body, ( + "_disable_voice_mode must call stop_playback()" + ) + assert "_voice_tts_done.set()" in method_body, ( + "_disable_voice_mode must set _voice_tts_done" + ) + + +class TestVoiceStatusUsesConfigKey: + """Bug #8: _show_voice_status must read record key from config.""" + + def test_show_voice_status_not_hardcoded(self): + """Source check: _show_voice_status must not hardcode Ctrl+B.""" + with open("cli.py") as f: + source = f.read() + + lines = source.split("\n") + in_method = False + for line in lines: + if "def _show_voice_status" in line: + in_method = True + elif in_method and line.strip().startswith("def "): + break + elif in_method: + assert 'Record key: Ctrl+B"' not in line, ( + "_show_voice_status hardcodes 'Ctrl+B' — " + "should read from config" + ) + + def test_show_voice_status_reads_config(self): + """Source check: _show_voice_status must use load_config().""" + with open("cli.py") as f: + source = f.read() + + lines = source.split("\n") + in_method = False + method_lines = [] + for line in lines: + if "def _show_voice_status" in line: + in_method = True + elif in_method and line.strip().startswith("def "): + break + elif in_method: + method_lines.append(line) + + method_body = "\n".join(method_lines) + assert "load_config" in method_body or "record_key" in method_body, ( + "_show_voice_status should read record_key from config" + ) + + +class TestChatTTSCleanupOnException: + """Bug #2: chat() must clean up streaming TTS resources on exception.""" + + def test_chat_has_finally_for_tts_cleanup(self): + """AST check: chat() method must have a finally block that cleans up + text_queue, stop_event, and tts_thread.""" + import ast as _ast + + with open("cli.py") as f: + tree = _ast.parse(f.read()) + + for node in _ast.walk(tree): + if isinstance(node, _ast.FunctionDef) and node.name == "chat": + # Find Try nodes with finally blocks + for child in _ast.walk(node): + if isinstance(child, _ast.Try) and child.finalbody: + finally_text = "\n".join( + _ast.dump(n) for n in child.finalbody + ) + if "text_queue" in finally_text: + assert "stop_event" in finally_text, ( + "finally must also handle stop_event" + ) + assert "tts_thread" in finally_text, ( + "finally must also handle tts_thread" + ) + return + pytest.fail( + "chat() must have a finally block cleaning up " + "text_queue/stop_event/tts_thread" + ) diff --git a/tools/tts_tool.py b/tools/tts_tool.py index 1a1642e0..286bb14b 100644 --- a/tools/tts_tool.py +++ b/tools/tts_tool.py @@ -161,10 +161,11 @@ async def _generate_edge_tts(text: str, output_path: str, tts_config: Dict[str, Returns: Path to the saved audio file. """ + _edge_tts = _import_edge_tts() edge_config = tts_config.get("edge", {}) voice = edge_config.get("voice", DEFAULT_EDGE_VOICE) - communicate = edge_tts.Communicate(text, voice) + communicate = _edge_tts.Communicate(text, voice) await communicate.save(output_path) return output_path @@ -667,17 +668,18 @@ def stream_tts_to_speaker( except queue.Empty: break - # Close the audio output stream + # output_stream is closed in the finally block below + + except Exception as exc: + logger.warning("Streaming TTS pipeline error: %s", exc) + finally: + # Always close the audio output stream to avoid locking the device if output_stream is not None: try: output_stream.stop() output_stream.close() except Exception: pass - - except Exception as exc: - logger.warning("Streaming TTS pipeline error: %s", exc) - finally: tts_done_event.set()