From 1a6fbef8a9c046ee2d45da8534663b64453b6502 Mon Sep 17 00:00:00 2001 From: 0xbyt4 <35742124+0xbyt4@users.noreply.github.com> Date: Tue, 3 Mar 2026 16:17:05 +0300 Subject: [PATCH] feat: add voice mode with push-to-talk and TTS output for CLI Implements Issue #314 Phase 2 & 3: - /voice command to toggle voice mode (on/off/tts/status) - Ctrl+Space push-to-talk recording via sounddevice - Whisper STT transcription via existing transcription_tools - Optional TTS response playback via existing tts_tool - Visual indicators in prompt (recording/transcribing/voice) - 21 unit tests, all mocked (no real mic/API) - Optional deps: sounddevice, numpy (pip install hermes-agent[voice]) --- cli.py | 278 +++++++++++++++++++++++++- hermes_cli/commands.py | 1 + hermes_cli/config.py | 6 + pyproject.toml | 2 + tests/tools/test_voice_mode.py | 347 +++++++++++++++++++++++++++++++++ tools/voice_mode.py | 344 ++++++++++++++++++++++++++++++++ 6 files changed, 977 insertions(+), 1 deletion(-) create mode 100644 tests/tools/test_voice_mode.py create mode 100644 tools/voice_mode.py diff --git a/cli.py b/cli.py index 253cdd085..b86e2bb82 100755 --- a/cli.py +++ b/cli.py @@ -3017,6 +3017,8 @@ class HermesCLI: self._handle_background_command(cmd_original) elif cmd_lower.startswith("/skin"): self._handle_skin_command(cmd_original) + elif cmd_lower.startswith("/voice"): + self._handle_voice_command(cmd_original) else: # Check for user-defined quick commands (bypass agent loop, no LLM call) base_cmd = cmd_lower.split()[0] @@ -3511,6 +3513,201 @@ class HermesCLI: except Exception as e: print(f" ❌ MCP reload failed: {e}") + # ==================================================================== + # Voice mode methods + # ==================================================================== + + def _voice_start_recording(self): + """Start capturing audio from the microphone.""" + from tools.voice_mode import AudioRecorder, check_voice_requirements + + reqs = check_voice_requirements() + if not 
reqs["audio_available"]: + raise RuntimeError( + "Voice mode requires sounddevice and numpy.\n" + "Install with: pip install sounddevice numpy\n" + "Or: pip install hermes-agent[voice]" + ) + if not reqs["stt_key_set"]: + raise RuntimeError( + "Voice mode requires VOICE_TOOLS_OPENAI_KEY for transcription.\n" + "Get one at: https://platform.openai.com/api-keys" + ) + + if self._voice_recorder is None: + self._voice_recorder = AudioRecorder() + + self._voice_recorder.start() + self._voice_recording = True + _cprint(f"\n{_GOLD}● Recording...{_RST} {_DIM}(Ctrl+Space to stop, Ctrl+C to cancel){_RST}") + + def _voice_stop_and_transcribe(self): + """Stop recording, transcribe via STT, and queue the transcript as input.""" + try: + if self._voice_recorder is None: + return + + wav_path = self._voice_recorder.stop() + self._voice_recording = False + + if wav_path is None: + _cprint(f"{_DIM}No speech detected (recording too short).{_RST}") + return + + self._voice_processing = True + if hasattr(self, '_app') and self._app: + self._app.invalidate() + _cprint(f"{_DIM}Transcribing...{_RST}") + + # Get STT model from config + stt_model = None + try: + from hermes_cli.config import load_config + stt_config = load_config().get("stt", {}) + stt_model = stt_config.get("model") + except Exception: + pass + + from tools.voice_mode import transcribe_recording + result = transcribe_recording(wav_path, model=stt_model) + + if result.get("success") and result.get("transcript", "").strip(): + transcript = result["transcript"].strip() + _cprint(f"\n{_GOLD}●{_RST} {_BOLD}{transcript}{_RST}") + self._pending_input.put(transcript) + elif result.get("success"): + _cprint(f"{_DIM}No speech detected.{_RST}") + else: + error = result.get("error", "Unknown error") + _cprint(f"\n{_DIM}Transcription failed: {error}{_RST}") + + except Exception as e: + _cprint(f"\n{_DIM}Voice processing error: {e}{_RST}") + finally: + self._voice_processing = False + if hasattr(self, '_app') and self._app: + 
self._app.invalidate() + # Clean up temp file + try: + if wav_path and os.path.isfile(wav_path): + os.unlink(wav_path) + except Exception: + pass + + def _voice_speak_response(self, text: str): + """Speak the agent's response aloud using TTS (runs in background thread).""" + if not self._voice_tts: + return + try: + from tools.tts_tool import text_to_speech_tool + from tools.voice_mode import play_audio_file + import json + + # Truncate to TTS limit + tts_text = text[:4000] if len(text) > 4000 else text + result_json = text_to_speech_tool(text=tts_text) + result = json.loads(result_json) + + if result.get("success") and result.get("file_path"): + play_audio_file(result["file_path"]) + except Exception as e: + logger.debug("Voice TTS playback failed: %s", e) + + def _handle_voice_command(self, command: str): + """Handle /voice [on|off|tts|status] command.""" + parts = command.strip().split(maxsplit=1) + subcommand = parts[1].lower().strip() if len(parts) > 1 else "" + + if subcommand == "on": + self._enable_voice_mode() + elif subcommand == "off": + self._disable_voice_mode() + elif subcommand == "tts": + self._toggle_voice_tts() + elif subcommand == "status": + self._show_voice_status() + elif subcommand == "": + # Toggle + if self._voice_mode: + self._disable_voice_mode() + else: + self._enable_voice_mode() + else: + print(f"Unknown voice subcommand: {subcommand}") + print("Usage: /voice [on|off|tts|status]") + + def _enable_voice_mode(self): + """Enable voice mode after checking requirements.""" + from tools.voice_mode import check_voice_requirements + + reqs = check_voice_requirements() + if not reqs["available"]: + _cprint(f"\n{_GOLD}Voice mode requirements not met:{_RST}") + for line in reqs["details"].split("\n"): + _cprint(f" {_DIM}{line}{_RST}") + if reqs["missing_packages"]: + _cprint(f"\n {_BOLD}Install: pip install {' '.join(reqs['missing_packages'])}{_RST}") + _cprint(f" {_DIM}Or: pip install hermes-agent[voice]{_RST}") + return + + self._voice_mode = 
True + + # Check config for auto_tts + try: + from hermes_cli.config import load_config + voice_config = load_config().get("voice", {}) + if voice_config.get("auto_tts", False): + self._voice_tts = True + except Exception: + pass + + tts_status = " (TTS enabled)" if self._voice_tts else "" + _cprint(f"\n{_GOLD}Voice mode enabled{tts_status}{_RST}") + _cprint(f" {_DIM}Ctrl+Space to start/stop recording{_RST}") + _cprint(f" {_DIM}/voice tts to toggle speech output{_RST}") + _cprint(f" {_DIM}/voice off to disable voice mode{_RST}") + + def _disable_voice_mode(self): + """Disable voice mode and cancel any active recording.""" + if self._voice_recording and self._voice_recorder: + self._voice_recorder.cancel() + self._voice_recording = False + + self._voice_mode = False + self._voice_tts = False + _cprint(f"\n{_DIM}Voice mode disabled.{_RST}") + + def _toggle_voice_tts(self): + """Toggle TTS output for voice mode.""" + if not self._voice_mode: + _cprint(f"{_DIM}Enable voice mode first: /voice on{_RST}") + return + + self._voice_tts = not self._voice_tts + status = "enabled" if self._voice_tts else "disabled" + + if self._voice_tts: + from tools.tts_tool import check_tts_requirements + if not check_tts_requirements(): + _cprint(f"{_DIM}Warning: No TTS provider available. 
Install edge-tts or set API keys.{_RST}") + + _cprint(f"{_GOLD}Voice TTS {status}.{_RST}") + + def _show_voice_status(self): + """Show current voice mode status.""" + from tools.voice_mode import check_voice_requirements + + reqs = check_voice_requirements() + + _cprint(f"\n{_BOLD}Voice Mode Status{_RST}") + _cprint(f" Mode: {'ON' if self._voice_mode else 'OFF'}") + _cprint(f" TTS: {'ON' if self._voice_tts else 'OFF'}") + _cprint(f" Recording: {'YES' if self._voice_recording else 'no'}") + _cprint(f" Record key: Ctrl+Space") + _cprint(f"\n {_BOLD}Requirements:{_RST}") + for line in reqs["details"].split("\n"): + _cprint(f" {line}") + def _clarify_callback(self, question, choices): """ Platform callback for the clarify tool. Called from the agent thread. @@ -3876,12 +4073,23 @@ class HermesCLI: padding=(1, 2), )) + + # Play terminal bell when agent finishes (if enabled). # Works over SSH — the bell propagates to the user's terminal. if self.bell_on_complete: sys.stdout.write("\a") sys.stdout.flush() - + + # Speak response aloud if voice TTS is enabled + if self._voice_tts and response: + threading.Thread( + target=self._voice_speak_response, + args=(response,), + daemon=True, + ).start() + + # Combine all interrupt messages (user may have typed multiple while waiting) # and re-queue as one prompt for process_loop if pending_message and hasattr(self, '_pending_input'): @@ -3964,6 +4172,10 @@ class HermesCLI: def _get_tui_prompt_fragments(self): """Return the prompt_toolkit fragments for the current interactive state.""" symbol, state_suffix = self._get_tui_prompt_symbols() + if self._voice_recording: + return [("class:voice-recording", f"● {state_suffix}")] + if self._voice_processing: + return [("class:voice-processing", f"◉ {state_suffix}")] if self._sudo_state: return [("class:sudo-prompt", f"🔐 {state_suffix}")] if self._secret_state: @@ -3978,6 +4190,8 @@ class HermesCLI: return [("class:prompt-working", f"{self._command_spinner_frame()} {state_suffix}")] if 
self._agent_running: return [("class:prompt-working", f"⚕ {state_suffix}")] + if self._voice_mode: + return [("class:voice-prompt", f"🎤 {state_suffix}")] return [("class:prompt", symbol)] def _get_tui_prompt_text(self) -> str: @@ -4070,6 +4284,13 @@ class HermesCLI: self._attached_images: list[Path] = [] self._image_counter = 0 + # Voice mode state + self._voice_mode = False # Whether voice mode is enabled + self._voice_tts = False # Whether TTS output is enabled + self._voice_recorder = None # AudioRecorder instance (lazy init) + self._voice_recording = False # Whether currently recording + self._voice_processing = False # Whether STT is in progress + # Register callbacks so terminal_tool prompts route through our UI set_sudo_password_callback(self._sudo_password_callback) set_approval_callback(self._approval_callback) @@ -4254,6 +4475,7 @@ class HermesCLI: """Handle Ctrl+C - cancel interactive prompts, interrupt agent, or exit. Priority: + 0. Cancel active voice recording 1. Cancel active sudo/approval/clarify prompt 2. Interrupt the running agent (first press) 3. 
Force exit (second press within 2s, or when idle) @@ -4261,6 +4483,14 @@ class HermesCLI: import time as _time now = _time.time() + # Cancel active voice recording + if cli_ref._voice_recording and cli_ref._voice_recorder: + cli_ref._voice_recorder.cancel() + cli_ref._voice_recording = False + _cprint(f"\n{_DIM}Recording cancelled.{_RST}") + event.app.invalidate() + return + # Cancel sudo prompt if self._sudo_state: self._sudo_state["response_queue"].put("") @@ -4367,6 +4597,30 @@ class HermesCLI: # No image found — show a hint pass # silent when no image (avoid noise on accidental press) + @kb.add('c-space') + def handle_ctrl_space(event): + """Toggle voice recording when voice mode is active.""" + if not cli_ref._voice_mode: + return + if cli_ref._agent_running: + return + # Block recording during interactive prompts + if cli_ref._clarify_state or cli_ref._sudo_state or cli_ref._approval_state: + return + if cli_ref._voice_recording: + cli_ref._voice_recording = False + event.app.invalidate() + threading.Thread( + target=cli_ref._voice_stop_and_transcribe, + daemon=True, + ).start() + else: + try: + cli_ref._voice_start_recording() + event.app.invalidate() + except Exception as e: + _cprint(f"\n{_DIM}Voice recording failed: {e}{_RST}") + # Dynamic prompt: shows Hermes symbol when agent is working, # or answer prompt when clarify freetext mode is active. cli_ref = self @@ -4460,6 +4714,10 @@ class HermesCLI: return Transformation(fragments=ti.fragments) def _get_placeholder(): + if cli_ref._voice_recording: + return "recording... Ctrl+Space to stop, Ctrl+C to cancel" + if cli_ref._voice_processing: + return "transcribing..." 
if cli_ref._sudo_state: return "type password (hidden), Enter to skip" if cli_ref._secret_state: @@ -4476,6 +4734,8 @@ class HermesCLI: return f"{frame} {status}" if cli_ref._agent_running: return "type a message + Enter to interrupt, Ctrl+C to cancel" + if cli_ref._voice_mode: + return "type or Ctrl+Space to record" return "" input_area.control.input_processors.append(_PlaceholderProcessor(_get_placeholder)) @@ -4869,6 +5129,10 @@ class HermesCLI: 'approval-cmd': '#AAAAAA italic', 'approval-choice': '#AAAAAA', 'approval-selected': '#FFD700 bold', + # Voice mode + 'voice-prompt': '#87CEEB', + 'voice-recording': '#FF4444 bold', + 'voice-processing': '#FFA500 italic', } style = PTStyle.from_dict(self._build_tui_style_dict()) @@ -4993,6 +5257,18 @@ class HermesCLI: self.agent.flush_memories(self.conversation_history) except Exception: pass + # Cancel active voice recording + if hasattr(self, '_voice_recorder') and self._voice_recorder and self._voice_recording: + try: + self._voice_recorder.cancel() + except Exception: + pass + # Clean up old temp voice recordings + try: + from tools.voice_mode import cleanup_temp_recordings + cleanup_temp_recordings() + except Exception: + pass # Unregister callbacks to avoid dangling references set_sudo_password_callback(None) set_approval_callback(None) diff --git a/hermes_cli/commands.py b/hermes_cli/commands.py index 57899cf08..a9a1a67ba 100644 --- a/hermes_cli/commands.py +++ b/hermes_cli/commands.py @@ -37,6 +37,7 @@ COMMANDS_BY_CATEGORY = { "/verbose": "Cycle tool progress display: off → new → all → verbose", "/reasoning": "Manage reasoning effort and display (usage: /reasoning [level|show|hide])", "/skin": "Show or change the display skin/theme", + "/voice": "Toggle voice mode (Ctrl+Space to record). 
Usage: /voice [on|off|tts|status]", }, "Tools & Skills": { "/tools": "List available tools", diff --git a/hermes_cli/config.py b/hermes_cli/config.py index 02edad1fa..dbbe41c10 100644 --- a/hermes_cli/config.py +++ b/hermes_cli/config.py @@ -202,6 +202,12 @@ DEFAULT_CONFIG = { "model": "whisper-1", # whisper-1, gpt-4o-mini-transcribe, gpt-4o-transcribe }, }, + + "voice": { + "record_key": "ctrl+space", + "max_recording_seconds": 120, + "auto_tts": False, + }, "human_delay": { "mode": "off", diff --git a/pyproject.toml b/pyproject.toml index 7e4197724..eb005ab94 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,6 +48,7 @@ cron = ["croniter"] slack = ["slack-bolt>=1.18.0", "slack-sdk>=3.27.0"] cli = ["simple-term-menu"] tts-premium = ["elevenlabs"] +voice = ["sounddevice>=0.4.6", "numpy>=1.24.0"] pty = [ "ptyprocess>=0.7.0; sys_platform != 'win32'", "pywinpty>=2.0.0; sys_platform == 'win32'", @@ -78,6 +79,7 @@ all = [ "hermes-agent[mcp]", "hermes-agent[homeassistant]", "hermes-agent[acp]", + "hermes-agent[voice]", ] [project.scripts] diff --git a/tests/tools/test_voice_mode.py b/tests/tools/test_voice_mode.py new file mode 100644 index 000000000..fe841f5cb --- /dev/null +++ b/tests/tools/test_voice_mode.py @@ -0,0 +1,347 @@ +"""Tests for tools.voice_mode -- all mocked, no real microphone or API calls.""" + +import os +import struct +import time +import wave +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + + +# ============================================================================ +# Fixtures +# ============================================================================ + +@pytest.fixture +def sample_wav(tmp_path): + """Create a minimal valid WAV file (1 second of silence at 16kHz).""" + wav_path = tmp_path / "test.wav" + n_frames = 16000 # 1 second at 16kHz + silence = struct.pack(f"<{n_frames}h", *([0] * n_frames)) + + with wave.open(str(wav_path), "wb") as wf: + wf.setnchannels(1) + wf.setsampwidth(2) + 
wf.setframerate(16000) + wf.writeframes(silence) + + return str(wav_path) + + +@pytest.fixture +def temp_voice_dir(tmp_path, monkeypatch): + """Redirect _TEMP_DIR to a temporary path.""" + voice_dir = tmp_path / "hermes_voice" + voice_dir.mkdir() + monkeypatch.setattr("tools.voice_mode._TEMP_DIR", str(voice_dir)) + return voice_dir + + +@pytest.fixture +def mock_sd(monkeypatch): + """Replace tools.voice_mode.sd with a MagicMock (sounddevice may not be installed).""" + mock = MagicMock() + monkeypatch.setattr("tools.voice_mode.sd", mock) + monkeypatch.setattr("tools.voice_mode._HAS_AUDIO", True) + # Also ensure numpy is available (use real numpy if installed, else mock) + try: + import numpy as real_np + monkeypatch.setattr("tools.voice_mode.np", real_np) + except ImportError: + monkeypatch.setattr("tools.voice_mode.np", MagicMock()) + return mock + + +# ============================================================================ +# check_voice_requirements +# ============================================================================ + +class TestCheckVoiceRequirements: + def test_all_requirements_met(self, monkeypatch): + monkeypatch.setattr("tools.voice_mode._HAS_AUDIO", True) + monkeypatch.setenv("VOICE_TOOLS_OPENAI_KEY", "sk-test-key") + + from tools.voice_mode import check_voice_requirements + + result = check_voice_requirements() + assert result["available"] is True + assert result["audio_available"] is True + assert result["stt_key_set"] is True + assert result["missing_packages"] == [] + + def test_missing_audio_packages(self, monkeypatch): + monkeypatch.setattr("tools.voice_mode._HAS_AUDIO", False) + monkeypatch.setenv("VOICE_TOOLS_OPENAI_KEY", "sk-test-key") + + from tools.voice_mode import check_voice_requirements + + result = check_voice_requirements() + assert result["available"] is False + assert result["audio_available"] is False + assert "sounddevice" in result["missing_packages"] + assert "numpy" in result["missing_packages"] + + def 
test_missing_stt_key(self, monkeypatch): + monkeypatch.setattr("tools.voice_mode._HAS_AUDIO", True) + monkeypatch.delenv("VOICE_TOOLS_OPENAI_KEY", raising=False) + + from tools.voice_mode import check_voice_requirements + + result = check_voice_requirements() + assert result["available"] is False + assert result["stt_key_set"] is False + assert "STT API key: MISSING" in result["details"] + + +# ============================================================================ +# AudioRecorder +# ============================================================================ + +class TestAudioRecorderStart: + def test_start_raises_without_audio(self, monkeypatch): + monkeypatch.setattr("tools.voice_mode._HAS_AUDIO", False) + + from tools.voice_mode import AudioRecorder + + recorder = AudioRecorder() + with pytest.raises(RuntimeError, match="sounddevice and numpy"): + recorder.start() + + def test_start_creates_and_starts_stream(self, mock_sd): + mock_stream = MagicMock() + mock_sd.InputStream.return_value = mock_stream + + from tools.voice_mode import AudioRecorder + + recorder = AudioRecorder() + recorder.start() + + assert recorder.is_recording is True + mock_sd.InputStream.assert_called_once() + mock_stream.start.assert_called_once() + + def test_double_start_is_noop(self, mock_sd): + mock_stream = MagicMock() + mock_sd.InputStream.return_value = mock_stream + + from tools.voice_mode import AudioRecorder + + recorder = AudioRecorder() + recorder.start() + recorder.start() # second call should be noop + + assert mock_sd.InputStream.call_count == 1 + + +class TestAudioRecorderStop: + def test_stop_returns_none_when_not_recording(self): + from tools.voice_mode import AudioRecorder + + recorder = AudioRecorder() + assert recorder.stop() is None + + def test_stop_writes_wav_file(self, mock_sd, temp_voice_dir): + np = pytest.importorskip("numpy") + + mock_stream = MagicMock() + mock_sd.InputStream.return_value = mock_stream + + from tools.voice_mode import AudioRecorder, 
SAMPLE_RATE + + recorder = AudioRecorder() + recorder.start() + + # Simulate captured audio frames (1 second of silence) + frame = np.zeros((SAMPLE_RATE, 1), dtype="int16") + recorder._frames = [frame] + + wav_path = recorder.stop() + + assert wav_path is not None + assert os.path.isfile(wav_path) + assert wav_path.endswith(".wav") + assert recorder.is_recording is False + + # Verify it is a valid WAV + with wave.open(wav_path, "rb") as wf: + assert wf.getnchannels() == 1 + assert wf.getsampwidth() == 2 + assert wf.getframerate() == SAMPLE_RATE + + def test_stop_returns_none_for_very_short_recording(self, mock_sd, temp_voice_dir): + np = pytest.importorskip("numpy") + + mock_stream = MagicMock() + mock_sd.InputStream.return_value = mock_stream + + from tools.voice_mode import AudioRecorder + + recorder = AudioRecorder() + recorder.start() + + # Very short recording (100 samples = ~6ms at 16kHz) + frame = np.zeros((100, 1), dtype="int16") + recorder._frames = [frame] + + wav_path = recorder.stop() + assert wav_path is None + + +class TestAudioRecorderCancel: + def test_cancel_discards_frames(self, mock_sd): + mock_stream = MagicMock() + mock_sd.InputStream.return_value = mock_stream + + from tools.voice_mode import AudioRecorder + + recorder = AudioRecorder() + recorder.start() + recorder._frames = [MagicMock()] # simulate captured data + + recorder.cancel() + + assert recorder.is_recording is False + assert recorder._frames == [] + mock_stream.stop.assert_called_once() + mock_stream.close.assert_called_once() + + def test_cancel_when_not_recording_is_safe(self): + from tools.voice_mode import AudioRecorder + + recorder = AudioRecorder() + recorder.cancel() # should not raise + assert recorder.is_recording is False + + +class TestAudioRecorderProperties: + def test_elapsed_seconds_when_not_recording(self): + from tools.voice_mode import AudioRecorder + + recorder = AudioRecorder() + assert recorder.elapsed_seconds == 0.0 + + def 
test_elapsed_seconds_when_recording(self, mock_sd): + mock_stream = MagicMock() + mock_sd.InputStream.return_value = mock_stream + + from tools.voice_mode import AudioRecorder + + recorder = AudioRecorder() + recorder.start() + + # Force start time to 1 second ago + recorder._start_time = time.monotonic() - 1.0 + elapsed = recorder.elapsed_seconds + assert 0.9 < elapsed < 2.0 + + recorder.cancel() + + +# ============================================================================ +# transcribe_recording +# ============================================================================ + +class TestTranscribeRecording: + def test_delegates_to_transcribe_audio(self): + mock_transcribe = MagicMock(return_value={ + "success": True, + "transcript": "hello world", + }) + + with patch("tools.transcription_tools.transcribe_audio", mock_transcribe): + from tools.voice_mode import transcribe_recording + result = transcribe_recording("/tmp/test.wav", model="whisper-1") + + assert result["success"] is True + assert result["transcript"] == "hello world" + mock_transcribe.assert_called_once_with("/tmp/test.wav", model="whisper-1") + + +# ============================================================================ +# play_audio_file +# ============================================================================ + +class TestPlayAudioFile: + def test_play_wav_via_sounddevice(self, monkeypatch, sample_wav): + np = pytest.importorskip("numpy") + + mock_sd = MagicMock() + monkeypatch.setattr("tools.voice_mode.sd", mock_sd) + monkeypatch.setattr("tools.voice_mode._HAS_AUDIO", True) + monkeypatch.setattr("tools.voice_mode.np", np) + + from tools.voice_mode import play_audio_file + + result = play_audio_file(sample_wav) + + assert result is True + mock_sd.play.assert_called_once() + mock_sd.wait.assert_called_once() + + def test_returns_false_when_no_player(self, monkeypatch, sample_wav): + monkeypatch.setattr("tools.voice_mode._HAS_AUDIO", False) + monkeypatch.setattr("shutil.which", 
lambda _: None) + + from tools.voice_mode import play_audio_file + + result = play_audio_file(sample_wav) + assert result is False + + def test_returns_false_for_missing_file(self): + from tools.voice_mode import play_audio_file + + result = play_audio_file("/nonexistent/file.wav") + assert result is False + + +# ============================================================================ +# cleanup_temp_recordings +# ============================================================================ + +class TestCleanupTempRecordings: + def test_old_files_deleted(self, temp_voice_dir): + # Create an "old" file + old_file = temp_voice_dir / "recording_20240101_000000.wav" + old_file.write_bytes(b"\x00" * 100) + # Set mtime to 2 hours ago + old_mtime = time.time() - 7200 + os.utime(str(old_file), (old_mtime, old_mtime)) + + from tools.voice_mode import cleanup_temp_recordings + + deleted = cleanup_temp_recordings(max_age_seconds=3600) + assert deleted == 1 + assert not old_file.exists() + + def test_recent_files_preserved(self, temp_voice_dir): + # Create a "recent" file + recent_file = temp_voice_dir / "recording_20260303_120000.wav" + recent_file.write_bytes(b"\x00" * 100) + + from tools.voice_mode import cleanup_temp_recordings + + deleted = cleanup_temp_recordings(max_age_seconds=3600) + assert deleted == 0 + assert recent_file.exists() + + def test_nonexistent_dir_returns_zero(self, monkeypatch): + monkeypatch.setattr("tools.voice_mode._TEMP_DIR", "/nonexistent/dir") + + from tools.voice_mode import cleanup_temp_recordings + + assert cleanup_temp_recordings() == 0 + + def test_non_recording_files_ignored(self, temp_voice_dir): + # Create a file that doesn't match the pattern + other_file = temp_voice_dir / "other_file.txt" + other_file.write_bytes(b"\x00" * 100) + old_mtime = time.time() - 7200 + os.utime(str(other_file), (old_mtime, old_mtime)) + + from tools.voice_mode import cleanup_temp_recordings + + deleted = cleanup_temp_recordings(max_age_seconds=3600) + 
assert deleted == 0 + assert other_file.exists() diff --git a/tools/voice_mode.py b/tools/voice_mode.py new file mode 100644 index 000000000..213802013 --- /dev/null +++ b/tools/voice_mode.py @@ -0,0 +1,344 @@ +"""Voice Mode -- Push-to-talk audio recording and playback for the CLI. + +Provides audio capture via sounddevice, WAV encoding via stdlib wave, +STT dispatch via tools.transcription_tools, and TTS playback via +sounddevice or system audio players. + +Dependencies (optional): + pip install sounddevice numpy + or: pip install hermes-agent[voice] +""" + +import logging +import os +import platform +import shutil +import subprocess +import tempfile +import threading +import time +import wave +from pathlib import Path +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Optional imports with graceful degradation +# --------------------------------------------------------------------------- +try: + import sounddevice as sd + import numpy as np + + _HAS_AUDIO = True +except ImportError: + sd = None # type: ignore[assignment] + np = None # type: ignore[assignment] + _HAS_AUDIO = False + +# --------------------------------------------------------------------------- +# Recording parameters +# --------------------------------------------------------------------------- +SAMPLE_RATE = 16000 # Whisper native rate +CHANNELS = 1 # Mono +DTYPE = "int16" # 16-bit PCM +SAMPLE_WIDTH = 2 # bytes per sample (int16) +MAX_RECORDING_SECONDS = 120 # Safety cap + +# Temp directory for voice recordings +_TEMP_DIR = os.path.join(tempfile.gettempdir(), "hermes_voice") + + +# ============================================================================ +# AudioRecorder +# ============================================================================ +class AudioRecorder: + """Thread-safe audio recorder using sounddevice.InputStream. 
+ + Usage:: + + recorder = AudioRecorder() + recorder.start() + # ... user speaks ... + wav_path = recorder.stop() # returns path to WAV file + # or + recorder.cancel() # discard without saving + """ + + def __init__(self) -> None: + self._lock = threading.Lock() + self._stream: Any = None + self._frames: List[Any] = [] + self._recording = False + self._start_time: float = 0.0 + + # -- public properties --------------------------------------------------- + + @property + def is_recording(self) -> bool: + return self._recording + + @property + def elapsed_seconds(self) -> float: + if not self._recording: + return 0.0 + return time.monotonic() - self._start_time + + # -- public methods ------------------------------------------------------ + + def start(self) -> None: + """Start capturing audio from the default input device. + + Raises ``RuntimeError`` if sounddevice/numpy are not installed. + Calling it while a recording is already in progress is a no-op. + """ + if not _HAS_AUDIO: + raise RuntimeError( + "Voice mode requires sounddevice and numpy.\n" + "Install with: pip install sounddevice numpy\n" + "Or: pip install hermes-agent[voice]" + ) + + with self._lock: + if self._recording: + return # already recording + + self._frames = [] + self._start_time = time.monotonic() + + def _callback(indata, frames, time_info, status): # noqa: ARG001 + if status: + logger.debug("sounddevice status: %s", status) + self._frames.append(indata.copy()) + + self._stream = sd.InputStream( + samplerate=SAMPLE_RATE, + channels=CHANNELS, + dtype=DTYPE, + callback=_callback, + ) + self._stream.start() + self._recording = True + logger.info("Voice recording started (rate=%d, channels=%d)", SAMPLE_RATE, CHANNELS) + + def stop(self) -> Optional[str]: + """Stop recording and write captured audio to a WAV file. + + Returns: + Path to the WAV file, or ``None`` if no audio was captured. 
+ """ + with self._lock: + if not self._recording: + return None + + self._recording = False + + if self._stream is not None: + try: + self._stream.stop() + self._stream.close() + except Exception: + pass + self._stream = None + + if not self._frames: + return None + + # Concatenate frames and write WAV + audio_data = np.concatenate(self._frames, axis=0) + self._frames = [] + + elapsed = time.monotonic() - self._start_time + logger.info("Voice recording stopped (%.1fs, %d samples)", elapsed, len(audio_data)) + + # Skip very short recordings (< 0.3s of audio) + min_samples = int(SAMPLE_RATE * 0.3) + if len(audio_data) < min_samples: + logger.debug("Recording too short (%d samples), discarding", len(audio_data)) + return None + + return self._write_wav(audio_data) + + def cancel(self) -> None: + """Stop recording and discard all captured audio.""" + with self._lock: + self._recording = False + self._frames = [] + + if self._stream is not None: + try: + self._stream.stop() + self._stream.close() + except Exception: + pass + self._stream = None + + logger.info("Voice recording cancelled") + + # -- private helpers ----------------------------------------------------- + + @staticmethod + def _write_wav(audio_data) -> str: + """Write numpy int16 audio data to a WAV file. + + Returns the file path. 
+ """ + os.makedirs(_TEMP_DIR, exist_ok=True) + timestamp = time.strftime("%Y%m%d_%H%M%S") + wav_path = os.path.join(_TEMP_DIR, f"recording_{timestamp}.wav") + + with wave.open(wav_path, "wb") as wf: + wf.setnchannels(CHANNELS) + wf.setsampwidth(SAMPLE_WIDTH) + wf.setframerate(SAMPLE_RATE) + wf.writeframes(audio_data.tobytes()) + + file_size = os.path.getsize(wav_path) + logger.info("WAV written: %s (%d bytes)", wav_path, file_size) + return wav_path + + +# ============================================================================ +# STT dispatch +# ============================================================================ +def transcribe_recording(wav_path: str, model: Optional[str] = None) -> Dict[str, Any]: + """Transcribe a WAV recording using the existing Whisper pipeline. + + Delegates to ``tools.transcription_tools.transcribe_audio()``. + + Args: + wav_path: Path to the WAV file. + model: Whisper model name (default: from config or ``whisper-1``). + + Returns: + Dict with ``success``, ``transcript``, and optionally ``error``. + """ + from tools.transcription_tools import transcribe_audio + + return transcribe_audio(wav_path, model=model) + + +# ============================================================================ +# Audio playback +# ============================================================================ +def play_audio_file(file_path: str) -> bool: + """Play an audio file through the default output device. + + Strategy: + 1. WAV files via ``sounddevice.play()`` when available. + 2. System commands: ``afplay`` (macOS), ``ffplay`` (cross-platform), + ``aplay`` (Linux ALSA). + + Returns: + ``True`` if playback succeeded, ``False`` otherwise. 
+ """ + if not os.path.isfile(file_path): + logger.warning("Audio file not found: %s", file_path) + return False + + # Try sounddevice for WAV files + if _HAS_AUDIO and file_path.endswith(".wav"): + try: + with wave.open(file_path, "rb") as wf: + frames = wf.readframes(wf.getnframes()) + audio_data = np.frombuffer(frames, dtype=np.int16) + sample_rate = wf.getframerate() + + sd.play(audio_data, samplerate=sample_rate) + sd.wait() + return True + except Exception as e: + logger.debug("sounddevice playback failed: %s", e) + + # Fall back to system audio players + system = platform.system() + players = [] + + if system == "Darwin": + players.append(["afplay", file_path]) + players.append(["ffplay", "-nodisp", "-autoexit", "-loglevel", "quiet", file_path]) + if system == "Linux": + players.append(["aplay", "-q", file_path]) + + for cmd in players: + exe = shutil.which(cmd[0]) + if exe: + try: + subprocess.run(cmd, capture_output=True, timeout=300) + return True + except Exception as e: + logger.debug("System player %s failed: %s", cmd[0], e) + + logger.warning("No audio player available for %s", file_path) + return False + + +# ============================================================================ +# Requirements check +# ============================================================================ +def check_voice_requirements() -> Dict[str, Any]: + """Check if all voice mode requirements are met. + + Returns: + Dict with ``available``, ``audio_available``, ``stt_key_set``, + ``missing_packages``, and ``details``. 
+ """ + stt_key_set = bool(os.getenv("VOICE_TOOLS_OPENAI_KEY")) + missing: List[str] = [] + + if not _HAS_AUDIO: + missing.extend(["sounddevice", "numpy"]) + + available = _HAS_AUDIO and stt_key_set + details_parts = [] + + if _HAS_AUDIO: + details_parts.append("Audio capture: OK") + else: + details_parts.append("Audio capture: MISSING (pip install sounddevice numpy)") + + if stt_key_set: + details_parts.append("STT API key: OK") + else: + details_parts.append("STT API key: MISSING (set VOICE_TOOLS_OPENAI_KEY)") + + return { + "available": available, + "audio_available": _HAS_AUDIO, + "stt_key_set": stt_key_set, + "missing_packages": missing, + "details": "\n".join(details_parts), + } + + +# ============================================================================ +# Temp file cleanup +# ============================================================================ +def cleanup_temp_recordings(max_age_seconds: int = 3600) -> int: + """Remove old temporary voice recording files. + + Args: + max_age_seconds: Delete files older than this (default: 1 hour). + + Returns: + Number of files deleted. + """ + if not os.path.isdir(_TEMP_DIR): + return 0 + + deleted = 0 + now = time.time() + + for entry in os.scandir(_TEMP_DIR): + if entry.is_file() and entry.name.startswith("recording_") and entry.name.endswith(".wav"): + try: + age = now - entry.stat().st_mtime + if age > max_age_seconds: + os.unlink(entry.path) + deleted += 1 + except OSError: + pass + + if deleted: + logger.debug("Cleaned up %d old voice recordings", deleted) + return deleted