すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:SelectDB コネクタ

最終更新日:Aug 07, 2025

このトピックでは、カスタム SelectDB コネクタを使用して ApsaraDB for SelectDB にデータを書き込む方法について説明します。

背景情報

ApsaraDB for SelectDB は、次世代のリアルタイム データウェアハウスサービスです。Alibaba Cloud で完全に管理およびホストされており、Apache Doris と 100% 互換性があります。 ApsaraDB for SelectDB は、大量のデータを分析するというニーズに対応できます。サービスのメリットとユースケースの詳細については、「ApsaraDB for SelectDB とは」をご参照ください。

次の表に、カスタム SelectDB コネクタでサポートされている機能を示します。

項目

説明

サポートされているタイプ

シンクテーブル。データインジェスチョンシンク

実行モード

ストリーミングとバッチ

データ形式

JSON および CSV

メトリクス

該当なし

API

DataStream API および SQL API

シンクでのデータの更新/削除

サポートされています

機能

  • データベースの同期。

  • 重複や欠落がないことを保証する、1 回限りのセマンティクス。

  • Apache Doris 1.0 以後との互換性により、カスタム SelectDB コネクタを介して Apache Doris へのシームレスなデータ同期が可能になります。

使用上の注意

  • Ververica Runtime (VVR) 8.0.10 以後のみが、カスタム SelectDB コネクタをサポートしています。

  • カスタム SelectDB コネクタの使用中に質問がある場合は、ApsaraDB for SelectDB にチケットを送信してください。

  • ApsaraDB for SelectDB にデータを同期するための前提条件は次のとおりです。

SQL

SelectDB コネクタは、SQL ジョブのシンクテーブルとして使用できます。

コネクタのアップロードと構成

説明

VVR 11.1 以降、SelectDB コネクタは組み込みコネクタになるため、次の手順をスキップできます。

  1. JAR ファイルをクリックして、SelectDB コネクタ JAR (バージョン 1.15 ~ 1.17) をダウンロードします。

  2. SelectDB コネクタ JAR を Realtime Compute for Apache Flink コンソールにアップロードします。詳細については、「カスタムコネクタの管理」をご参照ください。

  3. SQL ドラフトを作成する、カスタム SelectDB コネクタを使用します。

    connector オプションを doris に設定します。その他のシンクオプションについては、「Doris シンクの構成項目」をご参照ください。

構文

CREATE TABLE selectdb_sink (
  emp_no       INT ,
  birth_date   DATE,
  first_name   STRING,
  last_name    STRING,
  gender       STRING,
  hire_date    DATE
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'test.employees',
  'username' = 'admin',
  'password' = '****',
  'sink.enable-delete' = 'true'
);

データ型マッピング

Doris ドキュメントの Flink Doris コネクタトピックの「型マッピング」セクションを参照してください。

コネクタの使用

このセクションでは、カスタム SelectDB コネクタを使用して、ApsaraDB RDS for MySQL から ApsaraDB for SelectDB にデータを同期する方法について説明します。

  1. データ同期の準備をします。

    1. Flink ワークスペースApsaraDB RDS for MySQL インスタンス、および ApsaraDB for SelectDB インスタンスを作成します。

    2. ApsaraDB RDS for MySQL コンソールで、order_dw_mysql という名前のデータベースと orders という名前のテーブルを作成し、テストデータをテーブルにインポートします。

      CREATE TABLE `orders` (
        order_id bigint not null primary key,
        user_id varchar(50) not null,
        shop_id bigint not null,
        product_id bigint not null,
        buy_fee decimal(20,2) not null,   
        create_time timestamp not null,
        update_time timestamp not null default now(),
        state int not null 
      );
      
      INSERT INTO orders VALUES
      (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
      (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
      (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
      (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
      (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
      (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
      (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    3. DMS を使用して ApsaraDB for SelectDB インスタンスに接続した後、selectdb という名前のデータベースと selecttable という名前のテーブルを作成します。

      CREATE DATABASE selectdb;
      
      CREATE TABLE `selecttable` (
        order_id bigint,
        user_id varchar(50),
        shop_id bigint,
        product_id bigint,
        buy_fee DECIMAL,   
        create_time DATETIME,
        update_time DATETIME,
        state int
       )DISTRIBUTED BY HASH(order_id) BUCKETS 10;
    4. Flink ワークスペースの vSwitch の [CIDR ブロック] を ApsaraDB for SelectDB インスタンスの IP アドレスホワイトリストに追加します。詳細については、「IP アドレスホワイトリストを構成する方法」をご参照ください。

  2. Realtime Compute for Apache Flink コンソールで、SQL ジョブを開発して開始します。

    1. mysqlcatalog という名前の MySQL カタログを作成します。詳細については、「MySQL カタログの管理」をご参照ください。

    2. JAR ファイルをクリックして、SelectDB コネクタ (バージョン 1.15 ~ 1.17) JAR をダウンロードし、JAR ファイルをアップロードします。詳細については、「カスタムコネクタの管理」をご参照ください。

    3. [開発] > [ETL] に移動し、[新規] をクリックして空のストリームドラフトを作成し、次のコードをドラフトにコピーします。

      CREATE TEMPORARY TABLE  selectdb_sink (
        order_id BIGINT,
        user_id STRING,
        shop_id BIGINT,
        product_id BIGINT,
        buy_fee DECIMAL,   
        create_time TIMESTAMP(6),
        update_time TIMESTAMP(6),
        state int
      ) 
        WITH (
        'connector' = 'doris',
        'fenodes' = 'selectdb-cn-jfj3z******.selectdbfe.rds.aliyuncs.com:8080',
        'table.identifier' = 'selectdb.selecttable',
        'username' = 'admin',
        'password' = '${secret_values.selectdb}',
        'sink.enable-delete' = 'true'
      );
      
      INSERT INTO selectdb_sink SELECT * FROM `mysqlcatalog`.`order_dw_mysql`.`orders`;
    4. [デプロイ] をクリックし、初期モードでデプロイメントを開始します。詳細については、「デプロイメントの作成」および「デプロイメントの開始」をご参照ください。

  3. DMS を使用して ApsaraDB for SelectDB インスタンスに接続した後、selecttable テーブルのデータをクエリします。

    SELECT * FROM `selecttable` ;

データインジェスチョン

SelectDB コネクタは、データインジェスチョンシンクとして使用できます。

構文

source:
   type: xxx

sink:
   type: doris
   name: Doris Sink  // Doris シンク
   fenodes: 127.0.0.1:8030
   username: root
   password: ""
   table.create.properties.replication_num: 1

構成オプション

オプション

説明

必須?

データ型

デフォルト値

備考

type

シンクのタイプ。

はい

文字列

デフォルト値なし

doris に設定します。

name

シンク名。

いいえ

文字列

デフォルト値なし

fenodes

ApsaraDB for SelectDB インスタンスのエンドポイントと HTTP ポート。

はい

文字列

デフォルト値なし

SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントを取得するには、ApsaraDB for SelectDB コンソールに移動し、インスタンス名をクリックして、[ネットワーク情報] セクションの情報を確認します。

例: selectdb-sg-***.selectdbfe.ap-southeast-6.rds.aliyuncs.com:8080

benodes

BE HTTP アドレス。

いいえ

文字列

デフォルト値なし

例: 127.0.0.1:8040

jdbc-url

ApsaraDB for SelectDB インスタンスの JDBC 接続情報。

いいえ

文字列

デフォルト値なし

SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントと MySQL ポートを取得するには、ApsaraDB for SelectDB コンソールに移動し、インスタンス名をクリックして、[ネットワーク情報] セクションの情報を確認します。

例: jdbc:mysql://selectdb-sg-***.selectdbfe.ap-southeast-6.rds.aliyuncs.com:9030

username

ApsaraDB for SelectDB インスタンスのクラスタユーザー名。

はい

文字列

デフォルト値なし

password

ApsaraDB for SelectDB インスタンスのクラスタパスワード。

いいえ

文字列

デフォルト値なし

auto-redirect

ストリームロードリクエストをリダイレクトするかどうかを指定します。有効にすると、ストリームロードは BE 情報を明示的に取得せずに FE を介してデータを書き込みます。

いいえ

文字列

false

FE リダイレクトを介して書き込むかどうか、および BE に直接接続して書き込むかどうか

charset-encoding

HTTP クライアントの文字セットエンコーディング。

いいえ

ブール値

UTF-8

sink.enable.batch-mode

バッチモードを使用して SelectDB に書き込むかどうかを指定します。有効にすると、書き込みはチェックポイントに依存せず、sink.buffer-flush.max-rowssink.buffer-flush.max-bytes、および sink.buffer-flush.interval によって制御されます。

有効にすると、1 回限りのセマンティクスは保証されません。べき等性を実現するには、一意モデルを使用します。

いいえ

ブール値

true

sink.flush.queue-size

バッチ書き込みのキューサイズ。

いいえ

整数

2

sink.buffer-flush.max-rows

単一バッチでフラッシュするレコードの最大数。

いいえ

整数

50000

sink.buffer-flush.max-bytes

単一バッチでフラッシュするバイトの最大数。

いいえ

整数

10485760 (10MB)

sink.buffer-flush.interval

フラッシュ間隔。この時間を超えると、データは非同期にフラッシュされます。最小値: 1 秒。

いいえ

文字列

10s

sink.properties.

Stream Load のパラメータをインポートします。プロパティの構成を入力してください。

  • CSV 形式の場合は、次のように構成します。

    sink.properties.format='csv' 
    sink.properties.column_separator=',' // 列の区切り文字
    sink.properties.line_delimiter='\n' // 行の区切り文字
  • JSON 形式の場合は、次のように構成します。

    sink.properties.format='json' 

いいえ

文字列

デフォルト値なし

例: sink.properties.strict_mode: true。詳細については、「Stream Load を使用してデータをインポートする」をご参照ください。

table.create.properties.*

テーブル作成のプロパティ構成。

いいえ

文字列

デフォルト値なし

例: table.create.properties.replication_num: 1。「Doris テーブルプロパティ」も参照してください。

データ型マッピング

Flink CDC タイプ

SelectDB タイプ

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

DECIMAL

DECIMAL

FLOAT

FLOAT

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

DATE

DATE

TIMESTAMP [(p)]

DATETIME [(p)]

TIMESTAMP_LTZ [(p)]

DATETIME [(p)]

CHAR(n)

CHAR(n*3)

説明

Doris では、文字列は UTF-8 エンコードされているため、各英字は 1 バイトを占め、各中国語文字は 3 バイトを占めます。したがって、ここでは長さに 3 を掛けます。CHAR の最大長は 255 です。それを超えると、CHAR は自動的に VARCHAR に変換されます。

VARCHAR(n)

VARCHAR(n*3)

説明

Doris では、文字列は UTF-8 エンコードされているため、各英字は 1 バイトを占め、各中国語文字は 3 バイトを占めます。したがって、ここでは長さに 3 を掛けます。VARCHAR の最大長は 65533 です。それを超えると、VARCHAR は自動的に STRING に変換されます。

BINARY(n)

STRING

VARBINARY(N)

STRING

STRING

STRING