Spark Doris Connector memungkinkan Anda memuat volume data besar ke ApsaraDB for SelectDB menggunakan kluster komputasi terdistribusi Spark. Connector ini membaca data dari sumber hulu—MySQL, PostgreSQL, Hadoop Distributed File System (HDFS), Amazon Simple Storage Service (Amazon S3), dan lainnya—ke dalam DataFrame, lalu menuliskannya ke SelectDB menggunakan Stream Load. Anda juga dapat membaca data dari tabel SelectDB melalui Spark Java Database Connectivity (JDBC).
Cara kerja
Spark Doris Connector menjembatani sumber data eksternal Anda dan ApsaraDB for SelectDB. Spark melakukan pra-pemrosesan data di kluster komputasi terdistribusi, lalu connector tersebut menuliskannya ke SelectDB melalui Stream Load. Pendekatan ini menggantikan penulisan berbasis JDBC pada satu node dengan pipeline paralel yang berskala sesuai dengan kluster Spark Anda.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
Spark Doris Connector versi 1.3.1 atau yang lebih baru telah diinstal
Instans ApsaraDB for SelectDB. Lihat Buat instans
Koneksi ke instans SelectDB. Lihat Hubungkan ke instans ApsaraDB for SelectDB
Alamat IP publik lingkungan Spark Anda telah ditambahkan ke daftar putih alamat IP SelectDB. Lihat Konfigurasi daftar putih alamat IP
Instal Spark Doris Connector
Pilih salah satu metode instalasi berikut.
Opsi 1: Tambahkan dependensi Maven
Tambahkan dependensi berikut ke file pom.xml Anda. Untuk versi lainnya, lihat Maven Repository.
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-3.2_2.12</artifactId>
<version>1.3.2</version>
</dependency>Opsi 2: Unduh paket JAR
Unduh paket JAR yang sesuai dengan versi Spark dan Scala Anda. Untuk versi lainnya, lihat Maven Repository.
Paket JAR berikut dikompilasi dengan Java 8. Hubungi dukungan teknis jika Anda memerlukan versi Java yang berbeda. String versi menyertakan, dari kiri ke kanan: versi Spark, versi Scala, dan versi Spark Doris Connector.
| Versi | Paket JAR |
|---|---|
| 2.4-2.12-1.3.2 | spark-doris-connector-2.4_2.12-1.3.2 |
| 3.1-2.12-1.3.2 | spark-doris-connector-3.1_2.12-1.3.2 |
| 3.2-2.12-1.3.2 | spark-doris-connector-3.2_2.12-1.3.2 |
Setelah mengunduh, deploy JAR berdasarkan mode kluster Spark Anda:
Mode kluster lokal: Tempatkan file JAR di direktori
jarsinstalasi Spark Anda.Mode kluster YARN: Unggah file JAR sebagai paket penyebaran awal.
Unggah file JAR ke HDFS: ``
bash hdfs dfs -mkdir /spark-jars/ hdfs dfs -put /<your_local_path>/spark-doris-connector-3.2_2.12-1.3.2.jar /spark-jars/``Tambahkan dependensi ke konfigurasi kluster Anda: ``
spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.3.2.jar``
Impor data
Spark Doris Connector mendukung dua API penulisan: Spark SQL dan DataFrame. Keduanya menggunakan format sumber data doris dan berbagi parameter konfigurasi yang sama.
SparkSQL
Jalankan shell Spark SQL dan kirimkan tugas menggunakan tampilan temporary:
bin/spark-sqlval selectdbHttpPort = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080"
val selectdbJdbc = "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030"
val selectdbUser = "admin"
val selectdbPwd = "****"
val selectdbTable = "test_db.test_order"
CREATE TEMPORARY VIEW test_order
USING doris
OPTIONS(
"table.identifier"="${selectdbTable}",
"fenodes"="${selectdbHttpPort}",
"user"="${selectdbUser}",
"password"="${selectdbPwd}",
"sink.properties.format"="json"
);
INSERT INTO test_order SELECT order_id, order_amount, order_status FROM tmp_tb;DataFrame
Jalankan shell Spark dan kirimkan tugas penulisan:
bin/spark-shellval spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
("1", 100, "Pending Payment"),
("2", 200, null),
("3", 300, "Received")
)).toDF("order_id", "order_amount", "order_status")
df.write
.format("doris")
.option("fenodes", selectdbHttpPort)
.option("table.identifier", selectdbTable)
.option("user", selectdbUser)
.option("password", selectdbPwd)
.option("sink.batch.size", 100000)
.option("sink.max-retries", 3)
.option("sink.properties.file.column_separator", "\t")
.option("sink.properties.file.line_delimiter", "\n")
.save()Parameter
Semua parameter berlaku untuk Spark SQL dan DataFrame kecuali jika dinyatakan sebaliknya.
Parameter koneksi
| Parameter | Default | Wajib | Deskripsi |
|---|---|---|---|
fenodes | None | Ya | Titik akhir HTTP instans SelectDB. Format: <host>:<http-port>. Untuk menemukan titik akhir Anda, buka halaman Instance Details di Konsol ApsaraDB for SelectDB. Di bawah Informasi Dasar > Informasi Jaringan, salin VPC Endpoint atau Public Endpoint serta HTTP Port. Contoh: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080. |
table.identifier | None | Ya | Tabel target dalam format <database>.<table>. Contoh: test_db.test_table. |
user | None | Ya | Username untuk menghubungkan ke instans SelectDB. |
password | None | Ya | Password untuk menghubungkan ke instans SelectDB. |
Parameter baca
| Parameter | Default | Wajib | Deskripsi |
|---|---|---|---|
request.retries | 3 | Tidak | Jumlah maksimum percobaan ulang permintaan ke instans SelectDB. |
request.connect.timeout.ms | 30000 | Tidak | Timeout koneksi untuk permintaan ke instans SelectDB, dalam milidetik. |
request.read.timeout.ms | 30000 | Tidak | Timeout baca untuk permintaan ke instans SelectDB, dalam milidetik. |
request.query.timeout.s | 3600 | Tidak | Timeout kueri untuk instans SelectDB, dalam detik. Nilai default adalah 1 jam. Atur ke -1 untuk tanpa timeout. |
request.tablet.size | Integer.MAX_VALUE | Tidak | Jumlah tablet SelectDB yang dipetakan ke setiap partisi Resilient Distributed Dataset (RDD). Nilai yang lebih kecil menghasilkan lebih banyak partisi, meningkatkan paralelisme di sisi Spark dengan biaya tekanan tambahan pada SelectDB. |
read.field | None | Tidak | Kolom yang akan dibaca dari tabel SelectDB. Pisahkan beberapa kolom dengan koma. |
batch.size | 1024 | Tidak | Jumlah maksimum baris yang dibaca dari node backend per permintaan. Nilai yang lebih besar mengurangi jumlah koneksi dan overhead latensi jaringan. |
exec.mem.limit | 2147483648 | Tidak | Batas memori untuk satu kueri, dalam byte. Default adalah 2 GB. |
deserialize.arrow.async | false | Tidak | Apakah data Arrow dideserialisasi ke RowBatch secara asinkron. |
deserialize.queue.size | 64 | Tidak | Ukuran antrian internal untuk deserialisasi Arrow asinkron. Hanya berlaku ketika deserialize.arrow.async bernilai true. |
filter.query.in.max.count | 100 | Tidak | Jumlah maksimum nilai dalam klausa IN untuk penurunan predikat. Jika jumlah nilai melebihi ambang batas ini, Spark menangani filter secara lokal. |
ignore-type | None | Tidak | Jenis bidang yang diabaikan saat membaca skema untuk tampilan temporary. Contoh: bitmap,hll. |
Parameter tulis
| Parameter | Default | Wajib | Deskripsi |
|---|---|---|---|
write.fields | None | Tidak | Bidang yang akan ditulis ke tabel SelectDB, atau urutan penulisan bidang. Pisahkan beberapa bidang dengan koma. Default-nya adalah semua bidang sesuai urutan kolom tabel. |
sink.batch.size | 100000 | Tidak | Jumlah maksimum baris yang ditulis ke backend per batch. |
sink.max-retries | 0 | Tidak | Jumlah maksimum percobaan ulang setelah kegagalan penulisan. |
sink.properties.format | csv | Tidak | Format data untuk Stream Load. Nilai yang valid: csv, json, arrow. |
sink.properties.* | -- | Tidak | Parameter Stream Load, diteruskan langsung ke pekerjaan Stream Load yang mendasarinya. Misalnya, atur sink.properties.column_separator untuk menentukan pemisah kolom. Untuk semua parameter yang tersedia, lihat Impor data menggunakan Stream Load. |
sink.task.partition.size | None | Tidak | Jumlah partisi untuk menulis ke SelectDB. Gunakan ini untuk mengurangi frekuensi penulisan ketika penyaringan RDD menghasilkan banyak partisi kecil. Gunakan bersama dengan sink.task.use.repartition. |
sink.task.use.repartition | false | Tidak | Apakah akan merepartisi data sebelum menulis. false menggunakan coalesce (tanpa shuffle). true menggunakan repartition, yang menghasilkan partisi lebih merata tetapi menambahkan overhead shuffle. |
sink.batch.interval.ms | 50 | Tidak | Interval antar batch penulisan, dalam milidetik. |
sink.enable-2pc | false | Tidak | Apakah akan menggunakan commit dua fase (2PC). Saat diaktifkan, transaksi hanya dikomit setelah seluruh pekerjaan Spark selesai. Jika ada tugas yang gagal, semua transaksi yang telah diprekomit akan di-rollback. |
sink.auto-redirect | true | Tidak | Apakah akan mengarahkan ulang permintaan Stream Load melalui node frontend. Saat diaktifkan, Anda tidak perlu menentukan alamat node backend secara eksplisit. |
sink.streaming.passthrough | false | Tidak | (Hanya DataFrame) Menulis nilai kolom pertama tanpa transformasi. |
Rekomendasi produksi
Pertimbangkan pengaturan berikut untuk beban kerja produksi:
Pengiriman exactly-once: Aktifkan
sink.enable-2pc=trueuntuk memastikan semua data dikomit hanya setelah pekerjaan Spark berhasil. Kegagalan tugas apa pun akan meng-rollback semua transaksi yang telah diprekomit, sehingga mencegah penulisan parsial.Kontrol partisi: Jika penyaringan RDD menghasilkan banyak partisi kecil, atur
sink.task.partition.sizebersama dengansink.task.use.repartition=trueuntuk menggabungkan partisi dan mengurangi frekuensi penulisan.Percobaan ulang saat gagal: Atur
sink.max-retrieske bilangan bulat positif (misalnya,3) untuk secara otomatis mencoba ulang kegagalan penulisan sementara.
Contoh end-to-end
Contoh ini mengimpor data dari database MySQL ke ApsaraDB for SelectDB menggunakan Spark SQL dan DataFrame.
Lingkungan contoh:
| Perangkat Lunak | Java | Spark | Scala | SelectDB |
|---|---|---|---|---|
| Versi | 1.8 | 3.1.2 | 2.12 | 3.0.4 |
Siapkan lingkungan
1. Konfigurasi Spark.
Unduh dan ekstrak paket instalasi Spark:
wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
tar xvzf spark-3.1.2-bin-hadoop3.2.tgzTempatkan spark-doris-connector-3.2_2.12-1.3.2.jar di direktori SPARK_HOME/jars.
2. Buat tabel sumber di MySQL.
CREATE TABLE `employees` (
`emp_no` int NOT NULL,
`birth_date` date NOT NULL,
`first_name` varchar(14) NOT NULL,
`last_name` varchar(16) NOT NULL,
`gender` enum('M','F') NOT NULL,
`hire_date` date NOT NULL,
PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;Gunakan Data Management (DMS) untuk menghasilkan data uji.
3. Buat tabel target di SelectDB.
Hubungkan ke instans SelectDB Anda melalui protokol MySQL, lalu jalankan:
CREATE DATABASE test_db;
USE test_db;
CREATE TABLE employees (
emp_no int NOT NULL,
birth_date date,
first_name varchar(20),
last_name varchar(20),
gender char(2),
hire_date date
)
UNIQUE KEY(`emp_no`)
DISTRIBUTED BY HASH(`emp_no`) BUCKETS 32;4. Ajukan permohonan titik akhir publik.
Ajukan permohonan titik akhir publik untuk instans SelectDB Anda. Lihat Ajukan atau rilis titik akhir publik.
5. Tambahkan IP host Spark Anda ke daftar putih.
Tambahkan alamat IP publik lingkungan Spark Anda ke daftar putih alamat IP SelectDB. Lihat Konfigurasi daftar putih alamat IP.
Impor menggunakan Spark SQL
Jalankan shell Spark SQL:
bin/spark-sqlKirimkan tugas impor:
CREATE TEMPORARY VIEW mysql_tbl USING jdbc OPTIONS( "url"="jdbc:mysql://host:port/test_db", "dbtable"="employees", "driver"="com.mysql.jdbc.Driver", "user"="admin", "password"="****" ); CREATE TEMPORARY VIEW selectdb_tbl USING doris OPTIONS( "table.identifier"="test_db.employees", "fenodes"="selectdb-cn-****-public.selectdbfe.rds.aliyuncs.com:8080", "user"="admin", "password"="****", "sink.properties.format"="json" ); INSERT INTO selectdb_tbl SELECT emp_no, birth_date, first_name, last_name, gender, hire_date FROM mysql_tbl;Setelah pekerjaan selesai, login ke Konsol ApsaraDB for SelectDB untuk memverifikasi data yang diimpor.
Impor menggunakan DataFrame
Jalankan shell Spark:
bin/spark-shellKirimkan tugas impor:
val mysqlDF = spark.read.format("jdbc") .option("url", "jdbc:mysql://host:port/test_db") .option("dbtable", "employees") .option("driver", "com.mysql.jdbc.Driver") .option("user", "admin") .option("password", "****") .load() mysqlDF.write.format("doris") .option("fenodes", "host:httpPort") .option("table.identifier", "test_db.employees") .option("user", "admin") .option("password", "****") .option("sink.batch.size", 100000) .option("sink.max-retries", 3) .option("sink.properties.format", "json") .save()Setelah pekerjaan selesai, login ke Konsol ApsaraDB for SelectDB untuk memverifikasi data yang diimpor.