paint-brush
צינור הנתונים שלך הוא בלגן - הנה איך לעצור נתונים כפולים מבזבוז מיליוניםעל ידי@emailnareshe
334 קריאות
334 קריאות

צינור הנתונים שלך הוא בלגן - הנה איך לעצור נתונים כפולים מבזבוז מיליונים

על ידי Naresh Erukulla10m2025/01/28
Read on Terminal Reader

יותר מדי זמן; לקרוא

בעיבוד נתונים בזמן אמת, רשומות כפולות עלולות להוביל לתובנות לא מדויקות, לעלויות חישוביות מיותרות ולחוסר יעילות במערכות במורד הזרם. זה הופך את מניעת הכפילויות למרכיב קריטי בצינורות הזרמת נתונים. יישום אסטרטגיות יעילות הוא המפתח לשמירה על נתונים נקיים ומהימנים.
featured image - צינור הנתונים שלך הוא בלגן - הנה איך לעצור נתונים כפולים מבזבוז מיליונים
Naresh Erukulla HackerNoon profile picture
0-item


מהנדסי נתונים מתמודדים לעתים קרובות עם אתגרים עם נתונים בפורמט לא הולם במיוחד תווים ונתונים זבל, ערכים אפסים או ריקים, והכי חשוב התמודדות עם נתונים כפולים שמשפיעים על כל היישומים במורד הזרם, כולל מודלים של דיווח ומדעי נתונים. זה הופך למשימה יומיומית כבדה עבור מהנדסים וצוותי תמיכה, שמרוקנים את המשאבים שלהם במהירות מבלי להיות פרודוקטיביים. לעתים קרובות למסגרות המעוצבות בצורה גרועה יש זמנים קשים למפתחים מאוחר יותר כדי להפחית את תיקוני הנתונים הללו. לארגונים רבים יש נתונים מיותרים עקב ארכיטקטורות לא יעילות של צנרת הנתונים, שעלותם עלויות אחסון של מיליוני דולרים, עיבוד הנתונים מספר פעמים וניצול לקוי של משאבים.


בוא נגיע לנקודה, בתפקיד הנוכחי שלך, האם אי פעם התמודדת עם אתגר בטיפול בכפילויות בצנרת הזרימה או האצווה? רוב מהנדסי הנתונים, מדעני הנתונים ומנתחי הנתונים היו אומרים "כן". כדי לתקן נתונים כפולים באגם נתונים, ישנם כלים רבים בעולם הנוכחי אבל באיזה מחיר? האם אתה יכול להתמודד עם אלה בשלב תכנון האדריכלות שלך עצמו? יכולות להיות שאלות רבות שצצות בראשך.


בוא נדון בפירוט מהם הכלים שיכולים לעזור לך לבטל את שכפול הנתונים הזורמים, היתרונות והחסרונות שלהם, ההגדרה והתחזוקה שלהם. לאחר מכן, נצלול לעומקם של שיטות עבודה וסטנדרטים מומלצים לטיפול בכפילויות בצינור הסטרימינג.




הבה נבדוק שלוש גישות עיקריות למניעת כפילויות בצינורות זרימת נתונים:

מניעת כפילויות באמצעות תכונות Pub/Sub Message

כל צינורות הסטרימינג מחלצים נתונים מיישומים שונים כמו מכשירי IoT, חיישנים, סטטיסטיקות משחק, מצלמות תנועה והתקני גלאי מהירות, ומערכות חכמות המזרימות נתוני שימוש ברכבים מרכבים אוטונומיים. רוב המערכות הללו בדרך כלל עוקבות אחר דפוס להזרמת אירועים ולכל אירוע יהיה בדרך כלל מזהה ייחודי, נניח מזהה עסקה של חנות קמעונאית לעסקת המכירה עם חותמת הזמן שלו. לחלק מהמערכות בדרך כלל אין מזהה ייחודי, לדוגמאות כמו התקני חיישני מהירות יש בדרך כלל את מזהה החיישן שלו, אבל כל אירועי הזרם אינם מכילים מזהה ייחודי מלבד חותמת הזמן של האירוע. במקרים אלה, קיימת אפשרות גבוהה של אירועי סטרימינג כפולים עבור אותו מכשיר חיישן.


חשבו על מקרה שימוש שבו הזרמת נתוני מהירות רכב ממכשיר בכביש בינעירוני תנוע בדרך כלל בנפחים גדולים לדקה ביום עמוס. דוגמה נוספת היא במהלך אירועי מכירת חגים, עסקים קמעונאיים חייבים להתמודד עם מיליארדי עסקאות ביום. טיפול בכמות כזו של אירועים בזמן אמת ומניעת כפילויות בנתונים חיוני מאוד כדי שמודלים מדויקים של דיווח ומדעי נתונים יפעלו ביעילות על ידי הסרת חריגים וכפולים.


בואו נדבר במונחים טכניים, google cloud מספק את Pub/Sub שהוא שירות הודעות אסינכרוני וניתן להרחבה שמנתק שירותים שמייצרים הודעות משירותים המעבדים את ההודעות הללו. הוא משמש רבות להזרמת צינורות ניתוח ושילוב נתונים לטעינה והפצה של נתונים. הוא משמש בדרך כלל להטמעת אירועי אינטראקציה של משתמשים, אירועי שרת, אירועים בזמן אמת, שכפול נתונים בין מסדי נתונים, פועל כאוטובוס אירועים עסקיים לשיתוף אירועים עסקיים ברחבי הארגון, והזרמת נתונים מיישומים כולל חיישנים ואירועי יישומים המשמשים יחד עם מוצרי google ענן אחרים דרך צינור נתונים.


Pub/Sub מציע שיטה פשוטה אך רבת עוצמה לטיפול בנתונים כפולים באמצעות התכונות שלו. כל הודעה בנושא Pub/Sub יכולה לכלול צמדי מפתח-ערך במטא נתונים. ניתן להשתמש בנתונים אלו כדי לזהות אירועים כפולים ולאפשר מניעת כפילויות בצנרת הנתונים מבלי לדחוף את העומס לשירותי עיבוד נתונים אשר בדרך כלל יש להם עלויות משאבים גבוהות יותר ומאט מאוד את צינור הנתונים.


עבור הודעות הכוללות שדה ייחודי כמו transaction_id , ניתן להגדיר ערך זה כתכונה בעת פרסום הודעות. בעת קריאת הודעות מ-Pub/Sub ב-Dataflow, תוכל להגדיר את הצינור לביטול כפילות באמצעות תכונה זו.


פתרון זה יעיל כאשר הכפילויות זורמים מאפליקציית המקור או מהמכשיר באמצעות המזהה הייחודי בנושא Pub/Sub. המגבלה של פתרון זה היא שהוא עובד היטב רק עבור הודעות כפולות שפורסמו בטווח של 10 דקות אחד מהשני. אמנם זה פשוט ליישום אך חסר מדרגיות לפי מגבלת חלון הזמן ב-Pub/Sub. זה מאוד שימושי במקרים כמו, מצלמות מהירות או מכשירי חיישן שיוצרים אירועים כפולים בתוך חלון של 10 דקות מכל הודעה, זה עובד מצוין.


יכולים להיות מקרים שבהם הכפילויות שנוצרו בתוך המפרסם עצמו כמו Pub/Sub עקב עיכוב בצריכת הודעות על ידי המורד או Pub/Sub מעולם לא קיבלו אישור על ההודעות שנמסרו, Pub/Sub מנסה לשלוח את אותה הודעה באמצעות אותה הודעה. Message_id, ובכך ליצור אירועים כפולים ב-Publisher. כדי להתמודד עם זה, באמצעות Pub/Sub, נוכל לקבוע את ה- message_id של המטען ולהשתמש בזה כמזהה. Cloud DataFlow שירות מנוהל במלואו לעיבוד נתונים בזרם בפלטפורמת Google Cloud (GCP), מספק עיבוד פעם אחת בדיוק של כל רשומה. מה זה אומר עבורנו? - הוא מזהה אירועים כפולים בהתבסס על message_id ומבטל אותם בעת עיבוד בצינורות נתונים, אך במקרים נדירים אירועים כפולים אלה כאשר הם עברו עיבוד בצמתי העבודה השונים בתוך זרימת הנתונים, הם מגיעים למורד הזרם בצורה לא יעילה. עדיין יהיו לך כפילויות באגם הנתונים שלך.


נדון בהמשך כיצד לטפל במקרים כאלה במאמר זה לקראת הסוף. בואו נתמקד באפשרויות הנותרות לביטול כפילות של הנתונים הזורמים.



מניעת כפילויות באמצעות Deduplicate PTransform של Apache Beam


כעת אנו יודעים כיצד Pub/Sub מטפל באירועים כפולים, בשלב הבא עיבוד ההודעות הללו באמצעות Cloud DataFlow עם מנוי Pub/Sub קורא הודעות זורמות מאפליקציית המקור. Dataflow הוא שירות מנוהל במלואו המשתמש בקוד פתוח Apache Beam SDK כדי לאפשר יכולות סטרימינג מתקדמות. זרימת הנתונים מתרחבת ל-4000 צמתים של עובדים בכל עבודה ויכולה לעבד פטה-בייט של נתונים עם תכונות קנה מידה אוטומטי לניצול טוב יותר של משאבים בצינורות אצווה וגם בזרימה.


Apache Beam מציעה Deduplicate PTransform מובנה המספקת שיטה ניתנת להגדרה וחזקה יותר להסרת כפילויות. שיטה זו משתמשת ב-Stateful API של Beam כדי לשמור על מצב עבור כל מפתח שנצפה ומסירה כפילויות בתוך חלון זמן מוגדר על ידי המשתמש. גישה זו מאפשרת לך להגדיר לוגיקה של מניעת כפילויות בהתבסס על שדות ספציפיים בנתונים שלך או על כל תוכן ההודעה, עם היכולת להגדיר ביטול כפילויות על סמך זמן האירוע או זמן העיבוד .


בדוק את קוד צינור הנתונים לדוגמה שלי מ- GitHub כדי לנסות את הפונקציונליות הזו.


דבר אחד שכדאי לציין כאן הוא שצינורות אצווה תמיד משתמשים בעיבוד פעם אחת בדיוק, בעוד שצינורות זרימה משתמשים בעיבוד חד-פעמי בדיוק כברירת מחדל, אך ניתן להגדיר אותם להשתמש בעיבוד פעם אחת לפחות. המלכוד כאן הוא לציין את החלון שזרימת הנתונים מעבדת כעת, כאשר הוא חוצה את החלון ומעבד הודעה כפולה לא ישווה אותו למה שהוא כבר מעובד מכיוון שזרימת הנתונים לא מאחסנת את מזהי הרשומות בזיכרון. Dataflow עשויה להשליך הודעה זו בהתבסס על נתונים שהגיעו מאוחר או אם לצינור הנתונים יש רגל נוספת ללכידת הודעות לא מעובדות ולכתוב לטבלה ב- Cloud Bigquery - מחסן נתונים מנוהל במלואו, מקורי בענן ב-GCP או כתיבת אחסון בענן - הוא שירות מנוהל לאחסון נתונים לא מובנים, כקובץ למטרות עיבוד מחדש ופתרון בעיות נוספות.



פתרון זה מספק אפשרות גמישה לעיבוד התחברות מורכבות למניעת כפילויות והפיכתו למתאים לתרחישים בהם חלון מניעת הכפילויות גדול או מורכב יותר ממה שמציע Pub/Sub. פשרות כוללות שימוש גבוה יותר במשאבים לשמירה על כל מדינה כדי לקבוע ייחוד שיא



מניעת כפילות בכיור


עד כה ראינו כיצד Publisher כמו Pub/Sub ושירותי האינטגרציה Cloud DataFlow מטפל בכפילויות בזמן אמת. אני חושב שהפתרונות הללו אינם יעילים ב-100% בכל הנוגע לחלונות עקב בעיות עיבוד תקורה ונפח, בתרחישים כאלה, לטיפול במקרים של קצה כולל כאשר הודעה כפולה היא הגעה מאוחרת וזרימת הנתונים חושבת שזו רשומה ייחודית מכיוון שהיא לא אל תחזיק את מזהי הרשומות כדי לבדוק את ייחוד ההודעות ובתרחיש אחר, זרימת נתונים מטפלת בהודעות אלו בצמתי עובדים שונים עקב כשלי רשת ו/או כשלים בצמתי עובד גורמים לו לחשוב שזה הוא רשומה ייחודית בזמן שהוא מעבד בזרימת נתונים ונכנס למערכות במורד הזרם כמו Google Cloud bigquery table.


כדי לצמצם מקרים כאלה וכבדיקה סופית למניעת הכפילויות יכול להתרחש ברמת הכיור, כמו ב-BigQuery או במחסני נתונים אחרים. גישה זו שימושית לעתים קרובות כאשר מניעת כפילויות בזמן אמת אינה קריטית, ודי שכפול תקופתי. זה יסנן ויבטל ביעילות את כל ההודעות הכפולות באמצעות שאילתות SQL מתקדמות.


בהתבסס על מקרה השימוש, ישנם שני סוגים של פתרונות זמינים לתיקון כפילויות.


ראשית, השתמש בשאילתות מתוזמנות באמצעות DAG composer או בתוך מסוף BigQuery כדי ליצור טבלת ביטול זיהוי מעת לעת באמצעות מחיצות (או יומיות או שעתיות), מה שהופך אותה לבחירה פשוטה עבור כל אחד ליצור את התהליך ולאחסן את נתוני ביטול הביטול בטבלת הצגה ולטעון הנתונים המובהקים לטבלה הסופית.


שנית, אנו יכולים להשתמש בתצוגה ממומשת כדי לקבל נתונים בזמן אמת, מה שהופך אותו לפתרון אידיאלי לקבלת תובנות עסקיות במהירות.



שאילתת BigQuery SQL מוצגת בקישור Github dedup_sql שלי.


להלן קוד sql bigquery מסביר שתי אפשרויות שדנו בהן:

 -- Use below SQL queries to periodically deduplicate data in BigQuery tables. CREATE OR REPLACE TABLE Transactions AS SELECT DISTINCT * FROM raw_transactions; --OR use below incremental steps to drop the necessary partitions and re-insert the deduped data into the original table -- Step 1: Insert distinct records from the original table based on the max timestamp available CREATE OR REPLACE TABLE STAGE_TRANSACTIONS AS SELECT DISTINCT * FROM raw_transactions WHERE event_timestamp > ( SELECT MAX(event_timestamp) FROM raw_transactions ); -- Step 2: Drop the partition after deduplication DELETE FROM raw_transactions WHERE event_timestamp = > ( SELECT MAX(event_timestamp) FROM raw_transactions ); -- Step 3: Insert deduplicated data into the main table INSERT INTO raw_transactions SELECT DISTINCT * FROM STAGE_TRANSACTIONS; --OR Use below SQL query to Merge new data without duplicates the table MERGE INTO raw_transactions AS target USING ( SELECT * FROM STAGE_TRANSACTIONS ) AS source ON target.transaction_id = source.transaction_id AND target.event_timestamp <= source.event_timestamp WHEN MATCHED THEN UPDATE SET target.product = source.product, target.price = source.price, target.location = source.location, target.store = source.store, target.zipcode = source.zipcode, target.city = source.city, target.promotion = source.promotion, target.event_timestamp = source.event_timestamp WHEN NOT MATCHED THEN INSERT (transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp) VALUES (source.transaction_id, source.product, source.price, source.location, source.store, source.zipcode, source.city, source.promotion, source.event_timestamp); --OR to get the real-time data without duplicates, use following materialized view and a finest solution to retrieve dedup records quickly CREATE MATERIALIZED VIEW raw_transactions_mv AS SELECT transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp FROM ( SELECT transaction_id, product, price, location, store, zipcode, city, promotion, event_timestamp, ROW_NUMBER() OVER ( PARTITION BY transaction_id ORDER BY event_timestamp DESC ) AS row_num FROM raw_transactions ) WHERE row_num = 1;

פשרות טכניות

כל אסטרטגיית מניעת כפילות מגיעה עם סט פשרות משלה. להלן תקציר שיעזור לך לבחור את הגישה הנכונה:

שִׁיטָה

יתרונות

חסרונות

תכונות הודעות Pub/Sub

זמן אחזור נמוך, מקורי ל-Pub/Sub

מוגבל לחלון מניעת כפילות של 10 דקות

ביטול כפילויות של קרן Apache

גמיש מאוד, תומך בלוגיקה מורכבת של מניעת כפילות

צריכת משאבים גבוהה יותר עקב ניהול המדינה

ניקוי כפילות על בסיס כיור

מתאים לעדכונים אצווה או תקופתיים, לוגיקה מינימלית

עשוי להציג חביון; ייתכן שיהיה צורך בכלי תזמור

בְּקִצוּר נִמרָץ

מניעת כפילויות היא אבן יסוד של עיבוד נתונים יעיל בצינורות סטרימינג. בחירת האסטרטגיה תלויה בצרכי הצינור שלך בזמן אמת, במורכבות ובאילוצי המשאבים. על ידי מינוף החוזקות של תכונות Pub/Sub, Apache Beam Deduplicate PTransform, או מניעת כפילויות מבוססות כיור, אתה יכול להבטיח נתונים נקיים ואמינים עבור מערכות במורד הזרם. חקור גישות אלה, יישם את הדוגמאות שסופקו והתאם אותן למקרה השימוש שלך לקבלת תוצאות מיטביות.


האם אתה מעוניין במדריכים מעמיקים יותר בנושא ניתוח נתונים ולמידת מכונה? עקבו אחריי הלאה בֵּינוֹנִי אוֹ לינקדאין למאמרים האחרונים, ואל תהסס לשתף את המחשבות או השאלות שלך בתגובות למטה. אם מצאת מאמר זה מועיל, שתף אותו עם הרשת שלך ועזור לאחרים לנצל את הפוטנציאל של ניתוח בזמן אמת בקמעונאות.