すべてのプロダクト
Search
ドキュメントセンター

Hologres:Flink または Blink を使用したバイナリログのレプリケーション

最終更新日:Mar 12, 2026

このトピックでは、Flink と Blink を使用して Hologres バイナリログをリアルタイムで消費する方法について説明します。

注意事項

Hologres バイナリログを消費する際は、次の点に注意してください。

  • Hologres V0.9 以降でのみバイナリログを消費できます。Hologres V1.3.21 以降でのみエンジンホワイトリストを設定できます。以前のバージョンはこの機能をサポートしていません。サポートされていないバージョンでホワイトリストを有効にすると、バイナリログの消費は失敗します。ご利用のインスタンスバージョンが必要なバージョンよりも古い場合は、Hologres DingTalk グループに参加してフィードバックを提供してください。詳細については、「オンラインサポート」をご参照ください。

  • Hologres は、行指向テーブルと列指向テーブルの両方で、テーブルレベルでのバイナリロギング機能をサポートしています。Hologres V1.1 以降では、行列ハイブリッドストレージテーブルもサポートされています。バイナリロギングを有効にした後、列指向テーブルは行指向テーブルよりも理論上のオーバーヘッドが高くなります。したがって、頻繁なデータ更新を伴うシナリオでは、行指向テーブルでバイナリロギングを有効にすることを推奨します。

  • バイナリロギングのサポート、およびその有効化と設定方法の詳細については、「Hologres バイナリログの購読」をご参照ください。

  • Realtime Compute for Apache Flink のみが Hologres バイナリログの消費をサポートしています。Holohub モードでは、Flink は Hologres バイナリログからシンプルなデータ型のみを消費することをサポートしています。Flink 6.0.3 以降では、Java Database Connectivity (JDBC) モードで Hologres バイナリログを消費できます。Holohub モードと比較して、JDBC モードはより多くのデータ型をサポートしています。詳細については、「Blink/Flink と Hologres のデータ型マッピング」をご参照ください。このモードには、追加の権限要件もあります。詳細については、「権限」をご参照ください。

  • 親パーティションテーブルからバイナリログを消費することはできません。

  • Hologres V2.0 以降では、Holohub モードのサポートが限定的になります。Hologres V2.1 以降、Holohub モードは非推奨となり、JDBC モードに完全に置き換えられます。Hologres バージョンをアップグレードする前に、Holohub モードを使用する Flink タスクを確認し、Flink Ververica Runtime (VVR) ジョブバージョンをアップグレードしてから、Hologres インスタンスをアップグレードしてください。詳細については、「Holohub モードから JDBC モードへの切り替え」をご参照ください。

権限

  • Flink は、JDBC モードでバイナリログを消費する場合にカスタム Hologres アカウントをサポートしますが、Holohub モードではサポートしません。

  • Flink を使用して Holohub モードで Hologres バイナリログを消費するには、テーブルに対する読み書き権限が必要です。

  • Flink を使用して JDBC モードで Hologres バイナリログを消費するには、次の前提条件があります。詳細については、「JDBC を使用した Hologres バイナリログの消費」をご参照ください。

    1. hg_binlog 拡張機能が作成されていること。この拡張機能は、Hologres V2.0 以降ではデフォルトで作成されます。

    2. ユーザーがインスタンスのスーパーユーザーであるか、ターゲットテーブルに対する Owner 権限とインスタンスに対するレプリケーションロール権限の両方を持っていること。

Flink を使用したバイナリログのリアルタイム消費

VVP 2.4 以降は、Hologres コネクタを使用したバイナリログのリアルタイム消費をサポートしています。以下のセクションでは、その使用方法について説明します。

ソーステーブル DDL (非 CDC モード)

このモードでは、ソースによって消費されたバイナリログデータは、通常の Flink データとして子孫ノードに渡されます。すべてのデータは Insert タイプです。特定の hg_binlog_event_type タイプのデータを、ビジネスニーズに基づいて処理する方法を選択できます。Hologres テーブルでバイナリロギングを有効にした後、Flink でソーステーブル (非 CDC モード) に次の DDL を使用して、バイナリログをリアルタイムで消費します。

create table test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);
  • 3 つの binlogxxx パラメーターはバイナリログシステムフィールドです。これらの名前と型は固定されており、変更できません。

  • その他のフィールドはユーザーフィールドに対応し、小文字である必要があります。

ソーステーブル DDL (CDC モード)

このモードでは、hg_binlog_event_type に基づいて、INSERT、DELETE、UPDATE_BEFORE、または UPDATE_AFTER のような正確な Flink RowKind タイプがバイナリログデータの各行に自動的に設定されます。これにより、MySQL や PostgreSQL の変更データキャプチャ (CDC) 機能と同様に、テーブルデータをミラーリングできます。

説明

Hologres バイナリログソーステーブル (CDC モード) は、ウォーターマーク定義をサポートしていません。ウィンドウ集計を実行するには、非ウィンドウ集計メソッドを使用できます。詳細については、「MySQL/Hologres CDC ソーステーブルはウィンドウ関数をサポートしていません。分単位の集計統計を実装するにはどうすればよいですか?」をご参照ください。

Hologres テーブルでバイナリロギングを有効にした後、Flink でソーステーブル (CDC モード) に次の DDL を使用して、バイナリログをリアルタイムで消費します。

create table test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) with (
  'connector'='hologres',
  'dbname'='<yourDbname>',// Hologres データベースの名前。
  'tablename'='<yourTablename>',// Hologres テーブルの名前。
  'username'='<yourAccessID>',// ご利用のアカウントの AccessKey ID。
  'password'='<yourAccessSecret>',// ご利用のアカウントの AccessKey Secret。
  'endpoint'='<yourEndpoint>',// ご利用の Hologres インスタンスの VPC エンドポイント。
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogMaxRetryTimes' = '10',
  'binlogRetryIntervalMs' = '500',
  'binlogBatchReadSize' = '100'
);

完全データと増分データの両方に対応するソーステーブル

VVR エンジン 1.13-vvr-4.0.13 および Hologres V0.10 以降、Hologres バイナリログ CDC ソーステーブルは、完全データと増分データの両方の消費をサポートしています。このメソッドは、まずデータベースから履歴完全データを読み取り、その後バイナリログから増分データをスムーズに読み取るように切り替えます。詳細については、「Hologres」をご参照ください。

JDBC モードバイナリログソーステーブル

Flink 6.0.3 以降では、JDBC モードで Hologres バイナリログを消費できます。Holohub モードと比較して、JDBC モードはより多くのデータ型とカスタムアカウントをサポートしています。詳細については、「Hologres」をご参照ください。

Holohub モードから JDBC モードへの切り替え

Hologres は V2.0 以降、Holohub モードを段階的に廃止しています。Hologres バージョンをアップグレードするには、ジョブを Holohub モードから JDBC モードに切り替える必要があります。次の手順に従ってください。

Hologres インスタンスを V2.1 にアップグレードする

Hologres インスタンスを V2.1 にアップグレードする前に、次のいずれかのソリューションを選択して Flink タスクと Hologres インスタンスを確認し、Flink タスクが正常に実行されることを確認してください。

  • (ソリューション 1) (推奨) Flink VVR バージョンを 8.0.7 以降にアップグレードします。Flink は Holohub モードから JDBC モードに自動的に切り替わります。

  • (ソリューション 2) Flink VVR バージョンを 6.0.7 から 8.0.5 のバージョンにアップグレードします。ソーステーブルに 'sdkMode'='jdbc' パラメーターを追加し、ジョブを再起動します。また、ユーザーに次のいずれかの権限セットを付与する必要があります。ジョブが正常に実行されることを確認した後、Hologres インスタンスをアップグレードしてください。

    • (オプション 1) インスタンスに対するスーパーユーザー権限。

    • (オプション 2) ターゲットテーブルに対する Owner 権限、CREATE DATABASE 権限、およびインスタンスに対するレプリケーションロール権限。

  • (ソリューション 3) (非推奨) Flink VVR バージョンを 8.0.6 にアップグレードします。Flink は Holohub モードから JDBC モードに自動的に切り替わります。ただし、VVR 8.0.6 には既知のバグがあります。ディメンションテーブルのフィールドが多すぎる場合、VVR ジョブはデプロイメント中にタイムアウトする可能性があります。詳細については、「Hologres Connector リリースノート」をご参照ください。

  • (オプション) 多数の Flink VVR ジョブがある場合は、アップグレードが必要なジョブとテーブルに関する情報を取得するために、以下の内容をご参照ください。

Hologres インスタンスを V2.0 にアップグレードする

  • (ソリューション 1) (推奨) Flink VVR バージョンを 8.0.6 以降にアップグレードします。Flink は Holohub モードから JDBC モードに自動的に切り替わります。VVR 8.0.6 には既知のバグがあります。ディメンションテーブルのフィールドが多すぎる場合、VVR ジョブはデプロイメント中にタイムアウトする可能性があります。詳細については、「Hologres Connector リリースノート」をご参照ください。VVR 8.0.7 を選択することを推奨します。

  • (ソリューション 2) Flink VVR バージョンを 8.0.4 または 8.0.5 にアップグレードし、Flink ジョブを再起動します。また、ユーザーに次のいずれかの権限セットを付与する必要があります。ジョブが正常に実行されることを確認した後、Hologres インスタンスをアップグレードしてください。

    • (オプション 1) インスタンスに対するスーパーユーザー権限。

    • (オプション 2) ターゲットテーブルに対する Owner 権限、CREATE DATABASE 権限、およびインスタンスに対するレプリケーションロール権限。

  • (ソリューション 3) Flink VVR バージョンを 6.0.7 から 8.0.3 のバージョンにアップグレードします。Flink は引き続き Holohub モードを使用してバイナリログを消費します。

Hologres バイナリログを消費する Flink VVR ジョブが多数ある場合は、アップグレードが必要なジョブとテーブルに関する情報を取得するために、次のメソッドを使用してください。

説明

このツールは、次のタイプのジョブに関する情報のみを取得することをサポートしています。

  • DDL ステートメントを使用してテーブルを定義する SQL ジョブ。

  • ヒントを使用してパラメーターを指定するカタログジョブ。

このツールは、JAR ジョブやヒントパラメーターを持たないカタログテーブルに関する情報の取得をサポートしていません。

  1. オープンソースツール find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar

  2. コマンドラインを使用してオープンソースツールのディレクトリに移動します。その後、次のコマンドを実行して、アップグレードが必要なすべてのジョブとテーブルを表示します。

    説明

    次のコマンドを実行するには、JDK 8 以降がインストールされた Java 環境が必要です。

    java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar com.alibaba.hologres.FindIncompatibleFlinkJobs <region> <url> <AccessKeyID> <AccessKeySecret> <binlog/rpc>
    
    # Example
    java -cp find-incompatible-flink-jobs-1.0-SNAPSHOT-jar-with-dependencies.jar com.alibaba.hologres.FindIncompatibleFlinkJobs Beijing https://vvp.console.aliyun.com/web/xxxxxx/zh/#/workspaces/xxxx/namespaces/xxxx/operations/stream/xxxx my-access-key-id my-access-key-secret binlog

    以下にパラメーターについて説明します。

    パラメーター

    説明

    region

    ターゲットの Realtime Compute for Apache Flink プロジェクトが配置されているリージョンの値。値については、「リージョン値マッピングテーブル」をご参照ください。

    url

    ターゲットの Realtime Compute for Apache Flink プロジェクト内の任意のジョブの URL。

    AccessKeyID

    Realtime Compute for Apache Flink プロジェクトにアクセスできるアカウントの AccessKey ID。

    AccessKeySecret

    Realtime Compute for Apache Flink プロジェクトにアクセスできるアカウントの AccessKey Secret。

    binlog/rpc

    ジョブでチェックする内容。有効な値は次のとおりです。

    • binlog: プロジェクト内のすべてのジョブの Hologres バイナリログソーステーブルをチェックします。

    • rpc: プロジェクト内のすべてのジョブで rpc モードを使用するディメンションテーブルまたはシンクテーブルをチェックします。

    リージョン値マッピングテーブル (クリックして展開)

    リージョン

    中国 (北京)

    Beijing

    中国 (上海)

    Shanghai

    中国 (杭州)

    Hangzhou

    中国 (深セン)

    Shenzhen

    中国 (張家口)

    Zhangjiakou

    中国 (香港)

    Hong Kong

    シンガポール

    Singapore

    ドイツ (フランクフルト)

    Germany

    インドネシア (ジャカルタ)

    Indonesia

    マレーシア (クアラルンプール)

    Malaysia

    米国 (シリコンバレー)

    US

    上海金融クラウド

    Shanghai Finance Cloud

  3. 以下は結果例です。

    image