
E-MapReduce: Integrate Paimon with Flink

Last Updated: Jun 26, 2023

E-MapReduce (EMR) allows you to use the Flink SQL client to read data from and write data to Paimon. This topic describes the procedure.

Limits

You can use the Flink SQL client to read data from and write data to Paimon only in clusters of EMR V3.46.0 or a later minor version, or of EMR V5.12.0 or a later minor version.

Procedure

Step 1: Start an EMR cluster

In this example, Flink is deployed in session mode. For more information about other deployment modes, see Basic usage.

Run the following command to start a YARN session:

yarn-session.sh --detached
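
To confirm that the session is running before you continue, you can list Flink applications on YARN. This is an optional check; the application ID and name depend on your environment:

yarn application -list -appTypes "Apache Flink"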

Step 2: Create a catalog

Paimon stores data and metadata in a file system such as Hadoop Distributed File System (HDFS) or an object storage system such as Object Storage Service (OSS). The root path for storage is specified by the warehouse parameter. If the specified root path does not exist, a root path is automatically created. If the specified root path exists, you can use the created catalog to access existing tables in the path.

You can synchronize metadata to Hive or Data Lake Formation (DLF). This way, other services can access data of Paimon by using Hive or DLF.

Create a file system catalog

A file system catalog stores metadata in a file system or an object storage system.

  1. Run the following command to start the Flink SQL client:

    sql-client.sh -l /opt/apps/PAIMON/paimon-current/lib/flink/
  2. Execute the following Flink SQL statement to create a file system catalog:

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'filesystem',
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );
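
The warehouse does not have to reside in OSS. The following variant keeps the warehouse on the cluster's HDFS; it is a sketch, and the path is an assumption that you must adjust for your cluster:

    CREATE CATALOG test_catalog_hdfs WITH (
        'type' = 'paimon',
        'metastore' = 'filesystem',
        'warehouse' = 'hdfs:///paimon/warehouse'  -- assumed path on the default file system
    );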

Create a Hive catalog

A Hive catalog can synchronize metadata to Hive Metastore. Hive allows you to query data in tables that are created in a Hive catalog.

For information about how to query data of Paimon in Hive, see Integrate Paimon with Hive.

  1. Run the following command to start the Flink SQL client:

    sql-client.sh -l /opt/apps/PAIMON/paimon-current/lib/flink/ -l /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/
    Note

    Even if you are using Hive 3, you do not need to modify the startup command.

  2. Execute the following Flink SQL statement to create a Hive catalog:

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'hive',
        'uri' = 'thrift://master-1-1:9083', -- The address of Hive Metastore. 
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );
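
To verify that the catalog can reach Hive Metastore, you can list the databases that the metastore already contains. For example:

    USE CATALOG test_catalog;
    SHOW DATABASES;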

Create a DLF catalog

A DLF catalog can synchronize metadata to DLF.

Important

When you create an EMR cluster, you must select DLF Unified Metadata for Metadata.

  1. Run the following command to start the Flink SQL client:

    sql-client.sh -l /opt/apps/PAIMON/paimon-current/lib/flink/ -l /opt/apps/PAIMON/paimon-current/lib/jackson/ -l /opt/apps/METASTORE/metastore-current/hive2/ -l /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/
    Note

    Even if you are using Hive 3, you do not need to modify the startup command.

  2. Execute the following Flink SQL statement to create a DLF catalog:

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'dlf',
        'hive-conf-dir' = '/etc/taihao-apps/flink-conf',
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );

Step 3: Read data from and write data to Paimon in streaming mode

Execute the following Flink SQL statements to create a Paimon table in the created catalog and read data from and write data to the table:

-- Set the execution.runtime-mode parameter to streaming. 
SET 'execution.runtime-mode' = 'streaming';

-- Specify a checkpoint interval for Paimon. 
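-- Paimon commits new data only when a checkpoint completes, so streaming writes require checkpointing. 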
SET 'execution.checkpointing.interval' = '10s';

-- Use the catalog that is created in the previous step. 
USE CATALOG test_catalog;

-- Create a test database and use the database. 
CREATE DATABASE test_db;
USE test_db;

-- Create a Datagen source table that generates random data. 
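-- The fields.kind.* options bound kind to [0, 9]; uuid and price default to random values across the full int range. 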
CREATE TEMPORARY TABLE datagen_source (
    uuid int,
    kind int,
    price int
) WITH (
    'connector' = 'datagen',
    'fields.kind.min' = '0',
    'fields.kind.max' = '9',
    'rows-per-second' = '10'
);

-- Create a Paimon table. 
CREATE TABLE test_tbl (
    uuid int,
    kind int,
    price int,
    PRIMARY KEY (uuid) NOT ENFORCED
);
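
-- Because uuid is the primary key, rows with the same uuid are merged. 
-- By default, Paimon keeps the latest record for each key. 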

-- Write data to the Paimon table. 
INSERT INTO test_tbl SELECT * FROM datagen_source;

-- Read data from the Paimon table. 
-- The write operation is in progress when the read operation is performed. 
-- Make sure that the cluster has sufficient resources (task slots) to perform the write and read operations at the same time. Otherwise, data fails to be read. 
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;

Step 4: Perform an OLAP query on Paimon

Execute the following Flink SQL statements to perform an online analytical processing (OLAP) query on the Paimon table:

-- Reset the checkpoint interval, which is not required in batch mode. 
RESET 'execution.checkpointing.interval';

-- Set the execution.runtime-mode parameter to batch. 
SET 'execution.runtime-mode' = 'batch';

-- Use the tableau mode to display the query result on the CLI. 
SET 'sql-client.execution.result-mode' = 'tableau';

-- Query data from the Paimon table. 
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;
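
Batch mode also works for ad hoc queries on the same table. For example, the following sketch inspects a few raw rows of a single kind:

SELECT * FROM test_tbl WHERE kind = 0 LIMIT 10;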

Step 5: Clear resources

Important

After the test is complete, stop the write operation to prevent resource leaks.
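
For example, you can cancel the streaming INSERT job by using the Flink CLI. The following commands are a sketch: they assume that the CLI targets the YARN session that is started in Step 1, and the job ID comes from the output of the list command:

flink list
flink cancel <yourJobId>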

Execute the following Flink SQL statement to drop the Paimon table:

DROP TABLE test_tbl;
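
You can also stop the YARN session that is started in Step 1 to release cluster resources. Replace <yourApplicationId> with the ID of the session application:

yarn application -kill <yourApplicationId>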