All Products
Search
Document Center

E-MapReduce:Submit a Flink job

Last Updated:May 17, 2023

This topic describes how to submit a Flink job.

Prerequisites

A Flink cluster is created on the EMR on ACK page of the new Alibaba Cloud E-MapReduce (EMR) console. For more information, see Getting started.

Method 1: Use the ACK console

  1. Log on to the EMR on ACK console.

  2. On the EMR on ACK page, find the specified EMR cluster and click the link in the ACK Cluster column.

  3. In the upper-right corner of the Pods page, click Create from YAML.

  4. On the Create page, select Custom from the Sample Template drop-down list, add the following code to the editor, and then click Create.

    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: basic-emr-example
    spec:
      flinkVersion: v1_13
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "2"
        state.savepoints.dir: file:///flink-data/flink-savepoints
        state.checkpoints.dir: file:///flink-data/flink-checkpoints
      serviceAccount: flink
      podTemplate:
        spec:
          serviceAccount: flink
          containers:
            - name: flink-main-container
              volumeMounts:
                - mountPath: /flink-data
                  name: flink-volume
          volumes:
            - name: flink-volume
              emptyDir: {}
    
      jobManager:
        replicas: 1
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
    
      job:
        jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
        parallelism: 2
        upgradeMode: stateless
    Note

    In this example, Flink 1.13 is used. If you use Flink of another version, configure the flinkVersion parameter based on the version information in the EMR console.

Method 2: Use kubectl

  1. Connect to an Alibaba Cloud Container Service for Kubernetes (ACK) cluster by using kubectl. For more information, see Obtain the kubeconfig file of a cluster and use kubectl to connect to the cluster.

    You can also connect to the ACK cluster by calling an API operation. For more information, see Use the Kubernetes API.

  2. Create a file named basic-emr-example.yaml. The file contains the following information:

    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: basic-emr-example
    spec:
      flinkVersion: v1_13
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "2"
        state.savepoints.dir: file:///flink-data/flink-savepoints
        state.checkpoints.dir: file:///flink-data/flink-checkpoints
      serviceAccount: flink
      podTemplate:
        spec:
          serviceAccount: flink
          containers:
            - name: flink-main-container
              volumeMounts:
                - mountPath: /flink-data
                  name: flink-volume
          volumes:
            - name: flink-volume
              emptyDir: {}
    
      jobManager:
        replicas: 1
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
    
      job:
        jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
        parallelism: 2
        upgradeMode: stateless
    Note
    • You can change the name of the file. In this example, basic-emr-example.yaml is used.

    • In this example, Flink 1.13 is used. If you use another version of Flink, configure the flinkVersion parameter based on your business requirements.

  3. Run the following command to submit the Flink job:

    kubectl apply -f basic-emr-example.yaml -namespace <Namespace in which the cluster resides>
    Note

    Replace <Namespace in which the cluster resides> with the namespace based on your business requirements. To view the namespace, log on to the EMR console and go to the Cluster Details tab.