refactor: split voice_loop.py into voice/ subpackage (#1379)
Some checks failed
Tests / lint (pull_request) Failing after 8s
Tests / test (pull_request) Has been skipped

This commit is contained in:
Alexander Whitestone
2026-03-24 15:26:01 -04:00
parent 46b5bf96cc
commit f32077ae8d
12 changed files with 517 additions and 446 deletions

View File

@@ -50,17 +50,12 @@ for route in _matrix_matrix_router.routes:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Used by src/dashboard/app.py # Used by src/dashboard/app.py
from .websocket import broadcast_world_state # noqa: E402, F401
# Used by src/infrastructure/presence.py
from .websocket import _ws_clients # noqa: E402, F401
# Used by tests # Used by tests
from .bark import ( # noqa: E402, F401 from .bark import ( # noqa: E402, F401
BarkRequest,
_BARK_RATE_LIMIT_SECONDS, _BARK_RATE_LIMIT_SECONDS,
_GROUND_TTL, _GROUND_TTL,
_MAX_EXCHANGES, _MAX_EXCHANGES,
BarkRequest,
_bark_and_broadcast, _bark_and_broadcast,
_bark_last_request, _bark_last_request,
_conversation, _conversation,
@@ -116,9 +111,13 @@ from .utils import ( # noqa: E402, F401
_get_agent_shape, _get_agent_shape,
_get_client_ip, _get_client_ip,
) )
# Used by src/infrastructure/presence.py
from .websocket import ( # noqa: E402, F401 from .websocket import ( # noqa: E402, F401
_authenticate_ws, _authenticate_ws,
_broadcast, _broadcast,
_heartbeat, _heartbeat,
_ws_clients, # noqa: E402, F401
broadcast_world_state, # noqa: E402, F401
world_ws, world_ws,
) )

View File

@@ -29,6 +29,8 @@ except ImportError:
requests = None # type: ignore requests = None # type: ignore
# Re-export data models so existing ``from …cascade import X`` keeps working. # Re-export data models so existing ``from …cascade import X`` keeps working.
# Mixins
from .health import HealthMixin
from .models import ( # noqa: F401 re-exports from .models import ( # noqa: F401 re-exports
CircuitState, CircuitState,
ContentType, ContentType,
@@ -38,9 +40,6 @@ from .models import ( # noqa: F401 re-exports
ProviderStatus, ProviderStatus,
RouterConfig, RouterConfig,
) )
# Mixins
from .health import HealthMixin
from .providers import ProviderCallsMixin from .providers import ProviderCallsMixin
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View File

@@ -10,7 +10,7 @@ import logging
import time import time
from datetime import UTC, datetime from datetime import UTC, datetime
from .models import CircuitState, Provider, ProviderMetrics, ProviderStatus from .models import CircuitState, Provider, ProviderStatus
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -18,7 +18,7 @@ logger = logging.getLogger(__name__)
try: try:
from infrastructure.claude_quota import QuotaMonitor, get_quota_monitor from infrastructure.claude_quota import QuotaMonitor, get_quota_monitor
_quota_monitor: "QuotaMonitor | None" = get_quota_monitor() _quota_monitor: QuotaMonitor | None = get_quota_monitor()
except Exception as _exc: # pragma: no cover except Exception as _exc: # pragma: no cover
logger.debug("Quota monitor not available: %s", _exc) logger.debug("Quota monitor not available: %s", _exc)
_quota_monitor = None _quota_monitor = None

View File

@@ -0,0 +1,50 @@
"""Voice subpackage — re-exports for convenience."""
from timmy.voice.activation import (
EXIT_COMMANDS,
WHISPER_HALLUCINATIONS,
is_exit_command,
is_hallucination,
)
from timmy.voice.audio_io import (
DEFAULT_CHANNELS,
DEFAULT_MAX_UTTERANCE,
DEFAULT_MIN_UTTERANCE,
DEFAULT_SAMPLE_RATE,
DEFAULT_SILENCE_DURATION,
DEFAULT_SILENCE_THRESHOLD,
_rms,
)
from timmy.voice.helpers import _install_quiet_asyncgen_hooks, _suppress_mcp_noise
from timmy.voice.llm import LLMMixin
from timmy.voice.speech_engines import (
_VOICE_PREAMBLE,
DEFAULT_PIPER_VOICE,
DEFAULT_WHISPER_MODEL,
_strip_markdown,
)
from timmy.voice.stt import STTMixin
from timmy.voice.tts import TTSMixin
__all__ = [
"DEFAULT_CHANNELS",
"DEFAULT_MAX_UTTERANCE",
"DEFAULT_MIN_UTTERANCE",
"DEFAULT_PIPER_VOICE",
"DEFAULT_SAMPLE_RATE",
"DEFAULT_SILENCE_DURATION",
"DEFAULT_SILENCE_THRESHOLD",
"DEFAULT_WHISPER_MODEL",
"EXIT_COMMANDS",
"LLMMixin",
"STTMixin",
"TTSMixin",
"WHISPER_HALLUCINATIONS",
"_VOICE_PREAMBLE",
"_install_quiet_asyncgen_hooks",
"_rms",
"_strip_markdown",
"_suppress_mcp_noise",
"is_exit_command",
"is_hallucination",
]

View File

@@ -0,0 +1,38 @@
"""Voice activation detection — hallucination filtering and exit commands."""
from __future__ import annotations
# Whisper hallucinates these on silence/noise — skip them.
WHISPER_HALLUCINATIONS = frozenset(
{
"you",
"thanks.",
"thank you.",
"bye.",
"",
"thanks for watching!",
"thank you for watching!",
}
)
# Spoken phrases that end the voice session.
EXIT_COMMANDS = frozenset(
{
"goodbye",
"exit",
"quit",
"stop",
"goodbye timmy",
"stop listening",
}
)
def is_hallucination(text: str) -> bool:
"""Return True if *text* is a known Whisper hallucination."""
return not text or text.lower() in WHISPER_HALLUCINATIONS
def is_exit_command(text: str) -> bool:
"""Return True if the user asked to stop the voice session."""
return text.lower().strip().rstrip(".!") in EXIT_COMMANDS

View File

@@ -0,0 +1,19 @@
"""Audio capture and playback utilities for the voice loop."""
from __future__ import annotations
import numpy as np
# ── Defaults ────────────────────────────────────────────────────────────────
DEFAULT_SAMPLE_RATE = 16000 # Whisper expects 16 kHz
DEFAULT_CHANNELS = 1
DEFAULT_SILENCE_THRESHOLD = 0.015 # RMS threshold — tune for your mic/room
DEFAULT_SILENCE_DURATION = 1.5 # seconds of silence to end utterance
DEFAULT_MIN_UTTERANCE = 0.5 # ignore clicks/bumps shorter than this
DEFAULT_MAX_UTTERANCE = 30.0 # safety cap — don't record forever
def _rms(block: np.ndarray) -> float:
"""Compute root-mean-square energy of an audio block."""
return float(np.sqrt(np.mean(block.astype(np.float32) ** 2)))

View File

@@ -0,0 +1,53 @@
"""Miscellaneous helpers for the voice loop runtime."""
from __future__ import annotations
import logging
import sys
def _suppress_mcp_noise() -> None:
"""Quiet down noisy MCP/Agno loggers during voice mode.
Sets specific loggers to WARNING so the terminal stays clean
for the voice transcript.
"""
for name in (
"mcp",
"mcp.server",
"mcp.client",
"agno",
"agno.mcp",
"httpx",
"httpcore",
):
logging.getLogger(name).setLevel(logging.WARNING)
def _install_quiet_asyncgen_hooks() -> None:
"""Silence MCP stdio_client async-generator teardown noise.
When the voice loop exits, Python GC finalizes Agno's MCP
stdio_client async generators. anyio's cancel-scope teardown
prints ugly tracebacks to stderr. These are harmless — the
MCP subprocesses die with the loop. We intercept them here.
"""
_orig_hook = getattr(sys, "unraisablehook", None)
def _quiet_hook(args):
# Swallow RuntimeError from anyio cancel-scope teardown
# and BaseExceptionGroup from MCP stdio_client generators
if args.exc_type in (RuntimeError, BaseExceptionGroup):
msg = str(args.exc_value) if args.exc_value else ""
if "cancel scope" in msg or "unhandled errors" in msg:
return
# Also swallow GeneratorExit from stdio_client
if args.exc_type is GeneratorExit:
return
# Everything else: forward to original hook
if _orig_hook:
_orig_hook(args)
else:
sys.__unraisablehook__(args)
sys.unraisablehook = _quiet_hook

68
src/timmy/voice/llm.py Normal file
View File

@@ -0,0 +1,68 @@
"""LLM integration mixin — async chat and event-loop management."""
from __future__ import annotations
import asyncio
import logging
import sys
import time
import warnings
from timmy.voice.speech_engines import _VOICE_PREAMBLE, _strip_markdown
logger = logging.getLogger(__name__)
class LLMMixin:
"""Mixin providing LLM chat methods for :class:`VoiceLoop`."""
def _get_loop(self) -> asyncio.AbstractEventLoop:
"""Return a persistent event loop, creating one if needed."""
if self._loop is None or self._loop.is_closed():
self._loop = asyncio.new_event_loop()
return self._loop
def _think(self, user_text: str) -> str:
"""Send text to Timmy and get a response."""
sys.stdout.write(" 💭 Thinking...\r")
sys.stdout.flush()
t0 = time.monotonic()
try:
loop = self._get_loop()
response = loop.run_until_complete(self._chat(user_text))
except (ConnectionError, RuntimeError, ValueError) as exc:
logger.error("Timmy chat failed: %s", exc)
response = "I'm having trouble thinking right now. Could you try again?"
elapsed = time.monotonic() - t0
logger.info("Timmy responded in %.1fs", elapsed)
response = _strip_markdown(response)
return response
async def _chat(self, message: str) -> str:
"""Async wrapper around Timmy's session.chat()."""
from timmy.session import chat
voiced = f"{_VOICE_PREAMBLE}\n\nUser said: {message}"
return await chat(voiced, session_id=self.config.session_id)
def _cleanup_loop(self) -> None:
"""Shut down the persistent event loop cleanly."""
if self._loop is None or self._loop.is_closed():
return
self._loop.set_exception_handler(lambda loop, ctx: None)
try:
self._loop.run_until_complete(self._loop.shutdown_asyncgens())
except RuntimeError as exc:
logger.debug("Shutdown asyncgens failed: %s", exc)
pass
with warnings.catch_warnings():
warnings.simplefilter("ignore", RuntimeWarning)
try:
self._loop.close()
except RuntimeError as exc:
logger.debug("Loop close failed: %s", exc)
pass
self._loop = None

View File

@@ -0,0 +1,48 @@
"""Speech engine constants and text-processing utilities."""
from __future__ import annotations
import re
from pathlib import Path
# ── Defaults ────────────────────────────────────────────────────────────────
DEFAULT_WHISPER_MODEL = "base.en"
DEFAULT_PIPER_VOICE = Path.home() / ".local/share/piper-voices/en_US-lessac-medium.onnx"
# ── Voice-mode system instruction ───────────────────────────────────────────
# Prepended to user messages so Timmy responds naturally for TTS.
_VOICE_PREAMBLE = (
"[VOICE MODE] You are speaking aloud through a text-to-speech system. "
"Respond in short, natural spoken sentences. No markdown, no bullet points, "
"no asterisks, no numbered lists, no headers, no bold/italic formatting. "
"Talk like a person in a conversation — concise, warm, direct. "
"Keep responses under 3-4 sentences unless the user asks for detail."
)
def _strip_markdown(text: str) -> str:
"""Remove markdown formatting so TTS reads naturally.
Strips: **bold**, *italic*, `code`, # headers, - bullets,
numbered lists, [links](url), etc.
"""
if not text:
return text
# Remove bold/italic markers
text = re.sub(r"\*{1,3}([^*]+)\*{1,3}", r"\1", text)
# Remove inline code
text = re.sub(r"`([^`]+)`", r"\1", text)
# Remove headers (# Header)
text = re.sub(r"^#{1,6}\s+", "", text, flags=re.MULTILINE)
# Remove bullet points (-, *, +) at start of line
text = re.sub(r"^[\s]*[-*+]\s+", "", text, flags=re.MULTILINE)
# Remove numbered lists (1. 2. etc)
text = re.sub(r"^[\s]*\d+\.\s+", "", text, flags=re.MULTILINE)
# Remove link syntax [text](url) → text
text = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", text)
# Remove horizontal rules
text = re.sub(r"^[-*_]{3,}\s*$", "", text, flags=re.MULTILINE)
# Collapse multiple newlines
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()

119
src/timmy/voice/stt.py Normal file
View File

@@ -0,0 +1,119 @@
"""Speech-to-text mixin — microphone capture and Whisper transcription."""
from __future__ import annotations
import logging
import sys
import time
import numpy as np
from timmy.voice.audio_io import DEFAULT_CHANNELS, _rms
logger = logging.getLogger(__name__)
class STTMixin:
"""Mixin providing STT methods for :class:`VoiceLoop`."""
def _load_whisper(self):
"""Load Whisper model (lazy, first use only)."""
if self._whisper_model is not None:
return
import whisper
logger.info("Loading Whisper model: %s", self.config.whisper_model)
self._whisper_model = whisper.load_model(self.config.whisper_model)
logger.info("Whisper model loaded.")
def _record_utterance(self) -> np.ndarray | None:
"""Record from microphone until silence is detected."""
import sounddevice as sd
sr = self.config.sample_rate
block_size = int(sr * 0.1)
silence_blocks = int(self.config.silence_duration / 0.1)
min_blocks = int(self.config.min_utterance / 0.1)
max_blocks = int(self.config.max_utterance / 0.1)
sys.stdout.write("\n 🎤 Listening... (speak now)\n")
sys.stdout.flush()
with sd.InputStream(
samplerate=sr,
channels=DEFAULT_CHANNELS,
dtype="float32",
blocksize=block_size,
) as stream:
chunks = self._capture_audio_blocks(stream, block_size, silence_blocks, max_blocks)
return self._finalize_utterance(chunks, min_blocks, sr)
def _capture_audio_blocks(
self,
stream,
block_size: int,
silence_blocks: int,
max_blocks: int,
) -> list[np.ndarray]:
"""Read audio blocks from *stream* until silence or max length."""
chunks: list[np.ndarray] = []
silent_count = 0
recording = False
while self._running:
block, overflowed = stream.read(block_size)
if overflowed:
logger.debug("Audio buffer overflowed")
rms = _rms(block)
if not recording:
if rms > self.config.silence_threshold:
recording = True
silent_count = 0
chunks.append(block.copy())
sys.stdout.write(" 📢 Recording...\r")
sys.stdout.flush()
else:
chunks.append(block.copy())
if rms < self.config.silence_threshold:
silent_count += 1
else:
silent_count = 0
if silent_count >= silence_blocks:
break
if len(chunks) >= max_blocks:
logger.info("Max utterance length reached, stopping.")
break
return chunks
@staticmethod
def _finalize_utterance(
chunks: list[np.ndarray], min_blocks: int, sample_rate: int
) -> np.ndarray | None:
"""Concatenate recorded chunks and report duration."""
if not chunks or len(chunks) < min_blocks:
return None
audio = np.concatenate(chunks, axis=0).flatten()
duration = len(audio) / sample_rate
sys.stdout.write(f" ✂️ Captured {duration:.1f}s of audio\n")
sys.stdout.flush()
return audio
def _transcribe(self, audio: np.ndarray) -> str:
"""Transcribe audio using local Whisper model."""
self._load_whisper()
sys.stdout.write(" 🧠 Transcribing...\r")
sys.stdout.flush()
t0 = time.monotonic()
result = self._whisper_model.transcribe(audio, language="en", fp16=False)
elapsed = time.monotonic() - t0
text = result["text"].strip()
logger.info("Whisper transcribed in %.1fs: '%s'", elapsed, text[:80])
return text

78
src/timmy/voice/tts.py Normal file
View File

@@ -0,0 +1,78 @@
"""Text-to-speech mixin — Piper TTS and macOS ``say`` fallback."""
from __future__ import annotations
import logging
import subprocess
import tempfile
import time
from pathlib import Path
logger = logging.getLogger(__name__)
class TTSMixin:
"""Mixin providing TTS methods for :class:`VoiceLoop`."""
def _speak(self, text: str) -> None:
"""Speak text aloud using Piper TTS or macOS `say`."""
if not text:
return
self._speaking = True
try:
if self.config.use_say_fallback:
self._speak_say(text)
else:
self._speak_piper(text)
finally:
self._speaking = False
def _speak_piper(self, text: str) -> None:
"""Speak using Piper TTS (local ONNX inference)."""
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
tmp_path = tmp.name
try:
cmd = ["piper", "--model", str(self.config.piper_voice), "--output_file", tmp_path]
proc = subprocess.run(cmd, input=text, capture_output=True, text=True, timeout=30)
if proc.returncode != 0:
logger.error("Piper failed: %s", proc.stderr)
self._speak_say(text)
return
self._play_audio(tmp_path)
finally:
Path(tmp_path).unlink(missing_ok=True)
def _speak_say(self, text: str) -> None:
"""Speak using macOS `say` command."""
try:
proc = subprocess.Popen(
["say", "-r", "180", text],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
proc.wait(timeout=60)
except subprocess.TimeoutExpired:
proc.kill()
except FileNotFoundError:
logger.error("macOS `say` command not found")
def _play_audio(self, path: str) -> None:
"""Play a WAV file. Can be interrupted by setting self._interrupted."""
try:
proc = subprocess.Popen(
["afplay", path],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
while proc.poll() is None:
if self._interrupted:
proc.terminate()
self._interrupted = False
logger.info("TTS interrupted by user")
return
time.sleep(0.05)
except FileNotFoundError:
try:
subprocess.run(["aplay", path], capture_output=True, timeout=60)
except (FileNotFoundError, subprocess.TimeoutExpired):
logger.error("No audio player found (tried afplay, aplay)")

View File

@@ -13,76 +13,41 @@ Usage:
Requires: sounddevice, numpy, whisper, piper-tts Requires: sounddevice, numpy, whisper, piper-tts
""" """
from __future__ import annotations
import asyncio import asyncio
import logging import logging
import re
import subprocess
import sys import sys
import tempfile
import time
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
import numpy as np from timmy.voice.activation import (
EXIT_COMMANDS,
WHISPER_HALLUCINATIONS,
is_exit_command,
is_hallucination,
)
from timmy.voice.audio_io import (
DEFAULT_MAX_UTTERANCE,
DEFAULT_MIN_UTTERANCE,
DEFAULT_SAMPLE_RATE,
DEFAULT_SILENCE_DURATION,
DEFAULT_SILENCE_THRESHOLD,
)
from timmy.voice.helpers import _install_quiet_asyncgen_hooks, _suppress_mcp_noise
from timmy.voice.llm import LLMMixin
from timmy.voice.speech_engines import (
DEFAULT_PIPER_VOICE,
DEFAULT_WHISPER_MODEL,
)
from timmy.voice.stt import STTMixin
from timmy.voice.tts import TTSMixin
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# ── Voice-mode system instruction ───────────────────────────────────────────
# Prepended to user messages so Timmy responds naturally for TTS.
_VOICE_PREAMBLE = (
"[VOICE MODE] You are speaking aloud through a text-to-speech system. "
"Respond in short, natural spoken sentences. No markdown, no bullet points, "
"no asterisks, no numbered lists, no headers, no bold/italic formatting. "
"Talk like a person in a conversation — concise, warm, direct. "
"Keep responses under 3-4 sentences unless the user asks for detail."
)
def _strip_markdown(text: str) -> str:
"""Remove markdown formatting so TTS reads naturally.
Strips: **bold**, *italic*, `code`, # headers, - bullets,
numbered lists, [links](url), etc.
"""
if not text:
return text
# Remove bold/italic markers
text = re.sub(r"\*{1,3}([^*]+)\*{1,3}", r"\1", text)
# Remove inline code
text = re.sub(r"`([^`]+)`", r"\1", text)
# Remove headers (# Header)
text = re.sub(r"^#{1,6}\s+", "", text, flags=re.MULTILINE)
# Remove bullet points (-, *, +) at start of line
text = re.sub(r"^[\s]*[-*+]\s+", "", text, flags=re.MULTILINE)
# Remove numbered lists (1. 2. etc)
text = re.sub(r"^[\s]*\d+\.\s+", "", text, flags=re.MULTILINE)
# Remove link syntax [text](url) → text
text = re.sub(r"\[([^\]]+)\]\([^)]+\)", r"\1", text)
# Remove horizontal rules
text = re.sub(r"^[-*_]{3,}\s*$", "", text, flags=re.MULTILINE)
# Collapse multiple newlines
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()
# ── Defaults ────────────────────────────────────────────────────────────────
DEFAULT_WHISPER_MODEL = "base.en"
DEFAULT_PIPER_VOICE = Path.home() / ".local/share/piper-voices/en_US-lessac-medium.onnx"
DEFAULT_SAMPLE_RATE = 16000 # Whisper expects 16 kHz
DEFAULT_CHANNELS = 1
DEFAULT_SILENCE_THRESHOLD = 0.015 # RMS threshold — tune for your mic/room
DEFAULT_SILENCE_DURATION = 1.5 # seconds of silence to end utterance
DEFAULT_MIN_UTTERANCE = 0.5 # ignore clicks/bumps shorter than this
DEFAULT_MAX_UTTERANCE = 30.0 # safety cap — don't record forever
DEFAULT_SESSION_ID = "voice" DEFAULT_SESSION_ID = "voice"
def _rms(block: np.ndarray) -> float:
"""Compute root-mean-square energy of an audio block."""
return float(np.sqrt(np.mean(block.astype(np.float32) ** 2)))
@dataclass @dataclass
class VoiceConfig: class VoiceConfig:
"""Configuration for the voice loop.""" """Configuration for the voice loop."""
@@ -104,7 +69,7 @@ class VoiceConfig:
model_size: str | None = None model_size: str | None = None
class VoiceLoop: class VoiceLoop(STTMixin, TTSMixin, LLMMixin):
"""Sovereign listen-think-speak loop. """Sovereign listen-think-speak loop.
Everything runs locally: Everything runs locally:
@@ -113,312 +78,35 @@ class VoiceLoop:
- TTS: Piper (local ONNX model) or macOS `say` - TTS: Piper (local ONNX model) or macOS `say`
""" """
# Class-level constants delegate to the activation module.
_WHISPER_HALLUCINATIONS = WHISPER_HALLUCINATIONS
_EXIT_COMMANDS = EXIT_COMMANDS
def __init__(self, config: VoiceConfig | None = None) -> None: def __init__(self, config: VoiceConfig | None = None) -> None:
self.config = config or VoiceConfig() self.config = config or VoiceConfig()
self._whisper_model = None self._whisper_model = None
self._running = False self._running = False
self._speaking = False # True while TTS is playing self._speaking = False
self._interrupted = False # set when user talks over TTS self._interrupted = False
# Persistent event loop — reused across all chat calls so Agno's
# MCP sessions don't die when the loop closes.
self._loop: asyncio.AbstractEventLoop | None = None self._loop: asyncio.AbstractEventLoop | None = None
# ── Lazy initialization ───────────────────────────────────────────── # ── Lazy initialization ─────────────────────────────────────────────
def _load_whisper(self):
"""Load Whisper model (lazy, first use only)."""
if self._whisper_model is not None:
return
import whisper
logger.info("Loading Whisper model: %s", self.config.whisper_model)
self._whisper_model = whisper.load_model(self.config.whisper_model)
logger.info("Whisper model loaded.")
def _ensure_piper(self) -> bool: def _ensure_piper(self) -> bool:
"""Check that Piper voice model exists.""" """Check that Piper voice model exists."""
if self.config.use_say_fallback: if self.config.use_say_fallback:
return True return True
voice_path = self.config.piper_voice voice_path = self.config.piper_voice
if not voice_path.exists(): if not voice_path.exists():
logger.warning("Piper voice not found at %s — falling back to `say`", voice_path) logger.warning(
"Piper voice not found at %s — falling back to `say`", voice_path
)
self.config.use_say_fallback = True self.config.use_say_fallback = True
return True return True
return True return True
# ── STT: Microphone → Text ──────────────────────────────────────────
def _record_utterance(self) -> np.ndarray | None:
"""Record from microphone until silence is detected.
Uses energy-based Voice Activity Detection:
1. Wait for speech (RMS above threshold)
2. Record until silence (RMS below threshold for silence_duration)
3. Return the audio as a numpy array
Returns None if interrupted or no speech detected.
"""
import sounddevice as sd
sr = self.config.sample_rate
block_size = int(sr * 0.1) # 100ms blocks
silence_blocks = int(self.config.silence_duration / 0.1)
min_blocks = int(self.config.min_utterance / 0.1)
max_blocks = int(self.config.max_utterance / 0.1)
sys.stdout.write("\n 🎤 Listening... (speak now)\n")
sys.stdout.flush()
with sd.InputStream(
samplerate=sr,
channels=DEFAULT_CHANNELS,
dtype="float32",
blocksize=block_size,
) as stream:
chunks = self._capture_audio_blocks(stream, block_size, silence_blocks, max_blocks)
return self._finalize_utterance(chunks, min_blocks, sr)
def _capture_audio_blocks(
self,
stream,
block_size: int,
silence_blocks: int,
max_blocks: int,
) -> list[np.ndarray]:
"""Read audio blocks from *stream* until silence or max length.
Returns the list of captured audio chunks (may be empty).
"""
chunks: list[np.ndarray] = []
silent_count = 0
recording = False
while self._running:
block, overflowed = stream.read(block_size)
if overflowed:
logger.debug("Audio buffer overflowed")
rms = _rms(block)
if not recording:
if rms > self.config.silence_threshold:
recording = True
silent_count = 0
chunks.append(block.copy())
sys.stdout.write(" 📢 Recording...\r")
sys.stdout.flush()
else:
chunks.append(block.copy())
if rms < self.config.silence_threshold:
silent_count += 1
else:
silent_count = 0
if silent_count >= silence_blocks:
break
if len(chunks) >= max_blocks:
logger.info("Max utterance length reached, stopping.")
break
return chunks
@staticmethod
def _finalize_utterance(
chunks: list[np.ndarray], min_blocks: int, sample_rate: int
) -> np.ndarray | None:
"""Concatenate recorded chunks and report duration.
Returns ``None`` if the utterance is too short to be meaningful.
"""
if not chunks or len(chunks) < min_blocks:
return None
audio = np.concatenate(chunks, axis=0).flatten()
duration = len(audio) / sample_rate
sys.stdout.write(f" ✂️ Captured {duration:.1f}s of audio\n")
sys.stdout.flush()
return audio
def _transcribe(self, audio: np.ndarray) -> str:
"""Transcribe audio using local Whisper model."""
self._load_whisper()
sys.stdout.write(" 🧠 Transcribing...\r")
sys.stdout.flush()
t0 = time.monotonic()
result = self._whisper_model.transcribe(
audio,
language="en",
fp16=False, # MPS/CPU — fp16 can cause issues on some setups
)
elapsed = time.monotonic() - t0
text = result["text"].strip()
logger.info("Whisper transcribed in %.1fs: '%s'", elapsed, text[:80])
return text
# ── TTS: Text → Speaker ─────────────────────────────────────────────
def _speak(self, text: str) -> None:
"""Speak text aloud using Piper TTS or macOS `say`."""
if not text:
return
self._speaking = True
try:
if self.config.use_say_fallback:
self._speak_say(text)
else:
self._speak_piper(text)
finally:
self._speaking = False
def _speak_piper(self, text: str) -> None:
"""Speak using Piper TTS (local ONNX inference)."""
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
tmp_path = tmp.name
try:
# Generate WAV with Piper
cmd = [
"piper",
"--model",
str(self.config.piper_voice),
"--output_file",
tmp_path,
]
proc = subprocess.run(
cmd,
input=text,
capture_output=True,
text=True,
timeout=30,
)
if proc.returncode != 0:
logger.error("Piper failed: %s", proc.stderr)
self._speak_say(text) # fallback
return
# Play with afplay (macOS) — interruptible
self._play_audio(tmp_path)
finally:
Path(tmp_path).unlink(missing_ok=True)
def _speak_say(self, text: str) -> None:
"""Speak using macOS `say` command."""
try:
proc = subprocess.Popen(
["say", "-r", "180", text],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
proc.wait(timeout=60)
except subprocess.TimeoutExpired:
proc.kill()
except FileNotFoundError:
logger.error("macOS `say` command not found")
def _play_audio(self, path: str) -> None:
"""Play a WAV file. Can be interrupted by setting self._interrupted."""
try:
proc = subprocess.Popen(
["afplay", path],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
# Poll so we can interrupt
while proc.poll() is None:
if self._interrupted:
proc.terminate()
self._interrupted = False
logger.info("TTS interrupted by user")
return
time.sleep(0.05)
except FileNotFoundError:
# Not macOS — try aplay (Linux)
try:
subprocess.run(["aplay", path], capture_output=True, timeout=60)
except (FileNotFoundError, subprocess.TimeoutExpired):
logger.error("No audio player found (tried afplay, aplay)")
# ── LLM: Text → Response ───────────────────────────────────────────
def _get_loop(self) -> asyncio.AbstractEventLoop:
"""Return a persistent event loop, creating one if needed.
A single loop is reused for the entire voice session so Agno's
MCP tool-server connections survive across turns.
"""
if self._loop is None or self._loop.is_closed():
self._loop = asyncio.new_event_loop()
return self._loop
def _think(self, user_text: str) -> str:
"""Send text to Timmy and get a response."""
sys.stdout.write(" 💭 Thinking...\r")
sys.stdout.flush()
t0 = time.monotonic()
try:
loop = self._get_loop()
response = loop.run_until_complete(self._chat(user_text))
except (ConnectionError, RuntimeError, ValueError) as exc:
logger.error("Timmy chat failed: %s", exc)
response = "I'm having trouble thinking right now. Could you try again?"
elapsed = time.monotonic() - t0
logger.info("Timmy responded in %.1fs", elapsed)
# Strip markdown so TTS doesn't read asterisks, bullets, etc.
response = _strip_markdown(response)
return response
async def _chat(self, message: str) -> str:
"""Async wrapper around Timmy's session.chat().
Prepends the voice-mode instruction so Timmy responds in
natural spoken language rather than markdown.
"""
from timmy.session import chat
voiced = f"{_VOICE_PREAMBLE}\n\nUser said: {message}"
return await chat(voiced, session_id=self.config.session_id)
# ── Main Loop ─────────────────────────────────────────────────────── # ── Main Loop ───────────────────────────────────────────────────────
# Whisper hallucinates these on silence/noise — skip them.
_WHISPER_HALLUCINATIONS = frozenset(
{
"you",
"thanks.",
"thank you.",
"bye.",
"",
"thanks for watching!",
"thank you for watching!",
}
)
# Spoken phrases that end the voice session.
_EXIT_COMMANDS = frozenset(
{
"goodbye",
"exit",
"quit",
"stop",
"goodbye timmy",
"stop listening",
}
)
def _log_banner(self) -> None: def _log_banner(self) -> None:
"""Log the startup banner with STT/TTS/LLM configuration.""" """Log the startup banner with STT/TTS/LLM configuration."""
tts_label = ( tts_label = (
@@ -438,21 +126,19 @@ class VoiceLoop:
def _is_hallucination(self, text: str) -> bool: def _is_hallucination(self, text: str) -> bool:
"""Return True if *text* is a known Whisper hallucination.""" """Return True if *text* is a known Whisper hallucination."""
return not text or text.lower() in self._WHISPER_HALLUCINATIONS return is_hallucination(text)
def _is_exit_command(self, text: str) -> bool: def _is_exit_command(self, text: str) -> bool:
"""Return True if the user asked to stop the voice session.""" """Return True if the user asked to stop the voice session."""
return text.lower().strip().rstrip(".!") in self._EXIT_COMMANDS return is_exit_command(text)
def _process_turn(self, text: str) -> None: def _process_turn(self, text: str) -> None:
"""Handle a single listen-think-speak turn after transcription.""" """Handle a single listen-think-speak turn after transcription."""
sys.stdout.write(f"\n 👤 You: {text}\n") sys.stdout.write(f"\n 👤 You: {text}\n")
sys.stdout.flush() sys.stdout.flush()
response = self._think(text) response = self._think(text)
sys.stdout.write(f" 🤖 Timmy: {response}\n") sys.stdout.write(f" 🤖 Timmy: {response}\n")
sys.stdout.flush() sys.stdout.flush()
self._speak(response) self._speak(response)
def run(self) -> None: def run(self) -> None:
@@ -461,112 +147,26 @@ class VoiceLoop:
_suppress_mcp_noise() _suppress_mcp_noise()
_install_quiet_asyncgen_hooks() _install_quiet_asyncgen_hooks()
self._log_banner() self._log_banner()
self._running = True self._running = True
try: try:
while self._running: while self._running:
audio = self._record_utterance() audio = self._record_utterance()
if audio is None: if audio is None:
continue continue
text = self._transcribe(audio) text = self._transcribe(audio)
if self._is_hallucination(text): if self._is_hallucination(text):
logger.debug("Ignoring likely Whisper hallucination: '%s'", text) logger.debug("Ignoring likely Whisper hallucination: '%s'", text)
continue continue
if self._is_exit_command(text): if self._is_exit_command(text):
logger.info("👋 Goodbye!") logger.info("👋 Goodbye!")
break break
self._process_turn(text) self._process_turn(text)
except KeyboardInterrupt: except KeyboardInterrupt:
logger.info("👋 Voice loop stopped.") logger.info("👋 Voice loop stopped.")
finally: finally:
self._running = False self._running = False
self._cleanup_loop() self._cleanup_loop()
def _cleanup_loop(self) -> None:
"""Shut down the persistent event loop cleanly.
Agno's MCP stdio sessions leave async generators (stdio_client)
that complain loudly when torn down from a different task.
We swallow those errors — they're harmless, the subprocesses
die with the loop anyway.
"""
if self._loop is None or self._loop.is_closed():
return
# Silence "error during closing of asynchronous generator" warnings
# from MCP's anyio/asyncio cancel-scope teardown.
import warnings
self._loop.set_exception_handler(lambda loop, ctx: None)
try:
self._loop.run_until_complete(self._loop.shutdown_asyncgens())
except RuntimeError as exc:
logger.debug("Shutdown asyncgens failed: %s", exc)
pass
with warnings.catch_warnings():
warnings.simplefilter("ignore", RuntimeWarning)
try:
self._loop.close()
except RuntimeError as exc:
logger.debug("Loop close failed: %s", exc)
pass
self._loop = None
def stop(self) -> None: def stop(self) -> None:
"""Stop the voice loop (from another thread).""" """Stop the voice loop (from another thread)."""
self._running = False self._running = False
def _suppress_mcp_noise() -> None:
"""Quiet down noisy MCP/Agno loggers during voice mode.
Sets specific loggers to WARNING so the terminal stays clean
for the voice transcript.
"""
for name in (
"mcp",
"mcp.server",
"mcp.client",
"agno",
"agno.mcp",
"httpx",
"httpcore",
):
logging.getLogger(name).setLevel(logging.WARNING)
def _install_quiet_asyncgen_hooks() -> None:
"""Silence MCP stdio_client async-generator teardown noise.
When the voice loop exits, Python GC finalizes Agno's MCP
stdio_client async generators. anyio's cancel-scope teardown
prints ugly tracebacks to stderr. These are harmless — the
MCP subprocesses die with the loop. We intercept them here.
"""
_orig_hook = getattr(sys, "unraisablehook", None)
def _quiet_hook(args):
# Swallow RuntimeError from anyio cancel-scope teardown
# and BaseExceptionGroup from MCP stdio_client generators
if args.exc_type in (RuntimeError, BaseExceptionGroup):
msg = str(args.exc_value) if args.exc_value else ""
if "cancel scope" in msg or "unhandled errors" in msg:
return
# Also swallow GeneratorExit from stdio_client
if args.exc_type is GeneratorExit:
return
# Everything else: forward to original hook
if _orig_hook:
_orig_hook(args)
else:
sys.__unraisablehook__(args)
sys.unraisablehook = _quiet_hook