全部產品
Search
文件中心

Realtime Compute for Apache Flink:開發及調試

更新時間:Nov 06, 2025

本文為您介紹開發及調試有關的常見問題。

在DDL和DML同在一個文本中提交運行時,DDL需要怎麼聲明?

在DDL和DML同在一個文本中提交運行時,DDL需要聲明為CREATE TEMPORARY TABLE,而不是聲明為CREATE TABLE。否則單擊深度檢查後,出現報錯詳情如下。

image

多個INSERT INTO語句需要怎麼寫?

將多個INSERT INTO語句寫在BEGIN STATEMENT SET;END;之間組成一個邏輯單元。詳情請參見INSERT INTO語句。否則單擊深度檢查後,驗證報錯詳情如下。

image

使用Entry point Main Arguments傳參數時,需要傳特殊字元,應該如何處理?

  • 問題原因

    使用Entry Point Main Arguments傳參時,需要傳特殊字元時,例如#$,使用反斜線(\)轉義也無法識別,特殊字元出現會被丟棄。

  • 解決方案

    作業營運頁面單擊目標作業名稱,在運行參數配置地區其他配置中添加參數env.java.opts: -Dconfig.disable-inline-comment=true,具體操作請參見如何配置自訂的作業運行參數?

為什麼相同UDF JAR包在經過多次修改後上傳失敗?

  • 問題原因

    因為UDF裡面限制了JAR包之間的類名不能重複。

  • 解決方案

    • 刪除後重新上傳。

    • 在附加依賴檔案中上傳JAR包,並在代碼中使用臨時函數。臨時函數使用方式詳情請參見註冊UDF。樣本如下。

      CREATE TEMPORARY FUNCTION `cp_record_reduce` AS 'com.taobao.test.udf.blink.CPRecordReduceUDF';

      image

為什麼使用POJO類作為UDTF傳回型別時欄位會出現“錯位”?

  • 問題詳情

    當使用POJO類作為UDTF傳回型別,並在SQL中顯式聲明了UDTF返回列的別名列表(Alias Name)時,可能會出現欄位錯位(即使類型一致,但實際使用的欄位可能與預期不符)問題。

    例如,如果使用如下POJO類作為UDTF的傳回型別,並根據自訂函數開發的要求進行打包並完成函數註冊(這裡使用作業級自訂函數註冊方式)後,SQL校正會失敗。

    package com.aliyun.example;
    
    public class TestPojoWithoutConstructor {
    	public int c;
    	public String d;
    	public boolean a;
    	public String b;
    }
    package com.aliyun.example;
    
    import org.apache.flink.table.functions.TableFunction;
    
    public class MyTableFuncPojoWithoutConstructor extends TableFunction<TestPojoWithoutConstructor> {
    	private static final long serialVersionUID = 1L;
    
    	public void eval(String str1, Integer i2) {
    		TestPojoWithoutConstructor p = new TestPojoWithoutConstructor();
    		p.d = str1 + "_d";
    		p.c = i2 + 2;
    		p.b = str1 + "_b";
    		collect(p);
    	}
    }
    CREATE TEMPORARY FUNCTION MyTableFuncPojoWithoutConstructor as 'com.aliyun.example.MyTableFuncPojoWithoutConstructor';
    
    CREATE TEMPORARY TABLE src ( 
      id STRING,
      cnt INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE sink ( 
      f1 INT,
      f2 STRING,
      f3 BOOLEAN,
      f4 STRING
    ) WITH (
     'connector' = 'print'
    );
    
    INSERT INTO sink
    SELECT T.* FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b);

    SQL校正報錯資訊如下:

    org.apache.flink.table.api.ValidationException: SQL validation failed. Column types of query result and sink for 'vvp.default.sink' do not match.
    Cause: Sink column 'f1' at position 0 is of type INT but expression in the query is of type BOOLEAN NOT NULL.
    Hint: You will need to rewrite or cast the expression.
    
    Query schema: [c: BOOLEAN NOT NULL, d: STRING, a: INT NOT NULL, b: STRING]
    Sink schema:  [f1: INT, f2: STRING, f3: BOOLEAN, f4: STRING]
    	at org.apache.flink.table.sqlserver.utils.FormatValidatorExceptionUtils.newValidationException(FormatValidatorExceptionUtils.java:41)

    看起來從UDTF返回的欄位和POJO類中的欄位可能錯位了,SQL中欄位c最終是BOOLEAN,而欄位a是INT類型,和POJO類的定義恰好相反。

  • 問題原因

    根據POJO類的類型規則:

    • 如果POJO類實現了有參建構函式,推導的傳回型別會按建構函式的參數列表順序。

    • 如果POJO類缺少有參建構函式,就會按欄位名的字典序重新排列。

    在上述樣本中,由於UDTF傳回型別缺少有參建構函式,因此對應的傳回型別為BOOLEAN a, VARCHAR(2147483647) b, INTEGER c, VARCHAR(2147483647) d)。雖然這一步並沒有產生錯誤,但因為SQL中對返回欄位加了重新命名列表LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b),這導致對推匯出的類型顯式進行了重新命名(基於欄位位置進行映射),進而引發與POJO類中的欄位錯位問題,出現校正異常或非預期的資料錯位問題。

  • 解決方案

    • POJO類缺少有參建構函式時,去掉對UDTF返回欄位的顯式重新命名,如將上述SQL的INSERT語句改為:

      -- POJO類無有參建構函式時,推薦明確選取需要的欄位名,使用 T.* 時需要明確知曉實際返回的欄位順序。
      SELECT T.c, T.d, T.a, T.b FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T;
    • POJO類實現有參建構函式,以確定傳回型別的欄位順序。這種情況下UDTF傳回型別的欄位順序就是有參建構函式的參數順序。

      package com.aliyun.example;
      
      public class TestPojoWithConstructor {
      	public int c;
      	public String d;
      	public boolean a;
      	public String b;
      
      	// Using specific fields order instead of alphabetical order
      	public TestPojoWithConstructor(int c, String d, boolean a, String b) {
      		this.c = c;
      		this.d = d;
      		this.a = a;
      		this.b = b;
      	}
      }

如何解決Flink依賴衝突問題?

  • 問題現象

    • 有明顯報錯,且引發錯誤的為Flink或Hadoop相關類。

      java.lang.AbstractMethodError
      java.lang.ClassNotFoundException
      java.lang.IllegalAccessError
      java.lang.IllegalAccessException
      java.lang.InstantiationError
      java.lang.InstantiationException
      java.lang.InvocationTargetException
      java.lang.NoClassDefFoundError
      java.lang.NoSuchFieldError
      java.lang.NoSuchFieldException
      java.lang.NoSuchMethodError
      java.lang.NoSuchMethodException
    • 無明顯報錯,但會引起一些不符合預期的現象,例如:

      • 日誌不輸出或log4j配置不生效。

        該類問題通常是由於依賴中攜帶了log4j相關配置導致的。需要排查作業JAR包中是否引入了log4j配置的依賴,可以通過在dependency中配置exclusions的方式去掉log4j配置。

        說明

        如果必須要使用不同版本的log4j,需要使用maven-shade-plugin將log4j相關的類進行relocation。

      • RPC調用異常。

        Flink的Akka RPC調用出現依賴衝突可能導致的異常,預設不會顯示在日誌中,需要開啟Debug日誌進行確認。

        例如,Debug日誌中出現Cannot allocate the requested resources. Trying to allocate ResourceProfile{xxx},但是JM日誌在Registering TaskManager with ResourceID xxx後,沒有下文,直到資源請求逾時報錯NoResourceAvailableException。此外TM持續報錯Cannot allocate the requested resources. Trying to allocate ResourceProfile{xxx}

        原因:開啟Debug日誌後,發現RPC調用報錯InvocationTargetException,該報錯導致TM Slot分配到一半失敗出現狀態不一致,RM持續嘗試分配Slot失敗無法恢複。

  • 問題原因

    • 作業JAR包中包含了不必要的依賴(例如基本配置、Flink、Hadoop和log4j依賴),造成依賴衝突從而引發各種問題。

    • 作業需要的Connector對應的依賴未被打入JAR包中。

  • 排查方法

    • 查看作業pom.xml檔案,判斷是否存在不必要的依賴。

    • 通過jar tf foo.jar命令查看作業JAR包內容,判斷是否存在引發依賴衝突的內容。

    • 通過mvn dependency:tree命令查看作業的依賴關係,判斷是否存在衝突的依賴。

  • 解決方案

    • 基本配置建議將scope全部設定為provided,即不打入作業JAR包。

      • DataStream Java

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
      • DataStream Scala

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
      • DataSet Java

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
      • DataSet Scala

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
    • 添加作業需要的Connector對應的依賴,並將scope全部設定為compile(預設的scope是compile),即打入作業JAR包中,以Kafka Connector為例,代碼如下。

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka_2.11</artifactId>
          <version>${flink.version}</version>
      </dependency>
    • 其他Flink、Hadoop和log4j依賴不建議添加。但是:

      • 如果作業本身存在基本配置或Connector相關的直接依賴,建議將scope設定為provided,樣本如下。

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <scope>provided</scope>
        </dependency>
      • 如果作業存在基本配置或Connector相關的間接依賴,建議通過exclusion將依賴去掉,樣本如下。

        <dependency>
            <groupId>foo</groupId>
              <artifactId>bar</artifactId>
              <exclusions>
                <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
               </exclusion>
            </exclusions>
        </dependency>

報錯:Could not parse type at position 50: expected but was . Input type string: ROW

  • 報錯詳情

    在SQL編輯器中編寫SQL時,使用UDTF出現語法檢查錯誤(紅色波浪線)。

    Caused by: org.apache.flink.table.api.ValidationException: Could not parse type at position 50: <IDENTIFIER> expected but was <KEYWORD>. Input type string: ROW<resultId String,pointRange String,from String,to String,type String,pointScope String,userId String,point String,triggerSource String,time String,uuid String>

    代碼如下:

    @FunctionHint(
        //input = @DataTypeHint("BYTES"),
        output = @DataTypeHint("ROW<resultId String,pointRange String,from String,to String,type String,pointScope String,userId String,point String,triggerSource String,time String,uuid String>"))
    public class PointChangeMetaQPaser1 extends TableFunction<Row> {
    
        Logger logger = LoggerFactory.getLogger(this.getClass().getName());
    
        public void eval(byte[] bytes) {
            try {
                String messageBody = new String(bytes, "UTF-8");
                Map<String, String> resultDO = JSON.parseObject(messageBody, Map.class);
                logger.info("PointChangeMetaQPaser1 logger:" + JSON.toJSONString(resultDO));
    
                collect(Row.of(
                        getString(resultDO.get("resultId")),
                        getString(resultDO.get("pointRange")),
                        getString(resultDO.get("from")),
                        getString(resultDO.get("to")),
                        getString(resultDO.get("type")),
                        getString(resultDO.get("pointScope")),
                        getString(resultDO.get("userId")),
                        getString(resultDO.get("point")),
                        getString(resultDO.getOrDefault("triggerSource", "NULL")),
                        getString(resultDO.getOrDefault("time", String.valueOf(System.currentTimeMillis()))),
                        getString(resultDO.getOrDefault("uuid", String.valueOf(UUID.randomUUID())))
                ));
            } catch (Exception e) {
                logger.error("PointChangeMetaQPaser1 error", e);
            }
        }
    
        private String getString(Object o) {
            if (o == null) {
                return null;
            }
            return String.valueOf(o);
        }
    }
  • 報錯原因

    當使用DataTypeHint定義函數的資料類型時,系統保留的關鍵字被直接作為了欄位名稱。

  • 解決方案

    • 將變數名換成非關鍵字的名稱,例如to換成fto,from換成ffrom等。

    • 將已經用關鍵字取名的變數名加上反撇號(``)。

寫入表時報錯:“Invalid primary key. Column 'xxx' is nullable.”

  • 報錯原因

    這是 Flink 對主鍵語義的強制校正。Flink 要求:所有主鍵列必須顯式聲明為 NOT NULL。即使資料中沒有 NULL 值,只要建表語句中主鍵列允許 NULL(如 INT NULL),Flink 就會在寫入前拒絕操作。這不是執行階段錯誤,而是 DDL 解析階段的語義檢查。

  • 解決方案

    將報錯中涉及的主鍵列聲明為NOT NULL後重建立表。

JSON 檔案下載時在瀏覽器中開啟而非直接下載

  • 問題現象

    在檔案管理介面點擊下載 JSON 檔案時,瀏覽器未觸發下載,而是新開標籤頁直接展示 JSON 內容。

  • 問題原因

    OSS 中該 JSON 檔案缺少Content-Disposition: attachmentHTTP 回應標頭,導致瀏覽器按預設行為將其作為可渲染內容直接顯示。

  • 解決方案

    • 方案一:重新上傳檔案

      該問題已在平台 4.5.0 版本修複,但僅對 2025 年 5 月之後上傳的檔案生效。此前上傳的檔案仍需手動處理。

    • 方案二:修改 OSS 對象中繼資料

      手動為對應 Object 添加如下 HTTP 標準屬性:

      • Header 名稱:Content-Disposition

      • Header 值:attachment

      詳情請參見管理檔案中繼資料