paint-brush
ฉันแก้ไขปัญหาการปรับขนาด WebSocket ได้อย่างไรโดยไม่ต้องเสียเงินมากโดย@axotion
13,322 การอ่าน
13,322 การอ่าน

ฉันแก้ไขปัญหาการปรับขนาด WebSocket ได้อย่างไรโดยไม่ต้องเสียเงินมาก

โดย Kamil Fronczak11m2025/01/19
Read on Terminal Reader

นานเกินไป; อ่าน

เมื่อนานมาแล้ว ฉันต้องสร้างระบบที่ปรับขนาดได้ซึ่งสามารถจัดการการเชื่อมต่อพร้อมกันได้หลายร้อยรายการด้วยต้นทุนที่ไม่สูงเกินไป และมีเวลาตอบสนองที่เหมาะสมแต่ไม่ทันที ฉันต้องการส่งเฉพาะเหตุการณ์ที่เลือกไปยังไคลเอนต์ WebSocket ฉันจึงสร้างโมดูล Redis pub-sub เนื่องจากฉันคิดว่าเหตุการณ์ของฉันจะต้องถูกส่งผ่านรูปแบบ Redis Pub-sub เพื่อให้มองเห็นได้ในอินสแตนซ์อื่นๆ
featured image - ฉันแก้ไขปัญหาการปรับขนาด WebSocket ได้อย่างไรโดยไม่ต้องเสียเงินมาก
Kamil Fronczak HackerNoon profile picture

เมื่อนานมาแล้ว ฉันพบว่าตัวเองอยู่ในสถานการณ์ที่จำเป็นต้องสร้างระบบที่ปรับขนาดได้ซึ่งสามารถรองรับการเชื่อมต่อพร้อมกันได้หลายร้อยรายการด้วยต้นทุนที่ไม่สูงมากนัก และมีเวลาในการตอบสนองที่สมเหตุสมผล แต่ไม่ได้รวดเร็วทันที


ความคิดแรกของฉันคืออะไร? ให้เราย้ายการดำเนินการสร้าง/แก้ไข/ลบทั้งหมดไปที่คิว และแจ้งให้ผู้ใช้ทราบหากการดำเนินการของพวกเขาประสบความสำเร็จหรือไม่ผ่าน WebSocket


แต่เมื่อก่อนนี้ ฉันไม่มีประสบการณ์กับ WebSockets ในระบบการผลิตมากนัก ดังนั้น ขั้นตอนแรกของฉันคือการสืบหาว่ามันทำงานอย่างไรด้วยความช่วยเหลือจากแบบฝึกสอน Stack Overflow และแหล่งอื่นๆ


หลังจากนั้นไม่นาน ฉันก็เข้าใจคร่าวๆ ว่ามันควรทำงานอย่างไร และเริ่มเตรียมโค้ดและทดลองใช้เครื่องมือทดสอบโหลดเพื่อจำลองปริมาณการเข้าชมสูงอยู่พักหนึ่ง

ปัญหาแรก

คำถามและคำตอบบางข้อแนะนำให้เรียกใช้เมธอด subscribe บนอินสแตนซ์ Redis บนไคลเอนต์ WebSocket ที่เชื่อมต่อ


 io.sockets.on('connection', function (sockets) { sockets.emit('message',{Hello: 'World!'}); sub.subscribe('attack-map-production'); sockets.on('disconnect', function() { sub.unsubscribe('attack-map-production'); }); });


แต่ด้วยวิธีนี้ เราจะสร้างการเชื่อมต่อใหม่กับ Redis ดังนั้นการใช้งานหน่วยความจำในแอปพลิเคชันและพูลการเชื่อมต่อ Redis ของเราจึงเพิ่มขึ้น (Redis อนุญาตให้เชื่อมต่อกับอินสแตนซ์เดียวได้เพียง 10,000 รายการ)


นั่นเป็นสิ่งที่ไม่ดีสำหรับฉันเลย เพราะฉันต้องลดการใช้งานหน่วยความจำให้เหลือน้อยที่สุด


ในขณะนี้ บทความจำนวนมากได้กล่าวถึงว่าคุณไม่ควรสร้างการเชื่อมต่อ Redis ใหม่บนไคลเอนต์ WebSocket แต่ละตัว

ปัญหาที่สอง

หลังจากสร้างโค้ดธุรกิจจำนวนมากแล้ว เมื่อผมเริ่มทำส่วนด้วยเว็บซ็อคเก็ต คำถามก็ผุดขึ้นมาในหัวว่าจะสร้างมันขึ้นมาอย่างถูกต้องและปลอดภัยได้อย่างไร


ฉันมีเหตุการณ์บางอย่างอยู่ในระบบแล้ว และบางเหตุการณ์ก็พร้อมที่จะเผยแพร่เพิ่มเติมผ่าน WebSockets ส่วนที่เหลือจะอยู่ในระบบ


คำสัญญาทองของฉันคือฉันไม่จำเป็นต้องเปลี่ยนโค้ดอย่างมากและยังคงสามารถส่งเฉพาะเหตุการณ์ที่เลือกไปยังไคลเอนต์ WebSocket ได้


นั่นเป็นเหตุผลว่าทำไมฉันถึงสร้างโมดูล Redis pub-sub ขึ้นมาในตอนแรก เพราะว่าฉันคิดว่าเหตุการณ์ต่างๆ ของฉันจะต้องถูกส่งผ่านรูปแบบ Redis pub-sub เพื่อที่จะปรากฏในอินสแตนซ์อื่นๆ


อย่ารู้สึกเครียดเมื่อดูโมดูลด้านล่าง เนื่องจากฉันจะอธิบายรายละเอียดในกรณีการใช้งานในภายหลัง


 export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT'; export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT'; export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS = 'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS'; @Module({ providers: [ { provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, useFactory: (options: RedisEventPubSubModuleOptions) => options, inject: [MODULE_OPTIONS_TOKEN], }, { provide: REDIS_PUB_CLIENT, useFactory: async (options: RedisEventPubSubModuleOptions) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err)); await client.connect(); return client; }, inject: [MODULE_OPTIONS_TOKEN], }, { provide: EVENT_EMITTER_TOKEN, useFactory: ( redisPubClient: RedisClientType, eventEmitter: EventEmitter2, ) => { return new RedisEventEmitter(redisPubClient, eventEmitter); }, inject: [REDIS_PUB_CLIENT, EventEmitter2], }, { provide: EVENT_SUBSCRIBER_TOKEN, useFactory: (eventEmitterSub: EventEmitter2) => { return new EventEmitter2EventSubscriber(eventEmitterSub); }, inject: [EventEmitter2], }, ], exports: [ REDIS_PUB_CLIENT, EVENT_EMITTER_TOKEN, EVENT_SUBSCRIBER_TOKEN, REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, ], }) export class RedisEventPubSubModule extends ConfigurableModuleClass { static registerEvents(eventsPublishableNames: string[]): DynamicModule { return { module: class {}, providers: [ { provide: REDIS_SUB_CLIENT, useFactory: async ( options: RedisEventPubSubModuleOptions, eventEmitter: EventEmitter2, ) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err), ); await client.connect(); for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); }); } return client; }, inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2], }, ], }; } }


โมดูลนี้จะดูแลการสร้าง/เปิดเผยไคลเอนต์ Pub Redis และเปิดเผยวิธีการเพิ่มเติม - registerEvents ซึ่งรับผิดชอบในการรับฟังเหตุการณ์ที่กำหนดบน Redis pub-sub และส่งออกอีกครั้งผ่านตัวปล่อยเหตุการณ์


ตอนนี้อาจจะยังดูคลุมเครืออยู่บ้าง เหตุใดจึงต้องส่งอีเวนต์ซ้ำ ทำไมเราถึงต้องลงทะเบียนอีเวนต์เหล่านั้น EVENT_EMITTER_TOKEN และ EVENT_SUBSCRIBER_TOKEN คืออะไร และทำไมเราถึงต้องส่งออกอีเวนต์เหล่านี้


การใช้งานจริงจะชัดเจนยิ่งขึ้น ดังนั้นมาสร้างกรณีการใช้งาน - ข้อความแชท เราต้องการส่งข้อความผ่าน HTTP POST และรับข้อความผ่าน WebSocket บนส่วนหน้า


มาเริ่มกันเลย

กิจกรรมการตีพิมพ์

นี่คือโมดูลสำหรับสิ่งนั้น

 @Module({ imports: [], controllers: [], providers: [], }) export class UserChatModule {}


และเหตุการณ์ที่โมดูลนี้จะส่งออกมาหลังจากได้รับคำขอ POST


 export class NewMessageEvent { constructor(public readonly message: string) {} }


ในตัวควบคุม เราต้องทำให้สามารถส่งเหตุการณ์ทั้งสำหรับระบบของเราและคิวผับ Redis ได้ เราจะใช้ wrapped

EventEmitter2 สำหรับสิ่งนั้น


 export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN'; export class RedisEventEmitter implements EventEmitterInterface { constructor( private redisPubClient: RedisClientType, private eventEmitter: EventEmitter2, ) {} async emit(eventName: string, payload: Record<any, any>): Promise<void> { this.eventEmitter.emit(eventName, payload); if (this.isPublishableEvent(payload)) { await this.redisPubClient.publish( payload.publishableEventName, JSON.stringify(payload), ); } } private isPublishableEvent(event: any): event is PublishableEventInterface { return event.publishableEventName !== undefined; } }


แล้วเราก็สามารถใช้งานมันในคอนโทรลเลอร์ของเราได้


 @Controller('messages') export class SendMessageAction { constructor( // Previously eventEmitter2 @Inject(EVENT_EMITTER_TOKEN) private readonly eventEmitter: EventEmitterInterface, ) {} @Post() async handle(@Body() request: SendMessageHttpRequest) { await this.eventEmitter.emit( NewMessageEvent.name, new NewMessageEvent(request.content), ); } }


แต่ก่อนหน้านั้น เราจะต้องปรับปรุงกิจกรรมของเราด้วย PublishableEventInterface เพื่อให้ RedisEventEmitter จับกิจกรรมของเราและส่งออกในคิว pub ของ Redis


 export class NewMessageEvent implements PublishableEventInterface { static publishableEventName = 'events:new-message'; publishableEventName = NewMessageEvent.publishableEventName; constructor(public readonly message: string) {} }


เยี่ยมมาก ตอนนี้เรากำลังส่งกิจกรรมของเราเหมือนเคย แต่ตอนนี้ ถ้ากิจกรรมเหล่านั้นถูกทำเครื่องหมายว่าเผยแพร่ได้ กิจกรรมเหล่านั้นจะไปอยู่ในคิวเผยแพร่ของ Redis

แต่ตอนนี้ เราต้องทำให้สามารถรับเหตุการณ์เหล่านั้นบน WebSocket ได้ใช่หรือไม่

การรับเหตุการณ์

มาดูโมดูลแชทผู้ใช้ของเรากันดีกว่า


 @Module({ imports: [ RedisEventPubSubModule.registerEvents([ NewMessageEvent.publishableEventName, ]), ], controllers: [SendMessageAction], providers: [], }) export class UserChatModule {}


อย่างที่คุณเห็น เราใช้วิธีการที่กล่าวไว้ก่อนหน้านี้ - registerEvents

ด้วยวิธีดังกล่าว เราจึงแจ้ง RedisEventPubSubModule ว่าควรรับฟังเหตุการณ์ NewMessageEvent ของเราในคิว pub-sub ของ Redis ในแอตทริบิวต์ publishableEventName


ดังนั้น หากเกิดเหตุการณ์ NewMessageEvent ใดๆ เหตุการณ์นั้นจะถูกส่งกลับมาอีกครั้งเป็นเหตุการณ์ NewMessageEvent ปกติ แต่อยู่ภายใต้แอตทริบิวต์ publishableEventName


ควรกล่าวถึงว่าระบบจะทำงานได้กับอินสแตนซ์ 1 หรือ 1,000 อินสแตนซ์ ดังนั้นแม้ว่าเราจะขยายจำนวนอินสแตนซ์ให้มากขึ้น แต่ละอินสแตนซ์ก็จะรับข้อมูลนี้และส่งเหตุการณ์นี้ซ้ำภายในระบบ


ตอนนี้เรามีความสามารถในการส่งเหตุการณ์และรับฟังเหตุการณ์เหล่านั้นแล้ว ตอนนี้เราต้องส่งเหตุการณ์เหล่านั้นไปยังไคลเอนต์ WebSocket ของเรา

เว็บซ็อกเก็ตเกตเวย์

มาดู Websocket Gateway กัน


 export enum WebsocketEventSubscribeList { FETCH_EVENTS_MESSAGES = 'fetch-events-messages', EVENTS_MESSAGES_STREAM = 'events-messages-stream', } @WebSocketGateway({ pingInterval: 30000, pingTimeout: 5000, cors: { origin: '*', }, }) export class MessagesWebsocketGateway { constructor( @Inject(EVENT_SUBSCRIBER_TOKEN) private eventSubscriber: EventSubscriberInterface, ) {} @SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES) async streamMessagesData(@ConnectedSocket() client: any) { const stream$ = this.createWebsocketStreamFromEventFactory( client, this.eventSubscriber, NewMessageEvent.publishableEventName, ); const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM; return from(stream$).pipe(map((data) => ({ event, data }))); } private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


มีสิ่งหนึ่งใน constructor ที่เรามี EVENT_SUBSCRIBER_TOKEN ซึ่งมีประเภทคือ EventSubscriberInterface แต่จริงๆ แล้ว มันทำอะไรได้บ้าง? นี่คือลักษณะภายใต้ประทุน


 export class EventEmitter2EventSubscriber implements EventSubscriberInterface { constructor(private eventEmitter: EventEmitter2) {} on(name: string, listener: any): void { this.eventEmitter.on(name, listener); } off(name: string, listener: any): void { this.eventEmitter.removeListener(name, listener); } }


มันเป็นเพียงตัวห่อหุ้มสำหรับ EventEmitter2 ที่เราใช้ในวิธี createWebsocketStreamFromEventFactory


 private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }


เรากำลังใช้ EventEmitter2 ที่ห่อหุ้มนี้เพื่อสร้างตัวรับฟังแบบไดนามิกบน publishableName เมื่อไคลเอนต์ websocket เชื่อมต่อและลบออกเมื่อตัดการเชื่อมต่อ


จากนั้นเราจะไม่ทำอะไรมากกว่าการสร้างสตรีม rxjs เพื่อรักษาการเชื่อมต่อ websocket และส่งข้อความจากผู้รับฟังผ่าน observer.next(message); เมื่อมีข้อความใหม่เกิดขึ้น

กิจกรรมนี้จะเข้าถึงผู้ฟังของเราอย่างไร?


หากคุณกลับไปที่โค้ดส่วนแรกของโมดูลย่อย pub ของ Redis คุณจะสามารถระบุสิ่งนี้ได้ในวิธี registerEvents

 for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); });


ซึ่งโดยพื้นฐานแล้วทำหน้าที่รับฟังเหตุการณ์ในคิวผับ จากนั้นจึงส่งออกไปอีกครั้งผ่านตัวส่งเหตุการณ์


มาสรุปกันว่าเราทำอะไรกันตรงนี้

  • เรายังคงใช้เหตุการณ์ของเราในระบบเหมือนอย่างที่เคยใช้ผ่าน EventEmitter2 แต่ถ้าเราต้องการเผยแพร่ไปยังไคลเอนต์ websocket ที่เชื่อมต่อของเรา สิ่งที่เราต้องทำคือใช้งาน PublishableInterface

  • เราไม่ได้สร้างการเชื่อมต่อ Redis ใหม่บนไคลเอนต์ websocket ที่เชื่อมต่อแต่ละราย

  • เราขยายระบบของเราให้รองรับอินสแตนซ์ X ได้ และจะยังคงทำงานในลักษณะเดียวกัน นั่นคือ ไคลเอนต์ที่เชื่อมต่อแต่ละรายจะได้รับสำเนาของเหตุการณ์ผ่านเว็บซ็อกเก็ต โดยไม่คำนึงว่าจะเชื่อมต่อกับอินสแตนซ์ใด


โค้ดการทำงานและตัวอย่างมีอยู่ที่นี่: https://github.com/axotion/nestjs-events-websocket