すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Spark Load

最終更新日:Jan 11, 2025

Spark Loadを使用すると、外部のSparkリソースを使用してインポートするデータを前処理できます。 これにより、StarRocksクラスタへの大量データのインポートのパフォーマンスが向上し、StarRocksクラスタのコンピューティングリソースを節約できます。 データを初めてStarRocksクラスタに移行する場合、または大量のデータ(TBレベル)をStarRocksクラスタにインポートする場合は、Spark Loadを使用できます。 このトピックでは、Spark Loadの用語、ワークフロー、使用例、ベストプラクティス、FAQについて説明します。

背景情報

Spark Loadは非同期ロード方式です。 MySQLプロトコルを使用してSparkタイプのインポートジョブを作成し、SHOW LOAD コマンドを実行してインポート結果を表示する必要があります。

説明

このトピックの画像と一部の情報は、オープンソースのStarRocksのBulk load using Apache Sparkからのものです。

用語

  • Spark ETL:Spark ETLは、インポートするデータを抽出、変換、ロード(ETL)するために使用されます。 Spark ETLを使用して、ビットマップグローバルディクショナリの作成、データパーティション、データソート、データ集計などの操作を実行できます。

  • Broker:Brokerは、ファイルシステムインターフェースをカプセル化する独立したステートレスプロセスです。 これにより、StarRocksクラスタはリモートストレージシステムからファイルを読み取ることができます。

  • グローバルディクショナリ:グローバルディクショナリは、生データとマッピングされたエンコードデータのデータ構造を格納します。 生データはすべてのタイプにすることができますが、エンコードデータは整数である必要があります。 グローバルディクショナリは、通常、事前計算の高精度重複排除に使用されます。

ワークフロー

MySQLクライアントからSparkタイプのインポートジョブが送信されると、フロントエンドノードはメタデータを記録し、ジョブが正常に送信されたことを示すメッセージを返します。

次の図は、Spark Loadのワークフローを示しています。 Spark Load

ワークフローは、次の手順で構成されます。

  1. Spark Loadジョブをフロントエンドノードに送信します。

  2. フロントエンドノードは、ETLジョブをスケジュールしてSparkクラスタに送信します。

  3. SparkクラスタでETLジョブを実行します。 ビットマップグローバルディクショナリが作成され、データがパーティション化、ソート、集計されます。 これにより、インポートするデータを前処理できます。

  4. ETLジョブの実行後、フロントエンドノードは各パーティションで前処理されたデータのディレクトリを取得し、バックエンドノードにプッシュジョブを実行するようにスケジュールします。

  5. バックエンドノードはBrokerを使用してデータを読み取り、データ型をStarRocksクラスタに格納できる型に変換します。

  6. フロントエンドノードはStarRocksバージョンを公開し、インポートジョブを完了します。

グローバルディクショナリ

シナリオ

StarRocksのビットマップ列は、roaringビットマップを使用して実現されます。 roaringビットマップを使用してインポートされるデータは整数である必要があります。 データをインポートする前にビットマップ列を事前計算するには、データ型をIntegerに変換します。 データのインポート中に、グローバルディクショナリはHiveテーブルに基づいて生データとマッピングされたエンコードデータのデータ構造を格納します。

ワークフロー

  1. データソースからデータを読み取り、hive-tableという名前の一時Hiveテーブルにデータを格納します。

  2. hive-tableテーブルのデータを重複排除し、重複排除された生データをdistinct-value-tableという名前のHiveテーブルに格納します。

  3. 生データを1つの列に、エンコードデータを別の列に格納するために、dict-tableという名前のグローバルディクショナリテーブルを作成します。

  4. distinct-value-tableテーブルとdict-tableテーブルでLEFT JOIN操作を実行します。 ウィンドウ関数を使用してLEFT JOIN結果の生データをエンコードし、生データとエンコードデータの列をdict-tableテーブルに書き戻します。

  5. dict-tableテーブルとhive-tableテーブルでJOIN操作を実行します。 hive-tableテーブルの生データをエンコードされた整数に置き換えます。

  6. hive-tableテーブルのデータは、後続の操作で読み取られ、計算されてから、StarRocksクラスタにインポートされます。

データの前処理

データは、次の手順で前処理されます。

  1. Hadoop Distributed File System(HDFS)ファイルまたはHiveテーブルからデータを読み取ります。

  2. データに対してフィールドマッピングと式ベースの計算を実行し、パーティション情報に基づいてbucket-idという名前のフィールドを生成します。

  3. StarRocksテーブルのロールアップメタデータに基づいてロールアップツリーを生成します。

  4. ロールアップツリーをトラバースして、異なるレイヤーのデータを集計します。 あるレイヤーのロールアップは、前のレイヤーのロールアップに基づいて計算されます。

  5. データが集計された後、データはbucket-idフィールドに基づいて異なるバケットに配布され、HDFSに書き込まれます。

  6. BrokerはHDFSファイルを読み取り、データをStarRocksクラスタのバックエンドノードにインポートします。

基本操作

ETL ジョブのクラスタを構成する

Sparkは、StarRocksクラスタでETLジョブを実行するための外部リソースを提供します。 今後、StarRocksクラスタでは他のタイプの外部リソースがサポートされる予定です。 たとえば、Sparkまたは GPU をデータクエリに使用したり、HDFSまたはAmazon Simple Storage Service(S3)を外部データの格納に使用したり、MapReduceをETLに使用したりできます。 これらの外部リソースは、リソース管理を使用して管理できます。

インポートジョブを送信する前に、ETLジョブのSparkクラスタを構成します。 構文:

-- create spark resource
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
 type = spark,
 spark_conf_key = spark_conf_value,
 working_dir = path,
 broker = broker_name,
 broker.property_key = property_value
);

-- drop spark resource
DROP RESOURCE resource_name;

-- show resources
SHOW RESOURCES
SHOW PROC "/resources";

-- privileges
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identityGRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name;
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identityREVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name;
  • リソースを作成する

    resource-nameは、StarRocksクラスタ用に構成されたSparkリソースの名前を指定するプロパティです。

    PROPERTIESには、Sparkリソースの多くのプロパティが含まれています。 次の表に、プロパティを示します。 詳細については、SparkドキュメントのSpark Configurationを参照してください。

    プロパティ

    説明

    type

    リソースのタイプ。 値をsparkに設定します。 このプロパティは必須です。

    Spark関連のプロパティ

    spark.master

    値をyarnに設定します。 このプロパティは必須です。

    spark.submit.deployMode

    Sparkプログラムがデプロイされるモード。 有効な値:clusterとclient。 このプロパティは必須です。

    spark.hadoop.fs.defaultFS

    このプロパティは、spark.masterプロパティがyarnに設定されている場合にのみ必須です。

    spark.hadoop.yarn.resourcemanager.address

    ResourceManagerのアドレス。

    spark.hadoop.yarn.resourcemanager.ha.enabled

    ResourceManagerで高可用性を有効にするかどうかを指定します。 デフォルト値:true。

    spark.hadoop.yarn.resourcemanager.ha.rm-ids

    ResourceManagerの論理ID。

    spark.hadoop.yarn.resourcemanager.hostname.rm-id

    ホスト名は各論理IDに対応しています。

    説明

    高可用性ResourceManagerの場合は、spark.hadoop.yarn.resourcemanager.hostname.rm-id プロパティまたは spark.hadoop.yarn.resourcemanager.address.rm-id プロパティのいずれかを構成する必要があります。

    spark.hadoop.yarn.resourcemanager.address.rm-id

    アドレスは、クライアントがジョブを送信する各論理IDに対応しています。 アドレスはhost:port形式です。

    説明

    高可用性ResourceManagerの場合は、spark.hadoop.yarn.resourcemanager.hostname.rm-id プロパティまたは spark.hadoop.yarn.resourcemanager.address.rm-id プロパティのいずれかを構成する必要があります。

    working_dir

    ETLのリソースが存在するディレクトリ。

    説明

    SparkリソースがETLに使用される場合は、このプロパティが必須です。 例:hdfs://host:port/tmp/starrocks

    broker

    Brokerの名前。

    説明

    SparkリソースがETLに使用される場合は、このプロパティが必須です。 事前に ALTER SYSTEM ADD BROKER コマンドを実行してBrokerを構成する必要があります。

    broker.property_key

    BrokerがETLの中間ファイルを読み取るときに検証に必要な情報。

    サンプルコード:

    -- yarn cluster mode
    CREATE EXTERNAL RESOURCE "spark0"
    PROPERTIES
    (
        "type" = "spark",
        "spark.master" = "yarn",
        "spark.submit.deployMode" = "cluster",
        "spark.jars" = "xxx.jar,yyy.jar",
        "spark.files" = "/tmp/aaa,/tmp/bbb",
        "spark.executor.memory" = "1g",
        "spark.yarn.queue" = "queue0",
        "spark.hadoop.yarn.resourcemanager.address" = "resourcemanager_host:8032",
        "spark.hadoop.fs.defaultFS" = "hdfs://namenode_host:9000",
        "working_dir" = "hdfs://namenode_host:9000/tmp/starrocks",
        "broker" = "broker0",
        "broker.username" = "user0",
        "broker.password" = "password0"
    );
    
    -- yarn cluster mode that enables high availability
    CREATE EXTERNAL RESOURCE "spark1"
    PROPERTIES
    (
        "type" = "spark",
        "spark.master" = "yarn",
        "spark.submit.deployMode" = "cluster",
        "spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
        "spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
        "spark.hadoop.yarn.resourcemanager.hostname.rm1" = "host1",
        "spark.hadoop.yarn.resourcemanager.hostname.rm2" = "host2",
        "spark.hadoop.fs.defaultFS" = "hdfs://namenode_host:9000",
        "working_dir" = "hdfs://namenode_host:9000/tmp/starrocks",
        "broker" = "broker1"
    );
    
    -- HDFS cluster mode that enables high availability
    CREATE EXTERNAL RESOURCE "spark2"
    PROPERTIES
    (
        "type" = "spark",
        "spark.master" = "yarn",
        "spark.hadoop.yarn.resourcemanager.address" = "resourcemanager_host:8032",
        "spark.hadoop.fs.defaultFS" = "hdfs://myha",
        "spark.hadoop.dfs.nameservices" = "myha",
        "spark.hadoop.dfs.ha.namenodes.myha" = "mynamenode1,mynamenode2",
        "spark.hadoop.dfs.namenode.rpc-address.myha.mynamenode1" = "nn1_host:rpc_port",
        "spark.hadoop.dfs.namenode.rpc-address.myha.mynamenode2" = "nn2_host:rpc_port",
        "spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
        "working_dir" = "hdfs://myha/tmp/starrocks",
        "broker" = "broker2",
        "broker.dfs.nameservices" = "myha",
        "broker.dfs.ha.namenodes.myha" = "mynamenode1,mynamenode2",
        "broker.dfs.namenode.rpc-address.myha.mynamenode1" = "nn1_host:rpc_port",
        "broker.dfs.namenode.rpc-address.myha.mynamenode2" = "nn2_host:rpc_port",
        "broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    );
  • リソースを表示する

    show resources;

    通常のアカウントを使用して、USAGE-PRIV権限を持っているリソースのみを表示できます。 rootアカウントまたはadminアカウントを使用して、すべてのリソースを表示できます。

  • リソースに対する権限を管理する

    GRANT REVOKEコマンドを実行して、リソースに対する権限を管理できます。 USAGE-PRIV権限のみがサポートされています。 USAGE-PRIV権限をユーザーまたはロールに付与できます。 ロールは通常どおり割り当てることができます。 サンプルコード:

    -- リソース spark0 に対する USAGE-PRIV 権限をユーザー user0 に付与します。
    GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";
    
    -- リソース spark0 に対する USAGE-PRIV 権限をロール role0 に付与します。
    GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0";
    
    -- すべてのリソースに対する USAGE-PRIV 権限をユーザー user0 に付与します。
    GRANT USAGE_PRIV ON RESOURCE * TO "user0"@"%";
    
    -- すべてのリソースに対する USAGE-PRIV 権限をロール role0 に付与します。
    GRANT USAGE_PRIV ON RESOURCE * TO ROLE "role0";
    
    -- リソース spark0 からユーザー user0 の USAGE-PRIV 権限を取り消します。
    REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";

Spark クライアントを構成する

基盤となるフロントエンドノードは、spark-submitコマンドを実行してSpark Loadジョブを送信します。 したがって、フロントエンドノードのSparkクライアントを構成する必要があります。 Spark ダウンロード URLをクリックして、Spark 2.4.5以降のバージョンのSpark 2.xをダウンロードすることをお勧めします。 次に、次の手順を実行してSparkクライアントを構成します。

  1. SPARK-HOME環境変数を構成する

    Sparkクライアントを、フロントエンドノードが存在するディレクトリに格納します。 フロントエンドノードの構成ファイルのspark_home_default_dir パラメータをディレクトリに設定します。 このパラメータのデフォルト値は、フロントエンドノードのルートディレクトリの下のlib/spark2xです。 このパラメータを空にすることはできません。

  2. Spark依存関係パッケージを構成する

    Sparkクライアントのjars フォルダにあるすべてのJARパッケージをZIPファイルにパッケージ化します。 フロントエンドノードの構成ファイルのspark_resource_path パラメータをZIPファイルのディレクトリに設定します。 パラメータが空の場合、フロントエンドノードはルートディレクトリでlib/spark2x/jars/spark-2x.zip ファイルを検索します。 ファイルが見つからない場合は、エラーメッセージが返されます。

    Spark Loadジョブが送信されると、アーカイブされた依存関係パッケージは、デフォルトでworking_dir/{cluster_id} ディレクトリにあるリモートウェアハウスにアップロードされます。 リモートウェアハウスには、--spark-repository--{resource-name} 形式で名前が付けられます。 {resource-name}フィールドは、リモートウェアハウスに対応するリソースを指定します。 リモートウェアハウスのディレクトリ構造の例:

    ---spark-repository--spark0/
       |---archive-1.0.0/
       |   |---lib-990325d2c0d1d5e45bf675e54e44fb16-spark-dpp-1.0.0-jar-with-dependencies.jar
       |   |---lib-7670c29daf535efe3c9b923f778f61fc-spark-2x.zip
       |---archive-1.1.0/
       |   |---lib-64d5696f99c379af2bee28c1c84271d5-spark-dpp-1.1.0-jar-with-dependencies.jar
       |   |---lib-1bbb74bb6b264a270bc7fca3e964160f-spark-2x.zip
       |---archive-1.2.0/
       |   |-...

デフォルトでは、Spark依存関係パッケージの名前はspark-2x.zipです。 フロントエンドノードは、Dynamic Partition Pruning(DPP)依存関係パッケージもリモートウェアハウスにアップロードします。 Spark Loadがすべての依存関係パッケージをリモートウェアハウスに送信する場合、フロントエンドノードは2つの依存関係パッケージをアップロードする必要がないため、時間を節約できます。

YARN クライアントを構成する

基盤となるフロントエンドノードはYARNコマンドを実行して、進行中のアプリケーションのステータスを取得し、アプリケーションを終了します。 したがって、フロントエンドノードのYARNクライアントを構成する必要があります。 Hadoop ダウンロード URLをクリックして、Hadoop 2.5.2以降のバージョンのHadoop 2.xをダウンロードすることをお勧めします。 次に、次の手順を実行してYARNクライアントを構成します。

  1. YARNクライアントの実行可能ファイルのディレクトリを指定する

    YARNクライアントを、フロントエンドノードが存在するディレクトリに格納します。 フロントエンドノードの構成ファイルのyarn_client_path パラメータを、YARNクライアントのバイナリ実行可能ファイルのディレクトリに設定します。 このパラメータのデフォルト値は、フロントエンドノードのルートディレクトリの下のlib/yarn-client/hadoop/bin/yarnです。

  2. (オプション)YARNコマンドの構成ファイルのディレクトリを指定する

    フロントエンドノードがアプリケーションのステータスを取得するか、アプリケーションを終了すると、YARNコマンドを実行するために使用される構成ファイルが、デフォルトでフロントエンドノードのルートディレクトリの下にあるlib/yarn-config ディレクトリに生成されます。 フロントエンドノードの構成ファイルのyarn_config_dir パラメータを変更することで、ファイルが生成されるディレクトリを変更できます。 生成される構成ファイルには、core-site.xmlyarn-site.xml が含まれます。

インポートジョブを作成する

  • 構文:

    LOAD LABEL load_label
        (data_desc, ...)
    WITH RESOURCE resource_name
    [resource_properties]
    [PROPERTIES (key1=value1, ... )]
    
    * load_label:
        db_name.label_name
    
    * data_desc:
        DATA INFILE ('file_path', ...)
        [NEGATIVE]
        INTO TABLE tbl_name
        [PARTITION (p1, p2)]
        [COLUMNS TERMINATED BY separator ]
        [(col1, ...)]
        [COLUMNS FROM PATH AS (col2, ...)]
        [SET (k1=f1(xx), k2=f2(xx))]
        [WHERE predicate]
    
        DATA FROM TABLE hive_external_tbl
        [NEGATIVE]
        INTO TABLE tbl_name
        [PARTITION (p1, p2)]
        [SET (k1=f1(xx), k2=f2(xx))]
        [WHERE predicate]
    
    * resource_properties:
     (key2=value2, ...)

    構文の詳細については、HELP SPARK LOAD コマンドを実行してください。 次のリストでは、構文のパラメータについて説明します。

    • ラベル

      インポートジョブのラベル。 ラベルはデータベース内で一意です。 パラメータの仕様は、Broker Loadトピックの同じパラメータの仕様と同じです。

    • データ記述のパラメータ

      サポートされているデータソースは、CSVファイルとHiveテーブルのみです。 パラメータのその他の仕様は、Broker Loadトピックの同じパラメータの仕様と同じです。

    • ジョブプロパティのパラメータ

      インポートジョブのプロパティ。 パラメータはopt_properties セクションにあります。 パラメータの仕様は、Broker Loadトピックのbroker_propertiesセクションのパラメータの仕様と同じです。

    • Sparkリソースのパラメータ

      Sparkリソースのパラメータは、事前にStarRocksクラスタで構成する必要があります。 Spark Loadは、ユーザーにUSAGE-PRIV権限が付与された後にのみ使用できます。 次のサンプルコードでは、Sparkリソースの2つのパラメータが構成されています。 ジョブリソースを追加するなど、一時的な要件がある場合は、パラメータを構成できます。 この構成は、このジョブに対してのみ有効であり、StarRocksクラスタの構成には影響しません。 サンプルコード:

      WITH RESOURCE 'spark0'
      (
         "spark.driver.memory" = "1g",
         "spark.executor.memory" = "3g"
      )
    • Hiveテーブルからデータをインポートする

      Hiveテーブルからデータをインポートするには、外部Hiveテーブルを作成し、インポートコマンドを送信するときにテーブル名を指定します。

    • データのインポート中にグローバルディクショナリを作成する

      StarRocksテーブルの集計列がビットマップ列の場合、グローバルディクショナリを作成できます。 グローバルディクショナリを作成するには、次の形式でLOADコマンドにグローバルディクショナリを作成するフィールドを指定します。 フィールド名=bitmap_dict (Hiveテーブルのフィールド名)

      重要

      Hiveテーブルからデータをインポートする場合にのみ、グローバルディクショナリを作成できます。

  • サンプルコード:

    • HDFSファイルからデータをインポートするインポートジョブを作成します。

      LOAD LABEL db1.label1 
      (
          DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/starRocks/test/ml/file1")
          INTO TABLE tbl1
          COLUMNS TERMINATED BY ","
          (tmp_c1,tmp_c2)
          SET
          (
              id=tmp_c2,
              name=tmp_c1
          ),
          DATA INFILE("hdfs://emr-header-1.cluster-xxx:9000/user/starRocks/test/ml/file2")
          INTO TABLE tbl2
          COLUMNS TERMINATED BY ","
          (col1, col2)
          where col1 > 1
      )
      WITH RESOURCE 'spark0'
      (
          "spark.executor.memory" = "2g",
          "spark.shuffle.compress" = "true"
      )
      PROPERTIES
      (
          "timeout" = "3600"
      );
    • Hiveテーブルからデータをインポートするインポートジョブを作成します。

      1. Hiveリソースを作成します。

        CREATE EXTERNAL RESOURCE hive0
        properties
        (
            "type" = "hive",
            "hive.metastore.uris" = "thrift://emr-header-1.cluster-xxx:9083"
        );
      2. 外部Hiveテーブルを作成します。

        CREATE EXTERNAL TABLE hive_t1
        (
            k1 INT,
            K2 SMALLINT,
            k3 varchar(50),
            uuid varchar(100)
        )
        ENGINE=hive
        properties
        (
            "resource" = "hive0",
            "database" = "tmp",
            "table" = "t1"
        );
      3. LOADコマンドを送信して、データのインポート先のStarRocksテーブルの列が外部Hiveテーブルにも存在することを確認します。

        LOAD LABEL db1.label1
        (
            DATA FROM TABLE hive_t1
            INTO TABLE tbl1
            SET
            (
                uuid=bitmap_dict(uuid)
            )
        )
        WITH RESOURCE 'spark0'
        (
            "spark.executor.memory" = "2g",
            "spark.shuffle.compress" = "true"
        )
        PROPERTIES
        (
            "timeout" = "3600"
        );

インポートジョブを表示する

Spark LoadまたはBroker Loadを使用して送信されたインポートジョブは、非同期インポートジョブです。 SHOW LOAD コマンドでジョブラベルを使用して、インポートジョブを表示する必要があります。 このコマンドは、すべてのメソッドで送信されたインポートジョブを表示するために一般的に使用されます。 詳細については、HELP SHOW LOAD コマンドを実行してください。

サンプルコード:

show load order by createtime desc limit 1\G

返された結果:

 *************************** 1. row ***************************
  JobId: 76391
  Label: label1
  State: FINISHED
 Progress: ETL:100%; LOAD:100%
  Type: SPARK
 EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
 TaskInfo: cluster:cluster0; timeout(s):10800; max_filter_ratio:5.0E-5
 ErrorMsg: N/A
 CreateTime: 2019-07-27 11:46:42
 EtlStartTime: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:49:44
 LoadStartTime: 2019-07-27 11:49:44
LoadFinishTime: 2019-07-27 11:50:16
  URL: http://1.1.*.*:8089/proxy/application_1586619723848_0035/
 JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}

次の表に、返された結果のパラメータを示します。

パラメータ

説明

State

インポートジョブの現在のステータス。

インポートジョブが送信されると、インポートジョブはPENDING状態になります。 ETLジョブが送信されると、インポートジョブはETL状態になります。 ETLジョブが完了し、フロントエンドノードがバックエンドノードにプッシュジョブを実行するようにスケジュールすると、インポートジョブはLOADING状態になります。 プッシュジョブが完了し、StarRocksバージョンが有効になると、インポートジョブはFINISHED状態になります。

インポートジョブが完了すると、インポートジョブはCANCELLEDまたはFINISHED状態になります。 CANCELLED状態はインポートジョブが失敗したことを示し、FINISHED状態はインポートジョブが成功したことを示します。

Progress

インポートジョブの進捗状況。ETLの進捗状況とLOADの進捗状況が含まれます。 ETLの進捗状況は、ETL状態の進捗状況を示します。 LOADの進捗状況は、LOADING状態の進捗状況を示します。

LOADの進捗状況は0~100%にすることができます。 LOADの進捗状況は、次の式に基づいて計算できます。

LOADの進捗状況 = すべてのレプリカでインポートされたタブレットの数/インポートジョブのタブレットの総数 × 100%
説明
  • すべてのテーブルがインポートされた場合、LOADの進捗状況は99%です。 インポートが有効になった後にのみ、LOADの進捗状況は100%になります。

  • インポートの進捗状況は線形ではありません。 したがって、一定期間進捗状況が変化しない場合でも、インポートが一時停止されているとは限りません。

Type

インポートジョブのタイプ。 Spark Loadが使用されるシナリオでは、タイプはSPARKです。

CreateTime

インポートジョブが作成された時刻。

EtlStartTime

インポートジョブがETL状態になった時刻。

EtlFinishTime

インポートジョブがETL状態ではなくなった時刻。

LoadStartTime

インポートジョブがLOADING状態になった時刻。

LoadFinishTime

インポートジョブが完了した時刻。

JobDetails

インポートジョブの詳細。インポートされたファイルの数、インポートされたデータの合計バイト数、タスクの数、生データで処理された行数が含まれます。 例:

 {"ScannedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}

URL

アプリケーションWebページの URL 。 URL をブラウザに貼り付けて、対応するアプリケーションのWebページにアクセスできます。

返された結果のパラメータの詳細については、Broker Loadを参照してください。

ログを表示する

Spark Loadジョブが送信されると、ログが生成されます。 デフォルトでは、ログはフロントエンドノードのルートディレクトリの下のlog/spark_launcher_log ディレクトリに格納され、spark-launcher-{load-job-id}-{label}.log 形式で名前が付けられます。 ログは一時的に保持され、フロントエンドノードのメタデータのインポート情報とともに削除されます。 デフォルトでは、ログは3日間保持されます。

インポートジョブをキャンセルする

インポートジョブがCANCELLEDまたはFINISHED状態ではない場合は、必要に応じてインポートジョブをキャンセルできます。 インポートジョブをキャンセルする場合は、ジョブのラベルを指定します。 構文の詳細については、HELP CANCEL LOAD コマンドを実行してください。

関連するシステム構成

次の表に、Spark Loadを使用して送信されるインポートジョブのシステムレベルパラメータを示します。 fe.confでパラメータを変更できます。

パラメータ

説明

enable-spark-load

Spark Loadとリソース作成機能を有効にするかどうかを指定します。

デフォルト値:false。Spark Loadとリソース作成機能を無効にすることを指定します。

spark-load-default-timeout-second

タスクのデフォルトのタイムアウト期間。インポートジョブ。

単位:秒。 デフォルト値:259200。3日間を指定します。

spark-home-default-dir

Sparkクライアントが存在するディレクトリ。

デフォルト値:fe/lib/spark2x

spark-launcher-log-dir

Spark依存関係パッケージが存在するディレクトリ。

デフォルトでは、このパラメータは空のままです。

spark-launcher-log-dir

Sparkクライアントによって送信されたログが存在するディレクトリ。

デフォルト値:fe/log/spark-launcher-log

yarn-client-path

YARNクライアントのバイナリ実行可能ファイルが存在するディレクトリ。

デフォルト値:fe/lib/yarn-client/hadoop/bin/yarn

yarn-config-dir

YARNクライアントの構成ファイルが存在するディレクトリ。

デフォルト値:fe/lib/yarn-config

ベストプラクティス

Spark Loadは、数十 GB または TB のデータをHDFSファイルからインポートするシナリオに最適です。 少量のデータをインポートする場合は、Stream LoadまたはBroker Loadを使用することをお勧めします。

完全なサンプルコードについては、GitHubの03_sparkLoad2StarRocks.mdを参照してください。