This commit is contained in:
nocnico
2025-03-16 22:00:21 +01:00
26 changed files with 1271 additions and 37 deletions

View File

@@ -6,6 +6,8 @@ import { createAdapter } from "@socket.io/redis-adapter";
import { jwtMiddleware } from "modules/socketJWTmiddleware";
import { pubClient, subClient } from "modules/redis";
import { handle } from "socket-events/connect-dispatch";
import router from "routes/router";
import cors from "cors";
const app = express();
const server = createServer(app);
@@ -14,12 +16,15 @@ const io = new Server(server, {
adapter: createAdapter(pubClient, subClient),
cors: {},
});
io.use(jwtMiddleware);
io.on("connection", (socket) => {
socket.on("connect-dispatch", handle(socket, io));
});
app.use(cors());
app.use(router);
server.listen(process.env.PORT, () => {
console.log(`Server running on port ${process.env.PORT}`);
});

View File

@@ -5,4 +5,13 @@ export const subClient = pubClient.duplicate();
Promise.all([pubClient.connect(), subClient.connect()]).then(() => {
console.log("Redis connected");
pubClient.keys("dispatchers*").then((keys) => {
if (!keys) return;
keys.forEach(async (key) => {
await pubClient.json.del(key);
});
});
});
pubClient.on("error", (err) => console.log("Redis Client Error", err));
subClient.on("error", (err) => console.log("Redis Client Error", err));

View File

@@ -21,10 +21,12 @@
"@redis/json": "^1.0.7",
"@socket.io/redis-adapter": "^8.3.0",
"axios": "^1.7.9",
"cors": "^2.8.5",
"cron": "^4.1.0",
"dotenv": "^16.4.7",
"express": "^4.21.2",
"jsonwebtoken": "^9.0.2",
"livekit-server-sdk": "^2.10.2",
"nodemailer": "^6.10.0",
"react": "^19.0.0",
"redis": "^4.7.0",

View File

@@ -0,0 +1,38 @@
import { Router } from "express";
import { AccessToken } from "livekit-server-sdk";
if (!process.env.LIVEKIT_API_KEY) throw new Error("LIVEKIT_API_KEY not set");
if (!process.env.LIVEKIT_API_SECRET)
throw new Error("LIVEKIT_API_SECRET not set");
const createToken = async () => {
// If this room doesn't exist, it'll be automatically created when the first
// participant joins
const roomName = "quickstart-room";
// Identifier to be used for participant.
// It's available as LocalParticipant.identity with livekit-client SDK
const participantName = "quickstart-username";
const at = new AccessToken(
process.env.LIVEKIT_API_KEY,
process.env.LIVEKIT_API_SECRET,
{
identity: participantName,
// Token to expire after 10 minutes
ttl: "10m",
},
);
at.addGrant({ roomJoin: true, room: roomName });
return await at.toJwt();
};
const router = Router();
router.get("/token", async (req, res) => {
res.send({
token: await createToken(),
});
});
export default router;

View File

@@ -0,0 +1,8 @@
import { Router } from "express";
import livekitRouter from "./livekit";
const router = Router();
router.use("/livekit", livekitRouter);
export default router;

View File

@@ -11,6 +11,7 @@ export const handle =
selectedZone: string;
}) => {
const userId = socket.data.user.id; // User ID aus dem JWT-Token
console.log("User connected to dispatch server");
await pubClient.json.set(`dispatchers:${socket.id}`, "$", {
logoffTime,
selectedZone,

View File

@@ -1,33 +0,0 @@
export const ChangeRufgruppe = () => {
return (
<>
<details className="dropdown">
<summary className="dropdown flex items-center gap-1">
<svg
xmlns="http://www.w3.org/2000/svg"
className="h-5 w-5"
fill="none"
viewBox="0 0 24 24"
stroke="currentColor"
>
<path
strokeLinecap="round"
strokeLinejoin="round"
strokeWidth="2"
d="M9 19v-6a2 2 0 00-2-2H5a2 2 0 00-2 2v6a2 2 0 002 2h2a2 2 0 002-2zm0 0V9a2 2 0 012-2h2a2 2 0 012 2v10m-6 0a2 2 0 002 2h2a2 2 0 002-2m0 0V5a2 2 0 012-2h2a2 2 0 012 2v14a2 2 0 01-2 2h-2a2 2 0 01-2-2z"
/>
</svg>
<div className="badge badge-soft badge-success">1</div>
</summary>
<ul className="menu dropdown-content bg-base-100 rounded-box z-1 w-52 p-2 shadow-sm">
<li>
<a>Rufgruppe 1</a>
</li>
<li>
<a>Rufgruppe 2</a>
</li>
</ul>
</details>
</>
);
};

View File

@@ -1,11 +1,16 @@
"use client";
import { ToggleTalkButton } from "../ToggleTalkButton";
<<<<<<< HEAD
import { ChangeRufgruppe } from "../ChangeRufgruppe";
=======
import { Notifications } from "../Notifications";
>>>>>>> 4862f736124cb26fdc154299b855a830e1248452
import Link from "next/link";
import { Connection } from "../Connection";
import { ThemeSwap } from "./_components/ThemeSwap";
import { useState } from "react";
import { Audio } from "./_components/Audio";
export default function Navbar() {
const [isDark, setIsDark] = useState(false);
@@ -30,7 +35,7 @@ export default function Navbar() {
<ToggleTalkButton />
</li>
<li>
<ChangeRufgruppe />
<Audio />
</li>
</ul>
</div>

View File

@@ -0,0 +1,159 @@
"use client";
import { useEffect, useState } from "react";
import {
LocalParticipant,
LocalTrackPublication,
Participant,
RemoteParticipant,
RemoteTrack,
RemoteTrackPublication,
Room,
RoomEvent,
Track,
VideoPresets,
} from "livekit-client";
import { connectionStore } from "../../../../_store/connectionStore";
export const Audio = () => {
const connection = connectionStore();
const [token, setToken] = useState("");
const [room, setRoom] = useState<Room | null>(null);
useEffect(() => {
const fetchToken = async () => {
const response = await fetch(
`${process.env.NEXT_PUBLIC_DISPATCH_SERVER_URL}/livekit/token`,
);
const data = await response.json();
setToken(data.token);
};
fetchToken();
}, []);
useEffect(() => {
const joinRoom = async () => {
if (!connection.isConnected) return;
/* if (!token) return;
if (!process.env.NEXT_PUBLIC_LIVEKIT_URL)
return console.error("NEXT_PUBLIC_LIVEKIT_URL not set");
console.log("COnnecting to room", {
token,
url: process.env.NEXT_PUBLIC_LIVEKIT_URL,
}); */
const url = "ws://localhost:7880";
const token =
"eyJhbGciOiJIUzI1NiJ9.eyJ2aWRlbyI6eyJyb29tSm9pbiI6dHJ1ZSwicm9vbSI6InF1aWNrc3RhcnQtcm9vbSJ9LCJpc3MiOiJBUElBbnNHZHRkWXAySG8iLCJleHAiOjE3NDIxNDk3MzAsIm5iZiI6MCwic3ViIjoicXVpY2tzdGFydC11c2VybmFtZSJ9.MVFDpwvjCF_AXjL9Mg40TFoKukZ4F3vOVB4DI_TZhHM";
console.log("Connecting to room", { token, url });
const room = new Room({
// automatically manage subscribed video quality
adaptiveStream: true,
// optimize publishing bandwidth and CPU for published tracks
dynacast: true,
// default capture settings
videoCaptureDefaults: {
resolution: VideoPresets.h720.resolution,
},
});
// pre-warm connection, this can be called as early as your page is loaded
room.prepareConnection(url, token);
// set up event listeners
room
.on(RoomEvent.TrackSubscribed, handleTrackSubscribed)
.on(RoomEvent.TrackUnsubscribed, handleTrackUnsubscribed)
.on(RoomEvent.ActiveSpeakersChanged, handleActiveSpeakerChange)
.on(RoomEvent.Disconnected, handleDisconnect)
.on(RoomEvent.LocalTrackUnpublished, handleLocalTrackUnpublished);
// connect to room
await room.connect(url, token);
console.log("connected to room", room.name);
// publish local camera and mic tracks
await room.localParticipant.enableCameraAndMicrophone();
function handleTrackSubscribed(
track: RemoteTrack,
publication: RemoteTrackPublication,
participant: RemoteParticipant,
) {
if (
track.kind === Track.Kind.Video ||
track.kind === Track.Kind.Audio
) {
// attach it to a new HTMLVideoElement or HTMLAudioElement
const element = track.attach();
}
}
function handleTrackUnsubscribed(
track: RemoteTrack,
publication: RemoteTrackPublication,
participant: RemoteParticipant,
) {
// remove tracks from all attached elements
track.detach();
}
function handleLocalTrackUnpublished(
publication: LocalTrackPublication,
participant: LocalParticipant,
) {
// when local tracks are ended, update UI to remove them from rendering
publication.track?.detach();
}
function handleActiveSpeakerChange(speakers: Participant[]) {
// show UI indicators when participant is speaking
}
function handleDisconnect() {
console.log("disconnected from room");
}
setRoom(room);
};
joinRoom();
return () => {
room?.disconnect();
};
}, [token, connection.isConnected]);
return (
<>
<details className="dropdown">
<summary className="dropdown flex items-center gap-1">
<svg
xmlns="http://www.w3.org/2000/svg"
className="h-5 w-5"
fill="none"
viewBox="0 0 24 24"
stroke="currentColor"
>
<path
strokeLinecap="round"
strokeLinejoin="round"
strokeWidth="2"
d="M9 19v-6a2 2 0 00-2-2H5a2 2 0 00-2 2v6a2 2 0 002 2h2a2 2 0 002-2zm0 0V9a2 2 0 012-2h2a2 2 0 012 2v10m-6 0a2 2 0 002 2h2a2 2 0 002-2m0 0V5a2 2 0 012-2h2a2 2 0 012 2v14a2 2 0 01-2 2h-2a2 2 0 01-2-2z"
/>
</svg>
<div className="badge badge-soft badge-success">1</div>
</summary>
<ul className="menu dropdown-content bg-base-100 rounded-box z-1 w-52 p-2 shadow-sm">
<li>
<a>Rufgruppe 1</a>
</li>
<li>
<a>Rufgruppe 2</a>
</li>
</ul>
</details>
</>
);
};

View File

@@ -11,10 +11,13 @@
"check-types": "tsc --noEmit"
},
"dependencies": {
"@livekit/components-react": "^2.8.1",
"@livekit/components-styles": "^1.1.4",
"@radix-ui/react-icons": "^1.3.2",
"@repo/ui": "*",
"@tailwindcss/postcss": "^4.0.14",
"leaflet": "^1.9.4",
"livekit-client": "^2.9.7",
"next": "^15.1.0",
"next-auth": "^4.24.11",
"postcss": "^8.5.1",

8
apps/mediasoup-server/.d.ts vendored Normal file
View File

@@ -0,0 +1,8 @@
declare module "next-auth/jwt" {
interface JWT {
uid: string;
firstname: string;
lastname: string;
email: string;
}
}

View File

@@ -0,0 +1,4 @@
PORT=3002
REDIS_HOST=localhost
REDIS_PORT=6379
DISPATCH_APP_TOKEN=

View File

@@ -0,0 +1,31 @@
import "dotenv/config";
import express from "express";
import { createServer } from "http";
import { Server } from "socket.io";
import { createAdapter } from "@socket.io/redis-adapter";
import { jwtMiddleware } from "modules/socketJWTmiddleware";
import { pubClient, subClient } from "modules/redis";
const app = express();
const server = createServer(app);
pubClient.keys("dispatchers*").then((keys) => {
if (!keys) return;
keys.forEach(async (key) => {
await pubClient.json.del(key);
});
});
const io = new Server(server, {
adapter: createAdapter(pubClient, subClient),
cors: {},
});
io.use(jwtMiddleware);
io.on("connection", (socket) => {
console.log("User conencted to mediasoup server");
});
server.listen(process.env.PORT, () => {
console.log(`Server running on port ${process.env.PORT}`);
});

View File

@@ -0,0 +1,317 @@
import { Worker } from 'mediasoup/node/lib/Worker';
import { types as MediasoupTypes } from 'mediasoup';
import logger from 'modules/winston/logger';
import { Router } from 'mediasoup/node/lib/Router';
import { DtlsParameters, WebRtcTransport } from 'mediasoup/node/lib/WebRtcTransport';
import { Socket } from 'socket.io';
import { UserDocument } from 'models/user';
import { MediaKind, RtpCapabilities, RtpParameters } from 'mediasoup/node/lib/RtpParameters';
import { EventEmitter } from 'stream';
import { ServerTransportParams } from '@common/types/mediasoup';
import { ConsumerOptions } from 'mediasoup-client/lib/Consumer';
import { routerOptions, webRtcTransportConfig } from './config';
/* *
* Represents a Voice-Channel: all transports, Producers and Consumers are managed using this Class
* Does NOT manages events for when a user joins/ leaves the channel
*/
export class MediasoupChannel extends EventEmitter {
worker: Worker;
router: Router | undefined;
transports: WebRtcTransport[] = [];
producers: MediasoupTypes.Producer[] = [];
consumers: MediasoupTypes.Consumer[] = [];
constructor(worker: Worker) {
super();
this.worker = worker;
this.init();
}
async init() {
this.router = await this.worker.createRouter(/* routerOptions */ routerOptions);
}
handleSocket(socket: Socket, user: UserDocument) {
if (!this.router || !this.worker) {
logger.warn('Rejected user connection: channel not yet initialized', { system: 'mediasoup' });
return socket.emit('error-message', { error: 'channel not yet initialized' });
}
if (this.producers.length >= Number(process.env.MAX_VOICE_CONNECTIONS)) {
logger.warn('Rejected user connection: to many connections', { system: 'mediasoup' });
return socket.emit('error-message', { error: 'to many connections', system: 'mediasoup' });
}
socket.emit('sfu-ready');
socket.on('get-rtp-capabilities', (callback) => {
const { rtpCapabilities } = this.router!;
callback(rtpCapabilities);
});
socket.on('create-webrtc-transport', async (callback) => {
try {
const transport = await this.createWebRtcTransport();
this.transports.push(transport);
// how to delete unused transports which never connected to a client?
// For now:
socket.once('disconnect', () => {
transport.close();
this.transports = this.transports.filter((t) => t.id !== transport.id);
});
// send the parameters for the created transport back to the client
// https://mediasoup.org/documentation/v3/mediasoup-client/api/#TransportOptions
callback({
id: transport.id,
iceParameters: transport.iceParameters,
iceCandidates: transport.iceCandidates,
dtlsParameters: transport.dtlsParameters
} as ServerTransportParams);
} catch (err) {
const error = err as Error;
callback({
error: error.message
});
}
});
socket.on(
'transport-connect',
async ({ dtlsParameters, transportId }: { dtlsParameters: DtlsParameters; transportId: string }) => {
try {
const transport = this.transports.find((t) => t.id === transportId);
transport?.on('@close', () => {
this.transports = this.transports.filter((t) => t.id !== transportId);
});
if (!transport) return;
await transport.connect({ dtlsParameters });
} catch (error) {
logger.warn('cannot connect transport', { service: 'mediasoup' });
socket.emit('error-message', { error: (error as Error).message });
}
}
);
socket.on(
'transport-produce',
async (
{
kind,
rtpParameters,
transportId
}: { kind: MediaKind; rtpParameters: RtpParameters; transportId: string },
callback
) => {
try {
const transport = this.transports.find((t) => t.id === transportId);
if (!transport) throw Error('transport not found');
const producer = await transport.produce({
kind,
rtpParameters,
paused: true
});
// DEBUG
this.producers.push(producer);
this.emit('new-producer', { producerId: producer.id, user: user.getPublicUser() });
producer.appData.user = user.getPublicUser();
producer.observer.on('pause', () => {
this.emit('producer-paused', { producerId: producer.id });
});
producer.observer.on('resume', () => {
this.emit('producer-resumed', { producerId: producer.id });
});
producer.on('transportclose', () => {
this.producers = this.producers.filter((p) => p.id !== producer.id);
this.emit('producer-closed', { producerId: producer.id });
producer.close();
});
// Send back to the client the Producer's id
callback({
id: producer.id
});
} catch (err) {
logger.warn(`Error while creating Producer on Transport! ${err}`, { service: 'mediasoup' });
const error = err as Error;
callback({ error: error.message });
}
}
);
socket.on(
'transport-consume',
async (
{
rtpCapabilities,
producerId,
transportId
}: {
rtpCapabilities: RtpCapabilities;
producerId: string;
transportId: string;
},
callback
) => {
try {
const transport = this.transports.find((t) => t.id === transportId);
if (!transport) {
socket.disconnect();
throw Error('transport not found');
}
// check if the router can consume the specified producer
if (
this.router!.canConsume({
producerId,
rtpCapabilities
})
) {
// transport can now consume and return a consumer
const producer = this.producers.find((p) => p.id === producerId);
if (!producer) {
socket.disconnect();
throw Error(`producer ${producerId} not found`);
}
const consumer = await transport.consume({
producerId,
rtpCapabilities,
appData: { user, producerId },
paused: producer.paused // Important: otherwise remote video stays black, no audio
});
this.consumers.push(consumer);
// Cannot detect when consumer is closed dirrectly
// Assuming that:
// 1. when producer is closed, consumer is also closed
// 2. when transport is closed, consumer is also closed
// 2. and when socket is closed, consumer is also closed
consumer.on('transportclose', () => {
consumer.close();
});
consumer.observer.on('close', () => {
this.consumers = this.consumers.filter((c) => c.id !== consumer.id);
});
// Bad, because it adds a listener for each consumer
/* socket.once('disconnect', () => {
consumer.close();
this.consumers = this.consumers.filter((c) => c.id !== consumer.id);
}); */
consumer.on('producerclose', () => {
consumer.close();
});
// from the consumer extract the following params
// to send back to the Client
const consumerOptions: ConsumerOptions = {
id: consumer.id,
producerId,
kind: consumer.kind,
rtpParameters: consumer.rtpParameters,
appData: { user }
};
// send the parameters to the client
callback({ consumerOptions });
} else {
logger.warn(`Cannot consume producer with id: ${producerId}`, { service: 'mediasoup' });
callback({
params: {
error: true
}
});
}
} catch (err) {
const error = err as Error;
logger.error(`error while creating consumer ${error}`, { service: 'Mediasoup' });
callback({
error
});
}
}
);
socket.on('consumer-resume', async ({ consumerId }: { consumerId: string }, resumeCallback) => {
logger.silly(`Resuming consumer with id ${consumerId}`, { service: 'mediasoup' });
const consumer = this.consumers.find((c) => c.id === consumerId);
if (!consumer) return resumeCallback({ error: 'consumer not found' });
await consumer.resume();
if (typeof resumeCallback === 'function') {
resumeCallback();
}
return true;
});
socket.on('consumer-pause', async ({ consumerId }: { consumerId: string }, pauseCallback) => {
logger.silly(`Pausing consumer with id ${consumerId}`, { service: 'mediasoup' });
const consumer = this.consumers.find((c) => c.id === consumerId);
if (!consumer) return pauseCallback({ error: 'consumer not found' });
await consumer.pause();
if (typeof pauseCallback === 'function') {
pauseCallback();
}
return true;
});
socket.on(
'producer-resume',
async ({ producerId, source }: { producerId: string; source: string }, resumeCallback) => {
logger.silly(`Resuming producer with id ${producerId}`, { service: 'mediasoup' });
const producer = this.producers.find((p) => p.id === producerId);
if (!producer) return resumeCallback({ error: 'producer not found' });
if (source === 'admin') {
this.producers.forEach((p) => p.pause());
this.emit('producer-pausing-forced', { user: user.getPublicUser() });
}
await producer.resume();
if (resumeCallback) {
resumeCallback();
}
return true;
}
);
socket.on('producer-pause', async ({ producerId }: { producerId: string }, pauseCallback) => {
logger.silly(`Pausing producer with id ${producerId}`, { service: 'mediasoup' });
const producer = this.producers.find((p) => p.id === producerId);
if (!producer) return pauseCallback({ error: 'producer not found' });
await producer.pause();
if (typeof pauseCallback === 'function') {
pauseCallback();
}
return true;
});
return true;
}
async createWebRtcTransport(): Promise<WebRtcTransport> {
// https://mediasoup.org/documentation/v3/mediasoup/api/#router-createWebRtcTransport
const transport = await this.router!.createWebRtcTransport(webRtcTransportConfig);
transport.on('dtlsstatechange', (dtlsState) => {
// Not reliable
logger.silly('transport dtlsstatechange', { dtlsState, service: 'mediasoup' });
if (dtlsState === 'closed') {
transport.close();
this.transports = this.transports.filter((t) => t.id !== transport.id);
}
});
return transport;
}
}

View File

@@ -0,0 +1,51 @@
import os from 'os';
import { types as mediasoupTypes } from 'mediasoup';
export const workerSettings: mediasoupTypes.WorkerSettings = {
rtcMinPort: 2000,
rtcMaxPort: 2300,
logLevel: 'debug',
logTags: ['info', 'ice', 'dtls', 'rtp', 'srtp', 'rtcp', 'message']
};
export const routerOptions: mediasoupTypes.RouterOptions = {
mediaCodecs: [
{
kind: 'audio',
mimeType: 'audio/opus',
clockRate: 48000,
channels: 2
},
{
kind: 'video',
mimeType: 'video/VP8',
clockRate: 90000,
parameters: {
'x-google-start-bitrate': 1000
}
}
]
};
export const webRtcTransportConfig: mediasoupTypes.WebRtcTransportOptions = {
// https://mediasoup.org/documentation/v3/mediasoup/api/#WebRtcTransportOptions
listenInfos: [
{
protocol: 'tcp',
ip: '0.0.0.0',
announcedIp: process.env.MEDIASOUP_ANOUNCE_IP // public ip
},
{
protocol: 'udp',
ip: '0.0.0.0',
announcedIp: process.env.MEDIASOUP_ANOUNCE_IP // public ip
}
],
enableUdp: true,
enableTcp: true,
preferUdp: true
};
export default {
numWorkers: Object.keys(os.cpus()).length
};

View File

@@ -0,0 +1,21 @@
import { createWorker as mediasoupCreateWorker } from 'mediasoup';
import logger from 'modules/winston/logger';
import { workerSettings } from './config';
/**
* * For now, each channel uses its own worker
* ! Do not create more Workers than the number of CPU-Cores
*/
export const createWorker = async () => {
const worker = await mediasoupCreateWorker(workerSettings);
logger.info(`Mediasoup worker created`, { service: 'mediasoup' });
worker.on('died', (error) => {
// This implies something serious happened, so kill the application
logger.error(`Mediasoup worker crashed! ${error}`, { error, service: 'mediasoup' });
setTimeout(() => process.exit(1), 2000); // exit in 2 seconds
});
return worker;
};

View File

@@ -0,0 +1,8 @@
import { createClient } from "redis";
export const pubClient = createClient();
export const subClient = pubClient.duplicate();
Promise.all([pubClient.connect(), subClient.connect()]).then(() => {
console.log("Redis connected");
});

View File

@@ -0,0 +1,28 @@
import { ExtendedError, Server, Socket } from "socket.io";
import { prisma } from "@repo/db";
if (!process.env.DISPATCH_APP_TOKEN)
throw new Error("DISPATCH_APP_TOKEN is not defined");
export const jwtMiddleware = async (
socket: Socket,
next: (err?: ExtendedError) => void,
) => {
try {
const { uid } = socket.handshake.auth;
if (!uid) return new Error("Authentication error");
/* const token = socket.handshake.auth?.token;
if (!token) return new Error("Authentication error");
const decoded = jwt.verify(token, process.env.DISPATCH_APP_TOKEN!); */
// socket.data.userId = decoded.; // User ID lokal speichern
const user = await prisma.user.findUniqueOrThrow({
where: { id: uid },
});
socket.data.user = user;
next();
} catch (err) {
console.error(err);
next(new Error("Authentication error"));
}
};

View File

@@ -0,0 +1,5 @@
{
"watch": ["."],
"ext": "ts",
"exec": "tsx index.ts"
}

View File

@@ -0,0 +1,33 @@
{
"name": "mediasoup-server",
"exports": {
"helpers": "./helper"
},
"scripts": {
"dev": "nodemon",
"build": "tsc"
},
"devDependencies": {
"@repo/db": "*",
"@repo/typescript-config": "*",
"@types/express": "^5.0.0",
"@types/node": "^22.13.5",
"@types/nodemailer": "^6.4.17",
"concurrently": "^9.1.2",
"typescript": "latest"
},
"dependencies": {
"@react-email/components": "^0.0.33",
"@redis/json": "^1.0.7",
"@socket.io/redis-adapter": "^8.3.0",
"axios": "^1.7.9",
"cron": "^4.1.0",
"dotenv": "^16.4.7",
"express": "^4.21.2",
"jsonwebtoken": "^9.0.2",
"nodemailer": "^6.10.0",
"react": "^19.0.0",
"redis": "^4.7.0",
"socket.io": "^4.8.1"
}
}

View File

@@ -0,0 +1,158 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable @typescript-eslint/no-explicit-any */
import { Worker } from 'mediasoup/node/lib/types';
import { UserDocument } from 'models/user';
import { MediasoupChannel } from 'modules/mediasoup/Channel';
import { createWorker } from 'modules/mediasoup/worker';
import { Socket } from 'socket.io';
import { User } from '@common/types/user';
import { EventEmitter } from 'stream';
import { setMemberNickname } from 'modules/bot/bot';
import PilotController from './pilot';
interface SfuClient {
socket: Socket;
channelId: string;
user: UserDocument;
}
interface ISfuController {
worker: Worker[];
observer: EventEmitter;
clients: Map<string, SfuClient>;
channel: Map<string, MediasoupChannel>;
init: () => void;
handle: (channelId: string, socket: Socket, user: UserDocument) => void;
disconnectUser: (userId: string) => void;
}
const SfuController: ISfuController = {
clients: new Map(),
channel: new Map(),
observer: new EventEmitter(),
worker: [],
disconnectUser: async (userId: string) => {
SfuController.clients.forEach((client) => {
if (client.user._id.toString() === userId) {
client.socket.emit('error-message', { error: 'DISCONNECTED_BY_ADMIN' });
client.socket.disconnect();
SfuController.clients.delete(client.socket.id);
}
});
},
init: async () => {
// Setup Worker and Channel
// Scalable!
SfuController.worker = [
await createWorker(),
await createWorker(),
await createWorker(),
await createWorker(),
await createWorker()
];
SfuController.channel.set('1', new MediasoupChannel(SfuController.worker[0])); // LST_VAR_RD_01
SfuController.channel.set('2', new MediasoupChannel(SfuController.worker[1])); // LST_VAR_RD_02
SfuController.channel.set('3', new MediasoupChannel(SfuController.worker[2])); // LST_VAR_RD_03
SfuController.channel.set('4', new MediasoupChannel(SfuController.worker[3])); // LST_VAR_RD_04
SfuController.channel.set('x', new MediasoupChannel(SfuController.worker[4])); // LST_VAR_RESERVE
// setup listends for new connections, producer-close event is handled by Channel
SfuController.channel.forEach((channel, channelId) => {
channel.on('producer-pausing-forced', ({ user }: { user: User.PublicUser }) => {
SfuController.clients.forEach((client) => {
if (client.channelId === channelId && client.user._id.toString() !== user.id) {
client.socket.emit('producer-pausing-forced', { user });
}
});
});
channel.on('new-producer', ({ producerId, user }) => {
SfuController.clients.forEach((client) => {
if (client.channelId === channelId) {
client.socket.emit('new-producer', { producerId, user });
}
});
});
channel.on('producer-closed', ({ producerId }) => {
SfuController.clients.forEach((client) => {
if (client.channelId === channelId) {
client.socket.emit('producer-closed', { producerId });
}
});
});
channel.on('producer-paused', ({ producerId }) => {
SfuController.clients.forEach((client) => {
if (client.channelId === channelId) {
client.socket.emit('producer-paused', { producerId });
}
});
});
channel.on('producer-resumed', ({ producerId }) => {
SfuController.clients.forEach((client) => {
if (client.channelId === channelId) {
client.socket.emit('producer-resumed', { producerId });
}
});
});
});
},
handle: (channelId, socket, user) => {
const channel = SfuController.channel.get(channelId);
if (!channel) {
socket.emit('error-message', { error: 'invalid channel id' });
return;
}
// check for double connections
if (
Array.from(SfuController.clients.values()).find(
(client) => client.user._id.toString() === user._id.toString()
) &&
process.env.ALLOW_DOUBLE_CONNECTION === 'false'
) {
socket.emit('error-message', { error: 'DOUBLE_CONNECTION' });
return;
}
SfuController.clients.set(socket.id, { socket, channelId, user });
// Update Discord username for dispatcher
SfuController.observer.emit('channel-changed', channelId, user);
const userInPilot = PilotController.clients.get(user._id.toString());
if (userInPilot) {
userInPilot.voiceChannel = channelId;
PilotController.clients.set(user._id.toString(), userInPilot);
PilotController.observer.emit('stations-changed');
}
channel.handleSocket(socket, user);
socket.on('get-producers', (getPcb) => {
getPcb(channel.producers.map((p) => ({ producerId: p.id, user: p.appData.user, paused: p.paused })));
});
socket.on('disconnect', () => {
socket.removeAllListeners();
SfuController.clients.delete(socket.id);
const userInPilotDisconnect = PilotController.clients.get(user._id.toString());
if (userInPilotDisconnect) {
userInPilotDisconnect.voiceChannel = undefined;
PilotController.clients.set(user._id.toString(), userInPilotDisconnect);
PilotController.observer.emit('stations-changed');
}
});
socket.on('set-should-transmit', (eventData: { shouldTransmit: boolean; source: string }) => {
SfuController.clients.forEach((client) => {
if (client.user._id.toString() === user._id.toString()) {
client.socket.emit('set-should-transmit', eventData);
}
});
});
}
};
SfuController.init();
export default SfuController;

View File

@@ -0,0 +1,11 @@
{
"extends": "@repo/typescript-config/base.json",
"compilerOptions": {
"outDir": "dist",
"allowImportingTsExtensions": false,
"baseUrl": ".",
"jsx": "react"
},
"include": ["**/*.ts", "./index.ts"],
"exclude": ["node_modules", "dist"]
}