すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:CTAS 文を使用して Realtime Compute for Apache Flink で MySQL から StarRocks にデータを同期する

最終更新日:Oct 30, 2025

このトピックでは、CREATE TABLE AS (CTAS) 文を使用して、Realtime Compute for Apache Flink で MySQL から EMR Serverless StarRocks にデータを同期する方法について説明します。

背景情報

CTAS または CREATE DATABASE AS (CDAS) 文を使用して、MySQL から EMR Serverless StarRocks にデータを同期できます。CTAS 文は単一テーブルのスキーマとデータを同期でき、CDAS 文はデータベース全体または同じデータベース内の複数テーブルのスキーマとデータを同期できます。このトピックでは CTAS 文を使用します。CDAS 文は CTAS 文と同様の方法で使用されます。詳細については、「CDAS の概要」をご参照ください。

CREATE TABLE AS (CTAS) 文を使用すると、MySQL のテーブルと同じスキーマを持つテーブルを StarRocks に自動的に作成し、データを同期できます。また、ソーステーブルのスキーマ変更を宛先テーブルにリアルタイムで同期することもでき、宛先ストレージでのテーブル作成やソーステーブルのスキーマ変更の維持の効率が向上します。

CTAS 文を実行すると、Flink は次の操作を実行します。

  1. 宛先ストレージに宛先テーブルが存在するかどうかを確認します。

    • 宛先テーブルが存在しない場合、宛先テーブルが属するカタログに基づいて、対応する宛先テーブルが宛先ストレージに作成されます。宛先テーブルはソーステーブルと同じスキーマを持ちます。

    • 宛先テーブルが存在する場合、Flink はテーブル作成ステップをスキップします。既存の宛先テーブルのスキーマがソーステーブルのスキーマと異なる場合、エラーメッセージが返されます。

  2. データ同期ジョブをコミットして実行します。Flink は、データとソーステーブルのスキーマの変更を宛先テーブルに同期します。

スキーマ変更同期ポリシーは、CTAS 文を使用して、データとソーステーブルのスキーマの変更を宛先テーブルにリアルタイムで同期します。

スキーマの変更には、テーブルの作成と、テーブル作成後のスキーマの変更が含まれます。

  • 次のスキーマ変更がサポートされています。

    • NULL 値を許容する列の追加: 文は、関連する列を宛先テーブルのスキーマの末尾に自動的に追加し、追加された列にデータを同期します。

    • NULL 値を許容する列の削除: 文は、テーブルから列を削除する代わりに、宛先テーブルの NULL 値を許容する列を自動的に NULL 値で埋めます。

    • 列の名前変更: 文は、名前が変更された列を宛先テーブルの末尾に追加し、名前変更前の列を NULL 値で埋めます。

      たとえば、ソーステーブルの col_a 列の名前が col_b に変更された場合、col_b 列が宛先テーブルの末尾に追加され、col_a 列は自動的に NULL 値で埋められます。

  • 次のスキーマ変更はサポートされていません。

    • データ型の変更。

      たとえば、列のデータが VARCHAR 型から BIGINT 型に変更されたり、列のプロパティが NOT NULL から NULLABLE に変更されたりする場合です。

    • プライマリキーやインデックスなどの制約の変更。

    • NULL 値を許容しない列の追加または削除。

説明
  • ソーステーブルのスキーマに前述の変更のいずれかがある場合は、宛先テーブルを削除し、CTAS 文を実行するジョブを再起動する必要があります。これにより、宛先テーブルが再度作成され、既存データが宛先テーブルに再同期されます。
  • CTAS 文は DDL 文の種類を識別しませんが、スキーマが変更される前後の 2 つのデータレコード間のスキーマの違いを比較します。したがって、列を削除してから再度追加し、列を削除および追加するために使用される 2 つの DDL 文の間にデータ変更がない場合、CTAS 文はスキーマ変更が発生しなかったと見なします。同様に、ソーステーブルに列を追加しても、CTAS 文はスキーマ変更同期をトリガーしません。文は、ソーステーブルのデータが変更された場合にのみスキーマ変更を識別します。この場合、文はスキーマ変更を宛先テーブルに同期します。
  • CTAS 文でサポートされているフィールドタイプの詳細については、「Continuously load data from Apache Flink®」をご参照ください。

前提条件

説明

このトピックでは、MySQL 5.7 と Flink バージョン vvr-8.0.11-flink-1.17 を例として使用します。

制限事項

  • Flink クラスター、EMR Serverless StarRocks インスタンス、および ApsaraDB RDS for MySQL インスタンスは、同じ VPC 内にある必要があります。

  • ApsaraDB RDS for MySQL インスタンスは、バージョン 5.7 以降である必要があります。

ステップ 1: テストデータの準備

  1. テストデータベースとアカウントを作成します。詳細については、「データベースとアカウントの作成」をご参照ください。

    データベースとアカウントを作成した後、テストアカウントに読み取りおよび書き込み権限を付与します。

    説明

    このトピックでは、データベース名は test_cdc、アカウント名は test です。

  2. 作成したテストアカウントを使用して MySQL インスタンスに接続します。詳細については、「DMS を使用して ApsaraDB RDS for MySQL インスタンスにログインする」をご参照ください。

  3. MySQL で、次のコマンドを実行してデータテーブルを作成します。

    use test_cdc;
    -- テーブルを作成
    CREATE TABLE IF NOT EXISTS `runoob_tbl`(
       `runoob_id` INT UNSIGNED AUTO_INCREMENT,
       `runoob_title` VARCHAR(100) NOT NULL,
       `runoob_author` VARCHAR(40) NOT NULL,
       `submission_date` DATE,
       `add_col` int DEFAULT NULL,
       PRIMARY KEY ( `runoob_id` )
    )ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    -- データを挿入
    INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2025-06-22 17:13:44',3)
  4. EMR Serverless StarRocks インスタンスにログインして接続します。詳細については、「MySQL クライアントを使用して StarRocks インスタンスに接続する」をご参照ください。

  5. 次のコマンドを実行して、test_cdc データベースとテストユーザーを作成します。ユーザーはスーパー管理者 (パスワード例: 1qaz!QAZ) として作成するか、一般ユーザーとして作成してデータベースとそのテーブルに対する権限を付与できます。詳細については、「ユーザーの管理」をご参照ください。

    -- データベースを作成
    CREATE DATABASE test_cdc;
    -- ユーザーを作成
    CREATE USER 'test' IDENTIFIED by '1qaz!QAZ';
    -- ユーザーにデータベース権限を付与
    GRANT ALL on test_cdc to test;
    -- ユーザーにテーブル権限を付与
    GRANT ALL ON ALL TABLES IN DATABASE test_cdc to test;

ステップ 2: Flink コンソールでカタログを作成する

Realtime Compute for Apache Flink コンソールの Data Management ページで MySQL と StarRocks のカタログを作成します。詳細については、「Data Management」をご参照ください。

説明

パラメーター設定は参照用です。必要に応じてパラメーターを設定してください。

  • MySQL カタログ

    • コード例

      CREATE CATALOG mysql WITH (
        'type' = 'mysql',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'emr-test',
        'password' = '123456',
        'default-database' = 'test_cdc'
      );
    • パラメーター

      パラメーター

      説明

      type

      タイプ。値は mysql に固定されます。

      hostname

      RDS インスタンスの内部エンドポイント。RDS インスタンスの [データベース接続] ページに移動し、内部エンドポイントをクリックしてコピーできます。例: rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com。

      port

      MySQL データベースサービスのポート番号。デフォルト値は 3306 です。

      username

      MySQL データベースサービスのユーザー名。

      ステップ 1: テストデータの準備」のアカウントのユーザー名を入力します。この例では、test を使用します。

      password

      MySQL データベースサービスのパスワード。

      ステップ 1: テストデータの準備」のアカウントのパスワードを入力します。

      default-database

      デフォルトの MySQL データベース名。

      ステップ 1: テストデータの準備」で作成したデータベース名を入力します。この例では、test_cdc を使用します。

  • StarRocks カタログ

    • コード例

      CREATE CATALOG sr  WITH (
        'type' = 'starrocks',
        'endpoint' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
        'username' = 'test',
        'password' = '1qaz!QAZ',
        'dbname' = 'test_cdc'
      );
    • パラメーター

      パラメーター

      説明

      type

      タイプ。値は starrocks に固定されます。

      endpoint

      フロントエンド (FE) ノードの内部エンドポイントとクエリポート。フォーマットは EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイント:9030 です。

      例: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。

      説明

      EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイントを取得する方法については、「インスタンスリストとインスタンスの詳細の表示」をご参照ください。

      username

      StarRocks のユーザー名。

      ステップ 1: テストデータの準備」のアカウントのユーザー名を入力します。この例では、test です。

      password

      StarRocks データベースサービスのパスワード。

      ステップ 1: テストデータの準備」のアカウントのパスワードを入力します。

      dbname

      StarRocks データベース名。

      ステップ 1: テストデータの準備」で作成したデータベース名を入力します。この例では、test_cdc です。

ステップ 3: ジョブの作成とデプロイ

  1. Realtime Compute for Apache Flink コンソールの [データ開発] > [ETL] ページで、CTAS 文を記述します。

    次の 3 つの例が提供されています。

    • 少なくとも 1 回のセマンティクス: sink.buffer-flush.interval-ms 設定項目を使用して、データが StarRocks に書き込まれる間隔を設定します。このモードは、短い書き込み間隔と低いメモリ使用量を提供します。

      /*
            少なくとも 1 回のセマンティクス
      */
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl;
                                      
    • 1 回限りのセマンティクス: チェックポイント間隔を指定する必要があります。利点は、エラーが発生したときにデータが失われたり重複したりしないことです。欠点は、チェックポイント間隔によってデータがいつ表示されるかが決まることです。詳細については、「チェックポイント」をご参照ください。

      /*
            1 回限りのセマンティクス。
      */
      set 'execution.checkpointing.interval' = '1 min';
      set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
      set 'execution.checkpointing.timeout' = '10 min';
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.semantic' = 'exactly-once',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl;
                                      
    • シンプルモード: テーブルを作成するときに、ソーステーブルのフィールドを指定する必要はありません。テーブルは MySQL テーブルのスキーマに基づいて作成されます。これは開発者にとって便利です。ただし、パーティションを作成することはできません。パーティションが必要なテーブルは、通常モードで作成する必要があります。

      /*
            上記の 2 つの例では通常モードを使用しています。この例ではシンプルモードを使用しています。
      */
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl with (
      'starrocks.create.table.mode'='simple',
       'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl;
                                      

    表 1. WITH パラメーター

    パラメーター

    必須

    説明

    starrocks.create.table.properties

    はい

    フィールド定義を除く、StarRocks テーブル作成文のその他のサフィックス定義。例: engine、key、および buckets。

    database-name

    はい

    StarRocks データベース名。

    このトピックでは、データベース名は test_cdc です。

    jdbc-url

    はい

    StarRocks でクエリを実行するために使用されます。

    例: jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。この例では、fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com は EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイントです。

    説明

    EMR Serverless StarRocks インスタンスの FE ノードの内部ネットワークアドレスを取得する方法については、「インスタンスリストと詳細の表示」をご参照ください。

    load-url

    はい

    FE ノードの内部エンドポイントとクエリポート。フォーマットは EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイント:8030 です。

    例: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030。

    説明

    EMR Serverless StarRocks インスタンスの FE ノードの内部ネットワークアドレスを取得する方法については、「インスタンスリストと詳細の表示」をご参照ください。

    sink.semantic

    いいえ

    値を exactly-once に設定して、1 回限りのセマンティクスを保証します。デフォルト値は at-least-once です。

    starrocks.create.table.mode

    いいえ

    次の値がサポートされています。

    • 通常モード (デフォルト): 例に示すように、starrocks.create.table.properties パラメーターに engine、key、buckets などの完全な設定を指定する必要があります。

    • シンプルモード: engine はデフォルトで olap です。キータイプはプライマリキーで、プライマリキーは MySQL テーブルのプライマリキーと同じです。デフォルトの分散はハッシュ (すべてのプライマリキー) で、パーティションはありません。starrocks.create.table.properties パラメーターでは、buckets は必須で、properties はオプションです。

    sink.properties.row_delimiter

    いいえ

    カスタムの行区切り文字。

    sink.properties.column_separator

    いいえ

    カスタムの列区切り文字。

    説明
    • vvr-6.0.5-flink-1.15 より前のバージョンの Flink を使用する場合、WITH 句に 'sink.use.new-apiapi'='false', を追加する必要があります。

    • その他の構成の詳細については、「Apache Flink からの継続的なデータロード」をご参照ください。

    表 2. OPTIONS パラメーター

    パラメーター

    説明

    connector

    タイプ。値は mysql-cdc に固定されます。

    hostname

    RDS インスタンスの内部エンドポイント。

    RDS インスタンスの [データベース接続] ページに移動し、内部エンドポイントをクリックしてコピーできます。例: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。

    port

    MySQL データベースサービスのポート番号。デフォルト値は 3306 です。

    username

    MySQL データベースサービスのユーザー名。

    ステップ 1: テストデータの準備」のアカウントのユーザー名を入力します。この例では、test を使用します。

    password

    MySQL データベースサービスのパスワード。

    ステップ 1: テストデータの準備」のアカウントのパスワードを入力します。

    table-name

    StarRocks のテーブル名。

    ステップ 1: テストデータの準備」で作成したテーブル名を入力します。この例では、runoob_tbl です。

    database-name

    デフォルトの MySQL データベース名。

    ステップ 1: テストデータの準備」で作成したデータベース名を入力します。この例では、test_cdc です。

  2. [デプロイ] をクリックします。

  3. [新しいバージョンのデプロイ] ページで、デプロイメントターゲットを選択し、[OK] をクリックします。

  4. [ジョブ O&M] ページで、開始したいジョブの [アクション] 列にある [開始] をクリックします。

説明

Realtime Compute for Apache Flink コンソールでは CTAS 文をデバッグできません。

ステップ 4: データ同期結果の検証

データのクエリ

  1. EMR Serverless StarRocks インスタンスにログインして接続します。詳細については、「MySQL クライアントを使用して StarRocks インスタンスに接続する」をご参照ください。

  2. StarRocks 接続ウィンドウで、次のコマンドを実行してテーブルデータを表示します。

    use test_cdc;
    select * from runoob_tbl;

    次の結果が返され、MySQL からのデータが StarRocks に同期されたことを示します。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | first        | tom           | 2025-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

挿入されたデータの同期

  1. RDS データベースウィンドウで、次のコマンドを実行してデータを挿入します。

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`)  values(1,'second','tom2','2022-06-23',1)
  2. StarRocks 接続ウィンドウで、次のコマンドを実行してテーブルデータを表示します。

    select * from runoob_tbl;

    次の結果が返され、データが挿入されたことを示します。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2025-06-23      |       1 |
    |        18 | first        | tom           | 2025-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

更新されたデータの同期

  1. RDS データベースウィンドウで、次のコマンドを実行して指定されたデータを更新します。

    update runoob_tbl set runoob_title= 'new' where runoob_id = 18
  2. StarRocks 接続ウィンドウで、次のコマンドを実行してテーブルデータを表示します。

    select * from runoob_tbl;

    次の結果が返され、データが更新されたことを示します。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2025-06-23      |       1 |
    |        18 | new          | tom           | 2025-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

削除されたデータの同期

  1. RDS データベースウィンドウで、次のコマンドを実行して指定されたデータを削除します。

    DELETE FROM runoob_tbl WHERE runoob_id = 1
  2. StarRocks 接続ウィンドウで、次のコマンドを実行してテーブルデータを表示します。

    select * from runoob_tbl;

    次の結果が返され、データが削除されたことを示します。

    +-----------+--------------+---------------+-----------------+---------+ 
    | runoob_id | runoob_title | runoob_author | submission_date | add_col | 
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | new          | tom           | 2025-06-22      |       3 | 
    +-----------+--------------+---------------+-----------------+---------+

追加された NULL 値を許容する列の同期

  1. RDS データベースウィンドウで、次のコマンドを実行して NULL 値を許容する列を追加します。

    alter table `runoob_tbl` add COLUMN `add_col2` INT;
  2. 次のコマンドを実行してデータを挿入します。

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`)  values(1,'second','tom2','2022-06-23',1,2)
  3. StarRocks 接続ウィンドウで、次のコマンドを実行してテーブルデータを表示します。

    select * from runoob_tbl;

    次の結果が返され、スキーマが変更されたことを示します。

    +-----------+--------------+---------------+-----------------+---------+----------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 | 
    +-----------+--------------+---------------+-----------------+---------+----------+ 
    |        18 | new          | tom           | 2025-06-22      |       3 |     NULL |
    +-----------+--------------+---------------+-----------------+---------+----------+ 
    |         1 | second       | tom2          | 2025-06-23      |       1 |      2   |
    +-----------+--------------+---------------+-----------------+---------+----------+ 

CDAS の概要

CDAS は CTAS の糖衣構文です。CDAS 文を使用すると、MySQL からのデータベース全体の同期が可能になります。これにより、ソースが MySQL のデータベースで、宛先が StarRocks の対応するテーブルセットである Flink ジョブが生成されます。`including table` 構文を使用して、データベースからテーブルのサブセットのみを選択して CDAS 操作を行うこともできます。

CTAS の実行と同様に、CDAS 文を実行する前に MySQL と StarRocks のカタログを作成する必要があります。次のコードは構文の例です。

CREATE DATABASE IF NOT EXISTS sr_db with (
'starrocks.create.table.properties'=' buckets 8',
'starrocks.create.table.mode'='simple',
'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
 as DATABASE mysql.test_cdc including table
 'tabl1','tbl2','tbl3';

リファレンス

Realtime Compute for Apache Flink は、CTAS 文だけでなく、データインジェスト YAML ファイルを使用した StarRocks へのデータ同期もサポートしています。詳細については、「データインジェスト」をご参照ください。