全部產品
Search
文件中心

E-MapReduce:Paimon與Flink整合

更新時間:Nov 07, 2024

E-MapReduce支援通過Flink SQL對Paimon進行讀寫操作。本文通過樣本為您介紹如何通過Flink SQL對Paimon進行讀寫操作。

前提條件

已建立選擇了Flink和Paimon的DataFlow或Custom類型的叢集,建立叢集詳情請參見建立叢集

說明

如果您需要使用Hive Catalog的方式,則只能建立選擇了Flink、Paimon和Hive的Custom類型的叢集,且中繼資料類型僅可選擇自建RDS內建MySQL

使用限制

  • EMR-3.46.0版本,暫不支援DLF Catalog和Hive Catalog的方式。

  • EMR-3.46.0至EMR-3.50.X版本、EMR-5.12.0至EMR-5.16.X版本的叢集,支援使用Flink SQL對Paimon進行讀寫操作。

    說明

    EMR-3.51.X及其後續版本、EMR-5.17.X及其後續版本,建議您參考Paimon社區文檔,在EMR叢集中自行配置。

操作步驟

步驟一:配置依賴

本文介紹了通過Flink SQL對Paimon進行讀寫操作的三種方法:Filesystem Catalog、Hive Catalog和DLF Catalog。每種方法針對不同的應用情境和環境需求,請根據所選方法配置相應的依賴。

Filesystem Catalog

cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/

Hive Catalog

cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/

建立DLF Catalog

cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/PAIMON/paimon-current/lib/jackson/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/METASTORE/metastore-*/hive2/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/

步驟二:啟動叢集

本文以Session模式為例,其餘模式請參見基礎使用

執行以下命令,啟動YARN Session。

yarn-session.sh --detached

步驟三:建立Catalog

Paimon將資料和中繼資料都儲存在檔案系統(例如,HDFS)或Object Storage Service(例如,OSS-HDFS)中,儲存的根路徑由warehouse參數指定。如果指定的warehouse路徑不存在,將會自動建立該路徑;如果指定的warehouse路徑存在,則可以通過該Catalog訪問路徑中已有的表。

您還可以將中繼資料額外同步到Hive或DLF中,方便其他服務訪問Paimon。

說明

EMR-3.46.0和EMR-5.17.0版本,暫不支援DLF Catalog和Hive Catalog的方式。

建立Filesystem Catalog

Filesystem Catalog僅將中繼資料儲存在檔案系統或Object Storage Service中。

  1. 執行以下命令,啟動Flink SQL。

    sql-client.sh
  2. 執行以下Flink SQL語句,建立Filesystem Catalog。

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'filesystem',
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );

建立Hive Catalog

Hive Catalog會同步中繼資料到Hive MetaStore中。在Hive Catalog中建立的表可以直接在Hive中查詢。

Hive查詢Paimon,詳情請參見Paimon與Hive整合

  1. 執行以下命令,啟動Flink SQL。

    sql-client.sh
    說明

    即使您使用的是Hive3,也無需修改啟動命令。

  2. 執行以下Flink SQL語句,建立Hive Catalog。

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'hive',
        'uri' = 'thrift://master-1-1:9083', -- uri參數指向Hive metastore service的地址。
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );

建立DLF Catalog

DLF Catalog會將中繼資料同步到DLF中。

重要

建立叢集時,中繼資料必須為DLF統一中繼資料

  1. 執行以下命令,啟動Flink SQL。

    sql-client.sh
    說明

    即使您使用的是Hive3,也無需修改啟動命令。

  2. 執行以下Flink SQL語句,建立DLF Catalog。

    CREATE CATALOG test_catalog WITH (
        'type' = 'paimon',
        'metastore' = 'dlf',
        'hive-conf-dir' = '/etc/taihao-apps/flink-conf',
        'warehouse' = 'oss://<yourBucketName>/warehouse'
    );

步驟四 :流作業讀寫Paimon

執行以下Flink SQL語句,在Catalog中建立一張表,並讀寫表中的資料。

-- 設定為流作業。
SET 'execution.runtime-mode' = 'streaming';

-- Paimon在流作業中需要設定checkpoint。
SET 'execution.checkpointing.interval' = '10s';

-- 使用之前建立的catalog。
USE CATALOG test_catalog;

-- 建立並使用一個測試database。
CREATE DATABASE test_db;
USE test_db;

-- 用datagen產生隨機資料。
CREATE TEMPORARY TABLE datagen_source (
    uuid int,
    kind int,
    price int
) WITH (
    'connector' = 'datagen',
    'fields.kind.min' = '0',
    'fields.kind.max' = '9',
    'rows-per-second' = '10'
);

-- 建立Paimon表。
CREATE TABLE test_tbl (
    uuid int,
    kind int,
    price int,
    PRIMARY KEY (uuid) NOT ENFORCED
);

-- 向Paimon中寫入資料。
INSERT INTO test_tbl SELECT * FROM datagen_source;

-- 讀取表中的資料。
-- 流式查詢作業啟動並執行過程中,上面觸發的流式寫入作業仍在運行。
-- 您需要保證Flink叢集有足夠的資源(task slot)同時運行兩個作業,否則無法查到資料。
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;

步驟五:OLAP查詢Paimon

執行以下Flink SQL語句,對剛才建立的表進行OLAP查詢。

-- 設定為批作業。
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';

-- 使用tableau展示模式,在命令列中直接打出結果。
SET 'sql-client.execution.result-mode' = 'tableau';

-- 對錶中資料進行查詢。
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;

步驟六:清理資源

重要

完成測試後,請手動停止流式寫入Paimon的作業,防止資源流失。

停止作業後,執行以下Flink SQL語句,刪除剛才建立的表。

DROP TABLE test_tbl;