全部产品
Search
文档中心

ApsaraDB for SelectDB:Gunakan Spark untuk mengimpor data

更新时间:Jul 30, 2025

ApsaraDB for SelectDB kompatibel dengan Apache Doris. Anda dapat menggunakan Spark Doris Connector untuk mengimpor sejumlah besar data dengan memanfaatkan kemampuan komputasi terdistribusi dari Spark. Topik ini menjelaskan cara kerja Spark Doris Connector dan bagaimana menggunakannya untuk mengimpor data ke ApsaraDB for SelectDB.

Ikhtisar

Anda dapat menggunakan Spark Doris Connector sebagai salah satu metode untuk mengimpor sejumlah besar data ke ApsaraDB for SelectDB. Dengan memanfaatkan kemampuan komputasi terdistribusi dari Spark, Anda dapat membaca data dalam jumlah besar dari sumber data hulu seperti MySQL, PostgreSQL, Sistem File Terdistribusi Hadoop (HDFS), dan Amazon Simple Storage Service (Amazon S3) ke dalam DataFrame, lalu mengimpor data tersebut ke tabel di ApsaraDB for SelectDB menggunakan Spark Doris Connector. Selain itu, Anda juga dapat membaca data dari tabel di ApsaraDB for SelectDB menggunakan Java Database Connectivity (JDBC) Spark.

Cara kerjanya

Gambar berikut menunjukkan cara menggunakan Spark Doris Connector untuk mengimpor data ke ApsaraDB for SelectDB. Dalam arsitektur yang ditunjukkan pada gambar, Spark Doris Connector berfungsi sebagai jembatan untuk menulis data eksternal ke ApsaraDB for SelectDB, serta memproses data tersebut menggunakan kluster komputasi terdistribusi. Ini mempercepat aliran data melalui seluruh tautan, sehingga menggantikan metode impor data tradisional berperforma rendah menggunakan JDBC.

Prasyarat

Versi Spark Doris Connector harus 1.3.1 atau lebih baru.

Instal dependensi Spark Doris Connector

Anda dapat menggunakan salah satu metode berikut untuk menginstal dependensi Spark Doris Connector:

  • Instal dependensi Maven Spark Doris Connector. Contoh kode berikut menunjukkan sebuah contoh. Untuk mendapatkan dependensi versi lainnya, kunjungi Maven Repository.

    <dependency>
        <groupId>org.apache.doris</groupId>
        <artifactId>spark-doris-connector-3.2_2.12</artifactId>
        <version>1.3.2</version>
    </dependency>
  • Unduh paket JAR Spark Doris Connector.

    Tabel berikut mencantumkan tiga paket umum untuk Spark Doris Connector. Unduh paket JAR sesuai dengan versi Spark Anda. Untuk mendapatkan dependensi versi lainnya, kunjungi Maven Repository.

    Catatan
    • Paket JAR berikut dikompilasi menggunakan Java 8. Jika Anda ingin menggunakan versi Java lainnya, hubungi dukungan teknis untuk ApsaraDB for SelectDB.

    • Versi paket JAR menunjukkan versi Spark yang didukung, versi Scala, dan versi Spark Doris Connector dari kiri ke kanan.

    Versi

    Paket JAR

    2.4-2.12-1.3.2

    spark-doris-connector-2.4_2.12-1.3.2

    3.1-2.12-1.3.2

    spark-doris-connector-3.1_2.12-1.3.2

    3.2-2.12-1.3.2

    spark-doris-connector-3.2_2.12-1.3.2

    Setelah mengunduh paket JAR, gunakan salah satu metode berikut untuk menjalankan Spark:

    • Untuk kluster Spark yang berjalan dalam mode kluster lokal, tempatkan paket JAR Spark Doris Connector di direktori jars pada direktori instalasi Spark.

    • Untuk kluster Spark yang berjalan dalam mode kluster YARN, unggah paket JAR Spark Doris Connector sebagai paket pra-penerapan. Contoh:

      1. Unggah paket spark-doris-connector-3.2_2.12-1.3.2.jar ke HDFS kluster Spark.

        hdfs dfs -mkdir /spark-jars/ 
        hdfs dfs -put /<your_local_path>/spark-doris-connector-3.2_2.12-1.3.2.jar/spark-jars/
      2. Tambahkan dependensi pada paket spark-doris-connector-3.2_2.12-1.3.2.jar ke kluster.

        spark.yarn.jars=hdfs:///spark-jars/spark-doris-connector-3.2_2.12-1.3.2.jar

Prosedur

Setelah menjalankan Spark pada klien Spark atau mengimpor paket Spark Doris Connector ke lingkungan pengembangan Spark, Anda dapat menyinkronkan data menggunakan Spark SQL atau DataFrame. Contoh berikut menunjukkan cara mengimpor data hulu ke instans ApsaraDB for SelectDB.

Gunakan Spark SQL

val selectdbHttpPort = "selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080"
val selectdbJdbc = "jdbc:mysql://selectdb-cn-****.selectdbfe.rds.aliyuncs.com:9030"
val selectdbUser = "admin"
val selectdbPwd = "****"
val selectdbTable = "test_db.test_order"
  
CREATE TEMPORARY VIEW test_order
USING doris
OPTIONS(
 "table.identifier"="${selectdbTable}",
 "fenodes"="${selectdbHttpPort}",
 "user"="${selectdbUser}",
 "password"="${selectdbPwd}",
 "sink.properties.format"="json"
);

INSERT INTO test_order SELECT order_id,order_amount,order_status FROM tmp_tb;

Parameter

Parameter

Nilai default

Diperlukan

Deskripsi

fenodes

Tidak ada

Ya

Titik akhir yang digunakan untuk terhubung ke instans ApsaraDB for SelectDB melalui HTTP.

Untuk mendapatkan titik akhir Virtual Private Cloud (VPC) atau titik akhir publik dan port HTTP dari instans ApsaraDB for SelectDB, lakukan operasi berikut: Masuk ke ApsaraDB for SelectDB dan buka halaman Instance Details dari instans yang ingin Anda lihat informasinya. Di bagian Network Information tab Informasi Dasar, lihat nilai parameter VPC Endpoint atau Public Endpoint dan parameter HTTP Port.

Contoh: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080.

table.identifier

Tidak ada

Ya

Nama tabel di instans ApsaraDB for SelectDB. Format: Nama database.Nama tabel. Contoh: test_db.test_table.

request.retries

3

Tidak

Jumlah maksimum percobaan ulang yang diizinkan untuk mengirim permintaan ke instans ApsaraDB for SelectDB.

request.connect.timeout.ms

30000

Tidak

Periode waktu habis koneksi untuk permintaan ke instans ApsaraDB for SelectDB.

request.read.timeout.ms

30000

Tidak

Periode waktu habis baca untuk permintaan ke instans ApsaraDB for SelectDB.

request.query.timeout.s

3600

Tidak

Periode waktu habis kueri untuk instans ApsaraDB for SelectDB. Nilai default menunjukkan satu jam. Nilai -1 menunjukkan periode waktu habis tanpa batas.

request.tablet.size

Integer.MAX_VALUE

Tidak

Jumlah tablet ApsaraDB for SelectDB yang sesuai dengan partisi RDD.

Semakin kecil nilainya, semakin banyak partisi yang dihasilkan. Ini meningkatkan paralelisme di sisi Spark, tetapi juga memberi lebih banyak tekanan pada ApsaraDB for SelectDB.

read.field

Tidak ada

Tidak

Kolom di tabel ApsaraDB for SelectDB yang ingin Anda baca datanya. Pisahkan beberapa kolom dengan koma (,).

batch.size

1024

Tidak

Jumlah maksimum baris yang dapat dibaca dari backend sekaligus. Jika lebih banyak baris dibaca dari backend sekaligus, jumlah koneksi yang dibuat antara Spark dan ApsaraDB for SelectDB berkurang. Ini mengurangi overhead waktu tambahan yang disebabkan oleh latensi jaringan.

exec.mem.limit

2147483648

Tidak

Ambang batas memori untuk satu kueri. Nilai default menunjukkan 2 GB. Unit: byte.

deserialize.arrow.async

false

Tidak

Menentukan apakah akan secara asinkron mendeserialisasi data Arrow ke RowBatch yang diperlukan untuk iterasi Spark Doris Connector.

deserialize.queue.size

64

Tidak

Ukuran antrian pemrosesan internal untuk mendeserialisasi data Arrow secara asinkron. Parameter ini hanya berlaku jika parameter deserialize.arrow.async diatur ke true.

write.fields

Tidak ada

Tidak

Bidang yang ingin Anda tulis ke tabel ApsaraDB for SelectDB atau urutan penulisan bidang. Pisahkan beberapa bidang dengan koma (,). Secara default, semua bidang ditulis ke tabel ApsaraDB for SelectDB berdasarkan urutan bidang di tabel ApsaraDB for SelectDB.

sink.batch.size

100000

Tidak

Jumlah maksimum baris yang dapat ditulis ke backend sekaligus.

sink.max-retries

0

Tidak

Jumlah maksimum percobaan ulang setelah data gagal ditulis ke backend.

sink.properties.format

csv

Tidak

Format data yang didukung oleh Stream Load. Nilai valid: csv, json, dan arrow.

sink.properties.*

--

Tidak

Parameter yang digunakan untuk mengirimkan pekerjaan Stream Load. Misalnya, Anda dapat menentukan 'sink.properties.column_separator' = ',' untuk menentukan pemisah kolom. Untuk informasi lebih lanjut tentang parameter, lihat Gunakan Stream Load untuk mengimpor data.

sink.task.partition.size

Tidak ada

Tidak

Jumlah partisi tempat data ditulis di instans ApsaraDB for SelectDB. Jika Anda melakukan operasi seperti penyaringan pada RDD di Spark, jumlah partisi tempat data ditulis mungkin bertambah, tetapi setiap partisi hanya berisi sejumlah kecil catatan. Ini meningkatkan frekuensi operasi tulis dan membuang-buang sumber daya komputasi.

Nilai yang lebih kecil menunjukkan frekuensi yang lebih rendah untuk menulis data ke instans ApsaraDB for SelectDB. Ini mengurangi tekanan penggabungan data untuk instans ApsaraDB for SelectDB. Anda harus menggunakan parameter ini dengan parameter sink.task.use.repartition.

sink.task.use.repartition

false

Tidak

Menentukan apakah akan mempartisi ulang data ke sejumlah partisi tertentu sebelum data ditulis ke instans ApsaraDB for SelectDB. Nilai default: false, yang menunjukkan bahwa coalesce digunakan. Jika tidak ada tindakan yang dipanggil sebelum operasi tulis, paralelisme komputasi mungkin kurang optimal.

Jika parameter ini diatur ke true, repartitioning diaktifkan. Anda dapat menentukan jumlah partisi tempat data dipartisi ulang. Ini meningkatkan overhead shuffle.

sink.batch.interval.ms

50

Tidak

Interval penulisan data ke sink. Unit: milidetik.

sink.enable-2pc

false

Tidak

Menentukan apakah protokol komit dua fase (2PC) digunakan untuk menulis data. Jika Anda mengatur parameter ini ke true, transaksi akan dikomit setelah pekerjaan Spark selesai. Jika beberapa tugas gagal, semua transaksi yang telah dikomit sebelumnya akan dibatalkan.

sink.auto-redirect

true

Tidak

Menentukan apakah permintaan Stream Load dialihkan. Jika Anda mengatur parameter ini ke true, permintaan Stream Load dimuat menggunakan frontend. Dengan cara ini, Anda tidak perlu secara eksplisit mendapatkan informasi backend.

user

Tidak ada

Ya

Nama pengguna yang digunakan untuk terhubung ke instans ApsaraDB for SelectDB.

password

Tidak ada

Ya

Kata sandi yang digunakan untuk terhubung ke instans ApsaraDB for SelectDB.

filter.query.in.max.count

100

Tidak

Jumlah maksimum nilai yang dapat disertakan dalam klausa IN saat Spark melakukan penurunan predikat. Jika jumlah nilai dalam klausa IN melebihi ambang batas yang ditentukan, operasi filter ditangani oleh Spark.

ignore-type

Tidak ada

Tidak

Tipe bidang yang ingin Anda abaikan saat membaca skema untuk tampilan sementara.

Contoh: 'ignore-type'='bitmap,hll'

Gunakan DataFrame

val spark = SparkSession.builder().master("local[1]").getOrCreate()
val df = spark.createDataFrame(Seq(
  ("1", 100, "Pending Payment"),
  ("2", 200, null),
  ("3", 300, "Received")
)).toDF("order_id", "order_amount", "order_status")

df.write
  .format("doris")
  .option("fenodes", selectdbHttpPort)
  .option("table.identifier", selectdbTable)
  .option("user", selectdbUser)
  .option("password", selectdbPwd)
  .option("sink.batch.size", 100000)
  .option("sink.max-retries", 3)
  .option("sink.properties.file.column_separator", "\t")
  .option("sink.properties.file.line_delimiter", "\n")
  .save()

Parameter

Parameter

Nilai default

Diperlukan

Deskripsi

fenodes

Tidak ada

Ya

Titik akhir yang digunakan untuk terhubung ke instans ApsaraDB for SelectDB melalui HTTP.

Untuk mendapatkan titik akhir VPC atau titik akhir publik dan port HTTP dari instans ApsaraDB for SelectDB, lakukan operasi berikut: Masuk ke ApsaraDB for SelectDB dan buka halaman Instance Details dari instans yang ingin Anda lihat informasinya. Di bagian Network Information tab Informasi Dasar, lihat nilai parameter VPC Endpoint atau Public Endpoint dan parameter HTTP Port.

Contoh: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080.

table.identifier

Tidak ada

Ya

Nama tabel di instans ApsaraDB for SelectDB. Format: Nama database.Nama tabel. Contoh: test_db.test_table.

request.retries

3

Tidak

Jumlah maksimum percobaan ulang yang diizinkan untuk mengirim permintaan ke instans ApsaraDB for SelectDB.

request.connect.timeout.ms

30000

Tidak

Periode waktu habis koneksi untuk permintaan ke instans ApsaraDB for SelectDB.

request.read.timeout.ms

30000

Tidak

Periode waktu habis baca untuk permintaan ke instans ApsaraDB for SelectDB.

request.query.timeout.s

3600

Tidak

Periode waktu habis kueri untuk instans ApsaraDB for SelectDB. Nilai default menunjukkan satu jam. Nilai -1 menunjukkan periode waktu habis tanpa batas.

request.tablet.size

Integer.MAX_VALUE

Tidak

Jumlah tablet ApsaraDB for SelectDB yang sesuai dengan partisi RDD.

Semakin kecil nilainya, semakin banyak partisi yang dihasilkan. Ini meningkatkan paralelisme di sisi Spark, tetapi juga memberi lebih banyak tekanan pada ApsaraDB for SelectDB.

read.field

Tidak ada

Tidak

Kolom di tabel ApsaraDB for SelectDB yang ingin Anda baca datanya. Pisahkan beberapa kolom dengan koma (,).

batch.size

1024

Tidak

Jumlah maksimum baris yang dapat dibaca dari backend sekaligus. Jika lebih banyak baris dibaca dari backend sekaligus, jumlah koneksi yang dibuat antara Spark dan ApsaraDB for SelectDB berkurang. Ini mengurangi overhead waktu tambahan yang disebabkan oleh latensi jaringan.

exec.mem.limit

2147483648

Tidak

Ambang batas memori untuk satu kueri. Nilai default menunjukkan 2 GB. Unit: byte.

deserialize.arrow.async

false

Tidak

Menentukan apakah akan secara asinkron mendeserialisasi data Arrow ke RowBatch yang diperlukan untuk iterasi Spark Doris Connector.

deserialize.queue.size

64

Tidak

Ukuran antrian pemrosesan internal untuk mendeserialisasi data Arrow secara asinkron. Parameter ini hanya berlaku jika parameter deserialize.arrow.async diatur ke true.

write.fields

Tidak ada

Tidak

Bidang yang ingin Anda tulis ke tabel ApsaraDB for SelectDB atau urutan penulisan bidang. Pisahkan beberapa bidang dengan koma (,). Secara default, semua bidang ditulis ke tabel ApsaraDB for SelectDB berdasarkan urutan bidang di tabel ApsaraDB for SelectDB.

sink.batch.size

100000

Tidak

Jumlah maksimum baris yang dapat ditulis ke backend sekaligus.

sink.max-retries

0

Tidak

Jumlah maksimum percobaan ulang setelah data gagal ditulis ke backend.

sink.properties.format

csv

Tidak

Format data yang didukung oleh Stream Load. Nilai valid: csv, json, dan arrow.

sink.properties.*

--

Tidak

Parameter yang digunakan untuk mengirimkan pekerjaan Stream Load. Misalnya, Anda dapat menentukan 'sink.properties.column_separator' = ',' untuk menentukan pemisah kolom. Untuk parameter lainnya, lihat Gunakan Stream Load untuk mengimpor data

sink.task.partition.size

Tidak ada

Tidak

Jumlah partisi tempat data ditulis di instans ApsaraDB for SelectDB. Jika Anda melakukan operasi seperti penyaringan pada RDD di Spark, jumlah partisi tempat data ditulis mungkin bertambah, tetapi setiap partisi hanya berisi sejumlah kecil catatan. Ini meningkatkan frekuensi operasi tulis dan membuang-buang sumber daya komputasi.

Nilai yang lebih kecil menunjukkan frekuensi yang lebih rendah untuk menulis data ke instans ApsaraDB for SelectDB. Ini mengurangi tekanan penggabungan data untuk instans ApsaraDB for SelectDB. Anda harus menggunakan parameter ini dengan parameter sink.task.use.repartition.

sink.task.use.repartition

false

Tidak

Menentukan apakah akan mempartisi ulang data ke sejumlah partisi tertentu sebelum data ditulis ke instans ApsaraDB for SelectDB. Nilai default: false, yang menunjukkan bahwa coalesce digunakan. Jika tidak ada tindakan yang dipanggil sebelum operasi tulis, paralelisme komputasi mungkin kurang optimal.

Jika parameter ini diatur ke true, repartitioning diaktifkan. Anda dapat menentukan jumlah partisi tempat data dipartisi ulang. Ini meningkatkan overhead shuffle.

sink.batch.interval.ms

50

Tidak

Interval penulisan data ke sink. Unit: milidetik.

sink.enable-2pc

false

Tidak

Menentukan apakah protokol 2PC digunakan untuk menulis data. Jika Anda mengatur parameter ini ke true, transaksi akan dikomit setelah pekerjaan Spark selesai. Jika beberapa tugas gagal, semua transaksi yang telah dikomit sebelumnya akan dibatalkan.

sink.auto-redirect

true

Tidak

Menentukan apakah permintaan Stream Load dialihkan. Jika Anda mengatur parameter ini ke true, permintaan Stream Load dimuat menggunakan frontend. Dengan cara ini, Anda tidak perlu secara eksplisit mendapatkan informasi backend.

user

Tidak ada

Ya

Nama pengguna yang digunakan untuk terhubung ke instans ApsaraDB for SelectDB.

password

Tidak ada

Ya

Kata sandi yang digunakan untuk terhubung ke instans ApsaraDB for SelectDB.

filter.query.in.max.count

100

Tidak

Jumlah maksimum nilai yang dapat disertakan dalam klausa IN saat Spark melakukan penurunan predikat. Jika jumlah nilai dalam klausa IN melebihi ambang batas yang ditentukan, operasi filter ditangani oleh Spark.

ignore-type

Tidak ada

Tidak

Tipe bidang yang ingin Anda abaikan saat membaca skema untuk tampilan sementara.

Contoh: 'ignore-type'='bitmap,hll'

sink.streaming.passthrough

false

Tidak

Menulis nilai di kolom pertama tanpa pemrosesan.

Contoh

Tabel berikut mencantumkan versi semua perangkat lunak dalam lingkungan contoh.

Perangkat Lunak

Java

Spark

Scala

SelectDB

Versi

1.8

3.1.2

2.12

3.0.4

Siapkan lingkungan

  • Konfigurasikan lingkungan Spark.

    1. Unduh dan dekompres paket instalasi Spark. Dalam contoh ini, paket instalasi Spark spark-3.1.2-bin-hadoop3.2.tgz digunakan.

      wget https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
      tar xvzf spark-3.1.2-bin-hadoop3.2.tgz
    2. Letakkan file spark-doris-connector-3.2_2.12-1.3.2.jar di direktori SPARK_HOME/jars.

  • Buat data yang akan diimpor. Dalam contoh ini, sejumlah kecil data diimpor dari database MySQL.

    1. Buat tabel uji di database MySQL.

      CREATE TABLE `employees` (
        `emp_no` int NOT NULL,
        `birth_date` date NOT NULL,
        `first_name` varchar(14) NOT NULL,
        `last_name` varchar(16) NOT NULL,
        `gender` enum('M','F') NOT NULL,
        `hire_date` date NOT NULL,
        PRIMARY KEY (`emp_no`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
    2. Gunakan Data Management (DMS) untuk menghasilkan data uji. Untuk informasi lebih lanjut, lihat Hasilkan Data Uji.

  • Konfigurasikan instans ApsaraDB for SelectDB.

    1. Buat instans ApsaraDB for SelectDB. Untuk informasi lebih lanjut, lihat Buat Instans.

    2. Sambungkan ke instans ApsaraDB for SelectDB melalui protokol MySQL. Untuk informasi lebih lanjut, lihat Sambungkan ke Instans ApsaraDB for SelectDB.

    3. Buat database uji dan tabel uji.

      1. Buat database uji.

        CREATE DATABASE test_db;
      2. Buat tabel uji.

        USE test_db;
        CREATE TABLE employees (
            emp_no       int NOT NULL,
            birth_date   date,
            first_name   varchar(20),
            last_name    varchar(20),
            gender       char(2),
            hire_date    date
        )
        UNIQUE KEY(`emp_no`)
        DISTRIBUTED BY HASH(`emp_no`) BUCKETS 32;
    4. Ajukan titik akhir publik untuk instans ApsaraDB for SelectDB. Untuk informasi lebih lanjut, lihat Ajukan atau Lepaskan Titik Akhir Publik.

    5. Tambahkan alamat IP publik lingkungan Spark ke daftar putih alamat IP instans ApsaraDB for SelectDB. Untuk informasi lebih lanjut, lihat Konfigurasikan Daftar Putih Alamat IP.

Impor data dari database MySQL ke instans ApsaraDB for SelectDB

Gunakan Spark SQL

Contoh berikut menunjukkan cara menggunakan Spark SQL untuk mengimpor data dari database MySQL hulu ke instans ApsaraDB for SelectDB.

  1. Mulai layanan spark-sql.

    bin/spark-sql
  2. Kirim tugas di spark-sql.

    CREATE TEMPORARY VIEW mysql_tbl
    USING jdbc
    OPTIONS(
     "url"="jdbc:mysql://host:port/test_db",
     "dbtable"="employees",
     "driver"="com.mysql.jdbc.Driver",
     "user"="admin",
     "password"="****"
    );
    
    CREATE TEMPORARY VIEW selectdb_tbl
    USING doris
    OPTIONS(
     "table.identifier"="test_db.employees",
     "fenodes"="selectdb-cn-****-public.selectdbfe.rds.aliyuncs.com:8080",
     "user"="admin",
     "password"="****",
     "sink.properties.format"="json"
    );
    
    INSERT INTO selectdb_tbl SELECT emp_no, birth_date, first_name, last_name, gender, hire_date FROM mysql_tbl;
  3. Setelah tugas Spark selesai, masuk ke ApsaraDB for SelectDB untuk melihat data yang diimpor menggunakan Spark.

Gunakan DataFrame

Contoh berikut menunjukkan cara menggunakan DataFrame untuk mengimpor data dari database MySQL hulu ke instans ApsaraDB for SelectDB.

  1. Mulai layanan spark-shell.

    bin/spark-shell
  2. Kirim tugas di spark-shell.

    val mysqlDF = spark.read.format("jdbc")
    	.option("url", "jdbc:mysql://host:port/test_db")
    	.option("dbtable", "employees")
    	.option("driver", "com.mysql.jdbc.Driver")
    	.option("user", "admin")
    	.option("password", "****")
    	.load()
    
    mysqlDF.write.format("doris")
    	.option("fenodes", "host:httpPort")
    	.option("table.identifier", "test_db.employees")
    	.option("user", "admin")
    	.option("password", "****")
    	.option("sink.batch.size", 100000)
    	.option("sink.max-retries", 3)
    	.option("sink.properties.format", "json")
    	.save()
  3. Setelah tugas Spark selesai, masuk ke ApsaraDB for SelectDB untuk melihat data yang diimpor menggunakan Spark.