This topic describes how to create a MaxCompute sink connector to synchronize data from a data source topic in a Message Queue for Apache Kafka instance to a MaxCompute table.

Prerequisites

Before you create a MaxCompute sink connector, make sure that the following operations are completed:
  • The connector feature is enabled for the Message Queue for Apache Kafka instance. For more information, see Enable the connector feature.
  • A data source topic is created in the Message Queue for Apache Kafka instance. For more information, see Step 1: Create a topic.

    A topic named maxcompute-test-input is used in this example.

  • A MaxCompute table is created by using the MaxCompute client. For more information, see Create tables.

    In this example, a MaxCompute table named test_kafka is created in a project named connector_test. You can execute the following statement to create the table:

    CREATE TABLE IF NOT EXISTS test_kafka (topic STRING, partition BIGINT, offset BIGINT, key STRING, value STRING) PARTITIONED BY (pt STRING);
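
    To confirm that the table is created with the expected schema and partition column, you can run the following command on the MaxCompute client. desc is a standard MaxCompute client command:

    desc test_kafka;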

Procedure

To synchronize data from a data source topic in a Message Queue for Apache Kafka instance to a MaxCompute table by using a MaxCompute sink connector, perform the following steps:

  1. Grant Message Queue for Apache Kafka the permissions to access MaxCompute.
  2. Optional: Create the topics and consumer groups that are required by the MaxCompute sink connector.

    If you do not want to customize the names of the topics and consumer groups, skip this step and set Resource Creation Method to Auto in the next step.

    Notice Some topics that are required by a MaxCompute sink connector must use a local storage engine. If the major version of your Message Queue for Apache Kafka instance is 0.10.2, you cannot manually create topics that use a local storage engine. In this case, these topics must be automatically created.
    1. Create the topics that are required by the MaxCompute sink connector.
    2. Create the consumer groups that are required by the MaxCompute sink connector.
  3. Create and deploy a MaxCompute sink connector.
  4. Verify the results.
    1. Send a test message.
    2. View data in the MaxCompute table.

Create a RAM role

You cannot select Message Queue for Apache Kafka as the trusted service of a Resource Access Management (RAM) role. Therefore, when you create the RAM role, select a supported Alibaba Cloud service as the trusted service. Then, manually modify the trust policy of the created RAM role.

  1. Log on to the RAM console.
  2. In the left-side navigation pane, click RAM Roles.
  3. On the RAM Roles page, click Create RAM Role.
  4. In the Create RAM Role panel, perform the following operations:
    1. Set Trusted entity type to Alibaba Cloud Service and click Next.
    2. Set Role Type to Normal Service Role. In the RAM Role Name field, enter AliyunKafkaMaxComputeUser1. From the Select Trusted Service drop-down list, select MaxCompute. Then, click OK.
  5. On the RAM Roles page, find and click AliyunKafkaMaxComputeUser1.
  6. On the AliyunKafkaMaxComputeUser1 page, click the Trust Policy Management tab, and click Edit Trust Policy.
  7. In the Edit Trust Policy panel, replace the trusted service name in the script with alikafka and click OK.
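    After you edit the trust policy, it should look similar to the following example. This is a sketch that assumes the standard Alibaba Cloud RAM trust policy format; the Service value allows Message Queue for Apache Kafka to assume the role:

    {
        "Statement": [
            {
                "Action": "sts:AssumeRole",
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "alikafka.aliyuncs.com"
                    ]
                }
            }
        ],
        "Version": "1"
    }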

Grant permissions

To enable the MaxCompute sink connector to synchronize messages to a MaxCompute table, you must grant at least the following permissions to the RAM role.

Object Action Description
Project CreateInstance The permissions to create instances in projects.
Table Describe The permissions to read the metadata of tables.
Table Alter The permissions to modify the metadata of tables or create and delete partitions.
Table Update The permissions to overwrite data in tables or insert data to tables.

For more information about the preceding permissions and how to grant these permissions, see Authorize users.

To grant the required permissions to AliyunKafkaMaxComputeUser1, perform the following steps:

  1. Log on to the MaxCompute client.
  2. Run the following command to add the AliyunKafkaMaxComputeUser1 RAM role as a RAM user:
    add user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
    Note Replace <accountid> with the ID of your Alibaba Cloud account.
  3. Grant the RAM user the minimum permissions that are required to access MaxCompute.
    1. Run the following command to grant the RAM user the permissions on the connector_test project:
      grant CreateInstance on project connector_test to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
      Note Replace <accountid> with the ID of your Alibaba Cloud account.
    2. Run the following command to grant the RAM user the permissions on the test_kafka table:
      grant Describe, Alter, Update on table test_kafka to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
      Note Replace <accountid> with the ID of your Alibaba Cloud account.
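
If you want to confirm that the permissions are granted, you can run the show grants command on the MaxCompute client. show grants is a standard MaxCompute security command. Replace <accountid> with the ID of your Alibaba Cloud account:

  show grants for `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;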

Create the topics that are required by the MaxCompute sink connector

In the Message Queue for Apache Kafka console, you can manually create the five topics that the MaxCompute sink connector requires: the task offset topic, the task configuration topic, the task status topic, the dead-letter queue topic, and the error data topic. The five topics differ in the number of partitions and the storage engine. For more information, see Table 1.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
    Notice You must create a topic in the region where your application resides. This means that you must select the region where your Elastic Compute Service (ECS) instance is deployed. A topic cannot be used across regions. For example, if a topic is created in the China (Beijing) region, the message producer and consumer must run on ECS instances in the China (Beijing) region.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Topics.
  5. On the Topics page, click Create Topic.
  6. In the Create Topic panel, set the properties of the topic and click OK.
    Parameter Description Example
    Name The name of the topic. demo
    Description The description of the topic. demo test
    Partitions The number of partitions in the topic. 12
    Storage Engine The storage engine of the topic.

    Message Queue for Apache Kafka supports the following storage engines:

    • Cloud Storage: If this option is selected, disks provided by Alibaba Cloud are used and three replicas are stored in distributed mode. This type of storage engine features low latency, high performance, persistence, and high durability. If the disk type of your instance is Standard (High Write), you can select only Cloud Storage.
    • Local Storage: If this option is selected, the in-sync replicas (ISR) algorithm of open source Apache Kafka is used and three replicas are stored in distributed mode.
    Cloud Storage
    Message Type The message type of the topic.
    • Normal Message: By default, messages that have the same key are stored in the same partition in the order in which they are sent. When a broker in the cluster fails, the messages may be out of order. If you set the Storage Engine parameter to Cloud Storage, this parameter is automatically set to Normal Message.
    • Partitionally Ordered Message: By default, messages that have the same key are stored in the same partition in the order in which they are sent. When a broker in the cluster fails, the messages are still stored in the same partition in the order in which they are sent. However, specific messages in the partition cannot be sent until the partition is restored. If you set the Storage Engine parameter to Local Storage, this parameter is automatically set to Partitionally Ordered Message.
    Normal Message
    Log Cleanup Policy The log cleanup policy for the topic.

    If you set the Storage Engine parameter to Local Storage, you must set the Log Cleanup Policy parameter.

    Message Queue for Apache Kafka supports the following log cleanup policies:

    • Delete: The default log cleanup policy is used. If the remaining disk space is sufficient, messages are retained for the maximum retention period. If the disk usage exceeds 85%, the disk space is considered insufficient, and the earliest messages are deleted to ensure service availability.
    • Compact: The Apache Kafka log compaction policy is used. If multiple messages have the same key, only the message that has the latest value is retained after compaction. For example, if a topic contains the messages (key1, v1), (key2, v1), and (key1, v2) in this order, compaction eventually retains only (key2, v1) and (key1, v2). This policy applies to scenarios in which the system is recovered from a failure or the cache is reloaded after a system restart. Typical examples are Kafka Connect and Confluent Schema Registry, which store system status information or configuration information in log-compacted topics.
      Notice Log-compacted topics are generally used only in specific ecosystem components, such as Kafka Connect or Confluent Schema Registry. Do not use this log cleanup policy for a topic that is used to send and subscribe to messages in other components. For more information, see Message Queue for Apache Kafka demos.
    Compact
    Tag The tags to be attached to the topic. demo
    After the topic is created, it is displayed on the Topics page.

Create the consumer groups that are required by the MaxCompute sink connector

In the Message Queue for Apache Kafka console, you can manually create the consumer group that is required by the MaxCompute sink connector. The name of the consumer group must be in the connect-Task name format. For example, if the connector is named kafka-maxcompute-sink, the consumer group must be named connect-kafka-maxcompute-sink. For more information, see Table 1.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Groups.
  5. On the Groups page, click Create Group.
  6. In the Create Group panel, enter the group name in the Group ID field and the group description in the Description field, attach tags to the consumer group, and then click OK.
    After the consumer group is created, it is displayed on the Groups page.

Create and deploy a MaxCompute sink connector

To create and deploy a MaxCompute sink connector that is used to synchronize data from Message Queue for Apache Kafka to MaxCompute, perform the following steps:

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Connectors.
  5. On the Connectors page, click Create Connector.
  6. In the Create Connector wizard, perform the following steps:
    1. In the Configure Basic Information step, set the parameters that are described in the following table and click Next.
      Parameter Description Example
      Name The name of the connector. Take note of the following rules when you specify a connector name:
      • The connector name can be up to 48 characters in length. It can contain digits, lowercase letters, and hyphens (-), but cannot start with a hyphen (-).
      • The connector name must be unique within the Message Queue for Apache Kafka instance.

      The data synchronization task of the connector must use a consumer group that is named in the connect-Task name format. If you have not manually created such a consumer group, the system automatically creates a consumer group for you.

      kafka-maxcompute-sink
      Instance The Message Queue for Apache Kafka instance. By default, the instance name and instance ID are displayed. demo alikafka_post-cn-st21p8vj****
    2. In the Configure Source Service step, select Message Queue for Apache Kafka for Data Source, set the parameters that are described in the following table, and then click Next.
      Note If you have created the required topics and consumer groups, set Resource Creation Method to Manual and enter the information about the created resources. Otherwise, set Resource Creation Method to Auto.
      Table 1. Parameters in the Configure Source Service step
      Parameter Description Example
      Data Source Topic The name of the topic from which data is to be synchronized. maxcompute-test-input
      Consumer Thread Concurrency The number of concurrent consumer threads to synchronize data from the data source topic. Default value: 6. Valid values:
      • 6
      • 12
      6
      Consumer Offset The offset where consumption starts. Valid values:
      • Earliest Offset: Consumption starts from the earliest offset.
      • Latest Offset: Consumption starts from the latest offset.
      Earliest Offset
      VPC ID The ID of the virtual private cloud (VPC) where the data synchronization task runs. Click Configure Runtime Environment to display the parameter. The default value is the VPC ID that you specified when you deployed the Message Queue for Apache Kafka instance. You do not need to change the value. vpc-bp1xpdnd3l***
      vSwitch ID The ID of the vSwitch on which the data synchronization task runs. Click Configure Runtime Environment to display the parameter. The vSwitch must be deployed in the same VPC as the Message Queue for Apache Kafka instance. The default value is the vSwitch ID that you specified when you deployed the Message Queue for Apache Kafka instance. vsw-bp1d2jgg81***
      Failure Handling Policy Specifies whether to retain the subscription to the partition where an error occurs after a message fails to be sent. Click Configure Runtime Environment to display the parameter. Valid values:
      • Continue Subscription: retains the subscription to the partition where an error occurs and prints the logs.
      • Stop Subscription: stops the subscription to the partition where an error occurs and prints the logs.
      Note
      • For more information about how to view the connector logs, see View connector logs.
      • For more information about how to troubleshoot errors based on error codes, see Error codes.
      • To resume the subscription to the partition where an error occurs, submit a ticket to the technical support of Message Queue for Apache Kafka.
      Continue Subscription
      Resource Creation Method The method of creating the topics and consumer group that are required by the MaxCompute sink connector. Click Configure Runtime Environment to display the parameter.
      • Auto
      • Manual
      Auto
      Connector Consumer Group The consumer group that is used by the data synchronization task of the connector. Click Configure Runtime Environment to display the parameter. The name of this consumer group must be in the connect-Task name format. connect-kafka-maxcompute-sink
      Task Offset Topic The topic that is used to store consumer offsets. Click Configure Runtime Environment to display the parameter.
      • Topic: the name of the topic. We recommend that you start the name with connect-offset.
      • Partitions: the number of partitions in the topic. Set the parameter to a value greater than 1.
      • Storage Engine: the storage engine of the topic. Set the value to Local Storage.
      • cleanup.policy: the log cleanup policy for the topic. Set the value to compact.
      connect-offset-kafka-maxcompute-sink
      Task Configuration Topic The topic that is used to store task configurations. Click Configure Runtime Environment to display the parameter.
      • Topic: the name of the topic. We recommend that you start the name with connect-config.
      • Partitions: the number of partitions in the topic. Set the value to 1.
      • Storage Engine: the storage engine of the topic. Set the value to Local Storage.
      • cleanup.policy: the log cleanup policy for the topic. Set the value to compact.
      connect-config-kafka-maxcompute-sink
      Task Status Topic The topic that is used to store task status. Click Configure Runtime Environment to display the parameter.
      • Topic: the name of the topic. We recommend that you start the name with connect-status.
      • Partitions: the number of partitions in the topic. We recommend that you set the value to 6.
      • Storage Engine: the storage engine of the topic. Set the value to Local Storage.
      • cleanup.policy: the log cleanup policy for the topic. Set the value to compact.
      connect-status-kafka-maxcompute-sink
      Dead-letter Queue Topic The topic that is used to store the error data of the connector framework. Click Configure Runtime Environment to display the parameter. To save topic resources, you can use one topic as both the dead-letter queue topic and the error data topic.
      • Topic: the name of the topic. We recommend that you start the name with connect-error.
      • Partitions: the number of partitions in the topic. We recommend that you set the value to 6.
      • Storage Engine: the storage engine of the topic. Valid values: Local Storage and Cloud Storage.
      connect-error-kafka-maxcompute-sink
      Error Data Topic The topic that is used to store the error data of the connector. Click Configure Runtime Environment to display the parameter. To save topic resources, you can use one topic as both the dead-letter queue topic and the error data topic.
      • Topic: the name of the topic. We recommend that you start the name with connect-error.
      • Partitions: the number of partitions in the topic. We recommend that you set the value to 6.
      • Storage Engine: the storage engine of the topic. Valid values: Local Storage and Cloud Storage.
      connect-error-kafka-maxcompute-sink
    3. In the Configure Destination Service step, select MaxCompute for Destination Service, set the parameters that are described in the following table, and then click Create.
      Parameter Description Example
      Endpoint The endpoint of MaxCompute. For more information, see Configure endpoints.
      • VPC endpoint: We recommend that you use the VPC endpoint because it has lower latency. The VPC endpoint can be used if the Message Queue for Apache Kafka instance and the MaxCompute project are in the same region.
      • Public endpoint: We recommend that you do not use the public endpoint because it has higher latency. The public endpoint can be used if the Message Queue for Apache Kafka instance and the MaxCompute project are in different regions. To use the public endpoint, you must enable Internet access for the connector. For more information, see Enable Internet access for a connector.
      http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api
      Workspace The name of the MaxCompute project to which you want to synchronize data. connector_test
      Table The name of the MaxCompute table to which you want to synchronize data. test_kafka
      Region for Table The region where the MaxCompute table is created. China (Hangzhou)
      Alibaba Cloud Account ID The ID of the Alibaba Cloud account that is used to access MaxCompute. 188***
      RAM Role The name of the RAM role that is assumed by Message Queue for Apache Kafka. For more information, see Create a RAM role. AliyunKafkaMaxComputeUser1
      Mode The mode in which messages are synchronized to the MaxCompute sink connector. Default value: DEFAULT. Valid values:
      • KEY: Only the keys of messages are retained and written into the Key column of the MaxCompute table.
      • VALUE: Only the values of messages are retained and written into the Value column of the MaxCompute table.
      • DEFAULT: Both keys and values of messages are retained and written into the Key and Value columns of the MaxCompute table.
        Notice In DEFAULT mode, the CSV format is not supported. You can select only the TEXT and BINARY formats.
      DEFAULT
      Format The format in which messages are synchronized to the MaxCompute sink connector. Default value: TEXT. Valid values:
      • TEXT: strings
      • BINARY: byte arrays
      • CSV: strings separated with commas (,)
        Notice If you set the parameter to CSV, the DEFAULT mode is not supported. Only the KEY and VALUE modes are supported.
        • KEY mode: Only the keys of messages are retained. Keys are separated with commas (,) and then written into the MaxCompute table in the order of indexes.
        • VALUE mode: Only the values of messages are retained. Values are separated with commas (,) and then written into the MaxCompute table in the order of indexes.
      TEXT
      Partition The granularity at which partitions are created. Default value: HOUR. Valid values:
      • DAY: writes data into a new partition every day.
      • HOUR: writes data into a new partition every hour.
      • MINUTE: writes data into a new partition every minute.
      HOUR
      Time Zone The time zone of the Message Queue for Apache Kafka producer client that sends messages to the data source topic of the MaxCompute sink connector. Default value: GMT+08:00. GMT+08:00
      After the connector is created, you can view it on the Connectors page.
  7. After the connector is created, go to the Connectors page, find the connector that you created, and then click Deploy in the Actions column.

Send a test message

After you deploy the MaxCompute sink connector, you can send a message to the data source topic in Message Queue for Apache Kafka to test whether the message can be synchronized to MaxCompute.

  1. On the Connectors page, find the connector that you created, and click Test in the Actions column.
  2. In the Send Message panel, set the parameters or use the method as prompted to send a test message.
    • Set the Method of Sending parameter to Console.
      1. In the Message Key field, enter the key of the test message, such as demo.
      2. In the Message Content field, enter the content of the test message, such as {"key": "test"}.
      3. Set the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.
        1. If you want to send the test message to a specific partition, click Yes and enter the partition ID in the Partition ID field, such as 0. For more information about how to query partition IDs, see View partition status.
        2. If you do not want to send the test message to a specific partition, click No.
    • Set the Method of Sending parameter to Docker and run the Docker commands provided in the Run the Docker container to produce a sample message section to send the test message.
    • Set the Method of Sending parameter to SDK, select a programming language or a framework, and then select an access method to use the corresponding SDK to send the test message.
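
If you use the SDK method, the following minimal sketch shows how a test message can be produced by using the open source kafka-python library. This is not the official SDK sample: the bootstrap server address is a placeholder for the default endpoint of your Message Queue for Apache Kafka instance, and the sketch assumes access from a VPC without authentication; if your instance requires SASL authentication, add the corresponding security settings.

  # Minimal kafka-python producer sketch (assumed setup, not the official SDK sample).
  from kafka import KafkaProducer

  producer = KafkaProducer(
      bootstrap_servers=["192.168.0.1:9092"],  # placeholder: use your instance's default endpoint
      key_serializer=lambda k: k.encode("utf-8"),
      value_serializer=lambda v: v.encode("utf-8"),
  )

  # The key and value match the console example in this topic.
  future = producer.send("maxcompute-test-input", key="demo", value='{"key": "test"}')
  metadata = future.get(timeout=10)  # block until the broker acknowledges the message
  print("sent to partition {} at offset {}".format(metadata.partition, metadata.offset))

  producer.flush()
  producer.close()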

View data in the MaxCompute table

After you send a message to the data source topic in Message Queue for Apache Kafka, you can log on to the MaxCompute client and check whether the message is received.

To view the test_kafka table, perform the following steps:

  1. Log on to the MaxCompute client.
  2. Run the following command to view the partitions of the table:
    show partitions test_kafka;
    The following result is returned in this example:
    pt=11-17-2020 14
    
    OK
  3. Run the following command to view the data stored in the partitions:
    select * from test_kafka where pt ="11-17-2020 14";
    The following result is returned in this example:
    +----------------------+------------+------------+-----+-------+---------------+
    | topic                | partition  | offset     | key | value | pt            |
    +----------------------+------------+------------+-----+-------+---------------+
    | maxcompute-test-input| 0          | 0          | 1   | 1     | 11-17-2020 14 |
    +----------------------+------------+------------+-----+-------+---------------+