This topic describes how to use the custom SelectDB connector to write data to ApsaraDB for SelectDB.
Background information
ApsaraDB for SelectDB is a next-generation real-time data warehouse service. It is fully managed, hosted on Alibaba Cloud, 100% compatible with Apache Doris, and designed for analyzing massive amounts of data. For more information about the service's benefits and use cases, see What is ApsaraDB for SelectDB.
The following table describes the capabilities supported by the custom SelectDB connector.
Item | Description |
Supported type | Sink table; data ingestion sink |
Running mode | Streaming and batch |
Data format | JSON and CSV |
Metrics | N/A |
API | DataStream API and SQL API |
Data update/deletion in the sink | Supported |
Features
Database synchronization.
Exactly-once semantics, ensuring no duplicates or omissions.
Compatibility with Apache Doris 1.0 or later, enabling seamless data synchronization to Apache Doris via the custom SelectDB connector.
Usage notes
Only Ververica Runtime (VVR) 8.0.10 or later supports the custom SelectDB connector.
If you have any questions when using the custom SelectDB connector, submit a ticket to ApsaraDB for SelectDB.
The prerequisites for synchronizing data to ApsaraDB for SelectDB are as follows:
An ApsaraDB for SelectDB instance is created. For more information, see Create an instance.
The IP address whitelist is configured. For more information, see Configure an IP address whitelist.
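A whitelist entry can be a single IP address or a CIDR block, such as the CIDR block of the vSwitch used by your Flink workspace. The following Python sketch shows how a client address is matched against such entries. It uses only the standard ipaddress module. The helper name is_whitelisted and the sample addresses are hypothetical, and the sketch does not call any ApsaraDB for SelectDB API.

```python
import ipaddress
from typing import Iterable


def is_whitelisted(client_ip: str, whitelist: Iterable[str]) -> bool:
    """Return True if client_ip falls into any whitelist entry (single IP or CIDR block)."""
    address = ipaddress.ip_address(client_ip)
    # A bare IP such as "10.0.0.5" becomes a /32 network; strict=False tolerates host bits.
    return any(address in ipaddress.ip_network(entry, strict=False) for entry in whitelist)


if __name__ == "__main__":
    # Hypothetical CIDR block of the vSwitch used by a Flink workspace.
    whitelist = ["192.168.0.0/24"]
    print(is_whitelisted("192.168.0.17", whitelist))  # True
    print(is_whitelisted("192.168.1.17", whitelist))  # False
```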
SQL
The SelectDB connector can be used as a sink table in SQL jobs.
Upload and configure the connector
In VVR 11.1 and later, the SelectDB connector is built in, so you can skip the following steps.
Click JAR file to download the SelectDB connector JAR (version 1.15 to 1.17).
Upload the SelectDB connector JAR to the Realtime Compute for Apache Flink console. For more information, see Manage custom connectors.
Create an SQL draft and use the custom SelectDB connector.
Set the connector option to doris. For information about other sink options, see Configuration items of Doris sink.
Syntax
CREATE TABLE selectdb_sink (
emp_no INT,
birth_date DATE,
first_name STRING,
last_name STRING,
gender STRING,
hire_date DATE
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'test.employees',
'username' = 'admin',
'password' = '****',
'sink.enable-delete' = 'true'
);
Data type mappings
See the "Type Mapping" section of the Flink Doris connector topic in the Doris documentation.
Use the connector
This section illustrates how to synchronize data from ApsaraDB RDS for MySQL to ApsaraDB for SelectDB by using the custom SelectDB connector.
Prepare for data synchronization.
Create a Flink workspace, an ApsaraDB RDS for MySQL instance, and an ApsaraDB for SelectDB instance.
In the ApsaraDB RDS for MySQL console, create a database named order_dw_mysql and a table named orders, and import test data into the table.
CREATE TABLE `orders` (
order_id bigint not null primary key,
user_id varchar(50) not null,
shop_id bigint not null,
product_id bigint not null,
buy_fee decimal(20,2) not null,
create_time timestamp not null,
update_time timestamp not null default now(),
state int not null
);
INSERT INTO orders VALUES
(100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
(100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
(100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
(100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
(100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
(100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
(100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
After connecting to the ApsaraDB for SelectDB instance by using DMS, create a database named selectdb and a table named selecttable. The buy_fee column is declared as DECIMAL(20, 2) to match the source column so that the fractional part is kept.
CREATE DATABASE selectdb;
USE selectdb;
CREATE TABLE `selecttable` (
order_id bigint,
user_id varchar(50),
shop_id bigint,
product_id bigint,
buy_fee DECIMAL(20, 2),
create_time DATETIME,
update_time DATETIME,
state int
) DISTRIBUTED BY HASH(order_id) BUCKETS 10;
Add the CIDR block of the vSwitch in your Flink workspace to the IP address whitelist of your ApsaraDB for SelectDB instance. For more information, see How do I configure an IP address whitelist?.
In the Realtime Compute for Apache Flink console, develop an SQL job and start it.
Create a MySQL catalog named mysqlcatalog. For more information, see Manage MySQL catalogs.
Click JAR file to download the SelectDB connector JAR (version 1.15 to 1.17), and upload the JAR file. For more information, see Manage custom connectors. If you use VVR 11.1 or later, skip this step because the SelectDB connector is built in.
Go to , click New to create a blank stream draft, and copy the following code to the draft:
CREATE TEMPORARY TABLE selectdb_sink (
order_id BIGINT,
user_id STRING,
shop_id BIGINT,
product_id BIGINT,
buy_fee DECIMAL(20, 2),
create_time TIMESTAMP(6),
update_time TIMESTAMP(6),
state INT
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-jfj3z******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'selectdb.selecttable',
'username' = 'admin',
'password' = '${secret_values.selectdb}',
'sink.enable-delete' = 'true'
);
INSERT INTO selectdb_sink SELECT * FROM `mysqlcatalog`.`order_dw_mysql`.`orders`;
Click Deploy and start the deployment in the initial mode. For more information, see Create a deployment and Start a deployment.
After connecting to the ApsaraDB for SelectDB instance by using DMS, query the data in the selecttable table.
SELECT * FROM `selectdb`.`selecttable`;
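The sink writes to SelectDB through Stream Load requests. As a result, the fenodes and table.identifier values in the job above together determine the endpoint that must be reachable from your Flink workspace. The following Python sketch builds that URL, which can help when you check connectivity. It assumes the standard Doris Stream Load path /api/{db}/{table}/_stream_load and, for simplicity, uses only the first address of a comma-separated fenodes value. The helper name stream_load_url and the example host are hypothetical.

```python
def stream_load_url(fenodes: str, table_identifier: str) -> str:
    """Build a Doris/SelectDB Stream Load URL from fenodes and table.identifier (sketch)."""
    # fenodes may list several FE addresses separated by commas; this sketch uses the first one.
    fe_address = fenodes.split(",")[0].strip()
    # table.identifier has the form <database>.<table>.
    database, dot, table = table_identifier.partition(".")
    if not (fe_address and database and dot and table):
        raise ValueError(
            f"expected fenodes 'host:port' and table.identifier 'db.table', "
            f"got {fenodes!r} and {table_identifier!r}"
        )
    return f"http://{fe_address}/api/{database}/{table}/_stream_load"


if __name__ == "__main__":
    # Hypothetical endpoint; use the VPC or public endpoint of your own instance.
    print(stream_load_url("selectdb-cn-example.selectdbfe.rds.aliyuncs.com:8080",
                          "selectdb.selecttable"))
    # http://selectdb-cn-example.selectdbfe.rds.aliyuncs.com:8080/api/selectdb/selecttable/_stream_load
```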
Data ingestion
The SelectDB connector can be used as a data ingestion sink.
Syntax
source:
type: xxx
sink:
type: doris
name: Doris Sink
fenodes: 127.0.0.1:8030
username: root
password: ""
table.create.properties.replication_num: 1
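In this configuration, keys that start with table.create.properties. (such as table.create.properties.replication_num above) are properties for automatically created tables. Keys that start with sink.properties. are Stream Load parameters, as described in the options table. The following Python sketch shows how such flat keys can be grouped by prefix. split_sink_options and to_properties_clause are hypothetical helpers, not connector APIs, and the PROPERTIES rendering assumes standard Doris CREATE TABLE syntax.

```python
from typing import Dict, Tuple

STREAM_LOAD_PREFIX = "sink.properties."
TABLE_CREATE_PREFIX = "table.create.properties."


def split_sink_options(
    options: Dict[str, str],
) -> Tuple[Dict[str, str], Dict[str, str], Dict[str, str]]:
    """Group flat sink options into (plain options, Stream Load params, table properties)."""
    plain: Dict[str, str] = {}
    stream_load: Dict[str, str] = {}
    table_props: Dict[str, str] = {}
    for key, value in options.items():
        if key.startswith(STREAM_LOAD_PREFIX):
            stream_load[key[len(STREAM_LOAD_PREFIX):]] = value
        elif key.startswith(TABLE_CREATE_PREFIX):
            table_props[key[len(TABLE_CREATE_PREFIX):]] = value
        else:
            plain[key] = value
    return plain, stream_load, table_props


def to_properties_clause(table_props: Dict[str, str]) -> str:
    """Render table-creation properties as a Doris PROPERTIES (...) clause."""
    items = ", ".join(f'"{k}" = "{v}"' for k, v in sorted(table_props.items()))
    return f"PROPERTIES ({items})"


if __name__ == "__main__":
    sink = {
        "type": "doris",
        "fenodes": "127.0.0.1:8030",
        "username": "root",
        "password": "",
        "table.create.properties.replication_num": "1",
        "sink.properties.format": "json",  # example Stream Load parameter
    }
    plain, stream_load, table_props = split_sink_options(sink)
    print(stream_load)                        # {'format': 'json'}
    print(to_properties_clause(table_props))  # PROPERTIES ("replication_num" = "1")
```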
Configuration options
Option | Description | Required? | Data Type | Default value | Remarks |
| The sink type. | Yes | String | No default value | Set it to |
| The sink name. | No | String | No default value | |
| The endpoint and HTTP port of the ApsaraDB for SelectDB instance. | Yes | String | No default value | To obtain the VPC or public endpoint of your SelectDB instance, go to the ApsaraDB for SelectDB console, click your instance name, and find the information in the Network Information section. Example: |
| BE HTTP address. | No | String | No default value | Example: |
| The JDBC connection information of the ApsaraDB for SelectDB instance. | No | String | No default value | To obtain the VPC or public endpoint and MySQL port of your SelectDB instance, go to the ApsaraDB for SelectDB console, click your instance name, and find the information in the Network Information section. Example: |
| The cluster username of the ApsaraDB for SelectDB instance. | Yes | String | No default value | |
| The cluster password of the ApsaraDB for SelectDB instance. | No | String | No default value | |
| Specifies whether to redirect Stream Load requests. When enabled, Stream Load writes data through the FE without explicitly obtaining BE information. | No | String | | Whether to write through FE redirection and connect directly to the BE for writing. |
| Charset encoding for the HTTP client. | No | Boolean | | |
| Specifies whether to write to SelectDB in batch mode. When enabled, writes are not triggered by checkpoints but by the buffer flush thresholds in the following rows (maximum rows, maximum bytes, and flush interval). In this mode, exactly-once semantics are not guaranteed; to make writes idempotent, use the Unique model. | No | Boolean | | |
| The queue size for batch writing. | No | Integer | | |
| The maximum number of records to flush in a single batch. | No | Integer | | |
| The maximum number of bytes to flush in a single batch. | No | Integer | | |
| The flush interval. When this interval elapses, buffered data is flushed asynchronously. Minimum: 1 second. | No | String | 10s | |
sink.properties. | Stream Load import parameters. Specify each parameter as a separate property. | No | String | No default value | Example: |
| Properties configuration for table creation. | No | String | No default value | Example: |
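In batch mode, three thresholds control when buffered data is written: the maximum number of rows, the maximum number of bytes, and the flush interval. The following Python sketch models them under the assumption that whichever threshold is reached first triggers a flush. It is not the connector's implementation. BufferedBatch is a hypothetical class, and the connector flushes asynchronously when the interval elapses, whereas this sketch only checks the interval when a new row arrives.

```python
import time
from typing import Callable, List, Optional


class BufferedBatch:
    """Hypothetical write buffer that flushes when any of three thresholds is reached."""

    def __init__(self, max_rows: int, max_bytes: int, interval_s: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        if interval_s < 1:
            # The options table documents a minimum flush interval of 1 second.
            raise ValueError("flush interval must be at least 1 second")
        self.max_rows = max_rows
        self.max_bytes = max_bytes
        self.interval_s = interval_s
        self.clock = clock
        self.rows: List[bytes] = []
        self.size = 0
        self.last_flush = clock()

    def _due(self) -> bool:
        # Flush on whichever threshold is hit first: rows, bytes, or elapsed time.
        return (len(self.rows) >= self.max_rows
                or self.size >= self.max_bytes
                or self.clock() - self.last_flush >= self.interval_s)

    def add(self, row: bytes) -> Optional[List[bytes]]:
        """Buffer one serialized row; return the flushed batch, or None if nothing was flushed."""
        self.rows.append(row)
        self.size += len(row)
        return self.flush() if self._due() else None

    def flush(self) -> List[bytes]:
        """Hand over the buffered rows (a real sink would send them via Stream Load) and reset."""
        batch, self.rows, self.size = self.rows, [], 0
        self.last_flush = self.clock()
        return batch


if __name__ == "__main__":
    now = [0.0]  # fake clock so the example is deterministic
    buf = BufferedBatch(max_rows=3, max_bytes=1024, interval_s=10, clock=lambda: now[0])
    print(buf.add(b"a"), buf.add(b"b"), buf.add(b"c"))  # None None [b'a', b'b', b'c']
    buf.add(b"d")
    now[0] = 10.0
    print(buf.add(b"e"))  # [b'd', b'e'] because 10 seconds have elapsed
```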
Data type mappings
Flink CDC Type | SelectDB Type |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
DECIMAL | DECIMAL |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIMESTAMP [(p)] | DATETIME [(p)] |
TIMESTAMP_LTZ [(p)] | DATETIME [(p)] |
CHAR(n) | CHAR(n*3) Note: Doris stores strings in UTF-8, in which an English character takes 1 byte and a Chinese character takes 3 bytes, so the length is multiplied by 3. A CHAR column can be at most 255 long; if n*3 exceeds 255, the column is automatically converted to VARCHAR. |
VARCHAR(n) | VARCHAR(n*3) Note: Doris stores strings in UTF-8, in which an English character takes 1 byte and a Chinese character takes 3 bytes, so the length is multiplied by 3. A VARCHAR column can be at most 65533 long; if n*3 exceeds 65533, the column is automatically converted to STRING. |
BINARY(n) | STRING |
VARBINARY(n) | STRING |
STRING | STRING |
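The CHAR and VARCHAR rows above can be expressed as two small functions. The following Python sketch illustrates the documented length rules and is not the connector's code. map_char and map_varchar are hypothetical names. Applying the VARCHAR limit to a CHAR column that has already been converted to VARCHAR is an assumption.

```python
CHAR_MAX_LENGTH = 255       # maximum CHAR length in Doris
VARCHAR_MAX_LENGTH = 65533  # maximum VARCHAR length in Doris
BYTES_PER_CHAR = 3          # a Chinese character takes 3 bytes in UTF-8


def map_varchar(n: int) -> str:
    """Map Flink CDC VARCHAR(n) to a SelectDB type."""
    length = n * BYTES_PER_CHAR
    # Too long for VARCHAR: fall back to STRING.
    return "STRING" if length > VARCHAR_MAX_LENGTH else f"VARCHAR({length})"


def map_char(n: int) -> str:
    """Map Flink CDC CHAR(n) to a SelectDB type."""
    length = n * BYTES_PER_CHAR
    if length <= CHAR_MAX_LENGTH:
        return f"CHAR({length})"
    # Too long for CHAR: convert to VARCHAR (assumption: the VARCHAR limit then applies too).
    return map_varchar(n)


if __name__ == "__main__":
    print(len("中".encode("utf-8")))  # 3, which is why lengths are multiplied by 3
    for n in (10, 85, 86):
        print(f"CHAR({n}) -> {map_char(n)}")  # CHAR(30), CHAR(255), VARCHAR(258)
    print(f"VARCHAR(21845) -> {map_varchar(21845)}")  # STRING
```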