すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Dataflow クラスターの Flink サービスを使用して、CTAS 文で MySQL から StarRocks にデータを同期する

最終更新日:Oct 18, 2025

このトピックでは、EMR Dataflow クラスターの Flink サービスを使用して、CREATE TABLE AS (CTAS) 文で MySQL から EMR Serverless StarRocks にデータを同期する方法について説明します。

背景情報

CTAS または CREATE DATABASE AS (CDAS) 文を使用して、MySQL から EMR Serverless StarRocks にデータを同期できます。CTAS 文は単一テーブルのスキーマとデータを同期でき、CDAS 文はデータベース全体または同じデータベース内の複数テーブルのスキーマとデータを同期できます。このトピックでは CTAS 文を使用します。CDAS 文は CTAS 文と同様の方法で使用されます。詳細については、「CDAS の概要」をご参照ください。

CREATE TABLE AS (CTAS) 文を使用すると、MySQL のテーブルと同じスキーマを持つテーブルを StarRocks に自動的に作成し、データを同期できます。また、ソーステーブルのスキーマ変更をリアルタイムで宛先テーブルに同期することもでき、宛先ストレージでのテーブル作成とソーステーブルのスキーマ変更の維持の効率が向上します。

CTAS 文を実行すると、Flink は次の操作を実行します。

  1. 宛先ストレージに宛先テーブルが存在するかどうかを確認します。

    • 宛先テーブルが存在しない場合、宛先テーブルが属するカタログに基づいて、対応する宛先テーブルが宛先ストレージに作成されます。宛先テーブルはソーステーブルと同じスキーマを持ちます。

    • 宛先テーブルが存在する場合、Flink はテーブル作成ステップをスキップします。既存の宛先テーブルのスキーマがソーステーブルのスキーマと異なる場合、エラーメッセージが返されます。

  2. データ同期ジョブをコミットして実行します。Flink は、ソーステーブルのデータとスキーマの変更を宛先テーブルに同期します。

スキーマ変更同期ポリシーは、CTAS 文を使用して、リアルタイムでデータを同期し、ソーステーブルのスキーマの変更を宛先テーブルに同期します。

スキーマの変更には、テーブルの作成とテーブル作成後のスキーマの変更が含まれます。

  • 次のスキーマ変更がサポートされています。

    • NULL 値を許容する列の追加: 文は、関連する列を宛先テーブルのスキーマの末尾に自動的に追加し、追加された列にデータを同期します。

    • NULL 値を許容する列の削除: 文は、テーブルから列を削除する代わりに、宛先テーブルの NULL 値を許容する列を自動的に NULL 値で埋めます。

    • 列の名前変更: 文は、名前が変更された列を宛先テーブルの末尾に追加し、名前変更前の列を NULL 値で埋めます。

      たとえば、ソーステーブルの col_a 列の名前が col_b に変更された場合、col_b 列が宛先テーブルの末尾に追加され、col_a 列は自動的に NULL 値で埋められます。

  • 次のスキーマ変更はサポートされていません。

    • データの型の変更。

      たとえば、列のデータが VARCHAR 型から BIGINT 型に変更されたり、列のプロパティが NOT NULL から NULLABLE に変更されたりする場合です。

    • プライマリキーやインデックスなどの制約の変更。

    • NULL 値を許容しない列の追加または削除。

説明
  • ソーステーブルのスキーマに前述の変更のいずれかがある場合は、宛先テーブルを削除し、CTAS 文を実行するジョブを再起動する必要があります。これにより、宛先テーブルが再作成され、既存データが宛先テーブルに再同期されます。
  • CTAS 文は DDL 文の種類を識別しませんが、スキーマが変更される前後の 2 つのデータレコード間のスキーマの違いを比較します。したがって、列を削除してから再度追加し、列を削除および追加するために使用される 2 つの DDL 文の間にデータ変更がない場合、CTAS 文はスキーマ変更が発生しなかったと見なします。同様に、ソーステーブルに列を追加しても、CTAS 文はスキーマ変更の同期をトリガーしません。この文は、ソーステーブルのデータが変更された場合にのみスキーマの変更を識別します。この場合、文はスキーマの変更を宛先テーブルに同期します。
  • CTAS 文でサポートされているフィールドタイプについての詳細については、「Apache Flink® からの継続的なデータロード」をご参照ください。

前提条件

説明

このトピックでは、MySQL 5.7 と EMR-3.42.0 の Dataflow クラスターを例として使用します。

制限事項

  • Dataflow クラスター、StarRocks インスタンス、および ApsaraDB RDS for MySQL インスタンスは、同じ VPC (VPC) にデプロイする必要があります。

  • Dataflow クラスターと StarRocks インスタンスは、インターネット経由でアクセスできる必要があります。

  • ApsaraDB RDS for MySQL インスタンスのエンジンバージョンは 5.7 以降である必要があります。

  • Dataflow クラスターは EMR-3.42.0 以降または EMR-5.8.0 以降である必要があります。

ステップ 1: テストデータの準備

  1. テストデータベースとテストアカウントを作成します。詳細については、「ステップ 1: ApsaraDB RDS for MySQL インスタンスの作成とデータベースの構成」をご参照ください。

    テストデータベースとアカウントを作成した後、アカウントに読み取りおよび書き込み権限を付与します。

    説明

    このトピックでは、データベース名は test_cdc、アカウント名は emr_test です。

  2. テストアカウントを使用して ApsaraDB RDS for MySQL インスタンスに接続します。詳細については、「ステップ 2: ApsaraDB RDS for MySQL インスタンスへの接続」をご参照ください。

  3. 次のコマンドを実行してテーブルを作成します。

    use test_cdc;
    
    CREATE TABLE IF NOT EXISTS `runoob_tbl`(
       `runoob_id` INT UNSIGNED AUTO_INCREMENT,
       `runoob_title` VARCHAR(100) NOT NULL,
       `runoob_author` VARCHAR(40) NOT NULL,
       `submission_date` DATE,
       `add_col` int DEFAULT NULL,
       PRIMARY KEY ( `runoob_id` )
    )ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    
    INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22 17:13:44',3)
  4. EMR Serverless StarRocks インスタンスにログインして接続します。詳細については、「MySQL クライアントを使用して StarRocks インスタンスに接続する」をご参照ください。

  5. 次のコマンドを実行して、test_cdc データベースを作成し、test という名前のスーパー管理者ユーザー (パスワードの例: 1qaz!QAZ) を作成するか、test という名前の一般ユーザーを作成して、そのユーザーにデータベースの権限を付与します。詳細については、「ユーザーの管理」をご参照ください。

    CREATE DATABASE test_cdc;
    CREATE USER 'test' IDENTIFIED by '1qaz!QAZ';
    GRANT ALL on test_cdc to test;

ステップ 2: カスタムコネクタのアップロード

Flink、StarRocks、および ApsaraDB RDS for MySQL 接続用のカスタムコネクタをアップロードします。

  1. SSH を使用して Dataflow クラスターにログインします。詳細については、「クラスターへのログイン」をご参照ください。

  2. flink-connector-starrocks-1.2.2_flink-1.13_2.11.jarververica-connector-mysql-1.13-vvr-4.0.12-1-20220330.065158-3-jar-with-dependencies.jar をダウンロードし、Dataflow クラスターの /opt/apps/FLINK/flink-current/lib ディレクトリにアップロードします。

ステップ 3: CTAS 文の実行

  1. セッションモードでジョブを送信します。

    1. SSH を使用して Dataflow クラスターにログインします。詳細については、「クラスターへのログイン」をご参照ください。

    2. 次のコマンドを実行して /opt/apps/FLINK/flink-current ディレクトリに移動します。

      cd /opt/apps/FLINK/flink-current
    3. 次のコマンドを実行して YARN セッションを開始します。

      ./bin/yarn-session.sh --detached

      コマンドが正常に実行されると、出力に application_XXXX_YY が返されます。これは、SQL クライアントにログインするために必要な sessionId です。sessionid

    4. 次のコマンドを実行して SQL クライアントを開きます。

      ./bin/sql-client.sh -s <application_XXXX_YY>
      説明

      <application_XXXX_YY> を前のステップで取得した sessionId に置き換えてください。

  2. MySQL と StarRocks のカタログを作成します。

    CREATE CATALOG sr WITH (
      'type' = 'starrocks',
      'endpoint' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'username' = 'test',
      'password' = '1qaz!QAZ',
      'dbname' = 'test_cdc'
    );
    
    CREATE CATALOG mysql WITH (
      'type' = 'mysql',
      'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
      'port' = '3306',
      'username' = 'emr_test',
      'password' = '123456',
      'default-database' = 'test_cdc'
    );

    次の表にパラメーターを説明します。ビジネス要件に基づいてパラメーターを変更できます。

    表 1. StarRocks カタログパラメーター

    パラメーター

    説明

    type

    カタログのタイプ。値を starrocks に設定します。

    endpoint

    FE ノードの内部エンドポイントとクエリポート。フォーマットは EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイント:9030 です。例: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。

    説明

    EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイントを取得する方法については、「インスタンスリストと詳細の表示」をご参照ください。

    username

    StarRocks データベースへのアクセスに使用するユーザー名。

    ステップ 1: テストデータの準備」で作成したユーザー名を入力します。この例では、test を使用します。

    password

    StarRocks データベースへのアクセスに使用するパスワード。

    ステップ 1: テストデータの準備」でアカウントに設定したパスワードを入力します。この例では、1qaz!QAZ を使用します。

    dbname

    StarRocks データベースの名前。

    ステップ 1: テストデータの準備」で作成したデータベース名を入力します。この例では、test_cdc を使用します。

    表 2. MySQL カタログパラメーター

    パラメーター

    説明

    type

    カタログのタイプ。値を mysql に設定します。

    hostname

    ApsaraDB RDS for MySQL インスタンスの内部エンドポイント。

    ApsaraDB RDS コンソールの ApsaraDB RDS for MySQL インスタンスの [データベース接続] ページで内部エンドポイントをコピーできます。例: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。

    port

    MySQL データベースのポート番号。デフォルト値: 3306。

    username

    MySQL データベースへのアクセスに使用するユーザー名。

    ステップ 1: テストデータの準備」で作成したアカウントのユーザー名を入力します。この例では、emr_test を使用します。

    password

    MySQL データベースへのアクセスに使用するパスワード。

    ステップ 1: テストデータの準備」で作成したアカウントのパスワードを入力します。この例では、123456 を使用します。

    default-database

    デフォルトの MySQL データベースの名前。

    ステップ 1: テストデータの準備」で作成したデータベース名を入力します。この例では、test_cdc を使用します。

  3. StarRocks カタログで CTAS 文を実行します。

    次の 3 つの方法のいずれかを使用して CTAS 文を実行できます。

    • At-least-once セマンティクス: sink.buffer-flush.interval-ms パラメーターを使用して、データが StarRocks に書き込まれる間隔を構成できます。利点は、書き込み間隔が短く、使用されるメモリが少ないことです。

      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl_sr',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
    • Exactly-once セマンティクス: チェックポイントを定期的にスケジュールする間隔を指定する必要があります。利点は、エラーが発生したときにデータが失われたり重複したりしないことです。欠点は、チェックポイントの間隔によってデータがいつ表示されるかが決まることです。詳細については、「チェックポイント」をご参照ください。

      set 'execution.checkpointing.interval' = '1 min';
      set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
      set 'execution.checkpointing.timeout' = '10 min';
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8',
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.semantic' = 'exactly-once',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS ( 'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
                                      
    • シンプルモード: 利点は、テーブルを作成するときに MySQL データベースのテーブルのフィールドを気にする必要がないことです。作成するテーブルのスキーマは、MySQL データベースのテーブルのスキーマと同じです。このモードは開発者にとって使いやすいです。欠点は、パーティションを作成できないことです。パーティション分割する必要があるテーブルの場合は、通常モードでパーティションを作成する必要があります。

      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
      'starrocks.create.table.properties'='buckets 8',
      'starrocks.create.table.mode'='simple',
       'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
      'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
      'table-name'='runoob_tbl_sr',
      'username'='test',
      'password' = '1qaz!QAZ',
      'sink.buffer-flush.interval-ms' = '5000',
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'emr_test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
                                      

      表 3. WITH パラメーター

      パラメーター

      必須

      説明

      starrocks.create.table.properties

      はい

      StarRocks データベースでテーブルを作成するために使用される文のフィールド定義を除くその他のサフィックス定義 (サンプルコードの engine、key、buckets など)。

      database-name

      はい

      StarRocks データベースの名前。

      この例では、test_cdc を使用します。

      jdbc-url

      はい

      StarRocks に接続して StarRocks でクエリを実行するために使用される JDBC URL。

      例: jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com は、EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイントです。

      説明

      EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイントを取得する方法については、「インスタンスリストと詳細の表示」をご参照ください。

      load-url

      はい

      FE ノードの内部エンドポイントとクエリポート。フォーマットは EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイント:8030 です。

      例: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030。

      説明

      EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイントを取得する方法については、「インスタンスリストと詳細の表示」をご参照ください。

      sink.semantic

      いいえ

      文の実行に使用されるセマンティクス。データ整合性を確保するには、このパラメーターを exactly-once に設定します。デフォルト値: at-least-once。

      starrocks.create.table.mode

      いいえ

      有効な値:

      • normal (デフォルト): 例に示すように、starrocks.create.table.properties パラメーターで engine、key、buckets などの完全な構成を指定する必要があります。

      • simple: デフォルトでは、engine パラメーターは olap に設定され、key パラメーターは primary key に設定されます。プライマリキーは MySQL テーブルのプライマリキーと同じです。distributed by hash パラメーターはすべてのプライマリキーに対して構成され、パーティションは存在しません。starrocks.create.table.properties パラメーターで buckets を指定する必要があります。properties などのオプションのパラメーターも指定できます。

      sink.properties.row_delimiter

      いいえ

      カスタムの行区切り文字。

      sink.properties.column_separator

      いいえ

      カスタムの列区切り文字。

      説明
      • vvr-6.0.5-flink-1.15 より前のバージョンの Flink を使用する場合は、WITH 句に 'sink.use.new-apiapi'='false', を追加する必要があります。

      • その他の構成の詳細については、「Apache Flink からデータを継続的にロードする」をご参照ください。

      表 4. OPTIONS パラメーター

      パラメーター

      説明

      connector

      コネクタのタイプ。値を mysql-cdc に設定します。

      hostname

      ApsaraDB RDS for MySQL インスタンスの内部エンドポイント。

      ApsaraDB RDS コンソールの ApsaraDB RDS for MySQL インスタンスの [データベース接続] ページで内部エンドポイントをコピーできます。例: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。

      port

      MySQL データベースのポート番号。デフォルト値: 3306。

      username

      MySQL データベースへのアクセスに使用するユーザー名。

      ステップ 1: テストデータの準備」で作成したアカウントのユーザー名を入力します。この例では、emr_test を使用します。

      password

      MySQL データベースへのアクセスに使用するパスワード。

      ステップ 1: テストデータの準備」で作成したアカウントのパスワードを入力します。

      table-name

      StarRocks データベースのテーブルの名前。

      ステップ 1: テストデータの準備」で作成したテーブル名を入力します。この例では、runoob_tbl を使用します。

      database-name

      デフォルトの MySQL データベースの名前。

      ステップ 1: テストデータの準備」で作成したデータベース名を入力します。この例では、test_cdc を使用します。

ステップ 4: データ同期結果の表示

説明

チェックポイントが有効になっている場合、最大待機時間はチェックポイントの間隔とほぼ同じです。

データのクエリ

  1. EMR Serverless StarRocks インスタンスにログインして接続します。詳細については、「MySQL クライアントを使用して StarRocks インスタンスに接続する」をご参照ください。

  2. StarRocks 接続ウィンドウで次のコマンドを実行して、テーブルデータを表示します。

    use test_cdc;
    select * from runoob_tbl1;

    次の出力が返されます。これは、データが ApsaraDB RDS for MySQL インスタンスから StarRocks クラスターに同期されたことを示します。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

挿入されたデータのクエリ

  1. ApsaraDB RDS for MySQL データベースウィンドウで次のコマンドを実行してデータを挿入します。

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`)  values(1,'second','tom2','2022-06-23',1);
  2. StarRocks 接続ウィンドウで次のコマンドを実行して、テーブルデータを表示します。

    select * from runoob_tbl1;

    次の出力が返されます。これは、データが挿入されたことを示します。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

更新されたデータの同期

  1. ApsaraDB RDS for MySQL データベースウィンドウで次のコマンドを実行して、指定したデータを更新します。

    update runoob_tbl set runoob_title= 'new' where runoob_id = 18;
  2. StarRocks 接続ウィンドウで次のコマンドを実行して、テーブルデータを表示します。

    select * from runoob_tbl1;

    次の出力が返されます。これは、更新されたデータが同期されたことを示します。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | new          | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

削除されたデータの同期

  1. ApsaraDB RDS for MySQL データベースウィンドウで次のコマンドを実行して、指定したデータを削除します。

    DELETE FROM runoob_tbl WHERE runoob_id = 1;
  2. StarRocks 接続ウィンドウで次のコマンドを実行して、テーブルデータを表示します。

    select * from runoob_tbl1;

    次の出力が返されます。これは、削除されたデータが同期されたことを示します。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | new          | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

NULL 値を許容する列の追加

  1. ApsaraDB RDS for MySQL データベースウィンドウで次のコマンドを実行して、NULL 値を許容する列を追加します。

    alter table `runoob_tbl` add COLUMN `add_col2` INT;
  2. 次のコマンドを実行してデータを挿入します。

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`)  values(1,'second','tom2','2022-06-23',1,2)
  3. StarRocks 接続ウィンドウで次のコマンドを実行して、テーブルデータを表示します。

    select * from runoob_tbl1;

    次の出力が返されます。これは、スキーマが変更され、NULL 値を許容する列が追加されたことを示します。

    +-----------+--------------+---------------+-----------------+---------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_co2 |
    +-----------+--------------+---------------+-----------------+---------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |       2 |
    |        18 | new          | tom           | 2022-06-22      |       3 |    NULL |
    +-----------+--------------+---------------+-----------------+---------+---------+

CDAS の概要

CREATE DATABASE AS (CDAS) 文は、CTAS 文の糖衣構文です。CDAS 文を使用すると、MySQL からデータベース全体を同期できます。つまり、Flink ジョブが生成されます。ソーステーブルは MySQL のデータベースであり、宛先テーブルは StarRocks の複数のテーブルです。また、including table 構文を使用して、CDAS 操作のためにデータベース内の一部のテーブルのみを選択することもできます。

CTAS 文の実行と同様に、CDAS 文を実行する前に、MySQL データベースと StarRocks データベースにカタログを作成する必要があります。サンプル文:

CREATE DATABASE IF NOT EXISTS sr_db with (
'starrocks.create.table.properties'=' buckets 8',
'starrocks.create.table.mode'='simple',
'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
'username'='test',
'password' = '1qaz!QAZ',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
 as DATABASEmysql.test_cdc including table
 'tabl1','tbl2','tbl3'   /*+ OPTIONS (   'connector' = 'mysql-cdc',
  'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port' = '3306',
  'username' = 'test',
  'password' = '123456',
  'database-name' = 'test_cdc' )*/;