本文介紹AnalyticDB PostgreSQL版如何通過DMS的作業調度功能,實現定時調度RDS PostgreSQL資料庫的資料。
功能介紹
本次作業調度使用OSS作為中間態的儲存,調度任務會將資料從RDS PostgreSQL資料庫載入到OSS上,再使用AnalyticDB PostgreSQL版Serverless模式對該資料進行分析。整個ETL(Extract-Transform-Load)鏈路調度均通過DMS實現。作業調度示意圖如下:

優勢
資料存放區在OSS上,實現低成本儲存歸檔,且資料不會被刪除。
資料從RDS資料庫以“T+1”的形式載入到AnalyticDB PostgreSQL版Serverless模式中進行高效能資料分析。
DMS作業調度支援配置自動調度架構,低代碼、白屏化操作,上手容易。
注意事項
RDS資料庫中的資料,需要可以指定條件來增量歸檔。例如通過表中時間列按天歸檔。
RDS PostgreSQL執行個體、AnalyticDB PostgreSQL版執行個體和OSS Bucket需在同一地區內。
準備工作
AnalyticDB PostgreSQL版
RDS PostgreSQL
建立RDS PostgreSQL執行個體。如何建立執行個體,請參見快速建立RDS PostgreSQL執行個體。
說明RDS PostgreSQL執行個體需要為PostgreSQL 9.4至PostgreSQL 13.0版本。
建立高許可權帳號。如何建立高許可權帳號,請參見建立帳號。
OSS
建立OSS Bucket。如何建立OSS Bucket,請參見控制台建立儲存空間。
擷取OSS Bucket的Bucket名稱和Endpoint(地區節點)資訊,擷取方式如下:
登入OSS管理主控台。
在左側導覽列中,單擊目標Bucket列表。
在Bucket列表,單擊目標Bucket。
在Bucket列表頁面,您可以擷取Bucket名稱。
單擊左側導覽列中的概覽。
在概覽頁面的訪問連接埠地區,您可以擷取Endpoint(地區節點)。
建議使用ECS的VPC網路訪問(內網)的訪問網域名稱進行訪問。
擷取AccessKey ID和AccessKey Secret
擷取AccessKey ID和AccessKey Secret的具體操作,請參見建立AccessKey。
準備服務和資料
RDS PostgreSQL
串連RDS PostgreSQL資料庫。如何串連資料庫,請參見串連PostgreSQL執行個體。
本文樣本中所有操作均使用DMS串連並執行。
建立測試表t_src,並插入測試資料。語句如下:
CREATE TABLE t_src (a int, b int, c date); INSERT INTO t_src SELECT generate_series(1, 1000), 1, now();安裝OSS外表外掛程式。語句如下:
CREATE EXTENSION IF NOT EXISTS oss_fdw;建立一個OSS外表。語句如下:
CREATE SERVER ossserver FOREIGN DATA WRAPPER oss_fdw OPTIONS (host '<bucket_host>' , id '<access_key>', key '<secret_key>',bucket '<bucket_name>');參數說明如下:
參數
說明
host
準備工作中擷取的OSS的Endpoint(地區節點)。
id
準備工作中擷取的AccessKey ID。
key
準備工作中擷取的AccessKey Secret。
bucket
準備工作中擷取的OSS的Bucket名稱。
AnalyticDB PostgreSQL版
串連AnalyticDB PostgreSQL版資料庫。如何串連資料庫,請參見用戶端串連。
本文樣本中所有操作均使用DMS串連並執行。
建立一張與RDS PostgreSQL側表結構一致的表t_target。語句如下:
CREATE TABLE t_target (a int, b int, c date);說明AnalyticDB PostgreSQL版Serverless模式暫不支援主鍵。
安裝OSS外表外掛程式。語句如下:
CREATE EXTENSION IF NOT EXISTS oss_fdw;建立OSS Server和User Mapping。語句如下:
CREATE SERVER oss_serv FOREIGN DATA WRAPPER oss_fdw OPTIONS ( endpoint '<bucket_host>', bucket '<bucket_name>' ); CREATE USER MAPPING FOR PUBLIC SERVER oss_serv OPTIONS ( id '<access_key>', key '<secret_key>' );參數說明如下:
參數
說明
endpoint
準備工作中擷取的OSS的Endpoint(地區節點)。
id
準備工作中擷取的AccessKey ID。
key
準備工作中擷取的AccessKey Secret。
bucket
準備工作中擷取的OSS的Bucket名稱。
配置ETL任務
單擊頂部功能表列中的整合與開發(DTS),然後在左側導覽列中,選擇。
在任務流地區中,單擊新增任務流。
在建立任務流對話方塊中,輸入任務流名稱後,單擊確認。
本次樣本中,任務流名稱為RDSPG資料匯入OSS。
配置RDS PostgreSQL資料歸檔任務流。具體步驟如下:
在RDSPG資料匯入OSS頁簽中,將左側資料加工分類中的單一實例SQL拖動到中間畫布中。
可選:單擊畫布中該任務的
表徵圖,重新命名該任務。重新命名任務是為了方便後續維護整個ETL鏈路,您可以根據自身需求設定任務名。本次樣本中的任務名設定為RDS資料幫浦。
單擊畫布中建立任務的
表徵圖。選擇需要綁定的RDS PostgreSQL資料庫。

您可以切換至RDS PostgreSQL資料庫的SQL編輯頁簽查看資料庫。

在下方編輯框中,粘貼以下SQL:
DROP FOREIGN TABLE IF EXISTS oss_${mydate}; CREATE FOREIGN TABLE IF NOT EXISTS oss_${mydate} (a int, b int, c date) SERVER ossserver OPTIONS ( dir 'rds/t3/${mydate}/', DELIMITER '|' , format 'csv', encoding 'utf8'); INSERT INTO oss_${mydate} SELECT * FROM t_src WHERE c >= '${mydate}';單擊右側的變數設定,選擇節點變數,將變數名設定為
mydata,時間格式設定為yyyyMMdd。
返回RDSPG資料匯入OSS任務流,配置AnalyticDB PostgreSQL版的載入任務。具體步驟如下:
在RDSPG資料匯入OSS頁簽中,將左側資料加工分類中的單一實例SQL拖動到中間畫布中。
可選:單擊畫布中該任務的
表徵圖,重新命名該任務。重新命名任務是為了方便後續維護整個ETL鏈路,您可以根據自身需求設定任務名。本次樣本中的任務名設定為ADBPG資料載入。
單擊畫布中建立任務的
表徵圖。選擇需要綁定的AnalyticDB PostgreSQL版資料庫。
AnalyticDB PostgreSQL版資料庫擷取方式與步驟5中擷取RDS PostgreSQL資料庫方式一致。
在下方編輯框中,粘貼以下SQL:
CREATE FOREIGN TABLE IF NOT EXISTS oss_${mydate}( a int , b int , c date ) SERVER oss_serv OPTIONS ( dir 'rds/t3/${mydate}/', format 'csv', delimiter '|', encoding 'utf8'); INSERT INTO t_target SELECT * FROM oss_${mydate};單擊右側的變數設定,選擇節點變數,將變數名設定為
mydata,時間格式設定為yyyyMMdd。
配置調度任務,需要先運行RDS PostgreSQL資料幫浦任務,再運行AnalyticDB PostgreSQL版資料載入任務。配置方法如下:
選中RDS資料幫浦任務右側的圓點,拖動到ADBPG資料載入任務上,完成拖動後顯示效果如下:

單擊頁面下方的任務流資訊,開啟調度配置下開啟調度的開關。
選擇需要的作業調度周期,每個周期調度任務都會進行RDS側資料的抽取和AnalyticDB PostgreSQL版側資料的載入。

完成配置後,單擊左上方試運行。
任務流測試無誤後單擊發布。