This topic describes how to create an AnalyticDB sink connector to export data from a data source topic of a Message Queue for Apache Kafka instance to an AnalyticDB for MySQL or AnalyticDB for PostgreSQL database by using Alibaba Cloud Function Compute.

Prerequisites

The following requirements must be met:

Usage notes

  • You can only export data from a data source topic of a Message Queue for Apache Kafka instance to an AnalyticDB for MySQL or AnalyticDB for PostgreSQL database through Function Compute within the same region. For information about the limits on connectors, see Limits.
  • AnalyticDB sink connectors export data by using Function Compute. Function Compute provides a free quota of resources for you. If your usage exceeds this free quota, you are charged for the excess based on the billing rules of Function Compute. For more information, see Billing.
  • Function Compute allows you to query the logs of function calls to troubleshoot issues. For more information, see Configure logging.
  • Message Queue for Apache Kafka serializes messages into UTF-8-encoded strings for transfer and does not support binary data.
  • If you specify a private endpoint of the destination database for the AnalyticDB sink connector, you must specify the same virtual private cloud (VPC) and vSwitch as those of the destination database for the corresponding function in the Function Compute console. Otherwise, Function Compute cannot access the destination database. For more information, see Update a Service.
  • When you create a connector, Message Queue for Apache Kafka creates a service-linked role for you.
    • If no service-linked role is available, Message Queue for Apache Kafka automatically creates a service-linked role for you to use an AnalyticDB sink connector to export data from Message Queue for Apache Kafka to AnalyticDB for MySQL or AnalyticDB for PostgreSQL.
    • If a service-linked role is available, Message Queue for Apache Kafka does not create a new one.
    For more information about service-linked roles, see Service-linked roles.
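The UTF-8 serialization note above can be sketched with the standard library. The record and its keys below are hypothetical; in practice the keys must match the column names of the destination table.

```python
import json

# Hypothetical record destined for the AnalyticDB table; the keys must match
# the destination table's column names.
record = {"id": 1, "name": "test"}

# Message Queue for Apache Kafka transfers messages as UTF-8-encoded strings,
# so serialize the payload to a UTF-8 JSON string before producing it; raw
# binary payloads are not supported.
payload = json.dumps(record).encode("utf-8")
```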

Procedure

This section describes how to use an AnalyticDB sink connector to export data from a data source topic of a Message Queue for Apache Kafka instance to an AnalyticDB for MySQL or AnalyticDB for PostgreSQL database.

  1. Optional: Create the topics and group that are required by an AnalyticDB sink connector.

    If you do not want to manually create the topics and group, skip this step and set the Resource Creation Method parameter to Auto in the next step.

    Notice Some of the topics that are required by an AnalyticDB sink connector must use the local storage engine. If the major version of your Message Queue for Apache Kafka instance is 0.10.2, topics that use the local storage engine cannot be manually created. In this version, these topics must be automatically created.
    1. Create the topics that are required by an AnalyticDB sink connector
    2. Create the group that is required by an AnalyticDB sink connector
  2. Create and deploy an AnalyticDB sink connector
  3. Configure Function Compute and AnalyticDB for MySQL or AnalyticDB for PostgreSQL.
    1. Configure the related Function Compute service
    2. Configure AnalyticDB for MySQL or AnalyticDB for PostgreSQL
  4. Verify the result.
    1. Send test messages
    2. Verify the data export result

Create the topics that are required by an AnalyticDB sink connector

In the Message Queue for Apache Kafka console, you can manually create the five topics that are required by an AnalyticDB sink connector. The five topics are the task offset topic, task configuration topic, task status topic, dead-letter queue topic, and error data topic. The five topics differ in storage engine and the number of partitions. For more information, see Table 1.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
    Notice You must create a topic in the region where your application resides. To do this, select the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if a topic is created in the China (Beijing) region, the message producer and consumer must run on ECS instances that reside in the China (Beijing) region.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Topics.
  5. On the Topics page, click Create Topic.
  6. In the Create Topic panel, set the properties of the topic and click OK.
    Create a topic
    Parameter Description Example
    Name The name of the topic. demo
    Description The description of the topic. demo test
    Partitions The number of partitions in the topic. 12
    Storage Engine The storage engine of the topic.

    Message Queue for Apache Kafka supports the following storage engines:

    • Cloud Storage: If this option is selected, disks provided by Alibaba Cloud are used and three replicas are stored in distributed mode. This type of storage engine features low latency, high performance, durability, and high reliability. If the Instance Edition of your instance is Standard (High Write), you can select only Cloud Storage.
    • Local Storage: If this option is selected, the in-sync replicas (ISR) algorithm of open source Apache Kafka is used and three replicas are stored in distributed mode.
    Cloud Storage
    Message Type The message type of the topic.
    • Normal Message: By default, messages of the same key are stored in the same partition in the order in which they are sent. When a broker in the cluster fails, the order of the messages may not be preserved in affected partitions. If you set the Storage Engine parameter to Cloud Storage, this parameter is automatically set to Normal Message.
    • Partitionally Ordered Message: By default, messages of the same key are stored in the same partition in the order in which they are sent. When a broker in the cluster fails, the order of the messages is preserved in affected partitions. However, messages in the affected partitions cannot be sent until the partitions are restored. If you set the Storage Engine parameter to Local Storage, this parameter is automatically set to Partitionally Ordered Message.
    Normal Message
    Log Cleanup Policy The log cleanup policy for the topic.

    If you set the Storage Engine parameter to Local Storage, you must set the Log Cleanup Policy parameter.

    Message Queue for Apache Kafka supports the following log cleanup policies:

    • Delete: The default log cleanup policy is used. If the system has sufficient disk space, messages are retained for the maximum retention period. The system considers disk space to be insufficient when the disk usage exceeds 85%. When disk space is insufficient, the system deletes messages starting from the earliest stored message to ensure service availability.
    • Compact: The Apache Kafka log compaction policy is used. If the keys of different messages are the same, messages that have the latest key values are retained. This policy applies to scenarios in which the system recovers from a failure, or the cache is reloaded after a system restart. For example, when you use Kafka Connect or Confluent Schema Registry, you must store the system status information or configuration information in a log-compacted topic.
      Notice Log-compacted topics are generally used only in specific ecosystem components, such as Kafka Connect or Confluent Schema Registry. Do not use this log cleanup policy for a topic that is used to send and subscribe to messages in other components. For more information, see Message Queue for Apache Kafka demos.
    Compact
    Tag The tags to be attached to the topic. demo
    After the topic is created, it is displayed on the Topics page.
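The Compact log cleanup policy described above keeps only the latest record for each key, which is why it suits topics that store system status or configuration information. A minimal stdlib sketch of the state that compaction retains (the keys and values are hypothetical):

```python
# A sequence of (key, value) records written to a log-compacted topic.
log = [("cfg", "v1"), ("offset", "10"), ("cfg", "v2"), ("offset", "20")]

# Log compaction retains, per key, only the record with the latest value:
# a later record for the same key replaces the earlier one.
compacted = {}
for key, value in log:
    compacted[key] = value
```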

Create the group that is required by an AnalyticDB sink connector

In the Message Queue for Apache Kafka console, you can manually create the group that is required by an AnalyticDB sink connector. The name of the group must be in the connect-<task name> format. For more information, see Table 1.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Groups.
  5. On the Groups page, click Create Group.
  6. In the Create Group panel, enter the group name in the Group ID field and the group description in the Description field, attach tags to the group, and then click OK.
    After the group is created, it is displayed on the Groups page.

Create and deploy an AnalyticDB sink connector

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. In the left-side navigation pane, click Connectors.
  4. On the Connectors page, click Create Connector.
  5. In the Create Connector wizard, perform the following steps:
    1. In the Configure Basic Information step, set the parameters that are described in the following table and click Next.
      Parameter Description Example
      Name The name of the connector. Take note of the following rules when you specify a connector name:
      • The connector name must be 1 to 48 characters in length. It can contain digits, lowercase letters, and hyphens (-), but cannot start with a hyphen (-).
      • Each connector name must be unique within a Message Queue for Apache Kafka instance.

      The name of the group that is used by the connector task must be in the connect-<task name> format. If you have not created such a group, Message Queue for Apache Kafka automatically creates one for you.

      kafka-adb-sink
      Instance The information about the Message Queue for Apache Kafka instance. By default, the name and ID of the instance are displayed. demo alikafka_post-cn-st21p8vj****
    2. In the Configure Source Service step, select Message Queue for Apache Kafka as the source service, set the parameters that are described in the following table, and then click Next.
      Table 1. Parameters in the Configure Source Service step
      Parameter Description Example
      Data Source Topic The name of the data source topic from which data is to be exported. adb-test-input
      Consumer Thread Concurrency The number of concurrent consumer threads used to export data from the data source topic. Default value: 6. Valid values:
      • 1
      • 2
      • 3
      • 6
      • 12
      6
      Consumer Offset The offset where consumption starts. Valid values:
      • Earliest Offset: Consumption starts from the earliest offset.
      • Latest Offset: Consumption starts from the latest offset.
      Earliest Offset
      VPC ID The ID of the VPC where the data export task runs. Click Configure Runtime Environment to display the parameter. The default value is the VPC ID that you specified when you deployed the Message Queue for Apache Kafka instance. You do not need to change the value. vpc-bp1xpdnd3l***
      vSwitch ID The ID of the vSwitch where the data export task runs. Click Configure Runtime Environment to display the parameter. The vSwitch must be deployed in the same VPC as the Message Queue for Apache Kafka instance. The default value is the vSwitch ID that you specified when you deployed the Message Queue for Apache Kafka instance. vsw-bp1d2jgg81***
      Failure Handling Policy Specifies whether to retain the subscription to the partition where an error causes a message send failure. Click Configure Runtime Environment to display the parameter. Valid values:
      • Continue Subscription: retains the subscription to the partition where an error occurs and returns the logs.
      • Stop Subscription: stops the subscription to the partition where an error occurs and returns the logs.
      Note
      • For information about how to view connector logs, see View connector logs.
      • For more information about how to troubleshoot errors based on error codes, see Error codes.
      • To resume the subscription to the partition where an error occurs, submit a ticket to the technical support of Message Queue for Apache Kafka.
      Continue Subscription
      Resource Creation Method The method to create the topics and group that are required by the AnalyticDB sink connector. Click Configure Runtime Environment to display the parameter. Valid values:
      • Auto
      • Manual
      Auto
      Connector Consumer Group The group that is used by the connector. Click Configure Runtime Environment to display the parameter. The name of the group must be in the connect-<task name> format. connect-kafka-adb-sink
      Task Offset Topic The topic that is used to store consumer offsets. Click Configure Runtime Environment to display the parameter.
      • Topic: We recommend that you start the topic name with connect-offset.
      • Partitions: The number of partitions in the topic must be greater than 1.
      • Storage Engine: The storage engine of the topic must be set to Local Storage.
      • cleanup.policy: The log cleanup policy for the topic must be set to Compact.
      connect-offset-kafka-adb-sink
      Task Configuration Topic The topic that is used to store task configurations. Click Configure Runtime Environment to display the parameter.
      • Topic: We recommend that you start the topic name with connect-config.
      • Partitions: The topic can contain only one partition.
      • Storage Engine: The storage engine of the topic must be set to Local Storage.
      • cleanup.policy: The log cleanup policy for the topic must be set to Compact.
      connect-config-kafka-adb-sink
      Task Status Topic The topic that is used to store task status. Click Configure Runtime Environment to display the parameter.
      • Topic: We recommend that you start the topic name with connect-status.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: The storage engine of the topic must be set to Local Storage.
      • cleanup.policy: The log cleanup policy for the topic must be set to Compact.
      connect-status-kafka-adb-sink
      Dead-letter Queue Topic The topic that is used to store the error data of the Kafka Connect framework. Click Configure Runtime Environment to display the parameter. To save topic resources, you can create a topic as both the dead-letter queue topic and the error data topic.
      • Topic: We recommend that you start the topic name with connect-error.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: The storage engine of the topic can be set to Local Storage or Cloud Storage.
      connect-error-kafka-adb-sink
      Error Data Topic The topic that is used to store the error data of the Sink connector. Click Configure Runtime Environment to display the parameter. To save topic resources, you can create a topic as both the dead-letter queue topic and the error data topic.
      • Topic: We recommend that you start the topic name with connect-error.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: The storage engine of the topic can be set to Local Storage or Cloud Storage.
      connect-error-kafka-adb-sink
    3. In the Configure Destination Service step, select AnalyticDB as the destination service, set the parameters that are described in the following table, and then click Create.
      Parameter Description Example
      Instance Type The type of the destination database instance. Valid values: AnalyticDB for MySQL and AnalyticDB for PostgreSQL. AnalyticDB for MySQL
      AnalyticDB Instance ID The ID of the destination AnalyticDB for MySQL or AnalyticDB for PostgreSQL instance. am-bp139yqk8u1ik****
      Database Name The name of the destination database. adb_demo
      Table Name The name of the table within the destination database where the exported data is stored. user
      Database Username The username that you use to log on to the destination database. adbmysql
      Database Password The password that you use to log on to the destination database. The password is specified when you create the destination AnalyticDB for MySQL or AnalyticDB for PostgreSQL instance. If you forget the password, you can reset it.
      • If you want to reset the password of an AnalyticDB for MySQL database account, perform the steps described in Reset the password of a privileged account.
      • If you want to reset the password of an AnalyticDB for PostgreSQL database account, log on to the AnalyticDB for PostgreSQL console and click the destination instance. In the left-side navigation pane, click Account Management, locate the database account for which you want to reset the password, and click Reset Password in the Actions column.
      ********
      Note The username and password are passed to the functions in Function Compute as environment variables when Message Queue for Apache Kafka creates a data export task. After the task is created, Message Queue for Apache Kafka does not save the username or password.
      After the connector is created, you can view it on the Connectors page.
  6. Go to the Connectors page, find the connector that you created, and click Deploy in the Actions column.
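The naming conventions and constraints from the Configure Source Service step can be summarized as data. The sketch below uses the example connector name kafka-adb-sink; the error-data topic may share a topic with the dead-letter queue, and its cleanup policy is not constrained by the table above.

```python
# Internal resources for a connector named "kafka-adb-sink", following the
# naming conventions and constraints from the Configure Source Service step.
connector = "kafka-adb-sink"

resources = {
    "group": f"connect-{connector}",
    # Offset topic: more than 1 partition, Local Storage, compact policy.
    "offset_topic": {"name": f"connect-offset-{connector}",
                     "partitions": 6, "cleanup.policy": "compact"},
    # Configuration topic: exactly 1 partition, Local Storage, compact policy.
    "config_topic": {"name": f"connect-config-{connector}",
                     "partitions": 1, "cleanup.policy": "compact"},
    # Status topic: 6 partitions recommended, Local Storage, compact policy.
    "status_topic": {"name": f"connect-status-{connector}",
                     "partitions": 6, "cleanup.policy": "compact"},
    # Dead-letter queue / error data topic: 6 partitions recommended.
    "error_topic": {"name": f"connect-error-{connector}", "partitions": 6},
}
```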

Configure the related Function Compute service

After an AnalyticDB sink connector is created and deployed in the Message Queue for Apache Kafka console, Function Compute automatically creates a function service and a function for the connector. The function service is named in the kafka-service-<connector name>-<random string> format, and the function is named in the fc-adb-<random string> format.

  1. On the Connectors page, locate the connector for which you want to configure the Function Compute service and click Configure Function in the Actions column.
    The page is redirected to the Function Compute console.
  2. In the Function Compute console, find the automatically created service and configure a VPC and vSwitch for the service. For more information, see Update a Service.

Configure AnalyticDB for MySQL or AnalyticDB for PostgreSQL

After the Function Compute service is deployed, you must add the CIDR block of the VPC that you specified in the Function Compute console to the whitelist of the destination AnalyticDB for MySQL or AnalyticDB for PostgreSQL instance. You can find the CIDR block on the vSwitch page of the VPC console, in the row that corresponds to the vSwitch that you specified for the Function Compute service.
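The whitelist reasoning above can be checked with the standard library. The CIDR block and address below are hypothetical examples, not values from your environment.

```python
import ipaddress

# Hypothetical CIDR block of the vSwitch bound to the Function Compute
# service, as shown on the vSwitch page of the VPC console.
vswitch_cidr = ipaddress.ip_network("172.16.0.0/24")

# Whitelisting the whole block covers any address that Function Compute
# may use inside that vSwitch when it connects to the AnalyticDB instance.
fc_address = ipaddress.ip_address("172.16.0.10")
```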

Send test messages

You can send messages to the data source topic of a Message Queue for Apache Kafka instance to check whether data in the topic can be exported to AnalyticDB for MySQL or AnalyticDB for PostgreSQL.
Note The value of the Message Content parameter must be in the JSON format and is parsed into key-value pairs. The keys are the column names of the destination database table and the values are the data in the columns. Therefore, make sure that each key of the message content has a corresponding column name in the destination database table. You can log on to the AnalyticDB for MySQL console or the AnalyticDB for PostgreSQL console and connect to the destination database to check the column names of the destination database table.
  1. On the Connectors page, find the connector that you created, and click Test in the Actions column.
  2. In the Send Message panel, set the parameters or use the method as prompted to send a test message.
    • Set the Method of Sending parameter to Console.
      1. In the Message Key field, enter the key of the test message, such as demo.
      2. In the Message Content field, enter the content of the test message, such as {"key": "test"}.
      3. Set the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.
        • If you want to send the test message to a specific partition, click Yes and enter the partition ID, such as 0, in the Partition ID field. For more information about how to query partition IDs, see View partition status.
        • If you do not want to send the test message to a specific partition, click No.
    • Set the Method of Sending parameter to Docker and run the docker commands provided in the Run the Docker container to produce a sample message section to send the test message.
    • Set the Method of Sending parameter to SDK, select a programming language or a framework, and then select an access method to use the corresponding SDK to send the test message.
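Before sending, you can check locally that the message content parses as JSON and that every key maps to a column of the destination table, as the note above requires. The column set below is a hypothetical schema for the example table.

```python
import json

# Hypothetical columns of the destination table.
table_columns = {"key"}

# Value entered in the Message Content field of the Send Message panel.
message_content = '{"key": "test"}'

record = json.loads(message_content)   # must parse as a JSON object
missing = set(record) - table_columns  # keys with no matching column
```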

Verify the data export result

After you send messages to the data source topic of a Message Queue for Apache Kafka instance, log on to the AnalyticDB for MySQL console or the AnalyticDB for PostgreSQL console and connect to the destination database. In the SQLConsole window of the Data Management Service 5.0 console, click the destination table to check whether the data in the data source topic was exported.

Figure: ADB-Connector-Result — the result of a data export task from Message Queue for Apache Kafka to AnalyticDB for MySQL.