This topic describes how to make preparations for migrating Flink jobs from a self-managed cluster to an E-MapReduce (EMR) Dataflow cluster and how to perform the migration. This topic also provides answers to frequently asked questions about the migration.
Background information
For more information about how to view the basic information about an EMR cluster, see Cluster planning.
Preparations
Cluster planning
When you create a Dataflow cluster, you can select vCPU and memory resources that have the same specifications as those in the source cluster for the core instances in the Dataflow cluster. After the Flink jobs are migrated, you can adjust the specifications based on the status of the Flink jobs that are run in the Dataflow cluster. You can select the type of an Elastic Compute Service (ECS) instance based on the following requirements: whether you want to use local disks and whether you want to use a large physical machine.
The specifications of vCPU and memory resources that you can specify for a master instance vary based on the number of compute units (CUs) allowed in a cluster. The following table describes the mappings between the specifications of vCPU and memory resources of master instances and the maximum number of CUs allowed in clusters.
Master instance specification | Maximum number of CUs allowed in a cluster |
4 vCores and 16 GiB of memory | 80 CUs |
8 vCores and 32 GiB of memory | 160 CUs |
16 vCores and 64 GiB of memory | 800 CUs |
24 vCores and 96 GiB of memory | 800 CUs |
Network connectivity
Before you migrate Flink jobs from a cluster to the Dataflow cluster that you create in the EMR console, make sure that the machine on which the client you use to submit the Flink jobs is deployed can access the Dataflow cluster.
You can select a network connectivity method based on the type and network of the source cluster:
Self-managed cluster in a data center: You can use Alibaba Cloud Express Connect to connect your on-premises data center to the virtual private cloud (VPC) in which the EMR Dataflow cluster is deployed.
Self-managed cluster that is hosted on ECS instances: You can select one of the following network connectivity methods. The VPC service logically isolates VPCs for users. We recommend that you deploy the source cluster in a VPC.
If the source cluster is deployed on the classic network, make sure the classic network can be connected to the VPC in which the EMR Dataflow cluster is deployed. Alibaba Cloud provides two types of private networks: the classic network and VPCs. EMR clusters are deployed in VPCs, but the business systems of many users are deployed in the classic network. Alibaba Cloud provides the ClassicLink feature to allow you to connect the two types of networks. For more information, see Create a ClassicLink connection.
If the source cluster is deployed in a VPC, make sure that the EMR Dataflow cluster is deployed in the same zone as the source cluster.
Configuration of a Hadoop environment
The EMR Dataflow cluster is deployed based on YARN. If the client that you use to submit the Flink jobs is not deployed in the EMR Dataflow cluster, you must configure the Hadoop settings and relevant environment variables for the machine on which the client is deployed.
Verify the migration environment
The environment of the source cluster may be different from the environment of the EMR Dataflow cluster. For example, the Java Development Kit (JDK) versions may be different. To ensure that the Flink jobs can be successfully migrated, we recommend that you check whether your migration plan can work as expected before you migrate the Flink jobs. You can use one of the following methods to perform the check:
Before you migrate the Flink jobs, migrate several test jobs to check whether the entire migration process is complete.
Migrate the jobs with low priorities first. If the migration is successful, migrate the jobs with high priorities. If permitted, you can continue to run a job in the source cluster after you migrate the job. Stop the job in the source cluster only after you perform a successful trial run for the job in the EMR Dataflow cluster.
Migrate and run jobs
Job type | Description |
Checkpoint file | Copy a checkpoint file to the Hadoop Distributed File System (HDFS) service of the Dataflow cluster or upload a checkpoint file to Object Storage Service (OSS). You can configure the -s parameter to specify a checkpoint file when you use a client to submit Flink jobs. This way, you can use the checkpoint file in the Dataflow cluster to resume the Flink jobs. Important For DataStream jobs, the state data for open source Flink is fully compatible with the state data for Flink Ververica Runtime (VVR). However, for SQL jobs, the compatibility between the state data for open source Flink and that for Flink (VVR) cannot be ensured because Flink (VVR) has been optimized based on open source Flink. You cannot use the checkpoint file that is generated in open source Flink to resume the jobs for which state data is incompatible. In this case, you can rerun the jobs in Flink (VVR). You can also resume the jobs in open source Flink. |
Flink jobs that use the same Flink version | Submit Flink jobs to the Dataflow cluster in the same way as you submit the jobs to the source cluster. |
Flink jobs that use different Flink versions | You may use a different Flink version for the Flink jobs in the source cluster, such as Flink 1.9 or Flink 1.12. You can use one of the following methods to make Flink jobs of different Flink versions in the source cluster run in the Dataflow cluster:
|
Integrate a Dataflow cluster with a self-managed big data platform
You can integrate a Dataflow cluster with your self-managed big data platform.
Manage resources and perform O&M operations on resources
A Dataflow cluster schedules and manages resources based on YARN. Therefore, you need to only integrate the Dataflow cluster with the self-managed big data platform in the same way as you integrate a YARN cluster with the platform. You can configure resources for YARN queues based on your business requirements. Then, you can use the RESTful API of YARN to query the YARN job status and perform O&M.
View logs
You can go to the Flink web UI to view the jobs that are running. For more information, see Basic usage.
Flink provides HistoryServer that you can use to view the status of complete jobs. You can also run commands that are provided by YARN, such as the
yarn logs -applicationId application_xxxx_yyyy
command, to access the logs of complete jobs.NoteBy default, the logs of HistoryServer are stored in the hdfs:///flink/flink-jobs/ directory of the HDFS service of the Dataflow cluster. YARN logs are stored in the hdfs:///tmp/logs/$USERNAME/logs/ directory of the HDFS service of the Dataflow cluster.
Monitor metrics
If your self-managed big data platform provides a monitoring and alerting system, you can configure a metric reporter when you configure the settings of a Flink job. In addition, you can view metrics of Flink jobs in the Dataflow cluster on the Monitoring page of EMR.
Configure alerts
In addition to your alerting system, you can also use CloudMonitor provided by Alibaba Cloud to configure alert rules and notification methods such as email addresses and DingTalk group messages. For more information about how to create an alert rule in CloudMonitor, see Create a threshold-triggered alert rule.
FAQ
Q: What do I do if the JDK version of the source cluster and the JDK version of my Dataflow cluster are different?
A: A Dataflow cluster uses OpenJDK. If the jobs in the source cluster use Oracle JDK and some proprietary features of Oracle JDK, the inconsistency between the JDK versions may cause specific classes, such as javafx.util.Pair, to be unavailable when you run the jobs. In this case, you must not explicitly use dependencies that are available only in Oracle JDK in the jobs.
Q: Can different Flink versions be supported in a Dataflow cluster?
A: Yes, different Flink versions can be supported in a Dataflow cluster. You can submit Flink jobs to the Dataflow cluster by using different submission modes such as the Per-job mode and the Application mode. The Dataflow cluster is deployed based on YARN. If the yarn.provided.lib.dirs parameter is not configured, Flink Runtime that is used when Flink jobs are run in the YARN cluster is Flink, such as open source Flink 1.13, that is used by the client to submit Flink jobs. You can use one of the following methods to submit jobs of a specific Flink version:
Directly submit jobs of a specific Flink version and do not configure the yarn.provided.lib.dirs parameter.
Configure the yarn.provided.lib.dirs parameter to specify Flink Runtime. In addition, we recommend that you submit jobs in Per-job mode for the compatibility consideration in this scenario.