すべてのプロダクト
Search
ドキュメントセンター

Data Transmission Service:Flink SQL モードでの ETL タスクの設定

最終更新日:Mar 28, 2026
重要

この機能は近日中に提供終了となります。現在は、本機能を既に有効化した特定ユーザーのみが無料でご利用いただけます。今後 ETL タスクを設定する場合は、データ同期またはデータ移行インスタンスをご利用ください。「データ移行または同期タスクにおける ETL の設定」をご参照ください。

ソースデータベースから宛先データベースへデータがリアルタイムで流れている際に、2 つのテーブルの結合、ルックアップデータによるレコードのエンリッチメント、行のフィルター処理など、データをリアルタイムで変換する必要がある場合、DTS ストリーミング ETL を使用すると、そのロジックを SQL で定義できます。Flink SQL モードでは、DAG モードでは利用できないステートメントを含むフル SQL の表現力が得られ、ストリーム処理の内部構造に関する知識は不要です。

本トピックでは、Flink SQL モードでのストリーミング ETL タスクの設定方法について説明します。

前提条件

タスクの設定を開始する前に、以下の点を確認してください。

  • ETL タスクが次のいずれかのリージョンで実行されるようにしてください:中国 (杭州)、中国 (上海)、中国 (青島)、中国 (北京)、中国 (張家口)、中国 (深セン)、中国 (広州)、または中国 (香港)。

  • ソースデータベースが次のいずれかのタイプであることを確認してください:MySQLPolarDB for MySQLOraclePostgreSQLiSeries DB2 (AS/400)Db2 for LUWPolarDB-X 1.0PolarDB for PostgreSQLMariaDBPolarDB for OracleSQL Server、または PolarDB-X 2.0

  • ターゲットデータベースが次のいずれかのタイプであることを確認してください:MySQLPolarDB for MySQLOracleAnalyticDB for MySQL V3.0PolarDB for PostgreSQLPostgreSQLDb2 for LUWiSeries DB2 (AS/400)AnalyticDB for PostgreSQLSQL ServerMariaDBPolarDB-X 1.0PolarDB for Oracle、または Tablestore

  • ソースデータベースとターゲットデータベースが同一リージョンにあり、かつ同一の Alibaba Cloud アカウント下にあることを確認してください。

  • すべてのストリームテーブルが同一インスタンスに属していることを確認してください。

  • すべてのデータベース名およびテーブル名は一意です。

  • 宛先データベースにターゲットスキーマが事前に作成済みであることを確認してください。ETL 機能ではスキーマ移行はサポートされていないため、タスク開始前に宛先テーブルを手動で作成する必要があります。たとえば、テーブル A(フィールド 1、フィールド 2、フィールド 3)とテーブル B(フィールド 2、フィールド 3、フィールド 4)を結合し、結果としてフィールド 2 およびフィールド 3 を含むテーブルを作成する場合、まず宛先データベースにこれらのフィールドを持つテーブル C を作成してください。

  • ソースおよび宛先インスタンスは、Data Management (DMS) に登録されています。「インスタンス管理」を参照してください。

  • ETL タスクでは増分データのみがサポートされます。完全なデータ同期はサポートされていません。

基本概念

テーブルの種類

種類役割説明
ストリームテーブルソースリアルタイムで更新されます。ディメンションテーブルと結合してデータエンリッチメントを行うことができます。
ディメンションテーブルソース(ルックアップ)静的または変更頻度が低いテーブルです。分析用のストリーミングデータをエンリッチするために使用されます。
出力(シンク)送信先変換後のデータが書き込まれるテーブルです。

ストリームの種類

CREATE TABLE でストリームテーブルを定義する際、streamType を設定することで、動的テーブルの変更内容が宛先に書き込まれる際のエンコーディング方式を制御できます。

サポートされる操作使用タイミング備考
appendINSERT のみデータは挿入のみで、更新や削除は行われません宛先には新規の行のみが受信されます。
upsertINSERT、UPDATE、DELETEデータの挿入、更新、削除が可能です一意キー(複合キー可)が必要です。INSERT および UPDATE はアップサートメッセージとして、DELETE はデリートメッセージとしてエンコードされます。

Flink SQL モードでのストリーミング ETL タスクの設定

設定は以下の 5 つのステップで構成されます。

  1. データフローを作成し、Flink SQL モードを選択する

  2. ソースおよび宛先データベースを追加する

  3. Flink SQL ステートメントを作成する

  4. 検証および公開を行う

  5. インスタンスを購入し、タスクを開始する

ステップ 1:データフローの作成

  1. DTS コンソール にログインします。DTS コンソール

  2. 左側のナビゲーションウィンドウで、ETL をクリックします。

  3. 新增数据流 をクリックします。データフローの作成 ダイアログボックスで、データフロー名 フィールドに名前を入力し、FlinkSQL開発方法 として選択します。

  4. OK をクリックします。

ステップ 2:ソースおよび宛先データベースの追加

ストリーミング ETL ページの データフロー情報 セクションで、ソースおよび宛先データベースを設定します。

パラメーター説明
リージョンデータベースが配置されているリージョンです。
種別このデータベースエントリの役割です。ストリームテーブル をリアルタイムのソース、ディメンションテーブル を静的なルックアップテーブル、出力 を宛先としてそれぞれ選択します。
データベースタイプソースまたは宛先データベースのタイプです。
インスタンスインスタンスの名前または ID です。インスタンスは Data Management (DMS) に登録済みである必要があります。
データベース変換対象のテーブルを含むデータベースです。
物理テーブルソースまたは宛先のテーブルです。
物理テーブルのエイリアステーブルの読みやすい名前です。このエイリアスは、Flink SQL ステートメント内で参照され、ここで選択した物理テーブルと各 CREATE TABLE 宣言を関連付けます。

ステップ 3:Flink SQL ステートメントの作成

ストリーミング ETL ページのスクリプトエディタで、ETL ロジックを定義する SQL ステートメントを作成します。

重要

各 SQL ステートメントはセミコロン (;) で終了する必要があります。

完全な Flink SQL スクリプトには、以下の 3 種類のステートメントが必要です。

ステートメント目的
CREATE TABLEソースおよび宛先テーブルを定義します。ETL パラメーターは WITH 句に指定します。
CREATE VIEWストリームテーブルとディメンションテーブルの結合など、データ変換ロジックを記述します。
INSERT INTOビューから変換されたデータを宛先テーブルに書き込みます。

WITH 句のパラメーター

CREATE TABLE ステートメントでは、WITH 句を使用してテーブルの ETL 動作を設定します。

パラメーター適用対象説明
streamTypeストリームテーブルのみ宛先への書き込み時に変更内容がどのようにエンコードされるかを指定します。有効な値: appendupsert。「ストリームの種類」をご参照ください。
aliasすべてのテーブル種別ステップ 2 で設定した 物理テーブルのエイリアス の値と完全に一致させる必要があります。この値により、CREATE TABLE 宣言が選択した物理テーブルと関連付けられます。
vertexTypeすべてのテーブル種別テーブルの役割です。有効な値: stream(ストリームテーブル)、lookup(ディメンションテーブル)、sink(宛先テーブル)。

例:ストリームテーブルとディメンションテーブルの結合

以下のスクリプトは、ストリームテーブル test_orders とディメンションテーブル product を結合し、その結果を宛先テーブル test_orders_new に挿入します。

CREATE TABLE `etltest_test_orders` (
  `order_id` BIGINT,
  `user_id` BIGINT,
  `product_id` BIGINT,
  `total_price` DECIMAL(15,2),
  `order_date` TIMESTAMP(6),
  `dts_etl_schema_db_table` STRING,
  `dts_etl_db_log_time` BIGINT,
  `pt` AS PROCTIME(),
  WATERMARK FOR `order_date` AS `order_date` - INTERVAL '5' SECOND
) WITH (
  'streamType'= 'append',
  'alias'= 'test_orders',
  'vertexType'= 'stream'
);
CREATE TABLE `etltest_product` (
  `product_id` BIGINT,
  `product_name` STRING,
  `product_price` DECIMAL(15,2)
) WITH (
  'alias'= 'product',
  'vertexType'= 'lookup'
);
CREATE VIEW `etltest_test_orders_JOIN_etltest_product` AS
SELECT
  `etltest_test_orders`.`order_id` AS `order_id`,
  `etltest_test_orders`.`user_id` AS `user_id`,
  `etltest_test_orders`.`product_id` AS `product_id`,
  `etltest_test_orders`.`total_price` AS `total_price`,
  `etltest_test_orders`.`order_date` AS `order_date`,
  `etltest_test_orders`.`dts_etl_schema_db_table` AS `dts_etl_schema_db_table`,
  `etltest_test_orders`.`dts_etl_db_log_time` AS `dts_etl_db_log_time`,
  `etltest_product`.`product_id` AS `product_id_0001011101`,
  `etltest_product`.`product_name` AS `product_name`,
  `etltest_product`.`product_price` AS `product_price`
FROM `etltest_test_orders` LEFT JOIN `etltest_product` FOR SYSTEM_TIME AS OF `etltest_test_orders`.`pt` ON etltest_test_orders.product_id = etltest_product.product_id
;
CREATE TABLE `test_orders_new` (
  `order_id` BIGINT,
  `user_id` BIGINT,
  `product_id` BIGINT,
  `total_price` DECIMAL(15,2),
  `order_date` TIMESTAMP(6),
  `product_name` STRING,
  `product_price` DECIMAL(15,2)
) WITH (
  'alias'= 'test_orders_new',
  'vertexType'= 'sink'
);
INSERT INTO `test_orders_new` (
  `order_id`,
  `user_id`,
  `product_id`,
  `total_price`,
  `order_date`,
  `product_name`,
  `product_price`
)
SELECT
  `etltest_test_orders_JOIN_etltest_product`.`order_id`,
  `etltest_test_orders_JOIN_etltest_product`.`user_id`,
  `etltest_test_orders_JOIN_etltest_product`.`product_id`,
  `etltest_test_orders_JOIN_etltest_product`.`total_price`,
  `etltest_test_orders_JOIN_etltest_product`.`order_date`,
  `etltest_test_orders_JOIN_etltest_product`.`product_name`,
  `etltest_test_orders_JOIN_etltest_product`.`product_price`
FROM `etltest_test_orders_JOIN_etltest_product`;

ステップ 4:検証および公開

  1. Flink SQL 検証の生成 をクリックして、SQL ステートメントを検証します。

    • 検証が成功した場合は、ETL校验成功 をクリックして詳細を確認します。

    • 検証が失敗した場合は、ETL校验成功 をクリックしてエラーの詳細を確認し、SQL ステートメントを修正したうえで再度検証を実行します。

    説明

    公開 をクリックすると、SQL 検証と事前チェックが 1 回の操作で実行されます。

  2. 検証が成功した後、公開 をクリックして事前チェックを実行します。

  3. 事前チェックの成功率达到 100 % になるまで待ちます。

    説明

    事前チェックが失敗した場合は、各失敗項目の横にある 詳細の表示 をクリックし、問題を解決したうえで再度事前チェックを実行してください。

ステップ 5:インスタンスの購入およびタスクの開始

  1. インスタンスの購入 ページで、インスタンスクラス を選択し、コンピューティングユニット (CU) の値を確認します。パブリックプレビュー期間中は CU の値は固定で 2 です。

  2. Data Transmission Service(従量課金)サービス利用規約 および パブリックプレビュー版サービス利用規約 をお読みになり、該当するチェックボックスを選択して同意してください。

  3. 購入して開始 をクリックして ETL タスクを開始します。

説明

パブリックプレビュー期間中は、各ユーザーが無料で最大 2 つの ETL インスタンスを作成できます。

関連トピック