全部產品
Search
文件中心

E-MapReduce:提交Flink作業

更新時間:May 07, 2026

本文為您介紹如何提交Flink作業。

前提條件

已在E-MapReduce on ACK控制台建立Flink叢集,詳情請參見快速入門

注意事項

在本文的樣本中,JAR檔案已經直接打包在了鏡像中。如果您使用的是自己的JAR包,您可以將其上傳到阿里雲OSS。上傳操作請參見簡單上傳

此時,需要您修改命令中的local:///opt/flink/examples/streaming/StateMachineExample.jar為您OSS上存放JAR包的真實路徑,路徑格式為oss://<yourBucketName>/<path>.jar

方式一:通過ACK控制台提交作業

  1. 登入EMR on ACK

  2. 在EMR on ACK頁面,單擊目的地組群所在行所属 ACK 集群列的連結。

  3. 容器組頁面,單擊右上方的使用YAML建立資源

  4. 创建頁面,從樣本模板列表中,選擇自定义,模板內容請複製以下內容,然後單擊创建

    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: basic-emr-example
    spec:
      flinkVersion: v1_13
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "2"
        state.savepoints.dir: file:///flink-data/flink-savepoints
        state.checkpoints.dir: file:///flink-data/flink-checkpoints
      serviceAccount: flink
      podTemplate:
        spec:
          serviceAccount: flink
          containers:
            - name: flink-main-container
              volumeMounts:
                - mountPath: /flink-data
                  name: flink-volume
          volumes:
            - name: flink-volume
              emptyDir: {}
    
      jobManager:
        replicas: 1
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
    
      job:
        jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
        parallelism: 2
        upgradeMode: stateless
    說明

    本文以Flink 1.13版本為例,其他版本時請修改flinkVersion的配置,具體版本以控制台為準。

方式二:通過kubectl工具提交作業

  1. 通過kubectl串連Kubernetes叢集,詳情請參見擷取叢集KubeConfig並通過kubectl工具串連叢集

    您也可以通過API等方式串連Kubernetes叢集,詳情請參見使用Kubernetes API

  2. 建立basic-emr-example.yaml檔案,檔案內容如下。

    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: basic-emr-example
    spec:
      flinkVersion: v1_13
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "2"
        state.savepoints.dir: file:///flink-data/flink-savepoints
        state.checkpoints.dir: file:///flink-data/flink-checkpoints
      serviceAccount: flink
      podTemplate:
        spec:
          serviceAccount: flink
          containers:
            - name: flink-main-container
              volumeMounts:
                - mountPath: /flink-data
                  name: flink-volume
          volumes:
            - name: flink-volume
              emptyDir: {}
    
      jobManager:
        replicas: 1
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
    
      job:
        jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
        parallelism: 2
        upgradeMode: stateless
    說明
    • 檔案名稱您可以自訂,本文以basic-emr-example.yaml為例介紹。

    • 本文以Flink 1.13版本為例,其他版本時請修改flinkVersion的配置。

  3. 執行以下命令,提交作業。

    kubectl apply -f basic-emr-example.yaml -namespace <叢集對應的namespace>
    說明

    本文範例程式碼中的<叢集對應的namespace>,需要替換為叢集的命名空間,您可以登入E-MapReduce on ACK控制台,在叢集管理頁面查看。