
Dataphin:Flink_SQL task development methods

Last Updated:Jan 21, 2025

Dataphin supports several development methods for Flink_SQL tasks: native DDL+DML, Catalog-based development, and development based on Dataphin meta tables, data source tables, computing source physical tables, and mirror tables. These methods can be freely combined within a task. Each method offers different usage techniques and applicable scenarios, with its own pros and cons. This topic provides an overview of each method to help you develop Flink_SQL tasks more effectively.

Dataphin computing source physical table development method

The Dataphin computing source physical table development method enables direct access to physical tables within the computing source by specifying project_name.table_name during Flink_SQL task development. This method also permits cross-project access to physical tables associated with other projects.

Important
  • Currently, this method supports accessing physical table data from Hologres, Hadoop, and StarRocks computing sources.

  • The project containing the accessed physical table must have the supported computing source bound to it.

Example

To insert data from the test physical table in the example project's computing source into the test_demo physical table, refer to the following sample code:

insert into test_demo select id,name from example.test;
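Because the project_name.table_name syntax works across projects, tables from multiple projects can also be combined in a single statement. The sketch below joins the test table in the example project with a dimension table in a second project; other_project and dim_user are illustrative names, not from the original example:

-- Hedged sketch: other_project and dim_user are hypothetical names.
-- Joins physical tables from two projects' computing sources in one query.
insert into test_demo
select t.id, d.name
from example.test t
join other_project.dim_user d
  on t.id = d.id;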

Dataphin data source table development method

The development method for Dataphin data source tables enables direct access to tables within the data source created in Dataphin when developing Flink_SQL tasks. To utilize this method, configure the data source encoding in Dataphin first. For detailed instructions, see Data source.

Once the data source encoding is set, you can access tables within that data source in Flink SQL tasks by using the syntax data_source_encoding.table or data_source_encoding.schema.table. To dynamically access the data source appropriate for the current environment, use the syntax ${data_source_encoding}.table or ${data_source_encoding}.schema.table. Examples of accessing tables from MySQL and Hologres data sources are as follows:

Important

Currently, only MySQL, Hologres, MaxCompute, Hive, Oracle, StarRocks, and SelectDB data sources are supported.

  • MySQL, Hive, Oracle, and StarRocks data source tables: Support accessing physical tables in the data source using data_source_encoding.table_name.

  • Hologres data source tables: Enable access to Hologres data source physical tables using the format data_source_encoding.schema_name.table_name.

Example

To insert data from the demo_mysql physical table in the MySQL data source (with data source encoding ds_demo_mysql) into the test_demo physical table, refer to the following code:

insert into test_demo select id,name from ds_demo_mysql.demo_mysql;

To insert data from the demo_hologres physical table in the Hologres data source (with data source encoding ds_demo_hologres and schema name hologres) into the test_demo physical table, refer to the following code:

insert into test_demo select id,name from ds_demo_hologres.hologres.demo_hologres;
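The ${data_source_encoding} syntax mentioned above resolves to the data source bound to the current environment (for example, development or production) when the task is submitted, so the same SQL can run in both. A minimal sketch reusing the ds_demo_mysql encoding from the example above:

-- ${ds_demo_mysql} is resolved per environment at submission time,
-- so this statement needs no changes between dev and prod.
insert into test_demo select id, name from ${ds_demo_mysql}.demo_mysql;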

Dataphin meta table development method

Within Dataphin, a meta table represents a higher-level logical abstraction over native DDL+DML development. It is a cross-storage type table managed through data management. Input, output, and dimension tables required in the development process can be created and managed by establishing meta tables. This approach allows for the creation of a table once and its multiple references thereafter, streamlining development and enhancing efficiency and user experience. Additionally, meta tables help prevent the exposure of sensitive data that could occur when writing native Flink DDL statements directly.

Example

To create demo01 and demo02 data tables and insert data from demo01 into demo02, follow the steps below:

  1. Utilize Dataphin's meta table feature to establish data tables demo01 and demo02. For detailed instructions, see how to create a new meta table.

  2. Compose the insert statement for the Flink_SQL task as shown in the sample code below:

insert into demo02 select * from demo01;

Development based on Catalog

Catalog-based development involves connecting to a database by creating a Catalog in Flink_SQL tasks and utilizing tables within the Catalog. This method eliminates the need to write DDL statements for tables, thereby simplifying the coding process in Flink SQL. For instance, after establishing Catalog01 and creating a table t1 in a Flink_SQL task, you can directly access t1 by creating Catalog01 again in a new Flink_SQL task.

Important
  • This method is only compatible with the open-source Flink real-time computing engine.

  • Creation of physical tables in Catalog is not supported (only temporary tables in memory can be created).

  • The use of USE CATALOG/USE DATABASE statements is not supported.

  • ALTER TABLE statements are supported only in Flink 1.17.

  • Accessing tables using the format catalog.database.'schema.table' is not supported; only the format catalog.database.table is supported.

  • Currently, supported Catalog types include JDBC (MySQL, Oracle) and Paimon.

Example

CREATE CATALOG my_catalog WITH (
    'type' = 'jdbc',
    'base-url' = 'jdbc:mysql://rm-uf*******7o.mysql.rds.aliyuncs.com:3306',
    'default-database' = 'dataphin_01',
    'username' = '*******',
    'password' = '*******'
);
CREATE TEMPORARY TABLE t2 (
    id bigint,
    name STRING
) WITH (
    'connector' = 'print'
);

-- write streaming data to dynamic table
INSERT INTO t2 SELECT id,name FROM my_catalog.dataphin_01.pf_id_name;
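Since Paimon is also listed among the supported Catalog types, a Paimon Catalog can be created in a similar way. The following is a minimal sketch assuming a filesystem-style warehouse; the warehouse path is an illustrative placeholder:

-- Hedged sketch: the warehouse path below is a placeholder, not a real location.
CREATE CATALOG my_paimon WITH (
    'type' = 'paimon',
    'warehouse' = 'oss://my-bucket/paimon-warehouse'
);

-- Tables registered in the Paimon catalog are then addressed as
-- catalog.database.table, for example:
-- SELECT id, name FROM my_paimon.ods.user_log;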

Native DDL+DML development method

Native DDL development is the approach where Flink SQL statements are directly utilized to create and manage data tables within Flink_SQL tasks. This involves using commands like CREATE TABLE or CREATE TEMPORARY TABLE for table creation. The method involves defining the table's structure in code and managing tables via SQL statements.

Important

The native DDL+DML development method necessitates writing plaintext usernames and passwords in the code, which poses a risk to data security and could lead to data breaches. Exercise caution when using this method.

Example

To employ the native DDL+DML development method in Flink_SQL tasks, refer to the following sample code. It demonstrates generating mock input data with the datagen connector (table t1) and printing the output (table t2).

Note

To create tables using native DDL+DML statements, it is necessary to first disable the coding standards in Dataphin that Prohibit The Use Of Flink Native DDL Statements. For detailed instructions, see coding standards.

create temporary table t1 (
    id bigint,
    name varchar
) with (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);
create temporary table t2 (
    id bigint,
    name varchar
) with (
    'connector' = 'print'
);

-- begin statement set;
insert into t2 select id, replace(name, '\"', '"') as name from t1;
-- end;
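The commented-out statement set markers above come into play when a task writes to more than one sink: grouping the inserts in a statement set lets Flink compile them into a single job. A minimal sketch of the syntax, reusing t1 and t2 from the sample above plus a hypothetical third sink t3:

-- Hedged sketch: t3 is a hypothetical additional sink table.
create temporary table t3 (
    id bigint
) with (
    'connector' = 'print'
);

begin statement set;
insert into t2 select id, name from t1;
insert into t3 select id from t1;
end;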