E-MapReduce: Spark Load

Last Updated: Apr 28, 2024

This topic describes the Spark Load import method.

Background information

  • Spark Load is an asynchronous import method. You must create import jobs of the Spark type by using the MySQL protocol and view import results by running the SHOW LOAD command.

  • Spark Load sorts the data to be imported by using the resources of a Spark cluster, and the backend (BE) nodes of Doris write the sorted files directly. This greatly reduces the resource usage and load on the Doris cluster in scenarios where massive amounts of historical data need to be migrated.

If you do not have a Spark cluster and want to quickly migrate historical data from external storage with ease, you can use the Broker Load import method. For more information, see Broker Load. Compared with Spark Load, Broker Load consumes more resources of Doris clusters.

Scenarios

Spark Load allows you to use external Spark resources to preprocess data to be imported. This improves the performance of importing a large amount of data to Doris and saves the computing resources of Doris clusters. Spark Load is mainly used in scenarios where a large amount of data is imported to Doris for the first time.

  • Source data is stored in a storage system that you can access from Spark, such as Hadoop Distributed File System (HDFS).

  • The amount of data ranges from tens of gigabytes to terabytes.

Note

If the amount of data is small or does not meet the preceding requirements, we recommend that you use the Stream Load or Broker Load import method. For more information, see Stream Load and Broker Load.

Workflow

You can submit an import job of the Spark type by using the MySQL client. The frontend (FE) node records the metadata and returns a message indicating that the submission is successful. The following figure shows the workflow of Spark Load.

                 +
                 | 0. User submits a Spark Load job
            +----v----+
            |   FE    |---------------------------------+
            +----+----+                                 |
                 | 3. FE send push tasks                |
                 | 5. FE publish version                |
    +------------+------------+                         |
    |            |            |                         |
+---v---+    +---v---+    +---v---+                     |
|  BE   |    |  BE   |    |  BE   |                     |1. FE submit Spark ETL job
+---^---+    +---^---+    +---^---+                     |
    |4. BE push with broker   |                         |
+---+---+    +---+---+    +---+---+                     |
|Broker |    |Broker |    |Broker |                     |
+---^---+    +---^---+    +---^---+                     |
    |            |            |                         |
+---+------------+------------+---+ 2.ETL +-------------v---------------+
|               HDFS              +------->       Spark cluster         |
|                                 <-------+                             |
+---------------------------------+       +-----------------------------+

The workflow consists of the following five stages:

  1. The FE node schedules and submits an extract, transform, and load (ETL) job to a Spark cluster for execution.

  2. The Spark cluster runs the ETL job to preprocess the data to be imported, including building a global dictionary of the BITMAP type, partitioning, sorting, and aggregating data.

  3. After the ETL job is run, the FE node obtains the directory of the preprocessed data in each partition and schedules BE nodes to run a push job.

  4. The BE nodes read data by using brokers and convert the data format into the storage format supported by Doris.

  5. The FE node publishes a version and completes the import job.

Global dictionary

Scenarios

  • Bitmap columns in Doris are implemented by using Roaring bitmaps, which accept only integer input. Therefore, if you want to precompute bitmap columns during data import, you must convert the data type of the source data to integer.

  • During data import, a global dictionary stores the mappings from raw data to encoded integers in a data structure that is based on a Hive table.

Build process

  1. Read data from a data source and store the data in a temporary Hive table named hive_table.

  2. Deduplicate data in the hive_table table and store the deduplicated raw data in a Hive table named distinct_value_table.

  3. Create a global dictionary table named dict_table to store raw data in a column and encoded data in another column.

  4. Perform a LEFT JOIN operation on the distinct_value_table and dict_table tables. Use a window function to encode the raw data in the LEFT JOIN results, and write the columns of raw data and encoded data back to the dict_table table, as sketched after this list.

  5. Perform a JOIN operation on the dict_table and hive_table tables. Replace the raw data in the hive_table table with the encoded integers.

  6. In subsequent operations, data is read from the hive_table table, processed, and then imported into Doris.
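
The following SQL is a conceptual sketch of step 4 only, not the statement that Spark Load actually runs. The table names follow the build process described above, and the column names raw_value and encoded_value are assumptions made for illustration.

-- Conceptual sketch: encode the raw values that do not yet exist in dict_table
-- and append the new mappings. The column names raw_value and encoded_value
-- are hypothetical.
INSERT INTO TABLE dict_table
SELECT
    d.raw_value,
    -- Number the new values with a window function and offset them by the
    -- current maximum code so that existing codes are not reused.
    ROW_NUMBER() OVER (ORDER BY d.raw_value) + m.max_code AS encoded_value
FROM distinct_value_table d
CROSS JOIN (
    SELECT COALESCE(MAX(encoded_value), 0) AS max_code FROM dict_table
) m
LEFT JOIN dict_table dict
    ON d.raw_value = dict.raw_value
WHERE dict.encoded_value IS NULL;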

Data preprocessing (DPP)

Process

  1. Read data from an HDFS file or a Hive table.

  2. Perform field mappings and expression-based calculations on the data and create a field called bucket_id for generating data buckets based on the partition information.

  3. Generate rollup trees based on the rollup metadata of Doris tables.

  4. Traverse rollup trees to aggregate data in different layers. Rollups in a layer are calculated based on rollups in the previous layer.

  5. After the data is aggregated, it is distributed to different data buckets based on the bucket_id field and written to HDFS, as illustrated in the sketch after this list.

  6. Brokers then pull the files from HDFS and import them into the BE nodes of the Doris cluster.
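
The following SQL is a conceptual illustration of the bucketing idea, not the logic that the DPP job actually runs. It assumes a hypothetical source table named source_table with columns id, name, and age, and a Doris table that is hash-distributed by the id column into 3 buckets. The actual DPP job computes bucket IDs in Spark by using the hashing and bucketing rules of Doris.

-- Conceptual illustration only: derive a bucket_id for each row by hashing
-- the distribution column and taking the result modulo the bucket count.
-- source_table and its columns are hypothetical.
SELECT
    id,
    name,
    age,
    pmod(hash(id), 3) AS bucket_id
FROM source_table;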

Hive Bitmap UDF

Spark Load allows you to directly import bitmap data that is generated in Hive into Doris.

Configure clusters for ETL jobs

Configure a Spark cluster

Spark serves as an external computing resource that Doris uses to run ETL jobs. The Resource Management feature is introduced to manage the external resources that are used by Doris.

Before you submit an import job of the Spark type, you must configure a Spark cluster to run the ETL job. The following sample code shows how to configure the parameters. For more information, see the "Create a resource" section of this topic.

-- create spark resource
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
  type = spark,
  spark_conf_key = spark_conf_value,
  working_dir = path,
  broker = broker_name,
  broker.property_key = property_value,
  broker.hadoop.security.authentication = kerberos,
  broker.kerberos_principal = doris@YOUR.COM,
  broker.kerberos_keytab = /home/doris/my.keytab,
  broker.kerberos_keytab_content = ASDOWHDLAWI********ALDJSDIWALD
)

-- drop spark resource
DROP RESOURCE resource_name

-- show resources
SHOW RESOURCES
SHOW PROC "/resources"

-- privileges
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identity
GRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name

REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identity
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name

Create a resource

resource_name specifies the name of the Spark resource that is configured in the Doris cluster.

PROPERTIES contains a number of parameters of the Spark resource. The following parameters are included:

  • type: the type of the resource. This parameter is required. Set the value to spark.

  • Spark-related parameters:

    • spark.master: required. Set the value to yarn or spark://host:port.

    • spark.submit.deployMode: required. The deployment mode of the Spark program. The cluster and client modes are supported.

    • spark.hadoop.yarn.resourcemanager.address: If the spark.master parameter is set to yarn, this parameter is required.

    • spark.hadoop.fs.defaultFS: If the spark.master parameter is set to yarn, this parameter is required.

    • Other parameters are optional. For more information, see Spark Configuration.

  • working_dir: the directory used by the ETL job. This parameter is required if the Spark resource is used for ETL. Example: hdfs://host:port/tmp/doris.

  • broker.hadoop.security.authentication: specifies the authentication method. Set the value to kerberos.

  • broker.kerberos_principal: specifies the principal for Kerberos authentication.

  • broker.kerberos_keytab: specifies the path of the Kerberos keytab file. The file must reside on the same server as the broker processes and can be accessed by the broker processes.

  • broker.kerberos_keytab_content: the Base64-encoded content of the Kerberos keytab file. You need to specify either this parameter or the broker.kerberos_keytab parameter.

  • broker: the name of the broker. This parameter is required if the Spark resource is used for ETL. You must run the ALTER SYSTEM ADD BROKER command to configure the broker in advance.

  • broker.property_key: the authentication information that is required when the broker reads the intermediate files generated by the ETL job.

Examples:

  • YARN cluster mode:

    CREATE EXTERNAL RESOURCE "spark0"
    PROPERTIES
    (
      "type" = "spark",
      "spark.master" = "yarn",
      "spark.submit.deployMode" = "cluster",
      "spark.jars" = "xxx.jar,yyy.jar",
      "spark.files" = "/tmp/aaa,/tmp/bbb",
      "spark.executor.memory" = "1g",
      "spark.yarn.queue" = "queue0",
      "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
      "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
      "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
      "broker" = "broker0",
      "broker.username" = "user0",
      "broker.password" = "password0"
    );
  • Spark standalone client mode:

    CREATE EXTERNAL RESOURCE "spark1"
    PROPERTIES
    (
      "type" = "spark",
      "spark.master" = "spark://127.0.0.1:7777",
      "spark.submit.deployMode" = "client",
      "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
      "broker" = "broker1"
    );

Supported Kerberos authentication

If Spark Load is used to access a Hadoop cluster with Kerberos authentication, you only need to specify the following parameters when you create a Spark resource:

  • broker.hadoop.security.authentication: specifies the authentication method. Set the value to kerberos.

  • broker.kerberos_principal: specifies the principal for Kerberos authentication.

  • broker.kerberos_keytab: specifies the path of the Kerberos keytab file. The file must reside on the same server as the broker processes and can be accessed by the broker processes.

  • broker.kerberos_keytab_content: the Base64-encoded content of the Kerberos keytab file. You need to specify either this parameter or the broker.kerberos_keytab parameter.

Example:

CREATE EXTERNAL RESOURCE "spark_on_kerberos"
PROPERTIES
(
  "type" = "spark",
  "spark.master" = "yarn",
  "spark.submit.deployMode" = "cluster",
  "spark.jars" = "xxx.jar,yyy.jar",
  "spark.files" = "/tmp/aaa,/tmp/bbb",
  "spark.executor.memory" = "1g",
  "spark.yarn.queue" = "queue0",
  "spark.hadoop.yarn.resourcemanager.address" = "127.0.0.1:9999",
  "spark.hadoop.fs.defaultFS" = "hdfs://127.0.0.1:10000",
  "working_dir" = "hdfs://127.0.0.1:10000/tmp/doris",
  "broker" = "broker0",
  "broker.hadoop.security.authentication" = "kerberos",
  "broker.kerberos_principal" = "doris@YOUR.COM",
  "broker.kerberos_keytab" = "/home/doris/my.keytab"
);

View resources

  • If you use an ordinary account, you can view only the resources on which you have the USAGE_PRIV permission.

  • If you use the root or admin account, you can view all resources.

Manage permissions on resources

You can run the GRANT and REVOKE commands to manage permissions on resources. Only the USAGE_PRIV permission is supported.

You can grant the USAGE_PRIV permission to a user or a role. Roles are used in the same way as in regular permission management.

-- Grant the USAGE_PRIV permission on the resource spark0 to the user user0:
GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";

-- Grant the USAGE_PRIV permission on the resource spark0 to the role role0:
GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0";

-- Grant the USAGE_PRIV permission on all resources to the user user0:
GRANT USAGE_PRIV ON RESOURCE * TO "user0"@"%";

-- Grant the USAGE_PRIV permission on all resources to the role role0:
GRANT USAGE_PRIV ON RESOURCE * TO ROLE "role0";

-- Revoke the USAGE_PRIV permission on the resource spark0 from the user user0:
REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";
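
Similarly, based on the command template in the preceding section, you can revoke the USAGE_PRIV permission on a resource from a role:

-- Revoke the USAGE_PRIV permission on the resource spark0 from the role role0:
REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM ROLE "role0";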

Configure the Spark client

The underlying FE node runs the spark-submit command to submit Spark Load jobs. Therefore, you must configure the Spark client for the FE node. We recommend that you click Spark download URL to download Spark 2.4.5 or a later official Spark 2.x version. Then, perform the following steps to configure the Spark client.

  1. Configure the spark_home environment variable

    Store the Spark client in a directory on the server where the FE node resides, and set the spark_home_default_dir parameter in the configuration file of the FE node to that directory. The default value of this parameter is lib/spark2x in the root directory of the FE node. This parameter cannot be left empty.

  2. Configure the Spark dependency package

    Package all JAR files in the jars folder of the Spark client into a ZIP file. Set the spark_resource_path parameter in the configuration file of the FE node to the path of the ZIP file. If this parameter is left empty, the FE node searches for the lib/spark2x/jars/spark-2x.zip file in the root directory of the FE node. If the file is not found, an error message indicating that the file does not exist is returned.

    When a Spark Load job is submitted, the archived dependency packages are uploaded to a remote repository. By default, the remote repository resides in the working_dir/{cluster_id} directory and is named in the __spark_repository__{resource_name} format, where {resource_name} is the name of the resource that corresponds to the remote repository. The following sample code provides an example of the directory structure of a remote repository:

    __spark_repository__spark0/
        |-__archive_1.0.0/
        |        |-__lib_990325d2c0d1d5e45bf675e54e44fb16_spark-dpp-1.0.0-jar-with-dependencies.jar
        |        |-__lib_7670c29daf535efe3c9b923f778f61fc_spark-2x.zip
        |-__archive_1.1.0/
        |        |-__lib_64d5696f99c379af2bee28c1c84271d5_spark-dpp-1.1.0-jar-with-dependencies.jar
        |        |-__lib_1bbb74bb6b264a270bc7fca3e964160f_spark-2x.zip
        |-__archive_1.2.0/
        |        |-...
    Note

    By default, the Spark dependency package is named spark-2x.zip. The FE node also uploads the dependency package of the data preprocessing (DPP) module to the remote repository. If all the dependency files submitted by a Spark Load job already exist in the remote repository, the dependencies do not need to be uploaded again. This saves the time that is otherwise spent on repeatedly uploading large files.

Configure the YARN client

  1. The underlying FE node runs YARN commands to obtain the status of running applications and to terminate applications. Therefore, you must configure the YARN client for the FE node. We recommend that you use Hadoop 2.5.2 or a later official Hadoop 2.x version. For more information about how to download the Hadoop client, click Hadoop download URL.

  2. Store the YARN client in the directory in which the FE node resides. Set the yarn_client_path parameter in the configuration file of the FE node to the directory in which the binary executable file of the YARN client resides. The default value of this parameter is lib/yarn-client/hadoop/bin/yarn in the root directory of the FE node.

  3. Optional. When the FE node obtains the status of applications or terminates applications, configuration files used to run YARN commands are generated in the lib/yarn-config path. By default, the lib/yarn-config path is in the root directory of the FE node. You can change the path of the configuration files by modifying the yarn_config_dir parameter in the configuration file of the FE node. The generated configuration files include core-site.xml and yarn-site.xml.

Create an import job

This section describes the parameters in the syntax of the Spark Load job and usage notes.

Note

For more information about the syntax, run the HELP SPARK LOAD command.

  • Syntax of the Spark Load import job

    LOAD LABEL load_label
        (data_desc, ...)
        WITH RESOURCE resource_name
        [resource_properties]
        [PROPERTIES (key1=value1, ... )]
    
    * load_label:
        db_name.label_name
    
    * data_desc:
        DATA INFILE ('file_path', ...)
        [NEGATIVE]
        INTO TABLE tbl_name
        [PARTITION (p1, p2)]
        [COLUMNS TERMINATED BY separator ]
        [(col1, ...)]
        [COLUMNS FROM PATH AS (col2, ...)]
        [SET (k1=f1(xx), k2=f2(xx))]
        [WHERE predicate]
    
        DATA FROM TABLE hive_external_tbl
        [NEGATIVE]
        INTO TABLE tbl_name
        [PARTITION (p1, p2)]
        [SET (k1=f1(xx), k2=f2(xx))]
        [WHERE predicate]
    
    * resource_properties:
        (key2=value2, ...)
  • Example 1: The data source is an HDFS file

    LOAD LABEL db1.label1
    (
        DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
        INTO TABLE tbl1
        COLUMNS TERMINATED BY ","
        (tmp_c1,tmp_c2)
        SET
        (
            id=tmp_c2,
            name=tmp_c1
        ),
        DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file2")
        INTO TABLE tbl2
        COLUMNS TERMINATED BY ","
        (col1, col2)
        where col1 > 1
    )
    WITH RESOURCE 'spark0'
    (
        "spark.executor.memory" = "2g",
        "spark.shuffle.compress" = "true"
    )
    PROPERTIES
    (
        "timeout" = "3600"
    );
  • Example 2: The data source is a Hive table

    1. Create an external Hive table.

      CREATE EXTERNAL TABLE hive_t1
      (
          k1 INT,
          K2 SMALLINT,
          k3 varchar(50),
          uuid varchar(100)
      )
      ENGINE=hive
      properties
      (
      "database" = "tmp",
      "table" = "t1",
      "hive.metastore.uris" = "thrift://0.0.0.0:8080"
      );
                                          
    2. Submit the LOAD command. The columns in the Doris table to which the data is imported must also exist in the external Hive table.

      LOAD LABEL db1.label1
      (
          DATA FROM TABLE hive_t1
          INTO TABLE tbl1
          SET
          (
              uuid=bitmap_dict(uuid)
          )
      )
      WITH RESOURCE 'spark0'
      (
          "spark.executor.memory" = "2g",
          "spark.shuffle.compress" = "true"
      )
      PROPERTIES
      (
          "timeout" = "3600"
      );
  • Example 3: The data source is the data of the BINARY type in the Hive table

    1. Create an external Hive table.

      CREATE EXTERNAL TABLE hive_t1
      (
          k1 INT,
          K2 SMALLINT,
          k3 varchar(50),
          uuid varchar(100) -- This column corresponds to the BINARY column in the Hive table.
      )
      ENGINE=hive
      properties
      (
      "database" = "tmp",
      "table" = "t1",
      "hive.metastore.uris" = "thrift://0.0.0.0:8080"
      );
    2. Submit the LOAD command. The columns in the Doris table to which the data is imported must also exist in the external Hive table.

      LOAD LABEL db1.label1
      (
          DATA FROM TABLE hive_t1
          INTO TABLE tbl1
          SET
          (
              uuid=binary_bitmap(uuid)
          )
      )
      WITH RESOURCE 'spark0'
      (
          "spark.executor.memory" = "2g",
          "spark.shuffle.compress" = "true"
      )
      PROPERTIES
      (
          "timeout" = "3600"
      );
  • Example 4: Import data from a partitioned Hive table

    • Hive table creation statements

      create table test_partition(
          id int,
          name string,
          age int
      )
      partitioned by (dt string)
      row format delimited fields terminated by ','
      stored as textfile;
                                          
    • Doris table creation statements

      CREATE TABLE IF NOT EXISTS test_partition_04
      (
          dt date,
          id int,
          name string,
          age int
      )
      UNIQUE KEY(`dt`, `id`)
      DISTRIBUTED BY HASH(`id`) BUCKETS 1
      PROPERTIES (
          "replication_allocation" = "tag.location.default: 1"
      );
    • Spark Load import statements

      CREATE EXTERNAL RESOURCE "spark_resource"
      PROPERTIES
      (
      "type" = "spark",
      "spark.master" = "yarn",
      "spark.submit.deployMode" = "cluster",
      "spark.executor.memory" = "1g",
      "spark.yarn.queue" = "default",
      "spark.hadoop.yarn.resourcemanager.address" = "localhost:50056",
      "spark.hadoop.fs.defaultFS" = "hdfs://localhost:9000",
      "working_dir" = "hdfs://localhost:9000/tmp/doris",
      "broker" = "broker_01"
      );
      LOAD LABEL demo.test_hive_partition_table_18
      (
          DATA INFILE("hdfs://localhost:9000/user/hive/warehouse/demo.db/test/dt=2022-08-01/*")
          INTO TABLE test_partition_04
          COLUMNS TERMINATED BY ","
          FORMAT AS "csv"
          (id,name,age)
          COLUMNS FROM PATH AS (`dt`)
          SET
          (
              dt=dt,
              id=id,
              name=name,
              age=age
          )
      )
      WITH RESOURCE 'spark_resource'
      (
          "spark.executor.memory" = "1g",
          "spark.shuffle.compress" = "true"
      )
      PROPERTIES
      (
          "timeout" = "3600"
      );

Parameters and related instructions

  • Parameters on data description

    Only CSV files and Hive tables are supported as data sources. Other rules are consistent with those of Broker Load.

  • Parameters of the import job

    The parameters of the Spark Load import job are included in the opt_properties parameter. These parameters apply to the import job. Other rules are consistent with the rules of Broker Load.

  • Parameters of Spark resources

    The Spark resource must be configured in the Doris cluster in advance, and you can use Spark Load only after you are granted the USAGE_PRIV permission on the resource.

    If you have temporary requirements, such as modifying the Spark configurations to add resources for the job, you can refer to the following example:

    WITH RESOURCE 'spark0'
    (
      "spark.driver.memory" = "1g",
      "spark.executor.memory" = "3g"
    )
    Note

    The modifications take effect only for this job and do not affect the existing configurations of the Doris cluster.

  • Import data from a Hive table

    To import data from a Hive table, create an external Hive table and specify the table name when you submit the import command.

  • Create a global dictionary during data import

    You can create a global dictionary if the aggregate column of the Doris table is a bitmap column. To create a global dictionary, specify the field for which you want to create the global dictionary in the LOAD command in the following format: Doris field name=bitmap_dict(Hive table field name). The field name in the parentheses is the name of the corresponding field in the Hive table.

    Important

    You can create a global dictionary only if you import data from a Hive table.

  • Import data in the Hive column whose data type is BINARY

    This is applicable to the scenario where the aggregate column of the Doris table is a bitmap column and the data type of the corresponding column in the source Hive table is BINARY. The data is serialized by using the org.apache.doris.load.loadv2.dpp.BitmapValue class in the spark-dpp module of the FE node.

    To import the bitmap data, specify the corresponding field in the LOAD command in the following format: Doris field name=binary_bitmap(Hive table field name). The field name in the parentheses is the name of the corresponding field in the Hive table.

    Important

    You can import data of the BINARY type only when the data source is a Hive table.

View the import job

The import method of Spark Load is asynchronous, which is the same as that of Broker Load. Therefore, you must record the label of the import job and use the label in the SHOW LOAD command to view the import results. The command is commonly used to view import jobs submitted in all methods. For more information, run the HELP SHOW LOAD command. Example:

show load order by createtime desc limit 1\G

The following output is returned:

*************************** 1. row ***************************
         JobId: 76391
         Label: label1
         State: FINISHED
      Progress: ETL:100%; LOAD:100%
          Type: SPARK
       EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
      TaskInfo: cluster:cluster0; timeout(s):10800; max_filter_ratio:5.0E-5
      ErrorMsg: N/A
    CreateTime: 2019-07-27 11:46:42
  EtlStartTime: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:49:44
 LoadStartTime: 2019-07-27 11:49:44
LoadFinishTime: 2019-07-27 11:50:16
           URL: http://1.1.*.*:80**/proxy/application_15866****3848_0035/
    JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}

For more information about the parameters in the returned result, see Broker Load. The following information describes the differences of the parameters in the returned result between the two import methods:

  • The State parameter indicates the status of the import job. After the import job is submitted, the import job enters the PENDING state. After the ETL job is submitted, the import job enters the ETL state. After the ETL job is complete and the FE node schedules BE nodes to run a push job, the import job enters the LOADING state. After the push job is complete and the Doris version is published, the import job enters the FINISHED state.

    After the import job is complete, the import job enters the CANCELLED or FINISHED state. The import job is complete when it is in one of these two states. The CANCELLED state indicates that the import job fails, whereas the FINISHED state indicates that the import job is successful.

  • The Progress parameter indicates the progress of the import job, which includes the ETL progress and the LOAD progress. The ETL progress indicates the progress in the ETL state. The LOAD progress indicates the progress in the LOADING state.

    LOAD progress = Number of tablets whose data has been imported to all replicas/Total number of tablets in the import job × 100%

    The LOAD progress ranges from 0% to 100%. For example, if an import job involves 200 tablets and the data of 100 tablets has been imported to all replicas, the LOAD progress is 50%. When all tables are imported, the LOAD progress is 99%. The progress changes to 100% only after the imported data takes effect.

    Note

    The import progress is not updated at a constant rate. Therefore, if the progress remains unchanged for a period of time, it does not necessarily mean that the import has stopped.

  • The Type parameter indicates the type of the import job. In scenarios in which Spark Load is used, the type is SPARK.

  • The following information describes some other parameters in the result:

    • CreateTime: the time when the import job is created.

    • EtlStartTime: the time when the import job enters the ETL state.

    • EtlFinishTime: the time when the import job leaves the ETL state.

    • LoadStartTime: the time when the import job enters the LOADING state.

    • LoadFinishTime: the time when the import job is complete.

  • The JobDetails parameter indicates the details of the running status of the import job. This parameter is updated when the import job leaves the ETL state. The details include the number of imported files, the total size of the imported files, the number of tasks, and the number of raw data rows that are processed. Example: {"ScannedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}.

  • The URL parameter indicates the URL of the application webpage. You can paste the URL in a browser to go to the webpage of the corresponding application.

View the logs

When the Spark Load job is submitted, logs are generated. By default, the logs are stored in the log/spark_launcher_log path of the root directory of the FE node and named in the following format: spark_launcher_{load_job_id}_{label}.log.

Note

The logs are stored in this directory for a period of time before they are deleted along with the import information in the metadata of the FE node. The default retention period is three days.

Cancel the import job

If the import job is not in the CANCELLED or FINISHED state, you can cancel the import job as required. When you cancel an import job, specify the label of the job. For more information about the syntax to cancel an import job, run the HELP CANCEL LOAD command.
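
For example, the following statement is a minimal sketch that cancels an import job, assuming that the job was submitted in the db1 database with the label label1:

CANCEL LOAD FROM db1 WHERE LABEL = "label1";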

Related system configurations

This section describes the system-level parameters of import jobs submitted by using Spark Load. The parameters are applied to all Spark Load import jobs. You can modify the parameters in the fe.conf file.

  • enable_spark_load: specifies whether to enable Spark Load and the resource creation feature. Default value: false, which specifies that Spark Load and the resource creation feature are disabled.

  • spark_load_default_timeout_second: the default timeout period of the import job. Unit: seconds. Default value: 259200, which specifies three days.

  • spark_home_default_dir: the directory in which the Spark client resides. Default value: fe/lib/spark2x.

  • spark_resource_path: the path of the packaged Spark dependency package. By default, this parameter is empty.

  • spark_launcher_log_dir: the directory in which the logs of submitted Spark Load jobs reside. Default value: fe/log/spark_launcher_log.

  • yarn_client_path: the path of the binary executable file of the YARN client. Default value: fe/lib/yarn-client/hadoop/bin/yarn.

  • yarn_config_dir: the directory in which the YARN configuration files are generated. Default value: fe/lib/yarn-config.
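
The following snippet is a hedged example of how these settings might appear in the fe.conf file. The paths are illustrative and assume that the FE node is installed in the /opt/doris/fe directory, which is an assumption made for this example; adjust the values to match your deployment.

# Example fe.conf settings for Spark Load. All paths below are illustrative.
# Enable Spark Load and the resource creation feature.
enable_spark_load = true
# Default timeout period of Spark Load import jobs, in seconds (three days).
spark_load_default_timeout_second = 259200
# Directory of the Spark client and path of the packaged Spark dependencies.
spark_home_default_dir = /opt/doris/fe/lib/spark2x
spark_resource_path = /opt/doris/fe/lib/spark2x/jars/spark-2x.zip
# Directory that stores the logs of submitted Spark Load jobs.
spark_launcher_log_dir = /opt/doris/fe/log/spark_launcher_log
# Path of the YARN client binary file and directory for generated YARN configuration files.
yarn_client_path = /opt/doris/fe/lib/yarn-client/hadoop/bin/yarn
yarn_config_dir = /opt/doris/fe/lib/yarn-config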