ApsaraMQ for Kafka:Use Kafka Connect to synchronize data from an SQL Server database to ApsaraMQ for Kafka

Last Updated: Sep 14, 2023

This tutorial shows you how to synchronize data from an SQL Server database to ApsaraMQ for Kafka by using a source connector of Kafka Connect.

Prerequisites

Before you start this tutorial, make sure that the following operations are complete:

  • An SQL Server source connector is downloaded. For more information, see SQL Server Source Connector.

  • Kafka Connect is downloaded. For more information, see Kafka Connect.

    Note

    The SQL Server source connector supports only Kafka Connect 2.1.0 and later.

  • Docker is downloaded. For more information, see Docker.

Step 1: Configure Kafka Connect

  1. Decompress the downloaded SQL Server source connector package to the directory that you specify as the plug-in installation path in the next step.

  2. In the configuration file connect-distributed.properties of Kafka Connect, configure the plug-in installation path.

    ## Specify the path where the decompressed plug-in is stored. 
    plugin.path=/kafka/connect/plugins
    Important

    In earlier versions of Kafka Connect, the plugin.path parameter is not supported. In this case, you must specify the plug-in path in CLASSPATH instead:

    export CLASSPATH=/kafka/connect/plugins/sqlserver-connector/*
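
    For reference, the following snippet is a minimal sketch of connect-distributed.properties. The endpoint, group ID, and storage topic names are placeholder values that you must replace with your own:

    ## The endpoint that Kafka Connect uses to connect to the Kafka cluster. 
    bootstrap.servers=kafka:9092
    ## The consumer group to which the Kafka Connect workers belong. 
    group.id=connect-cluster
    ## The converters that are used to serialize message keys and values. 
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    ## The topics that store the offsets, configurations, and status of connectors. 
    offset.storage.topic=connect-offsets
    config.storage.topic=connect-configs
    status.storage.topic=connect-status
    ## Specify the path where the decompressed plug-in is stored. 
    plugin.path=/kafka/connect/plugins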

Step 2: Start Kafka Connect

After connect-distributed.properties is configured, perform the following steps to start Kafka Connect:

  1. For access from the Internet, you must first configure java.security.auth.login.config. For access from a virtual private cloud (VPC), skip this step.

    export KAFKA_OPTS="-Djava.security.auth.login.config=kafka_client_jaas.conf"
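
    The kafka_client_jaas.conf file stores the credentials that are used for SASL authentication. The following snippet is a minimal sketch; the username and password are placeholders for the credentials of your instance:

    KafkaClient {
      org.apache.kafka.common.security.plain.PlainLoginModule required
      username="<your instance username>"
      password="<your instance password>";
    };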
  2. Start Kafka Connect.

    bin/connect-distributed.sh config/connect-distributed.properties
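
    After Kafka Connect starts, you can call its REST API on the default port 8083 to check that the worker is running. The following command returns an empty list if no connectors are registered yet:

    curl http://localhost:8083/connectors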

Step 3: Install SQL Server

Important

Change data capture (CDC) is supported only in versions later than SQL Server 2016 SP1. Make sure that your SQL Server version meets this requirement.

  1. Download docker-compose-sqlserver.yaml.

  2. Run the following command to install SQL Server:

    docker-compose -f docker-compose-sqlserver.yaml up
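
    For reference, the following snippet is a sketch of what docker-compose-sqlserver.yaml typically contains. The image tag, edition, and sa password are assumptions and may differ from the downloaded file:

    version: '2'
    services:
      sqlserver:
        image: mcr.microsoft.com/mssql/server:2019-latest
        ports:
          - "1433:1433"
        environment:
          ## Accept the SQL Server license agreement and set the password of the sa account. 
          - ACCEPT_EULA=Y
          - SA_PASSWORD=Password!
          ## CDC requires an edition that includes SQL Server Agent, such as Standard or Developer. 
          - MSSQL_PID=Standard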

Step 4: Configure SQL Server

  1. Download inventory.sql.

  2. Run the following command to initialize the test data in the SQL Server database:

    cat inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
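
    To confirm that the test data is initialized, you can query the tables in the testDB database. The following command is a sketch that assumes the same container name and sa password as the preceding command:

    docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -d testDB -Q "SELECT name FROM sys.tables"'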
  3. Optional: To monitor existing tables in the SQL Server database, perform the following operations:

    1. Run the following commands to enable CDC:

      ## Enable CDC for the database. 
      USE testDB
      GO
      EXEC sys.sp_cdc_enable_db
      GO
    2. Run the following commands to enable CDC for a specified table:

      ## Enable CDC for a specified table. 
      USE testDB
      GO
      
      EXEC sys.sp_cdc_enable_table
      @source_schema = N'dbo',
      @source_name   = N'MyTable',
      @role_name     = N'MyRole',
      @filegroup_name = N'MyDB_CT',
      @supports_net_changes = 1
      GO
    3. Run the following commands to check whether you have the permissions to access the table for which CDC is enabled:

      EXEC sys.sp_cdc_help_change_data_capture
      GO
      Note

      If the returned result is empty, check whether you have the permissions to access the table.

    4. Run the following command to check whether SQL Server Agent is enabled:

      EXEC master.dbo.xp_servicecontrol N'QUERYSTATE',N'SQLSERVERAGENT'
      Note

      If Running is returned, SQL Server Agent is enabled.
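
      If Stopped is returned, you can try to start SQL Server Agent by using the same stored procedure. Note that xp_servicecontrol is an undocumented procedure, so the following command is a sketch and may not work in all environments:

      EXEC master.dbo.xp_servicecontrol N'START',N'SQLSERVERAGENT'
      GO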

Step 5: Start the SQL Server source connector

  1. Download register-sqlserver.json.

  2. Edit register-sqlserver.json.

    • Access from a VPC

      ## The default endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint in the ApsaraMQ for Kafka console. 
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## The value of database.server.name is used as the prefix of the names of the topics that store change data. In this example, you must create a topic named server1 in the ApsaraMQ for Kafka console in advance. 
      ## The change data of each table is recorded in a topic named server1.$DATABASE.$TABLE, such as server1.testDB.products. 
      ## Therefore, you must create all related topics in the ApsaraMQ for Kafka console in advance. 
      "database.server.name": "server1",
      ## Schema changes are recorded in this topic. 
      ## You must create this topic in the ApsaraMQ for Kafka console in advance. 
      "database.history.kafka.topic": "schema-changes-inventory"
    • Access from the Internet

      ## The SSL endpoint of the ApsaraMQ for Kafka instance. You can obtain the endpoint in the ApsaraMQ for Kafka console. 
      "database.history.kafka.bootstrap.servers" : "kafka:9092",
      ## The value of database.server.name is used as the prefix of the names of the topics that store change data. In this example, you must create a topic named server1 in the ApsaraMQ for Kafka console in advance. 
      ## The change data of each table is recorded in a topic named server1.$DATABASE.$TABLE, such as server1.testDB.products. 
      ## Therefore, you must create all related topics in the ApsaraMQ for Kafka console in advance. 
      "database.server.name": "server1",
      ## Schema changes are recorded in this topic. 
      ## You must create this topic in the ApsaraMQ for Kafka console in advance. 
      "database.history.kafka.topic": "schema-changes-inventory",
      ## To access ApsaraMQ for Kafka over the Internet by using the SSL endpoint, add the following configurations: 
      "database.history.producer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.producer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.producer.security.protocol": "SASL_SSL",
      "database.history.producer.sasl.mechanism": "PLAIN",
      "database.history.consumer.ssl.truststore.location": "kafka.client.truststore.jks",
      "database.history.consumer.ssl.truststore.password": "KafkaOnsClient",
      "database.history.consumer.security.protocol": "SASL_SSL",
      "database.history.consumer.sasl.mechanism": "PLAIN",
  3. After you configure register-sqlserver.json, you must create the corresponding topics in the ApsaraMQ for Kafka console based on the configuration. For more information, see Step 1: Create a topic.

    The SQL Server instance that is installed in this tutorial contains a database named testDB, which is created in advance. The database contains the following four tables:

    • customers

    • orders

    • products

    • products_on_hand

    You must call the CreateTopic operation to create the following topics based on the preceding configuration of register-sqlserver.json:

    • server1

    • server1.testDB.customers

    • server1.testDB.orders

    • server1.testDB.products

    • server1.testDB.products_on_hand

    Based on the configuration of register-sqlserver.json, schema change information is stored in the schema-changes-inventory topic. Therefore, you must also create the schema-changes-inventory topic by calling the CreateTopic operation. For more information, see CreateTopic.

  4. Run the following command to start the SQL Server source connector:

    curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
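
    You can then check whether the connector is registered and its task is running. The connector name in the following command is an assumption; use the name field that is specified in your register-sqlserver.json file:

    curl http://localhost:8083/connectors/inventory-connector/status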

Verify the results

Perform the following steps to check whether ApsaraMQ for Kafka can receive change data from SQL Server:

  1. Modify the data in the monitored SQL Server database, as shown in the example after this list.

  2. Log on to the ApsaraMQ for Kafka console. On the Message Query page, query messages to verify that the table change data is synchronized. For more information, see Query messages.
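
For example, the following command updates a row in the products_on_hand table of the testDB database. The table and column names come from the inventory test data, and the container name and password are assumed to match the preceding steps:

    docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -d testDB -Q "UPDATE products_on_hand SET quantity = quantity + 1 WHERE product_id = 101"'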