すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Flink SQL バッチ

最終更新日:Apr 03, 2025

Flink SQL バッチノードを使用すると、標準 SQL 文を使用してデータ処理タスクを定義および実行できます。Flink SQL バッチノードは、データクレンジングや集約など、大規模なデータセットの分析と変換に適しています。このタイプのノードは、視覚的に構成できるため、大規模データに対して効率的かつ柔軟なバッチ処理ソリューションを提供できます。Flink SQL バッチノードでは、SQL に似た文を使用して、大規模データのバッチ処理を実行できます。このトピックでは、Flink SQL バッチノードを構成する方法と、Flink SQL バッチノードで SQL 文を使用してデータをバッチ処理する方法について説明します。

前提条件

手順 1:Flink SQL バッチノードに基づいてタスクを開発する

Flink SQL バッチノードの構成タブで、次の操作を実行して、ノードに基づいてタスクを開発できます。

SQL コードを開発する

SQL エディターで、タスクコードを開発します。タスクコードで ${変数名} 形式で変数を定義し、[プロパティ] タブの [スケジューリングパラメーター] セクションでスケジューリングパラメーターを構成して、スケジューリングパラメーターを変数に値として割り当てることができます。Flink SQL バッチタスクが実行されるようにスケジュールされると、スケジューリングパラメーターの値がタスクコードで動的に置き換えられます。スケジューリングパラメーターの使用方法の詳細については、「スケジューリングパラメーターのサポートされている形式」をご参照ください。サンプルコード:

-- datagen_source という名前のソーステーブルを作成します。
CREATE TEMPORARY TABLE datagen_source_${var}(
  name VARCHAR
) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '1000' 
);

-- blackhole_sink という名前の結果テーブルを作成します。
CREATE TEMPORARY TABLE blackhole_sink_${var}(
  name  VARCHAR
) WITH (
  'connector' = 'blackhole'
);

-- ソーステーブル datagen_source から結果テーブル blackhole_sink にデータを挿入します。
INSERT INTO blackhole_sink_${var}
SELECT
  name
FROM datagen_source_${var};
説明

この例では、パラメーター bizdate の値は $[yyyymmdd] です。このパラメーターを構成して、日次増分データをバッチで同期できます。

手順 2:Flink SQL バッチタスクを構成する

次の表のパラメーターの説明を参照して、ビジネス要件に基づいて Flink SQL バッチタスクを構成します。

Flink リソース情報セクションのパラメーターを構成する

[プロパティ] タブの [flink リソース情報] セクションで、次のパラメーターを構成します。詳細については、「デプロイメントを構成する」をご参照ください。

パラメーター

説明

Flink クラスタ

管理センターの DataWorks ワークスペースに関連付けられている Realtime Compute for Apache Flink ワークスペースの名前。

Flink エンジンバージョン

ビジネス要件に基づいてエンジンバージョンを選択します。

スケジューリングのリソースグループ

Realtime Compute for Apache Flink ワークスペースに接続されているサーバーレスリソースグループを選択します。

Jobmanager CPU

Realtime Compute for Apache Flink のベストプラクティスでは、JobManager には、デプロイメントの安定した実行を確保するために、少なくとも 0.5 CPU コアと 2 GiB のメモリが必要です。JobManager には 1 CPU コアと 4 GiB のメモリを構成することをお勧めします。最大 16 CPU コアを構成できます。Realtime Compute for Apache Flink ワークスペースのサイズとデプロイメントの複雑さに基づいて、このパラメーターを構成する必要があります。

Jobmanager メモリ

JobManager のメモリ構成は、JobManager のタスクスケジューリング機能と管理機能に影響します。システムの安定した効率的な実行を確保するために、このパラメーターには 2 ~ 64 の値を指定することをお勧めします。単位:GiB。Realtime Compute for Apache Flink ワークスペースのサイズとデプロイメント要件に基づいて、このパラメーターを構成する必要があります。

Taskmanager CPU

TaskManager の CPU リソース構成は、TaskManager がタスクでデータを処理する能力に影響します。Realtime Compute for Apache Flink のベストプラクティスでは、TaskManager には、デプロイメントの安定した実行を確保するために、少なくとも 0.5 CPU コアと 2 GiB のメモリが必要です。各 TaskManager に 1 CPU コアと 4 GiB のメモリを構成することをお勧めします。最大 16 CPU コアを構成できます。ビジネス要件に基づいて、このパラメーターを構成する必要があります。

Taskmanager メモリ

TaskManager のメモリ構成は、TaskManager がタスクでデータを処理するデータ量とパフォーマンスを決定します。タスクの安定性と効率性を確保するために、このパラメーターには 2 ~ 64 の値を指定することをお勧めします。単位:GiB。

並列度

デプロイメントで並列に実行できるタスクの数。並行性が高いほど、処理速度とリソース使用率が向上します。ワークスペースリソースとデプロイメント特性に基づいて、このパラメーターを構成する必要があります。

スロットの最大数

TaskManager のタスクに割り当てることができるスロットの最大数。各スロットは、タスクまたはオペレーターを実行できます。ビジネス要件に基づいて、スロットの最大数を調整できます。

各 Taskmanager のスロット

各 TaskManager のスロットの数。このパラメーターは、並列に実行できるタスクの数を指定します。スロット構成を調整して、リソース使用率とデプロイメントの並列処理を最適化できます。

(オプション)スケジューリングパラメーターを構成する

Flink SQL バッチノードの構成タブの右側ナビゲーションウィンドウで、[プロパティ] をクリックします。プロパティ[スケジューリングパラメーター] タブの [パラメーターを追加]パラメーター名パラメーター値 セクションで、 をクリックし、 パラメーターと パラメーターを構成して、コードで動的に使用するために Flink SQL バッチノードのスケジューリングパラメーターを構成します。

(オプション)Flink ランタイムパラメーターセクションのパラメーターを構成する

Flink SQL バッチノードの構成タブの右側のナビゲーションウィンドウで、[プロパティ] をクリックします。[flink ランタイムパラメーター] タブの [Flink ランタイムパラメーター] セクションで、ランタイムパラメーターを構成します。詳細については、「デプロイを構成する」をご参照ください。

[flink ランタイムパラメーター] セクションでパラメーターを構成する場合、パラメーター構成は Ververica Platform(VVP)のパラメーター構成と互換性がある必要があります。セミコロン(;)や改行などの特殊文字を追加せずに、YAML 構文形式でパラメーターを構成できます。

説明

Flink SQL バッチノードのタスクを定期的に実行する場合は、ビジネス要件に基づいて、[スケジューリングポリシー][スケジューリング時間][スケジューリング依存関係]、および [ノード出力パラメーター] セクションのパラメーターを構成する必要があります。詳細については、「ノードスケジューリング」をご参照ください。

構成が完了したら、[保存] をクリックしてタスクを保存します。

手順 3:Flink SQL バッチノードをデプロイし、O&M 操作を実行する

  1. Flink SQL バッチノードのタスクが構成されたら、タスクをコミットしてデプロイします。詳細については、「ノードまたはワークフローのデプロイメント」をご参照ください。

  2. タスクがデプロイされたら、[本番オンライン] の下の [O&M を実行] をクリックして、オペレーションセンターでタスクの実行ステータスを表示できます。詳細については、「オペレーションセンターの概要」をご参照ください。

参照