論理レプリケーションは、PolarDB for PostgreSQL で使用して、データ定義言語 (DDL) ステートメントをレプリケートできます。
背景情報
ネイティブPostgreSQLは、テーブルデータの同期のみをサポートします。 正しいテーブルデータ同期を確保するには、パブリッシャーノードとサブスクライバーノードで同じ定義のテーブルを手動で作成する必要があります。
PolarDB for PostgreSQL は、DDLステートメントを複製するための拡張論理レプリケーション機能を提供します。 pub/subメソッドを使用して、データベースオブジェクトのCREATE/ALTER/DROPステートメントをサブスクライバーノードにレプリケートできます。
前提条件
DDL論理レプリケーション機能を使用するには、wal_levelパラメーターをlogicalに設定する必要があります。 パラメーターの変更方法の詳細については、「クラスターパラメーターの設定」をご参照ください。
構文
CREATE PUBLICATIONCREATE PUBLICATION name [ FOR TABLE [ ONLY ] table_name [ * ] [, ...] | FOR ALL TABLES ] [ WITH ( publication_parameter [= value] [, ... ] ) ] publication_parameter: ... pubddl = '(none | table | all)'pubddlパラメーターがpublication_parameterに追加されます。 デフォルト値 : none。 有効な値:none、table、allnone: DDL文を複製しません。
table:
table関連のDDL文のみを複製します。CREATE TABLEALTER TABLEDROP TABLECREATE TABLE AS
all: すべてのDDL文を複製します。 次のDDLステートメントがサポートされています。
ALTER INDEXALTER SEQUENCEALTER TABLEALTER TYPECREATE INDEXCREATE SCHEMACREATE SEQUENCECREATE TABLECREATE TABLE ASCREATE TYPECREATE TYPE HEADERCREATE TYPE BODYDROP INDEXDROP SCHEMADROP SEQUENCEDROP TABLEDROP TYPE- 説明
pubddl = 'all'を指定した場合は、FOR ALL TABLESを追加する必要があります。 グローバルステートメントはすべてのデータベースで実行できますが、レプリケートすることはできません。 グローバルステートメントには、ROLE、DATABASE、TableSpace、GrantStmtまたはRevokeStmt(グローバルオブジェクトの場合) が含まれます。
CREATE SUBSCRIPTIONCREATE SUBSCRIPTION subscription_name CONNECTION 'conninfo' PUBLICATION publication_name [, ...] [ WITH ( subscription_parameter [= value] [, ... ] ) ] subscription_parameter: ... dump_schema = false/truedump_schemaパラメーターがsubscription_parameterに追加され、サブスクリプションの作成時に既存のオブジェクト定義をパブリッシャーノードからサブスクライバーノードにダンプします。 デフォルト値:false 有効な値:false: サブスクリプションを作成するときに、既存のオブジェクト定義をパブリッシャーノードからサブスクライバーノードに
dumpしません。true: サブスクリプションの作成時に、既存のオブジェクト定義をパブリッシャーノードからサブスクライバーノードに
dumpsします。
dump_schemaパラメーターは、pg_dumpまたはpg_restoreツールを使用します。 クラスターがhost='127.0.0.1 '接続をサポートしていることを確認する必要があります。 それ以外の場合、クラスターの復元に失敗します。 ダンプされたファイルは、クラスターのpg_logical/schemadumpsディレクトリに保存され、復元またはエラー後に削除されます。
Parameters
パラメーター | 説明 |
polar_enable_ddl_replication | DDL論理レプリケーション機能を有効にするかどうかを指定します。 デフォルト値: true。 有効な値:
|
polar_enable_debug_ddl_replication | デバッグDDL replicaitonを有効にしてより多くのログを印刷するかどうかを指定します。 デフォルト値: false。 有効な値: true false |
例
DDLステートメントをサポートするパブリケーションを作成します。
CREATE PUBLICATION mypub FOR ALL TABLES with (pubddl = 'all');サンプル結果:
CREATE PUBLICATIONサブスクリプションを作成します。
CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;サンプル結果:
NOTICE: created replication slot "mysub" on publisher CREATE SUBSCRIPTION発行者ノードでSQL文を実行します。
# Create a table. CREATE TABLE t1(id int ,val char(3)); # Insert data. INSERT INTO t1 values (1,'a'); INSERT INTO t1 values (2,'b'); INSERT INTO t1 values (3,'c'); # Modify the table. ALTER TABLE t1 ADD COLUMN c int GENERATED BY DEFAULT AS IDENTITY, ALTER COLUMN c SET GENERATED ALWAYS; # View the table. SELECT * FROM t1; id | val | c ----+-----+--- 1 | a | 1 2 | b | 2 3 | c | 3 (3 rows) # View comments of the table. \d+ t1 Table "public.t1" Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description --------+--------------+-----------+----------+------------------------------+----------+-------------+--------------+------------- id | integer | | | | plain | | | val | character(3) | | | | extended | | | c | integer | | not null | generated always as identity | plain | | | Publications: "mypub" Replica Identity: FULL Access method: heapサブスクライバノードのレプリケーション情報を表示します。
# View the table. SELECT * FROM t1; id | val | c ----+-----+--- 1 | a | 1 2 | b | 2 3 | c | 3 (3 rows) # View comments of the table. \d+ t1 Table "public.t1" Column | Type | Collation | Nullable | Default | Storage | Compression | Stats target | Description --------+--------------+-----------+----------+------------------------------+----------+-------------+--------------+------------- id | integer | | | | plain | | | val | character(3) | | | | extended | | | c | integer | | not null | generated always as identity | plain | | | Replica Identity: FULL Access method: heap発行者ノードのテーブルを削除します。
DROP TABLE t1;サブスクライバノードのレプリケーション情報を表示します。
SELECT * FROM t1;サンプル結果:
ERROR: relation "t1" does not exist LINE 1: SELECT * FROM t1;
デコード拡張機能
次のコールバックインターフェイスがデコード拡張に追加されます。
/*
* Output plugin callbacks
*/
typedef struct OutputPluginCallbacks
{
...
LogicalDecodeDDLMessageCB ddl_cb;
/* streaming of changes */
...
LogicalDecodeStreamDDLMessageCB stream_ddl_cb;
} OutputPluginCallbacks;
/*
* Called for the logical decoding DDL messages.
*/
typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
const char *prefix,
Oid relid,
DeparsedCommandType cmdtype,
Size message_size,
const char *message);
/*
* Callback for streaming logical decoding DDL messages from in-progress
* transactions.
*/
typedef void (*LogicalDecodeStreamDDLMessageCB) (struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr message_lsn,
const char *prefix,
Oid relid,
DeparsedCommandType cmdtype,
Size message_size,
const char *message);DDLメッセージメソッドは、test_decoding拡張で実装されます。 test decoding拡張機能は、次の方法で使用できます。
CREATE PUBLICATION mypub FOR ALL TABLES with (pubddl = 'all');
SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
create table t3(id int);
SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid |
data
------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------------------------------
0/C001BF10 | 783 | BEGIN 783
0/C001EBC0 | 783 | message: prefix: deparse, relid: 16418, cmdtype: Simple, sz: 1505 content:{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s
%{access_method}s %{with_clause}s", "identity": {"objname": "t3", "schemaname": "public"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": fal
se, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "
present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{co
llation}s %{not_null}s %{default}s %{identity_column}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present":
false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "identity_column": {"fmt": "%{identity_
type}s", "identity_type": {"fmt": "", "present": false}}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}
0/C001EE98 | 783 | COMMIT 783
select polar_catalog.ddl_deparse_expand_command('{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s %{access_method}s %{with_clause}s", "identity": {"objname": "t3", "schemaname": "public"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": false, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{collation}s %{not_null}s %{default}s %{identity_column}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present": false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "identity_column": {"fmt": "%{identity_type}s", "identity_type": {"fmt": "", "present": false}}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}');
ddl_deparse_expand_command
-------------------------------------------------------------------------
CREATE TABLE public.t3 (id pg_catalog.int4 STORAGE plain )
(1 row)
システムテーブルとシステム関数の追加
polar_catalog.ddl_deparse_to_json
Syntax:
ddl_deparse_to_json(IN pg_ddl_command) RETURN textUsage: 内部解析ツリーをJSON文字列に解析します。
Parameters:入力したpg_ddl_commandはPARSETREE型で、TEXT型のJSON文字列が返されます。
polar_catalog.ddl_deparse_expand_command
Syntax:
ddl_deparse_expand_command(IN text) RETURN textUsage: JSON文字列をSQL文字列に解析します。
Parameters: 入力するJSON文字列はTEXT型で、TEXT型のSQL文字列が返されます。
polar_catalog.polar_publication
Syntax:
TABLE polar_publication ( puboid Oid primary key, -- publication oid pubddl "char", -- Specifies whether the publication supports DDL objects. pubglobal "char", -- Specifies whether the publication supports global objects (supported soon). pubflags int -- The reserved flag bits. );