Data Studio の Flink SQL ストリーミングノードを使用すると、標準 SQL 文を使用してリアルタイムタスクの処理ロジックを定義できます。Flink SQL ストリーミングは使いやすく、豊富な SQL をサポートし、強力な状態管理と堅牢なフォールトトレランスを提供します。イベント時間と処理時間の両方と互換性があり、柔軟な拡張機能を提供します。このノードは、Kafka や Hadoop 分散ファイルシステム (HDFS) などのシステムと簡単に統合できます。また、詳細なログとパフォーマンスモニタリングツールも提供します。リアルタイムデータ処理を開始するには、DataWorks プロジェクトに Flink SQL ストリーミングタスクを追加し、SQL 文を記述します。このトピックでは、DataWorks で Flink SQL ストリーミングノードタスクを開発し、DataWorks を使用して Flink でリアルタイムデータ処理を実行する方法について説明します。
前提条件
管理センターで Realtime Compute for Apache Flink のコンピューティングリソースをアタッチ済みであること。詳細については、「コンピューティングリソースのアタッチ」をご参照ください。
Flink SQL ストリーミングノードを作成済みであること。詳細については、「スケジュールされたワークフローのノードの作成」をご参照ください。
ステップ 1: Flink SQL ストリーミングノードの開発
Flink SQL ストリーミングノードの編集ページで、ノードタスクを開発できます。
SQL コードの開発
SQL 編集エリアでは、タスクコードを開発し、${variable_name} 形式を使用して変数を定義できます。その後、ノード編集ページの右側にある [リアルタイム構成] の下の [スクリプトパラメーター] セクションで、これらの変数に値を割り当てることができます。これにより、次の例に示すように、スケジューリングシナリオでコードにパラメーターを動的に渡すことができます。
--ソーステーブル datagen_source を作成します。
CREATE TEMPORARY TABLE datagen_source(
name VARCHAR
) WITH (
'connector' = 'datagen'
);
--結果テーブル blackhole_sink を作成します。
CREATE TEMPORARY TABLE blackhole_sink(
name VARCHAR
) WITH (
'connector' = 'blackhole'
);
--ソーステーブルのデータを結果テーブルに挿入します。
INSERT INTO blackhole_sink
SELECT
name
FROM datagen_source WHERE LENGTH(name) > ${name_length};この例では、name_length パラメーターの値は 5 です。このパラメーターを設定すると、名前の長さが 5 以下のデータが除外されます。
ステップ 2: Flink SQL ストリーミングノードの構成
以下のパラメーターの説明に基づいて、必要に応じて Flink SQL ストリーミングノードタスクを構成できます。
Flink リソースの構成
編集ページの右側にある [リアルタイム構成] ペインの [Flink リソース情報] セクションで、[リソース割り当て] に基づいて次のパラメーターを構成できます。詳細については、「ジョブリソースの構成」をご参照ください。
パラメーター | 説明 |
Flink クラスター | 管理センターにアタッチした、フルマネージド型の Flink コンピューティングリソースの名前です。 |
Flink エンジンバージョン | 必要に応じてエンジンバージョンを選択します。 |
リソースグループ | Flink ネットワークに接続可能な サーバーレスリソースグループ を選択します。 |
[リソースモード] は、以下の 2 つのモードをサポートしています。詳細については、「ジョブリソースの設定」をご参照ください。
選択したリソース割り当てモードに応じて、関連するパラメーターを構成してください。Flink アーキテクチャについてより深く理解しておくと、パラメーターの構成がより効果的になります。Flink アーキテクチャの詳細については、「Flink Architecture | Apache Flink」をご参照ください。 | |
ベーシックモード | |
JobManager CPU | Flink のベストプラクティスでは、JobManager は安定動作のために最低でも 0.5 CPU コアおよび 2 GiB のメモリを必要とします。推奨構成は 1 CPU コアおよび 4 GiB のメモリです。最大値は 16 CPU コアです。クラスター規模およびジョブの複雑度に応じて構成を調整してください。 |
JobManager メモリ | JobManager のメモリ構成は、スケジューリング処理およびタスク管理能力に影響を与えます。安定かつ効率的な動作を確保するため、2 GiB ~ 64 GiB の範囲で構成することを推奨します。クラスター規模およびジョブ要件に応じてサイズを調整してください。 |
TaskManager CPU | TaskManager の CPU リソース構成は、タスク処理能力に影響を与えます。Flink のベストプラクティスでは、最低構成として 0.5 CPU コアおよび 2 GiB のメモリを推奨します。推奨構成は 1 CPU コアおよび 4 GiB のメモリです。最大値は 16 CPU コアです。実際の要件に応じて構成を調整してください。 |
TaskManager メモリ | TaskManager のメモリ構成は、処理可能なデータ量およびパフォーマンスを決定します。ジョブの安定実行および効率的な処理を確保するため、メモリサイズは最低でも 2 GiB 以上である必要があります。最大値は 64 GiB です。 |
同時実行数 | これは、Flink ジョブ内で並列に実行可能なタスク数を決定します。同時実行数を高く設定すると、処理速度およびリソース利用率が向上します。クラスターのリソースおよびジョブの特性に応じて、このパラメーターを適切に設定してください。 |
TaskManager あたりのスロット数 | TaskManager あたりのスロット数は、その TaskManager が並列に実行可能なタスク数を決定します。スロット構成を調整することで、リソース利用率およびジョブの並列処理能力を最適化できます。 |
エキスパートモード | |
JobManager CPU | Flink のベストプラクティスでは、JobManager は安定動作のために最低でも 0.25 CPU コアおよび 1 GiB のメモリを必要とします。最大値は 16 CPU コアです。クラスター規模およびジョブの複雑度に応じて構成を調整してください。 |
JobManager メモリ | JobManager のメモリ構成は、スケジューリング処理およびタスク管理能力に影響を与えます。安定かつ効率的な動作を確保するため、1 GiB ~ 64 GiB の範囲で構成することを推奨します。クラスター規模およびジョブ要件に応じてサイズを調整してください。 |
TaskManager あたりのスロット数 | TaskManager あたりのスロット数は、その TaskManager が並列に実行可能なタスク数を決定します。スロット構成を調整することで、リソース利用率およびジョブの並列処理能力を最適化できます。 |
マルチ SSG モード | デフォルトでは、すべてのオペレーターが 1 つの SSG に配置されます。各オペレーターのリソース構成を個別に変更することはできません。個々のオペレーターに対してリソースを構成するには、マルチ SSG モード を有効化します。これにより、各オペレーターに独立したスロットが割り当てられ、対応するスロット上で直接リソースを構成できるようになります。 |
(オプション) スクリプトパラメーターの構成
コードでパラメーターを動的に使用するには、右側のナビゲーションバーの [リアルタイム構成] ペインの [スクリプトパラメーター] セクションで [パラメーターの追加] をクリックし、[パラメーター名] と [パラメーター値] を編集します。
(オプション) Flink 実行時パラメーターの構成
右側のナビゲーションバーの [リアルタイム構成] ペインで、[Flink 実行パラメーター] セクションの次のパラメーターを構成できます。詳細については、「ジョブデプロイ情報の構成」をご参照ください。
パラメーター | 説明 |
システムチェックポイント間隔 | このパラメーターは、Flink ジョブが定期的にシステムチェックポイントを実行する時間間隔を決定します。間隔を短くすると障害回復時間を短縮できますが、システムオーバーヘッドが増加します。このパラメーターを入力しない場合、システムチェックポイントは無効になります。 |
[システムチェックポイント間の最小間隔] | このパラメーターは、Flink が連続するチェックポイント間で待機する必要がある最小時間を定義します。これにより、頻繁すぎるチェックポイントがシステムパフォーマンスに影響を与えるのを防ぎます。これにより、システムチェックポイントの最大並列度が 1 の場合に、2 つのチェックポイント間に最小時間間隔があることが保証されます。 |
ステート データ TTL | このパラメーターは、Flink ジョブの状態データがアクセスまたは更新されずに保持できる最大時間を決定します。デフォルト値は 36 時間です。これは、ジョブの状態情報が 36 時間後に自動的に有効期限切れになり、パージされることを意味します。これにより、状態ストレージとリソース使用量が最適化されます。 重要 ここでのデフォルト値はクラウドのベストプラクティスに基づいており、オープンソースのデフォルトとは異なります。オープンソースのデフォルトは 0 で、状態情報が期限切れにならないことを意味します。 |
[その他の構成] | 他の Flink 実行時パラメーターの構成をサポートします。ここで、 説明 パラメーター設定の詳細については、「ジョブデプロイ情報の構成」をご参照ください。 |
タスクを構成した後、[保存] をクリックします。
ステップ 3: Flink SQL ストリーミングノードの開始
Flink SQL ストリーミングノードの公開
タスクを実行する前に、オペレーションセンターに公開する必要があります。画面の指示に従って、Flink SQL ストリーミングノードを公開します。詳細については、「ノードまたはワークフローの公開」をご参照ください。
説明公開操作により、タスクは Flink vvp スペースにも同期されます。DataWorks から公開されたタスクは、Flink vvp オペレーションセンターの [ジョブ O&M] で表示できます。
Flink SQL ストリーミングノードの開始
タスクが公開された後、[本番環境に公開] の下にある [O&M に移動] をクリックします。オペレーションセンターで、 に移動し、開始するタスクを見つけて、[アクション] 列の [開始] ボタンをクリックします。その後、その実行ステータスを表示できます。