This topic provides answers to some frequently asked questions about draft development and deployment debugging.
How do I declare a DDL statement when I execute DDL and DML statements in the same script?
When you execute a DDL statement and a DML statement in the same script, declare CREATE TEMPORARY TABLE instead of CREATE TABLE in the DDL statement. Otherwise, an error message appears after you click Validate.
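For example, a minimal script that passes validation; the datagen source and print sink are placeholders for your actual tables:
CREATE TEMPORARY TABLE src (
  id BIGINT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE sink (
  id BIGINT
) WITH (
  'connector' = 'print'
);

INSERT INTO sink SELECT id FROM src;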
How do I write multiple INSERT INTO statements?
Write the INSERT INTO statements between BEGIN STATEMENT SET; and END; so that they form a single logical unit. For more information, see INSERT INTO statement. Otherwise, an error message appears after you click Validate.
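For example, a minimal sketch that writes one source to two sinks; src, sink1, and sink2 are assumed to be temporary tables that you declared earlier in the script:
BEGIN STATEMENT SET;
INSERT INTO sink1 SELECT id FROM src;
INSERT INTO sink2 SELECT id FROM src;
END;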
What do I do if I want to specify a special character in the value of the Entry Point Main Arguments parameter?
Cause
If the value of the Entry Point Main Arguments parameter contains a special character, such as a number sign (#) or a dollar sign ($), the special character cannot be identified even if it is escaped with backslashes (\). As a result, the special character is discarded.
Solution
On the Deployments page, click the name of the desired deployment. In the Parameters section of the Configuration tab, add env.java.opts: -Dconfig.disable-inline-comment=true to the Other Configuration field. For more information, see Console operations.
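For example, with this configuration in place, a hypothetical Entry Point Main Arguments value such as --tag a#b$c reaches the main method intact instead of being truncated at the number sign (#), which would otherwise be treated as the start of an inline comment.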
Why does a UDF JAR package fail to be uploaded after the package is modified multiple times?
Cause
The JAR package that you want to upload contains a class that is named the same as a class in an existing JAR package. This causes a user-defined function (UDF) conflict.
Solution
Delete the JAR package and upload the package again.
Alternatively, in the development console of Realtime Compute for Apache Flink, add the following statement to the SQL editor to register a temporary function before you upload the JAR package. Then, click the Upload icon in the Additional Dependencies section of the Configurations tab on the ETL page to upload the JAR package. For more information about how to use temporary functions, see Register a UDF. Example:
CREATE TEMPORARY FUNCTION `cp_record_reduce` AS 'com.taobao.test.udf.blink.CPRecordReduceUDF';
Why are fields misaligned when I use a POJO class as the return type of a UDTF?
Problem description
If a Plain Old Java Object (POJO) class is used as the return type of a user-defined table-valued function (UDTF) and aliases for the returned fields are explicitly declared in the SQL statement, the fields may be misaligned. In this case, the values that you obtain may come from the wrong fields, even if the declared data types happen to match.
For example, SQL validation fails in the following scenario: You use the following POJO class as the return type of a UDTF, package the UDTF based on the requirements described in Overview, and register the UDTF based on the requirements described in Register a deployment-level UDF.
package com.aliyun.example;

public class TestPojoWithoutConstructor {
    public int c;
    public String d;
    public boolean a;
    public String b;
}
package com.aliyun.example;

import org.apache.flink.table.functions.TableFunction;

public class MyTableFuncPojoWithoutConstructor extends TableFunction<TestPojoWithoutConstructor> {
    private static final long serialVersionUID = 1L;

    public void eval(String str1, Integer i2) {
        TestPojoWithoutConstructor p = new TestPojoWithoutConstructor();
        p.d = str1 + "_d";
        p.c = i2 + 2;
        p.b = str1 + "_b";
        collect(p);
    }
}
CREATE TEMPORARY FUNCTION MyTableFuncPojoWithoutConstructor AS 'com.aliyun.example.MyTableFuncPojoWithoutConstructor';

CREATE TEMPORARY TABLE src (
  id STRING,
  cnt INT
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE sink (
  f1 INT,
  f2 STRING,
  f3 BOOLEAN,
  f4 STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO sink
SELECT T.*
FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b);
The following error message for SQL verification is reported:
org.apache.flink.table.api.ValidationException: SQL validation failed.
Column types of query result and sink for 'vvp.default.sink' do not match.
Cause: Sink column 'f1' at position 0 is of type INT but expression in the query is of type BOOLEAN NOT NULL.
Hint: You will need to rewrite or cast the expression.
Query schema: [c: BOOLEAN NOT NULL, d: STRING, a: INT NOT NULL, b: STRING]
Sink schema: [f1: INT, f2: STRING, f3: BOOLEAN, f4: STRING]
    at org.apache.flink.table.sqlserver.utils.FormatValidatorExceptionUtils.newValidationException(FormatValidatorExceptionUtils.java:41)
In this example, the fields returned by the UDTF and the fields in the POJO class are misaligned: in the SQL statement, field c is of the BOOLEAN data type and field a is of the INT data type, which is the opposite of the data types declared in the POJO class.
Cause
The order of the returned fields depends on whether the POJO class implements a parameterized constructor:
If the POJO class does not implement a parameterized constructor, the fields are sorted in alphabetical order by field name.
If the POJO class implements a parameterized constructor, the fields are sorted based on the order of the parameters of the constructor.
In this example, the POJO class does not implement a parameterized constructor. As a result, the return type of the UDTF is (BOOLEAN a, VARCHAR(2147483647) b, INTEGER c, VARCHAR(2147483647) d). By itself, this does not cause an error. However, the SQL statement renames the returned fields by using the rename list LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T(c, d, a, b). The renaming is performed by field position, not by field name. As a result, the fields are misaligned when the POJO class is used, which causes validation errors or unexpected data misalignment.
Solution
If the POJO class does not implement a parameterized constructor, do not explicitly rename the fields returned by the UDTF. For example, you can change the SELECT clause in the preceding INSERT statement to the following SELECT clause:
-- If the POJO class does not implement a parameterized constructor,
-- we recommend that you select the required fields by name. If you use T.*,
-- you must know the actual order of the returned fields.
SELECT T.c, T.d, T.a, T.b
FROM src, LATERAL TABLE(MyTableFuncPojoWithoutConstructor(id, cnt)) AS T;
Alternatively, implement a parameterized constructor in the POJO class to fix the order of the returned fields. In this case, the order of the returned fields is the order of the parameters of the constructor.
package com.aliyun.example;

public class TestPojoWithConstructor {
    public int c;
    public String d;
    public boolean a;
    public String b;

    // Use a specific field order instead of alphabetical order.
    public TestPojoWithConstructor(int c, String d, boolean a, String b) {
        this.c = c;
        this.d = d;
        this.a = a;
        this.b = b;
    }
}
How do I troubleshoot dependency conflicts of Realtime Compute for Apache Flink?
Problem description
An error is reported, which is caused by an exception related to Realtime Compute for Apache Flink or Hadoop.
java.lang.AbstractMethodError
java.lang.ClassNotFoundException
java.lang.IllegalAccessError
java.lang.IllegalAccessException
java.lang.InstantiationError
java.lang.InstantiationException
java.lang.InvocationTargetException
java.lang.NoClassDefFoundError
java.lang.NoSuchFieldError
java.lang.NoSuchFieldException
java.lang.NoSuchMethodError
java.lang.NoSuchMethodException
No error is reported, but one of the following issues occurs:
Logs are not generated or the Log4j configuration does not take effect.
In most cases, this issue occurs because the dependency contains the Log4j configuration. To resolve this issue, you must check whether the dependency in the JAR file of your deployment contains the Log4j configuration. If the dependency contains the Log4j configuration, you can configure exclusions in the dependency to remove the Log4j configuration.
Note: If you use different versions of Log4j, you must use maven-shade-plugin to relocate Log4j-related classes, as shown in the following example.
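A minimal sketch of such a relocation in the pom.xml file; the com.example.shaded prefix is a placeholder that you can choose freely:
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <relocations>
              <!-- Move the bundled Log4j classes into a private namespace to avoid conflicts. -->
              <relocation>
                <pattern>org.apache.log4j</pattern>
                <shadedPattern>com.example.shaded.org.apache.log4j</shadedPattern>
              </relocation>
            </relocations>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>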
The remote procedure call (RPC) fails.
By default, errors caused by dependency conflicts during Akka RPCs of Realtime Compute for Apache Flink are not recorded in logs. To check these errors, you must enable debug logging.
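A minimal sketch of enabling debug logging, assuming the standard Flink Log4j 2 properties template; the exact logging configuration that your console exposes may differ:
# Raise the root logger level from INFO to DEBUG to surface RPC errors.
rootLogger.level = DEBUG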
For example, a debug log records the error message Cannot allocate the requested resources. Trying to allocate ResourceProfile{xxx}. However, the JobManager log stops at the message Registering TaskManager with ResourceID xxx and does not display further information until the resource request times out with a NoResourceAvailableException. In addition, the TaskManagers continuously report the error message Cannot allocate the requested resources. Trying to allocate ResourceProfile{xxx}.
Cause: After you enable debug logging, the RPC error message InvocationTargetException appears. In this case, slots fail to be allocated for the TaskManagers and the status of the TaskManagers becomes inconsistent. As a result, slots cannot be allocated and the error cannot be fixed.
Causes
The JAR package of your deployment contains unnecessary dependencies, such as the dependencies for basic configurations, Realtime Compute for Apache Flink, Hadoop, and Log4j. As a result, dependency conflicts occur and cause the preceding issues.
The dependency that corresponds to the connector that is required for your deployment is not included in the JAR package.
Identification methods
Check whether the pom.xml file of your deployment contains unnecessary dependencies.
Run the jar tf foo.jar command to view the content of the JAR package and determine whether the package contains content that causes dependency conflicts.
Run the mvn dependency:tree command to check the dependency relationships of your deployment and determine whether dependency conflicts exist.
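For example, a quick way to check whether a hypothetical foo.jar or your dependency tree pulls in Log4j classes:
jar tf foo.jar | grep log4j
mvn dependency:tree | grep log4j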
Solutions
We recommend that you set scope to provided for the dependencies for basic configurations. This way, the dependencies for basic configurations are not included in the JAR package of your deployment.
DataStream Java
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
DataStream Scala
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
DataSet Java
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
DataSet Scala
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
Add the dependencies that correspond to the connectors required for the deployment, and set scope to compile. This way, the dependencies that correspond to the required connectors are included in the JAR package. The default value of scope is compile. In the following code, the Kafka connector is used as an example.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
We recommend that you do not add the dependencies for Realtime Compute for Apache Flink, Hadoop, or Log4j. Take note of the following exceptions:
If the deployment has direct dependencies on basic configurations or connectors, we recommend that you set scope to provided. Sample code:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <scope>provided</scope>
</dependency>
If the deployment has indirect dependencies on basic configurations or connectors, we recommend that you configure exclusions to remove the dependencies. Sample code:
<dependency>
    <groupId>foo</groupId>
    <artifactId>bar</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
        </exclusion>
    </exclusions>
</dependency>
What do I do if the error message "Could not parse type at position 50: <IDENTIFIER> expected but was <KEYWORD>. Input type string: ROW" appears?
Problem description
When you write SQL statements that include a UDTF in the SQL editor, a syntax check error occurs. The syntax error is underlined with a red wavy line.
Caused by: org.apache.flink.table.api.ValidationException: Could not parse type at position 50: <IDENTIFIER> expected but was <KEYWORD>. Input type string: ROW<resultId String,pointRange String,from String,to String,type String,pointScope String,userId String,point String,triggerSource String,time String,uuid String>
Sample code:
import java.util.Map;
import java.util.UUID;

import com.alibaba.fastjson.JSON;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@FunctionHint(
        // input = @DataTypeHint("BYTES"),
        output = @DataTypeHint("ROW<resultId String,pointRange String,from String,to String,type String,pointScope String,userId String,point String,triggerSource String,time String,uuid String>"))
public class PointChangeMetaQPaser1 extends TableFunction<Row> {

    Logger logger = LoggerFactory.getLogger(this.getClass().getName());

    public void eval(byte[] bytes) {
        try {
            String messageBody = new String(bytes, "UTF-8");
            Map<String, String> resultDO = JSON.parseObject(messageBody, Map.class);
            logger.info("PointChangeMetaQPaser1 logger:" + JSON.toJSONString(resultDO));
            collect(Row.of(
                getString(resultDO.get("resultId")),
                getString(resultDO.get("pointRange")),
                getString(resultDO.get("from")),
                getString(resultDO.get("to")),
                getString(resultDO.get("type")),
                getString(resultDO.get("pointScope")),
                getString(resultDO.get("userId")),
                getString(resultDO.get("point")),
                getString(resultDO.getOrDefault("triggerSource", "NULL")),
                getString(resultDO.getOrDefault("time", String.valueOf(System.currentTimeMillis()))),
                getString(resultDO.getOrDefault("uuid", String.valueOf(UUID.randomUUID())))
            ));
        } catch (Exception e) {
            logger.error("PointChangeMetaQPaser1 error", e);
        }
    }

    private String getString(Object o) {
        if (o == null) {
            return null;
        }
        return String.valueOf(o);
    }
}
Cause
When you use the @DataTypeHint annotation to specify the data types of the input and output values for the UDTF, a keyword that is reserved by the system is used as the name of a field.
Solutions
Change the name of the field to a name that is different from the keyword. For example, if the keyword is to, change the name of the field to fto. If the keyword is from, change the name of the field to ffrom.
Enclose the name of the field that is the same as a reserved keyword in a pair of backticks (`).
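For example, a sketch of the second solution applied to the preceding annotation; the field names that may collide with reserved keywords (from, to, type, and time) are escaped with backticks, and the class body stays the same as in the preceding example:
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

@FunctionHint(
        output = @DataTypeHint("ROW<resultId STRING, pointRange STRING, `from` STRING, `to` STRING, `type` STRING, pointScope STRING, userId STRING, point STRING, triggerSource STRING, `time` STRING, uuid STRING>"))
public class PointChangeMetaQPaser1 extends TableFunction<Row> {
    // The eval method and the getString helper are unchanged from the preceding example.
}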