paint-brush
Як я вырашыў праблему маштабавання WebSocket, не патраціўшы грошыпа@axotion
13,322 чытанні
13,322 чытанні

Як я вырашыў праблему маштабавання WebSocket, не патраціўшы грошы

па Kamil Fronczak11m2025/01/19
Read on Terminal Reader

Занадта доўга; Чытаць

Даўным-даўно мне прыйшлося стварыць маштабаваную сістэму, якая магла б апрацоўваць сотні адначасовых злучэнняў па не вельмі высокай цане і з разумным, але не імгненным часам адказу. Я хацеў адправіць кліентам websocket толькі выбраныя падзеі. Я стварыў модуль Redis pub-sub, бо думаў, што мае падзеі, каб быць бачнымі ў іншых выпадках, павінны перадавацца праз шаблон Redis Pub-sub.
featured image - Як я вырашыў праблему маштабавання WebSocket, не патраціўшы грошы
Kamil Fronczak HackerNoon profile picture

Даўным-даўно я апынуўся ў сітуацыі, калі мне трэба было стварыць маштабаваную сістэму, якая магла б апрацоўваць сотні адначасовых злучэнняў па невялікай цане і з разумным, але не імгненным часам адказу.


Мае першыя думкі? Давайце перамесцім усе дзеянні стварэння/рэдагавання/выдалення ў чаргу і паведамім карыстальнікам, калі іх дзеянні ўдаліся або не праз WebSocket.


Але тады ў мяне не было вялікага досведу працы з WebSockets у вытворчасці, таму маім першым крокам было даследаванне таго, як гэта працуе з дапамогай падручнікаў, перапаўнення стэка і іншых крыніц.


Такім чынам, праз некаторы час я зразумеў, як гэта павінна працаваць, і пачаў рыхтаваць код і некаторы час важдацца з інструментам нагрузачных тэстаў для мадэлявання высокага трафіку.

Першая праблема

У некаторых пытаннях і адказах прапаноўвалася выклікаць метад падпіскі ў асобніку Redis на падключаным кліенце WebSocket.


 io.sockets.on('connection', function (sockets) { sockets.emit('message',{Hello: 'World!'}); sub.subscribe('attack-map-production'); sockets.on('disconnect', function() { sub.unsubscribe('attack-map-production'); }); });


Але такім чынам мы ствараем новае злучэнне з Redis, таму выкарыстанне памяці ў нашым дадатку і пуле злучэнняў Redis расце. (Redis дазваляе толькі 10 тысяч падключэнняў да аднаго асобніка)


Для мяне гэта было вялікім "не", таму што я павінен быў знізіць выкарыстанне памяці да мінімуму.


Зараз у многіх артыкулах, на шчасце, згадваецца, што не варта ствараць новае злучэнне Redis для кожнага кліента WebSocket.

Другая праблема

Пасля стварэння вялікага кавалка бізнес-кода, калі я пачаў працу з вэб-сокетамі, у мяне ўзнікла пытанне - як стварыць іх правільным і бяспечным спосабам?


У мяне ўжо былі некаторыя падзеі ў сістэме, і частка з іх была гатовая да дадатковай публікацыі праз WebSockets, астатнія павінны былі заставацца ў сістэме.


Маё залатое абяцанне заключалася ў тым, што мне не давядзецца кардынальна змяняць код і пры гэтым я змагу адпраўляць толькі выбраныя падзеі кліентам websocket.


Вось чаму спачатку я стварыў модуль Redis pub-sub, бо думаў, што мае падзеі, каб быць бачнымі ў іншых выпадках, павінны перадавацца праз шаблон Redis pub-sub.


Не адчувайце сябе прыгнечаным, гледзячы на модуль ніжэй, бо я растлумачу падрабязнасці пазней у выпадку выкарыстання


 export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT'; export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT'; export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS = 'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS'; @Module({ providers: [ { provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, useFactory: (options: RedisEventPubSubModuleOptions) => options, inject: [MODULE_OPTIONS_TOKEN], }, { provide: REDIS_PUB_CLIENT, useFactory: async (options: RedisEventPubSubModuleOptions) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err)); await client.connect(); return client; }, inject: [MODULE_OPTIONS_TOKEN], }, { provide: EVENT_EMITTER_TOKEN, useFactory: ( redisPubClient: RedisClientType, eventEmitter: EventEmitter2, ) => { return new RedisEventEmitter(redisPubClient, eventEmitter); }, inject: [REDIS_PUB_CLIENT, EventEmitter2], }, { provide: EVENT_SUBSCRIBER_TOKEN, useFactory: (eventEmitterSub: EventEmitter2) => { return new EventEmitter2EventSubscriber(eventEmitterSub); }, inject: [EventEmitter2], }, ], exports: [ REDIS_PUB_CLIENT, EVENT_EMITTER_TOKEN, EVENT_SUBSCRIBER_TOKEN, REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, ], }) export class RedisEventPubSubModule extends ConfigurableModuleClass { static registerEvents(eventsPublishableNames: string[]): DynamicModule { return { module: class {}, providers: [ { provide: REDIS_SUB_CLIENT, useFactory: async ( options: RedisEventPubSubModuleOptions, eventEmitter: EventEmitter2, ) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err), ); await client.connect(); for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); }); } return client; }, inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2], }, ], }; } }


Гэты модуль клапоціцца аб стварэнні/адкрыцці кліента Pub Redis і адкрыцці дадатковага метаду - registerEvents, які адказвае за праслухоўванне дадзеных падзей на Redis pub-sub і іх паўторнае выпраменьванне праз эмітэр падзей.


Пакуль можа быць невялікі туман. Навошта перадаваць падзеі? Навошта нам рэгістравацца на гэтыя мерапрыемствы? Што такое EVENT_EMITTER_TOKEN і EVENT_SUBSCRIBER_TOKEN і чаму мы павінны іх экспартаваць?


Гэта будзе больш зразумела пры выкарыстанні ў рэальным жыцці, таму давайце створым варыянт выкарыстання - паведамленні ў чаце. Мы хочам мець магчымасць адпраўляць паведамленні праз HTTP POST і атрымліваць іх праз WebSocket на інтэрфейсе.


Давайце пачнем

Выдавецкія мерапрыемствы

Вось модуль для гэтага

 @Module({ imports: [], controllers: [], providers: [], }) export class UserChatModule {}


І падзея, якую гэты модуль будзе выдаваць пасля атрымання запыту POST


 export class NewMessageEvent { constructor(public readonly message: string) {} }


У кантролеры мы павінны зрабіць магчымым выдаваць падзеі як для нашай сістэмы, так і для чаргі Redis. Мы будзем выкарыстоўваць загорнутыя

EventEmitter2 для гэтага


 export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN'; export class RedisEventEmitter implements EventEmitterInterface { constructor( private redisPubClient: RedisClientType, private eventEmitter: EventEmitter2, ) {} async emit(eventName: string, payload: Record<any, any>): Promise<void> { this.eventEmitter.emit(eventName, payload); if (this.isPublishableEvent(payload)) { await this.redisPubClient.publish( payload.publishableEventName, JSON.stringify(payload), ); } } private isPublishableEvent(event: any): event is PublishableEventInterface { return event.publishableEventName !== undefined; } }


І тады мы можам выкарыстоўваць яго ў нашым кантролеры


 @Controller('messages') export class SendMessageAction { constructor( // Previously eventEmitter2 @Inject(EVENT_EMITTER_TOKEN) private readonly eventEmitter: EventEmitterInterface, ) {} @Post() async handle(@Body() request: SendMessageHttpRequest) { await this.eventEmitter.emit( NewMessageEvent.name, new NewMessageEvent(request.content), ); } }


Але перад гэтым мы павінны палепшыць нашу падзею з дапамогай PublishableEventInterface, каб дазволіць RedisEventEmitter лавіць нашу падзею і выдаваць яе ў чаргу паба Redis.


 export class NewMessageEvent implements PublishableEventInterface { static publishableEventName = 'events:new-message'; publishableEventName = NewMessageEvent.publishableEventName; constructor(public readonly message: string) {} }


Выдатна, цяпер мы адпраўляем нашы падзеі, як і раней, але цяпер, калі яны пазначаны як даступныя для публікацыі, яны будуць трапляць у чаргу паба Redis.

Але цяпер нам трэба зрабіць магчымым атрыманне гэтых падзей на WebSocket, праўда?

Атрыманне падзей

Такім чынам, давайце паглядзім на наш модуль карыстальніцкага чата


 @Module({ imports: [ RedisEventPubSubModule.registerEvents([ NewMessageEvent.publishableEventName, ]), ], controllers: [SendMessageAction], providers: [], }) export class UserChatModule {}


Як бачыце, мы выкарыстоўвалі метад, згаданы раней - registerEvents.

Дзякуючы гэтаму метаду мы сказалі RedisEventPubSubModule, што ён павінен праслухоўваць нашу падзею NewMessageEvent у чарзе Redis pub-sub па атрыбуту publishableEventName.


Такім чынам, калі адбываецца якая-небудзь падзея NewMessageEvent, яна будзе перададзена як звычайная падзея NewMessageEvent, але з атрыбутам publishableEventName.


Варта адзначыць, што ён будзе працаваць на 1 экземпляр або 1000 асобнікаў. Такім чынам, нават калі мы маштабуем вялікую колькасць асобнікаў, кожны з іх атрымае гэта і паўторна выпусціць гэтую падзею ўнутры сістэмы.


Такім чынам, цяпер у нас ёсць магчымасці трансляваць падзеі і праслухоўваць іх. Цяпер нам трэба даставіць іх нашым кліентам websocket.

Шлюз Websocket

Давайце паглядзім на Websocket Gateway


 export enum WebsocketEventSubscribeList { FETCH_EVENTS_MESSAGES = 'fetch-events-messages', EVENTS_MESSAGES_STREAM = 'events-messages-stream', } @WebSocketGateway({ pingInterval: 30000, pingTimeout: 5000, cors: { origin: '*', }, }) export class MessagesWebsocketGateway { constructor( @Inject(EVENT_SUBSCRIBER_TOKEN) private eventSubscriber: EventSubscriberInterface, ) {} @SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES) async streamMessagesData(@ConnectedSocket() client: any) { const stream$ = this.createWebsocketStreamFromEventFactory( client, this.eventSubscriber, NewMessageEvent.publishableEventName, ); const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM; return from(stream$).pipe(map((data) => ({ event, data }))); } private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


Такім чынам, у канструктары ёсць EVENT_SUBSCRIBER_TOKEN, тып якога - EventSubscriberInterface. Але што гэта сапраўды робіць? Вось як гэта выглядае пад капотам


 export class EventEmitter2EventSubscriber implements EventSubscriberInterface { constructor(private eventEmitter: EventEmitter2) {} on(name: string, listener: any): void { this.eventEmitter.on(name, listener); } off(name: string, listener: any): void { this.eventEmitter.removeListener(name, listener); } }


Гэта проста абалонка для EventEmitter2, якую мы выкарыстоўваем у метадзе createWebsocketStreamFromEventFactory


 private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


Мы выкарыстоўваем гэты абгорнуты EventEmitter2 для стварэння дынамічных слухачоў на publishableName, калі кліенты websocket падключаюцца і выдаляюцца пры адключэнні.


Затым мы робім не што іншае, як стварэнне патоку rxjs для падтрымання злучэння праз вэб-сокет і адпраўкі паведамленняў ад слухача праз observer.next(message); пры з'яўленні новага паведамлення.

Як гэтая падзея дойдзе да нашых слухачоў?


Калі вы вернецеся да першага фрагмента кода, нашага падмодуля Redis pub, вы можаце заўважыць гэта ў метадзе registerEvents

 for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); });


Які ў асноўным праслухоўвае падзеі ў чарзе паба, а затым паўторна выпраменьвае іх праз эмітэр падзей.


Такім чынам, давайце падвядзем вынік таго, што мы тут зрабілі

  • Мы па-ранейшаму выкарыстоўваем нашы падзеі ў сістэме, як раней праз EventEmitter2, але калі мы хочам публікаваць на нашых падключаных кліентах websocket, то ўсё, што нам трэба зрабіць, гэта рэалізаваць PublishableInterface

  • Мы не ствараем новыя злучэнні Redis для кожнага падключанага кліента websocket

  • Мы можам павялічыць нашу сістэму да X асобнікаў, і яна ўсё роўна будзе паводзіць сябе такім жа чынам - кожны падлучаны кліент атрымае копію падзеі праз websocket, незалежна ад таго, да якога асобніка яны будуць падключаны


Працоўны код і прыклад даступныя тут: https://github.com/axotion/nestjs-events-websocket