すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Hive 方言ジョブのクイックスタート

最終更新日:Mar 07, 2026

Realtime Compute for Apache Flink では、Hive 方言を使用してバッチジョブを作成できます。これにより、Hive SQL 構文との相互運用性が向上し、既存の Hive ジョブを Realtime Compute for Apache Flink コンソールに容易に移行できます。

前提条件

  • Resource Access Management (RAM) ユーザーまたは RAM ロールを使用してコンソールにアクセスする場合、Flink コンソールに必要な権限があることを確認してください。詳細については、「権限管理」をご参照ください。

  • Flink ワークスペースが作成済みであること。詳細については、「Realtime Compute for Apache Flink の有効化」をご参照ください。

制限事項

  • Ververica Runtime (VVR) 8.0.11 以降のバージョンのみが Hive 方言をサポートします。

  • 現在、Hive 方言の INSERT 文の構文のみがサポートされています。INSERT 文の前に USE Catalog <yourHiveCatalog> を宣言する必要があります。

  • Hive および Flink のユーザー定義関数 (UDF) はサポートされていません。

ステップ 1:Hive カタログの作成

  1. Hive メタデータを設定します。詳細については、「Hive メタデータの設定」をご参照ください。

  2. Hive カタログを作成します。詳細については、「Hive カタログの作成」をご参照ください。

    この例では、Hive カタログの名前は hdfshive です。

ステップ 2:サンプル Hive データテーブルの準備

  1. [DataStudio] > [データクエリ] ページで、[image.png新規作成] をクリックしてクエリ スクリプトを作成します。

  2. 次のサンプル SQL 文を実行します。

    重要

    Hive のソーステーブルと結果テーブルは、CREATE TEMPORARY TABLE を使用して作成された一時テーブルではなく、CREATE TABLE を使用して作成された永続テーブルである必要があります。

    -- Hive カタログを使用します。hdfshive はステップ 1 で作成したカタログの名前です。
    USE CATALOG hdfshive;   
    
    -- ソーステーブル
    CREATE TABLE source_table (
     id INT,
     name STRING,
     age INT,
     city STRING,
     salary FLOAT
    )WITH ('connector' = 'hive');
    
    -- 結果テーブル
    CREATE TABLE target_table (
    city STRING,
    avg_salary FLOAT,
    user_count INT
    )WITH ('connector' = 'hive');
    
    -- テストデータの書き込み
    INSERT INTO source_table VALUES
    (1, 'Alice', 25, 'New York', 5000.0),
    (2, 'Bob', 30, 'San Francisco', 6000.0),
    (3, 'Charlie', 35, 'New York', 7000.0),
    (4, 'David', 40, 'San Francisco', 8000.0),
    (5, 'Eva', 45, 'Los Angeles', 9000.0);

ステップ 3:Hive SQL ジョブの作成

  1. 左側のナビゲーションウィンドウで、[Data Studio] > [ETL] を選択します

  2. [新規作成] をクリックします。[ジョブドラフトの作成] ダイアログボックスで、[空のバッチジョブドラフト (BETA)] を選択し、[次へ] をクリックします。

  3. ジョブ情報を入力します。

    ジョブパラメーター

    説明

    ファイル名

    ジョブの名前。

    説明

    ジョブ名はプロジェクト内で一意である必要があります。

    hive-sql

    保存先

    ジョブのコードファイルが保存されるフォルダ。

    既存のフォルダの横にある 新建文件夹 アイコンをクリックしてサブフォルダを作成することもできます。

    ジョブドラフト

    エンジンバージョン

    ジョブの Flink エンジンバージョン。

    [推奨] ラベルが付いたバージョンを使用してください。これらのバージョンは、より高い信頼性と優れたパフォーマンスを提供します。エンジンバージョンの詳細については、「機能リリースノート」および「エンジンバージョン」をご参照ください。

    vvr-8.0.11-flink-1.17

    SQL 方言

    SQL データ処理言語。

    説明

    この設定項目は、Hive 方言をサポートするエンジンバージョンでのみ表示されます。

    Hive SQL

  4. [作成] をクリックします。

ステップ 4:Hive SQL ジョブの作成とデプロイ

  1. SQL 文を作成します。

    この例では、各都市における 30 歳以上のユーザー数とその平均給与を計算します。次のサンプル SQL 文を SQL エディターにコピーします。

    -- Hive カタログを使用します。hdfshive はステップ 1 で作成したカタログの名前です。
    USE CATALOG hdfshive; 
    
    INSERT INTO TABLE target_table
    SELECT
      city,
      AVG(salary) AS avg_salary, -- 平均給与を計算
      COUNT(id) AS user_count -- ユーザー数を計算
    FROM source_table
    WHERE age > 30 -- 30 歳以上のユーザーをフィルタリング
    GROUP BY city; -- 都市でグループ化
  2. 右上隅にある [デプロイ] をクリックします。表示されるダイアログボックスで、必要に応じてパラメーターを設定し、[OK] をクリックします。この例では、デフォルト設定を使用します。

(任意) ステップ 5:ジョブ実行パラメーターの設定

重要

JindoSDK を使用して Hive クラスターにアクセスする場合に、このステップを実行します。

  1. 左側のナビゲーションウィンドウで、[オペレーションセンター] > [ジョブ管理] を選択します。

  2. ドロップダウンリストから [バッチジョブ] を選択し、対象のジョブの [詳細] をクリックします。

    11

  3. [デプロイメント詳細] ダイアログボックスで、[ランタイム] [パラメーター設定] エリアの右側にある [編集] をクリックします。

  4. [その他の設定] に、次の設定を追加します。

    fs.oss.jindo.endpoint: <YOUR_Endpoint> 
    fs.oss.jindo.buckets: <YOUR_Buckets>
    fs.oss.jindo.accessKeyId: <YOUR_AccessKeyId>
    fs.oss.jindo.accessKeySecret: <YOUR_AccessKeySecret>

    パラメーターの詳細については、「OSS-HDFS へのデータ書き込み」をご参照ください。

  5. [保存] をクリックします。

ステップ 6:ジョブの開始と結果の表示

  1. 対象のジョブの [開始] をクリックします。

    11

  2. ジョブのステータスが [完了] に変わったら、結果を表示できます。

    [データ開発] > [データ照会] ページで、次の SQL サンプルを実行して、各都市における 30 歳以上のユーザー数とその平均給与を表示します。

    -- Hive カタログを使用します。hdfshive はステップ 1 で作成したカタログの名前です。
    USE CATALOG hdfshive; 
    
    select * from target_table;

    55

Hive Jar ジョブの開発

VVR は、Hive 方言ジョブを JAR ジョブとして実行することをサポートしています。JAR ジョブを実行するには、バージョン 11.2 以降の "ververica-connector-hive-2.3.6" JAR パッケージを使用する必要があります。JAR ジョブと VVR の Hive 設定は一致している必要があります。

  1. VVP 設定

    1. JAR Uri フィールドにジョブの JAR パッケージをアップロードします。

    2. [追加の依存関係] セクションで、Hive クラスターから 4 つの設定ファイル (core-site.xml、mapred-site.xml、hdfs-site.xml、hive-site.xml) をアップロードします。また、"ververica-connector-hive-2.3.6" JAR パッケージもアップロードする必要があります。

    3. Hive クラスターの設定に基づいて実行パラメーターを設定します。OSS-HDFS にデータを書き込むには、「(任意) ステップ 5:ジョブ実行パラメーターの設定」の設定をご参照ください。

      table.sql-dialect: HIVE
      classloader.parent-first-patterns.additional: org.apache.hadoop;org.antlr.runtime
      kubernetes.application-mode.classpath.include-user-jar: true
  2. JAR ジョブコードの例

    1. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
      Configuration conf = new Configuration();
      conf.setString("type", "hive");
      conf.setString("default-database", "default");
      conf.setString("hive-version", "2.3.6"); 
      conf.setString("hive-conf-dir", "/flink/usrlib/" );
      conf.setString("hadoop-conf-dir", "/flink/usrlib/");
      
      CatalogDescriptor descriptor = CatalogDescriptor.of("hivecat", conf);
      tableEnv.createCatalog("hivecat", descriptor);
      
      tableEnv.loadModule("hive", new HiveModule());
      tableEnv.useModules("hive");
      tableEnv.useCatalog("hivecat");
      
      tableEnv.executeSql("insert into `hivecat`.`default`.`test_write` select * from `hivecat`.`default`.`test_read`;");

参考資料