
DataWorks:Lindorm data source

Last Updated: Dec 04, 2025

DataWorks Data Integration uses the Lindorm Reader and Lindorm Writer plug-ins to read data from and write data to Lindorm. This topic describes the data read and write capabilities that DataWorks provides for Lindorm.

Applicability

Note

Lindorm is a multi-model database. For more information, see the Lindorm documentation. DataWorks currently supports only LindormTable and the Lindorm compute engine.

Supported field types

Lindorm Reader and Lindorm Writer support most but not all Lindorm data types. Verify that the data types you use are supported.

The following list shows the data types that Lindorm Reader and Lindorm Writer support, grouped by type category.

  • Integer: INT, LONG, SHORT

  • Floating-point: DOUBLE, FLOAT

  • String: STRING

  • Date and time: DATE

  • Boolean: BOOLEAN

  • Binary: BINARYSTRING

Develop a data synchronization task

For information about the entry point and the procedure for configuring a synchronization task, see the following configuration guides.

  • Offline single-table sync

  • Real-time single-table sync

  • Real-time full-database sync

Appendix: Script demo and parameters

Configure a batch synchronization task by using the code editor

To configure a batch synchronization task by using the code editor, you must set the related parameters in the script according to the unified script format requirements. For more information, see Configure a task in the code editor. The following sections describe the data source parameters that you must configure when you use the code editor.

Reader script demo

  • Configure a job to extract data from a LindormTable SQL table to a local machine.

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                    "mode": "FixedColumn",
                    "caching": 128,
                    "column": [
                       "id",
                      "value" 
                    ],
                    "envType": 1,
                    "datasource": "lindorm",
                    "tableMode": "tableService",
                    "table": "lindorm_table"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "lindorm",
                    "session": [],
                    "envType": 1,
                    "column": [
                        "id",
                        "value" 
                    ],
                    "socketTimeout": 3600000,
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            // Sets the transmission speed in byte/s. DataX tries to reach but not exceed this speed.
            "byte": 1048576
          }
          // Error limit
          "errorLimit": {
            // The maximum number of error records. If the number of error records exceeds this value, the job fails.
            "record": 0,
            // The maximum percentage of error records. For example, 1.0 means 100%, and 0.02 means 2%.
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }
  • Configure a job to extract data from a LindormTable HBaseLike (WideColumn) table to a local machine.

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                    "mode": "FixedColumn",
                    "column":  [
                         "STRING|rowkey",
                          "INT|f:a"
                    ],
                    "envType": 1,
                    "datasource": "lindorm",
                    "tableMode": "wideColumn",
                    "table":"lindorm_table"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "_IDB.TAOBAO",
                    "session": [],
                    "envType": 1,
                    "column": [
                        "id",
                        "value"
                    ],
                    "socketTimeout": 3600000,
                    "guid": "",
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            // Sets the transmission speed in byte/s. DataX tries to reach but not exceed this speed.
            "byte": 1048576
          }
            // Error limit
            "errorLimit": {
            // The maximum number of error records. If the number of error records exceeds this value, the job fails.
            "record": 0,
            // The maximum percentage of error records. For example, 1.0 means 100%, and 0.02 means 2%.
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }
  • Configure a job to extract data from a compute engine table to a local machine.

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "lindorm",
                "parameter": {
                   "datasource": "lindorm_datasource",
                    "column": [
                       "id",
                       "value"
                    ],
                    "tableComment": "",
                    "where": "",
                    "session": [],
                    "splitPk": "id",
                    "table": "auto_ob_149912212480"
                },
                "name": "lindormreader",
                "category": "reader"
            },
            {
                "stepType": "mysql",
                "parameter": {
                    "postSql": [],
                    "datasource": "_IDB.TAOBAO",
                    "session": [],
                    "envType": 1,
                    "column": [
                        "id",
                        "value"
                    ],
                    "socketTimeout": 3600000,
                    "guid": "",
                    "writeMode": "insert",
                    "batchSize": 1024,
                    "encoding": "UTF-8",
                    "table": "",
                    "preSql": []
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "setting": {
            "jvmOption": "",
            "executeMode": null,
            "errorLimit": {
                "record": "0"
            },
            "speed": {
            // Sets the transmission speed in byte/s. DataX tries to reach but not exceed this speed.
            "byte": 1048576
          }
            // Error limit
            "errorLimit": {
            // The maximum number of error records. If the number of error records exceeds this value, the job fails.
            "record": 0,
            // The maximum percentage of error records. For example, 1.0 means 100%, and 0.02 means 2%.
            "percentage": 0.02
          }
        },
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
    }

Reader script parameters

Each parameter below is listed with its description, whether it is required, and its default value.

mode

Specific to LindormTable. Specifies the data read mode. Valid values: FixedColumn and DynamicColumn.

Required: Yes. Default value: FixedColumn.

tableMode

Specific to LindormTable. Valid values: table (standard SQL table mode) and wideColumn (wide table mode). If you use table mode, you do not need to specify this parameter.

Required: No. Default value: table.

table

The name of the Lindorm table from which to read data. The table name is case-sensitive.

Required: Yes. Default value: none.

encoding

Specific to LindormTable. The codec. Valid values are UTF-8 and GBK. This parameter is typically used to convert a Lindorm byte[] type stored in binary to a String type.

Required: No. Default value: UTF-8.

caching

Specific to LindormTable. The number of records to retrieve in a single batch. A larger value can significantly reduce network interactions between the data synchronization system and Lindorm, improving overall throughput. If this value is too large, it may cause excessive pressure on the Lindorm server or an out-of-memory (OOM) error in the data synchronization process.

Required: No. Default value: 100.

selects

Specific to LindormTable. The system does not automatically shard data for this table type, so the job reads with a single concurrent process by default. To shard the data, you must configure the selects parameter manually. Example (a complete reader step that uses selects is shown after this parameter list):

selects": [
    "where(compare(\"id\", LESS, 5))",
    "where(and(compare(\"id\", GREATER_OR_EQUAL, 5), compare(\"id\", LESS, 10)))",
    "where(compare(\"id\", GREATER_OR_EQUAL, 10))"
    ],

Limits:

  • Only primary key columns and index columns can be used as query conditions. If you use a standard column as a query condition, it triggers a full table scan, which affects the stability of the source cluster.

  • If a table has multiple primary key columns, the query condition must follow the leftmost prefix rule: every primary key column that precedes the last referenced one must use an equality condition. For example, assume a table has the primary key [id, order_time] and standard columns [type, data], and a secondary index has been created on the type column.

    • Recommended syntax examples:

      SQL syntax: where id >= 1 and id < 100
      Plug-in syntax: where(and(compare(\"id\", GREATER_OR_EQUAL, 1), compare(\"id\", LESS, 100)))

      SQL syntax: where id = 1 and order_time > 1234567
      Plug-in syntax: where(and(compare(\"id\", EQUAL, 1), compare(\"order_time\", GREATER, 1234567)))

      SQL syntax: where type = 'pay'
      Plug-in syntax: where(compare(\"type\", EQUAL, \"pay\"))
    • Not recommended syntax examples:

      SQL syntax: where order_time >= 1234567 and order_time < 5678910
      Plug-in syntax: where(and(compare(\"order_time\", GREATER_OR_EQUAL, 1234567), compare(\"order_time\", LESS, 5678910)))
      Reason not recommended: The leftmost primary key column id is missing.

      SQL syntax: where id > 1 and order_time > 1234567
      Plug-in syntax: where(and(compare(\"id\", GREATER, 1), compare(\"order_time\", GREATER, 1234567)))
      Reason not recommended: The leftmost primary key column id does not use an equality condition.

      SQL syntax: where data > 'xxx'
      Plug-in syntax: where(compare(\"data\", GREATER, \"xxx\"))
      Reason not recommended: The data field is not a primary key column.

Required: No. Default value: none.

session

Specific to the compute engine. Session-level job parameters, such as set hive.execution.engine=tez.

Required: No. Default value: none.

splitPk

Specific to the compute engine. The shard key for reading data from compute engine tables. If you specify splitPk, the data is sharded based on the specified field, and data synchronization starts concurrent tasks to synchronize the data, which improves efficiency.

  • If you do not specify splitPk, or if the value of splitPk is empty, data synchronization uses a single channel to synchronize the table data.

  • Currently, splitPk supports sharding only for integer data. It does not support strings, floating-point numbers, dates, or other data types.

Required: No. Default value: none.

column

The list of fields to read. You can prune columns and reorder them: pruning means you can select a subset of columns to export, and reordering means you can export columns in an order different from the table schema.

  • For tables of the table type, specify only the column names. The schema information is automatically obtained from the table's metadata. Example:

    [
        "id",
        "name",
        "age",
        "birthday",
        "gender"
    ]
  • For tables of the HBaseLike (wideColumn) type, specify each field in the TYPE|column format. Example:

    [
        "STRING|rowkey",
        "INT|f:a",
        "DOUBLE|f:b"
    ]

Required: Yes. Default value: none.
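
The following fragment shows where selects fits in a complete reader step. It is a minimal sketch, not a definitive configuration: the data source name lindorm, the table lindorm_table, the columns id and value, and the shard boundaries are all illustrative.

    {
        "stepType": "lindorm",
        "parameter": {
            "mode": "FixedColumn",
            "datasource": "lindorm",
            "table": "lindorm_table",
            "column": [
                "id",
                "value"
            ],
            // Three non-overlapping conditions that partition the integer primary key id,
            // so the job can read the shards concurrently.
            "selects": [
                "where(compare(\"id\", LESS, 5))",
                "where(and(compare(\"id\", GREATER_OR_EQUAL, 5), compare(\"id\", LESS, 10)))",
                "where(compare(\"id\", GREATER_OR_EQUAL, 10))"
            ]
        },
        "name": "Reader",
        "category": "reader"
    }

Because the three conditions cover the whole id range without overlap, every row is read exactly once across the shards.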

Writer script demo

  • Configure a job to write data from a MySQL data source to a LindormTable SQL table.

    {
      "type": "job",
      "version": "2.0",
      "steps": [
        {
          "stepType": "mysql",
          "parameter": {
            "checkSlave": true,
            "datasource": " ",
            "envType": 1,
            "column": [
              "id",
              "value"
            ],
            "socketTimeout": 3600000,
            "masterSlave": "slave",
            "connection": [
              {
                "datasource": " ",
                "table": []
              }
            ],
            "where": "",
            "splitPk": "",
            "encoding": "UTF-8",
            "print": true
          },
          "name": "mysqlReader",
          "category": "reader"
        },
        {
          "stepType": "lindorm",
          "parameter": {
            "nullMode": "skip",
            "datasource": "lindorm_datasource",
            "envType": 1,
            "column": [
              "id",
              "value"
            ],
            "dynamicColumn": "false",
            "table": "lindorm_table",
            "encoding": "utf8"
          },
          "name": "Writer",
          "category": "writer"
        }
      ],
      "setting": {
        "jvmOption": "",
        "executeMode": null,
        "speed": {
          // Sets the transmission speed in byte/s. DataX tries to reach but not exceed this speed.
          "byte": 1048576
        },
        // Error limit
        "errorLimit": {
          // The maximum number of error records. If the number of error records exceeds this value, the job fails.
          "record": 0,
          // The maximum percentage of error records. For example, 1.0 means 100%, and 0.02 means 2%.
          "percentage": 0.02
        }
      },
      "order": {
        "hops": [
          {
            "from": "Reader",
            "to": "Writer"
          }
        ]
      }
    }
  • Configure a job to write data from a MySQL data source to a LindormTable HBaseLike (WideColumn) table.

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "mysql",
                "parameter": {
                    "envType": 0,
                    "datasource": " ",
                    "column": [
                         "id",
                         "value"
                    ],
                    "connection": [
                        {
                            "datasource": " ",
                            "table": []
                        }
                    ],
                    "where": "",
                    "splitPk": "",
                    "encoding": "UTF-8"
                },
                "name": "Reader",
                "category": "reader"
    
            },
          {
              "stepType": "lindorm",
              "parameter": {
                "datasource": "lindorm_datasource",
                "table": "xxxxxx",
                "encoding": "utf8",
                "nullMode": "skip",
                "dynamicColumn": "false",
                "caching": 128,
                "column": [  // Maps fields from the source in order.
                      "ROW|STRING", // The rowkey. This is a fixed configuration. The first field from the source is mapped to the rowkey. In this example, the id field is mapped to the rowkey.
                      "cf:name|STRING" // cf specifies the column family name, which you can change. name specifies the column name in the destination, which you can change.
                ]
                },
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting": {
            "jvmOption": "",
                    "errorLimit": {
                            "record": "0"
                    },
                    "speed": {
                        "concurrent": 3,
                        "throttle": false
                    }
        },
        "order": {
                "hops": [
                   {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
      }
  • Configure a job to write data from a MySQL data source to a compute engine table.

    {
        "type": "job",
        "version": "2.0",
        "steps": [
            {
                "stepType": "mysql",
                "parameter": {
                    "envType": 0,
                    "datasource": " ",
                    "column": [
                         "id",
                         "value"
                    ],
                    "connection": [
                        {
                            "datasource": " ",
                            "table": []
                        }
                    ],
                    "where": "",
                    "splitPk": "",
                    "encoding": "UTF-8"
                },
                "name": "Reader",
                "category": "reader"
    
            },
          {
              "stepType": "lindorm",
              "parameter": {
                  "datasource": "lindorm_datasource",
                  "table": "xxxxxx",
                  "column": [ 
                     "id",
                    "value"
                   ],
                  "formatType": "ICEBERG"
                },
                "name":"Writer",
                "category":"writer"
            }
        ],
        "setting": {
            "jvmOption": "",
                    "errorLimit": {
                            "record": "0"
                    },
                    "speed": {
                        "concurrent": 3,
                        "throttle": false
                    }
        },
        "order": {
                "hops": [
                   {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        }
      }

Writer script parameters

Each parameter below is listed with its description, whether it is required, and its default value.

table

The name of the Lindorm table to which to write data. The table name is case-sensitive.

Required: Yes. Default value: none.

encoding

Specifies the codec for LindormTable. Valid values are UTF-8 and GBK. This parameter is typically used to convert a Lindorm byte[] type stored in binary to a String type.

Required: No. Default value: UTF-8.

column

The list of fields to write. You can prune columns, which means you can write only a subset of the source columns. You can also reorder columns, which means you can write columns in an order that is different from the table schema.

  • For tables of the `table` type, you only need to specify the column names. The schema information is automatically retrieved from the table's metadata.

  • For wide-column tables, specify each field as a cf:name|TYPE mapping, with ROW|TYPE as the first entry for the rowkey. See the wide-column writer demo above.

Required: Yes. Default value: none.

nullMode

This parameter applies only to LindormTable. It specifies how to handle null values from the source data.

  • SKIP: Does not write the column to Lindorm.

  • EMPTY_BYTES: Writes an empty byte array to the corresponding field in Lindorm if the field value is null.

  • NULL: Writes a null value.

  • DELETE: Deletes the corresponding field in Lindorm if the field value is null.

Required: No. Default value: EMPTY_BYTES.

formatType

This parameter applies only to the compute engine. It specifies the format of the table for the sync task. Valid values:

  • iceberg

  • parquet

  • orc

Required: No. Default value: none.
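
As a quick reference, the following writer fragment shows how table, encoding, nullMode, and column fit together for a LindormTable destination. It is a minimal sketch: the data source name lindorm_datasource, the table user_table, and the columns id and value are hypothetical, and you can replace "skip" with EMPTY_BYTES, NULL, or DELETE to change how null source values are handled.

    {
        "stepType": "lindorm",
        "parameter": {
            "datasource": "lindorm_datasource",
            "table": "user_table",
            "encoding": "utf8",
            // skip: null source values are not written to the corresponding Lindorm column.
            "nullMode": "skip",
            "column": [
                "id",
                "value"
            ]
        },
        "name": "Writer",
        "category": "writer"
    }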