すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Hologres

最終更新日:Nov 09, 2025

このトピックでは、Hologres コネクタの使用方法について説明します。

背景

Hologres は、大量のデータのリアルタイム書き込み、更新、分析をサポートするワンストップのリアルタイムデータウェアハウスエンジンです。標準 SQL をサポートし、PostgreSQL プロトコルと互換性があります。Hologres は、多次元オンライン分析処理 (OLAP)、ペタバイト規模のデータのアドホック分析、高い同時実行性と低レイテンシーでのオンラインデータサービングもサポートしています。Hologres は MaxCompute、Flink、DataWorks と深く統合されており、オフラインとオンラインの両方のデータウェアハウスに統合ソリューションを提供します。Hologres コネクタは、次の機能をサポートしています。

カテゴリ

詳細

サポートされるタイプ

ソース、ディメンション、および結果テーブル

実行モード

ストリームモードとバッチモード

データフォーマット

非対応

特定のモニタリングメトリック

監視メトリック

  • ソーステーブル:

    • numRecordsIn

    • numRecordsInPerSecond

  • シンクテーブル:

    • numRecordsOut

    • numRecordsOutPerSecond

    • currentSendTime

    説明

    メトリックの詳細については、「監視メトリック」をご参照ください。

API タイプ

DataStream と SQL

結果テーブルでのデータ更新または削除のサポート

はい

特徴

特徴

詳細

Hologres データのリアルタイム消費

バイナリログの有無にかかわらず Hologres データを読み取ることができます。この機能は、変更データキャプチャ (CDC) モードと非 CDC モードの両方と互換性があります。

フルと増分の統一消費

フル、増分、または 統合されたフルおよび増分 の消費を実行できます。

プライマリキー競合処理ポリシー

新しいデータを無視する、行全体を置き換える、または特定のフィールドのみを更新することができます。

複数ストリーム書き込みにおけるワイドテーブルのマージと部分更新

行全体ではなく、変更された列のみを更新できます。

パーティションテーブルのバイナリログを消費する (パブリックプレビュー)

物理パーティションテーブルからバイナリログをコンシュームできます。1 つのジョブで、新しく追加されたパーティションを含め、すべてのパーティションをモニターできます。論理パーティションテーブルからバイナリログをコンシュームすることもできます。

パーティションテーブルへの書き込み

パーティションテーブルの親テーブルにデータを書き込み、対応する子パーティションを自動的に作成することができます。

単一テーブルまたはデータベース全体のリアルタイム同期

単一テーブルまたはデータベース全体のレベルでのデータのリアルタイム同期を実行できます。この機能は、次の機能を提供します。

  • 先祖テーブルのスキーマ進化の自動検出: ソースデータベーステーブルのスキーマが変更された場合、Hologres はこれらの変更をリアルタイムで結果テーブルに同期できます。

  • スキーマ進化の自動処理: 新しいデータが Hologres に流入した場合、Flink はデータを書き込む前にまずスキーマ変更操作をトリガーします。このプロセスでは手動での介入は不要です。

詳細については、「CREATE TABLE AS (CTAS) 文」および「リアルタイムデータベース同期のクイックスタート」をご参照ください。

制限事項と提案

制限事項

  • 外部テーブルはサポートされていません: Hologres コネクタは、MaxCompute 外部テーブルなどの Hologres 外部テーブルへのアクセスをサポートしていません。

  • 時間型の制限: TIMESTAMP データのリアルタイム消費はサポートされていません。テーブルを作成するときは TIMESTAMPTZ 型を使用する必要があります

  • ソーステーブルのスキャンモード (Ververica Runtime (VVR) 8 以前): デフォルトでは、データはバッチモードで読み取られます。テーブル全体は一度しかスキャンされず、新しいデータは消費されません。

  • ウォーターマークの制限 (VVR 8 以前): CDC モードはウォーターマークの定義をサポートしていません。ウィンドウ集約の場合、非ウィンドウ集約ソリューションを使用する必要があります。

提案

  • ストレージモードの選択:

    • ディメンションテーブルでのポイントクエリ: 行指向ストレージを使用します。プライマリキーとクラスタリングキーを設定する必要があります。

    • ディメンションテーブルでの 1 対多クエリ: 列指向ストレージを使用します。分散キーセグメントキー (イベント時間列) を構成してパフォーマンスを最適化できます。

    • 高頻度の更新と分析クエリのためのテーブル: テーブルがバイナリログ消費と OLAP 分析の両方をリアルタイムでサポートする必要がある場合、行列表ハイブリッドストレージモードを使用します

    重要

    Hologres でテーブルを作成するときにモードを指定しない場合、デフォルトのストレージモードは列指向です。テーブルが作成された後、ストレージモードは変更できません。詳細については、「Hologres でテーブルを作成する」および「テーブルストレージ形式: 列指向、行指向、行列表ハイブリッド」をご参照ください。

  • ジョブの同時実行性の設定: Flink ジョブの同時実行性を Hologres テーブルのシャード数と同じに設定できます。

    # Hologres コンソールで、次のステートメントを実行してテーブルのシャード数を表示します。tablename をテーブルの名前に置き換えます。
    select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';
  • バージョンと特徴: 既知のバグ、機能の更新、バージョンの互換性情報については、Hologres コネクタのリリースノートを定期的に確認することをお勧めします。

注意

  • Hologres と VVR のバージョンの互換性と制限事項

    ソーステーブル

    • VVR 8 以前の場合、sdkMode パラメーターを指定して消費モードを選択できます。

    • VVR 11 以降の場合、source.binlog.read-mode パラメーターを指定して消費モードを選択できます。

    VVR バージョン

    Hologres バージョン

    デフォルト/推奨パラメーター値

    実際の消費モード

    注意

    6.0.7 以降

    < 2.0

    カスタム

    holohub (デフォルト)

    jdbc を設定します。

    6.0.7 から 8.0.4

    2.0 以上

    jdbc (自動スイッチオーバー、構成不要)

    jdbc (強制)

    Hologres 2.0 以降では holohub サービスが公開されていないため、モードは自動的に jdbc に切り替わります。これにより、権限の問題が発生する可能性があります。詳細については、「権限の問題」をご参照ください。

    8.0.5 以降

    2.1 以上

    jdbc (自動スイッチオーバー、構成不要)

    jdbc (強制)

    権限の問題はありません。Hologres 2.1.27 以降では、モードは jdbc_fixed に切り替えられます。

    11.1 以上

    任意のバージョン

    AUTO (デフォルト)

    Hologres のバージョンに基づいて自動的に選択されます

    • バージョン 2.1.27 以降では、jdbc モードが選択され、デフォルトで軽量接続が有効になります。これは、connection.fixed.enabled パラメーターがデフォルトで true に設定されていることを意味します。

    • バージョン 2.1.0 から 2.1.26 までは、jdbc モードが選択されます。

    • バージョン 2.0 以前では、holohub モードが選択されます。

    重要

    VVR 11.1 以降では、デフォルトでバイナリログデータが消費されます。バイナリログが有効になっていることを確認してください。そうしないと、エラーが発生する可能性があります。

    権限の問題

    ユーザーがスーパーユーザーでない場合、JDBC モードでバイナリログを消費するには権限を構成する必要があります。

    [user_name] は Alibaba Cloud アカウント ID または RAM ユーザーを指定します。詳細については、「アカウントの概要」をご参照ください。

    -- 標準 PostgreSQL 権限モデルでは、ユーザーに CREATE 権限を付与し、インスタンスに対する Replication Role 権限をユーザーに付与します。
    GRANT CREATE ON DATABASE <db_name> TO <user_name>;
    alter role <user_name> replication;
    
    -- データベースが簡易権限モデル (SLMP) を使用している場合、GRANT 文を実行できません。spm_grant を使用して、データベースに対する Admin 権限をユーザーに付与します。HoloWeb コンソールで権限を付与することもできます。
    call spm_grant('<db_name>_admin', '<user_name>');
    alter role <user_name> replication;

    シンクテーブル

    • VVR 8 以前の場合、sdkMode パラメーターを指定して消費モードを選択できます。

    • VVR 11 以降の場合、sink.write-mode パラメーターを指定して消費モードを選択できます。

    VVR バージョン

    Hologres バージョン

    rpc モードは影響を受けますか?

    実際の rpc 消費モード

    推奨/デフォルトのパラメーター値

    備考

    6.0.4 から 8.0.2

    < 2.0

    いいえ

    rpc

    カスタム

    /

    6.0.4 ~ 8.0.2

    ≥ 2.0

    はい

    jdbc_fixed (自動スイッチオーバー)

    カスタム

    'jdbcWriteBatchSize' を '1' に設定して、重複排除を防ぐことができます。

    8.0.3 以降

    任意のバージョン

    はい

    jdbc_fixed (自動スイッチオーバー)

    カスタム

    rpc モードを設定すると、重複排除を防ぐために、パラメーターの値が自動的に jdbc_fixed に切り替わり、'jdbcWriteBatchSize' が '1' に設定されます。

    8.0.5 以降

    すべてのバージョン

    はい

    jdbc_fixed (自動スイッチオーバー)

    カスタム

    rpc モードを設定すると、パラメーター値は自動的に jdbc_fixed に切り替えられ、重複排除を防ぐために 'deduplication.enabled' は 'false' に設定されます。

    重要
    • rpc サービスは Hologres 2.0 以降では公開されていません。このパラメーターを rpc に設定すると、Flink システムは自動的にパラメーター値を jdbc_fixed に切り替えます。ただし、別の値に設定した場合、Flink システムは構成した値を使用します。

    • rpc モードは VVR 11.1 以降では削除されています。接続には jdbc モードを使用することをお勧めします。

    • 高い同時実行性シナリオでの書き込みには、jdbc_copy/COPY_STREAM モードを使用できます。

    ディメンションテーブル

    VVR バージョン

    Hologres バージョン

    rpc モードは影響を受けますか?

    実際の rpc 消費モード

    推奨/デフォルトのパラメーター値

    注意事項

    6.0.4 ~ 8.0.2

    < 2.0

    いいえ

    rpc

    カスタム

    /

    6.0.4 から 8.0.2

    ≥ 2.0

    はい

    jdbc_fixed (自動スイッチオーバー)

    カスタム

    Hologres インスタンスがバージョン 2.0 以降の場合、rpc サービスは公開されていません。このパラメーターを rpc に設定すると、Flink システムは自動的にパラメーター値を jdbc_fixed に切り替えます。ただし、別の値に設定した場合、Flink システムは構成した値を使用します。

    8.0.3 以降

    任意バージョン

    はい

    jdbc_fixed (自動スイッチオーバー)

    カスタム

    8.0.5 以降

    任意のバージョン

    はい

    jdbc_fixed (自動スイッチオーバー)

    カスタム

    重要

    rpc モードは VVR 11.1 以降では削除されています。接続にはデフォルトで jdbc モードが使用されます。必要に応じて connection.fixed.enabled パラメーターを有効にして、軽量接続モードを使用できます。

  • JDBC モードのバイナリログソーステーブルは JSONB 型の読み取りをサポートしています。データベースレベルで Grand Unified Configuration (GUC) パラメーターを有効にする必要があります。

    -- データベースレベルで GUC パラメーターを有効にします。スーパーユーザーのみがこのステートメントを実行できます。このパラメーターは、各データベースに対して一度だけ設定する必要があります。
    alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;
  • UPDATE 操作は、2 つの連続したバイナリログレコードを生成します。古いデータのレコード (update_before) の後に、新しいデータのレコード (update_after) が続きます。

  • バイナリログソーステーブルで TRUNCATE やその他のテーブル再作成操作を実行しないでください。詳細については、「よくある質問」をご参照ください。

  • エラーを防ぐために、DECIMAL 型の精度が Flink と Hologres で同じであることを確認してください。詳細については、「よくある質問」をご参照ください。

  • ソーステーブルデータのフルの消費と増分の消費の統合に INITIAL モードを使用する場合、グローバルな順序は保証されません。ダウンストリームアプリケーションが計算のために時間フィールドに依存している場合は、別の純粋なバイナリログ消費モードを使用する必要があります。

バイナリロギングの有効化

テーブルが作成されていない場合

リアルタイム消費機能はデフォルトで無効になっています。Hologres コンソールでテーブルを作成する場合、DDL 文で binlog.level および binlog.ttl パラメーターを設定する必要があります。次のコードに例を示します。

begin;
create table test_table(
  id int primary key, 
  title text not null, 
  body text);
call set_table_property('test_table', 'orientation', 'row');--test_table という名前の行指向テーブルを作成します。
call set_table_property('test_table', 'clustering_key', 'id');--id 列にクラスタリングキーを作成します。
call set_table_property('test_table', 'binlog.level', 'replica');--テーブルプロパティを設定してバイナリロギングを有効にします。
call set_table_property('test_table', 'binlog.ttl', '86400');--binlog.ttl はバイナリログの TTL を秒単位で指定します。
commit;

既存のテーブルの場合

Hologres コンソールで、次のステートメントを実行して既存のテーブルのバイナリロギングを有効にし、バイナリログの TTL を設定できます。table_name は、バイナリロギングを有効にするテーブルの名前を指定します。

-- テーブルプロパティを設定してバイナリロギングを有効にします。
begin;
call set_table_property('<table_name>', 'binlog.level', 'replica');
commit;

-- テーブルプロパティを設定してバイナリログの TTL を秒単位で構成します。
begin;
call set_table_property('<table_name>', 'binlog.ttl', '2592000');
commit;

コネクタオプション

VVR 11 以降、Hologres をより適切にサポートするために WITH パラメーターが調整されています。一部のパラメーターは名前が変更されたり、削除されたりする場合があります。VVR 11 は VVR 8 と下位互換性があります。お使いの VVR バージョンに対応するパラメーターの説明を見つけてください。

型マッピング

Flink と Hologres 間のデータ型マッピングの詳細については、「Flink と Hologres 間のデータ型マッピング」をご参照ください。

ソーステーブルの例

Binlog ソーステーブル

CDC モード

このモードでは、ソースによって消費されるバイナリログデータについて、hg_binlog_event_type に基づいて各行の Flink RowKind 型が自動的かつ正確に設定されます。型を明示的に宣言する必要はありません。例としては、INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER があります。これにより、MySQL や Postgres の CDC 機能と同様に、テーブルのミラー化されたデータ同期が可能になります。次のコードは、ソーステーブルの DDL 文の例です。

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='${secret_values.ak_id}',            --キーの漏洩を防ぐために、AccessKey ペアには変数管理を使用します。 
  'password'='${secret_values.ak_secret}',        
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL',  --INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含むすべての ChangeLog タイプを読み取ります。
  'retry-count'='10',                     --バイナリログの読み取りエラーが発生した後のリトライ回数。
  'retry-sleep-step-ms'='5000',           --リトライの増分待機時間。最初のリトライは 5 秒、2 回目は 10 秒待機します。
  'source.binlog.batch-size'='512'        --各バッチでバイナリログから読み取るデータ行数。
);

VVR 8+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --キーの漏洩を防ぐために、AccessKey ペアには変数管理を使用します。   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'sdkMode'='jdbc',
  'binlogMaxRetryTimes' = '10',     --バイナリログの読み取りエラーが発生した後のリトライ回数。
  'binlogRetryIntervalMs' = '500',  --バイナリログの読み取りエラーが発生した後のリトライ間隔。
  'binlogBatchReadSize' = '100'     --各バッチでバイナリログから読み取るデータ行数。
);

非 CDC モード

このモードでは、ソースによって消費されるバイナリログデータは、通常の Flink データとして子孫ノードに渡されます。つまり、すべてのデータは INSERT 型になります。必要に応じて、特定の hg_binlog_event_type 型のデータをどのように処理するかを選択できます。次のコードは、ソーステーブルの DDL 文の例です。

VVR 11+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --キーの漏洩を防ぐために、AccessKey ペアには変数管理を使用します。   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'source.binlog.change-log-mode'='ALL_AS_APPEND_ONLY',  --すべての ChangeLog タイプは INSERT として処理されます。
  'retry-count'='10',                     --バイナリログの読み取りエラーが発生した後のリトライ回数。
  'retry-sleep-step-ms'='5000',           --リトライの増分待機時間。最初のリトライは 5 秒、2 回目は 10 秒待機します。
  'source.binlog.batch-size'='512'        --各バッチでバイナリログから読み取るデータ行数。
);

VVR 8+

CREATE TEMPORARY TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --キーの漏洩を防ぐために、AccessKey ペアには変数管理を使用します。   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'binlogMaxRetryTimes' = '10',     --バイナリログの読み取りエラーが発生した後のリトライ回数。
  'binlogRetryIntervalMs' = '500',  --バイナリログの読み取りエラーが発生した後のリトライ間隔。
  'binlogBatchReadSize' = '100'     --各バッチでバイナリログから読み取るデータ行数。
);

非 Binlog ソーステーブル

VVR 11+

重要

VVR 11.1 以降の場合、バイナリログデータはデフォルトで消費されます。詳細については、「Binlog ソーステーブル」をご参照ください。

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --キーの漏洩を防ぐために、AccessKey ペアには変数管理を使用します。   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'source.binlog'='false'                      --バイナリログデータを消費するかどうかを指定します。
);

VVR 8+

CREATE TEMPORARY TABLE hologres_source (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --キーの漏洩を防ぐために、AccessKey ペアには変数管理を使用します。   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sdkMode' = 'jdbc'
);

結果テーブルの例

CREATE TEMPORARY TABLE datagen_source(
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
  name varchar, 
  age BIGINT,
  birthday BIGINT
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --キーの漏洩を防ぐために、AccessKey ペアには変数管理を使用します。   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;

ディメンションテーブルの例

CREATE TEMPORARY TABLE datagen_source (
  a INT,
  b BIGINT,
  c STRING,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
  a INT, 
  b VARCHAR, 
  c VARCHAR
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username' = '${secret_values.ak_id}',       --キーの漏洩を防ぐために、AccessKey ペアには変数管理を使用します。   
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>'
);
CREATE TEMPORARY TABLE blackhole_sink (
  a INT,
  b STRING
) WITH (
  'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;

詳細な機能説明

フルの消費と増分の消費の統合

シナリオ

  • この機能は、結果テーブルにプライマリキーがあるシナリオにのみ適用されます。CDC モードでフルおよび増分の Hologres ソーステーブルを使用することをお勧めします。

  • Hologres は、オンデマンドでバイナリロギングを有効にすることをサポートしています。既存データを含むテーブルに対してバイナリロギングを有効にすることができます。

コード例

VVR 11+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog.startup-mode' = 'INITIAL',   --最初にすべての既存データを読み取り、次にバイナリログを増分的に消費します。
  'retry-count'='10',                         --バイナリログの読み取りエラーが発生した後のリトライ回数。
  'retry-sleep-step-ms'='5000',               --リトライの増分待機時間。最初のリトライは 5 秒、2 回目は 10 秒待機します。
  'source.binlog.batch-size'='512'            --各バッチでバイナリログから読み取るデータ行数。
  );
説明
  • source.binlog.startup-modeINITIAL に設定して、最初にすべてのデータを消費し、次に増分消費のためにバイナリログを読み取ることができます。

  • startTime パラメーターを設定するか、起動ページで開始時刻を選択すると、binlogStartUpMode は強制的に timestamp になります。startTime パラメーターの優先度が高いため、他の消費モードは有効になりません。

VVR 8+

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn BIGINT,
  hg_binlog_event_type BIGINT,
  hg_binlog_timestamp_us BIGINT,
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='<yourTablename>',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'binlog' = 'true',
  'cdcMode' = 'true',
  'binlogStartUpMode' = 'initial', --最初にすべての既存データを読み取り、次にバイナリログを増分的に消費します。
  'binlogMaxRetryTimes' = '10',     --バイナリログの読み取りエラーが発生した後のリトライ回数。
  'binlogRetryIntervalMs' = '500',  --バイナリログの読み取りエラーが発生した後のリトライ間隔。
  'binlogBatchReadSize' = '100'     --各バッチでバイナリログから読み取るデータ行数。
  );
説明
  • binlogStartUpModeinitial に設定して、最初にすべてのデータを消費し、次に増分消費のためにバイナリログを読み取ることができます。

  • startTime パラメーターを設定するか、起動ページで開始時刻を選択すると、binlogStartUpMode は強制的に timestamp になります。startTime パラメーターの優先度が高いため、他の消費モードは有効になりません。

プライマリキーの競合処理ポリシー

コネクタは、Hologres にデータを書き込む際に、重複するプライマリキーを持つデータを処理するための 3 つのポリシーを提供します。

VVR 11+

sink.on-conflict-action パラメーターを指定して、さまざまな処理ポリシーを実装できます。

sink.on-conflict-action の値

意味

INSERT_OR_IGNORE

データの最初の出現を保持し、後続の重複は無視します。

INSERT_OR_REPLACE

既存のデータを後続のデータで置き換えます。

INSERT_OR_UPDATE (デフォルト)

シンクで提供されるフィールドのみを更新します。その他のフィールドは変更されません。

VVR 8+

mutatetype パラメーターを指定して、さまざまな処理ポリシーを実装できます。

mutatetype の値

意味

insertorignore (デフォルト)

データの最初の発生を保持し、後続の重複は無視します。

挿入または置き換え

既存のデータを後続のデータで置き換えます。

insertorupdate

シンクで提供されるフィールドのみを更新します。その他のフィールドは変更されません。

テーブルにフィールド a、b、c、d があり、a がプライマリキーであるとします。結果テーブルがフィールド a と b のみを提供し、ポリシーを INSERT_OR_UPDATE に設定した場合、フィールド b のみが更新され、c と d は変更されません。
説明

結果テーブルのフィールド数は、Hologres 物理テーブルのフィールド数より少なくすることができます。ただし、欠落しているフィールドは null 値を許可する必要があります。そうしないと、書き込み操作は失敗します。

パーティションテーブルへの書き込み

デフォルトでは、Hologres シンクは単一テーブルへのデータインポートのみをサポートします。パーティションテーブルの親テーブルにデータをインポートするには、次の構成を有効にする必要があります:

VVR 11+

sink.create-missing-partitiontrue に設定できます。子パーティションが作成されていない場合、自動的に作成されます。

説明
  • VVR 11.1 以降は、デフォルトでパーティションテーブルへの書き込みをサポートし、データを対応する子パーティションに自動的にルーティングします。

  • tablename パラメーターを親テーブルの名前に設定する必要があります。

  • 子テーブルが事前に作成されておらず、sink.create-missing-partition=true が設定されていない場合、書き込み操作は失敗します。

VVR 8+

  • partitionRoutertrue に設定して、データを対応する子パーティションに自動的にルーティングできます。

  • createparttabletrue に設定できます。子パーティションが作成されていない場合、自動的に作成されます。

説明
  • tablename パラメーターを親テーブルの名前に設定する必要があります。

  • 子テーブルが事前に作成されておらず、createparttable=true が設定されていない場合、書き込み操作は失敗します。

複数ストリーム書き込みのためのワイドテーブルのマージと部分的な更新

同じ Hologres ワイドテーブルに複数のデータストリームを書き込む場合、システムは同じプライマリキーを持つデータを自動的にマージすることをサポートします。行全体を置き換える代わりに、変更された列のみを更新することを選択できます。これにより、書き込み効率とデータ整合性が向上します。

制限事項

  • ワイドテーブルにはプライマリキーが必要です。

  • 各データストリームのデータには、完全なプライマリキーフィールドが含まれている必要があります。

  • 1 秒あたりのリクエスト数 (RPS) が多い列指向ワイドテーブルがマージされるシナリオでは、CPU 使用率が高くなる可能性があります。テーブル内のフィールドの Dictionary Encoding 機能を無効にすることをお勧めします。

2 つの Flink データストリームがあるとします。1 つはフィールド a、b、c を含み、もう 1 つはフィールド a、d、e を含みます。Hologres ワイドテーブル WIDE_TABLE にはフィールド a、b、c、d、e が含まれ、a がプライマリキーです。

VVR 11+

// source1 と source2 はすでに定義されています。
CREATE TEMPORARY TABLE hologres_sink ( -- 5 つのフィールドを宣言します: a、b、c、d、e。
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- 5 つのフィールドを含む Hologres ワイドテーブル: a、b、c、d、e。
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'sink.on-conflict-action'='INSERT_OR_UPDATE',   -- プライマリキーに基づいてデータの一部の列を更新します。
  'sink.delete-strategy'='IGNORE_DELETE',         -- リトラクションメッセージの処理ポリシー。IGNORE_DELETE は、データの挿入または更新のみが必要で、削除は不要なシナリオに適しています。
  'sink.partial-insert.enabled'='true'            -- 部分列更新パラメーターを有効にします。INSERT 文で定義されたフィールドがコネクタにプッシュダウンされるため、宣言されたフィールドのみが更新または挿入できます。
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- 3 つのフィールドのみが挿入されることを宣言します: a、b、c。
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- 3 つのフィールドのみが挿入されることを宣言します: a、d、e。
END;

VVR 8+

// source1 と source2 はすでに定義されています。
CREATE TEMPORARY TABLE hologres_sink ( -- 5 つのフィールドを宣言します: a、b、c、d、e。
  a BIGINT, 
  b STRING,
  c STRING,
  d STRING,
  e STRING,
  primary key(a) not enforced
) WITH (
  'connector'='hologres',           
  'dbname'='<yourDbname>',
  'tablename'='<yourWideTablename>',  -- 5 つのフィールドを含む Hologres ワイドテーブル: a、b、c、d、e。
  'username' = '${secret_values.ak_id}',
  'password' = '${secret_values.ak_secret}',
  'endpoint'='<yourEndpoint>',
  'mutatetype'='insertorupdate',    -- プライマリキーに基づいてデータの一部の列を更新します。
  'ignoredelete'='true',            -- リトラクションメッセージによって生成された Delete リクエストを無視します。
  'partial-insert.enabled'='true'   -- 部分列更新パラメーターを有効にして、INSERT 文で宣言されたフィールドのみの更新をサポートします。
);

BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1;  -- 3 つのフィールドのみが挿入されることを宣言します: a、b、c。
INSERT INTO hologres_sink(a,d,e) select * from source2;  -- 3 つのフィールドのみが挿入されることを宣言します: a、d、e。
END;
説明

ignoredeletetrue に設定して、リトラクションメッセージによって生成される Delete リクエストを無視できます。VVR 8.0.8 以降では、sink.delete-strategy パラメーターを使用して部分更新のさまざまなポリシーを構成することをお勧めします。

パーティションテーブルのバイナリログの消費 (パブリックプレビュー)

パーティションテーブルは、データアーカイブとクエリの最適化に役立ちます。Hologres コネクタは、物理および論理パーティションテーブルからのバイナリログの消費をサポートしています。物理および論理パーティションテーブルの違いの詳細については、「CREATE LOGICAL PARTITION TABLE」をご参照ください。

物理パーティションテーブルのバイナリログの消費

Hologres コネクタは、単一のジョブでパーティションテーブルのバイナリログの消費をサポートし、新しいパーティションを動的にリッスンできます。これにより、リアルタイムデータ処理の効率と使いやすさが大幅に向上します。

注意

  • VVR 8.0.11 以降、Hologres インスタンス V2.1.27 以降、および JDBC モードのバイナリログソーステーブルのみが、パーティションテーブルのバイナリログの消費をサポートしています。

  • パーティション名は、親テーブル名、アンダースコア、パーティション値で厳密に構成する必要があり、フォーマットは {parent_table}_{partition_value} です。このフォーマットに従わないパーティションは消費されない可能性があります。詳細については、「動的パーティション管理」をご参照ください。

    重要
    • DYNAMIC モードの場合、VVR 8.0.11 は YYYY-MM-DD のような - 区切り文字を持つパーティションフィールドをサポートしていません。

    • VVR 11.1 以降、カスタムフォーマットのパーティションフィールドを消費できます。

    • このフォーマット制限は、パーティションテーブルにデータを書き込む場合には適用されません。

  • Flink で Hologres ソーステーブルを宣言する場合、Hologres パーティションテーブルのパーティションフィールドを含める必要があります。

  • DYNAMIC モードの場合、パーティションテーブルには動的パーティション管理が有効になっている必要があります。パーティション事前作成パラメーター auto_partitioning.num_precreate は 1 より大きい必要があります。そうしないと、ジョブが最新のパーティションを消費しようとすると例外がスローされます。

  • DYNAMIC モードでは、新しいパーティションが追加された後、古いパーティションの以降のデータ更新は読み取られなくなります。

モードタイプ

機能

シナリオの説明

ダイナミック

動的パーティション消費

新しいパーティションを自動的に検出し、消費の進捗を時系列順に動的に進めます。リアルタイムデータストリームシナリオに適しています。

STATIC

静的パーティション消費

既存のパーティションまたは手動で指定されたパーティションのみを読み取ります。新しいパーティションを自動的に検出しません。固定範囲内の既存データを処理するのに適しています。

DYNAMIC モード

VVR 11+

Hologres に次の DDL パーティションテーブルが存在し、バイナリロギングと動的パーティショニングが有効になっているとします。

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- 動的パーティショニングを有効にします。
    auto_partitioning_time_unit = 'DAY',  -- 時間単位として DAY を使用します。パーティション名の例: test_message_src1_20250512, test_message_src1_20250513。
    auto_partitioning_num_precreate = '2' -- 2 つのパーティションを事前に作成します。
);
-- 既存のパーティションテーブルの場合、ALTER TABLE を使用して動的パーティショニングを有効にすることもできます。

Flink では、次の SQL 文を使用して、パーティションテーブル test_message_src1 の DYNAMIC モード消費を宣言できます。

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- Hologres パーティションテーブルのパーティションフィールド。
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- 動的パーティショニングが有効になっている親テーブル。
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'DYNAMIC', -- 最新のパーティションを動的にリッスンします。
  'source.binlog.startup-mode' = 'initial'           -- 最初にすべての既存データを読み取り、次にバイナリログを増分的に消費します。
);

VVR 8.0.11

Hologres に次の DDL パーティションテーブルが存在し、バイナリロギングと動的パーティショニングが有効になっているとします。

CREATE TABLE "test_message_src1" (
    id int,
    title text,
    body text,
    dt text,
    PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
    binlog_level = 'replica', 
    auto_partitioning_enable =  'true',   -- 動的パーティショニングを有効にします。
    auto_partitioning_time_unit = 'DAY',  -- 時間単位として DAY を使用します。パーティション名の例: test_message_src1_20241027, test_message_src1_20241028。
    auto_partitioning_num_precreate = '2' -- 2 つのパーティションを事前に作成します。
);

-- 既存のパーティションテーブルの場合、ALTER TABLE を使用して動的パーティショニングを有効にすることもできます。

Flink では、次の SQL 文を使用して、パーティションテーブル test_message_src1 の DYNAMIC モード消費を宣言できます。

CREATE TEMPORary TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  dt VARCHAR  -- Hologres パーティションテーブルのパーティションフィールド。
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src1',  -- 動的パーティショニングが有効になっている親テーブル。
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'DYNAMIC',  -- 最新のパーティションを動的にリッスンします。
  'binlogstartUpMode' = 'initial',      -- 最初にすべての既存データを読み取り、次にバイナリログを増分的に消費します。
  'sdkMode' = 'jdbc_fixed'              -- 接続制限を回避するためにこのモードを使用します。
);

STATIC モード

VVR 11+

Hologres に次の DDL パーティションテーブルが存在し、バイナリロギングが有効になっているとします。

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

Flink では、次の SQL 文を使用して、パーティションテーブル test_message_src2 の STATIC モード消費を宣言できます。

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- Hologres パーティションテーブルのパーティションフィールド。
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- パーティションテーブル。
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'source.binlog.partition-binlog-mode' = 'STATIC', -- 固定パーティションを消費します。
  'source.binlog.partition-values-to-read' = 'red,blue,green',  -- 構成された 3 つのパーティションのみを消費します。「black」パーティションは消費されません。新しいパーティションは消費されません。設定しない場合、親テーブルのすべてのパーティションが消費されます。
  'source.binlog.startup-mode' = 'initial'  -- 最初にすべての既存データを読み取り、次にバイナリログを増分的に消費します。
);

VVR 8.0.11

Hologres に次の DDL パーティションテーブルが存在し、バイナリロギングが有効になっているとします。

CREATE TABLE test_message_src2 (
    id int,
    title text,
    body text,
    color text,
    PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
    binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');

Flink では、次の SQL 文を使用して、パーティションテーブル test_message_src2 の STATIC モード消費を宣言できます。

CREATE TEMPORARY TABLE hologres_source
(
  id INTEGER,
  title VARCHAR,
  body VARCHAR,
  color VARCHAR  -- Hologres パーティションテーブルのパーティションフィールド。
)
with (
  'connector' = 'hologres',
  'dbname' = '<yourDatabase>',
  'tablename' = 'test_message_src2',  -- パーティションテーブル。
  'username' = '<yourUserName>',
  'password' = '<yourPassword>',
  'endpoint' = '<yourEndpoint>',
  'binlog' = 'true',
  'partition-binlog.mode' = 'STATIC', -- 固定パーティションを消費します。
  'partition-values-to-read' = 'red,blue,green',  -- 構成された 3 つのパーティションのみを消費します。「black」パーティションは消費されません。新しいパーティションは消費されません。設定しない場合、親テーブルのすべてのパーティションが消費されます。
  'binlogstartUpMode' = 'initial',  -- 最初にすべての既存データを読み取り、次にバイナリログを増分的に消費します。
  'sdkMode' = 'jdbc_fixed' -- 接続制限を回避するためにこのモードを使用します。
);

論理パーティションテーブルのバイナリログの消費

Hologres コネクタは、論理パーティションテーブルのバイナリログの消費をサポートし、パラメーターを介して指定されたパーティションを消費できます。

注意

  • VVR 11.0.0 以降および Hologres インスタンス V3.1 以降のみが、論理パーティションテーブル内の指定されたパーティションのバイナリログの消費をサポートしています。

  • 論理パーティションテーブル内のすべてのパーティションのバイナリログを消費することは、通常の Hologres テーブルのバイナログを消費することと同じです。詳細については、「ソーステーブルの例」をご参照ください。

パラメーター名

説明

source.binlog.logical-partition-filter-column-names

論理パーティションテーブルからバイナリログが消費される、指定されたパーティションのパーティションキー列の名前。パーティションキー列の名前は、二重引用符 (") で囲む必要があります。複数のパーティションキー列は、コンマ (,) で区切ります。列名に二重引用符が含まれる場合、別の二重引用符でエスケープします。

'source.binlog.logical-partition-filter-column-names'='"Pt","id"'

パーティションキー列は Pt と id の 2 つです。

source.binlog.logical-partition-filter-column-values

論理パーティションテーブルからバイナリログを消費する、指定されたパーティションのパーティションキー列の値。各パーティションは、複数のパーティションキー列の値によって指定できます。パーティションキー列の値は、コンマ (,) で区切ります。値はダブルクォーテーションマーク (") で囲みます。値にダブルクォーテーションマークが含まれる場合は、もう 1 つのダブルクォーテーションマークでエスケープします。複数のパーティションはセミコロン (;) で区切ります。

'source.binlog.logical-partition-filter-column-values'='"20240910","0";"special""value","9"'

2 つのパーティションが消費されることを指定します。パーティションキー列は 2 つあります。最初のパーティションの値は (20240910, 0) で、2 番目のパーティションの値は (special"value, 9) です。

Hologres で次のテーブルが作成されているとします。

CREATE TABLE holo_table (
    id int not null,
    name text,
    age numeric(18,4),
    "Pt" text,
    primary key(id, "Pt")
)
LOGICAL PARTITION BY LIST ("Pt", id)
WITH (
    binlog_level ='replica'
);

Flink でこのテーブルのバイナリログを消費できます。

CREATE TEMPORARY TABLE test_src_binlog_table(
  id INTEGER,
  name VARCHAR,
  age decimal(18,4),
  `Pt` VARCHAR
) WITH (
  'connector'='hologres',
  'dbname'='<yourDbname>',
  'tablename'='holo_table',
  'username'='<yourAccessID>',
  'password'='<yourAccessSecret>',
  'endpoint'='<yourEndpoint>',
  'source.binlog'='true',
  'source.binlog.logical-partition-filter-column-names'='"Pt","id"',
  'source.binlog.logical-partition-filter-column-values'='<yourPartitionColumnValues>',
  'source.binlog.change-log-mode'='ALL',  --INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含むすべての ChangeLog タイプを読み取ります。
  'retry-count'='10',                     --バイナリログの読み取りエラーが発生した後のリトライ回数。
  'retry-sleep-step-ms'='5000',           --リトライの増分待機時間。最初のリトライは 5 秒、2 回目は 10 秒待機します。
  'source.binlog.batch-size'='512'        --各バッチでバイナリログから読み取るデータ行数。
);

DataStream API

重要

DataStream API を使用してデータを読み書きするには、対応する DataStream コネクタを使用して Realtime Compute for Apache Flink に接続する必要があります。DataStream コネクタの設定方法の詳細については、「DataStream コネクタの使用方法」をご参照ください。Hologres DataStream コネクタは Maven Central リポジトリで入手できます。ローカルデバッグの場合、対応する Uber JAR を使用する必要があります。詳細については、「コネクタを含むジョブをローカルで実行およびデバッグする」をご参照ください。

Hologres ソーステーブル

Binlog ソーステーブル

VVR は、Hologres バイナリログデータを読み取るための Source の HologresBinlogSource 実装クラスを提供します。次のコードは、Hologres バイナリログソースを構築する方法の例です。

VVR 8.0.11+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 読み取るテーブルのスキーマを初期化します。フィールドは Hologres テーブルスキーマのフィールドと一致する必要があります。フィールドのサブセットのみを定義できます。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Hologres 関連のパラメーター。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // JDBCOptions を構築します。
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // HologresBinlogSource を構築します。
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet(),
                new ArrayList<>()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 8.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 読み取るテーブルのスキーマを初期化します。フィールドは Hologres テーブルスキーマのフィールドと一致する必要があります。フィールドのサブセットのみを定義できます。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Hologres 関連のパラメーター。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // JDBCOptions を構築します。
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // HologresBinlogSource を構築します。
        long startTimeMs = 0;
        HologresBinlogSource source = new HologresBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.INITIAL,
                "",
                "",
                -1,
                Collections.emptySet()
        );
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}

VVR 6.0.7+

public class Sample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 読み取るテーブルのスキーマを初期化します。フィールドは Hologres テーブルスキーマのフィールドと一致する必要があります。フィールドのサブセットのみを定義できます。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .build();
         // Hologres 関連のパラメーター。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
        config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
        // JDBCOptions を構築します。
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        // デフォルトのスロット名を設定または作成します。
        config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));

        boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE) && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
        // Binlog Record Converter を構築します。
        JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
                jdbcOptions.getTable(),
                schema,
                new HologresConnectionParam(config),
                cdcMode,
                Collections.emptySet());
        
        // HologresBinlogSource を構築します。
        long startTimeMs = 0;
        HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
                new HologresConnectionParam(config),
                schema,
                config,
                jdbcOptions,
                startTimeMs,
                StartupMode.TIMESTAMP,
                recordConverter,
                "",
                -1);
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
        env.execute();
    }
}
重要

VVR バージョン 8.0.5 以前または Hologres バージョン 2.1 以前の場合、スーパーユーザーであるか、Replication Role 権限を持っているかを確認する必要があります。詳細については、「Hologres の権限の問題」をご参照ください。

非 Binlog ソーステーブル

VVR は、Hologres テーブルデータを読み取るための RichInputFormat の HologresBulkreadInputFormat 実装クラスを提供します。次のコードは、テーブルデータを読み取るための Hologres ソースを構築する方法の例です。

public class Sample {
    public static void main(String[] args) throws Exception {
        // Java DataStream API を設定します
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 読み取るテーブルのスキーマを初期化します。フィールドは Hologres テーブルスキーマのフィールドと一致する必要があります。フィールドのサブセットのみを定義できます。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .field("c", DataTypes.TIMESTAMP())
                .build();
        // Hologres 関連のパラメーター。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        // JDBCOptions を構築します。
        JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
        HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
                new HologresConnectionParam(config),
                jdbcOptions,
                schema,
                "",
                -1);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo).print();
        env.execute();
    }
}

Maven 依存関係

Hologres DataStream コネクタは Maven Central リポジトリで入手できます。

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-hologres</artifactId>
    <version>${vvr-version}</version>
</dependency>

Hologres 結果テーブル

VVR は、データを書き込むための OutputFormatSinkFunction の HologresSinkFunction 実装クラスを提供します。次のコードは、Hologres シンクを構築する方法の例です。

public class Sample {
    public static void main(String[] args) throws Exception {
        // Java DataStream API を設定します
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 読み取るテーブルのスキーマを初期化します。フィールドは Hologres テーブルスキーマのフィールドと一致する必要があります。フィールドのサブセットのみを定義できます。
        TableSchema schema = TableSchema.builder()
                .field("a", DataTypes.INT())
                .field("b", DataTypes.STRING())
                .build();
        // Hologres 関連のパラメーター。
        Configuration config = new Configuration();
        config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
        config.setString(HologresConfigs.USERNAME, "yourUserName");
        config.setString(HologresConfigs.PASSWORD, "yourPassword");
        config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
        config.setString(HologresConfigs.TABLE, "yourTableName");
        config.setString(HologresConfigs.SDK_MODE, "jdbc");
        HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
        
         // RowData 形式でデータを書き込むための Hologres ライターを構築します。
        AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
                hologresConnectionParam, 
                schema, 
                HologresTableSchema.get(hologresConnectionParam), 
                new Integer[0]);
        // Hologres シンクを構築します。
        HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
        TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
        env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
        env.execute();
    }
}

メタデータ列

VVR 8.0.11 以降のバイナリログソーステーブルはメタデータ列をサポートしています。このバージョン以降、hg_binlog_event_type などのバイナリログフィールドをメタデータ列として宣言することをお勧めします。メタデータ列は SQL 標準の拡張です。メタデータ列を使用して、データベース名、テーブル名、データ変更タイプ、ソーステーブルの生成時間などの特定の情報にアクセスできます。この情報に基づいて、DELETE 変更タイプのデータをフィルタリングするなど、処理ロジックをカスタマイズできます。

フィールド名

フィールドタイプ

説明

db_name

STRING NOT NULL

行を含むデータベースの名前。

table_name

STRING NOT NULL

行を含むテーブルの名前。

hg_binlog_lsn

BIGINT NOT NULL

バイナリログシーケンス番号を示すバイナリログシステムフィールド。この番号はシャード内で単調に増加しますが、連続していません。異なるシャード間での一意性と順序は保証されません。

hg_binlog_timestamp_us

BIGINT NOT NULL

データベース内のこのレコードの変更のタイムスタンプ (マイクロ秒 (us))。

hg_binlog_event_type

BIGINT NOT NULL

このレコードの変更タイプ。有効値:

  • 5: INSERT メッセージを示します。

  • 2: DELETE メッセージを示します。

  • 3: UPDATE_BEFORE メッセージを示します。

  • 7: UPDATE_AFTER メッセージを示します。

hg_shard_id

INT NOT NULL

データが配置されているデータシャード。シャードの基本的な概念の詳細については、「テーブルグループとシャード」をご参照ください。

DDL 文では、<meta_column_name> <datatype> METADATA VIRTUAL を使用してメタデータ列を宣言できます。次のコードに例を示します:

CREATE TABLE test_message_src_binlog_table(
  hg_binlog_lsn bigint METADATA VIRTUAL
  hg_binlog_event_type bigint METADATA VIRTUAL
  hg_binlog_timestamp_us bigint METADATA VIRTUAL
  hg_shard_id int METADATA VIRTUAL
  db_name string METADATA VIRTUAL
  table_name string METADATA VIRTUAL
  id INTEGER,
  title VARCHAR,
  body VARCHAR
) WITH (
  'connector'='hologres',
  ...
  );

よくある質問

参考資料