This topic describes how to use DataWorks Data Integration to migrate data from a Kafka cluster to MaxCompute.

Prerequisites

  • MaxCompute is activated.
  • DataWorks is activated.
  • A workflow is created in the DataWorks console. In this example, a workflow is created in a DataWorks workspace in basic mode. For more information, see Create a workflow.
  • A Kafka cluster is created.

    Before you migrate data, make sure that your Kafka cluster works as expected. In this example, Alibaba Cloud E-MapReduce is used to automatically create a Kafka cluster. For more information, see Quick start.

    This topic uses the following configurations as an example:
    • E-MapReduce version: 3.12.1
    • Cluster type: Kafka
    • Software: Ganglia 3.7.2, ZooKeeper 3.4.12, Kafka 2.11-1.0.1, and Kafka Manager 1.3.3.16

    The Kafka cluster is deployed in a Virtual Private Cloud (VPC) in the China (Hangzhou) region. The Elastic Compute Service (ECS) instances in the master instance group of the Kafka cluster are configured with public and private IP addresses.

Background information

Kafka is distributed message-oriented middleware that is used to publish and subscribe to messages. With its high performance and high throughput, Kafka can process millions of messages per second. Kafka is suitable for streaming data processing and is mainly used to track user behavior data and collect logs.

A typical Kafka cluster contains several producers, brokers, and consumers, and a ZooKeeper cluster. The ZooKeeper cluster manages configurations and coordinates services for the Kafka cluster.

A topic is a logical collection of messages. A topic is not stored on physical disks as a whole. Instead, the messages in each topic are divided into partitions, which are stored on the disks of the cluster nodes. Multiple producers can publish messages to a topic, and multiple consumers can subscribe to messages in a topic.

When a message is stored to a partition, it is allocated an offset, which is the unique ID of the message in the partition. The offsets of messages in each partition start from 0.
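
For example, after the testkafka topic is created in Step 1, you can view how its partitions are distributed across the brokers by running the kafka-topics.sh tool on the header node. The following command is a sketch that assumes the ZooKeeper address of the E-MapReduce Kafka cluster that is used in this topic:

[root@emr-header-1 ~]# kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --describe --topic testkafka

The output lists each partition of the topic together with its leader broker, replicas, and in-sync replicas (ISR).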

Step 1: Prepare Kafka test data

You must prepare test data in the Kafka cluster, which is referred to as the E-MapReduce cluster in the following sections. To make sure that you can log on to the header node of the E-MapReduce cluster and that MaxCompute and DataWorks can communicate with the header node, configure a security group rule for the header node to allow requests on TCP ports 22 and 9092.
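
Optionally, you can check whether the two ports are reachable before you continue. The following sketch assumes that the nc (netcat) utility is available on your machine; replace the placeholder with the public IP address of the header node:

# Check whether the SSH port and the Kafka broker port of the header node are reachable.
nc -zv <public IP address of the header node> 22
nc -zv <public IP address of the header node> 9092

If a port is not reachable, check the security group rule of the header node.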

  1. Log on to the header node of the E-MapReduce cluster by using the IP address of the header node.
    Log on to the E-MapReduce console. In the top navigation bar, click Cluster Management. On the page that appears, find the target cluster and go to the details page of the cluster. Then, click Instances. Find the IP address of the header node of the E-MapReduce cluster and use the IP address to remotely log on to the header node through Secure Shell (SSH).
  2. Create a test topic.
    Run the following command to create a test topic named testkafka:
    [root@emr-header-1 ~]# kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --partitions 10 --replication-factor 3 --topic testkafka  --create
    Created topic "testkafka".
  3. Write test data to the test topic.
    Run the following command to simulate a producer that writes test data to the testkafka topic. Kafka is used to process streaming data, so you can write data to the topic continuously. To make sure that the test results are valid, we recommend that you write more than 10 records to the topic.
    [root@emr-header-1 ~]# kafka-console-producer.sh --broker-list emr-header-1:9092 --topic testkafka
    >123
    >abc
    >
    To check whether the data is written to the topic in real time, open another SSH session to the header node and run the following command. If the data that the producer wrote appears, the data has been written to the topic.
    [root@emr-header-1 ~]# kafka-console-consumer.sh --bootstrap-server emr-header-1:9092 --topic testkafka --from-beginning
    123
    abc
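    You can also check how many messages have been written to each partition. The following command is a sketch that uses the GetOffsetShell tool included with Kafka; the --time -1 option returns the latest offset of each partition, which you can use as a reference when you set the endOffset parameter in Step 3.
    [root@emr-header-1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list emr-header-1:9092 --topic testkafka --time -1
    Each line of the output is in the topic:partition:offset format.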

Step 2: Create a destination table in a DataWorks workspace

To create a destination table in a DataWorks workspace to receive data from the Kafka cluster, follow these steps:

  1. Go to the DataStudio page.
    1. Log on to the DataWorks console.
    2. In the left-side navigation pane, click Workspaces.
    3. In the top navigation bar, select the region where the target workspace resides. Find the target workspace and click Data Analytics in the Actions column.
  2. Right-click the workflow and choose Create > MaxCompute > Table.
  3. On the Create Table page, select the engine type and enter a table name.
  4. Click DDL Statement. In the dialog box that appears, enter the following statement and click Generate Table Schema:
    CREATE TABLE testkafka 
    (
     key             string,
     value           string,
     partition1      string,
     timestamp1      string,
     offset          string,
     t123            string,
     event_id        string,
     tag             string
    ) ;
    Each column in the CREATE TABLE statement corresponds to a default column of Kafka Reader provided by DataWorks Data Integration.
    • __key__: the key of the message.
    • __value__: the complete content of the message.
    • __partition__: the partition where the message resides.
    • __headers__: the header of the message.
    • __offset__: the offset of the message.
    • __timestamp__: the timestamp of the message.

    You can also customize a column. For more information, see Configure Kafka Reader.

  5. Click Commit to Production Environment and confirm the operation.
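    Optionally, you can confirm that the table exists in the production project from a command line. This sketch assumes that the MaxCompute client (odpscmd) is installed and configured for the project; it is not required for the remaining steps.
    # Optional check that is run from the MaxCompute client (odpscmd).
    odpscmd -e "desc testkafka;"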

Step 3: Synchronize the test data to MaxCompute

  1. Add a custom resource group.

    Kafka Reader cannot properly run on the default resource group of DataWorks. You need to add a custom resource group for data integration. For more information, see Add a custom resource group.

    To save resources, add the header node of the E-MapReduce cluster as a custom resource group. Then, wait until the status of the custom resource group changes to Available.

  2. Create an offline synchronization node.
    1. Go to the Data Analytics page. Right-click the workflow and choose Create > Data Integration > Offline synchronization.
    2. In the Create Node dialog box, enter a node name and click Submit.
  3. In the toolbar, click the Conversion Script icon to switch to the script mode.
  4. In the script mode, click the Import Template icon in the toolbar.
  5. Enter the script content. In this example, enter the following content:
    {
        "type": "job",
        "steps": [
            {
                "stepType": "kafka",
                "parameter": {
                    "server": "47.xxx.xxx.xxx:9092",
                    "kafkaConfig": {
                        "group.id": "console-consumer-83505"
                    },
                    "valueType": "ByteArray",
                    "column": [
                        "__key__",
                        "__value__",
                        "__partition__",
                        "__timestamp__",
                        "__offset__",
                        "'123'",
                        "event_id",
                        "tag.desc"
                    ],
                    "topic": "testkafka",
                    "keyType": "ByteArray",
                    "waitTime": "10",
                    "beginOffset": "0",
                    "endOffset": "3"
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "stepType": "odps",
                "parameter": {
                    "partition": "",
                    "truncate": true,
                    "compress": false,
                    "datasource": "odps_first",
                    "column": [
                        "key",
                        "value",
                        "partition1",
                        "timestamp1",
                        "offset",
                        "t123",
                        "event_id",
                        "tag"
                    ],
                    "emptyAsNull": false,
                    "table": "testkafka"
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "version": "2.0",
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        },
        "setting": {
            "errorLimit": {
                "record": ""
            },
            "speed": {
                "throttle": false,
                "concurrent": 1,
            }
        }
    }
    To view the consumer groups that are available for the group.id parameter, run the kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list command on the header node of the E-MapReduce cluster.
    [root@emr-header-1 ~]#  kafka-consumer-groups.sh  --bootstrap-server emr-header-1:9092  --list
    Note: This will not show information about old Zookeeper-based consumers.
    
    _emr-client-metrics-handler-group
    console-consumer-69493
    console-consumer-83505
    console-consumer-21030
    console-consumer-45322
    console-consumer-14773
    Assume that the consumer group is console-consumer-83505. Run the kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505 command on the header node to obtain the values of the beginOffset and endOffset parameters.
    [root@emr-header-1 ~]# kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505
    Note: This will not show information about old Zookeeper-based consumers.
    Consumer group 'console-consumer-83505' has no active members.
    TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
    testkafka                      6          0               0               0          -                                                 -                              -
    test                           6          3               3               0          -                                                 -                              -
    testkafka                      0          0               0               0          -                                                 -                              -
    testkafka                      1          1               1               0          -                                                 -                              -
    testkafka                      5          0               0               0          -                                                 -                              -
  6. Configure a resource group for scheduling.
    1. In the right-side navigation pane of the node configuration tab, click the Properties tab.
    2. In the Resource Group section, set Resource Group to your custom resource group.
      Note If you want to synchronize Kafka data to MaxCompute at a regular interval, for example, every hour, you can use the beginDateTime and endDateTime parameters to specify a 1-hour data range. Then, the synchronization node is scheduled to run once every hour. For more information, see Configure Kafka Reader.
  7. Click the Run icon to run the code.
  8. View the run results in the operation log.

What to do next

You can create a data analytics node and run SQL statements to check whether the data has been synchronized from the Kafka cluster to the destination table. In this example, the select * from testkafka statement is used. Perform the following steps:

  1. In the left-side navigation pane of the Data Analytics page, click Business Flow.
  2. Right-click the workflow and choose Create > Data Analytics > ODPS SQL.
  3. In the Create Node dialog box, enter the node name, and then click Submit.
  4. On the page of the created node, enter select * from testkafka and then click the Run icon.
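
If you prefer to check the result outside the DataWorks console, you can run the same query with the MaxCompute client (odpscmd). This is an optional sketch that assumes the client is installed and configured to connect to the MaxCompute project of your workspace:

    # Optional: query the destination table from the MaxCompute client (odpscmd).
    odpscmd -e "select * from testkafka;"

If the records that you wrote to the testkafka topic are returned, the data has been synchronized to MaxCompute.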