全部产品
Search
文档中心

Hologres:Baca dari dan tulis ke Hologres menggunakan Spark

更新时间:Feb 05, 2026

Spark adalah mesin analitik terpadu untuk pemrosesan data skala besar. Hologres terintegrasi erat dengan edisi komunitas Spark dan EMR Spark untuk membantu Anda membangun gudang data secara cepat. Konektor Spark yang disediakan oleh Hologres mendukung pembuatan Hologres Catalog di kluster Spark, memungkinkan pembacaan batch berkinerja tinggi dan impor menggunakan tabel eksternal. Pendekatan ini memberikan kinerja lebih baik dibandingkan Java Database Connectivity (JDBC) asli.

Batasan

Hanya instans Hologres versi 1.3 atau lebih baru yang mendukung konektor Spark. Anda dapat melihat versi instans saat ini di halaman Instance Details pada Hologres Management Console. Jika instans Anda menjalankan versi sebelum 1.3, lakukan Instance Upgrade atau bergabunglah dengan grup komunikasi Hologres dengan mencari nomor grup DingTalk 32314975 untuk meminta peningkatan versi.

Persiapan

  • Instal lingkungan Spark yang kompatibel tempat Anda dapat menjalankan perintah `spark-sql`, `spark-shell`, atau `pyspark`. Gunakan Spark 3.3.0 atau lebih baru untuk menghindari masalah dependensi dan mengakses lebih banyak fitur.

    • Gunakan EMR Spark Alibaba Cloud untuk membangun lingkungan Spark secara cepat dan menghubungkannya ke instans Hologres Anda. Untuk informasi selengkapnya, lihat EMR Spark features.

    • Anda juga dapat membangun lingkungan Spark sesuai preferensi Anda. Untuk informasi selengkapnya, lihat Apache Spark.

  • Untuk membaca dari atau menulis ke Hologres menggunakan Spark, rujuk paket JAR konektor hologres-connector-spark-3.x. Topik ini menggunakan versi 1.5.2 dari konektor Spark. Anda dapat mengunduhnya dari Maven Central Repository. Semua sumber daya konektor bersifat open source. Untuk informasi selengkapnya, lihat Hologres-Connectors.

  • Jika Anda mengembangkan pekerjaan Spark dalam Java dan melakukan debugging secara lokal menggunakan alat seperti IntelliJ IDEA, tambahkan dependensi Maven berikut ke file pom.xml Anda.

    <dependency>
        <groupId>com.alibaba.hologres</groupId>
        <artifactId>hologres-connector-spark-3.x</artifactId>
        <version>1.5.2</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>

Hologres Catalog

Mulai dari versi 1.5.2, konektor Spark mendukung Hologres Catalog, memungkinkan Anda membaca dari dan menulis ke Hologres menggunakan tabel eksternal secara praktis.

Di Spark, setiap Hologres Catalog berkorespondensi dengan satu database di Hologres, dan setiap namespace dalam Hologres Catalog berkorespondensi dengan satu skema dalam database tersebut. Bagian berikut menunjukkan cara menggunakan Hologres Catalog di Spark.

Catatan

Hologres Catalog tidak mendukung pembuatan tabel.

Topik ini menggunakan instans Hologres dengan nama database dan tabel sebagai berikut:

test_db --database
  public.test_table1 --table in the public schema
  public.test_table2
  test_schema.test_table3  -- table in the test_schema schema

Inisialisasi Hologres Catalog

Di kluster Spark Anda, jalankan spark-sql, muat konektor Hologres, dan tentukan parameter Catalog.

spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db

Perintah umum untuk Hologres Catalog

  • Muat Hologres Catalog

    Hologres Catalog di Spark berkorespondensi tepat dengan satu database Hologres dan tidak dapat diubah selama penggunaan.

    USE hologres_external_test_db;
  • Kueri semua namespace.

    Namespace di Spark berkorespondensi dengan skema di Hologres. Skema default adalah `public`. Anda dapat menggunakan instruksi USE untuk mengubah skema default.

    -- View all namespaces in the Hologres Catalog. This corresponds to all schemas in Hologres.
    SHOW NAMESPACES;
  • Kueri tabel dalam namespace.

    • Kueri semua tabel.

      SHOW TABLES;
    • Kueri tabel dalam namespace tertentu.

      USE test_schema;
      SHOW TABLES;
      
      -- Or use 
      SHOW TABLES IN test_schema;
  • Baca dari dan tulis ke tabel.

    Gunakan pernyataan SELECT dan INSERT untuk membaca dari dan menulis ke tabel eksternal Hologres dalam Catalog.

    -- Read from a table.
    SELECT * FROM public.test_table1;
    
    -- Write to a table.
    INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;

Impor data ke Hologres

Bagian ini menggunakan tabel customer dari dataset TPC-H sebagai data sumber untuk Hologres. Spark dapat membaca data tabel Hologres dalam format CSV. Unduh data customer. Pernyataan SQL untuk membuat skema tabel customer adalah sebagai berikut.

CREATE TABLE customer_holo_table
(
  c_custkey    BIGINT ,
  c_name       TEXT   ,
  c_address    TEXT   ,
  c_nationkey  INT    ,
  c_phone      TEXT   ,
  c_acctbal    DECIMAL(15,2) ,
  c_mktsegment TEXT   ,
  c_comment    TEXT
);

Impor data menggunakan Spark-SQL

Saat menggunakan Spark-SQL, lebih praktis memuat metadata tabel Hologres menggunakan Catalog. Anda juga dapat mendeklarasikan tabel Hologres dengan membuat tabel sementara.

Catatan
  • Versi Hologres-Connector-Spark sebelum 1.5.2 tidak mendukung catalog. Anda harus mendeklarasikan tabel Hologres dengan membuat tabel sementara.

  • Untuk informasi selengkapnya tentang parameter Hologres-Connector-Spark, lihat Parameter description.

  1. Inisialisasi Hologres Catalog.

    spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
    --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
    --conf spark.sql.catalog.hologres_external_test_db.username=*** \
    --conf spark.sql.catalog.hologres_external_test_db.password=*** \
    --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
  2. Impor data dari tabel CSV sumber ke tabel eksternal Hologres.

    Catatan

    Sintaksis Spark INSERT INTO tidak mendukung penentuan daftar parsial kolom untuk penulisan menggunakan column_list. Misalnya, Anda tidak dapat menggunakan INSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTable untuk hanya menulis bidang `c_custkey`.

    Jika Anda ingin hanya menulis bidang tertentu, deklarasikan tabel sementara Hologres yang hanya berisi bidang tersebut menggunakan CREATE TEMPORARY VIEW.

    Tulis menggunakan CATALOG

    -- Load the Hologres Catalog.
    USE hologres_external_test_db;
    
    -- Create a CSV data source.
    CREATE TEMPORARY VIEW csvTable (
        c_custkey BIGINT,
        c_name STRING,
        c_address STRING,
        c_nationkey INT,
        c_phone STRING,
        c_acctbal DECIMAL(15, 2),
        c_mktsegment STRING,
        c_comment STRING)
    USING csv OPTIONS (
        path "resources/customer", sep "," -- For local testing, use the absolute path of the file.
    );
    
    -- Write data from the CSV table to Hologres.
    INSERT INTO public.customer_holo_table SELECT * FROM csvTable;

    Tulis menggunakan TEMPORARY VIEW

    -- Create a CSV data source.
    CREATE TEMPORARY VIEW csvTable (
        c_custkey BIGINT,
        c_name STRING,
        c_address STRING,
        c_nationkey INT,
        c_phone STRING,
        c_acctbal DECIMAL(15, 2),
        c_mktsegment STRING,
        c_comment STRING)
    USING csv OPTIONS (
        path "resources/customer", sep ","
    );
    
    -- Create a Hologres temporary table.
    CREATE TEMPORARY VIEW hologresTable (
        c_custkey BIGINT,
        c_name STRING,
        c_phone STRING)
    USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        table "customer_holo_table"
    );
    
    INSERT INTO hologresTable SELECT c_custkey,c_name,c_phone FROM csvTable;

Impor data menggunakan DataFrame

Saat Anda mengembangkan pekerjaan Spark menggunakan spark-shell, pyspark, atau alat lainnya, Anda juga dapat memanggil antarmuka `write` DataFrame untuk menulis data. Bahasa pengembangan yang berbeda mengonversi data yang dibaca dari file CSV menjadi DataFrame, lalu menuliskannya ke instans Hologres. Berikut adalah contoh kode. Untuk informasi selengkapnya tentang parameter Hologres-Connector-Spark, lihat Parameter description.

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode

// Schema of the CSV source.
val schema = StructType(Array(
  StructField("c_custkey", LongType),
  StructField("c_name", StringType),
  StructField("c_address", StringType),
  StructField("c_nationkey", IntegerType),
  StructField("c_phone", StringType),
  StructField("c_acctbal", DecimalType(15, 2)),
  StructField("c_mktsegment", StringType),
  StructField("c_comment", StringType)
))

// Read data from the CSV file into a DataFrame.
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")

// Write the DataFrame to Hologres.
csvDf.write
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.mode(SaveMode.Append)
.save()

Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;

public class SparkTest {
    public static void main(String[] args) {
        // Schema of the CSV source.
        List<StructField> asList =
                Arrays.asList(
                        DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
                        DataTypes.createStructField("c_name", DataTypes.StringType, true),
                        DataTypes.createStructField("c_address", DataTypes.StringType, true),
                        DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
                        DataTypes.createStructField("c_phone", DataTypes.StringType, true),
                        DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
                        DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
                        DataTypes.createStructField("c_comment", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(asList);

        // Run in local mode.
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();

        // Read data from the CSV file into a DataFrame.
        // For local testing, use the absolute path of the customer data.
        Dataset<Row> csvDf = spark.read().format("csv").schema(schema).option("sep", ",").load("resources/customer");

        // Write the DataFrame to Hologres.
        csvDf.write.format("hologres").option(
           "username", "***").option(
           "password", "***").option(
           "jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
           "table", "customer_holo_table").mode(
           "append").save()
    }
}

Konfigurasi berikut diperlukan untuk file Maven.

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>

Python

from pyspark.sql.types import *

# Schema of the CSV source.
schema = StructType([
    StructField("c_custkey", LongType()),
    StructField("c_name", StringType()),
    StructField("c_address", StringType()),
    StructField("c_nationkey", IntegerType()),
    StructField("c_phone", StringType()),
    StructField("c_acctbal", DecimalType(15, 2)),
    StructField("c_mktsegment", StringType()),
    StructField("c_comment", StringType())
])

# Read data from the CSV file into a DataFrame.
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=',')

# Write the DataFrame to Hologres.
csvDf.write.format("hologres").option(
    "username", "***").option(
    "password", "***").option(
    "jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
    "table", "customer_holo_table").mode(
    "append").save()

Untuk menjalankan pekerjaan Spark dalam bahasa yang berbeda, lakukan operasi berikut:

  • Scala

    • Anda dapat menggunakan contoh kode untuk menghasilkan file sparktest.scala dan menjalankan pekerjaan sebagai berikut.

      -- Load dependencies.
      spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
      
      -- For local testing, use the absolute path to load the file.
      scala> :load D:/sparktest.scala
    • Anda juga dapat menempelkan contoh kode langsung setelah memuat dependensi untuk menjalankannya.

  • Java

    Anda dapat mengimpor contoh kode ke alat pengembangan Anda dan menggunakan Maven untuk membuat paketnya. Misalnya, nama paketnya bisa berupa spark_test.jar. Jalankan pekerjaan dengan kode berikut.

    -- Use the absolute path for the job JAR package.
    spark-submit --class SparkTest  --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar  D:\spark_test.jar
  • Python

    Setelah menjalankan kode berikut, Anda dapat menempelkan contoh kode langsung untuk menjalankannya.

    pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

Baca data dari Hologres

  • Mulai dari versi spark-connector 1.3.2, Anda dapat membaca data dari Hologres. Dibandingkan dengan jdbc-connector default Spark, spark-connector dapat membaca data secara konkuren berdasarkan shard tabel Hologres, sehingga memberikan kinerja lebih baik. Tingkat konkurensi baca bergantung pada jumlah shard dalam tabel. spark-connector dapat dibatasi oleh parameter read.max_task_count. Pekerjaan akhirnya menghasilkan Min(shardCount, max_task_count) tugas baca. Konektor ini juga mendukung inferensi skema. Jika Anda tidak menyediakan skema, konektor akan menginferensi skema sisi Spark dari skema tabel Hologres.

  • Mulai dari versi spark-connector 1.5.0, pembacaan dari tabel Hologres mendukung predicate pushdown, LIMIT pushdown, dan column pruning. Konektor ini juga mendukung pembacaan data dengan meneruskan SELECT QUERY Hologres. Versi ini memperkenalkan pembacaan dalam mode batch, yang meningkatkan kinerja baca hingga tiga hingga empat kali lipat dibandingkan versi sebelumnya.

Baca data menggunakan Spark-SQL

Saat menggunakan Spark-SQL, lebih praktis memuat metadata tabel Hologres menggunakan Catalog. Anda juga dapat mendeklarasikan tabel Hologres dengan membuat tabel sementara.

Catatan
  • Versi Hologres-Connector-Spark sebelum 1.5.2 tidak mendukung catalog. Anda harus mendeklarasikan tabel Hologres dengan membuat tabel sementara.

  • Untuk informasi selengkapnya tentang parameter Hologres-Connector-Spark, lihat Parameter description.

  1. Inisialisasi Hologres Catalog.

    spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
    --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
    --conf spark.sql.catalog.hologres_external_test_db.username=*** \
    --conf spark.sql.catalog.hologres_external_test_db.password=*** \
    --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
  2. Baca data dari Hologres.

    • Baca data menggunakan Catalog.

      -- Load the Hologres Catalog.
      USE hologres_external_test_db;
      
      -- Read from the Hologres table. Column pruning and predicate pushdown are supported.
      SELECT c_custkey,c_name,c_phone FROM public.customer_holo_table WHERE c_custkey < 500 LIMIT 10;
    • Baca data dengan membuat tabel sementara.

      CREATE TEMPORARY VIEW(table)

      CREATE TEMPORARY VIEW hologresTable
      USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        read.max_task_count "80", // The maximum number of tasks to use for reading the Hologres table.
        table "customer_holo_table"
      );
      
      -- Column pruning and predicate pushdown are supported.
      SELECT c_custkey,c_name,c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;

      CREATE TEMPORARY VIEW(query)

      CREATE TEMPORARY VIEW hologresTable
      USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        read.query "select c_custkey,c_name,c_phone from customer_holo_table where c_custkey < 500 limit 10"
      );
      
      SELECT * FROM hologresTable LIMIT 5;

Baca data Hologres ke dalam DataFrame

Saat Anda mengembangkan pekerjaan Spark menggunakan spark-shell, pyspark, atau alat lainnya, Anda dapat memanggil antarmuka Read Spark untuk membaca data ke dalam DataFrame guna perhitungan selanjutnya. Berikut adalah contoh membaca tabel Hologres ke dalam DataFrame dalam berbagai bahasa. Untuk informasi selengkapnya tentang parameter Hologres-Connector-Spark, lihat Parameter description.

Scala

val readDf = (
  spark.read
    .format("hologres")
    .option("username", "***")
    .option("password", "***")
    .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
    .option("table", "customer_holo_table")
    .option("read.max_task_count", "80") // The maximum number of tasks to use for reading the Hologres table.
    .load()
    .filter("c_custkey < 500")
)

readDf.select("c_custkey", "c_name", "c_phone").show(10)

Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSelect {
    public static void main(String[] args) {
        
        // Run in local mode.
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();
                
        Dataset<Row> readDf = (
           spark.read
                .format("hologres")
                .option("username", "***")
                .option("password", "***")
                .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
                .option("table", "customer_holo_table")
                .option("read.max_task_count", "80") // The maximum number of tasks to use for reading the Hologres table.
                .load()
                .filter("c_custkey < 500")
        );
        readDf.select("c_custkey", "c_name", "c_phone").show(10);
    }
}

Konfigurasi berikut diperlukan untuk file Maven.

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>

Python

readDf = spark.read.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").option(
"read.max_task_count", "80").load()

readDf.select("c_custkey", "c_name", "c_phone").show(10)

Untuk menjalankan pekerjaan Spark dalam bahasa yang berbeda, lakukan operasi berikut:

  • Scala

    • Anda dapat menggunakan contoh kode untuk menghasilkan file sparkselect.scala dan menjalankan pekerjaan sebagai berikut.

      -- Load dependencies.
      spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
      
      -- For local testing, use the absolute path to load the file.
      scala> :load D:/sparkselect.scala
    • Anda juga dapat menempelkan contoh kode langsung setelah memuat dependensi untuk menjalankannya.

  • Java

    Anda dapat mengimpor contoh kode ke alat pengembangan Anda dan menggunakan Maven untuk membuat paketnya. Misalnya, nama paketnya bisa berupa spark_select.jar. Jalankan pekerjaan dengan kode berikut.

    -- Use the absolute path for the job JAR package.
    spark-submit --class SparkSelect --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar  D:\spark_select.jar
  • Python

    Setelah menjalankan kode berikut, Anda dapat menempelkan contoh kode langsung untuk menjalankannya.

    pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

Deskripsi parameter

Parameter umum

Parameter

Nilai default

Wajib

Deskripsi

username

None

Ya

password

None

Ya

  • Rahasia AccessKey akun Anda. Untuk informasi selengkapnya, lihat Create an AccessKey.

  • Kata sandi untuk akun pengguna yang Anda buat.

table

None

Ya

Nama tabel Hologres yang akan dibaca atau ditulis.

Catatan

Saat membaca data, Anda juga dapat menggunakan parameter read.query sebagai gantinya.

jdbcurl

None

Ya

URL JDBC untuk API data real-time Hologres, dalam format jdbc:postgresql://<host>:<port>/<db_name>. Buka Hologres console, klik Instances di panel navigasi kiri, pilih instans target, lalu dapatkan host dan nomor port dari bagian Network Information pada halaman Instance Details.

enable_serverless_computing

false

Tidak

Menentukan apakah akan menggunakan sumber daya Serverless. Parameter ini hanya berlaku untuk operasi baca dan operasi tulis dalam mode bulk_load. Untuk informasi selengkapnya, lihat Serverless Computing Guide.

serverless_computing_query_priority

3

Tidak

Prioritas eksekusi untuk Serverless Computing.

statement_timeout_seconds

28800 (8 jam)

Tidak

Periode timeout untuk eksekusi kueri, dalam detik.

retry_count

3

Tidak

Jumlah percobaan ulang saat koneksi gagal.

direct_connect

Untuk lingkungan yang mendukung koneksi langsung, koneksi langsung digunakan secara default.

Tidak

Titik hambat untuk pembacaan dan penulisan data batch sering kali merupakan throughput jaringan Endpoint. Oleh karena itu, sistem menguji apakah lingkungan saat ini dapat terhubung langsung ke Frontend Hologres (node akses). Jika koneksi langsung didukung, koneksi tersebut digunakan secara default. Atur parameter ini ke false untuk menonaktifkan koneksi langsung.

Parameter tulis

Konektor Hologres mendukung parameter Spark SaveMode. Untuk SQL, ini berarti INSERT INTO atau INSERT OVERWRITE. Untuk DataFrame, ini berarti mengatur SaveMode ke Append atau Overwrite saat menulis. Overwrite membuat tabel sementara untuk penulisan dan menggantikan tabel asli setelah penulisan berhasil. Gunakan fitur ini dengan hati-hati.

Nama parameter

Nama parameter sebelumnya

Nilai default

Wajib

Deskripsi

write.mode

copy_write_mode

auto

Tidak

Mode penulisan. Nilai yang valid adalah sebagai berikut. Untuk perbandingan mode penulisan, lihat Comparison of batch write modes.

  • auto (default). Konektor secara otomatis memilih mode optimal berdasarkan versi dan metadata tabel tujuan. Logika pemilihan adalah sebagai berikut:

    1. Jika versi instans Hologres lebih baru dari V2.2.25 dan tabel memiliki primary key, mode bulk_load_on_conflict dipilih.

    2. Jika versi instans Hologres lebih baru dari V2.1.0 dan tabel tidak memiliki primary key, mode bulk_load dipilih.

    3. Jika versi instans Hologres lebih baru dari V1.3, mode stream dipilih.

    4. Dalam kasus lain, mode insert dipilih.

  • stream, yang menggunakan Fixed Plan to accelerate SQL execution. Di Fixed Plan, COPY adalah fitur yang diperkenalkan di Hologres V1.3. Dibandingkan metode INSERT, metode COPY memberikan throughput lebih tinggi karena menggunakan mode streaming, latensi data lebih rendah, dan konsumsi memori klien lebih rendah karena tidak mengakumulasi batch.

    Catatan

    Memerlukan Hologres Connector 1.3.0 atau lebih baru dan Hologres V1.3.34 atau lebih baru.

  • bulk_load, yaitu COPY batch. Dibandingkan dengan COPY streaming di Fixed Plan, COPY batch menghasilkan beban lebih rendah pada instans Hologres dalam kondisi records per second (RPS) yang sama. Namun, mode ini hanya mendukung penulisan ke tabel tanpa primary key.

    Catatan

    Memerlukan Hologres Connector 1.4.2 atau lebih baru dan Hologres V2.1.0 atau lebih baru.

  • bulk_load_on_conflict. Saat menulis ke tabel dengan primary key menggunakan COPY batch, mode ini menangani konflik primary key. Secara default, impor data batch ke tabel Hologres dengan primary key memicu penguncian tabel, yang membatasi kemampuan beberapa koneksi untuk menulis secara konkuren. Konektor mendukung redistribusi data berdasarkan kunci distribusi tabel Hologres tujuan. Hal ini memungkinkan setiap tugas Spark menulis data hanya ke satu shard, sehingga mengurangi tingkat penguncian dari tabel ke shard. Ini memungkinkan penulisan konkuren dan meningkatkan kinerja penulisan. Karena setiap koneksi hanya perlu mempertahankan data untuk beberapa shard, optimasi ini juga dapat secara signifikan mengurangi jumlah file kecil dan menurunkan penggunaan memori Hologres. Pengujian menunjukkan bahwa repartisi data sebelum penulisan konkuren dapat mengurangi beban sistem sekitar 67% dibandingkan dengan penulisan dalam mode stream.

    Catatan

    Memerlukan Hologres Connector 1.4.2 atau lebih baru dan Hologres V2.2.25 atau lebih baru.

  • insert. Menulis data menggunakan metode INSERT.

write.copy.max_buffer_size

max_cell_buffer_size

52428800 (50 MB)

Tidak

Saat menulis dalam mode COPY, biasanya Anda tidak perlu menyesuaikan panjang maksimum buffer lokal. Namun, jika terjadi luapan buffer saat menulis field besar, seperti string yang sangat panjang, Anda dapat meningkatkan nilai ini.

write.copy.dirty_data_check

copy_write_dirty_data_check

false

Tidak

Menentukan apakah akan melakukan validasi data kotor. Jika Anda mengaktifkan fitur ini, sistem dapat mengidentifikasi baris spesifik yang gagal ditulis saat data kotor ditemukan. Namun, hal ini memengaruhi kinerja penulisan. Jangan aktifkan fitur ini kecuali Anda sedang melakukan troubleshooting.

write.on_conflict_action

write_mode

INSERT_OR_REPLACE

Tidak

Kebijakan yang digunakan saat memasukkan data ke tabel dengan primary key:

  • INSERT_OR_IGNORE: Jika terjadi konflik primary key, data tidak ditulis.

  • INSERT_OR_UPDATE: Jika terjadi konflik primary key, kolom yang sesuai diperbarui.

  • INSERT_OR_REPLACE: Jika terjadi konflik primary key, semua kolom diperbarui.

Parameter berikut hanya berlaku ketika write.mode diatur ke insert.

Nama parameter

Nama parameter sebelumnya

Nilai default

Wajib

Deskripsi

write.insert.dynamic_partition

dynamic_partition

false

Tidak

Parameter ini hanya berlaku ketika copy_write_mode diatur ke insert. Jika diatur ke true, partisi yang tidak ada akan dibuat secara otomatis saat Anda menulis data ke tabel induk dari tabel partisi.

write.insert.batch_size

write_batch_size

512

Tidak

Ukuran batch maksimum untuk setiap thread penulisan. Batch dikomit saat jumlah operasi Put, setelah digabung oleh WriteMode, mencapai nilai ini.

write.insert.batch_byte_size

write_batch_byte_size

2097152 (2 * 1024 * 1024)

Tidak

Ukuran batch maksimum untuk setiap thread penulisan, dalam byte. Nilai default adalah 2 MB. Batch dikomit saat ukuran byte data Put, setelah digabung oleh WriteMode, mencapai nilai ini.

write.insert.max_interval_ms

write_max_interval_ms

10000

Tidak

Batch dikomit jika waktu sejak komit terakhir melebihi nilai ini.

write.insert.thread_size

write_thread_size

1

Tidak

Jumlah thread penulisan konkuren. Setiap thread konkuren menempati satu koneksi database.

Parameter baca

Nama parameter

Nama parameter sebelumnya

(Versi 1.5.0 dan sebelumnya)

Nilai default

Wajib

Deskripsi

read.mode

bulk_read

auto

Tidak

Mode baca. Nilai yang valid:

  • auto (default). Konektor Hologres secara otomatis memilih mode optimal berdasarkan versi dan metadata tabel tujuan. Logika pemilihan adalah sebagai berikut:

    1. Jika field yang akan dibaca mencakup tipe JSONB, mode `select` dipilih.

    2. Jika versi instans lebih baru dari 3.0.24, mode `bulk_read_compressed` dipilih.

    3. Dalam kasus lain, mode `bulk_read` dipilih.

  • bulk_read. Membaca data dalam format arrow menggunakan COPY OUT. Kinerjanya beberapa kali lebih tinggi dibandingkan mode select. Pembacaan tipe JSONB di Hologres tidak didukung.

  • bulk_read_compressed. Membaca data format arrow terkompresi menggunakan COPY OUT. Hal ini dapat menghemat sekitar 45% bandwidth dibandingkan data tidak terkompresi.

  • select. Membaca data menggunakan metode SELECT standar.

read.max_task_count

max_partition_count

80

Tidak

Membagi tabel Hologres yang akan dibaca menjadi beberapa partisi. Setiap partisi berkorespondensi dengan satu tugas Spark. Jika `ShardCount` tabel Hologres kurang dari parameter ini, jumlah partisi paling banyak sebesar `ShardCount`.

read.copy.max_buffer_size

/

52428800 (50 MB)

Tidak

Saat membaca dalam mode COPY, ini adalah panjang maksimum buffer lokal. Jika terjadi pengecualian dengan field besar, tingkatkan nilai ini.

read.push_down_predicate

push_down_predicate

true

Tidak

Menentukan apakah akan mengaktifkan predicate pushdown, seperti menerapkan kondisi filter selama kueri. Fitur ini mendukung pushdown kondisi filter umum dan column pruning.

read.push_down_limit

push_down_limit

true

Tidak

Menentukan apakah akan mengaktifkan LIMIT pushdown.

read.select.batch_size

scan_batch_size

256

Tidak

Parameter ini hanya berlaku ketika read_mode diatur ke select. Jumlah baris yang diambil sekaligus untuk operasi pemindaian saat membaca dari Hologres.

read.select.timeout_seconds

scan_timeout_seconds

60

Tidak

Parameter ini hanya berlaku ketika read_mode diatur ke select. Periode timeout untuk operasi pemindaian saat membaca dari Hologres.

read.query

query

None

Tidak

Menggunakan query yang ditentukan untuk membaca data dari Hologres. Anda dapat mengatur parameter ini atau parameter table, tetapi tidak keduanya sekaligus.

Catatan
  • Saat membaca data menggunakan metode query, hanya satu tugas yang dapat digunakan untuk pembacaan. Predicate pushdown tidak didukung.

  • Saat membaca data menggunakan metode table, operasi baca dibagi menjadi beberapa tugas untuk pembacaan konkuren berdasarkan `ShardCount` tabel Hologres.

Pemetaan tipe data

Spark Type

Tipe Hologres

ShortType

SMALLINT

IntegerType

INT

LongType

BIGINT

StringType

TEXT

StringType

JSON

StringType

JSONB

DecimalType

NUMERIC(38, 18)

BooleanType

BOOL

DoubleType

DOUBLE PRECISION

FloatType

FLOAT

TimestampType

TIMESTAMPTZ

DateType

DATE

BinaryType

BYTEA

BinaryType

ROARINGBITMAP

ArrayType(IntegerType)

INT4[]

ArrayType(LongType)

INT8[]

ArrayType(FloatType)

FLOAT4[]

ArrayType(DoubleType)

FLOAT8[]

ArrayType(BooleanType)

BOOLEAN[]

ArrayType(StringType)

TEXT[]

Perhitungan koneksi

Saat Hologres-Connector-Spark membaca atau menulis data, konektor tersebut menggunakan sejumlah koneksi JDBC tertentu. Jumlah ini dapat dipengaruhi oleh faktor-faktor berikut:

  • Konkurensi Spark. Ini adalah jumlah tugas yang berjalan secara konkuren, yang dapat dilihat di Spark UI saat pekerjaan sedang berjalan.

  • Jumlah koneksi yang digunakan konektor untuk setiap tugas konkuren:

    • Untuk penulisan dalam mode COPY, setiap tugas konkuren hanya menggunakan satu koneksi JDBC.

    • Untuk penulisan dalam mode INSERT, setiap tugas konkuren menggunakan write_thread_size koneksi JDBC.

    • Untuk pembacaan, setiap tugas konkuren menggunakan satu koneksi JDBC.

  • Koneksi yang digunakan oleh aspek lain: Saat suatu pekerjaan dimulai, pekerjaan tersebut melakukan operasi seperti pengambilan skema, yang mungkin secara singkat membuat koneksi.

Oleh karena itu, jumlah total koneksi yang digunakan oleh pekerjaan dapat dihitung menggunakan rumus berikut:

Item pekerjaan

Koneksi yang digunakan

Query metadata dari Catalog

1

Baca data

parallelism × 1 + 1

Tulis dalam mode COPY

parallelism × 1 + 1

Tulis dalam mode INSERT

parallelism × write_thread_size + 1

Perhitungan koneksi di atas mengasumsikan bahwa jumlah tugas yang dapat dijalankan Spark secara konkuren lebih besar daripada jumlah tugas yang dihasilkan oleh pekerjaan.

Jumlah tugas konkuren yang dapat dijalankan Spark mungkin dipengaruhi oleh parameter yang diatur pengguna, seperti spark.executor.instances, dan juga mungkin dipengaruhi oleh kebijakan pemisahan file Hadoop. Untuk informasi selengkapnya, lihat Apache Hadoop.