Files
var-monorepo/apps/mediasoup-server/modules/mediasoup/Channel.ts
PxlLoewe 4862f73612 livekit
2025-03-16 13:56:18 -07:00

318 lines
13 KiB
TypeScript

import { Worker } from 'mediasoup/node/lib/Worker';
import { types as MediasoupTypes } from 'mediasoup';
import logger from 'modules/winston/logger';
import { Router } from 'mediasoup/node/lib/Router';
import { DtlsParameters, WebRtcTransport } from 'mediasoup/node/lib/WebRtcTransport';
import { Socket } from 'socket.io';
import { UserDocument } from 'models/user';
import { MediaKind, RtpCapabilities, RtpParameters } from 'mediasoup/node/lib/RtpParameters';
import { EventEmitter } from 'stream';
import { ServerTransportParams } from '@common/types/mediasoup';
import { ConsumerOptions } from 'mediasoup-client/lib/Consumer';
import { routerOptions, webRtcTransportConfig } from './config';
/**
 * Represents a voice channel: all transports, producers and consumers of the
 * channel are managed by this class.
 *
 * It does NOT handle the events for a user joining or leaving the channel;
 * callers hand an already-authenticated socket to {@link MediasoupChannel.handleSocket}.
 *
 * Events emitted on the channel itself:
 * - `new-producer`            `{ producerId, user }`
 * - `producer-paused`         `{ producerId }`
 * - `producer-resumed`        `{ producerId }`
 * - `producer-closed`         `{ producerId }`
 * - `producer-pausing-forced` `{ user }`
 */
export class MediasoupChannel extends EventEmitter {
  worker: Worker;
  /** Stays `undefined` until {@link MediasoupChannel.init} has resolved. */
  router: Router | undefined;
  transports: WebRtcTransport[] = [];
  producers: MediasoupTypes.Producer[] = [];
  consumers: MediasoupTypes.Consumer[] = [];

  /**
   * @param worker mediasoup worker on which the channel's router is created.
   */
  constructor(worker: Worker) {
    super();
    this.worker = worker;
    // A constructor cannot await; make sure a failed router creation is logged
    // instead of surfacing as an unhandled promise rejection.
    this.init().catch((err: unknown) => {
      logger.error(`cannot create router: ${MediasoupChannel.errorMessage(err)}`, { service: 'mediasoup' });
    });
  }

  /** Creates the router of this channel on the worker. */
  async init() {
    this.router = await this.worker.createRouter(/* routerOptions */ routerOptions);
  }

  /**
   * Registers all signalling handlers of this channel on `socket`.
   *
   * The connection is rejected (an `error-message` is emitted and no handler is
   * registered) while the router is not yet created, or when the number of
   * producers has reached `MAX_VOICE_CONNECTIONS`. If that variable is unset,
   * `Number(undefined)` is `NaN` and the comparison is always false, i.e. no
   * limit is applied.
   *
   * @param socket socket.io connection of the client
   * @param user   user that owns the connection
   * @returns the result of `socket.emit` when the connection is rejected, otherwise `true`
   */
  handleSocket(socket: Socket, user: UserDocument) {
    const router = this.router;
    if (!router || !this.worker) {
      logger.warn('Rejected user connection: channel not yet initialized', { system: 'mediasoup' });
      return socket.emit('error-message', { error: 'channel not yet initialized' });
    }
    if (this.producers.length >= Number(process.env.MAX_VOICE_CONNECTIONS)) {
      logger.warn('Rejected user connection: to many connections', { system: 'mediasoup' });
      return socket.emit('error-message', { error: 'to many connections', system: 'mediasoup' });
    }

    socket.emit('sfu-ready');
    this.registerTransportHandlers(socket, router);
    this.registerProducerHandlers(socket, user);
    this.registerConsumerHandlers(socket, user, router);
    return true;
  }

  /**
   * Creates a WebRTC transport on the channel's router.
   *
   * The transport removes itself from `this.transports` once it is closed.
   * Adding it to that list is left to the caller.
   *
   * @throws Error when the router has not been created yet
   */
  async createWebRtcTransport(): Promise<WebRtcTransport> {
    if (!this.router) throw new Error('channel not yet initialized');
    // https://mediasoup.org/documentation/v3/mediasoup/api/#router-createWebRtcTransport
    const transport = await this.router.createWebRtcTransport(webRtcTransportConfig);

    // Single place for bookkeeping: the observer 'close' event is the public
    // notification that the transport was closed.
    transport.observer.once('close', () => {
      this.transports = this.transports.filter((t) => t.id !== transport.id);
    });

    transport.on('dtlsstatechange', (dtlsState) => {
      // Not reliable
      logger.silly('transport dtlsstatechange', { dtlsState, service: 'mediasoup' });
      if (dtlsState === 'closed') {
        transport.close();
      }
    });
    return transport;
  }

  /** Handlers for `get-rtp-capabilities`, `create-webrtc-transport` and `transport-connect`. */
  private registerTransportHandlers(socket: Socket, router: Router): void {
    socket.on('get-rtp-capabilities', (callback: unknown) => {
      MediasoupChannel.ack(callback, router.rtpCapabilities);
    });

    socket.on('create-webrtc-transport', async (callback: unknown) => {
      try {
        const transport = await this.createWebRtcTransport();
        if (socket.disconnected) {
          // The client left while the transport was being created; the
          // 'disconnect' event has already fired, so close it right away.
          transport.close();
          return;
        }
        this.transports.push(transport);
        // Transports that never get connected are only cleaned up when the socket disconnects.
        socket.once('disconnect', () => transport.close());
        // send the parameters for the created transport back to the client
        // https://mediasoup.org/documentation/v3/mediasoup-client/api/#TransportOptions
        MediasoupChannel.ack(callback, {
          id: transport.id,
          iceParameters: transport.iceParameters,
          iceCandidates: transport.iceCandidates,
          dtlsParameters: transport.dtlsParameters
        } as ServerTransportParams);
      } catch (err) {
        MediasoupChannel.ack(callback, { error: MediasoupChannel.errorMessage(err) });
      }
    });

    socket.on(
      'transport-connect',
      async (payload: { dtlsParameters: DtlsParameters; transportId: string }) => {
        try {
          // Destructure inside the try so a malformed payload is reported, not thrown.
          const { dtlsParameters, transportId } = payload;
          const transport = this.findTransport(transportId);
          if (!transport) return;
          await transport.connect({ dtlsParameters });
        } catch (err) {
          logger.warn('cannot connect transport', { service: 'mediasoup' });
          socket.emit('error-message', { error: MediasoupChannel.errorMessage(err) });
        }
      }
    );
  }

  /**
   * Handlers for `transport-produce`, `producer-resume` and `producer-pause`.
   *
   * NOTE(review): producers are looked up by id only; there is no check that
   * the producer belongs to the requesting socket.
   */
  private registerProducerHandlers(socket: Socket, user: UserDocument): void {
    socket.on(
      'transport-produce',
      async (
        payload: { kind: MediaKind; rtpParameters: RtpParameters; transportId: string },
        callback: unknown
      ) => {
        try {
          const { kind, rtpParameters, transportId } = payload;
          const transport = this.findTransport(transportId);
          if (!transport) throw new Error('transport not found');

          // Producers start paused and are resumed explicitly via 'producer-resume'.
          const producer = await transport.produce({ kind, rtpParameters, paused: true });
          this.producers.push(producer);
          this.emit('new-producer', { producerId: producer.id, user: user.getPublicUser() });
          producer.appData.user = user.getPublicUser();

          producer.observer.on('pause', () => {
            this.emit('producer-paused', { producerId: producer.id });
          });
          producer.observer.on('resume', () => {
            this.emit('producer-resumed', { producerId: producer.id });
          });
          producer.on('transportclose', () => {
            this.producers = this.producers.filter((p) => p.id !== producer.id);
            this.emit('producer-closed', { producerId: producer.id });
            producer.close();
          });

          // Send back to the client the Producer's id
          MediasoupChannel.ack(callback, { id: producer.id });
        } catch (err) {
          logger.warn(`Error while creating Producer on Transport! ${String(err)}`, { service: 'mediasoup' });
          MediasoupChannel.ack(callback, { error: MediasoupChannel.errorMessage(err) });
        }
      }
    );

    socket.on(
      'producer-resume',
      async (payload: { producerId: string; source: string }, resumeCallback: unknown) => {
        try {
          const { producerId, source } = payload;
          logger.silly(`Resuming producer with id ${producerId}`, { service: 'mediasoup' });
          const producer = this.producers.find((p) => p.id === producerId);
          if (!producer) {
            MediasoupChannel.ack(resumeCallback, { error: 'producer not found' });
            return;
          }
          if (source === 'admin') {
            // NOTE(review): `source` is supplied by the client and is not verified here.
            // Pause every producer (the requested one is resumed again below).
            // Failures are logged per producer so one bad producer neither
            // aborts the request nor leaves a rejection unhandled.
            await Promise.all(
              this.producers.map((p) =>
                p.pause().catch((err: unknown) => {
                  logger.warn(`cannot pause producer ${p.id}: ${MediasoupChannel.errorMessage(err)}`, {
                    service: 'mediasoup'
                  });
                })
              )
            );
            this.emit('producer-pausing-forced', { user: user.getPublicUser() });
          }
          await producer.resume();
          MediasoupChannel.ack(resumeCallback);
        } catch (err) {
          const message = MediasoupChannel.errorMessage(err);
          logger.warn(`cannot resume producer: ${message}`, { service: 'mediasoup' });
          MediasoupChannel.ack(resumeCallback, { error: message });
        }
      }
    );

    socket.on('producer-pause', async (payload: { producerId: string }, pauseCallback: unknown) => {
      try {
        const { producerId } = payload;
        logger.silly(`Pausing producer with id ${producerId}`, { service: 'mediasoup' });
        const producer = this.producers.find((p) => p.id === producerId);
        if (!producer) {
          MediasoupChannel.ack(pauseCallback, { error: 'producer not found' });
          return;
        }
        await producer.pause();
        MediasoupChannel.ack(pauseCallback);
      } catch (err) {
        const message = MediasoupChannel.errorMessage(err);
        logger.warn(`cannot pause producer: ${message}`, { service: 'mediasoup' });
        MediasoupChannel.ack(pauseCallback, { error: message });
      }
    });
  }

  /** Handlers for `transport-consume`, `consumer-resume` and `consumer-pause`. */
  private registerConsumerHandlers(socket: Socket, user: UserDocument, router: Router): void {
    socket.on(
      'transport-consume',
      async (
        payload: { rtpCapabilities: RtpCapabilities; producerId: string; transportId: string },
        callback: unknown
      ) => {
        try {
          const { rtpCapabilities, producerId, transportId } = payload;
          const transport = this.findTransport(transportId);
          if (!transport) {
            socket.disconnect();
            throw new Error('transport not found');
          }

          // check if the router can consume the specified producer
          if (!router.canConsume({ producerId, rtpCapabilities })) {
            logger.warn(`Cannot consume producer with id: ${producerId}`, { service: 'mediasoup' });
            MediasoupChannel.ack(callback, { params: { error: true } });
            return;
          }

          const producer = this.producers.find((p) => p.id === producerId);
          if (!producer) {
            socket.disconnect();
            throw new Error(`producer ${producerId} not found`);
          }

          // NOTE(review): `user` is the full UserDocument of the consuming user
          // and is sent to the client below, while producers only expose
          // `user.getPublicUser()` - confirm this is intended.
          const consumer = await transport.consume({
            producerId,
            rtpCapabilities,
            appData: { user, producerId },
            paused: producer.paused // Important: otherwise remote video stays black, no audio
          });
          this.consumers.push(consumer);

          // The consumer is closed together with its transport or its producer;
          // the observer 'close' event then drops it from the list.
          consumer.on('transportclose', () => consumer.close());
          consumer.on('producerclose', () => consumer.close());
          consumer.observer.on('close', () => {
            this.consumers = this.consumers.filter((c) => c.id !== consumer.id);
          });

          // parameters the client needs to create its side of the consumer
          const consumerOptions: ConsumerOptions = {
            id: consumer.id,
            producerId,
            kind: consumer.kind,
            rtpParameters: consumer.rtpParameters,
            appData: { user }
          };
          MediasoupChannel.ack(callback, { consumerOptions });
        } catch (err) {
          logger.error(`error while creating consumer ${String(err)}`, { service: 'Mediasoup' });
          // Send the message, not the Error object: an Error has no enumerable
          // properties and would arrive at the client as `{}`.
          MediasoupChannel.ack(callback, { error: MediasoupChannel.errorMessage(err) });
        }
      }
    );

    socket.on('consumer-resume', async (payload: { consumerId: string }, resumeCallback: unknown) => {
      try {
        const { consumerId } = payload;
        logger.silly(`Resuming consumer with id ${consumerId}`, { service: 'mediasoup' });
        const consumer = this.consumers.find((c) => c.id === consumerId);
        if (!consumer) {
          MediasoupChannel.ack(resumeCallback, { error: 'consumer not found' });
          return;
        }
        await consumer.resume();
        MediasoupChannel.ack(resumeCallback);
      } catch (err) {
        const message = MediasoupChannel.errorMessage(err);
        logger.warn(`cannot resume consumer: ${message}`, { service: 'mediasoup' });
        MediasoupChannel.ack(resumeCallback, { error: message });
      }
    });

    socket.on('consumer-pause', async (payload: { consumerId: string }, pauseCallback: unknown) => {
      try {
        const { consumerId } = payload;
        logger.silly(`Pausing consumer with id ${consumerId}`, { service: 'mediasoup' });
        const consumer = this.consumers.find((c) => c.id === consumerId);
        if (!consumer) {
          MediasoupChannel.ack(pauseCallback, { error: 'consumer not found' });
          return;
        }
        await consumer.pause();
        MediasoupChannel.ack(pauseCallback);
      } catch (err) {
        const message = MediasoupChannel.errorMessage(err);
        logger.warn(`cannot pause consumer: ${message}`, { service: 'mediasoup' });
        MediasoupChannel.ack(pauseCallback, { error: message });
      }
    });
  }

  /** Looks up a transport of this channel by its id. */
  private findTransport(transportId: string): WebRtcTransport | undefined {
    return this.transports.find((t) => t.id === transportId);
  }

  /**
   * Invokes a client acknowledgement callback if one was actually supplied.
   * The callback comes from the client, so it may be missing or not a function.
   */
  private static ack(callback: unknown, ...args: unknown[]): void {
    if (typeof callback === 'function') {
      (callback as (...cbArgs: unknown[]) => void)(...args);
    }
  }

  /** Extracts a printable message from a caught value. */
  private static errorMessage(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
  }
}