すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Hologres

最終更新日:Mar 05, 2026

このトピックでは、Hologres コネクタについて説明します。

背景情報

Hologres は、リアルタイムデータウェアハウス向けの統合エンジンです。大規模なデータセットに対するリアルタイムでの書き込み、更新、分析をサポートします。標準 SQL 構文を採用し、PostgreSQL プロトコルと互換性があります。ペタバイト規模の OLAP およびアドホック分析に対応し、高同時実行性・低遅延のオンラインデータサービスを提供します。MaxCompute、Realtime Compute for Apache Flink、DataWorks と深く統合されており、オフラインおよびリアルタイムのエンドツーエンドなデータウェアハウスソリューションを実現します。以下の表に、Hologres コネクタの機能をまとめます。

カテゴリ

詳細

サポートされているタイプ

ソーステーブル、ディメンションテーブル、結果テーブル

実行モード

ストリームモードおよびバッチモード

データフォーマット

非対応

監視メトリック

監視メトリック

  • ソーステーブル:

    • numRecordsIn

    • numRecordsInPerSecond

  • シンクテーブル:

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

    説明

    詳細については、「監視メトリック」をご参照ください。

API 種別

DataStream および SQL

結果テーブルにおける更新および削除の対応

はい

特徴

特徴

詳細

Hologres データのリアルタイム消費

バイナリログ(binlog)の有無にかかわらず、Hologres データを読み取ることができます。この機能は、変更データキャプチャ(CDC)モードおよび非 CDC モードの両方と互換性があります。

フルおよび増分の統合取り込み

フル取り込み、増分取り込み、またはフルおよび増分の統合取り込みを実行できます。

プライマリキーの競合処理

新規データを無視する、既存の行全体を置き換える、または特定のフィールドのみを更新するのいずれかを選択できます。

マルチストリーム書き込みを伴うワイドテーブルへのマージおよび部分更新

変更されたカラムのみを更新でき、行全体を更新する必要はありません。

パーティションテーブルからの binlog 読み取り(ベータ版)

物理パーティションテーブルから binlog を読み取ることができます。単一のジョブで、新しく追加されたパーティションを含むすべてのパーティションを監視できます。また、論理パーティションテーブルからの binlog 読み取りも可能です。

パーティションテーブルへの書き込み

パーティションテーブルの親テーブルにデータを書き込むと、対応する子パーティションが自動的に作成されます。

単一テーブルまたはデータベース全体のリアルタイム同期

単一テーブルまたはデータベース全体レベルでのリアルタイム同期を実行できます。この機能には、以下の能力が含まれます:

  • ソーステーブルスキーマ進化の自動検出: ソースデータベーステーブルのスキーマが変更された場合、Hologres はこれらの変更をリアルタイムで結果テーブルに同期できます。

  • スキーマ進化の自動処理: 新しいデータが Hologres に流入した場合、Flink はデータ書き込みの前にまずスキーマ変更操作をトリガーします。このプロセスには手動介入は不要です。

詳細については、「CREATE TABLE AS (CTAS) 文」および「リアルタイムデータベース同期のクイックスタート」をご参照ください。

制限事項および推奨事項

制限事項

  • 外部テーブルは非対応: Hologres コネクタは、Hologres 外部テーブル(例: MaxCompute 外部テーブル)へのアクセスをサポートしていません。

  • タイム型の制限: TIMESTAMP 型データのリアルタイム消費はサポートされていません。テーブル作成時に TIMESTAMPTZ 型を使用してください。

  • ソーステーブルスキャン(VVR 8 以前): Hologres からのデータ読み取りはデフォルトでバッチモードで実行されます。つまり、増分データは消費されません。

  • ウォーターマークの制限(VVR 8 以前): CDC モードでは、ウォーターマークの定義はサポートされていません。ウィンドウ集約を実行する場合は、非ウィンドウ集約ソリューションをご利用ください。

推奨事項

  • ストレージモードの選択

    • ポイントルックアップ用のディメンションテーブル: 行指向ストレージの使用を推奨します。この場合、プライマリキーおよびクラスタリングキーの両方を設定する必要があります。

    • 1 対多クエリ用のディメンションテーブル: 列指向ストレージの使用を推奨し、パフォーマンス最適化のために分散キーおよびセグメントキーを構成してください。

    • 分析クエリ用に頻繁に更新されるテーブル: リアルタイム binlog 消費および OLAP 分析の両方をサポートする必要がある場合、行列ハイブリッドストレージの使用を推奨します。

    重要

    Hologres テーブル作成時にストレージ形式を明示的に指定しない場合、デフォルトで列指向ストレージが適用されます。このストレージ形式は不変であり、作成後に変更することはできません。詳細については、「Hologres におけるテーブルの作成」および「テーブルストレージ形式: 列指向、行指向、行列ハイブリッド」をご参照ください。

  • ジョブの並列度: Flink ジョブの並列度を、Hologres テーブルのシャード数に合わせて設定できます。

    # このコマンドを HoloWeb で実行して、テーブルのシャード数を確認します。
    select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
  • バージョンおよび機能: Hologres コネクタ リリースノートを定期的に確認し、既知の問題、機能更新、およびバージョン互換性情報をご確認ください。

重要なお知らせ

  • Hologres および VVR のバージョン互換性および制限事項

    ソーステーブル

    • VVR 8 以前: sdkMode パラメーターで消費モードを設定します。

    • VVR 11+: source.binlog.read-mode パラメーターで消費モードを設定します。

    VVR バージョン

    Hologres バージョン

    デフォルト/推奨パラメーター値

    実際の消費モード

    備考

    ≥ 6.0.7

    < 2.0

    カスタム

    holohub(デフォルト)

    JDBC の構成を推奨します。

    6.0.7–8.0.4

    ≥ 2.0

    jdbc(自動スイッチオーバー)

    jdbc(強制)

    Hologres 2.0 以降では HoloHub が非推奨となりました。holohub を設定した場合、コネクタは自動的に JDBC へフォールバックしますが、その際には追加のユーザー権限が必要になる場合があります。権限の構成については、「権限に関する問題」をご参照ください。

    ≥ 8.0.5

    ≥ 2.1

    jdbc(自動スイッチオーバー)

    jdbc(強制)

    権限に関する問題は発生しません。Hologres 2.1.27 以降では、コネクタは jdbc_fixed へ切り替わります。

    ≥ 11.1

    任意のバージョン

    AUTO(デフォルト)

    Hologres バージョンに基づいて自動選択

    • Hologres 2.1.27 以降では、コネクタはデフォルトで JDBC を選択し、軽量接続を有効化します(connection.fixed.enabled が true に設定されます)。

    • バージョン 2.1.0–2.1.26 では、JDBC モードを選択できます。

    • Hologres 2.0 以前では、コネクタは holohub を選択します。

    重要

    VVR 11.1 以降では、コネクタはデフォルトで binlog 消費を有効化します。エラーを回避するため、binlog の有効化を事前に実施してください。

    権限に関する問題

    ユーザーがスーパーユーザーでない場合、JDBC モードで binlog を読み取るための権限を付与する必要があります。

    user_name は、Alibaba Cloud アカウントまたは RAM ユーザーの ID です。詳細については、「アカウント概要」をご参照ください。

    -- 標準権限モデルでは、ユーザーに CREATE 権限を付与し、インスタンスに対して replication ロール権限を付与します。
    GRANT CREATE ON DATABASE <db_name> TO <user_name>;
    alter role <user_name> replication;
    
    -- データベースが簡易権限モデル(SLMP)を使用している場合、GRANT 文は実行できません。spm_grant を使用して、ユーザーにデータベースの Admin 権限を付与します。また、HoloWeb コンソールから権限を付与することもできます。
    call spm_grant('<db_name>_admin', '<user_name>');
    alter role <user_name> replication;

    シンクテーブル

    • VVR 8 以前のバージョンでは、sdkMode パラメーターを指定して消費モードを選択します。

    • VVR 11+: sink.write-mode パラメーターでデータ書き込みモードを設定します。

    VVR バージョン

    Hologres バージョン

    RPC への影響

    実際の RPC 消費モード

    推奨/デフォルトパラメーター値

    備考

    6.0.4–8.0.2

    < 2.0

    なし

    rpc

    カスタム

    /

    6.0.4–8.0.2

    ≥ 2.0

    はい

    jdbc_fixed(自動スイッチオーバー)

    カスタム

    重複排除を防止するため、'jdbcWriteBatchSize'='1' を設定してください。

    ≥ 8.0.3

    任意のバージョン

    はい

    jdbc_fixed(自動スイッチオーバー)

    カスタム

    rpc を設定した場合、コネクタは自動的に jdbc_fixed へ切り替わり、重複排除を防止するために 'jdbcWriteBatchSize'='1' が設定されます。

    ≥ 8.0.5

    任意のバージョン

    はい

    jdbc_fixed(自動スイッチオーバー)

    カスタム

    rpc を設定した場合、コネクタは自動的に jdbc_fixed へ切り替わり、重複排除を防止するために 'deduplication.enabled'='false' が設定されます。

    重要
    • RPC は Hologres 2.0 以降で非推奨となりました。データ書き込みモードを rpc に設定した場合、コネクタは自動的に jdbc_fixed へ切り替わります。ただし、他の値を設定した場合は、その値が優先されます。

    • VVR 11.1 以降では、RPC はサポートされません。接続には JDBC を推奨します。

    • 高同時実行書き込みシナリオでは、jdbc_copy または COPY_STREAM をご利用ください。

    ディメンションテーブル

    VVR バージョン

    Hologres バージョン

    RPC への影響

    実際の RPC 消費モード

    推奨/デフォルトパラメーター値

    備考

    6.0.4–8.0.2

    < 2.0

    なし

    rpc

    カスタム

    /

    6.0.4–8.0.2

    ≥ 2.0

    はい

    jdbc_fixed(自動スイッチオーバー)

    カスタム

    Hologres インスタンスのバージョンが 2.0 以降の場合、RPC は非推奨であるため、コネクタは自動的に jdbc_fixed へ切り替わります。ただし、他の値を設定した場合は、その値が優先されます。

    ≥ 8.0.3

    任意のバージョン

    はい

    jdbc_fixed(自動スイッチオーバー)

    カスタム

    ≥ 8.0.5

    任意のバージョン

    はい

    jdbc_fixed(自動スイッチオーバー)

    カスタム

    重要

    VVR 11.1 以降では、RPC はサポートされず、デフォルトで JDBC が使用されます。軽量接続を有効化するには、connection.fixed.enabled を設定してください。

  • JDBC モードの binlog ソーステーブルは JSONB データの読み取りをサポートしますが、データベースレベルで GUC パラメーターを有効化する必要があります。

    -- データベースレベルで GUC パラメーターを有効化します。このコマンドはスーパーユーザーのみ実行可能です。各データベースにつき一度だけ設定すれば十分です。
    alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
  • UPDATE 操作は、2 つの連続した binlog レコードを生成します。最初に古いデータ(update_before)が記録され、その後に新しいデータ(update_after)が記録されます。

  • binlog が有効化されたソーステーブルの TRUNCATE または再作成は避けてください。詳細については、「よくある質問」をご参照ください。

  • エラーを回避するため、Flink と Hologres 間で DECIMAL の精度を一貫して維持してください。詳細については、「よくある質問」をご参照ください。

  • フルおよび増分の統合取り込みに INITIAL モードを使用する場合、グローバルな順序保証はされません。下流アプリケーションがタイムスタンプに基づく計算に依存する場合は、代わりに純粋な binlog 読み取りモードをご利用ください。

binlog の有効化

テーブルが未作成の場合

リアルタイムデータ消費はデフォルトで無効化されています。HoloWeb で Hologres テーブルを作成する際に、binlog.level および binlog.ttl パラメーターを設定します。サンプルコードを以下に示します:

begin;
create table test_table(
  id int primary key, 
  title text not null, 
  body text);
call set_table_property('test_table', 'orientation', 'row');-- 行指向テーブル test_table を作成します。
call set_table_property('test_table', 'clustering_key', 'id');-- id カラムにクラスタリングキーを作成します。
call set_table_property('test_table', 'binlog.level', 'replica');-- binlog を有効化します。
call set_table_property('test_table', 'binlog.ttl', '86400');-- binlog の TTL(秒単位)を設定します。
commit;

既存のテーブル

HoloWeb で、以下のステートメントを使用して既存のテーブルの binlog を有効化し、binlog TTL を設定します。table_name を実際のテーブル名に置き換えてください。

-- binlog を有効化します。
begin;
call set_table_property('<table_name>', 'binlog.level', 'replica');
commit;

-- binlog TTL(秒単位)を設定します。
begin;
call set_table_property('<table_name>', 'binlog.ttl', '2592000');
commit;

WITH パラメーター

VVR 11 以降、Hologres のコネクタオプションが更新され、サポートが強化されました。一部のオプションは名称変更または削除されている場合があります。VVR 11 は VVR 8 と下位互換性を維持しています。詳細については、お使いの VVR バージョンに対応するパラメーターのドキュメントをご参照ください。

データ型マッピング

Flink と Hologres 間のデータ型マッピング」をご参照ください。

説明

Hologres では、GENERATED ALWAYS AS 構文を使用して生成カラムを定義できます。例:

ds TIMESTAMP NOT NULL GENERATED ALWAYS AS (date_trunc('month', create_time)) STORED

生成カラムの NOT NULL 制約は、NULL 許容フィールドとしてマップされます。これは想定される動作です。Hologres は生成カラムを自動的に計算するため、Flink はそれらの値を渡しません。そのため、NOT NULL 制約を保持すると、HologresClient の書き込み検証が失敗します。通常のカラムの NOT NULL 制約は影響を受けません(例: ds TIMESTAMP NOT NULL)。

サンプル

ソーステーブル

binlog が有効化されたソーステーブル

CDC モード

このモードでは、テーブルデータのミラー同期が可能になります。ソースはバイナリログデータを消費し、hg_binlog_event_type に基づいて、INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER のような正しい Flink RowKind 型を各行に自動的に割り当てます。この処理は、MySQL や PostgreSQL の CDC 機能と同様に、変更をミラー化します。以下のサンプルコードは、このモードでソーステーブルを作成する DDL ステートメントを示しています。

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='${secret_values.ak_id}',            -- AK/SK の漏洩を防ぐため、変数の使用を推奨します。
  'password'='${secret_values.ak_secret}',        
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL',  -- INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含むすべての changeLog 型を読み取ります。
  'retry-count'='10',                     -- binlog 読み取りエラー発生時のリトライ回数。
  'retry-sleep-step-ms'='5000',           -- リトライ間の増分バックオフ時間。初回リトライは 5 秒待機、2 回目は 10 秒待機、といった具合です。
  'source.binlog.batch-size'='512'        -- binlog から一度に読み取る行数。
);

VVR 8+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- AK/SK の漏洩を防ぐため、変数の使用を推奨します。   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'sdkMode'='jdbc',
  'binlogMaxRetryTimes' = '10',     -- binlog 読み取りエラー発生時のリトライ回数。
  'binlogRetryIntervalMs' = '500',  -- binlog 読み取りエラー発生後のリトライ間隔(ミリ秒単位)。
  'binlogBatchReadSize' = '100'     -- binlog から一度に読み取る行数。
);

非 CDC モード

このモードでは、ソースが消費する binlog データは、通常の Flink データとして子孫ノードに渡されます。つまり、すべてのデータは INSERT 型となります。必要に応じて、hg_binlog_event_type の特定の型をどのように処理するかを決定できます。以下のサンプルコードは、ソーステーブルの DDL ステートメントを示しています。

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- AK/SK の漏洩を防ぐため、変数の使用を推奨します。   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL_AS_APPEND_ONLY',  -- すべての changelog 型を INSERT として扱います。
  'retry-count'='10',                     -- binlog 読み取りエラー発生時のリトライ回数。
  'retry-sleep-step-ms'='5000',           -- リトライ間の増分バックオフ時間。初回リトライは 5 秒待機、2 回目は 10 秒待機、といった具合です。
  'source.binlog.batch-size'='512'        -- binlog から一度に読み取る行数。
);

VVR 8+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- AK/SK の漏洩を防ぐため、変数の使用を推奨します。   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',     -- binlog 読み取りエラー発生時のリトライ回数。
  'binlogRetryIntervalMs' = '500',  -- binlog 読み取りエラー発生後のリトライ間隔(ミリ秒単位)。
  'binlogBatchReadSize' = '100'     -- binlog から一度に読み取る行数。
);

binlog が無効化されたソーステーブル

VVR 11+

重要

デフォルトでは、Ververica Runtime(VVR)11.1 以降のバージョンでは、バイナリログデータが消費されます。詳細については、「binlog ソーステーブル」をご参照ください。

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- AK/SK の漏洩を防ぐため、変数の使用を推奨します。   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'source.binlog'='false'                      -- binlog データを読み取らないようにします。
);

VVR 8+

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- AK/SK の漏洩を防ぐため、変数の使用を推奨します。   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sdkMode' = 'jdbc'
);

シンクテーブル

CREATE TEMPORARY TABLE datagen_source(
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- AK/SK の漏洩を防ぐため、変数の使用を推奨します。   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;

ディメンションテーブルのサンプル

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       -- AK/SK の漏洩を防ぐため、変数の使用を推奨します。   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);
CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

機能の詳細

フルおよび増分の統合取り込み

適用シーン

  • この機能は、プライマリキーを持つターゲットテーブルにのみ適用されます。CDC モードの Hologres ソーステーブルとの併用を推奨します。

  • Hologres では、必要に応じて binlog をオンデマンドで有効化できます。履歴データを含むテーブルに対しては、binlog の有効化を行ってください。

サンプルコード

VVR 11+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog.startup-mode' = 'INITIAL',   -- 最初にすべての履歴データを読み取り、その後増分読み取りを開始します。
  'retry-count'='10',                         -- binlog 読み取りエラー発生時のリトライ回数。
  'retry-sleep-step-ms'='5000',               -- リトライ間の増分バックオフ時間。初回リトライは 5 秒待機、2 回目は 10 秒待機、といった具合です。
  'source.binlog.batch-size'='512'            -- binlog から一度に読み取る行数。
  );
説明
  • source.binlog.startup-modeINITIAL に設定すると、まずすべての履歴データを読み取り、その後増分読み取りを開始します。

  • startTime パラメーターを設定するか、起動インターフェイスで開始時刻を選択した場合、binlogStartUpMode は強制的に timestamp 消費モードを使用し、他の消費モードは無視されます。startTime パラメーターの優先度が高いためです。

VVR 8+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogStartUpMode' = 'initial', -- 最初にすべての履歴データを読み取り、その後増分読み取りを開始します。
  'binlogMaxRetryTimes' = '10',     -- binlog 読み取りエラー発生時のリトライ回数。
  'binlogRetryIntervalMs' = '500',  -- binlog 読み取りエラー発生後のリトライ間隔(ミリ秒単位)。
  'binlogBatchReadSize' = '100'     -- binlog から一度に読み取る行数。
  );
説明
  • binlogStartUpModeinitial に設定すると、まずすべての履歴データを読み取り、その後増分読み取りを開始します。

  • startTimebinlogStartUpMode より優先されます。そのため、startTime パラメーターを設定するか、起動インターフェイスで開始時刻を選択した場合、binlogStartUpMode は自動的に timestamp モードを使用し、他の消費モードは有効になりません。

プライマリキーの競合処理

Hologres への書き込み時に、重複するプライマリキーを持つデータが存在する場合、コネクタは 3 つの処理戦略を提供します。

VVR 11+

sink.on-conflict-action オプションを指定することで、異なる戦略を実装できます。

sink.on-conflict-action の値

説明

INSERT_OR_IGNORE

データの最初の出現を保持し、後続の重複を無視します。

INSERT_OR_REPLACE

既存の行を新しいデータで置き換えます。

INSERT_OR_UPDATE(デフォルト)

指定されたカラムのみを更新し、既存の行の他のカラムは変更しません。

VVR 8+

mutatetype オプションを指定することで、異なる戦略を実装できます。

mutatetype の値

説明

insertorignore(デフォルト)

データの最初の出現を保持し、後続の重複を無視します。

insertorreplace

既存の行を新しいデータで置き換えます。

insertorupdate

指定されたカラムのみを更新し、既存の行の他のカラムは変更しません。

テーブルに a、b、c、d というフィールドがあり、a がプライマリキーであると仮定します。結果テーブルに a および b のみのフィールドが含まれている場合、INSERT_OR_UPDATE を設定すると、b のみが更新され、c および d は変更されません。
説明

結果テーブルのカラム数は、物理的な Hologres テーブルより少なくても構いませんが、不足しているカラムはすべて NULL 許容である必要があります。そうでない場合、書き込み操作は失敗します。

パーティションテーブルへの書き込み

デフォルトでは、Hologres 結果テーブルは非パーティションテーブルへのデータインポートのみをサポートします。パーティションテーブルへのデータインポートを行うには、以下の構成を有効化する必要があります:

VVR 11+

sink.create-missing-partitiontrue に設定します。これにより、存在しない子パーティションをコネクタが自動的に作成できます。

説明
  • VVR 11.1 以降では、パーティションテーブルへの書き込みがサポートされ、対応する子パーティションへのデータの自動ルーティングが可能です。

  • tablename には親テーブルの名前を指定します。

  • 子テーブルを事前に作成せず、sink.create-missing-partition=true を設定しない場合、書き込みは失敗します。

VVR 8+

  • partitionRoutertrue に設定すると、対応する子パーティションへのデータの自動ルーティングが可能になります。

  • createparttabletrue に設定すると、存在しない子パーティションを自動的に作成できます。

説明
  • tablename には親テーブルの名前を指定します。

  • 書き込みを確実に行うには、子パーティションを事前に作成するか、createparttable=true を設定してください。

マルチストリーム書き込みを伴うワイドテーブルへのマージおよび部分更新

コネクタは、複数のデータストリームを単一の Hologres ワイドテーブルに効率的にマージできます。プライマリキーに基づき、変更されたカラムのみを更新する部分行更新をサポートしており、書き込み効率を最適化し、データ整合性を確保します。

制限事項

  • ワイドテーブルにはプライマリキーが必要です。

  • 各データストリームには、すべてのプライマリキーカラムが含まれている必要があります。

  • Hologres ワイドテーブルが列指向ストレージを使用している場合、高い RPS により CPU 使用率が上昇する可能性があります。この問題を緩和するには、テーブルのカラムに対する辞書エンコーディングを無効化することを検討してください。

サンプル

2 つのデータストリームがあると仮定します。1 つ目のストリームには a、b、c のカラムが含まれ、2 つ目のストリームには a、d、e のカラムが含まれます。Hologres ワイドテーブル WIDE_TABLE には a、b、c、d、e のカラムが含まれ、a がプライマリキーです。

VVR 11+

// source1 および source2 はすでに定義済みと仮定します。
CREATE TEMPORARY TABLE hologres_sink ( -- a、b、c、d、e の 5 つのカラムを宣言します。
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- a、b、c、d、e を含む Hologres ワイドテーブル。
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sink.on-conflict-action'='INSERT_OR_UPDATE',   -- プライマリキーに基づき、指定されたカラムのみを更新します。
  'sink.delete-strategy'='IGNORE_DELETE',         -- 再送信メッセージによる削除要求の処理戦略。挿入または更新のみを必要とするシナリオには IGNORE_DELETE が適しています。
  'sink.partial-insert.enabled'='true'            -- 部分カラム更新を有効化します。これにより、INSERT ステートメントで指定されたカラムのみを更新または挿入できます。
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- a、b、c のみを挿入します。
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- a、d、e のみを挿入します。
END;

VVR 8+

// source1 および source2 はすでに定義済みと仮定します。
CREATE TEMPORARY TABLE hologres_sink ( -- a、b、c、d、e の 5 つのカラムを宣言します。
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- a、b、c、d、e を含む Hologres ワイドテーブル。
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'mutatetype'='insertorupdate',    -- プライマリキーに基づき、指定されたカラムのみを更新します。
  'ignoredelete'='true',            -- 再送信メッセージによって生成された削除要求を無視します。
  'partial-insert.enabled'='true'   -- 部分カラム更新を有効化し、INSERT ステートメントで宣言されたカラムのみの更新をサポートします。
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- a、b、c のみを挿入します。
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- a、d、e のみを挿入します。
END;
説明

ignoredeletetrue に設定すると、再送信メッセージによって生成された削除要求が無視されます。VVR 8.0.8 以降では、sink.delete-strategy を使用して、部分更新のさまざまな戦略を構成することを推奨します。

パーティションテーブルからの binlog 読み取り(ベータ版)

パーティションテーブルは、データアーカイブおよびクエリパフォーマンスを向上させます。Hologres コネクタは、物理パーティションテーブルおよび論理パーティションテーブルの両方からの binlog 読み取りをサポートします。詳細については、「論理パーティションテーブルの作成」をご参照ください。

物理パーティションテーブルからのバイナリログの消費

Hologres コネクタは、パーティションテーブルからの binlog 読み取りをサポートし、単一のジョブ内で新規パーティションを動的に監視できます。これにより、リアルタイムデータ処理の効率性および利便性が大幅に向上します。

注意事項

  • パーティションテーブルからの binlog 読み取りには、VVR 8.0.11+、Hologres 2.1.27+、binlog が有効化されたテーブル、および JDBC 消費が必要です。

  • パーティション名は、厳密に親テーブル名、アンダースコア(_)、パーティション値で構成される必要があります。すなわち、{parent_table}_{partition_value} の形式です。この形式に従わないパーティションは消費できません。詳細については、「動的パーティション管理」をご参照ください。

    重要
    • DYNAMIC モードでは、VVR 8.0.11 は - 区切りのパーティションフィールド(例: YYYY-MM-DD)をサポートしていません。

    • VVR 11.1+ では、カスタム名形式のパーティションからの読み取りを完全にサポートしています。

    • この制限は読み取りにのみ適用され、パーティションテーブルへの書き込みには影響しません。

  • Flink で Hologres ソーステーブルを宣言する場合、Hologres パーティションテーブルのパーティションカラムを含める必要があります。

  • DYNAMIC モードでは、パーティションテーブルに動的パーティション管理が有効化されていることを確認してください。また、パーティションの事前作成パラメーター auto_partitioning.num_precreate は 1 より大きい値である必要があります。そうでない場合、最新のパーティションを読み取る際にジョブが例外をスローします。

  • DYNAMIC パーティション binlog 消費モードでは、新規パーティションが追加された後、古いパーティションからの増分データは読み取られません。

サンプル

パターンタイプ

機能

説明

DYNAMIC

動的パーティション読み取り

新規パーティションを自動的に監視し、時系列順に読み取り進行状況を動的に進めます。リアルタイムユースケースに適しています。

STATIC

静的パーティション消費

既存のパーティション(または明示的に指定されたパーティション)のみを読み取り、新規パーティションを自動的に検出しません。固定範囲内の履歴データ処理に適しています。

DYNAMIC モード

VVR 11+

Hologres に、以下の DDL で定義されたパーティションテーブルが存在し、バイナリログおよび動的パーティションが有効化されていると仮定します。

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- 動的パーティションを有効化します。
    auto_partitioning_time_unit = 'DAY',  -- パーティションは日単位で作成されます。例: test_message_src1_20250512、test_message_src1_20250513。
    auto_partitioning_num_precreate = '2' -- 2 つのパーティションを事前に作成します。
);
-- 既存のパーティションテーブルに対しても、ALTER TABLE を使用して動的パーティションを有効化できます。

Flink では、以下の SQL ステートメントを使用して、test_message_src1 パーティションテーブルを DYNAMIC モードで消費します。

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- Hologres パーティションテーブルのパーティションカラム。
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- 動的パーティションが有効化された Hologres テーブル。
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'DYNAMIC', -- 最新のパーティションを動的に監視します。
  'source.binlog.startup-mode' = 'initial'           -- まずすべての既存データを読み取り、その後増分読み取りを開始します。
);

VVR 8.0.11

Hologres に、以下の DDL で定義されたパーティションテーブルが存在し、バイナリログおよび動的パーティションが有効化されていると仮定します。

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- 動的パーティションを有効化します。
    auto_partitioning_time_unit = 'DAY',  -- パーティションは日単位で作成されます。例: test_message_src1_20241027、test_message_src1_20241028。
    auto_partitioning_num_precreate = '2' -- 2 つのパーティションを事前に作成します。
);

-- 既存のパーティションテーブルに対しても、ALTER TABLE を使用して動的パーティションを有効化できます。

Flink では、以下の SQL ステートメントを使用して、test_message_src1 パーティションテーブルを DYNAMIC モードで消費します。

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- Hologres パーティションテーブルのパーティションカラム。
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- 動的パーティションが有効化された Hologres テーブル。
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'DYNAMIC',  -- 最新のパーティションを動的に監視します。
  'binlogstartUpMode' = 'initial',      -- まずすべての既存データを読み取り、その後増分読み取りを開始します。
  'sdkMode' = 'jdbc_fixed'              -- 接続数制限の問題を回避するためにこのモードを使用します。
);

STATIC モード

VVR 11+

Hologres に、以下の Data Definition Language(DDL)で定義されたパーティションテーブルが存在し、バイナリログが有効化されていると仮定します。

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

Flink では、以下の SQL ステートメントを使用して、test_message_src2 パーティションテーブルを STATIC モードで消費します。

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- Hologres パーティションテーブルのパーティションカラム。
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- パーティションテーブル。
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'STATIC', -- 固定のパーティションセットを読み取ります。
  'source.binlog.partition-values-to-read' = 'red,blue,green',  -- red、blue、green の 3 つのパーティションのみを読み取ります。black パーティションは読み取られません。新規パーティションも読み取られません。設定しない場合、親テーブルのすべてのパーティションが読み取られます。
  'source.binlog.startup-mode' = 'initial'  -- まずすべての既存データを読み取り、その後増分読み取りを開始します。
);

VVR 8.0.11

Hologres に、以下の DDL で定義されたパーティションテーブルが存在し、バイナリログが有効化されていると仮定します。

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

Flink では、以下の SQL ステートメントを使用して、test_message_src2 パーティションテーブルを STATIC モードで読み取ります。

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- Hologres パーティションテーブルのパーティションカラム。
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- パーティションテーブル。
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'STATIC', -- 固定のパーティションセットを読み取ります。
  'partition-values-to-read' = 'red,blue,green',  -- red、blue、green の 3 つのパーティションのみを読み取ります。black パーティションは読み取られません。新規パーティションも読み取られません。設定しない場合、親テーブルのすべてのパーティションが読み取られます。
  'binlogstartUpMode' = 'initial',  -- まずすべての既存データを読み取り、その後増分読み取りを開始します。
  'sdkMode' = 'jdbc_fixed' -- 接続数制限の問題を回避するためにこのモードを使用します。
);

論理パーティションテーブルからのバイナリログの消費

Hologres コネクタは、論理パーティションテーブルからの binlog 読み取りをサポートし、明示的に読み取るパーティションを指定できます。

注意事項

  • 特定のパーティションからの binlog 読み取りには、VVR 11.0.0+ および Hologres V3.1+ が必要です。

  • すべてのパーティションからの binlog 読み取りは、非パーティション Hologres テーブルからの binlog 読み取りと同等です。手順については、「ソーステーブル」をご参照ください。

サンプル

パラメーター名

説明

サンプル

source.binlog.logical-partition-filter-column-names

パーティションカラム名を二重引用符で囲みます。複数のカラム名はカンマで区切ります。カラム名に二重引用符が含まれる場合は、別の二重引用符でエスケープします。

'source.binlog.logical-partition-filter-column-names'='"Pt","id"'

パーティションカラムは Pt および id です。

source.binlog.logical-partition-filter-column-values

パーティションカラムの値。各パーティションはカラム値のセットで指定されます。異なるカラムの値はカンマで区切り、二重引用符で囲みます。値に二重引用符が含まれる場合は、別の二重引用符でエスケープします。複数のパーティションはセミコロンで区切ります。

'source.binlog.logical-partition-filter-column-values'='"20240910","0";"special""value","9"'

2 つのパーティションの消費を指定します。パーティションキーには 2 つのカラムがあります。最初のパーティションキー値は (20240910, 0)、2 番目のパーティションキー値は (special"value, 9) です。

Hologres に以下のテーブルが作成されていると仮定します:

CREATE TABLE holo_table (
    id int not null,
    name text,
    age numeric(18,4),
    "Pt" text,
    primary key(id, "Pt")
)
LOGICAL PARTITION BY LIST ("Pt", id)
WITH (
    binlog_level ='replica'
);

このテーブルのバイナリログを Flink で消費します。

CREATE TEMPORARY TABLE test_src_binlog_table(
  id INTEGER,
  name VARCHAR,
  age decimal(18,4),
  `Pt` VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='holo_table',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog'='true',
  'source.binlog.logical-partition-filter-column-names'='"Pt","id"',
  'source.binlog.logical-partition-filter-column-values'='<yourPartitionColumnValues>',
  'source.binlog.change-log-mode'='ALL',  -- INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含むすべての changeLog 型を読み取ります。
  'retry-count'='10',                     -- binlog 読み取りエラー発生時のリトライ回数。
  'retry-sleep-step-ms'='5000',           -- リトライ間の増分バックオフ時間。初回リトライは 5 秒待機、2 回目は 10 秒待機、といった具合です。
  'source.binlog.batch-size'='512'        -- binlog から一度に読み取る行数。
);

DataStream API

重要

DataStream API を使用するには、プロジェクトに Hologres DataStream コネクタの依存関係を含める必要があります。DataStream コネクタの統合および使用方法については、「DataStream プログラムにおけるコネクタの統合および使用」をご参照ください。Hologres DataStream コネクタ は、Maven Central Repository で入手できます。ローカルデバッグには、対応する Uber JAR を使用してください。詳細については、「コネクタを含むジョブのローカル実行およびデバッグ」をご参照ください。

Hologres ソーステーブル

binlog が有効化されたソーステーブル

Realtime Compute for Apache Flink では、HologresBinlogSource クラスを使用して Hologres binlog データを読み取ることができます。以下の例は、HologresBinlogSource の作成方法を示しています。

VVR 11.3+

重要

VVR 11.1.2 以降では、JDBCOptions および startTimeMs パラメーターが HologresBinlogSource コンストラクターから削除されました。VVR 11.3 以降では、List<Subscribe.BinlogFilter> パラメーターが追加されました。VVR 11 以降をご利用の場合は、VVR 11.3 以降の使用を推奨します。

public class Sample {                                                                                                                                                                          
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 読み取るテーブルのスキーマを初期化します。これは Hologres テーブルスキーマのカラムと一致する必要があります。サブセットのカラムを定義できます。
            TableSchema schema = TableSchema.builder()
                    .field("a", DataTypes.INT())
                    .field("b", DataTypes.STRING())
                    .field("c", DataTypes.TIMESTAMP())
                    .build();

            // 読み取るテーブルの名前。
            String sourceTableName = "sourceTableName";

            // Hologres コネクタオプション。
            Configuration config = new Configuration();
            config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
            config.setString(HologresConfigs.USERNAME, "yourUserName");
            config.setString(HologresConfigs.PASSWORD, "yourPassword");
            config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
            config.setString(HologresConfigs.TABLE, sourceTableName);
            config.set(HologresConfigs.BINLOG, true);
            config.set(HologresConfigs.BINLOG_CHANGE_LOG_MODE, BinlogChangeLogMode.ALL);
            // HologresBinlogSource を構築します。
            HologresBinlogSource source = new HologresBinlogSource(
                    new HologresConnectionParam(config),
                    schema,
                    config,
                    StartupMode.INITIAL,
                    sourceTableName,
                    "",
                    Collections.emptyList(),
                    -1,
                    Collections.emptySet(),
                    Collections.emptyList()
            );
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
            env.execute();
        }
  }

VVR 8.0.11+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 読み取るテーブルのスキーマを初期化します。これは Hologres テーブルスキーマのカラムと一致する必要があります。サブセットのカラムを定義できます。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Hologres コネクタオプション。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // JDBCOptions を構築します。
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // HologresBinlogSource を構築します。
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet(),
                new ArrayList<>()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 8.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 読み取るテーブルのスキーマを初期化します。これは Hologres テーブルスキーマのカラムと一致する必要があります。サブセットのカラムを定義できます。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Hologres コネクタオプション。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // JDBCOptions を構築します。
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // HologresBinlogSource を構築します。
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 6.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 読み取るテーブルのスキーマを初期化します。これは Hologres テーブルスキーマのカラムと一致する必要があります。サブセットのカラムを定義できます。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .build();
         // Hologres コネクタオプション。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // JDBCOptions を構築します。
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // 既定のスロット名を設定または作成します。
        config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));

        boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE) && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
        // JDBCBinlogRecordConverter を構築します。
        JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
                jdbcOptions.getTable(),
                schema,
                new HologresConnectionParam(config),
                cdcMode,
                Collections.emptySet());
        
        // HologresBinlogSource を構築します。
        long startTimeMs = 0;
        HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.TIMESTAMP,
                recordConverter,
                "",
                -1);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}
重要

Realtime Compute for Apache Flink エンジンのバージョンが 8.0.5 より古いか、Hologres のバージョンが V2.1 より古い場合、ユーザーがスーパーユーザーであるか、Replication Role 権限を持っていることを確認してください。詳細については、「Hologres の権限に関する問題」をご参照ください。

binlog が無効化されたソーステーブル

Realtime Compute for Apache Flink では、HologresBulkreadInputFormat クラス(RichInputFormat の実装)を使用して、Hologres テーブルからデータを読み取ることができます。以下の例は、binlog が無効化された Hologres テーブルからデータを読み取る Hologres ソースの構築方法を示しています。

public class Sample {
    public static void main(String[] args) throws Exception {
        // Java DataStream API を設定します。
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 読み取るテーブルのスキーマを初期化します。これは Hologres テーブルスキーマのカラムと一致する必要があります。サブセットのカラムを定義できます。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Hologres コネクタオプション。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        // JDBCOptions を構築します。
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
                new HologresConnectionParam(config),
                jdbcOptions,
                schema,
                "",
                -1);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo).print();
        env.execute();
    }
}

Maven 依存関係

Hologres DataStream コネクタ は、Maven Central Repository で入手できます。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-hologres</artifactId>
    <version>${vvr-version}</version>
</dependency>

Hologres 結果テーブル

VVR 11+

public class Sample {
      public static void main(String[] args) throws Exception {
          final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          // 書き込むテーブルのスキーマを初期化します。これは Hologres テーブルスキーマのカラムと一致する必要があります。サブセットのカラムを定義できます。
          TableSchema tableSchema = TableSchema.builder()
                  .field("a", DataTypes.INT().notNull())
                  .field("b", DataTypes.STRING())
                  .primaryKey("a")
                  .build();
          // Hologres コネクタオプション。
          Configuration config = new Configuration();
          config.set(HologresConfigs.ENDPOINT, "yourEndpoint");
          config.set(HologresConfigs.USERNAME, "yourUserName");
          config.set(HologresConfigs.PASSWORD, "yourPassword");
          config.set(HologresConfigs.DATABASE, "yourDatabaseName");
          config.set(HologresConfigs.TABLE, "yourTableName");
          HologresConnectionParam connectionParam = new HologresConnectionParam(config);
          HologresTableSchema hologresTableSchema =
                  HologresTableSchema.get(connectionParam.getJDBCOptions());
          // 結果テーブルに書き込むカラムのインデックス。
          Integer[] targetColumnIndexes = {0, 1};
          // Hologres 結果テーブルを構築します。
          HologresSinkFunction sinkFunction =
                  new HologresSinkFunction(
                          connectionParam, tableSchema, targetColumnIndexes, hologresTableSchema);
          TypeInformation<RowData> typeInfo = InternalTypeInfo.of(tableSchema.toRowDataType().getLogicalType());
          env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
          env.execute();
      }
  }

VVR 8+

public class Sample {
    public static void main(String[] args) throws Exception {
        // Java DataStream API を設定します。
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 書き込むテーブルのスキーマを初期化します。これは Hologres テーブルスキーマのカラムと一致する必要があります。サブセットのカラムを定義できます。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .build();
        // Hologres コネクタオプション。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
        
         // RowData としてデータを書き込む Hologres ライターを構築します。
        AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
                hologresConnectionParam, 
                schema, 
                HologresTableSchema.get(hologresConnectionParam), 
                new Integer[0]);
        // Hologres 結果テーブルを構築します。
        HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
        env.execute();
    }
}

メタデータカラム

Realtime Compute for Apache Flink VVR 8.0.11 以降では、binlog が有効化されたソーステーブルのメタデータカラムがサポートされています。このバージョン以降では、hg_binlog_event_type などの binlog フィールドをメタデータカラムとして宣言することを推奨します。メタデータカラム は SQL 標準を拡張したもので、ソースのデータベース名およびテーブル名、データの変更タイプおよびタイムスタンプなど、特定の情報を取得できます。これらの情報を使用して、DELETE イベントを除外するなどのカスタム処理ロジックを定義できます。

フィールド名

データ型

説明

db_name

STRING NOT NULL

行を含むデータベースの名前。

table_name

STRING NOT NULL

行を含むテーブルの名前。

hg_binlog_lsn

BIGINT NOT NULL

binlog のシステムカラムで、binlog シーケンス番号を表します。シャード内では単調増加ですが、連続していません。異なるシャード間では一意性や順序性は保証されません。

hg_binlog_timestamp_us

BIGINT NOT NULL

データベース内の変更イベントのタイムスタンプ(マイクロ秒単位)。

hg_binlog_event_type

BIGINT NOT NULL

行の CDC イベントタイプ。有効な値:

  • 5: INSERT

  • 2: DELETE

  • 3: UPDATE_BEFORE

  • 7: UPDATE_AFTER

hg_shard_id

INT NOT NULL

データが存在するシャード。詳細については、「テーブルグループおよびシャード」をご参照ください。

DDL ステートメントでは、<meta_column_name> <datatype> METADATA VIRTUAL を使用してメタデータカラムを宣言します。以下の例を参照してください:

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn bigint METADATA VIRTUAL
  hg_binlog_event_type bigint METADATA VIRTUAL
  hg_binlog_timestamp_us bigint METADATA VIRTUAL
  hg_shard_id int METADATA VIRTUAL
  db_name string METADATA VIRTUAL
  table_name string METADATA VIRTUAL
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  ...
  );

よくある質問

関連ドキュメント