本文介紹如何通過阿里雲Realtime ComputeFlink版寫入資料到AnalyticDB PostgreSQL版。
使用限制
該功能暫不支援AnalyticDB PostgreSQL版Serverless模式。
僅FlinkRealtime Compute引擎VVR 6.0.0及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL版連接器。
僅FlinkRealtime Compute引擎VVR 8.0.1及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL版7.0版本。
說明如果您使用了自訂連接器,具體操作請參見管理自訂連接器。
前提條件
已建立Flink全託管工作空間。具體操作,請參見開通Realtime ComputeFlink版。
已建立AnalyticDB PostgreSQL版執行個體。具體操作,請參見建立執行個體。
AnalyticDB PostgreSQL版執行個體和Flink全託管工作空間需要位於同一VPC下。
配置AnalyticDB PostgreSQL版執行個體
- 登入雲原生資料倉儲AnalyticDB PostgreSQL版控制台。
將Flink工作空間所屬的網段加入AnalyticDB PostgreSQL版的白名單。如何添加白名單,請參見設定白名單。
單擊登入資料庫,串連資料庫的更多方式,請參見用戶端串連。
在AnalyticDB PostgreSQL版執行個體上建立一張表。
建表SQL樣本如下:
CREATE TABLE test_adbpg_table( b1 int, b2 int, b3 text, PRIMARY KEY(b1) );
配置Realtime ComputeFlink
在Flink全託管頁簽,單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊資料連線。
在資料連線頁面,單擊建立自訂連接器。
上傳自訂連接器JAR檔案。
說明擷取AnalyticDB PostgreSQL版自訂Flink Connector的JAR包,請參見AnalyticDB PostgreSQL Connector。
JAR包的版本需要與Realtime Compute平台的Flink引擎版本一致。
上傳完成後,單擊下一步。
系統會對您上傳的自訂連接器內容進行解析。如果解析成功,您可以繼續下一步。如果解析失敗,請確認您上傳的自訂連接器代碼是否符合Flink社區標準。
單擊完成。
建立完成的自訂連接器會出現在連接器列表中。
建立Flink作業
登入Realtime Compute控制台,在Flink全託管頁簽,單擊目標工作空間操作列下的控制台。
在左側導覽列,單擊SQL開發,單擊建立,選擇空白的流作業草稿,單擊下一步。
在建立檔案草稿對話方塊,填寫作業配置資訊。
作業參數
說明
樣本
檔案名稱
作業的名稱。
說明作業名稱在當前專案中必須保持唯一。
adbpg-test
儲存位置
指定該作業的代碼檔案所屬的檔案夾。
您還可以在現有檔案夾右側,單擊
表徵圖,建立子檔案夾。作業草稿
引擎版本
當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹。
vvr-6.0.7-flink-1.15
單擊建立。
寫入資料到AnalyticDB PostgreSQL版
編寫作業代碼。
建立隨機源表
datagen_source和對應AnalyticDB PostgreSQL版的表test_adbpg_table,將以下作業代碼拷貝到作業文本編輯區。CREATE TABLE datagen_source ( f_sequence INT, f_random INT, f_random_str STRING ) WITH ( 'connector' = 'datagen', 'rows-per-second'='5', 'fields.f_sequence.kind'='sequence', 'fields.f_sequence.start'='1', 'fields.f_sequence.end'='1000', 'fields.f_random.min'='1', 'fields.f_random.max'='1000', 'fields.f_random_str.length'='10' ); CREATE TABLE test_adbpg_table ( `B1` bigint , `B2` bigint , `B3` VARCHAR , `B4` VARCHAR, PRIMARY KEY(B1) not ENFORCED ) with ( 'connector' = 'adbpg-nightly-1.13', 'password' = 'xxx', 'tablename' = 'test_adbpg_table', 'username' = 'xxxx', 'url' = 'jdbc:postgresql://url:5432/schema', 'maxretrytimes' = '2', 'batchsize' = '50000', 'connectionmaxactive' = '5', 'conflictmode' = 'ignore', 'usecopy' = '0', 'targetschema' = 'public', 'exceptionmode' = 'ignore', 'casesensitive' = '0', 'writemode' = '1', 'retrywaittime' = '200' );其中
datagen_source表的參數無需修改,test_adbpg_table表的參數需要根據您實際情況進行修改,參數說明如下:參數
是否必填
說明
connector
是
connector名稱,固定為
adbpg-nightly-版本號碼,例如adbpg-nightly-1.13。url
是
AnalyticDB PostgreSQL版的JDBC串連地址。格式為
jdbc:postgresql://<內網地址>:<連接埠>/<串連的資料庫名稱>,樣本如下jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres。tablename
是
AnalyticDB PostgreSQL版的表名。
username
是
AnalyticDB PostgreSQL版的資料庫帳號。
password
是
AnalyticDB PostgreSQL版的資料庫帳號密碼。
maxretrytimes
否
SQL執行失敗後重試次數,預設為3次。
batchsize
否
一次批量寫入的最大條數,預設為50000條。
exceptionmode
否
資料寫入過程中出現異常時的處理策略。支援以下兩種處理策略:
ignore(預設值):忽略出現異常時寫入的資料。
strict:資料寫入異常時,容錯移轉(Failover)並報錯。
conflictmode
否
當出現主鍵衝突或者唯一索引衝突時的處理策略。支援以下四種處理策略:
ignore :忽略主鍵衝突,保留之前的資料。
strict:主鍵衝突時,容錯移轉(Failover)並報錯。
update:主鍵衝突時,更新新到的資料。
upsert(預設值):主鍵衝突時,採用UPSERT方式寫入資料。
AnalyticDB PostgreSQL版通過INSERT ON CONFLICT和COPY ON CONFLICT實現UPSERT寫入資料,如果目標表為分區表,則需要核心小版本為V6.3.6.1及以上,如何升級核心小版本,請參見版本升級。
targetschema
否
AnalyticDB PostgreSQL版的Schema,預設為public。
writemode
否
寫入方式。取值說明:
0 :採用BATCH INSERT方式寫入資料。
1(預設值):採用COPY API寫入資料。
2:採用BATCH UPSERT方式寫入資料。
verbose
否
是否輸出connector作業記錄。取值說明:
0(預設):不輸出作業記錄。
1:輸出作業記錄。
retrywaittime
否
出現異常重試時間隔的時間。單位為ms,預設值為100。
batchwritetimeoutms
否
攢批寫入資料時最長攢批時間,超過此事件會觸發這一批的寫入。單位為毫秒(ms),預設值為50000。
connectionmaxactive
否
串連池參數,單個Task manager中串連池同一時刻最大串連數。預設值為5。
casesensitive
否
列名和表名是否區分大小寫,取值說明:
0(預設):不區分大小寫。
1:區分大小寫。
說明支援參數和類型映射,詳情請參見連接器雲原生資料倉儲AnalyticDB PostgreSQL版(ADB PG)。
啟動作業。
在作業開發頁面頂部,單擊部署,在彈出的對話方塊中,單擊確定。
說明Session叢集適用於非生產環境的開發測試環境,您可以使用Session叢集模式調試作業,提高作業JM(Job Manager)資源使用率和作業啟動速度。但不推薦您將作業提交至Session叢集中,因為會存在業務穩定性問題,詳情請參見作業調試。
在作業營運頁面,單擊目標作業操作列的啟動。
單擊啟動。
觀察同步結果
串連AnalyticDB PostgreSQL版資料庫。具體操作,請參見用戶端串連。
執行以下語句查詢
test_adbpg_table表。SELECT * FROM test_adbpg_table;資料正常寫入到AnalyticDB PostgreSQL版中,返回樣本如下。
