Này👋
Tôi là Mads Quist, người sáng lập All Quiet . Chúng tôi đã triển khai hàng đợi tin nhắn tự tạo dựa trên MongoDB và tôi ở đây để nói về:
All Quiet là một nền tảng quản lý sự cố hiện đại, tương tự như PagerDuty.
Nền tảng của chúng tôi yêu cầu các tính năng như:
Để hiểu các yêu cầu cụ thể của chúng tôi, điều quan trọng là phải hiểu rõ hơn về nền tảng công nghệ của chúng tôi:
Cuối cùng, đó là việc giảm thiểu số lượng bộ phận chuyển động trong cơ sở hạ tầng của bạn. Chúng tôi mong muốn xây dựng các tính năng tuyệt vời cho những khách hàng xuất sắc của mình và bắt buộc phải duy trì dịch vụ của mình một cách đáng tin cậy. Việc quản lý một hệ thống cơ sở dữ liệu duy nhất để đạt được thời gian hoạt động hơn 5 giây là đủ thách thức. Vậy tại sao bạn lại phải gánh nặng việc quản lý một cụm HA RabbitMQ bổ sung?
Vâng… các giải pháp đám mây như AWS SQS, Google Cloud Tasks hoặc Azure Queue Storage thật tuyệt vời! Tuy nhiên, chúng sẽ dẫn đến việc nhà cung cấp bị khóa. Chúng tôi chỉ mong muốn trở nên độc lập và tiết kiệm chi phí trong khi vẫn cung cấp dịch vụ có thể mở rộng cho khách hàng của mình.
Hàng đợi tin nhắn là một hệ thống lưu trữ tin nhắn. Người tạo tin nhắn lưu trữ những tin nhắn này trong hàng đợi, sau đó được người tiêu dùng đưa ra hàng đợi để xử lý. Điều này cực kỳ có lợi cho việc tách các thành phần, đặc biệt khi xử lý tin nhắn là một nhiệm vụ tiêu tốn nhiều tài nguyên.
MongoDB đã phát triển đáng kể trong những năm qua và có thể đáp ứng các tiêu chí được liệt kê ở trên.
Trong các phần tiếp theo, tôi sẽ hướng dẫn bạn cách triển khai hàng đợi thư dành riêng cho MongoDB. Mặc dù bạn sẽ cần một thư viện máy khách phù hợp với ngôn ngữ lập trình ưa thích của mình, chẳng hạn như NodeJS, Go hoặc C# trong trường hợp All Quiet, nhưng các khái niệm tôi sẽ chia sẻ đều mang tính bất khả tri về nền tảng.
Mỗi hàng đợi bạn muốn sử dụng được thể hiện dưới dạng một bộ sưu tập chuyên dụng trong cơ sở dữ liệu MongoDB của bạn.
Dưới đây là ví dụ về tin nhắn được xử lý:
{ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Processed", "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"), "NextReevaluation" : null }, { "Status" : "Processing", "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"), "NextReevaluation" : null }, { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } }
Chúng ta hãy xem xét từng thuộc tính của tin nhắn.
Trường _id
là thuộc tính định danh duy nhất chuẩn của MongoDB. Ở đây, nó chứa NumberLong
, không phải ObjectId . Chúng ta cần NumberLong
thay vì ObjectId
vì:
Mặc dù các giá trị ObjectId sẽ tăng theo thời gian nhưng chúng không nhất thiết phải đơn điệu. Điều này là do họ:
- Chỉ chứa một giây độ phân giải thời gian, do đó các giá trị ObjectId được tạo trong cùng một giây không có thứ tự được đảm bảo và
- Được tạo bởi các máy khách, có thể có đồng hồ hệ thống khác nhau.
Trong quá trình triển khai C#, chúng tôi tạo Id có độ chính xác đến mili giây và thứ tự được đảm bảo dựa trên thời gian chèn. Mặc dù chúng tôi không yêu cầu thứ tự xử lý nghiêm ngặt trong môi trường nhiều người tiêu dùng (tương tự như RabbitMQ), nhưng điều cần thiết là duy trì trật tự FIFO khi hoạt động chỉ với một người tiêu dùng. Đạt được điều này với ObjectId là không khả thi. Nếu điều này không quan trọng với bạn, bạn vẫn có thể sử dụng ObjectId.
Thuộc tính Trạng thái bao gồm một mảng chứa lịch sử xử lý tin nhắn. Tại chỉ mục 0, bạn sẽ tìm thấy trạng thái hiện tại, điều này rất quan trọng cho việc lập chỉ mục.
Bản thân đối tượng trạng thái chứa ba thuộc tính:
Status
: Có thể là "Đang xếp hàng", "Đang xử lý", "Đã xử lý" hoặc "Không thành công".Timestamp
: Cái này ghi lại dấu thời gian hiện tại.NextReevaluation
: Ghi lại thời điểm diễn ra lần đánh giá tiếp theo, điều này rất cần thiết cho cả lần thử lại và lần thực thi theo lịch trình trong tương lai.
Thuộc tính này chứa tải trọng cụ thể của tin nhắn của bạn.
Thêm tin nhắn là một thao tác chèn đơn giản vào bộ sưu tập với trạng thái được đặt thành "Đã xếp hàng".
NextReevaluation
thành null
.NextReevaluation
thành dấu thời gian trong tương lai khi bạn muốn xử lý thư của mình. db.yourQueueCollection.insert({ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } });
Dequeuing phức tạp hơn một chút nhưng vẫn tương đối đơn giản. Nó phụ thuộc rất nhiều vào khả năng đọc và cập nhật nguyên tử đồng thời của MongoDB.
Tính năng thiết yếu này của MongoDB đảm bảo:
db.yourQueueCollection.findAndModify({ "query": { "$and": [ { "Statuses.0.Status": "Enqueued" }, { "Statuses.0.NextReevaluation": null } ] }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
Vì vậy, chúng tôi đang đọc một tin nhắn ở trạng thái “Đã xếp hàng” và đồng thời sửa đổi nó bằng cách đặt trạng thái “Đang xử lý” ở vị trí 0. Vì thao tác này là nguyên tử nên nó sẽ đảm bảo rằng tin nhắn sẽ không được người tiêu dùng khác nhận .
Sau khi quá trình xử lý thư hoàn tất, việc cập nhật trạng thái thư thành "Đã xử lý" bằng id của thư chỉ là một vấn đề đơn giản.
db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
Nếu xử lý không thành công, chúng ta cần đánh dấu tin nhắn cho phù hợp. Thông thường, bạn có thể muốn thử xử lý lại tin nhắn. Điều này có thể đạt được bằng cách sắp xếp lại tin nhắn. Trong nhiều trường hợp, việc xử lý lại tin nhắn sau một khoảng thời gian trễ cụ thể, chẳng hạn như 10 giây, tùy thuộc vào bản chất của lỗi xử lý.
db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Failed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000") } ], "$position": 0 } } } });
Chúng tôi đã thiết lập cách chúng tôi có thể dễ dàng xếp và loại bỏ các mục khỏi "hàng đợi" của mình, trên thực tế, đây chỉ đơn giản là một bộ sưu tập MongoDB. Chúng tôi thậm chí có thể "lên lịch" các tin nhắn trong tương lai bằng cách tận dụng trường NextReevaluation
.
Điều còn thiếu là chúng ta sẽ xếp hàng thường xuyên như thế nào. Người tiêu dùng cần thực thi lệnh findAndModify
trong một số loại vòng lặp. Một cách tiếp cận đơn giản là tạo ra một vòng lặp vô tận trong đó chúng ta sắp xếp hàng đợi và xử lý một thông báo. Phương pháp này rất đơn giản và hiệu quả. Tuy nhiên, nó sẽ gây áp lực đáng kể lên cơ sở dữ liệu và mạng.
Một giải pháp thay thế là đưa ra độ trễ, ví dụ: 100 mili giây, giữa các lần lặp vòng lặp. Điều này sẽ giảm đáng kể tải nhưng cũng sẽ làm giảm tốc độ xếp hàng.
Giải pháp cho vấn đề này là thứ mà MongoDB gọi là luồng thay đổi .
Luồng thay đổi là gì? Tôi không thể giải thích điều đó tốt hơn những người ở MongoDB:
Luồng thay đổi cho phép ứng dụng truy cập các thay đổi dữ liệu theo thời gian thực […]. Các ứng dụng có thể sử dụng luồng thay đổi để đăng ký tất cả các thay đổi dữ liệu trên một bộ sưu tập duy nhất […] và phản ứng ngay lập tức với chúng.
Tuyệt vời! Những gì chúng ta có thể làm là nghe các tài liệu mới được tạo trong bộ sưu tập hàng đợi của mình, điều này có nghĩa là nghe các tin nhắn mới được xếp hàng đợi một cách hiệu quả
Điều này thật đơn giản:
const changeStream = db.yourQueueCollection.watch(); changeStream.on('insert', changeEvent => { // Dequeue the message db.yourQueueCollection.findAndModify({ "query": changeEvent.documentKey._id, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
Tuy nhiên, cách tiếp cận luồng thay đổi không hoạt động đối với cả tin nhắn được lên lịch và tin nhắn mồ côi vì rõ ràng là không có thay đổi nào mà chúng ta có thể lắng nghe.
Đối với những trường hợp sử dụng này, chúng ta cần quay lại vòng lặp đơn giản của mình. Tuy nhiên, chúng ta có thể sử dụng độ trễ khá lớn giữa các lần lặp.
Các cơ sở dữ liệu "truyền thống", như MySQL , PostgreSQL hoặc MongoDB (mà tôi cũng xem là truyền thống), ngày nay cực kỳ mạnh mẽ. Nếu được sử dụng đúng cách (đảm bảo các chỉ mục của bạn được tối ưu hóa!), chúng sẽ hoạt động nhanh chóng, có quy mô ấn tượng và tiết kiệm chi phí trên các nền tảng lưu trữ truyền thống.
Nhiều trường hợp sử dụng có thể được giải quyết chỉ bằng cách sử dụng cơ sở dữ liệu và ngôn ngữ lập trình ưa thích của bạn. Không phải lúc nào cũng cần có "công cụ phù hợp cho công việc phù hợp", nghĩa là duy trì một bộ công cụ đa dạng như Redis, Elaticsearch, RabbitMQ, v.v. Thông thường, chi phí bảo trì là không đáng.
Mặc dù giải pháp được đề xuất có thể không phù hợp với hiệu suất của RabbitMQ, chẳng hạn, nhưng nó thường đủ và có thể mở rộng đến mức đánh dấu thành công đáng kể cho công ty khởi nghiệp của bạn.
Công nghệ phần mềm là điều hướng sự đánh đổi. Chọn của bạn một cách khôn ngoan.