全部产品
Search
文档中心

ApsaraDB for SelectDB:Impor data menggunakan Flink

更新时间:Feb 26, 2026

ApsaraDB for SelectDB sepenuhnya kompatibel dengan Apache Doris. Anda dapat menggunakan Flink Doris Connector untuk mengimpor data historis dari sumber data seperti MySQL, Oracle, PostgreSQL, SQL Server, dan Kafka ke SelectDB. Selain itu, ketika Anda mengaktifkan pekerjaan Change Data Capture (CDC) di Flink, data inkremental dari sumber data juga disinkronkan ke SelectDB.

Ikhtisar

Catatan

Anda hanya dapat menggunakan Flink Doris Connector untuk menulis data ke SelectDB. Untuk menggunakan Flink Doris Connector guna terhubung langsung ke node backend (BE) instans SelectDB demi pembacaan data yang efisien, hubungi tim dukungan teknis SelectDB untuk meminta izin akses yang diperlukan.

Anda juga dapat menggunakan Flink Java Database Connectivity (JDBC) Connector untuk membaca data dari SelectDB.

Flink Doris Connector menghubungkan Apache Flink dan Apache Doris. Connector ini memungkinkan Anda membaca dan menulis data ke Doris, yang mendukung pemrosesan dan analisis data real-time. Karena SelectDB sepenuhnya kompatibel dengan Apache Doris, Flink Doris Connector merupakan metode umum untuk mengalirkan data ke SelectDB.

Kemampuan stream processing Flink bergantung pada tiga komponen: Source, Transform, dan Sink. Fungsinya sebagai berikut:

  • Source:

    • Fungsi: Source adalah titik masuk aliran data Flink. Komponen ini membaca data dari sistem eksternal, seperti antrian pesan, database, atau sistem file.

    • Contoh: Anda dapat menggunakan Kafka sebagai sumber data untuk membaca pesan real-time atau membaca data dari file.

  • Transform:

    • Fungsi: Tahap transform memproses dan mengonversi aliran data masukan. Operasi ini dapat mencakup penyaringan, pemetaan, agregasi, dan windowing.

    • Contoh: Anda dapat memetakan aliran masukan untuk mengubah struktur datanya atau mengagregasi data guna menghitung metrik per menit.

  • Sink:

    • Fungsi: Sink adalah titik keluar aliran data Flink. Komponen ini menulis data yang telah diproses ke sistem eksternal, seperti database, file, atau antrian pesan.

    • Contoh: Anda dapat menulis hasil yang telah diproses ke database MySQL atau mengirim data ke topik Kafka lainnya.

Gambar berikut menunjukkan alur data saat Anda mengimpor data ke SelectDB menggunakan Flink Doris Connector.

image

Prasyarat

  • Sumber data, Flink, dan SelectDB dapat saling berkomunikasi melalui jaringan:

    1. Ajukan titik akhir publik untuk instans ApsaraDB for SelectDB Anda. Untuk informasi selengkapnya, lihat Ajukan atau lepas titik akhir publik.

      Jika Flink dan sumber data Anda keduanya merupakan produk Alibaba Cloud atau diterapkan pada instans Elastic Compute Service (ECS) Alibaba Cloud, serta berada dalam VPC yang sama dengan instans ApsaraDB for SelectDB Anda, Anda dapat melewati langkah ini.

    2. Tambahkan alamat IP Flink dan sumber data ke daftar putih instans ApsaraDB for SelectDB Anda. Untuk informasi selengkapnya, lihat Konfigurasikan daftar putih alamat IP.

  • Flink dan Flink Doris Connector telah diinstal.

    Tabel berikut menjelaskan persyaratan versi.

    Versi Flink

    Versi Flink Doris Connector

    URL Unduh

    Realtime Compute for Apache Flink: 1.17 atau lebih baru

    Open source: 1.15 atau lebih baru

    Versi 1.5.2 atau lebih baru. Disarankan menggunakan versi terbaru.

    Flink Doris Connector

    Untuk informasi selengkapnya tentang cara menginstal Flink Doris Connector, lihat Cara menginstal Flink Doris Connector.

Cara menginstal Flink Doris Connector

Anda dapat menginstal Flink Doris Connector sesuai skenario Anda.

  • Jika Anda mengimpor data ke SelectDB menggunakan Realtime Compute for Apache Flink, Anda dapat menggunakan fitur connector kustom untuk mengunggah, menggunakan, dan memperbarui Flink Doris Connector. Untuk informasi selengkapnya, lihat Kelola connector kustom.

  • Jika Anda menggunakan kluster Flink open-source yang dikelola sendiri, Anda dapat mengunduh paket JAR untuk versi Flink Doris Connector yang sesuai dan menempatkannya di direktori `lib` instalasi Flink Anda. Untuk tautan unduhan, lihat Paket JAR.

  • Untuk mengimpor Flink Doris Connector menggunakan Maven, tambahkan kode berikut ke file konfigurasi dependensi proyek Anda. Untuk versi lainnya, lihat Repositori Maven.

    <!-- flink-doris-connector -->
    <dependency>
      <groupId>org.apache.doris</groupId>
      <artifactId>flink-doris-connector-1.16</artifactId>
      <version>1.5.2</version>
    </dependency>  

Contoh penggunaan

Lingkungan contoh

Bagian ini memberikan contoh cara menggunakan Flink SQL, Flink CDC, dan DataStream untuk memigrasikan data dari tabel `employees` di database `test` instans MySQL ke tabel `employees` di database `test` instans SelectDB. Anda dapat memodifikasi parameter sesuai kebutuhan. Lingkungan contohnya sebagai berikut:

  • Lingkungan standalone Flink 1.16

  • Java

  • Database tujuan: test

  • Tabel tujuan: employees

  • Database sumber: test

  • Tabel sumber: employees

Persiapkan lingkungan

Persiapkan lingkungan Flink

  1. Persiapkan lingkungan Java.

    Flink bergantung pada lingkungan Java. Anda harus menginstal Java Development Kit (JDK) dan mengonfigurasi variabel lingkungan JAVA_HOME.

    Versi Java yang diperlukan bergantung pada versi Flink. Untuk informasi selengkapnya tentang versi Java yang didukung Flink, lihat Kompatibilitas Java. Dalam contoh ini, Java 8 diinstal. Untuk informasi selengkapnya, lihat Instal JDK.

  2. Unduh paket instalasi Flink `flink-1.16.3-bin-scala_2.12.tgz`. Jika versi ini kedaluwarsa, Anda dapat mengunduh versi lainnya. Untuk versi lainnya, lihat Apache Flink.

    wget https://www.apache.si/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
  3. Ekstrak paket tersebut.

    tar -zxvf flink-1.16.3-bin-scala_2.12.tgz
  4. Buka direktori `lib` instalasi Flink Anda dan tambahkan connector yang diperlukan untuk operasi berikut:

    • Tambahkan Flink Doris Connector.

      wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jar
    • Tambahkan Flink MySQL Connector.

      wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
  5. Jalankan kluster Flink.

    Jalankan perintah berikut di direktori `bin` instalasi Flink.

    ./start-cluster.sh 

Persiapkan database dan tabel SelectDB tujuan

  1. Buat instans ApsaraDB for SelectDB. Untuk informasi selengkapnya, lihat Buat instans.

  2. Hubungkan ke instans tersebut. Untuk informasi selengkapnya, lihat Hubungkan ke instans.

  3. Buat database `test`.

    CREATE DATABASE test;
  4. Buat tabel `employees`.

    USE test;
    
    -- Buat tabel.
    CREATE TABLE employees (
        emp_no       int NOT NULL,
        birth_date   date,
        first_name   varchar(20),
        last_name    varchar(20),
        gender       char(2),
        hire_date    date
    )
    UNIQUE KEY(`emp_no`)
    DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;

Persiapkan database dan tabel MySQL sumber

  1. Buat instans MySQL. Untuk informasi selengkapnya, lihat Buat instans ApsaraDB RDS for MySQL dan konfigurasikan database dengan cepat.

  2. Buat database `test`.

    CREATE DATABASE test;
  3. Buat tabel `employees`.

    USE test;
    
    CREATE TABLE employees (
        emp_no INT NOT NULL PRIMARY KEY,
        birth_date DATE,
        first_name VARCHAR(20),
        last_name VARCHAR(20),
        gender CHAR(2),
        hire_date DATE
    );
  4. Masukkan data.

    INSERT INTO employees (emp_no, birth_date, first_name, last_name, gender, hire_date) VALUES
    (1001, '1985-05-15', 'John', 'Doe', 'M', '2010-06-20'),
    (1002, '1990-08-22', 'Jane', 'Smith', 'F', '2012-03-15'),
    (1003, '1987-11-02', 'Robert', 'Johnson', 'M', '2015-07-30'),
    (1004, '1992-01-18', 'Emily', 'Davis', 'F', '2018-01-05'),
    (1005, '1980-12-09', 'Michael', 'Brown', 'M', '2008-11-21');

Impor data menggunakan Flink SQL

  1. Jalankan layanan Flink SQL Client.

    Jalankan perintah berikut di direktori `bin` instalasi Flink.

    ./sql-client.sh
  2. Kirim pekerjaan Flink di Flink SQL Client.

    1. Buat tabel sumber MySQL.

      Dalam pernyataan berikut, item dalam klausa `WITH` adalah konfigurasi untuk MySQL CDC Source. Untuk informasi selengkapnya tentang item konfigurasi, lihat MySQL | Apache Flink CDC.

      CREATE TABLE employees_source (
          emp_no INT,
          birth_date DATE,
          first_name STRING,
          last_name STRING,
          gender STRING,
          hire_date DATE,
          PRIMARY KEY (`emp_no`) NOT ENFORCED
      ) WITH (
          'connector' = 'mysql-cdc',
          'hostname' = '127.0.0.1', 
          'port' = '3306',
          'username' = 'root',
          'password' = '****',
          'database-name' = 'test',
          'table-name' = 'employees'
      );
    2. Buat tabel sink SelectDB.

      Dalam pernyataan berikut, item dalam klausa `WITH` adalah konfigurasi untuk SelectDB. Untuk informasi selengkapnya tentang item konfigurasi, lihat Item konfigurasi sink.

      CREATE TABLE employees_sink (
          emp_no       INT ,
          birth_date   DATE,
          first_name   STRING,
          last_name    STRING,
          gender       STRING,
          hire_date    DATE
      ) 
      WITH (
        'connector' = 'doris',
        'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
        'table.identifier' = 'test.employees',
        'username' = 'admin',
        'password' = '****'
      );
    3. Sinkronkan data dari tabel sumber MySQL ke tabel sink SelectDB.

      INSERT INTO employees_sink SELECT * FROM employees_source;
  3. Lihat hasil impor data.

    Hubungkan ke SelectDB dan jalankan pernyataan berikut untuk melihat hasilnya.

    SELECT * FROM test.employees;

Impor data menggunakan Flink CDC

Penting

Realtime Compute for Apache Flink tidak mendukung pekerjaan JAR. Sebagai gantinya, layanan ini mendukung pekerjaan YAML untuk CDC 3.0.

Bagian ini menjelaskan cara menggunakan Flink CDC untuk mengimpor data dari database ke SelectDB.

Di direktori instalasi Flink, Anda dapat menggunakan program flink untuk menjalankan Flink CDC. Sintaksnya sebagai berikut:

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
    --database <selectdb-database-name> \
    [--job-name <flink-job-name>] \
    [--table-prefix <selectdb-table-prefix>] \
    [--table-suffix <selectdb-table-suffix>] \
    [--including-tables <mysql-table-name|name-regular-expr>] \
    [--excluding-tables <mysql-table-name|name-regular-expr>] \
    --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
    --oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
    --sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
    [--table-conf <selectdb-table-conf> [--table-conf <selectdb-table-conf> ...]]

Parameter

Parameter

Deskripsi

execution.checkpointing.interval

Interval checkpoint Flink. Ini memengaruhi frekuensi sinkronisasi data. Disarankan nilai `10s`.

parallelism.default

Tingkat paralelisme untuk pekerjaan Flink. Tingkat paralelisme yang lebih tinggi dapat meningkatkan kecepatan sinkronisasi data.

job-name

Nama pekerjaan Flink.

database

Nama database di SelectDB tempat data disinkronkan.

table-prefix

Awalan untuk nama tabel SelectDB. Contoh: --table-prefix ods_.

table-suffix

Akhiran untuk nama tabel SelectDB.

including-tables

Tabel yang akan disinkronkan. Gunakan tanda pipa vertikal (|) untuk memisahkan beberapa tabel. Ekspresi reguler didukung. Misalnya, --including-tables table1|tbl.* menyinkronkan `table1` dan semua tabel yang dimulai dengan `tbl.`.

excluding-tables

Tabel yang tidak disinkronkan. Konfigurasikan parameter ini dengan cara yang sama seperti `including-tables`.

mysql-conf

Konfigurasi untuk MySQL CDC Source. Untuk informasi selengkapnya, lihat MySQL CDC Connector. Parameter hostname, username, password, dan database-name wajib diisi.

oracle-conf

Konfigurasi untuk Oracle CDC Source. Untuk informasi selengkapnya, lihat Oracle CDC Connector. Parameter hostname, username, password, database-name, dan schema-name wajib diisi.

sink-conf

Semua konfigurasi untuk Doris Sink. Untuk informasi selengkapnya, lihat Item konfigurasi sink.

table-conf

Item konfigurasi untuk tabel SelectDB. Ini adalah item yang terdapat dalam `properties` saat Anda membuat tabel SelectDB.

Catatan
  1. Untuk sinkronisasi, Anda harus menambahkan dependensi Flink CDC yang sesuai ke direktori `$FLINK_HOME/lib`, seperti `flink-sql-connector-mysql-cdc-${version}.jar` atau `flink-sql-connector-oracle-cdc-${version}.jar`.

  2. Versi Flink 1.15 dan yang lebih baru mendukung sinkronisasi database penuh. Untuk tautan unduhan berbagai versi Flink Doris Connector, lihat Flink Doris Connector.

Sink item konfigurasi

Parameter

Nilai default

Wajib

Deskripsi

fenodes

None

Ya

Titik akhir dan port protokol HTTP instans ApsaraDB for SelectDB.

Anda dapat memperoleh VPC Endpoint (atau Public Endpoint) dan HTTP Port dari halaman Instance Details > Network Information di konsol ApsaraDB for SelectDB.

Contoh: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080.

table.identifier

None

Ya

Nama database dan tabel. Contoh: test_db.test_table.

username

None

Ya

Username database untuk instans ApsaraDB for SelectDB.

password

None

Ya

Password untuk pengguna database instans ApsaraDB for SelectDB.

jdbc-url

None

Tidak

Informasi koneksi JDBC untuk instans ApsaraDB for SelectDB.

Anda dapat memperoleh VPC Endpoint (atau Public Endpoint) dan MySQL Port dari halaman Instance Details > Network Information di konsol ApsaraDB for SelectDB.

Contoh: jdbc:mysql://selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030.

auto-redirect

true

Tidak

Menentukan apakah permintaan Stream Load dialihkan. Jika diaktifkan, Stream Load menulis melalui frontend (FE), dan informasi backend (BE) tidak lagi diperoleh secara eksplisit.

doris.request.retries

3

Tidak

Jumlah percobaan ulang untuk mengirim permintaan ke SelectDB.

doris.request.connect.timeout

30s

Tidak

Timeout koneksi untuk mengirim permintaan ke SelectDB.

doris.request.read.timeout

30s

Tidak

Timeout baca untuk mengirim permintaan ke SelectDB.

sink.label-prefix

""

Ya

Awalan label yang digunakan untuk impor Stream Load. Dalam skenario two-phase commit (2PC), ini harus unik secara global untuk menjamin Exactly-Once Semantics (EOS) Flink.

sink.properties

None

Tidak

Parameter impor untuk Stream Load. Tentukan konfigurasi properti.

  • Untuk format CSV, tulis:

    sink.properties.format='csv' 
    sink.properties.column_separator=','
    sink.properties.line_delimiter='\n' 
  • Untuk format JSON, tulis:

    sink.properties.format='json' 

Untuk informasi selengkapnya tentang parameter, lihat Stream Load.

sink.buffer-size

1048576

Tidak

Ukuran buffer data tulis dalam byte. Jangan ubah parameter ini. Gunakan konfigurasi default 1 MB.

sink.buffer-count

3

Tidak

Jumlah buffer data tulis. Jangan ubah parameter ini. Gunakan konfigurasi default.

sink.max-retries

3

Tidak

Jumlah maksimum percobaan ulang setelah kegagalan pada fase commit. Nilai default adalah 3.

sink.use-cache

false

Tidak

Menentukan apakah cache memori digunakan untuk pemulihan saat terjadi pengecualian. Jika diaktifkan, cache menyimpan data dari periode checkpoint.

sink.enable-delete

true

Tidak

Menentukan apakah event penghapusan disinkronkan. Hanya model Unique Key yang didukung.

sink.enable-2pc

true

Tidak

Menentukan apakah protokol two-phase commit (2PC) diaktifkan. Nilai default adalah true, yang menjamin EOS.

sink.enable.batch-mode

false

Tidak

Menentukan apakah mode batch digunakan untuk menulis ke SelectDB. Jika diaktifkan, waktu penulisan tidak bergantung pada checkpoint. Sebaliknya, waktu penulisan dikontrol oleh parameter `sink.buffer-flush.max-rows`, `sink.buffer-flush.max-bytes`, dan `sink.buffer-flush.interval`.

Saat mode ini diaktifkan, EOS tidak dijamin. Anda dapat menggunakan model Unique Key untuk mencapai idempotensi.

sink.flush.queue-size

2

Tidak

Ukuran antrean cache dalam mode pemrosesan batch.

sink.buffer-flush.max-rows

50000

Tidak

Jumlah maksimum baris data yang ditulis dalam satu batch dalam mode pemrosesan batch.

sink.buffer-flush.max-bytes

10MB

Tidak

Jumlah maksimum byte yang ditulis dalam satu batch dalam mode pemrosesan batch.

sink.buffer-flush.interval

10s

Tidak

Interval untuk flushing cache secara asinkron dalam mode pemrosesan batch. Nilai minimum adalah 1s.

sink.ignore.update-before

true

Tidak

Menentukan apakah event `update-before` diabaikan. Secara default, event ini diabaikan.

Contoh sinkronisasi

Contoh sinkronisasi MySQL

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Contoh sinkronisasi Oracle

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    oracle-sync-database \
    --database test_db \
    --oracle-conf hostname=127.0.0.1 \
    --oracle-conf port=1521 \
    --oracle-conf username=admin \
    --oracle-conf password="password" \
    --oracle-conf database-name=XE \
    --oracle-conf schema-name=ADMIN \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Contoh sinkronisasi PostgreSQL

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    postgres-sync-database \
    --database db1\
    --postgres-conf hostname=127.0.0.1 \
    --postgres-conf port=5432 \
    --postgres-conf username=postgres \
    --postgres-conf password="123456" \
    --postgres-conf database-name=postgres \
    --postgres-conf schema-name=public \
    --postgres-conf slot.name=test \
    --postgres-conf decoding.plugin.name=pgoutput \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Contoh sinkronisasi SQL Server

<FLINK_HOME>/bin/flink run \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.16-1.5.2.jar \
    sqlserver-sync-database \
    --database db1\
    --sqlserver-conf hostname=127.0.0.1 \
    --sqlserver-conf port=1433 \
    --sqlserver-conf username=sa \
    --sqlserver-conf password="123456" \
    --sqlserver-conf database-name=CDC_DB \
    --sqlserver-conf schema-name=dbo \
    --including-tables "tbl1|test.*" \
    --sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
    --sink-conf username=admin \
    --sink-conf password=****

Impor data menggunakan API DataStream

  1. Dalam proyek Maven Anda, tambahkan dependensi yang diperlukan.

    Dependensi Maven yang diperlukan

    <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <scala.version>2.12</scala.version>
            <java.version>1.8</java.version>
            <flink.version>1.16.3</flink.version>
            <fastjson.version>1.2.62</fastjson.version>
            <scope.mode>compile</scope.mode>
        </properties>
        <dependencies>
            <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>28.1-jre</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.14.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-jdbc</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.doris</groupId>
                <artifactId>flink-doris-connector-1.16</artifactId>
                <version>1.5.2</version>
            </dependency>
    
            <dependency>
                <groupId>com.ververica</groupId>
                <artifactId>flink-sql-connector-mysql-cdc</artifactId>
                <version>2.4.2</version>
                <exclusions>
                    <exclusion>
                        <artifactId>flink-shaded-guava</artifactId>
                        <groupId>org.apache.flink</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-runtime-web</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
        </dependencies>
  2. Kode inti Java.

    Dalam kode berikut, parameter untuk tabel sumber MySQL dan tabel sink SelectDB sesuai dengan konfigurasi di bagian Impor data menggunakan Flink SQL. Untuk informasi selengkapnya, lihat MySQL | Apache Flink CDC dan Item konfigurasi sink.

    package org.example;
    
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    
    import org.apache.doris.flink.cfg.DorisExecutionOptions;
    import org.apache.doris.flink.cfg.DorisOptions;
    import org.apache.doris.flink.sink.DorisSink;
    import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer;
    import org.apache.doris.flink.tools.cdc.mysql.DateToStringConverter;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    public class Main {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.enableCheckpointing(10000);
    
            Map<String, Object> customConverterConfigs = new HashMap<>();
            customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric");
            JsonDebeziumDeserializationSchema schema =
                    new JsonDebeziumDeserializationSchema(false, customConverterConfigs);
            
            // Konfigurasikan tabel sumber MySQL.
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                    .hostname("rm-xxx.mysql.rds.aliyuncs***")
                    .port(3306)
                    .startupOptions(StartupOptions.initial())
                    .databaseList("db_test")
                    .tableList("db_test.employees")
                    .username("root")
                    .password("test_123")
                    .debeziumProperties(DateToStringConverter.DEFAULT_PROPS)
                    .deserializer(schema)
                    .serverTimeZone("Asia/Shanghai")
                    .build();
    
            // Konfigurasikan tabel sink SelectDB.
            DorisSink.Builder<String> sinkBuilder = DorisSink.builder();
            DorisOptions.Builder dorisBuilder = DorisOptions.builder();
            dorisBuilder.setFenodes("selectdb-cn-xxx-public.selectdbfe.rds.aliyunc****:8080")
                    .setTableIdentifier("db_test.employees")
                    .setUsername("admin")
                    .setPassword("test_123");
            DorisOptions dorisOptions = dorisBuilder.build();
    
            // Konfigurasikan parameter Stream Load di sink.properties.
            Properties properties = new Properties();
            properties.setProperty("format", "json");
            properties.setProperty("read_json_by_line", "true");
            DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
            executionBuilder.setStreamLoadProp(properties);
    
            sinkBuilder.setDorisExecutionOptions(executionBuilder.build())
                    .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build()) //serialize according to string
                    .setDorisOptions(dorisOptions);
    
            DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
            dataStreamSource.sinkTo(sinkBuilder.build());
            env.execute("MySQL to SelectDB");
        }
    }

Penggunaan lanjutan

Perbarui kolom parsial menggunakan Flink SQL

-- aktifkan checkpoint
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE cdc_mysql_source (
   id INT
  ,name STRING
  ,bank STRING
  ,age INT
  ,PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '127.0.0.1',
 'port' = '3306',
 'username' = 'root',
 'password' = 'password',
 'database-name' = 'database',
 'table-name' = 'table'
);

CREATE TABLE selectdb_sink (
    id INT,
    name STRING,
    bank STRING,
    age INT
) 
WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'database.table',
  'username' = 'admin',
  'password' = '****',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.properties.columns' = 'id,name,bank,age',
  'sink.properties.partial_columns' = 'true' -- Aktifkan pembaruan kolom parsial.
);


INSERT INTO selectdb_sink SELECT id,name,bank,age FROM cdc_mysql_source;

Hapus data berdasarkan kolom tertentu menggunakan Flink SQL

Dalam skenario sumber CDC, Doris Sink membedakan jenis event berdasarkan `RowKind` dan memberikan nilai ke kolom tersembunyi __DORIS_DELETE_SIGN__ untuk melakukan penghapusan. Dalam skenario sumber pesan Kafka, Doris Sink tidak dapat langsung menggunakan `RowKind` untuk membedakan jenis operasi. Komponen ini harus mengandalkan bidang tertentu dalam pesan untuk menandai jenis operasi, seperti {"op_type":"delete",data:{...}}. Untuk data jenis ini, jika Anda ingin menghapus catatan di mana `op_type` bernilai 'delete', Anda harus secara eksplisit meneruskan nilai untuk kolom tersembunyi berdasarkan logika bisnis Anda. Contoh Flink SQL berikut menunjukkan cara menghapus data di SelectDB berdasarkan bidang tertentu dalam data Kafka.

-- Data contoh: {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(
  data STRING,
  op_type STRING
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE SELECTDB_SINK(
  id INT,
  name STRING,
  __DORIS_DELETE_SIGN__ INT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'db.table',
  'username' = 'admin',
  'password' = '****',
  'sink.enable-delete' = 'false',        -- false menunjukkan bahwa jenis event tidak diperoleh dari RowKind.
  'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__'  -- Secara eksplisit tentukan kolom yang akan diimpor untuk Stream Load.
);

INSERT INTO SELECTDB_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name, 
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__ 
FROM KAFKA_SOURCE;

FAQ

  • Q: Bagaimana cara menulis data tipe BITMAP?

    A: Contoh berikut menunjukkan caranya:

    CREATE TABLE bitmap_sink (
      dt INT,
      page STRING,
      user_id INT 
    )
    WITH ( 
      'connector' = 'doris', 
      'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
      'table.identifier' = 'test.bitmap_test', 
      'username' = 'admin', 
      'password' = '****', 
      'sink.label-prefix' = 'selectdb_label', 
      'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
    );
  • Q: Bagaimana cara mengatasi error: errCode = 2, detailMessage = Label[label_0_1]has already been used, relate to txn[19650]?

    A: Dalam skenario Exactly-Once, pekerjaan Flink harus dimulai ulang dari checkpoint atau savepoint terbaru. Jika tidak, error ini terjadi. Jika Exactly-Once tidak diperlukan, Anda dapat mengatasi error ini dengan menonaktifkan protokol two-phase commit (2PC) (sink.enable-2pc=false) atau menggunakan `sink.label-prefix` yang berbeda.

  • Q: Bagaimana cara mengatasi error: errCode = 2, detailMessage = transaction[19650]not found?

    A: Error ini terjadi selama fase commit. ID transaksi yang dicatat dalam checkpoint telah kedaluwarsa di sisi SelectDB. Melakukan commit transaksi lagi menyebabkan error ini. Anda tidak dapat memulai ulang dari checkpoint dalam kasus ini. Anda dapat memperpanjang waktu kedaluwarsa dengan memodifikasi parameter streaming_label_keep_max_second di SelectDB. Nilai default adalah 12 jam.

  • Q: Bagaimana cara mengatasi error: errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100?

    A: Error ini terjadi karena jumlah pekerjaan impor data konkuren untuk database yang sama melebihi 100. Anda dapat mengatasi masalah ini dengan menyesuaikan parameter SelectDB max_running_txn_num_per_db. Untuk informasi selengkapnya, lihat max_running_txn_num_per_db.

    Error ini juga dapat terjadi jika Anda sering mengubah label dan memulai ulang pekerjaan. Dalam skenario two-phase commit (2PC) yang menggunakan model Duplicate atau Aggregate Key, label untuk setiap pekerjaan harus unik. Saat memulai ulang dari checkpoint, pekerjaan Flink hanya membatalkan transaksi yang dimulai tetapi belum selesai (pre-committed tetapi belum committed). Jika Anda sering mengubah label dan memulai ulang, banyak transaksi yang berhasil pre-committed (txn) tidak dapat dibatalkan, yang menghabiskan slot transaksi. Dalam model Unique Key, Anda juga dapat menonaktifkan 2PC dan merancang sink untuk melakukan penulisan idempoten.

  • Q: Saat Flink menulis ke tabel yang menggunakan model Unique Key, bagaimana cara memastikan urutan data dalam satu batch?

    A: Anda dapat menambahkan konfigurasi kolom sequence untuk memastikan urutan. Untuk informasi selengkapnya, lihat SEQUENCE.

  • Q: Mengapa data tidak disinkronkan meskipun pekerjaan Flink tidak melaporkan error?

    A: Sebelum versi 1.1.0, connector menggunakan penulisan batch yang didorong data. Anda harus memeriksa apakah data ditulis dari sumber hulu. Mulai versi 1.1.0, penulisan bergantung pada checkpoint. Anda harus mengaktifkan checkpoint untuk menulis data.

  • Q: Bagaimana cara mengatasi error: tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235?

    A: Error ini biasanya terjadi pada versi connector sebelum 1.1.0. Hal ini disebabkan oleh frekuensi penulisan yang terlalu tinggi, sehingga menghasilkan terlalu banyak versi. Anda dapat mengurangi frekuensi Stream Load dengan mengatur parameter sink.buffer-flush.max-bytes dan sink.buffer-flush.interval.

  • Q: Bagaimana cara melewati data kotor selama impor Flink?

    A: Selama impor Flink, jika terdapat data kotor, seperti masalah format atau panjang bidang, Stream Load gagal, dan Flink terus mencoba ulang. Untuk melewati data kotor, Anda dapat menonaktifkan mode ketat Stream Load (strict_mode=false,max_filter_ratio=1) atau menyaring data sebelum mencapai operator sink.

  • Q: Bagaimana korespondensi antara tabel sumber dan tabel SelectDB?

    A: Saat Anda menggunakan Flink Doris Connector untuk mengimpor data, Anda harus memastikan dua hal. Pertama, kolom dan tipe tabel sumber harus sesuai dengan kolom dan tipe di Flink SQL. Kedua, kolom dan tipe di Flink SQL harus sesuai dengan kolom dan tipe tabel SelectDB.

  • Q: Bagaimana cara mengatasi error: TApplicationException: get_next failed: out of sequence response: expected 4 but got 3?

    A: Error ini disebabkan oleh bug konkurensi dalam framework Thrift. Untuk mengatasi masalah ini, gunakan versi connector terbaru yang memungkinkan dan versi Flink yang kompatibel.

  • Q: Bagaimana cara mengatasi error: DorisRuntimeException: Fail to abort transaction 26153 with urlhttp://192.168.XX.XX?

    A: Di log TaskManager, cari abort transaction response. Kode respons HTTP menunjukkan apakah masalah berasal dari client atau server.