Alibaba Cloud Message Queue for RabbitMQ is developed based on the open-source AMQP protocol. This topic describes how to push data from Message Queue for RabbitMQ to Function Compute by using EventBridge.

Usage notes

The region that supports the connector feature is subject to the region where Message Queue for RabbitMQ is activated.

Step 1: Create a custom event source

  1. Log on to the EventBridge console.
  2. In the left-side navigation pane, choose Event-driven Operations > Event Sources.
  3. In the top navigation bar, select a region.
  4. In the Add Custom Event Source section, click Message Queue for RabbitMQ.
  5. In the Add Custom Event Source panel, enter a name and a description, select a Message Queue for RabbitMQ instance, a vhost, a queue, and a custom event bus, and then click OK.

Step 2: Create an event rule

Notice The event targets that you want to add for an event rule must be in the same region as the event rule.
  1. Log on to the EventBridge console.
  2. In the left-side navigation pane, choose Event-driven Operations > Event Rules.
  3. In the top navigation bar, select a region.
  4. On the Create Rule page, perform the following steps:
    1. In the Configure Basic Info step, enter a rule name in the Name field and a rule description in the Description field, and click Next Step.
    2. In the Configure Event Pattern step, set Event Pattern Type to Customized Pattern, specify an event pattern in the Event Pattern Content field, and then click Next Step.

      For more information, see Event patterns.

    3. In the Configure Targets step, configure an event target. Then, click Create.
      Note You can add a maximum of five event targets for an event rule.
      • Service Type: Click Function Compute.
      • Service: Select the service that you created.
      • Function: Select the function that you created.
      • Event: Select the type of event transformer.
        • Complete Event: The complete data structure is routed without transformation. The data structure is defined in the CloudEvents 1.0 protocol.
        • Partial Event: JSONPath is used to extract the content that needs to be routed to the event target.
        • Constant: The event serves as a trigger. Only constants are routed regardless of the event content.
        • Template: The event is routed in the format that is defined in a custom template to the event target. You can customize variables in the template.

          The following part shows sample variables and a sample template:

          Sample variables:

          {
            "source":"$.source",
            "type":"$.type"
          }

          Sample template:

          The event comes from ${source},event type is ${type}.

        For more information, see Event transformation.

      • Service Version and Alias: Select a service version and alias.

Step 3: Publish an event

import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.UUID;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // Specify an endpoint. You can view the endpoint of your instance on the Instances page in the Message Queue for RabbitMQ console. 
        factory.setHost("xxx.xxx.aliyuncs.com");
        // Specify a username. You can view your username on the Static Accounts Alerting page in the Message Queue for RabbitMQ console. 
        factory.setUsername("${UserName}");
        // Specify the password of the username. You can view the password on the Static Accounts Alerting page in the Message Queue for RabbitMQ console. 
        factory.setPassword("${PassWord}");
        // Specify whether to turn on the automatic recovery switch. If you set it to true, the switch is turned on. If you set it to false, the switch is turned off. 
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // Specify a vhost name. Make sure that the vhost is created in the Message Queue for RabbitMQ console. 
        factory.setVirtualHost("${VhostName}");
        // Specify a default port. Use port 5672 for an unencrypted connection and port 5671 for an encrypted connection. 
        factory.setPort(5672);
        // Specify timeout periods based on the network environment. 
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Send a message. 
        for (int i = 0; i < 100; i++  ) {
            // Make sure that ${ExchangeName} already exists in the Message Queue for RabbitMQ console and that the exchange type is consistent with that in the console. 
            // Set BindingKey to a binding key based on your business requirements. 
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish("${ExchangeName}", "BindingKey", true, props,
                    ("messageBody"  + i).getBytes(StandardCharsets.UTF_8));
        }
        connection.close();
    }
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import java.util.UUID;

public class ProducerTest {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // Specify an endpoint. You can view the endpoint of your instance on the Instances page in the Message Queue for RabbitMQ console. 
        factory.setHost("xxx.xxx.aliyuncs.com");
        // Specify a username. You can view your username on the Static Accounts Alerting page in the Message Queue for RabbitMQ console. 
        factory.setUsername("${UserName}");
        // Specify the password of the username. You can view the password on the Static Accounts Alerting page in the Message Queue for RabbitMQ console. 
        factory.setPassword("${PassWord}");
        // Specify whether to turn on the automatic recovery switch. If you set it to true, the switch is turned on. If you set it to false, the switch is turned off. 
        factory.setAutomaticRecoveryEnabled(true);
        factory.setNetworkRecoveryInterval(5000);
        // Specify a vhost name. Make sure that the vhost is created in the Message Queue for RabbitMQ console. 
        factory.setVirtualHost("${VhostName}");
        // Specify a default port. Use port 5672 for an unencrypted connection and port 5671 for an encrypted connection. 
        factory.setPort(5672);
        // Specify timeout periods based on the network environment. 
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Send a message. 
        for (int i = 0; i < 100; i++  ) {
            // Make sure that ${ExchangeName} already exists in the Message Queue for RabbitMQ console and that the exchange type is consistent with that in the console. 
            // Set BindingKey to a binding key based on your business requirements. 
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().messageId(UUID.randomUUID().toString()).build();
            channel.basicPublish("${ExchangeName}", "BindingKey", true, props,
                    ("messageBody"  + i).getBytes(StandardCharsets.UTF_8));
        }
        connection.close();
    }
}

Verify the result

To verify the result, you can view logs in the Function Compute console.

  1. Log on to the Function Compute console.
  2. In the top navigation bar, select a region.
  3. In the left-side navigation pane, click Services and Functions.
  4. On the Services and Functions page, click the service to which you routed the event in the Services pane.
  5. On the Functions tab, find the function to which you routed the event and click the name of the function in the Function Name column.
  6. On the page that appears, click the Logs tab to view logs.
    FC Invoke Start RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6****
    2020-11-19T11:11:34.161Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** [verbose] Receive Event v2 ==> The event comes from aliyun.ui,event type is ui:Created:PostObject.
    2020-11-19T11:11:34.167Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** 
    FC Invoke End RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****