BitSail は、MySQL、Hive、Kafka などの異種データソース間で、オフラインおよびリアルタイム、全量および増分同期をサポートする分散型 Data Integration エンジンです。ApsaraDB for SelectDB は、SelectDB Sink コネクタを介して BitSail と統合され、Stream Load HTTP API を通じてご利用の SelectDB インスタンスに直接データを書き込みます。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
BitSail 0.1.0 以降がインストール済みであること
ApsaraDB for SelectDB インスタンスが、少なくとも 1 つのクラスターを含む状態であること
SelectDB インスタンスの HTTP ポートおよび MySQL ポート(「接続情報の取得」を参照)
仕組み
SelectDB Sink コネクタは、BitSail ジョブ構成ファイル内の job.writer セクションを読み取り、データを SelectDB にストリーミングします。各書き込みジョブでは、以下の処理が実行されます。
ご提供いただいた認証情報を使用して、SelectDB インスタンスに対する認証を行います。
設定されたバッファサイズに達するか、またはフラッシュ間隔が経過するまで、受信レコードをバッファーに一時保存します。
バッファーに格納されたデータを、COPY INTO 文(デフォルトで JSON 形式)または Stream Load を使用して対象テーブルに送信します。
書き込み失敗時に、設定された最大リトライ回数に達するまで再試行し、その後ジョブを失敗と見なします。
接続情報の取得
コネクタで必要となるエンドポイントおよびポート値を取得するには、以下の手順を実行します。
ApsaraDB for SelectDB コンソール にログインします。
ご利用のインスタンスの インスタンス詳細 ページに移動します。
基本情報 ページで、ネットワーク情報 セクションを確認します。
VPC エンドポイント または パブリックエンドポイント の値に加え、HTTP ポート および MySQL ポート の値をコピーします。
SelectDB Sink コネクタの構成
BitSail ジョブ構成ファイルに job.writer ブロックを追加し、以下のパラメーターを設定します。
必須パラメーター
| パラメーター | 説明 | 例 |
|---|---|---|
class | 書き込みコネクタのクラス。必ず com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink を指定します。 | com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink |
load_url | SelectDB インスタンスのエンドポイントおよび HTTP ポート。 | selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080 |
jdbc_url | SelectDB インスタンスのエンドポイントおよび MySQL ポート。 | selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030 |
cluster_name | SelectDB インスタンス内のクラスター名。 | new_cluster |
user | SelectDB インスタンスへの接続に使用するユーザー名。 | admin |
password | SelectDB インスタンスへの接続に使用するパスワード。 | — |
table_identifier | <database>.<table> 形式の対象テーブル。 | test_db.test_table |
columns | 対象テーブルのカラム定義(インデックス、名前、型を含む)。 | 下記の例を参照してください。 |
任意パラメーター
書き込み動作
| パラメーター | デフォルト | 説明 |
|---|---|---|
sink_write_mode | — | 書き込みモード。バッチアップサートモードを有効にする場合は BATCH_UPSERT を指定します。 |
sink_flush_interval_ms | 5000 | アップサートモードにおいて、バッファーに格納されたデータを SelectDB にフラッシュする頻度(ミリ秒単位)。 |
sink_buffer_size | 1048576(1 MB) | 1 回の書き込みにおける最大バッファーサイズ(バイト単位)。 |
sink_buffer_count | 3 | 初期化する書き込みバッファーの数。 |
sink_max_retries | 3 | 書き込み失敗時の最大リトライ回数。 |
sink_enable_delete | — | DELETE イベントを SelectDB に伝播させる場合は true を指定します。 |
writer_parallelism_num | — | 並列書き込みタスクの数。 |
データ形式
| パラメーター | デフォルト | 説明 |
|---|---|---|
load_contend_type | JSON | COPY INTO 文で使用される形式。有効な値: CSV、JSON。 |
csv_field_delimiter | , | load_contend_type が CSV の場合のフィールド区切り文字。 |
csv_line_delimiter | \n | load_contend_type が CSV の場合の行区切り文字。 |
stream_load_properties | — | Stream Load URL に付加される追加プロパティ(Map<String,String> 形式)。 |
SelectDB への合成データのインポート
この例では、BitSail の組み込み FakeSource コネクタを使用して合成レコードを生成し、SelectDB のテーブルに書き込みます。実際のデータソースに接続する前に、コネクタ構成の検証にご利用ください。
ステップ 1:環境の準備
BitSail インストールパッケージをダウンロードして展開します。
wget feilun-justtmp.oss-cn-hongkong.aliyuncs.com/bitsail.tar.gz tar -zxvf bitsail.tar.gzApsaraDB for SelectDB コンソールで、以下の操作を完了します。
まだ SelectDB インスタンスをお持ちでない場合は、新規作成します。
インスタンスへの接続 を MySQL プロトコル経由で行います。
テスト用データベースおよびテーブルを作成します。
CREATE DATABASE test_db; CREATE TABLE `test_table` ( `id` BIGINT(20) NULL, `bigint_type` BIGINT(20) NULL, `string_type` VARCHAR(100) NULL, `double_type` DOUBLE NULL, `decimal_type` DECIMALV3(27, 9) NULL, `date_type` DATEV2 NULL, `partition_date` DATEV2 NULL ) ENGINE=OLAP DUPLICATE KEY(`id`) COMMENT 'OLAP' DISTRIBUTED BY HASH(`id`) BUCKETS 10 PROPERTIES ( "light_schema_change" = "true" );インスタンスに対してパブリックエンドポイントを申請します。
BitSail ホストの IP アドレスを追加するをインスタンスの IP アドレスホワイトリストに追加します。
ステップ 2:ジョブ構成の作成
以下の内容で test.json というファイルを作成します。load_url、jdbc_url、cluster_name、user、および password の値を、実際の値に置き換えてください。
{
"job": {
"common": {
"job_id": -2413,
"job_name": "bitsail_fake_to_selectdb_test",
"instance_id": -20413,
"user_name": "user"
},
"reader": {
"class": "com.bytedance.bitsail.connector.legacy.fake.source.FakeSource",
"total_count": 300,
"rate": 10000,
"random_null_rate": 0,
"unique_fields": "id",
"columns_with_fixed_value": [
{
"name": "partition_date",
"fixed_value": "2022-10-10"
}
],
"columns": [
{ "index": 0, "name": "id", "type": "long" },
{ "index": 1, "name": "bigint_type", "type": "long" },
{ "index": 2, "name": "string_type", "type": "string" },
{ "index": 3, "name": "double_type", "type": "double" },
{ "index": 4, "name": "decimal_type", "type": "double" },
{ "index": 5, "name": "date_type", "type": "date.date" },
{ "index": 6, "name": "partition_date", "type": "string" }
]
},
"writer": {
"class": "com.bytedance.bitsail.connector.selectdb.sink.SelectdbSink",
"load_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:8080",
"jdbc_url": "selectdb-cn-4xl3jv1****.selectdbfe.rds.aliyuncs.com:9030",
"cluster_name": "new_cluster",
"user": "admin",
"password": "****",
"table_identifier": "test_db.test_table",
"columns": [
{ "index": 0, "name": "id", "type": "bigint" },
{ "index": 1, "name": "bigint_type", "type": "bigint" },
{ "index": 2, "name": "string_type", "type": "varchar" },
{ "index": 3, "name": "double_type", "type": "double" },
{ "index": 4, "name": "decimal_type", "type": "double" },
{ "index": 5, "name": "date_type", "type": "date" },
{ "index": 6, "name": "partition_date", "type": "date" }
]
}
}
}ステップ 3:ジョブの実行
bash bin/bitsail run --engine flink --execution-mode run --deployment-mode local --conf test.jsonジョブが正常に完了した場合、対象テーブルをクエリして、レコードが正しく書き込まれていることを確認します。
SELECT COUNT(*) FROM test_db.test_table;結果として、300 行が返されることを確認してください。