MaxCompute: Migrate data from Kafka to MaxCompute

Last Updated: Oct 30, 2023

This topic describes how to use DataWorks Data Integration to migrate data from a Kafka cluster to MaxCompute.

Prerequisites

  • The MaxCompute and DataWorks services are activated. For more information, see Activate MaxCompute and DataWorks.

  • A workflow is created in your workspace in the DataWorks console. In this topic, a workflow is created in a workspace that is in basic mode. For more information, see Create a workflow.

  • A Kafka cluster is created.

    Before you migrate data from the Kafka cluster to MaxCompute, make sure that the Kafka cluster can work as expected. In this example, Alibaba Cloud E-MapReduce (EMR) is used to create a Kafka cluster. For more information, see Overview.

    In this topic, the following configurations are used for the EMR Kafka cluster:

    • EMR version: V3.12.1

    • Cluster type: Kafka

    • Software: Ganglia 3.7.2, ZooKeeper 3.4.12, Kafka 2.11-1.0.1, and Kafka Manager 1.3.3.16

    The EMR Kafka cluster is deployed in a virtual private cloud (VPC) in the China (Hangzhou) region. The Elastic Compute Service (ECS) instances in the master node group of the EMR Kafka cluster are configured with public and private IP addresses.

Background information

Kafka is distributed messaging middleware that is used to publish and subscribe to messages. It is widely used because of its high performance and high throughput and can process millions of messages per second. Kafka is suitable for processing streaming data and is mainly used in scenarios such as user behavior tracking and log collection.

A typical Kafka cluster contains several producers, brokers, consumers, and a ZooKeeper cluster. A Kafka cluster uses ZooKeeper to manage configurations and coordinate services in the cluster.

A topic is the most commonly used collection of messages in a Kafka cluster and is a logical concept for message storage. Topics are not stored on physical disks. Instead, the messages in each topic are stored on the disks of the cluster nodes by partition. Multiple producers can publish messages to a topic, and multiple consumers can subscribe to messages in a topic.

When a message is stored in a partition, the system assigns an offset to the message. The offset is the unique ID of the message within the partition. The offsets of the messages in each partition start from 0.
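
For example, you can view the offset range of each partition in a topic by using the GetOffsetShell tool that ships with Kafka. The following command is a minimal sketch that assumes a broker at emr-header-1:9092 and a topic named testkafka, which are the values used later in this topic. The --time -1 option returns the latest offset of each partition, and --time -2 returns the earliest offset.

    # Print the latest offset of each partition in the topic, one line per partition.
    kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list emr-header-1:9092 --topic testkafka --time -1

Each line of the output is in the topic:partition:offset format.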

Step 1: Prepare test data in the EMR Kafka cluster

You must prepare test data in the EMR Kafka cluster. To ensure that you can log on to the master node of the EMR Kafka cluster and that MaxCompute and DataWorks can communicate with the master node, configure a security group rule for the master node of the EMR Kafka cluster to allow inbound requests on TCP ports 22 and 9092.
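
After the security group rule takes effect, you can optionally run a quick connectivity check from a machine that needs to reach the cluster. The following command is a sketch that assumes the nc (netcat) tool is available; <master-public-ip> is a placeholder for the public IP address of the master node.

    # Check whether TCP port 9092 on the master node is reachable.
    nc -vz <master-public-ip> 9092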

  1. Log on to the master node of the EMR Kafka cluster.

    1. Log on to the EMR console.

    2. In the left-side navigation pane, click EMR on ECS.

    3. On the EMR on ECS page, find the cluster in which you want to prepare test data and click the name of the cluster in the Cluster ID/Name column.

    4. On the details page of the cluster, click the Nodes tab. On the Nodes tab, find the IP address of the master node and use the IP address to remotely log on to the master node in SSH mode.
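
      For example, a typical logon command looks like the following sketch. It assumes that you log on as the root user and that <master-public-ip> is a placeholder for the IP address that you find on the Nodes tab.

      # Log on to the master node over SSH.
      ssh root@<master-public-ip>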

  2. Create a test topic.

    Run the following command to create a test topic named testkafka:

    kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --partitions 10 --replication-factor 3 --topic testkafka  --create
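
    To verify that the topic was created with the expected partitions and replicas, you can optionally describe it. The following command is a sketch that reuses the ZooKeeper address from the preceding command.

    # Show the partition count, replication factor, and leader of each partition.
    kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --describe --topic testkafka
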
  3. Write test data to the test topic.

    Run the following command to simulate a producer that writes data to the testkafka topic. Kafka is used to process streaming data, so you can continuously write data to the topic in the EMR Kafka cluster. To ensure that the test results are valid, we recommend that you write more than 10 data records to the topic.

    kafka-console-producer.sh --broker-list emr-header-1:9092 --topic testkafka

    To check whether the data is written to the topic in the EMR Kafka cluster, open another SSH window and run the following command to simulate a consumer. If the written data appears in the output, the data is written to the topic.

    kafka-console-consumer.sh --bootstrap-server emr-header-1:9092 --topic testkafka --from-beginning

Step 2: Create a table in DataWorks

Create a table in DataWorks to receive data from the EMR Kafka cluster.

  1. Go to the DataStudio page.

    1. Log on to the DataWorks console.

    2. In the left-side navigation pane, click Workspaces.

    3. On the Workspaces page, find the desired workspace and choose Shortcuts > Data Development in the Actions column.

  2. Right-click the name of the workflow and choose Create > MaxCompute > Table.

  3. In the Create Table dialog box, enter the name of the table that you want to create in the Name field and click Create.

    Note
    • The table name must start with a letter and cannot contain Chinese characters or special characters.

    • If multiple MaxCompute compute engine instances are associated with the current workspace, you must select one from the Engine Instance drop-down list.

  4. On the table editing page, click DDL.

  5. In the DDL dialog box, enter the following statement and click Generate Table Schema:

    CREATE TABLE testkafka 
    (
     key             string,
     value           string,
     partition1      string,
     timestamp1      string,
     offset          string,
     t123            string,
     event_id        string,
     tag             string
    ) ;

    Each column in the table that is created by using the preceding statement corresponds to a column from which Kafka Reader reads data.

    • __key__: the key of a Kafka message

    • __value__: the complete content of a Kafka message

    • __partition__: the partition to which a Kafka message belongs

    • __headers__: the header of a Kafka message

    • __offset__: the offset of a Kafka message

    • __timestamp__: the timestamp of a Kafka message.

    You can specify a custom column. For more information, see Kafka Reader.

  6. Click Commit to Production Environment and confirm the operation.

Step 3: Synchronize the data

  1. Create an exclusive resource group for Data Integration.

    Kafka Reader cannot run as expected on the default resource group of DataWorks. You must use an exclusive resource group for Data Integration to synchronize the data. For more information about how to create an exclusive resource group for Data Integration, see Create and use an exclusive resource group for Data Integration.

  2. Create a data synchronization node.

    1. Go to the DataStudio page. In the left-side navigation pane of the DataStudio page, click Scheduled Workflow, right-click the name of the desired workflow, and then choose Create Node > Data Integration > Offline synchronization.

    2. In the Create Node dialog box, configure the Name parameter and click Confirm.

  3. In the top navigation bar, click the Conversion script icon to switch to script mode.

  4. In script mode, click the ** icon in the toolbar.

  5. Configure the script. The following sample code provides an example of a script.

    {
        "type": "job",
        "steps": [
            {
                "stepType": "kafka",
                "parameter": {
                    "server": "47.xxx.xxx.xxx:9092",
                    "kafkaConfig": {
                        "group.id": "console-consumer-83505"
                    },
                    "valueType": "ByteArray",
                    "column": [
                        "__key__",
                        "__value__",
                        "__partition__",
                        "__timestamp__",
                        "__offset__",
                        "'123'",
                        "event_id",
                        "tag.desc"
                    ],
                    "topic": "testkafka",
                    "keyType": "ByteArray",
                    "waitTime": "10",
                    "beginOffset": "0",
                    "endOffset": "3"
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "stepType": "odps",
                "parameter": {
                    "partition": "",
                    "truncate": true,
                    "compress": false,
                    "datasource": "odps_first",
                    "column": [
                        "key",
                        "value",
                        "partition1",
                        "timestamp1",
                        "offset",
                        "t123",
                        "event_id",
                        "tag"
                    ],
                    "emptyAsNull": false,
                    "table": "testkafka"
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "version": "2.0",
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        },
        "setting": {
            "errorLimit": {
                "record": ""
            },
            "speed": {
                "throttle": false,
                "concurrent": 1
            }
        }
    }

    To view the names of the consumer groups, which you can use as the value of the group.id parameter, run the kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list command on the master node of the EMR Kafka cluster.

    • Sample statement:

      kafka-consumer-groups.sh  --bootstrap-server emr-header-1:9092  --list
    • Return value

      _emr-client-metrics-handler-group
      console-consumer-69493
      console-consumer-83505
      console-consumer-21030
      console-consumer-45322
      console-consumer-14773

    This example uses console-consumer-83505 to demonstrate how to obtain the values of the beginOffset and endOffset parameters by running the kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505 command.

    • Sample statement:

      kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505
    • Return value

      TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
      testkafka                      6          0               0               0          -                                                 -                              -
      test                           6          3               3               0          -                                                 -                              -
      testkafka                      0          0               0               0          -                                                 -                              -
      testkafka                      1          1               1               0          -                                                 -                              -
      testkafka                      5          0               0               0          -                                                 -                              -
  6. Configure a resource group for scheduling.

    1. In the right-side navigation pane of the node configuration tab, click the Properties tab.

    2. In the Resource Group section of the Properties tab, set the Resource Group parameter to the exclusive resource group for Data Integration that you created.

      Note

      If you want to write Kafka data to MaxCompute at a regular interval, such as on an hourly basis, you can use the beginDateTime and endDateTime parameters to set the interval for data reading to 1 hour. Then, the node is scheduled to run once per hour. For more information, see Kafka Reader.

  7. Click the ** icon to run the code.

  8. View the run results in the operational logs.

What to do next

You can create a data development node and run SQL statements to check whether the data has been synchronized from the Kafka cluster to the table. In this example, the select * from testkafka statement is used. Perform the following steps:

  1. In the left-side navigation pane of the DataStudio page, click Scheduled Workflow.
  2. Right-click the name of the desired workflow and choose Create Node > MaxCompute > ODPS SQL.
  3. In the Create Node dialog box, enter the node name, and then click Submit.
  4. On the page of the created node, enter select * from testkafka and then click the Run icon.
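
If you prefer to run the query outside DataWorks, you can also use the MaxCompute client (odpscmd). The following command is an optional sketch that assumes the client is installed and configured to connect to the project that contains the testkafka table.

    # Run the query from the MaxCompute client and print the result to the terminal.
    odpscmd -e "select * from testkafka;"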