ApsaraDB for SelectDB は BitSail と統合されています。BitSail SelectDB Sink を使用して、テーブルデータを ApsaraDB for SelectDB にインポートできます。このトピックでは、BitSail SelectDB Sink を使用して ApsaraDB for SelectDB にデータを同期する方法について説明します。
概要
BitSail は、分散アーキテクチャに基づく高性能データ統合エンジンです。BitSail は、複数の異種データソース間のデータ同期をサポートし、オフライン、リアルタイム、フル、および増分データ統合シナリオを網羅する包括的なソリューションを提供します。BitSail を使用して、MySQL、Hive、Kafka などのデータソースから大量のデータを読み取り、BitSail SelectDB Sink を使用して ApsaraDB for SelectDB に書き込むことができます。
前提条件
BitSail 0.1.0 以降がインストールされていること。
仕組み
BitSail の job.writer で SelectDB コネクタパラメータを構成します。次のサンプルコードは例を示しています。
{
"job": {
"writer": {
"class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink",
"load_url": "<selectdb_http_address>", // SelectDB の HTTP アドレス
"jdbc_url": "<selectdb_mysql_address>", // SelectDB の MySQL アドレス
"cluster_name": "<selectdb_cluster_name>", // SelectDB のクラスタ名
"user": "<username>", // ユーザー名
"password": "<password>", // パスワード
"table_identifier": "<selectdb_table_identifier>", // SelectDB のテーブル識別子
"columns": [
{
"index": 0,
"name": "id",
"type": "int"
},
{
"index": 1,
"name": "bigint_type",
"type": "bigint"
},
{
"index": 2,
"name": "string_type",
"type": "varchar"
},
{
"index": 3,
"name": "double_type",
"type": "double"
},
{
"index": 4,
"name": "date_type",
"type": "date"
}
]
}
}
}次の表は、コネクタ構成のパラメータについて説明しています。
パラメータ | 必須 | 説明 |
class | はい | ApsaraDB for SelectDB 用の書き込みコネクタのタイプ。デフォルト値: |
load_url | はい | ApsaraDB for SelectDB インスタンスにアクセスするために使用されるエンドポイントと HTTP ポート。 ApsaraDB for SelectDB インスタンスの仮想プライベートクラウド (VPC) エンドポイントまたはパブリックエンドポイントと HTTP ポートを取得するには、次の操作を実行します。ApsaraDB for SelectDB コンソールにログオンし、ApsaraDB for SelectDB インスタンスの [インスタンスの詳細] ページに移動します。[基本情報] ページで、[ネットワーク情報] セクションの [VPC エンドポイント] または [パブリックエンドポイント] パラメータと [HTTP ポート] パラメータの値を表示します。 例: |
jdbc_url | はい | ApsaraDB for SelectDB インスタンスにアクセスするために使用されるエンドポイントと MySQL ポート。 ApsaraDB for SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントと MySQL ポートを取得するには、次の操作を実行します。ApsaraDB for SelectDB コンソールにログオンし、ApsaraDB for SelectDB インスタンスの [インスタンスの詳細] ページに移動します。[基本情報] ページで、[ネットワーク情報] セクションの [VPC エンドポイント] または [パブリックエンドポイント] パラメータと [mysql ポート] パラメータの値を表示します。 例: |
cluster_name | はい | ApsaraDB for SelectDB インスタンス内のクラスタの名前。ApsaraDB for SelectDB インスタンスには複数のクラスタが含まれている場合があります。ビジネス要件に基づいてクラスタを選択します。 |
user | はい | ApsaraDB for SelectDB インスタンスに接続するために使用されるユーザー名。 |
password | はい | ApsaraDB for SelectDB インスタンスに接続するために使用されるパスワード。 |
table_identifier | はい | ApsaraDB for SelectDB インスタンス内のテーブルの名前。 |
writer_parallelism_num | いいえ | ApsaraDB for SelectDB への同時書き込みの数。 |
sink_flush_interval_ms | いいえ | UPSERT モードでデータがフラッシュされる間隔。単位: ミリ秒。デフォルト値: 5000。 |
sink_max_retries | いいえ | データ書き込みの最大再試行回数。デフォルト値: 3。 |
sink_buffer_size | いいえ | データ書き込みの最大バッファサイズ。単位: バイト。デフォルト値: 1048576 (1 MB)。 |
sink_buffer_count | いいえ | 初期化されるバッファの数。デフォルト値: 3。 |
sink_enable_delete | いいえ | DELETE イベントを同期するかどうかを指定します。 |
sink_write_mode | いいえ | 書き込みモード。値を BATCH_UPSERT に設定します。 |
stream_load_properties | いいえ | Map<String,String> 形式で Stream Load URL に追加されるパラメータ。 |
load_contend_type | いいえ | COPY INTO ステートメントを実行して書き込まれるデータの形式。有効な値: CSV および JSON。デフォルト値: JSON。 |
csv_field_delimiter | いいえ | CSV 形式のフィールド区切り文字。デフォルトでは、カンマ (,) が使用されます。 |
csv_line_delimiter | いいえ | CSV 形式の行区切り文字。デフォルトでは、\n が使用されます。 |
例
この例では、BitSail を使用してデータソースを構築し、アップストリームデータを ApsaraDB for SelectDB にインポートします。
環境を準備する
BitSail インストールパッケージをダウンロードして解凍します。
wget feilun-justtmp.oss-cn-hongkong.aliyuncs.com/bitsail.tar.gz tar -zxvf bitsail.tar.gzApsaraDB for SelectDB インスタンスを構成します。
ApsaraDB for SelectDB インスタンスを作成します。
MySQL プロトコルを介して ApsaraDB for SelectDB インスタンスに接続します。詳細については、「インスタンスへの接続」をご参照ください。
テストデータベースとテストテーブルを作成します。
テストデータベースを作成します。
CREATE DATABASE test_db;テストテーブルを作成します。
CREATE TABLE `test_table` ( `id` BIGINT(20) NULL, `bigint_type` BIGINT(20) NULL, `string_type` VARCHAR(100) NULL, `double_type` DOUBLE NULL, `decimal_type` DECIMALV3(27, 9) NULL, `date_type` DATEV2 NULL, `partition_date` DATEV2 NULL ) ENGINE=OLAP DUPLICATE KEY(`id`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) BUCKETS 10 PROPERTIES ( "light_schema_change" = "true" );
ApsaraDB for SelectDB インスタンスのパブリックエンドポイントを申請します。詳細については、「パブリックエンドポイントの申請または解放」をご参照ください。
BitSail のパブリック IP アドレスを ApsaraDB for SelectDB インスタンスの IP アドレスホワイトリストに追加します。詳細については、「IP アドレスホワイトリストを設定する」をご参照ください。
ローカル BitSail ジョブを使用して ApsaraDB for SelectDB にデータを同期する
test.json構成ファイルを作成し、ジョブ情報を構成します。BitSail パッケージの FakeSource クラスを使用して、インポート用のローカルデータを作成します。{ "job": { "common": { "job_id": -2413, "job_name": "bitsail_fake_to_selectdb_test", // ジョブ名 "instance_id": -20413, "user_name": "user" }, "reader": { "class": "com.bytedance.bitsail.connector.legacy.fake.source.FakeSource", // フェイクデータソース "total_count": 300, // 生成するレコード数 "rate": 10000, // レコード生成レート "random_null_rate": 0, // null 値の割合 "unique_fields": "id", // ユニークフィールド "columns_with_fixed_value": [ { "name": "partition_date", "fixed_value": "2022-10-10" // 固定値を設定 } ], "columns": [ { "index": 0, "name": "id", "type": "long" }, { "index": 1, "name": "bigint_type", "type": "long" }, { "index": 2, "name": "string_type", "type": "string" }, { "index": 3, "name": "double_type", "type": "double" }, { "index": 4, "name": "decimal_type", "type": "double" }, { "index": 5, "name": "date_type", "type": "date.date" }, { "index": 6, "name": "partition_date", "type": "string" } ] }, "writer": { "class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink", "load_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080", // SelectDB の HTTP アドレス "jdbc_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030", // SelectDB の MySQL アドレス "cluster_name": "new_cluster", // クラスタ名 "user": "admin", // ユーザー名 "password": "****", // パスワード "table_identifier": "test_db.test_table", // テーブル識別子 "columns": [ { "index": 0, "name": "id", "type": "bigint" }, { "index": 1, "name": "bigint_type", "type": "bigint" }, { "index": 2, "name": "string_type", "type": "varchar" }, { "index": 3, "name": "double_type", "type": "double" }, { "index": 4, "name": "decimal_type", "type": "double" }, { "index": 5, "name": "date_type", "type": "date" }, { "index": 6, "name": "partition_date", "type": "date" } ] } } }ジョブを送信します。
bash bin/bitsail run --engine flink --execution-mode run --deployment-mode local --conf test.json