
Manage ETL tasks

Last Updated: Sep 28, 2023

You can create and run extract, transform, load (ETL) tasks to cleanse, transform, and dump data in ApsaraMQ for Kafka instances. This topic describes how to create an ETL task in the ApsaraMQ for Kafka console to deliver processed data in a source topic to a destination topic.

Prerequisites

Before you run an ETL task, make sure that the following operations are performed:

  • Create a source topic and a destination topic on ApsaraMQ for Kafka instances. For more information, see Step 1: Create a topic.

    Note

    If you want to manually create the auxiliary topics that are required by an ETL task, you must also create the topics that store the task information, such as the consumer offsets, task configurations, and task status. For information about the parameters that you must configure when you create these topics, see Step 3 in the "Create an ETL task" section of this topic.

  • Activate Function Compute. For more information, see Activate Function Compute.

  • Obtain the required permissions if you are a Resource Access Management (RAM) user. For more information, see Grant permissions to RAM users.

    The following code provides an example of the policy that specifies the permissions:

    {
        "Version": "1",
        "Statement": [
            {
                "Action": [
                    "kafka:CreateETLTask",
                    "kafka:ListETLTask",
                    "kafka:DeleteETLTask"
                ],
                "Resource": "*",
                "Effect": "Allow"
            }
        ]
    }

Background information

  • ETL is the process in which data is extracted, transformed, and loaded into a destination. You can write a function to implement the logic of the ETL task. ApsaraMQ for Kafka calls the function to process data in the source topic and send the data to the destination topic. For an illustrative sketch of such a function, see the example that follows this list.

  • During data processing, Function Compute automatically creates the corresponding service and function. The name of the created service is in the format of _FC-kafka-{ETL task name}, where {ETL task name} is the name that you specify for the ETL task.

  • The source topic and the destination topic on ApsaraMQ for Kafka instances must reside in the same region.

  • Function Compute allows you to query the logs of function calls to troubleshoot issues. For more information, see Configure the logging feature.

  • The ETL task feature of ApsaraMQ for Kafka is in public preview. This feature is independent of ApsaraMQ for Kafka instances. ApsaraMQ for Kafka does not charge you for this feature. If the ETL task that you create depends on other services, refer to the billing rules of the corresponding services.
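
As described above, the processing logic of an ETL task is a function that receives each message and returns the result. The following sketch illustrates the idea. It assumes, as described later in this topic, that the function receives each message as a dictionary that contains the 'key' and 'value' fields and that returning None filters the message out. The filtering rule and the transformation in this sketch are hypothetical examples, not one of the provided templates.

    def deal_message(message):
        # The message is a dictionary that contains the 'key' and 'value' fields.
        # Hypothetical filtering rule: drop messages whose value is empty.
        if not message.get('value'):
            return None
        # Hypothetical transformation: convert the message value to uppercase.
        message['value'] = message['value'].upper()
        # Return the processed message so that it is sent to the destination topic.
        return message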

Enable ETL

Important

ETL tasks are created in the Connector Ecosystem Integration module of the ApsaraMQ for Kafka console. This module provides data filtering and transformation capabilities. For more information, see Overview.

The first time you use the ETL task feature, you must authorize ApsaraMQ for Kafka to access related services. After you confirm the authorization, the system automatically creates the service-linked role AliyunServiceRoleForAlikafkaETL. ApsaraMQ for Kafka can assume the role to access the services that are used during the processing of an ETL task. For more information, see Service-linked roles.

  1. Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  2. In the left-side navigation pane, choose Ecosystem Integration > ETL Tasks.

  3. On the ETL Tasks page, click Create Task.

  4. In the Service Authorization message that appears, click OK.

Create an ETL task

This section describes how to create and deploy an ETL task to extract data from a source topic, process the data, and then load the processed data to a destination topic.

  1. On the ETL Tasks page, click Create Task.

  2. In the Configure Basic Information step, enter a task name and click Next.

  3. In the Configure Source and Destination step, specify the data source, destination topic, and consumption information. Then, click Next.

    The following list describes each parameter and provides an example value.

    Instance

    The instances to which the source topic and the destination topic belong.

    Examples: alikafka_pre-cn-7pp2bz47**** and alikafka_post-cn-2r42bvbm****

    Topic

    The source topic and the destination topic.

    Note

    The source topic and the destination topic must be different topics.

    Examples: topic**** and test****

    Consumer Offset

    The offset from which you want messages to be consumed. This parameter is displayed only after you click Advanced Settings. Valid values:

    • Earliest Offset: Consume messages from the earliest offset.

    • Latest Offset: Consume messages from the latest offset.

    Example: Latest Offset

    Failure Handling Policy

    Specifies whether to send subsequent messages if a message fails to be sent. This parameter is displayed only after you click Advanced Settings. Valid values:

    • Continue Subscription: Send subsequent messages if a message fails to be sent.

    • Stop Subscription: Do not send subsequent messages if a message fails to be sent.

    Example: Continue Subscription

    Resource Creation Method

    The method that is used to create auxiliary topics that are required by the ETL task. This parameter is displayed only after you click Advanced Settings. Valid values:

    • Auto

    • Manual

    Example: Auto

    Consumer Group

    The consumer group that is used by the ETL task. This parameter is displayed only if you set the Resource Creation Method parameter to Manual. We recommend that you use etl-cluster as the prefix of the consumer group name.

    Example: etl-cluster-kafka

    Task Offset Topic

    The topic that is used to store consumer offsets. This parameter is displayed only if you set the Resource Creation Method parameter to Manual.

    • Topic: the name of the topic. We recommend that you use etl-offset as the prefix of the topic name.

    • Partitions: the number of partitions in the topic. This parameter must be set to a value greater than 1.

    • Storage Engine: the storage engine that is used by the topic. This parameter must be set to Local Storage.

      Note

      You can set the Storage Engine parameter to Local Storage only if you create a topic on a Professional Edition instance.

    • cleanup.policy: the log cleanup policy that is used by the topic. This parameter must be set to Compact.

    Example: etl-offset-kafka

    Task Configuration Topic

    The topic that is used to store task configurations. This parameter is displayed only if you set the Resource Creation Method parameter to Manual.

    • Topic: the name of the topic. We recommend that you use etl-config as the prefix of the topic name.

    • Partitions: the number of partitions in the topic. This parameter must be set to 1.

    • Storage Engine: the storage engine that is used by the topic. This parameter must be set to Local Storage.

      Note

      You can set the Storage Engine parameter to Local Storage only if you create a topic on a Professional Edition instance.

    • cleanup.policy: the log cleanup policy that is used by the topic. This parameter must be set to Compact.

    Example: etl-config-kafka

    Task Status Topic

    The topic that is used to store the task status. This parameter is displayed only if you set the Resource Creation Method parameter to Manual.

    • Topic: the name of the topic. We recommend that you use etl-status as the prefix of the topic name.

    • Partitions: the number of partitions in the topic. We recommend that you set this parameter to 6.

    • Storage Engine: the storage engine that is used by the topic. This parameter must be set to Local Storage.

      Note

      You can set the Storage Engine parameter to Local Storage only if you create a topic on a Professional Edition instance.

    • cleanup.policy: the log cleanup policy that is used by the topic. This parameter must be set to Compact.

    Example: etl-status-kafka

    Dead-letter Queue Topic

    The topic that is used to store the error data of the ETL framework. This parameter is displayed only if you set the Resource Creation Method parameter to Manual. To save topic resources, this topic can be the same as the Error Data Topic.

    • Topic: the name of the topic. We recommend that you use etl-error as the prefix of the topic name.

    • Partitions: the number of partitions in the topic. We recommend that you set this parameter to 6.

    • Storage Engine: the storage engine that is used by the topic. This parameter can be set to Local Storage or Cloud Storage.

      Note

      You can set the Storage Engine parameter to Local Storage only if you create a topic on a Professional Edition instance.

    Example: etl-error-kafka

    Error Data Topic

    The topic that is used to store the error data of the sink. This parameter is displayed only if you set the Resource Creation Method parameter to Manual. To save topic resources, this topic can be the same as the Dead-letter Queue Topic.

    • Topic: the name of the topic. We recommend that you use etl-error as the prefix of the topic name.

    • Partitions: the number of partitions in the topic. We recommend that you set this parameter to 6.

    • Storage Engine: the storage engine that is used by the topic. This parameter can be set to Local Storage or Cloud Storage.

      Note

      You can set the Storage Engine parameter to Local Storage only if you create a topic on a Professional Edition instance.

    Example: etl-error-kafka

  4. In the Configure Function step, configure the parameters and click Create.

    Before you click Create, you can click Test to test whether the function works as expected.

    The following list describes each parameter and provides an example value.

    Programming Language

    The language in which the function is written. Set this parameter to Python 3.

    Example: Python3

    Template

    The function template that the system provides. After you select a function template, the system automatically populates the Code field with the code of the function template.

    Example: Add Prefix/Suffix

    Code

    The code that is used to process the message. ApsaraMQ for Kafka provides function templates that you can use to cleanse and transform data. You can modify the code of the selected function template based on your business requirements.

    Note
    • You can import Python modules based on your requirements.

    • The message in the code is in the dictionary format. You need only to modify the key and the value.

    • Return the processed message. If the function is used to filter messages, return None.

    def deal_message(message):
        # The message is passed in as a dictionary. Modify only the key and the value.
        for keyItem in message.keys():
            if keyItem == 'key':
                # Append a suffix to the message key.
                message[keyItem] = message[keyItem] + "KeySurfix"
                continue
            if keyItem == 'value':
                # Append a suffix to the message value.
                message[keyItem] = message[keyItem] + "ValueSurfix"
                continue
        # Return the processed message. Return None instead to filter the message out.
        return message

    Message Key

    The key of the message to be processed in the source topic. This parameter is displayed only after you click Test Code.

    Example: demo

    Message Content

    The value of the message to be processed in the source topic.

    {"key": "test"}

    After the ETL task is created, you can view the task on the ETL Tasks page. The system automatically deploys the task.
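
Before you click Test or Create in the Configure Function step, you can also run a quick local check of the function code in a standard Python 3 environment. The following sketch is a minimal, hypothetical harness that calls the Add Prefix/Suffix template with the example message key (demo) and message content ({"key": "test"}) that are used in this topic; the exact structure of the message that the ETL framework passes to the function may differ.

    def deal_message(message):
        for keyItem in message.keys():
            if keyItem == 'key':
                message[keyItem] = message[keyItem] + "KeySurfix"
            elif keyItem == 'value':
                message[keyItem] = message[keyItem] + "ValueSurfix"
        return message

    if __name__ == '__main__':
        # Build a message in the dictionary format that the function expects.
        test_message = {'key': 'demo', 'value': '{"key": "test"}'}
        print(deal_message(test_message))
        # Expected output:
        # {'key': 'demoKeySurfix', 'value': '{"key": "test"}ValueSurfix'}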

Send a test message

After the ETL task is deployed, you can send a test message to the source ApsaraMQ for Kafka topic to test whether the data can be processed by the configured function and sent to the destination topic.

  1. On the ETL Tasks page, find the ETL task that you created and click Test in the Actions column.

  2. In the Send Message panel, enter the information of the test message and click OK to send the test message.

    • In the Message Key field, enter the key of the test message. Example: demo.

    • In the Message Content field, enter the content of the test message. Example: {"key": "test"}.

    • Configure the Send to Specified Partition parameter to specify whether to send the test message to a specific partition.

      • If you want to send the test message to a specific partition, click Yes and enter a partition ID in the Partition ID field. Example: 0. For information about how to query partition IDs, see View partition status.

      • If you do not want to send the test message to a specific partition, click No.
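
If you prefer to produce the test message from a Kafka client instead of the console, the following sketch shows one way to do so with the open source kafka-python library. The endpoint, topic name, and security settings in this sketch are placeholders and assumptions; use the actual endpoint of your instance and add the authentication settings that your endpoint type requires.

    from kafka import KafkaProducer  # third-party package: kafka-python

    # Placeholder endpoint; replace it with the endpoint of your ApsaraMQ for Kafka instance.
    # Depending on the endpoint type, additional security settings such as SASL may be required.
    producer = KafkaProducer(
        bootstrap_servers=['your-instance-endpoint:9092'],
        key_serializer=lambda k: k.encode('utf-8'),
        value_serializer=lambda v: v.encode('utf-8'),
    )

    # Send the example test message to the source topic: key "demo", content {"key": "test"}.
    # The partition argument is optional; specify it only to send the message to a specific partition.
    producer.send('topic****', key='demo', value='{"key": "test"}', partition=0)
    producer.flush()
    producer.close()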

View function logs

After you extract and process data in ApsaraMQ for Kafka, you can view the logs of the function to verify whether the destination topic receives the processed data. For more information, see Configure the logging feature.

Figure 1. View function logs

View the details of an ETL task

After you create an ETL task, you can view its details in the ApsaraMQ for Kafka console.

  1. On the ETL Tasks page, find the ETL task that you created and click Details in the Actions column.

  2. On the Task Details page, view the details of the ETL task.

Delete an ETL task

If you no longer require an ETL task, you can delete the task in the ApsaraMQ for Kafka console.

  1. On the ETL Tasks page, find the ETL task that you want to delete and click Delete in the Actions column.

  2. In the Notes message that appears, click OK.