paint-brush
bytewax と ydata-profiling を使用してデータをリアルタイムで理解する方法@ydata
799 測定値
799 測定値

bytewax と ydata-profiling を使用してデータをリアルタイムで理解する方法

YData9m2023/07/25
Read on Terminal Reader

長すぎる; 読むには

データ ストリームでデータ プロファイリングを実行する方法についての素晴らしいステップバイステップのチュートリアル 🚀
featured image - bytewax と ydata-profiling を使用してデータをリアルタイムで理解する方法
YData HackerNoon profile picture

このブログ投稿では、オープンソース ストリーミング ソリューションbytewaxydata-profilingを組み合わせて活用して、ストリーミング フローの品質を向上させる方法について説明します。シートベルトを締める!


ストリーム処理により、処理中および保存前のデータのリアルタイム分析が可能になり、ステートフルまたはステートレスにすることができます。


ステートフル ストリーム処理はリアルタイムの推奨事項、パターン検出、または複雑なイベント処理に使用されます。処理 (ウィンドウ、キーによる結合など) には何が起こったかの履歴が必要です。


ステートレス ストリーム処理は、電子メールのマスクや型の変換など、ストリーム内の他のデータ ポイントの知識を必要としないインライン変換に使用されます。


Unsplash の Markus Spiske による写真


全体として、データ ストリームは業界で広く使用されており、不正行為の検出患者の監視イベントの予測メンテナンスなどのユースケースに適用されていることがわかります。

すべてのデータ ストリームで考慮すべき重要な側面の 1 つはデータの品質です

通常、データ ウェアハウスやダッシュボード ソリューションの作成中にデータ品質が評価される従来のモデルとは異なり、ストリーミング データは継続的な監視が必要です


収集から下流アプリケーションへの供給まで、プロセス全体を通じてデータ品質を維持することが不可欠です。結局のところ、データ品質の低下によるコストは組織にとって高くつく可能性があります。


「不良データのコストは、ほとんどの企業の収益の 15% ~ 25% に上ります。 (…) これらのコストの 3 分の 2 は、データ品質を前面に押し出すことで削減できます。」


— トーマス C. レッドマン、『Getting in Front on Data Quality』の著者


この記事では、 bytewaydata-profiling組み合わせてストリーミング フローの品質をプロファイリングし、改善する方法を説明します。

Bytewax を使用したデータ プロフェッショナル向けのストリーム処理

バイトワックスは、Python 開発者向けに特別に設計された OSS ストリーム処理フレームワークです。


これにより、ユーザーは、Flink、Spark、Kafka Streams と同様の機能を備えたストリーミング データ パイプラインとリアルタイム アプリケーションを構築できると同時に、フレンドリーで使い慣れたインターフェイスとPython エコシステムとの 100% 互換性を提供できます。


組み込みの使用コネクタまたは既存の Python ライブラリを使用して、リアルタイムおよびストリーミング データ ソース (Kafka、RedPanda、WebSocket など) に接続し変換されたデータをさまざまなダウンストリーム システム (Kafka、寄木細工ファイル、データ レイクなど) に書き出すことができます。


変換に関しては、Bytewax は、マップウィンドウ処理集計メソッドを使用してステートフルおよびステートレス変換を容易にし、回復やスケーラビリティなどの使い慣れた機能を備えています。


バイトワックスデータ ストリームに対する Python ファーストかつデータ中心のエクスペリエンスを促進しますデータ エンジニアやデータ サイエンティスト向けに特別に構築されています。


これにより、ユーザーは、Spark や Flink などの JVM ベースのストリーミング プラットフォームを学習および維持することなく、ストリーミング データ パイプラインとリアルタイム アプリケーションを構築し、ニーズを満たすために必要なカスタマイズを作成できます。


Bytewax は多くのユースケースに適しています。生成 AI 用のパイプラインの埋め込みデータ ストリーム内の欠損値の処理ストリーミング コンテキストで言語モデルを使用して金融市場を理解する、 もっと。


ユースケースのインスピレーションや、ドキュメント、チュートリアル、ガイドなどの詳細については、お気軽にチェックしてください。バイトワックスのウェブサイト

データ ストリームのデータ プロファイリングを行う理由

データ プロファイリングは、あらゆる機械学習タスクを正常に開始するための鍵であり、次のステップを指します。 データを徹底的に理解する: その構造、動作、品質。


一言で言えば、 データプロファイリングデータの形式と基本的な記述子 (サンプル数、特徴の数/種類、重複値など) に関連する側面の分析が含まれます。 本質的な特性(欠落データや不均衡な特徴の存在など)、およびデータ収集または処理中に発生する可能性のあるその他の複雑な要因(たとえば、誤った値や一貫性のない特徴)。


高いデータ品質基準を確保することは、すべてのドメインと組織にとって重要ですが、状況が急速に変化する可能性があり、即時対応が必要になる可能性がある (医療モニタリング、株価、大気質ポリシーなど) 連続データを出力するドメインを運用しているドメインに特に関係があります


多くのドメインでは、データベースに保存されている履歴データを考慮した探索的データ分析の観点からデータ プロファイリングが使用されます。逆に、データ ストリームの場合、ストリームに沿った継続的な検証と品質管理にはデータ プロファイリングが不可欠となり、プロセスのさまざまな時間フレームや段階でデータをチェックする必要があります。


自動化されたプロファイリングをデータ フローに組み込むことで、データの現在の状態に関するフィードバックをすぐに取得でき、データの一貫性や整合性(値の破損や形式の変更など) に関連するものであっても、短期間に発生するイベント(データのドリフト、ビジネス ルールや結果からの逸脱など) に関連するものであっても、潜在的に重大な問題について警告を受けることができます。


マーフィーの法則が必ず起こり、「すべてが間違いなくうまくいかない可能性がある」という現実世界の領域では、自動プロファイリングが、複数の脳のパズルやシステムを運用停止にする必要から私たちを救ってくれるかもしれません。


データプロファイリングに関しては、 ydata-profiling一貫して重要な役割を果たしています。群衆のお気に入り、どちらかの場合表形式また時系列データ。それもそのはずです。これは 1 行のコードで広範な分析と洞察を得ることができます。


複雑で時間のかかる操作が内部で実行されます。ydata-profiling は、データに含まれる特徴タイプを自動的に検出し、特徴タイプ (数値またはカテゴリ) に応じて、プロファイリング レポートに表示される概要統計と視覚化を調整します


データ中心の分析を促進するこのパッケージは、ペアごとの相互作用相関関係に焦点を当てて、特徴間の既存の関係も強調し重複または定数値から偏った不均衡な特徴に至るまで、データ品質アラートの徹底的な評価を提供します。


これは、最小限の労力でデータの品質を 360 度見渡すことができます。


プロファイリング レポート: 潜在的なデータ品質の問題を強調します。



すべてをまとめる: bytewax と ydata-profiling

プロジェクトを開始する前に、まず Python の依存関係を設定し、データ ソースを構成する必要があります。


まず、 bytewaxパッケージとydata-profilingパッケージをインストールしましょう (これには仮想環境を使用するとよいでしょう — これらの指示を確認してください追加のガイダンスが必要な場合は!)


 pip install bytewax==0.16.2 ydata-profiling==4.3.1


次に、アップロードします環境センサーテレメトリーデータセット(ライセンス — CC0: パブリック ドメイン)。これには、さまざまな IoT デバイスからの温度、湿度、一酸化炭素、液体石油ガス、煙、光、動きのいくつかの測定値が含まれています。


 wget https://raw.githubusercontent.com/bytewax/air-quality-sensor/main/data/iot_telemetry_data_1000


実稼働環境では、これらの測定値は各デバイスによって継続的に生成され、入力はストリーミング プラットフォームで期待されるものと同じになります。カフカのような。この記事では、ストリーミング データで見つかるコンテキストをシミュレートするために、CSV ファイルから一度に 1 行ずつデータを読み取り、 bytewax を使用してデータフローを作成します。


(補足として、データフローは本質的に、有向非巡回グラフ (DAG) として説明できるデータ パイプラインです。)


まず、必要なインポートをいくつか行います。


 from datetime import datetime, timedelta, timezone from bytewax.dataflow import Dataflow from bytewax.connectors.stdio import StdOutput from bytewax.connectors.files import CSVInput from bytewax.testing import run_main


次に、データフロー オブジェクトを定義します。その後、ステートレス マップ メソッドを使用して、文字列を DateTime オブジェクトに変換し、データを形式 (device_id、data) に再構築する関数を渡します。


マップ メソッドは、ステートレスな方法で各データ ポイントに変更を加えます。データの形状を変更した理由は、次のステップでデータを簡単にグループ化し、すべてのデバイスのデータを同時にではなく、デバイスごとに個別にプロファイリングできるようにするためです。


 flow = Dataflow() flow.input("simulated_stream", CSVInput("/content/iot_telemetry_data_1000")) # parse timestamp def parse_time(reading_data): reading_data["ts"] = datetime.fromtimestamp(float(reading_data["ts"]), timezone.utc) return reading_data flow.map(parse_time) # remap format to tuple (device_id, reading_data) flow.map(lambda reading_data: (reading_data['device'], reading_data))


次に、 bytewaxのステートフル機能を利用して、定義した期間にわたって各デバイスのデータを収集します。 ydata-profiling時間の経過に伴うデータのスナップショットを期待しているため、ウィンドウ オペレーターはこれを行うのに最適な方法です。


ydata-profilingでは、特定のコンテキストに対して指定されたデータフレームの要約統計を生成できます。たとえば、この例では、各 IoT デバイスまたは特定の時間枠を参照するデータのスナップショットを作成できます。


 from bytewax.window import EventClockConfig, TumblingWindow # This is the accumulator function, and outputs a list of readings def acc_values(acc, reading): acc.append(reading) return acc # This function instructs the event clock on how to retrieve the # event's datetime from the input. def get_time(reading): return reading["ts"] # Configure the `fold_window` operator to use the event time. cc = EventClockConfig(get_time, wait_for_system_duration=timedelta(seconds=30)) # And a tumbling window align_to = datetime(2020, 1, 1, tzinfo=timezone.utc) wc = TumblingWindow(align_to=align_to, length=timedelta(hours=1)) flow.fold_window("running_average", cc, wc, list, acc_values) flow.inspect(print)


スナップショットを定義した後、 ydata-profiling利用するのは、分析したいデータフレームごとにProfileReportを呼び出すのと同じくらい簡単です。


 import pandas as pd from ydata_profiling import ProfileReport def profile(device_id__readings): print(device_id__readings) device_id, readings = device_id__readings start_time = readings[0]['ts'].replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') df = pd.DataFrame(readings) profile = ProfileReport( df, tsmode=True, sortby="ts", title=f"Sensor Readings - device: {device_id}" ) profile.to_file(f"Ts_Profile_{device_id}-{start_time}.html") return f"device {device_id} profiled at hour {start_time}" flow.map(profile)


この例では、map メソッドの関数の一部としてイメージをローカル ファイルに書き込みます。これらはメッセージング ツールを介して報告されるか、将来的にはリモート ストレージに保存される可能性があります。


プロファイルが完了すると、データフローは何らかの出力を期待するため、組み込みStdOutputを使用して、プロファイルされたデバイスと、マップ ステップのプロファイル関数から渡されたプロファイルされた時刻を出力できます。


 flow.output("out", StdOutput())


Bytewax データフローを実行するには複数の方法があります。この例では、同じローカル マシンを使用しますが、Bytewax は、複数のホストにわたる複数の Python プロセス上で実行することもできます。 Dockerコンテナ、 を使ってKubernetes クラスター、 ともっと


この記事ではローカル セットアップを続けますが、ヘルパー ツールを確認することをお勧めします。ワックスctlパイプラインが本番環境に移行する準備ができたら、Kubernetes データフローのデプロイメントを管理します。


データフロー定義が含まれるファイルと同じディレクトリにあると仮定すると、次のコマンドを使用して実行できます。


 python -m bytewax.run ydata-profiling-streaming:flow


その後、プロファイリング レポートを使用して、データ品質を検証し、スキーマまたはデータ形式の変更を確認し、さまざまなデバイスまたは時間枠間のデータ特性を比較できます


実際に、 比較レポート機能これにより、2 つのデータ プロファイル間の違いがわかりやすくなり、調査が必要な重要なパターンや対処する必要がある問題を簡単に検出できるようになります。


 snapshot_a_report = ProfileReport(df_a, title="Snapshot A") snapshot_b_report = ProfileReport(df_b, title="Snapshot B") comparison_report =snapshot_a_report(snapshot_b_report) comparison_report.to_file("comparison_report.html")


独自のデータ ストリームを探索する準備はできましたか?

データ ストリームの検証は、データ品質の問題を継続的に特定し、異なる期間にわたるデータの状態を比較するために重要です。


ヘルスケアエネルギー製造エンターテイメントの組織(すべてが継続的なデータ ストリームを扱う組織)にとって、自動化されたプロファイリングは、品質評価からデータ プライバシーに至るまで、データ ガバナンスのベスト プラクティスを確立するための鍵となります


これには、この記事で紹介されているように、 bytewaxydata-profiling組み合わせることでシームレスな方法で達成できるデータのスナップショットの分析が必要です。


Bytewax は、データ ストリームを処理してスナップショットに構造化するために必要なすべてのプロセスを処理します。その後、データ ストリームを要約して、データ特性の包括的なレポートを通じてydata-profilingと比較できます。


受信データを適切に処理およびプロファイリングできることにより、データ スキーマおよび形式のエラーの修正から、異常検出(例: 詐欺または侵入/脅威の検出)、機器の故障、および期待から逸脱するその他のイベント (例: データ ドリフトまたはビジネス ルールとの不整合) など、現実世界のアクティビティに由来する追加の問題の強調と軽減に至るまで、さまざまなドメインにわたる多数のユース ケースが開かれます。


これで、データ ストリームの探索を開始する準備がすべて整いました。他にどのような使用例を見つけたかをお知らせください。いつものように、お気軽にコメント欄に記入していただくか、次のアドレスまでお問い合わせください。データ中心の AI コミュニティさらに質問や提案がありましたら!それではまたお会いしましょう!

謝辞

この記事は Fabiana Clemente (共同創設者兼 CDO @ ) によって書かれました。 Yデータ) と Miriam Santos (開発者関係 @ Yデータ) -- 開発中ydata-プロファイリング-- およびザンダー・マセソン (CEO 兼創設者 @バイトワックス) と Oli Makhasoeva (開発者関係 @バイトワックス) -- 開発中バイトワックス


OSS パッケージに関する追加情報は、それぞれのドキュメントに記載されています。 ydata-プロファイリングドキュメント&バイトワックスのドキュメント


ここでも公開されています