All Products
Search
Document Center

EventBridge:Use event streams to route messages between ApsaraMQ for RabbitMQ instances

Last Updated:Dec 30, 2024

This topic describes how to use the event stream feature provided by EventBridge to route messages between ApsaraMQ for RabbitMQ instances.

Prerequisites

Background information

As a type of lightweight channel for processing end-to-end streaming data in real time, event streams allow you to filter and transform lightweight streaming data and synchronize data between data warehouses, between data processing programs, and between data analysis and data processing systems. You can use an event stream to route messages that are produced on a source ApsaraMQ for RabbitMQ instance to a destination ApsaraMQ for RabbitMQ instance without the need to define an event bus. For more information, see Overview.

Step 1: Create an event stream for the destination instance

Note

EventBridge allows you to create an event stream to route messages only between ApsaraMQ for RabbitMQ instances that are deployed in the same region.

  1. Log on to the EventBridge console.

  2. In the top navigation bar, select a region.

  3. In the left-side navigation pane, click Event Streams.

  4. On the Event Streams page, click Create Event Stream.

  5. On the Create Event Stream page, configure the Task Name and Description parameters and follow the on-screen instructions to configure other parameters. Then, click Save. The following section describes the parameters:

    • Task Creation

        1. In the Source step, set the Data Provider parameter to ApsaraMQ for RabbitMQ and follow the on-screen instructions to configure other parameters. Then, click Next Step. The following table describes the parameters.

          Parameter

          Description

          Example

          Region

          The region where the source ApsaraMQ for RabbitMQ instance resides.

          China (Hangzhou)

          ApsaraMQ for RabbitMQ Instance

          The source ApsaraMQ for RabbitMQ instance.

          amqp-cn-7pp2mwbc****

          Vhost

          The vhost of the source ApsaraMQ for RabbitMQ instance.

          test

          Queue

          The queue in which the messages that you want to route are stored.

          test

          Messages

          The maximum number of messages that can be sent in each function invocation. Requests are sent only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000.

          100

          Interval (Unit: Seconds)

          The time interval at which the function is invoked. The system sends the aggregated messages to Function Compute at the specified time interval. Valid values: 0 to 15. Unit: seconds. The value 0 specifies that messages are sent immediately after aggregation.

          3

        2. In the Filtering step, specify the rule used to filter events and click Next Step. You do not need to configure the Transformation step in message routing scenarios.

        3. In the Sink step, set the Service Type parameter to ApsaraMQ for RabbitMQ and follow the on-screen instructions to configure other parameters. Then, click Save. The following table describes the parameters.

        4. Parameter

          Description

          Example

          Instance ID

          The destination ApsaraMQ for RabbitMQ instance that you created.

          amqp-cn-zvp2pny6****

          vhost

          The vhost that you created.

          test

          Destination Type

          • Exchange: A producer sends a message to an exchange, and then the exchange routes the message to one or more queues.

          • Queue: Each message is sent to one or more queues.

          Queue

          Exchange

          The exchange to which messages are routed on the ApsaraMQ for RabbitMQ instance. This parameter is required only if you set the Destination Type parameter to Exchange.

          exchange

          Queue

          The queue to which messages are routed on the ApsaraMQ for RabbitMQ instance. This parameter is required only if you set the Destination Type parameter to Queue.

          queue

          Message Routing Key

          EventBridge extracts data from an event by using JSONPath and routes the specified content of the event to the event target. This parameter is required only if you set the Destination Type parameter to Exchange.

          Partial Event

          $.data.key

          Message Body

          EventBridge extracts data from an event by using JSONPath and routes the specified event content to the event target.

          Partial Event

          $.data.body

          MessageId

          EventBridge extracts data from an event by using JSONPath and routes the specified event content to the event target.

          Partial Event

          $.data.props.messageId

          Custom Property

          EventBridge extracts data from an event by using JSONPath and routes the specified event content to the event target.

          Partial Event

          $.data.props
    • Task Property

      Configure the retry policy and dead-letter queue for the event stream. For more information, see Retry policies and dead-letter queues.

  6. Go back to the Event Streams page, find the event stream that you created, and then click Enable in the Actions column.

  7. In the Note message that appears, click OK.

    Enabling an event stream requires 30 to 60 seconds to complete. You can view the progress in the Status column of the event stream on the Event Streams page.

Step 2: Use an SDK to send a message

  1. Obtain the endpoint. Before you send and receive messages, you must obtain the endpoint that is used to access the ApsaraMQ for RabbitMQ instance in the ApsaraMQ for RabbitMQ console and specify the endpoint for a publisher. This way, the publisher can use the endpoint to access the ApsaraMQ for RabbitMQ instance.

    1. Log on to the ApsaraMQ for RabbitMQ console.

    2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for RabbitMQ instance that you want to manage resides.

    3. On the Instances page, click the name of the instance that you want to manage.

    4. On the Endpoint Information tab of the Instance Details page, move the pointer over the type of endpoint that you want to use. Then, click the 复制 icon next to the endpoint to copy the endpoint.

      Type

      Description

      Example

      Public endpoint

      You can access an instance over the Internet to read and write data. By default, pay-as-you-go instances support public endpoints. To use the public endpoint of a subscription instance, you must select a public endpoint when you create the subscription instance.

      XXX.mq-amqp.cn-hangzhou-a.aliyuncs.com

      VPC endpoint

      You can access an instance over a VPC to read and write data. By default, pay-as-you-go and subscription instances support VPC endpoints.

      XXX.mq-amqp.cn-hangzhou-a-internal.aliyuncs.com

  2. Install Java dependencies. Add the following dependency to the pom.xml file:

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.5.0</version> <!-- All versions of open source RabbitMQ are supported. -->
    </dependency>
  3. Generate a pair of username and password.

    1. Log on to the ApsaraMQ for RabbitMQ console.

    2. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for RabbitMQ instance that you want to manage resides.

    3. On the Instances page, click the name of the instance that you want to manage.

    4. In the left-side navigation pane, click Static Accounts.

    5. On the Static Accounts page, click Create Username/Password.

    6. In the Create Username/Password panel, enter an AccessKey ID in the AccessKey ID field and an AccessKey secret in the AccessKey Secret field. Then, click OK.

      On the Static Accounts page, the created pair of static username and password appears. The password is masked.用户名密码

    7. In the Password column of the created pair of static username and password, click Display to view the password.

  4. Produce messages. Create, compile, and run Qava to produce messages.

    Important

    Before you compile and run ProducerTest.java to produce messages, you must configure the parameters that are described in the Parameters table.

    Table 3. Parameters

    Parameter

    Example

    Description

    hostName

    1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com

    The endpoint that is used to access the ApsaraMQ for RabbitMQ instance.

    Port

    5672

    The default port. Use port 5672 for non-encrypted connections and port 5671 for encrypted connections.

    userName

    MjoxODgwNzcwODY5MD****

    The static username that is generated in the ApsaraMQ for RabbitMQ console. ApsaraMQ for RabbitMQ encodes the AccessKey pair of your Alibaba Cloud account or a RAM user within the account and the ID of the ApsaraMQ for RabbitMQ instance in the Base64 format to generate the static username. You can obtain the static username on the Static Accounts page in the ApsaraMQ for RabbitMQ console.

    passWord

    NDAxREVDQzI2MjA0OT****

    The static password that is generated in the ApsaraMQ for RabbitMQ console. ApsaraMQ for RabbitMQ uses the HMAC-SHA1 algorithm to generate a signature based on the AccessKey secret of your Alibaba Cloud account or a RAM user within the account and the timestamp parameter that indicates the current system time. Then, ApsaraMQ for RabbitMQ encodes the signature and the timestamp parameter in the Base64 format to generate the static password. You can obtain the static password on the Static Accounts page in the ApsaraMQ for RabbitMQ console.

    virtualHost

    Test

    The vhost that you created in the ApsaraMQ for RabbitMQ instance. You can view the information about the vhost on the vhost Details page in the ApsaraMQ for RabbitMQ console. For more information, see View the statistics of a vhost.

    ExchangeName

    ExchangeTest

    The exchange that you created in the ApsaraMQ for RabbitMQ instance. You can use the instance ID and vhost name to fuzzy search the created exchange on the Exchanges page in the ApsaraMQ for RabbitMQ console.

    BindingKey

    BindingKeyTest

    The binding key that is used to bind the exchange to a queue in ApsaraMQ for RabbitMQ. You can view the binding of the exchange and obtain the binding key on the Exchanges page in the ApsaraMQ for RabbitMQ console.

    QueueName

    QueueTest

    The queue that you created in the ApsaraMQ for RabbitMQ instance. This parameter is required only when you subscribe to messages. You can view the binding of the exchange and obtain the queue to which the exchange is bound on the Exchanges page in the ApsaraMQ for RabbitMQ console.

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.AlreadyClosedException;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmCallback;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.security.KeyManagementException;
    import java.security.KeyStoreException;
    import java.security.NoSuchAlgorithmException;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentNavigableMap;
    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.TimeoutException;
    
    public class Producer {
        // The endpoint of the ApsaraMQ for RabbitMQ instance. 
        public static final String hostName = "1880770****.mq-amqp.cn-hangzhou-a.aliyuncs.com";
        // The static username of the ApsaraMQ for RabbitMQ instance. 
        public static final String userName = "MjoxODgwNzcwODY5MD****";
        // The static password of the ApsaraMQ for RabbitMQ instance. 
        public static final String password = "NDAxREVDQzI2MjA0OT****";
        // The name of the vhost of the ApsaraMQ for RabbitMQ instance. 
        public static final String virtualHost = "vhost_test";
    
        // If you want to use port 5671, you must set the enableSSL parameter to true. 
        public static final int port = 5672;
        public static final boolean enableSSL = false;
    
        private Channel channel;
        private final ConcurrentNavigableMap<Long/*deliveryTag*/, String/*msgId*/> outstandingConfirms;
        private final ConnectionFactory factory;
        private final String exchangeName;
        private final String queueName;
        private final String bindingKey;
    
        public Producer(ConnectionFactory factory, String exchangeName, String queueName, String bindingKey) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
            this.factory = factory;
            this.outstandingConfirms = new ConcurrentSkipListMap<>();
            this.channel = factory.createChannel();
            this.exchangeName = exchangeName;
            this.queueName = queueName;
            this.bindingKey = bindingKey;
        }
    
        public static void main(String[] args) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
            // Create a connection factory. 
            ConnectionFactory factory = new ConnectionFactory(hostName, port, userName, password, virtualHost, enableSSL);
    
            // Initialize the producer. 
            Producer producer = new Producer(factory, "ExchangeTest", "QueueTest", "BindingKeyTest");
    
            // Declare the producer. 
            producer.declare();
    
            producer.initChannel();
    
            // Send messages. 
            producer.doSend("hello,amqp");
        }
    
        private void initChannel() throws IOException {
            channel.confirmSelect();
    
            ConfirmCallback cleanOutstandingConfirms = (deliveryTag, multiple) -> {
                if (multiple) {
                    ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag, true);
    
                    for (Long tag : confirmed.keySet()) {
                        String msgId = confirmed.get(tag);
                        System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, tag, true);
                    }
    
                    confirmed.clear();
                } else {
                    String msgId = outstandingConfirms.remove(deliveryTag);
                    System.out.format("Message with msgId %s has been ack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, false);
                }
            };
            channel.addConfirmListener(cleanOutstandingConfirms, (deliveryTag, multiple) -> {
                String msgId = outstandingConfirms.get(deliveryTag);
                System.err.format("Message with msgId %s has been nack-ed. deliveryTag: %d, multiple: %b%n", msgId, deliveryTag, multiple);
                // send msg failed, re-publish
            });
    
    
            channel.addReturnListener(returnMessage -> System.out.println("return msgId=" + returnMessage.getProperties().getMessageId()));
        }
    
        private void declare() throws IOException {
            channel.exchangeDeclare(exchangeName, "direct", true);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, bindingKey);
        }
        
    
        private void doSend(String content) throws IOException, TimeoutException, NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
            try {
                String msgId = UUID.randomUUID().toString();
                AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(msgId).build();
    
                channel.basicPublish(exchangeName, bindingKey, true, props, content.getBytes(StandardCharsets.UTF_8));
    
                outstandingConfirms.put(channel.getNextPublishSeqNo(), msgId);
            } catch (AlreadyClosedException e) {
                //need reconnect if channel is closed.
                String message = e.getMessage();
    
                System.out.println(message);
    
                if (channelClosedByServer(message)) {
                    factory.closeCon(channel);
                    channel = factory.createChannel();
                    this.initChannel();
                    doSend(content);
                } else {
                    throw e;
                }
            }
        }
    
        private boolean channelClosedByServer(String errorMsg) {
            if (errorMsg != null
                && errorMsg.contains("channel.close")
                && errorMsg.contains("reply-code=541")
                && errorMsg.contains("reply-text=InternalError")) {
                return true;
            } else {
                return false;
            }
        }
    }

Step 3: Test the event stream

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar of the Instances page, select the region where the instance that you want to manage resides. Then, in the instance list, click the name of the instance that you want to manage.

  3. On the Instances page, click the name of the instance that you created in Step 1: Create an event stream for the destination instance.

  4. In the Basic Information section of the Instance Details page, click the Message Query tab.

  5. Select Query by Message ID or Query by Queue and configure the Time Range parameter and other related parameters. Then, click Query.

    查询消息

  6. Check whether the queried message content is the same as the content of the message that was sent in Step 2: Use an SDK to send a message.