The KingbaseES data source enables bidirectional data synchronization between KingbaseES and DataWorks Data Integration. Use it to read data from KingbaseES into your pipeline or write processed data back to KingbaseES.
Limits
- Supported resource groups: Serverless resource groups (recommended) and exclusive resource groups for Data Integration only.
- Required permission: The sync task requires insert and replace into permissions. Check the preSql and postSql parameters to determine whether additional permissions are needed.
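For example, a writer whose preSql clears the destination table also needs delete permission on that table. The fragment below is a minimal illustration; the data source name, table name, and columns are placeholders, not values from your environment:

```json
{
    "datasource": "my_kingbasees_ds",
    "table": "ods_user_sync",
    "column": ["id", "name"],
    "preSql": ["delete from ods_user_sync;"],
    "postSql": []
}
```

Because this preSql issues a delete, the account used by the data source must hold delete permission in addition to insert.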
Supported field types
The following table lists the data types that KingbaseES Reader supports. Field types not listed are not supported.
| Category | Data types | Notes |
|---|---|---|
| Integer | INT, TINYINT, SMALLINT, MEDIUMINT, BIGINT | tinyint(1) is treated as an integer, not a boolean |
| Floating-point | FLOAT, DOUBLE, DECIMAL | |
| String | VARCHAR, CHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT | |
| Date and time | DATE, DATETIME, TIMESTAMP, TIME, YEAR | |
| Boolean | BIT, BOOL | |
| Binary | TINYBLOB, MEDIUMBLOB, BLOB, LONGBLOB, VARBINARY | |
Add a data source
Before developing a sync task, add the KingbaseES data source in DataWorks. For instructions, see Data source management. Parameter descriptions are available in the DataWorks console when you add the data source.
Develop a sync task
Configure an offline sync task for a single table
Use either of the following methods to configure an offline sync task:
- Codeless UI: Configure a task in the codeless UI
- Code editor: Configure a task in the code editor
For all parameters and a sample script for the code editor, see Appendix: Script demo and parameter descriptions.
Appendix: Script demo and parameter descriptions
To configure a batch sync task using the code editor, set the required parameters in the script following the unified script format. For more information, see Configure a task in the code editor.
Reader script demo
Single database and single table
```json
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "kingbasees",
            "parameter": {
                "column": [
                    "id"
                ],
                "connection": [
                    {
                        "querySql": ["select a,b from join1 c join join2 d on c.id = d.id;"],
                        "datasource": "",
                        "table": [
                            "xxx"
                        ]
                    }
                ],
                "where": "",
                "splitPk": "",
                "encoding": "UTF-8"
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle": true,
            "concurrent": 1,
            "mbps": "12"
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
```
Sharded databases and tables
When using sharded tables, all selected KingbaseES tables must have the same schema.
```json
{
    "type": "job",
    "version": "1.0",
    "configuration": {
        "reader": {
            "plugin": "kingbasees",
            "parameter": {
                "connection": [
                    {
                        "table": ["tbl1", "tbl2", "tbl3"],
                        "datasource": "datasourceName1"
                    },
                    {
                        "table": ["tbl4", "tbl5", "tbl6"],
                        "datasource": "datasourceName2"
                    }
                ],
                "singleOrMulti": "multi",
                "splitPk": "db_id",
                "column": ["id", "name", "age"],
                "where": "1 < id and id < 100"
            }
        },
        "writer": {}
    }
}
```
Reader parameters
| Parameter | Description | Required | Default |
|---|---|---|---|
| username | The username for connecting to KingbaseES. | No | None |
| password | The password for connecting to KingbaseES. | No | None |
| column | The fields to synchronize. Use * to synchronize all columns. | No | None |
| table | The name of the table to synchronize. Use array format even for a single table: ["tablename"]. | No | None |
| jdbcUrl | The Java Database Connectivity (JDBC) URL for connecting to KingbaseES. Example: jdbc:kingbase8://127.0.0.1:30215?currentschema=TEST. | No | None |
| querySql | A custom SQL query used to filter data. When specified, the Reader ignores table, column, and where and uses this query directly. | No | None |
| where | A filter condition applied to the table. Ignored when querySql is specified. | No | None |
| splitPk | An integer field in the KingbaseES table used as the split key to enable high-concurrency synchronization. Leave blank if no such field exists. | No | None |
| encoding | The encoding format of the data. | No | UTF-8 |
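The table/column/where path and the querySql path are mutually exclusive: once querySql is set, the Reader runs that statement as-is and the other filters are ignored. A connection fragment for the querySql path might look like the following; the data source name, table, and columns are hypothetical placeholders:

```json
{
    "connection": [
        {
            "querySql": ["select id, name from users where id < 100;"],
            "datasource": "my_kingbasees_ds"
        }
    ]
}
```

Use this path for joins or complex filters; for a plain single-table read, prefer table plus column so that splitPk-based concurrent reads remain available.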
Writer script demo
```json
{
    "type": "job",
    "version": "2.0",
    "steps": [
        {
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "kingbasees",
            "parameter": {
                "postSql": [],
                "datasource": "",
                "column": ["id", "value"],
                "batchSize": 1024,
                "table": "",
                "preSql": [
                    "delete from XXX;"
                ]
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": "0"
        },
        "speed": {
            "throttle": true,
            "concurrent": 1,
            "mbps": "12"
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
```
Writer parameters
| Parameter | Description | Required | Default |
|---|---|---|---|
| datasource | The name of the data source. Must match the name of the data source added in DataWorks. | Yes | None |
| table | The name of the destination table. | Yes | None |
| column | The fields in the destination table to write data to. Separate multiple fields with commas. Example: ["id", "name", "age"]. To write to all columns in order, use ["*"]. If a field name contains a forward slash (/), escape it: \"/field/name\". | Yes | None |
| preSql | SQL statement to run before the sync task starts. Use this to prepare the destination table, for example: truncate table tablename. The codeless UI supports one statement; the code editor supports multiple statements. Multiple statements do not support transaction atomicity. | No | None |
| postSql | SQL statement to run after the sync task completes. For example: alter table tablename add colname timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP. The codeless UI supports one statement; the code editor supports multiple statements. | No | None |
| batchSize | The number of records to submit in a single batch. Larger values reduce network round trips and improve throughput, but values that are too large may cause out-of-memory (OOM) errors. | No | 1024 |
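A common write pattern is to truncate the destination table in preSql and then raise batchSize for bulk loads. The parameter fragment below is a sketch only; the data source name, table name, and batch size are illustrative assumptions, and the right batchSize depends on row width and available memory:

```json
{
    "datasource": "my_kingbasees_ds",
    "table": "ods_orders",
    "column": ["*"],
    "preSql": ["truncate table ods_orders;"],
    "postSql": [],
    "batchSize": 2048
}
```

Start from the default of 1024 and increase gradually while watching task memory; wide rows (long text or binary columns) usually call for a smaller batch than narrow numeric rows.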