このトピックでは、PolarDB-X コネクタの使用方法について説明します。
背景情報
PolarDB for Xscale (PolarDB-X) は、Alibaba Cloud が提供するパフォーマンス専有型のクラウドネイティブ分散データベースサービスです。高スループット、大容量ストレージ、低レイテンシー、容易なスケーラビリティ、高可用性を提供します。
このコネクタは、Ververica Runtime (VVR) 11.5 以降をサポートしており、PolarDB-X 2.0 以降と併用する必要があります。
PolarDB-X CDC コネクタは、ソーステーブルとしてのみ使用できます。PolarDB-X インスタンスのディメンションテーブルをクエリしたり、結果テーブルに書き込んだりするには、MySQL コネクタ (パブリックプレビュー) を使用してください。
カテゴリ | 詳細 |
サポートタイプ | ソーステーブル |
ランタイムモード | ストリーミングモードのみ |
データフォーマット | 該当なし |
特定のモニタリングメトリック |
|
API タイプ | SQL |
結果テーブルへの更新または削除のサポート | いいえ |
機能
PolarDB-X CDC コネクタは、サーバーサイドでのフィルタリングと不要なバイナリログの削減をサポートすることで、バイナリログ解析フェーズのパフォーマンスを最適化します。これにより、スループットが向上し、ネットワーク帯域幅が節約されます。
オンデマンドのバイナリログサブスクリプションの例
このバージョンでは、バイナリログのサーバーサイドフィルタリングをサポートしており、必要な変更ログのみをクライアントに送信します。これにより、ネットワークトラフィックが削減され、ログ消費のスループットが向上します。
例えば、PolarDB-X サーバー上の db.table1 と db.table2 テーブルの変更データのみをサブスクライブするには、Flink SQL ジョブを次のように構成します。
CREATE TABLE polardbx_table_foo (
... -- ここでテーブルスキーマを定義
) WITH (
'connector' = 'polardbx-cdc',
'database-name' = 'db',
'table-name' = '.*',
..., -- その他のパラメーター
'polardbx.binlog.include.tables' = 'db.table1,db.table2' -- 対応するテーブルのデータのみをサブスクライブ
);インスタンス全体のすべての変更ログをロードしてクライアントサイドでフィルタリングする MySQL CDC コネクタとは異なり、PolarDB-X CDC コネクタはサーバー上でバイナリログをフィルタリングできます。これにより、クライアントは必要に応じてバイナリログをサブスクライブでき、ネットワーク I/O オーバーヘッドを大幅に削減できます。
制限事項
サーバーサイドでのバイナリログフィルタリングと特定テーブルへのサブスクリプションには、PolarDB-X サーバーバージョン 2.5.0 以降、および Simple Log Service コンポーネントバージョン 5.4.20 以降が必要です。
SQL
構文
CREATE TABLE polardbx_customer_table(
`id` STRING,
[columnName dataType,]*
PRIMARY KEY(`id`) NOT ENFORCED
) WITH (
'connector' = 'polardbx-cdc',
'hosts' = 'pxc-**************-pub.polarx.rds.aliyuncs.com',
'username' = 'pdx_user',
'password' = 'pdx_password',
'database' = 'full_db',
'collection' = 'customers'
)WITH パラメーター
パラメーター | 説明 | データ型 | 必須 | デフォルト値 | 注意 |
connector | コネクタの名前。 | STRING | はい | なし | 値は polardbx-cdc である必要があります。 |
hostname | PolarDB-X データベースの IP アドレスまたはホスト名。 | STRING | はい | なし | インスタンスのクラスターエンドポイントを指定します。 |
port | PolarDB-X データベースのサービスポート番号。 | INTEGER | いいえ | 3306 | なし。 |
username | PolarDB-X データベースサービス用のユーザー名。 | STRING | はい | なし | なし。 |
password | PolarDB-X データベースサービス用のパスワード。 | STRING | はい | なし | なし。 |
database-name | PolarDB-X データベースの名前。 | STRING | はい | なし | 正規表現を使用して、複数のデータベースからデータを読み取ることができます。 説明 正規表現を使用する場合、文字列の先頭と末尾を照合するために ^ および $ 記号を使用しないでください。 |
table-name | PolarDB-X テーブルの名前。 | STRING | はい | なし | 正規表現を使用して、複数のテーブルからデータを読み取ることができます。 説明 正規表現を使用する場合、文字列の先頭と末尾を照合するために ^ および $ 記号を使用しないでください。 |
server-time-zone | データベースが使用するセッションタイムゾーン。 | STRING | いいえ | ジョブが実行されるゾーンのタイムゾーン。 | Asia/Shanghai などの IANA タイムゾーン識別子を指定します。このパラメーターは、ソーステーブルの TIMESTAMP 型が STRING 型にどのように変換されるかを制御します。 |
scan.incremental.snapshot.chunk.size | 増分スナップショットからデータを読み取る際の各チャンクのサイズ (行数)。 | INTEGER | いいえ | 8096 | PolarDB-X はテーブルを複数のチャンクに分割して読み取り、チャンクデータをメモリにキャッシュします。チャンクあたりの行数を減らすと、チャンクの総数が増加します。これにより、よりきめ細かい障害回復が可能になりますが、Out-of-Memory (OOM) エラーのリスクが高まり、スループットが低下します。パフォーマンスのバランスを取るために、適切なチャンクサイズを構成してください。 |
scan.snapshot.fetch.size | テーブルから完全データを読み取る際に一度にプルするレコードの最大数。 | INTEGER | いいえ | 1024 | なし。 |
connect.timeout | PolarDB-X データベースサーバーへの接続がタイムアウトした後、接続をリトライするまでの最大待機時間。 | DURATION | いいえ | 30s | なし。 |
connection.pool.size | データベース接続プールのサイズ。 | INTEGER | いいえ | 20 | データベース接続プールは接続を再利用して、データベース接続の数を減らします。 |
connect.max-retries | MySQL データベースサービスへの接続に失敗した後の最大リトライ回数。 | INTEGER | いいえ | 3 | なし。 |
scan.startup.mode | データ消費の起動モード。 | STRING | いいえ | initial | 有効な値:
重要 earliest-offset、specific-offset、および timestamp 起動モードでは、起動時のテーブルスキーマが指定されたオフセットのスキーマと一致する必要があります。スキーマの不一致はジョブの失敗を引き起こします。指定されたバイナリログオフセットとジョブの起動の間にテーブルスキーマが変更されないことを確認してください。 |
scan.startup.specific-offset.file | 特定オフセットモードを使用する場合の開始オフセットのバイナリログファイル名。 | STRING | いいえ | なし | このパラメーターを使用する場合、scan.startup.mode を specific-offset に設定する必要があります。ファイル名の形式例: |
scan.startup.specific-offset.pos | 特定オフセットモードを使用する場合の指定されたバイナリログファイル内の開始オフセットの位置。 | INTEGER | いいえ | なし | このパラメーターを使用する場合、scan.startup.mode を specific-offset に設定する必要があります。 |
scan.startup.specific-offset.gtid-set | 特定オフセットモードを使用する場合の開始オフセットの GTID セット。 | STRING | いいえ | なし | このパラメーターを使用する場合、scan.startup.mode を specific-offset に設定する必要があります。GTID セットの形式例: |
scan.startup.timestamp-millis | 特定時間モードを使用する場合の開始オフセットのタイムスタンプ (ミリ秒)。 | LONG | いいえ | なし | このパラメーターを使用する場合、scan.startup.mode を timestamp に設定する必要があります。タイムスタンプはミリ秒単位です。 |
scan.startup.specific-offset.skip-events | 指定されたオフセットから読み取る際にスキップするバイナリログイベントの数。 | INTEGER | いいえ | なし | このパラメーターを使用する場合、scan.startup.mode を specific-offset に設定する必要があります。 |
scan.startup.specific-offset.skip-rows | 指定されたオフセットから読み取る際にスキップする行の変更数。単一のバイナリログイベントは、複数の行の変更に対応する場合があります。 | INTEGER | いいえ | なし | このパラメーターを使用する場合、scan.startup.mode を specific-offset に設定する必要があります。 |
heartbeat.interval | ソースがハートビートイベントを使用してバイナリログオフセットを進める間隔。 | DURATION | いいえ | なし | ハートビートイベントは、ソース側でバイナリログオフセットを強制的に進めます。このメカニズムは、更新頻度が低いことによるバイナリログの期限切れを防ぎます。期限切れのバイナリログはジョブの失敗を引き起こし、ステートレス再起動によってのみ回復できます。 |
chunk-meta.group.size | チャンクメタデータのサイズ。 | INTEGER | いいえ | 1000 | メタデータがこの値より大きい場合、複数の部分に分割されて転送されます。 |
chunk-key.even-distribution.factor.upper-bound | 均等シャーディングのためのチャンク分散係数の上限。 | DOUBLE | いいえ | 1000.0 | 分散係数がこの値より大きい場合、不均等シャーディングが使用されます。 チャンク分散係数 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 総行数。 |
chunk-key.even-distribution.factor.lower-bound | 均等シャーディングのためのチャンク分散係数の下限。 | DOUBLE | いいえ | 0.05 | 分散係数がこの値より小さい場合、不均等シャーディングが使用されます。 チャンク分散係数 = (MAX(chunk-key) - MIN(chunk-key) + 1) / 総行数。 |
scan.newly-added-table.enabled | ジョブがチェックポイントから再起動する際に、新しく追加されたキャプチャ対象テーブルをスキャンするかどうかを指定します。 | BOOLEAN | いいえ | false | 有効にすると、システムは以前は一致しなかった新しく追加されたテーブルを同期し、一致しなくなったテーブルを状態から削除します。これは、チェックポイントまたはセーブポイントから再起動するときに有効になります。 |
scan.incremental.snapshot.chunk.key-column | スナップショットフェーズでデータシャーディングに使用される列を指定します。 | STRING | 「注意」をご参照ください | なし |
|
scan.incremental.close-idle-reader.enabled | スナップショットフェーズが終了した後、アイドル状態のリーダーをシャットダウンするかどうかを指定します。 | BOOLEAN | いいえ | false | この構成を有効にするには、execution.checkpointing.checkpoints-after-tasks-finish.enabled も true に設定する必要があります。 |
scan.incremental.snapshot.backfill.skip | スナップショット読み取りフェーズでバックフィルをスキップするかどうかを指定します。 | BOOLEAN | いいえ | false | 有効な値:
バックフィルをスキップすると、スナップショットフェーズ中のテーブルへの変更は、スナップショットにマージされる代わりに、後の増分フェーズで読み取られます。 重要 バックフィルをスキップすると、スナップショットフェーズ中に発生した変更が再実行される可能性があるため、データの一貫性が損なわれる可能性があります。at-least-once セマンティクスのみが保証されます。 |
scan.parse.online.schema.changes.enabled | 増分フェーズで、RDS のロックレス変更 DDL イベントの解析を試みるかどうかを指定します。 | BOOLEAN | いいえ | false | 有効な値:
これは実験的な機能です。オンラインのロックレス変更を実行する前に、回復を容易にするために Flink ジョブのセーブポイントを作成してください。 |
scan.only.deserialize.captured.tables.changelog.enabled | 増分フェーズで、指定されたテーブルの変更イベントのみを逆シリアル化するかどうかを指定します。 | BOOLEAN | いいえ | true | 有効な値:
|
scan.read-changelog-as-append-only.enabled | 変更ログストリームを追加専用ストリームに変換するかどうかを指定します。 | BOOLEAN | いいえ | false | 有効な値:
|
scan.parallel-deserialize-changelog.enabled | 増分フェーズで、複数スレッドを使用して変更イベントを解析するかどうかを指定します。 | BOOLEAN | いいえ | false | 有効な値:
|
scan.parallel-deserialize-changelog.handler.size | 複数スレッドを使用して変更イベントを解析する場合のイベントハンドラの数。 | INTEGER | いいえ | 2 | なし。 |
scan.incremental.snapshot.unbounded-chunk-first.enabled | スナップショット読み取りフェーズで無制限チャンクを最初に分散するかどうかを指定します。 | BOOLEAN | いいえ | false | 有効な値:
これは実験的な機能です。有効にすると、TaskManager がスナップショットフェーズ中に最後のチャンクを同期する際の OOM エラーのリスクを軽減できます。ジョブが初めて起動する前にこの構成を追加してください。 |
polardbx.binlog.ignore.archive-events.enabled | PolarDB-X バイナリログのアーカイブイベント (主に `DELETE` イベント) を無視するかどうかを指定します。 | BOOLEAN | いいえ | false | |
polardbx.binlog.ignore.query-events.enabled | PolarDB-X バイナリログのクエリイベントを無視するかどうかを指定します。 | BOOLEAN | いいえ | false | |
polardbx.binlog.include.tables | これらのテーブルのバイナリログのみをサブスクライブします。複数のテーブル名はカンマ (,) で区切ります。 | STRING | いいえ | なし | |
polardbx.binlog.exclude.tables | これらのテーブルのバイナリログをサブスクライブしません。複数のテーブル名はカンマ (,) で区切ります。 | STRING | いいえ | なし |
型のマッピング
PolarDB-X データ型 | Flink データ型 |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
TINYINT UNSIGNED | |
TINYINT UNSIGNED ZEROFILL | |
INT | INT |
MEDIUMINT | |
SMALLINT UNSIGNED | |
SMALLINT UNSIGNED ZEROFILL | |
BIGINT | BIGINT |
INT UNSIGNED | |
INT UNSIGNED ZEROFILL | |
MEDIUMINT UNSIGNED | |
MEDIUMINT UNSIGNED ZEROFILL | |
BIGINT UNSIGNED | DECIMAL(20, 0) |
BIGINT UNSIGNED ZEROFILL | |
SERIAL | |
FLOAT [UNSIGNED] [ZEROFILL] | FLOAT |
DOUBLE [UNSIGNED] [ZEROFILL] | DOUBLE |
DOUBLE PRECISION [UNSIGNED] [ZEROFILL] | |
REAL [UNSIGNED] [ZEROFILL] | |
NUMERIC(p, s) [UNSIGNED] [ZEROFILL] | DECIMAL(p, s) |
DECIMAL(p, s) [UNSIGNED] [ZEROFILL] | |
BOOLEAN | BOOLEAN |
TINYINT(1) | |
DATE | DATE |
TIME [(p)] | TIME [(p)] [WITHOUT TIME ZONE] |
DATETIME [(p)] | TIMESTAMP [(p)] [WITHOUT TIME ZONE] |
TIMESTAMP [(p)] | TIMESTAMP [(p)] |
TIMESTAMP [(p)] WITH LOCAL TIME ZONE | |
CHAR(n) | STRING |
VARCHAR(n) | |
TEXT | |
BINARY | BYTES |
VARBINARY | |
BLOB |