このトピックでは、Realtime Compute for Apache Flink または Blink を使用して Hologres バイナリログをリアルタイムで消費する方法について説明します。
使用上の注意
Hologres バイナリログを消費する前に、以下の項目に注意してください。
Hologres V0.9 以降では、バイナリログを消費できます。 Hologres V1.3.21 以降では、エンジンのホワイトリストを設定できます。 V1.3.21 より前のバージョンのインスタンスにエンジンのホワイトリストを設定すると、インスタンスのバイナリログを消費できません。 Hologres インスタンスのバージョンが V1.3.21 より前の場合は、技術サポートのために Hologres DingTalk グループに参加できます。 詳細については、Hologres のオンラインサポートを受けるをご参照ください。
Hologres では、テーブルレベルでバイナリログを消費できます。 行指向テーブルと列指向テーブルの両方がサポートされています。 Hologres インスタンスのバージョンが V1.1 以降の場合は、行と列のハイブリッドストレージモードを使用するテーブルのバイナリログも消費できます。 バイナリロギング機能を有効にすると、理論的には、列指向テーブルのオーバーヘッドは行指向テーブルよりも大きくなります。 そのため、データが頻繁に更新される場合は、行指向テーブルに対してバイナリロギング機能を有効にすることをお勧めします。
バイナリロギング機能のサポートと、この機能を有効化および設定する方法の詳細については、Hologres バイナリログをサブスクライブするをご参照ください。
Alibaba Cloud Realtime Compute for Apache Flink のみで Hologres バイナリログを消費できます。 Realtime Compute for Apache Flink を使用して HoloHub モードで Hologres バイナリログを消費する場合、単純なデータ型のみがサポートされます。 Ververica Runtime(VVR)6.0.3 以降を使用する Realtime Compute for Apache Flink では、Java Database Connectivity(JDBC)モードで Hologres バイナリログを消費できます。 HoloHub モードと比較して、JDBC モードはより多くのデータ型をサポートしています。 データ型のマッピングの詳細については、データ型の「Realtime Compute for Apache Flink または Blink と Hologres 間のデータ型のマッピング」セクションをご参照ください。 JDBC モードと HoloHub モードに必要な権限の詳細については、このトピックの権限をご参照ください。
親パーティションテーブルのバイナリログは消費できません。
Hologres V2.0 および V2.0 以降のマイナーバージョンでは、特定の条件が満たされた場合に HoloHub モードがサポートされます。 Hologres V2.1 以降では、HoloHub モードはサポートされなくなりました。 JDBC モードのみがサポートされています。 Hologres インスタンスをアップグレードする前に、HoloHub モードを使用する Realtime Compute for Apache Flink デプロイメントを確認し、Realtime Compute for Apache Flink デプロイメントの VVR バージョンをアップグレードしてください。 詳細については、このトピックのHoloHub モードから JDBC モードへの変更をご参照ください。
権限
Realtime Compute for Apache Flink を使用して JDBC モードで Hologres バイナリログを消費する場合は、カスタム Hologres アカウントを使用できます。 Realtime Compute for Apache Flink を使用して HoloHub モードで Hologres バイナリログを消費する場合は、カスタム Hologres アカウントはサポートされていません。
Realtime Compute for Apache Flink を使用して HoloHub モードで Hologres バイナリログを消費する場合は、バイナリログのテーブルに対する読み取りおよび書き込み権限が付与されている必要があります。
Realtime Compute for Apache Flink を使用して JDBC モードで Hologres バイナリログを消費する前に、以下の条件が満たされていることを確認する必要があります。 詳細については、JDBC を使用して Hologres バイナリログを消費するをご参照ください。
hg_binlog拡張機能が作成されています。 この拡張機能は、Hologres V2.0 以降でデフォルトで作成されます。使用するアカウントに、目的の Hologres インスタンスのスーパーユーザーロールが割り当てられているか、目的のテーブルの所有者権限と目的のインスタンスのレプリケーションロール権限が付与されています。
Realtime Compute for Apache Flink を使用してバイナリログをリアルタイムで消費する
Realtime Compute for Apache Flink VVP 2.4 以降では、Hologres コネクタを使用してバイナリログをリアルタイムで消費できます。 このセクションでは、手順について説明します。
非 CDC モードのソーステーブル DDL
このモードでは、ソーステーブルで消費されるバイナリログは、Realtime Compute for Apache Flink の通常のデータとしてダウンストリームノードに転送されます。 すべてのデータの変更タイプは INSERT です。 hg_binlog_event_type フィールドで指定されたタイプのデータを、ビジネス要件に基づいて処理できます。 Hologres テーブルに対してバイナリロギング機能が有効になったら、次の DDL ステートメントを実行して、Realtime Compute for Apache Flink で非変更データキャプチャ(CDC)モードでバイナリログを消費するソーステーブルを作成できます。
create table test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) with (
'connector'='hologres',
'dbname'='<yourDbname>', /* データベース名 */
'tablename'='<yourTablename>', /* テーブル名 */
'username'='<yourAccessID>', /* AccessKey ID */
'password'='<yourAccessSecret>', /* AccessKey Secret */
'endpoint'='<yourEndpoint>', /* エンドポイント */
'binlog' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);binlogというプレフィックスが付いた 3 つのフィールドはシステムフィールドです。 これらのフィールドの名前またはデータ型を変更することはできません。他のフィールドはユーザーフィールドに対応し、すべて小文字にする必要があります。
CDC モードのソーステーブル DDL
このモードでは、ソーステーブルで消費されるバイナリログデータの各行に、hg_binlog_event_type フィールドで指定されたタイプに基づいて、INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER などの正確な Flink RowKind タイプが自動的に割り当てられます。 これにより、バイナリログをソーステーブルにミラーリングできます。 これは、MySQL および PostgreSQL の CDC 機能に似ています。
バイナリログ用に作成された Hologres CDC ソーステーブルにウォーターマークを定義することはできません。 このようなソーステーブルでウィンドウ集計を実行する場合は、別の方法を使用して集計を実行できます。 詳細については、MySQL CDC ソーステーブルと Hologres CDC ソーステーブルはウィンドウ関数をサポートしていません。MySQL CDC ソーステーブルまたは Hologres CDC ソーステーブルで分単位のデータ集計を実装するにはどうすればよいですか?をご参照ください。
Hologres テーブルに対してバイナリロギング機能が有効になったら、次の DDL ステートメントを実行して、Realtime Compute for Apache Flink で CDC ソーステーブルのバイナリログをリアルタイムで消費できます。
create table test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) with (
'connector'='hologres',
'dbname'='<yourDbname>', // Hologres データベースの名前。
'tablename'='<yourTablename>',// Hologres テーブルの名前。
'username'='<yourAccessID>',// 現在のアカウントの AccessKey ID。
'password'='<yourAccessSecret>',// 現在のアカウントの AccessKey シークレット。
'endpoint'='<yourEndpoint>',// Hologres インスタンスの VPC エンドポイント。
'binlog' = 'true',
'cdcMode' = 'true',
'binlogMaxRetryTimes' = '10',
'binlogRetryIntervalMs' = '500',
'binlogBatchReadSize' = '100'
);フルデータと増分データが消費されるソーステーブル
エンジンバージョンが vvr-4.0.13-flink-1.13 の Realtime Compute for Apache Flink と V0.10 以降の Hologres インスタンスでは、CDC モードのソーステーブルでフルデータと増分データの消費がサポートされています。 この方法でバイナリログが消費されると、最初にデータベース内のすべてのデータが読み取られ、次に増分バイナリログが読み取られます。 詳細については、Hologres コネクタをご参照ください。
JDBC モードのバイナリログソーステーブル
VVR 6.0.3 以降を使用する Realtime Compute for Apache Flink では、JDBC モードで Hologres バイナリログを消費できます。 HoloHub モードと比較して、JDBC モードはカスタムアカウントとより多くのデータ型をサポートしています。 詳細については、Hologres コネクタをご参照ください。
HoloHub モードから JDBC モードへの変更
Hologres V2.0 以降では、HoloHub モードはサポートされていません。 Hologres インスタンスをアップグレードする場合は、Realtime Compute for Apache Flink デプロイメントの HoloHub モードを JDBC モードに変更する必要があります。
Hologres インスタンスを V2.1 にアップグレードする
Hologres インスタンスを V2.1 にアップグレードする前に、次のいずれかのソリューションを使用して Realtime Compute for Apache Flink デプロイメントと Hologres インスタンスを確認し、Realtime Compute for Apache Flink デプロイメントが想定どおりに実行できることを確認します。
ソリューション 1:推奨。 Realtime Compute for Apache Flink の VVR バージョンを 8.0.7 以降にアップグレードしてから、Hologres インスタンスをアップグレードします。 この場合、Realtime Compute for Apache Flink は HoloHub モードを JDBC モードに自動的に変更します。
ソリューション 2:Realtime Compute for Apache Flink の VVR バージョンを 6.0.7 から 8.0.5 までのバージョンにアップグレードし、Realtime Compute for Apache Flink のソーステーブルに
'sdkMode'='jdbc'設定を追加してから、デプロイメントを再起動します。 Hologres インスタンスにログオンするために使用するユーザーアカウントに、次の権限のいずれか 1 つを付与します。 デプロイメントが正しく実行されていることを確認したら、Hologres インスタンスをアップグレードします。Hologres インスタンスのスーパーユーザー権限
テーブル所有者の権限、CREATE DATABASE 権限、および Hologres インスタンスのレプリケーションロールの権限
ソリューション 3:非推奨。 Realtime Compute for Apache Flink の VVR バージョンを 8.0.6 にアップグレードしてから、Hologres インスタンスをアップグレードします。 この場合、Realtime Compute for Apache Flink は HoloHub モードを JDBC モードに自動的に変更します。 VVR 8.0.6 を使用する Realtime Compute for Apache Flink には既知の欠陥があります。 ディメンションテーブルに過剰な数のフィールドが含まれている場合、タイムアウトのため、VVR ベースの Realtime Compute for Apache Flink ドラフトをデプロイできません。 詳細については、概要の「Hologres コネクタのリリースノート」セクションをご参照ください。
オプション。 多数の VVR ベースの Realtime Compute for Apache Flink デプロイメントがある場合は、次のセクションを参照して、デプロイメントとテーブルに関する情報を取得できます。
Hologres インスタンスを V2.0 にアップグレードする
ソリューション 1:推奨。 Realtime Compute for Apache Flink の VVR バージョンを 8.0.6 以降にアップグレードしてから、Hologres インスタンスをアップグレードします。 この場合、Realtime Compute for Apache Flink は HoloHub モードを JDBC モードに自動的に変更します。 VVR 8.0.6 を使用する Realtime Compute for Apache Flink には既知の欠陥があります。 ディメンションテーブルに過剰な数のフィールドが含まれている場合、タイムアウトのため、VVR ベースの Realtime Compute for Apache Flink ドラフトをデプロイできません。 詳細については、概要の「Hologres コネクタのリリースノート」セクションをご参照ください。 Realtime Compute for Apache Flink の VVR バージョンを 8.0.7 にアップグレードすることをお勧めします。
ソリューション 2:Realtime Compute for Apache Flink の VVR バージョンを 8.0.4 または 8.0.5 にアップグレードし、デプロイメントを再起動します。 Hologres インスタンスにログオンするために使用するユーザーアカウントに、次の権限のいずれか 1 つを付与します。 デプロイメントが正しく実行されていることを確認したら、Hologres インスタンスをアップグレードします。
Hologres インスタンスのスーパーユーザー権限
テーブル所有者の権限、CREATE DATABASE 権限、および Hologres インスタンスのレプリケーションロールの権限
ソリューション 3:Realtime Compute for Apache Flink の VVR バージョンを 6.0.7 から 8.0.3 までのバージョンにアップグレードしてから、Hologres インスタンスをアップグレードします。 この場合、Realtime Compute for Apache Flink は引き続き HoloHub モードを使用してバイナリログを消費します。
多数の VVR ベースの Realtime Compute for Apache Flink デプロイメントがある場合は、次の手順を実行して、デプロイメントとテーブルに関する情報を取得できます。
次のデプロイメントの情報のみを取得できます。
データ定義言語(DDL)ステートメントを使用してテーブルが作成される SQL デプロイメント。
ヒントを使用してパラメータが指定されるカタログデプロイメント。
JAR デプロイメントに関する情報と、ヒントパラメータを含まないカタログテーブルに関する情報は取得できません。
find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jarをダウンロードします。
オンプレミスコマンドラインツールを使用してオープンソースツールディレクトリに移動し、次のコマンドを実行してデプロイメントとテーブルに関する情報を表示します。
説明次のコマンドを実行するには、Java 環境をインストールし、JDK 8 以降を使用する必要があります。
java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar com.alibaba.hologres.FindIncompatibleFlinkJobs <region> <url> <AccessKeyID> <AccessKeySecret> <binlog/rpc> # 例 java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar com.alibaba.hologres.FindIncompatibleFlinkJobs Beijing https://vvp.console.aliyun.com/web/xxxxxx/zh/#/workspaces/xxxx/namespaces/xxxx/operations/stream/xxxx my-access-key-id my-access-key-secret binlog次の表は、上記の構文のパラメータについて説明しています。
パラメータ
説明
region
Realtime Compute for Apache Flink ワークスペースが存在するリージョン。 さまざまなリージョンの値の詳細については、このトピックのリージョンの値をご参照ください。
url
Realtime Compute for Apache Flink ワークスペースのデプロイメントの URL。
AccessKeyID
Realtime Compute for Apache Flink ワークスペースにアクセスするために使用するアカウントの AccessKey ID。
AccessKeySecret
Realtime Compute for Apache Flink ワークスペースにアクセスするために使用するアカウントの AccessKey シークレット。
binlog/rpc
Realtime Compute for Apache Flink デプロイメントでチェックされるコンテンツ。 有効な値:
binlog:システムは、Realtime Compute for Apache Flink ワークスペースのすべてのデプロイメントで Hologres バイナリログを消費するソーステーブルをチェックします。rpc:システムは、Realtime Compute for Apache Flink ワークスペースのすべてのデプロイメントでrpcモードのディメンションテーブルと結果テーブルをチェックします。
次の図に示すように、返された結果を表示します。
