すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:開発とデバッグ

最終更新日:Nov 09, 2025

このトピックでは、開発とデバッグに関するよくある質問への回答を提供します。

同じスクリプトで DDL 文と DML 文を実行する場合、DDL 文をどのように宣言すればよいですか?

同じスクリプトで DDL 文と DML 文を実行する場合、DDL 文は CREATE TEMPORARY TABLE として宣言する必要があり、CREATE TABLE ではありません。そうしないと、[検証] をクリックしたときにエラーが返されます。次の図はエラーの詳細を示しています。

image

複数の INSERT INTO 文を記述するにはどうすればよいですか?

複数の INSERT INTO 文は、論理的な単位を形成するために BEGIN STATEMENT SET;END; の間に記述する必要があります。詳細については、「INSERT INTO 文」をご参照ください。そうしないと、[検証] をクリックしたときに検証エラーが発生します。次の図はエラーの詳細を示しています。

image

エントリポイントの main 引数として特殊文字を渡すにはどうすればよいですか?

  • 原因

    エントリポイントの main 引数を使用してパラメーターを渡す場合、# や $ などの特殊文字は破棄されます。バックスラッシュ (\) を使用してエスケープしても識別できません。

  • 解決策

    [ジョブ O&M] ページで、対象のジョブの名前をクリックします。[実行時パラメーター設定] セクションで、env.java.opts: -Dconfig.disable-inline-comment=true パラメーターを [その他の設定] に追加します。詳細については、「カスタムジョブ実行時パラメーターを設定する方法」をご参照ください。

UDF JAR パッケージを複数回変更した後にアップロードが失敗するのはなぜですか?

  • 原因

    ユーザー定義関数 (UDF) には、JAR パッケージ間でクラス名を重複できないという制限があります。

  • 解決策

    • 既存のパッケージを削除して、再度アップロードします。

    • JAR パッケージを追加の依存関係としてアップロードし、コードで一時的な関数を使用します。一時的な関数の使用方法の詳細については、「UDF の登録」をご参照ください。例:

      CREATE TEMPORARY FUNCTION `cp_record_reduce` AS 'com.taobao.test.udf.blink.CPRecordReduceUDF';

      image

POJO クラスを UDTF の戻り値の型として使用すると、フィールドがずれるのはなぜですか?

  • 問題の詳細

    Plain Old Java Object (POJO) クラスをユーザー定義のテーブル値関数 (UDTF) の戻り値の型として使用し、SQL で返される列のエイリアスリストを明示的に宣言すると、フィールドの不整合が発生する可能性があります。これは、データ型が一致していても、使用されるフィールドが期待したものではない可能性があることを意味します。

    たとえば、次の POJO クラスを UDTF の戻り値の型として使用し、「ユーザー定義関数の開発」の要件に従ってパッケージ化し、「ジョブレベルのユーザー定義関数」として登録すると、SQL の検証は失敗します。

    package com.aliyun.example;
    
    public class TestPojoWithoutConstructor {
    	public int c;
    	public String d;
    	public boolean a;
    	public String b;
    }
    package com.aliyun.example;
    
    import org.apache.flink.table.functions.TableFunction;
    
    public class MyTableFuncPojoWithoutConstructor extends TableFunction<TestPojoWithoutConstructor> {
    	private static final long serialVersionUID = 1L;
    
    	public void eval(String str1, Integer i2) {
    		TestPojoWithoutConstructor p = new TestPojoWithoutConstructor();
    		p.d = str1 + "_d";
    		p.c = i2 + 2;
    		p.b = str1 + "_b";
    		collect(p);
    	}
    }
    CREATE TEMPORARY FUNCTION MyTableFuncPojoWithoutConstructor as 'com.aliyun.example.MyTableFuncPojoWithoutConstructor';
    
    CREATE TEMPORARY TABLE src ( 
      id STRING,
      cnt INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE sink ( 
      f1 INT,
      f2 STRING,
      f3 BOOLEAN,
      f4 STRING
    ) WITH (
     'connector' = 'print'
    );
    
    INSERT INTO sink
    SELECT T.* FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b);

    次の SQL 検証エラーが報告されます:

    org.apache.flink.table.api.ValidationException: SQL validation failed. Column types of query result and sink for 'vvp.default.sink' do not match.
    Cause: Sink column 'f1' at position 0 is of type INT but expression in the query is of type BOOLEAN NOT NULL.
    Hint: You will need to rewrite or cast the expression.
    
    Query schema: [c: BOOLEAN NOT NULL, d: STRING, a: INT NOT NULL, b: STRING]
    Sink schema:  [f1: INT, f2: STRING, f3: BOOLEAN, f4: STRING]
    	at org.apache.flink.table.sqlserver.utils.FormatValidatorExceptionUtils.newValidationException(FormatValidatorExceptionUtils.java:41)

    UDTF から返されたフィールドが POJO クラスのフィールドとずれているように見えます。SQL では、フィールド `c` は BOOLEAN で、フィールド `a` は INT であり、これは POJO クラスでの定義とは逆です。

  • 原因

    POJO クラスの型ルールによると:

    • POJO クラスにパラメーター化されたコンストラクターがある場合、推論される戻り値の型はコンストラクターのパラメーターの順序に従います。

    • POJO クラスにパラメーター化されたコンストラクターがない場合、フィールドはフィールド名の辞書順に並べ替えられます。

    この例では、UDTF の戻り値の型のための POJO クラスには、パラメーター化されたコンストラクターがありません。したがって、対応する戻り値の型は BOOLEAN a, VARCHAR(2147483647) b, INTEGER c, VARCHAR(2147483647) d) です。このステップではエラーは発生しませんが、SQL 文には返されるフィールドの名前変更リストが含まれています: LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b)。これにより、フィールドの位置に基づいて推論された型が明示的に名前変更され、POJO クラスのフィールドとの不一致が発生します。この不一致は、検証例外や予期しないデータの不整合につながる可能性があります。

  • 解決策

    • POJO クラスにパラメーター化されたコンストラクターがない場合は、UDTF の返されたフィールドの明示的な名前変更を削除します。たとえば、前の SQL の INSERT 文を次のように変更します:

      -- POJO クラスにパラメーター化されたコンストラクターがない場合は、必要なフィールド名を明示的に選択します。T.* を使用する場合は、返されるフィールドの実際の順序を知る必要があります。
      SELECT T.c, T.d, T.a, T.b FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T;
    • POJO クラスにパラメーター化されたコンストラクターを実装して、戻り値の型のフィールドの順序を定義します。この場合、UDTF の戻り値の型のフィールド順序は、コンストラクターのパラメーター順序と一致します。

      package com.aliyun.example;
      
      public class TestPojoWithConstructor {
      	public int c;
      	public String d;
      	public boolean a;
      	public String b;
      
      	// アルファベット順ではなく、特定のフィールド順を使用する
      	public TestPojoWithConstructor(int c, String d, boolean a, String b) {
      		this.c = c;
      		this.d = d;
      		this.a = a;
      		this.b = b;
      	}
      }

Flink の依存関係の競合を解決するにはどうすればよいですか?

  • 症状

    • 明示的なエラーが報告され、そのエラーは Flink または Hadoop 関連のクラスが原因です。

      java.lang.AbstractMethodError
      java.lang.ClassNotFoundException
      java.lang.IllegalAccessError
      java.lang.IllegalAccessException
      java.lang.InstantiationError
      java.lang.InstantiationException
      java.lang.InvocationTargetException
      java.lang.NoClassDefFoundError
      java.lang.NoSuchFieldError
      java.lang.NoSuchFieldException
      java.lang.NoSuchMethodError
      java.lang.NoSuchMethodException
    • 明示的なエラーは報告されませんが、次のような予期しない動作が発生します:

      • ログが生成されない、または log4j の設定が有効にならない。

        この種の問題は通常、依存関係に含まれる log4j 関連の設定が原因です。ジョブの JAR パッケージに log4j の設定を導入する依存関係があるかどうかを確認してください。依存関係の除外を使用して log4j の設定を削除できます。

        説明

        異なるバージョンの log4j を使用する必要がある場合は、maven-shade-plugin を使用して log4j 関連のクラスを再配置します。

      • リモートプロシージャコール (RPC) の呼び出し例外。

        Flink の Akka RPC 呼び出しにおける依存関係の競合によって引き起こされる例外は、デフォルトではログに表示されません。これらを確認するには、デバッグログを有効にする必要があります。

        たとえば、デバッグログには Cannot allocate the requested resources. Trying to allocate ResourceProfile{xxx} と表示されますが、Registering TaskManager with ResourceID xxx の後、JobManager (JM) のログには、リソースリクエストのタイムアウトエラー NoResourceAvailableException が発生するまで、それ以上のメッセージは表示されません。さらに、TaskManager (TM) は継続的にエラー Cannot allocate the requested resources. Trying to allocate ResourceProfile{xxx} を報告します。

        原因: デバッグログを有効にすると、RPC 呼び出し中に InvocationTargetException エラーが発生することがわかります。このエラーにより、TM のスロット割り当てが途中で失敗し、不整合な状態になります。ResourceManager (RM) は継続的にスロットの割り当てに失敗し、回復できません。

  • 原因

    • ジョブの JAR パッケージには、基本設定、Flink、Hadoop、log4j の依存関係など、不要な依存関係が含まれています。これにより、依存関係の競合やさまざまな問題が発生します。

    • ジョブで必要なコネクタの依存関係が JAR パッケージに含まれていません。

  • トラブルシューティング方法

    • ジョブの pom.xml ファイルをチェックして、不要な依存関係が含まれていないかを確認します。

    • jar tf foo.jar コマンドを実行して、ジョブの JAR パッケージの内容を表示し、依存関係の競合を引き起こす可能性のある内容をチェックします。

    • mvn dependency:tree コマンドを実行して、ジョブの依存関係ツリーを表示し、競合する依存関係をチェックします。

  • 解決策

    • 基本設定については、スコープを provided に設定して、ジョブの JAR ファイルにパッケージ化されないようにします。

      • DataStream Java

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
      • DataStream Scala

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
      • DataSet Java

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
      • DataSet Scala

        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_2.11</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
    • ジョブが必要とするコネクタの依存関係を追加します。それらのスコープを compile (デフォルトのスコープ) に設定して、ジョブの JAR ファイルにパッケージ化されるようにします。次のコードは、Kafka コネクタを例として使用しています。

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka_2.11</artifactId>
          <version>${flink.version}</version>
      </dependency>
    • 他の Flink、Hadoop、および log4j の依存関係は追加しないでください。ただし:

      • ジョブが基本設定またはコネクタに直接依存している場合は、そのスコープを provided に設定します。例:

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <scope>provided</scope>
        </dependency>
      • ジョブが基本設定またはコネクタに間接的に依存している場合は、除外を使用して依存関係を削除します。例:

        <dependency>
            <groupId>foo</groupId>
              <artifactId>bar</artifactId>
              <exclusions>
                <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
               </exclusion>
            </exclusions>
        </dependency>

エラー: 位置 50 で型を解析できませんでした: 期待値: 、実際値: 。入力文字列: ROW

  • エラーの詳細

    SQL エディターで SQL を記述すると、UDTF に対して構文チェックエラー (赤い波線) が発生します。

    Caused by: org.apache.flink.table.api.ValidationException: Could not parse type at position 50: <IDENTIFIER> expected but was <KEYWORD>. Input type string: ROW<resultId String,pointRange String,from String,to String,type String,pointScope String,userId String,point String,triggerSource String,time String,uuid String>

    サンプルコード:

    @FunctionHint(
        //input = @DataTypeHint("BYTES"),
        output = @DataTypeHint("ROW<resultId String,pointRange String,from String,to String,type String,pointScope String,userId String,point String,triggerSource String,time String,uuid String>"))
    public class PointChangeMetaQPaser1 extends TableFunction<Row> {
    
        Logger logger = LoggerFactory.getLogger(this.getClass().getName());
    
        public void eval(byte[] bytes) {
            try {
                String messageBody = new String(bytes, "UTF-8");
                Map<String, String> resultDO = JSON.parseObject(messageBody, Map.class);
                logger.info("PointChangeMetaQPaser1 logger:" + JSON.toJSONString(resultDO));
    
                collect(Row.of(
                        getString(resultDO.get("resultId")),
                        getString(resultDO.get("pointRange")),
                        getString(resultDO.get("from")),
                        getString(resultDO.get("to")),
                        getString(resultDO.get("type")),
                        getString(resultDO.get("pointScope")),
                        getString(resultDO.get("userId")),
                        getString(resultDO.get("point")),
                        getString(resultDO.getOrDefault("triggerSource", "NULL")),
                        getString(resultDO.getOrDefault("time", String.valueOf(System.currentTimeMillis()))),
                        getString(resultDO.getOrDefault("uuid", String.valueOf(UUID.randomUUID())))
                ));
            } catch (Exception e) {
                logger.error("PointChangeMetaQPaser1 error", e);
            }
        }
    
        private String getString(Object o) {
            if (o == null) {
                return null;
            }
            return String.valueOf(o);
        }
    }
  • 原因

    DataTypeHint を使用して関数のデータ型を定義するときに、システムの予約キーワードがフィールド名として使用されています。

  • 解決策

    • 変数名をキーワード以外の名前に変更します。たとえば、`to` を `fto` に、`from` を `ffrom` に変更します。

    • 変数名として使用されるキーワードをバックティック (``) で囲みます。

テーブルへの書き込み時のエラー: "Invalid primary key. Column 'xxx' is nullable."

  • 原因

    これは Flink によるプライマリキーのセマンティクスの必須チェックです。Flink は、すべてのプライマリキー列を明示的に NOT NULL として宣言することを要求します。データに NULL 値が含まれていなくても、テーブル定義のプライマリキー列が INT NULL のように NULL 値を許可している場合、Flink は書き込み前に操作を拒否します。これは、実行時エラーではなく、DDL 解析フェーズでのセマンティックチェックです。

  • 解決策

    エラーで言及されているプライマリキー列を NOT NULL として宣言し、テーブルを再作成します。

JSON ファイルがダウンロードされずにブラウザで開かれるのはなぜですか?

  • 症状

    ファイル管理インターフェイスから JSON ファイルをダウンロードするためにクリックすると、ブラウザはダウンロードを開始する代わりに、新しいタブでファイルを開いてその内容を表示します。

  • 原因

    Object Storage Service (OSS) の JSON ファイルに Content-Disposition: attachment HTTP 応答ヘッダーがありません。これにより、ブラウザはデフォルトの動作に従い、ファイルをレンダリング可能なコンテンツとして表示します。

  • 解決策

    • 解決策 1: ファイルを再アップロードする

      この問題はプラットフォームバージョン 4.5.0 で修正されていますが、この修正は 2025 年 5 月以降にアップロードされたファイルにのみ適用されます。この日付より前にアップロードされたファイルは手動で処理する必要があります。

    • 解決策 2: OSS オブジェクトのメタデータを変更する

      対応するオブジェクトに、次の標準 HTTP プロパティを手動で追加します:

      • ヘッダー名: Content-Disposition

      • ヘッダー値: attachment

      詳細については、「ファイルメタデータの管理」をご参照ください。