Spark adalah mesin analitik terpadu untuk pemrosesan data skala besar. Hologres terintegrasi secara efisien dengan Spark komunitas dan EMR Serverless Spark guna membangun gudang data dengan cepat. Konektor Spark Hologres mendukung pembuatan katalog Hologres dalam kluster Spark, memungkinkan pembacaan batch berkinerja tinggi dan impor melalui tabel eksternal yang mengungguli JDBC asli.
Limitations
Konektor Spark memerlukan Hologres versi 1.3 atau lebih baru. Anda dapat memeriksa versi Instans Anda di halaman Instance Details di konsol Hologres. Jika Instans Anda menggunakan versi sebelum 1.3, tingkatkan instans Anda atau bergabung dengan kelompok DingTalk Hologres (ID: 32314975) untuk meminta peningkatan.
Prerequisites
Gunakan lingkungan Spark yang mendukung perintah
spark-sql,spark-shell, ataupyspark. Disarankan menggunakan Spark 3.3.0 atau lebih baru untuk menghindari masalah dependensi dan memperoleh akses ke fitur tambahan.Anda dapat menggunakan Alibaba Cloud EMR Spark untuk menyiapkan lingkungan Spark secara cepat dan menghubungkannya ke instans Hologres. Untuk informasi selengkapnya, lihat EMR Spark features.
Atau, Anda dapat menyiapkan lingkungan Spark mandiri. Untuk informasi selengkapnya, lihat Apache Spark.
Untuk membaca dari dan menulis ke Hologres dengan Spark, Anda memerlukan konektor
hologres-connector-spark-3.x. Topik ini menggunakan versi 1.5.2 sebagai contoh, yang dapat diunduh dari Maven Central Repository. Konektor ini bersifat open source. Untuk informasi selengkapnya, lihat Hologres-Connectors.Untuk mengembangkan dan melakukan debugging lokal pekerjaan Spark dalam Java menggunakan IDE seperti IntelliJ IDEA, tambahkan dependensi Maven berikut ke file pom.xml Anda.
<dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-connector-spark-3.x</artifactId> <version>1.5.2</version> <classifier>jar-with-dependencies</classifier> </dependency>
Hologres catalog
Konektor Hologres versi 1.5.2 dan yang lebih baru mendukung katalog Hologres, sehingga Anda dapat menggunakan tabel eksternal untuk membaca dari dan menulis ke Hologres.
Setiap katalog Hologres di Spark dipetakan ke satu database di Hologres. Setiap namespace di katalog Hologres dipetakan ke skema di database yang sesuai. Bagian berikut menjelaskan cara menggunakan katalog Hologres di Spark.
Katalog Hologres tidak mendukung pembuatan tabel.
Topik ini menggunakan database dan tabel berikut di instans Hologres:
test_db -- Database
public.test_table1 -- Tabel di skema public
public.test_table2
test_schema.test_table3 -- Tabel di skema test_schema Initialize a Hologres catalog
Jalankan spark-sql di kluster Spark, muat konektor Hologres, lalu tentukan parameter katalog.
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_dbHologres catalog commands
Load a Hologres Catalog
Katalog Hologres di Spark dipetakan ke database Hologres. Pemetaan ini tetap selama sesi berlangsung.
USE hologres_external_test_db;Query all namespaces
Namespace di Spark dipetakan ke skema di Hologres. Skema default adalah public. Gunakan perintah
USEuntuk mengubah skema default.-- Lihat semua namespace di Katalog Hologres, yang sesuai dengan skema di database Hologres. SHOW NAMESPACES;Query tables in a namespace
Query all tables
SHOW TABLES;Query tables in a specific namespace
USE test_schema; SHOW TABLES; -- Atau, gunakan pernyataan berikut. SHOW TABLES IN test_schema;
Read from and write to a table
Gunakan pernyataan SELECT dan INSERT untuk membaca dari dan menulis ke tabel eksternal.
-- Baca dari tabel. SELECT * FROM public.test_table1; -- Tulis ke tabel. INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;
Import data to Hologres
Data uji pada bagian ini berasal dari tabel customer dalam dataset TPC-H. Spark dapat membaca data dari file CSV dan menuliskannya ke tabel Hologres. Anda dapat mengunduh data sampel customer. Pernyataan SQL berikut membuat customer_holo_table.
CREATE TABLE customer_holo_table
(
c_custkey BIGINT ,
c_name TEXT ,
c_address TEXT ,
c_nationkey INT ,
c_phone TEXT ,
c_acctbal DECIMAL(15,2) ,
c_mktsegment TEXT ,
c_comment TEXT
);Import using Spark-SQL
Di Spark-SQL, penggunaan katalog untuk memuat metadata tabel Hologres lebih praktis. Anda juga dapat mendeklarasikan tabel Hologres dengan membuat tabel sementara.
Versi konektor Spark Hologres sebelum 1.5.2 tidak mendukung katalog. Anda hanya dapat mendeklarasikan tabel Hologres dengan membuat tabel sementara.
Untuk informasi selengkapnya tentang parameter konektor Spark Hologres, lihat parameters.
Initialize a Hologres catalog.
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \ --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \ --conf spark.sql.catalog.hologres_external_test_db.username=*** \ --conf spark.sql.catalog.hologres_external_test_db.password=*** \ --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_dbImport data from a CSV source to a Hologres table.
CatatanSintaks INSERT INTO di Spark tidak mendukung penggunaan
column_listuntuk menentukan subset kolom. Misalnya, Anda tidak dapat menggunakanINSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTableuntuk menulis data hanya ke bidang c_custkey.Jika Anda ingin menulis data ke bidang tertentu, gunakan pernyataan
CREATE TEMPORARY VIEWuntuk mendeklarasikan tampilan sementara Hologres yang hanya berisi bidang yang diperlukan.Using a catalog
-- Muat katalog Hologres. USE hologres_external_test_db; -- Buat sumber data CSV. CREATE TEMPORARY VIEW csvTable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING csv OPTIONS ( path "resources/customer", sep "," -- Untuk pengujian lokal, gunakan jalur mutlak ke file. ); -- Tulis data dari tabel CSV ke Hologres. INSERT INTO public.customer_holo_table SELECT * FROM csvTable;Using a temporary view
-- Buat sumber data CSV. CREATE TEMPORARY VIEW csvTable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING csv OPTIONS ( path "resources/customer", sep "," ); -- Buat tampilan sementara Hologres. CREATE TEMPORARY VIEW hologresTable ( c_custkey BIGINT, c_name STRING, c_phone STRING) USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", table "customer_holo_table" ); INSERT INTO hologresTable SELECT c_custkey,c_name,c_phone FROM csvTable;
Import using a DataFrame
Anda dapat menggunakan tool seperti spark-shell atau pyspark untuk mengembangkan pekerjaan Spark dan memanggil API write guna menulis data. Pekerjaan tersebut membaca data dari file CSV, mengonversinya menjadi DataFrame, lalu menulis DataFrame tersebut ke instans Hologres. Bagian berikut menyediakan contoh kode untuk berbagai bahasa pemrograman. Untuk informasi selengkapnya tentang parameter konektor Spark Hologres, lihat parameters.
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
// Skema sumber CSV.
val schema = StructType(Array(
StructField("c_custkey", LongType),
StructField("c_name", StringType),
StructField("c_address", StringType),
StructField("c_nationkey", IntegerType),
StructField("c_phone", StringType),
StructField("c_acctbal", DecimalType(15, 2)),
StructField("c_mktsegment", StringType),
StructField("c_comment", StringType)
))
// Baca data dari file CSV ke dalam DataFrame.
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")
// Tulis DataFrame ke Hologres.
csvDf.write
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.mode(SaveMode.Append)
.save()Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;
public class SparkTest {
public static void main(String[] args) {
// Skema sumber CSV.
List<StructField> asList =
Arrays.asList(
DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
DataTypes.createStructField("c_name", DataTypes.StringType, true),
DataTypes.createStructField("c_address", DataTypes.StringType, true),
DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
DataTypes.createStructField("c_phone", DataTypes.StringType, true),
DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
DataTypes.createStructField("c_comment", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(asList);
// Jalankan dalam mode lokal.
SparkSession spark = SparkSession.builder()
.appName("Spark CSV Example")
.master("local[*]")
.getOrCreate();
// Baca data dari file CSV ke dalam DataFrame.
// Untuk pengujian lokal, gunakan jalur mutlak ke data customer.
Dataset<Row> csvDf = spark.read().format("csv").schema(schema).option("sep", ",").load("resources/customer");
// Tulis DataFrame ke Hologres.
csvDf.write.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").mode(
"append").save();
}
}Tambahkan dependensi berikut ke file pom.xml Anda.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.4</version>
<scope>provided</scope>
</dependency>Python
from pyspark.sql.types import *
# Skema sumber CSV.
schema = StructType([
StructField("c_custkey", LongType()),
StructField("c_name", StringType()),
StructField("c_address", StringType()),
StructField("c_nationkey", IntegerType()),
StructField("c_phone", StringType()),
StructField("c_acctbal", DecimalType(15, 2)),
StructField("c_mktsegment", StringType()),
StructField("c_comment", StringType())
])
# Baca data dari file CSV ke dalam DataFrame.
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=',')
# Tulis DataFrame ke Hologres.
csvDf.write.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").mode(
"append").save()Untuk menjalankan pekerjaan Spark dalam berbagai bahasa, ikuti langkah-langkah berikut:
Scala
Gunakan kode contoh untuk membuat file
sparktest.scaladan jalankan perintah berikut untuk mengeksekusi pekerjaan.-- Muat dependensi. spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar -- Untuk pengujian lokal, gunakan jalur mutlak untuk memuat file. scala> :load D:/sparktest.scalaAtau, Anda dapat menempelkan kode contoh langsung ke shell setelah dependensi dimuat.
Java
Gunakan tool pengembangan untuk mengimpor kode contoh dan mengemasnya dengan Maven. Misalnya, jika file JAR keluaran adalah
spark_test.jar, jalankan perintah berikut untuk mengeksekusi pekerjaan.-- Gunakan jalur mutlak ke paket JAR pekerjaan. spark-submit --class SparkTest --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_test.jarPython
Setelah menjalankan perintah berikut, Anda dapat menempelkan kode contoh langsung ke shell untuk mengeksekusinya.
pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
Read data from Hologres
Mulai dari versi 1.3.2, konektor Spark mendukung pembacaan data dari Hologres. Dibandingkan dengan
jdbc-connectorSpark default,spark-connectormemberikan kinerja lebih baik dengan membaca data secara paralel berdasarkan shard tabel Hologres. Paralelisme baca bergantung pada jumlah shard di tabel.spark-connectordapat membatasi paralelisme menggunakan parameterread.max_task_count. Pekerjaan akhirnya menghasilkanMin(shardCount, max_task_count)tugas baca. Fitur ini juga mendukung inferensi skema. Jika Anda tidak menyediakan skema, konektor akan menginferensi skema Spark dari skema tabel Hologres.Mulai dari versi konektor Spark 1.5.0, pembacaan data dari tabel Hologres mendukung predicate pushdown, LIMIT pushdown, dan column pruning. Anda juga dapat menggunakan
SELECT QUERYHologres untuk membaca data. Versi ini memperkenalkan mode baca batch, yang meningkatkan kinerja baca hingga 3–4 kali lipat dibandingkan versi sebelumnya.
Read data using Spark SQL
Saat menggunakan Spark SQL, Anda dapat memuat metadata tabel Hologres menggunakan katalog. Atau, Anda dapat mendeklarasikan tabel Hologres dengan membuat tabel sementara.
Versi konektor Spark Hologres sebelum 1.5.2 tidak mendukung katalog. Anda hanya dapat mendeklarasikan tabel Hologres dengan membuat tabel sementara.
Untuk informasi selengkapnya tentang parameter konektor Spark Hologres, lihat Parameters.
Initialize a Hologres catalog.
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \ --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \ --conf spark.sql.catalog.hologres_external_test_db.username=*** \ --conf spark.sql.catalog.hologres_external_test_db.password=*** \ --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_dbRead data from Hologres.
Read data using a catalog.
-- Muat katalog Hologres. USE hologres_external_test_db; -- Baca data dari tabel Hologres. Field pruning dan predicate pushdown didukung. SELECT c_custkey,c_name,c_phone FROM public.customer_holo_table WHERE c_custkey < 500 LIMIT 10;Read data by creating a temporary table.
Table
CREATE TEMPORARY VIEW hologresTable USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", read.max_task_count "80", -- Jumlah maksimum tugas untuk membaca dari tabel Hologres. table "customer_holo_table" ); -- Field pruning dan predicate pushdown didukung. SELECT c_custkey,c_name,c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;Query
CREATE TEMPORARY VIEW hologresTable USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", read.query "SELECT c_custkey,c_name,c_phone FROM customer_holo_table WHERE c_custkey < 500 LIMIT 10" ); SELECT * FROM hologresTable LIMIT 5;
Read Hologres data into a DataFrame
Saat mengembangkan pekerjaan Spark menggunakan tool seperti spark-shell atau pyspark, Anda dapat memanggil API read Spark untuk memuat data ke dalam DataFrame. Contoh berikut menunjukkan cara membaca data dari tabel Hologres ke dalam DataFrame dalam berbagai bahasa pemrograman. Untuk informasi selengkapnya tentang parameter konektor Spark Hologres, lihat Parameters.
Scala
val readDf = (
spark.read
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.option("read.max_task_count", "80") // Jumlah maksimum tugas untuk membaca dari tabel Hologres.
.load()
.filter("c_custkey < 500")
)
readDf.select("c_custkey", "c_name", "c_phone").show(10)Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSelect {
public static void main(String[] args) {
// Jalankan dalam mode lokal.
SparkSession spark = SparkSession.builder()
.appName("Spark CSV Example")
.master("local[*]")
.getOrCreate();
Dataset<Row> readDf = (
spark.read
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.option("read.max_task_count", "80") // Jumlah maksimum tugas untuk membaca dari tabel Hologres.
.load()
.filter("c_custkey < 500")
);
readDf.select("c_custkey", "c_name", "c_phone").show(10);
}
}Tambahkan dependensi berikut ke file Maven pom.xml Anda.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.4</version>
<scope>provided</scope>
</dependency>Python
readDf = spark.read.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").option(
"read.max_task_count", "80").load()
readDf.select("c_custkey", "c_name", "c_phone").show(10)Untuk menjalankan pekerjaan Spark dalam berbagai bahasa pemrograman:
Scala
Anda dapat menggunakan kode contoh untuk membuat file
sparkselect.scaladan menjalankan pekerjaan dengan perintah berikut.-- Muat dependensi. spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar -- Untuk pengujian lokal, gunakan jalur mutlak untuk memuat file. scala> :load D:/sparkselect.scalaAtau, setelah dependensi dimuat, Anda dapat menempelkan kode contoh langsung ke shell untuk menjalankannya.
Java
Anda dapat menggunakan tool pengembangan untuk mengimpor kode contoh dan mengemasnya menggunakan Maven. Misalnya, jika JAR yang dikemas bernama
spark_select.jar, jalankan pekerjaan dengan perintah berikut.-- Gunakan jalur mutlak untuk JAR pekerjaan. spark-submit --class SparkSelect --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_select.jarPython
Setelah menjalankan perintah berikut, Anda dapat menempelkan kode contoh langsung ke shell untuk menjalankannya.
pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
Parameters
General parameters
Parameter | Default | Required | Description |
username | None | Yes |
|
password | None | Yes |
|
table | None | Yes | Nama tabel Hologres yang akan dibaca atau ditulis. Catatan Saat membaca data, Anda dapat menggunakan parameter |
jdbcurl | None | Yes | URL JDBC API data real-time Hologres, dalam format |
enable_serverless_computing | false | No | Menentukan apakah akan menggunakan resource komputasi tanpa server. Parameter ini hanya berlaku untuk operasi baca dan operasi tulis dalam mode |
serverless_computing_query_priority | 3 | No | Prioritas eksekusi untuk Komputasi Tanpa Server. |
statement_timeout_seconds | 28800 (8 jam) | No | Durasi timeout eksekusi kueri, dalam detik. |
retry_count | 3 | No | Jumlah percobaan ulang jika koneksi gagal. |
direct_connect | Koneksi langsung digunakan secara default jika didukung. | No | Menentukan apakah akan terhubung langsung ke FrontEnd Hologres (node akses). Throughput jaringan titik akhir sering menjadi bottleneck untuk operasi data batch. Secara default, konektor menggunakan koneksi langsung jika tersedia untuk meningkatkan throughput. Tetapkan nilai ini ke |
Write parameters
Konektor Hologres mendukung parameter Spark SaveMode. Untuk SQL, ini sesuai dengan INSERT INTO atau INSERT OVERWRITE. Untuk DataFrame, Anda dapat mengatur SaveMode ke Append atau Overwrite saat menulis data. Mode Overwrite membuat tabel sementara untuk operasi tulis dan mengganti tabel asli setelah berhasil. Gunakan mode ini hanya jika diperlukan.
Parameter | Previous name | Default | Required | Description |
write.mode | copy_write_mode | auto | No | Mode tulis. Untuk perbandingan mode tulis, lihat Batch write modes. Nilai yang valid:
|
write.copy.max_buffer_size | max_cell_buffer_size | 52428800 (50 MB) | No | Ukuran maksimum buffer lokal saat menulis dalam mode |
write.copy.dirty_data_check | copy_write_dirty_data_check | false | No | Menentukan apakah akan memeriksa data kotor. Jika diaktifkan, fitur ini dapat mengidentifikasi baris yang gagal ditulis secara tepat. Namun, hal ini memengaruhi kinerja tulis. Nonaktifkan fitur ini kecuali untuk troubleshooting. |
write.on_conflict_action | INSERT_OR_REPLACE | INSERT_OR_REPLACE | No | Aksi yang diambil ketika operasi tulis menghadapi konflik primary key di tabel tujuan.
|
Parameter berikut hanya berlaku ketika write.mode diatur ke insert.
Parameter | Previous name | Default | Required | Description |
write.insert.dynamic_partition | dynamic_partition | false | No | Ketika |
write.insert.batch_size | write_batch_size | 512 | No | Ukuran batch maksimum untuk setiap thread tulis. Commit batch dipicu ketika jumlah operasi |
write.insert.batch_byte_size | write_batch_byte_size | 2097152 (2 MB) | No | Ukuran batch maksimum dalam byte untuk setiap thread tulis. Nilai default adalah 2 MB. Commit batch dipicu ketika ukuran byte data |
write.insert.max_interval_ms | write_max_interval_ms | 10000 | No | Commit batch dipicu jika waktu yang berlalu sejak commit terakhir melebihi nilai ini. |
write.insert.thread_size | write_thread_size | 1 | No | Jumlah thread tulis konkuren. Setiap thread menggunakan satu koneksi database. |
write.rps_limit | None | -1 | No | Batas laju tulis per tugas, dalam baris per detik (RPS). Nilai default -1 menunjukkan tidak ada batas. |
Read parameters
Parameter | Previous name (v1.5.0 and earlier) | Default | Required | Description |
read.mode | bulk_read | auto | No | Mode baca. Nilai yang valid:
|
read.max_task_count | max_partition_count | 80 | No | Menentukan jumlah maksimum tugas konkuren untuk membaca data. Konektor membagi tabel menjadi beberapa partisi, dan setiap partisi diproses oleh satu tugas Spark. Jika jumlah shard tabel kurang dari nilai ini, jumlah partisi dibatasi pada jumlah shard. |
read.copy.max_buffer_size | / | 52428800 (50 MB) | No | Ukuran maksimum buffer lokal saat membaca dalam mode |
read.push_down_predicate | push_down_predicate | true | No | Menentukan apakah akan mengaktifkan predicate pushdown. Saat diaktifkan, operasi seperti kondisi filter dan column pruning didorong ke sumber data. |
read.push_down_limit | push_down_limit | true | No | Menentukan apakah akan mengaktifkan |
read.select.batch_size | scan_batch_size | 256 | No | Berlaku saat |
read.select.timeout_seconds | scan_timeout_seconds | 60 | No | Berlaku saat |
read.query | query | None | No | Gunakan Catatan
|
read.split.strategy | None | auto | No | Menentukan strategi untuk membagi data tabel menjadi beberapa tugas untuk pembacaan paralel. Nilai yang valid:
Catatan Didukung di konektor v1.6.1 dan yang lebih baru. |
read.split.column | None | None | No | Nama kolom split. Parameter ini diperlukan saat |
read.split.lower_bound | None | None | No | Batas bawah split diperlukan saat |
read.split.upper_bound | None | None | No | Batas atas split menentukan batas atas untuk shard berbasis rentang. Parameter ini diperlukan saat |
read.split.num | None | None | No | Jumlah split. Parameter ini diperlukan saat |
Data type mapping
Spark type | Hologres type |
ShortType | SMALLINT |
IntegerType | INT |
LongType | BIGINT |
StringType | TEXT |
StringType | JSON |
StringType | JSONB |
DecimalType | NUMERIC(38, 18) |
BooleanType | BOOL |
DoubleType | DOUBLE PRECISION |
FloatType | FLOAT |
TimestampType | TIMESTAMPTZ |
DateType | DATE |
BinaryType | BYTEA |
BinaryType | ROARINGBITMAP |
ArrayType(IntegerType) | INT4[] |
ArrayType(LongType) | INT8[] |
ArrayType(FloatType) | FLOAT4[] |
ArrayType(DoubleType) | FLOAT8[] |
ArrayType(BooleanType) | BOOLEAN[] |
ArrayType(StringType) | TEXT[] |
Connection count calculation
Hologres-Connector-Spark menggunakan sejumlah koneksi JDBC tertentu untuk operasi baca dan tulis. Jumlah koneksi tergantung pada faktor-faktor berikut:
Paralelisme Spark, yaitu jumlah tugas konkuren yang berjalan selama eksekusi pekerjaan, yang terlihat di Spark UI.
Jumlah koneksi yang digunakan per tugas:
Saat menulis dalam mode COPY, setiap tugas menggunakan satu koneksi JDBC.
Saat menulis dalam mode INSERT, setiap tugas menggunakan
write_thread_sizekoneksi JDBC.Saat membaca data, setiap tugas menggunakan satu koneksi JDBC.
Operasi lain: Pekerjaan mungkin sementara menggunakan satu koneksi untuk tugas seperti pengambilan skema saat dimulai.
Anda dapat menghitung total jumlah koneksi untuk pekerjaan dengan rumus berikut:
Item | Connections |
Querying metadata from the catalog | 1 |
Reading data | parallelism * 1 + 1 |
Writing in COPY mode | parallelism * 1 + 1 |
Writing in INSERT mode | parallelism * write_thread_size + 1 |
Perhitungan ini mengasumsikan bahwa kapasitas Spark untuk tugas konkuren melebihi jumlah tugas yang dihasilkan pekerjaan.
Kapasitas Spark untuk tugas konkuren tergantung pada parameter yang dikonfigurasi pengguna, seperti spark.executor.instances, dan kebijakan pemisahan blok file Hadoop. Untuk informasi selengkapnya, lihat Apache Hadoop.
Spark Connector release notes
Version | Release date | New features | Bug fixes |
1.6.1 | 2026-02 |
|
|
1.6.0 | 2025-12 |
|
|
1.5.6 | 2025-11 |
|
|
1.5.5 | 2025-10 |
|
|
1.5.4 | 2025-09 |
|
|
1.5.2 | 2025-08 |
|
|
1.5.0 | 2025-06 |
|
|
1.4.2 | 2025-04 |
|
|
1.3.2 | 2025-02 |
|
|
1.3.0 | 2025-01 |
|
|