本トピックでは、シンプルな例を用いて、Flink SQL ジョブの作成、デプロイ、および起動方法を説明します。Flink SQL ジョブの基本的な開発ワークフローを学習できます。
前提条件
-
Resource Access Management (RAM) ユーザーまたは RAM ロールを使用して Realtime Compute for Apache Flink にアクセスする場合、当該 RAM ユーザーまたは RAM ロールが必要な権限を持っていることを確認してください。詳細については、「権限管理」をご参照ください。
-
Flink ワークスペースが作成されました。詳細については、「Realtime Compute for Apache Flink を有効化する」をご参照ください。
ステップ 1:ジョブの作成
-
SQL ジョブ作成ページに移動します。
-
Realtime Compute for Apache Flink コンソール にログインします。
-
対象のワークスペースの コンソール を、操作 列からクリックします。
-
左側のナビゲーションウィンドウで、 を選択します。
-
-
アイコンをクリックし、新規ストリームジョブ を選択します。ファイル名 を入力し、エンジンバージョン を選択します。Realtime Compute for Apache Flink では、さまざまなコードおよびデータ同期テンプレートを提供しています。各テンプレートには特定のユースケース、コード例、および手順が含まれており、Flink の機能や構文を迅速に学習し、ビジネスロジックを実装できます。詳細については、「コードテンプレート」および「データ同期テンプレート」をご参照ください。

ジョブパラメーター
説明
例
ファイル名
ジョブの名称です。
説明ジョブ名は、現在のプロジェクト内で一意である必要があります。
flink-test
エンジンバージョン
当該ジョブで使用する Flink エンジンのバージョンです。
推奨 または 安定版 のタグが付与されたバージョンをご利用ください。これらのバージョンは、高い信頼性とパフォーマンスを提供します。エンジンバージョンの詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。
vvr-8.0.8-flink-1.17
-
作成 をクリックします。
ステップ 2:SQL ジョブの記述と構成情報の確認
-
SQL ジョブを記述します。
以下の SQL コードを SQL エディターにコピーします。この例では、Datagen コネクタを使用してランダムなデータストリームを生成し、Print コネクタを使用して結果を開発コンソールに出力します。サポートされているコネクタの詳細については、「サポートされているコネクタ」をご参照ください。
-- datagen_source という一時ソーステーブルを作成します。 CREATE TEMPORARY TABLE datagen_source( randstr VARCHAR ) WITH ( 'connector' = 'datagen' -- Datagen コネクタ ); -- print_table という一時シンクテーブルを作成します。 CREATE TEMPORARY TABLE print_table( randstr VARCHAR ) WITH ( 'connector' = 'print', -- Print コネクタ 'logger' = 'true' -- 結果をコンソールに表示します。 ); -- randstr フィールドを切り取り、その結果を出力します。 INSERT INTO print_table SELECT SUBSTRING(randstr,0,8) from datagen_source;説明-
この SQL の例では、
INSERT INTOを使用して単一のシンクにデータを書き込む方法を示しています。また、INSERT INTOを使用して複数のシンクにデータを書き込むことも可能です。詳細については、「INSERT INTO 文」をご参照ください。 -
本番環境向けのジョブでは、一時テーブルの使用を最小限に抑えてください。代わりに、Data Management に登録されたテーブルをご利用ください。詳細については、「Data Management」をご参照ください。
-
-
構成情報を確認します。
SQL エディター右側のタブから、構成の表示またはアップロードが可能です。
タブ名
説明
その他の構成
-
エンジンバージョン:エンジンバージョンの詳細については、「エンジンバージョン」および「ライフサイクルポリシー」をご参照ください。推奨または安定版のバージョンをご利用いただくことを推奨します。エンジンバージョンのタグの意味は以下のとおりです:
-
推奨:最新のメジャーバージョンにおける最新のマイナーバージョン。
-
安定版:サービス中であるメジャーバージョンにおける最新のマイナーバージョン(以前のバージョンのバグが修正済み)。
-
通常:サービス中であるその他のマイナーバージョン。
-
非推奨:サービス終了済みのバージョン。
-
-
追加の依存関係:ジョブの追加依存関係(一時関数など)。
-
Kerberos 認証:Kerberos 認証を有効化します。登録済みの Kerberos クラスターおよびプリンシパル情報を設定します。Kerberos クラスターを登録していない場合は、「Hive Kerberos クラスターの登録」をご参照ください。
コード構造
-
データフロー:データフローを表示します。
-
ツリー構造:データソースをツリー構造で表示します。
バージョン情報
ジョブのバージョン情報をここで確認できます。操作 列の機能の詳細については、「ジョブバージョンの管理」をご参照ください。
-
ステップ 3(任意):ジョブの詳細チェックおよびデバッグ
-
ジョブの詳細チェックを実行します。
詳細チェックでは、ジョブの SQL セマンティクス、ネットワーク接続、およびジョブで使用されるテーブルのメタデータを検証します。また、結果領域で SQL 最適化 をクリックすると、SQL のリスクアラートおよび最適化の提案を確認できます。
-
SQL エディターの右上隅にある 詳細チェック をクリックします。
-
詳細チェック ダイアログボックスで、確認 をクリックします。
説明ジョブの詳細チェック中にタイムアウトエラーが発生することがあります。次のエラーメッセージが返されます:
RPC がタイムアウトした可能性があります。これは、SQL 解析が複雑すぎるためです。デフォルト値が `120 s` である `flink.sqlserver.rpc.execution.timeout` オプションを flink-configuration で増やすことを検討してください。
解決策:ジョブ編集ページの先頭に、以下のパラメーター設定を追加してください。
SET 'flink.sqlserver.rpc.execution.timeout' = '600s'; -
-
ジョブをデバッグします。
デバッグ機能を使用すると、ジョブの実行をシミュレートし、出力を確認して SELECT や INSERT 文のロジックを検証できます。これにより、開発効率が向上し、データ品質リスクが低減されます。
説明デバッグ機能では、下流のシンクテーブルにデータは書き込まれません。
-
SQL エディターの右上隅にある デバッグ をクリックします。
-
デバッグ ダイアログボックスで、デバッグクラスターを選択し、次へ をクリックします。
利用可能なクラスターがない場合は、セッションクラスターを作成する必要があります。セッションクラスターは、SQL ジョブと同じエンジンバージョンを使用し、実行中(Running)の状態である必要があります。詳細については、「ステップ 1:セッションクラスターの作成」をご参照ください。
-
デバッグデータを構成し、OK をクリックします。
構成の詳細については、「ステップ 2:ジョブのデバッグ」をご参照ください。
-
ステップ 4:ジョブのデプロイメント
SQL エディターの右上隅にある デプロイ をクリックします。新規バージョンのデプロイ ダイアログボックスで、必要に応じてパラメーターを構成し、OK をクリックします。
ジョブをデプロイする際、デプロイ先 を リソースキュー または セッションクラスター に設定できます。各オプションの詳細は以下のとおりです:
|
デプロイ先 |
適用環境 |
主な特徴 |
|
リソースキュー |
本番環境 |
|
|
セッションクラスター |
開発およびテスト環境 |
重要
セッションクラスター上で実行されるジョブのログは表示できません。 |
ステップ 5:ジョブの起動および結果の確認
-
左側のナビゲーションウィンドウで、 を選択します。
-
対象のジョブに対し、起動 を 操作 列からクリックします。
ステートレス起動 を選択し、起動 をクリックします。ジョブのステータスが 実行中 に変更されれば、正常に起動されています。ジョブ起動パラメーターの詳細については、「ジョブの起動」をご参照ください。
-
ジョブ O&M の詳細ページで、Flink の計算結果を確認できます。
-
ページに移動し、対象のジョブをクリックします。
-
ジョブログ タブで、実行中のタスクマネージャー タブをクリックし、パス、ID 列からタスクをクリックします。
-
ログ をクリックし、「PrintSinkOutputWriter」に関連するログを検索します。

-
ステップ 6(任意):ジョブの停止
ジョブを変更し、その変更を反映させるには、ジョブを再デプロイした後、停止して再起動する必要があります。変更の例としては、コードの変更、WITH パラメーターの追加または削除、ジョブバージョンの変更などが挙げられます。また、ジョブがステートを再利用できない場合、新しいジョブを起動したい場合、または動的に効果を及ぼさないパラメーターを更新した場合にも、ジョブを停止・再起動する必要があります。ジョブの停止方法の詳細については、「ジョブの停止」をご参照ください。
-
ページで、対象のジョブに対し、停止 を 操作 列からクリックします。
-
OK をクリックします。
関連ドキュメント
-
ジョブ開発および O&M のよくある質問
-
ジョブ情報の構成
-
ジョブを起動する前またはオンライン化後にジョブリソースを構成できます。リソース割り当てには、基本モード(粗粒度)または専門家モード(細粒度)を使用できます。詳細については、「ジョブリソースの構成」をご参照ください。
-
ジョブのログレベルを構成し、異なるログレベルの出力を構成できます。詳細については、「ジョブログ出力の構成」をご参照ください。
-
-
その他のジョブタイプの開発ワークフロー
-
Flink のベストプラクティス