This topic describes how to push events from Message Queue for Apache RocketMQ to Function Compute by using EventBridge.

Note The connector feature is provided by EventBridge. For more information about the regions that support the feature, see Regions and endpoints.

Step 1: Create a connector

  1. Log on to the EventBridge console.
  2. In the top navigation bar, select a region.
  3. In the left-side navigation pane, click Custom Event Buses.
  4. On the Custom Event Buses page, find the required event bus and click Connectors in the Operations column.
  5. On the Connectors page, click Create Connector.
  6. In the Select Event Source step, click Message Queue for Apache RocketMQ.
  7. In the Configure Connector step, perform the following operations and click Create and Start:
    • Name: Enter the name of the connector.
    • Event Source: Select Message Queue for Apache RocketMQ.
    • Custom Event Bus: Select the custom event bus that you selected in Step 4.
    • Message Queue for Apache RocketMQ Instance: Select the created Message Queue for Apache RocketMQ instance.
    • Topic: Select the created topic.
    • Tag (optional): Enter a tag that is used to filter messages.
    • Consumer Offset: Select a consumer offset. The default value is the latest offset.

Step 2: Publish events

Sample code for Java:

    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.OnExceptionContext;
    import com.aliyun.openservices.ons.api.Producer;
    import com.aliyun.openservices.ons.api.SendCallback;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;

    import java.util.Properties;

    public class ProducerTest {
        public static void main(String[] args) {
            Properties properties = new Properties();
            // The AccessKey ID that you created in the Alibaba Cloud Management Console. 
            properties.put(PropertyKeyConst.AccessKey, "xxx");
            // The AccessKey secret that you created in the Alibaba Cloud Management Console. 
            properties.put(PropertyKeyConst.SecretKey, "xxx");
            // Set a timeout period in milliseconds. 
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
            // The TCP endpoint. You can view the TCP endpoint in the TCP Endpoint section of the Instance Details page in the Message Queue for Apache RocketMQ console.
            properties.put(PropertyKeyConst.NAMESRV_ADDR,
              "xxx");

            Producer producer = ONSFactory.createProducer(properties);
            // Before you send a message, call the start method once to start the producer client. 
            producer.start();

            Message msg = new Message(
                    // The topic of the message. 
                    "TopicTestMQ",
                    // The message tag, which is similar to a Gmail tag. It is used to sort messages, and enables the consumer client to filter messages on the Message Queue for Apache RocketMQ broker based on specified conditions. 
                    "TagA",
                    // The message body in the binary format. Message Queue for Apache RocketMQ does not process the message body. The producer and consumer clients must agree on the serialization and deserialization methods. 
                    "Hello MQ".getBytes());

            // The message key. We recommend that you specify a globally unique key. A unique key allows you to query and resend a message in the console if the message fails to be received. 
            // Note: Messages can be sent and received even if you do not set this parameter. 
            msg.setKey("ORDERID_100");

            // Send the message in asynchronous mode. The callback function returns the result to the client. 
            producer.sendAsync(msg, new SendCallback() {
                @Override
                public void onSuccess(final SendResult sendResult) {
                    // The message is sent. 
                    System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
                }

                @Override
                public void onException(OnExceptionContext context) {
                    // Specify the logic to resend or persist the message if an error occurs. 
                    System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
                }
            });

            // Obtain the msgId parameter before a callback occurs. 
            System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());

            // Before you exit the application, shut down the producer object. Note: You can choose not to destroy the producer object. 
            producer.shutdown();
        }
    }                
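
In the Java sample above, producer.shutdown() is called right after sendAsync returns, so the application may exit before either callback is invoked. One way to avoid this is to wait for the callback before shutting down. The following is a minimal, self-contained sketch of that pattern based on a CountDownLatch. The fakeSendAsync method is a hypothetical stand-in for the SDK call and is not part of any Message Queue for Apache RocketMQ SDK. The 3000 ms wait mirrors the send timeout in the sample and is an assumption.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustrative only: shows how to wait for an asynchronous callback before shutdown.
class AwaitAsyncSendSketch {

    // Hypothetical stand-in for an asynchronous send. It invokes the callback on another thread.
    static void fakeSendAsync(ExecutorService pool, String body, Consumer<String> onSuccess) {
        pool.submit(() -> onSuccess.accept("sent:" + body));
    }

    // Sends one message and blocks until the callback fires or the timeout elapses.
    // Returns true if the callback ran within the timeout.
    static boolean sendAndAwait(String body, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        try {
            fakeSendAsync(pool, body, result -> {
                System.out.println("callback received: " + result);
                done.countDown();
            });
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that the wait did not complete.
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Plays the role of producer.shutdown(): runs only after the wait ends.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("completed=" + sendAndAwait("Hello MQ", 3000));
    }
}
```

In the real sample, the latch would be counted down in both onSuccess and onException, and producer.shutdown() would be called after the await returns.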

Sample code for .NET:

using System;
using ons;

public class ProducerExampleForEx
{
    public ProducerExampleForEx()
    {
    }

    static void Main(string[] args) {
        // Configure your account based on the settings in the console. 
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // The AccessKey ID that you created in the Alibaba Cloud Management Console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // The AccessKey secret that you created in the Alibaba Cloud Management Console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // The group ID that you created in the console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
        // The topic that you created in the console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // The TCP endpoint. You can view the TCP endpoint in the TCP Endpoint section of the Instance Details page in the Message Queue for Apache RocketMQ console.
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // The log path. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

        // Create a producer instance. 
        // Note: A producer instance is thread-safe and can be used to send messages to different topics. Each thread
        // needs only one producer instance.
        Producer producer = ONSFactory.getInstance().createProducer(factoryInfo);

        // Start the producer instance.
        producer.start();

        // Create a message object. 
        Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
        msg.setKey(Guid.NewGuid().ToString());
        for (int i = 0; i < 32; i++) {
            try
            {
                SendResultONS sendResult = producer.send(msg);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            }
            catch (Exception ex)
            {
                Console.WriteLine("send failure {0}", ex.ToString());
            }
        }

        // Before you exit your thread, shut down the producer instance. 
        producer.shutdown();

    }
}    

Sample code for C++:

#include "ONSFactory.h"
#include "ONSClientException.h"

using namespace ons;

int main()
{

    // Create a producer instance and configure the parameters that are required to send messages. 
    ONSFactoryProperty factoryInfo;   
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "xxx"); // The group ID that you created in the console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "xxx"); // The TCP endpoint. You can view the TCP endpoint in the TCP Endpoint section of the Instance Details page in the Message Queue for Apache RocketMQ console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "xxx"); // The topic that you created in the console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "xxx"); // The message content.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "xxx"); // The AccessKey ID that you created in the Alibaba Cloud Management Console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "xxx"); // The AccessKey secret that you created in the Alibaba Cloud Management Console.


    // Create the producer.
    Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);

    // Before you send a message, call the start method once to start the producer. 
    pProducer->start();

    Message msg(
            // The topic of the message.
            factoryInfo.getPublishTopics(),
            // The message tag, which is similar to a Gmail tag. It is used to sort messages, and enables the consumer client to filter messages on the Message Queue for Apache RocketMQ broker based on specified conditions. 
            "TagA",
            // The message body, which cannot be empty. Message Queue for Apache RocketMQ does not process the message body. The producer and consumer must agree on the serialization and deserialization methods. 
            factoryInfo.getMessageContent()
    );

    // The message key. We recommend that you specify a globally unique key. 
    // A unique key allows you to query and resend a message in the console if the message fails to be received. 
    // Note: Messages can be sent and received even if you do not set this parameter. 
    msg.setKey("ORDERID_100");

    // Send the message. The message is sent if no error occurs. 
    try
    {
        SendResultONS sendResult = pProducer->send(msg);
    }
    catch(ONSClientException & e)
    {
        // Specify the logic to process errors. 
    }
    // Before you exit your application, shut down the producer object. Otherwise, memory leaks may occur.
    pProducer->shutdown();

    return 0;
}

Step 3: Create an event rule

  1. Log on to the EventBridge console.
  2. In the top navigation bar, select a region.
  3. In the left-side navigation pane, click Custom Event Buses.
  4. On the Custom Event Buses page, find the required event bus and click Rules in the Operations column.
  5. On the Rules page, click Create Rule.
  6. On the Create Rule page, perform the following steps:
    1. In the Configure Basic Info step, enter a rule name in the Name field and a rule description in the Description field, and then click Next Step.
    2. In the Configure Event Pattern step, select Customized Pattern for Event Pattern Type, specify the event pattern in the Event Pattern Content section, and then click Next Step.

      For more information, see Event patterns.

    3. In the Configure Targets step, configure an event target.
      • Service type: Click Function Compute.
      • Service: Select the service that you created.
      • Function: Select the function that you created.
      • Event: Select the type of event transformer.
        • Complete Event: The complete data structure is delivered without conversion. The data structure is defined in the CloudEvents 1.0 protocol.
        • Partial Event: JSONPath is used to extract the content that needs to be delivered to the event target.
        • Constant: The event serves as a trigger. Only constants are delivered regardless of the event content.
        • Template: Events are routed based on a custom template. You can customize variables in the template.

          The following example shows the variables and a template.

          Variables:

          {
            "source":"$.source",
            "type":"$.type"
          }

          Template:

          The event comes from ${source},event type is ${type}.

        For more information, see Transform events.

      • Service Version and Alias: Select a service version and alias.
      Notice Make sure that the event target and the event rule are in the same region.
    4. Click Create.
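
To make the Template option in the Configure Targets step more concrete, the following sketch reproduces the substitution locally: each variable is resolved from the event, and the resolved values replace the ${...} placeholders in the template. This is an illustration only, not EventBridge code. The class and method names are hypothetical, the event is modeled as a flat map, and only top-level paths of the form $.field are handled, which is a simplification of JSONPath.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: mimics the Template transformer on a local, flat event.
class TemplateTransformSketch {

    // Matches placeholders such as ${source}.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{(\\w+)\\}");

    // Resolves each variable path against the event.
    // Simplification: only top-level "$.field" paths are supported.
    static Map<String, String> resolveVariables(Map<String, String> variables,
                                                Map<String, Object> event) {
        Map<String, String> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : variables.entrySet()) {
            String path = entry.getValue();
            if (!path.startsWith("$.")) {
                throw new IllegalArgumentException("Unsupported path: " + path);
            }
            Object value = event.get(path.substring(2));
            resolved.put(entry.getKey(), String.valueOf(value));
        }
        return resolved;
    }

    // Replaces every ${name} in the template with its resolved value.
    // Unknown placeholders are left unchanged.
    static String render(String template, Map<String, String> values) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = values.getOrDefault(matcher.group(1), matcher.group(0));
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Sample event fields, taken from the log output in this topic.
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("source", "aliyun.ui");
        event.put("type", "ui:Created:PostObject");

        Map<String, String> variables = new LinkedHashMap<>();
        variables.put("source", "$.source");
        variables.put("type", "$.type");

        String template = "The event comes from ${source},event type is ${type}.";
        System.out.println(render(template, resolveVariables(variables, event)));
    }
}
```

With these sample values, the rendered text is the same sentence that appears in the function log in the Verify the result section.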

Verify the result

To verify the result, you can view logs in the Function Compute console.

  1. Log on to the Function Compute console.
  2. In the top navigation bar, select a region.
  3. In the left-side navigation pane, click Service/Function.
  4. On the Service/Function page, click the service. Then, click the function name in the Function Name column on the Functions tab.
  5. On the page that appears, click the Log tab to view logs.
    FC Invoke Start RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****
    2020-11-19T11:11:34.161Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** [verbose] Receive Event v2 ==> The event comes from aliyun.ui,event type is ui:Created:PostObject.
    2020-11-19T11:11:34.167Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** 
    FC Invoke End RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****