Realtime Compute for Apache Flink は Flink CDC を使用してデータをインジェストします。YAML ジョブを開発して、ソースからシンクにデータを同期できます。このトピックでは、Flink CDC データインジェストジョブの開発方法について説明します。
背景情報
Flink CDC データインジェストは、データ統合に Flink CDC を活用します。YAML 構成を使用して複雑な抽出、変換、書き出し (ETL) プロセスを定義でき、これらは自動的に Flink のコンピューティングロジックに変換されます。この機能は、完全なデータベース同期、単一テーブル同期、シャーディング同期、新規テーブル同期、スキーマ進化、およびカスタム計算列同期をサポートします。また、ETL 処理、WHERE 句フィルタリング、列のトリミング、および計算列もサポートします。これにより、データ統合プロセスが簡素化され、その効率と信頼性が向上します。
Flink CDC の利点
Realtime Compute for Apache Flink では、Flink CDC データインジェストジョブ、SQL ジョブ、または独自の DataStream ジョブを開発してデータを同期できます。以下のセクションでは、他の 2 つの開発方法に対する Flink CDC データインジェストジョブの利点について説明します。
Flink CDC と Flink SQL の比較
Flink CDC データインジェストジョブと SQL ジョブは、データ転送に異なるデータの型を使用します。
SQL ジョブは RowData を転送します。Flink CDC ジョブは DataChangeEvent と SchemaChangeEvent を転送します。SQL ジョブの各 RowData には独自の変更タイプがあります。主な 4 つのタイプは、挿入 (+I)、更新前 (-U)、更新後 (+U)、および削除 (-D) です。
Flink CDC は SchemaChangeEvent を使用して、テーブルの作成、列の追加、テーブルのクリアなどのスキーマ変更情報を転送します。DataChangeEvent は、挿入、更新、削除などのデータ変更を転送するために使用されます。更新メッセージには、更新前と更新後の両方のコンテンツが含まれています。これにより、元の変更データをシンクに書き込むことができます。
次の表に、SQL ジョブに対する Flink CDC データインジェストジョブの利点を示します。
Flink CDC データインジェスト | Flink SQL |
スキーマを自動的に検出し、完全なデータベース同期をサポートします。 | CREATE TABLE 文と INSERT 文を手動で記述する必要があります。 |
複数のスキーマ進化ポリシーをサポートします。 | スキーマ進化をサポートしていません。 |
元の変更ログの同期をサポートします。 | 元の変更ログの構造を破壊します。 |
複数のテーブルからの読み取りと書き込みをサポートします。 | 単一のテーブルから読み取りと書き込みを行います。 |
CTAS 文または CDAS 文と比較して、Flink CDC ジョブはより強力で、次の機能をサポートします。
先祖テーブルのスキーマ進化はすぐに同期されます。新しいデータ書き込みが同期をトリガーするのを待つ必要はありません。
元の変更ログの同期をサポートします。更新メッセージは分割されません。
TRUNCATE TABLE や DROP TABLE など、より多くの種類のスキーマ変更を同期します。
テーブルマッピングをサポートして、シンクテーブル名を柔軟に定義します。
柔軟で構成可能なスキーマ進化の動作をサポートします。
WHERE 句によるデータフィルタリングをサポートします。
フィールドのトリミングをサポートします。
Flink CDC と Flink DataStream の比較
次の表に、DataStream ジョブに対する Flink CDC データインジェストジョブの利点を示します。
Flink CDC データインジェスト | Flink DataStream |
専門家だけでなく、あらゆるレベルのユーザー向けに設計されています。 | Java と分散システムに精通している必要があります。 |
基盤となる詳細を隠して開発を簡素化します。 | Flink フレームワークに精通している必要があります。 |
YAML フォーマットは理解しやすく、学習も容易です。 | 依存関係を管理するために Maven などのツールに関する知識が必要です。 |
既存のジョブは再利用が容易です。 | 既存のコードは再利用が困難です。 |
制限事項
Ververica Runtime (VVR) 11.1 を使用して Flink CDC データインジェストジョブを開発できます。VVR 8.x を使用するには、VVR 8.0.11 を使用する必要があります。
サポートされているソースとシンクは 1 つだけです。複数のデータソースから読み取るか、複数のシンクに書き込むには、複数の Flink CDC ジョブを作成する必要があります。
Flink CDC ジョブをセッションクラスターにデプロイすることはできません。
Flink CDC ジョブでは自動チューニングはサポートされていません。
Flink CDC データインジェストコネクタ
次の表に、Flink CDC データインジェストのソースおよびシンクとしてサポートされているコネクタを示します。
関心のあるアップストリームおよびダウンストリームストレージに関するフィードバックは、チケット や DingTalk などのチャネルを通じて提供できます。今後、お客様のニーズによりよく応えるため、これらのストレージオプションをさらにサポートする予定です。
サポートされているコネクタ
コネクタ | サポートされているタイプ | |
ソース | シンク | |
説明 ApsaraDB RDS for MySQL、PolarDB for MySQL、および自己管理の MySQL に接続します。 | √ | × |
× | √ | |
√ 説明 Ververica Runtime (VVR) 8.0.10 以降が必要です。 | √ | |
× | √ | |
× | √ | |
× | √ | |
√ 説明 VVR 11.1 以降が必要です。 | × | |
√ 説明 VVR 11.2 以降が必要です。 | × | |
× | √ 説明 VVR 11.1 以降が必要です。 | |
× | √ 説明 VVR 11.1 以降が必要です。 | |
× | √ | |
Flink CDC データインジェストジョブの作成
同期ジョブテンプレートからのジョブの生成
ターゲットワークスペースの [アクション] 列で、[コンソール] をクリックします。
左側のナビゲーションウィンドウで、 を選択します。
をクリックし、[テンプレートから作成] をクリックします。データ同期テンプレートを選択します。
MySQL から StarRocks、MySQL から Paimon、および MySQL から Hologres のテンプレートのみがサポートされています。

ジョブ名、ストレージの場所、エンジンバージョンなどのジョブ設定を入力し、[OK] をクリックします。
Flink CDC ジョブのソースとシンクの情報を構成します。
パラメーターの詳細については、対応するコネクタのドキュメントをご参照ください。
CTAS/CDAS ジョブからのジョブの生成
ジョブに複数の CXAS 文が含まれている場合、Flink は最初の文のみを検出して変換します。
Flink SQL と Flink CDC でサポートされているビルトイン関数は異なるため、生成された変換ルールはすぐに使用できない場合があります。ルールを手動で確認し、調整する必要があります。
ソースが MySQL で、元の CTAS/CDAS ジョブが実行中の場合、元のジョブとの競合を避けるために、Flink CDC データインジェストジョブでソースのサーバー ID を調整する必要があります。
ターゲットワークスペースの [アクション] 列にある [コンソール] をクリックします。
左のナビゲーションウィンドウで、 を選択します。
をクリックし、[既存の CTAS/CDAS ジョブから生成] をクリックします。ターゲットの CTAS または CDAS ジョブを選択し、[OK] をクリックします。選択ページでは、システムは有効な CTAS および CDAS ジョブのみを表示します。通常の ETL ジョブや構文エラーのあるジョブドラフトは表示されません。
ジョブ名、ストレージの場所、エンジンバージョンなどのジョブ情報を入力し、[OK] をクリックします。
オープンソースコミュニティからのジョブの移行
ターゲットワークスペースについては、[アクション] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで、 に移動します。
をクリックし、[データインジェストドラフトの作成] を選択し、[ファイル名] と [エンジンバージョン] を入力して、[作成] をクリックします。オープンソースコミュニティから Flink CDC ジョブをコピーします。
(オプション) [ディープチェック] をクリックします。
構文、ネットワーク接続、およびアクセス権限を確認できます。
Flink CDC データインジェストジョブをゼロから作成する
ターゲットワークスペースの [アクション] 列で、[コンソール] をクリックします。
左側のナビゲーションウィンドウで、 を選択します。
をクリックし、[データインジェストドラフトの作成] を選択します。[ファイル名] と [エンジンバージョン] を入力し、[作成] をクリックします。Flink CDC ジョブを構成します。
# 必須 source: # データソースタイプ type: <ソースコネクタのタイプに置き換えてください> # データソース構成。設定項目の詳細については、対応するコネクタのドキュメントをご参照ください。 ... # 必須 sink: # シンクタイプ type: <シンクコネクタのタイプに置き換えてください> # シンク構成。設定項目の詳細については、対応するコネクタのドキュメントをご参照ください。 ... # オプション transform: # flink_test.customers テーブルの変換ルール - source-table: flink_test.customers # プロジェクション構成。同期する列を指定し、データ変換を実行します。 projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name # フィルター条件。id が 10 より大きいデータのみを同期します。 filter: id > 10 # 変換ルールを説明するために使用される説明 description: ソーステーブルに基づいて計算列を追加 # オプション route: # ルーティングルール。ソーステーブルとシンクテーブル間のマッピングを指定します。 - source-table: flink_test.customers sink-table: db.customers_o # ルーティングルールを説明するために使用される説明 description: customers テーブルを同期 - source-table: flink_test.customers_suffix sink-table: db.customers_s # ルーティングルールを説明するために使用される説明 description: customers_suffix テーブルを同期 #オプション pipeline: # ジョブ名 name: MySQL to Hologres Pipeline説明Flink CDC ジョブでは、キーと値はスペースで区切る必要があります。フォーマットは
Key: Valueです。次の表に、コードブロックについて説明します。
必須
コードブロック
説明
必須
source
データパイプラインの開始点。Flink CDC はデータソースから変更データをキャプチャします。
sink
データパイプラインの終点。Flink CDC は、キャプチャしたデータ変更をこれらのシンクシステムに送信します。
説明現在サポートされているシンクシステムの詳細については、「Flink CDC データインジェストコネクタ」をご参照ください。シンク設定項目の詳細については、対応するコネクタのドキュメントをご参照ください。
変数を使用して機密情報を設定できます。詳細については、「変数管理」をご参照ください。
オプション
pipeline
pipeline
パイプライン名など、データパイプラインジョブ全体に対する基本的な構成を定義します。
transform
データ変換ルールを指定します。変換は、Flink パイプラインを流れるデータに対して操作を行うプロセスです。ETL 処理、WHERE 句フィルタリング、列のトリミング、および計算列をサポートします。
Flink CDC によってキャプチャされた元の変更データを特定の下流システムに合わせて変換する必要がある場合は、transform ブロックを使用できます。
route
このブロックが構成されていない場合、完全なデータベースまたはターゲットテーブルの同期を示します。
場合によっては、キャプチャされた変更データを特定のルールに基づいて異なる宛先に送信する必要があります。ルーティングメカニズムを使用すると、アップストリームシステムとダウンストリームシステム間のマッピングを柔軟に指定して、データを異なるシンクに送信できます。
各ブロックの構文と設定項目の詳細については、「Flink CDC データインジェストジョブ開発リファレンス」をご参照ください。
次のコードは、MySQL の app_db データベースから Hologres のデータベースにすべてのテーブルを同期する方法の例を示しています。
source: type: mysql hostname: <ホスト名> port: 3306 username: ${secret_values.mysqlusername} password: ${secret_values.mysqlpassword} tables: app_db.\.* server-id: 5400-5404 sink: type: hologres name: Hologres Sink endpoint: <エンドポイント> dbname: <データベース名> username: ${secret_values.holousername} password: ${secret_values.holopassword} pipeline: name: MySQL データベースを Hologres に同期(オプション) [ディープチェック] をクリックできます。
構文、ネットワーク接続、およびアクセス権限を確認できます。
リファレンス
Flink CDC ジョブを開発した後、それをデプロイして公開する必要があります。詳細については、「ジョブのデプロイ」をご参照ください。
MySQL データベースから StarRocks にデータを同期する Flink CDC ジョブを迅速に構築するには、「Flink CDC データインジェストジョブのクイックスタート」をご参照ください。