全部產品
Search
文件中心

E-MapReduce:DataX Writer

更新時間:Jul 01, 2024

DataX Writer外掛程式實現了寫入資料到StarRocks目的表的功能。在底層實現上,DataX Writer通過Stream Load以CSV或JSON格式匯入資料至StarRocks。內部將Reader讀取的資料進行緩衝後大量匯入至StarRocks,以提高寫入效能。阿里雲DataWorks已經整合了DataX匯入的能力,可以同步MaxCompute資料到EMR StarRocks。本文為您介紹DataX Writer原理,以及如何使用DataWorks進行離線同步任務。

背景資訊

DataX Writer總體的資料流為Source -> Reader -> DataX channel -> Writer -> StarRocks。

功能說明

環境準備

您可以下載DataX外掛程式DataX源碼進行測試:

測試時可以使用命令python datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug job.json

配置範例

從MySQL讀取資料後匯入至StarRocks。

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "xxxx",
                        "password": "xxxx",
                        "column": [ "k1", "k2", "v1", "v2" ],
                        "connection": [
                            {
                                "table": [ "table1", "table2" ],
                                "jdbcUrl": [
                                     "jdbc:mysql://127.0.0.1:3306/datax_test1"
                                ]
                            },
                            {
                                "table": [ "table3", "table4" ],
                                "jdbcUrl": [
                                     "jdbc:mysql://127.0.0.1:3306/datax_test2"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "starrockswriter",
                    "parameter": {
                        "username": "xxxx",
                        "password": "xxxx",
                        "database": "xxxx",
                        "table": "xxxx",
                        "column": ["k1", "k2", "v1", "v2"],
                        "preSql": [],
                        "postSql": [],
                        "jdbcUrl": "jdbc:mysql://172.28.**.**:9030/",
                        "loadUrl": ["172.28.**.**:8030", "172.28.**.**:8030"],
                        "loadProps": {}
                    }
                }
            }
        ]
    }
}

相關參數描述如下表所示。

參數

描述

是否必選

預設值

username

StarRocks資料庫的使用者名稱。

password

StarRocks資料庫的密碼。

database

StarRocks資料庫的名稱。

table

StarRocks表的名稱。

loadUrl

StarRocks FE的地址,用於Stream Load,可以為多個FE地址,格式為fe_ip:fe_http_port

column

目的表需要寫入資料的欄位,欄位之間用英文逗號(,)分隔。例如,"column": ["id","name","age"]

重要

該參數必須指定。如果希望匯入所有欄位,可以使用["*"]

preSql

寫入資料到目的表前,會先執行設定的標準語句。

postSql

寫入資料到目的表後,會先執行設定的標準語句。

jdbcUrl

目的資料庫的JDBC串連資訊,用於執行preSqlpostSql

maxBatchRows

單次Stream Load匯入的最大行數。

500000(50W)

maxBatchSize

單次Stream Load匯入的最大位元組數。

104857600 (100M)

flushInterval

上一次Stream Load結束至下一次開始的時間間隔。單位為ms。

300000(ms)

loadProps

Stream Load的請求參數,詳情請參見Stream Load

類型轉換

預設傳入的資料均會被轉為字串,並以\t作為資料行分隔符號,\n作為行分隔字元,組成CSV檔案進行Stream Load匯入操作。類型轉換樣本如下:

  • 更改資料行分隔符號,則loadProps配置如下。

    "loadProps": {
        "column_separator": "\\x01",
        "row_delimiter": "\\x02"
    }
  • 更改匯入格式為JSON,則loadProps配置如下。

    "loadProps": {
        "format": "json",
        "strip_outer_array": true
    }

DataWorks離線同步使用方式

  1. 在DataWorks上建立工作空間,詳情請參見建立工作空間

  2. 在DataWorks上建立測試表並上傳資料到MaxCompute資料來源,詳情請參見建表並上傳資料

  3. 建立StarRocks資料來源。

    1. 在DataWorks的工作空間列表頁面,單擊目標工作空間操作列的Data Integration

    2. 在左側導覽列,單擊資料來源

    3. 單擊右上方的新增資料來源

    4. 新增資料來源對話方塊中,新增StarRocks類型的資料來源。

  4. 建立離線同步任務流程。

    1. 建立商務程序,詳情請參見建立商務程序

    2. 在目錄商務程序,建立離線同步任務,詳情請參見通過嚮導模式配置離線同步任務

  5. 在StarRocks叢集中查看資料,詳情請參見快速入門