全部產品
Search
文件中心

ApsaraDB for SelectDB:遷移PostgreSQL資料

更新時間:Jun 19, 2025

通過DTS、阿里雲DataWorks、Flink CDCCatalog均可將PostgreSQL(如自建PostgreSQL、RDS PostgreSQL和PolarDB PostgreSQL等)的資料移轉至ApsaraDB for SelectDB。您可依據遷移的資料量和業務情境,選擇合適的方式完成資料移轉。

遷移方式功能對比

DTS、DataWorks、Flink CDC、Catalog均可將PostgreSQL的資料移轉至SelectDB,但不同的方式支援遷移的資料有所不同,您可根據不同的業務情境,選擇合適的遷移方式。

遷移方式

歷史資料移轉

增量資料同步

表結構遷移

整庫遷移

增量同步處理DDL

資料校正

DTS

✔️

✔️

✔️

✔️

✔️

✔️

DataWorks

✔️

✔️

✔️

✔️

✔️

Flink CDC

✔️

✔️

✔️

✔️

✔️

Catalog

✔️

前提條件

  • 確保PostgreSQL執行個體和SelectDB執行個體的網路處於互連狀態。

操作步驟

通過DTS遷移

DTS支援遷移PostgreSQL歷史資料、同步增量資料到SelectDB,且支援庫表遷移、DDL同步、資料校正等能力。

通過DataWorks遷移

步驟一:新增資料來源

在進行資料同步任務開發時,您需要在DataWorks上分別建立PostgreSQL和SelectDB資料來源。

  1. 建立PostgreSQL資料來源

  2. 建立SelectDB資料來源,詳情請參見建立並管理資料來源SelectDB資料來源的部分配置參數如下所示:

    參數

    說明

    資料來源名稱

    資料來源的名稱。

    MySQL串連地址

    JDBC串連串jdbc:mysql://<ip>:<port>/<dbname>

    您可在SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和MySQL協議連接埠

    樣本:jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030/test_db

    HTTP串連地址

    HTTP協議訪問地址<ip>:<port>

    您在SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和HTTP協議連接埠

    樣本:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080

    使用者名稱

    SelectDB執行個體的使用者名稱。

    密碼

    SelectDB執行個體對應使用者的密碼。

步驟二:配置資料同步任務

請參見如下文檔,配置資料移轉任務:

通過Flink CDC遷移

Flink提供Flink CDC方式從PostgreSQL遷移資料到SelectDB。Flink CDC支援歷史資料移轉、增量資料同步,且具備完備的庫表遷移、DDL同步等能力。

準備環境

搭建Flink環境,本樣本以Flink 1.16單機環境為例。

  1. 下載flink-1.16.3-bin-scala_2.12.tgz,進行解壓,樣本如下。如果此版本已到期,請可以下載其他版本。更多版本,請參見Apache Flink

    wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
    tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
  2. 進入FLINK_HOME/lib目錄中下載flink-sql-connector-mysql-cdc-2.4.2flink-doris-connector-1.16-1.5.2,樣本如下。

    說明

    整庫同步支援Flink 1.15以上的版本,各個版本Flink Doris Connector的下載請參見Flink Doris Connector

    cd flink-1.16.3
    cd lib/
    wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
    wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar
  3. 啟動Flink Standalone叢集,樣本如下。

    bin/start-cluster.sh
  4. 建立SelectDB執行個體。具體操作,請參見建立執行個體

  5. 通過MySQL協議串連SelectDB執行個體。具體操作,請參見串連執行個體

  6. 建立測試資料庫和測試表。

    1. 建立測試資料庫。

      CREATE DATABASE test_db;
    2. 建立測試表。

      USE test_db;
      CREATE TABLE employees (
          emp_no       int NOT NULL,
          birth_date   date,
          first_name   varchar(20),
          last_name    varchar(20),
          gender       char(2),
          hire_date    date
      )
      UNIQUE KEY(`emp_no`)
      DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

提交Flink CDC任務

提交Flink CDC任務的文法如下:

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    postgres-sync-database \
    --database db1\
    --postgres-conf hostname=127.0.0.1 \
    --postgres-conf port=5432 \
    --postgres-conf username=postgres \
    --postgres-conf password="123456" \
    --postgres-conf database-name=postgres \
    --postgres-conf schema-name=public \
    --postgres-conf slot.name=test \
    --postgres-conf decoding.plugin.name=pgoutput \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

參數說明

參數

是否必填

說明

execution.checkpointing.interval

Flink checkpoint的時間間隔,影響資料同步的頻率,推薦10s。

parallelism.default

設定Flink任務的並行度,適當增加並行度可提高資料同步速度。

database

同步到SelectDB的資料庫名。

including-tables

需要同步的PostgreSQL表。

可以使用"|"分隔多個表,並支援Regex。

例如--including-tables table1|tbl.*,指同步table1和所有以tbl開頭的表。

excluding-tables

不需要同步的表,配置方法與including-tables相同。

postgres-conf

Postgres CDC Source配置。配置詳情請參見Postgres CDC Connector,其中hostnameusernamepassworddatabase-nametable-nameschema-name是必選項。

sink-conf

Doris Sink的所有配置。更多詳情,請參見通過Flink匯入資料

table-conf

SelectDB表的配置項,即建立SelectDB表時properties中的內容。

通過Catalog遷移

SelectDB提供的Catalog能力,支援通過聯邦查詢方式訪問PostgreSQL,可簡單快速完成PostgreSQL歷史資料移轉到SelectDB。

  1. 串連SelectDB執行個體。具體操作,請參見串連執行個體

    說明

    使用DMS登入時,SWITCH指令失效。推薦使用MySQL用戶端串連。

  2. 建立PostgreSQL JDBC Catalog。

    CREATE CATALOG jdbc_postgresql PROPERTIES (
        "type"="jdbc",
        "user"="root",
        "password"="123456",
        "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/demo",
        "driver_url" = "postgresql-42.5.1.jar",
        "driver_class" = "org.postgresql.Driver",
        "checksum" = "20c8228267b6c9ce620fddb39467d3eb"
    );

    參數說明

    參數

    是否必選

    說明

    user

    對應資料庫的帳號。

    password

    對應資料庫的密碼。

    jdbc_url

    JDBC串連串。

    driver_url

    JDBC Driver Jar包名稱。

    driver_class

    JDBC Driver Class名稱。

    lower_case_table_names

    指定是否以小寫形式同步JDBC外部資料源的庫名和表名。

    預設值:"false"

    only_specified_database

    指定是否只同步指定的Database。

    預設值:"false"

    include_database_list

    only_specified_database=true時,指定同步多個Database,以英文逗號(,)分隔。DB名稱大小寫敏感。

    預設值:""

    exclude_database_list

    only_specified_database=true時,指定不需要同步的多個Database,以英文逗號(,)分隔。DB名稱大小寫敏感。

    預設值:""

    更多詳情,請參見JDBC資料來源

  3. 在SelectDB中進行建表後,然後通過庫內ETL文法insert into select完成資料同步。更多insert into詳情,請參見Insert Into

    # 建表
    CREATE TABLE selectdb_table ...
    
    # 遷移資料
    INSERT INTO selectdb_table SELECT * FROM jdbc_postgresql.pg_database.pg_table;