ストリーム処理とバッチ処理を統合したコンピューティングフレームワークとして、Flink は低レイテンシのストリーミングデータと高スループットのバッチデータを処理できます。Realtime Compute for Apache Flink は、ドラフト開発、デプロイメント O&M、ワークフロー、キュー管理、データプロファイリングなどの機能でバッチ処理をサポートしています。バッチ処理機能を使用して、ビジネス要件を満たすことができます。このトピックでは、Realtime Compute for Apache Flink の主要機能を使用してバッチ処理を実行する方法について説明します。
機能
Realtime Compute for Apache Flink の以下の主要機能は、バッチ処理をサポートしています。
SQL ドラフト開発: SQL エディターページのドラフトタブで、バッチドラフトを作成できます。バッチドラフトをバッチデプロイメントとしてデプロイし、バッチデプロイメントを実行できます。詳細については、「SQL ドラフトを開発する」をご参照ください。
デプロイメント管理: [デプロイメント] ページで、JAR または Python バッチドラフトをバッチデプロイメントとして直接デプロイできます。詳細については、「デプロイメント管理」をご参照ください。[デプロイメント] ページで、デプロイメント タイプのドロップダウンリストから [BATCH] を選択します。目的のバッチデプロイメントを展開して、ジョブを表示します。ほとんどの場合、バッチデプロイメントの異なるジョブは、同じデータ処理ロジックを使用しますが、データ処理日などのパラメーターが異なります。
スクリプト: SQL エディターページのスクリプトタブで、DDL 文または短いクエリ文を実行して、データを迅速に管理し、データプロファイリングを実行できます。スクリプトの作成方法の詳細については、「スクリプト」をご参照ください。短いクエリ文は、Realtime Compute for Apache Flink で事前に作成されたセッションで実行されます。このようにして、リソースを再利用することで、低レイテンシで単純なクエリを実行できます。
カタログ: カタログページで、データベースとテーブルに関する情報を含むカタログを作成および表示できます。詳細については、「カタログの管理」をご参照ください。SQL エディターページのカタログタブでカタログを表示することもできます。これにより、開発効率が向上します。
ワークフロー: ワークフローページで、ワークフローを作成し、ワークフローで実行されるタスクの依存関係を視覚的に構成できます。タスクはバッチデプロイメントに関連付けられています。詳細については、「ワークフロー (パブリックプレビュー)」をご参照ください。バッチデプロイメントは、構成した依存関係に基づいてワークフローで実行されます。手動スケジュールまたは定期スケジュールに基づいて作成したワークフローでタスクを実行できます。
キュー管理: キュー管理ページで、ワークスペース内のリソースを分割して、ストリームデプロイメント、バッチデプロイメント、および優先順位が異なるデプロイメント間でのリソース競合を防ぐことができます。詳細については、「キューの管理」をご参照ください。
注意事項
ワークスペースが作成されます。詳細については、「Realtime Compute for Apache Flink をアクティブ化する」をご参照ください。
Object Storage Service (OSS) がアクティブ化されます。詳細については、「OSS コンソールを使用して始める」をご参照ください。OSS バケットのストレージタイプは 標準 である必要があります。詳細については、「概要」をご参照ください。
このトピックの例では、Apache Paimon を使用してデータを保存し、Ververica Runtime (VVR) 8.0.5 以降を使用する Realtime Compute for Apache Flink にのみ適用されます。
例
このトピックの例では、e コマースプラットフォームのビジネスデータを処理し、データを Apache Paimon のレイクハウス形式で保存します。オペレーショナルデータストア (ODS)、データウェアハウスの詳細 (DWD)、およびデータウェアハウスサービス (DWS) レイヤーを含むデータウェアハウス構造がシミュレートされます。Realtime Compute for Apache Flink のバッチ処理機能により、データを処理およびクレンジングし、Apache Paimon テーブルに書き込むことができます。このようにして、階層型データストレージ構造が確立されます。

準備
スクリプト を作成します。
[スクリプト] タブの SQL エディターページで、カタログを作成し、カタログにデータベースとテーブルを作成してから、シミュレートされたデータをテーブルに挿入できます。
Apache Paimon カタログを作成します。
[スクリプト] タブのスクリプトエディターに、次の SQL 文を入力します。
CREATE CATALOG `my_catalog` WITH ( 'type' = 'paimon', 'metastore' = 'filesystem', 'warehouse' = '<warehouse>', 'fs.oss.endpoint' = '<fs.oss.endpoint>', 'fs.oss.accessKeyId' = '<fs.oss.accessKeyId>', 'fs.oss.accessKeySecret' = '<fs.oss.accessKeySecret>' );次の表は、サンプルコードのパラメーターについて説明しています。
パラメーター
説明
必須
備考
type
カタログタイプ。
はい
値を Paimon に設定します。
metastore
メタデータストレージタイプ。
はい
サンプルコードでは、値は filesystem に設定されています。他のタイプの詳細については、「Apache Paimon カタログの管理」をご参照ください。
warehouse
OSS で指定されたデータウェアハウスディレクトリ。
はい
形式は oss://<bucket>/<object> です。ディレクトリ内のパラメーター:
bucket: 作成した OSS バケットの名前を示します。
object: データが保存されているパスを示します。
OSS コンソール でバケット名とオブジェクト名を表示できます。
fs.oss.endpoint
OSS のエンドポイント。
いいえ
warehouse パラメーターで指定された OSS バケットが Realtime Compute for Apache Flink ワークスペースと同じリージョンにない場合、または別の Alibaba Cloud アカウント内の OSS バケットを使用する場合は、このパラメーターが必要です。
詳細については、「リージョンとエンドポイント」をご参照ください。
fs.oss.accessKeyId
OSS に対する読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey ID。
いいえ
warehouse パラメーターで指定された OSS バケットが Realtime Compute for Apache Flink ワークスペースと同じリージョンにない場合、または別の Alibaba Cloud アカウント内の OSS バケットを使用する場合は、このパラメーターが必要です。AccessKey ペアの取得方法の詳細については、「AccessKey を作成する」をご参照ください。
fs.oss.accessKeySecret
OSS に対する読み取りおよび書き込み権限を持つ Alibaba Cloud アカウントまたは RAM ユーザーの AccessKey シークレット。
いいえ
上記のコードを選択し、スクリプトエディターの左側にある 実行 をクリックします。
The following statement has been executed successfully!というメッセージが表示された場合は、カタログが作成されています。[カタログ] ページ、または [SQL エディター] ページの [カタログ] タブで、作成したカタログを表示できます。
手順
手順 1: ODS レイヤーにテーブルを作成し、テーブルにテストデータを挿入する
テストデータは、DWD または DWS レイヤーのテーブルで後続のデータ生成を行うために、ODS レイヤーのテーブルに直接挿入されます。これにより、例の手順が簡素化されます。実際の運用環境では、Realtime Compute for Apache Flink はストリーム処理機能を使用して外部データソースからデータを読み取り、ODS レイヤーのデータとしてデータレイクに書き込みます。詳細については、「Apache Paimon の基本機能入門」をご参照ください。
[スクリプト] タブのスクリプトエディターに、次の SQL 文を入力して選択し、スクリプトエディターの左側にある [実行] をクリックします。
CREATE DATABASE `my_catalog`.`order_dw`; USE `my_catalog`.`order_dw`; CREATE TABLE orders ( order_id BIGINT, user_id STRING, shop_id BIGINT, product_id BIGINT, buy_fee BIGINT, create_time TIMESTAMP, update_time TIMESTAMP, state INT ); CREATE TABLE orders_pay ( pay_id BIGINT, order_id BIGINT, pay_platform INT, create_time TIMESTAMP ); CREATE TABLE product_catalog ( product_id BIGINT, catalog_name STRING ); -- Insert test data into the tables. INSERT INTO orders VALUES (100001, 'user_001', 12345, 1, 5000, TO_TIMESTAMP('2023-02-15 16:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100002, 'user_002', 12346, 2, 4000, TO_TIMESTAMP('2023-02-15 15:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100003, 'user_003', 12347, 3, 3000, TO_TIMESTAMP('2023-02-15 14:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100004, 'user_001', 12347, 4, 2000, TO_TIMESTAMP('2023-02-15 13:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100005, 'user_002', 12348, 5, 1000, TO_TIMESTAMP('2023-02-15 12:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100006, 'user_001', 12348, 1, 1000, TO_TIMESTAMP('2023-02-15 11:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100007, 'user_003', 12347, 4, 2000, TO_TIMESTAMP('2023-02-15 10:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1); INSERT INTO orders_pay VALUES (2001, 100001, 1, TO_TIMESTAMP('2023-02-15 17:40:56')), (2002, 100002, 1, TO_TIMESTAMP('2023-02-15 17:40:56')), (2003, 100003, 0, TO_TIMESTAMP('2023-02-15 17:40:56')), (2004, 100004, 0, TO_TIMESTAMP('2023-02-15 17:40:56')), (2005, 100005, 0, TO_TIMESTAMP('2023-02-15 18:40:56')), (2006, 100006, 0, TO_TIMESTAMP('2023-02-15 18:40:56')), (2007, 100007, 0, TO_TIMESTAMP('2023-02-15 18:40:56')); INSERT INTO product_catalog VALUES (1, 'phone_aaa'), (2, 'phone_bbb'), (3, 'phone_ccc'), (4, 'phone_ddd'), (5, 'phone_eee');説明この例では、プライマリキーのない Apache Paimon 追加専用テーブルが作成されます。これらのテーブルは、プライマリキーを持つ Apache Paimon テーブルよりもバッチ書き込みパフォーマンスが優れていますが、プライマリキーに基づくデータ更新はサポートしていません。
実行結果は複数のサブタブに表示されます。
次の文は正常に実行されました!というメッセージが表示された場合、DDL 文の実行は成功しています。ジョブ ID が返された場合は、INSERT 文などの DML 文が実行されます。この場合、Realtime Compute for Apache Flink デプロイメントが作成され、Realtime Compute for Apache Flink セッションで実行されます。[結果] タブの左側にある Flink UI をクリックして、文の実行ステータスを表示できます。文の実行が完了するまで数秒待ちます。
ODS レイヤーのテーブルでデータプロファイリングを実行します。
[スクリプト] タブのスクリプトエディターに、次の SQL 文を入力して選択し、スクリプトエディターの左側にある [実行] をクリックします。
SELECT count(*) as order_count FROM `my_catalog`.`order_dw`.`orders`; SELECT count(*) as pay_count FROM `my_catalog`.`order_dw`.`orders_pay`; SELECT * FROM `my_catalog`.`order_dw`.`product_catalog`;これらの SQL 文も Realtime Compute for Apache Flink セッションで実行されます。3 つのクエリのそれぞれの [結果] タブで実行結果を表示できます。

手順 2: DWD レイヤーと DWS レイヤーにテーブルを作成する
[スクリプト] タブのスクリプトエディターに、次の SQL 文を入力して選択し、スクリプトエディターの左側にある [実行] をクリックします。
USE `my_catalog`.`order_dw`;
CREATE TABLE dwd_orders (
order_id BIGINT,
order_user_id STRING,
order_shop_id BIGINT,
order_product_id BIGINT,
order_product_catalog_name STRING,
order_fee BIGINT,
order_create_time TIMESTAMP,
order_update_time TIMESTAMP,
order_state INT,
pay_id BIGINT,
pay_platform INT COMMENT 'platform 0: phone, 1: pc',
pay_create_time TIMESTAMP
) WITH (
'sink.parallelism' = '2'
);
CREATE TABLE dws_users (
user_id STRING,
ds STRING,
total_fee BIGINT COMMENT 'Total amount of payment that is complete on the current day'
) WITH (
'sink.parallelism' = '2'
);
CREATE TABLE dws_shops (
shop_id BIGINT,
ds STRING,
total_fee BIGINT COMMENT 'Total amount of payment that is complete on the current day'
) WITH (
'sink.parallelism' = '2'
);この手順では、Apache Paimon 追加専用テーブルが作成されます。Apache Paimon テーブルを Flink シンクとして使用する場合、並列度の自動推論はサポートされていません。Apache Paimon テーブルの並列度を明示的に構成する必要があります。そうしないと、エラーが発生する可能性があります。
手順 3: DWD レイヤーと DWS レイヤーにドラフトを作成し、ドラフトをデプロイメントとしてデプロイする
DWD レイヤーにドラフトを作成し、ドラフトをデプロイメントとしてデプロイします。
DWD レイヤーのテーブルの更新ドラフトを作成します。
ページで、dwd_orders という名前の空のバッチドラフトを作成し、次の SQL 文をスクリプトエディターにコピーします。「RECOMMENDED」ラベルが付いた VVR バージョンを選択することをお勧めします。デフォルトでは、Flink SQL 文が使用され、以前のバージョンのエンジンでは SQL ダイアレクトは使用できません。DWD レイヤーのテーブルは Apache Paimon 追加専用テーブルであるため、INSERT OVERWRITE 文を使用して DWD レイヤーのテーブルを上書きします。
INSERT OVERWRITE my_catalog.order_dw.dwd_orders SELECT o.order_id, o.user_id, o.shop_id, o.product_id, c.catalog_name, o.buy_fee, o.create_time, o.update_time, o.state, p.pay_id, p.pay_platform, p.create_time FROM my_catalog.order_dw.orders as o, my_catalog.order_dw.product_catalog as c, my_catalog.order_dw.orders_pay as p WHERE o.product_id = c.product_id AND o.order_id = p.order_idSQL エディターページの右上隅にある [デプロイ] をクリックします。次に、[OK] をクリックして、dwd_orders ドラフトをデプロイメントとしてデプロイします。
DWS レイヤーにドラフトを作成し、ドラフトをデプロイメントとしてデプロイします。
DWS レイヤーのテーブルの更新ドラフトを作成します。
DWD レイヤーのテーブルの更新ドラフトを作成する を参照して、dws_shops と dws_users という名前の 2 つの空のバッチドラフトを作成します。次に、次の SQL 文のいずれかを関連するドラフトのスクリプトエディターにコピーします。
INSERT OVERWRITE my_catalog.order_dw.dws_shops SELECT order_shop_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd') as ds, SUM(order_fee) as total_fee FROM my_catalog.order_dw.dwd_orders WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL GROUP BY order_shop_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd');INSERT OVERWRITE my_catalog.order_dw.dws_users SELECT order_user_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd') as ds, SUM(order_fee) as total_fee FROM my_catalog.order_dw.dwd_orders WHERE pay_id IS NOT NULL AND order_fee IS NOT NULL GROUP BY order_user_id, DATE_FORMAT(pay_create_time, 'yyyyMMdd');SQL エディターページの右上隅にある [デプロイ] をクリックします。次に、[OK] をクリックして、dws_shops ドラフトと dws_users ドラフトをデプロイメントとしてデプロイします。
手順 4: DWD レイヤーと DWS レイヤーのデプロイメントを開始して表示する
DWD レイヤーのデプロイメントを開始して表示します。
ページで、デプロイメントタイプのドロップダウンリストから [バッチ] を選択します。dwd_orders デプロイメントを見つけ、[アクション] 列の [開始] をクリックします。
次の図に示すように、[開始中] 状態のバッチジョブが生成されます。

ジョブの状態が [完了] に変わると、データ処理は完了です。
データプロファイリング結果を表示します。
[スクリプト] タブのスクリプトエディターに、次の SQL 文を入力して選択し、スクリプトエディターの左側にある [実行] をクリックして、DWD レイヤーのテーブルのデータをクエリします。
SELECT * FROM `my_catalog`.`order_dw`.`dwd_orders`;次の図は、クエリ結果を示しています。

DWS レイヤーのデプロイメントを開始して表示します。
ページで、デプロイメントタイプのドロップダウンリストから [バッチ] を選択します。dws_shops デプロイメントと dws_users デプロイメントを見つけ、各デプロイメントの [アクション] 列の [開始] をクリックします。
[スクリプト] タブのスクリプトエディターに、次の SQL 文を入力して選択し、スクリプトエディターの左側にある [実行] をクリックして、DWS レイヤーのテーブルのデータをクエリします。
SELECT * FROM `my_catalog`.`order_dw`.`dws_shops`; SELECT * FROM `my_catalog`.`order_dw`.`dws_users`;次の図は、クエリ結果を示しています。

手順 5: デプロイメントをバッチ処理ワークフローとしてオーケストレーションする
このセクションでは、前のセクションで作成したデプロイメントをワークフローとしてオーケストレーションする方法について説明します。このようにして、特定のシーケンスに基づいてデプロイメントを一元的に実行できます。
ワークフローを作成します。
Realtime Compute for Apache Flink コンソールの左側のナビゲーションウィンドウで、 をクリックします。表示されるページで、[ワークフローの作成] をクリックします。
[ワークフローの作成] パネルの [名前] フィールドに wf_orders と入力し、[スケジューリングタイプ] パラメーターの値を変更せずにそのままにし (デフォルト値: [手動スケジューリング])、[リソースキュー] パラメーターに default-queue を選択し、[作成] をクリックします。ワークフロー編集ページが表示されます。
ワークフローを編集します。
最初のタスクをクリックし、タスクに v_dwd_orders という名前を付け、タスクに dwd_orders デプロイメントを選択します。
[タスクの追加] をクリックして、v_dws_shops という名前のタスクを作成します。タスクに dws_shops デプロイメントを選択し、v_dwd_orders タスクをアップストリームタスクとして構成します。
もう一度 [タスクの追加] をクリックして、v_dws_users という名前のタスクを作成します。タスクに dws_users デプロイメントを選択し、v_dwd_orders タスクをアップストリームタスクとして構成します。
ワークフロー編集ページで、右上隅にある [保存] をクリックし、[OK] をクリックします。
次の図は、作成したワークフローを示しています。

ワークフローを手動で実行します。
説明定期的なスケジュールワークフローの編集[ワークフロー] ページのワークフローの [アクション] 列にある ワークフロー (パブリックプレビュー) をクリックすると、ワークフローのスケジューリングタイプを に変更できます。詳細については、「」をご参照ください。
ワークフローを実行する前に、ODS レイヤーのテーブルにデータを挿入して、ワークフローの実行結果を確認します。
[スクリプト] タブのスクリプトエディターに、次の SQL 文を入力して選択し、スクリプトエディターの左側にある [実行] をクリックします。
USE `my_catalog`.`order_dw`; INSERT INTO orders VALUES (100008, 'user_001', 12346, 1, 10000, TO_TIMESTAMP('2023-02-15 17:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100009, 'user_002', 12347, 2, 20000, TO_TIMESTAMP('2023-02-15 18:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1), (100010, 'user_003', 12348, 3, 30000, TO_TIMESTAMP('2023-02-15 19:40:56'), TO_TIMESTAMP('2023-02-15 18:42:56'), 1); INSERT INTO orders_pay VALUES (2008, 100008, 1, TO_TIMESTAMP('2023-02-15 20:40:56')), (2009, 100009, 1, TO_TIMESTAMP('2023-02-15 20:40:56')), (2010, 100010, 1, TO_TIMESTAMP('2023-02-15 20:40:56'));[結果] タブの左側にある [flink UI] をクリックして、デプロイメントステータスを表示します。
ページで、作成したワークフローを見つけ、[アクション] 列の [実行] をクリックします。表示されるダイアログボックスで、[OK] をクリックしてワークフローを実行します。

ワークフローの名前をクリックして、ワークフローの詳細ページに移動します。[概要] タブで、タスクインスタンスのリストを表示します。

ワークフローで実行中のインスタンスの [ID] をクリックして、インスタンスの実行詳細ページに移動します。表示されるページで、各タスクの実行ステータスを表示します。ワークフローの実行が完了するまで待ちます。

ワークフローの実行結果を表示します。
[スクリプト] タブのスクリプトエディターに、次の SQL 文を入力して選択し、スクリプトエディターの左側にある [実行] をクリックします。
SELECT * FROM `my_catalog`.`order_dw`.`dws_shops`; SELECT * FROM `my_catalog`.`order_dw`.`dws_users`;ワークフローの実行結果を表示します。
ODS レイヤーの新しいデータが処理され、DWS レイヤーのテーブルに書き込まれます。

参照資料
Realtime Compute for Apache Flink のバッチ処理の原則と構成の最適化の詳細については、「Realtime Compute for Apache Flink のバッチ処理を最適化する」をご参照ください。
Realtime Compute for Apache Flink と Apache Paimon を使用してリアルタイム データウェアハウスを構築する方法の詳細については、「Realtime Compute for Apache Flink、Apache Paimon、および StarRocks を使用してストリーミング データレイクハウスを構築する」をご参照ください。
Realtime Compute for Apache Flink の開発コンソール とオンプレミス環境で、ドラフト開発などの操作を実行できます。詳細については、「VSCode を使用してオンプレミス環境でプラグインを開発する」をご参照ください。