全部產品
Search
文件中心

ApsaraDB for SelectDB:通過SeaTunnel匯入資料

更新時間:Nov 16, 2024

SeaTunnel整合ApsaraDB for SelectDB,支援使用SeaTunnel SelectDB Sink匯入表資料至ApsaraDB for SelectDB。本文將為您介紹使用SeaTunnel SelectDB Sink同步資料至ApsaraDB for SelectDB的使用方式。

概述

SeaTunnel是一款簡單易用、高效能的分布式Data Integration平台,支援海量資料即時同步。您可以通過SeaTunnel平台讀取MySQL、Hive、Kafka等資料來源中的海量資料,然後由SeaTunnel SelectDB Sink將資料寫入到ApsaraDB for SelectDB中。

前提條件

SeaTunnel 2.3.1版本及以上。

使用方式

SeaTunnel支援以JSON格式或CSV格式將上遊資料寫入到ApsaraDB for SelectDB,不同寫入方式的配置文法如下。

JSON格式

sink { 
  SelectDB { 
    load-url="ip:http_port" 
    jdbc-url="ip:mysql_port" 
    cluster-name="Cluster" 
    table.identifier="test_db.test_table" 
    username="admin" 
    password="****" 
    selectdb.config { 
      file.type="json" 
    } 
  }
}

CSV格式

sink { 
  SelectDB { 
    load-url="ip:http_port" 
    jdbc-url="ip:mysql_port" 
    cluster-name="Cluster" 
    table.identifier="test_db.test_table" 
    username="admin" 
    password="****" 
    selectdb.config { 
      file.type="csv" 
      file.column_separator="," 
      file.line_delimiter="\n" 
    } 
  }
}

參數說明如下。

參數

是否必填

說明

load-url

ApsaraDB for SelectDB執行個體的訪問地址和HTTP協議連接埠。

您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和HTTP協議連接埠

樣本:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080

jdbc-url

ApsaraDB for SelectDB執行個體的訪問地址和MySQL協議連接埠。

您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和MySQL協議連接埠

樣本:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

cluster-name

ApsaraDB for SelectDB執行個體中的叢集名稱。執行個體中可能包含多個叢集,可按需選擇。

username

ApsaraDB for SelectDB執行個體的使用者名稱。

password

ApsaraDB for SelectDB執行個體對應使用者名稱的密碼。

table.identifier

ApsaraDB for SelectDB執行個體的表名,格式為庫名.表名。樣本:test_db.test_table

selectdb.config

寫入任務的屬性配置。

  • CSV寫入如下:

    selectdb.config { file.type='csv' file.column_separator=',' file.line_delimiter='\n' } 
  • JSON寫入如下:

    selectdb.config { file.type="json" file.strip_outer_array="false" }

sink.enable-delete

是否開啟大量刪除功能(僅支援Unique表)。

sink.buffer-size

緩衝的最大容量,單位位元組,預設為:10MB,當緩衝超過最大容量時,會將緩衝中的內容全部flush到Object Storage Service上,不建議修改。

sink.buffer-count

緩衝的最大條數,預設為:10000,當緩衝超過最大條數時,會將緩衝中的內容全部flush到Object Storage Service上,不建議修改。

sink.max-retries

Commit階段的最大重試次數。預設3次。

sink.enable-2pc

是否啟用兩階段交易認可,以確保exact-once語義。預設為true。

使用樣本

以MySQL資料來源為例,為您介紹如何通過SeaTunnel將上遊的MySQL資料匯入至ApsaraDB for SelectDB。樣本中各軟體版本如下:

環境

版本

JDK

1.8

SeaTunnel

2.3.3

SelectDB

3.0.4

環境準備

  1. 配置SeaTunnel環境。

    1. 下載並解壓SeaTunnel安裝包。本樣本中使用SeaTunnel安裝包:apache-seatunnel-2.3.3-bin.tar.gz。

      wget https://dlcdn.apache.org/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz
      tar -xzvf apache-seatunnel-2.3.3-bin.tar.gz
    2. 修改SEATUNNEL_HOME/config/plugin_config設定檔,保留需要的Connector外掛程式。

      --connectors-v2--
      connector-cdc-mysql
      connector-selectdb-cloud
      connector-jdbc
      connector-fake
      connector-console
      connector-assert
      --end--
    3. 安裝SeaTunnel Connector外掛程式。

      sh bin/install-plugin.sh
    4. 下載MySQL驅動並放至SEATUNNEL_HOME/jar目錄。

      cd lib/
      wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
  2. 構造需要匯入的資料。本文以MySQL為例,構造少量範例資料來完成匯入。

    1. 建立MySQL測試表。

      CREATE TABLE `employees` (
        `emp_no` INT NOT NULL,
        `birth_date` DATE NOT NULL,
        `first_name` VARCHAR(14) NOT NULL,
        `last_name` VARCHAR(16) NOT NULL,
        `gender` ENUM('M','F') NOT NULL,
        `hire_date` DATE NOT NULL,
        PRIMARY KEY (`emp_no`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
    2. 使用DMS構建測試資料,詳情請參見測試資料構建

  3. 配置ApsaraDB for SelectDB執行個體。

    1. 建立ApsaraDB for SelectDB執行個體,詳情請參見建立執行個體

    2. 通過MySQL協議串連ApsaraDB for SelectDB執行個體,詳情請參見串連執行個體

    3. 建立測試資料庫和測試表。

      1. 建立測試資料庫。

        CREATE DATABASE test_db;
      2. 建立測試表。

        USE test_db;
        CREATE TABLE employees (
            emp_no       INT NOT NULL,
            birth_date   DATE,
            first_name   VARCHAR(20),
            last_name    VARCHAR(20),
            gender       CHAR(2),
            hire_date    DATE
        )
        UNIQUE KEY(`emp_no`)
        DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
    4. 開通ApsaraDB for SelectDB公網地址,詳情請參見申請和釋放公網地址

    5. 將SeaTunnel環境的公網IP添加到IP白名單中,詳情請參見設定白名單

通過SuaTunnel本地引擎同步MySQL資料到SelectDB

  1. 建立設定檔mysqlToSelectDB.conf,配置任務資訊。

    env {
      execution.parallelism = 2
      job.mode = "BATCH"
      checkpoint.interval = 10000
    }
    
    source{
      jdbc {
        url = "jdbc:mysql://host:ip/test_db"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "admin"
        password = "****"
        query = "select * from employees"
      }
    }
     
    sink {
      SelectDBCloud {
        load-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:8080"
        jdbc-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:9030"
        cluster-name="new_cluster"
        table.identifier="test_db.employees"
        username="admin"
        password="****"
        selectdb.config {
            file.type="json"
        }
      }
    }
  2. 命令列提交任務。

    sh ./bin/seatunnel.sh --config ./mysqlToSelectDB.conf -e local