paint-brush
Би WebSocket-ийн масштабын асуудлыг банк эвдэхгүйгээр хэрхэн шийдсэн бэby@axotion
13,322 уншилтууд
13,322 уншилтууд

Би WebSocket-ийн масштабын асуудлыг банк эвдэхгүйгээр хэрхэн шийдсэн бэ

by Kamil Fronczak11m2025/01/19
Read on Terminal Reader

Хэтэрхий урт; Унших

Удаан хугацааны өмнө би олон зуун холболтыг тийм ч өндөр биш, боломжийн боловч шуурхай хариу өгөхгүй байх чадвартай, өргөтгөх боломжтой системийг бий болгох шаардлагатай болсон. Би зөвхөн сонгосон үйл явдлуудыг websocket үйлчлүүлэгч рүү илгээхийг хүссэн. Миний үйл явдлууд бусад тохиолдолд харагдахын тулд Redis Pub-sub загвараар дамжих ёстой гэж бодсон учраас би Redis pub-sub модулийг үүсгэсэн.
featured image - Би WebSocket-ийн масштабын асуудлыг банк эвдэхгүйгээр хэрхэн шийдсэн бэ
Kamil Fronczak HackerNoon profile picture

Удаан хугацааны өмнө би нэг дор олон зуун холболтыг тийм ч өндөр биш, боломжийн боловч шуурхай хариу өгөх боломжгүй өргөтгөх боломжтой системийг бий болгох хэрэгтэй болсон.


Миний анхны бодол? Бүх үүсгэх/засах/устгах үйлдлүүдийг дараалалд шилжүүлж, WebSocket-ээр хийсэн үйлдэл нь амжилттай болсон эсэхийг хэрэглэгчдэд мэдэгдэцгээе.


Гэхдээ тэр үед би WebSockets-ийн үйлдвэрлэлийн талаар тийм ч их туршлагагүй байсан тул миний эхний алхам бол заавар, стекийн ачаалал болон бусад эх сурвалжийн тусламжтайгаар хэрхэн ажилладагийг судлах явдал байв.


Тиймээс, хэсэг хугацааны дараа би энэ нь хэрхэн ажиллах ёстой талаар ойлголттой болж, код бэлдэж, ачаалал шалгах хэрэглүүрийг ашиглан ачаалал ихтэй урсгалыг дуурайж эхлэв.

Эхний асуудал

Зарим асуулт, хариулт нь холбогдсон WebSocket клиент дээрх Redis instance дээр бүртгүүлэх аргыг дуудахыг санал болгосон.


 io.sockets.on('connection', function (sockets) { sockets.emit('message',{Hello: 'World!'}); sub.subscribe('attack-map-production'); sockets.on('disconnect', function() { sub.unsubscribe('attack-map-production'); }); });


Гэхдээ ийм байдлаар бид Redis-тэй шинэ холболт үүсгэж байгаа тул манай программ болон Redis холболтын сан дахь санах ойн хэрэглээ нэмэгдэж байна. (Redis нь нэг жишээнд зөвхөн 10к холболтыг зөвшөөрдөг)


Энэ нь миний хувьд маш том үгүй байсан, учир нь би санах ойн хэрэглээг хамгийн бага хэмжээнд хүртэл бууруулах шаардлагатай болсон.


Яг одоо олон нийтлэлд аз болоход та WebSocket клиент бүр дээр шинэ Redis холболт үүсгэх ёсгүй гэж дурдсан байдаг.

Хоёр дахь асуудал

Бизнесийн кодын том хэсгийг үүсгэсний дараа вэб залгууртай хэсгийг эхлүүлэх үед миний толгойд асуулт гарч ирэв - тэдгээрийг хэрхэн зөв, аюулгүй байдлаар үүсгэх вэ?


Би системд зарим үйл явдлуудтай байсан бөгөөд тэдгээрийн заримыг нь WebSockets-ээр дамжуулан нэмж нийтлэхэд бэлэн байсан, үлдсэн хэсэг нь систем дотор үлдэх зорилготой байсан.


Миний алтан амлалт бол би кодыг эрс өөрчлөх шаардлагагүй бөгөөд зөвхөн сонгосон үйл явдлуудыг websocket үйлчлүүлэгч рүү илгээх боломжтой байсан.


Тийм ч учраас миний үйл явдлууд бусад тохиолдлуудад харагдахын тулд Redis pub-sub загвараар дамжих ёстой гэж бодсон учраас би эхлээд Redis pub-sub модулийг үүсгэсэн.


Доорх модулийг хараад сэтгэлээр унах хэрэггүй, учир нь би дараа нь хэрэглээний тохиолдолд дэлгэрэнгүй тайлбарлах болно


 export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT'; export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT'; export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS = 'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS'; @Module({ providers: [ { provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, useFactory: (options: RedisEventPubSubModuleOptions) => options, inject: [MODULE_OPTIONS_TOKEN], }, { provide: REDIS_PUB_CLIENT, useFactory: async (options: RedisEventPubSubModuleOptions) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err)); await client.connect(); return client; }, inject: [MODULE_OPTIONS_TOKEN], }, { provide: EVENT_EMITTER_TOKEN, useFactory: ( redisPubClient: RedisClientType, eventEmitter: EventEmitter2, ) => { return new RedisEventEmitter(redisPubClient, eventEmitter); }, inject: [REDIS_PUB_CLIENT, EventEmitter2], }, { provide: EVENT_SUBSCRIBER_TOKEN, useFactory: (eventEmitterSub: EventEmitter2) => { return new EventEmitter2EventSubscriber(eventEmitterSub); }, inject: [EventEmitter2], }, ], exports: [ REDIS_PUB_CLIENT, EVENT_EMITTER_TOKEN, EVENT_SUBSCRIBER_TOKEN, REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, ], }) export class RedisEventPubSubModule extends ConfigurableModuleClass { static registerEvents(eventsPublishableNames: string[]): DynamicModule { return { module: class {}, providers: [ { provide: REDIS_SUB_CLIENT, useFactory: async ( options: RedisEventPubSubModuleOptions, eventEmitter: EventEmitter2, ) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err), ); await client.connect(); for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); }); } return client; }, inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2], }, ], }; } }


Энэ модуль нь Pub Redis-ийн үйлчлүүлэгчийг үүсгэх/илчлэх, Redis pub-sub дээр өгөгдсөн үйл явдлуудыг сонсож, үйл явдлын ялгаруулагчаар дамжуулан дахин гаргах үүрэгтэй registerEvents гэсэн нэмэлт аргыг ил гаргахад анхаардаг.


Одоохондоо бага зэрэг манантай байж магадгүй. Яагаад үйл явдлыг дахин гаргах болов? Бид яагаад эдгээр арга хэмжээнд бүртгүүлэх шаардлагатай байна вэ? EVENT_EMITTER_TOKEN болон EVENT_SUBSCRIBER_TOKEN гэж юу вэ, яагаад бид тэдгээрийг экспортлох ёстой вэ?


Энэ нь бодит амьдрал дээр илүү ойлгомжтой байх тул хэрэглээний кейс - chat messages үүсгэцгээе. Бид HTTP POST-ээр мессеж илгээж, урд талын WebSocket-ээр дамжуулан хүлээн авах боломжтой байхыг хүсч байна.


Эхэлцгээе

Үйл явдлыг нийтлэх

Үүнд зориулсан модуль энд байна

 @Module({ imports: [], controllers: [], providers: [], }) export class UserChatModule {}


Мөн POST хүсэлтийг хүлээн авсны дараа энэ модуль гарах үйл явдал


 export class NewMessageEvent { constructor(public readonly message: string) {} }


Удирдлагад бид өөрсдийн систем болон Redis пабын дараалалд үйл явдлуудыг гаргах боломжтой болгох ёстой. Бид ороосон хэлбэрээр ашиглах болно

Үүний тулд EventEmitter2


 export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN'; export class RedisEventEmitter implements EventEmitterInterface { constructor( private redisPubClient: RedisClientType, private eventEmitter: EventEmitter2, ) {} async emit(eventName: string, payload: Record<any, any>): Promise<void> { this.eventEmitter.emit(eventName, payload); if (this.isPublishableEvent(payload)) { await this.redisPubClient.publish( payload.publishableEventName, JSON.stringify(payload), ); } } private isPublishableEvent(event: any): event is PublishableEventInterface { return event.publishableEventName !== undefined; } }


Тэгээд бид үүнийг хянагчдаа ашиглах боломжтой болно


 @Controller('messages') export class SendMessageAction { constructor( // Previously eventEmitter2 @Inject(EVENT_EMITTER_TOKEN) private readonly eventEmitter: EventEmitterInterface, ) {} @Post() async handle(@Body() request: SendMessageHttpRequest) { await this.eventEmitter.emit( NewMessageEvent.name, new NewMessageEvent(request.content), ); } }


Гэхдээ үүнээс өмнө бид RedisEventEmitter-д манай үйл явдлыг барьж аваад Redis пабын дараалалд цацах боломжийг олгохын тулд PublishableEventInterface ашиглан арга хэмжээгээ сайжруулах хэрэгтэй.


 export class NewMessageEvent implements PublishableEventInterface { static publishableEventName = 'events:new-message'; publishableEventName = NewMessageEvent.publishableEventName; constructor(public readonly message: string) {} }


Гайхалтай, бид одоо өмнөх шигээ арга хэмжээнүүдээ илгээж байна, гэхдээ одоо тэдгээрийг нийтлэх боломжтой гэж тэмдэглэвэл Redis пабын дараалалд орох болно.

Гэхдээ одоо бид WebSocket дээр эдгээр үйл явдлыг хүлээн авах боломжтой болгох хэрэгтэй байна, тийм үү?

Үйл явдал хүлээн авч байна

Ингээд хэрэглэгчийн чатын модулийг харцгаая


 @Module({ imports: [ RedisEventPubSubModule.registerEvents([ NewMessageEvent.publishableEventName, ]), ], controllers: [SendMessageAction], providers: [], }) export class UserChatModule {}


Таны харж байгаагаар бид өмнө дурдсан аргыг ашигласан - registerEvents.

Энэ аргын ачаар бид RedisEventPubSubModule-д манай NewMessageEvent үйл явдлыг publishableEventName шинж чанар дээрх Redis pub-sub дараалалд сонсох ёстой гэж хэлсэн.


Тиймээс, хэрэв ямар нэгэн NewMessageEvent үйл явдал тохиолдвол энэ нь ердийн NewMessageEvent үйл явдал болгон дахин цацагдах болно, гэхдээ publishableEventName шинж чанарын дор.


Энэ нь 1 тохиолдол эсвэл 1000 тохиолдол дээр ажиллах болно гэдгийг тэмдэглэх нь зүйтэй. Тиймээс бид олон тооны тохиолдлуудад масштабтай байсан ч тэд тус бүр үүнийг хүлээн авч, энэ үйл явдлыг систем дотор дахин гаргах болно.


Тиймээс одоо бид үйл явдлыг ялгаж, сонсох чадвартай болсон. Одоо бид тэдгээрийг вэбсокет үйлчлүүлэгчиддээ хүргэх хэрэгтэй.

Websocket Gateway

Websocket Gateway-г харцгаая


 export enum WebsocketEventSubscribeList { FETCH_EVENTS_MESSAGES = 'fetch-events-messages', EVENTS_MESSAGES_STREAM = 'events-messages-stream', } @WebSocketGateway({ pingInterval: 30000, pingTimeout: 5000, cors: { origin: '*', }, }) export class MessagesWebsocketGateway { constructor( @Inject(EVENT_SUBSCRIBER_TOKEN) private eventSubscriber: EventSubscriberInterface, ) {} @SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES) async streamMessagesData(@ConnectedSocket() client: any) { const stream$ = this.createWebsocketStreamFromEventFactory( client, this.eventSubscriber, NewMessageEvent.publishableEventName, ); const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM; return from(stream$).pipe(map((data) => ({ event, data }))); } private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


Тэгэхээр нэг зүйл бий, бүтээгч дээр бид EVENT_SUBSCRIBER_TOKEN-тэй байгаа бөгөөд энэ нь EventSubscriberInterface төрөл юм. Гэхдээ энэ нь үнэхээр юу хийдэг вэ? Бүрээсний доор ингэж харагдаж байна


 export class EventEmitter2EventSubscriber implements EventSubscriberInterface { constructor(private eventEmitter: EventEmitter2) {} on(name: string, listener: any): void { this.eventEmitter.on(name, listener); } off(name: string, listener: any): void { this.eventEmitter.removeListener(name, listener); } }


Энэ бол бид createWebsocketStreamFromEventFactory аргад ашиглаж байгаа EventEmitter2-д зориулсан зүгээр л боодол юм.


 private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


Вебсокет клиентүүд холбогдож, салгах үед устгах үед publishableName дээр динамик сонсогч үүсгэхийн тулд бид энэхүү ороосон EventEmitter2-г ашиглаж байна.


Дараа нь бид websocket холболтыг хадгалахын тулд rxjs урсгал үүсгэхээс өөр зүйл хийхгүй бөгөөд observer.next(message)-ээр дамжуулан сонсогчоос мессеж илгээх болно; шинэ мессеж ирэх үед.

Энэ үйл явдал манай сонсогчдод хэрхэн хүрэх бол?


Хэрэв та кодын эхний хэсэг болох манай Redis pub дэд модуль руу буцах юм бол үүнийг registerEvents аргаас олж харж болно.

 for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); });


Энэ нь үндсэндээ пабын дараалалд байгаа үйл явдлуудыг сонсож, дараа нь үйл явдлын ялгаруулагчаар дамжуулан дахин дамжуулдаг.


Ингээд энд хийсэн ажлаа дүгнэж хэлье

  • Бид EventEmitter2-ээр дамжуулан системд үйл явдлуудаа ашигласаар байгаа боловч хэрэв бид холбогдсон вэбсокет үйлчлүүлэгчиддээ нийтлэхийг хүсвэл PublishableInterface-ийг хэрэгжүүлэхэд л хангалттай.

  • Бид холбогдсон вэбсокет клиент бүр дээр шинэ Redis холболт үүсгэхгүй

  • Бид системээ X инстанц болгон томруулж болох бөгөөд энэ нь ижил байдлаар ажиллах болно - холбогдсон үйлчлүүлэгч бүр үйл явдлын хуулбарыг вэбсокетээр дамжуулан авах болно.


Ажлын код болон жишээг эндээс авах боломжтой: https://github.com/axotion/nestjs-events-websocket

L O A D I N G
. . . comments & more!

About Author

Kamil Fronczak HackerNoon profile picture
Kamil Fronczak@axotion
I’m a 2X-year-old tech dude from Poland, and this is my blog about tech stuff: NestJS, Node

TAG ҮҮ

ЭНЭ ӨГҮҮЛЛИЙГ ТОЛГОЙЛУУЛСАН...