
E-MapReduce: Use Elastic Container Instance to elastically schedule Flink jobs

Last Updated: May 17, 2023

Alibaba Cloud E-MapReduce (EMR) allows you to elastically schedule Flink jobs by using Elastic Container Instance. This way, you can create pods as required without being limited by the computing capacity of your Container Service for Kubernetes (ACK) cluster, which can reduce computing costs. This topic describes how to use Elastic Container Instance to elastically schedule Flink jobs.

Background information

To use more advanced features of Elastic Container Instance, you can add more pod annotations to configure parameters as required. For more information, see Pod annotations.
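
Besides the flinkConfiguration keys used later in this topic, annotations can also be placed on the pods directly through the podTemplate field of a FlinkDeployment, which the samples in this topic already use for Prometheus annotations. The following fragment is a minimal sketch, not a complete specification: it assumes that the Flink Kubernetes Operator applies spec.podTemplate to both the JobManager and TaskManager pods, and the commented-out key is a hypothetical placeholder for whichever annotation from Pod annotations you need. Kubernetes annotation values must be strings, so "true" is quoted.

```yaml
spec:
  podTemplate:
    metadata:
      annotations:
        # Annotation taken from this topic: enables the image cache feature of Elastic Container Instance.
        k8s.aliyun.com/eci-image-cache: "true"
        # Hypothetical placeholder: replace with an annotation listed in Pod annotations.
        # <annotation-key>: "<value>"
```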

Prerequisites

Procedure

  1. Install the virtual nodes that Elastic Container Instance requires in your ACK cluster. For more information, see Step 1: Deploy ack-virtual-node in ACK clusters.
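
    To confirm that the virtual node can run pods before you submit Flink jobs, you can create a short-lived test pod that carries the same alibabacloud.com/eci label that Method 1 uses. This is a sketch under assumptions: the pod name eci-smoke-test is hypothetical, and the busybox image must be pullable from your VPC (for example, through a NAT gateway or from an image registry that the pod can reach).

    ```yaml
    apiVersion: v1
    kind: Pod
    metadata:
      name: eci-smoke-test   # Hypothetical name for a throwaway test pod.
      labels:
        # Same label as Method 1; label values must be strings, so "true" is quoted.
        alibabacloud.com/eci: "true"
    spec:
      restartPolicy: Never
      containers:
        - name: busybox
          image: busybox:1.36
          command: ["sh", "-c", "echo running on Elastic Container Instance && sleep 30"]
    ```

    If scheduling works, kubectl get pod eci-smoke-test -o wide should show the virtual node in the NODE column. Delete the test pod after the check.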

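  2. Configure Elastic Container Instance scheduling for your Flink jobs. When you submit a Flink job to a Flink cluster that is created on the EMR on ACK page, you can configure a pod label (Method 1) or a pod annotation (Method 2) to schedule the job to Elastic Container Instance. You can also configure the cluster so that all newly submitted Flink jobs are scheduled to Elastic Container Instance by default (Method 3).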

    For more information, see Submit a Flink job.

    Note

    In this example, Flink 1.13 for EMR-3.41.0-ack is used. If you use another Flink version, modify the flinkVersion parameter accordingly. For more information about the Flink parameters, see Flink Kubernetes Operator.
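
    In the samples of this topic, flinkVersion is the only field that a version change touches. The fragment below assumes a hypothetical cluster that runs Flink 1.15; the value follows the operator's v1_<minor> naming and must match the Flink version of your cluster.

    ```yaml
    spec:
      # Hypothetical: a cluster that runs Flink 1.15 instead of Flink 1.13.
      flinkVersion: v1_15
    ```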

    • Method 1: Configure a pod label

      Set the alibabacloud.com/eci pod label to true to schedule the labeled pods to Elastic Container Instance. Sample code:

      
      apiVersion: flink.apache.org/v1beta1
      kind: FlinkDeployment
      metadata:
        name: basic-emr-example
      spec:
        flinkVersion: v1_13
        flinkConfiguration:
          state.savepoints.dir: file:///flink-data/flink-savepoints
          state.checkpoints.dir: file:///flink-data/flink-checkpoints
          metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
          # Configure pod labels so that the JobManager and TaskManager pods of Flink are scheduled to Elastic Container Instance.
          kubernetes.jobmanager.labels: alibabacloud.com/eci:true
          kubernetes.taskmanager.labels: alibabacloud.com/eci:true
          # Optional. Enable the image cache feature of Elastic Container Instance to accelerate pod creation.
          kubernetes.jobmanager.annotations: k8s.aliyun.com/eci-image-cache:true
          kubernetes.taskmanager.annotations: k8s.aliyun.com/eci-image-cache:true
        serviceAccount: flink
        podTemplate:
          metadata:
            annotations:
              prometheus.io/path: /metrics
              prometheus.io/port: "9249"
              prometheus.io/scrape: "true"
          spec:
            serviceAccount: flink
            containers:
              - name: flink-main-container
                volumeMounts:
                  - mountPath: /flink-data
                    name: flink-volume
                ports:
                  - containerPort: 9249
                    name: metrics
                    protocol: TCP
            volumes:
              - name: flink-volume
                emptyDir: {}
      
        jobManager:
          replicas: 1
          resource:
            memory: "2048m"
            cpu: 1
        taskManager:
          resource:
            memory: "2048m"
            cpu: 1
      
        job:
          jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
          parallelism: 2
          upgradeMode: stateless
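
      The flinkConfiguration keys in the sample label every JobManager pod and every TaskManager pod. As an alternative sketch that is not taken from the EMR documentation, the Flink Kubernetes Operator also accepts per-component pod templates, so the label can be set only on TaskManager pods while the JobManager stays on regular nodes. This assumes that you remove kubernetes.jobmanager.labels and kubernetes.taskmanager.labels from flinkConfiguration and merge the fragment into the spec of the sample.

      ```yaml
      spec:
        taskManager:
          resource:
            memory: "2048m"
            cpu: 1
          # Per-component pod template; the operator merges it with spec.podTemplate.
          podTemplate:
            metadata:
              labels:
                # Only TaskManager pods carry the label, so only they are scheduled to Elastic Container Instance.
                alibabacloud.com/eci: "true"
      ```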
      
                                          
    • Method 2: Configure a pod annotation

      Set the alibabacloud.com/burst-resource pod annotation to schedule specific pods to Elastic Container Instance. Valid values of the alibabacloud.com/burst-resource annotation:

      • eci: Elastic Container Instance is used when resources on the regular nodes of a cluster are insufficient.

      • eci_only: Only Elastic Container Instance is used.
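
      The annotation can also be applied per component. The following fragment is a sketch that is not part of the EMR sample: it keeps the JobManager on regular nodes by omitting the annotation for it, and lets TaskManager pods fall back to Elastic Container Instance only when regular nodes run out of resources. It assumes that these lines replace the kubernetes.jobmanager.annotations and kubernetes.taskmanager.annotations entries in the flinkConfiguration of the following sample.

      ```yaml
      spec:
        flinkConfiguration:
          # No kubernetes.jobmanager.annotations entry: the JobManager is scheduled to regular nodes as usual.
          # TaskManagers use Elastic Container Instance only when regular nodes lack resources (eci).
          # Multiple annotations share one comma-separated value because a YAML key can appear only once.
          kubernetes.taskmanager.annotations: alibabacloud.com/burst-resource:eci,k8s.aliyun.com/eci-image-cache:true
      ```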

      Sample code:

      
      apiVersion: flink.apache.org/v1beta1
      kind: FlinkDeployment
      metadata:
        name: basic-emr-example
      spec:
        flinkVersion: v1_13
        flinkConfiguration:
          state.savepoints.dir: file:///flink-data/flink-savepoints
          state.checkpoints.dir: file:///flink-data/flink-checkpoints
          metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
          # Configure pod annotations so that the JobManager and TaskManager pods run only on Elastic Container Instance (eci_only).
          # To use Elastic Container Instance only when regular nodes lack resources, set the value to eci instead.
          # Optional: k8s.aliyun.com/eci-image-cache:true enables the image cache feature of Elastic Container Instance.
          # A YAML key can appear only once, so multiple annotations are combined into one comma-separated value.
          kubernetes.jobmanager.annotations: alibabacloud.com/burst-resource:eci_only,k8s.aliyun.com/eci-image-cache:true
          kubernetes.taskmanager.annotations: alibabacloud.com/burst-resource:eci_only,k8s.aliyun.com/eci-image-cache:true
        serviceAccount: flink
        podTemplate:
          metadata:
            annotations:
              prometheus.io/path: /metrics
              prometheus.io/port: "9249"
              prometheus.io/scrape: "true"
          spec:
            serviceAccount: flink
            containers:
              - name: flink-main-container
                volumeMounts:
                  - mountPath: /flink-data
                    name: flink-volume
                ports:
                  - containerPort: 9249
                    name: metrics
                    protocol: TCP
            volumes:
              - name: flink-volume
                emptyDir: {}
      
        jobManager:
          replicas: 1
          resource:
            memory: "2048m"
            cpu: 1
        taskManager:
          resource:
            memory: "2048m"
            cpu: 1
      
        job:
          jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
          parallelism: 2
          upgradeMode: stateless

    • Method 3: Schedule Flink jobs to Elastic Container Instance by default

      1. Go to the flink-conf.yaml tab.

        1. Log on to the EMR on ACK console.

        2. On the EMR on ACK page, find the Flink cluster that you want to configure and click Configure in the Actions column.

        3. On the Configure tab, click the flink-conf.yaml tab.

      2. Configure the Flink cluster to schedule pods to Elastic Container Instance.

        1. On the flink-conf.yaml tab, click Add Configuration Item.

        2. In the Add Configuration Item dialog box, add the following configuration items:

          • kubernetes.jobmanager.annotations: set the value to alibabacloud.com/burst-resource:eci_only,k8s.aliyun.com/eci-image-cache:true

          • kubernetes.taskmanager.annotations: set the value to alibabacloud.com/burst-resource:eci_only,k8s.aliyun.com/eci-image-cache:true

        3. Click OK.

        4. In the Save dialog box, configure the Execution Reason parameter and then click Save.

      3. Deploy the configuration items.

        1. In the lower part of the Configure tab, click Deploy Client Configuration.

        2. In the dialog box that appears, configure the Execution Reason parameter and click OK.

        3. In the Confirm message, click OK.

          Wait for approximately 30 minutes until the configurations take effect. The modified configurations are applied to newly submitted Flink jobs.
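
      For reference, the two configuration items added in step 2 correspond to the following flink-conf.yaml entries, written in the same comma-separated key:value format that the kubernetes.*.annotations options use in Methods 1 and 2. As an assumption based on how the Flink Kubernetes Operator layers configuration, a job that sets the same keys in its own flinkConfiguration is expected to override these defaults; verify this behavior on your cluster.

      ```yaml
      # flink-conf.yaml: default annotations for newly submitted Flink jobs.
      # eci_only schedules JobManager and TaskManager pods only to Elastic Container Instance;
      # eci-image-cache:true enables the image cache feature of Elastic Container Instance.
      kubernetes.jobmanager.annotations: alibabacloud.com/burst-resource:eci_only,k8s.aliyun.com/eci-image-cache:true
      kubernetes.taskmanager.annotations: alibabacloud.com/burst-resource:eci_only,k8s.aliyun.com/eci-image-cache:true
      ```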