全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor SelectDB

更新时间:Aug 07, 2025

Topik ini menjelaskan cara menggunakan konektor SelectDB kustom untuk menulis data ke ApsaraDB for SelectDB.

Informasi latar belakang

ApsaraDB for SelectDB adalah layanan gudang data real-time generasi berikutnya yang sepenuhnya dikelola dan di-hosting di Alibaba Cloud, serta 100% kompatibel dengan Apache Doris. Layanan ini memungkinkan Anda menganalisis sejumlah besar data secara efisien. Untuk informasi lebih lanjut tentang manfaat dan kasus penggunaannya, lihat Apa itu ApsaraDB for SelectDB.

Tabel berikut menguraikan kemampuan yang didukung oleh konektor SelectDB kustom.

Item

Deskripsi

Jenis yang didukung

Tabel sink; sink ingesti data

Mode operasi

Streaming dan batch

Format data

JSON dan CSV

Metrik

Tidak tersedia

API

DataStream API dan SQL API

Pembaruan/penghapusan data di sink

Didukung

Fitur

  • Sinkronisasi database.

  • Semantik tepat-sekali, memastikan tidak ada duplikasi atau kehilangan data.

  • Kompatibilitas dengan Apache Doris versi 1.0 atau lebih baru, memungkinkan sinkronisasi data tanpa hambatan ke Apache Doris melalui konektor SelectDB kustom.

Catatan penggunaan

  • Hanya Ververica Runtime (VVR) versi 8.0.10 atau lebih baru yang mendukung konektor SelectDB kustom.

  • Jika Anda memiliki pertanyaan saat menggunakan konektor SelectDB kustom, ajukan tiket ke ApsaraDB for SelectDB.

  • Prasyarat untuk menyinkronkan data ke ApsaraDB for SelectDB adalah sebagai berikut:

SQL

Konektor SelectDB dapat digunakan sebagai tabel sink dalam pekerjaan SQL.

Unggah dan konfigurasikan konektor

Catatan

Mulai dari VVR 11.1, konektor SelectDB menjadi konektor bawaan, sehingga Anda dapat melewati langkah-langkah berikut.

  1. Klik file JAR untuk mengunduh file JAR konektor SelectDB (versi 1.15 hingga 1.17).

  2. Unggah file JAR konektor SelectDB ke Konsol Realtime Compute for Apache Flink. Untuk informasi lebih lanjut, lihat Kelola Konektor Kustom.

  3. Buat draft SQL dan gunakan konektor SelectDB kustom.

    Atur opsi connector menjadi doris. Untuk informasi tentang opsi sink lainnya, lihat Item Konfigurasi Sink Doris.

Sintaksis

CREATE TABLE selectdb_sink (
  emp_no       INT ,
  birth_date   DATE,
  first_name   STRING,
  last_name    STRING,
  gender       STRING,
  hire_date    DATE
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'test.employees',
  'username' = 'admin',
  'password' = '****',
  'sink.enable-delete' = 'true'
);

Pemetaan tipe data

Lihat bagian "Pemetaan Tipe" topik konektor Flink Doris di dokumentasi Doris.

Gunakan konektor

Bagian ini mengilustrasikan cara menyinkronkan data dari ApsaraDB RDS for MySQL ke ApsaraDB for SelectDB menggunakan konektor SelectDB kustom.

  1. Persiapkan sinkronisasi data.

    1. Buat ruang kerja Flink, instans ApsaraDB RDS for MySQL, dan instans ApsaraDB for SelectDB.

    2. Di konsol ApsaraDB RDS for MySQL, buat database bernama order_dw_mysql dan tabel bernama orders, lalu impor data uji ke dalam tabel tersebut.

      CREATE TABLE `orders` (
        order_id bigint not null primary key,
        user_id varchar(50) not null,
        shop_id bigint not null,
        product_id bigint not null,
        buy_fee decimal(20,2) not null,   
        create_time timestamp not null,
        update_time timestamp not null default now(),
        state int not null 
      );
      
      INSERT INTO orders VALUES
      (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
      (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
      (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
      (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
      (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
      (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
      (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    3. Setelah terhubung ke instans ApsaraDB for SelectDB menggunakan DMS, buat database bernama selectdb dan tabel bernama selecttable.

      CREATE DATABASE selectdb;
      
      CREATE TABLE `selecttable` (
        order_id bigint,
        user_id varchar(50),
        shop_id bigint,
        product_id bigint,
        buy_fee DECIMAL,   
        create_time DATETIME,
        update_time DATETIME,
        state int
       )DISTRIBUTED BY HASH(order_id) BUCKETS 10;
    4. Tambahkan CIDR Block dari vSwitch di ruang kerja Flink Anda ke daftar putih alamat IP instans ApsaraDB for SelectDB Anda. Untuk informasi lebih lanjut, lihat Bagaimana cara mengonfigurasi daftar putih alamat IP?.

  2. Di Konsol Realtime Compute for Apache Flink, kembangkan pekerjaan SQL dan mulailah.

    1. Buat katalog MySQL bernama mysqlcatalog. Untuk informasi lebih lanjut, lihat Kelola Katalog MySQL.

    2. Klik file JAR untuk mengunduh file JAR konektor SelectDB (versi 1.15 hingga 1.17), lalu unggah file JAR tersebut. Untuk informasi lebih lanjut, lihat Kelola Konektor Kustom.

    3. Pergi ke Development > ETL, klik New untuk membuat draft stream kosong, dan salin kode berikut ke dalam draft:

      CREATE TEMPORARY TABLE  selectdb_sink (
        order_id BIGINT,
        user_id STRING,
        shop_id BIGINT,
        product_id BIGINT,
        buy_fee DECIMAL,   
        create_time TIMESTAMP(6),
        update_time TIMESTAMP(6),
        state int
      ) 
        WITH (
        'connector' = 'doris',
        'fenodes' = 'selectdb-cn-jfj3z******.selectdbfe.rds.aliyuncs.com:8080',
        'table.identifier' = 'selectdb.selecttable',
        'username' = 'admin',
        'password' = '${secret_values.selectdb}',
        'sink.enable-delete' = 'true'
      );
      
      INSERT INTO selectdb_sink SELECT * FROM `mysqlcatalog`.`order_dw_mysql`.`orders`;
    4. Klik Deploy dan mulai penerapan dalam mode awal. Untuk informasi lebih lanjut, lihat Buat Penerapan dan Mulai Penerapan.

  3. Setelah terhubung ke instans ApsaraDB for SelectDB menggunakan DMS, kueri data di tabel selecttable.

    SELECT * FROM `selecttable` ;

Ingesti data

Konektor SelectDB dapat digunakan sebagai sink ingesti data.

Sintaksis

source:
   type: xxx

sink:
   type: doris
   name: Doris Sink
   fenodes: 127.0.0.1:8030
   username: root
   password: ""
   table.create.properties.replication_num: 1

Opsi konfigurasi

Opsi

Deskripsi

Diperlukan?

Tipe Data

Nilai default

Catatan

type

Tipe sink.

Ya

String

Tidak ada nilai default

Atur ke doris.

name

Nama sink.

Tidak

String

Tidak ada nilai default

fenodes

Titik akhir dan port HTTP dari instans ApsaraDB for SelectDB.

Ya

String

Tidak ada nilai default

Untuk mendapatkan titik akhir VPC atau publik dari instans SelectDB Anda, pergi ke ApsaraDB for SelectDB, klik nama instans Anda, dan temukan informasi di bagian Network Information.

Contoh: selectdb-sg-***.selectdbfe.ap-southeast-6.rds.aliyuncs.com:8080.

benodes

Alamat HTTP BE.

Tidak

String

Tidak ada nilai default

Contoh: 127.0.0.1:8040

jdbc-url

Informasi koneksi JDBC dari instans ApsaraDB for SelectDB.

Tidak

String

Tidak ada nilai default

Untuk mendapatkan titik akhir VPC atau publik dan port MySQL dari instans SelectDB Anda, pergi ke ApsaraDB for SelectDB, klik nama instans Anda, dan temukan informasi di bagian Network Information.

Contoh: jdbc:mysql://selectdb-sg-***.selectdbfe.ap-southeast-6.rds.aliyuncs.com:9030.

username

Nama pengguna kluster dari instans ApsaraDB for SelectDB.

Ya

String

Tidak ada nilai default

password

Kata sandi kluster dari instans ApsaraDB for SelectDB.

Tidak

String

Tidak ada nilai default

auto-redirect

Menentukan apakah akan mengalihkan permintaan stream load. Saat diaktifkan, stream load akan menulis data melalui FE tanpa secara eksplisit mendapatkan informasi BE.

Tidak

String

false

Apakah akan menulis melalui pengalihan FE dan langsung terhubung ke BE untuk menulis

charset-encoding

Pengkodean karakter untuk klien HTTP.

Tidak

Boolean

UTF-8

sink.enable.batch-mode

Menentukan apakah akan menggunakan mode batch untuk menulis ke SelectDB. Saat diaktifkan, penulisan tidak bergantung pada checkpoint, tetapi dikendalikan oleh sink.buffer-flush.max-rows, sink.buffer-flush.max-bytes, dan sink.buffer-flush.interval.

Saat diaktifkan, semantik tepat-sekali tidak dijamin. Untuk mencapai idempotensi, gunakan model Unik.

Tidak

Boolean

true

sink.flush.queue-size

Ukuran antrian untuk penulisan batch.

Tidak

Integer

2

sink.buffer-flush.max-rows

Jumlah maksimum catatan yang akan dibuang dalam satu batch.

Tidak

Integer

50000

sink.buffer-flush.max-bytes

Jumlah maksimum byte yang akan dibuang dalam satu batch.

Tidak

Integer

10485760(10MB)

sink.buffer-flush.interval

Interval pembuangan. Jika waktu ini terlampaui, data akan dibuang secara asinkron. Minimum: 1 detik.

Tidak

String

10s

sink.properties.

Parameter impor untuk Stream Load. Silakan masukkan konfigurasi properti.

  • Untuk format CSV, konfigurasikan:

    sink.properties.format='csv' 
    sink.properties.column_separator=','
    sink.properties.line_delimiter='\n' 
  • Untuk format JSON, konfigurasikan:

    sink.properties.format='json' 

Tidak

String

Tidak ada nilai default

Contoh: sink.properties.strict_mode: true. Untuk informasi lebih lanjut, lihat Gunakan Stream Load untuk mengimpor data.

table.create.properties.*

Konfigurasi properti untuk pembuatan tabel.

Tidak

String

Tidak ada nilai default

Contoh: table.create.properties.replication_num: 1. Lihat juga Properti tabel Doris.

Pemetaan tipe data

Tipe Flink CDC

Tipe SelectDB

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

DECIMAL

DECIMAL

FLOAT

FLOAT

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

DATE

DATE

TIMESTAMP [(p)]

DATETIME [(p)]

TIMESTAMP_LTZ [(p)]

DATETIME [(p)]

CHAR(n)

CHAR(n*3)

Catatan

Dalam Doris, string dikodekan UTF-8, sehingga setiap karakter Inggris membutuhkan 1 byte dan setiap karakter Cina membutuhkan 3 byte. Oleh karena itu panjangnya dikalikan dengan 3 di sini. Panjang maksimum CHAR adalah 255. Jika terlampaui, CHAR akan otomatis berubah menjadi VARCHAR.

VARCHAR(n)

VARCHAR(n*3)

Catatan

Dalam Doris, string dikodekan UTF-8, sehingga setiap karakter Inggris membutuhkan 1 byte dan setiap karakter Cina membutuhkan 3 byte. Oleh karena itu panjangnya dikalikan dengan 3 di sini. Panjang maksimum VARCHAR adalah 65533. Jika terlampaui, VARCHAR akan otomatis berubah menjadi STRING.

BINARY(n)

STRING

VARBINARY(N)

STRING

STRING

STRING