すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Flink CDC を使用して MySQL データベース全体を Kafka に同期する

最終更新日:Nov 09, 2025

このトピックでは、MySQL データベース全体を Kafka に同期して、複数のタスクが MySQL データベースにかける負荷を軽減する方法について説明します。

背景情報

MySQL Change Data Capture (CDC) は、MySQL からデータを取得し、テーブルの変更をリアルタイムで同期するために使用されます。これは、ディメンションテーブルを他のデータテーブルと結合するなどの複雑なコンピューティングシナリオでよく使用されます。実際には、複数のジョブが同じ MySQL テーブルに依存する場合があります。複数のタスクが同じ MySQL テーブルを処理すると、MySQL データベースは複数の接続を開始する必要があります。これにより、MySQL サーバーとネットワークに大きな負荷がかかります。

アーキテクチャ

アップストリームの MySQL データベースへの負荷を軽減するために、Realtime Compute for Apache Flink は、MySQL データベース全体を Kafka に同期する機能を提供します。このソリューションでは、Kafka を中間レイヤーとして導入し、Flink CDC データインジェストジョブを使用してデータを Kafka に同期します。

単一のジョブが、アップストリームの MySQL データベースから Kafka にデータをリアルタイムで同期します。各 MySQL テーブルは、対応する Kafka Topic に upsert モードで書き込まれます。その後、Upsert Kafka コネクタは、MySQL テーブルにアクセスする代わりに Topic からデータを読み取ることができます。このメソッドは、複数のタスクが MySQL データベースにかける負荷を効果的に軽減します。

图片 1

制限事項

  • 同期する MySQL テーブルにはプライマリキーが必要です。

  • セルフマネージド Kafka クラスター、EMR Kafka クラスター、または ApsaraMQ for Kafka を使用できます。ApsaraMQ for Kafka を使用する場合、デフォルトのエンドポイント のみを使用できます。

  • Kafka クラスターのストレージ領域は、ソーステーブルデータのストレージ領域よりも大きくする必要があります。そうしないと、ストレージ領域の不足によりデータが失われる可能性があります。これは、完全なデータベース同期のために作成された Topic が compacted Topic であるためです。compacted Topic では、各メッセージキーの最新のメッセージのみが保持されますが、データは期限切れになりません。compacted Topic は、ソーステーブルと同等のサイズのデータを格納します。

シナリオ

たとえば、注文コメントのリアルタイム分析シナリオでは、ユーザーテーブル、注文テーブル、フィードバックテーブルの 3 つのテーブルがあるとします。各テーブルのデータを次の図に示します。mysql database

ユーザーの注文情報とユーザーのコメントを表示するには、ユーザーテーブルを結合してユーザー名 (name フィールド) を取得する必要があります。次の SQL 文は例です。

-- 注文テーブルとユーザーテーブルを結合して、各注文のユーザー名と製品名を表示します。
SELECT order.id as order_id, product, user.name as user_name
FROM order LEFT JOIN user
ON order.user_id = user.id;

-- フィードバックテーブルとユーザーテーブルを結合して、コメントの内容と対応するユーザー名を表示します。
SELECT feedback.id as feedback_id, comment, user.name as user_name
FROM feedback LEFT JOIN user
ON feedback.user_id = user.id;

上記の 2 つの SQL タスクでは、両方のジョブでユーザーテーブルが使用されます。実行時に、両方のジョブは MySQL から完全データと増分データを読み取ります。完全データの読み取りには MySQL 接続が必要であり、増分データの読み取りにはバイナリログ (Binlog) クライアントが必要です。ジョブの数が増えるにつれて、MySQL 接続と Binlog クライアントの数も増加します。これにより、アップストリームのデータベースに大きな負荷がかかります。この負荷を軽減するために、CDAS または CTAS 構文を使用して、アップストリームの MySQL データを Kafka にリアルタイムで同期し、複数のダウンストリームジョブで消費できます。

前提条件

準備

ApsaraDB RDS for MySQL インスタンスを作成し、データソースを準備する

  1. ApsaraDB RDS for MySQL データベースを作成します。詳細については、「データベースの作成」をご参照ください。

    宛先インスタンス用に order_dw という名前のデータベースを作成します。

  2. MySQL CDC データソースを準備します。

    1. インスタンス詳細ページで、ページ上部の [データベースにログイン] をクリックします。

    2. DMS ログインダイアログボックスで、作成したデータベースアカウントのユーザー名とパスワードを入力し、[ログイン] をクリックします。

    3. ログイン後、左側の order_dw データベースをダブルクリックして、そのデータベースに切り替えます。

    4. SQL コンソールセクションで、3 つのビジネス テーブルを作成するための DDL 文と、データを挿入するための文を記述します。

      CREATE TABLE `user` (
        id bigint not null primary key,
        name varchar(50) not null
      );
      
      CREATE TABLE `order` (
        id bigint not null primary key,
        product varchar(50) not null,
        user_id bigint not null
      );
      
      CREATE TABLE `feedback` (
        id bigint not null primary key,
        user_id bigint not null,
        comment varchar(50) not null
      );
      
      -- データを準備
      INSERT INTO `user` VALUES(1, 'Tom'),(2, 'Jerry');
      
      INSERT INTO `order` VALUES
      (1, 'Football', 2),
      (2, 'Basket', 1);
      
      INSERT INTO `feedback` VALUES
      (1, 1, 'Good.'),
      (2, 2, 'Very good');
  3. [実行] をクリックし、次に [直接実行] をクリックします。

手順

  1. Flink CDC データインジェストタスクを作成して開始し、アップストリームの MySQL データを Kafka にリアルタイムで同期します。これにより、複数のダウンストリームジョブがデータを消費できます。完全なデータベース同期ジョブは自動的に Topic を作成します。route モジュールを使用して Topic 名を定義できます。Topic のパーティションとレプリカの数は Kafka クラスターのデフォルト構成を使用し、cleanup.policy は compact に設定されます。

    デフォルトの Topic 名

    デフォルトでは、完全なデータベース同期タスクによって作成される Kafka Topic の名前は `database_name.table_name` 形式です。次のジョブは、`order_dw.user`、`order_dw.order`、`order_dw.feedback` の 3 つの Topic を作成します。

    1. [データ開発] > [データインジェスト] ページで、Flink CDC データインジェストジョブを作成し、次のコードを YAML エディターにコピーします。

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{usernmae}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # The following parameters are required for ApsaraMQ for Kafka
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. 右上隅にある [デプロイ] をクリックしてジョブをデプロイします。

    3. ナビゲーションウィンドウで、[オペレーションセンター] > [ジョブ O&M] を選択します。対象のジョブを見つけ、[アクション] 列の [開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。

    Topic 名の指定

    完全なデータベース同期タスクでは、route モジュールを使用して各テーブルの Topic 名を指定できます。次のジョブは、`user1`、`order2`、`feedback3` の 3 つの Topic を作成します。

    1. [データ開発] > [データインジェスト] ページで、Flink CDC データインジェストジョブを作成し、次のコードを YAML エディターにコピーします。

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{usernmae}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
        
      route:
        - source-table: order_dw.user
          sink-table: user1
        - source-table: order_dw.order
          sink-table: order2
        - source-table: order_dw.feedback
          sink-table: feedback3
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # The following parameters are required for ApsaraMQ for Kafka
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. 右上隅にある [デプロイ] をクリックしてジョブをデプロイします。

    3. ナビゲーションウィンドウで、[オペレーションセンター] > [ジョブ O&M] を選択します。対象のジョブを見つけ、[アクション] 列の [開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。

    Topic 名のバッチ設定

    完全なデータベース同期タスクでは、route モジュールを使用してパターンを指定し、Topic 名をバッチで生成できます。次のジョブは、`topic_user`、`topic_order`、`topic_feedback` の 3 つの Topic を作成します。

    1. [データ開発] > [データインジェスト] ページで、Flink CDC データインジェストジョブを作成し、次のコードを YAML エディターにコピーします。

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{usernmae}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
        
      route:
        - source-table: order_dw.\.*
          sink-table: topic_<>
          replace-symbol: <>
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # The following parameters are required for ApsaraMQ for Kafka
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. 右上隅にある [デプロイ] をクリックしてジョブをデプロイします。

    3. ナビゲーションウィンドウで、[オペレーションセンター] > [ジョブ O&M] を選択します。対象のジョブを見つけ、[アクション] 列の [開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。

  1. Kafka データをリアルタイムで消費します。

    アップストリームの MySQL データベースからのデータは、JSON 形式で Kafka に書き込まれます。単一の Kafka Topic は、複数のダウンストリームジョブによって消費できます。ダウンストリームジョブは、Topic からデータを消費して、データベーステーブルから最新のデータを取得します。Kafka に同期されたテーブルからデータを消費するには、次の 2 つの方法のいずれかを使用します。

    カタログを介してデータを直接消費する

    Kafka Topic からソーステーブルとしてデータを読み取ります。

    1. [データ開発] > [ETL] ページで、SQL ストリームジョブを作成し、次のコードを SQL エディターにコピーします。

      CREATE TEMPORARY TABLE print_user_proudct(
        order_id BIGINT,
        product STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      CREATE TEMPORARY TABLE print_user_feedback(
        feedback_id BIGINT,
        `comment` STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      BEGIN STATEMENT SET;      -- 複数のシンクに書き込む場合に必要です。
      
      -- Kafka JSON カタログのユーザーテーブルと注文情報を結合して、各注文のユーザー名と製品名を表示します。
      INSERT INTO print_user_proudct
      SELECT `order`.key_id as order_id, value_product as product, `user`.value_name as user_name
      FROM `kafka-catalog`.`kafka`.`order`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/  as `order` --グループと起動モードを指定
      LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` --グループと起動モードを指定
      ON `order`.value_user_id = `user`.key_id;
      
      -- コメントテーブルとユーザーテーブルを結合して、各コメントの内容と対応するユーザー名を表示します。
      INSERT INTO print_user_feedback
      SELECT feedback.key_id as feedback_id, value_comment as `comment`, `user`.value_name as user_name
      FROM `kafka-catalog`.`kafka`.feedback/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/  as feedback  --グループと起動モードを指定
      LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` --グループと起動モードを指定
      ON feedback.value_user_id = `user`.key_id;
      
      END;      -- 複数のシンクに書き込む場合に必要です。

      この例では、Print コネクタを使用して結果を直接出力します。コネクタの結果テーブルに結果を出力して、さらなる分析や計算を行うこともできます。複数のシンクへの書き込み構文の詳細については、「INSERT INTO 文」をご参照ください。

      説明

      データを直接消費する場合、Kafka JSON カタログによって解析されたスキーマは、スキーマの変更により、対応する MySQL テーブルのスキーマと異なる場合があります。たとえば、削除されたフィールドが表示されたり、一部のフィールドが null になったりすることがあります。

      カタログによって読み取られるスキーマは、消費されたデータのフィールドで構成されます。フィールドが削除されてもそのメッセージが期限切れになっていない場合、そのフィールドは null 値で表示されることがあります。この状況を処理する必要はありません。

    2. 右上隅にある [デプロイ] をクリックしてジョブをデプロイします。

    3. ナビゲーションウィンドウで、[オペレーションセンター] > [ジョブ O&M] を選択します。対象のジョブを見つけ、[アクション] 列の [開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。

    一時テーブルを作成してデータを消費する

    カスタムスキーマを定義し、一時テーブルからデータを読み取ります。

    1. [データ開発] > [ETL] ページで、SQL ストリームジョブを作成し、次のコードを SQL エディターにコピーします。

      CREATE TEMPORARY TABLE user_source (
        key_id BIGINT,
        value_name STRING
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE order_source (
        key_id  BIGINT,
        value_product STRING,
        value_user_id BIGINT  
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'order',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE feedback_source (
        key_id  BIGINT,
        value_user_id BIGINT,
        value_comment STRING
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'feedback',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE print_user_proudct(
        order_id BIGINT,
        product STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      CREATE TEMPORARY TABLE print_user_feedback(
        feedback_id BIGINT,
        `comment` STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      BEGIN STATEMENT SET;      -- 複数のシンクに書き込む場合に必要です。
      -- Kafka JSON カタログのユーザーテーブルと注文情報を結合して、各注文のユーザー名と製品名を表示します。
      INSERT INTO print_user_proudct
      SELECT order_source.key_id as order_id, value_product as product, user_source.value_name as user_name
      FROM order_source LEFT JOIN user_source
      ON order_source.value_user_id = user_source.key_id;
      
      
      -- コメントテーブルとユーザーテーブルを結合して、各コメントの内容と対応するユーザー名を表示します。
      INSERT INTO print_user_feedback
      SELECT feedback_source.key_id as feedback_id, value_comment as `comment`, user_source.value_name as user_name
      FROM feedback_source  LEFT JOIN user_source
      ON feedback_source.value_user_id = user_source.key_id;
      
      END;      -- 複数のシンクに書き込む場合に必要です。

      この例では、Print コネクタを使用して結果を直接出力します。コネクタの結果テーブルに結果を出力して、さらなる分析や計算を行うこともできます。複数のシンクへの書き込み構文の詳細については、「INSERT INTO 文」をご参照ください。

      次の表に、一時テーブルを構成するためのパラメーターを示します。

      パラメーター

      説明

      注意

      connector

      コネクタのタイプ。

      値を `kafka` に設定します。

      topic

      Topic の名前。

      これは Kafka JSON Catalog と一致します。

      properties.bootstrap.servers

      Kafka ブローカーのアドレス。

      フォーマットは host:port,host:port,host:port です。複数のアドレスはカンマ (,) で区切ります。

      scan.startup.mode

      Kafka でデータを読み取る開始オフセット。

      有効な値:

      • earliest-offset: Kafka パーティションの最も古いオフセットからデータを読み取ります。

      • latest-offset: 最新のオフセットからデータを読み取ります。

      • group-offsets (デフォルト): 指定された properties.group.id のコミット済みオフセットからデータを読み取ります。

      • timestamp: scan.startup.timestamp-millis で指定されたタイムスタンプからデータを読み取ります。

      • specific-offsets: scan.startup.specific-offsets で指定されたオフセットからデータを読み取ります。

      注意

      このパラメーターは、ジョブがステートなしで開始された場合にのみ有効になります。ジョブがチェックポイントから再開されるか、そのステートが再開される場合、ジョブは優先的にステートに保存された進行状況を使用して読み取りを再開します。

      key.format

      Flink Kafka コネクタが Kafka メッセージキーをシリアル化または逆シリアル化するために使用するフォーマット。

      値を `json` に設定します。

      key.fields

      ソーステーブルまたは結果テーブルの、Kafka メッセージのキーに対応するフィールド。

      複数のフィールド名はセミコロン (;) で区切ります。例: field1;field2

      key.fields-prefix

      メッセージ値またはメタデータのフィールドとの名前の競合を避けるための、すべての Kafka メッセージキーのカスタムプレフィックス。

      この値は、Kafka JSON カタログの key.fields-prefix パラメーターの値と同じである必要があります。

      value.format

      Flink Kafka コネクタが Kafka メッセージ値をシリアル化または逆シリアル化するために使用するフォーマット。

      値を `json` に設定します。

      value.fields-prefix

      メッセージキーまたはメタデータのフィールドとの名前の競合を避けるための、すべての Kafka メッセージ値のカスタムプレフィックス。

      この値は、Kafka JSON カタログの value.fields-prefix パラメーターの値と同じである必要があります。

      value.fields-include

      メッセージキーフィールドを処理するときのメッセージ値のポリシー。

      値を `EXCEPT_KEY` に設定します。これは、メッセージ値にメッセージキーのフィールドが含まれていないことを示します。

      value.json.infer-schema.flatten-nested-columns.enable

      Kafka メッセージ値の JSON でネストされた列を再帰的に展開するかどうかを指定します。

      この値は、カタログの infer-schema.flatten-nested-columns.enable パラメーターの値に対応します。

      value.json.infer-schema.primitive-as-string

      Kafka メッセージ値のすべての基本データ型を STRING 型として推論するかどうかを指定します。

      この値は、カタログの infer-schema.primitive-as-string パラメーターの値に対応します。

    2. 右上隅にある [デプロイ] をクリックしてジョブをデプロイします。

    3. ナビゲーションウィンドウで、[オペレーションセンター] > [ジョブ O&M] を選択します。対象のジョブを見つけ、[アクション] 列の [開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。

  2. ジョブの結果を表示します。

    1. ナビゲーションウィンドウで、[オペレーションセンター] > [ジョブ O&M] を選択し、対象のジョブをクリックします。

    2. [ジョブログ] タブで、[実行中のタスクマネージャー] タブをクリックし、[パス、ID] の下のタスクをクリックします。

    3. [ログ] をクリックし、PrintSinkOutputWriter に関連するログ情報を検索します。

      1.png

リファレンス