
Realtime Compute for Apache Flink:SelectDB connector

Last Updated: Aug 06, 2025

This topic describes how to use the custom SelectDB connector to write data to ApsaraDB for SelectDB.

Background information

ApsaraDB for SelectDB is a next-generation real-time data warehouse service that is fully managed on Alibaba Cloud and 100% compatible with Apache Doris. It is designed for analyzing massive amounts of data. For more information about the service's benefits and use cases, see What is ApsaraDB for SelectDB.

The following table describes the capabilities supported by the custom SelectDB connector.

| Item | Description |
| --- | --- |
| Supported type | Sink table; data ingestion sink |
| Running mode | Streaming and batch |
| Data format | JSON and CSV |
| Metrics | N/A |
| API | DataStream API and SQL API |
| Data update/deletion in the sink | Supported |

Features

  • Database synchronization.

  • Exactly-once semantics, ensuring no duplicates or omissions.

  • Compatibility with Apache Doris 1.0 or later, enabling seamless data synchronization to Apache Doris via the custom SelectDB connector.

Usage notes

  • Only Ververica Runtime (VVR) 8.0.10 or later supports the custom SelectDB connector.

  • If you have any questions when using the custom SelectDB connector, submit a ticket to ApsaraDB for SelectDB.

  • Make sure that the prerequisites for synchronizing data to ApsaraDB for SelectDB are met.

SQL

The SelectDB connector can be used as a sink table in SQL jobs.

Upload and configure the connector

Note

In VVR 11.1 and later, the SelectDB connector is built in, so you can skip the following steps.

  1. Click JAR file to download a SelectDB connector JAR (version 1.15 to 1.17).

  2. Upload the SelectDB connector JAR to the Realtime Compute for Apache Flink console. For more information, see Manage custom connectors.

  3. Create an SQL draft and use the custom SelectDB connector.

    Set the connector option to doris. For information about other sink options, see Configuration items of Doris sink.

Syntax

CREATE TABLE selectdb_sink (
  emp_no       INT,
  birth_date   DATE,
  first_name   STRING,
  last_name    STRING,
  gender       STRING,
  hire_date    DATE
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'test.employees',
  'username' = 'admin',
  'password' = '****',
  'sink.enable-delete' = 'true'
);
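
For illustration, a minimal end-to-end statement that writes rows into the sink defined above might look as follows. The datagen source and its settings are hypothetical stand-ins for your actual upstream table:

```sql
-- Hypothetical upstream table; replace with your real source.
CREATE TEMPORARY TABLE emp_source (
  emp_no       INT,
  birth_date   DATE,
  first_name   STRING,
  last_name    STRING,
  gender       STRING,
  hire_date    DATE
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '10'
);

-- Stream the generated rows into the SelectDB sink.
INSERT INTO selectdb_sink SELECT * FROM emp_source;
```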

Data type mappings

See the "Type Mapping" section of the Flink Doris connector topic in the Doris documentation.

Use the connector

This section illustrates how to synchronize data from ApsaraDB RDS for MySQL to ApsaraDB for SelectDB by using the custom SelectDB connector.

  1. Prepare for data synchronization.

    1. Create a Flink workspace, an ApsaraDB RDS for MySQL instance, and an ApsaraDB for SelectDB instance.

    2. In the ApsaraDB RDS for MySQL console, create a database named order_dw_mysql and a table named orders, and import test data into the table.

      CREATE TABLE `orders` (
        order_id bigint not null primary key,
        user_id varchar(50) not null,
        shop_id bigint not null,
        product_id bigint not null,
        buy_fee decimal(20,2) not null,   
        create_time timestamp not null,
        update_time timestamp not null default now(),
        state int not null 
      );
      
      INSERT INTO orders VALUES
      (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
      (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
      (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
      (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
      (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
      (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
      (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    3. After you connect to the ApsaraDB for SelectDB instance by using DMS, create a database named selectdb and a table named selecttable.

      CREATE DATABASE selectdb;
      
      USE selectdb;
      
      CREATE TABLE `selecttable` (
        order_id BIGINT,
        user_id VARCHAR(50),
        shop_id BIGINT,
        product_id BIGINT,
        buy_fee DECIMAL,
        create_time DATETIME,
        update_time DATETIME,
        state INT
      ) DISTRIBUTED BY HASH(order_id) BUCKETS 10;
    4. Add the CIDR block of the vSwitch that your Flink workspace uses to the IP address whitelist of your ApsaraDB for SelectDB instance. For more information, see How do I configure an IP address whitelist?.

  2. In the Realtime Compute for Apache Flink console, develop an SQL job and start it.

    1. Create a MySQL catalog named mysqlcatalog. For more information, see Manage MySQL catalogs.

    2. Click JAR file to download the SelectDB connector (version 1.15 to 1.17) JAR, and upload the JAR file. For more information, see Manage custom connectors.

    3. Go to Development > ETL, click New to create a blank stream draft, and copy the following code to the draft:

      CREATE TEMPORARY TABLE selectdb_sink (
        order_id BIGINT,
        user_id STRING,
        shop_id BIGINT,
        product_id BIGINT,
        buy_fee DECIMAL,
        create_time TIMESTAMP(6),
        update_time TIMESTAMP(6),
        state INT
      ) WITH (
        'connector' = 'doris',
        'fenodes' = 'selectdb-cn-jfj3z******.selectdbfe.rds.aliyuncs.com:8080',
        'table.identifier' = 'selectdb.selecttable',
        'username' = 'admin',
        'password' = '${secret_values.selectdb}',
        'sink.enable-delete' = 'true'
      );
      
      INSERT INTO selectdb_sink SELECT * FROM `mysqlcatalog`.`order_dw_mysql`.`orders`;
    4. Click Deploy and start the deployment in the initial mode. For more information, see Create a deployment and Start a deployment.

  3. After you connect to the ApsaraDB for SelectDB instance by using DMS, query the data in the selecttable table.

    SELECT * FROM `selecttable`;
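
As an illustrative sanity check, you can also count the synchronized rows; the expected count follows from the seven rows inserted into the MySQL orders table earlier:

```sql
-- The MySQL orders table was seeded with 7 rows, so after the initial
-- snapshot completes, the count should be 7.
SELECT COUNT(*) AS row_cnt FROM `selecttable`;
```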

Data ingestion

The SelectDB connector can be used as a data ingestion sink.

Syntax

source:
   type: xxx

sink:
   type: doris
   name: Doris Sink
   fenodes: 127.0.0.1:8030
   username: root
   password: ""
   table.create.properties.replication_num: 1
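
As a fuller sketch, a pipeline that pairs a MySQL source with the SelectDB sink might look as follows. The endpoints, credentials, and table names are placeholders, and the mysql source options are assumptions based on Flink CDC pipeline connector conventions; adjust them to your environment.

```yaml
source:
  type: mysql
  hostname: rm-*******.mysql.rds.aliyuncs.com   # placeholder MySQL endpoint
  port: 3306
  username: flink_user
  password: "****"
  tables: order_dw_mysql.orders                 # database.table to capture

sink:
  type: doris
  name: Doris Sink
  fenodes: selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080
  username: admin
  password: "****"
  table.create.properties.replication_num: 1

pipeline:
  name: Sync orders to SelectDB
  parallelism: 1
```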

Configuration options

| Option | Required? | Data type | Default value | Description |
| --- | --- | --- | --- | --- |
| type | Yes | String | No default value | The sink type. Set it to doris. |
| name | No | String | No default value | The sink name. |
| fenodes | Yes | String | No default value | The endpoint and HTTP port of the ApsaraDB for SelectDB instance, for example, selectdb-sg-***.selectdbfe.ap-southeast-6.rds.aliyuncs.com:8080. To obtain the VPC or public endpoint, go to the ApsaraDB for SelectDB console, click your instance name, and find the information in the Network Information section. |
| benodes | No | String | No default value | The HTTP address of a backend (BE), for example, 127.0.0.1:8040. |
| jdbc-url | No | String | No default value | The JDBC connection information of the ApsaraDB for SelectDB instance, for example, jdbc:mysql://selectdb-sg-***.selectdbfe.ap-southeast-6.rds.aliyuncs.com:9030. To obtain the VPC or public endpoint and MySQL port, go to the ApsaraDB for SelectDB console, click your instance name, and find the information in the Network Information section. |
| username | Yes | String | No default value | The cluster username of the ApsaraDB for SelectDB instance. |
| password | No | String | No default value | The cluster password of the ApsaraDB for SelectDB instance. |
| auto-redirect | No | Boolean | false | Specifies whether to redirect Stream Load requests. If this option is enabled, Stream Load writes data through the frontend (FE) without explicitly obtaining BE information; if it is disabled, the connector connects to BEs directly. |
| charset-encoding | No | String | UTF-8 | The character set encoding used by the HTTP client. |
| sink.enable.batch-mode | No | Boolean | true | Specifies whether to write to SelectDB in batch mode. If this option is enabled, writes do not depend on checkpoints and are controlled by sink.buffer-flush.max-rows, sink.buffer-flush.max-bytes, and sink.buffer-flush.interval. In this case, exactly-once semantics are not guaranteed; to achieve idempotence, use the Unique key model. |
| sink.flush.queue-size | No | Integer | 2 | The queue size for batch writing. |
| sink.buffer-flush.max-rows | No | Integer | 50000 | The maximum number of rows to flush in a single batch. |
| sink.buffer-flush.max-bytes | No | Integer | 10485760 (10 MB) | The maximum number of bytes to flush in a single batch. |
| sink.buffer-flush.interval | No | String | 10s | The interval at which buffered data is asynchronously flushed. Minimum value: 1s. |
| sink.properties.* | No | String | No default value | Stream Load import parameters. For the CSV format, set sink.properties.format='csv', sink.properties.column_separator=',', and sink.properties.line_delimiter='\n'. For the JSON format, set sink.properties.format='json'. Example: sink.properties.strict_mode: true. For more information, see Use Stream Load to import data. |
| table.create.properties.* | No | String | No default value | The properties used when tables are created, for example, table.create.properties.replication_num: 1. See also Doris table properties. |
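
To show how the batching options above fit together, the following sink block is a hedged sketch that enables batch-mode tuning; the endpoint, credentials, and threshold values are illustrative, not recommendations:

```yaml
sink:
  type: doris
  name: Doris Sink
  fenodes: selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080
  username: admin
  password: "****"
  # Batch mode: flushes are driven by the thresholds below, not by checkpoints.
  sink.enable.batch-mode: true
  sink.buffer-flush.max-rows: 100000
  sink.buffer-flush.max-bytes: 20971520   # 20 MB
  sink.buffer-flush.interval: 30s
  # Stream Load import format.
  sink.properties.format: json
```

Because batch mode decouples flushing from checkpoints, exactly-once semantics are not guaranteed with these settings; write to a Unique key model table if you need idempotent results.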

Data type mappings

| Flink CDC type | SelectDB type |
| --- | --- |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| DECIMAL | DECIMAL |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| BOOLEAN | BOOLEAN |
| DATE | DATE |
| TIMESTAMP [(p)] | DATETIME [(p)] |
| TIMESTAMP_LTZ [(p)] | DATETIME [(p)] |
| CHAR(n) | CHAR(n*3) |
| VARCHAR(n) | VARCHAR(n*3) |
| BINARY(n) | STRING |
| VARBINARY(n) | STRING |
| STRING | STRING |

Note

In Doris, strings are UTF-8 encoded, so an English character takes 1 byte and a Chinese character takes 3 bytes; the mapped CHAR and VARCHAR lengths are therefore multiplied by 3. The maximum length of CHAR is 255; once this limit is exceeded, CHAR automatically converts to VARCHAR. The maximum length of VARCHAR is 65533; once this limit is exceeded, VARCHAR automatically converts to STRING.