すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for Kafka:プロデューサーのベストプラクティス

最終更新日:Nov 09, 2025

このトピックでは、メッセージ送信エラーを減らすのに役立つ ApsaraMQ for Kafka プロデューサーのベストプラクティスについて説明します。これらのベストプラクティスは、Java クライアントに基づいています。基本的な概念は他のプログラミング言語のクライアントでも同様ですが、実装の詳細は異なる場合があります。

メッセージを送信する

次のサンプルコードは、メッセージを送信する方法を示しています。

Future<RecordMetadata> metadataFuture = producer.send(new ProducerRecord<String, String>(
        topic,   // メッセージの Topic。
        null,   // パーティション番号。 プロデューサーにパーティションを割り当てさせるには、これを null に設定します。
        System.currentTimeMillis(),   // タイムスタンプ。
        String.valueOf(value.hashCode()),   // メッセージキー。
        value   // メッセージの値。
));

完全なサンプルコードについては、「SDK の概要」をご参照ください。

キーと値

ApsaraMQ for Kafka 0.10.2 のメッセージには、次の 2 つのフィールドがあります。

  • Key: メッセージの識別子。

  • Value: メッセージの内容。

追跡を簡素化するために、各メッセージに一意のキーを設定します。キーを使用してメッセージを追跡し、送信ログと消費ログを出力してそのステータスを確認できます。

多数のメッセージを送信する場合は、キーを設定しないでください。代わりに、スティッキーパーティショニング戦略を使用します。スティッキーパーティショニング戦略の詳細については、「スティッキーパーティショニング戦略」をご参照ください。

重要

ApsaraMQ for Kafka 0.11.0 以降のバージョンはヘッダーをサポートしています。ヘッダーを使用する場合は、サーバーをバージョン 2.2.0 にアップグレードする必要があります。

失敗した試行のリトライ

分散環境では、ネットワークの問題によりメッセージの送信が時折失敗することがあります。メッセージは送信されたが確認応答 (ACK) が失敗した場合、またはメッセージが正常に送信されなかった場合に、失敗が発生する可能性があります。

ApsaraMQ for Kafka は、仮想 IP アドレス (VIP) ネットワークアーキテクチャを使用します。このアーキテクチャでは、長時間アイドル状態の接続は自動的に閉じられます。したがって、非アクティブなクライアントは、しばしば connection reset by peer エラーを受け取ります。このエラーが発生した場合は、メッセージの送信をリトライする必要があります。

必要に応じて、次のリトライパラメーターを設定できます。

  • retries: メッセージの送信に失敗した場合のリトライ回数。

  • retry.backoff.ms: 失敗したメッセージのリトライ間隔。このパラメーターを 1000 に設定することをお勧めします。単位はミリ秒です。

メッセージを非同期で送信する

send インターフェイスは非同期です。送信結果を受け取るには、metadataFuture.get(timeout, TimeUnit.MILLISECONDS) を呼び出すことができます。

スレッドセーフ

プロデューサーはスレッドセーフであり、どの Topic にもメッセージを送信できます。通常、1 つのアプリケーションは 1 つのプロデューサーを使用します。

Acks

次のリストは、`acks` の設定について説明しています。

  • acks=0: サーバーからの応答は不要です。この設定は高いパフォーマンスを提供しますが、データ損失のリスクが高くなります。

  • acks=1: データがプライマリノードに書き込まれた後に応答が返されます。この設定は中程度のパフォーマンスと中程度のリスクのデータ損失を提供します。プライマリノードがダウンした場合にデータ損失が発生する可能性があります。

  • acks=all: データがプライマリノードに書き込まれ、レプリカノードに同期された後にのみ応答が返されます。この設定は低いパフォーマンスですが、高いデータセキュリティを提供します。プライマリノードとレプリカノードの両方がダウンした場合にのみデータが失われます。

送信パフォーマンスを向上させるために、acks=1 を設定できます。

断片化されたリクエストを減らして送信パフォーマンスを向上させる

通常、ApsaraMQ for Kafka の Topic には複数のパーティションがあります。ApsaraMQ for Kafka プロデューサークライアントがサーバーにメッセージを送信するとき、まず Topic のどのパーティションにメッセージを送信するかを決定する必要があります。同じパーティションに複数のメッセージを送信すると、プロデューサークライアントはメッセージをバッチにパッケージ化してサーバーに送信します。プロデューサークライアントは、バッチを処理するときに追加のオーバーヘッドを発生させます。小さなバッチは、プロデューサークライアントが多くのリクエストを生成する原因となる可能性があります。これにより、クライアントとサーバーでリクエストのキューイングが発生し、CPU 使用率が増加し、メッセージの送受信と消費の全体的なレイテンシーが増加します。適切なバッチサイズは、クライアントからサーバーへのリクエスト数を減らすことができ、メッセージ送信の全体的なスループットとレイテンシーを向上させます。

ApsaraMQ for Kafka プロデューサーは、2 つの主要なパラメーターでバッチ処理メカニズムを制御します。

  • batch.size: 各パーティションのメッセージキャッシュのサイズ。これはメッセージの数ではなく、メッセージコンテンツの合計バイト数です。キャッシュがこのサイズに達すると、バッチをサーバーに送信するためのネットワークリクエストがトリガーされます。batch.size が小さすぎると、送信パフォーマンスと安定性に影響を与える可能性があります。デフォルト値の 16384 を維持することをお勧めします。単位はバイトです。

  • linger.ms: メッセージがキャッシュに留まることができる最大時間。この時間を超えると、プロデューサークライアントは batch.size の制限を無視し、すぐにメッセージをサーバーに送信します。必要に応じて、linger.ms を 100 から 1000 の間の値に設定できます。単位はミリ秒です。

したがって、ApsaraMQ for Kafka プロデューサーがメッセージのバッチをサーバーに送信するタイミングは、batch.sizelinger.ms の両方によって決まります。必要に応じてこれらのパラメーターを調整できます。送信パフォーマンスを向上させ、サービスの安定性を確保するために、batch.size=16384 および linger.ms=1000 を設定することをお勧めします。

さらに、メッセージを送信するにはバージョン 2.4 以降のクライアントを使用することをお勧めします。このバージョンでは、デフォルトでスティッキーパーティショニング戦略が有効になります。この戦略により、断片化された送信が大幅に削減され、全体的な送信パフォーマンスが向上します。スティッキーパーティショニング戦略に関する Kafka Improvement Proposal (KIP) とパフォーマンスレポートの詳細については、「KIP-480: Sticky Partitioner - Apache Kafka - Apache Software Foundation」をご参照ください。

スティッキーパーティショニング戦略

同じパーティションに送信されたメッセージのみが同じバッチに配置されます。したがって、ApsaraMQ for Kafka プロデューサーで設定されるパーティショニング戦略は、バッチがどのように形成されるかを決定する要因です。ApsaraMQ for Kafka プロデューサーでは、partitioner 実装クラスを設定することで、ビジネスに適したパーティションを選択できます。キーが指定されたメッセージの場合、ApsaraMQ for Kafka プロデューサーのデフォルト戦略は、メッセージキーをハッシュ化し、ハッシュ結果に基づいてパーティションを選択することです。これにより、同じキーを持つメッセージが同じパーティションに送信されることが保証されます。

キーが指定されていないメッセージの場合、ApsaraMQ for Kafka バージョン 2.4 より前のデフォルト戦略は、Topic のすべてのパーティションを巡回することです。メッセージはラウンドロビン方式で各パーティションに送信されます。ただし、このデフォルト戦略ではバッチ処理のパフォーマンスが低下します。実際には、多くの小さなバッチが生成され、レイテンシーが増加する可能性があります。キーのないメッセージのパーティショニング効率が低いため、ApsaraMQ for Kafka はバージョン 2.4 でスティッキーパーティショニング戦略を導入しました。

スティッキーパーティショニング戦略は、キーのないメッセージが異なるパーティションに分散されることによって引き起こされる小さなバッチの問題を解決します。主な戦略は、パーティションのバッチが完了した後に別のパーティションをランダムに選択することです。後続のメッセージは、その新しいパーティションを可能な限り使用します。短期的には、この戦略はメッセージを同じパーティションに送信します。より長いランタイムにわたって、メッセージは依然としてすべてのパーティションに均等に分散されます。このアプローチにより、メッセージパーティションの偏りを防ぎ、レイテンシーを削減し、全体的なサービスパフォーマンスを向上させます。

ApsaraMQ for Kafka プロデューサークライアントのバージョン 2.4 以降を使用する場合、スティッキーパーティショニング戦略がデフォルトで使用されます。バージョン 2.4 より前のプロデューサークライアントを使用する場合、スティッキーパーティショニング戦略の原則に基づいて独自のパーティショニング戦略を実装できます。次に、partitioner.class パラメーターを使用して目的の戦略を設定できます。

スティッキーパーティショニング戦略の実装については、次の Java コードをご参照ください。このコードのロジックは、特定の時間間隔でパーティションを切り替えることです。

public class MyStickyPartitioner implements Partitioner {

    // 最後のパーティション切り替えの時刻を記録します。
    private long lastPartitionChangeTimeMillis = 0L;
    // 現在のパーティションを記録します。
    private int currentPartition = -1;
    // パーティションの切り替え間隔。 必要に応じて間隔を設定します。
    private long partitionChangeTimeGap = 100L;
    
    public void configure(Map<String, ?> configs) {}

    /**
     * 指定されたレコードのパーティションを計算します。
     *
     * @param topic Topic 名
     * @param key パーティション分割するキー (キーがない場合は null)
     * @param keyBytes パーティション分割するシリアル化されたキー (キーがない場合は null)
     * @param value パーティション分割する値 (または null)
     * @param valueBytes パーティション分割するシリアル化された値 (または null)
     * @param cluster 現在のクラスターのメタデータ
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // すべてのパーティション情報を取得します。
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            int availablePartitionSize = availablePartitions.size();

            // 現在のアクティブなパーティションを確認します。
            if (availablePartitionSize > 0) {
                handlePartitionChange(availablePartitionSize);
                return availablePartitions.get(currentPartition).partition();
            } else {
                handlePartitionChange(numPartitions);
                return currentPartition;
            }
        } else {
            // キーを持つメッセージの場合、キーのハッシュ値に基づいてパーティションを選択します。
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private void handlePartitionChange(int partitionNum) {
        long currentTimeMillis = System.currentTimeMillis();

        // 最後の切り替えからの時間が切り替え間隔を超えた場合は、次のパーティションに切り替えます。 それ以外の場合は、現在のパーティションを使用します。
        if (currentTimeMillis - lastPartitionChangeTimeMillis >= partitionChangeTimeGap
            || currentPartition < 0 || currentPartition >= partitionNum) {
            lastPartitionChangeTimeMillis = currentTimeMillis;
            currentPartition = Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;
        }
    }

    public void close() {}

}

OOM

ApsaraMQ for Kafka のバッチ設計に基づき、ApsaraMQ for Kafka はメッセージをキャッシュし、バッチで送信します。キャッシュされるメッセージが多すぎると、out-of-memory (OOM) エラーが発生する可能性があります。

  • buffer.memory: メッセージを送信するためのメモリプールのサイズ。メモリプールが小さすぎると、メモリの要求に時間がかかることがあります。これは送信パフォーマンスに影響を与え、送信タイムアウトを引き起こすことさえあります。buffer.memory を `batch.size × パーティション数 × 2` 以上に設定することをお勧めします。単位はバイトです。

  • buffer.memory のデフォルト値は 32 MB です。これは、単一のプロデューサーが十分なパフォーマンスを確保するのに十分です。

    重要

    同じ Java 仮想マシン (JVM) で複数のプロデューサーを起動すると、各プロデューサーが 32 MB のキャッシュスペースを占有する可能性があります。これにより、OOM エラーがトリガーされる可能性があります。

  • 本番環境では、通常、複数のプロデューサーを起動する必要はありません。特別な事情で必要な場合は、OOM エラーを避けるために buffer.memory のサイズを考慮する必要があります。

パーティションの順序

単一のパーティション内では、メッセージは送信された順序で保存および消費されます。したがって、メッセージはほとんどの場合順序付けられています。

デフォルトでは、可用性を向上させるために、ApsaraMQ for Kafka は単一パーティション内での厳密な順序を保証しません。アップグレード中または故障中に、少数のメッセージが順序不同になることがあります。これは、パーティションが失敗し、そのメッセージが別のパーティションにフェイルオーバーされたときに発生します。

ビジネスでパーティション内の厳密な順序付けが必要な場合は、Topic の作成時にローカル記憶域を選択できます。