
Realtime Compute for Apache Flink:SelectDB

Last Updated: Mar 26, 2026

The SelectDB connector integrates Realtime Compute for Apache Flink with ApsaraDB for SelectDB, a fully managed, Apache Doris-compatible real-time data warehouse on Alibaba Cloud. Use it to build real-time pipelines that read from, write to, or look up data in SelectDB, and to run full database synchronization in YAML-based data ingestion jobs.

Supported capabilities:

| Category | Details |
| --- | --- |
| Table types | Source table, sink table, dimension table, data ingestion sink |
| Running mode | Stream and batch |
| Data format | JSON and CSV |
| API type | DataStream and SQL |
| Update/Delete support | Yes |
| Monitoring metrics | None |

Key features:

  • Full database data synchronization

  • Exactly-once semantics via two-phase commit (2PC) — no duplicate or lost records

  • Compatible with Apache Doris 1.0 and later

Prerequisites

Before you begin, make sure you have:

  • Realtime Compute for Apache Flink with Ververica Runtime (VVR) 8.0.10 or later

  • An ApsaraDB for SelectDB instance. See Create an instance.

  • An IP address whitelist configured on the instance. See Configure a whitelist.

Set up the connector

The SelectDB connector is built into VVR 11.1 and later — no manual installation required.

For VVR 8.0.10 through 11.0, install the connector manually:

  1. Download the JAR package from Maven Central (Flink versions 1.15–1.17).

  2. Upload the JAR to your Realtime Compute for Apache Flink development console. See Manage custom connectors.

  3. Reference the connector in your SQL job using 'connector' = 'doris'.

SQL

Syntax

All three table types — source, sink, and dimension — share the same DDL syntax. Specify the table role through the parameters you include.

To use SelectDB as a source table, enable direct cluster connection first. In the ApsaraDB for SelectDB console, go to Instance Details > Network Information and click Enable Direct Cluster Connection. This activates the Arrow Flight SQL protocol for high-throughput parallel reads.
CREATE TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING
) WITH (
  'connector'        = 'doris',
  'fenodes'          = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username'         = 'admin',
  'password'         = '****'
);

Parameters

General

| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| connector | Yes | None | Fixed to doris. |
| fenodes | Yes | None | HTTP endpoint of the SelectDB instance: `<VPC Address or Public Address>:<HTTP Protocol Port>`. Get both from Instance Details > Network Information in the SelectDB console. Example: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080. |
| jdbc-url | No | None | Java Database Connectivity (JDBC) connection string for dimension table lookups and metadata queries: `jdbc:mysql://<VPC Address or Public Address>:<MySQL Protocol Port>`. Example: jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030. |
| table.identifier | Yes | None | Target table in `<database>.<table>` format. Example: db.tbl. |
| username | Yes | None | Database username. Reset the password from the upper-right corner of the Instance Details page if needed. |
| password | Yes | None | Password for the database username. |
| doris.request.retries | No | 3 | Number of retries for failed requests. |
| doris.request.connect.timeout | No | 30s | Connection timeout. |
| doris.request.read.timeout | No | 30s | Read timeout. |

Source table

| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| doris.request.query.timeout | No | 21600s | Query timeout (6 hours by default). |
| doris.request.tablet.size | No | 1 | Number of tablets per partition. Lower values increase Flink parallelism but put more pressure on the database. |
| doris.batch.size | No | 4064 | Maximum rows read from a Backend (BE) node per request. Increase to reduce connection overhead and network latency. |
| doris.exec.mem.limit | No | 8192mb | Memory limit per query. The default is 8192 MB (8 GB). |
| source.use-flight-sql | No | false | Whether to read through Arrow Flight SQL. No manual configuration is needed: enabling Direct Cluster Connection in the SelectDB console activates Arrow Flight SQL automatically. |
| source.flight-sql-port | No | None | Arrow Flight SQL port (arrow_flight_sql_port) of the Frontend (FE) node. |

Sink table

Write mode affects delivery guarantees and flush behavior. Choose based on your consistency requirements:

|  | Streaming write | Batch write |
| --- | --- | --- |
| Trigger condition | Follows Flink checkpoint intervals | Periodic flush by data volume or time threshold |
| Delivery guarantee | Exactly-once (via 2PC) | At-least-once; achieve idempotence with the Unique model |
| Latency | Bounded by the checkpoint interval | Flexible, independent of checkpoints |
| Fault tolerance | Full Flink state recovery | Relies on Unique model deduplication |
| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| sink.label-prefix | No | None | Label prefix for Stream Load imports. Must be globally unique across all jobs, because the same label can only be committed once. Required to guarantee exactly-once semantics across job restarts. |
| sink.properties.* | No | None | Stream Load import parameters passed directly to the SelectDB Stream Load API. See the examples below. |
| sink.enable-delete | No | true | Propagate DELETE operations. Requires batch deletion to be enabled on the Doris table and works only with the Unique model. |
| sink.enable-2pc | No | true | Enable two-phase commit (2PC) for exactly-once semantics. See Explicit Transaction Operations. |
| sink.buffer-size | No | 1 MB | Write cache buffer size. Leave at the default. |
| sink.buffer-count | No | 3 | Number of write cache buffers. Leave at the default. |
| sink.max-retries | No | 3 | Maximum retries after a commit failure. |
| sink.enable.batch-mode | No | false | Switch to batch write mode. Flushes are controlled by the three sink.buffer-flush.* parameters below instead of by checkpoints. Exactly-once is not guaranteed; use the Unique model for idempotence. |
| sink.flush.queue-size | No | 2 | Cache queue size in batch mode. |
| sink.buffer-flush.max-rows | No | 500000 | Maximum rows per flush in batch mode. |
| sink.buffer-flush.max-bytes | No | 100 MB | Maximum bytes per flush in batch mode. |
| sink.buffer-flush.interval | No | 10s | Flush interval in batch mode. |
| sink.ignore.update-before | No | true | Ignore update-before events from Flink CDC. |
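As a sketch, a sink declared for batch write mode might look like the following. It reuses the placeholder endpoint and credentials from the examples on this page; the flush thresholds are illustrative values, not recommendations:

```sql
-- Batch write mode: flushes by row count / interval instead of checkpoints.
-- Delivery is at-least-once; pair with a Unique-model table for idempotence.
CREATE TEMPORARY TABLE selectdb_batch_sink (
  order_id      BIGINT,
  total_amount  DECIMAL(10, 2)
) WITH (
  'connector'                  = 'doris',
  'fenodes'                    = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier'           = 'shop_db.orders',
  'username'                   = 'admin',
  'password'                   = '****',
  'sink.enable.batch-mode'     = 'true',
  'sink.buffer-flush.max-rows' = '100000',
  'sink.buffer-flush.interval' = '5s'
);
```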

**sink.properties.* examples:**

CSV format:

'sink.properties.column_separator' = ','
-- If values may contain commas, use a non-printable separator:
-- 'sink.properties.column_separator' = '\x01'

JSON format:

'sink.properties.format'            = 'json',
'sink.properties.read_json_by_line' = 'true'
-- Alternatively: 'sink.properties.strip_outer_array' = 'true'

Dimension table

| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| lookup.cache.max-rows | No | -1 | Maximum rows in the lookup cache. -1 disables caching. |
| lookup.cache.ttl | No | 10s | Cache entry time-to-live (TTL). |
| lookup.max-retries | No | 1 | Retries after a lookup query fails. |
| lookup.jdbc.async | No | false | Enable asynchronous lookup. |
| lookup.jdbc.read.batch.size | No | 128 | Maximum batch size per query in async lookup mode. |
| lookup.jdbc.read.batch.queue-size | No | 256 | Intermediate buffer queue size in async lookup mode. |
| lookup.jdbc.read.thread-size | No | 3 | JDBC lookup threads per task in async lookup mode. |
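These lookup parameters can be combined on a single dimension table. A hedged sketch enabling caching and asynchronous lookup; the cache size and TTL are illustrative, and the endpoints are the same placeholders used elsewhere on this page:

```sql
CREATE TEMPORARY TABLE dim_city_cached (
  `city`     STRING,
  `level`    INT,
  `province` STRING,
  `country`  STRING
) WITH (
  'connector'             = 'doris',
  'fenodes'               = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'jdbc-url'              = 'jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030',
  'table.identifier'      = 'dim.dim_city',
  'username'              = 'admin',
  'password'              = '****',
  'lookup.cache.max-rows' = '100000',  -- -1 (the default) disables caching
  'lookup.cache.ttl'      = '60s',
  'lookup.jdbc.async'     = 'true'
);
```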

Examples

Source table

CREATE TEMPORARY TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING
) WITH (
  'connector'        = 'doris',
  'fenodes'          = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username'         = 'admin',
  'password'         = '****'
);
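Once declared, the table can be queried like any other Flink source, in streaming or batch mode. A minimal sketch; the order_status filter value is illustrative:

```sql
SELECT user_id, COUNT(*) AS order_cnt, SUM(total_amount) AS total_spent
FROM selectdb_source
WHERE order_status = 1
GROUP BY user_id;
```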

Sink table

CREATE TEMPORARY TABLE selectdb_sink (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING
) WITH (
  'connector'        = 'doris',
  'fenodes'          = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username'         = 'admin',
  'password'         = '****',
  'sink.label-prefix' = 'flink_orders'  -- Must be globally unique across jobs
);
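A job then writes through the sink with a standard INSERT statement; for example, copying from the source table declared earlier on this page:

```sql
INSERT INTO selectdb_sink
SELECT order_id, user_id, total_amount, order_status, create_time, product_name
FROM selectdb_source;
```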

Dimension table

SelectDB acts as a lookup dimension table joined against a streaming fact table.

-- Fact table from Kafka
CREATE TEMPORARY TABLE fact_table (
  `id`           BIGINT,
  `name`         STRING,
  `city`         STRING,
  `process_time` AS proctime()
) WITH (
  'connector' = 'kafka',
  ...
);

-- Dimension table from SelectDB
CREATE TEMPORARY TABLE dim_city (
  `city`     STRING,
  `level`    INT,
  `province` STRING,
  `country`  STRING
) WITH (
  'connector'        = 'doris',
  'fenodes'          = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'jdbc-url'         = 'jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030',
  'table.identifier' = 'dim.dim_city',
  'username'         = 'admin',
  'password'         = '****'
);

-- Temporal join
SELECT a.id, a.name, a.city, c.province, c.country, c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city;

Data ingestion

Use the SelectDB connector as a sink in YAML-based data ingestion jobs for full database synchronization.

Syntax

source:
  type: <source-type>

sink:
  type: doris
  name: Doris Sink
  fenodes: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080
  username: root
  password: ""
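For illustration, a complete pipeline that synchronizes a whole MySQL database into SelectDB might look like the following sketch. The MySQL source options, hostname, and credentials are assumptions for the example, not values from this page:

```yaml
source:
  type: mysql
  hostname: mysql.example.internal
  port: 3306
  username: flink_cdc
  password: "****"
  tables: app_db.\.*   # regular expression matching every table in app_db

sink:
  type: doris
  name: Doris Sink
  fenodes: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080
  username: root
  password: "****"
```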

Parameters

| Parameter | Required | Default | Description |
| --- | --- | --- | --- |
| type | Yes | None | Fixed to doris. |
| name | No | None | Descriptive name for the sink. |
| fenodes | Yes | None | HTTP endpoint: `<VPC Address or Public Address>:<HTTP Protocol Port>`. Get both from Instance Details > Network Information in the SelectDB console. Example: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080. |
| jdbc-url | No | None | JDBC connection string. Example: jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030. |
| username | Yes | None | Database username. |
| password | Yes | None | Password for the database username. |
| sink.enable.batch-mode | No | true | Batch mode is on by default in data ingestion jobs. Flushes are controlled by the three sink.buffer-flush.* parameters. Exactly-once is not guaranteed; use the Unique model for idempotence. |
| sink.flush.queue-size | No | 2 | Cache queue size. |
| sink.buffer-flush.max-rows | No | 500000 | Maximum rows per flush. |
| sink.buffer-flush.max-bytes | No | 100 MB | Maximum bytes per flush. |
| sink.buffer-flush.interval | No | 10s | Flush interval. Minimum: 1s. |
| sink.properties.* | No | None | Stream Load import parameters. |

**sink.properties.* examples:**

CSV format:

sink.properties.column_separator: ','
# If values may contain commas, use a non-printable separator:
# sink.properties.column_separator: '\x01'

JSON format:

sink.properties.format: 'json'
sink.properties.read_json_by_line: 'true'

Type mapping

Flink to SelectDB

| Flink CDC type | SelectDB type | Notes |
| --- | --- | --- |
| TINYINT | TINYINT |  |
| SMALLINT | SMALLINT |  |
| INT | INT |  |
| BIGINT | BIGINT |  |
| DECIMAL | DECIMAL |  |
| FLOAT | FLOAT |  |
| DOUBLE | DOUBLE |  |
| BOOLEAN | BOOLEAN |  |
| DATE | DATE |  |
| TIMESTAMP[(p)] | DATETIME[(p)] |  |
| TIMESTAMP_LTZ[(p)] | DATETIME[(p)] |  |
| CHAR(n) | CHAR(n*3) | SelectDB stores strings in UTF-8. English characters occupy 1 byte; Chinese characters occupy 3 bytes. Maximum CHAR length is 255; longer values are auto-converted to VARCHAR. |
| VARCHAR(n) | VARCHAR(n*3) | The same UTF-8 multiplier applies. Maximum VARCHAR length is 65533; longer values are auto-converted to STRING. |
| BINARY(n) | STRING |  |
| VARBINARY(n) | STRING |  |
| STRING | STRING |  |
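The CHAR/VARCHAR rows imply a concrete widening and overflow behavior. A worked sketch of how source column declarations map, based on the multiplier and length limits above (not the output of any tool):

```sql
-- Flink CDC source column      resulting SelectDB column
-- name  CHAR(10)          -->  CHAR(30)      (10 * 3 bytes, still <= 255)
-- city  VARCHAR(100)      -->  VARCHAR(300)  (100 * 3, still <= 65533)
-- note  VARCHAR(30000)    -->  STRING        (30000 * 3 = 90000 > 65533)
```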

What's next