This topic describes how to publish a Python job to a fully managed Flink cluster.

Upload resources

Before you can run a Python API job, perform the following steps to upload the Python job file and any Python dependency files in the console of fully managed Flink.

  1. Log on to the Realtime Compute for Apache Flink console.
  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
  3. In the left-side navigation pane, click Artifacts.
  4. Click Upload Artifact. Select the Python job file or Python dependency file that you want to upload.

Publish a job

  1. Log on to the console of fully managed Flink and create a job.
    1. Log on to the Realtime Compute for Apache Flink console.
    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
    3. In the left-side navigation pane, click Draft Editor.
    4. Click New.
    5. In the New Draft dialog box, configure the parameters of the job. The following list describes the parameters.
      • Name: The name of the job that you want to create.
        Note: The job name must be unique in the current project.
      • Type: Streaming jobs and batch jobs support the following file types:
        • SQL
        • JAR
        • PYTHON
        Note: Only Flink that uses VVR 3.0.1 or later supports batch jobs.
      • Deployment Target: The cluster in which the job is deployed.
        Note: Python jobs support only Per-Job clusters.
      • Location: The folder in which the code file of the job is saved. You can click the Create Folder icon to the right of an existing folder to create a subfolder.
    6. Click OK.
  2. On the Draft Editor page, configure basic settings.
    You can directly configure basic settings. You can also click YAML in the lower-right corner of the page to modify the existing settings. The following list describes the parameters.
    • Deployment Target: You can change the cluster that you selected when you created the job to another one.
    • Python Uri: The Uniform Resource Identifier (URI) that is used to access the Python job file that you uploaded. Python job files can be .py files or .zip files.
    • Entry Module: The entry point module of the program. If you select a .py Python job file, you do not need to specify this parameter. If you select a .zip Python job file, you must specify this parameter. For example, you can enter example.word_count in the Entry Module field. For a minimal sketch of an entry module, see the first example after this list.
    • Entrypoint main args: The parameters that are passed to the entry point of the job.
    • Python Libraries: A third-party Python package. The third-party Python package that you uploaded is added to the PYTHONPATH of the Python worker process so that the package can be directly accessed in Python user-defined functions (UDFs), as shown in the second example after this list. For more information about how to use third-party Python packages, see Use a third-party Python package.
    • Python Archives: Archive files. Only files in the ZIP format are supported, such as .zip, .jar, .whl, and .egg files.
      Archive files are decompressed to the working directory of the Python worker process. For example, if the archive file that you upload is named mydata.zip, you can write the following code in a Python UDF to access the data in the mydata.zip archive file.
      def map():
          # mydata.zip is decompressed to the working directory of the Python worker process.
          with open("mydata.zip/mydata/data.txt") as f:
              ...

      For more information about Python Archives, see Use a custom Python virtual environment and Use data files.

    • Additional Dependencies: The JAR packages or data files that you want to upload. The selected files are automatically uploaded to the /flink/usrlib/ directory of the node on which the job runs. For a minimal sketch that reads such a file, see the third example after this list.
      Note: Session clusters do not support the Additional Dependencies parameter. Only Per-Job clusters support this parameter.
    • Parallelism: The parallelism of the job, which specifies the number of parallel subtasks that run for each operator.
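    The Entry Module value example.word_count in the preceding list assumes that the uploaded .zip file contains a package named example with a word_count module. The following code is a minimal sketch of what such an entry module might look like; the package layout, the --greeting parameter that is passed through Entrypoint main args, and the in-memory test data are assumptions that are used only for illustration.

    # word_count.py in a package named example, packaged as example.zip.
    # With this layout, Entry Module is example.word_count.
    import argparse
    import sys

    from pyflink.table import EnvironmentSettings, TableEnvironment


    def main(args):
        # Parse a hypothetical --greeting parameter passed through Entrypoint main args.
        parser = argparse.ArgumentParser()
        parser.add_argument("--greeting", default="hello")
        known_args, _ = parser.parse_known_args(args)

        t_env = TableEnvironment.create(
            EnvironmentSettings.new_instance().in_streaming_mode().build()
        )

        # Register a small in-memory table and count the words with SQL.
        words = t_env.from_elements(
            [(known_args.greeting,), ("flink",), (known_args.greeting,)], ["word"]
        )
        t_env.create_temporary_view("words", words)
        t_env.execute_sql("SELECT word, COUNT(*) AS cnt FROM words GROUP BY word").print()


    if __name__ == "__main__":
        main(sys.argv[1:])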
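    The following code is a minimal sketch of a Python UDF that imports a third-party package uploaded through Python Libraries. The use of numpy and the function name mean_of are assumptions for illustration; any package that is added to the PYTHONPATH of the Python worker process can be imported in the same way.

    from pyflink.table import DataTypes
    from pyflink.table.udf import udf


    @udf(result_type=DataTypes.FLOAT())
    def mean_of(a, b):
        # The import succeeds because packages uploaded as Python Libraries are
        # added to the PYTHONPATH of the Python worker process.
        import numpy as np
        return float(np.mean([a, b]))

    # In the job, the UDF can be registered with
    # t_env.create_temporary_function("mean_of", mean_of) and then used in SQL.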
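    The following code is a minimal sketch that reads a data file uploaded through Additional Dependencies. The file name config.json is hypothetical; /flink/usrlib/ is the directory to which the selected files are uploaded.

    import json


    def load_job_config():
        # Files that are selected as Additional Dependencies are placed in the
        # /flink/usrlib/ directory of the node on which the job runs.
        with open("/flink/usrlib/config.json") as f:
            return json.load(f)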
  3. On the right side of the Draft Editor page, click the Advanced tab and configure the parameters based on your business requirements.
    The following list describes the parameters by section.
    Basic Configuration
    • Deployment Target: You can change the cluster that you selected when you created the job to another one.
    • Additional Dependencies: If you want to add more dependency files, select a file or enter a valid file address in this field.
      Note: Session clusters do not support the Additional Dependencies parameter. Only Per-Job clusters support this parameter.
    Configuration
    • Engine Version: Valid values: vvr-4.0.7-flink-1.13 and vvr-3.0.3-flink-1.12.
    • Edit Labels: You can specify labels for jobs. This way, you can easily search for jobs by using the labels.
    Behavior
    • Max Job Creation Attempts: The maximum number of retries that are allowed if the job instance fails to be created.
    • Stop with Drain: If you turn on Stop with Drain, all event time-based windows are triggered when you manually stop the job.
    Flink Configuration
    • Checkpointing Interval: The interval at which checkpoints are scheduled. If you do not configure this parameter, the checkpointing feature is disabled.
    • Min Time Between Checkpoints: The minimum interval between two checkpoints. If the maximum number of concurrent checkpoints is 1, this parameter specifies the minimum pause between the end of one checkpoint and the start of the next one.
    • Enabled Unaligned Checkpoints: The Unaligned Checkpoints feature reduces the time required to complete checkpoints when backpressure occurs. However, this feature increases the state size of a single checkpoint.
    • Flink Restart Policy: The policy that is used to restart the job if a task fails. If the checkpointing feature is disabled, the job is not restarted. If the checkpointing feature is enabled, the job is restarted based on the policy that you select. Valid values:
      • Failure Rate: The job is restarted after a failure as long as the number of failures within the specified interval does not exceed the upper limit.
      • Fixed Delay: The job is restarted at a fixed interval.
      • No Restarts: The job is not restarted. This is the default value.
    • Additional Configuration: Other Flink settings, such as taskmanager.numberOfTaskSlots: 1.
    Logging
    • Log Archiving: By default, Allow Log Archives is turned on and the Log Archive Expires parameter is set to 7 days. After Allow Log Archives is turned on in the Logging section, you can view the logs of a historical job instance on the Logs tab. For more information, see View the logs of a historical job instance.
      Note
      • In VVR 3.X, only VVR 3.0.7 and later minor versions allow you to turn on Allow Log Archives in the Logging section of the Advanced tab for a job.
      • In VVR 4.X, only VVR 4.0.11 and later minor versions allow you to turn on Allow Log Archives in the Logging section of the Advanced tab for a job.
    • Root Log Level: The log level of the root logger. The following levels are supported and are listed in ascending order of severity:
      1. TRACE: records finer-grained information than DEBUG logs.
      2. DEBUG: records the status of the system.
      3. INFO: records important system information.
      4. WARN: records information about potential issues.
      5. ERROR: records information about errors and exceptions that occur.
    • Log Levels: Enter the log name and the log level.
    • Logging Profile: The log template. You can use the system template or configure a custom template.
  4. In the upper-right corner of the Draft Editor page, click Publish.