このトピックでは、SelectDB コネクタの使用方法について説明します。
背景情報
ApsaraDB for SelectDB は、次世代のリアルタイムデータウェアハウスサービスです。これは Alibaba Cloud 上のフルマネージドサービスであり、Apache Doris と 100% 互換性があります。ApsaraDB for SelectDB を簡単に購入して、大量のデータ分析のニーズを満たすことができます。メリットとシナリオの詳細については、「ApsaraDB for SelectDB とは」をご参照ください。
カスタム SelectDB コネクタは、次の機能をサポートしています:
カテゴリ | 詳細 |
サポートされているタイプ | ソーステーブル、結果テーブル、ディメンションテーブル、およびデータインジェストシンク |
実行モード | ストリームおよびバッチ |
データ形式 | JSON および CSV |
特定の監視メトリック | なし |
API タイプ | DataStream および SQL |
更新/削除のサポート | はい |
機能
データベース全体のデータ同期をサポートします。
SelectDB コネクタは 1回限りのセマンティクス (exactly-once semantics) を提供し、データが重複したり失われたりしないことを保証します。
このコネクタは Apache Doris 1.0 以降と互換性があります。Flink SelectDB カスタムコネクタを使用して、Apache Doris にデータを同期できます。
使用上の注意
Realtime Compute for Apache Flink の Ververica Runtime (VVR) 8.0.10 以降のバージョンのみが、SelectDB カスタムコネクタをサポートします。
SelectDB カスタムコネクタの使用中に問題が発生した場合は、ApsaraDB for SelectDB にチケットを送信してください。
ApsaraDB for SelectDB にデータを同期する前に、次の要件を満たす必要があります:
ApsaraDB for SelectDB インスタンスを作成します。詳細については、「インスタンスの作成」をご参照ください。
IP アドレスホワイトリストを設定します。詳細については、「ホワイトリストの設定」をご参照ください。
SQL
使用方法
SelectDB コネクタは VVR 11.1 以降に組み込まれています。次の手順はスキップできます。
JAR パッケージをクリックして、SelectDB カスタムコネクタ (バージョン 1.15 から 1.17) をダウンロードします。
Realtime Compute for Apache Flink 開発コンソールで、SelectDB カスタムコネクタをアップロードします。詳細については、「カスタムコネクタの管理」をご参照ください。
SQL ジョブで SelectDB カスタムコネクタを使用します。connector パラメーターの値は
dorisに固定されています。
構文
コネクタをソーステーブルとして使用するには、クラスター直接接続を有効にして Arrow Flight 機能を使用する必要があります。
ApsaraDB for SelectDB コンソールで、[インスタンス詳細] > [ネットワーク情報] ページに移動し、[クラスターへの直接接続を有効化] をクリックします。
CREATE TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING COMMENT
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****'
);WITH パラメーター
一般
パラメーター
説明
データ型
必須
デフォルト値
備考
connector
テーブルタイプ。
String
はい
なし
値は
dorisに固定されます。fenodes
ApsaraDB for SelectDB インスタンスのアクセスアドレスと HTTP プロトコルポート。
String
はい
なし
ApsaraDB for SelectDB コンソールの [インスタンス詳細] > [ネットワーク情報] ページから [VPC アドレス] (または [パブリックアドレス]) と [HTTP プロトコルポート] を取得できます。
例:
selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080。jdbc-url
Java Database Connectivity (JDBC) 接続情報。
String
いいえ
なし
ApsaraDB for SelectDB コンソールの [インスタンス詳細] > [ネットワーク情報] ページから [VPC アドレス] (または [パブリックアドレス]) と [MySQL プロトコルポート] を取得できます。
例:
jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030。table.identifier
データベーステーブル名。
String
はい
なし
例:
db.tbl。username
ユーザー名。
String
はい
なし
パスワードを忘れた場合は、ApsaraDB for SelectDB コンソールの [インスタンス詳細] ページの右上隅でリセットできます。
password
パスワード。
String
はい
なし
doris.request.retries
リクエストを送信する際のリトライ回数。
Integer
いいえ
3
なし。
doris.request.connect.timeout
リクエストを送信する際の接続タイムアウト。
Duration
いいえ
30s
なし。
doris.request.read.timeout
リクエストを送信する際の読み取りタイムアウト。
Duration
いいえ
30s
なし。
ソーステーブル固有
パラメーター
説明
データ型
必須
デフォルト値
備考
doris.request.query.timeout
クエリのタイムアウト。デフォルト値は 6 時間です。
Duration
いいえ
21600s
値は
dorisに固定されます。doris.request.tablet.size
パーティションに対応するタブレットの数。
Integer
いいえ
1
値を小さくすると、より多くのパーティションが生成されます。これにより、Flink 側の並列処理の次数は増加しますが、データベースへの負荷も増加します。
doris.batch.size
一度に BE から読み取る最大行数。
Integer
いいえ
4064
この値を増やすと、Flink とデータベース間で確立される接続の数を減らすことができます。これにより、ネットワーク遅延による余分な時間的オーバーヘッドが削減されます。
doris.exec.mem.limit
単一クエリのメモリ制限。
Integer
いいえ
8192mb
デフォルト値は 8 GB です。単位はバイトです。
source.use-flight-sql
読み取りに Arrow Flight SQL を使用するかどうかを指定します。
Boolean
いいえ
false
設定は不要です。ApsaraDB for SelectDB コンソールの [インスタンス詳細] > [ネットワーク情報] ページに移動し、[クラスターへの直接接続を有効化] をクリックするだけです。
source.flight-sql-port
Arrow Flight SQL で読み取る際の FE の arrow_flight_sql_port。
Integer
いいえ
-
なし。
シンクテーブル固有
パラメーター
説明
データ型
必須
デフォルト値
備考
sink.label-prefix
Stream Load インポートに使用されるラベルのプレフィックス。
String
いいえ
--
Flink の 1回限りのセマンティクスを保証するために、プレフィックスは複数のジョブ間でグローバルに一意である必要があります。同じラベルは、重複書き込みを防ぐために一度しかインポートできません。
sink.properties.*
Stream Load のインポートパラメーター。
String
いいえ
--
CSV フォーマット設定
'sink.properties.column_separator' = ',', -- カンマをデリミタとして使用 -- データにカンマが含まれる可能性がある場合は、次のような印刷不可文字を使用します。 -- 'sink.properties.column_separator' = '\x01'JSON フォーマット設定
'sink.properties.format' = 'json', 'sink.properties.read_json_by_line' = 'true' -- または strip_outer_array を使用sink.enable-delete
削除を有効にするかどうかを指定します。このオプションは、Doris テーブルでバッチ削除が有効になっている必要があります。
Boolean
いいえ
true
Unique モデルのみがサポートされています。
sink.enable-2pc
2フェーズコミットプロトコル (2PC) を有効にするかどうかを指定します。
Boolean
いいえ
true
1 回限りのセマンティクスを保証します。2PC の詳細については、「明示的なトランザクション操作」をご参照ください。
sink.buffer-size
データ書き込みキャッシュバッファーのサイズ。
Integer
いいえ
1 MB
単位はバイトです。このパラメーターは変更せず、デフォルト設定を使用することを推奨します。
sink.buffer-count
データ書き込みキャッシュバッファーの数。
Integer
いいえ
3
このパラメーターは変更せず、デフォルト設定を使用することを推奨します。
sink.max-retries
コミットが失敗した後の最大リトライ回数。
Integer
いいえ
3
なし。
sink.enable.batch-mode
書き込みにバッチモードを使用するかどうかを指定します。
Boolean
いいえ
false
有効にすると、書き込みのタイミングはチェックポイントに依存しなくなります。これは
sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.intervalパラメーターによって制御されます。有効にすると、1回限りのセマンティクスは保証されませんが、Uniq モデルを使用してべき等性を実現できます。
sink.flush.queue-size
バッチモードでのキャッシュキューのサイズ。
Integer
いいえ
2
なし。
sink.buffer-flush.max-rows
バッチモードで一度に書き込むデータ行の最大数。
Integer
いいえ
500000
なし。
sink.buffer-flush.max-bytes
バッチモードで一度に書き込む最大バイト数。
Integer
いいえ
100 MB
単位はバイトです。
sink.buffer-flush.interval
バッチモードでキャッシュを非同期にフラッシュする間隔。
String
いいえ
10s
単位はミリ秒です。
sink.ignore.update-before
update-before イベントを無視するかどうかを指定します。
Boolean
いいえ
true
なし。
ディメンションテーブル固有
パラメーター
説明
データ型
必須
デフォルト値
備考
lookup.cache.max-rows
ルックアップキャッシュの最大行数。
Integer
いいえ
-1
値
-1は、デフォルトでキャッシュが無効になっていることを意味します。lookup.cache.ttl
ルックアップキャッシュの最大生存時間 (TTL)。
String
いいえ
10s
単位はミリ秒です。
lookup.max-retries
ルックアップクエリが失敗した後のリトライ回数。
Integer
いいえ
1
なし。
lookup.jdbc.async
非同期ルックアップを有効にするかどうかを指定します。
Boolean
いいえ
false
なし。
lookup.jdbc.read.batch.size
非同期ルックアップにおける各クエリの最大バッチサイズ。
Integer
いいえ
128
なし。
lookup.jdbc.read.batch.queue-size
非同期ルックアップ中の中間バッファーキューのサイズ。
Integer
いいえ
256
なし。
lookup.jdbc.read.thread-size
各タスクの JDBC ルックアップスレッドの数。
Integer
いいえ
3
なし。
使用例
ソーステーブル
CREATE TEMPORARY TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING COMMENT
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****'
);シンクテーブル
CREATE TEMPORARY TABLE selectdb_source (
order_id BIGINT,
user_id BIGINT,
total_amount DECIMAL(10, 2),
order_status TINYINT,
create_time TIMESTAMP(3),
product_name STRING COMMENT
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'shop_db.orders',
'username' = 'admin',
'password' = '****',
-- 'sink.label-prefix' = 'flink_orders' -- 同じラベルは、重複書き込みを防ぐために一度しかインポートできません。
);ディメンションテーブル
CREATE TEMPORARY TABLE fact_table (
`id` BIGINT,
`name` STRING,
`city` STRING,
`process_time` as proctime()
) WITH (
'connector' = 'kafka',
...
);
create TEMPORARY table dim_city(
`city` STRING,
`level` INT ,
`province` STRING,
`country` STRING
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'jdbc-url' = 'jdbc:mysql://selectdb-cn-***.selectdbfe.rds.aliyuncs.com:9030',
'table.identifier' = 'dim.dim_city',
'username' = 'admin',
'password' = '****'
);
SELECT a.id, a.name, a.city, c.province, c.country,c.level
FROM fact_table a
LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
ON a.city = c.cityデータインジェスチョン
SelectDB コネクタをシンクとして使用し、YAML ジョブでデータを書き込んでデータインジェストを行います。
構文
source:
type: xxx
sink:
type: doris
name: Doris Sink
fenodes: selectdb-cn-****.selectdbfe.rds.aliyuncs.com:8080
username: root
password: ""
設定項目
パラメーター | 説明 | 必須 | デフォルト値 | データ型 | 備考 |
type | シンクのタイプ。 | はい | (なし) | 文字列 | 値は |
name | シンク名。 | いいえ | (なし) | 文字列 | なし。 |
fenodes | ApsaraDB for SelectDB インスタンスのアクセスアドレスと HTTP プロトコルポート。 | はい | (なし) | 文字列 | ApsaraDB for SelectDB コンソールの [インスタンス詳細] > [ネットワーク情報] ページから [VPC アドレス] (または [パブリックアドレス]) と [HTTP プロトコルポート] を取得できます。 例: |
jdbc-url | ApsaraDB for SelectDB インスタンスの JDBC 接続情報。 | いいえ | (なし) | 文字列 | ApsaraDB for SelectDB コンソールの [インスタンス詳細] > [ネットワーク情報] ページから [VPC アドレス] (または [パブリックアドレス]) と [MySQL プロトコルポート] を取得できます。 例: |
username | ApsaraDB for SelectDB インスタンスのデータベースユーザー名。 | はい | (なし) | 文字列 | パスワードを忘れた場合は、ApsaraDB for SelectDB コンソールの [インスタンス詳細] ページの右上隅でリセットできます。 |
password | ApsaraDB for SelectDB インスタンスのデータベースユーザー名に対応するパスワード。 | はい | (なし) | 文字列 | |
sink.enable.batch-mode | SelectDB への書き込みにバッチモードを使用するかどうかを指定します。 | いいえ | true | ブール値 | 有効にすると、書き込みのタイミングはチェックポイントに依存しなくなります。これは 有効にすると、1回限りのセマンティクスは保証されませんが、Uniq モデルを使用してべき等性を実現できます。 |
sink.flush.queue-size | バッチ処理モードでのキャッシュキューのサイズ。 | いいえ | 2 | 整数 | バッチ書き込み用のキューサイズ。 |
sink.buffer-flush.max-rows | バッチ処理モードで一度に書き込むデータ行の最大数。 | いいえ | 500000 | 整数 | なし。 |
sink.buffer-flush.max-bytes | バッチ処理モードで一度に書き込む最大バイト数。 | いいえ | 100 MB | 整数 | なし。 |
sink.buffer-flush.interval | バッチ処理モードでキャッシュを非同期にフラッシュする間隔。最小値は 1s です。 | いいえ | 10s | 文字列 | なし。 |
sink.properties.* | Stream Load のインポートパラメーター。 | いいえ | (なし) | 文字列 | CSV フォーマットの設定 JSON フォーマットの設定 |
型マッピング
Flink CDC 型 | SelectDB 型 |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
DECIMAL | DECIMAL |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIMESTAMP [(p)] | DATETIME [(p)] |
TIMESTAMP_LTZ [(p)] | DATETIME [(p)] |
CHAR(n) | CHAR(n*3) 説明 Doris では、文字列は UTF-8 エンコーディングで保存されます。英字は 1 バイト、漢字は 3 バイトを占有するため、長さは 3 倍になります。CHAR 型の最大長は 255 です。この制限を超えると、型は自動的に VARCHAR に変換されます。 |
VARCHAR(n) | VARCHAR(n*3) 説明 上記と同様です。長さは 3 倍になります。VARCHAR 型の最大長は 65533 です。この制限を超えると、型は自動的に STRING に変換されます。 |
BINARY(n) | STRING |
VARBINARY(N) | STRING |
STRING | STRING |