ApsaraDB for SelectDB相容Apache Doris,支援通過Spark Doris Connector,利用Spark的分散式運算能力匯入大批量資料。本文介紹使用Spark Doris Connector同步資料至ApsaraDB for SelectDB的基本原理和使用方式。
功能介紹
Spark Doris Connector是雲資料庫 SelectDB 版匯入大批量資料的方式之一。基於Spark的分散式運算能力,您可以將上遊資料來源(MySQL、PostgreSQL、HDFS、S3等)中的大量資料讀取到DataFrame中,再通過Spark Doris Connector匯入到ApsaraDB for SelectDB表中。同時,您也可以使用Spark的JDBC方式來讀取ApsaraDB for SelectDB表中的資料。
工作原理
ApsaraDB for SelectDB通過Spark Doris Connector匯入資料的工作原理如下圖所示。在這種架構下,Spark Doris Connector通常作為外部資料寫入到ApsaraDB for SelectDB的橋樑,利用其分散式運算叢集對資料做預先處理,加速了整個資料鏈路的資料流動,從而替代了傳統的低效能JDBC串連寫入方式。
前提條件
若使用Spark Doris Connector進行資料匯入,必須確保使用的Connector包版本為 1.3.1 及之上。
引入Spark Doris Connector依賴
可以選擇如下任一的方式擷取Doris Connector依賴。
採用Maven時,引入依賴的方式如下所示。更多依賴版本請參見Maven倉庫。
<dependency> <groupId>org.apache.doris</groupId> <artifactId>spark-doris-connector-3.2_2.12</artifactId> <version>1.3.2</version> </dependency>通過JAR包的方式引入Connector。
以下為您提供了三個常用的Connector,建議根據Spark版本選擇對應的Connector包。更多依賴版本請參見Maven倉庫。
說明下述JAR包使用Java 8進行編譯,如果您有其他版本的需求,請聯絡ApsaraDB for SelectDB支援人員。
下述列表中Connector列從左至右版本依次含義為該jar包支援的Spark版本、使用的Scala版本以及Connector版本。
Connector
Runtime JAR
2.4-2.12-1.3.2
3.1-2.12-1.3.2
3.2-2.12-1.3.2
擷取到Jar包後,可通過如下方式使用:
Local方式運行Spark,將下載的JAR包放置於Spark安裝目錄的jars目錄下。
Yarn叢集模式運行Spark,將JAR檔案放入預部署套件中。樣本如下:
將spark-doris-connector-3.2_2.12-1.3.2.jar上傳到HDFS。
hdfs dfs -mkdir /spark-jars/ hdfs dfs -put /<your_local_path>/spark-doris-connector-3.2_2.12-1.3.2.jar/spark-jars/在叢集中添加spark-doris-connector-3.2_2.12-1.3.2.jar依賴。
spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.3.2.jar
使用方式
在Spark Client運行Spark後,或者引入Connector包進入Spark開發環境後,您可以通過Spark SQL方式或者Dataframe方式進行資料的同步操作。以下為如何將上遊的Spark資料同步到ApsaraDB for SelectDB的樣本。
Spark SQL方式
val selectdbHttpPort = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080"
val selectdbJdbc = "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030"
val selectdbUser = "admin"
val selectdbPwd = "****"
val selectdbTable = "test_db.test_order"
CREATE TEMPORARY VIEW test_order
USING doris
OPTIONS(
"table.identifier"="${selectdbTable}",
"fenodes"="${selectdbHttpPort}",
"user"="${selectdbUser}",
"password"="${selectdbPwd}",
"sink.properties.format"="json"
);
INSERT INTO test_order SELECT order_id,order_amount,order_status FROM tmp_tb;參數配置項說明
參數 | 預設值 | 是否必填 | 描述 |
fenodes | 無 | 是 | ApsaraDB for SelectDB執行個體的HTTP協議訪問地址。 您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和HTTP協議連接埠。 樣本: |
table.identifier | 無 | 是 | ApsaraDB for SelectDB執行個體的表名,格式為: |
request.retries | 3 | 否 | 向SelectDB發送請求的重試次數 |
request.connect.timeout.ms | 30000 | 否 | 向SelectDB發送請求的連線逾時時間 |
request.read.timeout.ms | 30000 | 否 | 向SelectDB發送請求的讀取逾時時間 |
request.query.timeout.s | 3600 | 否 | 查詢SelectDB的逾時時間,預設值為1小時,-1表示無逾時限制 |
request.tablet.size | Integer.MAX_VALUE | 否 | 一個RDD Partition對應的SelectDB Tablet個數。 此數值設定越小,則會產生越多的Partition。從而提升Spark側的並行度,但同時會對SelectDB造成更大的壓力。 |
read.field | 無 | 否 | 讀取SelectDB表的列名列表,多列之間使用逗號分隔 |
batch.size | 1024 | 否 | 一次從BE讀取資料的最大行數。增大此數值可減少Spark與SelectDB之間建立串連的次數。從而減輕網路延遲所帶來的額外時間開銷。 |
exec.mem.limit | 2147483648 | 否 | 單個查詢的記憶體限制。預設為 2GB,單位為位元組。 |
deserialize.arrow.async | false | 否 | 是否支援非同步轉換Arrow格式到spark-doris-connector迭代所需的RowBatch。 |
deserialize.queue.size | 64 | 否 | 非同步轉換Arrow格式的內部處理隊列,當 |
write.fields | 無 | 否 | 指定寫入SelectDB表的欄位或者欄位順序,多列之間使用逗號分隔。預設寫入時要按照SelectDB表欄位順序寫入全部欄位。 |
sink.batch.size | 100000 | 否 | 單次寫BE的最大行數。 |
sink.max-retries | 0 | 否 | 寫BE失敗之後的重試次數。 |
sink.properties.format | csv | 否 | Stream Load的資料格式。共支援3種格式:csv,json,arrow。 |
sink.properties.* | -- | 否 | Stream Load的匯入參數。例如:指定資料行分隔符號: |
sink.task.partition.size | 無 | 否 | SelectDB寫入任務對應的Partition個數。Spark RDD經過過濾等操作,最後寫入的 Partition 數可能會比較大,但每個Partition對應的記錄數比較少,導致寫入頻率增加和計算資源浪費。 此數值設定越小,可以降低SelectDB寫入頻率,減少SelectDB合并壓力。該參數配合 |
sink.task.use.repartition | false | 否 | 是否採用repartition方式控制SelectDB寫入Partition數。預設值為 false,採用coalesce方式控制(注意:如果在寫入之前沒有 Spark action 運算元,可能會導致整個計算並行度降低)。 如果設定為true,則採用repartition方式(注意:可設定最後Partition數,但會額外增加shuffle開銷)。 |
sink.batch.interval.ms | 50 | 否 | 每個批次sink的間隔時間,單位 ms。 |
sink.enable-2pc | false | 否 | 是否開啟兩階段交易認可。開啟後將會在作業結束時提交事務,而部分任務失敗時會將所有預提交狀態的事務會滾。 |
sink.auto-redirect | true | 否 | 是否重新導向 StreamLoad 請求。開啟後 StreamLoad 將通過 FE 寫入,不再顯式擷取 BE 資訊。 |
user | 無 | 是 | 訪問ApsaraDB for SelectDB執行個體的使用者名稱。 |
password | 無 | 是 | 訪問ApsaraDB for SelectDB執行個體的密碼。 |
filter.query.in.max.count | 100 | 否 | 謂詞下推中,in運算式value列表元素最大數量。超過此數量,則in運算式條件過濾在Spark側處理。 |
ignore-type | 無 | 否 | 指在定臨時視圖中,讀取 schema 時要忽略的欄位類型。 例如: |
DataFrame方式
val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
("1", 100, "待付款"),
("2", 200, null),
("3", 300, "已收貨")
)).toDF("order_id", "order_amount", "order_status")
df.write
.format("doris")
.option("fenodes", selectdbHttpPort)
.option("table.identifier", selectdbTable)
.option("user", selectdbUser)
.option("password", selectdbPwd)
.option("sink.batch.size", 100000)
.option("sink.max-retries", 3)
.option("sink.properties.file.column_separator", "\t")
.option("sink.properties.file.line_delimiter", "\n")
.save()參數配置項說明
參數 | 預設值 | 是否必填 | 描述 |
fenodes | 無 | 是 | ApsaraDB for SelectDB執行個體的HTTP協議訪問地址。 您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和HTTP協議連接埠。 樣本: |
table.identifier | 無 | 是 | ApsaraDB for SelectDB執行個體的表名,格式為: |
request.retries | 3 | 否 | 向SelectDB發送請求的重試次數 |
request.connect.timeout.ms | 30000 | 否 | 向SelectDB發送請求的連線逾時時間 |
request.read.timeout.ms | 30000 | 否 | 向SelectDB發送請求的讀取逾時時間 |
request.query.timeout.s | 3600 | 否 | 查詢SelectDB的逾時時間,預設值為1小時,-1表示無逾時限制 |
request.tablet.size | Integer.MAX_VALUE | 否 | 一個RDD Partition對應的SelectDB Tablet個數。 此數值設定越小,則會產生越多的Partition。從而提升Spark側的並行度,但同時會對SelectDB造成更大的壓力。 |
read.field | 無 | 否 | 讀取SelectDB表的列名列表,多列之間使用逗號分隔 |
batch.size | 1024 | 否 | 一次從BE讀取資料的最大行數。增大此數值可減少Spark與SelectDB之間建立串連的次數。從而減輕網路延遲所帶來的額外時間開銷。 |
exec.mem.limit | 2147483648 | 否 | 單個查詢的記憶體限制。預設為 2GB,單位為位元組。 |
deserialize.arrow.async | false | 否 | 是否支援非同步轉換Arrow格式到spark-doris-connector迭代所需的RowBatch。 |
deserialize.queue.size | 64 | 否 | 非同步轉換Arrow格式的內部處理隊列,當 |
write.fields | 無 | 否 | 指定寫入SelectDB表的欄位或者欄位順序,多列之間使用逗號分隔。預設寫入時要按照SelectDB表欄位順序寫入全部欄位。 |
sink.batch.size | 100000 | 否 | 單次寫BE的最大行數。 |
sink.max-retries | 0 | 否 | 寫BE失敗之後的重試次數。 |
sink.properties.format | csv | 否 | Stream Load的資料格式。共支援3種格式:csv,json,arrow。 |
sink.properties.* | -- | 否 | Stream Load的匯入參數。例如:指定資料行分隔符號: |
sink.task.partition.size | 無 | 否 | SelectDB寫入任務對應的Partition個數。Spark RDD經過過濾等操作,最後寫入的 Partition 數可能會比較大,但每個Partition對應的記錄數比較少,導致寫入頻率增加和計算資源浪費。 此數值設定越小,可以降低SelectDB寫入頻率,減少SelectDB合并壓力。該參數配合 |
sink.task.use.repartition | false | 否 | 是否採用repartition方式控制SelectDB寫入Partition數。預設值為 false,採用coalesce方式控制(注意:如果在寫入之前沒有 Spark action 運算元,可能會導致整個計算並行度降低)。 如果設定為true,則採用repartition方式(注意:設定最後Partition數,但會額外增加shuffle開銷)。 |
sink.batch.interval.ms | 50 | 否 | 每個批次sink的間隔時間,單位 ms。 |
sink.enable-2pc | false | 否 | 是否開啟兩階段交易認可。開啟後將會在作業結束時提交事務,而部分任務失敗時會將所有預提交狀態的事務會滾。 |
sink.auto-redirect | true | 否 | 是否重新導向 StreamLoad 請求。開啟後 StreamLoad 將通過 FE 寫入, 不再顯式擷取 BE 資訊。 |
user | 無 | 是 | 訪問ApsaraDB for SelectDB執行個體的使用者名稱。 |
password | 無 | 是 | 訪問ApsaraDB for SelectDB執行個體的密碼。 |
filter.query.in.max.count | 100 | 否 | 謂詞下推中,in運算式value列表元素最大數量。超過此數量,則in運算式條件過濾在Spark側處理。 |
ignore-type | 無 | 否 | 指在定臨時視圖中,讀取 schema 時要忽略的欄位類型。 例如: |
sink.streaming.passthrough | false | 否 | 將第一列的值不經過處理直接寫入。 |
使用樣本
樣本環境中各個軟體的版本如下:
軟體 | Java | Spark | Scala | SelectDB |
版本 | 1.8 | 3.1.2 | 2.12 | 3.0.4 |
環境準備
配置Spark環境。
下載並解壓Spark安裝包。本樣本中使用Spark安裝包:spark-3.1.2-bin-hadoop3.2.tgz。
wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz tar xvzf spark-3.1.2-bin-hadoop3.2.tgz將spark-doris-connector-3.2_2.12-1.3.2.jar放到SPARK_HOME/jars目錄下。
構造需要匯入的資料。本文以MySQL為例,構造少量範例資料來完成匯入。
建立MySQL測試表。
CREATE TABLE `employees` ( `emp_no` int NOT NULL, `birth_date` date NOT NULL, `first_name` varchar(14) NOT NULL, `last_name` varchar(16) NOT NULL, `gender` enum('M','F') NOT NULL, `hire_date` date NOT NULL, PRIMARY KEY (`emp_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3使用DMS構建測試資料,詳情請參見測試資料構建。
配置ApsaraDB for SelectDB執行個體。
建立ApsaraDB for SelectDB執行個體,詳情請參見建立執行個體。
通過MySQL協議串連ApsaraDB for SelectDB執行個體,詳情請參見串連執行個體。
建立測試資料庫和測試表。
建立測試資料庫。
CREATE DATABASE test_db;建立測試表。
USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 32;
開通ApsaraDB for SelectDB公網地址,詳情請參見申請和釋放公網地址。
將Spark環境的公網IP添加到IP白名單中,詳情請參見設定白名單。
同步MySQL資料到SelectDB
Spark SQL方式
本樣本為如何使用Spark SQL方式將上遊的MySQL資料匯入至ApsaraDB for SelectDB。
啟動spark-sql服務。
bin/spark-sql在spark-sql上提交任務。
CREATE TEMPORARY VIEW mysql_tbl USING jdbc OPTIONS( "url"="jdbc:mysql://host:port/test_db", "dbtable"="employees", "driver"="com.mysql.jdbc.Driver", "user"="admin", "password"="****" ); CREATE TEMPORARY VIEW selectdb_tbl USING doris OPTIONS( "table.identifier"="test_db.employees", "fenodes"="selectdb-cn-****-public.selectdbfe.rds.aliyuncs.com:8080", "user"="admin", "password"="****", "sink.properties.format"="json" ); INSERT INTO selectdb_tbl SELECT emp_no, birth_date, first_name, last_name, gender, hire_date FROM mysql_tbl;Spark任務執行完成後,登入ApsaraDB for SelectDB,查看通過Spark匯入的資料。
DataFrame方式
本樣本為如何使用DataFrame方式將上遊的MySQL資料匯入至ApsaraDB for SelectDB。
啟動spark-shell服務。
bin/spark-shell在spark-shell上提交任務。
val mysqlDF = spark.read.format("jdbc") .option("url", "jdbc:mysql://host:port/test_db") .option("dbtable", "employees") .option("driver", "com.mysql.jdbc.Driver") .option("user", "admin") .option("password", "****") .load() mysqlDF.write.format("doris") .option("fenodes", "host:httpPort") .option("table.identifier", "test_db.employees") .option("user", "admin") .option("password", "****") .option("sink.batch.size", 100000) .option("sink.max-retries", 3) .option("sink.properties.format", "json") .save()Spark任務執行完成後,登入ApsaraDB for SelectDB,查看通過Spark匯入的資料。