This topic describes how to use DataWorks Data Integration to migrate data from a Kafka cluster to MaxCompute.

Prerequisites

  • MaxCompute is activated. For more information, see Activate MaxCompute.
  • DataWorks is activated.
  • A workflow is created in DataWorks. In this example, a DataWorks workspace in basic mode is used. For more information, see Create a workflow.
  • A Kafka cluster is created.

    Before data migration, make sure that your Kafka cluster works as expected. In this topic, Alibaba Cloud E-MapReduce is used to automatically create a Kafka cluster. For more information, see Kafka Quick Start.

    This topic uses the following version of E-MapReduce Kafka:
    • E-MapReduce version: EMR-3.12.1
    • Cluster type: Kafka
    • Software: Ganglia 3.7.2, ZooKeeper 3.4.12, Kafka 2.11-1.0.1, and Kafka Manager 1.3.3.16

    The Kafka cluster is deployed in a virtual private cloud (VPC) in the China (Hangzhou) region. The Elastic Compute Service (ECS) instances in the master instance group are configured with public and private IP addresses.

Background information

Kafka is distributed middleware that is used to publish and subscribe to messages. Kafka is widely used because of its high performance and high throughput, and it can process millions of messages per second. Kafka is suitable for streaming data processing and is used in scenarios such as user behavior tracking and log collection.

A typical Kafka cluster contains several producers, brokers, consumers, and a ZooKeeper cluster. A Kafka cluster uses ZooKeeper to manage configurations and coordinate services in the cluster.

A topic is the most commonly used collection of messages in a Kafka cluster, and is a logical concept for message storage. Topics are not stored on physical disks. Instead, messages in each topic are stored on the disks of each node by partition. Multiple producers can publish messages to a topic, and multiple consumers can subscribe to messages in a topic.

When a message is stored in a partition, the message is assigned an offset, which is the unique ID of the message in the partition. The offsets of messages in each partition start from 0.
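
For example, you can use the console consumer that is shipped with Kafka to read messages from a specific partition starting at a specific offset. The following command is only an illustrative sketch: <broker-host> and <topic-name> are placeholders that you must replace with your own broker address and topic name.
    # Read messages from partition 0 of the topic, starting at offset 5.
    # <broker-host> and <topic-name> are placeholders for your own broker address and topic.
    kafka-console-consumer.sh --bootstrap-server <broker-host>:9092 --topic <topic-name> --partition 0 --offset 5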

Step 1: Prepare Kafka test data

You must prepare test data in the Kafka cluster. To make sure that you can log on to the header node of the E-MapReduce cluster and that MaxCompute and DataWorks can communicate with the header node, configure a security group rule for the header node to allow requests on TCP ports 22 and 9092.
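
Optionally, you can check that the two ports are reachable from your client before you continue. The following commands are only a minimal sketch: they assume that the telnet and nc utilities are installed on your client and that 47.xxx.xxx.xxx is the public IP address of the header node, which is also the address that you later specify in the server parameter in Step 3.
    # Check that SSH (TCP port 22) and the Kafka broker (TCP port 9092) are reachable.
    # Replace 47.xxx.xxx.xxx with the public IP address of your header node.
    telnet 47.xxx.xxx.xxx 22
    nc -vz 47.xxx.xxx.xxx 9092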

  1. Log on to the header node of the E-MapReduce cluster.
    Log on to the E-MapReduce console. Go to the Instances page under Cluster Management. Find the IP address of the header node of the E-MapReduce cluster and remotely log on to the header node by using Secure Shell (SSH).
  2. Create a test topic.
    Run the following command to create a test topic named testkafka:
    [root@emr-header-1 ~]# kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --partitions 10 --replication-factor 3 --topic testkafka  --create
    Created topic "testkafka".
  3. Write test data to the topic.
    Run the following command to simulate a producer to write test data to the testkafka topic. Kafka is used to process streaming data. Therefore, you can continuously write data to the topic. To make sure that test results are valid, we recommend that you write more than 10 records to the topic.
    [root@emr-header-1 ~]# kafka-console-producer.sh --broker-list emr-header-1:9092 --topic testkafka
    >123
    >abc
    >
    To check whether the data is written to the topic, open another SSH window and run the following command to simulate a consumer. If the data that you wrote appears, the data is written to the topic.
    [root@emr-header-1 ~]# kafka-console-consumer.sh --bootstrap-server emr-header-1:9092 --topic testkafka --from-beginning
    123
    abc
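    You can also check the number of messages in each partition of the testkafka topic. This helps you determine the beginOffset and endOffset values that are used in Step 3. The following command is a sketch that uses the GetOffsetShell tool shipped with Kafka; each line of the output is in the topic:partition:latest-offset format.
    # List the latest offset of each partition of the testkafka topic.
    [root@emr-header-1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list emr-header-1:9092 --topic testkafka --time -1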

Step 2: Create a destination table in a DataWorks workspace

Create a destination table in a DataWorks workspace to receive data from Kafka.

  1. Go to the DataStudio page.
    1. Log on to the DataWorks console.
    2. In the left-side navigation pane, click Workspaces.
    3. In the top navigation bar, select the region where the target workspace resides. Find the target workspace and click Data Analytics in the Actions column.
  2. Right-click the workflow and choose Create > MaxCompute > Table.
  3. On the Create Table page, select an engine type and enter a table name.
  4. In the DDL Statement dialog box, enter the following table creation statement and click Generate Table Schema:
    CREATE TABLE testkafka 
    (
     key             string,
     value           string,
     partition1      string,
     timestamp1      string,
     offset          string,
     t123            string,
     event_id        string,
     tag             string
    ) ;
    Each column maps to a default column of Kafka Reader provided by DataWorks Data Integration:
    • __key__: the key of the message.
    • __value__: the complete content of the message.
    • __partition__: the partition where the message resides.
    • __headers__: the header of the message.
    • __offset__: the offset of the message.
    • __timestamp__: the timestamp of the message.

    You can also use custom names. For more information, see Configure Kafka Reader.

  5. Click Commit to Production Environment and confirm the operation.

Step 3: Synchronize the data

  1. Create an exclusive resource group for Data Integration.

    The Kafka plug-in cannot run as expected on the default resource group of DataWorks. You must create an exclusive resource group for Data Integration to synchronize the data. For more information, see Use exclusive resource groups for data integration.

  2. In the top navigation bar, click the Conversion Script icon to switch to the script mode.
  3. In the script mode, click the Import Template icon in the toolbar.
  4. Configure the script. In this example, enter the following code:
    {
        "type": "job",
        "steps": [
            {
                "stepType": "kafka",
                "parameter": {
                    "server": "47.xxx.xxx.xxx:9092",
                    "kafkaConfig": {
                        "group.id": "console-consumer-83505"
                    },
                    "valueType": "ByteArray",
                    "column": [
                        "__key__",
                        "__value__",
                        "__partition__",
                        "__timestamp__",
                        "__offset__",
                        "'123'",
                        "event_id",
                        "tag.desc"
                    ],
                    "topic": "testkafka",
                    "keyType": "ByteArray",
                    "waitTime": "10",
                    "beginOffset": "0",
                    "endOffset": "3"
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "stepType": "odps",
                "parameter": {
                    "partition": "",
                    "truncate": true,
                    "compress": false,
                    "datasource": "odps_first",
                    "column": [
                        "key",
                        "value",
                        "partition1",
                        "timestamp1",
                        "offset",
                        "t123",
                        "event_id",
                        "tag"
                    ],
                    "emptyAsNull": false,
                    "table": "testkafka"
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "version": "2.0",
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        },
        "setting": {
            "errorLimit": {
                "record": ""
            },
            "speed": {
                "throttle": false,
                "concurrent": 1,
            }
        }
    }
    To view the values of the group.id parameter and the names of consumer groups, run the kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list command on the header node.
    [root@emr-header-1 ~]#  kafka-consumer-groups.sh  --bootstrap-server emr-header-1:9092  --list
    Note: This will not show information about old Zookeeper-based consumers.
    
    _emr-client-metrics-handler-group
    console-consumer-69493
    console-consumer-83505
    console-consumer-21030
    console-consumer-45322
    console-consumer-14773
    In this example, console-consumer-83505 is used. Run the kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505 command on the header node to obtain the values of the beginOffset and endOffset parameters.
    [root@emr-header-1 ~]# kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505
    Note: This will not show information about old Zookeeper-based consumers.
    Consumer group 'console-consumer-83505' has no active members.
    TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
    testkafka                      6          0               0               0          -                                                 -                              -
    test                           6          3               3               0          -                                                 -                              -
    testkafka                      0          0               0               0          -                                                 -                              -
    testkafka                      1          1               1               0          -                                                 -                              -
    testkafka                      5          0               0               0          -                                                 -                              -
  5. Configure a resource group for scheduling.
    1. In the right-side navigation pane, click Properties.
    2. In the Resource Group section, set Resource Group to the exclusive resource group for Data Integration that you created.
      Note: If you need to write Kafka data to MaxCompute at a regular interval, for example, on an hourly basis, you can use the beginDateTime and endDateTime parameters to set the data read interval to 1 hour. The data synchronization node is then scheduled to run once every hour. For more information, see Configure Kafka Reader.
  6. Click the Run icon to run the code.
  7. You can view the results in the operational logs.

What to do next

You can create a data development node and run an SQL statement to check whether the data is synchronized from the Kafka cluster to the destination table. In this example, the select * from testkafka statement is used. Perform the following steps:

  1. In the left-side navigation pane, choose Data Development > Business Flow.
  2. Right-click the workflow and choose Create > MaxCompute > ODPS SQL.
  3. In the Create Node dialog box, enter the node name, and then click Submit.
  4. On the page of the created node, enter select * from testkafka and then click the Run icon.