This topic describes how to use DataWorks Data Integration to migrate data from a Kafka cluster to MaxCompute.

Prerequisites

  • MaxCompute is activated. For more information, see Activate MaxCompute.
  • DataWorks is activated.
  • A workflow is created in DataWorks. In this example, a DataWorks workspace in basic mode is used. For more information, see Create a workflow.
  • A Kafka cluster is created.

    Before data migration, make sure that your Kafka cluster works as expected. In this example, Alibaba Cloud E-MapReduce (EMR) is used to automatically create a Kafka cluster. For more information, see Kafka quick start.

    In this example, the following version of EMR Kafka is used:
    • EMR version: V3.12.1
    • Cluster type: Kafka
    • Software: Ganglia 3.7.2, ZooKeeper 3.4.12, Kafka 2.11-1.0.1, and Kafka Manager 1.3.3.16

    The Kafka cluster is deployed in a virtual private cloud (VPC) in the China (Hangzhou) region. The Elastic Compute Service (ECS) instances in the primary instance group of the Kafka cluster are configured with public and private IP addresses.

Background information

Kafka is distributed middleware that is used to publish and subscribe to messages. Kafka is widely used because of its high performance and high throughput. Kafka can process millions of messages per second. Kafka is applicable to streaming data processing, and is used in scenarios such as user behavior tracing and log collection.

A typical Kafka cluster contains several producers, brokers, consumers, and a ZooKeeper cluster. A Kafka cluster uses ZooKeeper to manage configurations and coordinate services in the cluster.

A topic is the most commonly used collection of messages in a Kafka cluster, and is a logical concept for message storage. Topics are not stored on physical disks. Instead, messages in each topic are stored on the disks of each cluster node by partition. Multiple producers can publish messages to a topic, and multiple consumers can subscribe to messages in a topic.

When a message is stored to a partition, the message is allocated an offset. The offset is the unique ID of the message in the partition. The offsets of messages in each partition start from 0.
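
For example, you can use the GetOffsetShell tool that ships with Kafka to view the latest offset of each partition of a topic. The following command is only a sketch; it assumes a broker at emr-header-1:9092 and the testkafka topic that is created in Step 1, and the output format may vary with the Kafka version:

    kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list emr-header-1:9092 --topic testkafka --time -1

Each line of the output is in the topic:partition:latest offset format.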

Step 1: Prepare Kafka data

You must prepare test data in the Kafka cluster. Configure a security group rule for the header node of the EMR cluster to allow requests on TCP ports 22 and 9092. This way, you can log on to the header node over SSH, and MaxCompute and DataWorks can access the Kafka broker that runs on the header node.
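
After the security group rule takes effect, you can optionally verify connectivity from a machine outside the cluster. The following commands are a sketch and assume that 47.xxx.xxx.xxx is the public IP address of the header node, which is also the address that is used in the synchronization script later in this topic:

    nc -zv 47.xxx.xxx.xxx 22
    nc -zv 47.xxx.xxx.xxx 9092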

  1. Log on to the header node of the EMR cluster.
    1. Log on to the EMR console.
    2. In the top navigation bar, click Cluster Management.
    3. On the page that appears, find the cluster for which you want to prepare test data and go to the details page of the cluster.
    4. On the details page of the cluster, click Instances. Find the IP address of the header node of the E-MapReduce cluster and use the IP address to remotely log on to the header node by using Secure Shell (SSH).
  2. Create a test topic.
    Run the following command to create a test topic named testkafka:
    [root@emr-header-1 ~]# kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --partitions 10 --replication-factor 3 --topic testkafka  --create
    Created topic "testkafka".
  3. Write test data.
    Run the following command to simulate a producer that writes data to the testkafka topic. Because Kafka is used to process streaming data, you can continuously write data to the topic. To ensure valid test results, we recommend that you write more than 10 records.
    [root@emr-header-1 ~]# kafka-console-producer.sh --broker-list emr-header-1:9092 --topic testkafka
    >123
    >abc
    >
    To check whether the data is written to Kafka, open another SSH window and run the following command to simulate a consumer. If the data that you wrote appears in the output, the data has been written to the topic. You can also describe the topic to confirm its partition and replication settings, as shown in the example after these steps.
    [root@emr-header-1 ~]# kafka-console-consumer.sh --bootstrap-server emr-header-1:9092 --topic testkafka --from-beginning
    123
    abc
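
Optionally, you can describe the topic to confirm that it was created with the expected numbers of partitions and replicas. The following command is a sketch that reuses the ZooKeeper address from the command that creates the topic:

    [root@emr-header-1 ~]# kafka-topics.sh --zookeeper emr-header-1:2181/kafka-1.0.1 --describe --topic testkafka

The output lists the leader, replicas, and in-sync replicas (ISR) of each of the 10 partitions.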

Step 2: Create a destination table in DataWorks

Create a destination table in DataWorks to receive data from Kafka.

  1. Go to the DataStudio page.
    1. Log on to the DataWorks console.
    2. In the left-side navigation pane, click Workspaces.
    3. In the top navigation bar, select the region where the target workspace resides. Find the target workspace and click Data Analytics in the Actions column.
  2. Right-click the workflow and choose Create > MaxCompute > Table.
  3. On the Create Table page, select an engine type and enter a table name.
  4. Click DDL Statement. In the DDL Statement dialog box, enter the following CREATE TABLE statement and click Generate Table Schema:
    CREATE TABLE testkafka 
    (
     key             string,
     value           string,
     partition1      string,
     timestamp1      string,
     offset          string,
     t123            string,
     event_id        string,
     tag             string
    ) ;
    Most columns in the statement map to the default columns that are provided by Kafka Reader of DataWorks Data Integration:
    • __key__: the key of the message.
    • __value__: the complete content of the message.
    • __partition__: the partition where the message resides.
    • __headers__: the header of the message.
    • __offset__: the offset of the message.
    • __timestamp__: the timestamp of the message.

    The remaining t123, event_id, and tag columns receive customized columns. You can customize columns in Kafka Reader, as shown in the example after this list. For more information, see Kafka Reader.

  5. Click Commit to Production Environment and confirm the operation.
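
The following fragment is a sketch of the column setting that Kafka Reader uses in the synchronization script later in this topic. In addition to default columns, it reads a constant that is enclosed in single quotation marks ('123') and two fields (event_id and tag.desc) that are assumed to exist in message values in the JSON format:

    "column": [
        "__key__",
        "__value__",
        "__partition__",
        "__timestamp__",
        "__offset__",
        "'123'",
        "event_id",
        "tag.desc"
    ]

In this mapping, the constant '123' is written to the t123 column, and the event_id and tag.desc fields are written to the event_id and tag columns of the destination table.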

Step 3: Synchronize the data

  1. Create an exclusive resource group for Data Integration.

    The Kafka plug-in cannot run as expected on the default resource group of DataWorks. You must use an exclusive resource group for Data Integration to synchronize data. For more information, see Create and use an exclusive resource group for Data Integration.

  2. In the top toolbar, click the Switch to Code Editor icon.
  3. In the code editor, click the Import Template icon in the toolbar.
  4. Configure the script. In this example, enter the following code:
    {
        "type": "job",
        "steps": [
            {
                "stepType": "kafka",
                "parameter": {
                    "server": "47.xxx.xxx.xxx:9092",
                    "kafkaConfig": {
                        "group.id": "console-consumer-83505"
                    },
                    "valueType": "ByteArray",
                    "column": [
                        "__key__",
                        "__value__",
                        "__partition__",
                        "__timestamp__",
                        "__offset__",
                        "'123'",
                        "event_id",
                        "tag.desc"
                    ],
                    "topic": "testkafka",
                    "keyType": "ByteArray",
                    "waitTime": "10",
                    "beginOffset": "0",
                    "endOffset": "3"
                },
                "name": "Reader",
                "category": "reader"
            },
            {
                "stepType": "odps",
                "parameter": {
                    "partition": "",
                    "truncate": true,
                    "compress": false,
                    "datasource": "odps_first",
                    "column": [
                        "key",
                        "value",
                        "partition1",
                        "timestamp1",
                        "offset",
                        "t123",
                        "event_id",
                        "tag"
                    ],
                    "emptyAsNull": false,
                    "table": "testkafka"
                },
                "name": "Writer",
                "category": "writer"
            }
        ],
        "version": "2.0",
        "order": {
            "hops": [
                {
                    "from": "Reader",
                    "to": "Writer"
                }
            ]
        },
        "setting": {
            "errorLimit": {
                "record": ""
            },
            "speed": {
                "throttle": false,
                "concurrent": 1,
            }
        }
    }
    To view the names of consumer groups, which you can use as the value of the group.id parameter, run the kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --list command on the header node.
    [root@emr-header-1 ~]#  kafka-consumer-groups.sh  --bootstrap-server emr-header-1:9092  --list
    Note: This will not show information about old Zookeeper-based consumers.
    
    _emr-client-metrics-handler-group
    console-consumer-69493
    console-consumer-83505
    console-consumer-21030
    console-consumer-45322
    console-consumer-14773
    In this example, console-consumer-83505 is used. Run the kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505 command on the header node to obtain the values of the beginOffset and endOffset parameters.
    [root@emr-header-1 ~]# kafka-consumer-groups.sh --bootstrap-server emr-header-1:9092 --describe --group console-consumer-83505
    Note: This will not show information about old Zookeeper-based consumers.
    Consumer group 'console-consumer-83505' has no active members.
    TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
    testkafka                      6          0               0               0          -                                                 -                              -
    test                           6          3               3               0          -                                                 -                              -
    testkafka                      0          0               0               0          -                                                 -                              -
    testkafka                      1          1               1               0          -                                                 -                              -
    testkafka                      5          0               0               0          -                                                 -                              -
  5. Configure a resource group for scheduling.
    1. On the node configuration tab, click the Properties tab in the right-side navigation pane.
    2. In the Resource Group section, set the Resource Group parameter to the exclusive resource group for Data Integration that you have created.
      Note If you want to write Kafka data to MaxCompute at a regular interval, for example, on an hourly basis, you can use the beginDateTime and endDateTime parameters to set the time range of the data to be read to 1 hour and schedule the data integration node to run once per hour. For more information, see Kafka Reader. A sketch is provided after these steps.
  6. Click the Run icon to run the code.
  7. You can view the results in the run logs.
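
The following fragment is a minimal sketch of how the Kafka Reader parameters might look for hourly scheduling. It assumes that scheduling parameters named startTime and endTime are defined on the Properties tab in the yyyymmddhhmmss format, and it replaces the beginOffset and endOffset parameters with a time range. For the exact parameter names and formats, see Kafka Reader.

    "beginDateTime": "${startTime}",
    "endDateTime": "${endTime}"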

What to do next

You can create a data development node and run SQL statements to check whether the data is synchronized from the Kafka cluster to the destination table. This topic uses the select * from testkafka statement as an example; the full statements are provided after the following steps.

  1. In the left-side navigation pane, choose Data Development > Business Flow.
  2. Right-click the workflow and choose Create Data Development Node > ODPS SQL.
  3. In the Create Node dialog box, enter the node name, and then click Submit.
  4. On the configuration tab of the node, enter the select * from testkafka statement and click the Run icon.
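
The following statements are an example of the check. The count query is optional and can be used to compare the number of synchronized records with the number of records that you wrote to the topic:

    select * from testkafka;
    select count(*) from testkafka;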