このトピックでは、SQL ドラフトとそのデプロイメントを管理して、Flink SQL ジョブを作成、デプロイ、開始、およびキャンセルする方法について説明します。
前提条件
Realtime Compute for Apache Flink の開発コンソールにアクセスするために使用する Resource Access Management (RAM) ユーザーまたはロールには、必要な権限が必要です。 詳細については、「権限管理」をご参照ください。
Flink ワークスペースが作成されています。 「Realtime Compute for Apache Flink のアクティベート」をご参照ください。
ステップ 1: SQL ドラフトの作成
SQL ドラフト作成ページに移動します。
対象のワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。
開発コンソールが表示されます。
左側のナビゲーションメニューで、 を選択します。
をクリックします。 名前を入力し、エンジンバージョンを選択します。
または、[テンプレートから新規ドラフト] をクリックして、組み込みテンプレートを使用することもできます。 「コードテンプレート」および「データ同期テンプレート」をご参照ください。
設定項目
説明
例
名前
SQL ドラフト名。
説明現在の名前空間でユニバーサルに一意である必要があります。
flink-test
エンジンバージョン
SQL ジョブの VVR バージョン。
ジョブの信頼性とパフォーマンスを向上させるには、[RECOMMENDED] または [STABLE] のラベルが付いたエンジンバージョンを使用します。 詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。
vvr-11.2-jdk11-flink-1.20
[作成] をクリックします。
ステップ 2: コードの記述とドラフト構成の表示
SQL でコードを記述します。
次の SQL 文を SQL エディターにコピーします。 この例では、Datagen コネクタを使用してランダムなデータストリームを生成し、Print コネクタを使用して計算結果をコンソール出力に表示します。 「サポートされているコネクタ」もご参照ください。
-- datagen_source という名前の一時ソーステーブルを作成します。 CREATE TEMPORARY TABLE datagen_source( randstr VARCHAR ) WITH ( 'connector'='datagen' -- Datagen コネクタを使用します。 ); -- print_table という名前の一時シンクテーブルを作成します。 CREATE TEMPORARY TABLE print_table( randstr VARCHAR ) WITH ( 'connector' = 'print', -- Print コネクタを使用します。 'logger' = 'true' -- 計算結果をコンソールに表示します。 ); -- print_table テーブルの randstr フィールドのデータを表示します。 INSERT INTO print_table SELECT SUBSTRING(randstr,0,8) from datagen_source;説明この例では、
INSERT INTO文を使用して単一のシンクにデータを書き込みます。INSERT INTO文を使用して複数のシンクにデータを書き込むこともできます。 詳細については、「INSERT INTO」をご参照ください。ドラフトを作成するときは、カタログに登録されているテーブルを使用して、一時テーブルの使用を減らすことをお勧めします。 詳細については、「Data Management」をご参照ください。
ドラフト構成を表示します。
SQL エディターの右側にある [構成] タブで、構成を表示または変更できます。
タブ名
構成の説明
構成
エンジンバージョン: ドラフトの作成時に選択する Flink エンジンのバージョン。 エンジンバージョンの詳細については、「エンジンバージョン」および「ライフサイクルポリシー」をご参照ください。 推奨バージョンまたは安定バージョンを使用することをお勧めします。 有効な値:
推奨: 現在のメジャーバージョンの最新のマイナーバージョン。
安定: 更新の有効期間内にまだ使用されているメジャーバージョンの最新のマイナーバージョン。 以前のバージョンの不具合は修正されています。
通常: 更新の有効期間内にまだ使用されているその他のマイナーバージョン。
非推奨: 有効期限が切れたバージョン。
追加の依存関係: 一時関数など、ドラフトで使用される追加の依存関係。
Kerberos 認証: Kerberos 認証を有効にし、登録済みの Kerberos 化された Hive クラスターと Kerberos プリンシパルを関連付けます。 Kerberos 化された Hive クラスターの登録方法の詳細については、「Kerberos 化された Hive クラスターの登録」をご参照ください。
構造
フロー図: データフローと操作シーケンスのグラフィカルな表現。
ツリー図: データが処理されるソースを表示できるツリー図。
バージョン
デプロイメントのエンジンバージョンを表示できます。 [ドラフトバージョン] パネルの [アクション] 列で実行できる操作の詳細については、「スクリプトバージョンの管理」をご参照ください。
(オプション) ステップ 3: SQL ドラフトの検証とデバッグ
SQL ドラフトを検証します。
ドラフトの SQL セマンティクス、ネットワーク接続、およびドラフト内のテーブルのメタデータ情報を確認します。 計算結果の [SQL アドバイス] をクリックして、SQL のリスクと関連する最適化の提案に関する情報を表示することもできます。
SQL エディターの右上隅にある [検証] をクリックします。
[検証] ダイアログボックスで、[確認] をクリックします。
説明ドラフトの検証中に、次のタイムアウトエラーが発生する場合があります:
The RPC times out maybe because the SQL parsing is too complicated. Please consider enlarging the `flink.sqlserver.rpc.execution.timeout` option in flink-configuration, which by default is `120 s`.このエラーを修正するには、現在の SQL ドラフトの先頭に次の SET 文を追加します。
SET 'flink.sqlserver.rpc.execution.timeout' = '600s';ドラフトをデバッグします。
デバッグ機能を有効にして、ジョブの実行をシミュレートし、出力を確認し、SELECT 文と INSERT 文のビジネスロジックを検証できます。 この機能により、開発効率が向上し、データ品質が低下するリスクが軽減されます。
説明ドラフトのデバッグ中に生成されたデータは、ダウンストリームシステムには書き込まれません。
SQL エディターの右上隅にある [デバッグ] をクリックします。
[デバッグ] ダイアログボックスで、セッションクラスターを選択し、[次へ] をクリックします。
使用可能なセッションクラスターがない場合は、作成します。 セッションクラスターが SQL ドラフトと同じエンジンバージョンを使用していること、およびセッションクラスターが実行中であることを確認してください。 詳細については、「ステップ 1: セッションクラスターの作成」をご参照ください。
デバッグデータを構成し、[確認] をクリックします。
詳細については、「ステップ 2: ドラフトのデバッグ」をご参照ください。
ステップ 4: SQL ドラフトのデプロイ
SQL エディターの右上隅にある [デプロイ] をクリックします。 [ドラフトのデプロイ] ダイアログボックスで、関連するパラメーターを構成し、[確認] をクリックします。
[デプロイメントターゲット] フィールドを構成するときに、ドロップダウンリストからキューまたはセッションクラスターを選択してドラフトをデプロイできます。 次の表は、キューとセッションクラスターを比較したものです:
デプロイメントターゲット | 適用可能な環境 | 特徴 |
キュー | 本番環境 |
|
セッションクラスター | 開発環境またはテスト環境 |
|
ステップ 5: ジョブの開始と結果の表示
左側のナビゲーションメニューで、 を選択します。
対象のデプロイメントを見つけ、[アクション] 列の [開始] をクリックします。
[ジョブの開始] ダイアログボックスで、[初期モード] を選択し、[開始] をクリックします。
デプロイメントが [実行中] 状態になると、デプロイメントは期待どおりに実行されています。 詳細については、「デプロイメントの開始」をご参照ください。
計算結果を表示します。
左側のナビゲーションメニューで、 を選択します。
対象のデプロイメントの名前をクリックします。
[ログ] タブと [実行中のタスクマネージャー] サブタブを選択します。
[パス、ID] 列の TaskManager をクリックします。
[実行中のタスクマネージャー] タブの下にある [ログ] サブタブを選択し、
PrintSinkOutputWriterを検索します。
(オプション) ステップ 6: ジョブのキャンセル
SQL ドラフトへの変更の適用、状態なしでのジョブの再起動、静的構成の更新など、いくつかの状況でジョブをキャンセルすることがあります。 詳細については、「デプロイメントのキャンセル」をご参照ください。
に移動します。
対象のデプロイメントを見つけ、[アクション] 列の [キャンセル] をクリックします。
ダイアログボックスで、[OK] をクリックします。
参照
開発と O&M に関するよくある質問
ジョブリソースとログの構成
ジョブを開始する前にジョブのリソースを構成したり、実行中のデプロイメントのリソース構成を変更したりできます。 Realtime Compute for Apache Flink には、基本モード (粗粒度) とエキスパートモード (詳細) のリソース構成モードがあります。 詳細については、「デプロイメントのリソースの構成」をご参照ください。
ジョブログを外部ストレージにエクスポートしたり、さまざまなレベルのログをさまざまなシステムに出力したりできます。 詳細については、「ジョブログ出力の構成」をご参照ください。
開発チュートリアル
ベストプラクティス
> [新しい空白のストリームドラフト]