paint-brush
Seu pipeline de dados é uma bagunça — veja como impedir que dados duplicados desperdicem milhõespor@emailnareshe
334 leituras
334 leituras

Seu pipeline de dados é uma bagunça — veja como impedir que dados duplicados desperdicem milhões

por Naresh Erukulla10m2025/01/28
Read on Terminal Reader

Muito longo; Para ler

No processamento de dados em tempo real, registros duplicados podem levar a insights imprecisos, custos computacionais desnecessários e ineficiências em sistemas downstream. Isso torna a desduplicação um componente crítico de pipelines de dados de streaming. Implementar estratégias eficazes é essencial para manter dados limpos e confiáveis.
featured image - Seu pipeline de dados é uma bagunça — veja como impedir que dados duplicados desperdicem milhões
Naresh Erukulla HackerNoon profile picture
0-item


Engenheiros de Dados frequentemente enfrentam desafios com dados em formato inapropriado, especialmente caracteres e dados inúteis, valores nulos ou vazios e, mais importante, lidando com dados duplicados que impactam todos os aplicativos downstream, incluindo relatórios e modelos de ciência de dados. Isso se torna uma tarefa diária pesada para engenheiros e equipes de suporte, drenando seus recursos rapidamente sem ser produtivo. Frequentemente, estruturas mal projetadas têm momentos difíceis para os desenvolvedores posteriormente mitigarem essas correções de dados. Muitas organizações têm dados redundantes devido a arquiteturas de pipeline de dados ineficazes, custando-lhes milhões de dólares em custos de armazenamento, reprocessamento de dados várias vezes e má utilização de recursos.


Vamos direto ao ponto, em sua função atual, você já enfrentou um desafio ao lidar com duplicatas nos pipelines de dados de streaming ou em lote? A maioria dos engenheiros de dados, cientistas de dados e analistas de dados diria "SIM". Para corrigir dados duplicados em um data lake, existem inúmeras ferramentas no mundo atual, mas a que custo? Você consegue lidar com isso na sua própria fase de design de arquitetura? Pode haver muitas perguntas passando pela sua cabeça.


Vamos discutir em detalhes quais são as ferramentas que podem ajudar você a desduplicar os dados de streaming, seus prós e contras, configuração e manutenção. Em seguida, vamos nos aprofundar em nossas melhores práticas e padrões para lidar com duplicatas no pipeline de streaming.




Vamos verificar três abordagens principais para desduplicação em pipelines de dados de streaming:

Desduplicação usando atributos de mensagem Pub/Sub

Todos os pipelines de streaming extraem dados de diferentes aplicativos, como dispositivos IoT, sensores, estatísticas de jogos, câmeras de tráfego e dispositivos detectores de velocidade, e sistemas inteligentes que transmitem dados de uso de veículos de veículos autônomos. A maioria desses sistemas geralmente segue um padrão para transmitir eventos e cada evento normalmente teria um identificador exclusivo, digamos, um ID de transação de loja de varejo para a transação de venda com seu registro de data e hora do evento. Alguns sistemas geralmente não têm um identificador exclusivo, exemplos como dispositivos sensores de velocidade geralmente têm seu ID de sensor, mas todos os eventos de streaming não possuem um identificador exclusivo, exceto o registro de data e hora do evento. Nesses casos, há uma grande possibilidade de eventos de streaming duplicados para o mesmo dispositivo sensor.


Pense em um caso de uso em que o streaming de dados de velocidade de veículos de um dispositivo em uma rodovia interestadual normalmente variaria em grandes volumes por minuto em um dia movimentado. Outro exemplo é durante eventos de liquidação de feriados, empresas de varejo devem lidar com bilhões de transações por dia. Lidar com tal volume de eventos em tempo real e desduplicar os dados é muito crucial para relatórios precisos e modelos de ciência de dados para operar de forma eficiente, removendo outliers e duplicatas.


Vamos falar em termos técnicos, o Google Cloud fornece o Pub/Sub , que é um serviço de mensagens assíncrono e escalável que desacopla serviços que produzem mensagens de serviços que processam essas mensagens. Ele é muito usado para streaming de análises e pipelines de integração de dados para carregar e distribuir dados. Ele é comumente usado para ingerir eventos de interação do usuário, eventos do servidor, eventos em tempo real, replicar dados entre bancos de dados, atua como um barramento de eventos corporativos para compartilhar eventos de negócios em toda a organização e streaming de dados de aplicativos, incluindo sensores e eventos de aplicativos usados em conjunto com outros produtos do Google Cloud por meio de um pipeline de dados.


O Pub/Sub oferece um método simples, mas poderoso, para lidar com dados duplicados usando seus atributos. Cada mensagem no tópico Pub/Sub pode incluir pares de chave-valor nos metadados. Esses dados podem ser usados para identificar eventos duplicados e habilitar a desduplicação no pipeline de dados sem empurrar a carga para serviços de processamento de dados, o que geralmente tem custos de recursos mais altos e torna o pipeline de dados muito mais lento.


Para mensagens que incluem um campo exclusivo como transaction_id , esse valor pode ser definido como um atributo ao publicar mensagens. Ao ler mensagens do Pub/Sub no Dataflow, você pode configurar o pipeline para desduplicar usando esse atributo.


Esta solução é eficaz quando as duplicatas estão sendo transmitidas do aplicativo ou dispositivo de origem usando o identificador exclusivo dentro do tópico Pub/Sub. A limitação para esta solução é que ela só funciona bem para mensagens duplicadas publicadas dentro de uma janela de 10 minutos uma da outra. Embora seja simples de implementar, mas não tem escalabilidade pela limitação da janela de tempo no Pub/Sub. Isso é muito útil em instâncias como, dispositivos de radares ou sensores gerando eventos duplicados dentro de uma janela de 10 minutos de cada mensagem, isso funciona muito bem.


Pode haver casos em que as duplicatas geradas dentro do próprio publicador, como Pub/Sub, devido ao atraso no consumo de mensagens pelo downstream ou Pub/Sub nunca recebeu uma confirmação para as mensagens entregues, Pub/Sub tenta enviar novamente a mesma mensagem usando o mesmo Message_id, criando assim eventos duplicados no publicador. Para resolver isso, usando Pub/Sub, podemos determinar o message_id do payload e usá-lo como um identificador. O Cloud DataFlow, um serviço totalmente gerenciado para processamento de dados de fluxo na plataforma Google Cloud (GCP), fornece exatamente um processamento de cada registro. O que isso significa para nós? - Ele identifica eventos duplicados com base no message_id e os elimina ao processar em pipelines de dados, mas em casos raros, esses eventos duplicados, quando processados nos diferentes nós de trabalho dentro do fluxo de dados, chegam ao downstream de forma ineficaz. Você ainda acabará tendo duplicatas em seu data lake.


Discutiremos mais sobre como lidar com esses casos neste artigo, mais para o final. Vamos nos concentrar nas opções restantes para desduplicar os dados de streaming.



Desduplicação usando o Deduplicate PTransform do Apache Beam


Agora que sabemos como o Pub/Sub lida com eventos duplicados, o próximo passo é Processamento dessas mensagens usando o Cloud DataFlow com um assinante do Pub/Sub que lê mensagens de streaming do aplicativo de origem. O Dataflow é um serviço totalmente gerenciado que usa o Apache Beam SDK de código aberto para habilitar recursos avançados de streaming. O Dataflow escala para 4000 nós de trabalho por trabalho e pode processar petabytes de dados com recursos de dimensionamento automático para melhor utilização de recursos em pipelines de lote e streaming.


O Apache Beam oferece um Deduplicate PTransform integrado que fornece um método mais configurável e robusto para remover duplicatas. Este método usa a Stateful API do Beam para manter um estado para cada chave observada e remover duplicatas dentro de uma janela de tempo definida pelo usuário. Esta abordagem permite que você defina a lógica de desduplicação com base em campos específicos em seus dados ou em todo o conteúdo da mensagem, com a capacidade de configurar a desduplicação com base no tempo do evento ou no tempo de processamento .


Confira meu código de exemplo de pipeline de dados no GitHub para testar essa funcionalidade.


Uma coisa a ser observada aqui é que os pipelines em lote sempre usam exatamente um processamento, enquanto os pipelines de streaming usam exatamente um processamento por padrão, mas podem ser configurados para usar pelo menos um processamento também. O problema aqui é observar que a janela que o fluxo de dados está processando no momento, quando cruza a janela de processamento, uma mensagem duplicada não a compararia com o que já foi processado porque o fluxo de dados não armazena os IDs de registro na memória. O fluxo de dados pode descartar essa mensagem com base em dados que chegam atrasados ou se o pipeline de dados tiver outra perna para capturar mensagens não processadas e gravar em uma tabela no Cloud Bigquery - um data warehouse nativo da nuvem totalmente gerenciado no GCP ou gravar um armazenamento em nuvem - é um serviço gerenciado para armazenar dados não estruturados, como um arquivo para fins de reprocessamento e solução de problemas.



Esta solução fornece uma opção flexível para processar login de desduplicação complexa e torná-la adequada para cenários em que a janela de desduplicação é maior ou mais complexa do que o que o Pub/Sub oferece. As compensações incluem maior uso de recursos para manter cada estado para determinar a exclusividade do registro



Desduplicação no coletor


Até agora, vimos como o Publisher como o Pub/Sub e os serviços de integração Cloud DataFlow lidam com duplicatas em tempo real. Acho que essas soluções não são 100% eficazes quando se trata de janelas devido a problemas de sobrecarga e volume de processamento, em tais cenários, para lidar com casos extremos, incluindo onde uma mensagem duplicada chega tarde e o fluxo de dados pensa que é um registro exclusivo porque não mantém os IDs de registro para verificar a exclusividade das mensagens e em outro cenário, o fluxo de dados lida com essas mensagens em diferentes nós de trabalho devido a falhas de rede e/ou falhas de nó de trabalho faz com que ele pense que é um registro exclusivo enquanto está processando no fluxo de dados e entra nos sistemas downstream como a tabela do Google Cloud BigQuery.


Para mitigar tais instâncias e como uma verificação final para a desduplicação pode ocorrer no nível do coletor, como no BigQuery ou outros data warehouses. Essa abordagem geralmente é útil quando a desduplicação em tempo real não é crítica, e a desduplicação periódica é suficiente. Isso filtrará e eliminará efetivamente todas as mensagens duplicadas usando consultas SQL avançadas.


Com base no caso de uso, há dois tipos de soluções disponíveis para corrigir duplicatas.


Primeiro, use consultas agendadas por meio de um DAG do Composer ou no console do BigQuery para criar uma tabela de dedup periodicamente usando partições (diárias ou horárias), tornando uma escolha simples para qualquer pessoa criar o processo e armazenar os dados de dedup em uma tabela de preparação e carregar os dados distintos na tabela final.


Em segundo lugar, podemos usar uma visualização materializada para obter dados em tempo real, o que a torna uma solução ideal para obter insights de negócios rapidamente.



A consulta SQL do BigQuery é apresentada no meu link dedup_sql do Github .


Abaixo, o código SQL do BigQuery explica duas opções que discutimos:

 -- Use below SQL queries to periodically deduplicate data in BigQuery tables. CREATE OR REPLACE TABLE Transactions AS SELECT DISTINCT * FROM raw_transactions; --OR use below incremental steps to drop the necessary partitions and re-insert the deduped data into the original table -- Step 1: Insert distinct records from the original table based on the max timestamp available CREATE OR REPLACE TABLE STAGE_TRANSACTIONS AS SELECT DISTINCT * FROM raw_transactions WHERE event_timestamp > ( SELECT MAX(event_timestamp) FROM raw_transactions ); -- Step 2: Drop the partition after deduplication DELETE FROM raw_transactions WHERE event_timestamp = > ( SELECT MAX(event_timestamp) FROM raw_transactions ); -- Step 3: Insert deduplicated data into the main table INSERT INTO raw_transactions SELECT DISTINCT * FROM STAGE_TRANSACTIONS; --OR Use below SQL query to Merge new data without duplicates the table MERGE INTO raw_transactions AS target USING ( SELECT * FROM STAGE_TRANSACTIONS ) AS source ON target.transaction_id = source.transaction_id AND target.event_timestamp <= source.event_timestamp WHEN MATCHED THEN UPDATE SET target.product = source.product, target.price = source.price, target.location = source.location, target.store = source.store, target.zipcode = source.zipcode, target.city = source.city, target.promotion = source.promotion, target.event_timestamp = source.event_timestamp WHEN NOT MATCHED THEN INSERT (transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp) VALUES (source.transaction_id, source.product, source.price, source.location, source.store, source.zipcode, source.city, source.promotion, source.event_timestamp); --OR to get the real-time data without duplicates, use following materialized view and a finest solution to retrieve dedup records quickly CREATE MATERIALIZED VIEW raw_transactions_mv AS SELECT transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp FROM ( SELECT transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp, ROW_NUMBER() OVER ( PARTITION BY transaction_id ORDER BY event_timestamp DESC ) AS row_num FROM raw_transactions ) WHERE row_num = 1;

Compensações técnicas

Cada estratégia de deduplicação vem com seu próprio conjunto de compensações. Aqui está um resumo para ajudar você a escolher a abordagem certa:

Método

Vantagens

Desvantagens

Atributos de mensagem Pub/Sub

Baixa latência, nativo do Pub/Sub

Limitado a uma janela de desduplicação de 10 minutos

Apache Beam Deduplicar

Altamente flexível, suporta lógica de desduplicação complexa

Maior consumo de recursos devido à gestão do estado

Desduplicação baseada em coletor

Adequado para atualizações em lote ou periódicas, lógica mínima

Pode introduzir latência; ferramentas de orquestração podem ser necessárias

Em poucas palavras

A desduplicação é uma pedra angular do processamento de dados eficaz em pipelines de streaming. A escolha da estratégia depende das necessidades em tempo real, da complexidade e das restrições de recursos do seu pipeline. Ao aproveitar os pontos fortes dos atributos Pub/Sub, Apache Beam Deduplicate PTransform ou da desduplicação baseada em sink, você pode garantir dados limpos e confiáveis para sistemas downstream. Explore essas abordagens, implemente os exemplos fornecidos e adapte-os ao seu caso de uso para obter resultados ideais.


Você está interessado em guias mais aprofundados sobre análise de dados e aprendizado de máquina? Siga-me em Médio ou LinkedIn para os artigos mais recentes, e sinta-se à vontade para compartilhar suas ideias ou perguntas nos comentários abaixo. Se você achou este artigo útil, compartilhe-o com sua rede e ajude outros a desbloquear o potencial da análise em tempo real no varejo.