paint-brush
Imaynata WebSocket Escalamiento Sasachakuyta Mana Bancota P'akispa allicharqaniby@axotion
13,322 ñawinchasqakuna
13,322 ñawinchasqakuna

Imaynata WebSocket Escalamiento Sasachakuyta Mana Bancota P'akispa allicharqani

by Kamil Fronczak11m2025/01/19
Read on Terminal Reader

Nishu unay; Ñawinchanapaq

Unay pacha ñawpaqta, huk sistema escalable ruwanay karqa mayqinchus pachakkuna simultáneo tinkiykuna mana ancha hatun qullqiwan ruwayta atikunman karqa, chaymanta razonable, ichaqa mana chaylla kutichiy pachawan. Akllasqa ruwaykunallata websocket clientekunaman apachiyta munarqani. Huk Redis pub-sub módulo ruwarqani, imaynachus yuyaykurqani ruwasqaykuna, wak instanciakunapi rikukuq kanapaq, Redis Pub-sub patrón kaqnintakama apachisqa kananku tiyan.
featured image - Imaynata WebSocket Escalamiento Sasachakuyta Mana Bancota P'akispa allicharqani
Kamil Fronczak HackerNoon profile picture

Unay pacha ñawpaqta, huk situacionpi tarikurqani maypi huk sistema escalable ruwanay karqa mayqinchus pachakkuna simultáneo tinkiykuna mana ancha hatun qullqiwan ruwayta atiq, chaymanta razonable, ichaqa mana chaylla kutichiy pachawan.


¿Ñawpaq kaq yuyaykusqay? Llapan ruway/allinchay/qulluy ruwaykunata filaman kuyuchisun chaymanta ruwaqkunaman willasun sichus ruwayninku allin ruwasqa utaq mana WebSocket kaqnintakama.


Ichaqa chay pacha, mana ancha experienciayuqchu karqani WebSockets kaqwan ruwaypi, chayrayku ñawpaq ruwayniyqa karqan imayna llamk'an yachachiy yanapakuywan, pila desbordamiento kaqwan chaymanta wak pukyuta yanapakuywan investiganaypaq.


Chaymi, huk pachamanta, huk esencia imayna llamk'ananmanta chaskirqani chaymanta huk código wakichiyta qallarirqani chaymanta huk pachapaq huk herramienta pruebakuna carga kaqwan chaqruyta qallarirqani hatun tráfico simulanaypaq.

Ñawpaq kaq sasachakuy

Wakin tapuykuna chaymanta kutichiykuna yuyaycharqanku suscripción ruwayta waqyayta Redis instancia kaqpi tinkisqa WebSocket cliente kaqpi.


 io.sockets.on('connection', function (sockets) { sockets.emit('message',{Hello: 'World!'}); sub.subscribe('attack-map-production'); sockets.on('disconnect', function() { sub.unsubscribe('attack-map-production'); }); });


Ichaqa kayhina, huk musuq tinkiyta Redis kaqman ruwachkayku, chayrayku yuyarina llamk'achiyta ruwanaykupi chaymanta Redis tinkiynin quchapi wicharichkanku. (Redis huk instanciaman 10k tinkiykunallata saqin)


Chayqa hatun mana karqan ñoqapaq imaraykuchus memoria utilizacionta pisiyachinay karqan huk mínimoman.


Kunanpacha, achka qillqakuna, kusikuypaq, rimanku mana musuq Redis tinkiyta sapa WebSocket cliente kaqpi ruwanaykichu tiyan.

Iskay kaq sasachakuy

Huk hatun chunka negocio código ruwasqamanta, web sockets kaqwan parteta qallarichkaptiy, huk tapukuy yuyayniyman lluqsirqa - imaynata allin chaymanta mana manchay ruwaypi ruwayta?


Ñam wakin ruwaykuna sistemapi karqani, wakintaq WebSockets kaqnintakama yapamanta lluqsichinapaq wakichisqa karqanku, wakintaq sistema ukhupi qhipakunankupaq ruwasqa karqanku.


Quri promesayniyqa karqan mana sinchitachu código tikrayta ruwanay kanman chaymanta akllasqa ruwaykunallata websocket clientekunaman apachiyta atiymanraq.


Chay rayku, qallariypi, huk Redis pub-sub módulo ruwarqani, imaynachus yuyaykurqani ruwasqaykuna, wak instanciakunapi rikusqa kanankupaq, Redis pub-sub patrón kaqnintakama apachisqa kananku tiyan.


Ama llallichisqachu sientekuy kay urapi módulo qhawaspa, imaynachus sut'inchasaq detalles qhipaman huk caso de uso kaqpi


 export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT'; export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT'; export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS = 'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS'; @Module({ providers: [ { provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, useFactory: (options: RedisEventPubSubModuleOptions) => options, inject: [MODULE_OPTIONS_TOKEN], }, { provide: REDIS_PUB_CLIENT, useFactory: async (options: RedisEventPubSubModuleOptions) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err)); await client.connect(); return client; }, inject: [MODULE_OPTIONS_TOKEN], }, { provide: EVENT_EMITTER_TOKEN, useFactory: ( redisPubClient: RedisClientType, eventEmitter: EventEmitter2, ) => { return new RedisEventEmitter(redisPubClient, eventEmitter); }, inject: [REDIS_PUB_CLIENT, EventEmitter2], }, { provide: EVENT_SUBSCRIBER_TOKEN, useFactory: (eventEmitterSub: EventEmitter2) => { return new EventEmitter2EventSubscriber(eventEmitterSub); }, inject: [EventEmitter2], }, ], exports: [ REDIS_PUB_CLIENT, EVENT_EMITTER_TOKEN, EVENT_SUBSCRIBER_TOKEN, REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, ], }) export class RedisEventPubSubModule extends ConfigurableModuleClass { static registerEvents(eventsPublishableNames: string[]): DynamicModule { return { module: class {}, providers: [ { provide: REDIS_SUB_CLIENT, useFactory: async ( options: RedisEventPubSubModuleOptions, eventEmitter: EventEmitter2, ) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err), ); await client.connect(); for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); }); } return client; }, inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2], }, ], }; } }


Kay módulo huk Pub Redis cliente ruwayta/exposación kaqmanta qhawan chaymanta huk yapasqa métodota rikuchiyta - registerEvents kaqmanta, mayqinchus Redis pub-sub kaqpi qusqa ruwaykuna uyarinapaq chaymanta evento emitter kaqnintakama wakmanta kachaymanta ruwasqa.


Kunankamaqa huk chhika phuyuraqmi kanman. ¿Imaraykutaq sucesokunata yapamanta apachina? ¿Imanasqataq chay eventokunaman inscribikunanchik? ¿Imataq EVENT_EMITTER_TOKEN chaymanta EVENT_SUBSCRIBER_TOKEN chaymanta imarayku hawaman apachinanchik tiyan?


Aswan sut'i kanqa chiqa kawsay llamk'achiywan, chayrayku huk llamk'ana casota ruwasun - rimanakuy willakuykunata. Munayku willayta HTTP POST kaqninta apachiyta chaymanta WebSocket kaqninta ñawpaq tukukuypi chaskiyta.


Qallarisunchik

Eventos nisqakuna lluqsichiy

Kaypi huk módulo chaypaq kachkan

 @Module({ imports: [], controllers: [], providers: [], }) export class UserChatModule {}


Hinallataq huk suceso kay módulo huk POST mañakuyta chaskisqamanta lluqsichinqa


 export class NewMessageEvent { constructor(public readonly message: string) {} }


Controlador kaqpi, ruwanayku tiyan eventokuna kachayta iskaynin sistemaykupaq chaymanta Redis pub fila kaqpaq. P’istuykusqata apaykachasunchik

EventEmitter2 chaypaq


 export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN'; export class RedisEventEmitter implements EventEmitterInterface { constructor( private redisPubClient: RedisClientType, private eventEmitter: EventEmitter2, ) {} async emit(eventName: string, payload: Record<any, any>): Promise<void> { this.eventEmitter.emit(eventName, payload); if (this.isPublishableEvent(payload)) { await this.redisPubClient.publish( payload.publishableEventName, JSON.stringify(payload), ); } } private isPublishableEvent(event: any): event is PublishableEventInterface { return event.publishableEventName !== undefined; } }


Hinaspa chaymanta, controladorniykupi chayta llamk’achiyta atiyku


 @Controller('messages') export class SendMessageAction { constructor( // Previously eventEmitter2 @Inject(EVENT_EMITTER_TOKEN) private readonly eventEmitter: EventEmitterInterface, ) {} @Post() async handle(@Body() request: SendMessageHttpRequest) { await this.eventEmitter.emit( NewMessageEvent.name, new NewMessageEvent(request.content), ); } }


Ichaqa manaraq chaymanta, PublishableEventInterface kaqwan ruwayniyku aswan allinta ruwanayku tiyan RedisEventEmitter ruwayniyku hap'inanpaq chaymanta Redis pub filapi kachayta atinanpaq.


 export class NewMessageEvent implements PublishableEventInterface { static publishableEventName = 'events:new-message'; publishableEventName = NewMessageEvent.publishableEventName; constructor(public readonly message: string) {} }


Hatun, kunanqa ñawpaq hina eventoykuta apachichkayku, ichaqa kunan, sichus publicable hina marcasqa kanku chayqa, Redis pub filapi chayanqa.

Ichaqa kunan, chay ruwaykuna WebSocket kaqpi chaskiyta atikunanchik tiyan, ¿icharí?

Sucedimientokunata chaskispa

Chaymi, huk qhawayta ruwasun user chat módulo nisqaykupi


 @Module({ imports: [ RedisEventPubSubModule.registerEvents([ NewMessageEvent.publishableEventName, ]), ], controllers: [SendMessageAction], providers: [], }) export class UserChatModule {}


Rikusqaykihina, ñawpaq rimasqa ñanta llamk'achirqayku - registerEvents.

Chay ñanman gracias, RedisEventPubSubModule kaqman willarqayku NewMessageEvent ruwayniyku uyarinan tiyan Redis pub-sub fila kaqpi publishableEventName atributo kaqpi.


Chaymi, sichus mayqin NewMessageEvent ruwaypas ruwakun, chaymanta wakmanta kachasqa kanqa huk normal NewMessageEvent ruway hina, ichaqa publishableEventName laya urapi.


Chayqa allinmi, 1 instanciapi utaq 1.000 instanciakunapi llamkanqa. Chaymi huk hatun yupay instanciakunaman escalayku chaypas, sapankanku kayta chaskinqaku chaymanta kay ruwayta sistema ukhupi wakmanta kachanqa.


Chaymi, kunanqa atiyniyuqmi kanchik sucesokunata lluqsichinapaq hinaspa uyarinapaq. Kunanqa websocket clienteykuman chayachinayku tiyan.

Websocket Punku

Websocket Gateway nisqamanta qhawarisunchis


 export enum WebsocketEventSubscribeList { FETCH_EVENTS_MESSAGES = 'fetch-events-messages', EVENTS_MESSAGES_STREAM = 'events-messages-stream', } @WebSocketGateway({ pingInterval: 30000, pingTimeout: 5000, cors: { origin: '*', }, }) export class MessagesWebsocketGateway { constructor( @Inject(EVENT_SUBSCRIBER_TOKEN) private eventSubscriber: EventSubscriberInterface, ) {} @SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES) async streamMessagesData(@ConnectedSocket() client: any) { const stream$ = this.createWebsocketStreamFromEventFactory( client, this.eventSubscriber, NewMessageEvent.publishableEventName, ); const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM; return from(stream$).pipe(map((data) => ({ event, data }))); } private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


Chaymi huk imapas kan, ruwaqpi, EVENT_SUBSCRIBER_TOKEN nisqayuq kayku, mayqin layataq EventSubscriberInterface. Ichaqa, ¿imatam cheqaqtapuni ruwan? Kaynatam qawakun capuchapa uranpi


 export class EventEmitter2EventSubscriber implements EventSubscriberInterface { constructor(private eventEmitter: EventEmitter2) {} on(name: string, listener: any): void { this.eventEmitter.on(name, listener); } off(name: string, listener: any): void { this.eventEmitter.removeListener(name, listener); } }


EventEmitter2 kaqpaq huk p'istulla kachkan, chayta createWebsocketStreamFromEventFactory ruwaypi llamk'achkayku


 private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


Kay p'istuykusqa EventEmitter2 llamk'achkayku publishableName kaqpi uyariqkuna dinamico ruwanapaq mayk'aq websocket clientekuna tinkinku chaymanta t'inkinakuypi hurqunku.


Chaymanta, mana imatapas ruwachkaykuchu aswan rxjs mayu ruwaymanta websocket tinkiyta waqaychaypaq chaymanta uyariqmanta willakuykunata observer.next(message) kaqnintakama apachinapaq; musuq willakuy rikurimuptin.

¿Imaynatataq kay ruway uyariqninchikkunaman chayanqa?


Sichus ñawpaq fragmento codigo kaqman kutinki, Redis pub sub móduloykuman, chaymanta kayta registerEvents ruwaypi rikuyta atikunki

 for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); });


Mayqinchus básicamente uyarin eventokunata pub filapi, chaymanta wakmanta kachan chaymanta evento emisor kaqnintakama.


Chaymi, kaypi ruwasqanchikta pisillapi rimarisunchik

  • Kunankamapas sistemapi ruwasqaykuta llamk'achkayku imaynachus ñawpaq EventEmitter2 kaqnintakama, ichaqa sichus tinkisqa websocket clienteykuman lluqsichiyta munayku, chaymanta tukuy ruwanayku tiyan PublishableInterface ruway

  • Mana musuq Redis tinkiykuna sapa tinkisqa websocket cliente kaqpi ruwachkaykuchu

  • Sistemaykuta X instanciakunaman hatunyachiyta atiyku chaymanta kaqllataraq ruwanqa - sapa tinkisqa cliente huk copiata ruwaymanta websocket kaqnintakama chaskinqa, mayqin instanciamanpas tinkisqa kanqaku


Llamkana codigo chaymanta ejemplo kaypi tarikun: https://github.com/axotion/nestjs-events-websocket