すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:CREATE TABLE AS ステートメントと CREATE DATABASE AS ステートメントを使用して、ApsaraDB RDS for MySQL インスタンスのデータを StarRocks クラスタに同期する

最終更新日:Jan 11, 2025

CREATE TABLE AS ステートメントを実行して、あるテーブルから別のテーブルにデータとテーブルスキーマの変更を同期できます。 CREATE DATABASE AS ステートメントを実行して、データベースのデータを同期したり、データベース内の複数のテーブルのデータとスキーマを同期したりできます。 このトピックでは、Realtime Compute for Apache Flink の CREATE TABLE AS ステートメントと CREATE DATABASE AS ステートメントを使用して、E-MapReduce(EMR)StarRocks のトランザクション処理(TP)データと分析処理(AP)データを同期する方法について説明します。

背景情報

CTAS ステートメントを使用して、MySQL データベースと同じスキーマを持つテーブルを StarRocks に自動的に作成し、データを同期できます。 また、アップストリームテーブルのスキーマの変更をダウンストリームテーブルにリアルタイムで同期することもできます。 これにより、デスティネーションストレージにテーブルを作成し、ソーステーブルのスキーマの変更を維持する効率が向上します。

CTAS ステートメントを実行すると、Flink は次の操作を実行します。

  1. デスティネーションストレージにデスティネーションテーブルが存在するかどうかを確認します。

    • デスティネーションテーブルが存在しない場合、デスティネーションテーブルが属するカタログに基づいて、対応するデスティネーションテーブルがデスティネーションストレージに作成されます。 デスティネーションテーブルのスキーマは、ソーステーブルと同じです。

    • デスティネーションテーブルが存在する場合、Flink はテーブル作成手順をスキップします。 既存のデスティネーションテーブルのスキーマがソーステーブルのスキーマと異なる場合は、エラーメッセージが返されます。

  2. データ同期ジョブをコミットして実行します。 Flink は、ソーステーブルのデータとスキーマの変更をデスティネーションテーブルに同期します。

スキーマ変更同期ポリシーは、CTAS ステートメントを使用して、データをリアルタイムで同期し、ソーステーブルのスキーマの変更をデスティネーションテーブルに同期します。

スキーマの変更には、テーブルの作成とテーブルの作成後のスキーマの変更が含まれます。

  • 次のスキーマの変更がサポートされています。

    • NULL 値を許容する列を追加する: ステートメントは、関連する列をデスティネーションテーブルのスキーマの末尾に自動的に追加し、追加された列にデータを同期します。

    • NULL 値を許容する列を削除する: ステートメントは、テーブルから列を削除する代わりに、デスティネーションテーブルの NULL 値を許容する列に NULL 値を自動的に設定します。

    • 列の名前を変更する: 列の名前を変更する操作には、列の追加と列の削除が含まれます。 ソーステーブルで列の名前が変更されると、新しい名前を使用する列がデスティネーションテーブルの末尾に追加され、元の名前を使用する列には NULL 値が設定されます。

      たとえば、ソーステーブルの col_a 列の名前が col_b に変更された場合、col_b 列がデスティネーションテーブルの末尾に追加され、col_a 列には NULL 値が設定されます。

  • 次のスキーマの変更はサポートされていません。

    • データ型を変更する。

      たとえば、列のデータが VARCHAR 型から BIGINT 型に変更された場合、または列のプロパティが NOT NULL から NULLABLE に変更された場合。

    • 主キーやインデックスなどの制約を変更する。

    • NULL 値を許容しない列を追加または削除する。

    • DDL ステートメントのフィールドの長さを調整する。

説明
  • ソーステーブルのスキーマに上記の変更のいずれかがある場合は、デスティネーションテーブルを削除し、CTAS ステートメントを実行するジョブを再起動する必要があります。 これにより、デスティネーションテーブルが再作成され、履歴データがデスティネーションテーブルに再同期されます。

  • CTAS ステートメントは DDL ステートメントのタイプを識別しませんが、スキーマが変更される前後の 2 つのデータレコード間のスキーマの違いを比較します。 したがって、列を削除してから再度追加し、列を削除および追加するために使用される 2 つの DDL ステートメントの間にデータが変更されない場合、CTAS ステートメントはスキーマの変更が発生していないと見なします。 同様に、ソーステーブルに列を追加した場合でも、CTAS ステートメントはスキーマ変更同期をトリガーしません。 ステートメントは、ソーステーブルのデータが変更された場合にのみスキーマの変更を識別します。 この場合、ステートメントはスキーマの変更をデスティネーションテーブルに同期します。

  • CTAS ステートメントでサポートされているフィールドタイプの詳細については、「Apache Flink から継続的にデータを読み込む」をご参照ください。

  • CTAS ステートメントを使用して複数の MySQL テーブルをマージする場合、システムは _db_name 列と _table_name 列を新しいテーブルに自動的に追加して、ソーステーブルに関する情報を追跡します。 新しいテーブルスキーマが期待どおりになるように、3 番目の列から列の順序を定義する必要があります。

前提条件

説明

このトピックでは、MySQL 5.7、EMR 3.39.1 の StarRocks クラスタ、および vvr-6.0.3-flink-1.15 のフルマネージド Flink を使用します。

制限事項

  • 作成するフルマネージド Flink ワークスペース、StarRocks クラスタ、および ApsaraDB RDS for MySQL インスタンスは、同じ仮想プライベートクラウド(VPC)内にある必要があります。

  • ApsaraDB RDS for MySQL インスタンスのエンジンバージョンは 5.7 以降である必要があります。

  • StarRocks クラスタでインターネットアクセスが有効になっている必要があります。

  • フルマネージド Flink は vvr-6.0.3-flink-1.15 以降である必要があります。

ステップ 1: テストデータを準備する

  1. テスト用のデータベースとアカウントを作成します。 詳細については、「ApsaraDB RDS for MySQL インスタンスのデータベースとアカウントを作成する」をご参照ください。

    データベースとアカウントを作成した後、テストアカウントに読み取りと書き込みの権限を付与します。

    説明

    このトピックでは、データベース名は test_cdc、アカウントのユーザー名は test です。

  2. 作成したテストアカウントを使用して、ApsaraDB RDS for MySQL インスタンスにログオンします。 詳細については、「DMS を使用して ApsaraDB RDS for MySQL インスタンスにログオンする」をご参照ください。

  3. ApsaraDB RDS for MySQL データベースで次のコマンドを実行して、データテーブルを作成します。

    use test_cdc;
    
    CREATE TABLE IF NOT EXISTS `runoob_tbl`(
       `runoob_id` INT UNSIGNED AUTO_INCREMENT,
       `runoob_title` VARCHAR(100) NOT NULL,
       `runoob_author` VARCHAR(40) NOT NULL,
       `submission_date` DATE,
       `add_col` int DEFAULT NULL,
       PRIMARY KEY ( `runoob_id` )
    )ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    
    INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22 17:13:44',3)
  4. SSH モードで StarRocks クラスタにログオンします。 詳細については、「クラスタにログオンする」をご参照ください。

  5. 次のコマンドを実行して、StarRocks クラスタに接続します。

    mysql -h127.0.0.1 -P 9030 -uroot
  6. 次のコマンドを実行して、ユーザーを作成し、そのユーザーに権限を付与します。

    CREATE DATABASE test_cdc;
    CREATE USER 'test' IDENTIFIED by '123456';
    GRANT CREATE TABLE ON DATABASE test_cdc TO test;

ステップ 2: フルマネージド Flink のコンソールで SQL クライアントを使用してカタログを作成する

フルマネージド Flink のコンソールのドラフトエディタページで、MySQL カタログと StarRocks カタログを作成します。 詳細については、「Flink SQL デプロイメントの概要」をご参照ください。

説明

パラメータの形式は参考用です。 業務要件に基づいてパラメータを設定できます。

  • MySQL カタログ

    • サンプルコード

      CREATE CATALOG mysql WITH (
        'type' = 'mysql',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'emr-test',
        'password' = '123456',
        'default-database' = 'test_cdc'
      );
    • パラメータ

      パラメータ

      説明

      type

      カタログのタイプ。 値を mysql に設定します。

      hostname

      ApsaraDB RDS for MySQL インスタンスの内部エンドポイント。 ApsaraDB RDS コンソールの ApsaraDB RDS for MySQL インスタンスのデータベース接続ページで内部エンドポイントをコピーできます(例: rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com)。

      port

      ApsaraDB RDS for MySQL データベースのポート番号。 デフォルト値: 3306。

      username

      ApsaraDB RDS for MySQL データベースへのアクセスに使用するユーザー名。

      このパラメータをステップ 1: テストデータを準備するで作成したアカウントのユーザー名に設定します。 この例では、test を使用します。

      password

      ApsaraDB RDS for MySQL データベースへのアクセスに使用するパスワード。

      このパラメータをステップ 1: テストデータを準備するで作成したアカウントのパスワードに設定します。

      default-database

      デフォルトの ApsaraDB RDS for MySQL データベースの名前。

      このパラメータをステップ 1: テストデータを準備するで作成したデータベースの名前に設定します。 この例では、test_cdc を使用します。

  • StarRocks カタログ

    • サンプルコード

      CREATE CATALOG sr  WITH (
        'type' = 'starrocks',
        'endpoint' = '172.16.**.**:9030',
        'username' = 'test',
        'password' = '123456',
        'dbname' = 'test_cdc'
      );
    • パラメータ

      パラメータ

      説明

      type

      カタログのタイプ。 値を starrocks に設定します。

      endpoint

      StarRocks クラスタのフロントエンドの IP アドレスとポート番号。

      username

      StarRocks データベースへのアクセスに使用するユーザー名。

      このパラメータをステップ 1: テストデータを準備するで作成したアカウントのユーザー名に設定します。 この例では、test を使用します。

      password

      StarRocks データベースへのアクセスに使用するパスワード。

      このパラメータをステップ 1: テストデータを準備するで作成したアカウントのパスワードに設定します。

      dbname

      StarRocks データベースの名前。

      このパラメータをステップ 1: テストデータを準備するで作成したデータベースの名前に設定します。 この例では、test_cdc を使用します。

ステップ 3: ドラフトを作成して公開する

  1. フルマネージド Flink のコンソールのドラフトエディタページで、CREATE TABLE AS ステートメントを記述します。

    次のいずれかの方法を使用して、CREATE TABLE AS ステートメントを実行できます。

    • 少なくとも 1 回のセマンティクス: sink.buffer-flush.interval-ms パラメータを設定して、StarRocks データベースにデータを書き込む時間間隔を指定できます。 利点は、書き込み間隔が短く、メモリ使用量が少なくて済むことです。

      /*
            少なくとも 1 回のセマンティクス
      */
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl_sr with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', /*
            エンジン、キー、バケットなどのテーブル作成時のサフィックス定義
      */
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
      'load-url'='172.16.**.**:18030',
      'table-name'='runoob_tbl_sr',
      'username'='test',
      'password' = '123456',
      'sink.buffer-flush.interval-ms' = '5000', /*
            データの書き込み間隔。ミリ秒単位。
      */
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
                                      
    • 正確に 1 回のセマンティクス: チェックポイントを定期的にスケジュールする間隔を指定する必要があります。 利点は、エラーが発生した場合にデータが失われたり重複したりしないことです。 欠点は、チェックポイント間隔によってデータが表示されるタイミングが決まることです。 詳細については、「チェックポイント」をご参照ください。

      /*
            正確に 1 回のセマンティクス
      */
      set 'execution.checkpointing.interval' = '1 min';
      set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
      set 'execution.checkpointing.timeout' = '10 min';
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl with (
      'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', /*
            エンジン、キー、バケットなどのテーブル作成時のサフィックス定義
      */
      'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
      'load-url'='172.16.**.**:18030',
      'table-name'='runoob_tbl',
      'username'='test',
      'password' = '123456',
      'sink.semantic' = 'exactly-once', /*
            正確に 1 回のセマンティクス
      */
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
                                      
    • シンプルモード: 利点は、テーブルを作成するときに ApsaraDB RDS for MySQL データベースのテーブルのフィールドをメモする必要がないことです。 作成するテーブルのスキーマは、ApsaraDB RDS for MySQL データベースのテーブルのスキーマと同じです。 このモードは、開発者にとって使いやすいです。 欠点は、パーティションを作成できないことです。 パーティション化する必要があるテーブルの場合は、通常モードでパーティションを作成する必要があります。

      /*
            上記の 2 つのメソッドのサンプルコードは、通常モードの例を示しています。 このサンプルコードは、シンプルモードの例を示しています。
      */
      
      use CATALOG sr;
      
      CREATE TABLE IF NOT EXISTS runoob_tbl1 with (
      'starrocks.create.table.properties'='buckets 8',
      'starrocks.create.table.mode'='simple', /*
            シンプルモード
      */
       'database-name'='test_cdc',
      'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
      'load-url'='172.16.**.**:18030',
      'table-name'='runoob_tbl_sr',
      'username'='test',
      'password' = '123456',
      'sink.buffer-flush.interval-ms' = '5000', /*
            データの書き込み間隔。ミリ秒単位。
      */
      'sink.properties.row_delimiter' = '\x02',
      'sink.properties.column_separator' = '\x01'
      )
       as table mysql.test_cdc.runoob_tbl  /*+ OPTIONS (   'connector' = 'mysql-cdc',
        'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'emr-test',
        'password' = '123456',
        'database-name' = 'test_cdc',
        'table-name' = 'runoob_tbl'  )*/;
                                      

    表 1. WITH 句のパラメータ

    パラメータ

    必須

    説明

    starrocks.create.table.properties

    はい

    サンプルコードの engine、key、buckets など、StarRocks データベースにテーブルを作成するために使用されるステートメントのフィールド定義以外のその他のサフィックス定義。

    database-name

    はい

    StarRocks データベースの名前。

    この例では、test_cdc を使用します。

    jdbc-url

    はい

    StarRocks に接続して StarRocks でクエリを実行するために使用される Java Database Connectivity(JDBC)URL。

    たとえば、URL は jdbc:mysql://172.16.**.**:9030 にすることができます。 URL の 172.16.**.** は、StarRocks クラスタの内部 IP アドレスです。

    load-url

    はい

    フロントエンドの IP アドレスと HTTP ポート。 StarRocks クラスタの内部 IP アドレス:8030 の形式。 この例では、ポート 8030 を使用します。 クラスタのバージョンに基づいてポートを選択します。

    • 18030: EMR V5.9.0 以後のマイナーバージョンのクラスタ、および EMR V3.43.0 以後のマイナーバージョンのクラスタの場合は、このポートを選択します。

    • 8030: EMR V5.8.0、EMR V3.42.0、または EMR V5.8.0 または EMR V3.42.0 より前のマイナーバージョンのクラスタの場合は、このポートを選択します。

    説明

    詳細については、「UI とポートにアクセスする」をご参照ください。

    sink.semantic

    いいえ

    ステートメントを実行するために使用されるセマンティクス。 データの整合性を確保するために、このパラメータを exactly-once に設定します。 デフォルト値: at-least-once。

    starrocks.create.table.mode

    いいえ

    有効な値:

    • normal: デフォルト値。 サンプルコードの starrocks.create.table.properties パラメータに、engine、key、buckets などの完全な設定を入力する必要があります。

    • simple: デフォルトでは、engine パラメータは olap に設定され、key パラメータは primary key に設定されます。 主キーは、ApsaraDB RDS for MySQL テーブルの主キーと同じです。 distributed by hash パラメータはすべての主キーに対して設定され、パーティションは存在しません。 starrocks.create.table.properties パラメータの buckets パラメータは必須です。 properties などのその他のパラメータはオプションです。

    説明
    • vvr-6.0.5-flink-1.15 より前の Flink のバージョンを使用する場合は、'sink.use.new-apiapi'='false', を WITH 句に追加する必要があります。

    • その他の設定の詳細については、「Apache Flink から継続的にデータを読み込む」をご参照ください。

    表 2. OPTIONS 句のパラメータ

    パラメータ

    説明

    connector

    コネクタのタイプ。 値を mysql-cdc に設定します。

    hostname

    ApsaraDB RDS for MySQL インスタンスの内部エンドポイント。

    ApsaraDB RDS コンソールの ApsaraDB RDS for MySQL インスタンスのデータベース接続ページで内部エンドポイントをコピーできます(例: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com)。

    port

    ApsaraDB RDS for MySQL データベースのポート番号。 デフォルト値: 3306。

    username

    ApsaraDB RDS for MySQL データベースへのアクセスに使用するユーザー名。

    このパラメータをステップ 1: テストデータを準備するで作成したアカウントのユーザー名に設定します。 この例では、test を使用します。

    password

    ApsaraDB RDS for MySQL データベースへのアクセスに使用するパスワード。

    このパラメータをステップ 1: テストデータを準備するで作成したアカウントのパスワードに設定します。

    table-name

    StarRocks データベースのテーブルの名前。

    このパラメータをステップ 1: テストデータを準備するで作成したテーブルの名前に設定します。 この例では、runoob_tbl を使用します。

    database-name

    デフォルトの ApsaraDB RDS for MySQL データベースの名前。

    このパラメータをステップ 1: テストデータを準備するで作成したデータベースの名前に設定します。 この例では、test_cdc を使用します。

  2. ドラフトエディタページの詳細設定タブで、エンジンバージョンを vvr-6.0.3-flink-1.15 以降に設定します。

  3. ドラフトエディタページの右上隅にある[公開]をクリックします。

  4. フルマネージド Flink のコンソールのデプロイメントページで、開始するデプロイメントを見つけ、アクション列の[開始]をクリックします。

ステップ 4: さまざまなシナリオでの手順を実演する

データのクエリ

  1. SSH モードで StarRocks クラスタにログオンします。 詳細については、「クラスタにログオンする」をご参照ください。

  2. 次のコマンドを実行して、StarRocks クラスタに接続します。

    mysql -h127.0.0.1 -P 9030 -uroot
  3. StarRocks クラスタの CLI で、次のステートメントを実行してテーブルデータをクエリします。

    use test_cdc;
    select * from runoob_tbl1;

    次の出力が返されます。 これは、データが ApsaraDB RDS for MySQL インスタンスから StarRocks クラスタに同期されていることを示しています。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

挿入されたデータのクエリ

  1. ApsaraDB RDS for MySQL データベースの SQL コンソールタブで、次のステートメントを実行してデータを挿入します。

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`)  values(1,'second','tom2','2022-06-23',1)
  2. StarRocks クラスタの CLI で、次のステートメントを実行してテーブルデータをクエリします。

    select * from runoob_tbl1;

    次の出力が返されます。 これは、データが挿入されていることを示しています。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | first        | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

更新されたデータの同期

  1. ApsaraDB RDS for MySQL データベースの SQL コンソールタブで、次のステートメントを実行して特定のデータを更新します。

    update runoob_tbl set runoob_title= 'new' where runoob_id = 18
  2. StarRocks クラスタの CLI で、次のステートメントを実行してテーブルデータをクエリします。

    select * from runoob_tbl1;

    次の出力が返されます。 これは、更新されたデータが同期されていることを示しています。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |         1 | second       | tom2          | 2022-06-23      |       1 |
    |        18 | new          | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

削除されたデータの同期

  1. ApsaraDB RDS for MySQL データベースの SQL コンソールタブで、次のステートメントを実行して特定のデータを削除します。

    DELETE FROM runoob_tbl WHERE runoob_id = 1
  2. StarRocks クラスタの CLI で、次のステートメントを実行してテーブルデータをクエリします。

    select * from runoob_tbl1;

    次の出力が返されます。 これは、削除されたデータが同期されていることを示しています。

    +-----------+--------------+---------------+-----------------+---------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col |
    +-----------+--------------+---------------+-----------------+---------+
    |        18 | new          | tom           | 2022-06-22      |       3 |
    +-----------+--------------+---------------+-----------------+---------+

NULL 値を許容する列を追加する

  1. ApsaraDB RDS for MySQL データベースの SQL コンソールタブで、次のステートメントを実行して NULL 値を許容する列を追加します。

    alter table `runoob_tbl` add COLUMN `add_col2` INT;
  2. 次のステートメントを実行して、テーブルにデータを挿入します。

    INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`)  values(1,'second','tom2','2022-06-23',1,2)
  3. StarRocks クラスタの CLI で、次のステートメントを実行してテーブルデータをクエリします。

    select * from runoob_tbl1;

    次の出力が返されます。 これは、スキーマが変更され、NULL 値を許容する列が追加されていることを示しています。

    +-----------+--------------+---------------+-----------------+---------+----------+
    | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 |
    +-----------+--------------+---------------+-----------------+---------+----------+
    |        18 | new          | tom           | 2022-06-22      |       3 |     NULL |
    |         1 | second       | tom2          | 2022-06-23      |       1 |        2 |
    +-----------+--------------+---------------+-----------------+---------+----------+

CREATE DATABASE AS

CREATE DATABASE AS ステートメントは、CREATE TABLE AS ステートメントの糖衣構文です。 CREATE DATABASE AS ステートメントを使用して、ApsaraDB RDS for MySQL データベースのデータを同期できます。 データ同期中に、Flink デプロイメントが生成されます。 ソースは ApsaraDB RDS for MySQL データベースであり、デスティネーションテーブルは StarRocks データベースのテーブルです。 CREATE DATABASE AS ステートメントを実行するときに、INCLUDING TABLE 構文を使用して、データベース内の特定のテーブルのみを選択することもできます。

CREATE TABLE AS ステートメントの実行と同様に、CREATE DATABASE AS ステートメントを実行する前に、ApsaraDB RDS for MySQL データベースと StarRocks データベースにカタログを作成する必要があります。 サンプルステートメント:

CREATE DATABASE IF NOT EXISTS sr_db with (
  'starrocks.create.table.properties'=' buckets 8', /*
      テーブル作成時のサフィックス定義。例: buckets 8。
*/
  'starrocks.create.table.mode'='simple', /*
      シンプルモード。
*/
  'jdbc-url'='jdbc:mysql://172.16.**.**:9030',
  'load-url'='172.16.**.**:18030',
  'username'='test',
  'password' = '123456',
  'sink.buffer-flush.interval-ms' = '5000' , /* データの書き込み間隔。ミリ秒単位。 */
  'sink.properties.row_delimiter' = '\x02',
  'sink.properties.column_separator' = '\x01'
)
as DATABASE mysql.test_cdc including table  'tabl1','tbl2','tbl3' /*
      同期するテーブルを指定します。
*/
/*+ OPTIONS (   'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc' )*/;