すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:StarRocks コネクタ

最終更新日:Jun 11, 2025

このトピックでは、StarRocks コネクタの使用方法について説明します。

背景情報

StarRocks は新世代の Massively Parallel Processing(MPP)データウェアハウスであり、あらゆるシナリオで非常に高速なクエリパフォーマンスを提供します。 StarRocks は、非常に高速で統合された分析エクスペリエンスの提供に特化しています。 StarRocks には次の利点があります。

  • MySQL プロトコルと互換性があります。 MySQL クライアントまたは一般的なビジネスインテリジェンス(BI)ツールを使用して StarRocks にアクセスし、データ分析を行うことができます。

  • 分散アーキテクチャを使用しており、以下の機能を提供します。

    • テーブルを水平方向に分割し、複数のレプリカにデータを格納します。

    • 10 PB のデータの分析をサポートするために、柔軟な方法でクラスタをスケーリングします。

    • MPP アーキテクチャをサポートして、データ計算を高速化します。

    • 複数のレプリカをサポートして、フォールトトレランスを確保します。

Flink コネクタはデータをキャッシュし、Stream Load を使用してデータをバッチでインポートして結果テーブルを生成し、データをバッチで読み取ってソーステーブルを生成します。 次の表に、StarRocks コネクタでサポートされている機能を示します。

項目

説明

テーブルタイプ

ソーステーブル、ディメンションテーブル、シンクテーブル、およびデータインジェスチョンシンク

実行モード

ストリーミングモードとバッチモード

データ形式

CSV

メトリック

該当なし

API タイプ

DataStream API、SQL API、およびデータインジェスチョンの YAML API

結果テーブルのデータの更新または削除

サポートされています

前提条件

StarRocks クラスタが作成されています。 StarRocks クラスタは、EMR の StarRocks クラスタ、または Elastic Compute Service(ECS)インスタンスでホストされているセルフマネージド StarRocks クラスタにすることができます。

制限事項

  • StarRocks コネクタは、少なくとも 1 回のセマンティクスと 1 回限りのセマンティクスのみをサポートします。

  • Ververica Runtime(VVR)11.1 以降のみが、StarRocks ディメンションテーブルとのルックアップ結合をサポートします。

SQL 文

機能

E-MapReduce(EMR)の StarRocks は、CREATE TABLE AS(CTAS)文と CREATE DATABASE AS(CDAS)文をサポートしています。 CREATE TABLE AS 文を使用すると、単一テーブルのスキーマとデータを同期できます。 CREATE DATABASE AS 文を使用すると、データベース全体のデータ、または同じデータベース内の複数のテーブルのスキーマとデータを同期できます。 詳細については、「Realtime Compute for Apache Flink の CREATE TABLE AS 文と CREATE DATABASE AS 文を使用して、ApsaraDB RDS for MySQL インスタンスから StarRocks クラスタにデータを同期する」をご参照ください。

構文

CREATE TABLE USER_RESULT(
 name VARCHAR,
 score BIGINT
 ) WITH (
 'connector' = 'starrocks',
 'jdbc-url'='jdbc:mysql://fe1_ip:query_port,fe2_ip:query_port,fe3_ip:query_port?xxxxx',
 'load-url'='fe1_ip:http_port;fe2_ip:http_port;fe3_ip:http_port',
 'database-name' = 'xxx',
 'table-name' = 'xxx',
 'username' = 'xxx',
 'password' = 'xxx'
 );

WITH 句のコネクタオプション

カテゴリ

オプション

説明

データの型

必須

デフォルト値

備考

全般

connector

テーブルタイプ。

文字列

はい

デフォルト値なし

starrocks に設定します。

jdbc-url

データベースへの接続に使用する Java Database Connectivity(JDBC)URL。

文字列

はい

デフォルト値なし

指定されたフロントエンド(FE)の IP アドレスと JDBC ポートが使用されます。 このオプションの値は、jdbc:mysql://ip:port 形式です。

database-name

StarRocks データベースの名前。

文字列

はい

デフォルト値なし

該当なし

table-name

StarRocks テーブルの名前。

文字列

はい

デフォルト値なし

該当なし

username

StarRocks データベースへの接続に使用するユーザー名。

文字列

はい

デフォルト値なし

該当なし

password

StarRocks データベースへの接続に使用するパスワード。

文字列

はい

デフォルト値なし

該当なし

starrocks.create.table.properties

StarRocks テーブルのプロパティ。

文字列

いいえ

デフォルト値なし

エンジンやレプリカ数など、StarRocks テーブルの初期プロパティを指定します。 例: 'starrocks.create.table.properties' = 'buckets 8','starrocks.create.table.properties' = 'replication_num=1'

ソース固有

scan-url

データスキャンの URL。

文字列

いいえ

デフォルト値なし

指定された FE の IP アドレスと HTTP ポートが使用されます。 このオプションの値は、fe_ip:http_port;fe_ip:http_port 形式です。

説明

複数の IP アドレスとポート番号のペアをセミコロン(;)で区切ります。

scan.connect.timeout-ms

Realtime Compute for Apache Flink の StarRocks コネクタが StarRocks データベースに接続するためのタイムアウト期間。

接続時間がこのオプションの値を超えると、エラーが返されます。

文字列

いいえ

1000

単位: ミリ秒。

scan.params.keep-alive-min

クエリタスクのキープアライブ期間。

文字列

いいえ

10

該当なし

scan.params.query-timeout-s

クエリタスクのタイムアウト期間。

このオプションで指定された期間内にクエリ結果が返されない場合、クエリタスクは停止されます。

文字列

いいえ

600

単位: 秒。

scan.params.mem-limit-byte

バックエンド(BE)ノードでの単一クエリの最大メモリ。

文字列

いいえ

1073741824 (1 GB)

単位: バイト。

scan.max-retries

クエリが失敗した場合の最大再試行回数。

再試行回数がこのオプションの値に達すると、エラーが返されます。

文字列

いいえ

1

該当なし

シンク固有

load-url

データインポートの URL。

文字列

はい

デフォルト値なし

指定された FE の IP アドレスと HTTP ポートが使用されます。 このオプションの値は、fe_ip:http_port;fe_ip:http_port 形式です。

説明

複数の IP アドレスとポート番号のペアをセミコロン(;)で区切ります。

sink.semantic

データ書き込みのセマンティクス。

文字列

いいえ

at-least-once

有効な値:

  • at-least-once: 少なくとも 1 回のセマンティクスが使用されます。 これはデフォルト値です。

  • exactly-once: 1 回限りのセマンティクスが使用されます。

sink.buffer-flush.max-bytes

バッファに許容されるデータの最大量。

文字列

いいえ

94371840 (90 MB)

有効な値: 64 MB から 10 GB。

sink.buffer-flush.max-rows

バッファに許容される行の最大数。

文字列

いいえ

500000

有効な値: 64000 から 5000000。

sink.buffer-flush.interval-ms

バッファがリフレッシュされる間隔。

文字列

いいえ

300000

有効な値: 1000 から 3600000。 単位: ミリ秒。

sink.max-retries

テーブルへのデータ書き込みの最大再試行回数。

文字列

いいえ

3

有効な値: 0 から 10。

sink.connect.timeout-ms

StarRocks データベースへの接続のタイムアウト期間。

文字列

いいえ

1000

有効な値: 100 から 60000。 単位: ミリ秒。

sink.properties.*

シンクテーブルのプロパティ。

文字列

いいえ

デフォルト値なし

Stream Load のインポートプロパティ。 たとえば、sink.properties.format プロパティは、Stream Load モードでインポートされるデータの形式を指定します。 データ形式は CSV にすることができます。 オプションの詳細については、「Stream Load」をご参照ください。

ディメンションテーブル固有

lookup.cache.enabled

ディメンションテーブルをキャッシュするかどうかを指定します。

ブール値

いいえ

true

有効な値:

  • true: ディメンションテーブルデータのキャッシュを有効にし、タイムアウトまで後続のクエリでキャッシュから直接データを取得します。 これにより、I/O オーバーヘッドが削減されます。

  • false: ディメンションテーブルのキャッシュを無効にし、データソースから直接データを取得します。

重要
  • このオプションは、VVR 11.1 以降でサポートされています。

  • 次のシナリオでは、このオプションを false に設定します。

    • ディメンションテーブルが頻繁に更新され、最新のデータが必要な場合。

    • ディメンションテーブルに大量のデータがある場合。

データ型マッピング

StarRocks のデータ型

Realtime Compute for Apache Flink のデータ型

NULL

NULL

BOOLEAN

BOOLEAN

TINYINT

TINYINT

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

BIGINT UNSIGNED

説明

VVR 8.0.10 以降のみがこのデータ型マッピングをサポートしています。

DECIMAL(20,0)

LARGEINT

DECIMAL(20,0)

FLOAT

FLOAT

DOUBLE

DOUBLE

DATE

DATE

DATETIME

TIMESTAMP

DECIMAL

DECIMAL

DECIMALV2

DECIMAL

DECIMAL32

DECIMAL

DECIMAL64

DECIMAL

DECIMAL128

DECIMAL

CHAR(m)

説明
  • VVR 8.0.10 を使用する Realtime Compute for Apache Flink のみ、マッピング前の CHAR 型カラムの長さを 3 倍に自動的に拡張できます (m=n × 3、n ≤ 85)。 これは、MySQL と StarRocks のエンコーディングの違いに適応しています。

  • VVR 8.0.11 以降を使用する Realtime Compute for Apache Flink のみ、マッピング前の CHAR 型カラムの長さを 4 倍に自動的に拡張できます (m=n × 4、n ≤ 63)。 これは、MySQL と StarRocks のエンコーディングの違いに適応しています。

  • StarRocks の CHAR 型カラムの長さは 255 バイトを超えることはできません。 したがって、Realtime Compute for Apache Flink で自動的に長さを拡張した後に長さが 255 文字を超えない CHAR 型カラムのみを、StarRocks の CHAR 型カラムにマッピングできます。

CHAR(n)

VARCHAR(m)

説明
  • VVR 8.0.10 を使用する Realtime Compute for Apache Flink のみ、マッピング前の VARCHAR 型カラムの長さを 3 倍に自動的に拡張できます (m=n × 4、n > 85)。 これは、MySQL と StarRocks のエンコーディングの違いに適応しています。

  • VVR 8.0.11 以降を使用する Realtime Compute for Apache Flink のみ、マッピング前の VARCHAR 型カラムの長さを 4 倍に自動的に拡張できます (m=n × 4、n > 63)。 これは、MySQL と StarRocks のエンコーディングの違いに適応しています。

  • StarRocks の CHAR 型カラムの長さは 255 バイトを超えることはできません。 したがって、Realtime Compute for Apache Flink で自動的に長さを拡張した後に長さが 255 文字を超える CHAR 型カラムのみを、StarRocks の VARCHAR 型カラムにマッピングできます。

CHAR(n)

VARCHAR

STRING

VARBINARY

説明

VVR 8.0.10 以降のみがこのデータ型マッピングをサポートしています。

VARBINARY

サンプルコード

CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_source` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
) WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'scan-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxxx'
);
CREATE TEMPORARY TABLE IF NOT EXISTS `runoob_tbl_sink` (
  `runoob_id` BIGINT NOT NULL,
  `runoob_title` STRING NOT NULL,
  `runoob_author` STRING NOT NULL,
  `submission_date` DATE NULL
  PRIMARY KEY(`runoob_id`)
  NOT ENFORCED
) WITH (
  'jdbc-url' = 'jdbc:mysql://ip:9030',
  'connector' = 'starrocks',
  'load-url' = 'ip:18030',
  'database-name' = 'db_name',
  'table-name' = 'table_name',
  'password' = 'xxxxxxx',
  'username' = 'xxxx',
  'sink.buffer-flush.interval-ms' = '5000'
);

INSERT INTO runoob_tbl_sink SELECT * FROM runoob_tbl_source;

データインジェスチョン

StarRocks パイプラインコネクタを使用すると、アップストリームデータソースからのデータレコードとテーブルスキーマの変更を、外部の StarRocks データベース に簡単に書き込むことができます。 オープンソースの StarRocks と フルマネージド EMR Serverless StarRocks の両方がサポートされています。

機能

  • データベースとテーブルの自動作成

    アップストリームデータベースとテーブルがダウンストリーム StarRocks インスタンスに存在しない場合、データベースとテーブルが自動的に作成されます。 table.create.properties.* オプションを構成して、テーブルの自動作成のオプションを指定できます。

  • テーブルスキーマ変更の同期

    StarRocks コネクタは、CreateTableEvent、AddColumnEvent、DropColumnEvent イベントをダウンストリームデータベースに自動的に適用します。

  • VVR 11.1 以降では、互換性のあるカラム型の変換がサポートされています。 詳細については、StarRocks ドキュメントの ALTER TABLE を参照してください。

使用上の注意

  • StarRocks コネクタは、少なくとも 1 回のセマンティクスのみをサポートし、プライマリキーテーブルを使用して書き込み操作のべき等性を確保します。

  • データを同期するテーブルには、プライマリキーが含まれている必要があります。 プライマリキーが含まれていないテーブルの場合、テーブルのデータをダウンストリームデータベースに書き込む前に、TRANSFORM 文ブロックで各テーブルのプライマリキーを指定する必要があります。 サンプルコード:

    transform:
      - source-table: ...
        primary-keys: id, ...
  • 自動的に作成されたテーブルのバケットキーはプライマリキーと同じである必要があり、テーブルにはパーティションキーを含めることができません。

  • テーブルスキーマの変更の同期中、新しいカラムは既存のカラムの末尾にのみ追加できます。 デフォルトでは、スキーマ進化には Lenient モードが使用されます。 このモードでは、テーブルの他の位置に挿入されたカラムは、既存のカラムの末尾に自動的に移動されます。

  • 2.5.7 より前の StarRocks バージョンを使用している場合は、table.create.num-buckets オプションを使用してバケット数を明示的に指定する必要があります。 StarRocks 2.5.7 以降を使用している場合は、バケット数は自動的に指定されます。 詳細については、「データ分散」を参照してください。

  • StarRocks 3.2 以降を使用している場合は、table.create.properties.fast_schema_evolution オプションを true に設定して、テーブルスキーマの変更を高速化することをお勧めします。

構文

source:
  ...

sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://127.0.0.1:9030
  load-url: 127.0.0.1:8030
  username: root
  password: pass

コネクタオプション

オプション

説明

データ型

必須

デフォルト値

備考

type

コネクタ名。

String

はい

デフォルト値なし

値を starrocks に設定します。

name

シンクの表示名。

String

いいえ

デフォルト値なし

該当なし

jdbc-url

データベースへの接続に使用する JDBC URL。

String

はい

デフォルト値なし

複数の URL を指定できます。URL はカンマ(,)で区切ります。例:jdbc:mysql://fe_host1:fe_query_port1,fe_host2:fe_query_port2,fe_host3:fe_query_port3

load-url

FE ノードへの接続に使用する HTTP URL。

String

はい

デフォルト値なし

複数の URL を指定できます。URL はセミコロン(;)で区切ります。例:fe_host1:fe_http_port1;fe_host2:fe_http_port2

username

StarRocks データベースへの接続に使用するユーザー名。

String

はい

デフォルト値なし

宛先テーブルに対する SELECT 権限と INSERT 権限をユーザーに付与する必要があります。StarRocks の GRANT コマンドを使用して、必要な権限をユーザーに付与できます。

password

StarRocks データベースへの接続に使用するパスワード。

String

はい

デフォルト値なし

該当なし

sink.semantic

データ書き込みのセマンティクス。

String

いいえ

at-least-once

有効な値:

  • at-least-once: 少なくとも 1 回のセマンティクスが使用されます。これはデフォルト値です。

  • exactly-once: 1 回限りのセマンティクスが使用されます。

sink.label-prefix

Stream Load に使用されるラベルプレフィックス。

String

いいえ

デフォルト値なし

該当なし

sink.connect.timeout-ms

HTTP 接続のタイムアウト期間。

Integer

いいえ

30000

単位:ミリ秒。有効な値:100 ~ 60000。

sink.wait-for-continue.timeout-ms

クライアントがサーバーからの 100 Continue 応答を待機するタイムアウト期間。

Integer

いいえ

30000

単位:ミリ秒。有効な値:3000 ~ 600000。

sink.buffer-flush.max-bytes

データが StarRocks データベースに書き込まれる前にメモリにキャッシュできるデータサイズ。

Long

いいえ

157286400

単位:バイト。有効な値:64 MB ~ 10 GB。

説明
  • キャッシュスペースはすべてのテーブルで共有されます。キャッシュサイズが指定された値に達すると、コネクタは複数のテーブルでフラッシュ操作を実行します。

  • スループットを向上させるには、このパラメーターの値を増やすことができます。ただし、これによりデータインポートのレイテンシが増加する可能性があります。

sink.buffer-flush.max-rows

データが StarRocks データベースに書き込まれる前に、メモリにキャッシュできるレコード数。

Long

いいえ

500000

有効な値:64000 ~ 5000000。

sink.buffer-flush.interval-ms

各テーブルの 2 回の連続するフラッシュ操作の間隔。

Long

いいえ

300000

単位:ミリ秒。

sink.max-retries

最大再試行回数。

Long

いいえ

3

有効な値:0 ~ 1000。

sink.scan-frequency.ms

フラッシュ操作を実行する必要があるかどうかを検出するための、2 回の連続するチェックの間隔。

Long

いいえ

50

単位:ミリ秒。

sink.io.thread-count

Stream Load モードでのデータインポート中のスレッド数。

Integer

いいえ

2

該当なし

sink.at-least-once.use-transaction-stream-load

データインポートに Stream Load トランザクションインターフェイス を使用するかどうかを指定します。

Boolean

いいえ

true

このオプションの設定は、サポートされているデータベースが使用されている場合にのみ有効になります。

sink.properties.*

シンクに追加で提供されるオプション。

String

いいえ

デフォルト値なし

Stream Load モードでサポートされているオプションを表示できます。

table.create.num-buckets

自動的に作成されたテーブルのバケット数。

Integer

いいえ

デフォルト値なし

  • StarRocks 2.5.7 以降:このオプションは任意です。バケット数は自動的に指定できます。詳細については、「データ分散」をご参照ください。

  • StarRocks 2.5.6 以前:このオプションは必須です。

table.create.properties.*

テーブルが自動的に作成されるときに指定される追加オプション。

String

いいえ

デフォルト値なし

たとえば、'table.create.properties.fast_schema_evolution' = 'true' 構成を追加して、高速スキーマ進化機能を有効にできます。詳細については、StarRocks ドキュメント をご参照ください。

table.schema-change.timeout

スキーマ変更操作のタイムアウト期間。

Duration

いいえ

30 min

このオプションの値は整数に設定する必要があります。単位:秒。

説明

スキーマ変更操作の期間がこのオプションで指定された値を超えると、デプロイは失敗します。

データ型マッピング

説明

StarRocks は、すべての Change Data Capture(CDC)YAML データ型をサポートしているわけではありません。サポートされていない型のデータをダウンストリーム データベースに書き込むと、ジョブは失敗します。変換コンポーネントでビルトイン関数 CAST を使用して、サポートされていないデータ型を変換するか、射影文を使用してシンク テーブルからサポートされていない型のデータを削除できます。詳細については、「データ投入開発リファレンス」をご参照ください。

CDC のデータ型

StarRocks のデータ型

備考

TINYINT

TINYINT

該当なし。

SMALLINT

SMALLINT

INT

INT

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLE

DOUBLE

BOOLEAN

BOOLEAN

DATE

DATE

TIMESTAMP

DATETIME

TIMESTAMP_LTZ

DATETIME

DECIMAL(p, s)

DECIMAL(p, s)

StarRocks はプライマリキーのデータ型として DECIMAL をサポートしていません。そのため、アップストリームデータテーブルの DECIMAL データ型の列がプライマリキーとして使用されている場合、StarRocks に同期されるテーブルスキーマのプライマリキーのデータ型は DECIMAL から VARCHAR に自動的に変更されます。

CHAR(n)

(n ≤ 85)

CHAR(n × 3)

CDC の CHAR 型列の長さは、格納できる文字数を指定します。ただし、StarRocks の CHAR 型列の長さは、格納できる UTF-8 でエンコードされたバイト数を指定します。ほとんどの場合、UTF-8 でエンコードされた中国語の文字の長さは 3 バイトを超えることはできません。したがって、CDC の CHAR 型列が StarRocks の CHAR 型列にマッピングされた後、列の長さはマッピング前の長さの 3 倍になります。

説明

StarRocks の CHAR 型列の長さは 255 バイトを超えることはできません。したがって、長さが 85 文字を超えない CDC CHAR 型列のみを StarRocks の CHAR 型列にマッピングできます。

CHAR(n)

(n > 85)

VARCHAR(n × 3)

CDC の CHAR 型列の長さは、格納できる文字数を指定します。ただし、StarRocks の VARCHAR 型列の長さは、格納できる UTF-8 でエンコードされたバイト数を指定します。ほとんどの場合、UTF-8 でエンコードされた中国語の文字の長さは 3 バイトを超えることはできません。したがって、CDC の CHAR 型列が StarRocks の VARCHAR 型列にマッピングされた後、列の長さはマッピング前の長さの 3 倍になります。

説明

StarRocks の CHAR 型列の長さは 255 バイトを超えることはできません。したがって、長さが 85 文字を超える CDC CHAR 型列は、StarRocks の VARCHAR 型列にマッピングされます。

VARCHAR(n)

VARCHAR(n × 3)

CDC の VARCHAR 型列の長さは、格納できる文字数を指定します。ただし、StarRocks の VARCHAR 型列の長さは、格納できる UTF-8 でエンコードされたバイト数を指定します。ほとんどの場合、UTF-8 でエンコードされた中国語の文字の長さは 3 バイトを超えることはできません。したがって、CDC の VARCHAR 型列が StarRocks の VARCHAR 型列にマッピングされた後、列の長さはマッピング前の長さの 3 倍になります。

BINARY(n)

BINARY(n+2)

エラーを回避するために、2 つの パディング バイトが追加されます。

VARBINARY(n)

VARBINARY(n+1)

エラーを回避するために、1 つの パディング バイトが追加されます。