This topic describes the usage notes, parameters, and examples of Kafka Rebalancer. The examples in this topic use E-MapReduce (EMR) Kafka V2.4.1.

Background information

When you manage a Kafka cluster, you may encounter the following issues:
  • Uneven distribution of partition leaders among brokers: This causes unbalanced load among brokers and lowers read/write throughput.
  • Unbalanced data distribution among brokers: The disk usage of some brokers is much higher than the average disk usage of the cluster, which increases the risk of broker failure.
  • Unbalanced disk usage within a broker: The usage of some disks is significantly higher than the average disk usage of the broker, which increases the risk of replicas going offline or even of broker failure.
  • Hot topics: These cause unbalanced load among disks.

If any of the preceding issues occurs, you need to balance the load, for example by electing preferred leaders and reassigning partitions. Kafka provides tools such as kafka-preferred-replica-election.sh and kafka-reassign-partitions.sh for this purpose. However, you must prepare the input configurations of these tools manually before each use, which increases the O&M workload and complexity.

The Rebalancer tool provided by EMR Kafka encapsulates kafka-preferred-replica-election.sh and kafka-reassign-partitions.sh to reduce the O&M workload and complexity. You can still use the underlying tools directly for O&M tasks.

Usage notes

  • When you use Kafka Rebalancer, you must throttle the traffic that replica migration generates.
  • Kafka Rebalancer generates a reassignment plan in the JSON format based on the configuration items that you specify. Check the generated plan to make sure that the reassignment result meets your expectations.
  • If you need to copy or move a large number of partition replicas, estimate how long the O&M task will take before you decide whether to use this tool. If the task takes a long time, you can use kafka-reassign-partitions.sh to split it into smaller tasks and run them in different time windows.
  • If you want to monitor the O&M process by using kafka-reassign-partitions.sh, you must manually save the JSON reassignment configuration to a file. This file is the input of the verify action of kafka-reassign-partitions.sh.
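To judge whether a migration fits into an O&M window, you can make a rough estimate by dividing the amount of data to be moved by the throttle value. The following shell function is a minimal sketch of that arithmetic. The function name estimate_migration_seconds is hypothetical, and the sketch assumes that the throttle is expressed in bytes per second (the unit that kafka-reassign-partitions.sh uses) and that the migration always runs at the full throttled rate, so treat the result as a lower bound.

```shell
#!/usr/bin/env bash
# Hypothetical helper: estimate how many seconds a throttled migration takes.
# Assumption: the throttle is in bytes per second and the migration always
# runs at the full throttled rate, so the result is a lower bound.
estimate_migration_seconds() {
  local bytes=$1      # total number of bytes that must be copied
  local throttle=$2   # throttle value passed to kafka-rebalancer.sh
  if (( throttle <= 0 )); then
    echo "throttle must be a positive number of bytes per second" >&2
    return 1
  fi
  # Integer division rounded up.
  echo $(( (bytes + throttle - 1) / throttle ))
}

# Example: 70,000,000,000 bytes at 50000000 bytes per second.
estimate_migration_seconds 70000000000 50000000
```

For example, moving 70,000,000,000 bytes at a throttle of 50000000 yields 1400 seconds, or about 23 minutes. If the estimate exceeds your O&M window, consider splitting the task as described above.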

Features

On an Elastic Compute Service (ECS) instance in an EMR cluster on which a broker is deployed, you can run the kafka-rebalancer.sh script to view the features of this tool.
  • preferred-election: balances partition leadership for the topics that you specify in the topics parameter. For more information, see preferred-election.

    This feature encapsulates the kafka-preferred-replica-election.sh tool.

  • balance-disks: balances the distribution of partition replicas across the disks of a broker based on disk usage. For more information, see balance-disks.
  • rebalance: balances the distribution of partition replicas across the brokers of a cluster based on disk usage. For more information, see rebalance.
  • remove-broker-ids: moves all partition replicas off a broker. After the replicas are removed, you can decommission the broker. For more information, see remove-broker-ids.

preferred-election

If the leader replica of a partition is not on the preferred broker, load may become unbalanced among brokers. In this case, you need to balance the leadership.

Parameters

topics: the topics for which preferred leader election is performed.

Examples

The following example shows how to trigger preferred leader election:
  1. Create a test topic.
    kafka-topics.sh --create --topic election-topic --bootstrap-server core-1-1:9092 --replication-factor 2 --partitions 50
  2. Trigger preferred leader election for the test topic.
    kafka-rebalancer.sh --zookeeper master-1-1:2181/emr-kafka --preferred-election --bootstrap-server core-1-1:9092 --topics election-topic
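To see whether the election had an effect, you can count the partitions whose leader is not the preferred replica (the first broker in the replica list) before and after you run the command. The following function is a hypothetical helper, not part of Kafka Rebalancer. It assumes the output format of kafka-topics.sh --describe in Kafka 2.4, in which each partition line contains "Leader: <id>" and "Replicas: <id list>" fields.

```shell
#!/usr/bin/env bash
# Hypothetical helper: count partitions whose leader is not the preferred
# replica (the first broker in the "Replicas:" list).
# Reads the output of "kafka-topics.sh --describe" on stdin and assumes the
# Kafka 2.4 format, for example:
#   Topic: t  Partition: 0  Leader: 1  Replicas: 1,2  Isr: 1,2
count_non_preferred_leaders() {
  awk '
    {
      leader = ""; replicas = ""
      # Find the "Leader:" and "Replicas:" labels and take the next field.
      for (i = 1; i < NF; i++) {
        if ($i == "Leader:") leader = $(i + 1)
        if ($i == "Replicas:") replicas = $(i + 1)
      }
      # Topic summary lines have no Leader field and are skipped.
      if (leader != "" && replicas != "") {
        split(replicas, r, ",")
        if (leader != r[1]) count++
      }
    }
    END { print count + 0 }
  '
}

# Example:
#   kafka-topics.sh --describe --topic <topic> --bootstrap-server core-1-1:9092 \
#     | count_non_preferred_leaders
```

Pipe the output of kafka-topics.sh --describe for the test topic into the function. After a successful election the count should drop, usually to 0 when every preferred replica is in the ISR, because Kafka moves leadership to a preferred replica only if that replica is in sync.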

balance-disks

This feature balances the distribution of partition replicas across the disks of a broker. It encapsulates kafka-reassign-partitions.sh. Unlike kafka-reassign-partitions.sh, Kafka Rebalancer automatically generates the reassignment plan for the broker based on the disk usage of the broker.

Parameters

replica-alter-log-dirs-throttle: throttles the traffic consumed when replicas are migrated between log directories on a broker.
Note To prevent resource competition, you must set this parameter to an appropriate value when you use this feature.
threshold: the disk usage difference threshold. Default value: 0.1. If the difference in disk usage among the disks of a broker is greater than this value, replicas are migrated between the disks of the broker.
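The threshold parameter compares disk usage values. The following function sketches one plausible interpretation, in which the spread between the most-used and the least-used disk is compared with the threshold. This interpretation is an assumption, and the function name exceeds_threshold is hypothetical; Kafka Rebalancer may compute the difference in another way, for example relative to the average usage.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed (exit 0) if the spread between the highest and
# the lowest disk usage is greater than THRESHOLD.
# Usage: exceeds_threshold THRESHOLD USAGE [USAGE...]
# Usage values are fractions between 0 and 1, for example 0.82 for 82%.
# Assumption: "difference" means max usage minus min usage.
exceeds_threshold() {
  local threshold=$1
  shift
  printf '%s\n' "$@" | awk -v t="$threshold" '
    NR == 1 { min = $1; max = $1 }
    { if ($1 < min) min = $1; if ($1 > max) max = $1 }
    END { exit !(max - min > t) }
  '
}

# Example: four disks of one broker, one of them much fuller than the others.
if exceeds_threshold 0.1 0.82 0.41 0.40 0.39; then
  echo "disk usage is unbalanced"
fi
```

You can collect the usage fractions yourself, for example from the output of df for each data disk, and compare the result with what Kafka Rebalancer decides.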

Examples

  1. Use the kafka-rebalancer.sh tool to balance the replicas among disks in a broker.
    1. Create a test topic.
      kafka-topics.sh --create --topic balance-disks-topic --bootstrap-server core-1-1:9092 --replication-factor 2 --partitions 50
    2. Move partition replicas among disks of Broker 0 to simulate the scenario in which disk loads are unbalanced.
      mv /mnt/disk1/kafka/log/balance-disks-topic-* /mnt/disk2/kafka/log/
      mv /mnt/disk3/kafka/log/balance-disks-topic-* /mnt/disk4/kafka/log/
    3. Write test data to the test topic.
      kafka-producer-perf-test.sh --producer-props bootstrap.servers=core-1-1:9092 --num-records 70000000 --throughput 200000 --record-size 1000 --topic balance-disks-topic
    4. Balance the disk loads in Broker 0.
      kafka-rebalancer.sh --bootstrap-server core-1-1:9092 --zookeeper master-1-1:2181/emr-kafka --balance-disks 0 --replica-alter-log-dirs-throttle 50000000 --threshold 0.1
      Save the JSON string that is printed after "Current partition replica movement" to a file named move.json.
      Important
      • You must save the JSON string that is printed after "Current partition replica movement" to move.json. The file is used as the value of the reassignment-json-file parameter for subsequent verification.
      • After the migration is complete, run the verify action of kafka-reassign-partitions.sh to remove the throttling configurations that were set for the topics and brokers.
  2. Monitor and check the disk balance process.
    You can use the kafka-configs.sh tool to check the throttling parameter and the kafka-reassign-partitions.sh tool to check the migration process.
    1. Check whether the throttling parameter takes effect.
      kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --entity-name 0 --describe
    2. View the disk balance task.
      kafka-reassign-partitions.sh --verify --bootstrap-server core-1-1:9092 --zookeeper master-1-1:2181/emr-kafka --reassignment-json-file move.json
      Note The input parameter move.json is the move.json file saved in Step 1.
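Copying the JSON string by hand is error-prone. The following functions are hypothetical helpers that save the first JSON line printed after the "Current partition replica movement" marker to move.json. They assume that you capture the output of kafka-rebalancer.sh in a log file and that the JSON string is printed on a single line after the marker, in the way kafka-reassign-partitions.sh prints its plans; check the actual output of your version before you rely on them.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for saving the reassignment plan printed by
# kafka-rebalancer.sh. Assumption: the plan is a single-line JSON string that
# appears after a line containing the marker text.
MARKER="Current partition replica movement"

# Print the first line that starts with "{" after the line containing $1.
extract_json_after() {
  local marker=$1 file=$2
  awk -v m="$marker" '
    found && /^[[:space:]]*[{]/ { print; exit }
    index($0, m) { found = 1 }
  ' "$file"
}

# Save the plan found in LOG_FILE to OUT_FILE (default: move.json).
save_move_json() {
  local log=$1 out=${2:-move.json}
  local json
  json=$(extract_json_after "$MARKER" "$log")
  if [ -z "$json" ]; then
    echo "no JSON string found after \"$MARKER\" in $log" >&2
    return 1
  fi
  printf '%s\n' "$json" > "$out"
}

# Example:
#   kafka-rebalancer.sh ... --balance-disks 0 ... | tee rebalancer.log
#   save_move_json rebalancer.log
```

The helper ignores any JSON string that appears before the marker, such as the current assignment, so the saved file contains only the planned movement.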

rebalance

This feature balances the distribution of partition replicas across brokers. It encapsulates kafka-reassign-partitions.sh. Unlike kafka-reassign-partitions.sh, Kafka Rebalancer automatically generates the reassignment plan for the brokers based on their disk usage.

Parameters

throttle: throttles the traffic consumed when partition replicas are migrated during reassignment.
Note To prevent resource competition, you must set this parameter to an appropriate value when you use this feature.
threshold: the disk usage difference threshold. Default value: 0.1. If the difference in disk usage among brokers is greater than this value, a rebalance task is triggered.

Examples

  1. Use the kafka-rebalancer.sh tool to balance the allocation of partition replicas among brokers.
    1. Create a test topic with 10 partitions whose replicas are all placed on Broker 0 and Broker 1.
      kafka-topics.sh --create --topic rebalance-topic --bootstrap-server core-1-1:9092 --replica-assignment 0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1,0:1
    2. Write test data to the test topic.
      kafka-producer-perf-test.sh --topic rebalance-topic --num-records 7000000 --throughput 200000 --producer-props bootstrap.servers=core-1-1:9092 --record-size 1000
    3. Rebalance the partition replicas among brokers.
      kafka-rebalancer.sh --bootstrap-server core-1-1:9092 --zookeeper master-1-1:2181/emr-kafka --rebalance --throttle 100000000 --threshold 0.1
      Save the JSON string that is printed after "Current partition replica movement" to a file named move.json.
      Important
      • You must save the JSON string that is printed after "Current partition replica movement" to move.json. The file is used as the value of the reassignment-json-file parameter for subsequent verification.
      • After the migration is complete, run the verify action of kafka-reassign-partitions.sh to remove the throttling configurations that were set for the topics and brokers.
  2. Monitor and check the rebalance process.
    You can use the kafka-configs.sh tool to check the throttling parameter and the kafka-reassign-partitions.sh tool to check the migration process.
    1. Check whether the throttling parameter takes effect.
      kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --entity-name 0 --describe
      kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --entity-name rebalance-topic --describe
    2. View the rebalance task.
      kafka-reassign-partitions.sh --verify --bootstrap-server core-1-1:9092 --zookeeper master-1-1:2181/emr-kafka --reassignment-json-file move.json
      Note The input parameter move.json is the move.json file saved in Step 1.
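Instead of running the verify action repeatedly by hand, you can poll it. The following function is a minimal sketch that reruns a verify command until its output no longer reports any partition as "still in progress". The function name is hypothetical, and the string it checks for is an assumption based on the messages that kafka-reassign-partitions.sh prints in Kafka 2.4.

```shell
#!/usr/bin/env bash
# Hypothetical helper: rerun a verify command every INTERVAL seconds, at most
# MAX_ATTEMPTS times, until no partition is reported as "still in progress".
# Usage: wait_for_reassignment INTERVAL MAX_ATTEMPTS COMMAND [ARGS...]
wait_for_reassignment() {
  local interval=$1 max_attempts=$2
  shift 2
  local attempt output
  for (( attempt = 1; attempt <= max_attempts; attempt++ )); do
    # Keep stderr in the output so that errors are visible to the caller.
    output=$("$@" 2>&1) || true
    if [ -z "$output" ]; then
      echo "verify command produced no output" >&2
      return 1
    fi
    if ! printf '%s\n' "$output" | grep -q "still in progress"; then
      printf '%s\n' "$output"   # final status of every partition
      return 0
    fi
    sleep "$interval"
  done
  echo "reassignment still in progress after $max_attempts checks" >&2
  return 1
}

# Example:
#   wait_for_reassignment 30 120 kafka-reassign-partitions.sh --verify \
#     --bootstrap-server core-1-1:9092 --zookeeper master-1-1:2181/emr-kafka \
#     --reassignment-json-file move.json
```

Because the last call runs after every partition has completed, that call is also the one that removes the throttling configurations, as described in the Important note above.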

remove-broker-ids

This feature removes all partition replicas from a broker. If you want to decommission a broker, you can use this feature to move all partition replicas on the broker to other brokers.

In most cases, a partition has three replicas, so at least three brokers must remain in the cluster. We recommend that you do not decommission brokers from a Kafka cluster that has three or fewer brokers.
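Before you decommission a broker, you can check this rule against the actual topics in the cluster. The following functions are a hypothetical sketch: max_replication_factor reads the topic summary lines of kafka-topics.sh --describe (assuming the Kafka 2.4 format, which contains a "ReplicationFactor: <n>" field), and can_remove_brokers checks whether the remaining brokers can still host every replica of the topic with the highest replication factor.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for checking whether brokers can be decommissioned.

# Print the highest "ReplicationFactor:" value found in the output of
# "kafka-topics.sh --describe" (Kafka 2.4 format assumed), read from stdin.
max_replication_factor() {
  awk '
    {
      for (i = 1; i < NF; i++)
        if ($i == "ReplicationFactor:" && $(i + 1) + 0 > max) max = $(i + 1) + 0
    }
    END { print max + 0 }
  '
}

# Succeed if TOTAL - REMOVE brokers can still hold REPLICATION_FACTOR replicas.
# Usage: can_remove_brokers TOTAL REMOVE REPLICATION_FACTOR
can_remove_brokers() {
  local total=$1 remove=$2 rf=$3
  (( total - remove >= rf ))
}

# Example:
#   rf=$(kafka-topics.sh --describe --bootstrap-server core-1-1:9092 | max_replication_factor)
#   can_remove_brokers 4 1 "$rf" && echo "enough brokers remain"
```

For example, can_remove_brokers 3 1 3 fails, which matches the recommendation above, while can_remove_brokers 4 1 3 succeeds.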

Parameters

throttle: throttles the traffic consumed when partition replicas are migrated during reassignment.
Note To prevent resource competition, you must set this parameter to an appropriate value when you use this feature.

Examples

  1. Use the kafka-rebalancer.sh tool to remove partition replicas from a broker.
    1. Create a test topic.
      kafka-topics.sh --create --topic decommission-topic --partitions 50 --replication-factor 2 --bootstrap-server core-1-1:9092
    2. Write test data to the test topic.
      kafka-producer-perf-test.sh --topic decommission-topic --num-records 70000000 --throughput 200000 --producer-props bootstrap.servers=core-1-1:9092 --record-size 1000
    3. Remove all the partition replicas from Broker 1.
      kafka-rebalancer.sh --bootstrap-server core-1-1:9092 --zookeeper master-1-1:2181/emr-kafka --remove-broker-ids 1 --throttle 50000000
      Save the JSON string that is printed after "Current partition replica movement" to a file named move.json.
      Important
      • You must save the JSON string that is printed after "Current partition replica movement" to move.json. The file is used as the value of the reassignment-json-file parameter for subsequent verification.
      • After the migration is complete, run the verify action of kafka-reassign-partitions.sh to remove the throttling configurations that were set for the topics and brokers.
  2. Monitor and check the removal process.
    You can use the kafka-configs.sh tool to check the throttling parameter, and the kafka-reassign-partitions.sh tool to check the migration of partition replicas. You can also use the kafka-log-dirs.sh tool to check whether the partition replicas to be removed from the broker are migrated to other brokers.
    1. Check whether the throttling parameter takes effect.
      kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type brokers --entity-name 0 --describe
      kafka-configs.sh --bootstrap-server core-1-1:9092 --entity-type topics --entity-name decommission-topic --describe
    2. View the removal task.
      kafka-reassign-partitions.sh --verify --bootstrap-server core-1-1:9092 --zookeeper master-1-1:2181/emr-kafka --reassignment-json-file move.json
      Note The input parameter move.json is the move.json file saved in Step 1.
    3. After partition replicas are removed from the broker, check the broker to ensure that no partition replica is left.
      kafka-log-dirs.sh --bootstrap-server core-1-1:9092 --broker-list 1 --describe
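To turn the last check into a pass/fail result, you can count the replica entries in the JSON that kafka-log-dirs.sh prints. The following function is a hypothetical helper. It assumes that each replica appears in the output as an object with a "partition" key, as in Kafka 2.4, so a result of 0 means that no replica is left on the broker.

```shell
#!/usr/bin/env bash
# Hypothetical helper: count replica entries in the JSON printed by
# "kafka-log-dirs.sh --describe", read from stdin.
# Assumption (Kafka 2.4 format): each replica appears as an object such as
#   {"partition":"decommission-topic-0","size":1024,"offsetLag":0,"isFuture":false}
count_logdir_partitions() {
  # grep -o prints one line per match. The list key "partitions" does not
  # match because the pattern requires a closing quote right after "partition".
  # "|| true" keeps the pipeline successful when there is no match.
  { grep -o '"partition":"' || true; } | wc -l | tr -d ' '
}

# Example:
#   kafka-log-dirs.sh --bootstrap-server core-1-1:9092 --broker-list 1 --describe \
#     | count_logdir_partitions
```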