Partition reassignment, intra-broker replica migration, and broker recovery all generate replication I/O that can crowd out normal producer and consumer traffic. Apply throttling to cap that maintenance I/O so your business traffic is not affected.
This document applies to E-MapReduce (EMR) Kafka 2.4.1.
Throttling parameters
Four parameters control replication throttling. Two are set at the topic level to select which replicas to throttle; two are set at the broker level to cap throughput.
| Parameter | Scope | Description |
|---|---|---|
leader.replication.throttled.replicas |
Topic | Leader replicas whose maintenance traffic is throttled. Format: [PartitionId]:[BrokerId],[PartitionId]:[BrokerId],... or * for all leader replicas in the topic. |
follower.replication.throttled.replicas |
Topic | Follower replicas whose maintenance traffic is throttled. Same format as above. |
leader.replication.throttled.rate |
Broker | Read throughput cap for leader replicas on the broker (bytes per second). |
follower.replication.throttled.rate |
Broker | Write throughput cap for follower replicas on the broker (bytes per second). |
Usage notes
-
Determine whether to throttle O&M traffic based on the traffic compositions, business scenarios, and O&M scenarios of Kafka.
-
Base the throttling threshold on your actual business traffic volume, latency tolerance, whether the Kafka service can be briefly interrupted, and the I/O bandwidth of your disks and network.
-
A threshold that is too low can stall the maintenance operation entirely. A large value of the throttling threshold may cause issues such as I/O competition or fully loaded bandwidth, which may affect normal business traffic.
-
Schedule maintenance operations during off-peak hours when possible.
Query the current throttling configuration
Use kafka-configs.sh to inspect throttling parameters on any broker or topic.
Query a broker:
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name <broker-id> --describe
Query a topic:
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name <topic-name> --describe
Throttle traffic during partition reassignment
kafka-reassign-partitions.sh applies throttling directly via the --throttle flag. After the reassignment is complete, run --verify to remove the throttling parameters from the affected topics and brokers.
-
The throttle applies only to reassignment traffic, not to normal replica fetch operations.
-
To adjust the throttle while a reassignment is running, re-run
--executewith the new--throttlevalue. -
If the reassignment has already started and you did not use
--throttle, configureleader.replication.throttled.replicas,follower.replication.throttled.replicas,leader.replication.throttled.rate, andfollower.replication.throttled.ratedirectly withkafka-configs.sh.
Procedure:
-
Log in to the master node of your Kafka cluster over SSH. For details, see Log on to a cluster.
-
Create a test topic.
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --createVerify the topic details:
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe -
Simulate data writing to generate background traffic.
kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092 -
Create a partition reassignment plan. Create a file named
reassign.jsonwith the following content:{"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","any","any"]}]} -
Execute the reassignment with a throttle. In this example, the simulated write speed is 10 Mbit/s; the throttle is set to 30 Mbit/s.
kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --throttle 30000000 --executeTo change the throttle while the reassignment is running, re-run the command with the new value:
kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --throttle <new-rate-in-bytes> --execute -
(Optional) Verify that the throttling parameters are applied. Check broker 2:
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --entity-name 2 --describeCheck the topic:
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --entity-name test-throttled --describe -
Verify the reassignment and remove the throttle.
kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verifyAfter the job is complete, run the preceding command again to remove the parameters of the throttling feature.
Throttle traffic when moving replicas between directories
Use kafka-reassign-partitions.sh with --replica-alter-log-dirs-throttle to limit intra-broker I/O during replica directory migration. The throttle is reflected in the broker-level parameter replica.alter.log.dirs.io.max.bytes.per.second.
Procedure:
-
Log in to the master node of your Kafka cluster over SSH. For details, see Log on to a cluster.
-
Create a test topic.
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --createVerify the topic details:
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe -
Simulate data writing.
kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092 -
Create the reassignment plan with the target directory. Create a file named
reassign.jsonwith the following content:{"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","/mnt/disk1/kafka/log","any"]}]} -
Execute the replica move with a throttle.
kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --replica-alter-log-dirs-throttle 30000000 --execute -
Verify that the throttling parameter is applied on the broker.
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --describe --entity-name 0 -
Verify the move is complete and remove the throttle.
Note Run--verifyagain after the job completes to remove the throttling parameters.kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify
Throttle traffic during broker recovery
After a broker restarts, it re-syncs replica data from leader replicas. In scenarios such as broker migration or disk repair, this sync traffic can be substantial. Configure throttling before stopping the broker to prevent recovery traffic from affecting business traffic.
-
The throttle applies only to maintenance replication, not to normal replica fetch operations.
-
After the broker recovers and its replicas rejoin the in-sync replica (ISR) list, remove the throttling parameters with
kafka-configs.sh. If you skip this step, the throttle continues to apply to all replication traffic.
Procedure:
-
Log in to the master node of your Kafka cluster over SSH. For details, see Log on to a cluster.
-
Create a test topic.
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --createVerify the topic details:
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe -
Write test data.
kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1 linger.ms=0 bootstrap.servers=core-1-1:9092 -
Configure throttling on the topic and on each broker before stopping Broker 1. Apply throttling to all replicas in the topic:
kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --entity-name test-throttled --alter --add-config "leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*"Apply throttling rates to each broker. Repeat for every broker in the cluster:
kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 0 kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 1 kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 2 ...... -
Stop Broker 1 in the EMR console.
-
Delete the replica data on Broker 1 to simulate data loss.
rm -rf /mnt/disk2/kafka/log/test-throttled-0/ -
Start Broker 1 in the EMR console and confirm the throttling parameters are active. Use the commands in Query the current throttling configuration to check.
-
After Broker 1's replicas are recovered and appear in the ISR list, remove the throttling parameters. Remove from the topic:
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type topics --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas' --entity-name test-throttledRemove from each broker:
kafka-configs.sh --bootstrap-server core-1-1:9092 -entity-type brokers --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas,leader.replication.throttled.rate,follower.replication.throttled.rate' --entity-name 0 ......