すべてのプロダクト
Search
ドキュメントセンター

ApsaraMQ for RabbitMQ:Spring との統合

最終更新日:Dec 03, 2025

ApsaraMQ for RabbitMQ は、Spring フレームワーク用の SDK を提供します。このトピックでは、Spring SDK を統合してメッセージを送受信する方法について説明します。

前提条件

デモプロジェクト

SpringBootDemo.zip をクリックして、デモプロジェクトをダウンロードします。

手順 1: パラメーターの設定

application.properties ファイルまたは application.yml ファイルで、構成パラメーターを設定します。 次の例では、application.properties ファイルを使用します。

# エンドポイント。 ApsaraMQ for RabbitMQ コンソールの [インスタンス詳細] ページでエンドポイントを取得します。 
spring.rabbitmq.host=XXXXXX.amqp.aliyuncs.com
# ApsaraMQ for RabbitMQ への接続に使用されるポート。
spring.rabbitmq.port=5672
# インスタンスの静的ユーザー名。 ApsaraMQ for RabbitMQ コンソールの [静的アカウント] ページでユーザー名を表示します。 
spring.rabbitmq.username=******
# インスタンスの静的パスワード。 ApsaraMQ for RabbitMQ コンソールの [静的アカウント] ページでパスワードを表示します。 
spring.rabbitmq.password=******
# 仮想ホスト。論理的な分離を提供します。 ApsaraMQ for RabbitMQ コンソールの [Vhosts] ページで仮想ホストを表示します。 
spring.rabbitmq.virtual-host=test_vhost
# メッセージ確認 (Ack) モード。
# 1. none: コンシューマーがメッセージを受信すると、消費が成功したかどうかに関係なく、サーバーはメッセージが正常に処理されたと見なします。 これは RabbitMQ の autoAck モードです。
# 2. auto: メッセージが正常に消費されると、クライアントは自動的に ack を送信します。 メッセージの処理に失敗した場合、クライアントは nack を送信するか、例外をスローします。 Channel.basicAck() を明示的に呼び出す必要はありません。
# 3. manual: 手動で Ack を送信します。 メッセージが正常に消費された後、Channel.basicAck() を明示的に呼び出す必要があります。
spring.rabbitmq.listener.simple.acknowledge-mode=manual

# キャッシュモードを CONNECTION に設定します。 ApsaraMQ for RabbitMQ は分散アーキテクチャを使用します。 CONNECTION モードでは、クライアントはクラスター内の複数のサービスノードによりバランスの取れた方法で接続できます。 このメソッドは、負荷のホットスポットを効果的に防止し、メッセージの送信と消費の効率を向上させることができます。
spring.rabbitmq.cache.connection.mode=connection
# 必要に応じて値を調整します。
spring.rabbitmq.cache.connection.size=50
# 必要に応じて値を調整します。
spring.rabbitmq.cache.channel.size=1

# コンシューマーが一度に処理できる未確認 (Ack) メッセージの最大数 (QoS)。 ApsaraMQ for RabbitMQ サーバーは、QoS 値として min{prefetch, 100} を使用します。 コンシューマーの処理能力が低い場合は、この値を小さくしてください。
spring.rabbitmq.listener.simple.prefetch=100
# RabbitMQ リスナーの同時コンシューマーの最小数。 必要に応じて値を調整します。
spring.rabbitmq.listener.simple.concurrency=10
# RabbitMQ リスナーの同時コンシューマーの最大数。 消費率が十分に高い場合、クライアントは max-concurrency のコンシューマーを起動してメッセージを消費します。
spring.rabbitmq.listener.simple.max-concurrency=20

必要に応じて、次のオプションの構成を追加できます。

オプションの構成

プロパティ

説明

spring.rabbitmq.addresses

クライアントが接続するサーバーアドレス。 複数のアドレスはコンマ (,) で区切ります。

spring.rabbitmq.hostspring.rabbitmq.addresses の両方を設定した場合、spring.rabbitmq.addresses パラメーターが優先されます

spring.rabbitmq.dynamic

AmqpAdmin bean を作成するかどうかを指定します。 デフォルト値は true です。

spring.rabbitmq.connection-timeout

接続タイムアウト。 単位: ミリ秒。 値 0 はタイムアウトがないことを示します。

spring.rabbitmq.requested-heartbeat

ハートビートタイムアウト。 単位: 秒。 デフォルト値は 60 です。

spring.rabbitmq.publisher-confirms

パブリッシャー確認メカニズムを有効にするかどうかを指定します。

spring.rabbitmq.publisher-returns

パブリッシャーリターンメカニズムを有効にするかどうかを指定します。

spring.rabbitmq.ssl.enabled

SSL 証明書認証を有効にするかどうかを指定します。

spring.rabbitmq.ssl.key-store

SSL 証明書を保持するキーストアへのパス。

spring.rabbitmq.ssl.key-store-password

キーストアにアクセスするためのパスワード。

spring.rabbitmq.ssl.trust-store

SSL 証明書を持つ信頼できるアドレス。

spring.rabbitmq.ssl.trust-store-password

トラストストアにアクセスするためのパスワード。

spring.rabbitmq.ssl.algorithm

TLSv1.2 など、SSL で使用されるアルゴリズム。

spring.rabbitmq.ssl.validate-server-certificate

サーバー証明書の検証を有効にするかどうかを指定します。

spring.rabbitmq.ssl.verify-hostname

ホスト検証を有効にするかどうかを指定します。

spring.rabbitmq.cache.channel.size

キャッシュに保持するチャンネルの数。

spring.rabbitmq.cache.channel.checkout-timeout

キャッシュサイズに達したときにキャッシュからチャンネルを取得するためのタイムアウト。

単位: ミリ秒。 値 0 は、常に新しいチャンネルが作成されることを意味します。

spring.rabbitmq.cache.connection.size

キャッシュされた接続の数。 このパラメーターは、CONNECTION モードでのみ有効です。

spring.rabbitmq.cache.connection.mode

接続キャッシュモード。 有効な値は次のとおりです:

  • CHANNEL

  • CONNECTION

CONNECTION モードを推奨します。

spring.rabbitmq.listener.type

リスナーコンテナーのタイプ。 有効な値は次のとおりです:

  • simple

  • direct

デフォルト値は simple です。

spring.rabbitmq.listener.simple.auto-startup

アプリケーションの起動時にコンテナーを自動的に起動するかどうかを指定します。 デフォルト値は true です。

spring.rabbitmq.listener.simple.acknowledge-mode

メッセージ確認モード。 有効な値は次のとおりです:

  • none: コンシューマーがメッセージを受信すると、消費が成功したかどうかに関係なく、サーバーはメッセージが正常に処理されたと見なします。 これは RabbitMQ の autoAck モードです。

  • manual: 手動で Ack を送信します。 メッセージが正常に消費された後、Basic.ack を明示的に呼び出す必要があります。

  • auto: メッセージが正常に消費されると、クライアントは自動的に ack を送信します。 メッセージの処理に失敗した場合、クライアントは nack を送信するか、例外をスローします。 Basic.ack を明示的に呼び出す必要はありません。

デフォルト値は auto です。

spring.rabbitmq.listener.simple.concurrency

コンシューマーの最小数。

spring.rabbitmq.listener.simple.max-concurrency

コンシューマーの最大数。

spring.rabbitmq.listener.simple.prefetch

コンシューマーが一度に処理できる未確認 (Ack) メッセージの最大数。 これは、Basic.qos メソッドを呼び出して QoS 値を設定するのと同じです。 トランザクションが使用される場合、この値はトランザクションサイズ以上である必要があります。

spring.rabbitmq.listener.simple.transaction-size

トランザクションで処理するメッセージの数。

spring.rabbitmq.listener.simple.default-requeue-rejected

拒否されたメッセージを再キューイングするかどうかを指定します。 デフォルト値は true です。

spring.rabbitmq.listener.simple.missing-queues-fatal

宣言されたキューがブローカーで利用できない場合にリスナーが失敗するかどうか、または実行時に 1 つ以上のキューが削除された場合にコンテナーが停止するかどうかを指定します。 デフォルト値は true です。

spring.rabbitmq.listener.simple.idle-event-interval

アイドルコンテナーイベントをパブリッシュする間隔。 単位: ミリ秒。

spring.rabbitmq.template.mandatory

必須メッセージングを有効にするかどうかを指定します。 デフォルト値は false です。

spring.rabbitmq.template.receive-timeout

receive() 操作のタイムアウト。

spring.rabbitmq.template.reply-timeout

sendAndReceive() 操作のタイムアウト。

手順 2: SDK を使用してメッセージを送受信する

メッセージの生成

RabbitMQService で、依存性注入を介して RabbitTemplate を取得し、その send メソッドを呼び出してメッセージを送信します。

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Service
public class RabbitMQService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMessage(String exchange, String routingKey, String content) {
        // MessageId を設定します。
        String msgId = UUID.randomUUID().toString();
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setMessageId(msgId);
        // メッセージを作成します。
        Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
        /*
         * send() インターフェイスを呼び出してメッセージを送信します。
         * exchange: exchange の名前。
         * routingKey: ルーティングキー。
         * message: メッセージの内容。
         * correlationData はパブリッシャー確認に使用されます。
         */
        rabbitTemplate.send(exchange, routingKey, message, null);
    }
}

メッセージの消費

@RabbitListener アノテーションを使用してメッセージを消費します。

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Arrays;

@Component
public class MessageListener {

     /**
     * メッセージを受信します。
     * @param message メッセージ。
     * @param channel チャンネル。
     * @throws IOException
     * queues を作成したキューの名前に置き換えます。
     */
    @RabbitListener(queues = "myQueue")
    public void receiveFromMyQueue(Message message, Channel channel) throws IOException {
        // メッセージ消費のビジネスロジックを入力します。
        ...
        // Ack 有効期間 (消費タイムアウト) 内に Ack を返す必要があります。 そうしないと、確認が無効になり、メッセージが再配信されます。
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

以下は、RabbitListener の一般的なオプション構成です。

オプションの構成

プロパティ

説明

ackMode

カスタムメッセージ確認モード。 これは、spring.rabbitmq.listener.simple.acknowledge-mode 設定を上書きします。

admin

AMQP リソース管理のための AmqpAdmin へのリファレンス。

autoStartup

アプリケーションの起動時にコンテナーを自動的に起動するかどうかを指定します。 これは、spring.rabbitmq.listener.simple.auto-startup 設定を上書きします。

bindings

キューと exchange 間のバインディングの配列。バインディング情報が含まれます。

concurrency

リスナーコンテナーの同時スレッド数を設定します。

errorHandler

リスナーメソッドによってスローされた例外のエラーハンドラを設定します。

exclusive

キューの排他モードを有効にします。 これは、コンシューマーがキューへの排他的アクセス権を持ち、他のコンシューマーがそこからメッセージを受信できないことを意味します。 この機能は現在サポートされていません。

queues

このリスナーがリッスンするキューを宣言します。

queuesToDeclare

明示的に宣言するキューを指定します。