BitSail is a distributed data integration engine that supports offline, real-time, full, and incremental synchronization across heterogeneous data sources, including MySQL, Hive, and Kafka. ApsaraDB for SelectDB integrates with BitSail through the SelectDB Sink connector, which writes data directly into your SelectDB instance via the Stream Load HTTP API.
Prerequisites
Before you begin, make sure you have:
- BitSail 0.1.0 or later installed
- An ApsaraDB for SelectDB instance with at least one cluster
- The HTTP port and MySQL port of your SelectDB instance (see Get connection details)
How it works
The SelectDB Sink connector reads the job.writer section of your BitSail job configuration and streams data into SelectDB. Each write job:
- Authenticates with the SelectDB instance using the credentials you provide.
- Buffers incoming records up to the configured buffer size or until the flush interval elapses.
- Submits buffered data to the target table via COPY INTO statements (JSON format by default) or Stream Load.
- Retries failed writes up to the configured retry limit before the job fails.
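The buffer, flush, and retry steps above can be pictured with a small sketch. This is not the connector's implementation: the `BufferedWriter` class, its `submit` callback, and the way retries are counted (one initial attempt plus up to `max_retries` retries) are assumptions made for illustration. Only the default values mirror the documented `sink_buffer_size`, `sink_flush_interval_ms`, and `sink_max_retries` settings.

```python
import time
from typing import Callable, List


class BufferedWriter:
    """Illustrative buffer: flushes on size or interval, retrying on failure."""

    def __init__(self, submit: Callable[[List[bytes]], None],
                 buffer_size: int = 1048576,
                 flush_interval_ms: int = 5000,
                 max_retries: int = 3,
                 clock: Callable[[], float] = time.monotonic):
        self.submit = submit  # hypothetical callback that loads one batch
        self.buffer_size = buffer_size
        self.flush_interval_s = flush_interval_ms / 1000.0
        self.max_retries = max_retries
        self.clock = clock
        self.records: List[bytes] = []
        self.buffered_bytes = 0
        self.last_flush = clock()

    def write(self, record: bytes) -> None:
        """Buffer one record, then flush if either threshold is reached."""
        self.records.append(record)
        self.buffered_bytes += len(record)
        size_reached = self.buffered_bytes >= self.buffer_size
        interval_elapsed = self.clock() - self.last_flush >= self.flush_interval_s
        if size_reached or interval_elapsed:
            self.flush()

    def flush(self) -> None:
        """Submit the buffered batch, retrying before giving up."""
        if not self.records:
            self.last_flush = self.clock()
            return
        last_error = None
        # Assumption: one initial attempt plus up to max_retries retries.
        for _attempt in range(self.max_retries + 1):
            try:
                self.submit(self.records)
                break
            except Exception as exc:
                last_error = exc
        else:
            # Every attempt failed, so the job would fail at this point.
            raise RuntimeError("write failed after retries") from last_error
        self.records = []
        self.buffered_bytes = 0
        self.last_flush = self.clock()
```

Passing the clock in as a parameter keeps the sketch easy to exercise without waiting for a real flush interval to elapse.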
Get connection details
To get the endpoint and port values required by the connector:
- Log on to the ApsaraDB for SelectDB console.
- Go to the Instance Details page of your instance.
- On the Basic Information page, find the Network Information section.
- Copy the VPC Endpoint or Public Endpoint value, along with the HTTP Port and MySQL Port values.
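The values copied here become the connector's load_url and jdbc_url parameters, which this guide writes in `host:port` form. The sketch below combines them; the `build_urls` helper and the hostname in the usage note are hypothetical, and the validation rules are assumptions rather than checks the connector is known to perform.

```python
def build_urls(endpoint: str, http_port: int, mysql_port: int) -> dict:
    """Join the endpoint with each port in host:port form."""
    host = endpoint.strip()
    # Assumption: the endpoint is a bare hostname without scheme or port.
    if not host or ":" in host or "/" in host:
        raise ValueError("endpoint must be a bare hostname without scheme or port")
    for port in (http_port, mysql_port):
        if not 0 < port < 65536:
            raise ValueError(f"port out of range: {port}")
    return {
        "load_url": f"{host}:{http_port}",   # endpoint + HTTP port
        "jdbc_url": f"{host}:{mysql_port}",  # endpoint + MySQL port
    }
```

For example, `build_urls("selectdb-example.selectdbfe.rds.aliyuncs.com", 8080, 9030)` returns a dictionary whose two entries can be pasted into the writer block.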
Configure the SelectDB Sink connector
Add a job.writer block to your BitSail job configuration file and set the following parameters.
Required parameters
| Parameter | Description | Example |
|---|---|---|
| class | The write connector class. Always set this to com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink. | com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink |
| load_url | The endpoint and HTTP port of your SelectDB instance. | selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080 |
| jdbc_url | The endpoint and MySQL port of your SelectDB instance. | selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030 |
| cluster_name | The name of the cluster in your SelectDB instance. | new_cluster |
| user | The username for connecting to the SelectDB instance. | admin |
| password | The password for connecting to the SelectDB instance. | - |
| table_identifier | The target table in <database>.<table> format. | test_db.test_table |
| columns | The column definitions for the target table, including index, name, and type. | See example below. |
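Before submitting a job, it can help to check that the writer block contains every required parameter. The following is a minimal sketch: the helper names `missing_writer_keys` and `is_valid_table_identifier` are hypothetical, and treating an empty value as missing is an assumption, not documented connector behavior.

```python
REQUIRED_WRITER_KEYS = (
    "class", "load_url", "jdbc_url", "cluster_name",
    "user", "password", "table_identifier", "columns",
)


def missing_writer_keys(writer: dict) -> list:
    """Return required parameters that are absent or empty, in table order."""
    # Assumption: an empty string, None, or an empty list counts as missing.
    return [
        key for key in REQUIRED_WRITER_KEYS
        if key not in writer or writer[key] in ("", None, [])
    ]


def is_valid_table_identifier(value: str) -> bool:
    """Check the <database>.<table> shape: exactly two non-empty parts."""
    parts = value.split(".")
    return len(parts) == 2 and all(parts)
```

An empty list from `missing_writer_keys` means all eight required parameters are present.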
Optional parameters
Write behavior
| Parameter | Default | Description |
|---|---|---|
| sink_write_mode | - | The write mode. Set to BATCH_UPSERT to enable batch upsert mode. |
| sink_flush_interval_ms | 5000 | How often (in milliseconds) buffered data is flushed to SelectDB in upsert mode. |
| sink_buffer_size | 1048576 (1 MB) | The maximum buffer size per write, in bytes. |
| sink_buffer_count | 3 | The number of write buffers to initialize. |
| sink_max_retries | 3 | The maximum number of retry attempts for a failed write. |
| sink_enable_delete | - | Set to true to propagate DELETE events to SelectDB. |
| writer_parallelism_num | - | The number of parallel write tasks. |
Data format
| Parameter | Default | Description |
|---|---|---|
| load_contend_type | JSON | The format used by COPY INTO statements. Valid values: CSV, JSON. |
| csv_field_delimiter | , | The field delimiter when load_contend_type is CSV. |
| csv_line_delimiter | \n | The row delimiter when load_contend_type is CSV. |
| stream_load_properties | - | Additional properties appended to the Stream Load URL, in Map<String,String> format. |
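To see which values apply when a writer block sets only some optional parameters, the documented defaults can be merged with the overrides. This is a sketch under stated assumptions: `DOCUMENTED_DEFAULTS` and `effective_options` are hypothetical names, the `\n` default is read as a newline character, and parameters without a documented default are left out.

```python
DOCUMENTED_DEFAULTS = {
    "sink_flush_interval_ms": 5000,
    "sink_buffer_size": 1048576,   # 1 MB, in bytes
    "sink_buffer_count": 3,
    "sink_max_retries": 3,
    "load_contend_type": "JSON",
    "csv_field_delimiter": ",",
    "csv_line_delimiter": "\n",    # assumption: \n in the table means a newline
}


def effective_options(writer: dict) -> dict:
    """Overlay the writer block's optional settings on the documented defaults."""
    merged = dict(DOCUMENTED_DEFAULTS)
    merged.update({k: v for k, v in writer.items() if k in DOCUMENTED_DEFAULTS})
    # The data format table lists CSV and JSON as the only valid values.
    if merged["load_contend_type"] not in ("CSV", "JSON"):
        raise ValueError("load_contend_type must be CSV or JSON")
    return merged
```

Calling `effective_options({"load_contend_type": "CSV", "sink_buffer_size": 2097152})` keeps the remaining defaults and changes only the two overridden values.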
Import synthetic data into SelectDB
This example uses BitSail's built-in FakeSource connector to generate synthetic records and write them to a SelectDB table. Use it to verify your connector configuration before connecting a real data source.
Step 1: Set up your environment
- Download and extract the BitSail installation package:

      wget feilun-justtmp.oss-cn-hongkong.aliyuncs.com/bitsail.tar.gz
      tar -zxvf bitsail.tar.gz

- In the ApsaraDB for SelectDB console, complete the following:
  - Create a SelectDB instance if you don't have one already.
  - Connect to the instance over the MySQL protocol.
  - Create a test database and table:

        CREATE DATABASE test_db;
        USE test_db;
        CREATE TABLE `test_table` (
          `id` BIGINT(20) NULL,
          `bigint_type` BIGINT(20) NULL,
          `string_type` VARCHAR(100) NULL,
          `double_type` DOUBLE NULL,
          `decimal_type` DECIMALV3(27, 9) NULL,
          `date_type` DATEV2 NULL,
          `partition_date` DATEV2 NULL
        ) ENGINE=OLAP
        DUPLICATE KEY(`id`)
        COMMENT 'OLAP'
        DISTRIBUTED BY HASH(`id`) BUCKETS 10
        PROPERTIES (
          "light_schema_change" = "true"
        );

  - Apply for a public endpoint for the instance.
  - Add the BitSail host's IP address to the instance's IP address whitelist.
Step 2: Create the job configuration
Create a file named test.json with the following content. Replace load_url, jdbc_url, cluster_name, user, and password with your own values.
{
"job": {
"common": {
"job_id": -2413,
"job_name": "bitsail_fake_to_selectdb_test",
"instance_id": -20413,
"user_name": "user"
},
"reader": {
"class": "com.bytedance.bitsail.connector.legacy.fake.source.FakeSource",
"total_count": 300,
"rate": 10000,
"random_null_rate": 0,
"unique_fields": "id",
"columns_with_fixed_value": [
{
"name": "partition_date",
"fixed_value": "2022-10-10"
}
],
"columns": [
{ "index": 0, "name": "id", "type": "long" },
{ "index": 1, "name": "bigint_type", "type": "long" },
{ "index": 2, "name": "string_type", "type": "string" },
{ "index": 3, "name": "double_type", "type": "double" },
{ "index": 4, "name": "decimal_type", "type": "double" },
{ "index": 5, "name": "date_type", "type": "date.date" },
{ "index": 6, "name": "partition_date", "type": "string" }
]
},
"writer": {
"class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink",
"load_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080",
"jdbc_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030",
"cluster_name": "new_cluster",
"user": "admin",
"password": "****",
"table_identifier": "test_db.test_table",
"columns": [
{ "index": 0, "name": "id", "type": "bigint" },
{ "index": 1, "name": "bigint_type", "type": "bigint" },
{ "index": 2, "name": "string_type", "type": "varchar" },
{ "index": 3, "name": "double_type", "type": "double" },
{ "index": 4, "name": "decimal_type", "type": "double" },
{ "index": 5, "name": "date_type", "type": "date" },
{ "index": 6, "name": "partition_date", "type": "date" }
]
}
}
}
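In the configuration above, the reader and writer column lists use the same index and name for each column. A quick check like the following can catch accidental drift between the two lists before a job is submitted. The `column_mismatches` helper is hypothetical, and whether BitSail itself requires matching names is an assumption; the sketch only compares the two lists as they appear in the example.

```python
def column_mismatches(job_config: dict) -> list:
    """List differences between reader and writer columns, matched by index."""
    reader_cols = job_config["job"]["reader"]["columns"]
    writer_cols = job_config["job"]["writer"]["columns"]
    problems = []
    if len(reader_cols) != len(writer_cols):
        problems.append(
            f"column count differs: reader {len(reader_cols)}, writer {len(writer_cols)}"
        )
    # Look up each writer column's counterpart by its index value.
    reader_by_index = {col["index"]: col["name"] for col in reader_cols}
    for col in writer_cols:
        reader_name = reader_by_index.get(col["index"])
        if reader_name != col["name"]:
            problems.append(
                f"index {col['index']}: reader has {reader_name!r}, "
                f"writer has {col['name']!r}"
            )
    return problems
```

To use it, load test.json with Python's `json.load` and pass the resulting dictionary to `column_mismatches`; an empty list means the two column lists agree.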
Step 3: Submit the job
bash bin/bitsail run --engine flink --execution-mode run --deployment-mode local --conf test.json
If the job succeeds, query the target table to confirm the rows were written:
SELECT COUNT(*) FROM test_db.test_table;
The result should show 300 rows.