このトピックでは、オープンソースのソフトウェア開発キット (SDK) を使用して ApsaraMQ for RabbitMQ ブローカーに接続し、メッセージを送受信する方法について説明します。例として Java SDK を使用します。
開始する前に
IntelliJ IDEA または Eclipse を使用できます。このトピックでは、IntelliJ IDEA Ultimate を例として使用します。
インスタンスのエンドポイントの取得
メッセージを送受信する前に、プロデューサーとコンシューマーのエンドポイントを設定する必要があります。クライアントはこのエンドポイントを使用して ApsaraMQ for RabbitMQ インスタンスにアクセスします。
ApsaraMQ for RabbitMQ コンソールにログインします。左側のナビゲーションウィンドウで、インスタンスリスト をクリックします。
インスタンスリスト ページのトップナビゲーションバーで、管理するインスタンスが存在するリージョンを選択します。次に、インスタンスリストで、管理するインスタンスの名前をクリックします。
インスタンス詳細 ページで、アクセスポイント情報 タブに移動します。目的のエンドポイントにマウスポインターを移動し、
アイコンをクリックしてコピーします。タイプ
説明
例
パブリックエンドポイント
インターネットからデータを読み書きできます。従量課金インスタンスは、デフォルトでパブリックエンドポイントをサポートします。サブスクリプションインスタンスのパブリックエンドポイントを使用するには、インスタンスの購入時にインターネットアクセスを有効にする必要があります。
XXX.net.mq.amqp.aliyuncs.com
VPC エンドポイント
VPC 内でデータを読み書きできます。従量課金インスタンスとサブスクリプションインスタンスは、デフォルトで VPC エンドポイントをサポートします。
XXX.vpc.mq.amqp.aliyuncs.com
Java 依存関係ライブラリのインストール
IntelliJ IDEA で Java プロジェクトを作成します。
次の依存関係を pom.xml ファイルに追加します。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.5.0</version> <!-- すべてのオープンソースバージョンがサポートされています。 --> </dependency>
ユーザー名とパスワードの作成
オープンソースの RabbitMQ クライアントをクラウドサービスに接続するときは、ユーザー名とパスワードを作成する必要があります。次に、クライアント SDK の userName および passWord パラメーターを設定する必要があります。ApsaraMQ for RabbitMQ は、このユーザー名とパスワードを権限認証に使用します。
ユーザー名とパスワードを作成する方法は、インスタンスの ID と権限管理モードによって異なります。
オープンソースの ID 検証と権限管理
ApsaraMQ for RabbitMQ コンソールにログインします。左側のナビゲーションウィンドウで、インスタンスリスト をクリックします。
インスタンスリスト ページのトップナビゲーションバーで、管理するインスタンスが存在するリージョンを選択します。次に、インスタンスリストで、管理するインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、ユーザーと権限の管理 をクリックします。
ユーザーと権限の管理 ページで、ユーザ名とパスワードの作成 をクリックします。
ユーザ名とパスワードの作成 パネルで、ユーザ名、パスワード、および パスワードの確認 フィールドに入力し、[OK] をクリックします。
説明ユーザー名とパスワードを作成した後、ユーザーに権限を付与する必要があります。詳細については、「権限管理」をご参照ください。
Alibaba Cloud Resource Access Management (RAM)
ApsaraMQ for RabbitMQ コンソールにログインします。左側のナビゲーションウィンドウで、インスタンスリスト をクリックします。
インスタンスリスト ページのトップナビゲーションバーで、管理するインスタンスが存在するリージョンを選択します。次に、インスタンスリストで、管理するインスタンスの名前をクリックします。
左側のナビゲーションウィンドウで、ユーザーと権限の管理 をクリックします。
ユーザーと権限の管理 ページで、ユーザ名とパスワードの作成 をクリックします。
ユーザ名とパスワードの作成 パネルで、[AccessKey ID] と [AccessKey Secret] を入力し、[OK] をクリックします。
説明[AccessKey ID] と [AccessKey Secret] は、Alibaba Cloud RAM コンソールから取得できます。詳細については、「AccessKey ペアの作成」をご参照ください。
作成された静的なユーザー名とパスワードのペアが ユーザーと権限の管理 ページに表示されます。パスワードはマスクされています。

作成された静的なユーザー名とパスワードの パスワード 列で、パスワードの表示 をクリックしてパスワードを表示します。
クライアント接続の作成
オープンソースの RabbitMQ クライアントと ApsaraMQ for RabbitMQ ブローカー間の接続を確立するために、ConnectionFactory.java 接続ファクトリを作成します。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
public class ConnectionFactory {
private final String hostName;
private final int port;
private final String userName;
private final String password;
private final String virtualHost;
private final boolean enableSSL;
public ConnectionFactory(String hostName, int port, String userName,
String password, String virtualHost, boolean enableSSL) {
this.hostName = hostName;
this.port = port;
this.userName = userName;
this.password = password;
this.virtualHost = virtualHost;
this.enableSSL = enableSSL;
}
public Channel createChannel() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
// 新しい接続を作成します
Connection con = createCon();
// 新しいチャネルを作成します
return con.createChannel();
}
private Connection createCon() throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
factory.setHost(hostName);
factory.setUsername(userName);
factory.setPassword(password);
// 自動接続回復を有効にするかどうかを指定します。このパラメーターを true に設定すると、自動接続回復が有効になります。このパラメーターを false に設定すると、自動接続回復は無効になります。
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(5000);
factory.setVirtualHost(virtualHost);
// デフォルトのポート。
factory.setPort(port);
if (enableSSL) {
setSSL(factory);
}
// タイムアウト期間。ネットワーク環境に基づいてこのパラメーターを設定します。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
return factory.newConnection();
}
private void setSSL(com.rabbitmq.client.ConnectionFactory factory) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init((KeyStore) null);
sslContext.init(null, trustManagerFactory.getTrustManagers(), null);
factory.useSslProtocol(sslContext);
}
public void closeCon(Channel channel) {
if (channel != null && channel.getConnection() != null) {
try {
channel.getConnection().close();
} catch (Throwable t) {
}
}
}
}メッセージの生成
Java プロジェクトで、Producer.java という名前のプロデューサープログラムを作成します。SDK パラメーターの説明 に記載されている関連パラメーターを設定し、プログラムを実行します。メッセージ送信時の注意事項の詳細については、「メッセージを生成する際に知っておくべきことは何ですか?」をご参照ください。
サンプルコード:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
public class Producer {
// ApsaraMQ for RabbitMQ インスタンスのエンドポイント。
public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
// ApsaraMQ for RabbitMQ インスタンスの静的なユーザー名。
public static final String userName = "MjoxODgwNzcwODY5MD****";
// ApsaraMQ for RabbitMQ インスタンスの静的なパスワード。
public static final String password = "NDAxREVDQzI2MjA0OT****";
// ApsaraMQ for RabbitMQ インスタンスの vhost の名前。
public static final String virtualHost = "vhost_test";
// ポート 5671 を使用する場合は、enableSSL パラメーターを true に設定します。
public static final int port = 5672;
public static final boolean enableSSL = false;
private Channel channel;
private final ConcurrentNavigableMap<Long/*deliveryTag*/, String/*msgId*/> outstandingConfirms;
private final ConnectionFactory factory;
private final String exchangeName;
private final String queueName;
private final String routingKey;
public Producer(ConnectionFactory factory, String exchangeName, String queueName, String routingKey) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
this.factory = factory;
this.outstandingConfirms = new ConcurrentSkipListMap<>();
this.channel = factory.createChannel();
this.exchangeName = exchangeName;
this.queueName = queueName;
this.routingKey = routingKey;
}
public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
// 接続ファクトリを作成します。
ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);
// プロデューサーを初期化します。
Producer producer = new Producer(factory, "ExchangeTest", "QueueTest", "RoutingKeyTest");
// プロデューサーを宣言します。
producer.declare();
producer.initChannel();
// メッセージを送信します。
producer.doSend("hello,amqp");
}
private void initChannel() throws IOException {
channel.confirmSelect();
ConfirmCallback cleanOutstandingConfirms = (deliveryTag, multiple) -> {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
for (Long tag : confirmed.keySet()) {
String msgId = confirmed.get(tag);
System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, tag, true);
}
confirmed.clear();
} else {
String msgId = outstandingConfirms.remove(deliveryTag);
System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, false);
}
};
channel.addConfirmListener(cleanOutstandingConfirms, (deliveryTag, multiple) -> {
String msgId = outstandingConfirms.get(deliveryTag);
System.err.format("Message with msgId %s has been nack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, multiple);
// メッセージの送信に失敗しました。再発行します
});
channel.addReturnListener(returnMessage -> System.out.println("return msgId=" + returnMessage.getProperties().getMessageId()));
}
private void declare() throws IOException {
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
}
private void doSend(String content) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
try {
String msgId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(msgId).build();
channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8));
outstandingConfirms.put(channel.getNextPublishSeqNo(), msgId);
} catch (AlreadyClosedException e) {
// チャネルが閉じている場合は再接続が必要です。
String message = e.getMessage();
System.out.println(message);
if (channelClosedByServer(message)) {
factory.closeCon(channel);
channel = factory.createChannel();
this.initChannel();
doSend(content);
} else {
throw e;
}
}
}
private boolean channelClosedByServer(String errorMsg) {
if (errorMsg != null
&& errorMsg.contains("channel.close")
&& errorMsg.contains("reply-code=541")
&& errorMsg.contains("reply-text=InternalError")) {
return true;
} else {
return false;
}
}
}ApsaraMQ for RabbitMQ インスタンスは、ピーク時の秒間トランザクション数 (TPS) に基づいてスロットリングをトリガーする場合があります。詳細については、「インスタンススロットリングのベストプラクティス」をご参照ください。
メッセージのサブスクライブ
Java プロジェクトで、Consumer.java という名前のコンシューマープログラムを作成します。SDK パラメーターの説明 に記載されている関連パラメーターを設定し、プログラムを実行します。
サンプルコード:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
public class Consumer {
// ApsaraMQ for RabbitMQ インスタンスのエンドポイント。
public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
// ApsaraMQ for RabbitMQ インスタンスの静的なユーザー名。
public static final String userName = "MjoxODgwNzcwODY5MD****";
// ApsaraMQ for RabbitMQ インスタンスの静的なパスワード。
public static final String password = "NDAxREVDQzI2MjA0OT****";
// ApsaraMQ for RabbitMQ インスタンスの vhost の名前。
public static final String virtualHost = "vhost_test";
// ポート 5671 を使用する場合は、enableSSL パラメーターを true に設定します。
public static final int port = 5672;
public static final boolean enableSSL = false;
private final Channel channel;
private final String queue;
public Consumer(Channel channel, String queue) {
this.channel = channel;
this.queue = queue;
}
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);
Channel channel = factory.createChannel();
channel.basicQos(50);
// ApsaraMQ for RabbitMQ インスタンス上のキューの名前。キュー名は、プロデューサーに指定したキュー名と同じである必要があります。
Consumer consumer = new Consumer(channel, "queue-1");
consumer.consume();
}
public void consume() throws IOException, InterruptedException {
channel.basicConsume(queue, false, new DefaultConsumer(channel) {
@Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
// メッセージを処理します。
System.out.println("receive: msgId=" + properties.getMessageId());
// コンシューマーは有効期間内に確認応答 (ack) をコミットする必要があります。そうしないと、メッセージは再度プッシュされます。メッセージは最大 16 回プッシュできます。
// 16 回試行してもメッセージのプッシュに失敗した場合、メッセージは破棄されるか、デッドレター交換に送信されます。
// 有効期間は、プロフェッショナル版インスタンスでは 1 分、エンタープライズ版およびサーバーレス版インスタンスでは 5 分、プラチナ版インスタンスでは 30 分です。
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
channel.getConnection().close();
} catch (IOException e) {
System.out.println("close connection error." + e);
}
latch.countDown();
}));
latch.await();
}
}SDK パラメーターの説明
パラメーター | 例 | 説明 |
hostName | XXX.net.mq.amqp.aliyuncs.com | ApsaraMQ for RabbitMQ インスタンスのエンドポイント。エンドポイントの取得方法の詳細については、「インスタンスのエンドポイントの取得」をご参照ください。 |
Port | 5672 | デフォルトのポート。暗号化されていない接続にはポート 5672 を、暗号化された接続にはポート 5671 を使用します。 |
userName | MjoxODgwNzcwODY5MD**** | クライアントを ApsaraMQ for RabbitMQ ブローカーに接続する際に権限認証に使用される静的なユーザー名です。 事前に ApsaraMQ for RabbitMQ コンソール で静的なユーザー名を作成する必要があります。 詳細については、「ユーザー名とパスワードの作成」をご参照ください。 |
passWord | NDAxREVDQzI2MjA0OT**** | クライアントを ApsaraMQ for RabbitMQ ブローカーに接続する際に権限認証に使用される静的なパスワードです。 事前に ApsaraMQ for RabbitMQ コンソール で静的なパスワードを作成する必要があります。 詳細については、「ユーザー名とパスワードの作成」をご参照ください。 |
virtualHost | amqp_vhost | ApsaraMQ for RabbitMQ インスタンスで作成した vhost。事前に ApsaraMQ for RabbitMQ コンソール で vhost を作成する必要があります。 詳細については、「ステップ 2: リソースの作成」をご参照ください。 |
exchangeName | ExchangeTest | ApsaraMQ for RabbitMQ インスタンスで作成した exchange。 事前に ApsaraMQ for RabbitMQ コンソール で exchange を作成する必要があります。 詳細については、「ステップ 2: リソースの作成」をご参照ください。 |
queueName | QueueTest | ApsaraMQ for RabbitMQ インスタンスで作成したキュー。 事前に ApsaraMQ for RabbitMQ コンソール でキューを作成する必要があります。 詳細については、「ステップ 2: リソースの作成」をご参照ください。 |
routingKey | RoutingKeyTest | ApsaraMQ for RabbitMQ で exchange をキューにバインドするために使用されるルーティングキー。 事前に ApsaraMQ for RabbitMQ コンソール でバインディングを作成する必要があります。 詳細については、「ステップ 2: リソースの作成」をご参照ください。 |
exchangeType | topic | exchange のタイプ。ApsaraMQ for RabbitMQ は、次のタイプの exchange をサポートしています。詳細については、「Exchanges」をご参照ください。
重要 指定した exchange のタイプが、exchange の作成時に選択した exchange のタイプと同じであることを確認してください。 |
参考資料
ApsaraMQ for RabbitMQ はオープンソースの RabbitMQ と完全に互換性があり、複数のプログラミング言語の SDK をサポートしています。他のプログラミング言語の SDK の詳細については、「AMQP をサポートするさまざまなプログラミング言語とフレームワーク用のオープンソース RabbitMQ SDK」をご参照ください。その他のパラメーターの詳細については、「オープンソース RabbitMQ クライアントドキュメント」をご参照ください。
クライアントの実行中にエラーが発生した場合、原因の特定と解決策については、「エラーコード」をご参照ください。
ApsaraMQ for RabbitMQ コンソールでメッセージまたはそのトレースをクエリして、メッセージの送受信ステータスを確認できます。詳細については、「メッセージのクエリ」および「メッセージトレース」をご参照ください。