ApsaraDB for SelectDB kompatibel dengan Apache Doris. Anda dapat menggunakan Spark Doris Connector untuk mengimpor sejumlah besar data dengan memanfaatkan kemampuan komputasi terdistribusi dari Spark. Topik ini menjelaskan cara kerja Spark Doris Connector dan bagaimana menggunakannya untuk mengimpor data ke ApsaraDB for SelectDB.
Ikhtisar
Anda dapat menggunakan Spark Doris Connector sebagai salah satu metode untuk mengimpor sejumlah besar data ke ApsaraDB for SelectDB. Dengan memanfaatkan kemampuan komputasi terdistribusi dari Spark, Anda dapat membaca data dalam jumlah besar dari sumber data hulu seperti MySQL, PostgreSQL, Sistem File Terdistribusi Hadoop (HDFS), dan Amazon Simple Storage Service (Amazon S3) ke dalam DataFrame, lalu mengimpor data tersebut ke tabel di ApsaraDB for SelectDB menggunakan Spark Doris Connector. Selain itu, Anda juga dapat membaca data dari tabel di ApsaraDB for SelectDB menggunakan Java Database Connectivity (JDBC) Spark.
Cara kerjanya
Gambar berikut menunjukkan cara menggunakan Spark Doris Connector untuk mengimpor data ke ApsaraDB for SelectDB. Dalam arsitektur yang ditunjukkan pada gambar, Spark Doris Connector berfungsi sebagai jembatan untuk menulis data eksternal ke ApsaraDB for SelectDB, serta memproses data tersebut menggunakan kluster komputasi terdistribusi. Ini mempercepat aliran data melalui seluruh tautan, sehingga menggantikan metode impor data tradisional berperforma rendah menggunakan JDBC.
Prasyarat
Versi Spark Doris Connector harus 1.3.1 atau lebih baru.
Instal dependensi Spark Doris Connector
Anda dapat menggunakan salah satu metode berikut untuk menginstal dependensi Spark Doris Connector:
Instal dependensi Maven Spark Doris Connector. Contoh kode berikut menunjukkan sebuah contoh. Untuk mendapatkan dependensi versi lainnya, kunjungi Maven Repository.
<dependency> <groupId>org.apache.doris</groupId> <artifactId>spark-doris-connector-3.2_2.12</artifactId> <version>1.3.2</version> </dependency>Unduh paket JAR Spark Doris Connector.
Tabel berikut mencantumkan tiga paket umum untuk Spark Doris Connector. Unduh paket JAR sesuai dengan versi Spark Anda. Untuk mendapatkan dependensi versi lainnya, kunjungi Maven Repository.
CatatanPaket JAR berikut dikompilasi menggunakan Java 8. Jika Anda ingin menggunakan versi Java lainnya, hubungi dukungan teknis untuk ApsaraDB for SelectDB.
Versi paket JAR menunjukkan versi Spark yang didukung, versi Scala, dan versi Spark Doris Connector dari kiri ke kanan.
Versi
Paket JAR
2.4-2.12-1.3.2
3.1-2.12-1.3.2
3.2-2.12-1.3.2
Setelah mengunduh paket JAR, gunakan salah satu metode berikut untuk menjalankan Spark:
Untuk kluster Spark yang berjalan dalam mode kluster lokal, tempatkan paket JAR Spark Doris Connector di direktori jars pada direktori instalasi Spark.
Untuk kluster Spark yang berjalan dalam mode kluster YARN, unggah paket JAR Spark Doris Connector sebagai paket pra-penerapan. Contoh:
Unggah paket spark-doris-connector-3.2_2.12-1.3.2.jar ke HDFS kluster Spark.
hdfs dfs -mkdir /spark-jars/ hdfs dfs -put /<your_local_path>/spark-doris-connector-3.2_2.12-1.3.2.jar/spark-jars/Tambahkan dependensi pada paket spark-doris-connector-3.2_2.12-1.3.2.jar ke kluster.
spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.3.2.jar
Prosedur
Setelah menjalankan Spark pada klien Spark atau mengimpor paket Spark Doris Connector ke lingkungan pengembangan Spark, Anda dapat menyinkronkan data menggunakan Spark SQL atau DataFrame. Contoh berikut menunjukkan cara mengimpor data hulu ke instans ApsaraDB for SelectDB.
Gunakan Spark SQL
val selectdbHttpPort = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080"
val selectdbJdbc = "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030"
val selectdbUser = "admin"
val selectdbPwd = "****"
val selectdbTable = "test_db.test_order"
CREATE TEMPORARY VIEW test_order
USING doris
OPTIONS(
"table.identifier"="${selectdbTable}",
"fenodes"="${selectdbHttpPort}",
"user"="${selectdbUser}",
"password"="${selectdbPwd}",
"sink.properties.format"="json"
);
INSERT INTO test_order SELECT order_id,order_amount,order_status FROM tmp_tb;Parameter
Parameter | Nilai default | Diperlukan | Deskripsi |
fenodes | Tidak ada | Ya | Titik akhir yang digunakan untuk terhubung ke instans ApsaraDB for SelectDB melalui HTTP. Untuk mendapatkan titik akhir Virtual Private Cloud (VPC) atau titik akhir publik dan port HTTP dari instans ApsaraDB for SelectDB, lakukan operasi berikut: Masuk ke ApsaraDB for SelectDB dan buka halaman Instance Details dari instans yang ingin Anda lihat informasinya. Di bagian Network Information tab Informasi Dasar, lihat nilai parameter VPC Endpoint atau Public Endpoint dan parameter HTTP Port. Contoh: |
table.identifier | Tidak ada | Ya | Nama tabel di instans ApsaraDB for SelectDB. Format: |
request.retries | 3 | Tidak | Jumlah maksimum percobaan ulang yang diizinkan untuk mengirim permintaan ke instans ApsaraDB for SelectDB. |
request.connect.timeout.ms | 30000 | Tidak | Periode waktu habis koneksi untuk permintaan ke instans ApsaraDB for SelectDB. |
request.read.timeout.ms | 30000 | Tidak | Periode waktu habis baca untuk permintaan ke instans ApsaraDB for SelectDB. |
request.query.timeout.s | 3600 | Tidak | Periode waktu habis kueri untuk instans ApsaraDB for SelectDB. Nilai default menunjukkan satu jam. Nilai -1 menunjukkan periode waktu habis tanpa batas. |
request.tablet.size | Integer.MAX_VALUE | Tidak | Jumlah tablet ApsaraDB for SelectDB yang sesuai dengan partisi RDD. Semakin kecil nilainya, semakin banyak partisi yang dihasilkan. Ini meningkatkan paralelisme di sisi Spark, tetapi juga memberi lebih banyak tekanan pada ApsaraDB for SelectDB. |
read.field | Tidak ada | Tidak | Kolom di tabel ApsaraDB for SelectDB yang ingin Anda baca datanya. Pisahkan beberapa kolom dengan koma (,). |
batch.size | 1024 | Tidak | Jumlah maksimum baris yang dapat dibaca dari backend sekaligus. Jika lebih banyak baris dibaca dari backend sekaligus, jumlah koneksi yang dibuat antara Spark dan ApsaraDB for SelectDB berkurang. Ini mengurangi overhead waktu tambahan yang disebabkan oleh latensi jaringan. |
exec.mem.limit | 2147483648 | Tidak | Ambang batas memori untuk satu kueri. Nilai default menunjukkan 2 GB. Unit: byte. |
deserialize.arrow.async | false | Tidak | Menentukan apakah akan secara asinkron mendeserialisasi data Arrow ke RowBatch yang diperlukan untuk iterasi Spark Doris Connector. |
deserialize.queue.size | 64 | Tidak | Ukuran antrian pemrosesan internal untuk mendeserialisasi data Arrow secara asinkron. Parameter ini hanya berlaku jika parameter |
write.fields | Tidak ada | Tidak | Bidang yang ingin Anda tulis ke tabel ApsaraDB for SelectDB atau urutan penulisan bidang. Pisahkan beberapa bidang dengan koma (,). Secara default, semua bidang ditulis ke tabel ApsaraDB for SelectDB berdasarkan urutan bidang di tabel ApsaraDB for SelectDB. |
sink.batch.size | 100000 | Tidak | Jumlah maksimum baris yang dapat ditulis ke backend sekaligus. |
sink.max-retries | 0 | Tidak | Jumlah maksimum percobaan ulang setelah data gagal ditulis ke backend. |
sink.properties.format | csv | Tidak | Format data yang didukung oleh Stream Load. Nilai valid: csv, json, dan arrow. |
sink.properties.* | -- | Tidak | Parameter yang digunakan untuk mengirimkan pekerjaan Stream Load. Misalnya, Anda dapat menentukan |
sink.task.partition.size | Tidak ada | Tidak | Jumlah partisi tempat data ditulis di instans ApsaraDB for SelectDB. Jika Anda melakukan operasi seperti penyaringan pada RDD di Spark, jumlah partisi tempat data ditulis mungkin bertambah, tetapi setiap partisi hanya berisi sejumlah kecil catatan. Ini meningkatkan frekuensi operasi tulis dan membuang-buang sumber daya komputasi. Nilai yang lebih kecil menunjukkan frekuensi yang lebih rendah untuk menulis data ke instans ApsaraDB for SelectDB. Ini mengurangi tekanan penggabungan data untuk instans ApsaraDB for SelectDB. Anda harus menggunakan parameter ini dengan parameter |
sink.task.use.repartition | false | Tidak | Menentukan apakah akan mempartisi ulang data ke sejumlah partisi tertentu sebelum data ditulis ke instans ApsaraDB for SelectDB. Nilai default: false, yang menunjukkan bahwa coalesce digunakan. Jika tidak ada tindakan yang dipanggil sebelum operasi tulis, paralelisme komputasi mungkin kurang optimal. Jika parameter ini diatur ke true, repartitioning diaktifkan. Anda dapat menentukan jumlah partisi tempat data dipartisi ulang. Ini meningkatkan overhead shuffle. |
sink.batch.interval.ms | 50 | Tidak | Interval penulisan data ke sink. Unit: milidetik. |
sink.enable-2pc | false | Tidak | Menentukan apakah protokol komit dua fase (2PC) digunakan untuk menulis data. Jika Anda mengatur parameter ini ke true, transaksi akan dikomit setelah pekerjaan Spark selesai. Jika beberapa tugas gagal, semua transaksi yang telah dikomit sebelumnya akan dibatalkan. |
sink.auto-redirect | true | Tidak | Menentukan apakah permintaan Stream Load dialihkan. Jika Anda mengatur parameter ini ke true, permintaan Stream Load dimuat menggunakan frontend. Dengan cara ini, Anda tidak perlu secara eksplisit mendapatkan informasi backend. |
user | Tidak ada | Ya | Nama pengguna yang digunakan untuk terhubung ke instans ApsaraDB for SelectDB. |
password | Tidak ada | Ya | Kata sandi yang digunakan untuk terhubung ke instans ApsaraDB for SelectDB. |
filter.query.in.max.count | 100 | Tidak | Jumlah maksimum nilai yang dapat disertakan dalam klausa IN saat Spark melakukan penurunan predikat. Jika jumlah nilai dalam klausa IN melebihi ambang batas yang ditentukan, operasi filter ditangani oleh Spark. |
ignore-type | Tidak ada | Tidak | Tipe bidang yang ingin Anda abaikan saat membaca skema untuk tampilan sementara. Contoh: |
Gunakan DataFrame
val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
("1", 100, "Pending Payment"),
("2", 200, null),
("3", 300, "Received")
)).toDF("order_id", "order_amount", "order_status")
df.write
.format("doris")
.option("fenodes", selectdbHttpPort)
.option("table.identifier", selectdbTable)
.option("user", selectdbUser)
.option("password", selectdbPwd)
.option("sink.batch.size", 100000)
.option("sink.max-retries", 3)
.option("sink.properties.file.column_separator", "\t")
.option("sink.properties.file.line_delimiter", "\n")
.save()Parameter
Parameter | Nilai default | Diperlukan | Deskripsi |
fenodes | Tidak ada | Ya | Titik akhir yang digunakan untuk terhubung ke instans ApsaraDB for SelectDB melalui HTTP. Untuk mendapatkan titik akhir VPC atau titik akhir publik dan port HTTP dari instans ApsaraDB for SelectDB, lakukan operasi berikut: Masuk ke ApsaraDB for SelectDB dan buka halaman Instance Details dari instans yang ingin Anda lihat informasinya. Di bagian Network Information tab Informasi Dasar, lihat nilai parameter VPC Endpoint atau Public Endpoint dan parameter HTTP Port. Contoh: |
table.identifier | Tidak ada | Ya | Nama tabel di instans ApsaraDB for SelectDB. Format: |
request.retries | 3 | Tidak | Jumlah maksimum percobaan ulang yang diizinkan untuk mengirim permintaan ke instans ApsaraDB for SelectDB. |
request.connect.timeout.ms | 30000 | Tidak | Periode waktu habis koneksi untuk permintaan ke instans ApsaraDB for SelectDB. |
request.read.timeout.ms | 30000 | Tidak | Periode waktu habis baca untuk permintaan ke instans ApsaraDB for SelectDB. |
request.query.timeout.s | 3600 | Tidak | Periode waktu habis kueri untuk instans ApsaraDB for SelectDB. Nilai default menunjukkan satu jam. Nilai -1 menunjukkan periode waktu habis tanpa batas. |
request.tablet.size | Integer.MAX_VALUE | Tidak | Jumlah tablet ApsaraDB for SelectDB yang sesuai dengan partisi RDD. Semakin kecil nilainya, semakin banyak partisi yang dihasilkan. Ini meningkatkan paralelisme di sisi Spark, tetapi juga memberi lebih banyak tekanan pada ApsaraDB for SelectDB. |
read.field | Tidak ada | Tidak | Kolom di tabel ApsaraDB for SelectDB yang ingin Anda baca datanya. Pisahkan beberapa kolom dengan koma (,). |
batch.size | 1024 | Tidak | Jumlah maksimum baris yang dapat dibaca dari backend sekaligus. Jika lebih banyak baris dibaca dari backend sekaligus, jumlah koneksi yang dibuat antara Spark dan ApsaraDB for SelectDB berkurang. Ini mengurangi overhead waktu tambahan yang disebabkan oleh latensi jaringan. |
exec.mem.limit | 2147483648 | Tidak | Ambang batas memori untuk satu kueri. Nilai default menunjukkan 2 GB. Unit: byte. |
deserialize.arrow.async | false | Tidak | Menentukan apakah akan secara asinkron mendeserialisasi data Arrow ke RowBatch yang diperlukan untuk iterasi Spark Doris Connector. |
deserialize.queue.size | 64 | Tidak | Ukuran antrian pemrosesan internal untuk mendeserialisasi data Arrow secara asinkron. Parameter ini hanya berlaku jika parameter |
write.fields | Tidak ada | Tidak | Bidang yang ingin Anda tulis ke tabel ApsaraDB for SelectDB atau urutan penulisan bidang. Pisahkan beberapa bidang dengan koma (,). Secara default, semua bidang ditulis ke tabel ApsaraDB for SelectDB berdasarkan urutan bidang di tabel ApsaraDB for SelectDB. |
sink.batch.size | 100000 | Tidak | Jumlah maksimum baris yang dapat ditulis ke backend sekaligus. |
sink.max-retries | 0 | Tidak | Jumlah maksimum percobaan ulang setelah data gagal ditulis ke backend. |
sink.properties.format | csv | Tidak | Format data yang didukung oleh Stream Load. Nilai valid: csv, json, dan arrow. |
sink.properties.* | -- | Tidak | Parameter yang digunakan untuk mengirimkan pekerjaan Stream Load. Misalnya, Anda dapat menentukan |
sink.task.partition.size | Tidak ada | Tidak | Jumlah partisi tempat data ditulis di instans ApsaraDB for SelectDB. Jika Anda melakukan operasi seperti penyaringan pada RDD di Spark, jumlah partisi tempat data ditulis mungkin bertambah, tetapi setiap partisi hanya berisi sejumlah kecil catatan. Ini meningkatkan frekuensi operasi tulis dan membuang-buang sumber daya komputasi. Nilai yang lebih kecil menunjukkan frekuensi yang lebih rendah untuk menulis data ke instans ApsaraDB for SelectDB. Ini mengurangi tekanan penggabungan data untuk instans ApsaraDB for SelectDB. Anda harus menggunakan parameter ini dengan parameter |
sink.task.use.repartition | false | Tidak | Menentukan apakah akan mempartisi ulang data ke sejumlah partisi tertentu sebelum data ditulis ke instans ApsaraDB for SelectDB. Nilai default: false, yang menunjukkan bahwa coalesce digunakan. Jika tidak ada tindakan yang dipanggil sebelum operasi tulis, paralelisme komputasi mungkin kurang optimal. Jika parameter ini diatur ke true, repartitioning diaktifkan. Anda dapat menentukan jumlah partisi tempat data dipartisi ulang. Ini meningkatkan overhead shuffle. |
sink.batch.interval.ms | 50 | Tidak | Interval penulisan data ke sink. Unit: milidetik. |
sink.enable-2pc | false | Tidak | Menentukan apakah protokol 2PC digunakan untuk menulis data. Jika Anda mengatur parameter ini ke true, transaksi akan dikomit setelah pekerjaan Spark selesai. Jika beberapa tugas gagal, semua transaksi yang telah dikomit sebelumnya akan dibatalkan. |
sink.auto-redirect | true | Tidak | Menentukan apakah permintaan Stream Load dialihkan. Jika Anda mengatur parameter ini ke true, permintaan Stream Load dimuat menggunakan frontend. Dengan cara ini, Anda tidak perlu secara eksplisit mendapatkan informasi backend. |
user | Tidak ada | Ya | Nama pengguna yang digunakan untuk terhubung ke instans ApsaraDB for SelectDB. |
password | Tidak ada | Ya | Kata sandi yang digunakan untuk terhubung ke instans ApsaraDB for SelectDB. |
filter.query.in.max.count | 100 | Tidak | Jumlah maksimum nilai yang dapat disertakan dalam klausa IN saat Spark melakukan penurunan predikat. Jika jumlah nilai dalam klausa IN melebihi ambang batas yang ditentukan, operasi filter ditangani oleh Spark. |
ignore-type | Tidak ada | Tidak | Tipe bidang yang ingin Anda abaikan saat membaca skema untuk tampilan sementara. Contoh: |
sink.streaming.passthrough | false | Tidak | Menulis nilai di kolom pertama tanpa pemrosesan. |
Contoh
Tabel berikut mencantumkan versi semua perangkat lunak dalam lingkungan contoh.
Perangkat Lunak | Java | Spark | Scala | SelectDB |
Versi | 1.8 | 3.1.2 | 2.12 | 3.0.4 |
Siapkan lingkungan
Konfigurasikan lingkungan Spark.
Unduh dan dekompres paket instalasi Spark. Dalam contoh ini, paket instalasi Spark spark-3.1.2-bin-hadoop3.2.tgz digunakan.
wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz tar xvzf spark-3.1.2-bin-hadoop3.2.tgzLetakkan file spark-doris-connector-3.2_2.12-1.3.2.jar di direktori SPARK_HOME/jars.
Buat data yang akan diimpor. Dalam contoh ini, sejumlah kecil data diimpor dari database MySQL.
Buat tabel uji di database MySQL.
CREATE TABLE `employees` ( `emp_no` int NOT NULL, `birth_date` date NOT NULL, `first_name` varchar(14) NOT NULL, `last_name` varchar(16) NOT NULL, `gender` enum('M','F') NOT NULL, `hire_date` date NOT NULL, PRIMARY KEY (`emp_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3Gunakan Data Management (DMS) untuk menghasilkan data uji. Untuk informasi lebih lanjut, lihat Hasilkan Data Uji.
Konfigurasikan instans ApsaraDB for SelectDB.
Buat instans ApsaraDB for SelectDB. Untuk informasi lebih lanjut, lihat Buat Instans.
Sambungkan ke instans ApsaraDB for SelectDB melalui protokol MySQL. Untuk informasi lebih lanjut, lihat Sambungkan ke Instans ApsaraDB for SelectDB.
Buat database uji dan tabel uji.
Buat database uji.
CREATE DATABASE test_db;Buat tabel uji.
USE test_db; CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 32;
Ajukan titik akhir publik untuk instans ApsaraDB for SelectDB. Untuk informasi lebih lanjut, lihat Ajukan atau Lepaskan Titik Akhir Publik.
Tambahkan alamat IP publik lingkungan Spark ke daftar putih alamat IP instans ApsaraDB for SelectDB. Untuk informasi lebih lanjut, lihat Konfigurasikan Daftar Putih Alamat IP.
Impor data dari database MySQL ke instans ApsaraDB for SelectDB
Gunakan Spark SQL
Contoh berikut menunjukkan cara menggunakan Spark SQL untuk mengimpor data dari database MySQL hulu ke instans ApsaraDB for SelectDB.
Mulai layanan spark-sql.
bin/spark-sqlKirim tugas di spark-sql.
CREATE TEMPORARY VIEW mysql_tbl USING jdbc OPTIONS( "url"="jdbc:mysql://host:port/test_db", "dbtable"="employees", "driver"="com.mysql.jdbc.Driver", "user"="admin", "password"="****" ); CREATE TEMPORARY VIEW selectdb_tbl USING doris OPTIONS( "table.identifier"="test_db.employees", "fenodes"="selectdb-cn-****-public.selectdbfe.rds.aliyuncs.com:8080", "user"="admin", "password"="****", "sink.properties.format"="json" ); INSERT INTO selectdb_tbl SELECT emp_no, birth_date, first_name, last_name, gender, hire_date FROM mysql_tbl;Setelah tugas Spark selesai, masuk ke ApsaraDB for SelectDB untuk melihat data yang diimpor menggunakan Spark.
Gunakan DataFrame
Contoh berikut menunjukkan cara menggunakan DataFrame untuk mengimpor data dari database MySQL hulu ke instans ApsaraDB for SelectDB.
Mulai layanan spark-shell.
bin/spark-shellKirim tugas di spark-shell.
val mysqlDF = spark.read.format("jdbc") .option("url", "jdbc:mysql://host:port/test_db") .option("dbtable", "employees") .option("driver", "com.mysql.jdbc.Driver") .option("user", "admin") .option("password", "****") .load() mysqlDF.write.format("doris") .option("fenodes", "host:httpPort") .option("table.identifier", "test_db.employees") .option("user", "admin") .option("password", "****") .option("sink.batch.size", 100000) .option("sink.max-retries", 3) .option("sink.properties.format", "json") .save()Setelah tugas Spark selesai, masuk ke ApsaraDB for SelectDB untuk melihat data yang diimpor menggunakan Spark.