E-MapReduce:Integrate Flink Table Store with Flink

Last Updated: Jun 02, 2023

E-MapReduce (EMR) allows you to use the Flink SQL client to write data to and read data from Flink Table Store. This topic describes how to perform these read and write operations.

Limits

You can use the Flink SQL client to write data to and read data from Flink Table Store only in clusters of EMR V3.45.0 or EMR V5.11.0.

Procedure

Step 1: Start a Flink YARN session

In this example, a Flink cluster in session mode is used. For more information about other deployment modes, see Basic usage.

Run the following command to start a YARN session:

yarn-session.sh --detached
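
If the session starts successfully, the command prints the ID of the YARN application that hosts the session. As a quick check, you can list the running YARN applications. The following command is a minimal sketch; the application name in your output may differ:

yarn application -list

The output is expected to include a running application of the Apache Flink type.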

Step 2: Create a catalog

Flink Table Store stores data and metadata in a file system or in Object Storage Service (OSS). The root path is specified by the warehouse parameter that is configured when you create a catalog. If the specified root path does not exist, a root path is automatically created. If the specified root path exists, you can use the created catalog to access the existing tables in the path.

You can synchronize metadata to Hive or Data Lake Formation (DLF). This way, other services can access the data of Flink Table Store by using Hive or DLF. After you create a catalog by using one of the following methods, you can verify the catalog as shown in the example after this list.

  • Create a file system catalog.

    A file system catalog stores metadata in the file system or OSS.

    1. Run the following command to start the Flink SQL client:

      sql-client.sh -l /opt/apps/FLINK-TABLE-STORE/flink-table-store-current/lib/flink/
    2. Execute the following Flink SQL statement to create a file system catalog:

      CREATE CATALOG test_catalog WITH (
          'type' = 'table-store',
          'metastore' = 'filesystem',
          'warehouse' = 'oss://oss-bucket/warehouse'
      );
  • Create a Hive catalog.

    A Hive catalog can synchronize metadata to Hive Metastore. Hive allows you to query data in tables that are created in a Hive catalog.

    For information about how to query data of Flink Table Store in Hive, see Integrate Flink Table Store with Hive.

    1. Run the following command to start the Flink SQL client:

      sql-client.sh -l /opt/apps/FLINK-TABLE-STORE/flink-table-store-current/lib/flink/ -l /opt/apps/FLINK-TABLE-STORE/flink-table-store-current/lib/catalog/hive2/ -l /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/
    2. Execute the following Flink SQL statement to create a Hive catalog:

      CREATE CATALOG test_catalog WITH (
          'type' = 'table-store',
          'metastore' = 'hive',
          'uri' = 'thrift://master-1-1:9083', -- The address of Hive Metastore. 
          'warehouse' = 'oss://oss-bucket/warehouse'
      );
  • Create a DLF catalog.

    A DLF catalog can synchronize metadata to DLF.

    Important

    When you create an EMR cluster, you must select DLF Unified Metadata for Metadata.

    1. Run the following command to start the Flink SQL client:

      sql-client.sh -l /opt/apps/FLINK-TABLE-STORE/flink-table-store-current/lib/flink/ -l /opt/apps/FLINK-TABLE-STORE/flink-table-store-current/lib/catalog/dlf/
    2. Execute the following Flink SQL statement to create a DLF catalog:

      CREATE CATALOG test_catalog WITH (
          'type' = 'table-store',
          'metastore' = 'dlf',
          'hive-conf-dir' = '/etc/taihao-apps/flink-conf',
          'warehouse' = 'oss://oss-bucket/warehouse'
      );
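
Regardless of the catalog type that you create, you can run a quick check to confirm that the catalog is available. The following statements are a minimal sketch; test_catalog is the catalog name that is used in the preceding examples:

-- List all catalogs, switch to the new catalog, and list its databases. 
SHOW CATALOGS;
USE CATALOG test_catalog;
SHOW DATABASES;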

Step 3: Write data to and read data from Flink Table Store in streaming mode

Execute the following Flink SQL statements to create a Flink Table Store table in the created catalog and write data to and read data from the table:

-- Set the execution.runtime-mode parameter to streaming. 
SET 'execution.runtime-mode' = 'streaming';

-- Specify a checkpoint interval for Flink Table Store. 
SET 'execution.checkpointing.interval' = '10s';

-- Use the catalog that is created in the previous step. 
USE CATALOG test_catalog;

-- Create a test database and use the database. 
CREATE DATABASE test_db;
USE test_db;

-- Create a Datagen source table that generates random data. 
CREATE TEMPORARY TABLE datagen_source (
    uuid int,
    kind int,
    price int
) WITH (
    'connector' = 'datagen',
    'fields.kind.min' = '0',
    'fields.kind.max' = '9',
    'rows-per-second' = '10'
);

-- Create a Flink Table Store table. 
CREATE TABLE test_tbl (
    uuid int,
    kind int,
    price int,
    PRIMARY KEY (uuid) NOT ENFORCED
);

-- Write data to the Flink Table Store table. 
INSERT INTO test_tbl SELECT * FROM datagen_source;

-- Read data from the Flink Table Store table. 
-- The write job is still running while the read operation is performed. 
-- Make sure that the cluster has sufficient resources (task slots) to run the write and read jobs at the same time. Otherwise, the read operation fails. 
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;
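
If the SELECT statement fails to obtain task slots because the INSERT job occupies them, you can lower the resource demand of each job. The following statement is a minimal sketch that uses the standard Flink parallelism.default parameter; execute it before you submit the INSERT and SELECT statements:

-- Reduce the default parallelism so that both jobs fit into the available task slots. 
SET 'parallelism.default' = '1';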

Step 4: Perform an OLAP query on Flink Table Store

Execute the following Flink SQL statements to perform an online analytical processing (OLAP) query on the Flink Table Store table:

-- Set the execution.runtime-mode parameter to batch. 
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';

-- Use the tableau mode to display the query result on the command line. 
SET 'sql-client.execution.result-mode' = 'tableau';

-- Query data from the Flink Table Store table. 
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;
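
In the same batch session, you can run other ad hoc queries against the table. For example, the following statement is a simple sketch that inspects a sample of raw rows:

-- Inspect up to 10 rows of the table. 
SELECT * FROM test_tbl LIMIT 10;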

Step 5: Release resources

Important

After the test is complete, stop the write operation to prevent resource leaks.
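
The streaming INSERT job keeps running until you cancel it. The following commands are a minimal sketch, assuming that the job runs in the YARN session that is started in Step 1; <jobId> is a placeholder for the ID of the running INSERT job shown in the output of the first command:

flink list
flink cancel <jobId>

You can also cancel the job on the Flink web UI.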

Execute the following Flink SQL statement to drop the Flink Table Store table:

DROP TABLE test_tbl;
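
Optionally, you can drop the test database and stop the YARN session to release the remaining cluster resources. The following commands are a sketch; <applicationId> is a placeholder for the ID of the YARN application that hosts the session:

DROP DATABASE test_db;

Then, on the command line:

yarn application -kill <applicationId>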