All Products
Search
Document Center

Function Compute:ApsaraMQ for Kafka triggers

Last Updated:Nov 17, 2023

After you integrate ApsaraMQ for Kafka with Function Compute by using EventBridge, you can use ApsaraMQ for Kafka triggers to trigger the associated functions to perform custom operations on messages that are published to ApsaraMQ for Kafka. This topic describes how to create an ApsaraMQ for Kafka trigger and configure the input parameters of a function in the Function Compute console. This topic also describes how to write function code and test a function.

Overview

After you submit a request to create a trigger in the Function Compute console, Function Compute automatically creates event streams in EventBridge based on the configurations of the trigger.

After the trigger is created, you can view the information about the trigger in the Function Compute console. You can also view the information about the created resources in the EventBridge console. When a message is enqueued in ApsaraMQ for Kafka, the associated function in Function Compute is triggered, and the message is separately or batch pushed to the function for processing based on your batch configuration.

Limits

  • The ApsaraMQ for Kafka instance that is used as the trigger source must reside in the same region as the function in Function Compute.

  • If the number of event streams that are created reaches the upper limit, you can no longer create ApsaraMQ for Kafka triggers. For information about the upper limit of event streams, see Limits.

Before you start

Step 1: Create an ApsaraMQ for Kafka trigger

  1. Log on to the Function Compute console. In the left-side navigation pane, click Services & Functions.
  2. In the top navigation bar, select a region. On the Services page, click the desired service.
  3. On the Functions page, click the function that you want to manage.
  4. On the function details page, click the Triggers tab, select the version or alias from the Version or Alias drop-down list, and then click Create Trigger.
  5. In the Create Trigger panel, configure the parameters and click OK.

    The following table describes the basic parameters.

    Parameter

    Description

    Example

    Trigger Type

    Select ApsaraMQ for Kafka.

    ApsaraMQ for Kafka

    Name

    Enter a trigger name.

    kafka-trigger

    Version or Alias

    Default value: LATEST. If you want to create a trigger for another version or alias, select a version or alias in the upper-right corner of the function details page. For more information about versions and aliases, see Manage versions and Manage aliases.

    LATEST

    ApsaraMQ for Kafka Instance

    Select the ApsaraMQ for Kafka instance that you created.

    alikafka_pre-cn-i7m2t7t1****

    Topic

    Select the topic that you created on the ApsaraMQ for Kafka instance.

    topic1

    Group ID

    Select the group that you created on the ApsaraMQ for Kafka instance.

    Note

    Use a unique group ID to create the trigger. Do not use the group ID of an existing service. Otherwise, the receiving and sending of existing messages are affected.

    GID_group1

    Concurrent Consumption Tasks

    Specify the number of concurrent consumers. Valid values: 1 to the number of partitions in a topic.

    2

    Consumer Offset

    Select the offset from which ApsaraMQ for Kafka starts to pull messages from EventBridge.

    Valid values:

    • Earliest Offset: pulls messages from the earliest offset.

    • Latest Offset: pulls messages from the latest offset.

    Latest Offset

    Invocation Method

    Select a function invocation method.

    Valid values:

    • Sync Invocation: This method is suitable for sequential invocations. When an event or a batch of events triggers the function, Function Compute runs the function and waits for a response before the function processes the next event or batch of events. The maximum payload of a synchronous invocation request is 32 MB. For more information, see Synchronous invocations.

    • Async Invocation: This method allows you to quickly consume events. When an event or a batch of events triggers the function, Function Compute immediately returns a response and continues processing the next event or batch of events. During this process, the function is executed in asynchronous mode. The maximum payload of an asynchronous invocation request is 128 KB. For more information, see Overview.

    Sync Invocation

    Max. Delivery Concurrency

    Specify the maximum number of ApsaraMQ for Kafka messages that can be concurrently delivered to Function Compute. Valid values: 1 to 300. This parameter is available only if you set the Invocation Method parameter to Sync Invocation. If you require higher concurrency, go to the Quota Center of EventBridge, find the quota whose name is EventStreaming FC Sink Maximum Concurrent Number of Synchronous Posting, and then click Apply in the Actions column to request a quota increase.

    1

    Trigger State

    Specify whether to enable the trigger immediately after it is created. By default, Enable Trigger is selected, which indicates that the trigger is enabled immediately after it is created.

    N/A

    For information about advanced configurations such as push settings, retry policies, and dead-letter queues, see Advanced features of triggers.

    After the trigger is created, it is displayed on the Triggers tab. To modify or delete an existing trigger, see Manage triggers.

Step 2: Configure the input parameters of the function

The ApsaraMQ for Kafka event source is passed to the function as an input parameter in the form of event. You can manually pass event to the function to simulate event triggering.

  1. On the function details page, click the Code tab and click the xialatubiao icon. From the drop-down list that appears, select Configure Test Parameters.
  2. In the Configure Test Parameters panel, click the Create New Test Event or Modify Existing Test Event tab, and specify an event name and the event content. Then, click OK.

    Sample code of event:

    [
        {
            "specversion":"1.0",
            "id":"8e215af8-ca18-4249-8645-f96c1026****",
            "source":"acs:alikafka",
            "type":"alikafka:Topic:Message",
            "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
            "datacontenttype":"application/json; charset=utf-8",
            "time":"2022-06-23T02:49:51.589Z",
            "aliyunaccountid":"164901546557****",
            "data":{
                "topic":"****",
                "partition":7,
                "offset":25,
                "timestamp":1655952591589,
                "headers":{
                    "headers":[
    
                    ],
                    "isReadOnly":false
                },
                "key":"keytest",
                "value":"hello kafka msg"
            }
        },
        {
            "specversion":"1.0",
            "id":"8e215af8-ca18-4249-8645-f96c1026****",
            "source":"acs:alikafka",
            "type":"alikafka:Topic:Message",
            "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
            "datacontenttype":"application/json; charset=utf-8",
            "time":"2022-06-23T02:49:51.589Z",
            "aliyunaccountid":"164901546557****",
            "data":{
                "topic":"****",
                "partition":7,
                "offset":25,
                "timestamp":1655952591589,
                "headers":{
                    "headers":[
    
                    ],
                    "isReadOnly":false
                },
                "key":"keytest",
                "value":"hello kafka msg"
            }
        }
    ]

    For more information about the parameters defined in the CloudEvents specification, see Overview.

    The following table describes the parameters contained in data.

    ParameterTypeExampleDescription
    topicStringTopicNameThe name of the topic.
    partitionInt1The partition number in the ApsaraMQ for Kafka instance.
    offsetInt0The consumer offset of the ApsaraMQ for Kafka instance.
    timestampString1655952591589The timestamp when the consumption started.

Step 3: Write the function code and test the function

After a trigger is created, you can write function code and test the function to verify whether the code is correct. When an event occurs in ApsaraMQ for Kafka, the function is automatically invoked by the trigger.

  1. On the function details page, click the Code tab, enter function code in the code editor, and then click Deploy.

    The following sample code provides an example on how to write function code in Node.js:

    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      // Parse the event parameters and process the event. 
      callback(null, 'return result');
    }
  2. Click the Code tab and click Test Function.
    After the function is executed, you can view the result on the Code tab.

References

In addition to the Function Compute console, you can configure triggers by using the following methods:
  • Use Serverless Devs to configure triggers. For more information, see Serverless Devs.
  • Use SDKs to configure triggers. For more information, see SDKs.

To modify or delete an existing trigger, see Manage triggers.