Spark Load allows you to use external Spark resources to preprocess the data to be imported. This improves the performance of importing large volumes of data into StarRocks clusters and saves the computing resources of the StarRocks clusters. You can use Spark Load when you migrate data to a StarRocks cluster for the first time or when you import large volumes of data (at the TB level) into a StarRocks cluster. This topic describes the terms, workflow, usage examples, best practices, and FAQ of Spark Load.

Background information

Spark Load is an asynchronous load method. You must create import jobs of the Spark type by using the MySQL protocol and view import results by running the SHOW LOAD command.
Note The images and some information in this topic are from Bulk load using Apache Spark of open source StarRocks.

Terms

  • Spark ETL: Spark ETL is used to extract, transform, and load (ETL) data to be imported. You can use Spark ETL to perform operations such as creating bitmap global dictionaries, data partitioning, data sorting, and data aggregation.
  • Broker: A broker is an independent stateless process that encapsulates file system interfaces. This allows StarRocks clusters to read files from remote storage systems.
  • Global dictionary: A global dictionary stores the data structures that map raw data to encoded data. The raw data can be of any type, whereas the encoded data must be integers. Global dictionaries are usually used for exact deduplication during precalculation.

Workflow

After an import job of the Spark type is submitted from the MySQL client, the frontend node records metadata and returns a message to indicate that the job is successfully submitted.

The following figure shows the workflow of Spark Load.
The workflow consists of the following steps:
  1. Submit a Spark Load job to the frontend node.
  2. The frontend node schedules and submits an ETL job to a Spark cluster.
  3. Run the ETL job in the Spark cluster. Bitmap global dictionaries are created and data is partitioned, sorted, and aggregated. This way, the data to be imported can be preprocessed.
  4. After the ETL job is run, the frontend node obtains the directory of the preprocessed data in each partition and schedules backend nodes to run a push job.
  5. The backend nodes use brokers to read data and transform the data type into the type that can be stored in StarRocks clusters.
  6. The frontend node publishes a StarRocks version and completes the import job.

Global dictionary

Scenario

Bitmap columns in StarRocks are implemented by using Roaring bitmaps, and the input data of a Roaring bitmap must be integers. To precalculate bitmap columns before you import data, you must convert the data type of the raw data into an integer type. During data import, a global dictionary that is built on Hive tables stores the mappings between the raw data and the encoded integers.
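For reference, the following statement sketches a target table whose aggregate column is a bitmap column. The database name, table name, column names, and bucket count are hypothetical and must be adapted to your own schema.
CREATE TABLE example_db.page_uv
(
    page_id INT,
    dt DATE,
    visit_users BITMAP BITMAP_UNION
)
AGGREGATE KEY (page_id, dt)
DISTRIBUTED BY HASH (page_id) BUCKETS 10;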

Workflow

  1. Read data from a data source and store the data in a temporary Hive table named hive-table.
  2. Deduplicate data in the hive-table table and store the deduplicated raw data in a Hive table named distinct-value-table.
  3. Create a global dictionary table named dict-table to store raw data in a column and encoded data in another column.
  4. Perform a LEFT JOIN operation on the distinct-value-table and dict-table tables. Use a window function to encode the raw data of the LEFT JOIN results, and write the columns of raw data and encoded data back to the dict-table table, as illustrated in the sketch after this list.
  5. Perform a JOIN operation on the dict-table and hive-table tables. Replace the raw data in the hive-table table with the encoded integers.
  6. Data in the hive-table table is read and calculated in the subsequent operations, and then imported to a StarRocks cluster.
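The following Hive SQL sketch illustrates steps 2 through 4. The table names hive_table, distinct_value_table, and dict_table and the column name uuid are hypothetical placeholders for the intermediate tables that Spark Load manages; the sketch shows only the idea of the encoding, not the exact statements that Spark Load runs.
-- Step 2: deduplicate the raw values of the column to be encoded.
INSERT OVERWRITE TABLE distinct_value_table
SELECT DISTINCT uuid FROM hive_table;

-- Steps 3 and 4: append encodings for values that are not yet in the dictionary.
-- New values are numbered by a window function and offset by the current maximum ID.
INSERT INTO TABLE dict_table
SELECT n.uuid,
       ROW_NUMBER() OVER (ORDER BY n.uuid) + COALESCE(m.max_id, 0) AS encoded_id
FROM (
    SELECT d.uuid
    FROM distinct_value_table d
    LEFT JOIN dict_table t ON d.uuid = t.uuid
    WHERE t.uuid IS NULL
) n
CROSS JOIN (SELECT MAX(encoded_id) AS max_id FROM dict_table) m;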

Data preprocessing

The data is preprocessed in the following steps:
  1. Read data from a Hadoop Distributed File System (HDFS) file or a Hive table.
  2. Perform field mappings and expression-based calculation on the data and generate a field called bucket-id based on the partition information.
  3. Generate rollup trees based on the rollup metadata in StarRocks tables.
  4. Traverse rollup trees to aggregate data in different layers. Rollups in a layer are calculated based on rollups in the previous layer.
  5. After data is aggregated, the data is distributed to different buckets based on the bucket-id field and written to HDFS, as sketched after this list.
  6. Brokers read HDFS files and import data into backend nodes of StarRocks clusters.
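For illustration only, the following SQL sketch shows the idea behind the bucket-id field: a row is assigned to a bucket by hashing the table's distribution column and taking the result modulo the number of buckets. The column name k1, the bucket count 16, and the use of Hive's generic hash function are assumptions made for this sketch; the actual ETL job uses StarRocks' own bucketing hash, not the Hive hash function.
-- Hypothetical illustration of bucket assignment during preprocessing.
SELECT k1, k2, k3,
       pmod(hash(k1), 16) AS bucket_id
FROM hive_table;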

Basic operations

Configure clusters for ETL jobs

StarRocks uses external resources, such as Spark, to run ETL jobs. Other types of external resources may be supported in StarRocks clusters in the future. For example, Spark or GPUs may be used for data queries, HDFS or Amazon Simple Storage Service (S3) may be used to store external data, and MapReduce may be used for ETL. You can use Resource Management to manage these external resources.

Before you submit an import job, configure a Spark cluster for the ETL job. Syntax:
-- create spark resource
CREATE EXTERNAL RESOURCE resource_name
PROPERTIES
(
 type = spark,
 spark_conf_key = spark_conf_value,
 working_dir = path,
 broker = broker_name,
 broker.property_key = property_value
);

-- drop spark resource
DROP RESOURCE resource_name;

-- show resources
SHOW RESOURCES;
SHOW PROC "/resources";

-- privileges
GRANT USAGE_PRIV ON RESOURCE resource_name TO user_identity;
GRANT USAGE_PRIV ON RESOURCE resource_name TO ROLE role_name;
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM user_identity;
REVOKE USAGE_PRIV ON RESOURCE resource_name FROM ROLE role_name;
  • Create a resource

    resource_name specifies the name of the Spark resource that is configured in the StarRocks cluster.

    PROPERTIES includes a number of properties of the Spark resource. The following list describes the properties. For more information, see Spark Configuration in the Spark documentation.
    • type: The type of the resource. Set the value to spark. This property is required.
    • Spark-related properties:
      • spark.master: The cluster manager to which the Spark job is submitted. Set the value to yarn. This property is required.
      • spark.submit.deployMode: The mode in which the Spark program is deployed. Valid values: cluster and client. This property is required.
      • spark.hadoop.fs.defaultFS: The default file system. This property is required if the spark.master property is set to yarn.
      • spark.hadoop.yarn.resourcemanager.address: The address of a single ResourceManager.
      • spark.hadoop.yarn.resourcemanager.ha.enabled: Specifies whether to enable high availability on the ResourceManager. Default value: true.
      • spark.hadoop.yarn.resourcemanager.ha.rm-ids: The logical IDs of the ResourceManager nodes.
      • spark.hadoop.yarn.resourcemanager.hostname.rm-id: The hostname that corresponds to each logical ID.
        Note For a high-availability ResourceManager, you need to configure either the spark.hadoop.yarn.resourcemanager.hostname.rm-id property or the spark.hadoop.yarn.resourcemanager.address.rm-id property.
      • spark.hadoop.yarn.resourcemanager.address.rm-id: The address that corresponds to each logical ID and to which clients submit jobs. The address is in the host:port format.
        Note For a high-availability ResourceManager, you need to configure either the spark.hadoop.yarn.resourcemanager.hostname.rm-id property or the spark.hadoop.yarn.resourcemanager.address.rm-id property.
    • working_dir: The directory in which the ETL job stores intermediate data. Example: hdfs://host:port/tmp/starrocks.
      Note This property is required if the Spark resource is used for ETL.
    • broker: The name of the broker.
      Note This property is required if the Spark resource is used for ETL. You must run the ALTER SYSTEM ADD BROKER command to configure the broker in advance.
    • broker.property_key: The authentication information that the broker requires to read the intermediate files generated by the ETL job.
    Sample code:
    -- yarn cluster mode
    CREATE EXTERNAL RESOURCE "spark0"
    PROPERTIES
    (
        "type" = "spark",
        "spark.master" = "yarn",
        "spark.submit.deployMode" = "cluster",
        "spark.jars" = "xxx.jar,yyy.jar",
        "spark.files" = "/tmp/aaa,/tmp/bbb",
        "spark.executor.memory" = "1g",
        "spark.yarn.queue" = "queue0",
        "spark.hadoop.yarn.resourcemanager.address" = "resourcemanager_host:8032",
        "spark.hadoop.fs.defaultFS" = "hdfs://namenode_host:9000",
        "working_dir" = "hdfs://namenode_host:9000/tmp/starrocks",
        "broker" = "broker0",
        "broker.username" = "user0",
        "broker.password" = "password0"
    );
    
    -- yarn cluster mode that enables high availability
    CREATE EXTERNAL RESOURCE "spark1"
    PROPERTIES
    (
        "type" = "spark",
        "spark.master" = "yarn",
        "spark.submit.deployMode" = "cluster",
        "spark.hadoop.yarn.resourcemanager.ha.enabled" = "true",
        "spark.hadoop.yarn.resourcemanager.ha.rm-ids" = "rm1,rm2",
        "spark.hadoop.yarn.resourcemanager.hostname.rm1" = "host1",
        "spark.hadoop.yarn.resourcemanager.hostname.rm2" = "host2",
        "spark.hadoop.fs.defaultFS" = "hdfs://namenode_host:9000",
        "working_dir" = "hdfs://namenode_host:9000/tmp/starrocks",
        "broker" = "broker1"
    );
    
    -- HDFS cluster mode that enables high availability
    CREATE EXTERNAL RESOURCE "spark2"
    PROPERTIES
    (
        "type" = "spark",
        "spark.master" = "yarn",
        "spark.hadoop.yarn.resourcemanager.address" = "resourcemanager_host:8032",
        "spark.hadoop.fs.defaultFS" = "hdfs://myha",
        "spark.hadoop.dfs.nameservices" = "myha",
        "spark.hadoop.dfs.ha.namenodes.myha" = "mynamenode1,mynamenode2",
        "spark.hadoop.dfs.namenode.rpc-address.myha.mynamenode1" = "nn1_host:rpc_port",
        "spark.hadoop.dfs.namenode.rpc-address.myha.mynamenode2" = "nn2_host:rpc_port",
        "spark.hadoop.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
        "working_dir" = "hdfs://myha/tmp/starrocks",
        "broker" = "broker2",
        "broker.dfs.nameservices" = "myha",
        "broker.dfs.ha.namenodes.myha" = "mynamenode1,mynamenode2",
        "broker.dfs.namenode.rpc-address.myha.mynamenode1" = "nn1_host:rpc_port",
        "broker.dfs.namenode.rpc-address.myha.mynamenode2" = "nn2_host:rpc_port",
        "broker.dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    );
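    The broker that is referenced in the preceding examples must already exist in the StarRocks cluster. The following statement is a hedged example of how such a broker might be added; the broker name, host, and port are placeholders that must match your deployment.
    ALTER SYSTEM ADD BROKER broker0 "broker_host:8000";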
  • View resources

    You can use an ordinary account to view only the resources on which the account has the USAGE_PRIV permission. You can use the root or admin account to view all resources.

  • Manage permissions on resources
    You can run the GRANT and REVOKE commands to manage permissions on resources. Only the USAGE_PRIV permission is supported. You can grant the USAGE_PRIV permission to a user or to a role, and the role can then be assigned to users as usual. Sample code:
    -- Grant the USAGE_PRIV permission on the resource spark0 to the user user0:
    GRANT USAGE_PRIV ON RESOURCE "spark0" TO "user0"@"%";
    
    -- Grant the USAGE_PRIV permission on the resource spark0 to the role role0:
    GRANT USAGE_PRIV ON RESOURCE "spark0" TO ROLE "role0";
    
    -- Grant the USAGE_PRIV permission on all resources to the user user0:
    GRANT USAGE_PRIV ON RESOURCE * TO "user0"@"%";
    
    -- Grant the USAGE_PRIV permission on all resources to the role role0:
    GRANT USAGE_PRIV ON RESOURCE * TO ROLE "role0";
    
    -- Revoke the USAGE_PRIV permission on the resource spark0 from the user user0:
    REVOKE USAGE_PRIV ON RESOURCE "spark0" FROM "user0"@"%";

Configure the Spark client

At the underlying layer, the frontend node runs the spark-submit command to submit Spark Load jobs. Therefore, you must configure the Spark client for the frontend node. We recommend that you download Spark 2.4.5 or a later version of Spark 2.x from the official Spark download page. Then, perform the following steps to configure the Spark client.
  1. Configure the SPARK_HOME environment variable

    Store the Spark client on the machine on which the frontend node resides, and set the spark_home_default_dir parameter in the configuration file of the frontend node to the root directory of the Spark client. The default value of this parameter is lib/spark2x under the root directory of the frontend node. This parameter cannot be left empty.

  2. Configure a Spark dependency package

    Package all JAR files in the jars folder of the Spark client into a ZIP file. Set the spark_resource_path parameter in the configuration file of the frontend node to the path of the ZIP file. If this parameter is left empty, the frontend node searches for the lib/spark2x/jars/spark-2x.zip file under its root directory. If the file is not found, an error message is returned.

    When a Spark Load job is submitted, the archived dependency packages are uploaded to a remote repository, which by default resides in the working_dir/{cluster_id} directory. The remote repository is named in the --spark-repository--{resource_name} format, in which {resource_name} specifies the resource that corresponds to the remote repository. Example of the directory structure of a remote repository:
    ---spark-repository--spark0/
       |---archive-1.0.0/
       |   |---lib-990325d2c0d1d5e45bf675e54e44fb16-spark-dpp-1.0.0-jar-with-dependencies.jar
       |   |---lib-7670c29daf535efe3c9b923f778f61fc-spark-2x.zip
       |---archive-1.1.0/
       |   |---lib-64d5696f99c379af2bee28c1c84271d5-spark-dpp-1.1.0-jar-with-dependencies.jar
       |   |---lib-1bbb74bb6b264a270bc7fca3e964160f-spark-2x.zip
       |---archive-1.2.0/
       |   |-...

By default, the Spark dependency package is named spark-2x.zip. The frontend node also uploads the data preprocessing (DPP) dependency package to the remote repository. If all the dependency packages that a Spark Load job requires already exist in the remote repository, the frontend node does not upload them again. This saves the time that is otherwise spent on repeatedly uploading large files.

Configure the YARN client

At the underlying layer, the frontend node runs YARN commands to obtain the status of ongoing applications and to terminate applications. Therefore, you must configure the YARN client for the frontend node. We recommend that you download Hadoop 2.5.2 or a later version of Hadoop 2.x from the official Hadoop download page. Then, perform the following steps to configure the YARN client.
  1. Specify the directory of the executable file for the YARN client

    Store the YARN client on the machine on which the frontend node resides. Set the yarn_client_path parameter in the configuration file of the frontend node to the path of the binary executable file of the YARN client. The default value of this parameter is lib/yarn-client/hadoop/bin/yarn under the root directory of the frontend node.

  2. (Optional) Specify the directory of the configuration file for YARN commands

    Each time the frontend node obtains the status of an application or terminates an application, it generates the configuration files that are required to run YARN commands. By default, the files are generated in the lib/yarn-config directory under the root directory of the frontend node. You can change the directory in which the files are generated by modifying the yarn_config_dir parameter in the configuration file of the frontend node. The generated configuration files include core-site.xml and yarn-site.xml.

Create an import job

  • Syntax:
    LOAD LABEL load_label
        (data_desc, ...)
    WITH RESOURCE resource_name
    [resource_properties]
    [PROPERTIES (key1=value1, ... )]
    
    * load_label:
        db_name.label_name
    
    * data_desc:
        DATA INFILE ('file_path', ...)
        [NEGATIVE]
        INTO TABLE tbl_name
        [PARTITION (p1, p2)]
        [COLUMNS TERMINATED BY separator ]
        [(col1, ...)]
        [COLUMNS FROM PATH AS (col2, ...)]
        [SET (k1=f1(xx), k2=f2(xx))]
        [WHERE predicate]
    
        DATA FROM TABLE hive_external_tbl
        [NEGATIVE]
        INTO TABLE tbl_name
        [PARTITION (p1, p2)]
        [SET (k1=f1(xx), k2=f2(xx))]
        [WHERE predicate]
    
    * resource_properties:
     (key2=value2, ...)
    For more information about the syntax, run the HELP SPARK LOAD command. The following list describes the parameters in the syntax:
    • Label

      The label of the import job. The label is unique in a database. Specifications of the parameter are the same as those of the same parameter in the Broker Load topic.

    • Parameters on data description

      Only CSV files and Hive tables are supported as data sources. Other specifications of the parameters are the same as those of the same parameters in the Broker Load topic.

    • Parameters on job properties

      The properties of the import job. The parameters are in the opt_properties section. Specifications of the parameters are the same as those of the parameters in the broker_properties section in the Broker Load topic.

    • Parameters on Spark resources
      The Spark resource must be configured in the StarRocks cluster in advance, and Spark Load can be used only after the user is granted the USAGE_PRIV permission on the resource. In the following sample code, two Spark resource parameters are specified for the job. You can specify such parameters when you have temporary requirements, such as adding resources for the job. The settings take effect only for this job and do not affect the existing configurations of the StarRocks cluster. Sample code:
      WITH RESOURCE 'spark0'
      (
         "spark.driver.memory" = "1g",
         "spark.executor.memory" = "3g"
      )
    • Import data from a Hive table

      To import data from a Hive table, create an external Hive table and specify the table name when you submit the import command.

    • Create a global dictionary during data import
      You can create a global dictionary if the aggregate column of the StarRocks table is a bitmap column. To create a global dictionary, specify the column for which you want to create a global dictionary in the LOAD command in the following format: StarRocks column name=bitmap_dict(Hive table column name).
      Important You can create a global dictionary only if you import data from a Hive table.
  • Sample code:
    • Create an import job to import data from an HDFS file:
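      The following statement is a hedged sketch of such a job. The HDFS path, database name, table name, column names, and Spark settings are placeholders that you must replace with your own values.
      LOAD LABEL db1.label1
      (
          DATA INFILE("hdfs://namenode_host:9000/user/starrocks/data/input/file1")
          INTO TABLE tbl1
          COLUMNS TERMINATED BY ","
          (tmp_c1, tmp_c2)
          SET
          (
              id = tmp_c2,
              name = tmp_c1
          )
      )
      WITH RESOURCE 'spark0'
      (
          "spark.executor.memory" = "2g",
          "spark.shuffle.compress" = "true"
      )
      PROPERTIES
      (
          "timeout" = "3600"
      );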
    • Create an import job to import data from a Hive table:
      1. Create a Hive resource.
        CREATE EXTERNAL RESOURCE hive0
        properties
        (
            "type" = "hive",
            "hive.metastore.uris" = "thrift://emr-header-1.cluster-xxx:9083"
        );
      2. Create an external Hive table.
        CREATE EXTERNAL TABLE hive_t1
        (
            k1 INT,
            k2 SMALLINT,
            k3 varchar(50),
            uuid varchar(100)
        )
        ENGINE=hive
        properties
        (
            "resource" = "hive0",
            "database" = "tmp",
            "table" = "t1"
        );
      3. Submit the LOAD command. Make sure that the columns of the StarRocks table into which the data is imported also exist in the external Hive table.
        LOAD LABEL db1.label1
        (
            DATA FROM TABLE hive_t1
            INTO TABLE tbl1
            SET
            (
                uuid=bitmap_dict(uuid)
            )
        )
        WITH RESOURCE 'spark0'
        (
            "spark.executor.memory" = "2g",
            "spark.shuffle.compress" = "true"
        )
        PROPERTIES
        (
            "timeout" = "3600"
        );

View the import job

Import jobs submitted by using Spark Load or Broker Load are asynchronous. You must specify the label of an import job in the SHOW LOAD statement to view the result of the job. The SHOW LOAD statement is shared by all asynchronous load methods. For more information, run the HELP SHOW LOAD command.

Sample code:
show load order by createtime desc limit 1\G
Returned result:
 *************************** 1. row ***************************
  JobId: 76391
  Label: label1
  State: FINISHED
 Progress: ETL:100%; LOAD:100%
  Type: SPARK
 EtlInfo: unselected.rows=4; dpp.abnorm.ALL=15; dpp.norm.ALL=28133376
 TaskInfo: cluster:cluster0; timeout(s):10800; max_filter_ratio:5.0E-5
 ErrorMsg: N/A
 CreateTime: 2019-07-27 11:46:42
 EtlStartTime: 2019-07-27 11:46:44
 EtlFinishTime: 2019-07-27 11:49:44
 LoadStartTime: 2019-07-27 11:49:44
LoadFinishTime: 2019-07-27 11:50:16
  URL: http://1.1.1.1:8089/proxy/application_1586619723848_0035/
 JobDetails: {"ScannedRows":28133395,"TaskNumber":1,"FileNumber":1,"FileSize":200000}
The following list describes the parameters in the returned result.
  • State: The current status of the import job.
    After the import job is submitted, the import job enters the PENDING state. After the ETL job is submitted, the import job enters the ETL state. After the ETL job is completed and the frontend node schedules backend nodes to run a push job, the import job enters the LOADING state. After the push job is completed and the StarRocks version takes effect, the import job enters the FINISHED state.
    The final state of the import job is CANCELLED or FINISHED. The CANCELLED state indicates that the import job fails, whereas the FINISHED state indicates that the import job is successful.
  • Progress: The progress of the import job, which includes the ETL progress and the LOAD progress. The ETL progress indicates the progress in the ETL state. The LOAD progress indicates the progress in the LOADING state.
    The LOAD progress ranges from 0 to 100%. You can calculate the LOAD progress based on the following formula:
    LOAD progress = Number of tablets whose replicas are all imported/Total number of tablets in the import job × 100%
    Note
    • After all tables are imported, the LOAD progress is 99%. The LOAD progress becomes 100% only after the import takes effect.
    • The import progress is not linear. Therefore, if the progress remains unchanged within a time period, the import is not necessarily paused.
  • Type: The type of the import job. In scenarios in which Spark Load is used, the type is SPARK.
  • CreateTime: The time when the import job is created.
  • EtlStartTime: The time when the import job enters the ETL state.
  • EtlFinishTime: The time when the import job leaves the ETL state.
  • LoadStartTime: The time when the import job enters the LOADING state.
  • LoadFinishTime: The time when the import job is completed.
  • JobDetails: The details of the import job, including the number of imported files, the total size of the imported data in bytes, the number of tasks, and the number of processed rows in the raw data. Example:
    {"ScannedRows":139264,"TaskNumber":1,"FileNumber":1,"FileSize":940754064}
  • URL: The URL of the application web page. You can paste the URL into a browser to go to the web page of the corresponding application.

For more information about the parameters in the returned result, see Broker Load.

View the logs

When a Spark Load job is submitted, logs are generated. By default, the logs are stored in the log/spark_launcher_log directory under the root directory of the frontend node and are named in the spark-launcher-{load-job-id}-{label}.log format. The logs are retained for a limited period of time and are deleted together with the import information in the metadata of the frontend node. By default, the logs are retained for three days.

Cancel the import job

If the import job is not in the CANCELLED or FINISHED state, you can cancel the import job as required. When you cancel an import job, specify the label of the job. For more information about the syntax, run the HELP CANCEL LOAD command.
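For example, the following statement cancels an import job by its label. The database name db1 and the label label1 are taken from the earlier sample code; replace them with your own values.
CANCEL LOAD FROM db1 WHERE LABEL = "label1";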

Related system configurations

The following list describes the system-level parameters that apply to all import jobs that are submitted by using Spark Load. You can modify the parameters in the fe.conf file.
  • enable_spark_load: Specifies whether to enable Spark Load and the resource creation feature. Default value: false, which specifies that Spark Load and the resource creation feature are disabled.
  • spark_load_default_timeout_second: The default timeout period of the import job. Unit: seconds. Default value: 259200, which specifies three days.
  • spark_home_default_dir: The directory in which the Spark client resides. Default value: fe/lib/spark2x.
  • spark_resource_path: The path of the packaged Spark dependency package. By default, this parameter is left empty.
  • spark_launcher_log_dir: The directory in which the logs submitted by the Spark client reside. Default value: fe/log/spark_launcher_log.
  • yarn_client_path: The path of the binary executable file of the YARN client. Default value: fe/lib/yarn-client/hadoop/bin/yarn.
  • yarn_config_dir: The directory in which the configuration files of the YARN client are generated. Default value: fe/lib/yarn-config.
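You can also check these parameters from a MySQL client and modify the ones that are mutable at runtime. This is a hedged sketch: whether a specific item such as enable_spark_load can be changed at runtime depends on your StarRocks version, and changes made this way are not persisted to the fe.conf file.
-- View the Spark Load-related frontend configuration items.
ADMIN SHOW FRONTEND CONFIG LIKE "%spark%";

-- Enable Spark Load at runtime if the configuration item is mutable in your version.
ADMIN SET FRONTEND CONFIG ("enable_spark_load" = "true");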

Best practice

Spark Load is most suitable for scenarios in which raw data at a scale from tens of GB to the TB level is to be imported from HDFS files. If you import a small amount of data, we recommend that you use Stream Load or Broker Load.

For complete sample code, see 03_sparkLoad2StarRocks.md on GitHub.