このトピックでは、Hologres コネクタについて説明します。
背景情報
Hologres は、リアルタイムデータウェアハウス向けの統合エンジンです。大規模なデータセットに対するリアルタイムでの書き込み、更新、分析をサポートします。標準 SQL 構文を採用し、PostgreSQL プロトコルと互換性があります。ペタバイト規模の OLAP およびアドホック分析に対応し、高同時実行性・低遅延のオンラインデータサービスを提供します。MaxCompute、Realtime Compute for Apache Flink、DataWorks と深く統合されており、オフラインおよびリアルタイムのエンドツーエンドなデータウェアハウスソリューションを実現します。以下の表に、Hologres コネクタの機能をまとめます。
カテゴリ | 詳細 |
サポートされているタイプ | ソーステーブル、ディメンションテーブル、結果テーブル |
実行モード | ストリームモードおよびバッチモード |
データフォーマット | 非対応 |
監視メトリック | |
API 種別 | DataStream および SQL |
結果テーブルにおける更新および削除の対応 | はい |
特徴
特徴 | 詳細 |
バイナリログ(binlog)の有無にかかわらず、Hologres データを読み取ることができます。この機能は、変更データキャプチャ(CDC)モードおよび非 CDC モードの両方と互換性があります。 | |
フル取り込み、増分取り込み、またはフルおよび増分の統合取り込みを実行できます。 | |
新規データを無視する、既存の行全体を置き換える、または特定のフィールドのみを更新するのいずれかを選択できます。 | |
変更されたカラムのみを更新でき、行全体を更新する必要はありません。 | |
物理パーティションテーブルから binlog を読み取ることができます。単一のジョブで、新しく追加されたパーティションを含むすべてのパーティションを監視できます。また、論理パーティションテーブルからの binlog 読み取りも可能です。 | |
パーティションテーブルの親テーブルにデータを書き込むと、対応する子パーティションが自動的に作成されます。 | |
単一テーブルまたはデータベース全体のリアルタイム同期 | 単一テーブルまたはデータベース全体レベルでのリアルタイム同期を実行できます。この機能には、以下の能力が含まれます:
詳細については、「CREATE TABLE AS (CTAS) 文」および「リアルタイムデータベース同期のクイックスタート」をご参照ください。 |
制限事項および推奨事項
制限事項
外部テーブルは非対応: Hologres コネクタは、Hologres 外部テーブル(例: MaxCompute 外部テーブル)へのアクセスをサポートしていません。
タイム型の制限: TIMESTAMP 型データのリアルタイム消費はサポートされていません。テーブル作成時に TIMESTAMPTZ 型を使用してください。
ソーステーブルスキャン(VVR 8 以前): Hologres からのデータ読み取りはデフォルトでバッチモードで実行されます。つまり、増分データは消費されません。
ウォーターマークの制限(VVR 8 以前): CDC モードでは、ウォーターマークの定義はサポートされていません。ウィンドウ集約を実行する場合は、非ウィンドウ集約ソリューションをご利用ください。
推奨事項
ストレージモードの選択
ポイントルックアップ用のディメンションテーブル: 行指向ストレージの使用を推奨します。この場合、プライマリキーおよびクラスタリングキーの両方を設定する必要があります。
1 対多クエリ用のディメンションテーブル: 列指向ストレージの使用を推奨し、パフォーマンス最適化のために分散キーおよびセグメントキーを構成してください。
分析クエリ用に頻繁に更新されるテーブル: リアルタイム binlog 消費および OLAP 分析の両方をサポートする必要がある場合、行列ハイブリッドストレージの使用を推奨します。
重要Hologres テーブル作成時にストレージ形式を明示的に指定しない場合、デフォルトで列指向ストレージが適用されます。このストレージ形式は不変であり、作成後に変更することはできません。詳細については、「Hologres におけるテーブルの作成」および「テーブルストレージ形式: 列指向、行指向、行列ハイブリッド」をご参照ください。
ジョブの並列度: Flink ジョブの並列度を、Hologres テーブルのシャード数に合わせて設定できます。
# このコマンドを HoloWeb で実行して、テーブルのシャード数を確認します。 select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';バージョンおよび機能: Hologres コネクタ リリースノートを定期的に確認し、既知の問題、機能更新、およびバージョン互換性情報をご確認ください。
重要なお知らせ
Hologres および VVR のバージョン互換性および制限事項
ソーステーブル
VVR 8 以前:
sdkModeパラメーターで消費モードを設定します。VVR 11+:
source.binlog.read-modeパラメーターで消費モードを設定します。
VVR バージョン
Hologres バージョン
デフォルト/推奨パラメーター値
実際の消費モード
備考
≥ 6.0.7
< 2.0
カスタム
holohub(デフォルト)
JDBC の構成を推奨します。
6.0.7–8.0.4
≥ 2.0
jdbc(自動スイッチオーバー)
jdbc(強制)
Hologres 2.0 以降では HoloHub が非推奨となりました。holohub を設定した場合、コネクタは自動的に JDBC へフォールバックしますが、その際には追加のユーザー権限が必要になる場合があります。権限の構成については、「権限に関する問題」をご参照ください。
≥ 8.0.5
≥ 2.1
jdbc(自動スイッチオーバー)
jdbc(強制)
権限に関する問題は発生しません。Hologres 2.1.27 以降では、コネクタは jdbc_fixed へ切り替わります。
≥ 11.1
任意のバージョン
AUTO(デフォルト)
Hologres バージョンに基づいて自動選択
Hologres 2.1.27 以降では、コネクタはデフォルトで JDBC を選択し、軽量接続を有効化します(connection.fixed.enabled が true に設定されます)。
バージョン 2.1.0–2.1.26 では、JDBC モードを選択できます。
Hologres 2.0 以前では、コネクタは holohub を選択します。
重要VVR 11.1 以降では、コネクタはデフォルトで binlog 消費を有効化します。エラーを回避するため、binlog の有効化を事前に実施してください。
シンクテーブル
VVR 8 以前のバージョンでは、
sdkModeパラメーターを指定して消費モードを選択します。VVR 11+:
sink.write-modeパラメーターでデータ書き込みモードを設定します。
VVR バージョン
Hologres バージョン
RPC への影響
実際の RPC 消費モード
推奨/デフォルトパラメーター値
備考
6.0.4–8.0.2
< 2.0
なし
rpc
カスタム
/
6.0.4–8.0.2
≥ 2.0
はい
jdbc_fixed(自動スイッチオーバー)
カスタム
重複排除を防止するため、'jdbcWriteBatchSize'='1' を設定してください。
≥ 8.0.3
任意のバージョン
はい
jdbc_fixed(自動スイッチオーバー)
カスタム
rpc を設定した場合、コネクタは自動的に jdbc_fixed へ切り替わり、重複排除を防止するために 'jdbcWriteBatchSize'='1' が設定されます。
≥ 8.0.5
任意のバージョン
はい
jdbc_fixed(自動スイッチオーバー)
カスタム
rpc を設定した場合、コネクタは自動的に jdbc_fixed へ切り替わり、重複排除を防止するために 'deduplication.enabled'='false' が設定されます。
重要RPC は Hologres 2.0 以降で非推奨となりました。データ書き込みモードを rpc に設定した場合、コネクタは自動的に jdbc_fixed へ切り替わります。ただし、他の値を設定した場合は、その値が優先されます。
VVR 11.1 以降では、RPC はサポートされません。接続には JDBC を推奨します。
高同時実行書き込みシナリオでは、
jdbc_copyまたはCOPY_STREAMをご利用ください。
ディメンションテーブル
VVR バージョン
Hologres バージョン
RPC への影響
実際の RPC 消費モード
推奨/デフォルトパラメーター値
備考
6.0.4–8.0.2
< 2.0
なし
rpc
カスタム
/
6.0.4–8.0.2
≥ 2.0
はい
jdbc_fixed(自動スイッチオーバー)
カスタム
Hologres インスタンスのバージョンが 2.0 以降の場合、RPC は非推奨であるため、コネクタは自動的に jdbc_fixed へ切り替わります。ただし、他の値を設定した場合は、その値が優先されます。
≥ 8.0.3
任意のバージョン
はい
jdbc_fixed(自動スイッチオーバー)
カスタム
≥ 8.0.5
任意のバージョン
はい
jdbc_fixed(自動スイッチオーバー)
カスタム
重要VVR 11.1 以降では、RPC はサポートされず、デフォルトで JDBC が使用されます。軽量接続を有効化するには、
connection.fixed.enabledを設定してください。JDBC モードの binlog ソーステーブルは JSONB データの読み取りをサポートしますが、データベースレベルで GUC パラメーターを有効化する必要があります。
-- データベースレベルで GUC パラメーターを有効化します。このコマンドはスーパーユーザーのみ実行可能です。各データベースにつき一度だけ設定すれば十分です。 alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;UPDATE 操作は、2 つの連続した binlog レコードを生成します。最初に古いデータ(update_before)が記録され、その後に新しいデータ(update_after)が記録されます。
binlog が有効化されたソーステーブルの TRUNCATE または再作成は避けてください。詳細については、「よくある質問」をご参照ください。
エラーを回避するため、Flink と Hologres 間で
DECIMALの精度を一貫して維持してください。詳細については、「よくある質問」をご参照ください。フルおよび増分の統合取り込みに INITIAL モードを使用する場合、グローバルな順序保証はされません。下流アプリケーションがタイムスタンプに基づく計算に依存する場合は、代わりに純粋な binlog 読み取りモードをご利用ください。
binlog の有効化
テーブルが未作成の場合
リアルタイムデータ消費はデフォルトで無効化されています。HoloWeb で Hologres テーブルを作成する際に、binlog.level および binlog.ttl パラメーターを設定します。サンプルコードを以下に示します:
begin;
create table test_table(
id int primary key,
title text not null,
body text);
call set_table_property('test_table', 'orientation', 'row');-- 行指向テーブル test_table を作成します。
call set_table_property('test_table', 'clustering_key', 'id');-- id カラムにクラスタリングキーを作成します。
call set_table_property('test_table', 'binlog.level', 'replica');-- binlog を有効化します。
call set_table_property('test_table', 'binlog.ttl', '86400');-- binlog の TTL(秒単位)を設定します。
commit;既存のテーブル
HoloWeb で、以下のステートメントを使用して既存のテーブルの binlog を有効化し、binlog TTL を設定します。table_name を実際のテーブル名に置き換えてください。
-- binlog を有効化します。
begin;
call set_table_property('<table_name>', 'binlog.level', 'replica');
commit;
-- binlog TTL(秒単位)を設定します。
begin;
call set_table_property('<table_name>', 'binlog.ttl', '2592000');
commit;WITH パラメーター
VVR 11 以降、Hologres のコネクタオプションが更新され、サポートが強化されました。一部のオプションは名称変更または削除されている場合があります。VVR 11 は VVR 8 と下位互換性を維持しています。詳細については、お使いの VVR バージョンに対応するパラメーターのドキュメントをご参照ください。
データ型マッピング
「Flink と Hologres 間のデータ型マッピング」をご参照ください。
Hologres では、GENERATED ALWAYS AS 構文を使用して生成カラムを定義できます。例:
ds TIMESTAMP NOT NULL GENERATED ALWAYS AS (date_trunc('month', create_time)) STORED生成カラムの NOT NULL 制約は、NULL 許容フィールドとしてマップされます。これは想定される動作です。Hologres は生成カラムを自動的に計算するため、Flink はそれらの値を渡しません。そのため、NOT NULL 制約を保持すると、HologresClient の書き込み検証が失敗します。通常のカラムの NOT NULL 制約は影響を受けません(例: ds TIMESTAMP NOT NULL)。
サンプル
ソーステーブル
binlog が有効化されたソーステーブル
CDC モード
このモードでは、テーブルデータのミラー同期が可能になります。ソースはバイナリログデータを消費し、hg_binlog_event_type に基づいて、INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER のような正しい Flink RowKind 型を各行に自動的に割り当てます。この処理は、MySQL や PostgreSQL の CDC 機能と同様に、変更をミラー化します。以下のサンプルコードは、このモードでソーステーブルを作成する DDL ステートメントを示しています。
VVR 11+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='${secret_values.ak_id}', -- AK/SK の漏洩を防ぐため、変数の使用を推奨します。
'password'='${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog.change-log-mode'='ALL', -- INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含むすべての changeLog 型を読み取ります。
'retry-count'='10', -- binlog 読み取りエラー発生時のリトライ回数。
'retry-sleep-step-ms'='5000', -- リトライ間の増分バックオフ時間。初回リトライは 5 秒待機、2 回目は 10 秒待機、といった具合です。
'source.binlog.batch-size'='512' -- binlog から一度に読み取る行数。
);VVR 8+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- AK/SK の漏洩を防ぐため、変数の使用を推奨します。
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'sdkMode'='jdbc',
'binlogMaxRetryTimes' = '10', -- binlog 読み取りエラー発生時のリトライ回数。
'binlogRetryIntervalMs' = '500', -- binlog 読み取りエラー発生後のリトライ間隔(ミリ秒単位)。
'binlogBatchReadSize' = '100' -- binlog から一度に読み取る行数。
);非 CDC モード
このモードでは、ソースが消費する binlog データは、通常の Flink データとして子孫ノードに渡されます。つまり、すべてのデータは INSERT 型となります。必要に応じて、hg_binlog_event_type の特定の型をどのように処理するかを決定できます。以下のサンプルコードは、ソーステーブルの DDL ステートメントを示しています。
VVR 11+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- AK/SK の漏洩を防ぐため、変数の使用を推奨します。
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog.change-log-mode'='ALL_AS_APPEND_ONLY', -- すべての changelog 型を INSERT として扱います。
'retry-count'='10', -- binlog 読み取りエラー発生時のリトライ回数。
'retry-sleep-step-ms'='5000', -- リトライ間の増分バックオフ時間。初回リトライは 5 秒待機、2 回目は 10 秒待機、といった具合です。
'source.binlog.batch-size'='512' -- binlog から一度に読み取る行数。
);VVR 8+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- AK/SK の漏洩を防ぐため、変数の使用を推奨します。
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10', -- binlog 読み取りエラー発生時のリトライ回数。
'binlogRetryIntervalMs' = '500', -- binlog 読み取りエラー発生後のリトライ間隔(ミリ秒単位)。
'binlogBatchReadSize' = '100' -- binlog から一度に読み取る行数。
);binlog が無効化されたソーステーブル
VVR 11+
デフォルトでは、Ververica Runtime(VVR)11.1 以降のバージョンでは、バイナリログデータが消費されます。詳細については、「binlog ソーステーブル」をご参照ください。
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- AK/SK の漏洩を防ぐため、変数の使用を推奨します。
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog'='false' -- binlog データを読み取らないようにします。
);VVR 8+
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- AK/SK の漏洩を防ぐため、変数の使用を推奨します。
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'sdkMode' = 'jdbc'
);シンクテーブル
CREATE TEMPORARY TABLE datagen_source(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- AK/SK の漏洩を防ぐため、変数の使用を推奨します。
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;ディメンションテーブルのサンプル
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', -- AK/SK の漏洩を防ぐため、変数の使用を推奨します。
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;機能の詳細
フルおよび増分の統合取り込み
適用シーン
この機能は、プライマリキーを持つターゲットテーブルにのみ適用されます。CDC モードの Hologres ソーステーブルとの併用を推奨します。
Hologres では、必要に応じて binlog をオンデマンドで有効化できます。履歴データを含むテーブルに対しては、binlog の有効化を行ってください。
サンプルコード
VVR 11+
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog.startup-mode' = 'INITIAL', -- 最初にすべての履歴データを読み取り、その後増分読み取りを開始します。
'retry-count'='10', -- binlog 読み取りエラー発生時のリトライ回数。
'retry-sleep-step-ms'='5000', -- リトライ間の増分バックオフ時間。初回リトライは 5 秒待機、2 回目は 10 秒待機、といった具合です。
'source.binlog.batch-size'='512' -- binlog から一度に読み取る行数。
);source.binlog.startup-modeをINITIALに設定すると、まずすべての履歴データを読み取り、その後増分読み取りを開始します。startTimeパラメーターを設定するか、起動インターフェイスで開始時刻を選択した場合、binlogStartUpModeは強制的にtimestamp消費モードを使用し、他の消費モードは無視されます。startTimeパラメーターの優先度が高いためです。
VVR 8+
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogStartUpMode' = 'initial', -- 最初にすべての履歴データを読み取り、その後増分読み取りを開始します。
'binlogMaxRetryTimes' = '10', -- binlog 読み取りエラー発生時のリトライ回数。
'binlogRetryIntervalMs' = '500', -- binlog 読み取りエラー発生後のリトライ間隔(ミリ秒単位)。
'binlogBatchReadSize' = '100' -- binlog から一度に読み取る行数。
);binlogStartUpModeをinitialに設定すると、まずすべての履歴データを読み取り、その後増分読み取りを開始します。startTimeはbinlogStartUpModeより優先されます。そのため、startTimeパラメーターを設定するか、起動インターフェイスで開始時刻を選択した場合、binlogStartUpModeは自動的にtimestampモードを使用し、他の消費モードは有効になりません。
プライマリキーの競合処理
Hologres への書き込み時に、重複するプライマリキーを持つデータが存在する場合、コネクタは 3 つの処理戦略を提供します。
VVR 11+
sink.on-conflict-action オプションを指定することで、異なる戦略を実装できます。
sink.on-conflict-action の値 | 説明 |
INSERT_OR_IGNORE | データの最初の出現を保持し、後続の重複を無視します。 |
INSERT_OR_REPLACE | 既存の行を新しいデータで置き換えます。 |
INSERT_OR_UPDATE(デフォルト) | 指定されたカラムのみを更新し、既存の行の他のカラムは変更しません。 |
VVR 8+
mutatetype オプションを指定することで、異なる戦略を実装できます。
mutatetype の値 | 説明 |
insertorignore(デフォルト) | データの最初の出現を保持し、後続の重複を無視します。 |
insertorreplace | 既存の行を新しいデータで置き換えます。 |
insertorupdate | 指定されたカラムのみを更新し、既存の行の他のカラムは変更しません。 |
テーブルに a、b、c、d というフィールドがあり、a がプライマリキーであると仮定します。結果テーブルに a および b のみのフィールドが含まれている場合、INSERT_OR_UPDATE を設定すると、b のみが更新され、c および d は変更されません。結果テーブルのカラム数は、物理的な Hologres テーブルより少なくても構いませんが、不足しているカラムはすべて NULL 許容である必要があります。そうでない場合、書き込み操作は失敗します。
パーティションテーブルへの書き込み
デフォルトでは、Hologres 結果テーブルは非パーティションテーブルへのデータインポートのみをサポートします。パーティションテーブルへのデータインポートを行うには、以下の構成を有効化する必要があります:
VVR 11+
sink.create-missing-partition を true に設定します。これにより、存在しない子パーティションをコネクタが自動的に作成できます。
VVR 11.1 以降では、パーティションテーブルへの書き込みがサポートされ、対応する子パーティションへのデータの自動ルーティングが可能です。
tablenameには親テーブルの名前を指定します。子テーブルを事前に作成せず、
sink.create-missing-partition=trueを設定しない場合、書き込みは失敗します。
VVR 8+
partitionRouterをtrueに設定すると、対応する子パーティションへのデータの自動ルーティングが可能になります。createparttableをtrueに設定すると、存在しない子パーティションを自動的に作成できます。
tablenameには親テーブルの名前を指定します。書き込みを確実に行うには、子パーティションを事前に作成するか、
createparttable=trueを設定してください。
マルチストリーム書き込みを伴うワイドテーブルへのマージおよび部分更新
コネクタは、複数のデータストリームを単一の Hologres ワイドテーブルに効率的にマージできます。プライマリキーに基づき、変更されたカラムのみを更新する部分行更新をサポートしており、書き込み効率を最適化し、データ整合性を確保します。
制限事項
ワイドテーブルにはプライマリキーが必要です。
各データストリームには、すべてのプライマリキーカラムが含まれている必要があります。
Hologres ワイドテーブルが列指向ストレージを使用している場合、高い RPS により CPU 使用率が上昇する可能性があります。この問題を緩和するには、テーブルのカラムに対する辞書エンコーディングを無効化することを検討してください。
サンプル
2 つのデータストリームがあると仮定します。1 つ目のストリームには a、b、c のカラムが含まれ、2 つ目のストリームには a、d、e のカラムが含まれます。Hologres ワイドテーブル WIDE_TABLE には a、b、c、d、e のカラムが含まれ、a がプライマリキーです。
VVR 11+
// source1 および source2 はすでに定義済みと仮定します。
CREATE TEMPORARY TABLE hologres_sink ( -- a、b、c、d、e の 5 つのカラムを宣言します。
a BIGINT,
b STRING,
c STRING,
d STRING,
e STRING,
primary key(a) not enforced
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourWideTablename>', -- a、b、c、d、e を含む Hologres ワイドテーブル。
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'sink.on-conflict-action'='INSERT_OR_UPDATE', -- プライマリキーに基づき、指定されたカラムのみを更新します。
'sink.delete-strategy'='IGNORE_DELETE', -- 再送信メッセージによる削除要求の処理戦略。挿入または更新のみを必要とするシナリオには IGNORE_DELETE が適しています。
'sink.partial-insert.enabled'='true' -- 部分カラム更新を有効化します。これにより、INSERT ステートメントで指定されたカラムのみを更新または挿入できます。
);
BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1; -- a、b、c のみを挿入します。
INSERT INTO hologres_sink(a,d,e) select * from source2; -- a、d、e のみを挿入します。
END;VVR 8+
// source1 および source2 はすでに定義済みと仮定します。
CREATE TEMPORARY TABLE hologres_sink ( -- a、b、c、d、e の 5 つのカラムを宣言します。
a BIGINT,
b STRING,
c STRING,
d STRING,
e STRING,
primary key(a) not enforced
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourWideTablename>', -- a、b、c、d、e を含む Hologres ワイドテーブル。
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'mutatetype'='insertorupdate', -- プライマリキーに基づき、指定されたカラムのみを更新します。
'ignoredelete'='true', -- 再送信メッセージによって生成された削除要求を無視します。
'partial-insert.enabled'='true' -- 部分カラム更新を有効化し、INSERT ステートメントで宣言されたカラムのみの更新をサポートします。
);
BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1; -- a、b、c のみを挿入します。
INSERT INTO hologres_sink(a,d,e) select * from source2; -- a、d、e のみを挿入します。
END;ignoredelete を true に設定すると、再送信メッセージによって生成された削除要求が無視されます。VVR 8.0.8 以降では、sink.delete-strategy を使用して、部分更新のさまざまな戦略を構成することを推奨します。
パーティションテーブルからの binlog 読み取り(ベータ版)
パーティションテーブルは、データアーカイブおよびクエリパフォーマンスを向上させます。Hologres コネクタは、物理パーティションテーブルおよび論理パーティションテーブルの両方からの binlog 読み取りをサポートします。詳細については、「論理パーティションテーブルの作成」をご参照ください。
物理パーティションテーブルからのバイナリログの消費
Hologres コネクタは、パーティションテーブルからの binlog 読み取りをサポートし、単一のジョブ内で新規パーティションを動的に監視できます。これにより、リアルタイムデータ処理の効率性および利便性が大幅に向上します。
注意事項
パーティションテーブルからの binlog 読み取りには、VVR 8.0.11+、Hologres 2.1.27+、binlog が有効化されたテーブル、および JDBC 消費が必要です。
パーティション名は、厳密に親テーブル名、アンダースコア(_)、パーティション値で構成される必要があります。すなわち、
{parent_table}_{partition_value}の形式です。この形式に従わないパーティションは消費できません。詳細については、「動的パーティション管理」をご参照ください。重要DYNAMICモードでは、VVR 8.0.11 は-区切りのパーティションフィールド(例:YYYY-MM-DD)をサポートしていません。VVR 11.1+ では、カスタム名形式のパーティションからの読み取りを完全にサポートしています。
この制限は読み取りにのみ適用され、パーティションテーブルへの書き込みには影響しません。
Flink で Hologres ソーステーブルを宣言する場合、Hologres パーティションテーブルのパーティションカラムを含める必要があります。
DYNAMIC モードでは、パーティションテーブルに動的パーティション管理が有効化されていることを確認してください。また、パーティションの事前作成パラメーター
auto_partitioning.num_precreateは 1 より大きい値である必要があります。そうでない場合、最新のパーティションを読み取る際にジョブが例外をスローします。DYNAMIC パーティション binlog 消費モードでは、新規パーティションが追加された後、古いパーティションからの増分データは読み取られません。
サンプル
パターンタイプ | 機能 | 説明 |
DYNAMIC | 動的パーティション読み取り | 新規パーティションを自動的に監視し、時系列順に読み取り進行状況を動的に進めます。リアルタイムユースケースに適しています。 |
STATIC | 静的パーティション消費 | 既存のパーティション(または明示的に指定されたパーティション)のみを読み取り、新規パーティションを自動的に検出しません。固定範囲内の履歴データ処理に適しています。 |
DYNAMIC モード
VVR 11+
Hologres に、以下の DDL で定義されたパーティションテーブルが存在し、バイナリログおよび動的パーティションが有効化されていると仮定します。
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true', -- 動的パーティションを有効化します。
auto_partitioning_time_unit = 'DAY', -- パーティションは日単位で作成されます。例: test_message_src1_20250512、test_message_src1_20250513。
auto_partitioning_num_precreate = '2' -- 2 つのパーティションを事前に作成します。
);
-- 既存のパーティションテーブルに対しても、ALTER TABLE を使用して動的パーティションを有効化できます。Flink では、以下の SQL ステートメントを使用して、test_message_src1 パーティションテーブルを DYNAMIC モードで消費します。
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR -- Hologres パーティションテーブルのパーティションカラム。
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src1', -- 動的パーティションが有効化された Hologres テーブル。
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'source.binlog.partition-binlog-mode' = 'DYNAMIC', -- 最新のパーティションを動的に監視します。
'source.binlog.startup-mode' = 'initial' -- まずすべての既存データを読み取り、その後増分読み取りを開始します。
);VVR 8.0.11
Hologres に、以下の DDL で定義されたパーティションテーブルが存在し、バイナリログおよび動的パーティションが有効化されていると仮定します。
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true', -- 動的パーティションを有効化します。
auto_partitioning_time_unit = 'DAY', -- パーティションは日単位で作成されます。例: test_message_src1_20241027、test_message_src1_20241028。
auto_partitioning_num_precreate = '2' -- 2 つのパーティションを事前に作成します。
);
-- 既存のパーティションテーブルに対しても、ALTER TABLE を使用して動的パーティションを有効化できます。Flink では、以下の SQL ステートメントを使用して、test_message_src1 パーティションテーブルを DYNAMIC モードで消費します。
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR -- Hologres パーティションテーブルのパーティションカラム。
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src1', -- 動的パーティションが有効化された Hologres テーブル。
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'binlog' = 'true',
'partition-binlog.mode' = 'DYNAMIC', -- 最新のパーティションを動的に監視します。
'binlogstartUpMode' = 'initial', -- まずすべての既存データを読み取り、その後増分読み取りを開始します。
'sdkMode' = 'jdbc_fixed' -- 接続数制限の問題を回避するためにこのモードを使用します。
);STATIC モード
VVR 11+
Hologres に、以下の Data Definition Language(DDL)で定義されたパーティションテーブルが存在し、バイナリログが有効化されていると仮定します。
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
Flink では、以下の SQL ステートメントを使用して、test_message_src2 パーティションテーブルを STATIC モードで消費します。
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR -- Hologres パーティションテーブルのパーティションカラム。
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src2', -- パーティションテーブル。
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'source.binlog.partition-binlog-mode' = 'STATIC', -- 固定のパーティションセットを読み取ります。
'source.binlog.partition-values-to-read' = 'red,blue,green', -- red、blue、green の 3 つのパーティションのみを読み取ります。black パーティションは読み取られません。新規パーティションも読み取られません。設定しない場合、親テーブルのすべてのパーティションが読み取られます。
'source.binlog.startup-mode' = 'initial' -- まずすべての既存データを読み取り、その後増分読み取りを開始します。
);VVR 8.0.11
Hologres に、以下の DDL で定義されたパーティションテーブルが存在し、バイナリログが有効化されていると仮定します。
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
Flink では、以下の SQL ステートメントを使用して、test_message_src2 パーティションテーブルを STATIC モードで読み取ります。
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR -- Hologres パーティションテーブルのパーティションカラム。
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src2', -- パーティションテーブル。
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'binlog' = 'true',
'partition-binlog.mode' = 'STATIC', -- 固定のパーティションセットを読み取ります。
'partition-values-to-read' = 'red,blue,green', -- red、blue、green の 3 つのパーティションのみを読み取ります。black パーティションは読み取られません。新規パーティションも読み取られません。設定しない場合、親テーブルのすべてのパーティションが読み取られます。
'binlogstartUpMode' = 'initial', -- まずすべての既存データを読み取り、その後増分読み取りを開始します。
'sdkMode' = 'jdbc_fixed' -- 接続数制限の問題を回避するためにこのモードを使用します。
);論理パーティションテーブルからのバイナリログの消費
Hologres コネクタは、論理パーティションテーブルからの binlog 読み取りをサポートし、明示的に読み取るパーティションを指定できます。
注意事項
特定のパーティションからの binlog 読み取りには、VVR 11.0.0+ および Hologres V3.1+ が必要です。
すべてのパーティションからの binlog 読み取りは、非パーティション Hologres テーブルからの binlog 読み取りと同等です。手順については、「ソーステーブル」をご参照ください。
サンプル
パラメーター名 | 説明 | サンプル |
source.binlog.logical-partition-filter-column-names | パーティションカラム名を二重引用符で囲みます。複数のカラム名はカンマで区切ります。カラム名に二重引用符が含まれる場合は、別の二重引用符でエスケープします。 | 'source.binlog.logical-partition-filter-column-names'='"Pt","id"' パーティションカラムは Pt および id です。 |
source.binlog.logical-partition-filter-column-values | パーティションカラムの値。各パーティションはカラム値のセットで指定されます。異なるカラムの値はカンマで区切り、二重引用符で囲みます。値に二重引用符が含まれる場合は、別の二重引用符でエスケープします。複数のパーティションはセミコロンで区切ります。 | 'source.binlog.logical-partition-filter-column-values'='"20240910","0";"special""value","9"' 2 つのパーティションの消費を指定します。パーティションキーには 2 つのカラムがあります。最初のパーティションキー値は (20240910, 0)、2 番目のパーティションキー値は (special"value, 9) です。 |
Hologres に以下のテーブルが作成されていると仮定します:
CREATE TABLE holo_table (
id int not null,
name text,
age numeric(18,4),
"Pt" text,
primary key(id, "Pt")
)
LOGICAL PARTITION BY LIST ("Pt", id)
WITH (
binlog_level ='replica'
);このテーブルのバイナリログを Flink で消費します。
CREATE TEMPORARY TABLE test_src_binlog_table(
id INTEGER,
name VARCHAR,
age decimal(18,4),
`Pt` VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='holo_table',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog'='true',
'source.binlog.logical-partition-filter-column-names'='"Pt","id"',
'source.binlog.logical-partition-filter-column-values'='<yourPartitionColumnValues>',
'source.binlog.change-log-mode'='ALL', -- INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含むすべての changeLog 型を読み取ります。
'retry-count'='10', -- binlog 読み取りエラー発生時のリトライ回数。
'retry-sleep-step-ms'='5000', -- リトライ間の増分バックオフ時間。初回リトライは 5 秒待機、2 回目は 10 秒待機、といった具合です。
'source.binlog.batch-size'='512' -- binlog から一度に読み取る行数。
);DataStream API
DataStream API を使用するには、プロジェクトに Hologres DataStream コネクタの依存関係を含める必要があります。DataStream コネクタの統合および使用方法については、「DataStream プログラムにおけるコネクタの統合および使用」をご参照ください。Hologres DataStream コネクタ は、Maven Central Repository で入手できます。ローカルデバッグには、対応する Uber JAR を使用してください。詳細については、「コネクタを含むジョブのローカル実行およびデバッグ」をご参照ください。
Hologres ソーステーブル
binlog が有効化されたソーステーブル
Realtime Compute for Apache Flink では、HologresBinlogSource クラスを使用して Hologres binlog データを読み取ることができます。以下の例は、HologresBinlogSource の作成方法を示しています。
VVR 11.3+
VVR 11.1.2 以降では、JDBCOptions および startTimeMs パラメーターが HologresBinlogSource コンストラクターから削除されました。VVR 11.3 以降では、List<Subscribe.BinlogFilter> パラメーターが追加されました。VVR 11 以降をご利用の場合は、VVR 11.3 以降の使用を推奨します。
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 読み取るテーブルのスキーマを初期化します。これは Hologres テーブルスキーマのカラムと一致する必要があります。サブセットのカラムを定義できます。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// 読み取るテーブルの名前。
String sourceTableName = "sourceTableName";
// Hologres コネクタオプション。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, sourceTableName);
config.set(HologresConfigs.BINLOG, true);
config.set(HologresConfigs.BINLOG_CHANGE_LOG_MODE, BinlogChangeLogMode.ALL);
// HologresBinlogSource を構築します。
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
StartupMode.INITIAL,
sourceTableName,
"",
Collections.emptyList(),
-1,
Collections.emptySet(),
Collections.emptyList()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}VVR 8.0.11+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 読み取るテーブルのスキーマを初期化します。これは Hologres テーブルスキーマのカラムと一致する必要があります。サブセットのカラムを定義できます。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Hologres コネクタオプション。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// JDBCOptions を構築します。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// HologresBinlogSource を構築します。
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet(),
new ArrayList<>()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}VVR 8.0.7+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 読み取るテーブルのスキーマを初期化します。これは Hologres テーブルスキーマのカラムと一致する必要があります。サブセットのカラムを定義できます。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Hologres コネクタオプション。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// JDBCOptions を構築します。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// HologresBinlogSource を構築します。
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}VVR 6.0.7+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 読み取るテーブルのスキーマを初期化します。これは Hologres テーブルスキーマのカラムと一致する必要があります。サブセットのカラムを定義できます。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres コネクタオプション。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// JDBCOptions を構築します。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// 既定のスロット名を設定または作成します。
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));
boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE) && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// JDBCBinlogRecordConverter を構築します。
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
jdbcOptions.getTable(),
schema,
new HologresConnectionParam(config),
cdcMode,
Collections.emptySet());
// HologresBinlogSource を構築します。
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.TIMESTAMP,
recordConverter,
"",
-1);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}Realtime Compute for Apache Flink エンジンのバージョンが 8.0.5 より古いか、Hologres のバージョンが V2.1 より古い場合、ユーザーがスーパーユーザーであるか、Replication Role 権限を持っていることを確認してください。詳細については、「Hologres の権限に関する問題」をご参照ください。
binlog が無効化されたソーステーブル
Realtime Compute for Apache Flink では、HologresBulkreadInputFormat クラス(RichInputFormat の実装)を使用して、Hologres テーブルからデータを読み取ることができます。以下の例は、binlog が無効化された Hologres テーブルからデータを読み取る Hologres ソースの構築方法を示しています。
public class Sample {
public static void main(String[] args) throws Exception {
// Java DataStream API を設定します。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 読み取るテーブルのスキーマを初期化します。これは Hologres テーブルスキーマのカラムと一致する必要があります。サブセットのカラムを定義できます。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Hologres コネクタオプション。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// JDBCOptions を構築します。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
new HologresConnectionParam(config),
jdbcOptions,
schema,
"",
-1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo).print();
env.execute();
}
}Maven 依存関係
Hologres DataStream コネクタ は、Maven Central Repository で入手できます。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-hologres</artifactId>
<version>${vvr-version}</version>
</dependency>Hologres 結果テーブル
VVR 11+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 書き込むテーブルのスキーマを初期化します。これは Hologres テーブルスキーマのカラムと一致する必要があります。サブセットのカラムを定義できます。
TableSchema tableSchema = TableSchema.builder()
.field("a", DataTypes.INT().notNull())
.field("b", DataTypes.STRING())
.primaryKey("a")
.build();
// Hologres コネクタオプション。
Configuration config = new Configuration();
config.set(HologresConfigs.ENDPOINT, "yourEndpoint");
config.set(HologresConfigs.USERNAME, "yourUserName");
config.set(HologresConfigs.PASSWORD, "yourPassword");
config.set(HologresConfigs.DATABASE, "yourDatabaseName");
config.set(HologresConfigs.TABLE, "yourTableName");
HologresConnectionParam connectionParam = new HologresConnectionParam(config);
HologresTableSchema hologresTableSchema =
HologresTableSchema.get(connectionParam.getJDBCOptions());
// 結果テーブルに書き込むカラムのインデックス。
Integer[] targetColumnIndexes = {0, 1};
// Hologres 結果テーブルを構築します。
HologresSinkFunction sinkFunction =
new HologresSinkFunction(
connectionParam, tableSchema, targetColumnIndexes, hologresTableSchema);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(tableSchema.toRowDataType().getLogicalType());
env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
env.execute();
}
}VVR 8+
public class Sample {
public static void main(String[] args) throws Exception {
// Java DataStream API を設定します。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 書き込むテーブルのスキーマを初期化します。これは Hologres テーブルスキーマのカラムと一致する必要があります。サブセットのカラムを定義できます。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Hologres コネクタオプション。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// RowData としてデータを書き込む Hologres ライターを構築します。
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
hologresConnectionParam,
schema,
HologresTableSchema.get(hologresConnectionParam),
new Integer[0]);
// Hologres 結果テーブルを構築します。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
env.execute();
}
}メタデータカラム
Realtime Compute for Apache Flink VVR 8.0.11 以降では、binlog が有効化されたソーステーブルのメタデータカラムがサポートされています。このバージョン以降では、hg_binlog_event_type などの binlog フィールドをメタデータカラムとして宣言することを推奨します。メタデータカラム は SQL 標準を拡張したもので、ソースのデータベース名およびテーブル名、データの変更タイプおよびタイムスタンプなど、特定の情報を取得できます。これらの情報を使用して、DELETE イベントを除外するなどのカスタム処理ロジックを定義できます。
フィールド名 | データ型 | 説明 |
db_name | STRING NOT NULL | 行を含むデータベースの名前。 |
table_name | STRING NOT NULL | 行を含むテーブルの名前。 |
hg_binlog_lsn | BIGINT NOT NULL | binlog のシステムカラムで、binlog シーケンス番号を表します。シャード内では単調増加ですが、連続していません。異なるシャード間では一意性や順序性は保証されません。 |
hg_binlog_timestamp_us | BIGINT NOT NULL | データベース内の変更イベントのタイムスタンプ(マイクロ秒単位)。 |
hg_binlog_event_type | BIGINT NOT NULL | 行の CDC イベントタイプ。有効な値:
|
hg_shard_id | INT NOT NULL | データが存在するシャード。詳細については、「テーブルグループおよびシャード」をご参照ください。 |
DDL ステートメントでは、<meta_column_name> <datatype> METADATA VIRTUAL を使用してメタデータカラムを宣言します。以下の例を参照してください:
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn bigint METADATA VIRTUAL
hg_binlog_event_type bigint METADATA VIRTUAL
hg_binlog_timestamp_us bigint METADATA VIRTUAL
hg_shard_id int METADATA VIRTUAL
db_name string METADATA VIRTUAL
table_name string METADATA VIRTUAL
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
...
);よくある質問
関連ドキュメント
Hologres カタログの管理に関する詳細については、「Hologres カタログの管理」をご参照ください。
Hologres と Flink を使用した統合リアルタイムデータウェアハウスの構築に関する詳細については、「Flink と Hologres を使用したリアルタイムデータウェアハウスの構築」をご参照ください。
Hologres はデータ更新および修正を効率的にサポートし、マルチストリーム書き込みシナリオにおけるワイドテーブルの構築に適しています。詳細については、「MongoDB+Hologres ユーザー行動分析」をご参照ください。