この機能は近日中に提供終了となります。現在は、本機能を既に有効化した特定ユーザーのみが無料でご利用いただけます。今後 ETL タスクを設定する場合は、データ同期またはデータ移行インスタンスをご利用ください。「データ移行または同期タスクにおける ETL の設定」をご参照ください。
ソースデータベースから宛先データベースへデータがリアルタイムで流れている際に、2 つのテーブルの結合、ルックアップデータによるレコードのエンリッチメント、行のフィルター処理など、データをリアルタイムで変換する必要がある場合、DTS ストリーミング ETL を使用すると、そのロジックを SQL で定義できます。Flink SQL モードでは、DAG モードでは利用できないステートメントを含むフル SQL の表現力が得られ、ストリーム処理の内部構造に関する知識は不要です。
本トピックでは、Flink SQL モードでのストリーミング ETL タスクの設定方法について説明します。
前提条件
タスクの設定を開始する前に、以下の点を確認してください。
ETL タスクが次のいずれかのリージョンで実行されるようにしてください:中国 (杭州)、中国 (上海)、中国 (青島)、中国 (北京)、中国 (張家口)、中国 (深セン)、中国 (広州)、または中国 (香港)。
ソースデータベースが次のいずれかのタイプであることを確認してください:MySQL、PolarDB for MySQL、Oracle、PostgreSQL、iSeries DB2 (AS/400)、Db2 for LUW、PolarDB-X 1.0、PolarDB for PostgreSQL、MariaDB、PolarDB for Oracle、SQL Server、または PolarDB-X 2.0。
ターゲットデータベースが次のいずれかのタイプであることを確認してください:MySQL、PolarDB for MySQL、Oracle、AnalyticDB for MySQL V3.0、PolarDB for PostgreSQL、PostgreSQL、Db2 for LUW、iSeries DB2 (AS/400)、AnalyticDB for PostgreSQL、SQL Server、MariaDB、PolarDB-X 1.0、PolarDB for Oracle、または Tablestore。
ソースデータベースとターゲットデータベースが同一リージョンにあり、かつ同一の Alibaba Cloud アカウント下にあることを確認してください。
すべてのストリームテーブルが同一インスタンスに属していることを確認してください。
すべてのデータベース名およびテーブル名は一意です。
宛先データベースにターゲットスキーマが事前に作成済みであることを確認してください。ETL 機能ではスキーマ移行はサポートされていないため、タスク開始前に宛先テーブルを手動で作成する必要があります。たとえば、テーブル A(フィールド 1、フィールド 2、フィールド 3)とテーブル B(フィールド 2、フィールド 3、フィールド 4)を結合し、結果としてフィールド 2 およびフィールド 3 を含むテーブルを作成する場合、まず宛先データベースにこれらのフィールドを持つテーブル C を作成してください。
ソースおよび宛先インスタンスは、Data Management (DMS) に登録されています。「インスタンス管理」を参照してください。
ETL タスクでは増分データのみがサポートされます。完全なデータ同期はサポートされていません。
基本概念
テーブルの種類
| 種類 | 役割 | 説明 |
|---|---|---|
| ストリームテーブル | ソース | リアルタイムで更新されます。ディメンションテーブルと結合してデータエンリッチメントを行うことができます。 |
| ディメンションテーブル | ソース(ルックアップ) | 静的または変更頻度が低いテーブルです。分析用のストリーミングデータをエンリッチするために使用されます。 |
| 出力(シンク) | 送信先 | 変換後のデータが書き込まれるテーブルです。 |
ストリームの種類
CREATE TABLE でストリームテーブルを定義する際、streamType を設定することで、動的テーブルの変更内容が宛先に書き込まれる際のエンコーディング方式を制御できます。
| 値 | サポートされる操作 | 使用タイミング | 備考 |
|---|---|---|---|
append | INSERT のみ | データは挿入のみで、更新や削除は行われません | 宛先には新規の行のみが受信されます。 |
upsert | INSERT、UPDATE、DELETE | データの挿入、更新、削除が可能です | 一意キー(複合キー可)が必要です。INSERT および UPDATE はアップサートメッセージとして、DELETE はデリートメッセージとしてエンコードされます。 |
Flink SQL モードでのストリーミング ETL タスクの設定
設定は以下の 5 つのステップで構成されます。
データフローを作成し、Flink SQL モードを選択する
ソースおよび宛先データベースを追加する
Flink SQL ステートメントを作成する
検証および公開を行う
インスタンスを購入し、タスクを開始する
ステップ 1:データフローの作成
ステップ 2:ソースおよび宛先データベースの追加
ストリーミング ETL ページの データフロー情報 セクションで、ソースおよび宛先データベースを設定します。
| パラメーター | 説明 |
|---|---|
| リージョン | データベースが配置されているリージョンです。 |
| 種別 | このデータベースエントリの役割です。ストリームテーブル をリアルタイムのソース、ディメンションテーブル を静的なルックアップテーブル、出力 を宛先としてそれぞれ選択します。 |
| データベースタイプ | ソースまたは宛先データベースのタイプです。 |
| インスタンス | インスタンスの名前または ID です。インスタンスは Data Management (DMS) に登録済みである必要があります。 |
| データベース | 変換対象のテーブルを含むデータベースです。 |
| 物理テーブル | ソースまたは宛先のテーブルです。 |
| 物理テーブルのエイリアス | テーブルの読みやすい名前です。このエイリアスは、Flink SQL ステートメント内で参照され、ここで選択した物理テーブルと各 CREATE TABLE 宣言を関連付けます。 |
ステップ 3:Flink SQL ステートメントの作成
ストリーミング ETL ページのスクリプトエディタで、ETL ロジックを定義する SQL ステートメントを作成します。
各 SQL ステートメントはセミコロン (;) で終了する必要があります。
完全な Flink SQL スクリプトには、以下の 3 種類のステートメントが必要です。
| ステートメント | 目的 |
|---|---|
CREATE TABLE | ソースおよび宛先テーブルを定義します。ETL パラメーターは WITH 句に指定します。 |
CREATE VIEW | ストリームテーブルとディメンションテーブルの結合など、データ変換ロジックを記述します。 |
INSERT INTO | ビューから変換されたデータを宛先テーブルに書き込みます。 |
WITH 句のパラメーター
各 CREATE TABLE ステートメントでは、WITH 句を使用してテーブルの ETL 動作を設定します。
| パラメーター | 適用対象 | 説明 |
|---|---|---|
streamType | ストリームテーブルのみ | 宛先への書き込み時に変更内容がどのようにエンコードされるかを指定します。有効な値: append、upsert。「ストリームの種類」をご参照ください。 |
alias | すべてのテーブル種別 | ステップ 2 で設定した 物理テーブルのエイリアス の値と完全に一致させる必要があります。この値により、CREATE TABLE 宣言が選択した物理テーブルと関連付けられます。 |
vertexType | すべてのテーブル種別 | テーブルの役割です。有効な値: stream(ストリームテーブル)、lookup(ディメンションテーブル)、sink(宛先テーブル)。 |
例:ストリームテーブルとディメンションテーブルの結合
以下のスクリプトは、ストリームテーブル test_orders とディメンションテーブル product を結合し、その結果を宛先テーブル test_orders_new に挿入します。
CREATE TABLE `etltest_test_orders` (
`order_id` BIGINT,
`user_id` BIGINT,
`product_id` BIGINT,
`total_price` DECIMAL(15,2),
`order_date` TIMESTAMP(6),
`dts_etl_schema_db_table` STRING,
`dts_etl_db_log_time` BIGINT,
`pt` AS PROCTIME(),
WATERMARK FOR `order_date` AS `order_date` - INTERVAL '5' SECOND
) WITH (
'streamType'= 'append',
'alias'= 'test_orders',
'vertexType'= 'stream'
);
CREATE TABLE `etltest_product` (
`product_id` BIGINT,
`product_name` STRING,
`product_price` DECIMAL(15,2)
) WITH (
'alias'= 'product',
'vertexType'= 'lookup'
);
CREATE VIEW `etltest_test_orders_JOIN_etltest_product` AS
SELECT
`etltest_test_orders`.`order_id` AS `order_id`,
`etltest_test_orders`.`user_id` AS `user_id`,
`etltest_test_orders`.`product_id` AS `product_id`,
`etltest_test_orders`.`total_price` AS `total_price`,
`etltest_test_orders`.`order_date` AS `order_date`,
`etltest_test_orders`.`dts_etl_schema_db_table` AS `dts_etl_schema_db_table`,
`etltest_test_orders`.`dts_etl_db_log_time` AS `dts_etl_db_log_time`,
`etltest_product`.`product_id` AS `product_id_0001011101`,
`etltest_product`.`product_name` AS `product_name`,
`etltest_product`.`product_price` AS `product_price`
FROM `etltest_test_orders` LEFT JOIN `etltest_product` FOR SYSTEM_TIME AS OF `etltest_test_orders`.`pt` ON etltest_test_orders.product_id = etltest_product.product_id
;
CREATE TABLE `test_orders_new` (
`order_id` BIGINT,
`user_id` BIGINT,
`product_id` BIGINT,
`total_price` DECIMAL(15,2),
`order_date` TIMESTAMP(6),
`product_name` STRING,
`product_price` DECIMAL(15,2)
) WITH (
'alias'= 'test_orders_new',
'vertexType'= 'sink'
);
INSERT INTO `test_orders_new` (
`order_id`,
`user_id`,
`product_id`,
`total_price`,
`order_date`,
`product_name`,
`product_price`
)
SELECT
`etltest_test_orders_JOIN_etltest_product`.`order_id`,
`etltest_test_orders_JOIN_etltest_product`.`user_id`,
`etltest_test_orders_JOIN_etltest_product`.`product_id`,
`etltest_test_orders_JOIN_etltest_product`.`total_price`,
`etltest_test_orders_JOIN_etltest_product`.`order_date`,
`etltest_test_orders_JOIN_etltest_product`.`product_name`,
`etltest_test_orders_JOIN_etltest_product`.`product_price`
FROM `etltest_test_orders_JOIN_etltest_product`;ステップ 4:検証および公開
Flink SQL 検証の生成 をクリックして、SQL ステートメントを検証します。
検証が成功した場合は、
をクリックして詳細を確認します。検証が失敗した場合は、
をクリックしてエラーの詳細を確認し、SQL ステートメントを修正したうえで再度検証を実行します。
説明公開 をクリックすると、SQL 検証と事前チェックが 1 回の操作で実行されます。
検証が成功した後、公開 をクリックして事前チェックを実行します。
事前チェックの成功率达到 100 % になるまで待ちます。
説明事前チェックが失敗した場合は、各失敗項目の横にある 詳細の表示 をクリックし、問題を解決したうえで再度事前チェックを実行してください。
ステップ 5:インスタンスの購入およびタスクの開始
インスタンスの購入 ページで、インスタンスクラス を選択し、コンピューティングユニット (CU) の値を確認します。パブリックプレビュー期間中は CU の値は固定で 2 です。
Data Transmission Service(従量課金)サービス利用規約 および パブリックプレビュー版サービス利用規約 をお読みになり、該当するチェックボックスを選択して同意してください。
購入して開始 をクリックして ETL タスクを開始します。
パブリックプレビュー期間中は、各ユーザーが無料で最大 2 つの ETL インスタンスを作成できます。