ApsaraMQ for Kafka: Create MaxCompute sink connectors

Last Updated: Jun 25, 2024

This topic describes how to create a MaxCompute sink connector to export data from an ApsaraMQ for Kafka topic to a MaxCompute table.

Prerequisites

For information about the prerequisites, see Prerequisites.

Usage notes

If you want to use the partition feature of MaxCompute, you must add a partition key column named time of the STRING type when you create the table.

Step 1: Create MaxCompute resources

Create a table on the MaxCompute client. For more information, see Create a table.

In this example, a table named kafka_to_maxcompute is created. The table contains three columns, and the partition feature is enabled. The following code shows the statement that is executed to create the table:

CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT) PARTITIONED BY (time STRING);

The following code shows the statement that is executed to create the table if the partition feature is disabled:

CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING,valueName STRING,valueAge BIGINT);

After the statement is executed, the following result is displayed. (Figure: execution result)

On the Tables page, view the information about the created table. (Figure: table)
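
The two statements in this step differ only in the PARTITIONED BY clause. The following Python sketch makes that relationship explicit. The function name build_create_table is hypothetical and is used only for illustration. The sketch only assembles the statement text and does not connect to MaxCompute.

```python
def build_create_table(partitioned: bool) -> str:
    """Assemble the CREATE TABLE statement used in this example.

    build_create_table is a hypothetical helper, not part of any SDK.
    """
    # The three data columns are the same in both variants.
    columns = "topic STRING,valueName STRING,valueAge BIGINT"
    statement = f"CREATE TABLE IF NOT EXISTS kafka_to_maxcompute({columns})"
    if partitioned:
        # The partition key column must be named time and typed STRING.
        statement += " PARTITIONED BY (time STRING)"
    return statement + ";"


with_partition = build_create_table(partitioned=True)
without_partition = build_create_table(partitioned=False)
print(with_partition)
print(without_partition)
```

You can paste the printed statement into the MaxCompute client to create the table.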

Step 2: Create and start a MaxCompute sink connector

  1. Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  2. In the left-side navigation pane, choose Connector Ecosystem Integration > Tasks.

  3. On the Tasks page, click Create Task.

  4. On the Create Task page, configure the Task Name and Description parameters. Then, follow the on-screen instructions to configure other parameters.

    • Task Creation

      1. In the Source step, set the Data Provider parameter to Message Queue for Apache Kafka and follow the on-screen instructions to configure other parameters. Then, click Next Step. The following table describes the parameters.

        Parameter

        Description

        Example

        Region

        The region where the ApsaraMQ for Kafka instance resides.

        China (Hangzhou)

        Message Queue for Apache Kafka Instance

        The ID of the ApsaraMQ for Kafka instance in which the data that you want to route is produced.

        alikafka_post-cn-9hdsbdhd****

        Topic

        The topic on the ApsaraMQ for Kafka instance in which the data that you want to route is produced.

        guide-sink-topic

        Group ID

        The ID of the consumer group on the ApsaraMQ for Kafka instance that is used to consume the data that you want to route.

        • Quickly Create: The system automatically creates a group whose ID is in the GID_EVENTBRIDGE_xxx format.

        • Use Existing Group: Select the ID of an existing group that is not being used. If you select an existing group that is being used, the publishing and subscription of existing messages are affected.

        Use Existing Group

        Consumer Offset

        • Latest Offset: Messages are consumed from the latest offset.

        • Earliest Offset: Messages are consumed from the earliest offset.

        Latest Offset

        Network Configuration

        If cross-border data transmission is required, select Internet. In other cases, select Basic Network.

        Basic Network

        Data Format

        The data format feature is used to encode binary data delivered from the source into a specific data format. Multiple data formats are supported. If you do not have special requirements on encoding, specify Json as the value.

        • Json: Binary data is encoded into JSON-formatted data based on UTF-8 encoding and then put into the payload. This is the default value.

        • Text: Binary data is encoded into strings based on UTF-8 encoding and then put into the payload.

        • Binary: Binary data is encoded into strings based on Base64 encoding and then put into the payload.

        Json

        Messages

        The maximum number of messages that can be sent in each batch. A request is sent only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000.

        2000

        Interval (Unit: Seconds)

        The time interval at which the system sends the aggregated messages to the sink. Valid values: 0 to 15. Unit: seconds. The value 0 indicates that messages are sent immediately after aggregation.

        3

      2. In the Filtering step, define a data pattern to filter data. For more information, see Message filtering.

      3. In the Transformation step, specify a data cleansing method to implement data processing capabilities such as splitting, mapping, enrichment, and dynamic routing. For more information, see Use Function Compute to perform message cleansing.

      4. In the Sink step, set the Service Type parameter to MaxCompute acs.maxcompute and then follow the on-screen instructions to configure other parameters. The following table describes the parameters.

        Parameter

        Description

        Example

        AccessKey ID

        The AccessKey ID of your Alibaba Cloud account. The AccessKey ID is used to access MaxCompute.

        LTAI5tHPVCZywsoEVvFu****

        AccessKey Secret

        The AccessKey secret of your Alibaba Cloud account.

        4RAKUQpZtUntDgvoKu0tvrkrOM****

        MaxCompute Project Name

        The MaxCompute project that you created.

        test_compute

        MaxCompute Table Name

        The MaxCompute table that you created.

        kafka_to_maxcompute

        MaxCompute Table Input Parameter

        After you select the MaxCompute table, the column names and types of the table are displayed here. You need to configure only the Value Extraction Rule parameter for each column. The following sample message shows how to configure the value extraction rules. In this example, the value of the topic column is extracted from the data.topic field of the message. Therefore, the Value Extraction Rule parameter is set to $.data.topic.

        {
          "data": {
            "topic": "t_test",
            "partition": 2,
            "offset": 1,
            "timestamp": 1717048990499,
            "headers": {
              "headers": [],
              "isReadOnly": false
            },
            "key": "MaxCompute-K1",
            "value": "MaxCompute-V1"
          },
          "id": "9b05fc19-9838-4990-bb49-ddb942307d3f-2-1",
          "source": "acs:alikafka",
          "specversion": "1.0",
          "type": "alikafka:Topic:Message",
          "datacontenttype": "application/json; charset=utf-8",
          "time": "2024-05-30T06:03:10.499Z",
          "aliyunaccountid": "1413397765616316"
        }

        topic: $.data.topic

        valuename: $.data.value

        valueage: $.data.offset

        Partitioning

        • Enable

        • Disable

        Enable

        Partition Dimension

        This parameter is required only if the Partitioning parameter is set to Enable.

        • Time variables {yyyy}, {MM}, {dd}, {HH}, and {mm} are supported. These variables correspond to year, month, day, hour, and minute, respectively. Time variables are case-sensitive.

        • Constants are supported.

        {yyyy}-{MM}-{dd}.{HH}:{mm}.suffix

        Network Settings

        • VPC: Messages in ApsaraMQ for Kafka are delivered to MaxCompute in a virtual private cloud (VPC).

        • Internet: Messages in ApsaraMQ for Kafka are delivered to MaxCompute over the Internet.

        Internet

        VPC

        The VPC ID. This parameter is required only if you set the Network Settings parameter to VPC.

        vpc-bp17fapfdj0dwzjkd****

        vSwitch

        The vSwitch ID. This parameter is required only if you set the Network Settings parameter to VPC.

        vsw-bp1gbjhj53hdjdkg****

        Security Group

        The security group ID. This parameter is required only if you set the Network Settings parameter to VPC.

        test_group

    • Task Property

      Configure the retry policy that you want to use when events fail to be pushed and the method that you want to use to handle faults. For more information, see Retry policies and dead-letter queues.

  5. Click Save. On the Tasks page, find the MaxCompute sink connector that you created. When the status in the Status column changes from Starting to Running, the connector is created.
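
The value extraction rules configured in the Sink step ($.data.topic, $.data.value, and $.data.offset) can be read as paths into the message. The following Python sketch illustrates how such rules map the sample message to one table row. It is a simplified illustration under stated assumptions, not the connector's implementation: the extract function is hypothetical, it supports only plain dotted paths that start with $., and the sample message is trimmed to the fields that matter here.

```python
import json

# Trimmed copy of the sample message from the Sink step, written as JSON.
EVENT_JSON = """
{
  "data": {
    "topic": "t_test",
    "partition": 2,
    "offset": 1,
    "timestamp": 1717048990499,
    "key": "MaxCompute-K1",
    "value": "MaxCompute-V1"
  },
  "source": "acs:alikafka",
  "type": "alikafka:Topic:Message",
  "time": "2024-05-30T06:03:10.499Z"
}
"""

# Column-to-rule mapping taken from the example in the Sink step.
RULES = {
    "topic": "$.data.topic",
    "valuename": "$.data.value",
    "valueage": "$.data.offset",
}


def extract(event: dict, rule: str):
    """Resolve a plain dotted rule such as '$.data.topic' (hypothetical helper)."""
    if not rule.startswith("$."):
        raise ValueError(f"unsupported rule: {rule}")
    value = event
    # Walk the nested dictionaries one key at a time.
    for key in rule[2:].split("."):
        value = value[key]
    return value


event = json.loads(EVENT_JSON)
row = {column: extract(event, rule) for column, rule in RULES.items()}
print(row)  # {'topic': 't_test', 'valuename': 'MaxCompute-V1', 'valueage': 1}
```

A rule that points to a field that does not exist in the message raises a KeyError in this sketch. How the connector itself handles a missing field is not covered here.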

Step 3: Test the MaxCompute sink connector

  1. On the Tasks page, find the MaxCompute sink connector that you created and click the name of the source topic in the Event Source column.

  2. On the Topic Details page, click Send Message.
  3. In the Start to Send and Consume Message panel, configure the parameters based on the following figure and click OK.

    (Figure: Send message)

  4. Go to the MaxCompute console and execute the following SQL statement to query the information about the partition.

    SHOW PARTITIONS kafka_to_maxcompute;

    The following result is returned. (Figure: partitions)

  5. Execute the following statement based on the partition information to query data in the partition:

    SELECT * FROM kafka_to_maxcompute WHERE time="2024-05-31.16:37.suffix";

    The following result is returned. (Figure: data in the partition)
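
The partition value used in the query above has the shape produced by the Partition Dimension example {yyyy}-{MM}-{dd}.{HH}:{mm}.suffix. The following Python sketch shows how such a template could expand into a partition value. It is an illustration only: the render_partition function is hypothetical, and which timestamp and time zone the connector uses to fill the variables is an assumption that this sketch does not settle, so the time is passed in explicitly.

```python
from datetime import datetime

# Time variables supported by the Partition Dimension parameter,
# mapped to the equivalent strftime codes. Variables are case-sensitive:
# {MM} is the month and {mm} is the minute.
TIME_VARIABLES = {
    "{yyyy}": "%Y",
    "{MM}": "%m",
    "{dd}": "%d",
    "{HH}": "%H",
    "{mm}": "%M",
}


def render_partition(template: str, moment: datetime) -> str:
    """Replace each time variable with its value; constants are kept as-is."""
    result = template
    for variable, code in TIME_VARIABLES.items():
        result = result.replace(variable, moment.strftime(code))
    return result


# Assumed write time, chosen to match the partition queried above.
partition = render_partition(
    "{yyyy}-{MM}-{dd}.{HH}:{mm}.suffix", datetime(2024, 5, 31, 16, 37)
)
print(partition)  # 2024-05-31.16:37.suffix

# Build the query for that partition.
query = f'SELECT * FROM kafka_to_maxcompute WHERE time="{partition}";'
print(query)
```

Because the template includes the minute, messages delivered in different minutes land in different partitions. A coarser template such as {yyyy}-{MM}-{dd} would yield one partition per day.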