Realtime Compute for Apache Flink supports the Hive dialect for batch jobs, enhancing interoperability with Hive SQL syntax and facilitating the migration of existing Hive jobs to the Flink console. The Hive Catalog connects Flink to your Hive Metastore, making your existing tables and metadata immediately available. This guide walks you through creating a Hive Catalog, setting up sample tables, and running your first Hive SQL batch job.
Prerequisites
Before you begin, ensure that you have:
-
The required permissions for the Flink console. If you access the console as a Resource Access Management (RAM) user or RAM role, see Permission management.
-
A Flink workspace. To create one, see Activate Realtime Compute for Apache Flink.
Limitations
-
VVR version: The Hive dialect requires Ververica Runtime (VVR) 8.0.11 or later. Jobs created on earlier versions will not recognize the Hive SQL dialect option.
-
Supported syntax: Only the
INSERTstatement syntax of the Hive dialect is supported. You must includeUSE CATALOG <yourHiveCatalog>before anyINSERTstatement; otherwise, the job falls back to Flink's default dialect and Hive tables will not be resolved. -
UDF support: User-defined functions (UDFs) from both Hive and Flink are not supported in Hive dialect jobs.
Step 1: Create a Hive Catalog
A Hive Catalog connects Flink to your Hive Metastore, giving Flink jobs access to your existing Hive tables and metadata.
-
Configure Hive metadata. For more information, see Configure Hive metadata.
-
Create a Hive Catalog. For more information, see Create a Hive Catalog.
In this example, the Hive Catalog is named hdfshive.
Step 2: Prepare sample Hive data tables
-
Go to Data Studio > Data Query and click
New to create a query script. -
Run the following SQL statements to create source and sink tables and insert test data.
The source and sink tables must be permanent tables created with CREATE TABLE. Temporary tables created with CREATE TEMPORARY TABLE are not supported.
-- Switch to the Hive Catalog created in Step 1
USE CATALOG hdfshive;
-- Source table: stores user records
CREATE TABLE source_table (
id INT,
name STRING,
age INT,
city STRING,
salary FLOAT
) WITH ('connector' = 'hive');
-- Sink table: stores aggregated results by city
CREATE TABLE target_table (
city STRING,
avg_salary FLOAT,
user_count INT
) WITH ('connector' = 'hive');
-- Insert test data into the source table
INSERT INTO source_table VALUES
(1, 'Alice', 25, 'New York', 5000.0),
(2, 'Bob', 30, 'San Francisco', 6000.0),
(3, 'Charlie', 35, 'New York', 7000.0),
(4, 'David', 40, 'San Francisco', 8000.0),
(5, 'Eva', 45, 'Los Angeles', 9000.0);
Step 3: Create a Hive SQL job
-
In the left navigation pane, choose Data Studio > ETL.
-
Click New. In the Create Job Draft dialog box, select Blank Batch Job Draft (BETA) and click Next.
-
Fill in the job details.
Parameter Description Example File name The job name. Must be unique within the project. hive-sqlStorage location The folder where the job's code file is stored. Click the
icon next to an existing folder to create a subfolder.Job DraftsEngine version The Flink engine version. Select a version labeled Recommended for better reliability and performance. See Feature release notes and Engine versions. vvr-8.0.11-flink-1.17SQL dialect The SQL dialect for data processing. This field appears only for engine versions that support the Hive dialect. Hive SQL -
Click Create.
Step 4: Write and deploy the Hive SQL job
-
Copy the following SQL into the editor. This query counts users older than 30 and calculates their average salary, grouped by city.
-- Switch to the Hive Catalog created in Step 1 USE CATALOG hdfshive; INSERT INTO TABLE target_table SELECT city, AVG(salary) AS avg_salary, -- Average salary for users older than 30 COUNT(id) AS user_count -- Number of users older than 30 FROM source_table WHERE age > 30 -- Filter: users older than 30 only GROUP BY city; -- Group results by city -
In the upper-right corner, click Deploy. Configure the parameters as needed in the dialog box and click OK. This example uses the default settings.
(Optional) Step 5: Configure job running parameters
Perform this step only if you access the Hive cluster using JindoSDK.
-
In the left navigation pane, choose Operation Center > Job O&M.
-
From the drop-down list, select Batch Job and click Details for the target job.

-
In the Deployment Details dialog box, click Edit to the right of Runtime Parameter Settings.
-
In Other Configurations, add the following parameters. Replace each placeholder with the actual value for your environment.
fs.oss.jindo.endpoint: <YOUR_Endpoint> fs.oss.jindo.buckets: <YOUR_Buckets> fs.oss.jindo.accessKeyId: <YOUR_AccessKeyId> fs.oss.jindo.accessKeySecret: <YOUR_AccessKeySecret>For parameter descriptions, see Write data to OSS-HDFS.
-
Click Save.
Step 6: Start the job and view the results
-
Click Start for the target job.

-
After the job status changes to Finished, run the following query on the Data Studio > Data Query page to verify the results.
-- Switch to the Hive Catalog created in Step 1 USE CATALOG hdfshive; SELECT * FROM target_table;The output shows the number of users older than 30 and their average salary for each city.

Hive JAR job development
VVR also supports running Hive dialect jobs as JAR jobs. JAR jobs require the ververica-connector-hive-2.3.6 connector at version 11.2 or later, and the Hive configurations in the JAR job must match the VVR configuration.
VVP configuration
-
Upload your JAR package in the JAR Uri field.
-
In the Additional Dependencies section, upload the four configuration files from your Hive cluster:
core-site.xml,mapred-site.xml,hdfs-site.xml, andhive-site.xml. Also upload the ververica-connector-hive-2.3.6 JAR package. -
Add the following running parameters. To write data to OSS-HDFS, also include the JindoSDK parameters from (Optional) Step 5.
table.sql-dialect: HIVE classloader.parent-first-patterns.additional: org.apache.hadoop;org.antlr.runtime kubernetes.application-mode.classpath.include-user-jar: true
JAR job code example
The following example sets up a StreamTableEnvironment, registers a Hive Catalog, and inserts data from one table into another using the Hive dialect.
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Configuration conf = new Configuration();
conf.setString("type", "hive");
conf.setString("default-database", "default");
conf.setString("hive-version", "2.3.6");
conf.setString("hive-conf-dir", "/flink/usrlib/");
conf.setString("hadoop-conf-dir", "/flink/usrlib/");
CatalogDescriptor descriptor = CatalogDescriptor.of("hivecat", conf);
tableEnv.createCatalog("hivecat", descriptor);
tableEnv.loadModule("hive", new HiveModule());
tableEnv.useModules("hive");
tableEnv.useCatalog("hivecat");
tableEnv.executeSql(
"INSERT INTO `hivecat`.`default`.`test_write` " +
"SELECT * FROM `hivecat`.`default`.`test_read`;"
);
What's next
-
For the full
INSERTsyntax supported by the Hive dialect, see INSERT statements in the Apache Flink documentation. -
To learn about batch data processing with Flink SQL, see Quick start for Flink batch jobs.