This topic describes how to publish DataStream API jobs to clusters and run the jobs in fully managed Flink.

Upload a JAR package

Before you run a DataStream API job, perform the following steps to upload the JAR package, Python job file, or Python dependency file to the console of fully managed Flink.
Note: A maximum of 200 packages can be uploaded, and each package that you upload in the console cannot exceed 200 MB in size. If your JAR package exceeds 200 MB, we recommend that you upload the package in the Object Storage Service (OSS) console. For more information, see How do I upload a JAR package in the Object Storage Service (OSS) console?
  1. Log on to the Realtime Compute for Apache Flink console.
  2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
  3. In the left-side navigation pane, click Artifacts.
  4. In the upper-right corner of the page, click Upload Artifact and select the JAR package that you want to upload.
    If the job is a Python API job, upload the official JAR package of PyFlink. To download the official JAR package of the required version, click PyFlink V1.11 or PyFlink V1.12.
    Note: Ververica Platform (VVP) 2.4.0 adds a dedicated entry point for publishing Python jobs. We recommend that you publish Python jobs from that entry point. For more information, see Overview.

Create a job

  1. Log on to the console of fully managed Flink to create a job.
    1. Log on to the Realtime Compute for Apache Flink console.
    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.
    3. In the left-side navigation pane, click Draft Editor.
    4. Click New.
    5. In the New Draft dialog box, configure the parameters of the job. The following list describes the parameters.
      Name: The name of the job.
        Note: The job name must be unique in the current project.
      Type: Streaming jobs and batch jobs support the following file types:
      • SQL
      • JAR
      • PYTHON
        Note: Batch jobs are supported in Ververica Platform (VVP) 2.4.1 and later and Ververica Runtime (VVR) 3.0.1 and later.
      Deployment Target: The cluster in which the job is deployed. You must select a cluster type before you can select a cluster. The following cluster types are supported:
      • Per-Job Clusters: the default value. This type is suitable for jobs that consume a large number of resources or jobs that run in a continuous and stable manner. Each job requires an independent JobManager, which isolates resources between jobs. As a result, JobManager resource utilization is low for jobs that involve only small amounts of data.
      • Session Clusters: suitable for jobs that consume few resources or jobs that start and stop frequently. Multiple jobs can reuse the same JobManager, which improves JobManager resource utilization.
        Note: If you want to use the SQL preview feature, you must select Session Clusters and turn on Use for SQL Editor previews. For more information, see Debug a job and Configure a session cluster.
      Locate: The folder in which the code file of the job is saved. You can also click the Create Folder icon next to an existing folder to create a subfolder.

    6. Click OK.
  2. On the Draft Editor page, enter the basic configuration information.
    You can enter the following configuration information directly. You can also click YAML in the lower-right corner of the page to modify the configuration in YAML format. The following list describes the parameters. For a sketch of how the Entrypoint Class and Entrypoint main args settings map to code, see the example after this list.
    Deployment Target: The cluster in which the job is deployed. You can change the cluster that you selected when you created the job.
    JAR URI: Select an uploaded file or upload a new file. You can drag the file that you want to upload to this field or click the Upload icon on the right to select the file.
      Note: If the job is a Python API job, upload the official JAR package of PyFlink. For more information about how to download the official JAR package, see PyFlink V1.11 and PyFlink V1.12.
    Entrypoint Class: The entry class of the program. If the JAR package does not specify a main class, enter the fully qualified name of the entry class in this field.
      Note: If the job is a Python API job, set Entrypoint Class to org.apache.flink.client.python.PythonDriver.
    Entrypoint main args: The parameters that are passed to the main method of the entry class.
      Note: If the job is a Python API job, upload the Python job file first. By default, after you upload the Python job file, the file is located in the /flink/usrlib/ directory of the node that runs the job. If the Python job file is named word_count.py, set Entrypoint main args to -py /flink/usrlib/word_count.py. Enter the full path of the Python job file. You cannot omit or change /flink/usrlib/.
    Additional Dependencies: Use one of the following methods to specify dependency files:
    • (Recommended) Select a dependency file that you uploaded. To upload a dependency file, click Upload Artifacts in the upper-left corner of the Artifacts page or click the Update a JAR file icon on the right side of Additional Dependencies on the Advanced tab of the Draft Editor page. The uploaded dependency file is saved in the oss://ossBucketName/artifacts/namespaces/namespaceName/* directory.
    • Enter the OSS bucket in which the required dependency file is stored. You must upload the dependency file in advance to the OSS bucket that you selected when you activated fully managed Flink.
    • Enter the URL of the required dependency file. Only URLs that end with file names are supported, such as http://xxxxxx/file. You must upload the dependency file to a publicly accessible HTTP service in advance.
      Note:
      • Only Per-Job clusters support the Additional Dependencies parameter. The directory of dependency files cannot be configured in jobs for session clusters.
      • When a job is running, the dependency files that you specify by using the preceding methods are loaded to the /flink/usrlib directory of the pods in which the JobManager and TaskManagers run.
      • If the job is a Python API job, select the Python job file and its dependencies in the Additional Dependencies field. For more information about Python dependencies, see Manage Python dependencies. After you upload a Python dependency, the dependency is automatically uploaded to the /flink/usrlib/ directory of the node that runs the job.
    Parallelism: The parallelism of the job, which is the number of parallel instances in which each operator of the job runs.
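
    The following minimal sketch shows how the settings above map to code in a DataStream API job. Everything specific in it is hypothetical: the package and class name com.example.MainArgsDemo, the argument names, and the dictionary.txt dependency file are illustrative, not values that fully managed Flink requires. Flink's ParameterTool is one common way to read the Entrypoint main args.

      package com.example;

      import org.apache.flink.api.java.utils.ParameterTool;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      public class MainArgsDemo {
          public static void main(String[] args) throws Exception {
              // The value of Entrypoint main args arrives here as the regular
              // args array, for example: --input-topic orders --window-seconds 60
              ParameterTool params = ParameterTool.fromArgs(args);
              String inputTopic = params.get("input-topic", "orders");
              int windowSeconds = params.getInt("window-seconds", 60);

              // Files listed under Additional Dependencies are loaded to
              // /flink/usrlib, so a real job could read them as local files, e.g.:
              // java.nio.file.Paths.get("/flink/usrlib/dictionary.txt")

              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              // Placeholder pipeline; a real job would read from a connector.
              env.fromElements(inputTopic + " every " + windowSeconds + "s").print();
              env.execute("main-args-demo");
          }
      }

    If this class were packaged into the uploaded JAR without a main class in its manifest, you would set Entrypoint Class to com.example.MainArgsDemo and Entrypoint main args to, for example, --input-topic orders --window-seconds 60.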
  3. On the right side of the Draft Editor page, click the Advanced tab and configure the parameters based on your business requirements.
    The following list describes the parameters, grouped by section. For a sketch of the programmatic equivalents of the checkpointing and restart strategy settings, see the example after this list.
    Configuration
      Engine Version: Valid values: vvr-4.0.7-flink-1.13 and vvr-3.0.3-flink-1.12.
      Edit Labels: You can specify labels for jobs so that you can easily search for jobs by label.
    Behavior
      Max Job Creation Attempts: The maximum number of retries that are allowed after the instance fails to be created.
      Stop with Drain: If you turn on Stop with Drain, all event time-based windows are triggered when you manually stop a job.
    Flink Configuration
      Checkpointing Interval: The interval at which checkpoints are scheduled. If you leave this parameter empty, the checkpointing feature is disabled.
      Min Time Between Checkpoints: The minimum pause between two checkpoints. If the maximum number of concurrent checkpoints is 1, this parameter specifies the minimum time that must elapse after one checkpoint completes before the next one starts.
      Enabled Unaligned Checkpoints: If you turn on Enabled Unaligned Checkpoints, the running time of a checkpoint is significantly reduced when backpressure exists. However, the state size of a single checkpoint increases.
      Flink Restart Strategy Configuration: The strategy that is used to restart the job when a task fails. If the checkpointing feature is disabled, the job is not restarted by default. If the checkpointing feature is enabled, the job is restarted by default. Valid values:
      • Failure Rate: The job is restarted if the number of failures within the specified time interval exceeds the upper limit.
      • Fixed Delay: The job is restarted at a fixed interval.
      • No Restarts: The job is not restarted. This is the default value when the checkpointing feature is disabled.
      Additional Configuration: Other Flink settings, such as taskmanager.numberOfTaskSlots: 1.
    Resources
      Job Manager CPUs: Default value: 1.
      Job Manager Memory: Minimum value: 1 GiB. We recommend that you use GiB or MiB as the unit. For example, you can set this parameter to 1024 MiB or 1.5 GiB.
      Task Manager CPUs: Default value: 1.
      Task Manager Memory: Minimum value: 1 GiB. We recommend that you use GiB or MiB as the unit. For example, you can set this parameter to 1024 MiB or 1.5 GiB.
    Logging
      Root Log Level: Valid values: TRACE, DEBUG, INFO, WARN, and ERROR.
      Log Levels: Enter the log name and log level.
      Logging Profile: The log template. You can use the system template or configure a custom template.
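
    The fields in the Flink Configuration section correspond to standard Flink checkpointing and restart strategy settings. The following sketch shows their programmatic equivalents in the DataStream API, which can be useful when you test a job locally before you publish it. The specific values (60 seconds, 30 seconds, 3 failures in 5 minutes, 10-second delay) are illustrative assumptions, not recommended settings.

      import java.util.concurrent.TimeUnit;

      import org.apache.flink.api.common.restartstrategy.RestartStrategies;
      import org.apache.flink.api.common.time.Time;
      import org.apache.flink.streaming.api.environment.CheckpointConfig;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

      public class CheckpointConfigDemo {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

              // Checkpointing Interval: schedule a checkpoint every 60 seconds.
              env.enableCheckpointing(60_000L);

              CheckpointConfig checkpointConfig = env.getCheckpointConfig();
              // Min Time Between Checkpoints: at least 30 seconds must elapse after
              // one checkpoint completes before the next one starts.
              checkpointConfig.setMinPauseBetweenCheckpoints(30_000L);
              // Enabled Unaligned Checkpoints: shortens checkpoint duration under
              // backpressure at the cost of larger checkpoint state.
              checkpointConfig.enableUnalignedCheckpoints();

              // Flink Restart Strategy Configuration, Failure Rate: allow at most
              // 3 failures within 5 minutes, with 10 seconds between restart attempts.
              env.setRestartStrategy(RestartStrategies.failureRateRestart(
                      3,
                      Time.of(5, TimeUnit.MINUTES),
                      Time.of(10, TimeUnit.SECONDS)));

              // Placeholder pipeline so that the job can run.
              env.fromElements(1, 2, 3).print();
              env.execute("checkpoint-config-demo");
          }
      }

    When you publish to fully managed Flink, this code is not required: the same behavior comes from the console fields, or from keys such as execution.checkpointing.interval and execution.checkpointing.min-pause in Additional Configuration.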
  4. In the upper-right corner of the Draft Editor page, click Publish.