
E-MapReduce:DataX Writer

Last Updated: May 09, 2023

DataX Writer can write data to StarRocks tables. In the underlying implementation, DataX Writer uses the Stream Load method to import data to StarRocks in the CSV or JSON format. DataX Writer caches the data that is read from the source and imports the data to StarRocks in batches to improve write performance. Alibaba Cloud DataWorks integrates the import capability of DataX and can synchronize MaxCompute data to EMR StarRocks. This topic describes how DataX Writer works and how to use DataWorks to run batch synchronization nodes.

Background information

DataX Writer writes data in the following process: Data source -> Reader -> DataX channel -> Writer -> StarRocks. A reader plug-in reads records from the source, DataX transfers the records through in-memory channels, and DataX Writer buffers the records and flushes them to StarRocks in batches by using Stream Load.

Prepare the environment

You can download the DataX plug-in and DataX source code for testing.
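
A typical setup, assuming the standard DataX plug-in layout: decompress the downloaded plug-in package to the plugin/writer directory of your DataX installation so that DataX can load the starrockswriter plug-in by name. The exact path can vary by DataX version.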

During testing, you can run the following command. The --jvm option sets the JVM heap size of the DataX process, and the --loglevel=debug option enables verbose logging:

python datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug job.json

Sample code

The following sample code shows how to read data from a MySQL database and import the data to a StarRocks database:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "xxxx",
                        "password": "xxxx",
                        "column": [ "k1", "k2", "v1", "v2" ],
                        "connection": [
                            {
                                "table": [ "table1", "table2" ],
                                "jdbcUrl": [
                                    "jdbc:mysql://127.0.0.1:3306/datax_test1"
                                ]
                            },
                            {
                                "table": [ "table3", "table4" ],
                                "jdbcUrl": [
                                    "jdbc:mysql://127.0.0.1:3306/datax_test2"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "starrockswriter",
                    "parameter": {
                        "username": "xxxx",
                        "password": "xxxx",
                        "database": "xxxx",
                        "table": "xxxx",
                        "column": ["k1", "k2", "v1", "v2"],
                        "preSql": [],
                        "postSql": [],
                        "jdbcUrl": "jdbc:mysql://172.28.**.**:9030/",
                        "loadUrl": ["172.28.**.**:8030", "172.28.**.**:8030"],
                        "loadProps": {}
                    }
                }
            }
        ]
    }
}
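
In the setting section of this sample, speed.channel specifies the number of concurrent channels that DataX uses to transfer data, and the errorLimit settings cause the job to fail as soon as a dirty record is detected. After you save the configuration as job.json, you can run the job by using the command that is described in the preparation step.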

The following table describes the parameters in the sample code.

| Parameter | Description | Required | Default value |
| --- | --- | --- | --- |
| username | The account that is used to connect to the StarRocks database. | Yes | No default value |
| password | The password that is used to connect to the StarRocks database. | Yes | No default value |
| database | The name of the StarRocks database. | Yes | No default value |
| table | The name of the StarRocks table. | Yes | No default value |
| loadUrl | The endpoint of the StarRocks frontend (FE) that is used to access Stream Load, in the fe_ip:fe_http_port format. You can specify the endpoints of one or more FEs. | Yes | No default value |
| column | The names of the columns to which you want to write data in the destination table. Separate the names with commas (,). Example: "column": ["id","name","age"]. To import data to all columns, set this parameter to ["*"]. | Yes | No default value |
| preSql | The SQL statement that you want to execute before data is written to the destination table. | No | No default value |
| postSql | The SQL statement that you want to execute after data is written to the destination table. | No | No default value |
| jdbcUrl | The Java Database Connectivity (JDBC) URL of the destination database. The JDBC connection is used to execute the preSql and postSql statements. | No | No default value |
| maxBatchRows | The maximum number of rows that can be imported in a single Stream Load job. | No | 500000 |
| maxBatchSize | The maximum number of bytes that can be imported in a single Stream Load job. | No | 104857600 |
| flushInterval | The interval between the end of the previous Stream Load job and the start of the next one. Unit: milliseconds. | No | 300000 |
| loadProps | The request parameters of Stream Load. For more information, see Stream Load. | No | No default value |
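
If the default batching behavior does not meet your requirements, you can add the optional maxBatchRows, maxBatchSize, and flushInterval parameters to the parameter object of the writer. The following snippet is a minimal sketch, and the values are illustrative rather than tuning recommendations:

"maxBatchRows": 200000,
"maxBatchSize": 67108864,
"flushInterval": 300000

For example, a smaller maxBatchSize reduces the amount of data that is buffered for each Stream Load job at the cost of more frequent load jobs.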

Data type conversion

By default, DataX Writer converts the data to be imported into strings and joins them into CSV records, with \t as the column delimiter and \n as the row delimiter. The CSV records are then imported by using Stream Load. You can configure loadProps to change the delimiters or the import format. Examples:

  • The following example shows how to change the column delimiter and the row delimiter by configuring loadProps. Because the delimiters in this example are invisible characters, they are specified in the \x hexadecimal form, and the backslash must be escaped in the JSON file:

    "loadProps": {
        "column_separator": "\\x01",
        "row_delimiter": "\\x02"
    }
  • The following example shows how to change the import format to JSON by configuring loadProps. A complete writer configuration that uses this format is sketched after this list:

    "loadProps": {
        "format": "json",
        "strip_outer_array": true
    }
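
For reference, the following writer block is a minimal sketch that combines the sample configuration above with the JSON import format. All credentials and addresses are placeholders carried over from the sample code:

"writer": {
    "name": "starrockswriter",
    "parameter": {
        "username": "xxxx",
        "password": "xxxx",
        "database": "xxxx",
        "table": "xxxx",
        "column": ["k1", "k2", "v1", "v2"],
        "jdbcUrl": "jdbc:mysql://172.28.**.**:9030/",
        "loadUrl": ["172.28.**.**:8030"],
        "loadProps": {
            "format": "json",
            "strip_outer_array": true
        }
    }
}

With this configuration, DataX Writer serializes each batch of rows as a JSON array, and the strip_outer_array setting instructs Stream Load to parse each element of the array as one row.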

Use DataWorks to run a batch synchronization node

  1. Create a workspace in the DataWorks console. For more information, see Create a workspace.

  2. Create a test table and upload data to the MaxCompute data source in the DataWorks console. For more information, see Create tables and upload data.

  3. Create a StarRocks data source.

    1. On the Workspaces page of the DataWorks console, find the workspace that you want to manage and click Data Integration in the Actions column of the workspace.

    2. In the left-side navigation pane of the Data Integration page, click Data Source.

    3. In the upper-right corner of the Data Source page, click Add data source.

    4. In the Add data source dialog box, add a StarRocks data source.

  4. Create a batch synchronization node.

    1. Create a workflow. For more information, see Create a workflow.

    2. Find the workflow and create a batch synchronization node. For more information, see Configure a batch synchronization node by using the codeless UI.

  5. Query data in the StarRocks cluster. For more information, see Getting started.
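
For example, you can connect to the StarRocks cluster and query the destination table to check whether the number of rows matches that of the source MaxCompute table.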