全部產品
Search
文件中心

E-MapReduce:DataX Writer

更新時間:Jul 10, 2024

DataX Writer外掛程式實現了寫入資料到StarRocks目的表的功能。在底層實現上,DataX Writer通過Stream Load以CSV或JSON格式匯入資料至StarRocks。內部將Reader讀取的資料進行緩衝後大量匯入至StarRocks,以提高寫入效能。阿里雲DataWorks已經整合了DataX匯入的能力,可以同步MaxCompute資料到EMR StarRocks。本文為您介紹DataX Writer原理,以及如何使用DataWorks進行離線同步任務。

背景資訊

DataX Writer總體的資料流為Source -> Reader -> DataX channel -> Writer -> StarRocks。

功能說明

環境準備

您可以下載DataX外掛程式DataX源碼進行測試:

測試時可以使用命令python datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug job.json

配置範例

從MySQL讀取資料後匯入至StarRocks。
{
    "job": {
        "setting": {
            "speed": {
                 "channel": 1
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "xxxx",
                        "password": "xxxx",
                        "column": [ "k1", "k2", "v1", "v2" ],
                        "connection": [
                            {
                                "table": [ "table1", "table2" ],
                                "jdbcUrl": [
                                     "jdbc:mysql://127.0.0.1:3306/datax_test1"
                                ]
                            },
                            {
                                "table": [ "table3", "table4" ],
                                "jdbcUrl": [
                                     "jdbc:mysql://127.0.0.1:3306/datax_test2"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "starrockswriter",
                    "parameter": {
                        "username": "xxxx",
                        "password": "xxxx",
                        "database": "xxxx",
                        "table": "xxxx",
                        "column": ["k1", "k2", "v1", "v2"],
                        "preSql": [],
                        "postSql": [],
                        "jdbcUrl": "jdbc:mysql://fe-c-*****-internal.starrocks.aliyuncs.com:9030/",
                        "loadUrl": ["fe-c-*****-internal.starrocks.aliyuncs.com:8030""],
                        "loadProps": {}
                    }
                }
            }
        ]
    }
}
相關參數描述如下表所示。
參數描述是否必選預設值
usernameStarRocks資料庫的使用者名稱。
passwordStarRocks資料庫的密碼。
databaseStarRocks資料庫的名稱。
tableStarRocks表的名稱。
loadUrlStarRocks FE的地址,用於Stream Load,可以為多個FE地址,格式為fe_ip:fe_http_port
column目的表需要寫入資料的欄位,欄位之間用英文逗號(,)分隔。例如,"column": ["id","name","age"]
重要 該參數必須指定。如果希望匯入所有欄位,可以使用["*"]
preSql寫入資料到目的表前,會先執行設定的標準語句。
postSql寫入資料到目的表後,會先執行設定的標準語句。
jdbcUrl目的資料庫的JDBC串連資訊,用於執行preSqlpostSql
maxBatchRows單次Stream Load匯入的最大行數。500000(50W)
maxBatchSize單次Stream Load匯入的最大位元組數。104857600 (100M)
flushInterval上一次Stream Load結束至下一次開始的時間間隔。單位為ms。300000(ms)
loadPropsStream Load的請求參數,詳情請參見Stream Load

類型轉換

預設傳入的資料均會被轉為字串,並以\t作為資料行分隔符號,\n作為行分隔字元,組成CSV檔案進行Stream Load匯入操作。類型轉換樣本如下:
  • 更改資料行分隔符號,則loadProps配置如下。
    "loadProps": {
        "column_separator": "\\x01",
        "row_delimiter": "\\x02"
    }
  • 更改匯入格式為JSON,則loadProps配置如下。
    "loadProps": {
        "format": "json",
        "strip_outer_array": true
    }

匯入案例

重要 請確保RDS MySQL和StarRocks執行個體在同一個網路VPC和VSW下。
  • 建立MySQL來源資料表
    create table `sr_db`.sr_table(id int, name varchar (1024) ,event_time  DATETIME);
    insert into `sr_db`.sr_table values (1,"aaa","2015-09-12 00:00:00"),(2,"bbb","2015-09-12 00:00:00");
  • 建立StarRocks資料表
    CREATE TABLE IF NOT EXISTS load_db.datax_into_tbl (
      id          INT,
      name           STRING,
      event_time  DATETIME
    ) ENGINE=OLAP
    DUPLICATE KEY(id, name)
    DISTRIBUTED BY HASH(id, name) BUCKETS 1
    PROPERTIES (
      "replication_num" = "1"
    );
  • 建立同步任務
    {
        "job": {
            "setting": {
                "speed": {
                     "channel": 1
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "username": "username",
                            "password": "***",
                            "column": [ "id", "name", "event_time" ],
                            "connection": [
                                {
                                    "table": [ "sr_table"],
                                    "jdbcUrl": [
                                         "jdbc:mysql://rm-*****.mysql.rds.aliyuncs.com:3306/sr_db"
                                    ]
                                }
                            ]
                        }
                    },
                   "writer": {
                        "name": "starrockswriter",
                        "parameter": {
                            "username": "admin",
                            "password": "****",
                            "database": "load_db",
                            "table": "datax_load_tbl",
                            "column": ["id", "name", "event_time"],
                            "preSql": [],
                            "postSql": [],
                            "jdbcUrl": "jdbc:mysql://fe-c-*****-internal.starrocks.aliyuncs.com:9030/",
                            "loadUrl": ["fe-c-*****-internal.starrocks.aliyuncs.com:8030"],
                            "loadProps": {}
                        }
                    }
                }
            ]
        }
    }
    返回資訊如下所示。
    任務啟動時刻                    : 2023-04-07 13:05:55
    任務結束時刻                    : 2023-04-07 13:06:05
    任務總計耗時                    :                 10s
    任務平均流量                    :                2B/s
    記錄寫入速度                    :              0rec/s
    讀出記錄總數                    :                   2
    讀寫失敗總數                    :                   0

DataWorks離線同步使用方式

  1. 在DataWorks上建立工作空間,詳情請參見建立工作空間

  2. 在DataWorks上建立測試表並上傳資料到MaxCompute資料來源,詳情請參見建表並上傳資料

  3. 建立StarRocks資料來源。

    1. 在DataWorks的工作空間列表頁面,單擊目標工作空間操作列的Data Integration

    2. 在左側導覽列,單擊資料來源

    3. 單擊右上方的新增資料來源

    4. 新增資料來源對話方塊中,新增StarRocks類型的資料來源。

  4. 建立離線同步任務流程。

    1. 建立商務程序,詳情請參見建立商務程序

    2. 在目錄商務程序,建立離線同步任務,詳情請參見通過嚮導模式配置離線同步任務

  5. 在StarRocks執行個體中查看資料,詳情請參見查看中繼資料