本文记录通过SDK收发消息时常见的问题。

开源客户端是否可以直接访问云上服务?

消息队列RabbitMQ版完全兼容开源RabbitMQ。开源RabbitMQ可以直接访问云上服务。您需要通过消息队列RabbitMQ版控制台生成静态用户名密码之后,通过静态账户直接访问云上服务。如何创建静态用户名密码,请参见创建静态用户名密码

支持哪些语言的开源SDK?

开源RabbitMQ提供的多语言或框架SDK消息队列RabbitMQ版全部都支持。具体信息,请参见表 1

是否可以直接使用开源RabbitMQ JMS Client?

消息队列RabbitMQ版不支持直接使用开源RabbitMQ JMS Client。您需要通过Maven配置依赖库才能接入消息队列RabbitMQ版收发消息。如何通过Maven配置依赖库来收发消息,请参见JMS概述

JMS标准有哪些接口不支持?

消息队列RabbitMQ版支持JMS 1.1。具体兼容的接口信息,请参见JMS接口兼容性

如果是自动ACK,是否支持通过Reject来触发消息重新入队列?

不可以。您可以使用basicReject方法否定应答单条消息,或者使用basicNack方法否定应答一条或多条消息,并使消息重入队列。

参数 说明
deliveryTag Channel的消息投递的唯一标识符。
requeue 被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;如果设置为false,则消息被丢弃或发送到死信Exchange。更多信息,请参见死信Exchange。
示例代码
channel.basicConsume("test", false, "consumertag", new DefaultConsumer(channel) {
   @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                      AMQP.BasicProperties properties, byte[] body)
            throws IOException {
        System.out.println("Rejected: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
        channel.basicReject(envelope.getDeliveryTag(), true);
    }
});
参数 说明
deliveryTag Channel的消息投递的唯一标识符。
multiple 是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息;如果设置为false,则仅否定应答带指定deliveryTag的单条消息。
requeue 被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;如果设置为false,则消息被丢弃或发送到死信Exchange。更多信息,请参见死信Exchange。
示例代码
channel.basicConsume("test", false, "consumertag", new DefaultConsumer(channel) {
   @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                      AMQP.BasicProperties properties, byte[] body)
            throws IOException {
        System.out.println("Rejected: " + new String(body, "UTF-8") + ", deliveryTag: " + envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
        channel.basicNack(envelope.getDeliveryTag(), true, true);
    }
});

如何设置Message ID?

如果您需追踪和识别消息,可以在消息队列RabbitMQ版的Producer客户端设置Message ID属性,为每条消息设置唯一标识符。

消息队列RabbitMQ版的Producer客户端设置Basic.Propertiesmessage-id属性。示例代码如下:

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId("messageid").build(); channel.basicPublish("${ExchangeName}", "BindingKey", true, props, ("消息发送Body").getBytes(StandardCharsets.UTF_8));

properties = pika.BasicProperties(app_id='example-publisher', content_type='application/json', 'message_id'='messageid')

$msg = new AMQPMessage($msgBody, ['application_headers'=>$amqpTable,'content_type' => 'text/plain', 'delivery_mode' => 2,'message_id' => 'messageid',]);

err = ch.Publish( "helloExchange", "hello", false, false, amqp.Publishing { ContentType: "text/plain", Body: []byte(body), MessageId: "messageId", })

channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);

如何删除队列消息?

您可以使用Java客户端库中queuePurge方法删除某个队列的所有消息。示例代码如下:

channel.queuePurge("queue-name");