ApsaraDB for SelectDB is integrated with SeaTunnel. You can use SeaTunnel SelectDB Sink to import table data to ApsaraDB for SelectDB. This topic describes how to use SeaTunnel SelectDB Sink to synchronize data to ApsaraDB for SelectDB.
Overview
SeaTunnel is an easy-to-use, high-performance distributed data integration platform that supports real-time synchronization of large amounts of data. You can use SeaTunnel to read large amounts of data from data sources such as MySQL, Hive, and Kafka, and then use SeaTunnel SelectDB Sink to write the data to ApsaraDB for SelectDB.
Prerequisites
SeaTunnel 2.3.1 or later is installed.
How it works
SeaTunnel allows you to write upstream data in the JSON or CSV format to ApsaraDB for SelectDB. The sink configurations vary with the data format.
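As a rough illustration of how the two formats differ, the following Python sketch serializes the same rows as JSON and as CSV. The function names and sample rows are hypothetical, and the one-object-per-line JSON layout is an assumption; the exact payload that the SeaTunnel sink produces may differ.

```python
import csv
import io
import json

# Hypothetical sample rows used only for this illustration.
ROWS = [
    {"emp_no": 10001, "first_name": "Georgi", "last_name": "Facello"},
    {"emp_no": 10002, "first_name": "Bezalel", "last_name": "Simmel"},
]


def to_json_lines(rows):
    """Serialize each row as one JSON object per line (an assumed layout)."""
    return "\n".join(json.dumps(row) for row in rows)


def to_csv(rows, column_separator=",", line_delimiter="\n"):
    """Serialize row values as delimited text without a header row."""
    buffer = io.StringIO()
    writer = csv.writer(
        buffer, delimiter=column_separator, lineterminator=line_delimiter
    )
    for row in rows:
        writer.writerow(row.values())
    return buffer.getvalue()


if __name__ == "__main__":
    print(to_json_lines(ROWS))
    print(to_csv(ROWS), end="")
```

The `column_separator` and `line_delimiter` arguments play the same role as the `file.column_separator` and `file.line_delimiter` settings in the CSV sink configuration.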
JSON format
sink {
  SelectDB {
    load-url="ip:http_port"
    jdbc-url="ip:mysql_port"
    cluster-name="Cluster"
    table.identifier="test_db.test_table"
    username="admin"
    password="****"
    selectdb.config {
      file.type="json"
    }
  }
}
CSV format
sink {
  SelectDB {
    load-url="ip:http_port"
    jdbc-url="ip:mysql_port"
    cluster-name="Cluster"
    table.identifier="test_db.test_table"
    username="admin"
    password="****"
    selectdb.config {
      file.type="csv"
      file.column_separator=","
      file.line_delimiter="\n"
    }
  }
}
The following table describes the parameters in the sink configurations.
Parameter | Required | Description |
load-url | Yes | The endpoint and HTTP port that are used to access the ApsaraDB for SelectDB instance. To obtain the virtual private cloud (VPC) endpoint or public endpoint and HTTP port of an ApsaraDB for SelectDB instance, perform the following operations: Log on to the ApsaraDB for SelectDB console and go to the Instance Details page of the instance whose information you want to view. In the Network Information section of the Basic Information page, view the values of the VPC Endpoint or Public Endpoint parameter and the HTTP Port parameter. Example: |
jdbc-url | Yes | The endpoint and MySQL port that are used to access the ApsaraDB for SelectDB instance. To obtain the VPC endpoint or public endpoint and MySQL port of the ApsaraDB for SelectDB instance, perform the following operations: Log on to the ApsaraDB for SelectDB console and go to the Instance Details page of the instance whose information you want to view. In the Network Information section of the Basic Information page, view the values of the VPC Endpoint or Public Endpoint parameter and the MySQL Port parameter. Example: |
cluster-name | Yes | The name of the cluster in the ApsaraDB for SelectDB instance. An ApsaraDB for SelectDB instance may contain multiple clusters. Select a cluster based on your business requirements. |
username | Yes | The username that is used to connect to the ApsaraDB for SelectDB instance. |
password | Yes | The password that is used to connect to the ApsaraDB for SelectDB instance. |
table.identifier | Yes | The name of the table in the ApsaraDB for SelectDB instance. Specify the name in the format of |
selectdb.config | Yes | The configurations of the data import job. |
sink.enable-delete | No | Specifies whether to enable the bulk deletion feature. This feature is supported only for the unique key model. |
sink.buffer-size | No | The maximum buffer size. Unit: bytes. The default value is equivalent to 10 MB. If the buffer size exceeds the upper limit, all the content in the buffer is flushed to Object Storage Service (OSS). We recommend that you use the default value. |
sink.buffer-count | No | The maximum number of data records that can be buffered. Default value: 10000. If the number of data records that are buffered exceeds the upper limit, all the content in the buffer is flushed to OSS. We recommend that you use the default value. |
sink.max-retries | No | The maximum number of retries in the commit phase. Default value: 3. |
sink.enable-2pc | No | Specifies whether to enable the two-phase commit mode. You can enable the two-phase commit mode to ensure the exactly-once semantics. Default value: true. |
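The required parameters in the preceding table can be checked before a job is submitted. The following Python sketch shows one hypothetical way to do this. The function name, the dict-based representation of the sink options, and the sample values are assumptions for illustration and are not part of SeaTunnel.

```python
# Parameters marked as required in the preceding table.
REQUIRED = (
    "load-url",
    "jdbc-url",
    "cluster-name",
    "username",
    "password",
    "table.identifier",
    "selectdb.config",
)

# Upstream data formats that this topic describes.
SUPPORTED_FILE_TYPES = ("json", "csv")


def validate_sink_options(options):
    """Return a list of problems found in a dict of sink options.

    An empty list means that no problem was detected by these checks.
    """
    problems = [
        f"missing required parameter: {name}"
        for name in REQUIRED
        if name not in options
    ]
    file_type = options.get("selectdb.config", {}).get("file.type")
    # Only flag a file.type that is set to something other than json or csv.
    if file_type is not None and file_type not in SUPPORTED_FILE_TYPES:
        problems.append(f"unsupported file.type: {file_type!r}")
    return problems


if __name__ == "__main__":
    # Placeholder values copied from the sink templates in this topic.
    sample = {
        "load-url": "ip:http_port",
        "jdbc-url": "ip:mysql_port",
        "table.identifier": "test_db.test_table",
        "username": "admin",
        "password": "****",
        "selectdb.config": {"file.type": "json"},
    }
    for problem in validate_sink_options(sample):
        print(problem)
```

Because the sample omits `cluster-name`, running the script reports that parameter as missing.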
Example
In this example, SeaTunnel is used to import data from an upstream MySQL database to ApsaraDB for SelectDB. The following table describes the software versions in the example.
Environment | Version |
Java Development Kit (JDK) | 1.8 |
SeaTunnel | 2.3.3 |
ApsaraDB for SelectDB | 3.0.4 |
Prepare the environment
Configure SeaTunnel.
Download and decompress the SeaTunnel installation package. In this example, the SeaTunnel installation package apache-seatunnel-2.3.3-bin.tar.gz is used.
wget https://dlcdn.apache.org/seatunnel/2.3.3/apache-seatunnel-2.3.3-bin.tar.gz
tar -xzvf apache-seatunnel-2.3.3-bin.tar.gz
Modify the SEATUNNEL_HOME/config/plugin_config file. Retain only the required connectors.
--connectors-v2--
connector-cdc-mysql
connector-selectdb-cloud
connector-jdbc
connector-fake
connector-console
connector-assert
--end--
Install the SeaTunnel connector plug-in.
sh bin/install-plugin.sh
Download the MySQL driver and place it in the SEATUNNEL_HOME/lib directory.
cd lib/
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
Construct the data to be imported. In this example, a small amount of sample data is constructed in a MySQL database for import.
Create a test table in the MySQL database.
CREATE TABLE `employees` (
  `emp_no` INT NOT NULL,
  `birth_date` DATE NOT NULL,
  `first_name` VARCHAR(14) NOT NULL,
  `last_name` VARCHAR(16) NOT NULL,
  `gender` ENUM('M','F') NOT NULL,
  `hire_date` DATE NOT NULL,
  PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
Use Data Management (DMS) to generate test data. For more information, see Generate test data.
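For a quick local test without DMS, sample rows can also be produced with a short script. The following Python sketch generates a single INSERT statement for the employees table. The names, date ranges, and function names are hypothetical and chosen only to satisfy the column definitions of the test table.

```python
import datetime
import random

# Hypothetical name pools; every value fits VARCHAR(14) / VARCHAR(16).
FIRST_NAMES = ["Georgi", "Bezalel", "Parto", "Kyoichi", "Anneke"]
LAST_NAMES = ["Facello", "Simmel", "Bamford", "Maliniak", "Preusig"]

COLUMNS = "`emp_no`, `birth_date`, `first_name`, `last_name`, `gender`, `hire_date`"


def random_date(rng, start, end):
    """Pick a date between start and end, inclusive."""
    return start + datetime.timedelta(days=rng.randint(0, (end - start).days))


def generate_rows(count, seed=0, first_emp_no=10001):
    """Build `count` rows with sequential, unique emp_no values."""
    rng = random.Random(seed)  # fixed seed keeps the output reproducible
    rows = []
    for offset in range(count):
        birth = random_date(rng, datetime.date(1960, 1, 1), datetime.date(1995, 12, 31))
        hire = random_date(rng, datetime.date(2015, 1, 1), datetime.date(2023, 12, 31))
        rows.append((
            first_emp_no + offset,
            birth.isoformat(),
            rng.choice(FIRST_NAMES),
            rng.choice(LAST_NAMES),
            rng.choice("MF"),
            hire.isoformat(),
        ))
    return rows


def to_insert_statement(rows):
    """Render the rows as one multi-row INSERT statement."""
    values = ",\n".join(
        "  ({}, '{}', '{}', '{}', '{}', '{}')".format(*row) for row in rows
    )
    return f"INSERT INTO `employees` ({COLUMNS}) VALUES\n{values};"


if __name__ == "__main__":
    print(to_insert_statement(generate_rows(10)))
```

The printed statement can be run in any MySQL client that is connected to the database that contains the employees table.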
Configure an ApsaraDB for SelectDB instance.
Create an ApsaraDB for SelectDB instance. For more information, see Create an instance.
Connect to the ApsaraDB for SelectDB instance over the MySQL protocol. For more information, see Connect to an instance.
Create a test database and a test table.
Create a test database.
CREATE DATABASE test_db;
Create a test table.
USE test_db;
CREATE TABLE employees (
  emp_no INT NOT NULL,
  birth_date DATE,
  first_name VARCHAR(20),
  last_name VARCHAR(20),
  gender CHAR(2),
  hire_date DATE
)
UNIQUE KEY(`emp_no`)
DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1;
Apply for a public endpoint for the ApsaraDB for SelectDB instance. For more information, see Apply for or release a public endpoint.
Add the public IP address of SeaTunnel to the IP address whitelist of the ApsaraDB for SelectDB instance. For more information, see Configure an IP address whitelist.
Use the local SeaTunnel engine to synchronize data from the MySQL database to the ApsaraDB for SelectDB instance
Create the mysqlToSelectDB.conf configuration file and configure the job information.
env {
  execution.parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 10000
}
source {
  jdbc {
    url = "jdbc:mysql://host:port/test_db"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "admin"
    password = "****"
    query = "select * from employees"
  }
}
sink {
  SelectDBCloud {
    load-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:8080"
    jdbc-url="selectdb-cn-pe33hab****-public.selectdbfe.rds.aliyuncs.com:9030"
    cluster-name="new_cluster"
    table.identifier="test_db.employees"
    username="admin"
    password="****"
    selectdb.config {
      file.type="json"
    }
  }
}
Submit the job.
sh ./bin/seatunnel.sh --config ./mysqlToSelectDB.conf -e local
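If the job needs to be started from a script, for example by a scheduler, the same command can be wrapped in Python. The following is a minimal sketch. The function names and the /opt/seatunnel path are hypothetical, and only the `--config` and `-e local` options shown in the preceding command are used.

```python
import subprocess
from pathlib import Path


def build_command(seatunnel_home, config_path):
    """Build the argument list for the local-engine submission shown above."""
    script = Path(seatunnel_home) / "bin" / "seatunnel.sh"
    return ["sh", str(script), "--config", str(config_path), "-e", "local"]


def submit(seatunnel_home, config_path):
    """Run the job from the SeaTunnel home directory and return its exit code."""
    completed = subprocess.run(
        build_command(seatunnel_home, config_path),
        cwd=seatunnel_home,
        check=False,  # let the caller decide how to handle a non-zero exit code
    )
    return completed.returncode


if __name__ == "__main__":
    # /opt/seatunnel is a placeholder for the actual SEATUNNEL_HOME directory.
    exit_code = submit("/opt/seatunnel", "./mysqlToSelectDB.conf")
    print(f"seatunnel.sh exited with code {exit_code}")
```

Passing the arguments as a list avoids shell quoting issues if the configuration path contains spaces.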