paint-brush
Како сам решио проблем скалирања ВебСоцкет-а без нарушавања банкеод стране@axotion
13,322 читања
13,322 читања

Како сам решио проблем скалирања ВебСоцкет-а без нарушавања банке

од стране Kamil Fronczak11m2025/01/19
Read on Terminal Reader

Предуго; Читати

Пре много времена, морао сам да направим скалабилан систем који би могао да рукује стотинама истовремених веза по не баш високој цени, и са разумним, али не тренутним временом одговора. Хтео сам да пошаљем само одабране догађаје вебсоцкет клијентима. Направио сам Редис пуб-суб модул, јер сам мислио да моји догађаји, да би били видљиви у другим инстанцама, морају бити пренети преко Редис Пуб-суб шаблона.
featured image - Како сам решио проблем скалирања ВебСоцкет-а без нарушавања банке
Kamil Fronczak HackerNoon profile picture

Давно сам се нашао у ситуацији да сам морао да направим скалабилан систем који би могао да рукује стотинама истовремених веза по не баш високој цени, и са разумним, али не и тренутним временом одговора.


Моје прве мисли? Хајде да преместимо све радње креирања/уређивања/брисања у ред чекања и обавестимо кориснике да ли су њихове акције успеле или не преко ВебСоцкет-а.


Али тада нисам имао много искуства са ВебСоцкетс-има у производњи, па је мој први корак био да истражим како то функционише уз помоћ туторијала, преливања стека и других извора.


Дакле, након неког времена, схватио сам како би требало да функционише и почео да припремам код и да се петљам неко време са алатом за тестирање оптерећења да симулирам велики промет.

Први проблем

Нека питања и одговори су предложили позивање методе претплате на Редис инстанци на повезаном ВебСоцкет клијенту.


 io.sockets.on('connection', function (sockets) { sockets.emit('message',{Hello: 'World!'}); sub.subscribe('attack-map-production'); sockets.on('disconnect', function() { sub.unsubscribe('attack-map-production'); }); });


Али на овај начин стварамо нову везу са Редис-ом, тако да употреба меморије у нашој апликацији и Редис-овом скупу веза расте. (Редис дозвољава само 10к веза на једну инстанцу)


То је било велико не за мене јер сам морао да смањим употребу меморије на минимум.


Тренутно се у многим чланцима, срећом, помиње да не би требало да креирате нову Редис везу на сваком ВебСоцкет клијенту.

Други проблем

Након креирања великог дела пословног кода, када сам почео део са веб утичницама, појавило ми се питање - како да их креирам на правилан и безбедан начин?


Већ сам имао неке догађаје у систему, а неки од њих су били спремни да буду додатно објављени преко ВебСоцкетс-а, остали су требали да остану унутар система.


Моје златно обећање је било да нећу морати драстично да мењам код и да и даље могу да шаљем само одабране догађаје вебсоцкет клијентима.


Зато сам у почетку направио Редис пуб-суб модул, јер сам мислио да моји догађаји, да би били видљиви у другим инстанцама, морају да се преносе преко Редис пуб-суб шаблона.


Немојте се осећати преоптерећено гледањем модула у наставку, јер ћу касније објаснити детаље у случају употребе


 export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT'; export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT'; export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS = 'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS'; @Module({ providers: [ { provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, useFactory: (options: RedisEventPubSubModuleOptions) => options, inject: [MODULE_OPTIONS_TOKEN], }, { provide: REDIS_PUB_CLIENT, useFactory: async (options: RedisEventPubSubModuleOptions) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err)); await client.connect(); return client; }, inject: [MODULE_OPTIONS_TOKEN], }, { provide: EVENT_EMITTER_TOKEN, useFactory: ( redisPubClient: RedisClientType, eventEmitter: EventEmitter2, ) => { return new RedisEventEmitter(redisPubClient, eventEmitter); }, inject: [REDIS_PUB_CLIENT, EventEmitter2], }, { provide: EVENT_SUBSCRIBER_TOKEN, useFactory: (eventEmitterSub: EventEmitter2) => { return new EventEmitter2EventSubscriber(eventEmitterSub); }, inject: [EventEmitter2], }, ], exports: [ REDIS_PUB_CLIENT, EVENT_EMITTER_TOKEN, EVENT_SUBSCRIBER_TOKEN, REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, ], }) export class RedisEventPubSubModule extends ConfigurableModuleClass { static registerEvents(eventsPublishableNames: string[]): DynamicModule { return { module: class {}, providers: [ { provide: REDIS_SUB_CLIENT, useFactory: async ( options: RedisEventPubSubModuleOptions, eventEmitter: EventEmitter2, ) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err), ); await client.connect(); for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); }); } return client; }, inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2], }, ], }; } }


Овај модул брине о креирању/излагању Пуб Редис клијента и откривању додатног метода - регистерЕвентс, који је одговоран за слушање задатих догађаја на Редис пуб-суб и њихово поновно емитовање преко емитера догађаја.


За сада може бити мало магле. Зашто поново емитовати догађаје? Зашто морамо да се региструјемо за те догађаје? Шта су EVENT_EMITTER_TOKEN и EVENT_SUBSCRIBER_TOKEN и зашто морамо да их извозимо?


Биће јасније са употребом у стварном животу, па хајде да направимо случај употребе - поруке за ћаскање. Желимо да можемо да шаљемо поруке преко ХТТП ПОСТ-а и да их примамо преко ВебСоцкет-а на предњем крају.


Хајде да почнемо

Објављивање догађаја

Ево модула за то

 @Module({ imports: [], controllers: [], providers: [], }) export class UserChatModule {}


И догађај који ће овај модул емитовати након што прими ПОСТ захтев


 export class NewMessageEvent { constructor(public readonly message: string) {} }


У контролеру морамо да омогућимо емитовање догађаја и за наш систем и за Редис пуб ред. Користићемо умотане

ЕвентЕмиттер2 за то


 export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN'; export class RedisEventEmitter implements EventEmitterInterface { constructor( private redisPubClient: RedisClientType, private eventEmitter: EventEmitter2, ) {} async emit(eventName: string, payload: Record<any, any>): Promise<void> { this.eventEmitter.emit(eventName, payload); if (this.isPublishableEvent(payload)) { await this.redisPubClient.publish( payload.publishableEventName, JSON.stringify(payload), ); } } private isPublishableEvent(event: any): event is PublishableEventInterface { return event.publishableEventName !== undefined; } }


А онда можемо да га користимо у нашем контролеру


 @Controller('messages') export class SendMessageAction { constructor( // Previously eventEmitter2 @Inject(EVENT_EMITTER_TOKEN) private readonly eventEmitter: EventEmitterInterface, ) {} @Post() async handle(@Body() request: SendMessageHttpRequest) { await this.eventEmitter.emit( NewMessageEvent.name, new NewMessageEvent(request.content), ); } }


Али пре тога, морамо да побољшамо наш догађај помоћу ПублисхаблеЕвентИнтерфаце-а како бисмо омогућили РедисЕвентЕмиттер-у да ухвати наш догађај и емитује га у Редис пуб реду.


 export class NewMessageEvent implements PublishableEventInterface { static publishableEventName = 'events:new-message'; publishableEventName = NewMessageEvent.publishableEventName; constructor(public readonly message: string) {} }


Одлично, сада шаљемо наше догађаје као некада, али сада, ако буду означени као објављени, они ће слетети у Редис пуб ред.

Али сада, морамо да омогућимо примање тих догађаја на ВебСоцкет-у, зар не?

Пријем догађаја

Дакле, хајде да погледамо наш модул за корисничко ћаскање


 @Module({ imports: [ RedisEventPubSubModule.registerEvents([ NewMessageEvent.publishableEventName, ]), ], controllers: [SendMessageAction], providers: [], }) export class UserChatModule {}


Као што видите, користили смо раније поменути метод - регистерЕвентс.

Захваљујући тој методи, рекли смо РедисЕвентПубСубМодуле-у да треба да слуша наш догађај НевМессагеЕвент у реду реду пуб-суб Редис на атрибуту публисхаблеЕвентНаме.


Дакле, ако се догоди било који догађај НевМессагеЕвент, он ће бити поново емитован као нормалан догађај НевМессагеЕвент, али под атрибутом публисхаблеЕвентНаме.


Вреди напоменути да ће радити на 1 инстанци или на 1.000 инстанци. Дакле, чак и ако скалирамо на велики број инстанци, свака од њих ће примити ово и поново емитовати овај догађај унутар система.


Дакле, сада имамо способности да емитујемо догађаје и слушамо их. Сада их морамо испоручити нашим вебсоцкет клијентима.

Вебсоцкет Гатеваи

Хајде да погледамо Вебсоцкет Гатеваи


 export enum WebsocketEventSubscribeList { FETCH_EVENTS_MESSAGES = 'fetch-events-messages', EVENTS_MESSAGES_STREAM = 'events-messages-stream', } @WebSocketGateway({ pingInterval: 30000, pingTimeout: 5000, cors: { origin: '*', }, }) export class MessagesWebsocketGateway { constructor( @Inject(EVENT_SUBSCRIBER_TOKEN) private eventSubscriber: EventSubscriberInterface, ) {} @SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES) async streamMessagesData(@ConnectedSocket() client: any) { const stream$ = this.createWebsocketStreamFromEventFactory( client, this.eventSubscriber, NewMessageEvent.publishableEventName, ); const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM; return from(stream$).pipe(map((data) => ({ event, data }))); } private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


Дакле, постоји ствар, у конструктору, имамо ЕВЕНТ_СУБСЦРИБЕР_ТОКЕН, чији је тип ЕвентСубсцриберИнтерфаце. Али шта то заиста ради? Овако изгледа испод хаубе


 export class EventEmitter2EventSubscriber implements EventSubscriberInterface { constructor(private eventEmitter: EventEmitter2) {} on(name: string, listener: any): void { this.eventEmitter.on(name, listener); } off(name: string, listener: any): void { this.eventEmitter.removeListener(name, listener); } }


То је само омот за ЕвентЕмиттер2, који користимо у методи цреатеВебсоцкетСтреамФромЕвентФацтори


 private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


Користимо овај омотани ЕвентЕмиттер2 да креирамо динамичке слушаоце на публисхаблеНаме када се вебсоцкет клијенти повежу и уклоне након прекида везе.


Затим, не радимо ништа више од креирања ркјс стрима да бисмо задржали везу са веб соцкетом и послали поруке од слушаоца преко обсервер.нект(мессаге); када се појави нова порука.

Како ће овај догађај доћи до наших слушалаца?


Ако се вратите на први исечак кода, наш Редис пуб подмодул, онда ово можете уочити у методи регистерЕвентс

 for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); });


Који у суштини ослушкује догађаје у пуб реду, а затим их поново емитује преко емитера догађаја.


Дакле, хајде да сумирамо шта смо овде урадили

  • Још увек користимо наше догађаје у систему као некада преко ЕвентЕмиттер2, али ако желимо да објавимо на нашим повезаним вебсоцкет клијентима, све што треба да урадимо је да имплементирамо ПублисхаблеИнтерфаце

  • Не креирамо нове Редис везе на сваком повезаном вебсоцкет клијенту

  • Можемо повећати наш систем на Кс инстанце и он ће се и даље понашати на исти начин - сваки повезани клијент ће добити копију догађаја преко вебсоцкета, без обзира на коју инстанцу ће бити повезан


Радни код и пример су доступни овде: хттпс: //гитхуб.цом/акотион/нестјс-евентс-вебсоцкет

L O A D I N G
. . . comments & more!

About Author

Kamil Fronczak HackerNoon profile picture
Kamil Fronczak@axotion
I’m a 2X-year-old tech dude from Poland, and this is my blog about tech stuff: NestJS, Node

ХАНГ ТАГС

ОВАЈ ЧЛАНАК ЈЕ ПРЕДСТАВЉЕН У...