全部產品
Search
文件中心

Realtime Compute for Apache Flink:Hive方言作業快速入門

更新時間:Aug 09, 2025

Realtime ComputeFlink版支援使用Hive方言建立批次工作,通過相容Hive SQL文法增強與Hive互通性,便於從現有Hive作業平滑遷移至Realtime Compute管理主控台。

前提條件

使用限制

  • 僅VVR 8.0.11及以上版本支援使用Hive方言。

  • 目前僅支援Hive方言INSERT Statements文法,且需要在INSERT Statements之前聲明USE Catalog <yourHiveCatalog>

  • 暫不支援Hive和Flink自訂函數。

步驟一:建立Hive Catalog

  1. 配置Hive中繼資料,詳情請參見配置Hive中繼資料

  2. 建立Hive Catalog,詳情請參見建立Hive Catalog

    本樣本建立的Hive Catalog命名為hdfshive。

步驟二:準備Hive樣本資料表

  1. 資料開發 > 資料查詢頁面,單擊image.png建立,建立查詢指令碼。

  2. 執行如下SQL樣本。

    重要

    Hive源表和結果表必須使用CREATE TABLE建立永久表,而不是CREATE TEMPORARY TABLE建立暫存資料表。

    -- 使用Hive Catalog,其中hdfshive為步驟一建立的樣本名稱
    USE CATALOG hdfshive;   
    
    -- 來源資料表
    CREATE TABLE source_table (
     id INT,
     name STRING,
     age INT,
     city STRING,
     salary FLOAT
    )WITH ('connector' = 'hive');
    
    -- 結果資料表
    CREATE TABLE target_table (
    city STRING,
    avg_salary FLOAT,
    user_count INT
    )WITH ('connector' = 'hive');
    
    -- 寫入測試資料
    INSERT INTO source_table VALUES
    (1, 'Alice', 25, 'New York', 5000.0),
    (2, 'Bob', 30, 'San Francisco', 6000.0),
    (3, 'Charlie', 35, 'New York', 7000.0),
    (4, 'David', 40, 'San Francisco', 8000.0),
    (5, 'Eva', 45, 'Los Angeles', 9000.0);

步驟三:建立Hive SQL作業

  1. 在左側導覽列,單擊資料開發 > ETL

  2. 單擊建立後,在新增作業草稿對話方塊,選擇空白的批作業草稿(BETA),單擊下一步

  3. 填寫作業資訊。

    作業參數

    說明

    樣本

    檔案名稱

    作業的名稱。

    說明

    作業名稱在當前專案中必須保持唯一。

    hive-sql

    儲存位置

    指定該作業的代碼檔案所屬的檔案夾。

    您還可以在現有檔案夾右側,單擊建立檔案夾表徵圖,建立子檔案夾。

    作業草稿

    引擎版本

    當前作業使用的Flink引擎版本。

    建議使用帶有推薦標籤的版本,這些版本具有更高的可靠性和效能表現,關於引擎版本詳情請參見功能發布記錄引擎版本介紹

    vvr-8.0.11-flink-1.17

    SQL方言

    SQL資料處理語言。

    說明

    僅支援Hive方言的引擎版本才會顯示該配置項。

    Hive SQL

  4. 單擊建立

步驟四:編寫Hive SQL作業並部署

  1. 編寫SQL語句。

    本樣本計算各城市年齡大於30的使用者數量和平均工資,您可複製如下SQL樣本到SQL編輯地區。

    -- 使用Hive Catalog,其中hdfshive為步驟一建立的樣本名稱
    USE CATALOG hdfshive; 
    
    INSERT INTO TABLE target_table
    SELECT
      city,
      AVG(salary) AS avg_salary, -- 計算平均工資
      COUNT(id) AS user_count -- 計算使用者數量
    FROM source_table
    WHERE age > 30 -- 篩選年齡大於 30 的使用者
    GROUP BY city; -- 按城市分組
  2. 單擊右上方部署,在對話方塊中根據需要配置相關參數(本樣本保持預設),單擊確定

(可選)步驟五:配置作業運行參數

重要

如果您通過JindoSDK訪問Hive叢集,則需要執行該步驟。

  1. 在左側導覽列,單擊營運中心 > 作業營運

  2. 在下拉框中選擇批作業,單擊目標作業的詳情

    11

  3. 在部署詳情對話方塊中,單擊行參數配置地區右側的編輯

  4. 其他配置中,增加如下配置資訊。

    fs.oss.jindo.endpoint: <YOUR_Endpoint> 
    fs.oss.jindo.buckets: <YOUR_Buckets>
    fs.oss.jindo.accessKeyId: <YOUR_AccessKeyId>
    fs.oss.jindo.accessKeySecret: <YOUR_AccessKeySecret>

    參數說明的詳細資料請參見寫OSS-HDFS

  5. 單擊儲存

步驟六:啟動作業並查看結果

  1. 單擊目標作業的啟動

    11

  2. 作業狀態變為已完成後,查看計算結果。

    資料開發 > 資料查詢頁面,執行以下SQL樣本,查看各城市年齡超過30歲的使用者數量及其平均工資的資料結果。

    -- 使用Hive Catalog,其中hdfshive為步驟一建立的樣本名稱
    USE CATALOG hdfshive; 
    
    select * from target_table;

    55

Hive Jar作業開發

當前VVP支援運行Hive方言作業通過Jar作業的形式運行。為了支援Jar作業運行,需要使用11.2版本及以上的"ververica-connector-hive-2.3.6" jar包,同時Jar作業中和VVP配置需要同步Hive相關配置。

  1. VVP配置。

    1. 上傳Jar作業Jar包在JAR Uri中。

    2. 附加依賴檔案中上傳hive叢集中的四個配置:core-site.xml, mapred-site.xml, hdfs-site.xml和hive-site.xml檔案。並上傳"ververica-connector-hive-2.3.6" jar包。

    3. 運行參數配置。根據hive叢集配置,若需寫入oss-hdfs,可參照(可選)步驟五:配置作業運行參數的配置。

      table.sql-dialect: HIVE
      classloader.parent-first-patterns.additional: org.apache.hadoop;org.antlr.runtime
      kubernetes.application-mode.classpath.include-user-jar: true
  2. Jar作業寫法樣本。

    1. StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
      Configuration conf = new Configuration();
      conf.setString("type", "hive");
      conf.setString("default-database", "default");
      conf.setString("hive-version", "2.3.6"); 
      conf.setString("hive-conf-dir", "/flink/usrlib/" );
      conf.setString("hadoop-conf-dir", "/flink/usrlib/");
      
      CatalogDescriptor descriptor = CatalogDescriptor.of("hivecat", conf);
      tableEnv.createCatalog("hivecat", descriptor);
      
      tableEnv.loadModule("hive", new HiveModule());
      tableEnv.useModules("hive");
      tableEnv.useCatalog("hivecat");
      
      tableEnv.executeSql("insert into `hivecat`.`default`.`test_write` select * from `hivecat`.`default`.`test_read`;");

相關文檔