This topic describes how to create an AnalyticDB sink connector to export data from a data source topic of a Message Queue for Apache Kafka instance to an AnalyticDB for MySQL or AnalyticDB for PostgreSQL database by using Alibaba Cloud Function Compute.
Prerequisites
- Message Queue for Apache Kafka
- The connector feature is enabled for the Message Queue for Apache Kafka instance. For more information, see Enable the connector feature.
- A data source topic is created in the Message Queue for Apache Kafka instance. For more information, see Step 1: Create a topic.
- Function Compute
- Function Compute is activated. For more information, see Activate Function Compute.
- AnalyticDB for MySQL and AnalyticDB for PostgreSQL
- If you want to export data to an AnalyticDB for MySQL database, make sure that you have created a cluster and a database account, connected to the cluster, and created a database in the cluster in the AnalyticDB for MySQL console. For more information, see Create a cluster, Create a database account, Connect to an AnalyticDB for MySQL cluster, and Create a database.
- If you want to export data to an AnalyticDB for PostgreSQL database, make sure that you have created an instance and a database account and connected to the database in the AnalyticDB for PostgreSQL console. For more information, see Create an instance, Create a database account, and Use client tools to connect to an instance.
Usage notes
- An AnalyticDB sink connector can export data from a data source topic of a Message Queue for Apache Kafka instance to an AnalyticDB for MySQL or AnalyticDB for PostgreSQL database through Function Compute only within the same region. For information about the limits on connectors, see Limits.
- AnalyticDB sink connectors export data by using Function Compute. Function Compute provides a free quota of resources for you. If your usage exceeds this free quota, you are charged for the excess based on the billing rules of Function Compute. For more information, see Billing.
- Function Compute allows you to query the logs of function calls to troubleshoot issues. For more information, see Configure logging.
- Message Queue for Apache Kafka serializes messages into UTF-8-encoded strings for transfer. Binary data is not supported.
- If you specify a private endpoint of the destination database for the AnalyticDB sink connector, you must specify the same virtual private cloud (VPC) and vSwitch as those of the destination database for the corresponding function in the Function Compute console. Otherwise, Function Compute cannot access the destination database. For more information, see Update a Service.
- When you create a connector, Message Queue for Apache Kafka creates a service-linked role for you.
- If no service-linked role is available, Message Queue for Apache Kafka automatically creates a service-linked role for you to use an AnalyticDB sink connector to export data from Message Queue for Apache Kafka to AnalyticDB for MySQL or AnalyticDB for PostgreSQL.
- If a service-linked role is available, Message Queue for Apache Kafka does not create a new one.
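One of the usage notes above states that messages are transferred as UTF-8-encoded strings rather than binary data. The following minimal Python sketch illustrates what this means on the producer side; the payload fields are hypothetical examples, not a format required by Message Queue for Apache Kafka:

```python
import json

# Hypothetical payload; the field names are examples only.
payload = {"key": "test", "value": 42}

# Serialize to a UTF-8 encoded string, the only transfer format that
# Message Queue for Apache Kafka supports; binary data is not supported.
message_bytes = json.dumps(payload).encode("utf-8")

# On the consuming side, the bytes decode back to the same text.
decoded = message_bytes.decode("utf-8")
print(decoded)
```

Because only text survives the transfer, any binary content must be encoded into a string representation (for example, Base64) before it is produced to the data source topic.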
Procedure
This section describes how to use an AnalyticDB sink connector to export data from a data source topic of a Message Queue for Apache Kafka instance to an AnalyticDB for MySQL or AnalyticDB for PostgreSQL database.
- Optional: Create the topics and group that are required by an AnalyticDB sink connector.
If you do not want to manually create the topics and group, skip this step and set the Resource Creation Method parameter to Auto in the next step.
Notice Specific topics that are required by an AnalyticDB sink connector must use a local storage engine. If the major version of your Message Queue for Apache Kafka instance is 0.10.2, topics that use a local storage engine cannot be manually created. In this version, these topics must be automatically created.
- Create and deploy an AnalyticDB sink connector.
- Configure Function Compute and AnalyticDB for MySQL or AnalyticDB for PostgreSQL.
- Verify the result.
Create the topics that are required by an AnalyticDB sink connector
In the Message Queue for Apache Kafka console, you can manually create the five topics that are required by an AnalyticDB sink connector: the task offset topic, task configuration topic, task status topic, dead-letter queue topic, and error data topic. These topics differ in their storage engines and numbers of partitions. For more information, see Table 1.
Create the group that is required by an AnalyticDB sink connector
In the Message Queue for Apache Kafka console, you can manually create the group that is required by an AnalyticDB sink connector. The name of the group must be in the connect-Task name format, where Task name is the name of the connector task. For more information, see Table 1.
Create and deploy an AnalyticDB sink connector
- Log on to the Message Queue for Apache Kafka console.
- In the Resource Distribution section of the Overview page, select the region where your instance resides.
- In the left-side navigation pane, click Connectors.
- On the Connectors page, click Create Connector.
- In the Create Connector wizard, perform the following steps:
- Go to the Connectors page, find the connector that you created, and click Deploy in the Actions column.
Configure the related Function Compute service
After an AnalyticDB sink connector is created and deployed in the Message Queue for Apache Kafka console, Function Compute automatically creates a service and a function for the connector. The service is named in the kafka-service-<connector_name>-<Random string> format, and the function is named in the fc-adb-<Random string> format.
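If you need to locate these auto-generated resources programmatically, the documented naming formats can be matched with simple patterns. The example names and the assumption that the random suffix is alphanumeric are hypothetical:

```python
import re

# Hypothetical examples of the auto-generated names; the exact form of
# the random suffix is an assumption (alphanumeric characters).
service_name = "kafka-service-my-connector-a1b2c3"
function_name = "fc-adb-a1b2c3"

# Patterns derived from the documented naming formats:
# kafka-service-<connector_name>-<Random string> and fc-adb-<Random string>.
service_pattern = re.compile(r"^kafka-service-.+-[A-Za-z0-9]+$")
function_pattern = re.compile(r"^fc-adb-[A-Za-z0-9]+$")

print(bool(service_pattern.match(service_name)))
print(bool(function_pattern.match(function_name)))
```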
Configure AnalyticDB for MySQL or AnalyticDB for PostgreSQL
After the Function Compute service is deployed, you must add the CIDR block of the vSwitch that you specify in the Function Compute console to the whitelist of the destination AnalyticDB for MySQL or AnalyticDB for PostgreSQL instance. You can view the CIDR block on the vSwitch page of the VPC console, in the row that corresponds to the VPC and vSwitch used by the Function Compute service.
- To configure the whitelist for an AnalyticDB for MySQL cluster, log on to the AnalyticDB for MySQL console. For more information, see Configure a whitelist.
- To configure the IP address whitelist for an AnalyticDB for PostgreSQL instance, log on to the AnalyticDB for PostgreSQL console. For more information, see Configure an IP address whitelist.
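The point of whitelisting the vSwitch CIDR block is that any private IP address Function Compute uses to reach the database falls inside that block. The following sketch, using Python's standard ipaddress module with hypothetical addresses, illustrates the containment check:

```python
import ipaddress

# Hypothetical vSwitch CIDR block as shown in the VPC console; replace
# it with the actual CIDR block of the vSwitch used by the function.
vswitch_cidr = ipaddress.ip_network("192.168.0.0/24")

# A hypothetical private IP address from which Function Compute connects.
fc_ip = ipaddress.ip_address("192.168.0.10")

# Whitelisting the CIDR block covers every address the function may use,
# so a single whitelist entry is sufficient.
print(fc_ip in vswitch_cidr)
```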
Send test messages
- On the Connectors page, find the connector that you created, and click Test in the Actions column.
- In the Send Message panel, set the parameters or use the method as prompted to send a test message.
- Set the Method of Sending parameter to Console.
- In the Message Key field, enter the key of the test message, such as demo.
- In the Message Content field, enter the content of the test message, such as {"key": "test"}.
- Set the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.
- If you want to send the test message to a specific partition, click Yes and enter the partition ID, such as 0, in the Partition ID field. For more information about how to query partition IDs, see View partition status.
- If you do not want to send the test message to a specific partition, click No.
- Set the Method of Sending parameter to Docker and run the Docker commands provided in the Run the Docker container to produce a sample message section to send the test message.
- Set the Method of Sending parameter to SDK, select a programming language or a framework, and then select an access method to use the corresponding SDK to send the test message.
Verify the data export result
After you send messages to the data source topic of the Message Queue for Apache Kafka instance, log on to the AnalyticDB for MySQL console or the AnalyticDB for PostgreSQL console and connect to the destination database. In the SQLConsole window of the Data Management Service 5.0 console, query the destination table to check whether the data in the data source topic was exported.
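A quick way to verify the export is to count the rows in the destination table. The following sketch builds such a verification statement; the table name is hypothetical and must be replaced with your own destination table:

```python
# Hypothetical destination table name; substitute your own schema.
table_name = "kafka_sink_table"

def build_verification_query(table: str) -> str:
    """Return a SQL statement that counts the rows exported to the table."""
    return f"SELECT COUNT(*) FROM {table};"

query = build_verification_query(table_name)
print(query)
```

Run the generated statement in the SQLConsole window; a nonzero count that grows as you send test messages indicates that the connector is exporting data.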
