All Products
Search
Document Center

Dataphin:Dataphin Flink SQL lexical enhancement

Last Updated:Jan 23, 2025

Dataphin has introduced syntax improvements to Flink SQL in line with the ANSI-SQL standard, aimed at boosting development efficiency. This topic outlines the new lexical features and their application within Dataphin Flink SQL.

Support cross-project reference

DDL and DML table name syntax enhancement

A typical Flink SQL task requires complete DDL and DML statements. Consider the following example:

--DDL
CREATE TABLE input_table (
  	field1 VARCHAR,
    field2 BIGINT
  ) WITH (
		type = 'datagen'
  );
CREATE TABLE output_table (
  	field1 VARCHAR,
    field2 BIGINT
  ) WITH (
		type = 'print'
  );
--DML
INSERT INTO output_table
SELECT field1, field2 FROM input_table;

To eliminate the need for repetitive DDL statement definitions across multiple tasks and enhance development efficiency, Dataphin has refined the syntax for referencing table names in Flink SQL. Once a schema is defined in Dataphin, it can be directly referenced in any task using the syntax below, thereby avoiding repeated declarations and enabling cross-project references.

<compoundTableName> ::= [projectName <DOT>] tableName

Explanation of Example

As demonstrated by the preceding SQL example, after defining the schema, you simply need to include the following statement in your development task:

INSERT INTO output_table -- Default project prefix is the current project.
SELECT field1, field2 FROM ${project_name}.input_table;

Built-in function and UDX syntax enhancement

Typically, a Flink SQL task necessitates the declaration of UDX usage. See the example below:

CREATE FUNCTION MY_UDX AS 'package.class';
...
CREATE VIEW my_view AS 
SELECT MY_UDX(args)
FROM ...

The custom UDX declarations above are often required in multiple tasks. To avoid this redundancy and improve efficiency, Dataphin has introduced the concept of resources. Once a function is registered in Dataphin, it can be directly referenced in any task using the syntax below, without the need for repeated declarations, and it supports cross-project references.

<compoundFunctionName> ::= [projectName <COLON>] functionName

Explanation of Example

Following the SQL example above, after registering the function in Dataphin, you only need to enter the following statement in the development task:

CREATE VIEW my_view AS SELECT ${project_name}:MY_UDX(args) -- Default project prefix is the current projectFROM.
FROM ...

Special Note

Dataphin allows for the overloading or overriding of built-in functions, giving precedence to the overloaded or overridden versions. Consider the following examples:

  • Example 1: Including the project prefix.

SELECT ${project_name}:SUBSTRING('20200108171740', 1, 8) FROM dual;
--Dataphin will default to using the overloaded built-in function `SUBSTRING`. If the custom function is not found under `${project_name}`, it will fail during precompilation.
  • Example 2: Omitting the project prefix.

SELECT SUBSTRING('20200108171740', 1, 8) FROM dual;
--Dataphin will prioritize finding the custom `SUBSTRING` function in the current project. If not found, it is considered a built-in function.

Use UpdateStatement and TableHints to set task-level DDL parameters

In Dataphin, you can set DDL WITH parameters for existing schemas, and the platform supports the configuration of task-level parameters, such as Watermark, computed columns, primary keys, Header, and more. Additionally, Dataphin has introduced SetOption syntax support. The syntax specification is as follows:

SET {
	[ projectName <DOT>] tableName <DOT> 
		{ 
			propertyName
			| WATERMARK
			| <computedColumn> 
			| <primaryKey> 
			| <procTime> 
			| {fieldName <DOT> <isHeader>} 
		}
} 	<EQ> {
		@variable = identifier
		| @variable = literal
		| @variable = expression
}

When configuring WITH parameters, it is advisable to define common table DDL WITH parameters in the schema. Task-level personalized parameters can be specified in the code using the SET statement or TableHints, with multiple SET statements separated by ;.

Important

When using Flink SQL reserved words, you need to add (` `) symbols. See the following example:

  • Watermark: In the SET statement, use SET [project.]table.`watermark` = ....

  • Partition: In the SET statement, use SET [project.]table.`partition` = ....

DDL parameter precedence

Task-level parameter overwrite

Task-level parameter settings will overwrite the parameter settings configured in the meta table (only for parameters that can be overwritten). For example, if the meta table sets the primary key of table T as field1, after using table T in a task, any primary key settings in the task will take precedence (i.e., if the task sets the primary key of table T as field2, then field1 will no longer be the primary key of table T in that task).

Important

Due to Dataphin's security and management policies, certain schema parameters cannot be overwritten in SQL tasks using the SET statement. For example, the table name defined in a MaxCompute schema cannot be altered in an SQL task using SET. The parameters that can be overwritten vary by schema type and are subject to actual operations.

Task internal SET parameters

Internal task SET parameters have global precedence, while TableHints are context-specific. SET parameters will supplement those absent in TableHints, with identical parameter items being superseded.

For example, following version 2.2.5, Blink introduced support for setting INDEX and UNIQUE INDEX for dimension tables. Currently, there are three keywords for declaring JOIN KEY: PRIMARY KEY, INDEX, and UNIQUE INDEX. Dataphin will make a combined judgment based on these. See the example below:

SET my_dim_table.primarykey = 'field1';

CREATE VIEW tmp1 AS 
SELECT my_source_table1.*
FROM my_source_table1 JOIN my_dim_table WITH (index = 'field2') FOR SYSTEM_TIME AS OF PROCTIME()
ON ...;

CREATE VIEW tmp2 AS 
SELECT my_source_table2.*
FROM my_source_table2 JOIN my_dim_table WITH (primarykey = 'field3') FOR SYSTEM_TIME AS OF PROCTIME()
ON ...;

In the code above, Dataphin interprets the settings as assigning field1 as the primary key of my_dim_table (parameter padding) for the creation of tmp1, and field2 as the index; for the creation of tmp2, it sets field3 as the primary key of my_dim_table (parameter supersession).

Additional Example Explanations:

  • Example 1: Flink SQL Source Table Parameters.

    For instance, when creating a Kafka source table:

    -- vvr syntax
    CREATE TEMPORARY TABLE kafkaTable (
        `user_id` BIGINT,
        `item_id` BIGINT,
        `category_id` BIGINT,
        `behavior` STRING,
        `topic` STRING METADATA VIRTUAL,
        `partition` BIGINT METADATA VIRTUAL
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'my_excellent_topic',
        'properties.bootstrap.servers' = 'mykafka:9092',
        'properties.group.id' = 'my_excellent_group'
        'format' = 'csv',
        'scan.startup.mode' = 'earliest-offset'
    )

    Personalized parameters are translated into Dataphin Flink SQL SET expressions.

    -- If the name of the table property is a SQL keyword or contains other characters, you can escape it with backticks.
    SET kafkaTable.`properties.group.id` = 'new_group_id';
    SET kafkaTable.`scan.startup.mode` = 'latest-offset';
  • Example 2: Flink SQL Sink Table Parameters.

    For example, when creating an aliHBase result table:

    create table hbase_output(
      rk  varchar,
      rk1 varchar,
      rk2 varchar,
      f bigint,
      PRIMARY KEY(rk)
    ) with (
      type='alihbase',
      diamondKey='xxxxxxx',
      diamondGroup='yyyyyyy',
      columnFamily='cf',
      tableName='blink_hbase_test',
      bufferSize=500;
    );

    To overwrite schema configurations at the task level, use the following settings:

    SET hbase_output.bufferSize = 1000;
  • Example 3: Flink SQL Dimension Table Parameters.

    For instance, when creating a MaxCompute dimension table:

     CREATE TABLE white_list (
      id varchar,
      name varchar,
      age int,
      PRIMARY KEY (id)
      --PERIOD FOR SYSTEM_TIME -- Note: From blink3.x, dimension table DDL does not require PERIOD FOR SYSTEM_TIME.
    ) with (
      type = 'odps',
      endPoint = 'your_end_point_name',
      project = 'your_project_name',
      tableName = 'your_table_name',
      accessId = 'your_access_id',
      accessKey = 'your_access_key',
      `partition` = 'ds=20180905',
      cache = 'ALL'
    );

    These are translated into Dataphin Flink SQL SET expressions:

    SET white_list.cache='ALL';
    SET white_list.cacheTTLMs=86400000;
    --If you want to specifically set the cache update time of this table at the task level, you can add SET here.
  • Example 4: Configuring WATERMARK, Computed Columns, HEADER, PROCTIME, PRIMARY KEY/(UNIQUE)INDEX.

    Currently, the schema supports only primary key and Header settings. For WATERMARK, PROCTIME, computed columns, and others, they must be set at the task level. The syntax specifications are as follows:

    • Setting WATERMARK.

      The syntax is:

      SET {
      	[projectName <DOT>] tableName <DOT> WATERMARK
      } 	<EQ> {
      		WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset)
      }

      Here's an example of a standard Flink SQL task DDL for WATERMARK:

      CREATE TABLE sls_stream(
      		a INT,
      		b BIGINT,
      		c VARCHAR,
      		ts TIMESTAMP,
      		WATERMARK FOR ts AS withOffset(ts, 1000)
      	  ) with (
      		type = 'sls',
        ...);

      This is converted into the Dataphin Flink SQL method for setting WATERMARK:

      Version 3.7 and above, single quotes are required.
      SET sls_stream.`watermark`= 'WATERMARK FOR ts AS withOffset(ts, 1000)';
      Version 3.6
      SET sls_stream.`watermark`= WATERMARK FOR ts AS withOffset(ts, 1000);
    • Setting Computed Columns.

      The syntax is:

      SET {
      	[projectName <DOT>] tableName <DOT> <computedColumn>
      } 	<EQ> {
      		column_name AS computed_column_expression
          |
          {
          	<LPAREN>
          		column_name AS computed_column_expression
          		(<COMMA> column_name AS computed_column_expression)*
            <RPAREN>  
          }
      }

      Here's an example of a standard Flink SQL task DDL for computed columns:

      CREATE TABLE sls_stream(
      		a INT,
      		b BIGINT,
      		c VARCHAR,
      		ts AS to_timestamp(c, 'yyyyMMddHHmmss')
      	  ) with (
      		type = 'sls',
        ...);

      This is converted into the Dataphin Flink SQL method for setting computed columns:

      • Method for a Single Computed Column.

        SET sls_stream.computedColumn= ts AS to_timestamp(c, 'yyyyMMddHHmmss');
      • Method for Multiple Computed Columns.

        SET sls_stream.computedColumn= ( ts1 AS to_timestamp(c, 'yyyyMMddHHmmss'), ts2 AS to_timestamp(d, 'yyyy-MM-dd HH:mm:ss') );
    • Setting PROCTIME.

      The syntax is:

      SET {
      	[projectName <DOT>] tableName <DOT> <procTime>
      } 	<EQ> {
      		columnName AS PROCTIME()
      }

      This is converted into the Dataphin Flink SQL method for setting PROCTIME:

      SET sls_stream.procTime= d AS PROCTIME();
    • Setting PRIMARY KEY, INDEX, UNIQUE INDEX.

      Important

      Starting with Blink 3.6.0, TableHints employ the community-accepted Hints method. Neither Ververica Flink nor open-source Flink supports INDEX or UNIQUE INDEX.

      The syntax is:

      query :
            select /*+ hint_content */
            ...
            from
                table_name1 /*+ hint_content */
                join 
                table_name2 /*+ hint_content */
            ...
      hint_content :
             hint_item[, hint_item]*
      hint_item :
             hint_name
         |   hint_name(k1=v1 [ , k2=v2 ]*)
         |   hint_name(hint_opt [ ,hint_opt ]*)
      k :
            simple_identifier
      v :
            string_literal
      hint_opt :
            simple_identifier
         |  numeric_literal
         |  string_literal

      Here's an example of a standard Flink SQL task using TableHints:

      INSERT INTO table
      SELECT source_table.*
      FROM source_table JOIN dim_table /*+ primarykey(field1) */ FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;

      When joining stream tables, Flink allows users to specify JOIN KEY keywords based on their requirements. SET statements have global precedence, and TableHints are context-specific.

      SET dim_table.index = 'field1'; -- set pk and set unique index follow the same pattern
      
      -- Equivalent to declaring INDEX (field1) in dim_table's DDL
      INSERT INTO table1
      SELECT source_table.*
      FROM source_table JOIN dim_table FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- Equivalent to declaring INDEX (field2) in dim_table's DDL.
      INSERT INTO table2
      SELECT source_table.*
      FROM source_table JOIN dim_table WITH (index = 'field2') FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      
      -- Equivalent to declaring PRIMARY KEY (field1), INDEX (field2) in dim_table's DDL.
      INSERT INTO table3
      SELECT source_table.*
      FROM source_table JOIN dim_table WITH (primarykey = 'field2') FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- Equivalent to declaring PRIMARY KEY (field1), INDEX (field1) in dim_table's DDL.
      INSERT INTO table4
      SELECT source_table.*
      FROM source_table JOIN dim_table WITH (primarykey = 'field1') FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- Equivalent to declaring PRIMARY KEY (field1), UNIQUE INDEX (field2, field3) in dim_table's DDL.
      INSERT INTO table5
      SELECT source_table.*
      FROM source_table JOIN dim_table WITH (UNIQUEINDEX = 'field2, field3') FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      Important

      The join with syntax was previously used in the early Blink engine. For newer versions of Blink, Ververica Flink, and open-source Flink, please use TableHint.

      The following is an example of TableHint usage:

      -- Specify that the primary key of dim_table is the id field (supported by blink, vvr, apache-flink).
      INSERT INTO table1
      SELECT source_table.*
      FROM source_table 
      JOIN dim_table/*+primarykey(id)*/ FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- Specify that the primary key of the dimension table of dim_table is id (only supported by blink).
      INSERT INTO table1
      SELECT source_table.*
      FROM source_table 
      JOIN dim_table/*+index(id)*/ FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
      
      -- Specify that the unique primary key of the dimension table of dim_table is id (only supported by blink).
      INSERT INTO table1
      SELECT source_table.*
      FROM source_table 
      JOIN dim_table/*+uniqueindex(id)*/ FOR SYSTEM_TIME AS OF PROCTIME()
      ON ...;
    • Configure HEADER.

      The syntax is as follows:

      SET {
      	[projectName <DOT>] tableName <DOT> <isHeader>
      } 	<EQ> {
      		TRUE
      }

      Refer to the example below:

      -- blink demo
      create table kafka_table (
          `message` VARBINARY,
          topic varchar HEADER
      ) WITH (
          type = 'kafka010',
          topic = 'test_kafka_topic',
          `group.id` = 'test_kafka_consumer_group',
          bootstrap.servers = 'ip1:port1,ip2:port2,ip3:port3'
      );
      
      -- vvr demo
      create table kafka_table (
          `message` VARBINARY,
          topic varchar METADATA VIRTUAL
      ) WITH (
          'connector' = 'kafka',
          'topic' = 'my_excellent_topic',
          'properties.bootstrap.servers' = 'mykafka:9092',
          'properties.group.id' = 'my_excellent_group'
          'format' = 'csv',
          'scan.startup.mode' = 'earliest-offset'
      );

      In Dataphin, SET can be used with the following two syntax options:

      SET kafka_table.topic.isHeader = true; -- Without single quotes.
      
      SET kafka_table.topic.isHeader = 'true'; -- With single quotes.
  • Example 5: TimeTunnel Automatic Subscription Feature (with TimeTunnel as Input).

    The sample code is as follows:

    set xx_project.xx_table.accessId='xxxxxx';
    set xx_project.xx_table.accessKey='xxxxxx';
    • Superuser accounts and accounts lacking permissions must manually configure subscriptions using SQL; they cannot use the automatic subscription feature.

    • For other personal accounts, configuring the SET information is optional. Typically, if the JobName is not specified, the default JobName default is used to log the subscription information for the Topic. To prevent redundant subscriptions for the same Topic across different Jobs, the subscription is not repeated within a separate precompilation function once an archive record exists. The submit and publish functions will automatically subscribe to the Topic the first time they are used for a given Job, and will not subscribe again if an archive record is present.

  • Example 6: Resolving Table Name Conflicts.

    When a project is linked to an offline computing source, conflicts may arise between the names of offline and streaming tables. The approach to resolving these conflicts is outlined below:

    1. Should a table name coincide in both streaming and offline contexts within a task's SQL, an error will be triggered during the precompilation process with the following message: Please assign a single type to the table adi_ae_matrix_rt_slr_prim_cate using the syntax SET XXX.XXX.tableType = 'XX'. Available types include: [CLOUD_HBASE, MAX_COMPUTE].

    2. In such cases, it is necessary to explicitly define the table's data type within the SQL to avoid name duplication within the same project. Depending on the error message, select either CLOUD_HBASE or MAX_COMPUTE. An example of the code is as follows:

      -- Force the use of MaxCompute physical table.
      SET xx_project.xx_table.tableType = 'MAX_COMPUTE';
  • Example 7: Support for MaxCompute (odps) Data Source Tables.

    To utilize a MaxCompute data source table, ensure to define tableType='odps'; to avoid a parameter missing error.

  • Example 8: Cropping Columns in Dimension Tables.

    To manually enable column cropping, use the SET statement as follows:

    SET {project}.{dimtable}.dataphinColumnPruning='true';
    Note

    Note: Column cropping is applicable only to dimension tables and supports only direct join tables, not subqueries within direct joins.

  • Example 9: Customizing Parameters for Stream-Batch Integration.

    In stream-batch integration tasks, mirror tables are commonly utilized. These mirror tables are converted into the appropriate stream or batch tables for use.

    Given the variety of stream and batch tables, which may have different data sources and thus require distinct key parameters in the with clause, and may have unique settings like batchSize, TableHints are employed to map the corresponding stream and batch tables. The syntax is as follows:

    set project.table.${mode}.${key}

    To define the start and end times of the batch task, refer to the example below:

    set project.table.batch.startTime='2020-11-11 00:00:00';
    set project.table.batch.endTime='2020-11-12 00:00:00';

    To set the AccessKey for the stream, see the following example:

    set dwd_lux_trd_ord_subpay_mirror.`stream`.accessId='xxxxxx';
    set dwd_lux_trd_ord_subpay_mirror.`stream`.accessKey='xxxxxx';
  • Example 10: Result Table Column Cropping.

    You can use the hint method to specify which columns to crop in the result table.

    Important

    Note: This feature is applicable only to HBase Sink tables.

    -- Suppose the HBase DDL statement is as follows
    CREATE TABLE hbase_sink(
      rowkey INT,
      family1 ROW<q1 INT>,
      family2 ROW<q2 STRING, q3 BIGINT>
    ) with (
      'connector'='cloudhbase',
      'table-name'='<yourTableName>',
      'zookeeper.quorum'='<yourZookeeperQuorum>'
    );
    
    -- Error: Because the table fields do not match
    insert into hbase_sink select key, ROW(f1q1) from ...
    
    -- Pass: Here, the hint method specifies cropping the sink table, specifying writing to rowkey and q1 columns
    insert into hbase_sink/*+dataphincolumns(rowkey,family1.q1)*/
    select key, ROW(f1q1) from ...
    
    -- Pass: Here, the hint method specifies cropping the sink table, specifying writing to rowkey and q2, q3 columns
    insert into hbase_sink/*+dataphincolumns(rowkey,family2.q2,family2.q3)*/
    select key, ROW(f2q2,f2q3) from ...