DataWorks の Data Studio では、Flink SQL ストリーミングノードを使用すると、標準 SQL 文を使用してリアルタイムタスクの処理ロジックを定義できます。 Flink SQL ストリーミングノードは使いやすく、さまざまな SQL 構文をサポートし、強力な状態管理とフォールトトレランス機能を提供します。 さらに、Flink SQL ストリーミングノードはイベント時間と処理時間と互換性があり、柔軟に拡張できます。 Flink SQL ストリーミングノードは、Kafka や Hadoop 分散ファイルシステム ( HDFS ) などのサービスと簡単に統合でき、詳細なログとパフォーマンス監視ツールを提供します。 DataWorks ワークスペースで Flink SQL ストリーミングタスクを作成し、これらのタスクの SQL 文を記述するだけで、リアルタイムでデータを処理できます。 このトピックでは、DataWorks コンソールで Flink SQL ストリーミングタスクを開発し、DataWorks を使用して Realtime Compute for Apache Flink のリアルタイムデータを処理する方法について説明します。
前提条件
DataWorks ワークスペースが作成され、Realtime Compute for Apache Flink 計算リソースが DataWorks コンソールの管理センターで DataWorks ワークスペースに関連付けられています。
Flink SQL ストリーミングノードが作成されます。
手順 1:Flink SQL ストリーミングノードに基づいてタスクを開発する
Flink SQL ストリーミングノードの構成タブで、次の操作を実行して、ノードに基づいてタスクを開発できます。
SQL コードを開発する
SQL エディターで、タスクコードを開発します。 タスクコードで ${変数名} 形式で変数を定義し、[リアルタイム構成] タブの [スクリプトパラメーター] セクションでスケジューリングパラメーターを構成して、スケジューリングパラメーターを変数の値として割り当てることができます。 Flink SQL ストリーミングタスクが実行されるようにスケジュールされると、スケジューリングパラメーターの値はタスクコードで動的に置き換えられます。 サンプルコード:
-- datagen_source という名前のソーステーブルを作成します。
CREATE TEMPORARY TABLE datagen_source(
name VARCHAR
) WITH (
'connector' = 'datagen'
);
-- blackhole_sink という名前の結果テーブルを作成します。
CREATE TEMPORARY TABLE blackhole_sink(
name VARCHAR
) WITH (
'connector' = 'blackhole'
);
-- ソーステーブル datagen_source から結果テーブル blackhole_sink にデータを挿入します。
INSERT INTO blackhole_sink
SELECT
name
FROM datagen_source WHERE LENGTH(name) > ${name_length};この例では、パラメーター name_length の値は 5 です。 このパラメーターを構成して、名前の長さが 5 未満のデータをフィルタリングできます。
手順 2:Flink SQL ストリーミングタスクを構成する
次の表のパラメーターの説明を参照して、ビジネス要件に基づいて Flink SQL ストリーミングタスクを構成できます。
Flink リソース情報セクションのパラメーターを構成する
[リアルタイム構成] タブの [flink リソース情報] セクションで、[リソース構成モード] に基づいてパラメーターを構成します。 次の表にパラメーターを示します。 詳細については、「デプロイメントのリソースを構成する」をご参照ください。
パラメーター | 説明 |
Flink クラスタ | 管理センターで DataWorks ワークスペースに関連付けられている Realtime Compute for Apache Flink ワークスペースの名前。 |
Flink エンジンバージョン | ビジネス要件に基づいてエンジンバージョンを選択します。 |
リソースグループ | Realtime Compute for Apache Flink ワークスペースに接続されている サーバーレスリソースグループ を選択します。 |
2 つの [リソース構成モード] がサポートされています。基本モードとエキスパートモードです。 詳細については、「デプロイメントのリソースを構成する」をご参照ください。
選択したリソース構成モードに基づいてパラメーターを構成します。 Apache Flink のアーキテクチャを理解した後は、より効率的にパラメーターを構成できます。 詳細については、「Flink アーキテクチャ」を参照してください。 | |
基本モード | |
Jobmanager CPU | Realtime Compute for Apache Flink のベストプラクティスでは、JobManager には、デプロイメントの安定した実行を確保するために、少なくとも 0.5 CPU コアと 2 GiB のメモリが必要です。 JobManager ごとに 1 CPU コアと 4 GiB のメモリを構成することをお勧めします。 最大 16 CPU コアを構成できます。 Realtime Compute for Apache Flink ワークスペースのサイズとデプロイメントの複雑さに基づいて、このパラメーターを構成する必要があります。 |
Jobmanager メモリ | JobManager のメモリ構成は、JobManager のタスクスケジューリング機能と管理機能に影響します。 システムの安定した効率的な実行を確保するために、このパラメーターに 2 ~ 64 の値を指定することをお勧めします。 単位:GiB。 Realtime Compute for Apache Flink ワークスペースのサイズとデプロイメント要件に基づいて、このパラメーターを構成する必要があります。 |
Taskmanager CPU | TaskManager の CPU リソース構成は、TaskManager がタスクでデータを処理する能力に影響します。 Realtime Compute for Apache Flink のベストプラクティスでは、TaskManager には、デプロイメントの安定した実行を確保するために、少なくとも 0.5 CPU コアと 2 GiB のメモリが必要です。 TaskManager ごとに 1 CPU コアと 4 GiB のメモリを構成することをお勧めします。 最大 16 CPU コアを構成できます。 ビジネス要件に基づいて、このパラメーターを構成する必要があります。 |
Taskmanager メモリ | TaskManager のメモリ構成は、TaskManager がタスクでデータを処理するデータ量とパフォーマンスを決定します。 タスクの安定性と効率を確保するために、このパラメーターに 2 ~ 64 の値を指定することをお勧めします。 単位:GiB。 |
並列度 | デプロイメントで並列に実行できるタスクの数。 並行性が高いほど、処理速度とリソース使用率が向上します。 ワークスペースリソースとデプロイメント特性に基づいて、このパラメーターを構成する必要があります。 |
Taskmanager ごとのスロット数 | 各 TaskManager のスロット数。 このパラメーターは、並列に実行できるタスクの数を指定します。 スロット構成を調整して、リソース使用率とデプロイメントの並列処理を最適化できます。 |
エキスパートモード | |
Jobmanager CPU | Realtime Compute for Apache Flink のベストプラクティスでは、JobManager には、デプロイメントの安定した実行を確保するために、少なくとも 0.25 CPU コアと 1 GiB のメモリが必要です。 最大 16 CPU コアを構成できます。 Realtime Compute for Apache Flink ワークスペースのサイズとデプロイメントの複雑さに基づいて、このパラメーターを構成する必要があります。 |
Jobmanager メモリ | JobManager のメモリ構成は、JobManager のタスクスケジューリング機能と管理機能に影響します。 システムの安定した効率的な実行を確保するために、このパラメーターに 1 ~ 64 の値を指定することをお勧めします。 単位:GiB。 Realtime Compute for Apache Flink ワークスペースのサイズとデプロイメント要件に基づいて、このパラメーターを構成する必要があります。 |
Taskmanager ごとのスロット数 | 各 TaskManager のスロット数。 このパラメーターは、並列に実行できるタスクの数を指定します。 スロット構成を調整して、リソース使用率とデプロイメントの並列処理を最適化できます。 |
マルチ SSG モード | デフォルトでは、すべてのオペレーターは 1 つの SSG に配置されます。 各オペレーターのリソース構成を個別に変更することはできません。 個々のオペレーターのリソースを構成する場合は、[複数 SSG] モードを有効にして、各オペレーターが独立したスロットを持つようにする必要があります。 このようにして、スロット内の各オペレーターのリソースを構成できます。 |
(オプション) スクリプトパラメーターセクションのパラメーターを構成する
ノード構成ページの右側のナビゲーションウィンドウで、[リアルタイム構成] タブをクリックします。 [リアルタイム構成] タブの [スクリプトパラメーター] セクションで、[パラメーターを追加] をクリックし、[パラメーター名] と [パラメーター値] を構成して、コードでパラメーターを動的に使用します。
(オプション) Flink ランタイムパラメーターセクションのパラメーターを構成する
ノード構成ページの右側のナビゲーションウィンドウで、[リアルタイム構成] タブをクリックします。 [リアルタイム構成] タブの [flink ランタイムパラメーター] セクションで、次の表に示すパラメーターを構成します。 詳細については、「デプロイメントを構成する」をご参照ください。
パラメーター | 説明 |
チェックポイント間隔 | チェックポイントが生成される間隔。 間隔が短いほど、障害復旧時間は短縮されますが、システムオーバーヘッドが増加します。 このパラメーターを構成しない場合、チェックポイント機能は無効になります。 |
2 つのチェックポイント間の最小間隔 | 2 つのチェックポイント間の最小間隔。 これにより、チェックポイントが頻繁になりすぎてシステムパフォーマンスに影響を与えるのを防ぐことができます。 チェックポイントの最大並列度が 1 の場合、このパラメーターは 2 つのチェックポイント間の最小間隔を指定します。 |
状態データの有効期限 | デプロイメントの状態データをアクセスまたは更新せずに保持できる最大時間。 デフォルト値:36。 単位:時間。 デフォルト値は、デプロイメントの状態データが 36 時間後に期限切れになることを示します。 システムは、状態ストレージとリソース使用量を最適化するために、期限切れのデータを自動的に削除します。 重要 デフォルト値は、Alibaba Cloud のベストプラクティスに基づいて決定されます。 このデフォルト値は、Apache Flink によって提供される TTL のデフォルト値とは異なります。 Apache Flink によって提供される TTL のデフォルト値は 0 で、これは状態データが期限切れにならないことを示します。 |
その他の構成 | その他の Realtime Compute for Apache Flink 設定。 例: 説明 その他のデプロイメントパラメーターの詳細については、「デプロイメントを作成する」をご参照ください。 |
構成が完了したら、[保存] をクリックしてタスクを保存します。
手順 3:Flink SQL ストリーミングタスクを開始する
Flink SQL ストリーミングタスクをデプロイします。
タスクは、オペレーションセンターにデプロイされた後にのみ実行できます。 プロンプトに従って Flink SQL ストリーミングタスクをデプロイできます。
説明DataWorks コンソールで Flink SQL ストリーミングタスクをデプロイすると、タスクは Realtime Compute for Apache Flink の Ververica Platform ( VVP ) プラットフォームにもデプロイされます。 DataWorks からデプロイされたタスクは、Realtime Compute for Apache Flink の開発コンソールの [デプロイメント] ページで表示できます。
Flink SQL ストリーミングタスクを開始します。
タスクをデプロイした後、DataWorks コンソールで [本番オンライン] の下の [O&M を実行] をクリックできます。 オペレーションセンターの左側のナビゲーションウィンドウで、 を選択します。 表示されるページで、タスクを見つけ、[アクション] 列の [開始] をクリックして、タスクの実行ステータスを開始および表示します。