
E-MapReduce: Start a Spark task

Last Updated: Jul 19, 2024

You can submit Spark tasks in the E-MapReduce (EMR) console or by calling API operations. Alibaba Cloud provides SDKs for multiple programming languages that encapsulate these API operations. This topic describes how to submit a Spark task by calling API operations with the SDK for Python.

Prerequisites

  • An AccessKey pair is created. For more information, see Create an AccessKey pair.

    Note

    To prevent security risks caused by the leakage of the AccessKey pair of your Alibaba Cloud account, we recommend that you create a RAM user and grant the RAM user the permissions required to access EMR Serverless Spark. Then, you can use the AccessKey pair of the RAM user to call API operations by using the SDK.

  • Python 3 is installed.

  • The ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured (see the quick check after this list). For more information, see Configure environment variables in Linux, macOS, and Windows.
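
If you want to confirm that the variables are visible to your Python process before you run the sample, you can use the following minimal sketch. It is not part of the SDK and only checks the environment of the current process.

# Minimal sketch: verify that the AccessKey environment variables are set.
# Assumes the variables were exported in the shell that starts Python.
import os

for name in ('ALIBABA_CLOUD_ACCESS_KEY_ID', 'ALIBABA_CLOUD_ACCESS_KEY_SECRET'):
    if not os.environ.get(name):
        raise SystemExit(f'{name} is not set. Configure it before you run the sample code.')
print('AccessKey environment variables are set.')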

Install EMR Serverless Spark SDK for Python

Run the following command to install the SDK for Python:

pip install alibabacloud_emr_serverless_spark20230808==1.0.0
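
To confirm that the SDK is installed in the current Python environment, you can, for example, print the installed package version. This is an optional check; the distribution name is the same as the pip package name.

# Optional check: print the version of the installed SDK (requires Python 3.8 or later).
from importlib.metadata import version
print(version('alibabacloud_emr_serverless_spark20230808'))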

Example

The following sample code is provided only for reference. You can modify it based on your business requirements.

For information about the endpoints of EMR Serverless Spark, see Endpoints.

# -*- coding: utf-8 -*-
import os

from typing import List
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_emr_serverless_spark20230808.client import Client
from alibabacloud_emr_serverless_spark20230808.models import (
    StartJobRunRequest,
    Tag,
    JobDriver,
    JobDriverSparkSubmit,
)

from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


# Make sure that the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables are configured.
# If your project code is leaked, the AccessKey pair may be exposed and the security of all resources in your account may be compromised.
# The following sample code reads the AccessKey pair from environment variables and uses it to call API operations. The sample code is provided only for reference. We recommend that you use Security Token Service (STS), which provides higher security.
# Replace cn-hangzhou in the endpoint with the ID of the region in which EMR Serverless Spark is available.
def create_client() -> Client:
    config = open_api_models.Config(
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
    )
    config.endpoint = 'emr-serverless-spark.cn-hangzhou.aliyuncs.com'
    return Client(config)


def example_jar():
    print("Let's run a simple test...")
    client = create_client()
    tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
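    # The three positional arguments of JobDriverSparkSubmit in this sample are the
    # entry point (a JAR file in OSS), the entry point arguments, and the
    # spark-submit parameters (main class and resource settings).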
    job_driver_spark_submit = JobDriverSparkSubmit(
        "oss://<YourBucket>/spark-resource/examples/jars/spark-examples_2.12-3.3.1.jar",
        ["1"],
        "--class org.apache.spark.examples.SparkPi --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )

    job_driver = JobDriver(job_driver_spark_submit)
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="JAR",
        name="emr-spark-task",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
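        # 'w-ae42e9c92927****' is a masked placeholder. Replace it with the ID of
        # your EMR Serverless Spark workspace.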
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
                                                     runtime)
        print(response.body.to_map())
    except Exception as error:
        print(error.message)
        print(error.data.get("Recommend"))
        UtilClient.assert_as_string(error.message)

def example_sql():
    print("Let's run a simple test...")
    client = create_client()
    tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
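    # In this SQL sample, the SQL file in OSS is used as the entry point and is also
    # passed to SparkSQLCLIDriver through the -f argument.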
    job_driver_spark_submit = JobDriverSparkSubmit(
        "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql",
        ["-f", "oss://<YourBucket>/spark-resource/examples/sql/show_db.sql"],
        "--class org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver --conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )

    job_driver = JobDriver(job_driver_spark_submit)
    # configuration_overrides = StartJobRunRequestConfigurationOverrides([StartJobRunRequestConfigurationOverridesConfigurations("test", "test", "test")])
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="SQL",
        name="airflow-sql-test",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver,
        # configuration_overrides=configuration_overrides
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
                                                     runtime)
        print(response.body.to_map())
    except Exception as error:
        print(error.message)
        print(error.data.get("Recommend"))
        UtilClient.assert_as_string(error.message)

def example_py():
    print("Let's run a simple test...")
    client = create_client()
    tags: List[Tag] = [Tag("environment", "production"), Tag("workflow", "true")]
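    # The entry point is the PySpark script in OSS. The single entry point argument
    # ("50") is passed to the script; for the Spark Pi example, it sets the number of partitions.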
    job_driver_spark_submit = JobDriverSparkSubmit(
        "oss://<YourBucket>/spark-resource/examples/src/main/python/pi.py",
        ["50"],
        "--conf spark.executor.cores=4 --conf spark.executor.memory=20g --conf spark.driver.cores=4 --conf spark.driver.memory=8g --conf spark.executor.instances=1"
    )

    job_driver = JobDriver(job_driver_spark_submit)
    start_job_run_request = StartJobRunRequest(
        region_id="cn-hangzhou",
        resource_queue_id="root_queue",
        code_type="PYTHON",
        name="emr-spark-task",
        release_version="esr-2.1-native (Spark 3.3.1, Scala 2.12, Native Runtime)",
        tags=tags,
        job_driver=job_driver
    )
    runtime = util_models.RuntimeOptions()
    headers = {}
    try:
        response = client.start_job_run_with_options('w-ae42e9c92927****', start_job_run_request, headers,
                                                     runtime)
        print(response.body.to_map())
    except Exception as error:
        print(error.message)
        print(error.data.get("Recommend"))
        UtilClient.assert_as_string(error.message)


if __name__ == '__main__':
    example_jar()
    # example_sql()
    # example_py()
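
By default, the script submits the JAR example. To submit the SQL or PySpark example instead, comment out example_jar() and uncomment the corresponding call. Each example prints the response body of the StartJobRun operation, which typically includes the ID of the job run; you can use this ID to track the task in the EMR Serverless Spark console.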