すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Postgres CDC (パブリックプレビュー)

最終更新日:Jan 22, 2026

Postgres CDC コネクタは、PostgreSQL データベースの完全なスナップショットを読み取った後、後続の変更データをキャプチャします。このプロセスにより、各レコードが 1 回だけ処理されることが保証されます。このトピックでは、Postgres CDC コネクタの使用方法について説明します。

背景情報

Postgres CDC コネクタは、以下をサポートしています。

カテゴリ

詳細

サポートされるタイプ

ソーステーブル

説明

結果テーブルとディメンションテーブルには JDBC コネクタを使用してください。

モード

ストリームモードのみ

データフォーマット

該当なし

固有のメトリック

固有のモニタリングメトリック

  • currentFetchEventTimeLag:データが生成されてから Source オペレーターによってプルされるまでの間隔。

  • currentEmitEventTimeLag:データが生成されてから Source オペレーターを出るまでの間隔。

  • sourceIdleTime:ソースが新しいデータを生成していない期間。

説明
  • currentFetchEventTimeLag と currentEmitEventTimeLag メトリックは、増分フェーズでのみ有効です。完全フェーズでは、これらの値は常に 0 です。

  • メトリックの詳細については、「メトリックの説明」をご参照ください。

API タイプ

SQL およびデータインジェスト YAML

結果テーブルデータの更新または削除のサポート

該当なし

特徴

Postgres CDC コネクタは、Ververica Runtime (VVR) 8.0.6 以降で利用可能な、変更データキャプチャ (CDC) のための増分スナップショットフレームワークと統合されています。完全な既存データを読み取った後、コネクタは自動的に先行書き込みログ (WAL) からの変更ログの読み取りに切り替わります。このプロセスは、1回限りのセマンティクスを保証します。Postgres CDC ソーステーブルは、完全データの同時かつロックフリーな読み取りをサポートし、ブレークポイントからの再開が可能です。

ソーステーブルとしての特徴と利点は次のとおりです:

  • ストリーム処理とバッチ処理の統合。コネクタは完全データと増分データの両方の読み取りをサポートしているため、2つの別々のデータ処理ジョブを維持する必要がありません。

  • 完全データの同時読み取り。この機能により、水平スケーリングが可能になり、パフォーマンスが向上します。

  • 完全読み取りから増分読み取りへのシームレスな切り替え。コネクタは自動的にスケールインし、計算リソースを節約します。

  • 再開可能な読み取り。コネクタは完全データ読み取りフェーズ中にブレークポイントから再開できるため、ジョブの安定性が向上します。

  • ロックフリーな読み取り。完全データの読み取りにはロックが不要なため、オンラインビジネス運用への影響を防ぎます。

前提条件

Postgres CDC コネクタは、PostgreSQL の論理レプリケーション機能を使用して、変更データキャプチャ (CDC) ストリームを読み取ります。このコネクタは、Alibaba Cloud RDS for PostgreSQLAmazon RDS for PostgreSQL、および自己管理 PostgreSQL をサポートしています。

重要

必要な構成は、Alibaba Cloud RDS for PostgreSQLAmazon RDS for PostgreSQL、および自己管理 PostgreSQL で異なる場合があります。詳細な構成手順については、「Postgres の構成」ドキュメントをご参照ください。

構成が完了したら、次の条件が満たされていることを確認してください:

  • wal_level パラメーターは `logical` に設定する必要があります。この設定により、論理デコーディングをサポートするために必要な情報が先行書き込みログ (WAL) に追加されます。

  • サブスクライブするテーブルの REPLICA IDENTITY は `FULL` に設定する必要があります。この設定により、`INSERT` および `UPDATE` イベントにテーブルのすべての列の以前の値が含まれるようになり、データ整合性が保証されます。

    説明

    `REPLICA IDENTITY` は PostgreSQL 固有のテーブルレベルの設定です。これにより、`INSERT` および `UPDATE` イベントに影響を受ける列の以前の値が含まれるかどうかが決まります。`REPLICA IDENTITY` の値の詳細については、「REPLICA IDENTITY」をご参照ください。

  • `max_wal_senders` および `max_replication_slots` パラメーターの値が、現在使用中のレプリケーションスロットの合計数と Flink ジョブで必要なスロット数の合計よりも大きいことを確認してください。

  • アカウントに `SUPERUSER` 権限、または `LOGIN` と `REPLICATION` の両方の権限があることを確認してください。また、アカウントには、完全データをクエリするために、サブスクライブするテーブルに対する `SELECT` 権限も必要です。

注意事項

Realtime Compute for Apache Flink V8.0.6 以降のみが、Postgres CDC の増分スナップショット機能をサポートしています。

レプリケーションスロット

Flink PostgreSQL CDC ジョブはレプリケーションスロットを使用して、先行書き込みログ (WAL) が早期に消去されるのを防ぎ、データ整合性を確保します。適切に管理されない場合、レプリケーションスロットはディスク領域の浪費データ読み取りの遅延などの問題を引き起こす可能性があります。以下のベストプラクティスに従うことを推奨します:

  • 未使用のスロットを速やかにクリーンアップする

    • Flink は、ジョブが停止した後でも、レプリケーションスロットを自動的に削除しません。これは特にステートレス再起動の場合に当てはまります。この動作は、WAL が消去された場合に発生する可能性のあるデータ損失を防ぎます。

    • ジョブが再起動されないことを確認した場合、関連するレプリケーションスロットを手動で削除してディスク領域を解放する必要があります。

      重要

      ライフサイクル管理:レプリケーションスロットをジョブリソースとして扱い、ジョブの開始と停止と同期して管理してください。

  • 古いスロットの再利用を避ける

    • 新しいジョブは、古いスロットを再利用するのではなく、新しいスロット名を使用する必要があります。スロットを再利用すると、ジョブが起動時に大量の履歴 WAL データを読み取ることになり、最新データの処理が遅れる可能性があります。

    • PostgreSQL の論理レプリケーションでは、1つのスロットは1つの接続でのみ使用できます。異なるジョブは異なるスロット名を使用する必要があります。

      重要

      命名規則:`slot.name` をカスタマイズする場合、一時スロットとの競合を避けるために、`my_slot_1` のような数値サフィックスを持つ名前の使用は避けてください。

  • 増分スナップショットが有効な場合のスロットの動作

    • 前提条件:チェックポイントが有効であり、ソーステーブルにプライマリキーが定義されている必要があります。

    • スロット作成ルール:

      • 増分スナップショットが無効:単一の同時実行のみがサポートされます。1つのグローバルスロットが使用されます。

      • 増分スナップショットが有効:

        • 完全フェーズ:各同時ソースサブタスクは一時スロットを作成します。命名フォーマットは ${slot.name}_${task_id} です。

        • 増分フェーズ:すべての一時スロットは自動的に回収されます。1 つのグローバルスロットのみが保持されます。

    • 最大スロット数:ソースの同時実行数 + 1 (完全フェーズ中)

  • リソースとパフォーマンス

    • PostgreSQL で利用可能なスロット数やディスク領域が限られている場合は、完全フェーズの同時実行数を減らして、使用する一時スロットを少なくすることができます。この操作により、完全フェーズ中の読み取り速度は低下します。

    • ダウンストリームシンクがべき等書き込みをサポートしている場合は、scan.incremental.snapshot.backfill.skip = true を設定できます。この設定は、完全フェーズ中の WAL バックフィルをスキップし、ジョブの起動を高速化します。

      この構成は at-least-once セマンティクスのみを提供します。集約やディメンションテーブルの結合など、ステートフルな計算を伴うジョブには適していません。なぜなら、中間状態に必要な履歴の変更が失われる可能性があるためです。

  • 増分スナップショットが無効な場合、全表スキャンフェーズ中にチェックポイントはサポートされません。

    完全同期フェーズ中のタイムアウトを回避するための構成

    増分スナップショットが無効な場合、全表スキャンフェーズ中にジョブがチェックポイントをトリガーすると、チェックポイントタイムアウトによりジョブがフェールオーバーする可能性があります。これを防ぐには、[その他の構成] で以下のパラメーターを構成します。詳細については、「ジョブのカスタム実行時パラメーターを構成する方法」をご参照ください。この構成により、完全同期フェーズ中のチェックポイントタイムアウトによるフェールオーバーを防ぎます。

    execution.checkpointing.interval: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 100
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 2147483647

    次の表にパラメーターを示します。

    パラメーター

    説明

    備考

    execution.checkpointing.interval

    チェックポイント間の間隔。

    単位は Duration 値です (例:10min、30s)。

    execution.checkpointing.tolerable-failed-checkpoints

    ジョブが失敗するまでに許容されるチェックポイントの失敗回数。

    このパラメーターとチェックポイントスケジューリング間隔の積が、許容されるスナップショット読み取り時間です。

    説明

    テーブルが非常に大きい場合は、このパラメーターをより大きな値に設定してください。

    restart-strategy

    ジョブの再起動戦略。

    有効値:

    • fixed-delay:固定遅延再起動戦略。

    • failure-rate:失敗率再起動戦略。

    • exponential-delay:指数遅延再起動戦略。

    詳細については、「再起動戦略」をご参照ください。

    restart-strategy.fixed-delay.attempts

    固定遅延再起動戦略の最大再試行回数。

    なし。

Postgres サブスクリプションの再利用

Postgres CDC コネクタは、どのテーブルの変更がスロットにプッシュされるかを決定するためにパブリケーションに依存しています。複数のジョブが同じパブリケーションを共有する場合、それらの構成は上書きされます。

原因

publication.autocreate.mode のデフォルト値は filtered であり、コネクタ構成で指定されたテーブルのみが含まれます。このモードは、ジョブが開始されるとパブリケーション内のテーブルを変更するため、他のジョブの読み取り操作に影響を与える可能性があります。

ソリューション

  1. PostgreSQL で、監視対象のすべてのテーブルを含むパブリケーションを手動で作成します。または、ジョブごとに個別のパブリケーションを作成します。

    -- すべてのテーブル (または指定されたテーブル、ジョブごとに1つのパブリケーションを作成) を含む my_flink_pub という名前のパブリケーションを作成します
    CREATE PUBLICATION my_flink_pub FOR TABLE table_a, table_b;
    -- または、より簡単に、データベース内のすべてのテーブルを含めます
    CREATE PUBLICATION my_flink_pub FOR ALL TABLES;
    説明

    データベース内のすべてのテーブルをサブスクライブすることは推奨されません。データベースが大きく、多くのテーブルが含まれている場合、ネットワーク帯域幅の浪費Flink クラスターの高い CPU 消費を引き起こす可能性があります。

  2. 以下の Flink 構成を追加します:

    • debezium.publication.name = 'my_flink_pub' (パブリケーション名を指定)

    • debezium.publication.autocreate.mode = 'disabled' (Flink が起動時にパブリケーションを作成または変更しようとするのを防ぐ)

このアプローチは完全な隔離を提供し、Flink に依存するのではなく手動でパブリケーションを管理できます。これにより、新しいジョブが既存のジョブに影響を与えるのを防ぎ、より安全なアクセス制御を提供します。

SQL

構文

CREATE TABLE postgrescdc_source (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>',
  'decoding.plugin.name'= 'pgoutput',
  'scan.incremental.snapshot.enabled' = 'true',
  -- バックフィルをスキップすると、読み取りが高速化され、リソース使用量が削減されますが、データが重複する可能性があります。ダウンストリームシンクがべき等である場合に有効にします。
  'scan.incremental.snapshot.backfill.skip' = 'false',
  -- 本番環境では、これを 'filtered' または 'disabled' に設定し、Flink を介さずにパブリケーションを手動で管理します。
  'debezium-publication.autocreate.mode' = 'disabled'
  -- 複数のソースがある場合は、ソースごとに異なるパブリケーションを構成します。
  --'debezium.publication.name' = 'my_flink_pub'
);

WITH パラメーター

パラメーター

説明

データ型

必須

デフォルト値

備考

connector

コネクタのタイプ。

STRING

はい

なし

値は postgres-cdc である必要があります。

hostname

PostgreSQL データベースの IP アドレスまたはホスト名。

STRING

はい

なし

なし。

username

PostgreSQL データベースサービスのユーザー名。

STRING

はい

なし

なし。

password

PostgreSQL データベースサービスのパスワード。

STRING

はい

なし

なし。

database-name

データベース名。

STRING

はい

なし

データベース名。

schema-name

PostgreSQL のスキーマ名。

STRING

はい

なし

スキーマ名は正規表現をサポートしており、複数のスキーマからデータを読み取ることができます。

table-name

PostgreSQL のテーブル名。

STRING

はい

なし

テーブル名は正規表現をサポートしており、複数のテーブルからデータを読み取ることができます。

port

PostgreSQL データベースサービスのポート番号。

INTEGER

いいえ

5432

なし。

decoding.plugin.name

PostgreSQL の論理デコーディングプラグインの名前。

STRING

いいえ

decoderbufs

これは、PostgreSQL サービスにインストールされているプラグインによって決まります。サポートされているプラグインは次のとおりです:

  • decoderbufs (デフォルト):PostgreSQL 9.6 以降でサポートされています。このプラグインをインストールする必要があります。

  • pgoutput (推奨):PostgreSQL 10 以降の公式組み込みプラグイン。

slot.name

論理デコーディングスロットの名前。

STRING

VVR 8.0.1 以降では必須。以前のバージョンではオプション。

VVR 8.0.1 以降ではなし。以前のバージョンではデフォルトは `flink` です。

PSQLException: ERROR: replication slot "debezium" is active for PID 974 エラーを避けるために、各テーブルに一意の slot.name を設定してください。詳細については、「レプリケーションスロット」をご参照ください。

debezium.*

Debezium のプロパティとパラメーター

STRING

いいえ

なし

Debezium クライアントの動作をより細かく制御します。例:'debezium.snapshot.mode' = 'never'。詳細については、「構成プロパティ」をご参照ください。

scan.incremental.snapshot.enabled

増分スナップショットを有効にするかどうかを指定します。

BOOLEAN

いいえ

false

有効値:

  • false (デフォルト):増分スナップショットを無効にします。

  • true:増分スナップショットを有効にします。

説明
  • これは実験的な機能です。このパラメーターは、リアルタイムコンピューティングエンジン V8.0.6 以降でのみサポートされています。

  • 増分スナップショットの利点、前提条件、および制限事項の詳細については、「特徴」、「前提条件」、および「注意事項」をご参照ください。

scan.startup.mode

データ消費の起動モード。

STRING

いいえ

initial

有効値:

  • initial (デフォルト):最初の起動時に完全な既存データをスキャンし、その後最新の WAL データを読み取ります。

  • latest-offset:最初の起動時に完全な既存データをスキャンしません。WAL の末尾から読み取りを開始します。つまり、コネクタの起動後に加えられた最新の変更のみを読み取ります。

  • snapshot:完全な既存データをスキャンし、完全フェーズ中に生成された新しい WAL データを読み取り、その後ジョブは停止します。

changelog-mode

ストリーム変更をエンコーディングするための変更ログモード。

String

いいえ

all

サポートされている変更ログモード:

  • ALL:INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER を含むすべてのタイプをサポートします。

  • UPSERT:INSERT、DELETE、UPDATE_AFTER を含む upsert タイプのみをサポートします。

heartbeat.interval.ms

ハートビートパケットを送信する間隔。

Duration

いいえ

30s

単位はミリ秒です。

Postgres CDC コネクタは、データベースに積極的にハートビートを送信してスロットのオフセットを進めます。テーブルの変更が頻繁でない場合、この値を設定することで WAL ログのタイムリーな回収が保証されます。

scan.incremental.snapshot.chunk.key-column

スナップショットフェーズ中にシャードを分割するためのチャンクキーとして使用する列を指定します。

STRING

いいえ

なし

デフォルトでは、プライマリキーの最初の列が選択されます。

scan.incremental.close-idle-reader.enabled

スナップショットが完了した後にアイドル状態のリーダーを閉じるかどうかを指定します。

Boolean

いいえ

false

この構成を有効にするには、execution.checkpointing.checkpoints-after-tasks-finish.enabled を true に設定します。

scan.incremental.snapshot.backfill.skip

完全フェーズ中にログの読み取りをスキップするかどうかを指定します。

Boolean

いいえ

false

有効値:

  • true:操作はスキップされます。

    増分フェーズは、低ウォーターマークからログの読み取りを開始します。

    ダウンストリームのオペレーターまたはストレージがべき等性をサポートしている場合、完全フェーズのログ読み取りをスキップすることを推奨します。これにより WAL スロットの数が減りますが、at-least-once セマンティクスのみが提供されます。

  • false:スキップしません。

    完全フェーズ中に、低ウォーターマークと高ウォーターマークの間のログが読み取られ、整合性が確保されます。

    SQL に集約や結合などの操作が含まれている場合は、完全フェーズのログ読み取りをスキップしないでください。

型マッピング

以下の表は、PostgreSQL と Flink のフィールド型のマッピングを示しています。

PostgreSQL フィールド型

Flink フィールド型

SMALLINT

SMALLINT

INT2

SMALLSERIAL

SERIAL2

INTEGER

INT

SERIAL

BIGINT

BIGINT

BIGSERIAL

REAL

FLOAT

FLOAT4

FLOAT8

DOUBLE

DOUBLE PRECISION

NUMERIC(p, s)

DECIMAL(p, s)

DECIMAL(p, s)

BOOLEAN

BOOLEAN

DATE

DATE

TIME [(p)] [WITHOUT TIMEZONE]

TIME [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

CHAR(n)

STRING

CHARACTER(n)

VARCHAR(n)

CHARACTER VARYING(n)

TEXT

BYTEA

BYTES

CREATE TABLE source (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = '<host name>',
  'port' = '<port>',
  'username' = '<user name>',
  'password' = '<password>',
  'database-name' = '<database name>',
  'schema-name' = '<schema name>',
  'table-name' = '<table name>'
);

SELECT * FROM source;

データインジェスト

Realtime Compute for Apache Flink V11.4 以降、データ取り込み YAML ジョブで PostgreSQL コネクタをデータソースとして使用できます。

構文

source:
  type: postgres
  name: PostgreSQL Source
  hostname: localhost
  port: 5432
  username: pg_username
  password: pg_password
  tables: db.scm.tbl
  slot.name: test_slot
  scan.startup.mode: initial
  server-time-zone: UTC
  connect.timeout: 120s
  decoding.plugin.name: decoderbufs

sink:
  type: ...

パラメーター

パラメーター

説明

必須

データ型

デフォルト

備考

type

データソースのタイプ。

はい

STRING

なし

値は `postgres` である必要があります。

name

データソース名。

いいえ

STRING

なし

なし。

hostname

PostgreSQL データベースサーバーのドメイン名または IP アドレス。

はい

STRING

(なし)

なし。

port

PostgreSQL データベースサーバーが公開するポート。

いいえ

INTEGER

5432

なし。

username

PostgreSQL のユーザー名。

はい

STRING

(なし)

なし。

password

PostgreSQL のパスワード。

はい

STRING

(なし)

なし。

tables

キャプチャする PostgreSQL データベーステーブルの名前。

正規表現をサポートしており、式に一致する複数のテーブルを監視できます。

はい

文字列

(なし)

重要

現在、同じデータベース内のテーブルのみをキャプチャできます。

ピリオド (.) は、データベース、スキーマ、およびテーブル名の区切り文字として扱われます。正規表現でピリオド (.) を任意の文字に一致させるために使用するには、バックスラッシュでエスケープする必要があります。例:bdb.schema_\.*.order_\.*

slot.name

PostgreSQL レプリケーションスロットの名前。

はい

STRING

(なし)

名前は PostgreSQL レプリケーションスロットの命名規則に準拠し、小文字、数字、およびアンダースコアを含めることができます。

decoding.plugin.name

サーバーにインストールされている PostgreSQL 論理デコーディングプラグインの名前。

いいえ

STRING

pgoutput

オプションの値には decoderbufspgoutput が含まれます。

tables.exclude

除外する PostgreSQL データベーステーブルの名前。このパラメーターは `tables` パラメーターの後に有効になります。

いいえ

STRING

(なし)

テーブル名も正規表現をサポートしており、式に一致する複数のテーブルを除外できます。使用方法は `tables` パラメーターと同じです。

server-time-zone

データベースサーバーのセッションタイムゾーン (例:「Asia/Shanghai」)。

いいえ

STRING

(なし)

設定されていない場合、システムのデフォルトタイムゾーン (ZoneId.systemDefault()) が使用されます。

scan.incremental.snapshot.chunk.size

増分スナップショットフレームワークにおける各チャンクのサイズ (行数)。

いいえ

INTEGER

8096

増分スナップショット読み取りが有効な場合、テーブルは複数のチャンクに分割されて読み取られます。チャンクのデータは、完全に読み取られるまでメモリにキャッシュされます。

チャンクあたりの行数が少ないと、テーブルのチャンク総数が多くなります。これにより、エラー回復の粒度は小さくなりますが、メモリ不足 (OOM) エラーや全体的なスループットの低下につながる可能性があります。したがって、バランスを見つけて適切なチャンクサイズを設定する必要があります。

scan.snapshot.fetch.size

テーブルの完全データを読み取る際に一度にフェッチする最大レコード数。

いいえ

INTEGER

1024

なし。

scan.startup.mode

データ消費の起動モード。

いいえ

STRING

initial

有効値:

  • initial (デフォルト):最初の起動時に完全な既存データをスキャンし、その後最新のバイナリログデータを読み取ります。

  • latest-offset:最初の起動時に完全な既存データをスキャンしません。バイナリログの末尾から読み取りを開始します。つまり、コネクタの起動後に加えられた最新の変更のみを読み取ります。

  • committed-offset:完全な既存データをスキャンしません。指定されたオフセットから増分データを消費します。

  • snapshot:完全な既存データのみを消費し、増分データは消費しません。

scan.incremental.close-idle-reader.enabled

スナップショットが完了した後にアイドル状態のリーダーを閉じるかどうかを指定します。

いいえ

BOOLEAN

false

この構成を有効にするには、`execution.checkpointing.checkpoints-after-tasks-finish.enabled` を `true` に設定します。

scan.lsn-commit.checkpoints-num-delay

LSN オフセットのコミットを開始する前に遅延させるチェックポイントの数。

いいえ

INTEGER

3

チェックポイントの LSN オフセットは、状態から回復できなくなるのを防ぐために、ローリング方式でコミットされます。

connect.timeout

コネクタが PostgreSQL データベースサーバーへの接続を試行してからタイムアウトするまでの最大待機時間。

いいえ

DURATION

30s

この値は 250 ミリ秒未満にすることはできません。

connect.max-retries

コネクタが PostgreSQL データベースサーバーへの接続を確立しようとする最大回数。

いいえ

INTEGER

3

なし。

connection.pool.size

接続プールのサイズ。

いいえ

INTEGER

20

なし。

jdbc.properties.*

ユーザーがカスタム JDBC URL プロパティを渡すことを許可します。

いいえ

STRING

20

ユーザーは、'jdbc.properties.useSSL' = 'false' のようなカスタムプロパティを渡すことができます。

heartbeat.interval

最新の利用可能な WAL ログオフセットを追跡するためにハートビートイベントを送信する間隔。

いいえ

DURATION

30s

なし。

debezium.*

PostgreSQL サーバーからのデータ変更をキャプチャするために使用される Debezium Embedded Engine に Debezium プロパティを渡します。

いいえ

STRING

(なし)

Debezium PostgreSQL コネクタのプロパティの詳細については、「関連ドキュメント」をご参照ください。

chunk-meta.group.size

チャンクメタデータのサイズ。

いいえ

STRING

1000

メタデータがこの値より大きい場合、複数の部分に分けて渡されます。

metadata.list

ダウンストリームに渡される読み取り可能なメタデータのリスト。変換モジュールで使用できます。

いいえ

STRING

false

区切り文字としてカンマ (,) を使用します。現在、利用可能なメタデータは op_ts です。

scan.incremental.snapshot.unbounded-chunk-first.enabled

スナップショット読み取りフェーズ中に、境界のないチャンクを最初にディスパッチするかどうかを指定します。

いいえ

STRING

false

有効値:

  • true:スナップショット読み取りフェーズ中に、境界のないチャンクが最初にディスパッチされます。

  • false (デフォルト):スナップショット読み取りフェーズ中に、境界のないチャンクは最初にディスパッチされません。

重要

これは実験的な機能です。有効にすると、スナップショットフェーズ中に TaskManager が最後のチャンクを同期する際のメモリ不足 (OOM) エラーのリスクを軽減できます。ジョブの最初の起動前に追加することを推奨します。

関連ドキュメント

  • Realtime Compute for Apache Flink でサポートされているコネクタのリストについては、「サポートされているコネクタ」をご参照ください。

  • PolarDB for PostgreSQL (Oracle Compatible) の結果テーブルにデータを書き込むには、「PolarDB for PostgreSQL (Oracle Compatible)」をご参照ください。

  • RDS for MySQL、PolarDB for MySQL、または自己管理 MySQL データベースからの読み取りまたは書き込みを行うには、MySQL コネクタを使用します。