Realtime Compute for Apache Flink では、Hive 方言を使用してバッチジョブを作成できます。これにより、Hive SQL 構文との相互運用性が向上し、既存の Hive ジョブを Realtime Compute for Apache Flink コンソールに容易に移行できます。
前提条件
-
Resource Access Management (RAM) ユーザーまたは RAM ロールを使用してコンソールにアクセスする場合、Flink コンソールに必要な権限があることを確認してください。詳細については、「権限管理」をご参照ください。
-
Flink ワークスペースが作成済みであること。詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。
制限事項
-
Ververica Runtime (VVR) 8.0.11 以降のバージョンのみが Hive 方言をサポートします。
-
現在、Hive 方言の INSERT 文の構文のみがサポートされています。INSERT 文の前に
USE Catalog <yourHiveCatalog>を宣言する必要があります。 -
Hive および Flink のユーザー定義関数 (UDF) はサポートされていません。
ステップ 1:Hive カタログの作成
-
Hive メタデータを設定します。詳細については、「Hive メタデータの設定」をご参照ください。
-
Hive カタログを作成します。詳細については、「Hive カタログの作成」をご参照ください。
この例では、Hive カタログの名前は hdfshive です。
ステップ 2:サンプル Hive データテーブルの準備
-
ページで、[
新規作成] をクリックしてクエリ スクリプトを作成します。 -
次のサンプル SQL 文を実行します。
重要Hive のソーステーブルと結果テーブルは、
CREATE TEMPORARY TABLEを使用して作成された一時テーブルではなく、CREATE TABLEを使用して作成された永続テーブルである必要があります。-- Hive カタログを使用します。hdfshive はステップ 1 で作成したカタログの名前です。 USE CATALOG hdfshive; -- ソーステーブル CREATE TABLE source_table ( id INT, name STRING, age INT, city STRING, salary FLOAT )WITH ('connector' = 'hive'); -- 結果テーブル CREATE TABLE target_table ( city STRING, avg_salary FLOAT, user_count INT )WITH ('connector' = 'hive'); -- テストデータの書き込み INSERT INTO source_table VALUES (1, 'Alice', 25, 'New York', 5000.0), (2, 'Bob', 30, 'San Francisco', 6000.0), (3, 'Charlie', 35, 'New York', 7000.0), (4, 'David', 40, 'San Francisco', 8000.0), (5, 'Eva', 45, 'Los Angeles', 9000.0);
ステップ 3:Hive SQL ジョブの作成
-
左側のナビゲーションウィンドウで、 を選択します。
-
[新規作成] をクリックします。[ジョブドラフトの作成] ダイアログボックスで、[空のバッチジョブドラフト (BETA)] を選択し、[次へ] をクリックします。
-
ジョブ情報を入力します。
ジョブパラメーター
説明
例
ファイル名
ジョブの名前。
説明ジョブ名はプロジェクト内で一意である必要があります。
hive-sql
保存先
ジョブのコードファイルが保存されるフォルダ。
既存のフォルダの横にある
アイコンをクリックしてサブフォルダを作成することもできます。ジョブドラフト
エンジンバージョン
ジョブの Flink エンジンバージョン。
[推奨] ラベルが付いたバージョンを使用してください。これらのバージョンは、より高い信頼性と優れたパフォーマンスを提供します。エンジンバージョンの詳細については、「機能リリースノート」および「エンジンバージョン」をご参照ください。
vvr-8.0.11-flink-1.17
SQL 方言
SQL データ処理言語。
説明この設定項目は、Hive 方言をサポートするエンジンバージョンでのみ表示されます。
Hive SQL
-
[作成] をクリックします。
ステップ 4:Hive SQL ジョブの作成とデプロイ
-
SQL 文を作成します。
この例では、各都市における 30 歳以上のユーザー数とその平均給与を計算します。次のサンプル SQL 文を SQL エディターにコピーします。
-- Hive カタログを使用します。hdfshive はステップ 1 で作成したカタログの名前です。 USE CATALOG hdfshive; INSERT INTO TABLE target_table SELECT city, AVG(salary) AS avg_salary, -- 平均給与を計算 COUNT(id) AS user_count -- ユーザー数を計算 FROM source_table WHERE age > 30 -- 30 歳以上のユーザーをフィルタリング GROUP BY city; -- 都市でグループ化 -
右上隅にある [デプロイ] をクリックします。表示されるダイアログボックスで、必要に応じてパラメーターを設定し、[OK] をクリックします。この例では、デフォルト設定を使用します。
(任意) ステップ 5:ジョブ実行パラメーターの設定
JindoSDK を使用して Hive クラスターにアクセスする場合に、このステップを実行します。
-
左側のナビゲーションウィンドウで、 を選択します。
-
ドロップダウンリストから [バッチジョブ] を選択し、対象のジョブの [詳細] をクリックします。

-
[デプロイメント詳細] ダイアログボックスで、[ランタイム] [パラメーター設定] エリアの右側にある [編集] をクリックします。
-
[その他の設定] に、次の設定を追加します。
fs.oss.jindo.endpoint: <YOUR_Endpoint> fs.oss.jindo.buckets: <YOUR_Buckets> fs.oss.jindo.accessKeyId: <YOUR_AccessKeyId> fs.oss.jindo.accessKeySecret: <YOUR_AccessKeySecret>パラメーターの詳細については、「OSS-HDFS へのデータ書き込み」をご参照ください。
-
[保存] をクリックします。
ステップ 6:ジョブの開始と結果の表示
-
対象のジョブの [開始] をクリックします。

-
ジョブのステータスが [完了] に変わったら、結果を表示できます。
ページで、次の SQL サンプルを実行して、各都市における 30 歳以上のユーザー数とその平均給与を表示します。
-- Hive カタログを使用します。hdfshive はステップ 1 で作成したカタログの名前です。 USE CATALOG hdfshive; select * from target_table;
Hive Jar ジョブの開発
VVR は、Hive 方言ジョブを JAR ジョブとして実行することをサポートしています。JAR ジョブを実行するには、バージョン 11.2 以降の "ververica-connector-hive-2.3.6" JAR パッケージを使用する必要があります。JAR ジョブと VVR の Hive 設定は一致している必要があります。
-
VVP 設定
-
JAR Uri フィールドにジョブの JAR パッケージをアップロードします。
-
[追加の依存関係] セクションで、Hive クラスターから 4 つの設定ファイル (core-site.xml、mapred-site.xml、hdfs-site.xml、hive-site.xml) をアップロードします。また、"ververica-connector-hive-2.3.6" JAR パッケージもアップロードする必要があります。
-
Hive クラスターの設定に基づいて実行パラメーターを設定します。OSS-HDFS にデータを書き込むには、「(任意) ステップ 5:ジョブ実行パラメーターの設定」の設定をご参照ください。
table.sql-dialect: HIVE classloader.parent-first-patterns.additional: org.apache.hadoop;org.antlr.runtime kubernetes.application-mode.classpath.include-user-jar: true
-
-
JAR ジョブコードの例
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Configuration conf = new Configuration(); conf.setString("type", "hive"); conf.setString("default-database", "default"); conf.setString("hive-version", "2.3.6"); conf.setString("hive-conf-dir", "/flink/usrlib/" ); conf.setString("hadoop-conf-dir", "/flink/usrlib/"); CatalogDescriptor descriptor = CatalogDescriptor.of("hivecat", conf); tableEnv.createCatalog("hivecat", descriptor); tableEnv.loadModule("hive", new HiveModule()); tableEnv.useModules("hive"); tableEnv.useCatalog("hivecat"); tableEnv.executeSql("insert into `hivecat`.`default`.`test_write` select * from `hivecat`.`default`.`test_read`;");
参考資料
-
Hive 方言の INSERT 構文の詳細については、「INSERT 文 | Apache Flink」をご参照ください。
-
Flink SQL を使用したバッチデータ処理の詳細については、「Flink バッチジョブのクイックスタート」をご参照ください。