Topik ini menjelaskan cara menyinkronkan data dari instance ApsaraMQ for Kafka ke kluster ApsaraDB for ClickHouse secara real-time.
Batasan
Hanya data dari instance ApsaraMQ for Kafka atau kluster Kafka yang dikelola sendiri yang dijalankan pada instance Elastic Compute Service (ECS) yang dapat disinkronkan ke instance ApsaraDB for ClickHouse.
Prasyarat
Kluster tujuan ApsaraDB for ClickHouse:
Kluster tujuan telah dibuat. Instance sumber ApsaraMQ for Kafka dan kluster tujuan berada di wilayah yang sama serta menggunakan virtual private cloud (VPC) yang sama. Untuk informasi lebih lanjut, lihat Membuat Kluster ApsaraDB for ClickHouse.
Akun telah dibuat untuk mengakses database di kluster tujuan dan memiliki izin untuk melakukan operasi pada database. Untuk informasi lebih lanjut, lihat Manajemen Akun.
Instance sumber ApsaraMQ for Kafka:
Topik telah dibuat. Untuk informasi lebih lanjut, lihat Langkah 1: Membuat Topik.
Grup konsumen telah dibuat. Untuk informasi lebih lanjut, lihat Langkah 2: Membuat Grup.
Catatan penggunaan
Jika sebuah topik berlangganan oleh tabel eksternal Kafka ApsaraDB for ClickHouse, topik tersebut tidak dapat dikonsumsi oleh konsumen lainnya.
Saat membuat tabel eksternal Kafka, tampilan materialisasi, dan tabel lokal, tipe bidang dalam ketiga tabel harus sesuai.
Prosedur
Contoh ini menunjukkan cara menyinkronkan data dari instance ApsaraMQ for Kafka ke tabel terdistribusi kafka_table_distributed di database default dari kluster ApsaraDB for ClickHouse yang menjalankan Edisi Kompatibel Komunitas.
Langkah 1: Memahami prinsip sinkronisasi
ApsaraDB for ClickHouse menggunakan mesin tabel Kafka bawaan dan mekanisme tampilan materialisasi untuk menyinkronkan data dari instance ApsaraMQ for Kafka. Ini mencapai konsumsi dan penyimpanan data secara real-time. Gambar berikut menunjukkan proses sinkronisasi.
Topik Kafka: Menentukan data sumber yang akan disinkronkan.
Tabel eksternal Kafka ApsaraDB for ClickHouse: Menarik data sumber dari topik Kafka tertentu.
Tampilan materialisasi: Membaca data sumber dari tabel eksternal Kafka dan memasukkan data ke tabel lokal ApsaraDB for ClickHouse.
Tabel lokal: Menyimpan data yang disinkronkan.
Langkah 2: Masuk ke database ApsaraDB for ClickHouse
Untuk informasi lebih lanjut tentang cara masuk ke database ApsaraDB for ClickHouse, lihat Gunakan DMS untuk Terhubung ke Kluster ApsaraDB for ClickHouse.
Langkah 3: Membuat tabel eksternal Kafka
ApsaraDB for ClickHouse menggunakan mesin tabel Kafka bawaan untuk menarik data sumber dari topik Kafka tertentu. Tabel eksternal Kafka memiliki fitur-fitur berikut:
Secara default, tabel eksternal Kafka tidak dapat di-query langsung.
Tabel eksternal Kafka hanya digunakan untuk mengonsumsi data Kafka dan tidak menyimpan data. Anda harus menggunakan tampilan materialisasi untuk memproses data dan memasukkan data ke tabel tujuan untuk penyimpanan.
Pernyataan contoh untuk membuat tabel:
Format bidang tabel eksternal Kafka harus sesuai dengan tipe data Kafka.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
...
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'host:port1,host:port2,host:port3',
kafka_topic_list = 'topic_name1,topic_name2,...',
kafka_group_name = 'group_name',
kafka_format = 'data_format'[,]
[kafka_row_delimiter = 'delimiter_symbol',]
[kafka_num_consumers = N,]
[kafka_thread_per_consumer = 1,]
[kafka_max_block_size = 0,]
[kafka_skip_broken_messages = N,]
[kafka_commit_every_batch = 0,]
[kafka_auto_offset_reset = N]Tabel berikut menjelaskan parameter dalam pernyataan sebelumnya.
Parameter | Diperlukan | Deskripsi |
kafka_broker_list | Ya | Titik akhir yang digunakan untuk terhubung ke instance ApsaraMQ for Kafka. Pisahkan titik akhir dengan koma (,). Untuk informasi lebih lanjut tentang cara melihat titik akhir, lihat Lihat titik akhir.
|
kafka_topic_list | Ya | Nama topik dalam instance ApsaraMQ for Kafka. Pisahkan nama topik dengan koma (,). Untuk informasi lebih lanjut tentang cara melihat nama topik, lihat Langkah 1: Membuat topik. |
kafka_group_name | Ya | Nama grup konsumen dalam instance ApsaraMQ for Kafka. Untuk informasi lebih lanjut, lihat Langkah 2: Membuat grup. |
kafka_format | Ya | Format badan pesan yang didukung oleh ApsaraDB for ClickHouse. Catatan Untuk informasi lebih lanjut tentang format badan pesan yang didukung oleh ApsaraDB for ClickHouse, lihat Format untuk Data Input dan Output. |
kafka_row_delimiter | Tidak | Pemisah baris yang digunakan untuk memisahkan baris. Nilai default adalah \n. Anda juga dapat mengatur parameter ini berdasarkan format pembatas aktual tempat data ditulis. |
kafka_num_consumers | Tidak | Jumlah konsumen yang mengonsumsi data dalam tabel. Nilai default: 1. Catatan
|
kafka_thread_per_consumer | Tidak | Menentukan apakah setiap konsumen memulai thread independen untuk konsumsi. Nilai default: 0. Nilai valid:
Untuk informasi lebih lanjut tentang cara meningkatkan kecepatan konsumsi, lihat Optimasi Kinerja Kafka. |
kafka_max_block_size | Tidak | Ukuran maksimum pesan Kafka yang dapat ditulis ke tabel dalam setiap batch. Nilai default: 65536. Unit: byte. |
kafka_skip_broken_messages | Tidak | Toleransi parser pesan Kafka terhadap data kotor. Nilai default: 0. Jika Anda mengatur |
kafka_commit_every_batch | Tidak | Menentukan seberapa sering operasi commit dilakukan. Nilai default: 0. Nilai valid:
|
kafka_auto_offset_reset | Tidak | Offset dari mana data dalam instance ApsaraMQ for Kafka yang digunakan sebagai sumber data dikonsumsi. Nilai valid:
Catatan Parameter ini tidak didukung oleh kluster ApsaraDB for ClickHouse versi 21.8. |
Untuk informasi lebih lanjut tentang parameter, lihat Kafka.
Pernyataan contoh:
CREATE TABLE default.kafka_src_table ON CLUSTER `default`
(-- Bidang yang mendefinisikan skema tabel
id Int32,
name String
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'alikafka-post-cn-****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****1-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-****-3-vpc.alikafka.aliyuncs.com:9092',
kafka_topic_list = 'testforCK',
kafka_group_name = 'GroupForTestCK',
kafka_format = 'CSV';Langkah 4: Membuat tabel tujuan untuk penyimpanan
Pernyataan pembuatan tabel bervariasi berdasarkan edisi kluster.
Untuk kluster Edisi Enterprise, Anda hanya perlu membuat tabel lokal. Untuk kluster Edisi Kompatibel Komunitas, Anda dapat membuat tabel terdistribusi berdasarkan kebutuhan bisnis Anda. Untuk informasi lebih lanjut tentang pernyataan pembuatan tabel, lihat CREATE TABLE. Pernyataan contoh:
Edisi Enterprise
CREATE TABLE default.kafka_table_local ON CLUSTER default (
id Int32,
name String
) ENGINE = MergeTree()
ORDER BY (id);Jika pesan kesalahan ON CLUSTER is not allowed for Replicated database muncul saat Anda mengeksekusi pernyataan sebelumnya, Anda dapat memperbarui versi mesin minor kluster untuk menyelesaikan masalah ini. Untuk informasi lebih lanjut, lihat Perbarui Versi Mesin Minor.
Edisi Kompatibel Komunitas
Mesin tabel yang digunakan oleh kluster Edisi Single-replika dan Edisi Double-replika berbeda. Pilih mesin tabel berdasarkan jenis replika yang Anda gunakan.
Saat membuat tabel dalam kluster Edisi Double-replika, pastikan bahwa tabel-tabel tersebut menggunakan mesin tabel Replicated dari keluarga MergeTree. Jika tabel-tabel tersebut menggunakan mesin tabel non-Replicated, data pada tabel-tabel tersebut tidak direplikasi di antara replika. Hal ini dapat menyebabkan ketidaksesuaian data.
Edisi Single-replika
Buat tabel lokal.
CREATE TABLE default.kafka_table_local ON CLUSTER default ( id Int32, name String ) ENGINE = MergeTree() ORDER BY (id);Opsional. Buat tabel terdistribusi.
Jika Anda ingin mengimpor data hanya ke tabel lokal, lewati langkah ini.
Jika Anda menggunakan kluster multi-node, kami sarankan Anda membuat tabel terdistribusi.
CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local ENGINE = Distributed(default, default, kafka_table_local, id);
Edisi Double-replika
Buat tabel lokal.
CREATE TABLE default.kafka_table_local ON CLUSTER default ( id Int32, name String ) ENGINE = ReplicatedMergeTree() ORDER BY (id);Opsional. Buat tabel terdistribusi.
Jika Anda ingin mengimpor data hanya ke tabel lokal, lewati langkah ini.
Jika Anda menggunakan kluster multi-node, kami sarankan Anda membuat tabel terdistribusi.
CREATE TABLE kafka_table_distributed ON CLUSTER default AS default.kafka_table_local ENGINE = Distributed(default, default, kafka_table_local, id);
Langkah 5: Membuat tampilan materialisasi
Selama sinkronisasi, ApsaraDB for ClickHouse menggunakan tampilan materialisasi untuk membaca data sumber dari tabel eksternal Kafka dan kemudian memasukkan data ke tabel lokal ApsaraDB for ClickHouse.
Pernyataan contoh untuk membuat tampilan materialisasi:
Pastikan bahwa bidang dalam klausa SELECT sesuai dengan skema tabel tujuan. Sebagai alternatif, Anda dapat menggunakan fungsi konversi untuk mengonversi format data, memastikan konsistensi skema.
CREATE MATERIALIZED VIEW <view_name> ON CLUSTER default TO <dest_table> AS SELECT * FROM <src_table>;Tabel berikut menjelaskan parameter.
Nama parameter | Diperlukan | Deskripsi | Contoh |
view_name | Ya | Nama tampilan. | consumer |
dest_table | Ya | Tabel tujuan yang digunakan untuk menyimpan data Kafka.
|
|
src_table | Ya | Tabel eksternal Kafka. | kafka_src_table |
Pernyataan contoh:
Edisi Enterprise
CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_local AS SELECT * FROM kafka_src_table;Edisi Kompatibel Komunitas
Anda dapat mengeksekusi pernyataan contoh berikut untuk menyimpan data sumber di tabel terdistribusi kafka_table_distributed.
CREATE MATERIALIZED VIEW consumer ON CLUSTER default TO kafka_table_distributed AS SELECT * FROM kafka_src_table;Langkah 6: Periksa apakah data telah disinkronkan
Kirim pesan ke topik instance ApsaraMQ for Kafka.
Masuk ke Konsol ApsaraMQ for Kafka.
Di halaman Instances, klik nama instance yang ingin Anda kelola.
Di halaman Topics, temukan topik yang diperlukan dan pilih di kolom Actions.
Di panel Send to Produce and Consume Message, tentukan isi pesan yang ingin Anda kirim.
Dalam contoh ini, pesan
1,adan2,bdikirim.Klik OK.
Masuk ke database ApsaraDB for ClickHouse dan kueri tabel terdistribusi untuk memeriksa apakah data telah disinkronkan.
Untuk informasi lebih lanjut tentang cara masuk ke database ApsaraDB for ClickHouse, lihat Hubungkan ke ClickHouse menggunakan DMS.
Pernyataan contoh untuk memverifikasi data:
Edisi Enterprise
SELECT * FROM kafka_table_local;Edisi Kompatibel Komunitas
Pernyataan berikut menunjukkan cara mengkueri tabel terdistribusi.
Jika tabel tujuan adalah tabel lokal, Anda harus mengganti nama tabel terdistribusi dengan nama tabel lokal dalam pernyataan tersebut.
Jika Anda menggunakan kluster Edisi Kompatibel Komunitas dengan beberapa node, kami sarankan Anda mengkueri data dari tabel terdistribusi. Jika tidak, data hanya akan di-query dari satu node dalam kluster, yang menghasilkan hasil yang tidak lengkap.
SELECT * FROM kafka_table_distributed;Saat Anda menjalankan query dan menerima hasil yang sukses, ini menunjukkan bahwa data telah disinkronkan dari instance ApsaraMQ for Kafka ke kluster ApsaraDB for ClickHouse.
Hasil contoh:
┌─id─┬─name─┐ │ 1 │ a │ │ 2 │ b │ └────┴──────┘Jika hasil query berbeda dari yang Anda harapkan, Anda dapat melakukan pemecahan masalah lebih lanjut dengan mengikuti instruksi yang dijelaskan di (Opsional) Langkah 7: Lihat Status Konsumsi Tabel Eksternal Kafka.
(Opsional) Langkah 7: Lihat status konsumsi tabel eksternal Kafka
Jika data yang disinkronkan tidak sesuai dengan data dalam instance ApsaraMQ for Kafka, Anda dapat menggunakan tabel sistem untuk memeriksa status konsumsi tabel eksternal Kafka dan memecahkan masalah konsumsi pesan.
Edisi Kompatibel Komunitas dan Edisi Enterprise V23.8 dan yang lebih baru
Lihat status konsumsi tabel eksternal Kafka dengan mengkueri tabel sistem system.kafka_consumers. Pernyataan contoh:
select * from system.kafka_consumers;Tabel berikut menjelaskan bidang dalam tabel system.kafka_consumers.
Nama Bidang | Deskripsi |
database | Database tempat tabel eksternal Kafka berada. |
table | Nama tabel eksternal Kafka. |
consumer_id | ID konsumen Kafka. Sebuah tabel dapat dikonsumsi oleh beberapa konsumen. Konsumen ditentukan oleh parameter kafka_num_consumers selama pembuatan tabel eksternal Kafka. |
assignments.topic | Topik Kafka. |
assignments.partition_id | ID partisi Kafka. Sebuah partisi hanya dapat ditetapkan ke satu konsumen. |
assignments.current_offset | Offset saat ini. |
exceptions.time | Timestamp dari 10 pengecualian terbaru. |
exceptions.text | Teks dari 10 pengecualian terbaru. |
last_poll_time | Timestamp dari polling terbaru. |
num_messages_read | Jumlah pesan yang dibaca oleh konsumen. |
last_commit_time | Timestamp dari commit terbaru. |
num_commits | Total jumlah commit untuk konsumen. |
last_rebalance_time | Timestamp dari rebalancing Kafka terbaru. |
num_rebalance_revocations | Jumlah kali partisi dicabut untuk konsumen. |
num_rebalance_assignments | Jumlah kali konsumen ditetapkan ke kluster Kafka. |
is_currently_used | Menunjukkan apakah konsumen sedang digunakan. |
last_used | Waktu terakhir konsumen digunakan. Unit: mikrodetik. |
rdkafka_stat | Informasi statistik internal dari database. Untuk informasi lebih lanjut, lihat librdkafka. Nilai defaultnya adalah 3000, yang menunjukkan bahwa informasi statistik dihasilkan setiap tiga detik. Catatan Saat ApsaraDB for ClickHouse diatur dengan statistics_interval_ms=0, pengumpulan informasi statistik untuk tabel eksternal Kafka dapat dinonaktifkan. |
Lebih awal dari Edisi Kompatibel Komunitas V23.8
Lihat status konsumsi tabel eksternal Kafka dengan mengkueri tabel sistem system.kafka. Pernyataan contoh:
SELECT * FROM system.kafka;Tabel berikut menjelaskan bidang dalam tabel system.kafka.
Nama Bidang | Deskripsi |
database | Nama database tempat tabel eksternal Kafka berada. |
table | Nama tabel eksternal Kafka. |
topic | Nama topik yang dikonsumsi oleh tabel eksternal Kafka. |
consumer_group | Nama grup yang dikonsumsi oleh tabel eksternal Kafka. |
last_read_message_count | Jumlah pesan yang ditarik oleh tabel eksternal Kafka. |
status | Status konsumsi tabel eksternal Kafka. Nilai valid:
|
exception | Detail kesalahan. Catatan Ketika status diatur ke error, detail kesalahan akan ditampilkan. |