全部產品
Search
文件中心

ApsaraDB for SelectDB:通過DataX匯入資料

更新時間:Jul 06, 2024

本文介紹使用DataX Doris Writer同步資料至ApsaraDB for SelectDB

概述

DataX是阿里巴巴開源的一個異構資料來源離線同步工具,致力於實現包括關係型資料庫(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各種異構資料來源之間穩定高效的資料同步功能。您可以通過DataX服務讀取上遊資料,然後由DataX Doris Writer將資料寫入到ApsaraDB for SelectDB

前提條件

  • 已安裝Maven環境。

  • 已安裝Python3.6及以上版本。

使用樣本

如下以MySQL資料來源為例,介紹在Linux環境下如何通過DataX將MySQL資料匯入至ApsaraDB for SelectDB

步驟一:配置DataX環境

  1. 下載DataX程式包代碼,外掛程式代碼下載請訪問Doris社區

  2. 運行DataX程式包中的init-env.sh指令碼,構建DataX開發環境。

    sh init-env.sh
  3. 編譯mysqlreader和doriswriter。

    1. 編譯整個DataX專案。

      cd DataX/
      mvn package assembly:assembly -Dmaven.test.skip=true

      編譯產出結果在target/datax/datax/.目錄下。

      說明

      hdfsreader,hdfswriter,ossreader和osswriter這四個外掛程式需要額外的jar包,如果不需要這些外掛程式,可以在DataX/pom.xml中刪除這些外掛程式的模組。

    2. 單獨編譯mysqlreader和selectdbwriter外掛程式。

      mvn clean install -pl plugin-rdbms-util,mysqlreader,doriswriter -DskipTests

步驟二:構造需要匯入的資料

  1. 建立MySQL測試表。

    CREATE TABLE `employees` (
      `emp_no` int NOT NULL,
      `birth_date` date NOT NULL,
      `first_name` varchar(14) NOT NULL,
      `last_name` varchar(16) NOT NULL,
      `gender` enum('M','F') NOT NULL,
      `hire_date` date NOT NULL,
      PRIMARY KEY (`emp_no`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
  2. 使用DMS構建測試資料,詳情請參見測試資料構建

步驟三:配置ApsaraDB for SelectDB執行個體。

  1. 通過MySQL協議串連ApsaraDB for SelectDB執行個體,詳情請參見串連執行個體

  2. 建立測試資料庫和測試表。

    1. 建立測試資料庫。

      CREATE DATABASE test_db;
    2. 建立測試表。

      USE test_db;
      CREATE TABLE employees (
          emp_no       int NOT NULL,
          birth_date   date,
          first_name   varchar(20),
          last_name    varchar(20),
          gender       char(2),
          hire_date    date
      )
      UNIQUE KEY(`emp_no`)
      DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

步驟四:通過DataX服務同步MySQL資料到SelectDB

  1. 開通ApsaraDB for SelectDB公網地址,詳情請參見申請和釋放公網地址

  2. 將DataX主機的公網IP添加到IP白名單中,詳情請參見設定白名單

  3. 建立設定檔mysqlToSelectDB.json,配置任務資訊。

    {
      "job":{
        "content":[
          {
            "reader":{
                "name": "mysqlreader",
                "parameter": {
                    "column": [
                        "emp_no",
                        "birth_date",
                        "first_name",
                        "last_name",
                        "gender",
                        "hire_date"
                    ],
                    "where": "emp_no>0",
                    "connection": [
                        {
                            "jdbcUrl": [
                                "jdbc:mysql://host:port/test_db?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
                            ],
                            "table": [
                                "employees"
                            ]
                        }
                    ],
                    "password": "123456",
                    "splitPk": "emp_no",
                    "username": "admin"
                }
            },
            "writer":{
              "name":"doriswriter",
              "parameter":{
                "loadUrl":[
                  "selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:8080"
                ],
                "loadProps":{
                  "format":"json",
                  "strip_outer_array":"true"
                },
                "column":[
                    "emp_no",
                    "birth_date",
                    "first_name",
                    "last_name",
                    "gender",
                    "hire_date"
                ],
                "username":"admin",
                "password":"123456",
                "postSql":[
    
                ],
                "preSql":[
    
                ],
                "connection":[
                  {
                    "jdbcUrl":"jdbc:mysql://selectdb-cn-xxx-public.selectdbfe.rds.aliyuncs.com:9030/test_db",
                    "table":[
                      "employees"
                    ],
                    "selectedDatabase":"test_db"
                  }
                ],
                "maxBatchRows":1000000,
                "batchSize":536870912000
              }
            }
          }
        ],
        "setting":{
          "errorLimit":{
            "percentage":0.02,
            "record":0
          },
          "speed":{
            "channel":5
          }
        }
      }
    }

  4. 參數說明

    參數

    是否必填

    預設值

    描述

    jdbcUrl

    JDBC串連URLjdbc:mysql://<ip>:<port>

    您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取IP地址和MySQL協議連接埠。

    IP地址:VPC地址公網地址

    連接埠:MySQL協議連接埠

    樣本:jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

    說明

    如果DataX主機和SelectDB在同一VPC下,即可使用VPC地址。如果不在同一VPC下請使用公網地址。

    loadUrl

    SelectDB的HTTP協議訪問地址<ip>:<port>

    您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取IP地址和HTTP協議連接埠。

    IP地址:VPC地址公網地址

    連接埠:HTTP協議連接埠

    樣本:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080

    說明

    如果DataX主機和SelectDB在同一VPC下,即可使用VPC地址。如果不在同一VPC下請使用公網地址。

    username

    ApsaraDB for SelectDB執行個體的使用者名稱。

    password

    ApsaraDB for SelectDB執行個體對應使用者的密碼。

    connection.selectedDatabase

    需要寫入的ApsaraDB for SelectDB資料庫名稱。

    connection.table

    需要寫入的ApsaraDB for SelectDB表名稱。

    column

    目的表需要寫入資料的欄位,這些欄位將作為產生的JSON資料的欄位名。欄位之間用英文逗號分隔。樣本:"column": ["id","name","age"]

    preSql

    寫入資料到目的表前,會先執行這裡的標準語句。

    postSql

    寫入資料到目的表後,會執行這裡的標準語句。

    maxBatchRows

    500000

    每批次匯入資料的最大行數。和batchSize共同控制每批次的匯入數量。每批次資料達到兩個閾值之一,即開始匯入這一批次的資料。

    batchSize

    104857600

    每批次匯入資料的最巨量資料量。和maxBatchRows共同控制每批次的匯入數量。每批次資料達到兩個閾值之一,即開始匯入這一批次的資料。預設值為100 M。

    maxRetries

    3

    每批次匯入資料失敗後的重試次數。

    labelPrefix

    datax_doris_writer_

    每批次上傳檔案的label首碼。最終的label將由 'labelPrefix + UUID'組成全域唯一的label,確保資料不會重複匯入。

    loadProps

    與Stream Load的請求參數相同。詳情請參見Stream Load參數說明。配置匯入資料格式使用參數format,匯入資料格式預設使用CSV,支援JSON,詳情請參考類型轉換

    flushInterval

    30000

    資料寫入批次的時間間隔。預設為30000ms

  5. 命令列提交任務。

    cd target/datax/datax/bin
    python datax.py ../mysqlToSelectDB.json

類型轉換

預設傳入的資料均會被轉為字串,並以\t作為資料行分隔符號,\n作為行分隔字元,組成csv檔案進行SelectDB匯入操作。 預設是csv格式匯入,如需更改資料行分隔符號, 則正確配置loadProps即可,樣本如下。

"loadProps": {
    "format": "csv",
    "column_separator": "\\x01",
    "line_delimiter": "\\x02"
}

如需更改匯入格式為JSON,則正確配置loadProps下的format即可,樣本如下。

"loadProps": {
    "format": "json",
    "strip_outer_array": true
}