DataWorksData Integration支援使用Lindorm Reader和Lindorm Writer外掛程式讀取和寫入Lindorm雙向通道的功能,本文為您介紹DataWorks的Lindorm資料讀取與寫入能力。
適用範圍
寬表引擎支援使用Serverless資源群組(推薦)和獨享Data Integration資源群組。
計算引擎僅支援使用Serverless資源群組。
Lindorm為多模資料庫,詳情請參見Lindorm使用文檔,當前DataWorks僅支援寬表引擎及計算引擎兩種。
支援的欄位類型
Lindorm Reader和Lindorm Writer支援大部分Lindorm類型,但也存在個別沒有支援的情況,請注意檢查您的資料類型。
Lindorm Reader和Lindorm Writer針對Lindorm類型的轉換列表,如下所示。
類型分類 | 資料類型 |
整數類 | INT、LONG、SHORT |
浮點類 | DOUBLE、FLOAT、DOUBLE |
字串類 | STRING |
日期時間類 | DATE |
布爾類 | BOOLEAN |
二進位類 | BINARYSTRING |
資料同步任務開發
資料同步任務的配置入口和通用配置流程可參見下文的配置指導。
單表離線
支援資料來源:Data Integration模組資料來源支援的所有資料來源類型
配置指導:單表離線同步任務
指令碼模式配置的全量參數和指令碼Demo請參見下文的附錄:指令碼Demo與參數說明。
單表即時
支援資料來源:Kafka、LogHub、Hologres
配置指導:單表即時同步任務
整庫即時
支援資料來源:PostgreSQL
配置指導:整庫即時同步任務配置
附錄:指令碼Demo與參數說明
離線任務指令碼配置方式
如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見指令碼模式配置,以下為您介紹指令碼模式下資料來源的參數配置詳情。
Reader指令碼Demo
配置一個寬表引擎Lindorm SQL Table抽取資料到本地的作業。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "lindorm", "parameter": { "mode": "FixedColumn", "caching": 128, "column": [ "id", "value" ], "envType": 1, "datasource": "lindorm", "tableMode": "tableService", "table": "lindorm_table" }, "name": "lindormreader", "category": "reader" }, { "stepType": "mysql", "parameter": { "postSql": [], "datasource": "lindorm", "session": [], "envType": 1, "column": [ "id", "value" ], "socketTimeout": 3600000, "writeMode": "insert", "batchSize": 1024, "encoding": "UTF-8", "table": "", "preSql": [] }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "errorLimit": { "record": "0" }, "speed": { //設定傳輸速度,單位為byte/s,DataX運行會儘可能達到該速度但是不超過它. "byte": 1048576 } //出錯限制 "errorLimit": { //出錯的record條數上限,當大於該值即報錯。 "record": 0, //出錯的record百分比上限 1.0表示100%,0.02表示2% "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }配置一個寬表引擎Lindorm HBaseLike(WideColumn)表抽取資料到本地的作業。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "lindorm", "parameter": { "mode": "FixedColumn", "column": [ "STRING|rowkey", "INT|f:a" ], "envType": 1, "datasource": "lindorm", "tableMode": "wideColumn", "table":"lindorm_table" }, "name": "lindormreader", "category": "reader" }, { "stepType": "mysql", "parameter": { "postSql": [], "datasource": "_IDB.TAOBAO", "session": [], "envType": 1, "column": [ "id", "value" ], "socketTimeout": 3600000, "guid": "", "writeMode": "insert", "batchSize": 1024, "encoding": "UTF-8", "table": "", "preSql": [] }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "errorLimit": { "record": "0" }, "speed": { //設定傳輸速度,單位為byte/s,DataX運行會儘可能達到該速度但是不超過它。 "byte": 1048576 } //出錯限制 "errorLimit": { //出錯的record條數上限,當大於該值即報錯。 "record": 0, //出錯的record百分比上限 1.0表示100%,0.02表示2%。 "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }配置一個計算引擎表抽取資料到本地的作業。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "lindorm", "parameter": { "datasource": "lindorm_datasource", "column": [ "id", "value" ], "tableComment": "", "where": "", "session": [], "splitPk": "id", "table": "auto_ob_149912212480" }, "name": "lindormreader", "category": "reader" }, { "stepType": "mysql", "parameter": { "postSql": [], "datasource": "_IDB.TAOBAO", "session": [], "envType": 1, "column": [ "id", "value" ], "socketTimeout": 3600000, "guid": "", "writeMode": "insert", "batchSize": 1024, "encoding": "UTF-8", "table": "", "preSql": [] }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "errorLimit": { "record": "0" }, "speed": { //設定傳輸速度,單位為byte/s,DataX運行會儘可能達到該速度但是不超過它。 "byte": 1048576 } //出錯限制 "errorLimit": { //出錯的record條數上限,當大於該值即報錯。 "record": 0, //出錯的record百分比上限 1.0表示100%,0.02表示2%。 "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
Reader指令碼參數
參數 | 描述 | 是否必選 | 預設值 | ||||||||||||||||||||
mode | 寬表引擎特有,表示資料讀模數式,包括固定列模式FixedColumn和動態列模式DynamicColumn。 | 是 | FixedColumn | ||||||||||||||||||||
tableMode | 寬表引擎特有,包括普通表SQL模式table和寬表模式wideColumn。預設為table,如果選擇table模式,可不填寫。 | 否 | 預設不填寫 | ||||||||||||||||||||
table | 表示所要讀取的Lindorm表名。Lindorm表名對大小寫敏感。 | 是 | 無 | ||||||||||||||||||||
encoding | 寬表引擎特有,編碼方式,取值為UTF-8或GBK。一般用於將二進位儲存的Lindorm byte[]類型轉換為String類型。 | 否 | UTF-8 | ||||||||||||||||||||
caching | 寬表引擎特有,一次性批量擷取的記錄數大小,該值可以極大減少資料同步系統與Lindorm的網路互動次數,並提升整體輸送量。如果該值設定過大,會導致Lindorm服務端壓力過大或者資料同步運行進程OOM異常。 | 否 | 100 | ||||||||||||||||||||
selects | 寬表引擎特有,當前讀取的Table類型資料不支援自動切割分區,預設單並發運行,因此需要手動設定selects參數進行資料切片,例如: 使用限制:
| 否 | 無 | ||||||||||||||||||||
session | 計算引擎特有,Session粒度作業參數,例如 | 否 | 無 | ||||||||||||||||||||
splitPk | 計算引擎特有,切分鍵,計算引擎表讀取特有,如果指定splitPk,表示您希望使用splitPk代表的欄位進行資料分區,資料同步因此會啟動並發任務進行資料同步,提高資料同步的效率。
| 否 | 無 | ||||||||||||||||||||
columns | 讀取欄位列表。讀取欄位列表支援列裁剪和列換序,列裁剪指可以選擇部分列進行匯出,列換序指可以不按照表schema資訊順序進行匯出。
| 是 | 無 |
Writer指令碼Demo
配置一個資料來源為MySQL,需要將資料寫入寬表引擎 Lindorm SQL Table的作業。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "checkSlave": true, "datasource": " ", "envType": 1, "column": [ "id", "value" ], "socketTimeout": 3600000, "masterSlave": "slave", "connection": [ { "datasource": " ", "table": [] } ], "where": "", "splitPk": "", "encoding": "UTF-8", "print": true }, "name": "mysqlReader", "category": "reader" }, { "stepType": "lindorm", "parameter": { "nullMode": "skip", "datasource": "lindorm_datasource", "envType": 1, "column": [ "id", "value" ], "dynamicColumn": "false", "table": "lindorm_table", "encoding": "utf8" }, "name": "Writer", "category": "writer" } ], "setting": { "jvmOption": "", "executeMode": null, "speed": { //設定傳輸速度,單位為byte/s,DataX運行會儘可能達到該速度但是不超過它。 "byte": 1048576 }, //出錯限制 "errorLimit": { //出錯的record條數上限,當大於該值即報錯。 "record": 0, //出錯的record百分比上限 1.0表示100%,0.02表示2%。 "percentage": 0.02 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }配置一個資料來源為MySQL,需要將資料寫入寬表引擎 Lindorm HBaseLike (WideColumn)表的作業。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "datasource": " ", "column": [ "id", "value" ], "connection": [ { "datasource": " ", "table": [] } ], "where": "", "splitPk": "", "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "lindorm", "parameter": { "datasource": "lindorm_datasource", "table": "xxxxxx", "encoding": "utf8", "nullMode": "skip", "dynamicColumn": "false", "caching": 128, "column": [ //從源端按欄位順序映射 "ROW|STRING", //行鍵,固定配置,將源端第一個欄位對應為行鍵,例如本樣本中將id映射為行鍵。 "cf:name|STRING" //cf表示列族名,可修改,name表示目標端列名,可修改 ] }, "name":"Writer", "category":"writer" } ], "setting": { "jvmOption": "", "errorLimit": { "record": "0" }, "speed": { "concurrent": 3, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }配置一個資料來源為MySQL,需要將資料寫入計算引擎表的作業。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "datasource": " ", "column": [ "id", "value" ], "connection": [ { "datasource": " ", "table": [] } ], "where": "", "splitPk": "", "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "lindorm", "parameter": { "datasource": "lindorm_datasource", "table": "xxxxxx", "column": [ "id", "value" ], "formatType": "ICEBERG" }, "name":"Writer", "category":"writer" } ], "setting": { "jvmOption": "", "errorLimit": { "record": "0" }, "speed": { "concurrent": 3, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }
Writer指令碼參數
參數 | 描述 | 是否必選 | 預設值 |
table | 表示所要寫入的Lindorm表名。Lindorm表名對大小寫敏感。 | 是 | 無 |
encoding | 寬表引擎特有,編碼方式,取值為UTF-8或GBK。一般用於將二進位儲存的Lindorm byte[]類型轉換為String類型。 | 否 | UTF-8 |
columns | 寫入欄位列表。寫入欄位列表支援列裁剪和列換序,列裁剪指可以選擇部分列進行匯出,列換序指可以不按照表schema資訊順序進行匯出。
| 是 | 無 |
nullMode | 寬表引擎特有,表示在讀取源頭資料的值為null時,Lindorm Writer 中的nullMode參數可通過配置不同內容,實現不同的處理方式。
| 否 | EMPTY_BYTES |
formatType | 計算引擎特有,比較待同步表的類型,取值範圍:
| 否 | 無 |