After a Message Queue for Apache Kafka event source is integrated with Function Compute by using EventBridge, the Message Queue for Apache Kafka trigger (Kafka trigger) can trigger associated functions. The functions can be used to perform custom operations on the messages that are published to Message Queue for Apache Kafka. This topic describes how to create a Kafka trigger in the Function Compute console, configure the input parameters, and write and test code.

Overview

After you submit a request to create a trigger in the Function Compute console, Function Compute creates event stream resources on the EventBridge side based on the trigger configurations.

After the trigger is created, you can view the trigger information in the Function Compute console. You can also view the information about the created resources in the EventBridge console. When a message is enqueued in the source topic of the Message Queue for Apache Kafka instance, the function in Function Compute is triggered. One or more message events are pushed to the function in batches based on your batch configurations.

Usage notes

  • The Message Queue for Apache Kafka instance that is the trigger source must be in the same region as the function in Function Compute.
  • When the number of created event streams reaches the upper limit, the Message Queue for Apache Kafka trigger cannot be created.
An Alibaba Cloud account can create a maximum of 30 event streams in each region.

Prerequisites

Step 1: Create a Kafka trigger

  1. Log on to the Function Compute console. In the left-side navigation pane, click Services & Functions.
  2. In the top navigation bar, select a region. On the Services page, click the desired service.
  3. On the Functions page, click the function that you want to manage.
  4. On the function details page, click the Triggers tab, select the version or alias from the Version or Alias drop-down list, and then click Create Trigger.
  5. In the Create Trigger panel, specify related parameters. After you specify the parameters, click OK.
    For more information about advanced settings, see Advanced features of EventBridge triggers.
    Basic settings
    Trigger Type: Select Message Queue for Apache Kafka. Example: Message Queue for Apache Kafka.
    Name: Enter a trigger name. Example: kafka-trigger.
    Version or Alias: The default value is LATEST. If you want to create a trigger for another version or alias, switch to that version or alias in the upper-right corner of the function details page. For information about versions and aliases of a service, see Manage versions and Manage aliases. Example: LATEST.
    Kafka Instance: Select a Message Queue for Apache Kafka instance from the drop-down list. Example: alikafka_pre-cn-i7m2t7t1****.
    Topic: Select a topic of the Message Queue for Apache Kafka instance from the drop-down list. Example: topic1.
    Group ID: Select a group ID of the Message Queue for Apache Kafka instance. Example: GID_group1.
    Note: Use a dedicated group ID to create the trigger. Do not reuse the group ID of an existing service. Otherwise, the sending and receiving of existing messages are affected.
    Concurrent Consumption Tasks: The number of concurrent consumers. Valid values: 1 to the number of partitions of the topic. Example: 2.
    Consumer Offset: The consumer offset, that is, the position from which messages in the topic start to be consumed.
    Valid values:
    • Earliest Offset: consumes messages from the earliest offset.
    • Latest Offset: consumes messages from the latest offset.
    Example: Latest Offset.
    Network Settings: The type of network over which the messages are routed.
    Valid values:
    • Default Network: uses the VPC and vSwitch that were specified when the Message Queue for Apache Kafka instance was deployed.
    • Internet: requires you to select a VPC, a vSwitch, and a security group.
    Example: Default Network.
    Advanced settings
    Invocation Method: The method used to invoke the function.
    Valid values:
    • Synchronous Invocation: After an event triggers the function, Function Compute returns the execution result when the execution is complete. This is the default value. For more information, see Synchronous invocations.
    • Asynchronous Invocation: After an event triggers the function, Function Compute immediately returns a response and ensures that the function is executed at least once. The detailed execution result is not returned. This method is suitable for functions that can tolerate higher scheduling latency. For more information, see Overview.
    Example: Synchronous Invocation.
    Batch Push: Batch push aggregates multiple events into one invocation. A push is triggered as soon as the condition of either Batch Push Messages or Batch Push Interval is met. For example, if you set Batch Push Messages to 100 and Batch Push Interval to 15 seconds, the push is triggered as soon as 100 messages arrive, even if fewer than 15 seconds have elapsed. Example: Enable.
    Batch Push Messages: The maximum number of messages sent in each batch per function invocation. A request is sent when the number of backlogged messages reaches this value. Valid values: 1 to 500. Example: 1.
    Batch Push Interval: The interval at which the aggregated messages are sent to Function Compute. Valid values: 0 to 15 (unit: seconds). A value of 0 specifies that messages are sent in real time. Example: 1.
    Retry Policy: The retry policy that is used when a message fails to be pushed.
    Valid values:
    • Backoff Retry: A failed push is retried up to 3 times at random intervals of 10 to 20 seconds.
    • Exponential Decay Retry: A failed push is retried up to 176 times over a maximum of one day. The retry interval doubles each time, from 1 second up to a maximum of 512 seconds: 1, 2, 4, 8, ..., 512 seconds. (A short sketch of this arithmetic follows this procedure.)
    Example: Backoff Retry.
    Fault Tolerance Policy: The way in which push errors are handled.
    Valid values:
    • Fault Tolerance Allowed: Event processing is not blocked when an error occurs. Messages that still fail after being retried based on the retry policy are delivered to the dead-letter queue or discarded, based on your configuration.
    • Fault Tolerance Prohibited: If an error occurs and the message still fails after being retried based on the retry policy, event processing is blocked.
    Example: Fault Tolerance Allowed.
    Dead-letter Queue: The queue to which events that cannot be processed or that exceed the maximum number of retries are sent. If you disable this feature, messages that exceed the maximum number of retries specified by the retry policy are discarded. Example: Enable Dead-letter Queue.
    Queue Type: The type of the dead-letter queue. Valid values: MNS and Message Queue for Apache RocketMQ. Example: MNS.
    Queue Name: The name of the dead-letter queue. Example: test-queue.

    After the trigger is created, it is displayed on the Triggers tab. To modify or delete an existing trigger, see Manage triggers.
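
    The Exponential Decay Retry numbers above can be sanity-checked: intervals that double from 1 second and are capped at 512 seconds add up to roughly one day after 176 retries. The following minimal Node.js sketch shows that arithmetic; it is an illustration only and not part of the trigger configuration.
    'use strict';

    // Sum the Exponential Decay Retry intervals: doubling from 1 second,
    // capped at 512 seconds, for at most 176 retries.
    function totalRetrySeconds(maxRetries = 176, capSeconds = 512) {
      let total = 0;
      let interval = 1;
      for (let i = 0; i < maxRetries; i++) {
        total += interval;
        interval = Math.min(interval * 2, capSeconds);
      }
      return total;
    }

    console.log(totalRetrySeconds()); // 86015 seconds, close to one day (86400 seconds)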

Step 2: Configure the input parameters of the function

A Message Queue for Apache Kafka event source passes events to the function as the event input parameter. You can also pass an event to the function manually to trigger the function for testing.

  1. On the function details page, click the Code tab, click the drop-down icon, and then select Configure Test Parameters from the drop-down list that appears.
  2. In the Configure Test Parameters panel, click the Create New Test Event or Modify Existing Test Event tab, and specify Event Name and the event content. After you specify the parameters, click OK.
    Sample code of event:
    [
      {
        "specversion": "1.0",
        "id": "8e215af8-ca18-4249-8645-f96c1026****",
        "source": "acs:alikafka",
        "type": "alikafka:Topic:Message",
        "subject": "acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
        "datacontenttype": "application/json; charset=utf-8",
        "time": "2022-06-23T02:49:51.589Z",
        "aliyunaccountid": "164901546557****",
        "data": {
          "topic": "****",
          "partition": 7,
          "offset": 25,
          "timestamp": 1655952591589,
          "headers": {
            "headers": [],
            "isReadOnly": false
          },
          "key": "keytest",
          "value": "hello kafka msg"
        }
      },
      {
        "specversion": "1.0",
        "id": "8e215af8-ca18-4249-8645-f96c1026****",
        "source": "acs:alikafka",
        "type": "alikafka:Topic:Message",
        "subject": "acs:alikafka_pre-cn-i7m2t7t1****:topic:mytopic",
        "datacontenttype": "application/json; charset=utf-8",
        "time": "2022-06-23T02:49:51.589Z",
        "aliyunaccountid": "164901546557****",
        "data": {
          "topic": "****",
          "partition": 7,
          "offset": 25,
          "timestamp": 1655952591589,
          "headers": {
            "headers": [],
            "isReadOnly": false
          },
          "key": "keytest",
          "value": "hello kafka msg"
        }
      }
    ]

    For more information about the parameters defined in the CloudEvents specification, see Overview.

    The following list describes the parameters contained in data.

    • topic (String): The name of the topic. Example: TopicName.
    • partition (Int): The partition number in the Message Queue for Apache Kafka instance. Example: 1.
    • offset (Int): The consumer offset of the message in the Message Queue for Apache Kafka instance. Example: 0.
    • timestamp (String): The timestamp, in milliseconds, when the consumption started. Example: 1655952591589.
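
    To make these fields concrete, the following minimal Node.js sketch reads the documented fields from one element of the event array. It is an illustration only; the values are adapted from the sample event above.
    'use strict';

    // One element of the event array shown above, reduced to the "data" fields.
    const evt = {
      data: {
        topic: 'mytopic',
        partition: 7,
        offset: 25,
        timestamp: 1655952591589,
        key: 'keytest',
        value: 'hello kafka msg'
      }
    };

    const { topic, partition, offset, timestamp, key, value } = evt.data;
    console.log(topic, partition, offset, new Date(timestamp).toISOString(), key, value);
    // mytopic 7 25 2022-06-23T02:49:51.589Z keytest hello kafka msg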

Step 3: Write and test the function code

After you create the trigger, you can write function code and test the function to verify that the code is correct. When Message Queue for Apache Kafka events occur, the trigger invokes the function.

  1. On the function details page, click the Code tab, edit the function code in the code editor, and then click Deploy.
    The following Node.js function code is used as an example. A fuller sketch that parses the event payload follows this procedure.
    'use strict';
    /*
    To enable the initializer feature
    please implement the initializer function as below:
    exports.initializer = (context, callback) => {
      console.log('initializing');
      callback(null, '');
    };
    */
    exports.handler = (event, context, callback) => {
      console.log("event: %s", event);
      // Parse the event parameters and process the event. 
      callback(null, 'return result');
    }
  2. On the Code tab, click Test Function.
    After the function is executed, you can view the result on the Code tab.
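
    If you want the handler to do more than log the raw payload, the following sketch extends the sample code above. It assumes, as shown in Step 2, that the Kafka trigger delivers the event as a JSON array of CloudEvents; the processing logic is only a placeholder.
    'use strict';

    exports.handler = (event, context, callback) => {
      let messages;
      try {
        // The payload is a JSON array of CloudEvents (see Step 2). In the
        // Node.js runtime, event is passed as a Buffer, which JSON.parse accepts.
        messages = JSON.parse(event);
      } catch (err) {
        callback(err);
        return;
      }

      for (const msg of messages) {
        const { topic, partition, offset, key, value } = msg.data;
        // Replace this log statement with your own business logic.
        console.log('topic=%s partition=%d offset=%d key=%s value=%s',
          topic, partition, offset, key, value);
      }

      callback(null, `processed ${messages.length} message(s)`);
    };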

References

In addition to the Function Compute console, you can configure triggers by using the following methods:
  • Use Serverless Devs to configure triggers. For more information, see Serverless Devs.
  • Use SDKs to configure triggers. For more information, see Supported SDKs.

To modify or delete an existing trigger, see Manage triggers.