This topic describes how to create a Function Compute sink connector to synchronize data from a data source topic in your Message Queue for Apache Kafka instance to a function in Function Compute.

Prerequisites

Before you create a Function Compute sink connector, make sure that the following requirements are met:
  • The connector feature is enabled for the Message Queue for Apache Kafka instance. For more information, see Enable the connector feature.
  • A data source topic is created in the Message Queue for Apache Kafka instance. For more information, see Step 1: Create a topic.

    A topic named fc-test-input is used in this example.

  • A function is created in Function Compute. For more information, see Create a function in the Function Compute console.
    Notice The function that you create must be an event function.

    An event function named hello_world is used in this example. The function belongs to the guide-hello_world service and runs in the Python runtime environment. The following sample code shows the function:

    # -*- coding: utf-8 -*-
    import logging

    # To enable the initializer feature,
    # implement an initializer function as follows:
    # def initializer(context):
    #     logger = logging.getLogger()
    #     logger.info('initializing')

    def handler(event, context):
        logger = logging.getLogger()
        logger.info('hello world:' + bytes.decode(event))
        return 'hello world:' + bytes.decode(event)
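
    The event parameter is passed to the handler as a bytes object, as the call to bytes.decode(event) shows. As a quick local sanity check, you can call the handler directly. This is only a sketch for local testing; in Function Compute, the runtime supplies a real context object:

    # Local sanity check only: Function Compute normally supplies the
    # context object, which this sample handler does not use.
    result = handler(b'test', None)
    print(result)  # prints: hello world:test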

Procedure

To synchronize data from a data source topic in your Message Queue for Apache Kafka instance to a function in Function Compute by using a Function Compute sink connector, perform the following steps:

  1. Optional: Enable Function Compute sink connectors to access Function Compute across regions.
    Notice Skip this step if you do not want Function Compute sink connectors to access Function Compute across regions.

    Enable Internet access for Function Compute sink connectors

  2. Optional: Enable Function Compute sink connectors to access Function Compute across Alibaba Cloud accounts.
    Notice Skip this step if you do not want Function Compute sink connectors to access Function Compute across Alibaba Cloud accounts.
    1. Create a custom policy
    2. Create a RAM role
    3. Grant permissions to the RAM role
  3. Optional: Create the topics and consumer group that are required by a Function Compute sink connector.
    Notice
    • If you do not want to customize the names of the topics and consumer group, skip this step.
    • Some of the topics that are required by a Function Compute sink connector must use a local storage engine. If the major version of your Message Queue for Apache Kafka instance is 0.10.2, you cannot manually create topics that use a local storage engine. In this version, these topics must be automatically created.
    1. Create the topics that are required by a Function Compute sink connector
    2. Create the group that is required by a Function Compute sink connector
  4. Create and deploy a Function Compute sink connector
  5. Verify the results.
    1. Send a test message
    2. View function logs

Enable Internet access for Function Compute sink connectors

If you want Function Compute sink connectors to access other Alibaba Cloud services across regions, enable Internet access for Function Compute sink connectors. For more information, see Enable Internet access for a connector.

Create a custom policy

You can create a custom policy for access to Function Compute by using the Alibaba Cloud account within which you want to use Function Compute.

  1. Log on to the Resource Access Management (RAM) console.
  2. In the left-side navigation pane, choose Permissions > Policies.
  3. On the Policies page, click Create Policy.
  4. On the Create Custom Policy page, create a custom policy.
    1. In the Policy Name field, enter KafkaConnectorFcAccess.
    2. Set the Configuration Mode parameter to Script.
    3. In the Policy Document field, enter the custom policy script.
      The following sample code provides an example of the custom policy script for access to Function Compute:
      {
          "Version": "1",
          "Statement": [
              {
                  "Action": [
                      "fc:InvokeFunction",
                      "fc:GetFunction"
                  ],
                  "Resource": "*",
                  "Effect": "Allow"
              }
          ]
      }
    4. Click OK.
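
    If you prefer to script this step, you can also call the RAM CreatePolicy operation. The following Python sketch assumes the aliyun-python-sdk-core and aliyun-python-sdk-ram packages; the AccessKey pair is a placeholder that you must replace with your own credentials:

    # -*- coding: utf-8 -*-
    # Sketch: create the KafkaConnectorFcAccess policy by calling the RAM
    # CreatePolicy operation. Assumes the aliyun-python-sdk-core and
    # aliyun-python-sdk-ram packages. Replace the AccessKey placeholders.
    import json
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkram.request.v20150501.CreatePolicyRequest import CreatePolicyRequest

    policy_document = {
        "Version": "1",
        "Statement": [
            {
                "Action": ["fc:InvokeFunction", "fc:GetFunction"],
                "Resource": "*",
                "Effect": "Allow"
            }
        ]
    }

    client = AcsClient('<yourAccessKeyId>', '<yourAccessKeySecret>', 'cn-hangzhou')
    request = CreatePolicyRequest()
    request.set_PolicyName('KafkaConnectorFcAccess')
    request.set_PolicyDocument(json.dumps(policy_document))
    print(client.do_action_with_exception(request))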

Create a RAM role

You can create a RAM role by using the Alibaba Cloud account within which you want to use Function Compute. When you create the RAM role, you must select a supported Alibaba Cloud service as the trusted service, because Message Queue for Apache Kafka cannot be directly selected as the trusted service. Therefore, select Function Compute as the trusted service when you create the RAM role, and then modify the trust policy of the created RAM role so that Message Queue for Apache Kafka can assume it.

  1. In the left-side navigation pane, choose Identities > Roles.
  2. On the Roles page, click Create Role.
  3. In the Create Role panel, create a RAM role.
    1. Set the Select Trusted Entity parameter to Alibaba Cloud Service and click Next.
    2. Set the Role Type parameter to Normal Service Role. In the RAM Role Name field, enter AliyunKafkaConnectorRole. From the Select Trusted Service drop-down list, select Function Compute. Then, click OK.
  4. On the Roles page, find and click AliyunKafkaConnectorRole.
  5. On the AliyunKafkaConnectorRole page, click the Trust Policy Management tab. On the tab that appears, click Edit Trust Policy.
  6. In the Edit Trust Policy panel, replace fc in the script with alikafka and click OK.
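    After you replace fc with alikafka, the trust policy should look similar to the following example. This sample is provided for reference only; verify it against the actual policy document in the console:

    {
        "Statement": [
            {
                "Action": "sts:AssumeRole",
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "alikafka.aliyuncs.com"
                    ]
                }
            }
        ],
        "Version": "1"
    }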

Grant permissions to the RAM role

You can grant the created RAM role the permissions to access Function Compute by using the Alibaba Cloud account within which you want to use Function Compute.

  1. In the left-side navigation pane, choose Identities > Roles.
  2. On the Roles page, find AliyunKafkaConnectorRole and click Add Permissions in the Actions column.
  3. In the Add Permissions panel, attach the KafkaConnectorFcAccess policy to the RAM role.
    1. In the Select Policy section, click Custom Policy.
    2. In the Authorization Policy Name column, find and click KafkaConnectorFcAccess.
    3. Click OK.
    4. Click Complete.

Create the topics that are required by a Function Compute sink connector

In the Message Queue for Apache Kafka console, you can manually create the five topics that are required by a Function Compute sink connector. The five topics are the offset storage topic, task configuration topic, task status topic, dead-letter queue topic, and error data topic. The five topics differ in the partition count and storage engine. For more information, see Table 1.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
    Notice You must create a topic in the region where your application resides. This means that you must select the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if a topic is created in the China (Beijing) region, the message producer and consumer must run on ECS instances in the China (Beijing) region.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Topics.
  5. On the Topics page, click Create Topic.
  6. In the Create Topic panel, set the properties of the topic and click OK.
    The following list describes the parameters of a topic:
    • Name: The name of the topic. Example: demo.
    • Description: The description of the topic. Example: demo test.
    • Partitions: The number of partitions in the topic. Example: 12.
    • Storage Engine: The storage engine of the topic. Example: Cloud Storage.

      Message Queue for Apache Kafka supports the following storage engines:

      • Cloud Storage: If this option is selected, disks provided by Alibaba Cloud are used and three replicas are stored in distributed mode. This type of storage engine features low latency, high performance, persistence, and high durability. If the disk type of your instance is Standard (High Write), you can select only Cloud Storage.
      • Local Storage: If this option is selected, the in-sync replicas (ISR) algorithm of open source Apache Kafka is used and three replicas are stored in distributed mode.
    • Message Type: The message type of the topic. Example: Normal Message.
      • Normal Message: By default, messages that have the same key are stored in the same partition in the order in which they are sent. If a broker in the cluster fails, the messages may be out of order. If you set the Storage Engine parameter to Cloud Storage, this parameter is automatically set to Normal Message.
      • Partitionally Ordered Message: By default, messages that have the same key are stored in the same partition in the order in which they are sent. If a broker in the cluster fails, the messages are still stored in the same partition in the order in which they are sent, but specific messages in the partition cannot be sent until the partition is restored. If you set the Storage Engine parameter to Local Storage, this parameter is automatically set to Partitionally Ordered Message.
    • Log Cleanup Policy: The log cleanup policy for the topic. Example: Compact.

      If you set the Storage Engine parameter to Local Storage, you must set the Log Cleanup Policy parameter.

      Message Queue for Apache Kafka supports the following log cleanup policies:

      • Delete: The default log cleanup policy is used. If the remaining disk space is sufficient, messages are retained for the maximum retention period. If disk usage exceeds 85%, the disk space is considered insufficient, and the earliest messages are deleted to ensure service availability.
      • Compact: The Apache Kafka log compaction policy is used. If different messages have the same key, only the message that has the latest value for the key is retained. This policy applies to scenarios in which the system is recovered from a failure, or the cache is reloaded after a system restart. For example, when you use Kafka Connect or Confluent Schema Registry, you must store the system status information or configuration information in a log-compacted topic.
        Notice Log-compacted topics are generally used only in specific ecosystem components, such as Kafka Connect or Confluent Schema Registry. Do not use this log cleanup policy for a topic that is used to send and subscribe to messages in other components. For more information, see Message Queue for Apache Kafka demos.
    • Tag: The tags to be attached to the topic. Example: demo.
    After the topic is created, it is displayed on the Topics page.
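
    If you need to create these topics repeatedly, you can also script the creation by calling the CreateTopic operation of the Message Queue for Apache Kafka OpenAPI. The following Python sketch assumes the aliyun-python-sdk-core and aliyun-python-sdk-alikafka packages and their v20190916 request classes; the instance ID, region, and AccessKey pair are placeholders. Verify the parameter names against the CreateTopic API reference before you use this sketch:

    # Sketch: create a compacted, local-storage topic through the
    # Message Queue for Apache Kafka OpenAPI. Assumes the
    # aliyun-python-sdk-core and aliyun-python-sdk-alikafka packages.
    # All IDs and credentials below are placeholders.
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkalikafka.request.v20190916.CreateTopicRequest import CreateTopicRequest

    client = AcsClient('<yourAccessKeyId>', '<yourAccessKeySecret>', 'cn-hangzhou')
    request = CreateTopicRequest()
    request.set_InstanceId('alikafka_post-cn-st21p8vj****')
    request.set_Topic('connect-offset-kafka-fc-sink')
    request.set_Remark('offset storage topic for the FC sink connector')
    request.set_PartitionNum('12')
    request.set_LocalTopic(True)    # use the local storage engine
    request.set_CompactTopic(True)  # set cleanup.policy to compact
    print(client.do_action_with_exception(request))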

Create the group that is required by a Function Compute sink connector

In the Message Queue for Apache Kafka console, you can manually create the consumer group that is required by a Function Compute sink connector. The name of the consumer group must be in the connect-Task name format. For more information, see Table 1.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Groups.
  5. On the Groups page, click Create Group.
  6. In the Create Group panel, enter the group name in the Group ID field and the group description in the Description field, attach tags to the group, and then click OK.
    After the group is created, it is displayed on the Groups page.

Create and deploy a Function Compute sink connector

You can create and deploy a Function Compute sink connector that synchronizes data from Message Queue for Apache Kafka to Function Compute.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Connectors.
  5. On the Connectors page, click Create Connector.
  6. Complete the Create Connector wizard.
    1. In the Configure Basic Information step, set the parameters that are described in the following table and click Next.
      • Name: The name of the connector. Example: kafka-fc-sink. Take note of the following rules when you specify a connector name:
        • The connector name must be 1 to 48 characters in length. It can contain digits, lowercase letters, and hyphens (-), but cannot start with a hyphen (-).
        • Each connector name must be unique within a Message Queue for Apache Kafka instance.

        The data synchronization task of the connector must use a consumer group that is named in the connect-Task name format. If you have not created such a consumer group, Message Queue for Apache Kafka automatically creates one for you.

      • Instance: The information about the Message Queue for Apache Kafka instance. By default, the name and ID of the instance are displayed. Example: demo alikafka_post-cn-st21p8vj****.
    2. In the Configure Source Service step, select Message Queue for Apache Kafka as the source service, set the parameters that are described in the following table, and then click Next.
      Note If you have created the required topics and consumer group, set the Resource Creation Method parameter to Manual and enter the names of the created resources in the fields below. Otherwise, set the Resource Creation Method parameter to Auto.
      Table 1. Parameters in the Configure Source Service step
      • Data Source Topic: The name of the data source topic from which data is synchronized. Example: fc-test-input.
      • Consumer Thread Concurrency: The number of concurrent consumer threads that are used to synchronize data from the data source topic. Default value: 6. Valid values: 6 and 12. Example: 6.
      • Consumer Offset: The offset from which consumption starts. Example: Earliest Offset. Valid values:
        • Earliest Offset: Consumption starts from the earliest offset.
        • Latest Offset: Consumption starts from the latest offset.
      • VPC ID: The ID of the virtual private cloud (VPC) where the data synchronization task runs. Click Configure Runtime Environment to display this parameter. The default value is the VPC ID that you specified when you deployed the Message Queue for Apache Kafka instance. You do not need to change the value. Example: vpc-bp1xpdnd3l***.
      • vSwitch ID: The ID of the vSwitch where the data synchronization task runs. Click Configure Runtime Environment to display this parameter. The vSwitch must be deployed in the same VPC as the Message Queue for Apache Kafka instance. The default value is the vSwitch ID that you specified when you deployed the Message Queue for Apache Kafka instance. Example: vsw-bp1d2jgg81***.
      • Failure Handling Policy: Specifies whether to retain the subscription to the partition in which an error occurs after a message fails to be sent. Click Configure Runtime Environment to display this parameter. Example: Continue Subscription. Valid values:
        • Continue Subscription: retains the subscription to the partition in which the error occurs and returns the logs.
        • Stop Subscription: stops the subscription to the partition in which the error occurs and returns the logs.
        Note
        • For more information about how to view the connector logs, see View connector logs.
        • For more information about how to troubleshoot errors based on error codes, see Error codes.
        • To resume the subscription to a partition in which an error occurs, submit a ticket to the technical support of Message Queue for Apache Kafka.
      • Resource Creation Method: The method that is used to create the topics and consumer group required by the Function Compute sink connector. Click Configure Runtime Environment to display this parameter. Valid values: Auto and Manual. Example: Auto.
      • Connector Consumer Group: The consumer group that is used by the data synchronization task of the connector. Click Configure Runtime Environment to display this parameter. The name of this consumer group must be in the connect-Task name format. Example: connect-kafka-fc-sink.
      • Task Offset Topic: The topic that is used to store consumer offsets. Click Configure Runtime Environment to display this parameter. Example: connect-offset-kafka-fc-sink.
        • Topic: We recommend that you start the topic name with connect-offset.
        • Partitions: The number of partitions in the topic must be greater than 1.
        • Storage Engine: The storage engine of the topic must be set to Local Storage.
        • cleanup.policy: The log cleanup policy for the topic must be set to compact.
      • Task Configuration Topic: The topic that is used to store task configurations. Click Configure Runtime Environment to display this parameter. Example: connect-config-kafka-fc-sink.
        • Topic: We recommend that you start the topic name with connect-config.
        • Partitions: The topic can contain only one partition.
        • Storage Engine: The storage engine of the topic must be set to Local Storage.
        • cleanup.policy: The log cleanup policy for the topic must be set to compact.
      • Task Status Topic: The topic that is used to store the task status. Click Configure Runtime Environment to display this parameter. Example: connect-status-kafka-fc-sink.
        • Topic: We recommend that you start the topic name with connect-status.
        • Partitions: We recommend that you set the number of partitions in the topic to 6.
        • Storage Engine: The storage engine of the topic must be set to Local Storage.
        • cleanup.policy: The log cleanup policy for the topic must be set to compact.
      • Dead-letter Queue Topic: The topic that is used to store the error data of the connector framework. Click Configure Runtime Environment to display this parameter. To save topic resources, you can create one topic that serves as both the dead-letter queue topic and the error data topic. Example: connect-error-kafka-fc-sink.
        • Topic: We recommend that you start the topic name with connect-error.
        • Partitions: We recommend that you set the number of partitions in the topic to 6.
        • Storage Engine: The storage engine of the topic can be set to Local Storage or Cloud Storage.
      • Error Data Topic: The topic that is used to store the error data of the connector. Click Configure Runtime Environment to display this parameter. To save topic resources, you can create one topic that serves as both the dead-letter queue topic and the error data topic. Example: connect-error-kafka-fc-sink.
        • Topic: We recommend that you start the topic name with connect-error.
        • Partitions: We recommend that you set the number of partitions in the topic to 6.
        • Storage Engine: The storage engine of the topic can be set to Local Storage or Cloud Storage.
    3. In the Configure Destination Service step, select Function Compute as the destination service, set the parameters that are described in the following table, and then click Create.
      • Cross-account/Cross-region: Specifies whether the Function Compute sink connector synchronizes data to Function Compute across Alibaba Cloud accounts or regions. Default value: No. Example: No. Valid values:
        • No: The Function Compute sink connector synchronizes data to Function Compute in the same region by using the same Alibaba Cloud account.
        • Yes: The Function Compute sink connector synchronizes data to Function Compute across regions by using the same Alibaba Cloud account, in the same region by using different Alibaba Cloud accounts, or across regions by using different Alibaba Cloud accounts.
      • Region: The region where Function Compute is activated. By default, the region where the Function Compute sink connector resides is selected. To synchronize data across regions, enable Internet access for the connector and select the destination region. For more information, see Enable Internet access for Function Compute sink connectors. Example: cn-hangzhou.
        Notice The Region parameter is displayed only if you set the Cross-account/Cross-region parameter to Yes.
      • Service Endpoint: The endpoint of Function Compute. In the Function Compute console, you can view the endpoints of Function Compute in the Common Info section of the Overview page. Example: http://188***.cn-hangzhou.fc.aliyuncs.com.
        • Internal endpoint: We recommend that you use the internal endpoint because of its lower latency. The internal endpoint can be used if the Message Queue for Apache Kafka instance and Function Compute are in the same region.
        • Public endpoint: We recommend that you do not use the public endpoint because of its higher latency. The public endpoint can be used if the Message Queue for Apache Kafka instance and Function Compute are in different regions. To use the public endpoint, you must enable Internet access for the connector. For more information, see Enable Internet access for Function Compute sink connectors.
        Notice The Service Endpoint parameter is displayed only if you set the Cross-account/Cross-region parameter to Yes.
      • Alibaba Cloud Account: The ID of the Alibaba Cloud account that is used to log on to Function Compute. In the Function Compute console, you can view the ID of the Alibaba Cloud account in the Common Info section of the Overview page. Example: 188***.
        Notice The Alibaba Cloud Account parameter is displayed only if you set the Cross-account/Cross-region parameter to Yes.
      • RAM Role Name: The name of the RAM role that Message Queue for Apache Kafka assumes to access Function Compute. Example: AliyunKafkaConnectorRole.
        Notice The RAM Role Name parameter is displayed only if you set the Cross-account/Cross-region parameter to Yes.
      • Service Name: The name of the service in Function Compute. Example: guide-hello_world.
      • Function Name: The name of the function in the service in Function Compute. Example: hello_world.
      • Version or Alias: The version or alias of the service in Function Compute. Example: LATEST.
        Notice
        • You must set this parameter to Specified Version or Specified Alias if you set the Cross-account/Cross-region parameter to No.
        • You must manually specify a service version or a service alias if you set the Cross-account/Cross-region parameter to Yes.
      • Service Version: The version of the service in Function Compute. Example: LATEST.
        Notice The Service Version parameter is displayed only if you set the Cross-account/Cross-region parameter to No and the Version or Alias parameter to Specified Version.
      • Service Alias: The alias of the service in Function Compute. Example: jy.
        Notice The Service Alias parameter is displayed only if you set the Cross-account/Cross-region parameter to No and the Version or Alias parameter to Specified Alias.
      • Transmission Mode: The mode in which messages are sent. Example: Asynchronous. Valid values:
        • Asynchronous: recommended.
        • Synchronous: not recommended. In synchronous mode, if Function Compute takes a long time to process messages, Message Queue for Apache Kafka must wait for the same amount of time. If Function Compute takes more than 5 minutes to process a batch of messages, the Message Queue for Apache Kafka client rebalances the traffic.
      • Data Size: The maximum number of messages that can be sent at a time. Default value: 20. Example: 50. The connector aggregates the messages to be sent at a time based on the maximum number of messages and the maximum message size allowed in a request. The message size cannot exceed 6 MB in synchronous mode and 128 KB in asynchronous mode. For example, assume that messages are sent in asynchronous mode and up to 20 messages can be sent at a time. You want to send 18 messages, among which 17 messages have a total size of 127 KB and one message is 200 KB in size. In this case, the connector aggregates the 17 messages into a single batch and sends this batch first, and then separately sends the remaining message whose size exceeds 128 KB. A handler that parses this payload is sketched after this procedure.
        Note If you set the key parameter to null when you send a message, the request does not contain the key parameter. If you set the value parameter to null, the request does not contain the value parameter.
        • If the size of the messages in a batch does not exceed the maximum message size allowed in a request, the request contains the full content of the messages. The following code provides a sample request:
        [
            {
                "key":"this is the message's key2",
                "offset":8,
                "overflowFlag":false,
                "partition":4,
                "timestamp":1603785325438,
                "topic":"Test",
                "value":"this is the message's value2",
                "valueSize":28
            },
            {
                "key":"this is the message's key9",
                "offset":9,
                "overflowFlag":false,
                "partition":4,
                "timestamp":1603785325440,
                "topic":"Test",
                "value":"this is the message's value9",
                "valueSize":28
            },
            {
                "key":"this is the message's key12",
                "offset":10,
                "overflowFlag":false,
                "partition":4,
                "timestamp":1603785325442,
                "topic":"Test",
                "value":"this is the message's value12",
                "valueSize":29
            },
            {
                "key":"this is the message's key38",
                "offset":11,
                "overflowFlag":false,
                "partition":4,
                "timestamp":1603785325464,
                "topic":"Test",
                "value":"this is the message's value38",
                "valueSize":29
            }
        ]
        • If the size of a single message exceeds the maximum message size allowed in a request, the request does not contain the content of the message. The following code provides a sample request:
        [
            {
                "key":"123",
                "offset":4,
                "overflowFlag":true,
                "partition":0,
                "timestamp":1603779578478,
                "topic":"Test",
                "value":"1",
                "valueSize":272687
            }
        ]
          Note To obtain the content of the message, you must pull the message from the topic based on its offset.
      • Retries: The maximum number of retries allowed after a message fails to be sent. Default value: 2. Valid values: 1 to 3. Example: 2. In specific cases, retries are not supported after a message fails to be sent. The following list describes the types of error codes and whether retries are supported. For more information, see Error codes.
        • 4XX: Retries are not supported, except when error code 429 is returned.
        • 5XX: Retries are supported.
        Note
        • The connector calls the InvokeFunction operation to send messages to Function Compute.
        • If a message still fails to be sent to Function Compute after all allowed retries, the message is sent to the dead-letter queue topic. Messages in the dead-letter queue topic cannot trigger the data synchronization task of the connector. We recommend that you set an alert rule for the topic to monitor the topic resources in real time. This way, you can troubleshoot issues at the earliest opportunity.
      After the connector is created, you can view it on the Connectors page.
  7. Go to the Connectors page, find the connector that you created, and click Deploy in the Actions column.
    To configure Function Compute resources, choose More > Configure Function in the Actions column to go to the Function Compute console and complete the operation.
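
The connector delivers each request to the function as a JSON array of records, as shown in the sample requests for the Data Size parameter. The hello_world function can therefore be extended to parse the batch. The following Python sketch is one possible implementation, not the only valid one; the record fields match the sample requests above, and oversized records are detected through the overflowFlag field:

    # -*- coding: utf-8 -*-
    # Sketch: parse the batched JSON payload that the Function Compute
    # sink connector delivers. The record fields (key, value, offset,
    # partition, topic, overflowFlag) match the sample requests above.
    import json
    import logging

    def handler(event, context):
        logger = logging.getLogger()
        records = json.loads(bytes.decode(event))
        for record in records:
            if record.get('overflowFlag'):
                # The record exceeded the maximum request size, so its
                # content is omitted. Pull the message by offset instead.
                logger.warning('oversized record: %s[%d] offset %d',
                               record['topic'], record['partition'],
                               record['offset'])
                continue
            logger.info('key=%s value=%s', record.get('key'), record.get('value'))
        return 'processed {} records'.format(len(records))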

Send a test message

After you deploy the Function Compute sink connector, you can send a message to the data source topic in Message Queue for Apache Kafka to test whether the message can be synchronized to Function Compute.

  1. On the Connectors page, find the connector that you created, and click Test in the Actions column.
  2. In the Send Message panel, send a test message by using one of the following methods.
    • Set the Method of Sending parameter to Console.
      1. In the Message Key field, enter the key of the test message, such as demo.
      2. In the Message Content field, enter the content of the test message, such as {"key": "test"}.
      3. Set the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.
        • If you want to send the test message to a specific partition, click Yes and enter the partition ID, such as 0, in the Partition ID field. For more information about how to query partition IDs, see View partition status.
        • If you do not want to send the test message to a specific partition, click No.
    • Set the Method of Sending parameter to Docker and run the Docker commands provided in the Run the Docker container to produce a sample message section to send the test message.
    • Set the Method of Sending parameter to SDK, select a programming language or a framework, and then select an access method to use the corresponding SDK to send the test message.
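
For reference, a minimal producer based on the open source kafka-python library is sketched below. This is only a sketch, not the official SDK sample: the bootstrap server is a placeholder for the default endpoint of your instance, and the sketch assumes plaintext access from an ECS instance in the same VPC. Access over the Internet additionally requires SASL_SSL settings.

    # Sketch: send a test message to the data source topic by using the
    # open source kafka-python library. The bootstrap server below is a
    # placeholder; replace it with the default endpoint of your instance.
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=['alikafka-pre-cn-xxxx-1-vpc.alikafka.aliyuncs.com:9092'])
    future = producer.send(
        'fc-test-input',            # the data source topic
        key=b'demo',                # the message key
        value=b'{"key": "test"}',   # the message content
    )
    future.get(timeout=10)  # block until the broker acknowledges the send
    producer.close()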

View function logs

After you send a message to the data source topic in Message Queue for Apache Kafka, you can view the function logs to check whether the message is received. For more information, see Configure logging.

The test message that you sent appears in the logs.
