This topic describes how to create a sink connector in the ApsaraMQ for Kafka console to synchronize data from ApsaraMQ for Kafka to ApsaraMQ for RabbitMQ.
Prerequisites
An ApsaraMQ for Kafka instance is purchased and deployed. Make sure that the instance is in the Running state. For more information, see Step 2: Purchase and deploy an instance.
An ApsaraMQ for RabbitMQ instance is purchased and deployed. Make sure that the instance is in the Running state. For more information, see Step 2: Create resources.
EventBridge is activated and the required permissions are granted to a Resource Access Management (RAM) user. For more information, see Activate EventBridge and grant permissions to a RAM user.
Create a sink connector task
Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.
In the left-side navigation pane, choose .
On the Tasks page, click Create Task.
On the Create Task page, configure the Task Name and Description parameters and follow the on-screen instructions to configure other parameters. Then, click Save. The following section describes the parameters:
Task Creation
In the Source step, set the Data Provider parameter to ApsaraMQ for Kafka and follow the on-screen instructions to configure other parameters. Then, click Next Step. The following table describes the parameters.
Parameter
Description
Example
Region
The region where the ApsaraMQ for Kafka instance resides.
China (Beijing)
ApsaraMQ for Kafka Instance
The ApsaraMQ for Kafka instance in which the messages that you want to route are produced.
MQ_INST_115964845466****_ByBeUp3p
Topic
The topic on the ApsaraMQ for Kafka instance in which the messages that you want to route are produced.
topic
Group ID
The name of the consumer group on the ApsaraMQ for Kafka instance. You must use a separate consumer group to create the message routing source. Do not use a consumer group that is in use. Otherwise, existing messages may fail to be sent and received.
GID_http_1
Consumer Offset
The offset from which messages are consumed.
Latest Offset
Network Configuration
The type of the network over which you want to route messages.
Basic Network
VPC
The ID of the virtual private cloud (VPC) in which the ApsaraMQ for Kafka instance is deployed. This parameter is required only if you set the Network Configuration parameter to Self-managed Internet.
vpc-bp17fapfdj0dwzjkd****
vSwitch
The ID of the vSwitch with which the ApsaraMQ for Kafka instance is associated. This parameter is required only if you set the Network Configuration parameter to Self-managed Internet.
vsw-bp1gbjhj53hdjdkg****
Security Group
The security group to which the ApsaraMQ for Kafka instance belongs. This parameter is required only if you set the Network Configuration parameter to Self-managed Internet.
alikafka_pre-cn-7mz2****
Messages
The maximum number of messages that can be sent in each function invocation. Requests are sent only when the number of messages in the backlog reaches the specified value. Valid values: 1 to 10000.
100
Interval (Unit: Seconds)
The time interval at which the function is invoked. The system sends the aggregated messages to Function Compute at the specified time interval. Valid values: 0 to 15. Unit: seconds. The value 0 specifies that messages are sent immediately after aggregation.
3
In the Filtering step, define a data pattern in the Pattern Content code editor to filter data. For more information, see Event patterns.
In the Transformation step, specify a data cleansing method to implement data splitting, mapping, enrichment, and routing capabilities. For more information, see Use Function Compute to perform message cleansing.
In the Sink step, set the Service Type parameter to ApsaraMQ for RabbitMQ and follow the on-screen instructions to configure other parameters. The following table describes the parameters.
Parameter
Description
Example
Instance ID
The destination ApsaraMQ for RabbitMQ instance that you created.
amqp-cn-zvp2pny6****
vhost
The vhost that you created.
test
Destination Type
Exchange: A producer sends a message to an exchange, and then the exchange routes the message to one or more queues.
Queue: Each message is sent to one or more queues.
Queue
Exchange
The exchange to which messages are routed on the ApsaraMQ for RabbitMQ instance. This parameter is required only if you set the Destination Type parameter to Exchange.
exchange
Queue
The queue to which messages are routed on the ApsaraMQ for RabbitMQ instance. This parameter is required only if you set the Destination Type parameter to Queue.
queue
Message Routing Key
EventBridge extracts data from a message by using JSONPath and routes the specified content of the message to the event target. This parameter is required only if you set the Destination Type parameter to Exchange.
Data Extraction
Fixed Value
Data Extraction
$.data.keyMessage Body
EventBridge extracts data from a message by using JSONPath and routes the specified content of the message to the event target.
Complete Data
Data Extraction
Fixed Value
Template
Data Extraction
$.data.bodyMessageId
EventBridge extracts data from a message by using JSONPath and routes the specified content of the message to the event target.
Empty
Data Extraction
Fixed Value
Template
Data Extraction
$.data.props.messageIdEventBridge extracts data from a message by using JSONPath and routes the specified content of the message to the event target.
Empty
Data Extraction
Template
Data Extraction
$.data.props
Task Property
Configure the retry policy that you want to use when events fail to be pushed and the method that you want to use to handle faults. For more information, see Retry policies and dead-letter queues.
Go back to the Tasks page, find the ApsaraMQ for RabbitMQ sink connector that you created, and then click Enable in the Actions column.
In the Note message, click OK.
The sink connector requires 30 to 60 seconds to be enabled. You can view the progress in the Status column on the Tasks page.
Other operations
On the Tasks page, find the sink connector that you want to manage and perform other operations in the Actions column.
View the details of the connector: Click Details in the Actions column. On the Task Details page, view the basic information, properties, and monitoring metrics of the connector.
Edit the configurations of the connector: Click Edit in the Actions column. In the Edit Task panel, change the details and properties of the connector.
Enable or disable the connector: Click Enable or Pause in the Actions column. In the Note message, click OK.
Delete the connector: Click Delete in the Actions column. In the Note message, click OK.