EventBridge: Synchronize data from ApsaraMQ for RabbitMQ to ApsaraMQ for Kafka by using the RabbitMQ connector

Last Updated: Feb 28, 2026

The RabbitMQ connector reads messages from an ApsaraMQ for RabbitMQ queue and writes them to an ApsaraMQ for Kafka topic. The connector runs on the Apache Kafka Connect framework, which EventBridge provisions as a Serverless App Engine (SAE) application.

Prerequisites

Before you begin, make sure that the following operations are complete:

  • An ApsaraMQ for Kafka instance is purchased and deployed.

  • An Object Storage Service (OSS) bucket is created.

Step 1: Create ApsaraMQ for RabbitMQ resources

  1. Log on to the ApsaraMQ for RabbitMQ console and create an instance. For more information, see Create an instance.

  2. Click the instance that you created. On the Instance Details page, create the following resources:

    1. In the left-side navigation pane, click Users and Permissions. On the page that appears, click Create Username/Password. For more information, see Create a pair of username and password. Save the username and password after you create them.

      Create a static account for the ApsaraMQ for RabbitMQ instance

    2. In the left-side navigation pane, click Vhosts and then click Create vhost. For more information, see Create a vhost.

    3. In the left-side navigation pane, click Queues. On the page that appears, click Change next to vhost. Select the vhost you created from the drop-down list and click Create Queue. For more information, see Create a queue.

Step 2: Create the connector

  1. Download the RabbitMQ Connector file and upload it to the OSS bucket you created. For more information, see Get started by using the OSS console.

  2. Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, select the region where your ApsaraMQ for Kafka instance resides.

  3. In the left-side navigation pane, choose Connector Ecosystem Integration > Tasks.

  4. On the Tasks page, click Create Task.

  5. On the Create Task page, configure the Task Name parameter and follow the on-screen instructions to configure other parameters as described below.

Configure the data source

  1. In the Source step, set Data Provider to Apache Kafka Connect and click Next Step.

  2. In the Connector step, configure the following parameters and click Next Step.

    Kafka Connect Plug-in
      • Bucket: The OSS bucket to which you uploaded the RabbitMQ Connector file.
      • File: The RabbitMQ Connector file that you uploaded to the OSS bucket.

    Message Queue for Apache Kafka Resources
      • Message Queue for Apache Kafka Parameters: Select Source Connect.
      • Message Queue for Apache Kafka Instance: The ApsaraMQ for Kafka instance that you created in the prerequisites.
      • VPC: Automatically populated based on the deployment of your Kafka instance. This value cannot be changed.
      • vSwitch: Automatically populated based on the deployment of your Kafka instance. This value cannot be changed.
      • Security Group: Select a security group.

    Kafka Connect
      • Parse .properties File in ZIP Package: Select Create .properties File. Then, select the .properties file that contains the source connector configurations from the ZIP package. The path is /etc/source-xxx.properties.

    In the code editor, update the following parameters:
      • connector.class: The connector class name. Use the default value.
      • tasks.max: The maximum number of tasks.
      • rabbitmq.host: The VPC endpoint of the ApsaraMQ for RabbitMQ instance. You can find it in the Endpoint Information section of the Instance Details page.
      • rabbitmq.username: The username that you created in Step 1.
      • rabbitmq.password: The password that you created in Step 1.
      • rabbitmq.virtual.host: The vhost that you created in Step 1.
      • kafka.topic: The destination ApsaraMQ for Kafka topic. Create this topic before you synchronize data.
      • rabbitmq.queue: The queue that you created in Step 1.

    Sample .properties file:
       connector.class=com.ibm.eventstreams.connect.rabbitmqsource.RabbitMQSourceConnector
       name=rabbitmq-source-connector
       # The VPC endpoint of the ApsaraMQ for RabbitMQ instance.
       rabbitmq.host=<your-rabbitmq-vpc-endpoint>
       # The static password for the ApsaraMQ for RabbitMQ instance.
       rabbitmq.password=<your-rabbitmq-password>
       # The static username for the ApsaraMQ for RabbitMQ instance.
       rabbitmq.username=<your-rabbitmq-username>
       # The vhost of the ApsaraMQ for RabbitMQ instance.
       rabbitmq.virtual.host=<your-rabbitmq-vhost>
       # The destination ApsaraMQ for Kafka topic.
       kafka.topic=<your-kafka-topic>
       # The queue of the ApsaraMQ for RabbitMQ instance.
       rabbitmq.queue=<your-rabbitmq-queue>
       tasks.max=4
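Before you upload the ZIP package, you can sanity-check the .properties file for the keys listed above. The following is a minimal sketch, not part of the product: the required-key list and the sample text are taken from this example, so adjust them if your connector configuration differs.

```python
# Minimal sketch: verify that a source-connector .properties file contains
# every key used in the sample file above. Adjust REQUIRED_KEYS if your
# connector configuration differs.

REQUIRED_KEYS = {
    "connector.class", "name", "rabbitmq.host", "rabbitmq.username",
    "rabbitmq.password", "rabbitmq.virtual.host", "rabbitmq.queue",
    "kafka.topic", "tasks.max",
}

def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

def missing_keys(text):
    """Return the required keys absent from the file, sorted for display."""
    return sorted(REQUIRED_KEYS - parse_properties(text).keys())

# An intentionally incomplete file for demonstration.
sample = """\
connector.class=com.ibm.eventstreams.connect.rabbitmqsource.RabbitMQSourceConnector
name=rabbitmq-source-connector
rabbitmq.host=<your-rabbitmq-vpc-endpoint>
tasks.max=4
"""
print(missing_keys(sample))
# → ['kafka.topic', 'rabbitmq.password', 'rabbitmq.queue',
#    'rabbitmq.username', 'rabbitmq.virtual.host']
```

Running a check like this locally catches missing keys before the task fails parameter verification at startup.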

Configure workers

  1. In the Instance step, configure the following parameters and click Next Step.

    Worker Type
      • Worker Type: Select a worker type.
      • Min. Number of Workers: The minimum number of workers.
      • Max. Number of Workers: The maximum number of workers. The value cannot exceed the tasks.max value.
      • Horizontal Scaling Threshold (%): The CPU utilization threshold that triggers auto scaling. This parameter is required only if the minimum and maximum numbers of workers differ.

    Worker Configurations
      • Automatically Create Dependencies for Apache Kafka Connector Worker: Select this option (recommended). The system creates the internal topics and consumer groups that are required to run Kafka Connect in the selected ApsaraMQ for Kafka instance and fills the resource information in the code editor. The following internal resources are created automatically:
        • Offset topic: connect-eb-offset-<Task name>
        • Config topic: connect-eb-config-<Task name>
        • Status topic: connect-eb-status-<Task name>
        • Kafka Connect consumer group: connect-eb-cluster-<Task name>
        • Kafka source connector consumer group: connector-eb-cluster-<Task name>-<connector name>
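For reference, the naming formats above can be expressed as a small helper. This is only an illustration of the naming scheme (the function name and return shape are this sketch's own); it can help you locate the generated topics and consumer groups in the ApsaraMQ for Kafka console.

```python
# Sketch: derive the internal resource names that are created for a task,
# following the naming formats listed above.

def internal_resources(task_name, connector_name):
    """Map each internal Kafka Connect resource to its generated name."""
    return {
        "offset_topic": f"connect-eb-offset-{task_name}",
        "config_topic": f"connect-eb-config-{task_name}",
        "status_topic": f"connect-eb-status-{task_name}",
        "connect_consumer_group": f"connect-eb-cluster-{task_name}",
        "source_connector_consumer_group":
            f"connector-eb-cluster-{task_name}-{connector_name}",
    }

print(internal_resources("demo-task", "rabbitmq-source-connector"))
```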

Configure running settings and task properties

  1. In the Running Configurations section, set the Log Delivery parameter to Deliver Data to Log Service or Deliver Data to ApsaraMQ for Kafka. In the Role Authorization subsection, select a role from the Role drop-down list, and then click Save.

    Important

    Select a role that has the AliyunSAEFullAccess permission policy attached. Otherwise, the task may fail to run.

  2. Configure the retry policy and dead-letter queue for the task. For more information, see Retry policies and dead-letter queues.

When the task status changes to Running, the connector is working as expected.

Step 3: Test the connector

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar of the Instances page, select the region where your instance resides. In the instance list, click the instance name.

  3. In the left-side navigation pane, click Queues. Find the queue and click Details in the Actions column.

  4. On the Bound as Destination tab of the Queue Details page, click Add Binding.

  5. In the Bound as Destination panel, select amq.direct for Source Exchange and click OK.

  6. On the Bound as Destination tab, find the amq.direct exchange and click Send Message in the Actions column to send a message to the destination ApsaraMQ for Kafka topic. For more information, see Send messages.

    Send a message from the RabbitMQ queue

  7. Log on to the ApsaraMQ for Kafka console. On the Instances page, click your instance.

  8. Click the topic and then click the Message Query tab to verify that the message was delivered.

    Verify message delivery in the Kafka topic

Troubleshooting

All tasks fail to run

Error message:

All tasks under connector mongo-source failed, please check the error trace of the task.

Solution: On the Message Inflow Task Details page, click Diagnostics in the Basic Information section to go to the Connector Monitoring page. View the task failure details there.

Kafka Connect unexpectedly exits

Error message:

Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.

Solution: The status update of Kafka Connect may be delayed. Refresh the page first. If Kafka Connect still fails, troubleshoot by following these steps:

  1. In the Worker Information section of the Message Inflow Task Details page, click the instance name next to SAE Application to go to the Application Details page.

  2. On the Basic Information page, click the Instance Deployment Information tab.

  3. Click Webshell in the Actions column to log on to the running environment of Kafka Connect.

    • Run vi /home/admin/connector-bootstrap.log to view the startup logs and check for error messages.

    • Run vi /opt/kafka/logs/connect.log to view the running logs and check for ERROR or WARN entries.

    Instance Deployment Information

After troubleshooting the issue, restart the task.

Connector parameter verification fails

Error message:

Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`

Solution: Find the parameter with the invalid value from the error message and update it. If the error message does not identify the parameter, log on to the Kafka Connect running environment (see Kafka Connect unexpectedly exits) and call the /config/validate endpoint of the connector plug-in. The following sample command validates a JDBC sink connector; replace the connector class in the URL with the class of your connector:

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate

The response returns the validation result for each connector parameter. If a parameter value is invalid, its errors field is not empty.

"value":{
    "name":"snapshot.mode",
    "value":null,
    "recommended_values":[
        "never",
        "initial_only",
        "when_needed",
        "initial",
        "schema_only",
        "schema_only_recovery"
    ],
    "errors":[
        "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
    ],
    "visible":true
}
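When the validation response is long, you can filter it for only the failing parameters. The following is a minimal sketch that assumes the standard Kafka Connect validate response shape, in which a top-level configs array carries one value object (with name and errors fields) per parameter; the sample response below is a hypothetical excerpt for illustration.

```python
# Sketch: extract the parameters whose "errors" list is non-empty from a
# Kafka Connect /config/validate response. Assumes the standard response
# shape: {"configs": [{"value": {"name": ..., "errors": [...]}}, ...]}.
import json

def invalid_params(response_text):
    """Map each invalid parameter name to its list of error messages."""
    doc = json.loads(response_text)
    return {
        cfg["value"]["name"]: cfg["value"]["errors"]
        for cfg in doc.get("configs", [])
        if cfg["value"].get("errors")
    }

# Hypothetical response excerpt for illustration.
response = json.dumps({"configs": [
    {"value": {"name": "snapshot.mode", "value": None,
               "errors": ["Value must be one of never, initial_only, "
                          "when_needed, initial, schema_only, "
                          "schema_only_recovery"]}},
    {"value": {"name": "tasks.max", "value": "4", "errors": []}},
]})
print(sorted(invalid_params(response)))  # → ['snapshot.mode']
```

Piping the curl output through a filter like this is handy when a connector has dozens of parameters and only one or two fail validation.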