全部产品
Search
文档中心

E-MapReduce:Membatasi Lalu Lintas O&M Broker Kafka

更新时间:Jun 24, 2025

Topik ini menjelaskan cara membatasi lalu lintas yang dihasilkan oleh operasi O&M pada kluster Kafka, mencegah gangguan terhadap lalu lintas bisnis normal. Contoh ini menggunakan Kafka 2.4.1 dalam pembuatan kluster E-MapReduce (EMR).

Informasi latar belakang

Lalu lintas O&M adalah lalu lintas I/O yang dihasilkan oleh operasi O&M. Fitur pembatasan dapat diaktifkan dalam skenario berikut untuk mengontrol lalu lintas O&M:
  • Skenario penugasan ulang partisi.
  • Skenario pemindahan replika antar direktori dalam node.
  • Skenario sinkronisasi data replika selama pemulihan broker dalam kluster.

Catatan penggunaan

  • Tentukan apakah akan membatasi lalu lintas O&M berdasarkan komposisi lalu lintas, skenario bisnis, dan kebutuhan operasional Kafka.
  • Ambang batas pembatasan harus ditetapkan sesuai dengan skenario bisnis tertentu. Nilai ambang batas yang terlalu kecil dapat menyebabkan kegagalan operasi O&M, sedangkan nilai yang terlalu besar dapat menyebabkan masalah seperti persaingan I/O atau bandwidth penuh, yang memengaruhi lalu lintas bisnis normal. Evaluasi sumber daya kluster untuk menentukan ambang batas yang sesuai.
  • Ambang batas pembatasan harus dipertimbangkan berdasarkan faktor-faktor seperti volume lalu lintas bisnis dalam topik, latensi yang dapat ditoleransi, kemungkinan gangguan layanan Kafka, serta kapasitas I/O disk dan jaringan dalam kluster Kafka.
  • Operasi O&M disarankan dilakukan selama jam-jam sepi.

Membatasi Lalu Lintas O&M Kafka

Parameter fitur pembatasan

ParameterDeskripsi
leader.replication.throttled.replicasReplika pemimpin dari partisi yang lalu lintas O&M-nya perlu dibatasi dalam sebuah topik.

Konfigurasikan parameter ini dalam format [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... atau gunakan tanda bintang (*) untuk menentukan semua replika pemimpin dari partisi dalam topik.

follower.replication.throttled.replicasReplika pengikut dari partisi yang lalu lintas O&M-nya perlu dibatasi dalam sebuah topik.

Konfigurasikan parameter ini dalam format [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... atau gunakan tanda bintang (*) untuk menentukan semua replika pengikut dari partisi dalam topik.

leader.replication.throttled.rateLalu lintas baca replika pemimpin pada broker.
follower.replication.throttled.rateLalu lintas tulis replika pengikut pada broker.

Memeriksa parameter fitur pembatasan

Gunakan perintah kafka-configs.sh untuk memeriksa parameter fitur pembatasan.

  • Jalankan perintah berikut untuk memeriksa parameter broker yang ditentukan:
    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name <your broker id> --describe
  • Jalankan perintah berikut untuk memeriksa parameter topik yang ditentukan:
    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name <your topic name> --describe

Membatasi lalu lintas O&M dalam skenario di mana partisi ditetapkan ulang

null
  • Nilai ambang batas pembatasan tidak boleh terlalu kecil, karena dapat menyebabkan kegagalan penugasan ulang partisi.
  • Fitur pembatasan tidak memengaruhi lalu lintas operasi fetch normal pada replika.
  • Setelah pekerjaan selesai, gunakan parameter verifikasi untuk menghapus parameter fitur pembatasan dari topik dan broker.
  • Jika parameter fitur pembatasan sudah dikonfigurasi, jalankan perintah execute untuk memodifikasinya.
  • Jika belum dikonfigurasi, gunakan perintah kafka-configs.sh untuk memodifikasi parameter leader.replication.throttled.replicas dan follower.replication.throttled.replicas untuk topik, serta leader.replication.throttled.rate dan follower.replication.throttled.rate untuk broker.

Alat kafka-reassign-partitions.sh digunakan untuk menetapkan ulang partisi, dengan parameter fitur pembatasan untuk menentukan ambang batas. Berikut adalah contoh skenario:

  1. Buat topik uji.
    1. Masuk ke node master kluster Kafka melalui SSH. Untuk informasi lebih lanjut, lihat Masuk ke Kluster.
    2. Jalankan perintah berikut untuk membuat topik:
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create
      Jalankan perintah berikut untuk memeriksa detail topik:
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  2. Jalankan perintah berikut untuk mensimulasikan penulisan data:
    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  3. Konfigurasikan parameter fitur pembatasan dan tetapkan ulang partisi.
    1. Buat file bernama reassign.json untuk penugasan ulang partisi dan tambahkan konten berikut:
      {"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","any","any"]}]}
    2. Jalankan perintah berikut untuk menetapkan ulang partisi:
      Dalam contoh ini, kecepatan penulisan simulasi adalah 10 Mbit/s. Tetapkan ambang batas pembatasan menjadi 30 Mbit/s untuk penugasan ulang partisi.
      kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --throttle 30000000 --execute
  4. Periksa parameter fitur pembatasan.
    • Jalankan perintah berikut untuk memeriksa parameter broker yang ditentukan:
      kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name 2 --describe
    • Jalankan perintah berikut untuk memeriksa parameter topik yang ditentukan:
      kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name test-throttled --describe
  5. Lihat hasil pekerjaan.
    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify
    null Setelah pekerjaan selesai, jalankan perintah di atas lagi untuk menghapus parameter fitur pembatasan.

Membatasi lalu lintas O&M dalam skenario di mana replika dalam node dipindahkan ke direktori yang berbeda

Alat kafka-reassign-partitions.sh dapat digunakan untuk memigrasi replika dalam broker. Parameter replica-alter-log-dirs-throttle membatasi migrasi I/O dalam broker. Berikut adalah contoh skenario:

  1. Buat topik uji.
    1. Masuk ke node master kluster Kafka melalui SSH. Untuk informasi lebih lanjut, lihat Masuk ke Kluster.
    2. Jalankan perintah berikut untuk membuat topik:
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create
      Jalankan perintah berikut untuk memeriksa detail topik:
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  2. Jalankan perintah berikut untuk mensimulasikan penulisan data:
    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  3. Konfigurasikan parameter replica-alter-log-dirs-throttle dan pindahkan replika.
    1. Buat file bernama reassign.json dan tuliskan direktori tujuan ke file tersebut. Tambahkan konten berikut:
      {"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","/mnt/disk1/kafka/log","any"]}]}
    2. Jalankan perintah berikut untuk memindahkan replika:
      kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --replica-alter-log-dirs-throttle 30000000 --execute
  4. Periksa parameter fitur pembatasan.
    Jika Anda memindahkan replika antara direktori dalam broker, parameter Brokerreplica.alter.log.dirs.io.max.bytes.per.second digunakan untuk menentukan ambang batas pembatasan pada broker.
    Jalankan perintah berikut untuk memeriksa parameter broker yang ditentukan:
    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --describe --entity-name 0
  5. Lihat hasil pekerjaan.
    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify
    null Setelah pekerjaan selesai, jalankan perintah di atas lagi untuk menghapus parameter fitur pembatasan.

Membatasi lalu lintas O&M dalam skenario di mana data dalam replika disinkronkan ketika broker dalam kluster dipulihkan

null
  • Nilai ambang batas pembatasan tidak boleh terlalu kecil, karena dapat menyebabkan kegagalan penugasan ulang partisi.
  • Fitur pembatasan tidak memengaruhi lalu lintas operasi fetch normal pada replika.
  • Setelah data dipulihkan, jalankan perintah kafka-configs.sh untuk menghapus parameter fitur pembatasan.

Setelah broker dihidupkan ulang, ia menyinkronkan data replika dari replika pemimpin. Dalam skenario seperti migrasi broker atau perbaikan disk yang rusak, broker perlu menyinkronkan data replika yang hilang untuk pemulihan data. Ini menghasilkan sejumlah besar lalu lintas sinkronisasi. Dalam skenario seperti itu, lalu lintas sinkronisasi harus dibatasi untuk mencegah gangguan terhadap lalu lintas normal akibat lonjakan lalu lintas pemulihan data. Berikut adalah contoh skenario:

  1. Buat topik uji.
    1. Masuk ke node master kluster Kafka melalui SSH. Untuk informasi lebih lanjut, lihat Masuk ke Kluster.
    2. Jalankan perintah berikut untuk membuat topik:
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create
      Jalankan perintah berikut untuk memeriksa detail topik:
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  2. Jalankan perintah berikut untuk menulis data uji:
    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  3. Jalankan perintah kafka-configs.sh untuk mengonfigurasi parameter fitur pembatasan.
    // Konfigurasikan parameter fitur pembatasan untuk topik uji. 
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --entity-name test-throttled --alter --add-config "leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*"
    // Konfigurasikan parameter fitur pembatasan untuk broker. 
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 0
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 1
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 2
    ......
  4. Hentikan Broker 1 di konsol EMR.
  5. Hapus data replika dari Broker 1 untuk mensimulasikan skenario kehilangan data.
    rm -rf /mnt/disk2/kafka/log/test-throttled-0/
  6. Mulai Broker 1 di konsol EMR dan periksa apakah parameter fitur pembatasan berlaku.
  7. Setelah data replika dipulihkan di Broker 1 dan replika muncul dalam daftar in-sync replica (ISR), jalankan perintah kafka-configs.sh untuk menghapus parameter fitur pembatasan.
    // Hapus parameter fitur pembatasan dari topik uji. 
    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas' --entity-name test-throttled
    // Hapus parameter fitur pembatasan dari broker.
    kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas,leader.replication.throttled.rate,follower.replication.throttled.rate' --entity-name 0
    ......