本文介紹如何通過阿里雲Realtime ComputeFlink版即時讀寫雲原生資料倉儲AnalyticDB PostgreSQL版全量資料。
背景資訊
雲原生資料倉儲AnalyticDB PostgreSQL版是一種大規模平行處理(MPP)資料倉儲服務,可提供海量資料線上分析服務。Realtime ComputeFlink版是基於Apache Flink構建的⼀站式即時巨量資料分析平台,內建豐富上下遊連接器,滿足不同業務情境的需求,提供高效、靈活的Realtime Compute服務。通過Realtime ComputeFlink版讀取AnalyticDB PostgreSQL版資料,可以充分發揮雲原生資料倉儲的優勢,提高資料分析的效率和精度。
使用限制
該功能暫不支援AnalyticDB PostgreSQL版Serverless模式。
僅FlinkRealtime Compute引擎VVR 6.0.0及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL版連接器。
僅FlinkRealtime Compute引擎VVR 8.0.1及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL版7.0版本。
說明如果您使用了自訂連接器,具體操作請參見管理自訂連接器。
前提條件
已建立Flink全託管工作空間。具體操作,請參見開通Realtime ComputeFlink版。
已建立AnalyticDB PostgreSQL版執行個體。具體操作,請參見建立執行個體。
AnalyticDB PostgreSQL版執行個體和Flink全託管工作空間需要位於同一VPC下。
步驟一:配置AnalyticDB PostgreSQL版執行個體
- 登入雲原生資料倉儲AnalyticDB PostgreSQL版控制台。
將Flink工作空間所屬的網段加入AnalyticDB PostgreSQL版的白名單。如何添加白名單,請參見設定白名單。
單擊登入資料庫,串連資料庫的更多方式,請參見用戶端串連。
在AnalyticDB PostgreSQL版執行個體上建立一張名為adbpg_dim_table的維表並插入50條測試資料。
建表SQL和插入資料SQL的樣本如下:
--建立名稱為adbpg_dim_table的表。 CREATE TABLE adbpg_dim_table( id int, username text, PRIMARY KEY(id) ); --向adbpg_dim_table的表中插入50行資料,其中id欄位的值為從1到50的整數,而username欄位的值為username字串後面跟隨當前行數的文本表示。 INSERT INTO adbpg_dim_table(id, username) SELECT i, 'username'||i::text FROM generate_series(1, 50) AS t(i);建立一張名為adbpg_sink_table的結果表,用於Flink寫入結果資料。
CREATE TABLE adbpg_sink_table( id int, username text, score int );
步驟二:建立Flink作業
登入Realtime Compute控制台,在Flink全託管頁簽,單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊SQL開發,單擊建立,選擇空白的流作業草稿,單擊下一步。
在建立檔案草稿對話方塊,填寫作業配置資訊。
作業參數
說明
樣本
檔案名稱
作業的名稱。
說明作業名稱在當前專案中必須保持唯一。
adbpg-test
儲存位置
指定該作業的代碼檔案所屬的檔案夾。
您還可以在現有檔案夾右側,單擊
表徵圖,建立子檔案夾。作業草稿
引擎版本
當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹。
vvr-6.0.7-flink-1.15
單擊建立。
步驟三:編寫作業代碼並部署
將以下作業代碼拷貝到作業文本編輯區。
---建立一個datagen源表。 CREATE TEMPORARY TABLE datagen_source ( id INT, score INT ) WITH ( 'connector' = 'datagen', 'fields.id.kind'='sequence', 'fields.id.start'='1', 'fields.id.end'='50', 'fields.score.kind'='random', 'fields.score.min'='70', 'fields.score.max'='100' ); --建立adbpg維表。 CREATE TEMPORARY TABLE dim_adbpg( id int, username varchar, PRIMARY KEY(id) not ENFORCED ) WITH( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest', 'tablename' = 'adbpg_dim_table', 'username' = 'flink****test', 'password' = '*******', 'maxJoinRows'='100', 'maxRetryTimes'='1', 'cache'='lru', 'cacheSize'='1000' ); --建立adbpg結果表。 CREATE TEMPORARY TABLE sink_adbpg ( id int, username varchar, score int ) WITH ( 'connector' = 'adbpg', 'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest', 'tablename' = 'adbpg_sink_table', 'username' = 'flink****test', 'password' = '******', 'maxRetryTimes' = '2', 'batchsize' = '5000', 'conflictMode' = 'ignore', 'writeMode' = 'insert', 'retryWaitTime' = '200' ); --維表和源表join後的結果插入adbpg結果表。 INSERT INTO sink_adbpg SELECT ts.id,ts.username,ds.score FROM datagen_source AS ds join dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts on ds.id = ts.id;根據實際情況修改以下參數,參數說明如下。
參數
是否必填
說明
URL
是
AnalyticDB PostgreSQL版的JDBC串連地址。格式為
jdbc:postgresql://<內網地址>:<連接埠>/<串連的資料庫名稱>,樣本如下jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:3432/postgres。tablename
是
AnalyticDB PostgreSQL版的表名。
username
是
AnalyticDB PostgreSQL版的資料庫帳號。
password
是
AnalyticDB PostgreSQL版的資料庫帳號密碼。
說明更多參數和類型映射,詳情請參見連接器雲原生資料倉儲AnalyticDB PostgreSQL版(ADB PG)。
在作業開發頁面頂部,單擊深度檢查,進行語法檢查。
單擊部署。
在作業營運頁面,單擊啟動。
步驟四:查看Flink寫入資料
- 登入雲原生資料倉儲AnalyticDB PostgreSQL版控制台。
單擊登入資料庫,串連資料庫的更多方式,請參見用戶端串連。
執行如下查詢語句,查看Flink寫入資料。
SELECT * FROM adbpg_sink_table;