このトピックでは、カスタム SelectDB コネクタを使用して ApsaraDB for SelectDB にデータを書き込む方法について説明します。
背景情報
ApsaraDB for SelectDB は、次世代のリアルタイム データウェアハウスサービスです。Alibaba Cloud で完全に管理およびホストされており、Apache Doris と 100% 互換性があります。 ApsaraDB for SelectDB は、大量のデータを分析するというニーズに対応できます。サービスのメリットとユースケースの詳細については、「ApsaraDB for SelectDB とは」をご参照ください。
次の表に、カスタム SelectDB コネクタでサポートされている機能を示します。
項目 | 説明 |
サポートされているタイプ | シンクテーブル。データインジェスチョンシンク |
実行モード | ストリーミングとバッチ |
データ形式 | JSON および CSV |
メトリクス | 該当なし |
API | DataStream API および SQL API |
シンクでのデータの更新/削除 | サポートされています |
機能
データベースの同期。
重複や欠落がないことを保証する、1 回限りのセマンティクス。
Apache Doris 1.0 以後との互換性により、カスタム SelectDB コネクタを介して Apache Doris へのシームレスなデータ同期が可能になります。
使用上の注意
Ververica Runtime (VVR) 8.0.10 以後のみが、カスタム SelectDB コネクタをサポートしています。
カスタム SelectDB コネクタの使用中に質問がある場合は、ApsaraDB for SelectDB にチケットを送信してください。
ApsaraDB for SelectDB にデータを同期するための前提条件は次のとおりです。
ApsaraDB for SelectDB インスタンスが作成されます。詳細については、「インスタンスの作成」をご参照ください。
IP アドレスのホワイトリストが構成されています。詳細については、「IP アドレスホワイトリストの構成」をご参照ください。
SQL
SelectDB コネクタは、SQL ジョブのシンクテーブルとして使用できます。
コネクタのアップロードと構成
VVR 11.1 以降、SelectDB コネクタは組み込みコネクタになるため、次の手順をスキップできます。
JAR ファイルをクリックして、SelectDB コネクタ JAR (バージョン 1.15 ~ 1.17) をダウンロードします。
SelectDB コネクタ JAR を Realtime Compute for Apache Flink コンソールにアップロードします。詳細については、「カスタムコネクタの管理」をご参照ください。
SQL ドラフトを作成する、カスタム SelectDB コネクタを使用します。
connectorオプションをdorisに設定します。その他のシンクオプションについては、「Doris シンクの構成項目」をご参照ください。
構文
CREATE TABLE selectdb_sink (
emp_no INT ,
birth_date DATE,
first_name STRING,
last_name STRING,
gender STRING,
hire_date DATE
) WITH (
'connector' = 'doris',
'fenodes' = 'selectdb-cn-*******.selectdbfe.rds.aliyuncs.com:8080',
'table.identifier' = 'test.employees',
'username' = 'admin',
'password' = '****',
'sink.enable-delete' = 'true'
);データ型マッピング
Doris ドキュメントの Flink Doris コネクタトピックの「型マッピング」セクションを参照してください。
コネクタの使用
このセクションでは、カスタム SelectDB コネクタを使用して、ApsaraDB RDS for MySQL から ApsaraDB for SelectDB にデータを同期する方法について説明します。
データ同期の準備をします。
Flink ワークスペース、ApsaraDB RDS for MySQL インスタンス、および ApsaraDB for SelectDB インスタンスを作成します。
ApsaraDB RDS for MySQL コンソールで、
order_dw_mysqlという名前のデータベースとordersという名前のテーブルを作成し、テストデータをテーブルにインポートします。CREATE TABLE `orders` ( order_id bigint not null primary key, user_id varchar(50) not null, shop_id bigint not null, product_id bigint not null, buy_fee decimal(20,2) not null, create_time timestamp not null, update_time timestamp not null default now(), state int not null ); INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1), (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1), (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1), (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1), (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1), (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1), (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);DMS を使用して ApsaraDB for SelectDB インスタンスに接続した後、
selectdbという名前のデータベースとselecttableという名前のテーブルを作成します。CREATE DATABASE selectdb; CREATE TABLE `selecttable` ( order_id bigint, user_id varchar(50), shop_id bigint, product_id bigint, buy_fee DECIMAL, create_time DATETIME, update_time DATETIME, state int )DISTRIBUTED BY HASH(order_id) BUCKETS 10;Flink ワークスペースの vSwitch の [CIDR ブロック] を ApsaraDB for SelectDB インスタンスの IP アドレスホワイトリストに追加します。詳細については、「IP アドレスホワイトリストを構成する方法」をご参照ください。
Realtime Compute for Apache Flink コンソールで、SQL ジョブを開発して開始します。
mysqlcatalogという名前の MySQL カタログを作成します。詳細については、「MySQL カタログの管理」をご参照ください。JAR ファイルをクリックして、SelectDB コネクタ (バージョン 1.15 ~ 1.17) JAR をダウンロードし、JAR ファイルをアップロードします。詳細については、「カスタムコネクタの管理」をご参照ください。
に移動し、[新規] をクリックして空のストリームドラフトを作成し、次のコードをドラフトにコピーします。
CREATE TEMPORARY TABLE selectdb_sink ( order_id BIGINT, user_id STRING, shop_id BIGINT, product_id BIGINT, buy_fee DECIMAL, create_time TIMESTAMP(6), update_time TIMESTAMP(6), state int ) WITH ( 'connector' = 'doris', 'fenodes' = 'selectdb-cn-jfj3z******.selectdbfe.rds.aliyuncs.com:8080', 'table.identifier' = 'selectdb.selecttable', 'username' = 'admin', 'password' = '${secret_values.selectdb}', 'sink.enable-delete' = 'true' ); INSERT INTO selectdb_sink SELECT * FROM `mysqlcatalog`.`order_dw_mysql`.`orders`;[デプロイ] をクリックし、初期モードでデプロイメントを開始します。詳細については、「デプロイメントの作成」および「デプロイメントの開始」をご参照ください。
DMS を使用して ApsaraDB for SelectDB インスタンスに接続した後、
selecttableテーブルのデータをクエリします。SELECT * FROM `selecttable` ;
データインジェスチョン
SelectDB コネクタは、データインジェスチョンシンクとして使用できます。
構文
source:
type: xxx
sink:
type: doris
name: Doris Sink // Doris シンク
fenodes: 127.0.0.1:8030
username: root
password: ""
table.create.properties.replication_num: 1
構成オプション
オプション | 説明 | 必須? | データ型 | デフォルト値 | 備考 |
| シンクのタイプ。 | はい | 文字列 | デフォルト値なし |
|
| シンク名。 | いいえ | 文字列 | デフォルト値なし | |
| ApsaraDB for SelectDB インスタンスのエンドポイントと HTTP ポート。 | はい | 文字列 | デフォルト値なし | SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントを取得するには、ApsaraDB for SelectDB コンソールに移動し、インスタンス名をクリックして、[ネットワーク情報] セクションの情報を確認します。 例: |
| BE HTTP アドレス。 | いいえ | 文字列 | デフォルト値なし | 例: |
| ApsaraDB for SelectDB インスタンスの JDBC 接続情報。 | いいえ | 文字列 | デフォルト値なし | SelectDB インスタンスの VPC エンドポイントまたはパブリックエンドポイントと MySQL ポートを取得するには、ApsaraDB for SelectDB コンソールに移動し、インスタンス名をクリックして、[ネットワーク情報] セクションの情報を確認します。 例: |
| ApsaraDB for SelectDB インスタンスのクラスタユーザー名。 | はい | 文字列 | デフォルト値なし | |
| ApsaraDB for SelectDB インスタンスのクラスタパスワード。 | いいえ | 文字列 | デフォルト値なし | |
| ストリームロードリクエストをリダイレクトするかどうかを指定します。有効にすると、ストリームロードは BE 情報を明示的に取得せずに FE を介してデータを書き込みます。 | いいえ | 文字列 |
| FE リダイレクトを介して書き込むかどうか、および BE に直接接続して書き込むかどうか |
| HTTP クライアントの文字セットエンコーディング。 | いいえ | ブール値 |
| |
| バッチモードを使用して SelectDB に書き込むかどうかを指定します。有効にすると、書き込みはチェックポイントに依存せず、 有効にすると、1 回限りのセマンティクスは保証されません。べき等性を実現するには、一意モデルを使用します。 | いいえ | ブール値 |
| |
| バッチ書き込みのキューサイズ。 | いいえ | 整数 |
| |
| 単一バッチでフラッシュするレコードの最大数。 | いいえ | 整数 |
| |
| 単一バッチでフラッシュするバイトの最大数。 | いいえ | 整数 |
| |
| フラッシュ間隔。この時間を超えると、データは非同期にフラッシュされます。最小値: 1 秒。 | いいえ | 文字列 | 10s | |
sink.properties. | Stream Load のパラメータをインポートします。プロパティの構成を入力してください。
| いいえ | 文字列 | デフォルト値なし | 例: |
| テーブル作成のプロパティ構成。 | いいえ | 文字列 | デフォルト値なし | 例: |
データ型マッピング
Flink CDC タイプ | SelectDB タイプ |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
DECIMAL | DECIMAL |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
BOOLEAN | BOOLEAN |
DATE | DATE |
TIMESTAMP [(p)] | DATETIME [(p)] |
TIMESTAMP_LTZ [(p)] | DATETIME [(p)] |
CHAR(n) | CHAR(n*3) 説明 Doris では、文字列は UTF-8 エンコードされているため、各英字は 1 バイトを占め、各中国語文字は 3 バイトを占めます。したがって、ここでは長さに 3 を掛けます。CHAR の最大長は 255 です。それを超えると、CHAR は自動的に VARCHAR に変換されます。 |
VARCHAR(n) | VARCHAR(n*3) 説明 Doris では、文字列は UTF-8 エンコードされているため、各英字は 1 バイトを占め、各中国語文字は 3 バイトを占めます。したがって、ここでは長さに 3 を掛けます。VARCHAR の最大長は 65533 です。それを超えると、VARCHAR は自動的に STRING に変換されます。 |
BINARY(n) | STRING |
VARBINARY(N) | STRING |
STRING | STRING |