DataX Writer loads data into StarRocks tables using Stream Load. All incoming data is buffered and flushed in batches — in CSV or JSON format — so large datasets transfer efficiently. DataWorks integrates this capability natively, letting you sync MaxCompute data to E-MapReduce (EMR) StarRocks without writing custom pipeline code.
How it works
The data pipeline follows this sequence:
source → reader → DataX channel → writer → StarRocks
DataX Writer uses the StarRocks Stream Load HTTP API to ingest each batch. By default, each batch is serialized as CSV with \t as the column delimiter and \n as the row delimiter. Use the loadProps parameter to switch to JSON format or change the delimiters.
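The default CSV framing can be sketched in Python. This is illustrative only: serialize_csv is a hypothetical helper, and the real serialization happens inside the starrockswriter plugin.

```python
# Sketch: default batch serialization - column values joined with \t,
# rows joined with \n. Hypothetical helper, not part of DataX.
def serialize_csv(rows, column_separator="\t", row_delimiter="\n"):
    return row_delimiter.join(
        column_separator.join(str(value) for value in row)
        for row in rows
    )

batch = [(1, "aaa", "2015-09-12 00:00:00"),
         (2, "bbb", "2015-09-12 00:00:00")]
print(serialize_csv(batch))  # two tab-separated lines
```

Any \t or \n appearing in the data itself would collide with these defaults, which is why loadProps lets you change them.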
Prerequisites
Before you begin, make sure you have:
- A DataX environment set up. Download the DataX plug-in and the DataX source code.
- An EMR StarRocks cluster with a frontend node (FE) accessible over HTTP.
- (For the RDS import example) An ApsaraDB RDS for MySQL instance in the same virtual private cloud (VPC) and vSwitch as the StarRocks instance.
Write data to StarRocks
Configure the job
The following example reads from two MySQL databases and writes to a StarRocks table. All examples use the starrockswriter plugin to submit Stream Load jobs.
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "xxxx",
"password": "xxxx",
"column": ["k1", "k2", "v1", "v2"],
"connection": [
{
"table": ["table1", "table2"],
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/datax_test1"]
},
{
"table": ["table3", "table4"],
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/datax_test2"]
}
]
}
},
"writer": {
"name": "starrockswriter",
"parameter": {
"username": "xxxx",
"password": "xxxx",
"database": "xxxx",
"table": "xxxx",
"column": ["k1", "k2", "v1", "v2"],
"preSql": [],
"postSql": [],
"jdbcUrl": "jdbc:mysql://fe-c-*****-internal.starrocks.aliyuncs.com:9030/",
"loadUrl": ["fe-c-*****-internal.starrocks.aliyuncs.com:8030"],
"loadProps": {}
}
}
}
]
}
}
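Before submitting a job, a quick sanity check of the writer block can catch missing parameters early. A minimal sketch, assuming the required-parameter set described in this document; missing_writer_params is a hypothetical helper, not part of DataX:

```python
import json

# Required starrockswriter parameters per this document.
REQUIRED = {"username", "password", "database", "table", "column", "loadUrl"}

def missing_writer_params(job_text):
    """Return required writer parameters absent from a DataX job file.
    Hypothetical pre-flight helper for illustration only."""
    job = json.loads(job_text)
    writer = job["job"]["content"][0]["writer"]
    if writer["name"] != "starrockswriter":
        raise ValueError("not a starrockswriter job")
    return sorted(REQUIRED - writer["parameter"].keys())
```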
Writer parameters
| Parameter | Description | Required | Default |
|---|---|---|---|
| username | Username for the StarRocks database. | Yes | — |
| password | Password for the StarRocks database. | Yes | — |
| database | StarRocks database name. | Yes | — |
| table | StarRocks table name. | Yes | — |
| loadUrl | HTTP endpoint of the StarRocks frontend node (FE), in the format fe_ip:fe_http_port. Specify one or more FE endpoints. | Yes | — |
| column | Column names to write to in the destination table, separated by commas. For example: ["id","name","age"]. To import all columns, use ["*"]. | Yes | — |
| preSql | SQL statement to run before writing data. | No | — |
| postSql | SQL statement to run after writing data. | No | — |
| jdbcUrl | Java Database Connectivity (JDBC) URL of the destination database, used to execute the SQL statements before and after writing data to the destination table. | No | — |
| maxBatchRows | Maximum number of rows per Stream Load job. | No | 500,000 |
| maxBatchSize | Maximum number of bytes per Stream Load job. | No | 104,857,600 |
| flushInterval | Time between the end of one Stream Load job and the start of the next, in milliseconds. | No | 300,000 |
| loadProps | Additional Stream Load request parameters. See Stream Load. | No | — |
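The three batching parameters interact simply: a new Stream Load job starts as soon as any one threshold is reached. A rough sketch of that decision, with the defaults from the table above (should_flush is a hypothetical helper; the actual logic lives in the plugin):

```python
# Sketch: a Stream Load flush triggers when any limit is hit.
# Defaults mirror the writer parameter table. Not actual DataX code.
def should_flush(buffered_rows, buffered_bytes, ms_since_last_flush,
                 max_batch_rows=500_000,
                 max_batch_size=104_857_600,
                 flush_interval=300_000):
    return (buffered_rows >= max_batch_rows
            or buffered_bytes >= max_batch_size
            or ms_since_last_flush >= flush_interval)
```

Tuning these down produces smaller, more frequent loads; tuning them up favors throughput at the cost of latency and memory.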
Customize the import format
By default, DataX Writer produces CSV output. Use loadProps to override the format.
Change the column and row delimiters:
"loadProps": {
"column_separator": "\\x01",
"row_delimiter": "\\x02"
}
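The \\x01 and \\x02 values are backslash-escaped hexadecimal bytes: the writer interprets them as the single bytes 0x01 and 0x02, which rarely occur in real text and so avoid collisions when the data itself contains tabs or newlines. The equivalence can be checked in Python:

```python
# Sketch: "\\x01" in job.json is an escape for the single byte 0x01.
escaped = "\\x01"  # the four characters \ x 0 1, as written in job.json
actual = escaped.encode("ascii").decode("unicode_escape")
print(len(actual), ord(actual))  # one character with code point 1
```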
Switch to JSON format:
"loadProps": {
"format": "json",
"strip_outer_array": true
}
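With these settings, each batch is posted as a single JSON array, and strip_outer_array tells StarRocks to unwrap the array and load each element as one row. Roughly, the payload looks like this (a sketch; actual serialization is handled by the plugin):

```python
import json

# Sketch: a batch serialized for format=json. With strip_outer_array=true,
# StarRocks unwraps the outer array and loads each object as one row.
batch = [
    {"k1": 1, "k2": 2, "v1": "a", "v2": "b"},
    {"k1": 3, "k2": 4, "v1": "c", "v2": "d"},
]
payload = json.dumps(batch)
print(payload)
```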
Run a test
python datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug job.json
End-to-end example: sync from MySQL to StarRocks
This example walks through creating the source and destination tables, then running the DataX job.
1. Create the source MySQL table and insert sample data:
CREATE TABLE `sr_db`.sr_table (
id INT,
name VARCHAR(1024),
event_time DATETIME
);
INSERT INTO `sr_db`.sr_table VALUES
(1, "aaa", "2015-09-12 00:00:00"),
(2, "bbb", "2015-09-12 00:00:00");
2. Create the destination StarRocks table:
CREATE TABLE IF NOT EXISTS load_db.datax_into_tbl (
id INT,
name STRING,
event_time DATETIME
) ENGINE=OLAP
DUPLICATE KEY(id, name)
DISTRIBUTED BY HASH(id, name) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
3. Create the DataX job file:
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "username",
"password": "***",
"column": ["id", "name", "event_time"],
"connection": [
{
"table": ["sr_table"],
"jdbcUrl": ["jdbc:mysql://rm-*****.mysql.rds.aliyuncs.com:3306/sr_db"]
}
]
}
},
"writer": {
"name": "starrockswriter",
"parameter": {
"username": "admin",
"password": "****",
"database": "load_db",
"table": "datax_into_tbl",
"column": ["id", "name", "event_time"],
"preSql": [],
"postSql": [],
"jdbcUrl": "jdbc:mysql://fe-c-*****-internal.starrocks.aliyuncs.com:9030/",
"loadUrl": ["fe-c-*****-internal.starrocks.aliyuncs.com:8030"],
"loadProps": {}
}
}
}
]
}
}
4. Run the job. A successful run produces output similar to:
Task start time : 2023-04-07 13:05:55
Task end time : 2023-04-07 13:06:05
Task execution time: 10s
Average traffic : 2 B/s
Write speed : 0 rec/s
Records read : 2
Read/write failures: 0
Sync MaxCompute data using DataWorks
DataWorks provides a visual interface for creating batch synchronization nodes that move data from MaxCompute to StarRocks without writing DataX job files manually.
Steps
1. Create a workspace in the DataWorks console. See Create a workspace.
2. Prepare your MaxCompute data. Create a test table and upload data to the MaxCompute data source. See Create tables and upload data.
3. Add a StarRocks data source:
   - On the Workspaces page, find your workspace and click Data Integration in the Actions column.
   - In the left-side navigation pane, click Data Source.
   - In the upper-right corner, click Add data source.
   - In the Add data source dialog box, add a StarRocks data source.
4. Create a batch synchronization node:
   - Create a workflow. See Create a workflow.
   - In the workflow, create a batch synchronization node. See Configure a batch synchronization node by using the codeless UI.
5. Verify the data. After the node runs, view the imported records in your StarRocks instance. See View metadata.
Usage notes
- The ApsaraDB RDS for MySQL instance and the StarRocks instance must be in the same VPC and vSwitch for direct connectivity.
- jdbcUrl is used to execute pre- and post-import SQL. If you do not need preSql or postSql, you can leave jdbcUrl blank.