paint-brush
Magadzirisiro andakaita iyo WebSocket Kuyera Dambudziko Pasina Kuputsa Bhangiby@axotion
13,322 kuverenga
13,322 kuverenga

Magadzirisiro andakaita iyo WebSocket Kuyera Dambudziko Pasina Kuputsa Bhangi

by Kamil Fronczak11m2025/01/19
Read on Terminal Reader

Kurebesa; Kuverenga

Kare kare, ndaifanira kugadzira scalable system inogona kukwanisa kubata mazana ekubatanidza panguva imwe chete pasina mutengo wakanyanya, uye zvine musoro, asi kwete nguva yekupindura. Ini ndaida kutumira zviitiko zvakasarudzwa chete kune websocket vatengi. Ini ndakagadzira Redis pub-sub module, sezvandaifunga kuti zviitiko zvangu, kuti zvionekwe mune mamwe mamiriro, zvinofanirwa kufambiswa kuburikidza neRedis Pub-sub pateni.
featured image - Magadzirisiro andakaita iyo WebSocket Kuyera Dambudziko Pasina Kuputsa Bhangi
Kamil Fronczak HackerNoon profile picture

Kare kare, ndakazviwana ndiri mumamiriro ezvinhu apo ndaifanira kugadzira scalable system iyo inogona kukwanisa kubata mazana ekubatanidza panguva imwe chete pasina mari yakanyanyisa, uye ine musoro, asi kwete nguva yekupindura.


Pfungwa dzangu dzekutanga? Ngatifambisei zvese kugadzira/kugadzirisa/kudzima zviito kumutsara uye tizivise vashandisi kana zviito zvavo zvabudirira kana kwete kuburikidza neWebSocket.


Asi kare ikako, ndakanga ndisina ruzivo rwakawanda neWebSockets mukugadzira, saka danho rangu rekutanga raive rekuongorora kuti rinoshanda sei nerubatsiro rwezvidzidzo, mafashama, uye mamwe masosi.


Saka, mushure menguva yakati, ndakawana pfungwa yekuti yaifanirwa kushanda sei ndokutanga kugadzirira kodhi uye kuvhiringa kwechinguva nechishandiso chekuyedza kutevedzera yakakwira traffic.

Dambudziko rekutanga

Mimwe mibvunzo nemhinduro zvinokurudzira kufonera nzira yekunyorera pane iyo Redis muenzaniso pane yakabatana WebSocket mutengi.


 io.sockets.on('connection', function (sockets) { sockets.emit('message',{Hello: 'World!'}); sub.subscribe('attack-map-production'); sockets.on('disconnect', function() { sub.unsubscribe('attack-map-production'); }); });


Asi nenzira iyi, tiri kugadzira chinongedzo chitsva kuRedis, saka kushandiswa kwendangariro mukushandisa kwedu uye Redis yekubatanidza dziva iri kuwedzera. (Redis inobvumira chete 10k yekubatanidza kune imwe chiitiko)


That was a big no for me nekuti ndaifanira kudzikisa memory kushandiswa kusvika pashoma.


Parizvino, zvinyorwa zvakawanda, nerombo rakanaka, taura kuti haufanirwe kugadzira nyowani Redis yekubatanidza pane yega yega WebSocket mutengi.

Dambudziko rechipiri

Mushure mekugadzira chunk hombe yebhizinesi kodhi, pandakatanga chikamu newebhu sockets, mubvunzo wakapinda mupfungwa dzangu - maitiro ekuagadzira nenzira kwayo uye yakachengeteka?


Ini ndanga ndatove nezvimwe zviitiko muhurongwa, uye zvimwe zvacho zvaive zvakagadzirira kuburitswa kuburikidza neWebSockets, zvimwe zvacho zvaiitirwa kugara mukati mehurongwa.


Vimbiso yangu yegoridhe yaive yekuti ndaisazofanira kushandura kodhi zvakanyanya uye ndichiri kukwanisa kutumira zviitiko zvakasarudzwa kune websocket vatengi.


Ndokusaka, pakutanga, ndakagadzira Redis pub-sub module, sezvandaifunga kuti zviitiko zvangu, kuti zvionekwe mune mamwe mamiriro, zvinofanirwa kufambiswa kuburikidza neRedis pub-sub pateni.


Usanzwe kuremerwa nekutarisa iyo module pazasi, sezvo ini ndichatsanangura zvakadzama gare gare mune imwe nyaya yekushandisa


 export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT'; export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT'; export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS = 'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS'; @Module({ providers: [ { provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, useFactory: (options: RedisEventPubSubModuleOptions) => options, inject: [MODULE_OPTIONS_TOKEN], }, { provide: REDIS_PUB_CLIENT, useFactory: async (options: RedisEventPubSubModuleOptions) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err)); await client.connect(); return client; }, inject: [MODULE_OPTIONS_TOKEN], }, { provide: EVENT_EMITTER_TOKEN, useFactory: ( redisPubClient: RedisClientType, eventEmitter: EventEmitter2, ) => { return new RedisEventEmitter(redisPubClient, eventEmitter); }, inject: [REDIS_PUB_CLIENT, EventEmitter2], }, { provide: EVENT_SUBSCRIBER_TOKEN, useFactory: (eventEmitterSub: EventEmitter2) => { return new EventEmitter2EventSubscriber(eventEmitterSub); }, inject: [EventEmitter2], }, ], exports: [ REDIS_PUB_CLIENT, EVENT_EMITTER_TOKEN, EVENT_SUBSCRIBER_TOKEN, REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, ], }) export class RedisEventPubSubModule extends ConfigurableModuleClass { static registerEvents(eventsPublishableNames: string[]): DynamicModule { return { module: class {}, providers: [ { provide: REDIS_SUB_CLIENT, useFactory: async ( options: RedisEventPubSubModuleOptions, eventEmitter: EventEmitter2, ) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err), ); await client.connect(); for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); }); } return client; }, inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2], }, ], }; } }


Iyi module inotarisira kugadzira / kufumura mutengi wePub Redis uye kufumura imwe nzira - RegisterEvents, iyo ine basa rekuteerera zviitiko zvakapihwa paRedis pub-sub uye kuzviburitsa zvakare kuburikidza nechiitiko emitter.


Panogona kunge paine mhute zvishoma parizvino. Sei uchidzokorora zviitiko? Nei tichifanira kunyoresa kuzviitiko izvozvo? Chii chinonzi EVENT_EMITTER_TOKEN EVENT_SUBSCRIBER_TOKEN uye nei tichifanira kuzvitengesa kunze?


Ichave yakanyatsojeka nekushandiswa kwehupenyu chaihwo, saka ngatigadzire nyaya yekushandisa - chat mameseji. Tinoda kukwanisa kutumira mameseji kuburikidza neHTTP POST uye tinoagamuchira kuburikidza neWebSocket kumberi kwekupedzisira.


Ngatitangei

Zviitiko zvekudhinda

Heino module yeiyo

 @Module({ imports: [], controllers: [], providers: [], }) export class UserChatModule {}


Uye chiitiko icho module iyi ichave ichiburitsa mushure mekugamuchira chikumbiro chePOST


 export class NewMessageEvent { constructor(public readonly message: string) {} }


Mune controller, isu tinofanirwa kuita kuti zvikwanise kuburitsa zviitiko zvese kune yedu system uye iyo Redis pub mutsara. Tichashandisa yakaputirwa

ChiitikoEmitter2 cheizvozvo


 export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN'; export class RedisEventEmitter implements EventEmitterInterface { constructor( private redisPubClient: RedisClientType, private eventEmitter: EventEmitter2, ) {} async emit(eventName: string, payload: Record<any, any>): Promise<void> { this.eventEmitter.emit(eventName, payload); if (this.isPublishableEvent(payload)) { await this.redisPubClient.publish( payload.publishableEventName, JSON.stringify(payload), ); } } private isPublishableEvent(event: any): event is PublishableEventInterface { return event.publishableEventName !== undefined; } }


Uye ipapo, isu tinokwanisa kuishandisa mune yedu controller


 @Controller('messages') export class SendMessageAction { constructor( // Previously eventEmitter2 @Inject(EVENT_EMITTER_TOKEN) private readonly eventEmitter: EventEmitterInterface, ) {} @Post() async handle(@Body() request: SendMessageHttpRequest) { await this.eventEmitter.emit( NewMessageEvent.name, new NewMessageEvent(request.content), ); } }


Asi izvi zvisati zvaitika, isu tinofanirwa kusimudzira chiitiko chedu nePublishableEventInterface kuitira kuti tibvumire RedisEventEmitter kubata chiitiko chedu nekuchiburitsa muRedis pub mutsara.


 export class NewMessageEvent implements PublishableEventInterface { static publishableEventName = 'events:new-message'; publishableEventName = NewMessageEvent.publishableEventName; constructor(public readonly message: string) {} }


Hongu, tave kutumira zviitiko zvedu sezvataimboita, asi ikozvino, kana dzikamakidzwa sezviri kuburitswa, vanozomhara mumutsetse weRedis pub.

Asi ikozvino, isu tinofanirwa kuita kuti zvikwanise kugamuchira izvo zviitiko paWebSocket, handiti?

Kugamuchira zviitiko

Saka, ngatitarisei kune yedu mushandisi chat module


 @Module({ imports: [ RedisEventPubSubModule.registerEvents([ NewMessageEvent.publishableEventName, ]), ], controllers: [SendMessageAction], providers: [], }) export class UserChatModule {}


Sezvauri kuona, takashandisa nzira yambotaurwa - RegisterEvents.

Nekuda kweiyo nzira, takaudza RedisEventPubSubModule kuti inofanirwa kuteerera kune yedu NewMessageEvent chiitiko muRedis pub-sub mutsara pane inoburitswaEventName hunhu.


Saka, kana chero chiitiko cheNewMessageEvent chikaitika, chinozoburitswa zvakare seyakajairwa NewMessageEvent chiitiko, asi pasi peinoburitswaEventName hunhu.


Zvakakodzera kutaura, kuti ichashanda pane 1 muenzaniso kana chiuru chiitiko. Saka kunyangwe tikakwira kusvika kunhamba yepamusoro yezviitiko, imwe neimwe yadzo inogashira izvi uye kuburitsazve chiitiko ichi mukati mehurongwa.


Saka, ikozvino tine kugona kuburitsa zviitiko nekuzviteerera. Zvino isu tinoda kuvaendesa kune edu websocket vatengi.

Websocket Gateway

Ngatitarisei kuWebsocket Gateway


 export enum WebsocketEventSubscribeList { FETCH_EVENTS_MESSAGES = 'fetch-events-messages', EVENTS_MESSAGES_STREAM = 'events-messages-stream', } @WebSocketGateway({ pingInterval: 30000, pingTimeout: 5000, cors: { origin: '*', }, }) export class MessagesWebsocketGateway { constructor( @Inject(EVENT_SUBSCRIBER_TOKEN) private eventSubscriber: EventSubscriberInterface, ) {} @SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES) async streamMessagesData(@ConnectedSocket() client: any) { const stream$ = this.createWebsocketStreamFromEventFactory( client, this.eventSubscriber, NewMessageEvent.publishableEventName, ); const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM; return from(stream$).pipe(map((data) => ({ event, data }))); } private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


Saka pane chinhu, muvaki, isu tine EVENT_SUBSCRIBER_TOKEN, iyo mhando inonzi EventSubscriberInterface. Asi chii chainyatsoita? Aya ndiwo maitiro anotaridzika pasi pehodhi


 export class EventEmitter2EventSubscriber implements EventSubscriberInterface { constructor(private eventEmitter: EventEmitter2) {} on(name: string, listener: any): void { this.eventEmitter.on(name, listener); } off(name: string, listener: any): void { this.eventEmitter.removeListener(name, listener); } }


Ingori wrapper yeChiitikoEmitter2, yatiri kushandisa nenzira yekugadziraWebsocketStreamFromEventFactory.


 private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


Isu tiri kushandisa iyi yakaputirwa ChiitikoEmitter2 kugadzira vateereri vane simba parinoshambadzirwaName kana websocket vatengi vabatana nekubvisa pakudimbura.


Zvadaro, hapana chatiri kuita kunze kwekugadzira rxjs stream kuchengetedza websocket yekubatanidza uye kutumira mameseji kubva kumuteereri kuburikidza nemucherechedzi.next(message); kana meseji itsva yaitika.

Chiitiko ichi chichasvika sei kune vateereri vedu?


Kana iwe ukadzokera kune yekutanga snippet yekodhi, yedu Redis pub sub module, saka unogona kuona izvi mune rerejistaEvents nzira.

 for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); });


Izvo zvinonyatso teerera zviitiko paPub mutsara, uye wozozviburitsa zvakare kuburikidza nechiitiko emitter.


Saka, ngatipei muchidimbu zvataita pano

  • Tichiri kushandisa zviitiko zvedu muhurongwa sezvataimboita kuburikidza neChiitikoEmitter2, asi kana tichida kushambadza kune edu akabatana websocket vatengi, saka chatinofanira kuita kushandisa PublishableInterface.

  • Isu hatisi kugadzira nyowani Redis yekubatanidza pane yega yega yakabatana websocket mutengi

  • Tinogona kukwirisa sisitimu yedu kusvika kuX zviitiko uye icharamba ichiita nenzira imwechete - mutengi wega wega akabatana achawana kopi yechiitiko kuburikidza newebhusocket, zvisinei kuti vachange vakabatana kupi.


Kushanda kodhi uye muenzaniso zviripo pano: https://github.com/axotion/nestjs-events-websocket