全部產品
Search
文件中心

ApsaraDB RDS:使用事件觸發程序實現DDL資源回收筒、防火牆、增量訂閱同步

更新時間:Jun 08, 2024

RDS PostgreSQL開放事件觸發程序,可以實現DDL資源回收筒、DDL防火牆、DDL增量訂閱同步等功能,靈活使用事件觸發程序可以減少維護成本,保護資料安全。

前提條件

執行個體版本為PostgreSQL雲端硬碟版。

背景資訊

如果您對資料庫安全有非常高的要求,可以基於事件觸發程序建立DDL資源回收筒規則、DDL防火牆規則,在多個維度保護資料安全:

  • 事前防禦:防止drop table、drop index、drop database等刪庫刪表危險操作。

  • 事後回檔:在發生意外刪表後,可以從資源回收筒找回。

原理是使用pg_get_ddl_command和pg_get_ddl_drop這兩個事件觸發程序收集並存入DDL語句到表dts_audit.dts_tb_ddl_command中,其具體實現在函數pg_func_ddl_command()中。其中dts_audit.dts_tb_ddl_command表結構如下:

     Column      |            Type             | Collation | Nullable |      Default       | Storage  | Stats target | Description 
-----------------+-----------------------------+-----------+----------+--------------------+----------+--------------+-------------
 event           | text                        |           |          |                    | extended |              |   
 tag             | text                        |           |          |                    | extended |              | Command tag
 classid         | oid                         |           |          |                    | plain    |              | OID of catalog the object belonged in
 objid           | oid                         |           |          |                    | plain    |              | OID the object had within the catalog
 objsubid        | integer                     |           |          |                    | plain    |              | Object sub-id (e.g. attribute number for columns)
 object_type     | text                        |           |          |                    | extended |              | Type of the object
 schema_name     | text                        |           |          |                    | extended |              | Name of the schema the object belonged in, if any; otherwise NULL. No quoting is applied.
 object_identity | text                        |           |          |                    | extended |              | Text rendering of the object identity, schema-qualified.
 is_extension    | boolean                     |           |          |                    | plain    |              | True if the command is part of an extension script
 query           | text                        |           |          |                    | extended |              | sql text
 username        | text                        |           |          | CURRENT_USER       | extended |              |  
 datname         | text                        |           |          | current_database() | extended |              | 
 client_addr     | inet                        |           |          | inet_client_addr() | main     |              | 
 crt_time        | timestamp without time zone |           |          | now()              | plain    |              |

建立語句如下:

CREATE SCHEMA IF NOT EXISTS dts_audit;
CREATE TABLE IF NOT EXISTS dts_audit.dts_tb_ddl_command ( event text,
tag text, classid oid, objid oid, objsubid int,
object_type text, schema_name text, object_identity text, is_extension bool, query text,
username text default current_user, datname text default current_database(),
client_addr inet default inet_client_addr(), crt_time timestamp default now()
);

下文將以樣本的方式介紹如何?DDL資源回收筒、DDL防火牆、DDL增量訂閱及同步等功能,您可以根據業務情況修改相關代碼。

DDL資源回收筒

  1. 執行如下命令建立表、函數和相關觸發器。

    /* external/rds_ddl_pulication/rds_ddl_pulication--1.0.sql */
    
    --create schema
    CREATE SCHEMA IF NOT EXISTS dts_audit;
    --create table for ddl record
    CREATE TABLE IF NOT EXISTS dts_audit.dts_tb_ddl_command ( event text,
    tag text, classid oid, objid oid, objsubid int,
    object_type text, schema_name text, object_identity text, is_extension bool, query text,
    username text default current_user, datname text default current_database(), client_addr inet default inet_client_addr(), crt_time timestamp default now()
    );
    -- create function for event triggers
    create or replace function dts_audit.dts_func_ddl_command() returns event_trigger as $$
    declare v1 text;
    is_superuser bool = false;
    r record; 
    begin
        -- we don't record  ddl command from superusers 
        select u.rolsuper into is_superuser from pg_catalog.pg_roles u where u.rolname = SESSION_USER;
        if is_superuser then
            return;
        end if;
        select query into v1 from pg_stat_activity where pid=pg_backend_pid();
        -- RAISE NOTICE 'ddl event:%, command:%', tg_event, tg_tag;
        -- NB:since ddl_command_end cannot collect the details of the drop statement, we use sql_drop
        if TG_EVENT='ddl_command_end' then
            SELECT * into r FROM pg_event_trigger_ddl_commands(); 
            if r.classid > 0 then
                insert into dts_audit.dts_tb_ddl_command(event, tag, classid, objid, objsubid, object_type, schema_name, object_identity, is_extension, query)
                values(TG_EVENT, TG_TAG, r.classid, r.objid, r.objsubid, r.object_type, r.schema_name, r.object_identity, r.in_extension, v1);
             end if; 
        end if;
        if TG_EVENT='sql_drop' then
            -- To avoid repeated collection, we filtered 'ALTER TABLE' and 'ALTER FOREIGN TABLE'
            if TG_TAG != 'ALTER TABLE' and TG_TAG != 'ALTER FOREIGN TABLE' then
                SELECT * into r FROM pg_event_trigger_dropped_objects();
                insert into dts_audit.dts_tb_ddl_command(event, tag, classid, objid, objsubid, object_type, schema_name, object_identity, is_extension, query)
                values(TG_EVENT, TG_TAG, r.classid, r.objid, r.objsubid, r.object_type, r.schema_name, r.object_identity, 'f', v1);
            end if; 
        end if;
    end;
    $$ language plpgsql strict;
    -- ddl_command_end event trigger
    CREATE EVENT TRIGGER pg_get_ddl_command on ddl_command_end EXECUTE PROCEDURE dts_audit.dts_func_ddl_command();
    -- pg_get_ddl_drop event trigger
    CREATE EVENT TRIGGER pg_get_ddl_drop on sql_drop EXECUTE PROCEDURE dts_audit.dts_func_ddl_command();
    -- grant privileges to all user
    GRANT USAGE ON SCHEMA dts_audit TO PUBLIC;
    GRANT SELECT, INSERT ON TABLE dts_audit.dts_tb_ddl_command TO PUBLIC;
    說明

    執行以上命令後,您的DDL語句就會記錄在表dts_audit.dts_tb_ddl_command中。

  2. 執行一個DDL語句,測試能否記錄變更。

    CREATE TABLE tb_test(id int);
    SELECT * FROM dts_audit.dts_tb_ddl_command;

    測試是否成功記錄DDL

DDL防火牆

您可以根據業務需求建立事件觸發程序,使用ddl_command_start事件類型,可以阻止相應的DDL語句執行。

  1. 建立觸發器函數。

    CREATE OR REPLACE FUNCTION abort1()
      RETURNS event_trigger
     LANGUAGE plpgsql
      AS $$
    BEGIN
      if current_user = 'test1' then
        RAISE EXCEPTION 'event:%, command:%', tg_event, tg_tag;
      end if;
     END;
    $$;
  2. 建立觸發器阻止建立和刪除表的語句。

    create event trigger b on ddl_command_start when TAG IN ('CREATE TABLE', 'DROP TABLE') execute procedure abort1();
  3. 使用對應的使用者test1登入執行個體後建立表,測試能否建立。

    無法建立

    說明

    DDL語句被成功阻止。

DDL增量訂閱同步

在發布端我們將已經執行了的DDL語句儲存在dts_audit.dts_tb_ddl_command中。訂閱端可以讀取記錄進行同步。

  1. 在發布端執行發布命令。

    CREATE PUBLICATION my_ddl_publication FOR TABLE ONLY dts_audit.dts_tb_ddl_command;
  2. 在訂閱端建立相同的表。

    CREATE SCHEMA IF NOT EXISTS dts_audit;
    CREATE TABLE IF NOT EXISTS dts_audit.dts_tb_ddl_command ( event text,
    tag text, classid oid, objid oid, objsubid int,
    object_type text, schema_name text, object_identity text, is_extension bool, query text,
    username text, datname text, client_addr inet , crt_time timestamp );
  3. 在訂閱端建立訂閱。

    CREATE SUBSCRIPTION my_ddl_subscriptin CONNECTION 'host=*** port=*** user=*** password=***  dbname=**' PUBLICATION my_ddl_publication;
    說明

    需要確保執行個體參數wal_level的值為logical,您可以在控制台的參數設定頁面進行修改,該參數修改後需要重啟執行個體才會生效。詳情請參見邏輯訂閱

    樣本

    CREATE SUBSCRIPTION my_ddl_subscriptin CONNECTION 'host=pgm-bpxxxxx.pg.rds.aliyuncs.com port=1433 user=test1 password=xxxxx  dbname=testdb1' PUBLICATION my_ddl_publication;
  4. 在訂閱端針對dts_audit.dts_tb_ddl_command表建立相應的觸發器,實現DDL增量同步處理。