This topic describes how to create a MaxCompute sink connector to synchronize data from a source topic in your ApsaraMQ for Kafka instance to a table in MaxCompute.

Prerequisites

For information about the prerequisites, see Prerequisites.

Usage notes

If you want to use the partition feature of MaxCompute, you must create an additional partition key column named time of the STRING type when you create the table.

Step 1: Create MaxCompute resources

Create a table on the MaxCompute client. For more information, see Create tables.
In this example, a table named kafka_to_maxcompute is created. The table contains three columns, and the partition feature is enabled. The following statement is executed to create the table:
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute (topic STRING, valueName STRING, valueAge BIGINT) PARTITIONED BY (time STRING);
If the partition feature is disabled, the following statement is executed instead:
CREATE TABLE IF NOT EXISTS kafka_to_maxcompute (topic STRING, valueName STRING, valueAge BIGINT);
If the statement is successfully executed, the execution result is displayed.
On the Table Management page, view the information about the created table.

Step 2: Create and start a MaxCompute sink connector

  1. Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, click the name of the region where your instance is deployed.
  2. In the left-side navigation pane, choose Connector Ecosystem Integration > Message Outflow.
  3. On the Message Outflow page, click Create Task.
  4. In the Create Message Outflow Task panel, configure the parameters and click Confirm.
    1. In the Basic Information section, configure the Task Name parameter and select MaxCompute from the Message Outflow Task Type drop-down list.
    2. In the Resource Configuration section, configure the parameters. The following table describes the parameters.
      Table 1. Source Message Queue for Apache Kafka
      Region
      Description: Select the region where the Message Queue for Apache Kafka instance resides.
      Example: China (Hangzhou)

      Message Queue for Apache Kafka Instance
      Description: The ID of the Message Queue for Apache Kafka instance.
      Example: alikafka_post-cn-9hdsbdhd****

      Topic
      Description: The source topic in the Message Queue for Apache Kafka instance.
      Example: guide-sink-topic

      Group ID
      Description: The ID of the group in the Message Queue for Apache Kafka instance.
      • Quickly Create: The system automatically creates a group whose ID is in the GID_EVENTBRIDGE_xxx format.
      • Use Existing Group: Select the ID of an existing group that is not in use. If you select a group that is in use, the publishing and subscription of existing messages are affected.
      Example: Use Existing Group

      Concurrency Quota (Consumers)
      Description: The number of threads that concurrently consume data in the topic. The following items describe the mapping between threads and partitions in a topic:
      • If the number of partitions in the topic is equal to the number of threads, each thread consumes data in one partition. We recommend this configuration.
      • If the number of partitions in the topic is greater than the number of threads, the threads evenly consume data in all partitions.
      • If the number of partitions in the topic is smaller than the number of threads, each partition is consumed by one thread, and the additional threads do not consume data.
      Example: 2

      Consumer Offset
      Description: The offset from which messages are consumed.
      • Latest Offset: Messages are consumed from the most recent offset.
      • Earliest Offset: Messages are consumed from the earliest offset.
      Example: Latest Offset

      Network Configuration
      Description: If cross-border data transmission is required, select Internet. In other cases, select Default Network.
      Example: Default Network
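The mapping between consumer threads and partitions described for the Concurrency Quota (Consumers) parameter can be sketched as a round-robin assignment. This is an illustrative Python sketch only; the actual assignment is performed by the Kafka consumer group protocol:

```python
def assign_partitions(num_partitions: int, num_threads: int) -> dict:
    """Round-robin sketch of the thread-to-partition mapping.

    Threads beyond the partition count receive no partitions and stay idle,
    matching the behavior described for the Concurrency Quota parameter.
    """
    assignment = {thread: [] for thread in range(num_threads)}
    for partition in range(num_partitions):
        assignment[partition % num_threads].append(partition)
    return assignment

# 6 partitions, 2 threads: the threads evenly consume all partitions.
print(assign_partitions(6, 2))   # {0: [0, 2, 4], 1: [1, 3, 5]}
# 2 partitions, 3 threads: thread 2 has nothing to consume.
print(assign_partitions(2, 3))   # {0: [0], 1: [1], 2: []}
```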
      Table 2. Target MaxCompute
      AccessKey ID
      Description: The AccessKey ID of your Alibaba Cloud account. The AccessKey ID is used to access MaxCompute.
      Example: LTAI5tHPVCZywsoEVvFu****

      AccessKey Secret
      Description: The AccessKey secret of your Alibaba Cloud account.
      Example: 4RAKUQpZtUntDgvoKu0tvrkrOM****

      MaxCompute Project Name
      Description: Select the created MaxCompute project.
      Example: test_compute

      MaxCompute Table Name
      Description: Select the created MaxCompute table.
      Example: kafka_to_maxcompute

      MaxCompute Table Input Parameter
      Description: After you select the MaxCompute table, the column names and types of the table are displayed here. You need to configure only the Value Extraction Rule parameter for each column. The following sample message shows how a value extraction rule works. In this example, the value of the topic column is extracted from the topic field of the message. Therefore, the Value Extraction Rule parameter is set to $.topic.
      {
          "topic": "guide-sink-topic",
          "partition": 2,
          "offset": 1,
          "timestamp": 1681372713689,
          "headers": {
              "headers": [],
              "isReadOnly": false
          },
          "key": "fc_k1",
          "value": "fc_v1"
      }
      Example: $.topic

      Partitioning
      Description: Specifies whether to write data to partitions. Valid values:
      • Enable
      • Disable
      Example: Enable

      Partition Dimension
      Description: This parameter is required only if the Partitioning parameter is set to Enable.
      • DAY: The time in the time column is in the YYYY-MM-DD format. Example: 2023-01-01.
      • HOUR: The time in the time column is in the YYYY-MM-DD HH format. Example: 2023-01-01 12.
      • MINUTE: The time in the time column is in the YYYY-MM-DD HH:mm format. Example: 2023-01-01 12:30.
      Example: HOUR

      Network Settings
      Description: The network over which messages are delivered. Valid values:
      • VPC: Messages in Message Queue for Apache Kafka are delivered to MaxCompute in a virtual private cloud (VPC).
      • Internet: Messages in Message Queue for Apache Kafka are delivered to MaxCompute over the Internet.
      Example: Internet

      VPC
      Description: Select a VPC. This parameter is required only if the Network Settings parameter is set to VPC.
      Example: vpc-bp17fapfdj0dwzjkd****

      vSwitch
      Description: Select a vSwitch. This parameter is required only if the Network Settings parameter is set to VPC.
      Example: vsw-bp1gbjhj53hdjdkg****

      Security Group
      Description: Select a security group. This parameter is required only if the Network Settings parameter is set to VPC.
      Example: test_group

      Batch Push
      Description: You can use the batch push feature to aggregate multiple messages into a batch. Batch push is triggered when the condition that is specified by the Messages parameter or the Interval parameter is met. For example, if you set the Messages parameter to 100 and the Interval parameter to 15 seconds, and the number of messages reaches 100 within 10 seconds, batch push is immediately triggered. If this feature is disabled, only one message is delivered to MaxCompute at a time.
      Example: None

      Messages
      Description: The maximum number of messages in each batch. A batch is delivered only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000. This parameter is required only if the batch push feature is enabled.
      Example: 100

      Interval
      Description: The maximum interval at which messages are delivered. The system delivers the aggregated messages to MaxCompute when the specified interval elapses. Valid values: 0 to 15. Unit: seconds. The value 0 specifies that messages are immediately delivered after aggregation. This parameter is required only if the batch push feature is enabled.
      Example: 10
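The Value Extraction Rule parameter uses JSONPath-style expressions. The following minimal Python sketch shows how a simple rule such as $.topic selects a field from the sample message. Only plain $.key.subkey paths are handled here; the connector supports richer JSONPath expressions:

```python
import json

def extract(rule: str, message: dict):
    """Apply a simple "$.key" or "$.key.subkey" extraction rule.

    Minimal illustration only; the connector's JSONPath support is richer.
    """
    if not rule.startswith("$."):
        raise ValueError("rule must start with '$.'")
    value = message
    for key in rule[2:].split("."):
        value = value[key]
    return value

# The sample message from Table 2.
sample = json.loads("""
{
    "topic": "guide-sink-topic",
    "partition": 2,
    "offset": 1,
    "timestamp": 1681372713689,
    "headers": {"headers": [], "isReadOnly": false},
    "key": "fc_k1",
    "value": "fc_v1"
}
""")

print(extract("$.topic", sample))               # guide-sink-topic
print(extract("$.headers.isReadOnly", sample))  # False
```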
    After you perform the preceding operations, go to the Message Outflow page, find the MaxCompute sink connector that you created, and then click Start in the Actions column. When the status in the Status column changes from Starting to Running, the connector is started.
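The batch push trigger described in Table 2 can be sketched as follows: a batch is flushed when either the message-count threshold or the time interval is reached. This is an illustrative Python sketch; the real connector also flushes on a background timer, whereas this sketch only checks the interval when a new message arrives:

```python
import time

class BatchPusher:
    """Sketch of the batch push trigger from Table 2.

    A batch is flushed when the number of buffered messages reaches
    max_messages, or when interval_s seconds have elapsed since the first
    message of the batch.
    """

    def __init__(self, max_messages=100, interval_s=15, clock=time.monotonic):
        self.max_messages = max_messages
        self.interval_s = interval_s
        self.clock = clock          # injectable for testing
        self.buffer = []
        self.batch_start = None
        self.flushed = []           # stand-in for batch writes to MaxCompute

    def push(self, message):
        if self.batch_start is None:
            self.batch_start = self.clock()
        self.buffer.append(message)
        if (len(self.buffer) >= self.max_messages
                or self.clock() - self.batch_start >= self.interval_s):
            self.flushed.append(self.buffer)
            self.buffer = []
            self.batch_start = None

# Count trigger: the third message completes a batch of 3.
now = [0.0]
pusher = BatchPusher(max_messages=3, interval_s=15, clock=lambda: now[0])
for m in ["m1", "m2", "m3"]:
    pusher.push(m)
print(pusher.flushed)       # [['m1', 'm2', 'm3']]

# Interval trigger: 15 seconds pass before the count threshold is reached.
pusher.push("m4")
now[0] = 16.0
pusher.push("m5")
print(pusher.flushed[-1])   # ['m4', 'm5']
```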

Step 3: Test the MaxCompute sink connector

  1. On the Message Outflow page, find the MaxCompute sink connector that you created and click the name of the source topic in the Event Source column.
  2. On the Topic Details page, click Send Message.
  3. In the Start to Send and Consume Message panel, configure the parameters and click OK.
  4. Go to the MaxCompute console and execute the following SQL statement to query the partitions of the table:
    SHOW PARTITIONS kafka_to_maxcompute;
    The partition information is displayed in the query result.
  5. Execute the following statement based on the partition information to query the data in the partition:
    SELECT * FROM kafka_to_maxcompute WHERE time = "2023-04-13 17";
    The synchronized data is displayed in the query result.
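The partition value written to the time column is derived from the message timestamp according to the Partition Dimension parameter. The following is a hedged sketch of that conversion, assuming an HOUR dimension and a UTC+8 project time zone (an assumption for illustration); the actual partition value for your test message depends on when it was sent, so substitute the value returned by SHOW PARTITIONS:

```python
from datetime import datetime, timezone, timedelta

def partition_value(timestamp_ms: int, dimension: str = "HOUR",
                    tz=timezone(timedelta(hours=8))) -> str:
    """Format a millisecond timestamp as a partition value for the time column.

    Formats follow the Partition Dimension parameter: DAY, HOUR, or MINUTE.
    The UTC+8 time zone is an assumption for illustration.
    """
    formats = {
        "DAY": "%Y-%m-%d",
        "HOUR": "%Y-%m-%d %H",
        "MINUTE": "%Y-%m-%d %H:%M",
    }
    dt = datetime.fromtimestamp(timestamp_ms / 1000, tz)
    return dt.strftime(formats[dimension])

# The timestamp from the sample message in Table 2.
print(partition_value(1681372713689, "DAY"))     # 2023-04-13
print(partition_value(1681372713689, "HOUR"))    # 2023-04-13 15
print(partition_value(1681372713689, "MINUTE"))  # 2023-04-13 15:58
```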