すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Flink SQL ジョブのクイックスタート

最終更新日:Mar 07, 2026

本トピックでは、シンプルな例を用いて、Flink SQL ジョブの作成、デプロイ、および起動方法を説明します。Flink SQL ジョブの基本的な開発ワークフローを学習できます。

前提条件

  • Resource Access Management (RAM) ユーザーまたは RAM ロールを使用して Realtime Compute for Apache Flink にアクセスする場合、当該 RAM ユーザーまたは RAM ロールが必要な権限を持っていることを確認してください。詳細については、「権限管理」をご参照ください。

  • Flink ワークスペースが作成されました。詳細については、「Realtime Compute for Apache Flink を有効化する」をご参照ください。

ステップ 1:ジョブの作成

  1. SQL ジョブ作成ページに移動します。

    1. Realtime Compute for Apache Flink コンソール にログインします。

    2. 対象のワークスペースの コンソール を、操作 列からクリックします。

    3. 左側のナビゲーションウィンドウで、データ開発 > ETL を選択します。

  2. image アイコンをクリックし、新規ストリームジョブ を選択します。ファイル名 を入力し、エンジンバージョン を選択します。

    Realtime Compute for Apache Flink では、さまざまなコードおよびデータ同期テンプレートを提供しています。各テンプレートには特定のユースケース、コード例、および手順が含まれており、Flink の機能や構文を迅速に学習し、ビジネスロジックを実装できます。詳細については、「コードテンプレート」および「データ同期テンプレート」をご参照ください。

    image

    ジョブパラメーター

    説明

    ファイル名

    ジョブの名称です。

    説明

    ジョブ名は、現在のプロジェクト内で一意である必要があります。

    flink-test

    エンジンバージョン

    当該ジョブで使用する Flink エンジンのバージョンです。

    推奨 または 安定版 のタグが付与されたバージョンをご利用ください。これらのバージョンは、高い信頼性とパフォーマンスを提供します。エンジンバージョンの詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。

    vvr-8.0.8-flink-1.17

  3. 作成 をクリックします。

ステップ 2:SQL ジョブの記述と構成情報の確認

  1. SQL ジョブを記述します。

    以下の SQL コードを SQL エディターにコピーします。この例では、Datagen コネクタを使用してランダムなデータストリームを生成し、Print コネクタを使用して結果を開発コンソールに出力します。サポートされているコネクタの詳細については、「サポートされているコネクタ」をご参照ください。

    -- datagen_source という一時ソーステーブルを作成します。
    CREATE TEMPORARY TABLE datagen_source(
      randstr VARCHAR
    ) WITH (
      'connector' = 'datagen' -- Datagen コネクタ
    );
    
    -- print_table という一時シンクテーブルを作成します。
    CREATE TEMPORARY TABLE print_table(
      randstr  VARCHAR
    ) WITH (
      'connector' = 'print',   -- Print コネクタ
      'logger' = 'true'        -- 結果をコンソールに表示します。
    );
    
    -- randstr フィールドを切り取り、その結果を出力します。
    INSERT INTO print_table
    SELECT SUBSTRING(randstr,0,8) from datagen_source;
    説明
    • この SQL の例では、INSERT INTO を使用して単一のシンクにデータを書き込む方法を示しています。また、INSERT INTO を使用して複数のシンクにデータを書き込むことも可能です。詳細については、「INSERT INTO 文」をご参照ください。

    • 本番環境向けのジョブでは、一時テーブルの使用を最小限に抑えてください。代わりに、Data Management に登録されたテーブルをご利用ください。詳細については、「Data Management」をご参照ください。

  2. 構成情報を確認します。

    SQL エディター右側のタブから、構成の表示またはアップロードが可能です。

    タブ名

    説明

    その他の構成

    • エンジンバージョン:エンジンバージョンの詳細については、「エンジンバージョン」および「ライフサイクルポリシー」をご参照ください。推奨または安定版のバージョンをご利用いただくことを推奨します。エンジンバージョンのタグの意味は以下のとおりです:

      • 推奨:最新のメジャーバージョンにおける最新のマイナーバージョン。

      • 安定版:サービス中であるメジャーバージョンにおける最新のマイナーバージョン(以前のバージョンのバグが修正済み)。

      • 通常:サービス中であるその他のマイナーバージョン。

      • 非推奨:サービス終了済みのバージョン。

    • 追加の依存関係:ジョブの追加依存関係(一時関数など)。

    • Kerberos 認証:Kerberos 認証を有効化します。登録済みの Kerberos クラスターおよびプリンシパル情報を設定します。Kerberos クラスターを登録していない場合は、「Hive Kerberos クラスターの登録」をご参照ください。

    コード構造

    • データフロー:データフローを表示します。

    • ツリー構造:データソースをツリー構造で表示します。

    バージョン情報

    ジョブのバージョン情報をここで確認できます。操作 列の機能の詳細については、「ジョブバージョンの管理」をご参照ください。

ステップ 3(任意):ジョブの詳細チェックおよびデバッグ

  1. ジョブの詳細チェックを実行します。

    詳細チェックでは、ジョブの SQL セマンティクス、ネットワーク接続、およびジョブで使用されるテーブルのメタデータを検証します。また、結果領域で SQL 最適化 をクリックすると、SQL のリスクアラートおよび最適化の提案を確認できます。

    1. SQL エディターの右上隅にある 詳細チェック をクリックします。

    2. 詳細チェック ダイアログボックスで、確認 をクリックします。

    説明

    ジョブの詳細チェック中にタイムアウトエラーが発生することがあります。次のエラーメッセージが返されます:

    RPC がタイムアウトした可能性があります。これは、SQL 解析が複雑すぎるためです。デフォルト値が `120 s` である `flink.sqlserver.rpc.execution.timeout` オプションを flink-configuration で増やすことを検討してください。

    解決策:ジョブ編集ページの先頭に、以下のパラメーター設定を追加してください。

    SET 'flink.sqlserver.rpc.execution.timeout' = '600s';
  2. ジョブをデバッグします。

    デバッグ機能を使用すると、ジョブの実行をシミュレートし、出力を確認して SELECT や INSERT 文のロジックを検証できます。これにより、開発効率が向上し、データ品質リスクが低減されます。

    説明

    デバッグ機能では、下流のシンクテーブルにデータは書き込まれません。

    1. SQL エディターの右上隅にある デバッグ をクリックします。

    2. デバッグ ダイアログボックスで、デバッグクラスターを選択し、次へ をクリックします。

      利用可能なクラスターがない場合は、セッションクラスターを作成する必要があります。セッションクラスターは、SQL ジョブと同じエンジンバージョンを使用し、実行中(Running)の状態である必要があります。詳細については、「ステップ 1:セッションクラスターの作成」をご参照ください。

    3. デバッグデータを構成し、OK をクリックします。

      構成の詳細については、「ステップ 2:ジョブのデバッグ」をご参照ください。

ステップ 4:ジョブのデプロイメント

SQL エディターの右上隅にある デプロイ をクリックします。新規バージョンのデプロイ ダイアログボックスで、必要に応じてパラメーターを構成し、OK をクリックします。

ジョブをデプロイする際、デプロイ先リソースキュー または セッションクラスター に設定できます。各オプションの詳細は以下のとおりです:

デプロイ先

適用環境

主な特徴

リソースキュー

本番環境

  • 排他的リソース:ジョブのリソースが他から奪われないため、安定性が確保されます。

  • リソース隔離:リソースキューを追加することで、リソースの隔離および管理が可能です。

  • 適用シーン:長時間実行または高優先度のタスクに適しています。

セッションクラスター

開発およびテスト環境

  • 共有リソース:複数のジョブが同じ Job Manager(JM)を共有し、JM のリソース利用率を向上させます。

  • 高速起動:複数のジョブが初期化済みのリソースインスタンスを再利用することで、ジョブの起動を加速します。

  • 適用シーン:開発、テスト、軽量タスクに適しています。JM の再利用メカニズムがジョブの安定性に影響を与えないよう、リソースクォータを計画してください。

重要

セッションクラスター上で実行されるジョブのログは表示できません。

ステップ 5:ジョブの起動および結果の確認

  1. 左側のナビゲーションウィンドウで、オペレーションセンター > ジョブ操作 を選択します。

  2. 対象のジョブに対し、起動操作 列からクリックします。

    ステートレス起動 を選択し、起動 をクリックします。ジョブのステータスが 実行中 に変更されれば、正常に起動されています。ジョブ起動パラメーターの詳細については、「ジョブの起動」をご参照ください。

  3. ジョブ O&M の詳細ページで、Flink の計算結果を確認できます。

    1. オペレーションセンター > ジョブ O&M ページに移動し、対象のジョブをクリックします。

    2. ジョブログ タブで、実行中のタスクマネージャー タブをクリックし、パス、ID 列からタスクをクリックします。

    3. ログ をクリックし、「PrintSinkOutputWriter」に関連するログを検索します。

      flinksql作业快速启动.jpg

ステップ 6(任意):ジョブの停止

ジョブを変更し、その変更を反映させるには、ジョブを再デプロイした後、停止して再起動する必要があります。変更の例としては、コードの変更、WITH パラメーターの追加または削除、ジョブバージョンの変更などが挙げられます。また、ジョブがステートを再利用できない場合、新しいジョブを起動したい場合、または動的に効果を及ぼさないパラメーターを更新した場合にも、ジョブを停止・再起動する必要があります。ジョブの停止方法の詳細については、「ジョブの停止」をご参照ください。

  1. オペレーションセンター > ジョブ O&M ページで、対象のジョブに対し、停止操作 列からクリックします。

  2. OK をクリックします。

関連ドキュメント