すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Use the CTAS and CDAS statements of Realtime Compute for Apache Flink to synchronize data from an ApsaraDB RDS for MySQL instance to a StarRocks cluster

最終更新日:Mar 26, 2026

CREATE TABLE AS (CTAS) ステートメントは、StarRocks に一致するテーブルを自動的に作成し、MySQL ソースからデータとスキーマ変更を継続的に同期します。CREATE DATABASE AS (CDAS) ステートメントは、これをデータベース全体に拡張します。このトピックでは、Realtime Compute for Apache Flink で両方のステートメントを使用して、ApsaraDB RDS for MySQL から E-MapReduce (EMR) StarRocks クラスターにトランザクション処理 (TP) データを分析処理 (AP) のために移動する方法について説明します。

アーキテクチャの概要

コンポーネントロール
ApsaraDB RDS for MySQLソース — 変更データキャプチャ (CDC) イベントとスキーマ変更の発生元
Realtime Compute for Apache Flink処理エンジン — MySQL CDC を読み取り、StarRocks に書き込み
EMR StarRocks クラスターシンク — 同期されたデータの分析送信先

仕組み

CTAS ステートメントを実行すると、Flink は次の 2 つの操作を順次実行します。

  1. テーブル作成チェック — Flink は、StarRocks に送信先テーブルが存在するかどうかを確認します。

    • 存在しない場合、Flink は送信先カタログに基づいて、ソースと同じスキーマを持つテーブルを作成します。

    • 既に存在する場合、Flink は作成をスキップします。既存のスキーマがソーススキーマと異なる場合、エラーが返されます。

  2. データ同期 — Flink は、ソーステーブルから送信先テーブルへのデータとスキーマ変更の両方を同期する継続的なジョブを開始します。

スキーマ変更の動作

CTAS は、スキーマ変更を伝播するための固定ポリシーを使用します。次の表は、自動的に伝播されるものとされないものをまとめたものです。

サポートされているスキーマ変更

変更タイプStarRocks での動作
NULL 許容列の追加列は送信先テーブルの末尾に追加され、入力データが列を埋めます。
NULL 許容列の削除列は StarRocks に保持されますが、NULL 値で埋められます。
列名の変更新しい名前の列が末尾に追加され、古い名前の列は NULL 値で埋められます。たとえば、col_acol_b に変更すると、col_b が末尾に追加され、col_aNULL に設定されます。

サポートされていないスキーマ変更

ソーステーブルで次のいずれかの変更が発生した場合、StarRocks の送信先テーブルを削除し、CTAS ジョブを再起動します。Flink はテーブルを再作成し、すべての既存データを再同期します。

  • 列のデータの型を変更する (例: VARCHAR から BIGINT、または NOT NULL から NULLABLE)

  • プライマリキーやインデックスなどの制約を変更する

  • NULL 不可列を追加または削除する

  • DDL (データ定義言語) ステートメントでフィールド長を調整する

CTAS は、連続するデータレコードのスキーマを比較することでスキーマ変更を検出します。DDL ステートメントのタイプは解析しません。結果として、次のようになります。
列が削除され、その間にデータ変更なしで再追加された場合、CTAS はこれをスキーマ変更なしとして扱います。
スキーマ変更は、変更後に新しいデータがソーステーブルに到着した場合にのみ伝播されます。
フィールドタイプのマッピングについては、「Apache Flink からデータを継続的にロードする」をご参照ください。
CTAS が複数の MySQL テーブルをマージする場合、Flink はソースを追跡するために、送信先テーブルに _db_name および _table_name 列を自動的に先頭に追加します。3 番目の列から独自の列順序を定義します。

前提条件

開始する前に、次のものがあることを確認してください。

このトピックの例では、MySQL 5.7、EMR 3.39.1、および Flink 完全管理 vvr-6.0.3-flink-1.15 を使用しています。

制限事項

  • Flink ワークスペース、StarRocks クラスター、および ApsaraDB RDS for MySQL インスタンスは、同じ VPC 内にある必要があります。

  • ApsaraDB RDS for MySQL エンジンバージョンは 5.7 以降である必要があります。

  • StarRocks クラスターのインターネットアクセスは有効にする必要があります。

  • Flink 完全管理は vvr-6.0.3-flink-1.15 以降である必要があります。

ステップ 1: テストデータの準備

  1. ApsaraDB RDS for MySQL インスタンスにデータベースとアカウントを作成します。詳細については、「ApsaraDB RDS for MySQL インスタンスのデータベースとアカウントを作成」をご参照ください。テストアカウントに読み取り/書き込み権限を付与します。

    このトピックでは、test_cdc という名前のデータベースと test という名前のアカウントを使用します。
  2. テストアカウントを使用して ApsaraDB RDS for MySQL インスタンスにログインします。詳細については、「DMS を使用して ApsaraDB RDS for MySQL インスタンスにログイン」をご参照ください。

  3. テストテーブルを作成し、行を挿入します。

    USE test_cdc;
    
    CREATE TABLE IF NOT EXISTS `runoob_tbl` (
      `runoob_id`      INT UNSIGNED AUTO_INCREMENT,
      `runoob_title`   VARCHAR(100) NOT NULL,
      `runoob_author`  VARCHAR(40)  NOT NULL,
      `submission_date` DATE,
      `add_col`        INT DEFAULT NULL,
      PRIMARY KEY (`runoob_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    INSERT INTO test_cdc.`runoob_tbl`
      (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`)
    VALUES (18, 'first', 'tom', '2022-06-22', 3);
  4. SSH 経由で StarRocks クラスターにログインします。詳細については、「クラスターにログイン」をご参照ください。

  5. StarRocks に接続します。

    mysql -h127.0.0.1 -P 9030 -uroot
  6. このチュートリアルに必要なユーザーを作成し、権限を付与します。

    CREATE DATABASE test_cdc;
    CREATE USER 'test' IDENTIFIED BY '123456';
    GRANT CREATE TABLE ON DATABASE test_cdc TO test;

ステップ 2: Flink SQL エディターでカタログを作成

Flink 完全管理コンソールの [ドラフトエディター] ページで、MySQL 用と StarRocks 用のカタログをそれぞれ 1 つ作成します。「Flink SQL デプロイメントの開始」をご参照ください。

以下のパラメーター値は例です。ご利用の環境に合わせて調整してください。

MySQL カタログ

CREATE CATALOG mysql WITH (
  'type'             = 'mysql',
  'hostname'         = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port'             = '3306',
  'username'         = 'test',
  'password'         = '123456',
  'default-database' = 'test_cdc'
);
パラメーター説明
typeカタログタイプ。mysql に設定します。
hostnameApsaraDB RDS for MySQL インスタンスの内部エンドポイントです。ApsaraDB RDS コンソールの [データベース接続] ページからコピーしてください(例: rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com)。
portMySQL データベースのポート。デフォルト: 3306
usernameステップ 1: テストデータの準備で作成したユーザー名。この例では、test です。
passwordステップ 1: テストデータの準備で作成したユーザー名のパスワード。
default-databaseステップ 1: テストデータの準備で作成したデータベース名。この例では、test_cdc です。

StarRocks カタログ

CREATE CATALOG sr WITH (
  'type'     = 'starrocks',
  'endpoint' = '172.16.**.**:9030',
  'username' = 'test',
  'password' = '123456',
  'dbname'   = 'test_cdc'
);
パラメーター説明
typeカタログタイプ。starrocks に設定します。
endpointStarRocks フロントエンドの IP アドレスとポート (例: 172.16.**.**:9030)。
usernameステップ 1: テストデータの準備で作成したユーザー名。この例では、test です。
passwordステップ 1: テストデータの準備で作成したユーザー名のパスワード。
dbnameStarRocks データベース名。この例では、test_cdc です。

ステップ 3: CTAS デプロイメントの記述と公開

[ドラフトエディター] ページで、CTAS 文を記述します。配信モードは 3 つあり、一貫性の要件に基づいて選択します。

At-least-once セマンティクス (低レイテンシーシナリオに推奨)

データは構成可能なフラッシュ間隔で書き込まれます。メモリ使用量は少ないですが、障害発生時に重複書き込みが発生する可能性があります。

/* At-least-once semantics */

USE CATALOG sr;

CREATE TABLE IF NOT EXISTS runoob_tbl_sr WITH (
  'starrocks.create.table.properties' = 'engine = olap primary key(runoob_id) distributed by hash(runoob_id) buckets 8',
  'database-name'                     = 'test_cdc',
  'jdbc-url'                          = 'jdbc:mysql://172.16.**.**:9030',
  'load-url'                          = '172.16.**.**:18030',
  'table-name'                        = 'runoob_tbl_sr',
  'username'                          = 'test',
  'password'                          = '123456',
  'sink.buffer-flush.interval-ms'     = '5000',
  'sink.properties.row_delimiter'     = '\x02',
  'sink.properties.column_separator'  = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port'          = '3306',
  'username'      = 'test',
  'password'      = '123456',
  'database-name' = 'test_cdc',
  'table-name'    = 'runoob_tbl'
) */;

1 回限りのセマンティクス (データが重要なシナリオに推奨)

障害発生時にデータ損失や重複はありません。データの可視性はチェックポイント間隔に依存します。

/* Exactly-once semantics */

SET 'execution.checkpointing.interval' = '1 min';
SET 'execution.checkpointing.mode'     = 'EXACTLY_ONCE';
SET 'execution.checkpointing.timeout'  = '10 min';

USE CATALOG sr;

CREATE TABLE IF NOT EXISTS runoob_tbl WITH (
  'starrocks.create.table.properties' = 'engine = olap primary key(runoob_id) distributed by hash(runoob_id) buckets 8',
  'database-name'                     = 'test_cdc',
  'jdbc-url'                          = 'jdbc:mysql://172.16.**.**:9030',
  'load-url'                          = '172.16.**.**:18030',
  'table-name'                        = 'runoob_tbl',
  'username'                          = 'test',
  'password'                          = '123456',
  'sink.semantic'                     = 'exactly-once',
  'sink.properties.row_delimiter'     = '\x02',
  'sink.properties.column_separator'  = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port'          = '3306',
  'username'      = 'test',
  'password'      = '123456',
  'database-name' = 'test_cdc',
  'table-name'    = 'runoob_tbl'
) */;

チェックポイントの構成オプションについては、「チェックポイント」をご参照ください。

シンプルモード (クイックセットアップに推奨)

Flink は MySQL スキーマからテーブル定義を推論します。エンジン、キー、またはディストリビューションを手動で指定する必要はありません。シンプルモードではパーティションテーブルはサポートされていません。代わりに通常モードを使用してパーティションを作成してください。

/* Simple mode */

USE CATALOG sr;

CREATE TABLE IF NOT EXISTS runoob_tbl1 WITH (
  'starrocks.create.table.properties' = 'buckets 8',
  'starrocks.create.table.mode'       = 'simple',
  'database-name'                     = 'test_cdc',
  'jdbc-url'                          = 'jdbc:mysql://172.16.**.**:9030',
  'load-url'                          = '172.16.**.**:18030',
  'table-name'                        = 'runoob_tbl_sr',
  'username'                          = 'test',
  'password'                          = '123456',
  'sink.buffer-flush.interval-ms'     = '5000',
  'sink.properties.row_delimiter'     = '\x02',
  'sink.properties.column_separator'  = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port'          = '3306',
  'username'      = 'test',
  'password'      = '123456',
  'database-name' = 'test_cdc',
  'table-name'    = 'runoob_tbl'
) */;

WITH 句パラメーター

パラメーター必須説明
starrocks.create.table.propertiesはいStarRocks の CREATE TABLE ステートメントのサフィックス定義 (フィールド定義を除く) — 例: enginekeybuckets
database-nameはいStarRocks データベース名。
jdbc-urlはいStarRocks クエリ用の Java Database Connectivity (JDBC) URL — 例: jdbc:mysql://172.16.**.**:9030 (172.16.**.** は StarRocks クラスターの内部 IP アドレスです)。
load-urlはいStarRocks フロントエンドの内部 IP アドレスと HTTP ポート。EMR クラスターのバージョンに基づいてポートを選択します: EMR V5.9.0 以降 (マイナーバージョン) および EMR V3.43.0 以降 (マイナーバージョン) の場合は 18030。EMR V5.8.0、EMR V3.42.0、またはそれ以前の場合は 8030UI とポートへのアクセス。詳細については、「」をご参照ください。
sink.semanticいいえ配信セマンティクス: at-least-once (デフォルト) または exactly-once
starrocks.create.table.modeいいえnormal (デフォルト): starrocks.create.table.propertiesenginekey、および buckets を指定します。simple: Flink は engine=olap を設定し、MySQL プライマリキーを使用し、すべてのプライマリキー列にわたってハッシュで分散します。starrocks.create.table.properties では buckets のみが必要です。パーティションは作成されません。
お使いの Flink バージョンが vvr-6.0.5-flink-1.15 より前の場合、WITH 句に 'sink.use.new-apiapi' = 'false' を追加します。追加の sink パラメーターについては、「Apache Flink からデータを継続的にロードする」をご参照ください。

OPTIONS 句パラメーター

OPTIONS 句は MySQL CDC ソースを構成します。

パラメーター説明
connectorコネクタタイプ。mysql-cdc に設定します。
hostnameApsaraDB RDS for MySQL インスタンスの内部エンドポイント (例: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com)。
portMySQL ポート。デフォルト: 3306
usernameApsaraDB RDS for MySQL アクセス用のユーザー名。ステップ 1: テストデータの準備で作成したアカウントを使用します。
passwordApsaraDB RDS for MySQL アクセス用のパスワード。
table-nameソーステーブル名。この例では、runoob_tbl です。
database-nameソースデータベース名。この例では、test_cdc です。

デプロイメントの公開と開始

  1. [詳細設定] タブの [下書きエディタ] ページで、[エンジンバージョン] を vvr-6.0.3-flink-1.15 以降に設定します。

  2. 下書きエディタ」ページの右上隅で、[公開] をクリックします。

  3. [デプロイメント] ページで、新しいデプロイメントを見つけ、[操作] 列の [開始] をクリックします。

ステップ 4: 同期の検証

ジョブ開始後、次のシナリオを実行して、データ変更とスキーマ変更の両方が StarRocks にリアルタイムで伝播されることを確認します。

検証クエリを実行する前に StarRocks に接続します。

mysql -h127.0.0.1 -P 9030 -uroot
USE test_cdc;

初期データの検証

StarRocks テーブルをクエリして、シード行が同期されたことを確認します。

SELECT * FROM runoob_tbl1;

期待される出力:

+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|        18 | first        | tom           | 2022-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+

INSERT の検証

ApsaraDB RDS for MySQL インスタンスの [SQL Console] タブで、行を挿入します:

INSERT INTO runoob_tbl (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`)
VALUES (1, 'second', 'tom2', '2022-06-23', 1);

両方の行が表示されることを確認するために StarRocks をクエリします。

SELECT * FROM runoob_tbl1;

期待される出力:

+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|         1 | second       | tom2          | 2022-06-23      |       1 |
|        18 | new          | tom           | 2022-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+

UPDATE の検証

MySQL SQL コンソールで、行を更新します。

UPDATE runoob_tbl SET runoob_title = 'new' WHERE runoob_id = 18;

変更を確認するために StarRocks をクエリします。

SELECT * FROM runoob_tbl1;

期待される出力:

+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|         1 | second       | tom2          | 2022-06-23      |       1 |
|        18 | new          | tom           | 2022-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+

DELETE の検証

MySQL SQL コンソールで、行を削除します。

DELETE FROM runoob_tbl WHERE runoob_id = 1;

行が削除されたことを確認するために StarRocks をクエリします。

SELECT * FROM runoob_tbl1;

期待される出力:

+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
|        18 | new          | tom           | 2022-06-22      |       3 |
+-----------+--------------+---------------+-----------------+---------+

スキーマ変更の検証: NULL 許容列の追加

MySQL SQL コンソールで、NULL 許容列を追加し、その列に値を持つ行を挿入します。

ALTER TABLE `runoob_tbl` ADD COLUMN `add_col2` INT;

INSERT INTO runoob_tbl (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`, `add_col2`)
VALUES (1, 'second', 'tom2', '2022-06-23', 1, 2);

新しい列が表示され、既存の行が NULL を示すことを確認するために StarRocks をクエリします。

SELECT * FROM runoob_tbl1;

期待される出力:

+-----------+--------------+---------------+-----------------+---------+----------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 |
+-----------+--------------+---------------+-----------------+---------+----------+
|        18 | new          | tom           | 2022-06-22      |       3 |     NULL |
|         1 | second       | tom2          | 2022-06-23      |       1 |        2 |
+-----------+--------------+---------------+-----------------+---------+----------+

add_col2 列は、それを含む最初のデータ行が到着したときに StarRocks に自動的に追加されました。

CDAS: データベース全体の同期

CREATE DATABASE AS (CDAS) ステートメントは CTAS の糖衣構文です。これは、MySQL データベースから StarRocks へ、選択されたすべてのテーブルを一度に同期する 1 つの Flink デプロイメントを作成します。INCLUDING TABLE 句を使用して、名前で特定のテーブルを選択します。

まず MySQL および StarRocks カタログを作成します (ステップ 2 と同じ)。次に、以下を実行します。

CREATE DATABASE IF NOT EXISTS sr_db WITH (
  'starrocks.create.table.properties' = 'buckets 8',
  'starrocks.create.table.mode'       = 'simple',
  'jdbc-url'                          = 'jdbc:mysql://172.16.**.**:9030',
  'load-url'                          = '172.16.**.**:18030',
  'username'                          = 'test',
  'password'                          = '123456',
  'sink.buffer-flush.interval-ms'     = '5000',
  'sink.properties.row_delimiter'     = '\x02',
  'sink.properties.column_separator'  = '\x01'
)
AS DATABASE mysql.test_cdc INCLUDING TABLE 'tbl1', 'tbl2', 'tbl3'
/*+ OPTIONS (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
  'port'          = '3306',
  'username'      = 'test',
  'password'      = '123456',
  'database-name' = 'test_cdc'
) */;

INCLUDING TABLE 句は、テーブル名のコンマ区切りリストを受け入れます。ソースデータベース内のすべてのテーブルを同期するには、これを省略します。

次のステップ