このトピックでは、CREATE TABLE AS (CTAS) 文を使用して、Realtime Compute for Apache Flink で MySQL から EMR Serverless StarRocks にデータを同期する方法について説明します。
背景情報
CTAS または CREATE DATABASE AS (CDAS) 文を使用して、MySQL から EMR Serverless StarRocks にデータを同期できます。CTAS 文は単一テーブルのスキーマとデータを同期でき、CDAS 文はデータベース全体または同じデータベース内の複数テーブルのスキーマとデータを同期できます。このトピックでは CTAS 文を使用します。CDAS 文は CTAS 文と同様の方法で使用されます。詳細については、「CDAS の概要」をご参照ください。
CREATE TABLE AS (CTAS) 文を使用すると、MySQL のテーブルと同じスキーマを持つテーブルを StarRocks に自動的に作成し、データを同期できます。また、ソーステーブルのスキーマ変更を宛先テーブルにリアルタイムで同期することもでき、宛先ストレージでのテーブル作成やソーステーブルのスキーマ変更の維持の効率が向上します。
CTAS 文を実行すると、Flink は次の操作を実行します。
宛先ストレージに宛先テーブルが存在するかどうかを確認します。
宛先テーブルが存在しない場合、宛先テーブルが属するカタログに基づいて、対応する宛先テーブルが宛先ストレージに作成されます。宛先テーブルはソーステーブルと同じスキーマを持ちます。
宛先テーブルが存在する場合、Flink はテーブル作成ステップをスキップします。既存の宛先テーブルのスキーマがソーステーブルのスキーマと異なる場合、エラーメッセージが返されます。
データ同期ジョブをコミットして実行します。Flink は、データとソーステーブルのスキーマの変更を宛先テーブルに同期します。
スキーマ変更同期ポリシーは、CTAS 文を使用して、データとソーステーブルのスキーマの変更を宛先テーブルにリアルタイムで同期します。
スキーマの変更には、テーブルの作成と、テーブル作成後のスキーマの変更が含まれます。
次のスキーマ変更がサポートされています。
NULL 値を許容する列の追加: 文は、関連する列を宛先テーブルのスキーマの末尾に自動的に追加し、追加された列にデータを同期します。
NULL 値を許容する列の削除: 文は、テーブルから列を削除する代わりに、宛先テーブルの NULL 値を許容する列を自動的に NULL 値で埋めます。
列の名前変更: 文は、名前が変更された列を宛先テーブルの末尾に追加し、名前変更前の列を NULL 値で埋めます。
たとえば、ソーステーブルの col_a 列の名前が col_b に変更された場合、col_b 列が宛先テーブルの末尾に追加され、col_a 列は自動的に NULL 値で埋められます。
次のスキーマ変更はサポートされていません。
データ型の変更。
たとえば、列のデータが VARCHAR 型から BIGINT 型に変更されたり、列のプロパティが NOT NULL から NULLABLE に変更されたりする場合です。
プライマリキーやインデックスなどの制約の変更。
NULL 値を許容しない列の追加または削除。
- ソーステーブルのスキーマに前述の変更のいずれかがある場合は、宛先テーブルを削除し、CTAS 文を実行するジョブを再起動する必要があります。これにより、宛先テーブルが再度作成され、既存データが宛先テーブルに再同期されます。
- CTAS 文は DDL 文の種類を識別しませんが、スキーマが変更される前後の 2 つのデータレコード間のスキーマの違いを比較します。したがって、列を削除してから再度追加し、列を削除および追加するために使用される 2 つの DDL 文の間にデータ変更がない場合、CTAS 文はスキーマ変更が発生しなかったと見なします。同様に、ソーステーブルに列を追加しても、CTAS 文はスキーマ変更同期をトリガーしません。文は、ソーステーブルのデータが変更された場合にのみスキーマ変更を識別します。この場合、文はスキーマ変更を宛先テーブルに同期します。
- CTAS 文でサポートされているフィールドタイプの詳細については、「Continuously load data from Apache Flink®」をご参照ください。
前提条件
フルマネージド Flink がアクティブ化され、Flink クラスターが作成されていること。詳細については、「フルマネージド Flink のアクティブ化」および「Flink SQL ジョブのクイックスタート」をご参照ください。
EMR Serverless StarRocks インスタンスが作成されていること。詳細については、「インスタンスの作成」をご参照ください。
ApsaraDB RDS for MySQL インスタンスが作成されていること。詳細については、「ApsaraDB RDS for MySQL インスタンスの作成」をご参照ください。
このトピックでは、MySQL 5.7 と Flink バージョン vvr-8.0.11-flink-1.17 を例として使用します。
制限事項
Flink クラスター、EMR Serverless StarRocks インスタンス、および ApsaraDB RDS for MySQL インスタンスは、同じ VPC 内にある必要があります。
ApsaraDB RDS for MySQL インスタンスは、バージョン 5.7 以降である必要があります。
ステップ 1: テストデータの準備
テストデータベースとアカウントを作成します。詳細については、「データベースとアカウントの作成」をご参照ください。
データベースとアカウントを作成した後、テストアカウントに読み取りおよび書き込み権限を付与します。
説明このトピックでは、データベース名は test_cdc、アカウント名は test です。
作成したテストアカウントを使用して MySQL インスタンスに接続します。詳細については、「DMS を使用して ApsaraDB RDS for MySQL インスタンスにログインする」をご参照ください。
MySQL で、次のコマンドを実行してデータテーブルを作成します。
use test_cdc; -- テーブルを作成 CREATE TABLE IF NOT EXISTS `runoob_tbl`( `runoob_id` INT UNSIGNED AUTO_INCREMENT, `runoob_title` VARCHAR(100) NOT NULL, `runoob_author` VARCHAR(40) NOT NULL, `submission_date` DATE, `add_col` int DEFAULT NULL, PRIMARY KEY ( `runoob_id` ) )ENGINE=InnoDB DEFAULT CHARSET=utf8; -- データを挿入 INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2025-06-22 17:13:44',3)EMR Serverless StarRocks インスタンスにログインして接続します。詳細については、「MySQL クライアントを使用して StarRocks インスタンスに接続する」をご参照ください。
次のコマンドを実行して、test_cdc データベースとテストユーザーを作成します。ユーザーはスーパー管理者 (パスワード例: 1qaz!QAZ) として作成するか、一般ユーザーとして作成してデータベースとそのテーブルに対する権限を付与できます。詳細については、「ユーザーの管理」をご参照ください。
-- データベースを作成 CREATE DATABASE test_cdc; -- ユーザーを作成 CREATE USER 'test' IDENTIFIED by '1qaz!QAZ'; -- ユーザーにデータベース権限を付与 GRANT ALL on test_cdc to test; -- ユーザーにテーブル権限を付与 GRANT ALL ON ALL TABLES IN DATABASE test_cdc to test;
ステップ 2: Flink コンソールでカタログを作成する
Realtime Compute for Apache Flink コンソールの Data Management ページで MySQL と StarRocks のカタログを作成します。詳細については、「Data Management」をご参照ください。
パラメーター設定は参照用です。必要に応じてパラメーターを設定してください。
MySQL カタログ
コード例
CREATE CATALOG mysql WITH ( 'type' = 'mysql', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr-test', 'password' = '123456', 'default-database' = 'test_cdc' );パラメーター
パラメーター
説明
type
タイプ。値は mysql に固定されます。
hostname
RDS インスタンスの内部エンドポイント。RDS インスタンスの [データベース接続] ページに移動し、内部エンドポイントをクリックしてコピーできます。例: rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com。
port
MySQL データベースサービスのポート番号。デフォルト値は 3306 です。
username
MySQL データベースサービスのユーザー名。
「ステップ 1: テストデータの準備」のアカウントのユーザー名を入力します。この例では、test を使用します。
password
MySQL データベースサービスのパスワード。
「ステップ 1: テストデータの準備」のアカウントのパスワードを入力します。
default-database
デフォルトの MySQL データベース名。
「ステップ 1: テストデータの準備」で作成したデータベース名を入力します。この例では、test_cdc を使用します。
StarRocks カタログ
コード例
CREATE CATALOG sr WITH ( 'type' = 'starrocks', 'endpoint' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'username' = 'test', 'password' = '1qaz!QAZ', 'dbname' = 'test_cdc' );パラメーター
パラメーター
説明
type
タイプ。値は starrocks に固定されます。
endpoint
フロントエンド (FE) ノードの内部エンドポイントとクエリポート。フォーマットは
EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイント:9030です。例: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。
説明EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイントを取得する方法については、「インスタンスリストとインスタンスの詳細の表示」をご参照ください。
username
StarRocks のユーザー名。
「ステップ 1: テストデータの準備」のアカウントのユーザー名を入力します。この例では、test です。
password
StarRocks データベースサービスのパスワード。
「ステップ 1: テストデータの準備」のアカウントのパスワードを入力します。
dbname
StarRocks データベース名。
「ステップ 1: テストデータの準備」で作成したデータベース名を入力します。この例では、test_cdc です。
ステップ 3: ジョブの作成とデプロイ
Realtime Compute for Apache Flink コンソールの ページで、CTAS 文を記述します。
次の 3 つの例が提供されています。
少なくとも 1 回のセマンティクス: sink.buffer-flush.interval-ms 設定項目を使用して、データが StarRocks に書き込まれる間隔を設定します。このモードは、短い書き込み間隔と低いメモリ使用量を提供します。
/* 少なくとも 1 回のセマンティクス */ use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl with ( 'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030', 'table-name'='runoob_tbl', 'username'='test', 'password' = '1qaz!QAZ', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl;1 回限りのセマンティクス: チェックポイント間隔を指定する必要があります。利点は、エラーが発生したときにデータが失われたり重複したりしないことです。欠点は、チェックポイント間隔によってデータがいつ表示されるかが決まることです。詳細については、「チェックポイント」をご参照ください。
/* 1 回限りのセマンティクス。 */ set 'execution.checkpointing.interval' = '1 min'; set 'execution.checkpointing.mode' = 'EXACTLY_ONCE'; set 'execution.checkpointing.timeout' = '10 min'; use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl with ( 'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030', 'table-name'='runoob_tbl', 'username'='test', 'password' = '1qaz!QAZ', 'sink.semantic' = 'exactly-once', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl;シンプルモード: テーブルを作成するときに、ソーステーブルのフィールドを指定する必要はありません。テーブルは MySQL テーブルのスキーマに基づいて作成されます。これは開発者にとって便利です。ただし、パーティションを作成することはできません。パーティションが必要なテーブルは、通常モードで作成する必要があります。
/* 上記の 2 つの例では通常モードを使用しています。この例ではシンプルモードを使用しています。 */ use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl with ( 'starrocks.create.table.mode'='simple', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030', 'table-name'='runoob_tbl', 'username'='test', 'password' = '1qaz!QAZ', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl;
表 1. WITH パラメーター
パラメーター
必須
説明
starrocks.create.table.properties
はい
フィールド定義を除く、StarRocks テーブル作成文のその他のサフィックス定義。例: engine、key、および buckets。
database-name
はい
StarRocks データベース名。
このトピックでは、データベース名は test_cdc です。
jdbc-url
はい
StarRocks でクエリを実行するために使用されます。
例: jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。この例では、
fe-c-9b354c83e891****-internal.starrocks.aliyuncs.comは EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイントです。説明EMR Serverless StarRocks インスタンスの FE ノードの内部ネットワークアドレスを取得する方法については、「インスタンスリストと詳細の表示」をご参照ください。
load-url
はい
FE ノードの内部エンドポイントとクエリポート。フォーマットは
EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイント:8030です。例: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030。
説明EMR Serverless StarRocks インスタンスの FE ノードの内部ネットワークアドレスを取得する方法については、「インスタンスリストと詳細の表示」をご参照ください。
sink.semantic
いいえ
値を exactly-once に設定して、1 回限りのセマンティクスを保証します。デフォルト値は at-least-once です。
starrocks.create.table.mode
いいえ
次の値がサポートされています。
通常モード (デフォルト): 例に示すように、starrocks.create.table.properties パラメーターに engine、key、buckets などの完全な設定を指定する必要があります。
シンプルモード: engine はデフォルトで olap です。キータイプはプライマリキーで、プライマリキーは MySQL テーブルのプライマリキーと同じです。デフォルトの分散はハッシュ (すべてのプライマリキー) で、パーティションはありません。starrocks.create.table.properties パラメーターでは、buckets は必須で、properties はオプションです。
sink.properties.row_delimiter
いいえ
カスタムの行区切り文字。
sink.properties.column_separator
いいえ
カスタムの列区切り文字。
説明vvr-6.0.5-flink-1.15 より前のバージョンの Flink を使用する場合、WITH 句に
'sink.use.new-apiapi'='false',を追加する必要があります。その他の構成の詳細については、「Apache Flink からの継続的なデータロード」をご参照ください。
表 2. OPTIONS パラメーター
パラメーター
説明
connector
タイプ。値は mysql-cdc に固定されます。
hostname
RDS インスタンスの内部エンドポイント。
RDS インスタンスの [データベース接続] ページに移動し、内部エンドポイントをクリックしてコピーできます。例: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。
port
MySQL データベースサービスのポート番号。デフォルト値は 3306 です。
username
MySQL データベースサービスのユーザー名。
「ステップ 1: テストデータの準備」のアカウントのユーザー名を入力します。この例では、test を使用します。
password
MySQL データベースサービスのパスワード。
「ステップ 1: テストデータの準備」のアカウントのパスワードを入力します。
table-name
StarRocks のテーブル名。
「ステップ 1: テストデータの準備」で作成したテーブル名を入力します。この例では、runoob_tbl です。
database-name
デフォルトの MySQL データベース名。
「ステップ 1: テストデータの準備」で作成したデータベース名を入力します。この例では、test_cdc です。
[デプロイ] をクリックします。
[新しいバージョンのデプロイ] ページで、デプロイメントターゲットを選択し、[OK] をクリックします。
[ジョブ O&M] ページで、開始したいジョブの [アクション] 列にある [開始] をクリックします。
Realtime Compute for Apache Flink コンソールでは CTAS 文をデバッグできません。
ステップ 4: データ同期結果の検証
データのクエリ
EMR Serverless StarRocks インスタンスにログインして接続します。詳細については、「MySQL クライアントを使用して StarRocks インスタンスに接続する」をご参照ください。
StarRocks 接続ウィンドウで、次のコマンドを実行してテーブルデータを表示します。
use test_cdc; select * from runoob_tbl;次の結果が返され、MySQL からのデータが StarRocks に同期されたことを示します。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | first | tom | 2025-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
挿入されたデータの同期
RDS データベースウィンドウで、次のコマンドを実行してデータを挿入します。
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values(1,'second','tom2','2022-06-23',1)StarRocks 接続ウィンドウで、次のコマンドを実行してテーブルデータを表示します。
select * from runoob_tbl;次の結果が返され、データが挿入されたことを示します。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2025-06-23 | 1 | | 18 | first | tom | 2025-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
更新されたデータの同期
RDS データベースウィンドウで、次のコマンドを実行して指定されたデータを更新します。
update runoob_tbl set runoob_title= 'new' where runoob_id = 18StarRocks 接続ウィンドウで、次のコマンドを実行してテーブルデータを表示します。
select * from runoob_tbl;次の結果が返され、データが更新されたことを示します。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2025-06-23 | 1 | | 18 | new | tom | 2025-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
削除されたデータの同期
RDS データベースウィンドウで、次のコマンドを実行して指定されたデータを削除します。
DELETE FROM runoob_tbl WHERE runoob_id = 1StarRocks 接続ウィンドウで、次のコマンドを実行してテーブルデータを表示します。
select * from runoob_tbl;次の結果が返され、データが削除されたことを示します。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | new | tom | 2025-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
追加された NULL 値を許容する列の同期
RDS データベースウィンドウで、次のコマンドを実行して NULL 値を許容する列を追加します。
alter table `runoob_tbl` add COLUMN `add_col2` INT;次のコマンドを実行してデータを挿入します。
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`) values(1,'second','tom2','2022-06-23',1,2)StarRocks 接続ウィンドウで、次のコマンドを実行してテーブルデータを表示します。
select * from runoob_tbl;次の結果が返され、スキーマが変更されたことを示します。
+-----------+--------------+---------------+-----------------+---------+----------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 | +-----------+--------------+---------------+-----------------+---------+----------+ | 18 | new | tom | 2025-06-22 | 3 | NULL | +-----------+--------------+---------------+-----------------+---------+----------+ | 1 | second | tom2 | 2025-06-23 | 1 | 2 | +-----------+--------------+---------------+-----------------+---------+----------+
CDAS の概要
CDAS は CTAS の糖衣構文です。CDAS 文を使用すると、MySQL からのデータベース全体の同期が可能になります。これにより、ソースが MySQL のデータベースで、宛先が StarRocks の対応するテーブルセットである Flink ジョブが生成されます。`including table` 構文を使用して、データベースからテーブルのサブセットのみを選択して CDAS 操作を行うこともできます。
CTAS の実行と同様に、CDAS 文を実行する前に MySQL と StarRocks のカタログを作成する必要があります。次のコードは構文の例です。
CREATE DATABASE IF NOT EXISTS sr_db with (
'starrocks.create.table.properties'=' buckets 8',
'starrocks.create.table.mode'='simple',
'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
as DATABASE mysql.test_cdc including table
'tabl1','tbl2','tbl3';リファレンス
Realtime Compute for Apache Flink は、CTAS 文だけでなく、データインジェスト YAML ファイルを使用した StarRocks へのデータ同期もサポートしています。詳細については、「データインジェスト」をご参照ください。