ApsaraDB for SelectDB: Use SeaTunnel to import data

Last Updated: Dec 09, 2024

ApsaraDB for SelectDB is integrated with SeaTunnel. This topic describes how to use SeaTunnel SelectDB Sink to import table data into ApsaraDB for SelectDB.

Overview

SeaTunnel is an easy-to-use, high-performance distributed data integration platform that supports real-time synchronization of large amounts of data. You can use SeaTunnel to read large amounts of data from data sources such as MySQL, Hive, and Kafka, and then use SeaTunnel SelectDB Sink to write the data to ApsaraDB for SelectDB.

Prerequisites

SeaTunnel 2.3.1 or later is installed.

How it works

SeaTunnel allows you to write upstream data in the JSON or CSV format to ApsaraDB for SelectDB. The sink configurations vary with the data format.

JSON format

sink {
  SelectDBCloud {
    load-url="ip:http_port"
    jdbc-url="ip:mysql_port"
    cluster-name="Cluster"
    table.identifier="test_db.test_table"
    username="admin"
    password="****"
    selectdb.config {
      file.type="json"
    }
  }
}

CSV format

sink {
  SelectDBCloud {
    load-url="ip:http_port"
    jdbc-url="ip:mysql_port"
    cluster-name="Cluster"
    table.identifier="test_db.test_table"
    username="admin"
    password="****"
    selectdb.config {
      file.type="csv"
      file.column_separator=","
      file.line_delimiter="\n"
    }
  }
}

The following table describes the parameters in the sink configurations.

Parameter | Required | Description
load-url | Yes | The endpoint and HTTP port that are used to access the ApsaraDB for SelectDB instance. To obtain the virtual private cloud (VPC) endpoint or public endpoint and the HTTP port, log on to the ApsaraDB for SelectDB console and go to the Instance Details page of the instance. In the Network Information section of the Basic Information page, view the values of the VPC Endpoint or Public Endpoint parameter and the HTTP Port parameter. Example: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080.
jdbc-url | Yes | The endpoint and MySQL port that are used to access the ApsaraDB for SelectDB instance. To obtain the VPC endpoint or public endpoint and the MySQL port, log on to the ApsaraDB for SelectDB console and go to the Instance Details page of the instance. In the Network Information section of the Basic Information page, view the values of the VPC Endpoint or Public Endpoint parameter and the MySQL Port parameter. Example: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030.
cluster-name | Yes | The name of the cluster in the ApsaraDB for SelectDB instance. An ApsaraDB for SelectDB instance may contain multiple clusters. Select a cluster based on your business requirements.
username | Yes | The username that is used to connect to the ApsaraDB for SelectDB instance.
password | Yes | The password that is used to connect to the ApsaraDB for SelectDB instance.
table.identifier | Yes | The name of the table in the ApsaraDB for SelectDB instance. Specify the name in the format of Database name.Table name. Example: test_db.test_table.
selectdb.config | Yes | The configurations of the data import job. CSV format: selectdb.config { file.type="csv" file.column_separator="," file.line_delimiter="\n" }. JSON format: selectdb.config { file.type="json" file.strip_outer_array="false" }.
sink.enable-delete | No | Specifies whether to enable the bulk deletion feature. This feature is supported only for the unique key model.
sink.buffer-size | No | The maximum buffer size. Unit: bytes. The default value is equivalent to 10 MB. If the buffer size exceeds the upper limit, all the content in the buffer is flushed to Object Storage Service (OSS). We recommend that you use the default value.
sink.buffer-count | No | The maximum number of data records that can be buffered. Default value: 10000. If the number of buffered data records exceeds the upper limit, all the content in the buffer is flushed to OSS. We recommend that you use the default value.
sink.max-retries | No | The maximum number of retries in the commit phase. Default value: 3.
sink.enable-2pc | No | Specifies whether to enable the two-phase commit mode. You can enable the two-phase commit mode to ensure exactly-once semantics. Default value: true.
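
As a rough sketch of how the optional parameters fit into a sink block, the following shell snippet writes a configuration fragment in which each optional parameter is set explicitly. The file name sink_optional.conf and the placeholder endpoints are assumptions for illustration. The values for sink.buffer-size, sink.buffer-count, sink.max-retries, and sink.enable-2pc restate the defaults described above (10 MB is taken as 10485760 bytes), and sink.enable-delete is set to false only as an example value.

```shell
# Write a sink fragment that spells out the optional parameters.
# The file name and the placeholder endpoints are illustrative assumptions.
cat > sink_optional.conf <<'EOF'
sink {
  SelectDBCloud {
    load-url = "ip:http_port"
    jdbc-url = "ip:mysql_port"
    cluster-name = "Cluster"
    table.identifier = "test_db.test_table"
    username = "admin"
    password = "****"
    sink.enable-delete = false     # example value; applies only to the unique key model
    sink.buffer-size = 10485760    # 10 MB expressed in bytes
    sink.buffer-count = 10000
    sink.max-retries = 3
    sink.enable-2pc = true
    selectdb.config {
      file.type = "json"
    }
  }
}
EOF
```

Because these values match the documented defaults, omitting the optional lines should behave the same way. Listing them is mainly useful as a starting point when a value needs to be tuned.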

Example

In this example, SeaTunnel is used to import data from an upstream MySQL database to ApsaraDB for SelectDB. The following table describes the software versions in the example.

Environment | Version
Java Development Kit (JDK) | 1.8
SeaTunnel | 2.3.3
ApsaraDB for SelectDB | 3.0.4

Prepare the environment

  1. Configure SeaTunnel.

    1. Download and decompress the SeaTunnel installation package, and then go to the decompressed directory, which is referred to as SEATUNNEL_HOME in the following steps. In this example, the SeaTunnel installation package apache-seatunnel-2.3.3-bin.tar.gz is used.

      wget https://dlcdn.apache.org/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz
      tar -xzvf apache-seatunnel-2.3.3-bin.tar.gz
      cd apache-seatunnel-2.3.3
    2. Modify the SEATUNNEL_HOME/config/plugin_config file. Retain only the required connectors.

      --connectors-v2--
      connector-cdc-mysql
      connector-selectdb-cloud
      connector-jdbc
      connector-fake
      connector-console
      connector-assert
      --end--
    3. Install the SeaTunnel connector plug-in.

      sh bin/install-plugin.sh
    4. Download the MySQL driver and place it in the SEATUNNEL_HOME/lib directory.

      cd lib/
      wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
      cd ..
  2. Construct the data to be imported. In this example, a small amount of sample data is constructed in a MySQL database for import.

    1. Create a test table in the MySQL database.

      CREATE TABLE `employees` (
        `emp_no` INT NOT NULL,
        `birth_date` DATE NOT NULL,
        `first_name` VARCHAR(14) NOT NULL,
        `last_name` VARCHAR(16) NOT NULL,
        `gender` ENUM('M','F') NOT NULL,
        `hire_date` DATE NOT NULL,
        PRIMARY KEY (`emp_no`)
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
    2. Use Data Management (DMS) to generate test data. For more information, see Generate test data.

  3. Configure an ApsaraDB for SelectDB instance.

    1. Create an ApsaraDB for SelectDB instance. For more information, see Create an instance.

    2. Connect to the ApsaraDB for SelectDB instance over the MySQL protocol. For more information, see Connect to an instance.

    3. Create a test database and a test table.

      1. Create a test database.

        CREATE DATABASE test_db;
      2. Create a test table.

        USE test_db;
        CREATE TABLE employees (
            emp_no       INT NOT NULL,
            birth_date   DATE,
            first_name   VARCHAR(20),
            last_name    VARCHAR(20),
            gender       CHAR(2),
            hire_date    DATE
        )
        UNIQUE KEY(`emp_no`)
        DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
    4. Apply for a public endpoint for the ApsaraDB for SelectDB instance. For more information, see Apply for or release a public endpoint.

    5. Add the public IP address of SeaTunnel to the IP address whitelist of the ApsaraDB for SelectDB instance. For more information, see Configure an IP address whitelist.
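
Before you run a job, it can help to confirm that the local preparation steps took effect. The following is a minimal sketch, assuming that the path passed to the function is the decompressed SeaTunnel directory. The function name check_seatunnel_setup is hypothetical, and the checks cover only the plugin_config entries and the MySQL driver file in the lib directory used by the download command above.

```shell
# Hypothetical helper: checks the local files prepared in the steps above.
# Argument 1: path to the SeaTunnel home directory.
check_seatunnel_setup() {
  home="$1"
  status=0

  # The plugin_config file should list the connectors that this example relies on.
  for connector in connector-jdbc connector-selectdb-cloud; do
    if ! grep -q "^${connector}[[:space:]]*\$" "$home/config/plugin_config" 2>/dev/null; then
      echo "missing in plugin_config: $connector"
      status=1
    fi
  done

  # The MySQL driver downloaded earlier should be in the lib directory.
  if [ ! -f "$home/lib/mysql-connector-java-8.0.28.jar" ]; then
    echo "MySQL driver not found in $home/lib"
    status=1
  fi

  return "$status"
}
```

For example, running check_seatunnel_setup "$SEATUNNEL_HOME" prints one line for each missing item and returns a non-zero status if anything is missing. The sketch does not verify network access to the ApsaraDB for SelectDB instance or the IP address whitelist.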

Use the local SeaTunnel engine to synchronize data from the MySQL database to the ApsaraDB for SelectDB instance

  1. Create the mysqlToSelectDB.conf configuration file and configure the job information.

    env {
      execution.parallelism = 2
      job.mode = "BATCH"
      checkpoint.interval = 10000
    }
    
    source {
      jdbc {
        url = "jdbc:mysql://host:port/test_db"
        driver = "com.mysql.cj.jdbc.Driver"
        user = "admin"
        password = "****"
        query = "select * from employees"
      }
    }
     
    sink {
      SelectDBCloud {
        load-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:8080"
        jdbc-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:9030"
        cluster-name="new_cluster"
        table.identifier="test_db.employees"
        username="admin"
        password="****"
        selectdb.config {
            file.type="json"
        }
      }
    }
  2. Submit the job.

    sh ./bin/seatunnel.sh --config ./mysqlToSelectDB.conf -e local
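
After the job finishes, one way to check the result is to compare the row counts of the source table and the target table. The following sketch assumes that the mysql command-line client is installed and that both endpoints accept connections from the current host. The function name count_rows is hypothetical, and the -p option makes the client prompt for the password instead of reading it from the command line.

```shell
# Hypothetical helper: prints the row count of a table over the MySQL protocol.
# Arguments: host, port, user, database.table
# -N suppresses the column header and -B selects batch output, so only the number is printed.
count_rows() {
  mysql -h "$1" -P "$2" -u "$3" -p -N -B -e "SELECT COUNT(*) FROM $4;"
}
```

For example, call count_rows once with the host and port of the MySQL database and once with the public endpoint and MySQL port of the ApsaraDB for SelectDB instance from the job configuration, each time with test_db.employees as the table. Matching counts suggest that all rows were written, because both tables use emp_no as the key.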