This topic describes how to create an AnalyticDB sink connector to export data from a data source topic of an ApsaraMQ for Kafka instance to an AnalyticDB for MySQL or AnalyticDB for PostgreSQL database by using Alibaba Cloud Function Compute.

Prerequisites

The following requirements must be met:

Usage notes

  • You can export data from a data source topic of an ApsaraMQ for Kafka instance to an AnalyticDB for MySQL or AnalyticDB for PostgreSQL database through Function Compute only within the same region. For information about the limits on connectors, see Limits.
  • AnalyticDB sink connectors export data by using Function Compute. Function Compute provides a free quota of resources for you. If your usage exceeds this free quota, you are charged for the excess based on the billing rules of Function Compute. For more information, see Billing overview.
  • Function Compute allows you to query the logs of function calls to troubleshoot issues. For more information, see Configure the logging feature.
  • ApsaraMQ for Kafka serializes messages into UTF-8-encoded strings for transfer. Binary data is not supported.
  • If you specify a private endpoint of the destination database for the AnalyticDB sink connector, you must specify the same virtual private cloud (VPC) and vSwitch as those of the destination database for the corresponding function in the Function Compute console. Otherwise, Function Compute cannot access the destination database. For more information, see Update a service.
  • When you create a connector, ApsaraMQ for Kafka creates a service-linked role for you.
    • If no service-linked role is available, ApsaraMQ for Kafka automatically creates a service-linked role for you to use an AnalyticDB sink connector to export data from ApsaraMQ for Kafka to AnalyticDB for MySQL or AnalyticDB for PostgreSQL.
    • If a service-linked role is available, ApsaraMQ for Kafka does not create a new one.
    For more information about service-linked roles, see Service-linked roles.

Procedure

This section describes how to use an AnalyticDB sink connector to export data from a data source topic of an ApsaraMQ for Kafka instance to an AnalyticDB for MySQL or AnalyticDB for PostgreSQL database.

  1. Optional: Create the topics and group that are required by an AnalyticDB sink connector.

    If you do not want to manually create the topics and group, skip this step and set the Resource Creation Method parameter to Auto in the next step.

    Important Specific topics that are required by an AnalyticDB sink connector must use a local storage engine. If the major version of your ApsaraMQ for Kafka instance is 0.10.2, topics that use a local storage engine cannot be manually created. In this version, these topics must be automatically created.
    1. Create the topics that are required by an AnalyticDB sink connector
    2. Create the group that is required by an AnalyticDB sink connector
  2. Create and deploy an AnalyticDB sink connector
  3. Configure Function Compute and AnalyticDB for MySQL or AnalyticDB for PostgreSQL.
    1. Configure the related Function Compute service
    2. Configure AnalyticDB for MySQL or AnalyticDB for PostgreSQL
  4. Verify the result.
    1. Send test messages
    2. Verify the data export result

Create the topics that are required by an AnalyticDB sink connector

In the ApsaraMQ for Kafka console, you can manually create the five topics that are required by an AnalyticDB sink connector. The five topics are the task offset topic, task configuration topic, task status topic, dead-letter queue topic, and error data topic. The five topics differ in storage engine and the number of partitions. For more information, see Parameters in the Configure Source Service step.

  1. Log on to the ApsaraMQ for Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
    Important You must create topics in the region where your application is deployed. When you create a topic, select the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if your message producers and consumers run on ECS instances that are deployed in the China (Beijing) region, create topics in the China (Beijing) region.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Topics.
  5. On the Topics page, click Create Topic.
  6. In the Create Topic panel, configure the parameters and click OK.
    Name: The name of the topic. Example: demo.
    Description: The description of the topic. Example: demo test.
    Partitions: The number of partitions in the topic. Example: 12.
    Storage Engine: The storage engine of the topic. Example: Cloud Storage.

    Note For Standard Edition ApsaraMQ for Kafka instances, you can specify only Cloud Storage for the Storage Engine parameter.

    ApsaraMQ for Kafka supports the following storage engines:

    • Cloud Storage: If you specify this value, the system uses Alibaba Cloud disks for the topic and stores data in three replicas in distributed mode. This storage engine provides low latency, high performance, durability, and high reliability. If you set the Instance Edition parameter to Standard (High Write) when you created the instance, you can specify only Cloud Storage as the value of the Storage Engine parameter.
    • Local Storage: If you specify this value, the system uses the in-sync replicas (ISR) algorithm of open source Apache Kafka and stores data in three replicas in distributed mode.
    Message Type: The message type of the topic. Example: Normal Message.
    • Normal Message: By default, messages that have the same key are stored in the same partition in the order in which they are sent. When a broker in the cluster fails, the order of the messages in the affected partitions may not be preserved. If you set the Storage Engine parameter to Cloud Storage, this parameter is automatically set to Normal Message.
    • Partitionally Ordered Message: By default, messages that have the same key are stored in the same partition in the order in which they are sent. When a broker in the cluster fails, the messages are still stored in the affected partitions in the order in which they are sent, but the affected partitions cannot store new messages until the partitions are restored. If you set the Storage Engine parameter to Local Storage, this parameter is automatically set to Partitionally Ordered Message.
    Log Cleanup Policy: The log cleanup policy for the topic. Example: Compact.

    If you set the Storage Engine parameter to Local Storage, you must configure the Log Cleanup Policy parameter. You can set the Storage Engine parameter to Local Storage only for Professional Edition ApsaraMQ for Kafka instances.

    ApsaraMQ for Kafka provides the following log cleanup policies:

    • Delete: The default log cleanup policy is used. If sufficient storage space is available in the system, messages are retained based on the maximum retention period. After the storage usage exceeds 85%, the system deletes stored messages starting with the earliest one to ensure that service performance is not degraded.
    • Compact: The Apache Kafka log compaction policy is used. Log compaction ensures that Apache Kafka retains at least the last known value for each message key. This policy applies to scenarios such as restoring the system state after an application crash or a system failure, or reloading caches after an application restarts during operational maintenance. For example, when you use Kafka Connect or Confluent Schema Registry, you must store the information about the system status and configurations in a log-compacted topic.
      Important You can use log-compacted topics only in specific cloud-native components such as Kafka Connect and Confluent Schema Registry. You cannot use the log compaction policy for a topic that is used to send and receive messages in other components. For more information, see aliware-kafka-demos.
    Tag: The tags that you want to attach to the topic. Example: demo.
    After the topic is created, the topic is displayed on the Topics page.
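
If you prefer to script the creation of these topics instead of using the console, the following minimal sketch uses the open-source Kafka AdminClient. It is based on several assumptions: that your instance accepts topic creation over the Kafka protocol (many ApsaraMQ for Kafka instances allow topic creation only in the console or through the OpenAPI), that the example topic names from Table 1 (Parameters in the Configure Source Service step) are used, and that the placeholder endpoint is replaced with the default endpoint of your instance. The Local Storage engine is an ApsaraMQ for Kafka-specific setting that cannot be selected through this API; only the partition counts and the compact log cleanup policy are configured here.

    import java.util.Arrays;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    public class CreateConnectorTopics {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder default (VPC) endpoint of your ApsaraMQ for Kafka instance.
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "alikafka-pre-cn-xxx-1-vpc.alikafka.aliyuncs.com:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // Partition counts follow the recommendations in Table 1.
                // The replication factor of 3 mirrors the three-replica storage described above;
                // the managed service may override it.
                NewTopic offsetTopic = new NewTopic("connect-offset-kafka-adb-sink", 6, (short) 3)
                        .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
                NewTopic configTopic = new NewTopic("connect-config-kafka-adb-sink", 1, (short) 3)
                        .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
                NewTopic statusTopic = new NewTopic("connect-status-kafka-adb-sink", 6, (short) 3)
                        .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
                NewTopic errorTopic = new NewTopic("connect-error-kafka-adb-sink", 6, (short) 3);

                admin.createTopics(Arrays.asList(offsetTopic, configTopic, statusTopic, errorTopic))
                     .all().get();
            }
        }
    }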

Create the group that is required by an AnalyticDB sink connector

In the ApsaraMQ for Kafka console, you can manually create the group that is required by an AnalyticDB sink connector. The name of the group must be in the connect-Task name format. For more information, see Parameters in the Configure Source Service step.

  1. Log on to the ApsaraMQ for Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Groups.
  5. On the Groups page, click Create Group.
  6. In the Create Group panel, enter the group name in the Group ID field and the group description in the Description field, attach tags to the group, and then click OK.
    After the group is created, you can view the group on the Groups page.

Create and deploy an AnalyticDB sink connector

  1. Log on to the ApsaraMQ for Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
  3. In the left-side navigation pane, click Connectors.
  4. On the Connectors page, click Create Connector.
  5. In the Create Connector wizard, perform the following steps:
    1. In the Configure Basic Information step, set the parameters that are described in the following table and click Next.
      Name: The name of the connector. Take note of the following rules when you specify a connector name:
      • The connector name must be 1 to 48 characters in length. It can contain digits, lowercase letters, and hyphens (-), but cannot start with a hyphen (-).
      • Each connector name must be unique within an ApsaraMQ for Kafka instance.

      The name of the group that is used by the connector task must be in the connect-Task name format. If you have not already created such a group, ApsaraMQ for Kafka automatically creates one for you.

      Example: kafka-adb-sink.
      Instance: The information about the ApsaraMQ for Kafka instance. By default, the name and ID of the instance are displayed. Example: demo alikafka_post-cn-st21p8vj****.
    2. In the Configure Source Service step, select Message Queue for Apache Kafka as the source service, set the parameters that are described in the following table, and then click Next.
      Table 1. Parameters in the Configure Source Service step
      Data Source Topic: The name of the data source topic from which data is to be exported. Example: adb-test-input.
      Consumer Thread Concurrency: The number of concurrent consumer threads that are used to export data from the data source topic. Valid values: 1, 2, 3, 6, and 12. Default value: 6. Example: 6.
      Consumer Offset: The offset from which consumption starts. Example: Earliest Offset. Valid values:
      • Earliest Offset: Consumption starts from the earliest offset.
      • Latest Offset: Consumption starts from the latest offset.
      VPC ID: The ID of the VPC in which the data export task runs. Click Configure Runtime Environment to display the parameter. The default value is the VPC ID that you specified when you deployed the ApsaraMQ for Kafka instance. You do not need to change the value. Example: vpc-bp1xpdnd3l***.
      vSwitch ID: The ID of the vSwitch on which the data export task runs. Click Configure Runtime Environment to display the parameter. The vSwitch must be deployed in the same VPC as the ApsaraMQ for Kafka instance. The default value is the vSwitch ID that you specified when you deployed the ApsaraMQ for Kafka instance. Example: vsw-bp1d2jgg81***.
      Failure Handling Policy: Specifies whether to retain the subscription to the partition in which an error causes a message send failure. Click Configure Runtime Environment to display the parameter. Example: Continue Subscription. Valid values:
      • Continue Subscription: retains the subscription to the partition in which the error occurs and returns the logs.
      • Stop Subscription: stops the subscription to the partition in which the error occurs and returns the logs.
      Note
      • For information about how to view connector logs, see Manage a connector.
      • For more information about how to troubleshoot errors based on error codes, see Error codes.
      Resource Creation Method: The method that is used to create the topics and group required by the AnalyticDB sink connector. Click Configure Runtime Environment to display the parameter. Valid values: Auto and Manual. Example: Auto.
      Connector Consumer Group: The group that is used by the connector. Click Configure Runtime Environment to display the parameter. The name of the group must be in the connect-Task name format. Example: connect-kafka-adb-sink.
      Task Offset Topic: The topic that is used to store consumer offsets. Click Configure Runtime Environment to display the parameter. Example: connect-offset-kafka-adb-sink.
      • Topic: We recommend that you start the topic name with connect-offset.
      • Partitions: The number of partitions in the topic must be greater than 1.
      • Storage Engine: The storage engine of the topic must be set to Local Storage.
      • cleanup.policy: The log cleanup policy for the topic must be set to Compact.
      Task Configuration Topic: The topic that is used to store task configurations. Click Configure Runtime Environment to display the parameter. Example: connect-config-kafka-adb-sink.
      • Topic: We recommend that you start the topic name with connect-config.
      • Partitions: The topic can contain only one partition.
      • Storage Engine: The storage engine of the topic must be set to Local Storage.
      • cleanup.policy: The log cleanup policy for the topic must be set to Compact.
      Task Status Topic: The topic that is used to store the task status. Click Configure Runtime Environment to display the parameter. Example: connect-status-kafka-adb-sink.
      • Topic: We recommend that you start the topic name with connect-status.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: The storage engine of the topic must be set to Local Storage.
      • cleanup.policy: The log cleanup policy for the topic must be set to Compact.
      Dead-letter Queue Topic: The topic that is used to store the error data of the Kafka Connect framework. Click Configure Runtime Environment to display the parameter. To save topic resources, you can use one topic as both the dead-letter queue topic and the error data topic. Example: connect-error-kafka-adb-sink.
      • Topic: We recommend that you start the topic name with connect-error.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: The storage engine of the topic can be set to Local Storage or Cloud Storage.
      Error Data Topic: The topic that is used to store the error data of the sink connector. Click Configure Runtime Environment to display the parameter. To save topic resources, you can use one topic as both the dead-letter queue topic and the error data topic. Example: connect-error-kafka-adb-sink.
      • Topic: We recommend that you start the topic name with connect-error.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: The storage engine of the topic can be set to Local Storage or Cloud Storage.
    3. In the Configure Destination Service step, select AnalyticDB as the destination service, set the parameters that are described in the following table, and then click Create.
      Instance Type: The type of the destination database instance. Valid values: AnalyticDB for MySQL and AnalyticDB for PostgreSQL. Example: AnalyticDB for MySQL.
      AnalyticDB Instance ID: The ID of the destination AnalyticDB for MySQL or AnalyticDB for PostgreSQL instance. Example: am-bp139yqk8u1ik****.
      Database Name: The name of the destination database. Example: adb_demo.
      Table Name: The name of the table in the destination database in which the exported data is stored. Example: user.
      Database Username: The username that you use to log on to the destination database. Example: adbmysql.
      Database Password: The password that you use to log on to the destination database. The password is specified when you create the destination AnalyticDB for MySQL or AnalyticDB for PostgreSQL instance. If you forget the password, you can reset it. Example: ********.
      • If you want to reset the password of an AnalyticDB for MySQL database account, perform the steps described in Reset the password of a privileged account.
      • If you want to reset the password of an AnalyticDB for PostgreSQL database account, log on to the AnalyticDB for PostgreSQL console and click the destination instance. In the left-side navigation pane, click Account Management, find the database account whose password you want to reset, and click Reset Password in the Actions column.
      Note The username and password are passed to the functions in Function Compute as environment variables when ApsaraMQ for Kafka creates a data export task. After the task is created, ApsaraMQ for Kafka does not save the username or password.
      After the connector is created, you can view it on the Connectors page.
  6. Go to the Connectors page, find the connector that you created, and click Deploy in the Actions column.

Configure the related Function Compute service

After an AnalyticDB sink connector is created and deployed in the ApsaraMQ for Kafka console, Function Compute automatically creates a function service and a function for the connector. The function service is named in the kafka-service-<connector_name>-<Random string> format, and the function is named in the fc-adb-<Random string> format.

  1. On the Connectors page, locate the connector for which you want to configure the Function Compute service and click Configure Function in the Actions column.
    The page is redirected to the Function Compute console.
  2. In the Function Compute console, find the automatically created service and configure a VPC and vSwitch for the service. For more information, see Update a service.

Configure AnalyticDB for MySQL or AnalyticDB for PostgreSQL

After the Function Compute service is deployed, you must add the CIDR block for the VPC that you specify in the Function Compute console to the whitelist for the destination AnalyticDB for MySQL or AnalyticDB for PostgreSQL instance. You can view the CIDR block on the vSwitch page of the VPC console. The CIDR block is in the row where the VPC and vSwitch of the Function Compute service reside.
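
After the whitelist is configured, you can optionally confirm from a whitelisted client that the destination database is reachable and that the destination table contains a column for each JSON key that the connector will write. The following is a minimal JDBC sketch, assuming AnalyticDB for MySQL, the example values adb_demo, user, and adbmysql used elsewhere in this topic, and placeholder endpoint and password values; the MySQL Connector/J driver must be on the classpath.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class CheckDestinationTable {
        public static void main(String[] args) throws Exception {
            // Placeholder AnalyticDB for MySQL endpoint; use the endpoint shown on your instance details page.
            String url = "jdbc:mysql://am-bp139yqk8u1ik****.ads.aliyuncs.com:3306/adb_demo";
            try (Connection conn = DriverManager.getConnection(url, "adbmysql", "your-password")) {
                // List the columns of the destination table. Each JSON key in the
                // exported messages must match one of these column names.
                try (ResultSet cols = conn.getMetaData().getColumns(null, null, "user", null)) {
                    while (cols.next()) {
                        System.out.println(cols.getString("COLUMN_NAME") + "  " + cols.getString("TYPE_NAME"));
                    }
                }
            }
        }
    }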

Send test messages

You can send messages to the data source topic of an ApsaraMQ for Kafka instance to check whether data in the topic can be exported to AnalyticDB for MySQL or AnalyticDB for PostgreSQL.
Note The value of the Message Content parameter must be in the JSON format and is parsed into key-value pairs. The keys are the column names of the destination database table and the values are the data in the columns. Therefore, make sure that each key in the message content has a corresponding column name in the destination database table. You can log on to the AnalyticDB for MySQL console or the AnalyticDB for PostgreSQL console and connect to the destination database to check the column names of the destination database table.
  1. On the Connectors page, find the connector that you want to use and click Test in the Actions column.
  2. In the Send Message panel, configure the required parameters to send a test message.
    • Set the Method of Sending parameter to Console.
      1. In the Message Key field, enter the key of the message. For example, you can enter demo as the key of the message.
      2. In the Message Content field, enter the content of the message. For example, you can enter {"key": "test"} as the content of the message.
      3. Configure the Send to Specified Partition parameter to specify whether to send the message to a specified partition.
        • If you want to send the message to a specified partition, click Yes and enter the partition ID in the Partition ID field. For example, you can enter 0 as the partition ID. For information about how to query partition IDs, see View partition status.
        • If you do not want to send the message to a specified partition, click No.
    • Set the Method of Sending parameter to Docker and run the Docker commands that are provided in the Run the Docker container to produce a sample message section to send a test message.
    • Set the Method of Sending parameter to SDK and click the link to the topic that describes how to obtain and use the SDK that you want to use. Then, use the SDK to send and consume a test message. ApsaraMQ for Kafka provides topics that describe how to use SDKs for different programming languages based on different connection types.
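
In addition to the console, Docker, and SDK options above, the following minimal sketch sends the JSON test message with the open-source Kafka Java client. It assumes the default (VPC) endpoint of the instance, which is a placeholder here; instances that are accessed over the Internet or with ACLs enabled also require SASL and SSL properties, which are omitted.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SendTestMessage {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Placeholder default (VPC) endpoint of your ApsaraMQ for Kafka instance.
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "alikafka-pre-cn-xxx-1-vpc.alikafka.aliyuncs.com:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // The value must be a JSON string whose keys match the column names
                // of the destination table, for example {"key": "test"}.
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("adb-test-input", "demo", "{\"key\": \"test\"}");
                producer.send(record).get();
            }
        }
    }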

Verify the data export result

After you send messages to the data source topic of an ApsaraMQ for Kafka instance, log on to the AnalyticDB for MySQL console or the AnalyticDB for PostgreSQL console and connect to the destination database. Then, in the SQL Console window of the Data Management Service 5.0 console, click the destination table to check whether the data in the data source topic was exported.

The following figure shows the result of a data export task from ApsaraMQ for Kafka to AnalyticDB for MySQL.
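
As an alternative to querying the table in the Data Management Service console, the following minimal JDBC sketch reads back the exported rows. It assumes AnalyticDB for MySQL, the example names adb_demo and user, a destination column named key that matches the JSON key in the test message, and placeholder endpoint and password values.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class VerifyExportResult {
        public static void main(String[] args) throws Exception {
            // Placeholder AnalyticDB for MySQL endpoint; use the endpoint shown on your instance details page.
            String url = "jdbc:mysql://am-bp139yqk8u1ik****.ads.aliyuncs.com:3306/adb_demo";
            try (Connection conn = DriverManager.getConnection(url, "adbmysql", "your-password");
                 Statement stmt = conn.createStatement();
                 // `key` is the column that corresponds to the JSON key in the test message.
                 ResultSet rs = stmt.executeQuery("SELECT `key` FROM `user` LIMIT 10")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1));
                }
            }
        }
    }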