paint-brush
Como resolvín o problema de escalado de WebSocket sen romper o bancopor@axotion
13,322 lecturas
13,322 lecturas

Como resolvín o problema de escalado de WebSocket sen romper o banco

por Kamil Fronczak11m2025/01/19
Read on Terminal Reader

Demasiado longo; Ler

Hai moito tempo, tiven que crear un sistema escalable que fose capaz de manexar centos de conexións simultáneas a un custo non moi elevado e cun tempo de resposta razoable, pero non instantáneo. Quería enviar só eventos seleccionados aos clientes de websocket. Creei un módulo Redis pub-sub, xa que pensaba que os meus eventos, para ser visibles noutras instancias, debían transmitirse a través do patrón Redis Pub-sub.
featured image - Como resolvín o problema de escalado de WebSocket sen romper o banco
Kamil Fronczak HackerNoon profile picture

Hai moito tempo, atopeime nunha situación na que tiña que crear un sistema escalable que fose capaz de manexar centos de conexións simultáneas a un custo non moi elevado e cun tempo de resposta razoable, pero non instantáneo.


Os meus primeiros pensamentos? Movemos todas as accións de crear/editar/eliminar á cola e notifiquemos aos usuarios se as súas accións tiveron éxito ou non a través de WebSocket.


Pero daquela, non tiña moita experiencia con WebSockets en produción, polo que o meu primeiro paso foi investigar como funciona coa axuda de titoriais, desbordamento de pilas e outras fontes.


Entón, despois dun tempo, teño unha idea de como debería funcionar e comecei a preparar un código e a xogar durante un tempo cunha ferramenta de probas de carga para simular un alto tráfico.

O primeiro problema

Algunhas preguntas e respostas suxeriron chamar ao método de subscrición na instancia de Redis no cliente WebSocket conectado.


 io.sockets.on('connection', function (sockets) { sockets.emit('message',{Hello: 'World!'}); sub.subscribe('attack-map-production'); sockets.on('disconnect', function() { sub.unsubscribe('attack-map-production'); }); });


Pero deste xeito, estamos a crear unha nova conexión con Redis, polo que o uso de memoria na nosa aplicación e no grupo de conexións de Redis está aumentando. (Redis permite só 10k conexións a unha instancia)


Foi un gran non para min porque tiven que reducir o uso da memoria ao mínimo.


Neste momento, moitos artigos, afortunadamente, mencionan que non debes crear unha nova conexión Redis en cada cliente WebSocket.

O segundo problema

Despois de crear un gran anaco de código empresarial, cando comecei a parte con sockets web, xurdiu unha pregunta á miña mente: como crealos dunha forma correcta e segura?


Xa tiña algúns eventos no sistema, e algúns deles estaban listos para ser publicados adicionalmente a través de WebSockets, o resto estaban destinados a permanecer dentro do sistema.


A miña promesa de ouro foi que non tería que cambiar o código drasticamente e aínda poder enviar só eventos seleccionados aos clientes de websocket.


Por iso, nun primeiro momento, creei un módulo Redis pub-sub, xa que pensaba que os meus eventos, para ser visibles noutras instancias, tiñan que ser transmitidos a través do patrón Redis pub-sub.


Non te sintas abrumado mirando o módulo a continuación, xa que explicarei detalles máis adiante nun caso de uso


 export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT'; export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT'; export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS = 'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS'; @Module({ providers: [ { provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, useFactory: (options: RedisEventPubSubModuleOptions) => options, inject: [MODULE_OPTIONS_TOKEN], }, { provide: REDIS_PUB_CLIENT, useFactory: async (options: RedisEventPubSubModuleOptions) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err)); await client.connect(); return client; }, inject: [MODULE_OPTIONS_TOKEN], }, { provide: EVENT_EMITTER_TOKEN, useFactory: ( redisPubClient: RedisClientType, eventEmitter: EventEmitter2, ) => { return new RedisEventEmitter(redisPubClient, eventEmitter); }, inject: [REDIS_PUB_CLIENT, EventEmitter2], }, { provide: EVENT_SUBSCRIBER_TOKEN, useFactory: (eventEmitterSub: EventEmitter2) => { return new EventEmitter2EventSubscriber(eventEmitterSub); }, inject: [EventEmitter2], }, ], exports: [ REDIS_PUB_CLIENT, EVENT_EMITTER_TOKEN, EVENT_SUBSCRIBER_TOKEN, REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, ], }) export class RedisEventPubSubModule extends ConfigurableModuleClass { static registerEvents(eventsPublishableNames: string[]): DynamicModule { return { module: class {}, providers: [ { provide: REDIS_SUB_CLIENT, useFactory: async ( options: RedisEventPubSubModuleOptions, eventEmitter: EventEmitter2, ) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err), ); await client.connect(); for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); }); } return client; }, inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2], }, ], }; } }


Este módulo encárgase de crear/expoñer un cliente de Pub Redis e de expoñer un método adicional: registerEvents, que se encarga de escoitar determinados eventos en Redis pub-sub e reemitilos a través do emisor de eventos.


Pode ser un pouco de néboa polo momento. Por que reemitir eventos? Por que temos que inscribirnos neses eventos? Que son EVENT_EMITTER_TOKEN e EVENT_SUBSCRIBER_TOKEN e por que temos que exportalos?


Será máis claro co uso da vida real, así que imos crear un caso de uso: mensaxes de chat. Queremos poder enviar mensaxes vía HTTP POST e recibilas a través de WebSocket na interface.


Imos comezar

Eventos de publicación

Aquí tes un módulo para iso

 @Module({ imports: [], controllers: [], providers: [], }) export class UserChatModule {}


E un evento que emitirá este módulo despois de recibir unha solicitude POST


 export class NewMessageEvent { constructor(public readonly message: string) {} }


No controlador, temos que posibilitar a emisión de eventos tanto para o noso sistema como para a cola do pub Redis. Usaremos envolto

EventEmitter2 para iso


 export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN'; export class RedisEventEmitter implements EventEmitterInterface { constructor( private redisPubClient: RedisClientType, private eventEmitter: EventEmitter2, ) {} async emit(eventName: string, payload: Record<any, any>): Promise<void> { this.eventEmitter.emit(eventName, payload); if (this.isPublishableEvent(payload)) { await this.redisPubClient.publish( payload.publishableEventName, JSON.stringify(payload), ); } } private isPublishableEvent(event: any): event is PublishableEventInterface { return event.publishableEventName !== undefined; } }


E entón, podemos usalo no noso controlador


 @Controller('messages') export class SendMessageAction { constructor( // Previously eventEmitter2 @Inject(EVENT_EMITTER_TOKEN) private readonly eventEmitter: EventEmitterInterface, ) {} @Post() async handle(@Body() request: SendMessageHttpRequest) { await this.eventEmitter.emit( NewMessageEvent.name, new NewMessageEvent(request.content), ); } }


Pero antes diso, temos que mellorar o noso evento con PublishableEventInterface para permitir que RedisEventEmitter capte o noso evento e o emita na cola do pub de Redis.


 export class NewMessageEvent implements PublishableEventInterface { static publishableEventName = 'events:new-message'; publishableEventName = NewMessageEvent.publishableEventName; constructor(public readonly message: string) {} }


Xenial, agora enviamos os nosos eventos como antes, pero agora, se están marcados como publicables, aterrarán na cola do pub Redis.

Pero agora, temos que facer posible recibir eses eventos en WebSocket, non?

Recepción de eventos

Entón, imos dar unha ollada ao noso módulo de chat de usuarios


 @Module({ imports: [ RedisEventPubSubModule.registerEvents([ NewMessageEvent.publishableEventName, ]), ], controllers: [SendMessageAction], providers: [], }) export class UserChatModule {}


Como podes ver, usamos o método mencionado anteriormente - registerEvents.

Grazas a ese método, dixemos a RedisEventPubSubModule que debería escoitar o noso evento NewMessageEvent na cola pub-sub de Redis no atributo publishableEventName.


Polo tanto, se se produce algún evento NewMessageEvent, reemitirase como un evento NewMessageEvent normal, pero baixo o atributo publishableEventName.


Paga a pena mencionar que funcionará en 1 ou 1.000 instancias. Polo tanto, aínda que escalamos a un número elevado de instancias, cada unha delas recibirá isto e reemitirá este evento dentro do sistema.


Entón, agora temos capacidades para emitir eventos e escoitalos. Agora necesitamos entregalos aos nosos clientes de websocket.

Pasarela Websocket

Vexamos Websocket Gateway


 export enum WebsocketEventSubscribeList { FETCH_EVENTS_MESSAGES = 'fetch-events-messages', EVENTS_MESSAGES_STREAM = 'events-messages-stream', } @WebSocketGateway({ pingInterval: 30000, pingTimeout: 5000, cors: { origin: '*', }, }) export class MessagesWebsocketGateway { constructor( @Inject(EVENT_SUBSCRIBER_TOKEN) private eventSubscriber: EventSubscriberInterface, ) {} @SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES) async streamMessagesData(@ConnectedSocket() client: any) { const stream$ = this.createWebsocketStreamFromEventFactory( client, this.eventSubscriber, NewMessageEvent.publishableEventName, ); const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM; return from(stream$).pipe(map((data) => ({ event, data }))); } private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


Polo tanto, hai algo, no construtor, temos EVENT_SUBSCRIBER_TOKEN, que tipo é EventSubscriberInterface. Pero que fai realmente? Así se ve debaixo do capó


 export class EventEmitter2EventSubscriber implements EventSubscriberInterface { constructor(private eventEmitter: EventEmitter2) {} on(name: string, listener: any): void { this.eventEmitter.on(name, listener); } off(name: string, listener: any): void { this.eventEmitter.removeListener(name, listener); } }


É só un envoltorio para EventEmitter2, que estamos usando no método createWebsocketStreamFromEventFactory


 private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


Estamos a usar este EventEmitter2 envolto para crear oíntes dinámicos en publishableName cando os clientes de websocket se conectan e eliminan ao desconectarse.


Entón, non estamos facendo máis que crear un fluxo rxjs para manter a conexión websocket e enviar mensaxes do oínte a través de observer.next(message); cando se produce unha nova mensaxe.

Como chegará este evento aos nosos oíntes?


Se volves ao primeiro fragmento de código, o noso submódulo Redis pub, podes detectar isto no método registerEvents

 for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); });


O que basicamente escoita eventos na cola do pub e, a continuación, reemite a través do emisor de eventos.


Entón, imos resumir o que fixemos aquí

  • Aínda estamos usando os nosos eventos no sistema como antes a través de EventEmitter2, pero se queremos publicar nos nosos clientes de websocket conectados, todo o que temos que facer é implementar PublishableInterface

  • Non estamos creando novas conexións Redis en cada cliente websocket conectado

  • Podemos escalar o noso sistema a X instancias e seguirá comportando do mesmo xeito: cada cliente conectado recibirá unha copia do evento a través de websocket, sen importar a que instancia estean conectados.


O código de traballo e o exemplo están dispoñibles aquí: https://github.com/axotion/nestjs-events-websocket