Alibaba Cloud E-MapReduce (EMR) allows you to elastically schedule Flink jobs by using Elastic Container Instance. This way, you can create pods on demand without being limited by the computing capacity of your Container Service for Kubernetes (ACK) cluster, which effectively reduces computing costs. This topic describes how to use Elastic Container Instance to elastically schedule Flink jobs.
Background information
To use more advanced features of Elastic Container Instance, you can add more pod annotations to configure parameters as required. For more information, see Pod annotations.
Prerequisites
A Flink cluster is created on the EMR on ACK page of the new EMR console. For more information, see Getting started.
Elastic Container Instance is activated. For more information, see Getting started with Elastic Container Instance.
Procedure
Install the virtual nodes required by Elastic Container Instance in the ACK cluster. For more information, see Deploy ack-virtual-node in ACK clusters.
When you submit a Flink job in a Flink cluster created on the EMR on ACK page, you can configure a pod label or a pod annotation to use Elastic Container Instance to schedule the Flink job.
For more information, see Submit a Flink job.
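For instance, a job that uses the ECI pod label (see Method 1) can be submitted with kubectl. The following sketch writes a trimmed-down FlinkDeployment manifest to a file, checks that the ECI label is present, and shows the submit command. The file name flink-eci-example.yaml is illustrative, and the final kubectl command assumes access to the ACK cluster:

```shell
# Write a minimal FlinkDeployment manifest (hypothetical file name) that
# requests Elastic Container Instance scheduling via the pod label.
cat > flink-eci-example.yaml <<'EOF'
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-emr-example
spec:
  flinkVersion: v1_13
  flinkConfiguration:
    kubernetes.jobmanager.labels: alibabacloud.com/eci:true
    kubernetes.taskmanager.labels: alibabacloud.com/eci:true
  serviceAccount: flink
EOF

# Confirm the ECI scheduling label is present before submitting.
grep -c 'alibabacloud.com/eci:true' flink-eci-example.yaml

# Submit the job (requires kubectl access to the ACK cluster):
# kubectl apply -f flink-eci-example.yaml
```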
Note: In this example, Flink 1.13 for EMR-3.41.0-ack is used. If you use Flink of another version, modify the flinkVersion parameter. For more information about the parameters of Flink, see Flink Kubernetes Operator.
Method 1: Configure a pod label
Set the alibabacloud.com/eci label to true to schedule the specified pods to Elastic Container Instance. Sample code:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-emr-example
spec:
  flinkVersion: v1_13
  flinkConfiguration:
    state.savepoints.dir: file:///flink-data/flink-savepoints
    state.checkpoints.dir: file:///flink-data/flink-checkpoints
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    # Configure pod labels to schedule the JobManager and TaskManager pods of Flink to Elastic Container Instance.
    kubernetes.jobmanager.labels: alibabacloud.com/eci:true
    kubernetes.taskmanager.labels: alibabacloud.com/eci:true
    # Optional. Enable the image cache feature of Elastic Container Instance to improve performance.
    kubernetes.jobmanager.annotations: k8s.aliyun.com/eci-image-cache:true
    kubernetes.taskmanager.annotations: k8s.aliyun.com/eci-image-cache:true
  serviceAccount: flink
  podTemplate:
    metadata:
      annotations:
        prometheus.io/path: /metrics
        prometheus.io/port: "9249"
        prometheus.io/scrape: "true"
    spec:
      serviceAccount: flink
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /flink-data
              name: flink-volume
          ports:
            - containerPort: 9249
              name: metrics
              protocol: TCP
      volumes:
        - name: flink-volume
          emptyDir: {}
  jobManager:
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless
Method 2: Configure a pod annotation
Set the alibabacloud.com/burst-resource annotation to specify when pods are scheduled to Elastic Container Instance. Valid values of the alibabacloud.com/burst-resource parameter:
eci: Elastic Container Instance is used when resources on the regular nodes of a cluster are insufficient.
eci_only: Only Elastic Container Instance is used.
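As a minimal illustration, the annotation would appear as follows in a pod template; the value you choose controls the fallback behavior described above (this fragment is a sketch, not part of the Flink sample below):

```yaml
# Hypothetical pod-template fragment illustrating the annotation.
metadata:
  annotations:
    # eci: use Elastic Container Instance only when regular nodes are full.
    # eci_only: always schedule this pod to Elastic Container Instance.
    alibabacloud.com/burst-resource: eci
```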
Sample code:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-emr-example
spec:
  flinkVersion: v1_13
  flinkConfiguration:
    state.savepoints.dir: file:///flink-data/flink-savepoints
    state.checkpoints.dir: file:///flink-data/flink-checkpoints
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    # Configure pod annotations to schedule the JobManager and TaskManager pods only to
    # Elastic Container Instance (eci_only) and, optionally, to enable the image cache
    # feature of Elastic Container Instance to improve performance.
    kubernetes.jobmanager.annotations: alibabacloud.com/burst-resource:eci_only,k8s.aliyun.com/eci-image-cache:true
    kubernetes.taskmanager.annotations: alibabacloud.com/burst-resource:eci_only,k8s.aliyun.com/eci-image-cache:true
  serviceAccount: flink
  podTemplate:
    metadata:
      annotations:
        prometheus.io/path: /metrics
        prometheus.io/port: "9249"
        prometheus.io/scrape: "true"
    spec:
      serviceAccount: flink
      containers:
        - name: flink-main-container
          volumeMounts:
            - mountPath: /flink-data
              name: flink-volume
          ports:
            - containerPort: 9249
              name: metrics
              protocol: TCP
      volumes:
        - name: flink-volume
          emptyDir: {}
  jobManager:
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 2
    upgradeMode: stateless
Method 3: Use Elastic Container Instance pods to submit Flink jobs by default
Go to the flink-conf.yaml tab.
Log on to the EMR on ACK console.
On the EMR on ACK page, find the Flink cluster that you want to configure and click Configure in the Actions column.
On the Configure tab, click the flink-conf.yaml tab.
Grant the permissions on Elastic Container Instance to the Flink cluster.
On the flink-conf.yaml tab, click Add Configuration Item.
In the Add Configuration Item dialog box, add the configuration items described in the following table.
Configuration item
Value
kubernetes.jobmanager.annotations
alibabacloud.com/burst-resource:eci_only,k8s.aliyun.com/eci-image-cache:true
kubernetes.taskmanager.annotations
alibabacloud.com/burst-resource:eci_only,k8s.aliyun.com/eci-image-cache:true
Click OK.
In the Save dialog box, configure the Execution Reason parameter and then click Save.
Deploy the configuration items.
In the lower part of the Configure tab, click Deploy Client Configuration.
In the dialog box that appears, configure the Execution Reason parameter and click OK.
In the Confirm message, click OK.
Wait for approximately 30 minutes until the configurations take effect. The modified configurations are applied to newly submitted Flink jobs.
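After the deployment succeeds, the cluster-level flink-conf.yaml effectively carries defaults equivalent to the following fragment. This is a sketch of the resulting configuration, not literal file contents:

```yaml
kubernetes.jobmanager.annotations: alibabacloud.com/burst-resource:eci_only,k8s.aliyun.com/eci-image-cache:true
kubernetes.taskmanager.annotations: alibabacloud.com/burst-resource:eci_only,k8s.aliyun.com/eci-image-cache:true
```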