Ei👋
Sou Mads Quist, fundador da All Quiet . Implementamos uma fila de mensagens desenvolvida internamente com base no MongoDB e estou aqui para falar sobre:
All Quiet é uma plataforma moderna de gerenciamento de incidentes, semelhante ao PagerDuty.
Nossa plataforma requer recursos como:
Para entender nossos requisitos específicos, é importante obter alguns insights sobre nossa pilha de tecnologia:
Em última análise, trata-se de minimizar o número de peças móveis na sua infraestrutura. Nosso objetivo é criar recursos fantásticos para nossos excelentes clientes e é fundamental manter nossos serviços confiáveis. Gerenciar um único sistema de banco de dados para atingir mais de cinco noves de tempo de atividade já é bastante desafiador. Então, por que se sobrecarregar com o gerenciamento de um cluster HA RabbitMQ adicional?
Sim… soluções em nuvem como AWS SQS, Google Cloud Tasks ou Azure Queue Storage são fantásticas! No entanto, eles teriam resultado em dependência do fornecedor. Nós simplesmente aspiramos ser independentes e econômicos, ao mesmo tempo em que fornecemos um serviço escalonável aos nossos clientes.
Uma fila de mensagens é um sistema que armazena mensagens. Os produtores de mensagens as armazenam na fila, que posteriormente são retiradas da fila pelos consumidores para processamento. Isso é extremamente benéfico para desacoplar componentes, especialmente quando o processamento de mensagens é uma tarefa que consome muitos recursos.
O MongoDB evoluiu significativamente ao longo dos anos e pode atender aos critérios listados acima.
Nas seções a seguir, orientarei você pela implementação específica do MongoDB de nossa fila de mensagens. Embora você precise de uma biblioteca cliente adequada para sua linguagem de programação preferida, como NodeJS, Go ou C# no caso de All Quiet, os conceitos que compartilharei são independentes de plataforma.
Cada fila que você deseja utilizar é representada como uma coleção dedicada em seu banco de dados MongoDB.
Aqui está um exemplo de uma mensagem processada:
{ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Processed", "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"), "NextReevaluation" : null }, { "Status" : "Processing", "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"), "NextReevaluation" : null }, { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } }
Vejamos cada propriedade da mensagem.
O campo _id
é a propriedade do identificador exclusivo canônico do MongoDB. Aqui, ele contém um NumberLong
, não um ObjectId . Precisamos de NumberLong
em vez de ObjectId
porque:
Embora os valores de ObjectId devam aumentar com o tempo, eles não são necessariamente monotônicos. Isso ocorre porque eles:
- Contém apenas um segundo de resolução temporal, portanto, os valores ObjectId criados no mesmo segundo não têm uma ordem garantida e
- São gerados por clientes, que podem ter relógios de sistema diferentes.
Em nossa implementação C#, geramos um Id com precisão de milissegundos e ordem garantida com base no tempo de inserção. Embora não exijamos uma ordem de processamento rigorosa em um ambiente multiconsumidor (semelhante ao RabbitMQ), é essencial manter a ordem FIFO ao operar com apenas um consumidor. Conseguir isso com ObjectId não é viável. Se isso não for crucial para você, você ainda pode usar ObjectId.
A propriedade Statuses consiste em um array contendo o histórico de processamento de mensagens. No índice 0, você encontrará o status atual, que é crucial para a indexação.
O próprio objeto de status contém três propriedades:
Status
: pode ser "Enfileirado", "Processando", "Processado" ou "Falha".Timestamp
: captura o carimbo de data/hora atual.NextReevaluation
: registra quando a próxima avaliação deve ocorrer, o que é essencial tanto para novas tentativas quanto para futuras execuções programadas.
Esta propriedade contém a carga específica da sua mensagem.
Adicionar uma mensagem é uma operação de inserção simples na coleção com o status definido como "Enfileirado".
NextReevaluation
como null
.NextReevaluation
para um carimbo de data/hora no futuro, quando desejar que sua mensagem seja processada. db.yourQueueCollection.insert({ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } });
A retirada da fila é um pouco mais complexa, mas ainda relativamente simples. Ele depende fortemente dos recursos simultâneos de leitura e atualização atômica do MongoDB.
Este recurso essencial do MongoDB garante:
db.yourQueueCollection.findAndModify({ "query": { "$and": [ { "Statuses.0.Status": "Enqueued" }, { "Statuses.0.NextReevaluation": null } ] }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
Então estamos lendo uma mensagem que está no estado “Enfileirada” e ao mesmo tempo a modificamos definindo o status “Processando” na posição 0. Como esta operação é atômica garantirá que a mensagem não será captada por outro consumidor .
Assim que o processamento da mensagem for concluído, basta atualizar o status da mensagem para “Processado” usando o ID da mensagem.
db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
Se o processamento falhar, precisamos marcar a mensagem de acordo. Muitas vezes, você pode tentar processar a mensagem novamente. Isso pode ser conseguido enfileirando novamente a mensagem. Em muitos cenários, faz sentido reprocessar a mensagem após um atraso específico, como 10 segundos, dependendo da natureza da falha no processamento.
db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Failed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000") } ], "$position": 0 } } } });
Estabelecemos como podemos facilmente enfileirar e desenfileirar itens de nossa “fila”, que é, na verdade, simplesmente uma coleção do MongoDB. Podemos até “agendar” mensagens para o futuro aproveitando o campo NextReevaluation
.
O que falta é como retiraremos da fila regularmente. Os consumidores precisam executar o comando findAndModify
em algum tipo de loop. Uma abordagem direta seria criar um loop infinito no qual retiramos da fila e processamos uma mensagem. Este método é simples e eficaz. No entanto, exercerá uma pressão considerável sobre o banco de dados e a rede.
Uma alternativa seria introduzir um atraso, por exemplo, 100ms, entre as iterações do loop. Isso reduzirá significativamente a carga, mas também diminuirá a velocidade de desenfileiramento.
A solução para o problema é o que o MongoDB chama de fluxo de mudança .
O que são fluxos de mudança? Não consigo explicar melhor do que o pessoal do MongoDB:
Os fluxos de mudança permitem que os aplicativos acessem alterações de dados em tempo real [...]. Os aplicativos podem usar fluxos de mudança para assinar todas as alterações de dados em uma única coleção [...] e reagir imediatamente a elas.
Ótimo! O que podemos fazer é ouvir os documentos recém-criados em nossa coleção de filas, o que significa efetivamente ouvir as mensagens recém-enfileiradas
Isso é muito simples:
const changeStream = db.yourQueueCollection.watch(); changeStream.on('insert', changeEvent => { // Dequeue the message db.yourQueueCollection.findAndModify({ "query": changeEvent.documentKey._id, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });
A abordagem do fluxo de mudança, entretanto, não funciona tanto para mensagens programadas quanto para mensagens órfãs porque obviamente não há nenhuma mudança que possamos ouvir.
Para esses casos de uso, precisamos reverter ao nosso loop simples. No entanto, podemos usar um atraso bastante generoso entre as iterações.
Bancos de dados "tradicionais", como MySQL , PostgreSQL ou MongoDB (que também considero tradicionais), são incrivelmente poderosos hoje. Se usados corretamente (certifique-se de que seus índices estejam otimizados!), Eles são rápidos, têm escala impressionante e são econômicos em plataformas de hospedagem tradicionais.
Muitos casos de uso podem ser resolvidos usando apenas um banco de dados e sua linguagem de programação preferida. Nem sempre é necessário ter a “ferramenta certa para o trabalho certo”, o que significa manter um conjunto diversificado de ferramentas como Redis, Elasticsearch, RabbitMQ, etc. Muitas vezes, a sobrecarga de manutenção não vale a pena.
Embora a solução proposta possa não corresponder ao desempenho, por exemplo, do RabbitMQ, geralmente é suficiente e pode ser dimensionada a um ponto que marcaria um sucesso significativo para sua startup.
A engenharia de software trata de navegar pelas compensações. Escolha o seu com sabedoria.