Routine Load secara terus-menerus membaca pesan dari topik Kafka dan memuatnya ke dalam ApsaraDB for SelectDB. Setelah dibuat, pekerjaan Routine Load berjalan secara persisten—tidak perlu diajukan ulang setelah setiap batch.
Cara kerja
Saat Anda membuat pekerjaan Routine Load, frontend (FE) menghasilkan pekerjaan muat persisten dan membaginya menjadi beberapa task. Setiap task merupakan transaksi independen yang mengonsumsi irisan terbatas pesan Kafka. Sebuah task berakhir ketika ambang batas pertama tercapai: max_batch_interval, max_batch_rows, atau max_batch_size. Setelah sebuah task dikomit, task baru segera dijadwalkan.
Arsitektur ini memungkinkan penyesuaian konkurensi, ukuran batch, dan toleransi kesalahan tanpa menghentikan pekerjaan.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
Kluster Kafka (versi 0.10.0.0 atau lebih baru) yang dapat diakses dari instans ApsaraDB for SelectDB Anda
Topik Kafka dengan pesan dalam format CSV atau JSON (pesan CSV harus berupa satu baris tanpa line feed di akhir)
Tabel tujuan di ApsaraDB for SelectDB
Konektivitas jaringan: Jika kluster Kafka Anda dideploy di lingkungan jaringan publik, instans ApsaraDB for SelectDB Anda harus memiliki akses jaringan publik untuk terhubung ke broker Kafka. Lihat Atasi masalah jaringan dengan sumber data.
Catatan penggunaan
Kafka 0.10.0.0 dan versi yang lebih baru didukung secara default. Untuk menggunakan versi Kafka lama (0.9.0, 0.8.2, 0.8.1, atau 0.8.0), gunakan salah satu pendekatan berikut:
Setel
kafka_broker_version_fallbackdalam konfigurasi backend (BE) ke versi Kafka target.Setel
property.broker.version.fallbacksaat membuat pekerjaan Routine Load.
Beberapa fitur tidak tersedia pada versi Kafka sebelum 0.10.0.0. Misalnya, offset partisi berbasis waktu tidak didukung.
Panduan cepat
Contoh ini memuat data CSV dari Kafka ke tabel SelectDB.
Langkah 1: Verifikasi data sampel di Kafka
1,Alice,30
2,Bob,25
3,Carol,35Langkah 2: Buat tabel tujuan
CREATE TABLE testdb.users (
id INT NOT NULL,
name VARCHAR(50),
age INT
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES ("replication_num" = "1");Langkah 3: Buat pekerjaan Routine Load
CREATE ROUTINE LOAD testdb.load_users ON users
COLUMNS TERMINATED BY ",",
COLUMNS(id, name, age)
PROPERTIES (
"desired_concurrent_number" = "3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false"
)
FROM KAFKA (
"kafka_broker_list" = "broker1:9092,broker2:9092",
"kafka_topic" = "users",
"property.kafka_default_offsets" = "OFFSET_BEGINNING"
);Langkah 4: Periksa status pekerjaan
SHOW ROUTINE LOAD FOR testdb.load_users;Buat pekerjaan Routine Load
Sintaks
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]Parameter
Parameter | Deskripsi |
| Nama pekerjaan. Dalam satu database, hanya satu pekerjaan dengan nama tertentu yang dapat berjalan pada satu waktu. |
| Nama tabel tujuan. |
| Mode penggabungan data. Default: |
| Parameter untuk memproses data yang diimpor. Lihat parameter load_properties. |
| Parameter tingkat pekerjaan. Lihat parameter job_properties. |
| Parameter koneksi Kafka. Lihat parameter data_source_properties. |
Parameter load_properties
[column_separator],
[columns_mapping],
[preceding_filter],
[where_predicates],
[partitions],
[DELETE ON],
[ORDER BY]Parameter | Contoh | Deskripsi |
|
| Pembatas kolom. Default: |
|
| Memetakan kolom sumber ke kolom tujuan dan menerapkan transformasi. Lihat Mengonversi data sumber. |
| — | Menyaring data sumber sebelum pemetaan kolom. Lihat Mengonversi data sumber. |
|
| Menyaring baris setelah pemetaan kolom. Lihat Mengonversi data sumber. |
|
| Partisi target. Jika dihilangkan, data diarahkan ke partisi yang sesuai secara otomatis. |
|
| Menentukan ekspresi kolom Delete Flag. Diperlukan saat |
| — | Menentukan Sequence Col untuk mempertahankan urutan baris selama impor. Hanya berlaku untuk tabel model Unique Key. |
Parameter job_properties
PROPERTIES (
"key1" = "val1",
"key2" = "val2"
)Sebuah task berakhir ketika ambang batas pertama darimax_batch_interval,max_batch_rows, ataumax_batch_sizetercapai.
Parameter | Default | Rentang valid | Deskripsi |
|
| Bilangan bulat > 0 | Maksimum task yang dapat berjalan secara konkuren untuk pekerjaan tersebut. Konkurensi aktual bergantung pada node kluster, beban, dan sumber data. Untuk throughput optimal, atur nilai ini ke |
|
| 5–60 detik | Durasi eksekusi maksimum per task. |
|
| ≥ 200000 | Maksimum baris yang dibaca per task. |
|
| 100 MB–1 GB | Maksimum byte yang dibaca per task. |
|
| Bilangan bulat > 0 | Maksimum baris error yang diperbolehkan dalam jendela pengambilan sampel (10 × |
|
|
| Jika |
| Zona waktu sesi | — | Zona waktu untuk semua fungsi terkait zona waktu dalam pekerjaan (misalnya, |
|
|
| Format pesan. |
| — | — | Jalur ekstraksi bidang JSON untuk data format JSON (misalnya, |
|
|
| Jika |
| — | — | Jalur node root untuk ekstraksi JSON (misalnya, |
| — | — | Maksimum thread paralel untuk mengirim data batch. Dibatasi oleh |
|
|
| Jika |
Perilaku mode ketat
Mode ketat mengatur cara penanganan kegagalan konversi tipe.
Mengimpor kolom TINYINT (NULL diizinkan):
Tipe data sumber | Nilai contoh | Hasil konversi | Mode ketat | Hasil |
NULL |
| — |
| NULL |
NOT NULL |
| NULL |
| Difilter (tidak valid) |
NOT NULL |
| NULL |
| NULL |
NOT NULL |
|
|
| Diimpor |
Mengimpor kolom DECIMAL(1,0) (NULL diizinkan):
Tipe data sumber | Nilai contoh | Hasil konversi | Mode ketat | Hasil |
NULL |
| — |
| NULL |
NOT NULL |
| NULL |
| Difilter (tidak valid) |
NOT NULL |
| NULL |
| NULL |
NOT NULL |
|
|
| Diimpor |
Nilai 10 melebihi rentang DECIMAL(1,0) tetapi tidak difilter oleh mode ketat, karena nilainya sendiri lolos konversi tipe. Nilai tersebut difilter kemudian selama proses ekstrak, transformasi, dan muat (ETL).Parameter data_source_properties
FROM KAFKA (
"key1" = "val1",
"key2" = "val2"
)Parameter | Deskripsi |
| Alamat broker. Format: |
| Topik Kafka yang akan dilanggan. |
| ID partisi yang dipisahkan koma untuk dilanggan. Contoh: |
| Offset awal untuk setiap partisi yang tercantum dalam |
| Properti klien Kafka kustom, setara dengan |
Offset berbasis waktu dan offset numerik tidak dapat dicampur dalam nilai kafka_offsets yang sama.
Kombinasi partisi dan offset
kafka_partitions, kafka_offsets, dan property.kafka_default_offsets berinteraksi sebagai berikut:
Metode |
|
|
| Perilaku |
1 | Tidak diatur | Tidak diatur | Tidak diatur | Semua partisi, dimulai dari offset akhir. |
2 | Tidak diatur | Tidak diatur | Set | Semua partisi, dimulai dari offset default yang ditentukan. |
3 | Set | Tidak diatur | Tidak diatur | Partisi yang ditentukan, dimulai dari offset akhir. |
4 | Set | Set | Tidak diatur | Partisi yang ditentukan, dimulai dari offset yang ditentukan. |
5 | Set | Tidak diatur | Tetapkan | Partisi yang ditentukan, dimulai dari offset default yang ditentukan. |
Contoh — partisi dengan offset campuran:
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"Contoh — partisi dengan offset berbasis timestamp:
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "2021-05-22 11:00:00,2021-05-22 11:00:00,2021-05-22 11:00:00,2021-05-22 11:00:00"Contoh
Muat data CSV
Buat tabel tujuan:
CREATE TABLE test_table ( id INT, name VARCHAR(50), age INT, address VARCHAR(50), url VARCHAR(500) ) UNIQUE KEY(`id`, `name`) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES ("replication_num" = "1");Buat pekerjaan Routine Load yang membaca dari awal semua partisi:
CREATE ROUTINE LOAD example_db.test1 ON test_table COLUMNS TERMINATED BY ",", COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100) PROPERTIES ( "desired_concurrent_number" = "3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );Buat pekerjaan Routine Load dengan mode ketat diaktifkan:
CREATE ROUTINE LOAD example_db.test2 ON test_table COLUMNS TERMINATED BY ",", COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100) PROPERTIES ( "desired_concurrent_number" = "3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );Mulai mengonsumsi dari timestamp tertentu:
CREATE ROUTINE LOAD example_db.test4 ON test_table PROPERTIES ( "desired_concurrent_number" = "3", "max_batch_interval" = "30", "max_batch_rows" = "300000", "max_batch_size" = "209715200" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092", "kafka_topic" = "my_topic", "property.kafka_default_offset" = "2024-01-21 10:00:00" );
Muat data JSON
Routine Load mendukung dua struktur pesan JSON:
Objek JSON tunggal — satu catatan per pesan:
{"key1":"value1","key2":"value2","key3":"value3"}Array JSON — beberapa catatan per pesan:
[ {"key1":"value11","key2":"value12","key3":"value13","key4":14}, {"key1":"value21","key2":"value22","key3":"value23","key4":24} ]
Untuk mode impor multi-tabel, tambahkan prefiks nama tabel tujuan pada setiap pesan:
table_name|{"key1":"value1","key2":"value2","key3":"value3"}Contoh: muat data JSON
Buat tabel tujuan:
CREATE TABLE `example_tbl` ( `category` VARCHAR(24) NULL, `author` VARCHAR(24) NULL, `timestamp` BIGINT(20) NULL, `dt` INT(11) NULL, `price` DOUBLE REPLACE ) ENGINE=OLAP AGGREGATE KEY(`category`, `author`, `timestamp`, `dt`) PARTITION BY RANGE(`dt`) ( PARTITION p0 VALUES [("-2147483648"), ("20230509")), PARTITION p20200509 VALUES [("20230509"), ("20231010")), PARTITION p20200510 VALUES [("20231010"), ("20231211")), PARTITION p20200511 VALUES [("20231211"), ("20240512")) ) DISTRIBUTED BY HASH(`category`, `author`, `timestamp`) BUCKETS 4;Publikasikan kedua jenis pesan ke topik Kafka:
{"category":"value1331","author":"value1233","timestamp":1700346050,"price":1413}[ {"category":"value13z2","author":"vaelue13","timestamp":1705645251,"price":14330}, {"category":"lvalue211","author":"lvalue122","timestamp":1684448450,"price":24440} ]Muat dalam mode sederhana (nama bidang sesuai dengan nama kolom):
CREATE ROUTINE LOAD example_db.test_json_label_1 ON example_tbl COLUMNS(category, price, author) PROPERTIES ( "desired_concurrent_number" = "3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );Muat dengan ekstraksi jalur JSON eksplisit dan pengupasan array luar:
Kolom
dttidak muncul dalam data sumber. Nilainya diturunkan daritimestampmenggunakandt=from_unixtime(timestamp,'%Y%m%d')dalam klausaCOLUMNS.CREATE ROUTINE LOAD example_db.test_json_label_3 ON example_tbl COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d')) PROPERTIES ( "desired_concurrent_number" = "3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json", "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]", "strip_outer_array" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );
Koneksi ke kluster Kafka dengan otentikasi
ApsaraDB for SelectDB menggunakan library klien C++ librdkafka untuk terhubung ke Kafka. Untuk properti konfigurasi yang didukung, lihat referensi konfigurasi librdkafka.
Otentikasi SSL
Unggah file sertifikat yang diperlukan terlebih dahulu, lalu buat pekerjaan Routine Load.
Unggah file sertifikat:
CREATE FILE "ca.pem" PROPERTIES ("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka"); CREATE FILE "client.key" PROPERTIES ("url" = "https://example_url/kafka-key/client.key", "catalog" = "kafka"); CREATE FILE "client.pem" PROPERTIES ("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");Buat pekerjaan Routine Load:
Properti
Wajib
Deskripsi
property.security.protocolSelalu
Atur ke
ssl.property.ssl.ca.locationSelalu
Jalur ke sertifikat CA yang mengotentikasi kunci publik broker Kafka.
property.ssl.certificate.locationHanya jika otentikasi klien diaktifkan di server Kafka
Jalur ke sertifikat kunci publik klien.
property.ssl.key.locationHanya jika otentikasi klien diaktifkan di server Kafka
Jalur ke file kunci privat klien.
property.ssl.key.passwordHanya jika otentikasi klien diaktifkan di server Kafka
Kata sandi untuk kunci privat klien.
CREATE ROUTINE LOAD db1.job1 ON tbl1 PROPERTIES ( "desired_concurrent_number" = "1" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9091,broker2:9091", "kafka_topic" = "my_topic", "property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcdefg" );
Otentikasi PLAIN
CREATE ROUTINE LOAD db1.job1 ON tbl1
PROPERTIES (
"desired_concurrent_number" = "1"
)
FROM KAFKA (
"kafka_broker_list" = "broker1:9092,broker2:9092",
"kafka_topic" = "my_topic",
"property.security.protocol" = "SASL_PLAINTEXT",
"property.sasl.mechanism" = "PLAIN",
"property.sasl.username" = "admin",
"property.sasl.password" = "admin"
);Properti | Nilai | Deskripsi |
|
| Gunakan Simple Authentication and Security Layer (SASL) plaintext. |
|
| Mekanisme SASL. |
| — | Username SASL. |
| — | Password SASL. |
Otentikasi Kerberos
Sebelum membuat pekerjaan, deploy klien Kerberos kinit di semua node dalam kluster ApsaraDB for SelectDB Anda, konfigurasikan krb5.conf, dan tentukan layanan Key Distribution Center (KDC).
CREATE ROUTINE LOAD db1.job1 ON tbl1
PROPERTIES (
"desired_concurrent_number" = "1"
)
FROM KAFKA (
"kafka_broker_list" = "broker1:9092,broker2:9092",
"kafka_topic" = "my_topic",
"property.security.protocol" = "SASL_PLAINTEXT",
"property.sasl.kerberos.service.name" = "kafka",
"property.sasl.kerberos.keytab" = "/etc/krb5.keytab",
"property.sasl.kerberos.principal" = "id@your.com"
);Properti | Deskripsi |
| Atur ke |
| Nama layanan broker Kafka. |
| Jalur absolut ke file lokal |
| Pihak yang berwenang Kerberos yang digunakan untuk terhubung ke kluster Kafka. |
Ubah pekerjaan Routine Load
Hanya pekerjaan dalam status PAUSED yang dapat diubah.
Sintaks
ALTER ROUTINE LOAD FOR [db.]job_name
[job_properties]
FROM data_source
[data_source_properties]Parameter yang dapat diubah
job_properties — parameter berikut dapat diubah:
desired_concurrent_numbermax_error_numbermax_batch_intervalmax_batch_rowsmax_batch_sizejsonpathsjson_rootstrip_outer_arraystrict_modetimezonenum_as_stringfuzzy_parse
data_source_properties — hanya properti Kafka yang dapat diubah:
kafka_partitionskafka_offsetskafka_broker_listkafka_topicProperti kustom (misalnya,
property.group.id)
kafka_partitionsdankafka_offsetshanya dapat mengubah offset partisi yang sedang dikonsumsi. Menambahkan partisi baru tidak didukung.
Contoh
Ubah konkurensi pekerjaan db1.label1 menjadi 1:
ALTER ROUTINE LOAD FOR db1.label1
PROPERTIES (
"desired_concurrent_number" = "1"
);Ubah konkurensi menjadi 10 dan sesuaikan offset partisi serta ID kelompok konsumen:
ALTER ROUTINE LOAD FOR db1.label1
PROPERTIES (
"desired_concurrent_number" = "10"
)
FROM KAFKA (
"kafka_partitions" = "0, 1, 2",
"kafka_offsets" = "100, 200, 100",
"property.group.id" = "new_group"
);Jeda pekerjaan Routine Load
Sintaks
PAUSE ROUTINE LOAD FOR [db.]job_name;
PAUSE ALL ROUTINE LOAD;Contoh
Jeda pekerjaan test1:
PAUSE ROUTINE LOAD FOR test1;Jeda semua pekerjaan Routine Load di database saat ini:
PAUSE ALL ROUTINE LOAD;Lanjutkan pekerjaan Routine Load
Pekerjaan yang dilanjutkan akan terus mengonsumsi dari offset terakhir yang dikomit.
Sintaks
RESUME ROUTINE LOAD FOR [db.]job_name;
RESUME ALL ROUTINE LOAD;Contoh
Lanjutkan pekerjaan test1:
RESUME ROUTINE LOAD FOR test1;Lanjutkan semua pekerjaan Routine Load yang dijeda di database saat ini:
RESUME ALL ROUTINE LOAD;Hentikan pekerjaan Routine Load
Pekerjaan yang dihentikan tidak dapat dimulai ulang, dan data yang diimpor sebelum penghentian tidak dapat dikembalikan.
Sintaks
STOP ROUTINE LOAD FOR [db.]job_name;Contoh
STOP ROUTINE LOAD FOR test1;Lihat status pekerjaan Routine Load
Sintaks
SHOW [ALL] ROUTINE LOAD [FOR [db.]job_name];Tanpa ALL, hanya pekerjaan yang sedang berjalan dan dijeda yang dikembalikan. Dengan ALL, pekerjaan yang dihentikan dan dibatalkan juga disertakan.
Bidang output
Bidang | Deskripsi |
| ID pekerjaan, ditetapkan secara otomatis oleh ApsaraDB for SelectDB. |
| Nama pekerjaan. |
| Waktu pekerjaan dibuat. |
| Waktu terbaru pekerjaan dijeda. |
| Waktu pekerjaan berakhir (dihentikan atau dibatalkan). |
| Status pekerjaan saat ini: |
| Jenis sumber data. Selalu |
| Jumlah task yang sedang berjalan. |
| URL yang menunjuk ke log error. Buka URL apa pun di browser untuk melihat detail baris yang tidak valid. |
Contoh
Kueri semua pekerjaan bernama test1, termasuk yang dihentikan dan dibatalkan:
SHOW ALL ROUTINE LOAD FOR test1;Kueri hanya pekerjaan yang sedang berjalan bernama test1:
SHOW ROUTINE LOAD FOR test1;Kueri semua pekerjaan di example_db, termasuk yang dihentikan dan dibatalkan:
USE example_db;
SHOW ALL ROUTINE LOAD;Kueri hanya pekerjaan yang sedang berjalan di example_db:
USE example_db;
SHOW ROUTINE LOAD;Kueri pekerjaan tertentu berdasarkan database dan nama:
SHOW ROUTINE LOAD FOR example_db.test1;Konfigurasi sistem
Parameter FE dan BE berikut memengaruhi perilaku Routine Load. Semua parameter FE dapat dimodifikasi saat runtime.
Parameter | Cakupan | Default | Deskripsi |
| FE |
| Maksimum task konkuren yang dapat dijalankan sekaligus untuk pekerjaan Routine Load. Nilai default direkomendasikan. Mengatur terlalu tinggi dapat menghabiskan sumber daya kluster. |
| FE |
| Maksimum task konkuren pada setiap node BE. Nilai default direkomendasikan. |
| FE |
| Maksimum jumlah pekerjaan Routine Load (dalam status |
| BE |
| Maksimum konsumen yang dihasilkan per task. Misalnya, task yang mengonsumsi 6 partisi Kafka menghasilkan 3 konsumen, masing-masing menangani 2 partisi. |
| FE |
| Maksimum jumlah node BE yang boleh mati sebelum penjadwalan ulang pekerjaan otomatis diblokir. Nilai |
| FE |
| ApsaraDB for SelectDB mencoba menjadwalkan ulang pekerjaan yang dijeda hingga 3 kali dalam jendela ini. Setelah 3 kali gagal, pekerjaan dikunci dan memerlukan intervensi manual untuk dilanjutkan. |
Pertimbangan lainnya
Routine Load dan perubahan skema
Pekerjaan Routine Load tidak memblokir operasi
SCHEMA CHANGEatauROLLUP.Setelah
SCHEMA CHANGE, jika kolom sumber tidak lagi sesuai dengan tabel tujuan, jumlah baris error meningkat dan pekerjaan mungkin dijeda. Untuk mencegah hal ini, gunakan pemetaan kolom eksplisit dalam klausaCOLUMNSdan definisikan kolom tujuan sebagaiNULLABLEatau dengan nilaiDEFAULT.Jika partisi dihapus, pekerjaan dijeda karena partisi target tidak ditemukan.
Routine Load dan operasi tulis lainnya
Routine Load tidak bertentangan dengan operasi
LOADatauINSERT.Sebelum menjalankan operasi
DELETEpada tabel, jeda pekerjaan Routine Load untuk tabel tersebut dan tunggu hingga semua task yang sedang berjalan selesai.
Routine Load dan penghapusan tabel atau database
Jika tabel atau database tujuan dihapus, pekerjaan Routine Load secara otomatis dibatalkan.
Pembuatan topik Kafka otomatis
Jika topik yang ditentukan dalam CREATE ROUTINE LOAD tidak ada, broker Kafka mungkin membuatnya secara otomatis berdasarkan pengaturan auto.create.topics.enable:
true: Kafka membuat topik secara otomatis dengan partisinum.partitions. Pekerjaan langsung membacanya.false: Pekerjaan dijeda hingga topik dibuat dan data tersedia.
Persyaratan akses jaringan
Semua broker yang tercantum dalam
kafka_broker_listharus dapat dijangkau dari kluster ApsaraDB for SelectDB Anda.Jika
advertised.listenersdikonfigurasi di Kafka, alamat yang diiklankan juga harus dapat dijangkau.
STOPPED vs. PAUSED
Status | Perilaku |
| Pekerjaan dapat dilanjutkan dengan |
| Pekerjaan berakhir secara permanen dan dibersihkan oleh FE secara berkala. Tidak dapat dimulai ulang. |