症状
コンシューマークライアントが StickyAssignor パーティション割り当て戦略を使用すると、複数のコンシューマースレッドが同じパーティションを消費します。これにより、重複または順序不同のメッセージ処理が発生します。
原因
これは、Apache Kafka クライアントバージョン 2.3 以前における既知のバグ (KAFKA-7026 / KIP-341) です。StickyAssignor は、コンシューマーが古い割り当てデータでグループに再参加する際に、パーティション割り当ての重複排除を行いません。
次のシナリオでこの問題が再現されます。
コンシューマー C1 がリーダーとしてコンシューマーグループに参加し、パーティション
test-0が割り当てられます。コンシューマー C2 が同じグループに参加します。C1 は
test-0を保持し、C2 にはパーティションが割り当てられません。C1 が応答しなくなります (例えば、長い GC ポーズのため)。C2 が新しいリーダーになり、
test-0を引き継ぎます。C1 が回復し、古い割り当て (
test-0) でグループに再参加します。リバランス中に、C1 と C2 の両方がtest-0を既存の割り当てとして報告します。StickyAssignorは重複をチェックしないため、test-0を両方のコンシューマーに割り当てます。
ソリューション
オプション 1: Kafka クライアントを 2.3 以降にアップグレード (推奨)
このバグは Apache Kafka 2.3 で修正されています。ご利用のアプリケーションの Kafka クライアント依存関係をアップグレードしてください。
<!-- Maven の例 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version> <!-- またはそれ以降 -->
</dependency>オプション 2: 別のパーティション割り当て戦略への切り替え
すぐにアップグレードできない場合は、別のパーティション割り当て戦略に切り替えてください。次の表に、利用可能な戦略を示します。
| 戦略 | クラス名 | 説明 | トレードオフ |
|---|---|---|---|
| Range (デフォルト) | RangeAssignor | 各トピックのパーティションをコンシューマー全体に均等に分散します。 | シンプルで予測可能です。パーティション数がコンシューマー数の倍数でない場合、不均一な分散になる可能性があります。 |
| RoundRobin | RoundRobinAssignor | すべてのコンシューマーにラウンドロビン方式でパーティションを1つずつ割り当てます。 | Range よりもバランスが取れています。リバランス中にパーティションの移動が多くなる可能性があります。 |
| 連携スティッキー | CooperativeStickyAssignor | StickyAssignor と同じバランシングロジックですが、協調型リバランスプロトコルを使用してストップ・ザ・ワールドのリバランスを回避します。 | パーティションの移動を最小限に抑え、重複割り当てバグを回避します。イーガープロトコルアサイナーからの2段階の移行が必要です。 |
戦略を変更するには、partition.assignment.strategy コンシューマー構成プロパティを設定します。
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.RoundRobinAssignor");CooperativeStickyAssignor への移行
実行中のコンシューマーグループでダウンタイムなしに StickyAssignor から CooperativeStickyAssignor へ移行するには、2段階のローリングリスタートを実行します。
現在の戦略に加えて、
CooperativeStickyAssignorをセカンダリ戦略として追加します。その後、すべてのコンシューマーのローリングリスタートを実行します。props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.StickyAssignor," + "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");すべてのコンシューマーが新しい構成を適用した後、
CooperativeStickyAssignorのみに切り替えます。その後、再度ローリングリスタートを実行します。props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
クライアントバージョン 2.3 以前ではStickyAssignorを使用しないでください。修正後も、CooperativeStickyAssignorは、すべてのコンシューマーを一時停止することなく増分リバランスをサポートするため、一般的に優れた選択肢です。