MaxCompute は、新バージョンの Flink Change Data Capture (CDC) コネクタを提供しています。 Flink CDC コネクタを使用すると、MySQL などのデータソースから MaxCompute 標準テーブルまたは Delta テーブルにデータをリアルタイムで同期できます。 このトピックでは、新バージョンの Flink CDC コネクタを使用して MaxCompute にデータを同期する方法について説明します。
Flink CDC の背景情報
Flink CDC は、リアルタイムのデータ統合のためのエンドツーエンドのオープンソースツールです。 これは、完全な機能を備えた API のセットを定義し、データを処理するための抽出、変換、書き出し (ETL) タスクフレームワークを提供します。 Apache Flink ジョブを実行して、Flink CDC が提供する機能を使用できます。 詳細については、「Flink CDC へようこそ」をご参照ください。 Apache Flink と緊密に統合され、その機能を活用する Flink CDC は、次のコア機能を提供します。
エンドツーエンドのデータ統合フレームワークを提供します。
データ統合ユーザーが効率的にジョブを構築するための API を提供します。
ソースまたはシンク内の複数のテーブルを処理します。
データベースからのすべてのデータの同期をサポートします。
スキーマ進化をサポートします。
前提条件
MaxCompute プロジェクトが作成されていること。 詳細については、「MaxCompute プロジェクトを作成する」をご参照ください。
注意事項
Flink CDC コネクタは、テーブルを自動的に作成できます。 コネクタを使用してデータを同期する場合、MaxCompute テーブルとソーステーブル間の場所とデータ型は自動的にマッピングされます。 ソーステーブルにプライマリキーがある場合、Delta テーブルが自動的に作成されます。 ソーステーブルにプライマリキーがない場合、MaxCompute 標準テーブルが作成されます。 ソーステーブルと MaxCompute テーブル間の場所とデータ型のマッピングの詳細については、「テーブルの場所のマッピング」および「データ型のマッピング」をご参照ください。
データが MaxCompute 標準テーブルに書き込まれる場合、システムは
DELETE操作を無視します。UPDATE操作はINSERT操作と見なされます。少なくとも 1 回のセマンティクスのみがサポートされています。 冪等書き込みは、Delta テーブルのプライマリキーに基づいて Delta テーブルに実装できます。
ソーステーブルのスキーマ変更は、MaxCompute テーブルに同期できます。
新しい列は、最後の列としてのみ MaxCompute テーブルに追加できます。
列のデータ型は、元のデータ型と互換性のあるデータ型にのみ変更できます。 データ型間の変換の詳細については、「列のデータ型を変更する」をご参照ください。
はじめに
このトピックでは、Flink CDC パイプラインを使用して、MySQL から MaxCompute に変更データを同期するためのストリーミング ETL ジョブを開発する方法について説明します。 Flink CDC パイプラインでは、データベース内のすべてのデータ、テーブルスキーマの変更、およびシャーディングされたデータベース内のテーブルからのデータを同期できます。
環境を準備する
スタンドアロンモードでデプロイされた Flink クラスタを準備する
flink-1.18.0-bin-scala_2.12.tgz パッケージをダウンロードし、パッケージを解凍して
flink-1.18.0ディレクトリを取得します。flink-1.18.0ディレクトリに移動し、次のコマンドを実行して flink-1.18.0 のインストールディレクトリを FLINK_HOME に設定します。export FLINK_HOME=$(pwd)$flink-1.18.0/confディレクトリでvim flink-conf.yamlコマンドを実行し、構成ファイルに次のパラメータを追加して、ファイルを保存します。# チェックポイント機能を有効にします。 3 秒ごとにチェックポイントを実行します。 # この構成はテスト目的でのみ提供されています。 ジョブのチェックポイント間隔は 30 秒以上に設定することをお勧めします。 execution.checkpointing.interval: 3000 # flink-cdc-pipeline-connector-maxcompute は、データ同期の Flink の通信メカニズムに依存しています。 # Flink 通信のタイムアウト期間を延長します。 pekko.ask.timeout: 60s次のコマンドを実行して、Flink クラスタを起動します。
./bin/start-cluster.sh起動が成功した場合、Web ブラウザのアドレスバーに http://localhost:8081/ と入力して、Flink Web UI にアクセスできます。 8081 はデフォルトのポート番号です。
start-cluster.sh コマンドを複数回実行して、複数の TaskManager を並列に実行できます。
MySQL 環境を準備する
この例では、Docker Compose を使用して MySQL 環境を準備します。
Docker イメージを起動し、
docker-compose.yamlという名前のファイルを作成します。 次のコードはファイルの内容を示しています。version: '2.1' services: mysql: image: debezium/example-mysql:1.1 ports: - "3306:3306" environment: - MYSQL_ROOT_PASSWORD=123456 - MYSQL_USER=mysqluser - MYSQL_PASSWORD=mysqlpw次の表にパラメータを示します。
パラメータ
説明
version
Docker のバージョン。
image
Docker イメージのバージョン。 値を debezium/example-mysql:1.1 に設定します。
ports
MySQL インスタンスのポート番号。
environment
MySQL インスタンスのユーザー名とパスワード。
この例では、製品情報を含む MySQL データベース app_db が Docker Compose で使用されます。
docker-compose.yaml ファイルが格納されているディレクトリで次のコマンドを実行して、必要なコンポーネントを起動します。
docker-compose up -dこのコマンドは、Docker Compose 構成で定義されているすべてのコンテナをデタッチモードで自動的に起動します。
docker psコマンドを実行して、コンテナが起動されているかどうかを確認できます。
MySQL データベースでデータを準備する
次のコマンドを実行して MySQL コンテナにアクセスします。
docker-compose exec mysql mysql -uroot -p123456MySQL でデータベースを作成し、テーブルデータを準備します。
データベースを作成します。
CREATE DATABASE app_db; USE app_db;テーブルデータを準備します。
orders という名前のテーブルを作成し、テーブルにデータを挿入します。
CREATE TABLE `orders` ( `id` INT NOT NULL, `price` DECIMAL(10,2) NOT NULL, PRIMARY KEY (`id`) ); -- テーブルにデータを挿入します。 INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00); INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);shipments という名前のテーブルを作成し、テーブルにデータを挿入します。
CREATE TABLE `shipments` ( `id` INT NOT NULL, `city` VARCHAR(255) NOT NULL, PRIMARY KEY (`id`) ); -- テーブルにデータを挿入します。 INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing'); INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');products という名前のテーブルを作成し、テーブルにデータを挿入します。
-- CREATE TABLE `products` ( `id` INT NOT NULL, `product` VARCHAR(255) NOT NULL, PRIMARY KEY (`id`) ); -- テーブルにデータを挿入します。 INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer'); INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap'); INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');
Flink CDC CLI を使用して YAML ファイルを送信する
必要な JAR パッケージをダウンロードします。
flink-cdc パッケージ
flink-cdc ページに移動して、バイナリ圧縮パッケージ flink-cdc-3.1.1-bin.tar.gz をダウンロードし、パッケージを解凍して
flink-cdc-3.1.1ディレクトリを取得します。 flink-cdc-3.1.1 ディレクトリには、bin、lib、log、および conf ディレクトリが含まれています。 次に、これらのディレクトリ内のファイルを flink-1.18.0 の関連ディレクトリに移動します。コネクタパッケージ
次のコネクタパッケージをダウンロードし、
flink-1.18.0/libディレクトリに移動します。説明ダウンロードリンクをクリックすると、リリースされたコネクタバージョンのみをダウンロードできます。 SNAPSHOT バージョンを参照する場合は、マスターまたはリリースブランチに基づいて、オンプレミス マシンでバージョンのソースコードをコンパイルする必要があります。
ドライバパッケージ
MySQL Connector/J パッケージをダウンロードし、--jar パラメータを使用して Flink CDC CLI に渡すか、
$flink-1.18.0/libディレクトリに配置して Flink クラスタを再起動します。 これは、CDC コネクタにこれらのドライバが含まれなくなったためです。
タスク構成の YAML ファイルを作成します。 次のサンプルコードは、データベース同期のファイル
mysql-to-maxcompute.yamlの例を示しています。################################################################################ # 説明:MySQL のすべてのテーブルを MaxCompute に同期する ################################################################################ source: type: mysql hostname: localhost port: 3306 username: root password: 123456 tables: app_db.\.* server-id: 5400-5404 server-time-zone: UTC # accessId、accessKey、エンドポイント、およびプロジェクトパラメータを構成します。 sink: type: maxcompute name: MaxComputeSink accessId: ${your_accessId} accessKey: ${your_accessKey} endpoint: ${your_maxcompute_endpoint} project: ${your_project} bucketsNum: 8 pipeline: name: MySQL データベースを MaxCompute に同期する parallelism: 1パラメータ:
source セクションのパラメータの詳細については、「MySQL コネクタ」をご参照ください。
sink セクションのパラメータの詳細については、「コネクタの構成項目」をご参照ください。
次のコマンドを実行して、スタンドアロンモードでデプロイされた Flink クラスタにタスクを送信します。
./bin/flink-cdc.sh mysql-to-maxcompute.yamlタスクが送信されると、次の結果が返されます。
パイプラインはクラスタに送信されました。 ジョブ ID:f9f9689866946e25bf151ecc179ef46f ジョブの説明:MySQL データベースを MaxCompute に同期するFlink Web UI では、
MySQL データベースを MaxCompute に同期するという名前のタスクが実行されています。MaxCompute で次の SQL ステートメントを実行して、orders、shipments、および products テーブルが作成され、テーブルにデータを書き込むことができるかどうかを確認します。
-- orders テーブルをクエリします。 read orders; -- 次の結果が返されます。 +------------+------------+ | id | price | +------------+------------+ | 1 | 4 | | 2 | 100 | +------------+------------+ -- shipments テーブルをクエリします。 read shipments; -- 次の結果が返されます。 +------------+------------+ | id | city | +------------+------------+ | 1 | beijing | | 2 | xian | +------------+------------+ -- products テーブルをクエリします。 read products; -- 次の結果が返されます。 +------------+------------+ | id | product | +------------+------------+ | 3 | Peanut | | 1 | Beer | | 2 | Cap | +------------+------------+
変更をリアルタイムで同期する
この例では、orders テーブルを使用します。 MySQL データベースのソーステーブルのデータが変更されると、宛先 MaxCompute テーブルのデータもリアルタイムで変更されます。
次のコマンドを実行して MySQL コンテナにアクセスします。
docker-compose exec mysql mysql -uroot -p123456MySQL の orders テーブルにデータレコードを挿入します。
INSERT INTO app_db.orders (id, price) VALUES (3, 100.00);MaxCompute で
read orders;コマンドを実行して、orders テーブルのデータをクエリします。 次の結果が返されます。+------------+------------+ | id | price | +------------+------------+ | 3 | 100 | | 1 | 4 | | 2 | 100 | +------------+------------+MySQL の orders テーブルにフィールドを追加します。
ALTER TABLE app_db.orders ADD amount varchar(100) NULL;MaxCompute で
read orders;コマンドを実行して、orders テーブルのデータをクエリします。 次の結果が返されます。+------------+------------+------------+ | id | price | amount | +------------+------------+------------+ | 3 | 100 | NULL | | 1 | 4 | NULL | | 2 | 100 | NULL | +------------+------------+------------+MySQL の orders テーブルのデータレコードを更新します。
UPDATE app_db.orders SET price=100.00, amount=100.00 WHERE id=1;MaxCompute で
read orders;コマンドを実行して、orders テーブルのデータをクエリします。 次の結果が返されます。+------------+------------+------------+ | id | price | amount | +------------+------------+------------+ | 3 | 100 | NULL | | 1 | 100 | 100.00 | | 2 | 100 | NULL | +------------+------------+------------+MySQL の orders テーブルからデータレコードを削除します。
DELETE FROM app_db.orders WHERE id=2;MaxCompute で
read orders;コマンドを実行して、orders テーブルのデータをクエリします。 次の結果が返されます。+------------+------------+------------+ | id | price | amount | +------------+------------+------------+ | 3 | 100 | NULL | | 1 | 100 | 100.00 | +------------+------------+------------+
MySQL で操作を実行するたびに、MaxCompute でデータプレビューが実行されます。 MaxCompute の orders テーブルに表示されるデータはリアルタイムで更新されます。
変更をルーティングする
Flink CDC は、ソーステーブルのスキーマまたはデータを他のテーブル名にルーティングする機能を提供します。 この機能を使用して、テーブルまたはデータベース名の置換、データベースの同期などの操作を実行できます。 次のサンプルコードは例を示しています。
################################################################################
# 説明:MySQL のすべてのテーブルを MaxCompute に同期する
################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: UTC
# accessId、accessKey、エンドポイント、およびプロジェクトパラメータを構成します。
sink:
type: maxcompute
name: MaxComputeSink
accessId: ${your_accessId}
accessKey: ${your_accessKey}
endpoint: ${your_maxcompute_endpoint}
project: ${your_project}
bucketsNum: 8
route:
- source-table: app_db.orders
sink-table: ods_db.ods_orders
- source-table: app_db.shipments
sink-table: ods_db.ods_shipments
- source-table: app_db.products
sink-table: ods_db.ods_products
pipeline:
name: MySQL データベースを MaxCompute に同期する
parallelism: 1route セクションのパラメータの詳細については、「ルート」をご参照ください。
route セクションの前の構成は、app_db.orders テーブルのスキーマとデータを ods_db.ods_orders テーブルに同期するために使用されます。 このようにして、データベースの移行が実装されます。 正規表現を使用して source-table 内の複数のテーブルを照合し、シャーディングされたデータベース内の複数のテーブルからデータを同期できます。 サンプルコード:
route:
- source-table: app_db.order\.*
sink-table: ods_db.ods_ordersこの場合、app_db.order01、app_db.order02、app_db.order03 などのテーブルのデータを ods_db.ods_orders テーブルに集約できます。
複数のテーブルに重複するプライマリキー値が存在するシナリオはサポートされておらず、後のコネクタバージョンでサポートされる予定です。
環境をクリアする
上記の操作を実行した後、環境をクリアする必要があります。
docker-compose.yml ファイルが格納されているディレクトリで次のコマンドを実行して、すべてのコンテナを停止します。
docker-compose downFlink が存在する flink-1.18.0 ディレクトリで次のコマンドを実行して、Flink クラスタを停止します。
./bin/stop-cluster.sh
付録
コネクタの構成項目
パラメータ | 必須 | デフォルト値 | データ型 | 説明 |
type | はい | なし | String | 使用するコネクタ。 値を |
name | いいえ | なし | String | シンクの名前。 |
accessId | はい | なし | String | Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。 AccessKey ID は、アクセスキーペア ページから取得できます。 |
accessKey | はい | なし | String | AccessKey ID に対応する AccessKey シークレット。 |
endpoint | はい | なし | String | MaxCompute のエンドポイント。 MaxCompute プロジェクトの作成時に選択したリージョンとネットワーク接続方法に基づいて、このパラメータを構成する必要があります。 異なるリージョンおよび異なるネットワーク接続モードで使用されるエンドポイントの詳細については、「エンドポイント」をご参照ください。 |
project | はい | なし | String | MaxCompute プロジェクトの名前。 MaxCompute プロジェクトの名前を取得するには、次の手順を実行します。 MaxCompute コンソール にログオンします。 左側のナビゲーションウィンドウで、[ワークスペース] > [プロジェクト] を選択して、MaxCompute プロジェクトの名前を表示します。 |
tunnelEndpoint | いいえ | なし | String | MaxCompute Tunnel サービスのエンドポイント。 ほとんどの場合、エンドポイントはプロジェクトが存在するリージョンに基づく自動ルーティングをサポートしています。 エンドポイントは、プロキシが使用されるネットワーク環境など、特別なネットワーク環境でのみ使用されます。 |
quotaName | いいえ | なし | String | MaxCompute Tunnel 専用のリソースグループの名前。 このパラメータが構成されていない場合は、共有リソースグループが使用されます。 |
stsToken | いいえ | なし | String | 認証に RAM ロールによって発行されたセキュリティトークンサービス (STS) トークンを使用する場合は、このパラメータが必要です。 |
bucketsNum | いいえ | 16 | Integer | MaxCompute Delta テーブルを自動的に作成するために使用されるバケットの数。 詳細については、「ニアリアルタイムデータウェアハウスの概要」をご参照ください。 |
compressAlgorithm | いいえ | zlib | String | MaxCompute にデータを書き込むときに使用される圧縮アルゴリズム。 有効な値: |
totalBatchSize | いいえ | 64 MB | String | パーティションまたは非パーティションテーブルのバッファ内の最大データ量。 異なるパーティションまたは非パーティションテーブルのバッファは互いに独立しています。 バッファ内のデータがこのパラメータの値を超えると、データは MaxCompute に書き込まれます。 |
bucketBatchSize | いいえ | 4 MB | String | バケットのバッファ内の最大データ量。 このパラメータは、Delta テーブルにデータを書き込む場合にのみ有効です。 異なるバケットのバッファは互いに独立しています。 バッファ内のデータがこのパラメータの値を超えると、データは MaxCompute に書き込まれます。 |
numCommitThreads | いいえ | 16 | Integer | チェックポイント操作中に同時に処理できるパーティションまたはテーブルの最大数。 |
numFlushConcurrent | いいえ | 4 | Integer | 同時に MaxCompute に書き込むことができるバケットの最大数。 このパラメータは、Delta テーブルにデータを書き込む場合にのみ有効です。 |
retryTimes | いいえ | 3 | Integer | ネットワーク接続エラーが発生した場合の最大再試行回数。 |
sleepMillis | いいえ | true | Long | ネットワーク接続エラーが発生した場合の各再試行の待機時間。 単位:ミリ秒。 |
テーブルの場所のマッピング
Flink CDC コネクタがテーブルを自動的に作成する場合、ソーステーブルの場所情報は、次の表に示すマッピングに基づいて MaxCompute テーブルにマッピングされます。
MaxCompute プロジェクトがスキーママモデルをサポートしていない場合、各同期タスクは 1 つの MySQL データベースのデータのみを同期できます。 他のデータソースの場合、Flink CDC コネクタは tableId.namespace 情報を無視します。
Flink CDC のオブジェクト | MaxCompute の場所 | MySQL の場所 |
構成ファイルのプロジェクト | プロジェクト | なし |
TableId.namespace | スキーマ (この構成は、MaxCompute プロジェクトがスキーママモデルをサポートしている場合にのみサポートされます。 MaxCompute プロジェクトがスキーママモデルをサポートしていない場合、この構成は無視されます。) | データベース |
TableId.tableName | テーブル | テーブル |
データ型のマッピング
Flink データ型 | MaxCompute データ型 |
CHAR/VARCHAR | STRING |
BOOLEAN | BOOLEAN |
BINARY/VARBINARY | BINARY |
DECIMAL | DECIMAL |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INTEGER | INTEGER |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
TIME_WITHOUT_TIME_ZONE | STRING |
DATE | DATE |
TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_NTZ |
TIMESTAMP_WITH_LOCAL_TIME_ZONE | TIMESTAMP |
TIMESTAMP_WITH_TIME_ZONE | TIMESTAMP |
ARRAY | ARRAY |
MAP | MAP |
ROW | STRUCT |