เมื่อนานมาแล้ว ฉันพบว่าตัวเองอยู่ในสถานการณ์ที่จำเป็นต้องสร้างระบบที่ปรับขนาดได้ซึ่งสามารถรองรับการเชื่อมต่อพร้อมกันได้หลายร้อยรายการด้วยต้นทุนที่ไม่สูงมากนัก และมีเวลาในการตอบสนองที่สมเหตุสมผล แต่ไม่ได้รวดเร็วทันที
ความคิดแรกของฉันคืออะไร? ให้เราย้ายการดำเนินการสร้าง/แก้ไข/ลบทั้งหมดไปที่คิว และแจ้งให้ผู้ใช้ทราบหากการดำเนินการของพวกเขาประสบความสำเร็จหรือไม่ผ่าน WebSocket
แต่เมื่อก่อนนี้ ฉันไม่มีประสบการณ์กับ WebSockets ในระบบการผลิตมากนัก ดังนั้น ขั้นตอนแรกของฉันคือการสืบหาว่ามันทำงานอย่างไรด้วยความช่วยเหลือจากแบบฝึกสอน Stack Overflow และแหล่งอื่นๆ
หลังจากนั้นไม่นาน ฉันก็เข้าใจคร่าวๆ ว่ามันควรทำงานอย่างไร และเริ่มเตรียมโค้ดและทดลองใช้เครื่องมือทดสอบโหลดเพื่อจำลองปริมาณการเข้าชมสูงอยู่พักหนึ่ง
คำถามและคำตอบบางข้อแนะนำให้เรียกใช้เมธอด subscribe บนอินสแตนซ์ Redis บนไคลเอนต์ WebSocket ที่เชื่อมต่อ
io.sockets.on('connection', function (sockets) { sockets.emit('message',{Hello: 'World!'}); sub.subscribe('attack-map-production'); sockets.on('disconnect', function() { sub.unsubscribe('attack-map-production'); }); });
แต่ด้วยวิธีนี้ เราจะสร้างการเชื่อมต่อใหม่กับ Redis ดังนั้นการใช้งานหน่วยความจำในแอปพลิเคชันและพูลการเชื่อมต่อ Redis ของเราจึงเพิ่มขึ้น (Redis อนุญาตให้เชื่อมต่อกับอินสแตนซ์เดียวได้เพียง 10,000 รายการ)
นั่นเป็นสิ่งที่ไม่ดีสำหรับฉันเลย เพราะฉันต้องลดการใช้งานหน่วยความจำให้เหลือน้อยที่สุด
ในขณะนี้ บทความจำนวนมากได้กล่าวถึงว่าคุณไม่ควรสร้างการเชื่อมต่อ Redis ใหม่บนไคลเอนต์ WebSocket แต่ละตัว
หลังจากสร้างโค้ดธุรกิจจำนวนมากแล้ว เมื่อผมเริ่มทำส่วนด้วยเว็บซ็อคเก็ต คำถามก็ผุดขึ้นมาในหัวว่าจะสร้างมันขึ้นมาอย่างถูกต้องและปลอดภัยได้อย่างไร
ฉันมีเหตุการณ์บางอย่างอยู่ในระบบแล้ว และบางเหตุการณ์ก็พร้อมที่จะเผยแพร่เพิ่มเติมผ่าน WebSockets ส่วนที่เหลือจะอยู่ในระบบ
คำสัญญาทองของฉันคือฉันไม่จำเป็นต้องเปลี่ยนโค้ดอย่างมากและยังคงสามารถส่งเฉพาะเหตุการณ์ที่เลือกไปยังไคลเอนต์ WebSocket ได้
นั่นเป็นเหตุผลว่าทำไมฉันถึงสร้างโมดูล Redis pub-sub ขึ้นมาในตอนแรก เพราะว่าฉันคิดว่าเหตุการณ์ต่างๆ ของฉันจะต้องถูกส่งผ่านรูปแบบ Redis pub-sub เพื่อที่จะปรากฏในอินสแตนซ์อื่นๆ
อย่ารู้สึกเครียดเมื่อดูโมดูลด้านล่าง เนื่องจากฉันจะอธิบายรายละเอียดในกรณีการใช้งานในภายหลัง
export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT'; export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT'; export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS = 'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS'; @Module({ providers: [ { provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, useFactory: (options: RedisEventPubSubModuleOptions) => options, inject: [MODULE_OPTIONS_TOKEN], }, { provide: REDIS_PUB_CLIENT, useFactory: async (options: RedisEventPubSubModuleOptions) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err)); await client.connect(); return client; }, inject: [MODULE_OPTIONS_TOKEN], }, { provide: EVENT_EMITTER_TOKEN, useFactory: ( redisPubClient: RedisClientType, eventEmitter: EventEmitter2, ) => { return new RedisEventEmitter(redisPubClient, eventEmitter); }, inject: [REDIS_PUB_CLIENT, EventEmitter2], }, { provide: EVENT_SUBSCRIBER_TOKEN, useFactory: (eventEmitterSub: EventEmitter2) => { return new EventEmitter2EventSubscriber(eventEmitterSub); }, inject: [EventEmitter2], }, ], exports: [ REDIS_PUB_CLIENT, EVENT_EMITTER_TOKEN, EVENT_SUBSCRIBER_TOKEN, REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, ], }) export class RedisEventPubSubModule extends ConfigurableModuleClass { static registerEvents(eventsPublishableNames: string[]): DynamicModule { return { module: class {}, providers: [ { provide: REDIS_SUB_CLIENT, useFactory: async ( options: RedisEventPubSubModuleOptions, eventEmitter: EventEmitter2, ) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err), ); await client.connect(); for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); }); } return client; }, inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2], }, ], }; } }
โมดูลนี้จะดูแลการสร้าง/เปิดเผยไคลเอนต์ Pub Redis และเปิดเผยวิธีการเพิ่มเติม - registerEvents ซึ่งรับผิดชอบในการรับฟังเหตุการณ์ที่กำหนดบน Redis pub-sub และส่งออกอีกครั้งผ่านตัวปล่อยเหตุการณ์
ตอนนี้อาจจะยังดูคลุมเครืออยู่บ้าง เหตุใดจึงต้องส่งอีเวนต์ซ้ำ ทำไมเราถึงต้องลงทะเบียนอีเวนต์เหล่านั้น EVENT_EMITTER_TOKEN
และ EVENT_SUBSCRIBER_TOKEN
คืออะไร และทำไมเราถึงต้องส่งออกอีเวนต์เหล่านี้
การใช้งานจริงจะชัดเจนยิ่งขึ้น ดังนั้นมาสร้างกรณีการใช้งาน - ข้อความแชท เราต้องการส่งข้อความผ่าน HTTP POST และรับข้อความผ่าน WebSocket บนส่วนหน้า
มาเริ่มกันเลย
นี่คือโมดูลสำหรับสิ่งนั้น
@Module({ imports: [], controllers: [], providers: [], }) export class UserChatModule {}
และเหตุการณ์ที่โมดูลนี้จะส่งออกมาหลังจากได้รับคำขอ POST
export class NewMessageEvent { constructor(public readonly message: string) {} }
ในตัวควบคุม เราต้องทำให้สามารถส่งเหตุการณ์ทั้งสำหรับระบบของเราและคิวผับ Redis ได้ เราจะใช้ wrapped
EventEmitter2 สำหรับสิ่งนั้น
export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN'; export class RedisEventEmitter implements EventEmitterInterface { constructor( private redisPubClient: RedisClientType, private eventEmitter: EventEmitter2, ) {} async emit(eventName: string, payload: Record<any, any>): Promise<void> { this.eventEmitter.emit(eventName, payload); if (this.isPublishableEvent(payload)) { await this.redisPubClient.publish( payload.publishableEventName, JSON.stringify(payload), ); } } private isPublishableEvent(event: any): event is PublishableEventInterface { return event.publishableEventName !== undefined; } }
แล้วเราก็สามารถใช้งานมันในคอนโทรลเลอร์ของเราได้
@Controller('messages') export class SendMessageAction { constructor( // Previously eventEmitter2 @Inject(EVENT_EMITTER_TOKEN) private readonly eventEmitter: EventEmitterInterface, ) {} @Post() async handle(@Body() request: SendMessageHttpRequest) { await this.eventEmitter.emit( NewMessageEvent.name, new NewMessageEvent(request.content), ); } }
แต่ก่อนหน้านั้น เราจะต้องปรับปรุงกิจกรรมของเราด้วย PublishableEventInterface เพื่อให้ RedisEventEmitter จับกิจกรรมของเราและส่งออกในคิว pub ของ Redis
export class NewMessageEvent implements PublishableEventInterface { static publishableEventName = 'events:new-message'; publishableEventName = NewMessageEvent.publishableEventName; constructor(public readonly message: string) {} }
เยี่ยมมาก ตอนนี้เรากำลังส่งกิจกรรมของเราเหมือนเคย แต่ตอนนี้ ถ้ากิจกรรมเหล่านั้นถูกทำเครื่องหมายว่าเผยแพร่ได้ กิจกรรมเหล่านั้นจะไปอยู่ในคิวเผยแพร่ของ Redis
แต่ตอนนี้ เราต้องทำให้สามารถรับเหตุการณ์เหล่านั้นบน WebSocket ได้ใช่หรือไม่
มาดูโมดูลแชทผู้ใช้ของเรากันดีกว่า
@Module({ imports: [ RedisEventPubSubModule.registerEvents([ NewMessageEvent.publishableEventName, ]), ], controllers: [SendMessageAction], providers: [], }) export class UserChatModule {}
อย่างที่คุณเห็น เราใช้วิธีการที่กล่าวไว้ก่อนหน้านี้ - registerEvents
ด้วยวิธีดังกล่าว เราจึงแจ้ง RedisEventPubSubModule ว่าควรรับฟังเหตุการณ์ NewMessageEvent ของเราในคิว pub-sub ของ Redis ในแอตทริบิวต์ publishableEventName
ดังนั้น หากเกิดเหตุการณ์ NewMessageEvent ใดๆ เหตุการณ์นั้นจะถูกส่งกลับมาอีกครั้งเป็นเหตุการณ์ NewMessageEvent ปกติ แต่อยู่ภายใต้แอตทริบิวต์ publishableEventName
ควรกล่าวถึงว่าระบบจะทำงานได้กับอินสแตนซ์ 1 หรือ 1,000 อินสแตนซ์ ดังนั้นแม้ว่าเราจะขยายจำนวนอินสแตนซ์ให้มากขึ้น แต่ละอินสแตนซ์ก็จะรับข้อมูลนี้และส่งเหตุการณ์นี้ซ้ำภายในระบบ
ตอนนี้เรามีความสามารถในการส่งเหตุการณ์และรับฟังเหตุการณ์เหล่านั้นแล้ว ตอนนี้เราต้องส่งเหตุการณ์เหล่านั้นไปยังไคลเอนต์ WebSocket ของเรา
มาดู Websocket Gateway กัน
export enum WebsocketEventSubscribeList { FETCH_EVENTS_MESSAGES = 'fetch-events-messages', EVENTS_MESSAGES_STREAM = 'events-messages-stream', } @WebSocketGateway({ pingInterval: 30000, pingTimeout: 5000, cors: { origin: '*', }, }) export class MessagesWebsocketGateway { constructor( @Inject(EVENT_SUBSCRIBER_TOKEN) private eventSubscriber: EventSubscriberInterface, ) {} @SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES) async streamMessagesData(@ConnectedSocket() client: any) { const stream$ = this.createWebsocketStreamFromEventFactory( client, this.eventSubscriber, NewMessageEvent.publishableEventName, ); const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM; return from(stream$).pipe(map((data) => ({ event, data }))); } private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }
มีสิ่งหนึ่งใน constructor ที่เรามี EVENT_SUBSCRIBER_TOKEN ซึ่งมีประเภทคือ EventSubscriberInterface แต่จริงๆ แล้ว มันทำอะไรได้บ้าง? นี่คือลักษณะภายใต้ประทุน
export class EventEmitter2EventSubscriber implements EventSubscriberInterface { constructor(private eventEmitter: EventEmitter2) {} on(name: string, listener: any): void { this.eventEmitter.on(name, listener); } off(name: string, listener: any): void { this.eventEmitter.removeListener(name, listener); } }
มันเป็นเพียงตัวห่อหุ้มสำหรับ EventEmitter2 ที่เราใช้ในวิธี createWebsocketStreamFromEventFactory
private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }
เรากำลังใช้ EventEmitter2 ที่ห่อหุ้มนี้เพื่อสร้างตัวรับฟังแบบไดนามิกบน publishableName เมื่อไคลเอนต์ websocket เชื่อมต่อและลบออกเมื่อตัดการเชื่อมต่อ
จากนั้นเราจะไม่ทำอะไรมากกว่าการสร้างสตรีม rxjs เพื่อรักษาการเชื่อมต่อ websocket และส่งข้อความจากผู้รับฟังผ่าน observer.next(message); เมื่อมีข้อความใหม่เกิดขึ้น
กิจกรรมนี้จะเข้าถึงผู้ฟังของเราอย่างไร?
หากคุณกลับไปที่โค้ดส่วนแรกของโมดูลย่อย pub ของ Redis คุณจะสามารถระบุสิ่งนี้ได้ในวิธี registerEvents
for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); });
ซึ่งโดยพื้นฐานแล้วทำหน้าที่รับฟังเหตุการณ์ในคิวผับ จากนั้นจึงส่งออกไปอีกครั้งผ่านตัวส่งเหตุการณ์
มาสรุปกันว่าเราทำอะไรกันตรงนี้
เรายังคงใช้เหตุการณ์ของเราในระบบเหมือนอย่างที่เคยใช้ผ่าน EventEmitter2 แต่ถ้าเราต้องการเผยแพร่ไปยังไคลเอนต์ websocket ที่เชื่อมต่อของเรา สิ่งที่เราต้องทำคือใช้งาน PublishableInterface
เราไม่ได้สร้างการเชื่อมต่อ Redis ใหม่บนไคลเอนต์ websocket ที่เชื่อมต่อแต่ละราย
เราขยายระบบของเราให้รองรับอินสแตนซ์ X ได้ และจะยังคงทำงานในลักษณะเดียวกัน นั่นคือ ไคลเอนต์ที่เชื่อมต่อแต่ละรายจะได้รับสำเนาของเหตุการณ์ผ่านเว็บซ็อกเก็ต โดยไม่คำนึงว่าจะเชื่อมต่อกับอินสแตนซ์ใด
โค้ดการทำงานและตัวอย่างมีอยู่ที่นี่: https://github.com/axotion/nestjs-events-websocket