PolarDB-X コネクタの使用方法について説明します。
背景情報
PolarDB for Xscale (PolarDB-X) は、Alibaba Cloud のパフォーマンス専有型クラウドネイティブ分散データベースであり、高スループット、大容量ストレージ、低レイテンシー、スケーラビリティ、超高可用性を提供します。
このコネクタには、VVR 11.5 以降および PolarDB-X 2.0 以降が必要です。
PolarDB-X CDC コネクタはソーステーブルのみをサポートします。PolarDB-X インスタンスをディメンションテーブルとしてクエリしたり、結果テーブルとして書き込んだりするには、MySQL コネクタ (パブリックプレビュー) を使用してください。
|
カテゴリ |
説明 |
|
サポートタイプ |
ソーステーブル、データインジェストソース |
|
実行モード |
ストリーミングモードのみ |
|
データフォーマット |
N/A |
|
特定の監視メトリクス |
|
|
API タイプ |
SQL、データインジェスト YAML ジョブ |
|
結果テーブルのデータ更新または削除のサポート |
いいえ |
主な特徴
PolarDB-X CDC コネクタは、サーバーサイドフィルタリングによって Binlog 解析を最適化します。PolarDB-X サーバー上で無関係な Binlog データをトリミングすることで、スループットを向上させ、ネットワーク帯域幅を節約します。
オンデマンド Binlog サブスクリプション
サーバーサイドの Binlog フィルタリングにより、コネクタは必要な変更ログのみをクライアントに送信します。これにより、ネットワークトラフィックが削減され、データ消費のスループットが向上します。
例えば、ご利用の PolarDB-X サーバー内の db.table1 テーブルと db.table2 テーブルからの変更のみをサブスクライブするには、Flink SQL ジョブを次のように設定します:
CREATE TABLE polardbx_table_foo (
... -- ここでテーブルスキーマを定義します
) WITH (
'connector' = 'polardbx-cdc',
'database-name' = 'db',
'table-name' = '.*',
..., -- その他のパラメーター
'polardbx.binlog.include.tables' = 'db.table1,db.table2' -- 指定されたテーブルからの変更のみをキャプチャします
);
インスタンス全体の Binlog をロードしてクライアント側でフィルタリングする MySQL CDC コネクタとは異なり、PolarDB-X CDC コネクタはサーバーサイドでフィルタリングを実行します。この機能により、クライアントはオンデマンドでサブスクリプションが可能になり、ネットワーク I/O のオーバーヘッドが大幅に削減されます。
制限事項
サーバーサイドの Binlog フィルタリングとテーブルレベルのサブスクリプションには、PolarDB-X サーバーバージョン 2.5.0 以降および Log Service コンポーネントバージョン 5.4.20 以降が必要です。
SQL
構文
CREATE TABLE polardbx_customer_table(
`id` STRING,
[columnName dataType,]*
PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
'connector' = 'polardbx-cdc',
'hosts' = 'pxc-**************-pub.polarx.rds.aliyuncs.com',
'username' = 'pdx_user',
'password' = 'pdx_password',
'database' = 'full_db',
'collection' = 'customers'
)
WITH パラメーター
|
パラメーター |
説明 |
型 |
必須 |
デフォルト |
備考 |
|
connector |
コネクタの名前。 |
STRING |
はい |
なし |
値は |
|
hostname |
PolarDB-X データベースの IP アドレスまたはホスト名。 |
STRING |
はい |
なし |
インスタンスの接続情報からクラスターアドレスを使用します。 |
|
port |
PolarDB-X データベースのポート番号。 |
INTEGER |
いいえ |
3306 |
なし |
|
username |
PolarDB-X データベースのユーザー名。 |
STRING |
はい |
なし |
なし |
|
password |
PolarDB-X データベースのパスワード。 |
STRING |
はい |
なし |
なし |
|
database-name |
PolarDB-X データベースの名前。 |
STRING |
はい |
なし |
正規表現を使用して、複数のデータベースからデータを読み取ることができます。 説明
正規表現を使用する場合、文字列の先頭と末尾に一致させるために ^ および $ 文字を使用しないでください。 |
|
table-name |
PolarDB-X テーブルの名前。 |
STRING |
はい |
なし |
正規表現を使用して、複数のテーブルからデータを読み取ることができます。 説明
正規表現を使用する場合、文字列の先頭と末尾に一致させるために ^ および $ 文字を使用しないでください。 |
|
server-time-zone |
データベース接続のセッションタイムゾーン。 |
STRING |
いいえ |
ジョブの実行環境のタイムゾーン。 |
|
|
scan.incremental.snapshot.chunk.size |
増分スナップショット読み取りにおける各チャンクの行数。 |
INTEGER |
いいえ |
8096 |
スナップショットフェーズ中、コネクタはテーブルをチャンクに分割し、メモリにキャッシュします。チャンクサイズを小さくすると、チャンクの総数が増加します。これにより、フォールトトレランスの粒度は向上しますが、メモリ不足 (OOM) エラーのリスクが高まり、スループットが低下する可能性もあります。パフォーマンス、フォールトトレランス、メモリ消費量のバランスを取るために、適切な値を設定してください。 |
|
scan.snapshot.fetch.size |
テーブルのスナップショットを読み取る際に一度にフェッチする最大行数。 |
INTEGER |
いいえ |
1024 |
なし |
|
connect.timeout |
PolarDB-X データベースへの接続を確立するためのタイムアウト。 |
DURATION |
いいえ |
30s |
なし |
|
connection.pool.size |
データベース接続プールのサイズ。 |
INTEGER |
いいえ |
20 |
データベース接続プールは接続を再利用して、接続のオーバーヘッドを削減します。 |
|
connect.max-retries |
PolarDB-X データベースへの接続試行が失敗した後の最大リトライ回数。 |
INTEGER |
いいえ |
3 |
なし |
|
scan.startup.mode |
データ消費の起動モード。 |
STRING |
いいえ |
initial |
有効な値:
重要
earliest-offset、specific-offset、および timestamp 起動モードでは、起動時のテーブルスキーマが指定されたオフセットのスキーマと一致しない場合、ジョブは失敗します。指定された Binlog オフセットとジョブの起動の間にテーブルスキーマが変更されないようにしてください。 |
|
scan.startup.specific-offset.file |
|
STRING |
いいえ |
なし |
このパラメーターは、scan.startup.mode が specific-offset に設定されている場合にのみ使用されます。例: |
|
scan.startup.specific-offset.pos |
|
INTEGER |
いいえ |
なし |
このパラメーターは、scan.startup.mode が specific-offset に設定されている場合にのみ使用されます。 |
|
scan.startup.specific-offset.gtid-set |
|
STRING |
いいえ |
なし |
このパラメーターは、scan.startup.mode が specific-offset に設定されている場合にのみ使用されます。例: |
|
scan.startup.timestamp-millis |
|
LONG |
いいえ |
なし |
このパラメーターは、scan.startup.mode が timestamp に設定されている場合にのみ使用されます。単位はミリ秒です。 |
|
scan.startup.specific-offset.skip-events |
特定のオフセットから読み取る際にスキップする Binlog イベントの数。 |
INTEGER |
いいえ |
なし |
このパラメーターは、scan.startup.mode が specific-offset に設定されている場合にのみ使用されます。 |
|
scan.startup.specific-offset.skip-rows |
特定のオフセットから読み取る際にスキップする行変更の数。単一の Binlog イベントには複数の行変更が含まれる場合があります。 |
INTEGER |
いいえ |
なし |
このパラメーターは、scan.startup.mode が specific-offset に設定されている場合にのみ使用されます。 |
|
heartbeat.interval |
ソースが Binlog オフセットを進めるためにハートビートイベントを送信する間隔。 |
DURATION |
いいえ |
なし |
ハートビートイベントは、ソースの Binlog オフセットを進めます。これにより、アイドル状態のソースで Binlog がパージされるのを防ぎます。Binlog がパージされると、ジョブは失敗し、回復にはステートレス再起動が必要になります。 |
|
chunk-meta.group.size |
チャンクメタデータのサイズ。 |
INTEGER |
いいえ |
1000 |
メタデータサイズがこの値を超えると、送信のために複数の部分に分割されます。 |
|
chunk-key.even-distribution.factor.upper-bound |
均等パーティショニングのためのチャンク分布係数の上限。 |
DOUBLE |
いいえ |
1000.0 |
分布係数がこの値より大きい場合、不均等チャンキングが使用されます。 チャンク分布係数 = (MAX(チャンクキー) - MIN(チャンクキー) + 1) / 総行数。 |
|
chunk-key.even-distribution.factor.lower-bound |
均等パーティショニングのためのチャンク分布係数の下限。 |
DOUBLE |
いいえ |
0.05 |
分布係数がこの値より小さい場合、不均等チャンキングが使用されます。 チャンク分布係数 = (MAX(チャンクキー) - MIN(チャンクキー) + 1) / 総行数。 |
|
scan.newly-added-table.enabled |
チェックポイントから再起動する際に、キャプチャ基準に一致する新しく追加されたテーブルをスキャンするかどうかを指定します。 |
BOOLEAN |
いいえ |
false |
有効にすると、チェックポイントまたはセーブポイントから再起動されたジョブは、「table-name」パターンに一致する新しく追加されたテーブルをスキャンして同期します。また、一致しなくなったテーブルの追跡を停止します。 |
|
scan.incremental.snapshot.chunk.key-column |
スナップショットフェーズ中のデータチャンキングに使用する列を指定します。 |
STRING |
備考を参照 |
なし |
|
|
scan.incremental.close-idle-reader.enabled |
スナップショットフェーズが完了した後にアイドルリーダーを閉じるかどうかを指定します。 |
BOOLEAN |
いいえ |
false |
この設定を有効にするには、 |
|
scan.incremental.snapshot.backfill.skip |
スナップショット読み取りフェーズ中にバックフィルプロセスをスキップするかどうかを指定します。 |
BOOLEAN |
いいえ |
false |
有効な値:
バックフィルがスキップされた場合、スナップショットフェーズ中にテーブルに加えられた変更は、スナップショットにマージされるのではなく、後の増分フェーズで読み取られます。 重要
バックフィルをスキップすると、スナップショット中に発生した変更が増分フェーズで再実行されるため、データ不整合が発生する可能性があります。このモードは、at-least-once セマンティクスのみを保証します。 |
|
scan.parse.online.schema.changes.enabled |
増分フェーズ中に RDS のロックフリー変更に対する DDL イベントの解析を試みるかどうかを指定します。 |
BOOLEAN |
いいえ |
false |
有効な値:
これは実験的な機能です。本番環境でロックフリーのスキーマ変更を実行する前に、Flink ジョブのセーブポイントを作成して、回復できるようにしてください。 |
|
scan.only.deserialize.captured.tables.changelog.enabled |
増分フェーズ中に、指定されたキャプチャ対象テーブルの変更イベントのみを逆シリアル化するかどうかを指定します。 |
BOOLEAN |
いいえ |
true |
有効な値:
|
|
scan.read-changelog-as-append-only.enabled |
変更ログストリームを追加専用ストリームに変換するかどうかを指定します。 |
BOOLEAN |
いいえ |
false |
有効な値:
|
|
scan.parallel-deserialize-changelog.enabled |
増分フェーズ中に複数のスレッドを使用して変更イベントを逆シリアル化するかどうかを指定します。 |
BOOLEAN |
いいえ |
false |
有効な値:
|
|
scan.parallel-deserialize-changelog.handler.size |
変更イベントを並列で逆シリアル化する際に使用するイベントハンドラの数。 |
INTEGER |
いいえ |
2 |
なし |
|
scan.incremental.snapshot.unbounded-chunk-first.enabled |
スナップショット読み取りフェーズ中に無制限チャンクを最初に配布するかどうかを指定します。 |
BOOLEAN |
いいえ |
false |
有効な値:
これは実験的な機能です。有効にすると、スナップショットの最後のチャンクを同期する際の TaskManager でのメモリ不足 (OOM) エラーのリスクを軽減できます。この設定は、ジョブの最初の起動前に設定する必要があります。 |
|
polardbx.binlog.ignore.archive-events.enabled |
PolarDB-X Binlog 内のアーカイブイベント (主に |
BOOLEAN |
いいえ |
false |
|
|
polardbx.binlog.ignore.query-events.enabled |
PolarDB-X Binlog 内の |
BOOLEAN |
いいえ |
false |
|
|
polardbx.binlog.include.tables |
指定されたテーブルからのみ Binlog イベントを読み取ります。複数のテーブル名を区切るにはコンマ ( |
STRING |
いいえ |
なし |
|
|
polardbx.binlog.exclude.tables |
指定されたテーブルからの Binlog イベントを無視します。複数のテーブル名を区切るにはコンマ ( |
STRING |
いいえ |
なし |
型マッピング
|
PolarDB-X の型 |
Flink の型 |
|
TINYINT |
TINYINT |
|
SMALLINT |
SMALLINT |
|
TINYINT UNSIGNED |
|
|
TINYINT UNSIGNED ZEROFILL |
|
|
INT |
INT |
|
MEDIUMINT |
|
|
SMALLINT UNSIGNED |
|
|
SMALLINT UNSIGNED ZEROFILL |
|
|
BIGINT |
BIGINT |
|
INT UNSIGNED |
|
|
INT UNSIGNED ZEROFILL |
|
|
MEDIUMINT UNSIGNED |
|
|
MEDIUMINT UNSIGNED ZEROFILL |
|
|
BIGINT UNSIGNED |
DECIMAL(20, 0) |
|
BIGINT UNSIGNED ZEROFILL |
|
|
SERIAL |
|
|
FLOAT [UNSIGNED] [ZEROFILL] |
FLOAT |
|
DOUBLE [UNSIGNED] [ZEROFILL] |
DOUBLE |
|
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] |
|
|
REAL [UNSIGNED] [ZEROFILL] |
|
|
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] |
DECIMAL(p, s) |
|
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] |
|
|
BOOLEAN |
BOOLEAN |
|
TINYINT(1) |
|
|
DATE |
DATE |
|
TIME [(p)] |
TIME [(p)] [WITHOUT TIME ZONE] |
|
DATETIME [(p)] |
TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
|
TIMESTAMP [(p)] |
TIMESTAMP_LTZ [(p)] |
|
TIMESTAMP [(p)] WITH LOCAL TIME ZONE |
|
|
CHAR(n) |
STRING |
|
VARCHAR(n) |
|
|
TEXT |
|
|
BINARY |
BYTES |
|
VARBINARY |
|
|
BLOB |
データインジェスト
Realtime Compute for Apache Flink 11.6 は、データインジェスト YAML ジョブのデータソースとして PolarDB-X コネクタをサポートするようになりました。
構文
source:
type: polardbx
name: PolarDB-X Source
hostname: localhost
port: 3306
username: <username>
password: <password>
tables: pdb.order_table
# Binlog 内のアーカイブイベントを無視
polardbx.binlog.ignore.archive-events.enabled: true
# Binlog 内のクエリイベントを無視
polardbx.binlog.ignore.query-events.enabled: true
# 帯域幅を節約するために、pdb.order_table の Binlog のみをサブスクライブ
polardbx.binlog.include.tables: pdb.order_table
sink:
type: values
パラメーター
|
パラメーター |
説明 |
必須 |
型 |
デフォルト値 |
備考 |
|
type |
データソースのタイプ。 |
はい |
STRING |
N/A |
値は |
|
name |
データソースの名前。 |
いいえ |
STRING |
N/A |
N/A |
|
hostname |
PolarDB-X インスタンスの IP アドレスまたはホスト名。 |
はい |
STRING |
N/A |
VPC アドレスを推奨します。 説明
ご利用の PolarDB-X インスタンスと Realtime Compute for Apache Flink ワークスペースが同じ VPC にない場合は、VPC 間接続を確立するか、パブリックネットワーク経由でインスタンスにアクセスする必要があります。詳細については、「ワークスペースの管理と運用」および「フルマネージド Flink クラスターがパブリックネットワークにアクセスする方法」をご参照ください。 |
|
username |
PolarDB-X データベースに接続するためのユーザー名。 |
はい |
STRING |
N/A |
N/A |
|
password |
指定されたユーザー名のパスワード。 |
はい |
STRING |
N/A |
N/A |
|
tables |
同期する PolarDB-X テーブル。 |
はい |
STRING |
N/A |
説明
|
|
tables.exclude |
同期から除外するテーブル。 |
いいえ |
STRING |
N/A |
説明
データベース名とテーブル名はピリオド (.) で区切られます。リテラルなピリオドに一致させるには、バックスラッシュでエスケープする必要があります。例: |
|
port |
PolarDB-X インスタンスのポート。 |
いいえ |
INTEGER |
3306 |
N/A |
|
schema-change.enabled |
スキーマ変更イベントを発行するかどうかを指定します。 |
いいえ |
BOOLEAN |
true |
N/A |
|
jdbc.properties.* |
JDBC URL のカスタム接続パラメーター。 |
いいえ |
STRING |
N/A |
カスタム接続パラメーターを渡すことができます。例えば、SSL を無効にするには、'jdbc.properties.useSSL' = 'false' を設定します。 |
|
debezium.* |
Binlog を読み取るためのカスタム Debezium パラメーター。 |
いいえ |
STRING |
N/A |
カスタム Debezium パラメーターを渡すことができます。例えば、'debezium.event.deserialization.failure.handling.mode' = 'ignore' を設定して、コネクタが逆シリアル化エラーをどのように処理するかを定義します。 |
|
scan.incremental.snapshot.chunk.size |
各チャンクのサイズ (行数)。 |
いいえ |
INTEGER |
8096 |
PolarDB-X テーブルは、読み取りのために複数のチャンクに分割されます。各チャンクのデータは、完全に読み取られるまでメモリにキャッシュされます。 チャンクサイズを小さくすると、チャンクの総数が増加し、障害回復の粒度が細かくなりますが、OOM のリスクが高まり、全体のスループットが低下する可能性があります。これらのトレードオフのバランスを取り、適切なチャンクサイズを設定する必要があります。 |
|
scan.snapshot.fetch.size |
完全なテーブルスナップショット中に一度にフェッチされる最大レコード数。 |
いいえ |
INTEGER |
1024 |
N/A |
|
scan.startup.mode |
データを消費するための起動モード。 |
いいえ |
STRING |
initial |
有効な値:
重要
earliest-offset、specific-offset、および timestamp 起動モードでは、起動時のテーブルスキーマが指定された起動位置のテーブルスキーマと異なる場合、ジョブはエラーで失敗します。つまり、これら 3 つの起動モードを使用する場合、対応するテーブルのスキーマが、指定された Binlog 消費位置とジョブの起動時間の間に変更されないようにする必要があります。 |
|
scan.startup.specific-offset.file |
起動位置の Binlog ファイル名。 |
いいえ |
STRING |
N/A |
scan.startup.mode が specific-offset に設定されている場合に必須です。フォーマット例: |
|
scan.startup.specific-offset.pos |
指定された Binlog ファイル内で読み取りを開始するオフセット。 |
いいえ |
INTEGER |
N/A |
scan.startup.mode が specific-offset に設定されている場合に必須です。 |
|
scan.startup.specific-offset.gtid-set |
起動位置の GTID セット。 |
いいえ |
STRING |
N/A |
scan.startup.mode が specific-offset に設定されている場合に使用します。GTID セットのフォーマット例: |
|
scan.startup.timestamp-millis |
起動位置のタイムスタンプ (ミリ秒単位)。 |
いいえ |
LONG |
N/A |
scan.startup.mode が timestamp に設定されている場合に必須です。 重要
タイムスタンプを指定すると、コネクタは各 Binlog ファイルの最初のイベントを読み取ってタイムスタンプを確認し、正しい開始ファイルを見つけます。指定されたタイムスタンプの対象となる Binlog ファイルがデータベースからパージされておらず、読み取り可能であることを確認してください。 |
|
server-time-zone |
データベースが使用するセッションタイムゾーン。 |
いいえ |
STRING |
Flink ジョブの実行環境のタイムゾーン (ご利用のワークスペースのアベイラビリティゾーン)。 |
例:Asia/Shanghai。このパラメーターは、PolarDB-X の |
|
scan.startup.specific-offset.skip-events |
特定の位置から読み取る際にスキップする Binlog イベントの数。 |
いいえ |
INTEGER |
N/A |
scan.startup.mode が specific-offset に設定されている場合に使用します。 |
|
scan.startup.specific-offset.skip-rows |
特定の位置から読み取る際にスキップする行変更の数。単一の Binlog イベントには複数の行変更が含まれる場合があります。 |
いいえ |
INTEGER |
N/A |
scan.startup.mode が specific-offset に設定されている場合に使用します。 |
|
connect.timeout |
データベース接続をリトライする前に待機する最大時間。 |
いいえ |
DURATION |
30s |
N/A |
|
connect.max-retries |
失敗したデータベース接続の最大リトライ回数。 |
いいえ |
INTEGER |
3 |
N/A |
|
connection.pool.size |
データベース接続プールのサイズ。 |
いいえ |
INTEGER |
20 |
接続プールは接続を再利用して、接続のオーバーヘッドを削減します。 |
|
heartbeat.interval |
ソースが Binlog の位置を進めるためにハートビートイベントを送信する間隔。 |
いいえ |
DURATION |
30s |
ハートビートイベントは、更新頻度の低いテーブルの Binlog の位置を進めます。これがないと、Binlog の位置が進まず、Binlog の有効期限が切れる可能性があります。Binlog の有効期限が切れるとジョブは失敗し、回復にはステートレス再起動が必要になります。 |
|
scan.incremental.snapshot.chunk.key-column |
スナップショットフェーズ中にチャンクの分割キーとして使用する列を指定します。 |
いいえ |
STRING |
N/A |
プライマリキーから 1 つの列のみを選択できます。 |
|
chunk-meta.group.size |
チャンクメタデータのサイズ。 |
いいえ |
INTEGER |
1000 |
メタデータサイズがこの値を超えると、送信のために複数の部分に分割されます。 |
|
chunk-key.even-distribution.factor.lower-bound |
均等チャンキングを使用するかどうかを決定するためのチャンク分布係数の下限。 |
いいえ |
DOUBLE |
0.05 |
分布係数がこの値より小さい場合、不均等チャンキングが使用されます。 チャンク分布係数 = (MAX(チャンクキー) - MIN(チャンクキー) + 1) / データ行の総数。 |
|
chunk-key.even-distribution.factor.upper-bound |
均等チャンキングを使用するかどうかを決定するためのチャンク分布係数の上限。 |
いいえ |
DOUBLE |
1000.0 |
分布係数がこの値より大きい場合、不均等チャンキングが使用されます。 チャンク分布係数 = (MAX(チャンクキー) - MIN(チャンクキー) + 1) / データ行の総数。 |
|
scan.incremental.close-idle-reader.enabled |
スナップショットフェーズが完了した後にアイドルリーダーを閉じるかどうかを指定します。 |
いいえ |
BOOLEAN |
false |
この設定を有効にするには、 |
|
scan.only.deserialize.captured.tables.changelog.enabled |
増分フェーズ中に、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。 |
いいえ |
BOOLEAN |
true |
有効な値:
|
|
scan.parallel-deserialize-changelog.enabled |
増分フェーズ中に複数のスレッドを使用して変更イベントを解析するかどうかを指定します。 |
いいえ |
BOOLEAN |
false |
有効な値:
|
|
scan.parallel-deserialize-changelog.handler.size |
変更イベントを並列で解析する際に使用するイベントハンドラの数。 |
いいえ |
INTEGER |
2 |
N/A |
|
metadata-column.include-list |
下流のシンクに渡すメタデータ列。 |
いいえ |
STRING |
N/A |
利用可能なメタデータには 説明
PolarDB-X CDC YAML コネクタは、データベース名、テーブル名、または 重要
|
|
scan.newly-added-table.enabled |
チェックポイントまたはセーブポイントから再起動する際に、テーブルパターンに一致する新しく追加されたテーブルを同期し、一致しなくなったテーブルの追跡を停止するかどうかを指定します。 |
いいえ |
BOOLEAN |
false |
この設定は、チェックポイントまたはセーブポイントから再起動する場合にのみ有効です。 |
|
scan.binlog.newly-added-table.enabled |
増分フェーズ中に、新しく一致したテーブルのデータを送信するかどうかを指定します。 |
いいえ |
BOOLEAN |
false |
|
|
scan.incremental.snapshot.chunk.key-column |
特定のテーブルのスナップショットフェーズ中にチャンキングするための分割キー列を指定します。 |
いいえ |
STRING |
N/A |
|
|
scan.parse.online.schema.changes.enabled |
増分フェーズ中に、ApsaraDB RDS のロックフリーのスキーマ変更に対する DDL イベントを解析するかどうかを指定します。 |
いいえ |
BOOLEAN |
false |
有効な値:
これは実験的な機能です。オンラインでロックフリーの変更を実行する前に、Flink ジョブのセーブポイントを作成して、必要に応じて復元できるようにすることを推奨します。 |
|
scan.incremental.snapshot.backfill.skip |
スナップショット読み取りフェーズ中にバックフィルをスキップするかどうかを指定します。 |
いいえ |
BOOLEAN |
false |
有効な値:
バックフィルがスキップされた場合、スナップショットフェーズ中に発生するテーブルへの変更は、スナップショットにマージされるのではなく、後の増分フェーズで読み取られます。 重要
バックフィルをスキップすると、スナップショットフェーズからの変更が再実行される可能性があるため、データの不整合が発生する可能性があります。このモードは at-least-once セマンティクスを提供します。 |
|
treat-tinyint1-as-boolean.enabled |
|
いいえ |
BOOLEAN |
true |
有効な値:
|
|
treat-timestamp-as-datetime-enabled |
|
いいえ |
BOOLEAN |
false |
有効な値:
PolarDB-X の 有効にすると、このパラメーターは |
|
include-comments.enabled |
テーブルと列のコメントを同期するかどうかを指定します。 |
いいえ |
BOOLEAN |
false |
有効な値:
このパラメーターを有効にすると、ジョブのメモリ使用量が増加します。 |
|
scan.incremental.snapshot.unbounded-chunk-first.enabled |
スナップショット読み取りフェーズ中に無制限チャンクを最初にディスパッチするかどうかを指定します。 |
いいえ |
BOOLEAN |
false |
有効な値:
これは実験的な機能です。有効にすると、最後のチャンクが同期される際の TaskManager での OOM エラーのリスクを軽減できます。ジョブの最初の起動前にこのパラメーターを追加することを推奨します。 |
|
binlog.session.network.timeout |
Binlog 接続のネットワークタイムアウト。 |
いいえ |
DURATION |
10m |
|
|
scan.rate-limit.records-per-second |
ソースが 1 秒あたりに出力できる最大レコード数。 |
いいえ |
LONG |
N/A |
このパラメーターを使用して、データ読み取りのスループットを制限します。この制限は、スナップショットフェーズと増分フェーズの両方に適用されます。 ソースの スナップショット読み取りフェーズ中に、 |
|
include-binlog-meta.enable |
GTID や Binlog の位置など、生の PolarDB-X Binlog メタデータを出力レコードに含めるかどうかを指定します。 |
いいえ |
BOOLEAN |
false |
既存の Canal ベースの同期パイプラインを置き換えるなど、生の Binlog 同期シナリオに適用できます。 |
|
polardbx.binlog.ignore.archive-events.enabled |
PolarDB-X Binlog 内のアーカイブイベント (主に |
いいえ |
BOOLEAN |
false |
|
|
polardbx.binlog.ignore.query-events.enabled |
PolarDB-X Binlog 内の |
いいえ |
BOOLEAN |
false |
|
|
polardbx.binlog.include.tables |
Binlog イベントをサブスクライブするテーブルのコンマ区切りリスト。 |
いいえ |
STRING |
N/A |
説明
このパラメーターは増分 (Binlog) フェーズにのみ影響し、スナップショットフェーズには適用されません。 |
|
polardbx.binlog.exclude.tables |
Binlog イベントを無視するテーブルのコンマ区切りリスト。 |
いいえ |
STRING |
N/A |
説明
このパラメーターは増分 (Binlog) フェーズにのみ影響し、スナップショットフェーズには適用されません。 |