Spark是用於大規模資料處理的統一分析引擎,Hologres已與Spark社區版及EMR Spark版實現高效整合,迅速協助企業構建資料倉儲。Hologres提供的Spark Connector支援在Spark叢集中建立Hologres Catalog,以外部表格的形式實現高效能的資料批量讀取和匯入,相較於原生JDBC,具有更為卓越的效能。
使用限制
僅V1.3及以上版本的Hologres執行個體支援Spark Connector。您可以在Hologres管理主控台的執行個體詳情頁查看當前執行個體版本。若您的執行個體是V1.3以下版本,請使用執行個體升級或通過搜尋(DingTalk群號:32314975)加入即時數倉Hologres交流群申請升級執行個體。
準備工作
-
您需要安裝對應版本的Spark環境,能夠運行spark-sql、spark-shell或pyspark命令,建議使用Spark 3.3.0及以上版本,以避免依賴問題,並獲得更豐富的功能體驗。
-
您可以使用阿里雲EMR Spark來快速構建Spark環境對接Hologres執行個體,詳情請參見EMR Spark功能。
-
您也可以獨立在所需環境中搭建Spark環境,詳情請參見Apache Spark。
-
-
Spark讀寫Hologres需要引用的連接器JAR包
hologres-connector-spark-3.x。本文使用當前最新版本1.5.2。您可以通過Maven中央倉庫進行下載。更多關於Connector的資源都已開源,詳情請參見Hologres-Connectors。 -
若您需要使用Java語言開發Spark作業,並在如IntelliJ IDEA工具進行本地調試時,可直接引入下方Maven依賴進行pom.xml檔案配置。
<dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-connector-spark-3.x</artifactId> <version>1.5.2</version> <classifier>jar-with-dependencies</classifier> </dependency>
Hologres Catalog
Spark Connector從1.5.2版本開始支援Hologres Catalog,您可以通過外部表格的方式方便地讀寫Hologres。
Spark中每個Hologres Catalog對應Hologres中的各個Database,每個Hologres Catalog中的Namespace對應Database中的各個Schema。以下部分將展示如何在Spark中使用Hologres Catalog。
目前Hologres Catalog暫不支援建立表。
本文Hologres執行個體中對應資料庫和表名如下:
test_db --資料庫
public.test_table1 --public模式下表
public.test_table2
test_schema.test_table3 -- test_schema模式下表
Hologres Catalog初始化
在Spark叢集中啟動spark-sql,載入Hologres Connector並指定Catalog參數。
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
Hologres Catalog常用命令
-
載入Hologres Catalog
Spark中的Hologres Catalog完全對應一個Hologres的Database,使用過程中無法更改。
USE hologres_external_test_db; -
查詢所有Namespace。
Spark中的Namespace,對應Hologres中的Schema,預設為public,使用過程中可以使用
USE指令調整預設的Schema。-- 查看Hologres Catalog中的所有Namespace, 即Hologres中所有的Schema。 SHOW NAMESPACES; -
查詢Namespace下的表。
-
查詢所有表。
SHOW TABLES; -
查詢指定Namespace下的表。
USE test_schema; SHOW TABLES; -- 或者使用 SHOW TABLES IN test_schema;
-
-
讀取和寫入表。
使用SELECT和INSERT語句對Catalog中的Hologres外部表格進行讀寫。
-- 讀取表。 SELECT * FROM public.test_table1; -- 寫入表。 INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;
匯入Hologres
本節測試Hologres資料來源TPC-H資料集中的customer表。Spark可以通過CSV格式讀取Hologres表資料。您可以直接下載customer資料。customer表結構建立SQL如下。
CREATE TABLE customer_holo_table
(
c_custkey BIGINT ,
c_name TEXT ,
c_address TEXT ,
c_nationkey INT ,
c_phone TEXT ,
c_acctbal DECIMAL(15,2) ,
c_mktsegment TEXT ,
c_comment TEXT
);
使用Spark-SQL匯入
在使用Spark-SQL時,藉助Catalog載入Hologres表的中繼資料更加便捷,同時也可以通過建立暫存資料表的方式來聲明一個Hologres表。
-
Hologres-Connector-Spark 1.5.2以下版本,不支援Catalog,只能通過建立暫存資料表的方式聲明Hologres表。
-
Hologres-Connector-Spark更多參數資訊,詳情請參見參數說明。
-
初始化Hologres Catalog。
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \ --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \ --conf spark.sql.catalog.hologres_external_test_db.username=*** \ --conf spark.sql.catalog.hologres_external_test_db.password=*** \ --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db -
從CSV源表匯入資料至Hologres外表。
說明Spark的INSERT INTO文法不支援通過
column_list指定部分列進行寫入,例如使用INSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTable來表示唯寫入c_custkey這一個欄位。若您希望寫入部分所需欄位,可以使用
CREATE TEMPORARY VIEW的方式聲明僅所需欄位的Hologres暫存資料表。CATALOG寫入
-- 載入Hologres Catalog。 USE hologres_external_test_db; -- 建立csv資料來源 CREATE TEMPORARY VIEW csvTable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING csv OPTIONS ( path "resources/customer", sep "," -- 本地測試,直接用檔案所在絕對路徑。 ); -- 將csv表的資料寫入到Hologres中 INSERT INTO public.customer_holo_table SELECT * FROM csvTable;TEMPORARY VIEW寫入
-- 建立csv資料來源 CREATE TEMPORARY VIEW csvTable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING csv OPTIONS ( path "resources/customer", sep "," ); -- 建立hologres暫存資料表 CREATE TEMPORARY VIEW hologresTable ( c_custkey BIGINT, c_name STRING, c_phone STRING) USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", table "customer_holo_table" ); INSERT INTO hologresTable SELECT c_custkey,c_name,c_phone FROM csvTable;
使用DataFrame匯入
使用spark-shell、pyspark等開發Spark作業,您也可以調用DataFrame的write介面來進行寫入。不同的開發語言會將讀取到CSV檔案資料轉為DataFrame後,再寫入Hologres執行個體,相關範例程式碼如下。Hologres-Connector-Spark更多參數資訊,詳情請參見參數說明。
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
// csv源的schema
val schema = StructType(Array(
StructField("c_custkey", LongType),
StructField("c_name", StringType),
StructField("c_address", StringType),
StructField("c_nationkey", IntegerType),
StructField("c_phone", StringType),
StructField("c_acctbal", DecimalType(15, 2)),
StructField("c_mktsegment", StringType),
StructField("c_comment", StringType)
))
// 從csv檔案讀取資料為DataFrame
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")
// 將讀取到的DataFrame寫入到Hologres中
csvDf.write
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.mode(SaveMode.Append)
.save()
Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;
public class SparkTest {
public static void main(String[] args) {
// csv源的schema
List<StructField> asList =
Arrays.asList(
DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
DataTypes.createStructField("c_name", DataTypes.StringType, true),
DataTypes.createStructField("c_address", DataTypes.StringType, true),
DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
DataTypes.createStructField("c_phone", DataTypes.StringType, true),
DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
DataTypes.createStructField("c_comment", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(asList);
// 使用本地模式運行
SparkSession spark = SparkSession.builder()
.appName("Spark CSV Example")
.master("local[*]")
.getOrCreate();
// 從csv檔案讀取資料為DataFrame
// 本地測試採用customer資料的絕對路徑
Dataset<Row> csvDf = spark.read().format("csv").schema(schema).option("sep", ",").load("resources/customer");
// 將讀取到的DataFrame寫入到Hologres中
csvDf.write.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").mode(
"append").save()
}
}
Maven檔案所需的配置如下。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.4</version>
<scope>provided</scope>
</dependency>
Python
from pyspark.sql.types import *
# csv源的schema
schema = StructType([
StructField("c_custkey", LongType()),
StructField("c_name", StringType()),
StructField("c_address", StringType()),
StructField("c_nationkey", IntegerType()),
StructField("c_phone", StringType()),
StructField("c_acctbal", DecimalType(15, 2)),
StructField("c_mktsegment", StringType()),
StructField("c_comment", StringType())
])
# 從csv檔案讀取資料為DataFrame
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=',')
# 將讀取到的DataFrame寫入到Hologres中
csvDf.write.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").mode(
"append").save()
Spark執行不同語言的作業,具體操作如下:
-
Scala
-
您可以用範例程式碼產生sparktest.scala檔案,通過如下方式執行作業。
-- 載入依賴 spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar -- 本地測試使用絕對路徑負載檔案 scala> :load D:/sparktest.scala -
您也可以在載入依賴完成後,直接將範例程式碼粘貼進去執行。
-
-
Java
您可以使用開發工具引入範例程式碼,通過Maven工具完成打包。例如包名spark_test.jar。通過下方代碼執行作業。
-- 作業jar包採用絕對路徑 spark-submit --class SparkTest --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_test.jar -
Python
您可以在下方代碼執行完成後,直接將範例程式碼粘貼進去執行。
pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
讀取Hologres
-
從spark-connector1.3.2版本起,支援讀取Hologres。相比Spark預設的
jdbc-connector,spark-connector可以按照Hologres表的shard進行並發讀取,效能更好。讀取的並發與表的shard數有關,spark-connector可以通過參數read.max_task_count進行限制,最終作業會產生Min(shardCount, max_task_count)個讀取Task。而且也支援Schema推斷,不傳入Schema時,會根據Hologres表的Schema推斷出Spark側的Schema。 -
從spark-connector1.5.0版本起,讀取Hologres表支援了謂詞下推,LIMIT下推以及欄位裁剪。同時,也支援傳入Hologres的
SELECT QUERY來讀取資料。此版本開始支援了批量模式讀取,相比之前版本,讀取效能提升3-4倍。
使用Spark-SQL讀取
使用Spark-SQL時,通過Catalog載入Hologres表的中繼資料更加方便,您也可以通過建立暫存資料表的方式聲明一個Hologres表。
-
Hologres-Connector-Spark 1.5.2以下版本,不支援Catalog,只能通過建立暫存資料表的方式聲明Hologres表。
-
Hologres-Connector-Spark更多參數資訊,詳情請參見參數說明。
-
初始化Hologres Catalog。
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \ --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \ --conf spark.sql.catalog.hologres_external_test_db.username=*** \ --conf spark.sql.catalog.hologres_external_test_db.password=*** \ --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db -
讀取Hologres資料。
-
通過Catalog讀取。
-- 載入Hologres Catalog。 USE hologres_external_test_db; -- 讀取Hologres表,支援欄位裁剪和謂詞下推。 SELECT c_custkey,c_name,c_phone FROM public.customer_holo_table WHERE c_custkey < 500 LIMIT 10; -
通過建立暫存資料表讀取。
CREATE TEMPORARY VIEW(table)
CREATE TEMPORARY VIEW hologresTable USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", read.max_task_count "80", // Hologres表最多分為多少個task進行讀取 table "customer_holo_table" ); -- 支援欄位裁剪和謂詞下推 SELECT c_custkey,c_name,c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;CREATE TEMPORARY VIEW(query)
CREATE TEMPORARY VIEW hologresTable USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", read.query "select c_custkey,c_name,c_phone from customer_holo_table where c_custkey < 500 limit 10" ); SELECT * FROM hologresTable LIMIT 5;
-
讀取Hologres資料為DataFrame
使用spark-shell、pyspark等開發Spark作業,可以調用Spark的Read介面將資料讀取為DataFrame以進行後續的計算。不同語言讀取Hologres表為DataFrame的樣本如下。Hologres-Connector-Spark更多參數資訊,詳情請參見參數說明。
Scala
val readDf = (
spark.read
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.option("read.max_task_count", "80") // Hologres表最多分為多少個task進行讀取
.load()
.filter("c_custkey < 500")
)
readDf.select("c_custkey", "c_name", "c_phone").show(10)
Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSelect {
public static void main(String[] args) {
// 使用本地模式運行
SparkSession spark = SparkSession.builder()
.appName("Spark CSV Example")
.master("local[*]")
.getOrCreate();
Dataset<Row> readDf = (
spark.read
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.option("read.max_task_count", "80") // Hologres表最多分為多少個task進行讀取
.load()
.filter("c_custkey < 500")
);
readDf.select("c_custkey", "c_name", "c_phone").show(10);
}
}
Maven檔案所需的配置如下。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.4</version>
<scope>provided</scope>
</dependency>
Python
readDf = spark.read.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").option(
"read.max_task_count", "80").load()
readDf.select("c_custkey", "c_name", "c_phone").show(10)
Spark執行不同語言的作業,具體操作如下:
-
Scala
-
您可以用範例程式碼產生sparkselect.scala檔案,通過如下方式執行作業。
-- 載入依賴 spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar -- 本地測試使用絕對路徑負載檔案 scala> :load D:/sparkselect.scala -
您也可以在載入依賴完成後,直接將範例程式碼粘貼進去執行。
-
-
Java
您可以使用開發工具引入範例程式碼,通過Maven工具完成打包。例如包名spark_select.jar。通過下方代碼執行作業。
-- 作業jar包採用絕對路徑 spark-submit --class SparkSelect --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_select.jar -
Python
您可以在下方代碼執行完成後,直接將範例程式碼粘貼進去執行。
pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
參數說明
通用參數
|
參數 |
預設值 |
是否必填 |
說明 |
|
username |
無 |
是 |
|
|
password |
無 |
是 |
|
|
table |
無 |
是 |
Hologres讀寫資料的表名稱。 說明
讀取資料時也可以選擇使用 |
|
jdbcurl |
無 |
是 |
Hologres即時資料API的jdbcUrl,格式為 |
|
enable_serverless_computing |
false |
否 |
是否使用Serverless資源。僅對讀取和 |
|
serverless_computing_query_priority |
3 |
否 |
Serverless Computing執行優先順序。 |
|
statement_timeout_seconds |
28800(8 小時) |
否 |
單位為s,表示QUERY執行的逾時時間。 |
|
retry_count |
3 |
否 |
當串連故障時的重試次數。 |
|
direct_connect |
對於可以直連的環境會預設使用直連。 |
否 |
批量資料讀寫的瓶頸通常是Endpoint的網路吞吐,因此我們會測試當前環境能否直連Hologres Frontend(接入節點),支援的情況下預設使用直連。此參數設定為 |
寫入參數
Hologres Connector支援Spark的SaveMode參數。對SQL來說,即INSERT INTO或者 INSERT OVERWRITE。對DataFrame來說,即WRITE時設定SaveMode為Append或者Overwrite。其中Overwrite會建立暫存資料表進行寫入,並在寫入成功之後替換原始表,請在必要時使用。
|
參數名 |
參數曾用名 |
預設值 |
是否必填 |
說明 |
|
write.mode |
copy_write_mode |
auto |
否 |
寫入的模式,取值如下,各寫入模式之間的對比請參見批量寫入模式對比。
|
|
write.copy.max_buffer_size |
max_cell_buffer_size |
52428800(50MB) |
否 |
使用COPY模式寫入時,本地buffer的最大長度通常無需調整,但在寫入欄位較大(如超長字串)導致buffer溢出時,可以調大。 |
|
write.copy.dirty_data_check |
copy_write_dirty_data_check |
false |
否 |
是否進行髒資料校正。如開啟此功能,一旦出現髒資料,能夠精確定位到寫入失敗的具體行。然而,這會對寫入效能產生一定影響,因此在非排查環節,建議不予開啟。 |
|
write.on_conflict_action |
write_mode |
INSERT_OR_REPLACE |
否 |
當INSERT目標表為有主鍵的表時,採用不同策略:
|
以下參數僅write.mode為insert時生效。
|
參數名 |
參數曾用名 |
預設值 |
是否必填 |
說明 |
|
write.insert.dynamic_partition |
dynamic_partition |
false |
否 |
|
|
write.insert.batch_size |
write_batch_size |
512 |
否 |
每個寫入線程的最大批次大小。在經過WriteMode合并後的Put數量達到WriteBatchSize時進行一次批量提交。 |
|
write.insert.batch_byte_size |
write_batch_byte_size |
2097152(2 * 1024 * 1024) |
否 |
每個寫入線程的最大批次大小,單位為Byte,預設2MB。在經過WriteMode合并後的Put資料位元組數達到WriteBatchByteSize時進行一次批量提交 |
|
write.insert.max_interval_ms |
write_max_interval_ms |
10000 |
否 |
距離上次提交超過WriteMaxIntervalMs的時間會觸發一次批量提交。 |
|
write.insert.thread_size |
write_thread_size |
1 |
否 |
寫入並發線程數(每個並發佔用1個資料庫連接)。 |
讀取參數
|
參數名 |
參數曾用名 (1.5.0及之前版本) |
預設值 |
是否必填 |
說明 |
|
read.mode |
bulk_read |
auto |
否 |
讀取的模式,取值如下:
|
|
read.max_task_count |
max_partition_count |
80 |
否 |
將讀取的Hologres表分為多個,每個分區對應一個Spark Task。如果Hologres表的ShardCount小於此參數,分區數量最多為ShardCount。 |
|
read.copy.max_buffer_size |
/ |
52428800(50MB) |
否 |
使用COPY模式讀取時,本地Buffer的最大長度,在欄位較大時出現異常應調大長度。 |
|
read.push_down_predicate |
push_down_predicate |
true |
否 |
是否進行謂詞下推,例如在查詢時應用的一些過濾條件。目前支援常見Filter過濾條件的下推,以及列裁剪。 |
|
read.push_down_limit |
push_down_limit |
true |
否 |
是否進行Limit下推。 |
|
read.select.batch_size |
scan_batch_size |
256 |
否 |
|
|
read.select.timeout_seconds |
scan_timeout_seconds |
60 |
否 |
|
|
read.query |
query |
無 |
否 |
使用傳入的 說明
|
資料類型映射
|
Spark類型 |
Hologres類型 |
|
ShortType |
SMALLINT |
|
IntegerType |
INT |
|
LongType |
BIGINT |
|
StringType |
TEXT |
|
StringType |
JSON |
|
StringType |
JSONB |
|
DecimalType |
NUMERIC(38, 18) |
|
BooleanType |
BOOL |
|
DoubleType |
DOUBLE PRECISION |
|
FloatType |
FLOAT |
|
TimestampType |
TIMESTAMPTZ |
|
DateType |
DATE |
|
BinaryType |
BYTEA |
|
BinaryType |
ROARINGBITMAP |
|
ArrayType(IntegerType) |
INT4[] |
|
ArrayType(LongType) |
INT8[] |
|
ArrayType(FloatType) |
FLOAT4[] |
|
ArrayType(DoubleType) |
FLOAT8[] |
|
ArrayType(BooleanType) |
BOOLEAN[] |
|
ArrayType(StringType) |
TEXT[] |
串連數計算
Hologres-Connector-Spark在進行讀寫時,會使用一定的JDBC串連數。可能受到如下因素影響:
-
Spark的並發,在作業運行時在Spark UI處可以看到的同時啟動並執行Task數量。
-
Connector每個並發使用的串連數:
-
COPY方式寫入,每個並發僅使用一個JDBC串連。
-
INSERT方式寫入,每個並發會使用
write_thread_size個JDBC串連。 -
讀取時,每個並發使用一個JDBC串連。
-
-
其他方面可能使用的串連數:作業啟動時,將執行Schema擷取等操作,可能會短暫建立一個串連。
因此作業使用的總的串連數可以通過如下公式計算:
|
工作項目 |
使用串連數 |
|
Catalog查詢中繼資料 |
1 |
|
讀取資料 |
parallelism * 1 + 1 |
|
寫入COPY模式 |
parallelism * 1 + 1 |
|
寫入INSERT模式 |
parallelism * write_thread_size + 1 |
以上串連數計算假設Spark可同時啟動並執行Task數大於任務產生Task數。
Spark同時可以啟動並執行Task並發可能受到使用者佈建的參數影響,如spark.executor.instances,也可能受到Hadoop對檔案分塊策略的影響,詳情請參見Apache Hadoop。