
E-MapReduce: DataX Writer

Last Updated: Mar 26, 2026

DataX Writer loads data into StarRocks tables using Stream Load. All incoming data is buffered and flushed in batches — in CSV or JSON format — so large datasets transfer efficiently. DataWorks integrates this capability natively, letting you sync MaxCompute data to E-MapReduce (EMR) StarRocks without writing custom pipeline code.

How it works

The data pipeline follows this sequence:

source → reader → DataX channel → writer → StarRocks

DataX Writer uses the StarRocks Stream Load HTTP API to ingest each batch. By default, each batch is serialized as CSV with \t as the column delimiter and \n as the row delimiter. Use the loadProps parameter to switch to JSON format or change the delimiters.
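Concretely, each flush is an HTTP PUT to the FE's /api/{database}/{table}/_stream_load endpoint. The following Python sketch shows roughly what the writer assembles per batch; the host, credentials, and rows are placeholders, and a real call would PUT the body to the URL with the headers shown:

```python
import base64

def build_stream_load_request(fe_host, database, table, rows,
                              column_separator="\t", row_delimiter="\n"):
    """Serialize one batch as CSV and assemble the Stream Load request parts."""
    url = "http://%s/api/%s/%s/_stream_load" % (fe_host, database, table)
    body = row_delimiter.join(
        column_separator.join(str(v) for v in row) for row in rows
    ) + row_delimiter
    headers = {
        "Expect": "100-continue",             # Stream Load requires this header
        "column_separator": column_separator,
        "row_delimiter": row_delimiter,
        "Authorization": "Basic " + base64.b64encode(b"user:password").decode(),
    }
    return url, headers, body

# Placeholder FE endpoint and rows; the DataX writer sends the equivalent
# request for every buffered batch.
url, headers, body = build_stream_load_request(
    "fe-host:8030", "load_db", "tbl", [(1, "aaa"), (2, "bbb")]
)
```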

Prerequisites

Before you begin, make sure you have:

  • A DataX environment set up. Download the DataX plug-in and the DataX source code

  • An EMR StarRocks cluster with a frontend node (FE) accessible over HTTP

  • (For the RDS import example) An ApsaraDB RDS for MySQL instance in the same virtual private cloud (VPC) and vSwitch as the StarRocks instance

Write data to StarRocks

Configure the job

The following example reads from two MySQL databases and writes to a StarRocks table. All examples use the starrockswriter plugin to submit Stream Load jobs.

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "xxxx",
            "password": "xxxx",
            "column": ["k1", "k2", "v1", "v2"],
            "connection": [
              {
                "table": ["table1", "table2"],
                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/datax_test1"]
              },
              {
                "table": ["table3", "table4"],
                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/datax_test2"]
              }
            ]
          }
        },
        "writer": {
          "name": "starrockswriter",
          "parameter": {
            "username": "xxxx",
            "password": "xxxx",
            "database": "xxxx",
            "table": "xxxx",
            "column": ["k1", "k2", "v1", "v2"],
            "preSql": [],
            "postSql": [],
            "jdbcUrl": "jdbc:mysql://fe-c-*****-internal.starrocks.aliyuncs.com:9030/",
            "loadUrl": ["fe-c-*****-internal.starrocks.aliyuncs.com:8030"],
            "loadProps": {}
          }
        }
      }
    ]
  }
}

Writer parameters

  • username: Username for the StarRocks database. Required.

  • password: Password for the StarRocks database. Required.

  • database: Name of the destination StarRocks database. Required.

  • table: Name of the destination StarRocks table. Required.

  • loadUrl: HTTP endpoint of the StarRocks frontend node (FE), in the format fe_ip:fe_http_port. Specify one or more FE endpoints. Required.

  • column: Column names to write to in the destination table, separated by commas. For example: ["id","name","age"]. To import all columns, use ["*"]. Required.

  • preSql: SQL statement to run before writing data. Optional.

  • postSql: SQL statement to run after writing data. Optional.

  • jdbcUrl: Java Database Connectivity (JDBC) URL of the destination database, used to execute the preSql and postSql statements. Optional.

  • maxBatchRows: Maximum number of rows per Stream Load job. Optional. Default: 500,000.

  • maxBatchSize: Maximum number of bytes per Stream Load job. Optional. Default: 104,857,600.

  • flushInterval: Time between the end of one Stream Load job and the start of the next, in milliseconds. Optional. Default: 300,000.

  • loadProps: Additional Stream Load request parameters. See Stream Load. Optional.
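The three batching parameters work together: a buffered batch is flushed as soon as any one of the limits is reached. The following is an illustrative sketch of that decision, not the plugin's actual code; the defaults mirror the parameter values above:

```python
def should_flush(buffered_rows, buffered_bytes, ms_since_last_flush,
                 max_batch_rows=500_000, max_batch_size=104_857_600,
                 flush_interval=300_000):
    """Flush when any limit is hit: row count, byte size, or elapsed time."""
    return (buffered_rows >= max_batch_rows
            or buffered_bytes >= max_batch_size
            or ms_since_last_flush >= flush_interval)
```

Lowering maxBatchRows or maxBatchSize trades throughput for smaller, more frequent Stream Load jobs.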

Customize the import format

By default, DataX Writer produces CSV output. Use loadProps to override the format.

Change the column and row delimiters:

"loadProps": {
  "column_separator": "\\x01",
  "row_delimiter": "\\x02"
}

Switch to JSON format:

"loadProps": {
  "format": "json",
  "strip_outer_array": true
}
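With format set to json, each batch is sent as one JSON array of row objects, and strip_outer_array tells StarRocks to unwrap the outer array into individual rows. A minimal sketch of the payload shape, with made-up rows:

```python
import json

rows = [
    {"id": 1, "name": "aaa"},
    {"id": 2, "name": "bbb"},
]
# The batch body is a single JSON array; "strip_outer_array": true makes
# StarRocks treat each array element as a separate row.
payload = json.dumps(rows)
```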

Run a test

python datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug job.json

End-to-end example: sync from MySQL to StarRocks

This example walks through creating the source and destination tables, then running the DataX job.

1. Create the source MySQL table and insert sample data:

CREATE TABLE `sr_db`.sr_table (
  id INT,
  name VARCHAR(1024),
  event_time DATETIME
);

INSERT INTO `sr_db`.sr_table VALUES
  (1, "aaa", "2015-09-12 00:00:00"),
  (2, "bbb", "2015-09-12 00:00:00");

2. Create the destination StarRocks table:

CREATE TABLE IF NOT EXISTS load_db.datax_into_tbl (
  id         INT,
  name       STRING,
  event_time DATETIME
) ENGINE=OLAP
DUPLICATE KEY(id, name)
DISTRIBUTED BY HASH(id, name) BUCKETS 1
PROPERTIES (
  "replication_num" = "1"
);

3. Create the DataX job file:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "username",
            "password": "***",
            "column": ["id", "name", "event_time"],
            "connection": [
              {
                "table": ["sr_table"],
                "jdbcUrl": ["jdbc:mysql://rm-*****.mysql.rds.aliyuncs.com:3306/sr_db"]
              }
            ]
          }
        },
        "writer": {
          "name": "starrockswriter",
          "parameter": {
            "username": "admin",
            "password": "****",
            "database": "load_db",
            "table": "datax_into_tbl",
            "column": ["id", "name", "event_time"],
            "preSql": [],
            "postSql": [],
            "jdbcUrl": "jdbc:mysql://fe-c-*****-internal.starrocks.aliyuncs.com:9030/",
            "loadUrl": ["fe-c-*****-internal.starrocks.aliyuncs.com:8030"],
            "loadProps": {}
          }
        }
      }
    ]
  }
}

4. Run the job. A successful run produces output similar to:

Task start time    : 2023-04-07 13:05:55
Task end time      : 2023-04-07 13:06:05
Task execution time:                  10s
Average traffic    :               2 B/s
Write speed        :            0 rec/s
Records read       :                    2
Read/write failures:                    0
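If a run fails instead, a common cause is mismatched column lists: DataX maps reader columns to writer columns by position. A small pre-flight check (an illustrative helper, not part of DataX) can catch this before the job is submitted, for example by loading job.json with json.load and passing the resulting dict in:

```python
def check_columns(job):
    """Verify the reader and writer column lists of a DataX job line up."""
    content = job["job"]["content"][0]
    reader_cols = content["reader"]["parameter"]["column"]
    writer_cols = content["writer"]["parameter"]["column"]
    if len(reader_cols) != len(writer_cols):
        raise ValueError(
            "column count mismatch: %s vs %s" % (reader_cols, writer_cols)
        )
    # Positional mapping: reader column i feeds writer column i.
    return list(zip(reader_cols, writer_cols))
```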

Sync MaxCompute data using DataWorks

DataWorks provides a visual interface for creating batch synchronization nodes that move data from MaxCompute to StarRocks without writing DataX job files manually.

Steps

  1. Create a workspace. In the DataWorks console, create a workspace. See Create a workspace.

  2. Prepare your MaxCompute data. Create a test table and upload data to the MaxCompute data source. See Create tables and upload data.

  3. Add a StarRocks data source.

    1. On the Workspaces page, find your workspace and click Data Integration in the Actions column.

    2. In the left-side navigation pane, click Data Source.

    3. In the upper-right corner, click Add data source.

    4. In the Add data source dialog box, add a StarRocks data source.

  4. Create a batch synchronization node.

    1. Create a workflow. See Create a workflow.

    2. In the workflow, create a batch synchronization node. See Configure a batch synchronization node by using the codeless UI.

  5. Verify the data. After the node runs, view the imported records in your StarRocks instance. See View metadata.

Usage notes

  • The ApsaraDB RDS for MySQL instance and the StarRocks instance must be in the same VPC and vSwitch for direct connectivity.

  • jdbcUrl is used to execute pre- and post-import SQL. If you do not need preSql or postSql, you can leave jdbcUrl blank.