
EventBridge: Use the RabbitMQ connector to synchronize data from ApsaraMQ for RabbitMQ to ApsaraMQ for Kafka

Last Updated: Dec 06, 2023

This topic describes how to create the RabbitMQ connector to synchronize data from ApsaraMQ for RabbitMQ to ApsaraMQ for Kafka.

Prerequisites

  • An ApsaraMQ for Kafka instance is created and deployed.

  • An Object Storage Service (OSS) bucket is created.

Step 1: Create ApsaraMQ for RabbitMQ resources

  1. Log on to the ApsaraMQ for RabbitMQ console and create an instance. For more information, see Create an instance.

  2. Click the instance that you created. On the Instance Details page, perform the following steps to create resources:

    1. In the left-side navigation pane, click Static Accounts. On the page that appears, click Create Username/Password. For more information, see Create a pair of username and password.

      Save the username and password after you create them.

    2. In the left-side navigation pane, click vhosts and then click Create vhost. For more information, see Create a vhost.

    3. In the left-side navigation pane, click Queues. On the page that appears, click Change next to vhost. Then, select the created vhost from the drop-down list and click Create Queue. For more information, see Create a queue.
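After you create the resources, you can optionally verify them with any AMQP 0-9-1 client before you configure the connector. The following Python sketch (the endpoint, username, password, and vhost values are placeholders, not values from this topic) builds the AMQP URI that such a client expects; note that each segment must be percent-encoded:

```python
from urllib.parse import quote

def amqp_uri(host: str, username: str, password: str, vhost: str, port: int = 5672) -> str:
    """Build the AMQP 0-9-1 URI for an ApsaraMQ for RabbitMQ instance.

    Each segment is percent-encoded because usernames and vhost names
    may contain reserved characters such as '/'.
    """
    user = quote(username, safe="")
    pwd = quote(password, safe="")
    return f"amqp://{user}:{pwd}@{host}:{port}/{quote(vhost, safe='')}"

# Hypothetical placeholder values -- substitute the static account,
# VPC endpoint, and vhost that you created in the steps above.
uri = amqp_uri("amqp-cn-xxx.mq-amqp.aliyuncs.com", "demo-user", "demo-pass", "test-vhost")
```

Passing this URI to an AMQP client and declaring the queue passively confirms that the account, vhost, and queue all exist before the connector ever runs.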

Step 2: Create the connector

  1. Download the RabbitMQ Connector file and upload the file to the OSS bucket that you created. For more information, see Get started by using the OSS console.

  2. Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  3. In the left-side navigation pane, choose Connector Ecosystem Integration > Tasks.

  4. On the Tasks page, click Create Task.

  5. On the Create Task page, configure the Task Name parameter and follow the on-screen instructions to configure other parameters. The following section describes the parameters:

    • Task Creation

      1. In the Source step, set the Data Provider parameter to Apache Kafka Connect and click Next Step.

      2. In the Connector step, configure the parameters and click Next Step. The following table describes the parameters.

        Subsection

        Parameter

        Description

        Kafka Connect Plug-in

        Bucket

        Select the OSS bucket to which the RabbitMQ Connector file was uploaded.

        File

        Select the RabbitMQ Connector file that you uploaded to the OSS bucket.

        Message Queue for Apache Kafka Resources

        Message Queue for Apache Kafka Parameters

        Select Source Connect.

        Message Queue for Apache Kafka Instance

        Select the ApsaraMQ for Kafka instance that you created in the "Prerequisites" section of this topic.

        VPC

        By default, the VPC that you selected when you deployed the ApsaraMQ for Kafka instance is specified. You cannot change the value of this parameter.

        vSwitch

        By default, the vSwitch ID that you selected when you deployed the ApsaraMQ for Kafka instance is specified. You cannot change the value of this parameter.

        Security Group

        Select a security group.

        Kafka Connect

        Parse .properties File in ZIP Package

        Select Create .properties File. Then, select the .properties file that contains the configurations of the source connector from the ZIP package. The path is /etc/source-xxx.properties. Update the values of the following fields in the code editor:

        • connector.class: the name of the RabbitMQ Connector file. Use the default value for this field.

        • tasks.max: the maximum number of tasks.

        • rabbitmq.host: the VPC endpoint of the ApsaraMQ for RabbitMQ instance. You can obtain the information about the endpoint in the Endpoint Information section of the Instance Details page.

        • rabbitmq.username: the username. Enter the username that you created for the ApsaraMQ for RabbitMQ instance in Step 1: Create ApsaraMQ for RabbitMQ resources.

        • rabbitmq.password: the password. Enter the password that you created for the ApsaraMQ for RabbitMQ instance in Step 1: Create ApsaraMQ for RabbitMQ resources.

        • rabbitmq.virtual.host: the vhost. Enter the vhost that you created in Step 1: Create ApsaraMQ for RabbitMQ resources.

        • kafka.topic: the ApsaraMQ for Kafka topic to which you want to synchronize data. You must create the topic before you synchronize data.

        • rabbitmq.queue: the queue. Enter the queue that you created in Step 1: Create ApsaraMQ for RabbitMQ resources.

        Sample code:

        connector.class=com.ibm.eventstreams.connect.rabbitmqsource.RabbitMQSourceConnector
        name=rabbitmq-source-connector
        # The VPC endpoint that is used to connect to the ApsaraMQ for RabbitMQ instance. 
        rabbitmq.host=xxx
        # The static password that you created for the ApsaraMQ for RabbitMQ instance. 
        rabbitmq.password=xxx
        # The static username that you created for the ApsaraMQ for RabbitMQ instance. 
        rabbitmq.username=xxx
        # The vhost that you created for the ApsaraMQ for RabbitMQ instance. 
        rabbitmq.virtual.host=xxx
        # The destination ApsaraMQ for Kafka topic. 
        kafka.topic=xxx
        # The queue that you created for the ApsaraMQ for RabbitMQ instance. 
        rabbitmq.queue=xxx
        tasks.max=4
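Before you save the task, you can sanity-check the edited .properties content locally. The following Python sketch is illustrative, not part of any SDK: the required-key set mirrors the fields listed above, and parse_properties is a deliberately simplified parser that flags fields that are missing or still set to the xxx placeholder:

```python
# Required fields for the RabbitMQ source connector, mirroring the list above.
REQUIRED_KEYS = {
    "connector.class", "rabbitmq.host", "rabbitmq.username",
    "rabbitmq.password", "rabbitmq.virtual.host",
    "kafka.topic", "rabbitmq.queue",
}

def parse_properties(text: str) -> dict:
    """Parse a Java-style .properties snippet, skipping blank lines and comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props

def unfilled_keys(props: dict) -> set:
    """Return required keys that are absent or still set to the xxx placeholder."""
    return {k for k in REQUIRED_KEYS if props.get(k) in (None, "", "xxx")}
```

If unfilled_keys returns a non-empty set for your file, fill in those fields in the code editor before you proceed to the Instance step.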
      3. In the Instance step, configure the parameters and click Next Step. The following table describes the parameters.

        Subsection

        Parameter

        Description

        Worker Type

        Worker Type

        Select a worker type.

        Min. Number of Workers

        Specify the minimum number of workers.

        Max. Number of Workers

        Specify the maximum number of workers. The value of this parameter cannot be greater than the value of the tasks.max parameter.

        Horizontal Scaling Threshold (%)

        If the CPU utilization rises above the specified value, workers are scaled out; if it falls below the value, workers are scaled in. This parameter is required only if the values of the Min. Number of Workers and Max. Number of Workers parameters are different.

        Worker Configurations

        Automatically Create Dependencies for Apache Kafka Connector Worker

        We recommend that you select this option. After you select this option, the system creates internal topics and consumer groups that are required to run Kafka Connect in the selected ApsaraMQ for Kafka instance and fills the resource information in the code editor. The following items describe the configuration items:

        • Offset Topic: the topic that is used to store offset data. The name of the topic is in the connect-eb-offset-<Task name> format.

        • Config Topic: the topic that is used to store the configuration data of connectors and tasks. The name of the topic is in the connect-eb-config-<Task name> format.

        • Status Topic: the topic that is used to store status data of connectors and tasks. The name of the topic is in the connect-eb-status-<Task name> format.

        • Kafka Connect Consumer Group: the consumer group that the Kafka Connect workers use to consume messages in the internal topics. The name of the consumer group is in the connect-eb-cluster-<Task name> format.

        • Kafka Source Connector Consumer Group: the consumer group that is used to consume data in the source topic. This consumer group is used only by sink connectors. The name of the consumer group is in the connector-eb-cluster-<task name>-<connector name> format.
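The naming formats above can be sketched as a small helper (hypothetical, for illustration only) that derives all internal resource names from a task name and connector name:

```python
def internal_resources(task_name: str, connector_name: str) -> dict:
    """Derive the names of the internal topics and consumer groups that the
    console creates for a Kafka Connect task, following the formats above."""
    return {
        "offset_topic": f"connect-eb-offset-{task_name}",
        "config_topic": f"connect-eb-config-{task_name}",
        "status_topic": f"connect-eb-status-{task_name}",
        "connect_consumer_group": f"connect-eb-cluster-{task_name}",
        "connector_consumer_group": f"connector-eb-cluster-{task_name}-{connector_name}",
    }
```

Knowing these names is useful when you audit the topics and consumer groups that appear on your ApsaraMQ for Kafka instance after the task is created.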

      4. In the Running Configurations section, set the Log Delivery parameter to Deliver Data to Log Service or Deliver Data to ApsaraMQ for Kafka, select a role on which Kafka Connect depends from the Role drop-down list in the Role Authorization subsection, and then click Save.

        Important

        We recommend that you select a role to which the AliyunSAEFullAccess permission policy is attached. Otherwise, the task may fail to run.

    • Task Property

      Configure the retry policy and dead-letter queue for the task. For more information, see Retry policies and dead-letter queues.

    If the status of the task becomes Running, the connector starts to work as expected.

Step 3: Test the connector

  1. Log on to the ApsaraMQ for RabbitMQ console. In the left-side navigation pane, click Instances.

  2. In the top navigation bar of the Instances page, select the region where the instance that you want to manage resides. Then, in the instance list, click the name of the instance that you want to manage.

  3. In the left-side navigation pane, click Queues. Find the queue that you want to manage and click Details in the Actions column.

  4. On the Bound as Destination tab of the Queue Details page, click Add Binding.

  5. In the Bound as Destination panel, select amq.direct for the Source Exchange parameter and click OK.

  6. On the Bound as Destination tab, find the amq.direct exchange and click Send Message in the Actions column to send a message to the destination ApsaraMQ for Kafka topic. For more information, see Send messages.

  7. Log on to the ApsaraMQ for Kafka console. On the Instances page, click the instance that you want to manage.

  8. On the page that appears, click the topic that you want to manage and click the Message Query tab to view the inserted message data.

Common errors and troubleshooting

Error 1: All tasks fail to run

Error message:

All tasks under connector mongo-source failed, please check the error trace of the task.

Solution: On the Message Inflow Task Details page, click Diagnostics in the Basic Information section to go to the Connector Monitoring page. On the Connector Monitoring page, you can view the details of the task failure.

Error 2: Kafka Connect unexpectedly exits

Error message:

Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.

Solution: The status update of Kafka Connect may be delayed. We recommend that you refresh the page. If Kafka Connect still fails, you can perform the following operations to troubleshoot the issue:

  1. In the Worker Information section of the Message Inflow Task Details page, click the instance name next to SAE Application to go to the Application Details page.

  2. On the Basic Information page, click the Instance Deployment Information tab.

  3. Click Webshell in the Actions column to log on to the running environment of Kafka Connect.

    • Run the vi /home/admin/connector-bootstrap.log command to view the startup logs of the connector and check whether an error message exists in the logs.

    • Run the vi /opt/kafka/logs/connect.log command to view the running logs of the connector and check whether an error message exists in the ERROR or WARN field.

After you troubleshoot the issue based on the error message, you can restart the corresponding task.

Error 3: The verification of connector parameters fails

Error message:

Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`

Solution: Find the parameter whose value is invalid based on the error message and update the parameter. If you cannot find the parameter based on the error message, you can log on to the running environment of Kafka Connect and run the following command. For information about how to log on to the running environment of Kafka Connect, see Error 2 in this topic.

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate

The verification result of each connector parameter is returned in the response. If the value of a parameter is invalid, the errors field of the parameter is not empty.

"value":{
    "name":"snapshot.mode",
    "value":null,
    "recommended_values":[
        "never",
        "initial_only",
        "when_needed",
        "initial",
        "schema_only",
        "schema_only_recovery"
    ],
    "errors":[
        "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
    ],
    "visible":true
}
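If you script this check, the JSON body that the validate endpoint returns contains a configs array in which each entry carries a value object like the one above. The following Python sketch (assuming the standard Kafka Connect config/validate response shape) collects every parameter whose errors list is not empty:

```python
import json

def invalid_params(validate_response: str) -> dict:
    """Map each parameter whose 'errors' list is non-empty to its errors,
    given the JSON body returned by the config/validate endpoint."""
    body = json.loads(validate_response)
    result = {}
    for cfg in body.get("configs", []):
        value = cfg.get("value", {})
        if value.get("errors"):
            result[value["name"]] = value["errors"]
    return result
```

The returned mapping tells you exactly which fields to correct in the .properties file before you restart the task.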