すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:パラメーター構成 (VVR 11 以降)

最終更新日:Jan 23, 2026

このトピックでは、Ververica Runtime (VVR) 11 以降における Hologres コネクタの WITH パラメーターについて説明します。

バージョンパラメーターの削除

システムアーキテクチャーを最適化し、メンテナンス効率を向上させるため、Ververica Runtime (VVR) バージョン 8 以前の一部の従来のパラメーターが調整または削除されました。以下のリストは、削除された従来のパラメーターとそれに対応する代替パラメーターを示しています。

削除されたパラメーター

元のパラメーター

説明

代替/削除理由

jdbcRetrySleepInitMs

各リトライの固定待機時間。

retry-sleep-step-ms を使用して、増分待機時間を設定できます。

jdbcMetaAutoRefreshFactor

残りのキャッシュ時間がトリガー時間より短い場合、システムは自動的にキャッシュをリフレッシュします。

このパラメーターは不要になりました。meta-cache-ttl-ms パラメーターを構成して、キャッシュの TTL を設定できます。

type-mapping.timestamp-converting.legacy

Flink と Hologres の間で時間型を変換するかどうかを指定します。

このパラメーターは、TIMESTAMP_LTZ 型をサポートするための下位互換性のために導入されました。現在は不要です。

property-version

コネクタパラメーターのバージョン。

共通パラメーターのデフォルト値が最適化されたため、このパラメーターは削除されました。

field_delimiter

データをエクスポートする際の行間の区切り文字。

データ読み取りメソッドが最適化されたため、このパラメーターは削除されました。

jdbcBinlogSlotName

JDBC モードでのバイナリログソーステーブルのスロット名。

データ読み取りメソッドが最適化されたため、このパラメーターは削除されました。

binlogMaxRetryTimes

バイナリログデータの読み取り中にエラーが発生した場合のリトライ回数。

これは retry-count パラメーターを設定することで構成できます。

cdcMode

CDC モードを使用してバイナリログデータを読み取るかどうかを指定します。

CDC モードはデフォルトで使用されるため、このパラメーターは削除されました。非 CDC モードの場合は、source.binlog.change-log-mode パラメーターを使用します。

upsertSource

ソーステーブルが upsert 変更ログタイプを使用するかどうかを指定します。

構成には source.binlog.change-log-mode パラメーターを使用できます。

bulkload

書き込みにバルクロードを使用するかどうかを指定します。

sink.write-mode パラメーターを設定できます。

useRpcMode

RPC 経由で Hologres コネクタを使用するかどうかを指定します。

JDBC 接続の場合、sink.deduplication.enabled パラメーターを構成して重複排除を有効または無効にすることを推奨します。

partitionrouter

パーティションテーブルに書き込むかどうかを指定します。

パーティションテーブルへの書き込みはデフォルトでサポートされているため、このパラメーターは削除されました。

ignoredelete

リトラクションメッセージを無視するかどうかを指定します。

sink.delete-strategy パラメーターを構成して、取り消されたメッセージの処理ポリシーを指定します。

sdkMode

SDK モード。

このパラメーターは最適化されました。テーブルタイプに基づいて source.binlog.read-mode および sink.write-mode パラメーターを構成します。

jdbcReadBatchQueueSize

ディメンションテーブルのポイントクエリリクエスト用のバッファーキューのサイズ。

ポイントクエリのパフォーマンスが低い場合は、connection.pool.size パラメーターを構成できます。

jdbcReadRetryCount

ディメンションテーブルのポイントクエリがタイムアウトした場合のリトライ回数。

一般的なリトライメカニズムのすべての設定項目は、retry-count パラメーターに統合されました。

jdbcScanTransactionSessionTimeoutSeconds

スキャン操作のトランザクションのタイムアウト期間。

このパラメーターは、一般的なスキャンタイムアウトパラメーター source.scan.timeout-seconds にマージされました。

名前変更されたパラメーター

元のパラメーター (VVR 8)

VVR 11

説明

jdbcRetryCount

retry-count

接続に失敗した場合の書き込みおよびクエリのリトライ回数。

jdbcRetrySleepStepMs

retry-sleep-step-ms

各リトライの増分待機時間。

jdbcConnectionMaxIdleMs

connection.max-idle-ms

JDBC 接続のアイドルタイムアウト。

jdbcMetaCacheTTL

meta-cache-ttl-ms

キャッシュされた TableSchema 情報の生存時間 (TTL)。

binlog

source.binlog

バイナリログデータを消費するかどうかを指定します。

sdkMode

source.binlog.read-mode

読み取りモード。

binlogRetryIntervalMs

source.binlog.request-timeout-ms

バイナリログデータの読み取り中にエラーが発生した後の再試行間隔。

binlogBatchReadSize

source.binlog.batch-size

バイナリログデータのバッチ読み取りの行数。

binlogStartupMode

source.binlog.startup-mode

バイナリログデータの消費モード。

jdbcScanFetchSize

source.scan.fetch-size

スキャンのバッチサイズ。

jdbcScanTimeoutSeconds

source.scan.timeout-seconds

スキャン操作のタイムアウト。

enable_filter_push_down

source.scan.filter-push-down.enabled

完全データ読み取りフェーズでフィルターをプッシュダウンするかどうかを指定します。

partition-binlog.mode

source.binlog.partition-binlog-mode

パーティションテーブルのバイナリログの消費モード。

partition-binlog-lateness-timeout-minutes

source.binlog.partition-binlog-lateness-timeout-minutes

DYNAMIC モードでパーティションテーブルを消費する際の最大遅延タイムアウト。

partition-values-to-read

source.binlog.partition-values-to-read

STATIC モードで、消費するパーティションを指定します。パーティション値はカンマ (,) で区切ります。

sdkMode

sink.write-mode

書き込みモード。

mutatetype

sink.on-conflict-action

プライマリキーの競合を処理するためのポリシー。

createparttable

sink.create-missing-partition

パーティションテーブルへの書き込み時にパーティションが存在しない場合、自動的に作成するかどうかを指定します。

jdbcWriteBatchSize

sink.insert.batch-size

Hologres 結果テーブルに書き込む前にバッファリングするレコードの最大数。

jdbcWriteBatchByteSize

sink.insert.batch-byte-size

Hologres 結果テーブルに書き込む前にバッファリングするレコードの最大サイズ (バイト単位)。

jdbcWriteFlushInterval

sink.insert.flush-interval-ms

バッファリングされたデータが Hologres 結果テーブルから Hologres に書き込まれるまでの最大待機時間。

ignoreNullWhenUpdate

sink.ignore-null-when-update.enabled

mutatetype が 'insertOrUpdate' の場合、更新されたデータ内の null 値を無視するかどうかを指定します。

jdbcEnableDefaultForNotNullColumn

sink.default-for-not-null-column.enabled

デフォルト値のない NOT NULL 列に null 値が書き込まれた場合に、コネクタがデフォルト値を入力することを許可するかどうかを指定します。

remove-u0000-in-text.enabled

sink.remove-u0000-in-text.enabled

書き込み中にコネクタが無効な文字 \u0000 を文字列から削除することを許可するかどうかを指定します。

partial-insert.enabled

sink.partial-insert.enabled

INSERT 文で定義されたフィールドのみを挿入するかどうかを指定します。

deduplication.enabled

sink.deduplication.enabled

バッチ書き込み中に重複を削除するかどうかを指定します。

check-and-put.column

sink.insert.check-and-put.column

条件付き更新を有効にし、チェックするフィールドを指定します。

check-and-put.operator

sink.insert.check-and-put.operator

条件付き更新の比較演算子。

check-and-put.null-as

sink.insert.check-and-put.null-as

条件付き更新中、古いデータの null 値をこのパラメーターで指定された値として扱います。

aggressive.enabled

sink.aggressive-flush.enabled

アグレッシブコミットモードを有効にするかどうかを指定します。

connectionSize

connection.pool.size

単一の Flink ディメンションテーブルタスク用に作成される JDBC 接続プールのサイズ。

connectionPoolName

connection.pool.name

接続プールの名前。同じ TaskManager 内で同じ接続プール名を持つテーブルは、接続プールを共有できます。

jdbcReadBatchSize

lookup.read.batch-size

ディメンションテーブルのポイントクエリ用のバッチ内のレコードの最大数。

jdbcReadTimeoutMs

lookup.read.timeout-ms

ディメンションテーブルのポイントクエリのタイムアウト。

WITH パラメーター

一般

パラメーター

説明

データ型

必須

デフォルト値

備考

connector

テーブルタイプ。

String

はい

なし

このパラメーターを hologres に設定します。

dbname

データベース名。

String

はい

なし

`dbname` パラメーターにサフィックスを追加することで、特定の計算グループに接続できます。たとえば、ディメンションテーブルを `read_warehouse` という名前の特定の計算グループに接続するには、接続を 'dbname' = 'db_test@read_warehouse' と指定します。

tablename

テーブル名。

String

はい

なし

スキーマが Public でない場合は、テーブル名を schema.tableName 形式で指定します。

username

  • BASIC$<user_name> 形式のカスタムアカウントのユーザー名。

  • Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。

String

はい

なし

  • ユーザーは、指定された Hologres データベースにアクセスするための権限を持っている必要があります。Hologres のデータベース権限とユーザー管理の詳細については、「権限モデル」および「ユーザーの管理」をご参照ください。

  • AccessKey ID の取得方法の詳細については、「AccessKey ペアの取得」をご参照ください。

重要

AccessKey 情報の漏洩を防ぐため、変数を使用して AccessKey の値を指定することを推奨します。詳細については、「プロジェクト変数」をご参照ください。

password

  • カスタムアカウントのパスワード。

  • Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey Secret。

String

はい

なし

endpoint

Hologres サービスのエンドポイント。

String

はい

なし

詳細については、「エンドポイント」をご参照ください。

connection.pool.size

タスク内の単一の Flink テーブル用に作成される JDBC 接続プールのサイズ。

Integer

いいえ

5

ジョブのパフォーマンスが低い場合は、プールサイズを増やしてください。接続プールサイズはデータスループットに比例します。このパラメーターは、ディメンションテーブルと結果テーブルに対してのみ有効です。

connection.pool.name

接続プールの名前。同じ TaskManager 内で同じ接続プール名を持つテーブルは、接続プールを共有できます。

String

いいえ

'default'

デフォルト値は 'default' です。複数のテーブルが同じ接続プールを使用するように設定されている場合、それらのうちで connection.pool.size パラメーターの最大値が有効になります。

このパラメーターは必要に応じて構成できます。たとえば、ジョブに 2 つのディメンションテーブル A と B、および 3 つの結果テーブル C、D、E がある場合、テーブル A と B には pool1 を、テーブル C と D には pool2 を使用し、トラフィックが多いテーブル E には別の pool3 を使用できます。

説明
  • 接続プールを共有するには、テーブルが同じ connection.pool.name、エンドポイント、データベース、およびその他の接続情報を持っている必要があります。

  • ジョブに多数のテーブルが含まれている場合、接続数が不足するとパフォーマンスに影響する可能性があります。この場合、異なるテーブルに異なる connection.pool.name 値を設定します。

connection.fixed.enabled

軽量接続モードを使用するかどうかを指定します。

Boolean

いいえ

なし

Hologres には接続数の上限があります。Hologres 2.1 以降、リアルタイムデータ書き込みは、最大接続数に制限されない軽量接続の使用をサポートしています。

説明
  • このパラメーターのデフォルト値は、Hologres インスタンスのバージョンによって異なります。ディメンションテーブルと結果テーブルの場合、Hologres のバージョンが 3.0.28 より後の場合、コネクタは自動的に軽量接続モードを選択します。

  • ディメンションテーブルの軽量接続は、JSONB および RoaringBitmap 型のクエリをサポートしていません。

connection.max-idle-ms

JDBC 接続のアイドルタイムアウト。

Long

いいえ

60000

アイドル時間がこの値を超えると、接続は解放されます。次に使用されるときに、新しい接続が自動的に作成されます。単位はミリ秒です。

connection.ssl.mode

転送中のデータに対して SSL 暗号化を有効にするかどうか、およびどのモードを使用するかを指定します。

String

いいえ

disable

  • disable (デフォルト):転送中暗号化を無効にします。

  • require:SSL を有効にしてデータリンクのみを暗号化します。

  • verify-ca:SSL を有効にしてデータリンクを暗号化し、CA 証明書を使用して Hologres サーバーの信頼性を検証します。

  • verify-full:SSL を有効にしてデータリンクを暗号化し、CA 証明書を使用して Hologres サーバーの信頼性を検証し、証明書内の CN または DNS が構成された Hologres エンドポイントと一致するかどうかを確認します。

説明
  • Hologres V2.1 以降は、verify-ca および verify-full モードをサポートしています。詳細については、「転送中暗号化」をご参照ください。

  • このパラメーターを verify-ca または verify-full に設定する場合は、connection.ssl.root-cert.location パラメーターも構成する必要があります。

connection.ssl.root-cert.location

暗号化モードで証明書が必要な場合の証明書へのパス。

String

いいえ

なし

connection.ssl.mode を verify-ca または verify-full に設定する場合は、CA 証明書へのパスも指定する必要があります。リアルタイム計算コンソールのファイル管理機能を使用して証明書をアップロードできます。証明書は /flink/usrlib ディレクトリに保存されます。たとえば、CA 証明書ファイル名が certificate.crt の場合、このパラメーターを '/flink/usrlib/certificate.crt' に設定します。

説明

CA 証明書の取得方法については、「CA 証明書のダウンロード」をご参照ください。

retry-count

接続に失敗した場合の書き込みおよびクエリのリトライ回数。

Integer

いいえ

10

なし。

retry-sleep-step-ms

各リトライの増分待機時間。

Long

いいえ

5000

単位はミリ秒です。たとえば、値が 5000 (5 秒) の場合、最初のリトライは 5 秒待機し、2 回目は 10 秒待機します。

meta-cache-ttl-ms

キャッシュされた TableSchema 情報の TTL。

Long

いいえ

600000

単位はミリ秒です。

serverless-computing.enabled

サーバーレスリソースを使用するかどうかを指定します。

Boolean

いいえ

false

true に設定すると、Hologres インスタンスのリソースの代わりに Hologres サーバーレスリソースが読み取りと書き込みに使用されます。このパラメーターは、バッチ読み取りとバッチインポートでのみサポートされます。バイナリログの消費、ディメンションテーブルのポイントクエリ、またはリアルタイム書き込みには効果がありません。詳細については、「サーバーレスコンピューティングの概要」をご参照ください。

説明
  • バッチ読み取りとは、source.binlog が false に設定されている場合、または source.binlog.startup-mode が INITIAL に設定されている場合の完全データ読み取りフェーズを指します。

  • バッチインポートとは、sink.write-mode が COPY_BULK_LOAD または COPY_BULK_LOAD_ON_CONFLICT に設定されている場合を指します。

説明

大規模な完全データのインポートまたはエクスポートを実行し、ご利用の Hologres インスタンス上の他のクエリに影響を与えたくない場合は、このパラメーターを有効にします。詳細については、「サーバーレスコンピューティングの概要」をご参照ください。

ソーステーブル固有のパラメーター

パラメーター

説明

データ型

必須

デフォルト値

備考

source.binlog

バイナリログデータを消費するかどうかを指定します。

Boolean

いいえ

true

  • true (デフォルト):バイナリログデータを消費します。

  • false:バイナリログデータを消費しません。バッチ読み取りのみが実行されます。読み取りが完了するとジョブは停止します。

source.binlog.read-mode

読み取りモード。

ENUM

いいえ

AUTO

  • AUTO (デフォルト):インスタンスのバージョンに基づいて最適なモードを自動的に選択します。

  • HOLOHUB:HoloHub モードを使用してバイナリログを消費します。

  • JDBC:JDBC モードを使用してバイナリログを消費します。

説明

AUTO モードの自動選択ロジックは次のとおりです:

  • Hologres インスタンス V2.1.27 以降の場合、JDBC モードが選択され、軽量接続がデフォルトで有効になります。これは、connection.fixed.enabled パラメーターがデフォルトで true に設定されることを意味します。

  • Hologres インスタンス V2.1.0 から V2.1.26 の場合、JDBC モードが選択されます。

  • Hologres インスタンス V2.0 以前の場合、HOLOHUB モードが選択されます。

source.binlog.change-log-mode

CDC ソーステーブルでサポートされる変更ログタイプ。

ENUM

いいえ

UPSERT

  • ALL:INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含むすべての変更ログタイプをサポートします。

  • UPSERT (デフォルト):INSERT、DELETE、UPDATE_AFTER を含む upsert 変更ログタイプのみをサポートします。

  • ALL_AS_APPEND_ONLY:すべての変更ログタイプを INSERT として扱います。

説明

ダウンストリームパイプラインにリトラクションオペレーターが含まれる場合 (たとえば、ROW_NUMBER OVER WINDOW を使用して重複を削除する場合)、upsertSource を true に設定する必要があります。ソーステーブルは Hologres から upsert 方式でデータを読み取ります。

source.binlog.startup-mode

バイナリログデータの消費モード。

ENUM

いいえ

INITIAL

  • INITIAL (デフォルト):まず既存のすべてのデータを消費し、次にバイナリログを読み取って増分消費を開始します。

  • EARLIEST_OFFSET:最も古いバイナリログから消費を開始します。

  • TIMESTAMP:指定された startTime からバイナリログの消費を開始します。

説明

startTime パラメーターを設定するか、起動インターフェイスで開始時刻を選択すると、binlogStartupMode は強制的に timestamp モードになります。他の消費モードは有効になりません。startTime パラメーターが優先されます。

source.binlog.batch-size

各バッチでバイナリログから読み取る行数。

Integer

いいえ

512

なし。

source.binlog.request-timeout-ms

バイナリログデータの読み取りのタイムアウト期間。

Long

いいえ

300000

単位はミリ秒です。

説明

ダウンストリームオペレーターがソーステーブルデータを処理するのが遅すぎる場合、バックプレッシャーによってタイムアウトが発生する可能性があります。

source.binlog.project-columns.enabled

バイナリログデータを読み取る際に、ユーザーテーブルで指定されたフィールドのみを読み取るかどうかを指定します。

Boolean

いいえ

なし

指定されたフィールドは、CREATE TEMPORARY TABLE 文で宣言されたものです。宣言されていないフィールドは読み取られません。テーブルに多くのフィールドがあるが、サブセットのみを消費したい場合、これにより不要なデータ転送と変換を防ぎ、読み取りパフォーマンスを向上させ、帯域幅を節約できます。

説明

このパラメーターは VVR 11.3 以降および Hologres V3.2 以降でのみサポートされます。通常、このパラメーターを構成する必要はありません。バージョン要件が満たされている場合、コネクタはデフォルトでこれを有効にします。

source.binlog.compression.enabled

バイナリログデータを読み取る際に、転送中のデータ圧縮を有効にするかどうかを指定します。

Boolean

いいえ

なし

バイナリログを消費する際、サーバーは LZ4 アルゴリズムで圧縮されたバイトストリームを返します。これにより、読み取りパフォーマンスが向上し、帯域幅が節約されます。

説明

このパラメーターは VVR 11.3 以降および Hologres V3.2 以降でのみサポートされます。通常、このパラメーターを構成する必要はありません。バージョン要件が満たされている場合、コネクタはデフォルトでこれを有効にします。

source.binlog.partition-binlog-mode

パーティションテーブルのバイナリログの消費モード。

Enum

いいえ

DISABLE

  • DISABLE (デフォルト):ソーステーブルはパーティションテーブルではありません。指定された Hologres テーブルがパーティションテーブルの場合、例外がスローされます。

  • DYNAMIC:パーティションテーブルの最新のパーティションを継続的に消費します。パーティションテーブルに対して動的パーティション機能を有効にする必要があります。DYNAMIC モードでは、パーティションは古いものから新しいものへと時系列で消費されます。最新のパーティションの前のパーティションのデータが消費されているときに、新しい時間単位が到着すると、コネクタは最新のパーティションのデータの消費を開始します。

  • STATIC:パーティションテーブルの固定パーティションを消費します。複数のパーティションを同時に消費できます。消費中にパーティションを追加または削除することはできません。デフォルトでは、この親テーブルのすべてのパーティションが消費されます。

source.binlog.partition-binlog-lateness-timeout-minutes

DYNAMIC モードでパーティションテーブルを消費する際の最大遅延タイムアウト。

Boolean

いいえ

60

  • 単位は分です。DYNAMIC モードでは、新しい時間単位が到着すると、現在の時刻に対応する最新のパーティションの消費が開始されます。ただし、前のパーティションはすぐには閉じられません。前のパーティションからの遅延データを読み取れるように、継続的に監視されます。

たとえば、テーブルが日単位でパーティション分割されており、パーティションが 20240920 で、最大データ遅延が 1 時間の場合、このパーティションの消費は 2024-09-21 00:00:00 ではなく、2024-09-21 01:00:00 に終了します。

  • 遅延タイムアウト時間は、パーティションの時間単位を超えることはできません。

テーブルが日単位でパーティション分割されている場合、最大値は 24 × 60 = 1440 (分) です。DYNAMIC モードでは、ほとんどの場合、1 つのテーブルのみが消費されます。遅延期間中、2 つのパーティションが同時に消費される場合があります。

source.binlog.partition-values-to-read

STATIC モードで、消費するパーティションを指定します。パーティション値はカンマ (,) で区切ります。

String

いいえ

なし

  • このパラメーターが構成されていない場合、STATIC モードは指定された親テーブルのすべてのパーティションを消費します。指定されている場合は、指定されたパーティションのみが消費されます。

  • このパラメーターには、完全なパーティション名ではなく、パーティション値のみが必要です。複数のパーティション値はカンマ (,) で区切ります。正規表現はサポートされていません。

startTime

開始オフセット時間。

String

いいえ

なし

フォーマットは yyyy-MM-dd hh:mm:ss です。このパラメーターが設定されておらず、ジョブが状態から回復しない場合、Hologres データの消費は最も古いバイナリログから開始されます。

source.scan.fetch-size

バッチ読み取りのバッチサイズ。

Integer

いいえ

512

なし。

source.scan.timeout-seconds

バッチ読み取りのタイムアウト期間。

Integer

いいえ

60

単位は秒です。

source.scan.filter-push-down.enabled

バッチ読み取り中にフィルターをプッシュダウンするかどうかを指定します。

Boolean

いいえ

false

  • false (デフォルト):フィルターをプッシュダウンしません。

  • true:バッチ読み取り中にサポートされているフィルター条件を Hologres にプッシュダウンします。

説明
  • このパラメーターと source.binlog.filter-push-down.enabled パラメーターを同時に有効にしないでください。

  • このパラメーターは 2 つのケースで有効になります:

    • source.binlog が false に設定されている場合、これはバッチ読み取りを示し、フィルタープッシュダウンが有効になります。

    • source.binlog が true に設定され、source.binlog.startup-modeINITIAL に設定されている場合、これは完全および増分読み取りを示し、フィルタープッシュダウンは完全データ読み取りフェーズで有効になります。

source.binlog.filter-push-down.enabled

バイナリログ消費中にフィルターをプッシュダウンするかどうかを指定します。

Boolean

いいえ

false

  • false (デフォルト):フィルターをプッシュダウンしません。

  • true:バイナリログ消費中にサポートされているフィルター条件を Hologres にプッシュダウンします。

説明
  • このパラメーターは VVR 11.3 以降および Hologres V4.0 以降でのみサポートされます。このパラメーターと source.scan.filter-push-down.enabled パラメーターを同時に有効にしないでください。

  • source.binlog が true に設定されている場合、フィルタープッシュダウンは常に有効です。たとえば、source.binlog.startup-modeINITIAL に設定されている場合、フィルタープッシュダウンは完全フェーズと増分フェーズの両方で有効になります。

scan.prefer.physical-column.over.metadata-column

物理列がメタデータ列と同じ名前を持つ場合に、物理列からのデータ読み取りを優先するかどうかを指定します。

Boolean

いいえ

false

このパラメーターは VVR 11.5 以降でのみサポートされます。以前のバージョンでは、常にメタデータ列からの読み取りが優先されます。

結果テーブル固有のパラメーター

パラメーター

説明

データ型

必須

デフォルト値

備考

sink.write-mode

書き込みモード。

ENUM

いいえ

INSERT

  • INSERT:デフォルト値。JDBC を使用して INSERT 文でデータを書き込みます。

  • COPY_STREAM:ストリーミング固定コピーメソッドを使用して書き込みます。固定コピーは、高スループットかつ低レイテンシーのシナリオに適した高性能なストリーミング書き込みメソッドです。ただし、このモードはデータの削除、親パーティションテーブルへの書き込み、または ignoreNullWhenUpdate パラメーターをサポートしていません。

  • COPY_BULK_LOAD:COPY プロトコルを使用してバッチ書き込みを行います。COPY_BULK_LOAD は現在、プライマリキーのないテーブルにのみ適しています。プライマリキーの競合は例外をスローします。COPY_STREAM と比較して、このモードは使用する Hologres リソースが少なくなります。

  • COPY_BULK_LOAD_ON_CONFLICT:COPY プロトコルを使用してバッチ書き込みを行い、プライマリキーを持つテーブルへの書き込みとプライマリキーの競合処理をサポートします。

説明
  • COPY_BULK_LOAD_ON_CONFLICT モードは、VVR 11.3 以降でのみサポートされ、Hologres インスタンスバージョン 3.1 以降が必要です。動作原理は、Flink が Hologres 結果テーブルの DistributionKey に基づいてデータを再シャッフルすることです。これにより、同じシャードのデータが同じ Flink タスクによって書き込まれるようになり、バッチインポートのテーブルロックがシャードレベルの粒度に縮小され、異なるシャードへの同時書き込みが可能になります。したがって、ジョブの並列度を Hologres 結果テーブルのシャード数と一致させることを推奨します。

  • COPY_BULK_LOAD または COPY_BULK_LOAD_ON_CONFLICT モードで書き込む場合、データはチェックポイントが成功した後に表示されます。これらのモードは、高いデータ可視性を必要としないシナリオや、既存データのバッチインポートに適しています。

sink.on-conflict-action

プライマリキーの競合を処理するためのポリシー。

ENUM

いいえ

INSERT_OR_UPDATE

  • INSERT_OR_IGNORE:データの最初の発生を保持し、後続のすべてのデータを無視します。

  • INSERT_OR_REPLACE:既存の行を新しいデータで置き換えます。

  • INSERT_OR_UPDATE (デフォルト):既存のデータの一部の列を更新します。

    たとえば、a、b、c、d の 4 つのフィールドを持つテーブルで、a がプライマリキーの場合、結果テーブルが a と b のフィールドのみを提供する場合、システムはプライマリキーの競合が発生したときにフィールド b のみを更新します。フィールド c と d は変更されません。

sink.create-missing-partition

パーティションテーブルへの書き込み時にパーティションが存在しない場合、パーティション値に基づいてパーティションを自動的に作成するかどうかを指定します。

Boolean

いいえ

false

  • DATE 型のフィールドをパーティションキーとして使用し、動的パーティションが有効になっている場合、自動的に作成されるパーティションテーブルの名前形式は、デフォルトで動的パーティションの形式と一致します。

  • パーティション値にダーティデータが含まれていないことを確認してください。そうしないと、不正なパーティションテーブルが作成され、フェールオーバーが発生します。このパラメーターは注意して使用してください。

  • sink.write-mode が INSERT でない場合、親パーティションテーブルへの書き込みはサポートされていません。

sink.delete-strategy

リトラクションメッセージの処理ポリシー。

String

いいえ

CHANGELOG_STANDARD

  • IGNORE_DELETE:Update Before および Delete メッセージを無視します。これは、データの挿入または更新のみが必要で、データの削除は不要なシナリオに適しています。

  • NON_PK_FIELD_TO_NULL:Update Before メッセージを無視し、非プライマリキーフィールドを NULL に更新することで Delete メッセージを処理します。これは、他の列に影響を与えずに部分更新中に削除を実行したいシナリオに適しています。

  • DELETE_ROW_ON_PK:Update Before メッセージを無視し、プライマリキーに基づいて行全体を削除することで Delete メッセージを処理します。これは、部分更新中に他の列に影響を与える行全体を削除したいシナリオに適しています。

  • CHANGELOG_STANDARD:Flink フレームワークは Flink SQL の変更ログの原則に従って動作します。削除操作を無視せず、データの正確性を確保するために、まずデータを削除してから挿入することで更新を実行します。これは、部分更新を伴わないシナリオに適しています

説明

NON_PK_FIELD_TO_NULL オプションを有効にすると、プライマリキーのみを含み、他のすべての列が null であるレコードが生成される可能性があります。

sink.ignore-null-when-update.enabled

sink.on-conflict-action='INSERT_OR_UPDATE' の場合、更新のために書き込まれるデータ内の null 値を無視するかどうかを指定します。

Boolean

いいえ

false

  • false (デフォルト):Hologres 結果テーブルに null 値を書き込みます。

  • true:更新のために書き込まれるデータ内の null 値を無視します。

説明

このパラメーターは、sink.write-modeINSERT に設定されている場合にのみサポートされます。

sink.ignore-null-when-update-by-expr.enabled

sink.on-conflict-action='INSERT_OR_UPDATE' の場合、式を使用して更新のために書き込まれるデータ内の null 値を無視するかどうかを指定します。

Boolean

いいえ

false

sink.ignore-null-when-update.enabled よりも優れたパフォーマンスを提供します。

  • false (デフォルト):

    • sink.ignore-null-when-update.enabled が有効な場合、更新されたデータ内の null 値は無視されます。

    • sink.ignore-null-when-update.enabled が無効な場合、null 値は Hologres 結果テーブルに書き込まれます。

  • true:sink.ignore-null-when-update.enabled が有効かどうかにかかわらず、更新されたデータ内の null 値は無視されます。

説明
  • このパラメーターは、sink.write-modeINSERT に設定されている場合にのみサポートされます。

  • Hologres V4.0 以降が必要です。

sink.default-for-not-null-column.enabled

Hologres テーブルのデフォルト値のない NOT NULL 列に null 値が書き込まれた場合に、コネクタがデフォルト値を入力することを許可するかどうかを指定します。

Boolean

いいえ

true

  • true (デフォルト):コネクタが以下のルールに基づいてデフォルト値を入力して書き込むことを許可します。

    • フィールドが String 型の場合、デフォルトで空の文字列 ("") が書き込まれます。

    • フィールドが Number 型の場合、デフォルトで 0 が書き込まれます。

    • フィールドが Date、timestamp、または timestamptz 型の場合、デフォルトで 1970-01-01 00:00:00 が書き込まれます。

  • false:デフォルト値を入力しません。NOT NULL フィールドに null 値を書き込むと例外がスローされます。

説明

このパラメーターは、sink.write-modeINSERT に設定され、sink.on-conflict-actionINSERT_OR_UPDATE 以外のオプションに設定されている場合にのみサポートされます。

sink.remove-u0000-in-text.enabled

書き込み中に文字列型に無効な文字 \u0000 が含まれている場合に、コネクタがそれを削除することを許可するかどうかを指定します。

Boolean

いいえ

true

  • false:コネクタはデータを操作しませんが、ダーティデータに遭遇すると書き込み時に次の例外がスローされる可能性があります:ERROR: invalid byte sequence for encoding "UTF8": 0x00

    この場合、ソーステーブルのダーティデータを事前に処理するか、SQL でダーティデータ処理ロジックを定義します。

  • true (デフォルト):コネクタは書き込み例外を防ぐために、文字列型から \u0000 を削除します。

sink.partial-insert.enabled

INSERT 文で定義されたフィールドのみを挿入するかどうかを指定します。

Boolean

いいえ

false

  • false (デフォルト):INSERT 文でどのフィールドが宣言されているかに関係なく、結果テーブルの DDL で定義されているすべてのフィールドが更新されます。INSERT 文で宣言されていないフィールドは null に更新されます。

  • true:INSERT 文で定義されたフィールドをコネクタにプッシュダウンし、宣言されたフィールドのみが更新または挿入されるようにします。

説明
  • このパラメーターは、sink.on-conflict-action パラメーターが INSERT_OR_UPDATE に設定されている場合にのみ有効です。

sink.deduplication.enabled

バッチ書き込み中に重複を削除するかどうかを指定します。

Boolean

いいえ

true

  • true (デフォルト):データのバッチに同じプライマリキーを持つレコードが含まれている場合、デフォルトで重複が削除され、最後に到着したレコードのみが保持されます。たとえば、最初のフィールドがプライマリキーである 2 つのフィールドを持つデータを考えます:

    • レコード INSERT (1,'a')INSERT (1,'b') が順番に到着した場合、最後の (1,'b') のみが保持され、重複排除後に Hologres 結果テーブルに書き込まれます。

    • レコード (1,'a') が Hologres 結果テーブルに既に存在し、レコード DELETE (1,'a')INSERT (1,'b') が順番に到着した場合、最後の (1,'b') のみが保持され、Hologres に書き込まれます。これは、削除してから挿入するのではなく、直接更新として表示されます。

  • false:バッチ処理中に重複を削除しません。新しく到着したレコードが現在バッチ内にあるレコードと同じプライマリキーを持つ場合、既存のバッチが最初に書き込まれます。書き込みが完了した後、新しいレコードが書き込まれます。

説明
  • このパラメーターは、sink.write-modeINSERT に設定されている場合にのみサポートされます。

  • バッチ重複排除が許可されていない場合、すべてのデータが同じプライマリキーを持つなどの極端なケースでは、書き込みはバッチ処理なしの単一行書き込みに低下し、パフォーマンスに影響します。

sink.aggressive-flush.enabled

アグレッシブコミットモードを有効にするかどうかを指定します。

Boolean

いいえ

false

true に設定すると、バッチが期待されるサイズに達していなくても、アイドル時に接続が強制的にコミットされます。これにより、トラフィックが少ない場合にデータ書き込みのレイテンシーを効果的に削減できます。

説明

このパラメーターは、sink.write-modeINSERT または COPY_STREAM に設定されている場合にのみサポートされます。

sink.insert.check-and-put.column

条件付き更新を有効にし、チェックするフィールド名を指定します。

String

いいえ

なし

パラメーター値は、Hologres テーブルに存在するフィールド名に設定する必要があります。

重要
  • このパラメーターは、sink.write-modeINSERT に設定されている場合にのみサポートされます。

  • 結果テーブルにはプライマリキーが必要であり、sink.on-conflict-action パラメーターは INSERT_OR_UPDATE または INSERT_OR_REPLACE に設定する必要があります。

  • 逆引き参照が必要なため、結果テーブルを行指向テーブルまたはハイブリッド行列表として作成します。

  • 重複レコードの数が多い場合、check-and-put 操作は単一行書き込みに低下し、書き込みパフォーマンスが低下します。

sink.insert.check-and-put.operator

条件付き更新操作の比較演算子。

String

いいえ

GREATER

新しいレコードのチェックフィールドをテーブルの古い値と比較します。比較演算子の条件が満たされた場合に更新が実行されます。サポートされている値は、GREATER、GREATER_OR_EQUAL、EQUAL、NOT_EQUAL、LESS、LESS_OR_EQUAL、IS_NULL、および IS_NOT_NULL です。

sink.insert.check-and-put.null-as

条件付き更新中に、古いデータが null の場合、null 値はこのパラメーターで構成された有効な値として扱われます。

String

いいえ

なし

PostgreSQL では、NULL との比較結果はすべて FALSE になります。したがって、テーブル内の元のデータが NULL の場合、更新操作のために NULL-AS パラメーターを設定する必要があります。これは SQL の COALESCE 関数に相当します。

sink.insert.batch-size

INSERT モードで、Hologres シンクへの書き込み前にバッファーに格納するレコードの最大数。

Integer

いいえ

512

sink.insert.batch-sizesink.insert.batch-byte-size、および sink.insert.flush-interval-ms パラメーターは論理 OR で関連付けられています。これら 3 つのパラメーターを設定した場合、いずれかの条件が満たされるとデータが書き込まれます。

sink.insert.batch-byte-size

INSERT モードで、Hologres 結果テーブルに書き込む前にバッファリングするレコードの最大サイズ (バイト単位)。

Long

いいえ

2 × 1024 × 1024 バイト、つまり 2 MB

sink.insert.flush-interval-ms

`INSERT` モードでは、Hologres シンクから Hologres にバッファーされたデータが書き込まれるまでの最大待機時間です。

Long

いいえ

10000

sink.copy.format

COPY モードで使用される転送フォーマット。

String

いいえ

  • COPY_STREAM モードのデフォルトは binary です。

  • COPY_BULK_LOAD または COPY_BULK_LOAD_ON_CONFLICT のデフォルトは text です。

COPY_STREAM モードは以下をサポートします:

  • binary

  • text

  • binaryrow (Hologres エンジンバージョン >= 4.1.0)

COPY_BULK_LOAD または COPY_BULK_LOAD_ON_CONFLICT は text のみをサポートします。

説明

このパラメーターは、sink.write-modeCOPY_STREAMCOPY_BULK_LOAD、または COPY_BULK_LOAD_ON_CONFLICT に設定されている場合にのみサポートされます。

sink.insert.conflict-update-set

プライマリキーの競合時の更新に関する Hologres の式。

String

いいえ

なし

これは `insert into tbl values(xxx) on conflict(pk) do update set <conflict-update-set>` 文に相当します。Hologres の式または関数を指定できます。

たとえば、このパラメーターが col1=old.col1+excluded.col1,col2=excluded.col2 に設定されている場合、プライマリキーの競合時に col1 の値が古い値と新しい値の合計に更新され、col2 が新しい値に更新されることを意味します。

  • このパラメーターが指定されていない場合、すべての入力フィールドはデフォルトで新しい値に更新されます。

  • 更新式がステートフルな場合、たとえば col=old.col+excluded.col のように結果が古い値に依存する場合、フィールドが行バージョン番号として使用できることを確認してください。次に、sink.insert.conflict-where を excluded.seq>old.seq に設定して、フェールオーバーと回復後のデータの正確性を確保します。

説明

このパラメーターは、sink.write-modeINSERT に設定されている場合にのみサポートされます。

sink.insert.conflict-where

プライマリキーの競合時に更新をトリガーする Hologres のフィルター条件。

String

いいえ

なし

これは `insert into tbl values(xxx) on conflict(pk) do update set <conflict-update-set> where <conflict-where>` に相当します。Hologres の式または関数を指定できます。

たとえば、このパラメーターが excluded.col1>old.col1 に設定されている場合、プライマリキーの競合時に、col1 の新しい値が古い値より大きい場合にのみ更新がトリガーされることを意味します。

説明
  • このパラメーターは、sink.write-modeINSERT に設定されている場合にのみサポートされます。

  • このパラメーターは sink.insert.check-and-put* パラメーターと競合します。同時に構成するとエラーが発生します。

ディメンションテーブル固有のパラメーター

パラメーター

説明

データ型

必須

デフォルト値

備考

lookup.read.batch-size

Hologres ディメンションテーブルに対するポイントクエリ中にバッチ処理のためにバッファリングするレコードの最大数。

Integer

いいえ

256

なし。

lookup.read.timeout-ms

ディメンションテーブルのポイントクエリのタイムアウト期間。

Long

いいえ

デフォルト値は 0 で、タイムアウトがないことを意味します。

なし。

lookup.read.column-table.enabled

列指向テーブルをディメンションテーブルとして使用するかどうかを指定します。

Boolean

いいえ

false

列指向テーブルをディメンションテーブルとして使用すると、パフォーマンスが低下します。代わりに行指向テーブルまたはハイブリッド行列表を使用してください。このパラメーターが有効で、列指向テーブルが使用されている場合、警告がログに記録されます。

lookup.insert-if-not-exists

存在しないデータを挿入するかどうかを指定します。

Boolean

いいえ

false

ポイントクエリで現在のデータがディメンションテーブルに存在しないことが判明した場合、現在のデータが挿入されます。

cache

キャッシュポリシー。

String

いいえ

なし

Hologres は None および LRU キャッシュポリシーのみをサポートします。

cacheSize

キャッシュサイズ。

Integer

いいえ

10000

LRU キャッシュポリシーを選択した後、キャッシュサイズを設定できます。単位は行です。

cacheTTLMs

キャッシュの更新間隔。

Long

いいえ

備考をご参照ください。

単位はミリ秒です。cacheTTLMs のデフォルト値はキャッシュ構成によって異なります:

  • cache が LRU に設定されている場合、cacheTTLMs はキャッシュのタイムアウト期間です。デフォルトでは、キャッシュは有効期限切れになりません。

  • cache が None に設定されている場合、cacheTTLMs を構成する必要はありません。これは、キャッシュがタイムアウトしないことを意味します。

cacheEmpty

結合結果が空のデータをキャッシュするかどうかを指定します。

Boolean

いいえ

true

  • true (デフォルト):結合結果が空のデータをキャッシュします。

  • false:結合結果が空のデータをキャッシュしません。

    ただし、結合文の AND の前の条件が満たされても AND の後の条件が満たされない場合、結合結果が空のデータは引き続きキャッシュされます。以下のコードに例を示します。

    LEFT JOIN latest_emergency FOR SYSTEM_TIME AS OF PROCTIME() AS t2
     ON t1.alarm_id = t2.alarm_id -- If a dynamic alert is detected, match the dynamic alert ID. Otherwise, ignore the dynamic_alarm_id field.
     AND CASE
     WHEN alarm_type = 2 THEN t1.dynamic_id = t2.dynamic_alarm_id
     ELSE true
     END
重要

ビジネスシナリオに基づいて、このスイッチを有効にするかどうかを決定します。ジョブの実行中にディメンションテーブルに新しく挿入されたレコードと結合したい場合は、このオプションを無効にするか、cacheTTLMs を短い間隔に設定します。これにより、null の結果がキャッシュされるのを防ぎ、後続のディメンションテーブルの結合が失敗するのを防ぐことができます。

async

データを非同期で返すかどうかを指定します。

Boolean

いいえ

false

  • true:データを非同期で返します。

  • false (デフォルト):データを非同期で返しません。

説明

非同期で返されたデータは順序付けされていません。

lookup.filter-push-down.enabled

ディメンションテーブルのフィルター条件を Hologres サーバーにプッシュダウンするかどうかを指定します。

Boolean

いいえ

false

現在、プッシュダウン操作は、等価演算子と比較演算子 (<、<=、>、>= など) を使用する列と定数の間の比較操作に対してのみ実行されます。

説明

このパラメーターは VVR 11.4 以降でのみ構成できます。