すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Flink CDC データインジェストジョブの開発 (パブリックプレビュー)

最終更新日:Nov 09, 2025

Realtime Compute for Apache Flink は Flink CDC を使用してデータをインジェストします。YAML ジョブを開発して、ソースからシンクにデータを同期できます。このトピックでは、Flink CDC データインジェストジョブの開発方法について説明します。

背景情報

Flink CDC データインジェストは、データ統合に Flink CDC を活用します。YAML 構成を使用して複雑な抽出、変換、書き出し (ETL) プロセスを定義でき、これらは自動的に Flink のコンピューティングロジックに変換されます。この機能は、完全なデータベース同期、単一テーブル同期、シャーディング同期、新規テーブル同期、スキーマ進化、およびカスタム計算列同期をサポートします。また、ETL 処理、WHERE 句フィルタリング、列のトリミング、および計算列もサポートします。これにより、データ統合プロセスが簡素化され、その効率と信頼性が向上します。

Flink CDC の利点

Realtime Compute for Apache Flink では、Flink CDC データインジェストジョブ、SQL ジョブ、または独自の DataStream ジョブを開発してデータを同期できます。以下のセクションでは、他の 2 つの開発方法に対する Flink CDC データインジェストジョブの利点について説明します。

Flink CDC と Flink SQL の比較

Flink CDC データインジェストジョブと SQL ジョブは、データ転送に異なるデータの型を使用します。

  • SQL ジョブは RowData を転送します。Flink CDC ジョブは DataChangeEvent と SchemaChangeEvent を転送します。SQL ジョブの各 RowData には独自の変更タイプがあります。主な 4 つのタイプは、挿入 (+I)、更新前 (-U)、更新後 (+U)、および削除 (-D) です。

  • Flink CDC は SchemaChangeEvent を使用して、テーブルの作成、列の追加、テーブルのクリアなどのスキーマ変更情報を転送します。DataChangeEvent は、挿入、更新、削除などのデータ変更を転送するために使用されます。更新メッセージには、更新前と更新後の両方のコンテンツが含まれています。これにより、元の変更データをシンクに書き込むことができます。

次の表に、SQL ジョブに対する Flink CDC データインジェストジョブの利点を示します。

Flink CDC データインジェスト

Flink SQL

スキーマを自動的に検出し、完全なデータベース同期をサポートします。

CREATE TABLE 文と INSERT 文を手動で記述する必要があります。

複数のスキーマ進化ポリシーをサポートします。

スキーマ進化をサポートしていません。

元の変更ログの同期をサポートします。

元の変更ログの構造を破壊します。

複数のテーブルからの読み取りと書き込みをサポートします。

単一のテーブルから読み取りと書き込みを行います。

CTAS 文または CDAS 文と比較して、Flink CDC ジョブはより強力で、次の機能をサポートします。

  • 先祖テーブルのスキーマ進化はすぐに同期されます。新しいデータ書き込みが同期をトリガーするのを待つ必要はありません。

  • 元の変更ログの同期をサポートします。更新メッセージは分割されません。

  • TRUNCATE TABLE や DROP TABLE など、より多くの種類のスキーマ変更を同期します。

  • テーブルマッピングをサポートして、シンクテーブル名を柔軟に定義します。

  • 柔軟で構成可能なスキーマ進化の動作をサポートします。

  • WHERE 句によるデータフィルタリングをサポートします。

  • フィールドのトリミングをサポートします。

Flink CDC と Flink DataStream の比較

次の表に、DataStream ジョブに対する Flink CDC データインジェストジョブの利点を示します。

Flink CDC データインジェスト

Flink DataStream

専門家だけでなく、あらゆるレベルのユーザー向けに設計されています。

Java と分散システムに精通している必要があります。

基盤となる詳細を隠して開発を簡素化します。

Flink フレームワークに精通している必要があります。

YAML フォーマットは理解しやすく、学習も容易です。

依存関係を管理するために Maven などのツールに関する知識が必要です。

既存のジョブは再利用が容易です。

既存のコードは再利用が困難です。

制限事項

  • Ververica Runtime (VVR) 11.1 を使用して Flink CDC データインジェストジョブを開発できます。VVR 8.x を使用するには、VVR 8.0.11 を使用する必要があります。

  • サポートされているソースとシンクは 1 つだけです。複数のデータソースから読み取るか、複数のシンクに書き込むには、複数の Flink CDC ジョブを作成する必要があります。

  • Flink CDC ジョブをセッションクラスターにデプロイすることはできません。

  • Flink CDC ジョブでは自動チューニングはサポートされていません。

Flink CDC データインジェストコネクタ

次の表に、Flink CDC データインジェストのソースおよびシンクとしてサポートされているコネクタを示します。

説明

関心のあるアップストリームおよびダウンストリームストレージに関するフィードバックは、チケット や DingTalk などのチャネルを通じて提供できます。今後、お客様のニーズによりよく応えるため、これらのストレージオプションをさらにサポートする予定です。

サポートされているコネクタ

コネクタ

サポートされているタイプ

ソース

シンク

MySQL

説明

ApsaraDB RDS for MySQL、PolarDB for MySQL、および自己管理の MySQL に接続します。

×

Paimon

×

Kafka

説明

Ververica Runtime (VVR) 8.0.10 以降が必要です。

Upsert Kafka

×

StarRocks

×

Hologres

×

SLS

説明

VVR 11.1 以降が必要です。

×

MongoDB

説明

VVR 11.2 以降が必要です。

×

MaxCompute

×

説明

VVR 11.1 以降が必要です。

SelectDB

×

説明

VVR 11.1 以降が必要です。

Print

×

Flink CDC データインジェストジョブの作成

同期ジョブテンプレートからのジョブの生成

  1. Realtime Compute for Apache Flink コンソールにログインします。

  2. ターゲットワークスペースの [アクション] 列で、[コンソール] をクリックします。

  3. 左側のナビゲーションウィンドウで、[データ開発] > [データインジェスト] を選択します。

  4. image をクリックし、[テンプレートから作成] をクリックします。

  5. データ同期テンプレートを選択します。

    MySQL から StarRocks、MySQL から Paimon、および MySQL から Hologres のテンプレートのみがサポートされています。

    image

  6. ジョブ名、ストレージの場所、エンジンバージョンなどのジョブ設定を入力し、[OK] をクリックします。

  7. Flink CDC ジョブのソースとシンクの情報を構成します。

    パラメーターの詳細については、対応するコネクタのドキュメントをご参照ください。

CTAS/CDAS ジョブからのジョブの生成

重要
  • ジョブに複数の CXAS 文が含まれている場合、Flink は最初の文のみを検出して変換します。

  • Flink SQL と Flink CDC でサポートされているビルトイン関数は異なるため、生成された変換ルールはすぐに使用できない場合があります。ルールを手動で確認し、調整する必要があります。

  • ソースが MySQL で、元の CTAS/CDAS ジョブが実行中の場合、元のジョブとの競合を避けるために、Flink CDC データインジェストジョブでソースのサーバー ID を調整する必要があります。

  1. Realtime Compute for Apache Flink コンソールにログインします。

  2. ターゲットワークスペースの [アクション] 列にある [コンソール] をクリックします。

  3. 左のナビゲーションウィンドウで、[データ開発] > [データインジェスト] を選択します。

  4. image をクリックし、[既存の CTAS/CDAS ジョブから生成] をクリックします。ターゲットの CTAS または CDAS ジョブを選択し、[OK] をクリックします。

    選択ページでは、システムは有効な CTAS および CDAS ジョブのみを表示します。通常の ETL ジョブや構文エラーのあるジョブドラフトは表示されません。

  5. ジョブ名、ストレージの場所、エンジンバージョンなどのジョブ情報を入力し、[OK] をクリックします。

オープンソースコミュニティからのジョブの移行

  1. Realtime Compute for Apache Flink コンソールにログインします。

  2. ターゲットワークスペースについては、[アクション] 列の [コンソール] をクリックします。

  3. 左側のナビゲーションウィンドウで、[データ開発] > [データインジェスト] に移動します。

  4. image をクリックし、[データインジェストドラフトの作成] を選択し、[ファイル名][エンジンバージョン] を入力して、[作成] をクリックします。

  5. オープンソースコミュニティから Flink CDC ジョブをコピーします。

  6. (オプション) [ディープチェック] をクリックします。

    構文、ネットワーク接続、およびアクセス権限を確認できます。

Flink CDC データインジェストジョブをゼロから作成する

  1. Realtime Compute for Apache Flink コンソールにログインします。

  2. ターゲットワークスペースの [アクション] 列で、[コンソール] をクリックします。

  3. 左側のナビゲーションウィンドウで、[データ開発] > [データインジェスト] を選択します。

  4. image をクリックし、[データインジェストドラフトの作成] を選択します。[ファイル名][エンジンバージョン] を入力し、[作成] をクリックします。

  5. Flink CDC ジョブを構成します。

    # 必須
    source:
      # データソースタイプ
      type: <ソースコネクタのタイプに置き換えてください>
      # データソース構成。設定項目の詳細については、対応するコネクタのドキュメントをご参照ください。
      ...
    
    # 必須
    sink:
      # シンクタイプ
      type: <シンクコネクタのタイプに置き換えてください>
      # シンク構成。設定項目の詳細については、対応するコネクタのドキュメントをご参照ください。
      ...
    
    # オプション
    transform:
      # flink_test.customers テーブルの変換ルール
      - source-table: flink_test.customers
        # プロジェクション構成。同期する列を指定し、データ変換を実行します。
        projection: id, username, UPPER(username) as username1, age, (age + 1) as age1, test_col1, __schema_name__ || '.' || __table_name__ identifier_name
        # フィルター条件。id が 10 より大きいデータのみを同期します。
        filter: id > 10
        # 変換ルールを説明するために使用される説明
        description: ソーステーブルに基づいて計算列を追加
    
    # オプション
    route:
      # ルーティングルール。ソーステーブルとシンクテーブル間のマッピングを指定します。
      - source-table: flink_test.customers
        sink-table: db.customers_o
        # ルーティングルールを説明するために使用される説明
        description: customers テーブルを同期
      - source-table: flink_test.customers_suffix
        sink-table: db.customers_s
        # ルーティングルールを説明するために使用される説明
        description: customers_suffix テーブルを同期
    
    #オプション
    pipeline:
      # ジョブ名
      name: MySQL to Hologres Pipeline
    説明

    Flink CDC ジョブでは、キーと値はスペースで区切る必要があります。フォーマットは Key: Value です。

    次の表に、コードブロックについて説明します。

    必須

    コードブロック

    説明

    必須

    source

    データパイプラインの開始点。Flink CDC はデータソースから変更データをキャプチャします。

    説明
    • 現在、データソースとしてサポートされているのは MySQL のみです。設定項目の詳細については、「MySQL」をご参照ください。

    • 変数を使用して機密情報を設定できます。詳細については、「変数管理」をご参照ください。

    sink

    データパイプラインの終点。Flink CDC は、キャプチャしたデータ変更をこれらのシンクシステムに送信します。

    説明
    • 現在サポートされているシンクシステムの詳細については、「Flink CDC データインジェストコネクタ」をご参照ください。シンク設定項目の詳細については、対応するコネクタのドキュメントをご参照ください。

    • 変数を使用して機密情報を設定できます。詳細については、「変数管理」をご参照ください。

    オプション

    pipeline

    pipeline

    パイプライン名など、データパイプラインジョブ全体に対する基本的な構成を定義します。

    transform

    データ変換ルールを指定します。変換は、Flink パイプラインを流れるデータに対して操作を行うプロセスです。ETL 処理、WHERE 句フィルタリング、列のトリミング、および計算列をサポートします。

    Flink CDC によってキャプチャされた元の変更データを特定の下流システムに合わせて変換する必要がある場合は、transform ブロックを使用できます。

    route

    このブロックが構成されていない場合、完全なデータベースまたはターゲットテーブルの同期を示します。

    場合によっては、キャプチャされた変更データを特定のルールに基づいて異なる宛先に送信する必要があります。ルーティングメカニズムを使用すると、アップストリームシステムとダウンストリームシステム間のマッピングを柔軟に指定して、データを異なるシンクに送信できます。

    各ブロックの構文と設定項目の詳細については、「Flink CDC データインジェストジョブ開発リファレンス」をご参照ください。

    次のコードは、MySQL の app_db データベースから Hologres のデータベースにすべてのテーブルを同期する方法の例を示しています。

    source:
      type: mysql
      hostname: <ホスト名>
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: app_db.\.*
      server-id: 5400-5404
    
    sink:
      type: hologres
      name: Hologres Sink
      endpoint: <エンドポイント>
      dbname: <データベース名>
      username: ${secret_values.holousername}
      password: ${secret_values.holopassword}
    
    pipeline:
      name: MySQL データベースを Hologres に同期
  6. (オプション) [ディープチェック] をクリックできます。

    構文、ネットワーク接続、およびアクセス権限を確認できます。

リファレンス