Realtime Compute for Apache Flink は、強力なリアルタイムデータインジェスト機能を提供します。自動的な完全および増分スイッチオーバー、自動的なメタデータディスカバリー、自動的なスキーマ進化同期、データベース全体の同期などの機能により、リアルタイムインジェストパイプラインが簡素化されます。これにより、リアルタイムのデータ同期がより効率的で使いやすくなります。このトピックでは、MySQL から Hologres にデータを移動するデータインジェストジョブを迅速に構築する方法を説明します。
背景情報
ご利用の MySQL インスタンスに、異なるスキーマを持つ 24 のビジネステーブルを含む `tpc_ds` という名前のデータベースがあると仮定します。また、`user_db1`、`user_db2`、`user_db3` という 3 つのデータベースも含まれています。シャーディングのため、これらの各データベースには、同じスキーマを持つ 3 つのテーブルが含まれています。これらを合わせると、`user01` から `user09` までの 9 つのテーブルが含まれます。次の図は、Alibaba Cloud DMS コンソールで表示した MySQL のデータベースとテーブルを示しています。
これらすべてのテーブルとそのデータを Hologres に同期し、シャーディングされたユーザーテーブルを単一の Hologres テーブルにマージするデータインジェストジョブを構築するには、次のステップに従います。
このトピックでは、Flink CDC データインジェストジョブ開発 (パブリックプレビュー) 機能を使用して、データベース全体の同期と、シャーディングされたテーブルの単一のターゲットテーブルへのマージを実行します。この機能は、ワンクリックでの完全および増分同期とリアルタイムのスキーマ進化同期もサポートしています。
前提条件
Resource Access Management (RAM) ユーザーまたは RAM ロールを使用してサービスにアクセスする場合、Flink コンソールに必要な権限があることを確認してください。詳細については、「権限管理」をご参照ください。
Flink ワークスペースを作成します。詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。
データソースとシンク
ApsaraDB RDS for MySQL インスタンスを作成します。詳細については、「(非推奨、"ステップ 1" にリダイレクト) ApsaraDB RDS for MySQL インスタンスを迅速に作成」をご参照ください。
Hologres インスタンスを作成済みであること。詳細については、「Hologres インスタンスの購入」をご参照ください。
説明ApsaraDB RDS for MySQL インスタンスと Hologres インスタンスは、ご利用の Flink ワークスペースと同じリージョンおよび VPC にある必要があります。そうでない場合は、ネットワーク接続を構成する必要があります。詳細については、「VPC をまたいで他のサービスにアクセスするにはどうすればよいですか?」および「インターネットにアクセスするにはどうすればよいですか?」をご参照ください。
テストデータを準備し、IP ホワイトリストを構成します。詳細については、「MySQL テストデータと Hologres データベースの準備」および「IP ホワイトリストの構成」をご参照ください。
MySQL テストデータと Hologres データベースの準備
tpc_ds.sql、user_db1.sql、user_db2.sql、user_db3.sql をクリックして、テストデータをローカルマシンにダウンロードします。
DMS Data Management コンソールで、ApsaraDB RDS for MySQL インスタンスのテストデータを準備します。
DMS を使用して ApsaraDB RDS for MySQL インスタンスにログインします。
詳細については、「(非推奨、"ステップ 2" にリダイレクト) DMS を使用して ApsaraDB RDS for MySQL にログイン」をご参照ください。
SQLConsole ウィンドウで、次のコマンドを入力し、[実行] をクリックします。
tpc_ds、user_db1、user_db2、user_db3 の 4 つのデータベースを作成します。
CREATE DATABASE tpc_ds; CREATE DATABASE user_db1; CREATE DATABASE user_db2; CREATE DATABASE user_db3;上部のショートカットメニューバーで、[データインポート] をクリックします。
[バッチデータインポート] タブで、ターゲットデータベースを選択し、対応する SQL ファイルをアップロードし、[申請を送信] をクリックし、次に [変更を実行] をクリックします。表示されるダイアログボックスで、[実行を確認] をクリックします。
tpc_ds、user_db1、user_db2、user_db3 の各データベースに対してこのプロセスを繰り返し、それぞれのデータファイルをインポートします。

Hologres コンソールで、マージされたユーザーテーブルデータを格納するために `my_user` という名前のデータベースを作成します。
詳細については、「データベースの作成」をご参照ください。
IP ホワイトリストの構成
Flink が ApsaraDB RDS for MySQL および Hologres インスタンスにアクセスできるようにするには、Flink ワークスペースの CIDR ブロックを両方のインスタンスの IP ホワイトリストに追加する必要があります。
Flink ワークスペースの VPC CIDR ブロックを取得します。
Realtime Compute コンソールにログインします。
[ワークスペース] リストで、ターゲットのワークスペースを見つけます。[アクション] 列で、 を選択します。
[ワークスペース詳細] ダイアログボックスで、Flink が使用する [vSwitch] の [VPC CIDR ブロック] を表示します。

Flink の CIDR ブロックを ApsaraDB RDS for MySQL インスタンスの IP ホワイトリストに追加します。
詳細については、「IP ホワイトリストの構成」をご参照ください。

Flink の CIDR ブロックを Hologres インスタンスの IP ホワイトリストに追加します。
HoloWeb でデータ接続を構成する場合、接続の IP ホワイトリストを構成する前に、[ログイン方法] を [現在のユーザーのパスワードなしログイン] に設定する必要があります。詳細については、「IP ホワイトリスト」をご参照ください。

ステップ 1:データ同期ジョブの開発
Flink 開発コンソールにログインし、新しいジョブを作成します。
ページで、[新規] をクリックします。
[空のデータインジェストドラフト] をクリックします。
Flink は多くのコードテンプレートを提供しています。各テンプレートには、ユースケース、サンプルコード、および使用ガイダンスが含まれています。テンプレートをクリックして、Flink の機能と構文について学び、ビジネスロジックを実装できます。
[次へ] をクリックします。
[新規データインジェストジョブドラフト] ダイアログボックスで、構成を指定します。
ジョブパラメーター
説明
例
[ファイル名]
ジョブの名前。
説明ジョブ名は現在のプロジェクト内で一意である必要があります。
flink-test
ストレージの場所
ジョブコードファイルが保存されるフォルダ。
既存のフォルダの右側にある
アイコンをクリックして、サブフォルダを作成することもできます。ジョブドラフト
エンジンバージョン
ジョブが使用する Flink エンジンのバージョン。バージョン番号、バージョンマッピング、およびライフサイクルマイルストーンの詳細については、「エンジンバージョンの概要」をご参照ください。
vvr-11.1-jdk11-flink-1.20
[OK] をクリックします。
次のジョブコードをジョブエディターにコピーします。
このジョブは、tpc_ds データベースのすべてのテーブルを Hologres に同期し、シャーディングされたユーザーテーブルを単一の Hologres テーブルにマージします。サンプルコード:
source: type: mysql name: MySQL Source hostname: localhost port: 3306 username: username password: password tables: tpc_ds.\.*,user_db[0-9]+.user[0-9]+ server-id: 8601-8604 # (オプション) テーブルと列のコメントを同期します。 include-comments.enabled: true # (オプション) TaskManager の OutOfMemory エラーを回避するために、無制限のチャンク分散を優先します。 scan.incremental.snapshot.unbounded-chunk-first.enabled: true # (オプション) 読み取りを高速化するために、解析フィルターを有効にします。 scan.only.deserialize.captured.tables.changelog.enabled: true sink: type: hologres name: Hologres Sink endpoint: ****.hologres.aliyuncs.com:80 dbname: cdcyaml_test username: ${secret_values.holo-username} password: ${secret_values.holo-password} sink.type-normalize-strategy: BROADEN route: # シャーディングされたユーザーテーブルを my_user.users テーブルにマージします。 - source-table: user_db[0-9]+.user[0-9]+ sink-table: my_user.users説明MySQL の tpc_ds データベース内のすべてのテーブルは、ダウンストリームデータベースの同じ名前のテーブルに直接マッピングされます。route セクションで追加のマッピングは必要ありません。それらを ods_tps_ds などの別のデータベースに同期するには、route セクションを次のように構成します。
route: # シャーディングされたユーザーテーブルを my_user.users テーブルにマージします。 - source-table: user_db[0-9]+.user[0-9]+ sink-table: my_user.users # tpc_ds データベースのすべてのテーブルを ods_tps_ds データベースに同期します。 - source-table: tpc_ds.\.* sink-table: ods_tps_ds.<> replace-symbol: <>
ステップ 2:ジョブの開始
ページで、[デプロイ] をクリックします。表示されるダイアログボックスで、[確認] をクリックします。

ページで、ターゲットジョブの横にある [アクション] をクリックし、次に [開始] をクリックします。詳細については、「ジョブの開始」をご参照ください。
[開始] をクリックします。
ジョブが開始されると、ジョブオペレーションページでそのステータスとランタイム情報を監視できます。

ステップ 3:完全データ同期結果のモニタリング
-
Hologres 管理コンソールにログインします。
[メタデータ管理] タブで、tpc_ds データベース内の 24 のテーブルとそのデータを表示します。

[メタデータ管理] タブで、my_user データベースの users テーブルのスキーマを表示できます。
同期されたスキーマとデータは次の図に示されています。
スキーマ

users テーブルのスキーマには、`_db_name` と `_table_name` という 2 つの追加列が含まれています。これらの列は、ソースデータベースとテーブル名を記録します。また、シャーディングされたテーブルがマージされた後の一意性を確保するために、複合プライマリキーの一部を形成します。
テーブルデータ
users テーブルの詳細ページの右上隅で、[テーブルをクエリ] をクリックし、次のコマンドを入力して、[実行] をクリックします。
select * from users order by _db_name,_table_name,id;クエリ結果は次の図に示されています。

ステップ 4:増分同期結果のモニタリング
完全データ同期が完了すると、ジョブは自動的に増分同期に切り替わります。手動での介入は必要ありません。[モニタリングとアラート] タブの currentEmitEventTimeLag メトリックを確認して、現在の同期フェーズを判断できます。
Realtime Compute コンソールにログインします。
対象のワークスペースを検索し、[コンソール] を [操作] 列でクリックします。
ページで、ターゲットジョブの名前をクリックします。
[モニタリングとアラート] タブ (または [データ曲線] タブ) をクリックします。
currentEmitEventTimeLag 曲線を確認して、同期フェーズを特定します。

値が 0 の場合は、完全同期がまだ進行中であることを示します。
値が 0 より大きい場合は、増分同期が開始されたことを示します。
データとスキーマ変更のリアルタイム同期を検証します。
MySQL CDC ソースは、増分同期中のデータとスキーマ変更のリアルタイム同期をサポートしています。ジョブが増分同期フェーズに入った後、MySQL のシャーディングされたユーザーテーブルのスキーマとデータを変更して、この機能を検証できます。
DMS を使用して ApsaraDB RDS for MySQL インスタンスにログインします。
詳細については、「(非推奨、"ステップ 2" にリダイレクト) DMS を使用して ApsaraDB RDS for MySQL にログイン」をご参照ください。
user_db2 データベースで、次のコマンドを実行して user02 テーブルのスキーマを変更し、データを挿入および更新します。
USE DATABASE `user_db2`; ALTER TABLE `user02` ADD COLUMN `age` INT; -- age 列を追加します。 INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30); -- age を含むデータを挿入します。 UPDATE `user05` SET name='JARK' WHERE id=15; -- 別のテーブルを更新し、名前を大文字に変更します。Hologres コンソールで、users テーブルのスキーマとデータの変更を確認します。
users テーブルの詳細ページの右上隅で、[テーブルをクエリ] をクリックし、次のコマンドを入力して [実行] をクリックします。
select * from users order by _db_name,_table_name,id;クエリ結果は次の図に示されています。
シャーディングされたテーブルのスキーマが異なっていても、user02 テーブルに加えられたスキーマとデータの変更は、ダウンストリームの users テーブルにリアルタイムで同期されます。Hologres では、users テーブルに新しい age 列、挿入された Tony レコード、および更新された JARK レコードが含まれるようになりました。
(オプション) ステップ 5:ジョブリソースの構成
データ量はさまざまです。ジョブのパフォーマンスを最適化するために、並列度と TaskManager リソースを調整できます。リソース構成を使用して、ジョブの並列度とメモリまたは CU の割り当てを調整できます。
[]ページで、対象のジョブの名前をクリックします。
[デプロイメント詳細] タブで、[リソース構成] をクリックし、右上隅の [編集] をクリックします。
TaskManager のメモリや並列度などのリソースパラメーターを手動で設定します。
[リソース構成] セクションの右上隅で、[保存] をクリックします。
ジョブを再起動します。
リソース構成の変更は、ジョブを再起動した後にのみ有効になります。
参照
データインジェストモジュールの構文の詳細については、「Flink CDC データインジェストジョブ開発リファレンス」をご参照ください。
データインジェストジョブの実行中に例外が発生した場合は、「データインジェストジョブの一般的な問題と解決策」をご参照ください。