Routine Load memungkinkan Anda menyerahkan pekerjaan impor yang berjalan terus-menerus untuk membaca dan mengimpor data secara kontinu dari sumber data tertentu ke dalam instans ApsaraDB for SelectDB. Topik ini menjelaskan cara menggunakan Routine Load untuk mengimpor data dari sumber data Kafka ke dalam instans ApsaraDB for SelectDB.
Prasyarat
Sumber data harus berupa sumber data Kafka. Pekerjaan Routine Load memungkinkan Anda mengakses kluster Kafka tanpa otentikasi atau kluster Kafka yang mendukung otentikasi PLAIN, SSL, atau Kerberos.
Pesan harus dalam format
CSVatauJSON. Dalam format CSV, setiap pesan ditampilkan sebagai satu baris tanpa jeda baris di akhir baris.
Catatan Penggunaan
Secara default, Kafka 0.10.0.0 dan versi lebih baru didukung. Jika Anda ingin menggunakan Kafka versi sebelum 0.10.0.0, seperti 0.9.0, 0.8.2, 0.8.1, atau 0.8.0, gunakan salah satu metode berikut:
Anda dapat mengatur nilai parameter
kafka_broker_version_fallbackdalam konfigurasi backend (BEs) ke versi Kafka sebelumnya yang ingin Anda gunakan.Anda juga dapat mengatur nilai parameter
property.broker.version.fallbackke versi Kafka sebelumnya saat membuat pekerjaan Routine Load.
Jika Anda menggunakan Kafka versi sebelum 0.10.0.0, beberapa fitur Routine Load mungkin tidak tersedia. Misalnya, Anda tidak dapat mengatur offset berbasis waktu untuk partisi Kafka.
Buat Pekerjaan Routine Load
Untuk menggunakan Routine Load, Anda harus membuat pekerjaan Routine Load. Pekerjaan Routine Load secara terus-menerus menjadwalkan tugas berdasarkan penjadwalan rutin. Setiap tugas mengonsumsi sejumlah pesan Kafka tertentu.
Sintaks
CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]Parameter
Parameter | Deskripsi |
| Nama pekerjaan Routine Load. Dalam database, jika beberapa pekerjaan memiliki nama yang sama, hanya satu dari pekerjaan tersebut yang dapat dijalankan pada satu waktu. |
| Nama tabel tujuan ke mana data akan diimpor. |
| Mode penggabungan data impor. Nilai default: |
| Parameter yang digunakan untuk memproses data yang diimpor. Untuk informasi lebih lanjut, lihat bagian Parameter load_properties dari topik ini. |
| Parameter yang terkait dengan pekerjaan Routine Load. Untuk informasi lebih lanjut, lihat bagian Parameter job_properties dari topik ini. |
| Tipe sumber data. Untuk informasi lebih lanjut, lihat bagian Parameter data_source_properties dari topik ini. |
Parameter load_properties
[column_separator],
[columns_mapping],
[preceding_filter],
[where_predicates],
[partitions],
[DELETE ON],
[ORDER BY]Parameter | Contoh | Deskripsi |
| COLUMNS TERMINATED BY "," | Pemisah kolom. Nilai default: |
| (k1,k2,tmpk1,k3=tmpk1+1) | Pemetaan antara kolom dalam file yang diimpor dan kolom dalam tabel tujuan serta berbagai operasi konversi kolom. Untuk informasi lebih lanjut, lihat Mengonversi Data Sumber. |
| N/A | Kondisi untuk menyaring data sumber. Untuk informasi lebih lanjut, lihat Mengonversi Data Sumber. |
| WHERE k1>100 and k2=1000 | Kondisi untuk menyaring data yang diimpor. Untuk informasi lebih lanjut, lihat Mengonversi Data Sumber. |
| PARTITION(p1,p2,p3) | Partisi ke mana data diimpor dalam tabel tujuan. Jika Anda tidak menentukan partisi, data sumber akan diimpor secara otomatis ke partisi yang sesuai. |
| DELETE ON v3>100 | Pernyataan yang digunakan untuk menentukan kolom Delete Flag dalam data yang diimpor dan hubungan perhitungan. Catatan Parameter ini diperlukan jika parameter merge_type diatur ke MERGE. Parameter ini hanya valid untuk tabel yang menggunakan model Unique Key. |
| N/A | Pernyataan yang digunakan untuk menentukan kolom Sequence Col dalam data yang diimpor. Parameter ini digunakan untuk menjaga urutan data yang benar selama impor. Catatan Parameter ini hanya valid untuk tabel yang menggunakan model Unique Key. |
Parameter job_properties
PROPERTIES (
"key1" = "val1",
"key2" = "val2"
)Pekerjaan Routine Load dibagi menjadi beberapa tugas. Parameter max_batch_interval menentukan durasi eksekusi maksimum suatu tugas. Parameter max_batch_rows menentukan jumlah baris maksimum yang dapat dibaca oleh suatu tugas. Parameter max_batch_size menentukan jumlah byte maksimum yang dapat dibaca oleh suatu tugas. Jika salah satu ambang batas yang ditentukan oleh ketiga parameter tercapai, tugas berakhir.
Parameter | Contoh | Deskripsi |
| "desired_concurrent_number" = "3" | Jumlah maksimum tugas yang dapat berjalan secara bersamaan. Nilainya harus berupa bilangan bulat yang lebih besar dari 0. Nilai default: Catatan
|
| "max_batch_interval" = "20" | Durasi eksekusi maksimum setiap tugas. Satuan: detik. Nilai default: |
| "max_batch_rows" = "300000" | Jumlah baris maksimum yang dapat dibaca oleh setiap tugas. Nilai default: |
| "max_batch_size" = "209715200" | Jumlah byte maksimum yang dapat dibaca oleh setiap tugas. Satuan: byte. Nilai default: |
| "max_error_number"="3" | Jumlah baris kesalahan maksimum yang diizinkan dalam jendela sampling. Nilai default: Jendela sampling adalah sepuluh kali nilai parameter Catatan Baris yang disaring oleh kondisi |
| "strict_mode"="true" | Menentukan apakah mode ketat diaktifkan. Nilai default:
|
| "timezone" = "Africa/Abidjan" | Zona waktu yang digunakan untuk pekerjaan Routine Load. Secara default, zona waktu sesi digunakan. Catatan Parameter ini memengaruhi hasil semua fungsi terkait zona waktu yang terlibat dalam pekerjaan Routine Load. |
| "format" = "json" | Format data yang diimpor. Nilai default: |
| -H "jsonpaths:[\"$.k2\",\"$.k1\"]" | Bidang yang akan diekstraksi dari data |
| -H "strip_outer_array:true" | Menentukan apakah data JSON ditampilkan sebagai array jika data yang diimpor dalam format |
| -H "json_root:$.RECORDS" | Root node data JSON jika data yang diimpor dalam format JSON. |
| N/A | Jumlah maksimum pekerjaan bersamaan untuk mengirim data untuk pemrosesan batch. Jika nilai parameter ini lebih besar dari nilai parameter |
| N/A | Menentukan apakah data diimpor hanya ke satu tablet dari partisi. Nilai default: false. Parameter ini tersedia hanya jika data diimpor ke tabel yang menggunakan model Duplicate Key dan berisi partisi acak. |
Hubungan antara mode ketat dan data sumber yang akan diimpor
Dalam contoh ini, kolom bertipe TINYLNT akan diimpor. Tabel berikut menjelaskan hubungan antara mode ketat dan data sumber jika sistem mengizinkan nilai kolom NULL untuk diimpor.
Data Sumber | Contoh | STRING ke INT | Mode Ketat | Hasil |
NULL | \N | N/A | true atau false | NULL |
NOT NULL | aaa atau 2000 | NULL | true | Data tidak valid (disaring) |
NOT NULL | aaa | NULL | false | NULL |
NOT NULL | 1 | 1 | true atau false | Data benar |
Dalam contoh ini, kolom bertipe DECIMAL(1,0) akan diimpor. Tabel berikut menjelaskan hubungan antara mode ketat dan data sumber jika sistem mengizinkan nilai kolom NULL untuk diimpor.
Data Sumber | Contoh | STRING ke INT | Mode Ketat | Hasil |
NULL | \N | N/A | true atau false | NULL |
NOT NULL | aaa | NULL | true | Data tidak valid (disaring) |
NOT NULL | aaa | NULL | false | NULL |
NOT NULL | 1 atau 10 | 1 | true atau false | Data benar |
Nilai 10 melebihi rentang yang diizinkan untuk tipe DECIMAL(1,0). Namun, nilai 10 tidak disaring dalam mode ketat karena nilai 10 memenuhi persyaratan tipe DECIMAL. Nilai 10 akhirnya disaring dalam proses ekstrak, transformasi, dan muat (ETL).
Parameter data_source_properties
FROM KAFKA
(
"key1" = "val1",
"key2" = "val2"
)Parameter | Deskripsi |
| Konfigurasi yang digunakan untuk terhubung ke broker dalam kluster Kafka. Format: Contoh: |
| Topik Kafka yang ingin Anda langgani. Format: |
| Partisi Kafka yang ingin Anda langgani dan offset awal setiap partisi. Jika Anda menentukan titik waktu tertentu, konsumsi data dimulai dari offset terbaru yang lebih besar dari atau sama dengan titik waktu tersebut. Anda dapat menentukan offset yang lebih besar dari atau sama dengan 0. Atau, atur parameter kafka_offsets ke salah satu nilai berikut:
Jika Anda tidak menentukan parameter ini, sistem berlangganan semua partisi dalam topik dari Contoh: Penting Format waktu tidak dapat dicampur dengan format offset. |
| Parameter Kafka kustom. Parameter ini setara dengan parameter --property dalam shell Kafka. Jika nilai parameter ini adalah file, Anda perlu menambahkan kata kunci |
Parameter properti
Jika Anda terhubung ke kluster Kafka menggunakan metode otentikasi SSL, Anda harus mengonfigurasi parameter berikut:
"property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcdefg"Parameter
property.security.protocoldanproperty.ssl.ca.locationdiperlukan untuk menentukan metode yang digunakan untuk terhubung ke kluster Kafka dan lokasi sertifikat Otoritas Sertifikat (CA).Jika mode otentikasi klien diaktifkan untuk server Kafka, Anda harus mengonfigurasi parameter berikut:
"property.ssl.certificate.location" "property.ssl.key.location" "property.ssl.key.password"Parameter di atas digunakan untuk menentukan lokasi sertifikat kunci publik klien, lokasi file kunci privat klien, dan kata sandi untuk mengakses kunci privat klien.
Tentukan offset awal default partisi Kafka.
Secara default, jika parameter
kafka_partitions/kafka_offsetstidak ditentukan, data dari semua partisi dikonsumsi. Dalam hal ini, Anda dapat mengonfigurasi parameterkafka_default_offsetsuntuk menentukan offset awal setiap partisi. Nilai default:OFFSET_END, yang menunjukkan bahwa partisi dilanggan dari offset akhir."property.kafka_default_offsets" = "OFFSET_BEGINNING"
Untuk informasi lebih lanjut tentang parameter kustom yang didukung, lihat item konfigurasi klien dalam dokumen konfigurasi resmi librdkafka. Misalnya, parameter kustom berikut tersedia:
"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"Contoh
Buat Pekerjaan Routine Load
Buat tabel ke dalam mana Anda ingin mengimpor data dalam instans ApsaraDB for SelectDB. Contoh kode:
CREATE TABLE test_table ( id int, name varchar(50), age int, address varchar(50), url varchar(500) ) UNIQUE KEY(`id`, `name`) DISTRIBUTED BY HASH(id) BUCKETS 4 PROPERTIES("replication_num" = "1");Konfigurasikan parameter dalam contoh kode berikut untuk mengimpor data.
Buat pekerjaan Routine Load bernama test1 untuk tabel test_table dalam database example_db. Tentukan ID grup, ID klien, dan pemisah kolom, aktifkan sistem untuk secara otomatis mengonsumsi data dari semua partisi secara default, dan berlangganan partisi dari offset awal di mana data tersedia. Contoh kode:
CREATE ROUTINE LOAD example_db.test1 ON test_table COLUMNS TERMINATED BY ",", COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );Buat pekerjaan Routine Load bernama test2 untuk tabel test_table dalam database example_db. Aktifkan mode ketat untuk pekerjaan tersebut. Contoh kode:
CREATE ROUTINE LOAD example_db.test2 ON test_table COLUMNS TERMINATED BY ",", COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "property.kafka_default_offsets" = "OFFSET_BEGINNING" );Konsumsi data partisi dari titik waktu tertentu. Contoh kode:
CREATE ROUTINE LOAD example_db.test4 ON test_table PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "30", "max_batch_rows" = "300000", "max_batch_size" = "209715200" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092", "kafka_topic" = "my_topic", "property.kafka_default_offset" = "2024-01-21 10:00:00" );
Impor Data JSON
Anda dapat menggunakan Routine Load untuk mengimpor data JSON dari dua jenis berikut:
Data JSON hanya berisi satu catatan data dan merupakan objek JSON.
Jika Anda menggunakan mode impor tabel tunggal, data JSON dalam format berikut. Dalam mode ini, nama tabel ditentukan dengan menjalankan pernyataan ON TABLE_NAME.
{"key1":"value1","key2":"value2","key3":"value3"}Jika Anda menggunakan mode dinamis atau multi-tabel, data JSON dalam format berikut. Dalam mode ini, nama tabel tidak ditentukan.
table_name|{"key1":"value1","key2":"value2","key3":"value3"}Data JSON adalah array yang berisi beberapa catatan data.
Jika Anda menggunakan mode impor tabel tunggal, data JSON dalam format berikut. Dalam mode ini, nama tabel ditentukan dengan menjalankan pernyataan ON TABLE_NAME.
[ { "key1":"value11", "key2":"value12", "key3":"value13", "key4":14 }, { "key1":"value21", "key2":"value22", "key3":"value23", "key4":24 }, { "key1":"value31", "key2":"value32", "key3":"value33", "key4":34 } ]Jika Anda menggunakan mode dinamis atau multi-tabel, data JSON dalam format berikut. Dalam mode ini, nama tabel tidak ditentukan.
table_name|[ { "key1":"value11", "key2":"value12", "key3":"value13", "key4":14 }, { "key1":"value21", "key2":"value22", "key3":"value23", "key4":24 }, { "key1":"value31", "key2":"value32", "key3":"value33", "key4":34 } ]
Impor data JSON.
Buat tabel ke dalam mana Anda ingin mengimpor data dalam instans ApsaraDB for SelectDB. Contoh kode:
CREATE TABLE `example_tbl` ( `category` varchar(24) NULL COMMENT "", `author` varchar(24) NULL COMMENT "", `timestamp` bigint(20) NULL COMMENT "", `dt` int(11) NULL COMMENT "", `price` double REPLACE ) ENGINE=OLAP AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`) COMMENT "OLAP" PARTITION BY RANGE(`dt`) ( PARTITION p0 VALUES [("-2147483648"), ("20230509")), PARTITION p20200509 VALUES [("20230509"), ("20231010")), PARTITION p20200510 VALUES [("20231010"), ("20231211")), PARTITION p20200511 VALUES [("20231211"), ("20240512")) ) DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4;Impor data JSON dari dua jenis di atas ke dalam topik. Contoh kode:
{ "category":"value1331", "author":"value1233", "timestamp":1700346050, "price":1413 }[ { "category":"value13z2", "author":"vaelue13", "timestamp":1705645251, "price":14330 }, { "category":"lvalue211", "author":"lvalue122", "timestamp":1684448450, "price":24440 } ]Impor data JSON dalam mode berbeda.
Impor data JSON dalam mode sederhana. Contoh kode:
CREATE ROUTINE LOAD example_db.test_json_label_1 ON example_tbl COLUMNS(category,price,author) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );Impor data JSON secara akurat. Contoh kode:
CREATE ROUTINE LOAD example_db.test_json_label_3 ON example_tbl COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d')) PROPERTIES ( "desired_concurrent_number"="3", "max_batch_interval" = "20", "max_batch_rows" = "300000", "max_batch_size" = "209715200", "strict_mode" = "false", "format" = "json", "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]", "strip_outer_array" = "true" ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092", "kafka_topic" = "my_topic", "kafka_partitions" = "0,1,2", "kafka_offsets" = "0,0,0" );CatatanBidang partisi
dtdalam tabel tidak tersedia dalam data sampel. Nilai bidang dt dihasilkan dengan menggunakan konfigurasidt=from_unixtime(timestamp,'%Y%m%d')dalam pernyataan CREATE ROUTINE LOAD.
Akses Kluster Kafka yang Menggunakan Metode Otentikasi Berbeda
Contoh berikut menunjukkan cara mengakses kluster Kafka berdasarkan metode otentikasi kluster Kafka.
Akses kluster Kafka yang memiliki metode otentikasi SSL diaktifkan.
Untuk mengakses kluster Kafka yang memiliki metode otentikasi SSL diaktifkan, Anda harus menyediakan file sertifikat (ca.pem) yang digunakan untuk mengotentikasi kunci publik broker Kafka. Jika mode otentikasi klien diaktifkan untuk kluster Kafka, sertifikat kunci publik (client.pem), file kunci privat (client.key), dan kata sandi kunci privat klien juga diperlukan. Anda perlu mengunggah file yang diperlukan ke ApsaraDB for SelectDB terlebih dahulu dengan menjalankan pernyataan
CREATE FILE. Katalog diberi nama kafka.Unggah file. Contoh kode:
CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka"); CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka"); CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");Buat pekerjaan Routine Load. Contoh kode:
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1" ) FROM KAFKA ( "kafka_broker_list"= "broker1:9091,broker2:9091", "kafka_topic" = "my_topic", "property.security.protocol" = "ssl", "property.ssl.ca.location" = "FILE:ca.pem", "property.ssl.certificate.location" = "FILE:client.pem", "property.ssl.key.location" = "FILE:client.key", "property.ssl.key.password" = "abcdefg" );CatatanApsaraDB for SelectDB mengakses kluster Kafka menggunakan pustaka klien Kafka C++
librdkafka. Untuk informasi lebih lanjut tentang parameter yang didukung oleh librdkafka, lihat dokumen Properti Konfigurasi darilibrdkafka.
Akses kluster Kafka yang memiliki metode otentikasi PLAIN diaktifkan.
Untuk mengakses kluster Kafka yang memiliki metode otentikasi PLAIN diaktifkan, Anda harus menambahkan konfigurasi berikut:
property.security.protocol=SASL_PLAINTEXT: Gunakan metode otentikasi teks biasa Lapisan Otentikasi dan Keamanan Sederhana (SASL).
property.sasl.mechanism=PLAIN: Tetapkan metode otentikasi SASL ke PLAIN.
property.sasl.username=admin: Tentukan nama pengguna SASL.
property.sasl.password=admin: Tentukan kata sandi SASL.
Buat pekerjaan Routine Load. Contoh kode:
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1", ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092", "kafka_topic" = "my_topic", "property.security.protocol"="SASL_PLAINTEXT", "property.sasl.mechanism"="PLAIN", "property.sasl.username"="admin", "property.sasl.password"="admin" );Akses kluster Kafka yang memiliki metode otentikasi Kerberos diaktifkan.
Untuk mengakses kluster Kafka yang memiliki metode otentikasi Kerberos diaktifkan, Anda harus menambahkan konfigurasi berikut:
security.protocol=SASL_PLAINTEXT: Gunakan metode otentikasi teks biasa SASL.
sasl.kerberos.service.name=$SERVICENAME: Tentukan nama layanan broker.
sasl.kerberos.keytab=/etc/security/keytabs/${CLIENT_NAME}.keytab: Tentukan jalur file .keytab lokal.
sasl.kerberos.principal=${CLIENT_NAME}/${CLIENT_HOST}: Tentukan principal Kerberos yang digunakan oleh ApsaraDB for SelectDB untuk terhubung ke kluster Kafka.
Buat pekerjaan Routine Load. Contoh kode:
CREATE ROUTINE LOAD db1.job1 on tbl1 PROPERTIES ( "desired_concurrent_number"="1", ) FROM KAFKA ( "kafka_broker_list" = "broker1:9092,broker2:9092", "kafka_topic" = "my_topic", "property.security.protocol" = "SASL_PLAINTEXT", "property.sasl.kerberos.service.name" = "kafka", "property.sasl.kerberos.keytab" = "/etc/krb5.keytab", "property.sasl.kerberos.principal" = "id@your.com" );CatatanUntuk mengaktifkan ApsaraDB for SelectDB agar dapat mengakses kluster Kafka yang memiliki metode otentikasi Kerberos diaktifkan, Anda perlu menerapkan klien Kerberos kinit pada semua node yang sedang berjalan dalam kluster ApsaraDB for SelectDB, mengonfigurasi file krb5.conf, dan menentukan informasi layanan Pusat Distribusi Kunci (KDC).
Tetapkan parameter
property.sasl.kerberos.keytabke jalur mutlak file .keytab lokal, dan izinkan proses ApsaraDB for SelectDB untuk mengakses file .keytab lokal.
Modifikasi Pekerjaan Routine Load
Anda dapat memodifikasi pekerjaan Routine Load yang ada dan berada dalam status PAUSED.
Sintaks
ALTER ROUTINE LOAD FOR <job_name>
[job_properties]
FROM <data_source>
[data_source_properties]Parameter
Parameter | Deskripsi |
[db.]job_name | Nama pekerjaan yang akan dimodifikasi. |
tbl_name | Nama tabel ke mana data akan diimpor. |
job_properties | Parameter pekerjaan yang akan dimodifikasi. Hanya parameter berikut yang dapat dimodifikasi:
|
data_source | Tipe sumber data. Tetapkan parameter ini ke |
data_source_properties | Parameter sumber data. Hanya parameter berikut yang didukung:
Catatan Parameter |
Contoh
Ubah nilai parameter
desired_concurrent_numbermenjadi 1. Contoh kode:ALTER ROUTINE LOAD FOR db1.label1 PROPERTIES ( "desired_concurrent_number" = "1" );Ubah nilai parameter
desired_concurrent_numbermenjadi 10, dan modifikasi offset partisi serta ID grup. Contoh kode:ALTER ROUTINE LOAD FOR db1.label1 PROPERTIES ( "desired_concurrent_number" = "10" ) FROM kafka ( "kafka_partitions" = "0, 1, 2", "kafka_offsets" = "100, 200, 100", "property.group.id" = "new_group" );
Jeda Pekerjaan Routine Load
Anda dapat menjeda pekerjaan Routine Load dengan menjalankan pernyataan PAUSE, dan melanjutkan pekerjaan yang dijeda dengan menjalankan pernyataan RESUME.
Sintaks
PAUSE [ALL] ROUTINE LOAD FOR <job_name>;Parameter
Parameter | Deskripsi |
[db.]job_name | Nama pekerjaan yang akan dijeda. |
Contoh
Jalankan pernyataan berikut untuk menjeda pekerjaan Routine Load bernama test1:
PAUSE ROUTINE LOAD FOR test1;Jalankan pernyataan berikut untuk menjeda semua pekerjaan Routine Load:
PAUSE ALL ROUTINE LOAD;
Lanjutkan Pekerjaan Routine Load
Anda dapat melanjutkan pekerjaan Routine Load yang dijeda. Pekerjaan yang dilanjutkan akan terus mengonsumsi data dari offset terakhir yang dikonsumsi.
Sintaks
RESUME [ALL] ROUTINE LOAD FOR <job_name>Parameter
Parameter | Deskripsi |
[db.]job_name | Nama pekerjaan Routine Load yang akan dilanjutkan. |
Contoh
Jalankan pernyataan berikut untuk melanjutkan pekerjaan Routine Load bernama test1:
RESUME ROUTINE LOAD FOR test1;Jalankan pernyataan berikut untuk melanjutkan semua pekerjaan Routine Load:
RESUME ALL ROUTINE LOAD;
Hentikan Pekerjaan Routine Load
Anda dapat menghentikan pekerjaan Routine Load. Pekerjaan Routine Load yang dihentikan tidak dapat dimulai kembali. Setelah pekerjaan Routine Load dihentikan, data yang telah diimpor tidak dapat dikembalikan.
Sintaks
STOP ROUTINE LOAD FOR <job_name>;Parameter
Parameter | Deskripsi |
[db.]job_name | Nama pekerjaan yang akan dihentikan. |
Contoh
Jalankan pernyataan berikut untuk menghentikan pekerjaan Routine Load bernama test1:
STOP ROUTINE LOAD FOR test1;Kueri satu atau lebih Pekerjaan Routine Load
Anda dapat menjalankan pernyataan SHOW ROUTINE LOAD untuk menanyakan status satu atau lebih pekerjaan Routine Load.
Sintaks
SHOW [ALL] ROUTINE LOAD [FOR job_name];Parameter
Parameter | Deskripsi |
[db.]job_name | Nama pekerjaan yang ingin Anda tanyakan. |
Jika data yang diimpor dalam format tidak valid, informasi kesalahan detail dicatat dalam nilai parameter ErrorLogUrls. Nilai parameter ErrorLogUrls berisi beberapa URL. Anda dapat menyalin salah satu URL untuk menanyakan informasi kesalahan di browser.
Contoh
Jalankan pernyataan berikut untuk menanyakan semua pekerjaan Routine Load bernama test1, termasuk pekerjaan yang dihentikan dan dibatalkan. Output hasil menampilkan setiap pekerjaan pada baris terpisah dan mungkin terdiri dari satu atau lebih baris tergantung jumlah pekerjaan.
SHOW ALL ROUTINE LOAD FOR test1;Jalankan pernyataan berikut untuk menanyakan pekerjaan Routine Load yang sedang berlangsung bernama test1:
SHOW ROUTINE LOAD FOR test1;Jalankan pernyataan berikut untuk menanyakan semua pekerjaan Routine Load dalam database example_db, termasuk pekerjaan yang dihentikan dan dibatalkan. Output hasil menampilkan setiap pekerjaan pada baris terpisah dan mungkin terdiri dari satu atau lebih baris tergantung jumlah pekerjaan.
use example_db; SHOW ALL ROUTINE LOAD;Jalankan pernyataan berikut untuk menanyakan semua pekerjaan Routine Load yang sedang berlangsung dalam database example_db:
use example_db; SHOW ROUTINE LOAD;Jalankan pernyataan berikut untuk menanyakan pekerjaan Routine Load yang sedang berlangsung bernama test1 dalam database example_db:
SHOW ROUTINE LOAD FOR example_db.test1;Jalankan pernyataan berikut untuk menanyakan semua pekerjaan Routine Load bernama test1 dalam database example_db, termasuk pekerjaan yang dihentikan dan dibatalkan. Output hasil menampilkan setiap pekerjaan pada baris terpisah dan mungkin terdiri dari satu atau lebih baris tergantung jumlah pekerjaan.
SHOW ALL ROUTINE LOAD FOR example_db.test1;
Konfigurasi sistem terkait
Konfigurasi sistem terkait memengaruhi penggunaan Routine Load.
max_routine_load_task_concurrent_numParameter frontend (FE). Nilai default: 5. Anda dapat memodifikasi parameter ini saat runtime. Parameter ini menentukan jumlah maksimum tugas yang dapat berjalan secara bersamaan pada satu waktu untuk pekerjaan Routine Load. Kami merekomendasikan Anda menggunakan nilai default. Jika parameter ini disetel ke nilai besar, jumlah tugas bersamaan mungkin berlebihan dan menghabiskan banyak sumber daya kluster.
max_routine_load_task_num_per_beParameter FE. Nilai default: 5. Anda dapat memodifikasi parameter ini saat runtime. Parameter ini menentukan jumlah maksimum tugas yang dapat berjalan secara bersamaan pada satu waktu di setiap node BE. Kami merekomendasikan Anda menggunakan nilai default. Jika parameter ini disetel ke nilai besar, jumlah tugas bersamaan mungkin berlebihan dan menghabiskan banyak sumber daya kluster.
max_routine_load_job_numParameter FE. Nilai default: 100. Anda dapat memodifikasi parameter ini saat runtime. Parameter ini menentukan jumlah maksimum pekerjaan Routine Load yang dapat Anda kirimkan, termasuk pekerjaan dalam status NEED_SCHEDULED, RUNNING, atau PAUSED. Jika total jumlah pekerjaan Routine Load yang Anda kirimkan mencapai nilai maksimum, tidak ada pekerjaan lain yang dapat dikirimkan.
max_consumer_num_per_groupParameter BE. Nilai default: 3. Parameter ini menentukan jumlah maksimum konsumen yang dapat dihasilkan untuk mengonsumsi data dalam suatu tugas. Untuk sumber data Kafka, satu konsumen mungkin mengonsumsi data dari satu atau lebih partisi Kafka. Jika suatu tugas mengonsumsi data dari enam partisi Kafka, tiga konsumen dihasilkan. Setiap konsumen mengonsumsi data dari dua partisi. Jika hanya dua partisi yang ada, hanya dua konsumen yang dihasilkan, dan setiap konsumen mengonsumsi data dari satu partisi.
max_tolerable_backend_down_numParameter FE. Nilai default: 0. Jika kondisi tertentu terpenuhi, ApsaraDB for SelectDB menjadwalkan ulang pekerjaan dalam status PAUSED. Kemudian, status pekerjaan yang dijadwalkan ulang berubah menjadi RUNNING. Nilai 0 menunjukkan bahwa pekerjaan hanya dapat dijadwalkan ulang jika semua node BE hidup.
period_of_auto_resume_minParameter FE memiliki nilai default 5 menit, yang berarti ApsaraDB for SelectDB akan menjadwalkan ulang pekerjaan hingga tiga kali dalam jangka waktu tersebut. Jika pekerjaan gagal dijadwalkan ulang sebanyak tiga kali, pekerjaan tersebut akan terkunci dan tidak dijadwalkan ulang lagi. Intervensi manual dapat dilakukan untuk melanjutkan pekerjaan.
Deskripsi Lainnya
Hubungan antara pekerjaan Routine Load dan operasi ALTER TABLE
Pekerjaan Routine Load tidak memblokir operasi SCHEMA CHANGE atau ROLLUP. Namun, jika kolom dalam data sumber tidak sesuai dengan kolom dalam tabel tujuan setelah operasi SCHEMA CHANGE, jumlah data kesalahan akan meningkat dan pekerjaan akhirnya dijeda. Untuk mencegah masalah ini, disarankan untuk secara eksplisit menentukan pemetaan kolom dalam pekerjaan Routine Load serta menggunakan kolom NULLABLE atau kolom dengan batasan DEFAULT.
Jika partisi tabel dihapus, data mungkin gagal diimpor karena partisi tidak ditemukan, sehingga menyebabkan pekerjaan dijeda.
Hubungan antara pekerjaan Routine Load dan operasi LOAD, DELETE, dan INSERT
Pekerjaan Routine Load tidak bertentangan dengan operasi LOAD atau INSERT.
Untuk melakukan operasi DELETE pada tabel, pastikan tidak ada data yang sedang diimpor ke partisi tabel terkait. Oleh karena itu, sebelum menjalankan operasi DELETE, jeda pekerjaan Routine Load dan tunggu hingga semua tugas yang telah ditugaskan selesai.
Hubungan antara pekerjaan Routine Load dan operasi DROP DATABASE atau DROP TABLE
Jika database atau tabel yang menjadi tujuan impor pekerjaan Routine Load dihapus, pekerjaan tersebut secara otomatis dibatalkan.
Hubungan antara pekerjaan Routine Load untuk kluster Kafka dan topik Kafka
Jika
Kafka topicyang didefinisikan dalam pernyataan CREATE ROUTINE LOAD tidak tersedia di kluster Kafka, broker Kafka dapat secara otomatis membuat topik berdasarkan pengaturan parameter auto.create.topics.enable.Jika parameter
auto.create.topics.enablediatur ke true untuk broker Kafka, topik akan dibuat secara otomatis. Jumlah partisi yang dibuat ditentukan oleh parameter num.partitions broker Kafka. Pekerjaan Routine Load kemudian akan terus membaca data dari topik tersebut.Jika parameter
auto.create.topics.enablediatur ke false untuk broker Kafka, topik tidak akan dibuat secara otomatis. Dalam hal ini, pekerjaan Routine Load dijeda hingga data tersedia.
Oleh karena itu, jika Anda ingin topik dibuat secara otomatis, atur parameter
auto.create.topics.enableke true untuk broker dalam kluster Kafka.Pertimbangan untuk isolasi blok CIDR dan resolusi nama domain dalam lingkungan
Broker yang ditentukan saat membuat pekerjaan Routine Load harus dapat diakses oleh ApsaraDB for SelectDB.
Jika parameter
advertised.listenersdikonfigurasikan di Kafka, alamat dalam nilai parameteradvertised.listenersharus dapat diakses oleh ApsaraDB for SelectDB.
Tentukan partisi dan offset untuk konsumsi data.
Tentukan partisi dan offset untuk konsumsi data
kafka_partitions: partisi yang datanya akan dikonsumsi. Contoh: "0,1,2,3".kafka_offsets: offset awal setiap partisi. Jumlah offset yang ditentukan untuk parameter ini harus sama dengan jumlah partisi yang ditentukan untuk parameterkafka_partitions. Contoh: "1000,1000,2000,2000".property.kafka_default_offset: offset awal default partisi.
Saat membuat pekerjaan Routine Load, Anda dapat menggabungkan ketiga parameter tersebut menggunakan salah satu dari lima metode yang dijelaskan dalam tabel berikut.
Metode
kafka_partitions
kafka_offsets
property.kafka_default_offset
Perilaku
1
Tidak
Tidak
Tidak
Sistem secara otomatis mencari semua partisi topik Kafka dan mulai mengonsumsi data dari offset akhir partisi.
2
Tidak
Tidak
Ya
Sistem secara otomatis mencari semua partisi topik Kafka dan mulai mengonsumsi data dari offset default.
3
Ya
Tidak
Tidak
Sistem mulai mengonsumsi data dari offset akhir partisi yang ditentukan.
4
Ya
Ya
Tidak
Sistem mulai mengonsumsi data dari offset yang ditentukan dari partisi yang ditentukan.
5
Ya
Tidak
Ya
Sistem mulai mengonsumsi data dari offset default partisi yang ditentukan.
Perbedaan antara status STOPPED dan PAUSED
FE secara berkala membersihkan pekerjaan Routine Load dalam status STOPPED. Pekerjaan Routine Load dalam status PAUSED dapat dilanjutkan.