This topic describes how to create a Function Compute sink connector to synchronize data from a source topic in your ApsaraMQ for Kafka instance to a function in Function Compute.
Prerequisites
The following requirements are met:
- ApsaraMQ for Kafka
- The connector feature is enabled for the ApsaraMQ for Kafka instance. For more information, see Enable the connector feature.
- A topic is created in the ApsaraMQ for Kafka instance. For more information, see Step 1: Create a topic.
In this example, the topic is named fc-test-input.
- Function Compute
- A function is created in Function Compute. For more information, see Create a function in the Function Compute console.
Important The function that you create must be an event function.
In this example, an event function named hello_world is created for the guide-hello_world service that runs in the Python runtime environment. Function code:
```python
# -*- coding: utf-8 -*-
import logging

# To enable the initializer feature,
# please implement the initializer function as below:
# def initializer(context):
#     logger = logging.getLogger()
#     logger.info('initializing')

def handler(event, context):
    logger = logging.getLogger()
    logger.info('hello world:' + bytes.decode(event))
    return 'hello world:' + bytes.decode(event)
```
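Because the handler only decodes the event bytes and builds a string, you can smoke-test its logic locally before you deploy it. The following is a minimal sketch that assumes the event arrives as UTF-8 bytes:

```python
import logging

# Same logic as the hello_world handler above.
def handler(event, context):
    logger = logging.getLogger()
    logger.info('hello world:' + bytes.decode(event))
    return 'hello world:' + bytes.decode(event)

# Simulate the bytes that Function Compute passes in as the event payload;
# the context is not used by this handler, so None is sufficient here.
print(handler(b'test', None))
```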
- Optional: EventBridge
- EventBridge is activated. For more information about how to activate EventBridge, see Activate EventBridge and grant permissions to a RAM user.
Note You need to activate EventBridge only if the instance to which the Function Compute sink connector belongs is in the China (Hangzhou) or China (Chengdu) region.
Usage notes
- You can synchronize data from a source topic in an ApsaraMQ for Kafka instance to a function in Function Compute only within the same region. For more information about the limits on connectors, see Limits.
- If the instance to which the Function Compute sink connector belongs is in the China (Hangzhou) or China (Chengdu) region, the connector is deployed to EventBridge.
- You can use EventBridge free of charge. For more information, see Billing.
- When you create a connector, EventBridge automatically creates the following service-linked roles: AliyunServiceRoleForEventBridgeSourceKafka and AliyunServiceRoleForEventBridgeConnectVPC.
- If these service-linked roles are unavailable, EventBridge automatically creates them so that EventBridge can use these roles to access ApsaraMQ for Kafka and Virtual Private Cloud (VPC).
- If these service-linked roles are available, EventBridge does not create them again.
- You cannot view the operation logs of tasks that are deployed to EventBridge. After a connector is run, you can check the progress of the synchronization task by viewing the consumption details of the consumer group that subscribes to the source topic. For more information, see View consumption details.
Procedure
To use a Function Compute sink connector to synchronize data from a source topic in an ApsaraMQ for Kafka instance to a function in Function Compute, perform the following steps:
- Optional: Allow Function Compute sink connectors to access Function Compute across regions.
Important If you do not need to use Function Compute sink connectors to access Function Compute across regions, skip this step.
- Optional: Allow Function Compute sink connectors to access Function Compute across Alibaba Cloud accounts.
Important If you do not need to use Function Compute sink connectors to access Function Compute across Alibaba Cloud accounts, skip this step.
- Optional: Create the topics and consumer group that are required by a Function Compute sink connector.
Important
- If you do not need to customize the names of the topics and consumer group, skip this step.
- Specific topics that are required by a Function Compute sink connector must use the local storage engine. If the major version of your ApsaraMQ for Kafka instance is V0.10.2, you cannot manually create topics that use the local storage engine. In this case, these topics must be automatically created.
- Create and deploy a Function Compute sink connector.
- Verify the result.
Enable Internet access for Function Compute sink connectors
If you need to use Function Compute sink connectors to access other Alibaba Cloud services across regions, enable Internet access for Function Compute sink connectors. For more information, see Enable Internet access for a connector.
Create a custom policy
Create a custom policy to grant access to Function Compute within the Alibaba Cloud account to which you want to synchronize data.
- Log on to the RAM console.
- In the left-side navigation pane, choose Permissions > Policies.
- On the Policies page, click Create Policy.
- On the Create Policy page, create a custom policy.
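For reference, a minimal policy document that allows a connector to read and invoke functions might look like the following sketch. The action list here is an assumption; adjust it to the operations that your connector actually needs.

```json
{
    "Version": "1",
    "Statement": [
        {
            "Action": [
                "fc:InvokeFunction",
                "fc:GetFunction"
            ],
            "Resource": "*",
            "Effect": "Allow"
        }
    ]
}
```

To limit the scope of the grant, you can replace the wildcard Resource with the Alibaba Cloud Resource Name (ARN) of a specific service or function.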
Create a RAM role
Create a RAM role within the Alibaba Cloud account to which you want to synchronize data. You cannot select ApsaraMQ for Kafka as the trusted service when you create a RAM role. Therefore, select a supported service as the trusted service first. Then, modify the trust policy of the created RAM role.
- In the left-side navigation pane, click RAM Roles.
- On the RAM Roles page, click Create RAM Role.
- In the Create Role panel, create a RAM role.
- Set the Select Trusted Entity parameter to Alibaba Cloud Service and click Next.
- Set the Role Type parameter to Normal Service Role. In the RAM Role Name field, enter AliyunKafkaConnectorRole. From the Select Trusted Service drop-down list, select Function Compute. Then, click OK.
- On the Roles page, find and click AliyunKafkaConnectorRole.
- On the AliyunKafkaConnectorRole page, click the Trust Policy Management tab. On this tab, click Edit Trust Policy.
- In the Edit Trust Policy panel, replace fc in the script with alikafka and click OK.
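After you replace fc with alikafka, the trust policy should resemble the following sketch. The exact document is generated by the console, so treat this as an illustration rather than the definitive text:

```json
{
    "Statement": [
        {
            "Action": "sts:AssumeRole",
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "alikafka.aliyuncs.com"
                ]
            }
        }
    ],
    "Version": "1"
}
```

This change allows ApsaraMQ for Kafka, instead of Function Compute, to assume the AliyunKafkaConnectorRole role.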
Grant permissions to the RAM role
Grant the created RAM role the permissions to access Function Compute within the Alibaba Cloud account to which you want to synchronize data.
- In the left-side navigation pane, click RAM Roles.
- On the Roles page, find AliyunKafkaConnectorRole and click Add Permissions in the Actions column.
- In the Add Permissions panel, attach the KafkaConnectorFcAccess policy to the RAM role.
- In the Select Policy section, click Custom Policy.
- In the Authorization Policy Name column, find and click KafkaConnectorFcAccess.
- Click OK.
- Click Complete.
Create the topics that are required by a Function Compute sink connector
In the ApsaraMQ for Kafka console, create the following topics that are required by a Function Compute sink connector: task offset topic, task configuration topic, task status topic, dead-letter queue topic, and error data topic. These topics differ in the partition count and storage engine. For more information, see Parameters in the Configure Source Service step.
- Log on to the ApsaraMQ for Kafka console.
- In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
Important You must create topics in the region where your application is deployed. When you create a topic, select the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if your message producers and consumers run on ECS instances that are deployed in the China (Beijing) region, create topics in the China (Beijing) region.
- On the Instances page, click the name of the instance that you want to manage.
- In the left-side navigation pane, click Topics.
- On the Topics page, click Create Topic.
- In the Create Topic panel, configure the parameters and click OK.
The following parameters are configured in this example:
- Name: The name of the topic. Example: demo.
- Description: The description of the topic. Example: demo test.
- Partitions: The number of partitions in the topic. Example: 12.
- Storage Engine: The storage engine of the topic. Example: Cloud Storage.
Note For Standard Edition Message Queue for Apache Kafka instances, you can specify only Cloud Storage for the Storage Engine parameter.
ApsaraMQ for Kafka supports the following storage engines:
  - Cloud Storage: If you specify this value, the system uses Alibaba Cloud disks for the topic and stores data in three replicas in distributed mode. This storage engine provides the following benefits: low latency, high performance, durability, and high reliability. If you set the Instance Edition parameter to Standard (High Write) when you created the instance, you can specify only Cloud Storage as the value of the Storage Engine parameter.
  - Local Storage: If you specify this value, the system uses the in-sync replicas (ISR) algorithm of open source Apache Kafka and stores data in three replicas in distributed mode.
- Message Type: The message type of the topic. Example: Normal Message.
  - Normal Message: By default, messages of the same key are stored in the same partition in the order in which the messages are sent. When a broker in the cluster fails, the order of the messages may not be preserved in the affected partitions. If you set the Storage Engine parameter to Cloud Storage, this parameter is automatically set to Normal Message.
  - Partitionally Ordered Message: By default, messages of the same key are stored in the same partition in the order in which the messages are sent. When a broker in the cluster fails, the messages are still stored in the affected partitions in the order in which the messages are sent. The affected partitions cannot store new messages until the partitions are restored. If you set the Storage Engine parameter to Local Storage, this parameter is automatically set to Partitionally Ordered Message.
- Log Cleanup Policy: The log cleanup policy for the topic. Example: Compact.
If you set the Storage Engine parameter to Local Storage, you must configure the Log Cleanup Policy parameter. You can set the Storage Engine parameter to Local Storage only for Professional Edition Message Queue for Apache Kafka instances.
ApsaraMQ for Kafka provides the following log cleanup policies:
  - Delete: The default log cleanup policy is used. If sufficient storage space is available in the system, messages are retained based on the maximum retention period. After the storage usage exceeds 85%, the system deletes messages in the order in which the messages are stored. The earliest message that is stored is the first message that is deleted. This helps ensure that the performance of the service is not degraded.
  - Compact: The Apache Kafka log compaction policy is used. Log compaction ensures that Apache Kafka retains at least the last known value for each message key. This policy applies to scenarios such as restoring the system state after the application crashes or the system fails, or reloading caches after the application restarts during operational maintenance. For example, when you use Kafka Connect or Confluent Schema Registry, you must store the information about the system status and configurations in a log-compacted topic.
  Important You can use log-compacted topics only in specific cloud-native components such as Kafka Connect and Confluent Schema Registry. You cannot use the log compaction policy for a topic that is used to send and receive messages in other components. For more information, see aliware-kafka-demos.
- Tag: The tags that you want to attach to the topic. Example: demo.
After the topic is created, the topic is displayed on the Topics page.
Create the consumer group that is required by a Function Compute sink connector
In the ApsaraMQ for Kafka console, create the consumer group that is required by a Function Compute sink connector. The name of the consumer group must be in the connect-<task name> format. For more information, see Parameters in the Configure Source Service step.
- Log on to the ApsaraMQ for Kafka console.
- In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
- On the Instances page, click the name of the instance that you want to manage.
- In the left-side navigation pane, click Groups.
- On the Groups page, click Create Group.
- In the Create Group panel, enter the group name in the Group ID field and the group description in the Description field, attach tags to the group, and then click OK. After the group is created, you can view the group on the Groups page.
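The naming rule above can be expressed as a small helper. The task name fc-sink-demo is a hypothetical example used only for illustration:

```python
def connector_group_name(task_name: str) -> str:
    """Return the consumer group name that a Function Compute sink
    connector requires: connect-<task name>."""
    return 'connect-' + task_name

# Hypothetical task name used for illustration.
print(connector_group_name('fc-sink-demo'))
```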
Create and deploy a Function Compute sink connector
Create and deploy a Function Compute sink connector that synchronizes data from ApsaraMQ for Kafka to Function Compute.
- Log on to the ApsaraMQ for Kafka console.
- In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
- In the left-side navigation pane, click Connectors.
- On the Connectors page, select the instance in which the data source topic resides from the Select Instance drop-down list and click Create Connector.
- Complete the Create Connector wizard.
- Go to the Connectors page, find the connector that you created, and then click Deploy in the Actions column. To configure Function Compute resources, use the corresponding option in the Actions column to go to the Function Compute console and complete the configuration.
Send a test message
After you deploy the Function Compute sink connector, you can send a message to the source topic in the ApsaraMQ for Kafka instance to test whether the message can be synchronized to Function Compute.
- On the Connectors page, find the connector that you want to use and click Test in the Actions column.
- In the Send Message panel, configure the required parameters to send a test message.
- Set the Method of Sending parameter to Console.
- In the Message Key field, enter the key of the message. For example, you can enter demo as the key of the message.
- In the Message Content field, enter the content of the message. For example, you can enter {"key": "test"} as the content of the message.
- Configure the Send to Specified Partition parameter to specify whether to send the message to a specified partition.
- If you want to send the message to a specified partition, click Yes and enter the partition ID in the Partition ID field. For example, you can enter 0 as the partition ID. For information about how to query partition IDs, see View partition status.
- If you do not want to send the message to a specified partition, click No.
- Set the Method of Sending parameter to Docker and run the docker commands that are provided in the Run the Docker container to produce a sample message section to send a test message.
- Set the Method of Sending parameter to SDK and click the link to the topic that describes how to obtain and use the SDK that you want to use. Then, use the SDK to send and consume a test message. Message Queue for Apache Kafka provides topics that describe how to use SDKs for different programming languages based on different connection types.
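Whichever sending method you choose, you can predict the string that the hello_world function should log for the sample message content. This local sketch assumes the connector hands the message value to the function unchanged; the actual payload may be batched or wrapped by the connector, so treat the function logs as the source of truth:

```python
import json

# The sample message content entered in the console.
message_content = json.dumps({"key": "test"})

# The handler decodes the event bytes and prepends 'hello world:'.
event = message_content.encode('utf-8')
expected_log = 'hello world:' + bytes.decode(event)
print(expected_log)
```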
View function logs
After you send a message to the source topic in the ApsaraMQ for Kafka instance, you can view the function logs to check whether the message is received. For more information, see Configure the logging feature.
If the test message that you sent appears in the logs as shown in the following figure, the data synchronization task is successful.