Dataphin は、Flink SQL タスクを開発するための複数のメソッドをサポートしています。これらのメソッドには、ネイティブのデータ定義言語 (DDL) とデータ操作言語 (DML) による開発、カタログベースの開発、Dataphin メタテーブルによる開発、Dataphin データソーステーブルによる開発、コンピューティングソースの物理テーブルによる開発などがあります。イメージテーブルなど、異なるメソッドで作成されたテーブルは併用できます。各メソッドには、それぞれ固有の手順、適用シナリオ、長所と短所があります。このトピックでは、これらの開発メソッドについて説明し、Flink SQL タスクを効率的に開発できるよう支援します。
Dataphin コンピューティングソース物理テーブル開発方法
Dataphin コンピューティングソース物理テーブル開発メソッドを使用すると、Flink SQL タスクの開発時に、コンピューティングソース内の物理テーブルに直接アクセスできます。そのためには、project_name.table_name というフォーマットでテーブルを指定します。このメソッドは、他のプロジェクトにアタッチされている物理テーブルへのクロスプロジェクトアクセスもサポートしています。
現在、このメソッドは Hologres、 Hadoop、StarRocks のコンピューティングソースからの物理テーブルデータへのアクセスをサポートしています。
物理テーブルを含むプロジェクトは、サポートされているコンピューティングソースにアタッチされている必要があります。
例
example プロジェクトのコンピューティングソースにある物理テーブル test のデータを物理テーブル test_demo に挿入するには、次のサンプルコードをご参照ください。
insert into test_demo select id,name from example.test;Dataphin データソーステーブル開発方法
Dataphin データソーステーブル開発メソッドを使用すると、Flink SQL タスクの開発時に、Dataphin で作成されたデータソースのテーブルに直接アクセスできます。このメソッドを使用するには、まずデータソースのデータソースエンコーディングを設定する必要があります。詳細については、「データソース管理」をご参照ください。
データソースエンコーディングを設定した後、data_source_encoding.table または data_source_encoding.schema.table というフォーマットを使用して、Flink SQL タスクでデータソースのテーブルを参照できます。現在の環境に対応するデータソースに自動的にアクセスするには、${data_source_encoding}.table または ${data_source_encoding}.schema.table というフォーマットを使用します。
現在、サポートされているデータソースは MySQL、Hologres、MaxCompute、 Hive、Oracle、StarRocks、SelectDB、GaussDB (DWS) のみです。
MySQL、 Hive、StarRocks、SelectDB など、スキーマをサポートしないデータソースの場合、
data_source_encoding.table_nameというフォーマットを使用して物理テーブルにアクセスできます。Hologres や GaussDB (DWS) など、スキーマをサポートするデータソースの場合、
data_source_encoding.schema_name.table_nameというフォーマットを使用して物理テーブルにアクセスできます。
例
エンコーディングが ds_demo_mysql の MySQL データソースの物理テーブル demo_mysql から、物理テーブル test_demo にデータを挿入するには、次のサンプルコードをご参照ください。
insert into test_demo select id,name from ds_demo_mysql.demo_mysql;エンコーディングが ds_demo_hologres でスキーマ名が hologres の Hologres データソースの物理テーブル demo_hologres から、物理テーブル test_demo にデータを挿入するには、次のサンプルコードをご参照ください。
insert into test_demo select id,name from ds_demo_hologres.hologres.demo_hologres;Dataphin メタテーブル開発方法
Dataphin において、メタテーブルはネイティブの DDL および DML 開発よりも高いレベルの抽象化を提供する論理的な概念です。メタテーブルは、Data Management によって管理されるクロスストレージクラスのテーブルです。開発プロセス中に、入力テーブル、出力テーブル、ディメンションテーブルをメタテーブルとして作成および管理できます。このメソッドを使用すると、テーブルを一度作成すれば複数回参照できます。入力テーブル、出力テーブル、ディメンションテーブルに対して、繰り返し DDL 文を記述したり、複雑なマッピングを実行したりする必要はありません。これにより、開発が簡素化され、効率とユーザーエクスペリエンスが向上します。また、メタテーブルは、ネイティブ Flink DDL 文を直接記述する際に発生する可能性のある機密情報の漏洩を防ぐのにも役立ちます。
例
データテーブル demo01 と demo02 を作成し、demo01 から demo02 にデータを挿入するには、次の手順に従います。
Dataphin のメタテーブル機能を使用して、データテーブル
demo01とdemo02を作成します。詳細については、「メタテーブルの作成」をご参照ください。Flink SQL タスクで INSERT 文を記述します。以下はサンプルコードです。
INSERT into demo02 select * from demo01;カタログベースの開発
カタログベースの開発では、Flink SQL タスクでカタログを作成してデータベースに接続し、そのカタログ内のテーブルを使用します。このメソッドでは、テーブルの DDL 文を記述する必要がないため、Flink SQL の開発が簡素化されます。たとえば、Flink SQL タスクで Catalog01 とテーブル t1 を作成した後、新しい Flink SQL タスクで Catalog01 を再度作成することで、テーブル t1 に直接アクセスできます。
オープンソースの Flink リアルタイムコンピューティングエンジンのみがサポートされています。
カタログでの物理テーブルの作成はサポートされていません。インメモリの一時テーブルのみがサポートされています。
USE CATALOG/USE DATABASE文はサポートされていません。ALTER TABLE文は Flink 1.17 でのみサポートされています。catalog.database.'schema.table'というフォーマットでのテーブルへのアクセスはサポートされていません。catalog.database.tableというフォーマットのみがサポートされています。サポートされているカタログタイプには、Java Database Connectivity (JDBC) (MySQL、Oracle) および Paimon が含まれます。
例
CREATE CATALOG my_catalog WITH (
'type' = 'jdbc',
'base-url' = 'jdbc:mysql://rm-uf*******7o.mysql.rds.aliyuncs.com:3306',
'default-database' = 'dataphin_01',
'username' = '*******',
'password' = '*******'
);
CREATE TEMPORARY TABLE t2 (
id bigint,
name STRING
) WITH (
'connector' = 'print'
);
-- ストリーミングデータを動的テーブルに書き込みます
INSERT INTO t2 SELECT id,name FROM my_catalog.dataphin_01.pf_id_name;ネイティブ DDL+DML 開発方法
ネイティブのデータ定義言語 (DDL) とデータ操作言語 (DML) の開発メソッドでは、Flink SQL タスクで Flink SQL 文を直接使用してデータテーブルを作成および管理します。たとえば、CREATE TABLE/CREATE TEMPORARY TABLE を使用してテーブルを作成できます。このメソッドは、コードでテーブルスキーマを定義し、SQL 文を使用してテーブルを管理します。
ネイティブ DDL および DML 開発メソッドでは、コード内にプレーンテキストのユーザー名とパスワードを記述する必要があります。この方法は安全ではなく、データ漏洩につながる可能性があります。このメソッドは注意して使用してください。
例
Flink SQL タスクでネイティブ DDL および DML 開発メソッドを使用するには、次のサンプルコードをご参照ください。このサンプルコードは、テーブル t1 から模擬データ (analog data) を読み取り、テーブル t2 に書き込む方法を示しています。
ネイティブ DDL および DML 文を使用してテーブルを作成するには、Dataphin の コーディング規約 で [ネイティブ Flink DDL 文の使用を禁止する] 設定を無効にする必要があります。詳細については、「コーディング規約」をご参照ください。
create temporary table t1 (
id bigint,
name varchar
) with (
'connector' = 'datagen',
'rows-per-second' = '1'
);
create temporary table t2 (
id bigint,
name varchar
) with (
'connector' = 'print'
);
-- begin statement set;
insert into t2 select id,replace(name, '\"', '"') as name from t1;
-- set;