本トピックでは、StarRocks コネクタの使用方法について説明します。
背景情報
StarRocks は、あらゆるシナリオで非常に高速なクエリパフォーマンスを提供する、新世代の大規模並列処理 (MPP) データウェアハウスです。StarRocks は、非常に高速で統一された分析エクスペリエンスを提供することに特化しています。StarRocks には、次の利点があります。
MySQL プロトコルと互換性があります。MySQL クライアントまたは一般的なビジネスインテリジェンス (BI) ツールを使用して StarRocks にアクセスし、データ分析を行うことができます。
分散アーキテクチャを使用しており、次の機能を提供します。
テーブルを水平に分割し、複数のレプリカにデータを保存します。
クラスターを柔軟にスケーリングして、10 PB のデータ分析をサポートします。
MPP アーキテクチャをサポートし、データコンピューティングを高速化します。
複数のレプリカをサポートし、フォールトトレランスを確保します。
Flink コネクタはデータをキャッシュし、Stream Load を使用してデータをバッチでインポートして結果テーブルを生成し、データをバッチで読み取ってソーステーブルを生成します。次の表に、StarRocks コネクタでサポートされている機能を示します。
項目 | 説明 |
テーブルタイプ | ソーステーブル、ディメンションテーブル、結果テーブル、データインジェストシンク |
実行モード | ストリーミングモード、バッチモード |
データ形式 | CSV |
メトリック | N/A |
API タイプ | DataStream API、SQL API、データインジェスト用の YAML API |
結果テーブルでのデータの更新または削除 | サポート済み |
前提条件
StarRocks クラスターが作成されていること。StarRocks クラスターは、E-MapReduce (EMR) の StarRocks クラスター、または Elastic Compute Service (ECS) インスタンス上でホストされている自己管理の StarRocks クラスターのいずれかです。
制限事項
StarRocks コネクタは、at-least-once セマンティクスと exactly-once セマンティクスのみをサポートします。
Ververica Runtime (VVR) 11.1 以降のみが、StarRocks ディメンションテーブルとのルックアップ結合をサポートします。
ネットワーク制限を防ぐため、ご利用の StarRocks クラスターのセキュリティグループまたはファイアウォールで、次のポートをホワイトリストに登録してください:
9030、8030、8040、9060、8060、9020。
SQL 文
機能
E-MapReduce (EMR) の StarRocks は、CREATE TABLE AS (CTAS) 文と CREATE DATABASE AS (CDAS) 文をサポートしています。CREATE TABLE AS 文は、単一テーブルのスキーマとデータを同期するために使用できます。CREATE DATABASE AS 文は、データベース全体のデータ、または同じデータベース内の複数テーブルのスキーマとデータを同期するために使用できます。詳細については、「Realtime Compute for Apache Flink の CREATE TABLE AS 文と CREATE DATABASE AS 文を使用して ApsaraDB RDS for MySQL インスタンスから StarRocks クラスターにデータを同期する」をご参照ください。
構文
CREATE TABLE USER_RESULT(
name VARCHAR,
score BIGINT
) WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
'database-name' = 'xxx',
'table-name' = 'xxx',
'username' = 'xxx',
'password' = 'xxx'
);WITH 句のコネクタオプション
カテゴリ | オプション | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
一般 | connector | テーブルタイプです。 | String | はい | デフォルト値なし | 値を |
jdbc-url | データベースへの接続に使用される Java Database Connectivity (JDBC) URL です。 | String | はい | デフォルト値なし | 指定されたフロントエンド (FE) の IP アドレスと JDBC ポートを使用します。このオプションの値は | |
database-name | StarRocks データベースの名前です。 | String | はい | デフォルト値なし | N/A | |
table-name | StarRocks テーブルの名前です。 | String | はい | デフォルト値なし | N/A | |
username | StarRocks データベースへの接続に使用するユーザー名です。 | String | はい | デフォルト値なし | N/A | |
password | StarRocks データベースへの接続に使用するパスワードです。 | String | はい | デフォルト値なし | N/A | |
starrocks.create.table.properties | StarRocks テーブルのプロパティです。 | String | いいえ | デフォルト値なし | StarRocks テーブルの初期プロパティ (エンジンやレプリカ数など) を指定します。例:'starrocks.create.table.properties' = 'buckets 8','starrocks.create.table.properties' = 'replication_num=1' | |
ソース固有 | scan-url | データスキャン用の URL です。 | String | いいえ | デフォルト値なし | 指定された FE の IP アドレスと HTTP ポートを使用します。このオプションの値は 説明 複数の IP アドレスとポート番号のペアはセミコロン (;) で区切ります。 |
scan.connect.timeout-ms | Realtime Compute for Apache Flink の StarRocks コネクタが StarRocks データベースに接続する際のタイムアウト期間です。 接続時間がこのオプションの値を超えると、エラーが返されます。 | String | いいえ | 1000 | 単位:ミリ秒。 | |
scan.params.keep-alive-min | クエリタスクのキープアライブ期間です。 | String | いいえ | 10 | N/A | |
scan.params.query-timeout-s | クエリタスクのタイムアウト期間です。 このオプションで指定された期間内にクエリ結果が返されない場合、クエリタスクは停止します。 | String | いいえ | 600 | 単位:秒。 | |
scan.params.mem-limit-byte | バックエンド (BE) ノードでの単一クエリの最大メモリです。 | String | いいえ | 1073741824 (1 GB) | 単位:バイト。 | |
scan.max-retries | クエリが失敗した場合の最大リトライ回数です。 リトライ回数がこのオプションの値に達すると、エラーが返されます。 | String | いいえ | 1 | N/A | |
シンク固有 | load-url | データインポート用の URL です。 | String | はい | デフォルト値なし | 指定された FE の IP アドレスと HTTP ポートを使用します。このオプションの値は 説明 複数の IP アドレスとポート番号のペアはセミコロン (;) で区切ります。 |
sink.semantic | データ書き込みのセマンティクスです。 | String | いいえ | at-least-once | 有効値:
| |
sink.buffer-flush.max-bytes | バッファーで許容される最大データ量です。 | String | いいえ | 94371840 (90 MB) | 有効値:64 MB から 10 GB。 | |
sink.buffer-flush.max-rows | バッファーで許容される最大行数です。 | String | いいえ | 500000 | 有効値:64000 から 5000000。 | |
sink.buffer-flush.interval-ms | バッファーがリフレッシュされる間隔です。 | String | いいえ | 300000 | 有効値:1000 から 3600000。単位:ミリ秒。 | |
sink.max-retries | テーブルへのデータ書き込みの最大リトライ回数です。 | String | いいえ | 3 | 有効値:0 から 10。 | |
sink.connect.timeout-ms | StarRocks データベースへの接続タイムアウト期間です。 | String | いいえ | 1000 | 有効値:100 から 60000。単位:ミリ秒。 | |
sink.properties.* | シンクテーブルのプロパティです。 | String | いいえ | デフォルト値なし | Stream Load のインポートプロパティです。たとえば、sink.properties.format プロパティは、Stream Load モードでインポートされるデータの形式を指定します。データ形式は CSV にすることができます。オプションの詳細については、「Stream Load」をご参照ください。 | |
ディメンションテーブル固有 | lookup.cache.enabled | ディメンションテーブルをキャッシュするかどうかを指定します。 | Boolean | いいえ | true | 有効値:
重要
|
データ型のマッピング
StarRocks のデータ型 | Realtime Compute for Apache Flink のデータ型 |
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
BIGINT UNSIGNED 説明 このデータ型のマッピングは VVR 8.0.10 以降でのみサポートされます。 | DECIMAL(20,0) |
LARGEINT | DECIMAL(20,0) |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR(m) 説明
| CHAR(n) |
VARCHAR(m) 説明
| CHAR(n) |
VARCHAR | STRING |
VARBINARY 説明 このデータ型のマッピングは VVR 8.0.10 以降でのみサポートされます。 | VARBINARY |
コード例
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://ip:9030',
'scan-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
PRIMARY KEY(`runoob_id`)
NOT ENFORCED
) WITH (
'jdbc-url' = 'jdbc:mysql://ip:9030',
'connector' = 'starrocks',
'load-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxx',
'sink.buffer-flush.interval-ms' = '5000'
);
INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;StarRocks は NULL 可能プライマリキー列を許容しますが、Flink はデータ整合性のためにプライマリキーが NULL 不可かつ一意であることを要求します。StarRocks で NULL 可能プライマリキーを使用すると、Invalid primary key. Column 'xxx' is nullable というエラーが発生します。詳細については、「テーブルへの書き込み時に「Invalid primary key. Column 'xxx' is nullable.」エラーが発生する理由」をご参照ください。
データインジェスト
StarRocks パイプラインコネクタを使用すると、アップストリームのデータソースから外部の StarRocks データベースにデータレコードとテーブルスキーマの変更を簡単に書き込むことができます。オープンソースの StarRocks と フルマネージド EMR Serverless StarRocks の両方がサポートされています。
機能
データベースとテーブルの自動作成
アップストリームのデータベースとテーブルがダウンストリームの StarRocks インスタンスに存在しない場合、データベースとテーブルは自動的に作成されます。
table.create.properties.*オプションを設定して、テーブルの自動作成に関するオプションを指定できます。テーブルスキーマ変更の同期
StarRocks コネクタは、CreateTableEvent、AddColumnEvent、DropColumnEvent イベントをダウンストリームのデータベースに自動的に適用します。
VVR 11.1 以降、互換性のある列の型の変更がサポートされています。詳細については、StarRocks ドキュメントの「ALTER TABLE」をご参照ください。
注意事項
StarRocks コネクタは at-least-once セマンティクスのみをサポートし、プライマリキーテーブルを使用して書き込み操作のべき等性を保証します。
データを同期するテーブルにはプライマリキーが含まれている必要があります。プライマリキーを含まないテーブルの場合、テーブルのデータをダウンストリームのデータベースに書き込む前に、
TRANSFORM文ブロックで各テーブルにプライマリキーを指定する必要があります。コード例:transform: - source-table: ... primary-keys: id, ...自動作成されたテーブルのバケットキーはプライマリキーと同じでなければならず、テーブルにパーティションキーを含めることはできません。
テーブルスキーマの変更を同期する際、新しい列は既存の列の末尾にのみ追加できます。デフォルトでは、スキーマ進化には Lenient モードが使用されます。このモードでは、テーブルの他の位置に挿入された列は自動的に既存の列の末尾に移動されます。
2.5.7 より前のバージョンの StarRocks を使用する場合、
table.create.num-bucketsオプションを使用してバケット数を明示的に指定する必要があります。StarRocks 2.5.7 以降を使用する場合、バケット数は自動的に指定されます。詳細については、「データ分散」をご参照ください。StarRocks 3.2 以降を使用する場合、テーブルスキーマの変更を高速化するために、
table.create.properties.fast_schema_evolutionオプションを true に設定することを推奨します。Flink CDC を使用して EMR Serverless StarRocks にデータをインジェストする際に、ストリームデータの問題が発生する可能性があります。これらの問題を回避するには、次のいずれかのオプションを使用します。
SQL ジョブを使用し、ジョブドラフトで
sink.version=V1を設定します。引き続き Flink CDC を使用しますが、
FE emr_internal_redirectを有効にします。ロードバランシングには SLB ではなく、ビルトインの Private Zone を備えた EMR Serverless StarRocks インスタンスを使用します。
構文
source:
...
sink:
type: starrocks
name: StarRocks Sink
jdbc-url: jdbc:mysql://127.0.0.1:9030
load-url: 127.0.0.1:8030
username: root
password: pass
sink.buffer-flush.interval-ms: 5000 # バッファーされたデータを 5 秒ごとにフラッシュします。コネクタオプション
オプション | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
| コネクタ名です。 | String | はい | デフォルト値なし | 値を |
| シンクの表示名です。 | String | いいえ | デフォルト値なし | N/A |
| データベースへの接続に使用される JDBC URL です。 | String | はい | デフォルト値なし | 複数の URL を指定できます。URL はカンマ ( |
| FE ノードへの接続に使用される HTTP URL です。 | String | はい | デフォルト値なし | 複数の URL を指定できます。URL はセミコロン ( |
| StarRocks データベースへの接続に使用するユーザー名です。 | String | はい | デフォルト値なし | 宛先テーブルに対する SELECT および INSERT 権限をユーザーに付与する必要があります。StarRocks の GRANT コマンドを使用して、必要な権限をユーザーに付与できます。 |
| StarRocks データベースへの接続に使用するパスワードです。 | String | はい | デフォルト値なし | N/A |
| データ書き込みのセマンティクスです。 | String | いいえ | at-least-once | 有効値:
|
| Stream Load に使用されるラベルプレフィックスです。 | String | いいえ | デフォルト値なし | N/A |
| HTTP 接続のタイムアウト期間です。 | Integer | いいえ | 30000 | 単位:ミリ秒。有効値:100 から 60000。 |
| クライアントがサーバーからの 100 Continue 応答を待機するタイムアウト期間です。 | Integer | いいえ | 30000 | 単位:ミリ秒。有効値:3000 から 600000。 |
| データが StarRocks データベースに書き込まれる前にメモリにキャッシュできるデータのサイズです。 | Long | いいえ | 157286400 | 単位:バイト。有効値:64 MB から 10 GB。 説明
|
| データが StarRocks データベースに書き込まれる前にメモリにキャッシュできるレコード数です。 | Long | いいえ | 500000 | 有効値:64000 から 5000000。 |
| 各テーブルの 2 つの連続するフラッシュ操作の間隔です。 | Long | いいえ | 300000 | 単位:ミリ秒。 説明 データセットが小さい場合は、このオプションを減らして、バッファーされたデータが迅速にフラッシュされるようにします。 |
| 最大リトライ回数です。 | Long | いいえ | 3 | 有効値:0 から 1000。 |
| フラッシュ操作を実行する必要があるかどうかを検出するための 2 つの連続するチェックの間隔です。 | Long | いいえ | 50 | 単位:ミリ秒。 |
| Stream Load モードでのデータインポート中のスレッド数です。 | Integer | いいえ | 2 | N/A |
| データインポートに Stream Load トランザクションインターフェイスを使用するかどうかを指定します。 | Boolean | いいえ | true | このオプションの設定は、サポートされているデータベースを使用している場合にのみ有効です。 |
| シンクに提供される追加オプションです。 | String | いいえ | デフォルト値なし | サポートされているオプションは Stream Load モードで確認できます。 |
| 自動作成されたテーブルのバケット数です。 | Integer | いいえ | デフォルト値なし |
|
| テーブルが自動的に作成されるときに指定する追加オプションです。 | String | いいえ | デフォルト値なし | たとえば、 |
| スキーマ変更操作のタイムアウト期間です。 | Duration | いいえ | 30 min | このオプションの値は整数に設定する必要があります。単位:秒。 説明 スキーマ変更操作の期間がこのオプションで指定された値を超えると、デプロイメントは失敗します。 |
| 各 Unicode 文字に割り当てるバイト数です。 | Integer | いいえ | 3 | Flink CDC では、VARCHAR の長さは文字数で測定されますが、StarRocks ではバイト数で測定されます。 通常、UTF-8 エンコーディングは Unicode 文字あたり最大 3 バイトを使用します。ただし、一部の珍しい文字や絵文字は 4 バイト以上を必要とする場合があります。 |
データ型のマッピング
StarRocksは、すべての Change Data Capture (CDC) YAML データの型をサポートしているわけではありません。サポートされていない型のデータをダウンストリームデータベースに書き込むと、ジョブは失敗します。変換コンポーネントでビルトイン関数 CAST を使用してサポートされていないデータの型を変換するか、射影文を使用して結果テーブルからサポートされていない型のデータを削除することができます。詳細については、「Flink CDC を使用したデータ取り込み」をご参照ください。
CDC のデータ型 | StarRocks のデータ型 | 備考 |
TINYINT | TINYINT | N/A |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP | DATETIME | |
TIMESTAMP_LTZ | DATETIME | |
DECIMAL(p, s) | DECIMAL(p, s) | StarRocks はプライマリキーのデータ型として DECIMAL をサポートしていません。したがって、アップストリームのデータテーブルで DECIMAL データ型の列がプライマリキーとして使用されている場合、StarRocks に同期されるテーブルスキーマのプライマリキーのデータ型は、DECIMAL から VARCHAR に自動的に変更されます。 |
CHAR(n) (n ≤ 85) | CHAR(n × 3) | CDC の CHAR 型列の長さは格納できる文字数を指定します。一方、StarRocks の CHAR 型列の長さは、UTF-8 でエンコードされた格納可能なバイト数を指定します。ほとんどの場合、UTF-8 でエンコードされた中国語文字の長さは 3 バイトを超えません。したがって、CDC の CHAR 型列が StarRocks の CHAR 型列にマッピングされると、列の長さはマッピング前の 3 倍になります。 説明
|
CHAR(n) (n > 85) | VARCHAR(n × 3) | CDC の CHAR 型列の長さは格納できる文字数を指定します。一方、StarRocks の CHAR 型列の長さは、UTF-8 でエンコードされた格納可能なバイト数を指定します。ほとんどの場合、UTF-8 でエンコードされた中国語文字の長さは 3 バイトを超えません。したがって、CDC の CHAR 型列が StarRocks の VARCHAR 型列にマッピングされると、列の長さはマッピング前の 3 倍になります。 説明
|
VARCHAR(n) | VARCHAR(n × 3) | CDC の VARCHAR 型列の長さは格納できる文字数を指定します。一方、StarRocks の VARCHAR 型列の長さは、UTF-8 でエンコードされた格納可能なバイト数を指定します。ほとんどの場合、UTF-8 でエンコードされた中国語文字の長さは 3 バイトを超えません。したがって、CDC の VARCHAR 型列が StarRocks の VARCHAR 型列にマッピングされると、列の長さはマッピング前の 3 倍になります。 |
BINARY(n) | BINARY(n+2) | エラーを防ぐために 2 バイトのパディングが追加されます。 |
VARBINARY(n) | VARBINARY(n+1) | エラーを防ぐために 1 バイトのパディングが追加されます。 |
スキーマ進化
データインジェストシンクとして、StarRocks は次のスキーマ進化イベントをサポートします。
CREATE TABLE
説明StarRocks テーブルが既に存在する場合、コネクタは再度作成しようとはしません。既存のテーブルスキーマがソーススキーマと互換性があることを確認する必要があります。
ADD COLUMN
説明StarRocks では、プライマリキー列を常に最初に配置する必要があります。新しく追加された列もこの制限に従う必要があります。
MODIFY COLUMN TYPE
説明サポートされている変換の詳細については、公式 StarRocks ドキュメントをご参照ください。
DROP COLUMN
TRUNCATE TABLE
DROP TABLE