全部产品
Search
文档中心

Realtime Compute for Apache Flink:ApsaraMQ for Kafka

更新时间:Dec 04, 2025

Topik ini menjelaskan cara menggunakan konektor ApsaraMQ for Kafka.

Informasi latar belakang

Apache Kafka adalah sistem antrian pesan terdistribusi open source yang banyak digunakan dalam bidang data besar, seperti pemrosesan data berkinerja tinggi, analitik aliran, dan integrasi data. Konektor Kafka didasarkan pada klien Apache Kafka open source, menyediakan throughput data tinggi, mendukung pembacaan dan penulisan berbagai format data, serta menawarkan semantik tepat-sekali (exactly-once semantics) untuk Realtime Compute for Apache Flink.

Kategori

Detail

Jenis yang didukung

Tabel sumber, tabel sink, dan target integrasi data

Mode runtime

Mode streaming

Format data

Format data yang didukung

  • CSV

  • JSON

  • Apache Avro

  • Confluent Avro

  • Debezium JSON

  • Canal JSON

  • Maxwell JSON

  • Raw

  • Protobuf

Catatan
  • Hanya Ververica Runtime (VVR) 8.0.9 dan versi lebih baru yang mendukung format data Protobuf bawaan.

  • Setiap format data yang didukung memiliki item konfigurasi terkait yang dapat Anda gunakan langsung dalam klausa WITH. Untuk informasi selengkapnya, lihat dokumentasi komunitas Flink.

Metrik pemantauan spesifik

Metrik pemantauan spesifik

  • Tabel sumber

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • Tabel sink

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

Catatan

Untuk informasi selengkapnya tentang metrik tersebut, lihat Deskripsi metrik.

Jenis API

SQL, DataStream, dan YAML integrasi data

Pembaruan atau penghapusan data di tabel sink

Konektor tidak mendukung pembaruan atau penghapusan data di tabel sink. Konektor hanya mendukung penyisipan data.

Catatan

Untuk fitur terkait pembaruan dan penghapusan data, lihat Upsert Kafka.

Prasyarat

Anda dapat menghubungkan ke kluster dengan salah satu cara berikut:

  • Menghubungkan ke kluster Alibaba Cloud ApsaraMQ for Kafka

    • Versi kluster Kafka minimal 0.11.

    • Anda telah membuat kluster ApsaraMQ for Kafka. Untuk informasi selengkapnya, lihat Buat sumber daya.

    • Ruang kerja Flink dan kluster Kafka berada dalam VPC yang sama, serta kluster ApsaraMQ for Kafka telah menambahkan Flink ke daftar putihnya. Untuk informasi selengkapnya, lihat Konfigurasikan daftar putih.

    Penting

    Batasan untuk menulis data ke ApsaraMQ for Kafka:

    • ApsaraMQ for Kafka tidak mendukung penulisan data dalam format kompresi zstd.

    • ApsaraMQ for Kafka tidak mendukung penulisan idempoten atau transaksional. Oleh karena itu, Anda tidak dapat menggunakan fitur semantik tepat-sekali dari tabel sink Kafka. Jika Anda menggunakan Ververica Runtime (VVR) 8.0.0 atau versi lebih baru, Anda harus menambahkan item konfigurasi properties.enable.idempotence=false ke tabel sink untuk menonaktifkan penulisan idempoten. Untuk perbandingan mesin penyimpanan dan batasan fitur ApsaraMQ for Kafka, lihat Perbandingan mesin penyimpanan.

  • Menghubungkan ke kluster Apache Kafka yang dikelola sendiri

    • Versi kluster Apache Kafka yang dikelola sendiri minimal 0.11.

    • Anda telah menetapkan konektivitas jaringan antara Flink dan kluster Apache Kafka yang dikelola sendiri. Untuk informasi tentang cara menghubungkan ke kluster yang dikelola sendiri melalui jaringan publik, lihat Pilih jenis koneksi jaringan.

    • Hanya item konfigurasi klien untuk Apache Kafka 2.8 yang didukung. Untuk informasi selengkapnya, lihat dokumentasi Apache Kafka untuk konfigurasi konsumen dan produsen.

Perhatian

Saat ini, penulisan transaksional tidak disarankan karena bug desain pada Flink dan Kafka. Saat Anda mengatur sink.delivery-guarantee = exactly-once, konektor Kafka mengaktifkan penulisan transaksional, dan terdapat tiga masalah yang diketahui:

  • Setiap checkpoint menghasilkan ID transaksi. Jika interval checkpoint terlalu singkat, terlalu banyak ID transaksi yang dihasilkan. Koordinator kluster Kafka mungkin kehabisan memori, sehingga mengganggu stabilitas kluster Kafka.

  • Setiap transaksi membuat instance produsen. Jika terlalu banyak transaksi dikomit secara bersamaan, Pengelola Tugas (TaskManager) mungkin kehabisan memori, sehingga mengganggu stabilitas pekerjaan Flink.

  • Jika beberapa pekerjaan Flink menggunakan sink.transactional-id-prefix yang sama, ID transaksi yang dihasilkan mungkin bertabrakan. Jika satu pekerjaan gagal menulis, hal ini akan memblokir Log Start Offset (LSO) partisi Kafka agar tidak maju, yang memengaruhi semua konsumen yang membaca data dari partisi tersebut.

Jika Anda memerlukan semantik tepat-sekali, gunakan Upsert Kafka untuk menulis ke tabel kunci primer dan pastikan idempotensi dengan kunci primer tersebut. Jika Anda harus menggunakan penulisan transaksional, lihat Pertimbangan untuk semantik EXACTLY_ONCE.

Pemecahan masalah konektivitas jaringan

Jika pekerjaan Flink melaporkan error Timed out waiting for a node assignment saat startup, penyebabnya biasanya adalah masalah konektivitas jaringan antara Flink dan Kafka.

Klien Kafka menghubungkan ke server sebagai berikut:

  1. Klien menggunakan alamat dalam bootstrap.servers untuk menghubungkan ke Kafka.

  2. Kafka mengembalikan metadata setiap broker dalam kluster, termasuk alamat koneksi mereka.

  3. Klien kemudian menggunakan alamat yang dikembalikan tersebut untuk menghubungkan ke setiap broker guna melakukan operasi baca dan tulis.

Meskipun alamat bootstrap.servers dapat diakses, klien tidak dapat membaca atau menulis data jika Kafka mengembalikan alamat broker yang salah. Masalah ini sering terjadi pada arsitektur jaringan yang menggunakan proxy, penerusan port, atau jalur sewa.

Langkah pemecahan masalah

ApsaraMQ for Kafka

  1. Konfirmasi jenis endpoint

    • Endpoint default (jaringan internal)

    • Endpoint SASL (jaringan internal + autentikasi)

    • Endpoint jaringan publik (memerlukan permintaan terpisah)

    Anda dapat menggunakan Konsol pengembangan Flink untuk melakukan diagnostik jaringan dan mengesampingkan masalah konektivitas dengan alamat bootstrap.servers.

  2. Periksa grup keamanan dan daftar putih

    Instans Kafka harus menambahkan Blok CIDR VPC tempat Flink berada ke daftar putihnya. Untuk informasi selengkapnya, lihat Lihat Blok CIDR VPC dan Konfigurasikan daftar putih.

  3. Periksa konfigurasi SASL (jika diaktifkan)

    Jika Anda menggunakan endpoint SASL_SSL, Anda harus mengonfigurasi mekanisme JAAS, SSL, dan SASL dengan benar dalam pekerjaan Flink Anda. Autentikasi yang tidak lengkap dapat menyebabkan koneksi gagal pada fase handshake, yang juga dapat muncul sebagai timeout. Untuk informasi selengkapnya, lihat Keamanan dan autentikasi.

Kafka yang dikelola sendiri di ECS

  1. Gunakan Konsol pengembangan Flink untuk melakukan diagnostik jaringan.

    Evaluasi masalah konektivitas dengan alamat bootstrap.servers dan konfirmasi kebenaran endpoint jaringan internal dan publik.

  2. Periksa grup keamanan dan daftar putih

    • Grup keamanan ECS harus mengizinkan trafik pada port endpoint Kafka (biasanya 9092 atau 9093).

    • Instans ECS harus menambahkan Blok CIDR VPC tempat Flink berada ke daftar putihnya. Untuk informasi selengkapnya, lihat Lihat Blok CIDR VPC.

  3. Periksa konfigurasi

    1. Masuk ke kluster ZooKeeper yang digunakan oleh Kafka. Anda dapat menggunakan tool zkCli.sh atau zookeeper-shell.sh.

    2. Jalankan perintah untuk mengambil metadata broker. Contohnya: get /brokers/ids/0. Dalam hasil yang dikembalikan, temukan alamat yang diiklankan Kafka kepada klien di bidang endpoints.

      example

    3. Gunakan Konsol pengembangan Flink untuk melakukan diagnostik jaringan untuk menguji apakah alamat tersebut dapat dijangkau.

      Catatan
      • Jika alamat tersebut tidak dapat dijangkau, hubungi insinyur O&M Kafka untuk memeriksa dan memperbaiki konfigurasi listeners dan advertised.listeners. Pastikan alamat yang dikembalikan dapat diakses oleh Flink.

      • Untuk informasi selengkapnya tentang cara klien Kafka menghubungkan ke server, lihat Pemecahan Masalah Konektivitas.

  4. Periksa konfigurasi SASL (jika diaktifkan)

    Jika Anda menggunakan endpoint SASL_SSL, Anda harus mengonfigurasi mekanisme JAAS, SSL, dan SASL dengan benar dalam pekerjaan Flink Anda. Autentikasi yang tidak lengkap dapat menyebabkan koneksi gagal pada fase handshake, yang juga dapat muncul sebagai timeout. Untuk informasi selengkapnya, lihat Keamanan dan autentikasi.

SQL

Konektor Kafka dapat digunakan dalam pekerjaan SQL sebagai tabel sumber atau tabel sink.

Sintaks

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)

Kolom metadata

Anda dapat mendefinisikan kolom metadata di tabel sumber dan sink untuk mengakses atau menulis metadata pesan Kafka. Misalnya, jika Anda mendefinisikan beberapa topik dalam parameter WITH dan mendefinisikan kolom metadata di tabel sumber Kafka, data yang dibaca Flink ditandai dengan topik asalnya. Contoh berikut menunjukkan cara menggunakan kolom metadata.

CREATE TABLE kafka_source (
  -- Baca topik pesan sebagai bidang `record_topic`.
  `record_topic` STRING NOT NULL METADATA FROM 'topic' VIRTUAL,
  -- Baca timestamp dari ConsumerRecord sebagai bidang `ts`.
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  -- Baca offset pesan sebagai bidang `record_offset`.
  `record_offset` BIGINT NOT NULL METADATA FROM 'offset' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

CREATE TABLE kafka_sink (
  -- Tulis timestamp dari bidang `ts` sebagai timestamp ProducerRecord ke Kafka.
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
  ...
) WITH (
  'connector' = 'kafka',
  ...
);

Tabel berikut mencantumkan kolom metadata yang didukung untuk tabel sumber dan sink Kafka.

Kunci

Tipe data

Deskripsi

Tabel sumber atau sink

topic

STRING NOT NULL METADATA VIRTUAL

Nama topik yang berisi pesan Kafka.

Tabel sumber

partition

INT NOT NULL METADATA VIRTUAL

ID partisi yang berisi pesan Kafka.

Tabel sumber

headers

MAP<STRING, BYTES> NOT NULL METADATA VIRTUAL

Header pesan Kafka.

Tabel sumber dan sink

leader-epoch

INT NOT NULL METADATA VIRTUAL

Leader epoch pesan Kafka.

Tabel sumber

offset

BIGINT NOT NULL METADATA VIRTUAL

Offset pesan Kafka.

Tabel sumber

timestamp

TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL METADATA VIRTUAL

Timestamp pesan Kafka.

Tabel sumber dan sink

timestamp-type

STRING NOT NULL METADATA VIRTUAL

Jenis timestamp pesan Kafka:

  • NoTimestampType: Tidak ada timestamp yang didefinisikan dalam pesan.

  • CreateTime: Waktu saat pesan dibuat.

  • LogAppendTime: Waktu saat pesan ditambahkan ke broker Kafka.

Tabel sumber

__raw_key__

STRING NOT NULL METADATA VIRTUAL

Bidang kunci pesan Kafka mentah.

Tabel sumber dan sink

__raw_value__

STRING NOT NULL METADATA VIRTUAL

Bidang nilai pesan Kafka mentah.

Tabel sumber dan sink

Parameter WITH

  • Umum

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Keterangan

    connector

    Jenis tabel.

    String

    Ya

    Tidak ada

    Nilainya tetap Kafka.

    properties.bootstrap.servers

    Alamat broker Kafka.

    String

    Ya

    Tidak ada

    Formatnya host:port,host:port,host:port. Pisahkan alamat dengan koma (,).

    properties.*

    Konfigurasi langsung untuk klien Kafka.

    String

    Tidak

    Tidak ada

    Akhiran harus merupakan konfigurasi yang didefinisikan dalam dokumentasi resmi Kafka untuk produsen dan konsumen.

    Flink menghapus awalan properties. dan meneruskan konfigurasi yang tersisa ke klien Kafka. Misalnya, Anda dapat menggunakan 'properties.allow.auto.create.topics'='false' untuk menonaktifkan pembuatan topik otomatis.

    Jangan gunakan metode ini untuk memodifikasi konfigurasi berikut, karena akan ditimpa oleh konektor Kafka:

    • key.deserializer

    • value.deserializer

    format

    Format yang digunakan untuk membaca atau menulis bagian nilai pesan Kafka.

    String

    Tidak

    Tidak ada

    Format yang didukung:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    Catatan

    Untuk informasi selengkapnya tentang pengaturan parameter format, lihat Parameter Format.

    key.format

    Format yang digunakan untuk membaca atau menulis bagian kunci pesan Kafka.

    String

    Tidak

    Tidak ada

    Format yang didukung:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    Catatan

    Jika Anda menggunakan konfigurasi ini, konfigurasi key.options wajib ditentukan.

    key.fields

    Bidang di tabel sumber atau sink yang sesuai dengan bagian kunci pesan Kafka.

    String

    Tidak

    Tidak ada

    Pisahkan beberapa nama bidang dengan titik koma (;). Contohnya: field1;field2

    key.fields-prefix

    Menentukan awalan khusus untuk semua bidang kunci pesan Kafka guna menghindari konflik nama dengan bidang di bagian nilai pesan.

    String

    Tidak

    Tidak ada

    Item konfigurasi ini hanya digunakan untuk membedakan nama kolom di tabel sumber dan sink. Awalan dihapus saat mengurai dan menghasilkan bagian kunci pesan Kafka.

    Catatan

    Jika Anda menggunakan konfigurasi ini, Anda harus mengatur value.fields-include ke EXCEPT_KEY.

    value.format

    Format yang digunakan untuk membaca atau menulis bagian nilai pesan Kafka.

    String

    Tidak

    Tidak ada

    Konfigurasi ini setara dengan format. Anda hanya dapat mengatur salah satu dari format atau value.format. Jika keduanya dikonfigurasi, value.format akan menimpa format.

    value.fields-include

    Menentukan apakah akan menyertakan bidang yang sesuai dengan bagian kunci pesan saat mengurai atau menghasilkan bagian nilai pesan Kafka.

    String

    Tidak

    ALL

    Nilai yang valid:

    • ALL (default): Semua kolom diproses sebagai bagian nilai pesan Kafka.

    • EXCEPT_KEY: Bidang yang tersisa, tidak termasuk yang didefinisikan di key.fields, diproses sebagai bagian nilai pesan Kafka.

  • Tabel sumber

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Keterangan

    topic

    Nama topik untuk dibaca.

    String

    Tidak

    Tidak ada

    Pisahkan beberapa nama topik dengan titik koma (;), misalnya topic-1;topic-2.

    Catatan

    Anda hanya dapat menentukan salah satu opsi topic atau topic-pattern.

    topic-pattern

    Ekspresi reguler yang cocok dengan nama topik untuk dibaca. Semua topik yang cocok dengan ekspresi reguler ini akan dibaca saat pekerjaan berjalan.

    String

    Tidak

    Tidak ada

    Catatan

    Anda hanya dapat menentukan salah satu opsi topic atau topic-pattern.

    properties.group.id

    ID kelompok konsumen.

    String

    Tidak

    KafkaSource-{source_table_name}

    Jika ID grup yang ditentukan digunakan untuk pertama kalinya, Anda harus mengatur properties.auto.offset.reset ke earliest atau latest untuk menentukan offset awal.

    scan.startup.mode

    Offset awal untuk membaca data dari Kafka.

    String

    Tidak

    group-offsets

    Nilai yang valid:

    • earliest-offset: Mulai membaca dari partisi paling awal di Kafka.

    • latest-offset: Mulai membaca dari offset terbaru di Kafka.

    • group-offsets (default): Mulai membaca dari offset yang dikomit oleh properties.group.id yang ditentukan.

    • timestamp: Mulai membaca dari timestamp yang ditentukan oleh scan.startup.timestamp-millis.

    • specific-offsets: Mulai membaca dari offset yang ditentukan oleh scan.startup.specific-offsets.

    Catatan

    Parameter ini berlaku saat pekerjaan dimulai tanpa state. Saat pekerjaan dimulai ulang dari checkpoint atau pulih dari state, pekerjaan akan menggunakan progres yang disimpan dalam state untuk melanjutkan pembacaan.

    scan.startup.specific-offsets

    Dalam mode startup specific-offsets, menentukan offset awal untuk setiap partisi.

    String

    Tidak

    Tidak ada

    Contoh: partition:0,offset:42;partition:1,offset:300

    scan.startup.timestamp-millis

    Dalam mode startup timestamp, menentukan timestamp offset awal.

    Long

    Tidak

    Tidak ada

    Unitnya milidetik.

    scan.topic-partition-discovery.interval

    Interval untuk menemukan topik dan partisi Kafka secara dinamis.

    Duration

    Tidak

    5 menit

    Interval pemeriksaan partisi default adalah 5 menit. Untuk menonaktifkan fitur ini, Anda harus secara eksplisit mengatur interval pemeriksaan partisi ke nilai non-positif. Saat penemuan partisi dinamis diaktifkan, sumber Kafka dapat secara otomatis menemukan partisi baru dan membaca data darinya. Dalam mode topic-pattern, sumber Kafka tidak hanya membaca data dari partisi baru topik yang ada, tetapi juga membaca data dari semua partisi topik baru yang cocok dengan ekspresi reguler.

    Catatan

    Dalam Ververica Runtime (VVR) 6.0.x, penemuan partisi dinamis dinonaktifkan secara default. Mulai dari VVR 8.0, fitur ini diaktifkan secara default dengan interval penemuan 5 menit.

    scan.header-filter

    Memfilter data berdasarkan apakah data Kafka berisi header tertentu.

    String

    Tidak

    Tidak ada

    Pisahkan kunci header dan nilai dengan titik dua (:). Hubungkan beberapa kondisi header dengan operator logika (&, |). Operator logika NOT (!) juga didukung. Misalnya, depart:toy|depart:book&!env:test menyimpan data Kafka yang headernya berisi depart=toy atau depart=book, dan tidak berisi env=test.

    Catatan
    • Parameter ini hanya didukung di Ververica Runtime (VVR) 8.0.6 dan versi lebih baru.

    • Tanda kurung tidak didukung dalam operasi logika.

    • Operasi logika dilakukan dari kiri ke kanan.

    • Nilai header dikonversi ke string dalam format UTF-8 untuk perbandingan dengan nilai header yang ditentukan.

    scan.check.duplicated.group.id

    Menentukan apakah akan memeriksa duplikasi kelompok konsumen yang ditentukan oleh properties.group.id.

    Boolean

    Tidak

    false

    Nilai yang valid:

    • true: Sebelum memulai pekerjaan, sistem memeriksa duplikasi kelompok konsumen. Jika ditemukan duplikasi, pekerjaan melaporkan error dan berhenti, mencegah konflik dengan kelompok konsumen yang ada.

    • false: Pekerjaan langsung dimulai tanpa memeriksa konflik kelompok konsumen.

    Catatan

    Parameter ini hanya didukung di VVR 6.0.4 dan versi lebih baru.

  • Tabel sink

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Nilai default

    Keterangan

    topic

    Nama topik untuk ditulis.

    String

    Ya

    Tidak ada

    Tidak ada

    sink.partitioner

    Mode pemetaan dari konkurensi Flink ke partisi Kafka.

    String

    Tidak

    default

    Nilai yang valid:

    • default: Menggunakan partitioner Kafka default.

    • fixed: Setiap konkurensi Flink sesuai dengan partisi Kafka tetap.

    • round-robin: Data dari konkurensi Flink dialokasikan ke partisi Kafka secara round-robin.

    • Partitioner kustom: Jika fixed dan round-robin tidak memenuhi kebutuhan Anda, Anda dapat membuat kelas turunan dari FlinkKafkaPartitioner untuk mendefinisikan partitioner kustom. Contohnya: org.mycompany.MyPartitioner

    sink.delivery-guarantee

    Semantik pengiriman untuk tabel sink Kafka.

    String

    Tidak

    at-least-once

    Nilai yang valid:

    • none: Tanpa jaminan. Data mungkin hilang atau diduplikasi.

    • at-least-once (default): Menjamin tidak ada data yang hilang, tetapi data mungkin diduplikasi.

    • exactly-once: Menggunakan transaksi Kafka untuk menjamin data tidak hilang atau diduplikasi.

    Catatan

    Saat menggunakan semantik exactly-once, parameter sink.transactional-id-prefix wajib ditentukan.

    sink.transactional-id-prefix

    Awalan untuk ID transaksi Kafka yang digunakan dalam semantik exactly-once.

    String

    Tidak

    Tidak ada

    Konfigurasi ini hanya berlaku saat sink.delivery-guarantee diatur ke exactly-once.

    sink.parallelism

    Tingkat paralelisme untuk operator tabel sink Kafka.

    Integer

    Tidak

    Tidak ada

    Konkurensi operator hulu ditentukan oleh framework.

Keamanan dan autentikasi

Jika kluster Kafka memerlukan koneksi aman atau autentikasi, Anda dapat menambahkan konfigurasi keamanan dan autentikasi terkait ke parameter WITH dengan awalan properties.. Contoh berikut menunjukkan cara mengonfigurasi tabel Kafka untuk menggunakan PLAIN sebagai mekanisme SASL dan menyediakan konfigurasi JAAS.

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";'
)

Contoh berikut menunjukkan cara menggunakan SASL_SSL sebagai protokol keamanan dan SCRAM-SHA-256 sebagai mekanisme SASL.

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka',
  ...
  'properties.security.protocol' = 'SASL_SSL',
  /*Konfigurasi SSL*/
  /*Konfigurasikan path ke truststore (sertifikat CA) yang disediakan oleh server.*/
  /*File yang diunggah melalui Manajemen File disimpan di path /flink/usrlib/.*/
  'properties.ssl.truststore.location' = '/flink/usrlib/kafka.client.truststore.jks',
  'properties.ssl.truststore.password' = 'test1234',
  /*Jika autentikasi klien diperlukan, konfigurasikan path ke keystore (kunci privat).*/
  'properties.ssl.keystore.location' = '/flink/usrlib/kafka.client.keystore.jks',
  'properties.ssl.keystore.password' = 'test1234',
  /*Algoritma bagi klien untuk memverifikasi alamat server. Nilai kosong menonaktifkan verifikasi alamat server.*/
  'properties.ssl.endpoint.identification.algorithm' = '',
  /*Konfigurasi SASL*/
  /*Atur mekanisme SASL ke SCRAM-SHA-256.*/
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  /*Konfigurasikan JAAS*/
  'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password";'
)

Anda dapat mengunggah sertifikat CA dan kunci privat yang disebutkan dalam contoh ke platform menggunakan fitur Manajemen File di Konsol Realtime Compute. Setelah diunggah, file disimpan di direktori /flink/usrlib. Jika file sertifikat CA yang akan digunakan bernama my-truststore.jks, tentukan 'properties.ssl.truststore.location' = '/flink/usrlib/my-truststore.jks' dalam parameter WITH untuk menggunakan sertifikat ini.

Catatan
  • Contoh di atas berlaku untuk sebagian besar skenario konfigurasi. Sebelum mengonfigurasi konektor Kafka, hubungi insinyur O&M server Kafka untuk mendapatkan konfigurasi keamanan dan autentikasi yang benar.

  • Berbeda dengan Flink open source, editor SQL Realtime Compute for Apache Flink secara otomatis meng-escape tanda kutip ganda ("). Oleh karena itu, Anda tidak perlu menambahkan karakter escape (\) tambahan untuk tanda kutip ganda dalam username dan password saat mengonfigurasi properties.sasl.jaas.config.

Offset awal tabel sumber

Mode startup

Anda dapat menentukan offset baca awal untuk tabel sumber Kafka dengan mengonfigurasi scan.startup.mode:

  • earliest-offset: Mulai membaca dari offset paling awal partisi saat ini.

  • latest-offset: Mulai membaca dari offset terbaru partisi saat ini.

  • group-offsets: Mulai membaca dari offset yang dikomit oleh ID grup yang ditentukan. ID grup ditentukan oleh properties.group.id.

  • timestamp: Mulai membaca dari pesan pertama yang timestamp-nya lebih besar dari atau sama dengan waktu yang ditentukan. Timestamp ditentukan oleh scan.startup.timestamp-millis.

  • specific-offsets: Mulai mengonsumsi dari offset partisi yang ditentukan. Offset ditentukan oleh scan.startup.specific-offsets.

Catatan
  • Jika Anda tidak menentukan offset awal, konsumsi dimulai dari offset yang dikomit (group-offsets) secara default.

  • Parameter scan.startup.mode hanya berlaku untuk pekerjaan yang dimulai tanpa state. Saat pekerjaan stateful dimulai, pekerjaan mulai mengonsumsi dari offset yang disimpan dalam statenya.

Kode berikut memberikan contoh:

CREATE TEMPORARY TABLE kafka_source (
  ...
) WITH (
  'connector' = 'kafka',
  ...
  -- Mulai mengonsumsi dari offset paling awal.
  'scan.startup.mode' = 'earliest-offset',
  -- Mulai mengonsumsi dari offset terbaru.
  'scan.startup.mode' = 'latest-offset',
  -- Mulai mengonsumsi dari offset yang dikomit oleh kelompok konsumen "my-group".
  'properties.group.id' = 'my-group',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest', -- Jika "my-group" digunakan untuk pertama kalinya, mulai mengonsumsi dari offset paling awal.
  'properties.auto.offset.reset' = 'latest', -- Jika "my-group" digunakan untuk pertama kalinya, mulai mengonsumsi dari offset terbaru.
  -- Mulai mengonsumsi dari timestamp milidetik yang ditentukan 1655395200000.
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1655395200000',
  -- Mulai mengonsumsi dari offset yang ditentukan.
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
);

Prioritas offset awal

Prioritas offset awal tabel sumber adalah sebagai berikut:

Prioritas dari tinggi ke rendah

Offset yang disimpan dalam checkpoint atau titik simpan

Waktu mulai yang dipilih di Konsol Realtime Compute saat memulai pekerjaan

Offset awal yang ditentukan oleh scan.startup.mode dalam parameter WITH

Jika scan.startup.mode tidak ditentukan, group-offsets digunakan, dan offset kelompok konsumen yang sesuai digunakan

Jika offset menjadi tidak valid pada langkah apa pun (misalnya, karena kedaluwarsa atau masalah kluster Kafka), sistem menggunakan kebijakan yang ditetapkan dalam properties.auto.offset.reset untuk mengatur ulang offset. Jika parameter ini tidak dikonfigurasi, terjadi exception dan memerlukan intervensi manual.

Skenario umum adalah memulai konsumsi dengan ID grup baru. Pertama, tabel sumber menanyakan kluster Kafka untuk offset yang dikomit oleh grup tersebut. Karena ini pertama kalinya ID grup digunakan, tidak ditemukan offset yang valid. Oleh karena itu, offset diatur ulang sesuai dengan kebijakan yang dikonfigurasi dalam parameter properties.auto.offset.reset. Saat mengonsumsi dengan ID grup baru, Anda harus mengonfigurasi properties.auto.offset.reset untuk menentukan kebijakan pengaturan ulang offset.

Komit offset tabel sumber

Tabel sumber Kafka hanya mengkomit offset konsumen saat ini ke kluster Kafka setelah checkpoint berhasil. Jika interval checkpoint panjang, offset konsumen yang diamati di kluster Kafka akan tertinggal. Selama checkpoint, tabel sumber Kafka menyimpan progres baca saat ini dalam statenya dan tidak bergantung pada offset yang dikomit ke kluster untuk pemulihan kesalahan. Pengkomitan offset hanya untuk memantau progres baca di sisi Kafka. Kegagalan komit offset tidak memengaruhi kebenaran data.

Partitioner kustom untuk tabel sink

Jika partitioner produsen Kafka bawaan tidak memenuhi kebutuhan Anda, Anda dapat mengimplementasikan partitioner kustom untuk menulis data ke partisi yang sesuai. Partitioner kustom harus mewarisi dari FlinkKafkaPartitioner. Setelah pengembangan, kompilasi paket JAR dan unggah ke Konsol Realtime Compute menggunakan fitur Manajemen File. Setelah diunggah dan direferensikan, atur parameter sink.partitioner dalam klausa WITH ke path kelas lengkap partitioner tersebut, seperti org.mycompany.MyPartitioner.

Memilih antara Kafka, Upsert Kafka, dan katalog Kafka JSON

Kafka adalah sistem antrian pesan append-only yang tidak mendukung pembaruan atau penghapusan data. Oleh karena itu, Kafka tidak dapat menangani data Change Data Capture (CDC) hulu atau logika penarikan dari operator seperti agregasi dan join dalam SQL streaming. Untuk menulis data dengan perubahan atau penarikan ke Kafka, gunakan tabel sink Upsert Kafka, yang dirancang khusus untuk menangani data perubahan.

Untuk menyinkronkan data perubahan dari satu atau beberapa tabel di database hulu ke Kafka secara batch dengan mudah, Anda dapat menggunakan katalog Kafka JSON. Jika data yang disimpan di Kafka dalam format JSON, penggunaan katalog Kafka JSON menghilangkan kebutuhan untuk mendefinisikan skema dan parameter WITH. Untuk informasi selengkapnya, lihat Kelola katalog Kafka JSON.

Contoh

Contoh 1: Baca data dari Kafka dan tulis ke Kafka

Baca data Kafka dari topik bernama `source` dan tulis ke topik bernama `sink`. Datanya dalam format CSV.

CREATE TEMPORARY TABLE kafka_source (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'source',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

CREATE TEMPORARY TABLE kafka_sink (
  id INT,
  name STRING,
  age INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'properties.group.id' = '<yourKafkaConsumerGroupId>',
  'format' = 'csv'
);

INSERT INTO kafka_sink SELECT id, name, age FROM kafka_source;

Contoh 2: Sinkronisasi skema dan data tabel

Sinkronisasi pesan dari topik Kafka ke Hologres secara real time. Dalam kasus ini, Anda dapat menggunakan offset dan ID partisi pesan Kafka sebagai kunci primer untuk memastikan tidak ada pesan duplikat di Hologres selama failover.

CREATE TEMPORARY TABLE kafkaTable (
  `offset` INT NOT NULL METADATA,
  `part` BIGINT NOT NULL METADATA FROM 'partition',
  PRIMARY KEY (`part`, `offset`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.infer-schema.flatten-nested-columns.enable' = 'true'
    -- Opsional. Meratakan semua kolom bersarang.
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;

Contoh 3: Sinkronisasi skema tabel dan data kunci serta nilai pesan Kafka

Jika bagian kunci pesan Kafka sudah menyimpan informasi terkait, Anda dapat menyinkronkan kunci dan nilai dari Kafka.

CREATE TEMPORARY TABLE kafkaTable (
  `key_id` INT NOT NULL,
  `val_name` VARCHAR(200)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_',
  'value.fields-prefix' = 'val_',
  'value.fields-include' = 'EXCEPT_KEY'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka`(
WITH (
  'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable;
Catatan

Bagian kunci pesan Kafka tidak mendukung perubahan skema tabel atau penguraian tipe. Anda harus mendeklarasikannya secara manual.

Contoh 4: Sinkronisasi skema dan data tabel serta melakukan komputasi

Saat menyinkronkan data Kafka ke Hologres, Anda sering perlu melakukan komputasi ringan.

CREATE TEMPORARY TABLE kafkaTable (
  `distinct_id` INT NOT NULL,
  `properties` STRING,
  `timestamp` TIMESTAMP_LTZ METADATA,
  `date` AS CAST(`timestamp` AS DATE)
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '<yourKafkaBrokers>',
  'topic' = 'kafka_evolution_demo',
  'scan.startup.mode' = 'earliest-offset',
  'key.format' = 'json',
  'value.format' = 'json',
  'key.fields' = 'key_id',
  'key.fields-prefix' = 'key_'
);

CREATE TABLE IF NOT EXISTS hologres.kafka.`sync_kafka` WITH (
   'connector' = 'hologres'
) AS TABLE vvp.`default`.kafkaTable
ADD COLUMN
  `order_id` AS COALESCE(JSON_VALUE(`properties`, '$.order_id'), 'default');
-- Gunakan COALESCE untuk menangani nilai null.

Contoh 5: Uraikan JSON bersarang

Contoh pesan JSON

{
  "id": 101,
  "name": "VVP",
  "properties": {
    "owner": "Alibaba Cloud",
    "engine": "Flink"
  }
}

Untuk menghindari penggunaan fungsi seperti JSON_VALUE(payload, '$.properties.owner') untuk mengurai bidang nanti, Anda dapat mendefinisikan struktur langsung dalam DDL Sumber:

CREATE TEMPORARY TABLE kafka_source (
  id          VARCHAR,
  `name`      VARCHAR,
  properties  ROW<`owner` STRING, engine STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'xxx',
  'properties.bootstrap.servers' = 'xxx',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

Dengan cara ini, Flink mengurai JSON menjadi bidang terstruktur pada tahap baca. Kueri SQL selanjutnya dapat langsung menggunakan properties.owner tanpa panggilan fungsi tambahan, menghasilkan kinerja keseluruhan yang lebih baik.

API Datastream

Penting

Untuk membaca dan menulis data menggunakan API DataStream, Anda harus menggunakan konektor DataStream yang sesuai untuk menghubungkan ke Realtime Compute for Apache Flink. Untuk informasi tentang cara menyiapkan konektor DataStream, lihat Gunakan konektor DataStream.

  • Buat sumber Kafka

    Sumber Kafka menyediakan kelas builder untuk membuat instance KafkaSource. Contoh kode berikut menunjukkan cara membuat sumber Kafka untuk mengonsumsi data dari offset paling awal `input-topic`. Kelompok konsumen bernama `my-group`, dan isi pesan Kafka dideserialisasi menjadi string.

    Java

    KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
    
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

    Saat membuat KafkaSource, Anda harus menentukan parameter berikut.

    Parameter

    Deskripsi

    BootstrapServers

    Alamat broker Kafka. Konfigurasikan ini menggunakan metode setBootstrapServers(String).

    GroupId

    ID kelompok konsumen. Konfigurasikan ini menggunakan metode setGroupId(String).

    Topics or Partition

    Nama topik atau partisi yang berlangganan. Sumber Kafka menyediakan tiga cara berikut untuk berlangganan topik atau partisi:

    • Daftar topik: Berlangganan semua partisi dalam daftar topik.

      KafkaSource.builder().setTopics("topic-a","topic-b")
    • Pencocokan ekspresi reguler: Berlangganan semua partisi topik yang cocok dengan ekspresi reguler.

      KafkaSource.builder().setTopicPattern("topic.*")
    • Daftar partisi: Berlangganan partisi yang ditentukan.

      final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
              new TopicPartition("topic-a", 0),    // Partisi 0 dari topik "topic-a"
              new TopicPartition("topic-b", 5)));  // Partisi 5 dari topik "topic-b"
      KafkaSource.builder().setPartitions(partitionSet)

    Deserializer

    Deserializer untuk mengurai pesan Kafka.

    Tentukan deserializer menggunakan setDeserializer(KafkaRecordDeserializationSchema), di mana KafkaRecordDeserializationSchema mendefinisikan cara mengurai ConsumerRecord Kafka. Jika Anda hanya perlu mengurai data dalam isi pesan (nilai) pesan Kafka, Anda dapat melakukannya dengan salah satu cara berikut:

    • Gunakan metode setValueOnlyDeserializer(DeserializationSchema) dalam kelas builder KafkaSource yang disediakan oleh Flink. DeserializationSchema mendefinisikan cara mengurai data biner dalam isi pesan Kafka.

    • Gunakan parser yang disediakan oleh Kafka, yang mencakup beberapa kelas implementasi. Misalnya, Anda dapat menggunakan StringDeserializer untuk mengurai isi pesan Kafka menjadi string.

      import org.apache.kafka.common.serialization.StringDeserializer;
      
      KafkaSource.<String>builder()
              .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
    Catatan

    Untuk mengurai ConsumerRecord sepenuhnya, Anda harus mengimplementasikan antarmuka KafkaRecordDeserializationSchema sendiri.

    XML

    Konektor DataStream Kafka tersedia di Repositori Maven Central.

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>ververica-connector-kafka</artifactId>
        <version>${vvr-version}</version>
    </dependency>

    Saat menggunakan konektor DataStream Kafka, Anda perlu memahami properti Kafka berikut:

    • Offset awal konsumen

      Sumber Kafka dapat menentukan offset awal untuk konsumsi melalui inisialisator offset (OffsetsInitializer). Inisialisator offset bawaan meliputi yang berikut.

      Inisialisator offset

      Pengaturan kode

      Mulai mengonsumsi dari offset paling awal.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.earliest())

      Mulai mengonsumsi dari offset terbaru.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.latest())

      Mulai mengonsumsi dari data dengan timestamp lebih besar dari atau sama dengan waktu yang ditentukan, dalam milidetik.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.timestamp(1592323200000L))

      Mulai mengonsumsi dari offset yang dikomit oleh kelompok konsumen. Jika offset yang dikomit tidak ada, gunakan offset paling awal.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))

      Mulai mengonsumsi dari offset yang dikomit oleh kelompok konsumen, tanpa menentukan strategi pengaturan ulang offset.

      KafkaSource.builder().setStartingOffsets(OffsetsInitializer.committedOffsets())

      Catatan
      • Jika inisialisator bawaan tidak memenuhi kebutuhan Anda, Anda dapat mengimplementasikan inisialisator offset kustom.

      • Jika tidak ada inisialisator offset yang ditentukan, OffsetsInitializer.earliest() (offset paling awal) digunakan secara default.

    • Mode streaming dan batch

      Sumber Kafka mendukung mode runtime streaming dan batch. Secara default, sumber Kafka diatur untuk berjalan dalam mode streaming, sehingga pekerjaan tidak pernah berhenti sampai pekerjaan Flink gagal atau dibatalkan. Untuk mengonfigurasi sumber Kafka agar berjalan dalam mode batch, Anda dapat menggunakan setBounded(OffsetsInitializer) untuk menentukan offset berhenti. Saat semua partisi mencapai offset berhentinya, sumber Kafka keluar.

      Catatan

      Umumnya, tidak ada offset berhenti dalam mode streaming. Untuk memudahkan debugging kode, Anda dapat menggunakan setUnbounded(OffsetsInitializer) untuk menentukan offset berhenti dalam mode streaming. Perhatikan bahwa nama metode untuk menentukan offset berhenti dalam mode streaming dan batch (setUnbounded dan setBounded) berbeda.

    • Penemuan partisi dinamis

      Untuk menangani skenario seperti penskalaan topik atau pembuatan topik baru tanpa memulai ulang pekerjaan Flink, Anda dapat mengaktifkan fitur penemuan partisi dinamis dalam mode langganan topik atau partisi yang disediakan.

      Catatan

      Fitur penemuan partisi dinamis diaktifkan secara default, dan interval pemeriksaan partisi adalah 5 menit. Untuk menonaktifkan fitur ini, Anda harus secara eksplisit mengatur interval pemeriksaan partisi ke nilai non-positif. Kode berikut memberikan contoh.

      KafkaSource.builder()
          .setProperty("partition.discovery.interval.ms", "10000") // Periksa partisi baru setiap 10 detik.
      Penting

      Fitur penemuan partisi dinamis bergantung pada mekanisme pembaruan metadata kluster Kafka. Jika kluster Kafka tidak memperbarui informasi partisi secara tepat waktu, partisi baru mungkin tidak ditemukan. Pastikan konfigurasi partition.discovery.interval.ms kluster Kafka sesuai dengan situasi aktual.

    • Waktu event dan watermark

      Secara default, sumber Kafka menggunakan timestamp dalam pesan Kafka sebagai waktu event. Anda dapat mendefinisikan strategi watermark kustom untuk mengekstrak waktu event dari pesan dan mengirim watermark ke downstream.

      env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy")

      Untuk informasi selengkapnya tentang strategi watermark kustom, lihat Generating Watermarks.

      Catatan

      Jika beberapa tugas sumber paralel menganggur untuk waktu yang lama (misalnya, jika partisi Kafka tidak memiliki input data untuk waktu yang lama, atau jika konkurensi sumber melebihi jumlah partisi Kafka), mekanisme pembuatan watermark mungkin gagal. Dalam kasus ini, sistem tidak dapat memicu perhitungan window secara normal, yang menyebabkan aliran pemrosesan data macet.

      Untuk mengatasi masalah ini, Anda dapat melakukan penyesuaian berikut:

      • Konfigurasikan mekanisme timeout watermark: Aktifkan parameter table.exec.source.idle-timeout untuk memaksa sistem menghasilkan watermark setelah periode timeout tertentu. Ini memastikan siklus perhitungan window berjalan.

      • Optimalkan sumber data: Kami merekomendasikan mempertahankan rasio partisi Kafka terhadap konkurensi sumber yang wajar (disarankan: jumlah partisi ≥ tingkat paralelisme sumber).

    • Consumer Offset Commit

      Sumber Kafka mengkomit offset konsumen saat ini ketika checkpoint selesai. Ini memastikan bahwa state checkpoint Flink konsisten dengan offset yang dikomit di broker Kafka. Jika checkpointing tidak diaktifkan, sumber Kafka bergantung pada logika komit offset otomatis internal konsumen Kafka. Fitur komit otomatis dikonfigurasi oleh item konfigurasi konsumen Kafka enable.auto.commit dan auto.commit.interval.ms.

      Catatan

      Sumber Kafka tidak bergantung pada offset yang dikomit di broker untuk memulihkan pekerjaan yang gagal. Pengkomitan offset hanya untuk melaporkan progres konsumsi konsumen Kafka dan kelompok konsumen untuk pemantauan di sisi broker.

    • Properti lainnya

      Selain properti yang disebutkan di atas, Anda dapat menggunakan setProperties(Properties) dan setProperty(String, String) untuk mengatur properti apa pun untuk sumber Kafka dan konsumen Kafka. KafkaSource biasanya memiliki item konfigurasi berikut.

      Item konfigurasi

      Deskripsi

      client.id.prefix

      Menentukan awalan ID klien untuk konsumen Kafka.

      partition.discovery.interval.ms

      Menentukan interval di mana sumber Kafka memeriksa partisi baru.

      Catatan

      Nilai partition.discovery.interval.ms ditimpa menjadi -1 dalam mode Batch.

      register.consumer.metrics

      Menentukan apakah akan mendaftarkan metrik konsumen Kafka di Flink.

      Konfigurasi konsumen Kafka lainnya

      Untuk informasi selengkapnya tentang konfigurasi konsumen Kafka, lihat Apache Kafka.

      Penting

      Konektor Kafka secara paksa menimpa beberapa parameter yang dikonfigurasi secara manual sebagai berikut:

      • key.deserializer selalu ditimpa ke ByteArrayDeserializer.

      • value.deserializer selalu ditimpa ke ByteArrayDeserializer.

      • auto.offset.reset.strategy ditimpa ke OffsetsInitializer#getAutoOffsetResetStrategy().

      Contoh berikut menunjukkan cara mengonfigurasi konsumen Kafka untuk menggunakan PLAIN sebagai mekanisme SASL dan menyediakan konfigurasi JAAS.

      KafkaSource.builder()
          .setProperty("sasl.mechanism", "PLAIN")
          .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")
    • Pemantauan

      Sumber Kafka mendaftarkan metrik di Flink untuk pemantauan dan diagnostik.

      • Cakupan metrik

        Semua metrik pembaca sumber Kafka didaftarkan di bawah grup metrik KafkaSourceReader, yang merupakan subgrup dari grup metrik operator. Metrik yang terkait dengan partisi topik tertentu didaftarkan di grup metrik KafkaSourceReader.topic.<topic_name>.partition.<partition_id>.

        Misalnya, offset konsumen saat ini (currentOffset) untuk partisi 1 topik "my-topic" didaftarkan di <some_parent_groups>.operator.KafkaSourceReader.topic.my-topic.partition.1.currentOffset. Jumlah komit offset yang berhasil (commitsSucceeded) didaftarkan di <some_parent_groups>.operator.KafkaSourceReader.commitsSucceeded.

      • Daftar metrik

        Nama metrik

        Deskripsi

        Cakupan

        currentOffset

        Offset konsumen saat ini.

        TopicPartition

        committedOffset

        Offset yang dikomit saat ini.

        TopicPartition

        commitsSucceeded

        Jumlah komit yang berhasil.

        KafkaSourceReader

        commitsFailed

        Jumlah komit yang gagal.

        KafkaSourceReader

      • Metrik konsumen Kafka

        Metrik konsumen Kafka didaftarkan di grup metrik KafkaSourceReader.KafkaConsumer. Misalnya, metrik konsumen Kafka records-consumed-total didaftarkan di <some_parent_groups>.operator.KafkaSourceReader.KafkaConsumer.records-consumed-total.

        Anda dapat menggunakan item konfigurasi register.consumer.metrics untuk menentukan apakah akan mendaftarkan metrik konsumen Kafka. Secara default, opsi ini diatur ke true. Untuk informasi selengkapnya tentang metrik konsumen Kafka, lihat Apache Kafka.

  • Buat sink Kafka

    Sink Kafka Flink dapat menulis data stream ke satu atau beberapa topik Kafka.

    DataStream<String> stream = ...
    
    
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", );
    KafkaSink<String> kafkaSink =
                    KafkaSink.<String>builder()
                            .setKafkaProducerConfig(kafkaProperties) // konfigurasi produsen
                            .setRecordSerializer(
                                    KafkaRecordSerializationSchema.builder()
                                            .setTopic("my-topic") // topik target
                                            .setKafkaValueSerializer(StringSerializer.class) // skema serialisasi
                                            .build())
                            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // toleransi kesalahan
                            .build();
    
    stream.sinkTo(kafkaSink);

    Anda perlu mengonfigurasi parameter berikut.

    Parameter

    Deskripsi

    Topik

    Nama topik default tempat data ditulis.

    Serialisasi data

    Saat membangun, Anda perlu menyediakan KafkaRecordSerializationSchema untuk mengonversi data input menjadi ProducerRecord Kafka. Flink menyediakan builder skema untuk menawarkan beberapa komponen umum, seperti serialisasi kunci/nilai pesan, pemilihan topik, dan partisi pesan. Anda juga dapat mengimplementasikan antarmuka yang sesuai untuk kontrol lebih lanjut. Metode ProducerRecord<byte[], byte[]> serialize(T element, KafkaSinkContext context, Long timestamp) dipanggil untuk setiap catatan data yang masuk untuk menghasilkan ProducerRecord yang akan ditulis ke Kafka.

    Anda dapat memiliki kontrol detail halus atas cara setiap catatan data ditulis ke Kafka. Dengan ProducerRecord, Anda dapat melakukan operasi berikut:

    • Atur nama topik tempat menulis.

    • Tentukan kunci pesan.

    • Tentukan partisi tempat data ditulis.

    Properti klien Kafka

    bootstrap.servers wajib diisi. Ini adalah daftar broker Kafka yang dipisahkan koma.

    Semantik toleransi kesalahan

    Saat checkpointing Flink diaktifkan, sink Kafka Flink dapat menjamin semantik tepat-sekali. Selain mengaktifkan checkpointing Flink, Anda juga dapat menentukan semantik toleransi kesalahan yang berbeda melalui parameter DeliveryGuarantee. Detail parameter DeliveryGuarantee adalah sebagai berikut:

    • DeliveryGuarantee.NONE: (Pengaturan default) Flink tidak memberikan jaminan apa pun. Data mungkin hilang atau diduplikasi.

    • DeliveryGuarantee.AT_LEAST_ONCE: Menjamin tidak ada data yang hilang, tetapi data mungkin diduplikasi.

    • DeliveryGuarantee.EXACTLY_ONCE: Menggunakan transaksi Kafka untuk menyediakan semantik tepat-sekali.

      Catatan

      Untuk pertimbangan saat menggunakan semantik EXACTLY_ONCE, lihat Pertimbangan untuk semantik EXACTLY_ONCE.

Integrasi data

Konektor Kafka dapat digunakan dalam pengembangan pekerjaan YAML integrasi data untuk membaca dari sumber atau menulis ke target.

Batasan

  • Kami merekomendasikan menggunakan Kafka sebagai sumber data sinkron untuk integrasi data Flink Change Data Capture (CDC) di Ververica Runtime (VVR) 11.1 dan versi lebih baru.

  • Hanya format JSON, Debezium JSON, dan Canal JSON yang didukung. Format data lain tidak didukung saat ini.

  • Untuk sumber data, hanya Ververica Runtime (VVR) 8.0.11 dan versi lebih baru yang mendukung pendistribusian data dari satu tabel ke beberapa partisi.

Sintaks

source:
  type: kafka
  name: Kafka source
  properties.bootstrap.servers: localhost:9092
  topic: ${kafka.topic}
sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: localhost:9092

Item konfigurasi

  • Umum

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Keterangan

    type

    Jenis sumber atau target.

    Ya

    String

    Tidak ada

    Atur parameter ini ke kafka.

    name

    Nama sumber atau target.

    Tidak

    String

    Tidak ada

    Tidak ada

    properties.bootstrap.servers

    Alamat broker Kafka.

    Ya

    String

    Tidak ada

    Formatnya host:port,host:port,host:port. Pisahkan alamat dengan koma (,).

    properties.*

    Konfigurasi langsung untuk klien Kafka.

    Tidak

    String

    Tidak ada

    Akhiran harus merupakan konfigurasi yang didefinisikan dalam dokumentasi resmi Kafka untuk produsen dan konsumen.

    Flink menghapus awalan properties. dan meneruskan konfigurasi yang tersisa ke klien Kafka. Misalnya, Anda dapat menggunakan 'properties.allow.auto.create.topics' = 'false' untuk menonaktifkan pembuatan topik otomatis.

    key.format

    Format yang digunakan untuk membaca atau menulis bagian kunci pesan Kafka.

    Tidak

    String

    Tidak ada

    • Untuk sumber, hanya json yang didukung.

    • Untuk sink, nilai yang valid adalah:

      • csv

      • json

    Catatan

    Parameter ini hanya didukung di Ververica Runtime (VVR) 11.0.0 dan versi lebih baru.

    value.format

    Format yang digunakan untuk membaca atau menulis bagian nilai pesan Kafka.

    Tidak

    String

    debezium-json

    Nilai yang valid:

    • debezium-json 

    • canal-json

    • json

    Catatan
    • Format debezium-json dan canal-json hanya didukung di Ververica Runtime (VVR) 8.0.10 dan versi lebih baru.

    • Format json hanya didukung di Ververica Runtime (VVR) 11.0.0 dan versi lebih baru.

  • Tabel sumber

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Keterangan

    topic

    Nama topik untuk dibaca.

    Tidak

    String

    Tidak ada

    Pisahkan beberapa nama topik dengan titik koma (;), misalnya topic-1;topic-2.

    Catatan

    Anda hanya dapat menentukan salah satu opsi topic atau topic-pattern.

    topic-pattern

    Ekspresi reguler yang cocok dengan nama topik untuk dibaca. Semua topik yang cocok dengan ekspresi reguler ini akan dibaca saat pekerjaan berjalan.

    Tidak

    String

    Tidak ada

    Catatan

    Anda hanya dapat menentukan salah satu opsi topic atau topic-pattern.

    properties.group.id

    ID kelompok konsumen.

    Tidak

    String

    Tidak ada

    Jika ID grup yang ditentukan digunakan untuk pertama kalinya, Anda harus mengatur properties.auto.offset.reset ke earliest atau latest untuk menentukan offset awal.

    scan.startup.mode

    Offset awal untuk membaca data dari Kafka.

    Tidak

    String

    group-offsets

    Nilai yang valid:

    • earliest-offset: Mulai membaca dari partisi Kafka paling awal.

    • latest-offset: Mulai membaca dari offset Kafka terbaru.

    • group-offsets (default): Mulai membaca dari offset yang dikomit oleh properties.group.id yang ditentukan.

    • timestamp: Mulai membaca dari timestamp yang ditentukan oleh scan.startup.timestamp-millis.

    • specific-offsets: Mulai membaca dari offset yang ditentukan oleh scan.startup.specific-offsets.

    Catatan

    Parameter ini berlaku saat pekerjaan dimulai tanpa state. Saat pekerjaan dimulai ulang dari checkpoint atau pulih dari state, pekerjaan akan menggunakan progres yang disimpan dalam state untuk melanjutkan pembacaan.

    scan.startup.specific-offsets

    Dalam mode startup specific-offsets, menentukan offset awal untuk setiap partisi.

    Tidak

    String

    Tidak ada

    Contoh: partition:0,offset:42;partition:1,offset:300

    scan.startup.timestamp-millis

    Dalam mode startup timestamp, menentukan timestamp offset awal.

    Tidak

    Long

    Tidak ada

    Unitnya milidetik.

    scan.topic-partition-discovery.interval

    Interval untuk menemukan topik dan partisi Kafka secara dinamis.

    Tidak

    Duration

    5 menit

    Interval pemeriksaan partisi default adalah 5 menit. Untuk menonaktifkan fitur ini, Anda harus secara eksplisit mengatur interval pemeriksaan partisi ke nilai non-positif. Saat penemuan partisi dinamis diaktifkan, sumber Kafka dapat secara otomatis menemukan partisi baru dan membaca data darinya. Dalam mode topic-pattern, sumber Kafka tidak hanya membaca data dari partisi baru topik yang ada, tetapi juga membaca data dari semua partisi topik baru yang cocok dengan ekspresi reguler.

    scan.check.duplicated.group.id

    Menentukan apakah akan memeriksa duplikasi kelompok konsumen yang ditentukan oleh properties.group.id.

    Tidak

    Boolean

    false

    Nilai yang valid:

    • true: Sebelum memulai pekerjaan, sistem memeriksa duplikasi kelompok konsumen. Jika ditemukan duplikasi, pekerjaan melaporkan error, mencegah konflik dengan kelompok konsumen yang ada.

    • false: Pekerjaan langsung dimulai tanpa memeriksa konflik kelompok konsumen.

    schema.inference.strategy

    Strategi penguraian skema.

    Tidak

    String

    continuous

    Nilai yang valid:

    • continuous: Mengurai skema untuk setiap catatan data. Jika skema sebelumnya dan sesudahnya tidak kompatibel, skema yang lebih luas diurai, dan event perubahan skema dihasilkan.

    • static: Mengurai skema hanya sekali saat pekerjaan dimulai. Data selanjutnya diurai berdasarkan skema awal, dan tidak ada event perubahan skema yang dihasilkan.

    Catatan

    scan.max.pre.fetch.records

    Jumlah maksimum pesan yang dicoba dikonsumsi dan diurai untuk setiap partisi selama penguraian skema awal.

    Tidak

    Int

    50

    Sebelum pekerjaan benar-benar membaca dan memproses data, pekerjaan mencoba pra-mengonsumsi jumlah pesan terbaru yang ditentukan untuk setiap partisi untuk menginisialisasi informasi skema.

    key.fields-prefix

    Awalan kustom yang ditambahkan ke nama bidang yang diurai dari kunci pesan untuk menghindari konflik penamaan setelah mengurai kunci pesan Kafka.

    Tidak

    String

    Tidak ada

    Asumsikan item konfigurasi ini diatur ke key_. Jika kunci berisi bidang bernama `a`, nama bidang setelah mengurai kunci akan menjadi `key_a`.

    Catatan

    Nilai key.fields-prefix tidak boleh menjadi awalan dari value.fields-prefix.

    value.fields-prefix

    Awalan kustom yang ditambahkan ke nama bidang yang diurai dari nilai pesan untuk menghindari konflik penamaan setelah mengurai isi pesan Kafka.

    Tidak

    String

    Tidak ada

    Asumsikan item konfigurasi ini diatur ke value_. Jika nilai berisi bidang bernama `b`, nama bidang setelah mengurai nilai akan menjadi `value_b`.

    Catatan

    Nilai value.fields-prefix tidak boleh menjadi awalan dari key.fields-prefix.

    metadata.list

    Kolom metadata yang akan diteruskan ke downstream.

    Tidak

    String

    Tidak ada

    Kolom metadata yang tersedia meliputi topic, partition, offset, timestamp, timestamp-type, headers, leader-epoch, __raw_key__, dan __raw_value__. Pisahkan dengan koma.

    • Format Debezium JSON tabel sumber

      Parameter

      Wajib

      Tipe data

      Nilai default

      Deskripsi

      debezium-json.distributed-tables

      Tidak

      Boolean

      false

      Jika data untuk satu tabel dalam Debezium JSON muncul di beberapa partisi, Anda perlu mengaktifkan opsi ini.

      Catatan

      Item konfigurasi ini hanya didukung di VVR 8.0.11 dan versi lebih baru.

      Penting

      Setelah memodifikasi item konfigurasi ini, Anda perlu memulai pekerjaan tanpa state.

      debezium-json.schema-include

      Tidak

      Boolean

      false

      Saat menyiapkan Debezium Kafka Connect, Anda dapat mengaktifkan konfigurasi Kafka value.converter.schemas.enable untuk menyertakan skema dalam pesan. Opsi ini menunjukkan apakah pesan Debezium JSON menyertakan skema.

      Nilai yang valid:

      • true: Pesan Debezium JSON menyertakan skema.

      • false: Pesan Debezium JSON tidak menyertakan skema.

      debezium-json.ignore-parse-errors

      Tidak

      Boolean

      false

      Nilai yang valid:

      • true: Melewatkan baris saat ini ketika terjadi error penguraian.

      • false (default): Melaporkan error, dan pekerjaan gagal dimulai.

      debezium-json.infer-schema.primitive-as-string

      Tidak

      Boolean

      false

      Menentukan apakah akan mengurai semua tipe sebagai String saat mengurai skema tabel.

      Nilai yang valid:

      • true: Mengurai semua tipe data primitif sebagai String.

      • false (default): Mengurai sesuai aturan dasar.

    • Format Canal JSON tabel sumber

      Parameter

      Wajib

      Tipe data

      Nilai default

      Deskripsi

      canal-json.distributed-tables

      Tidak

      Boolean

      false

      Jika data untuk satu tabel dalam Canal JSON muncul di beberapa partisi, Anda perlu mengaktifkan opsi ini.

      Catatan

      Item konfigurasi ini hanya didukung di VVR 8.0.11 dan versi lebih baru.

      Penting

      Setelah memodifikasi item konfigurasi ini, Anda perlu memulai pekerjaan tanpa state.

      canal-json.database.include

      Tidak

      String

      Tidak ada

      Ekspresi reguler opsional yang cocok dengan bidang metadata `database` dalam catatan Canal. Ini hanya membaca catatan changelog dari database yang ditentukan. String ekspresi reguler kompatibel dengan Pattern Java.

      canal-json.table.include

      Tidak

      String

      Tidak ada

      Ekspresi reguler opsional yang cocok dengan bidang metadata `table` dalam catatan Canal. Ini hanya membaca catatan changelog dari tabel yang ditentukan. String ekspresi reguler kompatibel dengan Pattern Java.

      canal-json.ignore-parse-errors

      Tidak

      Boolean

      false

      Nilai yang valid:

      • true: Melewatkan baris saat ini ketika terjadi error penguraian.

      • false (default): Melaporkan error, dan pekerjaan gagal dimulai.

      canal-json.infer-schema.primitive-as-string

      Tidak

      Boolean

      false

      Menentukan apakah akan mengurai semua tipe sebagai String saat mengurai skema tabel.

      Nilai yang valid:

      • true: Mengurai semua tipe data primitif sebagai String.

      • false (default): Mengurai sesuai aturan dasar.

      canal-json.infer-schema.strategy

      Tidak

      String

      AUTO

      Strategi penguraian untuk skema tabel.

      Nilai yang valid:

      • AUTO (default): Mengurai secara otomatis dengan menganalisis data JSON. Jika data tidak berisi bidang `sqlType`, kami merekomendasikan menggunakan AUTO untuk menghindari kegagalan penguraian.

      • SQL_TYPE: Mengurai menggunakan array `sqlType` dalam data Canal JSON. Jika data berisi bidang `sqlType`, kami merekomendasikan mengatur canal-json.infer-schema.strategy ke SQL_TYPE untuk mendapatkan tipe yang lebih tepat.

      • MYSQL_TYPE: Mengurai menggunakan array `mysqlType` dalam data Canal JSON.

      Saat data Canal JSON di Kafka berisi bidang `sqlType` dan diperlukan pemetaan tipe yang lebih tepat, kami merekomendasikan mengatur canal-json.infer-schema.strategy ke SQL_TYPE.

      Untuk aturan pemetaan `sqlType`, lihat Penguraian skema untuk Canal JSON.

      Catatan
      • Konfigurasi ini didukung di VVR 11.1 dan versi lebih baru.

      • MYSQL_TYPE didukung di VVR 11.3 dan versi lebih baru.

      canal-json.mysql.treat-mysql-timestamp-as-datetime-enabled

      Tidak

      Boolean

      true

      Menentukan apakah akan memetakan tipe MySQL `timestamp` ke tipe CDC `timestamp`:

      • true (default): Tipe MySQL `timestamp` dipetakan ke tipe CDC `timestamp`.

      • false: Tipe MySQL `timestamp` dipetakan ke tipe CDC `timestamp_ltz`.

      canal-json.mysql.treat-tinyint1-as-boolean.enabled

      Tidak

      Boolean

      true

      Saat mengurai dengan MYSQL_TYPE, menentukan apakah akan memetakan tipe MySQL `tinyint(1)` ke tipe CDC `boolean`:

      • true (default): Tipe MySQL `tinyint(1)` dipetakan ke tipe CDC `boolean`.

      • false: Tipe MySQL `tinyint(1)` dipetakan ke tipe CDC `tinyint(1)`.

      Konfigurasi ini hanya berlaku saat canal-json.infer-schema.strategy diatur ke MYSQL_TYPE.

    • Format JSON tabel sumber

      Parameter

      Wajib

      Tipe data

      Nilai default

      Deskripsi

      json.timestamp-format.standard

      Tidak

      String

      SQL

      Menentukan format timestamp input dan output. Nilai yang valid:

      • SQL: Mengurai timestamp input dalam format yyyy-MM-dd HH:mm:ss.s{precision}, misalnya, 2020-12-30 12:13:14.123.

      • ISO-8601: Mengurai timestamp input dalam format yyyy-MM-ddTHH:mm:ss.s{precision}, misalnya, 2020-12-30T12:13:14.123.

      json.ignore-parse-errors

      Tidak

      Boolean

      false

      Nilai yang valid:

      • true: Melewatkan baris saat ini ketika terjadi error penguraian.

      • false (default): Melaporkan error, dan pekerjaan gagal dimulai.

      json.infer-schema.primitive-as-string

      Tidak

      Boolean

      false

      Menentukan apakah akan mengurai semua tipe sebagai String saat mengurai skema tabel.

      Nilai yang valid:

      • true: Mengurai semua tipe data primitif sebagai String.

      • false (default): Mengurai sesuai aturan dasar.

      json.infer-schema.flatten-nested-columns.enable

      Tidak

      Boolean

      false

      Saat mengurai data JSON, menentukan apakah akan memperluas kolom bersarang dalam JSON secara rekursif. Nilai yang valid:

      • true: Memperluas secara rekursif.

      • false (default): Memperlakukan kolom bersarang sebagai String.

      json.decode.parser-table-id.fields

      Tidak

      String

      Tidak ada

      Saat mengurai data JSON, menentukan apakah akan menggunakan beberapa nilai bidang JSON untuk menghasilkan tableId. Hubungkan beberapa bidang dengan koma ,. Misalnya, jika data JSON adalah {"col0":"a", "col1","b", "col2","c"}, hasilnya adalah sebagai berikut:

      Konfigurasi

      tableId

      col0

      a

      col0,col1

      a.b

      col0,col1,col2

      a.b.c

  • Tabel sink

    Parameter

    Deskripsi

    Wajib

    Tipe data

    Nilai default

    Keterangan

    type

    Jenis target.

    Ya

    String

    Tidak ada

    Atur parameter ini ke kafka.

    name

    Nama target.

    Tidak

    String

    Tidak ada

    Tidak ada

    topic

    Nama topik Kafka.

    Tidak

    String

    Tidak ada

    Jika parameter ini ditentukan, semua data ditulis ke topik ini.

    Catatan

    Jika tidak ditentukan, setiap catatan data ditulis ke topik yang sesuai dengan string TableID-nya. TableID dibentuk dengan menggabungkan nama database dan nama tabel menggunakan titik (.), contohnya: databaseName.tableName.

    partition.strategy

    Strategi penulisan data ke partisi Kafka.

    Tidak

    String

    all-to-zero

    Nilai yang valid:

    • all-to-zero (default): Menulis semua data ke partisi 0.

    • hash-by-key: Menulis data ke partisi berdasarkan nilai hash kunci primer. Strategi ini memastikan bahwa catatan data dengan kunci primer yang sama ditulis ke partisi yang sama dan urutannya dipertahankan.

    sink.tableId-to-topic.mapping

    Pemetaan dari nama tabel hulu ke nama topik Kafka downstream.

    Tidak

    String

    Tidak ada

    Gunakan titik koma (;) untuk memisahkan beberapa pemetaan. Dalam setiap pemetaan, gunakan titik dua (:) untuk memisahkan nama tabel hulu dari nama topik downstream. Nama tabel dapat menggunakan ekspresi reguler. Untuk memetakan beberapa tabel ke satu topik yang sama, pisahkan nama tabel dengan koma (,). Contoh: mydb.mytable1:topic1;mydb.mytable2:topic2.

    Catatan

    Parameter ini memungkinkan Anda mengubah topik tujuan sambil tetap mempertahankan informasi nama tabel asli.

    • Format Debezium JSON tabel sink

      Parameter

      Wajib

      Tipe data

      Nilai default

      Deskripsi

      debezium-json.include-schema.enabled

      Tidak

      Boolean

      false

      Menentukan apakah data Debezium JSON menyertakan informasi skema.

Contoh

  • Gunakan Kafka sebagai sumber integrasi data:

    source:
      type: kafka
      name: Kafka source
      properties.bootstrap.servers: ${kafka.bootstraps.server}
      topic: ${kafka.topic}
      value.format: ${value.format}
      scan.startup.mode: ${scan.startup.mode}
     
    sink:
      type: hologres
      name: Hologres sink
      endpoint: <yourEndpoint>
      dbname: <yourDbname>
      username: ${secret_values.ak_id}
      password: ${secret_values.ak_secret}
      sink.type-normalize-strategy: BROADEN
  • Gunakan Kafka sebagai target integrasi data:

    source:
      type: mysql
      name: MySQL Source
      hostname: ${secret_values.mysql.hostname}
      port: ${mysql.port}
      username: ${secret_values.mysql.username}
      password: ${secret_values.mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: kafka
      name: Kafka Sink
      properties.bootstrap.servers: ${kafka.bootstraps.server}
    
    route:
      - source-table: ${mysql.source.table}
        sink-table: ${kafka.topic}

    Di sini, modul `route` digunakan untuk mengatur nama topik untuk menulis dari tabel sumber ke Kafka.

Catatan

ApsaraMQ for Kafka tidak mengaktifkan pembuatan topik otomatis secara default. Untuk informasi selengkapnya, lihat Masalah terkait pembuatan topik otomatis. Saat menulis ke ApsaraMQ for Kafka, Anda perlu membuat topik yang sesuai terlebih dahulu. Untuk informasi selengkapnya, lihat Langkah 3: Buat sumber daya.

Strategi penguraian skema tabel dan sinkronisasi perubahan

  • Pra-konsumsi pesan partisi dan inisialisasi skema tabel

    Konektor Kafka memelihara skema semua tabel yang diketahui saat ini. Sebelum membaca data Kafka, konektor Kafka mencoba pra-mengonsumsi hingga scan.max.pre.fetch.records pesan di setiap partisi. Konektor mengurai skema setiap catatan data dan kemudian menggabungkan skema tersebut untuk menginisialisasi informasi skema tabel. Selanjutnya, sebelum mengonsumsi data, konektor menghasilkan event pembuatan tabel yang sesuai berdasarkan skema yang diinisialisasi.

    Catatan

    Untuk format Debezium JSON dan Canal JSON, informasi tabel diperoleh dari pesan tertentu. Pesan scan.max.pre.fetch.records yang pra-dikonsumsi mungkin berisi data dari beberapa tabel. Oleh karena itu, jumlah catatan data yang pra-dikonsumsi untuk setiap tabel tidak dapat ditentukan. Pra-konsumsi dan inisialisasi skema tabel hanya dilakukan sekali sebelum mengonsumsi dan memproses pesan dari setiap partisi. Jika data tabel baru muncul nanti, skema tabel yang diurai dari catatan data pertama tabel tersebut digunakan sebagai skema awal. Skema untuk tabel tersebut tidak akan diinisialisasi ulang melalui pra-konsumsi.

    Penting

    Hanya VVR 8.0.11 dan versi lebih baru yang mendukung pendistribusian data dari satu tabel ke beberapa partisi. Untuk skenario ini, Anda perlu mengatur item konfigurasi debezium-json.distributed-tables atau canal-json.distributed-tables ke true.

  • Informasi tabel

    • Untuk format Canal JSON dan Debezium JSON, informasi tabel, termasuk database dan nama tabel, diurai dari pesan tertentu.

    • Untuk format JSON, informasi tabel hanya mencakup nama tabel, yaitu nama topik tempat data berada.

  • Informasi kunci primer

    • Untuk format Canal JSON, kunci primer tabel didefinisikan berdasarkan bidang `pkNames` dalam JSON.

    • Untuk format Debezium JSON dan JSON, JSON tidak berisi informasi kunci primer. Anda dapat menambahkan kunci primer ke tabel secara manual menggunakan aturan transformasi:

      transform:
        - source-table: \.*.\.*
          projection: \*
          primary-keys: key1, key2
  • Penguraian skema dan perubahan skema

    Setelah skema tabel diinisialisasi, jika schema.inference.strategy diatur ke `static`, konektor Kafka mengurai nilai setiap pesan berdasarkan skema tabel awal dan tidak menghasilkan event perubahan skema. Jika schema.inference.strategy diatur ke `continuous`, konektor Kafka mengurai isi setiap pesan Kafka untuk mengekstrak kolom fisik dan membandingkannya dengan skema yang dipelihara saat ini. Jika skema yang diurai tidak konsisten dengan skema saat ini, konektor mencoba menggabungkan skema dan menghasilkan event perubahan skema tabel yang sesuai. Aturan penggabungan adalah sebagai berikut:

    • Jika kolom fisik yang diurai berisi bidang yang tidak ada dalam skema saat ini, bidang tersebut ditambahkan ke skema, dan event penambahan kolom nullable dihasilkan.

    • Jika kolom fisik yang diurai tidak berisi bidang yang sudah ada dalam skema saat ini, bidang tersebut dipertahankan, dan datanya diisi dengan NULL. Tidak ada event penghapusan kolom yang dihasilkan.

    • Jika ada kolom dengan nama yang sama di kedua skema, ditangani sesuai skenario berikut:

      • Jika tipenya sama tetapi presisinya berbeda, tipe dengan presisi lebih besar digunakan, dan event perubahan tipe kolom dihasilkan.

      • Jika tipenya berbeda, sistem menemukan node induk paling rendah dalam struktur pohon yang ditunjukkan pada gambar berikut untuk digunakan sebagai tipe kolom dengan nama yang sama, dan event perubahan tipe kolom dihasilkan.

        image

  • Strategi perubahan skema yang didukung saat ini adalah sebagai berikut:

    • Tambah kolom: Menambahkan kolom yang sesuai ke akhir skema saat ini dan menyinkronkan data kolom baru. Kolom baru diatur sebagai nullable.

    • Hapus kolom: Tidak menghasilkan event hapus kolom. Sebagai gantinya, data untuk kolom tersebut secara otomatis diisi dengan nilai NULL.

    • Ganti nama kolom: Ini dianggap sebagai penambahan dan penghapusan kolom. Kolom yang diganti namanya ditambahkan ke akhir skema saat ini, dan data kolom sebelum penggantian nama diisi dengan nilai NULL.

    • Perubahan tipe kolom:

      • Untuk sistem downstream yang mendukung perubahan tipe kolom, setelah sink downstream mendukung penanganan perubahan tipe kolom, pekerjaan integrasi data mendukung perubahan tipe kolom biasa, misalnya, dari INT ke BIGINT. Perubahan tersebut bergantung pada aturan perubahan tipe kolom yang didukung oleh sink downstream. Tabel sink yang berbeda mendukung aturan perubahan tipe kolom yang berbeda. Untuk informasi selengkapnya, lihat dokumentasi untuk tabel sink tertentu.

      • Untuk sistem downstream yang tidak mendukung perubahan tipe kolom, seperti Hologres, Anda dapat menggunakan pemetaan tipe luas. Ini berarti membuat tabel dengan tipe yang lebih luas di sistem downstream saat pekerjaan dimulai. Saat terjadi perubahan tipe kolom, sistem menentukan apakah sink downstream dapat menerima perubahan tersebut, sehingga memberikan dukungan toleran untuk perubahan tipe kolom.

  • Perubahan skema yang saat ini tidak didukung:

    • Perubahan pada kendala seperti kunci primer atau indeks.

    • Mengubah dari NOT NULL ke NULLABLE.

  • Penguraian skema untuk Canal JSON

    Data Canal JSON mungkin berisi bidang `sqlType` opsional, yang mencatat informasi tipe yang tepat untuk kolom data. Untuk mengambil skema yang lebih akurat, Anda dapat menggunakan tipe dalam `sqlType` dengan mengatur canal-json.infer-schema.strategy ke SQL_TYPE. Hubungan pemetaan tipe adalah sebagai berikut:

    Tipe JDBC

    Kode Tipe

    Tipe CDC

    BIT

    -7

    BOOLEAN

    BOOLEAN

    16

    TINYINT

    -6

    TINYINT

    SMALLINT

    -5

    SMALLINT

    INTEGER

    4

    INT

    BIGINT

    -5

    BIGINT

    DECIMAL

    3

    DECIMAL(38,18)

    NUMERIC

    2

    REAL

    7

    FLOAT

    FLOAT

    6

    DOUBLE

    8

    DOUBLE

    BINARY

    -2

    BYTES

    VARBINARY

    -3

    LONGVARBINARY

    -4

    BLOB

    2004

    DATE

    91

    DATE

    TIME

    92

    TIME

    TIMESTAMP

    93

    TIMESTAMP

    CHAR

    1

    STRING

    VARCHAR

    12

    LONGVARCHAR

    -1

    Tipe lainnya

Strategi pemetaan nama tabel ke topik

Saat menggunakan Kafka sebagai target untuk pekerjaan integrasi data, Anda perlu mengonfigurasi strategi pemetaan nama tabel ke topik dengan hati-hati. Hal ini karena format pesan Kafka yang ditulis (Debezium JSON atau Canal JSON) juga berisi informasi nama tabel, dan konsumsi pesan Kafka selanjutnya sering menggunakan informasi nama tabel dalam data sebagai nama tabel aktual (bukan nama topik).

Anggaplah Anda perlu menyinkronkan dua tabel, `mydb.mytable1` dan `mydb.mytable2`, dari MySQL. Strategi konfigurasi yang mungkin adalah sebagai berikut:

1. Jangan mengonfigurasi strategi pemetaan apa pun

Tanpa strategi pemetaan apa pun, setiap tabel ditulis ke topik dengan format "nama_database.nama_tabel". Oleh karena itu, data dari `mydb.mytable1` ditulis ke topik bernama `mydb.mytable1`, dan data dari `mydb.mytable2` ditulis ke topik bernama `mydb.mytable2`. Kode berikut memberikan contoh konfigurasi:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

2. Konfigurasikan aturan route untuk pemetaan (tidak disarankan)

Dalam banyak skenario, pengguna tidak ingin topik yang ditulis berada dalam format "nama_database.nama_tabel". Mereka ingin menulis data ke topik tertentu. Oleh karena itu, mereka mengonfigurasi aturan route untuk pemetaan. Kode berikut memberikan contoh konfigurasi:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}
  
 route:
  - source-table: mydb.mytable1,mydb.mytable2
    sink-table: mytable1

Dalam kasus ini, semua data dari `mydb.mytable1` dan `mydb.mytable2` ditulis ke topik `mytable1`.

Namun, saat Anda menggunakan aturan route untuk mengubah nama topik yang ditulis, hal ini juga memodifikasi informasi nama tabel dalam pesan Kafka (format Debezium JSON atau Canal JSON). Dalam kasus ini, semua nama tabel dalam pesan Kafka menjadi `mytable1`. Hal ini dapat menyebabkan perilaku yang tidak diinginkan saat sistem lain mengonsumsi pesan Kafka dari topik ini.

3. Konfigurasikan parameter sink.tableId-to-topic.mapping untuk pemetaan (disarankan)

Untuk mengonfigurasi aturan pemetaan nama tabel ke topik sambil mempertahankan informasi nama tabel sumber, gunakan parameter `sink.tableId-to-topic.mapping`. Kode berikut memberikan contoh konfigurasi:

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
  sink.tableId-to-topic.mapping: mydb.mytable1,mydb.mytable2:mytable

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

atau

source:
  type: mysql
  name: MySQL Source
  hostname: ${secret_values.mysql.hostname}
  port: ${mysql.port}
  username: ${secret_values.mysql.username}
  password: ${secret_values.mysql.password}
  tables: mydb.mytable1,mydb.mytable2
  server-id: 8601-8604
  sink.tableId-to-topic.mapping: mydb.mytable1:mytable;mydb.mytable2:mytable

sink:
  type: kafka
  name: Kafka Sink
  properties.bootstrap.servers: ${kafka.bootstraps.server}

Dalam kasus ini, semua data dari `mydb.mytable1` dan `mydb.mytable2` ditulis ke topik `mytable1`. Informasi nama tabel dalam pesan Kafka (format Debezium JSON atau Canal JSON) tetap `mydb.mytable1` atau `mydb.mytable2`. Saat sistem lain mengonsumsi pesan Kafka dari topik ini, mereka dapat memperoleh informasi nama tabel sumber dengan benar.

Pertimbangan untuk semantik EXACTLY_ONCE

  • Konfigurasikan tingkat isolasi konsumen

    Semua aplikasi yang mengonsumsi data Kafka harus mengatur `isolation.level`:

    • read_committed: Hanya membaca data yang telah dikomit.

    • read_uncommitted (default): Dapat membaca data yang belum dikomit.

    EXACTLY_ONCE bergantung pada read_committed. Jika tidak, konsumen mungkin melihat data yang belum dikomit, yang merusak konsistensi.

  • Timeout transaksi dan kehilangan data

    Saat Flink pulih dari checkpoint, Flink hanya bergantung pada transaksi yang telah dikomit sebelum checkpoint tersebut dimulai. Jika waktu antara kegagalan pekerjaan dan restart-nya melebihi timeout transaksi Kafka, Kafka secara otomatis membatalkan transaksi tersebut, menyebabkan kehilangan data.

    • transaction.max.timeout.ms default broker Kafka adalah 15 menit.

    • transaction.timeout.ms default sink Kafka Flink adalah 1 jam.

    • Anda harus meningkatkan transaction.max.timeout.ms di sisi broker agar tidak kurang dari pengaturan Flink.

  • Kolam produsen dan checkpoint konkuren

    Mode EXACTLY_ONCE menggunakan kolam produsen Kafka dengan ukuran tetap. Setiap checkpoint menempati satu produsen dari kolam. Jika jumlah checkpoint konkuren melebihi ukuran kolam, pekerjaan akan gagal.

    Sesuaikan ukuran kolam produsen berdasarkan jumlah maksimum checkpoint konkuren.

  • Batasan penskalaan turun tingkat paralelisme

    Jika pekerjaan gagal sebelum checkpoint pertama, informasi kolam produsen asli tidak dipertahankan saat restart. Oleh karena itu, jangan mengurangi tingkat paralelisme pekerjaan sebelum checkpoint pertama selesai. Jika Anda harus menskala turun, tingkat paralelisme tidak boleh lebih rendah dari FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR.

  • Pembacaan pemblokiran transaksi

    Dalam mode read_committed, transaksi terbuka apa pun (tidak dikomit maupun dibatalkan) memblokir pembacaan dari seluruh topik.

    Misalnya:

    • Transaksi 1 menulis data.

    • Transaksi 2 menulis dan mengkomit data.

    • Selama Transaksi 1 belum selesai, data dari Transaksi 2 tidak terlihat oleh konsumen.

    Oleh karena itu:

    • Saat operasi normal, latensi visibilitas data kira-kira sama dengan interval checkpoint.

    • Saat pekerjaan gagal, topik yang sedang ditulis akan memblokir konsumen sampai pekerjaan dijalankan ulang atau transaksi timeout. Dalam kasus ekstrem, timeout transaksi bahkan dapat memengaruhi pembacaan.

FAQ