This topic describes how to create a MySQL source connector and synchronize data from ApsaraDB RDS for MySQL to topics in your Message Queue for Apache Kafka instance by using DataWorks.

Prerequisites

Before you synchronize data, make sure that the following operations are completed:
  • The connector feature is enabled for the Message Queue for Apache Kafka instance. For more information, see Enable the connector feature.
    Notice Your Message Queue for Apache Kafka instance is deployed in the China (Shenzhen), China (Chengdu), China (Beijing), China (Zhangjiakou), China (Hangzhou), China (Shanghai), or Singapore (Singapore) region.
  • An ApsaraDB RDS for MySQL instance is created. For more information, see Create an ApsaraDB RDS for MySQL instance.
  • Databases and accounts are created in the ApsaraDB RDS for MySQL instance. For more information, see Create accounts and databases for an ApsaraDB RDS for MySQL instance.
  • Tables are created in the databases. For more information about the SQL statements that are frequently used in ApsaraDB RDS for MySQL, see Commonly used SQL statements for MySQL.
  • DataWorks is authorized to access your elastic network interfaces (ENIs), regardless of whether you use an Alibaba Cloud account or a Resource Access Management (RAM) user. To grant permissions, go to the Cloud Resource Access Authorization page.
    Notice If you use a RAM user, make sure that the RAM user is granted the following permissions:
    • AliyunDataWorksFullAccess: the permission to manage all the DataWorks resources within the Alibaba Cloud account.
    • AliyunBSSOrderAccess: the permission to purchase an Alibaba Cloud service.

    For more information about how to attach policies to RAM users, see Step 2: Grant permissions to the RAM user.

  • Both the data source and the data destination are created by using your account. The data source is an ApsaraDB RDS for MySQL instance. The data destination is your Message Queue for Apache Kafka instance.
  • The CIDR block of the virtual private cloud (VPC) where the ApsaraDB RDS for MySQL instance is located and the CIDR block of the VPC where the Message Queue for Apache Kafka instance is located do not overlap. If the CIDR blocks overlap, a data synchronization task cannot be created in Message Queue for Apache Kafka.
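The VPC CIDR requirement in the last prerequisite can be checked locally before you create the task. The following Python sketch uses the standard `ipaddress` module; the two CIDR values are placeholders for your own VPC configurations:

```python
import ipaddress

def cidrs_overlap(cidr_a: str, cidr_b: str) -> bool:
    """Return True if the two CIDR blocks share any addresses."""
    return ipaddress.ip_network(cidr_a).overlaps(ipaddress.ip_network(cidr_b))

# Placeholder CIDR blocks; replace with your own VPC values.
rds_vpc_cidr = "172.16.0.0/16"     # VPC of the ApsaraDB RDS for MySQL instance
kafka_vpc_cidr = "192.168.0.0/16"  # VPC of the Message Queue for Apache Kafka instance

if cidrs_overlap(rds_vpc_cidr, kafka_vpc_cidr):
    print("CIDR blocks overlap: the data synchronization task cannot be created.")
else:
    print("CIDR blocks do not overlap.")
```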

Background information

You can create a data synchronization task in the Message Queue for Apache Kafka console to synchronize data from tables in your ApsaraDB RDS for MySQL instance to topics in your Message Queue for Apache Kafka instance. The data synchronization task is created and run by using DataWorks, as shown in the following figure.

Figure: mysql_connector

After a data synchronization task is created in the Message Queue for Apache Kafka console, DataWorks Basic Edition is automatically activated for free. DataWorks Basic Edition allows you to create DataWorks workspaces for free and create exclusive resource groups for Data Integration that incur fees. The specifications of an exclusive resource group for Data Integration are 4 vCPUs and 8 GB memory. The resource groups are available for monthly subscriptions. By default, an exclusive resource group for Data Integration is automatically renewed upon expiration. For more information about the billing of DataWorks, see Overview.

In addition, DataWorks automatically generates destination topics in Message Queue for Apache Kafka based on the configurations of your data synchronization task. Source tables and destination topics have one-to-one mappings. By default, DataWorks generates a topic with six partitions for each table that has a primary key and a topic with one partition for each table that does not have a primary key. Make sure that your Message Queue for Apache Kafka instance has sufficient quota for the total numbers of topics and partitions that DataWorks generates. Otherwise, topic creation fails and the task reports an error.
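The default partition rule described above makes it easy to estimate the quota that a task will consume before you create it. The following Python sketch applies that rule; the table names and primary-key flags are hypothetical examples, not values read from your database:

```python
def required_partitions(tables: dict) -> tuple:
    """tables maps a source table name to True if the table has a
    primary key. Returns (topics, partitions) that DataWorks
    generates by default: one topic per table, with six partitions
    for a table that has a primary key and one partition otherwise."""
    topics = len(tables)
    partitions = sum(6 if has_pk else 1 for has_pk in tables.values())
    return topics, partitions

# Hypothetical source tables: two with primary keys, one without.
tables = {"orders": True, "customers": True, "audit_log": False}
topics, partitions = required_partitions(tables)
print(f"{topics} topics, {partitions} partitions needed")  # 3 topics, 13 partitions
```

For example, ten tables with primary keys plus two tables without them would require 12 topics and 62 partitions of quota.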

Each topic is named in the <Specified prefix>_<Name of the corresponding source table> format. The underscore (_) is automatically added by the system. The following figure provides an example.

Figure: table_topic_match

In this example, the specified prefix is mysql. The source tables to be synchronized are table_1, table_2, ..., and table_n. DataWorks automatically generates topics for you to receive the data that is synchronized from the source tables. The topics are named mysql_table_1, mysql_table_2, ..., and mysql_table_n.
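The naming rule above can be expressed as a one-line function. The following Python sketch reproduces the mapping from the example; the table names are the placeholders from the figure:

```python
def topic_name(prefix: str, table: str) -> str:
    """DataWorks joins the specified prefix and the source table name
    with an underscore that the system adds automatically."""
    return f"{prefix}_{table}"

tables = ["table_1", "table_2", "table_n"]
print([topic_name("mysql", t) for t in tables])
# ['mysql_table_1', 'mysql_table_2', 'mysql_table_n']
```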

Usage notes

  • Regions
    • Your data source and Message Queue for Apache Kafka instance may not be in the same region. In this case, make sure that you have a Cloud Enterprise Network (CEN) instance within your Alibaba Cloud account. Make sure that the VPCs of the data source and the Message Queue for Apache Kafka instance are attached to the CEN instance. In addition, make sure that the configurations of traffic and bandwidth are complete and that end-to-end connectivity is available.

      Otherwise, a CEN instance may be automatically created. In this case, the VPCs of your destination Message Queue for Apache Kafka instance and Elastic Compute Service (ECS) instances of your exclusive resource group are attached to the CEN instance to ensure end-to-end connectivity. However, the bandwidth of the automatically created CEN instance is extremely low because the bandwidth is not manually configured. Therefore, a network access error may occur when you create a data synchronization task or when the data synchronization task is running.

    • If your data source and Message Queue for Apache Kafka instance are in the same region, an ENI is automatically created in the VPC of the data source or the Message Queue for Apache Kafka instance when you create a data synchronization task. The ENI is also automatically bound to the ECS instances of your exclusive resource group to ensure end-to-end connectivity.
  • Exclusive resource groups in DataWorks
    • DataWorks allows each exclusive resource group to run up to three data synchronization tasks. If DataWorks finds an existing resource group that is running fewer than three data synchronization tasks when you create a task, DataWorks uses that resource group to run the new task.
    • Each exclusive resource group in DataWorks can be bound to the ENIs of up to two VPCs. If the ENI that is already bound to the existing resource group and the ENI that needs to be bound have overlapping CIDR blocks, or another technical limitation applies, DataWorks cannot use the existing resource group to create the data synchronization task. In this case, DataWorks creates a new resource group to ensure that the task can be created, even if the existing resource group is running fewer than three tasks.
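The reuse rules above can be summarized as a small decision function. The following Python sketch is a simplified model of that behavior, not the DataWorks implementation; the `ResourceGroup` structure, group names, and CIDR values are assumptions for illustration:

```python
import ipaddress
from dataclasses import dataclass, field

MAX_TASKS_PER_GROUP = 3  # DataWorks limit per exclusive resource group

@dataclass
class ResourceGroup:
    name: str
    running_tasks: int
    bound_eni_cidrs: list = field(default_factory=list)  # CIDRs of VPCs whose ENIs are bound

def can_reuse(group: ResourceGroup, new_eni_cidr: str) -> bool:
    """A group is reusable only if it runs fewer than three tasks and the
    new ENI's CIDR does not overlap any CIDR already bound to the group."""
    if group.running_tasks >= MAX_TASKS_PER_GROUP:
        return False
    new_net = ipaddress.ip_network(new_eni_cidr)
    return all(not new_net.overlaps(ipaddress.ip_network(c))
               for c in group.bound_eni_cidrs)

def pick_group(groups, new_eni_cidr):
    """Reuse the first suitable existing group; otherwise a new group is created."""
    for g in groups:
        if can_reuse(g, new_eni_cidr):
            return g.name
    return "new-resource-group"
```

For example, a group that runs two tasks but is bound to an ENI in 10.0.0.0/16 cannot take a new ENI in 10.0.1.0/24, so a new resource group would be created despite the free task slot.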

Create and deploy a MySQL source connector

  1. Log on to the Message Queue for Apache Kafka console.
  2. In the Resource Distribution section of the Overview page, select the region where your instance resides.
  3. On the Instances page, click the name of the instance that you want to manage.
  4. In the left-side navigation pane, click Connectors.
  5. On the Connectors page, click Create Connector.
  6. In the Create Connector wizard, perform the following steps:
    1. In the Configure Basic Information step, enter the connector name in the Name field and click Next.
      Parameter Description Example
      Name The name of the connector. Take note of the following rules when you specify a connector name:
      • The connector name can be up to 48 characters in length. It can contain digits, lowercase letters, and hyphens (-), but cannot start with a hyphen (-).
      • The connector name must be unique within the Message Queue for Apache Kafka instance.

      The data synchronization task of the connector must use a consumer group that is named in the connect-<Task name> format. If you have not manually created such a consumer group, the system automatically creates one for you.

      kafka-source-mysql
      Instance The Message Queue for Apache Kafka instance. By default, the name and ID of the current instance are displayed. demo alikafka_post-cn-st21p8vj****
    2. In the Configure Source Service step, select ApsaraDB RDS for MySQL for Data Source, set the parameters that are described in the following table, and then click Next.
      Parameter Description Example
      Region of ApsaraDB RDS for MySQL Instance The ID of the region where the ApsaraDB RDS for MySQL instance is located. Select an ID from the drop-down list. China (Shenzhen)
      ApsaraDB RDS for MySQL Instance ID The ID of the ApsaraDB RDS for MySQL instance from which data is to be synchronized. rm-wz91w3vk6owmz****
      Database Name The name of the ApsaraDB RDS for MySQL database from which data is to be synchronized. mysql-to-kafka
      Database Account The account that is used to access the ApsaraDB RDS for MySQL database. mysql_to_kafka
      Password of Database Account The password that is used to access the ApsaraDB RDS for MySQL database. None
      Database Table The name of the ApsaraDB RDS for MySQL table from which data is to be synchronized. Separate multiple table names with commas (,).

      Source tables and destination topics have one-to-one mappings.

      mysql_tbl
      Topic Prefix The prefix used to name the topics that are to be created in Message Queue for Apache Kafka. Each topic name consists of the prefix and the name of the corresponding source table in the ApsaraDB RDS for MySQL database. Make sure that the prefix is globally unique. mysql
      Notice
      Make sure that the ApsaraDB RDS for MySQL database account is granted at least the following permissions:
      • SELECT
      • REPLICATION SLAVE
      • REPLICATION CLIENT
      The following code provides an example on how to grant permissions to the account:
      GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'Account for data synchronization'@'%'; -- Grant the SELECT, REPLICATION SLAVE, and REPLICATION CLIENT permissions to the account.
    3. In the Configure Destination Service step, the destination Message Queue for Apache Kafka instance to which the data is to be synchronized appears. Confirm the information and click Create.
  7. After the connector is created, go to the Connectors page, find the connector that you created, and then click Deploy in the Actions column.
    On the Connectors page, if the value of Status of the data synchronization task is Running, the task is created.
    Note If the task fails to be created, check whether all the prerequisites that are described in this topic are met.
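One common cause of failure is a database account that lacks the privileges required in the Configure Source Service step. As a quick local sanity check, the output of SHOW GRANTS for the account can be scanned for the three required privileges. The following Python sketch is a simplified substring check, and the sample grants line is illustrative only:

```python
REQUIRED = {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}

def missing_privileges(show_grants_output: str) -> set:
    """Return the required privileges that do not appear in the
    SHOW GRANTS output. ALL PRIVILEGES covers everything."""
    text = show_grants_output.upper()
    if "ALL PRIVILEGES" in text:
        return set()
    return {p for p in REQUIRED if p not in text}

# Illustrative SHOW GRANTS output for the synchronization account.
grants = "GRANT SELECT, REPLICATION SLAVE ON *.* TO 'mysql_to_kafka'@'%'"
print(missing_privileges(grants))  # {'REPLICATION CLIENT'}
```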

Verify the results

  1. Insert data into a data source table in the ApsaraDB RDS for MySQL database.
    The following sample code provides an example:
    INSERT INTO mysql_tbl
        (mysql_title, mysql_author, submission_date)
        VALUES
        ("mysql2kafka", "tester", NOW());
    For more information about the SQL statements that are frequently used in ApsaraDB RDS for MySQL, see Commonly used SQL statements for MySQL.
  2. Use the message query feature of Message Queue for Apache Kafka to verify whether the data of the table in the ApsaraDB RDS for MySQL database can be synchronized to a topic in your Message Queue for Apache Kafka instance.
    For more information, see Query messages.