すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:FAQ

最終更新日:Jan 11, 2025

このトピックでは、StarRocksへのデータインポートに関するよくある質問への回答を提供します。

「close index channel failed」または「too many tablet versions」エラーが発生した場合はどうすればよいですか?

  • 原因

    データインポート頻度が高いため、データがタイムリーにマージされません。その結果、マージされていないデータのバージョンの数が上限を超えています。

  • 解決策
    デフォルトでは、マージされていないデータの最大バージョン数は 1,000 です。次のいずれかの方法を使用してエラーを修正します。
    • 各バッチでインポートするデータ量を増やして、データインポート頻度を減らします。
    • バックエンド(BE)ノードの be.conf 構成ファイルで次のパラメータを指定された値に設定して、マージポリシーを変更します。これは、データのマージ速度を向上させるのに役立ちます。
      cumulative_compaction_num_threads_per_disk = 4
      base_compaction_num_threads_per_disk = 2
      cumulative_compaction_check_interval_seconds = 2

「Label Already Exists」エラーが発生した場合はどうすればよいですか?

  • 説明

    同じラベルを使用するインポートジョブが、StarRocksクラスタの同じデータベースで正常に実行されているか、実行中です。

  • 原因

    Stream Loadは、HTTPプロトコルを介してインポートジョブのリクエストを送信します。ほとんどの場合、さまざまな言語のHTTPクライアントは、事前に構成されたリクエスト再試行ロジックを使用します。StarRocksクラスタがリクエストを受信した後、Stream LoadモードでデータのStarRocksクラスタへのインポートが開始されます。インポート結果はHTTPクライアントにタイムリーに返されません。その結果、HTTPクライアントは同じリクエストの送信を再試行します。HTTPクライアントが2番目のリクエストを送信すると、StarRocksクラスタが最初のリクエストを処理しているため、Label Already Exists エラーが返されます。

  • 解決策
    次のいずれかの方法を使用して、異なるデータインポート方法を使用するジョブに同じラベルが割り当てられているかどうか、またはインポートジョブを繰り返し送信したかどうかを確認します。
    • インポートジョブのラベルに基づいてプライマリフロントエンド(FE)ノードのログを検索し、ラベルが2回表示されるかどうかを確認します。ラベルが2回表示される場合、HTTPクライアントはリクエストを2回送信しました。
      説明 StarRocksクラスタ内のインポートジョブのラベルは、データインポート方法に基づいて区別できません。その結果、異なるデータインポート方法を使用するジョブに同じラベルが割り当てられる場合があります。
    • SHOW LOAD WHERE LABEL = "xxx" ステートメントを実行して、FINISHED状態のインポートジョブが同じラベルを使用しているかどうかを確認します。ステートメントでは、xxx は、確認する必要があるラベルの文字列です。

    インポートするデータのサイズに基づいておおよそのインポート期間を計算し、インポートタイムアウト期間に基づいてHTTPクライアントのリクエストのタイムアウト期間を増やすことをお勧めします。これは、クライアントによってリクエストが繰り返し送信されるのを防ぐのに役立ちます。

「ETL_QUALITY_UNSATISFIED; msg:quality not good enough to cancel」エラーが発生した場合はどうすればよいですか?

SHOW LOAD ステートメントを実行します。次に、ステートメントによって返される情報でURLを見つけ、URLのエラー情報を確認します。次の一般的なエラーが発生する可能性があります。
  • convert csv string to INT failed.

    インポートするファイルの列の文字列を特定のデータ型のデータに変換できません。たとえば、文字列abcを数値に変換できません。

  • the length of input is too long than schema.

    インポートするファイルの列の長さが指定された制限を超えています。たとえば、文字列の長さがテーブルの作成時に指定された固定長を超えているか、INT型のフィールドの値の長さが4バイトを超えています。

  • actual column number is less than schema column number.

    インポートするファイルの行が指定された区切り文字に基づいて分割された後、生成された列の数が指定された列の数よりも少なくなります。このエラーは、無効な区切り文字が原因で発生する可能性があります。

  • actual column number is more than schema column number.

    インポートするファイルの行が指定された区切り文字に基づいて分割された後、生成された列の数が指定された列の数よりも多くなります。

  • the frac part length longer than schema scale.

    インポートするファイルのDECIMAL型の列の値の小数部分が指定された長さを超えています。

  • the int part length longer than schema precision.

    インポートするファイルのDECIMAL型の列の値の整数部分が指定された長さを超えています。

  • there is no corresponding partition for this key.

    インポートするファイルの行のパーティションキー列の値がパーティション範囲内にありません。

「ERROR 1064 (HY000): Failed to find enough host in all backends. need: 3」エラーが発生した場合はどうすればよいですか?

テーブルを作成するときに、テーブルのプロパティに "replication_num" = "1" を追加します。

データをインポートするときにBEノードのログに「Too many open files」エラーが発生した場合はどうすればよいですか?

この問題を解決するには、次の手順を実行します。
  1. システムのファイルハンドルの数を変更します。
  2. base_compaction_num_threads_per_disk パラメータと cumulative_compaction_num_threads_per_disk パラメータの値を減らします。パラメータのデフォルト値は 1 です。構成項目の変更方法については、「構成項目の変更」をご参照ください。
  3. 問題が解決しない場合は、クラスタをスケールアウトするか、データインポート頻度を減らすことをお勧めします。

「increase config load_process_max_memory_limit_percent」エラーが発生した場合はどうすればよいですか?

データをインポートするときに次の図のようなエラーメッセージが表示された場合は、load_process_max_memory_limit_bytes パラメータと load_process_max_memory_limit_percent パラメータの値を確認して増やすことをお勧めします。構成項目の変更方法については、「構成項目の変更」をご参照ください。tablet open failed

データインポート中にリモートプロシージャコール(RPC)タイムアウトが発生した場合はどうすればよいですか?

BEノードの be.conf ファイルの write_buffer_size パラメータの値を確認します。このパラメータは、BEノードのメモリブロックの最大サイズを指定するために使用されます。このパラメータのデフォルト値は 100 MB です。値が大きすぎると、RPCタイムアウトが発生する可能性があります。この場合、BEノードの構成ファイルの tablet_writer_rpc_timeout_sec パラメータの値に基づいて write_buffer_size パラメータを変更する必要があります。BEノードの構成ファイルの他のパラメータの詳細については、「パラメータ構成」をご参照ください。

エラー「値の数が列数と一致しません」が発生した場合の対処方法

  • 説明
    インポートジョブが失敗し、エラー詳細の URL に「値の数が列数と一致しません」というエラーメッセージが表示されます。このエラーメッセージは、ソースデータの解析によって取得された列の数が、ターゲットテーブルの列の数と一致しないことを示しています。
    Error: Value count does not match column count. Expect 3, but got 1. Row: 2023-01-01T18:29:00Z,cpu0,80.99
    Error: Value count does not match column count. Expect 3, but got 1. Row: 2023-01-01T18:29:10Z,cpu1,75.23
    Error: Value count does not match column count. Expect 3, but got 1. Row: 2023-01-01T18:29:20Z,cpu2,59.44
  • 原因

    インポートコマンドまたはインポートステートメントで指定した列区切り文字が、ソースデータで使用されている列区切り文字と一致していません。上記の例では、ソースデータは CSV 形式で、3 つの列が含まれています。ソースデータは列区切り文字としてカンマ(,)を使用していますが、インポートコマンドまたはステートメントは列区切り文字としてタブ文字(\t)を使用しています。その結果、ソースデータの列のデータが 1 つの列のデータに解析されます。

  • 解決策

    インポートコマンドまたはインポートステートメントの列区切り文字をカンマ(,)に変更し、データを再度インポートします。

インポート方法を選択するにはどうすればよいですか?

インポート方法の選択方法の詳細については、「概要」をご参照ください。

「Value count does not match column count」エラーが発生した場合はどうすればよいですか?

ほとんどの場合、インポートのパフォーマンスには次の要因が影響します。
  • サーバーメモリ

    タブレットの数が多いほど、消費されるメモリリソースが多くなります。タブレットの数を決定する方法に記載されている手順に従って、単一タブレットのサイズを見積もることをお勧めします。

  • ディスク I/O 容量とネットワーク帯域幅

    50 Mbit/s から 100 Mbit/s のネットワーク帯域幅が適しています。

  • バッチサイズとインポート頻度
    • Stream Load インポート方式を使用する場合は、インポートバッチサイズを 10 MB から 100 MB の範囲の値に設定することをお勧めします。
    • Broker Load インポート方式では、バッチサイズの要件はありません。このインポート方式は、インポートバッチサイズが大きいシナリオで使用できます。
    • インポート頻度は非常に高くすることはできません。シリアルATA(SATA) HDD の場合は、1 秒あたり 2 つ以上のインポートジョブを実行しないでください。

テキストファイルの最初の行にある列名を Stream Load は識別できますか? Stream Load はテキストファイルの最初の行のデータを無視できますか?

いいえ、Stream Load はテキストファイルの最初の行にある列名を識別できません。最初の行のデータは、Stream Load にとって通常のデータです。Stream Load はテキストファイルの最初の行のデータを無視できません。インポートするテキストファイルの最初の行に列名が存在する場合は、次のいずれかの方法を使用して問題をトラブルシューティングできます。
  • エクスポートツールの設定を変更します。この方法では、列名なしでテキストファイルをエクスポートできます。
  • sed -i '1d' filename コマンドを実行して、テキストファイルの最初の行のデータを削除します。
  • Stream Load のステートメントに -H "where: column name! ='Column name' " を追加して、最初の行のデータをフィルタリングします。

    システムは、最初の行の文字列を特定のデータ型のデータに変換してから、データをフィルタリングします。変換に失敗した場合、null が返されます。この方法を使用する場合は、StarRocks テーブルの列に NOT NULL プロパティが設定されていないことを確認する必要があります。

  • Stream Load のステートメントに -H "max_filter_ratio:0.01" を追加して、フォールトトレラント率を 1% 以下に設定します。インポートジョブで少なくとも 1 つのエラー行が許可されていることを確認してください。この方法では、最初の行で発生するエラーを無視できます。インポートするデータの実際の量に基づいて、より低いフォールトトレラント率を指定することもできます。フォールトトレラント率を指定した後、返された結果の ErrorURL は依然としてエラーが発生したことを示しますが、インポートジョブは成功します。高いフォールトトレラント率を指定しないことをお勧めします。そうしないと、他のデータエラーが無視される可能性があります。

Stream Load を使用して StarRocks にデータをインポートする場合、標準の DATE 型または INT 型ではないパーティションキー列のデータのデータ型を変換する必要がありますか?

はい。StarRocks を使用して、データのインポートプロセス中にデータ型を変換できます。

たとえば、インポートする TEST ファイルが CSV 形式で、NO、DATE、VERSION、PRICE の各列が含まれているが、DATE 列のデータが 202106.00 形式であるとします。 StarRocks で使用する列が DATE の場合、StarRocks に NO、VERSION、PRICE、DATE の各列を含むテーブルを作成する必要があります。 また、DATE 列のデータのデータ型を DATE、DATETIME、または INT として指定する必要があります。 その後、Stream Load のステートメントで次の設定を構成して、列間でデータ型の変換を実行できます。
-H "columns: NO,DATE_1, VERSION, PRICE, DATE=LEFT(DATE_1,6)" // 列:NO、DATE_1、VERSION、PRICE、DATE=LEFT(DATE_1、6)

上記のステートメントでは、DATE_1 はデータを取得するためのプレースホルダーと見と見なすことができます。 その後、LEFT() 関数が呼び出されてデータ型の変換が実行され、変換されたデータが StarRocks テーブルの DATE 列に割り当てられます。 変換操作を実行するために関数を呼び出す前に、CSV ファイル内のすべての列の一時的な名前をリストする必要があることに注意してください。 列間でデータ型の変換を実行するために使用できる関数は、非集計関数やウィンドウ関数を含むスカラー関数です。

インポート方法を選択するにはどうすればよいですか?

  • 原因

    ソースデータファイルのサイズが、Stream Loadでサポートされている最大ファイルサイズ(10 GB)を超えています。

  • 解決策
    • seq -w 0 n コマンドを実行して、ファイルを分割します。
    • curl -XPOST http:///be_host:http_port/api/update_config?streaming_load_max_mb=<file_size> コマンドを実行して、streaming_load_max_mb パラメーターの値を変更し、ファイルの最大サイズを大きくします。BEノードの構成ファイルにある他のパラメーターの詳細については、「パラメーター設定」をご参照ください。

インポートのパフォーマンスに影響を与える要因は何ですか?

方法 1:タスクの並列処理を増やす

説明 このメソッドは、より多くの CPU リソースを消費する可能性があり、バージョンが多すぎるデータがインポートされます。
インポートジョブを実行するために複数のタスクに分割します。タスクの並列度は、次の式に基づいて計算されます。タスクの最大並列度は、稼働中の BE ノードの数、または使用したいパーティションの数によって決まります。
min(alive_be_number, partition_number, desired_concurrent_number, max_routine_load_task_concurrent_num)
ほとんどの場合、次の要因がインポートのパフォーマンスに影響を与えます。
  • alive_be_number: 生存している BE ノードの数。
  • partition_number: 使用するパーティションの数。
  • desired_concurrent_number: 単一のルーチンロードジョブの想定されるタスク並列度。デフォルト値:3。
    • インポートジョブを作成しなかった場合は、CREATE ROUTINE LOAD ステートメントを実行するときに、このパラメーターを設定する必要があります。
    • インポートジョブを作成した場合は、ALTER ROUTINE LOAD ステートメントを実行するときに、このパラメーターを変更する必要があります。
  • max_routine_load_task_concurrent_num: 単一のルーチンロードジョブの最大タスク並列度。デフォルト値:5。このパラメーターは FE の動的パラメーターです。詳細については、「パラメーター設定」をご参照ください。

多数のパーティションと BE ノードが使用され、alive_be_number パラメーターと partition_number パラメーターの値が、desired_concurrent_number パラメーターと max_routine_load_task_concurrent_num パラメーターの値よりも大きい場合は、desired_concurrent_number パラメーターと max_routine_load_task_concurrent_num パラメーターの値を増やすことで、タスクの並列処理を向上させることができます。

たとえば、消費するパーティションの数が 7、稼働中の BE ノードの数が 5、max_routine_load_task_concurrent_num のデフォルト値が 5 であるとします。この場合、タスクの並列処理を上限まで増やすには、desired_concurrent_number パラメーターの値を 3(デフォルト値)から 5 に変更する必要があります。その後、タスクの並列処理は次の式に基づいて計算されます。min(5,7,5,5)。結果は 5 です。

方法 2:1 つのインポートタスクで消費できるパーティションのデータ量を増やす

説明 この方法は、データインポートのレイテンシを増加させる可能性があります。

1 つのルーチンロードタスクで消費できるメッセージの最大数は、max_routine_load_batch_size パラメーターまたは routine_load_task_consume_second パラメーターによって決まります。インポートタスクがデータを消費するとき、消費されたメッセージの最大数が上記の パラメーターの要件を満たすと、消費は完了します。上記のパラメーターは FE パラメーターです。詳細については、パラメーター設定をご参照ください。

be/log/be.INFO ファイルを表示して、1 回のインポート タスクで消費できるデータ量の上限を指定するパラメーターを分析できます。その後、このパラメーターの値を増やすことで、1 回のインポート タスクで消費できるデータ量を増やすことができます。
// consumer group の完了:41448fb1a0ca59ad-30e34dabfa7e47a0。消費時間 (ミリ秒) = 3261、受信行数 = 179190、受信バイト数 = 9855450、eos:1、残り時間:-261、残りバイト数:514432550、ブロッキング取得時間 (マイクロ秒):3065086、ブロッキング書き込み時間 (マイクロ秒):24855

通常、left_bytes パラメーターの値は 0 以上です。これは、バッチで読み取られるデータ量が、routine_load_task_consume_second パラメーターで指定された期間内に、max_routine_load_batch_size パラメーターで指定された値を超えないことを示します。この場合、すべてのスケジュール済みインポートタスクは、遅延なくすべての Kafka データを消費できます。routine_load_task_consume_second パラメーターの値を増やすことで、1 つのインポートタスクで消費できるデータ量を増やすことができます。

left_bytes パラメーターの値が 0 未満の場合、バッチで読み取ることができるデータの量は、routine_load_task_consume_second パラメーターで指定された期間内に、max_routine_load_batch_size パラメーターで指定された値を超えています。この場合、スケジュールされたインポートタスクはすべての Kafka データを消費しない可能性があります。max_routine_load_batch_size パラメーターの値を大きくすることができます。

SHOW ROUTINE LOAD ステートメントを実行した後、インポートジョブのステータスが PAUSED または CANCELLED に変わります。どうすればよいですか?

エラーの説明に基づいてトラブルシューティングを行うことができます。
  • 説明: インポートジョブのステータスが PAUSED に変わり、ReasonOfStateChanged の説明は Broker: Offset out of range です。
    • 原因: インポートジョブのコンシューマーオフセットが Kafka パーティションに存在しません。
    • 解決策: SHOW ROUTINE LOAD ステートメントを実行して、Progress パラメーターでインポートジョブの最新のコンシューマーオフセットを表示します。次に、コンシューマーオフセットのメッセージが Kafka パーティションに存在するかどうかを確認します。コンシューマーオフセットのメッセージが存在しない場合、問題は次の理由で発生する可能性があります。
      • インポートジョブの作成時に指定されたコンシューマーオフセットが将来の時点です。
      • Kafka パーティションでは、コンシューマーオフセットのメッセージがインポートジョブによって消費される前にクリアされます。 log.retention.hours パラメーターや log.retention.bytes パラメーターなど、Kafka ログのクリーンアップポリシーとパラメーターをインポートジョブのインポート速度に基づいて構成することをお勧めします。
  • 説明: インポートジョブのステータスが PAUSED に変わります。
    • 原因: インポートタスクによってインポートされたエラーデータ行の数が、max_error_number パラメーターで指定された上限を超えている可能性があります。
    • 解決策: ReasonOfStateChangedErrorLogUrls の説明に基づいてトラブルシューティングを行います。
      • データソースのデータ形式が正しくないためにエラーが発生した場合は、データ形式を確認してエラーを修正します。エラーを修正した後、RESUME ROUTINE LOAD ステートメントを実行して、PAUSED 状態のインポートジョブを再開します。
      • データソースのデータ形式が StarRocks によって解析できない場合は、max_error_number パラメーターを変更して、インポートジョブで許可されるエラーデータ行の最大数を変更します。
        1. SHOW ROUTINE LOAD ステートメントを実行して、max_error_number パラメーターの値を確認します。
        2. ALTER ROUTINE LOAD ステートメントを実行して、max_error_number パラメーターの値を増やします。
        3. RESUME ROUTINE LOAD ステートメントを実行して、PAUSED 状態のインポートジョブを再開します。
  • 説明: インポートジョブのステータスが CANCELLED に変わります。
    • 原因: インポートタスクの実行時に例外が発生する可能性があります。たとえば、テーブルが削除されます。
    • 解決策: ReasonOfStateChangedErrorLogUrls の説明に基づいてトラブルシューティングを行います。エラーを修正した後、CANCELLED 状態のインポートジョブを再開することはできません。

Kafka から StarRocks にデータをインポートするために Routine Load を使用する場合、exactly-once セマンティクスは保証されますか?

はい、exactly-once セマンティクスは保証されます。

インポートタスクは個別のトランザクションです。トランザクションの実行中にエラーが発生した場合、トランザクションは終了し、FE ノードはインポートタスクの関連パーティションの消費進捗を更新しません。 FE ノードがキュー内のインポートタスクをスケジュールすると、FE ノードはパーティションの最後に保存されたコンシューマーオフセットから消費要求を開始します。これにより、exactly-once セマンティクスが保証されます。

エラー「Broker: Offset out of range」が発生した場合はどうすればよいですか?

SHOW ROUTINE LOAD ステートメントを実行して、最新のオフセットを表示します。次に、オフセットに Kafka クライアントのデータが含まれているかどうかを確認します。考えられる原因:
  • インポート中に将来のオフセットが指定されています。
  • Kafka は、インポートジョブが開始される前にオフセットデータをクリーンアップします。log.retention.hours パラメーターや log.retention.bytes パラメーターなど、StarRocks のインポート速度に基づいてログをクリーンアップするために使用するパラメーターに適切な値を指定する必要があります。

正常に実行され、FINISHED状態のインポートジョブをBroker Loadを使用して再実行できますか?

いいえ、正常に実行され、FINISHED状態のインポートジョブをBroker Loadを使用して再実行することはできません。さらに、正常に実行された各インポートジョブのラベルは再利用できません。これにより、インポートジョブが失われたり重複したりすることがなくなります。正常に実行され、FINISHED状態のインポートジョブを再実行する場合は、次の手順を実行します。SHOW LOAD ステートメントを実行して履歴インポートレコードを表示し、再実行するインポートジョブを見つけ、再実行するジョブの情報と新しいジョブラベルに基づいて別のインポートジョブを作成します。

Broker Load を使用して HDFS データを StarRocks にインポートすると、日付フィールドの値が異常になり、時刻が正しい時刻より 8 時間遅れています。どうすればよいですか?

  • 原因

    StarRocks テーブルとインポートジョブの作成時に指定したタイムゾーンは UTC+8 ですが、サーバーは UTC+0 タイムゾーンを使用しています。その結果、日付フィールドの値は、日付フィールドの元の値に 8 時間を加えた値になります。

  • 解決策

    StarRocks テーブルを作成するときに、timezone パラメーターを削除します。

Broker Load を使用して ORC 形式のデータを StarRocks にインポートすると、「ErrorMsg: type:ETL_RUN_FAIL; msg:Cannot cast '<slot 6>' from VARCHAR to ARRAY<VARCHAR(30)>」というエラーが発生します。どうすればよいですか?

  • 原因

    インポートするファイルの列名が StarRocks テーブルの列名と一致していません。この場合、システムは SET ステートメントの実行時に型の推論を実行し、cast 関数を呼び出してデータ型の変換を実行します。その結果、データ型の変換が失敗します。

  • 解決策

    インポートするファイルの列名が StarRocks テーブルの列名と一致していることを確認してください。そうすれば、SET ステートメントと cast 関数は不要になります。

Broker Loadを使用してインポートジョブを作成したときにエラーは発生しませんが、データがクエリされません。どうすればよいですか?

Broker Loadは非同期インポート方式です。インポートジョブを作成するための関連ステートメントを実行したときにエラーが発生しなくても、インポートジョブが成功しない場合があります。SHOW LOAD ステートメントを実行して、インポートジョブのステータスと error メッセージを表示し、インポートジョブに設定されているパラメーターを変更してから、インポートジョブを再実行できます。

エラー「failed to send batch」または「TabletWriter add batch with unknown id」が発生した場合はどうすればよいですか?

このエラーは、データ書き込みタイムアウトが原因で発生します。システム変数 query_timeout と BE パラメーター streaming_load_rpc_max_alive_time_sec の値を変更する必要があります。BEノードの構成ファイルの他のパラメーターの詳細については、「パラメーター設定」をご参照ください。

エラー「LOAD-RUN-FAIL; msg:OrcScannerAdapter::init_include_columns. col name = xxx not found」が発生した場合はどうすればよいですか?

ParquetファイルまたはORCファイルからデータをインポートする場合、ファイルヘッダーの列名が宛先のStarRocksテーブルの列名と一致するかどうかを確認します。

次のサンプルコードでは、ParquetまたはORCファイルの tmp_c1 列と tmp_c2 列が、StarRocksテーブルの name 列と id 列にそれぞれマッピングされています。SETステートメントを実行しない場合、ソース列は、指定された名前と同じ名前の列にマッピングされますが、StarRocksテーブルに存在しない可能性があります。
(tmp_c1,tmp_c2)
SET
(
   id=tmp_c2,
   name=tmp_c1
)

特定のバージョンのHiveで生成されたORCファイルをインポートし、ORCファイルのテーブルヘッダーが(_col0, _col1, _col2, ...)の場合、「Invalid Column Name(無効な列名)」エラーが発生する可能性があります。この場合、SETステートメントを実行して、列マッピングのルールを設定する必要があります。

インポートジョブが長時間終了しない場合はどうすればよいですか?

fe.log ファイルで、インポートジョブラベルに基づいてインポートジョブの ID を検索します。次に、BE ノードの be.INFO ファイルで、インポートジョブの ID に基づいてログコンテキストを検索し、エラーの原因を特定します。

高可用性 Apache HDFS クラスタにアクセスするにはどうすればよいですか?

HDFS クラスタの NameNode で高可用性 (HA) を構成した後、アクティブな NameNode が別の NameNode に切り替えられた場合、新しいアクティブな NameNode が自動的に識別されます。HA モードでデプロイされた HDFS クラスタにアクセスするには、次の表に示すパラメータを構成します。

パラメータ

説明

dfs.nameservices

HDFS サービスの名前。カスタム名を設定できます。

たとえば、dfs.nameservices パラメータを my_ha に設定します。

dfs.ha.namenodes.xxx

NameNode のカスタム名。複数の名前はカンマ (,) で区切ります。このパラメータ名の xxx は、dfs.nameservices パラメータに設定したカスタム名に置き換えます。

たとえば、dfs.ha.namenodes.my_ha パラメータを my_nn に設定します。

dfs.namenode.rpc-address.xxx.nn

NameNode がリモートプロシージャコール (RPC) に使用するアドレス。このパラメータ名の nn は、dfs.ha.namenodes.xxx パラメータに設定した NameNode の名前に置き換えます。

たとえば、dfs.namenode.rpc-address.my_ha.my_nn パラメータを Hostname:Port number 形式の値に設定します。

dfs.client.failover.proxy.provider

クライアントが NameNode に接続するために使用するプロバイダー。デフォルト値: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider

HA モードでデプロイされた HDFS クラスタへのアクセスには、シンプル認証または Kerberos 認証を使用できます。次のサンプルコードは、シンプル認証を使用して HA HDFS クラスタにアクセスする方法の例を示しています。

(
    "username"="user", // ユーザー名
    "password"="passwd", // パスワード
    "dfs.nameservices" = "my-ha", // HDFS サービス名
    "dfs.ha.namenodes.my-ha" = "my_namenode1,my_namenode2", // NameNode の名前
    "dfs.namenode.rpc-address.my-ha.my-namenode1" = "nn1-host:rpc_port", // NameNode1 の RPC アドレス
    "dfs.namenode.rpc-address.my-ha.my-namenode2" = "nn2-host:rpc_port", // NameNode2 の RPC アドレス
    "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" // フェイルオーバープロバイダー
)

HDFS クラスタの構成は、hdfs-site.xml ファイルに書き込むことができます。ブローカープロセスを使用して HDFS クラスタに関する情報を読み取る場合は、クラスタのファイルパスと認証情報のみを指定する必要があります。

HDFS Federation で ViewFs を構成するにはどうすればよいですか?

View File System(ViewFs)の core-site.xml および hdfs-site.xml 構成ファイルを broker/conf ディレクトリにコピーします。

カスタムファイルシステムが含まれている場合は、ファイルシステムに関連する .jar ファイルを broker/lib ディレクトリにコピーします。

Kerberos 認証が有効になっている EMR クラスターにアクセスするときに「Kerberos レルムを取得できません」というエラーが発生した場合はどうすればよいですか?

  1. すべてのブローカーの物理マシンで `/etc/krb5.conf` ファイルが構成されているかどうかを確認します。
  2. 上記のファイルがすべてのブローカーの物理マシンで構成された後もエラーが解決しない場合は、Broker 起動スクリプトの -Djava.security.krb5.conf:/etc/krb5.confJAVA_OPTS 変数に を追加します。

INSERT INTO ステートメントを実行してデータをインポートするときに、データレコードを挿入するのに約 50 ~ 100 ミリ秒かかるのはなぜですか?

INSERT INTO ステートメントは、複数のデータレコードをバッチでインポートするために使用されます。そのため、1 つのデータレコードをインポートするために必要な期間は、複数のデータレコードをバッチでインポートするために必要な期間と同じです。オンライン分析処理 (OLAP) シナリオでは、INSERT INTO ステートメントを使用して 1 つのデータレコードをインポートしないことをお勧めします。

Stream Loadはテキストファイルの最初の行にある列名を識別できますか? Stream Loadはテキストファイルの最初の行のデータを無視できますか?

このエラーは、Stream Load の RPC タイムアウトが原因で発生します。対応する構成ファイルで RPC タイムアウトに関連するパラメーターを変更することで、エラーを修正できます。

いいえ、Stream Loadはテキストファイルの最初の行にある列名を識別できません。最初の行のデータは、Stream Loadの共通データです。Stream Loadはテキストファイルの最初の行のデータを無視できません。インポートするテキストファイルの最初の行に列名が存在する場合は、次のいずれかの方法を使用して問題をトラブルシューティングできます。be.conf
  • streaming_load_rpc_max_alive_time_sec: Stream Load の RPC タイムアウト期間。既定値: 1200。単位: 秒。
  • エクスポートツールの設定を変更します。このようにして、列名なしでテキストファイルをエクスポートできます。tablet_writer_rpc_timeout_sec

大量のデータをインポートするために INSERT INTO SELECT ステートメントを実行したときに、実行が失敗し、「execute timeout」エラーが発生した場合はどうすればよいですか?

このエラーは、クエリがタイムアウトしたために発生します。エラーを修正するには、セッションの query_timeout パラメーターを変更します。デフォルト値は 600 です。単位:秒。

サンプルコマンド:
set query_timeout =xx; // セッションのquery_timeoutパラメーターを設定します

Flinkジョブの実行時に「Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing」というエラーが発生した場合はどうすればよいですか?

  • 原因

    StarRocks-migrate-tools(SMT)の [table-rule.1][table-rule.2]config_prod.conf 設定ファイルに、 や などの複数のルールが設定されていますが、ルールに必要な情報が指定されていません。

  • 解決策

    ルール [table-rule.1][table-rule.2] にデータベース、テーブル、および Flink コネクタが設定されているかどうかを確認します。

Flink は失敗したタスクをどのように再起動しますか?

Flink は、チェックポイントメカニズムと再起動ポリシーに基づいて失敗したタスクを再起動します。

flink-conf.yaml 構成ファイルで次のパラメーターを構成して、チェックポイントメカニズムを有効にし、固定遅延再起動ポリシーを使用します。
# 単位:ミリ秒
execution.checkpointing.interval: 300000
state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints-directory
パラメーター:
  • execution.checkpointing.interval: 2 つのチェックポイントの間隔。単位:ミリ秒。チェックポイントメカニズムを有効にするには、このパラメーターに 0 より大きい値を指定する必要があります。
  • state.backend: チェックポイントメカニズムが有効になると、データ損失を防ぎ、データリカバリ中のデータ整合性を確保するために、状態はチェックポイントに基づいて永続化されます。状態情報の保存、保存に使用される方法、およびチェックポイントに基づいて永続化される状態の場所は、選択した状態バックエンドによって異なります。詳細については、「State Backends」をご参照ください。
  • state.checkpoints.dir: チェックポイントが保存されるディレクトリ。

Stream Loadを使用して標準のDATE型またはINT型ではないパーティションキー列のデータをStarRocksにインポートする場合、データのデータ型を変換する必要がありますか?

Flink ジョブを停止し、セーブポイントからジョブを再開するために、セーブポイントをトリガーできます。 セーブポイントは、チェックポイントメカニズムに基づいて作成される、ストリーミングジョブの実行状態の一貫性のあるイメージです。 詳細については、「セーブポイント」をご参照ください。

はい、StarRocksを使用してデータインポートプロセス中にデータ型を変換できます。

停止前の状態に Flink ジョブを復元する場合、Flink ジョブの再送信時にセーブポイントを指定する必要があります。サンプルコマンド:
bin/flink stop --type [native/canonical] --savepointPath [:targetDirectory] :jobId
パラメーター:
  • jobId: Flink ジョブの ID。 flink list -running コマンドを実行するか、Flink の Web UI で ID を確認できます。
  • targetDirectory: セーブポイントが保存されるディレクトリ。 セーブポイントを保存するディレクトリを指定するには、Flink の state.savepoints.dirflink-conf.yml 設定ファイルで
    state.savepoints.dir: [file:// or hdfs://]/home/user/savepoints_dir
    パラメーターを設定します。 これにより、セーブポイントがトリガーされたときに、指定されたディレクトリを使用してセーブポイントを保存できます。
たとえば、インポートする
./flink run -c com.starrocks.connector.flink.tools.ExecuteSQL -s savepoints_dir/savepoints-xxxxxxxx flink-connector-starrocks-xxxx.jar -f flink-create.all.sql
ファイルはCSV形式で、NO、DATE、VERSION、PRICEの各列が含まれていますが、DATE列のデータは202106.00形式です。StarRocksで使用する列がDATEの場合は、StarRocksにNO、VERSION、PRICE、DATEの各列を含むテーブルを作成する必要があります。また、DATE列のデータのデータ型をDATE、DATETIME、またはINTとして指定する必要があります。次に、Stream Loadのステートメントで次の設定を構成して、列間でデータ型変換を実行できます。

トランザクションの exactly-once セマンティクスを使用してデータをインポートするときに、データのインポートが失敗した場合はどうすればよいですか?

  • 説明: 次のようなエラー情報が返されます:
    com.starrocks.data.load.stream.exception.StreamLoadFailException: {
        "TxnId": 3382****,
        "Label": "502c2770-cd48-423d-b6b7-9d8f9a59****",
        "Status": "Fail",
        "Message": "timeout by txn manager", // エラーメッセージ
        "NumberTotalRows": 1637,
        "NumberLoadedRows": 1637,
        "NumberFilteredRows": 0,
        "NumberUnselectedRows": 0,
        "LoadBytes": 4284214,
        "LoadTimeMs": 120294,
        "BeginTxnTimeMs": 0,
        "StreamLoadPlanTimeMs": 7,
        "ReadDataTimeMs": 9,
        "WriteDataTimeMs": 120278,
        "CommitAndPublishTimeMs": 0
    }
  • 原因: sink.properties.timeout パラメーターの値が、Flink のチェックポイント間隔よりも小さいです。
  • 解決策: sink.properties.timeout パラメーターの値を大きくします。値は、Flink のチェックポイント間隔よりも大きい値である必要があります。

flink-connector-jdbc_2.11 ドライバーを使用して StarRocks にデータをインポートした後、時刻が Flink の時刻より 8 時間早くなります。どうすればよいですか?

  • 説明: localtimestap 関数によって生成された時刻は Flink では正常です。 flink-connector-jdbc_2.11 ドライバーを使用して StarRocks にデータをインポートした後、時刻が Flink の時刻より 8 時間早くなります。 Flink と StarRocks がデプロイされているサーバーのタイムゾーンは UTC+8 (Asia/Shanghai) です。 Flink のバージョンは 1.12 で、ドライバーは flink-connector-jdbc_2.11 です。
  • 解決策: Flink シンクテーブルの server-time-zone パラメーターを Asia/Shanghai に設定し、&serverTimezone=Asia/Shanghaiurl パラメーターの値に
    CREATE TABLE sk (
        sid int,
        local_dtm TIMESTAMP,
        curr_dtm TIMESTAMP
    )
    WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://192.168.**.**:9030/sys_device?characterEncoding=utf-8&serverTimezone=Asia/Shanghai',
        'table-name' = 'sink',  // シンクテーブル
        'driver' = 'com.mysql.jdbc.Driver',
        'username' = 'sr',
        'password' = 'sr123',
        'server-time-zone' = 'Asia/Shanghai'
    );
    を追加します。サンプルステートメント:

StarRocksクラスターにデプロイされたKafkaからのデータは、StarRocksにインポートできます。ただし、他のマシンにデプロイされたKafkaからのデータは、StarRocksにインポートできません。なぜですか?

  • 説明: 次のエラーメッセージが返されます:
    failed to query wartermark offset, err: Local: Bad message format // ウォーターマークオフセットのクエリに失敗しました。エラー: ローカル: メッセージ形式が正しくありません
  • 原因: KafkaはStarRocksクラスターのホスト名を解析できません。
  • 解決策: StarRocksクラスターのノードでKafkaのホスト名を指定します。このようにして、/etc/hosts ファイルを解析できます。

クエリが実行されていないときに BE ノードのメモリが完全に占有され、BE ノードの CPU 使用率が 100% に達するのはなぜですか?

BE ノードは定期的に統計を収集し、長期間 CPU リソースを占有しません。使用メモリが 10 GB 未満の場合、BE ノードは残りのメモリリソースを解放しません。BE ノードは、その構成に基づいてメモリリソースを管理します。tc_use_memory_min パラメーターを構成してメモリサイズを変更できます。

tc_use_memory_min は、TCmalloc の最小メモリを指定します。デフォルト値は 10737418240 です。StarRocks は、実際に使用されているメモリが tc_use_memory_min パラメーターの値を超えた場合にのみ、アイドル状態のメモリリソースをオペレーティングシステムに返します。このパラメーターを構成するには、次の手順を実行します。EMR コンソールで、StarRocks サービスページの [構成] タブに移動します。[be.conf] タブをクリックします。BE ノードの構成ファイルの他のパラメーターの詳細については、「パラメーター構成」をご参照ください。

BEノードが要求されたメモリリソースをオペレーティングシステムに返さないのはなぜですか?

メモリの割り当ては負荷の高い操作です。データベースがオペレーティングシステムから大量のメモリリソースを要求すると、データベース用に多くのメモリリソースが予約されます。メモリリソースを再利用するために、要求されたメモリリソースは遅延してオペレーティングシステムに返されます。テスト環境の検証中にメモリ使用量を監視し、長期間にわたってメモリリソースをオペレーティングシステムに返すことができるかどうかを確認することをお勧めします。

Flinkコネクタの依存関係をシステムが解析できないのはなぜですか?

  • 原因:Alibaba CloudイメージリポジトリのアドレスからFlinkコネクタの依存関係を取得する必要があります。`/etc/maven/settings.xml` ファイルに関連設定が構成されていないため、Flinkコネクタのすべての依存関係がAlibaba Cloudイメージリポジトリのアドレスから取得されるわけではありません。
  • 解決策:Alibaba Cloudパブリックリポジトリのアドレスを https://maven.aliyun.com/repository/public に変更します。

Flink-connector-StarRocks で checkpoint 間隔パラメーターと一緒に sink.buffer-flush.interval-ms パラメーターを設定した場合、sink.buffer-flush.interval-ms パラメーターは有効になりますか?

  • 問題の説明: sink.buffer-flush.interval-ms パラメーターは 15 秒に設定されていますが、checkpoint 間隔 パラメーターは 5 分 に設定されています。
    +----------------------+--------------------------------------------------------------+
    |         Option       | Required |  Default   | Type   |       Description           |
    +-------------------------------------------------------------------------------------+
    |  sink.buffer-flush.  |  NO      |   300000   | String | フラッシュする時間間隔です。  |  // Translated comment
    |  interval-ms         |          |            |        | range: [1000ms, 3600000ms]  |
    +----------------------+--------------------------------------------------------------+
  • 解決策: フラッシュ操作は、checkpoint 間隔 パラメーターの値の影響を受けません。次のパラメーターのいずれかの値が指定された制限を超えると、フラッシュ操作がトリガーされます。checkpoint 間隔 パラメーターの値は exactly-once セマンティクスに有効であり、sink.buffer-flush.interval-ms パラメーターの値は at-least-once セマンティクスに有効です。
    sink.buffer-flush.max-rows
    sink.buffer-flush.max-bytes
    sink.buffer-flush.interval-ms

DataX を使用してインポートされたデータを更新できますか?

最新バージョンの StarRocks では、DataX を使用してプライマリキーモデルで作成された StarRocks テーブルのデータを更新できます。この機能を有効にするには、JSON 設定ファイルのリーダーセクションに _op フィールドを追加する必要があります。

DataX を使用してデータを同期する場合、DataX キーワードをどのように処理してエラーを回避すればよいですか?

バッククォート(``)でキーワードを囲みます。

エラー「master 'yarn' で実行する場合、環境で HADOOP-CONF-DIR または YARN-CONF-DIR を設定する必要があります」が発生した場合はどうすればよいですか?

Spark クライアントの HADOOP-CONF-DIRspark-env.sh スクリプトで 環境変数を設定します。

spark-submitコマンドを実行してSparkジョブを送信するときに「"xxx/bin/spark-submit"プログラムを実行できません: error=2, そのようなファイルまたはディレクトリはありません」というエラーが発生した場合はどうすればよいですか?

Spark Loadを使用してデータをインポートする場合、spark_home_default_dir パラメーターを設定していないか、Sparkクライアントのルートディレクトリを正しく設定していません。エラーを修正するには、Sparkクライアントの正しいルートディレクトリを指定します。

エラー「File xxx/jars/spark-2x.zip does not exist」が発生した場合はどうすればよいですか?

Spark Loadを使用してデータをインポートするときに、spark-resource-path パラメーターの値がパッケージ化された ZIP ファイルを指していません。ファイルパスがファイル名と一致しているかどうかを確認してください。

エラー「yarn client does not exist in path: xxx/yarn-client/hadoop/bin/yarn」が発生した場合はどうすればよいですか?

Spark Loadを使用してデータをインポートする場合、yarn-client-path パラメーターに実行可能ファイルが設定されていません。