All Products
Search
Document Center

ApsaraDB for SelectDB:Impor\ data\ menggunakan\ Spark

Last Updated:Mar 29, 2026

Spark Doris Connector memungkinkan Anda memuat volume data besar ke ApsaraDB for SelectDB menggunakan kluster komputasi terdistribusi Spark. Connector ini membaca data dari sumber hulu—MySQL, PostgreSQL, Hadoop Distributed File System (HDFS), Amazon Simple Storage Service (Amazon S3), dan lainnya—ke dalam DataFrame, lalu menuliskannya ke SelectDB menggunakan Stream Load. Anda juga dapat membaca data dari tabel SelectDB melalui Spark Java Database Connectivity (JDBC).

Cara kerja

Spark Doris Connector menjembatani sumber data eksternal Anda dan ApsaraDB for SelectDB. Spark melakukan pra-pemrosesan data di kluster komputasi terdistribusi, lalu connector tersebut menuliskannya ke SelectDB melalui Stream Load. Pendekatan ini menggantikan penulisan berbasis JDBC pada satu node dengan pipeline paralel yang berskala sesuai dengan kluster Spark Anda.

image

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

Instal Spark Doris Connector

Pilih salah satu metode instalasi berikut.

Opsi 1: Tambahkan dependensi Maven

Tambahkan dependensi berikut ke file pom.xml Anda. Untuk versi lainnya, lihat Maven Repository.

<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>spark-doris-connector-3.2_2.12</artifactId>
    <version>1.3.2</version>
</dependency>

Opsi 2: Unduh paket JAR

Unduh paket JAR yang sesuai dengan versi Spark dan Scala Anda. Untuk versi lainnya, lihat Maven Repository.

Paket JAR berikut dikompilasi dengan Java 8. Hubungi dukungan teknis jika Anda memerlukan versi Java yang berbeda. String versi menyertakan, dari kiri ke kanan: versi Spark, versi Scala, dan versi Spark Doris Connector.
VersiPaket JAR
2.4-2.12-1.3.2spark-doris-connector-2.4_2.12-1.3.2
3.1-2.12-1.3.2spark-doris-connector-3.1_2.12-1.3.2
3.2-2.12-1.3.2spark-doris-connector-3.2_2.12-1.3.2

Setelah mengunduh, deploy JAR berdasarkan mode kluster Spark Anda:

  • Mode kluster lokal: Tempatkan file JAR di direktori jars instalasi Spark Anda.

  • Mode kluster YARN: Unggah file JAR sebagai paket penyebaran awal.

    1. Unggah file JAR ke HDFS: ``bash hdfs dfs -mkdir /spark-jars/ hdfs dfs -put /<your_local_path>/spark-doris-connector-3.2_2.12-1.3.2.jar /spark-jars/ ``

    2. Tambahkan dependensi ke konfigurasi kluster Anda: `` spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.3.2.jar ``

Impor data

Spark Doris Connector mendukung dua API penulisan: Spark SQL dan DataFrame. Keduanya menggunakan format sumber data doris dan berbagi parameter konfigurasi yang sama.

SparkSQL

Jalankan shell Spark SQL dan kirimkan tugas menggunakan tampilan temporary:

bin/spark-sql
val selectdbHttpPort = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080"
val selectdbJdbc = "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030"
val selectdbUser = "admin"
val selectdbPwd = "****"
val selectdbTable = "test_db.test_order"

CREATE TEMPORARY VIEW test_order
USING doris
OPTIONS(
 "table.identifier"="${selectdbTable}",
 "fenodes"="${selectdbHttpPort}",
 "user"="${selectdbUser}",
 "password"="${selectdbPwd}",
 "sink.properties.format"="json"
);

INSERT INTO test_order SELECT order_id, order_amount, order_status FROM tmp_tb;

DataFrame

Jalankan shell Spark dan kirimkan tugas penulisan:

bin/spark-shell
val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
  ("1", 100, "Pending Payment"),
  ("2", 200, null),
  ("3", 300, "Received")
)).toDF("order_id", "order_amount", "order_status")

df.write
  .format("doris")
  .option("fenodes", selectdbHttpPort)
  .option("table.identifier", selectdbTable)
  .option("user", selectdbUser)
  .option("password", selectdbPwd)
  .option("sink.batch.size", 100000)
  .option("sink.max-retries", 3)
  .option("sink.properties.file.column_separator", "\t")
  .option("sink.properties.file.line_delimiter", "\n")
  .save()

Parameter

Semua parameter berlaku untuk Spark SQL dan DataFrame kecuali jika dinyatakan sebaliknya.

Parameter koneksi

ParameterDefaultWajibDeskripsi
fenodesNoneYaTitik akhir HTTP instans SelectDB. Format: <host>:<http-port>. Untuk menemukan titik akhir Anda, buka halaman Instance Details di Konsol ApsaraDB for SelectDB. Di bawah Informasi Dasar > Informasi Jaringan, salin VPC Endpoint atau Public Endpoint serta HTTP Port. Contoh: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080.
table.identifierNoneYaTabel target dalam format <database>.<table>. Contoh: test_db.test_table.
userNoneYaUsername untuk menghubungkan ke instans SelectDB.
passwordNoneYaPassword untuk menghubungkan ke instans SelectDB.

Parameter baca

ParameterDefaultWajibDeskripsi
request.retries3TidakJumlah maksimum percobaan ulang permintaan ke instans SelectDB.
request.connect.timeout.ms30000TidakTimeout koneksi untuk permintaan ke instans SelectDB, dalam milidetik.
request.read.timeout.ms30000TidakTimeout baca untuk permintaan ke instans SelectDB, dalam milidetik.
request.query.timeout.s3600TidakTimeout kueri untuk instans SelectDB, dalam detik. Nilai default adalah 1 jam. Atur ke -1 untuk tanpa timeout.
request.tablet.sizeInteger.MAX_VALUETidakJumlah tablet SelectDB yang dipetakan ke setiap partisi Resilient Distributed Dataset (RDD). Nilai yang lebih kecil menghasilkan lebih banyak partisi, meningkatkan paralelisme di sisi Spark dengan biaya tekanan tambahan pada SelectDB.
read.fieldNoneTidakKolom yang akan dibaca dari tabel SelectDB. Pisahkan beberapa kolom dengan koma.
batch.size1024TidakJumlah maksimum baris yang dibaca dari node backend per permintaan. Nilai yang lebih besar mengurangi jumlah koneksi dan overhead latensi jaringan.
exec.mem.limit2147483648TidakBatas memori untuk satu kueri, dalam byte. Default adalah 2 GB.
deserialize.arrow.asyncfalseTidakApakah data Arrow dideserialisasi ke RowBatch secara asinkron.
deserialize.queue.size64TidakUkuran antrian internal untuk deserialisasi Arrow asinkron. Hanya berlaku ketika deserialize.arrow.async bernilai true.
filter.query.in.max.count100TidakJumlah maksimum nilai dalam klausa IN untuk penurunan predikat. Jika jumlah nilai melebihi ambang batas ini, Spark menangani filter secara lokal.
ignore-typeNoneTidakJenis bidang yang diabaikan saat membaca skema untuk tampilan temporary. Contoh: bitmap,hll.

Parameter tulis

ParameterDefaultWajibDeskripsi
write.fieldsNoneTidakBidang yang akan ditulis ke tabel SelectDB, atau urutan penulisan bidang. Pisahkan beberapa bidang dengan koma. Default-nya adalah semua bidang sesuai urutan kolom tabel.
sink.batch.size100000TidakJumlah maksimum baris yang ditulis ke backend per batch.
sink.max-retries0TidakJumlah maksimum percobaan ulang setelah kegagalan penulisan.
sink.properties.formatcsvTidakFormat data untuk Stream Load. Nilai yang valid: csv, json, arrow.
sink.properties.*--TidakParameter Stream Load, diteruskan langsung ke pekerjaan Stream Load yang mendasarinya. Misalnya, atur sink.properties.column_separator untuk menentukan pemisah kolom. Untuk semua parameter yang tersedia, lihat Impor data menggunakan Stream Load.
sink.task.partition.sizeNoneTidakJumlah partisi untuk menulis ke SelectDB. Gunakan ini untuk mengurangi frekuensi penulisan ketika penyaringan RDD menghasilkan banyak partisi kecil. Gunakan bersama dengan sink.task.use.repartition.
sink.task.use.repartitionfalseTidakApakah akan merepartisi data sebelum menulis. false menggunakan coalesce (tanpa shuffle). true menggunakan repartition, yang menghasilkan partisi lebih merata tetapi menambahkan overhead shuffle.
sink.batch.interval.ms50TidakInterval antar batch penulisan, dalam milidetik.
sink.enable-2pcfalseTidakApakah akan menggunakan commit dua fase (2PC). Saat diaktifkan, transaksi hanya dikomit setelah seluruh pekerjaan Spark selesai. Jika ada tugas yang gagal, semua transaksi yang telah diprekomit akan di-rollback.
sink.auto-redirecttrueTidakApakah akan mengarahkan ulang permintaan Stream Load melalui node frontend. Saat diaktifkan, Anda tidak perlu menentukan alamat node backend secara eksplisit.
sink.streaming.passthroughfalseTidak(Hanya DataFrame) Menulis nilai kolom pertama tanpa transformasi.

Rekomendasi produksi

Pertimbangkan pengaturan berikut untuk beban kerja produksi:

  • Pengiriman exactly-once: Aktifkan sink.enable-2pc=true untuk memastikan semua data dikomit hanya setelah pekerjaan Spark berhasil. Kegagalan tugas apa pun akan meng-rollback semua transaksi yang telah diprekomit, sehingga mencegah penulisan parsial.

  • Kontrol partisi: Jika penyaringan RDD menghasilkan banyak partisi kecil, atur sink.task.partition.size bersama dengan sink.task.use.repartition=true untuk menggabungkan partisi dan mengurangi frekuensi penulisan.

  • Percobaan ulang saat gagal: Atur sink.max-retries ke bilangan bulat positif (misalnya, 3) untuk secara otomatis mencoba ulang kegagalan penulisan sementara.

Contoh end-to-end

Contoh ini mengimpor data dari database MySQL ke ApsaraDB for SelectDB menggunakan Spark SQL dan DataFrame.

Lingkungan contoh:

Perangkat LunakJavaSparkScalaSelectDB
Versi1.83.1.22.123.0.4

Siapkan lingkungan

1. Konfigurasi Spark.

Unduh dan ekstrak paket instalasi Spark:

wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
tar xvzf spark-3.1.2-bin-hadoop3.2.tgz

Tempatkan spark-doris-connector-3.2_2.12-1.3.2.jar di direktori SPARK_HOME/jars.

2. Buat tabel sumber di MySQL.

CREATE TABLE `employees` (
  `emp_no` int NOT NULL,
  `birth_date` date NOT NULL,
  `first_name` varchar(14) NOT NULL,
  `last_name` varchar(16) NOT NULL,
  `gender` enum('M','F') NOT NULL,
  `hire_date` date NOT NULL,
  PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;

Gunakan Data Management (DMS) untuk menghasilkan data uji.

3. Buat tabel target di SelectDB.

Hubungkan ke instans SelectDB Anda melalui protokol MySQL, lalu jalankan:

CREATE DATABASE test_db;

USE test_db;
CREATE TABLE employees (
    emp_no       int NOT NULL,
    birth_date   date,
    first_name   varchar(20),
    last_name    varchar(20),
    gender       char(2),
    hire_date    date
)
UNIQUE KEY(`emp_no`)
DISTRIBUTED BY HASH(`emp_no`) BUCKETS 32;

4. Ajukan permohonan titik akhir publik.

Ajukan permohonan titik akhir publik untuk instans SelectDB Anda. Lihat Ajukan atau rilis titik akhir publik.

5. Tambahkan IP host Spark Anda ke daftar putih.

Tambahkan alamat IP publik lingkungan Spark Anda ke daftar putih alamat IP SelectDB. Lihat Konfigurasi daftar putih alamat IP.

Impor menggunakan Spark SQL

  1. Jalankan shell Spark SQL:

    bin/spark-sql
  2. Kirimkan tugas impor:

    CREATE TEMPORARY VIEW mysql_tbl
    USING jdbc
    OPTIONS(
     "url"="jdbc:mysql://host:port/test_db",
     "dbtable"="employees",
     "driver"="com.mysql.jdbc.Driver",
     "user"="admin",
     "password"="****"
    );
    
    CREATE TEMPORARY VIEW selectdb_tbl
    USING doris
    OPTIONS(
     "table.identifier"="test_db.employees",
     "fenodes"="selectdb-cn-****-public.selectdbfe.rds.aliyuncs.com:8080",
     "user"="admin",
     "password"="****",
     "sink.properties.format"="json"
    );
    
    INSERT INTO selectdb_tbl SELECT emp_no, birth_date, first_name, last_name, gender, hire_date FROM mysql_tbl;
  3. Setelah pekerjaan selesai, login ke Konsol ApsaraDB for SelectDB untuk memverifikasi data yang diimpor.

Impor menggunakan DataFrame

  1. Jalankan shell Spark:

    bin/spark-shell
  2. Kirimkan tugas impor:

    val mysqlDF = spark.read.format("jdbc")
        .option("url", "jdbc:mysql://host:port/test_db")
        .option("dbtable", "employees")
        .option("driver", "com.mysql.jdbc.Driver")
        .option("user", "admin")
        .option("password", "****")
        .load()
    
    mysqlDF.write.format("doris")
        .option("fenodes", "host:httpPort")
        .option("table.identifier", "test_db.employees")
        .option("user", "admin")
        .option("password", "****")
        .option("sink.batch.size", 100000)
        .option("sink.max-retries", 3)
        .option("sink.properties.format", "json")
        .save()
  3. Setelah pekerjaan selesai, login ke Konsol ApsaraDB for SelectDB untuk memverifikasi data yang diimpor.

Langkah berikutnya