全部产品
Search
文档中心

ApsaraDB for SelectDB:Gunakan Routine Load untuk mengimpor data

更新时间:Jul 30, 2025

Routine Load memungkinkan Anda menyerahkan pekerjaan impor yang berjalan terus-menerus untuk membaca dan mengimpor data secara kontinu dari sumber data tertentu ke dalam instans ApsaraDB for SelectDB. Topik ini menjelaskan cara menggunakan Routine Load untuk mengimpor data dari sumber data Kafka ke dalam instans ApsaraDB for SelectDB.

Prasyarat

  • Sumber data harus berupa sumber data Kafka. Pekerjaan Routine Load memungkinkan Anda mengakses kluster Kafka tanpa otentikasi atau kluster Kafka yang mendukung otentikasi PLAIN, SSL, atau Kerberos.

  • Pesan harus dalam format CSV atau JSON. Dalam format CSV, setiap pesan ditampilkan sebagai satu baris tanpa jeda baris di akhir baris.

Catatan Penggunaan

Secara default, Kafka 0.10.0.0 dan versi lebih baru didukung. Jika Anda ingin menggunakan Kafka versi sebelum 0.10.0.0, seperti 0.9.0, 0.8.2, 0.8.1, atau 0.8.0, gunakan salah satu metode berikut:

  • Anda dapat mengatur nilai parameter kafka_broker_version_fallback dalam konfigurasi backend (BEs) ke versi Kafka sebelumnya yang ingin Anda gunakan.

  • Anda juga dapat mengatur nilai parameter property.broker.version.fallback ke versi Kafka sebelumnya saat membuat pekerjaan Routine Load.

Catatan

Jika Anda menggunakan Kafka versi sebelum 0.10.0.0, beberapa fitur Routine Load mungkin tidak tersedia. Misalnya, Anda tidak dapat mengatur offset berbasis waktu untuk partisi Kafka.

Buat Pekerjaan Routine Load

Untuk menggunakan Routine Load, Anda harus membuat pekerjaan Routine Load. Pekerjaan Routine Load secara terus-menerus menjadwalkan tugas berdasarkan penjadwalan rutin. Setiap tugas mengonsumsi sejumlah pesan Kafka tertentu.

Sintaks

CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]

Parameter

Parameter

Deskripsi

[db.]job_name

Nama pekerjaan Routine Load. Dalam database, jika beberapa pekerjaan memiliki nama yang sama, hanya satu dari pekerjaan tersebut yang dapat dijalankan pada satu waktu.

tbl_name

Nama tabel tujuan ke mana data akan diimpor.

merge_type

Mode penggabungan data impor. Nilai default: APPEND, yang menentukan bahwa impor adalah operasi tambah standar. Anda dapat mengatur parameter ini ke MERGE atau DELETE hanya untuk tabel yang menggunakan model Unique Key. Jika parameter ini diatur ke MERGE, pernyataan DELETE ON harus digunakan untuk menentukan kolom yang berfungsi sebagai kolom Delete Flag. Jika parameter ini diatur ke DELETE, semua data yang diimpor akan dihapus dari tabel tujuan.

load_properties

Parameter yang digunakan untuk memproses data yang diimpor. Untuk informasi lebih lanjut, lihat bagian Parameter load_properties dari topik ini.

job_properties

Parameter yang terkait dengan pekerjaan Routine Load. Untuk informasi lebih lanjut, lihat bagian Parameter job_properties dari topik ini.

data_source_properties

Tipe sumber data. Untuk informasi lebih lanjut, lihat bagian Parameter data_source_properties dari topik ini.

Parameter load_properties

[column_separator],
[columns_mapping],
[preceding_filter],
[where_predicates],
[partitions],
[DELETE ON],
[ORDER BY]

Parameter

Contoh

Deskripsi

column_separator

COLUMNS TERMINATED BY ","

Pemisah kolom. Nilai default: \t.

columns_mapping

(k1,k2,tmpk1,k3=tmpk1+1)

Pemetaan antara kolom dalam file yang diimpor dan kolom dalam tabel tujuan serta berbagai operasi konversi kolom. Untuk informasi lebih lanjut, lihat Mengonversi Data Sumber.

preceding_filter

N/A

Kondisi untuk menyaring data sumber. Untuk informasi lebih lanjut, lihat Mengonversi Data Sumber.

where_predicates

WHERE k1>100 and k2=1000

Kondisi untuk menyaring data yang diimpor. Untuk informasi lebih lanjut, lihat Mengonversi Data Sumber.

partitions

PARTITION(p1,p2,p3)

Partisi ke mana data diimpor dalam tabel tujuan. Jika Anda tidak menentukan partisi, data sumber akan diimpor secara otomatis ke partisi yang sesuai.

DELETE ON

DELETE ON v3>100

Pernyataan yang digunakan untuk menentukan kolom Delete Flag dalam data yang diimpor dan hubungan perhitungan.

Catatan

Parameter ini diperlukan jika parameter merge_type diatur ke MERGE. Parameter ini hanya valid untuk tabel yang menggunakan model Unique Key.

ORDER BY

N/A

Pernyataan yang digunakan untuk menentukan kolom Sequence Col dalam data yang diimpor. Parameter ini digunakan untuk menjaga urutan data yang benar selama impor.

Catatan

Parameter ini hanya valid untuk tabel yang menggunakan model Unique Key.

Parameter job_properties

PROPERTIES (
    "key1" = "val1",
    "key2" = "val2"
)
Catatan

Pekerjaan Routine Load dibagi menjadi beberapa tugas. Parameter max_batch_interval menentukan durasi eksekusi maksimum suatu tugas. Parameter max_batch_rows menentukan jumlah baris maksimum yang dapat dibaca oleh suatu tugas. Parameter max_batch_size menentukan jumlah byte maksimum yang dapat dibaca oleh suatu tugas. Jika salah satu ambang batas yang ditentukan oleh ketiga parameter tercapai, tugas berakhir.

Parameter

Contoh

Deskripsi

desired_concurrent_number

"desired_concurrent_number" = "3"

Jumlah maksimum tugas yang dapat berjalan secara bersamaan. Nilainya harus berupa bilangan bulat yang lebih besar dari 0. Nilai default: 3. Pekerjaan Routine Load dibagi menjadi beberapa tugas. Parameter ini menentukan jumlah maksimum tugas yang dapat berjalan secara bersamaan untuk suatu pekerjaan.

Catatan
  1. Jumlah tugas yang sebenarnya berjalan secara bersamaan mungkin tidak mencapai ambang batas yang ditentukan. Jumlah sebenarnya bervariasi berdasarkan jumlah node dalam kluster, beban kerja, dan sumber data.

  2. Jika Anda meningkatkan konkurensi, pekerjaan Routine Load dapat dipercepat berdasarkan kluster terdistribusi. Namun, jika konkurensi terlalu besar, sejumlah besar file berukuran kecil mungkin ditulis. Kami merekomendasikan Anda mengatur parameter ini ke Jumlah inti dalam kluster/16.

max_batch_interval

"max_batch_interval" = "20"

Durasi eksekusi maksimum setiap tugas. Satuan: detik. Nilai default: 10. Nilai valid: 5 hingga 60.

max_batch_rows

"max_batch_rows" = "300000"

Jumlah baris maksimum yang dapat dibaca oleh setiap tugas. Nilai default: 200000. Nilainya harus lebih besar dari atau sama dengan 200000.

max_batch_size

"max_batch_size" = "209715200"

Jumlah byte maksimum yang dapat dibaca oleh setiap tugas. Satuan: byte. Nilai default: 104857600, yaitu 100 MB. Nilai valid berkisar antara 100 MB hingga 1 GB.

max_error_number

"max_error_number"="3"

Jumlah baris kesalahan maksimum yang diizinkan dalam jendela sampling. Nilai default: 0, yang menentukan bahwa tidak ada baris kesalahan yang diizinkan. Nilainya harus berupa bilangan bulat yang lebih besar dari 0.

Jendela sampling adalah sepuluh kali nilai parameter max_batch_rows. Jika jumlah baris kesalahan dalam jendela sampling lebih besar dari ambang batas, pekerjaan Routine Load akan dijeda dan intervensi manual diperlukan untuk memeriksa kualitas data.

Catatan

Baris yang disaring oleh kondisi WHERE tidak dianggap sebagai baris kesalahan.

strict_mode

"strict_mode"="true"

Menentukan apakah mode ketat diaktifkan. Nilai default: false. Dalam mode ketat, jika nilai NOT NULL dari kolom sumber dikonversi menjadi nilai NULL dari kolom tujuan yang sesuai setelah konversi tipe kolom, nilai tersebut disaring. Jika mode ketat diaktifkan, data setelah konversi tipe kolom disaring secara ketat selama proses impor berdasarkan aturan berikut:

  • Data kesalahan disaring setelah konversi tipe kolom. Data kesalahan merujuk pada data NULL yang dihasilkan dalam kolom tujuan dari data NOT NULL kolom sumber setelah konversi tipe kolom.

  • Mode ketat tidak berlaku untuk kolom tujuan yang nilainya dihasilkan oleh fungsi.

  • Jika kolom tujuan membatasi nilai ke rentang tertentu dan nilai kolom sumber mendukung konversi tipe tetapi nilai yang dikonversi tidak termasuk dalam rentang tersebut, mode ketat tidak berlaku untuk kolom tujuan. Misalnya, nilai kolom sumber adalah 10 dan kolom tujuan bertipe DECIMAL(1,0). Nilai 10 dapat dikonversi tetapi nilai yang dikonversi tidak termasuk dalam rentang yang ditentukan untuk kolom tujuan. Dalam hal ini, mode ketat tidak berlaku untuk kolom tujuan.

timezone

"timezone" = "Africa/Abidjan"

Zona waktu yang digunakan untuk pekerjaan Routine Load. Secara default, zona waktu sesi digunakan.

Catatan

Parameter ini memengaruhi hasil semua fungsi terkait zona waktu yang terlibat dalam pekerjaan Routine Load.

format

"format" = "json"

Format data yang diimpor. Nilai default: CSV. Format JSON juga didukung.

jsonpaths

-H "jsonpaths:[\"$.k2\",\"$.k1\"]"

Bidang yang akan diekstraksi dari data JSON jika data yang diimpor dalam format JSON.

strip_outer_array

-H "strip_outer_array:true"

Menentukan apakah data JSON ditampilkan sebagai array jika data yang diimpor dalam format JSON. Jika parameter strip_outer_array diatur ke true, data JSON ditampilkan sebagai array. Setiap elemen dalam data JSON dianggap sebagai baris data. Nilai default: false.

json_root

-H "json_root:$.RECORDS"

Root node data JSON jika data yang diimpor dalam format JSON. ApsaraDB for SelectDB mengekstraksi dan mengurai elemen root node yang ditentukan oleh parameter json_root. Secara default, parameter ini dibiarkan kosong.

send_batch_parallelism

N/A

Jumlah maksimum pekerjaan bersamaan untuk mengirim data untuk pemrosesan batch. Jika nilai parameter ini lebih besar dari nilai parameter max_send_batch_parallelism_per_job dari konfigurasi BE, BE menggunakan nilai parameter max_send_batch_parallelism_per_job.

load_to_single_tablet

N/A

Menentukan apakah data diimpor hanya ke satu tablet dari partisi. Nilai default: false. Parameter ini tersedia hanya jika data diimpor ke tabel yang menggunakan model Duplicate Key dan berisi partisi acak.

Hubungan antara mode ketat dan data sumber yang akan diimpor

Dalam contoh ini, kolom bertipe TINYLNT akan diimpor. Tabel berikut menjelaskan hubungan antara mode ketat dan data sumber jika sistem mengizinkan nilai kolom NULL untuk diimpor.

Data Sumber

Contoh

STRING ke INT

Mode Ketat

Hasil

NULL

\N

N/A

true atau false

NULL

NOT NULL

aaa atau 2000

NULL

true

Data tidak valid (disaring)

NOT NULL

aaa

NULL

false

NULL

NOT NULL

1

1

true atau false

Data benar

Dalam contoh ini, kolom bertipe DECIMAL(1,0) akan diimpor. Tabel berikut menjelaskan hubungan antara mode ketat dan data sumber jika sistem mengizinkan nilai kolom NULL untuk diimpor.

Data Sumber

Contoh

STRING ke INT

Mode Ketat

Hasil

NULL

\N

N/A

true atau false

NULL

NOT NULL

aaa

NULL

true

Data tidak valid (disaring)

NOT NULL

aaa

NULL

false

NULL

NOT NULL

1 atau 10

1

true atau false

Data benar

Catatan

Nilai 10 melebihi rentang yang diizinkan untuk tipe DECIMAL(1,0). Namun, nilai 10 tidak disaring dalam mode ketat karena nilai 10 memenuhi persyaratan tipe DECIMAL. Nilai 10 akhirnya disaring dalam proses ekstrak, transformasi, dan muat (ETL).

Parameter data_source_properties

FROM KAFKA
(
    "key1" = "val1",
    "key2" = "val2"
)

Parameter

Deskripsi

kafka_broker_list

Konfigurasi yang digunakan untuk terhubung ke broker dalam kluster Kafka. Format: ip:host. Pisahkan beberapa konfigurasi dengan koma (,).

Contoh: "kafka_broker_list"="broker1:9092,broker2:9092".

kafka_topic

Topik Kafka yang ingin Anda langgani.

Format: "kafka_topic"="my_topic".

kafka_partitions/kafka_offsets

Partisi Kafka yang ingin Anda langgani dan offset awal setiap partisi. Jika Anda menentukan titik waktu tertentu, konsumsi data dimulai dari offset terbaru yang lebih besar dari atau sama dengan titik waktu tersebut.

Anda dapat menentukan offset yang lebih besar dari atau sama dengan 0. Atau, atur parameter kafka_offsets ke salah satu nilai berikut:

  • OFFSET_BEGINNING: Berlangganan partisi dari offset di mana data tersedia.

  • OFFSET_END: Berlangganan partisi dari offset akhir.

  • Tentukan titik waktu dalam format yyyy-MM-dd HH:mm:ss. Contoh: 2021-05-22 11:00:00.

Jika Anda tidak menentukan parameter ini, sistem berlangganan semua partisi dalam topik dari offset akhir.

Contoh:

"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "101,0,OFFSET_BEGINNING,OFFSET_END"
"kafka_partitions" = "0,1,2,3",
"kafka_offsets" = "2021-05-22 11:00:00,2021-05-22 11:00:00,2021-05-22 11:00:00"
Penting

Format waktu tidak dapat dicampur dengan format offset.

property

Parameter Kafka kustom. Parameter ini setara dengan parameter --property dalam shell Kafka.

Jika nilai parameter ini adalah file, Anda perlu menambahkan kata kunci FILE: sebelum nilai tersebut.

Parameter properti

  • Jika Anda terhubung ke kluster Kafka menggunakan metode otentikasi SSL, Anda harus mengonfigurasi parameter berikut:

    "property.security.protocol" = "ssl",
    "property.ssl.ca.location" = "FILE:ca.pem",
    "property.ssl.certificate.location" = "FILE:client.pem",
    "property.ssl.key.location" = "FILE:client.key",
    "property.ssl.key.password" = "abcdefg"

    Parameter property.security.protocol dan property.ssl.ca.location diperlukan untuk menentukan metode yang digunakan untuk terhubung ke kluster Kafka dan lokasi sertifikat Otoritas Sertifikat (CA).

    Jika mode otentikasi klien diaktifkan untuk server Kafka, Anda harus mengonfigurasi parameter berikut:

    "property.ssl.certificate.location"
    "property.ssl.key.location"
    "property.ssl.key.password"

    Parameter di atas digunakan untuk menentukan lokasi sertifikat kunci publik klien, lokasi file kunci privat klien, dan kata sandi untuk mengakses kunci privat klien.

  • Tentukan offset awal default partisi Kafka.

    Secara default, jika parameter kafka_partitions/kafka_offsets tidak ditentukan, data dari semua partisi dikonsumsi. Dalam hal ini, Anda dapat mengonfigurasi parameter kafka_default_offsets untuk menentukan offset awal setiap partisi. Nilai default: OFFSET_END, yang menunjukkan bahwa partisi dilanggan dari offset akhir.

    "property.kafka_default_offsets" = "OFFSET_BEGINNING"

Untuk informasi lebih lanjut tentang parameter kustom yang didukung, lihat item konfigurasi klien dalam dokumen konfigurasi resmi librdkafka. Misalnya, parameter kustom berikut tersedia:

"property.client.id" = "12345",
"property.ssl.ca.location" = "FILE:ca.pem"

Contoh

Buat Pekerjaan Routine Load

  1. Buat tabel ke dalam mana Anda ingin mengimpor data dalam instans ApsaraDB for SelectDB. Contoh kode:

    CREATE TABLE test_table
    (
        id int,
        name varchar(50),
        age int,
        address varchar(50),
        url varchar(500)
    )
    UNIQUE KEY(`id`, `name`)
    DISTRIBUTED BY HASH(id) BUCKETS 4
    PROPERTIES("replication_num" = "1");
  2. Konfigurasikan parameter dalam contoh kode berikut untuk mengimpor data.

    • Buat pekerjaan Routine Load bernama test1 untuk tabel test_table dalam database example_db. Tentukan ID grup, ID klien, dan pemisah kolom, aktifkan sistem untuk secara otomatis mengonsumsi data dari semua partisi secara default, dan berlangganan partisi dari offset awal di mana data tersedia. Contoh kode:

      CREATE ROUTINE LOAD example_db.test1 ON test_table
      COLUMNS TERMINATED BY ",",
      COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "false"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      );
    • Buat pekerjaan Routine Load bernama test2 untuk tabel test_table dalam database example_db. Aktifkan mode ketat untuk pekerjaan tersebut. Contoh kode:

      CREATE ROUTINE LOAD example_db.test2 ON test_table
      COLUMNS TERMINATED BY ",",
      COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "true"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offsets" = "OFFSET_BEGINNING"
      );
    • Konsumsi data partisi dari titik waktu tertentu. Contoh kode:

      CREATE ROUTINE LOAD example_db.test4 ON test_table
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "30",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200"
      ) FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092",
          "kafka_topic" = "my_topic",
          "property.kafka_default_offset" = "2024-01-21 10:00:00"
      );

Impor Data JSON

Anda dapat menggunakan Routine Load untuk mengimpor data JSON dari dua jenis berikut:

  • Data JSON hanya berisi satu catatan data dan merupakan objek JSON.

    Jika Anda menggunakan mode impor tabel tunggal, data JSON dalam format berikut. Dalam mode ini, nama tabel ditentukan dengan menjalankan pernyataan ON TABLE_NAME.

    {"key1":"value1","key2":"value2","key3":"value3"}

    Jika Anda menggunakan mode dinamis atau multi-tabel, data JSON dalam format berikut. Dalam mode ini, nama tabel tidak ditentukan.

    table_name|{"key1":"value1","key2":"value2","key3":"value3"}
  • Data JSON adalah array yang berisi beberapa catatan data.

    Jika Anda menggunakan mode impor tabel tunggal, data JSON dalam format berikut. Dalam mode ini, nama tabel ditentukan dengan menjalankan pernyataan ON TABLE_NAME.

    [
        {   
            "key1":"value11",
            "key2":"value12",
            "key3":"value13",
            "key4":14
        },
        {
            "key1":"value21",
            "key2":"value22",
            "key3":"value23",
            "key4":24
        },
        {
            "key1":"value31",
            "key2":"value32",
            "key3":"value33",
            "key4":34
        }
    ]

    Jika Anda menggunakan mode dinamis atau multi-tabel, data JSON dalam format berikut. Dalam mode ini, nama tabel tidak ditentukan.

       table_name|[
        {   
            "key1":"value11",
            "key2":"value12",
            "key3":"value13",
            "key4":14
        },
        {
            "key1":"value21",
            "key2":"value22",
            "key3":"value23",
            "key4":24
        },
        {
            "key1":"value31",
            "key2":"value32",
            "key3":"value33",
            "key4":34
        }
    ]

Impor data JSON.

  1. Buat tabel ke dalam mana Anda ingin mengimpor data dalam instans ApsaraDB for SelectDB. Contoh kode:

    CREATE TABLE `example_tbl` (
       `category` varchar(24) NULL COMMENT "",
       `author` varchar(24) NULL COMMENT "",
       `timestamp` bigint(20) NULL COMMENT "",
       `dt` int(11) NULL COMMENT "",
       `price` double REPLACE
    ) ENGINE=OLAP
    AGGREGATE KEY(`category`,`author`,`timestamp`,`dt`)
    COMMENT "OLAP"
    PARTITION BY RANGE(`dt`)
    (
      PARTITION p0 VALUES [("-2147483648"), ("20230509")),
        PARTITION p20200509 VALUES [("20230509"), ("20231010")),
        PARTITION p20200510 VALUES [("20231010"), ("20231211")),
        PARTITION p20200511 VALUES [("20231211"), ("20240512"))
    )
    DISTRIBUTED BY HASH(`category`,`author`,`timestamp`) BUCKETS 4;
  2. Impor data JSON dari dua jenis di atas ke dalam topik. Contoh kode:

    {
        "category":"value1331",
        "author":"value1233",
        "timestamp":1700346050,
        "price":1413
    }
    [
        {
            "category":"value13z2",
            "author":"vaelue13",
            "timestamp":1705645251,
            "price":14330
        },
        {
            "category":"lvalue211",
            "author":"lvalue122",
            "timestamp":1684448450,
            "price":24440
        }
    ]
  3. Impor data JSON dalam mode berbeda.

    • Impor data JSON dalam mode sederhana. Contoh kode:

      CREATE ROUTINE LOAD example_db.test_json_label_1 ON example_tbl
      COLUMNS(category,price,author)
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "false",
          "format" = "json"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "kafka_partitions" = "0,1,2",
          "kafka_offsets" = "0,0,0"
       );
    • Impor data JSON secara akurat. Contoh kode:

      CREATE ROUTINE LOAD example_db.test_json_label_3 ON example_tbl
      COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
      PROPERTIES
      (
          "desired_concurrent_number"="3",
          "max_batch_interval" = "20",
          "max_batch_rows" = "300000",
          "max_batch_size" = "209715200",
          "strict_mode" = "false",
          "format" = "json",
          "jsonpaths" = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
          "strip_outer_array" = "true"
      )
      FROM KAFKA
      (
          "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
          "kafka_topic" = "my_topic",
          "kafka_partitions" = "0,1,2",
          "kafka_offsets" = "0,0,0"
      );
      Catatan

      Bidang partisi dt dalam tabel tidak tersedia dalam data sampel. Nilai bidang dt dihasilkan dengan menggunakan konfigurasi dt=from_unixtime(timestamp,'%Y%m%d') dalam pernyataan CREATE ROUTINE LOAD.

Akses Kluster Kafka yang Menggunakan Metode Otentikasi Berbeda

Contoh berikut menunjukkan cara mengakses kluster Kafka berdasarkan metode otentikasi kluster Kafka.

  1. Akses kluster Kafka yang memiliki metode otentikasi SSL diaktifkan.

    Untuk mengakses kluster Kafka yang memiliki metode otentikasi SSL diaktifkan, Anda harus menyediakan file sertifikat (ca.pem) yang digunakan untuk mengotentikasi kunci publik broker Kafka. Jika mode otentikasi klien diaktifkan untuk kluster Kafka, sertifikat kunci publik (client.pem), file kunci privat (client.key), dan kata sandi kunci privat klien juga diperlukan. Anda perlu mengunggah file yang diperlukan ke ApsaraDB for SelectDB terlebih dahulu dengan menjalankan pernyataan CREATE FILE. Katalog diberi nama kafka.

    1. Unggah file. Contoh kode:

      CREATE FILE "ca.pem" PROPERTIES("url" = "https://example_url/kafka-key/ca.pem", "catalog" = "kafka");
      CREATE FILE "client.key" PROPERTIES("url" = "https://example_urlkafka-key/client.key", "catalog" = "kafka");
      CREATE FILE "client.pem" PROPERTIES("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
    2. Buat pekerjaan Routine Load. Contoh kode:

      CREATE ROUTINE LOAD db1.job1 on tbl1
      PROPERTIES
      (
          "desired_concurrent_number"="1"
      )
      FROM KAFKA
      (
          "kafka_broker_list"= "broker1:9091,broker2:9091",
          "kafka_topic" = "my_topic",
          "property.security.protocol" = "ssl",
          "property.ssl.ca.location" = "FILE:ca.pem",
          "property.ssl.certificate.location" = "FILE:client.pem",
          "property.ssl.key.location" = "FILE:client.key",
          "property.ssl.key.password" = "abcdefg"
      );
      Catatan

      ApsaraDB for SelectDB mengakses kluster Kafka menggunakan pustaka klien Kafka C++ librdkafka. Untuk informasi lebih lanjut tentang parameter yang didukung oleh librdkafka, lihat dokumen Properti Konfigurasi dari librdkafka.

  2. Akses kluster Kafka yang memiliki metode otentikasi PLAIN diaktifkan.

    Untuk mengakses kluster Kafka yang memiliki metode otentikasi PLAIN diaktifkan, Anda harus menambahkan konfigurasi berikut:

    1. property.security.protocol=SASL_PLAINTEXT: Gunakan metode otentikasi teks biasa Lapisan Otentikasi dan Keamanan Sederhana (SASL).

    2. property.sasl.mechanism=PLAIN: Tetapkan metode otentikasi SASL ke PLAIN.

    3. property.sasl.username=admin: Tentukan nama pengguna SASL.

    4. property.sasl.password=admin: Tentukan kata sandi SASL.

    Buat pekerjaan Routine Load. Contoh kode:

    CREATE ROUTINE LOAD db1.job1 on tbl1
    PROPERTIES (
    "desired_concurrent_number"="1",
     )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092",
        "kafka_topic" = "my_topic",
        "property.security.protocol"="SASL_PLAINTEXT",
        "property.sasl.mechanism"="PLAIN",
        "property.sasl.username"="admin",
        "property.sasl.password"="admin"
    );
    
  3. Akses kluster Kafka yang memiliki metode otentikasi Kerberos diaktifkan.

    Untuk mengakses kluster Kafka yang memiliki metode otentikasi Kerberos diaktifkan, Anda harus menambahkan konfigurasi berikut:

    1. security.protocol=SASL_PLAINTEXT: Gunakan metode otentikasi teks biasa SASL.

    2. sasl.kerberos.service.name=$SERVICENAME: Tentukan nama layanan broker.

    3. sasl.kerberos.keytab=/etc/security/keytabs/${CLIENT_NAME}.keytab: Tentukan jalur file .keytab lokal.

    4. sasl.kerberos.principal=${CLIENT_NAME}/${CLIENT_HOST}: Tentukan principal Kerberos yang digunakan oleh ApsaraDB for SelectDB untuk terhubung ke kluster Kafka.

    Buat pekerjaan Routine Load. Contoh kode:

    CREATE ROUTINE LOAD db1.job1 on tbl1
    PROPERTIES (
    "desired_concurrent_number"="1",
     )
    FROM KAFKA
    (
        "kafka_broker_list" = "broker1:9092,broker2:9092",
        "kafka_topic" = "my_topic",
        "property.security.protocol" = "SASL_PLAINTEXT",
        "property.sasl.kerberos.service.name" = "kafka",
        "property.sasl.kerberos.keytab" = "/etc/krb5.keytab",
        "property.sasl.kerberos.principal" = "id@your.com"
    );
    Catatan
    • Untuk mengaktifkan ApsaraDB for SelectDB agar dapat mengakses kluster Kafka yang memiliki metode otentikasi Kerberos diaktifkan, Anda perlu menerapkan klien Kerberos kinit pada semua node yang sedang berjalan dalam kluster ApsaraDB for SelectDB, mengonfigurasi file krb5.conf, dan menentukan informasi layanan Pusat Distribusi Kunci (KDC).

    • Tetapkan parameter property.sasl.kerberos.keytab ke jalur mutlak file .keytab lokal, dan izinkan proses ApsaraDB for SelectDB untuk mengakses file .keytab lokal.

Modifikasi Pekerjaan Routine Load

Anda dapat memodifikasi pekerjaan Routine Load yang ada dan berada dalam status PAUSED.

Sintaks

ALTER ROUTINE LOAD FOR <job_name>
[job_properties]
FROM <data_source>
[data_source_properties]

Parameter

Parameter

Deskripsi

[db.]job_name

Nama pekerjaan yang akan dimodifikasi.

tbl_name

Nama tabel ke mana data akan diimpor.

job_properties

Parameter pekerjaan yang akan dimodifikasi. Hanya parameter berikut yang dapat dimodifikasi:

  • desired_concurrent_number

  • max_error_number

  • max_batch_interval

  • max_batch_rows

  • max_batch_size

  • jsonpaths

  • json_root

  • strip_outer_array

  • strict_mode

  • timezone

  • num_as_string

  • fuzzy_parse

data_source

Tipe sumber data. Tetapkan parameter ini ke KAFKA.

data_source_properties

Parameter sumber data. Hanya parameter berikut yang didukung:

  1. kafka_partitions

  2. kafka_offsets

  3. kafka_broker_list

  4. kafka_topic

  5. Properti kustom, seperti property.group.id.

Catatan

Parameter kafka_partitions dan kafka_offsets digunakan untuk memodifikasi offset partisi Kafka yang akan dikonsumsi. Anda hanya dapat memodifikasi partisi yang dikonsumsi. Anda tidak dapat menambahkan partisi.

Contoh

  • Ubah nilai parameter desired_concurrent_number menjadi 1. Contoh kode:

    ALTER ROUTINE LOAD FOR db1.label1
    PROPERTIES
    (
        "desired_concurrent_number" = "1"
    );
  • Ubah nilai parameter desired_concurrent_number menjadi 10, dan modifikasi offset partisi serta ID grup. Contoh kode:

    ALTER ROUTINE LOAD FOR db1.label1
    PROPERTIES
    (
        "desired_concurrent_number" = "10"
    )
    FROM kafka
    (
        "kafka_partitions" = "0, 1, 2",
        "kafka_offsets" = "100, 200, 100",
        "property.group.id" = "new_group"
    );

Jeda Pekerjaan Routine Load

Anda dapat menjeda pekerjaan Routine Load dengan menjalankan pernyataan PAUSE, dan melanjutkan pekerjaan yang dijeda dengan menjalankan pernyataan RESUME.

Sintaks

PAUSE [ALL] ROUTINE LOAD FOR <job_name>;

Parameter

Parameter

Deskripsi

[db.]job_name

Nama pekerjaan yang akan dijeda.

Contoh

  • Jalankan pernyataan berikut untuk menjeda pekerjaan Routine Load bernama test1:

    PAUSE ROUTINE LOAD FOR test1;
  • Jalankan pernyataan berikut untuk menjeda semua pekerjaan Routine Load:

    PAUSE ALL ROUTINE LOAD;

Lanjutkan Pekerjaan Routine Load

Anda dapat melanjutkan pekerjaan Routine Load yang dijeda. Pekerjaan yang dilanjutkan akan terus mengonsumsi data dari offset terakhir yang dikonsumsi.

Sintaks

RESUME [ALL] ROUTINE LOAD FOR <job_name>

Parameter

Parameter

Deskripsi

[db.]job_name

Nama pekerjaan Routine Load yang akan dilanjutkan.

Contoh

  • Jalankan pernyataan berikut untuk melanjutkan pekerjaan Routine Load bernama test1:

    RESUME ROUTINE LOAD FOR test1;
  • Jalankan pernyataan berikut untuk melanjutkan semua pekerjaan Routine Load:

    RESUME ALL ROUTINE LOAD;

Hentikan Pekerjaan Routine Load

Anda dapat menghentikan pekerjaan Routine Load. Pekerjaan Routine Load yang dihentikan tidak dapat dimulai kembali. Setelah pekerjaan Routine Load dihentikan, data yang telah diimpor tidak dapat dikembalikan.

Sintaks

STOP ROUTINE LOAD FOR <job_name>;

Parameter

Parameter

Deskripsi

[db.]job_name

Nama pekerjaan yang akan dihentikan.

Contoh

Jalankan pernyataan berikut untuk menghentikan pekerjaan Routine Load bernama test1:

STOP ROUTINE LOAD FOR test1;

Kueri satu atau lebih Pekerjaan Routine Load

Anda dapat menjalankan pernyataan SHOW ROUTINE LOAD untuk menanyakan status satu atau lebih pekerjaan Routine Load.

Sintaks

SHOW [ALL] ROUTINE LOAD [FOR job_name];

Parameter

Parameter

Deskripsi

[db.]job_name

Nama pekerjaan yang ingin Anda tanyakan.

Catatan

Jika data yang diimpor dalam format tidak valid, informasi kesalahan detail dicatat dalam nilai parameter ErrorLogUrls. Nilai parameter ErrorLogUrls berisi beberapa URL. Anda dapat menyalin salah satu URL untuk menanyakan informasi kesalahan di browser.

Contoh

  • Jalankan pernyataan berikut untuk menanyakan semua pekerjaan Routine Load bernama test1, termasuk pekerjaan yang dihentikan dan dibatalkan. Output hasil menampilkan setiap pekerjaan pada baris terpisah dan mungkin terdiri dari satu atau lebih baris tergantung jumlah pekerjaan.

    SHOW ALL ROUTINE LOAD FOR test1;
  • Jalankan pernyataan berikut untuk menanyakan pekerjaan Routine Load yang sedang berlangsung bernama test1:

    SHOW ROUTINE LOAD FOR test1;
  • Jalankan pernyataan berikut untuk menanyakan semua pekerjaan Routine Load dalam database example_db, termasuk pekerjaan yang dihentikan dan dibatalkan. Output hasil menampilkan setiap pekerjaan pada baris terpisah dan mungkin terdiri dari satu atau lebih baris tergantung jumlah pekerjaan.

    use example_db;
    SHOW ALL ROUTINE LOAD;
  • Jalankan pernyataan berikut untuk menanyakan semua pekerjaan Routine Load yang sedang berlangsung dalam database example_db:

    use example_db;
    SHOW ROUTINE LOAD;
  • Jalankan pernyataan berikut untuk menanyakan pekerjaan Routine Load yang sedang berlangsung bernama test1 dalam database example_db:

    SHOW ROUTINE LOAD FOR example_db.test1;
  • Jalankan pernyataan berikut untuk menanyakan semua pekerjaan Routine Load bernama test1 dalam database example_db, termasuk pekerjaan yang dihentikan dan dibatalkan. Output hasil menampilkan setiap pekerjaan pada baris terpisah dan mungkin terdiri dari satu atau lebih baris tergantung jumlah pekerjaan.

    SHOW ALL ROUTINE LOAD FOR example_db.test1;

Konfigurasi sistem terkait

Konfigurasi sistem terkait memengaruhi penggunaan Routine Load.

  • max_routine_load_task_concurrent_num

    Parameter frontend (FE). Nilai default: 5. Anda dapat memodifikasi parameter ini saat runtime. Parameter ini menentukan jumlah maksimum tugas yang dapat berjalan secara bersamaan pada satu waktu untuk pekerjaan Routine Load. Kami merekomendasikan Anda menggunakan nilai default. Jika parameter ini disetel ke nilai besar, jumlah tugas bersamaan mungkin berlebihan dan menghabiskan banyak sumber daya kluster.

  • max_routine_load_task_num_per_be

    Parameter FE. Nilai default: 5. Anda dapat memodifikasi parameter ini saat runtime. Parameter ini menentukan jumlah maksimum tugas yang dapat berjalan secara bersamaan pada satu waktu di setiap node BE. Kami merekomendasikan Anda menggunakan nilai default. Jika parameter ini disetel ke nilai besar, jumlah tugas bersamaan mungkin berlebihan dan menghabiskan banyak sumber daya kluster.

  • max_routine_load_job_num

    Parameter FE. Nilai default: 100. Anda dapat memodifikasi parameter ini saat runtime. Parameter ini menentukan jumlah maksimum pekerjaan Routine Load yang dapat Anda kirimkan, termasuk pekerjaan dalam status NEED_SCHEDULED, RUNNING, atau PAUSED. Jika total jumlah pekerjaan Routine Load yang Anda kirimkan mencapai nilai maksimum, tidak ada pekerjaan lain yang dapat dikirimkan.

  • max_consumer_num_per_group

    Parameter BE. Nilai default: 3. Parameter ini menentukan jumlah maksimum konsumen yang dapat dihasilkan untuk mengonsumsi data dalam suatu tugas. Untuk sumber data Kafka, satu konsumen mungkin mengonsumsi data dari satu atau lebih partisi Kafka. Jika suatu tugas mengonsumsi data dari enam partisi Kafka, tiga konsumen dihasilkan. Setiap konsumen mengonsumsi data dari dua partisi. Jika hanya dua partisi yang ada, hanya dua konsumen yang dihasilkan, dan setiap konsumen mengonsumsi data dari satu partisi.

  • max_tolerable_backend_down_num

    Parameter FE. Nilai default: 0. Jika kondisi tertentu terpenuhi, ApsaraDB for SelectDB menjadwalkan ulang pekerjaan dalam status PAUSED. Kemudian, status pekerjaan yang dijadwalkan ulang berubah menjadi RUNNING. Nilai 0 menunjukkan bahwa pekerjaan hanya dapat dijadwalkan ulang jika semua node BE hidup.

  • period_of_auto_resume_min

    Parameter FE memiliki nilai default 5 menit, yang berarti ApsaraDB for SelectDB akan menjadwalkan ulang pekerjaan hingga tiga kali dalam jangka waktu tersebut. Jika pekerjaan gagal dijadwalkan ulang sebanyak tiga kali, pekerjaan tersebut akan terkunci dan tidak dijadwalkan ulang lagi. Intervensi manual dapat dilakukan untuk melanjutkan pekerjaan.

Deskripsi Lainnya

  • Hubungan antara pekerjaan Routine Load dan operasi ALTER TABLE

    • Pekerjaan Routine Load tidak memblokir operasi SCHEMA CHANGE atau ROLLUP. Namun, jika kolom dalam data sumber tidak sesuai dengan kolom dalam tabel tujuan setelah operasi SCHEMA CHANGE, jumlah data kesalahan akan meningkat dan pekerjaan akhirnya dijeda. Untuk mencegah masalah ini, disarankan untuk secara eksplisit menentukan pemetaan kolom dalam pekerjaan Routine Load serta menggunakan kolom NULLABLE atau kolom dengan batasan DEFAULT.

    • Jika partisi tabel dihapus, data mungkin gagal diimpor karena partisi tidak ditemukan, sehingga menyebabkan pekerjaan dijeda.

  • Hubungan antara pekerjaan Routine Load dan operasi LOAD, DELETE, dan INSERT

    • Pekerjaan Routine Load tidak bertentangan dengan operasi LOAD atau INSERT.

    • Untuk melakukan operasi DELETE pada tabel, pastikan tidak ada data yang sedang diimpor ke partisi tabel terkait. Oleh karena itu, sebelum menjalankan operasi DELETE, jeda pekerjaan Routine Load dan tunggu hingga semua tugas yang telah ditugaskan selesai.

  • Hubungan antara pekerjaan Routine Load dan operasi DROP DATABASE atau DROP TABLE

    Jika database atau tabel yang menjadi tujuan impor pekerjaan Routine Load dihapus, pekerjaan tersebut secara otomatis dibatalkan.

  • Hubungan antara pekerjaan Routine Load untuk kluster Kafka dan topik Kafka

    Jika Kafka topic yang didefinisikan dalam pernyataan CREATE ROUTINE LOAD tidak tersedia di kluster Kafka, broker Kafka dapat secara otomatis membuat topik berdasarkan pengaturan parameter auto.create.topics.enable.

    • Jika parameter auto.create.topics.enable diatur ke true untuk broker Kafka, topik akan dibuat secara otomatis. Jumlah partisi yang dibuat ditentukan oleh parameter num.partitions broker Kafka. Pekerjaan Routine Load kemudian akan terus membaca data dari topik tersebut.

    • Jika parameter auto.create.topics.enable diatur ke false untuk broker Kafka, topik tidak akan dibuat secara otomatis. Dalam hal ini, pekerjaan Routine Load dijeda hingga data tersedia.

    Oleh karena itu, jika Anda ingin topik dibuat secara otomatis, atur parameter auto.create.topics.enable ke true untuk broker dalam kluster Kafka.

  • Pertimbangan untuk isolasi blok CIDR dan resolusi nama domain dalam lingkungan

    • Broker yang ditentukan saat membuat pekerjaan Routine Load harus dapat diakses oleh ApsaraDB for SelectDB.

    • Jika parameter advertised.listeners dikonfigurasikan di Kafka, alamat dalam nilai parameter advertised.listeners harus dapat diakses oleh ApsaraDB for SelectDB.

  • Tentukan partisi dan offset untuk konsumsi data.

    Tentukan partisi dan offset untuk konsumsi data

    • kafka_partitions: partisi yang datanya akan dikonsumsi. Contoh: "0,1,2,3".

    • kafka_offsets: offset awal setiap partisi. Jumlah offset yang ditentukan untuk parameter ini harus sama dengan jumlah partisi yang ditentukan untuk parameter kafka_partitions. Contoh: "1000,1000,2000,2000".

    • property.kafka_default_offset: offset awal default partisi.

    Saat membuat pekerjaan Routine Load, Anda dapat menggabungkan ketiga parameter tersebut menggunakan salah satu dari lima metode yang dijelaskan dalam tabel berikut.

    Metode

    kafka_partitions

    kafka_offsets

    property.kafka_default_offset

    Perilaku

    1

    Tidak

    Tidak

    Tidak

    Sistem secara otomatis mencari semua partisi topik Kafka dan mulai mengonsumsi data dari offset akhir partisi.

    2

    Tidak

    Tidak

    Ya

    Sistem secara otomatis mencari semua partisi topik Kafka dan mulai mengonsumsi data dari offset default.

    3

    Ya

    Tidak

    Tidak

    Sistem mulai mengonsumsi data dari offset akhir partisi yang ditentukan.

    4

    Ya

    Ya

    Tidak

    Sistem mulai mengonsumsi data dari offset yang ditentukan dari partisi yang ditentukan.

    5

    Ya

    Tidak

    Ya

    Sistem mulai mengonsumsi data dari offset default partisi yang ditentukan.

  • Perbedaan antara status STOPPED dan PAUSED

    FE secara berkala membersihkan pekerjaan Routine Load dalam status STOPPED. Pekerjaan Routine Load dalam status PAUSED dapat dilanjutkan.