Realtime ComputeFlink版支援使用Hive方言建立批次工作,通過相容Hive SQL文法增強與Hive互通性,便於從現有Hive作業平滑遷移至Realtime Compute管理主控台。
前提條件
如果您使用RAM使用者或RAM角色等身份訪問,需要確認已具有Flink控制台相關許可權,詳情請參見許可權管理。
已建立Flink工作空間,詳情請參見開通Realtime ComputeFlink版。
使用限制
僅VVR 8.0.11及以上版本支援使用Hive方言。
目前僅支援Hive方言INSERT Statements文法,且需要在INSERT Statements之前聲明
USE Catalog <yourHiveCatalog>。暫不支援Hive和Flink自訂函數。
步驟一:建立Hive Catalog
配置Hive中繼資料,詳情請參見配置Hive中繼資料。
建立Hive Catalog,詳情請參見建立Hive Catalog。
本樣本建立的Hive Catalog命名為hdfshive。
步驟二:準備Hive樣本資料表
在頁面,單擊
建立,建立查詢指令碼。執行如下SQL樣本。
重要Hive源表和結果表必須使用
CREATE TABLE建立永久表,而不是CREATE TEMPORARY TABLE建立暫存資料表。-- 使用Hive Catalog,其中hdfshive為步驟一建立的樣本名稱 USE CATALOG hdfshive; -- 來源資料表 CREATE TABLE source_table ( id INT, name STRING, age INT, city STRING, salary FLOAT )WITH ('connector' = 'hive'); -- 結果資料表 CREATE TABLE target_table ( city STRING, avg_salary FLOAT, user_count INT )WITH ('connector' = 'hive'); -- 寫入測試資料 INSERT INTO source_table VALUES (1, 'Alice', 25, 'New York', 5000.0), (2, 'Bob', 30, 'San Francisco', 6000.0), (3, 'Charlie', 35, 'New York', 7000.0), (4, 'David', 40, 'San Francisco', 8000.0), (5, 'Eva', 45, 'Los Angeles', 9000.0);
步驟三:建立Hive SQL作業
在左側導覽列,單擊。
單擊建立後,在新增作業草稿對話方塊,選擇空白的批作業草稿(BETA),單擊下一步。
填寫作業資訊。
作業參數
說明
樣本
檔案名稱
作業的名稱。
說明作業名稱在當前專案中必須保持唯一。
hive-sql
儲存位置
指定該作業的代碼檔案所屬的檔案夾。
您還可以在現有檔案夾右側,單擊
表徵圖,建立子檔案夾。作業草稿
引擎版本
當前作業使用的Flink引擎版本。
vvr-8.0.11-flink-1.17
SQL方言
SQL資料處理語言。
說明僅支援Hive方言的引擎版本才會顯示該配置項。
Hive SQL
單擊建立。
步驟四:編寫Hive SQL作業並部署
編寫SQL語句。
本樣本計算各城市年齡大於30的使用者數量和平均工資,您可複製如下SQL樣本到SQL編輯地區。
-- 使用Hive Catalog,其中hdfshive為步驟一建立的樣本名稱 USE CATALOG hdfshive; INSERT INTO TABLE target_table SELECT city, AVG(salary) AS avg_salary, -- 計算平均工資 COUNT(id) AS user_count -- 計算使用者數量 FROM source_table WHERE age > 30 -- 篩選年齡大於 30 的使用者 GROUP BY city; -- 按城市分組單擊右上方部署,在對話方塊中根據需要配置相關參數(本樣本保持預設),單擊確定。
(可選)步驟五:配置作業運行參數
如果您通過JindoSDK訪問Hive叢集,則需要執行該步驟。
在左側導覽列,單擊。
在下拉框中選擇批作業,單擊目標作業的詳情。

在部署詳情對話方塊中,單擊運行參數配置地區右側的編輯。
在其他配置中,增加如下配置資訊。
fs.oss.jindo.endpoint: <YOUR_Endpoint> fs.oss.jindo.buckets: <YOUR_Buckets> fs.oss.jindo.accessKeyId: <YOUR_AccessKeyId> fs.oss.jindo.accessKeySecret: <YOUR_AccessKeySecret>參數說明的詳細資料請參見寫OSS-HDFS。
單擊儲存。
步驟六:啟動作業並查看結果
單擊目標作業的啟動。

作業狀態變為已完成後,查看計算結果。
在頁面,執行以下SQL樣本,查看各城市年齡超過30歲的使用者數量及其平均工資的資料結果。
-- 使用Hive Catalog,其中hdfshive為步驟一建立的樣本名稱 USE CATALOG hdfshive; select * from target_table;
Hive Jar作業開發
當前VVP支援運行Hive方言作業通過Jar作業的形式運行。為了支援Jar作業運行,需要使用11.2版本及以上的"ververica-connector-hive-2.3.6" jar包,同時Jar作業中和VVP配置需要同步Hive相關配置。
VVP配置。
上傳Jar作業Jar包在JAR Uri中。
附加依賴檔案中上傳hive叢集中的四個配置:core-site.xml, mapred-site.xml, hdfs-site.xml和hive-site.xml檔案。並上傳"ververica-connector-hive-2.3.6" jar包。
運行參數配置。根據hive叢集配置,若需寫入oss-hdfs,可參照(可選)步驟五:配置作業運行參數的配置。
table.sql-dialect: HIVE classloader.parent-first-patterns.additional: org.apache.hadoop;org.antlr.runtime kubernetes.application-mode.classpath.include-user-jar: true
Jar作業寫法樣本。
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); Configuration conf = new Configuration(); conf.setString("type", "hive"); conf.setString("default-database", "default"); conf.setString("hive-version", "2.3.6"); conf.setString("hive-conf-dir", "/flink/usrlib/" ); conf.setString("hadoop-conf-dir", "/flink/usrlib/"); CatalogDescriptor descriptor = CatalogDescriptor.of("hivecat", conf); tableEnv.createCatalog("hivecat", descriptor); tableEnv.loadModule("hive", new HiveModule()); tableEnv.useModules("hive"); tableEnv.useCatalog("hivecat"); tableEnv.executeSql("insert into `hivecat`.`default`.`test_write` select * from `hivecat`.`default`.`test_read`;");
相關文檔
Hive方言的INSERT文法,詳情請參見INSERT Statements | Apache Flink。
使用Flink SQL資料批處理,詳情請參見Flink批處理快速入門。