EventBridge: Push events from ApsaraMQ for MQTT to Function Compute

Last Updated: Aug 11, 2023

This topic describes how to use EventBridge to push data from ApsaraMQ for MQTT to Function Compute.

Prerequisites

Before you start, make sure that the following operations are performed:

Usage notes

EventBridge cannot pull events directly from topics in ApsaraMQ for MQTT. Instead, use the data outflow feature of ApsaraMQ for MQTT to forward data from ApsaraMQ for MQTT topics to ApsaraMQ for RocketMQ topics, and then use ApsaraMQ for RocketMQ as a custom event source of EventBridge. This way, data in ApsaraMQ for MQTT is integrated into EventBridge.

Step 1: Create a data outflow rule

  1. Log on to the ApsaraMQ for MQTT console and click Instances in the left-side navigation pane.
  2. In the top navigation bar, select a region. On the Instances page, click the ID of the instance that you want to manage. The Instance Details page appears.
  3. In the left-side navigation pane, click Rules. In the upper-left corner of the Rules page, click Create Rule.
  4. On the Create Rule page, perform the following steps:
    1. In the Configure Basic Information step, configure the following parameters and click Next.
      Parameter | Example | Description
      Rule ID | 111111 | The ID of the rule. An ID is a globally unique identifier of a rule.
      • A rule ID can contain only letters, digits, hyphens (-), and underscores (_) and must contain at least one letter or digit.
      • A rule ID must be 3 to 64 characters in length. If the value contains more than 64 characters, it is automatically truncated.
      • The ID of a rule cannot be updated after the rule is created.
      Description | migrate from rocketmq | The description of the rule.
      Status | Enable | Specifies whether to enable the current rule. Valid values:
      • Enable
      • Disable
      Rule Type | Data Outbound | The type of the rule. Valid values:
    2. In the Configure Rule Source step, specify the data source and click Next.
      Parameter | Example | Description
      Topic | TopicA | The source topic from which you want to export data. Specify a topic in ApsaraMQ for MQTT.
    3. In the Configure Rule Destination step, specify the destination to which the data is forwarded and click Create.
      Parameter | Example | Description
      Destination Service Type | Message Queue for Apache RocketMQ | The cloud service to which the data of the source topic is forwarded.
      Note: Only ApsaraMQ for RocketMQ is supported.
      Message Queue for Apache RocketMQ Instance | MQ_INST_13801563067*****_BbyOD2jQ | The ID of the ApsaraMQ for RocketMQ instance to which the data is forwarded.
      Note: You can select only an instance that resides in the same region as the ApsaraMQ for MQTT instance.
      Topic | TopicB | The ApsaraMQ for RocketMQ topic to which the data is forwarded. In this example, the data of the source topic is forwarded to TopicB.
    You can view the data outbound rule that you create on the Rules page.
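
The rule ID constraints listed in the Configure Basic Information step can be checked before you open the console. The following Java code is a minimal sketch under stated assumptions: RuleIdCheck is a hypothetical helper and is not part of any ApsaraMQ SDK, and "letters" is assumed to mean ASCII letters, which the constraints above do not state.

```java
import java.util.regex.Pattern;

/** Hypothetical helper (not part of any ApsaraMQ SDK) that mirrors the documented rule ID constraints. */
final class RuleIdCheck {
    // Letters, digits, hyphens, and underscores only. ASCII letters are an assumption.
    private static final Pattern ALLOWED = Pattern.compile("[A-Za-z0-9_-]+");
    // At least one letter or digit must be present.
    private static final Pattern LETTER_OR_DIGIT = Pattern.compile("[A-Za-z0-9]");
    static final int MIN_LENGTH = 3;
    static final int MAX_LENGTH = 64;

    /** Imitates the documented truncation of values that are longer than 64 characters. */
    static String truncate(String candidate) {
        return candidate.length() > MAX_LENGTH ? candidate.substring(0, MAX_LENGTH) : candidate;
    }

    /** Returns true if the candidate satisfies the length and character constraints. */
    static boolean isValid(String candidate) {
        if (candidate == null) {
            return false;
        }
        int length = candidate.length();
        return length >= MIN_LENGTH
            && length <= MAX_LENGTH
            && ALLOWED.matcher(candidate).matches()
            && LETTER_OR_DIGIT.matcher(candidate).find();
    }

    public static void main(String[] args) {
        System.out.println(isValid("111111")); // the example value from the table
        System.out.println(isValid("---"));    // no letter or digit
        System.out.println(isValid("ab"));     // shorter than 3 characters
    }
}
```

Because a rule ID cannot be updated after the rule is created, a local check of this kind may help avoid creating a rule with an unintended, truncated ID.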

Step 2: Create a custom event source

  1. Log on to the EventBridge console.
  2. In the left-side navigation pane, click Event Buses.
  3. In the top navigation bar, select the region.
  4. On the Event Buses page, click the name of the custom event bus that you want to manage.
  5. In the left-side navigation pane, click Event Sources.
  6. On the Event Source page, click Add Event Source.
  7. In the Add Custom Event Source panel, configure the Name and Description parameters, select Message Queue for Apache RocketMQ from the Event Provider drop-down list, select the created ApsaraMQ for RocketMQ resources, and then click OK.

Step 3: Create an event rule

Important: The event targets that you configure for an event rule must reside in the same region as the event rule.
  1. Log on to the EventBridge console. In the left-side navigation pane, click Event Buses.
  2. In the top navigation bar, select a region. On the Event Buses page, click the name of the event bus that you want to manage.
  3. In the left-side navigation pane, click Event Rules. On the page that appears, click Create Rule.
  4. Perform the following operations in the Create Rule panel:
    1. In the Configure Basic Info step, enter a rule name in the Name field and a rule description in the Description field, and click Next Step.
    2. In the Configure Event Pattern step, set the Event Source Type parameter to Custom Event Source, select the custom event source that you created in Step 2 from the Event Source drop-down list, specify an event pattern in the Pattern Content code editor, and then click Next Step.

      For more information, see Event patterns.

    3. In the Configure Targets step, configure an event target and click Create.
      Note: You can configure up to five event targets for an event rule.
      • Service Type: Select Function Compute.
      • Service: Select the service that you created in Function Compute.
      • Function: Select the function that you created in Function Compute.
      • Event: Select Template.

        The following part shows two sample variables and a sample template:

        Sample variables:

        {
          "source":"$.source",
          "type":"$.type"
        }

        Sample template:

        The event comes from ${source},event type is ${type}.

        For more information, see Event transformation.

      • Service Version and Alias: Select a service version or a service alias.
        • Default Version: The value is fixed to LATEST.
        • Specified Version: Select a service version. For more information, see Manage versions.
        • Specified Alias: Select a service alias. For more information, see Manage aliases.
      • Invocation Mode: Select Synchronous or Asynchronous. For more information, see Synchronous invocations and Overview.
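
To make the effect of the sample variables and the sample template more concrete, the following Java code imitates the transformation locally. It is a minimal sketch, not the implementation that EventBridge uses. TemplateSketch is a hypothetical class, only top-level paths such as $.source are handled, and rendering an unresolved variable as an empty string is an assumption. The event values are taken from the sample log in the Verify the results section.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical local imitation of the Template transformation; not EventBridge code. */
final class TemplateSketch {
    // Matches placeholders such as ${source}.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{(\\w+)\\}");

    /** Resolves each variable path of the form "$.field" against a flat event map. */
    static Map<String, String> extract(Map<String, String> variables, Map<String, String> event) {
        Map<String, String> resolved = new LinkedHashMap<>();
        for (Map.Entry<String, String> variable : variables.entrySet()) {
            String path = variable.getValue();
            if (!path.startsWith("$.")) {
                throw new IllegalArgumentException("Unsupported path in this sketch: " + path);
            }
            resolved.put(variable.getKey(), event.get(path.substring(2)));
        }
        return resolved;
    }

    /** Replaces each ${name} placeholder with its resolved value (empty string if unresolved). */
    static String render(String template, Map<String, String> values) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = values.get(matcher.group(1));
            matcher.appendReplacement(out, Matcher.quoteReplacement(value == null ? "" : value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> event = new LinkedHashMap<>();
        event.put("source", "aliyun.ui");
        event.put("type", "ui:Created:PostObject");

        Map<String, String> variables = new LinkedHashMap<>();
        variables.put("source", "$.source");
        variables.put("type", "$.type");

        String template = "The event comes from ${source},event type is ${type}.";
        // Prints: The event comes from aliyun.ui,event type is ui:Created:PostObject.
        System.out.println(render(template, extract(variables, event)));
    }
}
```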

Step 4: Publish an event

Use an ApsaraMQ for MQTT client to send messages to the source topic that you specified in Step 1. The following Java sample code uses the Eclipse Paho client. The ConnectionOptionWrapper class is not defined in this snippet. It is provided by the demo project that is referenced in the code comments.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

/**
 * The following sample code shows how an ApsaraMQ for MQTT client object sends messages in signature authentication mode. Modify the parameters for client initialization based on your business requirements.
 * In signature authentication mode, the AccessKey ID and AccessKey secret that are obtained from the Resource Access Management (RAM) console are used to calculate an independent signature for the authentication of the client object.
 * Because the AccessKey secret is private, we recommend that you calculate the signature in a trusted environment in actual business scenarios.
 *
 * For information about the complete demo project, visit https://github.com/AliwareMQ/lmq-demo. 
 */
public class MQ4IoTProducerDemo {
    public static void main(String[] args) throws Exception {
        /**
         * The ID of the ApsaraMQ for MQTT instance. You can obtain the ID from the ApsaraMQ for MQTT console after you purchase the instance. 
         */
        String instanceId = "XXXXX";
        /**
         * The endpoint of the ApsaraMQ for MQTT instance. You can obtain the endpoint from the ApsaraMQ for MQTT console after you purchase and configure the instance. You must use the assigned domain name instead of the IP address to connect to the instance. Otherwise, an exception may occur on the client object. 
         */
        String endPoint = "XXXXX.mqtt.aliyuncs.com";
        /**
         * The AccessKey ID. You can obtain the AccessKey ID from the RAM console. 
         */
        String accessKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
        /**
         * The AccessKey secret. You can obtain the AccessKey secret from the RAM console. The secretKey parameter is required only in signature authentication mode. 
         */
        String secretKey = System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
        /**
         * The ID that the system assigns to the ApsaraMQ for MQTT client object. Each TCP connection must use a unique client ID. If different TCP connections use the same client ID, an exception occurs and the connections are closed. 
         * The value of the clientId parameter is in the GroupID@@@DeviceID format. GroupID is the ID of the consumer group that you created in the ApsaraMQ for MQTT console, and DeviceID is the custom ID of the device. The value of the clientId parameter can be up to 64 characters in length. 
         */
        String clientId = "GID_XXXXX@@@XXXXX";
        /**
         * The parent topic that you created in the ApsaraMQ for MQTT console. 
         * If you specify a topic that does not exist or a topic that the ApsaraMQ for MQTT client object is not authorized to access, the ApsaraMQ for MQTT broker closes the connection. 
         */
        final String parentTopic = "XXXXX";
        /**
         * ApsaraMQ for MQTT allows you to use a subtopic to filter messages. You can specify a string as a subtopic name. 
         */
        final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";
        /**
         * The quality of service (QoS) level in message transmission. Valid values: 0, 1, and 2. 
         */
        final int qosLevel = 0;
        ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);
        final MemoryPersistence memoryPersistence = new MemoryPersistence();
        /**
         * The protocol and port that are used by the ApsaraMQ for MQTT client object must match. 
         * If Secure Sockets Layer (SSL) encryption is enabled, the protocol and port must be specified as ssl://endpoint:8883. 
         */
        final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);
        /**
         * The timeout period for the ApsaraMQ for MQTT client object to wait for a response. After the timeout period ends, the client object no longer waits for a response. 
         */
        mqttClient.setTimeToWait(5000);
        final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
        mqttClient.setCallback(new MqttCallbackExtended() {
            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                /**
                 * Invoked after the client connection is established. A consumer would subscribe to its topics here at the earliest opportunity. This producer only logs the event.
                 */
                System.out.println("connect success");
            }

            @Override
            public void connectionLost(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
                /**
                 * The callback that is invoked to consume messages. Make sure that the callback does not throw exceptions. If a response is returned for the callback, the messages are consumed. 
                 * Messages must be consumed in the specified period. If specific messages are not consumed within the timeout period specified by the ApsaraMQ for MQTT broker, the broker may attempt to resend the messages. Make sure that deduplication is performed to ensure idempotence for message consumption. For information about the specified timeout period, see Limits. 
                 */
                System.out.println(
                    "receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);
            }
        });
        mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());
        for (int i = 0; i < 10; i++) {
            MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());
            message.setQos(qosLevel);
            /**
             * The topic to which messages are sent. If the messages that are sent are normal messages, this topic must be the topic to which the consumer subscribes or the topic that can be matched by using wildcards. 
             */
            mqttClient.publish(mq4IotTopic, message);
            /**
             * ApsaraMQ for MQTT supports point-to-point (P2P) messaging. If a message is intended for only one consumer and the producer knows the client ID of that consumer, the producer can send a P2P message.
             * In P2P messaging, the consumer does not need to subscribe to the topic to which the producer sends messages. This way, the consumption logic is simplified. In P2P messaging, a topic must be specified in the {{parentTopic}}/p2p/{{targetClientId}} format. 
             */
            String receiverId = "xxx";
            final String p2pSendTopic = parentTopic + "/p2p/" + receiverId;
            message = new MqttMessage("hello mq4Iot p2p msg".getBytes());
            message.setQos(qosLevel);
            mqttClient.publish(p2pSendTopic, message);
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}
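
The naming conventions described in the code comments above (the GroupID@@@DeviceID client ID format with a limit of 64 characters, subtopics, and the {{parentTopic}}/p2p/{{targetClientId}} format for P2P topics) can be gathered in one small helper. The following Java code is a minimal sketch. MqttNames is a hypothetical class and is not part of the Paho client or of the demo project, and the group ID and device IDs are placeholder values.

```java
/** Hypothetical helper that builds client IDs and topic names in the formats described above. */
final class MqttNames {
    static final int MAX_CLIENT_ID_LENGTH = 64;

    /** Builds a client ID in the GroupID@@@DeviceID format and enforces the 64-character limit. */
    static String clientId(String groupId, String deviceId) {
        String id = groupId + "@@@" + deviceId;
        if (id.length() > MAX_CLIENT_ID_LENGTH) {
            throw new IllegalArgumentException(
                "Client ID exceeds " + MAX_CLIENT_ID_LENGTH + " characters: " + id.length());
        }
        return id;
    }

    /** Appends a subtopic to a parent topic, for example TopicA/testMq4Iot. */
    static String subTopic(String parentTopic, String subTopicName) {
        return parentTopic + "/" + subTopicName;
    }

    /** Builds a P2P topic in the parentTopic/p2p/targetClientId format. */
    static String p2pTopic(String parentTopic, String targetClientId) {
        return parentTopic + "/p2p/" + targetClientId;
    }

    public static void main(String[] args) {
        // GID_demo, device001, and device002 are placeholders, not real resources.
        String sender = clientId("GID_demo", "device001");
        String receiver = clientId("GID_demo", "device002");
        System.out.println(sender);                          // GID_demo@@@device001
        System.out.println(subTopic("TopicA", "testMq4Iot")); // TopicA/testMq4Iot
        System.out.println(p2pTopic("TopicA", receiver));    // TopicA/p2p/GID_demo@@@device002
    }
}
```

Failing fast on an overlong client ID keeps the error on the client side instead of surfacing later as a rejected connection.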

Verify the results

To verify the results, you can view logs in the Function Compute console.

  1. Log on to the Function Compute console.
  2. In the left-side navigation pane, click Services & Functions.
  3. In the top navigation bar, select a region.
  4. On the Services page, find the service to which you routed the event and click Functions in the Actions column.
  5. On the Functions page, click the name of the function to which you routed the event.
  6. On the Function Details page, click the Logs tab to view logs.
    FC Invoke Start RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6****
    2020-11-19T11:11:34.161Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** [verbose] Receive Event v2 ==> The event comes from aliyun.ui,event type is ui:Created:PostObject.
    2020-11-19T11:11:34.167Z c2be67a7-fh1a-9619-ei4c-3c04gcf6c**** 
    FC Invoke End RequestId: c2be67a7-fh1a-9619-ei4c-3c04gcf6c****

FAQ

How can I locate the issue if the event fails to be published?

If an event fails to be published, you can view the response to the publishing request for troubleshooting. You can go to the EventBridge console and view the related information in the Event Delivery section of the Event Trace message. Then, take appropriate measures based on the response returned.

What can I do if an event fails to be published to Function Compute and the "[500]ConnectErrorconnectiontimedout" error is returned in the response?

You can perform the following steps:
  1. Log on to the Function Compute console. Execute the function to which the event is routed and check the execution duration.
  2. If the execution duration is longer than 15 seconds, check the network connection. If the execution duration is shorter than 15 seconds, check whether you can access the endpoint of the region in which the service that hosts the target function is deployed.
  3. If you cannot access the endpoint of the region in which Function Compute is deployed, contact the Function Compute engineers.
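
One way to carry out the endpoint check in the steps above is a plain TCP connection attempt with a timeout. The following Java code is a minimal sketch. EndpointProbe is a hypothetical helper, the host and port are supplied by you on the command line, and the default timeout of 15 seconds only mirrors the threshold mentioned above. A successful TCP connection shows that the network path is open. It does not prove that the function itself can be invoked.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper that tests whether a TCP connection to a host and port can be opened. */
final class EndpointProbe {
    /** Returns true if a TCP connection is established within the timeout, false otherwise. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers timeouts, refused connections, and host names that cannot be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the endpoint host of your region as the first argument. The defaults are placeholders.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 443;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 15_000));
    }
}
```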