AnalyticDB for MySQL: Use Airflow to schedule Spark jobs

Last Updated: Dec 30, 2023

Airflow is a popular open source tool that is used to orchestrate and schedule various workloads as directed acyclic graphs (DAGs). You can use AnalyticDB for MySQL Spark Airflow Operator or AnalyticDB for MySQL spark-submit to allow Airflow to schedule Spark jobs. This topic describes how to use Airflow to schedule AnalyticDB for MySQL Spark jobs.

Usage notes

  • For information about the supported configuration parameters of AnalyticDB for MySQL Spark, see Conf configuration parameters.

  • If you use Apache Livy to schedule Spark jobs, contact technical support to apply for an invitational preview of AnalyticDB for MySQL Spark Livy Proxy.

AnalyticDB for MySQL Spark Airflow Operator

Preparations

  1. Install and start Airflow. For more information, see Installation of Airflow.

  2. Run the following command to install the Airflow Spark plug-in:

    pip install https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20230608/qvjf/adb_spark_airflow-0.0.1-py3-none-any.whl
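
    After the installation, you can optionally confirm that the plug-in can be imported. The following sketch uses the same module path as the import statements in the DAG file later in this topic:

    # Optional check: import the AnalyticDB for MySQL Spark operators provided by the plug-in.
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import (
        AnalyticDBSparkBatchOperator,
        AnalyticDBSparkSQLOperator,
    )

    print(AnalyticDBSparkBatchOperator.__name__, AnalyticDBSparkSQLOperator.__name__)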

Procedure

  1. Create a connection. For more information, see the "Creating a Connection" section of the Using the Command Line Interface topic.

    {
      "auth_type": "AK",
      "access_key_id": "<your_access_key_ID>",
      "access_key_secret": "<your_access_key_secret>",
      "region": "<your_region>"
    }
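
    After you create the connection, you can optionally confirm from Python that Airflow can read it. The following sketch assumes that the connection uses the ID adb_spark_default, which is the default connection ID expected by the operators in this topic; replace it with your own connection ID if you used a different name.

    # Optional check: read the connection back from the Airflow metadata database.
    from airflow.hooks.base import BaseHook

    conn = BaseHook.get_connection("adb_spark_default")
    print(conn.extra_dejson.get("auth_type"))   # expected: AK
    print(conn.extra_dejson.get("region"))      # the region that you configured
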
  2. Create a declaration file for an Airflow DAG. In this example, a file named spark_dags.py is created.

    from datetime import datetime
    
    from airflow.models.dag import DAG
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkBatchOperator
    from airflow_alibaba.providers.alibaba.cloud.operators.analyticdb_spark import AnalyticDBSparkSQLOperator
    
    with DAG(
        dag_id="my_dag_name",
        start_date=datetime(2021, 1, 1),
        default_args={"cluster_id": "<your_cluster_ID>", "rg_name": "<your_resource_group>", "region": "<your_region>"},
        max_active_runs=1,
        catchup=False,
    ) as dag:
    
        spark_batch = AnalyticDBSparkBatchOperator(
            task_id="task1",
            file="oss://<bucket_name>/tmp/spark-examples.jar",
            class_name="org.apache.spark.examples.SparkPi"
        )
    
        spark_sql = AnalyticDBSparkSQLOperator(
            task_id="task2",
            sql="SHOW DATABASES;"
        )
    
        spark_batch >> spark_sql
    

    The following tables describe the parameters that you can specify for the operators in the declaration file.

    Supported parameters of AnalyticDBSparkBatchOperator

    • file (required): The absolute path of the main file of the Spark application. The main file can be a JAR package that contains the entry point or an executable file that serves as the entry point for the Python application.

      Important: The main files of Spark applications must be stored in Object Storage Service (OSS). The OSS bucket and the AnalyticDB for MySQL cluster must reside in the same region.

    • class_name (required): The entry point of the Java or Scala application. Python applications do not require entry points.

    • args (optional): The arguments of the Spark application.

    • conf (optional): The configuration parameters of the Spark application. The parameters are similar to those in Apache Spark. Specify the parameters in the key: value format. For information about the parameters that are different from those in Apache Spark and the parameters that are specific to AnalyticDB for MySQL, see Conf configuration parameters.

    • jars (optional): The absolute paths of the JAR packages that are required for the Spark application. When a Spark application runs, JAR packages are added to the classpaths of the driver and executor Java virtual machines (JVMs).

      Important: All JAR packages that are required for Spark applications must be stored in OSS. The OSS bucket and the AnalyticDB for MySQL cluster must reside in the same region.

    • py_files (optional): The Python files that are required for the PySpark application. The files must be in the ZIP, PY, or EGG format. If multiple Python files are required, we recommend that you use the files in the ZIP or EGG format. You can reference Python files as modules in Python code.

      Important: All Python files that are required for Spark applications must be stored in OSS.

    • files (optional): The files that are required for the Spark application. These files are downloaded to the working directories of the driver and executor processes. You can configure aliases for these files. Example: oss://<testBucketName>/test/test1.txt#test1. In this example, test1 is the file alias. You can access the file by specifying ./test1 or ./test1.txt.

      Note: If the log4j.properties file is included in the files parameter, the Spark application uses the log4j.properties file as the log configuration file. All files that are required for Spark applications must be stored in OSS.

    • driver_resource_spec (optional): The resource specifications of the Spark driver. Default value: medium. Each type corresponds to distinct specifications. For more information, see the Type column in the "Spark resource specifications" table of the Conf configuration parameters topic.

      Note: You must specify the same value for the spark.driver.resourceSpec and spark.executor.resourceSpec parameters. If you submit only Spark batch applications, you can use Apache Spark parameters and configure the parameters based on the number of cores and the memory size that are described in the "Spark resource specifications" table of the Conf configuration parameters topic.

    • executor_resource_spec (optional): The resource specifications of a Spark executor. Default value: medium. Each type corresponds to distinct specifications. For more information, see the Type column in the "Spark resource specifications" table of the Conf configuration parameters topic.

    • num_executors (optional): The number of Spark executors. Default value: 3.

    • archives (optional): The compressed packages that are required for the Spark application. Specify the packages in the .TAR.GZ format. The packages are decompressed to the working directory of the Spark process. You can configure aliases for the files contained in the packages. Example: oss://testBucketName/test/test1.tar.gz#test1. In this example, test1 is used as the file alias. For example, test2.txt is a file in the test1.tar.gz package. You can access the file by specifying ./test1/test2.txt or ./test1.tar.gz/test2.txt.

      Note: All compressed packages that are required for Spark applications must be stored in OSS. If a package fails to be decompressed, the task fails.

    • name (optional): The name of the Spark application.

    • cluster_id (required): The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.

    • rg_name (required): The name of the job resource group that is used in the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.

    • adb_spark_conn_id (required): The Airflow connection ID of AnalyticDB for MySQL Spark. Default value: adb_spark_default.

    • region (required): The region ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.

    • polling_interval (optional): The interval at which the Spark application status is scanned.
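
    For reference, the following sketch extends the DAG shown above with several of the optional parameters from this table. It is assumed to be placed inside the same with DAG(...) block, so cluster_id, rg_name, and region are taken from default_args. The OSS paths are placeholders, and the list values for args and jars are assumptions based on the plural parameter descriptions.

        # A batch task that sets optional parameters in addition to file and class_name.
        spark_batch_full = AnalyticDBSparkBatchOperator(
            task_id="task1_with_options",
            file="oss://<bucket_name>/tmp/spark-examples.jar",    # main JAR package stored in OSS
            class_name="org.apache.spark.examples.SparkPi",       # entry point of the Java or Scala application
            args=["1000"],                                         # application arguments
            jars=["oss://<bucket_name>/tmp/dependency.jar"],       # JAR packages added to driver and executor classpaths
            conf={
                "spark.driver.resourceSpec": "medium",
                "spark.executor.resourceSpec": "medium",
            },                                                     # Spark configuration parameters
            num_executors=3,                                       # number of Spark executors
            name="airflow-adb-spark-batch",                        # name of the Spark application
        )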

    Supported parameters of AnalyticDBSparkSQLOperator

    • sql (required): The Spark SQL statement.

    • conf (optional): The configuration parameters of the Spark application. The parameters are similar to those in Apache Spark. Specify the parameters in the key: value format. For information about the parameters that are different from those in Apache Spark and the parameters that are specific to AnalyticDB for MySQL, see Conf configuration parameters.

    • driver_resource_spec (optional): The resource specifications of the Spark driver. Default value: medium. Each type corresponds to distinct specifications. For more information, see the Type column in the "Spark resource specifications" table of the Conf configuration parameters topic.

      Note: You must specify the same value for the spark.driver.resourceSpec and spark.executor.resourceSpec parameters. If you submit only Spark batch applications, you can use Apache Spark parameters and configure the parameters based on the number of cores and the memory size that are described in the "Spark resource specifications" table of the Conf configuration parameters topic.

    • executor_resource_spec (optional): The resource specifications of a Spark executor. Default value: medium. Each type corresponds to distinct specifications. For more information, see the Type column in the "Spark resource specifications" table of the Conf configuration parameters topic.

    • num_executors (optional): The number of Spark executors. Default value: 3.

    • name (optional): The name of the Spark application.

    • cluster_id (required): The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.

    • rg_name (required): The name of the job resource group that is used in the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.

    • adb_spark_conn_id (required): The Airflow connection ID of AnalyticDB for MySQL Spark. Default value: adb_spark_default.

    • region (required): The region ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.

    • polling_interval (optional): The interval at which the Spark application status is scanned.
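
    Similarly, the following sketch shows a SQL task with several optional parameters set. It is also assumed to be placed inside the same with DAG(...) block, so cluster_id, rg_name, and region come from default_args; the database and table names are placeholders.

        # A SQL task that sets optional parameters in addition to the sql statement.
        spark_sql_full = AnalyticDBSparkSQLOperator(
            task_id="task2_with_options",
            sql="SELECT COUNT(*) FROM <your_database>.<your_table>;",  # Spark SQL statement
            conf={
                "spark.driver.resourceSpec": "medium",
                "spark.executor.resourceSpec": "medium",
            },                                                          # Spark configuration parameters
            num_executors=3,                                            # number of Spark executors
            name="airflow-adb-spark-sql",                               # name of the Spark application
        )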

  3. Store the spark_dags.py file in the directory that is specified by the dags_folder parameter in the Airflow configuration file.
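
    If you are not sure which directory the dags_folder parameter points to, you can read it from the Airflow configuration, as shown in the following sketch:

    # Print the directory that Airflow scans for DAG files.
    from airflow.configuration import conf

    print(conf.get("core", "dags_folder"))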

  4. Execute the DAG. For more information, see Installation of Airflow.

AnalyticDB for MySQL spark-submit

Note

You can configure the parameters that are specific to AnalyticDB for MySQL, such as clusterId, regionId, keyId, secretId, and ossUploadPath, in the spark-defaults.conf file that is stored in the conf folder of AnalyticDB for MySQL Spark. You can also configure them by using Airflow parameters. For more information, see the "Configure the parameters" section of the AnalyticDB for MySQL spark-submit topic.

Preparations

Preparation 1: Install Airflow

  1. Install and start Airflow. For more information, see Installation of Airflow.

  2. Run the following command to install the Airflow Spark plug-in:

    pip3 install apache-airflow-providers-apache-spark
    Important
    • You must install Python 3 before you can install the Airflow Spark plug-in.

    • When you install the apache-airflow-providers-apache-spark plug-in, PySpark, which is developed by the Apache Spark community, is automatically installed. If you want to uninstall PySpark, run the following command:

      pip3 uninstall pyspark

Preparation 2: Download and configure AnalyticDB for MySQL spark-submit

  1. Download AnalyticDB for MySQL spark-submit and configure the parameters. For more information, see AnalyticDB for MySQL spark-submit.

  2. Run the following command to add the installation path of AnalyticDB for MySQL spark-submit to the PATH environment variable that is used by Airflow:

    export PATH=$PATH:</your/adb/spark/path/bin>
    Important

    Before you start Airflow, you must add the installation path of AnalyticDB for MySQL spark-submit to the PATH environment variable. Otherwise, Airflow may fail to find the spark-submit command when it schedules jobs.
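
    To confirm that the command can be found before you start Airflow, you can check the PATH from Python, as shown in the following sketch. It assumes that the executable in the </your/adb/spark/path/bin> directory is named spark-submit.

    # Print the full path of spark-submit, or None if it is not found on PATH.
    import shutil

    print(shutil.which("spark-submit"))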

Procedure

  1. Create a declaration file for an Airflow DAG that defines a Spark workflow. In this example, a file named demo.py is created.

    from airflow.models import DAG
    from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
    from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
    from airflow.utils.dates import days_ago
    args = {
        'owner': 'Aliyun ADB Spark',
    }
    with DAG(
        dag_id='example_spark_operator',
        default_args=args,
        schedule_interval=None,
        start_date=days_ago(2),
        tags=['example'],
    ) as dag:
        adb_spark_conf = {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.resourceSpec": "medium"
        }
        # [START howto_operator_spark_submit]
        submit_job = SparkSubmitOperator(
            conf=adb_spark_conf,
            application="oss://<bucket_name>/jar/pi.py",
            task_id="submit_job",
            verbose=True
        )
        # [END howto_operator_spark_submit]
        # [START howto_operator_spark_sql]
        sql_job = SparkSqlOperator(
            conn_id="spark_default",
            sql="SELECT * FROM yourdb.yourtable",
            conf=",".join([k + "=" + v for k, v in adb_spark_conf.items()]),
            task_id="sql_job",
            verbose=True
        )
        # [END howto_operator_spark_sql]
        submit_job >> sql_job
    
  2. Store the demo.py file in the dags folder of the Airflow installation directory.

  3. Execute the DAG. For more information, see Installation of Airflow.