A while ago, I found myself having to build a scalable system that could handle hundreds of simultaneous connections at a modest cost, with a reasonable, though not instant, response time.
My first thought? Move all create/edit/delete actions to a queue and notify users over WebSocket whether their actions succeeded or not.
Back then, though, I did not have much experience running WebSockets, so my first step was to investigate how they work with the help of tutorials, Stack Overflow, and other sources.
After some time, I got the gist of how it should work, started preparing some code, and spent a while playing with a load-testing tool to simulate heavy traffic.
Some questions and answers suggested calling subscribe on a Redis instance whenever a WebSocket client connects.
io.sockets.on('connection', function (sockets) {
  sockets.emit('message', { Hello: 'World!' });
  sub.subscribe('attack-map-production');

  sockets.on('disconnect', function () {
    sub.unsubscribe('attack-map-production');
  });
});
But this way we create a new connection to Redis for every client, so both the memory usage of our application and the size of the Redis connection pool keep growing. (By default, Redis accepts at most 10,000 client connections per instance; this is the maxclients setting.)
That was a big no for me, because I had to keep memory utilization to a minimum.
Fortunately, plenty of articles nowadays point out that you should not create a new Redis connection for each WebSocket client.
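To make the difference concrete, here is a minimal sketch that only counts connections. FakeRedisServer is a hypothetical in-memory stand-in that I made up for illustration, not the real redis client, and the figure of 1,000 sockets is arbitrary.

```typescript
type ChannelHandler = (message: string) => void;

interface FakeRedisConnection {
  subscribe(channel: string, handler: ChannelHandler): void;
}

// Hypothetical in-memory stand-in for a Redis server. It only counts open
// connections and routes messages; it is not the real `redis` client API.
class FakeRedisServer {
  openConnections = 0;
  private handlers: Record<string, ChannelHandler[]> = {};

  // Every call models one more connection held open against Redis.
  connect(): FakeRedisConnection {
    this.openConnections += 1;
    return {
      subscribe: (channel: string, handler: ChannelHandler): void => {
        const list = this.handlers[channel] ?? [];
        list.push(handler);
        this.handlers[channel] = list;
      },
    };
  }

  publish(channel: string, message: string): void {
    for (const handler of this.handlers[channel] ?? []) {
      handler(message);
    }
  }
}

const SOCKET_COUNT = 1000;
const DEMO_CHANNEL = 'attack-map-production';

// Anti-pattern: one Redis connection per connected WebSocket client.
const perClientServer = new FakeRedisServer();
for (let i = 0; i < SOCKET_COUNT; i++) {
  perClientServer.connect().subscribe(DEMO_CHANNEL, () => undefined);
}

// Shared approach: one subscriber connection per application instance,
// fanned out to the local sockets in process memory.
const sharedServer = new FakeRedisServer();
const localSockets: ChannelHandler[] = [];
let deliveredCount = 0;
for (let i = 0; i < SOCKET_COUNT; i++) {
  localSockets.push(() => {
    deliveredCount += 1;
  });
}
sharedServer.connect().subscribe(DEMO_CHANNEL, (message) => {
  for (const deliver of localSockets) {
    deliver(message);
  }
});
sharedServer.publish(DEMO_CHANNEL, '{"Hello":"World!"}');

console.log(perClientServer.openConnections); // 1000
console.log(sharedServer.openConnections, deliveredCount); // 1 1000
```

Both variants deliver the message to every socket, but the shared one holds a single Redis connection no matter how many clients are attached to the instance.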
After writing a big chunk of business code, I reached the part with WebSockets and a question came to mind: how do I build this in a proper and safe way?
I already had some events in the system. Some of them were ready to be additionally published over WebSockets, while the rest were meant to stay inside the system.
My golden rule was that I should not have to change the code drastically, and should still be able to send only selected events to WebSocket clients.
That is why I started by creating a Redis pub-sub module: for my events to be visible on other instances, I figured they had to travel through the Redis pub-sub pattern.
Don't feel overwhelmed by the module below; I will explain the details later with a use case.
import { DynamicModule, Module } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { createClient, RedisClientType } from 'redis';
// Project-local imports (ConfigurableModuleClass, MODULE_OPTIONS_TOKEN,
// RedisEventPubSubModuleOptions, EVENT_EMITTER_TOKEN, EVENT_SUBSCRIBER_TOKEN,
// RedisEventEmitter, EventEmitter2EventSubscriber, PublishableEventInterface,
// Writeable) are omitted here.

export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT';
export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT';
export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS =
  'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS';

@Module({
  providers: [
    {
      provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS,
      useFactory: (options: RedisEventPubSubModuleOptions) => options,
      inject: [MODULE_OPTIONS_TOKEN],
    },
    {
      // Single publisher connection shared by the whole instance.
      provide: REDIS_PUB_CLIENT,
      useFactory: async (options: RedisEventPubSubModuleOptions) => {
        const client = createClient({
          url: `redis://${options.host}:${options.port}`,
        });
        client.on('error', (err) => console.error('Redis Client Error', err));
        await client.connect();
        return client;
      },
      inject: [MODULE_OPTIONS_TOKEN],
    },
    {
      provide: EVENT_EMITTER_TOKEN,
      useFactory: (
        redisPubClient: RedisClientType,
        eventEmitter: EventEmitter2,
      ) => {
        return new RedisEventEmitter(redisPubClient, eventEmitter);
      },
      inject: [REDIS_PUB_CLIENT, EventEmitter2],
    },
    {
      provide: EVENT_SUBSCRIBER_TOKEN,
      useFactory: (eventEmitterSub: EventEmitter2) => {
        return new EventEmitter2EventSubscriber(eventEmitterSub);
      },
      inject: [EventEmitter2],
    },
  ],
  exports: [
    REDIS_PUB_CLIENT,
    EVENT_EMITTER_TOKEN,
    EVENT_SUBSCRIBER_TOKEN,
    REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS,
  ],
})
export class RedisEventPubSubModule extends ConfigurableModuleClass {
  static registerEvents(eventsPublishableNames: string[]): DynamicModule {
    return {
      module: class {},
      providers: [
        {
          // Single subscriber connection; one subscription per registered event.
          provide: REDIS_SUB_CLIENT,
          useFactory: async (
            options: RedisEventPubSubModuleOptions,
            eventEmitter: EventEmitter2,
          ) => {
            const client = createClient({
              url: `redis://${options.host}:${options.port}`,
            });
            client.on('error', (err) =>
              console.error('Redis Client Error', err),
            );
            await client.connect();

            for (const eventPublishableName of eventsPublishableNames) {
              await client.subscribe(eventPublishableName, (message) => {
                const normalizedMessage = JSON.parse(
                  message,
                ) as PublishableEventInterface;

                // Drop the routing attribute before re-emitting locally.
                delete (
                  normalizedMessage as Writeable<PublishableEventInterface>
                ).publishableEventName;

                eventEmitter.emit(eventPublishableName, normalizedMessage);
              });
            }

            return client;
          },
          inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2],
        },
      ],
    };
  }
}
This module takes care of creating and exposing a Redis pub client, and it exposes one additional method, registerEvents, which listens for the given events on Redis pub-sub and re-emits them through the local event emitter.
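Stripped of the NestJS wiring, the re-emit step can be sketched as follows. TinyEmitter and TinyChannelBus are hypothetical in-memory stand-ins for EventEmitter2 and the Redis subscriber connection, and bridgeChannels is a name I invented for this illustration; none of them are part of the module above.

```typescript
type LocalListener = (payload: Record<string, unknown>) => void;
type RawHandler = (raw: string) => void;

// Hypothetical minimal stand-in for EventEmitter2 (only on/emit).
class TinyEmitter {
  private listeners: Record<string, LocalListener[]> = {};

  on(eventName: string, listener: LocalListener): void {
    const list = this.listeners[eventName] ?? [];
    list.push(listener);
    this.listeners[eventName] = list;
  }

  emit(eventName: string, payload: Record<string, unknown>): void {
    for (const listener of this.listeners[eventName] ?? []) {
      listener(payload);
    }
  }
}

// Hypothetical minimal stand-in for a Redis subscriber connection.
class TinyChannelBus {
  private handlers: Record<string, RawHandler[]> = {};

  subscribe(channel: string, handler: RawHandler): void {
    const list = this.handlers[channel] ?? [];
    list.push(handler);
    this.handlers[channel] = list;
  }

  publish(channel: string, raw: string): void {
    for (const handler of this.handlers[channel] ?? []) {
      handler(raw);
    }
  }
}

// The bridge: subscribe once per channel, parse the JSON body, drop the
// routing attribute, and re-emit locally under the channel name.
function bridgeChannels(
  bus: TinyChannelBus,
  emitter: TinyEmitter,
  channels: string[],
): void {
  for (const channel of channels) {
    bus.subscribe(channel, (raw) => {
      const payload = JSON.parse(raw) as Record<string, unknown>;
      delete payload['publishableEventName'];
      emitter.emit(channel, payload);
    });
  }
}

const bridgeBus = new TinyChannelBus();
const bridgeEmitter = new TinyEmitter();
const receivedPayloads: Record<string, unknown>[] = [];

bridgeChannels(bridgeBus, bridgeEmitter, ['events:new-message']);
bridgeEmitter.on('events:new-message', (payload) => {
  receivedPayloads.push(payload);
});

bridgeBus.publish(
  'events:new-message',
  JSON.stringify({
    publishableEventName: 'events:new-message',
    message: 'hello',
  }),
);

console.log(receivedPayloads); // [ { message: 'hello' } ]
```

Local listeners never talk to Redis directly; they only see an ordinary in-process event.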
This may still be a little foggy. Why re-emit events? Why do we need to register for those events? What are EVENT_EMITTER_TOKEN and EVENT_SUBSCRIBER_TOKEN, and why do we have to export them?
It will become clearer with real-life usage, so let's build a use case: chat messages. We want to send messages via HTTP POST and receive them over WebSocket on the front end.
Let's begin.
Here is a module for that:
@Module({
  imports: [],
  controllers: [],
  providers: [],
})
export class UserChatModule {}
And here is the event this module will emit after receiving a POST request:
export class NewMessageEvent {
  constructor(public readonly message: string) {}
}
In the controller, we have to emit events both inside our own system and to the Redis pub queue. We will use a wrapped EventEmitter2 for that:
export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN';

export class RedisEventEmitter implements EventEmitterInterface {
  constructor(
    private redisPubClient: RedisClientType,
    private eventEmitter: EventEmitter2,
  ) {}

  async emit(eventName: string, payload: Record<any, any>): Promise<void> {
    // Always emit locally, exactly as before.
    this.eventEmitter.emit(eventName, payload);

    // Additionally publish to Redis only when the event opts in.
    if (this.isPublishableEvent(payload)) {
      await this.redisPubClient.publish(
        payload.publishableEventName,
        JSON.stringify(payload),
      );
    }
  }

  private isPublishableEvent(event: any): event is PublishableEventInterface {
    return event.publishableEventName !== undefined;
  }
}
Then we can use it in our controller:
import { Body, Controller, Inject, Post } from '@nestjs/common';

@Controller('messages')
export class SendMessageAction {
  constructor(
    // Previously eventEmitter2
    @Inject(EVENT_EMITTER_TOKEN)
    private readonly eventEmitter: EventEmitterInterface,
  ) {}

  @Post()
  async handle(@Body() request: SendMessageHttpRequest) {
    await this.eventEmitter.emit(
      NewMessageEvent.name,
      new NewMessageEvent(request.content),
    );
  }
}
Before that works, though, we have to enhance our event with PublishableEventInterface so that RedisEventEmitter can recognize it and publish it to the Redis pub queue.
export class NewMessageEvent implements PublishableEventInterface {
  static publishableEventName = 'events:new-message';

  publishableEventName = NewMessageEvent.publishableEventName;

  constructor(public readonly message: string) {}
}
Great. We are still sending our events the way we used to, but now any event marked as publishable also lands in the Redis pub queue.
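The opt-in check is worth a quick illustration. The sketch below uses hypothetical names (PublishableEventSketch, InternalOnlyEvent, ChatMessageEvent, emitSketch) and records what would be published instead of calling Redis, so it only demonstrates the type guard and the serialization, under those assumptions.

```typescript
// Hypothetical mirror of the marker interface used by the events above.
interface PublishableEventSketch {
  readonly publishableEventName: string;
}

// Type guard: an event is publishable only if it carries a channel name.
function isPublishableSketch(
  candidate: unknown,
): candidate is PublishableEventSketch {
  return (
    typeof candidate === 'object' &&
    candidate !== null &&
    typeof (candidate as { publishableEventName?: unknown })
      .publishableEventName === 'string'
  );
}

// Stays inside the system: no publishableEventName attribute.
class InternalOnlyEvent {
  constructor(public readonly userId: number) {}
}

// Opts in to publishing by exposing the channel name on the instance.
class ChatMessageEvent implements PublishableEventSketch {
  static publishableEventName = 'events:new-message';

  publishableEventName = ChatMessageEvent.publishableEventName;

  constructor(public readonly message: string) {}
}

// Records what would be sent to Redis instead of calling a real client.
const publishedToRedis: { channel: string; body: string }[] = [];

function emitSketch(payload: object): void {
  // (The local emit would happen here for every event.)
  if (isPublishableSketch(payload)) {
    publishedToRedis.push({
      channel: payload.publishableEventName,
      body: JSON.stringify(payload),
    });
  }
}

emitSketch(new InternalOnlyEvent(7));
emitSketch(new ChatMessageEvent('hi'));

console.log(publishedToRedis.length); // 1
```

Because publishableEventName is an instance property, it is included in the JSON body, which is why the subscriber side deletes it again before re-emitting.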
Now we need to be able to receive those events on the WebSocket side, right?
So let's take another look at our user chat module:
@Module({
  imports: [
    RedisEventPubSubModule.registerEvents([
      NewMessageEvent.publishableEventName,
    ]),
  ],
  controllers: [SendMessageAction],
  providers: [],
})
export class UserChatModule {}
As you can see, we used the method mentioned earlier: registerEvents.
With it, we told RedisEventPubSubModule to listen on the Redis pub-sub queue for our NewMessageEvent, using the value of its publishableEventName attribute as the channel.
So whenever a NewMessageEvent is published, it is re-emitted as a normal local event, but under the publishableEventName value ('events:new-message') rather than the class name.
This works whether we run 1 instance or 1,000. Even if we scale to a large number of instances, each of them receives the event and re-emits it inside its own process.
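Here is a rough simulation of that fan-out with three instances. SharedChannelSketch and AppInstanceSketch are hypothetical in-memory stand-ins (a real deployment would have separate processes talking to Redis), so treat it as an illustration of the flow rather than of the actual module.

```typescript
type InstanceHandler = (raw: string) => void;

// Hypothetical stand-in for one Redis pub-sub channel shared by all instances.
class SharedChannelSketch {
  private subscribers: InstanceHandler[] = [];

  subscribe(handler: InstanceHandler): void {
    this.subscribers.push(handler);
  }

  publish(raw: string): void {
    for (const handler of this.subscribers) {
      handler(raw);
    }
  }
}

// Hypothetical stand-in for one running application instance.
class AppInstanceSketch {
  // Events re-emitted under the publishable name (what WebSocket clients get).
  readonly websocketDeliveries: string[] = [];
  // Events emitted locally under the class name (internal listeners only).
  readonly internalDeliveries: string[] = [];

  constructor(
    readonly label: string,
    private readonly channel: SharedChannelSketch,
  ) {
    // Counterpart of registerEvents: one subscription per instance.
    channel.subscribe((raw) => {
      this.websocketDeliveries.push(raw);
    });
  }

  // Counterpart of RedisEventEmitter.emit for a publishable event.
  emitNewMessage(text: string): void {
    this.internalDeliveries.push(text);
    this.channel.publish(text);
  }
}

const sharedChannel = new SharedChannelSketch();
const instanceA = new AppInstanceSketch('a', sharedChannel);
const instanceB = new AppInstanceSketch('b', sharedChannel);
const instanceC = new AppInstanceSketch('c', sharedChannel);

// The HTTP POST happens to land on instance A.
instanceA.emitNewMessage('hello');

for (const instance of [instanceA, instanceB, instanceC]) {
  console.log(
    instance.label,
    instance.websocketDeliveries.length,
    instance.internalDeliveries.length,
  );
}
// a 1 1
// b 1 0
// c 1 0
```

Every instance, including the one that handled the request, gets exactly one copy for its WebSocket clients, while the internal emit stays on the originating instance.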
So now we are able to emit events and listen for them. What remains is delivering them to our WebSocket clients.
Let's take a look at the WebSocket gateway:
import { Inject } from '@nestjs/common';
import {
  ConnectedSocket,
  SubscribeMessage,
  WebSocketGateway,
} from '@nestjs/websockets';
import { from, Observable } from 'rxjs';
import { map } from 'rxjs/operators';

export enum WebsocketEventSubscribeList {
  FETCH_EVENTS_MESSAGES = 'fetch-events-messages',
  EVENTS_MESSAGES_STREAM = 'events-messages-stream',
}

@WebSocketGateway({
  pingInterval: 30000,
  pingTimeout: 5000,
  cors: {
    origin: '*',
  },
})
export class MessagesWebsocketGateway {
  constructor(
    @Inject(EVENT_SUBSCRIBER_TOKEN)
    private eventSubscriber: EventSubscriberInterface,
  ) {}

  @SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES)
  async streamMessagesData(@ConnectedSocket() client: any) {
    const stream$ = this.createWebsocketStreamFromEventFactory(
      client,
      this.eventSubscriber,
      NewMessageEvent.publishableEventName,
    );

    const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM;
    return from(stream$).pipe(map((data) => ({ event, data })));
  }

  private createWebsocketStreamFromEventFactory(
    client: any,
    eventSubscriber: EventSubscriberInterface,
    eventName: string,
  ): Observable<any> {
    return new Observable((observer) => {
      // One listener per connected client, forwarding each event to the stream.
      const dynamicListener = (message: PublishableEventInterface) => {
        observer.next(message);
      };

      eventSubscriber.on(eventName, dynamicListener);

      // Remove the listener when the client goes away.
      client.on('disconnect', () => {
        eventSubscriber.off(eventName, dynamicListener);
      });
    });
  }
}
There is one thing to note: in the constructor we inject EVENT_SUBSCRIBER_TOKEN, whose type is EventSubscriberInterface. But what does it really do? This is how it looks under the hood:
export class EventEmitter2EventSubscriber implements EventSubscriberInterface {
  constructor(private eventEmitter: EventEmitter2) {}

  on(name: string, listener: any): void {
    this.eventEmitter.on(name, listener);
  }

  off(name: string, listener: any): void {
    this.eventEmitter.removeListener(name, listener);
  }
}
It is just a wrapper around EventEmitter2, which we use in the createWebsocketStreamFromEventFactory method:
private createWebsocketStreamFromEventFactory(
  client: any,
  eventSubscriber: EventSubscriberInterface,
  eventName: string,
): Observable<any> {
  return new Observable((observer) => {
    const dynamicListener = (message: PublishableEventInterface) => {
      observer.next(message);
    };

    eventSubscriber.on(eventName, dynamicListener);

    client.on('disconnect', () => {
      eventSubscriber.off(eventName, dynamicListener);
    });
  });
}
We use this wrapped EventEmitter2 to create dynamic listeners on publishableEventName when WebSocket clients connect, and to remove them on disconnect.
Beyond that, we do nothing more than create an RxJS stream that keeps the WebSocket connection open and forwards messages from the listener via observer.next(message) whenever a new message arrives.
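The listener lifecycle is the part that is easy to get wrong, so here is a small sketch of it without RxJS or a real socket. CountingSubscriber, FakeSocket, and attachSocket are hypothetical names introduced only for this illustration.

```typescript
type StreamListener = (message: string) => void;

// Hypothetical stand-in for the EventEmitter2 wrapper, with a listener count.
class CountingSubscriber {
  private listeners: Record<string, StreamListener[]> = {};

  on(eventName: string, listener: StreamListener): void {
    const list = this.listeners[eventName] ?? [];
    list.push(listener);
    this.listeners[eventName] = list;
  }

  off(eventName: string, listener: StreamListener): void {
    this.listeners[eventName] = (this.listeners[eventName] ?? []).filter(
      (registered) => registered !== listener,
    );
  }

  emit(eventName: string, message: string): void {
    for (const listener of this.listeners[eventName] ?? []) {
      listener(message);
    }
  }

  listenerCount(eventName: string): number {
    return (this.listeners[eventName] ?? []).length;
  }
}

// Hypothetical stand-in for a connected WebSocket client.
class FakeSocket {
  readonly received: string[] = [];
  private disconnectHandlers: (() => void)[] = [];

  onDisconnect(handler: () => void): void {
    this.disconnectHandlers.push(handler);
  }

  disconnect(): void {
    for (const handler of this.disconnectHandlers) {
      handler();
    }
  }
}

// Register one listener per socket and remove it when the socket disconnects.
function attachSocket(
  socket: FakeSocket,
  subscriber: CountingSubscriber,
  eventName: string,
): void {
  const dynamicListener: StreamListener = (message) => {
    socket.received.push(message);
  };
  subscriber.on(eventName, dynamicListener);
  socket.onDisconnect(() => subscriber.off(eventName, dynamicListener));
}

const lifecycleSubscriber = new CountingSubscriber();
const socketA = new FakeSocket();
const socketB = new FakeSocket();

attachSocket(socketA, lifecycleSubscriber, 'events:new-message');
attachSocket(socketB, lifecycleSubscriber, 'events:new-message');

lifecycleSubscriber.emit('events:new-message', 'first');
socketA.disconnect();
lifecycleSubscriber.emit('events:new-message', 'second');

console.log(socketA.received); // [ 'first' ]
console.log(socketB.received); // [ 'first', 'second' ]
console.log(lifecycleSubscriber.listenerCount('events:new-message')); // 1
```

If the listener were not removed on disconnect, the count would keep growing with every client that ever connected, which is exactly the kind of memory growth we set out to avoid.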
How does the event reach our listeners?
If you go back to the first code snippet, our Redis pub-sub module, you can spot this in the registerEvents method:
for (const eventPublishableName of eventsPublishableNames) {
  await client.subscribe(eventPublishableName, (message) => {
    const normalizedMessage = JSON.parse(
      message,
    ) as PublishableEventInterface;

    delete (
      normalizedMessage as Writeable<PublishableEventInterface>
    ).publishableEventName;

    eventEmitter.emit(eventPublishableName, normalizedMessage);
  });
}
This basically listens for events on the pub queue and then re-emits them through the event emitter.
So, let's sum up what we did here:
We still use events inside the system the way we used to, through EventEmitter2, but if we want to publish an event to our connected WebSocket clients, all we have to do is implement PublishableEventInterface.
We do not create a new Redis connection for each connected WebSocket client.
We can scale our system up to X instances and it will still behave the same way: each connected client gets a copy of the event over WebSocket, no matter which instance it is connected to.
Working code and an example are available here: https://github.com/axotion/nestjs-events-websocket