You can create and run an extract, transform, load (ETL) task to obtain data from a source topic, process the data, and send the processed data to a destination topic in Message Queue for Apache Kafka. This topic describes how to manage ETL tasks in Message Queue for Apache Kafka.

Prerequisites

Before you run an ETL task, make sure that the following requirements are met:
  • A source topic and a destination topic are created in Message Queue for Apache Kafka. For more information, see Step 1: Create a topic.
    Note If you want to manually create the auxiliary topics that are required by the ETL task, you must create these topics in advance. For more information about the parameters of these topics, see Step 6 in the Create an ETL task section of this topic.
  • Function Compute is activated. For more information, see Activate Function Compute.
  • The related permissions are obtained if you are a RAM user. For more information, see Grant permissions to RAM users.

    The following code provides an example of the policy that specifies the permissions:

    {
        "Version": "1",
        "Statement": [
            {
                "Action": [
                    "kafka:CreateETLTask",
                    "kafka:ListETLTask",
                    "kafka:DeleteETLTask"
                ],
                "Resource": "*",
                "Effect": "Allow"
            }
        ]
    }

Background information

  • An ETL task extracts data from the source topic, transforms the data, and then loads the transformed data to the destination topic. You can write a function to implement the logic of the ETL task. Message Queue for Apache Kafka calls this function to complete the ETL process.
  • During data processing, Function Compute automatically creates a service and a function for the ETL task. The name of the service is in the _FC-kafka-{ETL task name} format.
  • The source topic and the destination topic in the Message Queue for Apache Kafka instances must reside in the same region.
  • Function Compute allows you to query the logs of function calls to troubleshoot issues. For more information, see Configure Log Service resources and view function execution logs.
  • The ETL task feature in Message Queue for Apache Kafka is in public preview. This feature is independent of Message Queue for Apache Kafka instances. Message Queue for Apache Kafka does not charge you for using this feature. Alibaba Cloud does not provide service level agreement (SLA) guarantee for the ETL task feature. For more information about the services that are related to ETL tasks and the billing rules of these services, see the documentation of these services.
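Conceptually, the task behaves like the following pure-Python sketch, in which lists stand in for the source and destination topics. This is only an illustration of the extract-transform-load flow; in practice, consumption and function invocation are managed by Message Queue for Apache Kafka and Function Compute:

```python
# Illustrative only: lists stand in for Kafka topics, and deal_message
# stands in for the user-defined function that runs in Function Compute.
def deal_message(message):
    # Example transform: uppercase the value. Returning None would
    # filter the message out instead of forwarding it.
    message['value'] = message['value'].upper()
    return message

source_topic = [{'key': 'k1', 'value': 'hello'}, {'key': 'k2', 'value': 'world'}]
destination_topic = []

for msg in source_topic:                  # extract: consume from the source topic
    processed = deal_message(dict(msg))   # transform: invoke the function
    if processed is not None:             # load: forward non-filtered messages
        destination_topic.append(processed)
```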

Grant the permissions to access services that are related to the ETL task feature

When you use the ETL task feature for the first time, you must authorize Message Queue for Apache Kafka to access related services. After you confirm the authorization, the system automatically creates the service-linked role AliyunServiceRoleForAlikafkaETL. This role grants Message Queue for Apache Kafka the permissions to access services that are related to the ETL task feature. For more information, see Service-linked roles.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. In the left-side navigation pane, click ETL Tasks.
  4. On the ETL Tasks page, click Create Task.
  5. In the Service Authorization message that appears, click OK.

Create an ETL task

Create and deploy an ETL task to extract data from a source topic, process the data, and then load the processed data to a destination topic.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. In the left-side navigation pane, click ETL Tasks.
  4. On the ETL Tasks page, click Create Task.
  5. In the Configure Basic Information step, enter the name of the ETL task and click Next.
  6. In the Configure Source and Destination step, configure the topics and advanced settings, and click Next.
    Parameter: Instance
    Description: The instances in which the source topic and the destination topic reside.
    Examples:
    • alikafka_pre-cn-7pp2bz47****
    • alikafka_post-cn-2r42bvbm****

    Parameter: Topic
    Description: The source topic and the destination topic.
    Note: The source topic and the destination topic must be different topics.
    Examples:
    • topic****
    • test****

    Parameter: Consumer Offset
    Description: The offset from which data consumption starts. Click Advanced Settings to display this parameter. Valid values:
    • Earliest Offset: Consumption starts from the earliest offset.
    • Latest Offset: Consumption starts from the latest offset.
    Example: Latest Offset

    Parameter: Failure Handling Policy
    Description: Specifies whether to continue to send the remaining data after a message fails to be sent. Click Advanced Settings to display this parameter. Valid values:
    • Continue Subscription: The remaining data continues to be sent after a message fails to be sent.
    • Stop Subscription: The remaining data is no longer sent after a message fails to be sent.
    Example: Continue Subscription

    Parameter: Resource Creation Method
    Description: The method that is used to create the auxiliary topics required by the ETL task. Click Advanced Settings to display this parameter. Valid values:
    • Auto
    • Manual
    Example: Auto

    Parameter: ETL Task Consumer Group
    Description: The consumer group that is used by the ETL task. This parameter is displayed only if you set the Resource Creation Method parameter to Manual. We recommend that you start the name of the group with etl-cluster.
    Example: etl-cluster-kafka

    Parameter: Task Offset Topic
    Description: The topic that is used to store consumer offsets. This parameter is displayed only if you set the Resource Creation Method parameter to Manual.
    • Name: the name of the topic. We recommend that you start the name with etl-offset.
    • Partitions: the number of partitions in the topic. The number must be greater than 1.
    • Storage Engine: the storage engine of the topic. Set the value to Local Storage.
    • Log Cleanup Policy: the log cleanup policy for the topic. Set the value to Compact.
    Example: etl-offset-kafka

    Parameter: Task Configuration Topic
    Description: The topic that is used to store task configurations. This parameter is displayed only if you set the Resource Creation Method parameter to Manual.
    • Name: the name of the topic. We recommend that you start the name with etl-config.
    • Partitions: the number of partitions in the topic. The number must be 1.
    • Storage Engine: the storage engine of the topic. Set the value to Local Storage.
    • Log Cleanup Policy: the log cleanup policy for the topic. Set the value to Compact.
    Example: etl-config-kafka

    Parameter: Task Status Topic
    Description: The topic that is used to store the task status. This parameter is displayed only if you set the Resource Creation Method parameter to Manual.
    • Name: the name of the topic. We recommend that you start the name with etl-status.
    • Partitions: the number of partitions in the topic. We recommend that you set the value to 6.
    • Storage Engine: the storage engine of the topic. Set the value to Local Storage.
    • Log Cleanup Policy: the log cleanup policy for the topic. Set the value to Compact.
    Example: etl-status-kafka

    Parameter: Dead-letter Queue Topic
    Description: The topic that is used to store abnormal data of the ETL task framework. This parameter is displayed only if you set the Resource Creation Method parameter to Manual. To save topic resources, you can use one topic as both the dead-letter queue topic and the error data topic.
    • Name: the name of the topic. We recommend that you start the name with etl-error.
    • Partitions: the number of partitions in the topic. We recommend that you set the value to 6.
    • Storage Engine: the storage engine of the topic. You can set the value to Local Storage or Cloud Storage.
    Example: etl-error-kafka

    Parameter: Error Data Topic
    Description: The topic that is used to store the error data of the connector. This parameter is displayed only if you set the Resource Creation Method parameter to Manual. To save topic resources, you can use one topic as both the dead-letter queue topic and the error data topic.
    • Name: the name of the topic. We recommend that you start the name with etl-error.
    • Partitions: the number of partitions in the topic. We recommend that you set the value to 6.
    • Storage Engine: the storage engine of the topic. You can set the value to Local Storage or Cloud Storage.
    Example: etl-error-kafka
  7. In the Configure Function step, configure the function and click Create.
    Before you click Create, you can click Test to test whether the function works as expected.
    Parameter: Programming Language
    Description: The language in which the function is written. Set the value to Python 3.
    Example: Python 3

    Parameter: Template
    Description: The function template that the system provides. After you select a function template, the system automatically populates the Code field with the code of the template.
    Example: Add Prefix/Suffix

    Parameter: Code
    Description: The code that is used to process messages. Message Queue for Apache Kafka provides function templates that you can use to cleanse and transform data. You can modify the code of the selected function template as required.
    Note
    • You can import only the Python modules that are supported.
    • The message in the code is in the dictionary format. You need to modify only the key and the value.
    • Return the processed message. Return None if the function is used to filter out messages.
    Example:
    def deal_message(message):
        for keyItem in message.keys():
            if (keyItem == 'key'):
                message[keyItem] = message[keyItem] + "KeySuffix"
                continue
            if (keyItem == 'value'):
                message[keyItem] = message[keyItem] + "ValueSuffix"
                continue
        return message

    Parameter: Message Key
    Description: The key of the message to be processed in the source topic. Click Test Code to display this parameter.
    Example: demo

    Parameter: Message Content
    Description: The value of the message to be processed in the source topic. Click Test Code to display this parameter.
    Example: {"key": "test"}
    After you create the ETL task, you can view the task on the ETL Tasks page. The system automatically deploys the task.
    Note If the created ETL task cannot run because Function Compute is not supported in the zone in which the destination instance resides, a message is displayed. To run this ETL task, submit a ticket to contact the technical support of Message Queue for Apache Kafka.
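As noted above, a function can also be used to filter messages by returning None. The console templates do not include a filter example, so the following is a hypothetical sketch of one, written in the same deal_message(message) convention; the keyword "order" is only an illustration:

```python
# Hypothetical filter-style function: keep only messages whose value
# contains the keyword "order" and drop everything else.
def deal_message(message):
    value = message.get('value', '')
    if 'order' in value:
        return message  # keep the message: it is sent to the destination topic
    return None         # filter the message out: nothing is sent downstream
```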

Send a test message

After the ETL task is deployed, you can send a test message to the source topic in Message Queue for Apache Kafka to check whether the data is processed by the function that you configured and then sent to the destination topic.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. In the left-side navigation pane, click ETL Tasks.
  4. On the ETL Tasks page, find the created ETL task and click Test in the Actions column.
  5. In the Send Message panel, enter the information of the test message and click OK to send the test message.
    • In the Message Key field, enter the key of the test message, such as demo.
    • In the Message Content field, enter the content of the test message, such as {"key": "test"}.
    • Set the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.
      • If you want to send the test message to a specific partition, click Partition ID and enter the partition ID, such as 0, in the Yes field. For more information about how to query partition IDs, see View partition status.
      • If you do not want to send the test message to a specific partition, click No.
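To predict what should arrive in the destination topic, you can run the function code locally against the same test input. The following sketch mirrors an Add Prefix/Suffix style template; the exact suffix strings are illustrative and depend on the template code that you deployed:

```python
# Local dry run of an Add Prefix/Suffix style function against the test
# message used in this section (Message Key "demo", Message Content
# {"key": "test"}). The suffix strings here are illustrative only.
def deal_message(message):
    for key_item in list(message.keys()):
        if key_item == 'key':
            message[key_item] = message[key_item] + "KeySuffix"
        elif key_item == 'value':
            message[key_item] = message[key_item] + "ValueSuffix"
    return message

test_message = {'key': 'demo', 'value': '{"key": "test"}'}
print(deal_message(test_message))
```

If the task runs correctly, the message that the destination topic receives should match this local result, with both the key and the value suffixed.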

View function logs

After you extract and process data in Message Queue for Apache Kafka, you can view the logs of the function to verify whether the destination topic receives the processed data. For more information, see Configure Log Service resources and view function execution logs.

Figure 1. View function logs

View the details of an ETL task

After you create the ETL task, you can view the details of the task in the Message Queue for Apache Kafka console.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. In the left-side navigation pane, click ETL Tasks.
  4. On the ETL Tasks page, find the created ETL task and click Details in the Actions column.
    On the Task Details page, view the details of the ETL task.

Delete an ETL task

If you no longer need to use an ETL task, you can delete the ETL task in the Message Queue for Apache Kafka console.

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. In the left-side navigation pane, click ETL Tasks.
  4. On the ETL Tasks page, find the ETL task that you want to delete and click Delete in the Actions column.
  5. In the Note message, click OK.