このトピックでは、ApsaraMQ for Kafka のクライアントパラメーターを設定する方法について説明します。パラメーターを適切に設定することは、メッセージのスループット、配信の信頼性、コンシューマーの安定性に直接影響します。以下のセクションでは、プロデューサーとコンシューマーのパラメーターについて、本番ワークロード向けの推奨値とチューニングガイダンスを説明します。
プロデューサーのパラメーター
メッセージ配信
acks
送信が成功したと見なす前に、プロデューサーがブローカーから要求する確認応答 (ACK) の数を制御します。
| 値 | 動作 | トレードオフ |
|---|---|---|
0 | ブローカーからの確認応答なし。 | 最高のスループット、最大のデータ損失リスク。 |
1 | リーダーがデータを書き込んだ後の確認応答。 | スループットと耐久性のバランスが取れています。フォロワーがデータをレプリケートする前にリーダーに障害が発生した場合、データが失われる可能性があります。 |
all | リーダーとすべての同期レプリカ (ISR) がデータを書き込んだ後の確認応答。 | 最低のスループット、最強の耐久性。リーダーとすべての同期レプリカが同時に障害した場合にのみデータが失われます。 |
推奨値:厳密な耐久性よりもスループットを優先するほとんどのワークロードでは 1 です。
retries
失敗した送信をプロデューサーがリトライする最大回数。値を大きくすると、プロデューサーはリーダー選出など一時的なブローカーの障害から回復しやすくなります。retry.backoff.ms と組み合わせて、リトライのペースを制御します。
retry.backoff.ms
送信リトライ間の遅延時間。値が低すぎると、ブローカーのフェールオーバー中にリトライストームが発生する可能性があります。
| 推奨 | デフォルト | 単位 |
|---|---|---|
| 1000 | -- | ミリ秒 |
バッチ処理とスループット
バッチ処理は、複数のレコードを単一のリクエストにまとめることで、ネットワークのオーバーヘッドを償却します。バッチがいつ送信されるかは、サイズと時間の 2 つのパラメーターで制御されます。
batch.size
パーティションごとの単一バッチの最大サイズ。バッチがこのサイズに達すると、プロデューサーは即座に送信します。
| タイプ | デフォルト | 有効値 | 単位 |
|---|---|---|---|
| int | 16384 | [0,...] | バイト |
ほとんどのワークロードでは、デフォルト値の 16384 を維持してください。値を小さくすると、ネットワークリクエストの数が増加し、スループットが低下します。batch.size を増やす場合は、buffer.memory がより大きなバッチを収容できる十分な大きさであることを確認してください。
linger.ms
プロデューサーがバッチがいっぱいになるのを待ってから送信するまでの最大時間。これは TCP の Nagle アルゴリズムのように機能します。バッチが batch.size に達すると、linger タイマーに関係なく即座に送信されます。linger.ms が経過した時点でバッチがまだ batch.size 未満の場合、プロデューサーは蓄積されたものを送信します。
| 推奨 | デフォルト | 単位 |
|---|---|---|
| 100~1000 | 0 | ミリ秒 |
linger.ms を高くすると、メッセージごとのレイテンシーを犠牲にして、バッチ処理の効率とスループットが向上します。
メモリ管理
buffer.memory
プロデューサーがすべてのパーティションにわたる未送信レコードのバッファリングのために割り当てる合計メモリ。このプールが使い果たされると、max.block.ms に応じて send() はブロックされるか、例外をスローします。バッファーが小さすぎると、メモリ割り当てが遅くなったり、スループットが低下したり、送信がタイムアウトしたりする原因になります。
単位:バイト。デフォルト:33554432 (32 MB)。
サイジングの数式:
buffer.memory >= batch.size x number_of_partitions x 2たとえば、batch.size=16384 でパーティションが 50 個の場合:
16384 x 50 x 2 = 1,638,400 バイト (最小約 1.6 MB)スループット向上のために batch.size を増やす場合は、buffer.memory も比例してスケールしてください。
パーティショニング
partitioner.class
プロデューサーがレコードをパーティションに割り当てる方法を決定します。スティッキーパーティショニング戦略は、次のパーティションに移動する前に 1 つのパーティションのバッチを埋めることで、不完全なバッチの数を減らします。
| Kafka クライアントバージョン | デフォルト戦略 |
|---|---|
| 2.4 以降 | スティッキーパーティショニング (デフォルト) |
| 2.4 より前 | ラウンドロビン |
ご利用のプロデューサークライアントがバージョン 2.4 より前の場合は、バッチ処理の効率を向上させるために、スティッキーパーティショナーを明示的に設定してください。
コンシューマーのパラメーター
フェッチのチューニング
これらのパラメーターは、コンシューマーがフェッチリクエストごとに取得するデータ量を制御します。これらのチューニングは、スループットとレイテンシーの両方に影響します。
fetch.min.bytes
ブローカーがフェッチ応答を返す前に蓄積する最小データ量。値を大きくすると、フェッチ頻度とブローカーの CPU オーバーヘッドが減少し、スループットが向上しますが、エンドツーエンドのメッセージレイテンシーは増加します。単位:バイト。
この値を設定する前に、ご利用のプロデューサーのメッセージレートを評価してください。メッセージの到着が遅い場合、fetch.min.bytes を大きくすると不要な遅延が発生します。
fetch.max.wait.ms
ブローカーが応答を返す前に fetch.min.bytes を蓄積するために待機する最大時間。単位:ミリ秒。
ストレージタイプによって動作が異なります:
ローカルストレージ:ブローカーは、
fetch.min.bytesに達するかfetch.max.wait.msが経過するかのいずれか早い方まで待機します。クラウドストレージ:ブローカーは、
fetch.min.bytesに関係なく、新しいデータが到着するとすぐにレスポンスを返します。
max.partition.fetch.bytes
ブローカーが 1 回のフェッチでパーティションごとに返す最大データ量。単位:バイト。
セッション管理とリバランス
セッションとポーリングのパラメーターの設定ミスは、予期しないコンシューマーのリバランスの最も一般的な原因です。リバランスは、パーティションが再割り当てされるまでグループ内のすべての消費を一時停止するため、不要なリバランスのトリガーは避けてください。
session.timeout.ms
ブローカーがコンシューマーを停止したと見なし、リバランスをトリガーするまでのハートビート間の最大時間。
| 推奨 | 有効値 | デフォルト | 単位 |
|---|---|---|---|
| 30000~60000 | 6000~300000 | 10000 | ミリ秒 |
poll() とは独立してハートビートを送信します。それより前の Java バージョンまたは Java 以外のクライアントでは、ハートビートは poll() 呼び出し中に送信されるため、session.timeout.ms はデータ処理時間とハートビート間隔の両方を考慮する必要があります。heartbeat.interval.ms は session.timeout.ms の 3 分の 1 以下に設定してください。たとえば、session.timeout.ms が 45000 の場合、heartbeat.interval.ms は 15000 以下に設定します。max.poll.records
1 回の poll() 呼び出しで返されるレコードの最大数。コンシューマーが次の poll() のデッドラインまでにこの数のレコードを処理できない場合、ブローカーはコンシューマーを停止したと見なし、リバランスをトリガーします。
サイジングの数式:
max.poll.records < messages_per_thread_per_second x consumer_threads x session_timeout_secondsたとえば、スレッドあたり 500 msg/s、4 スレッド、セッションタイムアウト 45 秒の場合:
500 x 4 x 45 = 90,000セッションがタイムアウトする前にコンシューマーが常に処理を完了できるように、max.poll.records をこの値より低く設定してください。
max.poll.interval.ms
ブローカーがコンシューマーをグループから削除するまでの、連続した poll() 呼び出し間の最大間隔。このパラメーターは、ハートビートが別のスレッドで実行される Java クライアント 0.10.1 以降にのみ適用されます。
| デフォルト | 単位 |
|---|---|
| 300000 | ミリ秒 |
サイジングの数式:
max.poll.interval.ms > time_per_record x max.poll.recordsほとんどの場合、デフォルトの 300000 (5 分) で十分です。処理ロジックが非常に遅い場合にのみ、この値を増やしてください。
オフセット管理
enable.auto.commit
コンシューマーが一定間隔でオフセットを自動的にコミットするかどうかを制御します。
| 値 | 動作 |
|---|---|
true (デフォルト) | オフセットは auto.commit.interval.ms ミリ秒ごとに自動的にコミットされます。管理は簡単ですが、クラッシュ後に重複処理が発生する可能性があります。 |
false | アプリケーションで commitSync() または commitAsync() を明示的に呼び出す必要があります。べき等な処理と組み合わせることで、少なくとも 1 回のセマンティクスまたは 1 回限りのセマンティクスを実現する場合に使用します。 |
auto.commit.interval.ms
enable.auto.commit が true の場合の自動オフセットコミットの間隔。
| デフォルト | 単位 |
|---|---|
| 1000 | ミリ秒 |
間隔を短くすると、クラッシュ後の重複メッセージのウィンドウは減少しますが、ブローカーへのコミットリクエストの数は増加します。
auto.offset.reset
コンシューマーにコミット済みオフセットがない場合、またはコミット済みオフセットが無効な場合 (例:保持ポリシーによりオフセットが削除された場合) の動作を決定します。
| 値 | 動作 |
|---|---|
latest | 最新のオフセットから消費を開始します。新しいメッセージのみが対象です。 |
earliest | 利用可能な最も古いオフセットから消費を開始します。保持されているすべてのメッセージを再処理します。 |
none | 例外をスローします。アプリケーションがオフセットを手動で管理する場合に使用します。 |
推奨値:latest。earliest を使用すると、無効なオフセットが検出されるたびに、コンシューマーは保持されているすべてのメッセージを再処理します。これにより、重複処理やコンシューマーの遅延の急増につながる可能性があります。
アプリケーションがオフセット管理を手動で行う場合 (例:外部データベースにオフセットを保存する場合) は、これを none に設定してください。
パラメータークイックリファレンス
プロデューサーのパラメーター
| パラメーター | デフォルト | 推奨 | 単位 |
|---|---|---|---|
acks | -- | 1 (スループット) または all (耐久性) | -- |
retries | -- | 可用性の要件に基づいて設定 | -- |
retry.backoff.ms | 100 | 1000 | ms |
batch.size | 16384 | 16384 (デフォルト) | バイト |
linger.ms | 0 | 100~1000 | ms |
buffer.memory | 33554432 | >= batch.size x パーティション x 2 | バイト |
partitioner.class | スティッキー (2.4+) | スティッキーパーティショニング | -- |
コンシューマーのパラメーター
| パラメーター | デフォルト | 推奨 | 単位 |
|---|---|---|---|
fetch.min.bytes | 1 | プロデューサーのメッセージレートに基づいてチューニング | バイト |
fetch.max.wait.ms | 500 | -- | ms |
max.partition.fetch.bytes | 1048576 | -- | バイト |
session.timeout.ms | 10000 | 30000~60000 | ms |
heartbeat.interval.ms | 3000 | <= 1/3 の session.timeout.ms | ms |
max.poll.records | 500 | サイジングの数式を参照 | -- |
max.poll.interval.ms | 300000 | 300000 (デフォルト) | ms |
enable.auto.commit | true | 配信セマンティクスに依存 | -- |
auto.commit.interval.ms | 1000 | 1000 (デフォルト) | ms |
auto.offset.reset | latest | latest | -- |