如果您希望消息被投递后延迟一段时间被消费者消费,您可以使用消息队列RabbitMQ版的延时消息。消息队列RabbitMQ版原生支持延时消息,使用方式比开源RabbitMQ更简单。

什么是延时消息

延时消息是指在指定时间段之后才被消费者消费的消息。

应用场景

延时消息适用于以下场景:

  • 对消息生产和消费有时间窗口要求的场景。例如,在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。
  • 通过消息触发延时任务的场景。例如,在指定时间段之后向用户发送提醒消息。

使用限制

  • 延时时间的值必须为非负整数。单位为毫秒。
  • 延时时间的最大值为86,400,000,即1天。若延时时间超过最大值,则当作普通消息处理。

方案对比

消息队列RabbitMQ版和开源RabbitMQ支持的延时消息实现方案对比如下:

项目 开源RabbitMQ 消息队列RabbitMQ版
死信Exchange+Queue的消息存活时间 ✔️ ✔️
死信Exchange+消息的消息存活时间 ✔️ ✔️
开源延时消息插件方案 ✔️ ✔️
原生延时消息方案(推荐) ✔️

原生延时消息方案

消息队列RabbitMQ版通过对消息设置delay来实现延时效果。消息队列RabbitMQ版原生延时消息的流转过程如下:
  1. 生产者向Exchange发布设置了delay的消息。
  2. Exchange将消息路由至Queue。
  3. 在设置的delay时间到期后,消费者才能从Queue消费消息。
注意 消息队列RabbitMQ版原生延时消息,则不要为Queue设置消息存活时间。如果您为Queue设置消息存活时间,则会导致延时消息失效。

原生延时消息最佳实践

  • 生产者客户端

    消息队列RabbitMQ版原生延时消息的使用方式非常简单。您只需要在生产者客户端发布消息时,通过delay为消息设置一个延时时间。

    发布延时消息的Java示例代码如下:

    Map<String, Object> headers = new HashMap<>();
    headers.put("delay", "xx");
    AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).headers(headers).build();

    更多语言示例代码,请参见AMQP Demos

  • 消费者客户端

    为保证延时消息时效性,建议您在消费消息时使用push模式的basic.consume方法,而不要使用pull模式的basic.get方法。因为消息队列RabbitMQ版的消息是分布式存储的,如果您使用pull模式的basic.get方法获取消息,并不能保证正好从存储的节点获取消息。

开源延时消息插件方案

为了减少与开源RabbitMQ的差别,消息队列RabbitMQ版也基于原生的延时消息支持使用开源插件式的方式来使用延时消息,并免去插件的安装。具体使用流程如下:

  1. 声明x-delayed-message类型的Exchange,并填写该Exchange的扩展参数x-delayed-type以指定Exchange的路由类型。示例如下:
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-delayed-type", "direct");
    channel.exchangeDeclare("ExchangeName", "x-delayed-message", true, false, args);
    参数说明如下:
    参数 说明
    x-delayed-type Exchange的类型,指定路由规则。取值说明如下:
    • direct
    • fanout
    • topic
    • headers
    • x-jms-queue
    • x-jms-topic
    ExchangeName Exchange的名称。
    说明 请确保声明的Exchange已存在。具体步骤,请参见创建Exchange
    x-delayed-message 指定Exchange类型,以支持投递延时消息。
  2. 发送延时消息。在消息的Header属性中增加一个键为x-delay,值为毫秒数的键值对,并且指定发送的目标Exchange为上一步已声明的Exchange。示例如下:
    byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
    Map<String, Object> headers = new HashMap<String, Object>();
    headers.put("x-delay", 5000);
    AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
    channel.basicPublish("ExchangeName", "", props.build(), messageBodyBytes);

    该示例中,消息到达Exchange后,会在5000毫秒后投递到对应的Queue。

常见问题

为什么实际的延时时间大于设置的延时时间?

因为客户端使用了pull模式的basic.get方法消费消息。消息队列RabbitMQ版的消息是集群存储的,使用pull模式的basic.get方法路由到一台Broker时,可能无法及时拉取存储在其他Broker上的消息。