针对初次接触消息队列 RocketMQ 版的工程师,本文以 TCP 协议下的 Java 为例,提供操作示例帮助您从零开始搭建消息队列 RocketMQ 版测试工程。Demo 工程包含普通消息、顺序消息、事务消息、定时和延时消息的测试代码,以及相关 Spring 的配置。

前提条件

  • 安装 IDE。

    您可以使用 IntelliJ IDEA 或者 Eclipse,本文以 IntelliJ IDEA 为例。

    请下载 IntelliJ IDEA Ultimate 版本,并参考 IntelliJ IDEA 说明进行安装。详情请访问下载地址

  • 下载 Demo 工程。

    详情请访问 rocketmq-demo。下载到本地并解压后即可看到本地新增了 rocketmq-demo-master 文件夹,该文件夹包括纯 Java、Spring 以及 Spring Boot 的示例代码。

  • 下载安装 JDK。详情请访问 JDK 下载地址

配置 Demo 工程

  1. 将 Demo 工程文件导入 IntelliJ IDEA。
  2. 创建资源。

    您需要先到控制台创建所需资源,包括消息队列 RocketMQ 版的实例、Topic、Group ID(GID),以及鉴权需要的 AccessKey(AK)。

    更多详细信息和操作指导,请参见创建资源

  3. 配置 Demo。
    您需将在步骤 2 中创建好的资源信息配置到 2 个文件: MqConfig 类和 common.xml
    1. 按以下说明配置 MqConfig 类。
      public static final String TOPIC = "您刚创建的 Topic";
      public static final String GROUP_ID = "您刚创建的 Group ID";
      public static final String ORDER_TOPIC = "您刚创建的用于收发顺序消息的 Topic";
      public static final String ORDER_GROUP_ID = "您刚创建的用于收发顺序消息的 Group ID";
      public static final String ACCESS_KEY = "您的阿里云账号的 AccessKey ID";
      public static final String SECRET_KEY = "您的阿里云账号的 AccessKey Secret";
      public static final String TAG = "您自定义的消息 Tag 属性";
      public static final String NAMESRV_ADDR = "您刚创建的消息队列 RocketMQ 版实例的 TCP 接入点,可在消息队列 RocketMQ 版控制台实例详情页面获取 TCP 协议客户端接入点";                              
      说明
      • 创建 AccessKey(包括 AccessKey ID 和 AccessKey Secret)的具体步骤,请参见创建 AccessKey
      • 如果 RAM 子账号拥有该 Topic 的权限以及自己的 AccessKey,那么也可以使用 RAM 子账号的 AccessKey。
      • 参数与接口的更多信息,请参见接口和参数说明
    2. 配置 common.xml
       <props>
       <prop key="AccessKey">XXX</prop> <!-- 使用前请修改这些资源信息 -->
       <prop key="SecretKey">XXX</prop>
       <prop key="GROUP_ID">XXX</prop>
       <prop key="Topic">XXX</prop>
       <prop key="NAMESRV_ADDR">XXX</prop>
      </props>

以 Main 方式运行 Demo

  1. 发送消息。
    • 发送普通消息:
      • 以纯 Java 方式发送普通消息:运行 SimpleMQProducer 类。
      • 以 Spring 方式发送普通消息:运行 ProducerClient 类。
      • 以 Spring Boot 方式发送普通消息:运行 ProducerClient 类。
    • 发送事务消息:
      • 以纯 Java 方式发送事务消息:运行 SimpleTransactionProducer 类。

        LocalTransactionCheckerImpl 类为本地事务 check 接口类,用于校验事务。详情请参见收发事务消息

      • 以 Spring 方式发送事务消息:运行 TransactionProducerClient 类。
      • 以 Spring Boot 方式发送事务消息:运行 TransactionProducerClient 类。
      运行 SimpleTransactionProducer 类。
    • 发送顺序消息:
      • 以纯 Java 方式发送顺序消息:运行 SimpleOrderProducer 类。
      • 以 Spring 方式发送顺序消息:运行 OrderProducerClient 类。
      • 以 Spring Boot 方式发送顺序消息:运行 OrderProducerClient 类。

      此方式下,消息发布和消费都按顺序进行。详情请参见收发顺序消息

    • 发送定时和延时消息:运行 MQTimerProducer 类发送消息。延时 3 秒后投递。

      您也可以指定一个精确的投递时间,最长定时时间为 40 天。详情请参见收发定时消息

    消息队列 RocketMQ 版控制台,按 Topic 查询消息,可以看到消息已经发送至 Topic。
  2. 接收消息。
    • 接收普通消息:
      • 以纯 Java 方式接收普通消息:运行 SimpleMQConsumer 类。
      • 以 Spring 方式接收普通消息:运行 ConsumerClient 类。
      • 以 Spring Boot 方式接收普通消息:运行 ConsumerClient 类。
    • 接收事务消息:
      • 以纯 Java 方式接收事务消息:运行 SimpleMQConsumer 类。
      • 以 Spring 方式接收事务消息:运行 ConsumerClient 类。
      • 以 Spring Boot 方式接收事务消息:运行 ConsumerClient 类。
    • 接收顺序消息:
      • 以纯 Java 方式接收顺序消息:运行 SimpleOrderConsumer 类。
      • 以 Spring 方式接收顺序消息:运行 OrderConsumerClient 类。
      • 以 Spring Boot 方式接收顺序消息:运行 OrderConsumerClient 类。
    • 接收定时和延时消息:运行 SimpleMQConsumer 类。
      说明 Spring 和 Spring Boot 框架暂不支持收发定时和延时消息。
    可以看到消息被接收打印的日志。因为有初始化,所以需等待几秒,在生产环境中不会经常初始化。

结果验证:在消息队列 RocketMQ 版控制台查看消费者状态,可以看到启动的消费端已经在线,并且订阅关系一致。

更多信息