Hey👋
I'm Mads Quist, founder of All Quiet. We've implemented a home-grown message queue based on MongoDB, and I'm here to talk about how and why we built it.
All Quiet is a modern incident management platform, similar to PagerDuty.
Our platform requires features like:
To understand our specific requirements, it's important to get some insights into our tech stack:
Ultimately, it's about minimizing the number of moving parts in your infrastructure. We aim to build fantastic features for our excellent customers, and it's imperative to maintain our services reliably. Managing a single database system to achieve more than five nines of uptime is challenging enough. So why burden yourself with managing an additional HA RabbitMQ cluster?
Yeah… cloud solutions like AWS SQS, Google Cloud Tasks, or Azure Queue Storage are fantastic! However, they would have resulted in vendor lock-in. We simply aspire to be independent and cost-effective while still providing a scalable service to our clients.
A message queue is a system that stores messages. Producers of messages store these in the queue, which are later dequeued by consumers for processing. This is incredibly beneficial for decoupling components, especially when processing messages is a resource-intensive task.
MongoDB has significantly evolved over the years and can meet the criteria listed above.
In the sections that follow, I'll guide you through the MongoDB-specific implementation of our message queue. While you'll need a client library suitable for your preferred programming language, such as NodeJS, Go, or C# in the case of All Quiet, the concepts I'll share are platform agnostic.
Each queue you want to utilize is represented as a dedicated collection in your MongoDB database.
Here's an example of a processed message:
{
"_id" : NumberLong(638269014234217933),
"Statuses" : [
{
"Status" : "Processed",
"Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"),
"NextReevaluation" : null
},
{
"Status" : "Processing",
"Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"),
"NextReevaluation" : null
},
{
"Status" : "Enqueued",
"Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"),
"NextReevaluation" : null
}
],
"Payload" : {
"YourData" : "abc123"
}
}
Let’s look at each property of the message.
The _id field is the canonical unique identifier property of MongoDB. Here, it contains a NumberLong, not an ObjectId. We need NumberLong instead of ObjectId because:
While ObjectId values should increase over time, they are not necessarily monotonic. This is because they:
- Only contain one second of temporal resolution, so ObjectId values created within the same second do not have a guaranteed ordering, and
- Are generated by clients, which may have differing system clocks.
In our C# implementation, we generate an Id with millisecond precision and guaranteed ordering based on insertion time. Although we don't require strict processing order in a multi-consumer environment (similar to RabbitMQ), it's essential to maintain FIFO order when operating with just one consumer. Achieving this with ObjectId is not feasible. If this isn't crucial for you, you can still use ObjectId.
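Our C# generator isn't reproduced in this post, but the idea can be sketched in JavaScript (the function name and bit layout here are illustrative assumptions, not All Quiet's actual code): take the current millisecond timestamp, shift it left to leave room for a counter, and never hand out a value smaller than the previous one.

```javascript
// Sketch of a monotonic, millisecond-precision ID generator.
// Hypothetical illustration -- not All Quiet's actual C# implementation.
let lastId = 0n;

function nextId() {
  // 64-bit value: milliseconds since epoch, shifted left 20 bits,
  // leaving room for roughly a million IDs per millisecond.
  const candidate = BigInt(Date.now()) << 20n;
  // If we already handed out an ID in this millisecond (or the clock
  // went backwards), bump the previous ID instead to keep strict ordering.
  lastId = candidate > lastId ? candidate : lastId + 1n;
  return lastId;
}
```

Note that this only guarantees ordering within a single producer process; with multiple producers you would need a central ID source or accept approximate ordering.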
The Statuses property consists of an array containing the message processing history. At index 0, you'll find the current status, which is crucial for indexing.
The status object itself contains three properties:
- Status: Can be "Enqueued", "Processing", "Processed", or "Failed".
- Timestamp: The time at which the status was set.
- NextReevaluation: Records when the next evaluation should occur, which is essential for both retries and future scheduled executions.
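Because dequeuing always filters on the element at position 0, it's worth creating a compound index on those paths. This exact index isn't shown in the article, so treat it as a suggested starting point:

```javascript
db.yourQueueCollection.createIndex({
    "Statuses.0.Status": 1,
    "Statuses.0.NextReevaluation": 1
});
```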
The Payload property contains the specific payload of your message.
Adding a message is a straightforward insert operation into the collection, with the status set to "Enqueued".
- For a message that should be processed immediately, set NextReevaluation to null.
- For a message scheduled for the future, set NextReevaluation to a timestamp in the future, when you want your message to be processed.

db.yourQueueCollection.insertOne({
"_id" : NumberLong(638269014234217933),
"Statuses" : [
{
"Status" : "Enqueued",
"Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"),
"NextReevaluation" : null
}
],
"Payload" : {
"YourData" : "abc123"
}
});
Dequeuing is slightly more complex but still relatively straightforward. It relies heavily on MongoDB's atomic read-and-update capability: findAndModify reads a matching document and updates it in a single step, guaranteeing that each message is picked up by exactly one consumer:
db.yourQueueCollection.findAndModify({
"query": {
"$and": [
{
"Statuses.0.Status": "Enqueued"
},
{
"Statuses.0.NextReevaluation": null
}
]
},
"update": {
"$push": {
"Statuses": {
"$each": [
{
"Status": "Processing",
"Timestamp": ISODate("2023-08-06T06:50:23.800+0000"),
"NextReevaluation": null
}
],
"$position": 0
}
}
}
});
So we read one message in the state "Enqueued" and, at the same time, modify it by setting the status "Processing" at position 0. Since this operation is atomic, it guarantees that the message will not be picked up by another consumer.
Once the processing of the message is complete, it's a simple matter of updating the message status to "Processed" using the message’s id.
db.yourQueueCollection.findAndModify({
"query": {
"_id": NumberLong(638269014234217933)
},
"update": {
"$push": {
"Statuses": {
"$each": [
{
"Status": "Processed",
"Timestamp": ISODate("2023-08-06T06:50:24.100+0000"),
"NextReevaluation": null
}
],
"$position": 0
}
}
}
});
If processing fails, we need to mark the message accordingly. Often, you might want to retry processing the message. This can be achieved by re-enqueuing the message. In many scenarios, it makes sense to reprocess the message after a specific delay, such as 10 seconds, depending on the nature of the processing failure.
db.yourQueueCollection.findAndModify({
"query": {
"_id": NumberLong(638269014234217933)
},
"update": {
"$push": {
"Statuses": {
"$each": [
{
"Status": "Failed",
"Timestamp": ISODate("2023-08-06T06:50:24.100+0000"),
"NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000")
}
],
"$position": 0
}
}
}
});
We've established how we can easily enqueue and dequeue items from our "queue," which is, in fact, simply a MongoDB collection. We can even "schedule" messages for the future by leveraging the NextReevaluation field.
What's missing is how we dequeue regularly. Consumers need to execute the findAndModify command in some kind of loop. A naive approach would be an endless loop in which we dequeue and process one message after another. This is simple and effective, but it exerts considerable pressure on the database and the network.
An alternative would be to introduce a delay, e.g., 100ms, between loop iterations. This will significantly reduce the load but will also decrease the speed of dequeuing.
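Sketched in JavaScript (pollLoop, dequeue, and process are my placeholder names, not from the article), such a delayed loop might look like this:

```javascript
// Minimal polling-loop sketch: drain the queue, then back off when idle.
// `dequeue` and `process` are placeholders for your own functions;
// `dequeue` is expected to return null when no message is available.
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

async function pollLoop(dequeue, process, { delayMs = 100, iterations = Infinity } = {}) {
  for (let i = 0; i < iterations; i++) {
    const message = await dequeue();  // findAndModify under the hood
    if (message !== null) {
      await process(message);         // busy: keep draining, no delay
    } else {
      await sleep(delayMs);           // idle: back off to spare the database
    }
  }
}
```

Sleeping only when the queue is empty keeps latency low under load while still reducing pressure on the database when idle.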
The solution to the problem is what MongoDB refers to as a change stream.
What are change streams? I can’t explain it better than the guys at MongoDB:
Change streams allow applications to access real-time data changes […]. Applications can use change streams to subscribe to all data changes on a single collection […] and immediately react to them.
Great! What we can do is listen to newly created documents in our queue collection, which effectively means listening to newly enqueued messages.
This is dead simple:
const changeStream = db.yourQueueCollection.watch();
changeStream.on('change', changeEvent => {
    if (changeEvent.operationType !== 'insert') return;
    // Dequeue the newly inserted message; the status guard ensures that
    // only one of the consumers receiving this event wins the message.
    db.yourQueueCollection.findAndModify({
        "query": {
            "_id": changeEvent.documentKey._id,
            "Statuses.0.Status": "Enqueued"
        },
        "update": {
            "$push": {
                "Statuses": {
                    "$each": [
                        {
                            "Status": "Processing",
                            "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"),
                            "NextReevaluation": null
                        }
                    ],
                    "$position": 0
                }
            }
        }
    });
});
The change stream approach, however, does not work for scheduled or orphaned messages, because in those cases there is no change to listen to.
For these use cases, we need to revert to our simple loop. However, we can use a rather generous delay between iterations.
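Such a reevaluation loop could run two queries in the familiar style: one that activates messages whose NextReevaluation time has passed, and one that reclaims messages stuck in "Processing". Both the $in status filter and the 10-minute orphan timeout below are assumptions for illustration, not values from our production system:

```javascript
// Scheduled or retried messages whose reevaluation time has arrived:
db.yourQueueCollection.findAndModify({
    "query": {
        "Statuses.0.Status": { "$in": ["Enqueued", "Failed"] },
        "Statuses.0.NextReevaluation": { "$lte": new Date() }
    },
    "update": {
        "$push": {
            "Statuses": {
                "$each": [{ "Status": "Processing", "Timestamp": new Date(), "NextReevaluation": null }],
                "$position": 0
            }
        }
    }
});

// Orphaned messages: stuck in "Processing" longer than an assumed 10-minute timeout:
db.yourQueueCollection.findAndModify({
    "query": {
        "Statuses.0.Status": "Processing",
        "Statuses.0.Timestamp": { "$lte": new Date(Date.now() - 10 * 60 * 1000) }
    },
    "update": {
        "$push": {
            "Statuses": {
                "$each": [{ "Status": "Enqueued", "Timestamp": new Date(), "NextReevaluation": null }],
                "$position": 0
            }
        }
    }
});
```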
"Traditional" databases, like MySQL, PostgreSQL, or MongoDB (which I also view as traditional), are incredibly powerful today. If used correctly (ensure your indexes are optimized!), they are swift, scale impressively, and are cost-effective on traditional hosting platforms.
Many use cases can be addressed using just a database and your preferred programming language. It's not always necessary to have the "right tool for the right job," meaning maintaining a diverse set of tools like Redis, Elasticsearch, RabbitMQ, etc. Often, the maintenance overhead isn't worth it.
While the solution proposed might not match the performance of, for instance, RabbitMQ, it's usually sufficient and can scale to a point that would mark significant success for your startup.
Software engineering is about navigating trade-offs. Choose yours wisely.