さまざまなビジネスシナリオの要件をより適切に満たすために、StarRocks は複数のデータモデルをサポートしています。StarRocks に保存されるデータは、特定のモデルに基づいて編成する必要があります。このトピックでは、さまざまなインポート方法の基本的な概念、原則、システム構成、およびシナリオについて説明します。ベストプラクティスと FAQ もこのトピックで提供されます。
背景情報
データインポート機能を使用すると、対応するモデルに基づいて生データをクレンジングおよび変換し、後続のデータクエリと使用のために StarRocks にインポートできます。 StarRocks は複数のインポート方法をサポートしています。インポートするデータ量またはインポート頻度に基づいて、ビジネス要件を満たすインポート方法を使用できます。
次の図は、さまざまなデータソースと、StarRocks でサポートされている対応するインポート方法を示しています。
さまざまなデータソースに基づいて適切なインポート方法を使用できます。
オフラインデータのインポート:データソースが Hive または Hadoop Distributed File System(HDFS)の場合、Broker Load インポート方法を使用することをお勧めします。詳細については、「Broker Load」をご参照ください。多数のデータテーブルをインポートする必要がある場合は、パフォーマンスが Broker Load インポート方法よりも劣る Hive 外部テーブルを使用できます。ただし、この場合はデータ移行は必要ありません。単一のテーブルに大量のデータが含まれている場合、またはテーブルをグローバルデータディクショナリとして使用するために正確な重複排除が必要な場合は、Spark Load インポート方法を使用できます。詳細については、「Spark Load」をご参照ください。
リアルタイムでのデータのインポート:データベースのログデータとバイナリログを Kafka に同期した後、Routine Load インポート方法を使用してデータを StarRocks にインポートすることをお勧めします。詳細については、「Routine Load」をご参照ください。複数テーブルの結合と抽出、変換、ロード(ETL)が必要な場合は、Flink コネクタを使用して事前にデータを処理し、Stream Load インポート方法を使用して StarRocks にデータを書き込むことができます。詳細については、「Flink connector」および「Stream Load」をご参照ください。
プログラムを介した StarRocks へのデータのインポート:Stream Load インポート方法を使用することをお勧めします。詳細については、「Stream Load」をご参照ください。Stream Load のトピックにある Java または Python のデモを参照できます。
テキストファイルのインポート:Stream Load インポート方法を使用することをお勧めします。詳細については、「Stream Load」をご参照ください。
MySQL からのデータのインポート:MySQL 外部テーブルを使用し、
insert into new_table select * from external_tableステートメントを実行してデータをインポートすることをお勧めします。StarRocks 内のデータのインポート:INSERT INTO インポート方法を選択し、外部スケジューラを使用して単純な ETL 処理を実装することをお勧めします。詳細については、「INSERT INTO」をご参照ください。
このトピックの画像と一部の情報は、オープンソース StarRocks のデータロードの概要からのものです。
使用上の注意
StarRocks にデータをインポートするために、プログラム接続が頻繁に確立されます。StarRocks にデータをインポートする際は、次の項目に注意してください。
適切なインポート方法を選択します。データ量、インポート頻度、またはデータソースの場所に基づいて、インポート方法を選択できます。
たとえば、生データが HDFS に保存されている場合は、Broker Load メソッドを使用してデータをインポートできます。
インポート方法のプロトコルを決定します。Broker Load インポート方法を使用する場合、MySQL プロトコルを使用して、外部システムが定期的にインポートジョブを送信および表示できるようにする必要があります。
データインポートのモードを決定します。データインポートは、同期モードと非同期モードに分類されます。非同期モードを使用する場合、外部システムはコマンドを実行してデータインポートを表示し、コマンドの結果に基づいてインポートが成功したかどうかを判断する必要があります。
ラベル生成ポリシーを設定します。ラベル生成ポリシーは、各ジョブでインポートされるデータが一意で静的である必要があるという原則を満たしている必要があります。
Exactly-Once 配信を確保します。外部システムはデータインポートの少なくとも 1 回の配信を保証し、StarRocks のラベルメカニズムはデータインポートの最大 1 回の配信を保証します。このようにして、データインポート全体で Exactly-Once 配信を実現できます。
用語
用語 | 説明 |
インポートジョブ | 指定されたデータソースからデータを読み取り、データをクレンジングおよび変換してから、StarRocks にインポートします。データがインポートされると、データをクエリできます。 |
ラベル | インポートジョブを識別します。すべてのインポートジョブにはラベルがあります。 ラベルはユーザーが指定することも、StarRocks によって自動的に生成することもできます。ラベルはデータベース内で一意です。各ラベルは、1 つの正常なインポートジョブにのみ使用できます。インポートジョブが完了した後、このインポートジョブのラベルを再利用して別のインポートジョブを送信することはできません。失敗したインポートジョブのラベルのみを再利用できます。このメカニズムは、特定のラベルに関連付けられたデータが 1 回だけインポートされるようにするのに役立ちます。このようにして、最大 1 回の配信セマンティクスが実装されます。 |
原子性 | StarRocks によって提供されるすべてのロード方法は、原子性を保証できます。原子性とは、ジョブ内の適格データがすべて正常にインポートされるか、適格データがまったく正常にインポートされないかのいずれかであることを意味します。適格データの一部がロードされ、他のデータがロードされないということはありません。適格データには、データ型変換エラーなどの品質の問題により除外されたデータは含まれません。 |
MySQL および HTTP プロトコル | StarRocks は、インポートジョブの送信に使用できる 2 つの通信プロトコル(MySQL と HTTP)をサポートしています。 |
Broker Load | デプロイされたブローカーを使用して HDFS などの外部データソースからデータを読み取り、StarRocks にインポートします。ブローカープロセスは、その計算リソースを使用してデータを前処理およびインポートします。 |
Spark Load | Spark などの外部リソースを使用してデータを再処理し、中間ファイルを生成します。StarRocks は中間ファイルを読み取り、インポートします。Spark Load は非同期インポート方法です。MySQL プロトコルを使用してインポートジョブを作成し、特定のコマンドを実行してインポート結果を確認する必要があります。 |
FE | フロントエンド(FE)ノードは、StarRocks のメタデータおよびスケジューリングノードです。データインポートプロセスでは、FE ノードを使用してインポート実行プランを生成し、インポートジョブをスケジュールします。 |
BE | バックエンド(BE)ノードは、StarRocks の計算およびストレージノードです。 BE ノードは、データの ETL 操作を実行し、データを保存するために使用されます。 |
タブレット | StarRocks テーブルの論理シャード。テーブルは、パーティションルールとバケットルールに基づいて複数のタブレットに分割できます。詳細については、「データ分散」をご参照ください。 |
仕組み
次の図は、インポートジョブの実行方法を示しています。
次の表は、インポートジョブの 5 つのステージについて説明しています。
ステージ | 説明 |
PENDING | オプション。ジョブが送信され、FE ノードによるスケジュールを待機しています。 このステージは、Broker Load または Spark Load インポート方法を使用するジョブに含まれています。 |
ETL | オプション。データは前処理されます。これには、クレンジング、パーティション分割、ソート、および集計が含まれます。 Spark Load にはこの手順が含まれています。彼は外部計算リソース Spark を使用して ETL を完了します。 |
LOADING | データは最初にクレンジングおよび変換され、処理のために BE ノードに送信されます。すべてのデータがインポートされると、データはキューに入れられ、有効になるのを待機します。このとき、ジョブのステータスは LOADING のままです。 |
FINISHED | すべてのデータが有効になると、ジョブのステータスは FINISHED になります。このとき、データをクエリできます。FINISHED は最終的なジョブ状態です。 |
CANCELLED | ジョブのステータスが FINISHED になる前に、いつでもジョブをキャンセルできます。StarRocks は、インポートエラーが発生した場合にもジョブを自動的にキャンセルできます。CANCELLED も最終的なジョブ状態です。 |
次の表は、サポートされているデータ型について説明しています。
型 | 説明 |
整数 | TINYINT、SMALLINT、INT、BIGINT、および LARGEINT。例:1、1000、1234。 |
浮動小数点 | FLOAT、DOUBLE、および DECIMAL。例:1.1、0.23、0.356。 |
日付 | DATE および DATETIME。例:2017-10-03 および 2017-06-13 12:34:03。 |
文字列 | CHAR および VARCHAR。例:"I am a student" および "a"。 |
インポート方法
さまざまなデータインポート要件を満たすために、StarRocks は HDFS、Kafka、ローカルファイルなどのさまざまなデータソースから、またはさまざまな方法でデータをインポートするための 5 つの方法を提供します。StarRocks は同期および非同期インポートモードをサポートしています。
すべてのインポート方法は CSV データ形式をサポートしています。Broker Load は Parquet および ORC データ形式もサポートしています。
データインポート方法
インポート方法 | 説明 | インポートモード |
Broker Load | ブローカープロセスを使用して外部ソースからデータを読み取り、MySQL プロトコルを使用して StarRocks にデータをインポートするジョブを作成します。送信されたジョブは非同期で実行されます。 Broker Load インポート方法は、インポートするデータがブローカープロセスがアクセスできるストレージシステム(HDFS など)にある場合に適用できます。インポートデータ量は数十 GB から数百 GB の範囲です。詳細については、「Broker Load」をご参照ください。 | 非同期 |
Spark Load | 外部 Spark リソースを使用してインポートされたデータを前処理します。これにより、大量のデータをインポートする際の StarRocks のパフォーマンスが向上し、StarRocks クラスタの計算リソースが節約されます。Spark Load は非同期インポート方法です。MySQL プロトコルを使用してインポートジョブを作成し、 Spark Load は、大量のデータが初めて StarRocks に移行され、データが Spark クラスタがアクセスできるストレージシステム(HDFS など)にある場合に適しています。テラバイト単位のデータをインポートできます。詳細については、「Spark Load」をご参照ください。 | 非同期 |
Stream Load | 同期インポート方法。HTTP リクエストを送信してローカルファイルまたはデータストリームを StarRocks にインポートし、システムがインポート結果ステータスを返すのを待機できます。戻り値に基づいて、インポートが成功したかどうかを判断できます。 Stream Load は、ローカルファイルのインポート、またはプログラムを使用したデータストリームからのデータのインポートに適用できます。詳細については、「Stream Load」をご参照ください。 | 同期 |
Routine Load | 指定されたデータソースからデータを自動的にインポートします。MySQL プロトコルを使用してルーチンインポートジョブを送信できます。次に、常駐スレッドが生成され、Kafka などのデータソースから継続的にデータを読み取り、StarRocks にインポートします。詳細については、「Routine Load」をご参照ください。 | 非同期 |
INSERT INTO | このインポート方法は、MySQL の INSERT ステートメントと同様の方法で使用されます。StarRocks では、 | 同期 |
インポートモード
外部プログラムを使用してデータをインポートする場合は、インポート方法を決定する前に、ビジネス要件に最適なインポートモードを選択する必要があります。
同期モード
同期モードでは、外部システムがインポートジョブを作成した後、StarRocks はジョブを同期的に実行します。インポートジョブが完了すると、StarRocks はインポート結果を返します。外部システムは、戻り値に基づいて、インポートが成功したかどうかを判断できます。
手順:
外部システムがインポートジョブを作成します。
StarRocks はインポート結果を返します。
外部システムはインポート結果を判断します。インポートジョブが失敗した場合、別のインポートジョブを作成できます。
非同期モード
非同期モードでは、外部システムがインポートジョブを作成した後、StarRocks はジョブが作成されたことを示す結果を返します。これは、データがインポートされたことを意味するわけではありません。インポートジョブは非同期で実行されます。ジョブが作成されると、外部システムは特定のコマンドを実行してインポートジョブのステータスをポーリングします。インポートジョブの作成に失敗した場合、外部システムは失敗情報に基づいて別のインポートジョブを作成するかどうかを判断できます。
手順:
外部システムがインポートジョブを作成します。
StarRocks はインポートジョブの作成結果を返します。
外部システムは、戻り値に基づいて次の手順に進むかどうかを判断します。インポートジョブが作成された場合は、手順 4 に進みます。インポートジョブの作成に失敗した場合は、手順 1 に戻り、インポートジョブを再度作成します。
外部システムは、ステータスが FINISHED または CANCELLED に変わるまで、インポートジョブのステータスをポーリングします。
シナリオ
シナリオ | 説明 |
HDFS からのデータのインポート | インポートするデータが HDFS に保存されており、データ量が数十 GB から数百 GB の範囲の場合は、Broker Load メソッドを使用して StarRocks にデータをインポートできます。この場合、HDFS データソースは、デプロイされたブローカープロセスがアクセスできる必要があります。インポートジョブは非同期で実行されます。 インポートするデータが HDSF に保存されており、テラバイト単位のデータをインポートする必要がある場合は、Spark Load メソッドを使用して StarRocks にデータをインポートできます。この場合、HDFS データソースは、デプロイされた Spark プロセスがアクセスできる必要があります。インポートジョブは非同期で実行されます。 Broker Load または Spark Load メソッドは、ブローカーまたは Spark プロセスがデータソースからデータを読み取ることができる場合にのみ、他の外部データソースからのデータのインポートをサポートします。 |
ローカルファイルのインポート | データがローカルファイルに保存されており、データ量が 10 GB 未満の場合は、Stream Load メソッドを使用してデータを StarRocks にすばやくインポートできます。インポートジョブは HTTP プロトコルを使用して作成され、同期的に実行されます。HTTP リクエストの戻り値に基づいて、インポートが成功したかどうかを判断できます。 |
Kafka からのデータのインポート | Kafka などのストリーミングデータソースから StarRocks にデータをリアルタイムでインポートする場合は、Routine Load メソッドを使用できます。MySQL プロトコルを使用してルーチンインポートジョブを作成できます。StarRocks は Kafka から継続的にデータを読み取り、インポートします。 |
INSERT INTO メソッドを使用したデータのインポート |
|
メモリ制限
パラメータを設定して、単一インポートジョブのメモリ使用量を制限できます。これにより、インポートジョブが過剰なメモリを占有してメモリ不足(OOM)エラーが発生するのを防ぎます。さまざまなインポート方法を採用するジョブのメモリ使用量を制限するために使用される方法はさまざまです。詳細については、各インポート方法の対応するトピックを参照してください。
ほとんどの場合、インポートジョブは複数の BE ノードで実行されます。クラスタ全体ではなく、単一 BE ノードでのインポートジョブのメモリ使用量を制限するパラメータを設定できます。さらに、各 BE ノードでインポートジョブが使用できる最大メモリサイズを指定できます。詳細については、このトピックの一般的なシステム構成セクションを参照してください。システム構成は、BE ノードで実行されるすべてのインポートジョブのメモリ使用量の全体的な上限を設定します。
メモリ使用量が上限に達するとデータが頻繁にディスクに書き込まれるため、メモリ使用量の下限がインポート効率に影響を与える可能性があります。ただし、メモリ使用量の上限が過度に高いと、インポートの同時実行性が高い場合に OOM エラーが発生する可能性があります。したがって、ビジネス要件に基づいてメモリ関連パラメータを適切な値に設定する必要があります。
一般的なシステム構成
FE ノードの構成
次の表は、FE ノードのシステム構成パラメータについて説明しています。FE ノードの構成ファイル fe.conf でパラメータを変更できます。
パラメータ | 説明 |
max_load_timeout_second | 最大および最小インポートタイムアウト期間。単位:秒。デフォルトでは、最大タイムアウト期間は 3 日、最小タイムアウト期間は 1 秒です。指定するインポートタイムアウト期間はこの範囲内である必要があります。このパラメータは、すべてのタイプのインポートジョブに有効です。 |
min_load_timeout_second | |
desired_max_waiting_jobs | 待機キューが収容できるインポートジョブの最大数。デフォルト値:100。 たとえば、FE ノードの PENDING 状態のインポートジョブの数がこのパラメータの値に達すると、新しいインポートリクエストは拒否されます。PENDING 状態は、インポートジョブが実行されるのを待機していることを示します。このパラメータは、非同期インポートジョブにのみ有効です。PENDING 状態の非同期インポートジョブの数が上限に達すると、後続のインポートジョブの作成リクエストは拒否されます。 |
max_running_txn_num_per_db | 各データベースで許可される進行中のインポートジョブの最大数。デフォルト値:100。 データベースで実行されるインポートジョブの数が指定した最大数に達すると、後続のインポートジョブは実行されません。この状況で同期インポートジョブが送信されると、ジョブは拒否されます。非同期インポートジョブが送信されると、ジョブはキューで待機します。 |
label_keep_max_second | インポートジョブの履歴レコードの保持期間。 StarRocks は、完了し、FINISHED または CANCELLED 状態にあるインポートジョブのレコードを一定期間保持します。このパラメータを使用して保持期間を設定できます。デフォルトの保持期間は 3 日です。このパラメータは、すべてのタイプのインポートジョブに有効です。 |
BE ノードの構成
次の表は、BE ノードのシステム構成パラメータについて説明しています。BE ノードの構成ファイル be.conf でパラメータを変更できます。
パラメータ | 説明 |
push_write_mbytes_per_sec | BE ノードのタブレットあたりの最大書き込み速度。デフォルト値は 10 で、書き込み速度が 10 MB/s であることを示します。 ほとんどの場合、最大書き込み速度はスキーマと使用されるシステムに基づいて 10 MB/s から 30 MB/s の範囲です。このパラメータの値を変更して、データインポート速度を制御できます。 |
write_buffer_size | 最大メモリブロックサイズ。インポートされたデータは最初に BE ノードのメモリブロックに書き込まれます。インポートされたデータ量が指定した最大メモリブロックサイズに達すると、データはディスクに書き込まれます。デフォルトサイズは 100 MB です。 最大メモリブロックサイズが非常に小さい場合、BE ノードに多数の小さなファイルが生成される可能性があります。最大メモリブロックサイズを増やすと、生成されるファイルの数を減らすことができます。最大メモリブロックサイズが過度に大きい場合、リモートプロシージャコール(RPC)がタイムアウトする可能性があります。詳細については、tablet_writer_rpc_timeout_sec パラメータの説明を参照してください。 |
tablet_writer_rpc_timeout_sec | インポートプロセス中にバッチデータ(1024 行)を送信するための RPC タイムアウト期間。デフォルト値:600。単位:秒。 RPC には、複数のタブレットのメモリブロック内のデータをディスクに書き込む操作が含まれる場合があります。この場合、ディスク書き込み操作が原因で RPC タイムアウトが発生する可能性があります。RPC タイムアウト期間を調整して、send batch fail エラーなどのタイムアウトエラーを減らすことができます。write_buffer_size パラメータを高い値に設定する場合は、tablet_writer_rpc_timeout_sec パラメータの値も増やす必要があります。 |
streaming_load_rpc_max_alive_time_sec | 各 Writer スレッドの待機タイムアウト期間。データインポートプロセス中に、StarRocks は Writer スレッドを起動して、各タブレットからデータを受信し、各タブレットにデータを書き込みます。デフォルト値:600。単位:秒。 Writer プロセスが指定した待機タイムアウト期間内にデータを受信しない場合、StarRocks は Writer スレッドを自動的に破棄します。システムが低速でデータを処理する場合、Writer スレッドは長期間にわたって次のバッチのデータを受信しない可能性があり、そのため |
load_process_max_memory_limit_percent | 各 BE ノードのすべてのインポートジョブに使用できるメモリの最大量と最大パーセンテージ。StarRocks は、2 つのパラメータの値のうち小さい方のメモリ消費量を、許可される最終的なメモリ消費量として識別します。
|
load_process_max_memory_limit_bytes |