全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor Upsert Kafka

更新时间:Jul 06, 2025

Topik ini menjelaskan cara menggunakan Konektor Upsert Kafka.

Informasi latar belakang

Konektor Upsert Kafka digunakan untuk membaca dan menulis data ke topik Kafka dengan metode upsert.

  • Untuk tabel sumber, Konektor Upsert Kafka mengonversi data dalam topik Kafka menjadi aliran log perubahan. Setiap catatan dalam aliran log mewakili pembaruan atau penghapusan. Jika kunci dalam topik Kafka cocok dengan kunci dalam catatan data, nilai dari catatan tersebut akan menimpa nilai yang ada, diinterpretasikan sebagai UPDATE. Jika tidak ada kunci yang cocok, nilai baru dimasukkan, diinterpretasikan sebagai INSERT. Semua catatan diinterpretasikan sebagai UPSERT (INSERT atau UPDATE). Jika nilai kunci adalah null, catatan diinterpretasikan sebagai DELETE.

  • Untuk tabel sink atau sink ingest data, Konektor Upsert Kafka mengonsumsi aliran log perubahan dari sumber. Data INSERT dan UPDATE_AFTER ditulis sebagai pesan Kafka normal, sedangkan data DELETE ditulis sebagai pesan Kafka dengan nilai null. Jika nilai kunci dalam catatan adalah null, pesan Kafka terkait dihapus. Flink mempartisi data berdasarkan kolom kunci utama, memastikan bahwa pesan dengan kunci utama yang sama diurutkan dan ditulis ke partisi yang sama.

Item

Deskripsi

Tipe tabel

Tabel sumber, tabel sink, sink ingest data

Mode operasi

Mode streaming

Format data

avro, avro-confluent, csv, json, dan raw

Metrik

  • Metrik untuk tabel sumber

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerScond

    • currentEmitEventTimeLag

    • currentFetchEventTimeLag

    • sourceIdleTime

    • pendingRecords

  • Metrik untuk tabel sink

    • numRecordsOut

    • numRecordsOutPerSecond

    • numBytesOut

    • numBytesOutPerSecond

    • currentSendTime

Tipe API

SQL API dan YAML API ingest data

Pembaruan atau penghapusan data dalam tabel sink

Ya

Prasyarat

Batasan

  • Hanya Realtime Compute for Apache Flink dengan Ververica Runtime (VVR) 2.0.0 atau versi lebih baru yang mendukung konektor Apache Kafka.

  • Konektor Upsert Kafka hanya mendukung pembacaan dan penulisan data dari Apache Kafka 0.10 atau versi lebih baru.

  • Konektor Upsert Kafka hanya mendukung opsi klien Apache Kafka 2.8. Untuk detail konfigurasi produsen dan konsumen Kafka, lihat Konfigurasi Konsumen dan Konfigurasi Produsen.

  • Jika tabel sink Upsert Kafka menggunakan semantik exactly-once, mekanisme transaksi Kafka harus diaktifkan. Versi kluster Kafka harus Apache Kafka 0.11 atau lebih baru.

SQL

Konektor Upsert Kafka digunakan untuk membaca dan menulis data ke topik Kafka dengan metode upsert.

Sintaks

CREATE TABLE upsert_kafka_sink(
user_region STRING,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(user_region) NOT ENFORCED
)WITH(
'connector'='upsert-kafka',
'topic'='<yourTopicName>',
'properties.bootstrap.servers'='...',
'key.format'='avro',
'value.format'='avro'
);

Opsi konektor dalam klausa WITH

  • Opsi Umum

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    connector

    Tipe tabel.

    String

    Ya

    Tidak ada nilai default

    Setel nilainya menjadi upsert-kafka.

    properties.bootstrap.servers

    Alamat IP atau titik akhir dan nomor port broker Kafka.

    String

    Ya

    Tidak ada nilai default

    Format: host:port,host:port,host:port. Pisahkan beberapa pasangan host:port dengan koma (,).

    properties.*

    Opsi yang dikonfigurasi untuk klien Kafka.

    String

    Tidak

    Tidak ada nilai default

    Akhiran opsi ini harus sesuai dengan aturan yang didefinisikan di Konfigurasi Produsen dan Konfigurasi Konsumen.

    Flink menghapus awalan properties. dan meneruskan kunci dan nilai yang telah diubah ke klien Kafka. Sebagai contoh, Anda dapat mengatur properties.allow.auto.create.topics menjadi false untuk menonaktifkan pembuatan topik otomatis.

    Anda tidak dapat memodifikasi konfigurasi opsi berikut dengan menambahkan awalan properties., karena nilai opsi tersebut ditimpa setelah Anda menggunakan konektor Upsert Kafka:

    • key.deserializer

    • value.deserializer

    key.format

    Format yang digunakan untuk membaca atau menulis bidang kunci pesan Kafka.

    String

    Ya

    Tidak ada nilai default

    Anda harus mengonfigurasi opsi key.fields atau key.fields-prefix jika Anda mengonfigurasi opsi ini.

    Nilai valid:

    • csv

    • json

    • avro

    • debezium-json

    • canal-json

    • maxwell-json

    • avro-confluent

    • raw

    key.fields-prefix

    Awalan kustom untuk semua bidang kunci dalam pesan Kafka. Anda dapat mengonfigurasi parameter ini untuk mencegah konflik nama dengan bidang nilai.

    String

    Tidak

    Tidak ada nilai default

    Opsi ini hanya digunakan untuk membedakan nama kolom tabel sumber dan tabel sink. Awalan dihapus dari nama kolom saat bidang kunci pesan Kafka diurai dan dibuat.

    Catatan

    Jika Anda mengonfigurasi opsi ini, Anda harus mengatur opsi value.fields-include menjadi EXCEPT_KEY.

    value.format

    Format yang digunakan untuk membaca atau menulis bidang nilai pesan Kafka.

    String

    Ya

    Tidak ada nilai default

    Konfigurasi opsi ini setara dengan konfigurasi opsi format. Opsi format tidak dapat digunakan bersama dengan opsi value.format. Jika Anda mengonfigurasi kedua opsi tersebut, terjadi konflik.

    value.fields-include

    Menentukan apakah akan menyertakan bidang yang sesuai dengan kunci pesan saat bidang nilai pesan Kafka diurai atau dibuat.

    String

    Ya

    ALL

    Nilai valid:

    • ALL: Semua bidang diproses sebagai bidang nilai pesan Kafka. Ini adalah nilai default.

    • EXCEPT_KEY: Semua bidang kecuali bidang yang ditentukan oleh opsi key.fields diproses sebagai bidang nilai pesan Kafka.

    topic

    Nama topik tempat data dibaca atau ke mana data ditulis.

    String

    Ya

    Tidak ada nilai default

    N/A

  • Opsi Eksklusif Sink

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    sink.parallelism

    Paralelisme operator dalam tabel sink Kafka.

    Integer

    Tidak

    Paralelisme operator hulu, yang ditentukan oleh kerangka kerja.

    N/A

    sink.buffer-flush.max-rows

    Jumlah maksimum catatan data yang dapat di-cache sebelum cache diperbarui.

    Integer

    Tidak

    0 (dinonaktifkan)

    Jika tabel sink menerima sejumlah besar pembaruan pada kunci yang sama, hanya catatan data terakhir dari kunci tersebut yang disimpan dalam cache. Dalam hal ini, caching data dalam tabel sink membantu mengurangi jumlah data yang ditulis ke topik Kafka. Ini mencegah pesan nisan potensial dikirim ke topik Kafka.

    Catatan

    Jika Anda ingin mengaktifkan caching data untuk tabel sink, Anda harus mengatur opsi sink.buffer-flush.max-rows dan sink.buffer-flush.interval ke nilai yang lebih besar dari 0.

    sink.buffer-flush.interval

    Interval pembaruan cache.

    Durasi

    Tidak

    0 (dinonaktifkan)

    Unit bisa milidetik, detik, menit, atau jam. Sebagai contoh, Anda dapat mengonfigurasi 'sink.buffer-flush.interval'='1 s'.

    Jika tabel sink menerima sejumlah besar pembaruan pada kunci yang sama, hanya catatan data terakhir dari kunci tersebut yang disimpan dalam cache. Dalam hal ini, caching data dalam tabel sink membantu mengurangi jumlah data yang ditulis ke topik Kafka. Ini mencegah pesan nisan potensial dikirim ke topik Kafka.

    Catatan

    Jika Anda ingin mengaktifkan caching data untuk tabel sink, Anda harus mengatur opsi sink.buffer-flush.max-rows dan sink.buffer-flush.interval ke nilai yang lebih besar dari 0.

Ingest data

Konektor Upsert Kafka dapat digunakan untuk mengembangkan draft YAML untuk ingest data dan sebagai sink untuk menulis data dalam format JSON, termasuk bidang kunci utama dalam tubuh pesan.

Sintaks

sink:
  type: upsert-kafka
  name: upsert-kafka Sink
  properties.bootstrap.servers: localhost:9092
  # ApsaraMQ for Kafka
  aliyun.kafka.accessKeyId: ${secret_values.kafka-ak}
  aliyun.kafka.accessKeySecret: ${secret_values.kafka-sk}
  aliyun.kafka.instanceId: ${instancd-id}
  aliyun.kafka.endpoint: ${endpoint}
  aliyun.kafka.regionId: ${region-id}

Opsi konektor

Opsi

Deskripsi

Tipe data

Diperlukan

Nilai default

Catatan

type

Tipe konektor sink

STRING

Ya

Tidak ada nilai default

Setel opsi menjadi upsert-kafka.

name

Nama konektor sink

STRING

Tidak

Tidak ada nilai default

N/A

properties.bootstrap.servers

Alamat IP atau titik akhir dan nomor port broker Kafka.

STRING

Ya

Tidak ada nilai default

Format: host:port,host:port,host:port. Pisahkan beberapa pasangan host:port dengan koma (,).

properties.*

Opsi yang dikonfigurasi untuk klien Kafka.

STRING

Tidak

Tidak ada nilai default

Akhiran opsi ini harus sesuai dengan aturan yang didefinisikan di Konfigurasi Produsen.

Flink menghapus awalan properties. dan meneruskan kunci dan nilai yang telah diubah ke klien Kafka. Sebagai contoh, Anda dapat mengatur properties.allow.auto.create.topics menjadi false untuk menonaktifkan pembuatan topik otomatis.

sink.delivery-guarantee

Semantik pengiriman untuk sink Kafka

STRING

Tidak

at-least-once

Nilai valid:

  • none: Semantik pengiriman tidak menjamin apa pun. Data mungkin hilang atau diduplikasi.

  • at-least-once: Semantik at-least-once memastikan bahwa tidak ada data yang hilang. Namun, data mungkin diduplikasi. Ini adalah nilai default.

  • exactly-once: Transaksi Kafka digunakan untuk memastikan semantik exactly-once. Ini memastikan bahwa data tidak hilang atau diduplikasi.

sink.add-tableId-to-header-enabled

Apakah akan menulis informasi tabel ke header.

BOOLEAN

Tidak

false

Jika Anda mengatur opsi menjadi true, namespace, schemaName, dan tableName ditulis ke header.

aliyun.kafka.accessKeyId

ID AccessKey akun Alibaba Cloud Anda.

STRING

Tidak

Tidak ada nilai default

Untuk informasi lebih lanjut, lihat Dapatkan pasangan AccessKey.

Catatan

Anda harus mengonfigurasi opsi ini saat menyinkronkan data ke ApsaraMQ for Kafka.

aliyun.kafka.accessKeySecret

Rahasia AccessKey akun Alibaba Cloud.

STRING

Tidak

Tidak ada nilai default

Untuk informasi lebih lanjut, lihat Dapatkan pasangan AccessKey.

Catatan

Anda harus mengonfigurasi opsi ini saat menyinkronkan data ke ApsaraMQ for Kafka.

aliyun.kafka.instanceId

ID instance ApsaraMQ for Kafka

STRING

Tidak

Tidak ada nilai default

Anda dapat melihat ID instance di halaman Detail Instance.

Catatan

Anda harus mengonfigurasi opsi ini saat menyinkronkan data ke ApsaraMQ for Kafka.

aliyun.kafka.endpoint

Titik akhir ApsaraMQ for Kafka.

STRING

Tidak

Tidak ada nilai default

Untuk informasi lebih lanjut, lihat Titik Akhir.

Catatan

Anda harus mengonfigurasi opsi ini saat menyinkronkan data ke ApsaraMQ for Kafka.

aliyun.kafka.regionId

ID wilayah instance tempat Anda ingin membuat topik.

STRING

Tidak

Tidak ada nilai default

Untuk informasi lebih lanjut, lihat Titik Akhir.

Catatan

Anda harus mengonfigurasi opsi ini saat menyinkronkan data ke ApsaraMQ for Kafka.

Tipe perubahan yang didukung

Konektor Upsert Kafka mendukung semua tipe perubahan untuk pemasukan data. Namun, perlu diperhatikan bahwa data harus dimasukkan dari tabel sumber dengan skema tetap melalui pekerjaan Flink SQL menggunakan Konektor Upsert Kafka.

Kode sampel

  • Kode sampel untuk tabel sumber

    Buat tabel sumber Kafka yang berisi data penjelajahan pengguna situs web.

    CREATE TABLE pageviews(
    user_id BIGINT,
    page_id BIGINT,
    viewtime TIMESTAMP,
    user_region STRING,
    WATERMARK FOR viewtime AS viewtime - INTERVAL '2' SECOND
    )WITH(
    'connector'='kafka',
    'topic'='<yourTopicName>',
    'properties.bootstrap.servers'='...',
    'format'='json'
    );
  • Kode sampel untuk tabel sink

    • Buat tabel sink Upsert Kafka.

      CREATE TABLE pageviews_per_region(
      user_region STRING,
      pv BIGINT,
      uv BIGINT,
      PRIMARY KEY(user_region) NOT ENFORCED
      )WITH(
      'connector'='upsert-kafka',
      'topic'='<yourTopicName>',
      'properties.bootstrap.servers'='...',
      'key.format'='avro',
      'value.format'='avro'
      );
    • Tulis data penjelajahan pengguna situs web ke tabel sink.

      INSERT INTO pageviews_per_region
      SELECT
      user_region,
      COUNT(*),
      COUNT(DISTINCTuser_id)
      FROM pageviews
      GROUP BY user_region;
  • Sink Ingest Data

    source:
      type: mysql
      name: MySQL Source
      hostname: ${mysql.hostname}
      port: ${mysql.port}
      username: ${mysql.username}
      password: ${mysql.password}
      tables: ${mysql.source.table}
      server-id: 8601-8604
    
    sink:
      type: upsert-kafka
      name: Upsert Kafka Sink
      properties.bootstrap.servers: ${upsert.kafka.bootstraps.server}
      aliyun.kafka.accessKeyId: ${upsert.kafka.aliyun.ak}
      aliyun.kafka.accessKeySecret: ${upsert.kafka.aliyun.sk}
      aliyun.kafka.instanceId: ${upsert.kafka.aliyun.instanceid}
      aliyun.kafka.endpoint: ${upsert.kafka.aliyun.endpoint}
      aliyun.kafka.regionId: ${upsert.kafka.aliyun.regionid}
    
    route:
      - source-table: ${mysql.source.table}
        sink-table: ${upsert.kafka.topic}

Referensi