This topic describes how to use Flink Python in an E-MapReduce (EMR) Dataflow cluster.

Background information

The Flink Python API in an EMR Dataflow cluster supports all features of the Apache Flink Python API. For more information about the Apache Flink Python API, see Python API.

Use Python dependencies

You can manage and use Python dependencies as described in the following sections:

Use a custom Python virtual environment

  • Method 1: Create a Python virtual environment on a node in a Dataflow cluster
    1. On a node of a Dataflow cluster, create the setup-pyflink-virtual-env.sh script. The following code shows the content of the script:
      set -e
      # Create a Python virtual environment. 
      python3.6 -m venv venv
      
      # Activate the Python virtual environment. 
      source venv/bin/activate
      
      # Upgrade pip in the virtual environment. 
      pip install --upgrade pip
      
      # Install Flink Python dependencies. 
      pip install "apache-flink==1.13.0"
      
      # Exit the Python virtual environment. 
      deactivate
    2. Run the following command to execute the setup-pyflink-virtual-env.sh script:
      ./setup-pyflink-virtual-env.sh

      After you run the command, a folder named venv is generated. This folder contains the Python 3.6 virtual environment. You can modify the preceding script to create a virtual environment for another Python version or to install third-party Python packages in the virtual environment.

      To use the created Python virtual environment, distribute it to all nodes in the Dataflow cluster or specify it when you submit a Flink Python job, as shown in the sample commands after this list. For more information, see Command-Line Interface.

  • Method 2: Create a Python virtual environment on an on-premises developer machine
    1. On the on-premises developer machine, create the setup-pyflink-virtual-env.sh script. The following code shows the content of the script:
      set -e
      # Download the miniconda.sh script for Python 3.7. 
      wget "https://repo.continuum.io/miniconda/Miniconda3-py37_4.9.2-Linux-x86_64.sh" -O "miniconda.sh"
      
      # Add execute permission to the miniconda.sh script. 
      chmod +x miniconda.sh
      
      # Create a Python virtual environment. 
      ./miniconda.sh -b -p venv
      
      # Activate the Conda Python virtual environment. 
      source venv/bin/activate ""
      
      # Install Flink Python dependencies. 
      pip install "apache-flink==1.13.0"
      
      # Exit the Conda Python virtual environment. 
      conda deactivate
      
      # Delete cached packages. 
      rm -rf venv/pkgs
      
      # Package the installed Conda Python virtual environment. 
      zip -r venv.zip venv
    2. On the on-premises developer machine, create the build.sh script. The following code shows the content of the script:
      #!/bin/bash
      set -e -x
      yum install -y zip wget
      
      cd /root/
      bash /build/setup-pyflink-virtual-env.sh
      mv venv.zip /build/
    3. On the CLI, run the following command:
      docker run -it --rm -v $PWD:/build  -w /build quay.io/pypa/manylinux2014_x86_64 ./build.sh

      After you run the command, a package named venv.zip is generated. This package contains the Python 3.7 virtual environment. You can modify the preceding script to create a virtual environment for another Python version or to install third-party Python packages in the virtual environment.
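
After you create the virtual environment by using either of the preceding methods, specify the Python interpreter when you submit the Flink Python job. The following commands are a minimal sketch: the job file word_count.py and the paths are placeholders, and the available options may vary with your Flink version. For more information, see Command-Line Interface.

  # Option 1: the venv folder has been distributed to the same path on every node of the Dataflow cluster.
  flink run \
    -py word_count.py \
    -pyexec /path/to/venv/bin/python3

  # Option 2: ship the packaged venv.zip together with the job.
  # The archive is extracted on the workers, so the interpreter path is relative to the archive name.
  flink run \
    -py word_count.py \
    -pyarch venv.zip \
    -pyexec venv.zip/venv/bin/python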

Use a third-party Python package

If the third-party Python package that you use is marked with the zip_safe flag, you can directly use the package in a Python job. For more information, see Python libraries.
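
For example, if the package can also be installed directly by pip, one way to use it is to list it in a requirements.txt file and reference that file when you submit the job. The following commands are a minimal sketch; the job file word_count.py and the pandas package are placeholders:

  # Declare the pip-installable packages that the job needs (placeholder content).
  echo "pandas==1.3.5" > requirements.txt

  # Submit the job. The packages listed in requirements.txt are installed with pip
  # before the Python code runs.
  flink run \
    -py word_count.py \
    -pyreq requirements.txt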

If the third-party Python package that you use is a source code package whose root directory contains a setup.py file, in most cases you must compile the package before you can use it. You can use one of the following methods to compile the third-party Python package:
  • Compile the third-party Python package on a node of the Dataflow cluster.
  • Use a Python environment in the quay.io/pypa/manylinux2014_x86_64 image to compile the third-party Python package. After the package is compiled by using the image, the package is compatible with most Linux operating systems. For more information about the image, see manylinux.
In this example, the third-party Python package opencv-python-headless is used to describe how to compile and use a third-party Python package.
  1. Compile the third-party Python package.
    1. On the on-premises developer machine, create the requirements.txt file. The file contains the following content:
      opencv-python-headless
    2. On the on-premises developer machine, create the build.sh script. The following code shows the content of the script:
      #!/bin/bash
      set -e -x
      yum install -y zip
      
      PYBIN=/opt/python/cp37-cp37m/bin
      
      "${PYBIN}/pip" install --target __pypackages__ -r requirements.txt --no-deps
      cd __pypackages__ && zip -r deps.zip . && mv deps.zip ../ && cd ..
      rm -rf __pypackages__
    3. On the CLI, run the following command:
      docker run -it --rm -v $PWD:/build  -w /build quay.io/pypa/manylinux2014_x86_64 /bin/bash build.sh

      After you run the command, a file named deps.zip is generated. This file is the compiled third-party Python package. You can modify the requirements.txt file to compile other third-party Python packages, and you can specify multiple Python dependencies in the file.

  2. Use the third-party Python package.

    For more information about how to use a third-party Python package in a Flink Python job, see Python libraries.
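
For example, a minimal submission that ships the compiled deps.zip might look like the following sketch; the job file word_count.py is a placeholder:

  # deps.zip is distributed with the job and added to the PYTHONPATH of the Python workers,
  # so the job code can import the packages that it contains, such as cv2 from opencv-python-headless.
  flink run \
    -py word_count.py \
    -pyfs deps.zip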

Use a JAR package

If your Flink Python job uses Java classes, such as a connector or a Java user-defined function (UDF), you must specify the JAR package that contains the connector or the UDF. For more information, see JAR Dependencies.
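
For example, the following command is a minimal sketch that submits a Python job together with a connector JAR; the job file word_count.py and the JAR path are placeholders:

  # Ship the JAR package that contains the connector or the Java classes that the job uses.
  flink run \
    -py word_count.py \
    --jarfile /path/to/connector.jar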

Use a data file

If you need to access a data file such as a model file in a Flink Python job, you can specify an archive file. For more information, see Archives.
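
For example, the following command is a minimal sketch that ships a data file with the job as an archive; the job file word_count.py, the archive data.zip, and the model file inside it are placeholders:

  # data.zip is extracted into the working directory of the Python workers.
  # The "#data" suffix sets the name of the target directory, so the Python code
  # can read files in the archive through relative paths such as data/model.pkl.
  flink run \
    -py word_count.py \
    -pyarch data.zip#data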