ApsaraMQ for Kafka: Create a MaxCompute sink connector

Last Updated: Mar 11, 2026

When your Kafka topics accumulate data that needs offline analytics or warehousing, you can stream it directly into MaxCompute by creating a sink connector. The connector continuously exports messages from a specified topic and writes them to a MaxCompute table, with optional time-based partitioning for efficient querying.

Prerequisites

Before you begin, make sure you have:

  • An ApsaraMQ for Kafka instance in a supported region

  • A topic on the ApsaraMQ for Kafka instance that produces the data to export

  • A consumer group ID on the ApsaraMQ for Kafka instance (create one or use an existing group that is not in use)

  • A MaxCompute project

  • An Alibaba Cloud AccessKey pair (AccessKey ID and AccessKey secret) with permissions to access MaxCompute

Collect the following values before you start. You need them when configuring the connector:

| Value | Description | Example |
| --- | --- | --- |
| ApsaraMQ for Kafka instance ID | The instance that produces the data | alikafka_post-cn-9hdsbdhd**** |
| Topic name | The source topic | guide-sink-topic |
| Consumer group ID | The consumer group used by the connector | GID_EVENTBRIDGE_xxx |
| AccessKey ID | Credential for accessing MaxCompute | LTAI5tXxx |
| AccessKey secret | Credential for accessing MaxCompute | xXxXxXx |
| MaxCompute project name | The destination project | test_compute |
| MaxCompute table name | The destination table | kafka_to_maxcompute |

Step 1: Create a MaxCompute table

Create a destination table on the MaxCompute client. For details, see Create a table.

With partitioning, add a partition key column named time of type STRING:

CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING, valueName STRING, valueAge BIGINT) PARTITIONED by (time STRING);

Without partitioning:

CREATE TABLE IF NOT EXISTS kafka_to_maxcompute(topic STRING, valueName STRING, valueAge BIGINT);

Tip: Partitioning organizes data into time-based segments, which improves query performance and reduces scan costs for large tables. Enable partitioning if you expect high data volumes or need to query by time range. Skip partitioning for small tables or when time-based filtering is not needed.

After the statement runs successfully, verify the table on the Tables page in the MaxCompute console.

Step 2: Create and start the connector

  1. Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, select the region where your instance resides.

  2. In the left-side navigation pane, choose Connector Ecosystem Integration > Tasks.

  3. On the Tasks page, click Create Task.

  4. On the Create Task page, enter a Task Name and Description, then configure the following sections.

Configure the source

In the Source step, set Data Provider to ApsaraMQ for Kafka and configure the following parameters. Click Next Step when finished.

| Parameter | Description | Example |
| --- | --- | --- |
| Region | The region where the ApsaraMQ for Kafka instance resides. | China (Hangzhou) |
| ApsaraMQ for Kafka Instance | The instance ID. | alikafka_post-cn-9hdsbdhd**** |
| Topic | The source topic that produces the data to export. | guide-sink-topic |
| Group ID | The consumer group for the connector. Select Quickly Create to auto-generate a group in the GID_EVENTBRIDGE_xxx format, or select Use Existing Group to choose an existing group that is not in use. Selecting a group that is already in use affects existing message subscriptions. | Use Existing Group |
| Consumer Offset | Latest Offset: start consuming from the newest message. Earliest Offset: start consuming from the oldest message. | Latest Offset |
| Network Configuration | Select Self-managed Internet for cross-border data transmission. Otherwise, select Basic Network. | Basic Network |
| Data Format | The encoding format for binary data delivered from the source. Json (default): UTF-8 encoded JSON in the payload. Text: UTF-8 encoded strings. Binary: Base64 encoded strings. | Json |
| Messages | The maximum number of messages sent per function invocation. Messages are sent when the backlog reaches this value. Valid values: 1 to 10000. | 2000 |
| Interval (Unit: Seconds) | The time interval at which the function is invoked to send aggregated messages to Function Compute. Valid values: 0 to 15. A value of 0 means messages are sent immediately after aggregation. | 3 |
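
The Messages and Interval parameters together control when accumulated messages are pushed. The Python sketch below models that behavior under an assumption this topic does not spell out: a batch is flushed when it reaches the Messages limit or when Interval seconds have passed since its first message arrived, whichever comes first. `BatchAggregator` and its methods are hypothetical names for illustration, not part of any ApsaraMQ for Kafka or EventBridge API.

```python
import time
from typing import Callable, List, Optional


class BatchAggregator:
    """Hypothetical model of the Messages and Interval batching parameters.

    Not the service's implementation: a batch is flushed when it holds
    `max_messages` messages or when `interval_s` seconds have elapsed since
    its first message arrived, whichever happens first.
    """

    def __init__(self, max_messages: int, interval_s: float,
                 flush: Callable[[List[str]], None],
                 clock: Callable[[], float] = time.monotonic) -> None:
        # Enforce the valid ranges documented for Messages and Interval.
        if not 1 <= max_messages <= 10000:
            raise ValueError("Messages must be between 1 and 10000")
        if not 0 <= interval_s <= 15:
            raise ValueError("Interval must be between 0 and 15 seconds")
        self.max_messages = max_messages
        self.interval_s = interval_s
        self.flush = flush
        self.clock = clock
        self.buffer: List[str] = []
        self.first_arrival: Optional[float] = None

    def add(self, message: str) -> None:
        """Buffer one message, then flush if a limit has been reached."""
        if not self.buffer:
            self.first_arrival = self.clock()
        self.buffer.append(message)
        self.maybe_flush()

    def maybe_flush(self) -> None:
        """Flush the buffer if it is full or its interval has elapsed.

        A real dispatcher would also call this periodically from a timer.
        """
        if not self.buffer:
            return
        full = len(self.buffer) >= self.max_messages
        expired = self.clock() - self.first_arrival >= self.interval_s
        if full or expired:
            batch, self.buffer = self.buffer, []
            self.first_arrival = None
            self.flush(batch)


# Usage with a fake clock so the result is deterministic.
now = [0.0]
batches: List[List[str]] = []
agg = BatchAggregator(max_messages=3, interval_s=3, flush=batches.append,
                      clock=lambda: now[0])
for m in ["a", "b", "c", "d"]:
    agg.add(m)     # "a", "b", "c" are flushed together; "d" stays buffered
now[0] = 3.5
agg.maybe_flush()  # the interval for "d" has elapsed, so it is flushed alone
print(batches)     # [['a', 'b', 'c'], ['d']]
```

The injectable `clock` keeps the example deterministic. With an Interval of 0, every call to `add` flushes immediately, which mirrors the documented meaning of 0.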

Configure filtering and transformation

  1. In the Filtering step, define a data pattern to filter messages. For details, see Event patterns.

  2. In the Transformation step, specify a data cleansing method for operations such as splitting, mapping, enrichment, and dynamic routing. For details, see Use Function Compute to perform message cleansing.
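
To illustrate how a filter decides which messages reach the sink, the following Python sketch implements a deliberately simplified matcher: a list in the pattern matches when the event field equals any listed value, and a nested object is matched field by field. The service's event pattern syntax may support more operators than this (see Event patterns); the `matches` function and the sample patterns are hypothetical.

```python
from typing import Any, Dict


def matches(pattern: Dict[str, Any], event: Dict[str, Any]) -> bool:
    """Return True if every field named in the pattern matches the event.

    Simplified semantics assumed for illustration only:
    - a list in the pattern matches if the event value equals any element;
    - a nested dict in the pattern is matched recursively against the
      corresponding nested object in the event.
    """
    for field, expected in pattern.items():
        if field not in event:
            return False
        actual = event[field]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif isinstance(expected, list):
            if actual not in expected:
                return False
        else:
            raise ValueError(f"unsupported pattern for field {field!r}")
    return True


# A trimmed event shaped like the CloudEvents messages this connector receives.
event = {
    "source": "acs:alikafka",
    "type": "alikafka:Topic:Message",
    "data": {"topic": "t_test", "key": "MaxCompute-K1", "value": "MaxCompute-V1"},
}

keep_pattern = {"source": ["acs:alikafka"], "data": {"key": ["MaxCompute-K1"]}}
drop_pattern = {"data": {"topic": ["other-topic"]}}

print(matches(keep_pattern, event))  # True
print(matches(drop_pattern, event))  # False
```

An empty pattern matches every event in this sketch, so only the fields you list narrow the stream.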

Configure the sink

In the Sink step, set Service Type to MaxCompute (acs.maxcompute) and configure the following parameters.

| Parameter | Description | Example |
| --- | --- | --- |
| AccessKey ID | The AccessKey ID for accessing MaxCompute. | yourAccessKeyID |
| AccessKey Secret | The AccessKey secret for accessing MaxCompute. | yourAccessKeySecret |
| MaxCompute Project Name | The name of the MaxCompute project. | test_compute |
| MaxCompute Table Name | The name of the MaxCompute table created in Step 1. | kafka_to_maxcompute |
| MaxCompute Table Input Parameter | After you select the table, its column names and types are displayed. For each column, set a Value Extraction Rule as a JSONPath expression. For details, see Value extraction rules. | $.data.topic |
| Partition Dimension | Disable: data is not partitioned. Enable: data is partitioned by time. If you enable this option, set the Partition Value using the time variables {yyyy}, {MM}, {dd}, {HH}, and {mm} (case-sensitive) and constants. Choose the option that matches how the table was created in Step 1. | Enable, with Partition Value {yyyy}-{MM}-{dd}.{HH}:{mm}.suffix |
| Network Configuration | VPC: deliver messages through a virtual private cloud (VPC). Internet: deliver messages over the public network. | Internet |
| VPC | The VPC ID. Required only when Network Configuration is set to VPC. | vpc-bp17fapfdj0dwzjkd**** |
| vSwitch | The vSwitch ID. Required only when Network Configuration is set to VPC. | vsw-bp1gbjhj53hdjdkg**** |
| Security Group | The security group ID. Required only when Network Configuration is set to VPC. | test_group |
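
The Partition Value template is expanded into a concrete partition string. The Python sketch below shows one way to perform that expansion for the five documented variables, treating all other text as constants. It assumes the time to format is passed in explicitly, because this topic does not state whether the connector uses the message timestamp or the write time, or which time zone applies. `render_partition_value` is a hypothetical helper, not a connector API.

```python
import re
from datetime import datetime

# strftime codes for the documented, case-sensitive time variables.
_TIME_VARIABLES = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H", "mm": "%M"}
_PATTERN = re.compile(r"\{(yyyy|MM|dd|HH|mm)\}")


def render_partition_value(template: str, when: datetime) -> str:
    """Expand time variables in a Partition Value template (hypothetical helper).

    Only the exact tokens {yyyy}, {MM}, {dd}, {HH}, and {mm} are replaced;
    every other character, including ".suffix", is copied as a constant.
    """
    return _PATTERN.sub(
        lambda match: when.strftime(_TIME_VARIABLES[match.group(1)]), template
    )


# The time source and time zone are assumptions; pass in whichever applies.
written_at = datetime(2024, 5, 31, 16, 37)
print(render_partition_value("{yyyy}-{MM}-{dd}.{HH}:{mm}.suffix", written_at))
# 2024-05-31.16:37.suffix
```

Because {MM} (month) and {mm} (minute) differ only in case, the sketch matches tokens case-sensitively; a token such as {YYYY} is left untouched as a constant.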

Value extraction rules

Each message delivered to the connector follows the CloudEvents structure. Use JSONPath expressions to map message fields to MaxCompute table columns.

Example message:

{
  "data": {
    "topic": "t_test",
    "partition": 2,
    "offset": 1,
    "timestamp": 1717048990499,
    "headers": {
      "headers": [],
      "isReadOnly": false
    },
    "key": "MaxCompute-K1",
    "value": "MaxCompute-V1"
  },
  "id": "9b05fc19-9838-4990-bb49-ddb942307d3f-2-1",
  "source": "acs:alikafka",
  "specversion": "1.0",
  "type": "alikafka:Topic:Message",
  "datacontenttype": "application/json; charset=utf-8",
  "time": "2024-05-30T06:03:10.499Z",
  "aliyunaccountid": "1413397765616316"
}

Example extraction rules for the kafka_to_maxcompute table:

| Column | Type | Value extraction rule | Extracted value |
| --- | --- | --- | --- |
| topic | STRING | $.data.topic | t_test |
| valueName | STRING | $.data.value | MaxCompute-V1 |
| valueAge | BIGINT | $.data.offset | 1 |
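
The mapping in the table can be reproduced with a small evaluator that supports only dot-notation paths such as $.data.topic. The Python sketch below applies the three rules to a trimmed copy of the example message. Full JSONPath also supports array indexes, wildcards, and filters, which this sketch does not; the `str` and `int` conversions in `CASTS` are assumptions about how values map to STRING and BIGINT columns, not documented connector behavior.

```python
import json
from typing import Any, Dict, List, Tuple


def extract(path: str, document: Dict[str, Any]) -> Any:
    """Evaluate a dot-notation JSONPath such as $.data.topic.

    Array indexes, wildcards, and filters are not supported in this sketch.
    """
    if path == "$":
        return document
    if not path.startswith("$."):
        raise ValueError(f"unsupported JSONPath: {path!r}")
    node: Any = document
    for key in path[2:].split("."):
        if not isinstance(node, dict) or key not in node:
            raise KeyError(f"{path!r} does not resolve at {key!r}")
        node = node[key]
    return node


# (column, MaxCompute type, value extraction rule), as in the table above.
COLUMNS: List[Tuple[str, str, str]] = [
    ("topic", "STRING", "$.data.topic"),
    ("valueName", "STRING", "$.data.value"),
    ("valueAge", "BIGINT", "$.data.offset"),
]

# Assumed conversions for this illustration, not documented connector behavior.
CASTS = {"STRING": str, "BIGINT": int}


def to_row(message: Dict[str, Any]) -> Dict[str, Any]:
    """Build one table row by applying each column's extraction rule."""
    return {name: CASTS[col_type](extract(rule, message))
            for name, col_type, rule in COLUMNS}


# A trimmed copy of the example CloudEvents message.
message = json.loads("""
{
  "data": {"topic": "t_test", "partition": 2, "offset": 1,
           "key": "MaxCompute-K1", "value": "MaxCompute-V1"},
  "source": "acs:alikafka",
  "type": "alikafka:Topic:Message"
}
""")

print(to_row(message))
# {'topic': 't_test', 'valueName': 'MaxCompute-V1', 'valueAge': 1}
```

A rule that does not resolve raises `KeyError` here, which makes a mistyped path easy to spot before you enter it in the console.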

Configure the retry policy

In the Task Property section, configure the retry policy for failed event pushes and the fault-handling method. For details, see Retry policies and dead-letter queues.

Save and verify the connector status

Click Save. On the Tasks page, find the connector. When the Status column changes from Starting to Running, the connector is active and streaming data.

Step 3: Verify data delivery

Send a test message and confirm that it arrives in the MaxCompute table.

  1. On the Tasks page, find the connector and click the source topic name in the Event Source column.

  2. On the Topic Details page, click Send Message.

  3. In the Start to Send and Consume Message panel, enter a test message and click OK.

  4. In the MaxCompute console, query the partitions to confirm data arrived:

       SHOW PARTITIONS kafka_to_maxcompute;

  5. Query the data in the target partition. Replace the time value with the actual partition value returned in the previous step:

       SELECT * FROM kafka_to_maxcompute WHERE time="2024-05-31.16:37.suffix";

    If the query returns the test message data, the connector is working correctly.