Даўным-даўно я апынуўся ў сітуацыі, калі мне трэба было стварыць маштабаваную сістэму, якая магла б апрацоўваць сотні адначасовых злучэнняў па невялікай цане і з разумным, але не імгненным часам адказу.
Мае першыя думкі? Давайце перамесцім усе дзеянні стварэння/рэдагавання/выдалення ў чаргу і паведамім карыстальнікам, калі іх дзеянні ўдаліся або не праз WebSocket.
Але тады ў мяне не было вялікага досведу працы з WebSockets у вытворчасці, таму маім першым крокам было даследаванне таго, як гэта працуе з дапамогай падручнікаў, перапаўнення стэка і іншых крыніц.
Такім чынам, праз некаторы час я зразумеў, як гэта павінна працаваць, і пачаў рыхтаваць код і некаторы час важдацца з інструментам нагрузачных тэстаў для мадэлявання высокага трафіку.
У некаторых пытаннях і адказах прапаноўвалася выклікаць метад падпіскі ў асобніку Redis на падключаным кліенце WebSocket.
io.sockets.on('connection', function (sockets) { sockets.emit('message',{Hello: 'World!'}); sub.subscribe('attack-map-production'); sockets.on('disconnect', function() { sub.unsubscribe('attack-map-production'); }); });
Але такім чынам мы ствараем новае злучэнне з Redis, таму выкарыстанне памяці ў нашым дадатку і пуле злучэнняў Redis расце. (Redis дазваляе толькі 10 тысяч падключэнняў да аднаго асобніка)
Для мяне гэта было вялікім "не", таму што я павінен быў знізіць выкарыстанне памяці да мінімуму.
Зараз у многіх артыкулах, на шчасце, згадваецца, што не варта ствараць новае злучэнне Redis для кожнага кліента WebSocket.
Пасля стварэння вялікага кавалка бізнес-кода, калі я пачаў працу з вэб-сокетамі, у мяне ўзнікла пытанне - як стварыць іх правільным і бяспечным спосабам?
У мяне ўжо былі некаторыя падзеі ў сістэме, і частка з іх была гатовая да дадатковай публікацыі праз WebSockets, астатнія павінны былі заставацца ў сістэме.
Маё залатое абяцанне заключалася ў тым, што мне не давядзецца кардынальна змяняць код і пры гэтым я змагу адпраўляць толькі выбраныя падзеі кліентам websocket.
Вось чаму спачатку я стварыў модуль Redis pub-sub, бо думаў, што мае падзеі, каб быць бачнымі ў іншых выпадках, павінны перадавацца праз шаблон Redis pub-sub.
Не адчувайце сябе прыгнечаным, гледзячы на модуль ніжэй, бо я растлумачу падрабязнасці пазней у выпадку выкарыстання
export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT'; export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT'; export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS = 'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS'; @Module({ providers: [ { provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, useFactory: (options: RedisEventPubSubModuleOptions) => options, inject: [MODULE_OPTIONS_TOKEN], }, { provide: REDIS_PUB_CLIENT, useFactory: async (options: RedisEventPubSubModuleOptions) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err)); await client.connect(); return client; }, inject: [MODULE_OPTIONS_TOKEN], }, { provide: EVENT_EMITTER_TOKEN, useFactory: ( redisPubClient: RedisClientType, eventEmitter: EventEmitter2, ) => { return new RedisEventEmitter(redisPubClient, eventEmitter); }, inject: [REDIS_PUB_CLIENT, EventEmitter2], }, { provide: EVENT_SUBSCRIBER_TOKEN, useFactory: (eventEmitterSub: EventEmitter2) => { return new EventEmitter2EventSubscriber(eventEmitterSub); }, inject: [EventEmitter2], }, ], exports: [ REDIS_PUB_CLIENT, EVENT_EMITTER_TOKEN, EVENT_SUBSCRIBER_TOKEN, REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, ], }) export class RedisEventPubSubModule extends ConfigurableModuleClass { static registerEvents(eventsPublishableNames: string[]): DynamicModule { return { module: class {}, providers: [ { provide: REDIS_SUB_CLIENT, useFactory: async ( options: RedisEventPubSubModuleOptions, eventEmitter: EventEmitter2, ) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err), ); await client.connect(); for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); }); } return client; }, inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2], }, ], }; } }
Гэты модуль клапоціцца аб стварэнні/адкрыцці кліента Pub Redis і адкрыцці дадатковага метаду - registerEvents, які адказвае за праслухоўванне дадзеных падзей на Redis pub-sub і іх паўторнае выпраменьванне праз эмітэр падзей.
Пакуль можа быць невялікі туман. Навошта перадаваць падзеі? Навошта нам рэгістравацца на гэтыя мерапрыемствы? Што такое EVENT_EMITTER_TOKEN
і EVENT_SUBSCRIBER_TOKEN
і чаму мы павінны іх экспартаваць?
Гэта будзе больш зразумела пры выкарыстанні ў рэальным жыцці, таму давайце створым варыянт выкарыстання - паведамленні ў чаце. Мы хочам мець магчымасць адпраўляць паведамленні праз HTTP POST і атрымліваць іх праз WebSocket на інтэрфейсе.
Давайце пачнем
Вось модуль для гэтага
@Module({ imports: [], controllers: [], providers: [], }) export class UserChatModule {}
І падзея, якую гэты модуль будзе выдаваць пасля атрымання запыту POST
export class NewMessageEvent { constructor(public readonly message: string) {} }
У кантролеры мы павінны зрабіць магчымым выдаваць падзеі як для нашай сістэмы, так і для чаргі Redis. Мы будзем выкарыстоўваць загорнутыя
EventEmitter2 для гэтага
export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN'; export class RedisEventEmitter implements EventEmitterInterface { constructor( private redisPubClient: RedisClientType, private eventEmitter: EventEmitter2, ) {} async emit(eventName: string, payload: Record<any, any>): Promise<void> { this.eventEmitter.emit(eventName, payload); if (this.isPublishableEvent(payload)) { await this.redisPubClient.publish( payload.publishableEventName, JSON.stringify(payload), ); } } private isPublishableEvent(event: any): event is PublishableEventInterface { return event.publishableEventName !== undefined; } }
І тады мы можам выкарыстоўваць яго ў нашым кантролеры
@Controller('messages') export class SendMessageAction { constructor( // Previously eventEmitter2 @Inject(EVENT_EMITTER_TOKEN) private readonly eventEmitter: EventEmitterInterface, ) {} @Post() async handle(@Body() request: SendMessageHttpRequest) { await this.eventEmitter.emit( NewMessageEvent.name, new NewMessageEvent(request.content), ); } }
Але перад гэтым мы павінны палепшыць нашу падзею з дапамогай PublishableEventInterface, каб дазволіць RedisEventEmitter лавіць нашу падзею і выдаваць яе ў чаргу паба Redis.
export class NewMessageEvent implements PublishableEventInterface { static publishableEventName = 'events:new-message'; publishableEventName = NewMessageEvent.publishableEventName; constructor(public readonly message: string) {} }
Выдатна, цяпер мы адпраўляем нашы падзеі, як і раней, але цяпер, калі яны пазначаны як даступныя для публікацыі, яны будуць трапляць у чаргу паба Redis.
Але цяпер нам трэба зрабіць магчымым атрыманне гэтых падзей на WebSocket, праўда?
Такім чынам, давайце паглядзім на наш модуль карыстальніцкага чата
@Module({ imports: [ RedisEventPubSubModule.registerEvents([ NewMessageEvent.publishableEventName, ]), ], controllers: [SendMessageAction], providers: [], }) export class UserChatModule {}
Як бачыце, мы выкарыстоўвалі метад, згаданы раней - registerEvents.
Дзякуючы гэтаму метаду мы сказалі RedisEventPubSubModule, што ён павінен праслухоўваць нашу падзею NewMessageEvent у чарзе Redis pub-sub па атрыбуту publishableEventName.
Такім чынам, калі адбываецца якая-небудзь падзея NewMessageEvent, яна будзе перададзена як звычайная падзея NewMessageEvent, але з атрыбутам publishableEventName.
Варта адзначыць, што ён будзе працаваць на 1 экземпляр або 1000 асобнікаў. Такім чынам, нават калі мы маштабуем вялікую колькасць асобнікаў, кожны з іх атрымае гэта і паўторна выпусціць гэтую падзею ўнутры сістэмы.
Такім чынам, цяпер у нас ёсць магчымасці трансляваць падзеі і праслухоўваць іх. Цяпер нам трэба даставіць іх нашым кліентам websocket.
Давайце паглядзім на Websocket Gateway
export enum WebsocketEventSubscribeList { FETCH_EVENTS_MESSAGES = 'fetch-events-messages', EVENTS_MESSAGES_STREAM = 'events-messages-stream', } @WebSocketGateway({ pingInterval: 30000, pingTimeout: 5000, cors: { origin: '*', }, }) export class MessagesWebsocketGateway { constructor( @Inject(EVENT_SUBSCRIBER_TOKEN) private eventSubscriber: EventSubscriberInterface, ) {} @SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES) async streamMessagesData(@ConnectedSocket() client: any) { const stream$ = this.createWebsocketStreamFromEventFactory( client, this.eventSubscriber, NewMessageEvent.publishableEventName, ); const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM; return from(stream$).pipe(map((data) => ({ event, data }))); } private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }
Такім чынам, у канструктары ёсць EVENT_SUBSCRIBER_TOKEN, тып якога - EventSubscriberInterface. Але што гэта сапраўды робіць? Вось як гэта выглядае пад капотам
export class EventEmitter2EventSubscriber implements EventSubscriberInterface { constructor(private eventEmitter: EventEmitter2) {} on(name: string, listener: any): void { this.eventEmitter.on(name, listener); } off(name: string, listener: any): void { this.eventEmitter.removeListener(name, listener); } }
Гэта проста абалонка для EventEmitter2, якую мы выкарыстоўваем у метадзе createWebsocketStreamFromEventFactory
private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }
Мы выкарыстоўваем гэты абгорнуты EventEmitter2 для стварэння дынамічных слухачоў на publishableName, калі кліенты websocket падключаюцца і выдаляюцца пры адключэнні.
Затым мы робім не што іншае, як стварэнне патоку rxjs для падтрымання злучэння праз вэб-сокет і адпраўкі паведамленняў ад слухача праз observer.next(message); пры з'яўленні новага паведамлення.
Як гэтая падзея дойдзе да нашых слухачоў?
Калі вы вернецеся да першага фрагмента кода, нашага падмодуля Redis pub, вы можаце заўважыць гэта ў метадзе registerEvents
for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); });
Які ў асноўным праслухоўвае падзеі ў чарзе паба, а затым паўторна выпраменьвае іх праз эмітэр падзей.
Такім чынам, давайце падвядзем вынік таго, што мы тут зрабілі
Мы па-ранейшаму выкарыстоўваем нашы падзеі ў сістэме, як раней праз EventEmitter2, але калі мы хочам публікаваць на нашых падключаных кліентах websocket, то ўсё, што нам трэба зрабіць, гэта рэалізаваць PublishableInterface
Мы не ствараем новыя злучэнні Redis для кожнага падключанага кліента websocket
Мы можам павялічыць нашу сістэму да X асобнікаў, і яна ўсё роўна будзе паводзіць сябе такім жа чынам - кожны падлучаны кліент атрымае копію падзеі праз websocket, незалежна ад таго, да якога асобніка яны будуць падключаны
Працоўны код і прыклад даступныя тут: https://github.com/axotion/nestjs-events-websocket