paint-brush
Comment comprendre vos données en temps réel en utilisant bytewax et ydata-profilingpar@ydata
799 lectures
799 lectures

Comment comprendre vos données en temps réel en utilisant bytewax et ydata-profiling

par YData9m2023/07/25
Read on Terminal Reader

Trop long; Pour lire

Juste un tutoriel étape par étape génial sur la façon d'effectuer le profilage des données sur les flux de données 🚀
featured image - Comment comprendre vos données en temps réel en utilisant bytewax et ydata-profiling
YData HackerNoon profile picture

Dans cet article de blog, nous expliquerons comment vous pouvez combiner et exploiter la solution de streaming open source, bytewax , avec ydata-profiling , pour améliorer la qualité de vos flux de streaming. Bouclez votre ceinture !


Le traitement de flux permet une analyse en temps réel des données en cours et avant le stockage et peut être avec ou sans état .


Le traitement de flux avec état est utilisé pour les recommandations en temps réel , la détection de modèles ou le traitement d'événements complexes, où l'historique de ce qui s'est passé est requis pour le traitement (fenêtres, jointure par une clé, etc.).


Le traitement de flux sans état est utilisé pour la transformation en ligne qui ne nécessite pas la connaissance d'autres points de données dans le flux, comme le masquage d'un e-mail ou la conversion d'un type.


Photo de Markus Spiske sur Unsplash


Dans l'ensemble, les flux de données sont largement utilisés dans l'industrie et peuvent être appliqués à des cas d'utilisation tels que la détection des fraudes , la surveillance des patients ou la maintenance prédictive des événements .

Un aspect crucial que tous les flux de données doivent prendre en compte est la qualité des données

Contrairement aux modèles traditionnels où la qualité des données est généralement évaluée lors de la création de l'entrepôt de données ou de la solution de tableau de bord, le streaming de données nécessite une surveillance continue .


Il est essentiel de maintenir la qualité des données tout au long du processus, de la collecte à l'alimentation des applications en aval. Après tout, le coût d'une mauvaise qualité des données peut être élevé pour les organisations :


« Le coût des mauvaises données représente 15 % à 25 % du chiffre d'affaires pour la plupart des entreprises. (…) Les deux tiers de ces coûts peuvent être éliminés en misant sur la qualité des données.


— Thomas C. Redman, auteur de « Getting in Front on Data Quality »


Tout au long de cet article, nous allons vous montrer comment combiner bytewa avec ydata-profiling pour profiler et améliorer la qualité de vos flux de streaming !

Traitement de flux pour les professionnels des données avec Bytewax

Bytewax est un framework de traitement de flux OSS conçu spécifiquement pour les développeurs Python.


Il permet aux utilisateurs de créer des pipelines de données de streaming et des applications en temps réel avec des fonctionnalités similaires à Flink, Spark et Kafka Streams tout en offrant une interface conviviale et familière et une compatibilité à 100 % avec l'écosystème Python.


Utilisation intégrée connecteurs ou des bibliothèques Python existantes, vous pouvez vous connecter à des sources de données en temps réel et en streaming (Kafka, RedPanda, WebSocket, etc.) et écrire des données transformées sur divers systèmes en aval (Kafka, fichiers parquet, lacs de données, etc.).


Pour les transformations, Bytewax facilite les transformations avec et sans état avec les méthodes map , windowing et agrégation et est livré avec des fonctionnalités familières telles que la récupération et l'évolutivité.


Bytewax facilite une expérience Python-first et centrée sur les données pour les flux de données et est spécialement conçu pour les ingénieurs de données et les scientifiques de données .


Il permet aux utilisateurs de créer des pipelines de données de streaming et des applications en temps réel et de créer les personnalisations nécessaires pour répondre à leurs besoins sans avoir à apprendre et à maintenir des plates-formes de streaming basées sur JVM comme Spark ou Flink.


Bytewax est bien adapté à de nombreux cas d'utilisation, à savoir, Intégration de pipelines pour l'IA générative , Gestion des valeurs manquantes dans les flux de données , Utilisation de modèles de langage dans un contexte de streaming pour comprendre les marchés financiers , et plus.


Pour vous inspirer de cas d'utilisation et plus d'informations comme la documentation, les didacticiels et les guides, n'hésitez pas à consulter le site bytewax .

Pourquoi le profilage des données pour les flux de données ?

Le profilage des données est la clé d'un démarrage réussi de toute tâche d'apprentissage automatique et fait référence à l'étape de bien comprendre nos données : sa structure, son comportement et sa qualité.


En un mot, profilage des données consiste à analyser les aspects liés au format des données et aux descripteurs de base (par exemple, le nombre d'échantillons, le nombre/types de caractéristiques, les valeurs en double), son caractéristiques intrinsèques (telles que la présence de données manquantes ou de caractéristiques déséquilibrées) et d'autres facteurs de complication pouvant survenir lors de la collecte ou du traitement des données (par exemple, des valeurs erronées ou des caractéristiques incohérentes).


Garantir des normes élevées de qualité des données est crucial pour tous les domaines et organisations, mais est particulièrement pertinent pour les domaines fonctionnant avec des domaines produisant des données continues , où les circonstances peuvent changer rapidement et nécessiter une action immédiate (par exemple, surveillance des soins de santé, valeurs des stocks, politiques de qualité de l'air).


Pour de nombreux domaines, le profilage des données est utilisé dans une optique d'analyse exploratoire des données, en tenant compte des données historiques stockées dans des bases de données. Au contraire, pour les flux de données, le profilage des données devient essentiel pour la validation et le contrôle qualité en continu tout au long du flux , où les données doivent être vérifiées à différentes périodes ou étapes du processus.


En intégrant un profilage automatisé dans nos flux de données , nous pouvons immédiatement obtenir des informations sur l'état actuel de nos données et être alertés de tout problème potentiellement critique, qu'il soit lié à la cohérence et à l'intégrité des données (par exemple, des valeurs corrompues ou des changements de formats), ou à des événements se produisant sur de courtes périodes (par exemple, des dérives de données, des écarts par rapport aux règles et résultats métier).


Dans les domaines du monde réel - où vous savez juste que la loi de Murphy est vouée à frapper et "tout peut certainement mal tourner" - le profilage automatisé pourrait nous éviter de multiples énigmes cérébrales et des systèmes devant être retirés de la production !


En ce qui concerne le profilage des données, ydata-profiling a toujours été un favori de la foule , Soit pour tabulaire ou des séries chronologiques données. Et ce n'est pas étonnant : il s'agit d'une seule ligne de code pour un ensemble complet d'analyses et d'informations.


Des opérations complexes et chronophages sont effectuées sous le capot : ydata-profiling détecte automatiquement les types d'entités compris dans les données et, en fonction des types d'entités (numériques ou catégoriques), il ajuste les statistiques récapitulatives et les visualisations qui sont affichées dans le rapport de profilage.


Favorisant une analyse centrée sur les données , le package met également en évidence les relations existantes entre les fonctionnalités , en se concentrant sur leurs interactions et corrélations par paires , et fournit une évaluation approfondie des alertes de qualité des données , des valeurs en double ou constantes aux fonctionnalités biaisées et déséquilibrées .


C'est vraiment une vue à 360º de la qualité de nos données — avec un minimum d'effort.


Rapport de profilage : mise en évidence des problèmes potentiels de qualité des données.



Tout mettre ensemble : bytewax et ydata-profiling

Avant de démarrer le projet, nous devons d'abord définir nos dépendances Python et configurer notre source de données.


Tout d'abord, installons les packages bytewax et ydata-profiling ( vous pouvez utiliser un environnement virtuel pour cela — vérifiez ces instructions si vous avez besoin de conseils supplémentaires !)


 pip install bytewax==0.16.2 ydata-profiling==4.3.1


Ensuite, nous téléchargeons le Ensemble de données de télémétrie des capteurs environnementaux (Licence — CC0 : domaine public), qui contient plusieurs mesures de température, d'humidité, de monoxyde de carbone, de gaz de pétrole liquéfié, de fumée, de lumière et de mouvement à partir de différents appareils IoT :


 wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000


Dans un environnement de production, ces mesures seraient générées en continu par chaque appareil et l'entrée ressemblerait à ce que nous attendons d'une plateforme de streaming. comme Kafka . Dans cet article, pour simuler le contexte que nous trouverions avec des données en streaming, nous allons lire les données du fichier CSV une ligne à la fois et créer un flux de données à l'aide de bytewax.


(En guise de remarque rapide, un flux de données est essentiellement un pipeline de données qui peut être décrit comme un graphe acyclique dirigé - DAG)


Commençons par effectuer les importations nécessaires :


 from datetime import datetime, timedelta, timezone from bytewax.dataflow import Dataflow from bytewax.connectors.stdio import StdOutput from bytewax.connectors.files import CSVInput from bytewax.testing import run_main


Ensuite, nous définissons notre objet de flux de données. Ensuite, nous utiliserons une méthode de carte sans état où nous transmettrons une fonction pour convertir la chaîne en un objet DateTime et restructurer les données au format (device_id, data).


La méthode map apportera la modification à chaque point de données de manière sans état. La raison pour laquelle nous avons modifié la forme de nos données est que nous pouvons facilement regrouper les données dans les prochaines étapes pour profiler les données de chaque appareil séparément plutôt que pour tous les appareils simultanément.


 flow = Dataflow() flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000")) # parse timestamp def parse_time(reading_data): reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc) return reading_data flow.map(parse_time) # remap format to tuple (device_id, reading_data) flow.map(lambda reading_data: (reading_data['device'], reading_data))


Maintenant, nous allons tirer parti des capacités avec état de bytewax pour collecter des données pour chaque appareil sur la durée que nous avons définie. ydata-profiling attend un instantané des données au fil du temps, ce qui fait de l'opérateur de fenêtre la méthode idéale à utiliser pour ce faire.


Dans ydata-profiling , nous sommes en mesure de produire des statistiques récapitulatives pour une trame de données spécifiée pour un contexte particulier. Par exemple, dans notre exemple, nous pouvons produire des instantanés de données se référant à chaque appareil IoT ou à des périodes particulières :


 from bytewax.window import EventClockConfig, TumblingWindow # This is the accumulator function, and outputs a list of readings def acc_values(acc, reading): acc.append(reading) return acc # This function instructs the event clock on how to retrieve the # event's datetime from the input. def get_time(reading): return reading["ts"] # Configure the `fold_window` operator to use the event time. cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30)) # And a tumbling window align_to = datetime(2020, 1, 1, tzinfo=timezone.utc) wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1)) flow.fold_window("running_average", cc, wc, list, acc_values) flow.inspect(print)


Une fois les instantanés définis, tirer parti ydata-profiling est aussi simple que d'appeler le ProfileReport pour chacune des trames de données que nous souhaitons analyser :


 import pandas as pd from ydata_profiling import ProfileReport def profile(device_id__readings): print(device_id__readings) device_id, readings = device_id__readings start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') df = pd.DataFrame(readings) profile = ProfileReport( df, tsmode=True, sortby="ts", title=f"Sensor Readings - device: {device_id}" ) profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html") return f"device {device_id} profiled at hour {start_time}" flow.map(profile)


Dans cet exemple, nous écrivons les images dans des fichiers locaux dans le cadre d'une fonction dans une méthode map. Ceux-ci pourraient être signalés via un outil de messagerie ou nous pourrions les enregistrer dans un stockage distant à l'avenir.


Une fois le profil terminé, le flux de données attend une sortie afin que nous puissions utiliser le StdOutput intégré pour imprimer le périphérique qui a été profilé et l'heure à laquelle il a été profilé qui a été transmise à la fonction de profil dans l'étape de mappage :


 flow.output("out", StdOutput())


Il existe plusieurs façons d'exécuter les flux de données Bytewax. Dans cet exemple, nous utilisons la même machine locale, mais Bytewax peut également s'exécuter sur plusieurs processus Python, sur plusieurs hôtes, dans un Conteneur Docker , utilisant un Grappe Kubernetes , et plus .


Dans cet article, nous continuerons avec une configuration locale, mais nous vous encourageons à consulter notre outil d'aide waxctl qui gère les déploiements de flux de données Kubernetes une fois que votre pipeline est prêt à passer en production.


En supposant que nous soyons dans le même répertoire que le fichier avec la définition du flux de données, nous pouvons l'exécuter en utilisant :


 python -m bytewax.run ydata-profiling-streaming:flow


Nous pouvons ensuite utiliser les rapports de profilage pour valider la qualité des données, vérifier les changements de schémas ou de formats de données et comparer les caractéristiques des données entre différents appareils ou fenêtres temporelles .


En fait, nous pouvons tirer parti de la fonctionnalité de rapport de comparaison qui met en évidence les différences entre deux profils de données de manière simple, ce qui nous permet de détecter plus facilement les modèles importants qui doivent être étudiés ou les problèmes qui doivent être résolus :


 snapshot_a_report = ProfileReport(df_a, title="Snapshot A") snapshot_b_report = ProfileReport(df_b, title="Snapshot B") comparison_report =snapshot_a_report(snapshot_b_report) comparison_report.to_file("comparison_report.html")


Prêt à explorer vos propres flux de données ?

La validation des flux de données est cruciale pour identifier les problèmes de qualité des données de manière continue et comparer l'état des données sur des périodes distinctes.


Pour les organisations des secteurs de la santé , de l'énergie , de la fabrication et du divertissement , qui travaillent toutes avec des flux continus de données, un profilage automatisé est essentiel pour établir les meilleures pratiques de gouvernance des données , de l'évaluation de la qualité à la confidentialité des données.


Cela nécessite l'analyse d'instantanés de données qui, comme présenté dans cet article, peuvent être réalisées de manière transparente en combinant bytewax et ydata-profiling .


Bytewax prend en charge tous les processus nécessaires pour gérer et structurer les flux de données en instantanés, qui peuvent ensuite être résumés et comparés avec ydata-profiling via un rapport complet des caractéristiques des données.


Être capable de traiter et de profiler de manière appropriée les données entrantes ouvre une pléthore de cas d'utilisation dans différents domaines, de la correction des erreurs dans les schémas et formats de données à la mise en évidence et à l'atténuation des problèmes supplémentaires qui découlent des activités du monde réel, tels que la détection d'anomalies (par exemple, la détection de fraude ou d'intrusion/menaces), le dysfonctionnement de l'équipement et d'autres événements qui s'écartent des attentes (par exemple, les dérives de données ou le désalignement avec les règles métier).


Vous êtes maintenant prêt à commencer à explorer vos flux de données ! Faites-nous savoir quels autres cas d'utilisation vous trouvez, et comme toujours, n'hésitez pas à nous laisser un mot dans les commentaires, ou à nous trouver sur le Communauté d'IA centrée sur les données pour d'autres questions et suggestions ! On se voit là-bas!

Remerciements

Cet article a été écrit par Fabiana Clemente (Co-fondatrice & CDO @ YData ) et Miriam Santos (Relations avec les développeurs @ YData ) -- développement ydata-profiling -- et Zander Matheson (PDG et fondateur @ Bytewax ) et Oli Makhasoeva (Relations avec les développeurs @ Byetwax ) -- développement cire d'octet .


Vous pouvez trouver des informations supplémentaires sur les packages OSS dans les documentations respectives : ydata-profiling docs & docs bytewax .


Également publié ici