Topik ini menjawab pertanyaan umum mengenai kluster Dataflow.
Operasi dan O&M kluster
Job Issues
Kesalahan
NoSuchFieldError / NoSuchMethodError / ClassNotFoundException (konflik JAR)
UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS
Could not find a file system implementation for scheme 'oss'
java.util.concurrent.TimeoutException: Heartbeat of TaskManager timed out
Mengapa hanya satu operator yang ditampilkan dan Records Received = 0?
Bagaimana cara mengirimkan job Flink dari client eksternal? {#submit-from-external-client}
Pastikan client eksternal dapat mengakses jaringan kluster Dataflow.
Siapkan lingkungan Hadoop YARN pada client. Salin direktori berikut dari kluster Dataflow ke client, lalu konfigurasikan variabel lingkungan pada client:
/opt/apps/YARN/yarn-current— instalasi Hadoop YARN/etc/taihao-apps/hadoop-conf/— file konfigurasi Hadoop
PentingFile konfigurasi Hadoop seperti
yarn-site.xmlmenggunakan fully qualified domain names (FQDN) sebagai alamat layanan — contohnya,master-1-1.c-xxxxxxxxxx.cn-hangzhou.emr.aliyuncs.com. Pastikan FQDN tersebut dapat diselesaikan dari client, atau ganti dengan alamat IP. Lihat Bagaimana cara menyelesaikan hostname dari client eksternal?.export HADOOP_HOME=/path/to/yarn-current && \ export PATH=${HADOOP_HOME}/bin/:$PATH && \ export HADOOP_CLASSPATH=$(hadoop classpath) && \ export HADOOP_CONF_DIR=/path/to/hadoop-confKirimkan job Flink. Contohnya:
flink run -d -t yarn-per-job -ynm flink-test $FLINK_HOME/examples/streaming/TopSpeedWindowing.jarSetelah pengiriman, lihat job tersebut di UI web YARN kluster Dataflow.
Bagaimana cara menyelesaikan hostname dari client eksternal? {#resolve-hostnames}
Gunakan salah satu metode berikut:
Edit `/etc/hosts`: Tambahkan pemetaan antara hostname kluster dan alamat IP-nya.
Gunakan [Alibaba Cloud DNS PrivateZone](https://www.alibabacloud.com/help/en/document_detail/64611.html#topic-2036614): Konfigurasikan resolusi DNS privat untuk domain kluster.
Gunakan layanan DNS kustom: Tambahkan parameter JVM berikut ke konfigurasi Flink Anda:
env.java.opts.client: "-Dsun.net.spi.nameservice.nameservers=xxx -Dsun.net.spi.nameservice.provider.1=dns,sun -Dsun.net.spi.nameservice.domain=yyy"
Bagaimana cara melihat status job Flink? {#view-job-status}
Tiga opsi tersedia:
Konsol EMR: E-MapReduce (EMR) mendukung Apache Knox, yang menyediakan akses ke UI web YARN dan Flink melalui internet. Buka Dashboard Apache Flink dari UI web YARN. Untuk detailnya, lihat Lihat status job di UI web Flink (VVR).
SSH tunnel: Untuk detailnya, lihat Buat SSH tunnel untuk mengakses UI web komponen open source.
YARN RESTful API:
Port 8443 dan 8088 harus dibuka di security group Anda, atau client dan kluster Dataflow harus berada dalam virtual private cloud (VPC) yang sama.
curl --compressed -v -H "Accept: application/json" -X GET \ "http://master-1-1:8088/ws/v1/cluster/apps?states=RUNNING&queue=default&user.name=***"
Bagaimana cara melihat log job Flink? {#view-job-logs}
Job sedang berjalan: Lihat log di UI web job tersebut.
Job selesai: Lihat statistik di Flink HistoryServer, atau jalankan:
yarn logs -applicationId application_xxxx_yyyySecara default, log untuk job yang telah selesai disimpan di
hdfs:///tmp/logs/$USERNAME/logs/pada Hadoop Distributed File System (HDFS).
Bagaimana cara mengakses Flink HistoryServer? {#access-historyserver}
Flink HistoryServer berjalan di port 18082 pada node master-1-1 (node pertama dalam grup server master). Layanan ini mengumpulkan statistik job Flink yang telah selesai tetapi tidak menyimpan log job.
Untuk mengaksesnya:
Buka port 18082 di aturan security group untuk node
master-1-1.Buka
http://<master-1-1-ip>:18082.
Untuk melihat log job yang telah selesai, gunakan API YARN atau UI web YARN sebagai gantinya.
Bagaimana cara menggunakan connector komersial? {#use-commercial-connectors}
Fitur ini hanya tersedia di kluster Dataflow. Connector yang tersedia mencakup Hologres, Log Service, MaxCompute, DataHub, Elasticsearch, dan ClickHouse. Contoh berikut menggunakan connector Hologres.
Langkah 1: Pasang konektor pada mesin lokal Anda
Paket JAR connector disimpan di /opt/apps/FLINK/flink-current/opt/connectors/ pada kluster Dataflow. Instal JAR tersebut ke repositori Maven lokal Anda:
mvn install:install-file \
-Dfile=/path/to/ververica-connector-hologres-1.13-vvr-4.0.7.jar \
-DgroupId=com.alibaba.ververica \
-DartifactId=ververica-connector-hologres \
-Dversion=1.13-vvr-4.0.7 \
-Dpackaging=jarTambahkan dependensi ke pom.xml Anda dengan scope diatur ke provided:
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>ververica-connector-hologres</artifactId>
<version>1.13-vvr-4.0.7</version>
<scope>provided</scope>
</dependency>Langkah 2: Jadikan connector tersedia saat runtime
Pilih salah satu metode berikut:
Metode 1 — HDFS: Salin JAR connector ke HDFS dan rujuk saat mengirimkan job:
hdfs mkdir hdfs:///flink-current/opt/connectors/hologres/ hdfs cp hdfs:///flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar \ hdfs:///flink-current/opt/connectors/hologres/ververica-connector-hologres-1.13-vvr-4.0.7.jarTambahkan ke perintah submit Anda:
-D yarn.provided.lib.dirs=hdfs:///flink-current/opt/connectors/hologres/Metode 2 — Client lokal: Salin JAR ke path yang sama pada client submit seperti yang ada di kluster:
/opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jarTambahkan ke perintah submit Anda:
-C file:///opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jarMetode 3 — Bundle ke dalam JAR job: Masukkan langsung JAR connector ke dalam JAR job Anda.
Bagaimana cara menggunakan GeminiStateBackend? {#use-gemini-state-backend}
GeminiStateBackend hanya tersedia di kluster Dataflow. Ini adalah state backend edisi enterprise dengan performa 3–5x lebih tinggi daripada state backend open source, dan diaktifkan secara default dalam file konfigurasi kluster Dataflow.
Untuk opsi konfigurasi lanjutan, lihat Konfigurasi GeminiStateBackend.
Bagaimana cara beralih ke state backend open source? {#use-open-source-state-backend}
Kluster Dataflow menggunakan GeminiStateBackend secara default. Untuk beralih ke state backend open source seperti RocksDB untuk job tertentu, berikan parameter -D saat mengirimkan job:
flink run-application -t yarn-application \
-D state.backend=rocksdb \
/opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jarUntuk menerapkan perubahan ke semua job berikutnya, perbarui parameter state.backend di konsol EMR: buka tab Configure di halaman layanan Flink, klik flink-conf.yaml, atur nilainya, klik Save, lalu klik Deploy Client Configuration.

Bagaimana cara mengaktifkan high availability JobManager? {#enable-jobmanager-ha}
Di kluster Dataflow, Flink berjalan di atas YARN. Aktifkan high availability (HA) untuk JobManager dengan mengonfigurasi HA berbasis ZooKeeper seperti yang dijelaskan dalam Konfigurasi HA Apache Flink.
Tambahkan konfigurasi berikut ke flink-conf.yaml:
high-availability: zookeeper
high-availability.zookeeper.quorum: 192.168.**.**:2181,192.168.**.**:2181,192.168.**.**:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/recoverySecara default, setelah HA diaktifkan, JobManager hanya dapat direstart sekali per kegagalan. Untuk mengizinkan lebih banyak restart, konfigurasikan parameter yarn.resourcemanager.am.max-attempts untuk YARN dan parameter yarn.application-attempts untuk Flink.
Untuk mencegah JobManager restart terlalu sering, naikkan nilai yarn.application-attempt-failures-validity-interval dari nilai default 10000 (10 detik) menjadi 300000 (5 menit).
Bagaimana cara melihat metrik job Flink? {#view-job-metrics}
Login ke konsol EMR, buka tab Monitoring kluster Anda, lalu klik Metric Monitoring.
Pilih FLINK dari daftar drop-down Dashboard.
Pilih application ID dan job ID untuk melihat metrik job tersebut.
Application ID dan job ID hanya muncul saat job Flink sedang aktif berjalan.
Beberapa metrik, seperti sourceIdleTime, memerlukan konfigurasi baik source maupun sink sebelum melaporkan data.Bagaimana cara memecahkan masalah penyimpanan hulu dan hilir? {#troubleshoot-storage}
Lihat Penyimpanan hulu dan hilir.
Di mana log client disimpan? {#client-logs}
Variabel lingkungan FLINK_LOG_DIR menentukan direktori log client. Nilai default-nya adalah /var/log/taihao-apps/flink. Pada versi EMR sebelum V3.43.0, nilai default-nya adalah /mnt/disk1/log/flink.
Mengapa parameter run flink saya tidak berlaku? {#parameters-not-taking-effect}
Parameter job harus ditempatkan setelah path file JAR dalam perintah. Contohnya:
flink run -d -t yarn-per-job test.jar arg1 arg2Parameter yang ditempatkan sebelum file JAR dianggap sebagai opsi framework Flink, bukan argumen job.
Di mana log kluster disimpan? {#cluster-logs}
Cara mengakses log bergantung pada apakah JobManager sedang berjalan:
JobManager telah berhenti: Tarik log dengan:
yarn logs -applicationId application_xxxx_yyAtau akses tautan log untuk job yang telah selesai di UI web YARN.
JobManager sedang berjalan:
Lihat log di UI web job Flink, atau
Ambil log JobManager: ``
bash yarn logs -applicationId application_xxxx_yy -am ALL -logFiles jobmanager.log``Ambil log TaskManager: ``
bash yarn logs -applicationId application_xxxx_yy -containerId container_xxxx_yy_aa_bb -logFiles taskmanager.log``
NoSuchFieldError / NoSuchMethodError / ClassNotFoundException (konflik JAR) {#jar-conflict}
Error ini menunjukkan adanya konflik JAR antara dependensi job dan instalasi Flink di kluster. Untuk menyelesaikannya:
Identifikasi kelas yang bentrok: Periksa log error untuk nama kelasnya, lalu jalankan perintah berikut di direktori yang berisi
pom.xmlAnda untuk memeriksa pohon dependensi:mvn dependency:treeSelesaikan konflik menggunakan salah satu pendekatan berikut:
Ubah
scopedependensi yang bentrok menjadiprovideddipom.xml.Keluarkan kelas tertentu dari dependensi tersebut.
Gunakan Maven Shade Plugin untuk memindahkan (shade) kelas tersebut.
Konfirmasi dari JAR mana kelas dimuat: Tambahkan parameter JVM berikut ke
flink-conf.yamlatau berikan secara dinamis:# flink-conf.yaml env.java.opts: -verbose:classAtau berikan secara dinamis:
-Denv.java.opts="-verbose:class"Informasi pemuatan kelas dicatat di
jobmanager.outatautaskmanager.out.
Multiple factories for identifier found in the classpath {#multiple-factories-error}
Error ini berarti terdapat beberapa implementasi connector yang sama di classpath — biasanya karena dependensi connector dideklarasikan di JAR job dan JAR connector juga ditempatkan secara manual di $FLINK_HOME/lib.
Hapus duplikat tersebut. Lihat NoSuchFieldError / NoSuchMethodError / ClassNotFoundException (konflik JAR) untuk cara mengidentifikasi dan menyelesaikan konflik classpath.
UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS {#oss-error-1}
Kluster Dataflow menggunakan JindoSDK bawaan untuk akses tanpa password ke Object Storage Service (OSS), yang sudah mendukung API seperti StreamingFileSink. Menambahkan plugin OSS komunitas di atas JindoSDK menyebabkan konflik dependensi.
Periksa apakah direktori oss-fs-hadoop ada di $FLINK_HOME/plugins. Jika ada, hapus dan kirim ulang job tersebut.
Could not find a file system implementation for scheme 'oss' {#section_1b6bc757}
Error ini memengaruhi kluster yang menjalankan EMR V3.40 atau lebih lama, di mana paket JAR terkait Jindo mungkin tidak tersedia di node selain master-1-1.
EMR V3.40.0 dan lebih lama
Periksa apakah jindo-flink-4.0.0-full.jar (atau serupa) ada di $FLINK_HOME/lib pada node yang Anda gunakan untuk mengirimkan job. Jika tidak ada, salin file tersebut:
cp /opt/apps/extra-jars/flink/jindo-flink-*-full.jar $FLINK_HOME/libLalu kirim ulang job tersebut.
Versi EMR setelah V3.40.0
| Mode deployment | Tindakan yang diperlukan |
|---|---|
| Flink on YARN | Tidak diperlukan tindakan. Akses OSS ditangani secara otomatis bahkan tanpa JAR Jindo di $FLINK_HOME/lib. |
| Mode lainnya | Periksa keberadaan JAR Jindo di $FLINK_HOME/lib. Jika tidak ada, jalankan perintah salin di atas dan kirim ulang job. |
java.util.concurrent.TimeoutException: Heartbeat of TaskManager timed out {#taskmanager-heartbeat-timeout}
Penyebab langsungnya adalah timeout heartbeat TaskManager. Periksa log TaskManager untuk menemukan error dasarnya — penyebab umumnya adalah error out of memory (OOM) akibat memori heap tidak mencukupi atau memory leak pada kode job.
Jika OOM adalah penyebabnya, tambah alokasi memori TaskManager atau analisis penggunaan memori job tersebut. Lihat java.lang.OutOfMemoryError: GC overhead limit exceeded untuk langkah-langkah menghasilkan dan menganalisis heap dump.
java.lang.OutOfMemoryError: GC overhead limit exceeded {#gc-overhead-limit}
Garbage collection (GC) mengalami timeout karena job memiliki memori yang tidak mencukupi. Penyebab paling umum adalah memory leak pada user-defined function (UDF) atau memori tidak mencukupi untuk workload tersebut.
Untuk mendiagnosis, hasilkan heap dump saat error terjadi. Berikan parameter JVM saat mengirim ulang job:
-D env.java.opts="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof"Atau tambahkan ke flink-conf.yaml:
env.java.opts: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprofSetelah mereproduksi error, analisis heap dump di path yang ditentukan oleh HeapDumpPath menggunakan Memory Analyzer Tool (MAT) atau Java VisualVM.
java.lang.NoSuchFieldError: DEPLOYMENT_MODE {#deployment-mode-error}
JAR job menyertakan dependensi flink-core yang versinya tidak kompatibel dengan versi Flink di kluster.
Tambahkan flink-core ke pom.xml dengan scope diatur ke provided:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<!-- Ganti dengan versi Flink yang digunakan oleh kluster Anda -->
<version>1.16.1</version>
<scope>provided</scope>
</dependency>Untuk latar belakang bagaimana dependensi yang tidak kompatibel masuk, lihat NoSuchFieldError / NoSuchMethodError / ClassNotFoundException (konflik JAR).
Mengapa hanya satu operator yang ditampilkan dan Records Received = 0? {#single-operator-zero-records}
Ini adalah perilaku yang diharapkan. Metrik Records Received menghitung data yang dipertukarkan antar operator berbeda. Saat Flink mengoptimalkan job menjadi satu operator tunggal (menggabungkan semua operator), tidak ada pertukaran data antar operator, sehingga nilainya selalu 0.
Bagaimana cara mengaktifkan flame graph? {#enable-flame-graph}
Flame graph memvisualisasikan penggunaan CPU per metode, yang membantu mengidentifikasi bottleneck performa. Fitur ini tersedia sejak Flink 1.13 tetapi dinonaktifkan secara default untuk menghindari dampak pada job produksi.
Untuk mengaktifkannya, buka tab Configure di halaman layanan Flink di konsol EMR, klik flink-conf.yaml, lalu tambahkan:
rest.flamegraph.enabled: trueUntuk detail cara menambahkan item konfigurasi, lihat Kelola item konfigurasi. Untuk informasi lebih lanjut tentang flame graph, lihat Flame graph Apache Flink.