クライアントは、ブローカーのアップグレード、ブローカーの再起動、またはネットワークのジッターが原因で、ブローカーから切断される可能性があります。このトピックでは、Java、Python、および PHP クライアントの自動接続回復を設定する方法について説明します。このトピックでは、サンプルコードも提供します。
原因
I/O 例外がスローされます。
ソケット読み取り操作がタイムアウトしました。
ブローカーのハートビートの欠落が検出されました。
解決策
Java
Java クライアント (amqp-client) 4.0.0 以降のバージョンでは、自動接続回復と自動トポロジー回復が自動的に有効になります。コードでパラメータを設定する必要はありません。
次のメソッドを使用して、自動接続回復と自動トポロジー回復を有効にすることができます。トポロジーの回復には、キュー、交換、バインディング、およびコンシューマーに対する操作が含まれます。
amqp-client | RabbitMQ-Spring | 説明 |
|
| 自動接続回復を有効にするかどうかを指定します。 |
|
| 2 回の連続する再試行の間隔を指定します。接続の回復に失敗した場合、クライアントは指定された間隔が経過した後に再接続を試みます。デフォルトの間隔は 5 秒です。 |
|
| 自動トポロジー回復を有効にするかどうかを指定します。トポロジーの回復には、キュー、交換、バインディング、およびコンシューマーに対する操作が含まれます。 |
Python
Pika は、オープンソースの RabbitMQ によって推奨される Python クライアントライブラリです。Pika では、自動接続回復を実装するためのパラメーターを設定することはできません。Python で自動接続回復を実装するには、コールバック関数を手動で記述する必要があります。
PHP
php-amqplib は、RabbitMQ などの Advanced Message Queuing Protocol(AMQP)と互換性のあるメッセージキューでメッセージを効率的にパブリッシュおよびコンシュームするために使用される PHP ライブラリです。 php-amqplib ライブラリでは、自動接続回復を実装するためのパラメータを設定することはできません。 PHP で自動接続回復を実装するには、手動でコードを作成する必要があります。
自動接続回復を設定する場合は、
php-amqplibライブラリのバージョンが 3.6.1 以降であることを確認してください。クライアントが AMQProxy を使用して RabbitMQ に接続されている場合、アイドル状態が原因でクライアントが切断されたときに、自動接続回復のコードは有効になりません。メッセージの送受信がまれなシナリオでは、アイドル状態が原因でクライアントの切断が発生する可能性があります。RabbitMQ のエンドポイントを使用してクライアントを RabbitMQ に接続することをお勧めします。
サンプルコード
Java
amqp-client
Connection および Topology の自動回復を有効にするクライアントのサンプルコードは次のとおりです。
ConnectionFactory factory = new ConnectionFactory();
// エンドポイント。インスタンスのエンドポイントは、ApsaraMQ for RabbitMQ コンソールの [インスタンスの詳細] ページで取得できます。
factory.setHost("xxx.xxx.aliyuncs.com");
// ${instanceId} を ApsaraMQ for RabbitMQ インスタンスの ID に置き換えます。インスタンス ID は、ApsaraMQ for RabbitMQ コンソールの [インスタンス] ページで取得できます。
factory.setCredentialsProvider(new AliyunCredentialsProvider("${instanceId}"));
// vhost 名。 ApsaraMQ for RabbitMQ コンソールで vhost が作成されていることを確認してください。
factory.setVirtualHost("${VhostName}");
// デフォルトポート。暗号化されていない接続にはポート 5672 を使用し、暗号化された接続にはポート 5671 を使用します。
factory.setPort(5672);
// タイムアウト期間。ネットワーク環境に基づいて値を設定します。
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
// 自動接続回復を有効にするかどうかを指定します。
factory.setAutomaticRecoveryEnabled(true);
// 再試行間隔。値を 10 秒に設定します。
factory.setNetworkRecoveryInterval(10000);
// 自動トポロジー回復を有効にするかどうかを指定します。
factory.setTopologyRecoveryEnabled(true);
Connection connection = factory.newConnection(); RabbitMQ-Spring
次のサンプルコードは、コンシューマークライアントで自動接続回復と自動トポロジー回復を有効にする方法の例を示しています。完全なサンプルコードについては、SprintBootDemo を参照してください。
// RabbitMQ クライアントに接続するための ConnectionFactory オブジェクトを初期化します。
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
// 仮想ホスト。 ApsaraMQ for RabbitMQ コンソールで仮想ホストを作成するか、次のパラメータを指定して自動的に作成することができます。
connectionFactory.setVirtualHost(virtualHost);
// 自動再接続が有効になっていることを確認してください。これにより、ブローカーのリリース中にクライアントがブローカーに再接続できます。
connectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
connectionFactory.getRabbitConnectionFactory().setNetworkRecoveryInterval(10000);
connectionFactory.getRabbitConnectionFactory().setTopologyRecoveryEnabled(true)
// キャッシュモード。このパラメータを CONNECTION に設定することをお勧めします。
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
// CONNECTION モードでキャッシュできる接続の最大数。
connectionFactory.setConnectionCacheSize(10);
// CONNECTION モードでキャッシュできるチャネルの最大数。
connectionFactory.setChannelCacheSize(64);
return connectionFactory; Python
次のサンプルコードは、Pika を使用してコンシューマークライアントで自動接続回復を有効にする方法の例を示しています。
# -*- coding: utf-8 -*-
import logging
import time
import pika
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
class Consumer(object):
def __init__(self, amqp_url, queue):
self.should_reconnect = False
self._connection = None
self._channel = None
self._closing = False
self._url = amqp_url
self._queue = queue
def connect(self):
'''
接続を作成し、次のコールバックを設定します。
on_open_callback:接続が作成されたときに呼び出されるコールバック。
on_open_error_callback:接続の作成に失敗したときに呼び出されるコールバック。
on_close_callback:接続が閉じられたときに呼び出されるコールバック。
'''
return pika.SelectConnection(
parameters=pika.URLParameters(self._url),
on_open_callback=self.on_connection_open,
on_open_error_callback=self.on_connection_open_error,
on_close_callback=self.on_connection_closed)
def on_connection_open(self, _unused_connection):
'''
接続が作成されたときに呼び出されるコールバック。
チャネルを作成し、次のコールバックを設定します。
on_channel_open:チャネルが作成されたときに呼び出されるコールバック。
'''
self._connection.channel(on_open_callback=self.on_channel_open)
def on_connection_open_error(self, _unused_connection, err):
"""
接続の作成に失敗したときに呼び出されるコールバック。
エラーメッセージを出力し、接続を再作成します。
"""
LOGGER.error('Connection open failed: %s', err)
self.reconnect()
def on_connection_closed(self, _unused_connection, reason):
"""
接続が閉じられたときに呼び出されるコールバック。
次のシナリオが発生する可能性があります。
1.接続は予期どおりに閉じられ、クライアントは終了します。
2.クライアントは予期せず切断され、接続の再作成を試みます。
"""
self._channel = None
if self._closing:
self._connection.ioloop.stop()
else:
LOGGER.warning('Connection closed, reconnect necessary: %s', reason)
self.reconnect()
def close_connection(self):
"""
接続を閉じます。
"""
if self._connection.is_closing or self._connection.is_closed:
LOGGER.info('Connection is closing or already closed')
else:
LOGGER.info('Closing connection')
self._connection.close()
def reconnect(self):
"""
self.should_reconnect パラメーターを True に設定し、I/O ループを停止します。
"""
self.should_reconnect = True
self.stop()
def on_channel_open(self, channel):
"""
チャネルが作成されたときに呼び出されるコールバック。
コールバックを設定します。
on_channel_closed:チャネルが閉じられたときに呼び出されるコールバック。
キューからのメッセージ消費を開始します。
"""
self._channel = channel
self._channel.add_on_close_callback(self.on_channel_closed)
self.start_consuming()
def on_channel_closed(self, channel, reason):
"""
チャネルが閉じられたときに呼び出されるコールバック。
チャネル情報を出力し、接続を閉じます。
"""
LOGGER.warning('Channel %i was closed: %s', channel, reason)
self.close_connection()
def start_consuming(self):
"""
キューからのメッセージ消費を開始します。
"""
LOGGER.info('start consuming...')
self._channel.basic_consume(
self._queue, self.on_message)
def on_message(self, _unused_channel, basic_deliver, properties, body):
"""
メッセージを消費し、確認応答 (ACK) をアップロードします。
"""
LOGGER.info('Received message: %s', body.decode())
# 消費ロジック。
self._channel.basic_ack(basic_deliver.delivery_tag)
def run(self):
"""
接続を作成し、I/O ループを開始します。
"""
self._connection = self.connect()
self._connection.ioloop.start()
def stop(self):
"""
I/O ループを停止します。
"""
if not self._closing:
self._closing = True
self._connection.ioloop.stop()
LOGGER.info('Stopped')
class AutoRecoveryConsumer(object):
def __init__(self, amqp_url, queue):
self._amqp_url = amqp_url
self._queue = queue
self._consumer = Consumer(self._amqp_url, queue)
def run(self):
"""
KeyboardInterrupt 例外がスローされるまで、while True ループを実行します。
run() メソッドでは、I/O ループがリッスンするキューが開始され、メッセージが処理されます。ループにより、コンシューマーは継続的に実行され、ブローカーに自動的に再接続できます。
"""
while True:
try:
self._consumer.run()
except KeyboardInterrupt:
self._consumer.stop()
break
self._maybe_reconnect()
def _maybe_reconnect(self):
"""
再接続が必要かどうかを判断します。2 回の連続する再接続の間隔は 1 秒です。
"""
if self._consumer.should_reconnect:
self._consumer.stop()
time.sleep(1)
self._consumer = Consumer(self._amqp_url, self._queue)
def main():
username = 'MjoxODgwNzcwODY5MD****'
password = 'NDAxREVDQzI2MjA0OT****'
host = '1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com'
port = 5672
vhost = 'vhost_test'
# amqp_url: amqp://<username>:<password>@<host>:<port>/<vhost>
amqp_url = 'amqp://%s:%s@%s:%i/%s' % (username, password, host, port, vhost)
consumer = AutoRecoveryConsumer(amqp_url, 'QueueTest')
consumer.run()
if __name__ == '__main__':
main()PHP
次のサンプルコードは、php-amqplib を使用してコンシューマークライアントで自動接続回復を有効にする方法の例を示しています。
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
const ONE_SECOND = 1;
/**
* 接続を作成します。
*/
function connect() {
$host = '1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com';
$username = 'NDAxREVDQzI2MjA0OT****';
$password = 'NDAxREVDQzI2MjA0OT****';
$port = 5672;
$vhost = 'vhost_test';
return new AMQPStreamConnection($host, $port, $username, $password, $vhost);
}
/**
* 接続を解放します。
*/
function cleanup_connection($connection) {
try {
if($connection !== null) {
$connection->close();
}
} catch (\ErrorException $e) {
}
}
$connection = null;
while(true){
try {
$connection = connect();
start_consuming($connection);
} catch (AMQPConnectionClosedException $e) {
echo $e->getMessage() . PHP_EOL;
cleanup_connection($connection);
sleep(ONE_SECOND);
} catch(AMQPRuntimeException $e) {
echo $e->getMessage() . PHP_EOL;
cleanup_connection($connection);
sleep(ONE_SECOND);
} catch(\RuntimeException $e) {
echo 'ランタイム例外' . PHP_EOL;
cleanup_connection($connection);
sleep(ONE_SECOND);
} catch(\ErrorException $e) {
echo 'エラー例外' . PHP_EOL;
cleanup_connection($connection);
sleep(ONE_SECOND);
}
}
/**
* 消費を開始します。
* @param AMQPStreamConnection $connection
*/
function start_consuming($connection) {
$queue = 'queueTest';
$consumerTag = 'consumer';
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false);
$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
while ($channel->is_consuming()) {
$channel->wait();
}
}
/**
* メッセージを処理します。
* @param \PhpAmqpLib\Message\AMQPMessage $message
*/
function process_message($message)
{
// ビジネスロジックを処理します。
echo "\n--------\n";
echo $message->body;
echo "\n--------\n";
$message->ack();
}制限
接続障害の検出には、完了するまでに一定の時間がかかります。パブリッシャーの確定メカニズムを使用して、この期間に送信されたメッセージが失われないようにすることができます。
チャネル例外は、自動接続回復をトリガーできません。ほとんどの場合、チャネル例外はアプリケーションレベルの問題であり、アプリケーション所有者が処理する必要があります。
自動接続回復によって、チャネルが自動的に回復することはありません。
接続が切断されると、接続によって宣言された排他キューが削除され、関連データがクリアされます。接続が自動的に回復した後、排他キューからのメッセージ消費は失敗します。
接続によって宣言された各コンシューマーに一意のコンシューマタグがあることを確認してください。接続内で複数のコンシューマーが同じコンシューマタグを持っている場合、接続の自動回復中に回復できるコンシューマーは 1 つだけです。コンシューマーのコンシューマタグを指定しない場合、ブローカーは自動的に一意のコンシューマタグを割り当てます。