paint-brush
Por que criei uma fila de mensagens baseada no MongoDBpor@allquiet
11,470 leituras
11,470 leituras

Por que criei uma fila de mensagens baseada no MongoDB

por All Quiet12m2023/08/27
Read on Terminal Reader

Muito longo; Para ler

Você pode criar uma fila de mensagens de alta disponibilidade e desempenho com o MongoDB porque ele oferece operações atômicas de leitura/atualização simultâneas, bem como fluxos de mudança.
featured image - Por que criei uma fila de mensagens baseada no MongoDB
All Quiet HackerNoon profile picture
0-item
1-item
2-item


Ei👋


Sou Mads Quist, fundador da All Quiet . Implementamos uma fila de mensagens desenvolvida internamente com base no MongoDB e estou aqui para falar sobre:

  1. Por que reinventamos a roda
  2. Como reinventamos a roda


1. Por que reinventamos a roda

Por que precisamos de enfileiramento de mensagens?

All Quiet é uma plataforma moderna de gerenciamento de incidentes, semelhante ao PagerDuty.


Nossa plataforma requer recursos como:


  • Envio de um e-mail de aceitação dupla de forma assíncrona após o registro do usuário
  • Envio de um e-mail de lembrete 24 horas após o registro
  • Envio de notificações push com Firebase Cloud Messaging (FCM), que pode falhar devido a problemas de rede ou carregamento. Como as notificações push são cruciais para nosso aplicativo, precisamos tentar enviá-las novamente se houver algum problema.
  • Aceitar e-mails de fora da nossa integração e processá-los em incidentes. Esse processo pode falhar, por isso queríamos dissociá-lo e processar cada carga útil de e-mail em uma fila.




Nossa pilha de tecnologia

Para entender nossos requisitos específicos, é importante obter alguns insights sobre nossa pilha de tecnologia:


  • Executamos um aplicativo da web monolítico baseado no .NET Core 7.
  • O aplicativo .NET Core é executado em um contêiner Docker.
  • Executamos vários contêineres em paralelo.
  • Uma instância HAProxy distribui solicitações HTTP igualmente para cada contêiner, garantindo uma configuração altamente disponível.
  • Usamos MongoDB como nosso banco de dados subjacente, replicado em zonas de disponibilidade.
  • Todos os componentes acima são hospedados pela AWS em VMs EC2 genéricas.

Por que reinventamos a roda

  • Queríamos um mecanismo de enfileiramento simples que pudesse ser executado em vários processos simultaneamente e, ao mesmo tempo, garantir que cada mensagem fosse processada apenas uma vez.
  • Não precisávamos de um padrão pub/sub.
  • Não pretendíamos um sistema distribuído complexo baseado em CQRS/fonte de eventos porque, você sabe, a primeira regra dos sistemas distribuídos é não distribuir .
  • Queríamos manter as coisas o mais simples possível, seguindo a filosofia de escolher “ tecnologia chata ”.


Em última análise, trata-se de minimizar o número de peças móveis na sua infraestrutura. Nosso objetivo é criar recursos fantásticos para nossos excelentes clientes e é fundamental manter nossos serviços confiáveis. Gerenciar um único sistema de banco de dados para atingir mais de cinco noves de tempo de atividade já é bastante desafiador. Então, por que se sobrecarregar com o gerenciamento de um cluster HA RabbitMQ adicional?


Por que não usar apenas o AWS SQS?

Sim… soluções em nuvem como AWS SQS, Google Cloud Tasks ou Azure Queue Storage são fantásticas! No entanto, eles teriam resultado em dependência do fornecedor. Nós simplesmente aspiramos ser independentes e econômicos, ao mesmo tempo em que fornecemos um serviço escalonável aos nossos clientes.



2. Como reinventamos a roda

O que é uma fila de mensagens?

Uma fila de mensagens é um sistema que armazena mensagens. Os produtores de mensagens as armazenam na fila, que posteriormente são retiradas da fila pelos consumidores para processamento. Isso é extremamente benéfico para desacoplar componentes, especialmente quando o processamento de mensagens é uma tarefa que consome muitos recursos.


Quais características nossa fila deve apresentar?

  • Utilizando MongoDB como nosso armazenamento de dados
  • Garantindo que cada mensagem seja consumida apenas uma vez
  • Permitir que vários consumidores processem mensagens simultaneamente
  • Garantir que, se o processamento da mensagem falhar, novas tentativas serão possíveis
  • Habilitando o agendamento do consumo de mensagens para o futuro
  • Não precisando de pedido garantido
  • Garantindo alta disponibilidade
  • Garantir que as mensagens e seus estados sejam duráveis e possam suportar reinicializações ou períodos de inatividade prolongados


O MongoDB evoluiu significativamente ao longo dos anos e pode atender aos critérios listados acima.


Implementação

Nas seções a seguir, orientarei você pela implementação específica do MongoDB de nossa fila de mensagens. Embora você precise de uma biblioteca cliente adequada para sua linguagem de programação preferida, como NodeJS, Go ou C# no caso de All Quiet, os conceitos que compartilharei são independentes de plataforma.


Filas

Cada fila que você deseja utilizar é representada como uma coleção dedicada em seu banco de dados MongoDB.

Modelo de mensagem

Aqui está um exemplo de uma mensagem processada:

 { "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Processed", "Timestamp" : ISODate("2023-08-06T06:50:23.753+0000"), "NextReevaluation" : null }, { "Status" : "Processing", "Timestamp" : ISODate("2023-08-06T06:50:23.572+0000"), "NextReevaluation" : null }, { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } }


Vejamos cada propriedade da mensagem.


_eu ia

O campo _id é a propriedade do identificador exclusivo canônico do MongoDB. Aqui, ele contém um NumberLong , não um ObjectId . Precisamos de NumberLong em vez de ObjectId porque:



Embora os valores de ObjectId devam aumentar com o tempo, eles não são necessariamente monotônicos. Isso ocorre porque eles:

  • Contém apenas um segundo de resolução temporal, portanto, os valores ObjectId criados no mesmo segundo não têm uma ordem garantida e
  • São gerados por clientes, que podem ter relógios de sistema diferentes.


Em nossa implementação C#, geramos um Id com precisão de milissegundos e ordem garantida com base no tempo de inserção. Embora não exijamos uma ordem de processamento rigorosa em um ambiente multiconsumidor (semelhante ao RabbitMQ), é essencial manter a ordem FIFO ao operar com apenas um consumidor. Conseguir isso com ObjectId não é viável. Se isso não for crucial para você, você ainda pode usar ObjectId.


Status

A propriedade Statuses consiste em um array contendo o histórico de processamento de mensagens. No índice 0, você encontrará o status atual, que é crucial para a indexação.


O próprio objeto de status contém três propriedades:

  • Status : pode ser "Enfileirado", "Processando", "Processado" ou "Falha".
  • Timestamp : captura o carimbo de data/hora atual.
  • NextReevaluation : registra quando a próxima avaliação deve ocorrer, o que é essencial tanto para novas tentativas quanto para futuras execuções programadas.


Carga útil

Esta propriedade contém a carga específica da sua mensagem.


Enfileirando uma mensagem

Adicionar uma mensagem é uma operação de inserção simples na coleção com o status definido como "Enfileirado".

  • Para processamento imediato, defina NextReevaluation como null .
  • Para processamento futuro, defina NextReevaluation para um carimbo de data/hora no futuro, quando desejar que sua mensagem seja processada.
 db.yourQueueCollection.insert({ "_id" : NumberLong(638269014234217933), "Statuses" : [ { "Status" : "Enqueued", "Timestamp" : ISODate("2023-08-06T06:50:23.421+0000"), "NextReevaluation" : null } ], "Payload" : { "YourData" : "abc123" } });


Desenfileirando uma mensagem

A retirada da fila é um pouco mais complexa, mas ainda relativamente simples. Ele depende fortemente dos recursos simultâneos de leitura e atualização atômica do MongoDB.


Este recurso essencial do MongoDB garante:

  • Cada mensagem é processada apenas uma vez.
  • Vários consumidores podem processar mensagens simultaneamente com segurança.


 db.yourQueueCollection.findAndModify({ "query": { "$and": [ { "Statuses.0.Status": "Enqueued" }, { "Statuses.0.NextReevaluation": null } ] }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:23.800+0000"), "NextReevaluation": null } ], "$position": 0 } } } });


Então estamos lendo uma mensagem que está no estado “Enfileirada” e ao mesmo tempo a modificamos definindo o status “Processando” na posição 0. Como esta operação é atômica garantirá que a mensagem não será captada por outro consumidor .


Marcando uma mensagem como processada

Assim que o processamento da mensagem for concluído, basta atualizar o status da mensagem para “Processado” usando o ID da mensagem.

 db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });


Marcando uma mensagem como falhada

Se o processamento falhar, precisamos marcar a mensagem de acordo. Muitas vezes, você pode tentar processar a mensagem novamente. Isso pode ser conseguido enfileirando novamente a mensagem. Em muitos cenários, faz sentido reprocessar a mensagem após um atraso específico, como 10 segundos, dependendo da natureza da falha no processamento.


 db.yourQueueCollection.findAndModify({ "query": { "_id": NumberLong(638269014234217933) }, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Failed", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": ISODate("2023-08-06T07:00:24.100+0000") } ], "$position": 0 } } } });


O loop de desenfileiramento

Estabelecemos como podemos facilmente enfileirar e desenfileirar itens de nossa “fila”, que é, na verdade, simplesmente uma coleção do MongoDB. Podemos até “agendar” mensagens para o futuro aproveitando o campo NextReevaluation .


O que falta é como retiraremos da fila regularmente. Os consumidores precisam executar o comando findAndModify em algum tipo de loop. Uma abordagem direta seria criar um loop infinito no qual retiramos da fila e processamos uma mensagem. Este método é simples e eficaz. No entanto, exercerá uma pressão considerável sobre o banco de dados e a rede.


Uma alternativa seria introduzir um atraso, por exemplo, 100ms, entre as iterações do loop. Isso reduzirá significativamente a carga, mas também diminuirá a velocidade de desenfileiramento.


A solução para o problema é o que o MongoDB chama de fluxo de mudança .


Fluxos de mudança do MongoDB

O que são fluxos de mudança? Não consigo explicar melhor do que o pessoal do MongoDB:


Os fluxos de mudança permitem que os aplicativos acessem alterações de dados em tempo real [...]. Os aplicativos podem usar fluxos de mudança para assinar todas as alterações de dados em uma única coleção [...] e reagir imediatamente a elas.


Ótimo! O que podemos fazer é ouvir os documentos recém-criados em nossa coleção de filas, o que significa efetivamente ouvir as mensagens recém-enfileiradas


Isso é muito simples:

 const changeStream = db.yourQueueCollection.watch(); changeStream.on('insert', changeEvent => { // Dequeue the message db.yourQueueCollection.findAndModify({ "query": changeEvent.documentKey._id, "update": { "$push": { "Statuses": { "$each": [ { "Status": "Processing", "Timestamp": ISODate("2023-08-06T06:50:24.100+0000"), "NextReevaluation": null } ], "$position": 0 } } } });



Mensagens agendadas e órfãs

A abordagem do fluxo de mudança, entretanto, não funciona tanto para mensagens programadas quanto para mensagens órfãs porque obviamente não há nenhuma mudança que possamos ouvir.


  • As mensagens agendadas simplesmente ficam na coleção com o status "Enfileirado" e um campo "NextReeavaliação" definido para o futuro.
  • Mensagens órfãs são aquelas que estavam no status "Processando" quando o processo do consumidor morreu. Eles permanecem na coleção com o status “Processando”, mas nenhum consumidor jamais alterará seu status para “Processado” ou “Falha”.


Para esses casos de uso, precisamos reverter ao nosso loop simples. No entanto, podemos usar um atraso bastante generoso entre as iterações.


Embrulhando-o

Bancos de dados "tradicionais", como MySQL , PostgreSQL ou MongoDB (que também considero tradicionais), são incrivelmente poderosos hoje. Se usados corretamente (certifique-se de que seus índices estejam otimizados!), Eles são rápidos, têm escala impressionante e são econômicos em plataformas de hospedagem tradicionais.


Muitos casos de uso podem ser resolvidos usando apenas um banco de dados e sua linguagem de programação preferida. Nem sempre é necessário ter a “ferramenta certa para o trabalho certo”, o que significa manter um conjunto diversificado de ferramentas como Redis, Elasticsearch, RabbitMQ, etc. Muitas vezes, a sobrecarga de manutenção não vale a pena.


Embora a solução proposta possa não corresponder ao desempenho, por exemplo, do RabbitMQ, geralmente é suficiente e pode ser dimensionada a um ponto que marcaria um sucesso significativo para sua startup.


A engenharia de software trata de navegar pelas compensações. Escolha o seu com sabedoria.