全部產品
Search
文件中心

DataWorks:Lindorm資料來源

更新時間:Dec 05, 2025

DataWorksData Integration支援使用Lindorm Reader和Lindorm Writer外掛程式讀取和寫入Lindorm雙向通道的功能,本文為您介紹DataWorks的Lindorm資料讀取與寫入能力。

適用範圍

說明

Lindorm為多模資料庫,詳情請參見Lindorm使用文檔,當前DataWorks僅支援寬表引擎及計算引擎兩種。

支援的欄位類型

Lindorm Reader和Lindorm Writer支援大部分Lindorm類型,但也存在個別沒有支援的情況,請注意檢查您的資料類型。

Lindorm Reader和Lindorm Writer針對Lindorm類型的轉換列表,如下所示。

類型分類

資料類型

整數類

INT、LONG、SHORT

浮點類

DOUBLE、FLOAT、DOUBLE

字串類

STRING

日期時間類

DATE

布爾類

BOOLEAN

二進位類

BINARYSTRING

資料同步任務開發

資料同步任務的配置入口和通用配置流程可參見下文的配置指導。

單表離線

單表即時

整庫即時

附錄:指令碼Demo與參數說明

離線任務指令碼配置方式

如果您配置離線任務時使用指令碼模式的方式進行配置,您需要按照統一的指令碼格式要求,在任務指令碼中編寫相應的參數,詳情請參見指令碼模式配置,以下為您介紹指令碼模式下資料來源的參數配置詳情。

Reader指令碼Demo

  • 配置一個寬表引擎Lindorm SQL Table抽取資料到本地的作業。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                    "mode": "FixedColumn",
                    "caching": 128,
                    "column": [
                       "id",
                      "value" 
                    ],
                    "envType": 1,
                    "datasource": "lindorm",
                    "tableMode": "tableService",
                    "table": "lindorm_table"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "lindorm",
                    "session": [],
                    "envType": 1,
                    "column": [
                        "id",
                        "value" 
                    ],
                    "socketTimeout": 3600000,
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            //設定傳輸速度,單位為byte/s,DataX運行會儘可能達到該速度但是不超過它.
            "byte": 1048576
          }
          //出錯限制
          "errorLimit": {
            //出錯的record條數上限,當大於該值即報錯。
            "record": 0,
            //出錯的record百分比上限 1.0表示100%,0.02表示2%
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }
  • 配置一個寬表引擎Lindorm HBaseLike(WideColumn)表抽取資料到本地的作業。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                    "mode": "FixedColumn",
                    "column":  [
                         "STRING|rowkey",
                          "INT|f:a"
                    ],
                    "envType": 1,
                    "datasource": "lindorm",
                    "tableMode": "wideColumn",
                    "table":"lindorm_table"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "_IDB.TAOBAO",
                    "session": [],
                    "envType": 1,
                    "column": [
                        "id",
                        "value"
                    ],
                    "socketTimeout": 3600000,
                    "guid": "",
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            //設定傳輸速度,單位為byte/s,DataX運行會儘可能達到該速度但是不超過它。
            "byte": 1048576
          }
            //出錯限制
            "errorLimit": {
            //出錯的record條數上限,當大於該值即報錯。
            "record": 0,
            //出錯的record百分比上限 1.0表示100%,0.02表示2%。
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }
  • 配置一個計算引擎表抽取資料到本地的作業。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                   "datasource": "lindorm_datasource",
                    "column": [
                       "id",
                       "value"
                    ],
                    "tableComment": "",
                    "where": "",
                    "session": [],
                    "splitPk": "id",
                    "table": "auto_ob_149912212480"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "_IDB.TAOBAO",
                    "session": [],
                    "envType": 1,
                    "column": [
                        "id",
                        "value"
                    ],
                    "socketTimeout": 3600000,
                    "guid": "",
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            //設定傳輸速度,單位為byte/s,DataX運行會儘可能達到該速度但是不超過它。
            "byte": 1048576
          }
            //出錯限制
            "errorLimit": {
            //出錯的record條數上限,當大於該值即報錯。
            "record": 0,
            //出錯的record百分比上限 1.0表示100%,0.02表示2%。
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }

Reader指令碼參數

參數

描述

是否必選

預設值

mode

寬表引擎特有,表示資料讀模數式,包括固定列模式FixedColumn和動態列模式DynamicColumn

FixedColumn

tableMode

寬表引擎特有,包括普通表SQL模式table和寬表模式wideColumn。預設為table,如果選擇table模式,可不填寫。

預設不填寫

table

表示所要讀取的Lindorm表名。Lindorm表名對大小寫敏感。

encoding

寬表引擎特有,編碼方式,取值為UTF-8或GBK。一般用於將二進位儲存的Lindorm byte[]類型轉換為String類型。

UTF-8

caching

寬表引擎特有,一次性批量擷取的記錄數大小,該值可以極大減少資料同步系統與Lindorm的網路互動次數,並提升整體輸送量。如果該值設定過大,會導致Lindorm服務端壓力過大或者資料同步運行進程OOM異常。

100

selects

寬表引擎特有,當前讀取的Table類型資料不支援自動切割分區,預設單並發運行,因此需要手動設定selects參數進行資料切片,例如:

selects": [
    "where(compare(\"id\", LESS, 5))",
    "where(and(compare(\"id\", GREATER_OR_EQUAL, 5), compare(\"id\", LESS, 10)))",
    "where(compare(\"id\", GREATER_OR_EQUAL, 10))"
    ],

使用限制

  • 僅允許主鍵列和索引列作為查詢條件,如果使用普通列作為查詢條件,將會觸發全表掃描,影響源叢集的穩定性。

  • 當表包含多個主鍵列時,查詢條件必須遵循主鍵的最左匹配原則,即左側連續的n-1個主鍵列都需使用等於條件。例如:假設某表的主鍵為 [id, order_time],普通列為 [type, data],其中 type 列已建立二級索引。

    • 推薦文法樣本

      SQL文法

      外掛程式文法

      where id >= 1 and id < 100

      where(and(compare(\"id\", GREATER_OR_EQUAL, 1), compare(\"id\", LESS, 100)))

      where id = 1 and order_time > 1234567

      where(and(compare(\"id\", EQUAL, 1), compare(\"order_time\", GREATER, 1234567)))

      where type = 'pay'

      where(compare(\"type\", EQUAL, \"pay\"))
    • 不推薦文法樣本

      SQL文法

      外掛程式文法

      不推薦原因

      where order_time >= 1234567 and order_time < 5678910

      where(and(compare(\"order_time\", GREATER_OR_EQUAL, 1234567), compare(\"order_time\", LESS, 5678910)))

      缺少左側主鍵列id

      where id > 1 and order_time > 1234567

      where(and(compare(\"id\", GREATER, 1), compare(\"order_time\", GREATER, 1234567)))

      左側主鍵列id不是等於。

      where data > 'xxx'

      where(compare(\"data\", GREATER, \"xxx\"))

      data欄位是非主鍵列。

session

計算引擎特有,Session粒度作業參數,例如set hive.execution.engine=tez

splitPk

計算引擎特有,切分鍵,計算引擎表讀取特有,如果指定splitPk,表示您希望使用splitPk代表的欄位進行資料分區,資料同步因此會啟動並發任務進行資料同步,提高資料同步的效率。

  • 如果不填寫splitPk,包括不提供splitPk或者splitPk值為空白,資料同步視作使用單通道同步該表資料。

  • 目前splitPk僅支援整型資料切分,不支援字串、浮點和日期等其他類型。

columns

讀取欄位列表。讀取欄位列表支援列裁剪和列換序,列裁剪指可以選擇部分列進行匯出,列換序指可以不按照表schema資訊順序進行匯出。

  • table類型的表,只需要填寫列名即可,會自動從表的meta擷取schema資訊。樣本如下:

    [
        "id",
        "name",
        "age",
        "birthday",
        "gender"
    ]
  • HBaseLike(widecolumn)類型的表。樣本如下:

    [
        "STRING|rowkey",
        "INT|f:a",
        "DOUBLE|f:b"
    ]

Writer指令碼Demo

  • 配置一個資料來源為MySQL,需要將資料寫入寬表引擎 Lindorm SQL Table的作業。

    {
      "type": "job",
      "version": "2.0",
      "steps": [
        {
          "stepType": "mysql",
          "parameter": {
            "checkSlave": true,
            "datasource": " ",
            "envType": 1,
            "column": [
              "id",
              "value"
            ],
            "socketTimeout": 3600000,
            "masterSlave": "slave",
            "connection": [
              {
                "datasource": " ",
                "table": []
              }
            ],
            "where": "",
            "splitPk": "",
            "encoding": "UTF-8",
            "print": true
          },
          "name": "mysqlReader",
          "category": "reader"
        },
        {
          "stepType": "lindorm",
          "parameter": {
            "nullMode": "skip",
            "datasource": "lindorm_datasource",
            "envType": 1,
            "column": [
              "id",
              "value"
            ],
            "dynamicColumn": "false",
            "table": "lindorm_table",
            "encoding": "utf8"
          },
          "name": "Writer",
          "category": "writer"
        }
      ],
      "setting": {
        "jvmOption": "",
        "executeMode": null,
        "speed": {
          //設定傳輸速度,單位為byte/s,DataX運行會儘可能達到該速度但是不超過它。
          "byte": 1048576
        },
        //出錯限制
        "errorLimit": {
          //出錯的record條數上限,當大於該值即報錯。
          "record": 0,
          //出錯的record百分比上限 1.0表示100%,0.02表示2%。
          "percentage": 0.02
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }
  • 配置一個資料來源為MySQL,需要將資料寫入寬表引擎 Lindorm HBaseLike (WideColumn)表的作業。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "mysql",
                "parameter": {
                    "envType": 0,
                    "datasource": " ",
                    "column": [
                         "id",
                         "value"
                    ],
                    "connection": [
                        {
                            "datasource": " ",
                            "table": []
                        }
                    ],
                    "where": "",
                    "splitPk": "",
                    "encoding": "UTF-8"
                },
                "name": "Reader",
                "category": "reader"
    
            },
          {
              "stepType": "lindorm",
              "parameter": {
                "datasource": "lindorm_datasource",
                "table": "xxxxxx",
                "encoding": "utf8",
                "nullMode": "skip",
                "dynamicColumn": "false",
                "caching": 128,
                "column": [  //從源端按欄位順序映射
                      "ROW|STRING", //行鍵,固定配置,將源端第一個欄位對應為行鍵,例如本樣本中將id映射為行鍵。
                      "cf:name|STRING" //cf表示列族名,可修改,name表示目標端列名,可修改
                ]
                },
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting": {
            "jvmOption": "",
                    "errorLimit": {
                            "record": "0"
                    },
                    "speed": {
                        "concurrent": 3,
                        "throttle": false
                    }
        },
        "order": {
                "hops": [
                   {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
      }
  • 配置一個資料來源為MySQL,需要將資料寫入計算引擎表的作業。

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "mysql",
                "parameter": {
                    "envType": 0,
                    "datasource": " ",
                    "column": [
                         "id",
                         "value"
                    ],
                    "connection": [
                        {
                            "datasource": " ",
                            "table": []
                        }
                    ],
                    "where": "",
                    "splitPk": "",
                    "encoding": "UTF-8"
                },
                "name": "Reader",
                "category": "reader"
    
            },
          {
              "stepType": "lindorm",
              "parameter": {
                  "datasource": "lindorm_datasource",
                  "table": "xxxxxx",
                  "column": [ 
                     "id",
                    "value"
                   ],
                  "formatType": "ICEBERG"
                },
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting": {
            "jvmOption": "",
                    "errorLimit": {
                            "record": "0"
                    },
                    "speed": {
                        "concurrent": 3,
                        "throttle": false
                    }
        },
        "order": {
                "hops": [
                   {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
      }

Writer指令碼參數

參數

描述

是否必選

預設值

table

表示所要寫入的Lindorm表名。Lindorm表名對大小寫敏感。

encoding

寬表引擎特有,編碼方式,取值為UTF-8或GBK。一般用於將二進位儲存的Lindorm byte[]類型轉換為String類型。

UTF-8

columns

寫入欄位列表。寫入欄位列表支援列裁剪和列換序,列裁剪指可以選擇部分列進行匯出,列換序指可以不按照表schema資訊順序進行匯出。

  • table類型的表,只需要填寫列名即可,會自動從表的meta擷取schema資訊。

  • widecolumn類型或table類型的表。

nullMode

寬表引擎特有,表示在讀取源頭資料的值為null時,Lindorm Writer 中的nullMode參數可通過配置不同內容,實現不同的處理方式。

  • SKIP:表示不向Lindorm寫入這列。

  • EMPTY_BYTES:表示遇到欄位值為空白時,寫入空位元組數組到Lindorm對應的欄位。

  • NULL:表示寫入null值。

  • DELETE:表示遇到欄位值為空白時刪除Lindorm中對應的欄位。

EMPTY_BYTES

formatType

計算引擎特有,比較待同步表的類型,取值範圍:

  • iceberg

  • parquet

  • orc