Hace mucho tiempo, me encontré en una situación en la que tenía que crear un sistema escalable que fuera capaz de manejar cientos de conexiones simultáneas a un coste no muy alto y con un tiempo de respuesta razonable, pero no instantáneo.
¿Mi primera idea? Muevamos todas las acciones de creación, edición y eliminación a la cola y notifiquemos a los usuarios si sus acciones se realizaron correctamente o no a través de WebSocket.
Pero en ese entonces no tenía mucha experiencia con WebSockets en producción, así que mi primer paso fue investigar cómo funciona con la ayuda de tutoriales, stack overflow y otras fuentes.
Entonces, después de un tiempo, tuve una idea de cómo debería funcionar y comencé a preparar un código y a experimentar un rato con una herramienta de pruebas de carga para simular mucho tráfico.
Algunas preguntas y respuestas sugirieron llamar al método de suscripción en la instancia de Redis en el cliente WebSocket conectado.
io.sockets.on('connection', function (sockets) { sockets.emit('message',{Hello: 'World!'}); sub.subscribe('attack-map-production'); sockets.on('disconnect', function() { sub.unsubscribe('attack-map-production'); }); });
Pero de esta manera, estamos creando una nueva conexión a Redis, por lo que el uso de memoria en nuestra aplicación y el grupo de conexiones de Redis aumentan. (Redis solo permite 10 000 conexiones a una instancia)
Eso fue un gran no para mí porque tenía que reducir el uso de memoria al mínimo.
En este momento, muchos artículos, afortunadamente, mencionan que no debes crear una nueva conexión Redis en cada cliente WebSocket.
Después de crear una gran parte del código comercial, cuando comencé la parte con web sockets, me surgió una pregunta: ¿cómo crearlos de manera adecuada y segura?
Ya tenía algunos eventos en el sistema, y algunos de ellos estaban listos para ser publicados adicionalmente a través de WebSockets, el resto estaba destinado a permanecer dentro del sistema.
Mi promesa de oro fue que no tendría que cambiar el código drásticamente y aún así podría enviar solo eventos seleccionados a los clientes websocket.
Por eso, al principio, creé un módulo pub-sub de Redis, ya que pensé que mis eventos, para ser visibles en otras instancias, debían transmitirse a través del patrón pub-sub de Redis.
No se sienta abrumado al mirar el módulo a continuación, ya que explicaré los detalles más adelante en un caso de uso.
export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT'; export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT'; export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS = 'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS'; @Module({ providers: [ { provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, useFactory: (options: RedisEventPubSubModuleOptions) => options, inject: [MODULE_OPTIONS_TOKEN], }, { provide: REDIS_PUB_CLIENT, useFactory: async (options: RedisEventPubSubModuleOptions) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err)); await client.connect(); return client; }, inject: [MODULE_OPTIONS_TOKEN], }, { provide: EVENT_EMITTER_TOKEN, useFactory: ( redisPubClient: RedisClientType, eventEmitter: EventEmitter2, ) => { return new RedisEventEmitter(redisPubClient, eventEmitter); }, inject: [REDIS_PUB_CLIENT, EventEmitter2], }, { provide: EVENT_SUBSCRIBER_TOKEN, useFactory: (eventEmitterSub: EventEmitter2) => { return new EventEmitter2EventSubscriber(eventEmitterSub); }, inject: [EventEmitter2], }, ], exports: [ REDIS_PUB_CLIENT, EVENT_EMITTER_TOKEN, EVENT_SUBSCRIBER_TOKEN, REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, ], }) export class RedisEventPubSubModule extends ConfigurableModuleClass { static registerEvents(eventsPublishableNames: string[]): DynamicModule { return { module: class {}, providers: [ { provide: REDIS_SUB_CLIENT, useFactory: async ( options: RedisEventPubSubModuleOptions, eventEmitter: EventEmitter2, ) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err), ); await client.connect(); for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); }); } return client; }, inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2], }, ], }; } }
Este módulo se encarga de crear/exponer un cliente Pub Redis y exponer un método adicional, registerEvents, que es responsable de escuchar eventos determinados en Redis pub-sub y reemitirlos a través del emisor de eventos.
Puede que por ahora no esté del todo claro. ¿Por qué reemitir eventos? ¿Por qué tenemos que registrarnos para esos eventos? ¿Qué son EVENT_EMITTER_TOKEN
y EVENT_SUBSCRIBER_TOKEN
y por qué tenemos que exportarlos?
Será más claro con el uso en la vida real, así que vamos a crear un caso de uso: mensajes de chat. Queremos poder enviar mensajes a través de HTTP POST y recibirlos a través de WebSocket en el front-end.
Vamos a empezar
Aquí hay un módulo para eso.
@Module({ imports: [], controllers: [], providers: [], }) export class UserChatModule {}
Y un evento que este módulo emitirá después de recibir una solicitud POST
export class NewMessageEvent { constructor(public readonly message: string) {} }
En el controlador, tenemos que hacer posible la emisión de eventos tanto para nuestro sistema como para la cola de publicación de Redis. Usaremos Wrapped
EventEmitter2 para eso
export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN'; export class RedisEventEmitter implements EventEmitterInterface { constructor( private redisPubClient: RedisClientType, private eventEmitter: EventEmitter2, ) {} async emit(eventName: string, payload: Record<any, any>): Promise<void> { this.eventEmitter.emit(eventName, payload); if (this.isPublishableEvent(payload)) { await this.redisPubClient.publish( payload.publishableEventName, JSON.stringify(payload), ); } } private isPublishableEvent(event: any): event is PublishableEventInterface { return event.publishableEventName !== undefined; } }
Y luego, podemos usarlo en nuestro controlador.
@Controller('messages') export class SendMessageAction { constructor( // Previously eventEmitter2 @Inject(EVENT_EMITTER_TOKEN) private readonly eventEmitter: EventEmitterInterface, ) {} @Post() async handle(@Body() request: SendMessageHttpRequest) { await this.eventEmitter.emit( NewMessageEvent.name, new NewMessageEvent(request.content), ); } }
Pero antes de eso, tenemos que mejorar nuestro evento con PublishableEventInterface para permitir que RedisEventEmitter capture nuestro evento y lo emita en la cola de publicación de Redis.
export class NewMessageEvent implements PublishableEventInterface { static publishableEventName = 'events:new-message'; publishableEventName = NewMessageEvent.publishableEventName; constructor(public readonly message: string) {} }
Genial, ahora enviamos nuestros eventos como solíamos hacerlo, pero ahora, si están marcados como publicables, aparecerán en la cola de publicación de Redis.
Pero ahora necesitamos hacer posible recibir esos eventos en WebSocket, ¿verdad?
Entonces, echemos un vistazo a nuestro módulo de chat de usuario.
@Module({ imports: [ RedisEventPubSubModule.registerEvents([ NewMessageEvent.publishableEventName, ]), ], controllers: [SendMessageAction], providers: [], }) export class UserChatModule {}
Como puedes ver, utilizamos el método mencionado anteriormente: registerEvents.
Gracias a ese método, le dijimos a RedisEventPubSubModule que debería escuchar nuestro evento NewMessageEvent en la cola pub-sub de Redis en el atributo publishableEventName.
Entonces, si ocurre algún evento NewMessageEvent, se volverá a emitir como un evento NewMessageEvent normal, pero bajo el atributo publishableEventName.
Vale la pena mencionar que funcionará en 1 instancia o en 1000 instancias, por lo que incluso si ampliamos a una gran cantidad de instancias, cada una de ellas recibirá este evento y lo volverá a emitir dentro del sistema.
Ahora tenemos la capacidad de emitir eventos y escucharlos. Ahora tenemos que enviarlos a nuestros clientes de WebSocket.
Echemos un vistazo a Websocket Gateway
export enum WebsocketEventSubscribeList { FETCH_EVENTS_MESSAGES = 'fetch-events-messages', EVENTS_MESSAGES_STREAM = 'events-messages-stream', } @WebSocketGateway({ pingInterval: 30000, pingTimeout: 5000, cors: { origin: '*', }, }) export class MessagesWebsocketGateway { constructor( @Inject(EVENT_SUBSCRIBER_TOKEN) private eventSubscriber: EventSubscriberInterface, ) {} @SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES) async streamMessagesData(@ConnectedSocket() client: any) { const stream$ = this.createWebsocketStreamFromEventFactory( client, this.eventSubscriber, NewMessageEvent.publishableEventName, ); const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM; return from(stream$).pipe(map((data) => ({ event, data }))); } private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }
Entonces, hay una cosa, en el constructor, tenemos EVENT_SUBSCRIBER_TOKEN, cuyo tipo es EventSubscriberInterface. Pero, ¿qué hace realmente? Así es como se ve bajo el capó
export class EventEmitter2EventSubscriber implements EventSubscriberInterface { constructor(private eventEmitter: EventEmitter2) {} on(name: string, listener: any): void { this.eventEmitter.on(name, listener); } off(name: string, listener: any): void { this.eventEmitter.removeListener(name, listener); } }
Es solo un contenedor para EventEmitter2, que estamos usando en el método createWebsocketStreamFromEventFactory
private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }
Estamos usando este EventEmitter2 envuelto para crear oyentes dinámicos en publishableName cuando los clientes websocket se conectan y se eliminan al desconectan.
Entonces, no hacemos nada más que crear un flujo rxjs para mantener la conexión websocket y enviar mensajes desde el oyente a través de observer.next(message); cuando se produce un nuevo mensaje.
¿Cómo llegará este evento a nuestros oyentes?
Si regresa al primer fragmento de código, nuestro submódulo de publicación Redis, puede detectar esto en el método registerEvents
for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); });
Que básicamente escucha eventos en la cola de publicación y luego los vuelve a emitir a través del emisor de eventos.
Así que, resumamos lo que hemos hecho aquí.
Seguimos usando nuestros eventos en el sistema como solíamos hacerlo a través de EventEmitter2, pero si queremos publicar en nuestros clientes websocket conectados, entonces todo lo que tenemos que hacer es implementar PublishableInterface.
No estamos creando nuevas conexiones Redis en cada cliente websocket conectado
Podemos ampliar nuestro sistema a X instancias y seguirá comportándose de la misma manera: cada cliente conectado recibirá una copia del evento a través de websocket, sin importar a qué instancia esté conectado.
El código de trabajo y el ejemplo están disponibles aquí: https://github.com/axotion/nestjs-events-websocket