The SelectDB connector integrates Realtime Compute for Apache Flink with ApsaraDB for SelectDB, a fully managed, Apache Doris-compatible real-time data warehouse on Alibaba Cloud. Use it to build real-time pipelines that read from, write to, or look up data in SelectDB, and to run full database synchronization in YAML-based data ingestion jobs.
Supported capabilities:
| Category | Details |
|---|---|
| Table types | Source table, sink table, dimension table, data ingestion sink |
| Running mode | Stream and batch |
| Data format | JSON and CSV |
| API type | DataStream and SQL |
| Update/Delete support | Yes |
| Monitoring metrics | None |
Key features:
- Full database data synchronization
- Exactly-once semantics via two-phase commit (2PC): no duplicate or lost records
- Compatible with Apache Doris 1.0 and later
Prerequisites
Before you begin, make sure you have:
- Realtime Compute for Apache Flink with Ververica Runtime (VVR) 8.0.10 or later
- An ApsaraDB for SelectDB instance. See Create an instance.
- An IP address whitelist configured on the instance. See Configure a whitelist.
Set up the connector
The SelectDB connector is built into VVR 11.1 and later — no manual installation required.
For VVR 8.0.10 through 11.0, install the connector manually:
1. Download the JAR package from Maven Central (Flink versions 1.15–1.17).
2. Upload the JAR to your Realtime Compute for Apache Flink development console. See Manage custom connectors.
3. Reference the connector in your SQL job with 'connector' = 'doris'.
SQL
Syntax
All three table types — source, sink, and dimension — share the same DDL syntax. Specify the table role through the parameters you include.
To use SelectDB as a source table, enable direct cluster connection first. In the ApsaraDB for SelectDB console, go to Instance Details > Network Information and click Enable Direct Cluster Connection. This activates the Arrow Flight SQL protocol for high-throughput parallel reads.
CREATE TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****'
);
Parameters
General
| Parameter | Required | Default | Description |
|---|---|---|---|
| connector | Yes | — | Fixed to doris. |
| fenodes | Yes | — | HTTP endpoint of the SelectDB instance: <VPC Address or Public Address>:<HTTP Protocol Port>. Get both from Instance Details > Network Information in the SelectDB console. Example: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080. |
| jdbc-url | No | — | Java Database Connectivity (JDBC) connection string for dimension table lookups and metadata queries: jdbc:mysql://<VPC Address or Public Address>:<MySQL Protocol Port>. Example: jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030. |
| table.identifier | Yes | — | Target table in <database>.<table> format. Example: db.tbl. |
| username | Yes | — | Database username. Reset the password from the upper-right corner of the Instance Details page if needed. |
| password | Yes | — | Password for the database username. |
| doris.request.retries | No | 3 | Number of retries for failed requests. |
| doris.request.connect.timeout | No | 30s | Connection timeout. |
| doris.request.read.timeout | No | 30s | Read timeout. |
Source table
| Parameter | Required | Default | Description |
|---|---|---|---|
| doris.request.query.timeout | No | 21600s | Query timeout (6 hours by default). |
| doris.request.tablet.size | No | 1 | Number of tablets per partition. Lower values increase Flink parallelism but add more pressure on the database. |
| doris.batch.size | No | 4064 | Maximum rows read from a Backend (BE) node per request. Increase to reduce connection overhead and network latency. |
| doris.exec.mem.limit | No | 8192mb | Memory limit per query (8 GB by default). |
| source.use-flight-sql | No | false | Whether to read over Arrow Flight SQL. No manual configuration is needed: enabling Direct Cluster Connection in the SelectDB console activates Arrow Flight SQL automatically. |
| source.flight-sql-port | No | — | Arrow Flight SQL port (arrow_flight_sql_port) of the Frontend (FE) node. |
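How doris.request.tablet.size trades read parallelism against database load can be sketched as follows. This is an illustrative model of split planning, not the connector's actual partitioning code; the function name and the tablet counts are assumptions for the example.

```python
import math

def plan_splits(total_tablets: int, tablet_size: int) -> int:
    """Approximate number of read splits: each split covers up to
    `tablet_size` tablets, so fewer tablets per split means more
    parallel readers but more concurrent load on SelectDB."""
    return math.ceil(total_tablets / tablet_size)

# With 24 tablets, the default tablet_size of 1 yields 24 splits,
# while tablet_size=8 yields only 3.
print(plan_splits(24, 1))  # 24
print(plan_splits(24, 8))  # 3
```

Raise doris.request.tablet.size when the database becomes the bottleneck; lower it when you have idle Flink parallelism to spare.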
Sink table
Write mode affects delivery guarantees and flush behavior. Choose based on your consistency requirements:
| | Streaming write | Batch write |
|---|---|---|
| Trigger condition | Follows Flink checkpoint intervals | Periodic flush by data volume or time threshold |
| Delivery guarantee | Exactly-once (via 2PC) | At-least-once; achieve idempotence with the Unique model |
| Latency | Bounded by checkpoint interval | Flexible, independent of checkpoints |
| Fault tolerance | Full Flink state recovery | Relies on Unique model deduplication |
| Parameter | Required | Default | Description |
|---|---|---|---|
| sink.label-prefix | No | — | Label prefix for Stream Load imports. Must be globally unique across all jobs, because the same label can only be committed once. Required to guarantee exactly-once semantics across job restarts. |
| sink.properties.* | No | — | Stream Load import parameters passed directly to the SelectDB Stream Load API. See examples below. |
| sink.enable-delete | No | true | Propagate DELETE operations. Requires the Doris table to have batch deletion enabled and only works with the Unique model. |
| sink.enable-2pc | No | true | Enable two-phase commit (2PC) for exactly-once semantics. See Explicit Transaction Operations. |
| sink.buffer-size | No | 1 MB | Write cache buffer size in bytes. Leave at the default. |
| sink.buffer-count | No | 3 | Number of write cache buffers. Leave at the default. |
| sink.max-retries | No | 3 | Maximum retries after a commit failure. |
| sink.enable.batch-mode | No | false | Switch to batch write mode. Flush is controlled by the three sink.buffer-flush.* parameters below instead of checkpoints. Exactly-once is not guaranteed; use the Unique model for idempotence. |
| sink.flush.queue-size | No | 2 | Cache queue size in batch mode. |
| sink.buffer-flush.max-rows | No | 500000 | Maximum rows per flush in batch mode. |
| sink.buffer-flush.max-bytes | No | 100 MB | Maximum bytes per flush in batch mode. |
| sink.buffer-flush.interval | No | 10s | Flush interval in batch mode. |
| sink.ignore.update-before | No | true | Ignore update-before events from Flink CDC. |
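In batch mode, a flush fires when any one of the three sink.buffer-flush.* thresholds is crossed: row count, byte size, or elapsed time. A minimal sketch of that trigger logic, for illustration only (the class name and structure are assumptions, not the connector's implementation):

```python
import time

class BatchBuffer:
    """Flushes when row count, byte size, or elapsed time exceeds its threshold,
    mirroring sink.buffer-flush.max-rows / max-bytes / interval."""
    def __init__(self, max_rows=500_000, max_bytes=100 * 1024 * 1024, interval_s=10.0):
        self.max_rows, self.max_bytes, self.interval_s = max_rows, max_bytes, interval_s
        self.rows, self.nbytes, self.last_flush = 0, 0, time.monotonic()

    def add(self, record: bytes) -> bool:
        """Buffer one record; return True if this write triggered a flush."""
        self.rows += 1
        self.nbytes += len(record)
        if (self.rows >= self.max_rows
                or self.nbytes >= self.max_bytes
                or time.monotonic() - self.last_flush >= self.interval_s):
            # Here the real sink would issue a Stream Load request.
            self.rows, self.nbytes, self.last_flush = 0, 0, time.monotonic()
            return True
        return False

buf = BatchBuffer(max_rows=3, max_bytes=1_000_000, interval_s=60)
print([buf.add(b'{"id": 1}') for _ in range(4)])  # [False, False, True, False]
```

Because a crash can re-send a flushed batch, pair batch mode with a Unique-model table so replays deduplicate.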
**sink.properties.* examples:**
CSV format:
'sink.properties.column_separator' = ','
-- If values may contain commas, use a non-printable separator:
-- 'sink.properties.column_separator' = '\x01'
JSON format:
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true'
-- Alternatively: 'sink.properties.strip_outer_array' = 'true'
Dimension table
| Parameter | Required | Default | Description |
|---|---|---|---|
| lookup.cache.max-rows | No | -1 | Maximum rows in the lookup cache. -1 disables caching. |
| lookup.cache.ttl | No | 10s | Cache entry time-to-live (TTL). |
| lookup.max-retries | No | 1 | Retries after a lookup query fails. |
| lookup.jdbc.async | No | false | Enable asynchronous lookup. |
| lookup.jdbc.read.batch.size | No | 128 | Maximum batch size per query in async lookup mode. |
| lookup.jdbc.read.batch.queue-size | No | 256 | Intermediate buffer queue size in async lookup mode. |
| lookup.jdbc.read.thread-size | No | 3 | JDBC lookup threads per task in async lookup mode. |
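How lookup.cache.max-rows and lookup.cache.ttl interact can be sketched as a small size- and TTL-bounded cache. This is an illustrative model only; the class and eviction policy are assumptions, not the connector's code.

```python
import time

class LookupCache:
    """Dimension-table cache sketch: entries expire after `ttl_s` seconds,
    and `max_rows = -1` (the default) disables caching entirely."""
    def __init__(self, max_rows=-1, ttl_s=10.0):
        self.max_rows, self.ttl_s, self.store = max_rows, ttl_s, {}

    def get(self, key):
        if self.max_rows == -1:
            return None  # caching disabled: every lookup hits the database
        hit = self.store.get(key)
        if hit and time.monotonic() - hit[1] < self.ttl_s:
            return hit[0]
        return None

    def put(self, key, value):
        if self.max_rows == -1:
            return
        if len(self.store) >= self.max_rows:
            self.store.pop(next(iter(self.store)))  # evict oldest insertion
        self.store[key] = (value, time.monotonic())

cache = LookupCache(max_rows=2, ttl_s=10)
cache.put("Beijing", {"province": "Beijing", "level": 1})
print(cache.get("Beijing"))          # cache hit within the TTL window
print(LookupCache().get("Beijing"))  # None: default max_rows=-1 disables the cache
```

With the default of -1, every probe queries SelectDB over JDBC; enable the cache when the dimension data tolerates staleness up to one TTL.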
Examples
Source table
CREATE TEMPORARY TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****'
);
Sink table
CREATE TEMPORARY TABLE selectdb_sink (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****',
'sink.label-prefix' = 'flink_orders' -- Must be globally unique across jobs
);
Dimension table
SelectDB acts as a lookup dimension table joined against a streaming fact table.
-- Fact table from Kafka
CREATE TEMPORARY TABLE fact_table (
`id` BIGINT,
`name` STRING,
`city` STRING,
`process_time` AS PROCTIME()
) WITH (
'connector' = 'kafka',
...
);
-- Dimension table from SelectDB
CREATE TEMPORARY TABLE dim_city (
`city` STRING,
`level` INT,
`province` STRING,
`country` STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'jdbc-url' = 'jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030',
'table.identifier' = 'dim.dim_city',
'username' = 'admin',
'password' = '****'
);
-- Temporal join
SELECT a.id, a.name, a.city, c.province, c.country, c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city;
Data ingestion
Use the SelectDB connector as a sink in YAML-based data ingestion jobs for full database synchronization.
Syntax
source:
type: <source-type>
sink:
type: doris
name: Doris Sink
fenodes: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080
username: root
password: ""
Parameters
| Parameter | Required | Default | Description |
|---|---|---|---|
| type | Yes | — | Fixed to doris. |
| name | No | — | Descriptive name for the sink. |
| fenodes | Yes | — | HTTP endpoint: <VPC Address or Public Address>:<HTTP Protocol Port>. Get both from Instance Details > Network Information in the SelectDB console. Example: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080. |
| jdbc-url | No | — | JDBC connection string. Example: jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030. |
| username | Yes | — | Database username. |
| password | Yes | — | Password for the database username. |
| sink.enable.batch-mode | No | true | Batch mode is on by default in data ingestion jobs. Flush is controlled by the three sink.buffer-flush.* parameters. Exactly-once is not guaranteed; use the Unique model for idempotence. |
| sink.flush.queue-size | No | 2 | Cache queue size. |
| sink.buffer-flush.max-rows | No | 500000 | Maximum rows per flush. |
| sink.buffer-flush.max-bytes | No | 100 MB | Maximum bytes per flush. |
| sink.buffer-flush.interval | No | 10s | Flush interval. Minimum: 1s. |
| sink.properties.* | No | — | Stream Load import parameters. |
**sink.properties.* examples:**
CSV format:
sink.properties.column_separator: ','
# If values may contain commas, use a non-printable separator:
# sink.properties.column_separator: '\x01'
JSON format:
sink.properties.format: 'json'
sink.properties.read_json_by_line: 'true'
Type mapping
Flink to SelectDB
| Flink CDC type | SelectDB type | Notes |
|---|---|---|
| TINYINT | TINYINT | |
| SMALLINT | SMALLINT | |
| INT | INT | |
| BIGINT | BIGINT | |
| DECIMAL | DECIMAL | |
| FLOAT | FLOAT | |
| DOUBLE | DOUBLE | |
| BOOLEAN | BOOLEAN | |
| DATE | DATE | |
| TIMESTAMP[(p)] | DATETIME[(p)] | |
| TIMESTAMP_LTZ[(p)] | DATETIME[(p)] | |
| CHAR(n) | CHAR(n*3) | SelectDB stores strings in UTF-8. English characters occupy 1 byte; Chinese characters occupy 3 bytes. Maximum CHAR length is 255; longer values are auto-converted to VARCHAR. |
| VARCHAR(n) | VARCHAR(n*3) | Same UTF-8 multiplier applies. Maximum VARCHAR length is 65533; longer values are auto-converted to STRING. |
| BINARY(n) | STRING | |
| VARBINARY(n) | STRING | |
| STRING | STRING | |
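The string-type widening rules above (triple the declared length for UTF-8, then fall back to the next larger type when a cap is exceeded) can be expressed as a small helper. The function is illustrative, derived from the table, not part of the connector's API.

```python
def map_string_type(flink_type: str, n: int) -> str:
    """Map Flink CHAR(n)/VARCHAR(n) to a SelectDB string type:
    lengths triple for UTF-8, CHAR caps at 255 and VARCHAR at 65533,
    and overflow falls back to the next larger type."""
    widened = n * 3
    if flink_type == "CHAR":
        return f"CHAR({widened})" if widened <= 255 else map_string_type("VARCHAR", n)
    if flink_type == "VARCHAR":
        return f"VARCHAR({widened})" if widened <= 65533 else "STRING"
    raise ValueError(f"not a string type: {flink_type}")

print(map_string_type("CHAR", 10))        # CHAR(30)
print(map_string_type("CHAR", 100))       # CHAR(300) exceeds 255 -> VARCHAR(300)
print(map_string_type("VARCHAR", 30000))  # 90000 exceeds 65533 -> STRING
```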