All Products
Search
Document Center

ApsaraMQ for RocketMQ:DTS

Last Updated:Dec 04, 2024

This topic describes how to create a message inflow task in the ApsaraMQ for RocketMQ console to synchronize data from Data Transmission Service (DTS) to ApsaraMQ for RocketMQ.

Prerequisites

  • An ApsaraMQ for RocketMQ instance is purchased and deployed. Make sure that the instance is in the Running state. For more information, see Create resources.

  • A change tracking task is created in the DTS console. Make sure that the change tracking task is in the Normal state. For more information, see Manage a change tracking task.

  • A consumer group is created in the change tracking task. For more information, see Create consumer groups.

Supported regions

When you create a message inflow task, you can use DTS as the source in the following regions: China (Hangzhou), China (Shanghai), China (Qingdao), China (Beijing), China (Shenzhen), China (Guangzhou), China (Chengdu), and China (Hong Kong).

Create a message inflow task

  1. Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, choose Message Integration > Tasks.

  2. In the top navigation bar, select a region, such as China (Hangzhou). On the Tasks page, click Create Task.

  3. In the Create Task page, configure the Task Name and Description parameters. Then, follow the on-screen instructions to configure other parameters. The following section describes the parameters:

    • Task Creation

      1. In the Source step, set the Data Provider parameter to Data Transmission Service (DTS) and follow the on-screen instructions to configure other parameters. Then, click Next Step. The following table describes the parameters.

        Parameter

        Description

        Example

        Region

        The region that you selected when you created the message inflow task is automatically filled.

        China (Hangzhou)

        Data Subscription Task

        The ID of the change tracking task that you created in the DTS console.

        dts8jqe****

        Access Method

        The access method of the database instance that serves as the source of the change tracking task. You cannot change the value of this parameter.

        RDS

        Instance ID

        The ID of the database instance that serves as the source of the change tracking task. You cannot change the value of this parameter.

        rm-bp18mj3q2dzyb****

        Consumer Group

        The name of the consumer group that you created to consume the data of the change tracking task.

        Note

        Make sure that the consumer group runs on only one client. Otherwise, the specified consumer offset may become invalid.

        test

        Account

        The account name that you specified when you created the consumer group.

        test

        Password

        The account password that you specified when you created the consumer group.

        ******

        Consumer Offset

        The time when the first data entry is to be consumed. The data entry that is specified by the consumer offset must be within the data range of the change tracking task.

        Note

        The consumer offset that you specify takes effect only when the consumer group consumes data for the first time. If the change tracking task is restarted, the consumer group consumes data from the last recorded consumer offset.

        2022-06-21 00:00:00

        Messages

        The maximum number of messages that can be sent in each function invocation. Requests are sent only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000.

        100

        Interval (Unit: Seconds)

        The time interval at which the function is invoked. The system sends the aggregated messages to Function Compute at the specified time interval. Valid values: 0 to 15. Unit: seconds. The value 0 indicates that messages are sent immediately after aggregation.

        3

      2. In the Filtering step, define a data pattern to filter data. For more information, see Event patterns.

      3. In the Transformation step, specify a data cleansing method to implement data processing capabilities such as splitting, mapping, enrichment, and dynamic routing. For more information, see Use Function Compute to perform message cleansing.

      4. In the Sink step, set the Service Type parameter to ApsaraMQ for RocketMQ and follow the on-screen instructions to configure other parameters. The following table describes the parameters.

        Parameter

        Description

        Example

        Version

        The version of the ApsaraMQ for RocketMQ instance to which you want to route messages. Valid values:

        • RocketMQ 4.x: ApsaraMQ for RocketMQ 4.x.

        • RocketMQ 5.x: ApsaraMQ for RocketMQ 5.x.

        RocketMQ 5.x

        Instance ID

        The ApsaraMQ for RocketMQ instance to which you want to route messages.

        rmq-cn-****

        Topic

        The topic on the ApsaraMQ for RocketMQ instance to which you want to route messages.

        topic

        Message Body

        • Complete Data

        • Data Extraction

        • Fixed Value

        • Template

        Data Extraction

        $.data.body

        Custom Property

        • Not Specified

        • Data Extraction

        • Template

        Template

        Parameters:

        {
          "userProperties":"$.data.userProperties",
          "msgId":"$.data.systemProperties.UNIQ_KEY"
        }

        Template:

        {
          "EB_SYS_EMBED_OBJECT":"${userProperties}",
          "UNIQ_KEY":"${msgId}"
        }

        Message Key

        • Not Specified

        • Data Extraction

        • Fixed Value

        • Template

        Data Extraction

        $.data.systemProperties.KEYS

        Message Tag

        • Not Specified

        • Data Extraction

        • Fixed Value

        • Template

        Data Extraction

        $.data.systemProperties.TAGS
    • Task Property

      Configure the retry policy that is used when events fail to be pushed and the method that is used to handle faults. For more information, see Retry policies and dead-letter queues.

  4. Click Save. On the Tasks page, find the task that you created. When the status in the Status column changes from Starting to Running, the task is created.

Other operations

On the Tasks page, find the task that you want to manage and perform other operations in the Actions column.

  • View the task details: Click Details in the Actions column. On the Task Details page, view the basic information, properties, and monitoring metrics of the task.

  • Modify the task configurations: Click Edit in the Actions column. In the Edit Task panel, modify the details and properties of the task.

  • Enable or disable the task: Click Enable or Pause in the Actions column. In the Note message, click OK.

  • Delete the task: Click Delete in the Actions column. In the Note message, click OK.