Los ingenieros de datos a menudo se enfrentan a desafíos relacionados con datos en un formato inadecuado, especialmente caracteres y datos basura, valores nulos o vacíos y, lo más importante, con datos duplicados que afectan a todas las aplicaciones posteriores, incluidos los modelos de informes y ciencia de datos. Esto se convierte en una tarea diaria pesada para los ingenieros y los equipos de soporte, que agota sus recursos rápidamente sin ser productivos. A menudo, los marcos mal diseñados hacen que los desarrolladores tengan dificultades para mitigar estas correcciones de datos más adelante. Muchas organizaciones tienen datos redundantes debido a arquitecturas de canalización de datos ineficaces, lo que les cuesta millones de dólares en costos de almacenamiento, reprocesamiento de datos varias veces y mala utilización de los recursos.
Vayamos al grano: en su función actual, ¿alguna vez se ha enfrentado a un desafío al gestionar duplicados en los flujos de datos en streaming o por lotes? La mayoría de los ingenieros de datos, científicos de datos y analistas de datos dirían "SÍ". Para corregir datos duplicados en un lago de datos, existen numerosas herramientas en el mundo actual, pero ¿a qué costo? ¿Puede gestionarlas en la propia fase de diseño de la arquitectura? Es posible que se le ocurran muchas preguntas.
Analicemos en detalle cuáles son las herramientas que pueden ayudarlo a deduplicar los datos de transmisión, sus ventajas y desventajas, configuración y mantenimiento. A continuación, analizaremos en profundidad las mejores prácticas y estándares para gestionar los datos duplicados en el flujo de transmisión.
Veamos tres enfoques principales para la deduplicación en canales de transmisión de datos:
Todas las canalizaciones de transmisión extraen datos de diferentes aplicaciones, como dispositivos de IoT, sensores, estadísticas de juegos, cámaras de tráfico y dispositivos detectores de velocidad, y sistemas inteligentes que transmiten datos de uso de vehículos desde vehículos autónomos. La mayoría de estos sistemas suelen seguir un patrón para transmitir eventos y cada evento normalmente tendría un identificador único, digamos un ID de transacción de una tienda minorista para la transacción de venta con su marca de tiempo del evento. Algunos sistemas generalmente no tienen un identificador único, por ejemplo, los dispositivos con sensores de velocidad generalmente tienen su ID de sensor, pero todos los eventos de transmisión no tienen un identificador único excepto la marca de tiempo del evento. En estos casos, existe una alta posibilidad de eventos de transmisión duplicados para el mismo dispositivo sensor.
Pensemos en un caso de uso en el que la transmisión de datos sobre la velocidad de un vehículo desde un dispositivo en una autopista normalmente variaría en grandes volúmenes por minuto en un día ajetreado. Otro ejemplo es que, durante los eventos de rebajas navideñas, los comercios minoristas deben gestionar miles de millones de transacciones al día. Gestionar un volumen de este tipo de eventos en tiempo real y deduplicar los datos es fundamental para que los informes y los modelos de ciencia de datos funcionen de manera eficiente eliminando valores atípicos y duplicados.
Hablemos en términos técnicos: Google Cloud ofrece Pub/Sub , un servicio de mensajería asincrónico y escalable que desacopla los servicios que producen mensajes de los servicios que procesan esos mensajes. Se utiliza mucho para la analítica de streaming y los canales de integración de datos para cargar y distribuir datos. Se utiliza habitualmente para ingerir eventos de interacción de usuarios, eventos de servidor, eventos en tiempo real, replicar datos entre bases de datos, actúa como un bus de eventos empresariales para compartir eventos comerciales en toda la organización y para la transmisión de datos desde aplicaciones, incluidos sensores y eventos de aplicaciones utilizados junto con otros productos de Google Cloud a través de un canal de datos.
Pub/Sub ofrece un método simple pero poderoso para manejar datos duplicados mediante sus atributos. Cada mensaje en el tema de Pub/Sub puede incluir pares clave-valor en los metadatos. Estos datos se pueden usar para identificar eventos duplicados y habilitar la deduplicación en el flujo de datos sin aumentar la carga de trabajo de los servicios de procesamiento de datos, lo que generalmente tiene costos de recursos más altos y ralentiza considerablemente el flujo de datos.
En el caso de los mensajes que incluyen un campo único como transaction_id , este valor se puede configurar como un atributo al publicar mensajes. Al leer mensajes de Pub/Sub en Dataflow, puede configurar la canalización para que desduplique mediante este atributo.
Esta solución es eficaz cuando los duplicados se transmiten desde la aplicación o el dispositivo de origen mediante el identificador único dentro del tema de Pub/Sub. La limitación de esta solución es que solo funciona bien para mensajes duplicados publicados en un intervalo de 10 minutos entre sí. Si bien es fácil de implementar, carece de escalabilidad debido a la limitación de la ventana de tiempo en Pub/Sub. Esto es muy útil en casos como cámaras de velocidad o dispositivos con sensores que generan eventos duplicados en un intervalo de 10 minutos desde cada mensaje; funciona muy bien.
Puede haber casos en los que los duplicados generados dentro del propio publicador, como Pub/Sub, debido a la demora en el consumo de mensajes por parte del downstream o Pub/Sub nunca recibió un acuse de recibo de los mensajes entregados, Pub/Sub vuelve a intentar enviar el mismo mensaje utilizando el mismo Message_id, creando así eventos duplicados en el publicador. Para abordar esto, utilizando Pub/Sub, podemos determinar el message_id de la carga útil y usarlo como un identificador. Cloud DataFlow, un servicio completamente administrado para el procesamiento de datos de transmisión en la plataforma Google Cloud (GCP), proporciona exactamente un procesamiento de cada registro. ¿Qué significa para nosotros? - Identifica eventos duplicados en función de message_id y los elimina cuando se procesan en canalizaciones de datos, pero en casos excepcionales estos eventos duplicados cuando se procesaron en los diferentes nodos de trabajo dentro del flujo de datos, llegan al downstream de manera ineficaz. Aún así, terminará teniendo duplicados en su lago de datos.
Analizaremos más a fondo cómo manejar estos casos en este artículo hacia el final. Centrémonos en las opciones restantes para deduplicar los datos de transmisión.
Ahora que sabemos cómo maneja Pub/Sub los eventos duplicados, el siguiente paso es procesar estos mensajes mediante Cloud DataFlow con un suscriptor de Pub/Sub que lee los mensajes de streaming desde la aplicación de origen. Dataflow es un servicio totalmente administrado que utiliza el SDK Apache Beam de código abierto para habilitar capacidades de streaming avanzadas. Dataflow escala a 4000 nodos de trabajo por trabajo y puede procesar petabytes de datos con funciones de escalado automático para una mejor utilización de los recursos tanto en los procesos de procesamiento por lotes como en los de streaming.
Apache Beam ofrece un Deduplicate PTransform integrado que proporciona un método más configurable y sólido para eliminar duplicados. Este método utiliza la API con estado de Beam para mantener un estado para cada clave observada y elimina duplicados dentro de una ventana de tiempo definida por el usuario. Este enfoque le permite definir la lógica de deduplicación en función de campos específicos en sus datos o en todo el contenido del mensaje, con la capacidad de configurar la deduplicación en función del tiempo del evento o del tiempo de procesamiento .
Consulta mi código de canalización de datos de muestra de GitHub para probar esta funcionalidad.
Una cosa a tener en cuenta aquí es que las canalizaciones por lotes siempre utilizan exactamente un procesamiento, mientras que las canalizaciones de transmisión utilizan exactamente un procesamiento de forma predeterminada, pero se pueden configurar para utilizar también al menos un procesamiento. El truco aquí es tener en cuenta que la ventana que el flujo de datos está procesando actualmente, cuando cruza la ventana de procesamiento de un mensaje duplicado, no lo comparará con lo que ya ha procesado porque el flujo de datos no almacena los ID de registro en la memoria. El flujo de datos puede descartar este mensaje en función de los datos que llegan tarde o si la canalización de datos tiene otra etapa para capturar mensajes sin procesar y escribir en una tabla en Cloud Bigquery (un almacén de datos nativo de la nube totalmente administrado en GCP) o escribir en un almacenamiento en la nube (es un servicio administrado para almacenar datos no estructurados, como un archivo para su posterior reprocesamiento y resolución de problemas).
Esta solución ofrece una opción flexible para procesar inicios de sesión de deduplicación complejos y la hace adecuada para situaciones en las que la ventana de deduplicación es más grande o más compleja que la que ofrece Pub/Sub. Las desventajas incluyen un mayor uso de recursos para mantener cada estado a fin de determinar la singularidad de los registros.
Hasta ahora, hemos visto cómo los servicios de publicación como Pub/Sub y de integración Cloud DataFlow manejan los duplicados en tiempo real. Creo que estas soluciones no son 100 % efectivas cuando se trata de ventanas debido a la sobrecarga de procesamiento y los problemas de volumen; en tales escenarios, para manejar casos extremos, incluido cuando un mensaje duplicado llega tarde y DataFlow piensa que es un registro único porque no tiene los identificadores de registro para verificar la unicidad de los mensajes y, en otro escenario, DataFlow maneja estos mensajes en diferentes nodos de trabajo debido a fallas de red o fallas de nodos de trabajo que hacen que piense que es un registro único mientras se procesa en DataFlow y entra en los sistemas posteriores como la tabla de Google Cloud BigQuery.
Para mitigar estos casos y como comprobación final, la deduplicación puede realizarse en el nivel de receptor, como en BigQuery u otros almacenes de datos. Este enfoque suele ser útil cuando la deduplicación en tiempo real no es fundamental y la deduplicación periódica es suficiente. Esto filtrará y eliminará eficazmente todos los mensajes duplicados mediante consultas SQL avanzadas.
Según el caso de uso, hay dos tipos de soluciones disponibles para corregir duplicados.
En primer lugar, use consultas programadas a través de un DAG de Composer o dentro de la consola de BigQuery para crear una tabla de deduplicación periódicamente usando particiones (diariamente o por hora), lo que hace que sea una opción sencilla para cualquier persona crear el proceso y almacenar los datos de deduplicación en una tabla de ensayo y cargar los datos distintos en la tabla final.
En segundo lugar, podemos utilizar una vista materializada para obtener datos en tiempo real, lo que la convierte en una solución ideal para obtener información empresarial rápidamente.
La consulta SQL de Bigquery se presenta en mi enlace dedup_sql de Github .
A continuación, el código SQL de BigQuery explica dos opciones que hemos analizado:
-- Use below SQL queries to periodically deduplicate data in BigQuery tables. CREATE OR REPLACE TABLE Transactions AS SELECT DISTINCT * FROM raw_transactions; --OR use below incremental steps to drop the necessary partitions and re-insert the deduped data into the original table -- Step 1: Insert distinct records from the original table based on the max timestamp available CREATE OR REPLACE TABLE STAGE_TRANSACTIONS AS SELECT DISTINCT * FROM raw_transactions WHERE event_timestamp > ( SELECT MAX(event_timestamp) FROM raw_transactions ); -- Step 2: Drop the partition after deduplication DELETE FROM raw_transactions WHERE event_timestamp = > ( SELECT MAX(event_timestamp) FROM raw_transactions ); -- Step 3: Insert deduplicated data into the main table INSERT INTO raw_transactions SELECT DISTINCT * FROM STAGE_TRANSACTIONS; --OR Use below SQL query to Merge new data without duplicates the table MERGE INTO raw_transactions AS target USING ( SELECT * FROM STAGE_TRANSACTIONS ) AS source ON target.transaction_id = source.transaction_id AND target.event_timestamp <= source.event_timestamp WHEN MATCHED THEN UPDATE SET target.product = source.product, target.price = source.price, target.location = source.location, target.store = source.store, target.zipcode = source.zipcode, target.city = source.city, target.promotion = source.promotion, target.event_timestamp = source.event_timestamp WHEN NOT MATCHED THEN INSERT (transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp) VALUES (source.transaction_id, source.product, source.price, source.location, source.store, source.zipcode, source.city, source.promotion, source.event_timestamp); --OR to get the real-time data without duplicates, use following materialized view and a finest solution to retrieve dedup records quickly CREATE MATERIALIZED VIEW raw_transactions_mv AS SELECT transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp FROM ( SELECT transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp, ROW_NUMBER() OVER ( PARTITION BY transaction_id ORDER BY event_timestamp DESC ) AS row_num FROM raw_transactions ) WHERE row_num = 1;
Cada estrategia de deduplicación tiene sus propias ventajas y desventajas. A continuación, se incluye un resumen para ayudarlo a elegir el enfoque adecuado:
Método | Ventajas | Desventajas |
---|---|---|
Atributos de mensajes de Pub/Sub | Baja latencia, nativa de Pub/Sub | Limitado a una ventana de deduplicación de 10 minutos |
Desduplicación de Apache Beam | Altamente flexible, admite lógica de deduplicación compleja | Mayor consumo de recursos debido a la gestión estatal |
Desduplicación basada en receptores | Adecuado para actualizaciones periódicas o por lotes, lógica mínima | Puede introducir latencia; pueden necesitarse herramientas de orquestación |
La deduplicación es una piedra angular del procesamiento eficaz de datos en los canales de streaming. La elección de la estrategia depende de las necesidades en tiempo real, la complejidad y las limitaciones de recursos de su canal. Al aprovechar las ventajas de los atributos Pub/Sub, Apache Beam Deduplicate PTransform o la deduplicación basada en receptores, puede garantizar datos limpios y confiables para los sistemas posteriores. Explore estos enfoques, implemente los ejemplos proporcionados y adáptelos a su caso de uso para obtener resultados óptimos.
¿Te interesan guías más detalladas sobre análisis de datos y aprendizaje automático? Sígueme en