This topic describes how to create a MySQL source connector and use DataWorks to synchronize data from ApsaraDB RDS for MySQL to topics in your ApsaraMQ for Kafka instance.

Prerequisites

Before you begin, make sure that the following requirements are met:
  • The connector feature is enabled for your ApsaraMQ for Kafka instance. For more information, see Enable the connector feature.
    Important Make sure that your ApsaraMQ for Kafka instance is deployed in the China (Shenzhen), China (Chengdu), China (Beijing), China (Zhangjiakou), China (Hangzhou), China (Shanghai), or Singapore region.
  • An ApsaraDB RDS for MySQL instance is created. For information about how to create an ApsaraDB RDS for MySQL instance, see Create an ApsaraDB RDS for MySQL instance.
  • A database and a database account are created in the ApsaraDB RDS for MySQL instance. For information about how to create a database and a database account, see Create databases and accounts for an ApsaraDB RDS for MySQL instance.
  • A table is created in the database. For information about the common SQL statements that are used in ApsaraDB RDS for MySQL, see Commonly used SQL statements for MySQL.
  • DataWorks is authorized to access your elastic network interfaces (ENIs) regardless of whether you use an Alibaba Cloud account or a Resource Access Management (RAM) user. To grant permissions to an account, go to the Cloud Resource Access Authorization page.
    Important If you use a RAM user, make sure that the RAM user is granted the following permissions:
    • AliyunDataWorksFullAccess: the permissions to manage all DataWorks resources within the Alibaba Cloud account.
    • AliyunBSSOrderAccess: the permissions to purchase Alibaba Cloud services.

    For information about how to attach policies to RAM users, see Step 2: Grant permissions to RAM users.

  • Both the data source and the data destination are created by using your account. The data source is an ApsaraDB RDS for MySQL instance. The data destination is your ApsaraMQ for Kafka instance.
  • The CIDR block of the virtual private cloud (VPC) where the ApsaraDB RDS for MySQL instance resides and the CIDR block of the VPC where the ApsaraMQ for Kafka instance resides do not overlap. If the CIDR blocks overlap, a MySQL source connector cannot be created in ApsaraMQ for Kafka.
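A quick way to verify the last requirement before you create the connector is to compare the two CIDR blocks programmatically. The following Python sketch uses the standard ipaddress module; the sample CIDR blocks are hypothetical placeholders, not values from your environment.

```python
import ipaddress

def cidrs_overlap(cidr_a: str, cidr_b: str) -> bool:
    """Return True if the two CIDR blocks share any addresses."""
    return ipaddress.ip_network(cidr_a).overlaps(ipaddress.ip_network(cidr_b))

# Hypothetical example values: replace them with the CIDR blocks of the two VPCs.
print(cidrs_overlap("172.16.0.0/16", "192.168.0.0/16"))  # False: safe to proceed
print(cidrs_overlap("172.16.0.0/16", "172.16.8.0/24"))   # True: blocks overlap
```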

Background information

You can create a MySQL source connector in the ApsaraMQ for Kafka console to synchronize data from tables in your ApsaraDB RDS for MySQL instance to topics in your ApsaraMQ for Kafka instance. The connector is created and run by using DataWorks, as shown in the following figure.

mysql_connector

After a MySQL source connector is created in the ApsaraMQ for Kafka console, DataWorks Basic Edition is automatically activated free of charge. DataWorks Basic Edition creates DataWorks workspaces and exclusive resource groups for data integration. The DataWorks workspaces are free of charge, and exclusive resource groups are paid services. The specifications of an exclusive resource group for data integration are 4 vCPUs and 8 GB memory. Resource groups support monthly subscriptions. By default, an exclusive resource group for data integration is automatically renewed upon expiration. For more information about the billing of DataWorks, see Overview.

In addition, DataWorks automatically generates destination topics in ApsaraMQ for Kafka based on the configurations of your MySQL source connector. Source tables and destination topics are related based on one-to-one mappings. By default, DataWorks generates a topic that contains six partitions for each table that has a primary key and a topic that contains one partition for each table that does not have a primary key. Make sure that the numbers of topics and partitions that are still available in your ApsaraMQ for Kafka instance are sufficient. Otherwise, the MySQL source connector fails to be created because DataWorks fails to create the topics.

The name of each topic is in the <Specified prefix>_<Name of the source table> format. The underscore (_) is automatically added by the system. The following figure provides an example.

table_topic_match

In this example, the specified prefix is mysql. The source tables to be synchronized are table_1, table_2, ..., and table_n. DataWorks automatically generates topics for you to receive the data that is synchronized from the source tables. The topics are named mysql_table_1, mysql_table_2, ..., and mysql_table_n.
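The naming and partitioning rules described above can be sketched in a few lines of Python. This is only an illustration of the rules stated in this topic, not code from DataWorks; the table list is hypothetical.

```python
def destination_topic(prefix: str, table: str, has_primary_key: bool) -> tuple:
    """Return the generated topic name and its default partition count."""
    partitions = 6 if has_primary_key else 1   # default rule described in this topic
    return (f"{prefix}_{table}", partitions)   # underscore is added by the system

# Hypothetical source tables: (table name, whether the table has a primary key)
tables = [("table_1", True), ("table_2", False)]
for name, has_pk in tables:
    print(destination_topic("mysql", name, has_pk))
# ('mysql_table_1', 6)
# ('mysql_table_2', 1)
```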

Precautions

  • Regions
    • If your data source and destination ApsaraMQ for Kafka instance are deployed in different regions, make sure that you have a Cloud Enterprise Network (CEN) instance within the Alibaba Cloud account and that the VPCs of your data source and ApsaraMQ for Kafka instance are attached to the CEN instance. In addition, make sure that sufficient bandwidth is configured for cross-region connections to ensure end-to-end connectivity between the data source and the data destination.

      Otherwise, a CEN instance may be automatically created, and the VPCs of your destination ApsaraMQ for Kafka instance and the Elastic Compute Service (ECS) instance where your exclusive resource group resides are attached to the CEN instance to ensure end-to-end connectivity. The bandwidth of the automatically created CEN instance is extremely low because it is not manually configured. This may cause a connectivity error when you create a MySQL source connector or when the connector is running.

    • If your data source and destination ApsaraMQ for Kafka instance are deployed in the same region, an ENI is automatically created in the VPC of the data source or the ApsaraMQ for Kafka instance when you create a MySQL source connector. The ENI is also automatically bound to the ECS instance where your exclusive resource group resides. This ensures end-to-end connectivity between the data source and the data destination.
  • Exclusive resource groups in DataWorks
    • DataWorks allows you to use each exclusive resource group to run up to three MySQL source connectors. When you create a MySQL source connector, if DataWorks finds an existing resource group that runs fewer than three MySQL source connectors, DataWorks uses that resource group to run the new connector.
    • Each exclusive resource group in DataWorks can be associated with the ENIs of up to two VPCs. DataWorks may fail to create a MySQL source connector by using a resource group if the ENI to be associated with the resource group and the ENIs that are already associated with it have overlapping CIDR blocks, or due to other technical limits. In this case, even if fewer than three connectors run on the resource group, DataWorks still creates a new resource group to ensure that the MySQL source connector can be created.

Create and deploy a MySQL source connector

  1. Log on to the ApsaraMQ for Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance is deployed.
  3. In the left-side navigation pane, click Connectors.
  4. On the Connectors page, select the instance in which the data source topic resides from the Select Instance drop-down list and click Create Connector.
  5. In the Create Connector wizard, perform the following operations:
    1. In the Configure Basic Information step, enter the connector name in the Name field and click Next.
      The following table describes the parameters.

      Parameter: Name
      Description: The name of the connector. Specify a connector name based on the following naming conventions:
      • The connector name must be 1 to 48 characters in length and can contain digits, lowercase letters, and hyphens (-). The name cannot start with a hyphen (-).
      • Each connector name must be unique within an ApsaraMQ for Kafka instance.

      The connector must use a consumer group that is named in the connect-<Connector name> format. If you have not created this consumer group, ApsaraMQ for Kafka automatically creates it for you.
      Example: kafka-source-mysql

      Parameter: Instance
      Description: The information about the ApsaraMQ for Kafka instance. By default, the name and ID of the instance are displayed.
      Example: demo alikafka_post-cn-st21p8vj****
    2. In the Configure Source Service step, select ApsaraDB RDS for MySQL as the source service, configure the parameters that are described in the following table, and then click Next.
      The following table describes the parameters.

      Parameter: Region of ApsaraDB RDS for MySQL Instance
      Description: The region ID of the ApsaraDB RDS for MySQL instance. Select a region ID from the drop-down list.
      Example: China (Shenzhen)

      Parameter: ApsaraDB RDS for MySQL Instance ID
      Description: The ID of the ApsaraDB RDS for MySQL instance from which you want to synchronize data.
      Example: rm-wz91w3vk6owmz****

      Parameter: Database Name
      Description: The name of the ApsaraDB RDS for MySQL database from which you want to synchronize data.
      Example: mysql-to-kafka

      Parameter: Database Account
      Description: The username of the account that is used to access the ApsaraDB RDS for MySQL database.
      Example: mysql_to_kafka

      Parameter: Password of Database Account
      Description: The password of the account that is used to access the ApsaraDB RDS for MySQL database.
      Example: N/A

      Parameter: Database Table
      Description: The name of one or more ApsaraDB RDS for MySQL tables from which you want to synchronize data. Separate multiple table names with commas (,). Source tables and destination topics are related based on one-to-one mappings.
      Example: mysql_tbl

      Parameter: Tables to Automatically Add
      Description: The regular expression based on which the system automatically adds other tables in the database. When the system detects that a table that matches the regular expression is created in the database, the system automatically synchronizes the data in the table to ApsaraMQ for Kafka. You can use .* to specify all tables in the database.
      Example: .*

      Parameter: Topic Prefix
      Description: The prefix that is used to name the automatically created topics for the connector in ApsaraMQ for Kafka. Each topic name consists of the prefix and the name of the corresponding source table in the ApsaraDB RDS for MySQL database. Make sure that the prefix is globally unique.
      Example: mysql
      Important
      Make sure that the ApsaraDB RDS for MySQL database account is granted at least the following permissions:
      • SELECT
      • REPLICATION SLAVE
      • REPLICATION CLIENT
      The following statement provides an example on how to grant permissions to the account:
      GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '<Username of the account>'@'%'; -- Grant the SELECT, REPLICATION SLAVE, and REPLICATION CLIENT permissions to the database account.
    3. In the Configure Destination Service step, the destination ApsaraMQ for Kafka instance to which you want to synchronize the data appears. Confirm the information and click Create.
  6. Go to the Connectors page, find the connector that you created, and then click Deploy in the Actions column.
    On the Connectors page, if Running is displayed in the Status column of the connector, the connector is deployed and works as expected.
    Note If the connector fails to be created, check whether all the prerequisites that are described in this topic are met.

    If you need to modify the configurations of the MySQL source connector, click Task Configurations in the Actions column. In the DataWorks console to which you are redirected, modify the configurations of the connector.

Verify the result

  1. Insert data into a source table in the ApsaraDB RDS for MySQL database.
    The following sample code provides an example on how to insert data into a source table:
    INSERT INTO mysql_tbl
        (mysql_title, mysql_author, submission_date)
        VALUES
        ("mysql2kafka", "tester", NOW());
    For more information about SQL statements, see Commonly used SQL statements for MySQL.
  2. Use the message query feature of ApsaraMQ for Kafka to check whether the data of the source table in the ApsaraDB RDS for MySQL database is synchronized to the corresponding topic in your ApsaraMQ for Kafka instance.
    For more information, see Query messages.
    The following sample code provides an example of the data that is synchronized from a source table in ApsaraDB RDS for MySQL to a topic in ApsaraMQ for Kafka. For more information about message structures and fields, see Appendix: Message formats.
    {
        "schema":{
            "dataColumn":[
                {
                    "name":"mysql_id",
                    "type":"LONG"
                },
                {
                    "name":"mysql_title",
                    "type":"STRING"
                },
                {
                    "name":"mysql_author",
                    "type":"STRING"
                },
                {
                    "name":"submission_date",
                    "type":"DATE"
                }
            ],
            "primaryKey":[
                "mysql_id"
            ],
            "source":{
                "dbType":"MySQL",
                "dbName":"mysql_to_kafka",
                "tableName":"mysql_tbl"
            }
        },
        "payload":{
            "before":null,
            "after":{
                "dataColumn":{
                    "mysql_title":"mysql2kafka",
                    "mysql_author":"tester",
                    "submission_date":1614700800000
                }
            },
            "sequenceId":"1614748790461000000",
            "timestamp":{
                "eventTime":1614748870000,
                "systemTime":1614748870925,
                "checkpointTime":1614748870000
            },
            "op":"INSERT",
            "ddl":null
        },
        "version":"0.0.1"
    }
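A consumer application can read such a message by parsing the JSON and extracting fields from the payload section. The following Python sketch parses a trimmed copy of the sample message above; it is an illustration only, and how you receive the raw message bytes depends on your Kafka client.

```python
import json
from datetime import datetime, timezone

# A trimmed version of the sample message above; only the fields read below are kept.
raw = """
{
  "schema": {"source": {"dbType": "MySQL", "dbName": "mysql_to_kafka", "tableName": "mysql_tbl"}},
  "payload": {
    "op": "INSERT",
    "after": {"dataColumn": {"mysql_title": "mysql2kafka",
                             "mysql_author": "tester",
                             "submission_date": 1614700800000}},
    "timestamp": {"eventTime": 1614748870000}
  }
}
"""

msg = json.loads(raw)
table = msg["schema"]["source"]["tableName"]   # source table name
row = msg["payload"]["after"]["dataColumn"]    # column values after the change
# Date and timestamp values arrive as epoch milliseconds; convert to UTC datetimes.
submitted = datetime.fromtimestamp(row["submission_date"] / 1000, tz=timezone.utc)
print(table, msg["payload"]["op"], row["mysql_title"], submitted.isoformat())
```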