全部產品
Search
文件中心

Dataphin:Flink SQL任務開發方式

更新時間:Jan 14, 2026

Dataphin的Flink SQL任務支援多種開發方式,包括原生DDL+DML開發、基於Catalog開發、使用Dataphin元表進行開發、使用Dataphin資料來源表進行開發、以及使用計算源物理表進行開發,且不同開發方式所建立的表支援任意混用,包括鏡像表。不同開發方式的使用方法、使用情境以及優缺點各異。本文將為您介紹各開發方式協助您更好地完成Flink SQL任務開發。

Dataphin計算源物理表開發方式

Dataphin計算源物理表開發方式是指您在開發Flink SQL任務時,可以直接通過寫專案名.表名稱的方式直接存取計算源中的物理表。並且支援跨專案訪問,訪問其他專案綁定的物理表。

重要
  • 目前支援訪問HologresHadoop、StarRocks計算源的物理表資料

  • 訪問的物理表所在的專案已綁定支援的計算源。

使用樣本

若您需要將example專案計算源中的test物理表資料插入到test_demo物理表中。您可以參考以下範例程式碼:

insert into test_demo select id,name from example.test;

Dataphin資料來源表開發方式

Dataphin資料來源表開發方式是指您在開發Flink SQL任務時,可以直接存取在Dataphin中所建立的資料來源中的表進行任務開發。如果您希望使用此種方式,需要先在資料來源上配置資料來源編碼。具體操作,請參見資料來源管理

配置資料來源編碼後,可在Flink SQL任務中通過資料來源編碼.table資料來源編碼.schema.table的格式引用資料來源中的表;如果需要根據所處環境自動訪問對應環境的資料來源,請通過${資料來源編碼}.table或${資料來源編碼}.schema.table的格式訪問。

重要

目前僅支援MySQLHologresMaxCompute、HiveOracleStarRocksSelectDBGaussDB(DWS)資料來源。

  • 不支援Schema的資料來源(包括MySQL、Hive、StarRocks、SelectDB),可通過資料來源編碼.表名稱的形式訪問資料來源中的物理表。

  • 支援Schema的資料來源(包括Hologres、GaussDB(DWS)),可通過資料來源編碼.schema名稱.表名稱的形式訪問資料來源中的物理表。

使用樣本

若您需要將MySQL資料來源(資料來源編碼為ds_demo_mysql)的demo_mysql物理表的資料插入至test_demo物理表中。您可以參考以下程式碼完成開發。

insert into test_demo select id,name from ds_demo_mysql.demo_mysql;

若您需要將Hologres資料來源(資料來源編碼為ds_demo_hologres、schema的名稱為hologres)的demo_hologres物理表的資料插入至test_demo物理表中。您可以參考以下程式碼完成開發。

insert into test_demo select id,name from ds_demo_hologres.hologres.demo_hologres;

Dataphin元表開發方式

在Dataphin中,元表是在原生DDL+DML開發上更高一層的邏輯概念,元表是通過資料管理的跨儲存類型表。開發過程中所用到的輸入表、輸出表、維表可以通過建立元表方式來進行建立和管理,以支援您通過引用元表的方式來建立其他資料表。這種方式可以使您一次建表,可多次引用。您無需重複編寫DDL語句,無需進行繁雜的輸入、輸出、維表映射,從而簡化開發,提升效率和體驗。同時通過元表可以有效避免直接編寫原生Flink DDL語句導致的敏感資訊透出等問題。

使用樣本

若您需要建立demo01demo02資料表,並將demo01的資料插入至demo02。您可以參考以下步驟完成開發。

  1. 通過Dataphin元表功能,建立demo01demo02資料表,具體操作,請參見建立元表

  2. 在Flink SQL任務中編寫插入語句,範例程式碼如下:

INSERT into demo02 select * from demo01;

基於Catalog開發

基於Catalog的開發方式是指在Flink SQL任務中通過建立Catalog串連資料庫,並使用Catalog中的表。通過該方式能夠避免編寫表的DDL語句,以簡化Flink SQL編碼工作。例如,在Flink SQL任務中建立Catalog01並建表t1後,在新的Flink SQL任務中再次建立Catalog01,可以直接存取表t1

重要
  • 僅支援開源FlinkRealtime Compute引擎。

  • 不支援在Catalog中建立物理表(僅支援建立記憶體暫存資料表)。

  • 不支援USE CATALOG/USE DATABASE語句。

  • ALTER TABLE語句僅支援Flink 1.17版本。

  • 不支援以catalog.database.'schema.table'的格式訪問表,僅支援以catalog.database.table格式訪問表。

  • 目前支援的Catalog類型包括JDBCMySQLOracle)和Paimon

使用樣本

CREATE CATALOG my_catalog WITH (
    'type' = 'jdbc',
    'base-url' = 'jdbc:mysql://rm-uf*******7o.mysql.rds.aliyuncs.com:3306',
    'default-database' = 'dataphin_01',
    'username' = '*******',
    'password' = '*******'
);
CREATE TEMPORARY TABLE t2 (
    id bigint,
    name STRING
) WITH (
    'connector' = 'print'
);

-- write streaming data to dynamic table
INSERT INTO t2 SELECT id,name FROM my_catalog.dataphin_01.pf_id_name;

原生DDL+DML開發方式

原生DDL開發是指在Flink SQL任務使用Flink SQL語句直接建立和管理資料表的開發方式。如使用CREATE TABLE/CREATE TEMPORARY TABLE建立表。這種開發方式通過代碼定義表結構並通過SQL語句來建立和管理表。

重要

原生DDL+DML開發方式因為需要在代碼中編寫明文的使用者名稱和密碼,導致資料不安全,可能造成資料泄露,請謹慎使用

使用樣本

若您需要在Flink SQL任務中使用原生DDL+DML開發方式,您可以參考以下範例程式碼進行建立。以下範例程式碼實現了類比資料的輸入輸出(從t1表中讀資料寫到t2表中)。

說明

使用原生DDL+DML開發語句建立表,您需關閉Dataphin編碼規範中的禁止使用Flink原生DDL語句設定。具體操作,請參見編碼規範

create temporary table t1 (
id bigint,
name varchar
) with (
'connector' = 'datagen',
'rows-per-second' = '1'
);
create temporary table t2 (
id bigint,
name varchar
) with (
'connector' = 'print'
);

-- begin statement set;
insert into t2 select id,replace(name, '\"', '"') as name from t1;
-- set;