ApsaraMQ for RabbitMQ は、Spring フレームワーク用の SDK を提供します。このトピックでは、Spring SDK を統合してメッセージを送受信する方法について説明します。
前提条件
ApsaraMQ for RabbitMQ コンソールで、インスタンス、vhost、exchange、キューなどのリソースを作成済みであること。 詳細については、「手順 2: リソースの作成」をご参照ください。
デモプロジェクト
SpringBootDemo.zip をクリックして、デモプロジェクトをダウンロードします。
手順 1: パラメーターの設定
application.properties ファイルまたは application.yml ファイルで、構成パラメーターを設定します。 次の例では、application.properties ファイルを使用します。
# エンドポイント。 ApsaraMQ for RabbitMQ コンソールの [インスタンス詳細] ページでエンドポイントを取得します。
spring.rabbitmq.host=XXXXXX.amqp.aliyuncs.com
# ApsaraMQ for RabbitMQ への接続に使用されるポート。
spring.rabbitmq.port=5672
# インスタンスの静的ユーザー名。 ApsaraMQ for RabbitMQ コンソールの [静的アカウント] ページでユーザー名を表示します。
spring.rabbitmq.username=******
# インスタンスの静的パスワード。 ApsaraMQ for RabbitMQ コンソールの [静的アカウント] ページでパスワードを表示します。
spring.rabbitmq.password=******
# 仮想ホスト。論理的な分離を提供します。 ApsaraMQ for RabbitMQ コンソールの [Vhosts] ページで仮想ホストを表示します。
spring.rabbitmq.virtual-host=test_vhost
# メッセージ確認 (Ack) モード。
# 1. none: コンシューマーがメッセージを受信すると、消費が成功したかどうかに関係なく、サーバーはメッセージが正常に処理されたと見なします。 これは RabbitMQ の autoAck モードです。
# 2. auto: メッセージが正常に消費されると、クライアントは自動的に ack を送信します。 メッセージの処理に失敗した場合、クライアントは nack を送信するか、例外をスローします。 Channel.basicAck() を明示的に呼び出す必要はありません。
# 3. manual: 手動で Ack を送信します。 メッセージが正常に消費された後、Channel.basicAck() を明示的に呼び出す必要があります。
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# キャッシュモードを CONNECTION に設定します。 ApsaraMQ for RabbitMQ は分散アーキテクチャを使用します。 CONNECTION モードでは、クライアントはクラスター内の複数のサービスノードによりバランスの取れた方法で接続できます。 このメソッドは、負荷のホットスポットを効果的に防止し、メッセージの送信と消費の効率を向上させることができます。
spring.rabbitmq.cache.connection.mode=connection
# 必要に応じて値を調整します。
spring.rabbitmq.cache.connection.size=50
# 必要に応じて値を調整します。
spring.rabbitmq.cache.channel.size=1
# コンシューマーが一度に処理できる未確認 (Ack) メッセージの最大数 (QoS)。 ApsaraMQ for RabbitMQ サーバーは、QoS 値として min{prefetch, 100} を使用します。 コンシューマーの処理能力が低い場合は、この値を小さくしてください。
spring.rabbitmq.listener.simple.prefetch=100
# RabbitMQ リスナーの同時コンシューマーの最小数。 必要に応じて値を調整します。
spring.rabbitmq.listener.simple.concurrency=10
# RabbitMQ リスナーの同時コンシューマーの最大数。 消費率が十分に高い場合、クライアントは max-concurrency のコンシューマーを起動してメッセージを消費します。
spring.rabbitmq.listener.simple.max-concurrency=20必要に応じて、次のオプションの構成を追加できます。
手順 2: SDK を使用してメッセージを送受信する
メッセージの生成
RabbitMQService で、依存性注入を介して RabbitTemplate を取得し、その send メソッドを呼び出してメッセージを送信します。
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
@Service
public class RabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, String content) {
// MessageId を設定します。
String msgId = UUID.randomUUID().toString();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(msgId);
// メッセージを作成します。
Message message = new Message(content.getBytes(StandardCharsets.UTF_8), messageProperties);
/*
* send() インターフェイスを呼び出してメッセージを送信します。
* exchange: exchange の名前。
* routingKey: ルーティングキー。
* message: メッセージの内容。
* correlationData はパブリッシャー確認に使用されます。
*/
rabbitTemplate.send(exchange, routingKey, message, null);
}
}メッセージの消費
@RabbitListener アノテーションを使用してメッセージを消費します。
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Arrays;
@Component
public class MessageListener {
/**
* メッセージを受信します。
* @param message メッセージ。
* @param channel チャンネル。
* @throws IOException
* queues を作成したキューの名前に置き換えます。
*/
@RabbitListener(queues = "myQueue")
public void receiveFromMyQueue(Message message, Channel channel) throws IOException {
// メッセージ消費のビジネスロジックを入力します。
...
// Ack 有効期間 (消費タイムアウト) 内に Ack を返す必要があります。 そうしないと、確認が無効になり、メッセージが再配信されます。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}以下は、RabbitListener の一般的なオプション構成です。