This topic describes how to push events from Message Queue for Apache RocketMQ to Function Compute by using EventBridge.

Step 1: Create a custom event source

  1. Log on to the EventBridge console.
  2. In the left-side navigation pane, choose Event-driven Operations > Event Sources.
  3. In the top navigation bar, select a region.
  4. Click the Custom Event Sources tab. In the Add Custom Event Source section, click Message Queue for Apache RocketMQ.
  5. In the Add Custom Event Source panel, enter a name and a description, select a Message Queue for Apache RocketMQ instance, a topic, and a custom event bus, and then click OK.

Step 2: Create an event rule

Notice: The event targets that you configure for an event rule must reside in the same region as the event rule.
  1. Log on to the EventBridge console.
  2. In the left-side navigation pane, choose Event-driven Operations > Event Rules.
  3. In the top navigation bar, select a region.
  4. On the Event Rules page, select the created custom event bus from the Event Bus drop-down list and click Create Rule.
  5. On the Create Rule page, perform the following steps:
    1. In the Configure Basic Info step, enter a rule name in the Name field and a rule description in the Description field, and click Next Step.
    2. In the Configure Event Pattern step, set the Event Source Type parameter to Custom Event Source, select the custom event source that is created in Step 1 from the Event Source drop-down list, specify an event pattern in the Event Pattern Content code editor, and then click Next Step.

      For more information, see Event patterns.

    3. In the Configure Targets step, configure an event target by setting the following parameters, and then click Create.
      Note: You can configure a maximum of five event targets for an event rule.
      • Service Type: Click Function Compute.
      • Service: Select the service that you created in Function Compute.
      • Function: Select the function that you created in Function Compute.
      • Event: Click Template.

        The following example shows sample variables and a sample template:

        Sample variables:

        {
          "source":"$.source",
          "type":"$.type"
        }

        Sample template:

        The event comes from ${source},event type is ${type}.

        For more information, see Event transformation.

      • Service Version and Alias: Select a service version or a service alias.
        • Default Version: The value is fixed to LATEST.
        • Specified Version: Select a service version. For more information, see Introduction to versions.
        • Specified Alias: Select a service alias. For more information, see Introduction to aliases.
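
The Template option for the Event parameter pairs a set of variables, each a JSONPath expression into the event, with a template string that refers to those variables as ${name}. The following Java sketch illustrates only the substitution step and assumes that the variable values have already been extracted from the event. The class and method names are hypothetical, and the code is not the EventBridge implementation. The sample values are the ones that appear in the log sample in the Verify the results section.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: illustrates ${name} substitution only.
final class TemplateSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([A-Za-z0-9_]+)\\}");

    // Replaces each ${name} in the template with values.get(name).
    // Placeholders without a value are left unchanged (an assumption of this sketch).
    static String render(String template, Map<String, String> values) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = values.get(m.group(1));
            String replacement = (value != null) ? value : m.group();
            // quoteReplacement keeps '$' and '\' in the value literal.
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> values = new LinkedHashMap<>();
        values.put("source", "aliyun.ui");
        values.put("type", "ui:Created:PostObject");
        System.out.println(render("The event comes from ${source},event type is ${type}.", values));
    }
}
```

With these values, the rendered text is the same string that the function logs in the Verify the results section.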

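Step 3: Publish an event

Use a Message Queue for Apache RocketMQ SDK to send a message to the topic that you selected for the custom event source in Step 1. The following samples show a producer in Java, .NET, and C++.

Java:
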
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.OnExceptionContext;
    import com.aliyun.openservices.ons.api.Producer;
    import com.aliyun.openservices.ons.api.SendCallback;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;

    import java.util.Properties;

    public class ProducerTest {
        public static void main(String[] args) {
            Properties properties = new Properties();
            // The AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication. 
            properties.put(PropertyKeyConst.AccessKey, "XXX");
            // The AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication. 
            properties.put(PropertyKeyConst.SecretKey, "XXX");
            // Set a timeout period in milliseconds. 
            properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
            // Set a TCP endpoint. You can query the TCP endpoint in the TCP Endpoint section of the Instance Details page in the Message Queue for Apache RocketMQ console. 
            properties.put(PropertyKeyConst.NAMESRV_ADDR,
              "XXX");

            Producer producer = ONSFactory.createProducer(properties);
            // Before you send a message, call the start() method once to start the producer. 
            producer.start();

            Message msg = new Message(
                    // The topic of the message. 
                    "TopicTestMQ",
                    // The message tag, which is similar to a Gmail tag. Tags classify messages so that consumers can filter them on the Message Queue for Apache RocketMQ broker based on specified conditions. 
                    "TagA",
                    // The message body in the binary format. Message Queue for Apache RocketMQ does not process the message body. The producer and consumer must agree on the serialization and deserialization methods. 
                    "Hello MQ".getBytes());

            // The message key. We recommend that you specify a globally unique key. A unique key allows you to query and resend a message in the Message Queue for Apache RocketMQ console if the message fails to be received. 
            // Note: Messages can be sent and subscribed to even if you do not set this parameter. 
            msg.setKey("ORDERID_100");

            // Send the message in asynchronous mode. The callback function returns the result to the client. 
            producer.sendAsync(msg, new SendCallback() {
                @Override
                public void onSuccess(final SendResult sendResult) {
                    // The message is sent. 
                    System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
                }

                @Override
                public void onException(OnExceptionContext context) {
                    // Specify the logic to resend or persist the message if an error occurs. 
                    System.out.println("send message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
                }
            });

            // Obtain the msgId parameter before the callback function returns the result. 
            System.out.println("send message async. topic=" + msg.getTopic() + ", msgId=" + msg.getMsgID());

            // Before you exit the application, shut down the producer object. Note: You can choose not to shut down the producer object. 
            producer.shutdown();
        }
    }

.NET:

using System;
using ons;

public class ProducerExampleForEx
{
    public ProducerExampleForEx()
    {
    }

    static void Main(string[] args) {
        // Configure your account based on the settings in the Alibaba Cloud Management Console. 
        ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
        // The AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.AccessKey, "Your access key");
        // The AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.SecretKey, "Your access secret");
        // The group ID that you created in the Message Queue for Apache RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "GID_example");
        // The topic that you created in the Message Queue for Apache RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "T_example_topic_name");
        // Set a TCP endpoint. You can query the TCP endpoint in the TCP Endpoint section of the Instance Details page in the Message Queue for Apache RocketMQ console. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "NameSrv_Addr");
        // Specify the log path. 
        factoryInfo.setFactoryProperty(ONSFactoryProperty.LogPath, "C://log");

        // Create a producer instance. 
        // Note: A producer instance is thread-safe and can be used to send messages to different topics. Each thread
        // requires only one producer instance.
        Producer producer = ONSFactory.getInstance().createProducer(factoryInfo);

        // Start the producer instance. 
        producer.start();

        // Create a message object. 
        Message msg = new Message(factoryInfo.getPublishTopics(), "tagA", "Example message body");
        msg.setKey(Guid.NewGuid().ToString());
        for (int i = 0; i < 32; i++) {
            try
            {
                SendResultONS sendResult = producer.send(msg);
                Console.WriteLine("send success {0}", sendResult.getMessageId());
            }
            catch (Exception ex)
            {
                Console.WriteLine("send failure {0}", ex.ToString());
            }
        }

        // Before you exit your thread, shut down the producer instance. 
        producer.shutdown();

    }
}

C++:

#include "ONSFactory.h"
#include "ONSClientException.h"

using namespace ons;

int main()
{

    // Create a producer instance and set the parameters that are required to send messages. 
    ONSFactoryProperty factoryInfo;   
    // The group ID that you created in the Message Queue for Apache RocketMQ console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::ProducerId, "XXX");
    // The TCP endpoint. You can query the TCP endpoint in the TCP Endpoint section of the Instance Details page in the Message Queue for Apache RocketMQ console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::NAMESRV_ADDR, "XXX");
    // The topic that you created in the Message Queue for Apache RocketMQ console.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::PublishTopics, "XXX");
    // The message content.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::MsgContent, "XXX");
    // The AccessKey ID that you created in the Alibaba Cloud Management Console for identity authentication.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, "XXX");
    // The AccessKey secret that you created in the Alibaba Cloud Management Console for identity authentication.
    factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, "XXX");

    // Create the producer instance.
    Producer *pProducer = ONSFactory::getInstance()->createProducer(factoryInfo);

    // Before you send a message, call the start() method once to start the producer. 
    pProducer->start();

    Message msg(
            // The topic of the message.
            factoryInfo.getPublishTopics(),
            // The message tag, which is similar to a Gmail tag. Tags classify messages so that consumers can filter them on the Message Queue for Apache RocketMQ broker based on specified conditions. 
            "TagA",
            // The message body, which cannot be empty. Message Queue for Apache RocketMQ does not process the message body. The producer and consumer must agree on the serialization and deserialization methods. 
            factoryInfo.getMessageContent()
    );

    // The message key. We recommend that you specify a globally unique key. 
    // A unique key allows you to query and resend a message in the Message Queue for Apache RocketMQ console if the message fails to be received. 
    // Note: Messages can be sent and subscribed to even if you do not set this parameter. 
    msg.setKey("ORDERID_100");

    // Send the message. The message is sent if no error occurs. 
    try
    {
        SendResultONS sendResult = pProducer->send(msg);
    }
    catch(ONSClientException & e)
    {
        // Specify the logic to process errors. 
    }
    // Before you exit your application, shut down the producer object. Otherwise, memory leaks may occur. 
    pProducer->shutdown();

    return 0;
}
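
The Java sample above calls sendAsync and then shutdown() right away, so the producer may be shut down before the callback has reported the result. One common way to avoid this is to block on a latch until the callback fires. The following sketch shows the pattern by using only the JDK. The Callback interface and the fakeSendAsync method are hypothetical stand-ins for SendCallback and producer.sendAsync and are not part of the Message Queue for Apache RocketMQ SDK.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class AwaitCallbackSketch {
    // Hypothetical stand-in for the SDK's SendCallback.
    interface Callback {
        void onSuccess(String messageId);
        void onException(Throwable error);
    }

    // Hypothetical stand-in for producer.sendAsync: invokes the callback on another thread.
    static void fakeSendAsync(ExecutorService pool, String body, Callback callback) {
        pool.submit(() -> callback.onSuccess("id-" + body.length()));
    }

    // Sends one message and waits up to timeoutMillis for the callback.
    // Returns the message ID, or null on failure, timeout, or interruption.
    static String sendAndWait(String body, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();
        try {
            fakeSendAsync(pool, body, new Callback() {
                @Override
                public void onSuccess(String messageId) {
                    result.set(messageId);
                    done.countDown();
                }

                @Override
                public void onException(Throwable error) {
                    done.countDown();
                }
            });
            // Wait for the callback (or the timeout) before cleaning up.
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            pool.shutdown(); // plays the role of producer.shutdown()
        }
    }

    public static void main(String[] args) {
        System.out.println("callback result: " + sendAndWait("Hello MQ", 3000));
    }
}
```

In real producer code, the countDown() calls would go in the onSuccess and onException methods of SendCallback, and producer.shutdown() would follow the await call.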

Verify the results

To verify the results, you can view logs in the Function Compute console.

  1. Log on to the Function Compute console.
  2. In the top navigation bar, select the region where the service resides.
  3. In the left-side navigation pane, click Services and Functions.
  4. On the Services and Functions page, click the service to which you routed the event in the Services pane.
  5. On the Functions tab, find the function to which you routed the event and click the name of the function in the Function Name column.
  6. On the page that appears, click the Logs tab to view logs. Logs similar to the following sample are displayed:
    FC Invoke Start RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6****
    2020-11-19T11:11:34.161Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** [verbose] Receive Event v2 ==> The event comes from aliyun.ui,event type is ui:Created:PostObject.
    2020-11-19T11:11:34.167Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** 
    FC Invoke End RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****
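
The log sample above shows that the function receives the rendered template text as its payload, not the original message. As a rough illustration, the following Java sketch reads a payload from an input stream and builds a similar log line. It uses only the JDK, and the class and method are hypothetical. An actual Function Compute function would implement the handler interface of the Function Compute Java runtime, which is not shown here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: not the Function Compute handler interface.
final class PayloadLoggerSketch {
    // Reads the whole payload as UTF-8 text and returns the line to log.
    static String describe(InputStream payload) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = payload.read(chunk)) != -1) {
                buffer.write(chunk, 0, n);
            }
            return "Receive Event ==> " + new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Sample payload: the rendered template text from the log sample.
        byte[] sample = "The event comes from aliyun.ui,event type is ui:Created:PostObject."
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(describe(new ByteArrayInputStream(sample)));
    }
}
```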

FAQ

If an event fails to be published, check the response to the publishing request. To find the response, go to the EventBridge console and view the Event Delivery section of the Event Trace message. Then, take appropriate measures based on the returned response.

What can I do if an event fails to be published to Function Compute and the "[500]ConnectErrorconnectiontimedout" error is returned in the response?

You can resolve this issue by performing the following steps:
  1. Log on to the Function Compute console. Execute the function to which the event is routed and check the execution duration.
  2. If the execution duration is longer than 15 seconds, check the network connection. If the execution duration is shorter than 15 seconds, check whether you can access the endpoint for the region in which the target service is deployed.
  3. If you cannot access the endpoint, contact Function Compute engineers to seek help.
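
For the endpoint check in step 2, a TCP connection attempt with a timeout is a quick way to find out whether an endpoint is reachable from your network. The following Java sketch shows one way to do this. The host name in main is a placeholder that you must replace with the Function Compute endpoint for your region. Port 443 assumes an HTTPS endpoint, and the 15-second timeout is an assumption that mirrors the threshold mentioned above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class EndpointCheckSketch {
    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    // Unknown hosts, refused connections, and timeouts all return false.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder host: pass the real endpoint as the first argument.
        String host = args.length > 0 ? args[0] : "your-fc-endpoint.example.com";
        System.out.println(host + " reachable: " + canConnect(host, 443, 15000));
    }
}
```

A successful connection only shows that the endpoint accepts TCP connections. It does not prove that function invocations succeed.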