Function Compute: ApsaraMQ for Kafka triggers

Last Updated: Feb 02, 2024

After ApsaraMQ for Kafka is integrated with Function Compute by using EventBridge, ApsaraMQ for Kafka triggers can be used to invoke functions. This way, you can invoke functions to process the messages that are published to ApsaraMQ for Kafka based on your business requirements. This topic describes how to create an ApsaraMQ for Kafka trigger, configure the input parameters of a function, and write and test function code in the Function Compute console.

Overview

After you submit a request to create a trigger in the Function Compute console, Function Compute automatically creates event streams in EventBridge based on the configurations of the trigger.

After the trigger is created, you can view the information about the trigger in the Function Compute console. You can also view the information about the created resources in the EventBridge console. When a message is queued in ApsaraMQ for Kafka, the associated function in Function Compute is invoked. One or more message events are pushed to the function in batches for processing based on your batch configurations.

Usage notes

  • The ApsaraMQ for Kafka instance that serves as the trigger source must reside in the same region as the function to be invoked in Function Compute.

  • If the number of created event streams reaches the upper limit, no more ApsaraMQ for Kafka triggers can be created. For more information about the limits on the number of event streams, see Limits.

Prerequisites

Step 1: Create an ApsaraMQ for Kafka trigger

  1. Log on to the Function Compute console. In the left-side navigation pane, click Functions.

  2. In the top navigation bar, select a region. On the Functions page, click the function that you want to manage.

  3. On the function details page, click the Configurations tab. In the left-side navigation pane, click Triggers. Then, click Create Trigger.

  4. In the Create Trigger panel, configure the parameters and click OK.

    The following table describes the basic parameters.

    Parameter

    Description

    Example

    Trigger Type

    The type of the trigger. Select ApsaraMQ for Kafka.

    ApsaraMQ for Kafka

    Name

    The name of the trigger.

    kafka-trigger

    Version or Alias

    The function version or alias for which the trigger is created. Default value: LATEST. To create a trigger for another version or alias, select that version or alias in the upper-right corner of the function details page. For more information about versions and aliases, see Manage versions and Manage aliases.

    LATEST

    ApsaraMQ for Kafka Instance

    The name of the existing ApsaraMQ for Kafka instance.

    alikafka_pre-cn-i7m2t7t1****

    Topic

    The name of the existing topic in the selected ApsaraMQ for Kafka instance.

    topic1

    Group ID

    The ID of the existing consumer group in the selected ApsaraMQ for Kafka instance.

    Note

    Use a dedicated consumer group ID for the trigger. Do not reuse the consumer group ID of an existing service. Otherwise, message sending and receiving for that service may be affected.

    GID_group1

    Concurrent Consumption Tasks

    The number of concurrent consumers. Valid values: 1 to the number of partitions in the topic.

    2

    Consumer Offset

    The consumer offset of messages. A consumer offset specifies the point from which EventBridge starts to pull messages from ApsaraMQ for Kafka.

    Valid values:

    • Earliest Offset: consumes messages from the earliest offset.

    • Latest Offset: consumes messages from the latest offset.

    Latest Offset

    Network Settings

    The type of the network over which you want to route messages.

    Valid values:

    • Default Network: uses the VPC ID and vSwitch ID that were specified when the ApsaraMQ for Kafka instance was deployed.

    • Internet: You must specify the VPC, vSwitch, and Security Group parameters.

    Default Network

    Invocation Method

    The mode in which the function is invoked.

    Valid values:

    • Sync Invocation: This mode is suitable for sequential invocations. When an event or a batch of events invokes the function, Function Compute executes the function and waits for a response before it processes the next event or batch of events. The upper limit of the payload for a synchronous invocation request is 32 MB. For more information, see Synchronous invocations.

    • Async Invocation: This mode allows you to quickly consume events. When an event or a batch of events invokes the function, Function Compute immediately returns a response and continues to process the next event or batch of events while the function runs asynchronously. The upper limit of the payload for an asynchronous invocation request is 128 KB. For more information, see Overview.

    Sync Invocation

    Trigger State

    Specifies whether to enable the trigger immediately after it is created. By default, Enable Trigger is selected, so the trigger takes effect as soon as it is created.

    N/A

    For more information about advanced configurations such as push settings, retry policies, and dead-letter queues, see Advanced features of triggers.

    After the trigger is created, it is displayed on the Triggers tab. To modify or delete a trigger, see Manage triggers.
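
The payload limits listed for the two invocation methods can be checked against a batch before you settle on a batch size. The following Node.js sketch is illustrative only: `fitsPayloadLimit` is a hypothetical helper, not part of Function Compute, and it assumes that 1 MB equals 1,024 × 1,024 bytes and that the payload is the JSON-serialized event array.

```javascript
'use strict';

// Payload limits quoted in the Invocation Method description.
// Assumption: 1 KB = 1024 bytes and 1 MB = 1024 * 1024 bytes.
const SYNC_LIMIT_BYTES = 32 * 1024 * 1024; // 32 MB
const ASYNC_LIMIT_BYTES = 128 * 1024;      // 128 KB

// Hypothetical helper: reports whether a serialized batch fits the limit
// of the chosen invocation method ('sync' or 'async').
function fitsPayloadLimit(events, invocationMethod) {
  const limit = invocationMethod === 'async' ? ASYNC_LIMIT_BYTES : SYNC_LIMIT_BYTES;
  const size = Buffer.byteLength(JSON.stringify(events), 'utf8');
  return { size, limit, fits: size <= limit };
}

// Example: a single 200 KB message value is too large for an async
// invocation but fits a sync invocation.
const batch = [{ data: { value: 'x'.repeat(200 * 1024) } }];
console.log(fitsPayloadLimit(batch, 'async').fits);
console.log(fitsPayloadLimit(batch, 'sync').fits);
```

If a batch regularly approaches the asynchronous limit, a smaller batch size or Sync Invocation may be the safer choice.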

Step 2: Configure the input parameters of the function

An ApsaraMQ for Kafka event is used to invoke a function in Function Compute. The parameters of the event are used as the input parameters of the function. You can manually pass the parameters of the event to invoke the function as a test.

  1. On the Code tab of the function details page, click the drop-down icon next to Test Function and select Configure Test Parameters from the drop-down list.

  2. In the Configure Test Parameters panel, click the Create New Test Event or Modify Existing Test Event tab, enter the event name and event content, and then click OK.

    The following sample shows the format of the event content:

    [
        {
            "specversion":"1.0",
            "id":"8e215af8-ca18-4249-8645-f96c1026****",
            "source":"acs:alikafka",
            "type":"alikafka:Topic:Message",
            "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
            "datacontenttype":"application/json; charset=utf-8",
            "time":"2022-06-23T02:49:51.589Z",
            "aliyunaccountid":"164901546557****",
            "data":{
                "topic":"****",
                "partition":7,
                "offset":25,
                "timestamp":1655952591589,
                "headers":{
                    "headers":[],
                    "isReadOnly":false
                },
                "key":"keytest",
                "value":"hello kafka msg"
            }
        },
        {
            "specversion":"1.0",
            "id":"8e215af8-ca18-4249-8645-f96c1026****",
            "source":"acs:alikafka",
            "type":"alikafka:Topic:Message",
            "subject":"acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
            "datacontenttype":"application/json; charset=utf-8",
            "time":"2022-06-23T02:49:51.589Z",
            "aliyunaccountid":"164901546557****",
            "data":{
                "topic":"****",
                "partition":7,
                "offset":25,
                "timestamp":1655952591589,
                "headers":{
                    "headers":[],
                    "isReadOnly":false
                },
                "key":"keytest",
                "value":"hello kafka msg"
            }
        }
    ]

    For information about the parameters defined in the CloudEvents specification, see Overview.

    The following table describes the parameters contained in data.

    | Parameter | Type | Example | Details |
    | --- | --- | --- | --- |
    | topic | String | TopicName | The topic name. |
    | partition | Int | 1 | The information about partitions on the ApsaraMQ for Kafka instance. |
    | offset | Int | 0 | The message offset of the ApsaraMQ for Kafka instance. |
    | timestamp | String | 1655952591589 | The timestamp that indicates when message consumption started. |
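
To make the event layout concrete, the following Node.js sketch flattens a batch in the format shown above into plain records. `extractKafkaRecords` is a hypothetical helper introduced for illustration. It assumes that `timestamp` is an epoch value in milliseconds, which is consistent with the `time` field of the sample event.

```javascript
'use strict';

// Hypothetical helper: maps each CloudEvent in the batch to a flat record
// built from the fields under "data".
function extractKafkaRecords(events) {
  if (!Array.isArray(events)) {
    throw new TypeError('expected an array of CloudEvents');
  }
  return events.map((e) => {
    const data = e.data || {};
    // Number() accepts both a number and a numeric string.
    const ms = Number(data.timestamp);
    return {
      eventId: e.id,
      topic: data.topic,
      partition: data.partition,
      offset: data.offset,
      // Assumption: timestamp is epoch milliseconds.
      time: Number.isFinite(ms) ? new Date(ms).toISOString() : null,
      key: data.key,
      value: data.value,
    };
  });
}

// One event trimmed from the sample in this step.
const sampleBatch = [
  {
    specversion: '1.0',
    id: '8e215af8-ca18-4249-8645-f96c1026****',
    source: 'acs:alikafka',
    type: 'alikafka:Topic:Message',
    data: {
      topic: '****',
      partition: 7,
      offset: 25,
      timestamp: 1655952591589,
      key: 'keytest',
      value: 'hello kafka msg',
    },
  },
];

console.log(extractKafkaRecords(sampleBatch));
```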

Step 3: Write and test function code

After you create the trigger, write the function code and test it to verify that it works as expected. When an ApsaraMQ for Kafka event occurs, the trigger automatically invokes the function.

  1. On the function details page, click the Code tab, enter function code in the code editor, and then click Deploy.

    In this example, the function code is written in Node.js.

    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      // Parse the event parameters and process the event. 
      callback(null, 'return result');
    };

  2. Click Test Function.
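
As one possible way to fill in the "parse the event parameters" step, the handler below parses the batch and returns the message values. This is a sketch under stated assumptions, not the reference implementation: it assumes that the runtime passes `event` as a Buffer or a string that contains the JSON array configured in Step 2, and the returned JSON string is an arbitrary choice made for illustration.

```javascript
'use strict';

// Sketch only: assumes the event is a Buffer or string that holds the
// JSON array of CloudEvents shown in Step 2.
const handler = (event, context, callback) => {
  let result;
  try {
    const events = JSON.parse(event.toString());
    // Collect the Kafka message body from each event in the batch.
    const values = events.map((e) => e.data.value);
    console.log('received %d message(s)', values.length);
    result = JSON.stringify(values);
  } catch (err) {
    // Malformed input: report the error to the caller instead of crashing.
    callback(err);
    return;
  }
  callback(null, result);
};

exports.handler = handler;
```

Reporting parse failures through the callback lets the retry policy or dead-letter queue configured for the trigger handle a bad batch rather than leaving it to an uncaught exception.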

References

To modify or delete an existing trigger, see Manage triggers.