全部產品
Search
文件中心

ApsaraDB for SelectDB:通過BitSail匯入資料

更新時間:Jul 06, 2024

ApsaraDB for SelectDB整合BitSail,支援使用SelectDB Sink匯入表資料至ApsaraDB for SelectDB。本文將為您介紹使用SelectDB Sink同步資料至ApsaraDB for SelectDB的使用方式。

概述

BitSail是一款基於分布式架構的高效能資料整合引擎,支援多種異構資料來源間的資料同步,並提供離線、即時、全量、增量情境下的全域Data Integration解決方案。您可以通過BitSail引擎讀取MySQL、Hive、Kafka等資料來源中的海量資料,然後由SelectDB Sink將資料寫入到ApsaraDB for SelectDB

前提條件

BitSail 0.1.0版本及以上。

使用方式

BitSail的job.writer中配置SelectDB連接器參數,樣本如下。

{ 
  "job": { 
    "writer": { 
      "class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink", 
      "load_url": "<selectdb_http_address>", 
      "jdbc_url": "<selectdb_mysql_address>", 
      "cluster_name": "<selectdb_cluster_name>", 
      "user": "<username>", 
      "password": "<password>", 
      "table_identifier": "<selectdb_table_identifier>", 
      "columns": [ 
        { 
          "index": 0, 
          "name": "id", 
          "type": "int" 
        }, 
        { 
          "index": 1, 
          "name": "bigint_type", 
          "type": "bigint" 
        }, 
        { 
          "index": 2, 
          "name": "string_type", 
          "type": "varchar" 
        }, 
        { 
          "index": 3, 
          "name": "double_type", 
          "type": "double" 
        }, 
        { 
          "index": 4, 
          "name": "date_type", 
          "type": "date" 
        } 
      ] 
    } 
  }
}

參數說明如下。

參數名稱

是否必填

參數含義

class

ApsaraDB for SelectDB寫連接器類型,預設為:com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink

load_url

ApsaraDB for SelectDB執行個體的訪問地址和HTTP協議連接埠。

您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和HTTP協議連接埠

樣本:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080

jdbc_url

ApsaraDB for SelectDB執行個體的訪問地址和MySQL協議連接埠。

您可以從ApsaraDB for SelectDB控制台的執行個體詳情 > 網路資訊中擷取VPC地址(或公網地址)和MySQL協議連接埠

樣本:selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

cluster_name

ApsaraDB for SelectDB執行個體的叢集名。執行個體中可能包含多個叢集,可按需選擇。

user

ApsaraDB for SelectDB執行個體的使用者名稱。

password

ApsaraDB for SelectDB執行個體對應使用者名稱的密碼。

table_identifier

ApsaraDB for SelectDB執行個體的表名,格式為庫名.表名。樣本:test_db.test_table

writer_parallelism_num

指定SelectDB寫並發數量。

sink_flush_interval_ms

Upsert模式下的flush間隔,單位毫秒;預設為:5000。

sink_max_retries

寫入的最大重試次數,預設為:3。

sink_buffer_size

寫入buffer最大值,單位位元組;預設為:1048576 (1 MB)。

sink_buffer_count

初始化buffer的數量,預設為:3。

sink_enable_delete

是否支援Delete事件同步。

sink_write_mode

寫入模式,目前僅支援BATCH_UPSERT。

stream_load_properties

追加在Stream Load URL後的參數,以Map<String,String>格式組成。

load_contend_type

copy-into使用的格式。取值範圍為CSV或JSON。預設JSON。

csv_field_delimiter

CSV格式的行內分隔字元,預設為:逗號","。

csv_line_delimiter

CSV格式的行間分隔字元,預設為:"\n"。

使用樣本

下文使用BItSail構造資料來源為例,為您介紹如何通過BitSail將上遊資料匯入至ApsaraDB for SelectDB

環境準備

  1. 配置BitSail環境,下載並解壓BitSail安裝包。

    wget feilun-justtmp.oss-cn-hongkong.aliyuncs.com/bitsail.tar.gz
    tar -zxvf bitsail.tar.gz
  2. 配置ApsaraDB for SelectDB執行個體。

    1. 建立ApsaraDB for SelectDB執行個體,詳情請參見建立執行個體

    2. 通過MySQL協議串連ApsaraDB for SelectDB執行個體,詳情請參見串連執行個體

    3. 建立測試資料庫和測試表。

      1. 建立測試資料庫。

        CREATE DATABASE test_db;
      2. 建立測試表。

        CREATE TABLE `test_table` (
          `id` BIGINT(20) NULL,
          `bigint_type` BIGINT(20) NULL,
          `string_type` VARCHAR(100) NULL,
          `double_type` DOUBLE NULL,
          `decimal_type` DECIMALV3(27, 9) NULL,
          `date_type` DATEV2 NULL,
          `partition_date` DATEV2 NULL
        ) ENGINE=OLAP
        DUPLICATE KEY(`id`)
        COMMENT 'OLAP'
        DISTRIBUTED BY HASH(`id`) BUCKETS 10
        PROPERTIES (
          "light_schema_change" = "true"
        );
    4. 開通ApsaraDB for SelectDB公網地址,詳情請參見申請和釋放公網地址

    5. 將BitSail環境的公網IP添加到IP白名單中,詳情請參見設定白名單

通過BitSail本地任務同步資料到SelectDB

  1. 建立設定檔test.json,配置任務資訊。通過bitsail包中的FakeSource類構造本機資料進行匯入。

    {
      "job": {
        "common": {
          "job_id": -2413,
          "job_name": "bitsail_fake_to_selectdb_test",
          "instance_id": -20413,
          "user_name": "user"
        },
        "reader": {
          "class": "com.bytedance.bitsail.connector.legacy.fake.source.FakeSource",
          "total_count": 300,
          "rate": 10000,
          "random_null_rate": 0,
          "unique_fields": "id",
          "columns_with_fixed_value": [
            {
              "name": "partition_date",
              "fixed_value": "2022-10-10"
            }
          ],
          "columns": [
            {
              "index": 0,
              "name": "id",
              "type": "long"
            },
            {
              "index": 1,
              "name": "bigint_type",
              "type": "long"
            },
            {
              "index": 2,
              "name": "string_type",
              "type": "string"
            },
            {
              "index": 3,
              "name": "double_type",
              "type": "double"
            },
            {
              "index": 4,
              "name": "decimal_type",
              "type": "double"
            },
            {
              "index": 5,
              "name": "date_type",
              "type": "date.date"
            },
            {
              "index": 6,
              "name": "partition_date",
              "type": "string"
            }
          ]
        },
        "writer": {
          "class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink",
          "load_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080",
          "jdbc_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030",
          "cluster_name": "new_cluster",
          "user": "admin",
          "password": "****",
          "table_identifier": "test_db.test_table",
          "columns": [
            {
              "index": 0,
              "name": "id",
              "type": "bigint"
            },
            {
              "index": 1,
              "name": "bigint_type",
              "type": "bigint"
            },
            {
              "index": 2,
              "name": "string_type",
              "type": "varchar"
            },
            {
              "index": 3,
              "name": "double_type",
              "type": "double"
            },
            {
              "index": 4,
              "name": "decimal_type",
              "type": "double"
            },
            {
              "index": 5,
              "name": "date_type",
              "type": "date"
            },
            {
              "index": 6,
              "name": "partition_date",
              "type": "date"
            }
          ]
        }
      }
    }
  2. 命令列提交任務。

    bash bin/bitsail run --engine flink --execution-mode run --deployment-mode local --conf test.json