This article describes how to create a MaxCompute sink connector to synchronize data from a data source topic of a Message Queue for Apache Kafka instance to a MaxCompute table.

Prerequisites

Before you start, make sure that the following requirements are met:
  • The connector feature is enabled for your Message Queue for Apache Kafka instance. For more information, see Enable the connector feature.
  • A data source topic is created for the Message Queue for Apache Kafka instance. For more information, see Step 1: Create a topic.

    A topic named maxcompute-test-input is created in this example.

  • A MaxCompute table is created from the MaxCompute client. For more information, see Create and view a table.

    A MaxCompute table named test_kafka is created in a project named connector_test in this example. You can execute the following statement to create a MaxCompute table named test_kafka:

    CREATE TABLE IF NOT EXISTS test_kafka(
        topic STRING,
        partition BIGINT,
        offset BIGINT,
        key STRING,
        value STRING
    ) PARTITIONED BY (pt STRING);
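
    You can optionally confirm that the table was created with the expected schema and partition column by describing it from the MaxCompute client:

    desc test_kafka;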

Procedure

To synchronize data from a data source topic of a Message Queue for Apache Kafka instance to a MaxCompute table by using a MaxCompute sink connector, perform the following steps:

  1. Grant Message Queue for Apache Kafka the permissions to access MaxCompute.
  2. Optional: Create the topics and consumer groups that are required by the MaxCompute sink connector.

    If you do not want to specify the names of the topics and consumer groups, skip this step and select Automatically in the next step.

    Notice Some topics must use the local storage engine. If the major version of your Message Queue for Apache Kafka instance is 0.10.2, you cannot manually create topics that use the local storage engine. In this version, these topics must be automatically created.
    1. Optional: Create the topics that are required by the MaxCompute sink connector.
    2. Optional: Create the consumer groups that are required by the MaxCompute sink connector.
  3. Create and deploy a MaxCompute sink connector.
  4. Verify the result.
    1. Send test messages
    2. View data in the MaxCompute table

Create a RAM role

You cannot select Message Queue for Apache Kafka as the trusted service of a RAM role. When you create the RAM role, select a supported service as the trusted service. Then, manually modify the trust policy of the created RAM role.

  1. Log on to the RAM console.
  2. In the left-side navigation pane, click RAM Roles.
  3. On the RAM Roles page, click Create RAM Role.
  4. In the Create RAM Role panel, perform the following operations:
    1. Set the Trusted entity type parameter to Alibaba Cloud Service and click Next.
    2. Set the Role Type parameter to Normal Service Role. In the RAM Role Name field, enter AliyunKafkaMaxComputeUser1. From the Select Trusted Service drop-down list, select MaxCompute. Then, click OK.
  5. On the RAM Roles page, find the AliyunKafkaMaxComputeUser1 RAM role and click the role name.
  6. On the AliyunKafkaMaxComputeUser1 page, click the Trust Policy Management tab, and then click Edit Trust Policy.
  7. In the Edit Trust Policy panel, replace odps in the script with alikafka and click OK.
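    After you replace odps with alikafka, the trust policy should look similar to the following example. This sketch assumes the standard format of a RAM trust policy; the policy that is generated for your role may contain additional fields:

    {
        "Statement": [
            {
                "Action": "sts:AssumeRole",
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "alikafka.aliyuncs.com"
                    ]
                }
            }
        ],
        "Version": "1"
    }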

Grant permissions

To enable the MaxCompute sink connector to synchronize messages to a MaxCompute table, you must grant at least the following permissions to the RAM role.

Object  | Action         | Description
--------|----------------|----------------------------------------------------------------------------
Project | CreateInstance | The permission to create instances in the project.
Table   | Describe       | The permission to read the metadata of the table.
Table   | Alter          | The permission to modify the metadata of the table and to create and delete partitions.
Table   | Update         | The permission to overwrite existing data in the table and to insert data into the table.

For more information about the preceding permissions and how to grant these permissions, see Authorize users.

To grant the required permissions to the AliyunKafkaMaxComputeUser1 RAM role, perform the following steps:

  1. Log on to the MaxCompute client.
  2. Run the following command to add the AliyunKafkaMaxComputeUser1 RAM role as a RAM user:
    add user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
    Note Replace <accountid> with the ID of your Alibaba Cloud account.
  3. Grant the RAM user the minimum permissions that are required to access MaxCompute.
    1. Run the following command to grant the RAM user the permissions on projects:
      grant CreateInstance on project connector_test to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
      Note Replace <accountid> with the ID of your Alibaba Cloud account.
    2. Run the following command to grant the RAM user the permissions on tables:
      grant Describe, Alter, Update on table test_kafka to user `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;
      Note Replace <accountid> with the ID of your Alibaba Cloud account.
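
To verify that the grants took effect, you can query the permissions of the RAM user from the MaxCompute client. Use the same account ID placeholder as in the preceding commands:

    show grants for `RAM$<accountid>:role/aliyunkafkamaxcomputeuser1`;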

Create the topics that are required by the MaxCompute sink connector

You can log on to the Message Queue for Apache Kafka console and manually create the topics that are required by the MaxCompute sink connector.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the top navigation bar, select the region where your instance is located.
  3. In the left-side navigation pane, click Instances.
  4. On the Instances page, click the name of the instance that you want to manage.
  5. In the left-side navigation pane, click Topics.
  6. On the Topics page, click Create Topic.
  7. In the Create Topic panel, set the properties of a topic and click Create.
    Topic Description
    Task site topic The topic that is used to store consumer offsets.
    • Topic: the name of the topic. We recommend that you start the name with connect-offset.
    • Partitions: the number of partitions in the topic. The value must be greater than 1.
    • Storage Engine: the storage engine of the topic. Select Local Storage.
    • cleanup.policy: the log cleanup policy for the topic. Select compact.
    Task configuration topic The topic that is used to store task configurations.
    • Topic: the name of the topic. We recommend that you start the name with connect-config.
    • Partitions: the number of partitions in the topic. Set this parameter to 1.
    • Storage Engine: the storage engine of the topic. Select Local Storage.
    • cleanup.policy: the log cleanup policy for the topic. Select compact.
    Task status topic The topic that is used to store task statuses.
    • Topic: the name of the topic. We recommend that you start the name with connect-status.
    • Partitions: the number of partitions in the topic. We recommend that you set this parameter to 6.
    • Storage Engine: the storage engine of the topic. Select Local Storage.
    • cleanup.policy: the log cleanup policy for the topic. Select compact.
    Dead-letter queue topic The topic that is used to store the abnormal data of the connector framework. To save topic resources, you can create a topic as both the dead-letter queue topic and the abnormal data topic.
    • Topic: the name of the topic. We recommend that you start the name with connect-error.
    • Partitions: the number of partitions in the topic. We recommend that you set this parameter to 6.
    • Storage Engine: the storage engine of the topic. Valid values: Local Storage and Cloud Storage.
    Abnormal data topic The topic that is used to store the abnormal data of the sink connector. To save topic resources, you can create a topic as both the dead-letter queue topic and the abnormal data topic.
    • Topic: the name of the topic. We recommend that you start the name with connect-error.
    • Partitions: the number of partitions in the topic. We recommend that you set this parameter to 6.
    • Storage Engine: the storage engine of the topic. Valid values: Local Storage and Cloud Storage.
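
For reference, these topics serve the same purposes as the internal topics of the open-source Kafka Connect framework. The following worker and sink connector properties show the analogous open-source configuration, using the example topic names from this article; this mapping is an analogy, not the exact configuration that Message Queue for Apache Kafka applies internally:

    # Open-source Kafka Connect worker properties (analogy)
    offset.storage.topic=connect-offset-kafka-maxcompute-sink
    config.storage.topic=connect-config-kafka-maxcompute-sink
    status.storage.topic=connect-status-kafka-maxcompute-sink
    # Open-source sink connector property for the dead-letter queue (analogy)
    errors.deadletterqueue.topic.name=connect-error-kafka-maxcompute-sink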

Create the consumer groups that are required by the MaxCompute sink connector

You can log on to the Message Queue for Apache Kafka console and manually create the consumer groups that are required by the MaxCompute sink connector.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the top navigation bar, select the region where your instance is located.
  3. In the left-side navigation pane, click Instances.
  4. On the Instances page, click the name of the instance that you want to manage.
  5. In the left-side navigation pane, click Consumer Groups.
  6. On the Consumer Groups page, click Create Consumer Group.
  7. In the Create Consumer Group panel, set the properties of a consumer group and click Create.
    Consumer group Description
    Connector task consumer group The consumer group that is used by the data synchronization task of the connector. The name of this consumer group must be in the connect-<task name> format, for example, connect-kafka-maxcompute-sink.
    Connector consumer group The consumer group that is used by the connector. We recommend that you start the name of this consumer group with connect-cluster.

Create and deploy a MaxCompute sink connector

To create and deploy a MaxCompute sink connector that synchronizes data from Message Queue for Apache Kafka to MaxCompute, perform the following operations:

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the top navigation bar, select the region where your instance is located.
  3. In the left-side navigation pane, click Instances.
  4. On the Instances page, click the name of the instance that you want to manage.
  5. In the left-side navigation pane, click Connector (Public Preview).
  6. On the Connector (Public Preview) page, click Create Connector.
  7. In the Create Connector panel, perform the following steps:
    1. In the Enter Basic Information step, enter a connector name in the Connector Name field, select Message Queue for Apache Kafka from the Dump Path drop-down list and MaxCompute from the Dump To drop-down list, and then click Next.
      Parameter Description Example
      Connector Name The name of the connector. Take note of the following rules when you set a connector name:
      • The connector name must be 1 to 48 characters in length. It can contain digits, lowercase letters, and hyphens (-), but cannot start with a hyphen (-).
      • Each connector name must be unique for the same Message Queue for Apache Kafka instance.

      The data synchronization task of the connector must use a consumer group that is named in the connect-<task name> format. If you have not created such a consumer group, the system automatically creates one for you.

      kafka-maxcompute-sink
      Task Type The type of the data synchronization task of the connector. In this example, the task synchronizes data from Message Queue for Apache Kafka to MaxCompute. For more information about task types, see Types of connectors. KAFKA2ODPS
    2. In the Configure Source Instance step, enter the name of the data source topic in the Data Source Topic field, select a consumer offset from the Consumer Offset drop-down list, select Automatically or Manually for the Create Resource parameter, and then click Next. If you select Manually, enter the names of the topics and consumer group that you created.
      Parameter Description Example
      VPC ID The ID of the virtual private cloud (VPC) where the data synchronization task runs. The default value is the ID of the VPC where the Message Queue for Apache Kafka instance is deployed. You cannot change the value. vpc-bp1xpdnd3l***
      VSwitch The ID of the vSwitch where the data synchronization task runs. The vSwitch must be in the same VPC as the Message Queue for Apache Kafka instance. The default value is the ID of the vSwitch that you have specified for the Message Queue for Apache Kafka instance. vsw-bp1d2jgg81***
      Data Source Topic The name of the topic from which data is to be synchronized. maxcompute-test-input
      Consumer Offset The offset where consumption starts. Valid values:
      • latest: Consumption starts from the latest offset.
      • earliest: Consumption starts from the initial offset.
      latest
      Connector consumer group The consumer group that is used by the connector. We recommend that you start the name of this consumer group with connect-cluster. connect-cluster-kafka-maxcompute-sink
      Task site Topic The topic that is used to store consumer offsets.
      • Topic: We recommend that you start the topic name with connect-offset.
      • Partitions: The number of partitions in the topic must be greater than 1.
      • Storage Engine: The storage engine of the topic must be set to Local Storage.
      • cleanup.policy: The log cleanup policy for the topic must be set to compact.
      connect-offset-kafka-maxcompute-sink
      Task configuration Topic The topic that is used to store task configurations.
      • Topic: We recommend that you start the topic name with connect-config.
      • Partitions: The topic can contain only one partition.
      • Storage Engine: The storage engine of the topic must be set to Local Storage.
      • cleanup.policy: The log cleanup policy for the topic must be set to compact.
      connect-config-kafka-maxcompute-sink
      Task status Topic The topic that is used to store task statuses.
      • Topic: We recommend that you start the topic name with connect-status.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: The storage engine of the topic must be set to Local Storage.
      • cleanup.policy: The log cleanup policy for the topic must be set to compact.
      connect-status-kafka-maxcompute-sink
      Dead letter queue Topic The topic that is used to store the abnormal data of the connector framework. To save topic resources, you can create a topic as both the dead-letter queue topic and the abnormal data topic.
      • Topic: We recommend that you start the topic name with connect-error.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: The storage engine of the topic can be set to Local Storage or Cloud Storage.
      connect-error-kafka-maxcompute-sink
      Abnormal Data Topic The topic that is used to store the abnormal data of the sink connector. To save topic resources, you can create a topic as both the dead-letter queue topic and the abnormal data topic.
      • Topic: We recommend that you start the topic name with connect-error.
      • Partitions: We recommend that you set the number of partitions in the topic to 6.
      • Storage Engine: The storage engine of the topic can be set to Local Storage or Cloud Storage.
      connect-error-kafka-maxcompute-sink
    3. In the Configure Destination Instance step, set the parameters for MaxCompute and click Next.
      Parameter Description Example
      MaxCompute Endpoint The endpoint of MaxCompute. For more information, see Configure endpoints.
      • VPC endpoint: We recommend that you use the VPC endpoint because it has lower latency. The VPC endpoint can be used when the Message Queue for Apache Kafka instance and MaxCompute project are created in the same region.
      • Public endpoint: We recommend that you do not use the public endpoint because it has higher latency. The public endpoint can be used when the Message Queue for Apache Kafka instance and the MaxCompute project are created in different regions. To use the public endpoint, enable Internet access for the connector. For more information, see Enable Internet access for a connector.
      http://service.cn-hangzhou.maxcompute.aliyun-inc.com/api
      MaxCompute Workspace The name of the MaxCompute project to which you want to synchronize data. connector_test
      MaxCompute Table The name of the MaxCompute table to which you want to synchronize data. test_kafka
      Region of MaxCompute Table The region where the MaxCompute table is created. China (Hangzhou)
      Alibaba Cloud Account ID The ID of the Alibaba Cloud account that is used to access MaxCompute. 188***
      RAM Role The name of the RAM role that is assumed by Message Queue for Apache Kafka. For more information, see Create a RAM role. AliyunKafkaMaxComputeUser1
      Mode The mode in which the connector writes messages to the MaxCompute table. The default value is DEFAULT. Valid values:
      • KEY: Only the keys of messages are retained and written into the Key column of the MaxCompute table.
      • VALUE: Only the values of messages are retained and written into the Value column of the MaxCompute table.
      • DEFAULT: Both keys and values of messages are retained and written into the Key and Value columns of the MaxCompute table.
        Notice In DEFAULT mode, the CSV format is not supported. You can select only the TEXT and BINARY formats.
      DEFAULT
      Format The format in which the connector parses messages before they are written to the MaxCompute table. The default value is TEXT. Valid values:
      • TEXT: strings.
      • BINARY: byte arrays.
      • CSV: strings separated with commas (,).
        Notice If you select the CSV format, the DEFAULT mode is not supported. Only the KEY and VALUE modes are supported.
        • KEY mode: Only the keys of messages are retained. Keys are separated with commas (,) and then written into the MaxCompute table in the order of indexes.
        • VALUE mode: Only the values of messages are retained. Values are separated with commas (,) and then written into the MaxCompute table in the order of indexes.
      TEXT
      Partition The granularity at which new partitions are created. The partition value is written to the pt column of the table, for example, 11-17-2020 14 at HOUR granularity. The default value is HOUR. Valid values:
      • DAY: writes data into a new partition every day.
      • HOUR: writes data into a new partition every hour.
      • MINUTE: writes data into a new partition every minute.
      HOUR
      Time Zone The time zone of the Message Queue for Apache Kafka producer client that sends messages to the data source topic of the MaxCompute sink connector. The default value is GMT+08:00. GMT+08:00
    4. In the Preview/Submit step, confirm the configurations of the connector and click Submit.
  8. In the Create Connector panel, click Deploy.

Send test messages

After you deploy the MaxCompute sink connector, you can send messages to the data source topic in Message Queue for Apache Kafka to test whether the data can be synchronized to MaxCompute.

  1. On the Connector (Public Preview) page, find the connector that you created and click Test in the Actions column.
  2. On the Topics page, select your instance, find the maxcompute-test-input topic, and then choose More > Send Message in the Actions column.
  3. In the Send Message panel, set the parameters used to send a test message.
    1. In the Partitions field, enter 0.
    2. In the Message Key field, enter 1.
    3. In the Message Value field, enter 1.
    4. Click Send.

View data in the MaxCompute table

After you send messages to the data source topic in Message Queue for Apache Kafka, you can log on to the MaxCompute client and check whether the messages are received.

To view the test_kafka table, perform the following steps:

  1. Log on to the MaxCompute client.
  2. Run the following command to view the partitions of the table:
    show partitions test_kafka;
    The following result is returned in this example:
    pt=11-17-2020 14
    
    OK
  3. Run the following command to view the data stored in the partitions:
    select * from test_kafka where pt = "11-17-2020 14";
    The following result is returned in this example:
    +-----------------------+------------+------------+-----+-------+---------------+
    | topic                 | partition  | offset     | key | value | pt            |
    +-----------------------+------------+------------+-----+-------+---------------+
    | maxcompute-test-input | 0          | 0          | 1   | 1     | 11-17-2020 14 |
    +-----------------------+------------+------------+-----+-------+---------------+
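
If you send more test messages, you can also count the rows that have been synchronized to a partition:

    select count(*) from test_kafka where pt = "11-17-2020 14";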