SelectDB コネクタは、Realtime Compute for Apache Flink と ApsaraDB for SelectDB を統合します。ApsaraDB for SelectDB は、Alibaba Cloud 上で提供される、Apache Doris と互換性のあるフルマネージドのリアルタイムデータウェアハウスです。このコネクタを使用することで、SelectDB のデータの読み取り、書き込み、ルックアップを行うリアルタイムパイプラインを構築したり、YAML ベースのデータインジェストジョブでデータベース全体の同期を実行したりできます。
サポートされる機能:
| カテゴリ | 詳細 |
|---|---|
| テーブルタイプ | ソーステーブル、結果テーブル、ディメンションテーブル、データインジェストシンク |
| 実行モード | ストリームおよびバッチ |
| データフォーマット | JSON および CSV |
| API タイプ | DataStream および SQL |
| 更新/削除のサポート | はい |
| 監視メトリクス | なし |
主な特徴:
データベース全体のデータ同期
2 フェーズコミット (2PC) による 1 回限りのセマンティクス — レコードの重複や損失なし
Apache Doris 1.0 以降と互換
前提条件
開始する前に、以下が準備できていることを確認してください:
Ververica Runtime (VVR) 8.0.10 以降を搭載した Realtime Compute for Apache Flink
ApsaraDB for SelectDB インスタンス。詳細については、「インスタンスの作成」をご参照ください。
インスタンスに設定された IP アドレスホワイトリスト。詳細については、「ホワイトリストの設定」をご参照ください。
コネクタの設定
SelectDB コネクタは VVR 11.1 以降に組み込まれているため、手動でのインストールは不要です。
VVR 8.0.10 から 11.0 までの場合は、コネクタを手動でインストールしてください:
Maven Central から JAR パッケージをダウンロードします (Flink バージョン 1.15–1.17)。
JAR を Realtime Compute for Apache Flink の開発コンソールにアップロードします。詳細については、「カスタムコネクタの管理」をご参照ください。
SQL ジョブで
'connector' = 'doris'を使用してコネクタを参照します。
SQL
構文
ソース、結果、ディメンションの 3 つのテーブルタイプはすべて同じ DDL 構文を共有します。テーブルのロールは、含めるパラメーターによって指定します。
SelectDB をソーステーブルとして使用するには、まずクラスター直接接続を有効にする必要があります。ApsaraDB for SelectDB コンソールで、[インスタンス詳細] > [ネットワーク情報] に移動し、 [クラスター直接接続を有効にする] をクリックします。これにより、高スループットの並列読み取りのための Arrow Flight SQL プロトコルが有効になります。
CREATE TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****'
);パラメーター
全般
| パラメーター | 必須 | デフォルト | 説明 |
|---|---|---|---|
connector | はい | — | doris に固定。 |
fenodes | はい | — | SelectDB インスタンスの HTTP エンドポイント: <VPC アドレスまたはパブリックアドレス>:<HTTP プロトコルポート>。両方とも SelectDB コンソールの [インスタンス詳細] > [ネットワーク情報] から取得します。例: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080。 |
jdbc-url | いいえ | — | ディメンションテーブルのルックアップおよびメタデータクエリ用の Java Database Connectivity (JDBC) 接続文字列: jdbc:mysql://<VPC アドレスまたはパブリックアドレス>:<MySQL プロトコルポート>。例: jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030。 |
table.identifier | はい | — | ターゲットテーブルを <database>.<table> 形式で指定します。例: db.tbl。 |
username | はい | — | データベースのユーザー名。必要に応じて、[インスタンス詳細] ページの右上隅からパスワードをリセットしてください。 |
password | はい | — | データベースユーザー名のパスワード。 |
doris.request.retries | いいえ | 3 | 失敗したリクエストの再試行回数。 |
doris.request.connect.timeout | いいえ | 30s | 接続タイムアウト。 |
doris.request.read.timeout | いいえ | 30s | 読み取りタイムアウト。 |
ソーステーブル
| パラメーター | 必須 | デフォルト | 説明 |
|---|---|---|---|
doris.request.query.timeout | いいえ | 21600s | クエリタイムアウト (デフォルトは 6 時間)。 |
doris.request.tablet.size | いいえ | 1 | パーティションごとのタブレット数。値を小さくすると Flink の並列度が向上しますが、データベースへの負荷が増加します。 |
doris.batch.size | いいえ | 4064 | リクエストごとにバックエンド (BE) ノードから読み取る最大行数。値を大きくすると、接続オーバーヘッドとネットワーク遅延を削減できます。 |
doris.exec.mem.limit | いいえ | 8192mb | クエリごとのメモリ制限 (バイト単位、デフォルトは 8 GB)。 |
source.use-flight-sql | いいえ | false | 設定は不要です — SelectDB コンソールで [クラスター直接接続] を有効にすると、Arrow Flight SQL が自動的に有効になります。 |
source.flight-sql-port | いいえ | — | フロントエンド (FE) ノードの Arrow Flight SQL ポート (arrow_flight_sql_port)。 |
結果テーブル
書き込みモードは、配信保証とフラッシュ動作に影響します。一貫性の要件に基づいて選択してください:
| ストリーミング書き込み | バッチ書き込み | |
|---|---|---|
| トリガー条件 | Flink のチェックポイント間隔に従う | データ量または時間しきい値による定期的なフラッシュ |
| 配信保証 | 1 回限り (2PC 経由) | 1 回以上。Unique モデルでべき等性を実現 |
| レイテンシー | チェックポイント間隔によって制限される | 柔軟で、チェックポイントから独立 |
| フォールトトレランス | 完全な Flink 状態回復 | Unique モデルの重複排除に依存 |
| パラメーター | 必須 | デフォルト | 説明 |
|---|---|---|---|
sink.label-prefix | いいえ | — | Stream Load インポートのラベルプレフィックス。すべてのジョブでグローバルに一意である必要があります — 同じラベルは 1 回しかコミットできません。ジョブの再起動をまたいで 1 回限りのセマンティクスを保証するために必要です。 |
sink.properties.* | いいえ | — | SelectDB Stream Load API に直接渡される Stream Load インポートパラメーター。以下の例をご参照ください。 |
sink.enable-delete | いいえ | true | DELETE 操作を伝播します。Doris テーブルでバッチ削除が有効になっている必要があり、Unique モデルでのみ機能します。 |
sink.enable-2pc | いいえ | true | 2 フェーズコミット (2PC) を有効にして、1 回限りのセマンティクスを実現します。詳細については、「明示的なトランザクション操作」をご参照ください。 |
sink.buffer-size | いいえ | 1 MB | 書き込みキャッシュバッファーのサイズ (バイト単位)。デフォルト値のままにしてください。 |
sink.buffer-count | いいえ | 3 | 書き込みキャッシュバッファーの数。デフォルト値のままにしてください。 |
sink.max-retries | いいえ | 3 | コミット失敗後の最大再試行回数。 |
sink.enable.batch-mode | いいえ | false | バッチ書き込みモードに切り替えます。フラッシュはチェックポイントではなく、以下の 3 つの sink.buffer-flush.* パラメーターによって制御されます。1 回限りのセマンティクスは保証されません。べき等性を実現するには Unique モデルを使用してください。 |
sink.flush.queue-size | いいえ | 2 | バッチモードでのキャッシュキューのサイズ。 |
sink.buffer-flush.max-rows | いいえ | 500000 | バッチモードでのフラッシュごとの最大行数。 |
sink.buffer-flush.max-bytes | いいえ | 100 MB | バッチモードでのフラッシュごとの最大バイト数。 |
sink.buffer-flush.interval | いいえ | 10s | バッチモードでのフラッシュ間隔。 |
sink.ignore.update-before | いいえ | true | Flink CDC からの update-before イベントを無視します。 |
sink.properties.* の例:
CSV フォーマット:
'sink.properties.column_separator' = ','
-- 値にカンマが含まれる可能性がある場合は、印刷不可能な区切り文字を使用します:
-- 'sink.properties.column_separator' = '\x01'JSON フォーマット:
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true'
-- または: 'sink.properties.strip_outer_array' = 'true'ディメンションテーブル
| パラメーター | 必須 | デフォルト | 説明 |
|---|---|---|---|
lookup.cache.max-rows | いいえ | -1 | ルックアップキャッシュの最大行数。-1 はキャッシュを無効にします。 |
lookup.cache.ttl | いいえ | 10s | キャッシュエントリの生存時間 (TTL)。 |
lookup.max-retries | いいえ | 1 | ルックアップクエリが失敗した後の再試行回数。 |
lookup.jdbc.async | いいえ | false | 非同期ルックアップを有効にします。 |
lookup.jdbc.read.batch.size | いいえ | 128 | 非同期ルックアップモードでのクエリごとの最大バッチサイズ。 |
lookup.jdbc.read.batch.queue-size | いいえ | 256 | 非同期ルックアップモードでの中間バッファーキューのサイズ。 |
lookup.jdbc.read.thread-size | いいえ | 3 | 非同期ルックアップモードでのタスクごとの JDBC ルックアップスレッド数。 |
例
ソーステーブル
CREATE TEMPORARY TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****'
);結果テーブル
CREATE TEMPORARY TABLE selectdb_sink (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****',
'sink.label-prefix' = 'flink_orders' -- ジョブ間でグローバルに一意である必要があります
);ディメンションテーブル
SelectDB は、ストリーミングファクトテーブルに対して結合されるルックアップディメンションテーブルとして機能します。
-- Kafka からのファクトテーブル
CREATE TEMPORARY TABLE fact_table (
`id` BIGINT,
`name` STRING,
`city` STRING,
`process_time` AS proctime()
) WITH (
'connector' = 'kafka',
...
);
-- SelectDB からのディメンションテーブル
CREATE TEMPORARY TABLE dim_city (
`city` STRING,
`level` INT,
`province` STRING,
`country` STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'jdbc-url' = 'jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030',
'table.identifier' = 'dim.dim_city',
'username' = 'admin',
'password' = '****'
);
-- テンポラル結合
SELECT a.id, a.name, a.city, c.province, c.country, c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.city;データインジェスト
YAML ベースのデータインジェストジョブで SelectDB コネクタを sink として使用し、データベース全体の同期を行います。
構文
source:
type: <source-type>
sink:
type: doris
name: Doris Sink
fenodes: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080
username: root
password: ""パラメーター
| パラメーター | 必須 | デフォルト | 説明 |
|---|---|---|---|
type | はい | — | doris に固定。 |
name | いいえ | — | sink の説明的な名前。 |
fenodes | はい | — | HTTP エンドポイント: <VPC アドレスまたはパブリックアドレス>:<HTTP プロトコルポート>。両方とも SelectDB コンソールの [インスタンス詳細] > [ネットワーク情報] から取得します。例: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080。 |
jdbc-url | いいえ | — | JDBC 接続文字列。例: jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030。 |
username | はい | — | データベースのユーザー名。 |
password | はい | — | データベースユーザー名のパスワード。 |
sink.enable.batch-mode | いいえ | true | データインジェストジョブでは、バッチモードがデフォルトでオンになっています。フラッシュは 3 つの sink.buffer-flush.* パラメーターによって制御されます。1 回限りのセマンティクスは保証されません。べき等性を実現するには Unique モデルを使用してください。 |
sink.flush.queue-size | いいえ | 2 | キャッシュキューのサイズ。 |
sink.buffer-flush.max-rows | いいえ | 500000 | フラッシュごとの最大行数。 |
sink.buffer-flush.max-bytes | いいえ | 100 MB | フラッシュごとの最大バイト数。 |
sink.buffer-flush.interval | いいえ | 10s | フラッシュ間隔。最小: 1s。 |
sink.properties.* | いいえ | — | Stream Load インポートパラメーター。 |
sink.properties.* の例:
CSV フォーマット:
sink.properties.column_separator: ','
# 値にカンマが含まれる可能性がある場合は、印刷不可能な区切り文字を使用します:
# sink.properties.column_separator: '\x01'JSON フォーマット:
sink.properties.format: 'json'
sink.properties.read_json_by_line: 'true'型マッピング
Flink から SelectDB へ
| Flink CDC 型 | SelectDB 型 | 備考 |
|---|---|---|
TINYINT | TINYINT | |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
DECIMAL | DECIMAL | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP[(p)] | DATETIME[(p)] | |
TIMESTAMP_LTZ[(p)] | DATETIME[(p)] | |
CHAR(n) | CHAR(n*3) | SelectDB は文字列を UTF-8 で格納します。英字は 1 バイト、漢字は 3 バイトを占有します。最大 CHAR 長は 255 です。これより長い値は自動的に VARCHAR に変換されます。 |
VARCHAR(n) | VARCHAR(n*3) | 同じ UTF-8 の乗数が適用されます。最大 VARCHAR 長は 65533 です。これより長い値は自動的に STRING に変換されます。 |
BINARY(n) | STRING | |
VARBINARY(n) | STRING | |
STRING | STRING |