すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Spark Load

最終更新日:Jan 11, 2025

このトピックでは、Spark Loadインポート方式について説明します。

背景情報

  • Spark Loadは非同期インポート方式です。MySQLプロトコルを使用してSparkタイプのインポートジョブを作成し、SHOW LOAD コマンドを実行してインポート結果を表示する必要があります。

  • Spark Loadは、Sparkクラスタのリソースを使用してインポートするデータをソートし、Dorisのバックエンド(BE)ノードを使用してファイルを直接書き込みます。これにより、大量の履歴データを移行する必要があるシナリオで、Dorisクラスタのリソース使用量と負荷を大幅に削減できます。

Sparkクラスタがなく、外部ストレージから履歴データを簡単にすばやく移行したい場合は、Broker Loadインポート方式を使用できます。詳細については、「Broker Load」をご参照ください。Spark Loadと比較して、Broker LoadはDorisクラスタのリソースをより多く消費します。

シナリオ

Spark Loadを使用すると、外部Sparkリソースを使用してインポートするデータを前処理できます。これにより、大量のデータをDorisにインポートするパフォーマンスが向上し、Dorisクラスタのコンピューティングリソースが節約されます。 Spark Loadは、主に大量のデータが初めてDorisにインポートされるシナリオで使用されます。

  • ソースデータは、Sparkからアクセスできるストレージシステム(Hadoop Distributed File System(HDFS)など)に格納されます。

  • 10 GBからテラバイト以上のデータが関係します。

説明

データ量が少なく、上記の要件を満たしていない場合は、Stream LoadまたはBroker Loadインポート方式を使用することをお勧めします。詳細については、「Stream Load」および「Broker Load」をご参照ください。

ワークフロー

MySQLクライアントを使用して、Sparkタイプのインポートジョブを送信できます。フロントエンド(FE)ノードはメタデータを記録し、送信が成功したことを示すメッセージを返します。次の情報は、Spark Loadのワークフローを示しています。

+
                 | 
            +----v----+
            |   FE    |---------------------------------+
            +----+----+                                 |
                 | 3. FE send push tasks                |
                 | 5. FE publish version                |
    +------------+------------+                         |
    |            |            |                         |
+---v---+    +---v---+    +---v---+                     |
|  BE   |    |  BE   |    |  BE   |                     |1. FE submit Spark ETL job  // FE は Spark ETL ジョブを送信します
+---^---+    +---^---+    +---^---+                     |
    |4. BE push with broker   |                         | // 4. BE は broker を使用してプッシュします
+---+---+    +---+---+    +---+---+                     |
|Broker |    |Broker |    |Broker |                     |
+---^---+    +---^---+    +---^---+                     |
    |            |            |                         |
+---+------------+------------+---+ 2.ETL +-------------v---------------+
|               HDFS              +------->       Spark cluster         | // 2. ETL Spark クラスタ
|                                 <-------+                             |
+---------------------------------+       +-----------------------------+

ワークフローは、次の5つのステージで構成されています。

  1. FEノードは、抽出、変換、ロード(ETL)ジョブをSparkクラスタにスケジュールして送信し、実行します。

  2. SparkクラスタはETLジョブを実行して、インポートするデータを前処理します。これには、BITMAPタイプのグローバル辞書の構築、データのパーティション分割、ソート、集計が含まれます。

  3. ETLジョブの実行後、FEノードは各パーティションで前処理されたデータのディレクトリを取得し、BEノードにプッシュジョブを実行するようにスケジュールします。

  4. BEノードは、brokerを使用してデータを読み取り、データ形式をDorisでサポートされているストレージ形式に変換します。

  5. FEノードはバージョンを公開し、インポートジョブを完了します。

グローバル辞書

シナリオ

  • DorisのBitmapカラムは、roaring bitmapsを使用して実現されます。 roaring bitmapsを使用してインポートされたデータは整数である必要があります。データをインポートする前にビットマップカラムを事前に計算するには、データ型をIntegerに変換します。

  • データのインポート中に、グローバル辞書はHiveテーブルに基づいて、生データとマッピングされたエンコードデータのデータ構造を格納します。

構築プロセス

  1. データソースからデータを読み取り、hive_table という名前の一時Hiveテーブルにデータを格納します。

  2. hive_table テーブルのデータを重複排除し、重複排除された生データを distinct_value_table という名前のHiveテーブルに格納します。

  3. dict_table という名前のグローバル辞書テーブルを作成し、1つのカラムに生データを、別のカラムにエンコードデータを格納します。

  4. LEFT JOIN 操作を distinct_value_table テーブルと dict_table テーブルに対して実行します。ウィンドウ関数を使用して LEFT JOIN 結果の生データをエンコードし、生データとエンコードデータの列を dict_table テーブルに書き戻します。

  5. dict_table テーブルと hive_table テーブルでJOIN操作を実行します。 hive_table テーブルの生データをエンコードされた整数に置き換えます。

  6. hive_table テーブルのデータは、後続の操作で読み取りおよび計算され、Dorisにインポートされます。

データ前処理 (DPP)

プロセス

  1. HDFSファイルまたはHiveテーブルからデータを読み取ります。

  2. データに対してフィールドマッピングと式ベースの計算を実行し、パーティション情報に基づいてデータバケットを生成するための bucket_id という名前のフィールドを作成します。

  3. Dorisテーブルのロールアップメタデータに基づいてロールアップツリーを生成します。

  4. ロールアップツリーをトラバースして、異なるレイヤーのデータを集計します。あるレイヤーのロールアップは、前のレイヤーのロールアップに基づいて計算されます。

  5. データが集計された後、データは bucket_id フィールドに基づいて異なるデータバケットに配布され、HDFSに書き込まれます。

  6. その後、brokerはHDFSからファイルを取得し、DorisクラスタのBEノードにインポートします。

Hive Bitmap UDF

Sparkを使用すると、Hiveで生成されたビットマップデータをDorisに直接インポートできます。

ETL ジョブのクラスタを構成する

Spark クラスタを構成する

Sparkは、DorisでETLジョブを実行するための外部リソースを提供します。リソース管理は、Dorisで使用されるこれらの外部リソースを管理するために導入されています。

Sparkタイプのインポートジョブを送信する前に、ETLジョブを実行するようにSparkクラスタを構成する必要があります。次のサンプルコードは、パラメータを構成する方法を示しています。詳細については、このトピックの「リソースを作成する」セクションを参照してください。

-- create spark resource // spark リソースを作成する
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
  type = spark,
  spark_conf_key = spark_conf_value,
  working_dir = path,
  broker = broker_name,
  broker.property_key = property_value,
  broker.hadoop.security.authentication = kerberos,
  broker.kerberos_principal = doris@YOUR.COM,
  broker.kerberos_keytab = /home/doris/my.keytab
  broker.kerberos_keytab_content = ASDOWHDLAWI********ALDJSDIWALD
)

-- drop spark resource // spark リソースを削除する
DROP RESOURCE resource_name

-- show resources // リソースを表示する
SHOW RESOURCES
SHOW PROC "/resources"

-- privileges // 権限
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identity
GRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name

REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identity
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name

リソースを作成する

resource_nameは、Dorisクラスタ用に構成されたSparkリソースの名前を指定するパラメータです。

PROPERTIESには、Sparkリソースの多くのパラメータが含まれています。次のパラメータが含まれています。

  • type: リソースのタイプ。このパラメータは必須です。値をsparkに設定します。

  • Spark関連のパラメータ:

    • spark.master: 必須。値をyarnまたは spark://host:port に設定します。

    • spark.submit.deployMode: 必須。Sparkプログラムのデプロイモード。 clusterモードとclientモードがサポートされています。

    • spark.hadoop.yarn.resourcemanager.address: spark.master パラメータがyarnに設定されている場合、このパラメータは必須です。

    • spark.hadoop.fs.defaultFS: spark.master パラメータがyarnに設定されている場合、このパラメータは必須です。

    • その他のパラメータはオプションです。詳細については、「Spark Configuration」をご参照ください。

  • working_dir: ETLのリソースが存在するディレクトリ。SparkリソースがETLに使用される場合、このパラメータは必須です。例: hdfs://host:port/tmp/doris

  • broker.hadoop.security.authentication: 認証方法を指定します。値をkerberosに設定します。

  • broker.kerberos_principal: Kerberos認証のプリンシパルを指定します。

  • broker.kerberos_keytab: Kerberos keytabファイルのパスを指定します。ファイルは、brokerプロセスと同じサーバー上に存在し、brokerプロセスからアクセスできる必要があります。

  • broker.kerberos_keytab_content: Kerberos keytabファイルのBase64エンコードされたコンテンツ。このパラメータまたは broker.kerberos_keytab パラメータのいずれかを指定する必要があります。

  • broker: brokerの名前。SparkリソースがETLに使用される場合、このパラメータは必須です。事前に ALTER SYSTEM ADD BROKER コマンドを実行してbrokerを構成する必要があります。

  • broker.property_key: brokerがETLの中間ファイルを読み取るときに必要な認証情報。

例:

  • YARN クラスタモード:

    CREATE EXTERNAL RESOURCE "spark0" // 外部リソース "spark0" を作成します
    PROPERTIES
    (
      "type" = "spark",
      "spark.master" = "yarn",
      "spark.submit.deployMode" = "cluster",
      "spark.jars" = "xxx.jar,yyy.jar",
      "spark.files" = "/tmp/aaa,/tmp/bbb",
      "spark.executor.memory" = "1g",
      "spark.yarn.queue" = "queue0",
      "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
      "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
      "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
      "broker" = "broker0",
      "broker.username" = "user0",
      "broker.password" = "password0"
    );
  • Spark スタンドアロン クライアントモード:

    CREATE EXTERNAL RESOURCE "spark1" // 外部リソース "spark1" を作成します
    PROPERTIES
    (
      "type" = "spark",
      "spark.master" = "spark://127.0.0.1:7777",
      "spark.submit.deployMode" = "client",
      "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
      "broker" = "broker1"
    );

サポートされている Kerberos 認証

Spark Loadを使用してKerberos認証でHadoopクラスタにアクセスする場合、Sparkリソースを作成するときに次のパラメータのみを指定する必要があります。

  • broker.hadoop.security.authentication: 認証方法を指定します。値をkerberosに設定します。

  • broker.kerberos_principal: Kerberos認証のプリンシパルを指定します。

  • broker.kerberos_keytab: Kerberos keytabファイルのパスを指定します。ファイルは、brokerプロセスと同じサーバー上に存在し、brokerプロセスからアクセスできる必要があります。

  • broker.kerberos_keytab_content: Kerberos keytabファイルのBase64エンコードされたコンテンツ。このパラメータまたは kerberos_keytab パラメータのいずれかを指定する必要があります。

例:

CREATE EXTERNAL RESOURCE "spark_on_kerberos" // 外部リソース "spark_on_kerberos" を作成します
PROPERTIES
(
  "type" = "spark",
  "spark.master" = "yarn",
  "spark.submit.deployMode" = "cluster",
  "spark.jars" = "xxx.jar,yyy.jar",
  "spark.files" = "/tmp/aaa,/tmp/bbb",
  "spark.executor.memory" = "1g",
  "spark.yarn.queue" = "queue0",
  "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
  "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
  "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
  "broker" = "broker0",
  "broker.hadoop.security.authentication" = "kerberos",
  "broker.kerberos_principal" = "doris@YOUR.COM",
  "broker.kerberos_keytab" = "/home/doris/my.keytab"
);

リソースを表示する

  • 通常のアカウントを使用して、USAGE_PRIV権限を持っているリソースのみを表示できます。

  • rootまたはadminアカウントを使用して、すべてのリソースを表示できます。

リソースの権限を管理する

GRANT REVOKEコマンドを実行して、リソースの権限を管理できます。 USAGE_PRIV権限のみがサポートされています。

ユーザーまたはロールにUSAGE_PRIV権限を付与できます。ロールは通常どおり割り当てることができます。

-- Grant the USAGE_PRIV permission on the resource spark0 to the user user0: // リソース spark0 の USAGE_PRIV 権限をユーザー user0 に付与します。
GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";

-- Grant the USAGE_PRIV permission on the resource spark0 to the role role0: // リソース spark0 の USAGE_PRIV 権限をロール role0 に付与します。
GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0";

-- Grant the USAGE_PRIV permission on all resources to the user user0: // すべてのリソースの USAGE_PRIV 権限をユーザー user0 に付与します。
GRANT USAGE_PRIV ON RESOURCE * TO "user0"@"%";

-- Grant the USAGE_PRIV permission on all resources to the role role0: // すべてのリソースの USAGE_PRIV 権限をロール role0 に付与します。
GRANT USAGE_PRIV ON RESOURCE * TO ROLE "role0";

-- Revoke the USAGE_PRIV permission on the resource spark0 from the user user0: // リソース spark0 の USAGE_PRIV 権限をユーザー user0 から取り消します。
REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";

Spark クライアントを構成する

基盤となるFEノードは、spark-submit コマンドを実行してSpark Loadジョブを送信します。したがって、FEノードのSparkクライアントを構成する必要があります。Spark ダウンロード URL をクリックして、バージョン 2.4.5 以降の Spark 2.x をダウンロードすることをお勧めします。次に、次の手順を実行して Spark クライアントを構成します。

  1. spark_home 環境変数を構成する

    SparkクライアントをFEノードが存在するディレクトリに格納します。FEノードの構成ファイルの spark_home_default_dir パラメータをディレクトリに設定します。このパラメータのデフォルト値は、FEノードのルートディレクトリの lib/spark2x です。このパラメータを空にすることはできません。

  2. Spark 依存関係パッケージを構成する

    Sparkクライアントの jars フォルダにあるすべてのJARパッケージをZIPファイルにパッケージ化します。FEノードの構成ファイルの spark_resource_path パラメータをZIPファイルのディレクトリに設定します。このパラメータが空の場合、FEノードはFEノードのルートディレクトリにある lib/spark2x/jars/spark-2x.zip ファイルを検索します。ファイルが見つからない場合は、ファイルが存在しないことを示すエラーメッセージが返されます。

    Spark Loadジョブが送信されると、アーカイブされた依存関係パッケージはリモートウェアハウスにアップロードされます。デフォルトでは、リモートウェアハウスは working_dir/{cluster_id} ディレクトリにあり、_spark_repository_{resource_name} 形式で名前が付けられます。 {resource-name} フィールドは、リモートウェアハウスに対応するリソースを指定します。次のサンプルコードは、リモートウェアハウスのディレクトリ構造の例を示しています。

    __spark_repository__spark0/
        |-__archive_1.0.0/
        |        |-__lib_990325d2c0d1d5e45bf675e54e44fb16_spark-dpp-1.0.0-jar-with-dependencies.jar
        |        |-__lib_7670c29daf535efe3c9b923f778f61fc_spark-2x.zip
        |-__archive_1.1.0/
        |        |-__lib_64d5696f99c379af2bee28c1c84271d5_spark-dpp-1.1.0-jar-with-dependencies.jar
        |        |-__lib_1bbb74bb6b264a270bc7fca3e964160f_spark-2x.zip
        |-__archive_1.2.0/
        |        |-...
    説明

    デフォルトでは、Spark依存関係パッケージの名前は spark-2x.zip です。FEノードは、Dynamic Partition Pruning (DPP) の依存関係パッケージもリモートウェアハウスにアップロードします。Spark Loadジョブによって送信されたすべての依存関係ファイルがすでにリモートウェアハウスに存在する場合、依存関係をアップロードする必要がなくなるため、時間を節約できます。

YARN クライアントを構成する

  1. 基盤となるFEノードは、YARN コマンドを実行して、実行中のアプリケーションのステータスを取得し、アプリケーションを終了します。したがって、FEノードのYARNクライアントを構成する必要があります。バージョン 2.5.2 以降の Hadoop 2.0 を使用することをお勧めします。Hadoopクライアントのダウンロード方法の詳細については、Hadoop ダウンロード URL をクリックしてください。

  2. YARNクライアントをFEノードが存在するディレクトリに格納します。FEノードの構成ファイルの yarn_client_path パラメータを、YARNクライアントのバイナリ実行可能ファイルが存在するディレクトリに設定します。このパラメータのデフォルト値は、FEノードのルートディレクトリの lib/yarn-client/hadoop/bin/yarn です。

  3. オプション。FEノードがアプリケーションのステータスを取得したり、アプリケーションを終了したりすると、YARNコマンドを実行するために使用される構成ファイルが lib/yarn-config パスに生成されます。デフォルトでは、lib/yarn-configパスはFEノードのルートディレクトリにあります。FEノードの構成ファイルの yarn_config_dir パラメータを変更することで、構成ファイルのパスを変更できます。生成される構成ファイルには、core-site.xmlyarn-site.xml が含まれます。

インポートジョブを作成する

このセクションでは、Spark Loadジョブの構文のパラメータと使用上の注意について説明します。

説明

構文の詳細については、HELP SPARK LOAD コマンドを実行してください。

  • Spark Load インポートジョブの構文

    LOAD LABEL load_label
        (data_desc, ...)
        WITH RESOURCE resource_name
        [resource_properties]
        [PROPERTIES (key1=value1, ... )]
    
    * load_label:
        db_name.label_name
    
    * data_desc:
        DATA INFILE ('file_path', ...)
        [NEGATIVE]
        INTO TABLE tbl_name
        [PARTITION (p1, p2)]
        [COLUMNS TERMINATED BY separator ]
        [(col1, ...)]
        [COLUMNS FROM PATH AS (col2, ...)]
        [SET (k1=f1(xx), k2=f2(xx))]
        [WHERE predicate]
    
        DATA FROM TABLE hive_external_tbl
        [NEGATIVE]
        INTO TABLE tbl_name
        [PARTITION (p1, p2)]
        [SET (k1=f1(xx), k2=f2(xx))]
        [WHERE predicate]
    
    * resource_properties:
        (key2=value2, ...)
  • 例 1: データソースが HDFS ファイルの場合

    LOAD LABEL db1.label1
    (
        DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1") // データファイル "hdfs://abc.com:8888/user/palo/test/ml/file1"
        INTO TABLE tbl1
        COLUMNS TERMINATED BY ","
        (tmp_c1,tmp_c2)
        SET
        (
            id=tmp_c2,
            name=tmp_c1
        ),
        DATA INFILE("hdfs://abc.com:8880:8888/user/palo/test/ml/file2") // データファイル "hdfs://abc.com:8888/user/palo/test/ml/file2"
        INTO TABLE tbl2
        COLUMNS TERMINATED BY ","
        (col1, col2)
        where col1 > 1
    )
    WITH RESOURCE 'spark0'
    (
        "spark.executor.memory" = "2g",
        "spark.shuffle.compress" = "true"
    )
    PROPERTIES
    (
        "timeout" = "3600"
    );
  • 例 2: データソースが Hive テーブルの場合

    1. 外部Hiveテーブルを作成します。

      CREATE EXTERNAL TABLE hive_t1 // 外部テーブル hive_t1 を作成します
      (
          k1 INT,
          K2 SMALLINT,
          k3 varchar(50),
          uuid varchar(100)
      )
      ENGINE=hive
      properties
      (
      "database" = "tmp",
      "table" = "t1",
      "hive.metastore.uris" = "thrift://0.0.0.0:8080"
      );
                                          
    2. LOADコマンドを送信して、データをインポートするDorisテーブルのカラムが外部Hiveテーブルにも存在することを確認します。

      LOAD LABEL db1.label1
      (
          DATA FROM TABLE hive_t1 // テーブル hive_t1 からのデータ
          INTO TABLE tbl1
          SET
          (
              uuid=bitmap_dict(uuid)
          )
      )
      WITH RESOURCE 'spark0'
      (
          "spark.executor.memory" = "2g",
          "spark.shuffle.compress" = "true"
      )
      PROPERTIES
      (
          "timeout" = "3600"
      );
  • 例 3: データソースが Hive テーブルの BINARY タイプのデータの場合

    1. 外部Hiveテーブルを作成します。

      CREATE EXTERNAL TABLE hive_t1 // 外部テーブル hive_t1 を作成します
      (
          k1 INT,
          K2 SMALLINT,
          k3 varchar(50),
          uuid varchar(100) // Hive テーブルの BINARY タイプのデータ。
      )
      ENGINE=hive
      properties
      (
      "database" = "tmp",
      "table" = "t1",
      "hive.metastore.uris" = "thrift://0.0.0.0:8080"
      );
    2. LOADコマンドを送信して、データをインポートするDorisテーブルのカラムが外部Hiveテーブルにも存在することを確認します。

      LOAD LABEL db1.label1
      (
          DATA FROM TABLE hive_t1 // テーブル hive_t1 からのデータ
          INTO TABLE tbl1
          SET
          (
              uuid=binary_bitmap(uuid)
          )
      )
      WITH RESOURCE 'spark0'
      (
          "spark.executor.memory" = "2g",
          "spark.shuffle.compress" = "true"
      )
      PROPERTIES
      (
          "timeout" = "3600"
      );
  • 例 4: パーティション分割された Hive テーブルからデータをインポートする

    • Hive テーブル作成ステートメント

      create table test_partition( // テーブル test_partition を作成します
          id int,
          name string,
          age int
      )
      partitioned by (dt string) // dt 文字列でパーティション分割
      row format delimited fields terminated by ',' // カンマ区切りのフィールド
      stored as textfile; // テキストファイルとして保存
                                          
    • Doris テーブル作成ステートメント

      CREATE TABLE IF NOT EXISTS test_partition_04 // テーブル test_partition_04 を作成します(存在しない場合)
      (
          dt date,
          id int,
          name string,
          age int
      )
      UNIQUE KEY(`dt`, `id`)
      DISTRIBUTED BY HASH(`id`) BUCKETS 1
      PROPERTIES (
          "replication_allocation" = "tag.location.default: 1"
      );
    • Spark Load インポートステートメント

      CREATE EXTERNAL RESOURCE "spark_resource" // 外部リソース "spark_resource" を作成します
      PROPERTIES
      (
      "type" = "spark",
      "spark.master" = "yarn",
      "spark.submit.deployMode" = "cluster",
      "spark.executor.memory" = "1g",
      "spark.yarn.queue" = "default",
      "spark.hadoop.yarn.resourcemanager.address" = "localhost:50056",
      "spark.hadoop.fs.defaultFS" = "hdfs://localhost:9000",
      "working_dir" = "hdfs://localhost:9000/tmp/doris",
      "broker" = "broker_01"
      );
      LOAD LABEL demo.test_hive_partition_table_18
      (
          DATA INFILE("hdfs://localhost:9000/user/hive/warehouse/demo.db/test/dt=2022-08-01/*") // データファイル "hdfs://localhost:9000/user/hive/warehouse/demo.db/test/dt=2022-08-01/*"
          INTO TABLE test_partition_04
          COLUMNS TERMINATED BY ","
          FORMAT AS "csv"
          (id,name,age)
          COLUMNS FROM PATH AS (`dt`)
          SET
          (
              dt=dt,
              id=id,
              name=name,
              age=age
          )
      )
      WITH RESOURCE 'spark_resource'
      (
          "spark.executor.memory" = "1g",
          "spark.shuffle.compress" = "true"
      )
      PROPERTIES
      (
          "timeout" = "3600"
      );

パラメータと関連する説明

  • データ記述のパラメータ

    サポートされているデータソースは、CSVファイルとHiveテーブルのみです。その他のルールは、「Broker Load」のルールと一致しています。

  • インポートジョブのパラメータ

    Spark Loadインポートジョブのパラメータは、opt_propertiesパラメータに含まれています。これらのパラメータは、インポートジョブに適用されます。その他のルールは、「Broker Load」のルールと一致しています。

  • Spark リソースのパラメータ

    Sparkリソースのパラメータは、事前にDorisクラスタで構成する必要があります。 Spark Loadは、ユーザーにUSAGE_PRIV権限が付与された後にのみ使用できます。

    ジョブのリソースを追加するためにSpark構成を変更するなど、一時的な要件がある場合は、次の例を参照できます。

    WITH RESOURCE 'spark0'
    (
      "spark.driver.memory" = "1g",
      "spark.executor.memory" = "3g"
    )
    説明

    変更はこのジョブにのみ有効で、Dorisクラスタの既存の構成には影響しません。

  • Hive テーブルからデータをインポートする

    Hiveテーブルからデータをインポートするには、外部Hiveテーブルを作成し、インポートコマンドを送信するときにテーブル名を指定します。

  • データインポート中にグローバル辞書を作成する

    Dorisテーブルの集計カラムがビットマップカラムの場合、グローバル辞書を作成できます。グローバル辞書を作成するには、LOAD コマンドで、次の形式でグローバル辞書を作成するフィールドを指定します。Doris フィールド名=bitmap_dict。 binary_dictをHiveテーブルのフィールド名に置き換えます。

    重要

    Hiveテーブルからデータをインポートする場合にのみ、グローバル辞書を作成できます。

  • データ型が BINARY の Hive カラムのデータをインポートする

    これは、Dorisテーブルの集計カラムがビットマップカラムであり、ソースHiveテーブルの対応するカラムのデータ型がBINARYであり、FEノードのspark-dppの org.apache.doris.load.loadv2.dpp.BitmapValue クラスを使用してシリアル化されているシナリオに適用されます。

    グローバル辞書を作成するには、LOAD コマンドで、次の形式で対応するフィールドを指定します。Doris フィールド名=binary_bitmap。 binary_bitmapをHiveテーブルのフィールド名に置き換えます。

    重要

    データソースがHiveテーブルの場合にのみ、BINARYタイプのデータをインポートできます。

インポートジョブを表示する

Spark Loadのインポート方法は、Broker Loadと同じく非同期です。したがって、インポートジョブのラベルを記録し、SHOW LOADコマンドでラベルを使用してインポート結果を表示する必要があります。このコマンドは、すべての方法で送信されたインポートジョブを表示するために一般的に使用されます。詳細については、HELP SHOW LOAD コマンドを実行してください。例:

show load order by createtime desc limit 1\G

次の出力が返されます。

*************************** 1. row ***************************
         JobId: 76391
         Label: label1
         State: FINISHED // 状態: 完了
      Progress: ETL:100%; LOAD:100% // 進捗: ETL:100%; LOAD:100%
          Type: SPARK // タイプ: SPARK
       EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
      TaskInfo: cluster:cluster0; timeout(s):10800; max_filter_ratio:5.0E-5
      ErrorMsg: N/A
    CreateTime: 2019-07-27 11:46:42 // 作成時間: 2019-07-27 11:46:42
  EtlStartTime: 2019-07-27 11:46:44 // ETL開始時間: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:49:44 // ETL終了時間: 2019-07-27 11:49:44
 LoadStartTime: 2019-07-27 11:49:44 // ロード開始時間: 2019-07-27 11:49:44
LoadFinishTime: 2019-07-27 11:50:16 // ロード終了時間: 2019-07-27 11:50:16
           URL: http://1.1.*.*:80**/proxy/application_15866****3848_0035/
    JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}

返された結果のパラメータの詳細については、「Broker Load」を参照してください。次の情報は、2つのインポート方法で返される結果のパラメータの違いについて説明しています。

  • Stateパラメータは、インポートジョブのステータスを示します。インポートジョブが送信されると、インポートジョブはPENDING状態になります。ETLジョブが送信されると、インポートジョブはETL状態になります。ETLジョブが完了し、FEノードがBEノードにプッシュジョブを実行するようにスケジュールすると、インポートジョブはLOADING状態になります。プッシュジョブが完了し、Dorisバージョンが公開されると、インポートジョブはFINISHED状態になります。

    インポートジョブが完了すると、インポートジョブはCANCELLEDまたはFINISHED状態になります。インポートジョブは、これらの2つの状態のいずれかになると完了です。 CANCELLED状態は、インポートジョブが失敗したことを示し、FINISHED状態は、インポートジョブが成功したことを示します。

  • Progressパラメータは、インポートジョブの進捗状況を示します。これには、ETLの進捗状況とLOADの進捗状況が含まれます。 ETLの進捗状況は、ETL状態の進捗状況を示します。 LOADの進捗状況は、LOADING状態の進捗状況を示します。

    LOAD progress = Number of imported tablets in all replicas/Total number of tablets in the import job × 100%   // LOAD の進捗 = すべてレプリカでインポートされたタブレット数 / インポートジョブのタブレット総数 × 100%
    

    LOADの進捗状況は、0~100%になります。すべてのテーブルがインポートされると、LOADの進捗状況は99%になります。インポートが有効になった後にのみ、LOADの進捗状況は100%になります。

    説明

    インポートの進捗状況は線形ではありません。したがって、一定期間進捗状況が変化しない場合でも、インポートが必ずしも一時停止しているとは限りません。

  • Typeパラメータは、インポートジョブのタイプを示します。Spark Loadが使用されるシナリオでは、タイプはSPARKです。

  • 次の情報は、結果の他のいくつかのパラメータについて説明しています。

    • CreateTime: インポートジョブが作成された時刻。

    • EtlStartTime: インポートジョブがETL状態になった時刻。

    • EtlFinishTime: インポートジョブがETL状態ではなくなった時刻。

    • LoadStartTime: インポートジョブがLOADING状態になった時刻。

    • LoadFinishTime: インポートジョブが完了した時刻。

  • JobDetailsパラメータは、インポートジョブの実行ステータスの詳細を示します。このパラメータは、インポートジョブがETL状態ではなくなったときに更新されます。詳細は、インポートされたファイルの数、インポートされたファイルの合計サイズ、タスクの数、および処理された生データ行の数が含まれます。例: {"ScannedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}

  • URLパラメータは、アプリケーションWebページのURLを示します。URLをブラウザに貼り付けて、対応するアプリケーションのWebページにアクセスできます。

ログを表示する

Spark Loadジョブが送信されると、ログが生成されます。デフォルトでは、ログはFEノードのルートディレクトリの log/spark_launcher_log パスに格納され、spark_launcher_{load_job_id}_{label}.log という形式で名前が付けられます。

説明

ログは一定期間このディレクトリに保存され、FEノードのメタデータにあるインポート情報とともに削除されます。デフォルトの保存期間は3日間です。

インポートジョブをキャンセルする

インポートジョブがCANCELLEDまたはFINISHED状態ではない場合、必要に応じてインポートジョブをキャンセルできます。インポートジョブをキャンセルするときは、ジョブのラベルを指定します。インポートジョブをキャンセルする構文の詳細については、HELP CANCEL LOAD コマンドを実行してください。

関連するシステム構成

このセクションでは、Spark Loadを使用して送信されたインポートジョブのシステムレベルのパラメータについて説明します。パラメータはすべてのSpark Loadインポートジョブに適用されます。 fe.conf ファイルでパラメータを変更できます。

  • enable_spark_load: Spark Loadとリソース作成機能を有効にするかどうかを指定します。デフォルト値: false。Spark Loadとリソース作成機能を無効にすることを指定します。

  • spark_load_default_timeout_second: インポートジョブのデフォルトのタイムアウト期間。単位: 秒。デフォルト値: 259200。3日間を指定します。

  • spark_home_default_dirspark: Sparkクライアントが存在するディレクトリ。デフォルト値: fe/lib/spark2x

  • spark_resource_path: Spark依存関係パッケージが存在するディレクトリ。デフォルトでは、このパラメータは空です。

  • spark_launcher_log_dirspark: Sparkクライアントによって送信されたログが存在するディレクトリ。デフォルト値: fe/log/spark_launcher_log

  • yarn_client_pathyarn: YARNクライアントのバイナリ実行可能ファイルが存在するディレクトリ。デフォルト値: fe/lib/yarn-client/hadoop/bin/yarn

  • yarn_config_diryarn: 構成ファイルが生成されるパスは、fe/lib/yarn-config です。