Flink SQL バッチノードを使用すると、標準 SQL 文を使用してデータ処理タスクを定義および実行できます。Flink SQL バッチノードは、データクレンジングや集約など、大規模なデータセットの分析と変換に適しています。このタイプのノードは、視覚的に構成できるため、大規模データに対して効率的かつ柔軟なバッチ処理ソリューションを提供できます。Flink SQL バッチノードでは、SQL に似た文を使用して、大規模データのバッチ処理を実行できます。このトピックでは、Flink SQL バッチノードを構成する方法と、Flink SQL バッチノードで SQL 文を使用してデータをバッチ処理する方法について説明します。
前提条件
DataWorks ワークスペースが作成され、Realtime Compute for Apache Flink の計算リソースが管理センターの DataWorks ワークスペースに関連付けられています。詳細については、「ワークスペースに計算リソースを関連付ける(Data Studio のパブリックプレビューへの参加が有効になっている)」をご参照ください。
Flink SQL バッチノードが作成されます。詳細については、「ノード開発」をご参照ください。
手順 1:Flink SQL バッチノードに基づいてタスクを開発する
Flink SQL バッチノードの構成タブで、次の操作を実行して、ノードに基づいてタスクを開発できます。
SQL コードを開発する
SQL エディターで、タスクコードを開発します。タスクコードで ${変数名} 形式で変数を定義し、[プロパティ] タブの [スケジューリングパラメーター] セクションでスケジューリングパラメーターを構成して、スケジューリングパラメーターを変数に値として割り当てることができます。Flink SQL バッチタスクが実行されるようにスケジュールされると、スケジューリングパラメーターの値がタスクコードで動的に置き換えられます。スケジューリングパラメーターの使用方法の詳細については、「スケジューリングパラメーターのサポートされている形式」をご参照ください。サンプルコード:
-- datagen_source という名前のソーステーブルを作成します。
CREATE TEMPORARY TABLE datagen_source_${var}(
name VARCHAR
) WITH (
'connector' = 'datagen',
'number-of-rows' = '1000'
);
-- blackhole_sink という名前の結果テーブルを作成します。
CREATE TEMPORARY TABLE blackhole_sink_${var}(
name VARCHAR
) WITH (
'connector' = 'blackhole'
);
-- ソーステーブル datagen_source から結果テーブル blackhole_sink にデータを挿入します。
INSERT INTO blackhole_sink_${var}
SELECT
name
FROM datagen_source_${var};この例では、パラメーター bizdate の値は $[yyyymmdd] です。このパラメーターを構成して、日次増分データをバッチで同期できます。
手順 2:Flink SQL バッチタスクを構成する
次の表のパラメーターの説明を参照して、ビジネス要件に基づいて Flink SQL バッチタスクを構成します。
Flink リソース情報セクションのパラメーターを構成する
[プロパティ] タブの [flink リソース情報] セクションで、次のパラメーターを構成します。詳細については、「デプロイメントを構成する」をご参照ください。
パラメーター | 説明 |
Flink クラスタ | 管理センターの DataWorks ワークスペースに関連付けられている Realtime Compute for Apache Flink ワークスペースの名前。 |
Flink エンジンバージョン | ビジネス要件に基づいてエンジンバージョンを選択します。 |
スケジューリングのリソースグループ | Realtime Compute for Apache Flink ワークスペースに接続されているサーバーレスリソースグループを選択します。 |
Jobmanager CPU | Realtime Compute for Apache Flink のベストプラクティスでは、JobManager には、デプロイメントの安定した実行を確保するために、少なくとも 0.5 CPU コアと 2 GiB のメモリが必要です。JobManager には 1 CPU コアと 4 GiB のメモリを構成することをお勧めします。最大 16 CPU コアを構成できます。Realtime Compute for Apache Flink ワークスペースのサイズとデプロイメントの複雑さに基づいて、このパラメーターを構成する必要があります。 |
Jobmanager メモリ | JobManager のメモリ構成は、JobManager のタスクスケジューリング機能と管理機能に影響します。システムの安定した効率的な実行を確保するために、このパラメーターには 2 ~ 64 の値を指定することをお勧めします。単位:GiB。Realtime Compute for Apache Flink ワークスペースのサイズとデプロイメント要件に基づいて、このパラメーターを構成する必要があります。 |
Taskmanager CPU | TaskManager の CPU リソース構成は、TaskManager がタスクでデータを処理する能力に影響します。Realtime Compute for Apache Flink のベストプラクティスでは、TaskManager には、デプロイメントの安定した実行を確保するために、少なくとも 0.5 CPU コアと 2 GiB のメモリが必要です。各 TaskManager に 1 CPU コアと 4 GiB のメモリを構成することをお勧めします。最大 16 CPU コアを構成できます。ビジネス要件に基づいて、このパラメーターを構成する必要があります。 |
Taskmanager メモリ | TaskManager のメモリ構成は、TaskManager がタスクでデータを処理するデータ量とパフォーマンスを決定します。タスクの安定性と効率性を確保するために、このパラメーターには 2 ~ 64 の値を指定することをお勧めします。単位:GiB。 |
並列度 | デプロイメントで並列に実行できるタスクの数。並行性が高いほど、処理速度とリソース使用率が向上します。ワークスペースリソースとデプロイメント特性に基づいて、このパラメーターを構成する必要があります。 |
スロットの最大数 | TaskManager のタスクに割り当てることができるスロットの最大数。各スロットは、タスクまたはオペレーターを実行できます。ビジネス要件に基づいて、スロットの最大数を調整できます。 |
各 Taskmanager のスロット | 各 TaskManager のスロットの数。このパラメーターは、並列に実行できるタスクの数を指定します。スロット構成を調整して、リソース使用率とデプロイメントの並列処理を最適化できます。 |
(オプション)スケジューリングパラメーターを構成する
Flink SQL バッチノードの構成タブの右側ナビゲーションウィンドウで、[プロパティ] をクリックします。プロパティ[スケジューリングパラメーター] タブの [パラメーターを追加]パラメーター名パラメーター値 セクションで、 をクリックし、 パラメーターと パラメーターを構成して、コードで動的に使用するために Flink SQL バッチノードのスケジューリングパラメーターを構成します。
(オプション)Flink ランタイムパラメーターセクションのパラメーターを構成する
Flink SQL バッチノードの構成タブの右側のナビゲーションウィンドウで、[プロパティ] をクリックします。[flink ランタイムパラメーター] タブの [Flink ランタイムパラメーター] セクションで、ランタイムパラメーターを構成します。詳細については、「デプロイを構成する」をご参照ください。
[flink ランタイムパラメーター] セクションでパラメーターを構成する場合、パラメーター構成は Ververica Platform(VVP)のパラメーター構成と互換性がある必要があります。セミコロン(;)や改行などの特殊文字を追加せずに、YAML 構文形式でパラメーターを構成できます。
Flink SQL バッチノードのタスクを定期的に実行する場合は、ビジネス要件に基づいて、[スケジューリングポリシー]、[スケジューリング時間]、[スケジューリング依存関係]、および [ノード出力パラメーター] セクションのパラメーターを構成する必要があります。詳細については、「ノードスケジューリング」をご参照ください。
構成が完了したら、[保存] をクリックしてタスクを保存します。
手順 3:Flink SQL バッチノードをデプロイし、O&M 操作を実行する
Flink SQL バッチノードのタスクが構成されたら、タスクをコミットしてデプロイします。詳細については、「ノードまたはワークフローのデプロイメント」をご参照ください。
タスクがデプロイされたら、[本番オンライン] の下の [O&M を実行] をクリックして、オペレーションセンターでタスクの実行ステータスを表示できます。詳細については、「オペレーションセンターの概要」をご参照ください。