ApsaraDB for SelectDB is integrated with BitSail. This topic describes how to use BitSail SelectDB Sink to import table data to ApsaraDB for SelectDB.
Overview
BitSail is a high-performance data integration engine based on a distributed architecture. BitSail supports data synchronization between heterogeneous data sources and provides comprehensive solutions that cover offline, real-time, full, and incremental data integration scenarios. You can use BitSail to read large amounts of data from data sources such as MySQL, Hive, and Kafka, and then use BitSail SelectDB Sink to write the data to ApsaraDB for SelectDB.
Prerequisites
BitSail 0.1.0 or later is installed.
How it works
Configure the SelectDB connector parameters in the job.writer section of the BitSail job configuration. The following sample code provides an example:
```json
{
  "job": {
    "writer": {
      "class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink",
      "load_url": "<selectdb_http_address>",
      "jdbc_url": "<selectdb_mysql_address>",
      "cluster_name": "<selectdb_cluster_name>",
      "user": "<username>",
      "password": "<password>",
      "table_identifier": "<selectdb_table_identifier>",
      "columns": [
        { "index": 0, "name": "id", "type": "int" },
        { "index": 1, "name": "bigint_type", "type": "bigint" },
        { "index": 2, "name": "string_type", "type": "varchar" },
        { "index": 3, "name": "double_type", "type": "double" },
        { "index": 4, "name": "date_type", "type": "date" }
      ]
    }
  }
}
```
The following table describes the parameters in the connector configuration.
| Parameter | Required | Description |
| --- | --- | --- |
| class | Yes | The write connector class for ApsaraDB for SelectDB. Default value: com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink. |
| load_url | Yes | The endpoint and HTTP port that are used to access the ApsaraDB for SelectDB instance. To obtain the virtual private cloud (VPC) endpoint or public endpoint and the HTTP port, log on to the ApsaraDB for SelectDB console, go to the details page of the instance, and view the VPC Endpoint or Public Endpoint parameter and the HTTP Port parameter in the Network Information section. Example: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080. |
| jdbc_url | Yes | The endpoint and MySQL port that are used to access the ApsaraDB for SelectDB instance. To obtain the VPC endpoint or public endpoint and the MySQL port, log on to the ApsaraDB for SelectDB console, go to the details page of the instance, and view the VPC Endpoint or Public Endpoint parameter and the MySQL Port parameter in the Network Information section. Example: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030. |
| cluster_name | Yes | The name of the cluster in the ApsaraDB for SelectDB instance. An ApsaraDB for SelectDB instance may contain multiple clusters. Select a cluster based on your business requirements. |
| user | Yes | The username that is used to connect to the ApsaraDB for SelectDB instance. |
| password | Yes | The password that is used to connect to the ApsaraDB for SelectDB instance. |
| table_identifier | Yes | The name of the destination table in the ApsaraDB for SelectDB instance. Specify the name in the format of `<database>.<table>`. Example: test_db.test_table. |
| writer_parallelism_num | No | The number of concurrent writes to ApsaraDB for SelectDB. |
| sink_flush_interval_ms | No | The interval at which data is flushed in UPSERT mode. Unit: milliseconds. Default value: 5000. |
| sink_max_retries | No | The maximum number of retries to write data. Default value: 3. |
| sink_buffer_size | No | The maximum buffer size for data writing. Unit: bytes. Default value: 1048576, which is equal to 1 MB. |
| sink_buffer_count | No | The number of buffers to be initialized. Default value: 3. |
| sink_enable_delete | No | Specifies whether to synchronize DELETE events. |
| sink_write_mode | No | The write mode. Set the value to BATCH_UPSERT. |
| stream_load_properties | No | The parameters that are appended to the Stream Load URL, in the Map<String,String> format. |
| load_contend_type | No | The format of the data that is written by executing the COPY INTO statement. Valid values: CSV and JSON. Default value: JSON. |
| csv_field_delimiter | No | The field delimiter for the CSV format. Default value: comma (,). |
| csv_line_delimiter | No | The row delimiter for the CSV format. Default value: \n. |
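As a sketch of how the optional parameters combine, the following hypothetical writer fragment switches the load format from the default JSON to CSV and restates the default delimiters explicitly. The placeholders in angle brackets must be replaced with your own values, and the fragment omits the columns list for brevity; it is an illustration based on the parameter table above, not a complete job file.

```json
{
  "job": {
    "writer": {
      "class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink",
      "load_url": "<selectdb_http_address>",
      "jdbc_url": "<selectdb_mysql_address>",
      "cluster_name": "<selectdb_cluster_name>",
      "user": "<username>",
      "password": "<password>",
      "table_identifier": "<selectdb_table_identifier>",
      "sink_write_mode": "BATCH_UPSERT",
      "load_contend_type": "CSV",
      "csv_field_delimiter": ",",
      "csv_line_delimiter": "\n"
    }
  }
}
```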
Example
In this example, BitSail is used to construct a data source and import upstream data to ApsaraDB for SelectDB.
Prepare the environment
Download and decompress the BitSail installation package.
```shell
wget feilun-justtmp.oss-cn-hongkong.aliyuncs.com/bitsail.tar.gz
tar -zxvf bitsail.tar.gz
```
Configure an ApsaraDB for SelectDB instance.
Create an ApsaraDB for SelectDB instance.
Connect to the ApsaraDB for SelectDB instance over the MySQL protocol. For more information, see Connect to an instance.
Create a test database and a test table.
Create a test database.
```sql
CREATE DATABASE test_db;
```
Create a test table.
```sql
CREATE TABLE `test_table` (
  `id` BIGINT(20) NULL,
  `bigint_type` BIGINT(20) NULL,
  `string_type` VARCHAR(100) NULL,
  `double_type` DOUBLE NULL,
  `decimal_type` DECIMALV3(27, 9) NULL,
  `date_type` DATEV2 NULL,
  `partition_date` DATEV2 NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
  "light_schema_change" = "true"
);
```
Apply for a public endpoint for the ApsaraDB for SelectDB instance. For more information, see Apply for or release a public endpoint.
Add the public IP address for BitSail to the IP address whitelist of the ApsaraDB for SelectDB instance. For more information, see Configure an IP address whitelist.
Use a local BitSail job to synchronize data to ApsaraDB for SelectDB
Create the test.json configuration file and configure the job information. Use the FakeSource class in the BitSail package to construct local data for import.
```json
{
  "job": {
    "common": {
      "job_id": -2413,
      "job_name": "bitsail_fake_to_selectdb_test",
      "instance_id": -20413,
      "user_name": "user"
    },
    "reader": {
      "class": "com.bytedance.bitsail.connector.legacy.fake.source.FakeSource",
      "total_count": 300,
      "rate": 10000,
      "random_null_rate": 0,
      "unique_fields": "id",
      "columns_with_fixed_value": [
        {
          "name": "partition_date",
          "fixed_value": "2022-10-10"
        }
      ],
      "columns": [
        { "index": 0, "name": "id", "type": "long" },
        { "index": 1, "name": "bigint_type", "type": "long" },
        { "index": 2, "name": "string_type", "type": "string" },
        { "index": 3, "name": "double_type", "type": "double" },
        { "index": 4, "name": "decimal_type", "type": "double" },
        { "index": 5, "name": "date_type", "type": "date.date" },
        { "index": 6, "name": "partition_date", "type": "string" }
      ]
    },
    "writer": {
      "class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink",
      "load_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080",
      "jdbc_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030",
      "cluster_name": "new_cluster",
      "user": "admin",
      "password": "****",
      "table_identifier": "test_db.test_table",
      "columns": [
        { "index": 0, "name": "id", "type": "bigint" },
        { "index": 1, "name": "bigint_type", "type": "bigint" },
        { "index": 2, "name": "string_type", "type": "varchar" },
        { "index": 3, "name": "double_type", "type": "double" },
        { "index": 4, "name": "decimal_type", "type": "double" },
        { "index": 5, "name": "date_type", "type": "date" },
        { "index": 6, "name": "partition_date", "type": "date" }
      ]
    }
  }
}
```
Submit the job.
```shell
bash bin/bitsail run --engine flink --execution-mode run --deployment-mode local --conf test.json
```
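After the job finishes, you can verify the import by connecting to the instance over the MySQL protocol and counting the rows in the destination table. Because the FakeSource reader in test.json generates 300 records (total_count is 300) with unique id values, the table should contain 300 rows, assuming the job ran exactly once against an empty table.

```sql
-- Run over the MySQL protocol after the BitSail job completes.
-- Expect 300 rows if the test.json job ran once against an empty table.
SELECT COUNT(*) FROM test_db.test_table;
```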