DataWorks Data Integration connects to Lindorm through the Lindorm Reader and Lindorm Writer plug-ins. This topic covers supported capabilities, field types, sync task types, and script parameters for both plug-ins.
Lindorm is a multi-model database. DataWorks currently supports only LindormTable and the compute engine. For a full overview of Lindorm, see Lindorm documentation.
Supported capabilities
Use the following table to confirm whether your Lindorm engine and resource group combination is supported before configuring a sync task.
| Engine | Read | Write | Serverless resource group | Exclusive resource group for Data Integration |
|---|---|---|---|---|
| LindormTable | Yes | Yes | Yes (recommended) | Yes |
| Compute engine | Yes | Yes | Yes | No |
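As a rough illustration, the table above can be encoded as a lookup so that a deployment script can check a combination before creating a task. This is only a sketch: the `SUPPORT` dictionary and the `is_supported` helper are hypothetical names, not part of DataWorks, and the values are transcribed from the table.

```python
# Support matrix transcribed from the table above (hypothetical helper, not a DataWorks API).
SUPPORT = {
    "LindormTable": {"read": True, "write": True, "serverless": True, "exclusive": True},
    "Compute engine": {"read": True, "write": True, "serverless": True, "exclusive": False},
}


def is_supported(engine, operation, resource_group):
    """Return True if the engine supports the operation on the given resource group.

    operation is "read" or "write"; resource_group is "serverless" or "exclusive".
    Unknown engines, operations, or resource groups are treated as unsupported.
    """
    row = SUPPORT.get(engine)
    if row is None:
        return False
    return bool(row.get(operation, False) and row.get(resource_group, False))
```

For example, `is_supported("Compute engine", "write", "exclusive")` returns `False`, which matches the last cell of the compute engine row.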
Supported field types
Lindorm Reader and Lindorm Writer support the following data types. Verify that your columns use supported types before configuring a sync task.
| Category | Data types |
|---|---|
| Integer | INT, LONG, SHORT |
| Floating-point | DOUBLE, FLOAT |
| String | STRING |
| Date and time | DATE |
| Boolean | BOOLEAN |
| Binary | BINARYSTRING |
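One way to run this check before configuring a task is to compare a table's declared column types against the list above. The sketch below assumes you already have the schema as a mapping of column name to type name; `SUPPORTED_TYPES` and `unsupported_columns` are hypothetical names, and the case-insensitive comparison is an assumption made for convenience.

```python
# Types listed in the table above.
SUPPORTED_TYPES = {
    "INT", "LONG", "SHORT",
    "DOUBLE", "FLOAT",
    "STRING",
    "DATE",
    "BOOLEAN",
    "BINARYSTRING",
}


def unsupported_columns(schema):
    """Return the names of columns whose type is not in the supported list.

    schema maps column name -> type name. Type names are compared
    case-insensitively, which is an assumption of this sketch.
    """
    return [name for name, type_name in schema.items()
            if type_name.upper() not in SUPPORTED_TYPES]
```

An empty result means every column uses a listed type; any names returned need to be converted or excluded from the column list.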
Sync task types
| Task type | Supported sources | Configuration guide |
|---|---|---|
| Offline single-table sync | All data source types supported by Data Integration | Offline single-table sync task |
| Real-time single-table sync | Kafka, LogHub, Hologres | Real-time single-table sync task |
| Real-time full-database sync | PostgreSQL | Configure a real-time full-database sync task |
For offline single-table sync tasks configured through the code editor, see Appendix: Script demos and parameters for the full parameter reference and script examples.
Appendix: Script demos and parameters
Reader script demos
The following examples show how to configure a Lindorm Reader job in the code editor. Each example targets a different engine mode — choose the one that matches your table type.
LindormTable (SQL mode)
Reads data from a LindormTable SQL table.
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "lindorm",
            "parameter": {
                "mode": "FixedColumn",
                "caching": 128,
                "column": [
                    "id",
                    "value"
                ],
                "envType": 1,
                "datasource": "lindorm",
                "tableMode": "tableService",
                "table": "lindorm_table"
            },
            "name": "lindormreader",
            "category": "reader"
        },
        {
            "stepType": "mysql",
            "parameter": {
                "postSql": [],
                "datasource": "lindorm",
                "session": [],
                "envType": 1,
                "column": [
                    "id",
                    "value"
                ],
                "socketTimeout": 3600000,
                "writeMode": "insert",
                "batchSize": 1024,
                "encoding": "UTF-8",
                "table": "",
                "preSql": []
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "jvmOption": "",
        "executeMode": null,
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "byte": 1048576
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
LindormTable (HBaseLike WideColumn mode)
Reads data from a LindormTable wide-column table. Columns are specified as TYPE|columnFamily:columnName, with the row key specified as TYPE|rowkey.
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "lindorm",
            "parameter": {
                "mode": "FixedColumn",
                "column": [
                    "STRING|rowkey",
                    "INT|f:a"
                ],
                "envType": 1,
                "datasource": "lindorm",
                "tableMode": "wideColumn",
                "table": "lindorm_table"
            },
            "name": "lindormreader",
            "category": "reader"
        },
        {
            "stepType": "mysql",
            "parameter": {
                "postSql": [],
                "datasource": "_IDB.TAOBAO",
                "session": [],
                "envType": 1,
                "column": [
                    "id",
                    "value"
                ],
                "socketTimeout": 3600000,
                "guid": "",
                "writeMode": "insert",
                "batchSize": 1024,
                "encoding": "UTF-8",
                "table": "",
                "preSql": []
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "jvmOption": "",
        "executeMode": null,
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "byte": 1048576
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
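The reader column entries for a wide-column table, such as STRING|rowkey and INT|f:a in the script above, can also be generated rather than typed by hand. The following is a minimal sketch; `reader_wide_columns` is a hypothetical helper and only does string formatting according to the TYPE|columnFamily:columnName convention described in this topic.

```python
def reader_wide_columns(rowkey_type, columns):
    """Build the reader "column" list for a wide-column table.

    rowkey_type: type of the row key, for example "STRING".
    columns: iterable of (type, column_family, column_name) tuples.
    The row key entry comes first, followed by one entry per column.
    """
    specs = [f"{rowkey_type}|rowkey"]
    for col_type, family, name in columns:
        specs.append(f"{col_type}|{family}:{name}")
    return specs
```

Calling `reader_wide_columns("STRING", [("INT", "f", "a")])` produces the same two entries used in the script above.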
Compute engine
Reads data from a Lindorm compute engine table. Use splitPk to enable parallel reads.
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "lindorm",
            "parameter": {
                "datasource": "lindorm_datasource",
                "column": [
                    "id",
                    "value"
                ],
                "tableComment": "",
                "where": "",
                "session": [],
                "splitPk": "id",
                "table": "auto_ob_149912212480"
            },
            "name": "lindormreader",
            "category": "reader"
        },
        {
            "stepType": "mysql",
            "parameter": {
                "postSql": [],
                "datasource": "_IDB.TAOBAO",
                "session": [],
                "envType": 1,
                "column": [
                    "id",
                    "value"
                ],
                "socketTimeout": 3600000,
                "guid": "",
                "writeMode": "insert",
                "batchSize": 1024,
                "encoding": "UTF-8",
                "table": "",
                "preSql": []
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "jvmOption": "",
        "executeMode": null,
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "byte": 1048576
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
Reader parameters
Parameters marked LindormTable apply only to LindormTable (SQL and WideColumn modes). Parameters marked Compute engine apply only to the compute engine. Parameters marked Both apply to both engines.
| Parameter | Applies to | Description | Required | Default |
|---|---|---|---|---|
| table | Both | Name of the Lindorm table to read from. Case-sensitive. | Yes | None |
| column | Both | List of columns to read. Supports column selection and reordering. For SQL-mode tables, specify column names only; the schema is retrieved automatically. For wide-column tables, use the format TYPE\|columnFamily:columnName (for example, INT\|f:a), and TYPE\|rowkey for the row key. | Yes | None |
| mode | LindormTable | Data read mode. Valid values: FixedColumn, DynamicColumn. | Yes | FixedColumn |
| tableMode | LindormTable | Table access mode. Valid values: table (SQL mode), wideColumn (wide-column mode). Omit this parameter for SQL mode. | No | table |
| encoding | LindormTable | Character encoding used to convert Lindorm byte[] binary values to strings. Valid values: UTF-8, GBK. | No | UTF-8 |
| caching | LindormTable | Number of records fetched per batch. Larger values reduce network round trips but may increase pressure on the Lindorm server or cause out-of-memory (OOM) errors in the sync process. | No | 100 |
| selects | LindormTable | Manual shard configuration for parallel reads. LindormTable does not auto-shard, so the job runs single-threaded unless you configure this parameter. See Configuring selects for parallel reads. | No | None |
| session | Compute engine | Session-level parameters, such as set hive.execution.engine=tez. | No | None |
| splitPk | Compute engine | Shard key for parallel reads. When specified, the job splits data by this field and runs concurrent sync tasks. Supports integer columns only; strings, floating-point numbers, and dates are not supported. If omitted or blank, the job uses a single channel. | No | None |
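To get a feel for the caching trade-off, the number of fetches needed for a table can be estimated with simple arithmetic. The sketch below assumes one network round trip per batch of caching records, which is a simplification and not a statement about the plug-in's internals; `round_trips` is a hypothetical helper.

```python
def round_trips(total_rows, caching=100):
    """Estimate the number of batch fetches needed to read total_rows records.

    Assumes exactly one round trip per batch of `caching` records.
    The default of 100 mirrors the documented default for caching.
    """
    if caching <= 0:
        raise ValueError("caching must be a positive integer")
    # Ceiling division without floating-point arithmetic.
    return -(-total_rows // caching)
```

Under this assumption, reading 1,000,000 records takes 10,000 fetches at the default of 100 and 7,813 fetches at 128. The memory held per batch grows in proportion, which is where the OOM risk comes from.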
Configuring selects for parallel reads
LindormTable does not automatically shard data, so by default a read job runs in a single thread. To enable parallel reads, configure the selects parameter to manually define shard ranges.
Example: Shard by primary key `id`
"selects": [
    "where(compare(\"id\", LESS, 5))",
    "where(and(compare(\"id\", GREATER_OR_EQUAL, 5), compare(\"id\", LESS, 10)))",
    "where(compare(\"id\", GREATER_OR_EQUAL, 10))"
]
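For more than a few shards, writing these clauses by hand becomes error-prone. The following sketch generates them from a list of integer boundaries, using only the compare, and, LESS, and GREATER_OR_EQUAL forms shown in the example above. `build_selects` is a hypothetical helper, and it assumes the shard column is an integer primary key column.

```python
import json


def build_selects(column, boundaries):
    """Build selects clauses that split an integer column at the given boundaries.

    boundaries must be a non-empty, strictly increasing list of integers.
    N boundaries yield N + 1 non-overlapping shards that cover all values.
    """
    boundaries = list(boundaries)
    if not boundaries or sorted(set(boundaries)) != boundaries:
        raise ValueError("boundaries must be a non-empty, strictly increasing list")
    col = json.dumps(column)  # wraps the column name in double quotes
    clauses = [f"where(compare({col}, LESS, {boundaries[0]}))"]
    for low, high in zip(boundaries, boundaries[1:]):
        clauses.append(
            f"where(and(compare({col}, GREATER_OR_EQUAL, {low}), "
            f"compare({col}, LESS, {high})))"
        )
    clauses.append(f"where(compare({col}, GREATER_OR_EQUAL, {boundaries[-1]}))")
    return clauses


if __name__ == "__main__":
    # json.dumps escapes the inner quotes, giving text that can be pasted into the script.
    print(json.dumps({"selects": build_selects("id", [5, 10])}, indent=4))
```

With boundaries [5, 10], the function returns the same three clauses as the example above.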
Only primary key columns and secondary index columns can be used as query conditions in selects. Using a standard (non-indexed) column triggers a full table scan, which may degrade source cluster stability.
For tables with composite primary keys, conditions must follow the leftmost prefix rule: the first n−1 consecutive primary key columns must use equality conditions. For example, given a primary key [id, order_time] and a secondary index on type:
| SQL syntax | Plug-in syntax | Status |
|---|---|---|
| where id >= 1 and id < 100 | where(and(compare("id", GREATER_OR_EQUAL, 1), compare("id", LESS, 100))) | Supported |
| where id = 1 and order_time > 1234567 | where(and(compare("id", EQUAL, 1), compare("order_time", GREATER, 1234567))) | Supported |
| where type = 'pay' | where(compare("type", EQUAL, "pay")) | Supported (secondary index) |
| where order_time >= 1234567 and order_time < 5678910 | where(and(compare("order_time", GREATER_OR_EQUAL, 1234567), compare("order_time", LESS, 5678910))) | Not supported: id (leftmost key) is missing |
| where id > 1 and order_time > 1234567 | where(and(compare("id", GREATER, 1), compare("order_time", GREATER, 1234567))) | Not supported: id does not use an equality condition |
| where data > 'xxx' | where(compare("data", GREATER, "xxx")) | Not supported: data is a standard column, not a primary key or index |
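The leftmost prefix rule can be expressed as a small check that is useful for reviewing selects conditions before a job runs. This is a sketch of one reading of the rule as stated in this section, not the plug-in's own validation; `check_condition` and its argument shapes are hypothetical.

```python
def check_condition(conditions, primary_key, index_columns=()):
    """Check a list of (column, operator) pairs against the leftmost prefix rule.

    primary_key: ordered list of primary key columns, for example ["id", "order_time"].
    index_columns: columns covered by a secondary index.
    Returns (ok, reason).
    """
    ops_by_column = {}
    for column, operator in conditions:
        ops_by_column.setdefault(column, []).append(operator)

    # Only primary key columns and secondary index columns may be queried.
    for column in ops_by_column:
        if column not in primary_key and column not in index_columns:
            return False, f"{column} is a standard column, not a primary key or index"

    # The primary key columns used must be the leading ones, with no gaps.
    used = [c for c in primary_key if c in ops_by_column]
    if used != list(primary_key[:len(used)]):
        missing = next(c for c in primary_key if c not in ops_by_column)
        return False, f"{missing} is missing from the leading primary key columns"

    # Every used primary key column except the last must use equality only.
    for column in used[:-1]:
        if any(op != "EQUAL" for op in ops_by_column[column]):
            return False, f"{column} does not use an equality condition"
    return True, "supported"
```

Run against the six rows of the table above, with primary key ["id", "order_time"] and a secondary index on type, the check accepts the first three conditions and rejects the last three.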
Writer script demos
The following examples show how to configure a Lindorm Writer job. Choose the example that matches your target table type.
LindormTable (SQL mode)
Writes data from a MySQL source to a LindormTable SQL table.
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "mysql",
            "parameter": {
                "checkSlave": true,
                "datasource": " ",
                "envType": 1,
                "column": [
                    "id",
                    "value"
                ],
                "socketTimeout": 3600000,
                "masterSlave": "slave",
                "connection": [
                    {
                        "datasource": " ",
                        "table": []
                    }
                ],
                "where": "",
                "splitPk": "",
                "encoding": "UTF-8",
                "print": true
            },
            "name": "mysqlReader",
            "category": "reader"
        },
        {
            "stepType": "lindorm",
            "parameter": {
                "nullMode": "skip",
                "datasource": "lindorm_datasource",
                "envType": 1,
                "column": [
                    "id",
                    "value"
                ],
                "dynamicColumn": "false",
                "table": "lindorm_table",
                "encoding": "utf8"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "jvmOption": "",
        "executeMode": null,
        "speed": {
            "byte": 1048576
        },
        "errorLimit": {
            "record": 0,
            "percentage": 0.02
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
LindormTable (HBaseLike WideColumn mode)
Writes data from a MySQL source to a LindormTable wide-column table. The column array maps source fields in order: the first field maps to the row key (ROW|STRING), and subsequent fields map to column family columns (cf:name|STRING).
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "mysql",
            "parameter": {
                "envType": 0,
                "datasource": " ",
                "column": [
                    "id",
                    "value"
                ],
                "connection": [
                    {
                        "datasource": " ",
                        "table": []
                    }
                ],
                "where": "",
                "splitPk": "",
                "encoding": "UTF-8"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "lindorm",
            "parameter": {
                "datasource": "lindorm_datasource",
                "table": "xxxxxx",
                "encoding": "utf8",
                "nullMode": "skip",
                "dynamicColumn": "false",
                "caching": 128,
                "column": [
                    "ROW|STRING",
                    "cf:name|STRING"
                ]
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "jvmOption": "",
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "concurrent": 3,
            "throttle": false
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
In the column array, ROW|STRING maps the first source field to the row key. Each subsequent entry uses the format columnFamily:columnName|TYPE; replace cf and name with your actual column family name and column name.
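Note that the writer puts the type last (cf:name|STRING, ROW|STRING), while the reader puts it first (STRING|cf:name, STRING|rowkey). When the same table is used as both a source and a target, a small conversion avoids mixing the two up. The sketch below is a hypothetical helper that only rearranges the strings according to the two formats described in this topic.

```python
def to_writer_spec(reader_spec):
    """Convert a reader column entry (TYPE|family:name or TYPE|rowkey)
    into the writer format (family:name|TYPE or ROW|TYPE)."""
    col_type, _, target = reader_spec.partition("|")
    if not col_type or not target:
        raise ValueError(f"expected TYPE|columnFamily:columnName, got {reader_spec!r}")
    if target == "rowkey":
        return f"ROW|{col_type}"
    return f"{target}|{col_type}"
```

For example, `[to_writer_spec(s) for s in ["STRING|rowkey", "STRING|cf:name"]]` gives the two writer entries used in the script above.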
Compute engine
Writes data from a MySQL source to a Lindorm compute engine table. Set formatType to match the table's storage format.
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "mysql",
            "parameter": {
                "envType": 0,
                "datasource": " ",
                "column": [
                    "id",
                    "value"
                ],
                "connection": [
                    {
                        "datasource": " ",
                        "table": []
                    }
                ],
                "where": "",
                "splitPk": "",
                "encoding": "UTF-8"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "lindorm",
            "parameter": {
                "datasource": "lindorm_datasource",
                "table": "xxxxxx",
                "column": [
                    "id",
                    "value"
                ],
                "formatType": "ICEBERG"
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "jvmOption": "",
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "concurrent": 3,
            "throttle": false
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
Writer parameters
Parameters marked LindormTable apply only to LindormTable (SQL and WideColumn modes). Parameters marked Compute engine apply only to the compute engine. Parameters marked Both apply to both engines.
| Parameter | Applies to | Description | Required | Default |
|---|---|---|---|---|
| table | Both | Name of the Lindorm table to write to. Case-sensitive. | Yes | None |
| column | Both | List of columns to write. Supports column selection and reordering. For SQL-mode tables, specify column names only. For wide-column tables, use the format columnFamily:columnName\|TYPE for regular columns and ROW\|TYPE for the row key. | Yes | None |
| encoding | LindormTable | Character encoding used to convert Lindorm byte[] binary values to strings. Valid values: UTF-8, GBK. | No | UTF-8 |
| nullMode | LindormTable | How to handle null values from the source. Valid values: SKIP (skip the column), EMPTY_BYTES (write an empty byte array), NULL (write null), DELETE (delete the corresponding field). | No | EMPTY_BYTES |
| formatType | Compute engine | Storage format of the target compute engine table. Valid values: iceberg, parquet, orc. | No | None |
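A writer parameter block can be checked against the table above before the script is submitted. The following is a minimal sketch: `check_writer_parameter` is a hypothetical helper, it covers only the required fields and the two enumerated parameters, and it compares values case-insensitively. That last point is an assumption, made because the script demos use skip and ICEBERG while the table lists SKIP and iceberg.

```python
NULL_MODES = {"SKIP", "EMPTY_BYTES", "NULL", "DELETE"}
FORMAT_TYPES = {"ICEBERG", "PARQUET", "ORC"}


def check_writer_parameter(parameter):
    """Return a list of problems found in a Lindorm Writer "parameter" dict.

    An empty list means no problem was found by this sketch; it does not
    guarantee that the job will run.
    """
    problems = []
    for key in ("table", "column"):
        if not parameter.get(key):
            problems.append(f"{key} is required")

    null_mode = parameter.get("nullMode")
    if null_mode is not None and str(null_mode).upper() not in NULL_MODES:
        problems.append(f"nullMode {null_mode!r} is not one of {sorted(NULL_MODES)}")

    format_type = parameter.get("formatType")
    if format_type is not None and str(format_type).upper() not in FORMAT_TYPES:
        problems.append(f"formatType {format_type!r} is not one of {sorted(FORMAT_TYPES)}")
    return problems
```

Passing the writer parameter block from the LindormTable (SQL mode) demo returns an empty list, while a block with no table and an unknown nullMode returns two problems.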