全部产品
Search
文档中心

PySpark

更新时间: 2021-06-24

本文展示如何提交PySpark作业以及使用自定义Virtualenv。

PySpark基本使用方式

1.开发主程序文件

您可以建立如下内容的example.py文件,示例中定义main函数可以允许PySpark找到程序的统一启动入口。

from __future__ import print_function
from pyspark.sql import SparkSession


# import third part file
from tools import func

if __name__ == "__main__":
    # init pyspark context
    spark = SparkSession\
        .builder\
        .appName("Python Example")\
        .getOrCreate()

    df = spark.sql("SELECT 2021")
    # print schema and data to the console
    df.printSchema()
    df.show()

2.执行主程序文件

  1. 和Scala、Java程序开发的JAR包一样,您需要将example.py文件上传到OSS中,并在Spark的启动配置中使用file来指定这个文件为启动文件。

  2. 在DLA控制台的Serverless->作业管理页面,使用如下示例代码配置作业。

    {
    
        "name": "Spark Python",
    
        "file": "oss://{your bucket name}/example.py"
    
        "conf": {
    
            "spark.driver.resourceSpec": "small",
    
            "spark.executor.instances": 2,
    
            "spark.executor.resourceSpec": "small",
    
            "spark.kubernetes.pyspark.pythonVersion": "3"
    
      }
    }

    注意
    • 您需要在配置时将{your bucket name}替换为您使用的OSS的Bucket名称。

    • 示例中使用Python 3执行文件,和社区版的Spark相同,通过spark.kubernetes.pyspark.pythonVersion配置使用的Python版本,默认为Python 2.7。

  3. 单击执行

如何上传自行开发的或者第三方开发的Module

当开发Python程序时,往往会用到自行开发的或者由第三方开发的各种Module模块,这些模块可以上传并加载到PySpark的执行环境中,被主程序调用。

以计算员工的税后收入为例,步骤如下。

1. 准备测试数据

新建一个如下格式的CSV文件,命名为staff.csv,并上传到OSS中。文件反映了每个员工的信息和收入情况。

name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
说明

如何将文件上传到OSS请参见简单上传

2. 开发一个依赖方法

  1. 创建一个文件夹tools

  2. tools文件夹中创建一个文件func.py,文件内容如下。

    def tax(salary):
        """
        convert string to int
        then cut 15% tax from the salary
        return a float number
    
        :param salary: The salary of staff worker
        :return:
        """
        return 0.15 * int(salary)
  3. tools文件夹压缩为tools.zip后上传到OSS中。压缩包的生成方式如下。p286346

    注意

    不同操作系统平台的ZIP压缩工具会略有区别,请保证解压后可以看到顶层目录是tools文件夹。

3. 开发主程序

开发一个Spark的Python程序,将测试中的CSV从OSS中读取出来,注册为一个DataFrame。同时将依赖包中的tax方法注册为一个Spark UDF,然后使用该UDF对刚刚生成的DataFrame进行计算并打印结果。

示例代码如下, 您需要在配置时将{your bucket name}替换为您使用的OSS的Bucket名称。

from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# import third part file
from tools import func

if __name__ == "__main__":
    # init pyspark context
    spark = SparkSession\
        .builder\
        .appName("Python Example")\
        .getOrCreate()

    # read csv from oss to a dataframe, show the table
    df = spark.read.csv('oss://{your bucket}/staff.csv', mode="DROPMALFORMED",inferSchema=True, header = True)
    # print schema and data to the console
    df.printSchema()
    df.show()

    # create an udf
    taxCut = udf(lambda salary: func.tax(salary), FloatType())

    # cut tax from salary and show result
    df.select("name", taxCut("salary").alias("final salary")).show()
    spark.stop()

将代码写入到example.py中,并上传到OSS。

4. 提交任务

在DLA控制台的Serverless->作业管理页面,新建一个作业,并提交以下作业信息。

{
    "name": "Spark Python",
    "file": "oss://{your bucket name}/example.py",
    "pyFiles": ["oss://{your bucket name}/tools.zip"],
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "small",
        "spark.dla.connectors": "oss",
        "spark.kubernetes.pyspark.pythonVersion": "3"
    }
}

代码中主要参数说明。

参数

说明

是否必选

conf

Spark任务用到的配置参数,需要的配置项如下。

  • "spark.dla.connectors": "oss" :此任务需要有连接OSS的能力。

  • "spark.kubernetes.pyspark.pythonVersion": "3" :此任务需要使用Python 3来执行。

说明

更多参数说明请参见作业配置指南

PySpark使用自定义Virtualenv

当需要复杂的第三方依赖包时,可以使用Virtualenv来将本地调试环境上传到云端的Spark集群中。这种方式可以将大量复杂的系统包,如Pandas、Numpy、PyMySQL等装入隔离环境,并迁移到相同的操作系统中。您可以选择如下两种方案。

说明

Virtualenv的更多信息请参见Python官方社区venv说明

自行生成Virtualenv压缩包

1.准备Linux环境

由于Virtualenv需要相同的操作系统,当前上传到DLA Spark使用的压缩包必须在Linux环境下的进行安装。您可以采用如下方式准备Linux环境。

  • 准备一台Centos7的电脑进行打包。

  • 在阿里云以按量付费的方式新开一台Centos 7的ECS,使用完毕后关闭。

  • 使用Centos 7的官方Docker镜像,在镜像内部打包。

2.在Linux环境下打包Python执行环境

常用的执行环境打包工具包括VirtualenvConda,您可以根据您的需要来选择对应的工具,并安装工具到您的Linux环境中。

注意
  • 当前Serverless Spark支持的Python版本为3.7及以下主版本。

  • Spark运行环境为Centos 7,请使用该环境打包Venv(推荐使用Docker中的环境打包)。

以下示例使用Virtualenv生成一个执行环境压缩包venv.zip,压缩包中包含了scikit-spark的特定版本。

# create directory venv at current path with python3
# MUST ADD --copies !
virtualenv --copies --download --python Python3.7 venv

# active environment
source venv/bin/activate

# install third part modules
pip install scikit-spark==0.4.0

# check the result
pip list

# zip the environment
zip -r venv.zip venv
说明

如何使用Conda生成执行环境,请参见Conda管理虚拟环境

3.在Spark中使用Python执行环境

您可以在提交Spark作业时,使用如下的代码配置作业。其中spark.pyspark.python的参数值表示上传的压缩文件中的运行包。更多参数说明,请参见作业参数说明

{
    "name": "venv example",
    "archives": [
        "oss://test/venv.zip#PY3"
    ],
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.dla.connectors": "oss",
        "spark.executor.instances": 1,
        "spark.dla.job.log.oss.uri": "oss://test/spark-logs",
        "spark.pyspark.python": "./PY3/venv/bin/python3",
        "spark.executor.resourceSpec": "medium"
    },
    "file": "oss://test/example.py"
}
说明

与Spark开源社区的语义相同,venv.zip#PY3代表将压缩包解压到计算节点工作目录的PY3文件夹下,继而可以从本地访问。如果不使用#指定文件夹名称,则默认使用文件名称作为新建的文件夹名。

使用镜像工具生成Virtualenv压缩包

1.使用如下命令拉取镜像。

docker pull registry.cn-hangzhou.aliyuncs.com/dla_spark/dla-venv:0.1

2.将需要生成环境的requirements.txt文件放置在/home/admin文件夹中,并将此文件夹挂载到Docker中。

docker run -ti -v /home/admin:/tmp dla-venv:0.1 -p python3 -f /tmp/requirements.txt
说明

requirements.txt是Python的标准依赖包描述文件,更多信息请参见User Guide

打包程序自动化执行,您可以看到如下日志。

adding: venv-20210611-095454/lib64/ (stored 0%)
  adding: venv-20210611-095454/lib64/python3.6/ (stored 0%)
  adding: venv-20210611-095454/lib64/python3.6/site-packages/ (stored 0%)
  adding: venv-20210611-095454/pyvenv.cfg (deflated 30%)
  venv-20210611-095454.zip

3.在/home/admin文件夹下找到打包好的压缩文件venv-20210611-095454.zip。如何使用压缩包请参见在Spark中使用Python执行环境

4.(可选)关于Docker镜像的更多使用说明,您可以执行如下命令查看。

docker run -i dla-venv:0.1

Used to create venv package for Aliyun DLA 
Docker with host machine volumes: https://docs.docker.com/storage/volumes/
Please copy requirements.txt to a folder and mount the folder to docker as /tmp path 
Usage example: docker run -it -v /home/admin:/tmp dla-venv:0.1 -p python3 -f /tmp/requirements.txt 
 -p python version, could be python2 or python3
 -f path to requirements.txt, default is /tmp/requirements.txt

常见问题

如果在使用镜像工具生成Virtualenv压缩包时自动打包失败,您可以通过执行如下命令启动Linux Centos 7环境,并以Root权限进入环境内部进行操作。

docker run -ti --entrypoint bash dla-venv:0.1

后续操作请参见自行生成Virtualenv压缩包