すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Flink CDC を使用して MySQL データベース全体を Kafka に同期する

最終更新日:Jan 30, 2026

このトピックでは、MySQL データベース全体を Kafka に同期する方法について説明します。この方法により、複数のタスクが MySQL データベースにかける負荷を軽減できます。

背景情報

MySQL Change Data Capture (CDC) テーブルは、MySQL からデータを取得し、テーブルの変更をリアルタイムで同期します。これらは、複雑なコンピューティングによく使用されます。たとえば、MySQL CDC テーブルをディメンションテーブルとして使用し、他のデータテーブルと結合できます。単一の MySQL テーブルが複数のジョブの依存関係になることがあります。これらのジョブが同じテーブルのデータを処理すると、MySQL データベースは複数の接続を確立します。これにより、MySQL サーバーとネットワークに大きな負荷がかかります。

ソリューションアーキテクチャ

アップストリームの MySQL データベースへの負荷を軽減するため、Realtime Compute for Apache Flink では、MySQL データベース全体を Kafka に同期できます。このソリューションでは、Kafka を中間レイヤーとして使用します。Flink CDC データインジェストジョブがデータを Kafka に同期します。

単一のジョブで、アップストリームの MySQL データベースから Kafka へデータをリアルタイムに同期します。各 MySQL テーブルは、対応する Kafka Topic に upsert モードで書き込まれます。ダウンストリームのジョブは、MySQL テーブルに直接アクセスする代わりに、Upsert Kafka コネクタを使用して Topic からデータを読み取ります。この方法により、MySQL データベースへの負荷が大幅に軽減されます。

图片 1

制限事項

  • 同期対象の MySQL テーブルには、プライマリキーが必要です。

  • 自己管理型 Kafka クラスター、EMR Kafka クラスター、または ApsaraMQ for Kafka を使用できます。ApsaraMQ for Kafka を使用する場合、デフォルトのエンドポイントを使用する必要があります。

  • Kafka クラスターには、ソーステーブルよりも多くのストレージ容量が必要です。そうでない場合、容量不足によりデータ損失が発生する可能性があります。データベース同期のために作成された Topic はコンパクト化されます。コンパクト化された Topic では、各キーの最新のメッセージのみが保持され、データは有効期限切れになりません。これは、Topic がソーステーブルのサイズとほぼ同量のデータを格納することを意味します。

ユースケース

注文レビューのリアルタイム分析を考えてみましょう。`user`、`order`、`feedback` の 3 つのテーブルがあります。次の図は、これらのテーブルのデータを示しています。mysql database

注文の詳細とユーザーレビューを表示するには、`user` テーブルを結合して `name` フィールドからユーザー名を取得する必要があります。次の SQL 文は、その方法を示しています。

-- 注文情報をユーザーテーブルと結合して、各注文のユーザー名と製品名を表示します。
SELECT order.id as order_id, product, user.name as user_name
FROM order LEFT JOIN user
ON order.user_id = user.id;

-- レビューをユーザーテーブルと結合して、各レビューの内容と対応するユーザー名を表示します。
SELECT feedback.id as feedback_id, comment, user.name as user_name
FROM feedback LEFT JOIN user
ON feedback.user_id = user.id;

2 つの SQL タスクでは、`user` テーブルが両方のジョブで使用されます。実行時に、両方のジョブが MySQL から完全データと増分データを読み取ります。完全データを読み取るには MySQL 接続が必要であり、増分データを読み取るにはバイナリログ (Binlog) クライアントが必要です。ジョブの数が増えるにつれて、必要な MySQL 接続と Binlog クライアントリソースも増加します。これにより、上流のデータベースに大きな負荷がかかります。この負荷を軽減するために、Flink CDC データインジェストジョブを使用して、上流の MySQL データをリアルタイムで Kafka に同期できます。その後、データは複数の下流ジョブで消費できるようになります。

前提条件

事前準備

ApsaraDB RDS for MySQL インスタンスの作成とデータソースの準備

  1. データベースを作成します。詳細については、「データベースの作成」をご参照ください。

    宛先インスタンス用に order_dw という名前のデータベースを作成します。

  2. MySQL CDC データソースを準備します。

    1. インスタンスの製品ページの上部にある [データベースにログイン] をクリックします。

    2. 表示される DMS ログインページで、作成したデータベースアカウントのユーザー名とパスワードを入力し、[ログイン] をクリックします。

    3. ログイン後、左側の order_dw データベースをダブルクリックして切り替えます。

    4. SQL コンソールで、3 つのビジネステーブルを作成するための DDL 文と、データを挿入するための文を記述します。

      CREATE TABLE `user` (
        id bigint not null primary key,
        name varchar(50) not null
      );
      
      CREATE TABLE `order` (
        id bigint not null primary key,
        product varchar(50) not null,
        user_id bigint not null
      );
      
      CREATE TABLE `feedback` (
        id bigint not null primary key,
        user_id bigint not null,
        comment varchar(50) not null
      );
      
      -- データの準備
      INSERT INTO `user` VALUES(1, 'Tom'),(2, 'Jerry');
      
      INSERT INTO `order` VALUES
      (1, 'Football', 2),
      (2, 'Basket', 1);
      
      INSERT INTO `feedback` VALUES
      (1, 1, 'Good.'),
      (2, 2, 'Very good');
  3. [実行] をクリックし、次に [直接実行] をクリックします。

操作手順

  1. Flink CDC データインジェストタスクを作成して開始し、アップストリームの MySQL データベースから Kafka にデータをリアルタイムで同期します。これにより、データは複数のダウンストリームジョブで利用可能になります。ジョブは自動的に Topic を作成します。`route` モジュールを使用して Topic 名を定義できます。Topic のパーティション数とレプリカ数は Kafka クラスターのデフォルト設定を使用し、`cleanup.policy` は `compact` に設定されます。

    デフォルトの Topic 名

    データベース全体の同期タスクは、MySQL のデータベース名とテーブル名をピリオドで結合したデフォルトの命名フォーマットを使用して Kafka Topic を作成します。たとえば、ジョブは order_dw.user、order_dw.order、order_dw.feedback などの Topic を作成できます。

    1. [Data Studio] > [データインジェスト] ページで、新しい Flink CDC データインジェストジョブを作成し、次のコードを YAML エディターにコピーします。

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{usernmae}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
        # (オプション) 増分フェーズ中に新しく作成されたテーブルからデータを同期します。
        scan.binlog.newly-added-table.enabled: true
        # (オプション) テーブルとフィールドのコメントを同期します。
        include-comments.enabled: true
        # (オプション) タスクマネージャーの OutOfMemory 問題を防ぐために、無制限のチャンクを優先します。
        scan.incremental.snapshot.unbounded-chunk-first.enabled: true
        # (オプション) 読み取りを高速化するために解析フィルターを有効にします。
        scan.only.deserialize.captured.tables.changelog.enabled: true
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # 以下のパラメーターは ApsaraMQ for Kafka に必要です
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. 右上隅にある [デプロイ] をクリックしてジョブをデプロイします。

    3. 左側のナビゲーションウィンドウで、[オペレーションセンター] > [ジョブ O&M] を選択します。対象ジョブの [アクション] 列で、[開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。

    Topic 名の指定

    データベース同期タスクの `route` モジュールを使用して、各テーブルの Topic 名を指定できます。次のジョブは、`user1`、`order2`、`feedback3` の 3 つの Topic を作成します。

    1. [データスタジオ] > [データインジェスト] ページで、新しい Flink CDC データインジェストジョブを作成し、次のコードを YAML エディターにコピーします。

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{usernmae}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
        # (オプション) 増分フェーズ中に新しく作成されたテーブルからデータを同期します。
        scan.binlog.newly-added-table.enabled: true
        # (オプション) テーブルとフィールドのコメントを同期します。
        include-comments.enabled: true
        # (オプション) タスクマネージャーの OutOfMemory 問題を防ぐために、無制限のチャンクを優先します。
        scan.incremental.snapshot.unbounded-chunk-first.enabled: true
        # (オプション) 読み取りを高速化するために解析フィルターを有効にします。
        scan.only.deserialize.captured.tables.changelog.enabled: true
        
      route:
        - source-table: order_dw.user
          sink-table: user1
        - source-table: order_dw.order
          sink-table: order2
        - source-table: order_dw.feedback
          sink-table: feedback3
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # 以下のパラメーターは ApsaraMQ for Kafka に必要です
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. 右上隅にある [デプロイ] をクリックしてジョブをデプロイします。

    3. 左側のナビゲーションウィンドウで、[オペレーションセンター] > [ジョブ O&M] を選択します。対象ジョブの [アクション] 列で、[開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。

    Topic 名の一括設定

    データベース同期タスクの `route` モジュールを使用して、生成される Topic 名のパターンを一括で指定できます。次のジョブは、`topic_user`、`topic_order`、`topic_feedback` の 3 つの Topic を作成します。

    1. [Data Studio] > [データインジェスト] ページで、新しい Flink CDC データインジェストジョブを作成し、YAML エディターに次のコードをコピーします。

      source:
        type: mysql
        name: MySQL Source
        hostname: #{hostname}
        port: 3306
        username: #{usernmae}
        password: #{password}
        tables: order_dw.\.*
        server-id: 28601-28604
        # (オプション) 増分フェーズ中に新しく作成されたテーブルからデータを同期します。
        scan.binlog.newly-added-table.enabled: true
        # (オプション) テーブルとフィールドのコメントを同期します。
        include-comments.enabled: true
        # (オプション) タスクマネージャーの OutOfMemory 問題を防ぐために、無制限のチャンクを優先します。
        scan.incremental.snapshot.unbounded-chunk-first.enabled: true
        # (オプション) 読み取りを高速化するために解析フィルターを有効にします。
        scan.only.deserialize.captured.tables.changelog.enabled: true
        
      route:
        - source-table: order_dw.\.*
          sink-table: topic_<>
          replace-symbol: <>
      
      sink:
        type: upsert-kafka
        name: upsert-kafka Sink
        properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092
        # 以下のパラメーターは ApsaraMQ for Kafka に必要です
        aliyun.kafka.accessKeyId: #{ak}
        aliyun.kafka.accessKeySecret: #{sk}
        aliyun.kafka.instanceId: #{instanceId}
        aliyun.kafka.endpoint: #{endpoint}
        aliyun.kafka.regionId: #{regionId}
    2. 右上隅にある [デプロイ] をクリックしてジョブをデプロイします。

    3. 左側のナビゲーションウィンドウで、[オペレーションセンター] > [ジョブ O&M] を選択します。対象ジョブの [アクション] 列で、[開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。

  1. Kafka データのリアルタイムコンシューム

    アップストリームの MySQL データベースからのデータは、JSON フォーマットで Kafka に書き込まれます。単一の Kafka Topic は複数のダウンストリームジョブでコンシュームでき、これらのジョブは Topic から読み取って最新のテーブルデータを取得します。同期されたテーブルのデータは、次の 2 つの方法でコンシュームできます。

    カタログを介したデータの直接コンシューム

    Kafka Topic からのデータをソーステーブルとして読み取ります。

    1. [データ開発] > [ETL] ページで、新しい SQL ストリームジョブを作成し、次のコードを SQL エディターにコピーします。

      CREATE TEMPORARY TABLE print_user_proudct(
        order_id BIGINT,
        product STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      CREATE TEMPORARY TABLE print_user_feedback(
        feedback_id BIGINT,
        `comment` STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      BEGIN STATEMENT SET;      -- 複数のシンクに書き込む場合に必要です。
      
      -- Kafka JSON カタログのユーザーテーブルと注文情報を結合して、各注文のユーザー名と製品名を表示します。
      INSERT INTO print_user_proudct
      SELECT `order`.key_id as order_id, value_product as product, `user`.value_name as user_name
      FROM `kafka-catalog`.`kafka`.`order`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/  as `order` -- グループと開始モードを指定します
      LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` -- グループと開始モードを指定します
      ON `order`.value_user_id = `user`.key_id;
      
      -- レビューをユーザーテーブルと結合して、各レビューの内容と対応するユーザー名を表示します。
      INSERT INTO print_user_feedback
      SELECT feedback.key_id as feedback_id, value_comment as `comment`, `user`.value_name as user_name
      FROM `kafka-catalog`.`kafka`.feedback/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/  as feedback  -- グループと開始モードを指定します
      LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` -- グループと開始モードを指定します
      ON feedback.value_user_id = `user`.key_id;
      
      END;      -- 複数のシンクに書き込む場合に必要です。

      この例では、Print コネクタを使用して結果を直接出力します。結果をコネクタの結果テーブルに出力して、さらなる分析や計算を行うこともできます。複数のシンクへの書き込み構文の詳細については、「INSERT INTO 文」をご参照ください。

      説明

      データ同期中に MySQL テーブルのスキーマが変更された場合、Kafka JSON カタログによって解析されたスキーマが実際のテーブルスキーマと同期しなくなる可能性があります。たとえば、MySQL テーブルからフィールドが削除されても、カタログから解析されたスキーマにまだ表示されている場合、これらのフィールドの値は null になることがあります。

      カタログから読み取られたスキーマは、コンシュームされたデータのフィールドで構成されます。フィールドが削除されても、それを含むメッセージが有効期限切れになっていない場合、スキーマには存在しなくなったフィールドが含まれる可能性があります。そのようなフィールドの値は null になります。この状況では特別な処理は必要ありません。

    2. 右上隅にある [デプロイ] をクリックしてジョブをデプロイします。

    3. 左側のナビゲーションウィンドウで、[オペレーションセンター] > [ジョブ O&M] を選択します。対象ジョブの [アクション] 列で、[開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。

    一時テーブルを作成してデータをコンシュームする

    カスタムスキーマを定義し、一時テーブルからデータを読み取ります。

    1. [データ開発] > [ETL] ページで、新しい SQL ストリームジョブを作成し、次のコードを SQL エディターにコピーできます。

      CREATE TEMPORARY TABLE user_source (
        key_id BIGINT,
        value_name STRING
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE order_source (
        key_id  BIGINT,
        value_product STRING,
        value_user_id BIGINT  
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'order',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE feedback_source (
        key_id  BIGINT,
        value_user_id BIGINT,
        value_comment STRING
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'feedback',
        'properties.bootstrap.servers' = '<yourKafkaBrokers>',
        'scan.startup.mode' = 'earliest-offset',
        'key.format' = 'json',
        'value.format' = 'json',
        'key.fields' = 'key_id',
        'key.fields-prefix' = 'key_',
        'value.fields-prefix' = 'value_',
        'value.fields-include' = 'EXCEPT_KEY',
        'value.json.infer-schema.flatten-nested-columns.enable' = 'false',
        'value.json.infer-schema.primitive-as-string' = 'false'
      );
      
      CREATE TEMPORARY TABLE print_user_proudct(
        order_id BIGINT,
        product STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      CREATE TEMPORARY TABLE print_user_feedback(
        feedback_id BIGINT,
        `comment` STRING,
        user_name STRING
      ) WITH (
        'connector'='print',
        'logger'='true'
      );
      
      BEGIN STATEMENT SET;      -- 複数のシンクに書き込む場合に必要です。
      -- Kafka JSON カタログのユーザーテーブルと注文情報を結合して、各注文のユーザー名と製品名を表示します。
      INSERT INTO print_user_proudct
      SELECT order_source.key_id as order_id, value_product as product, user_source.value_name as user_name
      FROM order_source LEFT JOIN user_source
      ON order_source.value_user_id = user_source.key_id;
      
      
      -- レビューをユーザーテーブルと結合して、各レビューの内容と対応するユーザー名を表示します。
      INSERT INTO print_user_feedback
      SELECT feedback_source.key_id as feedback_id, value_comment as `comment`, user_source.value_name as user_name
      FROM feedback_source  LEFT JOIN user_source
      ON feedback_source.value_user_id = user_source.key_id;
      
      END;      -- 複数のシンクに書き込む場合に必要です。

      この例では、Print コネクタを使用して結果を直接出力します。結果をコネクタの結果テーブルに出力して、さらなる分析や計算を行うこともできます。複数のシンクへの書き込み構文の詳細については、「INSERT INTO 文」をご参照ください。

      次の表に、一時テーブルを構成するためのパラメーターを示します。

      パラメーター

      説明

      注意

      connector

      コネクタのタイプ。

      値は `kafka` である必要があります。

      topic

      対応する Topic の名前。

      これは Kafka JSON カタログの説明と同じである必要があります。

      properties.bootstrap.servers

      Kafka ブローカーのアドレス。

      フォーマットは host:port,host:port,host:port です。アドレスを区切るにはカンマ (,) を使用します。

      scan.startup.mode

      Kafka からデータを読み取る開始オフセット。

      有効な値:

      • earliest-offset: Kafka の最も古いパーティションから読み取ります。

      • latest-offset: Kafka の最新のオフセットから読み取ります。

      • group-offsets (デフォルト): 指定された properties.group.id のコミット済みオフセットから読み取ります。

      • timestamp: scan.startup.timestamp-millis で指定されたタイムスタンプから読み取ります。

      • specific-offsets: scan.startup.specific-offsets で指定されたオフセットから読み取ります。

      注意

      このパラメーターは、ジョブがステートなしで開始された場合にのみ有効です。ジョブがチェックポイントから再開したり、状態を回復したりする場合、状態に保存された進行状況を優先して読み取りを再開します。

      key.format

      Flink Kafka コネクタが Kafka メッセージキーをシリアル化または逆シリアル化するために使用するフォーマット。

      値は `json` である必要があります。

      key.fields

      Kafka メッセージキーに対応するソースまたはシンクテーブルのフィールド。

      複数のフィールド名を区切るにはセミコロン (;) を使用します。例:field1;field2

      key.fields-prefix

      メッセージ値またはメタデータのフィールドとの名前の競合を避けるための、すべての Kafka メッセージキーのカスタムプレフィックス。

      これは、Kafka JSON カタログの key.fields-prefix パラメーターの値と同じである必要があります。

      value.format

      Flink Kafka コネクタが Kafka メッセージ値をシリアル化または逆シリアル化するために使用するフォーマット。

      値は `json` である必要があります。

      value.fields-prefix

      メッセージキーまたはメタデータのフィールドとの名前の競合を避けるための、すべての Kafka メッセージ値のカスタムプレフィックス。

      これは、Kafka JSON カタログの value.fields-prefix パラメーターの値と同じである必要があります。

      value.fields-include

      メッセージ本文内のメッセージキーフィールドの処理ポリシーを指定します。

      固定値 EXCEPT_KEY は、メッセージ本文にメッセージキーフィールドが含まれないことを示します。

      value.json.infer-schema.flatten-nested-columns.enable

      Kafka メッセージ値の JSON 内のネストされた列を再帰的に展開するかどうかを指定します。

      カタログの infer-schema.flatten-nested-columns.enable パラメーターの値に対応します。

      value.json.infer-schema.primitive-as-string

      Kafka メッセージ値内のすべての基本データ型を文字列型として推論するかどうかを指定します。

      カタログの infer-schema.primitive-as-string パラメーターの値に対応します。

    2. 右上隅にある [デプロイ] をクリックしてジョブをデプロイします。

    3. 左側のナビゲーションウィンドウで、[オペレーションセンター] > [ジョブ O&M] を選択します。対象ジョブの [アクション] 列で、[開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。

  2. ジョブの結果の表示

    1. 左側のナビゲーションウィンドウで [オペレーションセンター] > [ジョブ O&M] を選択し、対象のジョブをクリックします。

    2. [ジョブログ] タブで、[実行中のタスクマネージャー] タブの [パス、ID] の下にあるタスクをクリックします。

    3. [ログ] をクリックし、ページで PrintSinkOutputWriter に関連するログ情報を検索します。

      1.png

参考資料