E-MapReduce: Integrate Flink Table Store with Trino

Last Updated: Jun 26, 2023

E-MapReduce (EMR) allows you to query data of Flink Table Store in Trino. This topic describes how to configure and run such queries.

Limits

Only EMR V3.45.0 clusters and EMR V5.11.0 clusters allow you to query data of Flink Table Store in Trino.

Procedure

  1. Modify the warehouse parameter.

    Flink Table Store stores data and metadata in the file system or Object Storage Service (OSS). The root path is specified by the warehouse parameter. You can change the root path based on your business requirements. A sketch of the resulting configuration entry appears after the following steps.

    1. Go to the Configure tab of the Trino service page.

      1. Log on to the EMR on ECS console.

      2. In the top navigation bar, select the region where your cluster resides and select a resource group based on your business requirements.

      3. On the EMR on ECS page, find the cluster that you want to manage and click Services in the Actions column.

      4. On the Services tab, find the Trino service and click Configure.

    2. On the Configure tab, click the tablestore.properties tab.

    3. Modify the warehouse parameter.

    4. Save the configuration.

      1. Click Save.

      2. In the dialog box that appears, configure the Execution Reason parameter, turn on Automatically Update Configurations, and then click Save.
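
    A minimal sketch of the warehouse entry in tablestore.properties, assuming the key=value properties format; the bucket name and path are placeholders that you replace with your own OSS path:

      warehouse=oss://oss-bucket/warehouse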

  2. Optional: Modify the metastore parameter.

    The type of Metastore used by Trino is automatically specified based on the services that you selected when you created the cluster. If you want to change the type of Metastore used by Trino, you can change the value of the metastore parameter on the tablestore.properties tab of the Configure tab on the Trino service page.

    Valid values of the metastore parameter for Flink Table Store (a configuration sketch follows this list):

    • filesystem: Metadata is stored in the file system or OSS.

    • hive: Metadata is synchronized to the specified Hive Metastore.

    • dlf: Metadata is synchronized to Data Lake Formation (DLF).
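
    For example, to synchronize metadata to a Hive Metastore, set the parameter as follows. This is a minimal sketch that assumes the key=value properties format; depending on your cluster, additional Hive Metastore connection settings may also be required:

      metastore=hive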

  3. Restart the Trino service.

    1. In the upper-right corner of the Configure tab on the Trino service page, choose More > Restart.

    2. In the dialog box that appears, configure the Execution Reason parameter and click OK.

    3. In the Confirm message, click OK.

  4. Query data of Flink Table Store.

    The following example shows how to use Spark to write data to a Flink Table Store catalog and then query that data in Trino.

    1. Run the following command to start Spark SQL. The three --conf flags register a Spark catalog named tablestore that uses the Flink Table Store implementation, set its metastore type to filesystem, and point its warehouse at the OSS root path:

      spark-sql --conf spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
                --conf spark.sql.catalog.tablestore.metastore=filesystem \
                --conf spark.sql.catalog.tablestore.warehouse=oss://oss-bucket/warehouse
    2. Execute the following Spark SQL statements to create a Flink Table Store table in the created catalog and write data to the table:

      -- Create a test database in the created catalog and use the database. 
      CREATE DATABASE tablestore.test_db;
      USE tablestore.test_db;
      
      -- Create a Flink Table Store table. 
      CREATE TABLE test_tbl (
          uuid int,
          name string,
          price double
      ) TBLPROPERTIES (
          'primary-key' = 'uuid'
      );
      
      -- Write data to the Flink Table Store table. 
      INSERT INTO test_tbl VALUES (1, 'apple', 3.5), (2, 'banana', 4.0), (3, 'cherry', 20.5);
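
      Optionally, you can read the rows back in the same Spark SQL session to confirm that the write succeeded before moving to Trino:

      SELECT * FROM test_tbl;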
    3. Run the following command to start Trino:

      trino --server master-1-1:9090 --catalog tablestore --schema default --user hadoop
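
      Optionally, confirm that the tablestore catalog is reachable before you query data. SHOW SCHEMAS is standard Trino SQL and should list test_db among the schemas:

      SHOW SCHEMAS;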
    4. Execute the following statements to query the data that was written to the Flink Table Store table:

      USE test_db;
      
      SELECT * FROM test_tbl;
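
      If the integration is configured correctly, the query returns the three rows that were written by Spark. The exact formatting of the Trino CLI output may differ, but the result resembles the following:

       uuid |  name  | price
      ------+--------+-------
          1 | apple  |   3.5
          2 | banana |   4.0
          3 | cherry |  20.5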