全部产品
Search
文档中心

E-MapReduce:FAQ

更新时间:Jul 06, 2025

Topik ini memberikan jawaban atas beberapa pertanyaan yang sering diajukan tentang kluster Dataflow.

Di mana log disimpan di dalam kluster? Bagaimana cara saya melihat log tersebut?

Anda dapat melihat log kluster berdasarkan status JobManager:

  • Jika JobManager kluster Flink telah berhenti, Anda dapat melihat log dengan menjalankan perintah yarn logs -applicationId application_xxxx_yy pada node kluster untuk menarik log ke mesin lokal. Anda juga dapat mengakses tautan log job yang selesai pada antarmuka web YARN untuk melihat log secara langsung.

  • Jika JobManager kluster Flink sedang berjalan, Anda dapat melihat log dengan salah satu cara berikut:

    • Lihat log pada antarmuka web job Flink.

    • Jalankan perintah yarn logs -applicationId application_xxxx_yy -am ALL -logFiles jobmanager.log untuk melihat log JobManager. Jalankan perintah yarn logs -applicationId application_xxxx_yy -containerId container_xxxx_yy_aa_bb -logFiles taskmanager.log untuk melihat log TaskManager.

Apa yang harus saya lakukan jika paket JAR sebuah job bertentangan dengan paket JAR Flink di dalam kluster?

Dalam banyak kasus, kesalahan seperti NoSuchFieldError, NoSuchMethodError, atau ClassNotFoundException dicatat dalam log job jika masalah ini terjadi. Anda dapat melakukan langkah-langkah berikut untuk mendiagnosis dan menyelesaikan masalah:

  1. Identifikasi kelas dependensi yang menyebabkan konflik. Periksa log kesalahan untuk menemukan kelas yang menyebabkan konflik. Temukan paket JAR tempat kelas tersebut berada. Kemudian, jalankan perintah mvn dependency:tree di direktori tempat file pom.xml untuk job disimpan untuk melihat pohon dependensi JAR.

  2. Keluarkan kelas dependensi yang menyebabkan konflik.

    • Jika parameter scope dari paket JAR salah diatur dalam file pom.xml, Anda dapat mengubah nilai parameter scope menjadi provided untuk mengecualikan paket JAR.

    • Jika Anda harus menggunakan paket JAR tempat kelas yang menyebabkan konflik berada, Anda dapat mengecualikan kelas dalam dependensi.

    • Jika kelas yang menyebabkan konflik tidak dapat diganti oleh kelas versi yang sesuai di dalam kluster, Anda dapat menggunakan Maven Shade Plugin untuk men-shade kelas tersebut.

    Selain itu, jika beberapa versi paket JAR ditentukan dalam classpath, versi kelas yang digunakan oleh job bergantung pada urutan pemuatan kelas. Untuk mengonfirmasi dari paket JAR mana suatu kelas dimuat, Anda dapat menentukan parameter Java virtual machine (JVM) env.java.opts: -verbose:class dalam file flink-conf.yaml atau menentukan parameter dinamis -Denv.java.opts="-verbose:class". Dengan cara ini, sistem mencatat kelas-kelas yang dimuat dan paket JAR dari mana kelas-kelas tersebut dimuat.

    Catatan

    Untuk JobManager atau TaskManager, informasi sebelumnya dicatat dalam file jobmanager.out atau taskmanager.out.

Bagaimana cara saya mengirimkan job Flink ke kluster Dataflow menggunakan klien yang tidak diterapkan di dalam kluster Dataflow?

Lakukan langkah-langkah berikut untuk mengirimkan job Flink ke kluster Dataflow menggunakan klien eksternal:

  1. Pastikan bahwa kluster Dataflow terhubung ke klien.

  2. Konfigurasikan lingkungan Hadoop YARN untuk klien.

    Dalam kluster Dataflow, perangkat lunak Hadoop YARN diinstal di direktori /opt/apps/YARN/yarn-current dan file konfigurasi disimpan di direktori /etc/taihao-apps/hadoop-conf/. Anda harus mengunduh file di direktori yarn-current dan hadoop-conf dan menyimpan file tersebut di klien yang digunakan untuk mengirimkan job Flink.

    Kemudian, jalankan perintah berikut pada klien untuk mengonfigurasi variabel lingkungan:

    export HADOOP_HOME=/path/to/yarn-current && \
    export PATH=${HADOOP_HOME}/bin/:$PATH && \
    export HADOOP_CLASSPATH=$(hadoop classpath) && \
    export HADOOP_CONF_DIR=/path/to/hadoop-conf
    Penting

    Dalam file konfigurasi Hadoop seperti yarn-site.xml, komponen seperti ResourceManager menggunakan nama domain lengkap (FQDN) sebagai alamat layanan. Contoh: master-1-1.c-xxxxxxxxxx.cn-hangzhou.emr.aliyuncs.com. Oleh karena itu, jika Anda mengirimkan job Flink menggunakan klien eksternal, pastikan bahwa FQDN ini dapat diselesaikan atau ubah FQDN menjadi alamat IP.

  3. Setelah Anda menyelesaikan konfigurasi sebelumnya, mulailah job Flink pada klien eksternal. Sebagai contoh, Anda dapat menjalankan perintah flink run -d -t yarn-per-job -ynm flink-test $FLINK_HOME/examples/streaming/TopSpeedWindowing.jar untuk memulai job Flink. Kemudian, Anda dapat melihat job Flink pada antarmuka web YARN dari kluster Dataflow.

Jika saya menggunakan klien yang tidak diterapkan di dalam kluster Dataflow untuk mengirimkan job Flink, bagaimana cara saya menyelesaikan nama host yang ditentukan dalam file konfigurasi kluster Dataflow pada klien?

Gunakan salah satu metode berikut untuk menyelesaikan nama host yang ditentukan dalam file konfigurasi kluster Dataflow pada klien eksternal:

  • Ubah file /etc/hosts pada klien untuk menambahkan pemetaan antara nama host dan alamat IP.

  • Gunakan Alibaba Cloud DNS PrivateZone.

    Anda juga dapat menentukan parameter JVM berikut untuk menggunakan layanan DNS Anda sendiri:

    env.java.opts.client: "-Dsun.net.spi.nameservice.nameservers=xxx -Dsun.net.spi.nameservice.provider.1=dns,sun -Dsun.net.spi.nameservice.domain=yyy"

Bagaimana cara saya melihat status job Flink?

  • Gunakan konsol E-MapReduce (EMR).

    EMR mendukung Knox yang memungkinkan Anda mengakses antarmuka web layanan seperti YARN dan Flink melalui Internet. Anda dapat pergi ke halaman Apache Flink Dashboard dari antarmuka web YARN untuk melihat status job Flink. Untuk informasi lebih lanjut, lihat Lihat status job pada antarmuka web Flink (VVR).

  • Gunakan terowongan SSH. Untuk informasi lebih lanjut, lihat Buat terowongan SSH untuk mengakses antarmuka web komponen sumber terbuka.

  • Panggil API RESTful YARN.

    curl --compressed -v  -H "Accept: application/json" -X GET "http://master-1-1:8088/ws/v1/cluster/apps?states=RUNNING&queue=default&user.name=***"
    Catatan

    Pastikan port 8443 dan 8088 diaktifkan dalam grup keamanan Anda untuk mengizinkan panggilan ke API RESTful YARN. Atau, pastikan bahwa kluster Dataflow dan node yang ingin Anda akses berada di dalam virtual private cloud (VPC) yang sama.

Bagaimana cara saya melihat log job Flink?

  • Jika job Flink sedang berjalan, Anda dapat melihat log job Flink pada antarmuka webnya.

  • Jika job Flink selesai, Anda dapat melihat statistik job Flink pada Flink HistoryServer atau dengan menjalankan perintah yarn logs -applicationId application_xxxx_yyyy. Secara default, log job Flink yang selesai disimpan di direktori hdfs:///tmp/logs/$USERNAME/logs/ dari kluster Hadoop Distributed File System (HDFS).

Bagaimana cara saya mengakses Flink HistoryServer di dalam kluster Dataflow?

Secara default, Flink HistoryServer dimulai pada port 18082 dari node master-1-1, yang merupakan server pertama dari kelompok server master, di dalam kluster Dataflow. Flink HistoryServer mengumpulkan statistik pada job Flink yang selesai. Untuk mengakses Flink HistoryServer, lakukan langkah-langkah berikut:

  1. Konfigurasikan aturan grup keamanan untuk mengaktifkan akses ke port 18082 dari node master-1-1.

  2. Akses http://$master-1-1-ip:18082.

Penting

Flink HistoryServer tidak menyimpan log job Flink yang selesai. Untuk melihat log, Anda dapat menggunakan Operasi API YARN atau mengakses antarmuka web YARN.

Bagaimana cara saya menggunakan konektor komersial yang didukung oleh kluster Dataflow?

Kluster Dataflow menyediakan berbagai konektor komersial, seperti konektor Hologres, Log Service, MaxCompute, DataHub, Elasticsearch, dan ClickHouse. Anda dapat menggunakan konektor sumber terbuka atau konektor komersial dalam job Flink. Dalam contoh ini, konektor Hologres digunakan.

  • Instal konektor

    1. Unduh paket JAR konektor Hologres dan instal konektor Hologres ke Maven pada mesin lokal Anda. Paket JAR konektor Hologres disimpan di direktori /opt/apps/FLINK/flink-current/opt/connectors dari kluster Dataflow.

      mvn install:install-file -Dfile=/path/to/ververica-connector-hologres-1.13-vvr-4.0.7.jar -DgroupId=com.alibaba.ververica -DartifactId=ververica-connector-hologres -Dversion=1.13-vvr-4.0.7 -Dpackaging=jar
    2. Tambahkan dependensi berikut ke file pom.xml:

      <dependency>
          <groupId>com.alibaba.ververica</groupId>
          <artifactId>ververica-connector-hologres</artifactId>
          <version>1.13-vvr-4.0.7</version>
          <scope>provided</scope>
      </dependency>
  • Jalankan job

    • Metode 1:

      1. Salin paket JAR konektor Hologres ke direktori terpisah.

        hdfs mkdir hdfs:///flink-current/opt/connectors/hologres/
        hdfs cp hdfs:///flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar  hdfs:///flink-current/opt/connectors/hologres/ververica-connector-hologres-1.13-vvr-4.0.7.jar
      2. Tambahkan informasi berikut ke perintah yang digunakan untuk mengirimkan job:

        -D yarn.provided.lib.dirs=hdfs:///flink-current/opt/connectors/hologres/
    • Metode 2:

      1. Salin paket JAR konektor Hologres ke direktori /opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar dari klien yang Anda gunakan untuk mengirimkan job Flink. Direktori ini memiliki struktur yang sama dengan direktori yang menyimpan paket JAR konektor Hologres di dalam kluster Dataflow.

      2. Tambahkan informasi berikut ke perintah yang digunakan untuk mengirimkan job:

        -C file:///opt/apps/FLINK/flink-current/opt/connectors/ververica-connector-hologres-1.13-vvr-4.0.7.jar
    • Metode 3: Tambahkan paket JAR konektor Hologres ke paket JAR job yang ingin Anda jalankan.

Bagaimana cara saya menggunakan GeminiStateBackend?

Kluster Dataflow menyediakan GeminiStateBackend, yang merupakan backend status edisi perusahaan. Performa GeminiStateBackend tiga hingga lima kali lipat dari backend status sumber terbuka. Secara default, GeminiStateBackend digunakan dalam file konfigurasi kluster Dataflow. Untuk informasi lebih lanjut tentang konfigurasi lanjutan GeminiStateBackend, lihat Konfigurasi GeminiStateBackend.

Bagaimana cara saya menggunakan backend status sumber terbuka?

Secara default, GeminiStateBackend digunakan dalam file konfigurasi kluster Dataflow. Jika Anda ingin menggunakan backend status sumber terbuka seperti RocksDB untuk sebuah job, Anda dapat menggunakan parameter -D untuk menentukan backend status. Contoh:

flink run-application -t yarn-application -D state.backend=rocksdb  /opt/apps/FLINK/flink-current/examples/streaming/TopSpeedWindowing.jar

Jika Anda ingin konfigurasi sebelumnya berlaku untuk job berikutnya, ubah nilai parameter state.backend ke backend status yang ingin Anda gunakan di konsol E-MapReduce (EMR). Sebagai contoh, Anda dapat mengubah backend status menjadi RocksDB. Klik Save dan kemudian klik Deploy Client Configuration.Configure parameters

Di mana log klien disimpan? Bagaimana cara saya melihat log klien?

Dalam kluster EMR, variabel lingkungan FLINK_LOG_DIR menentukan direktori tempat log klien Flink disimpan. Direktori default adalah /var/log/taihao-apps/flink. Dalam versi EMR yang lebih awal dari V3.43.0, direktori default adalah /mnt/disk1/log/flink. Anda dapat melihat log lengkap klien, seperti SQL Client, dalam file yang sesuai di direktori ini.

Mengapa parameter job Flink tidak berpengaruh ketika saya menjalankan perintah flink run untuk memulai job?

Ketika Anda memulai job Flink dengan menjalankan perintah, Anda harus meletakkan parameter job Flink di belakang paket JAR job Flink. Contoh: flink run -d -t yarn-per-job test.jar arg1 arg2.

Apa yang harus saya lakukan jika pesan kesalahan berikut dikembalikan: Multiple factories for identifier '...' that implement '...' found in the classpath?

Bagaimana cara saya mengaktifkan high availability untuk JobManager guna meningkatkan stabilitas job Flink?

Dalam kluster Dataflow, job Flink diterapkan dan dijalankan di YARN. Anda dapat mengikuti instruksi yang dijelaskan dalam bagian Konfigurasi dari dokumentasi yang disediakan oleh komunitas Apache Flink untuk mengaktifkan high availability untuk JobManager guna menerapkan pelaksanaan stabil job Flink. Contoh konfigurasi:

high-availability: zookeeper
high-availability.zookeeper.quorum: 192.168.**.**:2181,192.168.**.**:2181,192.168.**.**:2181
high-availability.zookeeper.path.root: /flink
high-availability.storageDir: hdfs:///flink/recovery
Penting

Secara default, JobManager hanya dapat di-restart sekali dalam kasus kegagalan setelah high availability diaktifkan. Jika Anda ingin mengizinkan JobManager untuk di-restart lebih dari sekali, Anda harus mengonfigurasi parameter yarn.resourcemanager.am.max-attempts untuk YARN dan parameter yarn.application-attempts untuk Flink. Untuk informasi lebih lanjut, lihat Dokumentasi Resmi Apache Flink. Untuk mencegah JobManager di-restart berulang kali, Anda dapat meningkatkan nilai parameter yarn.application-attempt-failures-validity-interval. Nilai default parameter ini adalah 10000. Unit: milidetik. Nilai default menunjukkan 10 detik. Anda dapat meningkatkan nilai parameter ini menjadi 300000, yang sama dengan 5 menit.

Bagaimana cara saya melihat metrik job Flink?

  1. Masuk ke konsol EMR dan buka tab Monitoring dari kluster yang ingin Anda kelola. Pada tab Pemantauan, klik Metric Monitoring.

  2. Pilih FLINK dari daftar drop-down Dashboard.

  3. Pilih ID aplikasi dan ID job. Metrik job Flink akan ditampilkan.

    Catatan
    • ID aplikasi dan ID job hanya tersedia ketika job Flink yang ada sedang berjalan di kluster.

    • Informasi output beberapa metrik, seperti metrik sourceIdleTime, hanya tersedia ketika sumber dan sink diatur untuk metrik tersebut.

Bagaimana cara saya memecahkan masalah yang terkait dengan penyimpanan upstream dan downstream?

Untuk informasi lebih lanjut, lihat FAQ tentang konektor.

Apa yang harus saya lakukan jika kesalahan dilaporkan ketika saya menjalankan job Flink di kluster Dataflow untuk membaca data dari dan menulis data ke OSS tanpa kata sandi?

Selesaikan kesalahan berdasarkan pesan kesalahan spesifik.

  • Pesan kesalahan: java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS

    • Penyebab: Kluster Dataflow menggunakan JindoSDK bawaan untuk mengimplementasikan akses tanpa kata sandi ke Object Storage Service (OSS) dan mendukung API seperti StreamingFileSink. Anda tidak perlu melakukan konfigurasi tambahan seperti yang dijelaskan dalam dokumentasi komunitas. Jika tidak, kesalahan ini mungkin terjadi karena konflik dependensi.

    • Solusi: Periksa apakah direktori oss-fs-hadoop ada di direktori $FLINK_HOME/plugins dari node yang digunakan untuk mengirimkan job di kluster Anda. Jika direktori oss-fs-hadoop ada, hapus direktori tersebut dan kirimkan job lagi.

  • Pesan kesalahan: Could not find a file system implementation for scheme 'oss'. The scheme is directly supported by Flink through the following plugin: flink-oss-fs-hadoop. ....

    • Penyebab: Dalam kluster EMR V3.40 dan versi sebelumnya, paket JAR terkait Jindo mungkin hilang pada node selain master-1-1 di kelompok server master.

    • Solusi:

      • Dalam kluster EMR V3.40.0 atau kluster versi minor sebelum EMR V3.40.0, periksa apakah paket JAR terkait Jindo, seperti jindo-flink-4.0.0-full.jar, ada di direktori $FLINK_HOME/lib dari node yang digunakan untuk mengirimkan job di kluster Anda. Jika paket JAR terkait Jindo tidak ada, jalankan perintah berikut di kluster untuk menyalin paket JAR terkait Jindo ke direktori $FLINK_HOME/lib dan kemudian kirimkan job lagi:

        cp /opt/apps/extra-jars/flink/jindo-flink-*-full.jar $FLINK_HOME/lib
      • Versi minor setelah EMR V3.40.0

        • Mode Flink pada YARN: Mekanisme untuk akses OSS dioptimalkan. Bahkan jika paket JAR terkait Jindo tidak ada di direktori $FLINK_HOME/lib, job yang digunakan untuk membaca data dari dan menulis data ke OSS dapat berjalan seperti yang diharapkan.

        • Mode penyebaran lainnya: Periksa apakah paket JAR terkait Jindo, seperti jindo-flink-4.0.0-full.jar, ada di direktori $FLINK_HOME/lib dari node yang digunakan untuk mengirimkan job di kluster Anda. Jika paket JAR terkait Jindo tidak ada, jalankan perintah berikut di kluster untuk menyalin paket JAR terkait Jindo ke direktori $FLINK_HOME/lib dan kemudian kirimkan job lagi:

          cp /opt/apps/extra-jars/flink/jindo-flink-*-full.jar $FLINK_HOME/lib

Apa yang harus saya lakukan jika pesan kesalahan berikut dikembalikan: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id timed out?

Apa yang harus saya lakukan jika pesan kesalahan berikut dikembalikan: java.lang.OutOfMemoryError: GC overhead limit exceeded?

  • Penyebab

    Pesan kesalahan ini menunjukkan bahwa garbage collection (GC) habis waktu karena memori yang dikonfigurasikan untuk job tidak mencukupi. Penyebab umum adalah bahwa kode job, seperti fungsi yang ditentukan pengguna (UDF), menyebabkan kebocoran memori atau ukuran memori tidak memenuhi persyaratan bisnis.

  • Solusi

    • Ketika Anda mengirimkan job lagi untuk mereproduksi kesalahan, gunakan parameter -D untuk menentukan parameter JVM untuk membuat heap dump saat terjadi kesalahan OOM. Contoh: -D env.java.opts="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof".

    • Tambahkan parameter env.java.opts: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dump.hprof ke file flink-conf.yaml untuk membuat heap dump saat terjadi kesalahan OOM.

    Setelah kesalahan direproduksi, analisis file heap dump di jalur yang ditentukan oleh parameter HeapDumpPath. Sebagai contoh, Anda dapat menggunakan Memory Analyzer Tool (MAT) atau Java VisualVM untuk menganalisis file dan mengidentifikasi penyebab utama kesalahan.

Mengapa hanya satu operator yang ditampilkan untuk sebuah job di antarmuka web Flink dan nilai metrik Records Received adalah 0?

Ini adalah situasi normal. Metrik Records Received Flink digunakan untuk menunjukkan komunikasi data antara operator yang berbeda. Jika sebuah job dioptimalkan untuk memiliki hanya satu operator, nilai metrik ini selalu 0.

Bagaimana cara saya mengaktifkan grafik nyala api untuk sebuah job Flink?

Grafik nyala api memvisualisasikan beban CPU dari setiap metode dalam proses. Ini membantu Anda menyelesaikan masalah hambatan kinerja job Flink. Fitur grafik nyala api didukung sejak Flink 1.13. Namun, untuk menghindari dampak grafik nyala api pada job di lingkungan produksi, fitur grafik nyala api dinonaktifkan secara default. Jika Anda ingin menggunakan fitur grafik nyala api untuk menganalisis kinerja job Flink, lakukan operasi berikut: Masuk ke konsol EMR dan buka tab Configure halaman layanan Flink. Pada tab Konfigurasi, klik flink-conf.yaml. Pada tab flink-conf.yaml, tambahkan item konfigurasi rest.flamegraph.enabled dan atur nilainya menjadi true. Untuk informasi lebih lanjut tentang cara menambahkan item konfigurasi, lihat Kelola item konfigurasi.

Untuk informasi lebih lanjut tentang grafik nyala api, lihat Grafik Nyala Api.

Apa yang harus saya lakukan jika pesan kesalahan berikut dikembalikan: Exception in thread "main" java.lang.NoSuchFieldError: DEPLOYMENT_MODE?

  • Penyebab

    Paket JAR job Anda berisi dependensi flink-core yang tidak kompatibel dengan versi Flink di kluster.

  • Solusi

    Tambahkan dependensi berikut ke file pom.xml. Dalam dependensi ini, parameter scope diatur ke provided.

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-core</artifactId>
      <!-- Ubah ke versi flink Anda sendiri -->
      <version>1.16.1</version>
      <scope>provided</scope>
    </dependency>
    Catatan

    Ubah version dalam dependensi sebelumnya ke versi Flink yang Anda gunakan.

    Untuk informasi lebih lanjut tentang bagaimana dependensi yang tidak kompatibel diperkenalkan, lihat Apa yang harus saya lakukan jika paket JAR sebuah job bertentangan dengan paket JAR Flink di dalam kluster?.