本文介紹使用DataX Doris Writer同步資料至ApsaraDB for SelectDB。
概述
DataX是阿里巴巴開源的一個異構資料來源離線同步工具,致力於實現包括關係型資料庫(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各種異構資料來源之間穩定高效的資料同步功能。您可以通過DataX服務讀取上遊資料,然後由DataX Doris Writer將資料寫入到ApsaraDB for SelectDB。
前提條件
已安裝Maven環境。
已安裝Python3.6及以上版本。
使用樣本
如下以MySQL資料來源為例,介紹在Linux環境下如何通過DataX將MySQL資料匯入至ApsaraDB for SelectDB。
步驟一:配置DataX環境
下載DataX程式包代碼,外掛程式代碼下載請訪問Doris社區。
運行DataX程式包中的init-env.sh指令碼,構建DataX開發環境。
sh init-env.sh編譯mysqlreader和doriswriter。
編譯整個DataX專案。
cd DataX/ mvn package assembly:assembly -Dmaven.test.skip=true編譯產出結果在target/datax/datax/.目錄下。
說明hdfsreader,hdfswriter,ossreader和osswriter這四個外掛程式需要額外的jar包,如果不需要這些外掛程式,可以在DataX/pom.xml中刪除這些外掛程式的模組。
單獨編譯mysqlreader和selectdbwriter外掛程式。
mvn clean install -pl plugin-rdbms-util,mysqlreader,doriswriter -DskipTests
步驟二:構造需要匯入的資料
建立MySQL測試表。
CREATE TABLE `employees` ( `emp_no` int NOT NULL, `birth_date` date NOT NULL, `first_name` varchar(14) NOT NULL, `last_name` varchar(16) NOT NULL, `gender` enum('M','F') NOT NULL, `hire_date` date NOT NULL, PRIMARY KEY (`emp_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3使用DMS構建測試資料,詳情請參見測試資料構建。
步驟三:配置ApsaraDB for SelectDB執行個體。
通過MySQL協議串連ApsaraDB for SelectDB執行個體,詳情請參見串連執行個體。
建立測試資料庫和測試表。
建立測試資料庫。
CREATE DATABASE test_db;建立測試表。
USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
步驟四:通過DataX服務同步MySQL資料到SelectDB
開通ApsaraDB for SelectDB公網地址,詳情請參見申請和釋放公網地址。
將DataX主機的公網IP添加到IP白名單中,詳情請參見設定白名單。
建立設定檔
mysqlToSelectDB.json,配置任務資訊。{ "job":{ "content":[ { "reader":{ "name": "mysqlreader", "parameter": { "column": [ "emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date" ], "where": "emp_no>0", "connection": [ { "jdbcUrl": [ "jdbc:mysql://host:port/test_db?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8" ], "table": [ "employees" ] } ], "password": "123456", "splitPk": "emp_no", "username": "admin" } }, "writer":{ "name":"doriswriter", "parameter":{ "loadUrl":[ "selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:8080" ], "loadProps":{ "format":"json", "strip_outer_array":"true" }, "column":[ "emp_no", "birth_date", "first_name", "last_name", "gender", "hire_date" ], "username":"admin", "password":"123456", "postSql":[ ], "preSql":[ ], "connection":[ { "jdbcUrl":"jdbc:mysql://selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:9030/test_db", "table":[ "employees" ], "selectedDatabase":"test_db" } ], "maxBatchRows":1000000, "batchSize":536870912000 } } } ], "setting":{ "errorLimit":{ "percentage":0.02, "record":0 }, "speed":{ "channel":5 } } } }參數說明
參數
是否必填
預設值
描述
jdbcUrl
是
無
JDBC串連URL
jdbc:mysql://<ip>:<port>。您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取IP地址和MySQL協議連接埠。
IP地址:VPC地址或公網地址。
連接埠:MySQL協議連接埠。
樣本:
jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030說明如果DataX主機和SelectDB在同一VPC下,即可使用VPC地址。如果不在同一VPC下請使用公網地址。
loadUrl
是
無
SelectDB的HTTP協議訪問地址
<ip>:<port>。您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取IP地址和HTTP協議連接埠。
IP地址:VPC地址或公網地址。
連接埠:HTTP協議連接埠。
樣本:
selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080說明如果DataX主機和SelectDB在同一VPC下,即可使用VPC地址。如果不在同一VPC下請使用公網地址。
username
是
無
ApsaraDB for SelectDB執行個體的使用者名稱。
password
是
無
ApsaraDB for SelectDB執行個體對應使用者的密碼。
connection.selectedDatabase
是
無
需要寫入的ApsaraDB for SelectDB資料庫名稱。
connection.table
是
無
需要寫入的ApsaraDB for SelectDB表名稱。
column
是
無
目的表需要寫入資料的欄位,這些欄位將作為產生的JSON資料的欄位名。欄位之間用英文逗號分隔。樣本:
"column": ["id","name","age"]。preSql
否
無
寫入資料到目的表前,會先執行這裡的標準語句。
postSql
否
無
寫入資料到目的表後,會執行這裡的標準語句。
maxBatchRows
否
500000
每批次匯入資料的最大行數。和batchSize共同控制每批次的匯入數量。每批次資料達到兩個閾值之一,即開始匯入這一批次的資料。
batchSize
否
104857600
每批次匯入資料的最巨量資料量。和maxBatchRows共同控制每批次的匯入數量。每批次資料達到兩個閾值之一,即開始匯入這一批次的資料。預設值為100 M。
maxRetries
否
3
每批次匯入資料失敗後的重試次數。
labelPrefix
否
datax_doris_writer_
每批次上傳檔案的label首碼。最終的label將由 'labelPrefix + UUID'組成全域唯一的label,確保資料不會重複匯入。
loadProps
否
無
與Stream Load的請求參數相同。詳情請參見Stream Load參數說明。配置匯入資料格式使用參數format,匯入資料格式預設使用CSV,支援JSON,詳情請參考類型轉換。
flushInterval
否
30000
資料寫入批次的時間間隔。預設為30000ms
命令列提交任務。
cd target/datax/datax/bin python datax.py ../mysqlToSelectDB.json
類型轉換
預設傳入的資料均會被轉為字串,並以\t作為資料行分隔符號,\n作為行分隔字元,組成csv檔案進行SelectDB匯入操作。 預設是csv格式匯入,如需更改資料行分隔符號, 則正確配置loadProps即可,樣本如下。
"loadProps": {
"format": "csv",
"column_separator": "\\x01",
"line_delimiter": "\\x02"
}如需更改匯入格式為JSON,則正確配置loadProps下的format即可,樣本如下。
"loadProps": {
"format": "json",
"strip_outer_array": true
}