"""Sovereign voice loop — listen, think, speak. A fully local voice interface for Timmy. No cloud, no network calls. All processing happens on the user's machine: Mic → VAD/silence detection → Whisper (local STT) → Timmy chat → Piper TTS → Speaker Usage: from timmy.voice_loop import VoiceLoop loop = VoiceLoop() loop.run() # blocks, Ctrl-C to stop Requires: sounddevice, numpy, whisper, piper-tts """ import asyncio import logging import re import subprocess import sys import tempfile import time from dataclasses import dataclass from pathlib import Path import numpy as np logger = logging.getLogger(__name__) # ── Voice-mode system instruction ─────────────────────────────────────────── # Prepended to user messages so Timmy responds naturally for TTS. _VOICE_PREAMBLE = ( "[VOICE MODE] You are speaking aloud through a text-to-speech system. " "Respond in short, natural spoken sentences. No markdown, no bullet points, " "no asterisks, no numbered lists, no headers, no bold/italic formatting. " "Talk like a person in a conversation — concise, warm, direct. " "Keep responses under 3-4 sentences unless the user asks for detail." ) def _strip_markdown(text: str) -> str: """Remove markdown formatting so TTS reads naturally. Strips: **bold**, *italic*, `code`, # headers, - bullets, numbered lists, [links](url), etc. """ if not text: return text # Remove bold/italic markers text = re.sub(r"\*{1,3}([^*]+)\*{1,3}", r"\1", text) # Remove inline code text = re.sub(r"`([^`]+)`", r"\1", text) # Remove headers (# Header) text = re.sub(r"^#{1,6}\s+", "", text, flags=re.MULTILINE) # Remove bullet points (-, *, +) at start of line text = re.sub(r"^[\s]*[-*+]\s+", "", text, flags=re.MULTILINE) # Remove numbered lists (1. 2. etc) text = re.sub(r"^[\s]*\d+\.\s+", "", text, flags=re.MULTILINE) # Remove link syntax [text](url) → text text = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", text) # Remove horizontal rules text = re.sub(r"^[-*_]{3,}\s*$", "", text, flags=re.MULTILINE) # Collapse multiple newlines text = re.sub(r"\n{3,}", "\n\n", text) return text.strip() # ── Defaults ──────────────────────────────────────────────────────────────── DEFAULT_WHISPER_MODEL = "base.en" DEFAULT_PIPER_VOICE = Path.home() / ".local/share/piper-voices/en_US-lessac-medium.onnx" DEFAULT_SAMPLE_RATE = 16000 # Whisper expects 16 kHz DEFAULT_CHANNELS = 1 DEFAULT_SILENCE_THRESHOLD = 0.015 # RMS threshold — tune for your mic/room DEFAULT_SILENCE_DURATION = 1.5 # seconds of silence to end utterance DEFAULT_MIN_UTTERANCE = 0.5 # ignore clicks/bumps shorter than this DEFAULT_MAX_UTTERANCE = 30.0 # safety cap — don't record forever DEFAULT_SESSION_ID = "voice" @dataclass class VoiceConfig: """Configuration for the voice loop.""" whisper_model: str = DEFAULT_WHISPER_MODEL piper_voice: Path = DEFAULT_PIPER_VOICE sample_rate: int = DEFAULT_SAMPLE_RATE silence_threshold: float = DEFAULT_SILENCE_THRESHOLD silence_duration: float = DEFAULT_SILENCE_DURATION min_utterance: float = DEFAULT_MIN_UTTERANCE max_utterance: float = DEFAULT_MAX_UTTERANCE session_id: str = DEFAULT_SESSION_ID # Set True to use macOS `say` instead of Piper use_say_fallback: bool = False # Piper speaking rate (default 1.0, lower = slower) speaking_rate: float = 1.0 # Backend/model for Timmy inference backend: str | None = None model_size: str | None = None class VoiceLoop: """Sovereign listen-think-speak loop. Everything runs locally: - STT: OpenAI Whisper (local model, no API) - LLM: Timmy via Ollama (local inference) - TTS: Piper (local ONNX model) or macOS `say` """ def __init__(self, config: VoiceConfig | None = None) -> None: self.config = config or VoiceConfig() self._whisper_model = None self._running = False self._speaking = False # True while TTS is playing self._interrupted = False # set when user talks over TTS # Persistent event loop — reused across all chat calls so Agno's # MCP sessions don't die when the loop closes. self._loop: asyncio.AbstractEventLoop | None = None # ── Lazy initialization ───────────────────────────────────────────── def _load_whisper(self): """Load Whisper model (lazy, first use only).""" if self._whisper_model is not None: return import whisper logger.info("Loading Whisper model: %s", self.config.whisper_model) self._whisper_model = whisper.load_model(self.config.whisper_model) logger.info("Whisper model loaded.") def _ensure_piper(self) -> bool: """Check that Piper voice model exists.""" if self.config.use_say_fallback: return True voice_path = self.config.piper_voice if not voice_path.exists(): logger.warning("Piper voice not found at %s — falling back to `say`", voice_path) self.config.use_say_fallback = True return True return True # ── STT: Microphone → Text ────────────────────────────────────────── def _record_utterance(self) -> np.ndarray | None: """Record from microphone until silence is detected. Uses energy-based Voice Activity Detection: 1. Wait for speech (RMS above threshold) 2. Record until silence (RMS below threshold for silence_duration) 3. Return the audio as a numpy array Returns None if interrupted or no speech detected. """ import sounddevice as sd sr = self.config.sample_rate block_size = int(sr * 0.1) # 100ms blocks silence_blocks = int(self.config.silence_duration / 0.1) min_blocks = int(self.config.min_utterance / 0.1) max_blocks = int(self.config.max_utterance / 0.1) audio_chunks: list[np.ndarray] = [] silent_count = 0 recording = False def _rms(block: np.ndarray) -> float: return float(np.sqrt(np.mean(block.astype(np.float32) ** 2))) sys.stdout.write("\n 🎤 Listening... (speak now)\n") sys.stdout.flush() with sd.InputStream( samplerate=sr, channels=DEFAULT_CHANNELS, dtype="float32", blocksize=block_size, ) as stream: while self._running: block, overflowed = stream.read(block_size) if overflowed: logger.debug("Audio buffer overflowed") rms = _rms(block) if not recording: if rms > self.config.silence_threshold: recording = True silent_count = 0 audio_chunks.append(block.copy()) sys.stdout.write(" 📢 Recording...\r") sys.stdout.flush() else: audio_chunks.append(block.copy()) if rms < self.config.silence_threshold: silent_count += 1 else: silent_count = 0 # End of utterance if silent_count >= silence_blocks: break # Safety cap if len(audio_chunks) >= max_blocks: logger.info("Max utterance length reached, stopping.") break if not audio_chunks or len(audio_chunks) < min_blocks: return None audio = np.concatenate(audio_chunks, axis=0).flatten() duration = len(audio) / sr sys.stdout.write(f" ✂️ Captured {duration:.1f}s of audio\n") sys.stdout.flush() return audio def _transcribe(self, audio: np.ndarray) -> str: """Transcribe audio using local Whisper model.""" self._load_whisper() sys.stdout.write(" 🧠 Transcribing...\r") sys.stdout.flush() t0 = time.monotonic() result = self._whisper_model.transcribe( audio, language="en", fp16=False, # MPS/CPU — fp16 can cause issues on some setups ) elapsed = time.monotonic() - t0 text = result["text"].strip() logger.info("Whisper transcribed in %.1fs: '%s'", elapsed, text[:80]) return text # ── TTS: Text → Speaker ───────────────────────────────────────────── def _speak(self, text: str) -> None: """Speak text aloud using Piper TTS or macOS `say`.""" if not text: return self._speaking = True try: if self.config.use_say_fallback: self._speak_say(text) else: self._speak_piper(text) finally: self._speaking = False def _speak_piper(self, text: str) -> None: """Speak using Piper TTS (local ONNX inference).""" with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp: tmp_path = tmp.name try: # Generate WAV with Piper cmd = [ "piper", "--model", str(self.config.piper_voice), "--output_file", tmp_path, ] proc = subprocess.run( cmd, input=text, capture_output=True, text=True, timeout=30, ) if proc.returncode != 0: logger.error("Piper failed: %s", proc.stderr) self._speak_say(text) # fallback return # Play with afplay (macOS) — interruptible self._play_audio(tmp_path) finally: Path(tmp_path).unlink(missing_ok=True) def _speak_say(self, text: str) -> None: """Speak using macOS `say` command.""" try: proc = subprocess.Popen( ["say", "-r", "180", text], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) proc.wait(timeout=60) except subprocess.TimeoutExpired: proc.kill() except FileNotFoundError: logger.error("macOS `say` command not found") def _play_audio(self, path: str) -> None: """Play a WAV file. Can be interrupted by setting self._interrupted.""" try: proc = subprocess.Popen( ["afplay", path], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) # Poll so we can interrupt while proc.poll() is None: if self._interrupted: proc.terminate() self._interrupted = False logger.info("TTS interrupted by user") return time.sleep(0.05) except FileNotFoundError: # Not macOS — try aplay (Linux) try: subprocess.run(["aplay", path], capture_output=True, timeout=60) except (FileNotFoundError, subprocess.TimeoutExpired): logger.error("No audio player found (tried afplay, aplay)") # ── LLM: Text → Response ─────────────────────────────────────────── def _get_loop(self) -> asyncio.AbstractEventLoop: """Return a persistent event loop, creating one if needed. A single loop is reused for the entire voice session so Agno's MCP tool-server connections survive across turns. """ if self._loop is None or self._loop.is_closed(): self._loop = asyncio.new_event_loop() return self._loop def _think(self, user_text: str) -> str: """Send text to Timmy and get a response.""" sys.stdout.write(" 💭 Thinking...\r") sys.stdout.flush() t0 = time.monotonic() try: loop = self._get_loop() response = loop.run_until_complete(self._chat(user_text)) except (ConnectionError, RuntimeError, ValueError) as exc: logger.error("Timmy chat failed: %s", exc) response = "I'm having trouble thinking right now. Could you try again?" elapsed = time.monotonic() - t0 logger.info("Timmy responded in %.1fs", elapsed) # Strip markdown so TTS doesn't read asterisks, bullets, etc. response = _strip_markdown(response) return response async def _chat(self, message: str) -> str: """Async wrapper around Timmy's session.chat(). Prepends the voice-mode instruction so Timmy responds in natural spoken language rather than markdown. """ from timmy.session import chat voiced = f"{_VOICE_PREAMBLE}\n\nUser said: {message}" return await chat(voiced, session_id=self.config.session_id) # ── Main Loop ─────────────────────────────────────────────────────── def run(self) -> None: """Run the voice loop. Blocks until Ctrl-C.""" self._ensure_piper() # Suppress MCP / Agno stderr noise during voice mode. _suppress_mcp_noise() # Suppress MCP async-generator teardown tracebacks on exit. _install_quiet_asyncgen_hooks() tts_label = ( "macOS say" if self.config.use_say_fallback else f"Piper ({self.config.piper_voice.name})" ) logger.info( "\n" + "=" * 60 + "\n" " 🎙️ Timmy Voice — Sovereign Voice Interface\n" + "=" * 60 + "\n" f" STT: Whisper ({self.config.whisper_model})\n" f" TTS: {tts_label}\n" " LLM: Timmy (local Ollama)\n" + "=" * 60 + "\n" " Speak naturally. Timmy will listen, think, and respond.\n" " Press Ctrl-C to exit.\n" + "=" * 60 ) self._running = True try: while self._running: # 1. LISTEN — record until silence audio = self._record_utterance() if audio is None: continue # 2. TRANSCRIBE — Whisper STT text = self._transcribe(audio) if not text or text.lower() in ( "you", "thanks.", "thank you.", "bye.", "", "thanks for watching!", "thank you for watching!", ): # Whisper hallucinations on silence/noise logger.debug("Ignoring likely Whisper hallucination: '%s'", text) continue sys.stdout.write(f"\n 👤 You: {text}\n") sys.stdout.flush() # Exit commands if text.lower().strip().rstrip(".!") in ( "goodbye", "exit", "quit", "stop", "goodbye timmy", "stop listening", ): logger.info("👋 Goodbye!") break # 3. THINK — send to Timmy response = self._think(text) sys.stdout.write(f" 🤖 Timmy: {response}\n") sys.stdout.flush() # 4. SPEAK — TTS output self._speak(response) except KeyboardInterrupt: logger.info("👋 Voice loop stopped.") finally: self._running = False self._cleanup_loop() def _cleanup_loop(self) -> None: """Shut down the persistent event loop cleanly. Agno's MCP stdio sessions leave async generators (stdio_client) that complain loudly when torn down from a different task. We swallow those errors — they're harmless, the subprocesses die with the loop anyway. """ if self._loop is None or self._loop.is_closed(): return # Silence "error during closing of asynchronous generator" warnings # from MCP's anyio/asyncio cancel-scope teardown. import warnings self._loop.set_exception_handler(lambda loop, ctx: None) try: self._loop.run_until_complete(self._loop.shutdown_asyncgens()) except RuntimeError as exc: logger.debug("Shutdown asyncgens failed: %s", exc) pass with warnings.catch_warnings(): warnings.simplefilter("ignore", RuntimeWarning) try: self._loop.close() except RuntimeError as exc: logger.debug("Loop close failed: %s", exc) pass self._loop = None def stop(self) -> None: """Stop the voice loop (from another thread).""" self._running = False def _suppress_mcp_noise() -> None: """Quiet down noisy MCP/Agno loggers during voice mode. Sets specific loggers to WARNING so the terminal stays clean for the voice transcript. """ for name in ( "mcp", "mcp.server", "mcp.client", "agno", "agno.mcp", "httpx", "httpcore", ): logging.getLogger(name).setLevel(logging.WARNING) def _install_quiet_asyncgen_hooks() -> None: """Silence MCP stdio_client async-generator teardown noise. When the voice loop exits, Python GC finalizes Agno's MCP stdio_client async generators. anyio's cancel-scope teardown prints ugly tracebacks to stderr. These are harmless — the MCP subprocesses die with the loop. We intercept them here. """ _orig_hook = getattr(sys, "unraisablehook", None) def _quiet_hook(args): # Swallow RuntimeError from anyio cancel-scope teardown # and BaseExceptionGroup from MCP stdio_client generators if args.exc_type in (RuntimeError, BaseExceptionGroup): msg = str(args.exc_value) if args.exc_value else "" if "cancel scope" in msg or "unhandled errors" in msg: return # Also swallow GeneratorExit from stdio_client if args.exc_type is GeneratorExit: return # Everything else: forward to original hook if _orig_hook: _orig_hook(args) else: sys.__unraisablehook__(args) sys.unraisablehook = _quiet_hook