全部产品
Search
文档中心

ApsaraDB for ClickHouse:Menyinkronkan data dari instance ApsaraMQ for Kafka

更新时间:Jul 02, 2025

Topik ini menjelaskan cara menyinkronkan data dari instance ApsaraMQ for Kafka ke kluster ApsaraDB for ClickHouse secara real-time.

Batasan

Hanya data dari instance ApsaraMQ for Kafka atau kluster Kafka yang dikelola sendiri yang dijalankan pada instance Elastic Compute Service (ECS) yang dapat disinkronkan ke instance ApsaraDB for ClickHouse.

Prasyarat

  • Kluster tujuan ApsaraDB for ClickHouse:

    • Kluster tujuan telah dibuat. Instance sumber ApsaraMQ for Kafka dan kluster tujuan berada di wilayah yang sama serta menggunakan virtual private cloud (VPC) yang sama. Untuk informasi lebih lanjut, lihat Membuat Kluster ApsaraDB for ClickHouse.

    • Akun telah dibuat untuk mengakses database di kluster tujuan dan memiliki izin untuk melakukan operasi pada database. Untuk informasi lebih lanjut, lihat Manajemen Akun.

  • Instance sumber ApsaraMQ for Kafka:

Catatan penggunaan

  • Jika sebuah topik berlangganan oleh tabel eksternal Kafka ApsaraDB for ClickHouse, topik tersebut tidak dapat dikonsumsi oleh konsumen lainnya.

  • Saat membuat tabel eksternal Kafka, tampilan materialisasi, dan tabel lokal, tipe bidang dalam ketiga tabel harus sesuai.

Prosedur

Contoh ini menunjukkan cara menyinkronkan data dari instance ApsaraMQ for Kafka ke tabel terdistribusi kafka_table_distributed di database default dari kluster ApsaraDB for ClickHouse yang menjalankan Edisi Kompatibel Komunitas.

Langkah 1: Memahami prinsip sinkronisasi

ApsaraDB for ClickHouse menggunakan mesin tabel Kafka bawaan dan mekanisme tampilan materialisasi untuk menyinkronkan data dari instance ApsaraMQ for Kafka. Ini mencapai konsumsi dan penyimpanan data secara real-time. Gambar berikut menunjukkan proses sinkronisasi.

  • Topik Kafka: Menentukan data sumber yang akan disinkronkan.

  • Tabel eksternal Kafka ApsaraDB for ClickHouse: Menarik data sumber dari topik Kafka tertentu.

  • Tampilan materialisasi: Membaca data sumber dari tabel eksternal Kafka dan memasukkan data ke tabel lokal ApsaraDB for ClickHouse.

  • Tabel lokal: Menyimpan data yang disinkronkan.

Langkah 2: Masuk ke database ApsaraDB for ClickHouse

Untuk informasi lebih lanjut tentang cara masuk ke database ApsaraDB for ClickHouse, lihat Gunakan DMS untuk Terhubung ke Kluster ApsaraDB for ClickHouse.

Langkah 3: Membuat tabel eksternal Kafka

ApsaraDB for ClickHouse menggunakan mesin tabel Kafka bawaan untuk menarik data sumber dari topik Kafka tertentu. Tabel eksternal Kafka memiliki fitur-fitur berikut:

  • Secara default, tabel eksternal Kafka tidak dapat di-query langsung.

  • Tabel eksternal Kafka hanya digunakan untuk mengonsumsi data Kafka dan tidak menyimpan data. Anda harus menggunakan tampilan materialisasi untuk memproses data dan memasukkan data ke tabel tujuan untuk penyimpanan.

Pernyataan contoh untuk membuat tabel:

Penting

Format bidang tabel eksternal Kafka harus sesuai dengan tipe data Kafka.

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'host:port1,host:port2,host:port3',
    kafka_topic_list = 'topic_name1,topic_name2,...',
    kafka_group_name = 'group_name',
    kafka_format = 'data_format'[,]
    [kafka_row_delimiter = 'delimiter_symbol',]
    [kafka_num_consumers = N,]
    [kafka_thread_per_consumer = 1,]
    [kafka_max_block_size = 0,]
    [kafka_skip_broken_messages = N,]
    [kafka_commit_every_batch = 0,]
    [kafka_auto_offset_reset = N]

Tabel berikut menjelaskan parameter dalam pernyataan sebelumnya.

Parameter

Diperlukan

Deskripsi

kafka_broker_list

Ya

Titik akhir yang digunakan untuk terhubung ke instance ApsaraMQ for Kafka. Pisahkan titik akhir dengan koma (,). Untuk informasi lebih lanjut tentang cara melihat titik akhir, lihat Lihat titik akhir.

  • Jika Anda menggunakan instance ApsaraMQ for Kafka, ApsaraDB for ClickHouse secara default mem-parsing nama domain instance tersebut.

  • Jika Anda menggunakan kluster Kafka yang dikelola sendiri, ApsaraDB for ClickHouse terhubung ke kluster tersebut menggunakan alamat IP atau nama domain kustom dalam format tetap. Aturan nama domain yang didukung adalah sebagai berikut:

    1. Nama domain yang berakhiran .com.

    2. Nama domain yang berakhiran .local dan mengandung salah satu kata kunci berikut: kafka, mysql, dan rabbitmq.

kafka_topic_list

Ya

Nama topik dalam instance ApsaraMQ for Kafka. Pisahkan nama topik dengan koma (,). Untuk informasi lebih lanjut tentang cara melihat nama topik, lihat Langkah 1: Membuat topik.

kafka_group_name

Ya

Nama grup konsumen dalam instance ApsaraMQ for Kafka. Untuk informasi lebih lanjut, lihat Langkah 2: Membuat grup.

kafka_format

Ya

Format badan pesan yang didukung oleh ApsaraDB for ClickHouse.

Catatan

Untuk informasi lebih lanjut tentang format badan pesan yang didukung oleh ApsaraDB for ClickHouse, lihat Format untuk Data Input dan Output.

kafka_row_delimiter

Tidak

Pemisah baris yang digunakan untuk memisahkan baris. Nilai default adalah \n. Anda juga dapat mengatur parameter ini berdasarkan format pembatas aktual tempat data ditulis.

kafka_num_consumers

Tidak

Jumlah konsumen yang mengonsumsi data dalam tabel. Nilai default: 1.

Catatan
  • Jika throughput satu konsumen tidak cukup, tentukan jumlah konsumen yang lebih besar.

  • Jumlah konsumen tidak boleh melebihi jumlah partisi dalam topik karena hanya satu konsumen yang dapat ditetapkan per partisi.

kafka_thread_per_consumer

Tidak

Menentukan apakah setiap konsumen memulai thread independen untuk konsumsi. Nilai default: 0. Nilai valid:

  • 0: Semua konsumen menggunakan satu thread untuk konsumsi.

  • 1: Setiap konsumen memulai thread independen untuk konsumsi.

Untuk informasi lebih lanjut tentang cara meningkatkan kecepatan konsumsi, lihat Optimasi Kinerja Kafka.

kafka_max_block_size

Tidak

Ukuran maksimum pesan Kafka yang dapat ditulis ke tabel dalam setiap batch. Nilai default: 65536. Unit: byte.

kafka_skip_broken_messages

Tidak

Toleransi parser pesan Kafka terhadap data kotor. Nilai default: 0. Jika Anda mengatur kafka_skip_broken_messages=N menjadi N, mesin Kafka melewati N pesan Kafka yang tidak dapat di-parse. Satu pesan setara dengan satu baris data.

kafka_commit_every_batch

Tidak

Menentukan seberapa sering operasi commit dilakukan. Nilai default: 0. Nilai valid:

  • 0: Operasi commit dilakukan hanya setelah seluruh blok ditulis.

  • 1: Operasi commit dilakukan setelah satu batch data ditulis.

kafka_auto_offset_reset

Tidak

Offset dari mana data dalam instance ApsaraMQ for Kafka yang digunakan sebagai sumber data dikonsumsi. Nilai valid:

  • earliest (nilai default): Data dalam instance ApsaraMQ for Kafka dikonsumsi dari offset paling awal.

  • latest: Data dalam instance ApsaraMQ for Kafka dikonsumsi dari offset paling baru.

Catatan

Parameter ini tidak didukung oleh kluster ApsaraDB for ClickHouse versi 21.8.

Untuk informasi lebih lanjut tentang parameter, lihat Kafka.

Pernyataan contoh:

CREATE TABLE default.kafka_src_table ON CLUSTER `default`
(-- Bidang yang mendefinisikan skema tabel
    id Int32,
    name String               
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****1-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****-3-vpc.alikafka.aliyuncs.com:9092',
    kafka_topic_list = 'testforCK',
    kafka_group_name = 'GroupForTestCK',
    kafka_format = 'CSV';

Langkah 4: Membuat tabel tujuan untuk penyimpanan

Pernyataan pembuatan tabel bervariasi berdasarkan edisi kluster.

Untuk kluster Edisi Enterprise, Anda hanya perlu membuat tabel lokal. Untuk kluster Edisi Kompatibel Komunitas, Anda dapat membuat tabel terdistribusi berdasarkan kebutuhan bisnis Anda. Untuk informasi lebih lanjut tentang pernyataan pembuatan tabel, lihat CREATE TABLE. Pernyataan contoh:

Edisi Enterprise

CREATE TABLE default.kafka_table_local ON CLUSTER default (
  id Int32,
  name String
) ENGINE = MergeTree()
ORDER BY (id);

Jika pesan kesalahan ON CLUSTER is not allowed for Replicated database muncul saat Anda mengeksekusi pernyataan sebelumnya, Anda dapat memperbarui versi mesin minor kluster untuk menyelesaikan masalah ini. Untuk informasi lebih lanjut, lihat Perbarui Versi Mesin Minor.

Edisi Kompatibel Komunitas

Mesin tabel yang digunakan oleh kluster Edisi Single-replika dan Edisi Double-replika berbeda. Pilih mesin tabel berdasarkan jenis replika yang Anda gunakan.

Penting

Saat membuat tabel dalam kluster Edisi Double-replika, pastikan bahwa tabel-tabel tersebut menggunakan mesin tabel Replicated dari keluarga MergeTree. Jika tabel-tabel tersebut menggunakan mesin tabel non-Replicated, data pada tabel-tabel tersebut tidak direplikasi di antara replika. Hal ini dapat menyebabkan ketidaksesuaian data.

Edisi Single-replika

  1. Buat tabel lokal.

    CREATE TABLE default.kafka_table_local ON CLUSTER default (
      id Int32,
      name String
    ) ENGINE = MergeTree()
    ORDER BY (id);
  2. Opsional. Buat tabel terdistribusi.

    Jika Anda ingin mengimpor data hanya ke tabel lokal, lewati langkah ini.

    Jika Anda menggunakan kluster multi-node, kami sarankan Anda membuat tabel terdistribusi.

    CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
    ENGINE = Distributed(default, default, kafka_table_local, id);

Edisi Double-replika

  1. Buat tabel lokal.

    CREATE TABLE default.kafka_table_local ON CLUSTER default (
      id Int32,
      name String
    ) ENGINE = ReplicatedMergeTree()
    ORDER BY (id);
  2. Opsional. Buat tabel terdistribusi.

    Jika Anda ingin mengimpor data hanya ke tabel lokal, lewati langkah ini.

    Jika Anda menggunakan kluster multi-node, kami sarankan Anda membuat tabel terdistribusi.

    CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local
    ENGINE = Distributed(default, default, kafka_table_local, id);

Langkah 5: Membuat tampilan materialisasi

Selama sinkronisasi, ApsaraDB for ClickHouse menggunakan tampilan materialisasi untuk membaca data sumber dari tabel eksternal Kafka dan kemudian memasukkan data ke tabel lokal ApsaraDB for ClickHouse.

Pernyataan contoh untuk membuat tampilan materialisasi:

Penting

Pastikan bahwa bidang dalam klausa SELECT sesuai dengan skema tabel tujuan. Sebagai alternatif, Anda dapat menggunakan fungsi konversi untuk mengonversi format data, memastikan konsistensi skema.

CREATE MATERIALIZED VIEW <view_name> ON CLUSTER default TO <dest_table> AS SELECT * FROM <src_table>;

Tabel berikut menjelaskan parameter.

Nama parameter

Diperlukan

Deskripsi

Contoh

view_name

Ya

Nama tampilan.

consumer

dest_table

Ya

Tabel tujuan yang digunakan untuk menyimpan data Kafka.

  • Edisi Kompatibel Komunitas kluster:

    • Jika Anda menggunakan kluster multi-node, kami sarankan Anda mengimpor data ke tabel terdistribusi.

    • Jika tabel tujuan adalah tabel lokal, data disimpan di tabel lokal.

  • Edisi Enterprise kluster: Kluster Edisi Enterprise tidak mendukung tabel terdistribusi. Data disimpan di tabel lokal.

  • Edisi Kompatibel Komunitas: kafka_table_distributed

  • Edisi Enterprise: kafka_table_local

src_table

Ya

Tabel eksternal Kafka.

kafka_src_table

Pernyataan contoh:

Edisi Enterprise

CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_local AS SELECT * FROM kafka_src_table;

Edisi Kompatibel Komunitas

Anda dapat mengeksekusi pernyataan contoh berikut untuk menyimpan data sumber di tabel terdistribusi kafka_table_distributed.

CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_distributed AS SELECT * FROM kafka_src_table;

Langkah 6: Periksa apakah data telah disinkronkan

  1. Kirim pesan ke topik instance ApsaraMQ for Kafka.

    1. Masuk ke Konsol ApsaraMQ for Kafka.

    2. Di halaman Instances, klik nama instance yang ingin Anda kelola.

    3. Di halaman Topics, temukan topik yang diperlukan dan pilih More > Send Message di kolom Actions.

    4. Di panel Send to Produce and Consume Message, tentukan isi pesan yang ingin Anda kirim.

      Dalam contoh ini, pesan 1,a dan 2,b dikirim.

    5. Klik OK.

  2. Masuk ke database ApsaraDB for ClickHouse dan kueri tabel terdistribusi untuk memeriksa apakah data telah disinkronkan.

    Untuk informasi lebih lanjut tentang cara masuk ke database ApsaraDB for ClickHouse, lihat Hubungkan ke ClickHouse menggunakan DMS.

    Pernyataan contoh untuk memverifikasi data:

    Edisi Enterprise

    SELECT * FROM kafka_table_local; 

    Edisi Kompatibel Komunitas

    Pernyataan berikut menunjukkan cara mengkueri tabel terdistribusi.

    • Jika tabel tujuan adalah tabel lokal, Anda harus mengganti nama tabel terdistribusi dengan nama tabel lokal dalam pernyataan tersebut.

    • Jika Anda menggunakan kluster Edisi Kompatibel Komunitas dengan beberapa node, kami sarankan Anda mengkueri data dari tabel terdistribusi. Jika tidak, data hanya akan di-query dari satu node dalam kluster, yang menghasilkan hasil yang tidak lengkap.

    SELECT * FROM kafka_table_distributed; 

    Saat Anda menjalankan query dan menerima hasil yang sukses, ini menunjukkan bahwa data telah disinkronkan dari instance ApsaraMQ for Kafka ke kluster ApsaraDB for ClickHouse.

    Hasil contoh:

    ┌─id─┬─name─┐
    │  1 │  a   │
    │  2 │  b   │
    └────┴──────┘

    Jika hasil query berbeda dari yang Anda harapkan, Anda dapat melakukan pemecahan masalah lebih lanjut dengan mengikuti instruksi yang dijelaskan di (Opsional) Langkah 7: Lihat Status Konsumsi Tabel Eksternal Kafka.

(Opsional) Langkah 7: Lihat status konsumsi tabel eksternal Kafka

Jika data yang disinkronkan tidak sesuai dengan data dalam instance ApsaraMQ for Kafka, Anda dapat menggunakan tabel sistem untuk memeriksa status konsumsi tabel eksternal Kafka dan memecahkan masalah konsumsi pesan.

Edisi Kompatibel Komunitas dan Edisi Enterprise V23.8 dan yang lebih baru

Lihat status konsumsi tabel eksternal Kafka dengan mengkueri tabel sistem system.kafka_consumers. Pernyataan contoh:

select * from system.kafka_consumers;

Tabel berikut menjelaskan bidang dalam tabel system.kafka_consumers.

Nama Bidang

Deskripsi

database

Database tempat tabel eksternal Kafka berada.

table

Nama tabel eksternal Kafka.

consumer_id

ID konsumen Kafka.

Sebuah tabel dapat dikonsumsi oleh beberapa konsumen. Konsumen ditentukan oleh parameter kafka_num_consumers selama pembuatan tabel eksternal Kafka.

assignments.topic

Topik Kafka.

assignments.partition_id

ID partisi Kafka.

Sebuah partisi hanya dapat ditetapkan ke satu konsumen.

assignments.current_offset

Offset saat ini.

exceptions.time

Timestamp dari 10 pengecualian terbaru.

exceptions.text

Teks dari 10 pengecualian terbaru.

last_poll_time

Timestamp dari polling terbaru.

num_messages_read

Jumlah pesan yang dibaca oleh konsumen.

last_commit_time

Timestamp dari commit terbaru.

num_commits

Total jumlah commit untuk konsumen.

last_rebalance_time

Timestamp dari rebalancing Kafka terbaru.

num_rebalance_revocations

Jumlah kali partisi dicabut untuk konsumen.

num_rebalance_assignments

Jumlah kali konsumen ditetapkan ke kluster Kafka.

is_currently_used

Menunjukkan apakah konsumen sedang digunakan.

last_used

Waktu terakhir konsumen digunakan. Unit: mikrodetik.

rdkafka_stat

Informasi statistik internal dari database. Untuk informasi lebih lanjut, lihat librdkafka.

Nilai defaultnya adalah 3000, yang menunjukkan bahwa informasi statistik dihasilkan setiap tiga detik.

Catatan

Saat ApsaraDB for ClickHouse diatur dengan statistics_interval_ms=0, pengumpulan informasi statistik untuk tabel eksternal Kafka dapat dinonaktifkan.

Lebih awal dari Edisi Kompatibel Komunitas V23.8

Lihat status konsumsi tabel eksternal Kafka dengan mengkueri tabel sistem system.kafka. Pernyataan contoh:

SELECT * FROM system.kafka;

Tabel berikut menjelaskan bidang dalam tabel system.kafka.

Nama Bidang

Deskripsi

database

Nama database tempat tabel eksternal Kafka berada.

table

Nama tabel eksternal Kafka.

topic

Nama topik yang dikonsumsi oleh tabel eksternal Kafka.

consumer_group

Nama grup yang dikonsumsi oleh tabel eksternal Kafka.

last_read_message_count

Jumlah pesan yang ditarik oleh tabel eksternal Kafka.

status

Status konsumsi tabel eksternal Kafka. Nilai valid:

  • no_view: Tidak ada tampilan yang dibuat untuk tabel eksternal Kafka.

  • attach_view: Tampilan telah dibuat untuk tabel eksternal Kafka.

  • normal: Status normal.

    Saat tabel eksternal Kafka mengonsumsi data, status tabel eksternal Kafka adalah normal.

  • skip_parse: Kesalahan parsing dilewati.

  • error: Terjadi pengecualian konsumsi.

exception

Detail kesalahan.

Catatan

Ketika status diatur ke error, detail kesalahan akan ditampilkan.

FAQ