このトピックでは、Hologres コネクタの使用方法について説明します。
背景
Hologres は、大量のデータのリアルタイム書き込み、更新、分析をサポートするワンストップのリアルタイムデータウェアハウスエンジンです。標準 SQL をサポートし、PostgreSQL プロトコルと互換性があります。Hologres は、多次元オンライン分析処理 (OLAP)、ペタバイト規模のデータのアドホック分析、高い同時実行性と低レイテンシーでのオンラインデータサービングもサポートしています。Hologres は MaxCompute、Flink、DataWorks と深く統合されており、オフラインとオンラインの両方のデータウェアハウスに統合ソリューションを提供します。Hologres コネクタは、次の機能をサポートしています。
カテゴリ | 詳細 |
サポートされるタイプ | ソース、ディメンション、および結果テーブル |
実行モード | ストリームモードとバッチモード |
データフォーマット | 非対応 |
特定のモニタリングメトリック | |
API タイプ | DataStream と SQL |
結果テーブルでのデータ更新または削除のサポート | はい |
特徴
特徴 | 詳細 |
バイナリログの有無にかかわらず Hologres データを読み取ることができます。この機能は、変更データキャプチャ (CDC) モードと非 CDC モードの両方と互換性があります。 | |
フル、増分、または 統合されたフルおよび増分 の消費を実行できます。 | |
新しいデータを無視する、行全体を置き換える、または特定のフィールドのみを更新することができます。 | |
行全体ではなく、変更された列のみを更新できます。 | |
物理パーティションテーブルからバイナリログをコンシュームできます。1 つのジョブで、新しく追加されたパーティションを含め、すべてのパーティションをモニターできます。論理パーティションテーブルからバイナリログをコンシュームすることもできます。 | |
パーティションテーブルの親テーブルにデータを書き込み、対応する子パーティションを自動的に作成することができます。 | |
単一テーブルまたはデータベース全体のリアルタイム同期 | 単一テーブルまたはデータベース全体のレベルでのデータのリアルタイム同期を実行できます。この機能は、次の機能を提供します。
詳細については、「CREATE TABLE AS (CTAS) 文」および「リアルタイムデータベース同期のクイックスタート」をご参照ください。 |
制限事項と提案
制限事項
外部テーブルはサポートされていません: Hologres コネクタは、MaxCompute 外部テーブルなどの Hologres 外部テーブルへのアクセスをサポートしていません。
時間型の制限: TIMESTAMP データのリアルタイム消費はサポートされていません。テーブルを作成するときは TIMESTAMPTZ 型を使用する必要があります。
ソーステーブルのスキャンモード (Ververica Runtime (VVR) 8 以前): デフォルトでは、データはバッチモードで読み取られます。テーブル全体は一度しかスキャンされず、新しいデータは消費されません。
ウォーターマークの制限 (VVR 8 以前): CDC モードはウォーターマークの定義をサポートしていません。ウィンドウ集約の場合、非ウィンドウ集約ソリューションを使用する必要があります。
提案
ストレージモードの選択:
ディメンションテーブルでのポイントクエリ: 行指向ストレージを使用します。プライマリキーとクラスタリングキーを設定する必要があります。
ディメンションテーブルでの 1 対多クエリ: 列指向ストレージを使用します。分散キーとセグメントキー (イベント時間列) を構成してパフォーマンスを最適化できます。
高頻度の更新と分析クエリのためのテーブル: テーブルがバイナリログ消費と OLAP 分析の両方をリアルタイムでサポートする必要がある場合、行列表ハイブリッドストレージモードを使用します。
重要Hologres でテーブルを作成するときにモードを指定しない場合、デフォルトのストレージモードは列指向です。テーブルが作成された後、ストレージモードは変更できません。詳細については、「Hologres でテーブルを作成する」および「テーブルストレージ形式: 列指向、行指向、行列表ハイブリッド」をご参照ください。
ジョブの同時実行性の設定: Flink ジョブの同時実行性を Hologres テーブルのシャード数と同じに設定できます。
# Hologres コンソールで、次のステートメントを実行してテーブルのシャード数を表示します。tablename をテーブルの名前に置き換えます。 select tg.property_value from hologres.hg_table_properties tb join hologres.hg_table_group_properties tg on tb.property_value = tg.tablegroup_name where tb.property_key = 'table_group' and tg.property_key = 'shard_count' and table_name = '<tablename>';バージョンと特徴: 既知のバグ、機能の更新、バージョンの互換性情報については、Hologres コネクタのリリースノートを定期的に確認することをお勧めします。
注意
Hologres と VVR のバージョンの互換性と制限事項
ソーステーブル
VVR 8 以前の場合、
sdkModeパラメーターを指定して消費モードを選択できます。VVR 11 以降の場合、
source.binlog.read-modeパラメーターを指定して消費モードを選択できます。
VVR バージョン
Hologres バージョン
デフォルト/推奨パラメーター値
実際の消費モード
注意
6.0.7 以降
< 2.0
カスタム
holohub (デフォルト)
jdbc を設定します。
6.0.7 から 8.0.4
2.0 以上
jdbc (自動スイッチオーバー、構成不要)
jdbc (強制)
Hologres 2.0 以降では holohub サービスが公開されていないため、モードは自動的に jdbc に切り替わります。これにより、権限の問題が発生する可能性があります。詳細については、「権限の問題」をご参照ください。
8.0.5 以降
2.1 以上
jdbc (自動スイッチオーバー、構成不要)
jdbc (強制)
権限の問題はありません。Hologres 2.1.27 以降では、モードは jdbc_fixed に切り替えられます。
11.1 以上
任意のバージョン
AUTO (デフォルト)
Hologres のバージョンに基づいて自動的に選択されます
バージョン 2.1.27 以降では、jdbc モードが選択され、デフォルトで軽量接続が有効になります。これは、connection.fixed.enabled パラメーターがデフォルトで true に設定されていることを意味します。
バージョン 2.1.0 から 2.1.26 までは、jdbc モードが選択されます。
バージョン 2.0 以前では、holohub モードが選択されます。
重要VVR 11.1 以降では、デフォルトでバイナリログデータが消費されます。バイナリログが有効になっていることを確認してください。そうしないと、エラーが発生する可能性があります。
シンクテーブル
VVR 8 以前の場合、
sdkModeパラメーターを指定して消費モードを選択できます。VVR 11 以降の場合、
sink.write-modeパラメーターを指定して消費モードを選択できます。
VVR バージョン
Hologres バージョン
rpc モードは影響を受けますか?
実際の rpc 消費モード
推奨/デフォルトのパラメーター値
備考
6.0.4 から 8.0.2
< 2.0
いいえ
rpc
カスタム
/
6.0.4 ~ 8.0.2
≥ 2.0
はい
jdbc_fixed (自動スイッチオーバー)
カスタム
'jdbcWriteBatchSize' を '1' に設定して、重複排除を防ぐことができます。
8.0.3 以降
任意のバージョン
はい
jdbc_fixed (自動スイッチオーバー)
カスタム
rpc モードを設定すると、重複排除を防ぐために、パラメーターの値が自動的に jdbc_fixed に切り替わり、'jdbcWriteBatchSize' が '1' に設定されます。
8.0.5 以降
すべてのバージョン
はい
jdbc_fixed (自動スイッチオーバー)
カスタム
rpc モードを設定すると、パラメーター値は自動的に jdbc_fixed に切り替えられ、重複排除を防ぐために 'deduplication.enabled' は 'false' に設定されます。
重要rpc サービスは Hologres 2.0 以降では公開されていません。このパラメーターを rpc に設定すると、Flink システムは自動的にパラメーター値を jdbc_fixed に切り替えます。ただし、別の値に設定した場合、Flink システムは構成した値を使用します。
rpc モードは VVR 11.1 以降では削除されています。接続には jdbc モードを使用することをお勧めします。
高い同時実行性シナリオでの書き込みには、
jdbc_copy/COPY_STREAMモードを使用できます。
ディメンションテーブル
VVR バージョン
Hologres バージョン
rpc モードは影響を受けますか?
実際の rpc 消費モード
推奨/デフォルトのパラメーター値
注意事項
6.0.4 ~ 8.0.2
< 2.0
いいえ
rpc
カスタム
/
6.0.4 から 8.0.2
≥ 2.0
はい
jdbc_fixed (自動スイッチオーバー)
カスタム
Hologres インスタンスがバージョン 2.0 以降の場合、rpc サービスは公開されていません。このパラメーターを rpc に設定すると、Flink システムは自動的にパラメーター値を jdbc_fixed に切り替えます。ただし、別の値に設定した場合、Flink システムは構成した値を使用します。
8.0.3 以降
任意バージョン
はい
jdbc_fixed (自動スイッチオーバー)
カスタム
8.0.5 以降
任意のバージョン
はい
jdbc_fixed (自動スイッチオーバー)
カスタム
重要rpc モードは VVR 11.1 以降では削除されています。接続にはデフォルトで jdbc モードが使用されます。必要に応じて
connection.fixed.enabledパラメーターを有効にして、軽量接続モードを使用できます。JDBC モードのバイナリログソーステーブルは JSONB 型の読み取りをサポートしています。データベースレベルで Grand Unified Configuration (GUC) パラメーターを有効にする必要があります。
-- データベースレベルで GUC パラメーターを有効にします。スーパーユーザーのみがこのステートメントを実行できます。このパラメーターは、各データベースに対して一度だけ設定する必要があります。 alter database <db_name> set hg_experimental_enable_binlog_jsonb = on;UPDATE 操作は、2 つの連続したバイナリログレコードを生成します。古いデータのレコード (update_before) の後に、新しいデータのレコード (update_after) が続きます。
バイナリログソーステーブルで TRUNCATE やその他のテーブル再作成操作を実行しないでください。詳細については、「よくある質問」をご参照ください。
エラーを防ぐために、
DECIMAL型の精度が Flink と Hologres で同じであることを確認してください。詳細については、「よくある質問」をご参照ください。ソーステーブルデータのフルの消費と増分の消費の統合に INITIAL モードを使用する場合、グローバルな順序は保証されません。ダウンストリームアプリケーションが計算のために時間フィールドに依存している場合は、別の純粋なバイナリログ消費モードを使用する必要があります。
バイナリロギングの有効化
テーブルが作成されていない場合
リアルタイム消費機能はデフォルトで無効になっています。Hologres コンソールでテーブルを作成する場合、DDL 文で binlog.level および binlog.ttl パラメーターを設定する必要があります。次のコードに例を示します。
begin;
create table test_table(
id int primary key,
title text not null,
body text);
call set_table_property('test_table', 'orientation', 'row');--test_table という名前の行指向テーブルを作成します。
call set_table_property('test_table', 'clustering_key', 'id');--id 列にクラスタリングキーを作成します。
call set_table_property('test_table', 'binlog.level', 'replica');--テーブルプロパティを設定してバイナリロギングを有効にします。
call set_table_property('test_table', 'binlog.ttl', '86400');--binlog.ttl はバイナリログの TTL を秒単位で指定します。
commit;既存のテーブルの場合
Hologres コンソールで、次のステートメントを実行して既存のテーブルのバイナリロギングを有効にし、バイナリログの TTL を設定できます。table_name は、バイナリロギングを有効にするテーブルの名前を指定します。
-- テーブルプロパティを設定してバイナリロギングを有効にします。
begin;
call set_table_property('<table_name>', 'binlog.level', 'replica');
commit;
-- テーブルプロパティを設定してバイナリログの TTL を秒単位で構成します。
begin;
call set_table_property('<table_name>', 'binlog.ttl', '2592000');
commit;コネクタオプション
VVR 11 以降、Hologres をより適切にサポートするために WITH パラメーターが調整されています。一部のパラメーターは名前が変更されたり、削除されたりする場合があります。VVR 11 は VVR 8 と下位互換性があります。お使いの VVR バージョンに対応するパラメーターの説明を見つけてください。
型マッピング
Flink と Hologres 間のデータ型マッピングの詳細については、「Flink と Hologres 間のデータ型マッピング」をご参照ください。
例
ソーステーブルの例
Binlog ソーステーブル
CDC モード
このモードでは、ソースによって消費されるバイナリログデータについて、hg_binlog_event_type に基づいて各行の Flink RowKind 型が自動的かつ正確に設定されます。型を明示的に宣言する必要はありません。例としては、INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER があります。これにより、MySQL や Postgres の CDC 機能と同様に、テーブルのミラー化されたデータ同期が可能になります。次のコードは、ソーステーブルの DDL 文の例です。
VVR 11+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='${secret_values.ak_id}', --キーの漏洩を防ぐために、AccessKey ペアには変数管理を使用します。
'password'='${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog.change-log-mode'='ALL', --INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含むすべての ChangeLog タイプを読み取ります。
'retry-count'='10', --バイナリログの読み取りエラーが発生した後のリトライ回数。
'retry-sleep-step-ms'='5000', --リトライの増分待機時間。最初のリトライは 5 秒、2 回目は 10 秒待機します。
'source.binlog.batch-size'='512' --各バッチでバイナリログから読み取るデータ行数。
);VVR 8+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --キーの漏洩を防ぐために、AccessKey ペアには変数管理を使用します。
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'sdkMode'='jdbc',
'binlogMaxRetryTimes' = '10', --バイナリログの読み取りエラーが発生した後のリトライ回数。
'binlogRetryIntervalMs' = '500', --バイナリログの読み取りエラーが発生した後のリトライ間隔。
'binlogBatchReadSize' = '100' --各バッチでバイナリログから読み取るデータ行数。
);非 CDC モード
このモードでは、ソースによって消費されるバイナリログデータは、通常の Flink データとして子孫ノードに渡されます。つまり、すべてのデータは INSERT 型になります。必要に応じて、特定の hg_binlog_event_type 型のデータをどのように処理するかを選択できます。次のコードは、ソーステーブルの DDL 文の例です。
VVR 11+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --キーの漏洩を防ぐために、AccessKey ペアには変数管理を使用します。
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog.change-log-mode'='ALL_AS_APPEND_ONLY', --すべての ChangeLog タイプは INSERT として処理されます。
'retry-count'='10', --バイナリログの読み取りエラーが発生した後のリトライ回数。
'retry-sleep-step-ms'='5000', --リトライの増分待機時間。最初のリトライは 5 秒、2 回目は 10 秒待機します。
'source.binlog.batch-size'='512' --各バッチでバイナリログから読み取るデータ行数。
);VVR 8+
CREATE TEMPORARY TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --キーの漏洩を防ぐために、AccessKey ペアには変数管理を使用します。
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'binlogMaxRetryTimes' = '10', --バイナリログの読み取りエラーが発生した後のリトライ回数。
'binlogRetryIntervalMs' = '500', --バイナリログの読み取りエラーが発生した後のリトライ間隔。
'binlogBatchReadSize' = '100' --各バッチでバイナリログから読み取るデータ行数。
);非 Binlog ソーステーブル
VVR 11+
VVR 11.1 以降の場合、バイナリログデータはデフォルトで消費されます。詳細については、「Binlog ソーステーブル」をご参照ください。
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --キーの漏洩を防ぐために、AccessKey ペアには変数管理を使用します。
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'source.binlog'='false' --バイナリログデータを消費するかどうかを指定します。
);VVR 8+
CREATE TEMPORARY TABLE hologres_source (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --キーの漏洩を防ぐために、AccessKey ペアには変数管理を使用します。
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'sdkMode' = 'jdbc'
);結果テーブルの例
CREATE TEMPORARY TABLE datagen_source(
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='datagen'
);
CREATE TEMPORARY TABLE hologres_sink (
name varchar,
age BIGINT,
birthday BIGINT
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --キーの漏洩を防ぐために、AccessKey ペアには変数管理を使用します。
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
INSERT INTO hologres_sink SELECT * from datagen_source;ディメンションテーブルの例
CREATE TEMPORARY TABLE datagen_source (
a INT,
b BIGINT,
c STRING,
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen'
);
CREATE TEMPORARY TABLE hologres_dim (
a INT,
b VARCHAR,
c VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username' = '${secret_values.ak_id}', --キーの漏洩を防ぐために、AccessKey ペアには変数管理を使用します。
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>'
);
CREATE TEMPORARY TABLE blackhole_sink (
a INT,
b STRING
) WITH (
'connector' = 'blackhole'
);
INSERT INTO blackhole_sink SELECT T.a,H.b
FROM datagen_source AS T JOIN hologres_dim FOR SYSTEM_TIME AS OF T.proctime AS H ON T.a = H.a;詳細な機能説明
フルの消費と増分の消費の統合
シナリオ
この機能は、結果テーブルにプライマリキーがあるシナリオにのみ適用されます。CDC モードでフルおよび増分の Hologres ソーステーブルを使用することをお勧めします。
Hologres は、オンデマンドでバイナリロギングを有効にすることをサポートしています。既存データを含むテーブルに対してバイナリロギングを有効にすることができます。
コード例
VVR 11+
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog.startup-mode' = 'INITIAL', --最初にすべての既存データを読み取り、次にバイナリログを増分的に消費します。
'retry-count'='10', --バイナリログの読み取りエラーが発生した後のリトライ回数。
'retry-sleep-step-ms'='5000', --リトライの増分待機時間。最初のリトライは 5 秒、2 回目は 10 秒待機します。
'source.binlog.batch-size'='512' --各バッチでバイナリログから読み取るデータ行数。
);source.binlog.startup-modeをINITIALに設定して、最初にすべてのデータを消費し、次に増分消費のためにバイナリログを読み取ることができます。startTimeパラメーターを設定するか、起動ページで開始時刻を選択すると、binlogStartUpModeは強制的にtimestampになります。startTimeパラメーターの優先度が高いため、他の消費モードは有効になりません。
VVR 8+
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn BIGINT,
hg_binlog_event_type BIGINT,
hg_binlog_timestamp_us BIGINT,
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourTablename>',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'binlog' = 'true',
'cdcMode' = 'true',
'binlogStartUpMode' = 'initial', --最初にすべての既存データを読み取り、次にバイナリログを増分的に消費します。
'binlogMaxRetryTimes' = '10', --バイナリログの読み取りエラーが発生した後のリトライ回数。
'binlogRetryIntervalMs' = '500', --バイナリログの読み取りエラーが発生した後のリトライ間隔。
'binlogBatchReadSize' = '100' --各バッチでバイナリログから読み取るデータ行数。
);binlogStartUpModeをinitialに設定して、最初にすべてのデータを消費し、次に増分消費のためにバイナリログを読み取ることができます。startTimeパラメーターを設定するか、起動ページで開始時刻を選択すると、binlogStartUpModeは強制的にtimestampになります。startTimeパラメーターの優先度が高いため、他の消費モードは有効になりません。
プライマリキーの競合処理ポリシー
コネクタは、Hologres にデータを書き込む際に、重複するプライマリキーを持つデータを処理するための 3 つのポリシーを提供します。
VVR 11+
sink.on-conflict-action パラメーターを指定して、さまざまな処理ポリシーを実装できます。
sink.on-conflict-action の値 | 意味 |
INSERT_OR_IGNORE | データの最初の出現を保持し、後続の重複は無視します。 |
INSERT_OR_REPLACE | 既存のデータを後続のデータで置き換えます。 |
INSERT_OR_UPDATE (デフォルト) | シンクで提供されるフィールドのみを更新します。その他のフィールドは変更されません。 |
VVR 8+
mutatetype パラメーターを指定して、さまざまな処理ポリシーを実装できます。
mutatetype の値 | 意味 |
insertorignore (デフォルト) | データの最初の発生を保持し、後続の重複は無視します。 |
挿入または置き換え | 既存のデータを後続のデータで置き換えます。 |
insertorupdate | シンクで提供されるフィールドのみを更新します。その他のフィールドは変更されません。 |
テーブルにフィールド a、b、c、d があり、a がプライマリキーであるとします。結果テーブルがフィールド a と b のみを提供し、ポリシーを INSERT_OR_UPDATE に設定した場合、フィールド b のみが更新され、c と d は変更されません。結果テーブルのフィールド数は、Hologres 物理テーブルのフィールド数より少なくすることができます。ただし、欠落しているフィールドは null 値を許可する必要があります。そうしないと、書き込み操作は失敗します。
パーティションテーブルへの書き込み
デフォルトでは、Hologres シンクは単一テーブルへのデータインポートのみをサポートします。パーティションテーブルの親テーブルにデータをインポートするには、次の構成を有効にする必要があります:
VVR 11+
sink.create-missing-partition を true に設定できます。子パーティションが作成されていない場合、自動的に作成されます。
VVR 11.1 以降は、デフォルトでパーティションテーブルへの書き込みをサポートし、データを対応する子パーティションに自動的にルーティングします。
tablenameパラメーターを親テーブルの名前に設定する必要があります。子テーブルが事前に作成されておらず、
sink.create-missing-partition=trueが設定されていない場合、書き込み操作は失敗します。
VVR 8+
partitionRouterをtrueに設定して、データを対応する子パーティションに自動的にルーティングできます。createparttableをtrueに設定できます。子パーティションが作成されていない場合、自動的に作成されます。
tablenameパラメーターを親テーブルの名前に設定する必要があります。子テーブルが事前に作成されておらず、
createparttable=trueが設定されていない場合、書き込み操作は失敗します。
複数ストリーム書き込みのためのワイドテーブルのマージと部分的な更新
同じ Hologres ワイドテーブルに複数のデータストリームを書き込む場合、システムは同じプライマリキーを持つデータを自動的にマージすることをサポートします。行全体を置き換える代わりに、変更された列のみを更新することを選択できます。これにより、書き込み効率とデータ整合性が向上します。
制限事項
ワイドテーブルにはプライマリキーが必要です。
各データストリームのデータには、完全なプライマリキーフィールドが含まれている必要があります。
1 秒あたりのリクエスト数 (RPS) が多い列指向ワイドテーブルがマージされるシナリオでは、CPU 使用率が高くなる可能性があります。テーブル内のフィールドの Dictionary Encoding 機能を無効にすることをお勧めします。
例
2 つの Flink データストリームがあるとします。1 つはフィールド a、b、c を含み、もう 1 つはフィールド a、d、e を含みます。Hologres ワイドテーブル WIDE_TABLE にはフィールド a、b、c、d、e が含まれ、a がプライマリキーです。
VVR 11+
// source1 と source2 はすでに定義されています。
CREATE TEMPORARY TABLE hologres_sink ( -- 5 つのフィールドを宣言します: a、b、c、d、e。
a BIGINT,
b STRING,
c STRING,
d STRING,
e STRING,
primary key(a) not enforced
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourWideTablename>', -- 5 つのフィールドを含む Hologres ワイドテーブル: a、b、c、d、e。
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'sink.on-conflict-action'='INSERT_OR_UPDATE', -- プライマリキーに基づいてデータの一部の列を更新します。
'sink.delete-strategy'='IGNORE_DELETE', -- リトラクションメッセージの処理ポリシー。IGNORE_DELETE は、データの挿入または更新のみが必要で、削除は不要なシナリオに適しています。
'sink.partial-insert.enabled'='true' -- 部分列更新パラメーターを有効にします。INSERT 文で定義されたフィールドがコネクタにプッシュダウンされるため、宣言されたフィールドのみが更新または挿入できます。
);
BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1; -- 3 つのフィールドのみが挿入されることを宣言します: a、b、c。
INSERT INTO hologres_sink(a,d,e) select * from source2; -- 3 つのフィールドのみが挿入されることを宣言します: a、d、e。
END;VVR 8+
// source1 と source2 はすでに定義されています。
CREATE TEMPORARY TABLE hologres_sink ( -- 5 つのフィールドを宣言します: a、b、c、d、e。
a BIGINT,
b STRING,
c STRING,
d STRING,
e STRING,
primary key(a) not enforced
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='<yourWideTablename>', -- 5 つのフィールドを含む Hologres ワイドテーブル: a、b、c、d、e。
'username' = '${secret_values.ak_id}',
'password' = '${secret_values.ak_secret}',
'endpoint'='<yourEndpoint>',
'mutatetype'='insertorupdate', -- プライマリキーに基づいてデータの一部の列を更新します。
'ignoredelete'='true', -- リトラクションメッセージによって生成された Delete リクエストを無視します。
'partial-insert.enabled'='true' -- 部分列更新パラメーターを有効にして、INSERT 文で宣言されたフィールドのみの更新をサポートします。
);
BEGIN STATEMENT SET;
INSERT INTO hologres_sink(a,b,c) select * from source1; -- 3 つのフィールドのみが挿入されることを宣言します: a、b、c。
INSERT INTO hologres_sink(a,d,e) select * from source2; -- 3 つのフィールドのみが挿入されることを宣言します: a、d、e。
END;ignoredelete を true に設定して、リトラクションメッセージによって生成される Delete リクエストを無視できます。VVR 8.0.8 以降では、sink.delete-strategy パラメーターを使用して部分更新のさまざまなポリシーを構成することをお勧めします。
パーティションテーブルのバイナリログの消費 (パブリックプレビュー)
パーティションテーブルは、データアーカイブとクエリの最適化に役立ちます。Hologres コネクタは、物理および論理パーティションテーブルからのバイナリログの消費をサポートしています。物理および論理パーティションテーブルの違いの詳細については、「CREATE LOGICAL PARTITION TABLE」をご参照ください。
物理パーティションテーブルのバイナリログの消費
Hologres コネクタは、単一のジョブでパーティションテーブルのバイナリログの消費をサポートし、新しいパーティションを動的にリッスンできます。これにより、リアルタイムデータ処理の効率と使いやすさが大幅に向上します。
注意
VVR 8.0.11 以降、Hologres インスタンス V2.1.27 以降、および JDBC モードのバイナリログソーステーブルのみが、パーティションテーブルのバイナリログの消費をサポートしています。
パーティション名は、親テーブル名、アンダースコア、パーティション値で厳密に構成する必要があり、フォーマットは
{parent_table}_{partition_value}です。このフォーマットに従わないパーティションは消費されない可能性があります。詳細については、「動的パーティション管理」をご参照ください。重要DYNAMIC モードの場合、VVR 8.0.11 は YYYY-MM-DD のような
-区切り文字を持つパーティションフィールドをサポートしていません。VVR 11.1 以降、カスタムフォーマットのパーティションフィールドを消費できます。
このフォーマット制限は、パーティションテーブルにデータを書き込む場合には適用されません。
Flink で Hologres ソーステーブルを宣言する場合、Hologres パーティションテーブルのパーティションフィールドを含める必要があります。
DYNAMIC モードの場合、パーティションテーブルには動的パーティション管理が有効になっている必要があります。パーティション事前作成パラメーター
auto_partitioning.num_precreateは 1 より大きい必要があります。そうしないと、ジョブが最新のパーティションを消費しようとすると例外がスローされます。DYNAMIC モードでは、新しいパーティションが追加された後、古いパーティションの以降のデータ更新は読み取られなくなります。
例
モードタイプ | 機能 | シナリオの説明 |
ダイナミック | 動的パーティション消費 | 新しいパーティションを自動的に検出し、消費の進捗を時系列順に動的に進めます。リアルタイムデータストリームシナリオに適しています。 |
STATIC | 静的パーティション消費 | 既存のパーティションまたは手動で指定されたパーティションのみを読み取ります。新しいパーティションを自動的に検出しません。固定範囲内の既存データを処理するのに適しています。 |
DYNAMIC モード
VVR 11+
Hologres に次の DDL パーティションテーブルが存在し、バイナリロギングと動的パーティショニングが有効になっているとします。
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true', -- 動的パーティショニングを有効にします。
auto_partitioning_time_unit = 'DAY', -- 時間単位として DAY を使用します。パーティション名の例: test_message_src1_20250512, test_message_src1_20250513。
auto_partitioning_num_precreate = '2' -- 2 つのパーティションを事前に作成します。
);
-- 既存のパーティションテーブルの場合、ALTER TABLE を使用して動的パーティショニングを有効にすることもできます。Flink では、次の SQL 文を使用して、パーティションテーブル test_message_src1 の DYNAMIC モード消費を宣言できます。
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR -- Hologres パーティションテーブルのパーティションフィールド。
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src1', -- 動的パーティショニングが有効になっている親テーブル。
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'source.binlog.partition-binlog-mode' = 'DYNAMIC', -- 最新のパーティションを動的にリッスンします。
'source.binlog.startup-mode' = 'initial' -- 最初にすべての既存データを読み取り、次にバイナリログを増分的に消費します。
);VVR 8.0.11
Hologres に次の DDL パーティションテーブルが存在し、バイナリロギングと動的パーティショニングが有効になっているとします。
CREATE TABLE "test_message_src1" (
id int,
title text,
body text,
dt text,
PRIMARY KEY (id, dt)
)
PARTITION BY LIST (dt) WITH (
binlog_level = 'replica',
auto_partitioning_enable = 'true', -- 動的パーティショニングを有効にします。
auto_partitioning_time_unit = 'DAY', -- 時間単位として DAY を使用します。パーティション名の例: test_message_src1_20241027, test_message_src1_20241028。
auto_partitioning_num_precreate = '2' -- 2 つのパーティションを事前に作成します。
);
-- 既存のパーティションテーブルの場合、ALTER TABLE を使用して動的パーティショニングを有効にすることもできます。Flink では、次の SQL 文を使用して、パーティションテーブル test_message_src1 の DYNAMIC モード消費を宣言できます。
CREATE TEMPORary TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
dt VARCHAR -- Hologres パーティションテーブルのパーティションフィールド。
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src1', -- 動的パーティショニングが有効になっている親テーブル。
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'binlog' = 'true',
'partition-binlog.mode' = 'DYNAMIC', -- 最新のパーティションを動的にリッスンします。
'binlogstartUpMode' = 'initial', -- 最初にすべての既存データを読み取り、次にバイナリログを増分的に消費します。
'sdkMode' = 'jdbc_fixed' -- 接続制限を回避するためにこのモードを使用します。
);STATIC モード
VVR 11+
Hologres に次の DDL パーティションテーブルが存在し、バイナリロギングが有効になっているとします。
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
Flink では、次の SQL 文を使用して、パーティションテーブル test_message_src2 の STATIC モード消費を宣言できます。
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR -- Hologres パーティションテーブルのパーティションフィールド。
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src2', -- パーティションテーブル。
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'source.binlog.partition-binlog-mode' = 'STATIC', -- 固定パーティションを消費します。
'source.binlog.partition-values-to-read' = 'red,blue,green', -- 構成された 3 つのパーティションのみを消費します。「black」パーティションは消費されません。新しいパーティションは消費されません。設定しない場合、親テーブルのすべてのパーティションが消費されます。
'source.binlog.startup-mode' = 'initial' -- 最初にすべての既存データを読み取り、次にバイナリログを増分的に消費します。
);VVR 8.0.11
Hologres に次の DDL パーティションテーブルが存在し、バイナリロギングが有効になっているとします。
CREATE TABLE test_message_src2 (
id int,
title text,
body text,
color text,
PRIMARY KEY (id, color)
)
PARTITION BY LIST (color) WITH (
binlog_level = 'replica'
);
create table test_message_src2_red partition of test_message_src2 for values in ('red');
create table test_message_src2_blue partition of test_message_src2 for values in ('blue');
create table test_message_src2_green partition of test_message_src2 for values in ('green');
create table test_message_src2_black partition of test_message_src2 for values in ('black');
Flink では、次の SQL 文を使用して、パーティションテーブル test_message_src2 の STATIC モード消費を宣言できます。
CREATE TEMPORARY TABLE hologres_source
(
id INTEGER,
title VARCHAR,
body VARCHAR,
color VARCHAR -- Hologres パーティションテーブルのパーティションフィールド。
)
with (
'connector' = 'hologres',
'dbname' = '<yourDatabase>',
'tablename' = 'test_message_src2', -- パーティションテーブル。
'username' = '<yourUserName>',
'password' = '<yourPassword>',
'endpoint' = '<yourEndpoint>',
'binlog' = 'true',
'partition-binlog.mode' = 'STATIC', -- 固定パーティションを消費します。
'partition-values-to-read' = 'red,blue,green', -- 構成された 3 つのパーティションのみを消費します。「black」パーティションは消費されません。新しいパーティションは消費されません。設定しない場合、親テーブルのすべてのパーティションが消費されます。
'binlogstartUpMode' = 'initial', -- 最初にすべての既存データを読み取り、次にバイナリログを増分的に消費します。
'sdkMode' = 'jdbc_fixed' -- 接続制限を回避するためにこのモードを使用します。
);論理パーティションテーブルのバイナリログの消費
Hologres コネクタは、論理パーティションテーブルのバイナリログの消費をサポートし、パラメーターを介して指定されたパーティションを消費できます。
注意
VVR 11.0.0 以降および Hologres インスタンス V3.1 以降のみが、論理パーティションテーブル内の指定されたパーティションのバイナリログの消費をサポートしています。
論理パーティションテーブル内のすべてのパーティションのバイナリログを消費することは、通常の Hologres テーブルのバイナログを消費することと同じです。詳細については、「ソーステーブルの例」をご参照ください。
例
パラメーター名 | 説明 | 例 |
source.binlog.logical-partition-filter-column-names | 論理パーティションテーブルからバイナリログが消費される、指定されたパーティションのパーティションキー列の名前。パーティションキー列の名前は、二重引用符 (") で囲む必要があります。複数のパーティションキー列は、コンマ (,) で区切ります。列名に二重引用符が含まれる場合、別の二重引用符でエスケープします。 | 'source.binlog.logical-partition-filter-column-names'='"Pt","id"' パーティションキー列は Pt と id の 2 つです。 |
source.binlog.logical-partition-filter-column-values | 論理パーティションテーブルからバイナリログを消費する、指定されたパーティションのパーティションキー列の値。各パーティションは、複数のパーティションキー列の値によって指定できます。パーティションキー列の値は、コンマ (,) で区切ります。値はダブルクォーテーションマーク (") で囲みます。値にダブルクォーテーションマークが含まれる場合は、もう 1 つのダブルクォーテーションマークでエスケープします。複数のパーティションはセミコロン (;) で区切ります。 | 'source.binlog.logical-partition-filter-column-values'='"20240910","0";"special""value","9"' 2 つのパーティションが消費されることを指定します。パーティションキー列は 2 つあります。最初のパーティションの値は (20240910, 0) で、2 番目のパーティションの値は (special"value, 9) です。 |
Hologres で次のテーブルが作成されているとします。
CREATE TABLE holo_table (
id int not null,
name text,
age numeric(18,4),
"Pt" text,
primary key(id, "Pt")
)
LOGICAL PARTITION BY LIST ("Pt", id)
WITH (
binlog_level ='replica'
);Flink でこのテーブルのバイナリログを消費できます。
CREATE TEMPORARY TABLE test_src_binlog_table(
id INTEGER,
name VARCHAR,
age decimal(18,4),
`Pt` VARCHAR
) WITH (
'connector'='hologres',
'dbname'='<yourDbname>',
'tablename'='holo_table',
'username'='<yourAccessID>',
'password'='<yourAccessSecret>',
'endpoint'='<yourEndpoint>',
'source.binlog'='true',
'source.binlog.logical-partition-filter-column-names'='"Pt","id"',
'source.binlog.logical-partition-filter-column-values'='<yourPartitionColumnValues>',
'source.binlog.change-log-mode'='ALL', --INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含むすべての ChangeLog タイプを読み取ります。
'retry-count'='10', --バイナリログの読み取りエラーが発生した後のリトライ回数。
'retry-sleep-step-ms'='5000', --リトライの増分待機時間。最初のリトライは 5 秒、2 回目は 10 秒待機します。
'source.binlog.batch-size'='512' --各バッチでバイナリログから読み取るデータ行数。
);DataStream API
DataStream API を使用してデータを読み書きするには、対応する DataStream コネクタを使用して Realtime Compute for Apache Flink に接続する必要があります。DataStream コネクタの設定方法の詳細については、「DataStream コネクタの使用方法」をご参照ください。Hologres DataStream コネクタは Maven Central リポジトリで入手できます。ローカルデバッグの場合、対応する Uber JAR を使用する必要があります。詳細については、「コネクタを含むジョブをローカルで実行およびデバッグする」をご参照ください。
Hologres ソーステーブル
Binlog ソーステーブル
VVR は、Hologres バイナリログデータを読み取るための Source の HologresBinlogSource 実装クラスを提供します。次のコードは、Hologres バイナリログソースを構築する方法の例です。
VVR 8.0.11+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 読み取るテーブルのスキーマを初期化します。フィールドは Hologres テーブルスキーマのフィールドと一致する必要があります。フィールドのサブセットのみを定義できます。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Hologres 関連のパラメーター。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// JDBCOptions を構築します。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// HologresBinlogSource を構築します。
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet(),
new ArrayList<>()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}VVR 8.0.7+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 読み取るテーブルのスキーマを初期化します。フィールドは Hologres テーブルスキーマのフィールドと一致する必要があります。フィールドのサブセットのみを定義できます。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Hologres 関連のパラメーター。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// JDBCOptions を構築します。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// HologresBinlogSource を構築します。
long startTimeMs = 0;
HologresBinlogSource source = new HologresBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.INITIAL,
"",
"",
-1,
Collections.emptySet()
);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}VVR 6.0.7+
public class Sample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 読み取るテーブルのスキーマを初期化します。フィールドは Hologres テーブルスキーマのフィールドと一致する必要があります。フィールドのサブセットのみを定義できます。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.build();
// Hologres 関連のパラメーター。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
config.setBoolean(HologresBinlogConfigs.OPTIONAL_BINLOG, true);
config.setBoolean(HologresBinlogConfigs.BINLOG_CDC_MODE, true);
// JDBCOptions を構築します。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
// デフォルトのスロット名を設定または作成します。
config.setString(HologresBinlogConfigs.JDBC_BINLOG_SLOT_NAME, HoloBinlogUtil.getOrCreateDefaultSlotForJDBCBinlog(jdbcOptions));
boolean cdcMode = config.get(HologresBinlogConfigs.BINLOG_CDC_MODE) && config.get(HologresBinlogConfigs.OPTIONAL_BINLOG);
// Binlog Record Converter を構築します。
JDBCBinlogRecordConverter recordConverter = new JDBCBinlogRecordConverter(
jdbcOptions.getTable(),
schema,
new HologresConnectionParam(config),
cdcMode,
Collections.emptySet());
// HologresBinlogSource を構築します。
long startTimeMs = 0;
HologresJDBCBinlogSource source = new HologresJDBCBinlogSource(
new HologresConnectionParam(config),
schema,
config,
jdbcOptions,
startTimeMs,
StartupMode.TIMESTAMP,
recordConverter,
"",
-1);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Test source").print();
env.execute();
}
}VVR バージョン 8.0.5 以前または Hologres バージョン 2.1 以前の場合、スーパーユーザーであるか、Replication Role 権限を持っているかを確認する必要があります。詳細については、「Hologres の権限の問題」をご参照ください。
非 Binlog ソーステーブル
VVR は、Hologres テーブルデータを読み取るための RichInputFormat の HologresBulkreadInputFormat 実装クラスを提供します。次のコードは、テーブルデータを読み取るための Hologres ソースを構築する方法の例です。
public class Sample {
public static void main(String[] args) throws Exception {
// Java DataStream API を設定します
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 読み取るテーブルのスキーマを初期化します。フィールドは Hologres テーブルスキーマのフィールドと一致する必要があります。フィールドのサブセットのみを定義できます。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.TIMESTAMP())
.build();
// Hologres 関連のパラメーター。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
// JDBCOptions を構築します。
JDBCOptions jdbcOptions = JDBCUtils.getJDBCOptions(config);
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
new HologresConnectionParam(config),
jdbcOptions,
schema,
"",
-1);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.addSource(new InputFormatSourceFunction<>(inputFormat, typeInfo)).returns(typeInfo).print();
env.execute();
}
}Maven 依存関係
Hologres DataStream コネクタは Maven Central リポジトリで入手できます。
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-hologres</artifactId>
<version>${vvr-version}</version>
</dependency>Hologres 結果テーブル
VVR は、データを書き込むための OutputFormatSinkFunction の HologresSinkFunction 実装クラスを提供します。次のコードは、Hologres シンクを構築する方法の例です。
public class Sample {
public static void main(String[] args) throws Exception {
// Java DataStream API を設定します
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 読み取るテーブルのスキーマを初期化します。フィールドは Hologres テーブルスキーマのフィールドと一致する必要があります。フィールドのサブセットのみを定義できます。
TableSchema schema = TableSchema.builder()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.build();
// Hologres 関連のパラメーター。
Configuration config = new Configuration();
config.setString(HologresConfigs.ENDPOINT, "yourEndpoint");
config.setString(HologresConfigs.USERNAME, "yourUserName");
config.setString(HologresConfigs.PASSWORD, "yourPassword");
config.setString(HologresConfigs.DATABASE, "yourDatabaseName");
config.setString(HologresConfigs.TABLE, "yourTableName");
config.setString(HologresConfigs.SDK_MODE, "jdbc");
HologresConnectionParam hologresConnectionParam = new HologresConnectionParam(config);
// RowData 形式でデータを書き込むための Hologres ライターを構築します。
AbstractHologresWriter<RowData> hologresWriter = HologresJDBCWriter.createRowDataWriter(
hologresConnectionParam,
schema,
HologresTableSchema.get(hologresConnectionParam),
new Integer[0]);
// Hologres シンクを構築します。
HologresSinkFunction sinkFunction = new HologresSinkFunction(hologresConnectionParam, hologresWriter);
TypeInformation<RowData> typeInfo = InternalTypeInfo.of(schema.toRowDataType().getLogicalType());
env.fromElements((RowData) GenericRowData.of(101, StringData.fromString("name"))).returns(typeInfo).addSink(sinkFunction);
env.execute();
}
}メタデータ列
VVR 8.0.11 以降のバイナリログソーステーブルはメタデータ列をサポートしています。このバージョン以降、hg_binlog_event_type などのバイナリログフィールドをメタデータ列として宣言することをお勧めします。メタデータ列は SQL 標準の拡張です。メタデータ列を使用して、データベース名、テーブル名、データ変更タイプ、ソーステーブルの生成時間などの特定の情報にアクセスできます。この情報に基づいて、DELETE 変更タイプのデータをフィルタリングするなど、処理ロジックをカスタマイズできます。
フィールド名 | フィールドタイプ | 説明 |
db_name | STRING NOT NULL | 行を含むデータベースの名前。 |
table_name | STRING NOT NULL | 行を含むテーブルの名前。 |
hg_binlog_lsn | BIGINT NOT NULL | バイナリログシーケンス番号を示すバイナリログシステムフィールド。この番号はシャード内で単調に増加しますが、連続していません。異なるシャード間での一意性と順序は保証されません。 |
hg_binlog_timestamp_us | BIGINT NOT NULL | データベース内のこのレコードの変更のタイムスタンプ (マイクロ秒 (us))。 |
hg_binlog_event_type | BIGINT NOT NULL | このレコードの変更タイプ。有効値:
|
hg_shard_id | INT NOT NULL | データが配置されているデータシャード。シャードの基本的な概念の詳細については、「テーブルグループとシャード」をご参照ください。 |
DDL 文では、<meta_column_name> <datatype> METADATA VIRTUAL を使用してメタデータ列を宣言できます。次のコードに例を示します:
CREATE TABLE test_message_src_binlog_table(
hg_binlog_lsn bigint METADATA VIRTUAL
hg_binlog_event_type bigint METADATA VIRTUAL
hg_binlog_timestamp_us bigint METADATA VIRTUAL
hg_shard_id int METADATA VIRTUAL
db_name string METADATA VIRTUAL
table_name string METADATA VIRTUAL
id INTEGER,
title VARCHAR,
body VARCHAR
) WITH (
'connector'='hologres',
...
);よくある質問
参考資料
Hologres カタログの作成方法と使用方法の詳細については、「Hologres カタログの管理」をご参照ください。
Hologres は Flink と深く統合されており、統合されたリアルタイムデータウェアハウスソリューションを提供します。詳細については、「Hologres を使用してリアルタイムデータウェアハウスを構築する」をご参照ください。
Hologres はデータ更新と修正を効率的にサポートしているため、複数ストリーム書き込みシナリオでのワイドテーブルの構築に適しています。詳細については、「MongoDB と Hologres を使用したユーザー行動分析」をご参照ください。