全部产品
Search
文档中心

Realtime Compute for Apache Flink:FAQ tentang pengembangan dan debugging

更新时间:Nov 10, 2025

Dokumen ini menjawab beberapa pertanyaan umum terkait pengembangan draft dan debugging penyebaran.

Bagaimana cara mendeklarasikan pernyataan DDL ketika saya mengeksekusi pernyataan DDL dan DML dalam teks yang sama?

Saat mengeksekusi pernyataan DDL dan DML dalam teks yang sama, gunakan CREATE TEMPORARY TABLE alih-alih CREATE TABLE dalam pernyataan DDL. Jika tidak, pesan kesalahan seperti pada gambar berikut akan muncul setelah Anda mengklik Validate.

image

Bagaimana cara menulis beberapa pernyataan INSERT INTO?

Tulis pernyataan INSERT INTO di antara BEGIN STATEMENT SET; dan END; untuk membentuk satu unit logis. Untuk informasi selengkapnya, lihat INSERT INTO. Jika tidak, pesan kesalahan seperti pada gambar berikut akan muncul setelah Anda mengklik Validate.

image

Apa yang harus saya lakukan jika ingin menentukan karakter khusus di nilai parameter Entry Point Main Arguments?

  • Penyebab

    Jika nilai parameter Entry Point Main Arguments mengandung karakter khusus seperti tanda pagar (#) atau tanda dolar ($), karakter tersebut tidak dapat diidentifikasi setelah diloloskan dengan backslash (\). Akibatnya, karakter tersebut dibuang.

  • Solusi

    Pada halaman Deployments, klik nama penerapan yang diinginkan. Di bagian Parameters pada tab Konfigurasi, tambahkan konfigurasi env.java.opts: -Dconfig.disable-inline-comment=true ke kolom Other Configuration. Untuk informasi selengkapnya, lihat Bagaimana cara mengonfigurasi parameter kustom untuk menjalankan penerapan?.

Mengapa paket JAR UDF gagal diunggah setelah paket dimodifikasi berkali-kali?

  • Penyebab

    Paket JAR yang ingin diunggah mengandung kelas dengan nama yang sama dengan kelas dalam paket JAR yang sudah ada, menyebabkan konflik fungsi terdefinisi pengguna (UDF).

  • Solusi

    • Hapus paket JAR lama dan unggah kembali paket tersebut.

    • Di konsol pengembangan Realtime Compute for Apache Flink, tambahkan pernyataan berikut ke editor SQL untuk menggunakan fungsi sementara sebelum mengunggah paket JAR. Lalu, klik ikon Unggah untuk mengunggah paket JAR di bagian Additional Dependencies tab Configurations pada halaman ETL. Untuk informasi lebih lanjut tentang cara menggunakan fungsi sementara, lihat Daftarkan UDF.

      CREATE TEMPORARY FUNCTION `cp_record_reduce` AS 'com.taobao.test.udf.blink.CPRecordReduceUDF';

      image

Mengapa bidang tidak selaras ketika saya menggunakan kelas POJO sebagai tipe data untuk nilai balik UDTF?

  • Deskripsi Masalah

    Jika kelas Plain Old Java Object (POJO) digunakan sebagai tipe data untuk nilai balik fungsi tabel bernilai pengguna (UDTF) dan nama alias dari bidang yang dikembalikan oleh UDTF dideklarasikan secara eksplisit dalam pernyataan SQL, bidang mungkin tidak selaras. Dalam kasus ini, bidang yang digunakan mungkin tidak memenuhi persyaratan meskipun tipe datanya konsisten.

    Sebagai contoh, kegagalan validasi SQL terjadi dalam situasi berikut: Anda menggunakan kelas POJO berikut sebagai tipe data untuk nilai balik UDTF, mengemas UDTF sesuai dengan persyaratan yang dijelaskan dalam Ikhtisar, dan mendaftarkan UDTF sesuai dengan persyaratan yang dijelaskan dalam Daftarkan UDF tingkat penyebaran.

    package com.aliyun.example;
    
    public class TestPojoWithoutConstructor {
    	public int c;
    	public String d;
    	public boolean a;
    	public String b;
    }
    package com.aliyun.example;
    
    import org.apache.flink.table.functions.TableFunction;
    
    public class MyTableFuncPojoWithoutConstructor extends TableFunction<TestPojoWithoutConstructor> {
    	private static final long serialVersionUID = 1L;
    
    	public void eval(String str1, Integer i2) {
    		TestPojoWithoutConstructor p = new TestPojoWithoutConstructor();
    		p.d = str1 + "_d";
    		p.c = i2 + 2;
    		p.b = str1 + "_b";
    		collect(p);
    	}
    }
    CREATE TEMPORARY FUNCTION MyTableFuncPojoWithoutConstructor as 'com.aliyun.example.MyTableFuncPojoWithoutConstructor';
    
    CREATE TEMPORARY TABLE src ( 
      id STRING,
      cnt INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE sink ( 
      f1 INT,
      f2 STRING,
      f3 BOOLEAN,
      f4 STRING
    ) WITH (
     'connector' = 'print'
    );
    
    INSERT INTO sink
    SELECT T.* FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b);

    Pesan kesalahan berikut untuk validasi SQL dilaporkan:

    org.apache.flink.table.api.ValidationException: Validasi SQL gagal. Tipe kolom hasil query dan sink untuk 'vvp.default.sink' tidak cocok.
    Penyebab: Kolom sink 'f1' di posisi 0 bertipe INT tetapi ekspresi dalam query bertipe BOOLEAN NOT NULL.
    Petunjuk: Anda perlu menulis ulang atau melempar ekspresi.
    
    Skema Query: [c: BOOLEAN NOT NULL, d: STRING, a: INT NOT NULL, b: STRING]
    Skema Sink:  [f1: INT, f2: STRING, f3: BOOLEAN, f4: STRING]
    	at org.apache.flink.table.sqlserver.utils.FormatValidatorExceptionUtils.newValidationException(FormatValidatorExceptionUtils.java:41)

    Dalam contoh ini, bidang yang dikembalikan dari UDTF dan bidang dalam kelas POJO mungkin tidak selaras. Dalam pernyataan SQL, Bidang c bertipe BOOLEAN, sedangkan Bidang a bertipe INT, yang merupakan kebalikan dari tipe data yang ditentukan oleh kelas POJO.

  • Penyebab

    Urutan bidang yang dikembalikan bervariasi berdasarkan konstruktor berparameter dari kelas POJO:

    • Jika kelas POJO mengimplementasikan konstruktor berparameter, bidang diurutkan berdasarkan urutan parameter dari konstruktor berparameter.

    • Jika kelas POJO mengimplementasikan konstruktor berparameter, bidang akan diurutkan sesuai dengan urutan parameter pada konstruktor tersebut.

    Dalam contoh ini, kelas POJO tidak mengimplementasikan konstruktor berparameter. Akibatnya, tipe data untuk nilai kembali dari UDTF adalah BOOLEAN a, VARCHAR(2147483647) b, INTEGER c, VARCHAR(2147483647) d). Tidak ada kesalahan dalam contoh sebelumnya. Namun, daftar penggantian nama LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b) ditambahkan ke bidang yang dikembalikan dalam pernyataan SQL. Bidang-bidang yang dikembalikan diganti namanya berdasarkan posisi. Akibatnya, bidang tidak sejajar saat menggunakan kelas POJO, sehingga menyebabkan kesalahan verifikasi atau ketidaksesuaian data yang tidak diharapkan.

  • Solusi

    • Jika kelas POJO tidak mengimplementasikan konstruktor berparameter, jangan ubah nama secara eksplisit bidang yang dikembalikan oleh UDTF. Sebagai contoh, Anda dapat mengubah klausa SELECT dalam pernyataan INSERT sebelumnya menjadi klausa SELECT berikut:

      -- Jika kelas POJO tidak mengimplementasikan konstruktor berparameter, kami sarankan Anda memilih nama bidang yang diperlukan. Saat menggunakan T.*, Anda harus mengetahui urutan aktual dari bidang yang dikembalikan. 
      SELECT T.c, T.d, T.a, T.b FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T;
    • Implementasikan konstruktor berparameter dalam kelas POJO untuk menentukan urutan bidang yang dikembalikan. Dalam kasus ini, urutan bidang yang dikembalikan adalah urutan parameter dari konstruktor berparameter.

      package com.aliyun.example;
      
      public class TestPojoWithConstructor {
      	public int c;
      	public String d;
      	public boolean a;
      	public String b;
      
      	// Menggunakan urutan bidang tertentu alih-alih urutan abjad
      	public TestPojoWithConstructor(int c, String d, boolean a, String b) {
      		this.c = c;
      		this.d = d;
      		this.a = a;
      		this.b = b;
      	}
      }

Bagaimana cara menganalisis konflik dependensi Realtime Compute for Apache Flink?

  • Deskripsi Masalah

    • Kesalahan dilaporkan, yang disebabkan oleh pengecualian terkait Realtime Compute for Apache Flink atau Hadoop.

      java.lang.AbstractMethodError
      java.lang.ClassNotFoundException
      java.lang.IllegalAccessError
      java.lang.IllegalAccessException
      java.lang.InstantiationError
      java.lang.InstantiationException
      java.lang.InvocationTargetException
      java.lang.NoClassDefFoundError
      java.lang.NoSuchFieldError
      java.lang.NoSuchFieldException
      java.lang.NoSuchMethodError
      java.lang.NoSuchMethodException
    • Tidak ada kesalahan yang dilaporkan, tetapi salah satu dari masalah berikut terjadi:

      • Log tidak dihasilkan atau konfigurasi Log4j tidak berlaku.

        Dalam banyak kasus, masalah ini terjadi karena dependensi mengandung konfigurasi Log4j. Untuk menyelesaikan masalah ini, Anda harus memeriksa apakah dependensi dalam file JAR penyebaran Anda mengandung konfigurasi Log4j. Jika dependensi mengandung konfigurasi Log4j, Anda dapat mengonfigurasi pengecualian dalam dependensi untuk menghapus konfigurasi Log4j.

        Catatan

        Jika Anda menggunakan versi Log4j yang berbeda, Anda harus menggunakan maven-shade-plugin untuk merelokasi kelas terkait Log4j.

      • Pemanggilan prosedur jarak jauh (RPC) gagal.

        Secara default, kesalahan yang disebabkan oleh konflik dependensi selama RPC Akka Realtime Compute for Apache Flink tidak dicatat dalam log. Untuk memeriksa kesalahan ini, Anda harus mengaktifkan logging debug.

        Sebagai contoh, log debug mencatat Cannot allocate the requested resources. Trying to allocate ResourceProfile{xxx}. Namun, log JobManager berhenti pada pesan Registering TaskManager with ResourceID xxx dan tidak menampilkan informasi apa pun hingga permintaan sumber daya melebihi batas waktu dan menampilkan pesan NoResourceAvailableException. Selain itu, Pengelola Tugas terus-menerus melaporkan pesan kesalahan Cannot allocate the requested resources. Trying to allocate ResourceProfile{xxx}.

        Penyebab: Setelah Anda mengaktifkan logging debug, muncul pesan kesalahan RPC InvocationTargetException. Dalam kasus ini, slot gagal dialokasikan untuk Pengelola Tugas dan status Pengelola Tugas menjadi tidak konsisten. Akibatnya, slot tidak dapat dialokasikan dan kesalahan tidak dapat diperbaiki.

  • Penyebab

    • Paket JAR penyebaran Anda mengandung dependensi yang tidak perlu, seperti dependensi untuk konfigurasi dasar, Realtime Compute for Apache Flink, Hadoop, dan Log4j. Akibatnya, konflik dependensi terjadi dan menyebabkan beberapa masalah.

    • Dependensi yang sesuai dengan konektor yang diperlukan untuk penyebaran Anda tidak termasuk dalam paket JAR.

  • Metode Identifikasi

    • Periksa apakah file pom.xml penyebaran Anda mengandung dependensi yang tidak perlu.

    • Jalankan perintah jar tf foo.jar untuk melihat isi paket JAR dan menentukan apakah paket tersebut berisi konten yang menyebabkan konflik dependensi.

    • Jalankan perintah mvn dependency:tree untuk memeriksa hubungan dependensi penerapan Anda dan menentukan apakah terdapat konflik dependensi.

  • Solusi

    • Kami sarankan Anda menyetel scope ke provided untuk dependensi konfigurasi dasar. Dengan cara ini, dependensi konfigurasi dasar tidak termasuk dalam paket JAR penyebaran Anda.

      • DataStream Java

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
      • DataStream Scala

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
      • DataSet Java

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
      • DataSet Scala

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
    • Tambahkan dependensi yang sesuai dengan konektor yang diperlukan untuk penyebaran, dan setel scope ke compile. Dengan cara ini, dependensi yang sesuai dengan konektor yang diperlukan termasuk dalam paket JAR. Nilai default dari scope adalah compile. Dalam kode berikut, konektor Kafka digunakan sebagai contoh.

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka_2.11</artifactId>
          <version>${flink.version}</version>
      </dependency>
    • Kami sarankan Anda tidak menambahkan dependensi untuk Realtime Compute for Apache Flink, Hadoop, atau Log4j. Perhatikan pengecualian berikut:

      • Jika penyebaran memiliki dependensi langsung untuk konfigurasi dasar atau konektor, kami sarankan Anda menyetel scope ke provided. Contoh kode:

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <scope>provided</scope>
        </dependency>
      • Jika penyebaran memiliki dependensi tidak langsung untuk konfigurasi dasar atau konektor, kami sarankan Anda mengonfigurasi pengecualian untuk menghapus dependensi. Contoh kode:

        <dependency>
            <groupId>foo</groupId>
              <artifactId>bar</artifactId>
              <exclusions>
                <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
               </exclusion>
            </exclusions>
        </dependency>

Apa yang harus saya lakukan jika pesan kesalahan "Could not parse type at position 50: expected but was . Input type string: ROW" muncul?

  • Deskripsi Masalah

    Ketika Anda menulis pernyataan SQL yang mencakup UDTF di editor SQL, kesalahan pemeriksaan sintaksis terjadi. Kesalahan sintaksis digarisbawahi dengan garis bergelombang merah.

    Caused by: org.apache.flink.table.api.ValidationException: Could not parse type at position 50: <IDENTIFIER> expected but was <KEYWORD>. Input type string: ROW<resultId String,pointRange String,from String,to String,type String,pointScope String,userId String,point String,triggerSource String,time String,uuid String>

    Contoh kode:

    @FunctionHint(
        //input = @DataTypeHint("BYTES"),
        output = @DataTypeHint("ROW<resultId String,pointRange String,from String,to String,type String,pointScope String,userId String,point String,triggerSource String,time String,uuid String>"))
    public class PointChangeMetaQPaser1 extends TableFunction<Row> {
    
        Logger logger = LoggerFactory.getLogger(this.getClass().getName());
    
        public void eval(byte[] bytes) {
            try {
                String messageBody = new String(bytes, "UTF-8");
                Map<String, String> resultDO = JSON.parseObject(messageBody, Map.class);
                logger.info("PointChangeMetaQPaser1 logger:" + JSON.toJSONString(resultDO));
    
                collect(Row.of(
                        getString(resultDO.get("resultId")),
                        getString(resultDO.get("pointRange")),
                        getString(resultDO.get("from")),
                        getString(resultDO.get("to")),
                        getString(resultDO.get("type")),
                        getString(resultDO.get("pointScope")),
                        getString(resultDO.get("userId")),
                        getString(resultDO.get("point")),
                        getString(resultDO.getOrDefault("triggerSource", "NULL")),
                        getString(resultDO.getOrDefault("time", String.valueOf(System.currentTimeMillis()))),
                        getString(resultDO.getOrDefault("uuid", String.valueOf(UUID.randomUUID())))
                ));
            } catch (Exception e) {
                logger.error("PointChangeMetaQPaser1 error", e);
            }
        }
    
        private String getString(Object o) {
            if (o == null) {
                return null;
            }
            return String.valueOf(o);
        }
    }
  • Penyebab

    Ketika Anda menggunakan anotasi @DataTypeHint untuk menentukan tipe data nilai input dan output untuk UDTF, kata kunci yang dicadangkan oleh sistem digunakan sebagai nama bidang.

  • Solusi

    • Ubah nama bidang menjadi nama yang berbeda dari kata kunci tersebut. Misalnya, jika kata kuncinya adalah to, ubah nama bidang menjadi fto; jika kata kuncinya adalah from, ubah menjadi ffrom.

    • Tutup nama bidang yang namanya sama dengan kata kunci yang dicadangkan dalam sepasang tanda kutip grave (``).

Mengapa saya mendapatkan kesalahan "Invalid primary key. Column 'xxx' is nullable." saat menulis ke tabel?

  • Penyebab

    Ini adalah validasi ketat oleh Flink terkait semantik kunci primer. Flink mengharuskan semua kolom kunci primer dideklarasikan secara eksplisit sebagai NOT NULL. Bahkan jika data tersebut tidak berisi nilai NULL, Flink akan menolak operasi tersebut selama fase penguraian DDL jika kolom kunci primer dalam definisi tabel mengizinkan NULL (misalnya, INT NULL). Ini merupakan pemeriksaan semantik selama penguraian DDL, bukan kesalahan waktu proses.

  • Solusi

    Buat ulang tabel dengan mendeklarasikan kolom kunci primer yang terlibat dalam kesalahan sebagai NOT NULL.

Mengapa file JSON saya ditampilkan di browser alih-alih langsung diunduh?

  • Deskripsi

    Ketika Anda mencoba mengunduh file JSON dari halaman Artifacts, file tersebut terbuka di tab browser baru untuk dilihat alih-alih memulai unduhan langsung.

  • Penyebab

    File JSON di OSS tidak memiliki header respons HTTP Content-Disposition: attachment. Browser mengartikan ketidakhadiran header ini sebagai sinyal untuk merender konten secara langsung.

  • Solusi

    • Metode 1: Unggah ulang file JSON

      Bug ini telah diperbaiki untuk file yang diunggah mulai Mei 2025. Untuk file yang diunggah sebelum tanggal tersebut, unggah ulang file tersebut untuk menerapkan perbaikan.

    • Metode 2: Perbarui metadata objek OSS

      Tambahkan secara manual header Content-Disposition ke metadata objek OSS Anda

      • Nama header: Content-Disposition

      • Nilai header: attachment

      Untuk informasi selengkapnya, lihat Mengelola metadata objek.