DataX Writer can write data to StarRocks tables. Under the hood, DataX Writer uses the Stream Load method to import data into StarRocks in the CSV or JSON format. The data read by DataX Writer is cached in memory and imported into StarRocks in batches to improve write performance. Alibaba Cloud DataWorks integrates the import capability of DataX and can synchronize MaxCompute data to EMR StarRocks. This topic describes how DataX Writer works and how to use DataWorks to run batch synchronization nodes.
Background information
DataX Writer writes data in the following process: Data source -> Reader -> DataX channel -> Writer -> StarRocks.
Prepare the environment
You can download the DataX plug-in and DataX source code for testing.
During testing, you can run the following command, where the --jvm option specifies the JVM heap size and the --loglevel option specifies the log level:
python datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug job.json
Sample code
The following sample code shows how to read data from a MySQL database and import the data into a StarRocks database:
{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "xxxx",
                        "password": "xxxx",
                        "column": [ "k1", "k2", "v1", "v2" ],
                        "connection": [
                            {
                                "table": [ "table1", "table2" ],
                                "jdbcUrl": [
                                    "jdbc:mysql://127.0.0.1:3306/datax_test1"
                                ]
                            },
                            {
                                "table": [ "table3", "table4" ],
                                "jdbcUrl": [
                                    "jdbc:mysql://127.0.0.1:3306/datax_test2"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "starrockswriter",
                    "parameter": {
                        "username": "xxxx",
                        "password": "xxxx",
                        "database": "xxxx",
                        "table": "xxxx",
                        "column": ["k1", "k2", "v1", "v2"],
                        "preSql": [],
                        "postSql": [],
                        "jdbcUrl": "jdbc:mysql://172.28.**.**:9030/",
                        "loadUrl": ["172.28.**.**:8030", "172.28.**.**:8030"],
                        "loadProps": {}
                    }
                }
            }
        ]
    }
}

The following table describes the parameters in the sample code.
| Parameter | Description | Required | Default value |
| --- | --- | --- | --- |
| username | The account that is used to connect to the StarRocks database. | Yes | No default value |
| password | The password that is used to connect to the StarRocks database. | Yes | No default value |
| database | The name of the StarRocks database. | Yes | No default value |
| table | The name of the StarRocks table. | Yes | No default value |
| loadUrl | The endpoints of the StarRocks frontends (FEs) that are used to access Stream Load, in the fe_host:fe_http_port format. Example: ["172.28.**.**:8030"]. | Yes | No default value |
| column | The names of the columns to which you want to write data in the destination table. Separate the names with commas (,). Example: "column": ["k1", "k2", "v1", "v2"]. Important: This parameter is required. If you want to import data to all columns, you can use an asterisk, such as "column": ["*"]. | Yes | No default value |
| preSql | The SQL statements that you want to execute before data is written to the destination table. | No | No default value |
| postSql | The SQL statements that you want to execute after data is written to the destination table. | No | No default value |
| jdbcUrl | The Java Database Connectivity (JDBC) connection string of the destination database. The JDBC connection is used to execute the preSql and postSql statements. | No | No default value |
| maxBatchRows | The maximum number of rows that can be imported in a single Stream Load job. | No | 500000 |
| maxBatchSize | The maximum number of bytes that can be imported in a single Stream Load job. | No | 104857600 |
| flushInterval | The interval between the end of the previous Stream Load job and the start of the next one. Unit: milliseconds. | No | 300000 |
| loadProps | The request parameters of Stream Load. For more information, see Stream Load. | No | No default value |
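For example, the following writer configuration uses the optional parameters to truncate the destination table before the import and to lower the batching thresholds. This is a minimal sketch: the TRUNCATE statement and the threshold values are illustrative, not recommendations.

"writer": {
    "name": "starrockswriter",
    "parameter": {
        "username": "xxxx",
        "password": "xxxx",
        "database": "xxxx",
        "table": "xxxx",
        "column": ["k1", "k2", "v1", "v2"],
        "preSql": ["TRUNCATE TABLE xxxx"],
        "postSql": [],
        "jdbcUrl": "jdbc:mysql://172.28.**.**:9030/",
        "loadUrl": ["172.28.**.**:8030"],
        "maxBatchRows": 200000,
        "maxBatchSize": 52428800,
        "flushInterval": 300000,
        "loadProps": {}
    }
}

With these values, a Stream Load job should be triggered after roughly 200,000 cached rows or 50 MB of cached data, or after 300 seconds without a flush, whichever comes first.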
Data type conversion
By default, DataX Writer converts the imported data into strings, concatenates them with \t as the column delimiter and \n as the row delimiter, and imports the resulting CSV data by using Stream Load.
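For example, assuming a row whose four column values are 1, 2, a, and b, DataX Writer serializes the row by default as follows, with rows separated by the \n delimiter:

1\t2\ta\tb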
The following example shows how to change the column and row delimiters by configuring loadProps:
"loadProps": { "column_separator": "\\x01", "row_delimiter": "\\x02" }The following example shows how to change the import format to JSON by configuring loadProps:
"loadProps": { "format": "json", "strip_outer_array": true }
Use DataWorks to run a batch synchronization node
1. Create a workspace in the DataWorks console. For more information, see Create a workspace.
2. Create a test table and upload data to the MaxCompute data source in the DataWorks console. For more information, see Create tables and upload data.
3. Create a StarRocks data source.
   a. On the Workspaces page of the DataWorks console, find the workspace that you want to manage and click Data Integration in the Actions column.
   b. In the left-side navigation pane of the Data Integration page, click Data Source.
   c. In the upper-right corner of the Data Source page, click Add data source.
   d. In the Add data source dialog box, add a StarRocks data source.
4. Create a batch synchronization node.
   a. Create a workflow. For more information, see Create a workflow.
   b. Find the workflow and create a batch synchronization node. For more information, see Configure a batch synchronization node by using the codeless UI.
5. Query data in the StarRocks cluster to verify the result. For more information, see Getting started.