ApsaraDB for SelectDB sepenuhnya kompatibel dengan Apache Doris. Anda dapat menggunakan Flink Doris Connector untuk mengimpor data historis dari sumber data seperti MySQL, Oracle, PostgreSQL, SQL Server, dan Kafka ke SelectDB. Selain itu, ketika Anda mengaktifkan pekerjaan Change Data Capture (CDC) di Flink, data inkremental dari sumber data juga disinkronkan ke SelectDB.
Ikhtisar
Anda hanya dapat menggunakan Flink Doris Connector untuk menulis data ke SelectDB. Untuk menggunakan Flink Doris Connector guna terhubung langsung ke node backend (BE) instans SelectDB demi pembacaan data yang efisien, hubungi tim dukungan teknis SelectDB untuk meminta izin akses yang diperlukan.
Anda juga dapat menggunakan Flink Java Database Connectivity (JDBC) Connector untuk membaca data dari SelectDB.
Flink Doris Connector menghubungkan Apache Flink dan Apache Doris. Connector ini memungkinkan Anda membaca dan menulis data ke Doris, yang mendukung pemrosesan dan analisis data real-time. Karena SelectDB sepenuhnya kompatibel dengan Apache Doris, Flink Doris Connector merupakan metode umum untuk mengalirkan data ke SelectDB.
Kemampuan stream processing Flink bergantung pada tiga komponen: Source, Transform, dan Sink. Fungsinya sebagai berikut:
Source:
Fungsi: Source adalah titik masuk aliran data Flink. Komponen ini membaca data dari sistem eksternal, seperti antrian pesan, database, atau sistem file.
Contoh: Anda dapat menggunakan Kafka sebagai sumber data untuk membaca pesan real-time atau membaca data dari file.
Transform:
Fungsi: Tahap transform memproses dan mengonversi aliran data masukan. Operasi ini dapat mencakup penyaringan, pemetaan, agregasi, dan windowing.
Contoh: Anda dapat memetakan aliran masukan untuk mengubah struktur datanya atau mengagregasi data guna menghitung metrik per menit.
Sink:
Fungsi: Sink adalah titik keluar aliran data Flink. Komponen ini menulis data yang telah diproses ke sistem eksternal, seperti database, file, atau antrian pesan.
Contoh: Anda dapat menulis hasil yang telah diproses ke database MySQL atau mengirim data ke topik Kafka lainnya.
Gambar berikut menunjukkan alur data saat Anda mengimpor data ke SelectDB menggunakan Flink Doris Connector.
Prasyarat
Sumber data, Flink, dan SelectDB dapat saling berkomunikasi melalui jaringan:
Ajukan titik akhir publik untuk instans ApsaraDB for SelectDB Anda. Untuk informasi selengkapnya, lihat Ajukan atau lepas titik akhir publik.
Jika Flink dan sumber data Anda keduanya merupakan produk Alibaba Cloud atau diterapkan pada instans Elastic Compute Service (ECS) Alibaba Cloud, serta berada dalam VPC yang sama dengan instans ApsaraDB for SelectDB Anda, Anda dapat melewati langkah ini.
Tambahkan alamat IP Flink dan sumber data ke daftar putih instans ApsaraDB for SelectDB Anda. Untuk informasi selengkapnya, lihat Konfigurasikan daftar putih alamat IP.
Flink dan Flink Doris Connector telah diinstal.
Tabel berikut menjelaskan persyaratan versi.
Versi Flink
Versi Flink Doris Connector
URL Unduh
Realtime Compute for Apache Flink: 1.17 atau lebih baru
Open source: 1.15 atau lebih baru
Versi 1.5.2 atau lebih baru. Disarankan menggunakan versi terbaru.
Untuk informasi selengkapnya tentang cara menginstal Flink Doris Connector, lihat Cara menginstal Flink Doris Connector.
Cara menginstal Flink Doris Connector
Anda dapat menginstal Flink Doris Connector sesuai skenario Anda.
Jika Anda mengimpor data ke SelectDB menggunakan Realtime Compute for Apache Flink, Anda dapat menggunakan fitur connector kustom untuk mengunggah, menggunakan, dan memperbarui
Flink Doris Connector. Untuk informasi selengkapnya, lihat Kelola connector kustom.Jika Anda menggunakan kluster Flink open-source yang dikelola sendiri, Anda dapat mengunduh paket JAR untuk versi
Flink Doris Connectoryang sesuai dan menempatkannya di direktori `lib` instalasi Flink Anda. Untuk tautan unduhan, lihat Paket JAR.Untuk mengimpor
Flink Doris Connectormenggunakan Maven, tambahkan kode berikut ke file konfigurasi dependensi proyek Anda. Untuk versi lainnya, lihat Repositori Maven.<!-- flink-doris-connector --> <dependency> <groupId>org.apache.doris</groupId> <artifactId>flink-doris-connector-1.16</artifactId> <version>1.5.2</version> </dependency>
Contoh penggunaan
Lingkungan contoh
Bagian ini memberikan contoh cara menggunakan Flink SQL, Flink CDC, dan DataStream untuk memigrasikan data dari tabel `employees` di database `test` instans MySQL ke tabel `employees` di database `test` instans SelectDB. Anda dapat memodifikasi parameter sesuai kebutuhan. Lingkungan contohnya sebagai berikut:
Lingkungan standalone Flink 1.16
Java
Database tujuan: test
Tabel tujuan: employees
Database sumber: test
Tabel sumber: employees
Persiapkan lingkungan
Persiapkan lingkungan Flink
Persiapkan lingkungan Java.
Flink bergantung pada lingkungan Java. Anda harus menginstal Java Development Kit (JDK) dan mengonfigurasi variabel lingkungan
JAVA_HOME.Versi Java yang diperlukan bergantung pada versi Flink. Untuk informasi selengkapnya tentang versi Java yang didukung Flink, lihat Kompatibilitas Java. Dalam contoh ini, Java 8 diinstal. Untuk informasi selengkapnya, lihat Instal JDK.
Unduh paket instalasi Flink `flink-1.16.3-bin-scala_2.12.tgz`. Jika versi ini kedaluwarsa, Anda dapat mengunduh versi lainnya. Untuk versi lainnya, lihat Apache Flink.
wget https://www.apache.si/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgzEkstrak paket tersebut.
tar -zxvf flink-1.16.3-bin-scala_2.12.tgzBuka direktori `lib` instalasi Flink Anda dan tambahkan connector yang diperlukan untuk operasi berikut:
Tambahkan Flink Doris Connector.
wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.16/1.5.2/flink-doris-connector-1.16-1.5.2.jarTambahkan Flink MySQL Connector.
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
Jalankan kluster Flink.
Jalankan perintah berikut di direktori `bin` instalasi Flink.
./start-cluster.sh
Persiapkan database dan tabel SelectDB tujuan
Buat instans ApsaraDB for SelectDB. Untuk informasi selengkapnya, lihat Buat instans.
Hubungkan ke instans tersebut. Untuk informasi selengkapnya, lihat Hubungkan ke instans.
Buat database `test`.
CREATE DATABASE test;Buat tabel `employees`.
USE test; -- Buat tabel. CREATE TABLE employees ( emp_no int NOT NULL, birth_date date, first_name varchar(20), last_name varchar(20), gender char(2), hire_date date ) UNIQUE KEY(`emp_no`) DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
Persiapkan database dan tabel MySQL sumber
Buat instans MySQL. Untuk informasi selengkapnya, lihat Buat instans ApsaraDB RDS for MySQL dan konfigurasikan database dengan cepat.
Buat database `test`.
CREATE DATABASE test;Buat tabel `employees`.
USE test; CREATE TABLE employees ( emp_no INT NOT NULL PRIMARY KEY, birth_date DATE, first_name VARCHAR(20), last_name VARCHAR(20), gender CHAR(2), hire_date DATE );Masukkan data.
INSERT INTO employees (emp_no, birth_date, first_name, last_name, gender, hire_date) VALUES (1001, '1985-05-15', 'John', 'Doe', 'M', '2010-06-20'), (1002, '1990-08-22', 'Jane', 'Smith', 'F', '2012-03-15'), (1003, '1987-11-02', 'Robert', 'Johnson', 'M', '2015-07-30'), (1004, '1992-01-18', 'Emily', 'Davis', 'F', '2018-01-05'), (1005, '1980-12-09', 'Michael', 'Brown', 'M', '2008-11-21');
Impor data menggunakan Flink SQL
Jalankan layanan Flink SQL Client.
Jalankan perintah berikut di direktori `bin` instalasi Flink.
./sql-client.shKirim pekerjaan Flink di Flink SQL Client.
Buat tabel sumber MySQL.
Dalam pernyataan berikut, item dalam klausa `WITH` adalah konfigurasi untuk
MySQL CDC Source. Untuk informasi selengkapnya tentang item konfigurasi, lihat MySQL | Apache Flink CDC.CREATE TABLE employees_source ( emp_no INT, birth_date DATE, first_name STRING, last_name STRING, gender STRING, hire_date DATE, PRIMARY KEY (`emp_no`) NOT ENFORCED ) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '127.0.0.1', 'port' = '3306', 'username' = 'root', 'password' = '****', 'database-name' = 'test', 'table-name' = 'employees' );Buat tabel sink SelectDB.
Dalam pernyataan berikut, item dalam klausa `WITH` adalah konfigurasi untuk SelectDB. Untuk informasi selengkapnya tentang item konfigurasi, lihat Item konfigurasi sink.
CREATE TABLE employees_sink ( emp_no INT , birth_date DATE, first_name STRING, last_name STRING, gender STRING, hire_date DATE ) WITH ( 'connector' = 'doris', 'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080', 'table.identifier' = 'test.employees', 'username' = 'admin', 'password' = '****' );Sinkronkan data dari tabel sumber MySQL ke tabel sink SelectDB.
INSERT INTO employees_sink SELECT * FROM employees_source;
Lihat hasil impor data.
Hubungkan ke SelectDB dan jalankan pernyataan berikut untuk melihat hasilnya.
SELECT * FROM test.employees;
Impor data menggunakan Flink CDC
Realtime Compute for Apache Flink tidak mendukung pekerjaan JAR. Sebagai gantinya, layanan ini mendukung pekerjaan YAML untuk CDC 3.0.
Bagian ini menjelaskan cara menggunakan Flink CDC untuk mengimpor data dari database ke SelectDB.
Di direktori instalasi Flink, Anda dapat menggunakan program flink untuk menjalankan Flink CDC. Sintaksnya sebagai berikut:
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
<mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database> \
--database <selectdb-database-name> \
[--job-name <flink-job-name>] \
[--table-prefix <selectdb-table-prefix>] \
[--table-suffix <selectdb-table-suffix>] \
[--including-tables <mysql-table-name|name-regular-expr>] \
[--excluding-tables <mysql-table-name|name-regular-expr>] \
--mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
--oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
--sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
[--table-conf <selectdb-table-conf> [--table-conf <selectdb-table-conf> ...]]Parameter
Parameter | Deskripsi |
execution.checkpointing.interval | Interval checkpoint Flink. Ini memengaruhi frekuensi sinkronisasi data. Disarankan nilai `10s`. |
parallelism.default | Tingkat paralelisme untuk pekerjaan Flink. Tingkat paralelisme yang lebih tinggi dapat meningkatkan kecepatan sinkronisasi data. |
job-name | Nama pekerjaan Flink. |
database | Nama database di SelectDB tempat data disinkronkan. |
table-prefix | Awalan untuk nama tabel SelectDB. Contoh: |
table-suffix | Akhiran untuk nama tabel SelectDB. |
including-tables | Tabel yang akan disinkronkan. Gunakan tanda pipa vertikal (|) untuk memisahkan beberapa tabel. Ekspresi reguler didukung. Misalnya, |
excluding-tables | Tabel yang tidak disinkronkan. Konfigurasikan parameter ini dengan cara yang sama seperti `including-tables`. |
mysql-conf | Konfigurasi untuk MySQL CDC Source. Untuk informasi selengkapnya, lihat MySQL CDC Connector. Parameter |
oracle-conf | Konfigurasi untuk Oracle CDC Source. Untuk informasi selengkapnya, lihat Oracle CDC Connector. Parameter |
sink-conf | Semua konfigurasi untuk Doris Sink. Untuk informasi selengkapnya, lihat Item konfigurasi sink. |
table-conf | Item konfigurasi untuk tabel SelectDB. Ini adalah item yang terdapat dalam `properties` saat Anda membuat tabel SelectDB. |
Untuk sinkronisasi, Anda harus menambahkan dependensi Flink CDC yang sesuai ke direktori `$FLINK_HOME/lib`, seperti `flink-sql-connector-mysql-cdc-${version}.jar` atau `flink-sql-connector-oracle-cdc-${version}.jar`.
Versi Flink 1.15 dan yang lebih baru mendukung sinkronisasi database penuh. Untuk tautan unduhan berbagai versi Flink Doris Connector, lihat Flink Doris Connector.
Sink item konfigurasi
Parameter | Nilai default | Wajib | Deskripsi |
fenodes | None | Ya | Titik akhir dan port protokol HTTP instans ApsaraDB for SelectDB. Anda dapat memperoleh VPC Endpoint (atau Public Endpoint) dan HTTP Port dari halaman Instance Details > Network Information di konsol ApsaraDB for SelectDB. Contoh: |
table.identifier | None | Ya | Nama database dan tabel. Contoh: |
username | None | Ya | Username database untuk instans ApsaraDB for SelectDB. |
password | None | Ya | Password untuk pengguna database instans ApsaraDB for SelectDB. |
jdbc-url | None | Tidak | Informasi koneksi JDBC untuk instans ApsaraDB for SelectDB. Anda dapat memperoleh VPC Endpoint (atau Public Endpoint) dan MySQL Port dari halaman Instance Details > Network Information di konsol ApsaraDB for SelectDB. Contoh: |
auto-redirect | true | Tidak | Menentukan apakah permintaan Stream Load dialihkan. Jika diaktifkan, Stream Load menulis melalui frontend (FE), dan informasi backend (BE) tidak lagi diperoleh secara eksplisit. |
doris.request.retries | 3 | Tidak | Jumlah percobaan ulang untuk mengirim permintaan ke SelectDB. |
doris.request.connect.timeout | 30s | Tidak | Timeout koneksi untuk mengirim permintaan ke SelectDB. |
doris.request.read.timeout | 30s | Tidak | Timeout baca untuk mengirim permintaan ke SelectDB. |
sink.label-prefix | "" | Ya | Awalan label yang digunakan untuk impor Stream Load. Dalam skenario two-phase commit (2PC), ini harus unik secara global untuk menjamin Exactly-Once Semantics (EOS) Flink. |
sink.properties | None | Tidak | Parameter impor untuk Stream Load. Tentukan konfigurasi properti.
Untuk informasi selengkapnya tentang parameter, lihat Stream Load. |
sink.buffer-size | 1048576 | Tidak | Ukuran buffer data tulis dalam byte. Jangan ubah parameter ini. Gunakan konfigurasi default 1 MB. |
sink.buffer-count | 3 | Tidak | Jumlah buffer data tulis. Jangan ubah parameter ini. Gunakan konfigurasi default. |
sink.max-retries | 3 | Tidak | Jumlah maksimum percobaan ulang setelah kegagalan pada fase commit. Nilai default adalah 3. |
sink.use-cache | false | Tidak | Menentukan apakah cache memori digunakan untuk pemulihan saat terjadi pengecualian. Jika diaktifkan, cache menyimpan data dari periode checkpoint. |
sink.enable-delete | true | Tidak | Menentukan apakah event penghapusan disinkronkan. Hanya model Unique Key yang didukung. |
sink.enable-2pc | true | Tidak | Menentukan apakah protokol two-phase commit (2PC) diaktifkan. Nilai default adalah true, yang menjamin EOS. |
sink.enable.batch-mode | false | Tidak | Menentukan apakah mode batch digunakan untuk menulis ke SelectDB. Jika diaktifkan, waktu penulisan tidak bergantung pada checkpoint. Sebaliknya, waktu penulisan dikontrol oleh parameter `sink.buffer-flush.max-rows`, `sink.buffer-flush.max-bytes`, dan `sink.buffer-flush.interval`. Saat mode ini diaktifkan, EOS tidak dijamin. Anda dapat menggunakan model Unique Key untuk mencapai idempotensi. |
sink.flush.queue-size | 2 | Tidak | Ukuran antrean cache dalam mode pemrosesan batch. |
sink.buffer-flush.max-rows | 50000 | Tidak | Jumlah maksimum baris data yang ditulis dalam satu batch dalam mode pemrosesan batch. |
sink.buffer-flush.max-bytes | 10MB | Tidak | Jumlah maksimum byte yang ditulis dalam satu batch dalam mode pemrosesan batch. |
sink.buffer-flush.interval | 10s | Tidak | Interval untuk flushing cache secara asinkron dalam mode pemrosesan batch. Nilai minimum adalah 1s. |
sink.ignore.update-before | true | Tidak | Menentukan apakah event `update-before` diabaikan. Secara default, event ini diabaikan. |
Contoh sinkronisasi
Contoh sinkronisasi MySQL
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
oracle-sync-database \
--database test_db \
--oracle-conf hostname=127.0.0.1 \
--oracle-conf port=1521 \
--oracle-conf username=admin \
--oracle-conf password="password" \
--oracle-conf database-name=XE \
--oracle-conf schema-name=ADMIN \
--including-tables "tbl1|test.*" \
--sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
--sink-conf username=admin \
--sink-conf password=****Contoh sinkronisasi Oracle
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
oracle-sync-database \
--database test_db \
--oracle-conf hostname=127.0.0.1 \
--oracle-conf port=1521 \
--oracle-conf username=admin \
--oracle-conf password="password" \
--oracle-conf database-name=XE \
--oracle-conf schema-name=ADMIN \
--including-tables "tbl1|test.*" \
--sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
--sink-conf username=admin \
--sink-conf password=****Contoh sinkronisasi PostgreSQL
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
postgres-sync-database \
--database db1\
--postgres-conf hostname=127.0.0.1 \
--postgres-conf port=5432 \
--postgres-conf username=postgres \
--postgres-conf password="123456" \
--postgres-conf database-name=postgres \
--postgres-conf schema-name=public \
--postgres-conf slot.name=test \
--postgres-conf decoding.plugin.name=pgoutput \
--including-tables "tbl1|test.*" \
--sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
--sink-conf username=admin \
--sink-conf password=****Contoh sinkronisasi SQL Server
<FLINK_HOME>/bin/flink run \
-Dexecution.checkpointing.interval=10s \
-Dparallelism.default=1 \
-c org.apache.doris.flink.tools.cdc.CdcTools \
lib/flink-doris-connector-1.16-1.5.2.jar \
sqlserver-sync-database \
--database db1\
--sqlserver-conf hostname=127.0.0.1 \
--sqlserver-conf port=1433 \
--sqlserver-conf username=sa \
--sqlserver-conf password="123456" \
--sqlserver-conf database-name=CDC_DB \
--sqlserver-conf schema-name=dbo \
--including-tables "tbl1|test.*" \
--sink-conf fenodes=selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080 \
--sink-conf username=admin \
--sink-conf password=****Impor data menggunakan API DataStream
Dalam proyek Maven Anda, tambahkan dependensi yang diperlukan.
Dependensi Maven yang diperlukan
Kode inti Java.
Dalam kode berikut, parameter untuk tabel sumber MySQL dan tabel sink SelectDB sesuai dengan konfigurasi di bagian Impor data menggunakan Flink SQL. Untuk informasi selengkapnya, lihat MySQL | Apache Flink CDC dan Item konfigurasi sink.
package org.example; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.writer.serializer.JsonDebeziumSchemaSerializer; import org.apache.doris.flink.tools.cdc.mysql.DateToStringConverter; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.HashMap; import java.util.Map; import java.util.Properties; public class Main { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(10000); Map<String, Object> customConverterConfigs = new HashMap<>(); customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, "numeric"); JsonDebeziumDeserializationSchema schema = new JsonDebeziumDeserializationSchema(false, customConverterConfigs); // Konfigurasikan tabel sumber MySQL. MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .hostname("rm-xxx.mysql.rds.aliyuncs***") .port(3306) .startupOptions(StartupOptions.initial()) .databaseList("db_test") .tableList("db_test.employees") .username("root") .password("test_123") .debeziumProperties(DateToStringConverter.DEFAULT_PROPS) .deserializer(schema) .serverTimeZone("Asia/Shanghai") .build(); // Konfigurasikan tabel sink SelectDB. DorisSink.Builder<String> sinkBuilder = DorisSink.builder(); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); dorisBuilder.setFenodes("selectdb-cn-xxx-public.selectdbfe.rds.aliyunc****:8080") .setTableIdentifier("db_test.employees") .setUsername("admin") .setPassword("test_123"); DorisOptions dorisOptions = dorisBuilder.build(); // Konfigurasikan parameter Stream Load di sink.properties. Properties properties = new Properties(); properties.setProperty("format", "json"); properties.setProperty("read_json_by_line", "true"); DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); executionBuilder.setStreamLoadProp(properties); sinkBuilder.setDorisExecutionOptions(executionBuilder.build()) .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build()) //serialize according to string .setDorisOptions(dorisOptions); DataStreamSource<String> dataStreamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source"); dataStreamSource.sinkTo(sinkBuilder.build()); env.execute("MySQL to SelectDB"); } }
Penggunaan lanjutan
Perbarui kolom parsial menggunakan Flink SQL
-- aktifkan checkpoint
SET 'execution.checkpointing.interval' = '10s';
CREATE TABLE cdc_mysql_source (
id INT
,name STRING
,bank STRING
,age INT
,PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'password',
'database-name' = 'database',
'table-name' = 'table'
);
CREATE TABLE selectdb_sink (
id INT,
name STRING,
bank STRING,
age INT
)
WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'database.table',
'username' = 'admin',
'password' = '****',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.properties.columns' = 'id,name,bank,age',
'sink.properties.partial_columns' = 'true' -- Aktifkan pembaruan kolom parsial.
);
INSERT INTO selectdb_sink SELECT id,name,bank,age FROM cdc_mysql_source;
Hapus data berdasarkan kolom tertentu menggunakan Flink SQL
Dalam skenario sumber CDC, Doris Sink membedakan jenis event berdasarkan `RowKind` dan memberikan nilai ke kolom tersembunyi __DORIS_DELETE_SIGN__ untuk melakukan penghapusan. Dalam skenario sumber pesan Kafka, Doris Sink tidak dapat langsung menggunakan `RowKind` untuk membedakan jenis operasi. Komponen ini harus mengandalkan bidang tertentu dalam pesan untuk menandai jenis operasi, seperti {"op_type":"delete",data:{...}}. Untuk data jenis ini, jika Anda ingin menghapus catatan di mana `op_type` bernilai 'delete', Anda harus secara eksplisit meneruskan nilai untuk kolom tersembunyi berdasarkan logika bisnis Anda. Contoh Flink SQL berikut menunjukkan cara menghapus data di SelectDB berdasarkan bidang tertentu dalam data Kafka.
-- Data contoh: {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
CREATE TABLE KAFKA_SOURCE(
data STRING,
op_type STRING
) WITH (
'connector' = 'kafka',
...
);
CREATE TABLE SELECTDB_SINK(
id INT,
name STRING,
__DORIS_DELETE_SIGN__ INT
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'db.table',
'username' = 'admin',
'password' = '****',
'sink.enable-delete' = 'false', -- false menunjukkan bahwa jenis event tidak diperoleh dari RowKind.
'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__' -- Secara eksplisit tentukan kolom yang akan diimpor untuk Stream Load.
);
INSERT INTO SELECTDB_SINK
SELECT json_value(data,'$.id') as id,
json_value(data,'$.name') as name,
if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__
FROM KAFKA_SOURCE;FAQ
Q: Bagaimana cara menulis data tipe BITMAP?
A: Contoh berikut menunjukkan caranya:
CREATE TABLE bitmap_sink ( dt INT, page STRING, user_id INT ) WITH ( 'connector' = 'doris', 'fenodes' = 'selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080', 'table.identifier' = 'test.bitmap_test', 'username' = 'admin', 'password' = '****', 'sink.label-prefix' = 'selectdb_label', 'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)' );Q: Bagaimana cara mengatasi error:
errCode = 2, detailMessage = Label[label_0_1]has already been used, relate to txn[19650]?A: Dalam skenario Exactly-Once, pekerjaan Flink harus dimulai ulang dari checkpoint atau savepoint terbaru. Jika tidak, error ini terjadi. Jika Exactly-Once tidak diperlukan, Anda dapat mengatasi error ini dengan menonaktifkan protokol two-phase commit (2PC) (
sink.enable-2pc=false) atau menggunakan `sink.label-prefix` yang berbeda.Q: Bagaimana cara mengatasi error:
errCode = 2, detailMessage = transaction[19650]not found?A: Error ini terjadi selama fase commit. ID transaksi yang dicatat dalam checkpoint telah kedaluwarsa di sisi SelectDB. Melakukan commit transaksi lagi menyebabkan error ini. Anda tidak dapat memulai ulang dari checkpoint dalam kasus ini. Anda dapat memperpanjang waktu kedaluwarsa dengan memodifikasi parameter
streaming_label_keep_max_seconddi SelectDB. Nilai default adalah 12 jam.Q: Bagaimana cara mengatasi error:
errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100?A: Error ini terjadi karena jumlah pekerjaan impor data konkuren untuk database yang sama melebihi 100. Anda dapat mengatasi masalah ini dengan menyesuaikan parameter SelectDB
max_running_txn_num_per_db. Untuk informasi selengkapnya, lihat max_running_txn_num_per_db.Error ini juga dapat terjadi jika Anda sering mengubah label dan memulai ulang pekerjaan. Dalam skenario two-phase commit (2PC) yang menggunakan model Duplicate atau Aggregate Key, label untuk setiap pekerjaan harus unik. Saat memulai ulang dari checkpoint, pekerjaan Flink hanya membatalkan transaksi yang dimulai tetapi belum selesai (pre-committed tetapi belum committed). Jika Anda sering mengubah label dan memulai ulang, banyak transaksi yang berhasil pre-committed (txn) tidak dapat dibatalkan, yang menghabiskan slot transaksi. Dalam model Unique Key, Anda juga dapat menonaktifkan 2PC dan merancang sink untuk melakukan penulisan idempoten.
Q: Saat Flink menulis ke tabel yang menggunakan model Unique Key, bagaimana cara memastikan urutan data dalam satu batch?
A: Anda dapat menambahkan konfigurasi kolom sequence untuk memastikan urutan. Untuk informasi selengkapnya, lihat SEQUENCE.
Q: Mengapa data tidak disinkronkan meskipun pekerjaan Flink tidak melaporkan error?
A: Sebelum versi 1.1.0, connector menggunakan penulisan batch yang didorong data. Anda harus memeriksa apakah data ditulis dari sumber hulu. Mulai versi 1.1.0, penulisan bergantung pada checkpoint. Anda harus mengaktifkan checkpoint untuk menulis data.
Q: Bagaimana cara mengatasi error:
tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235?A: Error ini biasanya terjadi pada versi connector sebelum 1.1.0. Hal ini disebabkan oleh frekuensi penulisan yang terlalu tinggi, sehingga menghasilkan terlalu banyak versi. Anda dapat mengurangi frekuensi Stream Load dengan mengatur parameter
sink.buffer-flush.max-bytesdansink.buffer-flush.interval.Q: Bagaimana cara melewati data kotor selama impor Flink?
A: Selama impor Flink, jika terdapat data kotor, seperti masalah format atau panjang bidang, Stream Load gagal, dan Flink terus mencoba ulang. Untuk melewati data kotor, Anda dapat menonaktifkan mode ketat Stream Load (
strict_mode=false,max_filter_ratio=1) atau menyaring data sebelum mencapai operator sink.Q: Bagaimana korespondensi antara tabel sumber dan tabel SelectDB?
A: Saat Anda menggunakan Flink Doris Connector untuk mengimpor data, Anda harus memastikan dua hal. Pertama, kolom dan tipe tabel sumber harus sesuai dengan kolom dan tipe di Flink SQL. Kedua, kolom dan tipe di Flink SQL harus sesuai dengan kolom dan tipe tabel SelectDB.
Q: Bagaimana cara mengatasi error:
TApplicationException: get_next failed: out of sequence response: expected 4 but got 3?A: Error ini disebabkan oleh bug konkurensi dalam framework Thrift. Untuk mengatasi masalah ini, gunakan versi connector terbaru yang memungkinkan dan versi Flink yang kompatibel.
Q: Bagaimana cara mengatasi error:
DorisRuntimeException: Fail to abort transaction 26153 with urlhttp://192.168.XX.XX?A: Di log TaskManager, cari
abort transaction response. Kode respons HTTP menunjukkan apakah masalah berasal dari client atau server.