Realtime Compute for Apache Flink は Flink CDC を利用して、ソースからシンクへデータを同期する YAML ジョブを開発できます。このトピックでは、Flink CDC データインジェストジョブの開発方法について説明します。
背景情報
Flink CDC データインジェストは、Flink CDC を使用してデータ統合を行います。YAML 設定を使用することで、複雑な ETL (抽出、変換、ロード) パイプラインを簡単に定義でき、これらは自動的に Flink のコンピューティングロジックに変換されます。この機能は、データベース全体の同期、単一テーブルの同期、分割されたデータベースとテーブルの同期、新しく追加されたテーブルの同期、スキーマ進化、カスタム計算列の同期を効率的にサポートします。また、ETL 処理、WHERE 句フィルタリング、カラムプルーニングもサポートしています。これにより、データ統合プロセスが大幅に簡素化され、データ統合の効率と信頼性が向上します。
Flink CDC の利点
Realtime Compute for Apache Flink では、Flink CDC データインジェストジョブ、SQL ジョブ、または DataStream ジョブを開発してデータを同期できます。以下のセクションでは、他の 2 種類のジョブに対する Flink CDC データインジェストジョブの利点について説明します。
Flink CDC と Flink SQL の比較
Flink CDC データインジェストジョブと SQL ジョブは、異なるデータ型を使用してデータを転送します:
SQL ジョブは RowData を転送しますが、Flink CDC ジョブは DataChangeEvent と SchemaChangeEvent を転送します。SQL ジョブの各 RowData には独自の変更タイプがあります。主な 4 つのタイプは、insert (+I)、update_before (-U)、update_after (+U)、delete (-D) です。
Flink CDC は SchemaChangeEvent を使用して、テーブル作成、列追加、テーブル切り捨てなどのスキーマ変更情報を転送します。DataChangeEvent は、挿入、更新、削除などのデータ変更を転送するために使用されます。更新メッセージには更新前後の内容が含まれます。これにより、生の変更データをシンクに書き込むことができます。
次の表に、SQL ジョブに対する Flink CDC データインジェストジョブの利点を示します。
Flink CDC データインジェスト | Flink SQL |
スキーマを自動的に検出し、データベース全体の同期をサポートします。 | CREATE TABLE 文と INSERT 文を手動で記述する必要があります。 |
複数のスキーマ進化ポリシーをサポートします。 | スキーマ進化をサポートしません。 |
生の変更ログの同期をサポートします。 | 生の変更ログの構造を変更します。 |
複数テーブルからのデータの読み取りと複数テーブルへのデータの書き込みをサポートします。 | 単一テーブルからデータを読み取り、単一テーブルにデータを書き込みます。 |
CTAS 文や CDAS 文と比較して、Flink CDC ジョブはより強力な機能を提供します。Flink CDC ジョブは、次の機能をサポートしています:
新しいデータが書き込まれるのを待たずに、先祖テーブルのスキーマ進化を即座に同期します。
生の変更ログの同期をサポートします。更新メッセージは分割されません。
TRUNCATE TABLE や DROP TABLE など、より多くの種類のスキーマ進化を同期します。
テーブルマッピングをサポートし、シンクテーブル名を柔軟に定義できます。
柔軟で設定可能なスキーマ進化の動作をサポートします。
WHERE 句を使用したデータフィルタリングをサポートします。
カラムプルーニングをサポートします。
Flink CDC と Flink DataStream の比較
次の表に、DataStream ジョブに対する Flink CDC データインジェストジョブの利点を示します。
Flink CDC データインジェスト | Flink DataStream |
専門家だけでなく、あらゆるレベルのユーザー向けに設計されています。 | Java と分散システムに精通している必要があります。 |
基盤となる詳細を隠蔽し、開発を簡素化します。 | Flink フレームワークに精通している必要があります。 |
理解しやすく学習しやすい YAML フォーマットを使用します。 | 依存関係を管理するために Maven などのツールに関する知識が必要です。 |
既存のジョブを簡単に再利用できます。 | 既存のコードを再利用することが困難です。 |
制限事項
Flink CDC データインジェストジョブを開発するには、Ververica Runtime (VVR) 11.1 を使用する必要があります。VVR 8.x を使用する場合は、VVR 8.0.11 を使用する必要があります。
1 つのソースから 1 つのシンクにのみデータを同期できます。複数のデータソースからデータを読み取るか、複数のシンクにデータを書き込むには、複数の Flink CDC ジョブを作成する必要があります。
Flink CDC ジョブをセッションクラスターにデプロイすることはできません。
Flink CDC ジョブでは自動チューニングはサポートされていません。
Flink CDC データインジェストコネクタ
次の表に、Flink CDC データインジェストジョブのソースおよびシンクとしてサポートされているコネクタを示します。
ご興味のあるアップストリームおよびダウンストリームのストレージに関するフィードバックを、チケット や DingTalk などのチャンネルを通じてお寄せいただけます。今後、お客様のニーズによりよくお応えするため、より多くのストレージオプションをサポートする予定です。
サポートされているコネクタ
コネクタ | サポートタイプ | |
ソース | シンク | |
説明 ApsaraDB RDS for MySQL、PolarDB for MySQL、および自己管理型 MySQL に接続します。 | √ | × |
× | √ | |
√ 説明 Ververica Runtime (VVR) 8.0.10 以降が必要です。 | √ | |
× | √ | |
× | √ | |
× | √ | |
√ 説明 Ververica Runtime (VVR) 11.1 以降が必要です。 | × | |
√ 説明 Ververica Runtime (VVR) 11.2 以降が必要です。 | × | |
× | √ 説明 Ververica Runtime (VVR) 11.1 以降が必要です。 | |
× | √ 説明 Ververica Runtime (VVR) 11.1 以降が必要です。 | |
√ 説明 Ververica Runtime (VVR) 11.4 以降が必要です。 | × | |
× | √ | |
既存のカタログからの接続情報の再利用
Ververica Runtime (VVR) 11.5 以降、Flink CDC データインジェストジョブで既存のカタログの接続情報を再利用できます。[データ管理] ページで作成された組み込みカタログを参照して、URL、ユーザー名、パスワードなどの接続プロパティを自動的に取得できます。これにより、手動での設定作業が削減されます。
構文
source:
type: mysql
using.built-in-catalog: mysql_rds_catalog
sink:
type: paimon
using.built-in-catalog: paimon_dlf_catalogsource および sink モジュールで、using.built-in-catalog 構文を使用して組み込みカタログを参照します。
たとえば、上記のコードでは、mysql_rds_catalog カタログのメタデータには、hostname、username、password などの必須パラメーターがすでに含まれています。これらのパラメーターを YAML ジョブで再度指定する必要はありません。
制限事項
次のコネクタは、カタログからの接続情報の再利用をサポートしています:
MySQL (ソース)
Kafka (ソース)
Upsert Kafka (シンク)
StarRocks (シンク)
Hologres (シンク)
Paimon (シンク)
SLS (ソース)
CDC YAML 構文と互換性のないカタログパラメーターは有効になりません。詳細については、各コネクタのパラメーターリストをご参照ください。
Flink CDC データインジェストジョブの作成
同期ジョブテンプレートからの生成
対象のワークスペースの[アクション]列にある[コンソール]をクリックします。
左側のナビゲーションウィンドウで、 を選択します。
をクリックして、[テンプレートから作成] をクリックします。データ同期テンプレートを選択します。
現在、MySQL から StarRocks、MySQL から Paimon、および MySQL から Hologres へのテンプレートのみがサポートされています。

ジョブ名、ストレージの場所、およびエンジンバージョンを入力し、[OK] をクリックします。
Flink CDC ジョブのソースとシンクの情報を設定します。
パラメーター設定の詳細については、対応するコネクタのドキュメントをご参照ください。
CTAS/CDAS ジョブからの生成
ジョブに複数の CXAS 文が含まれている場合、Flink は最初の文のみを検出して変換します。
Flink SQL と Flink CDC の間でビルトイン関数のサポートに違いがあるため、生成された変換ルールはすぐに使用できない場合があります。ルールを手動で確認し、調整する必要があります。
ソースが MySQL で、元の CTAS/CDAS ジョブが実行中の場合、元のジョブとの競合を避けるために、Flink CDC データインジェストジョブのソースのサーバー ID を調整する必要があります。
対象のワークスペースの [アクション] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、 を選択します。
をクリックし、[CTAS/CDAS から新規ドラフトを作成] をクリックし、対象の CTAS または CDAS ジョブを選択して、[OK] をクリックします。選択ページには、有効な CTAS および CDAS ジョブのみが表示されます。通常の ETL ジョブや構文エラーのあるジョブの下書きは表示されません。
ジョブ名、ストレージの場所、エンジンバージョンを入力し、[OK] をクリックします。
オープンソースコミュニティからのジョブの移行
対象のワークスペースの[アクション]列にある[コンソール]をクリックします。
左側のナビゲーションウィンドウで、 を選択します。
をクリックし、[新規データ取り込みドラフト] を選択します。[ファイル名] と [エンジンバージョン] を設定し、[作成] をクリックします。オープンソースコミュニティから Flink CDC ジョブをコピーします。
(任意) [深さチェック] をクリックします。
構文、ネットワーク接続性、アクセス権限を確認できます。
Flink CDC データインジェストジョブのゼロからの作成
対象のワークスペースの [アクション] 列で、[コンソール] をクリックします。
左側のナビゲーションウィンドウで、 を選択します。
をクリックし、[新規データ取り込みドラフト] を選択し、[ファイル名] と [エンジンバージョン] を入力し、[作成] をクリックします。Flink CDC ジョブを設定します。
# 必須 source: # データソースタイプ type: <ソースコネクタのタイプに置き換えてください> # データソースの設定。設定項目については、対応するコネクタのドキュメントをご参照ください。 ... # 必須 sink: # 宛先タイプ type: <宛先コネクタのタイプに置き換えてください> # 宛先の設定。設定項目については、対応するコネクタのドキュメントをご参照ください。 ... # オプション transform: # flink_test.customers テーブルの変換ルール - source-table: flink_test.customers # プロジェクション設定。同期する列を指定し、データ変換を実行します。 projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name # フィルター条件。id が 10 より大きいデータのみを同期します。 filter: id > 10 # 変換ルールの説明 description: append calculated columns based on source table # オプション route: # ソーステーブルと宛先テーブル間のマッピングを指定するルーティングルール。 - source-table: flink_test.customers sink-table: db.customers_o # ルーティングルールの説明 description: sync customers table - source-table: flink_test.customers_suffix sink-table: db.customers_s # ルーティングルールの説明 description: sync customers_suffix table #オプション pipeline: # ジョブ名 name: MySQL to Hologres Pipeline説明Flink CDC ジョブでは、キーと値はスペースで区切る必要があります。フォーマットは
キー: 値です。コードブロックは次のように説明されます。
必須
コードモジュール
説明
必須
source (データソース)
データパイプラインの開始点。Flink CDC はデータソースから変更データをキャプチャします。
シンク (宛先)
データパイプラインの終点。Flink CDC はキャプチャしたデータ変更をこれらのシンクシステムに転送します。
説明サポートされているシンクシステムについては、「Flink CDC データインジェストコネクタ」をご参照ください。シンクの設定項目については、対応するコネクタのドキュメントをご参照ください。
機密情報の設定には変数を使用できます。詳細については、「変数管理」をご参照ください。
オプション
pipeline
(データパイプライン)
パイプライン名など、データパイプラインジョブ全体に関する基本設定を定義します。
transform (データ変換)
データ変換ルールを指定します。変換とは、Flink パイプラインを流れるデータに対する操作です。このモジュールは、ETL 処理、WHERE 句フィルタリング、カラムプルーニング、および計算列をサポートします。
Flink CDC によってキャプチャされた生の変更データを、特定のダウンストリームシステムに適応させるために変換する必要がある場合に使用できます。
route (ルーティング)
このモジュールが設定されていない場合、データベース全体または対象テーブルの同期を示します。
場合によっては、キャプチャされた変更データを特定のルールに基づいて異なる宛先に送信する必要があります。ルーティングメカニズムを使用すると、アップストリームシステムとダウンストリームシステム間のマッピングを柔軟に指定し、データを異なるデータシンクに送信できます。
各モジュールの構文構造と設定項目の詳細については、「Flink CDC データインジェストジョブの開発リファレンス」をご参照ください。
次のコードは、MySQL の app_db データベースからすべてのテーブルを Hologres のデータベースに同期する方法の例です。
source: type: mysql hostname: <hostname> port: 3306 username: ${secret_values.mysqlusername} password: ${secret_values.mysqlpassword} tables: app_db.\.* server-id: 5400-5404 # (オプション) 増分フェーズで新しく作成されたテーブルからデータを同期します。 scan.binlog.newly-added-table.enabled: true # (オプション) テーブルとフィールドのコメントを同期します。 include-comments.enabled: true # (オプション) 無制限チャンクを優先的にディスパッチして、TaskManager の OutOfMemory の問題を回避します。 scan.incremental.snapshot.unbounded-chunk-first.enabled: true # (オプション) 読み取りを高速化するために解析フィルターを有効にします。 scan.only.deserialize.captured.tables.changelog.enabled: true sink: type: hologres name: Hologres Sink endpoint: <endpoint> dbname: <database-name> username: ${secret_values.holousername} password: ${secret_values.holopassword} pipeline: name: Sync MySQL Database to Hologres(任意) [深度チェック] をクリックします。
構文、ネットワーク接続性、アクセス権限を確認できます。
参考資料
Flink CDC ジョブを開発した後、それをデプロイする必要があります。詳細については、「ジョブのデプロイ」をご参照ください。
MySQL データベースから StarRocks にデータを同期する Flink CDC ジョブを迅速に構築するには、「Flink CDC データインジェストジョブのクイックスタート」をご参照ください。