変更データ キャプチャ (CDC) は、データベース操作 (挿入、更新、削除) の行レベルでの変更を追跡し、イベントの順序に従って他のシステムに通知するために使用される手法です。災害復旧シナリオでは、CDC は主にプライマリ データベースとバックアップ データベース間でデータを同期し、プライマリ データベースからセカンダリ データベースへのリアルタイムのデータ同期を可能にします。
source ----------> CDC ----------> sink
SeaTunnel CDC は、次の 2 種類のデータ同期を提供します。
Debezium などの既存の CDC プラットフォームの多くは、履歴データの同期中にテーブルをロックする可能性があるため、ロックフリーのスナップショット同期フェーズが強調されています。スナップショット読み取りは、データベースの履歴データを同期するプロセスです。このプロセスの基本的な流れは次のとおりです。
storage -------------> splitEnumerator ---------- split ----------> reader ^ | | | \----------------- report -----------/
分割パーティション
splitEnumerator
(分割ディストリビューター) は、指定されたフィールド (テーブル ID や一意のキーなど) と定義されたステップ サイズに基づいて、テーブル データを複数の分割に分割します。
並列処理
各分割は、並列読み取りのために異なるリーダーに割り当てられます。 1 つのリーダーが 1 つの接続を占有します。
イベントフィードバック
分割の読み取り操作が完了すると、各リーダーは進行状況をsplitEnumerator
に報告します。分割のメタデータは次のように提供されます。
String splitId # Routing ID TableId tableId # Table ID SeatunnelRowType splitKeyType # The type of field used for partitioning Object splitStart # Start point of the partition Object splitEnd # End point of the partition
リーダーは分割情報を受け取ると、適切な SQL ステートメントを生成します。開始する前に、現在の分割の対応する位置をデータベース ログに記録します。現在の分割が完了すると、リーダーは次のデータを使用して進行状況をsplitEnumerator
に報告します。
String splitId # Split ID Offset highWatermark # Log position corresponding to the split, for future validation
増分同期フェーズは、スナップショット読み取りフェーズの後に開始されます。この段階では、ソース データベースで発生した変更がすべてキャプチャされ、バックアップ データベースにリアルタイムで同期されます。このフェーズでは、データベース ログ (MySQL binlog など) をリッスンします。増分トラッキングは通常、binlog の重複プルを回避し、データベースの負荷を軽減するためにシングル スレッドで行われます。したがって、使用されるリーダーは 1 つだけで、単一の接続を占有します。
data log -------------> splitEnumerator ---------- split ----------> reader ^ | | | \----------------- report -----------/
増分同期フェーズでは、スナップショット フェーズのすべての分割とテーブルが 1 つの分割に結合されます。このフェーズでの分割メタデータは次のとおりです。
String splitId Offset startingOffset # The lowest log start position among all splits Offset endingOffset # Log end position, or "continuous" if ongoing, eg, in the incremental phase List<TableId> tableIds Map<TableId, Offset> tableWatermarks # Watermark for all splits List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos # Snapshot phase split details
CompletedSnapshotSplitInfo
フィールドは次のとおりです。
String splitId TableId tableId SeatunnelRowType splitKeyType Object splitStart Object splitEnd Offset watermark # Corresponds to the highWatermark in the report
増分フェーズの分割には、スナップショット フェーズのすべての分割のウォーターマークが含まれます。最小のウォーターマークが増分同期の開始点として選択されます。
スナップショット読み取りフェーズでも増分読み取りフェーズでも、データベースは同期のために変更される可能性があります。正確に 1 つの配信を保証するにはどうすればよいでしょうか。
たとえば、スナップショットの読み取りフェーズでは、行k3
の挿入、 k2
の更新、 k1
の削除などの変更が発生している間に分割が同期されます。読み取りプロセス中にタスク識別が使用されない場合、更新が失われる可能性があります。SeaTunnel はこれを次のように処理します。
split{start, end}
内のデータを読み取ります。
high = low
場合、分割されたデータは読み取り中に変更されていません。 (high - low) > 0
場合、処理中に変更が発生しています。このような場合、SeaTunnel は次の処理を実行します。
low watermark
からhigh watermark
まで順番に変更を適用します。
insert k3 update k2 delete k1 | | | vvv bin log --|---------------------------------------------------|-- log offset low watermark high watermark CDC reads: k1 k3 k4 | Replays v Real data: k2 k3' k4
増分フェーズを開始する前に、SeaTunnel はまず前のステップからのすべての分割を検証します。分割間ではデータが更新されることがあります。たとえば、split1 と split2 の間に新しいレコードが挿入された場合、スナップショット フェーズでそのレコードが失われる可能性があります。分割間でこのデータを回復するために、SeaTunnel は次のアプローチに従います。
completedSnapshotSplitInfos
をチェックして、データが分割で処理されたかどうかを確認します。処理されていない場合は、分割間のデータと見なされ、修正する必要があります。
|------------filter split2-----------------| |----filter split1------| data log -|-----------------------|------------------|----------------------------------|- log offset min watermark split1 watermark split2 watermark max watermark
CDC を一時停止および再開するにはどうすればよいでしょうか? SeaTunnel は分散スナップショット アルゴリズム (Chandy-Lamport) を使用します。
システムに 2 つのプロセスp1
とp2
があり、 p1
には 3 つの変数X1 Y1 Z1
があり、 p2
には 3 つの変数X2 Y2 Z2
があるとします。初期状態は次のとおりです。
p1 p2 X1:0 X2:4 Y1:0 Y2:2 Z1:0 Z2:3
この時点で、 p1
グローバル スナップショットを開始します。 p1
最初にプロセス状態を記録し、次にp2
にマーカーを送信します。
マーカーがp2
に到達する前に、 p2
メッセージM
をp1
に送信します。
p1 p2 X1:0 -------marker-------> X2:4 Y1:0 <---------M---------- Y2:2 Z1:0 Z2:3
マーカーを受信すると、 p2
状態を記録し、 p1
メッセージM
を受信します。 p1
すでにローカル スナップショットを実行しているため、メッセージM
をログに記録するだけで済みます。最終的なスナップショットは次のようになります。
p1 M p2 X1:0 X2:4 Y1:0 Y2:2 Z1:0 Z2:3
SeaTunnel CDC では、マーカーはすべてのリーダー、分割列挙子、ライター、およびその他のノードに送信され、それぞれがメモリ状態を保持します。