このトピックでは、論理ログファイルをJSON形式に変換するために使用される PolarDB for PostgreSQL (Compatible with Oracle) のwal2jsonプラグインについて説明します。
前提条件
この拡張機能は、次のエンジンを実行するPolarDB for PostgreSQL (Compatible with Oracle) クラスターでサポートされています。
PolarDB for PostgreSQL (Oracle互換) 2.0 (バージョン2.0.14.1.0以降)
次のステートメントを実行して、PolarDB for PostgreSQL (Compatible with Oracle) クラスターのリビジョンバージョンを表示できます。
SHOW polar_version;背景情報
wal2jsonは、次の機能を提供するロジックデコードプラグインです。
INSERTおよびUPDATEステートメントを実行して生成されたアクセスタプル。指定された重複IDに応じて、
UPDATEおよびDELETEの古い行バージョンにアクセスします。ストリーミングプロトコル (論理レプリケーションスロット) または特別なSQL APIを使用して変更を消費します。
wal2jsonプラグインは、トランザクションごとにJSONオブジェクトを生成します。 すべての新旧のタプルは、JSONオブジェクトで使用できます。 さらに、オプションには、トランザクションのタイムスタンプ、スキーマ修飾、データ型、トランザクションIDなどのプロパティが含まれます。 詳細については、「SQL文を実行してJSONオブジェクトを取得する」をご参照ください。
注意事項
は
REPLICA_IDENTITY_FULLのレプリケーション方法を使用するため、UPDATEとDELETEの前後の列ではなく、UPDATEとDELETEの間にデータの行全体が表示されます。 UPDATEの前後の列のみを変更するには、polar_create_table_with_full_replica_identityパラメーターをoffに設定する必要があります。 このパラメーターはコンソールで変更できません。 パラメーターを変更するには、お問い合わせください。wal2jsonプラグインには、論理的なエンコードとデコードが必要です。
wal_levelパラメーターをlogicalに変更する必要があります。説明このパラメーターはコンソールで変更できます。 詳細については、「」「クラスターパラメーターの設定」をご参照ください。 パラメーターを変更すると、クラスターが再起動します。 作業は慎重に行ってください。
SQL文を実行してJSONオブジェクトを取得する
wal2jsonプラグインを作成するためにcreate extensionステートメントを実行する必要はありません。 論理コピースロットを作成して、wal2jsonプラグインをロードできます。
wal2jsonプラグインを含む論理レプリケーションスロットを作成したら、次のステートメントを実行して、WALでJSONオブジェクトを取得します。
-- Create tables with and without primary keys.
CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
-- Create a logical replication slot of the wal2json type.
SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
-- Commit a transaction and write it to WAL.
BEGIN;
INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
DELETE FROM table2_with_pk WHERE a < 3;
INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;
-- Obtain the JSON object in WAL.
SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1');サンプル結果:
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [1, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [2, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_without_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "numeric(5,2)", "text"],
"columnvalues": [1, 2.34, "Tapir"]
}
]
}test_slotレプリケーションスロットを削除し、文字列 'stop' を返します。
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');パラメーター
次の表に、wal2jsonに関連するパラメーターを示します。
パラメーター | 説明 |
change | INSERT、UPDATE、DELETE、およびTRUNCATEのWALレコードなど、各DMLのWALエントリ。 |
changeset | いくつかの変更のセット。 |
include-xids | 各変更セットにxidを追加するかどうかを指定します。 デフォルト値:false 有効な値:
|
include-timestamp | 各変更セットにタイムスタンプを追加するかどうかを指定します。 デフォルト値:false 有効な値:
|
include-schemas | 各変更にスキーマを追加するかどうかを指定します。 デフォルト値:true 有効な値:
|
include-types | 各変更に型を追加するかどうかを指定します。 デフォルト値:true 有効な値:
|
include-typmod | 修飾子を持つ型に修飾子を追加するかどうかを指定します (varcharの代わりにvarchar(20) など) 。 デフォルト値:true 有効な値:
|
include-type-oids | タイプoidsを追加するかどうかを指定します。 デフォルト値:false 有効な値:
|
include-not-null |
|
pretty-print | 書式設定のためにJSON構造にスペースとインデントを追加するかどうかを指定します。 デフォルト値:false 有効な値:
|
write-in-chunks | 各変更セットの代わりに各変更後に書き込むかどうかを指定します。 デフォルト値:false 有効な値:
|
include-lsn | nextlsnを各変更セットに追加するかどうかを指定します。 デフォルト値:false 有効な値:
|
filter-tables | 指定したテーブルから行を除外します。 デフォルトでは、このパラメーターは空で、除外されるテーブルがないことを示します。 説明
|
add-tables | 指定されたテーブルの行のみが含まれます。 デフォルトでは、すべてのスキーマのすべてのテーブルが解析されます。 詳細については、「フィルターテーブル」をご参照ください。 |
filter-msg-prefixes | 指定されたプレフィックスを持つ行を除外します。 このパラメーターは通常、 |
add-msg-prefixes | 指定されたプレフィックスを持つ行を含みます。 このパラメーターは通常、 |
format-version | 使用する出力形式を定義します。 デフォルト値は 1 です。 有効な値:
|
actions | 送信される操作を指定します。 デフォルトでは、INSERT、UPDATE、DELETE、およびTRUNCATEを含むすべての操作が送信されます。 |
例
include-xidsを以下の実施例で使用します。
テーブルと論理レプリケーションスロットを作成します。 データの行を挿入します。
DROP TABLE IF EXISTS tbl; CREATE TABLE tbl (id int); SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'wal2json'); INSERT INTO tbl VALUES (1);関数にパラメーターと値を入力します。
SELECT count(*) = 1, count(distinct ((data::json)->'xid')::text) = 1 FROM pg_logical_slot_get_changes( 'regression_slot', NULL, NULL, 'format-version', '1', 'include-xids', '1');
制御ポリシー機能の動作
詳細については、「公式ドキュメント」をご参照ください。