مهندسان داده اغلب با چالش هایی روبرو هستند که داده ها در قالب نامناسب هستند، به خصوص کاراکترها و داده های ناخواسته، مقادیر خالی یا خالی، و مهمتر از همه با داده های تکراری سروکار دارند که بر همه برنامه های کاربردی پایین دستی از جمله مدل های گزارش و علم داده تأثیر می گذارد. این به یک وظیفه روزانه سنگین برای مهندسان و تیمهای پشتیبانی تبدیل میشود و منابع خود را به سرعت و بدون بهرهوری تخلیه میکنند. اغلب چارچوبهایی که طراحی ضعیفی دارند، برای توسعهدهندگان بعداً برای کاهش این اصلاحات دادهها، زمانهای سختی دارند. بسیاری از سازمانها به دلیل معماریهای خط لوله داده ناکارآمد، دادههای اضافی دارند، که میلیونها دلار هزینه ذخیرهسازی برای آنها به همراه دارد، چندین بار پردازش مجدد دادهها و استفاده ضعیف از منابع.
بیایید به این نکته برسیم، در نقش فعلی خود، آیا تا به حال با چالشی در رسیدگی به موارد تکراری در خطوط لوله داده های جریانی یا دسته ای مواجه شده اید؟ اکثر مهندسان داده، دانشمندان داده و تحلیلگران داده می گویند "بله". برای تصحیح داده های تکراری در دریاچه داده، ابزارهای متعددی در دنیای کنونی وجود دارد، اما به چه قیمتی؟ آیا می توانید اینها را در مرحله طراحی معماری خود مدیریت کنید؟ ممکن است سوالات زیادی در ذهن شما وجود داشته باشد.
بیایید به تفصیل درباره ابزارهایی که میتوانند به شما در حذف دادههای جریان، مزایا و معایب، راهاندازی و نگهداری کمک کنند، بحث کنیم. در مرحله بعد، ما به بهترین شیوه ها و استانداردها برای رسیدگی به موارد تکراری در خط لوله جریان می پردازیم.
بیایید سه رویکرد اصلی برای حذف مجدد در خطوط لوله داده جریانی را بررسی کنیم:
همه خطوط لوله جریان دادهها را از برنامههای مختلف مانند دستگاههای اینترنت اشیا، حسگرها، آمار بازی، دوربینهای ترافیک و دستگاههای آشکارساز سرعت، و سیستمهای هوشمندی که دادههای استفاده از خودرو را از وسایل نقلیه خودران جریان میدهند، استخراج میکنند. اغلب این سیستمها معمولاً از یک الگو برای پخش جریانی رویدادها پیروی میکنند و هر رویداد معمولاً دارای یک شناسه منحصربهفرد است، مثلاً یک شناسه تراکنش فروشگاه خردهفروشی برای تراکنش فروش با مهر زمانی رویداد آن. برخی از سیستمها معمولاً یک شناسه منحصربهفرد ندارند، نمونههایی مانند دستگاههای حسگر سرعت معمولاً شناسه حسگر آن را دارند، اما همه رویدادهای جریان یک شناسه منحصربهفرد به جز مهر زمانی رویداد ندارند. در این مواقع، امکان پخش جریانی تکراری برای همان دستگاه حسگر وجود دارد.
به موردی فکر کنید که در آن جریان دادههای سرعت خودرو از دستگاهی در بین ایالتی معمولاً در یک روز شلوغ حجم زیادی در دقیقه دارد. مثال دیگر این است که در طول رویدادهای فروش تعطیلات، مشاغل خرده فروشی باید با میلیاردها تراکنش در روز سر و کار داشته باشند. مدیریت چنین حجمی از رویدادها در زمان واقعی و حذف دادهها برای گزارشدهی دقیق و مدلهای علم داده بسیار مهم است تا با حذف موارد پرت و تکراری به طور موثر عمل کنند.
بیایید در اصطلاح فنی صحبت کنیم، Google cloud Pub/Sub را ارائه میکند که یک سرویس پیامرسانی ناهمزمان و مقیاسپذیر است که خدمات تولید پیام را از سرویسهایی که آن پیامها را پردازش میکنند جدا میکند. این به شدت برای تجزیه و تحلیل جریان و خطوط لوله یکپارچه داده ها برای بارگیری و توزیع داده ها استفاده می شود. معمولاً برای دریافت رویدادهای تعامل کاربر، رویدادهای سرور، رویدادهای بلادرنگ، تکرار دادهها در بین پایگاههای داده، به عنوان گذرگاه رویداد سازمانی برای اشتراکگذاری رویدادهای تجاری در سراسر سازمان، و جریان دادهها از برنامهها از جمله حسگرها و رویدادهای برنامه کاربردی استفاده میشود. با سایر محصولات ابری گوگل از طریق خط لوله داده.
Pub/Sub یک روش ساده و در عین حال قدرتمند برای مدیریت داده های تکراری با استفاده از ویژگی های آن ارائه می دهد. هر پیامی در موضوع Pub/Sub میتواند شامل جفتهای کلید-مقدار در فراداده باشد. این داده ها را می توان برای شناسایی رویدادهای تکراری و فعال کردن deduplication در خط لوله داده بدون فشار دادن بار به خدمات پردازش داده استفاده کرد که به طور کلی هزینه منابع بالاتری دارد و خط لوله داده را به شدت کند می کند.
برای پیامهایی که دارای یک فیلد منحصربهفرد مانند transaction_id هستند، این مقدار میتواند به عنوان یک ویژگی در هنگام انتشار پیامها تنظیم شود. هنگام خواندن پیامهای Pub/Sub در Dataflow، میتوانید خط لوله را برای کپی کردن با استفاده از این ویژگی پیکربندی کنید.
این راه حل زمانی موثر است که موارد تکراری از برنامه یا دستگاه منبع با استفاده از شناسه منحصربهفرد در موضوع Pub/Sub جریان مییابند. محدودیت این راه حل این است که فقط برای پیام های تکراری منتشر شده در عرض 10 دقیقه از یکدیگر به خوبی کار می کند. اگرچه پیاده سازی آن ساده است، اما با محدودیت پنجره زمانی در Pub/Sub، مقیاس پذیری ندارد. این در مواردی مانند دوربینهای سرعتگیر یا دستگاههای حسگر که رویدادهای تکراری را در عرض 10 دقیقه از هر پیام ایجاد میکنند بسیار مفید است، این کار عالی است.
ممکن است مواردی وجود داشته باشد که موارد تکراری ایجاد شده در خود ناشر مانند Pub/Sub به دلیل تأخیر در مصرف پیامها توسط پاییندست یا Pub/Sub هرگز تأییدیهای برای پیامهای تحویلشده دریافت نکردهاند، Pub/Sub دوباره سعی میکند همان پیام را با استفاده از همان پیام ارسال کند. Message_id، در نتیجه رویدادهای تکراری در ناشر ایجاد می شود. برای مقابله با این موضوع، با استفاده از Pub/Sub، میتوانیم message_id از payload را تعیین کنیم و از آن به عنوان شناسه استفاده کنیم. Cloud DataFlow یک سرویس کاملاً مدیریت شده برای پردازش جریانی داده ها در پلت فرم Google Cloud (GCP) است که دقیقاً یک بار پردازش هر رکورد را ارائه می دهد. برای ما چه معنایی دارد؟ - رویدادهای تکراری را بر اساس message_id شناسایی می کند و هنگام پردازش در خطوط لوله داده، آنها را حذف می کند، اما در موارد نادری، این رویدادهای تکراری زمانی که روی گره های کارگر مختلف در جریان داده پردازش می شوند، به طور غیر موثر به پایین دست می رسند. همچنان در دریاچه داده های خود موارد تکراری خواهید داشت.
در این مقاله تا پایان در مورد نحوه رسیدگی به چنین مواردی بیشتر بحث خواهیم کرد. بیایید روی گزینههای باقیمانده برای کپی کردن دادههای جریان تمرکز کنیم.
اکنون میدانیم که Pub/Sub چگونه رویدادهای تکراری را مدیریت میکند، سپس پردازش این پیامها با استفاده از Cloud DataFlow با مشترک Pub/Sub میآید که پیامهای پخش جریانی را از برنامه منبع میخواند. Dataflow یک سرویس کاملاً مدیریت شده است که از Apache Beam SDK منبع باز برای فعال کردن قابلیتهای پخش پیشرفته استفاده میکند. جریان داده به 4000 گره کارگر در هر کار مقیاس میشود و میتواند پتابایت داده را با ویژگیهای مقیاس خودکار برای استفاده بهتر از منابع در خطوط لوله دستهای و جریانی پردازش کند.
Apache Beam یک Deduplicate PTransform داخلی ارائه می دهد که روشی قابل تنظیم و قوی تر برای حذف موارد تکراری ارائه می دهد. این روش از API Stateful Beam برای حفظ وضعیت برای هر کلید مشاهده شده استفاده می کند و موارد تکراری را در یک پنجره زمانی تعریف شده توسط کاربر حذف می کند. این رویکرد به شما امکان میدهد منطق deduplication را بر اساس فیلدهای خاص در دادههای خود یا کل محتوای پیام، با قابلیت پیکربندی deduplication بر اساس زمان رویداد یا زمان پردازش ، تعریف کنید.
نمونه کد خط لوله داده من را از GitHub بررسی کنید تا این عملکرد را امتحان کنید.
نکته ای که در اینجا باید به آن توجه کرد این است که خطوط لوله دسته ای همیشه دقیقاً یک بار پردازش را انجام می دهند در حالی که خطوط لوله جریان به طور پیش فرض از پردازش دقیقاً یک بار استفاده می کنند اما می توان آن را طوری پیکربندی کرد که حداقل یک بار پردازش را نیز انجام دهد. نکته قابل توجه در اینجا این است که به پنجره ای که جریان داده در حال پردازش است توجه داشته باشید، هنگامی که از پنجره پردازش یک پیام تکراری عبور می کند آن را با آنچه قبلا پردازش شده است مقایسه نمی کند زیرا جریان داده شناسه های رکورد را در حافظه ذخیره نمی کند. جریان داده ممکن است این پیام را بر اساس دادههایی که دیر به دست میآیند نادیده بگیرد یا اگر خط لوله داده دارای یک قسمت دیگر برای گرفتن پیامهای پردازش نشده و نوشتن در جدولی در Cloud Bigquery باشد - انبار دادهای کاملاً مدیریتشده و بومی ابری در GCP یا نوشتن یک ذخیرهسازی ابری - یک سرویس مدیریت شده برای ذخیره داده های بدون ساختار، به عنوان یک فایل برای اهداف بیشتر پردازش مجدد و عیب یابی است.
این راه حل یک گزینه منعطف برای پردازش ورود به سیستم کپی برداری پیچیده و مناسب کردن آن برای سناریوهایی است که در آن پنجره حذف مجدد بزرگتر یا پیچیده تر از چیزی است که Pub/Sub ارائه می دهد. مبادلات شامل استفاده بیشتر از منابع برای حفظ هر حالت برای تعیین منحصر به فرد بودن رکورد است
تاکنون، دیدهایم که چگونه Publisher مانند Pub/Sub و سرویسهای یکپارچه Cloud DataFlow موارد تکراری را در زمان واقعی مدیریت میکند. من فکر میکنم این راهحلها در مورد پنجرهسازی به دلیل مشکلات سربار پردازش و حجم، در چنین سناریوهایی، برای رسیدگی به موارد لبه از جمله مواردی که پیام تکراری دیر رسیده است و جریان داده فکر میکند که یک رکورد منحصربهفرد است، 100٪ مؤثر نیستند، زیرا این کار را انجام نمیدهد. شناسههای رکورد را برای بررسی متقاطع بودن پیامها نگه ندارید و در سناریویی دیگر، جریان داده این پیامها را در گرههای کارگری مختلف به دلیل خرابی شبکه و/یا خرابی گره کارگر مدیریت میکند. باعث می شود که فکر کند یک رکورد منحصر به فرد است در حالی که در جریان داده پردازش می شود و وارد سیستم های پایین دستی مانند جدول بزرگ ابری گوگل می شود.
برای کاهش چنین مواردی و به عنوان یک بررسی نهایی برای حذف مجدد میتواند در سطح سینک، مانند BigQuery یا سایر انبارهای داده، رخ دهد. این رویکرد اغلب زمانی مفید است که کپیبرداری بلادرنگ حیاتی نیست، و کپیبرداری دورهای کافی است. این به طور موثر تمام پیام های تکراری را با استفاده از پرس و جوهای پیشرفته SQL فیلتر و حذف می کند.
بر اساس موارد استفاده، دو نوع راه حل برای رفع موارد تکراری وجود دارد.
ابتدا، از پرسوجوهای زمانبندیشده یا از طریق یک Composer DAG یا در کنسول BigQuery برای ایجاد جدول حذف به صورت دورهای با استفاده از پارتیشنها (اعم از روزانه یا ساعتی)، استفاده کنید، و ایجاد فرآیند و ذخیره دادههای حذف در جدول مرحلهبندی و بارگیری را برای هرکسی انتخاب سادهای میکند. داده های متمایز در جدول نهایی.
دوم، ما میتوانیم از یک نمای واقعی برای دریافت دادههای همزمان استفاده کنیم و آن را به یک راهحل ایدهآل برای دریافت سریع بینشهای تجاری تبدیل کنیم.
پرس و جو Bigquery SQL در پیوند Github dedup_sql من ارائه شده است.
کد زیر bigquery sql دو گزینه را توضیح می دهد که در مورد آنها بحث کرده ایم:
-- Use below SQL queries to periodically deduplicate data in BigQuery tables. CREATE OR REPLACE TABLE Transactions AS SELECT DISTINCT * FROM raw_transactions; --OR use below incremental steps to drop the necessary partitions and re-insert the deduped data into the original table -- Step 1: Insert distinct records from the original table based on the max timestamp available CREATE OR REPLACE TABLE STAGE_TRANSACTIONS AS SELECT DISTINCT * FROM raw_transactions WHERE event_timestamp > ( SELECT MAX(event_timestamp) FROM raw_transactions ); -- Step 2: Drop the partition after deduplication DELETE FROM raw_transactions WHERE event_timestamp = > ( SELECT MAX(event_timestamp) FROM raw_transactions ); -- Step 3: Insert deduplicated data into the main table INSERT INTO raw_transactions SELECT DISTINCT * FROM STAGE_TRANSACTIONS; --OR Use below SQL query to Merge new data without duplicates the table MERGE INTO raw_transactions AS target USING ( SELECT * FROM STAGE_TRANSACTIONS ) AS source ON target.transaction_id = source.transaction_id AND target.event_timestamp <= source.event_timestamp WHEN MATCHED THEN UPDATE SET target.product = source.product, target.price = source.price, target.location = source.location, target.store = source.store, target.zipcode = source.zipcode, target.city = source.city, target.promotion = source.promotion, target.event_timestamp = source.event_timestamp WHEN NOT MATCHED THEN INSERT (transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp) VALUES (source.transaction_id, source.product, source.price, source.location, source.store, source.zipcode, source.city, source.promotion, source.event_timestamp); --OR to get the real-time data without duplicates, use following materialized view and a finest solution to retrieve dedup records quickly CREATE MATERIALIZED VIEW raw_transactions_mv AS SELECT transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp FROM ( SELECT transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp, ROW_NUMBER() OVER ( PARTITION BY transaction_id ORDER BY event_timestamp DESC ) AS row_num FROM raw_transactions ) WHERE row_num = 1;
هر استراتژی کپی برداری با مجموعه ای از معاوضه ها همراه است. در اینجا خلاصه ای وجود دارد که به شما در انتخاب روش مناسب کمک می کند:
روش | مزایا | معایب |
---|---|---|
ویژگی های پیام Pub/Sub | تأخیر کم، بومی Pub/Sub | محدود به 10 دقیقه پنجره بازنویسی |
Apache Beam Deduplicate | بسیار انعطاف پذیر است، از منطق پیچیده deduplication پشتیبانی می کند | مصرف بیشتر منابع به دلیل مدیریت دولتی |
Deduplication مبتنی بر سینک | مناسب برای به روز رسانی دسته ای یا دوره ای، حداقل منطق | ممکن است تاخیر را معرفی کند. ممکن است به ابزارهای ارکستراسیون نیاز باشد |
Deduplication سنگ بنای پردازش داده موثر در خطوط لوله جریان است. انتخاب استراتژی به نیازهای بلادرنگ، پیچیدگی و محدودیت های منابع خط لوله شما بستگی دارد. با استفاده از ویژگیهای Pub/Sub، Apache Beam Deduplicate PTransform یا Deduplication مبتنی بر سینک، میتوانید از دادههای تمیز و قابل اعتماد برای سیستمهای پایین دست اطمینان حاصل کنید. این رویکردها را کاوش کنید، مثالهای ارائه شده را پیادهسازی کنید، و آنها را با موارد استفاده خود برای نتایج بهینه تطبیق دهید.
آیا به راهنمایی های عمیق تر در مورد تجزیه و تحلیل داده ها و یادگیری ماشین علاقه مند هستید؟ من را دنبال کنید