Topik ini menjelaskan cara menggunakan konektor SelectDB kustom untuk menulis data ke ApsaraDB for SelectDB.
Informasi latar belakang
ApsaraDB for SelectDB adalah layanan gudang data real-time generasi berikutnya yang sepenuhnya dikelola dan di-hosting di Alibaba Cloud, serta 100% kompatibel dengan Apache Doris. Layanan ini memungkinkan Anda menganalisis sejumlah besar data secara efisien. Untuk informasi lebih lanjut tentang manfaat dan kasus penggunaannya, lihat Apa itu ApsaraDB for SelectDB.
Tabel berikut menguraikan kemampuan yang didukung oleh konektor SelectDB kustom.
Item | Deskripsi |
Jenis yang didukung | Tabel sink; sink ingesti data |
Mode operasi | Streaming dan batch |
Format data | JSON dan CSV |
Metrik | Tidak tersedia |
API | DataStream API dan SQL API |
Pembaruan/penghapusan data di sink | Didukung |
Fitur
Sinkronisasi database.
Semantik tepat-sekali, memastikan tidak ada duplikasi atau kehilangan data.
Kompatibilitas dengan Apache Doris versi 1.0 atau lebih baru, memungkinkan sinkronisasi data tanpa hambatan ke Apache Doris melalui konektor SelectDB kustom.
Catatan penggunaan
Hanya Ververica Runtime (VVR) versi 8.0.10 atau lebih baru yang mendukung konektor SelectDB kustom.
Jika Anda memiliki pertanyaan saat menggunakan konektor SelectDB kustom, ajukan tiket ke ApsaraDB for SelectDB.
Prasyarat untuk menyinkronkan data ke ApsaraDB for SelectDB adalah sebagai berikut:
Sebuah instans ApsaraDB for SelectDB telah dibuat. Untuk informasi lebih lanjut, lihat Buat Instans.
Daftar putih alamat IP telah dikonfigurasi. Untuk informasi lebih lanjut, lihat Konfigurasikan Daftar Putih Alamat IP.
SQL
Konektor SelectDB dapat digunakan sebagai tabel sink dalam pekerjaan SQL.
Unggah dan konfigurasikan konektor
Mulai dari VVR 11.1, konektor SelectDB menjadi konektor bawaan, sehingga Anda dapat melewati langkah-langkah berikut.
Klik file JAR untuk mengunduh file JAR konektor SelectDB (versi 1.15 hingga 1.17).
Unggah file JAR konektor SelectDB ke Konsol Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Kelola Konektor Kustom.
Buat draft SQL dan gunakan konektor SelectDB kustom.
Atur opsi
connectormenjadidoris. Untuk informasi tentang opsi sink lainnya, lihat Item Konfigurasi Sink Doris.
Sintaksis
CREATE TABLE selectdb_sink (
emp_no INT ,
birth_date DATE,
first_name STRING,
last_name STRING,
gender STRING,
hire_date DATE
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'test.employees',
'username' = 'admin',
'password' = '****',
'sink.enable-delete' = 'true'
);Pemetaan tipe data
Lihat bagian "Pemetaan Tipe" topik konektor Flink Doris di dokumentasi Doris.
Gunakan konektor
Bagian ini mengilustrasikan cara menyinkronkan data dari ApsaraDB RDS for MySQL ke ApsaraDB for SelectDB menggunakan konektor SelectDB kustom.
Persiapkan sinkronisasi data.
Buat ruang kerja Flink, instans ApsaraDB RDS for MySQL, dan instans ApsaraDB for SelectDB.
Di konsol ApsaraDB RDS for MySQL, buat database bernama
order_dw_mysqldan tabel bernamaorders, lalu impor data uji ke dalam tabel tersebut.CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee decimal(20,2) not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);Setelah terhubung ke instans ApsaraDB for SelectDB menggunakan DMS, buat database bernama
selectdbdan tabel bernamaselecttable.CREATE DATABASE selectdb; CREATE TABLE `selecttable` ( order_id bigint, user_id varchar(50), shop_id bigint, product_id bigint, buy_fee DECIMAL, create_time DATETIME, update_time DATETIME, state int )DISTRIBUTED BY HASH(order_id) BUCKETS 10;Tambahkan CIDR Block dari vSwitch di ruang kerja Flink Anda ke daftar putih alamat IP instans ApsaraDB for SelectDB Anda. Untuk informasi lebih lanjut, lihat Bagaimana cara mengonfigurasi daftar putih alamat IP?.
Di Konsol Realtime Compute for Apache Flink, kembangkan pekerjaan SQL dan mulailah.
Buat katalog MySQL bernama
mysqlcatalog. Untuk informasi lebih lanjut, lihat Kelola Katalog MySQL.Klik file JAR untuk mengunduh file JAR konektor SelectDB (versi 1.15 hingga 1.17), lalu unggah file JAR tersebut. Untuk informasi lebih lanjut, lihat Kelola Konektor Kustom.
Pergi ke , klik New untuk membuat draft stream kosong, dan salin kode berikut ke dalam draft:
CREATE TEMPORARY TABLE selectdb_sink ( order_id BIGINT, user_id STRING, shop_id BIGINT, product_id BIGINT, buy_fee DECIMAL, create_time TIMESTAMP(6), update_time TIMESTAMP(6), state int ) WITH ( 'connector' = 'doris', 'fenodes' = 'selectdb-cn-jfj3z******.selectdbfe.rds.aliyuncs.com:8080', 'table.identifier' = 'selectdb.selecttable', 'username' = 'admin', 'password' = '${secret_values.selectdb}', 'sink.enable-delete' = 'true' ); INSERT INTO selectdb_sink SELECT * FROM `mysqlcatalog`.`order_dw_mysql`.`orders`;Klik Deploy dan mulai penerapan dalam mode awal. Untuk informasi lebih lanjut, lihat Buat Penerapan dan Mulai Penerapan.
Setelah terhubung ke instans ApsaraDB for SelectDB menggunakan DMS, kueri data di tabel
selecttable.SELECT * FROM `selecttable` ;
Ingesti data
Konektor SelectDB dapat digunakan sebagai sink ingesti data.
Sintaksis
source:
type: xxx
sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: ""
table.create.properties.replication_num: 1
Opsi konfigurasi
Opsi | Deskripsi | Diperlukan? | Tipe Data | Nilai default | Catatan |
| Tipe sink. | Ya | String | Tidak ada nilai default | Atur ke |
| Nama sink. | Tidak | String | Tidak ada nilai default | |
| Titik akhir dan port HTTP dari instans ApsaraDB for SelectDB. | Ya | String | Tidak ada nilai default | Untuk mendapatkan titik akhir VPC atau publik dari instans SelectDB Anda, pergi ke ApsaraDB for SelectDB, klik nama instans Anda, dan temukan informasi di bagian Network Information. Contoh: |
| Alamat HTTP BE. | Tidak | String | Tidak ada nilai default | Contoh: |
| Informasi koneksi JDBC dari instans ApsaraDB for SelectDB. | Tidak | String | Tidak ada nilai default | Untuk mendapatkan titik akhir VPC atau publik dan port MySQL dari instans SelectDB Anda, pergi ke ApsaraDB for SelectDB, klik nama instans Anda, dan temukan informasi di bagian Network Information. Contoh: |
| Nama pengguna kluster dari instans ApsaraDB for SelectDB. | Ya | String | Tidak ada nilai default | |
| Kata sandi kluster dari instans ApsaraDB for SelectDB. | Tidak | String | Tidak ada nilai default | |
| Menentukan apakah akan mengalihkan permintaan stream load. Saat diaktifkan, stream load akan menulis data melalui FE tanpa secara eksplisit mendapatkan informasi BE. | Tidak | String |
| Apakah akan menulis melalui pengalihan FE dan langsung terhubung ke BE untuk menulis |
| Pengkodean karakter untuk klien HTTP. | Tidak | Boolean |
| |
| Menentukan apakah akan menggunakan mode batch untuk menulis ke SelectDB. Saat diaktifkan, penulisan tidak bergantung pada checkpoint, tetapi dikendalikan oleh Saat diaktifkan, semantik tepat-sekali tidak dijamin. Untuk mencapai idempotensi, gunakan model Unik. | Tidak | Boolean |
| |
| Ukuran antrian untuk penulisan batch. | Tidak | Integer |
| |
| Jumlah maksimum catatan yang akan dibuang dalam satu batch. | Tidak | Integer |
| |
| Jumlah maksimum byte yang akan dibuang dalam satu batch. | Tidak | Integer |
| |
| Interval pembuangan. Jika waktu ini terlampaui, data akan dibuang secara asinkron. Minimum: 1 detik. | Tidak | String | 10s | |
sink.properties. | Parameter impor untuk Stream Load. Silakan masukkan konfigurasi properti.
| Tidak | String | Tidak ada nilai default | Contoh: |
| Konfigurasi properti untuk pembuatan tabel. | Tidak | String | Tidak ada nilai default | Contoh: |
Pemetaan tipe data
Tipe Flink CDC | Tipe SelectDB |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
DECIMAL | DECIMAL |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIMESTAMP [(p)] | DATETIME [(p)] |
TIMESTAMP_LTZ [(p)] | DATETIME [(p)] |
CHAR(n) | CHAR(n*3) Catatan Dalam Doris, string dikodekan UTF-8, sehingga setiap karakter Inggris membutuhkan 1 byte dan setiap karakter Cina membutuhkan 3 byte. Oleh karena itu panjangnya dikalikan dengan 3 di sini. Panjang maksimum CHAR adalah 255. Jika terlampaui, CHAR akan otomatis berubah menjadi VARCHAR. |
VARCHAR(n) | VARCHAR(n*3) Catatan Dalam Doris, string dikodekan UTF-8, sehingga setiap karakter Inggris membutuhkan 1 byte dan setiap karakter Cina membutuhkan 3 byte. Oleh karena itu panjangnya dikalikan dengan 3 di sini. Panjang maksimum VARCHAR adalah 65533. Jika terlampaui, VARCHAR akan otomatis berubah menjadi STRING. |
BINARY(n) | STRING |
VARBINARY(N) | STRING |
STRING | STRING |