Source code for discord.voice.gateway

# -*- coding: utf-8 -*-
# cython: language_level=3
# Copyright (c) 2021-present VincentRPS

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE
"""The V4 Voice Gateway Impl"""

import asyncio
import json
import logging
import struct
import time
from random import random

import aiohttp

from ..state import ConnectionState

_log = logging.getLogger(__name__)


class VoiceGateway:
    """Websocket client for the version 4 voice gateway.

    An instance is normally built with :meth:`voice_client_entry`, which
    derives the gateway URL from the voice client's endpoint and attaches
    the client. :meth:`connect` then opens the websocket, identifies (or
    resumes) and schedules :meth:`recv` to process incoming payloads.
    """

    # Close codes that cannot be recovered from, mapped to the message of
    # the ``RuntimeError`` raised by :meth:`closed`.
    _FATAL_CLOSE_CODES = {
        4001: 'Invalid opcode was sent',
        4002: 'Payload failed to decode (server side)',
        4003: 'Not authenticated',
        4004: 'Authentication failed',
        4005: 'Already authenticated',
        4009: 'Session timed out',
        4011: 'Server not found',
        4012: 'Unknown protocol',
        4014: 'Disconnected',
        4015: 'Voice Server Crashed',
        4016: 'Unknown encryption mode',
    }

    def __init__(self, state: 'ConnectionState', guild_id: int, hook):
        """Store the connection state, guild id and optional hook.

        Parameters
        ----------
        state:
            Connection state; its ``loop`` is used to schedule tasks and
            its ``app.user.id`` is sent when identifying.
        guild_id:
            Sent as ``server_id`` when identifying.
        hook:
            Optional callable stored as ``self.hook``. When ``None`` the
            default no-op :meth:`hook` coroutine stays in place.
        """
        self.state = state
        # Assigning ``None`` here would shadow the no-op ``hook`` method
        # with a non-callable, so only override it for a real hook.
        if hook is not None:
            self.hook = hook
        self.server_id = guild_id
        self.session_id: int = None
        self.secret_key: str = None
        self.kept_alive: bool = False
        self.ws: aiohttp.ClientWebSocketResponse = None
        # HTTP session owning the websocket; created lazily by connect()
        # and closed by close().
        self._session = None

    async def hook(self, *args):
        """Default hook: accepts any arguments and does nothing."""
        pass

    def update_session_id(self, id: int):
        """Replace the stored session id."""
        self.session_id = id

    async def send_json(self, payload: dict):
        """Serialize ``payload`` to JSON and send it as a text frame."""
        _log.debug('Sending payload data %s via the voice gateway', payload)
        await self.ws.send_str(json.dumps(payload))

    async def resume(self):
        """Send the resume payload (op 7) using the client's credentials."""
        payload = {
            'op': 7,
            'd': {
                'token': self.client.token,
                'server_id': str(self.client.guild_id),
                'session_id': self.client.session_id,
            },
        }
        await self.send_json(payload)

    async def identify(self):
        """Send the identify payload (op 0) for this guild and user."""
        payload = {
            'op': 0,
            'd': {
                'server_id': str(self.server_id),
                'user_id': str(self.state.app.user.id),
                'session_id': self.client.session_id,
                'token': self.client.token,
            },
        }
        await self.send_json(payload)

    async def connect(self, resume: bool = False):
        """Open the websocket, then identify or resume and start receiving.

        Parameters
        ----------
        resume:
            When true a resume payload is sent instead of identify.
        """
        # Keep a single HTTP session per gateway instead of creating a
        # new, never-closed ClientSession on every (re)connect.
        session = getattr(self, '_session', None)
        if session is None or session.closed:
            session = self._session = aiohttp.ClientSession()
        self.ws = await session.ws_connect(self.gateway, compress=15)

        if resume:
            await self.resume()
        else:
            await self.identify()
        self.client._state.loop.create_task(self.recv())

    # Building the gateway URL here removes a customization point, but it
    # keeps the construction path simple and nobody is expected to
    # customize it.
    @classmethod
    async def voice_client_entry(cls, client, *, hook=None):
        """Build a gateway for ``client`` without connecting it.

        The URL is ``wss://<client.endpoint>/?v=4``; the returned instance
        has ``gateway`` and ``client`` set.
        """
        gateway = 'wss://' + client.endpoint + '/?v=4'
        self = cls(client._state, client.guild_id, hook=hook)
        self.gateway = gateway
        self.client = client
        return self

    async def select_protocol(self, ip, port, mode):
        """Send the select-protocol payload (op 1) for UDP transport."""
        payload = {
            'op': 1,
            'd': {
                'protocol': 'udp',
                'data': {
                    'address': ip,
                    'port': port,
                    'mode': mode,
                },
            },
        }
        await self.send_json(payload)

    async def client_connect(self):
        """Send op 12 carrying the client's audio SSRC."""
        payload = {'op': 12, 'd': {'audio_ssrc': self.client.ssrc}}
        await self.send_json(payload)

    async def speak(self, state=1):
        """Send the speaking payload (op 5); ``state`` is coerced to int."""
        payload = {
            'op': 5,
            'd': {
                'speaking': int(state),
                'delay': 0,
            },
        }
        await self.send_json(payload)

    async def recv(self):
        """Process text frames until the websocket stops yielding messages.

        Handled opcodes: 2 (ready), 4 (session description), 6 (heartbeat
        acknowledged), 8 (hello) and 9 (resumed). When iteration ends and
        the socket reports a close code, :meth:`closed` is awaited with it.
        """
        async for msg in self.ws:
            if msg.type != aiohttp.WSMsgType.TEXT:
                continue

            data = json.loads(msg.data)
            op = data['op']
            d = data.get('d')
            _log.debug('> %s', data)

            if op == 2:
                await self.ready(d)
            elif op == 6:
                self.kept_alive = True
                _log.debug('> %s, kept the connection alive', d)
            elif op == 9:
                _log.info('Resumed connection successfully')
            elif op == 4:
                _log.info('Received session description')
                self.client.mode = d['mode']
                await self.load_secret_key(d)
            elif op == 8:
                raw_interval = d['heartbeat_interval'] / 1000.0
                self.interval = min(raw_interval, 5.0)
                # hello() sleeps and then heartbeats; awaiting it here
                # would stall this loop, so later frames (ready, heartbeat
                # acknowledgements) would not be handled until it returns.
                self.state.loop.create_task(
                    self.hello(interval=self.interval)
                )

        close_code = self.ws.close_code
        if close_code is None:
            return
        await self.closed(close_code)

    async def closed(self, code):
        """React to a websocket close code.

        Raises
        ------
        RuntimeError
            For the codes listed in ``_FATAL_CLOSE_CODES``.

        Code 4006 reconnects with a fresh identify; any other code
        reconnects with a resume.
        """
        _log.info('Voice websocket closed with code %s', code)

        reason = self._FATAL_CLOSE_CODES.get(code)
        if reason is not None:
            raise RuntimeError(reason)

        if code == 4006:
            _log.error("Session is no longer valid")
            await self.connect(False)
            return

        await self.connect(True)

    async def heartbeat(self, interval: float, /):
        """Send one heartbeat (op 3), sleep, then schedule the next one.

        Does nothing when the websocket is closed. If the previous
        heartbeat was never acknowledged (``kept_alive`` is false) the
        socket is closed with code 1008 and reconnected with a resume
        before the heartbeat is sent.
        """
        if self.ws.closed:
            return

        if not self.kept_alive:
            await self.close(1008)
            await self.connect(resume=True)

        # Cleared before sending; recv() sets it again on op 6.
        self.kept_alive = False
        await self.send_json({'op': 3, 'd': int(time.time() * 1000)})
        await asyncio.sleep(interval)
        self.state.loop.create_task(self.heartbeat(interval))

    async def ready(self, data: dict):
        """Handle the ready payload: discover our address, pick a mode.

        Stores ``ssrc``, ``voice_port`` and ``endpoint_ip`` on the client,
        performs IP discovery over the client's UDP socket, stores the
        discovered ``ip`` and ``port`` on the client and sends
        select-protocol with the first offered mode the client supports.

        Raises
        ------
        RuntimeError
            If none of the offered modes is supported by the client.
        """
        client = self.client
        client.ssrc = data['ssrc']
        client.voice_port = data['port']
        client.endpoint_ip = data['ip']

        # IP discovery request: type (2 bytes, 1 = request), length
        # (2 bytes, 70 = size of everything after this field), SSRC
        # (4 bytes), then a 64-byte address and a 2-byte port left zeroed,
        # which makes 74 bytes in total.
        packet = bytearray(74)
        struct.pack_into('>H', packet, 0, 1)
        struct.pack_into('>H', packet, 2, 70)
        struct.pack_into('>I', packet, 4, client.ssrc)
        client.socket.sendto(packet, (client.endpoint_ip, client.voice_port))

        recv = await self.state.loop.sock_recv(client.socket, 74)
        _log.debug('Received initial connection: %s', recv)

        # The reply mirrors the request layout, so the null-terminated
        # ASCII address starts after the 8-byte header and the port is the
        # final two bytes.
        ip_start = 8
        ip_end = recv.index(0, ip_start)
        client.ip = recv[ip_start:ip_end].decode('ascii')
        # unpack_from returns a 1-tuple; take the integer out of it.
        client.port = struct.unpack_from('>H', recv, len(recv) - 2)[0]
        _log.debug('IP was detected: %s, port: %s', client.ip, client.port)

        modes = [
            mode for mode in data['modes'] if mode in client.supported_modes
        ]
        _log.debug('Using following encryption mode(s): %s', modes)
        if not modes:
            raise RuntimeError(
                'None of the offered encryption modes are supported'
            )

        mode = modes[0]
        await self.select_protocol(client.ip, client.port, mode)
        _log.info('Using voice protocol: %s', mode)

    async def hello(self, interval: float):
        """Wait one interval, then start the heartbeat cycle."""
        await asyncio.sleep(interval)
        # No heartbeat has been sent yet, so there is nothing to have been
        # acknowledged; mark the connection alive so the first heartbeat
        # does not tear it down.
        self.kept_alive = True
        await self.heartbeat(interval)

    async def load_secret_key(self, data: dict):
        """Store the secret key on the gateway and client, then toggle speaking."""
        self.secret_key = self.client.secret_key = data.get('secret_key')
        await self.speak()
        await self.speak(False)

    async def close(self, code: int):
        """Close the websocket with ``code`` and release the HTTP session."""
        if self.ws is not None:
            await self.ws.close(code=code)

        session = getattr(self, '_session', None)
        if session is not None and not session.closed:
            await session.close()
        self._session = None