This topic describes how to create a Function Compute sink connector to synchronize data from a topic in your Message Queue for Apache Kafka instance to a function in Function Compute.
Prerequisites
- The connector feature is enabled for the Message Queue for Apache Kafka instance. For more information, see Enable the connector feature.
- A data source topic is created in the Message Queue for Apache Kafka instance. For more information, see Step 1: Create a topic.
A topic named fc-test-input is used in this example.
- A function is created in Function Compute. For more information, see Create a function in the Function Compute console.
Notice The function that you create must be an event function.
An event function named hello_world is used in this example. The function belongs to the guide-hello_world service and runs in the Python runtime environment. The following sample code shows the function:
# -*- coding: utf-8 -*-
import logging

# To enable the initializer feature,
# please implement the initializer function as below:
# def initializer(context):
#     logger = logging.getLogger()
#     logger.info('initializing')

def handler(event, context):
    logger = logging.getLogger()
    logger.info('hello world:' + bytes.decode(event))
    return 'hello world:' + bytes.decode(event)
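The connector passes the Kafka message to the function as the event parameter. As a quick local sanity check of the decoding logic, you can call the handler directly. The following is a minimal sketch: the handler module name and the DummyContext stand-in are assumptions for this example, because Function Compute supplies the real context object at runtime.

# Local sanity check for the sample function above.
# Assumes the sample code is saved as handler.py (hypothetical module name).
import logging

logging.basicConfig(level=logging.INFO)

from handler import handler

class DummyContext:
    # Placeholder for the context object that Function Compute passes in.
    pass

print(handler(b'{"key": "test"}', DummyContext()))
# Expected output: hello world:{"key": "test"}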
Procedure
To synchronize data from a topic in your Message Queue for Apache Kafka instance to a function in Function Compute by using a Function Compute sink connector, perform the following steps:
- Optional: Enable Internet access for Function Compute sink connectors.
Notice If you do not want Function Compute sink connectors to access Function Compute across regions, skip this step.
- Optional: Enable the Function Compute sink connector to access Function Compute across Alibaba Cloud accounts.
Notice If you do not want Function Compute sink connectors to access Function Compute across Alibaba Cloud accounts, skip this step.
- Optional: Create a custom policy
- Optional: Create a RAM role
- Optional: Grant permissions
- Optional: Create the topics and consumer groups that the Function Compute sink connector requires.
Notice
- If you do not want to customize the names of the topics and consumer groups, skip this step.
- Some topics that a Function Compute sink connector requires must use a local storage engine. If the major version of your Message Queue for Apache Kafka instance is 0.10.2, you cannot manually create topics that use a local storage engine; in this case, these topics must be automatically created.
- Create and deploy a Function Compute sink connector.
- Verify the result.
Enable Internet access for Function Compute sink connectors
If you want Function Compute sink connectors to access other Alibaba Cloud services across regions, enable Internet access for the Function Compute sink connectors. For more information, see Enable Internet access for a connector.
Create a custom policy
You can use the Alibaba Cloud account within which you want to use Function Compute to create a custom policy that grants access to Function Compute.
- Log on to the Resource Access Management (RAM) console.
- In the left-side navigation pane, choose Permissions > Policies.
- On the Policies page, click Create Policy.
- On the Create Custom Policy page, create a custom policy. A sample policy script is provided after these steps.
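The following script is a minimal sketch of such a policy. It assumes that the connector only needs to query and invoke functions; adjust the actions and resources to your own security requirements.

{
    "Version": "1",
    "Statement": [
        {
            "Action": [
                "fc:InvokeFunction",
                "fc:GetFunction"
            ],
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}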
Create a RAM role
You can create a RAM role by using the Alibaba Cloud account within which you want to use Function Compute. Because Message Queue for Apache Kafka cannot be selected as the trusted service of a RAM role, select any supported Alibaba Cloud service as the trusted service when you create the role. After the role is created, modify its trust policy.
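For reference, a trust policy that allows Message Queue for Apache Kafka to assume the role might look like the following sketch. The alikafka.aliyuncs.com service principal is an assumption in this example; verify the exact principal in the RAM documentation.

{
    "Statement": [
        {
            "Action": "sts:AssumeRole",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "alikafka.aliyuncs.com"
                ]
            }
        }
    ],
    "Version": "1"
}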
Grant permissions
You can use the Alibaba Cloud account within which you want to use Function Compute to grant the created RAM role the permissions to access Function Compute.
- In the left-side navigation pane, click RAM Roles.
- On the RAM Roles page, find AliyunKafkaConnectorRole and click Add Permissions in the Actions column.
- In the Add Permissions panel, attach the KafkaConnectorFcAccess policy.
- In the Select Policy section, click Custom Policy.
- In the Authorization Policy Name column, find and click KafkaConnectorFcAccess.
- Click OK.
- Click Complete.
Create topics that are required by a Function Compute sink connector
In the Message Queue for Apache Kafka console, you can manually create the five topics that a Function Compute sink connector requires: the offset storage topic, the task configuration topic, the task status topic, the dead-letter queue topic, and the error data topic. The five topics differ in partition count and storage engine. For more information, see Table 1.
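For illustration only, assuming a connector task named fc-test, the topic names might follow a scheme such as the one below. Treat the names, partition counts, and storage engines in Table 1 as authoritative.

connect-offset-fc-test    (offset storage topic)
connect-config-fc-test    (task configuration topic)
connect-status-fc-test    (task status topic)
connect-error-fc-test     (dead-letter queue topic, which can also serve as the error data topic)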
Create consumer groups that are required by a Function Compute sink connector
In the Message Queue for Apache Kafka console, you can manually create the consumer group that a Function Compute sink connector requires. The name of the consumer group must be in the connect-Task name format. For example, if the task is named fc-test, name the consumer group connect-fc-test. For more information, see Table 1.
Create and deploy a Function Compute sink connector
You can create and deploy a Function Compute sink connector that synchronizes data from Message Queue for Apache Kafka to Function Compute.
Send a test message
After you deploy the Function Compute sink connector, you can send a message to the data source topic in Message Queue for Apache Kafka to test whether the message can be synchronized to Function Compute.
- On the Connectors page, find the connector that you created, and click Test in the Actions column.
- In the Send Message panel, set the parameters or follow the on-screen instructions to send a test message.
- Set the Method of Sending parameter to Console.
- In the Message Key field, enter the key of the test message, such as demo.
- In the Message Content field, enter the content of the test message, such as {"key": "test"}.
- Set the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.
- If you want to send the test message to a specific partition, click Yes and enter the partition ID in the Partition ID field, such as 0. For more information about how to query partition IDs, see View partition status.
- If you do not want to send the test message to a specific partition, click No.
- Set the Method of Sending parameter to Docker and run the docker commands provided in the Run the Docker container to produce a sample message section to send the test message.
- Set the Method of Sending parameter to SDK, select a programming language or a framework, and then select an access method to use the corresponding SDK to send the test message, as shown in the sketch after these steps.
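If you use the SDK method, the following minimal Python sketch sends the same test message. The kafka-python client, the placeholder endpoint, and plaintext access from a VPC are assumptions for this example; replace them with the client library and access method that you selected.

from kafka import KafkaProducer  # pip install kafka-python

# Replace the placeholder with the endpoint of your instance.
producer = KafkaProducer(bootstrap_servers='alikafka-pre-cn-xxx-1-vpc.alikafka.aliyuncs.com:9092')

# Send the same key and content that the console example uses.
future = producer.send('fc-test-input', key=b'demo', value=b'{"key": "test"}')
future.get(timeout=10)  # block until the broker acknowledges the message
producer.close()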
View function logs
After you send a message to the data source topic in Message Queue for Apache Kafka, you can view the function logs to check whether the message is received. For more information, see Configure Log Service resources and view function execution logs.
The test message that you sent appears in the logs. For example, if you sent {"key": "test"}, the log contains hello world:{"key": "test"}.
