CREATE TABLE AS (CTAS) ステートメントは、StarRocks に一致するテーブルを自動的に作成し、MySQL ソースからデータとスキーマ変更を継続的に同期します。CREATE DATABASE AS (CDAS) ステートメントは、これをデータベース全体に拡張します。このトピックでは、Realtime Compute for Apache Flink で両方のステートメントを使用して、ApsaraDB RDS for MySQL から E-MapReduce (EMR) StarRocks クラスターにトランザクション処理 (TP) データを分析処理 (AP) のために移動する方法について説明します。
アーキテクチャの概要
| コンポーネント | ロール |
|---|---|
| ApsaraDB RDS for MySQL | ソース — 変更データキャプチャ (CDC) イベントとスキーマ変更の発生元 |
| Realtime Compute for Apache Flink | 処理エンジン — MySQL CDC を読み取り、StarRocks に書き込み |
| EMR StarRocks クラスター | シンク — 同期されたデータの分析送信先 |
仕組み
CTAS ステートメントを実行すると、Flink は次の 2 つの操作を順次実行します。
テーブル作成チェック — Flink は、StarRocks に送信先テーブルが存在するかどうかを確認します。
存在しない場合、Flink は送信先カタログに基づいて、ソースと同じスキーマを持つテーブルを作成します。
既に存在する場合、Flink は作成をスキップします。既存のスキーマがソーススキーマと異なる場合、エラーが返されます。
データ同期 — Flink は、ソーステーブルから送信先テーブルへのデータとスキーマ変更の両方を同期する継続的なジョブを開始します。
スキーマ変更の動作
CTAS は、スキーマ変更を伝播するための固定ポリシーを使用します。次の表は、自動的に伝播されるものとされないものをまとめたものです。
サポートされているスキーマ変更
| 変更タイプ | StarRocks での動作 |
|---|---|
| NULL 許容列の追加 | 列は送信先テーブルの末尾に追加され、入力データが列を埋めます。 |
| NULL 許容列の削除 | 列は StarRocks に保持されますが、NULL 値で埋められます。 |
| 列名の変更 | 新しい名前の列が末尾に追加され、古い名前の列は NULL 値で埋められます。たとえば、col_a を col_b に変更すると、col_b が末尾に追加され、col_a が NULL に設定されます。 |
サポートされていないスキーマ変更
ソーステーブルで次のいずれかの変更が発生した場合、StarRocks の送信先テーブルを削除し、CTAS ジョブを再起動します。Flink はテーブルを再作成し、すべての既存データを再同期します。
列のデータの型を変更する (例:
VARCHARからBIGINT、またはNOT NULLからNULLABLE)プライマリキーやインデックスなどの制約を変更する
NULL 不可列を追加または削除する
DDL (データ定義言語) ステートメントでフィールド長を調整する
CTAS は、連続するデータレコードのスキーマを比較することでスキーマ変更を検出します。DDL ステートメントのタイプは解析しません。結果として、次のようになります。
列が削除され、その間にデータ変更なしで再追加された場合、CTAS はこれをスキーマ変更なしとして扱います。
スキーマ変更は、変更後に新しいデータがソーステーブルに到着した場合にのみ伝播されます。
フィールドタイプのマッピングについては、「Apache Flink からデータを継続的にロードする」をご参照ください。
CTAS が複数の MySQL テーブルをマージする場合、Flink はソースを追跡するために、送信先テーブルに_db_nameおよび_table_name列を自動的に先頭に追加します。3 番目の列から独自の列順序を定義します。
前提条件
開始する前に、次のものがあることを確認してください。
vvr-6.0.3-flink-1.15 以降を実行している Flink 完全管理ワークスペース。詳細については、「Flink 完全管理をアクティブ化」および「Flink SQL デプロイメントの開始」をご参照ください。
EMR StarRocks クラスター。詳細については、「StarRocks クラスターを作成」をご参照ください。
MySQL 5.7 以降を実行している ApsaraDB RDS for MySQL インスタンス。詳細については、「ApsaraDB RDS for MySQL インスタンスを作成」をご参照ください。
このトピックの例では、MySQL 5.7、EMR 3.39.1、および Flink 完全管理 vvr-6.0.3-flink-1.15 を使用しています。
制限事項
Flink ワークスペース、StarRocks クラスター、および ApsaraDB RDS for MySQL インスタンスは、同じ VPC 内にある必要があります。
ApsaraDB RDS for MySQL エンジンバージョンは 5.7 以降である必要があります。
StarRocks クラスターのインターネットアクセスは有効にする必要があります。
Flink 完全管理は vvr-6.0.3-flink-1.15 以降である必要があります。
ステップ 1: テストデータの準備
ApsaraDB RDS for MySQL インスタンスにデータベースとアカウントを作成します。詳細については、「ApsaraDB RDS for MySQL インスタンスのデータベースとアカウントを作成」をご参照ください。テストアカウントに読み取り/書き込み権限を付与します。
このトピックでは、
test_cdcという名前のデータベースとtestという名前のアカウントを使用します。テストアカウントを使用して ApsaraDB RDS for MySQL インスタンスにログインします。詳細については、「DMS を使用して ApsaraDB RDS for MySQL インスタンスにログイン」をご参照ください。
テストテーブルを作成し、行を挿入します。
USE test_cdc; CREATE TABLE IF NOT EXISTS `runoob_tbl` ( `runoob_id` INT UNSIGNED AUTO_INCREMENT, `runoob_title` VARCHAR(100) NOT NULL, `runoob_author` VARCHAR(40) NOT NULL, `submission_date` DATE, `add_col` INT DEFAULT NULL, PRIMARY KEY (`runoob_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; INSERT INTO test_cdc.`runoob_tbl` (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`) VALUES (18, 'first', 'tom', '2022-06-22', 3);SSH 経由で StarRocks クラスターにログインします。詳細については、「クラスターにログイン」をご参照ください。
StarRocks に接続します。
mysql -h127.0.0.1 -P 9030 -urootこのチュートリアルに必要なユーザーを作成し、権限を付与します。
CREATE DATABASE test_cdc; CREATE USER 'test' IDENTIFIED BY '123456'; GRANT CREATE TABLE ON DATABASE test_cdc TO test;
ステップ 2: Flink SQL エディターでカタログを作成
Flink 完全管理コンソールの [ドラフトエディター] ページで、MySQL 用と StarRocks 用のカタログをそれぞれ 1 つ作成します。「Flink SQL デプロイメントの開始」をご参照ください。
以下のパラメーター値は例です。ご利用の環境に合わせて調整してください。
MySQL カタログ
CREATE CATALOG mysql WITH (
'type' = 'mysql',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'default-database' = 'test_cdc'
);| パラメーター | 説明 |
|---|---|
type | カタログタイプ。mysql に設定します。 |
hostname | ApsaraDB RDS for MySQL インスタンスの内部エンドポイントです。ApsaraDB RDS コンソールの [データベース接続] ページからコピーしてください(例: rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com)。 |
port | MySQL データベースのポート。デフォルト: 3306。 |
username | ステップ 1: テストデータの準備で作成したユーザー名。この例では、test です。 |
password | ステップ 1: テストデータの準備で作成したユーザー名のパスワード。 |
default-database | ステップ 1: テストデータの準備で作成したデータベース名。この例では、test_cdc です。 |
StarRocks カタログ
CREATE CATALOG sr WITH (
'type' = 'starrocks',
'endpoint' = '172.16.**.**:9030',
'username' = 'test',
'password' = '123456',
'dbname' = 'test_cdc'
);| パラメーター | 説明 |
|---|---|
type | カタログタイプ。starrocks に設定します。 |
endpoint | StarRocks フロントエンドの IP アドレスとポート (例: 172.16.**.**:9030)。 |
username | ステップ 1: テストデータの準備で作成したユーザー名。この例では、test です。 |
password | ステップ 1: テストデータの準備で作成したユーザー名のパスワード。 |
dbname | StarRocks データベース名。この例では、test_cdc です。 |
ステップ 3: CTAS デプロイメントの記述と公開
[ドラフトエディター] ページで、CTAS 文を記述します。配信モードは 3 つあり、一貫性の要件に基づいて選択します。
At-least-once セマンティクス (低レイテンシーシナリオに推奨)
データは構成可能なフラッシュ間隔で書き込まれます。メモリ使用量は少ないですが、障害発生時に重複書き込みが発生する可能性があります。
/* At-least-once semantics */
USE CATALOG sr;
CREATE TABLE IF NOT EXISTS runoob_tbl_sr WITH (
'starrocks.create.table.properties' = 'engine = olap primary key(runoob_id) distributed by hash(runoob_id) buckets 8',
'database-name' = 'test_cdc',
'jdbc-url' = 'jdbc:mysql://172.16.**.**:9030',
'load-url' = '172.16.**.**:18030',
'table-name' = 'runoob_tbl_sr',
'username' = 'test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc',
'table-name' = 'runoob_tbl'
) */;1 回限りのセマンティクス (データが重要なシナリオに推奨)
障害発生時にデータ損失や重複はありません。データの可視性はチェックポイント間隔に依存します。
/* Exactly-once semantics */
SET 'execution.checkpointing.interval' = '1 min';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.timeout' = '10 min';
USE CATALOG sr;
CREATE TABLE IF NOT EXISTS runoob_tbl WITH (
'starrocks.create.table.properties' = 'engine = olap primary key(runoob_id) distributed by hash(runoob_id) buckets 8',
'database-name' = 'test_cdc',
'jdbc-url' = 'jdbc:mysql://172.16.**.**:9030',
'load-url' = '172.16.**.**:18030',
'table-name' = 'runoob_tbl',
'username' = 'test',
'password' = '123456',
'sink.semantic' = 'exactly-once',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc',
'table-name' = 'runoob_tbl'
) */;チェックポイントの構成オプションについては、「チェックポイント」をご参照ください。
シンプルモード (クイックセットアップに推奨)
Flink は MySQL スキーマからテーブル定義を推論します。エンジン、キー、またはディストリビューションを手動で指定する必要はありません。シンプルモードではパーティションテーブルはサポートされていません。代わりに通常モードを使用してパーティションを作成してください。
/* Simple mode */
USE CATALOG sr;
CREATE TABLE IF NOT EXISTS runoob_tbl1 WITH (
'starrocks.create.table.properties' = 'buckets 8',
'starrocks.create.table.mode' = 'simple',
'database-name' = 'test_cdc',
'jdbc-url' = 'jdbc:mysql://172.16.**.**:9030',
'load-url' = '172.16.**.**:18030',
'table-name' = 'runoob_tbl_sr',
'username' = 'test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
AS TABLE mysql.test_cdc.runoob_tbl
/*+ OPTIONS (
'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc',
'table-name' = 'runoob_tbl'
) */;WITH 句パラメーター
| パラメーター | 必須 | 説明 |
|---|---|---|
starrocks.create.table.properties | はい | StarRocks の CREATE TABLE ステートメントのサフィックス定義 (フィールド定義を除く) — 例: engine、key、buckets。 |
database-name | はい | StarRocks データベース名。 |
jdbc-url | はい | StarRocks クエリ用の Java Database Connectivity (JDBC) URL — 例: jdbc:mysql://172.16.**.**:9030 (172.16.**.** は StarRocks クラスターの内部 IP アドレスです)。 |
load-url | はい | StarRocks フロントエンドの内部 IP アドレスと HTTP ポート。EMR クラスターのバージョンに基づいてポートを選択します: EMR V5.9.0 以降 (マイナーバージョン) および EMR V3.43.0 以降 (マイナーバージョン) の場合は 18030。EMR V5.8.0、EMR V3.42.0、またはそれ以前の場合は 8030UI とポートへのアクセス。詳細については、「」をご参照ください。 |
sink.semantic | いいえ | 配信セマンティクス: at-least-once (デフォルト) または exactly-once。 |
starrocks.create.table.mode | いいえ | normal (デフォルト): starrocks.create.table.properties で engine、key、および buckets を指定します。simple: Flink は engine=olap を設定し、MySQL プライマリキーを使用し、すべてのプライマリキー列にわたってハッシュで分散します。starrocks.create.table.properties では buckets のみが必要です。パーティションは作成されません。 |
お使いの Flink バージョンが vvr-6.0.5-flink-1.15 より前の場合、WITH 句に 'sink.use.new-apiapi' = 'false' を追加します。追加の sink パラメーターについては、「Apache Flink からデータを継続的にロードする」をご参照ください。OPTIONS 句パラメーター
OPTIONS 句は MySQL CDC ソースを構成します。
| パラメーター | 説明 |
|---|---|
connector | コネクタタイプ。mysql-cdc に設定します。 |
hostname | ApsaraDB RDS for MySQL インスタンスの内部エンドポイント (例: rm-bp1nu0c46fn9k****.mysql.rds.aliyuncs.com)。 |
port | MySQL ポート。デフォルト: 3306。 |
username | ApsaraDB RDS for MySQL アクセス用のユーザー名。ステップ 1: テストデータの準備で作成したアカウントを使用します。 |
password | ApsaraDB RDS for MySQL アクセス用のパスワード。 |
table-name | ソーステーブル名。この例では、runoob_tbl です。 |
database-name | ソースデータベース名。この例では、test_cdc です。 |
デプロイメントの公開と開始
[詳細設定] タブの [下書きエディタ] ページで、[エンジンバージョン] を
vvr-6.0.3-flink-1.15以降に設定します。「下書きエディタ」ページの右上隅で、[公開] をクリックします。
[デプロイメント] ページで、新しいデプロイメントを見つけ、[操作] 列の [開始] をクリックします。
ステップ 4: 同期の検証
ジョブ開始後、次のシナリオを実行して、データ変更とスキーマ変更の両方が StarRocks にリアルタイムで伝播されることを確認します。
検証クエリを実行する前に StarRocks に接続します。
mysql -h127.0.0.1 -P 9030 -uroot
USE test_cdc;初期データの検証
StarRocks テーブルをクエリして、シード行が同期されたことを確認します。
SELECT * FROM runoob_tbl1;期待される出力:
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
| 18 | first | tom | 2022-06-22 | 3 |
+-----------+--------------+---------------+-----------------+---------+INSERT の検証
ApsaraDB RDS for MySQL インスタンスの [SQL Console] タブで、行を挿入します:
INSERT INTO runoob_tbl (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`)
VALUES (1, 'second', 'tom2', '2022-06-23', 1);両方の行が表示されることを確認するために StarRocks をクエリします。
SELECT * FROM runoob_tbl1;期待される出力:
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
| 1 | second | tom2 | 2022-06-23 | 1 |
| 18 | new | tom | 2022-06-22 | 3 |
+-----------+--------------+---------------+-----------------+---------+UPDATE の検証
MySQL SQL コンソールで、行を更新します。
UPDATE runoob_tbl SET runoob_title = 'new' WHERE runoob_id = 18;変更を確認するために StarRocks をクエリします。
SELECT * FROM runoob_tbl1;期待される出力:
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
| 1 | second | tom2 | 2022-06-23 | 1 |
| 18 | new | tom | 2022-06-22 | 3 |
+-----------+--------------+---------------+-----------------+---------+DELETE の検証
MySQL SQL コンソールで、行を削除します。
DELETE FROM runoob_tbl WHERE runoob_id = 1;行が削除されたことを確認するために StarRocks をクエリします。
SELECT * FROM runoob_tbl1;期待される出力:
+-----------+--------------+---------------+-----------------+---------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col |
+-----------+--------------+---------------+-----------------+---------+
| 18 | new | tom | 2022-06-22 | 3 |
+-----------+--------------+---------------+-----------------+---------+スキーマ変更の検証: NULL 許容列の追加
MySQL SQL コンソールで、NULL 許容列を追加し、その列に値を持つ行を挿入します。
ALTER TABLE `runoob_tbl` ADD COLUMN `add_col2` INT;
INSERT INTO runoob_tbl (`runoob_id`, `runoob_title`, `runoob_author`, `submission_date`, `add_col`, `add_col2`)
VALUES (1, 'second', 'tom2', '2022-06-23', 1, 2);新しい列が表示され、既存の行が NULL を示すことを確認するために StarRocks をクエリします。
SELECT * FROM runoob_tbl1;期待される出力:
+-----------+--------------+---------------+-----------------+---------+----------+
| runoob_id | runoob_title | runoob_author | submission_date | add_col | add_col2 |
+-----------+--------------+---------------+-----------------+---------+----------+
| 18 | new | tom | 2022-06-22 | 3 | NULL |
| 1 | second | tom2 | 2022-06-23 | 1 | 2 |
+-----------+--------------+---------------+-----------------+---------+----------+add_col2 列は、それを含む最初のデータ行が到着したときに StarRocks に自動的に追加されました。
CDAS: データベース全体の同期
CREATE DATABASE AS (CDAS) ステートメントは CTAS の糖衣構文です。これは、MySQL データベースから StarRocks へ、選択されたすべてのテーブルを一度に同期する 1 つの Flink デプロイメントを作成します。INCLUDING TABLE 句を使用して、名前で特定のテーブルを選択します。
まず MySQL および StarRocks カタログを作成します (ステップ 2 と同じ)。次に、以下を実行します。
CREATE DATABASE IF NOT EXISTS sr_db WITH (
'starrocks.create.table.properties' = 'buckets 8',
'starrocks.create.table.mode' = 'simple',
'jdbc-url' = 'jdbc:mysql://172.16.**.**:9030',
'load-url' = '172.16.**.**:18030',
'username' = 'test',
'password' = '123456',
'sink.buffer-flush.interval-ms' = '5000',
'sink.properties.row_delimiter' = '\x02',
'sink.properties.column_separator' = '\x01'
)
AS DATABASE mysql.test_cdc INCLUDING TABLE 'tbl1', 'tbl2', 'tbl3'
/*+ OPTIONS (
'connector' = 'mysql-cdc',
'hostname' = 'rm-2zepd6e20u3od****.mysql.rds.aliyuncs.com',
'port' = '3306',
'username' = 'test',
'password' = '123456',
'database-name' = 'test_cdc'
) */;INCLUDING TABLE 句は、テーブル名のコンマ区切りリストを受け入れます。ソースデータベース内のすべてのテーブルを同期するには、これを省略します。
次のステップ
Apache Flink からデータを継続的にロードする — StarRocks Flink コネクタの完全なパラメーターリファレンス
チェックポイント — 1 回限りのセマンティクスのためのチェックポイント間隔およびタイムアウトを設定します。
UI およびポートへのアクセス — お使いの EMR クラスターのバージョンに適した
load-urlポートを確認してください