All Products
Search
Document Center

E-MapReduce:FAQ kluster Dataflow

Last Updated:Mar 26, 2026

Topik ini menjawab pertanyaan umum mengenai kluster Dataflow.

Operasi dan O&M kluster

Job Issues

Kesalahan

Bagaimana cara mengirimkan job Flink dari client eksternal? {#submit-from-external-client}

  1. Pastikan client eksternal dapat mengakses jaringan kluster Dataflow.

  2. Siapkan lingkungan Hadoop YARN pada client. Salin direktori berikut dari kluster Dataflow ke client, lalu konfigurasikan variabel lingkungan pada client:

    • /opt/apps/YARN/yarn-current — instalasi Hadoop YARN

    • /etc/taihao-apps/hadoop-conf/ — file konfigurasi Hadoop

    Penting

    File konfigurasi Hadoop seperti yarn-site.xml menggunakan fully qualified domain names (FQDN) sebagai alamat layanan — contohnya, master-1-1.c-xxxxxxxxxx.cn-hangzhou.emr.aliyuncs.com. Pastikan FQDN tersebut dapat diselesaikan dari client, atau ganti dengan alamat IP. Lihat Bagaimana cara menyelesaikan hostname dari client eksternal?.

    export HADOOP_HOME=/path/to/yarn-current && \
    export PATH=${HADOOP_HOME}/bin/:$PATH && \
    export HADOOP_CLASSPATH=$(hadoop classpath) && \
    export HADOOP_CONF_DIR=/path/to/hadoop-conf
  3. Kirimkan job Flink. Contohnya:

    flink run -d -t yarn-per-job -ynm flink-test $FLINK_HOME/examples/streaming/TopSpeedWindowing.jar

    Setelah pengiriman, lihat job tersebut di UI web YARN kluster Dataflow.

Bagaimana cara menyelesaikan hostname dari client eksternal? {#resolve-hostnames}

Gunakan salah satu metode berikut:

  • Edit `/etc/hosts`: Tambahkan pemetaan antara hostname kluster dan alamat IP-nya.

  • Gunakan [Alibaba Cloud DNS PrivateZone](https://www.alibabacloud.com/help/en/document_detail/64611.html#topic-2036614): Konfigurasikan resolusi DNS privat untuk domain kluster.

  • Gunakan layanan DNS kustom: Tambahkan parameter JVM berikut ke konfigurasi Flink Anda:

    env.java.opts.client: "-Dsun.net.spi.nameservice.nameservers=xxx -Dsun.net.spi.nameservice.provider.1=dns,sun -Dsun.net.spi.nameservice.domain=yyy"

Bagaimana cara melihat status job Flink? {#view-job-status}

Tiga opsi tersedia:

  • Konsol EMR: E-MapReduce (EMR) mendukung Apache Knox, yang menyediakan akses ke UI web YARN dan Flink melalui internet. Buka Dashboard Apache Flink dari UI web YARN. Untuk detailnya, lihat Lihat status job di UI web Flink (VVR).

  • SSH tunnel: Untuk detailnya, lihat Buat SSH tunnel untuk mengakses UI web komponen open source.

  • YARN RESTful API:

    Port 8443 dan 8088 harus dibuka di security group Anda, atau client dan kluster Dataflow harus berada dalam virtual private cloud (VPC) yang sama.
    curl --compressed -v -H "Accept: application/json" -X GET \
      "http://master-1-1:8088/ws/v1/cluster/apps?states=RUNNING&queue=default&user.name=***"

Bagaimana cara melihat log job Flink? {#view-job-logs}

  • Job sedang berjalan: Lihat log di UI web job tersebut.

  • Job selesai: Lihat statistik di Flink HistoryServer, atau jalankan:

    yarn logs -applicationId application_xxxx_yyyy

    Secara default, log untuk job yang telah selesai disimpan di hdfs:///tmp/logs/$USERNAME/logs/ pada Hadoop Distributed File System (HDFS).

Bagaimana cara mengakses Flink HistoryServer? {#access-historyserver}

Flink HistoryServer berjalan di port 18082 pada node master-1-1 (node pertama dalam grup server master). Layanan ini mengumpulkan statistik job Flink yang telah selesai tetapi tidak menyimpan log job.

Untuk mengaksesnya:

  1. Buka port 18082 di aturan security group untuk node master-1-1.

  2. Buka http://<master-1-1-ip>:18082.

Untuk melihat log job yang telah selesai, gunakan API YARN atau UI web YARN sebagai gantinya.

Bagaimana cara menggunakan connector komersial? {#use-commercial-connectors}

Fitur ini hanya tersedia di kluster Dataflow. Connector yang tersedia mencakup Hologres, Log Service, MaxCompute, DataHub, Elasticsearch, dan ClickHouse. Contoh berikut menggunakan connector Hologres.

Langkah 1: Pasang konektor pada mesin lokal Anda

Paket JAR connector disimpan di /opt/apps/FLINK/flink-current/opt/connectors/ pada kluster Dataflow. Instal JAR tersebut ke repositori Maven lokal Anda:

mvn install:install-file \
  -Dfile=/path/to/ververica-connector-hologres-1.13-vvr-4.0.7.jar \
  -DgroupId=com.alibaba.ververica \
  -DartifactId=ververica-connector-hologres \
  -Dversion=1.13-vvr-4.0.7 \
  -Dpackaging=jar

Tambahkan dependensi ke pom.xml Anda dengan scope diatur ke provided:

<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>ververica-connector-hologres</artifactId>
    <version>1.13-vvr-4.0.7</version>
    <scope>provided</scope>
</dependency>

Langkah 2: Jadikan connector tersedia saat runtime

Pilih salah satu metode berikut:

  • Metode 1 — HDFS: Salin JAR connector ke HDFS dan rujuk saat mengirimkan job:

    hdfs mkdir hdfs:///flink-current/opt/connectors/hologres/
    hdfs cp hdfs:///flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar \
      hdfs:///flink-current/opt/connectors/hologres/ververica-connector-hologres-1.13-vvr-4.0.7.jar

    Tambahkan ke perintah submit Anda:

    -D yarn.provided.lib.dirs=hdfs:///flink-current/opt/connectors/hologres/
  • Metode 2 — Client lokal: Salin JAR ke path yang sama pada client submit seperti yang ada di kluster:

    /opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar

    Tambahkan ke perintah submit Anda:

    -C file:///opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar
  • Metode 3 — Bundle ke dalam JAR job: Masukkan langsung JAR connector ke dalam JAR job Anda.

Bagaimana cara menggunakan GeminiStateBackend? {#use-gemini-state-backend}

GeminiStateBackend hanya tersedia di kluster Dataflow. Ini adalah state backend edisi enterprise dengan performa 3–5x lebih tinggi daripada state backend open source, dan diaktifkan secara default dalam file konfigurasi kluster Dataflow.

Untuk opsi konfigurasi lanjutan, lihat Konfigurasi GeminiStateBackend.

Bagaimana cara beralih ke state backend open source? {#use-open-source-state-backend}

Kluster Dataflow menggunakan GeminiStateBackend secara default. Untuk beralih ke state backend open source seperti RocksDB untuk job tertentu, berikan parameter -D saat mengirimkan job:

flink run-application -t yarn-application \
  -D state.backend=rocksdb \
  /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar

Untuk menerapkan perubahan ke semua job berikutnya, perbarui parameter state.backend di konsol EMR: buka tab Configure di halaman layanan Flink, klik flink-conf.yaml, atur nilainya, klik Save, lalu klik Deploy Client Configuration.

Configure parameters

Bagaimana cara mengaktifkan high availability JobManager? {#enable-jobmanager-ha}

Di kluster Dataflow, Flink berjalan di atas YARN. Aktifkan high availability (HA) untuk JobManager dengan mengonfigurasi HA berbasis ZooKeeper seperti yang dijelaskan dalam Konfigurasi HA Apache Flink.

Tambahkan konfigurasi berikut ke flink-conf.yaml:

high-availability: zookeeper
high-availability.zookeeper.quorum: 192.168.**.**:2181,192.168.**.**:2181,192.168.**.**:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/recovery
Penting

Secara default, setelah HA diaktifkan, JobManager hanya dapat direstart sekali per kegagalan. Untuk mengizinkan lebih banyak restart, konfigurasikan parameter yarn.resourcemanager.am.max-attempts untuk YARN dan parameter yarn.application-attempts untuk Flink.

Untuk mencegah JobManager restart terlalu sering, naikkan nilai yarn.application-attempt-failures-validity-interval dari nilai default 10000 (10 detik) menjadi 300000 (5 menit).

Bagaimana cara melihat metrik job Flink? {#view-job-metrics}

  1. Login ke konsol EMR, buka tab Monitoring kluster Anda, lalu klik Metric Monitoring.

  2. Pilih FLINK dari daftar drop-down Dashboard.

  3. Pilih application ID dan job ID untuk melihat metrik job tersebut.

Application ID dan job ID hanya muncul saat job Flink sedang aktif berjalan.
Beberapa metrik, seperti sourceIdleTime, memerlukan konfigurasi baik source maupun sink sebelum melaporkan data.

Bagaimana cara memecahkan masalah penyimpanan hulu dan hilir? {#troubleshoot-storage}

Lihat Penyimpanan hulu dan hilir.

Di mana log client disimpan? {#client-logs}

Variabel lingkungan FLINK_LOG_DIR menentukan direktori log client. Nilai default-nya adalah /var/log/taihao-apps/flink. Pada versi EMR sebelum V3.43.0, nilai default-nya adalah /mnt/disk1/log/flink.

Mengapa parameter run flink saya tidak berlaku? {#parameters-not-taking-effect}

Parameter job harus ditempatkan setelah path file JAR dalam perintah. Contohnya:

flink run -d -t yarn-per-job test.jar arg1 arg2

Parameter yang ditempatkan sebelum file JAR dianggap sebagai opsi framework Flink, bukan argumen job.

Di mana log kluster disimpan? {#cluster-logs}

Cara mengakses log bergantung pada apakah JobManager sedang berjalan:

  • JobManager telah berhenti: Tarik log dengan:

    yarn logs -applicationId application_xxxx_yy

    Atau akses tautan log untuk job yang telah selesai di UI web YARN.

  • JobManager sedang berjalan:

    • Lihat log di UI web job Flink, atau

    • Ambil log JobManager: ``bash yarn logs -applicationId application_xxxx_yy -am ALL -logFiles jobmanager.log ``

    • Ambil log TaskManager: ``bash yarn logs -applicationId application_xxxx_yy -containerId container_xxxx_yy_aa_bb -logFiles taskmanager.log ``

NoSuchFieldError / NoSuchMethodError / ClassNotFoundException (konflik JAR) {#jar-conflict}

Error ini menunjukkan adanya konflik JAR antara dependensi job dan instalasi Flink di kluster. Untuk menyelesaikannya:

  1. Identifikasi kelas yang bentrok: Periksa log error untuk nama kelasnya, lalu jalankan perintah berikut di direktori yang berisi pom.xml Anda untuk memeriksa pohon dependensi:

    mvn dependency:tree
  2. Selesaikan konflik menggunakan salah satu pendekatan berikut:

    • Ubah scope dependensi yang bentrok menjadi provided di pom.xml.

    • Keluarkan kelas tertentu dari dependensi tersebut.

    • Gunakan Maven Shade Plugin untuk memindahkan (shade) kelas tersebut.

  3. Konfirmasi dari JAR mana kelas dimuat: Tambahkan parameter JVM berikut ke flink-conf.yaml atau berikan secara dinamis:

    # flink-conf.yaml
    env.java.opts: -verbose:class

    Atau berikan secara dinamis:

    -Denv.java.opts="-verbose:class"

    Informasi pemuatan kelas dicatat di jobmanager.out atau taskmanager.out.

Multiple factories for identifier found in the classpath {#multiple-factories-error}

Error ini berarti terdapat beberapa implementasi connector yang sama di classpath — biasanya karena dependensi connector dideklarasikan di JAR job dan JAR connector juga ditempatkan secara manual di $FLINK_HOME/lib.

Hapus duplikat tersebut. Lihat NoSuchFieldError / NoSuchMethodError / ClassNotFoundException (konflik JAR) untuk cara mengidentifikasi dan menyelesaikan konflik classpath.

UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS {#oss-error-1}

Kluster Dataflow menggunakan JindoSDK bawaan untuk akses tanpa password ke Object Storage Service (OSS), yang sudah mendukung API seperti StreamingFileSink. Menambahkan plugin OSS komunitas di atas JindoSDK menyebabkan konflik dependensi.

Periksa apakah direktori oss-fs-hadoop ada di $FLINK_HOME/plugins. Jika ada, hapus dan kirim ulang job tersebut.

Could not find a file system implementation for scheme 'oss' {#section_1b6bc757}

Error ini memengaruhi kluster yang menjalankan EMR V3.40 atau lebih lama, di mana paket JAR terkait Jindo mungkin tidak tersedia di node selain master-1-1.

EMR V3.40.0 dan lebih lama

Periksa apakah jindo-flink-4.0.0-full.jar (atau serupa) ada di $FLINK_HOME/lib pada node yang Anda gunakan untuk mengirimkan job. Jika tidak ada, salin file tersebut:

cp /opt/apps/extra-jars/flink/jindo-flink-*-full.jar $FLINK_HOME/lib

Lalu kirim ulang job tersebut.

Versi EMR setelah V3.40.0

Mode deploymentTindakan yang diperlukan
Flink on YARNTidak diperlukan tindakan. Akses OSS ditangani secara otomatis bahkan tanpa JAR Jindo di $FLINK_HOME/lib.
Mode lainnyaPeriksa keberadaan JAR Jindo di $FLINK_HOME/lib. Jika tidak ada, jalankan perintah salin di atas dan kirim ulang job.

java.util.concurrent.TimeoutException: Heartbeat of TaskManager timed out {#taskmanager-heartbeat-timeout}

Penyebab langsungnya adalah timeout heartbeat TaskManager. Periksa log TaskManager untuk menemukan error dasarnya — penyebab umumnya adalah error out of memory (OOM) akibat memori heap tidak mencukupi atau memory leak pada kode job.

Jika OOM adalah penyebabnya, tambah alokasi memori TaskManager atau analisis penggunaan memori job tersebut. Lihat java.lang.OutOfMemoryError: GC overhead limit exceeded untuk langkah-langkah menghasilkan dan menganalisis heap dump.

java.lang.OutOfMemoryError: GC overhead limit exceeded {#gc-overhead-limit}

Garbage collection (GC) mengalami timeout karena job memiliki memori yang tidak mencukupi. Penyebab paling umum adalah memory leak pada user-defined function (UDF) atau memori tidak mencukupi untuk workload tersebut.

Untuk mendiagnosis, hasilkan heap dump saat error terjadi. Berikan parameter JVM saat mengirim ulang job:

-D env.java.opts="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof"

Atau tambahkan ke flink-conf.yaml:

env.java.opts: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof

Setelah mereproduksi error, analisis heap dump di path yang ditentukan oleh HeapDumpPath menggunakan Memory Analyzer Tool (MAT) atau Java VisualVM.

java.lang.NoSuchFieldError: DEPLOYMENT_MODE {#deployment-mode-error}

JAR job menyertakan dependensi flink-core yang versinya tidak kompatibel dengan versi Flink di kluster.

Tambahkan flink-core ke pom.xml dengan scope diatur ke provided:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-core</artifactId>
  <!-- Ganti dengan versi Flink yang digunakan oleh kluster Anda -->
  <version>1.16.1</version>
  <scope>provided</scope>
</dependency>

Untuk latar belakang bagaimana dependensi yang tidak kompatibel masuk, lihat NoSuchFieldError / NoSuchMethodError / ClassNotFoundException (konflik JAR).

Mengapa hanya satu operator yang ditampilkan dan Records Received = 0? {#single-operator-zero-records}

Ini adalah perilaku yang diharapkan. Metrik Records Received menghitung data yang dipertukarkan antar operator berbeda. Saat Flink mengoptimalkan job menjadi satu operator tunggal (menggabungkan semua operator), tidak ada pertukaran data antar operator, sehingga nilainya selalu 0.

Bagaimana cara mengaktifkan flame graph? {#enable-flame-graph}

Flame graph memvisualisasikan penggunaan CPU per metode, yang membantu mengidentifikasi bottleneck performa. Fitur ini tersedia sejak Flink 1.13 tetapi dinonaktifkan secara default untuk menghindari dampak pada job produksi.

Untuk mengaktifkannya, buka tab Configure di halaman layanan Flink di konsol EMR, klik flink-conf.yaml, lalu tambahkan:

rest.flamegraph.enabled: true

Untuk detail cara menambahkan item konfigurasi, lihat Kelola item konfigurasi. Untuk informasi lebih lanjut tentang flame graph, lihat Flame graph Apache Flink.