E-MapReduce: FAQ

Last Updated: Mar 26, 2026

This topic answers frequently asked questions about Dataflow clusters.

Cluster operations and O&M

Job issues

Errors

How do I submit Flink jobs from an external client? {#submit-from-external-client}

  1. Make sure the external client can reach the Dataflow cluster network.

  2. Set up a Hadoop YARN environment on the client. Copy the following directories from the Dataflow cluster to the client:

    • /opt/apps/YARN/yarn-current: the Hadoop YARN installation

    • /etc/taihao-apps/hadoop-conf/: the Hadoop configuration files

    Important

    Hadoop configuration files such as yarn-site.xml use fully qualified domain names (FQDNs) as service addresses, for example, master-1-1.c-xxxxxxxxxx.cn-hangzhou.emr.aliyuncs.com. Make sure these FQDNs can be resolved from the client, or replace them with IP addresses. See How do I resolve hostnames from an external client?

    Then configure the environment variables on the client. Set HADOOP_CONF_DIR before running hadoop classpath so that the copied configuration directory is included in the classpath:

    export HADOOP_HOME=/path/to/yarn-current && \
    export HADOOP_CONF_DIR=/path/to/hadoop-conf && \
    export PATH=${HADOOP_HOME}/bin/:$PATH && \
    export HADOOP_CLASSPATH=$(hadoop classpath)

  3. Submit a Flink job. For example:

    flink run -d -t yarn-per-job -ynm flink-test $FLINK_HOME/examples/streaming/TopSpeedWindowing.jar

    After submission, view the job on the YARN web UI of the Dataflow cluster.
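
Step 2 only works if the client can resolve every host name in the copied configuration. The following Bash sketch lists those names and checks each one with `getent hosts`. The function names `list_conf_hosts` and `check_conf_hosts` are hypothetical helpers, not EMR tools, and the sketch assumes host names appear as `host:port` inside `<value>` elements of the `*.xml` files.

```shell
#!/usr/bin/env bash
# Hypothetical helpers for checking host names used in the copied Hadoop configuration.

# Print each unique host name that appears as "host:port" (optionally after a scheme
# such as hdfs://) inside <value> elements of the *.xml files in a directory.
# Names must start with a letter, so bare IP addresses such as 0.0.0.0:8088 are skipped.
list_conf_hosts() {
  local conf_dir="$1"
  grep -ho '<value>[^<]*</value>' "$conf_dir"/*.xml 2>/dev/null \
    | grep -Eo '[A-Za-z][A-Za-z0-9.-]*:[0-9]+' \
    | cut -d: -f1 \
    | sort -u
}

# Report whether each host name resolves on this machine; return 1 if any does not.
check_conf_hosts() {
  local host rc=0
  for host in $(list_conf_hosts "$1"); do
    if getent hosts "$host" >/dev/null; then
      echo "OK      $host"
    else
      echo "MISSING $host"
      rc=1
    fi
  done
  return "$rc"
}
```

For example, run `check_conf_hosts /path/to/hadoop-conf` on the client. Any `MISSING` entry needs a mapping as described in How do I resolve hostnames from an external client?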

How do I resolve hostnames from an external client? {#resolve-hostnames}

Use one of the following methods:

  • Edit `/etc/hosts`: Add mappings between the cluster hostnames and their IP addresses.

  • Use [Alibaba Cloud DNS PrivateZone](https://www.alibabacloud.com/help/en/document_detail/64611.html#topic-2036614): Configure private DNS resolution for the cluster's domain.

  • Use a custom DNS service: Add the following JVM options to your Flink configuration, replacing xxx with the address of your DNS server and yyy with the domain name to resolve:

    env.java.opts.client: "-Dsun.net.spi.nameservice.nameservers=xxx -Dsun.net.spi.nameservice.provider.1=dns,sun -Dsun.net.spi.nameservice.domain=yyy"
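
For the `/etc/hosts` method, the following Bash sketch appends a mapping only if the host name is not already listed, so it can be rerun safely. `add_host_mapping` is a hypothetical helper; the IP address and host name in the usage comment are placeholders.

```shell
#!/usr/bin/env bash
# Hypothetical helper: append "IP HOSTNAME" to a hosts file unless HOSTNAME is already mapped.
# Usage (as root): add_host_mapping 192.168.0.10 master-1-1.c-xxxxxxxxxx.cn-hangzhou.emr.aliyuncs.com
add_host_mapping() {
  local ip="$1" host="$2" hosts_file="${3:-/etc/hosts}"
  # awk exits 0 if a non-comment line already lists the host name in any field after the IP.
  if awk -v h="$host" '$1 !~ /^#/ { for (i = 2; i <= NF; i++) if ($i == h) found = 1 }
                       END { exit !found }' "$hosts_file" 2>/dev/null; then
    echo "exists: $host"
  else
    printf '%s %s\n' "$ip" "$host" >> "$hosts_file"
    echo "added: $ip $host"
  fi
}
```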

How do I view Flink job status? {#view-job-status}

Three options are available:

  • EMR console: E-MapReduce (EMR) supports Apache Knox, which provides access to the YARN and Flink web UIs over the internet. Go to the Apache Flink Dashboard from the YARN web UI. For details, see View the job status on the web UI of Flink (VVR).

  • SSH tunnel: For details, see Create an SSH tunnel to access web UIs of open source components.

  • YARN RESTful API:

    Ports 8443 and 8088 must be open in your security group, or the client and the Dataflow cluster must be in the same virtual private cloud (VPC). For example:

    curl --compressed -v -H "Accept: application/json" -X GET \
      "http://master-1-1:8088/ws/v1/cluster/apps?states=RUNNING&queue=default&user.name=***"

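To use the response in scripts, the following sketch pulls application IDs out of the JSON returned by the `/ws/v1/cluster/apps` endpoint. `extract_app_ids` is a hypothetical helper that uses plain text matching rather than a JSON parser, so it assumes the `"id":"application_..."` field layout of the YARN response.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read a YARN /ws/v1/cluster/apps JSON response on stdin and
# print one application ID per line. Uses text matching, not a JSON parser.
extract_app_ids() {
  grep -o '"id" *: *"application_[0-9]*_[0-9]*"' | grep -o 'application_[0-9]*_[0-9]*'
}

# Example:
#   curl -s -H "Accept: application/json" \
#     "http://master-1-1:8088/ws/v1/cluster/apps?states=RUNNING&queue=default&user.name=***" \
#     | extract_app_ids
```
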
How do I view Flink job logs? {#view-job-logs}

  • Running job: View logs on the job's web UI.

  • Completed job: View job statistics on Flink HistoryServer, or pull the logs with:

    yarn logs -applicationId application_xxxx_yyyy

    By default, logs for completed jobs are stored in hdfs:///tmp/logs/$USERNAME/logs/ on the Hadoop Distributed File System (HDFS).
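
The following Bash sketch wraps the `yarn logs` command with a basic check of the application ID format (`application_<timestamp>_<sequence>`) and saves the output to a local file. `is_app_id` and `save_app_logs` are hypothetical helpers.

```shell
#!/usr/bin/env bash
# Hypothetical helpers around "yarn logs" for completed jobs.

# Succeed only for IDs shaped like application_<cluster timestamp>_<sequence number>.
is_app_id() {
  [[ "$1" =~ ^application_[0-9]+_[0-9]+$ ]]
}

# Save the aggregated logs of a completed application to a local file (default: <app_id>.log).
save_app_logs() {
  local app_id="$1" out="${2:-$1.log}"
  if ! is_app_id "$app_id"; then
    echo "not a YARN application ID: $app_id" >&2
    return 2
  fi
  yarn logs -applicationId "$app_id" > "$out" && echo "saved to $out"
}
```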

How do I access Flink HistoryServer? {#access-historyserver}

Flink HistoryServer runs on port 18082 of the master-1-1 node (the first node in the master server group). It collects statistics on completed Flink jobs but does not store job logs.

To access it:

  1. Open port 18082 in your security group rules for the master-1-1 node.

  2. Navigate to http://<master-1-1-ip>:18082.

To view logs for completed jobs, call the YARN API or use the YARN web UI instead.
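
HistoryServer also serves its archived job data as JSON. The sketch below assumes the standard Flink HistoryServer `/jobs/overview` endpoint and prints the job IDs it returns. `list_archived_job_ids` is a hypothetical helper, and `<master-1-1-ip>` is the same placeholder used above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read a Flink HistoryServer /jobs/overview JSON response on stdin
# and print one job ID per line. Uses text matching, not a JSON parser.
list_archived_job_ids() {
  grep -Eo '"jid" *: *"[0-9a-f]+"' | cut -d'"' -f4
}

# Example (port 18082 must be reachable from this machine):
#   curl -s "http://<master-1-1-ip>:18082/jobs/overview" | list_archived_job_ids
```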

How do I use commercial connectors? {#use-commercial-connectors}

This feature is available only in Dataflow clusters. Available connectors include Hologres, Log Service, MaxCompute, DataHub, Elasticsearch, and ClickHouse. The following example uses the Hologres connector.

Step 1: Install the connector on your local machine

The connector JAR packages are stored in /opt/apps/FLINK/flink-current/opt/connectors/ on the Dataflow cluster. Install the JAR to your local Maven repository:

mvn install:install-file \
  -Dfile=/path/to/ververica-connector-hologres-1.13-vvr-4.0.7.jar \
  -DgroupId=com.alibaba.ververica \
  -DartifactId=ververica-connector-hologres \
  -Dversion=1.13-vvr-4.0.7 \
  -Dpackaging=jar

Add the dependency to your pom.xml with scope set to provided:

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-hologres</artifactId>
    <version>1.13-vvr-4.0.7</version>
    <scope>provided</scope>
</dependency>

Step 2: Make the connector available at runtime

Choose one of the following methods:

  • Method 1 (HDFS): Upload the connector JAR to HDFS and reference its directory when submitting:

    hdfs dfs -mkdir -p hdfs:///flink-current/opt/connectors/hologres/
    hdfs dfs -put /opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar \
      hdfs:///flink-current/opt/connectors/hologres/

    Add to your submit command:

    -D yarn.provided.lib.dirs=hdfs:///flink-current/opt/connectors/hologres/

  • Method 2 (local client): Copy the JAR to the submit client, using the same path that it has on the cluster:

    /opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar

    Add to your submit command:

    -C file:///opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar

  • Method 3 (bundled): Package the connector JAR directly into your job JAR. In this case, do not set the dependency scope to provided, because provided dependencies are not packaged.

How do I use GeminiStateBackend? {#use-gemini-state-backend}

GeminiStateBackend is available only in Dataflow clusters. It is an enterprise-edition state backend with 3–5x the performance of open source state backends, and it is enabled by default in Dataflow cluster configuration files.

For advanced configuration options, see Configurations of GeminiStateBackend.

How do I switch to an open source state backend? {#use-open-source-state-backend}

Dataflow clusters use GeminiStateBackend by default. To switch to an open source state backend such as RocksDB for a specific job, pass the -D parameter when submitting:

flink run-application -t yarn-application \
  -D state.backend=rocksdb \
  /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar

To apply the change to all subsequent jobs, update the state.backend parameter in the EMR console: go to the Configure tab of the Flink service page, click flink-conf.yaml, set the value, click Save, and then click Deploy Client Configuration.

Configure parameters

How do I enable JobManager high availability? {#enable-jobmanager-ha}

In a Dataflow cluster, Flink runs on YARN. Enable high availability (HA) for JobManager by configuring ZooKeeper-based HA as described in the Apache Flink HA configuration.

Add the following to flink-conf.yaml:

high-availability: zookeeper
high-availability.zookeeper.quorum: 192.168.**.**:2181,192.168.**.**:2181,192.168.**.**:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/recovery

Important

By default, after HA is enabled, JobManager can be restarted only once. To allow more restarts, configure the yarn.resourcemanager.am.max-attempts parameter for YARN and the yarn.application-attempts parameter for Flink.

Only attempt failures that occur within yarn.application-attempt-failures-validity-interval count toward this limit. To keep a repeatedly failing JobManager from restarting indefinitely, increase this parameter from its default of 10000 (10 seconds) to 300000 (5 minutes).
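
Before deploying the configuration, you can check that flink-conf.yaml defines all four HA keys listed above. This is a minimal sketch: `check_ha_conf` is a hypothetical helper that only checks that each key is present with a non-empty value, not that the ZooKeeper quorum or the storage directory is reachable.

```shell
#!/usr/bin/env bash
# Hypothetical helper: check that a flink-conf.yaml file sets each ZooKeeper HA key
# to a non-empty value. Prints the missing keys and returns 1 if any are missing.
# Note: dots in the key names act as regex wildcards here, which is acceptable for a sketch.
check_ha_conf() {
  local conf="$1" key missing=0
  for key in high-availability \
             high-availability.zookeeper.quorum \
             high-availability.zookeeper.path.root \
             high-availability.storageDir; do
    if ! grep -Eq "^${key}:[[:space:]]*[^[:space:]]" "$conf"; then
      echo "missing: $key"
      missing=1
    fi
  done
  return "$missing"
}
```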

How do I view Flink job metrics? {#view-job-metrics}

  1. Log on to the EMR console, navigate to the Monitoring tab of your cluster, and click Metric Monitoring.

  2. Select FLINK from the Dashboard drop-down list.

  3. Select the application ID and job ID to view metrics for that job.

Application IDs and job IDs appear only when Flink jobs are actively running.
Some metrics, such as sourceIdleTime, require both source and sink to be configured before they report data.

How do I troubleshoot upstream and downstream storage issues? {#troubleshoot-storage}

See Upstream and downstream storage.

Where are client logs stored? {#client-logs}

The FLINK_LOG_DIR environment variable specifies the client log directory. The default is /var/log/taihao-apps/flink. In EMR versions earlier than V3.43.0, the default is /mnt/disk1/log/flink.
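
The lookup rule above can be expressed as a small Bash function. `flink_client_log_dir` is a hypothetical helper; it covers only the V3.x version boundary stated above and relies on `sort -V` from GNU coreutils for the version comparison.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the Flink client log directory.
# FLINK_LOG_DIR wins if set; otherwise the default depends on the EMR version
# (earlier than V3.43.0: /mnt/disk1/log/flink, otherwise /var/log/taihao-apps/flink).
flink_client_log_dir() {
  local emr_version="${1#V}"   # accepts "3.42.0" or "V3.42.0"
  if [ -n "${FLINK_LOG_DIR:-}" ]; then
    echo "$FLINK_LOG_DIR"
  elif [ "$(printf '%s\n' "$emr_version" 3.43.0 | sort -V | head -n 1)" != 3.43.0 ]; then
    # emr_version sorts before 3.43.0, so it is an earlier release.
    echo /mnt/disk1/log/flink
  else
    echo /var/log/taihao-apps/flink
  fi
}
```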

Why don't my flink run parameters take effect? {#parameters-not-taking-effect}

Job parameters must be placed after the JAR file path in the command. For example:

flink run -d -t yarn-per-job test.jar arg1 arg2

Parameters placed before the JAR are treated as Flink framework options, not job arguments.

Where are cluster logs stored? {#cluster-logs}

How you access logs depends on whether JobManager is running:

  • JobManager has stopped: Pull logs with:

    yarn logs -applicationId application_xxxx_yy

    Or access the log link for the completed job on the YARN web UI.

  • JobManager is running:

    • View logs on the Flink job web UI, or

    • Retrieve JobManager logs:

      yarn logs -applicationId application_xxxx_yy -am ALL -logFiles jobmanager.log

    • Retrieve TaskManager logs:

      yarn logs -applicationId application_xxxx_yy -containerId container_xxxx_yy_aa_bb -logFiles taskmanager.log

NoSuchFieldError / NoSuchMethodError / ClassNotFoundException (JAR conflict) {#jar-conflict}

These errors indicate a JAR conflict between the job's dependencies and the Flink installation on the cluster. To resolve:

  1. Identify the conflicting class: Check the error logs for the class name, then run the following in the directory containing your pom.xml to inspect the dependency tree:

    mvn dependency:tree

  2. Resolve the conflict using one of these approaches:

    • Change the scope of the conflicting dependency to provided in pom.xml.

    • Exclude the specific class from the dependency.

    • Use Maven Shade Plugin to relocate (shade) the class.

  3. Confirm which JAR a class is loaded from: Add the following JVM parameter to flink-conf.yaml:

    # flink-conf.yaml
    env.java.opts: -verbose:class

    Or pass it dynamically:

    -Denv.java.opts="-verbose:class"

    Class loading information is recorded in jobmanager.out or taskmanager.out.
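
With -verbose:class enabled, each loaded class is logged together with its source. The following sketch extracts the source for one class from jobmanager.out or taskmanager.out. `class_source` is a hypothetical helper, and the two log line formats it handles (JDK 8 and JDK 9 or later) are assumptions about the JVM in use.

```shell
#!/usr/bin/env bash
# Hypothetical helper: show where a class was loaded from, using -verbose:class output.
# Handles the JDK 8 format        "[Loaded <class> from <source>]"
# and the JDK 9+ unified format   "[...][class,load] <class> source: <source>".
class_source() {
  local cls="$1" file="$2"
  grep -F -- " $cls " "$file" \
    | sed -E 's/.* from (.*)\]$/\1/; s/.* source: (.*)$/\1/'
}

# Example: class_source com.google.common.base.Preconditions taskmanager.out
```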

Multiple factories for identifier found in the classpath {#multiple-factories-error}

This error means multiple implementations of the same connector are on the classpath — typically because a connector dependency is declared in the job's JAR and the connector JAR is also manually placed in $FLINK_HOME/lib.

Remove the duplicate. See NoSuchFieldError / NoSuchMethodError / ClassNotFoundException (JAR conflict) for how to identify and resolve classpath conflicts.
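
To spot the duplicate, you can list the connector JARs in a directory such as $FLINK_HOME/lib. `find_connector_jars` is a hypothetical helper; matching on a keyword in the file name is an assumption that works for JARs named like ververica-connector-hologres-*.jar.

```shell
#!/usr/bin/env bash
# Hypothetical helper: list JAR files directly under a directory whose names contain a keyword.
# Usage: find_connector_jars "$FLINK_HOME/lib" hologres
# To check the job JAR as well (assumes unzip is installed):
#   unzip -l your-job.jar | grep -i hologres
find_connector_jars() {
  local dir="$1" keyword="$2"
  find "$dir" -maxdepth 1 -type f -name "*${keyword}*.jar" | sort
}
```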

UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS {#oss-error-1}

Dataflow clusters use the built-in JindoSDK for password-free access to Object Storage Service (OSS), which already supports APIs such as StreamingFileSink. Adding the community OSS plugin on top of JindoSDK causes a dependency conflict.

Check whether the oss-fs-hadoop directory exists in $FLINK_HOME/plugins. If it does, delete it and resubmit the job.
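
The check and cleanup can be scripted as follows. `remove_community_oss_plugin` is a hypothetical helper; it deletes the directory, so run it only on the client where you intend to make this change.

```shell
#!/usr/bin/env bash
# Hypothetical helper: delete the community oss-fs-hadoop plugin directory if it exists.
# Argument: the Flink plugins directory (defaults to $FLINK_HOME/plugins).
remove_community_oss_plugin() {
  if [ -z "${1:-}" ] && [ -z "${FLINK_HOME:-}" ]; then
    echo "pass the plugins directory or set FLINK_HOME" >&2
    return 2
  fi
  local plugin_dir="${1:-$FLINK_HOME/plugins}/oss-fs-hadoop"
  if [ -d "$plugin_dir" ]; then
    rm -rf -- "$plugin_dir"
    echo "removed: $plugin_dir"
  else
    echo "not present: $plugin_dir"
  fi
}
```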

Could not find a file system implementation for scheme 'oss' {#section_1b6bc757}

This error usually means that Jindo-related JAR packages are missing from $FLINK_HOME/lib on the node used to submit jobs. In EMR V3.40.0 and earlier, these JARs may be missing on nodes other than master-1-1.

EMR V3.40.0 and earlier

Check whether jindo-flink-4.0.0-full.jar (or similar) exists in $FLINK_HOME/lib on the node you use to submit jobs. If it is missing, copy it:

cp /opt/apps/extra-jars/flink/jindo-flink-*-full.jar $FLINK_HOME/lib

Then resubmit the job.

EMR versions later than V3.40.0

| Deployment mode | Action required |
| --- | --- |
| Flink on YARN | None. OSS access is handled automatically even without Jindo JARs in $FLINK_HOME/lib. |
| Other modes | Check for Jindo JARs in $FLINK_HOME/lib. If they are missing, run the copy command above and resubmit the job. |
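
Where the manual check applies, the following sketch copies the JAR only when none is present yet. `ensure_jindo_jar` is a hypothetical helper; the source directory defaults to /opt/apps/extra-jars/flink from the copy command above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: copy jindo-flink-*-full.jar into a Flink lib directory if none is there.
# Arguments: lib directory (default $FLINK_HOME/lib), source directory (default /opt/apps/extra-jars/flink).
ensure_jindo_jar() {
  local lib_dir="${1:-${FLINK_HOME:-}/lib}" src_dir="${2:-/opt/apps/extra-jars/flink}" jar copied=0
  for jar in "$lib_dir"/jindo-flink-*-full.jar; do
    if [ -e "$jar" ]; then
      echo "present: $jar"
      return 0
    fi
  done
  for jar in "$src_dir"/jindo-flink-*-full.jar; do
    [ -e "$jar" ] || continue   # the glob did not match anything
    cp -- "$jar" "$lib_dir"/ || return 1
    echo "copied: $(basename "$jar")"
    copied=1
  done
  if [ "$copied" -eq 0 ]; then
    echo "no jindo-flink-*-full.jar found in $src_dir" >&2
    return 1
  fi
}
```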

java.util.concurrent.TimeoutException: Heartbeat of TaskManager timed out {#taskmanager-heartbeat-timeout}

The direct cause is a TaskManager heartbeat timeout. Check the TaskManager logs for the underlying error — a common root cause is an out of memory (OOM) error due to insufficient heap memory or a memory leak in the job code.

If OOM is the cause, increase the TaskManager memory allocation or analyze the job's memory usage. See java.lang.OutOfMemoryError: GC overhead limit exceeded for steps to generate and analyze a heap dump.
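
To find the underlying error quickly, save the TaskManager log (for example, with the yarn logs commands in Where are cluster logs stored?) and search it. `first_oom` is a hypothetical helper that prints the first OutOfMemoryError line with its line number; other root causes need their own search patterns.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the first java.lang.OutOfMemoryError line of a log file,
# prefixed with its line number.
first_oom() {
  grep -n -m 1 'java.lang.OutOfMemoryError' "$1"
}

# Example: first_oom taskmanager.log
```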

java.lang.OutOfMemoryError: GC overhead limit exceeded {#gc-overhead-limit}

The JVM is spending most of its time on garbage collection (GC) while reclaiming very little heap memory. The most common causes are memory leaks in user-defined functions (UDFs) or insufficient memory for the workload.

To diagnose, generate a heap dump when the error occurs. Pass the JVM parameter when resubmitting:

-D env.java.opts="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof"

Or add it to flink-conf.yaml:

env.java.opts: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof

After reproducing the error, analyze the heap dump at the path specified by HeapDumpPath using Memory Analyzer Tool (MAT) or Java VisualVM.

java.lang.NoSuchFieldError: DEPLOYMENT_MODE {#deployment-mode-error}

The job JAR includes a flink-core dependency whose version is incompatible with the Flink version on the cluster.

Add flink-core to pom.xml with scope set to provided:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-core</artifactId>
  <!-- Replace with the Flink version used by your cluster -->
  <version>1.16.1</version>
  <scope>provided</scope>
</dependency>

For background on how incompatible dependencies get introduced, see NoSuchFieldError / NoSuchMethodError / ClassNotFoundException (JAR conflict).
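
To pick the right value for `<version>`, you can read the Flink version on the cluster. The sketch below assumes that `flink --version` prints a line of the form `Version: <version>, Commit ID: <id>`; `flink_version` is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read "flink --version" output on stdin and print only the version.
# Other lines (for example, SLF4J warnings) are ignored.
flink_version() {
  sed -n -E 's/^Version: ([^,]+).*/\1/p'
}

# Example (on the cluster): flink --version | flink_version
```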

Why is only one operator shown and Records Received = 0? {#single-operator-zero-records}

This is expected behavior. The Records Received metric counts data exchanged between different operators. When Flink optimizes a job into a single operator (chaining all operators together), there is no inter-operator data exchange, so the value is always 0.

How do I enable a flame graph? {#enable-flame-graph}

Flame graphs visualize CPU usage per method, which helps identify performance bottlenecks. The feature is available in Flink 1.13 and later but is disabled by default to avoid impacting production jobs.

To enable it, go to the Configure tab on the Flink service page in the EMR console, click flink-conf.yaml, and add:

rest.flamegraph.enabled: true

For details on adding configuration items, see Manage configuration items. For more about flame graphs, see Apache Flink flame graphs.
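
After deploying the configuration, you can confirm that the setting is in place. `flamegraph_enabled` is a hypothetical helper, and the path in the usage comment, $FLINK_HOME/conf/flink-conf.yaml, is an assumption about where the client configuration lives.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeed if the last rest.flamegraph.enabled entry in a
# flink-conf.yaml file is set to true (assuming later lines override earlier ones).
# Usage: flamegraph_enabled "$FLINK_HOME/conf/flink-conf.yaml" && echo enabled
flamegraph_enabled() {
  grep -E '^rest\.flamegraph\.enabled:' "$1" \
    | tail -n 1 \
    | grep -Eq ':[[:space:]]*true[[:space:]]*$'
}
```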