All Products
Search
Document Center

E-MapReduce:Dokumen

Last Updated:Mar 26, 2026

Penugasan ulang partisi, migrasi replika intra-broker, dan pemulihan broker semuanya menghasilkan I/O replikasi yang dapat mendesak traffic produsen dan konsumen normal. Terapkan pembatasan kecepatan untuk membatasi I/O maintenance tersebut agar traffic bisnis Anda tidak terpengaruh.

Dokumen ini berlaku untuk E-MapReduce (EMR) Kafka 2.4.1.

Parameter pengendalian aliran

Empat parameter mengatur pembatasan replikasi: dua diatur di tingkat topik untuk memilih replika mana yang akan dibatasi, dan dua diatur di tingkat broker untuk membatasi throughput.

ParameterRuang LingkupDeskripsi
leader.replication.throttled.replicasTopikReplika leader yang traffic maintenance-nya dibatasi. Format: [PartitionId]:[BrokerId],[PartitionId]:[BrokerId],... atau * untuk semua replika leader dalam topik tersebut.
follower.replication.throttled.replicasTopikReplika follower yang traffic maintenance-nya dibatasi. Formatnya sama seperti di atas.
leader.replication.throttled.rateBrokerBatas throughput baca untuk replika leader pada broker (byte per detik).
follower.replication.throttled.rateBrokerBatas throughput tulis untuk replika follower pada broker (byte per detik).

Catatan penggunaan

  • Tentukan apakah perlu menerapkan pengendalian aliran terhadap traffic O&M berdasarkan komposisi traffic, skenario bisnis, dan skenario O&M Kafka.

  • Tetapkan ambang batas pengendalian aliran berdasarkan volume traffic bisnis aktual Anda, toleransi latensi, kemampuan layanan Kafka untuk menanggung gangguan singkat, serta bandwidth I/O disk dan jaringan Anda.

  • Ambang batas yang terlalu rendah dapat menghentikan sepenuhnya operasi maintenance, sedangkan nilai ambang batas pengendalian aliran yang terlalu besar dapat menyebabkan persaingan I/O atau bandwidth penuh, yang berpotensi memengaruhi traffic bisnis normal.

  • Jadwalkan operasi maintenance pada jam sepi jika memungkinkan.

Kueri konfigurasi pengendalian aliran saat ini

Gunakan kafka-configs.sh untuk memeriksa parameter pengendalian aliran pada broker atau topik mana pun.

Kueri broker:

kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name <broker-id> --describe

Kueri topik:

kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name <topic-name> --describe

Pengendalian aliran traffic selama penugasan ulang partisi

kafka-reassign-partitions.sh menerapkan pengendalian aliran secara langsung melalui flag --throttle. Setelah penugasan ulang selesai, jalankan --verify untuk menghapus parameter pengendalian aliran dari topik dan broker yang terpengaruh.

Penting
  • Pengendalian aliran hanya berlaku untuk traffic penugasan ulang, bukan untuk operasi fetch replika normal.

  • Untuk menyesuaikan nilai throttle saat penugasan sedang berjalan, jalankan kembali --execute dengan nilai --throttle yang baru.

  • Jika penugasan telah dimulai dan Anda tidak menggunakan --throttle, konfigurasikan langsung leader.replication.throttled.replicas, follower.replication.throttled.replicas, leader.replication.throttled.rate, dan follower.replication.throttled.rate menggunakan kafka-configs.sh.

Prosedur:

  1. Login ke node master kluster Kafka Anda melalui SSH. Untuk detailnya, lihat Log on to a cluster.

  2. Buat topik uji coba.

    kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create

    Verifikasi detail topik:

    kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  3. Simulasikan penulisan data untuk menghasilkan traffic latar belakang.

    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092
  4. Buat rencana penugasan ulang partisi. Buat file bernama reassign.json dengan konten berikut:

    {"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","any","any"]}]}
  5. Jalankan penugasan ulang dengan throttle. Pada contoh ini, kecepatan tulis simulasi adalah 10 Mbit/dtk; throttle diatur ke 30 Mbit/dtk.

    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --throttle 30000000 --execute

    Untuk mengubah throttle saat penugasan sedang berjalan, jalankan kembali perintah tersebut dengan nilai baru:

    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --throttle <new-rate-in-bytes> --execute
  6. (Opsional) Verifikasi bahwa parameter pengendalian aliran telah diterapkan. Periksa broker 2:

    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name 2 --describe

    Periksa topik:

    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name test-throttled --describe
  7. Verifikasi penugasan ulang dan hapus throttle.

    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify

    Setelah pekerjaan selesai, jalankan kembali perintah di atas untuk menghapus parameter fitur pengendalian aliran.

Pengendalian aliran traffic saat memindahkan replika antar direktori

Gunakan kafka-reassign-partitions.sh dengan --replica-alter-log-dirs-throttle untuk membatasi I/O intra-broker selama migrasi direktori replika. Throttle ini tercermin dalam parameter tingkat broker replica.alter.log.dirs.io.max.bytes.per.second.

Prosedur:

  1. Login ke node master kluster Kafka Anda melalui SSH. Untuk detailnya, lihat Log on to a cluster.

  2. Buat topik uji coba.

    kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create

    Verifikasi detail topik:

    kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  3. Simulasikan penulisan data.

    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092
  4. Buat rencana penugasan ulang dengan direktori target. Buat file bernama reassign.json dengan konten berikut:

    {"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","/mnt/disk1/kafka/log","any"]}]}
  5. Jalankan pemindahan replika dengan throttle.

    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --replica-alter-log-dirs-throttle 30000000 --execute
  6. Verifikasi bahwa parameter pengendalian aliran telah diterapkan pada broker.

    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --describe --entity-name 0
  7. Verifikasi bahwa pemindahan telah selesai dan hapus throttle.

    Catatan Jalankan kembali --verify setelah pekerjaan selesai untuk menghapus parameter pengendalian aliran.
    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify

Pengendalian aliran traffic selama pemulihan broker

Setelah broker dinyalakan ulang, broker tersebut melakukan sinkronisasi ulang data replika dari replika leader. Dalam skenario seperti migrasi broker atau perbaikan disk, traffic sinkronisasi ini bisa sangat besar. Konfigurasikan pengendalian aliran sebelum menghentikan broker untuk mencegah traffic pemulihan memengaruhi traffic bisnis.

Penting
  • Pengendalian aliran hanya berlaku untuk replikasi maintenance, bukan untuk operasi fetch replika normal.

  • Setelah broker pulih dan replikanya kembali masuk ke daftar in-sync replica (ISR), hapus parameter pengendalian aliran dengan kafka-configs.sh. Jika langkah ini dilewati, throttle akan tetap berlaku untuk seluruh traffic replikasi.

Prosedur:

  1. Login ke node master kluster Kafka Anda melalui SSH. Untuk detailnya, lihat Log on to a cluster.

  2. Buat topik uji coba.

    kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create

    Verifikasi detail topik:

    kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  3. Tulis data uji coba.

    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092
  4. Konfigurasikan pengendalian aliran pada topik dan pada setiap broker sebelum menghentikan Broker 1. Terapkan pengendalian aliran ke semua replika dalam topik:

    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --entity-name test-throttled --alter --add-config "leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*"

    Terapkan laju pengendalian aliran ke setiap broker. Ulangi untuk setiap broker dalam kluster:

    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 0
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 1
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 2
    ......
  5. Hentikan Broker 1 di Konsol EMR.

  6. Hapus data replika pada Broker 1 untuk mensimulasikan kehilangan data.

    rm -rf /mnt/disk2/kafka/log/test-throttled-0/
  7. Nyalakan Broker 1 di Konsol EMR dan pastikan parameter pengendalian aliran aktif. Gunakan perintah di Kueri konfigurasi pengendalian aliran saat ini untuk memeriksa.

  8. Setelah replika Broker 1 pulih dan muncul dalam daftar ISR, hapus parameter pengendalian aliran. Hapus dari topik:

    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas' --entity-name test-throttled

    Hapus dari setiap broker:

    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas,leader.replication.throttled.rate,follower.replication.throttled.rate' --entity-name 0
    ......