feat: add voice mode with push-to-talk and TTS output for CLI

Implements Issue #314 Phase 2 & 3:
- /voice command to toggle voice mode (on/off/tts/status)
- Ctrl+Space push-to-talk recording via sounddevice
- Whisper STT transcription via existing transcription_tools
- Optional TTS response playback via existing tts_tool
- Visual indicators in prompt (recording/transcribing/voice)
- 21 unit tests, all mocked (no real mic/API)
- Optional deps: sounddevice, numpy (pip install hermes-agent[voice])
This commit is contained in:
0xbyt4
2026-03-03 16:17:05 +03:00
parent cf3dceafe1
commit 1a6fbef8a9
6 changed files with 977 additions and 1 deletions

276
cli.py
View File

@@ -3017,6 +3017,8 @@ class HermesCLI:
self._handle_background_command(cmd_original)
elif cmd_lower.startswith("/skin"):
self._handle_skin_command(cmd_original)
elif cmd_lower.startswith("/voice"):
self._handle_voice_command(cmd_original)
else:
# Check for user-defined quick commands (bypass agent loop, no LLM call)
base_cmd = cmd_lower.split()[0]
@@ -3511,6 +3513,201 @@ class HermesCLI:
except Exception as e:
print(f" ❌ MCP reload failed: {e}")
# ====================================================================
# Voice mode methods
# ====================================================================
def _voice_start_recording(self):
"""Start capturing audio from the microphone."""
from tools.voice_mode import AudioRecorder, check_voice_requirements
reqs = check_voice_requirements()
if not reqs["audio_available"]:
raise RuntimeError(
"Voice mode requires sounddevice and numpy.\n"
"Install with: pip install sounddevice numpy\n"
"Or: pip install hermes-agent[voice]"
)
if not reqs["stt_key_set"]:
raise RuntimeError(
"Voice mode requires VOICE_TOOLS_OPENAI_KEY for transcription.\n"
"Get one at: https://platform.openai.com/api-keys"
)
if self._voice_recorder is None:
self._voice_recorder = AudioRecorder()
self._voice_recorder.start()
self._voice_recording = True
_cprint(f"\n{_GOLD}● Recording...{_RST} {_DIM}(Ctrl+Space to stop, Ctrl+C to cancel){_RST}")
def _voice_stop_and_transcribe(self):
"""Stop recording, transcribe via STT, and queue the transcript as input."""
try:
if self._voice_recorder is None:
return
wav_path = self._voice_recorder.stop()
self._voice_recording = False
if wav_path is None:
_cprint(f"{_DIM}No speech detected (recording too short).{_RST}")
return
self._voice_processing = True
if hasattr(self, '_app') and self._app:
self._app.invalidate()
_cprint(f"{_DIM}Transcribing...{_RST}")
# Get STT model from config
stt_model = None
try:
from hermes_cli.config import load_config
stt_config = load_config().get("stt", {})
stt_model = stt_config.get("model")
except Exception:
pass
from tools.voice_mode import transcribe_recording
result = transcribe_recording(wav_path, model=stt_model)
if result.get("success") and result.get("transcript", "").strip():
transcript = result["transcript"].strip()
_cprint(f"\n{_GOLD}{_RST} {_BOLD}{transcript}{_RST}")
self._pending_input.put(transcript)
elif result.get("success"):
_cprint(f"{_DIM}No speech detected.{_RST}")
else:
error = result.get("error", "Unknown error")
_cprint(f"\n{_DIM}Transcription failed: {error}{_RST}")
except Exception as e:
_cprint(f"\n{_DIM}Voice processing error: {e}{_RST}")
finally:
self._voice_processing = False
if hasattr(self, '_app') and self._app:
self._app.invalidate()
# Clean up temp file
try:
if wav_path and os.path.isfile(wav_path):
os.unlink(wav_path)
except Exception:
pass
def _voice_speak_response(self, text: str):
"""Speak the agent's response aloud using TTS (runs in background thread)."""
if not self._voice_tts:
return
try:
from tools.tts_tool import text_to_speech_tool
from tools.voice_mode import play_audio_file
import json
# Truncate to TTS limit
tts_text = text[:4000] if len(text) > 4000 else text
result_json = text_to_speech_tool(text=tts_text)
result = json.loads(result_json)
if result.get("success") and result.get("file_path"):
play_audio_file(result["file_path"])
except Exception as e:
logger.debug("Voice TTS playback failed: %s", e)
def _handle_voice_command(self, command: str):
"""Handle /voice [on|off|tts|status] command."""
parts = command.strip().split(maxsplit=1)
subcommand = parts[1].lower().strip() if len(parts) > 1 else ""
if subcommand == "on":
self._enable_voice_mode()
elif subcommand == "off":
self._disable_voice_mode()
elif subcommand == "tts":
self._toggle_voice_tts()
elif subcommand == "status":
self._show_voice_status()
elif subcommand == "":
# Toggle
if self._voice_mode:
self._disable_voice_mode()
else:
self._enable_voice_mode()
else:
print(f"Unknown voice subcommand: {subcommand}")
print("Usage: /voice [on|off|tts|status]")
def _enable_voice_mode(self):
"""Enable voice mode after checking requirements."""
from tools.voice_mode import check_voice_requirements
reqs = check_voice_requirements()
if not reqs["available"]:
_cprint(f"\n{_GOLD}Voice mode requirements not met:{_RST}")
for line in reqs["details"].split("\n"):
_cprint(f" {_DIM}{line}{_RST}")
if reqs["missing_packages"]:
_cprint(f"\n {_BOLD}Install: pip install {' '.join(reqs['missing_packages'])}{_RST}")
_cprint(f" {_DIM}Or: pip install hermes-agent[voice]{_RST}")
return
self._voice_mode = True
# Check config for auto_tts
try:
from hermes_cli.config import load_config
voice_config = load_config().get("voice", {})
if voice_config.get("auto_tts", False):
self._voice_tts = True
except Exception:
pass
tts_status = " (TTS enabled)" if self._voice_tts else ""
_cprint(f"\n{_GOLD}Voice mode enabled{tts_status}{_RST}")
_cprint(f" {_DIM}Ctrl+Space to start/stop recording{_RST}")
_cprint(f" {_DIM}/voice tts to toggle speech output{_RST}")
_cprint(f" {_DIM}/voice off to disable voice mode{_RST}")
def _disable_voice_mode(self):
"""Disable voice mode and cancel any active recording."""
if self._voice_recording and self._voice_recorder:
self._voice_recorder.cancel()
self._voice_recording = False
self._voice_mode = False
self._voice_tts = False
_cprint(f"\n{_DIM}Voice mode disabled.{_RST}")
def _toggle_voice_tts(self):
"""Toggle TTS output for voice mode."""
if not self._voice_mode:
_cprint(f"{_DIM}Enable voice mode first: /voice on{_RST}")
return
self._voice_tts = not self._voice_tts
status = "enabled" if self._voice_tts else "disabled"
if self._voice_tts:
from tools.tts_tool import check_tts_requirements
if not check_tts_requirements():
_cprint(f"{_DIM}Warning: No TTS provider available. Install edge-tts or set API keys.{_RST}")
_cprint(f"{_GOLD}Voice TTS {status}.{_RST}")
def _show_voice_status(self):
"""Show current voice mode status."""
from tools.voice_mode import check_voice_requirements
reqs = check_voice_requirements()
_cprint(f"\n{_BOLD}Voice Mode Status{_RST}")
_cprint(f" Mode: {'ON' if self._voice_mode else 'OFF'}")
_cprint(f" TTS: {'ON' if self._voice_tts else 'OFF'}")
_cprint(f" Recording: {'YES' if self._voice_recording else 'no'}")
_cprint(f" Record key: Ctrl+Space")
_cprint(f"\n {_BOLD}Requirements:{_RST}")
for line in reqs["details"].split("\n"):
_cprint(f" {line}")
def _clarify_callback(self, question, choices):
"""
Platform callback for the clarify tool. Called from the agent thread.
@@ -3876,12 +4073,23 @@ class HermesCLI:
padding=(1, 2),
))
# Play terminal bell when agent finishes (if enabled).
# Works over SSH — the bell propagates to the user's terminal.
if self.bell_on_complete:
sys.stdout.write("\a")
sys.stdout.flush()
# Speak response aloud if voice TTS is enabled
if self._voice_tts and response:
threading.Thread(
target=self._voice_speak_response,
args=(response,),
daemon=True,
).start()
# Combine all interrupt messages (user may have typed multiple while waiting)
# and re-queue as one prompt for process_loop
if pending_message and hasattr(self, '_pending_input'):
@@ -3964,6 +4172,10 @@ class HermesCLI:
def _get_tui_prompt_fragments(self):
"""Return the prompt_toolkit fragments for the current interactive state."""
symbol, state_suffix = self._get_tui_prompt_symbols()
if self._voice_recording:
return [("class:voice-recording", f"{state_suffix}")]
if self._voice_processing:
return [("class:voice-processing", f"{state_suffix}")]
if self._sudo_state:
return [("class:sudo-prompt", f"🔐 {state_suffix}")]
if self._secret_state:
@@ -3978,6 +4190,8 @@ class HermesCLI:
return [("class:prompt-working", f"{self._command_spinner_frame()} {state_suffix}")]
if self._agent_running:
return [("class:prompt-working", f"{state_suffix}")]
if self._voice_mode:
return [("class:voice-prompt", f"🎤 {state_suffix}")]
return [("class:prompt", symbol)]
def _get_tui_prompt_text(self) -> str:
@@ -4070,6 +4284,13 @@ class HermesCLI:
self._attached_images: list[Path] = []
self._image_counter = 0
# Voice mode state
self._voice_mode = False # Whether voice mode is enabled
self._voice_tts = False # Whether TTS output is enabled
self._voice_recorder = None # AudioRecorder instance (lazy init)
self._voice_recording = False # Whether currently recording
self._voice_processing = False # Whether STT is in progress
# Register callbacks so terminal_tool prompts route through our UI
set_sudo_password_callback(self._sudo_password_callback)
set_approval_callback(self._approval_callback)
@@ -4254,6 +4475,7 @@ class HermesCLI:
"""Handle Ctrl+C - cancel interactive prompts, interrupt agent, or exit.
Priority:
0. Cancel active voice recording
1. Cancel active sudo/approval/clarify prompt
2. Interrupt the running agent (first press)
3. Force exit (second press within 2s, or when idle)
@@ -4261,6 +4483,14 @@ class HermesCLI:
import time as _time
now = _time.time()
# Cancel active voice recording
if cli_ref._voice_recording and cli_ref._voice_recorder:
cli_ref._voice_recorder.cancel()
cli_ref._voice_recording = False
_cprint(f"\n{_DIM}Recording cancelled.{_RST}")
event.app.invalidate()
return
# Cancel sudo prompt
if self._sudo_state:
self._sudo_state["response_queue"].put("")
@@ -4367,6 +4597,30 @@ class HermesCLI:
# No image found — show a hint
pass # silent when no image (avoid noise on accidental press)
@kb.add('c-space')
def handle_ctrl_space(event):
"""Toggle voice recording when voice mode is active."""
if not cli_ref._voice_mode:
return
if cli_ref._agent_running:
return
# Block recording during interactive prompts
if cli_ref._clarify_state or cli_ref._sudo_state or cli_ref._approval_state:
return
if cli_ref._voice_recording:
cli_ref._voice_recording = False
event.app.invalidate()
threading.Thread(
target=cli_ref._voice_stop_and_transcribe,
daemon=True,
).start()
else:
try:
cli_ref._voice_start_recording()
event.app.invalidate()
except Exception as e:
_cprint(f"\n{_DIM}Voice recording failed: {e}{_RST}")
# Dynamic prompt: shows Hermes symbol when agent is working,
# or answer prompt when clarify freetext mode is active.
cli_ref = self
@@ -4460,6 +4714,10 @@ class HermesCLI:
return Transformation(fragments=ti.fragments)
def _get_placeholder():
if cli_ref._voice_recording:
return "recording... Ctrl+Space to stop, Ctrl+C to cancel"
if cli_ref._voice_processing:
return "transcribing..."
if cli_ref._sudo_state:
return "type password (hidden), Enter to skip"
if cli_ref._secret_state:
@@ -4476,6 +4734,8 @@ class HermesCLI:
return f"{frame} {status}"
if cli_ref._agent_running:
return "type a message + Enter to interrupt, Ctrl+C to cancel"
if cli_ref._voice_mode:
return "type or Ctrl+Space to record"
return ""
input_area.control.input_processors.append(_PlaceholderProcessor(_get_placeholder))
@@ -4869,6 +5129,10 @@ class HermesCLI:
'approval-cmd': '#AAAAAA italic',
'approval-choice': '#AAAAAA',
'approval-selected': '#FFD700 bold',
# Voice mode
'voice-prompt': '#87CEEB',
'voice-recording': '#FF4444 bold',
'voice-processing': '#FFA500 italic',
}
style = PTStyle.from_dict(self._build_tui_style_dict())
@@ -4993,6 +5257,18 @@ class HermesCLI:
self.agent.flush_memories(self.conversation_history)
except Exception:
pass
# Cancel active voice recording
if hasattr(self, '_voice_recorder') and self._voice_recorder and self._voice_recording:
try:
self._voice_recorder.cancel()
except Exception:
pass
# Clean up old temp voice recordings
try:
from tools.voice_mode import cleanup_temp_recordings
cleanup_temp_recordings()
except Exception:
pass
# Unregister callbacks to avoid dangling references
set_sudo_password_callback(None)
set_approval_callback(None)

View File

@@ -37,6 +37,7 @@ COMMANDS_BY_CATEGORY = {
"/verbose": "Cycle tool progress display: off → new → all → verbose",
"/reasoning": "Manage reasoning effort and display (usage: /reasoning [level|show|hide])",
"/skin": "Show or change the display skin/theme",
"/voice": "Toggle voice mode (Ctrl+B to record). Usage: /voice [on|off|tts|status]",
},
"Tools & Skills": {
"/tools": "List available tools",

View File

@@ -203,6 +203,12 @@ DEFAULT_CONFIG = {
},
},
"voice": {
"record_key": "ctrl+space",
"max_recording_seconds": 120,
"auto_tts": False,
},
"human_delay": {
"mode": "off",
"min_ms": 800,

View File

@@ -48,6 +48,7 @@ cron = ["croniter"]
slack = ["slack-bolt>=1.18.0", "slack-sdk>=3.27.0"]
cli = ["simple-term-menu"]
tts-premium = ["elevenlabs"]
voice = ["sounddevice>=0.4.6", "numpy>=1.24.0"]
pty = [
"ptyprocess>=0.7.0; sys_platform != 'win32'",
"pywinpty>=2.0.0; sys_platform == 'win32'",
@@ -78,6 +79,7 @@ all = [
"hermes-agent[mcp]",
"hermes-agent[homeassistant]",
"hermes-agent[acp]",
"hermes-agent[voice]",
]
[project.scripts]

View File

@@ -0,0 +1,347 @@
"""Tests for tools.voice_mode -- all mocked, no real microphone or API calls."""
import os
import struct
import time
import wave
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# ============================================================================
# Fixtures
# ============================================================================
@pytest.fixture
def sample_wav(tmp_path):
"""Create a minimal valid WAV file (1 second of silence at 16kHz)."""
wav_path = tmp_path / "test.wav"
n_frames = 16000 # 1 second at 16kHz
silence = struct.pack(f"<{n_frames}h", *([0] * n_frames))
with wave.open(str(wav_path), "wb") as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(16000)
wf.writeframes(silence)
return str(wav_path)
@pytest.fixture
def temp_voice_dir(tmp_path, monkeypatch):
"""Redirect _TEMP_DIR to a temporary path."""
voice_dir = tmp_path / "hermes_voice"
voice_dir.mkdir()
monkeypatch.setattr("tools.voice_mode._TEMP_DIR", str(voice_dir))
return voice_dir
@pytest.fixture
def mock_sd(monkeypatch):
"""Replace tools.voice_mode.sd with a MagicMock (sounddevice may not be installed)."""
mock = MagicMock()
monkeypatch.setattr("tools.voice_mode.sd", mock)
monkeypatch.setattr("tools.voice_mode._HAS_AUDIO", True)
# Also ensure numpy is available (use real numpy if installed, else mock)
try:
import numpy as real_np
monkeypatch.setattr("tools.voice_mode.np", real_np)
except ImportError:
monkeypatch.setattr("tools.voice_mode.np", MagicMock())
return mock
# ============================================================================
# check_voice_requirements
# ============================================================================
class TestCheckVoiceRequirements:
def test_all_requirements_met(self, monkeypatch):
monkeypatch.setattr("tools.voice_mode._HAS_AUDIO", True)
monkeypatch.setenv("VOICE_TOOLS_OPENAI_KEY", "sk-test-key")
from tools.voice_mode import check_voice_requirements
result = check_voice_requirements()
assert result["available"] is True
assert result["audio_available"] is True
assert result["stt_key_set"] is True
assert result["missing_packages"] == []
def test_missing_audio_packages(self, monkeypatch):
monkeypatch.setattr("tools.voice_mode._HAS_AUDIO", False)
monkeypatch.setenv("VOICE_TOOLS_OPENAI_KEY", "sk-test-key")
from tools.voice_mode import check_voice_requirements
result = check_voice_requirements()
assert result["available"] is False
assert result["audio_available"] is False
assert "sounddevice" in result["missing_packages"]
assert "numpy" in result["missing_packages"]
def test_missing_stt_key(self, monkeypatch):
monkeypatch.setattr("tools.voice_mode._HAS_AUDIO", True)
monkeypatch.delenv("VOICE_TOOLS_OPENAI_KEY", raising=False)
from tools.voice_mode import check_voice_requirements
result = check_voice_requirements()
assert result["available"] is False
assert result["stt_key_set"] is False
assert "STT API key: MISSING" in result["details"]
# ============================================================================
# AudioRecorder
# ============================================================================
class TestAudioRecorderStart:
def test_start_raises_without_audio(self, monkeypatch):
monkeypatch.setattr("tools.voice_mode._HAS_AUDIO", False)
from tools.voice_mode import AudioRecorder
recorder = AudioRecorder()
with pytest.raises(RuntimeError, match="sounddevice and numpy"):
recorder.start()
def test_start_creates_and_starts_stream(self, mock_sd):
mock_stream = MagicMock()
mock_sd.InputStream.return_value = mock_stream
from tools.voice_mode import AudioRecorder
recorder = AudioRecorder()
recorder.start()
assert recorder.is_recording is True
mock_sd.InputStream.assert_called_once()
mock_stream.start.assert_called_once()
def test_double_start_is_noop(self, mock_sd):
mock_stream = MagicMock()
mock_sd.InputStream.return_value = mock_stream
from tools.voice_mode import AudioRecorder
recorder = AudioRecorder()
recorder.start()
recorder.start() # second call should be noop
assert mock_sd.InputStream.call_count == 1
class TestAudioRecorderStop:
def test_stop_returns_none_when_not_recording(self):
from tools.voice_mode import AudioRecorder
recorder = AudioRecorder()
assert recorder.stop() is None
def test_stop_writes_wav_file(self, mock_sd, temp_voice_dir):
np = pytest.importorskip("numpy")
mock_stream = MagicMock()
mock_sd.InputStream.return_value = mock_stream
from tools.voice_mode import AudioRecorder, SAMPLE_RATE
recorder = AudioRecorder()
recorder.start()
# Simulate captured audio frames (1 second of silence)
frame = np.zeros((SAMPLE_RATE, 1), dtype="int16")
recorder._frames = [frame]
wav_path = recorder.stop()
assert wav_path is not None
assert os.path.isfile(wav_path)
assert wav_path.endswith(".wav")
assert recorder.is_recording is False
# Verify it is a valid WAV
with wave.open(wav_path, "rb") as wf:
assert wf.getnchannels() == 1
assert wf.getsampwidth() == 2
assert wf.getframerate() == SAMPLE_RATE
def test_stop_returns_none_for_very_short_recording(self, mock_sd, temp_voice_dir):
np = pytest.importorskip("numpy")
mock_stream = MagicMock()
mock_sd.InputStream.return_value = mock_stream
from tools.voice_mode import AudioRecorder
recorder = AudioRecorder()
recorder.start()
# Very short recording (100 samples = ~6ms at 16kHz)
frame = np.zeros((100, 1), dtype="int16")
recorder._frames = [frame]
wav_path = recorder.stop()
assert wav_path is None
class TestAudioRecorderCancel:
def test_cancel_discards_frames(self, mock_sd):
mock_stream = MagicMock()
mock_sd.InputStream.return_value = mock_stream
from tools.voice_mode import AudioRecorder
recorder = AudioRecorder()
recorder.start()
recorder._frames = [MagicMock()] # simulate captured data
recorder.cancel()
assert recorder.is_recording is False
assert recorder._frames == []
mock_stream.stop.assert_called_once()
mock_stream.close.assert_called_once()
def test_cancel_when_not_recording_is_safe(self):
from tools.voice_mode import AudioRecorder
recorder = AudioRecorder()
recorder.cancel() # should not raise
assert recorder.is_recording is False
class TestAudioRecorderProperties:
def test_elapsed_seconds_when_not_recording(self):
from tools.voice_mode import AudioRecorder
recorder = AudioRecorder()
assert recorder.elapsed_seconds == 0.0
def test_elapsed_seconds_when_recording(self, mock_sd):
mock_stream = MagicMock()
mock_sd.InputStream.return_value = mock_stream
from tools.voice_mode import AudioRecorder
recorder = AudioRecorder()
recorder.start()
# Force start time to 1 second ago
recorder._start_time = time.monotonic() - 1.0
elapsed = recorder.elapsed_seconds
assert 0.9 < elapsed < 2.0
recorder.cancel()
# ============================================================================
# transcribe_recording
# ============================================================================
class TestTranscribeRecording:
def test_delegates_to_transcribe_audio(self):
mock_transcribe = MagicMock(return_value={
"success": True,
"transcript": "hello world",
})
with patch("tools.transcription_tools.transcribe_audio", mock_transcribe):
from tools.voice_mode import transcribe_recording
result = transcribe_recording("/tmp/test.wav", model="whisper-1")
assert result["success"] is True
assert result["transcript"] == "hello world"
mock_transcribe.assert_called_once_with("/tmp/test.wav", model="whisper-1")
# ============================================================================
# play_audio_file
# ============================================================================
class TestPlayAudioFile:
def test_play_wav_via_sounddevice(self, monkeypatch, sample_wav):
np = pytest.importorskip("numpy")
mock_sd = MagicMock()
monkeypatch.setattr("tools.voice_mode.sd", mock_sd)
monkeypatch.setattr("tools.voice_mode._HAS_AUDIO", True)
monkeypatch.setattr("tools.voice_mode.np", np)
from tools.voice_mode import play_audio_file
result = play_audio_file(sample_wav)
assert result is True
mock_sd.play.assert_called_once()
mock_sd.wait.assert_called_once()
def test_returns_false_when_no_player(self, monkeypatch, sample_wav):
monkeypatch.setattr("tools.voice_mode._HAS_AUDIO", False)
monkeypatch.setattr("shutil.which", lambda _: None)
from tools.voice_mode import play_audio_file
result = play_audio_file(sample_wav)
assert result is False
def test_returns_false_for_missing_file(self):
from tools.voice_mode import play_audio_file
result = play_audio_file("/nonexistent/file.wav")
assert result is False
# ============================================================================
# cleanup_temp_recordings
# ============================================================================
class TestCleanupTempRecordings:
def test_old_files_deleted(self, temp_voice_dir):
# Create an "old" file
old_file = temp_voice_dir / "recording_20240101_000000.wav"
old_file.write_bytes(b"\x00" * 100)
# Set mtime to 2 hours ago
old_mtime = time.time() - 7200
os.utime(str(old_file), (old_mtime, old_mtime))
from tools.voice_mode import cleanup_temp_recordings
deleted = cleanup_temp_recordings(max_age_seconds=3600)
assert deleted == 1
assert not old_file.exists()
def test_recent_files_preserved(self, temp_voice_dir):
# Create a "recent" file
recent_file = temp_voice_dir / "recording_20260303_120000.wav"
recent_file.write_bytes(b"\x00" * 100)
from tools.voice_mode import cleanup_temp_recordings
deleted = cleanup_temp_recordings(max_age_seconds=3600)
assert deleted == 0
assert recent_file.exists()
def test_nonexistent_dir_returns_zero(self, monkeypatch):
monkeypatch.setattr("tools.voice_mode._TEMP_DIR", "/nonexistent/dir")
from tools.voice_mode import cleanup_temp_recordings
assert cleanup_temp_recordings() == 0
def test_non_recording_files_ignored(self, temp_voice_dir):
# Create a file that doesn't match the pattern
other_file = temp_voice_dir / "other_file.txt"
other_file.write_bytes(b"\x00" * 100)
old_mtime = time.time() - 7200
os.utime(str(other_file), (old_mtime, old_mtime))
from tools.voice_mode import cleanup_temp_recordings
deleted = cleanup_temp_recordings(max_age_seconds=3600)
assert deleted == 0
assert other_file.exists()

344
tools/voice_mode.py Normal file
View File

@@ -0,0 +1,344 @@
"""Voice Mode -- Push-to-talk audio recording and playback for the CLI.
Provides audio capture via sounddevice, WAV encoding via stdlib wave,
STT dispatch via tools.transcription_tools, and TTS playback via
sounddevice or system audio players.
Dependencies (optional):
pip install sounddevice numpy
or: pip install hermes-agent[voice]
"""
import logging
import os
import platform
import shutil
import subprocess
import tempfile
import threading
import time
import wave
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Optional imports with graceful degradation
# ---------------------------------------------------------------------------
try:
import sounddevice as sd
import numpy as np
_HAS_AUDIO = True
except ImportError:
sd = None # type: ignore[assignment]
np = None # type: ignore[assignment]
_HAS_AUDIO = False
# ---------------------------------------------------------------------------
# Recording parameters
# ---------------------------------------------------------------------------
SAMPLE_RATE = 16000 # Whisper native rate
CHANNELS = 1 # Mono
DTYPE = "int16" # 16-bit PCM
SAMPLE_WIDTH = 2 # bytes per sample (int16)
MAX_RECORDING_SECONDS = 120 # Safety cap
# Temp directory for voice recordings
_TEMP_DIR = os.path.join(tempfile.gettempdir(), "hermes_voice")
# ============================================================================
# AudioRecorder
# ============================================================================
class AudioRecorder:
"""Thread-safe audio recorder using sounddevice.InputStream.
Usage::
recorder = AudioRecorder()
recorder.start()
# ... user speaks ...
wav_path = recorder.stop() # returns path to WAV file
# or
recorder.cancel() # discard without saving
"""
def __init__(self) -> None:
self._lock = threading.Lock()
self._stream: Any = None
self._frames: List[Any] = []
self._recording = False
self._start_time: float = 0.0
# -- public properties ---------------------------------------------------
@property
def is_recording(self) -> bool:
return self._recording
@property
def elapsed_seconds(self) -> float:
if not self._recording:
return 0.0
return time.monotonic() - self._start_time
# -- public methods ------------------------------------------------------
def start(self) -> None:
"""Start capturing audio from the default input device.
Raises ``RuntimeError`` if sounddevice/numpy are not installed
or if a recording is already in progress.
"""
if not _HAS_AUDIO:
raise RuntimeError(
"Voice mode requires sounddevice and numpy.\n"
"Install with: pip install sounddevice numpy\n"
"Or: pip install hermes-agent[voice]"
)
with self._lock:
if self._recording:
return # already recording
self._frames = []
self._start_time = time.monotonic()
def _callback(indata, frames, time_info, status): # noqa: ARG001
if status:
logger.debug("sounddevice status: %s", status)
self._frames.append(indata.copy())
self._stream = sd.InputStream(
samplerate=SAMPLE_RATE,
channels=CHANNELS,
dtype=DTYPE,
callback=_callback,
)
self._stream.start()
self._recording = True
logger.info("Voice recording started (rate=%d, channels=%d)", SAMPLE_RATE, CHANNELS)
def stop(self) -> Optional[str]:
"""Stop recording and write captured audio to a WAV file.
Returns:
Path to the WAV file, or ``None`` if no audio was captured.
"""
with self._lock:
if not self._recording:
return None
self._recording = False
if self._stream is not None:
try:
self._stream.stop()
self._stream.close()
except Exception:
pass
self._stream = None
if not self._frames:
return None
# Concatenate frames and write WAV
audio_data = np.concatenate(self._frames, axis=0)
self._frames = []
elapsed = time.monotonic() - self._start_time
logger.info("Voice recording stopped (%.1fs, %d samples)", elapsed, len(audio_data))
# Skip very short recordings (< 0.3s of audio)
min_samples = int(SAMPLE_RATE * 0.3)
if len(audio_data) < min_samples:
logger.debug("Recording too short (%d samples), discarding", len(audio_data))
return None
return self._write_wav(audio_data)
def cancel(self) -> None:
"""Stop recording and discard all captured audio."""
with self._lock:
self._recording = False
self._frames = []
if self._stream is not None:
try:
self._stream.stop()
self._stream.close()
except Exception:
pass
self._stream = None
logger.info("Voice recording cancelled")
# -- private helpers -----------------------------------------------------
@staticmethod
def _write_wav(audio_data) -> str:
"""Write numpy int16 audio data to a WAV file.
Returns the file path.
"""
os.makedirs(_TEMP_DIR, exist_ok=True)
timestamp = time.strftime("%Y%m%d_%H%M%S")
wav_path = os.path.join(_TEMP_DIR, f"recording_{timestamp}.wav")
with wave.open(wav_path, "wb") as wf:
wf.setnchannels(CHANNELS)
wf.setsampwidth(SAMPLE_WIDTH)
wf.setframerate(SAMPLE_RATE)
wf.writeframes(audio_data.tobytes())
file_size = os.path.getsize(wav_path)
logger.info("WAV written: %s (%d bytes)", wav_path, file_size)
return wav_path
# ============================================================================
# STT dispatch
# ============================================================================
def transcribe_recording(wav_path: str, model: Optional[str] = None) -> Dict[str, Any]:
"""Transcribe a WAV recording using the existing Whisper pipeline.
Delegates to ``tools.transcription_tools.transcribe_audio()``.
Args:
wav_path: Path to the WAV file.
model: Whisper model name (default: from config or ``whisper-1``).
Returns:
Dict with ``success``, ``transcript``, and optionally ``error``.
"""
from tools.transcription_tools import transcribe_audio
return transcribe_audio(wav_path, model=model)
# ============================================================================
# Audio playback
# ============================================================================
def play_audio_file(file_path: str) -> bool:
"""Play an audio file through the default output device.
Strategy:
1. WAV files via ``sounddevice.play()`` when available.
2. System commands: ``afplay`` (macOS), ``ffplay`` (cross-platform),
``aplay`` (Linux ALSA).
Returns:
``True`` if playback succeeded, ``False`` otherwise.
"""
if not os.path.isfile(file_path):
logger.warning("Audio file not found: %s", file_path)
return False
# Try sounddevice for WAV files
if _HAS_AUDIO and file_path.endswith(".wav"):
try:
with wave.open(file_path, "rb") as wf:
frames = wf.readframes(wf.getnframes())
audio_data = np.frombuffer(frames, dtype=np.int16)
sample_rate = wf.getframerate()
sd.play(audio_data, samplerate=sample_rate)
sd.wait()
return True
except Exception as e:
logger.debug("sounddevice playback failed: %s", e)
# Fall back to system audio players
system = platform.system()
players = []
if system == "Darwin":
players.append(["afplay", file_path])
players.append(["ffplay", "-nodisp", "-autoexit", "-loglevel", "quiet", file_path])
if system == "Linux":
players.append(["aplay", "-q", file_path])
for cmd in players:
exe = shutil.which(cmd[0])
if exe:
try:
subprocess.run(cmd, capture_output=True, timeout=300)
return True
except Exception as e:
logger.debug("System player %s failed: %s", cmd[0], e)
logger.warning("No audio player available for %s", file_path)
return False
# ============================================================================
# Requirements check
# ============================================================================
def check_voice_requirements() -> Dict[str, Any]:
"""Check if all voice mode requirements are met.
Returns:
Dict with ``available``, ``audio_available``, ``stt_key_set``,
``missing_packages``, and ``details``.
"""
stt_key_set = bool(os.getenv("VOICE_TOOLS_OPENAI_KEY"))
missing: List[str] = []
if not _HAS_AUDIO:
missing.extend(["sounddevice", "numpy"])
available = _HAS_AUDIO and stt_key_set
details_parts = []
if _HAS_AUDIO:
details_parts.append("Audio capture: OK")
else:
details_parts.append("Audio capture: MISSING (pip install sounddevice numpy)")
if stt_key_set:
details_parts.append("STT API key: OK")
else:
details_parts.append("STT API key: MISSING (set VOICE_TOOLS_OPENAI_KEY)")
return {
"available": available,
"audio_available": _HAS_AUDIO,
"stt_key_set": stt_key_set,
"missing_packages": missing,
"details": "\n".join(details_parts),
}
# ============================================================================
# Temp file cleanup
# ============================================================================
def cleanup_temp_recordings(max_age_seconds: int = 3600) -> int:
"""Remove old temporary voice recording files.
Args:
max_age_seconds: Delete files older than this (default: 1 hour).
Returns:
Number of files deleted.
"""
if not os.path.isdir(_TEMP_DIR):
return 0
deleted = 0
now = time.time()
for entry in os.scandir(_TEMP_DIR):
if entry.is_file() and entry.name.startswith("recording_") and entry.name.endswith(".wav"):
try:
age = now - entry.stat().st_mtime
if age > max_age_seconds:
os.unlink(entry.path)
deleted += 1
except OSError:
pass
if deleted:
logger.debug("Cleaned up %d old voice recordings", deleted)
return deleted