Realtime Compute for Apache Flink を使用すると、Apache Flink の Change Data Capture (CDC) コネクタに基づいて、ソースからシンクへのデータインジェスト用の YAML スクリプトを作成できます。このトピックでは、YAML を使用してデータインジェストパイプラインを開発し、MySQL データベースから StarRocks にすべてのデータを同期する方法について説明します。
前提条件
ワークスペースが作成されていること。詳細については、「ワークスペースの作成」をご参照ください。
アップストリームおよびダウンストリームのストレージシステムが準備できていること。
ApsaraDB RDS for MySQL インスタンスが作成されていること。詳細については、「ApsaraDB RDS for MySQL インスタンスの作成」をご参照ください。
E-MapReduce (EMR) Serverless StarRocks インスタンスが作成されていること。詳細については、「ステップ 1:コンピューティングとストレージが統合された StarRocks インスタンスの作成」をご参照ください。
説明ApsaraDB RDS for MySQL インスタンスと EMR Serverless StarRocks インスタンスは、ご利用の Realtime Compute for Apache Flink ワークスペースと同じ VPC (Virtual Private Cloud) に存在する必要があります。同じ VPC にない場合は、ネットワーク接続を確立し、ApsaraDB RDS for MySQL インスタンスの IP アドレスホワイトリストを設定する必要があります。詳細については、「Realtime Compute for Apache Flink はどのようにして VPC をまたいでサービスにアクセスしますか?」、「Realtime Compute for Apache Flink はどのようにしてインターネットにアクセスしますか?」、および「ホワイトリストの設定方法」をご参照ください。
背景情報
例えば、ApsaraDB RDS for MySQL インスタンスに `order_dw_mysql` という名前のデータベースがあるとします。`order_dw_mysql` データベースには、`orders`、`orders_pay`、`product_catalog` という 3 つのビジネス テーブルが作成されています。`order_dw_mysql` データベースのビジネス テーブルから StarRocks の `order_dw_sr` データベースにデータを同期するデータインジェストパイプラインを開発する場合は、次の手順を実行します。
ステップ 1:テストデータの準備
ApsaraDB RDS for MySQL インスタンスでデータベースを作成し、そのデータベース用のアカウントを作成します。
`order_dw_mysql` という名前のデータベースと、そのデータベースに対する読み取り/書き込み権限を持つ標準アカウントを作成します。詳細については、「アカウントとデータベースの作成」および「データベースの管理」をご参照ください。
Data Management (DMS) コンソールから ApsaraDB RDS for MySQL インスタンスにログインします。
詳細については、「DMS を使用して ApsaraDB RDS for MySQL インスタンスにログインする」をご参照ください。
[SQL コンソール] タブで、次の文を入力し、[実行] をクリックして、データベースに 3 つのビジネス テーブルを作成し、データを挿入します。
CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee numeric(20,2) not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); CREATE TABLE `orders_pay` ( pay_id bigint not null primary key, order_id bigint not null, pay_platform int not null, create_time timestamp not null ); CREATE TABLE `product_catalog` ( product_id bigint not null primary key, catalog_name varchar(50) not null ); -- Prepare data. INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee'); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1); INSERT INTO orders_pay VALUES (2001, 100001, 1, '2023-02-15 17:40:56'), (2002, 100002, 1, '2023-02-15 17:40:56'), (2003, 100003, 0, '2023-02-15 17:40:56'), (2004, 100004, 0, '2023-02-15 17:40:56'), (2005, 100005, 0, '2023-02-15 18:40:56'), (2006, 100006, 0, '2023-02-15 18:40:56'), (2007, 100007, 0, '2023-02-15 18:40:56');
ステップ 2:データインジェストジョブの開発
管理したいワークスペースを見つけ、[操作] 列の [コンソール] をクリックします。Realtime Compute for Apache Flink の開発コンソールの左側のナビゲーションウィンドウで、 を選択します。
をクリックします。ダイアログボックスで、[Data Ingestion From MySQL To Starrocks] をクリックし、[次へ] をクリックします。
[名前]、[場所]、[エンジンバージョン] を設定し、[OK] をクリックします。
ジョブコードを記述します。
次のコードは、ApsaraDB RDS for MySQL の `order_dw_mysql` データベースから StarRocks の `order_dw_sr` データベースにすべてのテーブルを同期する方法の例を示しています。
source: type: mysql hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com port: 3306 username: ${secret_values.mysqlusername} password: ${secret_values.mysqlpassword} tables: order_dw_mysql.\.* server-id: 5405-5415 sink: type: starrocks name: StarRocks Sink jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030 load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030 username: ${secret_values.starrocksusername} password: ${secret_values.starrockspassword} table.create.properties.replication_num: 1 sink.buffer-flush.interval-ms: 5000 # 5 秒ごとにデータをフラッシュします。 route: - source-table: order_dw_mysql.\.* sink-table: order_dw_sr.<> replace-symbol: <> description: route all tables in source_db to sink_db pipeline: name: Sync MySQL Database to StarRocks次の表に、この例の ApsaraDB RDS for MySQL および StarRocks インスタンスに必要な構成情報を示します。データインジェストに使用されるその他のオプションの詳細については、「MySQL」および「StarRocks」をご参照ください。
説明YAML スクリプトで開発されたデータインジェストジョブは、名前空間変数のみをサポートします。セキュリティリスクを防ぐため、認証情報をハードコーディングするのではなく、変数を使用することを推奨します。詳細については、「変数の管理」をご参照ください。
カテゴリ
オプション
説明
例
source
hostname
ApsaraDB RDS for MySQL データベースへのアクセスに使用する IP アドレスまたはホスト名。
データベースの VPC エンドポイントを入力することを推奨します。
rm-bp1rk934iidc3****.mysql.rds.aliyuncs.comport
ApsaraDB RDS for MySQL データベースへのアクセスに使用するポート番号。
3306
username
ApsaraDB RDS for MySQL データベースへのアクセスに使用するユーザー名とパスワード。「ステップ 1:テストデータの準備」で作成したアカウントのユーザー名とパスワードにオプションを設定します。
${secret_values.mysqlusername}password
${secret_values.mysqlpassword}tables
ApsaraDB RDS for MySQL テーブルの名前。正規表現を使用して、複数のテーブルからデータを読み取ることができます。
この例では、`order_dw_mysql` データベース内のすべてのテーブルとデータが同期されます。
order_dw_mysql.\.*
server-id
データベースクライアントに割り当てられる数値 ID。
5405-5415
sink
jdbc-url
データベースへの接続に使用される Java Database Connectivity (JDBC) URL。
JDBC URL には、特定の IP アドレスとフロントエンド (FE) の JDBC ポートが含まれます。このオプションには、
jdbc:mysql://ip:port形式で値を指定します。EMR コンソールの EMR Serverless StarRocks インスタンスの [インスタンス詳細] タブに移動して、FE 詳細セクションでインスタンスの [内部エンドポイント] と [クエリポート] を確認できます。
jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030load-url
FE ノードへの接続に使用される HTTP サービス URL。
EMR コンソールの EMR Serverless StarRocks インスタンスの [インスタンス詳細] タブに移動して、FE 詳細セクションでインスタンスの [内部エンドポイント] と [HTTP ポート] を確認できます。
fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030username
StarRocks データベースへのアクセスに使用するユーザー名とパスワード。
EMR Serverless StarRocks インスタンスを作成したときに使用したユーザー名とパスワードにオプションを設定します。
説明この例では、プレーンテキストのパスワード情報によるセキュリティリスクを防ぐために変数が使用されています。詳細については、「変数の管理」をご参照ください。
${secret_values.starrocksusername}password
${secret_values.starrockspassword}sink.buffer-flush.interval-ms
Flink がバッファーをフラッシュする頻度を指定します。この例ではデータセットが小さいため、結果生成を早めるために短い間隔が設定されています。
5000
route
source-table
ソーステーブルを指定します。
正規表現を使用して複数のテーブルを照合します。例えば、
order_dw_mysql.\.*はorder_dw_mysqlデータベース内のすべてのテーブルをルーティングします。order_dw_mysql.\.*
sink-table
データルーティングの宛先を指定します。
replace-symbolのシンボルを各ソーステーブル名のプレースホルダーとして使用して、多対多のルーティングを実装できます。ルーティング構成ルールの詳細については、「ルートモジュール」をご参照ください。
order_dw_sr.<>
replace-symbol
パターンマッチングが使用されるときに、ソーステーブルの名前で置き換えられる文字列。
<>
[デプロイ] をクリックします。
ステップ 3:ジョブの開始
[データインジェスト] ページの右上の [デプロイ] をクリックします。ダイアログボックスで、パラメーターを設定し、[確認] をクリックします。
Realtime Compute for Apache Flink の開発コンソールの左側のナビゲーションウィンドウで、 を選択します。[デプロイメント] ページで、対象のデプロイメントを見つけ、[操作] 列の [開始] をクリックします。
[ジョブの開始] パネルで、ジョブの起動設定を行い、[開始] をクリックします。
この例では、[初期モード] が選択されています。パラメーターの詳細については、「ジョブデプロイメントの開始」をご参照ください。ジョブを開始した後、[デプロイメント] ページでジョブデプロイメントのステータスと情報を確認できます。
ステップ 4:StarRocks での同期結果の確認
YAML デプロイメントが [実行中] 状態になった後、StarRocks でデータ同期のステータスを確認できます。
EMR StarRocks Manager を使用して EMR Serverless StarRocks インスタンスにアクセスします。詳細については、「EMR StarRocks Manager を使用して EMR Serverless StarRocks インスタンスにアクセスする」をご参照ください。
表示されたページの左側のナビゲーションウィンドウで、[SQL エディター] をクリックします。[データベース] タブで、
アイコンをクリックします。`default_catalog` に `order_dw_sr` という名前のデータベースが表示されます。
[クエリ] タブで、[+ ファイル] をクリックします。[ファイルの作成] ダイアログボックスで、パラメーターを設定して [クエリスクリプト] を作成します。クエリスクリプトの編集ページで、次の SQL 文を入力し、[実行] をクリックします。
SELECT * FROM default_catalog.order_dw_sr.orders order by order_id; SELECT * FROM default_catalog.order_dw_sr.orders_pay order by pay_id; SELECT * FROM default_catalog.order_dw_sr.product_catalog order by product_id;文の下に表示される同期結果を確認します。
ApsaraDB RDS for MySQL データベースのテーブルと同じ名前のテーブルと、そのテーブルのデータが StarRocks データベースにすでに存在しています。
