Spark adalah mesin analitik terpadu untuk pemrosesan data skala besar. Hologres terintegrasi erat dengan edisi komunitas Spark dan EMR Spark untuk membantu Anda membangun gudang data secara cepat. Konektor Spark yang disediakan oleh Hologres mendukung pembuatan Hologres Catalog di kluster Spark, memungkinkan pembacaan batch berkinerja tinggi dan impor menggunakan tabel eksternal. Pendekatan ini memberikan kinerja lebih baik dibandingkan Java Database Connectivity (JDBC) asli.
Batasan
Hanya instans Hologres versi 1.3 atau lebih baru yang mendukung konektor Spark. Anda dapat melihat versi instans saat ini di halaman Instance Details pada Hologres Management Console. Jika instans Anda menjalankan versi sebelum 1.3, lakukan Instance Upgrade atau bergabunglah dengan grup komunikasi Hologres dengan mencari nomor grup DingTalk 32314975 untuk meminta peningkatan versi.
Persiapan
-
Instal lingkungan Spark yang kompatibel tempat Anda dapat menjalankan perintah `spark-sql`, `spark-shell`, atau `pyspark`. Gunakan Spark 3.3.0 atau lebih baru untuk menghindari masalah dependensi dan mengakses lebih banyak fitur.
-
Gunakan EMR Spark Alibaba Cloud untuk membangun lingkungan Spark secara cepat dan menghubungkannya ke instans Hologres Anda. Untuk informasi selengkapnya, lihat EMR Spark features.
-
Anda juga dapat membangun lingkungan Spark sesuai preferensi Anda. Untuk informasi selengkapnya, lihat Apache Spark.
-
-
Untuk membaca dari atau menulis ke Hologres menggunakan Spark, rujuk paket JAR konektor
hologres-connector-spark-3.x. Topik ini menggunakan versi 1.5.2 dari konektor Spark. Anda dapat mengunduhnya dari Maven Central Repository. Semua sumber daya konektor bersifat open source. Untuk informasi selengkapnya, lihat Hologres-Connectors. -
Jika Anda mengembangkan pekerjaan Spark dalam Java dan melakukan debugging secara lokal menggunakan alat seperti IntelliJ IDEA, tambahkan dependensi Maven berikut ke file pom.xml Anda.
<dependency> <groupId>com.alibaba.hologres</groupId> <artifactId>hologres-connector-spark-3.x</artifactId> <version>1.5.2</version> <classifier>jar-with-dependencies</classifier> </dependency>
Hologres Catalog
Mulai dari versi 1.5.2, konektor Spark mendukung Hologres Catalog, memungkinkan Anda membaca dari dan menulis ke Hologres menggunakan tabel eksternal secara praktis.
Di Spark, setiap Hologres Catalog berkorespondensi dengan satu database di Hologres, dan setiap namespace dalam Hologres Catalog berkorespondensi dengan satu skema dalam database tersebut. Bagian berikut menunjukkan cara menggunakan Hologres Catalog di Spark.
Hologres Catalog tidak mendukung pembuatan tabel.
Topik ini menggunakan instans Hologres dengan nama database dan tabel sebagai berikut:
test_db --database
public.test_table1 --table in the public schema
public.test_table2
test_schema.test_table3 -- table in the test_schema schema
Inisialisasi Hologres Catalog
Di kluster Spark Anda, jalankan spark-sql, muat konektor Hologres, dan tentukan parameter Catalog.
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \
--conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \
--conf spark.sql.catalog.hologres_external_test_db.username=*** \
--conf spark.sql.catalog.hologres_external_test_db.password=*** \
--conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db
Perintah umum untuk Hologres Catalog
-
Muat Hologres Catalog
Hologres Catalog di Spark berkorespondensi tepat dengan satu database Hologres dan tidak dapat diubah selama penggunaan.
USE hologres_external_test_db; -
Kueri semua namespace.
Namespace di Spark berkorespondensi dengan skema di Hologres. Skema default adalah `public`. Anda dapat menggunakan instruksi
USEuntuk mengubah skema default.-- View all namespaces in the Hologres Catalog. This corresponds to all schemas in Hologres. SHOW NAMESPACES; -
Kueri tabel dalam namespace.
-
Kueri semua tabel.
SHOW TABLES; -
Kueri tabel dalam namespace tertentu.
USE test_schema; SHOW TABLES; -- Or use SHOW TABLES IN test_schema;
-
-
Baca dari dan tulis ke tabel.
Gunakan pernyataan SELECT dan INSERT untuk membaca dari dan menulis ke tabel eksternal Hologres dalam Catalog.
-- Read from a table. SELECT * FROM public.test_table1; -- Write to a table. INSERT INTO test_schema.test_table3 SELECT * FROM public.test_table1;
Impor data ke Hologres
Bagian ini menggunakan tabel customer dari dataset TPC-H sebagai data sumber untuk Hologres. Spark dapat membaca data tabel Hologres dalam format CSV. Unduh data customer. Pernyataan SQL untuk membuat skema tabel customer adalah sebagai berikut.
CREATE TABLE customer_holo_table
(
c_custkey BIGINT ,
c_name TEXT ,
c_address TEXT ,
c_nationkey INT ,
c_phone TEXT ,
c_acctbal DECIMAL(15,2) ,
c_mktsegment TEXT ,
c_comment TEXT
);
Impor data menggunakan Spark-SQL
Saat menggunakan Spark-SQL, lebih praktis memuat metadata tabel Hologres menggunakan Catalog. Anda juga dapat mendeklarasikan tabel Hologres dengan membuat tabel sementara.
-
Versi Hologres-Connector-Spark sebelum 1.5.2 tidak mendukung catalog. Anda harus mendeklarasikan tabel Hologres dengan membuat tabel sementara.
-
Untuk informasi selengkapnya tentang parameter Hologres-Connector-Spark, lihat Parameter description.
-
Inisialisasi Hologres Catalog.
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \ --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \ --conf spark.sql.catalog.hologres_external_test_db.username=*** \ --conf spark.sql.catalog.hologres_external_test_db.password=*** \ --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db -
Impor data dari tabel CSV sumber ke tabel eksternal Hologres.
CatatanSintaksis Spark INSERT INTO tidak mendukung penentuan daftar parsial kolom untuk penulisan menggunakan
column_list. Misalnya, Anda tidak dapat menggunakanINSERT INTO hologresTable(c_custkey) SELECT c_custkey FROM csvTableuntuk hanya menulis bidang `c_custkey`.Jika Anda ingin hanya menulis bidang tertentu, deklarasikan tabel sementara Hologres yang hanya berisi bidang tersebut menggunakan
CREATE TEMPORARY VIEW.Tulis menggunakan CATALOG
-- Load the Hologres Catalog. USE hologres_external_test_db; -- Create a CSV data source. CREATE TEMPORARY VIEW csvTable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING csv OPTIONS ( path "resources/customer", sep "," -- For local testing, use the absolute path of the file. ); -- Write data from the CSV table to Hologres. INSERT INTO public.customer_holo_table SELECT * FROM csvTable;Tulis menggunakan TEMPORARY VIEW
-- Create a CSV data source. CREATE TEMPORARY VIEW csvTable ( c_custkey BIGINT, c_name STRING, c_address STRING, c_nationkey INT, c_phone STRING, c_acctbal DECIMAL(15, 2), c_mktsegment STRING, c_comment STRING) USING csv OPTIONS ( path "resources/customer", sep "," ); -- Create a Hologres temporary table. CREATE TEMPORARY VIEW hologresTable ( c_custkey BIGINT, c_name STRING, c_phone STRING) USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", table "customer_holo_table" ); INSERT INTO hologresTable SELECT c_custkey,c_name,c_phone FROM csvTable;
Impor data menggunakan DataFrame
Saat Anda mengembangkan pekerjaan Spark menggunakan spark-shell, pyspark, atau alat lainnya, Anda juga dapat memanggil antarmuka `write` DataFrame untuk menulis data. Bahasa pengembangan yang berbeda mengonversi data yang dibaca dari file CSV menjadi DataFrame, lalu menuliskannya ke instans Hologres. Berikut adalah contoh kode. Untuk informasi selengkapnya tentang parameter Hologres-Connector-Spark, lihat Parameter description.
Scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.SaveMode
// Schema of the CSV source.
val schema = StructType(Array(
StructField("c_custkey", LongType),
StructField("c_name", StringType),
StructField("c_address", StringType),
StructField("c_nationkey", IntegerType),
StructField("c_phone", StringType),
StructField("c_acctbal", DecimalType(15, 2)),
StructField("c_mktsegment", StringType),
StructField("c_comment", StringType)
))
// Read data from the CSV file into a DataFrame.
val csvDf = spark.read.format("csv").schema(schema).option("sep", ",").load("resources/customer")
// Write the DataFrame to Hologres.
csvDf.write
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.mode(SaveMode.Append)
.save()
Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.SaveMode;
import java.util.Arrays;
import java.util.List;
public class SparkTest {
public static void main(String[] args) {
// Schema of the CSV source.
List<StructField> asList =
Arrays.asList(
DataTypes.createStructField("c_custkey", DataTypes.LongType, true),
DataTypes.createStructField("c_name", DataTypes.StringType, true),
DataTypes.createStructField("c_address", DataTypes.StringType, true),
DataTypes.createStructField("c_nationkey", DataTypes.IntegerType, true),
DataTypes.createStructField("c_phone", DataTypes.StringType, true),
DataTypes.createStructField("c_acctbal", new DecimalType(15, 2), true),
DataTypes.createStructField("c_mktsegment", DataTypes.StringType, true),
DataTypes.createStructField("c_comment", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(asList);
// Run in local mode.
SparkSession spark = SparkSession.builder()
.appName("Spark CSV Example")
.master("local[*]")
.getOrCreate();
// Read data from the CSV file into a DataFrame.
// For local testing, use the absolute path of the customer data.
Dataset<Row> csvDf = spark.read().format("csv").schema(schema).option("sep", ",").load("resources/customer");
// Write the DataFrame to Hologres.
csvDf.write.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").mode(
"append").save()
}
}
Konfigurasi berikut diperlukan untuk file Maven.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.4</version>
<scope>provided</scope>
</dependency>
Python
from pyspark.sql.types import *
# Schema of the CSV source.
schema = StructType([
StructField("c_custkey", LongType()),
StructField("c_name", StringType()),
StructField("c_address", StringType()),
StructField("c_nationkey", IntegerType()),
StructField("c_phone", StringType()),
StructField("c_acctbal", DecimalType(15, 2)),
StructField("c_mktsegment", StringType()),
StructField("c_comment", StringType())
])
# Read data from the CSV file into a DataFrame.
csvDf = spark.read.csv("resources/customer", header=False, schema=schema, sep=',')
# Write the DataFrame to Hologres.
csvDf.write.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").mode(
"append").save()
Untuk menjalankan pekerjaan Spark dalam bahasa yang berbeda, lakukan operasi berikut:
-
Scala
-
Anda dapat menggunakan contoh kode untuk menghasilkan file sparktest.scala dan menjalankan pekerjaan sebagai berikut.
-- Load dependencies. spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar -- For local testing, use the absolute path to load the file. scala> :load D:/sparktest.scala -
Anda juga dapat menempelkan contoh kode langsung setelah memuat dependensi untuk menjalankannya.
-
-
Java
Anda dapat mengimpor contoh kode ke alat pengembangan Anda dan menggunakan Maven untuk membuat paketnya. Misalnya, nama paketnya bisa berupa spark_test.jar. Jalankan pekerjaan dengan kode berikut.
-- Use the absolute path for the job JAR package. spark-submit --class SparkTest --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_test.jar -
Python
Setelah menjalankan kode berikut, Anda dapat menempelkan contoh kode langsung untuk menjalankannya.
pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
Baca data dari Hologres
-
Mulai dari versi spark-connector 1.3.2, Anda dapat membaca data dari Hologres. Dibandingkan dengan
jdbc-connectordefault Spark,spark-connectordapat membaca data secara konkuren berdasarkan shard tabel Hologres, sehingga memberikan kinerja lebih baik. Tingkat konkurensi baca bergantung pada jumlah shard dalam tabel.spark-connectordapat dibatasi oleh parameterread.max_task_count. Pekerjaan akhirnya menghasilkanMin(shardCount, max_task_count)tugas baca. Konektor ini juga mendukung inferensi skema. Jika Anda tidak menyediakan skema, konektor akan menginferensi skema sisi Spark dari skema tabel Hologres. -
Mulai dari versi spark-connector 1.5.0, pembacaan dari tabel Hologres mendukung predicate pushdown, LIMIT pushdown, dan column pruning. Konektor ini juga mendukung pembacaan data dengan meneruskan
SELECT QUERYHologres. Versi ini memperkenalkan pembacaan dalam mode batch, yang meningkatkan kinerja baca hingga tiga hingga empat kali lipat dibandingkan versi sebelumnya.
Baca data menggunakan Spark-SQL
Saat menggunakan Spark-SQL, lebih praktis memuat metadata tabel Hologres menggunakan Catalog. Anda juga dapat mendeklarasikan tabel Hologres dengan membuat tabel sementara.
-
Versi Hologres-Connector-Spark sebelum 1.5.2 tidak mendukung catalog. Anda harus mendeklarasikan tabel Hologres dengan membuat tabel sementara.
-
Untuk informasi selengkapnya tentang parameter Hologres-Connector-Spark, lihat Parameter description.
-
Inisialisasi Hologres Catalog.
spark-sql --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar \ --conf spark.sql.catalog.hologres_external_test_db=com.alibaba.hologres.spark3.HoloTableCatalog \ --conf spark.sql.catalog.hologres_external_test_db.username=*** \ --conf spark.sql.catalog.hologres_external_test_db.password=*** \ --conf spark.sql.catalog.hologres_external_test_db.jdbcurl=jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db -
Baca data dari Hologres.
-
Baca data menggunakan Catalog.
-- Load the Hologres Catalog. USE hologres_external_test_db; -- Read from the Hologres table. Column pruning and predicate pushdown are supported. SELECT c_custkey,c_name,c_phone FROM public.customer_holo_table WHERE c_custkey < 500 LIMIT 10; -
Baca data dengan membuat tabel sementara.
CREATE TEMPORARY VIEW(table)
CREATE TEMPORARY VIEW hologresTable USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", read.max_task_count "80", // The maximum number of tasks to use for reading the Hologres table. table "customer_holo_table" ); -- Column pruning and predicate pushdown are supported. SELECT c_custkey,c_name,c_phone FROM hologresTable WHERE c_custkey < 500 LIMIT 10;CREATE TEMPORARY VIEW(query)
CREATE TEMPORARY VIEW hologresTable USING hologres OPTIONS ( jdbcurl "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db", username "***", password "***", read.query "select c_custkey,c_name,c_phone from customer_holo_table where c_custkey < 500 limit 10" ); SELECT * FROM hologresTable LIMIT 5;
-
Baca data Hologres ke dalam DataFrame
Saat Anda mengembangkan pekerjaan Spark menggunakan spark-shell, pyspark, atau alat lainnya, Anda dapat memanggil antarmuka Read Spark untuk membaca data ke dalam DataFrame guna perhitungan selanjutnya. Berikut adalah contoh membaca tabel Hologres ke dalam DataFrame dalam berbagai bahasa. Untuk informasi selengkapnya tentang parameter Hologres-Connector-Spark, lihat Parameter description.
Scala
val readDf = (
spark.read
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.option("read.max_task_count", "80") // The maximum number of tasks to use for reading the Hologres table.
.load()
.filter("c_custkey < 500")
)
readDf.select("c_custkey", "c_name", "c_phone").show(10)
Java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSelect {
public static void main(String[] args) {
// Run in local mode.
SparkSession spark = SparkSession.builder()
.appName("Spark CSV Example")
.master("local[*]")
.getOrCreate();
Dataset<Row> readDf = (
spark.read
.format("hologres")
.option("username", "***")
.option("password", "***")
.option("jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db")
.option("table", "customer_holo_table")
.option("read.max_task_count", "80") // The maximum number of tasks to use for reading the Hologres table.
.load()
.filter("c_custkey < 500")
);
readDf.select("c_custkey", "c_name", "c_phone").show(10);
}
}
Konfigurasi berikut diperlukan untuk file Maven.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.4</version>
<scope>provided</scope>
</dependency>
Python
readDf = spark.read.format("hologres").option(
"username", "***").option(
"password", "***").option(
"jdbcurl", "jdbc:postgresql://hgpostcn-cn-***-vpc-st.hologres.aliyuncs.com:80/test_db").option(
"table", "customer_holo_table").option(
"read.max_task_count", "80").load()
readDf.select("c_custkey", "c_name", "c_phone").show(10)
Untuk menjalankan pekerjaan Spark dalam bahasa yang berbeda, lakukan operasi berikut:
-
Scala
-
Anda dapat menggunakan contoh kode untuk menghasilkan file sparkselect.scala dan menjalankan pekerjaan sebagai berikut.
-- Load dependencies. spark-shell --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar -- For local testing, use the absolute path to load the file. scala> :load D:/sparkselect.scala -
Anda juga dapat menempelkan contoh kode langsung setelah memuat dependensi untuk menjalankannya.
-
-
Java
Anda dapat mengimpor contoh kode ke alat pengembangan Anda dan menggunakan Maven untuk membuat paketnya. Misalnya, nama paketnya bisa berupa spark_select.jar. Jalankan pekerjaan dengan kode berikut.
-- Use the absolute path for the job JAR package. spark-submit --class SparkSelect --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar D:\spark_select.jar -
Python
Setelah menjalankan kode berikut, Anda dapat menempelkan contoh kode langsung untuk menjalankannya.
pyspark --jars hologres-connector-spark-3.x-1.5.2-jar-with-dependencies.jar
Deskripsi parameter
Parameter umum
|
Parameter |
Nilai default |
Wajib |
Deskripsi |
|
username |
None |
Ya |
|
|
password |
None |
Ya |
|
|
table |
None |
Ya |
Nama tabel Hologres yang akan dibaca atau ditulis. Catatan
Saat membaca data, Anda juga dapat menggunakan parameter |
|
jdbcurl |
None |
Ya |
URL JDBC untuk API data real-time Hologres, dalam format |
|
enable_serverless_computing |
false |
Tidak |
Menentukan apakah akan menggunakan sumber daya Serverless. Parameter ini hanya berlaku untuk operasi baca dan operasi tulis dalam mode |
|
serverless_computing_query_priority |
3 |
Tidak |
Prioritas eksekusi untuk Serverless Computing. |
|
statement_timeout_seconds |
28800 (8 jam) |
Tidak |
Periode timeout untuk eksekusi kueri, dalam detik. |
|
retry_count |
3 |
Tidak |
Jumlah percobaan ulang saat koneksi gagal. |
|
direct_connect |
Untuk lingkungan yang mendukung koneksi langsung, koneksi langsung digunakan secara default. |
Tidak |
Titik hambat untuk pembacaan dan penulisan data batch sering kali merupakan throughput jaringan Endpoint. Oleh karena itu, sistem menguji apakah lingkungan saat ini dapat terhubung langsung ke Frontend Hologres (node akses). Jika koneksi langsung didukung, koneksi tersebut digunakan secara default. Atur parameter ini ke |
Parameter tulis
Konektor Hologres mendukung parameter Spark SaveMode. Untuk SQL, ini berarti INSERT INTO atau INSERT OVERWRITE. Untuk DataFrame, ini berarti mengatur SaveMode ke Append atau Overwrite saat menulis. Overwrite membuat tabel sementara untuk penulisan dan menggantikan tabel asli setelah penulisan berhasil. Gunakan fitur ini dengan hati-hati.
|
Nama parameter |
Nama parameter sebelumnya |
Nilai default |
Wajib |
Deskripsi |
|
write.mode |
copy_write_mode |
auto |
Tidak |
Mode penulisan. Nilai yang valid adalah sebagai berikut. Untuk perbandingan mode penulisan, lihat Comparison of batch write modes.
|
|
write.copy.max_buffer_size |
max_cell_buffer_size |
52428800 (50 MB) |
Tidak |
Saat menulis dalam mode COPY, biasanya Anda tidak perlu menyesuaikan panjang maksimum buffer lokal. Namun, jika terjadi luapan buffer saat menulis field besar, seperti string yang sangat panjang, Anda dapat meningkatkan nilai ini. |
|
write.copy.dirty_data_check |
copy_write_dirty_data_check |
false |
Tidak |
Menentukan apakah akan melakukan validasi data kotor. Jika Anda mengaktifkan fitur ini, sistem dapat mengidentifikasi baris spesifik yang gagal ditulis saat data kotor ditemukan. Namun, hal ini memengaruhi kinerja penulisan. Jangan aktifkan fitur ini kecuali Anda sedang melakukan troubleshooting. |
|
write.on_conflict_action |
write_mode |
INSERT_OR_REPLACE |
Tidak |
Kebijakan yang digunakan saat memasukkan data ke tabel dengan primary key:
|
Parameter berikut hanya berlaku ketika write.mode diatur ke insert.
|
Nama parameter |
Nama parameter sebelumnya |
Nilai default |
Wajib |
Deskripsi |
|
write.insert.dynamic_partition |
dynamic_partition |
false |
Tidak |
Parameter ini hanya berlaku ketika |
|
write.insert.batch_size |
write_batch_size |
512 |
Tidak |
Ukuran batch maksimum untuk setiap thread penulisan. Batch dikomit saat jumlah operasi Put, setelah digabung oleh WriteMode, mencapai nilai ini. |
|
write.insert.batch_byte_size |
write_batch_byte_size |
2097152 (2 * 1024 * 1024) |
Tidak |
Ukuran batch maksimum untuk setiap thread penulisan, dalam byte. Nilai default adalah 2 MB. Batch dikomit saat ukuran byte data Put, setelah digabung oleh WriteMode, mencapai nilai ini. |
|
write.insert.max_interval_ms |
write_max_interval_ms |
10000 |
Tidak |
Batch dikomit jika waktu sejak komit terakhir melebihi nilai ini. |
|
write.insert.thread_size |
write_thread_size |
1 |
Tidak |
Jumlah thread penulisan konkuren. Setiap thread konkuren menempati satu koneksi database. |
Parameter baca
|
Nama parameter |
Nama parameter sebelumnya (Versi 1.5.0 dan sebelumnya) |
Nilai default |
Wajib |
Deskripsi |
|
read.mode |
bulk_read |
auto |
Tidak |
Mode baca. Nilai yang valid:
|
|
read.max_task_count |
max_partition_count |
80 |
Tidak |
Membagi tabel Hologres yang akan dibaca menjadi beberapa partisi. Setiap partisi berkorespondensi dengan satu tugas Spark. Jika `ShardCount` tabel Hologres kurang dari parameter ini, jumlah partisi paling banyak sebesar `ShardCount`. |
|
read.copy.max_buffer_size |
/ |
52428800 (50 MB) |
Tidak |
Saat membaca dalam mode COPY, ini adalah panjang maksimum buffer lokal. Jika terjadi pengecualian dengan field besar, tingkatkan nilai ini. |
|
read.push_down_predicate |
push_down_predicate |
true |
Tidak |
Menentukan apakah akan mengaktifkan predicate pushdown, seperti menerapkan kondisi filter selama kueri. Fitur ini mendukung pushdown kondisi filter umum dan column pruning. |
|
read.push_down_limit |
push_down_limit |
true |
Tidak |
Menentukan apakah akan mengaktifkan LIMIT pushdown. |
|
read.select.batch_size |
scan_batch_size |
256 |
Tidak |
Parameter ini hanya berlaku ketika |
|
read.select.timeout_seconds |
scan_timeout_seconds |
60 |
Tidak |
Parameter ini hanya berlaku ketika |
|
read.query |
query |
None |
Tidak |
Menggunakan Catatan
|
Pemetaan tipe data
|
Spark Type |
Tipe Hologres |
|
ShortType |
SMALLINT |
|
IntegerType |
INT |
|
LongType |
BIGINT |
|
StringType |
TEXT |
|
StringType |
JSON |
|
StringType |
JSONB |
|
DecimalType |
NUMERIC(38, 18) |
|
BooleanType |
BOOL |
|
DoubleType |
DOUBLE PRECISION |
|
FloatType |
FLOAT |
|
TimestampType |
TIMESTAMPTZ |
|
DateType |
DATE |
|
BinaryType |
BYTEA |
|
BinaryType |
ROARINGBITMAP |
|
ArrayType(IntegerType) |
INT4[] |
|
ArrayType(LongType) |
INT8[] |
|
ArrayType(FloatType) |
FLOAT4[] |
|
ArrayType(DoubleType) |
FLOAT8[] |
|
ArrayType(BooleanType) |
BOOLEAN[] |
|
ArrayType(StringType) |
TEXT[] |
Perhitungan koneksi
Saat Hologres-Connector-Spark membaca atau menulis data, konektor tersebut menggunakan sejumlah koneksi JDBC tertentu. Jumlah ini dapat dipengaruhi oleh faktor-faktor berikut:
-
Konkurensi Spark. Ini adalah jumlah tugas yang berjalan secara konkuren, yang dapat dilihat di Spark UI saat pekerjaan sedang berjalan.
-
Jumlah koneksi yang digunakan konektor untuk setiap tugas konkuren:
-
Untuk penulisan dalam mode COPY, setiap tugas konkuren hanya menggunakan satu koneksi JDBC.
-
Untuk penulisan dalam mode INSERT, setiap tugas konkuren menggunakan
write_thread_sizekoneksi JDBC. -
Untuk pembacaan, setiap tugas konkuren menggunakan satu koneksi JDBC.
-
-
Koneksi yang digunakan oleh aspek lain: Saat suatu pekerjaan dimulai, pekerjaan tersebut melakukan operasi seperti pengambilan skema, yang mungkin secara singkat membuat koneksi.
Oleh karena itu, jumlah total koneksi yang digunakan oleh pekerjaan dapat dihitung menggunakan rumus berikut:
|
Item pekerjaan |
Koneksi yang digunakan |
|
Query metadata dari Catalog |
1 |
|
Baca data |
parallelism × 1 + 1 |
|
Tulis dalam mode COPY |
parallelism × 1 + 1 |
|
Tulis dalam mode INSERT |
parallelism × write_thread_size + 1 |
Perhitungan koneksi di atas mengasumsikan bahwa jumlah tugas yang dapat dijalankan Spark secara konkuren lebih besar daripada jumlah tugas yang dihasilkan oleh pekerjaan.
Jumlah tugas konkuren yang dapat dijalankan Spark mungkin dipengaruhi oleh parameter yang diatur pengguna, seperti spark.executor.instances, dan juga mungkin dipengaruhi oleh kebijakan pemisahan file Hadoop. Untuk informasi selengkapnya, lihat Apache Hadoop.