All Products
Search
Document Center

Realtime Compute for Apache Flink:Upsert Kafka

Last Updated:Mar 10, 2026

Topik ini menjelaskan cara menggunakan konektor Upsert Kafka.

Informasi latar belakang

Konektor Upsert Kafka membaca data dari dan menulis data ke topik Kafka menggunakan operasi upsert.

  • Sebagai tabel sumber, konektor dapat mengonversi data yang disimpan di Kafka menjadi aliran changelog. Setiap catatan dalam aliran tersebut merepresentasikan event pembaruan atau penghapusan. Nilai dalam sebuah catatan data diinterpretasikan sebagai UPDATE terhadap nilai terakhir dengan kunci yang sama, jika kunci tersebut ada. Jika kunci tidak ada, pembaruan tersebut diperlakukan sebagai INSERT. Dalam istilah tabel, sebuah catatan dalam aliran changelog merupakan UPSERT—juga dikenal sebagai INSERT atau UPDATE—karena setiap baris yang ada dengan kunci yang sama akan ditimpa. Pesan dengan nilai kosong diperlakukan sebagai pesan DELETE.

  • Sebagai tabel sink atau sink ingesti data, konektor dapat mengonsumsi aliran changelog yang dihasilkan oleh pekerjaan hulu. Konektor menulis data INSERT atau UPDATE_AFTER sebagai pesan Kafka biasa, sedangkan data DELETE ditulis sebagai pesan Kafka dengan nilai kosong, yang menunjukkan bahwa pesan untuk kunci yang bersangkutan telah dihapus. Flink mempartisi data berdasarkan nilai kolom kunci primer, sehingga memastikan bahwa pesan dengan kunci primer yang sama tetap terurut. Akibatnya, pesan pembaruan atau penghapusan untuk kunci primer yang sama ditulis ke partisi yang sama.

Kategori

Deskripsi

Jenis yang didukung

Tabel sumber, tabel sink, dan sink ingesti data

Mode eksekusi

Mode streaming

Format data

avro, avro-confluent, csv, json, dan raw

Metrik pemantauan spesifik

  • Tabel sumber

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • Tabel sink

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

Jenis API

SQL dan pekerjaan YAML ingesti data

Pembaruan atau penghapusan data dalam tabel sink

Ya

Prasyarat

Batasan

  • Konektor Kafka hanya didukung oleh Flink yang menggunakan Ververica Runtime (VVR) 2.0.0 atau versi yang lebih baru.

  • Konektor hanya mendukung pembacaan dari dan penulisan ke Apache Kafka 0.10 atau versi yang lebih baru.

  • Konektor hanya mendukung parameter client Apache Kafka 2.8. Untuk informasi selengkapnya, lihat dokumentasi konfigurasi consumer dan producer dari Apache Kafka.

  • Jika tabel sink Upsert Kafka menggunakan semantik tepat-sekali (exactly-once semantics), fitur transaksi harus diaktifkan pada kluster Kafka tujuan. Kluster tersebut harus merupakan Apache Kafka 0.11 atau versi yang lebih baru.

  • Tabel sumber Upsert Kafka hanya mendukung mode startup earliest-offset. Mode ini tidak dapat dikonfigurasi. Konektor membaca semua data perubahan historis untuk mendapatkan changelog lengkap, sehingga memastikan Anda dapat memproses changelog tersebut secara utuh dalam SQL dan seluruh pipeline menyediakan semantik tepat-sekali. Jika Anda menentukan mode startup lain, seperti berdasarkan timestamp atau latest-offset, konektor akan membaca changelog yang tidak lengkap, yang dapat menyebabkan masalah kebenaran data dalam komputasi hilir.

SQL

Konektor Upsert Kafka membaca data dari dan menulis data ke topik Kafka menggunakan operasi upsert.

Sintaksis

CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);

DENGAN parameter

  • Umum

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Bawaan

    Keterangan

    connector

    Jenis tabel.

    String

    Ya

    None

    Atur nilainya ke `upsert-kafka`.

    properties.bootstrap.servers

    Alamat broker Kafka.

    String

    Ya

    None

    Formatnya adalah host:port,host:port,host:port. Pisahkan beberapa alamat dengan koma (,).

    properties.*

    Parameter untuk client Kafka.

    String

    Tidak

    None

    Akhiran harus merupakan konfigurasi yang didefinisikan dalam dokumentasi resmi Kafka untuk producer dan consumer.

    Flink menghapus awalan `properties.` dan meneruskan konfigurasi yang tersisa ke client Kafka. Misalnya, Anda dapat menggunakan 'properties.allow.auto.create.topics' = 'false' untuk menonaktifkan pembuatan topik otomatis.

    Jangan ubah konfigurasi berikut dengan cara ini, karena konektor Kafka akan menimpanya:

    • key.deserializer

    • value.deserializer

    key.format

    Format untuk bagian kunci pesan Kafka.

    String

    Ya

    None

    Jika Anda mengonfigurasi parameter ini, Anda juga harus mengonfigurasi key.fields atau key.fields-prefix.

    Nilai yang valid:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    key.fields-prefix

    Awalan kustom untuk semua bidang kunci dalam pesan Kafka. Hal ini menghindari konflik nama dengan bidang dalam format nilai.

    String

    Tidak

    None

    Parameter ini hanya digunakan untuk membedakan nama kolom antara tabel sumber dan tabel sink. Awalan dihapus saat bagian kunci pesan Kafka diurai dan dihasilkan.

    Catatan

    Jika Anda mengonfigurasi parameter ini, Anda harus mengatur value.fields-include ke `EXCEPT_KEY`.

    value.format

    Format untuk bagian nilai pesan Kafka.

    String

    Ya

    None

    Parameter ini setara dengan format. Anda hanya dapat mengonfigurasi salah satunya. Konflik terjadi jika Anda mengonfigurasi kedua parameter format dan value.format.

    value.fields-include

    Menentukan apakah bidang yang sesuai dengan kunci pesan disertakan saat bagian nilai pesan Kafka diurai atau dihasilkan.

    String

    Ya

    ALL

    Nilai yang valid:

    • ALL (Default): Semua kolom diproses sebagai bagian nilai pesan Kafka.

    • EXCEPT_KEY: Semua kolom kecuali bidang yang didefinisikan oleh `key.fields` diproses sebagai bagian nilai pesan Kafka.

    topic

    Nama topik untuk dibaca atau ditulis.

    String

    Ya

    None

    None.

  • Parameter khusus sink

    Parameter

    Deskripsi

    Tipe data

    Wajib

    Bawaan

    Keterangan

    sink.parallelism

    Konkurensi operator sink Kafka.

    Integer

    Tidak

    Konkurensi operator hulu, yang ditentukan oleh framework.

    None.

    sink.buffer-flush.max-rows

    Jumlah maksimum catatan yang dapat di-cache sebelum cache dikosongkan.

    Integer

    Tidak

    0 (dinonaktifkan)

    Jika tabel sink menerima banyak pembaruan pada kunci yang sama, cache hanya menyimpan catatan terakhir untuk kunci tersebut. Hal ini membantu mengurangi jumlah data yang dikirim ke topik Kafka dan menghindari potensi pesan tombstone.

    Catatan

    Untuk mengaktifkan caching sink, atur kedua parameter sink.buffer-flush.max-rows dan sink.buffer-flush.interval ke nilai yang lebih besar dari nol.

    sink.buffer-flush.interval

    Interval pengosongan cache.

    Duration

    Tidak

    0 (dinonaktifkan)

    Unitnya dapat berupa milidetik (ms), detik (s), menit (min), atau jam (h). Contohnya, 'sink.buffer-flush.interval'='1 s'.

    Jika tabel sink menerima banyak pembaruan pada kunci yang sama, cache hanya menyimpan catatan terakhir untuk kunci tersebut. Hal ini membantu mengurangi jumlah data yang dikirim ke topik Kafka dan menghindari potensi pesan tombstone.

    Catatan

    Untuk mengaktifkan caching sink, atur kedua parameter sink.buffer-flush.max-rows dan sink.buffer-flush.interval ke nilai yang lebih besar dari nol.

Ingesti data

Konektor Upsert Kafka dapat digunakan sebagai sink dalam pekerjaan ingesti data YAML. Data ditulis dalam format JSON, dan bidang kunci primer juga disertakan dalam isi pesan.

Sintaksis

sink:
  type: upsert-kafka
  name: upsert-kafka Sink
  properties.bootstrap.servers: localhost:9092
  # ApsaraMQ for Kafka
  aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
  aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
  aliyun.kafka.instanceId: ${instancd-id}
  aliyun.kafka.endpoint: ${endpoint}
  aliyun.kafka.regionId: ${region-id}

Parameter

Parameter

Deskripsi

Tipe data

Wajib

Bawaan

Keterangan

type

Jenis sink.

STRING

Ya

None

Atur nilainya ke `upsert-kafka`.

name

Nama sink.

STRING

Tidak

None

None.

properties.bootstrap.servers

Alamat broker Kafka.

STRING

Ya

None

Formatnya adalah host:port,host:port,host:port. Pisahkan beberapa alamat dengan koma (,).

properties.*

Parameter untuk client Kafka.

STRING

Tidak

None

Akhiran harus merupakan konfigurasi yang didefinisikan dalam dokumentasi resmi Kafka untuk producer.

Flink menghapus awalan `properties.` dan meneruskan konfigurasi yang tersisa ke client Kafka. Misalnya, Anda dapat menggunakan 'properties.allow.auto.create.topics' = 'false' untuk menonaktifkan pembuatan topik otomatis.

sink.delivery-guarantee

Pola semantik untuk operasi penulisan.

STRING

Tidak

at-least-once

Nilai yang valid:

  • none: Tidak ada jaminan. Data dapat hilang atau duplikat.

  • at-least-once (Default): Menjamin tidak ada data yang hilang, tetapi data dapat duplikat.

  • exactly-once: Menggunakan transaksi Kafka untuk menjamin bahwa data tidak hilang atau duplikat.

sink.add-tableId-to-header-enabled

Menentukan apakah informasi tabel ditulis ke header.

BOOLEAN

Tidak

false

Jika diaktifkan, `namespace`, `schemaName`, dan `tableName` ditulis ke header.

aliyun.kafka.accessKeyId

ID AccessKey Akun Alibaba Cloud Anda.

STRING

Tidak

None

Untuk informasi selengkapnya, lihat Buat pasangan Kunci Akses.

Catatan

Konfigurasikan parameter ini saat Anda menyinkronkan data ke ApsaraMQ for Kafka.

aliyun.kafka.accessKeySecret

Rahasia AccessKey Akun Alibaba Cloud Anda.

STRING

Tidak

None

Untuk informasi selengkapnya, lihat Buat pasangan Kunci Akses.

Catatan

Konfigurasikan parameter ini saat Anda menyinkronkan data ke ApsaraMQ for Kafka.

aliyun.kafka.instanceId

ID instans ApsaraMQ for Kafka.

STRING

Tidak

None

Anda dapat melihat detail instans di antarmuka Alibaba Cloud Kafka.

Catatan

Konfigurasikan parameter ini saat Anda menyinkronkan data ke ApsaraMQ for Kafka.

aliyun.kafka.endpoint

Titik akhir API untuk ApsaraMQ for Kafka.

STRING

Tidak

None

Untuk informasi selengkapnya, lihat Titik akhir.

Catatan

Konfigurasikan parameter ini saat Anda menyinkronkan data ke ApsaraMQ for Kafka.

aliyun.kafka.regionId

ID wilayah instans tempat topik berada.

STRING

Tidak

None

Untuk informasi selengkapnya, lihat Titik akhir.

Catatan

Konfigurasikan parameter ini saat Anda menyinkronkan data ke ApsaraMQ for Kafka.

Perubahan tipe yang didukung

Konektor Upsert Kafka untuk ingesti data mendukung semua jenis operasi perubahan. Namun, untuk membaca data tersebut, Anda harus menggunakan konektor SQL Flink Upsert Kafka dengan skema tetap.

Contoh

  • Tabel sumber

    Buat tabel sumber Kafka yang berisi data penelusuran pengguna website.

    CREATE TABLE pageviews(
    user_id BIGINT,
    page_id BIGINT,
    viewtime TIMESTAMP,
    user_region STRING,
    WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
    )WITH(
    'connector'='kafka',
    'topic'='<yourTopicName>',
    'properties.bootstrap.servers'='...',
    'format'='json'
    );
  • Tabel sink

    • Buat tabel sink Upsert Kafka.

      CREATE TABLE pageviews_per_region(
      user_region STRING,
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY(user_region) NOT ENFORCED
      )WITH(
      'connector'='upsert-kafka',
      'topic'='<yourTopicName>',
      'properties.bootstrap.servers'='...',
      'key.format'='avro',
      'value.format'='avro'
      );
    • Tulis data penelusuran pengguna website ke tabel sink.

      INSERT INTO pageviews_per_region
      SELECT
      user_region,
      COUNT(*),
      COUNT(DISTINCT user_id)
      FROM pageviews
      GROUP BY user_region;
  • sink pengambilan data

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: upsert-kafka
      name: Upsert Kafka Sink
      properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
      aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
      aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
      aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
      aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
      aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}
    
    route:
      - source-table: ${mysql.source.table}
        sink-table: ${upsert.kafka.topic}

Praktik terbaik