All Products
Search
Document Center

ApsaraDB for SelectDB:Use BitSail to import data

更新时间:Jun 06, 2024

ApsaraDB for SelectDB is integrated with BitSail. You can use BitSail SelectDB Sink to import table data to ApsaraDB for SelectDB. This topic describes how to use BitSail SelectDB Sink to synchronize data to ApsaraDB for SelectDB.

Overview

BitSail is a high-performance data integration engine based on the distributed architecture. BitSail supports data synchronization between multiple heterogeneous data sources and provides comprehensive solutions that cover offline, real-time, full, and incremental data integration scenarios. You can use BitSail to read large amounts of data from data sources such as MySQL, Hive, and Kafka, and then use BitSail SelectDB Sink to write the data to ApsaraDB for SelectDB.

Prerequisites

BitSail 0.1.0 or later is installed.

How it works

Configure the SelectDB connector parameters in job.writer of BitSail. The following sample code provides an example:

{ 
  "job": { 
    "writer": { 
      "class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink", 
      "load_url": "<selectdb_http_address>", 
      "jdbc_url": "<selectdb_mysql_address>", 
      "cluster_name": "<selectdb_cluster_name>", 
      "user": "<username>", 
      "password": "<password>", 
      "table_identifier": "<selectdb_table_identifier>", 
      "columns": [ 
        { 
          "index": 0, 
          "name": "id", 
          "type": "int" 
        }, 
        { 
          "index": 1, 
          "name": "bigint_type", 
          "type": "bigint" 
        }, 
        { 
          "index": 2, 
          "name": "string_type", 
          "type": "varchar" 
        }, 
        { 
          "index": 3, 
          "name": "double_type", 
          "type": "double" 
        }, 
        { 
          "index": 4, 
          "name": "date_type", 
          "type": "date" 
        } 
      ] 
    } 
  }
}

The following table describes the parameters in the connector configurations.

Parameter

Required

Description

class

Yes

The type of the write connector for ApsaraDB for SelectDB. Default value: com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink.

load_url

Yes

The endpoint and HTTP port that are used to access the ApsaraDB for SelectDB instance.

To obtain the virtual private cloud (VPC) endpoint or public endpoint and HTTP port of the ApsaraDB for SelectDB instance, perform the following operations: Log on to the ApsaraDB for SelectDB console and go to the Instance Details page of the ApsaraDB for SelectDB instance. On the Basic Information page, view the values of the VPC Endpoint or Public Endpoint parameter and the HTTP Port parameter in the Network Information section.

Example: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080.

jdbc_url

Yes

The endpoint and MySQL port that are used to access the ApsaraDB for SelectDB instance.

To obtain the VPC endpoint or public endpoint and MySQL port of the ApsaraDB for SelectDB instance, perform the following operations: Log on to the ApsaraDB for SelectDB console and go to the Instance Details page of the ApsaraDB for SelectDB instance. On the Basic Information page, view the values of the VPC Endpoint or Public Endpoint parameter and the MySQL Port parameter in the Network Information section.

Example: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030.

cluster_name

Yes

The name of the cluster in the ApsaraDB for SelectDB instance. An ApsaraDB for SelectDB instance may contain multiple clusters. Select a cluster based on your business requirements.

user

Yes

The username that is used to connect to the ApsaraDB for SelectDB instance.

password

Yes

The password that is used to connect to the ApsaraDB for SelectDB instance.

table_identifier

Yes

The name of the table in the ApsaraDB for SelectDB instance. Specify the name in the format of Database name.Table name. Example: test_db.test_table.

writer_parallelism_num

No

The number of concurrent writes to ApsaraDB for SelectDB.

sink_flush_interval_ms

No

The interval at which data is flushed in UPSERT mode. Unit: millisecond. Default value: 5000.

sink_max_retries

No

The maximum number of retries to write data. Default value: 3.

sink_buffer_size

No

The maximum buffer size for data writing. Unit: bytes. Default value: 1048576, which is equal to 1 MB.

sink_buffer_count

No

The number of buffers to be initialized. Default value: 3.

sink_enable_delete

No

Specifies whether to synchronize DELETE events.

sink_write_mode

No

The write mode. Set the value to BATCH_UPSERT.

stream_load_properties

No

The parameters that are appended to the Stream Load URL in the Map<String,String> format.

load_contend_type

No

The format of the data that is written by executing the COPY INTO statement. Valid values: CSV and JSON. Default value: JSON.

csv_field_delimiter

No

The field delimiter for the CSV format. By default, a comma (,) is used.

csv_line_delimiter

No

The row delimiter for the CSV format. By default, \n is used.

Example

In this example, BitSail is used to construct a data source and import upstream data to ApsaraDB for SelectDB.

Prepare the environment

  1. Download and decompress the BitSail installation package.

    wget feilun-justtmp.oss-cn-hongkong.aliyuncs.com/bitsail.tar.gz
    tar -zxvf bitsail.tar.gz
  2. Configure an ApsaraDB for SelectDB instance.

    1. Create an ApsaraDB for SelectDB instance.

    2. Connect to the ApsaraDB for SelectDB instance over the MySQL protocol. For more information, see Connect to an instance.

    3. Create a test database and a test table.

      1. Create a test database.

        CREATE DATABASE test_db;
      2. Create a test table.

        CREATE TABLE `test_table` (
          `id` BIGINT(20) NULL,
          `bigint_type` BIGINT(20) NULL,
          `string_type` VARCHAR(100) NULL,
          `double_type` DOUBLE NULL,
          `decimal_type` DECIMALV3(27, 9) NULL,
          `date_type` DATEV2 NULL,
          `partition_date` DATEV2 NULL
        ) ENGINE=OLAP
        DUPLICATE KEY(`id`)
        COMMENT 'OLAP'
        DISTRIBUTED BY HASH(`id`) BUCKETS 10
        PROPERTIES (
          "light_schema_change" = "true"
        );
    4. Apply for a public endpoint for the ApsaraDB for SelectDB instance. For more information, see Apply for or release a public endpoint.

    5. Add the public IP address for BitSail to the IP address whitelist of the ApsaraDB for SelectDB instance. For more information, see Configure an IP address whitelist.

Use a local BitSail job to synchronize data to ApsaraDB for SelectDB

  1. Create the test.json configuration file and configure the job information. Use the FakeSource class in the BitSail package to construct local data for import.

    {
      "job": {
        "common": {
          "job_id": -2413,
          "job_name": "bitsail_fake_to_selectdb_test",
          "instance_id": -20413,
          "user_name": "user"
        },
        "reader": {
          "class": "com.bytedance.bitsail.connector.legacy.fake.source.FakeSource",
          "total_count": 300,
          "rate": 10000,
          "random_null_rate": 0,
          "unique_fields": "id",
          "columns_with_fixed_value": [
            {
              "name": "partition_date",
              "fixed_value": "2022-10-10"
            }
          ],
          "columns": [
            {
              "index": 0,
              "name": "id",
              "type": "long"
            },
            {
              "index": 1,
              "name": "bigint_type",
              "type": "long"
            },
            {
              "index": 2,
              "name": "string_type",
              "type": "string"
            },
            {
              "index": 3,
              "name": "double_type",
              "type": "double"
            },
            {
              "index": 4,
              "name": "decimal_type",
              "type": "double"
            },
            {
              "index": 5,
              "name": "date_type",
              "type": "date.date"
            },
            {
              "index": 6,
              "name": "partition_date",
              "type": "string"
            }
          ]
        },
        "writer": {
          "class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink",
          "load_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080",
          "jdbc_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030",
          "cluster_name": "new_cluster",
          "user": "admin",
          "password": "****",
          "table_identifier": "test_db.test_table",
          "columns": [
            {
              "index": 0,
              "name": "id",
              "type": "bigint"
            },
            {
              "index": 1,
              "name": "bigint_type",
              "type": "bigint"
            },
            {
              "index": 2,
              "name": "string_type",
              "type": "varchar"
            },
            {
              "index": 3,
              "name": "double_type",
              "type": "double"
            },
            {
              "index": 4,
              "name": "decimal_type",
              "type": "double"
            },
            {
              "index": 5,
              "name": "date_type",
              "type": "date"
            },
            {
              "index": 6,
              "name": "partition_date",
              "type": "date"
            }
          ]
        }
      }
    }
  2. Submit the job.

    bash bin/bitsail run --engine flink --execution-mode run --deployment-mode local --conf test.json