このトピックでは、MySQL データベース全体を Kafka に同期する方法について説明します。この方法により、複数のタスクが MySQL データベースにかける負荷を軽減できます。
背景情報
MySQL Change Data Capture (CDC) テーブルは、MySQL からデータを取得し、テーブルの変更をリアルタイムで同期します。これらは、複雑なコンピューティングによく使用されます。たとえば、MySQL CDC テーブルをディメンションテーブルとして使用し、他のデータテーブルと結合できます。単一の MySQL テーブルが複数のジョブの依存関係になることがあります。これらのジョブが同じテーブルのデータを処理すると、MySQL データベースは複数の接続を確立します。これにより、MySQL サーバーとネットワークに大きな負荷がかかります。
ソリューションアーキテクチャ
アップストリームの MySQL データベースへの負荷を軽減するため、Realtime Compute for Apache Flink では、MySQL データベース全体を Kafka に同期できます。このソリューションでは、Kafka を中間レイヤーとして使用します。Flink CDC データインジェストジョブがデータを Kafka に同期します。
単一のジョブで、アップストリームの MySQL データベースから Kafka へデータをリアルタイムに同期します。各 MySQL テーブルは、対応する Kafka Topic に upsert モードで書き込まれます。ダウンストリームのジョブは、MySQL テーブルに直接アクセスする代わりに、Upsert Kafka コネクタを使用して Topic からデータを読み取ります。この方法により、MySQL データベースへの負荷が大幅に軽減されます。

制限事項
同期対象の MySQL テーブルには、プライマリキーが必要です。
自己管理型 Kafka クラスター、EMR Kafka クラスター、または ApsaraMQ for Kafka を使用できます。ApsaraMQ for Kafka を使用する場合、デフォルトのエンドポイントを使用する必要があります。
Kafka クラスターには、ソーステーブルよりも多くのストレージ容量が必要です。そうでない場合、容量不足によりデータ損失が発生する可能性があります。データベース同期のために作成された Topic はコンパクト化されます。コンパクト化された Topic では、各キーの最新のメッセージのみが保持され、データは有効期限切れになりません。これは、Topic がソーステーブルのサイズとほぼ同量のデータを格納することを意味します。
ユースケース
注文レビューのリアルタイム分析を考えてみましょう。`user`、`order`、`feedback` の 3 つのテーブルがあります。次の図は、これらのテーブルのデータを示しています。
注文の詳細とユーザーレビューを表示するには、`user` テーブルを結合して `name` フィールドからユーザー名を取得する必要があります。次の SQL 文は、その方法を示しています。
-- 注文情報をユーザーテーブルと結合して、各注文のユーザー名と製品名を表示します。
SELECT order.id as order_id, product, user.name as user_name
FROM order LEFT JOIN user
ON order.user_id = user.id;
-- レビューをユーザーテーブルと結合して、各レビューの内容と対応するユーザー名を表示します。
SELECT feedback.id as feedback_id, comment, user.name as user_name
FROM feedback LEFT JOIN user
ON feedback.user_id = user.id;2 つの SQL タスクでは、`user` テーブルが両方のジョブで使用されます。実行時に、両方のジョブが MySQL から完全データと増分データを読み取ります。完全データを読み取るには MySQL 接続が必要であり、増分データを読み取るにはバイナリログ (Binlog) クライアントが必要です。ジョブの数が増えるにつれて、必要な MySQL 接続と Binlog クライアントリソースも増加します。これにより、上流のデータベースに大きな負荷がかかります。この負荷を軽減するために、Flink CDC データインジェストジョブを使用して、上流の MySQL データをリアルタイムで Kafka に同期できます。その後、データは複数の下流ジョブで消費できるようになります。
前提条件
Realtime Compute for Apache Flink がアクティベートされていること。詳細については、「Realtime Compute for Apache Flink のアクティベート」をご参照ください。
ApsaraMQ for Kafka がアクティベートされていること。詳細については、「ApsaraMQ for Kafka インスタンスのデプロイ」をご参照ください。
ApsaraDB RDS for MySQL がアクティベートされていること。詳細については、「ApsaraDB RDS for MySQL インスタンスの作成」をご参照ください。
Realtime Compute for Apache Flink、ApsaraDB RDS for MySQL、および ApsaraMQ for Kafka は、同じ VPC にある必要があります。異なる VPC にある場合は、VPC 間ネットワーク通信を有効にするか、インターネット経由でアクセスする必要があります。詳細については、「VPC をまたいで他のサービスにアクセスするにはどうすればよいですか?」および「インターネットにアクセスするにはどうすればよいですか?」をご参照ください。
Resource Access Management (RAM) ユーザーや RAM ロールなどの ID を使用する場合は、その ID がリソースにアクセスするために必要な権限を持っていることを確認してください。
事前準備
ApsaraDB RDS for MySQL インスタンスの作成とデータソースの準備
データベースを作成します。詳細については、「データベースの作成」をご参照ください。
宛先インスタンス用に
order_dwという名前のデータベースを作成します。MySQL CDC データソースを準備します。
インスタンスの製品ページの上部にある [データベースにログイン] をクリックします。
表示される DMS ログインページで、作成したデータベースアカウントのユーザー名とパスワードを入力し、[ログイン] をクリックします。
ログイン後、左側の
order_dwデータベースをダブルクリックして切り替えます。SQL コンソールで、3 つのビジネステーブルを作成するための DDL 文と、データを挿入するための文を記述します。
CREATE TABLE `user` ( id bigint not null primary key, name varchar(50) not null ); CREATE TABLE `order` ( id bigint not null primary key, product varchar(50) not null, user_id bigint not null ); CREATE TABLE `feedback` ( id bigint not null primary key, user_id bigint not null, comment varchar(50) not null ); -- データの準備 INSERT INTO `user` VALUES(1, 'Tom'),(2, 'Jerry'); INSERT INTO `order` VALUES (1, 'Football', 2), (2, 'Basket', 1); INSERT INTO `feedback` VALUES (1, 1, 'Good.'), (2, 2, 'Very good');
[実行] をクリックし、次に [直接実行] をクリックします。
操作手順
Flink CDC データインジェストタスクを作成して開始し、アップストリームの MySQL データベースから Kafka にデータをリアルタイムで同期します。これにより、データは複数のダウンストリームジョブで利用可能になります。ジョブは自動的に Topic を作成します。`route` モジュールを使用して Topic 名を定義できます。Topic のパーティション数とレプリカ数は Kafka クラスターのデフォルト設定を使用し、`cleanup.policy` は `compact` に設定されます。
デフォルトの Topic 名
データベース全体の同期タスクは、MySQL のデータベース名とテーブル名をピリオドで結合したデフォルトの命名フォーマットを使用して Kafka Topic を作成します。たとえば、ジョブは order_dw.user、order_dw.order、order_dw.feedback などの Topic を作成できます。
ページで、新しい Flink CDC データインジェストジョブを作成し、次のコードを YAML エディターにコピーします。
source: type: mysql name: MySQL Source hostname: #{hostname} port: 3306 username: #{usernmae} password: #{password} tables: order_dw.\.* server-id: 28601-28604 # (オプション) 増分フェーズ中に新しく作成されたテーブルからデータを同期します。 scan.binlog.newly-added-table.enabled: true # (オプション) テーブルとフィールドのコメントを同期します。 include-comments.enabled: true # (オプション) タスクマネージャーの OutOfMemory 問題を防ぐために、無制限のチャンクを優先します。 scan.incremental.snapshot.unbounded-chunk-first.enabled: true # (オプション) 読み取りを高速化するために解析フィルターを有効にします。 scan.only.deserialize.captured.tables.changelog.enabled: true sink: type: upsert-kafka name: upsert-kafka Sink properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092 # 以下のパラメーターは ApsaraMQ for Kafka に必要です aliyun.kafka.accessKeyId: #{ak} aliyun.kafka.accessKeySecret: #{sk} aliyun.kafka.instanceId: #{instanceId} aliyun.kafka.endpoint: #{endpoint} aliyun.kafka.regionId: #{regionId}右上隅にある [デプロイ] をクリックしてジョブをデプロイします。
左側のナビゲーションウィンドウで、 を選択します。対象ジョブの [アクション] 列で、[開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。
Topic 名の指定
データベース同期タスクの `route` モジュールを使用して、各テーブルの Topic 名を指定できます。次のジョブは、`user1`、`order2`、`feedback3` の 3 つの Topic を作成します。
ページで、新しい Flink CDC データインジェストジョブを作成し、次のコードを YAML エディターにコピーします。
source: type: mysql name: MySQL Source hostname: #{hostname} port: 3306 username: #{usernmae} password: #{password} tables: order_dw.\.* server-id: 28601-28604 # (オプション) 増分フェーズ中に新しく作成されたテーブルからデータを同期します。 scan.binlog.newly-added-table.enabled: true # (オプション) テーブルとフィールドのコメントを同期します。 include-comments.enabled: true # (オプション) タスクマネージャーの OutOfMemory 問題を防ぐために、無制限のチャンクを優先します。 scan.incremental.snapshot.unbounded-chunk-first.enabled: true # (オプション) 読み取りを高速化するために解析フィルターを有効にします。 scan.only.deserialize.captured.tables.changelog.enabled: true route: - source-table: order_dw.user sink-table: user1 - source-table: order_dw.order sink-table: order2 - source-table: order_dw.feedback sink-table: feedback3 sink: type: upsert-kafka name: upsert-kafka Sink properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092 # 以下のパラメーターは ApsaraMQ for Kafka に必要です aliyun.kafka.accessKeyId: #{ak} aliyun.kafka.accessKeySecret: #{sk} aliyun.kafka.instanceId: #{instanceId} aliyun.kafka.endpoint: #{endpoint} aliyun.kafka.regionId: #{regionId}右上隅にある [デプロイ] をクリックしてジョブをデプロイします。
左側のナビゲーションウィンドウで、 を選択します。対象ジョブの [アクション] 列で、[開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。
Topic 名の一括設定
データベース同期タスクの `route` モジュールを使用して、生成される Topic 名のパターンを一括で指定できます。次のジョブは、`topic_user`、`topic_order`、`topic_feedback` の 3 つの Topic を作成します。
ページで、新しい Flink CDC データインジェストジョブを作成し、YAML エディターに次のコードをコピーします。
source: type: mysql name: MySQL Source hostname: #{hostname} port: 3306 username: #{usernmae} password: #{password} tables: order_dw.\.* server-id: 28601-28604 # (オプション) 増分フェーズ中に新しく作成されたテーブルからデータを同期します。 scan.binlog.newly-added-table.enabled: true # (オプション) テーブルとフィールドのコメントを同期します。 include-comments.enabled: true # (オプション) タスクマネージャーの OutOfMemory 問題を防ぐために、無制限のチャンクを優先します。 scan.incremental.snapshot.unbounded-chunk-first.enabled: true # (オプション) 読み取りを高速化するために解析フィルターを有効にします。 scan.only.deserialize.captured.tables.changelog.enabled: true route: - source-table: order_dw.\.* sink-table: topic_<> replace-symbol: <> sink: type: upsert-kafka name: upsert-kafka Sink properties.bootstrap.servers: xxxx.alikafka.aliyuncs.com:9092 # 以下のパラメーターは ApsaraMQ for Kafka に必要です aliyun.kafka.accessKeyId: #{ak} aliyun.kafka.accessKeySecret: #{sk} aliyun.kafka.instanceId: #{instanceId} aliyun.kafka.endpoint: #{endpoint} aliyun.kafka.regionId: #{regionId}右上隅にある [デプロイ] をクリックしてジョブをデプロイします。
左側のナビゲーションウィンドウで、 を選択します。対象ジョブの [アクション] 列で、[開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。
Kafka データのリアルタイムコンシューム
アップストリームの MySQL データベースからのデータは、JSON フォーマットで Kafka に書き込まれます。単一の Kafka Topic は複数のダウンストリームジョブでコンシュームでき、これらのジョブは Topic から読み取って最新のテーブルデータを取得します。同期されたテーブルのデータは、次の 2 つの方法でコンシュームできます。
カタログを介したデータの直接コンシューム
Kafka Topic からのデータをソーステーブルとして読み取ります。
ページで、新しい SQL ストリームジョブを作成し、次のコードを SQL エディターにコピーします。
CREATE TEMPORARY TABLE print_user_proudct( order_id BIGINT, product STRING, user_name STRING ) WITH ( 'connector'='print', 'logger'='true' ); CREATE TEMPORARY TABLE print_user_feedback( feedback_id BIGINT, `comment` STRING, user_name STRING ) WITH ( 'connector'='print', 'logger'='true' ); BEGIN STATEMENT SET; -- 複数のシンクに書き込む場合に必要です。 -- Kafka JSON カタログのユーザーテーブルと注文情報を結合して、各注文のユーザー名と製品名を表示します。 INSERT INTO print_user_proudct SELECT `order`.key_id as order_id, value_product as product, `user`.value_name as user_name FROM `kafka-catalog`.`kafka`.`order`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `order` -- グループと開始モードを指定します LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` -- グループと開始モードを指定します ON `order`.value_user_id = `user`.key_id; -- レビューをユーザーテーブルと結合して、各レビューの内容と対応するユーザー名を表示します。 INSERT INTO print_user_feedback SELECT feedback.key_id as feedback_id, value_comment as `comment`, `user`.value_name as user_name FROM `kafka-catalog`.`kafka`.feedback/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as feedback -- グループと開始モードを指定します LEFT JOIN `kafka-catalog`.`kafka`.`user`/*+OPTIONS('properties.group.id'='<yourGroupName>', 'scan.startup.mode'='earliest-offset')*/ as `user` -- グループと開始モードを指定します ON feedback.value_user_id = `user`.key_id; END; -- 複数のシンクに書き込む場合に必要です。この例では、Print コネクタを使用して結果を直接出力します。結果をコネクタの結果テーブルに出力して、さらなる分析や計算を行うこともできます。複数のシンクへの書き込み構文の詳細については、「INSERT INTO 文」をご参照ください。
説明データ同期中に MySQL テーブルのスキーマが変更された場合、Kafka JSON カタログによって解析されたスキーマが実際のテーブルスキーマと同期しなくなる可能性があります。たとえば、MySQL テーブルからフィールドが削除されても、カタログから解析されたスキーマにまだ表示されている場合、これらのフィールドの値は null になることがあります。
カタログから読み取られたスキーマは、コンシュームされたデータのフィールドで構成されます。フィールドが削除されても、それを含むメッセージが有効期限切れになっていない場合、スキーマには存在しなくなったフィールドが含まれる可能性があります。そのようなフィールドの値は null になります。この状況では特別な処理は必要ありません。
右上隅にある [デプロイ] をクリックしてジョブをデプロイします。
左側のナビゲーションウィンドウで、 を選択します。対象ジョブの [アクション] 列で、[開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。
一時テーブルを作成してデータをコンシュームする
カスタムスキーマを定義し、一時テーブルからデータを読み取ります。
ページで、新しい SQL ストリームジョブを作成し、次のコードを SQL エディターにコピーできます。
CREATE TEMPORARY TABLE user_source ( key_id BIGINT, value_name STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'user', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'scan.startup.mode' = 'earliest-offset', 'key.format' = 'json', 'value.format' = 'json', 'key.fields' = 'key_id', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY', 'value.json.infer-schema.flatten-nested-columns.enable' = 'false', 'value.json.infer-schema.primitive-as-string' = 'false' ); CREATE TEMPORARY TABLE order_source ( key_id BIGINT, value_product STRING, value_user_id BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'order', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'scan.startup.mode' = 'earliest-offset', 'key.format' = 'json', 'value.format' = 'json', 'key.fields' = 'key_id', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY', 'value.json.infer-schema.flatten-nested-columns.enable' = 'false', 'value.json.infer-schema.primitive-as-string' = 'false' ); CREATE TEMPORARY TABLE feedback_source ( key_id BIGINT, value_user_id BIGINT, value_comment STRING ) WITH ( 'connector' = 'kafka', 'topic' = 'feedback', 'properties.bootstrap.servers' = '<yourKafkaBrokers>', 'scan.startup.mode' = 'earliest-offset', 'key.format' = 'json', 'value.format' = 'json', 'key.fields' = 'key_id', 'key.fields-prefix' = 'key_', 'value.fields-prefix' = 'value_', 'value.fields-include' = 'EXCEPT_KEY', 'value.json.infer-schema.flatten-nested-columns.enable' = 'false', 'value.json.infer-schema.primitive-as-string' = 'false' ); CREATE TEMPORARY TABLE print_user_proudct( order_id BIGINT, product STRING, user_name STRING ) WITH ( 'connector'='print', 'logger'='true' ); CREATE TEMPORARY TABLE print_user_feedback( feedback_id BIGINT, `comment` STRING, user_name STRING ) WITH ( 'connector'='print', 'logger'='true' ); BEGIN STATEMENT SET; -- 複数のシンクに書き込む場合に必要です。 -- Kafka JSON カタログのユーザーテーブルと注文情報を結合して、各注文のユーザー名と製品名を表示します。 INSERT INTO print_user_proudct SELECT order_source.key_id as order_id, value_product as product, user_source.value_name as user_name FROM order_source LEFT JOIN user_source ON order_source.value_user_id = user_source.key_id; -- レビューをユーザーテーブルと結合して、各レビューの内容と対応するユーザー名を表示します。 INSERT INTO print_user_feedback SELECT feedback_source.key_id as feedback_id, value_comment as `comment`, user_source.value_name as user_name FROM feedback_source LEFT JOIN user_source ON feedback_source.value_user_id = user_source.key_id; END; -- 複数のシンクに書き込む場合に必要です。この例では、Print コネクタを使用して結果を直接出力します。結果をコネクタの結果テーブルに出力して、さらなる分析や計算を行うこともできます。複数のシンクへの書き込み構文の詳細については、「INSERT INTO 文」をご参照ください。
次の表に、一時テーブルを構成するためのパラメーターを示します。
パラメーター
説明
注意
connector
コネクタのタイプ。
値は `kafka` である必要があります。
topic
対応する Topic の名前。
これは Kafka JSON カタログの説明と同じである必要があります。
properties.bootstrap.servers
Kafka ブローカーのアドレス。
フォーマットは
host:port,host:port,host:portです。アドレスを区切るにはカンマ (,) を使用します。scan.startup.mode
Kafka からデータを読み取る開始オフセット。
有効な値:
earliest-offset: Kafka の最も古いパーティションから読み取ります。
latest-offset: Kafka の最新のオフセットから読み取ります。
group-offsets (デフォルト): 指定された properties.group.id のコミット済みオフセットから読み取ります。
timestamp: scan.startup.timestamp-millis で指定されたタイムスタンプから読み取ります。
specific-offsets: scan.startup.specific-offsets で指定されたオフセットから読み取ります。
注意
このパラメーターは、ジョブがステートなしで開始された場合にのみ有効です。ジョブがチェックポイントから再開したり、状態を回復したりする場合、状態に保存された進行状況を優先して読み取りを再開します。
key.format
Flink Kafka コネクタが Kafka メッセージキーをシリアル化または逆シリアル化するために使用するフォーマット。
値は `json` である必要があります。
key.fields
Kafka メッセージキーに対応するソースまたはシンクテーブルのフィールド。
複数のフィールド名を区切るにはセミコロン (;) を使用します。例:
field1;field2。key.fields-prefix
メッセージ値またはメタデータのフィールドとの名前の競合を避けるための、すべての Kafka メッセージキーのカスタムプレフィックス。
これは、Kafka JSON カタログの key.fields-prefix パラメーターの値と同じである必要があります。
value.format
Flink Kafka コネクタが Kafka メッセージ値をシリアル化または逆シリアル化するために使用するフォーマット。
値は `json` である必要があります。
value.fields-prefix
メッセージキーまたはメタデータのフィールドとの名前の競合を避けるための、すべての Kafka メッセージ値のカスタムプレフィックス。
これは、Kafka JSON カタログの value.fields-prefix パラメーターの値と同じである必要があります。
value.fields-include
メッセージ本文内のメッセージキーフィールドの処理ポリシーを指定します。
固定値 EXCEPT_KEY は、メッセージ本文にメッセージキーフィールドが含まれないことを示します。
value.json.infer-schema.flatten-nested-columns.enable
Kafka メッセージ値の JSON 内のネストされた列を再帰的に展開するかどうかを指定します。
カタログの infer-schema.flatten-nested-columns.enable パラメーターの値に対応します。
value.json.infer-schema.primitive-as-string
Kafka メッセージ値内のすべての基本データ型を文字列型として推論するかどうかを指定します。
カタログの infer-schema.primitive-as-string パラメーターの値に対応します。
右上隅にある [デプロイ] をクリックしてジョブをデプロイします。
左側のナビゲーションウィンドウで、 を選択します。対象ジョブの [アクション] 列で、[開始] をクリックします。[ステートレス開始] を選択し、[開始] をクリックします。
ジョブの結果の表示
左側のナビゲーションウィンドウで を選択し、対象のジョブをクリックします。
[ジョブログ] タブで、[実行中のタスクマネージャー] タブの [パス、ID] の下にあるタスクをクリックします。
[ログ] をクリックし、ページで
PrintSinkOutputWriterに関連するログ情報を検索します。