This topic describes how to create a message inflow task in the ApsaraMQ for RocketMQ console to synchronize data from ApsaraMQ for Kafka to ApsaraMQ for RocketMQ.
Prerequisites
An ApsaraMQ for RocketMQ instance is purchased and deployed. Make sure that the instance is in the Running state. For more information, see Create resources.
An ApsaraMQ for Kafka instance is purchased and deployed. Make sure that the instance is in the Running state. For more information, see Step 3: Create a resource.
Create a message inflow task
Log on to the ApsaraMQ for RocketMQ console. In the left-side navigation pane, choose .
In the top navigation bar, select a region, such as China (Hangzhou). On the Tasks page, click Create Task.
In the Create Task page, configure the Task Name and Description parameters. Then, follow the on-screen instructions to configure other parameters. The following section describes the parameters:
Task Creation
In the Source step, set the Data Provider parameter to ApsaraMQ for Kafka and follow the on-screen instructions to configure other parameters. Then, click Next Step. The following table describes the parameters.
Parameter
Description
Example
Region
The region where the source ApsaraMQ for Kafka instance resides.
China (Beijing)
Message Queue for Apache Kafka Instance
The ApsaraMQ for Kafka instance in which the messages that you want to route are produced.
MQ_INST_115964845466****_ByBeUp3p
Topic
The topic on the ApsaraMQ for Kafka instance in which the messages that you want to route are produced.
topic
Group ID
The name of the consumer group on the source instance. You must use a separate consumer group to create the message routing source. Do not use the same consumer group for ApsaraMQ for Kafka and another existing messaging service. Otherwise, you may fail to send or receive messages by using the existing messaging service.
GID_http_1
Consumer Offset
The offset from which messages are consumed.
Latest Offset
Earliest Offset
Latest Offset
Network Configuration
The type of network over which you want to route messages.
Basic Network
Internet
Basic Network
VPC
The ID of the virtual private cloud (VPC) in which the ApsaraMQ for Kafka instance is deployed. This parameter is required only if you set the Network Configuration parameter to Internet.
vpc-bp17fapfdj0dwzjkd****
vSwitch
The ID of the vSwitch with which the ApsaraMQ for Kafka instance is associated. This parameter is required only if you set the Network Configuration parameter to Internet.
vsw-bp1gbjhj53hdjdkg****
Security Group
The ID of the security group to which the ApsaraMQ for Kafka instance belongs. This parameter is required only if you set the Network Configuration parameter to Internet.
alikafka_pre-cn-7mz2****
Data Format (Body)
The data format feature is used to encode binary data delivered from the source into a specific data format. Multiple data formats are supported. If you do not have special requirements on encoding, specify JSON as the value.
JSON: Binary data is encoded into JSON-formatted data based on UTF-8 encoding and then put into the payload. This is the default value.
Text: Binary data is encoded into strings based on UTF-8 encoding and then put into the payload.
Binary: Binary data is encoded into strings based on Base64 encoding and then put into the payload.
Json
Messages
The maximum number of messages that can be sent in each function invocation. Requests are sent only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000.
100
Interval (Unit: Seconds)
The time interval at which the function is invoked. The system sends the aggregated messages to Function Compute at the specified time interval. Valid values: 0 to 15. Unit: seconds. The value 0 indicates that messages are sent immediately after aggregation.
3
In the Filtering step, define a data pattern to filter data. For more information, see Event patterns.
In the Transformation step, specify a data cleansing method to implement data processing capabilities such as splitting, mapping, enrichment, and dynamic routing. For more information, see Use Function Compute to perform message cleansing.
In the Sink step, set the Service Type parameter to ApsaraMQ for RocketMQ and follow the on-screen instructions to configure other parameters. The following table describes the parameters.
Parameter
Description
Example
Version
The version of the ApsaraMQ for RocketMQ instance to which you want to route messages. Valid values:
RocketMQ 4.x: ApsaraMQ for RocketMQ 4.x.
RocketMQ 5.x: ApsaraMQ for RocketMQ 5.x.
RocketMQ 5.x
Instance ID
The ApsaraMQ for RocketMQ instance to which you want to route messages.
rmq-cn-****
Topic
The topic on the ApsaraMQ for RocketMQ instance to which you want to route messages.
topic
Message Body
Complete Data
Data Extraction
Fixed Value
Template
Data Extraction
$.data.body
Custom Property
Not Specified
Data Extraction
Template
Template
Parameters:
{ "userProperties":"$.data.userProperties", "msgId":"$.data.systemProperties.UNIQ_KEY" }
Template:
{ "EB_SYS_EMBED_OBJECT":"${userProperties}", "UNIQ_KEY":"${msgId}" }
Message Key
Not Specified
Data Extraction
Fixed Value
Template
Data Extraction
$.data.systemProperties.KEYS
Message Tag
Not Specified
Data Extraction
Fixed Value
Template
Data Extraction
$.data.systemProperties.TAGS
Task Property
Configure the retry policy that is used when events fail to be pushed and the method that is used to handle faults. For more information, see Retry policies and dead-letter queues.
Click Save. On the Tasks page, find the task that you created. When the status in the Status column changes from Starting to Running, the task is created.
Other operations
On the Tasks page, find the task that you want to manage and perform other operations in the Actions column.
View the task details: Click Details in the Actions column. On the Task Details page, view the basic information, properties, and monitoring metrics of the task.
Modify the task configurations: Click Edit in the Actions column. In the Edit Task panel, modify the details and properties of the task.
Enable or disable the task: Click Enable or Pause in the Actions column. In the Note message, click OK.
Delete the task: Click Delete in the Actions column. In the Note message, click OK.