paint-brush
Kuinka ratkaisin WebSocket-skaalausongelman rikkomatta pankkiakirjoittaja@axotion
13,322 lukemat
13,322 lukemat

Kuinka ratkaisin WebSocket-skaalausongelman rikkomatta pankkia

kirjoittaja Kamil Fronczak11m2025/01/19
Read on Terminal Reader

Liian pitkä; Lukea

Minun piti kauan sitten luoda skaalautuva järjestelmä, joka pystyisi käsittelemään satoja samanaikaisia yhteyksiä ei kovin korkein kustannuksin ja kohtuullisella, mutta ei välittömällä vastausajalla. Halusin lähettää vain valitut tapahtumat websocket-asiakkaille. Tein Redis-pub-sub-moduulin, koska ajattelin, että tapahtumani on välitettävä Redis Pub-sub -mallin kautta, jotta ne olisivat näkyvissä muissa tapauksissa.
featured image - Kuinka ratkaisin WebSocket-skaalausongelman rikkomatta pankkia
Kamil Fronczak HackerNoon profile picture

Jo kauan sitten jouduin tilanteeseen, jossa jouduin luomaan skaalautuvan järjestelmän, joka pystyisi käsittelemään satoja samanaikaisia yhteyksiä ei kovin korkealla hinnalla ja kohtuullisella, mutta ei välittömällä vastausajalla.


Ensimmäiset ajatukseni? Siirretään kaikki luomis-/muokkaus-/poistotoiminnot jonoon ja ilmoitamme WebSocketin kautta käyttäjille, onnistuivatko heidän toimet vai eivät.


Mutta tuolloin minulla ei ollut paljoa kokemusta WebSocketsista tuotannossa, joten ensimmäinen askeleeni oli tutkia, miten se toimii opetusohjelmien, pinon ylivuodon ja muiden lähteiden avulla.


Joten jonkin ajan kuluttua sain ytimen siitä, kuinka sen pitäisi toimia, ja aloin valmistelemaan koodia ja sotkemaan jonkin aikaa kuormitustestityökalulla korkean liikenteen simuloimiseksi.

Ensimmäinen ongelma

Jotkut kysymykset ja vastaukset ehdottivat tilausmenetelmän kutsumista yhdistetyn WebSocket-asiakkaan Redis-esiintymässä.


 io.sockets.on('connection', function (sockets) { sockets.emit('message',{Hello: 'World!'}); sub.subscribe('attack-map-production'); sockets.on('disconnect', function() { sub.unsubscribe('attack-map-production'); }); });


Mutta tällä tavalla luomme uuden yhteyden Redikseen, joten muistin käyttö sovelluksessamme ja Redis-yhteyspoolissamme kasvaa. (Redis sallii vain 10 000 yhteyttä yhteen esiintymään)


Se oli suuri ei minulle, koska minun oli vähennettävä muistin käyttö minimiin.


Tällä hetkellä monissa artikkeleissa onneksi mainitaan, että sinun ei pitäisi luoda uutta Redis-yhteyttä jokaiseen WebSocket-asiakkaaseen.

Toinen ongelma

Suuren bisneskoodipalan luomisen jälkeen, kun aloitin osan web-liittimistä, mieleeni ponnahti kysymys - kuinka luoda niitä oikein ja turvallisesti?


Joitakin tapahtumia minulla oli jo järjestelmässä, ja osa niistä oli valmiita lisättäväksi WebSocketsin kautta, loput oli tarkoitus pysyä järjestelmän sisällä.


Kultainen lupaukseni oli, että minun ei tarvitse muuttaa koodia rajusti ja silti pystyn lähettämään vain valittuja tapahtumia websocket-asiakkaille.


Siksi alun perin loin Redis pub-sub -moduulin, koska ajattelin, että tapahtumani on välitettävä Redis pub-sub -mallin kautta, jotta ne olisivat näkyvissä muissa tapauksissa.


Älä hämmenny alla olevaa moduulia katsoessasi, sillä selitän yksityiskohdat myöhemmin käyttötapauksessa


 export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT'; export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT'; export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS = 'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS'; @Module({ providers: [ { provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, useFactory: (options: RedisEventPubSubModuleOptions) => options, inject: [MODULE_OPTIONS_TOKEN], }, { provide: REDIS_PUB_CLIENT, useFactory: async (options: RedisEventPubSubModuleOptions) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err)); await client.connect(); return client; }, inject: [MODULE_OPTIONS_TOKEN], }, { provide: EVENT_EMITTER_TOKEN, useFactory: ( redisPubClient: RedisClientType, eventEmitter: EventEmitter2, ) => { return new RedisEventEmitter(redisPubClient, eventEmitter); }, inject: [REDIS_PUB_CLIENT, EventEmitter2], }, { provide: EVENT_SUBSCRIBER_TOKEN, useFactory: (eventEmitterSub: EventEmitter2) => { return new EventEmitter2EventSubscriber(eventEmitterSub); }, inject: [EventEmitter2], }, ], exports: [ REDIS_PUB_CLIENT, EVENT_EMITTER_TOKEN, EVENT_SUBSCRIBER_TOKEN, REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, ], }) export class RedisEventPubSubModule extends ConfigurableModuleClass { static registerEvents(eventsPublishableNames: string[]): DynamicModule { return { module: class {}, providers: [ { provide: REDIS_SUB_CLIENT, useFactory: async ( options: RedisEventPubSubModuleOptions, eventEmitter: EventEmitter2, ) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err), ); await client.connect(); for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); }); } return client; }, inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2], }, ], }; } }


Tämä moduuli huolehtii Pub Redis -asiakkaan luomisesta/paljastamisesta ja lisämenetelmän paljastamisesta - registerEvents, joka vastaa tiettyjen tapahtumien kuuntelemisesta Redis-pub-subissa ja niiden lähettämisestä uudelleen tapahtumalähettimen kautta.


Tällä hetkellä saattaa olla hieman sumuista. Miksi tapahtumia pitäisi lähettää uudelleen? Miksi meidän pitää rekisteröityä näihin tapahtumiin? Mitä ovat EVENT_EMITTER_TOKEN ja EVENT_SUBSCRIBER_TOKEN ja miksi meidän on vietävä ne?


Se tulee selvemmäksi tosielämän käytössä, joten luodaan käyttötapaus - chat-viestit. Haluamme lähettää viestejä HTTP POST:n kautta ja vastaanottaa ne käyttöliittymän WebSocketin kautta.


Aloitetaan

Tapahtumien julkaiseminen

Tässä on moduuli sitä varten

 @Module({ imports: [], controllers: [], providers: [], }) export class UserChatModule {}


Ja tapahtuma, jonka tämä moduuli lähettää saatuaan POST-pyynnön


 export class NewMessageEvent { constructor(public readonly message: string) {} }


Ohjaimessa meidän on mahdollistettava tapahtumien lähettäminen sekä järjestelmällemme että Redis-pub-jonolle. Käytämme käärittynä

EventEmitter2 sitä varten


 export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN'; export class RedisEventEmitter implements EventEmitterInterface { constructor( private redisPubClient: RedisClientType, private eventEmitter: EventEmitter2, ) {} async emit(eventName: string, payload: Record<any, any>): Promise<void> { this.eventEmitter.emit(eventName, payload); if (this.isPublishableEvent(payload)) { await this.redisPubClient.publish( payload.publishableEventName, JSON.stringify(payload), ); } } private isPublishableEvent(event: any): event is PublishableEventInterface { return event.publishableEventName !== undefined; } }


Ja sitten voimme käyttää sitä ohjaimessamme


 @Controller('messages') export class SendMessageAction { constructor( // Previously eventEmitter2 @Inject(EVENT_EMITTER_TOKEN) private readonly eventEmitter: EventEmitterInterface, ) {} @Post() async handle(@Body() request: SendMessageHttpRequest) { await this.eventEmitter.emit( NewMessageEvent.name, new NewMessageEvent(request.content), ); } }


Mutta ennen sitä meidän on tehostettava tapahtumaamme PublishableEventInterfacella, jotta RedisEventEmitter saa kiinni tapahtumamme ja lähettää sen Redis-pubijonoon.


 export class NewMessageEvent implements PublishableEventInterface { static publishableEventName = 'events:new-message'; publishableEventName = NewMessageEvent.publishableEventName; constructor(public readonly message: string) {} }


Hienoa, nyt lähetämme tapahtumamme kuten ennenkin, mutta nyt, jos ne on merkitty julkaistaviksi, ne laskeutuvat Redis-pubijonoon.

Mutta nyt meidän on mahdollistettava näiden tapahtumien vastaanottaminen WebSocketissa, eikö niin?

Tapahtumien vastaanottaminen

Joten katsotaanpa käyttäjän chat-moduuliamme


 @Module({ imports: [ RedisEventPubSubModule.registerEvents([ NewMessageEvent.publishableEventName, ]), ], controllers: [SendMessageAction], providers: [], }) export class UserChatModule {}


Kuten näet, käytimme aiemmin mainittua menetelmää - registerEvents.

Tämän menetelmän ansiosta kerroimme RedisEventPubSubModulelle, että sen pitäisi kuunnella NewMessageEvent-tapahtumaamme Redis-pub-sub-jonossa publishableEventName-attribuutilla.


Joten jos jokin NewMessageEvent-tapahtuma tapahtuu, se lähetetään uudelleen normaalina NewMessageEvent-tapahtumana, mutta publishableEventName-attribuutin alla.


On syytä mainita, että se toimii 1 esiintymässä tai 1 000 ilmentymässä. Joten vaikka skaalaammekin suureen määrään esiintymiä, jokainen niistä vastaanottaa tämän ja lähettää tämän tapahtuman uudelleen järjestelmän sisällä.


Joten nyt meillä on kyky lähettää tapahtumia ja kuunnella niitä. Nyt meidän on toimitettava ne websocket-asiakkaillemme.

Websocket Gateway

Katsotaanpa Websocket Gatewaytä


 export enum WebsocketEventSubscribeList { FETCH_EVENTS_MESSAGES = 'fetch-events-messages', EVENTS_MESSAGES_STREAM = 'events-messages-stream', } @WebSocketGateway({ pingInterval: 30000, pingTimeout: 5000, cors: { origin: '*', }, }) export class MessagesWebsocketGateway { constructor( @Inject(EVENT_SUBSCRIBER_TOKEN) private eventSubscriber: EventSubscriberInterface, ) {} @SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES) async streamMessagesData(@ConnectedSocket() client: any) { const stream$ = this.createWebsocketStreamFromEventFactory( client, this.eventSubscriber, NewMessageEvent.publishableEventName, ); const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM; return from(stream$).pipe(map((data) => ({ event, data }))); } private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


Konstruktorissa on siis EVENT_SUBSCRIBER_TOKEN, jonka tyyppi on EventSubscriberInterface. Mutta mitä se todella tekee? Tältä se näyttää konepellin alla


 export class EventEmitter2EventSubscriber implements EventSubscriberInterface { constructor(private eventEmitter: EventEmitter2) {} on(name: string, listener: any): void { this.eventEmitter.on(name, listener); } off(name: string, listener: any): void { this.eventEmitter.removeListener(name, listener); } }


Se on vain kääre EventEmitter2:lle, jota käytämme menetelmässä createWebsocketStreamFromEventFactory


 private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


Käytämme tätä käärittyä EventEmitter2:ta dynaamisten kuuntelijoiden luomiseen publishableNameen, kun websocket-asiakkaat muodostavat yhteyden ja poistavat yhteyden katketessa.


Sitten emme tee mitään muuta kuin luomme rxjs-virran ylläpitämään websocket-yhteyttä ja lähettämään viestejä kuuntelijalta kautta tarkkailija.next(message); kun uusi viesti tulee.

Miten tämä tapahtuma tavoittaa kuulijamme?


Jos palaat ensimmäiseen koodinpätkään, Redis-pub-alimoduuliimme, voit havaita tämän registerEvents-menetelmässä

 for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); });


Joka pohjimmiltaan kuuntelee tapahtumia pub-jonossa ja lähettää ne sitten uudelleen tapahtumalähettimen kautta.


Tehdään siis yhteenveto siitä, mitä olemme tehneet täällä

  • Käytämme edelleen tapahtumiamme järjestelmässä kuten ennen EventEmitter2:n kautta, mutta jos haluamme julkaista liitetyille websocket-asiakkaillemme, meidän tarvitsee vain ottaa PublishableInterface käyttöön.

  • Emme luo uusia Redis-yhteyksiä jokaiselle yhdistetylle websocket-asiakkaalle

  • Voimme skaalata järjestelmämme X-instanssiin ja se toimii edelleen samalla tavalla - jokainen yhdistetty asiakas saa kopion tapahtumasta websocketin kautta riippumatta siitä, mihin ilmentymään ne yhdistetään.


Toimintakoodi ja esimerkki ovat saatavilla täältä: https://github.com/axotion/nestjs-events-websocket