やあ👋
私はAll Quietの創設者、Mads Quist です。私たちはMongoDBに基づいて独自のメッセージ キューを実装しました。ここでは次のことについてお話します。
All Quiet は、PagerDuty に似た最新のインシデント管理プラットフォームです。
私たちのプラットフォームには次のような機能が必要です。
私たちの特定の要件を理解するには、技術スタックについて洞察を得ることが重要です。
最終的には、インフラストラクチャ内の可動部品の数を最小限に抑えることが重要です。私たちは優れた顧客のために素晴らしい機能を構築することを目指しており、サービスを確実に維持することが不可欠です。単一のデータベース システムを管理してファイブナインを超える稼働時間を達成することは、十分に困難です。では、なぜ追加の HA RabbitMQ クラスターの管理に負担がかかるのでしょうか?
そうですね…AWS SQS、Google Cloud Tasks、Azure Queue Storage などのクラウド ソリューションは素晴らしいです。ただし、ベンダーロックインが発生する可能性があります。私たちは、クライアントにスケーラブルなサービスを提供しながら、独立性とコスト効率に優れることを目指しています。
メッセージ キューは、メッセージを保存するシステムです。メッセージのプロデューサはこれらをキューに保存し、後でコンシューマによって処理のためにデキューされます。これは、特にメッセージの処理がリソースを大量に消費するタスクである場合に、コンポーネントを分離する場合に非常に有益です。
MongoDB は長年にわたって大幅に進化しており、上記の基準を満たすことができます。
次のセクションでは、MongoDB 固有のメッセージ キューの実装について説明します。 NodeJS、Go、All Quiet の場合は C# など、好みのプログラミング言語に適したクライアント ライブラリが必要ですが、ここで共有する概念はプラットフォームに依存しません。
利用したい各キューは、MongoDB データベース内の専用のコレクションとして表されます。
処理されたメッセージの例を次に示します。
{ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Processed", "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"), "NextReevaluation" : null }, { "Status" : "Processing", "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"), "NextReevaluation" : null }, { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } }
メッセージの各プロパティを見てみましょう。
_id
フィールドは、MongoDB の正規の一意識別子プロパティです。ここでは、 ObjectIdではなくNumberLong
含まれています。次の理由から、 ObjectId
の代わりにNumberLong
が必要です。
ObjectId値は時間の経過とともに増加しますが、必ずしも単調であるとは限りません。その理由は次のとおりです。
- 時間解像度は 1 秒のみ含まれるため、同じ秒内に作成された ObjectId 値の順序は保証されません。
- クライアントによって生成されるため、システム クロックが異なる場合があります。
C# 実装では、ミリ秒の精度で ID を生成し、挿入時間に基づいて順序付けを保証します。マルチコンシューマ環境 (RabbitMQ と同様) では厳密な処理順序は必要ありませんが、コンシューマが 1 つだけで動作する場合は FIFO 順序を維持することが重要です。 ObjectId を使用してこれを実現することは現実的ではありません。これが重要でない場合でも、ObjectId を使用できます。
Statuses プロパティは、メッセージ処理履歴を含む配列で構成されます。インデックス 0 には、インデックス作成にとって重要な現在のステータスが表示されます。
ステータス オブジェクト自体には、次の 3 つのプロパティが含まれています。
Status
: 「エンキュー済み」、「処理中」、「処理済み」、または「失敗」のいずれかです。Timestamp
: 現在のタイムスタンプを取得します。NextReevaluation
: 次の評価がいつ行われるかを記録します。これは、再試行と将来のスケジュールされた実行の両方に不可欠です。
このプロパティには、メッセージの特定のペイロードが含まれます。
メッセージの追加は、ステータスが「エンキュー済み」に設定されたコレクションへの簡単な挿入操作です。
NextReevaluation
をnull
に設定します。NextReevaluation
将来のタイムスタンプに設定します。 db.yourQueueCollection.insert({ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } });
デキューは少し複雑ですが、それでも比較的簡単です。 MongoDB の同時アトミック読み取りおよび更新機能に大きく依存しています。
MongoDB のこの重要な機能により、次のことが保証されます。
db.yourQueueCollection.findAndModify({ "query": { "$and": [ { "Statuses.0.Status": "Enqueued" }, { "Statuses.0.NextReevaluation": null } ] }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
したがって、状態「Enqueued」にある 1 つのメッセージを読み取り、同時に位置 0 にステータス「Processing」を設定することでメッセージを変更します。この操作はアトミックであるため、メッセージが別のコンシューマによって取得されないことが保証されます。 。
メッセージの処理が完了したら、メッセージの ID を使用してメッセージのステータスを「処理済み」に更新するだけです。
db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
処理が失敗した場合は、それに応じてメッセージにマークを付ける必要があります。多くの場合、メッセージの処理を再試行することが必要になる場合があります。これは、メッセージを再度キューに入れることで実現できます。多くのシナリオでは、処理エラーの性質に応じて、10 秒などの特定の遅延の後にメッセージを再処理することが合理的です。
db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Failed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000") } ], "$position": 0 } } } });
私たちは、実際には単なる MongoDB コレクションである「キュー」に対して項目を簡単にエンキューしたり、そこからデキューしたりする方法を確立しました。 NextReevaluation
フィールドを利用して、将来のメッセージを「スケジュール」することもできます。
欠けているのは、定期的にデキューする方法です。コンシューマは、ある種のループでfindAndModify
コマンドを実行する必要があります。簡単なアプローチは、メッセージをデキューして処理する無限ループを作成することです。この方法は簡単で効果的です。ただし、データベースとネットワークにかなりの負荷がかかります。
別の方法は、ループの反復間に遅延 (たとえば、100ms) を導入することです。これにより負荷は大幅に軽減されますが、デキューの速度も低下します。
この問題の解決策は、MongoDB が変更ストリームと呼ぶものです。
変更ストリームとは何ですか? MongoDB の担当者よりもうまく説明することはできません。
変更ストリームにより、アプリケーションはリアルタイムのデータ変更にアクセスできるようになります […]。アプリケーションは変更ストリームを使用して、単一のコレクション […] のすべてのデータ変更をサブスクライブし、それらに即座に対応できます。
素晴らしい!私たちができることは、キュー コレクション内で新しく作成されたドキュメントをリッスンすることです。これは、事実上、新たにキューに入れられたメッセージをリッスンすることを意味します。
これは非常に単純です:
const changeStream = db.yourQueueCollection.watch(); changeStream.on('insert', changeEvent => { // Dequeue the message db.yourQueueCollection.findAndModify({ "query": changeEvent.documentKey._id, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
ただし、変更ストリームのアプローチは、スケジュールされたメッセージと孤立したメッセージの両方に対しては機能しません。これは、明らかにリッスンできる変更がないためです。
これらのユースケースでは、単純なループに戻る必要があります。ただし、反復間にかなり余裕のある遅延を使用することもできます。
MySQL 、 PostgreSQL 、 MongoDB (私はこれも従来型だと考えています) などの「従来型」データベースは、今日では信じられないほど強力です。正しく使用すれば (インデックスが最適化されていることを確認してください!)、従来のホスティング プラットフォームでは迅速で、驚くほど拡張でき、コスト効率が高くなります。
多くのユースケースは、データベースと好みのプログラミング言語だけを使用して対処できます。 「適切な仕事に適したツール」、つまり Redis、Elasticsearch、RabbitMQ などの多様なツールのセットを維持する必要は必ずしも必要ありません。多くの場合、メンテナンスのオーバーヘッドはそれだけの価値がありません。
提案されたソリューションは、たとえば RabbitMQ のパフォーマンスには及ばないかもしれませんが、通常は十分であり、スタートアップにとって大きな成功をもたらすポイントまで拡張できます。
ソフトウェア エンジニアリングでは、トレードオフを回避することが重要です。賢明にお選びください。