E-MapReduce: Migration plans

Last Updated: Nov 20, 2023

This topic describes how to make preparations for migrating Flink jobs from a self-managed cluster to an E-MapReduce (EMR) Dataflow cluster and how to perform the migration. This topic also provides answers to frequently asked questions about the migration.

Background information

For more information about how to view the basic information about an EMR cluster, see Cluster planning.

Preparations

Cluster planning

When you create a Dataflow cluster, you can select vCPU and memory specifications for the core instances that match those of the source cluster. After the Flink jobs are migrated, you can adjust the specifications based on how the jobs run in the Dataflow cluster. Select the Elastic Compute Service (ECS) instance type based on whether you want to use local disks and whether you need a large physical machine.

The master instance specification that you need varies based on the number of compute units (CUs) allowed in the cluster. The following table describes the mappings between master instance specifications and the maximum number of CUs allowed in a cluster.

Master instance specification    | Maximum number of CUs allowed in a cluster
4 vCores and 16 GiB of memory    | 80 CUs
8 vCores and 32 GiB of memory    | 160 CUs
16 vCores and 64 GiB of memory   | 800 CUs
24 vCores and 96 GiB of memory   | 800 CUs

Network connectivity

Before you migrate Flink jobs to the Dataflow cluster that you create in the EMR console, make sure that the machine that hosts the client used to submit the Flink jobs can access the Dataflow cluster. You can verify connectivity as shown in the sketch after the following list.

You can select a network connectivity method based on the type and network of the source cluster:

  • Self-managed cluster in a data center: You can use Alibaba Cloud Express Connect to connect your on-premises data center to the virtual private cloud (VPC) in which the EMR Dataflow cluster is deployed.

  • Self-managed cluster that is hosted on ECS instances: Select one of the following network connectivity methods based on the network type of the source cluster. The VPC service logically isolates VPCs from each other. We recommend that you deploy the source cluster in a VPC.

    • If the source cluster is deployed on the classic network, make sure the classic network can be connected to the VPC in which the EMR Dataflow cluster is deployed. Alibaba Cloud provides two types of private networks: the classic network and VPCs. EMR clusters are deployed in VPCs, but the business systems of many users are deployed in the classic network. Alibaba Cloud provides the ClassicLink feature to allow you to connect the two types of networks. For more information, see Create a ClassicLink connection.

    • If the source cluster is deployed in a VPC, make sure that the EMR Dataflow cluster is deployed in the same zone as the source cluster.
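
A quick way to verify connectivity is to check whether the machine that hosts the client can reach the service ports of the Dataflow cluster, such as the YARN ResourceManager port. The following is a minimal sketch; the hostname emr-header-1 and the ports are placeholders, and the actual values depend on your cluster configuration.

    # Check whether the client machine can reach the YARN ResourceManager
    # and the HDFS NameNode of the Dataflow cluster.
    # Replace the hostname and ports with the values of your cluster.
    nc -zv emr-header-1 8032    # YARN ResourceManager (example port)
    nc -zv emr-header-1 9000    # HDFS NameNode (example port)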

Configuration of a Hadoop environment

The EMR Dataflow cluster is deployed based on YARN. If the client that you use to submit the Flink jobs is not deployed in the EMR Dataflow cluster, you must configure the Hadoop settings and relevant environment variables for the machine on which the client is deployed.
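
The following is a minimal sketch of the environment variables that a Flink client typically requires to submit jobs to YARN. The paths are examples and assume that the Hadoop configuration files (core-site.xml, hdfs-site.xml, and yarn-site.xml) of the Dataflow cluster have been copied to the client machine.

    # Example paths; adjust them to your environment.
    export HADOOP_HOME=/opt/hadoop
    export HADOOP_CONF_DIR=/etc/hadoop/conf
    export PATH=$HADOOP_HOME/bin:$PATH
    # The Flink client requires the Hadoop classpath to submit jobs to YARN.
    export HADOOP_CLASSPATH=$(hadoop classpath)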

Verify the migration environment

The environment of the source cluster may be different from the environment of the EMR Dataflow cluster. For example, the Java Development Kit (JDK) versions may be different. To ensure that the Flink jobs can be successfully migrated, we recommend that you check whether your migration plan can work as expected before you migrate the Flink jobs. You can use one of the following methods to perform the check:

  • Before you migrate the production Flink jobs, migrate several test jobs to check whether the entire migration process works end to end, for example, by submitting a bundled example job as shown in the sketch after this list.

  • Migrate the jobs with low priorities first. If the migration is successful, migrate the jobs with high priorities. If permitted, you can continue to run a job in the source cluster after you migrate the job. Stop the job in the source cluster only after you perform a successful trial run for the job in the EMR Dataflow cluster.
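
For example, the following sketch checks the runtime versions on the client machine and submits an example job that is bundled with open source Flink distributions as a trial run. The JAR path may differ in your installation.

    # Compare the runtime versions with those of the source cluster.
    java -version
    hadoop version
    flink --version
    # Submit a bundled example job to the Dataflow cluster as a trial run.
    flink run -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar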

Migrate and run jobs

The following information describes how to migrate and run jobs based on the job type, and how to handle checkpoint files.

Checkpoint file

Copy the checkpoint file to the Hadoop Distributed File System (HDFS) service of the Dataflow cluster, or upload it to Object Storage Service (OSS). When you use a client to submit Flink jobs, you can specify the checkpoint file by using the -s parameter. This way, the Flink jobs are resumed from the checkpoint in the Dataflow cluster, as shown in the sketch after the following note.

Important

For DataStream jobs, the state data of open source Flink is fully compatible with the state data of Flink Ververica Runtime (VVR). However, for SQL jobs, compatibility between the state data of open source Flink and that of Flink (VVR) is not guaranteed because Flink (VVR) is optimized based on open source Flink. You cannot use a checkpoint file that is generated in open source Flink to resume jobs whose state data is incompatible. In this case, you can rerun the jobs in Flink (VVR), or continue to run the jobs in open source Flink.
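
The following is a minimal sketch of how to copy a checkpoint from the source cluster to the HDFS service of the Dataflow cluster and resume a job from it. The checkpoint paths and the job JAR name are placeholders.

    # Copy the checkpoint data from the source cluster to the Dataflow cluster.
    hadoop distcp \
      hdfs://source-namenode:8020/flink/checkpoints/job-a/chk-100 \
      hdfs://dataflow-namenode:8020/flink/checkpoints/job-a/chk-100
    # Resume the job from the copied checkpoint by using the -s parameter.
    flink run -m yarn-cluster \
      -s hdfs://dataflow-namenode:8020/flink/checkpoints/job-a/chk-100 \
      ./my-flink-job.jar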

Flink jobs that use the same Flink version

Submit Flink jobs to the Dataflow cluster in the same way as you submit the jobs to the source cluster.

Flink jobs that use different Flink versions

The Flink jobs in the source cluster may use different Flink versions, such as Flink 1.9 or Flink 1.12. You can use one of the following methods to run jobs of different Flink versions in the Dataflow cluster:

  • For jobs of a specific Flink version, submit the jobs in Per-job mode of YARN.

    In this mode, the Flink version that is used to run the jobs is the same as the Flink version of the client that submits the jobs. Therefore, you only need to submit the jobs with a client of the required Flink version.

    Note

    If you want to use Flink Runtime locally, you do not need to configure the yarn.provided.lib.dirs parameter.

  • If you want to submit jobs that use a Flink (VVR) version supported by the Dataflow cluster, configure the yarn.provided.lib.dirs parameter to point to the VVR dependencies that are stored in the HDFS service of the Dataflow cluster. For example, specify -D yarn.provided.lib.dirs=hdfs:///flink-current/. We recommend that you submit the jobs in Application mode to fully utilize cluster resources. Both methods are illustrated in the sketch after this list.
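
The following sketch illustrates both methods. The Flink installation directory, the Flink versions, and the job JAR names are examples.

    # Method 1: Per-job mode with a locally installed Flink distribution.
    # yarn.provided.lib.dirs is not configured, so the local Flink Runtime
    # of the client is used to run the job.
    /opt/flink-1.12/bin/flink run -t yarn-per-job ./my-flink-1.12-job.jar

    # Method 2: Application mode with the Flink (VVR) runtime that is stored
    # in the HDFS service of the Dataflow cluster.
    flink run-application -t yarn-application \
      -D yarn.provided.lib.dirs=hdfs:///flink-current/ \
      ./my-flink-job.jar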

Integrate a Dataflow cluster with a self-managed big data platform

You can integrate a Dataflow cluster with your self-managed big data platform.

  • Manage resources and perform O&M operations on resources

    A Dataflow cluster schedules and manages resources based on YARN. Therefore, you only need to integrate the Dataflow cluster with the self-managed big data platform in the same way as you integrate a YARN cluster with the platform. You can configure resources for YARN queues based on your business requirements. Then, you can use the RESTful API of YARN to query the YARN job status and perform O&M operations, as shown in the sketch after this list.

  • View logs

    • You can go to the Flink web UI to view the jobs that are running. For more information, see Basic usage.

    • Flink provides HistoryServer, which you can use to view the status of completed jobs. You can also run YARN commands, such as the yarn logs -applicationId application_xxxx_yyyy command, to access the logs of completed jobs (see the sketch after this list).

      Note

      By default, the logs of HistoryServer are stored in the hdfs:///flink/flink-jobs/ directory of the HDFS service of the Dataflow cluster. YARN logs are stored in the hdfs:///tmp/logs/$USERNAME/logs/ directory of the HDFS service of the Dataflow cluster.

  • Monitor metrics

    If your self-managed big data platform provides a monitoring and alerting system, you can configure a metric reporter when you configure the settings of a Flink job. In addition, you can view metrics of Flink jobs in the Dataflow cluster on the Monitoring page of EMR.

  • Configure alerts

    In addition to your alerting system, you can also use CloudMonitor provided by Alibaba Cloud to configure alert rules and notification methods such as email addresses and DingTalk group messages. For more information about how to create an alert rule in CloudMonitor, see Create a threshold-triggered alert rule.
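
The following sketch illustrates the O&M operations described in the preceding list: querying the status of Flink jobs through the ResourceManager REST API and retrieving the logs of a completed job. The hostname, the port, and the application ID are placeholders.

    # Query the applications that are running in the YARN cluster.
    curl "http://emr-header-1:8088/ws/v1/cluster/apps?states=RUNNING"
    # Query the status of a specific application.
    curl "http://emr-header-1:8088/ws/v1/cluster/apps/application_xxxx_yyyy"
    # Retrieve the aggregated logs of a completed job.
    yarn logs -applicationId application_xxxx_yyyy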

FAQ

  • Q: What do I do if the JDK version of the source cluster and the JDK version of my Dataflow cluster are different?

    A: A Dataflow cluster uses OpenJDK. If the jobs in the source cluster use Oracle JDK and rely on proprietary features of Oracle JDK, the difference between the JDK versions may cause specific classes, such as javafx.util.Pair, to be unavailable when you run the jobs. In this case, do not explicitly use dependencies that are available only in Oracle JDK in your jobs.

  • Q: Can different Flink versions be supported in a Dataflow cluster?

    A: Yes, a Dataflow cluster can support different Flink versions. You can submit Flink jobs to the Dataflow cluster in different submission modes, such as Per-job mode and Application mode. The Dataflow cluster is deployed based on YARN. If the yarn.provided.lib.dirs parameter is not configured, the Flink Runtime that is used when the jobs run in the YARN cluster is the Flink distribution of the client that submits the jobs, such as open source Flink 1.13. You can use one of the following methods to submit jobs of a specific Flink version:

    • Directly submit jobs of a specific Flink version and do not configure the yarn.provided.lib.dirs parameter.

    • Configure the yarn.provided.lib.dirs parameter to specify the Flink Runtime. In this scenario, we recommend that you submit jobs in Per-job mode for compatibility reasons.