This topic describes how to create a Function Compute sink connector to synchronize data from a source topic in your Message Queue for Apache Kafka instance to a function in Function Compute.
Prerequisites
- Message Queue for Apache Kafka
- The connector feature is enabled for the Message Queue for Apache Kafka instance. For more information, see Enable the connector feature.
- A topic is created in the Message Queue for Apache Kafka instance. For more information, see Step 1: Create a topic.
In this example, the topic is named fc-test-input.
- Function Compute
- A function is created in Function Compute. For more information, see Create a function in the Function Compute console.
Important The function that you create must be an event function.
In this example, an event function named hello_world is created in the guide-hello_world service. The function runs in the Python runtime environment. Function code:
# -*- coding: utf-8 -*-
import logging

# To enable the initializer feature,
# implement the initializer function as follows:
# def initializer(context):
#     logger = logging.getLogger()
#     logger.info('initializing')

def handler(event, context):
    logger = logging.getLogger()
    logger.info('hello world:' + bytes.decode(event))
    return 'hello world:' + bytes.decode(event)
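The handler receives the Kafka message value as bytes. As a quick local sanity check of the decoding logic (outside the Function Compute runtime, which normally supplies the event and context arguments), the handler can be invoked directly:

```python
# Local sanity check of the handler logic above; in production the
# Function Compute runtime supplies event (message value as bytes) and context.
import logging

def handler(event, context):
    logger = logging.getLogger()
    logger.info('hello world:' + bytes.decode(event))
    return 'hello world:' + bytes.decode(event)

# Simulate the connector delivering a test message value:
result = handler(b'{"key": "test"}', None)
print(result)  # hello world:{"key": "test"}
```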
- Optional: EventBridge
- EventBridge is activated. For more information about how to activate EventBridge, see Activate EventBridge and grant permissions to a RAM user.
Note You need to activate EventBridge only if the instance to which the Function Compute sink connector belongs is in the China (Hangzhou) or China (Chengdu) region.
Usage notes
- You can synchronize data from a source topic in a Message Queue for Apache Kafka instance to a function in Function Compute only within the same region. For more information about the limits on connectors, see Limits.
- If the instance to which the Function Compute sink connector belongs is in the China (Hangzhou) or China (Chengdu) region, the connector is deployed to EventBridge.
- You can use EventBridge free of charge. For more information, see Billing.
- When you create a connector, EventBridge automatically creates the following service-linked roles: AliyunServiceRoleForEventBridgeSourceKafka and AliyunServiceRoleForEventBridgeConnectVPC.
- If these service-linked roles are unavailable, EventBridge automatically creates them so that EventBridge can use these roles to access Message Queue for Apache Kafka and Virtual Private Cloud (VPC).
- If these service-linked roles are available, EventBridge does not create them again.
- You cannot view the operation logs of tasks that are deployed to EventBridge. After a connector is run, you can check the progress of the synchronization task by viewing the consumption details of the consumer group that subscribes to the source topic. For more information, see View consumption details.
Procedure
To use a Function Compute sink connector to synchronize data from a source topic in a Message Queue for Apache Kafka instance to a function in Function Compute, perform the following steps:
- Optional: Allow Function Compute sink connectors to access Function Compute across regions.
Important If you do not need to use Function Compute sink connectors to access Function Compute across regions, skip this step.
- Optional: Allow Function Compute sink connectors to access Function Compute across Alibaba Cloud accounts.
Important If you do not need to use Function Compute sink connectors to access Function Compute across Alibaba Cloud accounts, skip this step.
- Optional: Create the topics and consumer group that are required by a Function Compute sink connector.
Important
- If you do not need to customize the names of the topics and consumer group, skip this step.
- Some of the topics that are required by a Function Compute sink connector must use the local storage engine. If the major version of your Message Queue for Apache Kafka instance is V0.10.2, you cannot manually create topics that use the local storage engine. In this case, such topics must be automatically created.
- Create and deploy a Function Compute sink connector.
- Verify the result.
Enable Internet access for Function Compute sink connectors
If you need to use Function Compute sink connectors to access other Alibaba Cloud services across regions, enable Internet access for Function Compute sink connectors. For more information, see Enable Internet access for a connector.
Create a custom policy
Create a custom policy to grant access to Function Compute within the Alibaba Cloud account to which you want to synchronize data.
- Log on to the RAM console.
- In the left-side navigation pane, choose Permissions > Policies.
- On the Policies page, click Create Policy.
- On the Create Policy page, create a custom policy.
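The following JSON is a minimal sketch of such a custom policy. It assumes the connector only needs to invoke and read functions; scope the Resource element down to specific services or functions as needed for your environment:

```json
{
    "Version": "1",
    "Statement": [
        {
            "Action": [
                "fc:InvokeFunction",
                "fc:GetFunction"
            ],
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}
```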
Create a RAM role
Create a RAM role within the Alibaba Cloud account to which you want to synchronize data. You cannot select Message Queue for Apache Kafka as the trusted service when you create a RAM role. Therefore, select a supported service as the trusted service first. Then, modify the trust policy of the created RAM role.
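For example, the trust policy of the created RAM role can be modified along the following lines so that Message Queue for Apache Kafka can assume the role. This is a sketch; verify the service principal name against your RAM console before use:

```json
{
    "Statement": [
        {
            "Action": "sts:AssumeRole",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "alikafka.aliyuncs.com"
                ]
            }
        }
    ],
    "Version": "1"
}
```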
Grant permissions to the RAM role
Grant the created RAM role the permissions to access Function Compute within the Alibaba Cloud account to which you want to synchronize data.
- In the left-side navigation pane, click RAM Roles.
- On the Roles page, find AliyunKafkaConnectorRole and click Add Permissions in the Actions column.
- In the Add Permissions panel, attach the KafkaConnectorFcAccess policy to the RAM role.
- In the Select Policy section, click Custom Policy.
- In the Authorization Policy Name column, find and click KafkaConnectorFcAccess.
- Click OK.
- Click Complete.
Create the topics that are required by a Function Compute sink connector
In the Message Queue for Apache Kafka console, create the following topics that are required by a Function Compute sink connector: task offset topic, task configuration topic, task status topic, dead-letter queue topic, and error data topic. These topics differ in the partition count and storage engine. For more information, see Parameters in the Configure Source Service step.
Create the consumer group that is required by a Function Compute sink connector
In the Message Queue for Apache Kafka console, create the consumer group that is required by a Function Compute sink connector. The name of the consumer group must be in the connect-{Task name} format. For more information, see Parameters in the Configure Source Service step.
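The naming rule above can be expressed as a small helper. This is a hypothetical illustration of the format, not part of any SDK:

```python
# Hypothetical helper: derive the consumer group name that a Function
# Compute sink connector expects for a given task name.
def connector_consumer_group(task_name: str) -> str:
    return 'connect-' + task_name

print(connector_consumer_group('fc-test'))  # connect-fc-test
```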
Create and deploy a Function Compute sink connector
Create and deploy a Function Compute sink connector that synchronizes data from Message Queue for Apache Kafka to Function Compute.
Send a test message
After you deploy the Function Compute sink connector, you can send a message to the source topic in the Message Queue for Apache Kafka instance to test whether the message can be synchronized to Function Compute.
- On the Connectors page, find the connector that you want to use and click Test in the Actions column.
- In the Send Message panel, configure the required parameters to send a test message.
- Set the Method of Sending parameter to Console.
- In the Message Key field, enter the key of the message. For example, you can enter demo as the key of the message.
- In the Message Content field, enter the content of the message. For example, you can enter {"key": "test"} as the content of the message.
- Configure the Send to Specified Partition parameter to specify whether to send the message to a specific partition.
- If you want to send the message to a specific partition, click Yes and enter the partition ID in the Partition ID field. For example, you can enter 0 as the partition ID. For information about how to query partition IDs, see View partition status.
- If you do not want to send the message to a specific partition, click No.
- Set the Method of Sending parameter to Docker and run the docker commands that are provided in the Run the Docker container to produce a sample message section to send a test message.
- Set the Method of Sending parameter to SDK and click the link to the topic that describes how to obtain and use the SDK that you want to use. Then, use the SDK to send and consume a test message. Message Queue for Apache Kafka provides topics that describe how to use SDKs for different programming languages based on different connection types.
View function logs
After you send a message to the source topic in the Message Queue for Apache Kafka instance, you can view the function logs to check whether the message is received. For more information, see Configure the logging feature.
If the test message that you sent appears in the function logs, the data synchronization task is successful. For example, if you sent {"key": "test"}, the logs contain hello world:{"key": "test"}.
