通過DTS、阿里雲DataWorks、Flink CDC和Catalog均可將PostgreSQL(如自建PostgreSQL、RDS PostgreSQL和PolarDB PostgreSQL等)的資料移轉至ApsaraDB for SelectDB。您可依據遷移的資料量和業務情境,選擇合適的方式完成資料移轉。
遷移方式功能對比
DTS、DataWorks、Flink CDC、Catalog均可將PostgreSQL的資料移轉至SelectDB,但不同的方式支援遷移的資料有所不同,您可根據不同的業務情境,選擇合適的遷移方式。
遷移方式 | 歷史資料移轉 | 增量資料同步 | 表結構遷移 | 整庫遷移 | 增量同步處理DDL | 資料校正 |
✔️ | ✔️ | ✔️ | ✔️ | ✔️ | ✔️ | |
✔️ | ✔️ | ✔️ | ✔️ | ✔️ | ❌ | |
✔️ | ✔️ | ✔️ | ✔️ | ✔️ | ❌ | |
✔️ | ❌ | ❌ | ❌ | ❌ | ❌ |
前提條件
確保PostgreSQL執行個體和SelectDB執行個體的網路處於互連狀態。
PostgreSQL執行個體和SelectDB執行個體處於同一VPC下。如果不在同一VPC下,請先解決網路互連問題。如何操作,請參見如何解決SelectDB執行個體與資料來源網路互連問題?
已將PostgreSQL執行個體IP添加至SelectDB的白名單。具體操作,請參見設定白名單。
若PostgreSQL執行個體存在白名單機制,已將SelectDB執行個體所在網段IP添加至PostgreSQL執行個體的白名單中。
擷取SelectDB執行個體VPC地址的IP,請參見如何查看雲資料庫 SelectDB 版執行個體所屬VPC的IP網段?
擷取SelectDB執行個體公網的IP地址,通過
ping命令訪問SelectDB執行個體的公網地址,擷取其對應的 IP 位址。
操作步驟
通過DTS遷移
DTS支援遷移PostgreSQL歷史資料、同步增量資料到SelectDB,且支援庫表遷移、DDL同步、資料校正等能力。
通過DataWorks遷移
步驟一:新增資料來源
在進行資料同步任務開發時,您需要在DataWorks上分別建立PostgreSQL和SelectDB資料來源。
建立SelectDB資料來源,詳情請參見建立並管理資料來源。SelectDB資料來源的部分配置參數如下所示:
參數
說明
資料來源名稱
資料來源的名稱。
MySQL串連地址
JDBC串連串
jdbc:mysql://<ip>:<port>/<dbname>。您可在SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和MySQL協議連接埠。
樣本:
jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030/test_dbHTTP串連地址
HTTP協議訪問地址
<ip>:<port>。您在SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和HTTP協議連接埠。
樣本:
selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080使用者名稱
SelectDB執行個體的使用者名稱。
密碼
SelectDB執行個體對應使用者的密碼。
步驟二:配置資料同步任務
請參見如下文檔,配置資料移轉任務:
通過Flink CDC遷移
Flink提供Flink CDC方式從PostgreSQL遷移資料到SelectDB。Flink CDC支援歷史資料移轉、增量資料同步,且具備完備的庫表遷移、DDL同步等能力。
準備環境
搭建Flink環境,本樣本以Flink 1.16單機環境為例。
下載flink-1.16.3-bin-scala_2.12.tgz,進行解壓,樣本如下。如果此版本已到期,請可以下載其他版本。更多版本,請參見Apache Flink。
wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz tar -zxvf flink-1.16.3-bin-scala_2.12.tgz進入FLINK_HOME/lib目錄中下載flink-sql-connector-mysql-cdc-2.4.2和flink-doris-connector-1.16-1.5.2,樣本如下。
說明整庫同步支援Flink 1.15以上的版本,各個版本Flink Doris Connector的下載請參見Flink Doris Connector。
cd flink-1.16.3 cd lib/ wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar啟動Flink Standalone叢集,樣本如下。
bin/start-cluster.sh建立SelectDB執行個體。具體操作,請參見建立執行個體。
通過MySQL協議串連SelectDB執行個體。具體操作,請參見串連執行個體。
建立測試資料庫和測試表。
建立測試資料庫。
CREATE DATABASE test_db;建立測試表。
USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
提交Flink CDC任務
提交Flink CDC任務的文法如下:
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
postgres-sync-database \
--database db1\
--postgres-conf hostname=127.0.0.1 \
--postgres-conf port=5432 \
--postgres-conf username=postgres \
--postgres-conf password="123456" \
--postgres-conf database-name=postgres \
--postgres-conf schema-name=public \
--postgres-conf slot.name=test \
--postgres-conf decoding.plugin.name=pgoutput \
--including-tables "tbl1|test.*" \
--sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
--sink-conf username=admin \
--sink-conf password=****參數說明
參數 | 是否必填 | 說明 |
execution.checkpointing.interval | 是 | Flink checkpoint的時間間隔,影響資料同步的頻率,推薦10s。 |
parallelism.default | 否 | 設定Flink任務的並行度,適當增加並行度可提高資料同步速度。 |
database | 是 | 同步到SelectDB的資料庫名。 |
including-tables | 否 | 需要同步的PostgreSQL表。 可以使用"|"分隔多個表,並支援Regex。 例如 |
excluding-tables | 否 | 不需要同步的表,配置方法與including-tables相同。 |
postgres-conf | 是 | Postgres CDC Source配置。配置詳情請參見Postgres CDC Connector,其中 |
sink-conf | 是 | Doris Sink的所有配置。更多詳情,請參見通過Flink匯入資料。 |
table-conf | 否 | SelectDB表的配置項,即建立SelectDB表時properties中的內容。 |
通過Catalog遷移
SelectDB提供的Catalog能力,支援通過聯邦查詢方式訪問PostgreSQL,可簡單快速完成PostgreSQL歷史資料移轉到SelectDB。
串連SelectDB執行個體。具體操作,請參見串連執行個體。
說明使用DMS登入時,
SWITCH指令失效。推薦使用MySQL用戶端串連。建立PostgreSQL JDBC Catalog。
CREATE CATALOG jdbc_postgresql PROPERTIES ( "type"="jdbc", "user"="root", "password"="123456", "jdbc_url" = "jdbc:postgresql://127.0.0.1:5432/demo", "driver_url" = "postgresql-42.5.1.jar", "driver_class" = "org.postgresql.Driver", "checksum" = "20c8228267b6c9ce620fddb39467d3eb" );參數說明
參數
是否必選
說明
user
是
對應資料庫的帳號。
password
是
對應資料庫的密碼。
jdbc_url
是
JDBC串連串。
driver_url
是
JDBC Driver Jar包名稱。
driver_class
是
JDBC Driver Class名稱。
lower_case_table_names
否
指定是否以小寫形式同步JDBC外部資料源的庫名和表名。
預設值:
"false"only_specified_database
否
指定是否只同步指定的Database。
預設值:
"false"include_database_list
否
當
only_specified_database=true時,指定同步多個Database,以英文逗號(,)分隔。DB名稱大小寫敏感。預設值:
""exclude_database_list
否
當
only_specified_database=true時,指定不需要同步的多個Database,以英文逗號(,)分隔。DB名稱大小寫敏感。預設值:
""更多詳情,請參見JDBC資料來源。
在SelectDB中進行建表後,然後通過庫內ETL文法
insert into select完成資料同步。更多insert into詳情,請參見Insert Into。# 建表 CREATE TABLE selectdb_table ... # 遷移資料 INSERT INTO selectdb_table SELECT * FROM jdbc_postgresql.pg_database.pg_table;