After the Message Queue for Apache RocketMQ event source is integrated with Function Compute by using EventBridge, the Message Queue for Apache RocketMQ trigger (RocketMQ trigger) can trigger associated functions. The functions can be used to perform custom operations on the messages that are published to Message Queue for Apache RocketMQ. This topic describes how to create a RocketMQ trigger, configure function input parameters, and write and test code in the Function Compute console.

Overview

After you submit a request to create a trigger in the Function Compute console, Function Compute creates EventBridge resources based on the trigger configurations. Function Compute provides two models for pushing messages, the event model and the event stream model:
  • Event model: Each message is passed into a function as an event parameter. The event follows the specifications for CloudEvents. For information about the relationship between message content and CloudEvents, see Step 2: Configure the input parameters of the function.
  • Event stream model: One or more messages are passed into a function in batches based on your batch configurations. This model is suitable for scenarios in which end-to-end streaming data is processed.
After the trigger is created, you can view the trigger information in the Function Compute console. You can also view the information about the created resources in the EventBridge console. When a message is sent to a Message Queue for Apache RocketMQ instance that is used as a trigger source, function execution is triggered in Function Compute. Different push models support different parameters. For more information, see Step 2: Configure the input parameters of the function.
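For function code, the practical difference between the two models is the shape of the payload: the event model delivers a single CloudEvents object, whereas the event stream model delivers an array of them. The following is a minimal sketch of a helper that normalizes both shapes into an array. The name toEventList is hypothetical, and the sketch assumes that the payload arrives as a Buffer or a JSON string.

```javascript
'use strict';

// Hypothetical helper: normalizes a trigger payload into an array of CloudEvents.
// Assumption: the payload is a Buffer or string that contains JSON, or an
// already-parsed object or array.
function toEventList(rawEvent) {
  let parsed = rawEvent;
  if (Buffer.isBuffer(rawEvent) || typeof rawEvent === 'string') {
    parsed = JSON.parse(rawEvent.toString());
  }
  // Event stream model: an array of events. Event model: a single event.
  return Array.isArray(parsed) ? parsed : [parsed];
}

const single = toEventList(Buffer.from('{"id":"a","data":{"body":"TEST"}}'));
const batch = toEventList('[{"id":"a"},{"id":"b"}]');
console.log(single.length, batch.length); // prints "1 2"
```

With a helper like this, the same processing loop can serve a trigger of either model.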

Precautions

  • The Message Queue for Apache RocketMQ instance that is used as the trigger source must reside in the same region as the function in Function Compute.
  • When the number of created custom event buses and the number of created event rules reach the upper limits, you can no longer create a RocketMQ trigger of the event model.
  • When the number of created event streams reaches the upper limit, you can no longer create a RocketMQ trigger of the event stream model.
The following table describes the limits on the number of resources that can be created by using an Alibaba Cloud account in each region.
Item | Upper limit
Number of custom event buses | 10
Number of event rules created in each custom event bus | 10
Number of event streams | 30

Before you begin

Step 1: Create a trigger

  1. Log on to the Function Compute console. In the left-side navigation pane, click Services & Functions.
  2. In the top navigation bar, select a region. On the Services page, click the desired service.
  3. On the Functions page, click the function that you want to manage.
  4. On the function details page, click the Triggers tab, select the version or alias from the Version or Alias drop-down list, and then click Create Trigger.
  5. In the Create Trigger panel, specify related parameters. After you specify the parameters, click OK.
    For information about the configurations of advanced features, see Advanced features of EventBridge triggers.
    The parameters are grouped by category. Each entry gives the description of the parameter, followed by an example value.

    Basic settings

    Trigger Type
      Description: Select Message Queue for Apache RocketMQ from the drop-down list.
      Example: Message Queue for Apache RocketMQ

    Name
      Description: Enter a trigger name.
      Example: rocketmq-trigger

    Version or Alias
      Description: The default value is LATEST. If you want to create a trigger for another version or alias, switch to the specified version or alias in the upper-right corner of the function details page. For more information about versions and aliases of a service, see Manage versions and Manage aliases.
      Example: LATEST

    Apache RocketMQ Instance
      Description: Select a Message Queue for Apache RocketMQ instance from the drop-down list.
      Example: MQ_INST_164901546557****_BX7****

    Topic
      Description: Select a topic of the Message Queue for Apache RocketMQ instance from the drop-down list.
      Example: topic1

    Tag
      Description: Specify a tag for message filtering. The execution of a function is triggered only when a message that contains the specified filtering tag is received.
      Example: tag

    Group ID
      Description: Select the group ID of the created Message Queue for Apache RocketMQ instance.
      Example: GID_group1

    Consumer Offset
      Description: Select a consumer offset of the message. A consumer offset specifies the position from which messages start to be pulled from Message Queue for Apache RocketMQ. Valid values:
      • Latest Offset: consumes messages from the latest offset.
      • Earliest Offset: consumes messages from the earliest offset.
      • Timestamp: consumes messages from the specified timestamp.
      Example: Latest Offset

    Advanced settings

    Invocation Method
      Description: Select a method to invoke the function. Valid values:
      • Synchronous Invocation: After an event triggers a function, Function Compute returns the execution result when the execution is complete. This is the default value. For more information, see Synchronous invocations.
      • Asynchronous Invocation: After an event triggers a function, Function Compute immediately returns a response and ensures that the function is executed at least once. However, the detailed execution result is not returned. This invocation method is suitable for functions that have higher scheduling latency. For more information about asynchronous invocation, see Overview.
      Example: Synchronous Invocation

    Message Push Model
      Description: The underlying application model when message data is pushed to Function Compute. Valid values:
      • Event Model: Each message is passed into a function as an event parameter. The event follows the specifications for CloudEvents. For information about the relationship between message content and CloudEvents, see Step 2: Configure the input parameters of the function.
      • Event Stream Model: One or more messages are passed into a function in batches based on your batch configurations. This model is suitable for scenarios in which end-to-end streaming data is processed.
      Example: Event Model

    Batch Push
      Availability: This parameter is available only if you select Event Stream Model as the message push model.
      Description: Batch push helps you aggregate multiple events. Batch push is triggered when the condition of either Batch Push Messages or Batch Push Interval is met. For example, if you set Batch Push Messages to 100 and Batch Push Interval to 15 seconds, the batch push is triggered immediately when the number of messages reaches 100 in less than 15 seconds.
      Example: Enable

    Batch Push Messages
      Description: The maximum number of messages that are sent to the function in each batch. Requests are sent when the number of backlog messages reaches the specified value. Valid values: 1 to 500.
      Example: 1

    Batch Push Interval
      Description: The time interval at which the function is invoked. The system sends the aggregated messages to Function Compute at the specified time interval. Valid values: 0 to 15. Unit: seconds. A value of 0 indicates that messages are sent immediately after aggregation.
      Example: 1

    Retry Policy
      Availability: This parameter is available only if you select Event Stream Model as the message push model.
      Description: The retry policy to be used when a message fails to be pushed. Valid values:
      • Backoff Retry: A message push request can be retried up to three times at random intervals in the range of 10 to 20 seconds.
      • Exponential Decay Retry: A message push request can be retried up to 176 times, and the retries last for a maximum of one day. The interval between retries doubles each time up to a maximum of 512 seconds: 1, 2, 4, 8, ... 512 seconds.
      Example: Backoff Retry

    Fault Tolerance Policy
      Availability: This parameter is available only if you select Event Stream Model as the message push model.
      Description: The method to handle errors. Valid values:
      • Fault Tolerance Allowed: Fault tolerance is allowed. Event processing is not blocked when an error occurs. The messages that fail after they are retried based on the retry policy are delivered to dead-letter queues or discarded based on your configurations.
      • Fault Tolerance Prohibited: Fault tolerance is prohibited. If an error occurs and the message fails after it is retried based on the retry policy, event processing is blocked.
      Example: Fault Tolerance Allowed

    Dead-letter Queue
      Availability: This parameter is available only if you select Event Stream Model as the message push model.
      Description: The message queue to which events that are not processed or have exceeded the number of retries are sent. If you disable this feature, messages that have exceeded the number of retries specified by the retry policy are discarded.
      Example: Enable Dead-letter Queue

    Queue Type
      Description: The type of the dead-letter queue. Valid values:
      • MNS
      • Message Queue for Apache RocketMQ
      Example: MNS

    Queue Name
      Description: The name of the dead-letter queue.
      Example: test-queue
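
The Batch Push condition described above can be sketched as a small decision function. This is an illustrative model only, not the EventBridge implementation: the function name shouldPushBatch and its parameters are hypothetical, and the sketch assumes that a push happens as soon as either the message count or the interval condition is met.

```javascript
'use strict';

// Hypothetical model of the batch push decision; not the EventBridge implementation.
// batchSize corresponds to Batch Push Messages (1 to 500).
// intervalSeconds corresponds to Batch Push Interval (0 to 15 seconds).
function shouldPushBatch(bufferedMessages, secondsSinceLastPush, batchSize, intervalSeconds) {
  if (bufferedMessages === 0) {
    return false; // nothing to send
  }
  const sizeReached = bufferedMessages >= batchSize;
  const intervalReached = secondsSinceLastPush >= intervalSeconds;
  return sizeReached || intervalReached;
}

// Batch Push Messages = 100, Batch Push Interval = 15 seconds.
console.log(shouldPushBatch(100, 3, 100, 15)); // true: 100 messages arrived within 15 seconds
console.log(shouldPushBatch(40, 3, 100, 15));  // false: neither condition is met
console.log(shouldPushBatch(40, 15, 100, 15)); // true: the interval elapsed
```

In this model, an interval of 0 makes the interval condition always true, which matches the statement that messages are sent immediately after aggregation.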

    After the trigger is created, it is displayed on the Triggers tab. To modify or delete an existing trigger, see Manage triggers.
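
The figures given for Exponential Decay Retry (up to 176 retries, intervals that double up to 512 seconds, at most one day in total) can be checked with a short calculation. The sketch below assumes that the first interval is 1 second and that every retry after the cap is reached waits 512 seconds. The function name exponentialDecayIntervals is hypothetical, and the code illustrates the arithmetic only, not how EventBridge schedules retries internally.

```javascript
'use strict';

// Builds the retry intervals in seconds: the interval doubles from 1 second
// up to a cap of 512 seconds, for at most 176 retries.
function exponentialDecayIntervals(maxRetries = 176, capSeconds = 512) {
  const intervals = [];
  let next = 1;
  for (let i = 0; i < maxRetries; i++) {
    intervals.push(next);
    next = Math.min(next * 2, capSeconds);
  }
  return intervals;
}

const intervals = exponentialDecayIntervals();
const totalSeconds = intervals.reduce((sum, seconds) => sum + seconds, 0);

console.log(intervals.slice(0, 11)); // the first ten values double, then stay at 512
console.log(totalSeconds);           // 86015 seconds, just under one day (86400 seconds)
```

The first 10 intervals add up to 1,023 seconds and the remaining 166 retries wait 512 seconds each, which gives 86,015 seconds in total, or about 23.9 hours.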

Step 2: Configure the input parameters of the function

A Message Queue for Apache RocketMQ event source is passed to a function as the event input parameter. You can manually pass an event to a function to simulate a trigger event and test whether the code of the function is correct.

  1. On the function details page, click the Code tab and click the drop-down arrow icon. From the drop-down list that appears, select Configure Test Parameters.
  2. In the Configure Test Parameters panel, click the Create New Test Event or Modify Existing Test Event tab, and specify Event Name and the event content. After you specify the parameters, click OK.
    Sample code for the event of the event model:
    {
        "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
        "source":"RocketMQ-Function-rocketmq-trigger",
        "specversion":"1.0",
        "type":"mq:Topic:SendMessage",
        "datacontenttype":"application/json; charset=utf-8",
        "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
        "time":"2021-04-08T06:01:20.766Z",
        "aliyunaccountid":"164901546557****",
        "aliyunpublishtime":"2021-10-15T02:05:16.791Z",
        "aliyunoriginalaccountid":"164901546557****",
        "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
        "aliyunregionid":"cn-chengdu",
        "aliyunpublishaddr":"42.120.XX.XX",
        "data":{
            "topic":"TopicName",
            "systemProperties":{
                "MIN_OFFSET":"0",
                "TRACE_ON":"true",
                "MAX_OFFSET":"8",
                "MSG_REGION":"cn-hangzhou",
                "KEYS":"systemProperties.KEYS",
                "CONSUME_START_TIME":1628577790396,
                "TAGS":"systemProperties.TAGS",
                "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
            },
            "userProperties":{},
            "body":"TEST"
        }
    }
    Sample code for the event of the event stream model:
    [
        {
            "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
            "source":"RocketMQ-Function-rocketmq-trigger",
            "specversion":"1.0",
            "type":"mq:Topic:SendMessage",
            "datacontenttype":"application/json; charset=utf-8",
            "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
            "time":"2021-04-08T06:01:20.766Z",
            "aliyunaccountid":"164901546557****",
            "aliyunpublishtime":"2021-10-15T02:05:16.791Z",
            "aliyunoriginalaccountid":"164901546557****",
            "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
            "aliyunregionid":"cn-chengdu",
            "aliyunpublishaddr":"42.120.XX.XX",
            "data":{
                "topic":"TopicName",
                "systemProperties":{
                    "MIN_OFFSET":"0",
                    "TRACE_ON":"true",
                    "MAX_OFFSET":"8",
                    "MSG_REGION":"cn-hangzhou",
                    "KEYS":"systemProperties.KEYS",
                    "CONSUME_START_TIME":1628577790396,
                    "TAGS":"systemProperties.TAGS",
                    "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
                },
                "userProperties":{},
                "body":"TEST"
            }
        },
        {
            "id":"94ebc15f-f0db-4bbe-acce-56fb72fb****",
            "source":"RocketMQ-Function-rocketmq-trigger",
            "specversion":"1.0",
            "type":"mq:Topic:SendMessage",
            "datacontenttype":"application/json; charset=utf-8",
            "subject":"acs:mq:cn-hangzhou:164901546557****:MQ_INST_164901546557****_BXhFHryi%TopicName",
            "time":"2021-04-08T06:01:20.766Z",
            "aliyunaccountid":"164901546557****",
            "aliyunpublishtime":"2021-10-15T02:05:16.791Z",
            "aliyunoriginalaccountid":"164901546557****",
            "aliyuneventbusname":"RocketMQ-Function-rocketmq-trigger",
            "aliyunregionid":"cn-chengdu",
            "aliyunpublishaddr":"42.120.XX.XX",
            "data":{
                "topic":"TopicName",
                "systemProperties":{
                    "MIN_OFFSET":"0",
                    "TRACE_ON":"true",
                    "MAX_OFFSET":"8",
                    "MSG_REGION":"cn-hangzhou",
                    "KEYS":"systemProperties.KEYS",
                    "CONSUME_START_TIME":1628577790396,
                    "TAGS":"systemProperties.TAGS",
                    "INSTANCE_ID":"MQ_INST_164901546557****_BXhFHryi"
                },
                "userProperties":{},
                "body":"TEST"
            }
        }
    ]
    The following table describes the parameters in data. For information about the parameters that are defined in the CloudEvents specification, see Overview.
    Parameter | Type | Example | Description
    topic | String | TopicName | The name of the topic.
    systemProperties | Map | | The system properties.
    MIN_OFFSET | Int | 0 | The earliest offset.
    TRACE_ON | Boolean | true | Indicates whether a message trace exists. Valid values: true (a message trace exists) and false (no message trace exists).
    MAX_OFFSET | Int | 8 | The latest offset.
    MSG_REGION | String | cn-hangzhou | The region from which the message was sent.
    KEYS | String | systemProperties.KEYS | The keys for filtering.
    CONSUME_START_TIME | Long | 1628577790396 | The start time of consumption. Unit: milliseconds.
    UNIQ_KEY | String | AC14C305069E1B28CDFA3181CDA2**** | The unique key of the message.
    TAGS | String | systemProperties.TAGS | The tags for filtering.
    INSTANCE_ID | String | MQ_INST_123456789098****_BXhFHryi | The ID of the instance.
    userProperties | Map | N/A | The user properties.
    body | String | TEST | The content of the message.
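
To show how the parameters in data map to code, the following sketch reads the commonly used fields from one CloudEvents object. The helper name extractMessage and the names of the returned properties are hypothetical. The field paths follow the sample events above.

```javascript
'use strict';

// Hypothetical helper: picks commonly used fields out of one CloudEvents object.
// Missing sections fall back to empty objects so that partial test events do not throw.
function extractMessage(cloudEvent) {
  const data = cloudEvent.data || {};
  const systemProperties = data.systemProperties || {};
  return {
    eventId: cloudEvent.id,
    topic: data.topic,
    tags: systemProperties.TAGS,
    keys: systemProperties.KEYS,
    instanceId: systemProperties.INSTANCE_ID,
    userProperties: data.userProperties || {},
    body: data.body,
  };
}

const sample = {
  id: '94ebc15f-f0db-4bbe-acce-56fb72fb****',
  data: {
    topic: 'TopicName',
    systemProperties: { TAGS: 'systemProperties.TAGS', KEYS: 'systemProperties.KEYS' },
    userProperties: {},
    body: 'TEST',
  },
};
console.log(extractMessage(sample).body); // TEST
```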

Step 3: Write the function code and test the function

After you create the trigger, you can write function code and test the function to verify that the code is correct. When Message Queue for Apache RocketMQ events are delivered to Function Compute by using EventBridge, the trigger invokes the function.

  1. On the function details page, click the Code tab, edit the function code in the code editor, and then click Deploy.
    In this topic, a Node.js function is used as an example.
    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      // Parse the event parameters and process the event. 
      callback(null, 'return result');
    }
  2. Click the Code tab and click Test Function.
    After the function is executed, you can view the result on the Code tab.
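
As a complement to testing in the console, the handler logic can also be exercised locally with a simulated event. The following sketch assumes that event reaches the handler as a Buffer that contains the JSON payload, and it accepts both the single-event and the batch shape. The processing step (collecting each data.body) and the shape of the returned result are placeholders for illustration.

```javascript
'use strict';

// Sketch of a handler that supports both push models.
// Assumption: event is a Buffer (or string) that contains the JSON payload.
const handler = (event, context, callback) => {
  try {
    const parsed = JSON.parse(event.toString());
    const events = Array.isArray(parsed) ? parsed : [parsed];
    // Placeholder processing: collect the body of each message.
    const bodies = events.map((e) => e.data.body);
    callback(null, JSON.stringify({ processed: bodies.length, bodies }));
  } catch (err) {
    // Pass the error to the callback so that the invocation reports a failure.
    callback(err);
  }
};
exports.handler = handler;

// Local simulation of a test event; not part of the deployed function.
const testEvent = Buffer.from(JSON.stringify([
  { id: '1', data: { topic: 'TopicName', body: 'TEST' } },
  { id: '2', data: { topic: 'TopicName', body: 'TEST' } },
]));
handler(testEvent, {}, (err, result) => {
  console.log(err ? `failed: ${err.message}` : result);
});
```

Only the handler definition and the exports.handler assignment belong in the deployed function. The simulation at the end mirrors what the Test Function button does with a configured test event.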

References

To modify or delete an existing trigger, see Manage triggers.