全部產品
Search
文件中心

AnalyticDB:通過Flink即時讀寫全量資料

更新時間:Apr 12, 2025

本文介紹如何通過阿里雲Realtime ComputeFlink版即時讀寫雲原生資料倉儲AnalyticDB PostgreSQL版全量資料。

背景資訊

雲原生資料倉儲AnalyticDB PostgreSQL版是一種大規模平行處理(MPP)資料倉儲服務,可提供海量資料線上分析服務。Realtime ComputeFlink版是基於Apache Flink構建的⼀站式即時巨量資料分析平台,內建豐富上下遊連接器,滿足不同業務情境的需求,提供高效、靈活的Realtime Compute服務。通過Realtime ComputeFlink版讀取AnalyticDB PostgreSQL版資料,可以充分發揮雲原生資料倉儲的優勢,提高資料分析的效率和精度。

使用限制

  • 該功能暫不支援AnalyticDB PostgreSQL版Serverless模式

  • 僅FlinkRealtime Compute引擎VVR 6.0.0及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL版連接器。

  • 僅FlinkRealtime Compute引擎VVR 8.0.1及以上版本支援雲原生資料倉儲AnalyticDB PostgreSQL版7.0版本。

    說明

    如果您使用了自訂連接器,具體操作請參見管理自訂連接器

前提條件

  • 已建立Flink全託管工作空間。具體操作,請參見開通Realtime ComputeFlink版

  • 已建立AnalyticDB PostgreSQL版執行個體。具體操作,請參見建立執行個體

  • AnalyticDB PostgreSQL版執行個體和Flink全託管工作空間需要位於同一VPC下。

步驟一:配置AnalyticDB PostgreSQL版執行個體

  1. 登入雲原生資料倉儲AnalyticDB PostgreSQL版控制台
  2. 將Flink工作空間所屬的網段加入AnalyticDB PostgreSQL版的白名單。如何添加白名單,請參見設定白名單

  3. 單擊登入資料庫,串連資料庫的更多方式,請參見用戶端串連

  4. AnalyticDB PostgreSQL版執行個體上建立一張名為adbpg_dim_table的維表並插入50條測試資料。

    建表SQL和插入資料SQL的樣本如下:

    --建立名稱為adbpg_dim_table的表。
    CREATE TABLE adbpg_dim_table(
    id int,
    username text,
    PRIMARY KEY(id)
    );
    
    --向adbpg_dim_table的表中插入50行資料,其中id欄位的值為從1到50的整數,而username欄位的值為username字串後面跟隨當前行數的文本表示。
    INSERT INTO adbpg_dim_table(id, username)
    SELECT i, 'username'||i::text
    FROM generate_series(1, 50) AS t(i);
  5. 建立一張名為adbpg_sink_table的結果表,用於Flink寫入結果資料。

    CREATE TABLE adbpg_sink_table(
      id int,
      username text,
      score int
    );

步驟二:建立Flink作業

  1. 登入Realtime Compute控制台,在Flink全託管頁簽,單擊目標工作空間操作列下的控制台

  2. 在左側導覽列,單擊SQL開發,單擊建立,選擇空白的流作業草稿,單擊下一步

  3. 建立檔案草稿對話方塊,填寫作業配置資訊。

    作業參數

    說明

    樣本

    檔案名稱

    作業的名稱。

    說明

    作業名稱在當前專案中必須保持唯一。

    adbpg-test

    儲存位置

    指定該作業的代碼檔案所屬的檔案夾。

    您還可以在現有檔案夾右側,單擊建立檔案夾表徵圖,建立子檔案夾。

    作業草稿

    引擎版本

    當前作業使用的Flink的引擎版本。引擎版本號碼含義、版本對應關係和生命週期重要時間點詳情請參見引擎版本介紹

    vvr-6.0.7-flink-1.15

  4. 單擊建立

步驟三:編寫作業代碼並部署

  1. 將以下作業代碼拷貝到作業文本編輯區。

    ---建立一個datagen源表。
    CREATE TEMPORARY TABLE datagen_source (
     id INT,
     score INT
    ) WITH (
     'connector' = 'datagen',
     'fields.id.kind'='sequence',
     'fields.id.start'='1',
     'fields.id.end'='50',
     'fields.score.kind'='random',
     'fields.score.min'='70',
     'fields.score.max'='100'
    );
    
    --建立adbpg維表。
    CREATE TEMPORARY TABLE dim_adbpg(
     id int,
     username varchar,
     PRIMARY KEY(id) not ENFORCED
    ) WITH(
     'connector' = 'adbpg', 
     'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
     'tablename' = 'adbpg_dim_table', 
     'username' = 'flink****test',
     'password' = '*******',
     'maxJoinRows'='100',
     'maxRetryTimes'='1', 
     'cache'='lru',
     'cacheSize'='1000'
    );
    
    --建立adbpg結果表。
    CREATE TEMPORARY TABLE sink_adbpg (
      id int,
      username varchar,
      score int
    ) WITH (
      'connector' = 'adbpg', 
      'url' = 'jdbc:postgresql://gp-2ze****3tysk255b5-master.gpdb.rds.aliyuncs.com:5432/flinktest',
      'tablename' = 'adbpg_sink_table',  
      'username' = 'flink****test',
      'password' = '******',
      'maxRetryTimes' = '2',
      'batchsize' = '5000',
      'conflictMode' = 'ignore',
      'writeMode' = 'insert',
      'retryWaitTime' = '200'
    );
    
    --維表和源表join後的結果插入adbpg結果表。
    INSERT INTO sink_adbpg
    SELECT ts.id,ts.username,ds.score
    FROM datagen_source AS ds
     join
    dim_adbpg FOR SYSTEM_TIME AS OF PROCTIME() AS ts
    on ds.id = ts.id;
  2. 根據實際情況修改以下參數,參數說明如下。

    參數

    是否必填

    說明

    URL

    AnalyticDB PostgreSQL版的JDBC串連地址。格式為jdbc:postgresql://<內網地址>:<連接埠>/<串連的資料庫名稱>,樣本如下jdbc:postgresql://gp-xxxxxx.gpdb.cn-chengdu.rds.aliyuncs.com:3432/postgres

    tablename

    AnalyticDB PostgreSQL版的表名。

    username

    AnalyticDB PostgreSQL版的資料庫帳號。

    password

    AnalyticDB PostgreSQL版的資料庫帳號密碼。

    說明

    更多參數和類型映射,詳情請參見連接器雲原生資料倉儲AnalyticDB PostgreSQL版(ADB PG)

  3. 在作業開發頁面頂部,單擊深度檢查,進行語法檢查。

  4. 單擊部署

  5. 作業營運頁面,單擊啟動

步驟四:查看Flink寫入資料

  1. 登入雲原生資料倉儲AnalyticDB PostgreSQL版控制台
  2. 單擊登入資料庫,串連資料庫的更多方式,請參見用戶端串連

  3. 執行如下查詢語句,查看Flink寫入資料。

    SELECT * FROM adbpg_sink_table;

    image.png

相關文檔