ApsaraMQ for Kafka: Use the MongoDB Kafka Connector to connect to ApsaraDB for MongoDB databases

Last Updated: Dec 12, 2023

This topic describes how to create the MongoDB Kafka Connector to synchronize data between an ApsaraDB for MongoDB database and an ApsaraMQ for Kafka instance.

Prerequisites

Step 1: Create a table

  1. Log on to the ApsaraDB for MongoDB console and create an instance. You can also use an existing instance. In this topic, a sharded cluster instance is created. For more information, see Create a sharded cluster instance.

    Important
    • Make sure the ApsaraDB for MongoDB instance that you create or use is deployed in the same VPC as the ApsaraMQ for Kafka instance that you created in the "Prerequisites" section of this topic. Otherwise, the connection cannot be established.

    • When you create the ApsaraDB for MongoDB instance, the system automatically sets the username to root. When you configure the password for the root account, do not include at signs (@) or colons (:).

    • Make sure the ApsaraDB for MongoDB instance that you create or use is deployed in the same vSwitch as the ApsaraMQ for Kafka instance that is created in the "Prerequisites" section of this topic. If the existing instance that you use is not deployed in the same vSwitch as the ApsaraMQ for Kafka instance, you can add the CIDR block of the vSwitch in which the ApsaraMQ for Kafka instance is deployed to the whitelist of the ApsaraDB for MongoDB instance. For more information, see Configure a whitelist for an ApsaraDB for MongoDB instance. You can obtain the CIDR block of the vSwitch on the vSwitch Details page in the VPC console.

  2. On the Sharded Cluster Instances page, find the instance that you created and click the instance name to go to the Instance Details page. Then, perform the following operations:

    1. Add the IP address of the Data Management (DMS) server to the whitelist of the instance. For more information, see Configure a whitelist for an ApsaraDB for MongoDB instance.

    2. In the Connection Info section of the Basic Information page, view and record the VPC endpoint.

  3. On the Basic Information page, click Log On to go to the DMS console. For more information, see Connect to an ApsaraDB for MongoDB sharded cluster instance by using DMS.

  4. Create a database and a collection on the created instance.

    • On the SQL Console page, run the following command to create a test database:

      use test
    • Run the following command in the test database to create a collection named mongo:

      db.createCollection("mongo")

    For more information, see Create a database and a collection and write data.
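    You can then write test data to the new collection. The following mongo shell commands are a minimal sketch, and the document content is illustrative only:

      // Illustrative only: insert a test document into the mongo collection and read it back.
      db.mongo.insertOne({"initKey": "initValue"})
      db.mongo.find()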

Step 2: Create the connector

Source Connector

  1. Download the MongoDB Connector package (version 1.9.1 is used in this example) and decompress it to your local computer. The following code shows the directory structure:

    Important

    When you download the MongoDB Connector file, select a version that is compatible with Java 8.

    .
    ├── assets
    │   ├── mongodb-leaf.png
    │   └── mongodb-logo.png
    ├── doc
    │   ├── LICENSE.txt
    │   └── README.md
    ├── etc
    │   ├── MongoSinkConnector.properties
    │   └── MongoSourceConnector.properties
    ├── lib
    │   └── mongo-kafka-connect-1.9.1-confluent.jar
    └── manifest.json
  2. Download avro-1.9.2.jar and mongodb-driver-sync-4.9.0.jar from the Maven repository and move the two JAR files to the lib directory of the MongoDB Connector folder. Then, compress the MongoDB Connector folder into a ZIP file and upload the ZIP file to the OSS bucket that you created. For more information, see Get started by using the OSS console.
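    The following shell commands are a minimal sketch of this packaging step. The folder and file names are illustrative and depend on the package version that you downloaded:

      # Illustrative only: copy the downloaded JAR files into lib/ and package the folder as a ZIP file.
      cp avro-1.9.2.jar mongodb-driver-sync-4.9.0.jar mongodb-kafka-connect-mongodb-1.9.1/lib/
      zip -r mongodb-kafka-connect-mongodb-1.9.1.zip mongodb-kafka-connect-mongodb-1.9.1/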

  3. Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  4. In the left-side navigation pane, choose Connector Ecosystem Integration > Tasks.

  5. On the Tasks page, click Create Task.

  6. On the Create Task page, configure the Task Name parameter and follow the on-screen instructions to configure other parameters. The following section describes the parameters:

    • Task Creation

      1. In the Source step, set the Data Provider parameter to Apache Kafka Connect and click Next Step.

      2. In the Connector step, configure the parameters and click Next Step. The following list describes the parameters.

        Kafka Connect Plug-in

        • Bucket: Select the OSS bucket to which the MongoDB Connector file was uploaded.

        • File: Select the MongoDB Connector file that you uploaded to the OSS bucket.

        Message Queue for Apache Kafka Resources

        • Message Queue for Apache Kafka Parameters: Select Source Connect.

        • Message Queue for Apache Kafka Instance: Select the ApsaraMQ for Kafka instance that you created in the "Prerequisites" section of this topic.

        • VPC: Select the VPC that you created.

        • vSwitch: Select the vSwitch that you created.

        • Security Group: Select a security group.

        Kafka Connect

        • Parse .properties File in ZIP Package: Select Create .properties File. Then, select the .properties file that contains the configurations of the source connector from the ZIP package. The path of the .properties file is /etc/MongoSourceConnector.properties.

        Change the values of the related fields in the code editor. The following list describes the fields:

        • connector.class: The class name of the connector. Use the default value for this field.

        • tasks.max: The maximum number of tasks. Set this field to 1.

        • connection.uri: The URI that is used to access the ApsaraDB for MongoDB database. Specify the VPC endpoint that you obtained in Step 1: Create a table. Replace **** in the endpoint with the password of the root account.

        • database: The name of the ApsaraDB for MongoDB database.

        • collection: The name of the ApsaraDB for MongoDB collection.

        • topic.namespace.map: The information about the destination topic. The value of this field is a key-value pair. The key is in the {database}.{collection} format, and the value is the name of the destination topic. This field specifies that data changes in the specified collection are synchronized to the specified topic. Before you synchronize data, you must create the destination topic.

        • copy.existing: Specifies whether to synchronize all existing data in the source ApsaraDB for MongoDB collection to the destination ApsaraMQ for Kafka topic. If you set this field to true, all existing data in the ApsaraDB for MongoDB collection is synchronized to the destination ApsaraMQ for Kafka topic when the connector is started for the first time. To prevent repeated consumption caused by a full synchronization after the connector is recreated, we recommend that you change the value of this field to false after the first full synchronization is complete.

        Sample code:

        connector.class=com.mongodb.kafka.connect.MongoSourceConnector
        name=mongo-source
        batch.size=0
        change.stream.full.document=updateLookup
        collection=testCollection
        connection.uri=mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
        database=testDatabase
        poll.await.time.ms=5000
        poll.max.batch.size=1000
        tasks.max=1
        topic.namespace.map={"testDatabase.testCollection": "targetTopic"}
        # Specifies whether to synchronize all existing data in the source ApsaraDB for MongoDB collection to the destination ApsaraMQ for Kafka topic. 
        # If this parameter is set to true, all historical data in the ApsaraDB for MongoDB collection is synchronized to the downstream ApsaraMQ for Kafka topic when the connector is started for the first time. 
        # We recommend that you change the value of this parameter to false after the first full data synchronization is complete. This prevents repeated data consumption after the connector is deleted and rebuilt. 
        copy.existing=true

        For information about all parameters that are used to create the MongoDB Kafka source connector, see All Source Connector Configuration Properties.

      3. In the Instance step, configure the parameters and click Next Step. The following list describes the parameters.

        Worker Type

        • Worker Type: Select a worker type.

        • Min. Number of Workers: Set this parameter to 1.

        • Max. Number of Workers: Set this parameter to 1.

        Worker Configurations

        Automatically Create Dependencies for Apache Kafka Connector Worker: We recommend that you select this option. After you select this option, the system creates the internal topics and consumer groups that are required to run Kafka Connect in the selected ApsaraMQ for Kafka instance and synchronizes the information to the corresponding parameters in the code editor. The following section describes the configuration items.

        • Offset Topic: the topic that is used to store offset data. The name of the topic is in the connect-eb-offset-<Task name> format.

        • Config Topic: the topic that is used to store the configuration data of connectors and tasks. The name of the topic is in the connect-eb-config-<Task name> format.

        • Status Topic: the topic that is used to store status data of connectors and tasks. The name of the topic is in the connect-eb-status-<Task name> format.

        • Kafka Connect Consumer Group: the consumer group that the Kafka Connect workers use to consume messages in internal topics. The name of the consumer group is in the connect-eb-cluster-<Task name> format.

        • Kafka Source Connector Consumer Group: the consumer group that is used to consume data in the source ApsaraMQ for Kafka topic. This consumer group can be used only by the MongoDB Kafka sink connector. The name of the consumer group is in the connector-eb-cluster-<task name>-<connector name> format.

      4. In the Running Configurations section, set the Log Delivery parameter to Deliver Data to Log Service or Deliver Data to ApsaraMQ for Kafka, select a role on which Kafka Connect depends from the Role drop-down list in the Role Authorization subsection, and then click Save.

        Important

        We recommend that you select a role to which the AliyunSAEFullAccess permission policy is attached. Otherwise, the task may fail to run.

    • Task Property

      Configure the retry policy and dead-letter queue for the task. For more information, see Retry policies and dead-letter queues.

    If the status of the task becomes Running, the connector starts to work as expected.

Sink Connector

  1. Download the MongoDB Connector package (version 1.9.1 is used in this example) and decompress it to your local computer. The following code shows the directory structure:

    Important

    When you download the MongoDB Connector file, select a version that is compatible with Java 8.

    .
    ├── assets
    │   ├── mongodb-leaf.png
    │   └── mongodb-logo.png
    ├── doc
    │   ├── LICENSE.txt
    │   └── README.md
    ├── etc
    │   ├── MongoSinkConnector.properties
    │   └── MongoSourceConnector.properties
    ├── lib
    │   └── mongo-kafka-connect-1.9.1-confluent.jar
    └── manifest.json
  2. Download avro-1.9.2.jar and mongodb-driver-sync-4.9.0.jar from the Maven repository and move the two JAR files to the lib directory of the MongoDB Connector folder. Then, compress the MongoDB Connector folder into a ZIP file and upload the ZIP file to the OSS bucket that you created. For more information, see Get started by using the OSS console.

  3. Log on to the ApsaraMQ for Kafka console. In the Resource Distribution section of the Overview page, select the region where the ApsaraMQ for Kafka instance that you want to manage resides.

  4. In the left-side navigation pane, choose Connector Ecosystem Integration > Tasks.

  5. On the Tasks page, click Create Task.

  6. On the Create Task page, configure the Task Name parameter and follow the on-screen instructions to configure other parameters. The following section describes the parameters:

    • Task Creation

      1. In the Source step, set the Data Provider parameter to Apache Kafka Connect and click Next Step.

      2. In the Connector step, configure the parameters and click Next Step. The following list describes the parameters.

        Kafka Connect Plug-in

        • Bucket: Select the OSS bucket to which the MongoDB Connector file was uploaded.

        • File: Select the MongoDB Connector file that you uploaded to the OSS bucket.

        Message Queue for Apache Kafka Resources

        • Message Queue for Apache Kafka Parameters: Select Sink Connect.

        • Message Queue for Apache Kafka Instance: Select the ApsaraMQ for Kafka instance that you created in the "Prerequisites" section of this topic.

        • VPC: Select the VPC that you created.

        • vSwitch: Select the vSwitch that you created.

        • Security Group: Select a security group.

        Kafka Connect

        • Parse .properties File in ZIP Package: Select Create .properties File. Then, select the .properties file that contains the configurations of the sink connector from the ZIP package. The path of the .properties file is /etc/MongoSinkConnector.properties.

        Change the values of the related fields in the code editor. The following list describes the fields:

        • connector.class: The class name of the connector. Use the default value for this field.

        • tasks.max: The maximum number of tasks.

        • topics: The name of the source topic. Separate multiple topic names with commas (,).

        • connection.uri: The URI that is used to access the ApsaraDB for MongoDB database. Specify the VPC endpoint that you obtained in Step 1: Create a table. Replace **** in the endpoint with the password of the root account.

        • database: The name of the destination ApsaraDB for MongoDB database.

        • collection: The name of the destination ApsaraDB for MongoDB collection.

        Sample code:

        connector.class=com.mongodb.kafka.connect.MongoSinkConnector
        name=mongo-sink
        collection=testCollection
        connection.uri=mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
        database=testDatabase
        delete.on.null.values=false
        key.converter=org.apache.kafka.connect.storage.StringConverter
        key.converter.schemas.enable=false
        max.batch.size=0
        rate.limiting.every.n=0
        rate.limiting.timeout=0
        tasks.max=2
        topics=testTopic
        value.converter=org.apache.kafka.connect.storage.StringConverter
        value.converter.schemas.enable=true
        writemodel.strategy=com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy

        For information about all parameters that are used to create the MongoDB Kafka sink connector, see All Sink Connector Configuration Properties.

      3. In the Instance step, configure the parameters and click Next Step. The following list describes the parameters.

        Worker Type

        • Worker Type: Select a worker type.

        • Min. Number of Workers: Set this parameter to 1.

        • Max. Number of Workers: Set this parameter to 1. The value of this parameter cannot be greater than the value of the tasks.max field.

        Worker Configurations

        Automatically Create Dependencies for Apache Kafka Connector Worker: We recommend that you select this option. After you select this option, the system creates the internal topics and consumer groups that are required to run Kafka Connect in the selected ApsaraMQ for Kafka instance and synchronizes the information to the corresponding parameters in the code editor. The following section describes the configuration items.

        • Offset Topic: the topic that is used to store offset data. The name of the topic is in the connect-eb-offset-<Task name> format.

        • Config Topic: the topic that is used to store the configuration data of connectors and tasks. The name of the topic is in the connect-eb-config-<Task name> format.

        • Status Topic: the topic that is used to store status data of connectors and tasks. The name of the topic is in the connect-eb-status-<Task name> format.

        • Kafka Connect Consumer Group: the consumer group that the Kafka Connect workers use to consume messages in internal topics. The name of the consumer group is in the connect-eb-cluster-<Task name> format.

        • Kafka Source Connector Consumer Group: the consumer group that is used to consume data in the source ApsaraMQ for Kafka topic. This consumer group can be used only by the MongoDB Kafka sink connector. The name of the consumer group is in the connector-eb-cluster-<task name>-<connector name> format.

      4. In the Running Configurations section, set the Log Delivery parameter to Deliver Data to Log Service or Deliver Data to ApsaraMQ for Kafka, select a role on which Kafka Connect depends from the Role drop-down list in the Role Authorization subsection, and then click Save.

        Important

        We recommend that you select a role to which the AliyunSAEFullAccess permission policy is attached. Otherwise, the task may fail to run.

    • Task Property

      Configure the retry policy and dead-letter queue for the task. For more information, see Retry policies and dead-letter queues.

    If the status of the task becomes Running, the connector starts to work as expected.

Step 3: Test the connector

Source Connector

  1. In the DMS console, insert a data record into the collection that you created in Step 1: Create a table. The following sample command inserts a document whose key is testKey and value is testValue into the mongo collection:

    db.mongo.insert({"testKey":"testValue"})
  2. Log on to the ApsaraMQ for Kafka console. On the Instances page, click the name of the instance that you want to manage.

  3. In the left-side navigation pane of the Instance Details page, click Topics. On the page that appears, click the topic that you want to manage. Then, click the Message Query tab to view the inserted message data. The following sample code provides an example of a message value:

    {"_id": {"_data": "826464A63D000000012B022C0100296E5A1004CB11AB15FD6D4C409E37370B43A4B82246645F696400646464A624458CE6B7B626645B****"}, "operationType": "insert", "clusterTime": {"$timestamp": {"t": 1684317757, "i": 1}}, "fullDocument": {"_id": {"$oid": "6464a624458ce6b7b626****"}, "testKey": "testValue"}, "ns": {"db": "test", "coll": "mongo"}, "documentKey": {"_id": {"$oid": "6464a624458ce6b7b626****"}}}

Sink Connector

  1. Log on to the ApsaraMQ for Kafka console. On the Instances page, click the name of the instance that you want to manage.

  2. In the left-side navigation pane of the Instance Details page, click Topics. On the page that appears, click the name of the topic that you want to manage.

  3. In the upper-right corner of the Topic Details page, click Send Message.

  4. In the Start to Send and Consume Message panel, specify the message content. For example, to send a data record whose key is key1 and value is value1, enter the following message content:

    {"key1": "value1"}
  5. In the DMS console, run the following command to view the received data in the destination collection:

    db.mongo.find()

    Sample received data:

    {
        "_id" : ObjectId("643f4d5551daf4552246****"),
        "_insertedTS" : ISODate("2023-05-18T02:09:25.314Z"),
        "_modifiedTS" : ISODate("2023-05-18T02:09:25.314Z"),
        "key1" : "value1"
    }

Common errors and troubleshooting

Error 1: All tasks fail to run

Error message:

All tasks under connector mongo-source failed, please check the error trace of the task.

Solution: On the Message Inflow Task Details page, click Diagnostics in the Basic Information section to go to the Connector Monitoring page. On the Connector Monitoring page, you can view the details of the task failure.

Error 2: Kafka Connect exits

Error message:

Kafka connect exited! Please check the error log /opt/kafka/logs/connect.log on sae application to find out the reason why kafka connect exited and update the event streaming with valid arguments to solve it.

Solution: The status update of Kafka Connect may be delayed. We recommend that you refresh the page. If Kafka Connect still fails, you can perform the following operations to troubleshoot the issue:

  1. In the Worker Information section of the Message Inflow Task Details page, click the instance name to the right of SAE Application to go to the Application Details page.

  2. On the Basic Information page, click the Instance Deployment Information tab.

  3. Click Webshell in the Actions column to log on to the running environment of Kafka Connect.

    • Run the vi /home/admin/connector-bootstrap.log command to view the startup logs of the connector and check whether an error message exists in the logs.

    • Run the vi /opt/kafka/logs/connect.log command to view the running logs of the connector and check whether ERROR-level or WARN-level entries exist. A quick way to filter these entries is shown after this list.
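    The following command is an illustrative way to filter the most recent ERROR and WARN entries in the Kafka Connect log:

      # Illustrative only: show the most recent ERROR and WARN entries in the Kafka Connect running log.
      grep -E "ERROR|WARN" /opt/kafka/logs/connect.log | tail -n 50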

After you troubleshoot the issue based on the error message, you can restart the corresponding task.

Error 3: The verification of connector parameters fails

Error message:

Start or update connector xxx failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):
Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`

Solution: Find the parameter whose value is invalid based on the error message and update the parameter. If you cannot find the parameter based on the error message, you can log on to the running environment of Kafka Connect and run the following command. For information about how to log on to the running environment of Kafka Connect, see Error 2 in this topic.

curl -i -X PUT -H "Accept:application/json" -H  "Content-Type:application/json" -d @$CONNECTOR_PROPERTIES_MAPPING http://localhost:8083/connector-plugins/io.confluent.connect.jdbc.JdbcSinkConnector/config/validate

The verification result of each connector parameter is returned in the response. If the value of a parameter is invalid, the errors field of the parameter is not empty.

"value":{
    "name":"snapshot.mode",
    "value":null,
    "recommended_values":[
        "never",
        "initial_only",
        "when_needed",
        "initial",
        "schema_only",
        "schema_only_recovery"
    ],
    "errors":[
        "Value must be one of never, initial_only, when_needed, initial, schema_only, schema_only_recovery"
    ],
    "visible":true
}

Error 4: The connector fails to connect to the server

Error message:

Start or update connector mongo-source failed. Error code=400. Error message=Connector configuration is invalid and contains the following 1 error(s):Unable to connect to the server.

Solution: Check whether the configurations of the connector are correct. Then, check whether the ApsaraDB for MongoDB instance and the ApsaraMQ for Kafka instance are deployed in the same VPC and vSwitch. If the instances are not deployed in the same vSwitch, add the CIDR block of the vSwitch in which the ApsaraMQ for Kafka instance is deployed to the whitelist of the ApsaraDB for MongoDB instance.

Error 5: The username or the password of the ApsaraDB for MongoDB database contains invalid characters

Error message:

The connection string contains invalid user information. If the username or password contains a colon (:) or an at-sign (@) then it must be urlencoded\nYou can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"}

Solution: Check whether the username or password of the ApsaraDB for MongoDB instance contains at signs (@) or colons (:). If it does, percent-encode these characters before you use them in the connection URI: after URL encoding, an at sign (@) becomes %40 and a colon (:) becomes %3A.
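
For example, assuming a hypothetical root password p@ss:word and a placeholder endpoint, the percent-encoded connection.uri would look like the following sample:

# Illustrative only: the password "p@ss:word" is percent-encoded as "p%40ss%3Aword". The endpoint is a placeholder.
connection.uri=mongodb://root:p%40ss%3Aword@dds-example.mongodb.rds.aliyuncs.com:3717/admin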