Dataphin has introduced syntax improvements to Flink SQL in line with the ANSI-SQL standard, aimed at boosting development efficiency. This topic outlines the new lexical features and their application within Dataphin Flink SQL.
Support cross-project reference
DDL and DML table name syntax enhancement
A typical Flink SQL task requires complete DDL and DML statements. Consider the following example:
--DDL
CREATE TABLE input_table (
field1 VARCHAR,
field2 BIGINT
) WITH (
type = 'datagen'
);
CREATE TABLE output_table (
field1 VARCHAR,
field2 BIGINT
) WITH (
type = 'print'
);
--DML
INSERT INTO output_table
SELECT field1, field2 FROM input_table;To eliminate the need for repetitive DDL statement definitions across multiple tasks and enhance development efficiency, Dataphin has refined the syntax for referencing table names in Flink SQL. Once a schema is defined in Dataphin, it can be directly referenced in any task using the syntax below, thereby avoiding repeated declarations and enabling cross-project references.
<compoundTableName> ::= [projectName <DOT>] tableNameExplanation of Example
As demonstrated by the preceding SQL example, after defining the schema, you simply need to include the following statement in your development task:
INSERT INTO output_table -- Default project prefix is the current project.
SELECT field1, field2 FROM ${project_name}.input_table;Built-in function and UDX syntax enhancement
Typically, a Flink SQL task necessitates the declaration of UDX usage. See the example below:
CREATE FUNCTION MY_UDX AS 'package.class';
...
CREATE VIEW my_view AS
SELECT MY_UDX(args)
FROM ...The custom UDX declarations above are often required in multiple tasks. To avoid this redundancy and improve efficiency, Dataphin has introduced the concept of resources. Once a function is registered in Dataphin, it can be directly referenced in any task using the syntax below, without the need for repeated declarations, and it supports cross-project references.
<compoundFunctionName> ::= [projectName <COLON>] functionNameExplanation of Example
Following the SQL example above, after registering the function in Dataphin, you only need to enter the following statement in the development task:
CREATE VIEW my_view AS SELECT ${project_name}:MY_UDX(args) -- Default project prefix is the current projectFROM.
FROM ...Special Note
Dataphin allows for the overloading or overriding of built-in functions, giving precedence to the overloaded or overridden versions. Consider the following examples:
Example 1: Including the project prefix.
SELECT ${project_name}:SUBSTRING('20200108171740', 1, 8) FROM dual;
--Dataphin will default to using the overloaded built-in function `SUBSTRING`. If the custom function is not found under `${project_name}`, it will fail during precompilation.Example 2: Omitting the project prefix.
SELECT SUBSTRING('20200108171740', 1, 8) FROM dual;
--Dataphin will prioritize finding the custom `SUBSTRING` function in the current project. If not found, it is considered a built-in function.Use UpdateStatement and TableHints to set task-level DDL parameters
In Dataphin, you can set DDL WITH parameters for existing schemas, and the platform supports the configuration of task-level parameters, such as Watermark, computed columns, primary keys, Header, and more. Additionally, Dataphin has introduced SetOption syntax support. The syntax specification is as follows:
SET {
[ projectName <DOT>] tableName <DOT>
{
propertyName
| WATERMARK
| <computedColumn>
| <primaryKey>
| <procTime>
| {fieldName <DOT> <isHeader>}
}
} <EQ> {
@variable = identifier
| @variable = literal
| @variable = expression
}When configuring WITH parameters, it is advisable to define common table DDL WITH parameters in the schema. Task-level personalized parameters can be specified in the code using the SET statement or TableHints, with multiple SET statements separated by ;.
When using Flink SQL reserved words, you need to add (` `) symbols. See the following example:
Watermark: In the SET statement, use
SET [project.]table.`watermark` = ....Partition: In the SET statement, use
SET [project.]table.`partition` = ....
DDL parameter precedence
Task-level parameter overwrite
Task-level parameter settings will overwrite the parameter settings configured in the meta table (only for parameters that can be overwritten). For example, if the meta table sets the primary key of table T as field1, after using table T in a task, any primary key settings in the task will take precedence (i.e., if the task sets the primary key of table T as field2, then field1 will no longer be the primary key of table T in that task).
Due to Dataphin's security and management policies, certain schema parameters cannot be overwritten in SQL tasks using the SET statement. For example, the table name defined in a MaxCompute schema cannot be altered in an SQL task using SET. The parameters that can be overwritten vary by schema type and are subject to actual operations.
Task internal SET parameters
Internal task SET parameters have global precedence, while TableHints are context-specific. SET parameters will supplement those absent in TableHints, with identical parameter items being superseded.
For example, following version 2.2.5, Blink introduced support for setting INDEX and UNIQUE INDEX for dimension tables. Currently, there are three keywords for declaring JOIN KEY: PRIMARY KEY, INDEX, and UNIQUE INDEX. Dataphin will make a combined judgment based on these. See the example below:
SET my_dim_table.primarykey = 'field1';
CREATE VIEW tmp1 AS
SELECT my_source_table1.*
FROM my_source_table1 JOIN my_dim_table WITH (index = 'field2') FOR SYSTEM_TIME AS OF PROCTIME()
ON ...;
CREATE VIEW tmp2 AS
SELECT my_source_table2.*
FROM my_source_table2 JOIN my_dim_table WITH (primarykey = 'field3') FOR SYSTEM_TIME AS OF PROCTIME()
ON ...;In the code above, Dataphin interprets the settings as assigning field1 as the primary key of my_dim_table (parameter padding) for the creation of tmp1, and field2 as the index; for the creation of tmp2, it sets field3 as the primary key of my_dim_table (parameter supersession).
Additional Example Explanations:
Example 1: Flink SQL Source Table Parameters.
For instance, when creating a Kafka source table:
-- vvr syntax CREATE TEMPORARY TABLE kafkaTable ( `user_id` BIGINT, `item_id` BIGINT, `category_id` BIGINT, `behavior` STRING, `topic` STRING METADATA VIRTUAL, `partition` BIGINT METADATA VIRTUAL ) WITH ( 'connector' = 'kafka', 'topic' = 'my_excellent_topic', 'properties.bootstrap.servers' = 'mykafka:9092', 'properties.group.id' = 'my_excellent_group' 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset' )Personalized parameters are translated into Dataphin Flink SQL SET expressions.
-- If the name of the table property is a SQL keyword or contains other characters, you can escape it with backticks. SET kafkaTable.`properties.group.id` = 'new_group_id'; SET kafkaTable.`scan.startup.mode` = 'latest-offset';Example 2: Flink SQL Sink Table Parameters.
For example, when creating an aliHBase result table:
create table hbase_output( rk varchar, rk1 varchar, rk2 varchar, f bigint, PRIMARY KEY(rk) ) with ( type='alihbase', diamondKey='xxxxxxx', diamondGroup='yyyyyyy', columnFamily='cf', tableName='blink_hbase_test', bufferSize=500; );To overwrite schema configurations at the task level, use the following settings:
SET hbase_output.bufferSize = 1000;Example 3: Flink SQL Dimension Table Parameters.
For instance, when creating a MaxCompute dimension table:
CREATE TABLE white_list ( id varchar, name varchar, age int, PRIMARY KEY (id) --PERIOD FOR SYSTEM_TIME -- Note: From blink3.x, dimension table DDL does not require PERIOD FOR SYSTEM_TIME. ) with ( type = 'odps', endPoint = 'your_end_point_name', project = 'your_project_name', tableName = 'your_table_name', accessId = 'your_access_id', accessKey = 'your_access_key', `partition` = 'ds=20180905', cache = 'ALL' );These are translated into Dataphin Flink SQL SET expressions:
SET white_list.cache='ALL'; SET white_list.cacheTTLMs=86400000; --If you want to specifically set the cache update time of this table at the task level, you can add SET here.Example 4: Configuring WATERMARK, Computed Columns, HEADER, PROCTIME, PRIMARY KEY/(UNIQUE)INDEX.
Currently, the schema supports only primary key and Header settings. For WATERMARK, PROCTIME, computed columns, and others, they must be set at the task level. The syntax specifications are as follows:
Setting WATERMARK.
The syntax is:
SET { [projectName <DOT>] tableName <DOT> WATERMARK } <EQ> { WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset) }Here's an example of a standard Flink SQL task DDL for WATERMARK:
CREATE TABLE sls_stream( a INT, b BIGINT, c VARCHAR, ts TIMESTAMP, WATERMARK FOR ts AS withOffset(ts, 1000) ) with ( type = 'sls', ...);This is converted into the Dataphin Flink SQL method for setting WATERMARK:
Version 3.7 and above, single quotes are required. SET sls_stream.`watermark`= 'WATERMARK FOR ts AS withOffset(ts, 1000)'; Version 3.6 SET sls_stream.`watermark`= WATERMARK FOR ts AS withOffset(ts, 1000);Setting Computed Columns.
The syntax is:
SET { [projectName <DOT>] tableName <DOT> <computedColumn> } <EQ> { column_name AS computed_column_expression | { <LPAREN> column_name AS computed_column_expression (<COMMA> column_name AS computed_column_expression)* <RPAREN> } }Here's an example of a standard Flink SQL task DDL for computed columns:
CREATE TABLE sls_stream( a INT, b BIGINT, c VARCHAR, ts AS to_timestamp(c, 'yyyyMMddHHmmss') ) with ( type = 'sls', ...);This is converted into the Dataphin Flink SQL method for setting computed columns:
Method for a Single Computed Column.
SET sls_stream.computedColumn= ts AS to_timestamp(c, 'yyyyMMddHHmmss');Method for Multiple Computed Columns.
SET sls_stream.computedColumn= ( ts1 AS to_timestamp(c, 'yyyyMMddHHmmss'), ts2 AS to_timestamp(d, 'yyyy-MM-dd HH:mm:ss') );
Setting PROCTIME.
The syntax is:
SET { [projectName <DOT>] tableName <DOT> <procTime> } <EQ> { columnName AS PROCTIME() }This is converted into the Dataphin Flink SQL method for setting PROCTIME:
SET sls_stream.procTime= d AS PROCTIME();Setting PRIMARY KEY, INDEX, UNIQUE INDEX.
ImportantStarting with Blink 3.6.0, TableHints employ the community-accepted Hints method. Neither Ververica Flink nor open-source Flink supports INDEX or UNIQUE INDEX.
The syntax is:
query : select /*+ hint_content */ ... from table_name1 /*+ hint_content */ join table_name2 /*+ hint_content */ ... hint_content : hint_item[, hint_item]* hint_item : hint_name | hint_name(k1=v1 [ , k2=v2 ]*) | hint_name(hint_opt [ ,hint_opt ]*) k : simple_identifier v : string_literal hint_opt : simple_identifier | numeric_literal | string_literalHere's an example of a standard Flink SQL task using TableHints:
INSERT INTO table SELECT source_table.* FROM source_table JOIN dim_table /*+ primarykey(field1) */ FOR SYSTEM_TIME AS OF PROCTIME() ON ...;When joining stream tables, Flink allows users to specify
JOIN KEYkeywords based on their requirements. SET statements have global precedence, and TableHints arecontext-specific.SET dim_table.index = 'field1'; -- set pk and set unique index follow the same pattern -- Equivalent to declaring INDEX (field1) in dim_table's DDL INSERT INTO table1 SELECT source_table.* FROM source_table JOIN dim_table FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- Equivalent to declaring INDEX (field2) in dim_table's DDL. INSERT INTO table2 SELECT source_table.* FROM source_table JOIN dim_table WITH (index = 'field2') FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- Equivalent to declaring PRIMARY KEY (field1), INDEX (field2) in dim_table's DDL. INSERT INTO table3 SELECT source_table.* FROM source_table JOIN dim_table WITH (primarykey = 'field2') FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- Equivalent to declaring PRIMARY KEY (field1), INDEX (field1) in dim_table's DDL. INSERT INTO table4 SELECT source_table.* FROM source_table JOIN dim_table WITH (primarykey = 'field1') FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- Equivalent to declaring PRIMARY KEY (field1), UNIQUE INDEX (field2, field3) in dim_table's DDL. INSERT INTO table5 SELECT source_table.* FROM source_table JOIN dim_table WITH (UNIQUEINDEX = 'field2, field3') FOR SYSTEM_TIME AS OF PROCTIME() ON ...;ImportantThe
join withsyntax was previously used in the early Blink engine. For newer versions of Blink, Ververica Flink, and open-source Flink, please use TableHint.The following is an example of TableHint usage:
-- Specify that the primary key of dim_table is the id field (supported by blink, vvr, apache-flink). INSERT INTO table1 SELECT source_table.* FROM source_table JOIN dim_table/*+primarykey(id)*/ FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- Specify that the primary key of the dimension table of dim_table is id (only supported by blink). INSERT INTO table1 SELECT source_table.* FROM source_table JOIN dim_table/*+index(id)*/ FOR SYSTEM_TIME AS OF PROCTIME() ON ...; -- Specify that the unique primary key of the dimension table of dim_table is id (only supported by blink). INSERT INTO table1 SELECT source_table.* FROM source_table JOIN dim_table/*+uniqueindex(id)*/ FOR SYSTEM_TIME AS OF PROCTIME() ON ...;Configure HEADER.
The syntax is as follows:
SET { [projectName <DOT>] tableName <DOT> <isHeader> } <EQ> { TRUE }Refer to the example below:
-- blink demo create table kafka_table ( `message` VARBINARY, topic varchar HEADER ) WITH ( type = 'kafka010', topic = 'test_kafka_topic', `group.id` = 'test_kafka_consumer_group', bootstrap.servers = 'ip1:port1,ip2:port2,ip3:port3' ); -- vvr demo create table kafka_table ( `message` VARBINARY, topic varchar METADATA VIRTUAL ) WITH ( 'connector' = 'kafka', 'topic' = 'my_excellent_topic', 'properties.bootstrap.servers' = 'mykafka:9092', 'properties.group.id' = 'my_excellent_group' 'format' = 'csv', 'scan.startup.mode' = 'earliest-offset' );In Dataphin, SET can be used with the following two syntax options:
SET kafka_table.topic.isHeader = true; -- Without single quotes. SET kafka_table.topic.isHeader = 'true'; -- With single quotes.
Example 5: TimeTunnel Automatic Subscription Feature (with TimeTunnel as Input).
The sample code is as follows:
set xx_project.xx_table.accessId='xxxxxx'; set xx_project.xx_table.accessKey='xxxxxx';Superuser accounts and accounts lacking permissions must manually configure subscriptions using SQL; they cannot use the automatic subscription feature.
For other personal accounts, configuring the SET information is optional. Typically, if the JobName is not specified, the default JobName
defaultis used to log the subscription information for the Topic. To prevent redundant subscriptions for the same Topic across different Jobs, the subscription is not repeated within a separate precompilation function once an archive record exists. The submit and publish functions will automatically subscribe to the Topic the first time they are used for a given Job, and will not subscribe again if an archive record is present.
Example 6: Resolving Table Name Conflicts.
When a project is linked to an offline computing source, conflicts may arise between the names of offline and streaming tables. The approach to resolving these conflicts is outlined below:
Should a table name coincide in both streaming and offline contexts within a task's SQL, an error will be triggered during the precompilation process with the following message: Please assign a single type to the table adi_ae_matrix_rt_slr_prim_cate using the syntax SET XXX.XXX.tableType = 'XX'. Available types include: [CLOUD_HBASE, MAX_COMPUTE].
In such cases, it is necessary to explicitly define the table's data type within the SQL to avoid name duplication within the same
project. Depending on the error message, select either CLOUD_HBASE or MAX_COMPUTE. An example of the code is as follows:-- Force the use of MaxCompute physical table. SET xx_project.xx_table.tableType = 'MAX_COMPUTE';
Example 7: Support for MaxCompute (odps) Data Source Tables.
To utilize a MaxCompute data source table, ensure to define
tableType='odps';to avoid a parameter missing error.Example 8: Cropping Columns in Dimension Tables.
To manually enable column cropping, use the SET statement as follows:
SET {project}.{dimtable}.dataphinColumnPruning='true';NoteNote: Column cropping is applicable only to dimension tables and supports only direct join tables, not subqueries within direct joins.
Example 9: Customizing Parameters for Stream-Batch Integration.
In stream-batch integration tasks, mirror tables are commonly utilized. These mirror tables are converted into the appropriate stream or batch tables for use.
Given the variety of stream and batch tables, which may have different data sources and thus require distinct
keyparameters in thewithclause, and may have unique settings likebatchSize, TableHints are employed to map the corresponding stream and batch tables. The syntax is as follows:set project.table.${mode}.${key}To define the start and end times of the batch task, refer to the example below:
set project.table.batch.startTime='2020-11-11 00:00:00'; set project.table.batch.endTime='2020-11-12 00:00:00';To set the AccessKey for the stream, see the following example:
set dwd_lux_trd_ord_subpay_mirror.`stream`.accessId='xxxxxx'; set dwd_lux_trd_ord_subpay_mirror.`stream`.accessKey='xxxxxx';Example 10: Result Table Column Cropping.
You can use the hint method to specify which columns to crop in the result table.
ImportantNote: This feature is applicable only to HBase Sink tables.
-- Suppose the HBase DDL statement is as follows CREATE TABLE hbase_sink( rowkey INT, family1 ROW<q1 INT>, family2 ROW<q2 STRING, q3 BIGINT> ) with ( 'connector'='cloudhbase', 'table-name'='<yourTableName>', 'zookeeper.quorum'='<yourZookeeperQuorum>' ); -- Error: Because the table fields do not match insert into hbase_sink select key, ROW(f1q1) from ... -- Pass: Here, the hint method specifies cropping the sink table, specifying writing to rowkey and q1 columns insert into hbase_sink/*+dataphincolumns(rowkey,family1.q1)*/ select key, ROW(f1q1) from ... -- Pass: Here, the hint method specifies cropping the sink table, specifying writing to rowkey and q2, q3 columns insert into hbase_sink/*+dataphincolumns(rowkey,family2.q2,family2.q3)*/ select key, ROW(f2q2,f2q3) from ...