All Products
Search
Document Center

ApsaraDB for SelectDB:Gunakan Routine Load untuk mengimpor data

Last Updated:Apr 29, 2026

Routine Load secara terus-menerus membaca pesan dari topik Kafka dan memuatnya ke dalam ApsaraDB for SelectDB. Setelah dibuat, pekerjaan Routine Load berjalan secara persisten—tidak perlu diajukan ulang setelah setiap batch.

Cara kerja

Saat Anda membuat pekerjaan Routine Load, frontend (FE) menghasilkan pekerjaan muat persisten dan membaginya menjadi beberapa task. Setiap task merupakan transaksi independen yang mengonsumsi irisan terbatas pesan Kafka. Sebuah task berakhir ketika ambang batas pertama tercapai: max_batch_interval, max_batch_rows, atau max_batch_size. Setelah sebuah task dikomit, task baru segera dijadwalkan.

Arsitektur ini memungkinkan penyesuaian konkurensi, ukuran batch, dan toleransi kesalahan tanpa menghentikan pekerjaan.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

  • Kluster Kafka (versi 0.10.0.0 atau lebih baru) yang dapat diakses dari instans ApsaraDB for SelectDB Anda

  • Topik Kafka dengan pesan dalam format CSV atau JSON (pesan CSV harus berupa satu baris tanpa line feed di akhir)

  • Tabel tujuan di ApsaraDB for SelectDB

  • Konektivitas jaringan: Jika kluster Kafka Anda dideploy di lingkungan jaringan publik, instans ApsaraDB for SelectDB Anda harus memiliki akses jaringan publik untuk terhubung ke broker Kafka. Lihat Atasi masalah jaringan dengan sumber data.

Catatan penggunaan

Kafka 0.10.0.0 dan versi yang lebih baru didukung secara default. Untuk menggunakan versi Kafka lama (0.9.0, 0.8.2, 0.8.1, atau 0.8.0), gunakan salah satu pendekatan berikut:

  • Setel kafka_broker_version_fallback dalam konfigurasi backend (BE) ke versi Kafka target.

  • Setel property.broker.version.fallback saat membuat pekerjaan Routine Load.

Beberapa fitur tidak tersedia pada versi Kafka sebelum 0.10.0.0. Misalnya, offset partisi berbasis waktu tidak didukung.

Panduan cepat

Contoh ini memuat data CSV dari Kafka ke tabel SelectDB.

Langkah 1: Verifikasi data sampel di Kafka

1,Alice,30
2,Bob,25
3,Carol,35

Langkah 2: Buat tabel tujuan

CREATE TABLE testdb.users (
    id      INT         NOT NULL,
    name    VARCHAR(50),
    age     INT
)
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 4
PROPERTIES ("replication_num" = "1");

Langkah 3: Buat pekerjaan Routine Load

CREATE ROUTINE LOAD testdb.load_users ON users
COLUMNS TERMINATED BY ",",
COLUMNS(id, name, age)
PROPERTIES (
    "desired_concurrent_number" = "3",
    "max_batch_interval"        = "20",
    "max_batch_rows"            = "300000",
    "max_batch_size"            = "209715200",
    "strict_mode"               = "false"
)
FROM KAFKA (
    "kafka_broker_list"              = "broker1:9092,broker2:9092",
    "kafka_topic"                    = "users",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
);

Langkah 4: Periksa status pekerjaan

SHOW ROUTINE LOAD FOR testdb.load_users;

Buat pekerjaan Routine Load

Sintaks

CREATE ROUTINE LOAD [db.]job_name ON tbl_name
[merge_type]
[load_properties]
[job_properties]
FROM data_source [data_source_properties]

Parameter

Parameter

Deskripsi

[db.]job_name

Nama pekerjaan. Dalam satu database, hanya satu pekerjaan dengan nama tertentu yang dapat berjalan pada satu waktu.

tbl_name

Nama tabel tujuan.

merge_type

Mode penggabungan data. Default: APPEND (penambahan standar). Hanya untuk tabel model Unique Key, atur ke MERGE (memerlukan DELETE ON) atau DELETE (menghapus semua baris yang diimpor).

load_properties

Parameter untuk memproses data yang diimpor. Lihat parameter load_properties.

job_properties

Parameter tingkat pekerjaan. Lihat parameter job_properties.

data_source_properties

Parameter koneksi Kafka. Lihat parameter data_source_properties.

Parameter load_properties

[column_separator],
[columns_mapping],
[preceding_filter],
[where_predicates],
[partitions],
[DELETE ON],
[ORDER BY]

Parameter

Contoh

Deskripsi

column_separator

COLUMNS TERMINATED BY ","

Pembatas kolom. Default: \t.

columns_mapping

(k1, k2, tmpk1, k3=tmpk1+1)

Memetakan kolom sumber ke kolom tujuan dan menerapkan transformasi. Lihat Mengonversi data sumber.

preceding_filter

Menyaring data sumber sebelum pemetaan kolom. Lihat Mengonversi data sumber.

where_predicates

WHERE k1 > 100 AND k2 = 1000

Menyaring baris setelah pemetaan kolom. Lihat Mengonversi data sumber.

partitions

PARTITION(p1, p2, p3)

Partisi target. Jika dihilangkan, data diarahkan ke partisi yang sesuai secara otomatis.

DELETE ON

DELETE ON v3 > 100

Menentukan ekspresi kolom Delete Flag. Diperlukan saat merge_type adalah MERGE. Hanya berlaku untuk tabel model Unique Key.

ORDER BY

Menentukan Sequence Col untuk mempertahankan urutan baris selama impor. Hanya berlaku untuk tabel model Unique Key.

Parameter job_properties

PROPERTIES (
    "key1" = "val1",
    "key2" = "val2"
)
Sebuah task berakhir ketika ambang batas pertama dari max_batch_interval, max_batch_rows, atau max_batch_size tercapai.

Parameter

Default

Rentang valid

Deskripsi

desired_concurrent_number

3

Bilangan bulat > 0

Maksimum task yang dapat berjalan secara konkuren untuk pekerjaan tersebut. Konkurensi aktual bergantung pada node kluster, beban, dan sumber data. Untuk throughput optimal, atur nilai ini ke jumlah core kluster / 16.

max_batch_interval

10 detik

5–60 detik

Durasi eksekusi maksimum per task.

max_batch_rows

200000

≥ 200000

Maksimum baris yang dibaca per task.

max_batch_size

104857600 (100 MB)

100 MB–1 GB

Maksimum byte yang dibaca per task.

max_error_number

0

Bilangan bulat > 0

Maksimum baris error yang diperbolehkan dalam jendela pengambilan sampel (10 × max_batch_rows). Jika melebihi, pekerjaan dijeda dan memerlukan intervensi manual. Baris yang difilter oleh kondisi WHERE tidak dihitung sebagai error.

strict_mode

false

true / false

Jika true, baris tempat nilai sumber NOT NULL dikonversi menjadi NULL di kolom tujuan akan difilter. Tidak berlaku untuk kolom yang diturunkan dari fungsi.

timezone

Zona waktu sesi

Zona waktu untuk semua fungsi terkait zona waktu dalam pekerjaan (misalnya, "Africa/Abidjan").

format

CSV

CSV / json

Format pesan.

jsonpaths

Jalur ekstraksi bidang JSON untuk data format JSON (misalnya, "[\"$.k2\",\"$.k1\"]").

strip_outer_array

false

true / false

Jika true, memperlakukan array JSON tingkat atas sebagai beberapa baris.

json_root

Jalur node root untuk ekstraksi JSON (misalnya, "$.RECORDS").

send_batch_parallelism

Maksimum thread paralel untuk mengirim data batch. Dibatasi oleh max_send_batch_parallelism_per_job dalam konfigurasi BE.

load_to_single_tablet

false

true / false

Jika true, memuat data ke satu tablet per partisi. Hanya berlaku untuk tabel model Duplicate Key dengan partisi acak.

Perilaku mode ketat

Mode ketat mengatur cara penanganan kegagalan konversi tipe.

Mengimpor kolom TINYINT (NULL diizinkan):

Tipe data sumber

Nilai contoh

Hasil konversi

Mode ketat

Hasil

NULL

\N

true atau false

NULL

NOT NULL

aaa atau 2000

NULL

true

Difilter (tidak valid)

NOT NULL

aaa

NULL

false

NULL

NOT NULL

1

1

true atau false

Diimpor

Mengimpor kolom DECIMAL(1,0) (NULL diizinkan):

Tipe data sumber

Nilai contoh

Hasil konversi

Mode ketat

Hasil

NULL

\N

true atau false

NULL

NOT NULL

aaa

NULL

true

Difilter (tidak valid)

NOT NULL

aaa

NULL

false

NULL

NOT NULL

1 atau 10

1

true atau false

Diimpor

Nilai 10 melebihi rentang DECIMAL(1,0) tetapi tidak difilter oleh mode ketat, karena nilainya sendiri lolos konversi tipe. Nilai tersebut difilter kemudian selama proses ekstrak, transformasi, dan muat (ETL).

Parameter data_source_properties

FROM KAFKA (
    "key1" = "val1",
    "key2" = "val2"
)

Parameter

Deskripsi

kafka_broker_list

Alamat broker. Format: host:port. Pisahkan beberapa broker dengan koma. Contoh: "broker1:9092,broker2:9092".

kafka_topic

Topik Kafka yang akan dilanggan.

kafka_partitions

ID partisi yang dipisahkan koma untuk dilanggan. Contoh: "0,1,2,3".

kafka_offsets

Offset awal untuk setiap partisi yang tercantum dalam kafka_partitions. Harus memiliki jumlah entri yang sama. Menerima offset numerik, OFFSET_BEGINNING, OFFSET_END, atau timestamp dalam format yyyy-MM-dd HH:mm:ss. Jika dihilangkan, default ke offset akhir semua partisi.

property.*

Properti klien Kafka kustom, setara dengan --property di shell Kafka. Untuk nilai berbasis file, tambahkan prefiks FILE: (misalnya, "FILE:ca.pem").

Penting

Offset berbasis waktu dan offset numerik tidak dapat dicampur dalam nilai kafka_offsets yang sama.

Kombinasi partisi dan offset

kafka_partitions, kafka_offsets, dan property.kafka_default_offsets berinteraksi sebagai berikut:

Metode

kafka_partitions

kafka_offsets

property.kafka_default_offsets

Perilaku

1

Tidak diatur

Tidak diatur

Tidak diatur

Semua partisi, dimulai dari offset akhir.

2

Tidak diatur

Tidak diatur

Set

Semua partisi, dimulai dari offset default yang ditentukan.

3

Set

Tidak diatur

Tidak diatur

Partisi yang ditentukan, dimulai dari offset akhir.

4

Set

Set

Tidak diatur

Partisi yang ditentukan, dimulai dari offset yang ditentukan.

5

Set

Tidak diatur

Tetapkan

Partisi yang ditentukan, dimulai dari offset default yang ditentukan.

Contoh — partisi dengan offset campuran:

"kafka_partitions" = "0,1,2,3",
"kafka_offsets"    = "101,0,OFFSET_BEGINNING,OFFSET_END"

Contoh — partisi dengan offset berbasis timestamp:

"kafka_partitions" = "0,1,2,3",
"kafka_offsets"    = "2021-05-22 11:00:00,2021-05-22 11:00:00,2021-05-22 11:00:00,2021-05-22 11:00:00"

Contoh

Muat data CSV

  1. Buat tabel tujuan:

    CREATE TABLE test_table (
        id      INT,
        name    VARCHAR(50),
        age     INT,
        address VARCHAR(50),
        url     VARCHAR(500)
    )
    UNIQUE KEY(`id`, `name`)
    DISTRIBUTED BY HASH(id) BUCKETS 4
    PROPERTIES ("replication_num" = "1");
  2. Buat pekerjaan Routine Load yang membaca dari awal semua partisi:

    CREATE ROUTINE LOAD example_db.test1 ON test_table
    COLUMNS TERMINATED BY ",",
    COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
    PROPERTIES (
        "desired_concurrent_number" = "3",
        "max_batch_interval"        = "20",
        "max_batch_rows"            = "300000",
        "max_batch_size"            = "209715200",
        "strict_mode"               = "false"
    )
    FROM KAFKA (
        "kafka_broker_list"              = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic"                    = "my_topic",
        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );
  3. Buat pekerjaan Routine Load dengan mode ketat diaktifkan:

    CREATE ROUTINE LOAD example_db.test2 ON test_table
    COLUMNS TERMINATED BY ",",
    COLUMNS(k1, k2, k3, v1, v2, v3 = k1 * 100)
    PROPERTIES (
        "desired_concurrent_number" = "3",
        "max_batch_interval"        = "20",
        "max_batch_rows"            = "300000",
        "max_batch_size"            = "209715200",
        "strict_mode"               = "true"
    )
    FROM KAFKA (
        "kafka_broker_list"              = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic"                    = "my_topic",
        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
    );
  4. Mulai mengonsumsi dari timestamp tertentu:

    CREATE ROUTINE LOAD example_db.test4 ON test_table
    PROPERTIES (
        "desired_concurrent_number" = "3",
        "max_batch_interval"        = "30",
        "max_batch_rows"            = "300000",
        "max_batch_size"            = "209715200"
    )
    FROM KAFKA (
        "kafka_broker_list"             = "broker1:9092,broker2:9092",
        "kafka_topic"                   = "my_topic",
        "property.kafka_default_offset" = "2024-01-21 10:00:00"
    );

Muat data JSON

Routine Load mendukung dua struktur pesan JSON:

  • Objek JSON tunggal — satu catatan per pesan:

    {"key1":"value1","key2":"value2","key3":"value3"}
  • Array JSON — beberapa catatan per pesan:

    [
        {"key1":"value11","key2":"value12","key3":"value13","key4":14},
        {"key1":"value21","key2":"value22","key3":"value23","key4":24}
    ]

Untuk mode impor multi-tabel, tambahkan prefiks nama tabel tujuan pada setiap pesan:

table_name|{"key1":"value1","key2":"value2","key3":"value3"}

Contoh: muat data JSON

  1. Buat tabel tujuan:

    CREATE TABLE `example_tbl` (
        `category`  VARCHAR(24)  NULL,
        `author`    VARCHAR(24)  NULL,
        `timestamp` BIGINT(20)   NULL,
        `dt`        INT(11)      NULL,
        `price`     DOUBLE REPLACE
    ) ENGINE=OLAP
    AGGREGATE KEY(`category`, `author`, `timestamp`, `dt`)
    PARTITION BY RANGE(`dt`) (
        PARTITION p0        VALUES [("-2147483648"), ("20230509")),
        PARTITION p20200509 VALUES [("20230509"), ("20231010")),
        PARTITION p20200510 VALUES [("20231010"), ("20231211")),
        PARTITION p20200511 VALUES [("20231211"), ("20240512"))
    )
    DISTRIBUTED BY HASH(`category`, `author`, `timestamp`) BUCKETS 4;
  2. Publikasikan kedua jenis pesan ke topik Kafka:

    {"category":"value1331","author":"value1233","timestamp":1700346050,"price":1413}
    [
        {"category":"value13z2","author":"vaelue13","timestamp":1705645251,"price":14330},
        {"category":"lvalue211","author":"lvalue122","timestamp":1684448450,"price":24440}
    ]
  3. Muat dalam mode sederhana (nama bidang sesuai dengan nama kolom):

    CREATE ROUTINE LOAD example_db.test_json_label_1 ON example_tbl
    COLUMNS(category, price, author)
    PROPERTIES (
        "desired_concurrent_number" = "3",
        "max_batch_interval"        = "20",
        "max_batch_rows"            = "300000",
        "max_batch_size"            = "209715200",
        "strict_mode"               = "false",
        "format"                    = "json"
    )
    FROM KAFKA (
        "kafka_broker_list"  = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic"        = "my_topic",
        "kafka_partitions"   = "0,1,2",
        "kafka_offsets"      = "0,0,0"
    );
  4. Muat dengan ekstraksi jalur JSON eksplisit dan pengupasan array luar:

    Kolom dt tidak muncul dalam data sumber. Nilainya diturunkan dari timestamp menggunakan dt=from_unixtime(timestamp,'%Y%m%d') dalam klausa COLUMNS.
    CREATE ROUTINE LOAD example_db.test_json_label_3 ON example_tbl
    COLUMNS(category, author, price, timestamp, dt=from_unixtime(timestamp, '%Y%m%d'))
    PROPERTIES (
        "desired_concurrent_number" = "3",
        "max_batch_interval"        = "20",
        "max_batch_rows"            = "300000",
        "max_batch_size"            = "209715200",
        "strict_mode"               = "false",
        "format"                    = "json",
        "jsonpaths"                 = "[\"$.category\",\"$.author\",\"$.price\",\"$.timestamp\"]",
        "strip_outer_array"         = "true"
    )
    FROM KAFKA (
        "kafka_broker_list"  = "broker1:9092,broker2:9092,broker3:9092",
        "kafka_topic"        = "my_topic",
        "kafka_partitions"   = "0,1,2",
        "kafka_offsets"      = "0,0,0"
    );

Koneksi ke kluster Kafka dengan otentikasi

ApsaraDB for SelectDB menggunakan library klien C++ librdkafka untuk terhubung ke Kafka. Untuk properti konfigurasi yang didukung, lihat referensi konfigurasi librdkafka.

Otentikasi SSL

Unggah file sertifikat yang diperlukan terlebih dahulu, lalu buat pekerjaan Routine Load.

  1. Unggah file sertifikat:

    CREATE FILE "ca.pem"     PROPERTIES ("url" = "https://example_url/kafka-key/ca.pem",     "catalog" = "kafka");
    CREATE FILE "client.key" PROPERTIES ("url" = "https://example_url/kafka-key/client.key", "catalog" = "kafka");
    CREATE FILE "client.pem" PROPERTIES ("url" = "https://example_url/kafka-key/client.pem", "catalog" = "kafka");
  2. Buat pekerjaan Routine Load:

    Properti

    Wajib

    Deskripsi

    property.security.protocol

    Selalu

    Atur ke ssl.

    property.ssl.ca.location

    Selalu

    Jalur ke sertifikat CA yang mengotentikasi kunci publik broker Kafka.

    property.ssl.certificate.location

    Hanya jika otentikasi klien diaktifkan di server Kafka

    Jalur ke sertifikat kunci publik klien.

    property.ssl.key.location

    Hanya jika otentikasi klien diaktifkan di server Kafka

    Jalur ke file kunci privat klien.

    property.ssl.key.password

    Hanya jika otentikasi klien diaktifkan di server Kafka

    Kata sandi untuk kunci privat klien.

    CREATE ROUTINE LOAD db1.job1 ON tbl1
    PROPERTIES (
        "desired_concurrent_number" = "1"
    )
    FROM KAFKA (
        "kafka_broker_list"                 = "broker1:9091,broker2:9091",
        "kafka_topic"                       = "my_topic",
        "property.security.protocol"        = "ssl",
        "property.ssl.ca.location"          = "FILE:ca.pem",
        "property.ssl.certificate.location" = "FILE:client.pem",
        "property.ssl.key.location"         = "FILE:client.key",
        "property.ssl.key.password"         = "abcdefg"
    );

Otentikasi PLAIN

CREATE ROUTINE LOAD db1.job1 ON tbl1
PROPERTIES (
    "desired_concurrent_number" = "1"
)
FROM KAFKA (
    "kafka_broker_list"          = "broker1:9092,broker2:9092",
    "kafka_topic"                = "my_topic",
    "property.security.protocol" = "SASL_PLAINTEXT",
    "property.sasl.mechanism"    = "PLAIN",
    "property.sasl.username"     = "admin",
    "property.sasl.password"     = "admin"
);

Properti

Nilai

Deskripsi

property.security.protocol

SASL_PLAINTEXT

Gunakan Simple Authentication and Security Layer (SASL) plaintext.

property.sasl.mechanism

PLAIN

Mekanisme SASL.

property.sasl.username

Username SASL.

property.sasl.password

Password SASL.

Otentikasi Kerberos

Sebelum membuat pekerjaan, deploy klien Kerberos kinit di semua node dalam kluster ApsaraDB for SelectDB Anda, konfigurasikan krb5.conf, dan tentukan layanan Key Distribution Center (KDC).

CREATE ROUTINE LOAD db1.job1 ON tbl1
PROPERTIES (
    "desired_concurrent_number" = "1"
)
FROM KAFKA (
    "kafka_broker_list"                   = "broker1:9092,broker2:9092",
    "kafka_topic"                         = "my_topic",
    "property.security.protocol"          = "SASL_PLAINTEXT",
    "property.sasl.kerberos.service.name" = "kafka",
    "property.sasl.kerberos.keytab"       = "/etc/krb5.keytab",
    "property.sasl.kerberos.principal"    = "id@your.com"
);

Properti

Deskripsi

property.security.protocol

Atur ke SASL_PLAINTEXT.

property.sasl.kerberos.service.name

Nama layanan broker Kafka.

property.sasl.kerberos.keytab

Jalur absolut ke file lokal .keytab. Proses ApsaraDB for SelectDB harus memiliki akses baca ke file ini.

property.sasl.kerberos.principal

Pihak yang berwenang Kerberos yang digunakan untuk terhubung ke kluster Kafka.

Ubah pekerjaan Routine Load

Hanya pekerjaan dalam status PAUSED yang dapat diubah.

Sintaks

ALTER ROUTINE LOAD FOR [db.]job_name
[job_properties]
FROM data_source
[data_source_properties]

Parameter yang dapat diubah

job_properties — parameter berikut dapat diubah:

  • desired_concurrent_number

  • max_error_number

  • max_batch_interval

  • max_batch_rows

  • max_batch_size

  • jsonpaths

  • json_root

  • strip_outer_array

  • strict_mode

  • timezone

  • num_as_string

  • fuzzy_parse

data_source_properties — hanya properti Kafka yang dapat diubah:

  • kafka_partitions

  • kafka_offsets

  • kafka_broker_list

  • kafka_topic

  • Properti kustom (misalnya, property.group.id)

kafka_partitions dan kafka_offsets hanya dapat mengubah offset partisi yang sedang dikonsumsi. Menambahkan partisi baru tidak didukung.

Contoh

Ubah konkurensi pekerjaan db1.label1 menjadi 1:

ALTER ROUTINE LOAD FOR db1.label1
PROPERTIES (
    "desired_concurrent_number" = "1"
);

Ubah konkurensi menjadi 10 dan sesuaikan offset partisi serta ID kelompok konsumen:

ALTER ROUTINE LOAD FOR db1.label1
PROPERTIES (
    "desired_concurrent_number" = "10"
)
FROM KAFKA (
    "kafka_partitions"  = "0, 1, 2",
    "kafka_offsets"     = "100, 200, 100",
    "property.group.id" = "new_group"
);

Jeda pekerjaan Routine Load

Sintaks

PAUSE ROUTINE LOAD FOR [db.]job_name;
PAUSE ALL ROUTINE LOAD;

Contoh

Jeda pekerjaan test1:

PAUSE ROUTINE LOAD FOR test1;

Jeda semua pekerjaan Routine Load di database saat ini:

PAUSE ALL ROUTINE LOAD;

Lanjutkan pekerjaan Routine Load

Pekerjaan yang dilanjutkan akan terus mengonsumsi dari offset terakhir yang dikomit.

Sintaks

RESUME ROUTINE LOAD FOR [db.]job_name;
RESUME ALL ROUTINE LOAD;

Contoh

Lanjutkan pekerjaan test1:

RESUME ROUTINE LOAD FOR test1;

Lanjutkan semua pekerjaan Routine Load yang dijeda di database saat ini:

RESUME ALL ROUTINE LOAD;

Hentikan pekerjaan Routine Load

Pekerjaan yang dihentikan tidak dapat dimulai ulang, dan data yang diimpor sebelum penghentian tidak dapat dikembalikan.

Sintaks

STOP ROUTINE LOAD FOR [db.]job_name;

Contoh

STOP ROUTINE LOAD FOR test1;

Lihat status pekerjaan Routine Load

Sintaks

SHOW [ALL] ROUTINE LOAD [FOR [db.]job_name];

Tanpa ALL, hanya pekerjaan yang sedang berjalan dan dijeda yang dikembalikan. Dengan ALL, pekerjaan yang dihentikan dan dibatalkan juga disertakan.

Bidang output

Bidang

Deskripsi

Id

ID pekerjaan, ditetapkan secara otomatis oleh ApsaraDB for SelectDB.

Name

Nama pekerjaan.

CreateTime

Waktu pekerjaan dibuat.

PauseTime

Waktu terbaru pekerjaan dijeda.

EndTime

Waktu pekerjaan berakhir (dihentikan atau dibatalkan).

State

Status pekerjaan saat ini: NEED_SCHEDULED, RUNNING, PAUSED, STOPPED, atau CANCELLED.

DataSourceType

Jenis sumber data. Selalu KAFKA untuk pekerjaan Routine Load.

CurrentTaskNum

Jumlah task yang sedang berjalan.

ErrorLogUrls

URL yang menunjuk ke log error. Buka URL apa pun di browser untuk melihat detail baris yang tidak valid.

Contoh

Kueri semua pekerjaan bernama test1, termasuk yang dihentikan dan dibatalkan:

SHOW ALL ROUTINE LOAD FOR test1;

Kueri hanya pekerjaan yang sedang berjalan bernama test1:

SHOW ROUTINE LOAD FOR test1;

Kueri semua pekerjaan di example_db, termasuk yang dihentikan dan dibatalkan:

USE example_db;
SHOW ALL ROUTINE LOAD;

Kueri hanya pekerjaan yang sedang berjalan di example_db:

USE example_db;
SHOW ROUTINE LOAD;

Kueri pekerjaan tertentu berdasarkan database dan nama:

SHOW ROUTINE LOAD FOR example_db.test1;

Konfigurasi sistem

Parameter FE dan BE berikut memengaruhi perilaku Routine Load. Semua parameter FE dapat dimodifikasi saat runtime.

Parameter

Cakupan

Default

Deskripsi

max_routine_load_task_concurrent_num

FE

5

Maksimum task konkuren yang dapat dijalankan sekaligus untuk pekerjaan Routine Load. Nilai default direkomendasikan. Mengatur terlalu tinggi dapat menghabiskan sumber daya kluster.

max_routine_load_task_num_per_be

FE

5

Maksimum task konkuren pada setiap node BE. Nilai default direkomendasikan.

max_routine_load_job_num

FE

100

Maksimum jumlah pekerjaan Routine Load (dalam status NEED_SCHEDULED, RUNNING, atau PAUSED). Tidak ada pekerjaan baru yang dapat dibuat setelah batas ini tercapai.

max_consumer_num_per_group

BE

3

Maksimum konsumen yang dihasilkan per task. Misalnya, task yang mengonsumsi 6 partisi Kafka menghasilkan 3 konsumen, masing-masing menangani 2 partisi.

max_tolerable_backend_down_num

FE

0

Maksimum jumlah node BE yang boleh mati sebelum penjadwalan ulang pekerjaan otomatis diblokir. Nilai 0 berarti semua node BE harus aktif untuk penjadwalan ulang.

period_of_auto_resume_min

FE

5 menit

ApsaraDB for SelectDB mencoba menjadwalkan ulang pekerjaan yang dijeda hingga 3 kali dalam jendela ini. Setelah 3 kali gagal, pekerjaan dikunci dan memerlukan intervensi manual untuk dilanjutkan.

Pertimbangan lainnya

Routine Load dan perubahan skema

  • Pekerjaan Routine Load tidak memblokir operasi SCHEMA CHANGE atau ROLLUP.

  • Setelah SCHEMA CHANGE, jika kolom sumber tidak lagi sesuai dengan tabel tujuan, jumlah baris error meningkat dan pekerjaan mungkin dijeda. Untuk mencegah hal ini, gunakan pemetaan kolom eksplisit dalam klausa COLUMNS dan definisikan kolom tujuan sebagai NULLABLE atau dengan nilai DEFAULT.

  • Jika partisi dihapus, pekerjaan dijeda karena partisi target tidak ditemukan.

Routine Load dan operasi tulis lainnya

  • Routine Load tidak bertentangan dengan operasi LOAD atau INSERT.

  • Sebelum menjalankan operasi DELETE pada tabel, jeda pekerjaan Routine Load untuk tabel tersebut dan tunggu hingga semua task yang sedang berjalan selesai.

Routine Load dan penghapusan tabel atau database

Jika tabel atau database tujuan dihapus, pekerjaan Routine Load secara otomatis dibatalkan.

Pembuatan topik Kafka otomatis

Jika topik yang ditentukan dalam CREATE ROUTINE LOAD tidak ada, broker Kafka mungkin membuatnya secara otomatis berdasarkan pengaturan auto.create.topics.enable:

  • true: Kafka membuat topik secara otomatis dengan partisi num.partitions. Pekerjaan langsung membacanya.

  • false: Pekerjaan dijeda hingga topik dibuat dan data tersedia.

Persyaratan akses jaringan

  • Semua broker yang tercantum dalam kafka_broker_list harus dapat dijangkau dari kluster ApsaraDB for SelectDB Anda.

  • Jika advertised.listeners dikonfigurasi di Kafka, alamat yang diiklankan juga harus dapat dijangkau.

STOPPED vs. PAUSED

Status

Perilaku

PAUSED

Pekerjaan dapat dilanjutkan dengan RESUME ROUTINE LOAD.

STOPPED

Pekerjaan berakhir secara permanen dan dibersihkan oleh FE secara berkala. Tidak dapat dimulai ulang.