This topic describes how to push events from Message Queue for MQTT to Function Compute by using EventBridge.

Step 1: Create a data outbound rule

  1. Log on to the Message Queue for MQTT console.
  2. In the left-side navigation pane, click Instances.
  3. In the top navigation bar, select a region.
  4. In the instance list, find the instance for which you want to create a data outbound rule. In the Actions column, choose More > Rules.
  5. In the upper-left corner of the Rules page, click Create Rule.
  6. On the Create Rule page, perform the following steps:
    1. In the Configure Basic Information step, set the following parameters and click Next.
      Parameter | Example | Description
      Rule ID | 111111 | The globally unique identifier of the rule.
      • It can contain only letters, digits, hyphens (-), and underscores (_), and must contain at least one letter or digit.
      • It must be 3 to 64 characters in length. If the value contains more than 64 characters, it is automatically truncated.
      • It cannot be changed after the rule is created.
      Description | migrate from rocketmq | The description of the rule.
      Status | Enable | Specifies whether to enable the rule. Valid values:
      • Enable
      • Disable
      Rule Type | Data Outbound | The type of the rule. Select Data Outbound for this procedure.
    2. In the Configure Rule Source step, specify the data source and click Next.
      Parameter | Example | Description
      Topic | TopicA | The source topic from which you want to export data. Specify a topic in Message Queue for MQTT.
    3. In the Configure Rule Destination step, specify the destination to which the data is forwarded and click Create.
      Parameter | Example | Description
      Destination Service Type | Message Queue for Apache RocketMQ | The cloud service to which the data of the source topic is forwarded.
      Note Only Message Queue for Apache RocketMQ is supported.
      Message Queue for Apache RocketMQ Instance | MQ_INST_13801563067*****_BbyOD2jQ | The ID of the Message Queue for Apache RocketMQ instance to which the data is forwarded.
      Topic | TopicB | The Message Queue for Apache RocketMQ topic to which the data is forwarded. In this example, the data of the source topic is forwarded to TopicB.
    You can view the data outbound rule that you create on the Rules page.
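
The Rule ID constraints listed in the table can be checked locally before you enter a value in the console. The following is a minimal sketch, assuming that "letters" means ASCII letters only. The class name RuleIdCheck and the method isValidRuleId are hypothetical and are not part of any Message Queue for MQTT SDK. Unlike the console, which truncates long values, this sketch rejects values that are longer than 64 characters.

```java
import java.util.regex.Pattern;

public class RuleIdCheck {
    // Allowed characters only, 3 to 64 of them, with at least one letter or digit.
    // Assumption: "letters" means ASCII A-Z and a-z.
    private static final Pattern RULE_ID =
        Pattern.compile("^(?=.*[A-Za-z0-9])[A-Za-z0-9_-]{3,64}$");

    public static boolean isValidRuleId(String id) {
        return id != null && RULE_ID.matcher(id).matches();
    }

    public static void main(String[] args) {
        String[] candidates = {"111111", "mqtt-to-rmq_01", "ab", "---", "rule id"};
        for (String id : candidates) {
            System.out.println(id + " -> " + isValidRuleId(id));
        }
    }
}
```

The first two candidates satisfy the constraints. The remaining three fail because they are too short, contain no letter or digit, or contain a space.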

Step 2: Create a custom event source

  1. Log on to the EventBridge console.
  2. In the left-side navigation pane, choose Event-driven Operations > Event Sources.
  3. In the top navigation bar, select a region.
  4. Click the Custom Event Sources tab. In the Add Custom Event Source section, click Message Queue for Apache RocketMQ.
  5. In the Add Custom Event Source panel, enter a name and a description, select a Message Queue for Apache RocketMQ instance, a topic, and a custom event bus, and then click OK.

Step 3: Create an event rule

Notice The event targets that you want to configure for an event rule must reside in the same region as the event rule.
  1. Log on to the EventBridge console.
  2. In the left-side navigation pane, choose Event-driven Operations > Event Rules.
  3. In the top navigation bar, select a region.
  4. On the Event Rules page, select the created custom event bus from the Event Bus drop-down list and click Create Rule.
  5. On the Create Rule page, perform the following steps:
    1. In the Configure Basic Info step, enter a rule name in the Name field and a rule description in the Description field, and click Next Step.
    2. In the Configure Event Pattern step, set the Event Source Type parameter to Custom Event Source, select the custom event source that you created in Step 2 from the Event Source drop-down list, specify an event pattern in the Event Pattern Content code editor, and then click Next Step.

      For more information, see Event patterns.

    3. In the Configure Targets step, configure an event target. Then, click Create.
      Note You can configure a maximum of five event targets for an event rule.
      • Service Type: Click Function Compute.
      • Service: Select the service that you created in Function Compute.
      • Function: Select the function that you created in Function Compute.
      • Event: Click Template.

        The following part shows sample variables and a sample template:

        Sample variables:

        {
          "source":"$.source",
          "type":"$.type"
        }

        Sample template:

        The event comes from ${source},event type is ${type}.

        For more information, see Event transformation.

      • Service Version and Alias: Select a service version or a service alias.
        • Default Version: The value is fixed to LATEST.
        • Specified Version: Select a service version. For more information, see Introduction to versions.
        • Specified Alias: Select a service alias. For more information, see Introduction to aliases.
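
To preview what the sample template produces, the substitution can be imitated locally. The following is a minimal sketch and not the EventBridge implementation: it resolves only top-level variables of the form $.field, treats any other value as a literal, and replaces each ${name} placeholder. The class name TemplatePreview and the event field values are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TemplatePreview {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{(\\w+)\\}");

    // Resolves each variable against the top-level fields of the event.
    // Only the "$.field" form is looked up; this sketch keeps any other value as a literal.
    public static Map<String, String> extract(Map<String, String> variables,
                                              Map<String, String> event) {
        Map<String, String> values = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : variables.entrySet()) {
            String path = entry.getValue();
            String value = path.startsWith("$.")
                ? event.getOrDefault(path.substring(2), "")
                : path;
            values.put(entry.getKey(), value);
        }
        return values;
    }

    // Replaces every ${name} in the template with the extracted value.
    public static String render(String template, Map<String, String> values) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = values.getOrDefault(matcher.group(1), "");
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> variables = new LinkedHashMap<>();
        variables.put("source", "$.source");
        variables.put("type", "$.type");

        // Hypothetical event fields, used only for illustration.
        Map<String, String> event = new LinkedHashMap<>();
        event.put("source", "aliyun.ui");
        event.put("type", "ui:Created:PostObject");

        String template = "The event comes from ${source},event type is ${type}.";
        System.out.println(render(template, extract(variables, event)));
    }
}
```

With these illustrative field values, the rendered text is the string that the function receives as its event payload.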

Step 4: Publish an event

Use a Message Queue for MQTT client to send messages to the source topic that you specified in the data outbound rule. The following Java sample uses the Eclipse Paho client to publish messages:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * The following sample code shows how a Message Queue for MQTT client publishes messages in signature authentication mode. Modify the parameters for client initialization as needed. 
 * In signature authentication mode, the AccessKey ID and AccessKey secret obtained from the Resource Access Management (RAM) console are used to calculate an independent signature that authenticates the client. 
 * Because the AccessKey secret is private, calculate the signature in a trusted environment in actual business scenarios. 
 *
 * For more information about the complete demo project, see https://github.com/AliwareMQ/lmq-demo. 
 */
public class MQ4IoTProducerDemo {
    public static void main(String[] args) throws Exception {
        /**
         * The ID of the Message Queue for MQTT instance. You can obtain the ID from the Message Queue for MQTT console after you purchase the instance. 
         */
        String instanceId = "XXXXX";
        /**
         * The endpoint of the Message Queue for MQTT instance. You can obtain the endpoint after you purchase and configure the instance. You must use the assigned domain name instead of the IP address to connect to the instance. Otherwise, an exception may occur on the client object. 
         */
        String endPoint = "XXXXX.mqtt.aliyuncs.com";
        /**
         * The AccessKey ID, which can be obtained from the RAM console. 
         */
        String accessKey = "XXXXX";
        /**
         * The AccessKey secret, which can be obtained from the RAM console. The secretKey parameter is required only for the signature authentication mode. 
         */
        String secretKey = "XXXXX";
        /**
         * The globally unique ID that the system assigns to the Message Queue for MQTT client object. Each TCP connection must use a unique client ID. If different TCP connections use the same client ID, an exception occurs and the connections are closed. 
         * The value of the clientId parameter is in the format of GroupID@@@DeviceID. GroupID is the group ID that you created in the Message Queue for MQTT console, and DeviceID is the custom ID of the device. The value of the clientId parameter can be up to 64 characters in length. 
         */
        String clientId = "GID_XXXXX@@@XXXXX";
        /**
         * The parent topic that you created in the Message Queue for MQTT console. 
         * If you specify a topic that is not created or a topic that the Message Queue for MQTT client object is not authorized to access, the Message Queue for MQTT broker closes the connection. 
         */
        final String parentTopic = "XXXXX";
        /**
         * Message Queue for MQTT allows you to use a subtopic to filter messages. You can specify a string as a subtopic, as shown in the following code. 
         */
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        /**
         * The quality of service (QoS) level in message transmission. Valid values: 0, 1, and 2. 
         */
        final int qosLevel = 0;
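        /**
         * ConnectionOptionWrapper is a helper class from the demo project that is linked above. It is not part of the Paho library. 
         * It builds the MqttConnectOptions object that is passed to connect(), including the signature-based credentials. 
         */
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);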
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * The protocol and port used by the Message Queue for MQTT client object must match. 
         * If SSL encryption is enabled, the protocol and port must be specified in the format of ssl://endpoint:8883. 
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * The timeout period for the Message Queue for MQTT client object to wait for a response. After the timeout period ends, the client object no longer waits for a response. 
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * This producer sample does not subscribe to any topic. A client that also consumes messages must subscribe to the required topics here, as soon as the connection is established. 
                 */
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * The callback that is triggered to consume the messages that are sent. Make sure that the callback has no exception. If a response is returned for the callback, the messages are consumed. 
                 * The messages must be consumed within a specified period of time. If the time used to consume the messages exceeds the timeout period that is specified by the Message Queue for MQTT broker, the Message Queue for MQTT broker may retry sending the messages. Make sure that the idempotence of the business data is kept and the business data is de-duplicated. For more information about the specified timeout period, see Limits. 
                 */
                System.out.println(
                    "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            /**
             * The topic to which messages are sent. This topic must be the one to which the consumer subscribes or be able to be matched by using wildcard characters. 
             */
            mqttClient.publish(mq4IotTopic, message);
            /**
             * Message Queue for MQTT supports point-to-point (P2P) messaging. If a message is intended for only one consumer and the producer knows the client ID of that consumer, the producer can send the message as a P2P message. 
             * In P2P messaging, the consumer does not need to subscribe to the topic to which the producer sends messages. The logic at the consumer side is simplified. In P2P messaging, a topic must be specified in the format of {{parentTopic}}/p2p/{{targetClientId}}. 
             */
            String receiverId = "xxx";
            final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
            message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(p2pSendTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}
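
The sample relies on ConnectionOptionWrapper, whose source is not shown in this topic. The following is a minimal sketch of the kind of credential calculation that signature authentication involves, written with JDK classes only. The username layout (Signature|AccessKey ID|instance ID) and the use of HMAC-SHA1 over the client ID are assumptions that this topic does not state. Confirm them against the demo project before you rely on them. The class name SignatureSketch and its method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class SignatureSketch {
    // Assumed username layout for signature mode: Signature|<AccessKey ID>|<instance ID>.
    public static String userName(String accessKey, String instanceId) {
        return "Signature|" + accessKey + "|" + instanceId;
    }

    // Assumed password: Base64(HMAC-SHA1(clientId)), keyed with the AccessKey secret.
    public static String password(String clientId, String secretKey) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA1");
        mac.init(new SecretKeySpec(secretKey.getBytes(StandardCharsets.UTF_8), "HmacSHA1"));
        byte[] digest = mac.doFinal(clientId.getBytes(StandardCharsets.UTF_8));
        return Base64.getEncoder().encodeToString(digest);
    }

    // Builds GroupID@@@DeviceID and enforces the 64-character limit stated in the sample.
    public static String clientId(String groupId, String deviceId) {
        String id = groupId + "@@@" + deviceId;
        if (id.length() > 64) {
            throw new IllegalArgumentException("clientId exceeds 64 characters: " + id.length());
        }
        return id;
    }

    public static void main(String[] args) throws Exception {
        // Placeholder values, in the same spirit as the XXXXX placeholders in the sample.
        String id = clientId("GID_XXXXX", "device001");
        System.out.println("clientId = " + id);
        System.out.println("userName = " + userName("XXXXX", "XXXXX"));
        System.out.println("password = " + password(id, "XXXXX"));
    }
}
```

With the Paho client, values like these would be set on an MqttConnectOptions object through setUserName and setPassword before connect is called. As the sample comments point out, the AccessKey secret is private, so this calculation belongs in a trusted environment.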

Verify the results

To verify the results, you can view logs in the Function Compute console.

  1. Log on to the Function Compute console.
  2. In the top navigation bar, select the region where the service resides.
  3. In the left-side navigation pane, click Services and Functions.
  4. On the Services and Functions page, click the service to which you routed the event in the Services pane.
  5. On the Functions tab, find the function to which you routed the event and click the name of the function in the Function Name column.
  6. On the page that appears, click the Logs tab to view logs.
    FC Invoke Start RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****
    2020-11-19T11:11:34.161Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** [verbose] Receive Event v2 ==> The event comes from aliyun.ui,event type is ui:Created:PostObject.
    2020-11-19T11:11:34.167Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** 
    FC Invoke End RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****

FAQ

If an event fails to be published, you can view the response to the publishing request for troubleshooting. You can go to the EventBridge console and view the related information in the Event Delivery section of the Event Trace message. Then, take appropriate measures based on the response returned.

What can I do if an event fails to be published to Function Compute and the "[500]ConnectErrorconnectiontimedout" error is returned in the response?

You can resolve this issue by performing the following steps:
  1. Log on to the Function Compute console. Execute the function to which the event is routed and check the execution duration.
  2. If the execution duration is longer than 15 seconds, check the network connection. If the execution duration is shorter than 15 seconds, check whether you can access the endpoint of the region where the service to which the event is routed is deployed.
  3. If you cannot access the endpoint, contact Function Compute engineers for help.
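
The endpoint check in step 2 can be approximated from any machine that has a JDK. The following is a minimal sketch, assuming that a plain TCP connection attempt is an acceptable stand-in for "can access the endpoint". The class name EndpointProbe, the default port 443, and the reuse of 15 seconds as the connection timeout are illustrative choices. You must supply the host of your own Function Compute endpoint.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class EndpointProbe {
    // Returns true if a TCP connection to host:port opens within timeoutMillis.
    public static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections, timeouts, and host names that cannot be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.out.println("usage: java EndpointProbe <host> [port]");
            return;
        }
        String host = args[0];
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 443;

        long start = System.nanoTime();
        boolean reachable = canConnect(host, port, 15_000);
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;

        System.out.println(host + ":" + port + " reachable=" + reachable
            + " elapsedMs=" + elapsedMillis);
    }
}
```

A false result only shows that the connection could not be opened from the machine that runs the probe. It does not identify the cause, so the result is a starting point for the network check and not a diagnosis.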