ApsaraDB for SelectDB terintegrasi dengan SeaTunnel. Anda dapat menggunakan SeaTunnel SelectDB Sink untuk mengimpor data tabel ke ApsaraDB for SelectDB. Topik ini menjelaskan cara menggunakan SeaTunnel SelectDB Sink untuk menyinkronkan data ke ApsaraDB for SelectDB.
Ikhtisar
SeaTunnel adalah platform integrasi data terdistribusi berperforma tinggi yang mudah digunakan dan mendukung sinkronisasi real-time sejumlah besar data. Anda dapat menggunakan SeaTunnel untuk membaca data dari sumber seperti MySQL, Hive, dan Kafka, lalu menulisnya ke ApsaraDB for SelectDB.
Prasyarat
Pastikan SeaTunnel versi 2.3.1 atau lebih baru telah diinstal.
Cara kerjanya
SeaTunnel memungkinkan Anda menulis data upstream dalam format JSON atau CSV ke ApsaraDB for SelectDB. Konfigurasi sink bervariasi tergantung pada format data.
Format JSON
sink {
SelectDB {
load-url="ip:http_port"
jdbc-url="ip:mysql_port"
cluster-name="Cluster"
table.identifier="test_db.test_table"
username="admin"
password="****"
selectdb.config {
file.type="json"
}
}
}Format CSV
sink {
SelectDB {
load-url="ip:http_port"
jdbc-url="ip:mysql_port"
cluster-name="Cluster"
table.identifier="test_db.test_table"
username="admin"
password="****"
selectdb.config {
file.type="csv"
file.column_separator=","
file.line_delimiter="\n"
}
}
}Tabel berikut menjelaskan parameter dalam konfigurasi sink.
Parameter | Diperlukan | Deskripsi |
load-url | Ya | Titik akhir dan Port HTTP yang digunakan untuk mengakses instans ApsaraDB for SelectDB. Untuk mendapatkan titik akhir virtual private cloud (VPC) atau titik akhir publik dan Port HTTP dari instans ApsaraDB for SelectDB, lakukan operasi berikut: Masuk ke ApsaraDB for SelectDB dan buka halaman Instance Details dari instans yang ingin Anda lihat informasinya. Di bagian Network Information pada halaman Informasi Dasar, lihat nilai parameter VPC Endpoint atau Public Endpoint dan parameter HTTP Port. Contoh: |
jdbc-url | Ya | Titik akhir dan Port MySQL yang digunakan untuk mengakses instans ApsaraDB for SelectDB. Untuk mendapatkan titik akhir VPC atau titik akhir publik dan Port MySQL dari instans ApsaraDB for SelectDB, lakukan operasi berikut: Masuk ke ApsaraDB for SelectDB dan buka halaman Instance Details dari instans yang ingin Anda lihat informasinya. Di bagian Network Information pada halaman Informasi Dasar, lihat nilai parameter VPC Endpoint atau Public Endpoint dan parameter MySQL Port. Contoh: |
cluster-name | Ya | Nama kluster di instans ApsaraDB for SelectDB. Sebuah instans ApsaraDB for SelectDB mungkin berisi beberapa kluster. Pilih kluster berdasarkan kebutuhan bisnis Anda. |
username | Ya | Nama pengguna yang digunakan untuk menghubungkan ke instans ApsaraDB for SelectDB. |
password | Ya | Kata sandi yang digunakan untuk menghubungkan ke instans ApsaraDB for SelectDB. |
table.identifier | Ya | Nama tabel di instans ApsaraDB for SelectDB. Tentukan nama dalam format |
selectdb.config | Ya | Konfigurasi pekerjaan impor data.
|
sink.enable-delete | Tidak | Menentukan apakah akan mengaktifkan fitur penghapusan massal. Fitur ini hanya didukung untuk model kunci unik. |
sink.buffer-size | Tidak | Ukuran buffer maksimum. Satuan: byte. Nilai default setara dengan 10 MB. Jika ukuran buffer melebihi batas atas, semua konten dalam buffer akan dibuang ke Object Storage Service (OSS). Kami merekomendasikan Anda menggunakan nilai default. |
sink.buffer-count | Tidak | Jumlah maksimum catatan data yang dapat di-buffer. Nilai default: 10000. Jika jumlah catatan data yang di-buffer melebihi batas atas, semua konten dalam buffer akan dibuang ke OSS. Kami merekomendasikan Anda menggunakan nilai default. |
sink.max-retries | Tidak | Jumlah maksimum percobaan ulang dalam fase commit. Nilai default: 3. |
sink.enable-2pc | Tidak | Menentukan apakah akan mengaktifkan mode commit dua fase. Anda dapat mengaktifkan mode commit dua fase untuk memastikan semantik tepat-sekali. Nilai default: true. |
Contoh
Dalam contoh ini, SeaTunnel digunakan untuk mengimpor data dari database MySQL hulu ke ApsaraDB for SelectDB. Tabel berikut menjelaskan versi perangkat lunak dalam contoh.
Lingkungan | Versi |
Java Development Kit (JDK) | 1.8 |
SeaTunnel | 2.3.3 |
ApsaraDB for SelectDB | 3.0.4 |
Siapkan lingkungan
Konfigurasikan SeaTunnel.
Unduh dan ekstrak paket instalasi SeaTunnel. Dalam contoh ini, paket instalasi SeaTunnel apache-seatunnel-2.3.3-bin.tar.gz digunakan.
wget https://dlcdn.apache.org/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz tar -xzvf apache-seatunnel-2.3.3-bin.tar.gzUbah file SEATUNNEL_HOME/config/plugin_config. Pertahankan hanya konektor yang diperlukan.
--connectors-v2-- connector-cdc-mysql connector-selectdb-cloud connector-jdbc connector-fake connector-console connector-assert --end--Instal plugin konektor SeaTunnel.
sh bin/install-plugin.shUnduh driver MySQL dan letakkan di direktori SEATUNNEL_HOME/jar.
cd lib/ wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
Buat data yang akan diimpor. Dalam contoh ini, sejumlah kecil data sampel dibuat di database MySQL untuk diimpor.
Buat tabel uji di database MySQL.
CREATE TABLE `employees` ( `emp_no` INT NOT NULL, `birth_date` DATE NOT NULL, `first_name` VARCHAR(14) NOT NULL, `last_name` VARCHAR(16) NOT NULL, `gender` ENUM('M','F') NOT NULL, `hire_date` DATE NOT NULL, PRIMARY KEY (`emp_no`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3Gunakan Data Management (DMS) untuk menghasilkan data uji. Untuk informasi lebih lanjut, lihat Hasilkan data uji.
Konfigurasikan instans ApsaraDB for SelectDB.
Buat instans ApsaraDB for SelectDB. Untuk informasi lebih lanjut, lihat Buat instans.
Sambungkan ke instans ApsaraDB for SelectDB melalui protokol MySQL. Untuk informasi lebih lanjut, lihat Sambungkan ke instans.
Buat database uji dan tabel uji.
Buat database uji.
CREATE DATABASE test_db;Buat tabel uji.
USE test_db; CREATE TABLE employees ( emp_no INT NOT NULL, birth_date DATE, first_name VARCHAR(20), last_name VARCHAR(20), gender CHAR(2), hire_date DATE ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
Ajukan titik akhir publik untuk instans ApsaraDB for SelectDB. Untuk informasi lebih lanjut, lihat Ajukan atau lepaskan titik akhir publik.
Tambahkan alamat IP publik SeaTunnel ke daftar putih alamat IP instans ApsaraDB for SelectDB. Untuk informasi lebih lanjut, lihat Konfigurasikan daftar putih alamat IP.
Gunakan mesin SeaTunnel lokal untuk menyinkronkan data dari database MySQL ke instans ApsaraDB for SelectDB
Buat file konfigurasi
mysqlToSelectDB.confdan konfigurasikan informasi pekerjaan.env { execution.parallelism = 2 job.mode = "BATCH" checkpoint.interval = 10000 } source{ jdbc { url = "jdbc:mysql://host:ip/test_db" driver = "com.mysql.cj.jdbc.Driver" user = "admin" password = "****" query = "select * from employees" } } sink { SelectDBCloud { load-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:8080" jdbc-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:9030" cluster-name="new_cluster" table.identifier="test_db.employees" username="admin" password="****" selectdb.config { file.type="json" } } }Kirim pekerjaan.
sh ./bin/seatunnel.sh --config ./mysqlToSelectDB.conf -e local