
AnalyticDB for MySQL:Use PySpark to develop Spark applications

Last Updated:Dec 30, 2023

This topic describes how to develop AnalyticDB for MySQL Spark Python jobs and use the virtual environments technology to package the runtime environment of Python jobs.

Prerequisites

Use PySpark

  1. Write the following sample code and save the code in a file named example.py:

    from pyspark.sql import SparkSession
    
    if __name__ == "__main__":
        spark = SparkSession.builder.getOrCreate()
        df = spark.sql("SELECT 1+1")
        df.printSchema()
        df.show()
    
  2. Upload the example.py file to OSS. For more information, see Upload objects.

  3. Go to the Spark editor.

    1. Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Data Lakehouse Edition (V3.0) tab, find the cluster that you want to manage and click the cluster ID.

    2. In the left-side navigation pane, choose Job Development > Spark JAR Development.

  4. In the upper part of the editor, select a job resource group and a Spark job type. In this example, the Batch type is used.

  5. Run the following code in the editor:

    {
        "name": "Spark Python Test",
        "file": "oss://{your oss bucket path}/example.py",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small"
        }
    }

    For information about the parameters, see Spark application development.
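The job description above is plain JSON, so you can also assemble it programmatically before pasting it into the editor. The following is an illustrative sketch; the `batch_job` helper is hypothetical and is not part of any AnalyticDB or Spark API:

```python
import json

def batch_job(name, file, instances=1, driver="small", executor="small"):
    """Build a Spark batch job description in the format shown above.
    Illustrative helper only; not an AnalyticDB or Spark API."""
    return {
        "name": name,
        "file": file,
        "conf": {
            "spark.driver.resourceSpec": driver,
            "spark.executor.instances": instances,
            "spark.executor.resourceSpec": executor,
        },
    }

job = batch_job("Spark Python Test", "oss://{your oss bucket path}/example.py")
print(json.dumps(job, indent=2))
```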

Use Python dependencies

Method

If you develop Python applications by using self-developed or third-party dependencies, you must upload the dependencies to OSS and configure the pyFiles parameter when you submit a Spark job.
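Conceptually, Spark downloads each pyFiles archive, decompresses it, and adds the extraction directory to the Python search path, so a folder such as tools/ inside the archive becomes importable. The following local sketch simulates that flow with only the standard library (no Spark or OSS involved; all paths are temporary):

```python
import os
import sys
import tarfile
import tempfile

# Build a tiny tools/ package like the one in the example below.
workdir = tempfile.mkdtemp()
pkg_dir = os.path.join(workdir, "tools")
os.makedirs(pkg_dir)
with open(os.path.join(pkg_dir, "func.py"), "w") as f:
    f.write("def tax(salary):\n    return 0.15 * int(salary)\n")

# Compress it the same way the tools.tar.gz package is built.
archive = os.path.join(workdir, "tools.tar.gz")
with tarfile.open(archive, "w:gz") as tar:
    tar.add(pkg_dir, arcname="tools")

# Simulate what Spark does with pyFiles: decompress the archive and
# put the extraction directory on the Python search path.
extract_dir = tempfile.mkdtemp()
with tarfile.open(archive) as tar:
    tar.extractall(extract_dir)
sys.path.insert(0, extract_dir)

from tools import func  # now importable, as in the example below
print(func.tax("100"))
```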

Example

This section provides an example of how to calculate the after-tax income of each employee by using a custom function. In this example, a file named staff.csv is uploaded to OSS. The staff.csv file contains the following data:

name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200

  1. Compile and upload dependencies to OSS.

    1. Create a folder named tools. Create a file named func.py in the folder.

      def tax(salary):
          """
          Convert the salary string to an integer and calculate the 15% tax on it.

          :param salary: the salary of a staff member, as a string
          :return: the tax amount as a float
          """
          return 0.15 * int(salary)
      
    2. Compress and upload the tools folder to OSS. In this example, the folder is compressed to the tools.tar.gz package.

      Note

      If multiple dependent Python files are required, we recommend that you compress the files into a .tar or .tar.gz package. You can then reference the Python files in your code as modules.

  2. Write the following sample code and save the code in a file named example.py:

    from __future__ import print_function
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType
    
    import sys
    
    # import third party file
    from tools import func
    
    if __name__ == "__main__":
        # init pyspark context
        spark = SparkSession.builder.appName("Python Example").getOrCreate()
        # read the CSV file from OSS into a DataFrame
        csv_file = sys.argv[1]
        df = spark.read.csv(csv_file, mode="DROPMALFORMED", inferSchema=True, header=True)
        # print the schema and data to the console
        df.printSchema()
        df.show()
        # create a UDF that deducts 15% tax from the salary
        taxCut = udf(lambda salary: int(salary) - func.tax(salary), FloatType())
        # deduct tax from the salary and show the after-tax result
        df.select("name", taxCut("salary").alias("final salary")).show()
        spark.stop()
    
  3. Upload the example.py file to OSS. For more information, see Upload objects.

  4. Go to the Spark editor.

    1. Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Data Lakehouse Edition (V3.0) tab, find the cluster that you want to manage and click the cluster ID.

    2. In the left-side navigation pane, choose Job Development > Spark JAR Development.

  5. In the upper part of the editor, select a job resource group and a Spark job type. In this example, the Batch type is used.

  6. Run the following code in the editor:

    {
        "name": "Spark Python",
        "file": "oss://<bucket name>/example.py",
        "pyFiles": ["oss://<bucket name>/tools.tar.gz"],
        "args": [
            "oss://<bucket name>/staff.csv"
        ],
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "small"
        }
    }

    Parameters:

    • file: the OSS path of the Python code.

    • pyFiles: the OSS path of the Python dependencies that are required for PySpark. The file name extension can be .tar or .tar.gz. Separate multiple packages with commas (,).

      Note

      All Python dependencies that are required for PySpark must be stored in OSS.

    • args: the arguments that are passed to the Spark application. In this example, the OSS path of the staff.csv file is passed to example.py.

    For information about the parameters, see Spark application development.
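The intended after-tax calculation can be checked without a Spark cluster. The following is a plain-Python sketch of the same logic, using the staff.csv data shown earlier (the tax function is copied inline so the sketch is self-contained):

```python
import csv
import io

def tax(salary):
    """Same rule as tools/func.py: 15% of the integer salary."""
    return 0.15 * int(salary)

staff_csv = """name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
"""

# Deduct tax from each salary to obtain the after-tax income.
final_salaries = {}
for row in csv.DictReader(io.StringIO(staff_csv)):
    final_salaries[row["name"]] = int(row["salary"]) - tax(row["salary"])

print(final_salaries)  # Lucky: 85.0, Lucy: 127.5, Martin: 153.0, Rose: 170.0
```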

Use the virtual environments technology to package dependent environments

If you encounter complex dependent environments when you develop Python jobs, you can use the virtual environments technology of Python to manage and isolate the environments. AnalyticDB for MySQL Spark allows you to use the virtual environments technology to package and upload the on-premises dependent environments to OSS. For more information about virtual environments, see Python documentation.

Important

AnalyticDB for MySQL Spark runs on glibc-devel 2.28. If the Python environment that you package is built against an incompatible glibc version, PySpark jobs may fail to run.

Method

To use the virtual environments technology to package Python environments, you must compress and upload the Python environments to OSS and configure the archives and spark.pyspark.python parameters when you submit a Spark job.

Example

  1. Prepare a Linux operating system.

    A Linux operating system is required to package Python environments based on the virtual environments technology. You can use one of the following methods to prepare a Linux operating system. In this example, an Elastic Compute Service (ECS) instance is purchased.

    • Purchase an ECS instance that runs CentOS 7 or AnolisOS 8. For more information, see Create an instance by using the wizard.

    • Install CentOS 7, AnolisOS 8, or a later version on your on-premises device.

    • Use an official Docker image of CentOS or AnolisOS and package the Python environments in the image.

  2. Use the virtual environments technology to package Python environments and upload the package to OSS.

    Use virtualenv or conda to package the dependent Python environments. You can customize the Python version during packaging. In this example, virtualenv is used.

    # Create a directory named venv in the current path by using Python 3.
    # The --copies option is required.
    virtualenv --copies --download --python python3.7 venv
    
    # Activate the environment.
    source venv/bin/activate
    
    # Install third-party modules.
    pip install scikit-spark==0.4.0
    
    # Check the result.
    pip list
    
    # Compress the environment.
    tar -czvf venv.tar.gz venv

    Note

    For information about how to use conda to package the dependent Python environments, see Managing environments.

  3. Go to the Spark editor.

    1. Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Data Lakehouse Edition (V3.0) tab, find the cluster that you want to manage and click the cluster ID.

    2. In the left-side navigation pane, choose Job Development > Spark JAR Development.

  4. In the upper part of the editor, select a job resource group and a Spark job type. In this example, the Batch type is used.

  5. Run the following code in the editor:

    {
        "name": "venv example",
        "archives": [
            "oss://<bucket name>/venv.tar.gz#PY3"
        ],
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.pyspark.python": "./PY3/venv/bin/python3",
            "spark.executor.resourceSpec": "small"
        },
        "file": "oss://<bucket name>/example.py"
    }

    Parameters:

    • archives: the OSS path of the Python environment package. In this example, the OSS path of the venv.tar.gz package is used.

    • spark.pyspark.python: the path of the Python interpreter inside the decompressed environment package in the Spark working directory. In this example, ./PY3/venv/bin/python3 is used.

    For information about the parameters, see Spark application development.
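The `#PY3` fragment in the archives path specifies the alias directory under which the package is decompressed in the Spark working directory, which is why `spark.pyspark.python` begins with `./PY3`. The following local sketch illustrates that path relationship with only the standard library (no Spark involved; the interpreter file is a stand-in):

```python
import os
import tarfile
import tempfile

# venv.tar.gz contains a top-level venv/ directory, as produced by
# `tar -czvf venv.tar.gz venv` in the packaging step above.
workdir = tempfile.mkdtemp()
bin_dir = os.path.join(workdir, "venv", "bin")
os.makedirs(bin_dir)
open(os.path.join(bin_dir, "python3"), "w").close()  # stand-in interpreter

archive = os.path.join(workdir, "venv.tar.gz")
with tarfile.open(archive, "w:gz") as tar:
    tar.add(os.path.join(workdir, "venv"), arcname="venv")

# "oss://<bucket name>/venv.tar.gz#PY3" means: decompress under ./PY3.
alias_dir = os.path.join(workdir, "PY3")
with tarfile.open(archive) as tar:
    tar.extractall(alias_dir)

# The interpreter is then reachable at ./PY3/venv/bin/python3.
interpreter = os.path.join(alias_dir, "venv", "bin", "python3")
print(os.path.exists(interpreter))
```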