すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:SelectDB

最終更新日:Mar 26, 2026

SelectDB コネクタは、Realtime Compute for Apache Flink と ApsaraDB for SelectDB を統合します。ApsaraDB for SelectDB は、Alibaba Cloud 上で提供される、Apache Doris と互換性のあるフルマネージドのリアルタイムデータウェアハウスです。このコネクタを使用することで、SelectDB のデータの読み取り、書き込み、ルックアップを行うリアルタイムパイプラインを構築したり、YAML ベースのデータインジェストジョブでデータベース全体の同期を実行したりできます。

サポートされる機能:

カテゴリ詳細
テーブルタイプソーステーブル、結果テーブル、ディメンションテーブル、データインジェストシンク
実行モードストリームおよびバッチ
データフォーマットJSON および CSV
API タイプDataStream および SQL
更新/削除のサポートはい
監視メトリクスなし

主な特徴:

  • データベース全体のデータ同期

  • 2 フェーズコミット (2PC) による 1 回限りのセマンティクス — レコードの重複や損失なし

  • Apache Doris 1.0 以降と互換

前提条件

開始する前に、以下が準備できていることを確認してください:

  • Ververica Runtime (VVR) 8.0.10 以降を搭載した Realtime Compute for Apache Flink

  • ApsaraDB for SelectDB インスタンス。詳細については、「インスタンスの作成」をご参照ください。

  • インスタンスに設定された IP アドレスホワイトリスト。詳細については、「ホワイトリストの設定」をご参照ください。

コネクタの設定

SelectDB コネクタは VVR 11.1 以降に組み込まれているため、手動でのインストールは不要です。

VVR 8.0.10 から 11.0 までの場合は、コネクタを手動でインストールしてください:

  1. Maven Central から JAR パッケージをダウンロードします (Flink バージョン 1.15–1.17)。

  2. JAR を Realtime Compute for Apache Flink の開発コンソールにアップロードします。詳細については、「カスタムコネクタの管理」をご参照ください。

  3. SQL ジョブで 'connector' = 'doris' を使用してコネクタを参照します。

SQL

構文

ソース、結果、ディメンションの 3 つのテーブルタイプはすべて同じ DDL 構文を共有します。テーブルのロールは、含めるパラメーターによって指定します。

SelectDB をソーステーブルとして使用するには、まずクラスター直接接続を有効にする必要があります。ApsaraDB for SelectDB コンソールで、[インスタンス詳細] > [ネットワーク情報] に移動し、 [クラスター直接接続を有効にする] をクリックします。これにより、高スループットの並列読み取りのための Arrow Flight SQL プロトコルが有効になります。
CREATE TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING
) WITH (
  'connector'        = 'doris',
  'fenodes'          = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username'         = 'admin',
  'password'         = '****'
);

パラメーター

全般

パラメーター必須デフォルト説明
connectorはいdoris に固定。
fenodesはいSelectDB インスタンスの HTTP エンドポイント: <VPC アドレスまたはパブリックアドレス>:<HTTP プロトコルポート>。両方とも SelectDB コンソールの [インスタンス詳細] > [ネットワーク情報] から取得します。例: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080
jdbc-urlいいえディメンションテーブルのルックアップおよびメタデータクエリ用の Java Database Connectivity (JDBC) 接続文字列: jdbc:mysql://<VPC アドレスまたはパブリックアドレス>:<MySQL プロトコルポート>。例: jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030
table.identifierはいターゲットテーブルを <database>.<table> 形式で指定します。例: db.tbl
usernameはいデータベースのユーザー名。必要に応じて、[インスタンス詳細] ページの右上隅からパスワードをリセットしてください。
passwordはいデータベースユーザー名のパスワード。
doris.request.retriesいいえ3失敗したリクエストの再試行回数。
doris.request.connect.timeoutいいえ30s接続タイムアウト。
doris.request.read.timeoutいいえ30s読み取りタイムアウト。

ソーステーブル

パラメーター必須デフォルト説明
doris.request.query.timeoutいいえ21600sクエリタイムアウト (デフォルトは 6 時間)。
doris.request.tablet.sizeいいえ1パーティションごとのタブレット数。値を小さくすると Flink の並列度が向上しますが、データベースへの負荷が増加します。
doris.batch.sizeいいえ4064リクエストごとにバックエンド (BE) ノードから読み取る最大行数。値を大きくすると、接続オーバーヘッドとネットワーク遅延を削減できます。
doris.exec.mem.limitいいえ8192mbクエリごとのメモリ制限 (バイト単位、デフォルトは 8 GB)。
source.use-flight-sqlいいえfalse設定は不要です — SelectDB コンソールで [クラスター直接接続] を有効にすると、Arrow Flight SQL が自動的に有効になります。
source.flight-sql-portいいえフロントエンド (FE) ノードの Arrow Flight SQL ポート (arrow_flight_sql_port)。

結果テーブル

書き込みモードは、配信保証とフラッシュ動作に影響します。一貫性の要件に基づいて選択してください:

ストリーミング書き込みバッチ書き込み
トリガー条件Flink のチェックポイント間隔に従うデータ量または時間しきい値による定期的なフラッシュ
配信保証1 回限り (2PC 経由)1 回以上。Unique モデルでべき等性を実現
レイテンシーチェックポイント間隔によって制限される柔軟で、チェックポイントから独立
フォールトトレランス完全な Flink 状態回復Unique モデルの重複排除に依存
パラメーター必須デフォルト説明
sink.label-prefixいいえStream Load インポートのラベルプレフィックス。すべてのジョブでグローバルに一意である必要があります — 同じラベルは 1 回しかコミットできません。ジョブの再起動をまたいで 1 回限りのセマンティクスを保証するために必要です。
sink.properties.*いいえSelectDB Stream Load API に直接渡される Stream Load インポートパラメーター。以下の例をご参照ください。
sink.enable-deleteいいえtrueDELETE 操作を伝播します。Doris テーブルでバッチ削除が有効になっている必要があり、Unique モデルでのみ機能します。
sink.enable-2pcいいえtrue2 フェーズコミット (2PC) を有効にして、1 回限りのセマンティクスを実現します。詳細については、「明示的なトランザクション操作」をご参照ください。
sink.buffer-sizeいいえ1 MB書き込みキャッシュバッファーのサイズ (バイト単位)。デフォルト値のままにしてください。
sink.buffer-countいいえ3書き込みキャッシュバッファーの数。デフォルト値のままにしてください。
sink.max-retriesいいえ3コミット失敗後の最大再試行回数。
sink.enable.batch-modeいいえfalseバッチ書き込みモードに切り替えます。フラッシュはチェックポイントではなく、以下の 3 つの sink.buffer-flush.* パラメーターによって制御されます。1 回限りのセマンティクスは保証されません。べき等性を実現するには Unique モデルを使用してください。
sink.flush.queue-sizeいいえ2バッチモードでのキャッシュキューのサイズ。
sink.buffer-flush.max-rowsいいえ500000バッチモードでのフラッシュごとの最大行数。
sink.buffer-flush.max-bytesいいえ100 MBバッチモードでのフラッシュごとの最大バイト数。
sink.buffer-flush.intervalいいえ10sバッチモードでのフラッシュ間隔。
sink.ignore.update-beforeいいえtrueFlink CDC からの update-before イベントを無視します。

sink.properties.* の例:

CSV フォーマット:

'sink.properties.column_separator' = ','
-- 値にカンマが含まれる可能性がある場合は、印刷不可能な区切り文字を使用します:
-- 'sink.properties.column_separator' = '\x01'

JSON フォーマット:

'sink.properties.format'            = 'json',
'sink.properties.read_json_by_line' = 'true'
-- または: 'sink.properties.strip_outer_array' = 'true'

ディメンションテーブル

パラメーター必須デフォルト説明
lookup.cache.max-rowsいいえ-1ルックアップキャッシュの最大行数。-1 はキャッシュを無効にします。
lookup.cache.ttlいいえ10sキャッシュエントリの生存時間 (TTL)。
lookup.max-retriesいいえ1ルックアップクエリが失敗した後の再試行回数。
lookup.jdbc.asyncいいえfalse非同期ルックアップを有効にします。
lookup.jdbc.read.batch.sizeいいえ128非同期ルックアップモードでのクエリごとの最大バッチサイズ。
lookup.jdbc.read.batch.queue-sizeいいえ256非同期ルックアップモードでの中間バッファーキューのサイズ。
lookup.jdbc.read.thread-sizeいいえ3非同期ルックアップモードでのタスクごとの JDBC ルックアップスレッド数。

ソーステーブル

CREATE TEMPORARY TABLE selectdb_source (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING
) WITH (
  'connector'        = 'doris',
  'fenodes'          = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username'         = 'admin',
  'password'         = '****'
);

結果テーブル

CREATE TEMPORARY TABLE selectdb_sink (
  order_id      BIGINT,
  user_id       BIGINT,
  total_amount  DECIMAL(10, 2),
  order_status  TINYINT,
  create_time   TIMESTAMP(3),
  product_name  STRING
) WITH (
  'connector'        = 'doris',
  'fenodes'          = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'table.identifier' = 'shop_db.orders',
  'username'         = 'admin',
  'password'         = '****',
  'sink.label-prefix' = 'flink_orders'  -- ジョブ間でグローバルに一意である必要があります
);

ディメンションテーブル

SelectDB は、ストリーミングファクトテーブルに対して結合されるルックアップディメンションテーブルとして機能します。

-- Kafka からのファクトテーブル
CREATE TEMPORARY TABLE fact_table (
  `id`           BIGINT,
  `name`         STRING,
  `city`         STRING,
  `process_time` AS proctime()
) WITH (
  'connector' = 'kafka',
  ...
);

-- SelectDB からのディメンションテーブル
CREATE TEMPORARY TABLE dim_city (
  `city`     STRING,
  `level`    INT,
  `province` STRING,
  `country`  STRING
) WITH (
  'connector'        = 'doris',
  'fenodes'          = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
  'jdbc-url'         = 'jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030',
  'table.identifier' = 'dim.dim_city',
  'username'         = 'admin',
  'password'         = '****'
);

-- テンポラル結合
SELECT a.id, a.name, a.city, c.province, c.country, c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city;

データインジェスト

YAML ベースのデータインジェストジョブで SelectDB コネクタを sink として使用し、データベース全体の同期を行います。

構文

source:
  type: <source-type>

sink:
  type: doris
  name: Doris Sink
  fenodes: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080
  username: root
  password: ""

パラメーター

パラメーター必須デフォルト説明
typeはいdoris に固定。
nameいいえsink の説明的な名前。
fenodesはいHTTP エンドポイント: <VPC アドレスまたはパブリックアドレス>:<HTTP プロトコルポート>。両方とも SelectDB コンソールの [インスタンス詳細] > [ネットワーク情報] から取得します。例: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080
jdbc-urlいいえJDBC 接続文字列。例: jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030
usernameはいデータベースのユーザー名。
passwordはいデータベースユーザー名のパスワード。
sink.enable.batch-modeいいえtrueデータインジェストジョブでは、バッチモードがデフォルトでオンになっています。フラッシュは 3 つの sink.buffer-flush.* パラメーターによって制御されます。1 回限りのセマンティクスは保証されません。べき等性を実現するには Unique モデルを使用してください。
sink.flush.queue-sizeいいえ2キャッシュキューのサイズ。
sink.buffer-flush.max-rowsいいえ500000フラッシュごとの最大行数。
sink.buffer-flush.max-bytesいいえ100 MBフラッシュごとの最大バイト数。
sink.buffer-flush.intervalいいえ10sフラッシュ間隔。最小: 1s
sink.properties.*いいえStream Load インポートパラメーター。

sink.properties.* の例:

CSV フォーマット:

sink.properties.column_separator: ','
# 値にカンマが含まれる可能性がある場合は、印刷不可能な区切り文字を使用します:
# sink.properties.column_separator: '\x01'

JSON フォーマット:

sink.properties.format: 'json'
sink.properties.read_json_by_line: 'true'

型マッピング

Flink から SelectDB へ

Flink CDC 型SelectDB 型備考
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
DECIMALDECIMAL
FLOATFLOAT
DOUBLEDOUBLE
BOOLEANBOOLEAN
DATEDATE
TIMESTAMP[(p)]DATETIME[(p)]
TIMESTAMP_LTZ[(p)]DATETIME[(p)]
CHAR(n)CHAR(n*3)SelectDB は文字列を UTF-8 で格納します。英字は 1 バイト、漢字は 3 バイトを占有します。最大 CHAR 長は 255 です。これより長い値は自動的に VARCHAR に変換されます。
VARCHAR(n)VARCHAR(n*3)同じ UTF-8 の乗数が適用されます。最大 VARCHAR 長は 65533 です。これより長い値は自動的に STRING に変換されます。
BINARY(n)STRING
VARBINARY(n)STRING
STRINGSTRING

次のステップ