All Products
Search
Document Center

Hologres:Baca dan tulis Hologres dengan Spark

Last Updated:Apr 22, 2026

Spark adalah mesin analitik terpadu untuk pemrosesan data skala besar. Hologres terintegrasi secara efisien dengan Spark komunitas dan EMR Serverless Spark guna membangun gudang data dengan cepat. Konektor Spark Hologres mendukung pembuatan katalog Hologres dalam kluster Spark, memungkinkan pembacaan batch berkinerja tinggi dan impor melalui tabel eksternal yang mengungguli JDBC asli.

Limitations

Konektor Spark memerlukan Hologres versi 1.3 atau lebih baru. Anda dapat memeriksa versi Instans Anda di halaman Instance Details di konsol Hologres. Jika Instans Anda menggunakan versi sebelum 1.3, tingkatkan instans Anda atau bergabung dengan kelompok DingTalk Hologres (ID: 32314975) untuk meminta peningkatan.

Prerequisites

  • Gunakan lingkungan Spark yang mendukung perintah spark-sql, spark-shell, atau pyspark. Disarankan menggunakan Spark 3.3.0 atau lebih baru untuk menghindari masalah dependensi dan memperoleh akses ke fitur tambahan.

    • Anda dapat menggunakan Alibaba Cloud EMR Spark untuk menyiapkan lingkungan Spark secara cepat dan menghubungkannya ke instans Hologres. Untuk informasi selengkapnya, lihat EMR Spark features.

    • Atau, Anda dapat menyiapkan lingkungan Spark mandiri. Untuk informasi selengkapnya, lihat Apache Spark.

  • Untuk membaca dari dan menulis ke Hologres dengan Spark, Anda memerlukan konektor hologres-connector-spark-3.x. Topik ini menggunakan versi 1.5.2 sebagai contoh, yang dapat diunduh dari Maven Central Repository. Konektor ini bersifat open source. Untuk informasi selengkapnya, lihat Hologres-Connectors.

  • Untuk mengembangkan dan melakukan debugging lokal pekerjaan Spark dalam Java menggunakan IDE seperti IntelliJ IDEA, tambahkan dependensi Maven berikut ke file pom.xml Anda.

    <dependency>
        <groupId>com.alibaba.hologres</groupId>
        <artifactId>hologres-connector-spark-3.x</artifactId>
        <version>1.5.2</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>

Hologres catalog

Konektor Hologres versi 1.5.2 dan yang lebih baru mendukung katalog Hologres, sehingga Anda dapat menggunakan tabel eksternal untuk membaca dari dan menulis ke Hologres.

Setiap katalog Hologres di Spark dipetakan ke satu database di Hologres. Setiap namespace di katalog Hologres dipetakan ke skema di database yang sesuai. Bagian berikut menjelaskan cara menggunakan katalog Hologres di Spark.

Catatan

Katalog Hologres tidak mendukung pembuatan tabel.

Topik ini menggunakan database dan tabel berikut di instans Hologres:

test_db -- Database
  public.test_table1 -- Tabel di skema public
  public.test_table2
  test_schema.test_table3  -- Tabel di skema test_schema 

Initialize a Hologres catalog

Jalankan spark-sql di kluster Spark, muat konektor Hologres, lalu tentukan parameter katalog.

spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db

Hologres catalog commands

  • Load a Hologres Catalog

    Katalog Hologres di Spark dipetakan ke database Hologres. Pemetaan ini tetap selama sesi berlangsung.

    USE hologres_external_test_db;
  • Query all namespaces

    Namespace di Spark dipetakan ke skema di Hologres. Skema default adalah public. Gunakan perintah USE untuk mengubah skema default.

    -- Lihat semua namespace di Katalog Hologres, yang sesuai dengan skema di database Hologres.
    SHOW NAMESPACES;
  • Query tables in a namespace

    • Query all tables

      SHOW TABLES;
    • Query tables in a specific namespace

      USE test_schema;
      SHOW TABLES;
      
      -- Atau, gunakan pernyataan berikut. 
      SHOW TABLES IN test_schema;
  • Read from and write to a table

    Gunakan pernyataan SELECT dan INSERT untuk membaca dari dan menulis ke tabel eksternal.

    -- Baca dari tabel.
    SELECT * FROM public.test_table1;
    
    -- Tulis ke tabel.
    INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;

Import data to Hologres

Data uji pada bagian ini berasal dari tabel customer dalam dataset TPC-H. Spark dapat membaca data dari file CSV dan menuliskannya ke tabel Hologres. Anda dapat mengunduh data sampel customer. Pernyataan SQL berikut membuat customer_holo_table.

CREATE TABLE customer_holo_table
(
  c_custkey    BIGINT ,
  c_name       TEXT   ,
  c_address    TEXT   ,
  c_nationkey  INT    ,
  c_phone      TEXT   ,
  c_acctbal    DECIMAL(15,2) ,
  c_mktsegment TEXT   ,
  c_comment    TEXT
);

Import using Spark-SQL

Di Spark-SQL, penggunaan katalog untuk memuat metadata tabel Hologres lebih praktis. Anda juga dapat mendeklarasikan tabel Hologres dengan membuat tabel sementara.

Catatan
  • Versi konektor Spark Hologres sebelum 1.5.2 tidak mendukung katalog. Anda hanya dapat mendeklarasikan tabel Hologres dengan membuat tabel sementara.

  • Untuk informasi selengkapnya tentang parameter konektor Spark Hologres, lihat parameters.

  1. Initialize a Hologres catalog.

    spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
    --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
    --conf spark.sql.catalog.hologres_external_test_db.username=*** \
    --conf spark.sql.catalog.hologres_external_test_db.password=*** \
    --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
  2. Import data from a CSV source to a Hologres table.

    Catatan

    Sintaks INSERT INTO di Spark tidak mendukung penggunaan column_list untuk menentukan subset kolom. Misalnya, Anda tidak dapat menggunakan INSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTable untuk menulis data hanya ke bidang c_custkey.

    Jika Anda ingin menulis data ke bidang tertentu, gunakan pernyataan CREATE TEMPORARY VIEW untuk mendeklarasikan tampilan sementara Hologres yang hanya berisi bidang yang diperlukan.

    Using a catalog

    -- Muat katalog Hologres.
    USE hologres_external_test_db;
    
    -- Buat sumber data CSV.
    CREATE TEMPORARY VIEW csvTable (
        c_custkey BIGINT,
        c_name STRING,
        c_address STRING,
        c_nationkey INT,
        c_phone STRING,
        c_acctbal DECIMAL(15, 2),
        c_mktsegment STRING,
        c_comment STRING)
    USING csv OPTIONS (
        path "resources/customer", sep "," -- Untuk pengujian lokal, gunakan jalur mutlak ke file.
    );
    
    -- Tulis data dari tabel CSV ke Hologres.
    INSERT INTO public.customer_holo_table SELECT * FROM csvTable;

    Using a temporary view

    -- Buat sumber data CSV.
    CREATE TEMPORARY VIEW csvTable (
        c_custkey BIGINT,
        c_name STRING,
        c_address STRING,
        c_nationkey INT,
        c_phone STRING,
        c_acctbal DECIMAL(15, 2),
        c_mktsegment STRING,
        c_comment STRING)
    USING csv OPTIONS (
        path "resources/customer", sep ","
    );
    
    -- Buat tampilan sementara Hologres.
    CREATE TEMPORARY VIEW hologresTable (
        c_custkey BIGINT,
        c_name STRING,
        c_phone STRING)
    USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        table "customer_holo_table"
    );
    
    INSERT INTO hologresTable SELECT c_custkey,c_name,c_phone FROM csvTable;

Import using a DataFrame

Anda dapat menggunakan tool seperti spark-shell atau pyspark untuk mengembangkan pekerjaan Spark dan memanggil API write guna menulis data. Pekerjaan tersebut membaca data dari file CSV, mengonversinya menjadi DataFrame, lalu menulis DataFrame tersebut ke instans Hologres. Bagian berikut menyediakan contoh kode untuk berbagai bahasa pemrograman. Untuk informasi selengkapnya tentang parameter konektor Spark Hologres, lihat parameters.

Scala

import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode

// Skema sumber CSV.
val schema = StructType(Array(
  StructField("c_custkey", LongType),
  StructField("c_name", StringType),
  StructField("c_address", StringType),
  StructField("c_nationkey", IntegerType),
  StructField("c_phone", StringType),
  StructField("c_acctbal", DecimalType(15, 2)),
  StructField("c_mktsegment", StringType),
  StructField("c_comment", StringType)
))

// Baca data dari file CSV ke dalam DataFrame.
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")

// Tulis DataFrame ke Hologres.
csvDf.write
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.mode(SaveMode.Append)
.save()

Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;

public class SparkTest {
    public static void main(String[] args) {
        // Skema sumber CSV.
        List<StructField> asList =
                Arrays.asList(
                        DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
                        DataTypes.createStructField("c_name", DataTypes.StringType, true),
                        DataTypes.createStructField("c_address", DataTypes.StringType, true),
                        DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
                        DataTypes.createStructField("c_phone", DataTypes.StringType, true),
                        DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
                        DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
                        DataTypes.createStructField("c_comment", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(asList);

        // Jalankan dalam mode lokal.
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();

        // Baca data dari file CSV ke dalam DataFrame.
        // Untuk pengujian lokal, gunakan jalur mutlak ke data customer.
        Dataset<Row> csvDf = spark.read().format("csv").schema(schema).option("sep", ",").load("resources/customer");

        // Tulis DataFrame ke Hologres.
        csvDf.write.format("hologres").option(
           "username", "***").option(
           "password", "***").option(
           "jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
           "table", "customer_holo_table").mode(
           "append").save();
    }
}

Tambahkan dependensi berikut ke file pom.xml Anda.

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>

Python

from pyspark.sql.types import *

# Skema sumber CSV.
schema = StructType([
    StructField("c_custkey", LongType()),
    StructField("c_name", StringType()),
    StructField("c_address", StringType()),
    StructField("c_nationkey", IntegerType()),
    StructField("c_phone", StringType()),
    StructField("c_acctbal", DecimalType(15, 2)),
    StructField("c_mktsegment", StringType()),
    StructField("c_comment", StringType())
])

# Baca data dari file CSV ke dalam DataFrame.
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=',')

# Tulis DataFrame ke Hologres.
csvDf.write.format("hologres").option(
    "username", "***").option(
    "password", "***").option(
    "jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
    "table", "customer_holo_table").mode(
    "append").save()

Untuk menjalankan pekerjaan Spark dalam berbagai bahasa, ikuti langkah-langkah berikut:

  • Scala

    • Gunakan kode contoh untuk membuat file sparktest.scala dan jalankan perintah berikut untuk mengeksekusi pekerjaan.

      -- Muat dependensi. 
      spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
      
      -- Untuk pengujian lokal, gunakan jalur mutlak untuk memuat file.
      scala> :load D:/sparktest.scala
    • Atau, Anda dapat menempelkan kode contoh langsung ke shell setelah dependensi dimuat.

  • Java

    Gunakan tool pengembangan untuk mengimpor kode contoh dan mengemasnya dengan Maven. Misalnya, jika file JAR keluaran adalah spark_test.jar, jalankan perintah berikut untuk mengeksekusi pekerjaan.

    -- Gunakan jalur mutlak ke paket JAR pekerjaan.
    spark-submit --class SparkTest  --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar  D:\spark_test.jar
  • Python

    Setelah menjalankan perintah berikut, Anda dapat menempelkan kode contoh langsung ke shell untuk mengeksekusinya.

    pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

Read data from Hologres

  • Mulai dari versi 1.3.2, konektor Spark mendukung pembacaan data dari Hologres. Dibandingkan dengan jdbc-connector Spark default, spark-connector memberikan kinerja lebih baik dengan membaca data secara paralel berdasarkan shard tabel Hologres. Paralelisme baca bergantung pada jumlah shard di tabel. spark-connector dapat membatasi paralelisme menggunakan parameter read.max_task_count. Pekerjaan akhirnya menghasilkan Min(shardCount, max_task_count) tugas baca. Fitur ini juga mendukung inferensi skema. Jika Anda tidak menyediakan skema, konektor akan menginferensi skema Spark dari skema tabel Hologres.

  • Mulai dari versi konektor Spark 1.5.0, pembacaan data dari tabel Hologres mendukung predicate pushdown, LIMIT pushdown, dan column pruning. Anda juga dapat menggunakan SELECT QUERY Hologres untuk membaca data. Versi ini memperkenalkan mode baca batch, yang meningkatkan kinerja baca hingga 3–4 kali lipat dibandingkan versi sebelumnya.

Read data using Spark SQL

Saat menggunakan Spark SQL, Anda dapat memuat metadata tabel Hologres menggunakan katalog. Atau, Anda dapat mendeklarasikan tabel Hologres dengan membuat tabel sementara.

Catatan
  • Versi konektor Spark Hologres sebelum 1.5.2 tidak mendukung katalog. Anda hanya dapat mendeklarasikan tabel Hologres dengan membuat tabel sementara.

  • Untuk informasi selengkapnya tentang parameter konektor Spark Hologres, lihat Parameters.

  1. Initialize a Hologres catalog.

    spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
    --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
    --conf spark.sql.catalog.hologres_external_test_db.username=*** \
    --conf spark.sql.catalog.hologres_external_test_db.password=*** \
    --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
  2. Read data from Hologres.

    • Read data using a catalog.

      -- Muat katalog Hologres.
      USE hologres_external_test_db;
      
      -- Baca data dari tabel Hologres. Field pruning dan predicate pushdown didukung.
      SELECT c_custkey,c_name,c_phone FROM public.customer_holo_table WHERE c_custkey < 500 LIMIT 10;
    • Read data by creating a temporary table.

      Table

      CREATE TEMPORARY VIEW hologresTable
      USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        read.max_task_count "80", -- Jumlah maksimum tugas untuk membaca dari tabel Hologres.
        table "customer_holo_table"
      );
      
      -- Field pruning dan predicate pushdown didukung.
      SELECT c_custkey,c_name,c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;

      Query

      CREATE TEMPORARY VIEW hologresTable
      USING hologres OPTIONS (
        jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db",
        username "***", 
        password "***", 
        read.query "SELECT c_custkey,c_name,c_phone FROM customer_holo_table WHERE c_custkey < 500 LIMIT 10"
      );
      
      SELECT * FROM hologresTable LIMIT 5;

Read Hologres data into a DataFrame

Saat mengembangkan pekerjaan Spark menggunakan tool seperti spark-shell atau pyspark, Anda dapat memanggil API read Spark untuk memuat data ke dalam DataFrame. Contoh berikut menunjukkan cara membaca data dari tabel Hologres ke dalam DataFrame dalam berbagai bahasa pemrograman. Untuk informasi selengkapnya tentang parameter konektor Spark Hologres, lihat Parameters.

Scala

val readDf = (
  spark.read
    .format("hologres")
    .option("username", "***")
    .option("password", "***")
    .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
    .option("table", "customer_holo_table")
    .option("read.max_task_count", "80") // Jumlah maksimum tugas untuk membaca dari tabel Hologres.
    .load()
    .filter("c_custkey < 500")
)

readDf.select("c_custkey", "c_name", "c_phone").show(10)

Java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkSelect {
    public static void main(String[] args) {
        
        // Jalankan dalam mode lokal.
        SparkSession spark = SparkSession.builder()
                .appName("Spark CSV Example")
                .master("local[*]") 
                .getOrCreate();
                
        Dataset<Row> readDf = (
           spark.read
                .format("hologres")
                .option("username", "***")
                .option("password", "***")
                .option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
                .option("table", "customer_holo_table")
                .option("read.max_task_count", "80") // Jumlah maksimum tugas untuk membaca dari tabel Hologres.
                .load()
                .filter("c_custkey < 500")
        );
        readDf.select("c_custkey", "c_name", "c_phone").show(10);
    }
}

Tambahkan dependensi berikut ke file Maven pom.xml Anda.

<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.13</artifactId>
      <version>3.5.4</version>
      <scope>provided</scope>
</dependency>

Python

readDf = spark.read.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").option(
"read.max_task_count", "80").load()

readDf.select("c_custkey", "c_name", "c_phone").show(10)

Untuk menjalankan pekerjaan Spark dalam berbagai bahasa pemrograman:

  • Scala

    • Anda dapat menggunakan kode contoh untuk membuat file sparkselect.scala dan menjalankan pekerjaan dengan perintah berikut.

      -- Muat dependensi.
      spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
      
      -- Untuk pengujian lokal, gunakan jalur mutlak untuk memuat file.
      scala> :load D:/sparkselect.scala
    • Atau, setelah dependensi dimuat, Anda dapat menempelkan kode contoh langsung ke shell untuk menjalankannya.

  • Java

    Anda dapat menggunakan tool pengembangan untuk mengimpor kode contoh dan mengemasnya menggunakan Maven. Misalnya, jika JAR yang dikemas bernama spark_select.jar, jalankan pekerjaan dengan perintah berikut.

    -- Gunakan jalur mutlak untuk JAR pekerjaan.
    spark-submit --class SparkSelect --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar  D:\spark_select.jar
  • Python

    Setelah menjalankan perintah berikut, Anda dapat menempelkan kode contoh langsung ke shell untuk menjalankannya.

    pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar

Parameters

General parameters

Parameter

Default

Required

Description

username

None

Yes

password

None

Yes

  • Secret AccessKey yang sesuai dengan ID AccessKey Anda. Untuk informasi selengkapnya, lihat Create an AccessKey.

  • Atau, kata sandi untuk akun tersebut.

table

None

Yes

Nama tabel Hologres yang akan dibaca atau ditulis.

Catatan

Saat membaca data, Anda dapat menggunakan parameter read.query sebagai gantinya.

jdbcurl

None

Yes

URL JDBC API data real-time Hologres, dalam format jdbc:postgresql://<host>:<port>/<db_name>. Untuk menemukan detail ini, buka Hologres console, klik Instances di panel navigasi kiri, pilih instans target Anda, dan temukan host serta nomor port di bagian Network Information pada halaman Instance Details.

enable_serverless_computing

false

No

Menentukan apakah akan menggunakan resource komputasi tanpa server. Parameter ini hanya berlaku untuk operasi baca dan operasi tulis dalam mode bulk_load. Untuk informasi selengkapnya, lihat Serverless Computing user guide.

serverless_computing_query_priority

3

No

Prioritas eksekusi untuk Komputasi Tanpa Server.

statement_timeout_seconds

28800 (8 jam)

No

Durasi timeout eksekusi kueri, dalam detik.

retry_count

3

No

Jumlah percobaan ulang jika koneksi gagal.

direct_connect

Koneksi langsung digunakan secara default jika didukung.

No

Menentukan apakah akan terhubung langsung ke FrontEnd Hologres (node akses). Throughput jaringan titik akhir sering menjadi bottleneck untuk operasi data batch. Secara default, konektor menggunakan koneksi langsung jika tersedia untuk meningkatkan throughput. Tetapkan nilai ini ke false untuk menonaktifkan perilaku ini.

Write parameters

Konektor Hologres mendukung parameter Spark SaveMode. Untuk SQL, ini sesuai dengan INSERT INTO atau INSERT OVERWRITE. Untuk DataFrame, Anda dapat mengatur SaveMode ke Append atau Overwrite saat menulis data. Mode Overwrite membuat tabel sementara untuk operasi tulis dan mengganti tabel asli setelah berhasil. Gunakan mode ini hanya jika diperlukan.

Parameter

Previous name

Default

Required

Description

write.mode

copy_write_mode

auto

No

Mode tulis. Untuk perbandingan mode tulis, lihat Batch write modes. Nilai yang valid:

  • auto (Default): Konektor secara otomatis memilih mode optimal berdasarkan versi Hologres dan metadata tabel tujuan. Logika pemilihan adalah sebagai berikut:

    1. Jika instans Hologres v2.2.25 atau lebih baru dan tabel memiliki primary key, mode bulk_load_on_conflict digunakan.

    2. Jika instans Hologres v2.1.0 atau lebih baru dan tabel tidak memiliki primary key, mode bulk_load digunakan.

    3. Jika instans Hologres v1.3 atau lebih baru, mode stream digunakan.

    4. Dalam kasus lain, mode insert digunakan.

  • stream: Menggunakan Fixed Plan untuk mempercepat eksekusi SQL. Dalam Fixed Plan, COPY adalah fitur yang diperkenalkan di Hologres v1.3. Dibandingkan metode INSERT, COPY memberikan throughput lebih tinggi (karena sifat streaming-nya), latensi data lebih rendah, dan konsumsi memori klien berkurang (karena tidak membatch data).

    Catatan

    Memerlukan konektor Hologres v1.3.0 atau lebih baru dan Hologres v1.3.34 atau lebih baru.

  • bulk_load: Menggunakan COPY batch. Dibandingkan COPY streaming dalam Fixed Plan, COPY batch memberikan beban lebih rendah pada instans Hologres dalam kondisi RPS tinggi tetapi hanya mendukung penulisan ke tabel tanpa primary key.

    Catatan

    Memerlukan konektor Hologres v1.4.2 atau lebih baru dan Hologres v2.1.0 atau lebih baru.

  • bulk_load_on_conflict: Menggunakan COPY batch untuk menulis ke tabel dengan primary key dan dapat menangani primary key duplikat. Secara default, impor data batch ke tabel Hologres dengan primary key memicu penguncian tabel, membatasi operasi tulis konkuren dari beberapa koneksi. Konektor dapat mendistribusikan ulang data berdasarkan kunci distribusi tabel tujuan, memungkinkan setiap tugas Spark menulis ke satu shard saja. Hal ini mengurangi penguncian tabel menjadi penguncian tingkat shard, memungkinkan penulisan konkuren dan meningkatkan kinerja tulis. Karena setiap koneksi hanya menyimpan data untuk beberapa shard, optimasi ini juga secara signifikan mengurangi jumlah file kecil dan menurunkan penggunaan memori Hologres. Pengujian menunjukkan bahwa repartisi data sebelum penulisan konkuren dapat mengurangi beban sistem sekitar 67% dibandingkan mode stream.

    Catatan

    Memerlukan konektor Hologres v1.4.2 atau lebih baru dan Hologres v2.2.25 atau lebih baru.

  • insert: Menulis data menggunakan metode INSERT.

  • stage: Menggunakan tabel Stage untuk penulisan batch. Mode ini mengimpor data hampir real-time melalui stage penyimpanan sementara lalu memuatnya ke tabel tujuan menggunakan pernyataan INSERT FROM SELECT. Dalam skenario batch, mode ini menawarkan kinerja lebih baik dan beban sistem lebih rendah dibandingkan mode lain. Mode ini juga mendukung penggunaan resource komputasi tanpa server. Operasi INSERT INTO dan INSERT OVERWRITE didukung. Memerlukan Hologres v4.1.0 atau lebih baru dan konektor v1.6.1 atau lebih baru.

write.copy.max_buffer_size

max_cell_buffer_size

52428800 (50 MB)

No

Ukuran maksimum buffer lokal saat menulis dalam mode COPY. Anda biasanya tidak perlu menyesuaikan nilai ini. Namun, Anda dapat menaikkan nilai ini jika terjadi luapan buffer saat menulis field besar, seperti string yang sangat panjang.

write.copy.dirty_data_check

copy_write_dirty_data_check

false

No

Menentukan apakah akan memeriksa data kotor. Jika diaktifkan, fitur ini dapat mengidentifikasi baris yang gagal ditulis secara tepat. Namun, hal ini memengaruhi kinerja tulis. Nonaktifkan fitur ini kecuali untuk troubleshooting.

write.on_conflict_action

INSERT_OR_REPLACE

INSERT_OR_REPLACE

No

Aksi yang diambil ketika operasi tulis menghadapi konflik primary key di tabel tujuan.

  • INSERT_OR_IGNORE: Jika terjadi konflik primary key, data tidak ditulis.

  • INSERT_OR_UPDATE: Jika terjadi konflik primary key, kolom yang ditentukan diperbarui.

  • INSERT_OR_REPLACE: Jika terjadi konflik primary key, semua kolom diperbarui.

Parameter berikut hanya berlaku ketika write.mode diatur ke insert.

Parameter

Previous name

Default

Required

Description

write.insert.dynamic_partition

dynamic_partition

false

No

Ketika write.mode adalah insert, mengatur nilai ini ke true secara otomatis membuat partisi yang belum ada saat menulis ke tabel partisi induk.

write.insert.batch_size

write_batch_size

512

No

Ukuran batch maksimum untuk setiap thread tulis. Commit batch dipicu ketika jumlah operasi Put mencapai nilai ini.

write.insert.batch_byte_size

write_batch_byte_size

2097152 (2 MB)

No

Ukuran batch maksimum dalam byte untuk setiap thread tulis. Nilai default adalah 2 MB. Commit batch dipicu ketika ukuran byte data Put mencapai nilai ini.

write.insert.max_interval_ms

write_max_interval_ms

10000

No

Commit batch dipicu jika waktu yang berlalu sejak commit terakhir melebihi nilai ini.

write.insert.thread_size

write_thread_size

1

No

Jumlah thread tulis konkuren. Setiap thread menggunakan satu koneksi database.

write.rps_limit

None

-1

No

Batas laju tulis per tugas, dalam baris per detik (RPS). Nilai default -1 menunjukkan tidak ada batas.

Read parameters

Parameter

Previous name

(v1.5.0 and earlier)

Default

Required

Description

read.mode

bulk_read

auto

No

Mode baca. Nilai yang valid:

  • auto (Default): Konektor secara otomatis memilih mode optimal berdasarkan versi Hologres dan metadata tabel. Logika pemilihan adalah sebagai berikut:

    1. Jika field yang akan dibaca mencakup tipe data JSONB, mode select digunakan.

    2. Jika instans v3.0.24 atau lebih baru, mode bulk_read_compressed digunakan.

    3. Dalam kasus lain, mode bulk_read digunakan.

  • bulk_read: Menggunakan metode COPY OUT untuk membaca data dalam format arrow, yang beberapa kali lebih cepat daripada mode select. Membaca tipe JSONB dari Hologres saat ini tidak didukung.

  • bulk_read_compressed: Membaca data terkompresi dalam format Arrow menggunakan metode COPY OUT, yang menghemat sekitar 45% bandwidth dibandingkan membaca data tidak terkompresi.

  • select: Menggunakan pernyataan SELECT standar untuk membaca data.

read.max_task_count

max_partition_count

80

No

Menentukan jumlah maksimum tugas konkuren untuk membaca data. Konektor membagi tabel menjadi beberapa partisi, dan setiap partisi diproses oleh satu tugas Spark. Jika jumlah shard tabel kurang dari nilai ini, jumlah partisi dibatasi pada jumlah shard.

read.copy.max_buffer_size

/

52428800 (50 MB)

No

Ukuran maksimum buffer lokal saat membaca dalam mode COPY. Jika terjadi pengecualian karena ukuran field besar, naikkan nilai ini.

read.push_down_predicate

push_down_predicate

true

No

Menentukan apakah akan mengaktifkan predicate pushdown. Saat diaktifkan, operasi seperti kondisi filter dan column pruning didorong ke sumber data.

read.push_down_limit

push_down_limit

true

No

Menentukan apakah akan mengaktifkan LIMIT pushdown.

read.select.batch_size

scan_batch_size

256

No

Berlaku saat read.mode diatur ke select. Menentukan jumlah baris yang diambil dalam satu operasi pemindaian saat membaca dari Hologres.

read.select.timeout_seconds

scan_timeout_seconds

60

No

Berlaku saat read.mode diatur ke select. Menentukan durasi timeout untuk operasi pemindaian saat membaca dari Hologres.

read.query

query

None

No

Gunakan query yang disediakan untuk membaca dari Hologres. Parameter ini tidak dapat digunakan bersama parameter table.

Catatan
  • Saat membaca data menggunakan metode query, Anda hanya dapat menggunakan satu tugas. Predicate pushdown tidak didukung.

  • Saat membaca data menggunakan metode table, operasi baca dibagi menjadi beberapa Tugas yang berjalan secara paralel berdasarkan ShardCount tabel Hologres.

read.split.strategy

None

auto

No

Menentukan strategi untuk membagi data tabel menjadi beberapa tugas untuk pembacaan paralel. Nilai yang valid:

  • auto (default): Konektor secara otomatis memilih strategi optimal.

  • shard: Membagi data berdasarkan shard di Hologres, dan setiap tugas membaca sebagian shard. Cocok untuk tabel dengan distribusi shard yang jelas.

  • range: Membagi data berdasarkan rentang nilai kolom tertentu. Pengaturan ini memerlukan parameter read.split.column, read.split.lower_bound, read.split.upper_bound, dan read.split.num.

  • partition membagi data berdasarkan nilai kolom partisi. Opsi ini harus digunakan dengan parameter read.split.column dan read.split.num. Berlaku untuk tabel partisi atau tampilan partisi.

Catatan

Didukung di konektor v1.6.1 dan yang lebih baru.

read.split.column

None

None

No

Nama kolom split. Parameter ini diperlukan saat read.split.strategy adalah range atau partition.

read.split.lower_bound

None

None

No

Batas bawah split diperlukan saat read.split.strategy diatur ke range. Menentukan nilai batas bawah untuk pembagian rentang.

read.split.upper_bound

None

None

No

Batas atas split menentukan batas atas untuk shard berbasis rentang. Parameter ini diperlukan saat read.split.strategy diatur ke range.

read.split.num

None

None

No

Jumlah split. Parameter ini diperlukan saat read.split.strategy adalah range atau partition. Menentukan jumlah tugas yang membagi data untuk dibaca.

Data type mapping

Spark type

Hologres type

ShortType

SMALLINT

IntegerType

INT

LongType

BIGINT

StringType

TEXT

StringType

JSON

StringType

JSONB

DecimalType

NUMERIC(38, 18)

BooleanType

BOOL

DoubleType

DOUBLE PRECISION

FloatType

FLOAT

TimestampType

TIMESTAMPTZ

DateType

DATE

BinaryType

BYTEA

BinaryType

ROARINGBITMAP

ArrayType(IntegerType)

INT4[]

ArrayType(LongType)

INT8[]

ArrayType(FloatType)

FLOAT4[]

ArrayType(DoubleType)

FLOAT8[]

ArrayType(BooleanType)

BOOLEAN[]

ArrayType(StringType)

TEXT[]

Connection count calculation

Hologres-Connector-Spark menggunakan sejumlah koneksi JDBC tertentu untuk operasi baca dan tulis. Jumlah koneksi tergantung pada faktor-faktor berikut:

  • Paralelisme Spark, yaitu jumlah tugas konkuren yang berjalan selama eksekusi pekerjaan, yang terlihat di Spark UI.

  • Jumlah koneksi yang digunakan per tugas:

    • Saat menulis dalam mode COPY, setiap tugas menggunakan satu koneksi JDBC.

    • Saat menulis dalam mode INSERT, setiap tugas menggunakan write_thread_size koneksi JDBC.

    • Saat membaca data, setiap tugas menggunakan satu koneksi JDBC.

  • Operasi lain: Pekerjaan mungkin sementara menggunakan satu koneksi untuk tugas seperti pengambilan skema saat dimulai.

Anda dapat menghitung total jumlah koneksi untuk pekerjaan dengan rumus berikut:

Item

Connections

Querying metadata from the catalog

1

Reading data

parallelism * 1 + 1

Writing in COPY mode

parallelism * 1 + 1

Writing in INSERT mode

parallelism * write_thread_size + 1

Perhitungan ini mengasumsikan bahwa kapasitas Spark untuk tugas konkuren melebihi jumlah tugas yang dihasilkan pekerjaan.

Kapasitas Spark untuk tugas konkuren tergantung pada parameter yang dikonfigurasi pengguna, seperti spark.executor.instances, dan kebijakan pemisahan blok file Hadoop. Untuk informasi selengkapnya, lihat Apache Hadoop.

Spark Connector release notes

Version

Release date

New features

Bug fixes

1.6.1

2026-02

  • Menambahkan mode tulis stage, yang menggunakan tabel stage sementara untuk meningkatkan kinerja tulis. Fitur ini memerlukan Hologres v4.1.0 atau lebih baru.

  • Menambahkan dukungan untuk operasi INSERT INTO dan INSERT OVERWRITE dalam mode stage.

  • Mengoptimalkan logika pembersihan tabel sementara untuk operasi overwrite.

  • Menambahkan parameter write.rps_limit untuk mengontrol laju tulis per tugas.

  • Memperbaiki masalah replay sinkron paksa yang terjadi saat mengeksekusi DDL selama operasi overwrite.

1.6.0

2025-12

  • Menambahkan parameter read.split.strategy untuk mendukung tiga strategi pemisahan baca: shard, range, dan partition.

  • Menambahkan dukungan untuk tipe data JSONB dalam pembacaan berbasis Arrow.

  • Menambahkan parameter write.insert.ignore_null_when_update untuk mengabaikan nilai NULL selama operasi UPDATE.

  • Menambahkan dukungan untuk pelaporan cakupan pengujian JaCoCo.

  • Memperbaiki kesalahan saat menulis nilai date sebelum tahun 1970.

1.5.6

2025-11

  • Menambahkan dukungan untuk metode autentikasi AKV4.

  • Menambahkan parameter write.copy.disable_right_join untuk mengoptimalkan kinerja tulis COPY.

  • Mengoptimalkan logika pemeriksaan tipe kolom untuk penulisan katalog.

1.5.5

2025-10

  • Menambahkan dukungan untuk menulis data presisi rendah ke tipe data presisi tinggi.

  • Menambahkan awalan seperti appname dan taskid ke log untuk menyederhanakan troubleshooting.

  • Mengaktifkan kompresi Arrow LZ4 untuk operasi COPY OUT.

  • Mengoptimalkan pemformatan dan validasi parameter.

1.5.4

2025-09

  • Menambahkan parameter remove_u0000 untuk secara otomatis menghapus karakter U+0000 dari kolom teks.

  • Merombak Hologres Catalog untuk memetakan namespace ke skema.

  • Menambahkan kelas utilitas RepartitionUtil untuk sharding data.

  • Merombak dokumentasi dan memperbarui tautan.

1.5.2

2025-08

  • Memperkenalkan Hologres Catalog, yang memungkinkan pembacaan dari dan penulisan ke Hologres menggunakan tabel eksternal.

  • Menambahkan dukungan untuk predicate pushdown dan limit pushdown.

  • Mengaktifkan mode batch bulk_read untuk operasi baca, meningkatkan kinerja hingga satu orde besaran.

  • Performa baca telah dioptimalkan.

1.5.0

2025-06

  • Menambahkan dukungan untuk predicate pushdown, limit pushdown, dan column pruning.

  • Mengaktifkan pembacaan data menggunakan kueri SELECT.

  • Mengaktifkan mode batch untuk operasi baca.

  • Memperbaiki masalah terkait baca.

1.4.2

2025-04

  • Memperkenalkan mode tulis bulk_load untuk penulisan batch ke tabel tanpa primary key.

  • Menambahkan mode bulk_load_on_conflict untuk menangani konflik primary key di tabel dengan primary key.

  • Performa tulis yang dioptimalkan.

1.3.2

2025-02

  • Menambahkan dukungan untuk membaca data dari Hologres.

  • Mengaktifkan pembacaan paralel berdasarkan shard.

  • Menambahkan dukungan untuk inferensi skema otomatis.

  • Menstabilkan fungsionalitas baca awal.

1.3.0

2025-01

  • Memperkenalkan mode tulis stream (fixed COPY).

  • Menambahkan dukungan untuk fitur fixed COPY di Hologres v1.3.

  • Mengoptimalkan throughput dan latensi tulis.