SeaTunnel整合ApsaraDB for SelectDB,支援使用SeaTunnel SelectDB Sink匯入表資料至ApsaraDB for SelectDB。本文將為您介紹使用SeaTunnel SelectDB Sink同步資料至ApsaraDB for SelectDB的使用方式。
概述
SeaTunnel是一款簡單易用、高效能的分布式Data Integration平台,支援海量資料即時同步。您可以通過SeaTunnel平台讀取MySQL、Hive、Kafka等資料來源中的海量資料,然後由SeaTunnel SelectDB Sink將資料寫入到ApsaraDB for SelectDB中。
前提條件
SeaTunnel 2.3.1版本及以上。
使用方式
SeaTunnel支援以JSON格式或CSV格式將上遊資料寫入到ApsaraDB for SelectDB,不同寫入方式的配置文法如下。
JSON格式
sink {
SelectDB {
load-url="ip:http_port"
jdbc-url="ip:mysql_port"
cluster-name="Cluster"
table.identifier="test_db.test_table"
username="admin"
password="****"
selectdb.config {
file.type="json"
}
}
}CSV格式
sink {
SelectDB {
load-url="ip:http_port"
jdbc-url="ip:mysql_port"
cluster-name="Cluster"
table.identifier="test_db.test_table"
username="admin"
password="****"
selectdb.config {
file.type="csv"
file.column_separator=","
file.line_delimiter="\n"
}
}
}參數說明如下。
參數 | 是否必填 | 說明 |
load-url | 是 | ApsaraDB for SelectDB執行個體的訪問地址和HTTP協議連接埠。 您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和HTTP協議連接埠。 樣本: |
jdbc-url | 是 | ApsaraDB for SelectDB執行個體的訪問地址和MySQL協議連接埠。 您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和MySQL協議連接埠。 樣本: |
cluster-name | 是 | ApsaraDB for SelectDB執行個體中的叢集名稱。執行個體中可能包含多個叢集,可按需選擇。 |
username | 是 | ApsaraDB for SelectDB執行個體的使用者名稱。 |
password | 是 | ApsaraDB for SelectDB執行個體對應使用者名稱的密碼。 |
table.identifier | 是 | ApsaraDB for SelectDB執行個體的表名,格式為 |
selectdb.config | 是 | 寫入任務的屬性配置。
|
sink.enable-delete | 否 | 是否開啟大量刪除功能(僅支援Unique表)。 |
sink.buffer-size | 否 | 緩衝的最大容量,單位位元組,預設為:10MB,當緩衝超過最大容量時,會將緩衝中的內容全部flush到Object Storage Service上,不建議修改。 |
sink.buffer-count | 否 | 緩衝的最大條數,預設為:10000,當緩衝超過最大條數時,會將緩衝中的內容全部flush到Object Storage Service上,不建議修改。 |
sink.max-retries | 否 | Commit階段的最大重試次數。預設3次。 |
sink.enable-2pc | 否 | 是否啟用兩階段交易認可,以確保exact-once語義。預設為true。 |
使用樣本
以MySQL資料來源為例,為您介紹如何通過SeaTunnel將上遊的MySQL資料匯入至ApsaraDB for SelectDB。樣本中各軟體版本如下:
環境 | 版本 |
JDK | 1.8 |
SeaTunnel | 2.3.3 |
SelectDB | 3.0.4 |
環境準備
配置SeaTunnel環境。
下載並解壓SeaTunnel安裝包。本樣本中使用SeaTunnel安裝包:apache-seatunnel-2.3.3-bin.tar.gz。
wget https://dlcdn.apache.org/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz tar -xzvf apache-seatunnel-2.3.3-bin.tar.gz修改SEATUNNEL_HOME/config/plugin_config設定檔,保留需要的Connector外掛程式。
--connectors-v2-- connector-cdc-mysql connector-selectdb-cloud connector-jdbc connector-fake connector-console connector-assert --end--安裝SeaTunnel Connector外掛程式。
sh bin/install-plugin.sh下載MySQL驅動並放至SEATUNNEL_HOME/jar目錄。
cd lib/ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
構造需要匯入的資料。本文以MySQL為例,構造少量範例資料來完成匯入。
建立MySQL測試表。
CREATE TABLE `employees` ( `emp_no` INT NOT NULL, `birth_date` DATE NOT NULL, `first_name` VARCHAR(14) NOT NULL, `last_name` VARCHAR(16) NOT NULL, `gender` ENUM('M','F') NOT NULL, `hire_date` DATE NOT NULL, PRIMARY KEY (`emp_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3使用DMS構建測試資料,詳情請參見測試資料構建。
配置ApsaraDB for SelectDB執行個體。
建立ApsaraDB for SelectDB執行個體,詳情請參見建立執行個體。
通過MySQL協議串連ApsaraDB for SelectDB執行個體,詳情請參見串連執行個體。
建立測試資料庫和測試表。
建立測試資料庫。
CREATE DATABASE test_db;建立測試表。
USE test_db; CREATE TABLE employees ( emp_no INT NOT NULL, birth_date DATE, first_name VARCHAR(20), last_name VARCHAR(20), gender CHAR(2), hire_date DATE ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
開通ApsaraDB for SelectDB公網地址,詳情請參見申請和釋放公網地址。
將SeaTunnel環境的公網IP添加到IP白名單中,詳情請參見設定白名單。
通過SuaTunnel本地引擎同步MySQL資料到SelectDB
建立設定檔
mysqlToSelectDB.conf,配置任務資訊。env { execution.parallelism = 2 job.mode = "BATCH" checkpoint.interval = 10000 } source{ jdbc { url = "jdbc:mysql://host:ip/test_db" driver = "com.mysql.cj.jdbc.Driver" user = "admin" password = "****" query = "select * from employees" } } sink { SelectDBCloud { load-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:8080" jdbc-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:9030" cluster-name="new_cluster" table.identifier="test_db.employees" username="admin" password="****" selectdb.config { file.type="json" } } }命令列提交任務。
sh ./bin/seatunnel.sh --config ./mysqlToSelectDB.conf -e local