すべてのプロダクト
Search
ドキュメントセンター

ApsaraDB for SelectDB:BitSail を使用してデータをインポートする

最終更新日:Apr 09, 2025

ApsaraDB for SelectDB は BitSail と統合されています。BitSail SelectDB Sink を使用して、テーブルデータを ApsaraDB for SelectDB にインポートできます。このトピックでは、BitSail SelectDB Sink を使用して ApsaraDB for SelectDB にデータを同期する方法について説明します。

概要

BitSail は、分散アーキテクチャに基づく高性能データ統合エンジンです。BitSail は、複数の異種データソース間のデータ同期をサポートし、オフライン、リアルタイム、フル、および増分データ統合シナリオを網羅する包括的なソリューションを提供します。BitSail を使用して、MySQL、Hive、Kafka などのデータソースから大量のデータを読み取り、BitSail SelectDB Sink を使用して ApsaraDB for SelectDB に書き込むことができます。

前提条件

BitSail 0.1.0 以降がインストールされていること。

仕組み

BitSail の job.writer で SelectDB コネクタパラメータを構成します。次のサンプルコードは例を示しています。

{ 
  "job": { 
    "writer": { 
      "class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink", 
      "load_url": "<selectdb_http_address>", // SelectDB の HTTP アドレス
      "jdbc_url": "<selectdb_mysql_address>", // SelectDB の MySQL アドレス
      "cluster_name": "<selectdb_cluster_name>", // SelectDB のクラスタ名
      "user": "<username>", // ユーザー名
      "password": "<password>", // パスワード
      "table_identifier": "<selectdb_table_identifier>", // SelectDB のテーブル識別子
      "columns": [ 
        { 
          "index": 0, 
          "name": "id", 
          "type": "int" 
        }, 
        { 
          "index": 1, 
          "name": "bigint_type", 
          "type": "bigint" 
        }, 
        { 
          "index": 2, 
          "name": "string_type", 
          "type": "varchar" 
        }, 
        { 
          "index": 3, 
          "name": "double_type", 
          "type": "double" 
        }, 
        { 
          "index": 4, 
          "name": "date_type", 
          "type": "date" 
        } 
      ] 
    } 
  }
}

次の表は、コネクタ構成のパラメータについて説明しています。

パラメータ

必須

説明

class

はい

ApsaraDB for SelectDB 用の書き込みコネクタのタイプ。デフォルト値: com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink

load_url

はい

ApsaraDB for SelectDB インスタンスにアクセスするために使用されるエンドポイントと HTTP ポート。

ApsaraDB for SelectDB インスタンスの仮想プライベートクラウド (VPC) エンドポイントまたはパブリックエンドポイントと HTTP ポートを取得するには、次の操作を実行します。ApsaraDB for SelectDB コンソールにログオンし、ApsaraDB for SelectDB インスタンスの [インスタンスの詳細] ページに移動します。[基本情報] ページで、[ネットワーク情報] セクションの [VPC エンドポイント] または [パブリックエンドポイント] パラメータと [HTTP ポート] パラメータの値を表示します。

例: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080

jdbc_url

はい

ApsaraDB for SelectDB インスタンスにアクセスするために使用されるエンドポイントと MySQL ポート。

ApsaraDB for SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントと MySQL ポートを取得するには、次の操作を実行します。ApsaraDB for SelectDB コンソールにログオンし、ApsaraDB for SelectDB インスタンスの [インスタンスの詳細] ページに移動します。[基本情報] ページで、[ネットワーク情報] セクションの [VPC エンドポイント] または [パブリックエンドポイント] パラメータと [mysql ポート] パラメータの値を表示します。

例: selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030

cluster_name

はい

ApsaraDB for SelectDB インスタンス内のクラスタの名前。ApsaraDB for SelectDB インスタンスには複数のクラスタが含まれている場合があります。ビジネス要件に基づいてクラスタを選択します。

user

はい

ApsaraDB for SelectDB インスタンスに接続するために使用されるユーザー名。

password

はい

ApsaraDB for SelectDB インスタンスに接続するために使用されるパスワード。

table_identifier

はい

ApsaraDB for SelectDB インスタンス内のテーブルの名前。データベース名.テーブル名 の形式で名前を指定します。例: test_db.test_table

writer_parallelism_num

いいえ

ApsaraDB for SelectDB への同時書き込みの数。

sink_flush_interval_ms

いいえ

UPSERT モードでデータがフラッシュされる間隔。単位: ミリ秒。デフォルト値: 5000。

sink_max_retries

いいえ

データ書き込みの最大再試行回数。デフォルト値: 3。

sink_buffer_size

いいえ

データ書き込みの最大バッファサイズ。単位: バイト。デフォルト値: 1048576 (1 MB)。

sink_buffer_count

いいえ

初期化されるバッファの数。デフォルト値: 3。

sink_enable_delete

いいえ

DELETE イベントを同期するかどうかを指定します。

sink_write_mode

いいえ

書き込みモード。値を BATCH_UPSERT に設定します。

stream_load_properties

いいえ

Map<String,String> 形式で Stream Load URL に追加されるパラメータ。

load_contend_type

いいえ

COPY INTO ステートメントを実行して書き込まれるデータの形式。有効な値: CSV および JSON。デフォルト値: JSON。

csv_field_delimiter

いいえ

CSV 形式のフィールド区切り文字。デフォルトでは、カンマ (,) が使用されます。

csv_line_delimiter

いいえ

CSV 形式の行区切り文字。デフォルトでは、\n が使用されます。

この例では、BitSail を使用してデータソースを構築し、アップストリームデータを ApsaraDB for SelectDB にインポートします。

環境を準備する

  1. BitSail インストールパッケージをダウンロードして解凍します。

    wget feilun-justtmp.oss-cn-hongkong.aliyuncs.com/bitsail.tar.gz
    tar -zxvf bitsail.tar.gz
  2. ApsaraDB for SelectDB インスタンスを構成します。

    1. ApsaraDB for SelectDB インスタンスを作成します。

    2. MySQL プロトコルを介して ApsaraDB for SelectDB インスタンスに接続します。詳細については、「インスタンスへの接続」をご参照ください。

    3. テストデータベースとテストテーブルを作成します。

      1. テストデータベースを作成します。

        CREATE DATABASE test_db;
      2. テストテーブルを作成します。

        CREATE TABLE `test_table` (
          `id` BIGINT(20) NULL,
          `bigint_type` BIGINT(20) NULL,
          `string_type` VARCHAR(100) NULL,
          `double_type` DOUBLE NULL,
          `decimal_type` DECIMALV3(27, 9) NULL,
          `date_type` DATEV2 NULL,
          `partition_date` DATEV2 NULL
        ) ENGINE=OLAP
        DUPLICATE KEY(`id`)
        COMMENT 'OLAP'
        DISTRIBUTED BY HASH(`id`) BUCKETS 10
        PROPERTIES (
          "light_schema_change" = "true"
        );
    4. ApsaraDB for SelectDB インスタンスのパブリックエンドポイントを申請します。詳細については、「パブリックエンドポイントの申請または解放」をご参照ください。

    5. BitSail のパブリック IP アドレスを ApsaraDB for SelectDB インスタンスの IP アドレスホワイトリストに追加します。詳細については、「IP アドレスホワイトリストを設定する」をご参照ください。

ローカル BitSail ジョブを使用して ApsaraDB for SelectDB にデータを同期する

  1. test.json 構成ファイルを作成し、ジョブ情報を構成します。BitSail パッケージの FakeSource クラスを使用して、インポート用のローカルデータを作成します。

    {
      "job": {
        "common": {
          "job_id": -2413,
          "job_name": "bitsail_fake_to_selectdb_test", // ジョブ名
          "instance_id": -20413,
          "user_name": "user"
        },
        "reader": {
          "class": "com.bytedance.bitsail.connector.legacy.fake.source.FakeSource", // フェイクデータソース
          "total_count": 300, // 生成するレコード数
          "rate": 10000, // レコード生成レート
          "random_null_rate": 0, // null 値の割合
          "unique_fields": "id", // ユニークフィールド
          "columns_with_fixed_value": [
            {
              "name": "partition_date",
              "fixed_value": "2022-10-10" // 固定値を設定
            }
          ],
          "columns": [
            {
              "index": 0,
              "name": "id",
              "type": "long"
            },
            {
              "index": 1,
              "name": "bigint_type",
              "type": "long"
            },
            {
              "index": 2,
              "name": "string_type",
              "type": "string"
            },
            {
              "index": 3,
              "name": "double_type",
              "type": "double"
            },
            {
              "index": 4,
              "name": "decimal_type",
              "type": "double"
            },
            {
              "index": 5,
              "name": "date_type",
              "type": "date.date"
            },
            {
              "index": 6,
              "name": "partition_date",
              "type": "string"
            }
          ]
        },
        "writer": {
          "class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink",
          "load_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080", // SelectDB の HTTP アドレス
          "jdbc_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030", // SelectDB の MySQL アドレス
          "cluster_name": "new_cluster", // クラスタ名
          "user": "admin", // ユーザー名
          "password": "****", // パスワード
          "table_identifier": "test_db.test_table", // テーブル識別子
          "columns": [
            {
              "index": 0,
              "name": "id",
              "type": "bigint"
            },
            {
              "index": 1,
              "name": "bigint_type",
              "type": "bigint"
            },
            {
              "index": 2,
              "name": "string_type",
              "type": "varchar"
            },
            {
              "index": 3,
              "name": "double_type",
              "type": "double"
            },
            {
              "index": 4,
              "name": "decimal_type",
              "type": "double"
            },
            {
              "index": 5,
              "name": "date_type",
              "type": "date"
            },
            {
              "index": 6,
              "name": "partition_date",
              "type": "date"
            }
          ]
        }
      }
    }
  2. ジョブを送信します。

    bash bin/bitsail run --engine flink --execution-mode run --deployment-mode local --conf test.json