全部产品
Search
文档中心

Realtime Compute for Apache Flink:Message Queue for Kafka

更新时间:Feb 07, 2026

Topik ini menjelaskan cara menggunakan Kafka connector.

Informasi latar belakang

Apache Kafka adalah layanan message queue terdistribusi open source yang banyak digunakan dalam aplikasi big data, seperti pemrosesan data berkinerja tinggi, analitik streaming, dan integrasi data. Kafka connector mendukung ingesti dan egress data ber-throughput tinggi, operasi baca-tulis data dalam berbagai format, serta semantik tepat-sekali (exactly-once semantics) untuk Realtime Compute for Apache Flink dengan memanfaatkan klien Apache Kafka.

Kategori

Deskripsi

Tipe yang didukung

Tabel sumber, tabel sink, sink ingesti data

Mode eksekusi

Mode streaming

Format data

Format data yang didukung

  • CSV

  • JSON

  • Apache Avro

  • Confluent Avro

  • Debezium JSON

  • Canal JSON

  • Maxwell JSON

  • Raw

  • Protobuf

Catatan
  • Protobuf hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 8.0.9 atau versi lebih baru.

  • Masing-masing format data yang didukung mencakup opsi konfigurasi terkait yang dapat Anda gunakan langsung dalam klausa WITH. Untuk informasi selengkapnya, lihat dokumentasi Flink.

Metrik Pemantauan Unik

Metrik pemantauan spesifik

  • Tabel sumber

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • Tabel Tujuan

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

Catatan

Untuk informasi selengkapnya tentang metrik tersebut, lihat Metrik pemantauan.

Tipe API

SQL API, DataStream API, dan data ingestion YAML API

Apakah Anda dapat memperbarui atau menghapus data di tabel sink?

Anda tidak dapat memperbarui atau menghapus data di tabel sink. Anda hanya dapat menyisipkan data ke tabel sink.

Catatan

Untuk informasi selengkapnya tentang fitur terkait pembaruan atau penghapusan data, lihat Upsert Kafka.

Prasyarat

Anda dapat menghubungkan ke kluster menggunakan salah satu metode berikut, tergantung pada kebutuhan Anda:

  • Hubungkan ke kluster ApsaraMQ for Kafka

    • Versi kluster ApsaraMQ for Kafka adalah 0.11 atau lebih baru.

    • Kluster ApsaraMQ for Kafka telah dibuat. Untuk informasi selengkapnya, lihat Buat resource.

    • Ruang kerja Realtime Compute for Apache Flink berada dalam virtual private cloud (VPC) yang sama dengan kluster ApsaraMQ for Kafka, dan blok CIDR VPC Realtime Compute for Apache Flink telah ditambahkan ke daftar putih kluster ApsaraMQ for Kafka. Untuk informasi selengkapnya tentang cara mengonfigurasi daftar putih ApsaraMQ for Kafka, lihat Konfigurasi daftar putih.

    Penting

    Saat menulis data ke ApsaraMQ for Kafka, perhatikan hal berikut:

    • ApsaraMQ for Kafka tidak mendukung algoritma kompresi Zstandard untuk penulisan data.

    • ApsaraMQ for Kafka tidak mendukung operasi penulisan idempoten atau transaksional. Oleh karena itu, Anda tidak dapat menggunakan semantik tepat-sekali (exactly-once semantics) yang didukung oleh tabel sink Kafka. Jika pekerjaan Realtime Compute for Apache Flink Anda menggunakan Ververica Runtime (VVR) 8.0.0 atau versi lebih baru, nonaktifkan fitur penulisan idempoten dengan mengatur properties.enable.idempotence=false untuk tabel sink. Untuk informasi selengkapnya tentang perbandingan dan batasan mesin penyimpanan untuk ApsaraMQ for Kafka, lihat Perbandingan antar mesin penyimpanan.

  • Hubungkan ke kluster Apache Kafka yang dikelola sendiri

    • Versi kluster Apache Kafka yang dikelola sendiri adalah 0.11 atau lebih baru.

    • Terdapat koneksi jaringan antara Flink dan kluster Apache Kafka yang dikelola sendiri. Untuk informasi selengkapnya tentang menghubungkan kluster Apache Kafka yang dikelola sendiri melalui jaringan publik, lihat Opsi Konektivitas Jaringan.

    • Hanya opsi klien Apache Kafka 2.8 yang didukung. Untuk informasi selengkapnya, lihat dokumentasi Apache Kafka Consumer Configs dan Producer Configs.

Catatan penggunaan

Saat ini, penulisan transaksional tidak disarankan karena keterbatasan desain di komunitas Flink dan Kafka. Saat Anda mengatur sink.delivery-guarantee = exactly-once, Kafka connector mengaktifkan penulisan transaksional, yang memiliki tiga masalah yang diketahui:

  • Setiap checkpoint menghasilkan ID transaksi baru. Jika interval checkpoint terlalu pendek, jumlah ID transaksi menjadi berlebihan. Akibatnya, koordinator Kafka mungkin kehabisan memori, sehingga mengganggu stabilitas kluster Kafka.

  • Setiap transaksi membuat instance produsen. Jika terlalu banyak transaksi dikomit secara bersamaan, TaskManager mungkin kehabisan memori, sehingga mengganggu pekerjaan Flink.

  • Jika beberapa pekerjaan Flink menggunakan sink.transactional-id-prefix yang sama, ID transaksi yang dihasilkan mungkin bertabrakan. Jika suatu pekerjaan gagal menulis, hal ini akan memblokir Log Start Offset (LSO) partisi Kafka agar tidak maju, sehingga memengaruhi semua konsumen yang membaca dari partisi tersebut.

Jika Anda memerlukan semantik tepat-sekali, gunakan Upsert Kafka untuk menulis ke tabel dengan primary key dan mengandalkan primary key tersebut untuk memastikan idempotensi. Untuk menggunakan penulisan transaksional, lihat Catatan tentang Semantik Tepat-Sekali.

Pemecahan masalah konektivitas jaringan

Jika pekerjaan Flink melaporkan error Timed out waiting for a node assignment saat startup, hal ini biasanya menunjukkan adanya masalah konektivitas jaringan antara Flink dan Kafka.

Klien Kafka terhubung ke server seperti dijelaskan di bawah ini:

  1. Anda terhubung ke Kafka menggunakan alamat di bootstrap.servers.

  2. Kafka mengembalikan metadata untuk setiap broker di kluster, seperti titik akhirnya.

  3. Klien kemudian terhubung ke setiap broker menggunakan titik akhir yang dikembalikan untuk memproduksi atau mengonsumsi data.

Bahkan jika alamat bootstrap.servers dapat dijangkau, klien tidak dapat membaca atau menulis ke broker jika Kafka mengembalikan alamat broker yang salah. Masalah ini umum terjadi pada arsitektur jaringan yang menggunakan proxy, penerusan port, atau jalur sewa.

Langkah-langkah pemecahan masalah

ApsaraMQ for Kafka

  1. Konfirmasi jenis titik akses

    • Titik akses default (jaringan internal)

    • Titik akses SASL (jaringan internal dengan otentikasi)

    • Titik akses jaringan publik (memerlukan pengajuan terpisah)

    Gunakan konsol pengembangan Flink untuk menjalankan probe jaringan guna mengesampingkan masalah konektivitas dengan alamat bootstrap.servers.

  2. Periksa grup keamanan dan daftar putih

    Instans Kafka harus menambahkan VPC Flink ke daftar putihnya. Untuk informasi selengkapnya, lihat Lihat blok CIDR VPC dan Konfigurasi daftar putih.

  3. Periksa konfigurasi SASL (jika diaktifkan)

    Jika Anda menggunakan titik akhir SASL_SSL, Anda harus mengonfigurasi mekanisme JAAS, SSL, dan SASL dengan benar dalam pekerjaan Flink Anda. Otentikasi yang tidak lengkap menyebabkan koneksi gagal selama fase handshake dan dapat muncul sebagai timeout. Untuk informasi selengkapnya, lihat Keamanan dan Otentikasi.

Kafka yang dikelola sendiri (ECS)

  1. Gunakan konsol pengembangan Flink untuk melakukanprobe jaringan.

    Esampingkan masalah konektivitas dengan alamat bootstrap.servers dan verifikasi bahwa titik akhir publik dan internal sudah benar.

  2. Periksa grup keamanan dan daftar putih

    • Grup keamanan ECS harus mengizinkan traffic pada port titik akses Kafka (biasanya 9092 atau 9093).

    • VPC tempat Flink berada harus ditambahkan ke daftar putih instans ECS. Untuk informasi selengkapnya, lihat Lihat blok CIDR VPC.

  3. Pemecahan masalah konfigurasi

    1. Login ke kluster ZooKeeper yang digunakan oleh Kafka dan gunakan tool zkCli.sh atau zookeeper-shell.sh.

    2. Jalankan perintah untuk mengambil metadata broker. Contoh: get /brokers/ids/0. Di bidang endpoints respons, temukan alamat yang diiklankan Kafka kepada klien.

      example

    3. Lakukan probe jaringan di konsol pengembangan Flink untuk menguji apakah alamat tersebut dapat dijangkau.

      Catatan
      • Jika alamat tidak dapat dijangkau, hubungi engineer O&M Kafka untuk memeriksa dan memperbaiki konfigurasi listeners dan advertised.listeners agar alamat yang dikembalikan dapat diakses oleh Flink.

      • Untuk informasi selengkapnya tentang konektivitas klien dan server Kafka, lihat Pemecahan Masalah Konektivitas.

  4. Periksa konfigurasi SASL (jika diaktifkan)

    Jika Anda menggunakan titik akhir SASL_SSL, Anda harus mengonfigurasi mekanisme JAAS, SSL, dan SASL dengan benar dalam pekerjaan Flink Anda. Otentikasi yang tidak lengkap menyebabkan koneksi gagal selama fase handshake atau muncul sebagai timeout. Untuk informasi selengkapnya, lihat Keamanan dan Otentikasi.

SQL

Anda dapat menggunakan Kafka connector dalam pekerjaan SQL sebagai tabel sumber atau tabel sink.

Sintaksis

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

Kolom metadata

Anda dapat mendefinisikan kolom metadata di tabel sumber Kafka atau tabel sink Kafka untuk mengambil metadata pesan Kafka. Misalnya, jika beberapa topik didefinisikan dalam klausa WITH untuk tabel sumber Kafka dan kolom metadata didefinisikan di tabel sumber, topik tempat Flink membaca data akan ditandai. Potongan kode berikut memberikan contoh cara menggunakan kolom metadata:

CREATE TABLE kafka_source (
  -- Baca topik tempat pesan tersebut berada sebagai nilai bidang record_topic.
  `record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
  -- Baca timestamp di ConsumerRecord sebagai nilai bidang ts.
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  -- Baca offset pesan sebagai nilai bidang record_offset.
  `record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE kafka_sink (
  -- Tulis timestamp di bidang ts sebagai timestamp ProducerRecord ke Kafka.
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

Tabel berikut menjelaskan kolom metadata yang didukung oleh tabel sumber dan tabel sink Kafka.

Kunci

Tipe data

Deskripsi

Tabel sumber atau tabel sink

topic

STRING NOT NULL METADATA VIRTUAL

Nama topik tempat pesan Kafka berada.

Tabel sumber

partition

INT NOT NULL METADATA VIRTUAL

ID partisi tempat pesan Kafka berada.

Tabel sumber

headers

MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL

Header pesan Kafka.

Tabel sumber dan tabel sink

leader-epoch

INT NOT NULL METADATA VIRTUAL

Leader epoch pesan Kafka.

Tabel sumber

offset

BIGINT NOT NULL METADATA VIRTUAL

Offset pesan Kafka.

Tabel sumber

timestamp

TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL

Timestamp pesan Kafka.

Tabel sumber dan tabel sink

timestamp-type

STRING NOT NULL METADATA VIRTUAL

Tipe timestamp pesan Kafka:

  • NoTimestampType: menunjukkan bahwa tidak ada timestamp yang didefinisikan dalam pesan.

  • CreateTime: menunjukkan waktu ketika pesan dihasilkan.

  • LogAppendTime: menunjukkan waktu ketika pesan ditambahkan ke broker Kafka.

Tabel sumber

__raw_key__

STRING NOT NULL METADATA VIRTUAL

Bidang kunci mentah pesan Kafka.

Tabel sumber dan tabel sink

Catatan

Parameter ini hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 11.4 atau versi lebih baru.

__raw_value__

STRING NOT NULL METADATA VIRTUAL

Bidang nilai mentah pesan Kafka.

Tabel sumber dan tabel sink

Catatan

Parameter ini hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 11.4 atau versi lebih baru.

dengan parameter

  • Umum

    parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Keterangan

    connector

    Tipe tabel.

    String

    Ya

    Tidak ada

    Atur nilainya ke kafka.

    properties.bootstrap.servers

    Alamat IP dan nomor port broker Kafka.

    String

    Ya

    Tidak ada

    Format: host:port,host:port,host:port. Pisahkan beberapa pasangan host:port dengan koma (,).

    properties.*

    Opsi yang dikonfigurasi untuk klien Kafka.

    String

    Tidak

    Tidak ada

    Akhiran harus mematuhi aturan yang ditentukan dalam Producer Configs dan Consumer Configs.

    Flink menghapus awalan "properties." dan meneruskan konfigurasi yang tersisa ke klien Kafka. Misalnya, Anda dapat menonaktifkan pembuatan topik otomatis menggunakan 'properties.allow.auto.create.topics'='false'.

    Konfigurasi berikut tidak dapat dimodifikasi dengan cara ini karena Kafka connector menimpa konfigurasi tersebut:

    • key.deserializer

    • value.deserializer

    format

    Format yang digunakan untuk membaca atau menulis bidang nilai pesan Kafka.

    String

    Tidak

    Tidak ada nilai default.

    Format yang didukung

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    Catatan

    Untuk informasi selengkapnya tentang opsi format, lihat Opsi Format.

    key.format

    Format yang digunakan untuk membaca atau menulis bidang kunci pesan Kafka.

    String

    Tidak

    Tidak ada nilai default.

    Format yang didukung

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    Catatan

    Jika Anda menggunakan konfigurasi ini, konfigurasi key.options wajib diisi.

    key.fields

    Bidang kunci di tabel sumber atau tabel sink yang sesuai dengan bidang kunci pesan Kafka.

    String

    Tidak

    Tidak ada

    Pisahkan beberapa nama bidang dengan titik koma (;), seperti field1;field2.

    key.fields-prefix

    Awalan kustom untuk semua bidang kunci dalam pesan Kafka. Anda dapat mengonfigurasi opsi ini untuk mencegah konflik nama dengan bidang nilai.

    String

    Tidak

    Tidak ada

    Opsi ini hanya digunakan untuk membedakan nama kolom tabel sumber dan tabel sink. Awalan dihapus dari nama kolom saat bidang kunci pesan Kafka diurai atau dihasilkan.

    Catatan

    Saat Anda menggunakan konfigurasi ini, Anda harus mengatur opsi value.fields-include ke EXCEPT_KEY.

    value.format

    Format yang digunakan untuk membaca atau menulis bidang nilai pesan Kafka.

    String

    Tidak

    Tidak ada

    Konfigurasi ini setara dengan format, dan Anda hanya dapat mengonfigurasi salah satu dari format atau value.format. Jika Anda mengonfigurasi keduanya, value.format akan menimpa format.

    value.fields-include

    Menentukan apakah kunci pesan yang sesuai disertakan saat mengurai atau menghasilkan nilai pesan Kafka.

    String

    Tidak

    ALL

    Nilai yang valid:

    • ALL (default): Semua bidang diproses sebagai nilai pesan Kafka.

    • EXCEPT_KEY: Semua bidang kecuali bidang yang ditentukan oleh opsi key.fields diproses sebagai nilai pesan Kafka.

  • Tabel sumber

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Keterangan

    topic

    Nama topik tempat Anda ingin membaca data.

    String

    Tidak

    Tidak ada

    Pisahkan beberapa nama topik dengan titik koma (;), seperti topic-1 dan topic-2.

    Catatan

    Anda tidak dapat menggunakan opsi topic bersamaan dengan opsi topic-pattern.

    topic-pattern

    Ekspresi reguler yang digunakan untuk mencocokkan topik. Data dari semua topik yang namanya cocok dengan ekspresi reguler yang ditentukan akan dibaca saat deployment berjalan.

    String

    Tidak

    Tidak ada

    Catatan

    Anda tidak dapat menggunakan opsi topic bersamaan dengan opsi topic-pattern.

    properties.group.id

    ID kelompok konsumen.

    String

    Tidak

    KafkaSource-{Nama tabel sumber}

    Jika ID grup yang ditentukan digunakan untuk pertama kalinya, Anda harus mengatur properties.auto.offset.reset ke earliest atau latest untuk menentukan offset awal.

    scan.startup.mode

    Offset awal untuk Kafka membaca data.

    String

    Tidak

    group-offsets

    Nilai yang valid:

    • earliest-offset: Kafka membaca data dari partisi paling awal.

    • latest-offset: Kafka membaca data dari offset terbaru.

    • group-offsets (default): membaca data dari offset yang dikomit oleh kelompok konsumen dengan ID yang ditentukan oleh opsi properties.group.id.

    • timestamp: Membaca data dari timestamp yang ditentukan oleh scan.startup.timestamp-millis.

    • specific-offsets: membaca data dari offset yang ditentukan oleh opsi scan.startup.specific-offsets.

    Catatan

    Opsi ini berlaku saat deployment dimulai tanpa state. Saat deployment dimulai ulang dari checkpoint atau dilanjutkan dari state tertentu, deployment akan memprioritaskan membaca data dari progres yang disimpan dalam data state.

    scan.startup.specific-offsets

    Offset awal setiap partisi saat opsi scan.startup.mode diatur ke specific-offsets.

    String

    Tidak

    Tidak ada

    Contoh: partition:0,offset:42;partition:1,offset:300.

    scan.startup.timestamp-millis

    Timestamp offset awal saat opsi scan.startup.mode diatur ke timestamp.

    Long

    Tidak

    Tidak ada

    Unit: milidetik.

    scan.topic-partition-discovery.interval

    Interval waktu untuk mendeteksi topik dan partisi Kafka secara dinamis.

    Duration

    Tidak

    5 menit

    Interval penemuan partisi default adalah 5 menit. Untuk menonaktifkan fitur ini, Anda harus secara eksplisit mengatur opsi ini ke nilai non-positif. Setelah fitur penemuan partisi dinamis diaktifkan, sumber Kafka dapat secara otomatis menemukan partisi baru dan membaca data dari partisi tersebut. Dalam mode topic-pattern, sumber Kafka membaca data dari partisi baru topik yang ada dan data dari semua partisi topik baru yang cocok dengan ekspresi reguler.

    Catatan

    Dalam Realtime Compute for Apache Flink yang menggunakan VVR 6.0.X, fitur penemuan partisi dinamis dinonaktifkan secara default. Dalam Realtime Compute for Apache Flink yang menggunakan VVR 8.0 atau versi lebih baru, fitur ini diaktifkan secara default. Interval penemuan partisi default adalah 5 menit.

    scan.header-filter

    Penyaringan data Kafka berdasarkan apakah data tersebut mengandung header pesan tertentu.

    String

    Tidak

    Tidak ada

    Pisahkan kunci header dan nilai dengan titik dua (:). Pisahkan beberapa header dengan operator logika seperti AND (&) atau OR (|). Operator logika NOT (!) didukung. Misalnya, depart:toy|depart:book&!env:test menunjukkan bahwa data Kafka yang headernya mengandung depart=toy atau depart=book dan tidak mengandung env=test akan dipertahankan.

    Catatan
    • Opsi ini hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 8.0.6 atau versi lebih baru.

    • Operasi tanda kurung tidak didukung.

    • Operasi logika dilakukan dari kiri ke kanan secara berurutan.

    • Nilai header dalam format UTF-8 dikonversi menjadi string dan dibandingkan dengan nilai header yang ditentukan oleh opsi scan.header-filter.

    scan.check.duplicated.group.id

    Menentukan apakah akan memeriksa duplikasi kelompok konsumen yang ditentukan oleh parameter properties.group.id.

    Boolean

    Tidak

    false

    Nilai yang valid:

    • true: Memeriksa duplikasi kelompok konsumen sebelum pekerjaan dimulai. Jika terdapat duplikasi kelompok konsumen, melaporkan error dan menangguhkan pekerjaan untuk mencegah konflik.

    • false: Tidak memeriksa duplikasi kelompok konsumen sebelum pekerjaan dimulai.

    Catatan

    Opsi ini hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 6.0.4 atau versi lebih baru.

  • Khusus sink

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Keterangan

    topic

    Nama topik tempat data ditulis.

    String

    Ya

    Tidak ada

    N/A

    sink.partitioner

    Pola untuk memetakan konkurensi Flink ke partisi Kafka.

    String

    Tidak

    default

    Nilai yang valid:

    • default (default): Menggunakan partitioner Kafka default untuk mempartisi data.

    • fixed: Setiap partisi Flink sesuai dengan partisi Kafka tetap.

    • round-robin: Data dalam partisi Flink didistribusikan ke partisi Kafka secara round-robin.

    • Pola pemetaan partisi kustom: Anda dapat membuat subclass FlinkKafkaPartitioner untuk mengonfigurasi pola pemetaan partisi kustom, seperti org.mycompany.MyPartitioner.

    sink.delivery-guarantee

    Pola semantik untuk tabel sink Kafka.

    String

    Tidak

    at-least-once

    Nilai yang valid:

    • none: Jaminan pengiriman tidak dipastikan. Data mungkin hilang atau diduplikasi.

    • at-least-once (default): Memastikan data tidak hilang. Namun, data mungkin diduplikasi.

    • exactly-once: Transaksi Kafka digunakan untuk memastikan semantik tepat-sekali. Ini memastikan data tidak hilang atau diduplikasi.

    Catatan

    Anda harus mengonfigurasi opsi sink.transactional-id-prefix jika Anda mengatur opsi ini ke exactly-once.

    sink.transactional-id-prefix

    Awalan ID transaksi Kafka yang digunakan dalam semantik tepat-sekali.

    String

    Tidak

    Tidak ada

    Opsi ini hanya berlaku saat opsi sink.delivery-guarantee diatur ke exactly-once.

    sink.parallelism

    Konkurensi operator untuk tabel sink Kafka.

    Integer

    Tidak

    Tidak ada

    Konkurensi operator hulu, yang ditentukan oleh framework.

Keamanan dan otentikasi

Jika kluster Kafka Anda memerlukan koneksi aman atau otentikasi, tambahkan awalan properties. ke nama opsi keamanan dan otentikasi serta konfigurasikan dalam klausa WITH. Potongan kode berikut menunjukkan cara mengonfigurasi tabel Kafka untuk menggunakan PLAIN sebagai mekanisme Simple Authentication and Security Layer (SASL) dan menyediakan konfigurasi Java Authentication and Authorization Service (JAAS).

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)

Potongan kode berikut menunjukkan cara mengonfigurasi tabel Kafka untuk menggunakan SASL_SSL sebagai protokol keamanan dan SCRAM-SHA-256 sebagai mekanisme SASL.

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_SSL',
  /*Konfigurasi Secure Sockets Layer (SSL).*/
  /*Tentukan jalur truststore sertifikat CA yang disediakan oleh server.*/
  /*Artefak yang diunggah disimpan di /flink/usrlib/.*/
  'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
  'properties.ssl.truststore.password' = 'test1234',
  /*Tentukan jalur file keystore kunci privat jika otentikasi klien diperlukan.*/
  'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
  'properties.ssl.keystore.password' = 'test1234',
  /*Algoritma yang digunakan klien untuk memverifikasi alamat server. Nilai null menunjukkan bahwa verifikasi alamat server dinonaktifkan.*/
  'properties.ssl.endpoint.identification.algorithm' = '',
  /*Konfigurasi SASL.*/
  /*Konfigurasi SCRAM-SHA-256 sebagai mekanisme SASL.*/
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  /*Konfigurasi JAAS.*/
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)

Anda dapat menggunakan fitur Artefak di konsol pengembangan untuk mengunggah sertifikat CA dan kunci privat dari contoh. Setelah diunggah, file tersebut disimpan di direktori /flink/usrlib. Jika file sertifikat CA yang ingin Anda gunakan bernama my-truststore.jks, Anda dapat mengatur parameter 'properties.ssl.truststore.location' dalam klausa WITH dengan dua cara berikut untuk menggunakan sertifikat ini:

  • Jika Anda mengatur 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks', Flink tidak perlu mengunduh file OSS secara dinamis selama runtime, tetapi mode debug tidak didukung.

  • Untuk versi mesin komputasi waktu nyata Ververica Runtime (VVR) 11.5 dan versi lebih baru, Anda dapat mengonfigurasi properties.ssl.truststore.location dan properties.ssl.keystore.location dengan jalur mutlak OSS. Format jalur file adalah oss://flink-fullymanaged-<ID ruang kerja>/artifacts/namespaces/<nama proyek>/<nama file>. Metode ini mengunduh file OSS secara dinamis selama runtime Flink dan mendukung mode debug.

Catatan
  • Konfirmasi konfigurasi: Potongan kode di atas berlaku untuk sebagian besar skenario konfigurasi. Sebelum mengonfigurasi Kafka connector, hubungi personel O&M server Kafka untuk mendapatkan informasi konfigurasi keamanan dan otentikasi yang benar.

  • Catatan escape: Berbeda dengan Apache Flink, editor SQL Realtime Compute for Apache Flink secara default melakukan escape tanda kutip ganda (") . Oleh karena itu, Anda tidak perlu menambahkan backslash (\) sebagai karakter escape untuk tanda kutip ganda (") yang digunakan untuk mengapit username dan password saat mengonfigurasi opsi properties.sasl.jaas.config.

Offset awal untuk tabel sumber Kafka

Mode startup

Anda dapat mengonfigurasi parameter scan.startup.mode untuk menentukan offset baca awal untuk tabel sumber Kafka:

  • earliest-offset: Membaca data dari offset paling awal partisi saat ini.

  • latest-offset: Membaca data dari offset terbaru partisi saat ini.

  • group-offsets: Membaca data dari offset yang dikomit oleh kelompok konsumen dengan ID yang ditentukan oleh opsi properties.group.id.

  • timestamp: Membaca data dari pesan pertama yang timestamp-nya lebih besar dari atau sama dengan timestamp yang ditentukan oleh scan.startup.timestamp-millis.

  • specific-offsets: Membaca data dari offset partisi yang ditentukan oleh opsi scan.startup.specific-offsets.

Catatan
  • Jika Anda tidak menentukan offset awal, group-offsets digunakan secara default.

  • scan.startup.mode hanya berlaku untuk pekerjaan tanpa status. Untuk pekerjaan berstatus, konsumsi dimulai dari offset yang disimpan dalam status.

Kode contoh:

CREATE TEMPORARY TABLE kafka_source (
  ...
) WITH (
  'connector' = 'kafka',
  ...
  -- Mengonsumsi data dari offset paling awal.
  'scan.startup.mode' = 'earliest-offset',
  -- Mengonsumsi data dari offset terbaru.
  'scan.startup.mode' = 'latest-offset',
  -- Mengonsumsi data dari offset yang dikomit oleh kelompok konsumen my-group.
  'properties.group.id' = 'my-group',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest', -- Jika my-group digunakan untuk pertama kalinya, konsumsi dimulai dari offset paling awal.
  'properties.auto.offset.reset' = 'latest', -- Jika my-group digunakan untuk pertama kalinya, konsumsi dimulai dari offset terbaru.
  -- Mengonsumsi data dari timestamp 1655395200000, dalam milidetik.
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1655395200000',
  -- Mengonsumsi data dari offset yang ditentukan.
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);

Prioritas offset awal

Urutan prioritas untuk offset awal tabel sumber adalah sebagai berikut:

Prioritas (tertinggi ke terendah)

Offset yang disimpan dalam checkpoint atau titik simpan.

Waktu mulai yang ditentukan di konsol.

Offset awal yang ditentukan oleh parameter scan.startup.mode dalam parameter WITH.

Jika scan.startup.mode tidak ditentukan, sistem menggunakan group-offsets dan mengonsumsi data dari offset kelompok konsumen yang sesuai.

Jika offset menjadi tidak valid dalam salah satu langkah di atas karena kedaluwarsa atau masalah di kluster Kafka, kebijakan reset yang ditentukan oleh properties.auto.offset.reset akan digunakan. Jika item konfigurasi ini tidak diatur, akan dilemparkan exception yang memerlukan intervensi pengguna.

Dalam kebanyakan kasus, tabel sumber Kafka mulai membaca data dari offset yang dikomit oleh kelompok konsumen dengan ID grup baru. Saat tabel sumber Kafka menanyakan offset yang dikomit oleh kelompok konsumen di kluster Kafka, tidak ada offset valid yang dikembalikan karena ID grup digunakan untuk pertama kalinya. Dalam kasus ini, strategi reset yang dikonfigurasi oleh parameter properties.auto.offset.reset digunakan untuk mereset offset. Oleh karena itu, Anda harus mengonfigurasi parameter properties.auto.offset.reset untuk menentukan strategi reset offset.

Pengiriman Offset Tabel Sumber

Tabel sumber Kafka hanya mengirimkan offset konsumen ke kluster Kafka setelah operasi checkpoint berhasil. Jika interval checkpoint yang Anda tentukan terlalu besar, offset konsumen akan dikirimkan dengan penundaan ke kluster Kafka. Selama operasi checkpoint, tabel sumber Kafka menyimpan progres pembacaan data saat ini di backend status. Offset yang dikirimkan ke kluster Kafka tidak digunakan untuk pemulihan kesalahan. Offset yang dikirimkan hanya digunakan untuk memantau progres pembacaan data di Kafka. Akurasi data tidak terpengaruh bahkan jika offset gagal dikirimkan.

Partitioner kustom untuk tabel sink

Jika partitioner produsen Kafka bawaan tidak memenuhi kebutuhan Anda, Anda dapat mengimplementasikan partitioner kustom untuk menulis data ke partisi tertentu. Partitioner kustom harus mewarisi FlinkKafkaPartitioner. Setelah pengembangan, kompilasi paket JAR dan gunakan fitur Manajemen File untuk mengunggahnya ke konsol Komputasi Waktu Nyata. Setelah paket JAR diunggah dan direferensikan, atur parameter sink.partitioner dalam klausa WITH. Nilai parameter harus berupa jalur kelas lengkap partitioner, seperti org.mycompany.MyPartitioner.

Perbandingan antara Kafka, Upsert Kafka, dan katalog JSON Kafka

Kafka adalah sistem antrian pesan yang hanya mendukung penyisipan data dan tidak mendukung pembaruan atau penghapusan. Oleh karena itu, Kafka tidak dapat memproses data Change Data Capture (CDC) dari sistem hulu atau logika penarikan dari operator seperti agregat dan join selama komputasi SQL streaming. Jika Anda ingin menulis data yang mengandung data perubahan atau data penarikan ke Kafka, gunakan tabel sink Upsert Kafka, yang melakukan pemrosesan khusus pada data perubahan.

Jika Anda ingin menyinkronkan data perubahan dari satu atau beberapa tabel data di database hulu ke Kafka secara batch, Anda dapat menggunakan katalog JSON Kafka. Jika data yang disimpan di Kafka dalam format JSON, Anda dapat menggunakan katalog JSON Kafka. Hal ini menghilangkan kebutuhan untuk mengonfigurasi skema dan opsi dalam klausa WITH. Untuk informasi selengkapnya, lihat Kelola katalog JSON Kafka.

Contoh

Contoh 1: Baca data dari topik Kafka dan tulis data tersebut ke topik Kafka lain

Kode contoh berikut membaca data dari topik Kafka sumber dan menulisnya ke topik Kafka sink. Data dalam format CSV.

CREATE TEMPORARY TABLE kafka_source (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

CREATE TEMPORARY TABLE kafka_sink (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;

Contoh 2: Sinkronisasi skema dan data tabel

Gunakan Kafka connector untuk menyinkronkan pesan dari topik Kafka ke Hologres secara real-time. Dengan mengonfigurasi offset dan ID partisi pesan Kafka sebagai primary key, Anda menghindari pesan duplikat di Hologres jika terjadi failover.

CREATE TEMPORARY TABLE kafkaTable (
  `offset` INT NOT NULL METADATA,
  `part` BIGINT NOT NULL METADATA FROM 'partition',
  PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true'
    -- Opsional. Perluas semua kolom bersarang. 
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;

Contoh 3: Sinkronisasi skema dan data dalam kolom kunci dan nilai pesan Kafka

Bidang kunci pesan Kafka menyimpan informasi relevan. Anda dapat menyinkronkan data dalam kolom kunci dan nilai pesan Kafka secara bersamaan.

CREATE TEMPORARY TABLE kafkaTable (
  `key_id` INT NOT NULL,
  `val_name` VARCHAR(200)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'val_',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`(
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
Catatan

Kunci pesan Kafka tidak mendukung evolusi skema dan penguraian tipe. Deklarasi manual diperlukan.

Contoh 4: Sinkronisasi skema dan data serta lakukan komputasi

Saat Anda menyinkronkan data dari Kafka ke Hologres, diperlukan perhitungan ringan.

CREATE TEMPORARY TABLE kafkaTable (
  `distinct_id` INT NOT NULL,
  `properties` STRING,
  `timestamp` TIMESTAMP_LTZ METADATA,
  `date` AS CAST(`timestamp` AS DATE)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
   'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
  `order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
-- Gunakan COALESCE untuk menangani nilai null.

Contoh 5: Uraikan data JSON bersarang

Pesan JSON contoh

{
  "id": 101,
  "name": "VVP",
  "properties": {
    "owner": "Alibaba Cloud",
    "engine": "Flink"
  }
}

Untuk menghindari penggunaan fungsi seperti JSON_VALUE(payload, '$.properties.owner') untuk mengurai bidang, Anda dapat langsung mendefinisikan struktur dalam DDL Sumber:

CREATE TEMPORARY TABLE kafka_source (
  id          VARCHAR,
  `name`      VARCHAR,
  properties  ROW<`owner` STRING, engine STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'xxx',
  'properties.bootstrap.servers' = 'xxx',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

Akibatnya, Flink mengurai JSON menjadi bidang terstruktur selama fase baca, dan query SQL selanjutnya langsung menggunakan properties.owner, tanpa memerlukan pemanggilan fungsi tambahan, meningkatkan kinerja keseluruhan.

DataStream API

Penting

Jika Anda ingin menggunakan DataStream API untuk membaca atau menulis data, Anda harus menggunakan konektor DataStream dari tipe terkait untuk terhubung ke Realtime Compute for Apache Flink. Untuk informasi selengkapnya tentang cara mengonfigurasi konektor DataStream, lihat Cara menggunakan konektor DataStream.

  • Buat sumber Kafka

    Sumber Kafka menyediakan kelas builder untuk membuat instance KafkaSource. Kode contoh berikut menunjukkan cara membuat sumber Kafka untuk mengonsumsi pesan dari offset paling awal topik "input-topic", dengan kelompok konsumen bernama my-group, dan mendeserialisasi isi pesan Kafka sebagai string.

    Java

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
    
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

    Saat membuat KafkaSource, Anda harus menentukan parameter berikut.

    Parameter

    Deskripsi

    BootstrapServers

    Alamat broker Kafka. Anda dapat memanggil operasi setBootstrapServers(String) untuk mengonfigurasi alamat.

    GroupId

    ID kelompok konsumen. Anda dapat memanggil metode setGroupId(String) untuk mengonfigurasi ID.

    Topik atau Partisi

    Topik atau nama partisi tempat Anda berlangganan. Anda dapat mengonfigurasi sumber Kafka untuk berlangganan ke topik atau partisi menggunakan salah satu pola langganan berikut:

    • Daftar topik. Setelah Anda mengonfigurasi daftar topik, sumber Kafka berlangganan ke semua partisi dari topik yang ditentukan.

      KafkaSource.builder().setTopics("topic-a","topic-b")
    • Pola topik. Setelah Anda menentukan ekspresi reguler, sumber Kafka berlangganan ke semua partisi dari topik yang cocok dengan ekspresi reguler yang ditentukan.

      KafkaSource.builder().setTopicPattern("topic.*")
    • Daftar partisi. Setelah Anda mengonfigurasi daftar partisi, sumber Kafka berlangganan ke partisi yang ditentukan.

      final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
              new TopicPartition("topic-a", 0),    // Partisi 0 dari topik "topic-a"
              new TopicPartition("topic-b", 5)));  // Partisi 5 dari topik "topic-b"
      KafkaSource.builder().setPartitions(partitionSet)

    Deserializer

    Deserializer yang mendeserialisasi pesan Kafka.

    Anda dapat memanggil metode setDeserializer(KafkaRecordDeserializationSchema) untuk menentukan deserializer. Antarmuka KafkaRecordDeserializationSchema mendefinisikan cara objek ConsumerRecord dideserialisasi. Anda dapat menggunakan salah satu metode berikut untuk hanya mendeserialisasi bidang Value dalam pesan Kafka objek ConsumerRecord:

    • Sumber Kafka menyediakan metode setValueOnlyDeserializer(DeserializationSchema). Kelas DeserializationSchema mendefinisikan cara pesan Kafka yang disimpan sebagai nilai biner dideserialisasi.

    • Gunakan kelas yang mengimplementasikan Antarmuka Deserializer Kafka. Misalnya, Anda dapat menggunakan kelas StringDeserializer untuk mendeserialisasi pesan menjadi string.

      import org.apache.kafka.common.serialization.StringDeserializer;
      
      KafkaSource.<String>builder()
              .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
    Catatan

    Jika Anda ingin mendeserialisasi objek ConsumerRecord, Anda harus membuat kelas yang mengimplementasikan antarmuka KafkaRecordDeserializationSchema.

    XML

    Konektor DataStream Kafka disimpan di repositori pusat Maven.

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr-version}</version>
    </dependency>

    Saat menggunakan konektor DataStream Kafka, Anda harus memahami properti Kafka berikut:

    • Offset awal

      Anda dapat menggunakan inisialisasi offset untuk menentukan offset untuk sumber Kafka saat sumber Kafka mulai membaca data. Inisialisasi offset adalah objek yang mengimplementasikan antarmuka OffsetsInitializer. Kelas KafkaSource menyediakan inisialisasi offset bawaan berikut.

      Inisialisasi offset

      Pengaturan Kode

      Membaca data dari offset paling awal.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())

      Membaca data dari offset terbaru.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())

      Memulai mengonsumsi data dengan timestamp lebih besar dari atau sama dengan waktu yang ditentukan (dalam milidetik).

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))

      Mengonsumsi dari offset yang dikomit oleh kelompok konsumen. Jika tidak ada offset tersebut, gunakan offset paling awal.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))

      Membaca data dari offset yang dikomit setiap partisi dan tidak ada strategi reset yang ditentukan.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())

      Catatan
      • Jika inisialisasi offset bawaan tidak memenuhi kebutuhan bisnis Anda, Anda dapat membuat inisialisasi offset kustom.

      • Jika Anda tidak menentukan inisialisasi offset, inisialisasi offset OffsetsInitializer.earliest() digunakan secara default.

    • Mode eksekusi streaming dan mode eksekusi batch

      Sumber Kafka dapat beroperasi dalam mode streaming atau mode batch. Secara default, sumber Kafka beroperasi dalam mode streaming. Dalam mode ini, deployment terus berjalan hingga deployment gagal atau dibatalkan. Jika Anda ingin sumber Kafka beroperasi dalam mode batch, Anda dapat memanggil metode setBounded(OffsetsInitializer) untuk menentukan offset berhenti. Saat semua partisi mencapai offset berhentinya, sumber Kafka keluar.

      Catatan

      Dalam kebanyakan kasus, sumber Kafka yang beroperasi dalam mode streaming tidak memiliki offset berhenti. Jika Anda ingin men-debug sumber Kafka yang beroperasi dalam mode streaming, Anda dapat memanggil metode setUnbounded(OffsetsInitializer) untuk menentukan offset berhenti. Metode yang Anda gunakan untuk menentukan offset berhenti bervariasi tergantung pada apakah Anda menggunakan mode streaming atau mode batch.

    • Penemuan partisi dinamis

      Jika Anda ingin deployment yang sedang berjalan memproses data dari topik baru dan partisi baru yang cocok dengan pola langganan Anda tanpa me-restart deployment, Anda dapat mengaktifkan fitur penemuan partisi dinamis pada sumber Kafka. Dalam konektor DataStream, fitur ini dinonaktifkan secara default dan harus diaktifkan secara manual:

      KafkaSource.builder()
          .setProperty("partition.discovery.interval.ms", "10000") // Temukan partisi baru setiap 10 detik.
      Penting

      Fitur penemuan partisi dinamis bergantung pada mekanisme pembaruan metadata kluster Kafka. Jika kluster Kafka tidak segera memperbarui informasi partisi, partisi baru mungkin tidak ditemukan. Pastikan konfigurasi partition.discovery.interval.ms kluster Kafka sesuai dengan situasi aktual.

    • Waktu event dan watermark

      Secara default, sumber Kafka menggunakan timestamp yang dilampirkan pada catatan sebagai waktu event untuk catatan tersebut. Anda dapat mendefinisikan strategi watermark berdasarkan waktu event setiap catatan dan mengirimkan watermark ke layanan hilir.

      env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")

      Untuk informasi selengkapnya tentang cara mendefinisikan strategi watermark kustom, lihat Generating Watermarks.

      Catatan

      Jika beberapa subtugas sumber tetap menganggur untuk jangka waktu yang lama, seperti partisi Kafka yang tidak menerima pesan baru atau konkurensi sumber melebihi jumlah partisi Kafka, pembuatan watermark mungkin gagal. Dalam kasus ini, komputasi window tidak dapat dipicu, dan pemrosesan data akan berhenti.

      Solusinya adalah sebagai berikut:

      • Konfigurasi mekanisme timeout watermark: Aktifkan parameter table.exec.source.idle-timeout untuk memaksa sistem menghasilkan watermark setelah periode timeout yang ditentukan, memastikan progres epoch komputasi window.

      • Optimalkan sumber data: Atur konkurensi sumber agar sama dengan atau kurang dari jumlah partisi Kafka.

    • Consumer Offset Commit

      Saat checkpoint dihasilkan, sumber Kafka mengirimkan offset konsumen Kafka setiap partisi ke broker Kafka. Hal ini memastikan bahwa offset konsumen Kafka yang dicatat di broker Kafka konsisten dengan status checkpoint. Konsumen Kafka dapat secara otomatis mengirimkan offset pada setiap partisi ke broker Kafka secara berkala. Anda dapat mengonfigurasi fitur pengiriman offset otomatis menggunakan opsi enable.auto.commit dan auto.commit.interval.ms. Jika Anda menonaktifkan fitur checkpointing, sumber Kafka mengandalkan konsumen Kafka untuk mengirimkan offset ke broker Kafka.

      Catatan

      Sumber Kafka tidak menggunakan offset yang dikirimkan yang dicatat di broker Kafka untuk toleransi kesalahan. Saat Anda mengirimkan offset, broker Kafka dapat memantau progres konsumsi catatan pada setiap partisi.

    • Properti tambahan

      Anda dapat memanggil metode setProperties(Properties) dan setProperty(String, String) untuk mengonfigurasi properti tambahan untuk sumber Kafka dan konsumen Kafka. Tabel berikut menjelaskan properti sumber Kafka.

      Item konfigurasi

      Deskripsi

      client.id.prefix

      Menentukan awalan untuk ID klien konsumen Kafka.

      partition.discovery.interval.ms

      Menentukan interval waktu saat sumber Kafka memeriksa partisi baru.

      Catatan

      Properti partition.discovery.interval.ms ditimpa menjadi -1 dalam mode batch.

      register.consumer.metrics

      Menentukan apakah akan mendaftarkan metrik untuk konsumen Kafka di Realtime Compute for Apache Flink.

      Konfigurasi Konsumen Kafka lainnya

      Untuk informasi selengkapnya tentang properti konsumen Kafka, lihat Apache Kafka.

      Penting

      Konektor DataStream Kafka menimpa nilai properti berikut:

      • key.deserializer: Nilai properti ini diatur ke ByteArrayDeserializer.

      • value.deserializer: Nilai properti ini diatur ke ByteArrayDeserializer.

      • auto.offset.reset.strategy: Nilai properti ini diatur ke OffsetsInitializer#getAutoOffsetResetStrategy().

      Kode contoh berikut menunjukkan cara konsumen Kafka terhubung ke kluster Kafka menggunakan konfigurasi JAAS dan mekanisme otentikasi SASL/PLAIN.

      KafkaSource.builder()
          .setProperty("sasl.mechanism", "PLAIN")
          .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
    • Pemantauan

      Sumber Kafka mendaftarkan metrik di Realtime Compute for Apache Flink untuk pemantauan dan diagnosis.

      • Cakupan metrik

        Semua metrik sumber Kafka didaftarkan di bawah grup metrik KafkaSourceReader. KafkaSourceReader adalah subgrup dari grup metrik operator. Metrik untuk partisi tertentu didaftarkan di grup metrik KafkaSourceReader.topic.<nama_topik>.partition.<id_partisi>.

        Misalnya, topik bernama my-topic dan partisi topik bernama 1. Offset konsumen partisi dilaporkan oleh metrik <some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset. Jumlah pengiriman offset konsumen yang berhasil diukur oleh metrik <some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded.

      • Metrik

        Metrik

        Deskripsi

        Cakupan

        currentOffset

        Offset Konsumen Saat Ini

        TopicPartition

        committedOffset

        Offset komit saat ini

        TopicPartition

        commitsSucceeded

        Jumlah pengiriman yang berhasil

        KafkaSourceReader

        commitsFailed

        Jumlah Pengiriman Gagal

        KafkaSourceReader

      • Metrik Konsumen Kafka

        Metrik untuk konsumen Kafka didaftarkan di grup metrik KafkaSourceReader.KafkaConsumer. Misalnya, metrik records-consumed-total didaftarkan di <some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total.

        Anda dapat mengonfigurasi opsi register.consumer.metrics untuk menentukan apakah akan mendaftarkan metrik untuk konsumen Kafka. Secara default, opsi register.consumer.metrics diatur ke true. Untuk informasi selengkapnya tentang metrik untuk konsumen Kafka, lihat Apache Kafka.

  • Buat sink Kafka

    Sink Kafka dapat menulis data dari beberapa stream ke satu atau beberapa topik Kafka.

    DataStream<String> stream = ...
    
    
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", );
    KafkaSink<String> kafkaSink =
                    KafkaSink.<String>builder()
                            .setKafkaProducerConfig(kafkaProperties) // // konfigurasi produsen
                            .setRecordSerializer(
                                    KafkaRecordSerializationSchema.builder()
                                            .setTopic("my-topic") // topik tujuan
                                            .setKafkaValueSerializer(StringSerializer.class) // skema serialisasi
                                            .build())
                            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // toleransi kesalahan
                            .build();
    
    stream.sinkTo(kafkaSink);

    Anda harus mengonfigurasi parameter berikut.

    Parameter

    Deskripsi

    Topik

    Nama topik tempat data ditulis.

    Serialisasi data

    Saat Anda membuat sink Kafka, Anda harus menyediakan KafkaRecordSerializationSchema untuk mengonversi data input menjadi objek ProducerRecord Kafka. Flink menyediakan builder skema yang menawarkan komponen umum, seperti serialisasi kunci dan nilai pesan, pemilihan topik, dan partisi pesan. Anda juga dapat mengimplementasikan antarmuka yang sesuai untuk kontrol lanjutan. Sink Kafka memanggil metode ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) untuk setiap catatan masuk untuk menghasilkan objek ProducerRecord yang merepresentasikan catatan yang telah diserialisasi. Kemudian, sink Kafka menulis objek ProducerRecord ke topik yang diperlukan.

    Anda dapat mengontrol cara setiap catatan ditulis ke Kafka secara detail. Menggunakan ProducerRecord, Anda dapat melakukan tindakan berikut:

    • Menyetel nama topik tujuan.

    • Menentukan kunci pesan.

    • Menentukan partisi tujuan.

    Properti klien Kafka

    Properti bootstrap.servers wajib diisi. Tentukan daftar alamat broker Kafka yang dipisahkan koma.

    Semantik toleransi kesalahan

    Setelah Anda mengaktifkan fitur checkpointing, sink Kafka dapat memastikan pengiriman tepat-sekali. Anda juga dapat mengonfigurasi parameter DeliveryGuarantee untuk menentukan semantik toleransi kesalahan yang berbeda. Detail tentang parameter DeliveryGuarantee adalah sebagai berikut:

    • DeliveryGuarantee.NONE: Tidak ada jaminan pengiriman yang disediakan oleh Flink. Data mungkin hilang atau diduplikasi.

    • DeliveryGuarantee.AT_LEAST_ONCE: Sink Kafka memastikan data tidak hilang. Namun, data mungkin diduplikasi.

    • DeliveryGuarantee.EXACTLY_ONCE: Sink Kafka memastikan data tidak hilang atau diduplikasi. Mekanisme transaksi Kafka digunakan untuk memastikan pengiriman tepat-sekali.

      Catatan

      Untuk informasi selengkapnya tentang semantik tepat-sekali, lihat Catatan penggunaan Semantic.EXACTLY_ONCE.

Ingesti Data

Anda dapat menggunakan Kafka connector dalam pekerjaan ingesti data berbasis YAML sebagai sumber atau sink.

Batasan

  • Kami menyarankan Anda menggunakan Kafka sebagai sumber data sinkron untuk ingesti data Flink CDC di Realtime Compute for Apache Flink yang menggunakan VVR 11.1 atau versi lebih baru.

  • Hanya format JSON, Debezium JSON, dan Canal JSON yang didukung. Format data lain tidak didukung.

  • Untuk sumber, data dalam tabel yang sama dapat didistribusikan ke beberapa partisi hanya di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.11 atau versi lebih baru.

Sintaksis

source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: localhost:9092
  topic: ${kafka.topic}
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: localhost:9092

Opsi konfigurasi

  • Umum

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Keterangan

    type

    Tipe sumber atau sink.

    Ya

    String

    Tidak ada

    Atur nilainya ke kafka.

    name

    Nama sumber atau sink.

    Tidak

    String

    Tidak ada nilai default.

    N/A

    properties.bootstrap.servers

    Alamat IP dan nomor port broker Kafka.

    Ya

    String

    Tidak ada

    Format: host:port,host:port,host:port. Pisahkan beberapa pasangan host:port dengan koma (,).

    properties.*

    Opsi yang dikonfigurasi untuk klien Kafka.

    Tidak

    String

    Tidak ada

    Akhiran harus berupa konfigurasi produsen atau konsumen yang ditentukan dalam dokumentasi resmi Kafka.

    Flink menghapus awalan properties. dan meneruskan kunci dan nilai yang diubah ke klien Kafka. Misalnya, Anda dapat mengatur properties.allow.auto.create.topics ke false untuk menonaktifkan pembuatan topik otomatis.

    key.format

    Format yang digunakan untuk membaca atau menulis bidang kunci pesan Kafka.

    Tidak

    String

    Tidak ada

    • Untuk sumber, hanya json yang didukung.

    • Untuk sink, nilai yang valid adalah:

      • csv

      • json

    Catatan

    Opsi ini hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 11.0.0 atau versi lebih baru.

    value.format

    Format yang digunakan untuk membaca atau menulis bidang nilai pesan Kafka.

    Tidak

    String

    debezium-json

    Nilai yang valid:

    • debezium-json

    • canal-json

    • json

    Catatan
    • Format debezium-json dan canal-json hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 8.0.10 atau versi lebih baru.

    • Format json hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 11.0.0 atau versi lebih baru.

  • tabel sumber

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Keterangan

    topic

    Nama topik tempat Anda ingin membaca data.

    Tidak

    String

    Tidak ada nilai default.

    Pisahkan beberapa nama topik dengan titik koma (;), seperti topic-1 dan topic-2.

    Catatan

    Anda tidak dapat menggunakan opsi topic bersamaan dengan opsi topic-pattern.

    topic-pattern

    Ekspresi reguler yang digunakan untuk mencocokkan topik. Data dari semua topik yang namanya cocok dengan ekspresi reguler yang ditentukan akan dibaca saat pekerjaan berjalan.

    Tidak

    String

    Tidak ada nilai default.

    Catatan

    Anda tidak dapat menggunakan opsi topic bersamaan dengan opsi topic-pattern.

    properties.group.id

    ID kelompok konsumen.

    Tidak

    String

    Tidak ada

    Jika ID grup yang ditentukan digunakan untuk pertama kalinya, Anda harus mengatur properties.auto.offset.reset ke earliest atau latest untuk menentukan offset awal.

    scan.startup.mode

    Offset awal untuk Kafka membaca data.

    Tidak

    String

    group-offsets

    Nilai yang valid:

    • earliest-offset: Membaca data dari partisi paling awal.

    • latest-offset: Membaca data dari offset terbaru.

    • group-offsets (default): Mulai membaca dari offset yang dikomit untuk grup yang ditentukan di properties.group.id.

    • timestamp: Membaca dari timestamp yang ditentukan oleh scan.startup.timestamp-millis.

    • specific-offsets: Baca dari offset yang ditentukan oleh scan.startup.specific-offsets.

    Catatan

    Opsi ini berlaku saat pekerjaan dimulai tanpa state. Saat pekerjaan dimulai ulang dari checkpoint atau dilanjutkan dari state tertentu, pekerjaan akan memprioritaskan membaca data dari progres yang disimpan dalam data state.

    scan.startup.specific-offsets

    Offset awal setiap partisi saat opsi scan.startup.mode diatur ke specific-offsets.

    Tidak

    String

    Tidak ada

    Misalnya partition:0,offset:42;partition:1,offset:300

    scan.startup.timestamp-millis

    Timestamp offset awal saat opsi scan.startup.mode diatur ke timestamp.

    Tidak

    Long

    Tidak ada

    Unit: milidetik.

    scan.topic-partition-discovery.interval

    Interval waktu untuk mendeteksi topik dan partisi Kafka secara dinamis.

    Tidak

    Duration

    5 menit

    Interval penemuan partisi default adalah 5 menit. Untuk menonaktifkan fitur ini, Anda harus secara eksplisit mengatur opsi ini ke nilai non-positif. Setelah fitur penemuan partisi dinamis diaktifkan, sumber Kafka dapat secara otomatis menemukan partisi baru dan membaca data dari partisi tersebut. Dalam mode topic-pattern, sumber Kafka membaca data dari partisi baru topik yang ada dan data dari semua partisi topik baru yang cocok dengan ekspresi reguler.

    scan.check.duplicated.group.id

    Apakah akan memeriksa duplikasi kelompok konsumen yang ditentukan oleh properties.group.id.

    Tidak

    Boolean

    false

    Nilai yang valid:

    • true: Memeriksa duplikasi kelompok konsumen sebelum pekerjaan dimulai. Jika terdapat duplikasi kelompok konsumen, melaporkan error dan menangguhkan pekerjaan untuk mencegah konflik.

    • false: Tidak memeriksa duplikasi kelompok konsumen sebelum pekerjaan dimulai.

    schema.inference.strategy

    Strategi inferensi skema.

    Tidak

    String

    continuous

    Nilai yang valid:

    • continuous: Menyimpulkan skema untuk setiap catatan. Jika skema tidak kompatibel, menyimpulkan skema yang lebih luas dan menghasilkan event perubahan skema.

    • static: Melakukan inferensi skema hanya sekali saat pekerjaan dimulai. Catatan selanjutnya diurai berdasarkan skema awal. Event perubahan skema tidak dihasilkan.

    Catatan

    scan.max.pre.fetch.records

    Jumlah maksimum pesan yang sistem coba konsumsi dan uraikan dalam partisi selama inferensi skema awal.

    Tidak

    Int

    50

    Sebelum pekerjaan membaca dan memproses data, sistem mencoba mengonsumsi jumlah tertentu pesan terbaru di muka dalam partisi untuk menginisialisasi informasi skema.

    key.fields-prefix

    Awalan yang ditambahkan ke bidang yang diurai dari bidang kunci dalam pesan Kafka. Konfigurasikan opsi ini untuk mencegah konflik penamaan setelah bidang kunci dalam pesan Kafka diurai.

    Tidak

    String

    Tidak ada

    Misalnya, jika opsi ini diatur ke key_, dan bidang kunci berisi bidang bernama a, maka nama bidang setelah penguraian adalah key_a.

    Catatan

    Nilai opsi key.fields-prefix tidak boleh menjadi awalan dari opsi value.fields-prefix.

    value.fields-prefix

    Awalan yang ditambahkan ke bidang yang diurai dari bidang nilai dalam pesan Kafka. Anda dapat mengonfigurasi opsi ini untuk mencegah konflik penamaan setelah bidang nilai dalam pesan Kafka diurai.

    Tidak

    String

    Tidak ada

    Misalnya, jika opsi ini diatur ke value_, dan bidang nilai berisi bidang bernama b, maka nama bidang setelah penguraian adalah value_b.

    Catatan

    Nilai opsi value.fields-prefix tidak boleh menjadi awalan dari opsi key.fields-prefix.

    metadata.list

    Kolom metadata untuk diteruskan ke penyimpanan hilir.

    Tidak

    String

    Tidak ada

    Kolom metadata yang tersedia meliputi topic, partition, offset, timestamp, timestamp-type, headers, leader-epoch, __raw_key__, dan __raw_value__, dipisahkan koma.

    scan.value.initial-schemas.ddls

    Tentukan skema awal untuk tabel tertentu menggunakan pernyataan DDL.

    Tidak

    String

    Tidak ada

    Beberapa pernyataan DDL dihubungkan dengan titik koma Inggris (;). Misalnya, gunakan CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT); untuk menentukan skema awal untuk tabel db1.t1 dan db1.t2.

    Struktur tabel DDL di sini harus sesuai dengan tabel target dan mematuhi aturan sintaksis SQL Flink.

    Catatan

    Versi VVR 11.5 dan versi lebih baru mendukung konfigurasi ini.

    ingestion.ignore-errors

    Menentukan apakah akan mengabaikan error selama penguraian data.

    Tidak

    Boolean

    false

    Catatan

    Konfigurasi ini didukung di VVR 11.5 dan versi lebih baru.

    ingestion.error-tolerance.max-count

    Jumlah error penguraian setelah pekerjaan gagal, jika error diabaikan selama penguraian data.

    Tidak

    Integer

    -1

    Opsi ini hanya berlaku saat ingestion.ignore-errors diaktifkan. Nilai default -1 berarti bahwa exception penguraian tidak memicu kegagalan pekerjaan.

    Catatan

    Ververica Runtime (VVR) versi 11.5 atau versi lebih baru mendukung konfigurasi ini.

    • Tabel sumber dalam format Debezium JSON

      Parameter

      Wajib

      Tipe data

      Nilai default

      Deskripsi

      debezium-json.distributed-tables

      Tidak

      Boolean

      false

      Jika data tabel tunggal dalam Debezium JSON muncul di beberapa partisi, Anda harus mengaktifkan opsi ini.

      Catatan

      Opsi ini hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 8.0.11 atau versi lebih baru.

      Penting

      Setelah Anda mengonfigurasi opsi ini, Anda harus memulai deployment tanpa state.

      debezium-json.schema-include

      Tidak

      Boolean

      false

      Saat mengonfigurasi Debezium Kafka Connect, Anda dapat mengaktifkan konfigurasi Kafka value.converter.schemas.enable untuk menyertakan informasi skema dalam pesan. Opsi ini menentukan apakah pesan Debezium JSON menyertakan informasi skema.

      Nilai yang valid:

      • true: Pesan Debezium JSON menyertakan informasi skema.

      • false: Pesan Debezium JSON tidak menyertakan informasi skema.

      debezium-json.ignore-parse-errors

      Tidak

      Boolean

      false

      Nilai yang valid:

      • true: Melewati baris saat ini jika terjadi exception penguraian.

      • false (default): Mengembalikan error dan deployment gagal dimulai.

      debezium-json.infer-schema.primitive-as-string

      Tidak

      Boolean

      false

      Menentukan apakah akan menginterpretasikan semua tipe data sebagai STRING saat mengurai skema tabel.

      Nilai yang valid:

      • true: Menginterpretasikan semua tipe dasar sebagai STRING.

      • Saat false (default), penguraian mengikuti aturan dasar.

    • Format JSON Canal untuk Tabel Sumber

      parameter

      Wajib

      Tipe data

      Nilai default

      Deskripsi

      canal-json.distributed-tables

      Tidak

      Boolean

      false

      Jika data tabel tunggal dalam Canal JSON muncul di beberapa partisi, Anda harus mengaktifkan opsi ini.

      Catatan

      Opsi ini hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 8.0.11 atau versi lebih baru.

      Penting

      Setelah Anda mengonfigurasi opsi ini, Anda harus memulai deployment tanpa state.

      canal-json.database.include

      Tidak

      String

      Tidak ada

      Ekspresi reguler opsional yang cocok dengan bidang metadata database dalam catatan Canal. Hanya changelog dari database yang ditentukan yang dibaca. String ekspresi reguler kompatibel dengan Pattern Java.

      canal-json.table.include

      Tidak

      String

      Tidak ada

      Ekspresi reguler opsional yang cocok dengan bidang metadata tabel dalam catatan Canal. Hanya catatan changelog dari tabel yang ditentukan yang dibaca. Ekspresi reguler kompatibel dengan Pattern Java.

      canal-json.ignore-parse-errors

      Tidak

      Boolean

      false

      Nilai yang valid:

      • true: Melewati baris saat ini jika terjadi exception penguraian.

      • false (default): Mengembalikan error dan deployment gagal dimulai.

      canal-json.infer-schema.primitive-as-string

      Tidak

      Boolean

      false

      Menentukan apakah akan menginterpretasikan semua tipe data sebagai STRING saat mengurai skema tabel.

      Nilai yang valid:

      • true: Menginterpretasikan semua tipe dasar sebagai STRING.

      • false (default): Parser mengikuti aturan dasar.

      canal-json.infer-schema.strategy

      Tidak

      String

      AUTO

      Strategi inferensi skema.

      Nilai yang valid:

      • AUTO (default): Secara otomatis menyimpulkan skema dengan mengurai data JSON. Gunakan AUTO jika data Anda tidak mengandung bidang sqlType untuk menghindari kegagalan penguraian.

      • SQL_TYPE: Menyimpulkan skema menggunakan array sqlType dalam data Canal JSON. Jika data Anda mengandung bidang sqlType, kami sarankan mengatur canal-json.infer-schema.strategy ke SQL_TYPE untuk inferensi tipe yang lebih akurat.

      • MYSQL_TYPE: Menyimpulkan skema menggunakan array mysqlType dalam data Canal JSON.

      Jika data Canal JSON Anda di Kafka mengandung bidang sqlType dan Anda memerlukan pemetaan tipe yang lebih akurat, atur canal-json.infer-schema.strategy ke SQL_TYPE.

      Untuk informasi selengkapnya tentang aturan pemetaan sqlType, lihat Penguraian skema Canal JSON.

      Catatan
      • Ververica Runtime (VVR) 11.1 dan versi lebih baru mendukung konfigurasi ini.

      • MYSQL_TYPE hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 11.3 atau versi lebih baru.

      canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled

      Tidak

      Boolean

      true

      Menentukan apakah akan memetakan TIMESTAMP MySQL ke TIMESTAMP CDC:

      • true (default): Memetakan TIMESTAMP MySQL ke TIMESTAMP CDC.

      • false: Memetakan TIMESTAMP MySQL ke TIMESTAMP_LTZ CDC.

      canal-json.mysql.treat-tinyint1-as-boolean.enabled

      Tidak

      Boolean

      true

      Saat MYSQL_TYPE digunakan untuk penguraian, menentukan apakah akan memetakan TINYINT(1) MySQL ke BOOLEAN CDC:

      • true (default): Memetakan TINYINT(1) MySQL ke BOOLEAN CDC.

      • false: Memetakan TINYINT(1) MySQL ke TINYINT(1) CDC.

      Opsi ini hanya berlaku saat canal-json.infer-schema.strategy diatur ke MYSQL_TYPE.

    • Format JSON tabel sumber

      Parameter

      Wajib

      Tipe data

      Nilai default

      Deskripsi

      json.timestamp-format.standard

      Tidak

      String

      SQL

      Menentukan format timestamp input dan output. Nilai yang valid:

      • SQL: Mengurai timestamp input dalam format yyyy-MM-dd HH:mm:ss.s{precision}, seperti 2020-12-30 12:13:14.123.

      • ISO-8601: Mengurai timestamp input dalam format yyyy-MM-ddTHH:mm:ss.s{precision}, seperti 2020-12-30T12:13:14.123.

      json.ignore-parse-errors

      Tidak

      Boolean

      false

      Nilai yang valid:

      • true: Melewati baris saat ini jika terjadi exception penguraian.

      • false (default): Mengembalikan error dan deployment gagal dimulai.

      json.infer-schema.primitive-as-string

      Tidak

      Boolean

      false

      Menentukan apakah akan menginterpretasikan semua tipe data sebagai STRING saat mengurai skema tabel.

      Nilai yang valid:

      • true: Menginterpretasikan semua tipe dasar sebagai STRING.

      • false (default): Mengurai sesuai aturan dasar.

      json.infer-schema.flatten-nested-columns.enable

      Tidak

      Boolean

      false

      Saat mengurai data berformat JSON, Anda harus menentukan apakah akan memperluas kolom bersarang secara rekursif. Nilai parameter ini adalah sebagai berikut:

      • true: Memperluas kolom bersarang secara rekursif.

      • false (default): Memperlakukan tipe bersarang sebagai STRING.

      json.decode.parser-table-id.fields

      Tidak

      String

      Tidak ada

      Menentukan apakah akan menghasilkan tableId berdasarkan nilai bidang JSON tertentu, dengan beberapa bidang dipisahkan koma ,. Misalnya, jika data JSON adalah {"col0":"a", "col1","b", "col2","c"}, hasil yang dihasilkan adalah sebagai berikut:

      Konfigurasi

      tableId

      col0

      a

      col0,col1

      a.b

      col0,col1,col2

      a.b.c

      json.infer-schema.fixed-types

      Tidak

      String

      Tidak ada nilai default.

      Saat mengurai data berformat JSON, tentukan tipe data eksak untuk bidang tertentu. Pisahkan beberapa bidang dengan koma Inggris (,). Misalnya, id BIGINT, name VARCHAR(10) menentukan bidang id dalam data JSON sebagai BIGINT dan bidang name sebagai VARCHAR(10).

      Saat menggunakan konfigurasi ini, Anda juga harus menambahkan konfigurasi scan.max.pre.fetch.records: 0.

      Catatan

      Opsi ini hanya didukung untuk Realtime Compute for Apache Flink yang menggunakan VVR 11.5 atau versi lebih baru.

  • Khusus sink

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Keterangan

    type

    Tipe sink.

    Ya

    String

    Tidak ada

    Atur nilainya ke Kafka.

    name

    Nama sink.

    Tidak

    String

    Tidak ada

    N/A

    topic

    Nama topik Kafka.

    Tidak

    String

    Tidak ada

    Jika opsi ini diaktifkan, semua data ditulis ke topik ini.

    Catatan

    Jika opsi ini tidak diaktifkan, setiap catatan data ditulis ke topik yang namanya berasal dari string ID tabelnya (gabungan nama database dan nama tabel, dipisahkan oleh titik (.)), seperti databaseName.tableName.

    partition.strategy

    Strategi partisi Kafka.

    Tidak

    String

    all-to-zero

    Nilai yang valid:

    • all-to-zero (default): Menulis semua data ke partisi 0.

    • hash-by-key: Menulis data ke partisi berdasarkan nilai hash primary key. Hal ini memastikan bahwa data dengan primary key yang sama berada di partisi yang sama dan terurut.

    sink.tableId-to-topic.mapping

    Pemetaan antara nama tabel leluhur dan nama topik Kafka hilir.

    Tidak

    String

    Tidak ada

    Setiap hubungan pemetaan dipisahkan oleh ;. Nama tabel leluhur dan nama topik Kafka hilir yang sesuai dipisahkan oleh :. Anda dapat menggunakan ekspresi reguler untuk nama tabel, serta menggabungkan beberapa tabel yang dipetakan ke topik yang sama menggunakan ,. Contoh: mydb.mytable1:topic1;mydb.mytable2:topic2.

    Catatan

    Mengonfigurasi parameter ini memungkinkan Anda memodifikasi topik yang dipetakan sambil mempertahankan informasi nama tabel asli.

    • Format JSON Debezium untuk tabel sink

      Parameter

      Wajib

      Tipe data

      Nilai default

      Deskripsi

      debezium-json.include-schema.enabled

      Tidak

      Boolean

      false

      Menentukan apakah informasi skema disertakan dalam data JSON Debezium.

Contoh

  • Ingest data dari Kafka:

    source:
      type: kafka
      name: Kafka source
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      topic: ${kafka.topic}
      value.format: ${value.format}
      scan.startup.mode: ${scan.startup.mode}
     
    sink:
      type: hologres
      name: Hologres sink
      endpoint: <yourEndpoint>
      dbname: <yourDbname>
      username: ${secret_values.ak_id}
      password: ${secret_values.ak_secret}
      sink.type-normalize-strategy: BROADEN
  • Ingest data ke Kafka:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: kafka
      name: Kafka Sink
      properties.bootstrap.servers: ${kafka.bootstraps.server}
    
    route:
      - source-table: ${mysql.source.table}
        sink-table: ${kafka.topic}

    Di bagian route, tentukan nama topik Kafka tujuan.

Catatan

Secara default, fitur pembuatan topik otomatis dinonaktifkan untuk Kafka Alibaba Cloud. Untuk informasi selengkapnya, lihat FAQ tentang pembuatan topik otomatis. Saat Anda menulis data ke Kafka Alibaba Cloud, Anda harus membuat topik yang sesuai terlebih dahulu. Untuk informasi selengkapnya, lihat Langkah 3: Buat resource.

Kebijakan untuk penguraian dan evolusi skema

Kafka connector memelihara skema semua tabel yang diketahui.

Inisialisasi skema

Informasi skema mencakup informasi bidang dan tipe data, informasi database dan tabel, serta informasi primary key. Berikut ini menjelaskan cara menginisialisasi ketiga jenis informasi tersebut:

  • Informasi bidang dan tipe data

Pekerjaan ingesti data dapat menyimpulkan informasi bidang dan tipe data dari data secara otomatis. Namun, dalam beberapa skenario, Anda mungkin ingin menentukan informasi bidang dan tipe untuk tabel tertentu. Berdasarkan granularitas tipe bidang yang ditentukan pengguna, inisialisasi skema mendukung tiga strategi berikut:

  1. Skema disimpulkan sepenuhnya oleh sistem

Sebelum pesan Kafka dibaca, Kafka connector mencoba mengonsumsi pesan di setiap partisi, mengurai skema setiap catatan data, lalu menggabungkan skema untuk menginisialisasi informasi skema tabel. Jumlah pesan yang dapat dikonsumsi tidak lebih dari nilai opsi scan.max.pre.fetch.records. Sebelum data dikonsumsi, event pembuatan tabel dihasilkan berdasarkan skema yang diinisialisasi.

Catatan

Untuk format Debezium JSON dan Canal JSON, informasi tabel disertakan dalam pesan tertentu. Jumlah pesan yang akan dikonsumsi di muka ditentukan oleh parameter scan.max.pre.fetch.records. Pesan yang dikonsumsi di muka ini mungkin berisi data dari beberapa tabel. Oleh karena itu, jumlah catatan data yang dikonsumsi di muka untuk setiap tabel tidak dapat ditentukan. Konsumsi pesan di muka partisi dan inisialisasi skema tabel hanya dilakukan sekali sebelum konsumsi dan pemrosesan pesan yang sebenarnya untuk setiap partisi. Jika data tabel selanjutnya ada, skema tabel yang diurai dari catatan data pertama tabel tersebut digunakan sebagai skema tabel awal. Dalam kasus ini, konsumsi pesan di muka partisi dan inisialisasi skema tabel tidak akan dilakukan lagi.

Penting

Data dalam tabel tunggal dapat didistribusikan ke beberapa partisi hanya di Ververica Runtime (VVR) 8.0.11 atau versi lebih baru. Dalam skenario ini, Anda harus mengatur opsi debezium-json.distributed-tables atau canal-json.distributed-tables ke true.

  1. Tentukan skema awal

Dalam beberapa skenario, Anda mungkin ingin menentukan skema tabel awal sendiri, misalnya saat Anda menulis data dari Kafka ke tabel turunan yang telah dibuat sebelumnya. Untuk melakukan ini, Anda dapat menambahkan parameter scan.value.initial-schemas.ddls. Kode berikut menunjukkan contoh konfigurasi:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Set the initial schema.
  scan.value.initial-schemas.ddls: CREATE TABLE db1.t1 (id BIGINT, name VARCHAR(10)); CREATE TABLE db1.t2 (id BIGINT);

Pernyataan CREATE TABLE harus sesuai dengan skema tabel target. Di sini, tipe awal bidang id di tabel db1.t1 diatur ke BIGINT, dan tipe awal bidang name diatur ke VARCHAR(10). Demikian pula, tipe awal bidang id di tabel db1.t2 diatur ke BIGINT.

Pernyataan CREATE TABLE menggunakan sintaksis SQL Flink.

  1. Perbaiki tipe bidang

Dalam beberapa skenario, Anda mungkin ingin memperbaiki tipe data bidang tertentu—misalnya, untuk bidang tertentu yang mungkin disimpulkan sebagai tipe TIMESTAMP, Anda ingin mengirimkannya sebagai string. Dalam kasus ini, Anda dapat menentukan skema tabel awal dengan menambahkan parameter json.infer-schema.fixed-types (hanya berlaku saat format pesan adalah JSON). Konfigurasi contoh:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Fix specific fields to static types.
  json.infer-schema.fixed-types: id BIGINT, name VARCHAR(10)
  scan.max.pre.fetch.records: 0

Hal ini memperbaiki tipe semua bidang id ke BIGINT dan semua bidang name ke VARCHAR(10).

Tipe di sini sesuai dengan tipe data SQL Flink.

  • Informasi database dan tabel

    • Untuk format Canal JSON dan Debezium JSON, nama database dan tabel diurai dari pesan individual.

    • Secara default, untuk pesan dalam format JSON, informasi tabel hanya berisi nama tabel—nama topik yang berisi data. Jika data Anda mencakup informasi database dan tabel, Anda dapat menggunakan parameter json.infer-schema.fixed-types untuk menentukan bidang yang berisi informasi ini. Kami memetakan bidang ini ke nama database dan nama tabel. Kode berikut menunjukkan contoh konfigurasi:

      source:
        type: kafka
        name: Kafka Source
        properties.bootstrap.servers: host:9092
        topic: test-topic
        value.format: json
        scan.startup.mode: earliest-offset
        # Use the value of the col1 field as the database name and the value of the col2 field as the table name.
        json.decode.parser-table-id.fields: col1,col2

      Hal ini menulis setiap catatan ke tabel yang nama databasenya adalah nilai bidang col1 dan nama tabelnya adalah nilai bidang col2.

  • kunci utama

    • Untuk format Canal JSON, primary key tabel didefinisikan berdasarkan bidang pkNames dalam JSON.

    • Untuk format Debezium JSON dan JSON, JSON tidak berisi informasi primary key. Anda dapat menambahkan primary key ke tabel secara manual menggunakan aturan transformasi:

      transform:
        - source-table: \.*\.\.*
          projection: \*
          primary-keys: key1, key2

Penguraian skema dan evolusi skema

Setelah sinkronisasi skema awal selesai, jika schema.inference.strategy diatur ke static, Kafka connector mengurai nilai setiap pesan berdasarkan skema tabel awal dan tidak menghasilkan event perubahan skema. Jika schema.inference.strategy diatur ke continuous, Kafka connector mengurai bagian nilai setiap pesan Kafka menjadi kolom fisik dan membandingkan kolom tersebut dengan skema yang saat ini dipelihara. Jika skema yang diurai tidak konsisten dengan skema saat ini, Kafka connector mencoba menggabungkan skema dan menghasilkan event perubahan skema tabel yang sesuai. Aturan penggabungan adalah sebagai berikut:

  • Jika kolom fisik yang diurai berisi bidang yang tidak ada dalam skema saat ini, Kafka connector menambahkan bidang tersebut ke skema dan menghasilkan event penambahan kolom nullable.

  • Jika kolom fisik yang diurai tidak berisi bidang yang sudah ada dalam skema saat ini, bidang tersebut dipertahankan dan nilainya diisi dengan NULL. Event penghapusan kolom tidak dihasilkan.

  • Jika kolom fisik yang diurai dan skema saat ini berisi kolom dengan nama yang sama, tangani sebagai berikut:

    • Jika tipe datanya sama tetapi presisinya berbeda, gunakan tipe presisi yang lebih tinggi dan hasilkan event perubahan tipe kolom.

    • Jika tipe datanya berbeda, temukan node induk terkecil dalam struktur pohon sebagai tipe untuk kolom dengan nama yang sama dan hasilkan event perubahan tipe kolom.

      image

  • Opsi evolusi skema yang didukung:

    • Menambahkan kolom: Menambahkan kolom baru ke akhir skema saat ini dan menyinkronkan data kolom baru. Kolom baru diatur sebagai nullable.

    • Menghapus kolom: Tidak menghasilkan event penghapusan kolom. Sebaliknya, data selanjutnya untuk kolom tersebut secara otomatis diisi dengan nilai NULL.

    • Mengganti nama kolom: Dianggap sebagai menambahkan kolom dan menghapus kolom. Menambahkan kolom yang diganti namanya ke akhir skema dan mengisi data kolom asli dengan nilai NULL.

    • Mengubah tipe data kolom:

      • Jika sistem hilir mendukung perubahan tipe kolom, pekerjaan ingesti data mendukung perubahan tipe kolom biasa setelah sink hilir mendukung penanganan perubahan tipe kolom—misalnya, mengubah dari INT ke BIGINT. Perubahan tersebut bergantung pada aturan perubahan tipe kolom yang didukung oleh sink hilir. Tabel sink yang berbeda mendukung aturan perubahan tipe kolom yang berbeda. Lihat dokumentasi untuk tabel sink terkait untuk mempelajari aturan perubahan tipe kolom yang didukungnya.

      • Untuk sistem hilir yang tidak mendukung perubahan tipe kolom, seperti Hologres, Anda dapat menggunakan pemetaan tipe luas. Metode ini membuat tabel dengan tipe data yang lebih umum di sistem hilir saat pekerjaan dimulai. Saat terjadi perubahan tipe kolom, sistem menentukan apakah sink hilir dapat menerima perubahan tersebut, yang memungkinkan dukungan toleran terhadap perubahan tipe kolom.

  • Perubahan skema yang tidak didukung:

    • Perubahan pada constraint, seperti primary key atau indeks.

    • Perubahan dari NOT NULL ke NULLABLE.

  • Penguraian skema untuk Canal JSON

    Data berformat Canal JSON mungkin berisi bidang sqlType opsional, yang berisi informasi tipe yang tepat untuk kolom data. Untuk mendapatkan skema yang lebih akurat, Anda dapat mengatur konfigurasi canal-json.infer-schema.strategy ke SQL_TYPE untuk menggunakan tipe dari sqlType. Hubungan pemetaan tipe adalah sebagai berikut:

    Tipe data JDBC

    Kode tipe

    Tipe data CDC

    BIT

    -7

    BOOLEAN

    BOOLEAN

    16

    TINYINT

    -6

    TINYINT

    SMALLINT

    -5

    SMALLINT

    INTEGER

    4

    INT

    BIGINT

    -5

    BIGINT

    DECIMAL

    3

    DECIMAL(38,18)

    NUMERIC

    2

    REAL

    7

    FLOAT

    FLOAT

    6

    DOUBLE

    8

    DOUBLE

    BINARY

    -2

    BYTES

    VARBINARY

    -3

    LONGVARBINARY

    -4

    BLOB

    2004

    DATE

    91

    DATE

    TIME

    92

    TIME

    TIMESTAMP

    93

    TIMESTAMP

    CHAR

    1

    STRING

    VARCHAR

    12

    LONGVARCHAR

    -1

    Tipe data lainnya

Toleransi dan pengumpulan data kotor

Dalam beberapa kasus, sumber data Kafka Anda mungkin berisi data rusak (data kotor). Untuk mencegah restart pekerjaan yang sering karena data kotor tersebut, Anda dapat mengonfigurasi pekerjaan untuk mengabaikan exception tersebut. Konfigurasi contoh:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Enable dirty data tolerance.
  ingestion.ignore-errors: true
  # Tolerate up to 1000 dirty data records.
  ingestion.error-tolerance.max-count: 1000

Konfigurasi ini mentoleransi hingga 1000 catatan data kotor, memungkinkan pekerjaan Anda berjalan normal saat terdapat sedikit data kotor. Saat jumlah catatan data kotor melebihi ambang batas ini, pekerjaan gagal, mendorong Anda untuk memvalidasi data Anda.

Untuk memastikan pekerjaan Anda tidak pernah gagal karena data kotor, gunakan konfigurasi berikut:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Enable dirty data tolerance.
  ingestion.ignore-errors: true
  # Tolerate all dirty data.
  ingestion.error-tolerance.max-count: -1

Kebijakan toleransi data kotor mencegah pekerjaan gagal secara sering karena data abnormal. Anda mungkin juga ingin mempelajari lebih lanjut tentang data kotor untuk menyesuaikan perilaku produsen Kafka. Untuk proses yang dijelaskan dalam Pengumpulan Data Kotor, Anda dapat melihat data kotor pekerjaan Anda di log TaskManager. Kode berikut menunjukkan contoh konfigurasi:

source:
  type: kafka
  name: Kafka Source
  properties.bootstrap.servers: host:9092
  topic: test-topic
  value.format: json
  scan.startup.mode: earliest-offset
  # Enable dirty data tolerance.
  ingestion.ignore-errors: true
  # Tolerate all dirty data.
  ingestion.error-tolerance.max-count: -1

pipeline:
  dirty-data.collector:
    # Write dirty data to TaskManager log files.
    type: logger

Strategi pemetaan nama tabel dan topik

Saat menggunakan Kafka sebagai sink ingesti data, format pesan (debezium-json atau canal-json) sering kali mencakup informasi nama tabel. Konsumen biasanya menggunakan nama tabel ini—bukan nama topik—sebagai nama tabel sebenarnya. Oleh karena itu, konfigurasikan strategi pemetaan nama tabel dan topik dengan hati-hati.

Asumsikan Anda perlu menyinkronkan dua tabel—mydb.mytable1 dan mydb.mytable2—dari MySQL. Strategi pemetaan yang mungkin termasuk yang berikut:

1. Jangan mengonfigurasi strategi pemetaan apa pun

Tanpa strategi pemetaan apa pun, setiap tabel ditulis ke topik yang dinamai berdasarkan database dan tabel (misalnya, mydb.mytable1). Dengan demikian, data mydb.mytable1 masuk ke topik mydb.mytable1, dan data mydb.mytable2 masuk ke topik mydb.mytable2. Konfigurasi contoh:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

2. Konfigurasikan aturan rute untuk pemetaan (tidak disarankan)

Dalam banyak skenario, pengguna tidak ingin topik dinamai berdasarkan database dan tabel. Sebaliknya, mereka mengonfigurasi aturan rute untuk memetakan data ke topik tertentu. Konfigurasi contoh:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  
 route:
  - source-table: mydb.mytable1,mydb.mytable2
    sink-table: mytable1

Dalam kasus ini, semua data dari mydb.mytable1 dan mydb.mytable2 ditulis ke topik mytable1.

Namun, memodifikasi nama topik melalui aturan rute juga memodifikasi nama tabel dalam pesan Kafka (dalam format debezium-json atau canal-json). Akibatnya, semua pesan dalam topik ini memiliki nama tabel mytable1. Sistem lain yang mengonsumsi topik ini mungkin berperilaku tidak terduga.

3. Konfigurasikan parameter sink.tableId-to-topic.mapping untuk pemetaan (disarankan)

Untuk mempertahankan informasi nama tabel asli sambil memetakan ke topik kustom, gunakan parameter sink.tableId-to-topic.mapping. Konfigurasi contoh:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
  sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

Atau:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
  sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

Dalam konfigurasi ini, semua data dari `mydb.mytable1` dan `mydb.mytable2` ditulis ke topik `mytable1`, tetapi nama tabel asli (`mydb.mytable1` atau `mydb.mytable2`) dipertahankan dalam format pesan Kafka (`debezium-json` atau `canal-json`). Hal ini memungkinkan sistem lain yang mengonsumsi pesan dari topik ini untuk mengambil informasi nama tabel sumber dengan benar.

Pertimbangan semantik EXACTLY_ONCE

  • Konfigurasikan tingkat isolasi konsumen

    Semua aplikasi yang mengonsumsi data Kafka harus mengatur isolation.level:

    • read_committed: Hanya membaca data yang telah dikomit.

    • read_uncommitted (default): Memungkinkan Anda membaca data yang belum dikomit.

    EXACTLY_ONCE bergantung pada read_committed. Jika tidak, konsumen mungkin melihat data yang belum dikomit, yang merusak konsistensi.

  • Timeout transaksi dan kehilangan data

    Saat memulihkan dari checkpoint, Flink hanya bergantung pada transaksi yang dikomit sebelum checkpoint dimulai. Jika waktu antara crash pekerjaan dan restart melebihi timeout transaksi Kafka, Kafka secara otomatis membatalkan transaksi, menyebabkan kehilangan data.

    • Default broker Kafka transaction.max.timeout.ms = 15 menit.

    • Sink Kafka Flink mengatur transaction.timeout.ms ke 1 jam secara default.

    • Anda harus meningkatkan transaction.max.timeout.ms di broker agar lebih besar dari atau sama dengan pengaturan Flink.

  • Kolam produsen Kafka dan checkpoint konkuren

    Mode EXACTLY_ONCE menggunakan kolam produsen Kafka berukuran tetap. Setiap checkpoint mengonsumsi satu produsen dari kolam. Jika checkpoint konkuren melebihi ukuran kolam, pekerjaan gagal.

    Sesuaikan ukuran kolam produsen sesuai dengan jumlah maksimum checkpoint konkuren Anda.

  • Scale-in restrictions

    Jika pekerjaan gagal sebelum checkpoint pertama, informasi kolam produsen tidak dipertahankan setelah restart. Oleh karena itu, sebelum checkpoint pertama selesai, jangan menurunkan konkurensi pekerjaan. Jika Anda harus menurunkan skala, konkurensi tidak boleh kurang dari FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR.

  • Pemblokiran pembacaan transaksional

    Dalam mode read_committed, transaksi yang belum selesai (tidak dikomit maupun dibatalkan) memblokir pembacaan dari seluruh topik.

    Misalnya:

    • Transaksi 1 menulis data.

    • Transaksi 2 menulis dan melakukan commit data.

    • Hingga Transaksi 1 selesai, data Transaksi 2 tidak terlihat oleh konsumen.

    Oleh karena itu:

    • Selama operasi normal, visibilitas data tertunda sekitar interval rata-rata checkpoint.

    • Saat pekerjaan gagal, topik yang sedang ditulis memblokir konsumen hingga pekerjaan dimulai ulang atau transaksi timeout. Dalam kasus ekstrem, timeout transaksi bahkan dapat memengaruhi pembacaan.

FAQ