すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Flink SQL ストリーミングノードを作成する

最終更新日:Jan 21, 2025

DataWorks の Data Studio では、Flink SQL ストリーミングノードを使用すると、標準 SQL 文を使用してリアルタイムタスクの処理ロジックを定義できます。 Flink SQL ストリーミングノードは使いやすく、さまざまな SQL 構文をサポートし、強力な状態管理とフォールトトレランス機能を提供します。 さらに、Flink SQL ストリーミングノードはイベント時間と処理時間と互換性があり、柔軟に拡張できます。 Flink SQL ストリーミングノードは、Kafka や Hadoop 分散ファイルシステム ( HDFS ) などのサービスと簡単に統合でき、詳細なログとパフォーマンス監視ツールを提供します。 DataWorks ワークスペースで Flink SQL ストリーミングタスクを作成し、これらのタスクの SQL 文を記述するだけで、リアルタイムでデータを処理できます。 このトピックでは、DataWorks コンソールで Flink SQL ストリーミングタスクを開発し、DataWorks を使用して Realtime Compute for Apache Flink のリアルタイムデータを処理する方法について説明します。

前提条件

  • DataWorks ワークスペースが作成され、Realtime Compute for Apache Flink 計算リソースが DataWorks コンソールの管理センターで DataWorks ワークスペースに関連付けられています。

  • Flink SQL ストリーミングノードが作成されます。

手順 1:Flink SQL ストリーミングノードに基づいてタスクを開発する

Flink SQL ストリーミングノードの構成タブで、次の操作を実行して、ノードに基づいてタスクを開発できます。

SQL コードを開発する

SQL エディターで、タスクコードを開発します。 タスクコードで ${変数名} 形式で変数を定義し、[リアルタイム構成] タブの [スクリプトパラメーター] セクションでスケジューリングパラメーターを構成して、スケジューリングパラメーターを変数の値として割り当てることができます。 Flink SQL ストリーミングタスクが実行されるようにスケジュールされると、スケジューリングパラメーターの値はタスクコードで動的に置き換えられます。 サンプルコード:

-- datagen_source という名前のソーステーブルを作成します。
CREATE TEMPORARY TABLE datagen_source(
  name VARCHAR
) WITH (
  'connector' = 'datagen'
);

-- blackhole_sink という名前の結果テーブルを作成します。
CREATE TEMPORARY TABLE blackhole_sink(
  name  VARCHAR
) WITH (
  'connector' = 'blackhole'
);

-- ソーステーブル datagen_source から結果テーブル blackhole_sink にデータを挿入します。
INSERT INTO blackhole_sink
SELECT
  name
FROM datagen_source WHERE LENGTH(name) > ${name_length};
説明

この例では、パラメーター name_length の値は 5 です。 このパラメーターを構成して、名前の長さが 5 未満のデータをフィルタリングできます。

手順 2:Flink SQL ストリーミングタスクを構成する

次の表のパラメーターの説明を参照して、ビジネス要件に基づいて Flink SQL ストリーミングタスクを構成できます。

Flink リソース情報セクションのパラメーターを構成する

[リアルタイム構成] タブの [flink リソース情報] セクションで、[リソース構成モード] に基づいてパラメーターを構成します。 次の表にパラメーターを示します。 詳細については、「デプロイメントのリソースを構成する」をご参照ください。

パラメーター

説明

Flink クラスタ

管理センターで DataWorks ワークスペースに関連付けられている Realtime Compute for Apache Flink ワークスペースの名前。

Flink エンジンバージョン

ビジネス要件に基づいてエンジンバージョンを選択します。

リソースグループ

Realtime Compute for Apache Flink ワークスペースに接続されている サーバーレスリソースグループ を選択します。

2 つの [リソース構成モード] がサポートされています。基本モードとエキスパートモードです。 詳細については、「デプロイメントのリソースを構成する」をご参照ください。

  • 基本モード (デフォルト値):このモードは、初心者や単純なアプリケーションシナリオに適しています。 デフォルト構成と簡略化された設定を使用して、Realtime Compute for Apache Flink デプロイメントをすばやく開始および実行できます。

  • エキスパートモード:このモードは、経験豊富なユーザー向けの詳細設定オプションを提供し、複雑な要件や高パフォーマンスの要件を満たすために、パフォーマンスとリソースの詳細な調整を実行できます。

選択したリソース構成モードに基づいてパラメーターを構成します。 Apache Flink のアーキテクチャを理解した後は、より効率的にパラメーターを構成できます。 詳細については、「Flink アーキテクチャ」を参照してください。

基本モード

Jobmanager CPU

Realtime Compute for Apache Flink のベストプラクティスでは、JobManager には、デプロイメントの安定した実行を確保するために、少なくとも 0.5 CPU コアと 2 GiB のメモリが必要です。 JobManager ごとに 1 CPU コアと 4 GiB のメモリを構成することをお勧めします。 最大 16 CPU コアを構成できます。 Realtime Compute for Apache Flink ワークスペースのサイズとデプロイメントの複雑さに基づいて、このパラメーターを構成する必要があります。

Jobmanager メモリ

JobManager のメモリ構成は、JobManager のタスクスケジューリング機能と管理機能に影響します。 システムの安定した効率的な実行を確保するために、このパラメーターに 2 ~ 64 の値を指定することをお勧めします。 単位:GiB。 Realtime Compute for Apache Flink ワークスペースのサイズとデプロイメント要件に基づいて、このパラメーターを構成する必要があります。

Taskmanager CPU

TaskManager の CPU リソース構成は、TaskManager がタスクでデータを処理する能力に影響します。 Realtime Compute for Apache Flink のベストプラクティスでは、TaskManager には、デプロイメントの安定した実行を確保するために、少なくとも 0.5 CPU コアと 2 GiB のメモリが必要です。 TaskManager ごとに 1 CPU コアと 4 GiB のメモリを構成することをお勧めします。 最大 16 CPU コアを構成できます。 ビジネス要件に基づいて、このパラメーターを構成する必要があります。

Taskmanager メモリ

TaskManager のメモリ構成は、TaskManager がタスクでデータを処理するデータ量とパフォーマンスを決定します。 タスクの安定性と効率を確保するために、このパラメーターに 2 ~ 64 の値を指定することをお勧めします。 単位:GiB。

並列度

デプロイメントで並列に実行できるタスクの数。 並行性が高いほど、処理速度とリソース使用率が向上します。 ワークスペースリソースとデプロイメント特性に基づいて、このパラメーターを構成する必要があります。

Taskmanager ごとのスロット数

各 TaskManager のスロット数。 このパラメーターは、並列に実行できるタスクの数を指定します。 スロット構成を調整して、リソース使用率とデプロイメントの並列処理を最適化できます。

エキスパートモード

Jobmanager CPU

Realtime Compute for Apache Flink のベストプラクティスでは、JobManager には、デプロイメントの安定した実行を確保するために、少なくとも 0.25 CPU コアと 1 GiB のメモリが必要です。 最大 16 CPU コアを構成できます。 Realtime Compute for Apache Flink ワークスペースのサイズとデプロイメントの複雑さに基づいて、このパラメーターを構成する必要があります。

Jobmanager メモリ

JobManager のメモリ構成は、JobManager のタスクスケジューリング機能と管理機能に影響します。 システムの安定した効率的な実行を確保するために、このパラメーターに 1 ~ 64 の値を指定することをお勧めします。 単位:GiB。 Realtime Compute for Apache Flink ワークスペースのサイズとデプロイメント要件に基づいて、このパラメーターを構成する必要があります。

Taskmanager ごとのスロット数

各 TaskManager のスロット数。 このパラメーターは、並列に実行できるタスクの数を指定します。 スロット構成を調整して、リソース使用率とデプロイメントの並列処理を最適化できます。

マルチ SSG モード

デフォルトでは、すべてのオペレーターは 1 つの SSG に配置されます。 各オペレーターのリソース構成を個別に変更することはできません。 個々のオペレーターのリソースを構成する場合は、[複数 SSG] モードを有効にして、各オペレーターが独立したスロットを持つようにする必要があります。 このようにして、スロット内の各オペレーターのリソースを構成できます。

(オプション) スクリプトパラメーターセクションのパラメーターを構成する

ノード構成ページの右側のナビゲーションウィンドウで、[リアルタイム構成] タブをクリックします。 [リアルタイム構成] タブの [スクリプトパラメーター] セクションで、[パラメーターを追加] をクリックし、[パラメーター名][パラメーター値] を構成して、コードでパラメーターを動的に使用します。

(オプション) Flink ランタイムパラメーターセクションのパラメーターを構成する

ノード構成ページの右側のナビゲーションウィンドウで、[リアルタイム構成] タブをクリックします。 [リアルタイム構成] タブの [flink ランタイムパラメーター] セクションで、次の表に示すパラメーターを構成します。 詳細については、「デプロイメントを構成する」をご参照ください。

パラメーター

説明

チェックポイント間隔

チェックポイントが生成される間隔。 間隔が短いほど、障害復旧時間は短縮されますが、システムオーバーヘッドが増加します。 このパラメーターを構成しない場合、チェックポイント機能は無効になります。

2 つのチェックポイント間の最小間隔

2 つのチェックポイント間の最小間隔。 これにより、チェックポイントが頻繁になりすぎてシステムパフォーマンスに影響を与えるのを防ぐことができます。 チェックポイントの最大並列度が 1 の場合、このパラメーターは 2 つのチェックポイント間の最小間隔を指定します。

状態データの有効期限

デプロイメントの状態データをアクセスまたは更新せずに保持できる最大時間。 デフォルト値:36。 単位:時間。 デフォルト値は、デプロイメントの状態データが 36 時間後に期限切れになることを示します。 システムは、状態ストレージとリソース使用量を最適化するために、期限切れのデータを自動的に削除します。

重要

デフォルト値は、Alibaba Cloud のベストプラクティスに基づいて決定されます。 このデフォルト値は、Apache Flink によって提供される TTL のデフォルト値とは異なります。 Apache Flink によって提供される TTL のデフォルト値は 0 で、これは状態データが期限切れにならないことを示します。

その他の構成

その他の Realtime Compute for Apache Flink 設定。 例:taskmanager.network.memory.max:4g

説明

その他のデプロイメントパラメーターの詳細については、「デプロイメントを作成する」をご参照ください。

構成が完了したら、[保存] をクリックしてタスクを保存します。

手順 3:Flink SQL ストリーミングタスクを開始する

  1. Flink SQL ストリーミングタスクをデプロイします。

    タスクは、オペレーションセンターにデプロイされた後にのみ実行できます。 プロンプトに従って Flink SQL ストリーミングタスクをデプロイできます。

    説明

    DataWorks コンソールで Flink SQL ストリーミングタスクをデプロイすると、タスクは Realtime Compute for Apache Flink の Ververica Platform ( VVP ) プラットフォームにもデプロイされます。 DataWorks からデプロイされたタスクは、Realtime Compute for Apache Flink の開発コンソールの [デプロイメント] ページで表示できます。

  2. Flink SQL ストリーミングタスクを開始します。

    タスクをデプロイした後、DataWorks コンソールで [本番オンライン] の下の [O&M を実行] をクリックできます。 オペレーションセンターの左側のナビゲーションウィンドウで、[ノード O&M] > リアルタイムノード O&M > リアルタイムコンピューティングノード を選択します。 表示されるページで、タスクを見つけ、[アクション] 列の [開始] をクリックして、タスクの実行ステータスを開始および表示します。