このトピックでは、EMR Dataflow クラスターの Flink サービスを使用して、CREATE TABLE AS (CTAS) 文で MySQL から EMR Serverless StarRocks にデータを同期する方法について説明します。
背景情報
CTAS または CREATE DATABASE AS (CDAS) 文を使用して、MySQL から EMR Serverless StarRocks にデータを同期できます。CTAS 文は単一テーブルのスキーマとデータを同期でき、CDAS 文はデータベース全体または同じデータベース内の複数テーブルのスキーマとデータを同期できます。このトピックでは CTAS 文を使用します。CDAS 文は CTAS 文と同様の方法で使用されます。詳細については、「CDAS の概要」をご参照ください。
CREATE TABLE AS (CTAS) 文を使用すると、MySQL のテーブルと同じスキーマを持つテーブルを StarRocks に自動的に作成し、データを同期できます。また、ソーステーブルのスキーマ変更をリアルタイムで宛先テーブルに同期することもでき、宛先ストレージでのテーブル作成とソーステーブルのスキーマ変更の維持の効率が向上します。
CTAS 文を実行すると、Flink は次の操作を実行します。
宛先ストレージに宛先テーブルが存在するかどうかを確認します。
宛先テーブルが存在しない場合、宛先テーブルが属するカタログに基づいて、対応する宛先テーブルが宛先ストレージに作成されます。宛先テーブルはソーステーブルと同じスキーマを持ちます。
宛先テーブルが存在する場合、Flink はテーブル作成ステップをスキップします。既存の宛先テーブルのスキーマがソーステーブルのスキーマと異なる場合、エラーメッセージが返されます。
データ同期ジョブをコミットして実行します。Flink は、ソーステーブルのデータとスキーマの変更を宛先テーブルに同期します。
スキーマ変更同期ポリシーは、CTAS 文を使用して、リアルタイムでデータを同期し、ソーステーブルのスキーマの変更を宛先テーブルに同期します。
スキーマの変更には、テーブルの作成とテーブル作成後のスキーマの変更が含まれます。
次のスキーマ変更がサポートされています。
NULL 値を許容する列の追加: 文は、関連する列を宛先テーブルのスキーマの末尾に自動的に追加し、追加された列にデータを同期します。
NULL 値を許容する列の削除: 文は、テーブルから列を削除する代わりに、宛先テーブルの NULL 値を許容する列を自動的に NULL 値で埋めます。
列の名前変更: 文は、名前が変更された列を宛先テーブルの末尾に追加し、名前変更前の列を NULL 値で埋めます。
たとえば、ソーステーブルの col_a 列の名前が col_b に変更された場合、col_b 列が宛先テーブルの末尾に追加され、col_a 列は自動的に NULL 値で埋められます。
次のスキーマ変更はサポートされていません。
データの型の変更。
たとえば、列のデータが VARCHAR 型から BIGINT 型に変更されたり、列のプロパティが NOT NULL から NULLABLE に変更されたりする場合です。
プライマリキーやインデックスなどの制約の変更。
NULL 値を許容しない列の追加または削除。
- ソーステーブルのスキーマに前述の変更のいずれかがある場合は、宛先テーブルを削除し、CTAS 文を実行するジョブを再起動する必要があります。これにより、宛先テーブルが再作成され、既存データが宛先テーブルに再同期されます。
- CTAS 文は DDL 文の種類を識別しませんが、スキーマが変更される前後の 2 つのデータレコード間のスキーマの違いを比較します。したがって、列を削除してから再度追加し、列を削除および追加するために使用される 2 つの DDL 文の間にデータ変更がない場合、CTAS 文はスキーマ変更が発生しなかったと見なします。同様に、ソーステーブルに列を追加しても、CTAS 文はスキーマ変更の同期をトリガーしません。この文は、ソーステーブルのデータが変更された場合にのみスキーマの変更を識別します。この場合、文はスキーマの変更を宛先テーブルに同期します。
- CTAS 文でサポートされているフィールドタイプについての詳細については、「Apache Flink® からの継続的なデータロード」をご参照ください。
前提条件
新しいコンソールで Dataflow クラスターが作成され、Flink サービスが選択されていること。詳細については、「クラスターの作成」をご参照ください。
EMR Serverless StarRocks インスタンスが作成されていること。詳細については、「インスタンスの作成」をご参照ください。
ApsaraDB RDS for MySQL インスタンスが作成されていること。詳細については、「ステップ 1: ApsaraDB RDS for MySQL インスタンスの作成とデータベースの構成」をご参照ください。
このトピックでは、MySQL 5.7 と EMR-3.42.0 の Dataflow クラスターを例として使用します。
制限事項
Dataflow クラスター、StarRocks インスタンス、および ApsaraDB RDS for MySQL インスタンスは、同じ VPC (VPC) にデプロイする必要があります。
Dataflow クラスターと StarRocks インスタンスは、インターネット経由でアクセスできる必要があります。
ApsaraDB RDS for MySQL インスタンスのエンジンバージョンは 5.7 以降である必要があります。
Dataflow クラスターは EMR-3.42.0 以降または EMR-5.8.0 以降である必要があります。
ステップ 1: テストデータの準備
テストデータベースとテストアカウントを作成します。詳細については、「ステップ 1: ApsaraDB RDS for MySQL インスタンスの作成とデータベースの構成」をご参照ください。
テストデータベースとアカウントを作成した後、アカウントに読み取りおよび書き込み権限を付与します。
説明このトピックでは、データベース名は test_cdc、アカウント名は emr_test です。
テストアカウントを使用して ApsaraDB RDS for MySQL インスタンスに接続します。詳細については、「ステップ 2: ApsaraDB RDS for MySQL インスタンスへの接続」をご参照ください。
次のコマンドを実行してテーブルを作成します。
use test_cdc; CREATE TABLE IF NOT EXISTS `runoob_tbl`( `runoob_id` INT UNSIGNED AUTO_INCREMENT, `runoob_title` VARCHAR(100) NOT NULL, `runoob_author` VARCHAR(40) NOT NULL, `submission_date` DATE, `add_col` int DEFAULT NULL, PRIMARY KEY ( `runoob_id` ) )ENGINE=InnoDB DEFAULT CHARSET=utf8; INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values (18,'first','tom','2022-06-22 17:13:44',3)EMR Serverless StarRocks インスタンスにログインして接続します。詳細については、「MySQL クライアントを使用して StarRocks インスタンスに接続する」をご参照ください。
次のコマンドを実行して、test_cdc データベースを作成し、test という名前のスーパー管理者ユーザー (パスワードの例: 1qaz!QAZ) を作成するか、test という名前の一般ユーザーを作成して、そのユーザーにデータベースの権限を付与します。詳細については、「ユーザーの管理」をご参照ください。
CREATE DATABASE test_cdc; CREATE USER 'test' IDENTIFIED by '1qaz!QAZ'; GRANT ALL on test_cdc to test;
ステップ 2: カスタムコネクタのアップロード
Flink、StarRocks、および ApsaraDB RDS for MySQL 接続用のカスタムコネクタをアップロードします。
SSH を使用して Dataflow クラスターにログインします。詳細については、「クラスターへのログイン」をご参照ください。
flink-connector-starrocks-1.2.2_flink-1.13_2.11.jar と ververica-connector-mysql-1.13-vvr-4.0.12-1-20220330.065158-3-jar-with-dependencies.jar をダウンロードし、Dataflow クラスターの /opt/apps/FLINK/flink-current/lib ディレクトリにアップロードします。
ステップ 3: CTAS 文の実行
セッションモードでジョブを送信します。
SSH を使用して Dataflow クラスターにログインします。詳細については、「クラスターへのログイン」をご参照ください。
次のコマンドを実行して /opt/apps/FLINK/flink-current ディレクトリに移動します。
cd /opt/apps/FLINK/flink-current次のコマンドを実行して YARN セッションを開始します。
./bin/yarn-session.sh --detachedコマンドが正常に実行されると、出力に
application_XXXX_YYが返されます。これは、SQL クライアントにログインするために必要な sessionId です。
次のコマンドを実行して SQL クライアントを開きます。
./bin/sql-client.sh -s <application_XXXX_YY>説明<application_XXXX_YY>を前のステップで取得した sessionId に置き換えてください。
MySQL と StarRocks のカタログを作成します。
CREATE CATALOG sr WITH ( 'type' = 'starrocks', 'endpoint' = 'fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'username' = 'test', 'password' = '1qaz!QAZ', 'dbname' = 'test_cdc' ); CREATE CATALOG mysql WITH ( 'type' = 'mysql', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = '123456', 'default-database' = 'test_cdc' );次の表にパラメーターを説明します。ビジネス要件に基づいてパラメーターを変更できます。
表 1. StarRocks カタログパラメーター
パラメーター
説明
type
カタログのタイプ。値を starrocks に設定します。
endpoint
FE ノードの内部エンドポイントとクエリポート。フォーマットは
EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイント:9030です。例: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。説明EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイントを取得する方法については、「インスタンスリストと詳細の表示」をご参照ください。
username
StarRocks データベースへのアクセスに使用するユーザー名。
「ステップ 1: テストデータの準備」で作成したユーザー名を入力します。この例では、test を使用します。
password
StarRocks データベースへのアクセスに使用するパスワード。
「ステップ 1: テストデータの準備」でアカウントに設定したパスワードを入力します。この例では、1qaz!QAZ を使用します。
dbname
StarRocks データベースの名前。
「ステップ 1: テストデータの準備」で作成したデータベース名を入力します。この例では、test_cdc を使用します。
表 2. MySQL カタログパラメーター
パラメーター
説明
type
カタログのタイプ。値を mysql に設定します。
hostname
ApsaraDB RDS for MySQL インスタンスの内部エンドポイント。
ApsaraDB RDS コンソールの ApsaraDB RDS for MySQL インスタンスの [データベース接続] ページで内部エンドポイントをコピーできます。例: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。
port
MySQL データベースのポート番号。デフォルト値: 3306。
username
MySQL データベースへのアクセスに使用するユーザー名。
「ステップ 1: テストデータの準備」で作成したアカウントのユーザー名を入力します。この例では、emr_test を使用します。
password
MySQL データベースへのアクセスに使用するパスワード。
「ステップ 1: テストデータの準備」で作成したアカウントのパスワードを入力します。この例では、123456 を使用します。
default-database
デフォルトの MySQL データベースの名前。
「ステップ 1: テストデータの準備」で作成したデータベース名を入力します。この例では、test_cdc を使用します。
StarRocks カタログで CTAS 文を実行します。
次の 3 つの方法のいずれかを使用して CTAS 文を実行できます。
At-least-once セマンティクス: sink.buffer-flush.interval-ms パラメーターを使用して、データが StarRocks に書き込まれる間隔を構成できます。利点は、書き込み間隔が短く、使用されるメモリが少ないことです。
use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl1 with ( 'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030', 'table-name'='runoob_tbl_sr', 'username'='test', 'password' = '1qaz!QAZ', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;Exactly-once セマンティクス: チェックポイントを定期的にスケジュールする間隔を指定する必要があります。利点は、エラーが発生したときにデータが失われたり重複したりしないことです。欠点は、チェックポイントの間隔によってデータがいつ表示されるかが決まることです。詳細については、「チェックポイント」をご参照ください。
set 'execution.checkpointing.interval' = '1 min'; set 'execution.checkpointing.mode' = 'EXACTLY_ONCE'; set 'execution.checkpointing.timeout' = '10 min'; use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl1 with ( 'starrocks.create.table.properties'=' engine = olap primary key(runoob_id) distributed by hash(runoob_id ) buckets 8', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030', 'table-name'='runoob_tbl', 'username'='test', 'password' = '1qaz!QAZ', 'sink.semantic' = 'exactly-once', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;シンプルモード: 利点は、テーブルを作成するときに MySQL データベースのテーブルのフィールドを気にする必要がないことです。作成するテーブルのスキーマは、MySQL データベースのテーブルのスキーマと同じです。このモードは開発者にとって使いやすいです。欠点は、パーティションを作成できないことです。パーティション分割する必要があるテーブルの場合は、通常モードでパーティションを作成する必要があります。
use CATALOG sr; CREATE TABLE IF NOT EXISTS runoob_tbl1 with ( 'starrocks.create.table.properties'='buckets 8', 'starrocks.create.table.mode'='simple', 'database-name'='test_cdc', 'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030', 'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030', 'table-name'='runoob_tbl_sr', 'username'='test', 'password' = '1qaz!QAZ', 'sink.buffer-flush.interval-ms' = '5000', 'sink.properties.row_delimiter' = '\x02', 'sink.properties.column_separator' = '\x01' ) as table mysql.test_cdc.runoob_tbl /*+ OPTIONS ( 'connector' = 'mysql-cdc', 'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com', 'port' = '3306', 'username' = 'emr_test', 'password' = '123456', 'database-name' = 'test_cdc', 'table-name' = 'runoob_tbl' )*/;表 3. WITH パラメーター
パラメーター
必須
説明
starrocks.create.table.properties
はい
StarRocks データベースでテーブルを作成するために使用される文のフィールド定義を除くその他のサフィックス定義 (サンプルコードの engine、key、buckets など)。
database-name
はい
StarRocks データベースの名前。
この例では、test_cdc を使用します。
jdbc-url
はい
StarRocks に接続して StarRocks でクエリを実行するために使用される JDBC URL。
例: jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030。
fe-c-9b354c83e891****-internal.starrocks.aliyuncs.comは、EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイントです。説明EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイントを取得する方法については、「インスタンスリストと詳細の表示」をご参照ください。
load-url
はい
FE ノードの内部エンドポイントとクエリポート。フォーマットは
EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイント:8030です。例: fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030。
説明EMR Serverless StarRocks インスタンスの FE ノードの内部エンドポイントを取得する方法については、「インスタンスリストと詳細の表示」をご参照ください。
sink.semantic
いいえ
文の実行に使用されるセマンティクス。データ整合性を確保するには、このパラメーターを exactly-once に設定します。デフォルト値: at-least-once。
starrocks.create.table.mode
いいえ
有効な値:
normal (デフォルト): 例に示すように、starrocks.create.table.properties パラメーターで engine、key、buckets などの完全な構成を指定する必要があります。
simple: デフォルトでは、engine パラメーターは olap に設定され、key パラメーターは primary key に設定されます。プライマリキーは MySQL テーブルのプライマリキーと同じです。distributed by hash パラメーターはすべてのプライマリキーに対して構成され、パーティションは存在しません。starrocks.create.table.properties パラメーターで buckets を指定する必要があります。properties などのオプションのパラメーターも指定できます。
sink.properties.row_delimiter
いいえ
カスタムの行区切り文字。
sink.properties.column_separator
いいえ
カスタムの列区切り文字。
説明vvr-6.0.5-flink-1.15 より前のバージョンの Flink を使用する場合は、WITH 句に
'sink.use.new-apiapi'='false',を追加する必要があります。その他の構成の詳細については、「Apache Flink からデータを継続的にロードする」をご参照ください。
表 4. OPTIONS パラメーター
パラメーター
説明
connector
コネクタのタイプ。値を mysql-cdc に設定します。
hostname
ApsaraDB RDS for MySQL インスタンスの内部エンドポイント。
ApsaraDB RDS コンソールの ApsaraDB RDS for MySQL インスタンスの [データベース接続] ページで内部エンドポイントをコピーできます。例: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com。
port
MySQL データベースのポート番号。デフォルト値: 3306。
username
MySQL データベースへのアクセスに使用するユーザー名。
「ステップ 1: テストデータの準備」で作成したアカウントのユーザー名を入力します。この例では、emr_test を使用します。
password
MySQL データベースへのアクセスに使用するパスワード。
「ステップ 1: テストデータの準備」で作成したアカウントのパスワードを入力します。
table-name
StarRocks データベースのテーブルの名前。
「ステップ 1: テストデータの準備」で作成したテーブル名を入力します。この例では、runoob_tbl を使用します。
database-name
デフォルトの MySQL データベースの名前。
「ステップ 1: テストデータの準備」で作成したデータベース名を入力します。この例では、test_cdc を使用します。
ステップ 4: データ同期結果の表示
チェックポイントが有効になっている場合、最大待機時間はチェックポイントの間隔とほぼ同じです。
データのクエリ
EMR Serverless StarRocks インスタンスにログインして接続します。詳細については、「MySQL クライアントを使用して StarRocks インスタンスに接続する」をご参照ください。
StarRocks 接続ウィンドウで次のコマンドを実行して、テーブルデータを表示します。
use test_cdc; select * from runoob_tbl1;次の出力が返されます。これは、データが ApsaraDB RDS for MySQL インスタンスから StarRocks クラスターに同期されたことを示します。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | first | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
挿入されたデータのクエリ
ApsaraDB RDS for MySQL データベースウィンドウで次のコマンドを実行してデータを挿入します。
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`) values(1,'second','tom2','2022-06-23',1);StarRocks 接続ウィンドウで次のコマンドを実行して、テーブルデータを表示します。
select * from runoob_tbl1;次の出力が返されます。これは、データが挿入されたことを示します。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | | 18 | first | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
更新されたデータの同期
ApsaraDB RDS for MySQL データベースウィンドウで次のコマンドを実行して、指定したデータを更新します。
update runoob_tbl set runoob_title= 'new' where runoob_id = 18;StarRocks 接続ウィンドウで次のコマンドを実行して、テーブルデータを表示します。
select * from runoob_tbl1;次の出力が返されます。これは、更新されたデータが同期されたことを示します。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | | 18 | new | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
削除されたデータの同期
ApsaraDB RDS for MySQL データベースウィンドウで次のコマンドを実行して、指定したデータを削除します。
DELETE FROM runoob_tbl WHERE runoob_id = 1;StarRocks 接続ウィンドウで次のコマンドを実行して、テーブルデータを表示します。
select * from runoob_tbl1;次の出力が返されます。これは、削除されたデータが同期されたことを示します。
+-----------+--------------+---------------+-----------------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | +-----------+--------------+---------------+-----------------+---------+ | 18 | new | tom | 2022-06-22 | 3 | +-----------+--------------+---------------+-----------------+---------+
NULL 値を許容する列の追加
ApsaraDB RDS for MySQL データベースウィンドウで次のコマンドを実行して、NULL 値を許容する列を追加します。
alter table `runoob_tbl` add COLUMN `add_col2` INT;次のコマンドを実行してデータを挿入します。
INSERT INTO runoob_tbl(`runoob_id`,`runoob_title`,`runoob_author`,`submission_date`,`add_col`,`add_col2`) values(1,'second','tom2','2022-06-23',1,2)StarRocks 接続ウィンドウで次のコマンドを実行して、テーブルデータを表示します。
select * from runoob_tbl1;次の出力が返されます。これは、スキーマが変更され、NULL 値を許容する列が追加されたことを示します。
+-----------+--------------+---------------+-----------------+---------+---------+ | runoob_id | runoob_title | runoob_author | submission_date | add_col | add_co2 | +-----------+--------------+---------------+-----------------+---------+---------+ | 1 | second | tom2 | 2022-06-23 | 1 | 2 | | 18 | new | tom | 2022-06-22 | 3 | NULL | +-----------+--------------+---------------+-----------------+---------+---------+
CDAS の概要
CREATE DATABASE AS (CDAS) 文は、CTAS 文の糖衣構文です。CDAS 文を使用すると、MySQL からデータベース全体を同期できます。つまり、Flink ジョブが生成されます。ソーステーブルは MySQL のデータベースであり、宛先テーブルは StarRocks の複数のテーブルです。また、including table 構文を使用して、CDAS 操作のためにデータベース内の一部のテーブルのみを選択することもできます。
CTAS 文の実行と同様に、CDAS 文を実行する前に、MySQL データベースと StarRocks データベースにカタログを作成する必要があります。サンプル文:
CREATE DATABASE IF NOT EXISTS sr_db with (
'starrocks.create.table.properties'=' buckets 8',
'starrocks.create.table.mode'='simple',
'jdbc-url'='jdbc:mysql://fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:9030',
'load-url'='fe-c-9b354c83e891****-internal.starrocks.aliyuncs.com:8030',
'username'='test',
'password' = '1qaz!QAZ',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
as DATABASEmysql.test_cdc including table
'tabl1','tbl2','tbl3' /*+ OPTIONS ( 'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc' )*/;