すべてのプロダクト
Search
ドキュメントセンター

PolarDB:DDL論理レプリケーション

最終更新日:Jun 28, 2024

論理レプリケーションは、PolarDB for PostgreSQL で使用して、データ定義言語 (DDL) ステートメントをレプリケートできます。

背景情報

ネイティブPostgreSQLは、テーブルデータの同期のみをサポートします。 正しいテーブルデータ同期を確保するには、パブリッシャーノードとサブスクライバーノードで同じ定義のテーブルを手動で作成する必要があります。

PolarDB for PostgreSQL は、DDLステートメントを複製するための拡張論理レプリケーション機能を提供します。 pub/subメソッドを使用して、データベースオブジェクトのCREATE/ALTER/DROPステートメントをサブスクライバーノードにレプリケートできます。

前提条件

DDL論理レプリケーション機能を使用するには、wal_levelパラメーターをlogicalに設定する必要があります。 パラメーターの変更方法の詳細については、「クラスターパラメーターの設定」をご参照ください。

構文

  1. CREATE PUBLICATION

    CREATE PUBLICATION name
        [ FOR TABLE [ ONLY ] table_name [ * ] [, ...]
          | FOR ALL TABLES ]
        [ WITH ( publication_parameter [= value] [, ... ] ) ]
    publication_parameter:
    	...
    	pubddl = '(none | table | all)'

    pubddlパラメーターがpublication_parameterに追加されます。 デフォルト値 : none。 有効な値: nonetableall

    • none: DDL文を複製しません。

    • table: table関連のDDL文のみを複製します。

      • CREATE TABLE

      • ALTER TABLE

      • DROP TABLE

      • CREATE TABLE AS

    • all: すべてのDDL文を複製します。 次のDDLステートメントがサポートされています。

      • ALTER INDEX

      • ALTER SEQUENCE

      • ALTER TABLE

      • ALTER TYPE

      • CREATE INDEX

      • CREATE SCHEMA

      • CREATE SEQUENCE

      • CREATE TABLE

      • CREATE TABLE AS

      • CREATE TYPE

      • CREATE TYPE HEADER

      • CREATE TYPE BODY

      • DROP INDEX

      • DROP SCHEMA

      • DROP SEQUENCE

      • DROP TABLE

      • DROP TYPE

      • 説明

        pubddl = 'all' を指定した場合は、FOR ALL TABLESを追加する必要があります。 グローバルステートメントはすべてのデータベースで実行できますが、レプリケートすることはできません。 グローバルステートメントには、ROLEDATABASETableSpaceGrantStmtまたはRevokeStmt (グローバルオブジェクトの場合) が含まれます。

  2. CREATE SUBSCRIPTION

    CREATE SUBSCRIPTION subscription_name
        CONNECTION 'conninfo'
        PUBLICATION publication_name [, ...]
        [ WITH ( subscription_parameter [= value] [, ... ] ) ]
    subscription_parameter: 
    	...
      dump_schema = false/true

    dump_schemaパラメーターがsubscription_parameterに追加され、サブスクリプションの作成時に既存のオブジェクト定義をパブリッシャーノードからサブスクライバーノードにダンプします。 デフォルト値:false 有効な値:

    • false: サブスクリプションを作成するときに、既存のオブジェクト定義をパブリッシャーノードからサブスクライバーノードにdumpしません。

    • true: サブスクリプションの作成時に、既存のオブジェクト定義をパブリッシャーノードからサブスクライバーノードにdumpsします。

説明

dump_schemaパラメーターは、pg_dumpまたはpg_restoreツールを使用します。 クラスターがhost='127.0.0.1 '接続をサポートしていることを確認する必要があります。 それ以外の場合、クラスターの復元に失敗します。 ダンプされたファイルは、クラスターのpg_logical/schemadumpsディレクトリに保存され、復元またはエラー後に削除されます。

Parameters

パラメーター

説明

polar_enable_ddl_replication

DDL論理レプリケーション機能を有効にするかどうかを指定します。 デフォルト値: true。 有効な値:

  • true

  • false

polar_enable_debug_ddl_replication

デバッグDDL replicaitonを有効にしてより多くのログを印刷するかどうかを指定します。 デフォルト値: false。 有効な値:

true

false

  1. DDLステートメントをサポートするパブリケーションを作成します。

    CREATE PUBLICATION mypub FOR ALL TABLES with (pubddl = 'all');

    サンプル結果:

    CREATE PUBLICATION
  2. サブスクリプションを作成します。

    CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub;

    サンプル結果:

    NOTICE:  created replication slot "mysub" on publisher
    CREATE SUBSCRIPTION
  3. 発行者ノードでSQL文を実行します。

    # Create a table.
    CREATE TABLE t1(id int ,val char(3));
    
    # Insert data.
    INSERT INTO t1 values (1,'a');
    INSERT INTO t1 values (2,'b');
    INSERT INTO t1 values (3,'c');
    
    # Modify the table.
    ALTER TABLE t1 ADD COLUMN c int GENERATED BY DEFAULT AS IDENTITY,
      ALTER COLUMN c SET GENERATED ALWAYS;
      
    # View the table.
    SELECT * FROM t1;
     id | val | c 
    ----+-----+---
      1 | a   | 1
      2 | b   | 2
      3 | c   | 3
    (3 rows)
    
    # View comments of the table.
    \d+ t1
                                                             Table "public.t1"
     Column |     Type     | Collation | Nullable |           Default            | Storage  | Compression | Stats target | Description 
    --------+--------------+-----------+----------+------------------------------+----------+-------------+--------------+-------------
     id     | integer      |           |          |                              | plain    |             |              | 
     val    | character(3) |           |          |                              | extended |             |              | 
     c      | integer      |           | not null | generated always as identity | plain    |             |              | 
    Publications:
        "mypub"
    Replica Identity: FULL
    Access method: heap
  4. サブスクライバノードのレプリケーション情報を表示します。

    # View the table.
    SELECT * FROM t1;
     id | val | c 
    ----+-----+---
      1 | a   | 1
      2 | b   | 2
      3 | c   | 3
    (3 rows)
    
    # View comments of the table.
    \d+ t1
                                                             Table "public.t1"
     Column |     Type     | Collation | Nullable |           Default            | Storage  | Compression | Stats target | Description 
    --------+--------------+-----------+----------+------------------------------+----------+-------------+--------------+-------------
     id     | integer      |           |          |                              | plain    |             |              | 
     val    | character(3) |           |          |                              | extended |             |              | 
     c      | integer      |           | not null | generated always as identity | plain    |             |              | 
    Replica Identity: FULL
    Access method: heap
  5. 発行者ノードのテーブルを削除します。

    DROP TABLE t1;
  6. サブスクライバノードのレプリケーション情報を表示します。

    SELECT * FROM t1;

    サンプル結果:

    ERROR:  relation "t1" does not exist
    LINE 1: SELECT * FROM t1;

デコード拡張機能

次のコールバックインターフェイスがデコード拡張に追加されます。

/*
 * Output plugin callbacks
 */
typedef struct OutputPluginCallbacks
{
  ...
	LogicalDecodeDDLMessageCB ddl_cb;


	/* streaming of changes */
	...
	LogicalDecodeStreamDDLMessageCB stream_ddl_cb;

} OutputPluginCallbacks;

/*
 * Called for the logical decoding DDL messages.
 */
typedef void (*LogicalDecodeDDLMessageCB) (struct LogicalDecodingContext *ctx,
										   ReorderBufferTXN *txn,
										   XLogRecPtr message_lsn,
										   const char *prefix,
										   Oid relid,
										   DeparsedCommandType cmdtype,
										   Size message_size,
										   const char *message);

/*
 * Callback for streaming logical decoding DDL messages from in-progress
 * transactions.
 */
typedef void (*LogicalDecodeStreamDDLMessageCB) (struct LogicalDecodingContext *ctx,
												 ReorderBufferTXN *txn,
												 XLogRecPtr message_lsn,
												 const char *prefix,
												 Oid relid,
												 DeparsedCommandType cmdtype,
												 Size message_size,
												 const char *message);

DDLメッセージメソッドは、test_decoding拡張で実装されます。 test decoding拡張機能は、次の方法で使用できます。

CREATE PUBLICATION mypub FOR ALL TABLES with (pubddl = 'all');
SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
create table t3(id int);
SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
    lsn     | xid |                                                                                                                                                                                                                                  
                                                                                                                                                                                                                                                     
                                                                                                                                                                                                                                                     
                                                                        data                                                                                                                                                                         
                                                                                                                                                                                                                                                     
                                                                                                                                                                                                                                                     
                                                                                                                                  
------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
----------------------------------------------------------------------------------------------------------------------------------
 0/C001BF10 | 783 | BEGIN 783
 0/C001EBC0 | 783 | message: prefix: deparse, relid: 16418, cmdtype: Simple, sz: 1505 content:{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s
 %{access_method}s %{with_clause}s", "identity": {"objname": "t3", "schemaname": "public"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": fal
se, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "
present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{co
llation}s %{not_null}s %{default}s %{identity_column}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present": 
false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "identity_column": {"fmt": "%{identity_
type}s", "identity_type": {"fmt": "", "present": false}}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}
 0/C001EE98 | 783 | COMMIT 783

select polar_catalog.ddl_deparse_expand_command('{"fmt": "CREATE %{persistence}s TABLE %{if_not_exists}s %{identity}D (%{table_elements:, }s) %{inherits}s %{tablespace}s %{on_commit}s %{partition_by}s %{access_method}s %{with_clause}s", "identity": {"objname": "t3", "schemaname": "public"}, "inherits": {"fmt": "INHERITS (%{parents:, }D)", "parents": null, "present": false}, "on_commit": {"fmt": "ON COMMIT %{on_commit_value}s", "present": false, "on_commit_value": null}, "tablespace": {"fmt": "TABLESPACE %{tablespace}I", "present": false, "tablespace": null}, "persistence": "", "with_clause": {"fmt": "WITH", "present": false}, "partition_by": {"fmt": "PARTITION BY %{definition}s", "present": false, "definition": null}, "access_method": {"fmt": "USING %{access_method}I", "present": false, "access_method": null}, "if_not_exists": "", "table_elements": [{"fmt": "%{name}I %{coltype}T STORAGE %{colstorage}s %{compression}s %{collation}s %{not_null}s %{default}s %{identity_column}s %{generated_column}s", "name": "id", "type": "column", "coltype": {"typmod": "", "typarray": false, "typename": "int4", "schemaname": "pg_catalog"}, "default": {"fmt": "DEFAULT", "present": false}, "not_null": "", "collation": {"fmt": "COLLATE", "present": false}, "colstorage": "plain", "compression": {"fmt": "COMPRESSION %{compression_method}I", "present": false, "compression_method": null}, "identity_column": {"fmt": "%{identity_type}s", "identity_type": {"fmt": "", "present": false}}, "generated_column": {"fmt": "GENERATED ALWAYS AS", "present": false}}]}');
                       ddl_deparse_expand_command                        
-------------------------------------------------------------------------
 CREATE  TABLE  public.t3 (id pg_catalog.int4 STORAGE plain      )      
(1 row)

システムテーブルとシステム関数の追加

  • polar_catalog.ddl_deparse_to_json

    • Syntax: ddl_deparse_to_json(IN pg_ddl_command) RETURN text

    • Usage: 内部解析ツリーをJSON文字列に解析します。

    • Parameters:入力したpg_ddl_commandはPARSETREE型で、TEXT型のJSON文字列が返されます。

  • polar_catalog.ddl_deparse_expand_command

    • Syntax: ddl_deparse_expand_command(IN text) RETURN text

    • Usage: JSON文字列をSQL文字列に解析します。

    • Parameters: 入力するJSON文字列はTEXT型で、TEXT型のSQL文字列が返されます。

  • polar_catalog.polar_publication

    Syntax:

    TABLE polar_publication
    (
        puboid Oid primary key,  -- publication oid
        pubddl "char", -- Specifies whether the publication supports DDL objects.
        pubglobal "char", -- Specifies whether the publication supports global objects (supported soon).
        pubflags int -- The reserved flag bits.
    );