This topic answers frequently asked questions about Dataflow clusters.
Cluster operations and O&M
Job issues
Errors
- NoSuchFieldError / NoSuchMethodError / ClassNotFoundException (JAR conflict)
- UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
- Could not find a file system implementation for scheme 'oss'
- java.util.concurrent.TimeoutException: Heartbeat of TaskManager timed out
How do I submit Flink jobs from an external client? {#submit-from-external-client}
Make sure the external client can reach the Dataflow cluster network.
Set up a Hadoop YARN environment on the client. Copy the following directories from the Dataflow cluster to the client:
/opt/apps/YARN/yarn-current: the Hadoop YARN installation
/etc/taihao-apps/hadoop-conf/: the Hadoop configuration files
Important: Hadoop configuration files such as yarn-site.xml use fully qualified domain names (FQDNs) as service addresses, for example, master-1-1.c-xxxxxxxxxx.cn-hangzhou.emr.aliyuncs.com. Make sure these FQDNs can be resolved from the client, or replace them with IP addresses. See How do I resolve hostnames from an external client?
Then configure the environment variables on the client. Set HADOOP_CONF_DIR before running hadoop classpath so that the generated classpath includes the copied configuration directory:
export HADOOP_HOME=/path/to/yarn-current && \
export HADOOP_CONF_DIR=/path/to/hadoop-conf && \
export PATH=${HADOOP_HOME}/bin/:$PATH && \
export HADOOP_CLASSPATH=$(hadoop classpath)
Submit a Flink job. For example:
flink run -d -t yarn-per-job -ynm flink-test $FLINK_HOME/examples/streaming/TopSpeedWindowing.jar
After submission, view the job on the YARN web UI of the Dataflow cluster.
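Because the copied configuration refers to cluster nodes by FQDN, it can help to check that every such name resolves on the client before the first submission. The following bash sketch is one way to do that, assuming a Linux client that provides `getent` and Hadoop XML files that keep each `<value>` on a single line. `list_conf_hosts` and `check_conf_hosts` are hypothetical helper names, not EMR tools.

```bash
#!/usr/bin/env bash
# Hypothetical helpers (not part of EMR): find hostnames referenced as
# host:port in Hadoop XML configuration files and check that they resolve.

# Print each distinct hostname that appears as "host:port" (optionally with a
# scheme such as hdfs://) inside a single-line <value> element. Values are
# split on commas, and literal IPv4 addresses are skipped because they need
# no name lookup.
list_conf_hosts() {
  sed -n 's:.*<value>\([^<]*\)</value>.*:\1:p' "$@" \
    | tr ',' '\n' \
    | sed 's#^[A-Za-z][A-Za-z0-9+.-]*://##' \
    | sed -n 's/^\([A-Za-z0-9.-]*[A-Za-z][A-Za-z0-9.-]*\):[0-9][0-9]*$/\1/p' \
    | sort -u
}

# Report every hostname the client cannot resolve; return 1 if any fail.
check_conf_hosts() {
  local host status=0
  for host in $(list_conf_hosts "$@"); do
    if ! getent hosts "$host" > /dev/null; then
      echo "cannot resolve: $host" >&2
      status=1
    fi
  done
  return "$status"
}
```

For example, `check_conf_hosts /path/to/hadoop-conf/*.xml` prints each hostname that still needs a hosts entry or DNS record and returns a non-zero status if any are missing.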
How do I resolve hostnames from an external client? {#resolve-hostnames}
Use one of the following methods:
Edit `/etc/hosts`: Add mappings between the cluster hostnames and their IP addresses.
Use [Alibaba Cloud DNS PrivateZone](https://www.alibabacloud.com/help/en/document_detail/64611.html#topic-2036614): Configure private DNS resolution for the cluster's domain.
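Use a custom DNS service: Add the following JVM options for the Flink client to flink-conf.yaml, replacing xxx with the address of your DNS server and yyy with the domain to resolve. These sun.net.spi.nameservice properties are honored only by JDK 8 and earlier: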
env.java.opts.client: "-Dsun.net.spi.nameservice.nameservers=xxx -Dsun.net.spi.nameservice.provider.1=dns,sun -Dsun.net.spi.nameservice.domain=yyy"
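If you use the /etc/hosts method, each line maps a node's IP address to its hostnames. The following bash sketch assumes you already know each node's IP address and FQDN, and it also adds the short hostname (such as master-1-1) because some commands refer to nodes by that name. `hosts_entry` and `add_hosts_entry` are hypothetical helper names, and writing to /etc/hosts requires root privileges.

```bash
#!/usr/bin/env bash
# Hypothetical helpers: build and append /etc/hosts entries for cluster nodes.

# Print "<ip> <fqdn> <short-name>", where the short name is the FQDN up to
# the first dot (for example, master-1-1).
hosts_entry() {
  local ip=$1 fqdn=$2
  printf '%s %s %s\n' "$ip" "$fqdn" "${fqdn%%.*}"
}

# Append the entry to a hosts file (default /etc/hosts) unless the FQDN is
# already present as a whole word, so running it twice adds nothing new.
add_hosts_entry() {
  local ip=$1 fqdn=$2 file=${3:-/etc/hosts}
  if ! grep -qwF -- "$fqdn" "$file"; then
    hosts_entry "$ip" "$fqdn" >> "$file"
  fi
}
```

For example, `sudo bash -c '. ./hosts.sh; add_hosts_entry 192.168.0.10 <master-1-1-fqdn>'` adds one line per node, where hosts.sh is a file containing the functions above and the IP address is a placeholder.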
How do I view Flink job status? {#view-job-status}
Three options are available:
EMR console: E-MapReduce (EMR) supports Apache Knox, which provides access to the YARN and Flink web UIs over the internet. Go to the Apache Flink Dashboard from the YARN web UI. For details, see View the job status on the web UI of Flink (VVR).
SSH tunnel: For details, see Create an SSH tunnel to access web UIs of open source components.
YARN RESTful API:
Ports 8443 and 8088 must be open in your security group, or the client and the Dataflow cluster must be in the same virtual private cloud (VPC).
curl --compressed -v -H "Accept: application/json" -X GET \
  "http://master-1-1:8088/ws/v1/cluster/apps?states=RUNNING&queue=default&user.name=***"
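To script the REST call instead of reading its raw output, the following bash sketch builds the same query and extracts the application IDs from the JSON response. It assumes bash and curl on the client. The helper names are hypothetical, the user.name parameter from the command above is omitted (add it back if your cluster requires it), and grep stands in for a real JSON parser, so treat the result as a rough filter.

```bash
#!/usr/bin/env bash
# Hypothetical helpers around the YARN ResourceManager REST API
# (/ws/v1/cluster/apps).

# Build the query URL. Arguments: host:port, state (default RUNNING),
# queue (default "default").
yarn_apps_url() {
  local rm=$1 state=${2:-RUNNING} queue=${3:-default}
  printf 'http://%s/ws/v1/cluster/apps?states=%s&queue=%s\n' "$rm" "$state" "$queue"
}

# Read the JSON response on stdin and print one application ID per line.
app_ids() {
  grep -o '"id" *: *"application_[0-9]*_[0-9]*"' | cut -d'"' -f4
}

# Query the ResourceManager and print the IDs of matching applications.
list_apps() {
  curl --compressed -s -H "Accept: application/json" "$(yarn_apps_url "$@")" | app_ids
}
```

For example, `list_apps master-1-1:8088` prints the IDs of running applications in the default queue, which you can pass to the yarn logs commands in this topic.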
How do I view Flink job logs? {#view-job-logs}
Running job: View logs on the job's web UI.
Completed job: View statistics on Flink HistoryServer, or run:
yarn logs -applicationId application_xxxx_yyyy
By default, logs for completed jobs are stored in hdfs:///tmp/logs/$USERNAME/logs/ on the Hadoop Distributed File System (HDFS).
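The following bash sketch wraps the yarn logs command with a basic check of the application ID format, which catches mistakes such as passing a container ID or an unfilled placeholder. `is_app_id` and `save_app_logs` are hypothetical helpers, and the pattern assumes the usual application_<clusterTimestamp>_<sequence> form of YARN application IDs.

```bash
#!/usr/bin/env bash
# Hypothetical helpers for collecting the logs of a completed YARN application.

# Succeed only for IDs shaped like application_<clusterTimestamp>_<sequence>.
is_app_id() {
  [[ $1 =~ ^application_[0-9]+_[0-9]+$ ]]
}

# Save the aggregated logs of one application to ./<app-id>.log and print the
# file name. Requires the yarn CLI on PATH.
save_app_logs() {
  local app=$1
  is_app_id "$app" || { echo "not an application ID: $app" >&2; return 2; }
  yarn logs -applicationId "$app" > "$app.log" && echo "$app.log"
}
```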
How do I access Flink HistoryServer? {#access-historyserver}
Flink HistoryServer runs on port 18082 of the master-1-1 node (the first node in the master server group). It collects statistics on completed Flink jobs but does not store job logs.
To access it:
Open port 18082 in the security group rules for the master-1-1 node.
Navigate to http://<master-1-1-ip>:18082.
To view logs for completed jobs, call the YARN API or use the YARN web UI instead.
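Besides the web page, Flink HistoryServer serves its data as JSON over HTTP, including a /jobs/overview endpoint that lists archived jobs. The following bash sketch fetches that endpoint and prints each job ID with its final state. It assumes curl on the client and that each job object lists "jid" before "state", which appears to be the field order Flink uses; the helper names are hypothetical.

```bash
#!/usr/bin/env bash
# Hypothetical helpers for the Flink HistoryServer endpoint /jobs/overview.

# Read the /jobs/overview JSON on stdin and print "<jid> <state>" per job.
# Pairs consecutive "jid" and "state" values, so it relies on their order.
completed_jobs() {
  grep -oE '"(jid|state)" *: *"[^"]*"' \
    | cut -d'"' -f4 \
    | paste -d' ' - -
}

# Fetch the overview from HistoryServer on the given host (port 18082).
fetch_completed_jobs() {
  curl -s "http://$1:18082/jobs/overview" | completed_jobs
}
```

For example, `fetch_completed_jobs <master-1-1-ip>` lists finished, failed, and canceled jobs. For their logs, use the YARN options described in this section.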
How do I use commercial connectors? {#use-commercial-connectors}
This feature is available only in Dataflow clusters. Available connectors include Hologres, Log Service, MaxCompute, DataHub, Elasticsearch, and ClickHouse. The following example uses the Hologres connector.
Step 1: Install the connector on your local machine
The connector JAR packages are stored in /opt/apps/FLINK/flink-current/opt/connectors/ on the Dataflow cluster. Install the JAR to your local Maven repository:
mvn install:install-file \
-Dfile=/path/to/ververica-connector-hologres-1.13-vvr-4.0.7.jar \
-DgroupId=com.alibaba.ververica \
-DartifactId=ververica-connector-hologres \
-Dversion=1.13-vvr-4.0.7 \
-Dpackaging=jar
Add the dependency to your pom.xml with the scope set to provided:
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-hologres</artifactId>
<version>1.13-vvr-4.0.7</version>
<scope>provided</scope>
</dependency>
Step 2: Make the connector available at runtime
Choose one of the following methods:
Method 1 (HDFS): Copy the connector JAR to HDFS and reference it when submitting:
hdfs dfs -mkdir -p hdfs:///flink-current/opt/connectors/hologres/
hdfs dfs -put /opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar \
  hdfs:///flink-current/opt/connectors/hologres/ververica-connector-hologres-1.13-vvr-4.0.7.jar
Add to your submit command:
-D yarn.provided.lib.dirs=hdfs:///flink-current/opt/connectors/hologres/
Method 2 (local client): Copy the JAR to the same path on the submit client as on the cluster:
/opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar
Add to your submit command:
-C file:///opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar
Method 3 (bundle into the job JAR): Package the connector JAR directly into your job JAR. In this case, do not set the dependency scope to provided, because Maven leaves provided dependencies out of the packaged JAR.
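The following bash sketch combines the three methods into one submit command so that the placement of the extra option is explicit: Flink options such as -D and -C go before the job JAR, and anything after the JAR is passed to the job. `submit_with_connector`, `CONNECTOR_JAR`, and `FLINK_BIN` are hypothetical names; the paths are the ones used in this topic.

```bash
#!/usr/bin/env bash
# Hypothetical wrapper: submit a job with the Hologres connector made available
# by method 1 (hdfs), method 2 (local), or method 3 (bundle).
# FLINK_BIN can point at another command (for example, echo) for a dry run.
CONNECTOR_JAR=ververica-connector-hologres-1.13-vvr-4.0.7.jar

submit_with_connector() {
  local method=$1 job_jar=$2
  shift 2
  local -a opts=(run -d -t yarn-per-job)
  case $method in
    hdfs)   opts+=(-D yarn.provided.lib.dirs=hdfs:///flink-current/opt/connectors/hologres/) ;;
    local)  opts+=(-C "file:///opt/apps/FLINK/flink-current/opt/connectors/$CONNECTOR_JAR") ;;
    bundle) ;;  # connector classes are already inside the job JAR
    *) echo "unknown method: $method" >&2; return 1 ;;
  esac
  # Flink options first, then the job JAR, then the job's own arguments.
  "${FLINK_BIN:-flink}" "${opts[@]}" "$job_jar" "$@"
}
```

For a dry run, `FLINK_BIN=echo submit_with_connector local my-job.jar --input x` prints the command instead of running it (my-job.jar and --input x are placeholders).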
How do I use GeminiStateBackend? {#use-gemini-state-backend}
GeminiStateBackend is available only in Dataflow clusters. It is an enterprise-edition state backend with 3–5x the performance of open source state backends, and it is enabled by default in Dataflow cluster configuration files.
For advanced configuration options, see Configurations of GeminiStateBackend.
How do I switch to an open source state backend? {#use-open-source-state-backend}
Dataflow clusters use GeminiStateBackend by default. To switch to an open source state backend such as RocksDB for a specific job, pass the -D parameter when submitting:
flink run-application -t yarn-application \
-D state.backend=rocksdb \
/opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar
To apply the change to all subsequent jobs, update the state.backend parameter in the EMR console: go to the Configure tab of the Flink service page, click flink-conf.yaml, set the value, click Save, and then click Deploy Client Configuration.
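To check which state backend new jobs use by default after you deploy the client configuration, you can read the value back from flink-conf.yaml on the submit client. The following bash sketch, a hypothetical `flink_conf_get` helper, assumes the file uses simple `key: value` lines; it strips `#` comments and lets later lines override earlier ones, mirroring how Flink reads this file. The default location (`$FLINK_CONF_DIR`, else `$FLINK_HOME/conf`) is an assumption about your client layout.

```bash
#!/usr/bin/env bash
# Hypothetical helper: print the value of a key from flink-conf.yaml.
# Usage: flink_conf_get <key> [path/to/flink-conf.yaml]
flink_conf_get() {
  local key=$1 file=${2:-${FLINK_CONF_DIR:-$FLINK_HOME/conf}/flink-conf.yaml}
  awk -v k="$key" '
    {
      sub(/#.*/, "")                        # drop comments, as Flink does
      i = index($0, ":")
      if (i == 0) next
      name = substr($0, 1, i - 1)
      val = substr($0, i + 1)
      gsub(/^[ \t]+|[ \t]+$/, "", name)
      gsub(/^[ \t]+|[ \t]+$/, "", val)
      if (name == k) last = val             # later lines override earlier ones
    }
    END { if (last != "") print last }
  ' "$file"
}
```

For example, `flink_conf_get state.backend` prints the default state backend, and `flink_conf_get high-availability` shows whether HA is configured.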

How do I enable JobManager high availability? {#enable-jobmanager-ha}
In a Dataflow cluster, Flink runs on YARN. Enable high availability (HA) for JobManager by configuring ZooKeeper-based HA as described in the Apache Flink HA configuration.
Add the following to flink-conf.yaml:
high-availability: zookeeper
high-availability.zookeeper.quorum: 192.168.**.**:2181,192.168.**.**:2181,192.168.**.**:2181
high-availability.zookeeper.path.root: /flink
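high-availability.storageDir: hdfs:///flink/recovery
By default, after HA is enabled, JobManager can be restarted only once. To allow more restarts, increase the yarn.resourcemanager.am.max-attempts parameter for YARN (in yarn-site.xml) and the yarn.application-attempts parameter for Flink (in flink-conf.yaml). The Flink value is capped by the YARN value.
To prevent JobManager from restarting too frequently, increase yarn.application-attempt-failures-validity-interval from its default of 10000 ms (10 seconds) to 300000 ms (5 minutes). Only attempt failures within this interval count toward the attempt limit, so a longer interval makes repeated failures end the application instead of restarting it indefinitely.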
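The attempt settings above can be written to flink-conf.yaml by hand or with a small script. The following bash sketch, a hypothetical `flink_conf_set` helper, replaces every existing uncommented `key: value` line for the key with a single new line, or appends one if the key is absent. It does not touch yarn.resourcemanager.am.max-attempts, which belongs in yarn-site.xml. If you manage flink-conf.yaml in the EMR console, edits made directly on a node may be overwritten the next time the client configuration is deployed.

```bash
#!/usr/bin/env bash
# Hypothetical helper: set <key>: <value> in a flink-conf.yaml file.
# Usage: flink_conf_set <key> <value> <path/to/flink-conf.yaml>
flink_conf_set() {
  local key=$1 value=$2 file=$3 tmp rc
  tmp=$(mktemp) || return 1
  awk -v k="$key" -v v="$value" '
    {
      i = index($0, ":")
      name = (i > 0) ? substr($0, 1, i - 1) : ""
      gsub(/^[ \t]+|[ \t]+$/, "", name)     # commented lines keep their "#"
      if (name == k) {                      # replace the first match, drop repeats
        if (!done) print k ": " v
        done = 1
        next
      }
      print
    }
    END { if (!done) print k ": " v }
  ' "$file" > "$tmp" && cat "$tmp" > "$file"   # cat keeps the file's permissions
  rc=$?
  rm -f "$tmp"
  return "$rc"
}
```

For example, run `flink_conf_set yarn.application-attempts 4 /path/to/flink-conf.yaml` and then `flink_conf_set yarn.application-attempt-failures-validity-interval 300000 /path/to/flink-conf.yaml`. The value 4 is only an illustration.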
How do I view Flink job metrics? {#view-job-metrics}
Log on to the EMR console, navigate to the Monitoring tab of your cluster, and click Metric Monitoring.
Select FLINK from the Dashboard drop-down list.
Select the application ID and job ID to view metrics for that job.
Application IDs and job IDs appear only when Flink jobs are actively running.
Some metrics, such as sourceIdleTime, require both a source and a sink to be configured before they report data.
How do I troubleshoot upstream and downstream storage issues? {#troubleshoot-storage}
Where are client logs stored? {#client-logs}
The FLINK_LOG_DIR environment variable specifies the client log directory. The default is /var/log/taihao-apps/flink. In EMR versions earlier than V3.43.0, the default is /mnt/disk1/log/flink.
Why don't my flink run parameters take effect? {#parameters-not-taking-effect}
Job parameters must be placed after the JAR file path in the command. For example:
flink run -d -t yarn-per-job test.jar arg1 arg2
Parameters placed before the JAR are treated as Flink options, not job arguments.
Where are cluster logs stored? {#cluster-logs}
How you access logs depends on whether JobManager is running:
JobManager has stopped: Pull logs with:
yarn logs -applicationId application_xxxx_yy
Or open the log link for the completed job on the YARN web UI.
JobManager is running:
View logs on the Flink job web UI, or retrieve them with yarn logs:
Retrieve JobManager logs:
yarn logs -applicationId application_xxxx_yy -am ALL -logFiles jobmanager.log
Retrieve TaskManager logs:
yarn logs -applicationId application_xxxx_yy -containerId container_xxxx_yy_aa_bb -logFiles taskmanager.log
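When a job has many TaskManagers, fetching taskmanager.log container by container is tedious. The following bash sketch collects the container IDs from the application's log output and then fetches taskmanager.log for each one. `container_ids` and `fetch_taskmanager_logs` are hypothetical helpers. They assume the yarn CLI is on PATH and that the output of `yarn logs -applicationId` names each container; the first call downloads all logs once, which can be large.

```bash
#!/usr/bin/env bash
# Hypothetical helpers for fetching every TaskManager log of an application.

# Print each distinct YARN container ID found on stdin, with or without the
# epoch prefix (container_e<epoch>_...).
container_ids() {
  grep -oE 'container_(e[0-9]+_)?[0-9]+_[0-9]+_[0-9]+_[0-9]+' | sort -u
}

# Save taskmanager.log of every container to ./<container-id>.taskmanager.log.
# Containers without that file (such as the JobManager container) yield
# empty or near-empty files.
fetch_taskmanager_logs() {
  local app=$1 c
  for c in $(yarn logs -applicationId "$app" | container_ids); do
    yarn logs -applicationId "$app" -containerId "$c" -logFiles taskmanager.log \
      > "$c.taskmanager.log"
  done
}
```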
NoSuchFieldError / NoSuchMethodError / ClassNotFoundException (JAR conflict) {#jar-conflict}
These errors indicate a JAR conflict between the job's dependencies and the Flink installation on the cluster. To resolve:
Identify the conflicting class: Check the error logs for the class name, then run the following command in the directory that contains your pom.xml to inspect the dependency tree:
mvn dependency:tree
Resolve the conflict using one of these approaches:
Change the scope of the conflicting dependency to provided in pom.xml.
Exclude the conflicting transitive dependency by adding an <exclusion> to the dependency that pulls it in.
Use the Maven Shade Plugin to relocate (shade) the conflicting classes.
Confirm which JAR a class is loaded from: Add the following JVM parameter to flink-conf.yaml:
# flink-conf.yaml
env.java.opts: -verbose:class
Or pass it dynamically:
-Denv.java.opts="-verbose:class"
Class loading information is recorded in jobmanager.out or taskmanager.out.
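With -verbose:class enabled, the .out files can be long. The following bash sketch, a hypothetical `class_source` helper, prints where a given class was loaded from. It assumes the JDK 8 output format `[Loaded <class> from <source>]` or the JDK 9 and later format `[...][class,load] <class> source: <source>`; lines in other formats are ignored.

```bash
#!/usr/bin/env bash
# Hypothetical helper: print the load source (usually a JAR URL) of a class
# from -verbose:class output. Usage: class_source <class> [file]
# Reads stdin when no file is given.
class_source() {
  local cls=$1
  awk -v c="$cls" '
    # JDK 8: [Loaded <class> from <source>]
    $1 == "[Loaded" && $2 == c && $3 == "from" { sub(/\]$/, "", $4); print $4 }
    # JDK 9+: [<uptime>][info][class,load] <class> source: <source>
    /\[class,load\]/ {
      for (i = 1; i < NF; i++)
        if ($i == c && $(i + 1) == "source:") print $(i + 2)
    }
  ' "${2:--}"
}
```

For example, `class_source com.example.MyClass taskmanager.out` (the class name is a placeholder) shows which JAR supplied that class.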
Multiple factories for identifier found in the classpath {#multiple-factories-error}
This error means multiple implementations of the same connector are on the classpath — typically because a connector dependency is declared in the job's JAR and the connector JAR is also manually placed in $FLINK_HOME/lib.
Remove the duplicate. See NoSuchFieldError / NoSuchMethodError / ClassNotFoundException (JAR conflict) for how to identify and resolve classpath conflicts.
UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS {#oss-error-1}
Dataflow clusters use the built-in JindoSDK for password-free access to Object Storage Service (OSS), which already supports APIs such as StreamingFileSink. Adding the community OSS plugin on top of JindoSDK causes a dependency conflict.
Check whether the oss-fs-hadoop directory exists in $FLINK_HOME/plugins. If it does, delete it and resubmit the job.
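The check-and-delete step can be scripted as follows. This bash sketch, a hypothetical `remove_oss_plugin` helper, assumes the standard plugins directory under $FLINK_HOME (or a Flink home directory passed as the first argument) and deletes the oss-fs-hadoop directory outright. Move the directory elsewhere instead if you may need it again.

```bash
#!/usr/bin/env bash
# Hypothetical helper: remove the community OSS plugin so that only the
# built-in JindoSDK handles oss:// paths. Usage: remove_oss_plugin [flink-home]
remove_oss_plugin() {
  local dir=${1:-${FLINK_HOME:?FLINK_HOME is not set}}/plugins/oss-fs-hadoop
  if [ -d "$dir" ]; then
    rm -rf -- "$dir" && echo "removed $dir"
  else
    echo "not present: $dir"
  fi
}
```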
Could not find a file system implementation for scheme 'oss' {#section_1b6bc757}
This error affects clusters running EMR V3.40 or earlier, where Jindo-related JAR packages may be missing on nodes other than master-1-1.
EMR V3.40.0 and earlier
Check whether jindo-flink-4.0.0-full.jar (or similar) exists in $FLINK_HOME/lib on the node you use to submit jobs. If it is missing, copy it:
cp /opt/apps/extra-jars/flink/jindo-flink-*-full.jar $FLINK_HOME/lib
Then resubmit the job.
EMR versions later than V3.40.0
| Deployment mode | Action required |
|---|---|
| Flink on YARN | None. OSS access is handled automatically even without Jindo JARs in $FLINK_HOME/lib. |
| Other modes | Check for Jindo JARs in $FLINK_HOME/lib. If missing, run the copy command above and resubmit. |
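The check and copy can be combined into one step that is safe to run repeatedly. The following bash sketch, a hypothetical `ensure_jindo_jar` helper, uses the source directory and file pattern from the copy command above by default and accepts other directories as arguments, which is mainly useful for testing.

```bash
#!/usr/bin/env bash
# Hypothetical helper: copy the Jindo Flink JAR into the Flink lib directory
# unless a jindo-flink-*-full.jar is already there.
# Usage: ensure_jindo_jar [lib-dir] [source-dir]
ensure_jindo_jar() {
  local lib=${1:-$FLINK_HOME/lib} src=${2:-/opt/apps/extra-jars/flink}
  local jar
  for jar in "$lib"/jindo-flink-*-full.jar; do
    if [ -e "$jar" ]; then          # the glob matched an existing file
      echo "present: $jar"
      return 0
    fi
  done
  cp "$src"/jindo-flink-*-full.jar "$lib"/ && echo "copied into $lib"
}
```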
java.util.concurrent.TimeoutException: Heartbeat of TaskManager timed out {#taskmanager-heartbeat-timeout}
The direct cause is a TaskManager heartbeat timeout. Check the TaskManager logs for the underlying error — a common root cause is an out of memory (OOM) error due to insufficient heap memory or a memory leak in the job code.
If OOM is the cause, increase the TaskManager memory allocation or analyze the job's memory usage. See java.lang.OutOfMemoryError: GC overhead limit exceeded for steps to generate and analyze a heap dump.
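When a TaskManager log is long, a count of OutOfMemoryError messages by type can show which memory area ran out, for example Java heap space versus GC overhead limit exceeded. The following bash sketch, a hypothetical `oom_summary` helper, reads log files (or stdin) and prints each error message with its count, separated by a tab. It only matches the error text when it appears on a single line.

```bash
#!/usr/bin/env bash
# Hypothetical helper: count OutOfMemoryError messages by type in log files.
# Usage: oom_summary taskmanager.log [more logs...]
oom_summary() {
  grep -ho 'java\.lang\.OutOfMemoryError: [A-Za-z ]*[A-Za-z]' "$@" \
    | sed 's/^java\.lang\.OutOfMemoryError: //' \
    | sort | uniq -c \
    | awk '{ n = $1; $1 = ""; sub(/^ /, ""); print n "\t" $0 }'
}
```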
java.lang.OutOfMemoryError: GC overhead limit exceeded {#gc-overhead-limit}
The JVM is spending almost all of its time on garbage collection (GC) while reclaiming very little memory, which usually means the job does not have enough heap memory. The most common causes are memory leaks in user-defined functions (UDFs) or insufficient memory for the workload.
To diagnose, generate a heap dump when the error occurs. Pass the JVM parameter when resubmitting:
-D env.java.opts="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof"
Or add it to flink-conf.yaml:
env.java.opts: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof
After reproducing the error, find the heap dump at the path specified by HeapDumpPath on the node where the failing JobManager or TaskManager ran, and analyze it with Memory Analyzer Tool (MAT) or Java VisualVM.
java.lang.NoSuchFieldError: DEPLOYMENT_MODE {#deployment-mode-error}
The job JAR includes a flink-core dependency whose version is incompatible with the Flink version on the cluster.
Add flink-core to pom.xml with scope set to provided:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<!-- Replace with the Flink version used by your cluster -->
<version>1.16.1</version>
<scope>provided</scope>
</dependency>
For background on how incompatible dependencies get introduced, see NoSuchFieldError / NoSuchMethodError / ClassNotFoundException (JAR conflict).
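To find the version to put in pom.xml, you can ask the Flink client on the cluster. The following bash sketch, a hypothetical `flink_version` helper, parses the output of `flink --version`, assuming it has the form `Version: <version>, Commit ID: <id>`. On clusters that ship a vendor build, the printed version may carry a suffix that is not published as an open source artifact; in that case, consider the corresponding open source Flink version.

```bash
#!/usr/bin/env bash
# Hypothetical helper: print the Flink version reported by the client.
# FLINK_BIN can point at another command for testing.
flink_version() {
  local out
  out=$("${FLINK_BIN:-flink}" --version 2>/dev/null) || return 1
  # Expected line: "Version: <version>, Commit ID: <id>"
  printf '%s\n' "$out" | sed -n 's/^Version: \([^,]*\),.*/\1/p'
}
```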
Why is only one operator shown and Records Received = 0? {#single-operator-zero-records}
This is expected behavior. The Records Received metric counts data exchanged between different operators. When Flink optimizes a job into a single operator (chaining all operators together), there is no inter-operator data exchange, so the value is always 0.
How do I enable a flame graph? {#enable-flame-graph}
Flame graphs visualize CPU usage per method, which helps identify performance bottlenecks. The feature is available since Flink 1.13 but is disabled by default to avoid impacting production jobs.
To enable it, go to the Configure tab on the Flink service page in the EMR console, click flink-conf.yaml, and add:
rest.flamegraph.enabled: true
For details on adding configuration items, see Manage configuration items. For more about flame graphs, see Apache Flink flame graphs.