全部产品
Search
文档中心

ApsaraDB for SelectDB:Gunakan Flink untuk mengimpor data

更新时间:Jul 30, 2025

ApsaraDB for SelectDB sepenuhnya kompatibel dengan Apache Doris. Anda dapat menggunakan Flink Doris Connector untuk mengimpor data historis dari sumber data seperti MySQL, Oracle, PostgreSQL, SQL Server, dan Kafka ke SelectDB. Jika Anda membuat pekerjaan Flink yang menggunakan konektor penangkapan perubahan data (CDC) Flink, data inkremental juga disinkronkan dari sumber data ke SelectDB.

Ikhtisar

Catatan
  • Anda hanya dapat menggunakan Flink Doris Connector untuk menulis data ke sebuah SelectDB instance. Jika Anda ingin menggunakan Flink Doris Connector untuk terhubung ke node di kluster backend (BE) dari instance SelectDB untuk membaca data secara efisien, Anda harus menghubungi dukungan teknis dari SelectDB untuk meminta izin yang diperlukan.

  • Anda juga dapat menggunakan Flink JDBC Connector untuk menulis data ke sebuah SelectDB instance.

Flink Doris Connector digunakan untuk menghubungkan Apache Flink ke Apache Doris. Anda dapat menggunakan Flink Doris Connector untuk membaca data dari dan menulis data ke Apache Doris untuk pemrosesan dan analisis data real-time. Flink Doris Connector umumnya digunakan untuk mengimpor data ke SelectDB dalam mode streaming karena SelectDB sepenuhnya kompatibel dengan Apache Doris.

Kemampuan pemrosesan aliran Flink diimplementasikan berdasarkan komponen sumber, transformasi, dan sink.

  • Sumber:

    • Membaca aliran data dari sistem eksternal. Sistem eksternal mencakup antrian pesan seperti Apache Kafka, database, dan sistem file.

    • Sebagai contoh, sumber dapat digunakan untuk membaca pesan dari Kafka secara real-time atau membaca data dari file.

  • Transformasi:

    • Memproses dan mentransformasi aliran data masukan selama tahap transformasi. Operasi transformasi mencakup penyaringan, pemetaan, agregasi, dan mendefinisikan jendela.

    • Sebagai contoh, operasi pemetaan dapat dilakukan pada aliran data masukan untuk mengonversi struktur data atau data dapat di-aggregat untuk menghitung metrik tertentu per menit.

  • Sink:

    • Mengirimkan aliran data yang telah diproses ke sistem eksternal. Sink dapat digunakan untuk menulis data ke database, file, dan antrian pesan.

    • Sebagai contoh, sink dapat digunakan untuk menulis data yang telah diproses ke database MySQL atau topik Kafka.

Gambar berikut menunjukkan cara data diimpor ke SelectDB dengan menggunakan Flink Doris Connector.

Prasyarat

  • Sumber data dan Flink terhubung ke SelectDB. Untuk menetapkan koneksi jaringan, lakukan langkah-langkah berikut:

    1. Ajukan titik akhir publik untuk instance ApsaraDB for SelectDB yang ingin Anda gunakan. Untuk informasi lebih lanjut, lihat Ajukan atau lepaskan titik akhir publik.

      Lewati langkah ini jika Anda menggunakan Realtime Compute for Apache Flink dan sumber data yang disediakan oleh Alibaba Cloud atau open source Flink dan sumber data yang diterapkan pada instance Elastic Compute Service (ECS) dan layanan Alibaba Cloud atau instance ECS berada di virtual private cloud (VPC) yang sama dengan instance ApsaraDB for SelectDB.

    2. Tambahkan alamat IP terkait Flink dan sumber data ke daftar putih instance ApsaraDB for SelectDB. Untuk informasi lebih lanjut, lihat Konfigurasikan daftar putih alamat IP.

  • Flink Doris Connector telah diinstal.

    Tabel berikut menjelaskan persyaratan versi.

    Versi Flink

    Versi Flink Doris Connector

    Tautan unduhan

    Realtime Compute for Apache Flink: V1.17 atau lebih baru

    Open source Flink: V1.15 atau lebih baru

    V1.5.2 atau lebih baru

    Flink Doris Connector

    Untuk informasi lebih lanjut, lihat Instal Flink Doris Connector.

Instal Flink Doris Connector

Anda dapat menggunakan salah satu metode berikut untuk menginstal Flink Doris Connector berdasarkan kebutuhan bisnis Anda:

  • Unggah, gunakan, dan perbarui Flink Doris Connector dengan menggunakan fitur konektor kustom Realtime Compute for Apache Flink. Metode ini berlaku jika Anda mengimpor data dengan menggunakan Realtime Compute for Apache Flink.SelectDB Untuk informasi lebih lanjut, lihat Kelola konektor kustom.

  • Unduh paket JAR versi yang diperlukan dari Flink Doris Connector ke direktori lib direktori instalasi Flink. Metode ini berlaku jika Anda menggunakan kluster open source Flink yang dikelola sendiri. Untuk mengunduh paket JAR, kunjungi org/apache/doris.

  • Instal dependensi Maven dari Flink Doris Connector. Contoh kode sampel berikut memberikan contoh. Untuk mendapatkan dependensi untuk versi lainnya, kunjungi org/apache/doris.

    <!-- flink-doris-connector -->
    <dependency>
      <groupId>org.apache.doris</groupId>
      <artifactId>flink-doris-connector-1.16</artifactId>
      <version>1.5.2</version>
    </dependency>  

Contoh

Lingkungan sampel

Contoh berikut menunjukkan cara mengimpor data dari tabel employees di database test di instance ApsaraDB RDS for MySQL ke tabel employees di database test di instance SelectDB dengan menggunakan Flink SQL, Flink CDC, dan DataStream. Anda dapat memodifikasi parameter yang sesuai berdasarkan kebutuhan bisnis Anda. Lingkungan sampel:

  • Kluster standalone Flink versi 1.16

  • Java

  • Database tujuan: test

  • Tabel tujuan: employees

  • Database sumber: test

  • Tabel sumber: employees

Persiapkan lingkungan

Lingkungan Flink

  1. Siapkan lingkungan Java.

    Jalannya Flink bergantung pada lingkungan Java. Oleh karena itu, Anda harus menginstal Java Development Kit (JDK) dan mengonfigurasi variabel lingkungan JAVA_HOME.

    Versi Java yang diperlukan terkait dengan versi Flink. Untuk informasi lebih lanjut tentang versi Java yang didukung oleh Flink, lihat Kompatibilitas Java. Dalam contoh ini, Java 8 diinstal. Untuk informasi lebih lanjut, lihat bagian "Langkah 2: Instal JDK" dari topik Deploy lingkungan web Java pada instance yang menjalankan Alibaba Cloud Linux 2, Alibaba Cloud Linux 3, atau CentOS 7.x.

  2. Unduh paket instalasi Flink flink-1.16.3-bin-scala_2.12.tgz. Jika versi ini telah kedaluwarsa, unduh versi lainnya di Situs resmi Apache Flink.

    wget https://www.apache.si/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
  3. Dekompresi paket instalasi.

    tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
  4. Pergi ke direktori lib direktori instalasi Flink untuk menginstal konektor yang diperlukan.

    • Instal Flink Doris Connector.

      wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar
    • Instal Flink MySQL Connector.

      wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
  5. Mulai kluster Flink.

    Jalankan perintah berikut di direktori bin direktori instalasi Flink:

    ./start-cluster.sh 

Tabel tujuan dan database di ApsaraDB for SelectDB

  1. Buat instance ApsaraDB for SelectDB. Untuk informasi lebih lanjut, lihat Buat instance.

  2. Hubungkan ke instance ApsaraDB for SelectDB. Untuk informasi lebih lanjut, lihat Hubungkan ke instance.

  3. Buat database uji bernama test.

    CREATE DATABASE test;
  4. Buat tabel uji bernama employees.

    USE test;
    
    -- Buat tabel.
    CREATE TABLE employees (
        emp_no       int NOT NULL,
        birth_date   date,
        first_name   varchar(20),
        last_name    varchar(20),
        gender       char(2),
        hire_date    date
    )
    UNIQUE KEY(`emp_no`)
    DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

Tabel sumber dan database di ApsaraDB RDS for MySQL

  1. Buat instance ApsaraDB RDS for MySQL. Untuk informasi lebih lanjut, lihat Langkah 1: Buat instance ApsaraDB RDS for MySQL dan konfigurasikan database.

  2. Buat database uji bernama test.

    CREATE DATABASE test;
  3. Buat tabel uji bernama employees.

    USE test;
    
    CREATE TABLE employees (
        emp_no INT NOT NULL PRIMARY KEY,
        birth_date DATE,
        first_name VARCHAR(20),
        last_name VARCHAR(20),
        gender CHAR(2),
        hire_date DATE
    );
  4. Masukkan data ke dalam tabel.

    INSERT INTO employees (emp_no, birth_date, first_name, last_name, gender, hire_date) VALUES
    (1001, '1985-05-15', 'John', 'Doe', 'M', '2010-06-20'),
    (1002, '1990-08-22', 'Jane', 'Smith', 'F', '2012-03-15'),
    (1003, '1987-11-02', 'Robert', 'Johnson', 'M', '2015-07-30'),
    (1004, '1992-01-18', 'Emily', 'Davis', 'F', '2018-01-05'),
    (1005, '1980-12-09', 'Michael', 'Brown', 'M', '2008-11-21');

Impor data dengan menggunakan Flink SQL

  1. Mulai klien SQL Flink.

    Jalankan perintah berikut di direktori bin pada direktori instalasi Flink:

    ./sql-client.sh
  2. Kirim pekerjaan Flink di klien SQL Flink dengan melakukan langkah-langkah berikut:

    1. Buat tabel sumber di database MySQL.

      Dalam contoh ini, konfigurasi terkait konektor MySQL CDC ditentukan di WITH. Untuk informasi lebih lanjut tentang item konfigurasi, lihat MySQL CDC Connector.

      CREATE TABLE employees_source (
          emp_no INT,
          birth_date DATE,
          first_name STRING,
          last_name STRING,
          gender STRING,
          hire_date DATE,
          PRIMARY KEY (`emp_no`) NOT ENFORCED
      ) WITH (
          'connector' = 'mysql-cdc',
          'hostname' = '127.0.0.1', 
          'port' = '3306',
          'username' = 'root',
          'password' = '****',
          'database-name' = 'test',
          'table-name' = 'employees'
      );
    2. Buat tabel sink di instance SelectDB.

      Dalam contoh ini, konfigurasi terkait instance SelectDB ditentukan di WITH. Untuk informasi lebih lanjut tentang item konfigurasi, lihat Item konfigurasi Doris Sink.

      CREATE TABLE employees_sink (
          emp_no       INT ,
          birth_date   DATE,
          first_name   STRING,
          last_name    STRING,
          gender       STRING,
          hire_date    DATE
      ) 
      WITH (
        'connector' = 'doris',
        'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
        'table.identifier' = 'test.employees',
        'username' = 'admin',
        'password' = '****'
      );
    3. Impor data dari tabel sumber di database MySQL ke tabel sink di instance SelectDB.

      INSERT INTO employees_sink SELECT * FROM employees_source;
  3. Verifikasi hasil impor data.

    Hubungkan ke instance SelectDB dan jalankan perintah berikut untuk memeriksa apakah data telah diimpor:

    SELECT * FROM test.employees;

Impor data dengan menggunakan Flink CDC

Penting

Jika Anda mengimpor data dengan menggunakan Realtime Compute for Apache Flink, Flink CDC tidak mendukung pekerjaan JAR. Flink CDC 3.0 mengimpor data dengan menggunakan pekerjaan YAML.

Contoh berikut menunjukkan cara menggunakan Flink CDC untuk mengimpor data dari database ke instance SelectDB:

Di direktori tempat Flink diinstal, gunakan bin/flink untuk menjalankan pekerjaan Flink CDC. Sintaks:

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
    --database <selectdb-database-name> \
    [--job-name <flink-job-name>] \
    [--table-prefix <selectdb-table-prefix>] \
    [--table-suffix <selectdb-table-suffix>] \
    [--including-tables <mysql-table-name|name-regular-expr>] \
    [--excluding-tables <mysql-table-name|name-regular-expr>] \
    --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
    --oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
    --sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
    [--table-conf <selectdb-table-conf> [--table-conf <selectdb-table-conf> ...]]

Parameter

Parameter

Deskripsi

execution.checkpointing.interval

Interval checkpoint, yang memengaruhi frekuensi sinkronisasi data. Kami sarankan Anda mengatur parameter ini menjadi 10s.

parallelism.default

Paralelisme pekerjaan Flink. Anda dapat meningkatkan paralelisme secara tepat untuk mempercepat sinkronisasi data.

job-name

Nama pekerjaan Flink.

database

Nama database tempat data diimpor di instance SelectDB.

table-prefix

Awalan yang ditambahkan ke nama tabel SelectDB. Contoh: --table-prefix ods_.

table-suffix

Akhiran yang ditambahkan ke nama tabel SelectDB.

including-tables

Tabel tempat data akan disinkronkan. Anda dapat menggunakan tanda vertikal (|) untuk memisahkan beberapa nama tabel. Ekspresi reguler juga didukung. Contoh: --including-tables table1|tbl.*, yang menentukan bahwa data disinkronkan dari table1 dan semua tabel yang namanya dimulai dengan tbl.

excluding-tables

Tabel yang akan dikecualikan. Anda dapat menentukan parameter ini dengan cara yang sama seperti menentukan parameter including-tables.

mysql-conf

Item konfigurasi sumber MySQL CDC. Untuk informasi lebih lanjut tentang item konfigurasi, lihat MySQL CDC Connector. Parameter hostname, username, password, dan database-name diperlukan.

oracle-conf

Item konfigurasi sumber Oracle CDC. Untuk informasi lebih lanjut tentang item konfigurasi, lihat Oracle CDC Connector. Parameter hostname, username, password, database-name, dan schema-name diperlukan.

sink-conf

Item konfigurasi Doris Sink. Untuk informasi lebih lanjut, lihat bagian Item konfigurasi Doris Sink dari topik ini.

table-conf

Item konfigurasi tabel SelectDB. Item konfigurasi terdapat dalam properti saat tabel SelectDB dibuat.

Catatan
  1. Untuk menyinkronkan data, Anda harus menginstal dependensi Flink CDC, seperti flink-sql-connector-mysql-cdc-${version}.jar dan flink-sql-connector-oracle-cdc-${version}.jar, di direktori $FLINK_HOME/lib.

  2. Jika Anda ingin menyinkronkan data penuh dari database, versi Flink harus 1.15 atau lebih baru. Untuk informasi lebih lanjut tentang cara mengunduh Flink Doris Connector versi berbeda, kunjungi org/apache/doris.

Item konfigurasi Doris Sink

Parameter

Nilai default

Diperlukan

Deskripsi

fenodes

Tidak ada nilai default

Ya

Titik akhir dan port HTTP yang digunakan untuk mengakses instance ApsaraDB for SelectDB.

Untuk mendapatkan titik akhir VPC atau publik dan port HTTP dari instance ApsaraDB for SelectDB, lakukan langkah-langkah berikut: Masuk ke konsol ApsaraDB for SelectDB dan buka halaman Instance Details dari instance yang ingin Anda lihat informasinya. Di bagian Network Information halaman Informasi Dasar, lihat nilai parameter VPC Endpoint atau Public Endpoint dan parameter HTTP Port.

Contoh: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080.

table.identifier

Tidak ada nilai default

Ya

Nama database dan tabel. Contoh: test_db.test_table.

username

Tidak ada nilai default

Ya

Nama pengguna yang digunakan untuk terhubung ke instance ApsaraDB for SelectDB.

password

Tidak ada nilai default

Ya

Kata sandi yang digunakan untuk terhubung ke instance ApsaraDB for SelectDB.

jdbc-url

Tidak ada nilai default

Tidak

String koneksi JDBC yang digunakan untuk mengakses instance ApsaraDB for SelectDB.

Untuk mendapatkan titik akhir VPC atau publik dan port MySQL dari instance ApsaraDB for SelectDB, lakukan langkah-langkah berikut: Masuk ke konsol ApsaraDB for SelectDB dan buka halaman Instance Details dari instance yang ingin Anda lihat informasinya. Di bagian Network Information halaman Informasi Dasar, lihat nilai parameter VPC Endpoint atau Public Endpoint dan parameter MySQL Port.

Contoh: jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030.

auto-redirect

true

Tidak

Menentukan apakah akan mengarahkan ulang permintaan Stream Load. Jika Anda mengatur parameter ini ke true, permintaan Stream Load dikirim ke frontend (FE). Informasi backend (BE) tidak lagi ditampilkan.

doris.request.retries

3

Tidak

Jumlah maksimum percobaan ulang yang diizinkan untuk mengirim permintaan ke instance SelectDB.

doris.request.connect.timeout

30s

Tidak

Periode timeout koneksi untuk mengirim permintaan ke instance SelectDB.

doris.request.read.timeout

30s

Tidak

Periode timeout baca untuk mengirim permintaan ke instance SelectDB.

sink.label-prefix

""

Ya

Awalan label yang digunakan oleh Stream Load. Dalam skenario komit dua fase, awalan label harus unik secara global untuk memastikan semantik tepat-sekali Flink.

sink.properties

Tidak ada nilai default

Tidak

Properti impor data Stream Load. Konfigurasikan properti dalam salah satu format berikut:

  • Format CSV:

    sink.properties.format='csv' 
    sink.properties.column_separator=','
    sink.properties.line_delimiter='\n' 
  • Format JSON:

    sink.properties.format='json' 

Untuk informasi lebih lanjut tentang properti, lihat Gunakan Stream Load untuk mengimpor data.

sink.buffer-size

1048576

Tidak

Ukuran buffer data tulis. Unit: byte. Kami sarankan Anda menggunakan nilai default, yang setara dengan 1 MB.

sink.buffer-count

3

Tidak

Jumlah buffer data tulis. Kami sarankan Anda menggunakan nilai default.

sink.max-retries

3

Tidak

Jumlah maksimum percobaan ulang yang diizinkan setelah permintaan commit gagal. Nilai default: 3.

sink.use-cache

false

Tidak

Menentukan apakah akan menggunakan cache memori untuk pemulihan jika terjadi pengecualian. Jika Anda mengatur parameter ini ke true, cache akan menyimpan data yang dihasilkan selama pembuatan checkpoint.

sink.enable-delete

true

Tidak

Menentukan apakah akan menghapus event secara sinkron. Hanya model Unique key yang didukung.

sink.enable-2pc

true

Tidak

Menentukan apakah akan mengaktifkan mode komit dua fase. Nilai default: true. Anda dapat mengaktifkan mode komit dua fase untuk memastikan semantik tepat-sekali.

sink.enable.batch-mode

false

Tidak

Menentukan apakah akan mengaktifkan mode batch untuk menulis data ke instance SelectDB. Jika Anda mengaktifkan mode batch, waktu saat data ditulis ke instance ApsaraDB for SelectDB tidak bergantung pada checkpoint. Sebagai gantinya, waktu tersebut ditentukan oleh parameter sink.buffer-flush.max-rows, sink.buffer-flush.max-bytes, dan sink.buffer-flush.interval.

Semantik tepat-sekali tidak dijamin setelah mode batch diaktifkan, dan model Unique key dapat digunakan untuk menerapkan idempotensi.

sink.flush.queue-size

2

Tidak

Ukuran antrian cache dalam mode batch.

sink.buffer-flush.max-rows

50000

Tidak

Jumlah maksimum baris data yang dapat ditulis dalam satu batch dalam mode batch.

sink.buffer-flush.max-bytes

10MB

Tidak

Jumlah maksimum byte yang dapat ditulis dalam satu batch dalam mode batch.

sink.buffer-flush.interval

10s

Tidak

Interval di mana cache diperbarui secara asinkron dalam mode batch. Nilai minimum adalah 1. Unit: detik.

sink.ignore.update-before

true

Tidak

Menentukan apakah akan mengabaikan event update-before. Nilai default: true.

Contoh

Impor data dari database MySQL

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Impor data dari database Oracle

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Impor data dari database PostgreSQL

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    postgres-sync-database \
    --database db1\
    --postgres-conf hostname=127.0.0.1 \
    --postgres-conf port=5432 \
    --postgres-conf username=postgres \
    --postgres-conf password="123456" \
    --postgres-conf database-name=postgres \
    --postgres-conf schema-name=public \
    --postgres-conf slot.name=test \
    --postgres-conf decoding.plugin.name=pgoutput \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Impor data dari database SQL Server

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    sqlserver-sync-database \
    --database db1\
    --sqlserver-conf hostname=127.0.0.1 \
    --sqlserver-conf port=1433 \
    --sqlserver-conf username=sa \
    --sqlserver-conf password="123456" \
    --sqlserver-conf database-name=CDC_DB \
    --sqlserver-conf schema-name=dbo \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Impor data dengan menggunakan DataStream

  1. Instal dependensi Maven yang diperlukan di proyek Maven.

    Dependensi Maven

    <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <scala.version>2.12</scala.version>
            <java.version>1.8</java.version>
            <flink.version>1.16.3</flink.version>
            <fastjson.version>1.2.62</fastjson.version>
            <scope.mode>compile</scope.mode>
        </properties>
        <dependencies>
            <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>28.1-jre</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.14.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>com.ververica</groupId>
                <artifactId>flink-sql-connector-mysql-cdc</artifactId>
                <version>2.4.2</version>
                <exclusions>
                    <exclusion>
                        <artifactId>flink-shaded-guava</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
        </dependencies>
  2. Jalankan kode inti Java.

    Dalam potongan kode berikut, parameter yang digunakan untuk mengonfigurasi tabel sumber di database MySQL dan tabel sink di instance ApsaraDB for SelectDB sesuai dengan yang dijelaskan dalam bagian Impor data dengan menggunakan Flink SQL dari topik ini. Untuk informasi lebih lanjut, lihat MySQL CDC Connector dan bagian Item konfigurasi Doris Sink dari topik ini.

    package org.example;
    
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    
    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.sink.DorisSink;
    import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
    import org.apache.doris.flink.tools.cdc.mysql.DateToStringConverter;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    public class Main {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.enableCheckpointing(10000);
    
            Map<String, Object> customConverterConfigs = new HashMap<>();
            customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
            JsonDebeziumDeserializationSchema schema =
                    new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
            
            // Konfigurasikan tabel sumber di database MySQL.
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                    .hostname("rm-xxx.mysql.rds.aliyuncs***")
                    .port(3306)
                    .startupOptions(StartupOptions.initial())
                    .databaseList("db_test")
                    .tableList("db_test.employees")
                    .username("root")
                    .password("test_123")
                    .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
                    .deserializer(schema)
                    .serverTimeZone("Asia/Shanghai")
                    .build();
    
            // Konfigurasikan tabel sink di instance ApsaraDB for SelectDB.
            DorisSink.Builder<String> sinkBuilder = DorisSink.builder();
            DorisOptions.Builder dorisBuilder = DorisOptions.builder();
            dorisBuilder.setFenodes("selectdb-cn-xxx-public.selectdbfe.rds.aliyunc****:8080")
                    .setTableIdentifier("db_test.employees")
                    .setUsername("admin")
                    .setPassword("test_123");
            DorisOptions dorisOptions = dorisBuilder.build();
    
            // Konfigurasikan parameter terkait Stream Load sebagai properti.
            Properties properties = new Properties();
            properties.setProperty("format", "json");
            properties.setProperty("read_json_by_line", "true");
            DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
            executionBuilder.setStreamLoadProp(properties);
    
            sinkBuilder.setDorisExecutionOptions(executionBuilder.build())
                    .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build()) //serialize according to string
                    .setDorisOptions(dorisOptions);
    
            DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
            dataStreamSource.sinkTo(sinkBuilder.build());
            env.execute("MySQL to SelectDB");
        }
    }

Penggunaan Lanjutan

Gunakan Flink SQL untuk memperbarui kolom parsial

-- aktifkan checkpoint
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE cdc_mysql_source (
   id INT
  ,name STRING
  ,bank STRING
  ,age INT
  ,PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1',
 'port' = '3306',
 'username' = 'root',
 'password' = 'password',
 'database-name' = 'database',
 'table-name' = 'table'
);

CREATE TABLE selectdb_sink (
    id INT,
    name STRING,
    bank STRING,
    age INT
) 
WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'database.table',
  'username' = 'admin',
  'password' = '****',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.properties.columns' = 'id,name,bank,age',
  'sink.properties.partial.columns' = 'true' -- Izinkan kolom parsial untuk diperbarui.
);


INSERT INTO selectdb_sink SELECT id,name,bank,age FROM cdc_mysql_source;

Gunakan Flink SQL untuk menghapus data dari kolom tertentu

Dalam skenario di mana sumber data mendukung CDC, Doris Sink mengidentifikasi jenis event berdasarkan RowKind dan memberikan nilai ke kolom tersembunyi __DORIS_DELETE_SIGN__ untuk menghapus data. Dalam skenario di mana pesan Kafka berfungsi sebagai sumber data, Doris Sink tidak dapat mengidentifikasi jenis operasi berdasarkan RowKind. Dalam hal ini, Doris Sink perlu menandai jenis operasi berdasarkan bidang tertentu dalam pesan, seperti {"op_type":"delete",data:{...}}. Doris Sink dapat menggunakan tanda ini untuk menghapus data yang nilainya pada bidang op_type adalah delete. Dalam hal ini, Doris Sink perlu secara eksplisit merujuk nilai kolom tersembunyi berdasarkan logika bisnis. Contoh berikut menunjukkan cara menggunakan Flink SQL untuk menghapus data dari instance ApsaraDB for SelectDB berdasarkan bidang tertentu dalam data Kafka:

-- Dalam contoh ini, data sumber berisi bidang {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}.
CREATE TABLE KAFKA_SOURCE(
  data STRING,
  op_type STRING
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE SELECTDB_SINK(
  id INT,
  name STRING,
  __DORIS_DELETE_SIGN__ INT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'db.table',
  'username' = 'admin',
  'password' = '****',
  'sink.enable-delete' = 'false',        -- Nilai false menunjukkan bahwa jenis event tidak diidentifikasi berdasarkan RowKind.
  'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__'  -- Kolom yang diimpor oleh Stream Load.
);

INSERT INTO SELECTDB_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name, 
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__ 
FROM KAFKA_SOURCE;

FAQ

  • T: Bagaimana cara menulis data tipe BITMAP?

    A: Contoh kode berikut menunjukkan cara menulis data tipe BITMAP:

    CREATE TABLE bitmap_sink (
      dt INT,
      page STRING,
      user_id INT 
    )
    WITH ( 
      'connector' = 'doris', 
      'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
      'table.identifier' = 'test.bitmap_test', 
      'username' = 'admin', 
      'password' = '****', 
      'sink.label-prefix' = 'selectdb_label', 
      'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
    );
  • T: Apa yang harus saya lakukan jika kesalahan errCode = 2, detailMessage = Label[label_0_1]has already been used, relate to txn[19650] dilaporkan?

    A: Dalam skenario tepat-sekali, pekerjaan Flink harus dimulai dari checkpoint terbaru atau savepoint saat di-restart. Jika tidak, kesalahan di atas akan dilaporkan. Jika tepat-sekali tidak diperlukan, Anda dapat mengatur parameter sink.enable-2pc ke false untuk menonaktifkan mode komit dua fase atau memodifikasi parameter sink.label-prefix.

  • T: Apa yang harus saya lakukan jika kesalahan errCode = 2, detailMessage = transaction[19650]not found dilaporkan?

    A: Kesalahan ini terjadi selama fase commit. Jika ID transaksi yang tercatat dalam checkpoint telah kedaluwarsa di instance ApsaraDB for SelectDB Anda dan Anda mencoba melakukan commit transaksi lagi, kesalahan ini terjadi. Dalam hal ini, Anda tidak dapat memulai transaksi dari checkpoint. Anda dapat memodifikasi parameter streaming_label_keep_max_second dari instance ApsaraDB for SelectDB untuk memperpanjang periode validitas. Periode validitas default adalah 12 jam.

  • T: Apa yang harus saya lakukan jika kesalahan errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100 dilaporkan?

    A: Kesalahan ini dilaporkan karena jumlah pekerjaan impor data konkuren dari database yang sama melebihi 100. Anda dapat menyelesaikan kesalahan ini dengan memodifikasi parameter max_running_txn_num_per_db dari instance ApsaraDB for SelectDB Anda. Untuk informasi lebih lanjut, lihat max_running_txn_num_per_db dalam Konfigurasi FE.

    Jika Anda sering memodifikasi label untuk pekerjaan dan memulai ulang pekerjaan, kesalahan ini mungkin terjadi. Dalam skenario komit dua fase di mana model Duplicate key atau Aggregate key digunakan, label setiap pekerjaan harus unik. Pekerjaan Flink secara aktif membatalkan transaksi yang sebelumnya dimulai tetapi belum selesai ketika pekerjaan Flink di-restart dari checkpoint. Jika Anda sering memodifikasi label untuk pekerjaan dan memulai ulang pekerjaan, sejumlah besar transaksi precommitted tidak dapat dibatalkan dan menghabiskan kuota transaksi. Jika model Unique key digunakan, Anda juga dapat menonaktifkan mode komit dua fase dan menerapkan penulisan idempoten dengan mengonfigurasi Doris Sink.

  • T: Bagaimana cara memastikan urutan sekelompok data saat Flink menulis data ke tabel yang menggunakan model Unique key?

    A: Anda dapat menambahkan kolom urutan untuk memastikan urutan sekelompok data. Untuk informasi lebih lanjut, lihat Kolom Urutan.

  • T: Mengapa pekerjaan Flink gagal mengimpor data saat tidak ada kesalahan yang dilaporkan untuk pekerjaan Flink?

    A: Jika Flink Doris Connector 1.1.0 atau versi sebelumnya digunakan untuk mengimpor data, data ditulis dalam mode batch. Semua penulisan data didorong oleh data. Anda harus memeriksa apakah data ditulis ke sumber data upstream. Jika Flink Doris Connector versi lebih baru dari 1.1.0 digunakan, penulisan data bergantung pada checkpoint. Anda harus mengaktifkan checkpoint untuk menulis data.

  • T: Apa yang harus saya lakukan jika kesalahan tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235 dilaporkan?

    A: Dalam banyak kasus, kesalahan ini dilaporkan saat menggunakan Flink Doris Connector 1.1.0 atau versi sebelumnya. Kesalahan ini disebabkan oleh frekuensi penulisan yang terlalu tinggi. Anda dapat mengurangi frekuensi Stream Load dengan menentukan parameter sink.buffer-flush.max-bytes dan sink.buffer-flush.interval.

  • T: Bagaimana cara melewati data kotor saat saya mengimpor data menggunakan Flink?

    A: Saat Anda menggunakan Flink untuk mengimpor data, jika terdapat data kotor, seperti data yang format atau panjang bidangnya tidak sesuai dengan persyaratan, Stream Load akan melaporkan kesalahan. Dalam hal ini, Flink akan terus mencoba mengimpor data secara berulang. Untuk melewati data kotor, Anda dapat menonaktifkan mode ketat Stream Load dengan mengatur parameter strict_mode ke false dan parameter max_filter_ratio ke 1 atau memfilter data sebelum operator Sink.

  • T: Bagaimana cara memetakan tabel sumber ke tabel tujuan ApsaraDB for SelectDB?

    A: Saat Anda menggunakan Flink Doris Connector untuk mengimpor data, perhatikan dua hal berikut: 1. Kolom dan tipe pada tabel sumber harus sesuai dengan yang ada di Flink SQL. 2. Kolom dan tipe di Flink SQL harus sesuai dengan tabel tujuan ApsaraDB for SelectDB.

  • T: Apa yang harus saya lakukan jika kesalahan TApplicationException: get_next failed: out of sequence response: expected 4 but got 3 dilaporkan?

    A: Kesalahan ini disebabkan oleh bug konkurensi dalam kerangka Thrift. Kami sarankan Anda menggunakan versi terbaru dari Flink Doris Connector dan versi Flink yang kompatibel.

  • T: Apa yang harus saya lakukan jika kesalahan DorisRuntimeException: Fail to abort transaction 26153 with urlhttp://192.168.XX.XX dilaporkan?

    A: Anda dapat mencari log untuk abort transaction response di TaskManager dan memeriksa apakah kesalahan disebabkan oleh klien atau server berdasarkan kode status HTTP yang dikembalikan.