すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:SelectDB

最終更新日:Jan 16, 2026

このトピックでは、SelectDB コネクタの使用方法について説明します。

背景情報

ApsaraDB for SelectDB は、次世代のリアルタイムデータウェアハウスサービスです。これは Alibaba Cloud 上のフルマネージドサービスであり、Apache Doris と 100% 互換性があります。ApsaraDB for SelectDB を簡単に購入して、大量のデータ分析のニーズを満たすことができます。メリットとシナリオの詳細については、「ApsaraDB for SelectDB とは」をご参照ください。

カスタム SelectDB コネクタは、次の機能をサポートしています:

カテゴリ

詳細

サポートされているタイプ

ソーステーブル、結果テーブル、ディメンションテーブル、およびデータインジェストシンク

実行モード

ストリームおよびバッチ

データ形式

JSON および CSV

特定の監視メトリック

なし

API タイプ

DataStream および SQL

更新/削除のサポート

はい

機能

  • データベース全体のデータ同期をサポートします。

  • SelectDB コネクタは 1回限りのセマンティクス (exactly-once semantics) を提供し、データが重複したり失われたりしないことを保証します。

  • このコネクタは Apache Doris 1.0 以降と互換性があります。Flink SelectDB カスタムコネクタを使用して、Apache Doris にデータを同期できます。

使用上の注意

  • Realtime Compute for Apache Flink の Ververica Runtime (VVR) 8.0.10 以降のバージョンのみが、SelectDB カスタムコネクタをサポートします。

  • SelectDB カスタムコネクタの使用中に問題が発生した場合は、ApsaraDB for SelectDB にチケットを送信してください。

  • ApsaraDB for SelectDB にデータを同期する前に、次の要件を満たす必要があります:

    • ApsaraDB for SelectDB インスタンスを作成します。詳細については、「インスタンスの作成」をご参照ください。

    • IP アドレスホワイトリストを設定します。詳細については、「ホワイトリストの設定」をご参照ください。

SQL

使用方法

説明

SelectDB コネクタは VVR 11.1 以降に組み込まれています。次の手順はスキップできます。

  1. JAR パッケージをクリックして、SelectDB カスタムコネクタ (バージョン 1.15 から 1.17) をダウンロードします。

  2. Realtime Compute for Apache Flink 開発コンソールで、SelectDB カスタムコネクタをアップロードします。詳細については、「カスタムコネクタの管理」をご参照ください。

  3. SQL ジョブで SelectDB カスタムコネクタを使用します。connector パラメーターの値は doris に固定されています。

構文

説明

コネクタをソーステーブルとして使用するには、クラスター直接接続を有効にして Arrow Flight 機能を使用する必要があります。

ApsaraDB for SelectDB コンソールで、[インスタンス詳細] > [ネットワーク情報] ページに移動し、[クラスターへの直接接続を有効化] をクリックします。

CREATE TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING COMMENT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username' = 'admin',
  'password' = '****'
);

WITH パラメーター

  • 一般

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    connector

    テーブルタイプ。

    String

    はい

    なし

    値は doris に固定されます。

    fenodes

    ApsaraDB for SelectDB インスタンスのアクセスアドレスと HTTP プロトコルポート。

    String

    はい

    なし

    ApsaraDB for SelectDB コンソールの [インスタンス詳細] > [ネットワーク情報] ページから [VPC アドレス] (または [パブリックアドレス]) と [HTTP プロトコルポート] を取得できます。

    例:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080

    jdbc-url

    Java Database Connectivity (JDBC) 接続情報。

    String

    いいえ

    なし

    ApsaraDB for SelectDB コンソールの [インスタンス詳細] > [ネットワーク情報] ページから [VPC アドレス] (または [パブリックアドレス]) と [MySQL プロトコルポート] を取得できます。

    例:jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030

    table.identifier

    データベーステーブル名。

    String

    はい

    なし

    例:db.tbl

    username

    ユーザー名。

    String

    はい

    なし

    パスワードを忘れた場合は、ApsaraDB for SelectDB コンソールの [インスタンス詳細] ページの右上隅でリセットできます。

    password

    パスワード。

    String

    はい

    なし

    doris.request.retries

    リクエストを送信する際のリトライ回数。

    Integer

    いいえ

    3

    なし。

    doris.request.connect.timeout

    リクエストを送信する際の接続タイムアウト。

    Duration

    いいえ

    30s

    なし。

    doris.request.read.timeout

    リクエストを送信する際の読み取りタイムアウト。

    Duration

    いいえ

    30s

    なし。

  • ソーステーブル固有

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    doris.request.query.timeout

    クエリのタイムアウト。デフォルト値は 6 時間です。

    Duration

    いいえ

    21600s

    値は doris に固定されます。

    doris.request.tablet.size

    パーティションに対応するタブレットの数。

    Integer

    いいえ

    1

    値を小さくすると、より多くのパーティションが生成されます。これにより、Flink 側の並列処理の次数は増加しますが、データベースへの負荷も増加します。

    doris.batch.size

    一度に BE から読み取る最大行数。

    Integer

    いいえ

    4064

    この値を増やすと、Flink とデータベース間で確立される接続の数を減らすことができます。これにより、ネットワーク遅延による余分な時間的オーバーヘッドが削減されます。

    doris.exec.mem.limit

    単一クエリのメモリ制限。

    Integer

    いいえ

    8192mb

    デフォルト値は 8 GB です。単位はバイトです。

    source.use-flight-sql

    読み取りに Arrow Flight SQL を使用するかどうかを指定します。

    Boolean

    いいえ

    false

    設定は不要です。ApsaraDB for SelectDB コンソールの [インスタンス詳細] > [ネットワーク情報] ページに移動し、[クラスターへの直接接続を有効化] をクリックするだけです。

    source.flight-sql-port

    Arrow Flight SQL で読み取る際の FE の arrow_flight_sql_port。

    Integer

    いいえ

    -

    なし。

  • シンクテーブル固有

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    sink.label-prefix

    Stream Load インポートに使用されるラベルのプレフィックス。

    String

    いいえ

    --

    Flink の 1回限りのセマンティクスを保証するために、プレフィックスは複数のジョブ間でグローバルに一意である必要があります。同じラベルは、重複書き込みを防ぐために一度しかインポートできません。

    sink.properties.*

    Stream Load のインポートパラメーター。

    String

    いいえ

    --

    CSV フォーマット設定

    'sink.properties.column_separator' = ',', -- カンマをデリミタとして使用
    -- データにカンマが含まれる可能性がある場合は、次のような印刷不可文字を使用します。
    -- 'sink.properties.column_separator' = '\x01'

    JSON フォーマット設定

    'sink.properties.format' = 'json',
    'sink.properties.read_json_by_line' = 'true' -- または strip_outer_array を使用

    sink.enable-delete

    削除を有効にするかどうかを指定します。このオプションは、Doris テーブルでバッチ削除が有効になっている必要があります。

    Boolean

    いいえ

    true

    Unique モデルのみがサポートされています。

    sink.enable-2pc

    2フェーズコミットプロトコル (2PC) を有効にするかどうかを指定します。

    Boolean

    いいえ

    true

    1 回限りのセマンティクスを保証します。2PC の詳細については、「明示的なトランザクション操作」をご参照ください。

    sink.buffer-size

    データ書き込みキャッシュバッファーのサイズ。

    Integer

    いいえ

    1 MB

    単位はバイトです。このパラメーターは変更せず、デフォルト設定を使用することを推奨します。

    sink.buffer-count

    データ書き込みキャッシュバッファーの数。

    Integer

    いいえ

    3

    このパラメーターは変更せず、デフォルト設定を使用することを推奨します。

    sink.max-retries

    コミットが失敗した後の最大リトライ回数。

    Integer

    いいえ

    3

    なし。

    sink.enable.batch-mode

    書き込みにバッチモードを使用するかどうかを指定します。

    Boolean

    いいえ

    false

    有効にすると、書き込みのタイミングはチェックポイントに依存しなくなります。これは sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval パラメーターによって制御されます。

    有効にすると、1回限りのセマンティクスは保証されませんが、Uniq モデルを使用してべき等性を実現できます。

    sink.flush.queue-size

    バッチモードでのキャッシュキューのサイズ。

    Integer

    いいえ

    2

    なし。

    sink.buffer-flush.max-rows

    バッチモードで一度に書き込むデータ行の最大数。

    Integer

    いいえ

    500000

    なし。

    sink.buffer-flush.max-bytes

    バッチモードで一度に書き込む最大バイト数。

    Integer

    いいえ

    100 MB

    単位はバイトです。

    sink.buffer-flush.interval

    バッチモードでキャッシュを非同期にフラッシュする間隔。

    String

    いいえ

    10s

    単位はミリ秒です。

    sink.ignore.update-before

    update-before イベントを無視するかどうかを指定します。

    Boolean

    いいえ

    true

    なし。

  • ディメンションテーブル固有

    パラメーター

    説明

    データ型

    必須

    デフォルト値

    備考

    lookup.cache.max-rows

    ルックアップキャッシュの最大行数。

    Integer

    いいえ

    -1

    -1 は、デフォルトでキャッシュが無効になっていることを意味します。

    lookup.cache.ttl

    ルックアップキャッシュの最大生存時間 (TTL)。

    String

    いいえ

    10s

    単位はミリ秒です。

    lookup.max-retries

    ルックアップクエリが失敗した後のリトライ回数。

    Integer

    いいえ

    1

    なし。

    lookup.jdbc.async

    非同期ルックアップを有効にするかどうかを指定します。

    Boolean

    いいえ

    false

    なし。

    lookup.jdbc.read.batch.size

    非同期ルックアップにおける各クエリの最大バッチサイズ。

    Integer

    いいえ

    128

    なし。

    lookup.jdbc.read.batch.queue-size

    非同期ルックアップ中の中間バッファーキューのサイズ。

    Integer

    いいえ

    256

    なし。

    lookup.jdbc.read.thread-size

    各タスクの JDBC ルックアップスレッドの数。

    Integer

    いいえ

    3

    なし。

使用例

ソーステーブル

CREATE TEMPORARY TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING COMMENT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username' = 'admin',
  'password' = '****'
);

シンクテーブル

CREATE TEMPORARY TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING COMMENT
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username' = 'admin',
  'password' = '****',
--  'sink.label-prefix' = 'flink_orders' -- 同じラベルは、重複書き込みを防ぐために一度しかインポートできません。
);

ディメンションテーブル

CREATE TEMPORARY TABLE fact_table (
  `id` BIGINT,
  `name` STRING,
  `city` STRING,
  `process_time` as proctime()
) WITH (
  'connector' = 'kafka',
  ...
);

create TEMPORARY table dim_city(
  `city` STRING,
  `level` INT ,
  `province` STRING,
  `country` STRING
) WITH (
  'connector' = 'doris',
  'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'jdbc-url' = 'jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030',
  'table.identifier' = 'dim.dim_city',
  'username' = 'admin',
  'password' = '****'
);

SELECT a.id, a.name, a.city, c.province, c.country,c.level 
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city

データインジェスチョン

SelectDB コネクタをシンクとして使用し、YAML ジョブでデータを書き込んでデータインジェストを行います。

構文

source:
   type: xxx

sink:
   type: doris
   name: Doris Sink
   fenodes: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080
   username: root
   password: ""

設定項目

パラメーター

説明

必須

デフォルト値

データ型

備考

type

シンクのタイプ。

はい

(なし)

文字列

値は doris に固定されます。

name

シンク名。

いいえ

(なし)

文字列

なし。

fenodes

ApsaraDB for SelectDB インスタンスのアクセスアドレスと HTTP プロトコルポート。

はい

(なし)

文字列

ApsaraDB for SelectDB コンソールの [インスタンス詳細] > [ネットワーク情報] ページから [VPC アドレス] (または [パブリックアドレス]) と [HTTP プロトコルポート] を取得できます。

例:selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080

jdbc-url

ApsaraDB for SelectDB インスタンスの JDBC 接続情報。

いいえ

(なし)

文字列

ApsaraDB for SelectDB コンソールの [インスタンス詳細] > [ネットワーク情報] ページから [VPC アドレス] (または [パブリックアドレス]) と [MySQL プロトコルポート] を取得できます。

例:jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030

username

ApsaraDB for SelectDB インスタンスのデータベースユーザー名。

はい

(なし)

文字列

パスワードを忘れた場合は、ApsaraDB for SelectDB コンソールの [インスタンス詳細] ページの右上隅でリセットできます。

password

ApsaraDB for SelectDB インスタンスのデータベースユーザー名に対応するパスワード。

はい

(なし)

文字列

sink.enable.batch-mode

SelectDB への書き込みにバッチモードを使用するかどうかを指定します。

いいえ

true

ブール値

有効にすると、書き込みのタイミングはチェックポイントに依存しなくなります。これは sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval パラメーターによって制御されます。

有効にすると、1回限りのセマンティクスは保証されませんが、Uniq モデルを使用してべき等性を実現できます。

sink.flush.queue-size

バッチ処理モードでのキャッシュキューのサイズ。

いいえ

2

整数

バッチ書き込み用のキューサイズ。

sink.buffer-flush.max-rows

バッチ処理モードで一度に書き込むデータ行の最大数。

いいえ

500000

整数

なし。

sink.buffer-flush.max-bytes

バッチ処理モードで一度に書き込む最大バイト数。

いいえ

100 MB

整数

なし。

sink.buffer-flush.interval

バッチ処理モードでキャッシュを非同期にフラッシュする間隔。最小値は 1s です。

いいえ

10s

文字列

なし。

sink.properties.*

Stream Load のインポートパラメーター。

いいえ

(なし)

文字列

CSV フォーマットの設定

sink.properties.column_separator: ',' # カンマをデリミタとして使用
# データにカンマが含まれる可能性がある場合は、次のような印刷不可文字を使用します。
# sink.properties.column_separator: '\x01'

JSON フォーマットの設定

sink.properties.format: 'json'
sink.properties.read_json_by_line: 'true' # または strip_outer_array を使用

型マッピング

Flink CDC 型

SelectDB 型

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

DECIMAL

DECIMAL

FLOAT

FLOAT

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

DATE

DATE

TIMESTAMP [(p)]

DATETIME [(p)]

TIMESTAMP_LTZ [(p)]

DATETIME [(p)]

CHAR(n)

CHAR(n*3)

説明

Doris では、文字列は UTF-8 エンコーディングで保存されます。英字は 1 バイト、漢字は 3 バイトを占有するため、長さは 3 倍になります。CHAR 型の最大長は 255 です。この制限を超えると、型は自動的に VARCHAR に変換されます。

VARCHAR(n)

VARCHAR(n*3)

説明

上記と同様です。長さは 3 倍になります。VARCHAR 型の最大長は 65533 です。この制限を超えると、型は自動的に STRING に変換されます。

BINARY(n)

STRING

VARBINARY(N)

STRING

STRING

STRING