paint-brush
Como resolvi o problema de dimensionamento do WebSocket sem gastar muitopor@axotion
13,322 leituras
13,322 leituras

Como resolvi o problema de dimensionamento do WebSocket sem gastar muito

por Kamil Fronczak11m2025/01/19
Read on Terminal Reader

Muito longo; Para ler

Há muito tempo, tive que criar um sistema escalável que pudesse ser capaz de lidar com centenas de conexões simultâneas a um custo não muito alto e com tempo de resposta razoável, mas não instantâneo. Eu queria enviar apenas eventos selecionados para clientes websocket. Criei um módulo Redis pub-sub, pois pensei que meus eventos, para serem visíveis em outras instâncias, tinham que ser transmitidos pelo padrão Redis Pub-sub.
featured image - Como resolvi o problema de dimensionamento do WebSocket sem gastar muito
Kamil Fronczak HackerNoon profile picture

Há muito tempo, me vi em uma situação em que precisava criar um sistema escalável que fosse capaz de lidar com centenas de conexões simultâneas a um custo não muito alto e com um tempo de resposta razoável, mas não instantâneo.


Meus primeiros pensamentos? Vamos mover todas as ações de criar/editar/excluir para a fila e notificar os usuários se suas ações foram bem-sucedidas ou não via WebSocket.


Mas naquela época, eu não tinha muita experiência com WebSockets em produção, então meu primeiro passo foi investigar como ele funciona com a ajuda de tutoriais, stack overflow e outras fontes.


Então, depois de algum tempo, entendi como isso deveria funcionar e comecei a preparar um código e brincar um pouco com uma ferramenta de testes de carga para simular alto tráfego.

O primeiro problema

Algumas perguntas e respostas sugeriram chamar o método subscribe na instância do Redis no cliente WebSocket conectado.


 io.sockets.on('connection', function (sockets) { sockets.emit('message',{Hello: 'World!'}); sub.subscribe('attack-map-production'); sockets.on('disconnect', function() { sub.unsubscribe('attack-map-production'); }); });


Mas dessa forma, estamos criando uma nova conexão com o Redis, então o uso de memória em nosso aplicativo e no pool de conexões do Redis estão aumentando. (O Redis permite apenas 10 mil conexões para uma instância)


Isso foi um grande não para mim porque eu tive que reduzir o uso de memória ao mínimo.


No momento, muitos artigos, felizmente, mencionam que você não deve criar uma nova conexão Redis em cada cliente WebSocket.

O segundo problema

Depois de criar um grande pedaço de código comercial, quando comecei a parte com web sockets, uma pergunta surgiu na minha mente: como criá-los de forma adequada e segura?


Eu já tinha alguns eventos no sistema, e alguns deles estavam prontos para serem publicados adicionalmente via WebSockets, o restante deveria permanecer dentro do sistema.


Minha promessa de ouro era que eu não precisaria alterar o código drasticamente e ainda seria capaz de enviar apenas eventos selecionados para clientes websocket.


É por isso que, a princípio, criei um módulo pub-sub do Redis, pois pensei que meus eventos, para serem visíveis em outras instâncias, teriam que ser transmitidos por meio do padrão pub-sub do Redis.


Não se sinta sobrecarregado ao olhar para o módulo abaixo, pois explicarei os detalhes mais tarde em um caso de uso


 export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT'; export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT'; export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS = 'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS'; @Module({ providers: [ { provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, useFactory: (options: RedisEventPubSubModuleOptions) => options, inject: [MODULE_OPTIONS_TOKEN], }, { provide: REDIS_PUB_CLIENT, useFactory: async (options: RedisEventPubSubModuleOptions) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err)); await client.connect(); return client; }, inject: [MODULE_OPTIONS_TOKEN], }, { provide: EVENT_EMITTER_TOKEN, useFactory: ( redisPubClient: RedisClientType, eventEmitter: EventEmitter2, ) => { return new RedisEventEmitter(redisPubClient, eventEmitter); }, inject: [REDIS_PUB_CLIENT, EventEmitter2], }, { provide: EVENT_SUBSCRIBER_TOKEN, useFactory: (eventEmitterSub: EventEmitter2) => { return new EventEmitter2EventSubscriber(eventEmitterSub); }, inject: [EventEmitter2], }, ], exports: [ REDIS_PUB_CLIENT, EVENT_EMITTER_TOKEN, EVENT_SUBSCRIBER_TOKEN, REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, ], }) export class RedisEventPubSubModule extends ConfigurableModuleClass { static registerEvents(eventsPublishableNames: string[]): DynamicModule { return { module: class {}, providers: [ { provide: REDIS_SUB_CLIENT, useFactory: async ( options: RedisEventPubSubModuleOptions, eventEmitter: EventEmitter2, ) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err), ); await client.connect(); for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); }); } return client; }, inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2], }, ], }; } }


Este módulo cuida da criação/exposição de um cliente Pub Redis e da exposição de um método adicional - registerEvents, que é responsável por escutar determinados eventos no Redis pub-sub e reemiti-los por meio do emissor de eventos.


Pode estar um pouco nebuloso por enquanto. Por que reemitir eventos? Por que precisamos nos registrar para esses eventos? O que são EVENT_EMITTER_TOKEN e EVENT_SUBSCRIBER_TOKEN e por que temos que exportá-los?


Ficará mais claro com o uso na vida real, então vamos criar um caso de uso - mensagens de bate-papo. Queremos poder enviar mensagens via HTTP POST e recebê-las via WebSocket no front-end.


Vamos começar

Eventos de publicação

Aqui está um módulo para isso

 @Module({ imports: [], controllers: [], providers: [], }) export class UserChatModule {}


E um evento que este módulo irá emitir após receber uma requisição POST


 export class NewMessageEvent { constructor(public readonly message: string) {} }


No controlador, temos que tornar possível emitir eventos tanto para o nosso sistema quanto para a fila do pub Redis. Usaremos o wrapper

EventEmitter2 para isso


 export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN'; export class RedisEventEmitter implements EventEmitterInterface { constructor( private redisPubClient: RedisClientType, private eventEmitter: EventEmitter2, ) {} async emit(eventName: string, payload: Record<any, any>): Promise<void> { this.eventEmitter.emit(eventName, payload); if (this.isPublishableEvent(payload)) { await this.redisPubClient.publish( payload.publishableEventName, JSON.stringify(payload), ); } } private isPublishableEvent(event: any): event is PublishableEventInterface { return event.publishableEventName !== undefined; } }


E então, podemos usá-lo em nosso controlador


 @Controller('messages') export class SendMessageAction { constructor( // Previously eventEmitter2 @Inject(EVENT_EMITTER_TOKEN) private readonly eventEmitter: EventEmitterInterface, ) {} @Post() async handle(@Body() request: SendMessageHttpRequest) { await this.eventEmitter.emit( NewMessageEvent.name, new NewMessageEvent(request.content), ); } }


Mas antes disso, temos que aprimorar nosso evento com PublishableEventInterface para permitir que o RedisEventEmitter capture nosso evento e o emita na fila do pub do Redis.


 export class NewMessageEvent implements PublishableEventInterface { static publishableEventName = 'events:new-message'; publishableEventName = NewMessageEvent.publishableEventName; constructor(public readonly message: string) {} }


Ótimo, agora estamos enviando nossos eventos como costumávamos fazer, mas agora, se eles forem marcados como publicáveis, eles irão para a fila de publicação do Redis.

Mas agora, precisamos tornar possível receber esses eventos no WebSocket, certo?

Recebendo eventos

Então, vamos dar uma olhada em nosso módulo de bate-papo do usuário


 @Module({ imports: [ RedisEventPubSubModule.registerEvents([ NewMessageEvent.publishableEventName, ]), ], controllers: [SendMessageAction], providers: [], }) export class UserChatModule {}


Como você pode ver, usamos o método mencionado anteriormente - registerEvents.

Graças a esse método, informamos ao RedisEventPubSubModule que ele deveria escutar nosso evento NewMessageEvent na fila pub-sub do Redis no atributo publishableEventName.


Portanto, se qualquer evento NewMessageEvent ocorrer, ele será reemitido como um evento NewMessageEvent normal, mas sob o atributo publishableEventName.


Vale mencionar que ele funcionará em 1 instância ou 1.000 instâncias. Então, mesmo se escalarmos para um número alto de instâncias, cada uma delas receberá isso e reemitirá esse evento dentro do sistema.


Então, agora temos habilidades para emitir eventos e ouvi-los. Agora precisamos entregá-los aos nossos clientes websocket.

Gateway de Websocket

Vamos dar uma olhada no Websocket Gateway


 export enum WebsocketEventSubscribeList { FETCH_EVENTS_MESSAGES = 'fetch-events-messages', EVENTS_MESSAGES_STREAM = 'events-messages-stream', } @WebSocketGateway({ pingInterval: 30000, pingTimeout: 5000, cors: { origin: '*', }, }) export class MessagesWebsocketGateway { constructor( @Inject(EVENT_SUBSCRIBER_TOKEN) private eventSubscriber: EventSubscriberInterface, ) {} @SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES) async streamMessagesData(@ConnectedSocket() client: any) { const stream$ = this.createWebsocketStreamFromEventFactory( client, this.eventSubscriber, NewMessageEvent.publishableEventName, ); const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM; return from(stream$).pipe(map((data) => ({ event, data }))); } private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


Então tem uma coisa, no construtor, temos EVENT_SUBSCRIBER_TOKEN, cujo tipo é EventSubscriberInterface. Mas o que ele realmente faz? É assim que ele se parece por baixo do capô


 export class EventEmitter2EventSubscriber implements EventSubscriberInterface { constructor(private eventEmitter: EventEmitter2) {} on(name: string, listener: any): void { this.eventEmitter.on(name, listener); } off(name: string, listener: any): void { this.eventEmitter.removeListener(name, listener); } }


É apenas um wrapper para EventEmitter2, que estamos usando no método createWebsocketStreamFromEventFactory


 private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


Estamos usando este EventEmitter2 encapsulado para criar ouvintes dinâmicos em publishableName quando clientes websocket se conectam e removem na desconexão.


Então, não estamos fazendo nada mais do que criar um fluxo rxjs para manter a conexão do websocket e enviar mensagens do listener via observer.next(message); quando uma nova mensagem ocorre.

Como esse evento chegará aos nossos ouvintes?


Se você retornar ao primeiro trecho de código, nosso módulo Redis pub sub, poderá identificar isso no método registerEvents

 for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); });


Que basicamente escuta eventos na fila do pub e então os reemite por meio do emissor de eventos.


Então, vamos resumir o que fizemos aqui

  • Ainda estamos usando nossos eventos no sistema como costumávamos fazer via EventEmitter2, mas se quisermos publicar em nossos clientes websocket conectados, tudo o que precisamos fazer é implementar PublishableInterface

  • Não estamos criando novas conexões Redis em cada cliente websocket conectado

  • Podemos escalar nosso sistema para X instâncias e ele ainda se comportará da mesma maneira - cada cliente conectado receberá uma cópia do evento via websocket, não importa a qual instância ele estará conectado


Código de trabalho e exemplo estão disponíveis aqui: https://github.com/axotion/nestjs-events-websocket