このトピックでは、StarRocks コネクタの使用方法について説明します。
背景情報
StarRocks は新世代の Massively Parallel Processing(MPP)データウェアハウスであり、あらゆるシナリオで非常に高速なクエリパフォーマンスを提供します。 StarRocks は、非常に高速で統合された分析エクスペリエンスの提供に特化しています。 StarRocks には次の利点があります。
MySQL プロトコルと互換性があります。 MySQL クライアントまたは一般的なビジネスインテリジェンス(BI)ツールを使用して StarRocks にアクセスし、データ分析を行うことができます。
分散アーキテクチャを使用しており、以下の機能を提供します。
テーブルを水平方向に分割し、複数のレプリカにデータを格納します。
10 PB のデータの分析をサポートするために、柔軟な方法でクラスタをスケーリングします。
MPP アーキテクチャをサポートして、データ計算を高速化します。
複数のレプリカをサポートして、フォールトトレランスを確保します。
Flink コネクタはデータをキャッシュし、Stream Load を使用してデータをバッチでインポートして結果テーブルを生成し、データをバッチで読み取ってソーステーブルを生成します。 次の表に、StarRocks コネクタでサポートされている機能を示します。
項目 | 説明 |
テーブルタイプ | ソーステーブル、ディメンションテーブル、シンクテーブル、およびデータインジェスチョンシンク |
実行モード | ストリーミングモードとバッチモード |
データ形式 | CSV |
メトリック | 該当なし |
API タイプ | DataStream API、SQL API、およびデータインジェスチョンの YAML API |
結果テーブルのデータの更新または削除 | サポートされています |
前提条件
StarRocks クラスタが作成されています。 StarRocks クラスタは、EMR の StarRocks クラスタ、または Elastic Compute Service(ECS)インスタンスでホストされているセルフマネージド StarRocks クラスタにすることができます。
制限事項
StarRocks コネクタは、少なくとも 1 回のセマンティクスと 1 回限りのセマンティクスのみをサポートします。
Ververica Runtime(VVR)11.1 以降のみが、StarRocks ディメンションテーブルとのルックアップ結合をサポートします。
SQL 文
機能
E-MapReduce(EMR)の StarRocks は、CREATE TABLE AS(CTAS)文と CREATE DATABASE AS(CDAS)文をサポートしています。 CREATE TABLE AS 文を使用すると、単一テーブルのスキーマとデータを同期できます。 CREATE DATABASE AS 文を使用すると、データベース全体のデータ、または同じデータベース内の複数のテーブルのスキーマとデータを同期できます。 詳細については、「Realtime Compute for Apache Flink の CREATE TABLE AS 文と CREATE DATABASE AS 文を使用して、ApsaraDB RDS for MySQL インスタンスから StarRocks クラスタにデータを同期する」をご参照ください。
構文
CREATE TABLE USER_RESULT(
name VARCHAR,
score BIGINT
) WITH (
'connector' = 'starrocks',
'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
'database-name' = 'xxx',
'table-name' = 'xxx',
'username' = 'xxx',
'password' = 'xxx'
);
WITH 句のコネクタオプション
カテゴリ | オプション | 説明 | データの型 | 必須 | デフォルト値 | 備考 |
全般 | connector | テーブルタイプ。 | 文字列 | はい | デフォルト値なし |
|
jdbc-url | データベースへの接続に使用する Java Database Connectivity(JDBC)URL。 | 文字列 | はい | デフォルト値なし | 指定されたフロントエンド(FE)の IP アドレスと JDBC ポートが使用されます。 このオプションの値は、 | |
database-name | StarRocks データベースの名前。 | 文字列 | はい | デフォルト値なし | 該当なし | |
table-name | StarRocks テーブルの名前。 | 文字列 | はい | デフォルト値なし | 該当なし | |
username | StarRocks データベースへの接続に使用するユーザー名。 | 文字列 | はい | デフォルト値なし | 該当なし | |
password | StarRocks データベースへの接続に使用するパスワード。 | 文字列 | はい | デフォルト値なし | 該当なし | |
starrocks.create.table.properties | StarRocks テーブルのプロパティ。 | 文字列 | いいえ | デフォルト値なし | エンジンやレプリカ数など、StarRocks テーブルの初期プロパティを指定します。 例: 'starrocks.create.table.properties' = 'buckets 8','starrocks.create.table.properties' = 'replication_num=1' | |
ソース固有 | scan-url | データスキャンの URL。 | 文字列 | いいえ | デフォルト値なし | 指定された FE の IP アドレスと HTTP ポートが使用されます。 このオプションの値は、 説明 複数の IP アドレスとポート番号のペアをセミコロン(;)で区切ります。 |
scan.connect.timeout-ms | Realtime Compute for Apache Flink の StarRocks コネクタが StarRocks データベースに接続するためのタイムアウト期間。 接続時間がこのオプションの値を超えると、エラーが返されます。 | 文字列 | いいえ | 1000 | 単位: ミリ秒。 | |
scan.params.keep-alive-min | クエリタスクのキープアライブ期間。 | 文字列 | いいえ | 10 | 該当なし | |
scan.params.query-timeout-s | クエリタスクのタイムアウト期間。 このオプションで指定された期間内にクエリ結果が返されない場合、クエリタスクは停止されます。 | 文字列 | いいえ | 600 | 単位: 秒。 | |
scan.params.mem-limit-byte | バックエンド(BE)ノードでの単一クエリの最大メモリ。 | 文字列 | いいえ | 1073741824 (1 GB) | 単位: バイト。 | |
scan.max-retries | クエリが失敗した場合の最大再試行回数。 再試行回数がこのオプションの値に達すると、エラーが返されます。 | 文字列 | いいえ | 1 | 該当なし | |
シンク固有 | load-url | データインポートの URL。 | 文字列 | はい | デフォルト値なし | 指定された FE の IP アドレスと HTTP ポートが使用されます。 このオプションの値は、 説明 複数の IP アドレスとポート番号のペアをセミコロン(;)で区切ります。 |
sink.semantic | データ書き込みのセマンティクス。 | 文字列 | いいえ | at-least-once | 有効な値:
| |
sink.buffer-flush.max-bytes | バッファに許容されるデータの最大量。 | 文字列 | いいえ | 94371840 (90 MB) | 有効な値: 64 MB から 10 GB。 | |
sink.buffer-flush.max-rows | バッファに許容される行の最大数。 | 文字列 | いいえ | 500000 | 有効な値: 64000 から 5000000。 | |
sink.buffer-flush.interval-ms | バッファがリフレッシュされる間隔。 | 文字列 | いいえ | 300000 | 有効な値: 1000 から 3600000。 単位: ミリ秒。 | |
sink.max-retries | テーブルへのデータ書き込みの最大再試行回数。 | 文字列 | いいえ | 3 | 有効な値: 0 から 10。 | |
sink.connect.timeout-ms | StarRocks データベースへの接続のタイムアウト期間。 | 文字列 | いいえ | 1000 | 有効な値: 100 から 60000。 単位: ミリ秒。 | |
sink.properties.* | シンクテーブルのプロパティ。 | 文字列 | いいえ | デフォルト値なし | Stream Load のインポートプロパティ。 たとえば、sink.properties.format プロパティは、Stream Load モードでインポートされるデータの形式を指定します。 データ形式は CSV にすることができます。 オプションの詳細については、「Stream Load」をご参照ください。 | |
ディメンションテーブル固有 | lookup.cache.enabled | ディメンションテーブルをキャッシュするかどうかを指定します。 | ブール値 | いいえ | true | 有効な値:
重要
|
データ型マッピング
StarRocks のデータ型 | Realtime Compute for Apache Flink のデータ型 |
NULL | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
BIGINT UNSIGNED 説明 VVR 8.0.10 以降のみがこのデータ型マッピングをサポートしています。 | DECIMAL(20,0) |
LARGEINT | DECIMAL(20,0) |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
DATETIME | TIMESTAMP |
DECIMAL | DECIMAL |
DECIMALV2 | DECIMAL |
DECIMAL32 | DECIMAL |
DECIMAL64 | DECIMAL |
DECIMAL128 | DECIMAL |
CHAR(m) 説明
| CHAR(n) |
VARCHAR(m) 説明
| CHAR(n) |
VARCHAR | STRING |
VARBINARY 説明 VVR 8.0.10 以降のみがこのデータ型マッピングをサポートしています。 | VARBINARY |
サンプルコード
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
) WITH (
'connector' = 'starrocks',
'jdbc-url' = 'jdbc:mysql://ip:9030',
'scan-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
`runoob_id` BIGINT NOT NULL,
`runoob_title` STRING NOT NULL,
`runoob_author` STRING NOT NULL,
`submission_date` DATE NULL
PRIMARY KEY(`runoob_id`)
NOT ENFORCED
) WITH (
'jdbc-url' = 'jdbc:mysql://ip:9030',
'connector' = 'starrocks',
'load-url' = 'ip:18030',
'database-name' = 'db_name',
'table-name' = 'table_name',
'password' = 'xxxxxxx',
'username' = 'xxxx',
'sink.buffer-flush.interval-ms' = '5000'
);
INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;
データインジェスチョン
StarRocks パイプラインコネクタを使用すると、アップストリームデータソースからのデータレコードとテーブルスキーマの変更を、外部の StarRocks データベース に簡単に書き込むことができます。 オープンソースの StarRocks と フルマネージド EMR Serverless StarRocks の両方がサポートされています。
機能
データベースとテーブルの自動作成
アップストリームデータベースとテーブルがダウンストリーム StarRocks インスタンスに存在しない場合、データベースとテーブルが自動的に作成されます。
table.create.properties.*
オプションを構成して、テーブルの自動作成のオプションを指定できます。テーブルスキーマ変更の同期
StarRocks コネクタは、CreateTableEvent、AddColumnEvent、DropColumnEvent イベントをダウンストリームデータベースに自動的に適用します。
VVR 11.1 以降では、互換性のあるカラム型の変換がサポートされています。 詳細については、StarRocks ドキュメントの ALTER TABLE を参照してください。
使用上の注意
StarRocks コネクタは、少なくとも 1 回のセマンティクスのみをサポートし、プライマリキーテーブルを使用して書き込み操作のべき等性を確保します。
データを同期するテーブルには、プライマリキーが含まれている必要があります。 プライマリキーが含まれていないテーブルの場合、テーブルのデータをダウンストリームデータベースに書き込む前に、
TRANSFORM
文ブロックで各テーブルのプライマリキーを指定する必要があります。 サンプルコード:transform: - source-table: ... primary-keys: id, ...
自動的に作成されたテーブルのバケットキーはプライマリキーと同じである必要があり、テーブルにはパーティションキーを含めることができません。
テーブルスキーマの変更の同期中、新しいカラムは既存のカラムの末尾にのみ追加できます。 デフォルトでは、スキーマ進化には Lenient モードが使用されます。 このモードでは、テーブルの他の位置に挿入されたカラムは、既存のカラムの末尾に自動的に移動されます。
2.5.7 より前の StarRocks バージョンを使用している場合は、
table.create.num-buckets
オプションを使用してバケット数を明示的に指定する必要があります。 StarRocks 2.5.7 以降を使用している場合は、バケット数は自動的に指定されます。 詳細については、「データ分散」を参照してください。StarRocks 3.2 以降を使用している場合は、
table.create.properties.fast_schema_evolution
オプションを true に設定して、テーブルスキーマの変更を高速化することをお勧めします。
構文
source:
...
sink:
type: starrocks
name: StarRocks Sink
jdbc-url: jdbc:mysql://127.0.0.1:9030
load-url: 127.0.0.1:8030
username: root
password: pass
コネクタオプション
オプション | 説明 | データ型 | 必須 | デフォルト値 | 備考 |
| コネクタ名。 | String | はい | デフォルト値なし | 値を |
| シンクの表示名。 | String | いいえ | デフォルト値なし | 該当なし |
| データベースへの接続に使用する JDBC URL。 | String | はい | デフォルト値なし | 複数の URL を指定できます。URL はカンマ( |
| FE ノードへの接続に使用する HTTP URL。 | String | はい | デフォルト値なし | 複数の URL を指定できます。URL はセミコロン( |
| StarRocks データベースへの接続に使用するユーザー名。 | String | はい | デフォルト値なし | 宛先テーブルに対する SELECT 権限と INSERT 権限をユーザーに付与する必要があります。StarRocks の GRANT コマンドを使用して、必要な権限をユーザーに付与できます。 |
| StarRocks データベースへの接続に使用するパスワード。 | String | はい | デフォルト値なし | 該当なし |
| データ書き込みのセマンティクス。 | String | いいえ | at-least-once | 有効な値:
|
| Stream Load に使用されるラベルプレフィックス。 | String | いいえ | デフォルト値なし | 該当なし |
| HTTP 接続のタイムアウト期間。 | Integer | いいえ | 30000 | 単位:ミリ秒。有効な値:100 ~ 60000。 |
| クライアントがサーバーからの 100 Continue 応答を待機するタイムアウト期間。 | Integer | いいえ | 30000 | 単位:ミリ秒。有効な値:3000 ~ 600000。 |
| データが StarRocks データベースに書き込まれる前にメモリにキャッシュできるデータサイズ。 | Long | いいえ | 157286400 | 単位:バイト。有効な値:64 MB ~ 10 GB。 説明
|
| データが StarRocks データベースに書き込まれる前に、メモリにキャッシュできるレコード数。 | Long | いいえ | 500000 | 有効な値:64000 ~ 5000000。 |
| 各テーブルの 2 回の連続するフラッシュ操作の間隔。 | Long | いいえ | 300000 | 単位:ミリ秒。 |
| 最大再試行回数。 | Long | いいえ | 3 | 有効な値:0 ~ 1000。 |
| フラッシュ操作を実行する必要があるかどうかを検出するための、2 回の連続するチェックの間隔。 | Long | いいえ | 50 | 単位:ミリ秒。 |
| Stream Load モードでのデータインポート中のスレッド数。 | Integer | いいえ | 2 | 該当なし |
| データインポートに Stream Load トランザクションインターフェイス を使用するかどうかを指定します。 | Boolean | いいえ | true | このオプションの設定は、サポートされているデータベースが使用されている場合にのみ有効になります。 |
| シンクに追加で提供されるオプション。 | String | いいえ | デフォルト値なし | Stream Load モードでサポートされているオプションを表示できます。 |
| 自動的に作成されたテーブルのバケット数。 | Integer | いいえ | デフォルト値なし |
|
| テーブルが自動的に作成されるときに指定される追加オプション。 | String | いいえ | デフォルト値なし | たとえば、 |
| スキーマ変更操作のタイムアウト期間。 | Duration | いいえ | 30 min | このオプションの値は整数に設定する必要があります。単位:秒。 説明 スキーマ変更操作の期間がこのオプションで指定された値を超えると、デプロイは失敗します。 |
データ型マッピング
StarRocks は、すべての Change Data Capture(CDC)YAML データ型をサポートしているわけではありません。サポートされていない型のデータをダウンストリーム データベースに書き込むと、ジョブは失敗します。変換コンポーネントでビルトイン関数 CAST を使用して、サポートされていないデータ型を変換するか、射影文を使用してシンク テーブルからサポートされていない型のデータを削除できます。詳細については、「データ投入開発リファレンス」をご参照ください。
CDC のデータ型 | StarRocks のデータ型 | 備考 |
TINYINT | TINYINT | 該当なし。 |
SMALLINT | SMALLINT | |
INT | INT | |
BIGINT | BIGINT | |
FLOAT | FLOAT | |
DOUBLE | DOUBLE | |
BOOLEAN | BOOLEAN | |
DATE | DATE | |
TIMESTAMP | DATETIME | |
TIMESTAMP_LTZ | DATETIME | |
DECIMAL(p, s) | DECIMAL(p, s) | StarRocks はプライマリキーのデータ型として DECIMAL をサポートしていません。そのため、アップストリームデータテーブルの DECIMAL データ型の列がプライマリキーとして使用されている場合、StarRocks に同期されるテーブルスキーマのプライマリキーのデータ型は DECIMAL から VARCHAR に自動的に変更されます。 |
CHAR(n) (n ≤ 85) | CHAR(n × 3) | CDC の CHAR 型列の長さは、格納できる文字数を指定します。ただし、StarRocks の CHAR 型列の長さは、格納できる UTF-8 でエンコードされたバイト数を指定します。ほとんどの場合、UTF-8 でエンコードされた中国語の文字の長さは 3 バイトを超えることはできません。したがって、CDC の CHAR 型列が StarRocks の CHAR 型列にマッピングされた後、列の長さはマッピング前の長さの 3 倍になります。 説明 StarRocks の CHAR 型列の長さは 255 バイトを超えることはできません。したがって、長さが 85 文字を超えない CDC CHAR 型列のみを StarRocks の CHAR 型列にマッピングできます。 |
CHAR(n) (n > 85) | VARCHAR(n × 3) | CDC の CHAR 型列の長さは、格納できる文字数を指定します。ただし、StarRocks の VARCHAR 型列の長さは、格納できる UTF-8 でエンコードされたバイト数を指定します。ほとんどの場合、UTF-8 でエンコードされた中国語の文字の長さは 3 バイトを超えることはできません。したがって、CDC の CHAR 型列が StarRocks の VARCHAR 型列にマッピングされた後、列の長さはマッピング前の長さの 3 倍になります。 説明 StarRocks の CHAR 型列の長さは 255 バイトを超えることはできません。したがって、長さが 85 文字を超える CDC CHAR 型列は、StarRocks の VARCHAR 型列にマッピングされます。 |
VARCHAR(n) | VARCHAR(n × 3) | CDC の VARCHAR 型列の長さは、格納できる文字数を指定します。ただし、StarRocks の VARCHAR 型列の長さは、格納できる UTF-8 でエンコードされたバイト数を指定します。ほとんどの場合、UTF-8 でエンコードされた中国語の文字の長さは 3 バイトを超えることはできません。したがって、CDC の VARCHAR 型列が StarRocks の VARCHAR 型列にマッピングされた後、列の長さはマッピング前の長さの 3 倍になります。 |
BINARY(n) | BINARY(n+2) | エラーを回避するために、2 つの パディング バイトが追加されます。 |
VARBINARY(n) | VARBINARY(n+1) | エラーを回避するために、1 つの パディング バイトが追加されます。 |