Data Engineers worden vaak geconfronteerd met uitdagingen met data in een ongepast formaat, met name ongewenste tekens en data, null of lege waarden, en vooral met het omgaan met dubbele data die van invloed zijn op alle downstream-applicaties, inclusief rapportage- en data science-modellen. Dit wordt een zware dagelijkse taak voor engineers en supportteams, die hun resources snel uitputten zonder productief te zijn. Vaak hebben slecht ontworpen frameworks het later moeilijk voor ontwikkelaars om deze datafixes te verzachten. Veel organisaties hebben redundante data vanwege ineffectieve datapijplijnarchitecturen, wat hen miljoenen dollars aan opslagkosten kost, de data meerdere keren opnieuw verwerkt en slecht resourcegebruik.
Laten we ter zake komen, bent u in uw huidige rol ooit een uitdaging tegengekomen bij het verwerken van duplicaten in de streaming- of batchdatapijplijnen? De meeste data engineers, datawetenschappers en dataanalisten zouden "JA" zeggen. Om dubbele gegevens in een data lake te corrigeren, zijn er talloze tools in de huidige wereld, maar tegen welke prijs? Kunt u deze in uw architectuurontwerpfase zelf verwerken? Er kunnen veel vragen door uw hoofd spoken.
Laten we in detail bespreken welke tools u kunnen helpen bij het dedupliceren van streamingdata, hun voor- en nadelen, installatie en onderhoud. Vervolgens duiken we dieper in onze best practices en standaarden voor het verwerken van duplicaten in de streamingpijplijn.
Laten we drie primaire benaderingen voor deduplicatie in streaming-datapijplijnen bekijken:
Alle streaming-pipelines halen gegevens uit verschillende toepassingen, zoals IoT-apparaten, sensoren, spelstatistieken, verkeerscamera's en snelheidsdetectors, en slimme systemen die voertuiggebruiksgegevens streamen van autonome voertuigen. De meeste van deze systemen volgen doorgaans een patroon om gebeurtenissen te streamen en elke gebeurtenis zou normaal gesproken een unieke identificatie hebben, laten we zeggen een Retail Store-transactie-ID voor de verkooptransactie met zijn gebeurtenis-tijdstempel. Sommige systemen hebben over het algemeen geen unieke identificatie, voorbeelden zoals snelheidssensorapparaten hebben doorgaans zijn eigen Sensor-ID, maar alle streamgebeurtenissen bevatten geen unieke identificatie, behalve de gebeurtenis-tijdstempel. In deze gevallen is er een grote kans op dubbele streaminggebeurtenissen voor hetzelfde sensorapparaat.
Denk aan een use case waarbij het streamen van snelheidsgegevens van voertuigen vanaf een apparaat op een snelweg normaal gesproken in grote volumes per minuut zou variëren op een drukke dag. Een ander voorbeeld is tijdens feestdagenverkoopevenementen, waarbij detailhandelsbedrijven miljarden transacties per dag moeten verwerken. Het verwerken van een dergelijk volume aan gebeurtenissen in realtime en het dedupliceren van de gegevens is van cruciaal belang voor nauwkeurige rapportage en data science-modellen om efficiënt te werken door outliers en duplicaten te verwijderen.
Laten we het in technische termen bespreken, Google Cloud biedt Pub/Sub , een asynchrone en schaalbare berichtenservice die services die berichten produceren loskoppelt van services die die berichten verwerken. Het wordt veel gebruikt voor streaminganalyses en data-integratiepijplijnen om data te laden en te distribueren. Het wordt vaak gebruikt voor het opnemen van gebruikersinteractiegebeurtenissen, servergebeurtenissen, realtimegebeurtenissen, het repliceren van data tussen databases, fungeert als een enterprise event bus om zakelijke gebeurtenissen binnen de organisatie te delen en datastreaming van applicaties, waaronder sensoren en applicatiegebeurtenissen die worden gebruikt in combinatie met andere Google Cloud-producten via een datapijplijn.
Pub/Sub biedt een eenvoudige maar krachtige methode om dubbele gegevens te verwerken met behulp van zijn kenmerken. Elk bericht in het Pub/Sub-onderwerp kan sleutel-waardeparen in de metagegevens bevatten. Deze gegevens kunnen worden gebruikt om dubbele gebeurtenissen te identificeren en deduplicatie in de gegevenspijplijn mogelijk te maken zonder de belasting op gegevensverwerkingsservices te leggen, wat over het algemeen hogere resourcekosten met zich meebrengt en de gegevenspijplijn aanzienlijk vertraagt.
Voor berichten die een uniek veld bevatten zoals transaction_id , kan deze waarde worden ingesteld als een kenmerk bij het publiceren van berichten. Bij het lezen van berichten van Pub/Sub in Dataflow, kunt u de pijplijn configureren om te dedupliceren met behulp van dit kenmerk.
Deze oplossing is effectief wanneer de duplicaten worden gestreamd vanaf de brontoepassing of het bronapparaat met behulp van de unieke id in het Pub/Sub-onderwerp. De beperking van deze oplossing is dat het alleen goed werkt voor dubbele berichten die binnen een tijdsbestek van 10 minuten van elkaar zijn gepubliceerd. Hoewel het eenvoudig te implementeren is, mist het schaalbaarheid door de tijdsvensterbeperking in Pub/Sub. Dit is erg handig in gevallen zoals snelheidscamera's of sensorapparaten die dubbele gebeurtenissen genereren binnen een tijdsbestek van 10 minuten van elk bericht, dit werkt geweldig.
Er kunnen gevallen zijn waarin de duplicaten die binnen de publisher zelf worden gegenereerd, zoals Pub/Sub, te wijten zijn aan vertraging bij het verwerken van berichten door de downstream of Pub/Sub heeft nooit een bevestiging ontvangen voor de geleverde berichten. Pub/Sub probeert hetzelfde bericht opnieuw te verzenden met dezelfde Message_id, waardoor dubbele gebeurtenissen in de publisher worden gemaakt. Om dit aan te pakken, kunnen we met Pub/Sub de message_id van de payload bepalen en deze gebruiken als identificatie. Cloud DataFlow, een volledig beheerde service voor het verwerken van streamgegevens op het Google Cloud-platform (GCP), biedt exact één verwerking van elke record. Wat betekent dit voor ons? - Het identificeert dubbele gebeurtenissen op basis van message_id en elimineert deze bij verwerking in datapijplijnen, maar in zeldzame gevallen bereiken deze dubbele gebeurtenissen, wanneer ze werden verwerkt op de verschillende werkknooppunten binnen dataflow, de downstream niet effectief. U zult nog steeds duplicaten in uw data lake hebben.
We zullen in dit artikel tegen het einde verder bespreken hoe we met zulke gevallen om moeten gaan. Laten we ons richten op de resterende opties om de streamingdata te dedupliceren.
Nu we weten hoe Pub/Sub dubbele gebeurtenissen verwerkt, volgt de verwerking van deze berichten met Cloud DataFlow waarbij een Pub/Sub-abonnee streamingberichten van de brontoepassing leest. Dataflow is een volledig beheerde service die gebruikmaakt van open source Apache Beam SDK om geavanceerde streamingmogelijkheden mogelijk te maken. Dataflow schaalt naar 4000 werkernodes per taak en kan petabytes aan gegevens verwerken met autoscalingfuncties voor beter resourcegebruik in zowel batch- als streamingpipelines.
Apache Beam biedt een ingebouwde Deduplicate PTransform die een meer configureerbare en robuuste methode biedt voor het verwijderen van duplicaten. Deze methode gebruikt Beam's Stateful API om een status te behouden voor elke waargenomen sleutel en verwijdert duplicaten binnen een door de gebruiker gedefinieerd tijdsvenster. Met deze aanpak kunt u deduplicatielogica definiëren op basis van specifieke velden in uw gegevens of de volledige berichtinhoud, met de mogelijkheid om deduplicatie te configureren op basis van gebeurtenistijd of verwerkingstijd .
Bekijk mijn voorbeelddatapijplijncode op GitHub om deze functionaliteit uit te proberen.
Eén ding om hier op te merken is dat batch-pipelines altijd exact één keer verwerken, terwijl streaming-pipelines standaard exact één keer verwerken, maar ook geconfigureerd kunnen worden om ten minste één keer te verwerken. Het addertje onder het gras hierbij is dat wanneer een venster dat dataflow momenteel verwerkt, het venster dat verwerkt een duplicaatbericht passeert, het niet vergelijkt met wat het al verwerkt heeft, omdat dataflow de record-ID's niet in het geheugen opslaat. Dataflow kan dit bericht verwijderen op basis van laat binnenkomende data of als de datapipeline een andere poot heeft om onverwerkte berichten vast te leggen en te schrijven naar een tabel in Cloud Bigquery - een volledig beheerd, cloud-native datawarehouse op GCP of schrijf een cloudopslag - is een beheerde service voor het opslaan van ongestructureerde data, als een bestand voor verdere herverwerking en probleemoplossingsdoeleinden.
Deze oplossing biedt een flexibele optie voor het verwerken van complexe deduplicatie-login en maakt het geschikt voor scenario's waarbij het deduplicatievenster groter of complexer is dan wat Pub/Sub biedt. Nadelen zijn onder andere een hoger resourcegebruik voor het onderhouden van elke status om de uniciteit van records te bepalen
Tot nu toe hebben we gezien hoe Publisher zoals Pub/Sub en integratieservices Cloud DataFlow duplicaten in realtime verwerken. Ik denk dat deze oplossingen niet 100% effectief zijn als het gaat om windowing vanwege verwerkingsoverhead en volumeproblemen, in dergelijke scenario's, om edge cases te verwerken, waaronder wanneer een duplicaatbericht een late aankomst is en dataflow denkt dat het een uniek record is omdat het de record-ID's niet bevat om de uniekheid van het bericht te controleren en in een ander scenario verwerkt dataflow deze berichten op verschillende werkknooppunten vanwege netwerkstoringen en/of werkknooppuntstoringen waardoor het denkt dat het een uniek record is terwijl het wordt verwerkt in dataflow en in de downstream-systemen terechtkomt, zoals Google Cloud BigQuery Table.
Om dergelijke gevallen te beperken en als laatste controle voor de deduplicatie kan dit op sinkniveau gebeuren, zoals in BigQuery of andere datawarehouses. Deze aanpak is vaak handig wanneer realtime deduplicatie niet kritisch is en periodieke deduplicatie volstaat. Dit zal effectief alle dubbele berichten filteren en elimineren met behulp van geavanceerde SQL-query's.
Afhankelijk van het gebruiksscenario zijn er twee soorten oplossingen beschikbaar voor het oplossen van duplicaten.
Gebruik eerst geplande query's via een composer DAG of binnen de BigQuery-console om periodiek een deduplicatietabel te maken met behulp van partities (dagelijks of elk uur). Zo kan iedereen eenvoudig het proces maken en de deduplicatiegegevens opslaan in een stagingtabel en de afzonderlijke gegevens in de definitieve tabel laden.
Ten tweede kunnen we een gematerialiseerde weergave gebruiken om realtime gegevens te verkrijgen, wat het een ideale oplossing maakt om snel zakelijke inzichten te verkrijgen.
De BigQuery SQL-query wordt weergegeven op mijn Github dedup_sql- link.
De onderstaande bigquery sql-code legt de twee opties uit die we hebben besproken:
-- Use below SQL queries to periodically deduplicate data in BigQuery tables. CREATE OR REPLACE TABLE Transactions AS SELECT DISTINCT * FROM raw_transactions; --OR use below incremental steps to drop the necessary partitions and re-insert the deduped data into the original table -- Step 1: Insert distinct records from the original table based on the max timestamp available CREATE OR REPLACE TABLE STAGE_TRANSACTIONS AS SELECT DISTINCT * FROM raw_transactions WHERE event_timestamp > ( SELECT MAX(event_timestamp) FROM raw_transactions ); -- Step 2: Drop the partition after deduplication DELETE FROM raw_transactions WHERE event_timestamp = > ( SELECT MAX(event_timestamp) FROM raw_transactions ); -- Step 3: Insert deduplicated data into the main table INSERT INTO raw_transactions SELECT DISTINCT * FROM STAGE_TRANSACTIONS; --OR Use below SQL query to Merge new data without duplicates the table MERGE INTO raw_transactions AS target USING ( SELECT * FROM STAGE_TRANSACTIONS ) AS source ON target.transaction_id = source.transaction_id AND target.event_timestamp <= source.event_timestamp WHEN MATCHED THEN UPDATE SET target.product = source.product, target.price = source.price, target.location = source.location, target.store = source.store, target.zipcode = source.zipcode, target.city = source.city, target.promotion = source.promotion, target.event_timestamp = source.event_timestamp WHEN NOT MATCHED THEN INSERT (transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp) VALUES (source.transaction_id, source.product, source.price, source.location, source.store, source.zipcode, source.city, source.promotion, source.event_timestamp); --OR to get the real-time data without duplicates, use following materialized view and a finest solution to retrieve dedup records quickly CREATE MATERIALIZED VIEW raw_transactions_mv AS SELECT transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp FROM ( SELECT transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp, ROW_NUMBER() OVER ( PARTITION BY transaction_id ORDER BY event_timestamp DESC ) AS row_num FROM raw_transactions ) WHERE row_num = 1;
Elke deduplicatiestrategie heeft zijn eigen set van afwegingen. Hier is een samenvatting om u te helpen de juiste aanpak te kiezen:
Methode | Voordelen | Nadelen |
---|---|---|
Pub/Sub-berichtkenmerken | Lage latentie, eigen aan Pub/Sub | Beperkt tot 10 minuten deduplicatievenster |
Apache Beam-dedupliceren | Zeer flexibel, ondersteunt complexe deduplicatielogica | Hoger hulpbronnenverbruik door staatsbeheer |
Sink-gebaseerde deduplicatie | Geschikt voor batch- of periodieke updates, minimale logica | Kan latentie veroorzaken; orkestratiehulpmiddelen zijn mogelijk nodig |
Deduplicatie is een hoeksteen van effectieve gegevensverwerking in streaming-pipelines. De keuze van de strategie hangt af van de realtimebehoeften, complexiteit en resourcebeperkingen van uw pipeline. Door de sterke punten van Pub/Sub-attributen, Apache Beam Deduplicate PTransform of sink-based deduplicatie te benutten, kunt u schone, betrouwbare gegevens voor downstream-systemen garanderen. Verken deze benaderingen, implementeer de gegeven voorbeelden en pas ze aan op uw use case voor optimale resultaten.
Bent u geïnteresseerd in meer diepgaande handleidingen over data-analyse en machine learning? Volg mij op