The Doris data source in DataWorks provides a bidirectional channel for reading from and writing to Doris databases, letting you move large volumes of data efficiently. This topic describes how to configure Doris as a data source and set up batch synchronization tasks.
Supported capabilities:
- Read from Doris tables using filters, shard keys, or custom SQL
- Write to Doris tables, including columns of aggregation types (BITMAP, HLL)
- Incremental data synchronization using `where` conditions
- Concurrent reads using `splitPk` for integer-type shard keys
Supported data types
Different Doris versions support different data types and aggregation models. The following table lists the major supported types. For a complete reference, see the official Doris documentation.
| Type | Supported models | Doris version |
|---|---|---|
| SMALLINT | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
| INT | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
| BIGINT | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
| LARGEINT | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
| FLOAT | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
| DOUBLE | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
| DECIMAL | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
| DECIMALV3 | Aggregate, Unique, Duplicate | 1.2.1+, 2.x |
| DATE | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
| DATETIME | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
| DATEV2 | Aggregate, Unique, Duplicate | 1.2.x, 2.x |
| DATETIMEV2 | Aggregate, Unique, Duplicate | 1.2.x, 2.x |
| CHAR | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
| VARCHAR | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
| STRING | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x |
| ARRAY | Duplicate | 1.2.x, 2.x |
| JSONB | Aggregate, Unique, Duplicate | 1.2.x, 2.x |
| HLL | Aggregate | 0.x.x, 1.1.x, 1.2.x, 2.x |
| BITMAP | Aggregate | 0.x.x, 1.1.x, 1.2.x, 2.x |
| QUANTILE_STATE | Aggregate | 1.2.x, 2.x |
Prerequisites
Before you begin, make sure that:
- A logon account with read or write permissions exists in your Doris database. If you plan to use the `root` user, set a password first, because the root user has no password by default: `SET PASSWORD FOR 'root' = PASSWORD('your_password')`
- Network connectivity is established between Doris and the DataWorks resource group you plan to use (serverless resource group or exclusive resource group for Data Integration). Stream Load writes data through the private endpoint of the frontend (FE) node. If you access the FE node through a public endpoint, the request is redirected to the private IP address of a backend (BE) node, which causes the connection to fail (see Data Operation FAQs). Use the private endpoint to avoid this issue. For setup instructions, see Network connectivity solutions.
Add a Doris data source
Add the Doris data source in DataWorks before configuring any synchronization task. For general instructions, see Data source management.
The Doris data source requires the following parameters:
| Parameter | Description |
|---|---|
| JdbcUrl | The JDBC connection string, including the IP address, port number, database name, and any connection parameters. Both public and private IP addresses are supported. If you use a public IP address, make sure the Data Integration resource group can reach the Doris host. |
| FE endpoint | The IP addresses and HTTP ports of the FE nodes, in ip:port format. For multiple FE nodes, separate entries with commas — for example, ip1:port1,ip2:port2. All specified endpoints are tested when you verify the connection. |
| Username | The username for the Doris database. |
| Password | The password for the specified username. |
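To make these values concrete, the following Python sketch assembles a JdbcUrl string and an FE endpoint list from host settings. The host addresses are placeholders, and the ports shown (9030 for the MySQL-protocol query port, 8030 for the FE HTTP port) are common Doris defaults that may differ in your deployment.

```python
# Sketch: assemble the JdbcUrl and FE endpoint values for a Doris data source.
# Hosts below are placeholders; 9030 (query port) and 8030 (FE HTTP port) are
# common Doris defaults, but check your own deployment.

def build_jdbc_url(host, port, database, params=None):
    """Build a JDBC connection string in the format the data source expects."""
    url = f"jdbc:mysql://{host}:{port}/{database}"
    if params:
        url += "?" + "&".join(f"{k}={v}" for k, v in params.items())
    return url

def build_fe_endpoints(hosts):
    """Join FE nodes into the comma-separated ip:port list."""
    return ",".join(f"{host}:{port}" for host, port in hosts)

jdbc_url = build_jdbc_url("192.168.0.10", 9030, "demo_db", {"useUnicode": "true"})
fe_endpoints = build_fe_endpoints([("192.168.0.10", 8030), ("192.168.0.11", 8030)])
print(jdbc_url)      # jdbc:mysql://192.168.0.10:9030/demo_db?useUnicode=true
print(fe_endpoints)  # 192.168.0.10:8030,192.168.0.11:8030
```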
Develop a synchronization task
Configure the synchronization task using either the codeless UI or the code editor.
For the complete parameter reference and script examples, see the Appendix.
Appendix: Script examples and parameters
Reader
Script example
{
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "doris",
"parameter": {
"column": [
"id"
],
"connection": [
{
"querySql": [
"select a,b from join1 c join join2 d on c.id = d.id;"
],
"datasource": ""
}
],
"where": "",
"splitPk": "",
"encoding": "UTF-8"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "stream",
"parameter": {},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"throttle": true,
"concurrent": 1,
"mbps": "12"
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}

Reader parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| datasource | Yes | — | The name of the data source, as configured in DataWorks. |
| table | Yes | — | The source table to read from. A single task reads from one table. Supports range syntax for sharded tables — for example, `table_[0-99]` reads from `table_0` through `table_99`. If a specified table or column does not exist, the task fails. |
| column | Yes | — | The columns to read, as a JSON array. Supports column selection, reordering, and constants. Use `["*"]` to read all columns. Cannot be empty. |
| splitPk | No | — | The shard key for parallel reads. Supports integer columns only — strings, floating-point numbers, and dates are not supported. Use the primary key to ensure even data distribution. If omitted, data is read in a single channel. |
| where | No | — | A SQL WHERE filter condition. For incremental sync, use a condition such as `gmt_create > $bizdate`. The condition cannot be a LIMIT clause such as `limit 10`. If omitted, all data is read. |
| querySql | No | — | A custom SQL query (code editor only). When set, `table`, `column`, `where`, and `splitPk` are all ignored. Use this for multi-table joins or complex filter logic. The parameter name is case-sensitive: use `querySql`, not `querysql`. |
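To illustrate how an integer shard key enables concurrent reads, here is a simplified Python sketch that splits a key range into per-channel subranges. This only illustrates the idea behind `splitPk`; the actual split strategy used by Data Integration may differ.

```python
# Sketch of how an integer shard key range can be split into roughly equal
# subranges for concurrent reads. Illustrative only; the real split strategy
# used by Data Integration may differ.

def split_pk_ranges(min_pk, max_pk, channels):
    """Split the inclusive range [min_pk, max_pk] into `channels` contiguous parts."""
    total = max_pk - min_pk + 1
    size = total // channels
    remainder = total % channels
    ranges, start = [], min_pk
    for i in range(channels):
        # Distribute the remainder across the first `remainder` channels.
        end = start + size - 1 + (1 if i < remainder else 0)
        ranges.append((start, end))
        start = end + 1
    return ranges

# Each subrange would become one read, e.g. "WHERE id >= start AND id <= end".
print(split_pk_ranges(1, 100, 3))  # [(1, 34), (35, 67), (68, 100)]
```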
`column` parameter — constant configuration
The column array supports constants in addition to column names. Follow Doris SQL syntax:
["id", "table", "1", "'mingya.wmy'", "'null'", "to_char(a+1)", "2.3", "true"]id— a regular column nametable— a column name that is also a reserved word1— an integer constant'mingya.wmy'— a string constant (single quotes required)" "— an empty string;null— a null value;'null'— the string "null"to_char(a+1)— a function expression2.3— a floating-point constanttrue— a boolean constant
Writer
Script example
{
"stepType": "doris",
"parameter": {
"postSql": [],
"preSql": [],
"datasource": "doris_datasource",
"table": "doris_table_name",
"column": [
"id",
"table_id",
"table_no",
"table_name",
"table_status"
],
"loadProps": {
"column_separator": "\\x01",
"line_delimiter": "\\x02"
}
},
"name": "Writer",
"category": "writer"
}

Writer parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| datasource | Yes | — | The name of the data source, as configured in DataWorks. |
| table | Yes | — | The destination table to write to. |
| column | Yes | — | The destination columns to write, as a JSON array. Use `["*"]` to write to all columns in order. |
| preSql | No | — | SQL statements to run before the task starts — for example, to clear stale data. The codeless UI supports one statement; the code editor supports multiple. |
| postSql | No | — | SQL statements to run after the task completes — for example, to add a timestamp. The codeless UI supports one statement; the code editor supports multiple. |
| maxBatchRows | No | 500000 | Maximum number of rows per import batch. A batch is imported when either `maxBatchRows` or `batchSize` is reached. |
| batchSize | No | 104857600 | Maximum data size per import batch, in bytes. A batch is imported when either `batchSize` or `maxBatchRows` is reached. |
| maxRetries | No | 3 | Number of retry attempts after a batch import failure. |
| labelPrefix | No | datax_doris_writer_ | Prefix for the Stream Load label. The final label is `labelPrefix` + UUID, which prevents duplicate imports. |
| loadProps | No | — | Stream Load request parameters, primarily used to configure the import format. See details below. |
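The batching behavior controlled by `maxBatchRows`, `batchSize`, and `labelPrefix` can be sketched as follows. This is a simplified illustration of the flush rule and label generation, not the writer's actual implementation.

```python
import uuid

# Simplified sketch of the writer's batching rules: a batch is imported as one
# Stream Load request when EITHER the row limit or the byte limit is reached,
# and each request carries a unique label (labelPrefix + UUID) so a retried
# batch is not imported twice. Not the actual writer implementation.

MAX_BATCH_ROWS = 500_000        # maxBatchRows default
MAX_BATCH_BYTES = 104_857_600   # batchSize default (100 MiB)
LABEL_PREFIX = "datax_doris_writer_"

def should_flush(rows, nbytes):
    """Return True when the buffered batch has hit either limit."""
    return rows >= MAX_BATCH_ROWS or nbytes >= MAX_BATCH_BYTES

def new_label():
    """Generate a unique Stream Load label for one import request."""
    return LABEL_PREFIX + uuid.uuid4().hex

print(should_flush(500_000, 1_024))          # True  (row limit reached)
print(should_flush(10, 104_857_600))         # True  (byte limit reached)
print(should_flush(10, 1_024))               # False (keep buffering)
print(new_label().startswith(LABEL_PREFIX))  # True
```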
`loadProps` — import format configuration
By default, data is imported in CSV format with \t as the column delimiter and \n as the row delimiter:
"loadProps": {
"format": "csv",
"column_separator": "\\t",
"line_delimiter": "\\n"
}

To import data in JSON format:
"loadProps": {
"format": "json"
}

Write to aggregation type columns
Doris Writer supports writing to columns of aggregation types such as BITMAP and HLL. This requires additional configuration in the code editor: specify the column names in column, then define the aggregate functions in loadProps.columns.
- For BITMAP columns, use the `bitmap_hash` function.
- For HLL columns, use the `hll_hash` function.
Example
Consider the following Doris table, where uuid is of the BITMAP type and sex is of the HLL type:
CREATE TABLE `example_table_1` (
`user_id` int(11) NULL,
`date` varchar(10) NULL DEFAULT "10.5",
`city` varchar(10) NULL,
`uuid` bitmap BITMAP_UNION NULL,
`sex` HLL HLL_UNION
) ENGINE=OLAP AGGREGATE KEY(`user_id`, `date`, `city`)
COMMENT 'OLAP' DISTRIBUTED BY HASH(`user_id`) BUCKETS 32;

Raw data to insert:
user_id,date,city,uuid,sex
0,T0S4Pb,abc,43,'54'
1,T0S4Pd,fsd,34,'54'
2,T0S4Pb,fa3,53,'64'
4,T0S4Pb,fwe,87,'64'
5,T0S4Pb,gbr,90,'56'
2,iY3GiHkLF,234,100,'54'

Writer script for aggregation types:
{
"stepType": "doris",
"writer": {
"parameter": {
"column": [
"user_id",
"date",
"city",
"uuid",
"sex"
],
"loadProps": {
"format": "csv",
"column_separator": "\\x01",
"line_delimiter": "\\x02",
"columns": "user_id,date,city,k1,uuid=bitmap_hash(k1),k2,sex=hll_hash(k2)"
},
"postSql": [
"select count(1) from example_tbl_3"
],
"preSql": [],
"datasource": "doris_datasource",
"table": "doris_table_name"
},
"name": "Writer",
"category": "writer"
}
}

The key is the `columns` field in `loadProps`: it maps intermediate variables (`k1`, `k2`) to the raw input columns and applies the aggregate functions (`bitmap_hash`, `hll_hash`) before writing to the BITMAP and HLL columns.
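As a simplified illustration of that mapping syntax, the following Python sketch splits a `columns` expression into pass-through input fields and derived columns. Real Stream Load parsing handles more syntax than this; the sketch only shows the field-versus-expression distinction.

```python
# Sketch: split a Stream Load `columns` expression into plain input fields and
# derived target columns. Simplified; real Stream Load parsing is richer.

def parse_columns(spec):
    """Return (input_fields, {target_column: expression}) for a columns spec."""
    fields, derived = [], {}
    for item in (s.strip() for s in spec.split(",")):
        if "=" in item:
            # "target=expr" defines a derived column computed from input fields.
            target, expr = item.split("=", 1)
            derived[target.strip()] = expr.strip()
        else:
            # A bare name is an input field read directly from the CSV row.
            fields.append(item)
    return fields, derived

fields, derived = parse_columns(
    "user_id,date,city,k1,uuid=bitmap_hash(k1),k2,sex=hll_hash(k2)"
)
print(fields)   # ['user_id', 'date', 'city', 'k1', 'k2']
print(derived)  # {'uuid': 'bitmap_hash(k1)', 'sex': 'hll_hash(k2)'}
```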