Инженерите по данни често се сблъскват с предизвикателства, когато данните са в неподходящ формат, особено нежелани знаци и данни, нулеви или празни стойности, и най-важното - справят се с дублиращи се данни, което засяга всички приложения надолу по веригата, включително модели за отчитане и наука за данни. Това се превръща в тежка ежедневна задача за инженерите и екипите за поддръжка, изтощавайки ресурсите им бързо, без да е продуктивна. Често лошо проектираните рамки имат трудни моменти за разработчиците по-късно да смекчат тези корекции на данни. Много организации имат излишни данни поради неефективни архитектури на тръбопроводи за данни, което им струва милиони долари разходи за съхранение, повторна обработка на данните няколко пъти и лошо използване на ресурсите.
Нека да стигнем до точката, в сегашната си роля изправяли ли сте се някога пред предизвикателство при обработката на дубликати в тръбопроводите за стрийминг или пакетни данни? Повечето инженери по данни, специалисти по данни и анализатори на данни биха казали „ДА“. За коригиране на дублирани данни в езеро от данни има много инструменти в настоящия свят, но на каква цена? Можете ли да се справите с тях в самата фаза на вашия архитектурен дизайн? Може да има много въпроси, които се въртят в ума ви.
Нека обсъдим подробно кои са инструментите, които биха могли да ви помогнат да дедупликирате поточно предаваните данни, техните плюсове и минуси, настройка и поддръжка. След това ще се потопим задълбочено в най-добрите практики и стандарти за обработка на дубликати в тръбопровода за поточно предаване.
Нека проверим три основни подхода за дедупликация в тръбопроводи за поточно предаване на данни:
Всички тръбопроводи за стрийминг извличат данни от различни приложения като IoT устройства, сензори, статистика на игрите, камери за трафик и устройства за откриване на скорост и интелигентни системи, които предават поточно данни за използването на превозни средства от автономни превозни средства. Повечето от тези системи обикновено следват модел за поточно предаване на събития и всяко събитие обикновено има уникален идентификатор, да речем ИД на транзакция в магазина за продажба на дребно за транзакцията за продажба с клеймото за време на събитието. Някои системи обикновено нямат уникален идентификатор, примери като устройства със сензори за скорост обикновено имат неговия идентификатор на сензора, но всички събития в потока не притежават уникален идентификатор освен клеймото за време на събитието. В тези случаи има голяма вероятност от дублиране на поточни събития за едно и също сензорно устройство.
Помислете за случай на употреба, при който поточно предаване на данни за скоростта на превозното средство от устройство на междущатска улица обикновено варира в големи обеми на минута в натоварен ден. Друг пример е, че по време на празнични разпродажби бизнесът на дребно трябва да се справя с милиарди транзакции на ден. Боравенето с такъв обем от събития в реално време и премахването на дублиране на данните е много важно за точното отчитане и моделите за наука на данните, за да работят ефективно чрез премахване на отклонения и дубликати.
Нека поговорим с технически термини, google cloud предоставя Pub/Sub , което е асинхронна и мащабируема услуга за съобщения, която разделя услугите, генериращи съобщения, от услугите, обработващи тези съобщения. Използва се широко за поточни анализи и тръбопроводи за интегриране на данни за зареждане и разпространение на данни. Обикновено се използва за поглъщане на събития за потребителско взаимодействие, сървърни събития, събития в реално време, репликиране на данни между бази данни, действа като корпоративна шина за събития за споделяне на бизнес събития в цялата организация и поточно предаване на данни от приложения, включително сензори и събития на приложения, използвани заедно с други облачни продукти на Google чрез канал за данни.
Pub/Sub предлага прост, но мощен метод за обработка на дублирани данни с помощта на неговите атрибути. Всяко съобщение в тема Pub/Sub може да включва двойки ключ-стойност в метаданните. Тези данни могат да се използват за идентифициране на дублиращи се събития и активиране на дедупликация в тръбопровода за данни, без да се натоварват услугите за обработка на данни, което обикновено има по-високи разходи за ресурси и значително забавя тръбопровода за данни.
За съобщения, които включват уникално поле като transaction_id , тази стойност може да бъде зададена като атрибут при публикуване на съобщения. Когато четете съобщения от Pub/Sub в Dataflow, можете да конфигурирате тръбопровода за дедупликация, като използвате този атрибут.
Това решение е ефективно, когато дубликатите се предават от изходното приложение или устройство, използвайки уникалния идентификатор в темата Pub/Sub. Ограничението за това решение е, че работи добре само за дублирани съобщения, публикувани в рамките на 10 минути едно от друго. Въпреки че е лесен за изпълнение, но му липсва мащабируемост чрез ограничението на времевия прозорец в Pub/Sub. Това е много полезно в случаи като камера за превишена скорост или сензорни устройства, генериращи дублиращи се събития в рамките на 10 минути прозорец от всяко съобщение, това работи чудесно.
Възможно е да има случаи, при които дубликатите, генерирани в самия издател, като Pub/Sub поради забавяне в приемането на съобщения от низходящия поток или Pub/Sub никога не са получили потвърждение за доставените съобщения, Pub/Sub се опитва да изпрати същото съобщение, използвайки същото Message_id, като по този начин създава дублирани събития в издателя. За да се справим с това, използвайки Pub/Sub, можем да определим message_id на полезния товар и да го използваме като идентификатор. Cloud DataFlow е напълно управлявана услуга за поточно обработване на данни на Google Cloud платформа (GCP), осигурява точно веднъж обработка на всеки запис. Какво означава за нас? - Идентифицира дублиращи се събития въз основа на message_id и ги елиминира при обработка в тръбопроводи за данни, но в редки случаи тези дублирани събития, когато са били обработени на различните работни възли в рамките на потока от данни, те стигат до низходящия поток неефективно. Все пак ще имате дубликати в езерото си с данни.
Ще обсъдим допълнително как да се справяме с такива случаи в тази статия към края. Нека се съсредоточим върху оставащите опции за дедупликация на поточните данни.
Сега знаем как Pub/Sub обработва дублиращи се събития, следва обработката на тези съобщения с помощта на Cloud DataFlow с абонат на Pub/Sub, който чете поточно предавани съобщения от приложението източник. Dataflow е напълно управлявана услуга, която използва Apache Beam SDK с отворен код, за да активира разширени възможности за поточно предаване. Dataflow се мащабира до 4000 работни възли на задание и може да обработва петабайти данни с функции за автоматично мащабиране за по-добро използване на ресурсите както в пакетни, така и в поточни тръбопроводи.
Apache Beam предлага вграден Duplicate PTransform, който осигурява по-конфигурируем и стабилен метод за премахване на дубликати. Този метод използва Stateful API на Beam, за да поддържа състояние за всеки наблюдаван ключ и премахва дубликати в рамките на дефиниран от потребителя времеви прозорец. Този подход ви позволява да дефинирате логиката за дедупликация въз основа на конкретни полета във вашите данни или цялото съдържание на съобщението, с възможност за конфигуриране на дедупликация въз основа на време на събитие или време за обработка .
Разгледайте моя примерен код за тръбопровод за данни от GitHub, за да изпробвате тази функционалност.
Едно нещо, което трябва да се отбележи тук е, че груповите конвейери винаги използват точно веднъж обработка, докато стрийминг конвейерите използват точно веднъж обработка по подразбиране, но могат да бъдат конфигурирани да използват и поне веднъж обработка. Уловката тук е да се отбележи, че прозорецът, който потокът от данни обработва в момента, когато пресече обработващия прозорец, дублиращо се съобщение няма да го сравни с това, което вече е обработено, тъй като потокът от данни не съхранява идентификаторите на записи в паметта. Dataflow може да отхвърли това съобщение въз основа на късно пристигащи данни или ако тръбопроводът за данни има друг крак за улавяне на необработени съобщения и записване в таблица в Cloud Bigquery - напълно управлявано, родно в облак хранилище на данни на GCP или записване в облачно хранилище - е управлявана услуга за съхраняване на неструктурирани данни като файл за по-нататъшна повторна обработка и отстраняване на проблеми.
Това решение предоставя гъвкава опция за обработка на сложни входни данни за дедупликация и го прави подходящо за сценарии, при които прозорецът за дедупликация е по-голям или по-сложен от това, което предлага Pub/Sub. Компромисите включват по-високо използване на ресурси за поддържане на всяко състояние, за да се определи уникалността на записа
Досега видяхме как издатели като Pub/Sub и услуги за интеграция Cloud DataFlow обработват дубликати в реално време. Мисля, че тези решения не са 100% ефективни, когато става въпрос за прозорци поради проблеми с обработката и обема, в такива сценарии, за справяне с крайни случаи, включително когато дублирано съобщение е късно пристигащо и потокът от данни смята, че е уникален запис, защото не Не задържайте идентификаторите на записите, за да проверите кръстосаната уникалност на съобщенията и в друг сценарий потокът от данни обработва тези съобщения на различни работни възли поради мрежови повреди и/или грешки в работни възли го кара да мисли, че е уникален запис, докато се обработва в потока от данни и попада в системите надолу по веригата, като таблицата с големи заявки в облака на Google.
За смекчаване на такива случаи и като последна проверка за дедупликацията може да се извърши на ниво приемник, като например в BigQuery или други хранилища за данни. Този подход често е полезен, когато дедупликацията в реално време не е критична и е достатъчна периодичната дедупликация. Това ефективно ще филтрира и елиминира всички дублиращи се съобщения с помощта на разширени SQL заявки.
Въз основа на случая на употреба има два типа решения за коригиране на дубликати.
Първо, използвайте планирани заявки или чрез DAG за композиране, или в рамките на конзолата на BigQuery, за да създавате таблица за дедупиране периодично, като използвате дялове (ежедневно или ежечасно), което прави лесен избор за всеки да създаде процеса и да съхранява данните за дедупиране в междинна таблица и зареждане отделните данни във финалната таблица.
Второ, можем да използваме материализиран изглед, за да получим данни в реално време, което го прави идеално решение за бързо получаване на бизнес прозрения.
Bigquery SQL заявката е представена на моята Github връзка dedup_sql .
По-долу кодът на bigquery sql обяснява две опции, които обсъдихме:
-- Use below SQL queries to periodically deduplicate data in BigQuery tables. CREATE OR REPLACE TABLE Transactions AS SELECT DISTINCT * FROM raw_transactions; --OR use below incremental steps to drop the necessary partitions and re-insert the deduped data into the original table -- Step 1: Insert distinct records from the original table based on the max timestamp available CREATE OR REPLACE TABLE STAGE_TRANSACTIONS AS SELECT DISTINCT * FROM raw_transactions WHERE event_timestamp > ( SELECT MAX(event_timestamp) FROM raw_transactions ); -- Step 2: Drop the partition after deduplication DELETE FROM raw_transactions WHERE event_timestamp = > ( SELECT MAX(event_timestamp) FROM raw_transactions ); -- Step 3: Insert deduplicated data into the main table INSERT INTO raw_transactions SELECT DISTINCT * FROM STAGE_TRANSACTIONS; --OR Use below SQL query to Merge new data without duplicates the table MERGE INTO raw_transactions AS target USING ( SELECT * FROM STAGE_TRANSACTIONS ) AS source ON target.transaction_id = source.transaction_id AND target.event_timestamp <= source.event_timestamp WHEN MATCHED THEN UPDATE SET target.product = source.product, target.price = source.price, target.location = source.location, target.store = source.store, target.zipcode = source.zipcode, target.city = source.city, target.promotion = source.promotion, target.event_timestamp = source.event_timestamp WHEN NOT MATCHED THEN INSERT (transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp) VALUES (source.transaction_id, source.product, source.price, source.location, source.store, source.zipcode, source.city, source.promotion, source.event_timestamp); --OR to get the real-time data without duplicates, use following materialized view and a finest solution to retrieve dedup records quickly CREATE MATERIALIZED VIEW raw_transactions_mv AS SELECT transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp FROM ( SELECT transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp, ROW_NUMBER() OVER ( PARTITION BY transaction_id ORDER BY event_timestamp DESC ) AS row_num FROM raw_transactions ) WHERE row_num = 1;
Всяка стратегия за дедупликация идва със собствен набор от компромиси. Ето обобщение, което да ви помогне да изберете правилния подход:
Метод | Предимства | Недостатъци |
---|---|---|
Атрибути на Pub/Sub съобщение | Ниска латентност, естествено за Pub/Sub | Ограничен до 10-минутен прозорец за дедупликация |
Дедупликация на Apache Beam | Изключително гъвкав, поддържа сложна логика за дедупликация | По-високо потребление на ресурси поради държавно управление |
Дедупликация, базирана на приемник | Подходящ за пакетни или периодични актуализации, минимална логика | Може да въведе латентност; може да са необходими инструменти за оркестрация |
Дедупликацията е крайъгълен камък на ефективната обработка на данни в конвейери за поточно предаване. Изборът на стратегия зависи от нуждите, сложността и ресурсните ограничения на вашия конвейер в реално време. Като използвате силните страни на Pub/Sub атрибутите, Apache Beam Deduplicate PTransform или дедупликация, базирана на приемник, можете да осигурите чисти, надеждни данни за системи надолу по веригата. Разгледайте тези подходи, приложете предоставените примери и ги адаптирайте към вашия случай на употреба за оптимални резултати.
Интересувате ли се от по-задълбочени ръководства за анализ на данни и машинно обучение? Следвайте ме