このトピックでは、オープンソースの RabbitMQ SDK を使用して ApsaraMQ for RabbitMQ ブローカーに接続する場合の注意事項について説明します。
クライアントを使用して ApsaraMQ for RabbitMQ ブローカーに接続する場合、自動接続回復機能を設定する必要がありますか?
com.rabbitmq.client.ConnectionFactory を設定する場合は、ブローカーのアップグレード中にクライアントがブローカーに再接続できるように、自動接続回復を有効にする必要があります。有効にしないと、メッセージの読み取りと書き込みが中断されます。
// 自動接続回復を有効にするかどうかを指定します。このパラメータを true に設定すると、自動接続回復が有効になります。このパラメータを false に設定すると、自動接続回復は無効になります。
factory.setAutomaticRecoveryEnabled(true);
// 2 回の連続した自動回復の間隔を指定します。単位:ミリ秒。
factory.setNetworkRecoveryInterval(5000);メッセージ生成中にどのような注意事項を守る必要がありますか?
メッセージの生成または消費中に、接続を頻繁に有効または無効にしないでください。持続的接続を使用することをお勧めします。このようにして、クライアントは、メッセージを送受信するためにクライアントを使用するたびに接続を確立する必要がなくなります。接続の頻繁な確立は、多数のネットワークおよびブローカーリソースを消費し、ブローカーに対する SYN フラッド攻撃に対する保護をトリガーすることさえあります。詳細については、「接続」をご参照ください。
メッセージを送信する前に、パブリッシャーの確認を有効にするかどうかを決定します。パブリッシャーの確認を有効にすると、ブローカーはローカルメソッドを呼び出して、メッセージの受信を確認します。メッセージの受信は、クライアントがブローカーから返された publishAck を受信した場合にのみ確認されることに注意してください。クライアントがメッセージの送信または publishAck の受信に失敗した場合は、メッセージの損失を防ぐために、クライアントでメッセージのリトライを実装します。
説明パブリッシャーの確認を有効にすると、メッセージ送信モードが同期から非同期に変更され、メッセージ送信のレイテンシが増加する可能性があります。この場合、ブローカーは、メッセージがディスクに書き込まれたことを確認した後にのみ、メッセージに対して成功応答を返します。
次の例は、Java クライアントでパブリッシャーの確認を実装する方法を示しています。
チャネルを作成するときに、パブリッシャーの確認を有効にします。
// 接続とチャネルを作成します。 Connection connection = createConnection(hostName, userName, passWord, virtualHost); Channel channel = connection.createChannel(); // チャネルのパブリッシャーの確認を有効にします。 channel.confirmSelect();basicPublishメソッドを呼び出した後、ブローカーからの応答を待ってから、応答に基づいてビジネスロジックを実行します。// メッセージを送信します。このプロセスでは、既存のチャネルを再利用することをお勧めします。 channel.basicPublish(exchangeName, bindingKey, true, props, ("example body").getBytes(StandardCharsets.UTF_8)); // 確認のタイムアウト期間。単位:ミリ秒。この例では、値は 3000 に設定されています。これは、確認のタイムアウト期間が 3 秒であることを指定します。 if (channel.waitForConfirms(3000)) { // メッセージをディスクに書き込むためのロジック。 } else { // メッセージを再送信するためのロジック。 }
mandatoryパラメータが true に設定されているが、ルーティングの問題によりメッセージがキューに到達しない場合、ブローカーはクライアントによって追加されたReturnListener操作を呼び出します。// ルーティングエラーのコールバックをチャネルに追加します。 channel.addReturnListener(returnMessage -> System.out.println("return msgId=" + returnMessage.getProperties().getMessageId())); // メッセージを送信します。3 番目のパラメータを true に設定します。 channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8));メッセージを送信する場合は、各メッセージにカスタム ID を指定することを強くお勧めします。ID は、メッセージの一意の識別子です。ID を使用して、メッセージとそのトレースをクエリできます。また、ID を使用して問題のトラブルシューティングを行うこともできます。詳細については、「メッセージ ID を指定するにはどうすればよいですか?」をご参照ください。
メッセージの送信中に、
basicPublish操作によって返されるエラーのタイプに基づいて例外をスローするかどうかを決定する必要があります。ExchangeNotExistエラーなどのビジネス例外を示すエラーが返された場合は、例外をスローする必要があります。メッセージの送信中に速度制限が発生した場合は、サービスの継続性を確保するために、既存の接続を閉じて新しいチャネルを作成および初期化することをお勧めします。
次のサンプルコードは、ロジックの一部のみを示しています。完全なサンプルコードについては、「メッセージを生成する」をご参照ください。
private void doSend(String content) throws Exception { try { // メッセージ ID を指定します。 String msgId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(msgId).build(); channel.basicPublish(exchangeName, routingKey, true, props, content.getBytes(StandardCharsets.UTF_8)); // 確認のタイムアウト期間。単位:ミリ秒。この例では、値は 3000 に設定されています。これは、確認のタイムアウト期間が 3 秒であることを指定します。 if (channel.waitForConfirms(3000)) { // メッセージをディスクに書き込むためのロジック。 } else { // メッセージを再送信するためのロジック。 } } catch (Exception e) { // エラーメッセージを取得します。 String message = e.getMessage(); System.out.println(message); // チャネルが閉じているかどうかを確認します。 if (channelClosedByServer(message)) { // 既存のチャネルを閉じて新しいチャネルを作成します。 factory.closeCon(channel); channel = factory.createChannel(); // パブリッシャーの確認を有効にするなど、新しいチャネルを初期化します。 this.initChannel(); // メッセージを再送信します。 doSend(content); } else { // エラーがブローカーによって発生していない場合は、例外をスローするか、後続の処理ロジックを指定します。 throw e; } } } private boolean channelClosedByServer(String errorMsg) { if (errorMsg != null && errorMsg.contains("channel.close")) { return true; } else { return false; } }
メッセージ消費中にどのような注意事項を守る必要がありますか?
メッセージを消費する場合は、消費の偏りを防ぐ必要があります。詳細については、「接続とチャネル」トピックの「使用方法に関する注意事項」セクションをご参照ください。
コンシューマタグを指定する代わりに、ブローカーによって生成されたグローバルに一意のコンシューマタグを使用することをお勧めします。コンシューマタグを指定する場合は、タグが一意であることを確認してください。
メッセージを消費する場合は、
basic.basicQosメソッドを呼び出してサービス品質 ( QoS ) 値を指定し、クライアントにキャッシュできる未確認メッセージの数を制限できます。指定された QoS 値に達すると、ブローカーはクライアントへのメッセージのプッシュを停止します。クライアントが ACK をコミットした後、ブローカーは確認されたメッセージの数と同じ数のメッセージをクライアントにプッシュします。このようにして、ブローカーにキャッシュされた未確認メッセージの最大数は、QoS 値以下になります。チャネルまたはコンシューマの QoS 値を指定することもできます。channel.basicQos(100, true)は、チャネルで作成されたすべてのコンシューマが、ブローカーに合計 100 件の未確認メッセージをキャッシュできることを示します。channel.basicQos(100, false)またはchannel.basicQos(100)は、チャネルで作成された各コンシューマが、ブローカーに 100 件の未確認メッセージをキャッシュできることを示します。クライアントで QoS 値を指定しない場合は、ブローカーのデフォルトの QoS 値が使用されます。デフォルトでは、ブローカーは、各コンシューマがブローカーにキャッシュできる未確認メッセージの数を 100 に制限します。この設定は、channel.basicQos(100, false)と同等です。指定するカスタム QoS 値は 100 を超えることはできません。超えると、設定は有効にならず、デフォルト値が使用されます。消費能力が高くない場合は、小さい QoS 値を指定することをお勧めします。ブローカーに累積されたメッセージの数が指定された QoS 値に達すると、ブローカーはクライアントへのメッセージのプッシュを停止します。この場合、ブローカーが断続的にクライアントにメッセージをプッシュするのがわかります。コンシューマを再起動すると、ブローカーはメッセージのプッシュを復元します。この問題を解決するには、コンシューマの消費能力を向上させることをお勧めします。 autoACK モードでメッセージを消費する場合、basicQosは有効になりません。コンシューマが有効期間内にメッセージの ACK をコミットしない場合、消費リトライがトリガーされ、メッセージが再配信されます。失敗した各メッセージは最大 16 回リトライできます。メッセージが 16 回リトライされた後も消費に失敗した場合、メッセージは破棄されるか、デッドレター交換に送信されます。次の表に、さまざまなインスタンスエディションの消費タイムアウト期間を示します。
インスタンスリトライポリシーのパラメータ
インスタンスタイプ
サーバーレスインスタンス
サブスクリプションインスタンス
共有
排他的
Enterprise Edition
Enterprise Platinum Edition
プロビジョニングされた容量とエラスティックトラフィックによる支払い/メッセージリクエストによる支払い
プロビジョニングされた容量とエラスティックトラフィックによる支払い
消費タイムアウト期間
最大値:3 時間
デフォルト値:5 分
最大値:12 時間
デフォルト値:30 分
最大値:3 時間
デフォルト値:5 分
最大値:12 時間
デフォルト値:30 分
最大配信試行回数
最大値:16
デフォルト値:16
最大値:16
デフォルト値:16
最大値:16
デフォルト値:16
最大値:64
デフォルト値:16
basicConsumeメソッドと比較して、basicGetメソッドは、メッセージのプル効率と 1 秒あたりのトランザクション数 ( TPS ) の制限が低くなっています。本番環境では、メッセージの数が多い場合は、basicConsumeメソッドを使用してメッセージを消費することをお勧めします。queueDeclareやexchangeDeclareなどのメタデータ操作には、速度制限のしきい値が課せられます。メッセージの送信中に操作を呼び出すのではなく、ApsaraMQ for RabbitMQ コンソールでキューと交換を作成することをお勧めします。そうしないと、速度制限がトリガーされ、接続が閉じられる可能性があります。詳細については、「制限」をご参照ください。