BitSail adalah mesin integrasi data terdistribusi yang mendukung sinkronisasi offline, real-time, penuh, dan inkremental dari berbagai sumber data heterogen—termasuk MySQL, Hive, dan Kafka. ApsaraDB for SelectDB terintegrasi dengan BitSail melalui konektor SelectDB Sink, yang menulis data langsung ke instans SelectDB Anda melalui API Stream Load HTTP.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
BitSail versi 0.1.0 atau lebih baru yang telah diinstal
Instans ApsaraDB for SelectDB dengan setidaknya satu kluster
Port HTTP dan port MySQL dari instans SelectDB Anda (lihat Dapatkan detail koneksi)
Cara kerja
Konektor SelectDB Sink membaca bagian job.writer pada konfigurasi pekerjaan BitSail Anda dan mengalirkan data ke SelectDB. Setiap tugas penulisan:
Melakukan autentikasi ke instans SelectDB menggunakan kredensial yang Anda berikan.
Menyimpan catatan masuk dalam buffer hingga mencapai ukuran buffer yang dikonfigurasi atau hingga interval flush berakhir.
Mengirimkan data yang dibuffer ke tabel target melalui pernyataan COPY INTO (format JSON secara default) atau Stream Load.
Mencoba ulang penulisan yang gagal hingga batas percobaan ulang yang dikonfigurasi sebelum pekerjaan gagal.
Dapatkan detail koneksi
Untuk mendapatkan nilai titik akhir dan port yang diperlukan oleh konektor:
Masuk ke Konsol ApsaraDB for SelectDB.
Buka halaman Instance Details dari instans Anda.
Pada halaman Basic Information, temukan bagian Network Information.
Salin nilai VPC Endpoint atau Public Endpoint, beserta nilai HTTP Port dan MySQL Port.
Konfigurasikan konektor SelectDB Sink
Tambahkan blok job.writer ke file konfigurasi pekerjaan BitSail Anda dan atur parameter berikut.
Parameter wajib
| Parameter | Deskripsi | Contoh |
|---|---|---|
class | Kelas konektor penulisan. Selalu atur nilai ini menjadi com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink. | com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink |
load_url | Titik akhir dan port HTTP dari instans SelectDB Anda. | selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080 |
jdbc_url | Titik akhir dan port MySQL dari instans SelectDB Anda. | selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030 |
cluster_name | Nama kluster dalam instans SelectDB Anda. | new_cluster |
user | Username untuk menghubungkan ke instans SelectDB. | admin |
password | Password untuk menghubungkan ke instans SelectDB. | — |
table_identifier | Tabel target dalam format <database>.<table>. | test_db.test_table |
columns | Definisi kolom untuk tabel target, termasuk indeks, nama, dan tipe. | Lihat contoh di bawah. |
Parameter opsional
Perilaku penulisan
| Parameter | Default | Deskripsi |
|---|---|---|
sink_write_mode | — | Mode penulisan. Atur ke BATCH_UPSERT untuk mengaktifkan mode upsert batch. |
sink_flush_interval_ms | 5000 | Frekuensi (dalam milidetik) data yang dibuffer dikirim ke SelectDB dalam mode upsert. |
sink_buffer_size | 1048576 (1 MB) | Ukuran maksimum buffer per penulisan, dalam byte. |
sink_buffer_count | 3 | Jumlah buffer penulisan yang diinisialisasi. |
sink_max_retries | 3 | Jumlah maksimum percobaan ulang untuk penulisan yang gagal. |
sink_enable_delete | — | Atur ke true untuk meneruskan event DELETE ke SelectDB. |
writer_parallelism_num | — | Jumlah tugas penulisan paralel. |
Format data
| Parameter | Default | Deskripsi |
|---|---|---|
load_contend_type | JSON | Format yang digunakan oleh pernyataan COPY INTO. Nilai yang valid: CSV, JSON. |
csv_field_delimiter | , | Pemisah bidang saat load_contend_type bernilai CSV. |
csv_line_delimiter | \n | Pemisah baris saat load_contend_type bernilai CSV. |
stream_load_properties | — | Properti tambahan yang ditambahkan ke URL Stream Load, dalam format Map<String,String>. |
Impor data sintetis ke SelectDB
Contoh ini menggunakan konektor FakeSource bawaan BitSail untuk menghasilkan catatan sintetis dan menuliskannya ke tabel SelectDB. Gunakan contoh ini untuk memverifikasi konfigurasi konektor Anda sebelum menghubungkan ke sumber data sesungguhnya.
Langkah 1: Siapkan lingkungan Anda
Unduh dan ekstrak paket instalasi BitSail:
wget feilun-justtmp.oss-cn-hongkong.aliyuncs.com/bitsail.tar.gz tar -zxvf bitsail.tar.gzDi Konsol ApsaraDB for SelectDB, lakukan langkah-langkah berikut:
Buat instans SelectDB jika Anda belum memilikinya.
Hubungkan ke instans melalui protokol MySQL.
Buat database dan tabel uji coba:
CREATE DATABASE test_db; CREATE TABLE `test_table` ( `id` BIGINT(20) NULL, `bigint_type` BIGINT(20) NULL, `string_type` VARCHAR(100) NULL, `double_type` DOUBLE NULL, `decimal_type` DECIMALV3(27, 9) NULL, `date_type` DATEV2 NULL, `partition_date` DATEV2 NULL ) ENGINE=OLAP DUPLICATE KEY(`id`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) BUCKETS 10 PROPERTIES ( "light_schema_change" = "true" );Ajukan permohonan titik akhir publik untuk instans tersebut.
Tambahkan alamat IP host BitSail ke daftar putih alamat IP instans.
Langkah 2: Buat konfigurasi pekerjaan
Buat file bernama test.json dengan konten berikut. Ganti nilai load_url, jdbc_url, cluster_name, user, dan password dengan nilai milik Anda sendiri.
{
"job": {
"common": {
"job_id": -2413,
"job_name": "bitsail_fake_to_selectdb_test",
"instance_id": -20413,
"user_name": "user"
},
"reader": {
"class": "com.bytedance.bitsail.connector.legacy.fake.source.FakeSource",
"total_count": 300,
"rate": 10000,
"random_null_rate": 0,
"unique_fields": "id",
"columns_with_fixed_value": [
{
"name": "partition_date",
"fixed_value": "2022-10-10"
}
],
"columns": [
{ "index": 0, "name": "id", "type": "long" },
{ "index": 1, "name": "bigint_type", "type": "long" },
{ "index": 2, "name": "string_type", "type": "string" },
{ "index": 3, "name": "double_type", "type": "double" },
{ "index": 4, "name": "decimal_type", "type": "double" },
{ "index": 5, "name": "date_type", "type": "date.date" },
{ "index": 6, "name": "partition_date", "type": "string" }
]
},
"writer": {
"class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink",
"load_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080",
"jdbc_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030",
"cluster_name": "new_cluster",
"user": "admin",
"password": "****",
"table_identifier": "test_db.test_table",
"columns": [
{ "index": 0, "name": "id", "type": "bigint" },
{ "index": 1, "name": "bigint_type", "type": "bigint" },
{ "index": 2, "name": "string_type", "type": "varchar" },
{ "index": 3, "name": "double_type", "type": "double" },
{ "index": 4, "name": "decimal_type", "type": "double" },
{ "index": 5, "name": "date_type", "type": "date" },
{ "index": 6, "name": "partition_date", "type": "date" }
]
}
}
}Langkah 3: Kirim pekerjaan
bash bin/bitsail run --engine flink --execution-mode run --deployment-mode local --conf test.jsonJika pekerjaan berhasil, lakukan kueri pada tabel target untuk memastikan baris-baris telah ditulis:
SELECT COUNT(*) FROM test_db.test_table;Hasilnya harus menunjukkan 300 baris.