Apache Flink 向けリアルタイムコンピューティングは、バッチ処理のための SQL ジョブ開発で Hive dialect をサポートしており、これにより Apache Hive との相互運用性が向上します。これにより、Apache Hive ジョブを Apache Flink 向けリアルタイムコンピューティングにシームレスに移行できます。
前提条件
Realtime Compute for Apache Flink の開発コンソールへのアクセスに使用する RAM ユーザーまたは RAM ロールに必要な権限が付与されていること。詳細については、「権限管理」をご参照ください。
Realtime Compute for Apache Flink ワークスペースが作成されていること。詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。
制限事項
Ververica Runtime (VVR) 8.0.11 以降を使用する Realtime Compute for Apache Flink のみ Hive ダイアレクトをサポートしています。
Hive dialect の INSERT 文のみがサポートされています。INSERT 文を使用するには、INSERT 文の前に
USE Catalog <yourHiveCatalog>を宣言します。Apache Hive および Realtime Compute for Apache Flink のユーザー定義関数 (UDF) はサポートされていません。
手順 1:Hive カタログを作成する
Hive メタデータを構成します。詳細については、「Hive メタデータの構成」をご参照ください。
Hive カタログを作成します。詳細については、「Hive カタログの作成」をご参照ください。
この例では、hdfshive という名前の Hive カタログが作成されます。
手順 2:Hive サンプルデータテーブルを準備する
を選択します。表示されるページで、
[新規] をクリックしてスクリプトを作成します。次の SQL 文を実行します。
重要CREATE TABLE文を実行して作成された永続テーブルを Hive のソーステーブルおよび結果テーブルとして使用する必要があります。CREATE TEMPORARY TABLE文を実行して作成された一時テーブルは、ソーステーブルおよび結果テーブルとして使用できません。-- 手順 1 で作成した hdfshive カタログを使用します。 USE CATALOG hdfshive; -- ソーステーブルを作成します。 CREATE TABLE source_table ( id INT, name STRING, age INT, city STRING, salary FLOAT )WITH ('connector' = 'hive'); -- 結果テーブルを作成します。 CREATE TABLE target_table ( city STRING, avg_salary FLOAT, user_count INT )WITH ('connector' = 'hive'); -- テストデータを結果テーブルに書き込みます。 INSERT INTO source_table VALUES (1, 'Alice', 25, 'New York', 5000.0), (2, 'Bob', 30, 'San Francisco', 6000.0), (3, 'Charlie', 35, 'New York', 7000.0), (4, 'David', 40, 'San Francisco', 8000.0), (5, 'Eva', 45, 'Los Angeles', 9000.0);
手順 3:Hive SQL デプロイメントを作成する
Realtime Compute for Apache Flink 開発コンソールの左側のナビゲーションウィンドウで、 を選択します。
[新規] をクリックします。[新規ドラフト] ダイアログボックスで、BETA ラベルの付いた [空のバッチドラフト] を選択し、[次へ] をクリックします。
ドラフト情報を入力します。
パラメーター
説明
例
名前
作成するドラフトの名前。
説明ドラフト名は、現在のプロジェクト内で一意である必要があります。
hive-sql
場所
ドラフトのコードファイルが保存されるフォルダ。
既存のフォルダの右側にある
アイコンをクリックして、サブフォルダを作成することもできます。ドラフト
エンジンバージョン
作成されるドラフトのエンジンバージョン。
[推奨] ラベルが付いたエンジンバージョンを使用することをお勧めします。このラベルが付いたバージョンは、より高い信頼性とパフォーマンスを提供します。エンジンバージョンの詳細については、「リリースノート」および「エンジンバージョン」をご参照ください。
vvr-8.0.11-flink-1.17
SQL ダイアレクト
SQL データ処理言語。
説明このパラメーターは、Hive ダイアレクトをサポートするエンジンバージョンが選択されている場合にのみ表示されます。
Hive SQL
[作成] をクリックします。
手順 4:Hive SQL ドラフトを作成してデプロイする
SQL エディターに SQL 文を入力します。
この例では、30 歳以上のユーザー数と、都市ごとのユーザーの平均給与を計算します。次の SQL 文を SQL エディターにコピーできます。
-- 手順 1 で作成した Hive カタログを使用します。 USE CATALOG hdfshive; INSERT INTO TABLE target_table SELECT city, AVG(salary) AS avg_salary, -- ユーザーの平均給与を計算します。 COUNT(id) AS user_count -- ユーザー数を計算します。 FROM source_table WHERE age > 30 -- 30 歳以上のユーザーをフィルタリングします。 GROUP BY city; -- 都市別にデータをグループ化します。右上隅にある [デプロイ] をクリックします。[ドラフトのデプロイ] ダイアログボックスで、ビジネス要件に基づいてパラメーターを構成します。この例では、デフォルト値を保持します。次に、[確認] をクリックします。
(オプション)手順 5:デプロイメント実行のパラメーターを構成するデプロイメント実行のパラメーター
JindoSDK を使用して Hive クラスタにアクセスする場合は、この手順を実行する必要があります。
Realtime Compute for Apache Flink 開発コンソールの左側のナビゲーションウィンドウで、 を選択します。
[デプロイメント] ページの上部にあるドロップダウンリストから [バッチ] を選択します。管理するデプロイメントを見つけて、[詳細] をクリックします。

表示されるパネルの [パラメーター] セクションで、セクションの右上隅にある [編集] をクリックします。
[その他の構成] フィールドに、次の構成を追加します。
fs.oss.jindo.endpoint: <YOUR_Endpoint> fs.oss.jindo.buckets: <YOUR_Buckets> fs.oss.jindo.accessKeyId: <YOUR_AccessKeyId> fs.oss.jindo.accessKeySecret: <YOUR_AccessKeySecret>パラメーターの詳細については、「OSS-HDFS にデータを書き込む」をご参照ください。
セクションの右上隅にある [保存] をクリックします。
手順 6:デプロイメントを開始し、起動結果を表示する
デプロイメントの横にある [開始] をクリックします。

デプロイメントが [完了] 状態になったら、デプロイメントの計算結果を表示します。
を選択します。表示されるページで、次の SQL 文の例を実行して、30 歳以上のユーザー数と都市ごとの平均給与をクエリします。
-- 手順 1 で作成した Hive カタログを使用します。 USE CATALOG hdfshive; select * from target_table;
Table API で Hive dialect を使用する
Hologres VVR 11.2 以降では、Table API で Hive dialect を使用できます。ジョブが正しく実行されるようにするには、ververica-connector-hive-2.3.6 JAR をダウンロードし、コードとジョブ設定に Hive 構成を反映させます。
JAR デプロイメントを作成し、ジョブを構成します。
[デプロイメント] ページで JAR デプロイメントを作成し、[JAR URI] にアプリケーション JAR をアップロードします。
[追加の依存関係] には、Hive クラスターの構成ファイル (
core-site.xml、mapred-site.xml、hdfs-site.xml、hive-site.xml) と ververica-connector-hive-2.3.6 JAR をアップロードします。デプロイメント詳細ページに移動し、[構成] タブの [パラメーター] セクションで、次のようなジョブの実行時パラメーターを入力します。OSS-HDFS に書き込むには、「(オプション) ステップ 5: デプロイメント実行のためのパラメーターの構成」をご参照ください。
table.sql-dialect: HIVE classloader.parent-first-patterns.additional: org.apache.hadoop;org.antlr.runtime kubernetes.application-mode.classpath.include-user-jar: true
サンプルコード:
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Configuration conf = new Configuration(); conf.setString("type", "hive"); conf.setString("default-database", "default"); conf.setString("hive-version", "2.3.6"); conf.setString("hive-conf-dir", "/flink/usrlib/" ); conf.setString("hadoop-conf-dir", "/flink/usrlib/"); CatalogDescriptor descriptor = CatalogDescriptor.of("hivecat", conf); tableEnv.createCatalog("hivecat", descriptor); tableEnv.loadModule("hive", new HiveModule()); tableEnv.useModules("hive"); tableEnv.useCatalog("hivecat"); tableEnv.executeSql("insert into `hivecat`.`default`.`test_write` select * from `hivecat`.`default`.`test_read`;");
関連情報
Hive ダイアレクトの INSERT 構文の詳細については、「INSERT 文 | Apache Flink」を参照してください。
Realtime Compute for Apache Flink のバッチ処理機能の使用方法の詳細については、「Realtime Compute for Apache Flink のバッチ処理の概要」をご参照ください。