PolarDB for PostgreSQL は、論理ログファイルを JSON フォーマットで出力する wal2json プラグインを提供します。
適用範囲
PolarDB for PostgreSQL でサポートされているマイナーエンジンバージョンは次のとおりです。
PostgreSQL 18 (マイナーエンジンバージョン 2.0.18.1.1.0 以降)
PostgreSQL 17 (マイナーエンジンバージョン 2.0.17.7.5.0 以降)
PostgreSQL 16 (マイナーエンジンバージョン 2.0.16.6.2.0 以降)
PostgreSQL 15 (マイナーエンジンバージョン 2.0.15.12.4.0 以降)
PostgreSQL 14 (マイナーエンジンバージョン 2.0.14.5.1.0 以降)
PostgreSQL 11 (マイナーエンジンバージョン 2.0.11.9.29.0 以降)
マイナーエンジンバージョンは、コンソールで表示するか、SHOW polardb_version; 文を実行して確認できます。ご利用のバージョンがサポートされていない場合は、マイナーエンジンバージョンをアップグレードする必要があります。
背景情報
wal2json は、以下の特徴を持つ論理デコーディングプラグインです。
INSERTおよびUPDATE操作によって生成されたタプルをデコードします。設定されたレプリカアイデンティティに基づいて、
UPDATEおよびDELETE操作から以前の行バージョンを取得できます。ストリーミングプロトコル (論理レプリケーションスロット) または特別な SQL API のいずれかを使用して変更をコンシュームできます。
wal2json プラグインは、トランザクションごとに 1 つの JSON オブジェクトを生成します。この JSON オブジェクトには、すべての新しいタプルと古いタプルが含まれます。オプションを使用して、トランザクションのタイムスタンプ、スキーマ、データ型、トランザクション ID などのプロパティを含めることができます。詳細については、「SQL を使用した JSON オブジェクトの取得」をご参照ください。
注意事項
PolarDB for PostgreSQL は
REPLICA_IDENTITY_FULLを使用するため、UPDATE および DELETE 操作では、変更された列のデータだけでなく、行全体のデータがログに記録されます。変更された列のデータのみをログに記録するには、polar_create_table_with_full_replica_identityパラメーターを無効にします。このパラメーターはコンソールでは変更できません。弊社までお問い合わせください。wal2json プラグインには論理デコーディングが必要です。この機能を有効にするには、
wal_levelパラメーターをlogicalに設定します。説明コンソールで `wal_level` パラメーターを設定できます。詳細については、「クラスターパラメーターの設定」をご参照ください。この変更にはクラスターの再起動が必要です。それに応じてワークロードを計画してください。
SQL を使用した JSON オブジェクトの取得
wal2json プラグインは CREATE EXTENSION を使用して作成しません。代わりに、論理レプリケーションスロットを介してロードできます。
wal2json プラグインを使用して論理レプリケーションスロットを作成した後、次のコマンドを実行して WAL から JSON オブジェクトを取得します。
-- プライマリキーがあるテーブルとないテーブルを作成 CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c)); CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT); -- wal2json タイプの論理レプリケーションスロットを作成 SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json'); -- トランザクションをコミットして WAL に書き込む BEGIN; INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now()); INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now()); INSERT INTO table2_with_pk (b, c) VALUES('Replication', now()); DELETE FROM table2_with_pk WHERE a < 3; INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir'); UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir'; COMMIT; -- WAL から JSON オブジェクトを取得 SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1');出力は次のとおりです。
{ "change": [ { "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"] } ,{ "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"] } ,{ "kind": "insert", "schema": "public", "table": "table2_with_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "character varying(30)", "timestamp without time zone"], "columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"] } ,{ "kind": "delete", "schema": "public", "table": "table2_with_pk", "oldkeys": { "keynames": ["a", "c"], "keytypes": ["integer", "timestamp without time zone"], "keyvalues": [1, "2018-03-27 12:05:29.914496"] } } ,{ "kind": "delete", "schema": "public", "table": "table2_with_pk", "oldkeys": { "keynames": ["a", "c"], "keytypes": ["integer", "timestamp without time zone"], "keyvalues": [2, "2018-03-27 12:05:29.914496"] } } ,{ "kind": "insert", "schema": "public", "table": "table2_without_pk", "columnnames": ["a", "b", "c"], "columntypes": ["integer", "numeric(5,2)", "text"], "columnvalues": [1, 2.34, "Tapir"] } ] }次のコマンドを実行して、
test_slotレプリケーションスロットを削除します。コマンドは文字列'stop'を返します。SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
パラメーターの説明
次の表に、wal2json パラメーターを示します。
パラメーター | 説明 |
change | INSERT、UPDATE、DELETE、TRUNCATE などの各 DML 操作の WAL エントリ。 |
changeset | 複数の変更エントリのコレクション。 |
include-xids | 各チェンジセットに xid を追加するかどうかを制御します。デフォルト:false。
|
include-timestamp | 各チェンジセットにタイムスタンプを追加するかどうかを制御します。デフォルト:false。
|
include-schemas | 各変更にスキーマ名を追加するかどうかを制御します。デフォルト:true。
|
include-types | 各変更にデータ型を追加するかどうかを制御します。デフォルト:true。
|
include-typmod | 修飾子を持つ型に修飾子を追加します (例:varchar ではなく varchar(20))。デフォルト:true。
|
include-type-oids | 型 OID を追加するかどうかを制御します。デフォルト:false。
|
include-not-null |
|
pretty-print | JSON 構造をフォーマットするために空白とインデントを追加するかどうかを制御します。デフォルト:false。
|
write-in-chunks | 各チェンジセットの後ではなく、各変更の後に書き込むかどうかを制御します。デフォルト:false。
|
include-lsn | 各チェンジセットに nextlsn を追加するかどうかを制御します。デフォルト:false。
|
filter-tables | 指定されたテーブルを除外します。デフォルト:空 (フィルターされるテーブルなし)。 説明
|
add-tables | デコードするテーブルを指定します。デフォルトでは、すべてのスキーマのすべてのテーブルがデコードされます。使用方法は filter-tables と同じです。 |
filter-msg-prefixes | 指定されたプレフィックスを持つ行を除外します。通常、 |
add-msg-prefixes | 指定されたプレフィックスを持つ行を含めます。通常、 |
format-version | 出力フォーマットのバージョンを定義します。デフォルト:1。
|
actions | 出力する操作を定義します。デフォルト:すべて (INSERT、UPDATE、DELETE、TRUNCATE)。 |
例
次の例は、include-xids パラメーターの使用方法を示しています。
テーブルと論理レプリケーションスロットを作成し、行を挿入します。
DROP TABLE IF EXISTS tbl; CREATE TABLE tbl (id int); SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); INSERT INTO tbl VALUES (1);パラメーター名と値を関数に渡します。
SELECT count(*) = 1, count(distinct ((data::json)->'xid')::text) = 1 FROM pg_logical_slot_get_changes( 'regression_slot', NULL, NULL, 'format-version', '1', 'include-xids', '1');
設計原則
詳細および設計原則については、公式ドキュメントをご参照ください。