すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Flink SQL ジョブの開始

最終更新日:Nov 06, 2025

このトピックでは、SQL ドラフトとそのデプロイメントを管理して、Flink SQL ジョブを作成、デプロイ、開始、およびキャンセルする方法について説明します。

前提条件

  • Realtime Compute for Apache Flink の開発コンソールにアクセスするために使用する Resource Access Management (RAM) ユーザーまたはロールには、必要な権限が必要です。 詳細については、「権限管理」をご参照ください。

  • Flink ワークスペースが作成されています。 「Realtime Compute for Apache Flink のアクティベート」をご参照ください。

ステップ 1: SQL ドラフトの作成

  1. SQL ドラフト作成ページに移動します。

    1. Realtime Compute for Apache Flink 管理コンソールにログインします。

    2. 対象のワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

      開発コンソールが表示されます。

    3. 左側のナビゲーションメニューで、[開発] > [ETL] を選択します。

  2. image > [新しい空白のストリームドラフト] をクリックします。 名前を入力し、エンジンバージョンを選択します。

    または、[テンプレートから新規ドラフト] をクリックして、組み込みテンプレートを使用することもできます。 「コードテンプレート」および「データ同期テンプレート」をご参照ください。

    設定項目

    説明

    名前

    SQL ドラフト名。

    説明

    現在の名前空間でユニバーサルに一意である必要があります。

    flink-test

    エンジンバージョン

    SQL ジョブの VVR バージョン。

    ジョブの信頼性とパフォーマンスを向上させるには、[RECOMMENDED] または [STABLE] のラベルが付いたエンジンバージョンを使用します。 詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。

    vvr-11.2-jdk11-flink-1.20

  3. [作成] をクリックします。

ステップ 2: コードの記述とドラフト構成の表示

  1. SQL でコードを記述します。

    次の SQL 文を SQL エディターにコピーします。 この例では、Datagen コネクタを使用してランダムなデータストリームを生成し、Print コネクタを使用して計算結果をコンソール出力に表示します。 「サポートされているコネクタ」もご参照ください。

    -- datagen_source という名前の一時ソーステーブルを作成します。
    CREATE TEMPORARY TABLE datagen_source(
      randstr VARCHAR
    ) WITH (
      'connector'='datagen' -- Datagen コネクタを使用します。
    );
    
    -- print_table という名前の一時シンクテーブルを作成します。
    CREATE TEMPORARY TABLE print_table(
      randstr  VARCHAR
    ) WITH (
      'connector' = 'print',   -- Print コネクタを使用します。
      'logger' = 'true'        -- 計算結果をコンソールに表示します。
    );
    
    -- print_table テーブルの randstr フィールドのデータを表示します。
    INSERT INTO print_table
    SELECT SUBSTRING(randstr,0,8) from datagen_source;
    説明
    • この例では、INSERT INTO 文を使用して単一のシンクにデータを書き込みます。 INSERT INTO 文を使用して複数のシンクにデータを書き込むこともできます。 詳細については、「INSERT INTO」をご参照ください。

    • ドラフトを作成するときは、カタログに登録されているテーブルを使用して、一時テーブルの使用を減らすことをお勧めします。 詳細については、「Data Management」をご参照ください。

  2. ドラフト構成を表示します。

    SQL エディターの右側にある [構成] タブで、構成を表示または変更できます。

    タブ名

    構成の説明

    構成

    • エンジンバージョン: ドラフトの作成時に選択する Flink エンジンのバージョン。 エンジンバージョンの詳細については、「エンジンバージョン」および「ライフサイクルポリシー」をご参照ください。 推奨バージョンまたは安定バージョンを使用することをお勧めします。 有効な値:

      • 推奨: 現在のメジャーバージョンの最新のマイナーバージョン。

      • 安定: 更新の有効期間内にまだ使用されているメジャーバージョンの最新のマイナーバージョン。 以前のバージョンの不具合は修正されています。

      • 通常: 更新の有効期間内にまだ使用されているその他のマイナーバージョン。

      • 非推奨: 有効期限が切れたバージョン。

    • 追加の依存関係: 一時関数など、ドラフトで使用される追加の依存関係。

    • Kerberos 認証: Kerberos 認証を有効にし、登録済みの Kerberos 化された Hive クラスターと Kerberos プリンシパルを関連付けます。 Kerberos 化された Hive クラスターの登録方法の詳細については、「Kerberos 化された Hive クラスターの登録」をご参照ください。

    構造

    • フロー図: データフローと操作シーケンスのグラフィカルな表現。

    • ツリー図: データが処理されるソースを表示できるツリー図。

    バージョン

    デプロイメントのエンジンバージョンを表示できます。 [ドラフトバージョン] パネルの [アクション] 列で実行できる操作の詳細については、「スクリプトバージョンの管理」をご参照ください。

(オプション) ステップ 3: SQL ドラフトの検証とデバッグ

  1. SQL ドラフトを検証します。

    ドラフトの SQL セマンティクス、ネットワーク接続、およびドラフト内のテーブルのメタデータ情報を確認します。 計算結果の [SQL アドバイス] をクリックして、SQL のリスクと関連する最適化の提案に関する情報を表示することもできます。

    1. SQL エディターの右上隅にある [検証] をクリックします。

    2. [検証] ダイアログボックスで、[確認] をクリックします。

    説明

    ドラフトの検証中に、次のタイムアウトエラーが発生する場合があります:

    The RPC times out maybe because the SQL parsing is too complicated. Please consider enlarging the `flink.sqlserver.rpc.execution.timeout` option in flink-configuration, which by default is `120 s`.

    このエラーを修正するには、現在の SQL ドラフトの先頭に次の SET 文を追加します。

    SET 'flink.sqlserver.rpc.execution.timeout' = '600s';
  2. ドラフトをデバッグします。

    デバッグ機能を有効にして、ジョブの実行をシミュレートし、出力を確認し、SELECT 文と INSERT 文のビジネスロジックを検証できます。 この機能により、開発効率が向上し、データ品質が低下するリスクが軽減されます。

    説明

    ドラフトのデバッグ中に生成されたデータは、ダウンストリームシステムには書き込まれません。

    1. SQL エディターの右上隅にある [デバッグ] をクリックします。

    2. [デバッグ] ダイアログボックスで、セッションクラスターを選択し、[次へ] をクリックします。

      使用可能なセッションクラスターがない場合は、作成します。 セッションクラスターが SQL ドラフトと同じエンジンバージョンを使用していること、およびセッションクラスターが実行中であることを確認してください。 詳細については、「ステップ 1: セッションクラスターの作成」をご参照ください。

    3. デバッグデータを構成し、[確認] をクリックします。

      詳細については、「ステップ 2: ドラフトのデバッグ」をご参照ください。

ステップ 4: SQL ドラフトのデプロイ

SQL エディターの右上隅にある [デプロイ] をクリックします。 [ドラフトのデプロイ] ダイアログボックスで、関連するパラメーターを構成し、[確認] をクリックします。

[デプロイメントターゲット] フィールドを構成するときに、ドロップダウンリストからキューまたはセッションクラスターを選択してドラフトをデプロイできます。 次の表は、キューとセッションクラスターを比較したものです:

デプロイメントターゲット

適用可能な環境

特徴

キュー

本番環境

  • 排他的リソース: リソースはプリエンプションされないため、安定したジョブ実行が保証されます。

  • リソースの隔離: 追加のキューを追加して、リソースを隔離および管理できます。

  • 適用シナリオ: リソースキューは、長時間実行されるワークロードやビジネスクリティカルなワークロードに最適です。

セッションクラスター

開発環境またはテスト環境

  • 共有リソース: JobManager は複数の Flink ジョブで共有され、リソース使用率が向上します。

  • クイックスタートアップ: 開始されたリソースインスタンスは複数のジョブで再利用され、ジョブの起動を高速化します。

  • 適用シナリオ: セッションクラスターは、開発、テスト、または軽量のワークロードに最適です。 ただし、JobManager の再利用メカニズムが Flink ジョブの安定性を損なわないように、適切なリソースクォータを割り当てる必要があります。

    重要

    セッションクラスターのジョブログは利用できません。

ステップ 5: ジョブの開始と結果の表示

  1. 左側のナビゲーションメニューで、[O&M] > [デプロイメント] を選択します。

  2. 対象のデプロイメントを見つけ、[アクション] 列の [開始] をクリックします。

  3. [ジョブの開始] ダイアログボックスで、[初期モード] を選択し、[開始] をクリックします。

    デプロイメントが [実行中] 状態になると、デプロイメントは期待どおりに実行されています。 詳細については、「デプロイメントの開始」をご参照ください。

  4. 計算結果を表示します。

    1. 左側のナビゲーションメニューで、[O&M] > [デプロイメント] を選択します。

    2. 対象のデプロイメントの名前をクリックします。

    3. [ログ] タブと [実行中のタスクマネージャー] サブタブを選択します。

    4. [パス、ID] 列の TaskManager をクリックします。

    5. [実行中のタスクマネージャー] タブの下にある [ログ] サブタブを選択し、PrintSinkOutputWriter を検索します。

      flinksql作业快速启动.jpg

(オプション) ステップ 6: ジョブのキャンセル

SQL ドラフトへの変更の適用、状態なしでのジョブの再起動、静的構成の更新など、いくつかの状況でジョブをキャンセルすることがあります。 詳細については、「デプロイメントのキャンセル」をご参照ください。

  1. [O&M] > [デプロイメント] に移動します。

  2. 対象のデプロイメントを見つけ、[アクション] 列の [キャンセル] をクリックします。

  3. ダイアログボックスで、[OK] をクリックします。

参照