
E-MapReduce:Integrate Flink Table Store with Hive

Last Updated:Jun 02, 2023

E-MapReduce (EMR) allows you to query data of Flink Table Store in Hive. This topic describes how to perform such queries.

Limits

Only clusters of EMR V3.45.0 and clusters of EMR V5.11.0 allow you to query data of Flink Table Store in Hive.

Procedure

  1. Query data from tables that are created in a Hive catalog and a Data Lake Formation (DLF) catalog.

    After you synchronize metadata to Hive Metastore by using a Hive catalog, you can query data from the tables in that catalog in Hive. If you selected DLF Unified Metadata for Metadata when you created the EMR cluster, you can use a Data Lake Formation (DLF) catalog to synchronize metadata from other services to DLF and query the data in Hive.
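
    In Flink SQL, such a Hive catalog can be declared along the following lines. This is a sketch: the catalog name is arbitrary, and the uri and warehouse values are assumed to match the cluster configuration used in the rest of this topic.

      -- Declare a Flink Table Store catalog backed by Hive Metastore.
      CREATE CATALOG tablestore_catalog WITH (
          'type' = 'table-store',
          'metastore' = 'hive',
          'uri' = 'thrift://master-1-1:9083',
          'warehouse' = 'oss://oss-bucket/warehouse'
      );
      USE CATALOG tablestore_catalog;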

    The following example shows how to use Spark to write data to a Hive catalog and query data of Flink Table Store in Hive.

    1. Run the following command to start Spark SQL:

      spark-sql \
        --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
        --conf spark.sql.catalog.tablestore.metastore=hive \
        --conf spark.sql.catalog.tablestore.uri=thrift://master-1-1:9083 \
        --conf spark.sql.catalog.tablestore.warehouse=oss://oss-bucket/warehouse
    2. Execute the following Spark SQL statements to create a Flink Table Store table in the created catalog and write data to the table:

      -- Create a test database in the created catalog and use the database. 
      CREATE DATABASE tablestore.test_db;
      USE tablestore.test_db;
      
      -- Create a Flink Table Store table. 
      CREATE TABLE test_tbl (
          uuid int,
          name string,
          price double
      ) TBLPROPERTIES (
          'primary-key' = 'uuid'
      );
      
      -- Write data to the Flink Table Store table. 
      INSERT INTO test_tbl VALUES (1, 'apple', 3.5), (2, 'banana', 4.0), (3, 'cherry', 20.5);
    3. Run the following command to start the Hive CLI:

      hive
    4. Execute the following Hive SQL statement to query the data that is written to the Flink Table Store table:

      SELECT * FROM test_db.test_tbl;
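
      If the integration works, the query returns the three rows that were written from Spark. Row order and column formatting may vary with the Hive CLI settings:

      1    apple     3.5
      2    banana    4.0
      3    cherry    20.5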
  2. Create an external table and query data from the external table.

    Hive allows you to create an external table that maps to the Flink Table Store table in a specific path and query data from the external table. Sample code:

    CREATE EXTERNAL TABLE test_ext_tbl
    STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
    LOCATION 'oss://oss-bucket/warehouse/test_db.db/test_tbl';
    
    SELECT * FROM test_ext_tbl;
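
    Because test_tbl declares uuid as its primary key, writing a row with an existing key from Spark replaces that row, and the change is also visible through the Hive external table. A sketch, run first in the same Spark SQL session as above and then in the Hive CLI:

    -- In Spark SQL: upsert the row with uuid = 1.
    INSERT INTO tablestore.test_db.test_tbl VALUES (1, 'apple', 4.5);

    -- In the Hive CLI: the external table reflects the new price.
    SELECT * FROM test_ext_tbl WHERE uuid = 1;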