E-MapReduce:Integrate Flink Table Store with Spark

Last Updated: Jun 02, 2023

E-MapReduce (EMR) allows you to use Spark to write data to and read data from Flink Table Store. This topic describes how to use the Spark SQL client to write data to and read data from Flink Table Store, and how to use the Spark CLI to read data from Flink Table Store.

Limits

  • Only clusters of EMR V3.45.0 and EMR V5.11.0 allow you to use the Spark SQL client to write data to and read data from Flink Table Store, or use the Spark CLI to read data from Flink Table Store.

  • Only Spark SQL in Spark 3 supports using catalogs to write data to and read data from Flink Table Store.

  • The Spark CLI can read data from Flink Table Store only by using a file system or OSS path.

Procedure

Step 1: Create a catalog

Flink Table Store stores data and metadata in a file system or in Object Storage Service (OSS). The root path is specified by the spark.sql.catalog.tablestore.warehouse parameter that is configured when you create a catalog. If the specified root path does not exist, a root path is automatically created. If the specified root path exists, you can use the created catalog to access the existing tables in the path.
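
For reference, Flink Table Store places each table in its own directory under the warehouse root. The following layout is only an illustration that matches the paths used later in this topic; the bucket name, database name, and table name are examples.

    oss://oss-bucket/warehouse/        <- root path set by spark.sql.catalog.tablestore.warehouse
    └── test_db.db/                    <- a database created in the catalog
        └── test_tbl/                  <- data and metadata files of the table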

You can synchronize metadata to Hive Metastore or Data Lake Formation (DLF). This way, other services can access the data of Flink Table Store by using Hive Metastore or DLF. If you prefer to set the catalog properties in code instead of on the spark-sql command line, see the sketch after the following examples.

  • Create a file system catalog.

    A file system catalog stores metadata in the file system or OSS.

    spark-sql \
        --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
        --conf spark.sql.catalog.tablestore.metastore=filesystem \
        --conf spark.sql.catalog.tablestore.warehouse=oss://oss-bucket/warehouse
  • Create a Hive catalog.

    A Hive catalog can synchronize metadata to Hive Metastore. Hive allows you to query data in tables that are created in a Hive catalog.

    For information about how to query data of Flink Table Store in Hive, see Integrate Flink Table Store with Hive.

    spark-sql \
        --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
        --conf spark.sql.catalog.tablestore.metastore=hive \
        --conf spark.sql.catalog.tablestore.uri=thrift://master-1-1:9083 \
        --conf spark.sql.catalog.tablestore.warehouse=oss://oss-bucket/warehouse
    Note

    The spark.sql.catalog.tablestore.uri parameter specifies the address of Hive Metastore.

  • Create a DLF catalog.

    A DLF catalog can synchronize metadata to DLF.

    Important

    When you create an EMR cluster, you must select DLF Unified Metadata for Metadata.

    spark-sql \
        --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
        --conf spark.sql.catalog.tablestore.metastore=dlf \
        --conf spark.sql.catalog.tablestore.warehouse=oss://oss-bucket/warehouse
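
Alternatively, you can set the same catalog properties when you build a SparkSession in code, for example in a Spark application. The following Scala lines are only a sketch based on the file system catalog configuration above; the application name is a placeholder, and the final query assumes the table that is created in Step 2.

    import org.apache.spark.sql.SparkSession

    // A minimal sketch: set the Flink Table Store catalog properties on the session
    // instead of passing --conf flags. The property keys and the catalog class are the
    // same as in the spark-sql commands above; appName is a placeholder.
    val spark = SparkSession.builder()
      .appName("tablestore-example")
      .config("spark.sql.catalog.tablestore", "org.apache.flink.table.store.spark.SparkCatalog")
      .config("spark.sql.catalog.tablestore.metastore", "filesystem")
      .config("spark.sql.catalog.tablestore.warehouse", "oss://oss-bucket/warehouse")
      .getOrCreate()

    // Once a table exists (for example, the table created in Step 2), it can be
    // queried with its fully qualified name:
    spark.sql("SELECT * FROM tablestore.test_db.test_tbl").show()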

Step 2: Use the Spark SQL client to write data to and read data from Flink Table Store

Execute the following Spark SQL statements to create a Flink Table Store table in the created catalog and write data to and read data from the table:

-- Create a test database in the created catalog and use the database. 
CREATE DATABASE tablestore.test_db;
USE tablestore.test_db;

-- Create a Flink Table Store table. 
CREATE TABLE test_tbl (
    uuid int,
    name string,
    price double
) TBLPROPERTIES (
    'primary-key' = 'uuid'
);

-- Write data to the Flink Table Store table. 
INSERT INTO test_tbl VALUES (1, 'apple', 3.5), (2, 'banana', 4.0), (3, 'cherry', 20.5);

-- Read data from the Flink Table Store table. 
SELECT * FROM test_tbl;

Step 3: Use the Spark CLI to read data from Flink Table Store

  1. Run the following command to start the Spark CLI:

    spark-shell
  2. Run the following code in Scala on the Spark CLI to query data from the Flink Table Store table that is stored in the specified directory:

    val dataset = spark.read.format("tablestore").load("oss://oss-bucket/warehouse/test_db.db/test_tbl")
    dataset.createOrReplaceTempView("test_tbl")
    spark.sql("SELECT * FROM test_tbl").show()