
MaxCompute:Best practices for Kafka data migration to MaxCompute

Last Updated:Feb 24, 2026

This topic describes how to use DataWorks Data Integration to migrate data from a Kafka cluster to MaxCompute.

Prerequisites

  • MaxCompute is activated.

  • A MaxCompute data source is added. For more information, see Associate a MaxCompute compute resource.

  • A workflow is created in your DataWorks workspace. This topic uses a workflow in a workspace that is in basic mode as an example. For more information, see Create a workflow.

  • A Kafka cluster is set up.

    Before you perform data migration, verify that your Kafka cluster environment is operational. This topic uses the Alibaba Cloud EMR service to automatically deploy a Kafka cluster. For detailed procedures, see Kafka Quick Start.

    The EMR Kafka cluster used in this topic has the following configurations:

    • EMR version: EMR-3.12.1

    • Cluster type: Kafka

    • Software: Ganglia 3.7.2, ZooKeeper 3.4.12, Kafka 2.11-1.0.1, and Kafka-Manager 1.3.X.XX

    The Kafka cluster uses a virtual private cloud (VPC) in the China (Hangzhou) region. The ECS compute resources of the primary instance group are configured with public and private IP addresses.

Background information

Kafka is a distributed publish-subscribe middleware. It is widely used because of its high performance and high throughput, which allow it to process millions of messages per second. Kafka is suitable for stream data processing and is mainly used in scenarios such as user behavior tracking and log collection.

A typical Kafka cluster includes several producers, brokers, consumers, and a ZooKeeper cluster. The Kafka cluster uses ZooKeeper to manage its configurations and coordinate services.

A topic is a collection of messages and a fundamental concept in a Kafka cluster. A topic is a logical concept for message storage. Topics are not stored on physical disks. Instead, messages within a topic are stored in partitions on the disks of different nodes in the cluster. Multiple producers can send messages to a topic, and multiple consumers can pull (consume) messages from the topic.

When a message is added to a partition, it is assigned an offset. The offset uses zero-based numbering and serves as the unique ID of the message within that partition.
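The offset semantics described above can be sketched in a few lines of plain Python (a toy model, not a Kafka client): each partition keeps its own log, and every appended message gets the next zero-based position in that log.

```python
# Toy model of a topic: offsets are sequential and zero-based
# per partition, independent across partitions.

class TopicSketch:
    """A topic as a dict of partition -> list of messages."""
    def __init__(self, num_partitions):
        self.partitions = {p: [] for p in range(num_partitions)}

    def append(self, partition, message):
        """Append a message and return its offset within the partition."""
        log = self.partitions[partition]
        log.append(message)
        return len(log) - 1  # offsets start at 0 in each partition

topic = TopicSketch(num_partitions=2)
print(topic.append(0, "a"))  # 0
print(topic.append(0, "b"))  # 1
print(topic.append(1, "c"))  # 0  (partition 1 counts independently)
```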

Step 1: Prepare Kafka data

Create test data in the Kafka cluster. To ensure that you can log on to the EMR cluster header host and that MaxCompute and DataWorks can communicate with the host, you must configure the security group of the EMR cluster header host to allow traffic on TCP ports 22 and 9092.

  1. Log on to the EMR cluster header host.

    1. Go to the EMR Hadoop console.

    2. In the top navigation bar, click Cluster Management.

    3. On the page that appears, find the cluster in which you want to create test data and go to its details page.

    4. On the cluster details page, click Host List, find the public IP address of the EMR cluster header host, and log on to the host over SSH.

  2. Create a test topic.

    Run the following command to create the test topic testkafka:

    kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --partitions 10 --replication-factor 3 --topic testkafka --create
  3. Write test data.

    Run the following command to simulate a producer that writes data to the testkafka topic. Because Kafka processes stream data, you can continuously write data to the topic. To ensure valid test results, write more than 10 data records.

    kafka-console-producer.sh --broker-list emr-header-1:9092 --topic testkafka

    Open another SSH window and run the following command to simulate a consumer that verifies whether data is successfully written to Kafka. If the data is written successfully, the data is displayed.

    kafka-console-consumer.sh --bootstrap-server emr-header-1:9092 --topic testkafka --from-beginning
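If you prefer to script the producer instead of typing records interactively, the following Python sketch writes more than 10 test records to testkafka. It assumes the third-party kafka-python package is installed and the broker at emr-header-1:9092 is reachable; only send_records() touches the network, so call it yourself once those assumptions hold.

```python
# Sketch only: assumes `pip install kafka-python` and a broker
# reachable at emr-header-1:9092 (the EMR header host in this topic).

def build_records(n=20):
    """Build n test records; this topic recommends writing more than 10."""
    return [f"testkafka-record-{i}".encode("utf-8") for i in range(n)]

def send_records():
    """Send the records to the testkafka topic (requires a live broker)."""
    from kafka import KafkaProducer  # third-party package (assumption)
    producer = KafkaProducer(bootstrap_servers="emr-header-1:9092")
    for record in build_records():
        producer.send("testkafka", value=record)
    producer.flush()  # block until all records are acknowledged

# Call send_records() against a running cluster, then verify with
# kafka-console-consumer.sh as shown above.
```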

Step 2: Create a target table in DataWorks

Create a target table in DataWorks to store the data from Kafka.

  1. Go to the Data Development page.

    1. Log on to the DataWorks console.

    2. In the left-side navigation pane, choose Data Development And O&M > Data Development.

    3. Select the corresponding workspace from the drop-down list, and then click Go To Data Studio.

  2. Right-click the required workflow and choose Create Table > MaxCompute > Table.

  3. In the Create Table dialog box, enter a table name and click Create.

    Note
    • A table name must start with a letter and cannot contain Chinese characters or special characters.

    • If you attach multiple MaxCompute data sources to Data Studio, select the appropriate MaxCompute Engine Instance.

  4. On the table editing page, click DDL Statement.

  5. In the DDL dialog box, enter the following table definition statement and click Generate Table Schema.

    CREATE TABLE testkafka 
    (
     key             string,
     value           string,
     partition1      string,
     timestamp1      string,
     offset          string,
     t123            string,
     event_id        string,
     tag             string
    ) ;

    Each column corresponds to a default column in the Kafka reader of DataWorks Data Integration:

    • __key__: The message key.

    • __value__: The complete message content.

    • __partition__: The partition where the current message resides.

    • __headers__: The headers of the current message.

    • __offset__: The offset of the current message.

    • __timestamp__: The timestamp of the current message.

    You can also specify a custom name. For more information, see Kafka Reader.
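The way a reader column list resolves against a message can be sketched as follows. This is a toy resolver, not the actual Data Integration implementation: built-in __xxx__ names come from the message envelope, single-quoted entries are constants (like '123' in the sample script in Step 3), and any other name is assumed, for illustration, to reference a field of the parsed message body.

```python
# Toy resolver for a Kafka reader column specification (a sketch).

def resolve_columns(column_spec, message):
    row = []
    for col in column_spec:
        if col.startswith("__") and col.endswith("__"):
            row.append(message[col])              # e.g. __key__, __offset__
        elif col.startswith("'") and col.endswith("'"):
            row.append(col.strip("'"))            # constant column, e.g. '123'
        else:
            row.append(message["body"].get(col))  # field of the message body
    return row

# Hypothetical message for illustration.
message = {
    "__key__": "k1", "__value__": '{"event_id": "e1"}',
    "__partition__": 3, "__offset__": 42, "__timestamp__": 1700000000,
    "body": {"event_id": "e1", "tag.desc": "demo"},
}
spec = ["__key__", "__value__", "__partition__", "__offset__", "'123'", "event_id"]
print(resolve_columns(spec, message))
# ['k1', '{"event_id": "e1"}', 3, 42, '123', 'e1']
```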

  6. Click Commit to Production Environment and confirm the operation.

Step 3: Synchronize data

  1. Create an exclusive resource group for Data Integration.

    The Kafka reader cannot run as expected on the shared resource group for Data Integration. You must use an exclusive resource group for Data Integration to synchronize data. For more information, see Use exclusive resource groups for Data Integration.

  2. Create a data integration node.

    1. Go to the Data Studio page, right-click the required workflow, and choose Create Node > Data Integration > Offline Sync.

    2. In the Create Node dialog box, enter the node's Name, then click Confirm.

  3. In the top toolbar, click the Conversion Script icon to switch to script mode.

  4. In script mode, click the icon in the toolbar.

  5. Configure the script. The sample code is as follows:

    {
        "type": "job",
        "steps": [
            {
                "stepType": "kafka",
                "parameter": {
                    "server": "47.xxx.xxx.xxx:9092",
                    "kafkaConfig": {
                        "group.id": "console-consumer-83505"
                    },
                    "valueType": "ByteArray",
                    "column": [
                        "__key__",
                        "__value__",
                        "__partition__",
                        "__timestamp__",
                        "__offset__",
                        "'123'",
                        "event_id",
                        "tag.desc"
                    ],
                    "topic": "testkafka",
                    "keyType": "ByteArray",
                    "waitTime": "10",
                    "beginOffset": "0",
                    "endOffset": "3"
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "stepType": "odps",
                "parameter": {
                    "partition": "",
                    "truncate": true,
                    "compress": false,
                    "datasource": "odps_source",// MaxCompute data source name
                    "column": [
                        "key",
                        "value",
                        "partition1",
                        "timestamp1",
                        "offset",
                        "t123",
                        "event_id",
                        "tag"
                    ],
                    "emptyAsNull": false,
                    "table": "testkafka"
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "version": "2.0",
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        },
        "setting": {
            "errorLimit": {
                "record": ""
            },
            "speed": {
                "throttle": false,
                "concurrent": 1
            }
        }
    }
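One easy mistake in the script above is a mismatch between the reader and writer column lists, which must align one-to-one. The following sketch checks this; it assumes the job has already been loaded as a Python dict (for example with json.loads, after removing the inline // comment, which is not valid JSON).

```python
# Sanity check (sketch): reader and writer must declare the
# same number of columns, in matching order.

def check_column_counts(job):
    """Return the shared column count, or raise if the lists differ in length."""
    steps = {s["category"]: s for s in job["steps"]}
    reader_cols = steps["reader"]["parameter"]["column"]
    writer_cols = steps["writer"]["parameter"]["column"]
    if len(reader_cols) != len(writer_cols):
        raise ValueError(
            f"reader has {len(reader_cols)} columns, "
            f"writer has {len(writer_cols)}"
        )
    return len(reader_cols)

# Abbreviated job dict for illustration.
job = {
    "steps": [
        {"category": "reader",
         "parameter": {"column": ["__key__", "__value__", "__offset__"]}},
        {"category": "writer",
         "parameter": {"column": ["key", "value", "offset"]}},
    ]
}
print(check_column_counts(job))  # 3
```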

    You can run the kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list command on the header host of the EMR Kafka cluster to view the consumer group names and the value of the group.id parameter.

    • Command example

      kafka-consumer-groups.sh  --bootstrap-server emr-header-1:9092  --list
    • Result

      _emr-client-metrics-handler-group
      console-consumer-69493
      console-consumer-83505
      console-consumer-21030
      console-consumer-45322
      console-consumer-14773

    For example, for console-consumer-83505, you can run the kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505 command on a header host to confirm the values of the beginOffset and endOffset parameters.

    • Command example

      kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505
    • Result

      TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
      testkafka                      6          0               0               0          -                                                 -                              -
      test                           6          3               3               0          -                                                 -                              -
      testkafka                      0          0               0               0          -                                                 -                              -
      testkafka                      1          1               1               0          -                                                 -                              -
      testkafka                      5          0               0               0          -                                                 -                              -
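The describe output above can also be parsed programmatically to choose beginOffset and endOffset values per partition. The following plain-Python sketch assumes the column layout shown in the sample result (TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, ...).

```python
# Sketch: extract per-partition offsets for one topic from the
# output of `kafka-consumer-groups.sh --describe`.

def parse_describe(text, topic):
    """Return {partition: (current_offset, log_end_offset)} for one topic."""
    offsets = {}
    for line in text.strip().splitlines()[1:]:  # skip the header row
        fields = line.split()
        if fields[0] != topic:
            continue  # e.g. rows for other topics such as "test"
        partition = int(fields[1])
        current, log_end = int(fields[2]), int(fields[3])
        offsets[partition] = (current, log_end)
    return offsets

sample = """\
TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
testkafka  6          0               0               0    -            -     -
test       6          3               3               0    -            -     -
testkafka  1          1               1               0    -            -     -
"""
print(parse_describe(sample, "testkafka"))  # {6: (0, 0), 1: (1, 1)}
```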
  6. Configure the scheduling resource group.

    1. On the node editing page, in the right-side navigation bar, click the Scheduling Configuration button.

    2. In the Resource Properties section, for Schedule Resource Group, select the exclusive resource group for Data Integration that you created.

      Note

      To write Kafka data to MaxCompute at regular intervals, such as every hour, you can use the beginDateTime and endDateTime parameters to set the data read interval to 1 hour. Then, you can schedule the data integration task to run once per hour. For more information, see Kafka Reader.
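For the hourly pattern described in this note, beginDateTime and endDateTime take yyyymmddhhmmss strings (see the Kafka Reader documentation for details). The following sketch computes a one-hour window in that format; wiring the result into DataWorks scheduling parameters is not shown and is left as an assumption.

```python
from datetime import datetime, timedelta

def hourly_window(run_time):
    """Return (beginDateTime, endDateTime) covering the hour before
    run_time, formatted as yyyymmddhhmmss strings for the Kafka reader."""
    end = run_time.replace(minute=0, second=0, microsecond=0)
    begin = end - timedelta(hours=1)
    fmt = "%Y%m%d%H%M%S"
    return begin.strftime(fmt), end.strftime(fmt)

print(hourly_window(datetime(2026, 2, 24, 9, 30)))
# ('20260224080000', '20260224090000')
```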

  7. Click the Run icon to run the code.

  8. View the results in the operational log.

What to do next

You can create a task in Data Studio to run an SQL statement and check whether the Kafka data is synchronized to the table. This topic uses the select * from testkafka statement as an example. The following steps describe how to create the task:

  1. Go to the Data Development page.

    1. Log on to the DataWorks console.

    2. In the left navigation pane, click Workspace to go to the Workspaces page.

    3. In the top navigation bar, switch to the destination region, locate the created workspace, and in the Actions column, click Shortcuts > Data Development to go to the Data Studio page.

  2. In the navigation pane on the left of the Data Studio page, click the Ad Hoc Query icon to go to the Ad Hoc Query pane. In the Ad Hoc Query pane, click the Create icon and choose Create > ODPS SQL.

  3. In the Create Node dialog box, configure the Path and Name parameters.

  4. Click Confirm.

  5. On the node configuration page, enter the select * from testkafka statement, click the Run icon to run the node, and then view the operational log.
