全部產品
Search
文件中心

AnalyticDB:通過Realtime ComputeFlink版寫入資料到雲原生資料倉儲AnalyticDB PostgreSQL版

更新時間:Jun 19, 2024

本文介紹如何通過阿里雲Realtime ComputeFlink版寫入資料到AnalyticDB PostgreSQL版

使用限制

  • 該功能暫不支援AnalyticDB PostgreSQL版Serverless模式

  • 僅FlinkRealtime Compute引擎VVR 6.0.0及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL版連接器。

  • 僅FlinkRealtime Compute引擎VVR 8.0.1及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL版7.0版本。

    說明

    如果您使用了自訂連接器,具體操作請參見管理自訂連接器

前提條件

  • 已建立Flink全託管工作空間。具體操作,請參見開通Realtime ComputeFlink版

  • 已建立AnalyticDB PostgreSQL版執行個體。具體操作,請參見建立執行個體

  • AnalyticDB PostgreSQL版執行個體和Flink全託管工作空間需要位於同一VPC下。

配置AnalyticDB PostgreSQL版執行個體

  1. 登入雲原生資料倉儲AnalyticDB PostgreSQL版控制台
  2. 將Flink工作空間所屬的網段加入AnalyticDB PostgreSQL版的白名單。如何添加白名單,請參見設定白名單

  3. 單擊登入資料庫,串連資料庫的更多方式,請參見用戶端串連

  4. AnalyticDB PostgreSQL版執行個體上建立一張表。

    建表SQL樣本如下:

    CREATE TABLE test_adbpg_table(
    b1 int,
    b2 int,
    b3 text,
    PRIMARY KEY(b1)
    );

配置Realtime ComputeFlink

  1. 登入Realtime Compute控制台

  2. Flink全託管頁簽,單擊目標工作空間操作列下的控制台

  3. 在左側導覽列,單擊資料連線

  4. 資料連線頁面,單擊建立自訂連接器

  5. 上傳自訂連接器JAR檔案。

    說明
    • 擷取AnalyticDB PostgreSQL版自訂Flink Connector的JAR包,請參見AnalyticDB PostgreSQL Connector

    • JAR包的版本需要與Realtime Compute平台的Flink引擎版本一致。

  6. 上傳完成後,單擊下一步

    系統會對您上傳的自訂連接器內容進行解析。如果解析成功,您可以繼續下一步。如果解析失敗,請確認您上傳的自訂連接器代碼是否符合Flink社區標準。

  7. 單擊完成

    建立完成的自訂連接器會出現在連接器列表中。

建立Flink作業

  1. 登入Realtime Compute控制台,在Flink全託管頁簽,單擊目標工作空間操作列下的控制台

  2. 在左側導覽列,單擊SQL開發,單擊建立,選擇空白的流作業草稿,單擊下一步

  3. 建立檔案草稿對話方塊,填寫作業配置資訊。

    作業參數

    說明

    樣本

    檔案名稱

    作業的名稱。

    說明

    作業名稱在當前專案中必須保持唯一。

    adbpg-test

    儲存位置

    指定該作業的代碼檔案所屬的檔案夾。

    您還可以在現有檔案夾右側,單擊建立檔案夾表徵圖,建立子檔案夾。

    作業草稿

    引擎版本

    當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹

    vvr-6.0.7-flink-1.15

  4. 單擊建立

寫入資料到AnalyticDB PostgreSQL版

  1. 編寫作業代碼。

    建立隨機源表datagen_source和對應AnalyticDB PostgreSQL版的表test_adbpg_table,將以下作業代碼拷貝到作業文本編輯區。

    CREATE TABLE datagen_source (
     f_sequence INT,
     f_random INT,
     f_random_str STRING
    ) WITH (
     'connector' = 'datagen',
     'rows-per-second'='5',
     'fields.f_sequence.kind'='sequence',
     'fields.f_sequence.start'='1',
     'fields.f_sequence.end'='1000',
     'fields.f_random.min'='1',
     'fields.f_random.max'='1000',
     'fields.f_random_str.length'='10'
    );
    
    CREATE TABLE test_adbpg_table (
        `B1` bigint   ,
        `B2` bigint  ,
        `B3` VARCHAR ,
        `B4` VARCHAR,
         PRIMARY KEY(B1) not ENFORCED
    ) with (
       'connector' = 'adbpg-nightly-1.13',
       'password' = 'xxx',
       'tablename' = 'test_adbpg_table',
       'username' = 'xxxx',
       'url' = 'jdbc:postgresql://url:5432/schema',
       'maxretrytimes' = '2',
       'batchsize' = '50000',
       'connectionmaxactive' = '5',
       'conflictmode' = 'ignore',
       'usecopy' = '0',
       'targetschema' = 'public',
       'exceptionmode' = 'ignore',
       'casesensitive' = '0',
       'writemode' = '1',
       'retrywaittime' = '200'
    );

    其中datagen_source表的參數無需修改,test_adbpg_table表的參數需要根據您實際情況進行修改,參數說明如下:

    參數

    是否必填

    說明

    connector

    connector名稱,固定為adbpg-nightly-版本號碼,例如adbpg-nightly-1.13

    url

    AnalyticDB PostgreSQL版的JDBC串連地址。格式為jdbc:postgresql://<內網地址>:<連接埠>/<串連的資料庫名稱>,樣本如下jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:5432/postgres

    tablename

    AnalyticDB PostgreSQL版的表名。

    username

    AnalyticDB PostgreSQL版的資料庫帳號。

    password

    AnalyticDB PostgreSQL版的資料庫帳號密碼。

    maxretrytimes

    SQL執行失敗後重試次數,預設為3次。

    batchsize

    一次批量寫入的最大條數,預設為50000條。

    exceptionmode

    資料寫入過程中出現異常時的處理策略。支援以下兩種處理策略:

    • ignore(預設值):忽略出現異常時寫入的資料。

    • strict:資料寫入異常時,容錯移轉(Failover)並報錯。

    conflictmode

    當出現主鍵衝突或者唯一索引衝突時的處理策略。支援以下四種處理策略:

    • ignore :忽略主鍵衝突,保留之前的資料。

    • strict:主鍵衝突時,容錯移轉(Failover)並報錯。

    • update:主鍵衝突時,更新新到的資料。

    • upsert(預設值):主鍵衝突時,採用UPSERT方式寫入資料。

      AnalyticDB PostgreSQL版通過INSERT ON CONFLICTCOPY ON CONFLICT實現UPSERT寫入資料,如果目標表為分區表,則需要核心小版本為V6.3.6.1及以上,如何升級核心小版本,請參見版本升級

    targetschema

    AnalyticDB PostgreSQL版的Schema,預設為public。

    writemode

    寫入方式。取值說明:

    • 0 :採用BATCH INSERT方式寫入資料。

    • 1(預設值):採用COPY API寫入資料。

    • 2:採用BATCH UPSERT方式寫入資料。

    verbose

    是否輸出connector作業記錄。取值說明:

    • 0(預設):不輸出作業記錄。

    • 1:輸出作業記錄。

    retrywaittime

    出現異常重試時間隔的時間。單位為ms,預設值為100。

    batchwritetimeoutms

    攢批寫入資料時最長攢批時間,超過此事件會觸發這一批的寫入。單位為毫秒(ms),預設值為50000。

    connectionmaxactive

    串連池參數,單個Task manager中串連池同一時刻最大串連數。預設值為5。

    casesensitive

    列名和表名是否區分大小寫,取值說明:

    • 0(預設):不區分大小寫。

    • 1:區分大小寫。

    說明

    支援參數和類型映射,詳情請參見連接器雲原生資料倉儲AnalyticDB PostgreSQL版(ADB PG)

  2. 啟動作業。

    1. 在作業開發頁面頂部,單擊部署,在彈出的對話方塊中,單擊確定

      說明

      Session叢集適用於非生產環境的開發測試環境,您可以使用Session叢集模式調試作業,提高作業JM(Job Manager)資源使用率和作業啟動速度。但不推薦您將作業提交至Session叢集中,因為會存在業務穩定性問題,詳情請參見作業調試

    2. 作業營運頁面,單擊目標作業操作列的啟動

    3. 單擊啟動

觀察同步結果

  1. 串連AnalyticDB PostgreSQL版資料庫。具體操作,請參見用戶端串連

  2. 執行以下語句查詢test_adbpg_table表。

    SELECT * FROM test_adbpg_table;

    資料正常寫入到AnalyticDB PostgreSQL版中,返回樣本如下。

    adbpg2.png

相關文檔