Dataphin supports a variety of development approaches for Flink SQL tasks, including native DDL+DML development, Catalog-based development, and the use of Dataphin meta tables, data source tables, computing source physical tables, and mirror tables. These approaches can be freely combined. Each method offers different usage techniques, applicable scenarios, and trade-offs. This topic provides an overview of each method to help you develop Flink SQL tasks more effectively.
Dataphin computing source physical table development method
The Dataphin computing source physical table development method enables direct access to physical tables in the computing source by specifying project_name.table_name during Flink SQL task development. This method also permits cross-project access to physical tables associated with other projects.
Currently, this method supports accessing physical table data from Hologres, Hadoop, and StarRocks computing sources. The project that contains the accessed physical table must be bound to one of these supported computing sources.
Example
To insert data from the test physical table in the example project's computing source into the test_demo physical table, refer to the following sample code:
insert into test_demo select id,name from example.test;
Dataphin data source table development method
The Dataphin data source table development method enables direct access to tables in data sources created in Dataphin when developing Flink SQL tasks. To use this method, first configure the data source encoding in Dataphin. For detailed instructions, see Data source.
Once the data source encoding is set, you can access tables in that data source in Flink SQL tasks by using the syntax data_source_encoding.table or data_source_encoding.schema.table. To dynamically access the data source appropriate for the current environment, use the syntax ${data_source_encoding}.table or ${data_source_encoding}.schema.table. The supported data sources and access formats are as follows:
Currently, only MySQL, Hologres, MaxCompute, Hive, Oracle, StarRocks, and SelectDB data sources are supported.
MySQL, Hive, Oracle, and StarRocks data source tables: support accessing physical tables in the data source using data_source_encoding.table_name.
Hologres data source tables: support accessing physical tables in the data source using data_source_encoding.schema_name.table_name.
Example
To insert data from the demo_mysql physical table in the MySQL data source (with data source encoding ds_demo_mysql) into the test_demo physical table, refer to the following code:
insert into test_demo select id,name from ds_demo_mysql.demo_mysql;
To insert data from the demo_hologres physical table in the Hologres data source (with data source encoding ds_demo_hologres and schema name hologres) into the test_demo physical table, refer to the following code:
insert into test_demo select id,name from ds_demo_hologres.hologres.demo_hologres;
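If the same task is promoted across environments, the dynamic syntax lets Dataphin resolve the data source for the current environment. The following is a minimal sketch, reusing the ds_demo_mysql encoding and demo_mysql table from the example above:
-- The ${...} placeholder is resolved to the environment-specific data source.
insert into test_demo select id, name from ${ds_demo_mysql}.demo_mysql;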
Dataphin meta table development method
In Dataphin, a meta table is a higher-level logical abstraction over native DDL+DML development: a cross-storage-type table managed through unified data management. The input, output, and dimension tables required during development can be created and managed as meta tables. A table is created once and can then be referenced many times, which streamlines development and improves efficiency and the user experience. Meta tables also help prevent the exposure of sensitive information (such as plaintext credentials) that can occur when writing native Flink DDL statements directly.
Example
To create the demo01 and demo02 data tables and insert data from demo01 into demo02, follow the steps below:
1. Use Dataphin's meta table feature to create the data tables demo01 and demo02. For detailed instructions, see How to create a meta table.
2. Compose the insert statement for the Flink SQL task as shown in the sample code below:
INSERT into demo02 select * from demo01;
Catalog-based development method
Catalog-based development involves connecting to a database by creating a Catalog in Flink SQL tasks and using the tables in that Catalog. This method eliminates the need to write DDL statements for tables, simplifying Flink SQL code. For instance, after creating Catalog01 and a table t1 in one Flink SQL task, you can directly access t1 by creating Catalog01 again in a new Flink SQL task.
Note the following limitations and supported scope:
This method is only compatible with the open-source Flink real-time computing engine.
Creating physical tables in a Catalog is not supported; only temporary tables in memory can be created.
USE CATALOG and USE DATABASE statements are not supported.
ALTER TABLE statements are supported only in Flink 1.17.
Accessing tables using the format catalog.database.'schema.table' is not supported; only the format catalog.database.table is supported.
Currently, supported Catalog types include JDBC (MySQL, Oracle) and Paimon.
Example
-- Create a JDBC catalog that connects to a MySQL database (credentials masked).
CREATE CATALOG my_catalog WITH (
    'type' = 'jdbc',
    'base-url' = 'jdbc:mysql://rm-uf*******7o.mysql.rds.aliyuncs.com:3306',
    'default-database' = 'dataphin_01',
    'username' = '*******',
    'password' = '*******'
);

-- Create a temporary sink table that prints rows to the log.
CREATE TEMPORARY TABLE t2 (
    id bigint,
    name STRING
) WITH (
    'connector' = 'print'
);

-- Write streaming data from the catalog table to the sink (dynamic table).
INSERT INTO t2 SELECT id, name FROM my_catalog.dataphin_01.pf_id_name;
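Since Paimon Catalogs are also listed as supported, a Paimon-based variant might look like the following. This is a minimal sketch, not a definitive configuration: the warehouse path, database, and table names here are hypothetical.
-- Create a Paimon catalog; the warehouse location below is a hypothetical example.
CREATE CATALOG paimon_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'oss://my-bucket/paimon-warehouse'
);

-- Temporary print sink, as in the JDBC example.
CREATE TEMPORARY TABLE t3 (
    id bigint,
    name STRING
) WITH (
    'connector' = 'print'
);

-- Read from a table managed by the Paimon catalog (hypothetical database and table).
INSERT INTO t3 SELECT id, name FROM paimon_catalog.my_db.my_table;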
Native DDL+DML development method
Native DDL+DML development is the approach where Flink SQL statements are written directly to create and manage data tables within Flink SQL tasks, using statements such as CREATE TABLE or CREATE TEMPORARY TABLE. The table structure is defined in code, and tables are managed via SQL statements.
The native DDL+DML development method requires writing plaintext usernames and passwords in the code, which poses a data security risk and could lead to data breaches. Exercise caution when using this method.
Example
To use the native DDL+DML development method in Flink SQL tasks, refer to the following sample code, which generates mock data and prints it (reading from table t1 and writing to table t2).
To create tables using native DDL+DML statements, you must first disable the Dataphin coding standard rule Prohibit the use of Flink native DDL statements. For detailed instructions, see Coding standards.
-- Source table: generates one row of mock data per second.
create temporary table t1 (
    id bigint,
    name varchar
) with (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);

-- Sink table: prints each row to the log.
create temporary table t2 (
    id bigint,
    name varchar
) with (
    'connector' = 'print'
);

-- begin statement set;
insert into t2 select id, replace(name, '\"', '"') as name from t1;
-- end;
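The commented-out statement set wrapper above becomes useful when a task writes to multiple sink tables: wrapping the inserts in a statement set submits them together as a single job. The following is a minimal sketch under the same mock-data setup; the second sink table t3 is hypothetical, and statement set support may depend on the Flink version in use.
-- Source table: generates one row of mock data per second.
create temporary table t1 (
    id bigint,
    name varchar
) with (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);

-- First sink table: prints rows to the log.
create temporary table t2 (
    id bigint,
    name varchar
) with (
    'connector' = 'print'
);

-- Hypothetical second sink table.
create temporary table t3 (
    id bigint
) with (
    'connector' = 'print'
);

-- Both inserts are submitted together as one job.
begin statement set;
insert into t2 select id, name from t1;
insert into t3 select id from t1;
end;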