Klien dapat terputus dari broker karena pembaruan broker, restart broker, atau gangguan jaringan. Topik ini menjelaskan cara mengonfigurasi pemulihan koneksi otomatis untuk klien Java, Python, dan PHP, serta menyediakan contoh kode.
Penyebab
Sebuah pengecualian I/O dilemparkan.
Operasi pembacaan soket habis waktu.
Deteksi hilangnya denyut jantung broker.
Solusi
Java
Pada klien Java (amqp-client) versi 4.0.0 dan yang lebih baru, pemulihan koneksi otomatis dan pemulihan topologi otomatis diaktifkan secara default. Tidak diperlukan konfigurasi tambahan dalam kode.
Anda dapat menggunakan metode berikut untuk mengaktifkan pemulihan koneksi otomatis dan pemulihan topologi otomatis. Pemulihan topologi mencakup tindakan pada antrian, pertukaran, pengikatan, dan konsumen.
amqp-client | RabbitMQ-Spring | Deskripsi |
|
| Menentukan apakah akan mengaktifkan pemulihan koneksi otomatis. |
|
| Menentukan interval antara dua percobaan ulang berturut-turut. Jika pemulihan koneksi gagal, klien akan mencoba menyambung kembali setelah interval yang ditentukan berlalu. Interval default adalah 5 detik. |
|
| Menentukan apakah akan mengaktifkan pemulihan topologi otomatis. Pemulihan topologi mencakup tindakan pada antrian, pertukaran, pengikatan, dan konsumen. |
Python
Pika adalah pustaka klien Python yang direkomendasikan oleh RabbitMQ open source. Pika tidak mendukung konfigurasi parameter untuk pemulihan koneksi otomatis. Untuk mengimplementasikan pemulihan koneksi otomatis di Python, Anda harus menulis fungsi panggilan balik secara manual.
PHP
php-amqplib adalah pustaka PHP yang digunakan untuk menerbitkan dan mengonsumsi pesan secara efisien dalam antrian pesan yang kompatibel dengan Protokol Pengaturan Pesan Lanjutan (AMQP), seperti RabbitMQ. Pustaka php-amqplib tidak mendukung konfigurasi parameter untuk pemulihan koneksi otomatis. Untuk mengimplementasikan pemulihan koneksi otomatis di PHP, Anda harus menulis kode secara manual.
Saat mengonfigurasi pemulihan koneksi otomatis, pastikan bahwa versi pustaka
php-amqplibadalah 3.6.1 atau yang lebih baru.Jika klien terhubung ke RabbitMQ menggunakan AMQProxy, kode untuk pemulihan koneksi otomatis tidak akan berfungsi saat klien terputus karena idle. Dalam skenario di mana pesan jarang dikirim dan diterima, pemutusan klien mungkin terjadi karena idle. Kami merekomendasikan agar Anda menghubungkan klien ke RabbitMQ menggunakan titik akhir RabbitMQ.
Contoh kode
Java
amqp-client
Berikut adalah contoh kode yang menunjukkan cara mengaktifkan pemulihan koneksi otomatis dan pemulihan topologi otomatis pada klien konsumen:
ConnectionFactory factory = new ConnectionFactory();
// Titik akhir. Anda bisa mendapatkan titik akhir instance di halaman Detail Instance di konsol ApsaraMQ for RabbitMQ.
factory.setHost("xxx.xxx.aliyuncs.com");
// Ganti ${instanceId} dengan ID instance ApsaraMQ for RabbitMQ. Anda bisa mendapatkan ID instance di halaman Instances di konsol ApsaraMQ for RabbitMQ.
factory.setCredentialsProvider(new AliyunCredentialsProvider("${instanceId}"));
// Nama vhost. Pastikan vhost dibuat di konsol ApsaraMQ for RabbitMQ.
factory.setVirtualHost("${VhostName}");
// Port default. Gunakan port 5672 untuk koneksi non-terenkripsi dan port 5671 untuk koneksi terenkripsi.
factory.setPort(5672);
// Periode timeout. Atur nilai berdasarkan lingkungan jaringan.
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
// Tentukan apakah akan mengaktifkan pemulihan koneksi otomatis.
factory.setAutomaticRecoveryEnabled(true);
// Interval percobaan ulang. Atur nilainya menjadi 10 detik.
factory.setNetworkRecoveryInterval(10000);
// Tentukan apakah akan mengaktifkan pemulihan topologi otomatis.
factory.setTopologyRecoveryEnabled(true);
Connection connection = factory.newConnection(); RabbitMQ-Spring
Berikut adalah contoh kode yang menunjukkan cara mengaktifkan pemulihan koneksi otomatis dan pemulihan topologi otomatis pada klien konsumen. Untuk informasi lebih lanjut tentang kode lengkap, lihat SprintBootDemo.
// Inisialisasi objek ConnectionFactory untuk terhubung ke klien RabbitMQ.
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
// Host virtual. Anda dapat membuat host virtual di konsol ApsaraMQ for RabbitMQ atau tentukan parameter berikut untuk membuatnya secara otomatis.
connectionFactory.setVirtualHost(virtualHost);
// Pastikan pemulihan otomatis diaktifkan. Dengan cara ini, klien dapat menyambung kembali ke broker selama rilis broker.
connectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
connectionFactory.getRabbitConnectionFactory().setNetworkRecoveryInterval(10000);
connectionFactory.getRabbitConnectionFactory().setTopologyRecoveryEnabled(true)
// Mode cache. Kami merekomendasikan Anda mengatur parameter ini ke CONNECTION.
connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CONNECTION);
// Jumlah maksimum koneksi yang dapat disimpan dalam mode CONNECTION.
connectionFactory.setConnectionCacheSize(10);
// Jumlah maksimum saluran yang dapat disimpan dalam mode CONNECTION.
connectionFactory.setChannelCacheSize(64);
return connectionFactory; Python
Berikut adalah contoh kode yang menunjukkan cara mengaktifkan pemulihan koneksi otomatis pada klien konsumen menggunakan Pika:
# -*- coding: utf-8 -*-
import logging
import time
import pika
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
LOGGER = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
class Consumer(object):
def __init__(self, amqp_url, queue):
self.should_reconnect = False
self._connection = None
self._channel = None
self._closing = False
self._url = amqp_url
self._queue = queue
def connect(self):
'''
Buat koneksi dan konfigurasikan callback berikut:
on_open_callback: callback yang dipanggil saat koneksi dibuat.
on_open_error_callback: callback yang dipanggil saat koneksi gagal dibuat.
on_close_callback: callback yang dipanggil saat koneksi ditutup.
'''
return pika.SelectConnection(
parameters=pika.URLParameters(self._url),
on_open_callback=self.on_connection_open,
on_open_error_callback=self.on_connection_open_error,
on_close_callback=self.on_connection_closed)
def on_connection_open(self, _unused_connection):
'''
Callback yang dipanggil saat koneksi dibuat.
Buat saluran dan konfigurasikan callback berikut:
on_channel_open: callback yang dipanggil saat saluran dibuat.
'''
self._connection.channel(on_open_callback=self.on_channel_open)
def on_connection_open_error(self, _unused_connection, err):
"""
Callback yang dipanggil saat koneksi gagal dibuat.
Cetak pesan kesalahan dan buat koneksi ulang.
"""
LOGGER.error('Koneksi gagal dibuka: %s', err)
self.reconnect()
def on_connection_closed(self, _unused_connection, reason):
"""
Callback yang dipanggil saat koneksi ditutup.
Skenario berikut mungkin terjadi:
1. Koneksi ditutup sesuai harapan dan klien keluar.
2. Klien terputus secara tak terduga dan mencoba membuat koneksi ulang.
"""
self._channel = None
if self._closing:
self._connection.ioloop.stop()
else:
LOGGER.warning('Koneksi ditutup, rekonfigurasi diperlukan: %s', reason)
self.reconnect()
def close_connection(self):
"""
Tutup koneksi.
"""
if self._connection.is_closing or self._connection.is_closed:
LOGGER.info('Koneksi sedang ditutup atau sudah ditutup')
else:
LOGGER.info('Menutup koneksi')
self._connection.close()
def reconnect(self):
"""
Setel parameter self.should_reconnect ke True dan hentikan loop I/O.
"""
self.should_reconnect = True
self.stop()
def on_channel_open(self, channel):
"""
Callback yang dipanggil saat saluran dibuat.
Konfigurasikan callback.
on_channel_closed: callback yang dipanggil saat saluran ditutup.
Mulai konsumsi pesan dari antrian.
"""
self._channel = channel
self._channel.add_on_close_callback(self.on_channel_closed)
self.start_consuming()
def on_channel_closed(self, channel, reason):
"""
Callback yang dipanggil saat saluran ditutup.
Cetak informasi saluran dan tutup koneksi
"""
LOGGER.warning('Saluran %i ditutup: %s', channel, reason)
self.close_connection()
def start_consuming(self):
"""
Mulai konsumsi pesan dari antrian.
"""
LOGGER.info('mulai mengonsumsi...')
self._channel.basic_consume(
self._queue, self.on_message)
def on_message(self, _unused_channel, basic_deliver, properties, body):
"""
Konsumsi pesan dan unggah pengakuan (ACK).
"""
LOGGER.info('Pesan diterima: %s', body.decode())
# Logika konsumsi.
self._channel.basic_ack(basic_deliver.delivery_tag)
def run(self):
"""
Buat koneksi dan mulai loop I/O.
"""
self._connection = self.connect()
self._connection.ioloop.start()
def stop(self):
"""
Hentikan loop I/O.
"""
if not self._closing:
self._closing = True
self._connection.ioloop.stop()
LOGGER.info('Dihentikan')
class AutoRecoveryConsumer(object):
def __init__(self, amqp_url, queue):
self._amqp_url = amqp_url
self._queue = queue
self._consumer = Consumer(self._amqp_url, queue)
def run(self):
"""
Lakukan loop while True hingga pengecualian KeyboardInterrupt dilemparkan.
Di metode run(), antrian tempat loop I/O mendengarkan dimulai dan pesan diproses. Loop memastikan bahwa konsumen dapat terus berjalan dan secara otomatis menyambung kembali ke broker.
"""
while True:
try:
self._consumer.run()
except KeyboardInterrupt:
self._consumer.stop()
break
self._maybe_reconnect()
def _maybe_reconnect(self):
"""
Tentukan apakah rekonfigurasi diperlukan. Interval antara dua rekonfigurasi berturut-turut adalah 1 detik.
"""
if self._consumer.should_reconnect:
self._consumer.stop()
time.sleep(1)
self._consumer = Consumer(self._amqp_url, self._queue)
def main():
username = 'MjoxODgwNzcwODY5MD****'
password = 'NDAxREVDQzI2MjA0OT****'
host = '1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com'
port = 5672
vhost = 'vhost_test'
# amqp_url: amqp://<username>:<password>@<host>:<port>/<vhost>
amqp_url = 'amqp://%s:%s@%s:%i/%s' % (username, password, host, port, vhost)
consumer = AutoRecoveryConsumer(amqp_url, 'QueueTest')
consumer.run()
if __name__ == '__main__':
main()PHP
Berikut adalah contoh kode yang menunjukkan cara mengaktifkan pemulihan koneksi otomatis pada klien konsumen menggunakan php-amqplib:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
const ONE_SECOND = 1;
/**
* Buat koneksi.
*/
function connect() {
$host = '1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com';
$username = 'NDAxREVDQzI2MjA0OT****';
$password = 'NDAxREVDQzI2MjA0OT****';
$port = 5672;
$vhost = 'vhost_test';
return new AMQPStreamConnection($host, $port, $username, $password, $vhost);
}
/**
* Lepaskan koneksi.
*/
function cleanup_connection($connection) {
try {
if($connection !== null) {
$connection->close();
}
} catch (\ErrorException $e) {
}
}
$connection = null;
while(true){
try {
$connection = connect();
start_consuming($connection);
} catch (AMQPConnectionClosedException $e) {
echo $e->getMessage() . PHP_EOL;
cleanup_connection($connection);
sleep(ONE_SECOND);
} catch(AMQPRuntimeException $e) {
echo $e->getMessage() . PHP_EOL;
cleanup_connection($connection);
sleep(ONE_SECOND);
} catch(\RuntimeException $e) {
echo 'Pengecualian runtime ' . PHP_EOL;
cleanup_connection($connection);
sleep(ONE_SECOND);
} catch(\ErrorException $e) {
echo 'Pengecualian error ' . PHP_EOL;
cleanup_connection($connection);
sleep(ONE_SECOND);
}
}
/**
* Mulai konsumsi.
* @param AMQPStreamConnection $connection
*/
function start_consuming($connection) {
$queue = 'queueTest';
$consumerTag = 'consumer';
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false);
$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');
while ($channel->is_consuming()) {
$channel->wait();
}
}
/**
* Proses pesan.
* @param \PhpAmqpLib\Message\AMQPMessage $message
*/
function process_message($message)
{
// Proses logika bisnis.
echo "\n--------\n";
echo $message->body;
echo "\n--------\n";
$message->ack();
}Batasan
Pendeteksian kegagalan koneksi memerlukan waktu tertentu. Gunakan mekanisme Publisher Confirms untuk memastikan bahwa pesan yang dikirim selama periode ini tidak hilang.
Penyimpangan saluran tidak dapat memicu pemulihan koneksi otomatis. Penyimpangan saluran biasanya merupakan masalah tingkat aplikasi dan harus ditangani oleh pemilik aplikasi.
Pemulihan koneksi otomatis tidak menyebabkan saluran pulih secara otomatis.
Setelah koneksi terputus, antrian eksklusif yang dinyatakan oleh koneksi dihapus, dan data terkait dibersihkan. Setelah koneksi dipulihkan secara otomatis, konsumsi pesan dari antrian eksklusif gagal.
Pastikan setiap konsumen yang dinyatakan oleh koneksi memiliki tag konsumen yang unik. Jika beberapa konsumen memiliki tag konsumen yang sama dalam satu koneksi, hanya satu konsumen yang dapat dipulihkan selama pemulihan otomatis koneksi. Jika Anda tidak menentukan tag konsumen untuk konsumen, broker secara otomatis memberikan tag konsumen unik kepada konsumen tersebut.