Збір даних про зміни (CDC) — це техніка, яка використовується для відстеження змін на рівні рядка в операціях з базою даних (вставлення, оновлення, видалення) і сповіщення інших систем у порядку подій. У сценаріях аварійного відновлення CDC в основному синхронізує дані між основною та резервною базою даних, забезпечуючи синхронізацію даних у реальному часі з основної бази даних у вторинну.
source ----------> CDC ----------> sink
SeaTunnel CDC пропонує два типи синхронізації даних:
Особлива увага приділяється фазі синхронізації знімків без блокування, оскільки багато існуючих платформ CDC, наприклад Debezium, можуть блокувати таблиці під час синхронізації історичних даних. Зчитування знімків — це процес синхронізації історичних даних бази даних. Основний хід цього процесу такий:
storage -------------> splitEnumerator ---------- split ----------> reader ^ | | | \----------------- report -----------/
Розділення розділів
splitEnumerator
(розподілювач розподілу) розбиває дані таблиці на кілька розділів на основі заданих полів (таких як ідентифікатор таблиці або унікальних ключів) і визначеного розміру кроку.
Паралельна обробка
Кожен розділ призначається іншому читачеві для паралельного читання. Один зчитувач займатиме одне підключення.
Відгук про подію
Після завершення операції читання для split кожен читач звітує про прогрес назад до splitEnumerator
. Метадані для поділу надаються таким чином:
String splitId # Routing ID TableId tableId # Table ID SeatunnelRowType splitKeyType # The type of field used for partitioning Object splitStart # Start point of the partition Object splitEnd # End point of the partition
Після того як зчитувач отримує інформацію про розділення, він генерує відповідні оператори SQL. Перед початком він реєструє відповідну позицію поточного розділення в журналі бази даних. Після завершення поточного розбиття читач повідомляє про прогрес у splitEnumerator
із такими даними:
String splitId # Split ID Offset highWatermark # Log position corresponding to the split, for future validation
Фаза поступової синхронізації починається після фази читання знімка. На цьому етапі будь-які зміни, що відбуваються у вихідній базі даних, фіксуються та синхронізуються з резервною базою даних у режимі реального часу. Ця фаза прослуховує журнал бази даних (наприклад, MySQL binlog). Інкрементне відстеження зазвичай однопоточне, щоб уникнути дублювання binlog і зменшити навантаження на базу даних. Тому використовується тільки один зчитувач, який займає одне з'єднання.
data log -------------> splitEnumerator ---------- split ----------> reader ^ | | | \----------------- report -----------/
На етапі інкрементної синхронізації всі розділення та таблиці з етапу знімка об’єднуються в один розділ. Розділені метадані на цьому етапі є такими:
String splitId Offset startingOffset # The lowest log start position among all splits Offset endingOffset # Log end position, or "continuous" if ongoing, eg, in the incremental phase List<TableId> tableIds Map<TableId, Offset> tableWatermarks # Watermark for all splits List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos # Snapshot phase split details
Поля CompletedSnapshotSplitInfo
такі:
String splitId TableId tableId SeatunnelRowType splitKeyType Object splitStart Object splitEnd Offset watermark # Corresponds to the highWatermark in the report
Поділ у фазі збільшення містить водяний знак для всіх поділів у фазі знімка. Мінімальний водяний знак вибирається як початкова точка для поступової синхронізації.
База даних також може змінюватися для синхронізації, незалежно від того, чи є у фазі читання знімка чи поступового читання. Як ми гарантуємо рівно одну доставку?
Наприклад, у фазі читання знімка відбувається синхронізація розбиття, коли відбуваються зміни, такі як вставка рядка k3
, оновлення до k2
та видалення k1
. Якщо під час процесу читання не використовується ідентифікація завдання, оновлення можуть бути втрачені. SeaTunnel обробляє це за допомогою:
split{start, end}
.
Якщо high = low
, дані для розділення не змінилися під час читання. Якщо (high - low) > 0
, під час обробки відбулися зміни. У такому випадку SeaTunnel:
low watermark
до high watermark
по порядку, використовуючи первинні ключі для повторного відтворення операцій у таблиці в пам’яті.
insert k3 update k2 delete k1 | | | vvv bin log --|---------------------------------------------------|-- log offset low watermark high watermark CDC reads: k1 k3 k4 | Replays v Real data: k2 k3' k4
Перед початком етапу збільшення SeaTunnel спочатку перевіряє всі розбиття з попереднього кроку. Між розділеннями дані можуть оновлюватися, наприклад, якщо нові записи вставляються між розділами 1 і split2, вони можуть бути пропущені під час фази знімка. Щоб відновити ці дані між розділеннями, SeaTunnel дотримується такого підходу:
completedSnapshotSplitInfos
, щоб побачити, чи дані були оброблені в будь-якому розділенні. Якщо ні, це вважається даними між розділеннями та має бути виправлено.
|------------filter split2-----------------| |----filter split1------| data log -|-----------------------|------------------|----------------------------------|- log offset min watermark split1 watermark split2 watermark max watermark
Як щодо призупинення та відновлення CDC? SeaTunnel використовує розподілений алгоритм моментального знімка (Chandy-Lamport):
Припустимо, що система має два процеси, p1
і p2
, де p1
має три змінні X1 Y1 Z1
, а p2
має три змінні X2 Y2 Z2
. Початкові стани такі:
p1 p2 X1:0 X2:4 Y1:0 Y2:2 Z1:0 Z2:3
У цей момент p1
ініціює глобальний знімок. p1
спочатку записує свій стан процесу, а потім надсилає маркер до p2
.
Перш ніж маркер досягне p2
, p2
надсилає повідомлення M
до p1
.
p1 p2 X1:0 -------marker-------> X2:4 Y1:0 <---------M---------- Y2:2 Z1:0 Z2:3
Після отримання маркера p2
записує його стан, а p1
отримує повідомлення M
. Оскільки p1
вже зробив локальний знімок, йому потрібно лише зареєструвати повідомлення M
. Остаточний знімок виглядає так:
p1 M p2 X1:0 X2:4 Y1:0 Y2:2 Z1:0 Z2:3
У SeaTunnel CDC маркери надсилаються всім читачам, розділеним нумераторам, записувачам та іншим вузлам, кожен з яких зберігає свій стан пам’яті.