All Products
Search
Document Center

Data Lake Analytics - Deprecated:Use the spark-submit CLI

Last Updated:Feb 07, 2024

This topic describes how to use the spark-submit CLI of Data Lake Analytics (DLA) and provides some examples of using the spark-submit CLI.

Important

DLA is discontinued. AnalyticDB for MySQL Data Lakehouse Edition supports the existing features of DLA and provides more features and better performance. For more information about how to use AnalyticDB for MySQL to develop Spark applications, see Use spark-submit to develop Spark applications.

Download and install the spark-submit CLI

  1. Download the installation package of the spark-submit CLI.

    You can also run the following wget command to download this package whose file name is dla-spark-toolkit.tar.gz:

    wget https://dla003.oss-cn-hangzhou.aliyuncs.com/dla_spark_toolkit/dla-spark-toolkit.tar.gz
  2. After the package is downloaded, decompress the package.

    tar zxvf dla-spark-toolkit.tar.gz
    Note

    To use the spark-submit CLI, make sure that JDK 8 or later is installed.

Configure parameters

Use the command line to configure common parameters in the spark-defaults.conf file. The following sample code shows the parameters that you can configure in the spark-defaults.conf file.

#  cluster information
# AccessKeyId  
#keyId =
#  AccessKeySecret
#secretId =
#  RegionId
#regionId =
#  set vcName
#vcName =
#  set ossKeyId, if not set will use --keyId value or keyId value
#ossKeyId =
# set ossSecretId if not set will use --secretId value or secretId value
#ossSecretId =
#  set OssUploadPath, if you need upload local resource
#ossUploadPath =

##spark conf
#  driver specifications : small(1c4g) | medium (2c8g) | large (4c16g) | xalrge (8c32g)
#spark.driver.resourceSpec =
#  executor instance number
#spark.executor.instances =
#  executor specifications : small(1c4g) | medium (2c8g) | large (4c16g) | xalrge (8c32g)
#spark.executor.resourceSpec =
#  when use ram,  role arn
#spark.dla.roleArn =
#  config dla oss connectors
#spark.dla.connectors = oss
#  config eni, if you want to use eni
#spark.dla.eni.enable = true
#spark.dla.eni.vswitch.id =
#spark.dla.eni.security.group.id =
#  config log location, need an oss path to store logs
#spark.dla.job.log.oss.uri =
#  config spark read dla table when use option -f or -e
#spark.sql.hive.metastore.version = dla

## any other user defined spark conf...

You must configure the keyId, secretId, regionId, and vcName parameters. The following table describes these parameters.

Parameter

Description

keyId

The AccessKey ID of your Alibaba Cloud account.

secretId

The AccessKey secret of your Alibaba Cloud account.

vcName

The name of your Spark cluster.

regionId

The ID of the region in which your Spark cluster resides. For more information about the mappings between regions, zones and region IDs, see Regions and Zones.

You can run the following command to query the information to help you use the spark-submit CLI:

cd /path/to/dla-spark-toolkit
./bin/spark-submit --help

After you run the preceding command, the following results are returned:

Info: Usage: spark-submit [options] <app jar> [app arguments]
Usage: spark-submit --list [PAGE_NUMBER] [PAGE_SIZE]
Usage: spark-submit --kill [JOB_ID]
Info:
Options:
  --keyId                             Your ALIYUN_ACCESS_KEY_ID, is same as `keyId` in conf/spark-defaults.conf or --conf spark.dla.access.key.id=<value>, required
  --secretId                          Your ALIYUN_ACCESS_KEY_SECRET, is same as `secretId` in conf/spark-defaults.conf or --conf spark.dla.access.secret.id=<value>, required
  --regionId                          Your Cluster Region Id, is same as `regionId` in conf/spark-defaults.conf or --conf spark.dla.region.id=<value>,  required
  --vcName                            Your Virtual Cluster Name, is same as `vcName` in conf/spark-defaults.conf or --conf spark.dla.vc.name=<value>, required
  --oss-keyId                         Your ALIYUN_ACCESS_KEY_ID to upload local resource to oss,
                                      by default, the value will take from --keyId, is same as `ossKeyId` in conf/spark-defaults.conf or --conf spark.dla.oss.access.key.id=<value>
  --oss-secretId                      Your ALIYUN_ACCESS_KEY_SECRET to upload local resource to oss,
                                      default the value will take from --secretId, is same as `ossSecretId` in conf/spark-defaults.conf or --conf spark.dla.oss.access.secret.id=<value>
  --oss-endpoint                      Oss endpoint where the resource will upload. default is http://oss-$regionId.aliyuncs.com,
                                      is same as `ossEndpoint` in conf/spark-defaults.conf or --conf spark.dla.oss.endpoint=<value>
  --oss-upload-path                   The user oss path where the resource will upload
                                      If you want to upload a local jar package to the OSS directory,
                                      you need to specify this parameter. It is same as `ossUploadPath` in conf/spark-defaults.conf or --conf spark.dla.oss.upload.path=<value>
  --class CLASS_NAME                  Your application's main class (for Java / Scala apps).
  --name NAME                         A name of your application.
  --jars JARS                         Comma-separated list of jars to include on the driver
                                      and executor classpaths.
  --conf PROP=VALUE                   Arbitrary Spark configuration property, or you can set conf in conf/spark-defaults.conf
  --help, -h                          Show this help message and exit.
  --driver-resource-spec              Indicates the resource specifications used by the driver:
                                      small | medium | large | xlarge | 2xlarge
                                      you can also set this value through --conf spark.driver.resourceSpec=<value>
  --executor-resource-spec            Indicates the resource specifications used by the executor:
                                      small | medium | large | xlarge | 2xlarge
                                      you can also set this value through --conf spark.executor.resourceSpec=<value>
  --num-executors                     Number of executors to launch, you can also set this value through --conf spark.executor.instances=<value>
  --driver-memory MEM                 Memory for driver (e.g. 1000M, 2G)
                                      you can also set this value through --conf spark.driver.memory=<value>
  --driver-cores NUM                  Number of cores used by the driver
                                      you can also set this value through --conf spark.driver.cores=<value>
  --driver-java-options               Extra Java options to pass to the driver
                                      you can also set this value through --conf spark.driver.extraJavaOptions=<value>
  --executor-memory MEM               Memory per executor (e.g. 1000M, 2G)
                                      you can also set this value through --conf spark.executor.memory=<value>
  --executor-cores NUM                Number of cores per executor.
                                      you can also set this value through --conf spark.executor.cores=<value>
  --properties-file                   Spark default conf file location, only local files are supported, default conf/spark-defaults.conf
  --py-files PY_FILES                 Comma-separated list of .zip, .egg, or .py files to place
                                      on the PYTHONPATH for Python apps
  --files FILES                       Comma-separated list of files to be placed in the working
                                      directory of each executor. File paths of these files
                                      in executors can be accessed via SparkFiles.get(fileName).
                                      Specially, you can pass in a custom log output format file named `log4j.properties`
                                      Note: The file name must be `log4j.properties` to take effect
  --archives                          Comma separated list of archives to be extracted into the
                                      working directory of each executor. Support file types: zip, tgz, tar, tar.gz
  --status job_id                     If given, requests the status and details of the job specified
  --verbose                           Print additional debug output
  --version, -v                       Print out the dla-spark-toolkit version.

  List Spark Job Only:
  --list                              List Spark Job, should use specify --vcName and --regionId
  --pagenumber, -pn                   Set page number which want to list (default: 1)
  --pagesize, -ps                     Set page size which want to list (default: 1)

  Get Job Log Only:
  --get-log job_id                    Get job log

  Kill Spark Job Only:
  --kill job_id                       Specifies the jobid to be killed

  Spark Offline SQL options:
  -e <quoted-query-string>            SQL from command line. By default, use SubmitSparkSQL API to submit SQL, support set command to set spark conf.
                                      you can set --disable-submit-sql to submit an SQL job using the previous SubmitSparkJob API,
                                      which requires the user to specified the --oss-upload-path
  -f <filename>                       SQL from files. By default, use SubmitSparkSQL API to submit SQL, support set command to set spark conf.
                                      you can set --disable-submit-sql to submit an SQL job using the previous SubmitSparkJob API,
                                      which requires the user to specified the --oss-upload-path
  -d,--define <key=value>             Variable substitution to apply to spark sql
                                      commands. e.g. -d A=B or --define A=B
  --hivevar <key=value>               Variable substitution to apply to spark sql
                                      commands. e.g. --hivevar A=B
  --hiveconf <property=value>         Use value for given property, DLA spark toolkit will add `spark.hadoop.` prefix to property
  --database <databasename>           Specify the database to use
  --enable-inner-endpoint             It means that DLA pop SDK and OSS SDK will use the endpoint of Intranet to access DLA,
                                      you can turn on this option when you are on Alibaba cloud's ECS machine.


  Inner API options:
  --api-retry-times                   Specifies the number of retries that the client fails to call the API, default 3.
  --time-out-seconds                  Specifies the timeout for the API(time unit is second (s)), which is considered a call failure.
                                      default 10s.                            
Note

The spark-submit CLI script automatically reads configurations from the spark-defaults.conf file. If you use the command line to modify the parameters in the conf/spark-defaults.conf file, the spark-submit CLI obtains the parameter values submitted by the command line.

Configure compatibility

To resolve compatibility issues with the open source spark-submit CLI, you can specify the following parameters in the spark-defaults.conf file.

--keyId              #--conf spark.dla.access.key.id=<value>
--secretId           #--conf spark.dla.access.secret.id=<value>
--regionId           #--conf spark.dla.region.id=<value>
--vcName             #--conf spark.dla.vc.name=<value>
--oss-keyId          #--conf spark.dla.oss.access.key.id=<value>
--oss-secretId       #--conf spark.dla.oss.access.secret.id=<value>
--oss-endpoint       #--conf spark.dla.oss.endpoint=<value>
--oss-upload-path    #--conf spark.dla.oss.upload.path=<value>

The following parameters are not supported or are meaningless for the serverless Spark engine of DLA. Therefore, the values of these parameters are ignored.

Useless options(these options will be ignored):
  --deploy-mode
  --master
  --packages, please use `--jars` instead
  --exclude-packages
  --proxy-user
  --repositories
  --keytab
  --principal
  --queue
  --total-executor-cores
  --driver-library-path
  --driver-class-path
  --supervise
  -S,--silent
  -i <filename>
                

The driver and executors of the serverless Spark engine run on an elastic container. However, the elastic container supports only specific resource specifications. For more information about the resource specifications supported by the serverless Spark engine, see Overview. To resolve this issue, the Alibaba Cloud DLA team adjusts the parameters related to the resource specifications in the spark-defaults.conf file. The following table describes the adjusted parameters that are used by the spark-submit CLI of DLA. The adjusted parameters work in a different way from the parameters used by the open source spark-submit CLI.

Parameter

Description

--driver-cores/--conf spark.driver.cores

Specifies the number of CPU cores that are used for the driver. The spark-submit CLI of DLA sets this parameter to a value that is closest to and greater than or equal to the user-specified number.

--driver-memory/--conf spark.driver.memory

Specifies the memory that is used for the driver. The spark-submit CLI of DLA sets this parameter to a value that is closest to and greater than or equal to the user-specified memory.

--executor-cores/--conf spark.executor.cores

Specifies the number of CPU cores that are used for each executor. The spark-submit CLI of DLA sets this parameter to a value that is closest to and greater than or equal to the user-specified number.

--executor-memory/--conf spark.executor.memory

Specifies the memory that is used for each executor. The spark-submit CLI sets this parameter to a value that is closest to but greater than or equal to the user-specified memory.

The following table describes the parameters that are supported only by the spark-submit CLI of DLA.

Parameter

Description

--driver-resource-spec

Specifies the resource specifications of the driver. The priority of this parameter is higher than that of the --driver-cores/--conf spark.driver.cores parameter. If the --driver-resource-spec and --driver-cores parameters are both specified, the value of the --driver-resource-spec parameter is used.

--executor-resource-spec

Specifies the resource specifications of the driver. The priority of this parameter is higher than that of the --executor-cores/--conf spark.executor.cores parameter.

--api-retry-times

Specifies the number of times a failed command can be rerun. The commands that are used to submit jobs cannot be rerun because a job submission operation is not an idempotent operation. If a job fails to be submitted due to network timeout, the job may still be successfully executed in the background. To prevent a job from being repeatedly submitted, the commands used to submit jobs cannot be rerun if they fail. You need to use --list to obtain the jobs that are submitted. You can also go to the DLA console to check whether jobs are successfully submitted based on the job list.

--time-out-seconds

Specifies the default network timeout period (in seconds) of the spark-submit CLI. Default value: 10. If the network times out, the command may fail and be rerun.

--enable-inner-endpoint

Specifies whether to access DLA POP SDK and OSS SDK over an internal network. You can specify this parameter if your services are deployed on an Elastic Compute Service (ECS) instance. After you specify this parameter, the spark-submit CLI accesses DLA POP SDK and OSS SDK over an internal network. This makes the network connection more stable.

--list

Obtains the list of jobs. This parameter is usually used with the --pagesize and --pagenumber parameters. The --pagesize parameter specifies the number of pages to display the job list. The --pagenumber parameter specifies the number of jobs displayed on each page. By default, the --pagesize parameter is set to 1 and the --pagenumber parameter is set to 10. This indicates that 10 jobs on the first page are returned.

--kill

Kills a job based on the ID of the job.

--get-log

Obtains logs of a job based on the ID of the job.

--status

Obtains the details of a job based on the ID of the job.

Submit a job

  • On the Submit job page, submit a Spark job by using the configurations in the JSON format. The following sample code provides an example.

    {
        "name": "xxx",
        "file": "oss://{bucket-name}/jars/xxx.jar",
        "jars": "oss://{bucket-name}/jars/xxx.jar,oss://{bucket-name}/jars/xxx.jar"
        "className": "xxx.xxx.xxx.xxx.xxx",
        "args": [
            "xxx",
            "xxx"
        ],
        "conf": {
            "spark.executor.instances": "1",
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium",
            "spark.dla.job.log.oss.uri": "oss://{bucket-name}/path/to/log/"
        }
    }
  • If you use the spark-submit CLI to submit a job, the job configurations are in the following format.

    ./bin/spark-submit  \
    --class xxx.xxx.xxx.xxx.xxx \
    --verbose \
    --name xxx \
    --jars oss://{bucket-name}/jars/xxx.jar,oss://{bucket-name}/jars/xxx.jar
    --conf spark.driver.resourceSpec=medium \
    --conf spark.executor.instances=1 \
    --conf spark.executor.resourceSpec=medium \
    oss://{bucket-name}/jars/xxx.jar \
    args0 args1
    
    ## The main program file can be a JAR package that is specified by the --jars parameter or a file that is specified by the --py-files or --files parameter. The main program file can be saved in a local directory or an Object Storage Service (OSS) directory. 
    ## You must specify an absolute path for a local file. When you use the spark-submit CLI, the local file is automatically uploaded to the specified OSS directory.
    ## You can use the --oss-upload-path or ossUploadPath parameter in the spark-defaults.conf file to specify the OSS directory.
    ## When a local file is being uploaded, the file content is verified by using MD5. If a file that has the same name and MD5 value as your local file exists in the specified OSS directory, the file upload is canceled. 
    ## If you manually update the JAR package in the specified OSS directory, delete the MD5 file that corresponds to the JAR package.
    ## Format: --jars  /path/to/local/directory/XXX.jar,/path/to/local/directory/XXX.jar
    ## Separate multiple file names with commas (,) and specify an absolute path for each file.
    
    ## The --jars, --py-files, and --files parameters also allow you to specify a local directory from which all files are uploaded. Files in subdirectories are not recursively uploaded.
    ## You must specify an absolute path for the directory. Example: --jars /path/to/local/directory/,/path/to/local/directory2/
    ## Separate multiple directories with commas (,) and specify an absolute path for each directory.
    
    
    ## View the program output. You can use the Spark web UI listed in the following output to access the Spark web UI of the job and view the job details to check whether the parameters submitted by the job meet your expectations.
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: starting
    Info: job status: running
    
    {
      "jobId": "",
      "jobName": "SparkPi",
      "status": "running",
      "detail": "",
      "sparkUI": "",
      "createTime": "2020-08-20 14:12:07",
      "updateTime": "2020-08-20 14:12:07",
      ...
    }
    Job Detail: {
      "name": "SparkPi",
      "className": "org.apache.spark.examples.SparkPi",
      "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": "1",
        "spark.executor.resourceSpec": "medium"
      },
      "file": "",
      "sparkUI":"https://xxx"  
    }

    Exit codes for jobs submitted by the spark-submit CLI:

    255    # Indicates that a job fails to run.
    0      # Indicates that a job succeeds.
    143    # Indicates that a job is killed.
    Note
    • For more information about how to use an AccessKey pair to submit jobs as a Resource Access Management (RAM) user, see Grant permissions to a RAM user (detailed version).

    • If you use the spark-submit CLI to upload a JAR package from your local directory, you must authorize the RAM user to access OSS. You can attach the AliyunOSSFullAccess policy to the RAM user on the Users page of the RAM console.

Kill a Spark job

Run the following command to kill a Spark job:

./spark-submit \
--kill <jobId>

The following result is returned:

## Display the result.
Info: kill job: jxxxxxxxx, response: null

Query Spark jobs

You can use command lines to query Spark jobs. For example, you can run the following command to query the one job on the first page:

./bin/spark-submit \
--list --pagenumber 1 --pagesize 1                      

The following result is returned:

## Display the result.
{
  "requestId": "",
  "dataResult": {
    "pageNumber": "1",
    "pageSize": "1",
    "totalCount": "251",
    "jobList": [
      {
        "createTime": "2020-08-20 11:02:17",
        "createTimeValue": "1597892537000",
        "detail": "",
        "driverResourceSpec": "large",
        "executorInstances": "4",
        "executorResourceSpec": "large",
        "jobId": "",
        "jobName": "",
        "sparkUI": "",
        "status": "running",
        "submitTime": "2020-08-20 11:01:58",
        "submitTimeValue": "1597892518000",
        "updateTime": "2020-08-20 11:22:01",
        "updateTimeValue": "1597893721000",
        "vcName": ""
      }
    ]
  }
}                       

Obtain the parameters and Spark web UI of a submitted job

Run the following command to obtain the parameters and Spark web UI of a submitted job:

./bin/spark-submit --status <jobId>

The following result is returned:

## Display the result.
Info: job status: success
Info:
{
  "jobId": "jxxxxxxxx",
  "jobName": "drop database if exists `",
  "status": "success",
  "detail": "xxxxxx",
  "sparkUI": "xxxxxxx",
  "createTime": "2021-05-08 20:02:28",
  "updateTime": "2021-05-08 20:04:05",
  "submitTime": "2021-05-08 20:02:28",
  "createTimeValue": "1620475348180",
  "updateTimeValue": "1620475445000",
  "submitTimeValue": "1620475348180",
  "vcName": "release-test"
}
Info: Job Detail:
set spark.sql.hive.metastore.version=dla;
set spark.dla.connectors=oss;
set spark.executor.instances=1;
set spark.sql.hive.metastore.version = dla;
set spark.dla.eni.enable = true;
set spark.dla.eni.security.group.id = xxxx ;
set spark.dla.eni.vswitch.id = xxxxx;

drop database if exists `my_hdfs_db_1` CASCADE;

Obtain job logs

Run the following command to obtain the parameters and Spark web UI of a submitted job:

./bin/spark-submit --get-log <jobId>

The following result is returned:

## Display the result.
20/08/20 06:24:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/08/20 06:24:58 INFO SparkContext: Running Spark version 2.4.5
20/08/20 06:24:58 INFO SparkContext: Submitted application: Spark Pi
20/08/20 06:24:58 INFO SecurityManager: Changing view acls to: spark
20/08/20 06:24:58 INFO SecurityManager: Changing modify acls to: spark
20/08/20 06:24:58 INFO SecurityManager: Changing view acls groups to: 
20/08/20 06:24:58 INFO SecurityManager: Changing modify acls groups to: 
20/08/20 06:24:58 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(spark); groups with view permissions: Set(); users  with modify permissions: Set(spark); groups with modify permissions: Set()
...