From 3ea039684ee23f7890be86cb3c548a6f3e748751 Mon Sep 17 00:00:00 2001 From: 0xbyt4 <35742124+0xbyt4@users.noreply.github.com> Date: Sun, 15 Mar 2026 13:48:56 +0300 Subject: [PATCH] test(voice): add integration tests with real NaCl crypto and Opus codec End-to-end voice channel tests using real crypto (no mocks): NaCl decrypt (5): valid packet, wrong key, bot SSRC, multi-packet, multi-SSRC DAVE passthrough (3): unknown SSRC, Unencrypted error, real error drop Full flow (5): utterance lifecycle, auto-map, pause/resume, corruption, cleanup SPEAKING hook (4): hook installed, map/overwrite, mapped audio processed Auth filtering (3): allowed user, rejected user, empty allowlist Rejoin flow (3): clean state, new SSRC, missing SPEAKING auto-map Multi-guild (2): independent receivers, stop isolation Echo prevention (2): paused audio ignored, resumed audio processed --- tests/integration/test_voice_channel_flow.py | 611 +++++++++++++++++++ 1 file changed, 611 insertions(+) create mode 100644 tests/integration/test_voice_channel_flow.py diff --git a/tests/integration/test_voice_channel_flow.py b/tests/integration/test_voice_channel_flow.py new file mode 100644 index 000000000..096ef9d3f --- /dev/null +++ b/tests/integration/test_voice_channel_flow.py @@ -0,0 +1,611 @@ +"""Integration tests for Discord voice channel audio flow. + +Uses real NaCl encryption and Opus codec (no mocks for crypto/codec). +Does NOT require a Discord connection — tests the VoiceReceiver +packet processing pipeline end-to-end. + +Requires: PyNaCl>=1.5.0, discord.py[voice] (opus codec) +""" + +import struct +import time +import pytest + +pytestmark = pytest.mark.integration + +# Skip entire module if voice deps are missing +pytest.importorskip("nacl.secret", reason="PyNaCl required for voice integration tests") +discord = pytest.importorskip("discord", reason="discord.py required for voice integration tests") + +import nacl.secret + +try: + if not discord.opus.is_loaded(): + import ctypes.util + opus_path = ctypes.util.find_library("opus") + if not opus_path: + import sys + for p in ("/opt/homebrew/lib/libopus.dylib", "/usr/local/lib/libopus.dylib"): + import os + if os.path.isfile(p): + opus_path = p + break + if opus_path: + discord.opus.load_opus(opus_path) + OPUS_AVAILABLE = discord.opus.is_loaded() +except Exception: + OPUS_AVAILABLE = False + +from types import SimpleNamespace +from unittest.mock import MagicMock +from gateway.platforms.discord import VoiceReceiver + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_secret_key(): + """Generate a random 32-byte key.""" + import os + return os.urandom(32) + + +def _build_encrypted_rtp_packet(secret_key, opus_payload, ssrc=100, seq=1, timestamp=960): + """Build a real NaCl-encrypted RTP packet matching Discord's format. + + Format: RTP header (12 bytes) + encrypted(opus) + 4-byte nonce + Encryption: aead_xchacha20_poly1305 with RTP header as AAD. + """ + # RTP header: version=2, payload_type=0x78, no extension, no CSRC + header = struct.pack(">BBHII", 0x80, 0x78, seq, timestamp, ssrc) + + # Encrypt with NaCl AEAD + box = nacl.secret.Aead(secret_key) + nonce_counter = struct.pack(">I", seq) # 4-byte counter as nonce seed + # Full 24-byte nonce: counter in first 4 bytes, rest zeros + full_nonce = nonce_counter + b'\x00' * 20 + + enc_msg = box.encrypt(opus_payload, header, full_nonce) + ciphertext = enc_msg.ciphertext # without nonce prefix + + # Discord format: header + ciphertext + 4-byte nonce + return header + ciphertext + nonce_counter + + +def _make_voice_receiver(secret_key, dave_session=None, bot_ssrc=9999, + allowed_user_ids=None, members=None): + """Create a VoiceReceiver with real secret key.""" + vc = MagicMock() + vc._connection.secret_key = list(secret_key) + vc._connection.dave_session = dave_session + vc._connection.ssrc = bot_ssrc + vc._connection.add_socket_listener = MagicMock() + vc._connection.remove_socket_listener = MagicMock() + vc._connection.hook = None + vc.user = SimpleNamespace(id=bot_ssrc) + vc.channel = MagicMock() + vc.channel.members = members or [] + receiver = VoiceReceiver(vc, allowed_user_ids=allowed_user_ids) + receiver.start() + return receiver + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestRealNaClDecrypt: + """End-to-end: real NaCl encrypt → _on_packet decrypt → buffer.""" + + def test_valid_encrypted_packet_buffered(self): + """Real NaCl encrypted packet → decrypted → buffered.""" + key = _make_secret_key() + opus_silence = b'\xf8\xff\xfe' + receiver = _make_voice_receiver(key) + + packet = _build_encrypted_rtp_packet(key, opus_silence, ssrc=100) + receiver._on_packet(packet) + + assert 100 in receiver._buffers + assert len(receiver._buffers[100]) > 0 + + def test_wrong_key_packet_dropped(self): + """Packet encrypted with wrong key → NaCl fails → not buffered.""" + real_key = _make_secret_key() + wrong_key = _make_secret_key() + opus_silence = b'\xf8\xff\xfe' + receiver = _make_voice_receiver(real_key) + + packet = _build_encrypted_rtp_packet(wrong_key, opus_silence, ssrc=100) + receiver._on_packet(packet) + + assert len(receiver._buffers.get(100, b"")) == 0 + + def test_bot_ssrc_ignored(self): + """Packet from bot's own SSRC → ignored.""" + key = _make_secret_key() + receiver = _make_voice_receiver(key, bot_ssrc=9999) + + packet = _build_encrypted_rtp_packet(key, b'\xf8\xff\xfe', ssrc=9999) + receiver._on_packet(packet) + + assert len(receiver._buffers) == 0 + + def test_multiple_packets_accumulate(self): + """Multiple valid packets → buffer grows.""" + key = _make_secret_key() + receiver = _make_voice_receiver(key) + + for seq in range(1, 6): + packet = _build_encrypted_rtp_packet( + key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq + ) + receiver._on_packet(packet) + + assert 100 in receiver._buffers + buf_size = len(receiver._buffers[100]) + assert buf_size > 0, "Multiple packets should accumulate in buffer" + + def test_different_ssrcs_separate_buffers(self): + """Packets from different SSRCs → separate buffers.""" + key = _make_secret_key() + receiver = _make_voice_receiver(key) + + for ssrc in [100, 200, 300]: + packet = _build_encrypted_rtp_packet(key, b'\xf8\xff\xfe', ssrc=ssrc) + receiver._on_packet(packet) + + assert len(receiver._buffers) == 3 + for ssrc in [100, 200, 300]: + assert ssrc in receiver._buffers + + +class TestRealNaClWithDAVE: + """NaCl decrypt + DAVE passthrough scenarios with real crypto.""" + + def test_dave_unknown_ssrc_passthrough(self): + """DAVE enabled but SSRC unknown → skip DAVE, buffer audio.""" + key = _make_secret_key() + dave = MagicMock() # DAVE session present but SSRC not mapped + receiver = _make_voice_receiver(key, dave_session=dave) + + packet = _build_encrypted_rtp_packet(key, b'\xf8\xff\xfe', ssrc=100) + receiver._on_packet(packet) + + # DAVE decrypt not called (SSRC unknown) + dave.decrypt.assert_not_called() + # Audio still buffered via passthrough + assert 100 in receiver._buffers + assert len(receiver._buffers[100]) > 0 + + def test_dave_unencrypted_error_passthrough(self): + """DAVE raises 'Unencrypted' → use NaCl-decrypted data as-is.""" + key = _make_secret_key() + dave = MagicMock() + dave.decrypt.side_effect = Exception( + "DecryptionFailed(UnencryptedWhenPassthroughDisabled)" + ) + receiver = _make_voice_receiver(key, dave_session=dave) + receiver.map_ssrc(100, 42) + + packet = _build_encrypted_rtp_packet(key, b'\xf8\xff\xfe', ssrc=100) + receiver._on_packet(packet) + + # DAVE was called but failed → passthrough + dave.decrypt.assert_called_once() + assert 100 in receiver._buffers + assert len(receiver._buffers[100]) > 0 + + def test_dave_real_error_drops(self): + """DAVE raises non-Unencrypted error → packet dropped.""" + key = _make_secret_key() + dave = MagicMock() + dave.decrypt.side_effect = Exception("KeyRotationFailed") + receiver = _make_voice_receiver(key, dave_session=dave) + receiver.map_ssrc(100, 42) + + packet = _build_encrypted_rtp_packet(key, b'\xf8\xff\xfe', ssrc=100) + receiver._on_packet(packet) + + assert len(receiver._buffers.get(100, b"")) == 0 + + +class TestFullVoiceFlow: + """End-to-end: encrypt → receive → buffer → silence detect → complete.""" + + def test_single_utterance_flow(self): + """Encrypt packets → buffer → silence → check_silence returns utterance.""" + key = _make_secret_key() + receiver = _make_voice_receiver(key) + receiver.map_ssrc(100, 42) + + # Send enough packets to exceed MIN_SPEECH_DURATION (0.5s) + # At 48kHz stereo 16-bit, each Opus silence frame decodes to ~3840 bytes + # Need 96000 bytes = ~25 frames + for seq in range(1, 30): + packet = _build_encrypted_rtp_packet( + key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq + ) + receiver._on_packet(packet) + + # Simulate silence by setting last_packet_time in the past + receiver._last_packet_time[100] = time.monotonic() - 3.0 + + completed = receiver.check_silence() + assert len(completed) == 1 + user_id, pcm_data = completed[0] + assert user_id == 42 + assert len(pcm_data) > 0 + + def test_utterance_with_ssrc_automap(self): + """No SPEAKING event → auto-map sole allowed user → utterance processed.""" + key = _make_secret_key() + members = [ + SimpleNamespace(id=9999, name="Bot"), + SimpleNamespace(id=42, name="Alice"), + ] + receiver = _make_voice_receiver( + key, allowed_user_ids={"42"}, members=members + ) + # No map_ssrc call — simulating missing SPEAKING event + + for seq in range(1, 30): + packet = _build_encrypted_rtp_packet( + key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq + ) + receiver._on_packet(packet) + + receiver._last_packet_time[100] = time.monotonic() - 3.0 + + completed = receiver.check_silence() + assert len(completed) == 1 + assert completed[0][0] == 42 # auto-mapped to sole allowed user + + def test_pause_blocks_during_playback(self): + """Pause receiver → packets ignored → resume → packets accepted.""" + key = _make_secret_key() + receiver = _make_voice_receiver(key) + + # Pause (echo prevention during TTS playback) + receiver.pause() + packet = _build_encrypted_rtp_packet(key, b'\xf8\xff\xfe', ssrc=100) + receiver._on_packet(packet) + assert len(receiver._buffers.get(100, b"")) == 0 + + # Resume + receiver.resume() + receiver._on_packet(packet) + assert 100 in receiver._buffers + assert len(receiver._buffers[100]) > 0 + + def test_corrupted_packet_ignored(self): + """Corrupted/truncated packet → silently ignored.""" + key = _make_secret_key() + receiver = _make_voice_receiver(key) + + # Too short + receiver._on_packet(b"\x00" * 5) + assert len(receiver._buffers) == 0 + + # Wrong RTP version + bad_header = struct.pack(">BBHII", 0x00, 0x78, 1, 960, 100) + receiver._on_packet(bad_header + b"\x00" * 20) + assert len(receiver._buffers) == 0 + + # Wrong payload type + bad_pt = struct.pack(">BBHII", 0x80, 0x00, 1, 960, 100) + receiver._on_packet(bad_pt + b"\x00" * 20) + assert len(receiver._buffers) == 0 + + def test_stop_cleans_everything(self): + """stop() clears all state cleanly.""" + key = _make_secret_key() + receiver = _make_voice_receiver(key) + receiver.map_ssrc(100, 42) + + for seq in range(1, 10): + packet = _build_encrypted_rtp_packet( + key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq + ) + receiver._on_packet(packet) + + assert len(receiver._buffers[100]) > 0 + + receiver.stop() + assert receiver._running is False + assert len(receiver._buffers) == 0 + assert len(receiver._ssrc_to_user) == 0 + assert len(receiver._decoders) == 0 + + +class TestSPEAKINGHook: + """SPEAKING event hook correctly maps SSRC to user_id.""" + + def test_speaking_hook_installed(self): + """start() installs speaking hook on connection.""" + key = _make_secret_key() + receiver = _make_voice_receiver(key) + conn = receiver._vc._connection + # hook should be set (wrapped) + assert conn.hook is not None + + def test_map_ssrc_via_speaking(self): + """SPEAKING op 5 event maps SSRC to user_id.""" + key = _make_secret_key() + receiver = _make_voice_receiver(key) + receiver.map_ssrc(500, 12345) + assert receiver._ssrc_to_user[500] == 12345 + + def test_map_ssrc_overwrites(self): + """New SPEAKING event for same SSRC overwrites old mapping.""" + key = _make_secret_key() + receiver = _make_voice_receiver(key) + receiver.map_ssrc(500, 111) + receiver.map_ssrc(500, 222) + assert receiver._ssrc_to_user[500] == 222 + + def test_speaking_mapped_audio_processed(self): + """After SSRC is mapped, audio from that SSRC gets correct user_id.""" + key = _make_secret_key() + receiver = _make_voice_receiver(key) + receiver.map_ssrc(100, 42) + + for seq in range(1, 30): + packet = _build_encrypted_rtp_packet( + key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq + ) + receiver._on_packet(packet) + + receiver._last_packet_time[100] = time.monotonic() - 3.0 + completed = receiver.check_silence() + assert len(completed) == 1 + assert completed[0][0] == 42 + + +class TestAuthFiltering: + """Only allowed users' audio should be processed.""" + + def test_allowed_user_audio_processed(self): + """Allowed user's utterance is returned by check_silence.""" + key = _make_secret_key() + members = [ + SimpleNamespace(id=9999, name="Bot"), + SimpleNamespace(id=42, name="Alice"), + ] + receiver = _make_voice_receiver( + key, allowed_user_ids={"42"}, members=members, + ) + receiver.map_ssrc(100, 42) + + for seq in range(1, 30): + packet = _build_encrypted_rtp_packet( + key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq + ) + receiver._on_packet(packet) + + receiver._last_packet_time[100] = time.monotonic() - 3.0 + completed = receiver.check_silence() + assert len(completed) == 1 + assert completed[0][0] == 42 + + def test_automap_rejects_unallowed_user(self): + """Auto-map refuses to map SSRC to user not in allowed list.""" + key = _make_secret_key() + members = [ + SimpleNamespace(id=9999, name="Bot"), + SimpleNamespace(id=42, name="Alice"), + ] + receiver = _make_voice_receiver( + key, allowed_user_ids={"99"}, # Alice not allowed + members=members, + ) + # No map_ssrc — SSRC unknown, auto-map should reject + + for seq in range(1, 30): + packet = _build_encrypted_rtp_packet( + key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq + ) + receiver._on_packet(packet) + + receiver._last_packet_time[100] = time.monotonic() - 3.0 + completed = receiver.check_silence() + assert len(completed) == 0 + + def test_empty_allowlist_allows_all(self): + """Empty allowed_user_ids means no restriction.""" + key = _make_secret_key() + members = [ + SimpleNamespace(id=9999, name="Bot"), + SimpleNamespace(id=42, name="Alice"), + ] + receiver = _make_voice_receiver( + key, allowed_user_ids=None, members=members, + ) + + for seq in range(1, 30): + packet = _build_encrypted_rtp_packet( + key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq + ) + receiver._on_packet(packet) + + receiver._last_packet_time[100] = time.monotonic() - 3.0 + completed = receiver.check_silence() + # Auto-mapped to sole non-bot member + assert len(completed) == 1 + assert completed[0][0] == 42 + + +class TestRejoinFlow: + """Leave and rejoin: state cleanup and fresh receiver.""" + + def test_stop_then_new_receiver_clean_state(self): + """After stop(), a new receiver starts with empty state.""" + key = _make_secret_key() + receiver1 = _make_voice_receiver(key) + receiver1.map_ssrc(100, 42) + + for seq in range(1, 10): + packet = _build_encrypted_rtp_packet( + key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq + ) + receiver1._on_packet(packet) + + assert len(receiver1._buffers[100]) > 0 + receiver1.stop() + + # New receiver (simulates rejoin) + receiver2 = _make_voice_receiver(key) + assert len(receiver2._buffers) == 0 + assert len(receiver2._ssrc_to_user) == 0 + assert len(receiver2._decoders) == 0 + + def test_rejoin_new_ssrc_works(self): + """After rejoin, user may get new SSRC — still works.""" + key = _make_secret_key() + receiver1 = _make_voice_receiver(key) + receiver1.map_ssrc(100, 42) # old SSRC + receiver1.stop() + + receiver2 = _make_voice_receiver(key) + receiver2.map_ssrc(200, 42) # new SSRC after rejoin + + for seq in range(1, 30): + packet = _build_encrypted_rtp_packet( + key, b'\xf8\xff\xfe', ssrc=200, seq=seq, timestamp=960 * seq + ) + receiver2._on_packet(packet) + + receiver2._last_packet_time[200] = time.monotonic() - 3.0 + completed = receiver2.check_silence() + assert len(completed) == 1 + assert completed[0][0] == 42 + + def test_rejoin_without_speaking_event_automap(self): + """Rejoin without SPEAKING event — auto-map sole allowed user.""" + key = _make_secret_key() + members = [ + SimpleNamespace(id=9999, name="Bot"), + SimpleNamespace(id=42, name="Alice"), + ] + + # First session + receiver1 = _make_voice_receiver( + key, allowed_user_ids={"42"}, members=members, + ) + receiver1.stop() + + # Rejoin — new key (Discord may assign new secret_key) + new_key = _make_secret_key() + receiver2 = _make_voice_receiver( + new_key, allowed_user_ids={"42"}, members=members, + ) + # No map_ssrc — simulating missing SPEAKING event + + for seq in range(1, 30): + packet = _build_encrypted_rtp_packet( + new_key, b'\xf8\xff\xfe', ssrc=300, seq=seq, timestamp=960 * seq + ) + receiver2._on_packet(packet) + + receiver2._last_packet_time[300] = time.monotonic() - 3.0 + completed = receiver2.check_silence() + assert len(completed) == 1 + assert completed[0][0] == 42 + + +class TestMultiGuildIsolation: + """Each guild has independent voice state.""" + + def test_separate_receivers_independent(self): + """Two receivers (different guilds) don't interfere.""" + key1 = _make_secret_key() + key2 = _make_secret_key() + + receiver1 = _make_voice_receiver(key1, bot_ssrc=1111) + receiver2 = _make_voice_receiver(key2, bot_ssrc=2222) + + receiver1.map_ssrc(100, 42) + receiver2.map_ssrc(200, 99) + + # Send to receiver1 + for seq in range(1, 10): + packet = _build_encrypted_rtp_packet( + key1, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq + ) + receiver1._on_packet(packet) + + # receiver2 should be empty + assert len(receiver2._buffers) == 0 + assert 100 in receiver1._buffers + + def test_stop_one_doesnt_affect_other(self): + """Stopping one receiver doesn't affect another.""" + key1 = _make_secret_key() + key2 = _make_secret_key() + + receiver1 = _make_voice_receiver(key1) + receiver2 = _make_voice_receiver(key2) + + receiver1.map_ssrc(100, 42) + receiver2.map_ssrc(200, 99) + + for seq in range(1, 10): + packet = _build_encrypted_rtp_packet( + key2, b'\xf8\xff\xfe', ssrc=200, seq=seq, timestamp=960 * seq + ) + receiver2._on_packet(packet) + + receiver1.stop() + + # receiver2 still has data + assert receiver2._running is True + assert len(receiver2._buffers[200]) > 0 + + +class TestEchoPreventionFlow: + """Receiver pause/resume during TTS playback prevents echo.""" + + def test_audio_during_pause_ignored(self): + """Audio arriving while paused is completely ignored.""" + key = _make_secret_key() + receiver = _make_voice_receiver(key) + receiver.map_ssrc(100, 42) + receiver.pause() + + for seq in range(1, 30): + packet = _build_encrypted_rtp_packet( + key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq + ) + receiver._on_packet(packet) + + assert len(receiver._buffers.get(100, b"")) == 0 + + def test_audio_after_resume_processed(self): + """Audio arriving after resume is processed normally.""" + key = _make_secret_key() + receiver = _make_voice_receiver(key) + receiver.map_ssrc(100, 42) + + # Pause → send packets → resume → send more packets + receiver.pause() + for seq in range(1, 5): + packet = _build_encrypted_rtp_packet( + key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq + ) + receiver._on_packet(packet) + assert len(receiver._buffers.get(100, b"")) == 0 + + receiver.resume() + for seq in range(5, 35): + packet = _build_encrypted_rtp_packet( + key, b'\xf8\xff\xfe', ssrc=100, seq=seq, timestamp=960 * seq + ) + receiver._on_packet(packet) + + assert len(receiver._buffers[100]) > 0 + receiver._last_packet_time[100] = time.monotonic() - 3.0 + completed = receiver.check_silence() + assert len(completed) == 1 + assert completed[0][0] == 42