All Products
Search
Document Center

Realtime Compute for Apache Flink:Message Queue for Kafka

Last Updated:Mar 22, 2026

Topik ini menjelaskan konektor Message Queue for Apache Kafka.

Latar Belakang

Apache Kafka adalah sistem Message Queue open-source terdistribusi yang banyak digunakan di bidang data besar untuk Pemrosesan Data berkinerja tinggi, Analitik Streaming, dan Integrasi Data. Konektor Kafka untuk Realtime Compute for Apache Flink menggunakan Apache Kafka Client open-source untuk menyediakan throughput data berkinerja tinggi, mendukung pembacaan dan penulisan berbagai Format Data, serta menawarkan Semantik tepat-sekali.

Kategori

Deskripsi

Tipe yang didukung

Tabel Sumber, Tabel Sink, dan Sink Ingesti Data

Mode Jalankan

Mode Streaming

Format Data

Format data yang didukung

  • CSV

  • JSON

  • Apache Avro

  • Confluent Avro

  • Debezium JSON

  • Canal JSON

  • Maxwell JSON

  • Raw

  • Protobuf

Catatan
  • Format Data Protobuf bawaan hanya didukung untuk Ververica Runtime (VVR) 8.0.9 dan versi setelahnya.

  • Setiap Format Data yang didukung memiliki Parameter terkait yang dapat ditentukan dalam klausa WITH. Untuk informasi selengkapnya, lihat Formats.

Metrik

Metrik

  • Tabel Sumber

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • Sink Table

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

Catatan

Untuk informasi selengkapnya tentang metrik, lihat Monitoring metrics.

Tipe API

SQL API, DataStream API, dan Data Ingestion YAML API

Memperbarui atau menghapus data di Tabel Sink

Konektor hanya mendukung penyisipan data ke Tabel Sink. Pembaruan dan penghapusan tidak didukung.

Catatan

Untuk informasi selengkapnya tentang cara memperbarui atau menghapus data di Tabel Sink, lihat Upsert Kafka.

Prasyarat

Lengkapi prasyarat berdasarkan tipe kluster Kafka Anda:

  • Menghubungkan ke kluster ApsaraMQ for Kafka

    • Versi kluster Kafka minimal 0.11.

    • Anda telah membuat kluster ApsaraMQ for Kafka. Untuk informasi selengkapnya, lihat Langkah 3: Buat sumber daya.

    • Ruang kerja Flink dan kluster Kafka berada dalam Virtual Private Cloud (VPC) yang sama, dan Anda telah menambahkan Blok CIDR ruang kerja Flink ke daftar putih ApsaraMQ for Kafka. Untuk informasi selengkapnya, lihat Konfigurasi daftar putih.

    Penting

    Batasan penulisan data ke ApsaraMQ for Kafka:

    • ApsaraMQ for Kafka tidak mendukung penulisan data dalam format kompresi Zstandard (zstd).

    • ApsaraMQ for Kafka tidak mendukung penulisan idempoten atau transaksional, sehingga mencegah penggunaan semantik tepat-sekali yang disediakan oleh tabel sink Kafka. Mulai dari Realtime Compute Engine VVR 8.0.0, konektor Kafka menggunakan Kafka client 3.x, di mana properti properties.enable.idempotence secara default bernilai true. Oleh karena itu, untuk mencegah kegagalan penulisan saat menggunakan Realtime Compute Engine VVR 8.0.0 atau versi setelahnya untuk menulis ke ApsaraMQ for Kafka, Anda harus menambahkan konfigurasi properties.enable.idempotence=false ke definisi tabel sink Anda. Untuk perbandingan mesin penyimpanan dan batasan fitur ApsaraMQ for Kafka, lihat Perbandingan antara mesin penyimpanan.

  • Menghubungkan ke kluster Apache Kafka yang dikelola sendiri

    • Versi kluster Apache Kafka yang dikelola sendiri minimal 0.11.

    • Terdapat konektivitas jaringan antara ruang kerja Flink dan kluster Apache Kafka yang dikelola sendiri. Untuk detail cara menghubungkan ke kluster melalui internet publik, lihat FAQ tentang konektivitas jaringan.

    • Hanya opsi konfigurasi klien untuk Apache Kafka versi 2.8 yang didukung. Untuk informasi selengkapnya, lihat dokumentasi Apache Kafka Consumer Configs dan Producer Configs.

Catatan

Penulisan transaksional tidak disarankan karena adanya keterbatasan desain yang diketahui pada Apache Flink dan Apache Kafka. Saat Anda mengatur sink.delivery-guarantee = 'exactly-once', konektor Kafka mengaktifkan penulisan transaksional, dengan masalah yang diketahui sebagai berikut:

  • ID Transaksi baru dihasilkan untuk setiap checkpoint. Jika interval checkpoint terlalu pendek, jumlah ID Transaksi yang berlebihan dapat dibuat. Hal ini dapat menyebabkan koordinator di kluster Apache Kafka kehabisan memori, sehingga mengganggu stabilitas kluster.

  • Instance Producer dibuat untuk setiap transaksi. Jika terlalu banyak transaksi yang dikomit secara bersamaan, TaskManager dapat kehabisan memori, sehingga mengganggu stabilitas pekerjaan Apache Flink.

  • Jika beberapa pekerjaan Apache Flink menggunakan sink.transactional-id-prefix yang sama, ID Transaksi yang dihasilkan dapat bertabrakan. Ketika operasi penulisan gagal pada satu pekerjaan, hal ini dapat mencegah Log Start Offset (LSO) partisi Apache Kafka maju. Hal ini memengaruhi semua konsumen partisi tersebut.

Jika Anda memerlukan Semantik tepat-sekali, gunakan konektor Upsert Kafka untuk menulis ke tabel kunci primer, memastikan idempotensi. Jika Anda harus menggunakan penulisan transaksional, lihat Catatan penggunaan Semantik tepat-sekali.

Pemecahan Masalah Konektivitas Jaringan

Error Timed out waiting for a node assignment saat Pekerjaan Realtime Compute for Apache Flink gagal dimulai biasanya menunjukkan adanya masalah konektivitas jaringan antara Realtime Compute for Apache Flink dan kluster Kafka.

Klien Kafka terhubung ke broker sebagai berikut:

  1. Klien menggunakan alamat yang ditentukan dalam bootstrap.servers untuk membuat koneksi awal ke kluster Kafka.

  2. Kluster Kafka mengembalikan metadata untuk setiap broker, termasuk titik akhirnya.

  3. Klien kemudian menggunakan titik akhir ini untuk terhubung ke broker guna membaca atau menulis data.

Meskipun alamat bootstrap.servers dapat dijangkau, klien tidak dapat membaca atau menulis data jika Kafka mengembalikan titik akhir broker yang salah. Masalah ini sering terjadi pada arsitektur jaringan yang menggunakan proxy, penerusan port, atau jalur sewa.

Langkah Pemecahan Masalah

Message Queue for Kafka

  1. Konfirmasi tipe Titik Akhir

    • Titik Akhir Default (jaringan internal)

    • Titik Akhir SASL (jaringan internal dengan otentikasi)

    • Titik Akhir Publik (memerlukan aplikasi terpisah)

    Gunakan fitur Network Probe di konsol pengembangan Realtime Compute for Apache Flink untuk mengesampingkan masalah konektivitas dengan alamat bootstrap.servers.

  2. Periksa grup keamanan dan daftar putih

    Tambahkan Blok CIDR ruang kerja Realtime Compute for Apache Flink ke daftar putih instance Kafka Anda. Untuk informasi selengkapnya, lihat Lihat Blok CIDR VPC dan Konfigurasi daftar putih.

  3. Periksa konfigurasi SASL (jika diaktifkan)

    Jika Anda menggunakan titik akhir SASL_SSL, pastikan mekanisme JAAS, SSL, dan SASL dikonfigurasi dengan benar di Pekerjaan Realtime Compute for Apache Flink Anda. Otentikasi yang hilang dapat menyebabkan koneksi gagal selama fase handshake, yang juga dapat muncul sebagai timeout. Untuk informasi selengkapnya, lihat Keamanan dan otentikasi.

Kafka yang Dikelola Sendiri

  1. Gunakan fitur Network Probe

    Fitur ini membantu Anda mengesampingkan masalah konektivitas dengan alamat bootstrap.servers dan memverifikasi bahwa titik akhir internal atau publik yang benar digunakan.

  2. Periksa grup keamanan dan daftar putih

    • Grup keamanan untuk instance Elastic Compute Service (ECS) harus mengizinkan lalu lintas masuk pada port titik akhir Kafka, biasanya 9092 atau 9093.

    • Pastikan firewall apa pun pada instance ECS mengizinkan trafik dari VPC ruang kerja Realtime Compute for Apache Flink Anda. Untuk informasi selengkapnya, lihat Lihat Blok CIDR VPC.

  3. Periksa konfigurasi

    1. Gunakan tool zkCli.sh atau zookeeper-shell.sh untuk login ke kluster ZooKeeper yang digunakan Kafka.

    2. Jalankan perintah untuk mendapatkan metadata broker. Misalnya, jalankan get /brokers/ids/0. Di bidang endpoints respons, temukan alamat yang diiklankan Kafka kepada klien.

      example

    3. Gunakan fitur Network Probe di konsol pengembangan Realtime Compute for Apache Flink untuk menguji apakah alamat ini dapat diakses.

      Catatan
      • Jika alamat tidak dapat diakses, hubungi administrator Kafka Anda untuk memeriksa dan memperbaiki konfigurasi listeners dan advertised.listeners agar alamat yang diiklankan dapat diakses dari Realtime Compute for Apache Flink.

      • Untuk informasi selengkapnya tentang koneksi klien Kafka, lihat Troubleshoot Connectivity.

  4. Periksa konfigurasi SASL (jika diaktifkan)

    Jika Anda menggunakan titik akhir SASL_SSL, pastikan mekanisme JAAS, SSL, dan SASL dikonfigurasi dengan benar di Pekerjaan Realtime Compute for Apache Flink Anda. Otentikasi yang hilang dapat menyebabkan koneksi gagal selama fase handshake, yang juga dapat muncul sebagai timeout. Untuk informasi selengkapnya, lihat Keamanan dan otentikasi.

SQL

Konektor Kafka dapat digunakan sebagai tabel sumber atau tabel sink dalam pekerjaan SQL.

Sintaksis

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

Kolom metadata

Definisikan kolom metadata di Tabel Sumber atau Tabel Sink untuk mengakses atau menulis metadata pesan Kafka. Misalnya, saat mendefinisikan beberapa topik dalam klausa WITH, gunakan kolom metadata di Tabel Sumber untuk mengidentifikasi topik sumber setiap catatan. Kode berikut menunjukkan contohnya.

CREATE TABLE kafka_source (
  -- Baca topik pesan sebagai kolom `record_topic`
  `record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
  -- Baca timestamp dari ConsumerRecord sebagai kolom `ts`
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  -- Baca offset pesan sebagai kolom `record_offset`
  `record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE kafka_sink (
  -- Tulis timestamp dari kolom `ts` sebagai timestamp ProducerRecord ke Kafka
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

Tabel berikut mencantumkan kolom metadata yang didukung oleh Tabel Sumber dan Sink Kafka.

Kunci

Tipe

Deskripsi

Cakupan

topic

STRING NOT NULL METADATA VIRTUAL

Topik pesan.

Tabel sumber

partition

INT NOT NULL METADATA VIRTUAL

ID partisi pesan.

Tabel sumber

headers

MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL

Header pesan.

Tabel sumber dan tabel sink

leader-epoch

INT NOT NULL METADATA VIRTUAL

Leader-epoch pesan.

Tabel sumber

offset

BIGINT NOT NULL METADATA VIRTUAL

Offset pesan.

Tabel sumber

timestamp

TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL

Timestamp pesan.

Tabel sumber dan tabel sink

timestamp-type

STRING NOT NULL METADATA VIRTUAL

Tipe timestamp pesan. Nilai yang valid adalah:

  • NoTimestampType: Tidak ada timestamp yang didefinisikan dalam pesan.

  • CreateTime: Waktu pesan dibuat.

  • LogAppendTime: Waktu pesan ditambahkan ke log broker Kafka.

Tabel sumber

__raw_key__

STRING NOT NULL METADATA VIRTUAL

Kunci pesan mentah.

Tabel sumber dan tabel sink

Catatan

Parameter ini hanya didukung di Ververica Runtime (VVR) 11.4 dan versi setelahnya.

__raw_value__

STRING NOT NULL METADATA VIRTUAL

Nilai pesan mentah.

Tabel sumber dan tabel sink

Catatan

Parameter ini hanya didukung di Ververica Runtime (VVR) 11.4 dan versi setelahnya.

DENGAN parameter

  • Umum

    Parameter

    Deskripsi

    Tipe

    Wajib

    Default

    Keterangan

    connector

    Konektor yang digunakan.

    String

    Ya

    N/A

    Nilainya harus kafka.

    properties.bootstrap.servers

    Daftar alamat broker Kafka.

    String

    Ya

    N/A

    Format: host:port,host:port,.... Pisahkan alamat dengan koma (,).

    properties.*

    Properti tambahan untuk klien Kafka.

    String

    Tidak

    N/A

    Kunci properti harus merupakan opsi yang valid sebagaimana didefinisikan dalam dokumentasi resmi Apache Kafka untuk Producer Configs dan Consumer Configs.

    Realtime Compute for Apache Flink menghapus prefiks properties. dan meneruskan pasangan kunci-nilai yang tersisa ke klien Kafka yang mendasari. Misalnya, Anda dapat mengatur 'properties.allow.auto.create.topics' = 'false' untuk menonaktifkan pembuatan topik otomatis.

    Anda tidak dapat menggunakan metode ini untuk mengonfigurasi opsi berikut karena Konektor Kafka akan menimpanya:

    • key.deserializer

    • value.deserializer

    format

    Format untuk serialisasi dan deserialisasi nilai pesan Kafka.

    String

    Tidak

    N/A

    Format yang didukung:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    Catatan

    Untuk informasi selengkapnya, lihat Format options.

    key.format

    Format untuk serialisasi dan deserialisasi kunci pesan Kafka.

    String

    Tidak

    N/A

    Format yang didukung:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    Catatan

    Saat menggunakan konfigurasi ini, key.options wajib diisi.

    key.fields

    Bidang dari skema tabel yang digunakan sebagai kunci pesan Kafka.

    String

    Tidak

    N/A

    Pisahkan beberapa nama bidang dengan titik koma (;). Misalnya, 'field1;field2'.

    key.fields-prefix

    Prefiks khusus untuk semua bidang kunci guna mencegah konflik nama dengan bidang nilai.

    String

    Tidak

    N/A

    Prefiks ini digunakan untuk membedakan antara bidang kunci dan bidang nilai. Prefiks ini dihapus sebelum serialisasi kunci atau setelah deserialisasi.

    Catatan

    Jika Anda menggunakan opsi ini, value.fields-include harus diatur ke EXCEPT_KEY.

    value.format

    Format untuk serialisasi dan deserialisasi nilai pesan Kafka.

    String

    Tidak

    N/A

    Konfigurasi ini setara dengan format. Anda hanya dapat mengatur salah satu dari format atau value.format. Jika keduanya dikonfigurasi, value.format akan menggantikan format.

    value.fields-include

    Menentukan apakah bidang kunci disertakan dalam format nilai.

    String

    Tidak

    ALL

    Nilai yang valid:

    • ALL: Nilai pesan Kafka mencakup semua kolom tabel.

    • EXCEPT_KEY: Nilai pesan Kafka mencakup semua kolom tabel kecuali yang didefinisikan dalam key.fields.

  • Tabel sumber

    Parameter

    Deskripsi

    Type

    Wajib

    Default

    Keterangan

    topic

    Topik atau topik yang akan dibaca.

    String

    Tidak

    N/A

    Untuk berlangganan beberapa topik, pisahkan nama mereka dengan titik koma (;), misalnya, 'topic-1;topic-2'.

    Catatan

    Anda dapat menentukan opsi ini atau topic-pattern, tetapi tidak keduanya.

    topic-pattern

    Ekspresi reguler yang menentukan topik mana yang akan berlangganan. Konsumen berlangganan ke semua topik yang namanya sesuai dengan pola ini.

    String

    Tidak

    N/A

    Catatan

    Anda dapat menentukan opsi ini atau topic, tetapi tidak keduanya.

    properties.group.id

    ID Kelompok Konsumen untuk sumber Kafka.

    String

    Tidak

    KafkaSource-{Nama-Tabel-Sumber}

    Jika Anda menggunakan ID Kelompok Konsumen untuk pertama kalinya, Anda juga harus mengatur properties.auto.offset.reset ke earliest atau latest untuk menentukan Offset Startup awal.

    scan.startup.mode

    Offset Startup untuk Konsumen Kafka.

    String

    Tidak

    group-offsets

    Nilai yang valid:

    • earliest-offset: Mulai membaca dari Offset paling awal yang tersedia.

    • latest-offset: Mulai membaca dari Offset terbaru.

    • group-offsets: Mulai membaca dari Offset yang dikomit dari properties.group.id yang ditentukan.

    • timestamp: Mulai membaca dari scan.startup.timestamp-millis yang ditentukan.

    • specific-offsets: Mulai membaca dari Offset yang ditentukan dalam scan.startup.specific-offsets.

    Catatan

    Opsi ini hanya berlaku saat Pekerjaan dimulai dari State bersih. Jika Pekerjaan dimulai ulang dari Checkpoint, selalu dilanjutkan dari Offset yang disimpan dalam State Checkpoint.

    scan.startup.specific-offsets

    Offset awal untuk setiap partisi saat scan.startup.mode adalah specific-offsets.

    String

    Tidak

    N/A

    Misalnya, partition:0,offset:42;partition:1,offset:300

    scan.startup.timestamp-millis

    Timestamp awal dalam milidetik saat scan.startup.mode diatur ke timestamp.

    Long

    Tidak

    N/A

    Unitnya adalah milidetik.

    scan.topic-partition-discovery.interval

    Interval untuk menemukan partisi baru dalam topik.

    Duration

    Tidak

    5 menit

    Konektor secara berkala menemukan dan membaca dari partisi baru. Saat menggunakan topic-pattern, Konektor juga menemukan topik baru yang sesuai dengan pola tersebut. Atur interval ke nilai non-positif untuk menonaktifkan fitur ini.

    Catatan

    Di Ververica Runtime (VVR) 6.0.x, penemuan partisi dinamis dinonaktifkan secara default. Mulai dari VVR 8.0, fitur ini diaktifkan secara default dengan interval penemuan 5 menit.

    scan.header-filter

    Memfilter pesan berdasarkan header pesan Kafka.

    String

    Tidak

    N/A

    Kunci header dan nilainya dipisahkan dengan titik dua (:). Beberapa kondisi header dihubungkan menggunakan operator logika (& dan |). Operator logika NOT (!) juga didukung. Misalnya, depart:toy|depart:book&!env:test menyimpan data Kafka jika header berisi depart=toy atau depart=book dan tidak berisi env=test.

    Catatan
    • Opsi ini hanya didukung di Ververica Runtime (VVR) 8.0.6 dan versi setelahnya.

    • Tanda kurung dalam ekspresi tidak didukung.

    • Operasi logika dievaluasi dari kiri ke kanan.

    • Nilai header dikonversi ke string UTF-8 untuk perbandingan.

    scan.check.duplicated.group.id

    Menentukan apakah akan memeriksa apakah Konsumen aktif lain menggunakan properties.group.id.

    Boolean

    Tidak

    false

    Nilai yang valid:

    • true: Sebelum memulai Pekerjaan, sistem memeriksa duplikasi Kelompok Konsumen. Jika ditemukan, Pekerjaan gagal untuk mencegah konflik.

    • false: Memulai Pekerjaan tanpa memeriksa konflik.

    Catatan

    Opsi ini hanya didukung di Ververica Runtime (VVR) 6.0.4 dan versi setelahnya.

  • Tabel sink

    Parameter

    Deskripsi

    Type

    Wajib

    Default

    Keterangan

    topic

    Topik yang akan ditulis.

    String

    Ya

    N/A

    N/A

    sink.partitioner

    Menentukan cara memetakan catatan dari instance sink paralel ke partisi Kafka.

    String

    Tidak

    default

    Nilai yang valid:

    • default: Menggunakan partitioner Kafka default.

    • fixed: Setiap instance sink paralel menulis ke partisi Kafka tetap.

    • round-robin: Catatan didistribusikan ke partisi secara round-robin.

    • Partitioner khusus: Untuk menggunakan partitioner khusus, berikan nama kelas lengkap dari subclass FlinkKafkaPartitioner, misalnya, org.mycompany.MyPartitioner.

    sink.delivery-guarantee

    Jaminan pengiriman untuk sink.

    String

    Tidak

    at-least-once

    Nilai yang valid:

    • none: Tidak memberikan jaminan. Catatan mungkin hilang atau diduplikasi.

    • at-least-once: Menjamin bahwa tidak ada catatan yang hilang, tetapi mungkin diduplikasi.

    • exactly-once: Menggunakan transaksi Kafka untuk memberikan Semantik tepat-sekali, memastikan catatan tidak hilang maupun diduplikasi.

    Catatan

    Saat menggunakan Semantik exactly-once, Anda juga harus menentukan sink.transactional-id-prefix.

    sink.transactional-id-prefix

    Prefiks untuk ID transaksi, wajib saat sink.delivery-guarantee adalah exactly-once.

    String

    Ya, jika sink.delivery-guarantee adalah exactly-once

    N/A

    Wajib hanya saat sink.delivery-guarantee diatur ke exactly-once.

    sink.parallelism

    Paralelisme Operator sink.

    Integer

    Tidak

    N/A

    Secara default, framework menentukan Paralelisme berdasarkan Operator hulu.

Keamanan dan otentikasi

Jika kluster Kafka memerlukan koneksi aman atau otentikasi, tambahkan prefiks konfigurasi keamanan dan otentikasi terkait dengan properties. dan atur di parameter WITH. Contoh konfigurasi tabel Kafka untuk menggunakan PLAIN sebagai mekanisme SASL dan menyediakan konfigurasi JAAS adalah sebagai berikut.

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)

Contoh berikut menunjukkan cara menggunakan SASL_SSL sebagai protokol keamanan dan SCRAM-SHA-256 sebagai mekanisme SASL.

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_SSL',
  /* Konfigurasi SSL */
  /* Path ke truststore untuk sertifikat CA server. */
  /* File yang diunggah menggunakan Artifacts disimpan di direktori /flink/usrlib/. */
  'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
  'properties.ssl.truststore.password' = 'test1234',
  /* Jika otentikasi klien diperlukan, Anda juga harus mengonfigurasi path ke keystore (kunci privat). */
  'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
  'properties.ssl.keystore.password' = 'test1234',
  /* Algoritma yang digunakan untuk memverifikasi hostname server. String kosong menonaktifkan verifikasi hostname. */
  'properties.ssl.endpoint.identification.algorithm' = '',
  /* Konfigurasi SASL */
  /* Atur mekanisme SASL ke SCRAM-SHA-256. */
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  /* Konfigurasi JAAS. */
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)

Anda dapat menggunakan fitur Artifacts dari Konsol Komputasi Real-time untuk mengunggah sertifikat CA dan kunci privat yang disebutkan dalam contoh. File yang diunggah disimpan di direktori /flink/usrlib. Untuk menggunakan file sertifikat CA bernama my-truststore.jks, Anda dapat mengatur properti 'properties.ssl.truststore.location' di klausa WITH dengan salah satu dari dua cara berikut:

  • Atur 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks'. Metode ini menghindari pengunduhan file secara dinamis dari Object Storage Service (OSS) saat runtime, tetapi tidak mendukung Mode Debug.

  • Jika versi mesin Realtime Compute adalah VVR 11.5 atau versi setelahnya, Anda dapat mengonfigurasi properties.ssl.truststore.location dan properties.ssl.keystore.location ke path OSS absolut. Format path file adalah oss://flink-fullymanaged-<ID Ruang Kerja>/artifacts/namespaces/<Nama Namespace>/<nama file>. Metode ini mengunduh file OSS secara dinamis selama runtime Flink dan mendukung Mode Debug.

Catatan
  • Verifikasi konfigurasi Anda: Contoh dalam topik ini menunjukkan konfigurasi umum. Sebelum mengonfigurasi konektor Kafka, hubungi tim O&M Kafka Anda untuk mendapatkan pengaturan keamanan dan otentikasi yang benar.

  • Escaping: Berbeda dengan Apache Flink asli, editor SQL Realtime Compute for Apache Flink secara default melakukan escaping tanda kutip ganda ("). Oleh karena itu, Anda tidak perlu menambahkan backslash (\) untuk melakukan escaping tanda kutip ganda yang digunakan untuk username dan password dalam opsi properties.sasl.jaas.config.

Offset awal tabel sumber

Startup mode

Anda dapat mengonfigurasi opsi scan.startup.mode untuk menentukan offset dari mana tabel sumber Kafka mulai membaca data. Nilai yang valid meliputi:

  • earliest-offset: Mulai membaca dari offset paling awal.

  • latest-offset: Mulai membaca dari offset terbaru.

  • group-offsets: Mulai membaca dari offset yang dikomit untuk kelompok konsumen yang ditentukan dalam properties.group.id.

  • timestamp: Mulai membaca dari pesan pertama dengan timestamp lebih besar dari atau sama dengan nilai yang ditentukan dalam scan.startup.timestamp-millis.

  • specific-offsets: Mulai membaca dari offset partisi tertentu yang ditentukan dalam scan.startup.specific-offsets.

Catatan
  • Jika Anda tidak menentukan mode startup, default-nya adalah 'group-offsets'.

  • Opsi scan.startup.mode hanya berlaku untuk pekerjaan tanpa status. Saat pekerjaan berstatus dimulai, selalu mengonsumsi dari offset yang disimpan dalam statusnya.

Kode berikut memberikan contoh:

CREATE TEMPORARY TABLE kafka_source (
  ...
) WITH (
  'connector' = 'kafka',
  ...
  -- Konsumsi dari offset paling awal.
  'scan.startup.mode' = 'earliest-offset',
  -- Konsumsi dari offset terbaru.
  'scan.startup.mode' = 'latest-offset',
  -- Konsumsi dari offset yang dikomit kelompok konsumen "my-group".
  'properties.group.id' = 'my-group',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest', -- Jika "my-group" digunakan pertama kali, konsumsi dimulai dari offset paling awal.
  'properties.auto.offset.reset' = 'latest', -- Jika "my-group" digunakan pertama kali, konsumsi dimulai dari offset terbaru.
  -- Konsumsi dari timestamp tertentu dalam milidetik: 1655395200000.
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1655395200000',
  -- Konsumsi dari offset tertentu.
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);

Prioritas offset awal

Offset awal untuk tabel sumber ditentukan berdasarkan prioritas berikut, dari tertinggi ke terendah:

Prioritas (tertinggi ke terendah)

Offset yang disimpan dalam Checkpoint atau Savepoint.

Waktu mulai yang dipilih di Konsol Komputasi Real-time selama startup pekerjaan.

Offset awal yang ditentukan oleh scan.startup.mode di klausa WITH.

Jika scan.startup.mode tidak ditentukan, group-offsets digunakan untuk memulai konsumsi dari offset kelompok konsumen yang sesuai.

Jika offset yang ditentukan oleh langkah-langkah ini tidak valid, misalnya karena telah kedaluwarsa atau terjadi masalah di kluster Kafka, sistem mereset offset sesuai kebijakan yang ditentukan dalam properties.auto.offset.reset. Jika opsi ini tidak dikonfigurasi, sistem melemparkan exception yang memerlukan intervensi pengguna.

Skenario umum adalah mengonsumsi data dengan ID kelompok konsumen baru. Tabel sumber terlebih dahulu menanyakan kluster Kafka untuk offset yang dikomit kelompok tersebut. Karena ID kelompok baru, tidak ditemukan offset yang valid. Akibatnya, sistem mereset offset sesuai kebijakan yang ditentukan dalam properties.auto.offset.reset. Oleh karena itu, saat mengonsumsi dengan ID kelompok baru, Anda harus mengonfigurasi opsi properties.auto.offset.reset.

Mengkomit offset sumber

Tabel sumber Kafka hanya mengkomit offset konsumen saat ini ke kluster Kafka setelah checkpoint berhasil. Jika interval checkpoint panjang, offset konsumen di kluster Kafka tertinggal. Selama checkpoint, tabel sumber Kafka menyimpan progres bacaannya saat ini dalam statusnya. Sistem menggunakan status ini untuk pemulihan kesalahan dan tidak bergantung pada offset yang dikomit ke kluster Kafka. Offset dikomit hanya untuk memantau progres bacaan di Kafka. Kegagalan komit offset tidak memengaruhi akurasi data.

Pemartisi sink kustom

Jika strategi partisi bawaan Kafka tidak memenuhi kebutuhan Anda, Anda dapat mengimplementasikan partitioner khusus dengan memperluas kelas FlinkKafkaPartitioner. Setelah pengembangan selesai, kompilasi kode Anda menjadi paket JAR dan unggah menggunakan fitur Artifacts di konsol Realtime Compute. Setelah paket JAR diunggah dan direferensikan, atur parameter sink.partitioner di klausa WITH ke nama kelas lengkap partitioner Anda, misalnya, org.mycompany.MyPartitioner.

Kafka, Upsert Kafka, dan katalog Kafka JSON

Kafka adalah sistem antrian pesan append-only yang tidak mendukung pembaruan atau penghapusan data. Dalam SQL streaming, tabel sink Kafka standar tidak dapat menangani data Change Data Capture (CDC) hulu atau logika penarikan kembali operator seperti agregat dan join. Jika Anda perlu menulis data yang berisi perubahan atau penarikan kembali, gunakan tabel sink Upsert Kafka.

Untuk menyederhanakan sinkronisasi batch data Change Data Capture (CDC) dari satu atau beberapa tabel database hulu ke Kafka, Anda dapat menggunakan katalog Kafka JSON. Jika data yang disimpan di Kafka dalam format JSON, katalog Kafka JSON memungkinkan Anda melewati langkah mendefinisikan skema dan parameter WITH. Untuk detailnya, lihat Kelola katalog Kafka JSON.

Contoh

Contoh 1: Baca dari dan tulis ke Kafka

Contoh ini membaca data dari topik Kafka sumber dan menulisnya ke topik sink. Datanya dalam format CSV.

CREATE TEMPORARY TABLE kafka_source (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

CREATE TEMPORARY TABLE kafka_sink (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;

Contoh 2: Sinkronisasi skema dan data tabel

Anda dapat menggunakan konektor Kafka untuk menyinkronkan pesan dari topik Kafka ke Hologres secara real time. Untuk mencegah pesan duplikat di Hologres selama failover, Anda dapat menggunakan offset dan ID partisi pesan Kafka sebagai kunci primer komposit.

CREATE TEMPORARY TABLE kafkaTable (
  `offset` INT NOT NULL METADATA,
  `part` BIGINT NOT NULL METADATA FROM 'partition',
  PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true'
    -- Opsional. Meratakan semua kolom bersarang.
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;

Contoh 3: Sinkronisasi kunci dan nilai Kafka

Jika kunci pesan Kafka berisi informasi relevan, Anda dapat menyinkronkan kunci dan nilainya.

CREATE TEMPORARY TABLE kafkaTable (
  `key_id` INT NOT NULL,
  `val_name` VARCHAR(200)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'val_',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`(
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
Catatan

Kunci pesan Kafka tidak mendukung Evolusi Skema atau penguraian tipe otomatis. Anda harus mendeklarasikan skema secara manual.

Contoh 4: Sinkronisasi data dan lakukan komputasi

Menyinkronkan data dari Kafka ke Hologres sering kali memerlukan komputasi ringan.

CREATE TEMPORARY TABLE kafkaTable (
  `distinct_id` INT NOT NULL,
  `properties` STRING,
  `timestamp` TIMESTAMP_LTZ METADATA,
  `date` AS CAST(`timestamp` AS DATE)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
   'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
  `order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
--Gunakan COALESCE untuk menangani nilai null.

Contoh 5: Uraikan JSON bersarang

Berikut adalah contoh pesan JSON:

{
  "id": 101,
  "name": "VVP",
  "properties": {
    "owner": "Alibaba Cloud",
    "engine": "Flink"
  }
}

Untuk menghindari penggunaan fungsi seperti JSON_VALUE(payload, '$.properties.owner') untuk mengurai bidang, Anda dapat langsung mendefinisikan struktur dalam DDL Sumber:

CREATE TEMPORARY TABLE kafka_source (
  id          VARCHAR,
  `name`      VARCHAR,
  properties  ROW<`owner` STRING, engine STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'xxx',
  'properties.bootstrap.servers' = 'xxx',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

Dengan cara ini, Flink mengurai JSON menjadi bidang terstruktur sekaligus selama fase baca. Kueri SQL selanjutnya kemudian dapat langsung menggunakan properties.owner tanpa panggilan fungsi tambahan, yang meningkatkan kinerja keseluruhan.

DataStream API

Penting

Untuk membaca atau menulis data dengan DataStream API, gunakan DataStream Connector yang sesuai untuk terhubung ke Realtime Compute for Apache Flink. Untuk informasi selengkapnya tentang cara menyiapkan DataStream Connector, lihat Integrasikan konektor DataStream.

  • Buat Kafka Source

    Kafka Source menyediakan kelas builder untuk membuat instance Kafka Source. Kode contoh berikut membangun Kafka Source yang mengonsumsi data dari Offset paling awal dari topik input-topic. Kelompok Konsumen adalah my-group, dan Nilai Pesan Kafka dideserialisasi sebagai string.

    Java

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
    
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

    Untuk membangun Kafka Source, Anda harus menentukan properti berikut.

    Parameter

    Deskripsi

    BootstrapServers

    Daftar alamat broker Kafka. Atur properti ini dengan memanggil metode setBootstrapServers(String).

    GroupId

    ID Kelompok Konsumen. Atur properti ini dengan memanggil metode setGroupId(String).

    Topik atau Partisi

    Topik atau partisi untuk berlangganan. Kafka Source mendukung tiga metode berikut untuk berlangganan ke topik atau partisi:

    • Berlangganan ke semua partisi topik dalam daftar.

      KafkaSource.builder().setTopics("topic-a","topic-b")
    • Pola topik: Berlangganan ke semua partisi topik yang namanya cocok dengan ekspresi reguler yang ditentukan.

      KafkaSource.builder().setTopicPattern("topic.*")
    • Daftar partisi, tempat Anda dapat berlangganan ke partisi tertentu.

      final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
              new TopicPartition("topic-a", 0),    // Partisi 0 dari topik "topic-a"
              new TopicPartition("topic-b", 5)));  // Partisi 5 dari topik "topic-b"
      KafkaSource.builder().setPartitions(partitionSet)

    Deserializer

    Deserializer yang digunakan untuk mengurai pesan Kafka.

    Tentukan deserialisasi menggunakan metode setDeserializer(KafkaRecordDeserializationSchema). KafkaRecordDeserializationSchema mendefinisikan cara mengurai ConsumerRecord Kafka. Jika Anda hanya perlu mengurai Value dari Message Kafka, Anda dapat menggunakan salah satu metode berikut:

    • Gunakan metode setValueOnlyDeserializer(DeserializationSchema) dari kelas pembuat. DeserializationSchema menentukan cara mengurai data biner dari Message Value Kafka.

    • Gunakan kelas yang mengimplementasikan antarmuka Deserializer Kafka. Misalnya, Anda dapat menggunakan StringDeserializer untuk mengurai Nilai Pesan Kafka menjadi string.

      import org.apache.kafka.common.serialization.StringDeserializer;
      
      KafkaSource.<String>builder()
              .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
    Catatan

    Untuk mengurai ConsumerRecord lengkap, Anda harus mengimplementasikan antarmuka KafkaRecordDeserializationSchema.

    POM

    Konektor DataStream Kafka tersedia di repositori pusat Maven.

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr-version}</version>
    </dependency>

    Saat menggunakan DataStream Connector Kafka, pertimbangkan properti berikut:

    • Offset Awal

      Kafka Source menentukan offset awalnya menggunakan Offset Initializer (OffsetsInitializer). Inisialisasi bawaan meliputi:

      Inisialisasi offset

      Kode

      Memulai konsumsi dari Offset paling awal.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())

      Memulai konsumsi dari Offset terbaru.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())

      Memulai konsumsi data yang timestamp-nya lebih besar dari atau sama dengan waktu yang ditentukan. Unitnya adalah milidetik.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))

      Memulai konsumsi dari Offset yang dikomit oleh Kelompok Konsumen. Jika tidak ada Offset yang dikomit, menggunakan strategi reset yang ditentukan (misalnya, Offset paling awal).

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))

      Konsumsi dimulai dari offset yang dikomit oleh kelompok konsumen, dan tidak ada kebijakan reset offset yang ditentukan.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())

      Catatan
      • Jika inisialisasi bawaan tidak memenuhi kebutuhan Anda, Anda dapat mengimplementasikan Offset Initializer khusus.

      • Jika Anda tidak menentukan Offset Initializer, default-nya adalah OffsetsInitializer.earliest().

    • Mode Streaming dan Mode Batch

      Kafka Source mendukung Mode Streaming dan Mode Batch. Secara default, beroperasi dalam Mode Streaming, di mana Pekerjaan berjalan tanpa henti hingga gagal atau dibatalkan. Untuk mengonfigurasi Kafka Source agar berjalan dalam Mode Batch, Anda dapat menggunakan setBounded(OffsetsInitializer) untuk menentukan Offset berhenti. Kafka Source keluar saat semua partisi mencapai offset berhenti yang ditentukan.

      Catatan

      Kafka Source dalam Mode Streaming biasanya tidak memiliki Offset berhenti. Namun, untuk tujuan pengujian, Anda dapat menggunakan setUnbounded(OffsetsInitializer) untuk menentukan Offset berhenti bahkan dalam Mode Streaming. Perhatikan perbedaan nama metode untuk menentukan Offset berhenti: setUnbounded untuk Mode Streaming dan setBounded untuk Mode Batch.

    • Penemuan Partisi Dinamis

      Untuk menangani penskalaan Topik atau pembuatan topik baru tanpa memulai ulang Pekerjaan Flink, Anda dapat mengaktifkan Penemuan Partisi Dinamis saat berlangganan ke topik berdasarkan pola. Fitur ini dinonaktifkan secara default dan harus diaktifkan secara eksplisit:

      KafkaSource.builder()
          .setProperty("partition.discovery.interval.ms", "10000") // Temukan partisi baru setiap 10 detik.
      Penting

      Fitur penemuan partisi dinamis bergantung pada mekanisme pembaruan metadata kluster Kafka. Jika kluster Kafka tidak memperbarui informasi partisi secara tepat waktu, partisi baru mungkin tidak ditemukan. Pastikan konfigurasi partition.discovery.interval.ms kluster Kafka sesuai dengan skenario aktual Anda.

    • Waktu Peristiwa dan Watermark

      Secara default, Kafka Source menggunakan timestamp dari Pesan Kafka sebagai Waktu Event. Anda dapat mendefinisikan strategi Watermark khusus untuk mengekstrak Waktu Event dari isi Pesan dan mengirimkan Watermark ke hulu.

      env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")

      Untuk mempelajari lebih lanjut tentang strategi Watermark khusus, lihat Generating Watermarks.

      Catatan

      Jika subtugas sumber tidak aktif—misalnya, ketika partisi Kafka tidak memiliki data baru atau paralelisme sumber lebih tinggi daripada jumlah partisi Kafka—watermark untuk subtugas tersebut tidak akan maju, yang dapat menghambat komputasi jendela downstream.

      Untuk mengatasi masalah ini, pertimbangkan solusi berikut:

      • Konfigurasikan waktu tunggu idle sumber: Aktifkan properti table.exec.source.idle-timeout untuk menandai sumber idle sebagai sementara idle. Hal ini memungkinkan Watermark hilir maju.

      • Atur Paralelisme yang sesuai: Pastikan Paralelisme sumber tidak lebih besar dari jumlah partisi Kafka.

    • Offset Commit

      Saat checkpoint diaktifkan, Kafka Source mengkomit Offset konsumen saat ini ke Kafka saat Checkpoint selesai. Hal ini memastikan status Checkpoint Flink konsisten dengan Offset yang dikomit di broker Kafka. Jika checkpoint dinonaktifkan, Kafka Source mengandalkan mekanisme komit periodik otomatis konsumen Kafka internal. Fitur ini dikontrol oleh properti konsumen Kafka enable.auto.commit dan auto.commit.interval.ms.

      Catatan

      Kafka Source tidak bergantung pada offset yang dikomit untuk toleransi kesalahan dan pemulihan. Mengkomit offset hanya untuk memantau progres konsumen Kafka dan Kelompok Konsumen.

    • Properti lainnya

      Selain properti yang disebutkan, Anda dapat menggunakan setProperties(Properties) dan setProperty(String, String) untuk mengatur properti apa pun untuk Kafka Source dan konsumen Kafka yang mendasarinya. Kafka Source menyediakan properti spesifik berikut.

      Parameter

      Deskripsi

      client.id.prefix

      Menentukan prefiks ID klien untuk konsumen Kafka.

      partition.discovery.interval.ms

      Menentukan interval dalam milidetik untuk menemukan partisi baru. Nilai -1 menonaktifkan penemuan partisi dinamis.

      Catatan

      Dalam Batch Mode, properti ini secara otomatis diatur ke -1.

      register.consumer.metrics

      Menentukan apakah akan mendaftarkan metrik konsumen Kafka di Flink.

      Konfigurasi Konsumen Kafka Lainnya

      Untuk daftar lengkap konfigurasi konsumen Kafka, lihat dokumentasi resmi Apache Kafka.

      Penting

      Untuk memastikan operasi yang benar, DataStream Connector Kafka menimpa properti yang dikonfigurasi secara manual berikut:

      • key.deserializer selalu ditimpa ke org.apache.kafka.common.serialization.ByteArrayDeserializer.

      • value.deserializer selalu ditimpa ke org.apache.kafka.common.serialization.ByteArrayDeserializer.

      • auto.offset.reset.strategy ditimpa oleh strategi yang disediakan oleh OffsetsInitializer.

      Contoh berikut menunjukkan cara mengonfigurasi konsumen Kafka untuk menggunakan mekanisme SASL PLAIN dan menyediakan konfigurasi JAAS.

      KafkaSource.builder()
          .setProperty("sasl.mechanism", "PLAIN")
          .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
    • Pemantauan

      Kafka Source mengekspos metrik melalui sistem metrik Flink untuk pemantauan dan diagnostik.

      • Cakupan metrik

        Semua metrik untuk pembaca sumber Kafka terdaftar di bawah grup metrik KafkaSourceReader, yang merupakan subgrup dari grup metrik operator. Metrik yang terkait dengan Partisi Topik tertentu terdaftar di subgrup KafkaSourceReader.topic.<topic_name>.partition.<partition_id>.

        Misalnya, metrik Offset Konsumen saat ini (currentOffset) untuk Partisi 1 dari topik "my-topic" tersedia di .operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset. Jumlah komit yang berhasil (commitsSucceeded) tersedia di .operator.KafkaSourceReader.commitsSucceeded.

      • Daftar metrik

        Metrik

        Deskripsi

        Cakupan

        currentOffset

        Offset konsumen saat ini pada sebuah partisi.

        TopicPartition

        committedOffset

        Offset terakhir yang dikomit untuk partisi.

        TopicPartition

        commitsSucceeded

        Jumlah total komit offset yang berhasil.

        KafkaSourceReader

        commitsFailed

        Jumlah komit yang gagal

        KafkaSourceReader

      • Metrik konsumen Kafka

        Metrik konsumen Kafka yang mendasari terdaftar dalam kelompok metrik KafkaSourceReader.KafkaConsumer. Sebagai contoh, metrik records-consumed-total Metrik terdaftar di .operator.KafkaSourceReader.KafkaConsumer.records-consumed-total.

        Anda dapat menggunakan properti register.consumer.metrics Properti untuk menentukan apakah akan mendaftarkan metrik konsumen Kafka. Opsi ini diaktifkan secara default (true). Untuk informasi selengkapnya tentang metrik konsumen Kafka, lihat dokumentasi Apache Kafka.

  • Buat Kafka Sink

    Kafka Sink Flink menulis aliran data ke satu atau beberapa topik Kafka.

    DataStream<String> stream = ...
    
    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
    
    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setKafkaProducerConfig(kafkaProperties)
            .setRecordSerializer(
                    KafkaRecordSerializationSchema.builder()
                            .setTopic("my-topic")
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
            .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
            .build();
    
    stream.sinkTo(sink);

    Untuk membangun Kafka Sink, Anda harus mengonfigurasi properti berikut.

    Parameter

    Deskripsi

    Properti klien Kafka

    Properti bootstrap.servers Properti wajib diisi. Properti ini menentukan daftar broker Kafka yang dipisahkan koma.

    Serializer catatan

    Anda harus menyediakan KafkaRecordSerializationSchema untuk mengonversi data input menjadi ProducerRecord Kafka. Flink menyediakan builder skema yang menawarkan komponen umum, seperti serialisasi untuk kunci dan nilai pesan, pemilihan topik, dan partisi pesan. Anda juga dapat mengimplementasikan antarmuka yang sesuai untuk kontrol lebih rinci. Metode ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) dipanggil untuk setiap catatan masuk untuk menghasilkan ProducerRecord yang akan ditulis ke Kafka.

    ProducerRecord memberikan kontrol rinci tentang cara setiap catatan ditulis ke Kafka, memungkinkan Anda untuk:

    • Atur Topic tujuan.

    • Atur Message Key.

    • Menentukan Partisi tujuan.

    Jaminan pengiriman

    Parameter bootstrap.servers wajib diisi dan menentukan daftar broker Kafka yang dipisahkan koma.

    Jaminan pengiriman

    Saat checkpoint Flink diaktifkan, Kafka Sink Flink dapat memberikan semantik tepat-sekali. Selain mengaktifkan checkpoint, Anda dapat menggunakan parameter DeliveryGuarantee untuk menentukan jaminan pengiriman yang berbeda. Parameter DeliveryGuarantee menyediakan opsi berikut:

    • DeliveryGuarantee.NONE: (Default) Flink tidak memberikan jaminan. Data mungkin hilang atau diduplikasi.

    • DeliveryGuarantee.AT_LEAST_ONCE: Menjamin bahwa tidak ada data yang hilang, tetapi duplikasi mungkin terjadi.

    • DeliveryGuarantee.EXACTLY_ONCE: Menggunakan transaksi Kafka untuk memberikan semantik tepat-sekali.

      Catatan

      Saat menggunakan semantik EXACTLY_ONCE, lihat Pertimbangan untuk semantik EXACTLY_ONCE.

Ingesti Data

Anda dapat menggunakan Konektor Kafka sebagai Sumber atau Sink untuk membuat pekerjaan YAML untuk ingesti data.

Batasan

  • Gunakan Realtime Compute for Apache Flink (VVR) 11.1 atau versi setelahnya untuk mengingesti data CDC Flink dari Sumber Data Kafka.

  • Hanya JSON, Debezium JSON, dan Canal JSON yang didukung.

  • Hanya Realtime Compute for Apache Flink (VVR) 8.0.11 dan versi setelahnya yang mendukung pembacaan data dari satu tabel yang didistribusikan di beberapa Partisi.

Sintaksis

source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: localhost:9092
  topic: ${kafka.topic}
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: localhost:9092

Parameter

  • Umum

    Parameter

    Deskripsi

    Wajib

    Jenis

    Default

    Keterangan

    type

    Tipe sumber atau sink.

    Ya

    String

    N/A

    Nilainya harus kafka.

    name

    Nama sumber atau sink.

    Tidak

    String

    N/A

    N/A

    properties.bootstrap.servers

    Alamat broker Kafka.

    Ya

    String

    N/A

    Formatnya adalah host:port,host:port,host:port, dipisahkan dengan koma (,).

    properties.*

    Properti konfigurasi untuk klien Kafka.

    Tidak

    String

    N/A

    Kunci properti harus merupakan opsi yang valid sebagaimana didefinisikan dalam dokumentasi resmi Apache Kafka untuk Producer Configs dan Consumer Configs.

    Realtime Compute for Apache Flink (VVR) menghapus prefiks properties. sebelum meneruskan pasangan kunci-nilai yang tersisa ke klien Kafka yang mendasari. Misalnya, atur 'properties.allow.auto.create.topics' = 'false' untuk menonaktifkan pembuatan topik otomatis.

    key.format

    Format untuk serialisasi dan deserialisasi kunci pesan Kafka.

    Tidak

    String

    N/A

    • Untuk sumber, hanya format json yang didukung.

    • Untuk sink, nilai yang valid adalah:

      • csv

      • json

    Catatan

    Opsi ini hanya didukung di Realtime Compute for Apache Flink (VVR) 11.0.0 dan versi setelahnya.

    value.format

    Format untuk serialisasi dan deserialisasi nilai pesan Kafka.

    Tidak

    String

    debezium-json

    • Untuk sumber, nilai yang valid adalah:

      • debezium-json 

      • canal-json

      • json

    • Untuk sink, nilai yang valid adalah:

      • debezium-json 

      • canal-json

      • canal-protobuf

    Catatan
    • Format debezium-json dan canal-json memerlukan Realtime Compute for Apache Flink (VVR) versi 8.0.10 atau versi setelahnya.

    • Format json memerlukan Realtime Compute for Apache Flink (VVR) versi 11.0.0 atau versi setelahnya.

  • Parameter sumber

    Parameter

    Deskripsi

    Wajib

    Jenis

    Default

    Keterangan

    topic

    Topik atau topik yang akan dibaca.

    Tidak

    String

    N/A

    Untuk berlangganan beberapa topik, pisahkan nama mereka dengan titik koma (;), misalnya, topic-1;topic-2.

    Catatan

    Tentukan salah satu parameter ini atau topic-pattern, tetapi tidak keduanya.

    topic-pattern

    Ekspresi reguler yang cocok dengan nama topik untuk berlangganan.

    Tidak

    String

    N/A

    Catatan

    Tentukan salah satu parameter ini atau topic, tetapi tidak keduanya.

    properties.group.id

    ID kelompok konsumen.

    Tidak

    String

    N/A

    Saat menentukan ID kelompok konsumen baru, Anda harus mengatur parameter properties.auto.offset.reset ke earliest atau latest untuk menentukan offset awal.

    scan.startup.mode

    Offset awal untuk konsumen Kafka.

    Tidak

    String

    group-offsets

    Nilai yang valid:

    • earliest-offset: Mulai membaca dari offset paling awal yang tersedia.

    • latest-offset: Mulai membaca dari offset terbaru.

    • group-offsets (Nilai default): Mulai membaca dari offset yang dikomit untuk properties.group.id yang ditentukan.

    • timestamp: Mulai membaca dari timestamp yang ditentukan oleh scan.startup.timestamp-millis.

    • specific-offsets: Mulai membaca dari offset yang ditentukan oleh scan.startup.specific-offsets.

    Catatan

    Parameter ini hanya berlaku saat pekerjaan dimulai dengan startup tanpa status. Saat pekerjaan berstatus dimulai, selalu mengonsumsi dari offset yang disimpan dalam statusnya.

    scan.startup.specific-offsets

    Offset awal untuk setiap partisi saat scan.startup.mode diatur ke specific-offsets.

    Tidak

    String

    N/A

    Misalnya, partition:0,offset:42;partition:1,offset:300

    scan.startup.timestamp-millis

    Timestamp awal dalam milidetik saat scan.startup.mode diatur ke timestamp.

    Tidak

    Long

    N/A

    Unitnya adalah milidetik.

    scan.topic-partition-discovery.interval

    Interval untuk menemukan partisi baru dalam topik secara dinamis.

    Tidak

    Duration

    5 menit

    Konektor secara berkala menemukan dan membaca dari partisi baru. Saat menggunakan topic-pattern, konektor juga menemukan topik baru yang cocok dengan pola tersebut. Untuk menonaktifkan penemuan, atur nilai ini ke 0 atau kurang.

    scan.check.duplicated.group.id

    Memeriksa apakah kelompok konsumen yang ditentukan oleh properties.group.id adalah duplikat.

    Tidak

    Boolean

    false

    Nilai yang valid:

    • true: Memeriksa duplikasi kelompok konsumen sebelum pekerjaan dimulai. Jika ditemukan duplikat, pekerjaan gagal.

    • false: Memulai pekerjaan tanpa memeriksa konflik.

    schema.inference.strategy

    Strategi penguraian skema.

    Tidak

    String

    continuous

    Nilai yang valid:

    • continuous: Mengurai skema setiap catatan data. Jika skema tidak kompatibel, sistem menyimpulkan skema yang lebih luas dan menghasilkan event perubahan skema.

    • static: Melakukan penguraian skema hanya sekali saat pekerjaan dimulai. Data kemudian diurai berdasarkan skema awal ini, dan tidak ada event perubahan skema yang dihasilkan.

    Catatan

    scan.max.pre.fetch.records

    Jumlah maksimum pesan yang dikonsumsi dan diurai dari setiap partisi untuk inferensi skema awal.

    Tidak

    Int

    50

    Sebelum pemrosesan data dimulai, sistem melakukan pre-fetch dan mengonsumsi jumlah pesan terbaru yang ditentukan dari setiap partisi untuk menginisialisasi skema.

    key.fields-prefix

    Prefiks khusus yang ditambahkan ke nama bidang dari kunci pesan untuk menghindari konflik nama.

    Tidak

    String

    N/A

    Misalnya, jika parameter ini diatur ke key_, dan kunci pesan berisi bidang bernama a, nama bidang yang diurai menjadi key_a.

    Catatan

    Nilai key.fields-prefix tidak boleh menjadi prefiks dari nilai value.fields-prefix.

    value.fields-prefix

    Prefiks khusus yang ditambahkan ke nama bidang dari nilai pesan untuk menghindari konflik nama.

    Tidak

    String

    N/A

    Misalnya, jika parameter ini diatur ke value_, dan nilai pesan berisi bidang bernama b, nama bidang yang diurai menjadi value_b.

    Catatan

    Nilai value.fields-prefix tidak boleh menjadi prefiks dari nilai key.fields-prefix.

    metadata.list

    Kolom metadata yang akan diteruskan ke sink hilir.

    Tidak

    String

    N/A

    Kolom metadata yang tersedia meliputi topic, partition, offset, timestamp, timestamp-type, headers, dan leader-epoch. Pisahkan nama kolom dengan koma.

    scan.value.initial-schemas.ddls

    Pernyataan DDL yang menentukan skema awal untuk tabel tertentu.

    Tidak

    String

    N/A

    Gunakan titik koma (;) untuk memisahkan beberapa pernyataan DDL. Misalnya, gunakan CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT); untuk menentukan skema awal untuk tabel db1.t1 dan db1.t2, masing-masing.

    Skema tabel yang didefinisikan dalam DDL harus konsisten dengan tabel sink target dan mematuhi sintaksis SQL Flink.

    Catatan

    Opsi konfigurasi ini hanya didukung di Ververica Runtime (VVR) 11.5 dan versi setelahnya.

    ingestion.ignore-errors

    Menentukan apakah akan mengabaikan error penguraian data.

    Tidak

    Boolean

    false

    Catatan

    Opsi konfigurasi ini hanya didukung di Ververica Runtime (VVR) 11.5 dan versi setelahnya.

    ingestion.error-tolerance.max-count

    Jika ingestion.ignore-errors adalah true, parameter ini menentukan jumlah maksimum error penguraian yang dapat ditoleransi sebelum pekerjaan gagal.

    Tidak

    Integer

    -1

    Parameter ini hanya berlaku saat ingestion.ignore-errors diatur ke true. Nilai -1 menunjukkan toleransi tak terbatas, artinya exception penguraian tidak akan menyebabkan pekerjaan gagal.

    Catatan

    Opsi konfigurasi ini hanya didukung di Ververica Runtime (VVR) 11.5 dan versi setelahnya.

    • Parameter format Debezium JSON

      Parameter

      Wajib

      Tipe

      Default

      Deskripsi

      debezium-json.distributed-tables

      Tidak

      Boolean

      false

      Atur ke true jika data untuk satu tabel Debezium JSON didistribusikan di beberapa partisi.

      Catatan

      Opsi konfigurasi ini hanya didukung di Ververica Runtime (VVR) 8.0.11 dan versi setelahnya.

      Penting

      Memodifikasi parameter ini memerlukan startup tanpa status.

      debezium-json.schema-include

      Tidak

      Boolean

      false

      Menentukan apakah pesan Debezium JSON menyertakan skema. Ini sesuai dengan properti value.converter.schemas.enable dalam konfigurasi Debezium Kafka Connect.

      Nilai yang valid:

      • true: Pesan Debezium JSON berisi skema.

      • false: Pesan Debezium JSON tidak berisi skema.

      debezium-json.ignore-parse-errors

      Tidak

      Boolean

      false

      Nilai yang valid:

      • true: Melewatkan baris yang menyebabkan exception penguraian.

      • false: Melemparkan error dan pekerjaan gagal.

      debezium-json.infer-schema.primitive-as-string

      Tidak

      Boolean

      false

      Menentukan apakah akan mengurai semua tipe primitif sebagai tipe String saat mengurai skema tabel.

      Nilai yang valid:

      • true: Mengurai semua tipe primitif sebagai String.

      • false: Mengurai tipe berdasarkan aturan default.

    • Parameter format Canal JSON

      Parameter

      Wajib

      Tipe

      Default

      Deskripsi

      canal-json.distributed-tables

      Tidak

      Boolean

      false

      Jika data untuk satu tabel dalam Canal JSON didistribusikan di beberapa partisi, Anda harus mengaktifkan opsi ini.

      Catatan

      Opsi konfigurasi ini hanya didukung di Ververica Runtime (VVR) 8.0.11 dan versi setelahnya.

      Penting

      Memodifikasi parameter ini memerlukan startup tanpa status.

      canal-json.database.include

      Tidak

      String

      N/A

      Ekspresi reguler opsional untuk memfilter changelog berdasarkan bidang metadata database dalam catatan Canal. Hanya catatan dari database yang cocok yang diproses. Ekspresi reguler ini kompatibel dengan kelas Pattern Java.

      canal-json.table.include

      Tidak

      String

      N/A

      Ekspresi reguler opsional untuk memfilter changelog berdasarkan bidang metadata table dalam catatan Canal. Hanya catatan dari tabel yang cocok yang diproses. Ekspresi reguler ini kompatibel dengan kelas Pattern Java.

      canal-json.ignore-parse-errors

      Tidak

      Boolean

      false

      Nilai yang valid:

      • true: Melewatkan baris saat ini jika terjadi exception penguraian.

      • false: Melemparkan error dan pekerjaan gagal dimulai.

      canal-json.infer-schema.primitive-as-string

      Tidak

      Boolean

      false

      Menentukan apakah akan mengurai semua tipe primitif sebagai tipe String saat mengurai skema tabel.

      Nilai yang valid:

      • true: Mengurai semua tipe primitif sebagai String.

      • false: Mengurai tipe berdasarkan aturan default.

      canal-json.infer-schema.strategy

      Tidak

      String

      AUTO

      Menentukan strategi untuk mengurai skema tabel.

      Nilai yang valid:

      • AUTO: Secara otomatis mengurai skema dari data JSON. Disarankan jika data tidak berisi bidang sqlType, untuk mencegah kegagalan penguraian.

      • SQL_TYPE: Mengurai skema dari array sqlType dalam data Canal JSON. Kami menyarankan mengatur ini ke SQL_TYPE untuk mendapatkan tipe yang lebih tepat jika data berisi bidang sqlType.

      • MYSQL_TYPE: Mengurai skema dari array mysqlType dalam data Canal JSON.

      Untuk informasi selengkapnya tentang aturan pemetaan tipe sqlType, lihat Penguraian Skema Canal JSON.

      Catatan
      • Konfigurasi ini hanya didukung di Ververica Runtime (VVR) 11.1 dan versi setelahnya.

      • Nilai MYSQL_TYPE didukung di Ververica Runtime (VVR) 11.3 dan versi setelahnya.

      canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled

      Tidak

      Boolean

      true

      Menentukan apakah akan memetakan tipe MySQL TIMESTAMP ke tipe CDC TIMESTAMP.

      • true: Tipe MySQL TIMESTAMP dipetakan ke tipe CDC TIMESTAMP.

      • false: Tipe MySQL TIMESTAMP dipetakan ke tipe CDC TIMESTAMP_LTZ.

      canal-json.mysql.treat-tinyint1-as-boolean.enabled

      Tidak

      Boolean

      true

      Saat menggunakan strategi penguraian MYSQL_TYPE, menentukan apakah akan memetakan tipe MySQL TINYINT(1) ke tipe CDC BOOLEAN.

      • true: Tipe MySQL TINYINT(1) dipetakan ke tipe CDC BOOLEAN.

      • false: Tipe MySQL TINYINT(1) dipetakan ke tipe CDC TINYINT(1).

      Opsi ini hanya berlaku saat canal-json.infer-schema.strategy diatur ke MYSQL_TYPE.

    • Parameter format JSON

      Parameter

      Wajib

      Tipe

      Default

      Deskripsi

      json.timestamp-format.standard

      Tidak

      String

      SQL

      Format timestamp untuk data input dan output.

      • SQL: Mengurai timestamp input dalam format yyyy-MM-dd HH:mm:ss.s{precision}, seperti 2020-12-30 12:13:14.123.

      • ISO-8601: Mengurai timestamp input dalam format yyyy-MM-ddTHH:mm:ss.s{precision}, seperti 2020-12-30T12:13:14.123.

      json.ignore-parse-errors

      Tidak

      Boolean

      false

      Nilai yang valid:

      • true: Melewatkan baris saat ini jika terjadi exception penguraian.

      • false: Melemparkan error dan pekerjaan gagal dimulai.

      json.infer-schema.primitive-as-string

      Tidak

      Boolean

      false

      Menentukan apakah akan mengurai semua tipe primitif sebagai tipe String saat mengurai skema tabel.

      Nilai yang valid:

      • true: Mengurai semua tipe primitif sebagai String.

      • false: Mengurai tipe berdasarkan aturan default.

      json.infer-schema.flatten-nested-columns.enable

      Tidak

      Boolean

      false

      Menentukan apakah akan memperluas kolom bersarang dalam data JSON secara rekursif. Nilai yang valid:

      • true: Memperluas kolom bersarang secara rekursif.

      • false: Memperlakukan kolom bersarang sebagai String.

      json.decode.parser-table-id.fields

      Tidak

      String

      N/A

      Menentukan apakah akan menggunakan nilai beberapa bidang JSON untuk menghasilkan tableId saat mengurai data dalam format JSON. Nilai beberapa bidang digabungkan dengan koma Inggris ,. Misalnya, jika data JSON adalah {"col0":"a", "col1","b", "col2","c"}, hasil yang dihasilkan adalah sebagai berikut:

      Konfigurasi

      tableId

      col0

      a

      col0,col1

      a.b

      col0,col1,col2

      a.b.c

      json.infer-schema.fixed-types

      Tidak

      String

      N/A

      Saat Anda mengurai data JSON, Anda dapat menentukan tipe data untuk bidang tertentu. Gunakan koma , untuk memisahkan beberapa bidang. Misalnya, id BIGINT, name VARCHAR(10) menentukan bahwa bidang id bertipe BIGINT dan bidang name bertipe VARCHAR(10).

      Catatan
      • Opsi konfigurasi ini hanya didukung di Ververica Runtime (VVR) 11.5 dan versi setelahnya.

      • Saat menggunakan konfigurasi ini dengan Ververica Runtime (VVR) versi 11.5, Anda juga harus menambahkan konfigurasi scan.max.pre.fetch.records: 0.

  • Parameter tabel sink

    Parameter

    Deskripsi

    Wajib

    Tipe

    Default

    Keterangan

    type

    Tipe sink.

    Ya

    String

    None.

    Nilainya harus kafka.

    name

    Nama sink.

    Tidak

    String

    None.

    N/A

    topic

    Nama topik Kafka.

    Tidak

    String

    None.

    Jika parameter ini ditentukan, semua data ditulis ke topik ini.

    Catatan

    Jika tidak ditentukan, setiap catatan ditulis ke topik yang dinamai sesuai TableID-nya. TableID dibentuk dengan menggabungkan nama database dan tabel menggunakan titik (.), misalnya databaseName.tableName.

    partition.strategy

    Strategi penulisan data ke partisi Kafka.

    Tidak

    String

    all-to-zero

    Nilai yang valid:

    • all-to-zero (default): Menulis semua data ke Partisi 0.

    • hash-by-key: Menulis data ke partisi berdasarkan nilai hash kunci primer, sehingga catatan dengan kunci primer yang sama selalu ditulis ke partisi yang sama untuk menjaga urutan.

    sink.tableId-to-topic.mapping

    Pemetaan nama tabel upstream ke nama topik Kafka downstream.

    Tidak

    String

    None.

    Pisahkan setiap pemetaan dengan titik koma (;). Dalam satu pemetaan, pisahkan nama tabel hulu dan nama topik Kafka hulu dengan titik dua (:). Nama tabel dapat menggunakan ekspresi reguler. Untuk memetakan beberapa tabel ke satu topik yang sama, pisahkan nama tabel dengan koma (,). Contoh: mydb.mytable1:topic1;mydb.mytable2:topic2.

    Catatan

    Parameter ini memungkinkan Anda mengubah topik tujuan sambil tetap mempertahankan informasi nama tabel asli.

    • Parameter format Debezium JSON

      Parameter

      Wajib

      Type

      Default

      Deskripsi

      debezium-json.include-schema.enabled

      Tidak

      Boolean

      false

      Menentukan apakah informasi skema disertakan dalam data Debezium JSON.

      debezium-json.emit.full-table-id.enabled

      Tidak

      Boolean

      false

      Menentukan apakah ID tabel tiga bagian lengkap dituliskan ke bidang metadata Debezium JSON.

      Jika parameter ini diaktifkan, pemetaannya adalah sebagai berikut:

      Bagian ID Tabel CDC

      Debezium JSON Key

      Namespace

      db

      Skema

      schema

      Tabel

      table

      Jika parameter ini dinonaktifkan, pemetaannya adalah sebagai berikut:

      Bagian ID Tabel CDC

      Debezium JSON Key

      Namespace

      Tidak dipetakan

      Skema

      db

      Tabel

      table

      Catatan

      Parameter ini hanya didukung di Ververica Runtime (VVR) 11.6 dan versi setelahnya.

Contoh

  • Gunakan Kafka sebagai Sumber Ingesti Data:

    source:
      type: kafka
      name: Kafka source
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      topic: ${kafka.topic}
      value.format: ${value.format}
      scan.startup.mode: ${scan.startup.mode}
     
    sink:
      type: hologres
      name: Hologres sink
      endpoint: <yourEndpoint>
      dbname: <yourDbname>
      username: ${secret_values.ak_id}
      password: ${secret_values.ak_secret}
      sink.type-normalize-strategy: BROADEN
  • Gunakan Kafka sebagai Sink Ingesti Data:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: kafka
      name: Kafka Sink
      properties.bootstrap.servers: ${kafka.bootstraps.server}
    
    route:
      - source-table: ${mysql.source.table}
        sink-table: ${kafka.topic}

    Modul route menentukan Topik Kafka tujuan untuk Tabel Sumber.

Catatan

Secara default, fitur pembuatan Topik otomatis dinonaktifkan untuk ApsaraMQ for Kafka. Untuk informasi selengkapnya, lihat FAQ tentang pembuatan topik otomatis. Anda harus membuat Topik sebelum menulis data ke ApsaraMQ for Kafka. Untuk informasi selengkapnya, lihat Langkah 3: Buat sumber daya.

Kebijakan untuk penguraian dan evolusi skema

Konektor Kafka mempertahankan skema semua tabel yang saat ini diketahui.

Inisialisasi skema tabel

Informasi Skema Tabel mencakup informasi kolom dan tipe data, informasi database dan tabel, serta informasi Kunci Primer. Bagian berikut menjelaskan cara menginisialisasi ketiga jenis informasi ini.

  • Informasi kolom dan tipe data

Pekerjaan Ingesti Data dapat secara otomatis menyimpulkan kolom dan tipe data untuk tabel dari data. Namun, dalam beberapa skenario, Anda mungkin ingin menentukan kolom dan tipe untuk tabel tertentu. Bergantung pada granularitas tipe yang ditentukan pengguna, terdapat tiga strategi untuk menginisialisasi skema tabel:

  1. Inferensi skema otomatis penuh

Sebelum membaca data dari Kafka, konektor Kafka mencoba mengonsumsi hingga scan.max.pre.fetch.records pesan dari setiap partisi, mengurai skema setiap pesan, dan menggabungkan skema ini untuk menginisialisasi skema tabel. Event pembuatan tabel kemudian dihasilkan berdasarkan skema yang diinisialisasi ini sebelum data benar-benar dikonsumsi.

Catatan

Untuk format Debezium JSON dan Canal JSON, informasi tabel terdapat dalam setiap pesan. Pesan yang diambil sebelumnya berdasarkan parameter scan.max.pre.fetch.records mungkin berisi data dari beberapa tabel. Oleh karena itu, jumlah catatan yang diambil sebelumnya untuk satu tabel tertentu tidak dapat ditentukan. Pengambilan sebelumnya dan inisialisasi skema hanya dilakukan sekali untuk setiap partisi sebelum pesannya dikonsumsi dan diproses. Jika data untuk tabel baru muncul nanti, skema yang diurai dari catatan pertama tabel tersebut digunakan sebagai skema awalnya, dan skema tidak diambil sebelumnya atau diinisialisasi lagi.

Penting

Distribusi data dari satu tabel di beberapa partisi hanya didukung di Ververica Runtime (VVR) 8.0.11 dan versi setelahnya, dan mengharuskan Anda mengatur opsi konfigurasi debezium-json.distributed-tables atau canal-json.distributed-tables ke true.

  1. Menentukan skema tabel awal

Dalam beberapa skenario, Anda mungkin perlu menentukan skema tabel awal secara manual—misalnya, saat menulis data dari Kafka ke tabel hilir yang telah dibuat sebelumnya. Untuk melakukannya, tambahkan parameter scan.value.initial-schemas.ddls. Berikut contoh konfigurasinya:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Set the initial table schema
  scan.value.initial-schemas.ddls: CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT);

Pernyataan DDL harus cocok dengan skema tabel target. Konfigurasi ini menentukan tipe awal kolom id sebagai BIGINT dan kolom name sebagai VARCHAR(10) untuk tabel db1.t1, dan tipe awal kolom id sebagai BIGINT untuk tabel db1.t2.

Pernyataan DDL menggunakan sintaksis SQL Flink.

  1. Menetapkan tipe tetap untuk bidang tertentu

Dalam beberapa skenario, Anda mungkin ingin menetapkan tipe data tetap untuk bidang tertentu. Misalnya, untuk bidang yang mungkin disimpulkan sebagai tipe TIMESTAMP, Anda mungkin ingin mengeluarkannya sebagai string. Dalam hal ini, Anda dapat menambahkan parameter json.infer-schema.fixed-types untuk menentukan skema tabel awal. Parameter ini hanya berlaku saat format pesan adalah JSON. Berikut adalah contoh konfigurasi:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Set specific fields to a fixed type
  json.infer-schema.fixed-types: id BIGINT, name VARCHAR(10)
  scan.max.pre.fetch.records: 0

Konfigurasi ini menentukan bahwa semua bidang id bertipe BIGINT dan semua bidang name bertipe VARCHAR(10).

Tipe data konsisten dengan tipe SQL Flink.

  • Informasi database dan tabel

    • Untuk format Canal JSON dan Debezium JSON, konektor mengurai informasi tabel, termasuk nama database dan tabel, dari setiap pesan.

    • Untuk format JSON, secara default, informasi tabel hanya berisi nama tabel, yaitu nama topik yang berisi data. Jika data Anda berisi informasi database dan tabel, Anda dapat menggunakan parameter json.infer-schema.fixed-types untuk menentukan bidang yang berisi informasi ini. Bidang ini kemudian dipetakan ke nama database dan tabel. Berikut adalah contoh konfigurasi:

      source:
        type: kafka
        name: Kafka Source
        properties.bootstrap.servers: host:9092
        topic: test-topic
        value.format: json
        scan.startup.mode: earliest-offset
        # Use the value of the col1 field as the database name and the value of the col2 field as the table name
        json.decode.parser-table-id.fields: col1,col2

      Dengan konfigurasi ini, konektor mengirim setiap catatan ke tabel di mana nama database adalah nilai bidang col1 dan nama tabel adalah nilai bidang col2.

  • Informasi kunci primer

    • Untuk format Canal JSON, bidang pkNames dalam data JSON menentukan Kunci Primer tabel.

    • Untuk format Debezium JSON dan JSON, data tidak berisi informasi Kunci Primer. Anda dapat menambahkan Kunci Primer ke tabel secara manual dengan menggunakan aturan transform:

      transform:
        - source-table: \.*.\.*
          projection: \*
          primary-keys: key1, key2

Penguraian skema dan evolusi skema

Setelah skema tabel diinisialisasi, jika schema.inference.strategy diatur ke static, Konektor Kafka mengurai nilai pesan setiap pesan berdasarkan skema tabel awal dan tidak menghasilkan event perubahan skema. Jika schema.inference.strategy diatur ke continuous, Konektor Kafka mengurai nilai pesan setiap pesan Kafka, mengidentifikasi kolom fisiknya, dan membandingkan skema yang dihasilkan dengan skema yang saat ini dipertahankan. Jika skema tidak konsisten, konektor mencoba menggabungkannya dan menghasilkan event perubahan skema tabel yang sesuai. Aturan penggabungan adalah sebagai berikut:

  • Jika kolom fisik yang diurai berisi bidang yang tidak ada dalam skema saat ini, bidang tersebut ditambahkan ke skema, dan event dihasilkan untuk menambahkannya sebagai kolom nullable.

  • Jika kolom fisik yang diurai tidak berisi bidang yang ada dalam skema saat ini, bidang tersebut dipertahankan dan nilainya diisi dengan NULL. Event penghapusan kolom tidak dihasilkan.

  • Bidang dengan nama yang sama ditangani sebagai berikut:

    • Jika kolom memiliki tipe data yang sama tetapi presisi berbeda, tipe dengan presisi lebih besar digunakan, dan event perubahan tipe kolom dihasilkan.

    • Jika kolom memiliki tipe data berbeda, sistem menemukan tipe induk umum terkecil dalam pohon hierarki tipe di bawah ini. Sistem kemudian menggunakan tipe induk umum ini untuk kolom dan menghasilkan event perubahan tipe kolom.

      image

  • Kebijakan evolusi skema yang didukung:

    • Menambahkan kolom: Konektor menambahkan kolom baru ke akhir skema dan menyinkronkan datanya. Kolom baru diatur sebagai nullable.

    • Menghapus kolom: Event penghapusan kolom tidak dihasilkan. Sebaliknya, data selanjutnya untuk kolom tersebut diisi dengan NULL.

    • Mengganti nama kolom: Konektor memperlakukannya sebagai menghapus kolom lama dan menambahkan yang baru. Kolom baru ditambahkan ke akhir skema, dan nilai untuk kolom asli diisi dengan NULL.

    • Mengubah tipe kolom:

      • Untuk Sink hilir yang mendukung perubahan tipe kolom, Pekerjaan Ingesti Data dapat menangani perubahan tipe (misalnya, dari INT ke BIGINT) jika Sink hilir dikonfigurasi untuk memprosesnya. Kemampuan ini bergantung pada aturan perubahan tipe kolom yang didukung oleh sink tertentu. Lihat dokumentasi sink Anda untuk mengetahui aturan yang didukung.

      • Untuk Sink Hilir yang tidak mendukung perubahan tipe kolom, seperti Hologres, Anda dapat menggunakan Pemetaan Tipe Lebar. Fitur ini membuat tabel dengan tipe data yang lebih lebar di Sink Hilir saat pekerjaan dimulai. Sistem kemudian dapat menoleransi perubahan tipe kolom selama tipe baru sesuai dengan tipe yang lebih lebar yang telah didefinisikan di Sink Hilir.

  • Perubahan skema yang tidak didukung:

    • Perubahan pada kendala, seperti Kunci Primer atau indeks.

    • Mengubah kolom dari NOT NULL ke NULLABLE.

  • Penguraian skema Canal JSON

    Data Canal JSON mungkin berisi bidang sqlType opsional, yang mencatat informasi tipe yang tepat untuk kolom data. Untuk mendapatkan skema yang lebih akurat, Anda dapat mengatur canal-json.infer-schema.strategy ke SQL_TYPE untuk menggunakan tipe dari bidang sqlType. Pemetaan tipe adalah sebagai berikut:

    Tipe JDBC

    Type code

    Tipe CDC

    BIT

    -7

    BOOLEAN

    BOOLEAN

    16

    TINYINT

    -6

    TINYINT

    SMALLINT

    5

    SMALLINT

    INTEGER

    4

    INT

    BIGINT

    -5

    BIGINT

    DECIMAL

    3

    DECIMAL(38,18)

    NUMERIC

    2

    REAL

    7

    FLOAT

    FLOAT

    6

    DOUBLE

    8

    DOUBLE

    BINARY

    -2

    BYTES

    VARBINARY

    -3

    LONGVARBINARY

    -4

    BLOB

    2004

    DATE

    91

    DATE

    TIME

    92

    TIME

    TIMESTAMP

    93

    TIMESTAMP

    CHAR

    1

    STRING

    VARCHAR

    12

    LONGVARCHAR

    -1

    Tipe data lainnya

Toleransi dan pengumpulan data kotor

Sumber Data Kafka Anda mungkin berisi catatan yang rusak, juga dikenal sebagai Data Kotor. Untuk mencegah Pekerjaan Anda sering gagal dan dimulai ulang, Anda dapat mengonfigurasinya untuk mengabaikan catatan tidak valid ini. Misalnya:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Aktifkan Toleransi Data Kotor
  ingestion.ignore-errors: true
  # Toleransi hingga 1000 catatan data kotor
  ingestion.error-tolerance.max-count: 1000

Konfigurasi ini mentoleransi hingga 1.000 catatan kotor, memungkinkan Pekerjaan berjalan normal. Jika jumlah catatan kotor melebihi ambang batas ini, Pekerjaan gagal, mendorong Anda untuk memvalidasi data Anda.

Untuk memastikan Pekerjaan Anda tidak pernah gagal karena Data Kotor, gunakan konfigurasi berikut:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Aktifkan Toleransi Data Kotor
  ingestion.ignore-errors: true
  # Toleransi semua catatan data kotor
  ingestion.error-tolerance.max-count: -1

Kebijakan Toleransi Data Kotor memastikan Pekerjaan Anda tidak gagal karena catatan tidak valid. Anda mungkin juga ingin menganalisis Data Kotor ini untuk menyesuaikan perilaku produsen data Kafka Anda. Seperti dijelaskan dalam Pengumpulan Data Kotor, Anda dapat melihat Data Kotor Pekerjaan di log TaskManager. Misalnya:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Aktifkan Toleransi Data Kotor
  ingestion.ignore-errors: true
  # Toleransi semua catatan data kotor
  ingestion.error-tolerance.max-count: -1

pipeline:
  dirty-data.collector:
    # Tulis data kotor ke file log TaskManager
    type: logger

Pemetaan nama tabel dan topik

Saat menggunakan Kafka sebagai Sink untuk Job Pengambilan Data, format pesan—seperti JSON Debezium atau JSON Canal—mencakup informasi nama tabel asli. Sistem downstream yang mengonsumsi pesan tersebut umumnya menggunakan nama tabel yang disematkan sebagai pengidentifikasi tabel aktual, bukan nama topik. Oleh karena itu, Anda harus mengonfigurasi strategi pemetaan antara nama tabel dan topik dengan cermat.

Anggaplah Anda perlu menyinkronkan dua tabel dari database MySQL: mydb.mytable1 dan mydb.mytable2. Strategi pemetaan berikut tersedia:

1. Tanpa strategi pemetaan

Tanpa strategi pemetaan apa pun, data untuk setiap tabel ditulis ke topik yang dinamai dalam format <Nama Database>.<Nama Tabel>. Oleh karena itu, data dari mydb.mytable1 ditulis ke topik bernama mydb.mytable1, dan data dari mydb.mytable2 ditulis ke topik bernama mydb.mytable2. Berikut adalah contoh konfigurasi:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

2. Pemetaan aturan rute (Tidak disarankan)

Anda mungkin ingin menulis data ke topik tertentu alih-alih menggunakan format default <Nama Database>.<Nama Tabel>. Untuk melakukan ini, Anda dapat mengonfigurasi aturan rute. Berikut adalah contoh konfigurasi:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  
 route:
  - source-table: mydb.mytable1,mydb.mytable2
    sink-table: mytable

Dalam hal ini, semua data dari mydb.mytable1 dan mydb.mytable2 ditulis ke satu topik bernama mytable.

Namun, saat Anda menggunakan aturan rute untuk mengubah topik tujuan, hal ini juga memodifikasi nama tabel dalam pesan Kafka (dalam format Debezium JSON atau Canal JSON). Dalam kasus ini, nama tabel dalam semua pesan Kafka menjadi mytable. Hal ini dapat menyebabkan perilaku tak terduga pada sistem yang mengonsumsi pesan dari topik ini.

3. Pemetaan dengan sink.tableId-to-topic.mapping (Direkomendasikan)

Untuk memetakan nama tabel ke topik sambil mempertahankan nama tabel sumber asli, gunakan parameter sink.tableId-to-topic.mapping. Berikut adalah contoh konfigurasi:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
  sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

Atau, Anda dapat menggunakan konfigurasi berikut:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
  sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

Dalam kasus ini, seluruh data dari mydb.mytable1 dan mydb.mytable2 ditulis ke topik mytable, sedangkan nama tabel dalam pesan Kafka (dalam format JSON Debezium atau JSON Canal) tetap dipertahankan sebagai mydb.mytable1 atau mydb.mytable2. Hal ini memastikan bahwa sistem downstream dapat mengidentifikasi nama tabel sumber asli secara akurat.

Semantik EXACTLY_ONCE

  • Konfigurasikan Tingkat Isolasi Konsumen

    Semua aplikasi yang mengonsumsi data Kafka harus mengatur properti isolation.level:

    • read_committed: Hanya membaca data yang dikomit.

    • read_uncommitted (Default): Dapat membaca data yang belum dikomit.

    EXACTLY_ONCE bergantung pada read_committed. Jika tidak, konsumen mungkin melihat data yang belum dikomit, sehingga mengganggu konsistensi.

  • Timeout Transaksi dan Kehilangan Data

    Saat memulihkan dari Checkpoint, Realtime Compute for Apache Flink hanya mempertimbangkan Transaksi yang dikomit sebelum Checkpoint tersebut dimulai. Jika durasi antara Kegagalan Pekerjaan dan dimulainya ulang melebihi Timeout Transaksi Kafka, Kafka secara otomatis membatalkan Transaksi yang terbuka, yang dapat menyebabkan Kehilangan Data.

    • transaction.max.timeout.ms default untuk broker Kafka adalah 15 menit.

    • Secara default, Kafka Sink Flink mengatur parameter transaction.timeout.ms ke 1 jam.

    • Anda harus meningkatkan transaction.max.timeout.ms pada broker agar lebih besar dari atau sama dengan pengaturan di Flink.

  • Kolam Produsen dan Checkpoint Bersamaan

    Mode EXACTLY_ONCE menggunakan Kolam Produsen Kafka dengan ukuran tetap. Setiap checkpoint menggunakan satu produsen dari kolam ini. Jika jumlah checkpoint bersamaan melebihi ukuran kolam, Pekerjaan gagal.

    Konfigurasikan ukuran kolam produsen berdasarkan jumlah maksimum checkpoint bersamaan.

  • Batasan Pengurangan Paralelisme

    Jika Pekerjaan gagal sebelum Checkpoint pertama selesai, informasi Kolam Produsen asli hilang saat dimulai ulang. Oleh karena itu, jangan Mengurangi Paralelisme Pekerjaan sebelum Checkpoint pertama selesai. Jika pengurangan diperlukan, Paralelisme baru tidak boleh kurang dari FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR.

  • Transaksi Memblokir Pembacaan

    Dalam mode read_committed, transaksi apa pun yang belum dikomit atau dibatalkan akan memblokir operasi baca pada seluruh Topik.

    Misalnya:

    • Transaksi 1 menulis data.

    • Transaksi 2 menulis lebih banyak data dan dikomit.

    • Selama Transaksi 1 tetap terbuka, data dari Transaksi 2 yang telah dikomit tidak terlihat oleh Konsumen.

    Hal ini memiliki implikasi berikut:

    • Selama operasi normal, latensi visibilitas data kira-kira sama dengan interval checkpoint.

    • Jika Pekerjaan gagal, Topik apa pun yang sedang ditulisnya akan diblokir untuk Konsumen hingga Pekerjaan dimulai ulang atau Transaksi habis waktunya. Dalam kasus ekstrem, proses Timeout Transaksi itu sendiri juga dapat memengaruhi operasi baca.

FAQ