全部產品
Search
文件中心

ApsaraDB for SelectDB:通過Spark匯入資料

更新時間:Jul 06, 2024

ApsaraDB for SelectDB相容Apache Doris,支援通過Spark Doris Connector,利用Spark的分散式運算能力匯入大批量資料。本文介紹使用Spark Doris Connector同步資料至ApsaraDB for SelectDB的基本原理和使用方式。

功能介紹

Spark Doris Connector是雲資料庫 SelectDB 版匯入大批量資料的方式之一。基於Spark的分散式運算能力,您可以將上遊資料來源(MySQL、PostgreSQL、HDFS、S3等)中的大量資料讀取到DataFrame中,再通過Spark Doris Connector匯入到ApsaraDB for SelectDB表中。同時,您也可以使用Spark的JDBC方式來讀取ApsaraDB for SelectDB表中的資料。

工作原理

ApsaraDB for SelectDB通過Spark Doris Connector匯入資料的工作原理如下圖所示。在這種架構下,Spark Doris Connector通常作為外部資料寫入到ApsaraDB for SelectDB的橋樑,利用其分散式運算叢集對資料做預先處理,加速了整個資料鏈路的資料流動,從而替代了傳統的低效能JDBC串連寫入方式。

前提條件

若使用Spark Doris Connector進行資料匯入,必須確保使用的Connector包版本為 1.3.1 及之上。

引入Spark Doris Connector依賴

可以選擇如下任一的方式擷取Doris Connector依賴。

  • 採用Maven時,引入依賴的方式如下所示。更多依賴版本請參見Maven倉庫

    <dependency>
        <groupId>org.apache.doris</groupId>
        <artifactId>spark-doris-connector-3.2_2.12</artifactId>
        <version>1.3.2</version>
    </dependency>
  • 通過JAR包的方式引入Connector。

    以下為您提供了三個常用的Connector,建議根據Spark版本選擇對應的Connector包。更多依賴版本請參見Maven倉庫

    說明
    • 下述JAR包使用Java 8進行編譯,如果您有其他版本的需求,請聯絡ApsaraDB for SelectDB支援人員。

    • 下述列表中Connector列從左至右版本依次含義為該jar包支援的Spark版本、使用的Scala版本以及Connector版本。

    Connector

    Runtime JAR

    2.4-2.12-1.3.2

    spark-doris-connector-2.4_2.12-1.3.2

    3.1-2.12-1.3.2

    spark-doris-connector-3.1_2.12-1.3.2

    3.2-2.12-1.3.2

    spark-doris-connector-3.2_2.12-1.3.2

    擷取到Jar包後,可通過如下方式使用:

    • Local方式運行Spark,將下載的JAR包放置於Spark安裝目錄的jars目錄下。

    • Yarn叢集模式運行Spark,將JAR檔案放入預部署套件中。樣本如下:

      1. 將spark-doris-connector-3.2_2.12-1.3.2.jar上傳到HDFS。

        hdfs dfs -mkdir /spark-jars/ 
        hdfs dfs -put /<your_local_path>/spark-doris-connector-3.2_2.12-1.3.2.jar/spark-jars/
      2. 在叢集中添加spark-doris-connector-3.2_2.12-1.3.2.jar依賴。

        spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.3.2.jar

使用方式

在Spark Client運行Spark後,或者引入Connector包進入Spark開發環境後,您可以通過Spark SQL方式或者Dataframe方式進行資料的同步操作。以下為如何將上遊的Spark資料同步到ApsaraDB for SelectDB的樣本。

Spark SQL方式

val selectdbHttpPort = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080"
val selectdbJdbc = "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030"
val selectdbUser = "admin"
val selectdbPwd = "****"
val selectdbTable = "test_db.test_order"
  
CREATE TEMPORARY VIEW test_order
USING doris
OPTIONS(
 "table.identifier"="${selectdbTable}",
 "fenodes"="${selectdbHttpPort}",
 "user"="${selectdbUser}",
 "password"="${selectdbPwd}",
 "sink.properties.format"="json"
);

INSERT INTO test_order SELECT order_id,order_amount,order_status FROM tmp_tb;

參數配置項說明

參數

預設值

是否必填

描述

fenodes

ApsaraDB for SelectDB執行個體的HTTP協議訪問地址。

您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和HTTP協議連接埠

樣本:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080

table.identifier

ApsaraDB for SelectDB執行個體的表名,格式為:庫名.表名。例如:test_db.test_table

request.retries

3

向SelectDB發送請求的重試次數

request.connect.timeout.ms

30000

向SelectDB發送請求的連線逾時時間

request.read.timeout.ms

30000

向SelectDB發送請求的讀取逾時時間

request.query.timeout.s

3600

查詢SelectDB的逾時時間,預設值為1小時,-1表示無逾時限制

request.tablet.size

Integer.MAX_VALUE

一個RDD Partition對應的SelectDB Tablet個數。

此數值設定越小,則會產生越多的Partition。從而提升Spark側的並行度,但同時會對SelectDB造成更大的壓力。

read.field

讀取SelectDB表的列名列表,多列之間使用逗號分隔

batch.size

1024

一次從BE讀取資料的最大行數。增大此數值可減少Spark與SelectDB之間建立串連的次數。從而減輕網路延遲所帶來的額外時間開銷。

exec.mem.limit

2147483648

單個查詢的記憶體限制。預設為 2GB,單位為位元組。

deserialize.arrow.async

false

是否支援非同步轉換Arrow格式到spark-doris-connector迭代所需的RowBatch。

deserialize.queue.size

64

非同步轉換Arrow格式的內部處理隊列,當deserialize.arrow.async為true時生效。

write.fields

指定寫入SelectDB表的欄位或者欄位順序,多列之間使用逗號分隔。預設寫入時要按照SelectDB表欄位順序寫入全部欄位。

sink.batch.size

100000

單次寫BE的最大行數。

sink.max-retries

0

寫BE失敗之後的重試次數。

sink.properties.format

csv

Stream Load的資料格式。共支援3種格式:csv,json,arrow。

sink.properties.*

--

Stream Load的匯入參數。例如:指定資料行分隔符號:'sink.properties.column_separator' = ','。參數更多資訊,請參見Stream Load

sink.task.partition.size

SelectDB寫入任務對應的Partition個數。Spark RDD經過過濾等操作,最後寫入的 Partition 數可能會比較大,但每個Partition對應的記錄數比較少,導致寫入頻率增加和計算資源浪費。

此數值設定越小,可以降低SelectDB寫入頻率,減少SelectDB合并壓力。該參數配合 sink.task.use.repartition使用。

sink.task.use.repartition

false

是否採用repartition方式控制SelectDB寫入Partition數。預設值為 false,採用coalesce方式控制(注意:如果在寫入之前沒有 Spark action 運算元,可能會導致整個計算並行度降低)。

如果設定為true,則採用repartition方式(注意:可設定最後Partition數,但會額外增加shuffle開銷)。

sink.batch.interval.ms

50

每個批次sink的間隔時間,單位 ms。

sink.enable-2pc

false

是否開啟兩階段交易認可。開啟後將會在作業結束時提交事務,而部分任務失敗時會將所有預提交狀態的事務會滾。

sink.auto-redirect

true

是否重新導向 StreamLoad 請求。開啟後 StreamLoad 將通過 FE 寫入,不再顯式擷取 BE 資訊。

user

訪問ApsaraDB for SelectDB執行個體的使用者名稱。

password

訪問ApsaraDB for SelectDB執行個體的密碼。

filter.query.in.max.count

100

謂詞下推中,in運算式value列表元素最大數量。超過此數量,則in運算式條件過濾在Spark側處理。

ignore-type

指在定臨時視圖中,讀取 schema 時要忽略的欄位類型。

例如:'ignore-type'='bitmap,hll'

DataFrame方式

val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
  ("1", 100, "待付款"),
  ("2", 200, null),
  ("3", 300, "已收貨")
)).toDF("order_id", "order_amount", "order_status")

df.write
  .format("doris")
  .option("fenodes", selectdbHttpPort)
  .option("table.identifier", selectdbTable)
  .option("user", selectdbUser)
  .option("password", selectdbPwd)
  .option("sink.batch.size", 100000)
  .option("sink.max-retries", 3)
  .option("sink.properties.file.column_separator", "\t")
  .option("sink.properties.file.line_delimiter", "\n")
  .save()

參數配置項說明

參數

預設值

是否必填

描述

fenodes

ApsaraDB for SelectDB執行個體的HTTP協議訪問地址。

您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和HTTP協議連接埠

樣本:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080

table.identifier

ApsaraDB for SelectDB執行個體的表名,格式為:庫名.表名。例如:test_db.test_table

request.retries

3

向SelectDB發送請求的重試次數

request.connect.timeout.ms

30000

向SelectDB發送請求的連線逾時時間

request.read.timeout.ms

30000

向SelectDB發送請求的讀取逾時時間

request.query.timeout.s

3600

查詢SelectDB的逾時時間,預設值為1小時,-1表示無逾時限制

request.tablet.size

Integer.MAX_VALUE

一個RDD Partition對應的SelectDB Tablet個數。

此數值設定越小,則會產生越多的Partition。從而提升Spark側的並行度,但同時會對SelectDB造成更大的壓力。

read.field

讀取SelectDB表的列名列表,多列之間使用逗號分隔

batch.size

1024

一次從BE讀取資料的最大行數。增大此數值可減少Spark與SelectDB之間建立串連的次數。從而減輕網路延遲所帶來的額外時間開銷。

exec.mem.limit

2147483648

單個查詢的記憶體限制。預設為 2GB,單位為位元組。

deserialize.arrow.async

false

是否支援非同步轉換Arrow格式到spark-doris-connector迭代所需的RowBatch。

deserialize.queue.size

64

非同步轉換Arrow格式的內部處理隊列,當deserialize.arrow.async為true時生效。

write.fields

指定寫入SelectDB表的欄位或者欄位順序,多列之間使用逗號分隔。預設寫入時要按照SelectDB表欄位順序寫入全部欄位。

sink.batch.size

100000

單次寫BE的最大行數。

sink.max-retries

0

寫BE失敗之後的重試次數。

sink.properties.format

csv

Stream Load的資料格式。共支援3種格式:csv,json,arrow。

sink.properties.*

--

Stream Load的匯入參數。例如:指定資料行分隔符號:'sink.properties.column_separator' = ','。更多參數請參考:Stream Load

sink.task.partition.size

SelectDB寫入任務對應的Partition個數。Spark RDD經過過濾等操作,最後寫入的 Partition 數可能會比較大,但每個Partition對應的記錄數比較少,導致寫入頻率增加和計算資源浪費。

此數值設定越小,可以降低SelectDB寫入頻率,減少SelectDB合并壓力。該參數配合 sink.task.use.repartition使用。

sink.task.use.repartition

false

是否採用repartition方式控制SelectDB寫入Partition數。預設值為 false,採用coalesce方式控制(注意:如果在寫入之前沒有 Spark action 運算元,可能會導致整個計算並行度降低)。

如果設定為true,則採用repartition方式(注意:設定最後Partition數,但會額外增加shuffle開銷)。

sink.batch.interval.ms

50

每個批次sink的間隔時間,單位 ms。

sink.enable-2pc

false

是否開啟兩階段交易認可。開啟後將會在作業結束時提交事務,而部分任務失敗時會將所有預提交狀態的事務會滾。

sink.auto-redirect

true

是否重新導向 StreamLoad 請求。開啟後 StreamLoad 將通過 FE 寫入, 不再顯式擷取 BE 資訊。

user

訪問ApsaraDB for SelectDB執行個體的使用者名稱。

password

訪問ApsaraDB for SelectDB執行個體的密碼。

filter.query.in.max.count

100

謂詞下推中,in運算式value列表元素最大數量。超過此數量,則in運算式條件過濾在Spark側處理。

ignore-type

指在定臨時視圖中,讀取 schema 時要忽略的欄位類型。

例如:'ignore-type'='bitmap,hll'

sink.streaming.passthrough

false

將第一列的值不經過處理直接寫入。

使用樣本

樣本環境中各個軟體的版本如下:

軟體

Java

Spark

Scala

SelectDB

版本

1.8

3.1.2

2.12

3.0.4

環境準備

  • 配置Spark環境。

    1. 下載並解壓Spark安裝包。本樣本中使用Spark安裝包:spark-3.1.2-bin-hadoop3.2.tgz。

      wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
      tar xvzf spark-3.1.2-bin-hadoop3.2.tgz
    2. 將spark-doris-connector-3.2_2.12-1.3.2.jar放到SPARK_HOME/jars目錄下。

  • 構造需要匯入的資料。本文以MySQL為例,構造少量範例資料來完成匯入。

    1. 建立MySQL測試表。

      CREATE TABLE `employees` (
        `emp_no` int NOT NULL,
        `birth_date` date NOT NULL,
        `first_name` varchar(14) NOT NULL,
        `last_name` varchar(16) NOT NULL,
        `gender` enum('M','F') NOT NULL,
        `hire_date` date NOT NULL,
        PRIMARY KEY (`emp_no`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
    2. 使用DMS構建測試資料,詳情請參見測試資料構建

  • 配置ApsaraDB for SelectDB執行個體。

    1. 建立ApsaraDB for SelectDB執行個體,詳情請參見建立執行個體

    2. 通過MySQL協議串連ApsaraDB for SelectDB執行個體,詳情請參見串連執行個體

    3. 建立測試資料庫和測試表。

      1. 建立測試資料庫。

        CREATE DATABASE test_db;
      2. 建立測試表。

        USE test_db;
        CREATE TABLE employees (
            emp_no       int NOT NULL,
            birth_date   date,
            first_name   varchar(20),
            last_name    varchar(20),
            gender       char(2),
            hire_date    date
        )
        UNIQUE KEY(`emp_no`)
        DISTRIBUTED BY HASH(`emp_no`) BUCKETS 32;
    4. 開通ApsaraDB for SelectDB公網地址,詳情請參見申請和釋放公網地址

    5. 將Spark環境的公網IP添加到IP白名單中,詳情請參見設定白名單

同步MySQL資料到SelectDB

Spark SQL方式

本樣本為如何使用Spark SQL方式將上遊的MySQL資料匯入至ApsaraDB for SelectDB

  1. 啟動spark-sql服務。

    bin/spark-sql
  2. 在spark-sql上提交任務。

    CREATE TEMPORARY VIEW mysql_tbl
    USING jdbc
    OPTIONS(
     "url"="jdbc:mysql://host:port/test_db",
     "dbtable"="employees",
     "driver"="com.mysql.jdbc.Driver",
     "user"="admin",
     "password"="****"
    );
    
    CREATE TEMPORARY VIEW selectdb_tbl
    USING doris
    OPTIONS(
     "table.identifier"="test_db.employees",
     "fenodes"="selectdb-cn-****-public.selectdbfe.rds.aliyuncs.com:8080",
     "user"="admin",
     "password"="****",
     "sink.properties.format"="json"
    );
    
    INSERT INTO selectdb_tbl SELECT emp_no, birth_date, first_name, last_name, gender, hire_date FROM mysql_tbl;
  3. Spark任務執行完成後,登入ApsaraDB for SelectDB,查看通過Spark匯入的資料。

DataFrame方式

本樣本為如何使用DataFrame方式將上遊的MySQL資料匯入至ApsaraDB for SelectDB

  1. 啟動spark-shell服務。

    bin/spark-shell
  2. 在spark-shell上提交任務。

    val mysqlDF = spark.read.format("jdbc")
    	.option("url", "jdbc:mysql://host:port/test_db")
    	.option("dbtable", "employees")
    	.option("driver", "com.mysql.jdbc.Driver")
    	.option("user", "admin")
    	.option("password", "****")
    	.load()
    
    mysqlDF.write.format("doris")
    	.option("fenodes", "host:httpPort")
    	.option("table.identifier", "test_db.employees")
    	.option("user", "admin")
    	.option("password", "****")
    	.option("sink.batch.size", 100000)
    	.option("sink.max-retries", 3)
    	.option("sink.properties.format", "json")
    	.save()
  3. Spark任務執行完成後,登入ApsaraDB for SelectDB,查看通過Spark匯入的資料。