This topic describes how to throttle the traffic that O&M operations generate on Kafka clusters so that O&M traffic does not affect normal business traffic. The examples in this topic use an E-MapReduce (EMR) cluster that was created with Kafka 2.4.1.

Background information

O&M traffic is the I/O traffic that is generated by O&M operations. You can enable the throttling feature in the following O&M scenarios to throttle O&M traffic:
  • Scenarios where partitions are reassigned
  • Scenarios where replicas within a node are moved to different directories
  • Scenarios where data in replicas is synchronized when brokers in a cluster are recovered

Usage notes

  • Decide whether to throttle O&M traffic based on the traffic composition, business scenarios, and O&M scenarios of your Kafka cluster.
  • Specify the throttling threshold of O&M traffic based on your business scenario. If the threshold is too small, O&M operations may fail to complete. If the threshold is too large, O&M traffic may compete with business traffic for I/O or saturate the bandwidth, which affects normal business traffic. Evaluate the resources in your cluster and specify an appropriate throttling threshold.
  • When you specify the throttling threshold, consider the amount of business traffic in topics, the latency that the business can tolerate, whether the Kafka service can be interrupted in your business scenario, and the I/O bandwidth of disks and networks in the Kafka cluster.
  • In most cases, we recommend that you perform O&M operations during off-peak hours.
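
The notes above can be turned into a rough starting value. The following sketch is only one possible heuristic and is not taken from the Kafka tooling: the function name estimate_throttle, the input values, and the idea of granting a fixed percentage of the spare bandwidth to O&M traffic are all assumptions. Measure the real capacity and peak business traffic of your brokers before you rely on the result.

```shell
#!/bin/sh
# Hypothetical helper, not part of Kafka: estimate a throttling threshold
# (bytes per second) from the bandwidth that business traffic leaves unused.
estimate_throttle() {
  capacity=$1   # usable disk or network bandwidth of one broker, bytes/s (measure it)
  business=$2   # peak business traffic on that broker, bytes/s (measure it)
  share=$3      # percentage of the spare bandwidth to grant to O&M traffic
  spare=$(( capacity - business ))
  if [ "$spare" -le 0 ]; then
    echo 0      # no spare bandwidth: postpone the O&M operation
    return 1
  fi
  # Divide before multiplying to keep intermediate values small.
  echo $(( spare / 100 * share ))
}

# Example with made-up numbers: 125 MB/s of bandwidth, 40 MB/s of peak
# business traffic, half of the remainder granted to O&M traffic.
estimate_throttle 125000000 40000000 50
```

With these sample inputs the function prints 42500000, that is, about 42.5 MB/s. Treat the value as a first guess and adjust it while you monitor the cluster.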

Throttle the O&M traffic of Kafka

Parameters of the throttling feature

Parameter: Description

  • leader.replication.throttled.replicas (topic-level): The leader replicas of the partitions whose O&M traffic needs to be throttled in a topic.
    Configure this parameter in the format of [PartitionId]:[BrokerId],[PartitionId]:[BrokerId],... or use an asterisk (*) to specify all the leader replicas of the partitions in the topic.
  • follower.replication.throttled.replicas (topic-level): The follower replicas of the partitions whose O&M traffic needs to be throttled in a topic.
    Configure this parameter in the format of [PartitionId]:[BrokerId],[PartitionId]:[BrokerId],... or use an asterisk (*) to specify all the follower replicas of the partitions in the topic.
  • leader.replication.throttled.rate (broker-level): The upper limit on the read traffic of throttled leader replicas on a broker. Unit: bytes per second.
  • follower.replication.throttled.rate (broker-level): The upper limit on the write traffic of throttled follower replicas on a broker. Unit: bytes per second.
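
The replica list format can be illustrated with a small helper. This is a minimal sketch: the function name join_throttled_replicas is hypothetical and not part of Kafka, and the partition and broker IDs in the example call are placeholders. The function only checks that each argument looks like PartitionId:BrokerId and joins the arguments with commas.

```shell
#!/bin/sh
# Hypothetical helper: build a value for leader.replication.throttled.replicas
# or follower.replication.throttled.replicas from PartitionId:BrokerId pairs.
join_throttled_replicas() {
  result=""
  for pair in "$@"; do
    case "$pair" in
      # Reject anything that is not exactly <digits>:<digits>.
      *[!0-9:]* | *:*:* | :* | *:)
        echo "invalid pair: $pair" >&2
        return 1
        ;;
      *:*)
        result="${result:+$result,}$pair"
        ;;
      *)
        echo "invalid pair: $pair" >&2
        return 1
        ;;
    esac
  done
  echo "$result"
}

# Placeholder IDs: partition 0 on brokers 2 and 3, partition 1 on broker 1.
join_throttled_replicas 0:2 0:3 1:1
```

The example call prints 0:2,0:3,1:1, which can be used as the value of either replica list parameter.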

Query the parameters of the throttling feature

You can run the kafka-configs.sh command to query the parameters of the throttling feature.

  • Run the following command to query the parameters of a specific broker:
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --entity-name <your broker id> --describe
  • Run the following command to query the parameters of a specific topic:
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --entity-name <your topic name> --describe

Throttle O&M traffic in scenarios where partitions are reassigned

Important
  • The value of the throttling threshold must not be too small. Otherwise, partition reassignment may fail to be triggered.
  • The throttling feature does not throttle the traffic of normal fetch operations on replicas.
  • After a reassignment job is complete, you must run the kafka-reassign-partitions.sh command with the --verify option to remove the parameters of the throttling feature from topics and brokers.
  • If you have already configured the parameters of the throttling feature, you can run the kafka-reassign-partitions.sh command with the --execute option again to modify the parameters.
  • If you have not configured the parameters of the throttling feature, you can run the kafka-configs.sh command to set the leader.replication.throttled.replicas and follower.replication.throttled.replicas parameters for a topic, and the leader.replication.throttled.rate and follower.replication.throttled.rate parameters for a broker.

In most cases, the kafka-reassign-partitions.sh tool is used to reassign partitions, and the parameters of the throttling feature are used to specify the throttling threshold. The following section provides an example of this scenario:

  1. Create a test topic.
    1. Log on to the master node of your Kafka cluster by using SSH. For more information, see Log on to a cluster.
    2. Run the following command to create a topic:
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create
      Run the following command to query the details of the topic:
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  2. Run the following command to simulate data writing:
    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  3. Configure the parameters of the throttling feature and reassign partitions.
    1. Create a file named reassign.json for partition reassignment and add the following content to the file:
      {"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","any","any"]}]}
    2. Run the following command to reassign partitions:
      In this example, data is written at up to about 10 MB/s (10,240 records per second × 1,000 bytes per record), and the throttling threshold for partition reassignment is set to 30 MB/s. The --throttle option takes a value in bytes per second.
      kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --throttle 30000000 --execute
  4. Query the parameters of the throttling feature.
    • Run the following command to query the parameters of the specified broker:
      kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --entity-name 2 --describe
    • Run the following command to query the parameters of the specified topic:
      kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --entity-name test-throttled --describe
  5. View the results of the job.
    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify
    Note: After the job is complete, run the preceding command again to remove the parameters of the throttling feature.
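
The relationship between the simulated write rate and the throttling threshold in this example can be checked with simple arithmetic. The sketch below only reuses the numbers from the commands above (--throughput, --record-size, and --throttle). The variable names are illustrative, and the comparison reflects the general guidance that a moving replica can catch up only if the throttle is higher than the rate at which new data arrives.

```shell
#!/bin/sh
# Values taken from the commands in this section.
records_per_sec=10240             # --throughput (upper bound, records per second)
record_size_bytes=1000            # --record-size (bytes)
throttle_bytes_per_sec=30000000   # --throttle (bytes per second)

# Maximum producer write rate in bytes per second.
write_rate=$(( records_per_sec * record_size_bytes ))
echo "producer write rate: ${write_rate} bytes/s"

if [ "$throttle_bytes_per_sec" -gt "$write_rate" ]; then
  # The difference is what remains for copying existing data.
  echo "throttle leaves $(( throttle_bytes_per_sec - write_rate )) bytes/s for catch-up"
else
  echo "throttle is not above the write rate; the reassignment may never finish"
fi
```

Here the write rate is 10,240,000 bytes per second, so a throttle of 30,000,000 bytes per second leaves room for the moved replica to catch up.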

Throttle O&M traffic in scenarios where replicas within a node are moved to different directories

The kafka-reassign-partitions.sh tool can also be used to move replicas between directories within a broker. The --replica-alter-log-dirs-throttle option limits the I/O that such a move generates within a broker. The following section provides an example of this scenario:

  1. Create a test topic.
    1. Log on to the master node of your Kafka cluster by using SSH. For more information, see Log on to a cluster.
    2. Run the following command to create a topic:
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create
      Run the following command to query the details of the topic:
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  2. Run the following command to simulate data writing:
    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  3. Configure the --replica-alter-log-dirs-throttle option and move replicas.
    1. Create a file named reassign.json that specifies the destination directory. Add the following content to the file:
      {"version":1,"partitions":[{"topic":"test-throttled","partition":0,"replicas":[2,0,3],"log_dirs":["any","/mnt/disk1/kafka/log","any"]}]}
    2. Run the following command to move replicas:
      kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --replica-alter-log-dirs-throttle 30000000 --execute
  4. Query the parameters of the throttling feature.
    If you move replicas between directories within a broker, the broker-level replica.alter.log.dirs.io.max.bytes.per.second parameter is used to specify the throttling threshold on the broker.
    Run the following command to query the parameters of the specified broker:
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --describe --entity-name 0
  5. View the results of the job.
    kafka-reassign-partitions.sh --bootstrap-server core-1-1:9092 --reassignment-json-file reassign.json --verify
    Note: After the job is complete, run the preceding command again to remove the parameters of the throttling feature.
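
The content of reassign.json in step 3 can also be generated instead of typed by hand. The following is a minimal sketch: the function name make_move_plan is hypothetical, and the function performs no validation of its arguments. The replica list and the log_dirs list must have the same number of entries, in the same broker order.

```shell
#!/bin/sh
# Hypothetical helper: print a reassignment plan for one partition.
#   $1 topic name
#   $2 partition ID
#   $3 comma-separated broker IDs, for example 2,0,3
#   $4 comma-separated, JSON-quoted log directories in the same order
make_move_plan() {
  printf '{"version":1,"partitions":[{"topic":"%s","partition":%s,"replicas":[%s],"log_dirs":[%s]}]}\n' \
    "$1" "$2" "$3" "$4"
}

# Reproduces the plan used in this section: only the replica on broker 0
# is moved to /mnt/disk1/kafka/log; "any" leaves the other replicas in place.
make_move_plan test-throttled 0 '2,0,3' '"any","/mnt/disk1/kafka/log","any"'
```

Redirect the output to reassign.json and pass the file to kafka-reassign-partitions.sh as shown in step 3.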

Throttle O&M traffic in scenarios where data in replicas is synchronized when brokers in a cluster are recovered

Important
  • The value of the throttling threshold must not be too small. Otherwise, replica synchronization may fail to complete.
  • The throttling feature does not throttle the traffic of normal fetch operations on replicas.
  • After data is recovered, you must run the kafka-configs.sh command to remove the parameters of the throttling feature.

After a broker is restarted, it synchronizes replica data from leader replicas. In scenarios such as broker migration and damaged disk repair, brokers need to synchronize the lost replica data for data recovery. This results in a large amount of synchronization traffic. In such scenarios, the synchronization traffic must be throttled to prevent normal traffic from being affected by the traffic surges caused by data recovery. The following section provides an example of this scenario:

  1. Create a test topic.
    1. Log on to the master node of your Kafka cluster by using SSH. For more information, see Log on to a cluster.
    2. Run the following command to create a topic:
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --partitions 1 --replication-factor 3 --create
      Run the following command to query the details of the topic:
      kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test-throttled --describe
  2. Run the following command to write test data:
    kafka-producer-perf-test.sh --topic test-throttled --record-size 1000 --num-records 600000000 --print-metrics --throughput 10240 --producer-props acks=-1  linger.ms=0 bootstrap.servers=core-1-1:9092
  3. Run the kafka-configs.sh command to configure the parameters of the throttling feature.
    # Configure the parameters of the throttling feature for the test topic.
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --entity-name test-throttled --alter --add-config "leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*"
    # Configure the parameters of the throttling feature for brokers. The rates are in bytes per second.
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 0
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 1
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --add-config "leader.replication.throttled.rate=1024,follower.replication.throttled.rate=1024" --entity-name 2
    ......
  4. Stop Broker 1 in the EMR console.
  5. Delete replica data from Broker 1 to simulate a data loss scenario.
    rm -rf /mnt/disk2/kafka/log/test-throttled-0/
  6. Start Broker 1 in the EMR console and check whether the parameters of the throttling feature take effect.
  7. After replica data is recovered on Broker 1 and the replicas appear in the in-sync replica (ISR) list, run the kafka-configs.sh command to remove the parameters of the throttling feature.
    # Remove the parameters of the throttling feature from the test topic.
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --alter --delete-config 'leader.replication.throttled.replicas,follower.replication.throttled.replicas' --entity-name test-throttled
    # Remove the parameters of the throttling feature from brokers. Only the rate parameters were set on brokers.
    kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --alter --delete-config 'leader.replication.throttled.rate,follower.replication.throttled.rate' --entity-name 0
    ......
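
The per-broker commands in steps 3 and 7 repeat for every broker ID, so a loop can generate them. The sketch below is a dry run that only prints the commands and does not contact the cluster. The broker ID list, the variable names, and the function name print_throttle_commands are assumptions; replace the IDs and the rate with the values for your cluster.

```shell
#!/bin/sh
# Hypothetical broker ID list; replace it with the IDs in your cluster.
BROKER_IDS="0 1 2"
RATE=1024   # bytes per second, the value used in the example above
BASE="kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers"

# Print (do not run) the command that sets or removes the rate
# parameters on each broker. $1 is "set" or "remove".
print_throttle_commands() {
  for id in $BROKER_IDS; do
    case "$1" in
      set)
        echo "$BASE --entity-name $id --alter --add-config leader.replication.throttled.rate=$RATE,follower.replication.throttled.rate=$RATE"
        ;;
      remove)
        echo "$BASE --entity-name $id --alter --delete-config leader.replication.throttled.rate,follower.replication.throttled.rate"
        ;;
      *)
        echo "usage: print_throttle_commands set|remove" >&2
        return 1
        ;;
    esac
  done
}

print_throttle_commands set
print_throttle_commands remove
```

Review the printed commands first. To run them, pipe the output to sh, for example print_throttle_commands set | sh.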