かなり昔、私は、それほどコストをかけずに、適度な、しかし即時ではない応答時間で、数百の同時接続を処理できるスケーラブルなシステムを作成しなければならない状況にありました。
私の最初の考えは、すべての作成/編集/削除アクションをキューに移動し、アクションが成功したかどうかを WebSocket 経由でユーザーに通知することです。
しかし当時、私は本番環境での WebSocket の経験があまりなかったので、チュートリアル、スタック オーバーフロー、その他の情報源を参考にして、WebSocket がどのように機能するかを調べるのが最初のステップでした。
そこで、しばらくして、どのように動作するかの要点がわかり、コードを準備し、高トラフィックをシミュレートする負荷テスト ツールを使用してしばらく試行錯誤し始めました。
いくつかの質問と回答では、接続された WebSocket クライアントの Redis インスタンスで subscribe メソッドを呼び出すことが提案されていました。
io.sockets.on('connection', function (sockets) { sockets.emit('message',{Hello: 'World!'}); sub.subscribe('attack-map-production'); sockets.on('disconnect', function() { sub.unsubscribe('attack-map-production'); }); });
しかし、この方法では Redis への新しい接続が作成されるため、アプリケーションと Redis 接続プールのメモリ使用量が増加します。(Redis では 1 つのインスタンスに対して 10,000 の接続しか許可されません)
メモリ使用量を最小限に抑える必要があったため、これは私にとっては大きな問題でした。
幸いなことに、現在多くの記事で、各 WebSocket クライアントで新しい Redis 接続を作成してはいけないと述べられています。
大量のビジネス コードを作成した後、Web ソケットの部分を開始したとき、適切かつ安全な方法で Web ソケットを作成するにはどうすればよいかという疑問が頭に浮かびました。
システム内にはすでにいくつかのイベントがあり、そのうちのいくつかは WebSocket 経由で追加公開する準備ができていましたが、残りはシステム内に留まる予定でした。
私の最大の約束は、コードを大幅に変更する必要がなく、選択したイベントのみを Websocket クライアントに送信できるというものでした。
そのため、最初に Redis pub-sub モジュールを作成しました。イベントを他のインスタンスで表示するには、Redis pub-sub パターンを介して送信する必要があると考えたからです。
以下のモジュールを見ても圧倒されることはありません。詳細は後ほどユースケースで説明します。
export const REDIS_PUB_CLIENT = 'REDIS_PUB_CLIENT'; export const REDIS_SUB_CLIENT = 'REDIS_SUB_CLIENT'; export const REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS = 'REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS'; @Module({ providers: [ { provide: REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, useFactory: (options: RedisEventPubSubModuleOptions) => options, inject: [MODULE_OPTIONS_TOKEN], }, { provide: REDIS_PUB_CLIENT, useFactory: async (options: RedisEventPubSubModuleOptions) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err)); await client.connect(); return client; }, inject: [MODULE_OPTIONS_TOKEN], }, { provide: EVENT_EMITTER_TOKEN, useFactory: ( redisPubClient: RedisClientType, eventEmitter: EventEmitter2, ) => { return new RedisEventEmitter(redisPubClient, eventEmitter); }, inject: [REDIS_PUB_CLIENT, EventEmitter2], }, { provide: EVENT_SUBSCRIBER_TOKEN, useFactory: (eventEmitterSub: EventEmitter2) => { return new EventEmitter2EventSubscriber(eventEmitterSub); }, inject: [EventEmitter2], }, ], exports: [ REDIS_PUB_CLIENT, EVENT_EMITTER_TOKEN, EVENT_SUBSCRIBER_TOKEN, REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, ], }) export class RedisEventPubSubModule extends ConfigurableModuleClass { static registerEvents(eventsPublishableNames: string[]): DynamicModule { return { module: class {}, providers: [ { provide: REDIS_SUB_CLIENT, useFactory: async ( options: RedisEventPubSubModuleOptions, eventEmitter: EventEmitter2, ) => { const client = createClient({ url: `redis://${options.host}:${options.port}`, }); client.on('error', (err) => console.error('Redis Client Error', err), ); await client.connect(); for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); }); } return client; }, inject: [REDIS_EVENT_PUB_SUB_REGISTER_EVENT_OPTIONS, EventEmitter2], }, ], }; } }
このモジュールは、Pub Redis クライアントの作成/公開と、Redis pub-sub 上の特定のイベントをリッスンし、イベント エミッターを介して再送信する役割を担う追加メソッド registerEvents の公開を担当します。
今のところ、少し不明瞭かもしれません。なぜイベントを再発行するのでしょうか? なぜそれらのイベントを登録する必要があるのでしょうか? EVENT_EMITTER_TOKEN
とEVENT_SUBSCRIBER_TOKEN
とは何ですか? また、なぜそれらをエクスポートする必要があるのでしょうか?
実際の使用例を見るとより明確になりますので、ユースケースとしてチャット メッセージを作成しましょう。HTTP POST 経由でメッセージを送信し、フロントエンドで WebSocket 経由で受信できるようにします。
始めましょう
これがそのためのモジュールです
@Module({ imports: [], controllers: [], providers: [], }) export class UserChatModule {}
そして、このモジュールがPOSTリクエストを受け取った後に発行するイベント
export class NewMessageEvent { constructor(public readonly message: string) {} }
コントローラでは、システムとRedisパブキューの両方にイベントを発行できるようにする必要があります。ラップされた
EventEmitter2 はそのためにある
export const EVENT_EMITTER_TOKEN = 'EVENT_EMITTER_TOKEN'; export class RedisEventEmitter implements EventEmitterInterface { constructor( private redisPubClient: RedisClientType, private eventEmitter: EventEmitter2, ) {} async emit(eventName: string, payload: Record<any, any>): Promise<void> { this.eventEmitter.emit(eventName, payload); if (this.isPublishableEvent(payload)) { await this.redisPubClient.publish( payload.publishableEventName, JSON.stringify(payload), ); } } private isPublishableEvent(event: any): event is PublishableEventInterface { return event.publishableEventName !== undefined; } }
そして、それをコントローラーで使うことができます
@Controller('messages') export class SendMessageAction { constructor( // Previously eventEmitter2 @Inject(EVENT_EMITTER_TOKEN) private readonly eventEmitter: EventEmitterInterface, ) {} @Post() async handle(@Body() request: SendMessageHttpRequest) { await this.eventEmitter.emit( NewMessageEvent.name, new NewMessageEvent(request.content), ); } }
しかしその前に、RedisEventEmitter がイベントをキャッチして Redis pub キューに発行できるように、PublishableEventInterface を使用してイベントを拡張する必要があります。
export class NewMessageEvent implements PublishableEventInterface { static publishableEventName = 'events:new-message'; publishableEventName = NewMessageEvent.publishableEventName; constructor(public readonly message: string) {} }
素晴らしいです。以前と同じようにイベントを送信できるようになりました。ただし、イベントが公開可能としてマークされている場合は、Redis の公開キューに格納されます。
しかし、今度は、それらのイベントを WebSocket で受信できるようにする必要がありますね?
それでは、ユーザーチャットモジュールを見てみましょう
@Module({ imports: [ RedisEventPubSubModule.registerEvents([ NewMessageEvent.publishableEventName, ]), ], controllers: [SendMessageAction], providers: [], }) export class UserChatModule {}
ご覧のとおり、前述のメソッド registerEvents を使用しました。
このメソッドのおかげで、RedisEventPubSubModule に、publishableEventName 属性の Redis pub-sub キューで NewMessageEvent イベントをリッスンするように指示しました。
したがって、NewMessageEvent イベントが発生した場合、それは通常の NewMessageEvent イベントとして、ただし publishableEventName 属性の下で再発行されます。
言及する価値があるのは、これは 1 つのインスタンスでも 1,000 のインスタンスでも機能するということです。したがって、インスタンスの数を増やした場合でも、各インスタンスはこれを受信し、システム内でこのイベントを再発行します。
これで、イベントを発行してリッスンする機能ができました。次に、イベントを Websocket クライアントに配信する必要があります。
Websocket Gatewayを見てみましょう
export enum WebsocketEventSubscribeList { FETCH_EVENTS_MESSAGES = 'fetch-events-messages', EVENTS_MESSAGES_STREAM = 'events-messages-stream', } @WebSocketGateway({ pingInterval: 30000, pingTimeout: 5000, cors: { origin: '*', }, }) export class MessagesWebsocketGateway { constructor( @Inject(EVENT_SUBSCRIBER_TOKEN) private eventSubscriber: EventSubscriberInterface, ) {} @SubscribeMessage(WebsocketEventSubscribeList.FETCH_EVENTS_MESSAGES) async streamMessagesData(@ConnectedSocket() client: any) { const stream$ = this.createWebsocketStreamFromEventFactory( client, this.eventSubscriber, NewMessageEvent.publishableEventName, ); const event = WebsocketEventSubscribeList.EVENTS_MESSAGES_STREAM; return from(stream$).pipe(map((data) => ({ event, data }))); } private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }
コンストラクタには、EventSubscriberInterface型のEVENT_SUBSCRIBER_TOKENがあります。しかし、これは実際に何をするのでしょうか?内部的にはこんな感じです。
export class EventEmitter2EventSubscriber implements EventSubscriberInterface { constructor(private eventEmitter: EventEmitter2) {} on(name: string, listener: any): void { this.eventEmitter.on(name, listener); } off(name: string, listener: any): void { this.eventEmitter.removeListener(name, listener); } }
これは、createWebsocketStreamFromEventFactoryメソッドで使用しているEventEmitter2のラッパーです。
private createWebsocketStreamFromEventFactory( client: any, eventSubscriber: EventSubscriberInterface, eventName: string, ): Observable<any> { return new Observable((observer) => { const dynamicListener = (message: PublishableEventInterface) => { observer.next(message); }; eventSubscriber.on(eventName, dynamicListener); client.on('disconnect', () => { eventSubscriber.off(eventName, dynamicListener); }); }); } }
このラップされた EventEmitter2 を使用して、Websocket クライアントが接続するときに publishableName に動的リスナーを作成し、切断時に削除します。
次に、Websocket 接続を維持し、新しいメッセージが発生したときに observer.next(message); を介してリスナーからメッセージを送信するための rxjs ストリームを作成するだけです。
このイベントはリスナーにどのように伝わるのでしょうか?
最初のコードスニペット、Redis pub subモジュールに戻ると、registerEventsメソッドでこれを見つけることができます。
for (const eventPublishableName of eventsPublishableNames) { await client.subscribe(eventPublishableName, (message) => { const normalizedMessage = JSON.parse( message, ) as PublishableEventInterface; delete ( normalizedMessage as Writeable<PublishableEventInterface> ).publishableEventName; eventEmitter.emit(eventPublishableName, normalizedMessage); });
基本的には、パブ キュー上のイベントをリッスンし、イベント エミッターを介してイベントを再送信します。
では、ここで何をしたかをまとめてみましょう
以前と同じようにEventEmitter2経由でシステム内のイベントを使用していますが、接続されたWebSocketクライアントに公開したい場合は、PublishableInterfaceを実装するだけです。
接続された各WebSocketクライアントに新しいRedis接続を作成することはありません
システムをXインスタンスにスケールアップしても、同じように動作します。接続された各クライアントは、どのインスタンスに接続されても、WebSocket経由でイベントのコピーを取得します。
実際のコードと例は、こちらから入手できます: https://github.com/axotion/nestjs-events-websocket