全部產品
Search
文件中心

Hologres:Spark讀寫Hologres

更新時間:Feb 04, 2026

Spark是用於大規模資料處理的統一分析引擎,Hologres已與Spark社區版及EMR Spark版實現高效整合,迅速協助企業構建資料倉儲。Hologres提供的Spark Connector支援在Spark叢集中建立Hologres Catalog,以外部表格的形式實現高效能的資料批量讀取和匯入,相較於原生JDBC,具有更為卓越的效能。

使用限制

僅V1.3及以上版本的Hologres執行個體支援Spark Connector。您可以在Hologres管理主控台執行個體詳情頁查看當前執行個體版本。若您的執行個體是V1.3以下版本,請使用執行個體升級或通過搜尋(DingTalk群號:32314975)加入即時數倉Hologres交流群申請升級執行個體。

準備工作

  • 您需要安裝對應版本的Spark環境,能夠運行spark-sql、spark-shell或pyspark命令,建議使用Spark 3.3.0及以上版本,以避免依賴問題,並獲得更豐富的功能體驗。

    • 您可以使用阿里雲EMR Spark來快速構建Spark環境對接Hologres執行個體,詳情請參見EMR Spark功能

    • 您也可以獨立在所需環境中搭建Spark環境,詳情請參見Apache Spark

  • Spark讀寫Hologres需要引用的連接器JAR包hologres-connector-spark-3.x。本文使用當前最新版本1.5.2。您可以通過Maven中央倉庫進行下載。更多關於Connector的資源都已開源,詳情請參見Hologres-Connectors

  • 若您需要使用Java語言開發Spark作業,並在如IntelliJ IDEA工具進行本地調試時,可直接引入下方Maven依賴進行pom.xml檔案配置。

    <dependency>
        <groupId>com.alibaba.hologres</groupId>
        <artifactId>hologres-connector-spark-3.x</artifactId>
        <version>1.5.2</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>

Hologres Catalog

Spark Connector從1.5.2版本開始支援Hologres Catalog,您可以通過外部表格的方式方便地讀寫Hologres。

Spark中每個Hologres Catalog對應Hologres中的各個Database,每個Hologres Catalog中的Namespace對應Database中的各個Schema。以下部分將展示如何在Spark中使用Hologres Catalog。

說明

目前Hologres Catalog暫不支援建立表。

本文Hologres執行個體中對應資料庫和表名如下:

test_db --資料庫
  public.test_table1 --public模式下表
  public.test_table2
  test_schema.test_table3  -- test_schema模式下表 

Hologres Catalog初始化

在Spark叢集中啟動spark-sql,載入Hologres Connector並指定Catalog參數。

spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db

Hologres Catalog常用命令

  • 載入Hologres Catalog

    Spark中的Hologres Catalog完全對應一個Hologres的Database,使用過程中無法更改。

    USE hologres_external_test_db;
  • 查詢所有Namespace。

    Spark中的Namespace,對應Hologres中的Schema,預設為public,使用過程中可以使用USE指令調整預設的Schema。

    -- 查看Hologres Catalog中的所有Namespace, 即Hologres中所有的Schema。
    SHOW NAMESPACES;
  • 查詢Namespace下的表。

    • 查詢所有表。

      SHOW TABLES;
    • 查詢指定Namespace下的表。

      USE test_schema;
      SHOW TABLES;
      
      -- 或者使用 
      SHOW TABLES IN test_schema;
  • 讀取和寫入表。

    使用SELECT和INSERT語句對Catalog中的Hologres外部表格進行讀寫。

    -- 讀取表。
    SELECT * FROM public.test_table1;
    
    -- 寫入表。
    INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;

匯入Hologres

本節測試Hologres資料來源TPC-H資料集中的customer表。Spark可以通過CSV格式讀取Hologres表資料。您可以直接下載customer資料。customer表結構建立SQL如下。

CREATE TABLE customer_holo_table
(
  c_custkey    BIGINT ,
  c_name       TEXT   ,
  c_address    TEXT   ,
  c_nationkey  INT    ,
  c_phone      TEXT   ,
  c_acctbal    DECIMAL(15,2) ,
  c_mktsegment TEXT   ,
  c_comment    TEXT
);

使用Spark-SQL匯入

在使用Spark-SQL時,藉助Catalog載入Hologres表的中繼資料更加便捷,同時也可以通過建立暫存資料表的方式來聲明一個Hologres表。

說明
  • Hologres-Connector-Spark 1.5.2以下版本,不支援Catalog,只能通過建立暫存資料表的方式聲明Hologres表。

  • Hologres-Connector-Spark更多參數資訊,詳情請參見參數說明

  1. 初始化Hologres Catalog。

    spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
    --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
    --conf spark.sql.catalog.hologres_external_test_db.username=*** \
    --conf spark.sql.catalog.hologres_external_test_db.password=*** \
    --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
  2. 從CSV源表匯入資料至Hologres外表。

    說明

    Spark的INSERT INTO文法不支援通過column_list指定部分列進行寫入,例如使用INSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTable來表示唯寫入c_custkey這一個欄位。

    若您希望寫入部分所需欄位,可以使用CREATE TEMPORARY VIEW 的方式聲明僅所需欄位的Hologres暫存資料表。

    CATALOG寫入

    -- 載入Hologres Catalog。
    USE hologres_external_test_db;
    
    -- 建立csv資料來源
    CREATE TEMPORARY VIEW csvTable (
        c_custkey BIGINT,
        c_name STRING,
        c_address STRING,
        c_nationkey INT,
        c_phone STRING,
        c_acctbal DECIMAL(15, 2),
        c_mktsegment STRING,
        c_comment STRING)
    USING csv OPTIONS (
        path "resources/customer", sep "," -- 本地測試,直接用檔案所在絕對路徑。
    );
    
    -- 將csv表的資料寫入到Hologres中
    INSERT INTO public.customer_holo_table SELECT * FROM csvTable;

    TEMPORARY VIEW寫入

    -- 建立csv資料來源
    CREATE TEMPORARY VIEW csvTable (
        c_custkey BIGINT,
        c_name STRING,
        c_address STRING,
        c_nationkey INT,
        c_phone STRING,
        c_acctbal DECIMAL(15, 2),
        c_mktsegment STRING,
        c_comment STRING)
    USING csv OPTIONS (
        path "resources/customer", sep ","
    );
    
    -- 建立hologres暫存資料表
    CREATE TEMPORARY VIEW hologresTable (
        c_custkey BIGINT,
        c_name STRING,
        c_phone STRING)
    USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        table "customer_holo_table"
    );
    
    INSERT INTO hologresTable SELECT c_custkey,c_name,c_phone FROM csvTable;

使用DataFrame匯入

使用spark-shell、pyspark等開發Spark作業,您也可以調用DataFrame的write介面來進行寫入。不同的開發語言會將讀取到CSV檔案資料轉為DataFrame後,再寫入Hologres執行個體,相關範例程式碼如下。Hologres-Connector-Spark更多參數資訊,詳情請參見參數說明

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode

// csv源的schema
val schema = StructType(Array(
  StructField("c_custkey", LongType),
  StructField("c_name", StringType),
  StructField("c_address", StringType),
  StructField("c_nationkey", IntegerType),
  StructField("c_phone", StringType),
  StructField("c_acctbal", DecimalType(15, 2)),
  StructField("c_mktsegment", StringType),
  StructField("c_comment", StringType)
))

// 從csv檔案讀取資料為DataFrame
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")

// 將讀取到的DataFrame寫入到Hologres中
csvDf.write
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.mode(SaveMode.Append)
.save()

Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;

public class SparkTest {
    public static void main(String[] args) {
        // csv源的schema
        List<StructField> asList =
                Arrays.asList(
                        DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
                        DataTypes.createStructField("c_name", DataTypes.StringType, true),
                        DataTypes.createStructField("c_address", DataTypes.StringType, true),
                        DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
                        DataTypes.createStructField("c_phone", DataTypes.StringType, true),
                        DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
                        DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
                        DataTypes.createStructField("c_comment", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(asList);

        // 使用本地模式運行
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();

        // 從csv檔案讀取資料為DataFrame
        // 本地測試採用customer資料的絕對路徑
        Dataset<Row> csvDf = spark.read().format("csv").schema(schema).option("sep", ",").load("resources/customer");

        // 將讀取到的DataFrame寫入到Hologres中
        csvDf.write.format("hologres").option(
           "username", "***").option(
           "password", "***").option(
           "jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
           "table", "customer_holo_table").mode(
           "append").save()
    }
}

Maven檔案所需的配置如下。

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>

Python

from pyspark.sql.types import *

# csv源的schema
schema = StructType([
    StructField("c_custkey", LongType()),
    StructField("c_name", StringType()),
    StructField("c_address", StringType()),
    StructField("c_nationkey", IntegerType()),
    StructField("c_phone", StringType()),
    StructField("c_acctbal", DecimalType(15, 2)),
    StructField("c_mktsegment", StringType()),
    StructField("c_comment", StringType())
])

# 從csv檔案讀取資料為DataFrame
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=',')

# 將讀取到的DataFrame寫入到Hologres中
csvDf.write.format("hologres").option(
    "username", "***").option(
    "password", "***").option(
    "jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
    "table", "customer_holo_table").mode(
    "append").save()

Spark執行不同語言的作業,具體操作如下:

  • Scala

    • 您可以用範例程式碼產生sparktest.scala檔案,通過如下方式執行作業。

      -- 載入依賴 
      spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
      
      -- 本地測試使用絕對路徑負載檔案
      scala> :load D:/sparktest.scala
    • 您也可以在載入依賴完成後,直接將範例程式碼粘貼進去執行。

  • Java

    您可以使用開發工具引入範例程式碼,通過Maven工具完成打包。例如包名spark_test.jar。通過下方代碼執行作業。

    -- 作業jar包採用絕對路徑
    spark-submit --class SparkTest  --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar  D:\spark_test.jar
  • Python

    您可以在下方代碼執行完成後,直接將範例程式碼粘貼進去執行。

    pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

讀取Hologres

  • 從spark-connector1.3.2版本起,支援讀取Hologres。相比Spark預設的jdbc-connectorspark-connector可以按照Hologres表的shard進行並發讀取,效能更好。讀取的並發與表的shard數有關,spark-connector可以通過參數read.max_task_count進行限制,最終作業會產生Min(shardCount, max_task_count)個讀取Task。而且也支援Schema推斷,不傳入Schema時,會根據Hologres表的Schema推斷出Spark側的Schema。

  • 從spark-connector1.5.0版本起,讀取Hologres表支援了謂詞下推,LIMIT下推以及欄位裁剪。同時,也支援傳入Hologres的SELECT QUERY來讀取資料。此版本開始支援了批量模式讀取,相比之前版本,讀取效能提升3-4倍。

使用Spark-SQL讀取

使用Spark-SQL時,通過Catalog載入Hologres表的中繼資料更加方便,您也可以通過建立暫存資料表的方式聲明一個Hologres表。

說明
  • Hologres-Connector-Spark 1.5.2以下版本,不支援Catalog,只能通過建立暫存資料表的方式聲明Hologres表。

  • Hologres-Connector-Spark更多參數資訊,詳情請參見參數說明

  1. 初始化Hologres Catalog。

    spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
    --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
    --conf spark.sql.catalog.hologres_external_test_db.username=*** \
    --conf spark.sql.catalog.hologres_external_test_db.password=*** \
    --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
  2. 讀取Hologres資料。

    • 通過Catalog讀取。

      -- 載入Hologres Catalog。
      USE hologres_external_test_db;
      
      -- 讀取Hologres表,支援欄位裁剪和謂詞下推。
      SELECT c_custkey,c_name,c_phone FROM public.customer_holo_table WHERE c_custkey < 500 LIMIT 10;
    • 通過建立暫存資料表讀取。

      CREATE TEMPORARY VIEW(table)

      CREATE TEMPORARY VIEW hologresTable
      USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        read.max_task_count "80", // Hologres表最多分為多少個task進行讀取
        table "customer_holo_table"
      );
      
      -- 支援欄位裁剪和謂詞下推
      SELECT c_custkey,c_name,c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;

      CREATE TEMPORARY VIEW(query)

      CREATE TEMPORARY VIEW hologresTable
      USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        read.query "select c_custkey,c_name,c_phone from customer_holo_table where c_custkey < 500 limit 10"
      );
      
      SELECT * FROM hologresTable LIMIT 5;

讀取Hologres資料為DataFrame

使用spark-shell、pyspark等開發Spark作業,可以調用Spark的Read介面將資料讀取為DataFrame以進行後續的計算。不同語言讀取Hologres表為DataFrame的樣本如下。Hologres-Connector-Spark更多參數資訊,詳情請參見參數說明

Scala

val readDf = (
  spark.read
    .format("hologres")
    .option("username", "***")
    .option("password", "***")
    .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
    .option("table", "customer_holo_table")
    .option("read.max_task_count", "80") // Hologres表最多分為多少個task進行讀取
    .load()
    .filter("c_custkey < 500")
)

readDf.select("c_custkey", "c_name", "c_phone").show(10)

Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSelect {
    public static void main(String[] args) {
        
        // 使用本地模式運行
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();
                
        Dataset<Row> readDf = (
           spark.read
                .format("hologres")
                .option("username", "***")
                .option("password", "***")
                .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
                .option("table", "customer_holo_table")
                .option("read.max_task_count", "80") // Hologres表最多分為多少個task進行讀取
                .load()
                .filter("c_custkey < 500")
        );
        readDf.select("c_custkey", "c_name", "c_phone").show(10);
    }
}

Maven檔案所需的配置如下。

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>

Python

readDf = spark.read.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").option(
"read.max_task_count", "80").load()

readDf.select("c_custkey", "c_name", "c_phone").show(10)

Spark執行不同語言的作業,具體操作如下:

  • Scala

    • 您可以用範例程式碼產生sparkselect.scala檔案,通過如下方式執行作業。

      -- 載入依賴 
      spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
      
      -- 本地測試使用絕對路徑負載檔案
      scala> :load D:/sparkselect.scala
    • 您也可以在載入依賴完成後,直接將範例程式碼粘貼進去執行。

  • Java

    您可以使用開發工具引入範例程式碼,通過Maven工具完成打包。例如包名spark_select.jar。通過下方代碼執行作業。

    -- 作業jar包採用絕對路徑
    spark-submit --class SparkSelect --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar  D:\spark_select.jar
  • Python

    您可以在下方代碼執行完成後,直接將範例程式碼粘貼進去執行。

    pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

參數說明

通用參數

參數

預設值

是否必填

說明

username

password

  • 當前帳號的AccessKey Secret。擷取方式請參見建立AccessKey

  • 建立的賬戶名所對應的密碼。

table

Hologres讀寫資料的表名稱。

說明

讀取資料時也可以選擇使用read.query參數來替換。

jdbcurl

Hologres即時資料API的jdbcUrl,格式為jdbc:postgresql://<host>:<port>/<db_name>。您可以進入Hologres管理主控台,單擊左側導覽列實例清單,選擇目標執行個體,在執行個體詳情頁下網絡資訊中擷取主機和連接埠號碼。

enable_serverless_computing

false

是否使用Serverless資源。僅對讀取和bulk_load模式寫入有效,詳情請參見Serverless Computing使用指南

serverless_computing_query_priority

3

Serverless Computing執行優先順序。

statement_timeout_seconds

28800(8 小時)

單位為s,表示QUERY執行的逾時時間。

retry_count

3

當串連故障時的重試次數。

direct_connect

對於可以直連的環境會預設使用直連。

批量資料讀寫的瓶頸通常是Endpoint的網路吞吐,因此我們會測試當前環境能否直連Hologres Frontend(接入節點),支援的情況下預設使用直連。此參數設定為false則不進行直連。

寫入參數

Hologres Connector支援Spark的SaveMode參數。對SQL來說,即INSERT INTO或者 INSERT OVERWRITE。對DataFrame來說,即WRITE時設定SaveMode為Append或者Overwrite。其中Overwrite會建立暫存資料表進行寫入,並在寫入成功之後替換原始表,請在必要時使用。

參數名

參數曾用名

預設值

是否必填

說明

write.mode

copy_write_mode

auto

寫入的模式,取值如下,各寫入模式之間的對比請參見批量寫入模式對比

  • auto(預設值)。Connector會根據版本和目標表的元資訊自動選擇最佳的模式,選擇邏輯如下:

    1. Hologres執行個體版本大於V2.2.25,表有主鍵,選擇bulk_load_on_conflict模式。

    2. Hologres執行個體版本大於V2.1.0,表無主鍵,選擇bulk_load模式。

    3. Hologres執行個體版本大於V1.3,選擇stream模式。

    4. 其他情況,選擇insert模式。

  • stream,即Fixed Plan加速SQL執行。在Fixed Plan中,COPY是Hologres V1.3新增的功能。與INSERT方法相比,COPY方式具有更高的輸送量(因其採用流模式)、更低的資料延遲以及更低的用戶端記憶體消耗(因其不攢批)。

    說明

    需要Hologres Connector 1.3.0及以上版本,Hologres v1.3.34及以上版本。

  • bulk_load,即批量COPY。批量COPY相比流式的Fixed Plan中COPY,在RPS條件下,Hologres執行個體的負載更低,但僅支援寫入無主鍵表。

    說明

    需要Hologres Connector 1.4.2及以上版本,Hologres v2.1.0及以上版本。

  • bulk_load_on_conflict,批量COPY寫入有主鍵表時,支援處理主鍵重複的情況。Hologres主鍵表的批量資料匯入預設會觸發表鎖,限制了多個串連同時進行並發寫入的能力。當前Connector支援根據目標Hologres表的Distribution Key對資料進行重分布,使每個Spark的Task只負責寫一個Shard的資料,將原本的表鎖降低至Shard層級,實現並發寫入,提升寫入效能。由於每個串連只需要維護很少Shard的資料,此最佳化也可以顯著降低小檔案的數量,降低Hologres的記憶體使用量。測試表明,對資料進行 Repartition之後再並發寫入,相比Stream模式寫入,可以減少約67%的系統負載。

    說明

    需要Hologres Connector 1.4.2及以上版本,Hologres v2.2.25及以上版本。

  • insert,使用INSERT方式寫入。

write.copy.max_buffer_size

max_cell_buffer_size

52428800(50MB)

使用COPY模式寫入時,本地buffer的最大長度通常無需調整,但在寫入欄位較大(如超長字串)導致buffer溢出時,可以調大。

write.copy.dirty_data_check

copy_write_dirty_data_check

false

是否進行髒資料校正。如開啟此功能,一旦出現髒資料,能夠精確定位到寫入失敗的具體行。然而,這會對寫入效能產生一定影響,因此在非排查環節,建議不予開啟。

write.on_conflict_action

write_mode

INSERT_OR_REPLACE

當INSERT目標表為有主鍵的表時,採用不同策略:

  • INSERT_OR_IGNORE當主鍵衝突時,不寫入。

  • INSERT_OR_UPDATE當主鍵衝突時,更新相應列。

  • INSERT_OR_REPLACE當主鍵衝突時,更新所有列。

以下參數僅write.modeinsert時生效。

參數名

參數曾用名

預設值

是否必填

說明

write.insert.dynamic_partition

dynamic_partition

false

copy_write_modeinsert時生效。true表示寫入分區表父表時,自動建立不存在的分區。

write.insert.batch_size

write_batch_size

512

每個寫入線程的最大批次大小。在經過WriteMode合并後的Put數量達到WriteBatchSize時進行一次批量提交。

write.insert.batch_byte_size

write_batch_byte_size

2097152(2 * 1024 * 1024)

每個寫入線程的最大批次大小,單位為Byte,預設2MB。在經過WriteMode合并後的Put資料位元組數達到WriteBatchByteSize時進行一次批量提交

write.insert.max_interval_ms

write_max_interval_ms

10000

距離上次提交超過WriteMaxIntervalMs的時間會觸發一次批量提交。

write.insert.thread_size

write_thread_size

1

寫入並發線程數(每個並發佔用1個資料庫連接)。

讀取參數

參數名

參數曾用名

(1.5.0及之前版本)

預設值

是否必填

說明

read.mode

bulk_read

auto

讀取的模式,取值如下:

  • auto(預設值)。Hologres Connector會根據版本和目標表的元資訊自動選擇最佳的模式,選擇邏輯如下:

    1. 如果讀取的欄位中包含JSONB類型,選擇select模式。

    2. 如果執行個體版本高於3.0.24,選擇bulk_read_compressed模式。

    3. 其他情況,選擇bulk_read模式。

  • bulk_read,使用COPY OUT的方式以arrow格式讀取資料,效能是select模式的數倍以上。暫不支援讀取Hologres中的JSONB類型。

  • bulk_read_compressed,使用COPY OUT的方式讀取壓縮過的arrow格式資料,相比壓縮前可以節省約45%的頻寬。

  • select,使用普通的SELECT方式讀取。

read.max_task_count

max_partition_count

80

將讀取的Hologres表分為多個,每個分區對應一個Spark Task。如果Hologres表的ShardCount小於此參數,分區數量最多為ShardCount。

read.copy.max_buffer_size

/

52428800(50MB)

使用COPY模式讀取時,本地Buffer的最大長度,在欄位較大時出現異常應調大長度。

read.push_down_predicate

push_down_predicate

true

是否進行謂詞下推,例如在查詢時應用的一些過濾條件。目前支援常見Filter過濾條件的下推,以及列裁剪。

read.push_down_limit

push_down_limit

true

是否進行Limit下推。

read.select.batch_size

scan_batch_size

256

read_mode設定為select時生效。讀取Hologres時,Scan操作一次Fetch的行數.

read.select.timeout_seconds

scan_timeout_seconds

60

read_mode設定為select時生效。讀取Hologres時,Scan操作的逾時時間。

read.query

query

使用傳入的query去讀取Hologres,此參數與table參數二者只能設定一個。

說明
  • 使用query方式讀取時,只能單Task讀取。且不支援謂詞下推。

  • 使用table方式讀取時,會根據Hologres表的ShardCount分為多個Task並發讀取。

資料類型映射

Spark類型

Hologres類型

ShortType

SMALLINT

IntegerType

INT

LongType

BIGINT

StringType

TEXT

StringType

JSON

StringType

JSONB

DecimalType

NUMERIC(38, 18)

BooleanType

BOOL

DoubleType

DOUBLE PRECISION

FloatType

FLOAT

TimestampType

TIMESTAMPTZ

DateType

DATE

BinaryType

BYTEA

BinaryType

ROARINGBITMAP

ArrayType(IntegerType)

INT4[]

ArrayType(LongType)

INT8[]

ArrayType(FloatType)

FLOAT4[]

ArrayType(DoubleType)

FLOAT8[]

ArrayType(BooleanType)

BOOLEAN[]

ArrayType(StringType)

TEXT[]

串連數計算

Hologres-Connector-Spark在進行讀寫時,會使用一定的JDBC串連數。可能受到如下因素影響:

  • Spark的並發,在作業運行時在Spark UI處可以看到的同時啟動並執行Task數量。

  • Connector每個並發使用的串連數:

    • COPY方式寫入,每個並發僅使用一個JDBC串連。

    • INSERT方式寫入,每個並發會使用write_thread_size個JDBC串連。

    • 讀取時,每個並發使用一個JDBC串連。

  • 其他方面可能使用的串連數:作業啟動時,將執行Schema擷取等操作,可能會短暫建立一個串連。

因此作業使用的總的串連數可以通過如下公式計算:

工作項目

使用串連數

Catalog查詢中繼資料

1

讀取資料

parallelism * 1 + 1

寫入COPY模式

parallelism * 1 + 1

寫入INSERT模式

parallelism * write_thread_size + 1

以上串連數計算假設Spark可同時啟動並執行Task數大於任務產生Task數。

Spark同時可以啟動並執行Task並發可能受到使用者佈建的參數影響,如spark.executor.instances,也可能受到Hadoop對檔案分塊策略的影響,詳情請參見Apache Hadoop