DataWorksData Integration支援讀寫Doris。本文為您介紹DataWorks的Doris資料同步能力支援情況。
支援的Doris版本
Doris Writer使用的驅動版本是MySQL Driver 5.1.47,該驅動支援的核心版本如下。驅動能力詳情請參見Doris官網文檔。
Doris 版本 | 是否支援 |
0.x.x | 支援 |
1.1.x | 支援 |
1.2.x | 支援 |
2.x | 支援 |
使用限制
Data Integration僅支援離線寫入Doris。
支援的欄位類型
不同Doris版本支援不同的資料類型和彙總模型。各版本Doris的全量欄位類型請參見Doris的官方文檔,下面為您介紹Doris當前主要欄位的支援情況。
類型 | 支援模型 | Doris版本 | 離線寫入(Doris Writer) |
SMALLINT | Aggregate,Unique,Duplicate | 0.x.x、1.1.x、1.2.x、2.x | 支援 |
INT | Aggregate,Unique,Duplicate | 0.x.x、1.1.x、1.2.x、2.x | 支援 |
BIGINT | Aggregate,Unique,Duplicate | 0.x.x、1.1.x、1.2.x、2.x | 支援 |
LARGEINT | Aggregate,Unique,Duplicate | 0.x.x、1.1.x、1.2.x、2.x | 支援 |
FLOAT | Aggregate,Unique,Duplicate | 0.x.x、1.1.x、1.2.x、2.x | 支援 |
DOUBLE | Aggregate,Unique,Duplicate | 0.x.x、1.1.x、1.2.x、2.x | 支援 |
DECIMAL | Aggregate,Unique,Duplicate | 0.x.x、1.1.x、1.2.x、2.x | 支援 |
DECIMALV3 | Aggregate,Unique,Duplicate | 1.2.1+、2.x | 支援 |
DATE | Aggregate,Unique,Duplicate | 0.x.x、1.1.x、1.2.x、2.x | 支援 |
DATETIME | Aggregate,Unique,Duplicate | 0.x.x、1.1.x、1.2.x、2.x | 支援 |
DATEV2 | Aggregate,Unique,Duplicate | 1.2.x、2.x | 支援 |
DATATIMEV2 | Aggregate,Unique,Duplicate | 1.2.x、2.x | 支援 |
CHAR | Aggregate,Unique,Duplicate | 0.x.x、1.1.x、1.2.x、2.x | 支援 |
VARCHAR | Aggregate,Unique,Duplicate | 0.x.x、1.1.x、1.2.x、2.x | 支援 |
STRING | Aggregate,Unique,Duplicate | 0.x.x、1.1.x、1.2.x、2.x | 支援 |
VARCHAR | Aggregate,Unique,Duplicate | 1.1.x、1.2.x、2.x | 支援 |
ARRAY | Duplicate | 1.2.x、2.x | 支援 |
JSONB | Aggregate,Unique,Duplicate | 1.2.x、2.x | 支援 |
HLL | Aggregate | 0.x.x、1.1.x、1.2.x、2.x | 支援 |
BITMAP | Aggregate | 0.x.x、1.1.x、1.2.x、2.x | 支援 |
QUANTILE_STATE | Aggregate | 1.2.x、2.x | 支援 |
實現原理
Doris Writer通過Doris原生支援的StreamLoad方式匯入資料,Doris Writer會將Reader端讀取到的資料緩衝在記憶體中,並拼接成文本,然後大量匯入至Doris資料庫。更多詳情請參見Doris官方文檔。
資料同步前準備
在DataWorks上進行資料同步前,您需要參考本文提前在Doris側進行資料同步環境準備,以便在DataWorks上進行Doris資料同步任務配置與執行時服務正常。以下為您介紹Doris同步前的相關環境準備。
確認Doris的版本
Data Integration對Doris版本有要求,您可以參考上文支援的Doris版本章節,查看當前待同步的Doris是否符合版本要求。您可以在Doris的官方網站下載對應的版本,並且安裝。
建立帳號,並配置帳號許可權
您需要規劃一個資料倉儲的登入帳號用於後續操作,同時,您需要為該帳號設定密碼,以便後續串連到資料倉儲。如果您要使用Doris預設的root使用者進行登入,那麼您需要為root使用者佈建密碼。root使用者預設沒有密碼,您可以在Doris上執行SQL來設定密碼:
SET PASSWORD FOR 'root' = PASSWORD('密碼')配置Doris的網路連接
使用StreamLoad匯入資料,需要訪問FE節點的私網地址。如果訪問FE的公網地址,則會被重新導向到BE節點的內網IP(資料操作問題)。因此,您需要將Doris的網路與Data Integration使用的Serverless資源群組或獨享Data Integration資源群組打通,使之通過內網地址進行訪問。網路打通的具體操作可以參考網路連通方案。
建立資料來源
在進行資料同步任務開發時,您需要在DataWorks上建立一個對應的資料來源,操作流程請參見資料來源管理,詳細的配置參數解釋可在配置介面查看對應參數的文案提示。
下面對Doris資料來源的幾個配置項進行說明:
JdbcUrl:請填寫JDBC串連串,包含IP、連接埠號碼、資料庫和串連參數。支援公網IP和私網IP,如果使用公網IP,請確保Data Integration資源群組能夠正常訪問Doris所在的主機。
FE endpoint:請填寫FE節點的IP和連接埠。如果您的叢集中有多個FE節點,可以配置多個FE節點的IP和連接埠,每個IP和連接埠以逗號分隔,例如
ip1:port1,ip2:port2。在測試連通性時,會對所有的FE endpoint做連通性測試。使用者名稱:請填寫Doris資料庫的使用者名稱。
密碼:請填寫Doris資料庫對應使用者的密碼。
資料同步任務開發
資料同步任務的配置入口和通用配置流程可參見下文的配置指導。
指令碼模式配置的全量參數和指令碼Demo請參見下文的附錄:指令碼Demo與參數說明。
附錄:指令碼Demo與參數說明
離線任務指令碼配置方式
如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見指令碼模式配置,以下為您介紹指令碼模式下資料來源的參數配置詳情。
Reader指令碼Demo
{
"type": "job",
"version": "2.0",//版本號碼。
"steps": [
{
"stepType": "doris",//外掛程式名。
"parameter": {
"column": [//列名。
"id"
],
"connection": [
{
"querySql": [
"select a,b from join1 c join join2 d on c.id = d.id;"
],
"datasource": ""//資料來源名稱。
}
],
"where": "",//過濾條件。
"splitPk": "",//切分鍵。
"encoding": "UTF-8"//編碼格式。
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"//錯誤記錄數。
},
"speed": {
"throttle": true,//當throttle值為false時,mbps參數不生效,表示不限流;當throttle值為true時,表示限流。
"concurrent": 1,//作業並發數。
"mbps": "12"//限流,此處1mbps = 1MB/s。
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}Reader指令碼參數
指令碼參數名 | 描述 | 是否必選 | 預設值 |
datasource | 資料來源名稱,指令碼模式支援添加資料來源,此配置項填寫的內容必須與添加的資料來源名稱保持一致。 | 是 | 無 |
table | 選取的需要同步的表名稱。一個Data Integration任務只能從一張表中讀取資料。 table用於配置範圍的進階用法樣本如下:
說明 任務會讀取匹配到的所有表,具體讀取這些表中column配置項指定的列。如果表不存在,或者讀取的列不存在,會導致任務失敗。 | 是 | 無 |
column | 所配置的表中需要同步的列名集合,使用JSON的數組描述欄位資訊 。預設使用所有列配置,例如
| 是 | 無 |
splitFactor | 切分因子,可以配置同步資料的切分份數,如果配置了多並發,會按照並發數 * splitFactor份來切分。例如,並發數=5,splitFactor=5,則會按照5*5=25份來切分,在5個並發線程上執行。 說明 建議取值範圍:1~100,過大會導致記憶體溢出。 | 否 | 5 |
splitPk | Doris Reader進行資料幫浦時,如果指定splitPk,表示您希望使用splitPk代表的欄位進行資料分區,資料同步因此會啟動並發任務進行資料同步,提高資料同步的效率。
| 否 | 無 |
where | 篩選條件,在實際業務情境中,往往會選擇當天的資料進行同步,將where條件指定為
| 否 | 無 |
querySql(進階模式,嚮導模式不支援此參數的配置) | 在部分業務情境中,where配置項不足以描述所篩選的條件,您可以通過該配置項來自訂篩選SQL。配置該項後,資料同步系統會忽略tables、columns和splitPk配置項,直接使用該項配置的內容對資料進行篩選。例如,需要進行多表join後同步資料,使用 說明 querySql需要區分大小寫,例如,寫為querysql會不生效。 | 否 | 無 |
Writer指令碼Demo
{
"stepType": "doris",//外掛程式名。
"parameter":
{
"postSql"://執行資料同步任務之後率先執行的SQL語句。
[],
"preSql":
[],//執行資料同步任務之前率先執行的SQL語句。
"datasource":"doris_datasource",//資料來源名。
"table": "doris_table_name",//表名。
"column":
[
"id",
"table_id",
"table_no",
"table_name",
"table_status"
],
"loadProps":{
"column_separator": "\\x01",//指定CSV格式的資料行分隔符號
"line_delimiter": "\\x02"//指定CSV格式的行分隔字元
}
},
"name": "Writer",
"category": "writer"
}Writer指令碼參數
參數 | 描述 | 是否必選 | 預設值 |
datasource | 資料來源名稱,指令碼模式支援添加資料來源,此配置項填寫的內容必須與添加的資料來源名稱保持一致。 | 是 | 無 |
table | 選取的需要同步的表名稱。 | 是 | 無 |
column | 目標表需要寫入資料的欄位,欄位之間用英文逗號分隔。例如 | 是 | 無 |
preSql | 執行資料同步任務之前,需率先執行的SQL語句。目前嚮導模式僅允許執行一條SQL語句,指令碼模式可以支援多條SQL語句,例如,執行前清空表中的舊資料。 | 否 | 無 |
postSql | 執行資料同步任務之後執行的SQL語句。目前嚮導模式僅允許執行一條SQL語句,指令碼模式可以支援多條SQL語句,例如,加上某個時間戳記。 | 否 | 無 |
maxBatchRows | 每批次匯入資料的最大行數,與batchSize共同控制每批次的匯入數量。每批次資料達到兩個閾值之一,即開始匯入這一批次的資料。 | 否 | 500000 |
batchSize | 每批次匯入資料的最巨量資料量,與maxBatchRows共同控制每批次的匯入數量。每批次資料達到兩個閾值之一,即開始匯入這一批次的資料。 | 否 | 104857600 |
maxRetries | 每批次匯入資料失敗後的重試次數。 | 否 | 3 |
labelPrefix | 每批次上傳檔案的 label 首碼。最終的label將有 | 否 | datax_doris_writer_ |
loadProps | StreamLoad的請求參數,主要用於配置匯入的資料格式。預設以CSV格式匯入。如果loadProps沒有配置,則採用預設的CSV格式,以 如果您需要指定為JSON格式匯入,則配置如下所示。 | 否 | 無 |
彙總類型指令碼(Doris Writer寫入彙總類型)
Doris Writer支援寫入彙總類型,但是需要在指令碼模式中做額外的配置。如下所示。
例如,對於如下一張Doris的表,其中,uuid是bitmap類型(彙總類型)、sex是HLL類型(彙總類型)。
CREATE TABLE `example_table_1` (
`user_id` int(11) NULL,
`date` varchar(10) NULL DEFAULT "10.5",
`city` varchar(10) NULL,
`uuid` bitmap BITMAP_UNION NULL, -- 彙總類型
`sex` HLL HLL_UNION -- 彙總類型
) ENGINE=OLAP AGGREGATE KEY(`user_id`, `date`,`city`)
COMMENT 'OLAP' DISTRIBUTED BY HASH(`user_id`) BUCKETS 32往表中插入未經處理資料:
user_id,date,city,uuid,sex
0,T0S4Pb,abc,43,'54'
1,T0S4Pd,fsd,34,'54'
2,T0S4Pb,fa3,53,'64'
4,T0S4Pb,fwe,87,'64'
5,T0S4Pb,gbr,90,'56'
2,iY3GiHkLF,234,100,'54'通過Doris Writer寫入彙總類型時,不僅需要在writer.parameter.column中指定該列,還需要在writer.parameter.loadProps.columns中配置彙總函式。例如,為uuid使用彙總函式bitmap_hash,為sex使用彙總函式hll_hash。
指令碼模式樣本:
{
"stepType": "doris",//外掛程式名。
"writer":
{
"parameter":
{
"column":
[
"user_id",
"date",
"city",
"uuid",// 彙總類型bitmap
"sex"// 彙總類型HLL
],
"loadProps":
{
"format": "csv",
"column_separator": "\\x01",
"line_delimiter": "\\x02",
"columns": "user_id,date,city,k1,uuid=bitmap_hash(k1),k2,sex=hll_hash(k2)"// 需要指定彙總函式
},
"postSql":
[
"select count(1) from example_tbl_3"
],
"preSql":
[],
"datasource":"doris_datasource",//資料來源名。
"table": "doris_table_name",//表名。
}
"name": "Writer",
"category": "writer"
}
}