DataX Writer外掛程式實現了寫入資料到StarRocks目的表的功能。在底層實現上,DataX Writer通過Stream Load以CSV或JSON格式匯入資料至StarRocks。內部將Reader讀取的資料進行緩衝後大量匯入至StarRocks,以提高寫入效能。阿里雲DataWorks已經整合了DataX匯入的能力,可以同步MaxCompute資料到EMR StarRocks。本文為您介紹DataX Writer原理,以及如何使用DataWorks進行離線同步任務。
背景資訊
DataX Writer總體的資料流為Source -> Reader -> DataX channel -> Writer -> StarRocks。
功能說明
環境準備
測試時可以使用命令python datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug job.json。
配置範例
從MySQL讀取資料後匯入至StarRocks。
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "xxxx",
"password": "xxxx",
"column": [ "k1", "k2", "v1", "v2" ],
"connection": [
{
"table": [ "table1", "table2" ],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/datax_test1"
]
},
{
"table": [ "table3", "table4" ],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/datax_test2"
]
}
]
}
},
"writer": {
"name": "starrockswriter",
"parameter": {
"username": "xxxx",
"password": "xxxx",
"database": "xxxx",
"table": "xxxx",
"column": ["k1", "k2", "v1", "v2"],
"preSql": [],
"postSql": [],
"jdbcUrl": "jdbc:mysql://172.28.**.**:9030/",
"loadUrl": ["172.28.**.**:8030", "172.28.**.**:8030"],
"loadProps": {}
}
}
}
]
}
}相關參數描述如下表所示。
參數 | 描述 | 是否必選 | 預設值 |
username | StarRocks資料庫的使用者名稱。 | 是 | 無 |
password | StarRocks資料庫的密碼。 | 是 | 無 |
database | StarRocks資料庫的名稱。 | 是 | 無 |
table | StarRocks表的名稱。 | 是 | 無 |
loadUrl | StarRocks FE的地址,用於Stream Load,可以為多個FE地址,格式為 | 是 | 無 |
column | 目的表需要寫入資料的欄位,欄位之間用英文逗號(,)分隔。例如, 重要 該參數必須指定。如果希望匯入所有欄位,可以使用 | 是 | 無 |
preSql | 寫入資料到目的表前,會先執行設定的標準語句。 | 否 | 無 |
postSql | 寫入資料到目的表後,會先執行設定的標準語句。 | 否 | 無 |
jdbcUrl | 目的資料庫的JDBC串連資訊,用於執行preSql和postSql。 | 否 | 無 |
maxBatchRows | 單次Stream Load匯入的最大行數。 | 否 | 500000(50W) |
maxBatchSize | 單次Stream Load匯入的最大位元組數。 | 否 | 104857600 (100M) |
flushInterval | 上一次Stream Load結束至下一次開始的時間間隔。單位為ms。 | 否 | 300000(ms) |
loadProps | Stream Load的請求參數,詳情請參見Stream Load。 | 否 | 無 |
類型轉換
預設傳入的資料均會被轉為字串,並以\t作為資料行分隔符號,\n作為行分隔字元,組成CSV檔案進行Stream Load匯入操作。類型轉換樣本如下:
更改資料行分隔符號,則loadProps配置如下。
"loadProps": { "column_separator": "\\x01", "row_delimiter": "\\x02" }更改匯入格式為JSON,則loadProps配置如下。
"loadProps": { "format": "json", "strip_outer_array": true }
DataWorks離線同步使用方式
在DataWorks上建立工作空間,詳情請參見建立工作空間。
在DataWorks上建立測試表並上傳資料到MaxCompute資料來源,詳情請參見建表並上傳資料。
建立StarRocks資料來源。
在DataWorks的工作空間列表頁面,單擊目標工作空間操作列的Data Integration。
在左側導覽列,單擊資料來源。
單擊右上方的新增資料來源。
在新增資料來源對話方塊中,新增StarRocks類型的資料來源。
建立離線同步任務流程。
建立商務程序,詳情請參見建立商務程序。
在目錄商務程序,建立離線同步任務,詳情請參見通過嚮導模式配置離線同步任務。
在StarRocks叢集中查看資料,詳情請參見快速入門。