Compare commits

..

1 Commits

Author SHA1 Message Date
kimi
96acac5c5f refactor: break up shell.py::run() into helpers
Extract _build_run_env() and _exec_subprocess() from the 89-line run()
method, reducing it to ~30 lines that validate and delegate.

Fixes #539

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 21:03:33 -04:00
5 changed files with 302 additions and 357 deletions

View File

@@ -146,7 +146,7 @@ class ShellHand:
@staticmethod
def _build_run_env(env: dict | None) -> dict:
"""Merge *env* overrides into a copy of the current environment."""
"""Merge *env* overrides into the current process environment."""
import os
run_env = os.environ.copy()
@@ -154,7 +154,7 @@ class ShellHand:
run_env.update(env)
return run_env
async def _execute_subprocess(
async def _exec_subprocess(
self,
command: str,
effective_timeout: int,
@@ -162,7 +162,7 @@ class ShellHand:
run_env: dict,
start: float,
) -> ShellResult:
"""Run *command* as a subprocess with timeout enforcement."""
"""Launch *command*, enforce timeout, and return the result."""
proc = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
@@ -178,29 +178,24 @@ class ShellHand:
except TimeoutError:
proc.kill()
await proc.wait()
latency = (time.time() - start) * 1000
logger.warning("Shell command timed out after %ds: %s", effective_timeout, command)
return ShellResult(
command=command,
success=False,
exit_code=-1,
error=f"Command timed out after {effective_timeout}s",
latency_ms=latency,
latency_ms=(time.time() - start) * 1000,
timed_out=True,
)
latency = (time.time() - start) * 1000
exit_code = proc.returncode if proc.returncode is not None else -1
stdout = stdout_bytes.decode("utf-8", errors="replace").strip()
stderr = stderr_bytes.decode("utf-8", errors="replace").strip()
return ShellResult(
command=command,
success=exit_code == 0,
exit_code=exit_code,
stdout=stdout,
stderr=stderr,
latency_ms=latency,
stdout=stdout_bytes.decode("utf-8", errors="replace").strip(),
stderr=stderr_bytes.decode("utf-8", errors="replace").strip(),
latency_ms=(time.time() - start) * 1000,
)
async def run(
@@ -232,20 +227,21 @@ class ShellHand:
latency_ms=(time.time() - start) * 1000,
)
effective_timeout = timeout or self._default_timeout
cwd = working_dir or self._working_dir
try:
run_env = self._build_run_env(env)
return await self._execute_subprocess(command, effective_timeout, cwd, run_env, start)
return await self._exec_subprocess(
command,
effective_timeout=timeout or self._default_timeout,
cwd=working_dir or self._working_dir,
run_env=self._build_run_env(env),
start=start,
)
except Exception as exc:
latency = (time.time() - start) * 1000
logger.warning("Shell command failed: %s%s", command, exc)
return ShellResult(
command=command,
success=False,
error=str(exc),
latency_ms=latency,
latency_ms=(time.time() - start) * 1000,
)
def status(self) -> dict:

View File

@@ -98,73 +98,6 @@ def _get_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]:
return {row[1] for row in cursor.fetchall()}
def _migrate_episodes(conn: sqlite3.Connection) -> None:
"""Migrate episodes table rows into the unified memories table."""
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
def _migrate_chunks(conn: sqlite3.Connection) -> None:
"""Migrate chunks table rows into the unified memories table."""
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
def _drop_legacy_table(conn: sqlite3.Connection, table: str) -> None:
"""Drop a legacy table if it exists."""
try:
conn.execute(f"DROP TABLE {table}") # noqa: S608
logger.info("Migration: Dropped old %s table", table)
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop %s: %s", table, exc)
def _migrate_schema(conn: sqlite3.Connection) -> None:
"""Migrate from old three-table schema to unified memories table.
@@ -177,16 +110,78 @@ def _migrate_schema(conn: sqlite3.Connection) -> None:
tables = {row[0] for row in cursor.fetchall()}
has_memories = "memories" in tables
has_episodes = "episodes" in tables
has_chunks = "chunks" in tables
has_facts = "facts" in tables
if not has_memories and (tables & {"episodes", "chunks", "facts"}):
# Check if we need to migrate (old schema exists)
if not has_memories and (has_episodes or has_chunks or has_facts):
logger.info("Migration: Creating unified memories table")
# Schema will be created by _ensure_schema above
if "episodes" in tables and has_memories:
_migrate_episodes(conn)
if "chunks" in tables and has_memories:
_migrate_chunks(conn)
if "facts" in tables:
_drop_legacy_table(conn, "facts")
# Migrate episodes -> memories
if has_episodes and has_memories:
logger.info("Migration: Converting episodes table to memories")
try:
cols = _get_table_columns(conn, "episodes")
context_type_col = "context_type" if "context_type" in cols else "'conversation'"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
metadata, agent_id, task_id, session_id,
created_at, access_count, last_accessed
)
SELECT
id, content,
COALESCE({context_type_col}, 'conversation'),
COALESCE(source, 'agent'),
embedding,
metadata, agent_id, task_id, session_id,
COALESCE(timestamp, datetime('now')), 0, NULL
FROM episodes
""")
conn.execute("DROP TABLE episodes")
logger.info("Migration: Migrated episodes to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate episodes: %s", exc)
# Migrate chunks -> memories as vault_chunk
if has_chunks and has_memories:
logger.info("Migration: Converting chunks table to memories")
try:
cols = _get_table_columns(conn, "chunks")
id_col = "id" if "id" in cols else "CAST(rowid AS TEXT)"
content_col = "content" if "content" in cols else "text"
source_col = (
"filepath" if "filepath" in cols else ("source" if "source" in cols else "'vault'")
)
embedding_col = "embedding" if "embedding" in cols else "NULL"
created_col = "created_at" if "created_at" in cols else "datetime('now')"
conn.execute(f"""
INSERT INTO memories (
id, content, memory_type, source, embedding,
created_at, access_count
)
SELECT
{id_col}, {content_col}, 'vault_chunk', {source_col},
{embedding_col}, {created_col}, 0
FROM chunks
""")
conn.execute("DROP TABLE chunks")
logger.info("Migration: Migrated chunks to memories")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to migrate chunks: %s", exc)
# Drop old tables
if has_facts:
try:
conn.execute("DROP TABLE facts")
logger.info("Migration: Dropped old facts table")
except sqlite3.Error as exc:
logger.warning("Migration: Failed to drop facts: %s", exc)
conn.commit()
@@ -303,85 +298,6 @@ def store_memory(
return entry
def _build_search_filters(
context_type: str | None,
agent_id: str | None,
session_id: str | None,
) -> tuple[str, list]:
"""Build SQL WHERE clause and params from search filters."""
conditions: list[str] = []
params: list = []
if context_type:
conditions.append("memory_type = ?")
params.append(context_type)
if agent_id:
conditions.append("agent_id = ?")
params.append(agent_id)
if session_id:
conditions.append("session_id = ?")
params.append(session_id)
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
return where_clause, params
def _fetch_memory_candidates(
where_clause: str, params: list, candidate_limit: int
) -> list[sqlite3.Row]:
"""Fetch candidate memory rows from the database."""
query_sql = f"""
SELECT * FROM memories
{where_clause}
ORDER BY created_at DESC
LIMIT ?
"""
params.append(candidate_limit)
with get_connection() as conn:
return conn.execute(query_sql, params).fetchall()
def _row_to_entry(row: sqlite3.Row) -> MemoryEntry:
"""Convert a database row to a MemoryEntry."""
return MemoryEntry(
id=row["id"],
content=row["content"],
source=row["source"],
context_type=row["memory_type"], # DB column -> API field
agent_id=row["agent_id"],
task_id=row["task_id"],
session_id=row["session_id"],
metadata=json.loads(row["metadata"]) if row["metadata"] else None,
embedding=json.loads(row["embedding"]) if row["embedding"] else None,
timestamp=row["created_at"],
)
def _score_and_filter(
rows: list[sqlite3.Row],
query: str,
query_embedding: list[float],
min_relevance: float,
) -> list[MemoryEntry]:
"""Score candidate rows by similarity and filter by min_relevance."""
results = []
for row in rows:
entry = _row_to_entry(row)
if entry.embedding:
score = cosine_similarity(query_embedding, entry.embedding)
else:
score = _keyword_overlap(query, entry.content)
entry.relevance_score = score
if score >= min_relevance:
results.append(entry)
results.sort(key=lambda x: x.relevance_score or 0, reverse=True)
return results
def search_memories(
query: str,
limit: int = 10,
@@ -404,9 +320,65 @@ def search_memories(
List of MemoryEntry objects sorted by relevance
"""
query_embedding = embed_text(query)
where_clause, params = _build_search_filters(context_type, agent_id, session_id)
rows = _fetch_memory_candidates(where_clause, params, limit * 3)
results = _score_and_filter(rows, query, query_embedding, min_relevance)
# Build query with filters
conditions = []
params = []
if context_type:
conditions.append("memory_type = ?")
params.append(context_type)
if agent_id:
conditions.append("agent_id = ?")
params.append(agent_id)
if session_id:
conditions.append("session_id = ?")
params.append(session_id)
where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
# Fetch candidates (we'll do in-memory similarity for now)
query_sql = f"""
SELECT * FROM memories
{where_clause}
ORDER BY created_at DESC
LIMIT ?
"""
params.append(limit * 3) # Get more candidates for ranking
with get_connection() as conn:
rows = conn.execute(query_sql, params).fetchall()
# Compute similarity scores
results = []
for row in rows:
entry = MemoryEntry(
id=row["id"],
content=row["content"],
source=row["source"],
context_type=row["memory_type"], # DB column -> API field
agent_id=row["agent_id"],
task_id=row["task_id"],
session_id=row["session_id"],
metadata=json.loads(row["metadata"]) if row["metadata"] else None,
embedding=json.loads(row["embedding"]) if row["embedding"] else None,
timestamp=row["created_at"],
)
if entry.embedding:
score = cosine_similarity(query_embedding, entry.embedding)
entry.relevance_score = score
if score >= min_relevance:
results.append(entry)
else:
# Fallback: check for keyword overlap
score = _keyword_overlap(query, entry.content)
entry.relevance_score = score
if score >= min_relevance:
results.append(entry)
# Sort by relevance and return top results
results.sort(key=lambda x: x.relevance_score or 0, reverse=True)
return results[:limit]

View File

@@ -909,35 +909,82 @@ def _experiment_tool_catalog() -> dict:
}
_CREATIVE_CATALOG_SOURCES: list[tuple[str, str, list[str]]] = [
("creative.tools.git_tools", "GIT_TOOL_CATALOG", ["forge", "helm", "orchestrator"]),
("creative.tools.image_tools", "IMAGE_TOOL_CATALOG", ["pixel", "orchestrator"]),
("creative.tools.music_tools", "MUSIC_TOOL_CATALOG", ["lyra", "orchestrator"]),
("creative.tools.video_tools", "VIDEO_TOOL_CATALOG", ["reel", "orchestrator"]),
("creative.director", "DIRECTOR_TOOL_CATALOG", ["orchestrator"]),
("creative.assembler", "ASSEMBLER_TOOL_CATALOG", ["reel", "orchestrator"]),
]
def _import_creative_catalogs(catalog: dict) -> None:
"""Import and merge creative tool catalogs from creative module."""
for module_path, attr_name, available_in in _CREATIVE_CATALOG_SOURCES:
_merge_catalog(catalog, module_path, attr_name, available_in)
def _merge_catalog(
catalog: dict, module_path: str, attr_name: str, available_in: list[str]
) -> None:
"""Import a single creative catalog and merge its entries."""
# ── Git tools ─────────────────────────────────────────────────────────────
try:
from importlib import import_module
from creative.tools.git_tools import GIT_TOOL_CATALOG
source_catalog = getattr(import_module(module_path), attr_name)
for tool_id, info in source_catalog.items():
for tool_id, info in GIT_TOOL_CATALOG.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": available_in,
"available_in": ["forge", "helm", "orchestrator"],
}
except ImportError:
pass
# ── Image tools ────────────────────────────────────────────────────────────
try:
from creative.tools.image_tools import IMAGE_TOOL_CATALOG
for tool_id, info in IMAGE_TOOL_CATALOG.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": ["pixel", "orchestrator"],
}
except ImportError:
pass
# ── Music tools ────────────────────────────────────────────────────────────
try:
from creative.tools.music_tools import MUSIC_TOOL_CATALOG
for tool_id, info in MUSIC_TOOL_CATALOG.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": ["lyra", "orchestrator"],
}
except ImportError:
pass
# ── Video tools ────────────────────────────────────────────────────────────
try:
from creative.tools.video_tools import VIDEO_TOOL_CATALOG
for tool_id, info in VIDEO_TOOL_CATALOG.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": ["reel", "orchestrator"],
}
except ImportError:
pass
# ── Creative pipeline ──────────────────────────────────────────────────────
try:
from creative.director import DIRECTOR_TOOL_CATALOG
for tool_id, info in DIRECTOR_TOOL_CATALOG.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": ["orchestrator"],
}
except ImportError:
pass
# ── Assembler tools ───────────────────────────────────────────────────────
try:
from creative.assembler import ASSEMBLER_TOOL_CATALOG
for tool_id, info in ASSEMBLER_TOOL_CATALOG.items():
catalog[tool_id] = {
"name": info["name"],
"description": info["description"],
"available_in": ["reel", "orchestrator"],
}
except ImportError:
pass

View File

@@ -78,11 +78,6 @@ DEFAULT_MAX_UTTERANCE = 30.0 # safety cap — don't record forever
DEFAULT_SESSION_ID = "voice"
def _rms(block: np.ndarray) -> float:
"""Compute root-mean-square energy of an audio block."""
return float(np.sqrt(np.mean(block.astype(np.float32) ** 2)))
@dataclass
class VoiceConfig:
"""Configuration for the voice loop."""
@@ -166,6 +161,13 @@ class VoiceLoop:
min_blocks = int(self.config.min_utterance / 0.1)
max_blocks = int(self.config.max_utterance / 0.1)
audio_chunks: list[np.ndarray] = []
silent_count = 0
recording = False
def _rms(block: np.ndarray) -> float:
return float(np.sqrt(np.mean(block.astype(np.float32) ** 2)))
sys.stdout.write("\n 🎤 Listening... (speak now)\n")
sys.stdout.flush()
@@ -175,70 +177,42 @@ class VoiceLoop:
dtype="float32",
blocksize=block_size,
) as stream:
chunks = self._capture_audio_blocks(stream, block_size, silence_blocks, max_blocks)
while self._running:
block, overflowed = stream.read(block_size)
if overflowed:
logger.debug("Audio buffer overflowed")
return self._finalize_utterance(chunks, min_blocks, sr)
rms = _rms(block)
def _capture_audio_blocks(
self,
stream,
block_size: int,
silence_blocks: int,
max_blocks: int,
) -> list[np.ndarray]:
"""Read audio blocks from *stream* until silence or safety cap.
Returns the list of captured audio blocks (may be empty if no
speech was detected).
"""
audio_chunks: list[np.ndarray] = []
silent_count = 0
recording = False
while self._running:
block, overflowed = stream.read(block_size)
if overflowed:
logger.debug("Audio buffer overflowed")
rms = _rms(block)
if not recording:
if rms > self.config.silence_threshold:
recording = True
silent_count = 0
audio_chunks.append(block.copy())
sys.stdout.write(" 📢 Recording...\r")
sys.stdout.flush()
else:
audio_chunks.append(block.copy())
if rms < self.config.silence_threshold:
silent_count += 1
if not recording:
if rms > self.config.silence_threshold:
recording = True
silent_count = 0
audio_chunks.append(block.copy())
sys.stdout.write(" 📢 Recording...\r")
sys.stdout.flush()
else:
silent_count = 0
audio_chunks.append(block.copy())
if silent_count >= silence_blocks:
break
if rms < self.config.silence_threshold:
silent_count += 1
else:
silent_count = 0
if len(audio_chunks) >= max_blocks:
logger.info("Max utterance length reached, stopping.")
break
# End of utterance
if silent_count >= silence_blocks:
break
return audio_chunks
# Safety cap
if len(audio_chunks) >= max_blocks:
logger.info("Max utterance length reached, stopping.")
break
@staticmethod
def _finalize_utterance(
chunks: list[np.ndarray], min_blocks: int, sample_rate: int
) -> np.ndarray | None:
"""Concatenate captured chunks and report duration.
Returns None if the utterance is too short (below *min_blocks*).
"""
if not chunks or len(chunks) < min_blocks:
if not audio_chunks or len(audio_chunks) < min_blocks:
return None
audio = np.concatenate(chunks, axis=0).flatten()
duration = len(audio) / sample_rate
audio = np.concatenate(audio_chunks, axis=0).flatten()
duration = len(audio) / sr
sys.stdout.write(f" ✂️ Captured {duration:.1f}s of audio\n")
sys.stdout.flush()
return audio
@@ -395,33 +369,15 @@ class VoiceLoop:
# ── Main Loop ───────────────────────────────────────────────────────
# Whisper hallucinates these on silence/noise — skip them.
_WHISPER_HALLUCINATIONS = frozenset(
{
"you",
"thanks.",
"thank you.",
"bye.",
"",
"thanks for watching!",
"thank you for watching!",
}
)
def run(self) -> None:
"""Run the voice loop. Blocks until Ctrl-C."""
self._ensure_piper()
# Spoken phrases that end the voice session.
_EXIT_COMMANDS = frozenset(
{
"goodbye",
"exit",
"quit",
"stop",
"goodbye timmy",
"stop listening",
}
)
# Suppress MCP / Agno stderr noise during voice mode.
_suppress_mcp_noise()
# Suppress MCP async-generator teardown tracebacks on exit.
_install_quiet_asyncgen_hooks()
def _log_banner(self) -> None:
"""Log the startup banner with STT/TTS/LLM configuration."""
tts_label = (
"macOS say"
if self.config.use_say_fallback
@@ -437,50 +393,52 @@ class VoiceLoop:
" Press Ctrl-C to exit.\n" + "=" * 60
)
def _is_hallucination(self, text: str) -> bool:
"""Return True if *text* is a known Whisper hallucination."""
return not text or text.lower() in self._WHISPER_HALLUCINATIONS
def _is_exit_command(self, text: str) -> bool:
"""Return True if the user asked to stop the voice session."""
return text.lower().strip().rstrip(".!") in self._EXIT_COMMANDS
def _process_turn(self, text: str) -> None:
"""Handle a single listen-think-speak turn after transcription."""
sys.stdout.write(f"\n 👤 You: {text}\n")
sys.stdout.flush()
response = self._think(text)
sys.stdout.write(f" 🤖 Timmy: {response}\n")
sys.stdout.flush()
self._speak(response)
def run(self) -> None:
"""Run the voice loop. Blocks until Ctrl-C."""
self._ensure_piper()
_suppress_mcp_noise()
_install_quiet_asyncgen_hooks()
self._log_banner()
self._running = True
try:
while self._running:
# 1. LISTEN — record until silence
audio = self._record_utterance()
if audio is None:
continue
# 2. TRANSCRIBE — Whisper STT
text = self._transcribe(audio)
if self._is_hallucination(text):
if not text or text.lower() in (
"you",
"thanks.",
"thank you.",
"bye.",
"",
"thanks for watching!",
"thank you for watching!",
):
# Whisper hallucinations on silence/noise
logger.debug("Ignoring likely Whisper hallucination: '%s'", text)
continue
if self._is_exit_command(text):
sys.stdout.write(f"\n 👤 You: {text}\n")
sys.stdout.flush()
# Exit commands
if text.lower().strip().rstrip(".!") in (
"goodbye",
"exit",
"quit",
"stop",
"goodbye timmy",
"stop listening",
):
logger.info("👋 Goodbye!")
break
self._process_turn(text)
# 3. THINK — send to Timmy
response = self._think(text)
sys.stdout.write(f" 🤖 Timmy: {response}\n")
sys.stdout.flush()
# 4. SPEAK — TTS output
self._speak(response)
except KeyboardInterrupt:
logger.info("👋 Voice loop stopped.")

View File

@@ -15,7 +15,7 @@ except ImportError:
np = None
try:
from timmy.voice_loop import VoiceConfig, VoiceLoop, _rms, _strip_markdown
from timmy.voice_loop import VoiceConfig, VoiceLoop, _strip_markdown
except ImportError:
pass # pytestmark will skip all tests anyway
@@ -236,7 +236,6 @@ class TestHallucinationFilter:
"""Whisper tends to hallucinate on silence/noise. The loop should filter these."""
def test_known_hallucinations_filtered(self):
loop = VoiceLoop()
hallucinations = [
"you",
"thanks.",
@@ -244,35 +243,33 @@ class TestHallucinationFilter:
"Bye.",
"Thanks for watching!",
"Thank you for watching!",
"",
]
for text in hallucinations:
assert loop._is_hallucination(text), f"'{text}' should be filtered"
def test_real_speech_not_filtered(self):
loop = VoiceLoop()
assert not loop._is_hallucination("Hello Timmy")
assert not loop._is_hallucination("What time is it?")
assert text.lower() in (
"you",
"thanks.",
"thank you.",
"bye.",
"",
"thanks for watching!",
"thank you for watching!",
), f"'{text}' should be filtered"
class TestExitCommands:
"""Voice loop should recognize exit commands."""
def test_exit_commands(self):
loop = VoiceLoop()
exits = ["goodbye", "exit", "quit", "stop", "goodbye timmy", "stop listening"]
for cmd in exits:
assert loop._is_exit_command(cmd), f"'{cmd}' should be an exit command"
def test_exit_with_punctuation(self):
loop = VoiceLoop()
assert loop._is_exit_command("goodbye!")
assert loop._is_exit_command("stop.")
def test_non_exit_commands(self):
loop = VoiceLoop()
assert not loop._is_exit_command("hello")
assert not loop._is_exit_command("what time is it")
assert cmd.lower().strip().rstrip(".!") in (
"goodbye",
"exit",
"quit",
"stop",
"goodbye timmy",
"stop listening",
), f"'{cmd}' should be an exit command"
class TestPlayAudio:
@@ -336,28 +333,3 @@ class TestSpeakSetsFlag:
# After speak
assert loop._speaking is False
class TestRms:
def test_rms_of_silence(self):
block = np.zeros(1600, dtype=np.float32)
assert _rms(block) == 0.0
def test_rms_of_signal(self):
block = np.ones(1600, dtype=np.float32) * 0.5
assert abs(_rms(block) - 0.5) < 1e-5
class TestFinalizeUtterance:
def test_returns_none_for_empty(self):
assert VoiceLoop._finalize_utterance([], min_blocks=5, sample_rate=16000) is None
def test_returns_none_below_min(self):
chunks = [np.zeros(1600, dtype=np.float32) for _ in range(3)]
assert VoiceLoop._finalize_utterance(chunks, min_blocks=5, sample_rate=16000) is None
def test_concatenates_chunks(self):
chunks = [np.ones(1600, dtype=np.float32) for _ in range(5)]
result = VoiceLoop._finalize_utterance(chunks, min_blocks=3, sample_rate=16000)
assert result is not None
assert len(result) == 8000