This topic describes how to publish a Python job to a fully managed Flink cluster.

Upload resources

Before you can run a Python API job, perform the following steps to upload the Python job file and any Python dependency files in the console of fully managed Flink. A minimal sketch of a Python job file appears after these steps.

  1. Log on to the Realtime Compute for Apache Flink console.
  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
  3. In the left-side navigation pane, click Artifacts.
  4. Click Upload Artifact. Select the Python job file or Python dependency file that you want to upload.
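
The following is a minimal sketch of a Python job file that could be uploaded as an artifact. It uses the PyFlink Table API; the table names and the datagen and print connectors are illustrative placeholders that keep the sketch self-contained, so replace them with your own sources and sinks.

  # word_count.py: a minimal PyFlink Table API job (illustrative names only).
  from pyflink.table import EnvironmentSettings, TableEnvironment

  # Create a streaming TableEnvironment.
  env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
  t_env = TableEnvironment.create(env_settings)

  # Declare a sample source and a sample sink. The datagen and print connectors
  # are used only so that the sketch runs without external systems.
  t_env.execute_sql("""
      CREATE TABLE source_table (
          word STRING
      ) WITH (
          'connector' = 'datagen',
          'fields.word.length' = '5'
      )
  """)
  t_env.execute_sql("""
      CREATE TABLE sink_table (
          word STRING,
          cnt BIGINT
      ) WITH (
          'connector' = 'print'
      )
  """)

  # Count occurrences of each generated word and write the result to the sink.
  t_env.execute_sql(
      "INSERT INTO sink_table SELECT word, COUNT(*) AS cnt FROM source_table GROUP BY word"
  )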

Publish a job

  1. Log on to the console of fully managed Flink and create a job.
    1. Log on to the Realtime Compute for Apache Flink console.
    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
    3. In the left-side navigation pane, click Draft Editor.
    4. Click New.
    5. In the New Draft dialog box, configure the parameters of the job. The following list describes the parameters.
      • Name: The name of the job that you want to create.
        Note: The job name must be unique in the current project.
      • Type: The type of the job. Streaming jobs and batch jobs support the SQL, JAR, and PYTHON file types. For a Python job, set this parameter to STREAM / PYTHON.
        Note: Only Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 3.0.1 or later supports batch jobs.
      • Deployment Target: The cluster in which the job is deployed.
        Note: Python jobs support only Per-Job clusters.
      • Location: The folder in which the code file of the job is saved. You can click the Create Folder icon to the right of an existing folder to create a subfolder.

    6. Click OK.
  2. On the Draft Editor page, configure basic settings.
    You can directly configure basic settings. You can also click YAML in the lower-right corner of the page to modify the existing settings. The following list describes the parameters.
    • Deployment Target: The cluster in which the job is deployed. You can change the cluster that you selected when you created the job to another cluster.
    • Python Uri: The Uniform Resource Identifier (URI) that is used to access the Python job file. Python job files can be .py files or .zip files.
    • Entry Module: The entry point module of the program. If you select a .py Python job file, you do not need to specify this parameter. If you select a .zip Python job file, you must specify this parameter. For example, you can enter example.word_count in the Entry Module field.
    • Entry Point Main Arguments: The parameters that you want to pass to the job.
    • Python Libraries: Third-party Python packages. The packages that you upload are added to the PYTHONPATH of the Python worker process so that they can be directly imported in Python user-defined functions (UDFs). For more information about how to use third-party Python packages, see Use a third-party Python package. A minimal UDF sketch that imports such a package appears after this list.
    • Python Archives: Archive files. Only ZIP-format files, such as .zip, .jar, .whl, and .egg files, are supported.
      Archive files are decompressed to the working directory of the Python worker process. For example, if the archive file is named mydata.zip, the following code can be used in a Python UDF to access data in the mydata.zip archive file.
    def map():
        # mydata.zip is decompressed to the working directory of the Python worker process.
        with open("mydata.zip/mydata/data.txt") as f:
            ...

    For more information about Python Archives, see Use a custom Python virtual environment and Use data files.

    • Additional Dependencies: The JAR packages or data files that you want to upload. The selected files are automatically uploaded to the /flink/usrlib/ directory of the node on which the job runs.
      Note: Only Per-Job clusters support the Additional Dependencies parameter. Session clusters do not support this parameter.
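
    As mentioned for the Python Libraries parameter, the following is a minimal sketch of a Python UDF that imports a third-party package uploaded through that parameter. The package name some_third_party_pkg is a hypothetical placeholder for the package that you actually upload.

    from pyflink.table import DataTypes
    from pyflink.table.udf import udf

    @udf(result_type=DataTypes.STRING())
    def normalize(line):
        # Packages uploaded through Python Libraries are added to the PYTHONPATH
        # of the Python worker process, so they can be imported directly here.
        import some_third_party_pkg  # hypothetical third-party package
        return some_third_party_pkg.normalize(line)

    You can then register the function in the job file, for example with t_env.create_temporary_function("normalize", normalize), and call it from the SQL statements of the job.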
  3. On the right side of the Draft Editor page, click the Advanced tab and configure the parameters based on your business requirements.
    The following list describes the parameters, grouped by section.
    Configuration section:
    • Engine Version: Valid values: vvr-4.0.7-flink-1.13 and vvr-3.0.3-flink-1.12.
    • Edit Labels: You can specify labels for jobs. This way, you can easily search for jobs by using the labels.
    Flink Configuration section:
    • Checkpointing Interval: The interval at which checkpoints are scheduled. If you do not configure this parameter, the checkpointing feature is disabled.
    • Min Time Between Checkpoints: The minimum interval between two checkpoints. If the maximum parallelism of checkpoints is 1, this parameter specifies the minimum interval between two consecutive checkpoints.
    • Enabled Unaligned Checkpoints: The unaligned checkpoints feature reduces the time required to complete checkpoints when backpressure occurs. However, this feature increases the state size of a single checkpoint.
    • Flink Restart Strategy: The strategy that is used to restart the JobManager when a task fails. If the checkpointing feature is disabled, the JobManager is not restarted. If the checkpointing feature is enabled, the JobManager is restarted. Valid values:
      • Failure Rate: The JobManager is restarted if the number of failures within the specified interval exceeds the upper limit.

        If you select Failure Rate from the Flink Restart Strategy drop-down list, you must configure the Failure Rate Interval, Max Failures per Interval, and Delay Between Restart Attempts parameters.

      • Fixed Delay: The JobManager is restarted at a fixed interval.

        If you select Fixed Delay from the Flink Restart Strategy drop-down list, you must configure the Number of Restart Attempts and Delay Between Restart Attempts parameters.

      • No Restarts: The JobManager is not restarted. This is the default value.
    • Additional Configuration: Other Flink settings, such as taskmanager.numberOfTaskSlots: 1. A short example appears after this list.
    Logging section:
    • Allow Log Archives: By default, Allow Log Archives is turned on. After Allow Log Archives is turned on, you can view the logs of a historical job instance on the Logs tab. For more information, see View the logs of a historical job instance.
      Note:
      • In VVR 3.X, only VVR 3.0.7 and later minor versions allow you to turn on Allow Log Archives in the Logging section of the Advanced tab for a job.
      • In VVR 4.X, only VVR 4.0.11 and later minor versions allow you to turn on Allow Log Archives in the Logging section of the Advanced tab for a job.
    • Log Archives Expires (Unit: day): By default, archived log files are retained for seven days.
    • Root Log Level: You can specify the following log levels. The levels are listed in ascending order of urgency.
      1. TRACE: records finer-grained information than DEBUG logs.
      2. DEBUG: records the status of the system.
      3. INFO: records important system information.
      4. WARN: records information about potential issues.
      5. ERROR: records information about errors and exceptions that occur.
    • Log Levels: Enter the log name and log level.
    • Logging Profile: The log template. You can use the system template or configure a custom template.
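
    For reference, the Additional Configuration field accepts Flink configuration entries in the key: value format, one entry per line. The following lines are only a sketch: the keys are standard Flink configuration options, and the values are examples that you must adjust to your own job.

    taskmanager.numberOfTaskSlots: 2
    execution.checkpointing.timeout: 10min
    table.exec.state.ttl: 36 h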
  4. In the upper-right corner of the Draft Editor page, click Publish.