すべてのプロダクト
Search
ドキュメントセンター

DataWorks:StarRocks データソース

最終更新日:Mar 07, 2026

StarRocks データソースを使用すると、StarRocks のデータの読み取りと書き込みができます。このトピックでは、DataWorks がサポートする StarRocks のデータ同期機能について説明します。

サポートされるバージョン

  • EMR Serverless StarRocks のすべてのバージョンがサポートされています。

  • E-MapReduce on ECS:StarRocks 2.1

  • StarRocks Community Edition がサポートされています。

    説明
    • DataWorks は内部ネットワーク経由でのみ StarRocks に接続します。したがって、StarRocks の Community Edition は E-MapReduce on ECS にデプロイする必要があります。

    • このデータソースで互換性の問題が発生した場合は、チケットを送信してください。

制限事項

  • MySQL から StarRocks へのデータベース全体のリアルタイム同期では、ターゲットの StarRocks テーブルはプライマリキーモデルを使用する必要があります。

  • MySQL から StarRocks へデータベース全体のリアルタイム同期を実行する場合、TRUNCATE 以外の DDL 操作はサポートされません。これらの DDL 操作を無視するか、タスクがエラーを報告するように設定することができます。

サポートされるデータ型

数値、文字列、日付のデータ型のみがサポートされています。

ネットワーク接続

EMR Serverless StarRocks

ネットワーク接続を確保するには、DataWorks リソースグループの IP アドレスを EMR Serverless StarRocks インスタンスの IP アドレス許可リストに追加する必要があります。

自己管理型 StarRocks

DataWorks リソースグループが、ご利用の StarRocks インスタンスのクエリポートFE ポート、および BE ポートにアクセスできることを確認してください。デフォルトのポートは 9030、8030、8040 です。

データソースの追加

DataWorks で同期タスクを開発する前に、「データソース管理」の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールのパラメーターの説明を参照して、各パラメーターの意味を理解することができます

ネットワーク環境に基づいて StarRocks の接続モードを選択します:

ユースケース 1:内部ネットワーク接続 (推奨)

内部ネットワーク接続は、パブリックネットワークアクセスを必要とせず、低レイテンシーと高セキュリティを提供します。

  • ユースケース:ご利用の StarRocks インスタンスと Serverless リソースグループが同じ VPC 内にある場合。

  • Alibaba Cloud インスタンスモードと接続文字列モードの両方がサポートされています

    • ApsaraDB for RDS を選択:同じ VPC 内の StarRocks インスタンスを直接選択します。システムが自動的に接続情報を取得します。

    • User-created Data Store with Public IP Addresses を選択:インスタンスの内部アドレスまたは IP アドレス、ポート、および Load URL を手動で入力します。

ユースケース 2:パブリックネットワーク接続

パブリックネットワーク経由のデータ転送にはセキュリティリスクが伴います。IP 許可リストや IP ベースの認証などのセキュリティ制御を使用してください。

  • ユースケース:リージョン間アクセスやオンプレミス環境からなど、パブリックネットワーク経由で StarRocks インスタンスにアクセスする必要がある場合。

  • 接続文字列モードのみがサポートされています。ご利用の StarRocks インスタンスでパブリックネットワークアクセスが有効になっていることを確認してください

    • User-created Data Store with Public IP Addresses を選択:インスタンスのパブリックアドレスまたは IP アドレス、ポート、および Load URL を手動で入力します。

説明

デフォルトでは、Serverless リソースグループはパブリックネットワークにアクセスできません。パブリックエンドポイントを使用して StarRocks インスタンスに接続するには、NAT Gateway と Elastic IP アドレス (EIP) をバインドされた VPC に設定して、パブリックネットワークアクセスを有効にする必要があります。また、Serverless リソースグループが、ご利用の StarRocks インスタンスのクエリポートFE ポート、および BE ポートにアクセスできることを確認する必要があります。デフォルトのポートは 9030、8030、8040 です。

Alibaba Cloud EMR StarRocks Serverless を使用している場合は、ホストアドレス/IPInternal Endpoint または Public network address に設定し、ポートを [クエリポート] に設定します。

  • FE:インスタンス詳細ページで FE 情報を取得できます。

    image.png

  • データベースEMR StarRocks Manager を使用してインスタンスに接続した後、SQL Editor または メタデータ管理 で対応するデータベースを表示できます。

    image.png

    説明

    データベースを作成するには、SQL エディターで直接 SQL コマンドを実行します。

データ同期タスクの開発

同期タスクの設定のエントリーポイントと手順については、以下の設定ガイドをご参照ください。

単一テーブルのバッチ同期

単一テーブルのリアルタイム同期

データベース全体のバッチ同期

データベース全体のリアルタイム同期

付録:コードとパラメーター

コードエディターを使用したバッチ同期タスクの設定

コードエディターを使用してバッチ同期タスクを設定する場合、統一されたスクリプト形式の要件に基づいて、スクリプト内の関連パラメーターを設定する必要があります。詳細については、「コードエディターの使用」をご参照ください。以下では、コードエディターを使用してバッチ同期タスクを設定する際に、データソースに対して設定する必要があるパラメーターについて説明します。

Reader コード例

{
    "stepType": "starrocks",
    "parameter": {
        "selectedDatabase": "didb1",
        "datasource": "starrocks_datasource",
        "column": [
            "id",
            "name"
        ],
        "where": "id>100",
        "table": "table1",
        "splitPk": "id"
    },
    "name": "Reader",
    "category": "reader"
}

Reader パラメーター

パラメーター

説明

必須

デフォルト

datasource

StarRocks データソースの名前。

はい

なし

selectedDatabase

StarRocks データベースの名前。

いいえ

StarRocks データソースを設定したときに指定したデータベース名。

column

同期対象として設定されたテーブル内の列名のコレクション。StarRocks からデータを読み取る際に SET_VAR ヒントを追加したい場合は、column の最初の列名の前にヒントを追加できます。例えば、同期する列が id で、SET_VAR(enable_spill = true) を追加したい場合、column を [ "/*+ SET_VAR(enable_spill = true)*/ id"] のように設定します。

はい

なし

where

実際のビジネスシナリオでは、当日のデータを同期するための一般的なフィルターは、where 句を gmt_create>${bizdate} のように指定することです。

  • where 句を使用して、増分ビジネスデータを効率的に同期できます。

  • where 句、または where のキーや値を指定しない場合、データ同期は完全なデータ同期として扱われます。

いいえ

なし

table

ソーステーブル。

はい

なし

splitPk

StarRocks Reader がデータを抽出する際に、splitPk パラメーターを指定して、splitPk に指定されたフィールドに基づいてデータをシャーディングできます。これにより、同時データ同期タスクが開始され、効率が向上します。テーブルのプライマリキーは通常均等に分散されているため、結果として得られるシャードでのデータホットスポットを防ぐのに役立つため、分割キーとしてテーブルのプライマリキーを使用することを推奨します。

いいえ

なし

Writer コード例

{
    "stepType": "starrocks",
    "parameter": {
        "selectedDatabase": "didb1",
        "loadProps": {
            "row_delimiter": "",
            "column_separator": ""
        },
        "datasource": "starrocks_public",
        "column": [
            "id",
            "name"
        ],
        "loadUrl": [
            "1.1.X.X:8030"
        ],
        "table": "table1",
        "preSql": [
            "truncate table table1"
        ],
        "postSql": [
        ],
        "maxBatchRows": 500000,
        "maxBatchSize": 5242880,
        "strategyOnError": "exit"
    },
    "name": "Writer",
    "category": "writer"
}

Writer パラメーター

パラメーター

説明

必須

デフォルト値

datasource

StarRocks データソースの名前。

はい

なし

selectedDatabase

StarRocks データベースの名前。

いいえ

StarRocks データソースを設定したときに指定したデータベース名。

loadProps

StarRocks StreamLoad ジョブのリクエストパラメーター。CSV 形式でデータをインポートするためのインポートパラメーターを設定できます。特別な設定が不要な場合は、このパラメーターを {} に設定します。以下のパラメーターが利用可能です:

  • column_separator:CSV インポートの列区切り文字。デフォルト値は \t です。

  • row_delimiter:CSV インポートの行区切り文字。デフォルト値:

データに \t や \n が含まれている場合は、他の文字を区切り文字として指定する必要があります。特殊文字の使用例を以下に示します:

{"column_separator":"","row_delimiter":""}

StreamLoad は JSON 形式でのデータインポートもサポートしています。format パラメーターを json に設定できます:

{
  "format": "json"
}

JSON 形式では以下のパラメーターを設定できます:

  • strip_outer_array:最も外側の配列構造を削除するかどうかを指定します。有効な値:true および false。デフォルト値:false

    実際のシナリオでは、インポートする JSON データが外側の配列構造 [] で囲まれている場合があります。この状況では、このパラメーターを true に設定することを推奨します。StarRocks は外側の角括弧 [] を削除し、括弧内の各要素を個別のデータ行としてインポートします。このパラメーターを false に設定すると、StarRocks は JSON データ全体を単一の配列として解析し、単一のデータ行としてインポートします。

    例えば、インポートする JSON データは次のとおりです:

    [{"category":1,"author":2},{"category":3,"author":4}]
    
    • このパラメーターを true に設定すると、StarRocks は {"category":1,"author":2}{"category":3,"author":4} を 2 つの別々の行として解析し、ターゲットの StarRocks テーブルにインポートします。

    • このパラメーターを false に設定すると、StarRocks は JSON 配列全体を単一のデータ行として解析し、ターゲットの StarRocks テーブルにインポートします。

  • ignore_json_size:HTTP リクエスト内の JSON 本文のサイズをチェックするかどうかを指定します。

    説明

    デフォルトでは、HTTP リクエスト内の JSON 本文のサイズは 100 MB を超えることはできません。JSON 本文のサイズが 100 MB を超えると、エラー The size of this batch exceed the max size [104857600] of json type data data [8617627793].Set ignore_json_size to skip check,although it may lead huge memory consuming. が報告されます。このエラーを防ぐために、ignore_json_size: true 設定を HTTP リクエストヘッダーに追加して、JSON 本文のサイズチェックをバイパスできます。

  • compression:StreamLoad データ転送プロセス中に使用する圧縮アルゴリズムを指定します。サポートされているアルゴリズムは GZIPBZIP2LZ4_FRAME、および ZSTD です。

  • strict_mode:厳格モードを有効にするかどうかを指定します。

    有効な値:

    • true:厳格モードを有効にします。StarRocks は不正なデータ行をフィルタリングし、正しいデータ行のみをインポートし、不正なデータの詳細を返します。

    • false:厳格モードを無効にします。StarRocks は変換に失敗したフィールドを NULL 値に変換し、NULL 値を含むエラー行を有効なデータ行とともにインポートします。

    デフォルト値:false

はい

なし

column

データを書き込む先の列。

はい

なし

loadUrl

StarRocks FrontEnd の IP と HTTP ポート (デフォルトは 8030) を入力します。複数の FrontEnd ノードがある場合は、すべてを入力し、エントリをカンマ (,) で区切ることができます。

はい

なし

table

ターゲットテーブル。

はい

なし

preSql

同期タスクが開始される前に実行する SQL 文。例えば、TRUNCATE TABLE tablename を使用して、テーブル内の既存のデータをクリアできます。

いいえ

なし

postSql

同期タスクが終了した後に実行する SQL 文。

いいえ

なし

maxBatchRows

書き込みバッチあたりの最大行数。

いいえ

500000

maxBatchSize

書き込みバッチあたりの最大データサイズ (バイト単位)。

いいえ

5242880

strategyOnError

バッチ書き込み中の例外を処理するためのポリシー。

有効な値:

  • exit:StarRocks へのデータ書き込み中に例外が発生した場合、同期タスクは失敗して終了します。

  • batchDirtyData:StarRocks へのデータ書き込み中に例外が発生した場合、現在のデータバッチはダーティデータとして記録されます。

デフォルト値:exit

いいえ

exit