E-MapReduce支援通過Flink SQL對Paimon進行讀寫操作。本文通過樣本為您介紹如何通過Flink SQL對Paimon進行讀寫操作。
前提條件
已建立選擇了Flink和Paimon的DataFlow或Custom類型的叢集,建立叢集詳情請參見建立叢集。
如果您需要使用Hive Catalog的方式,則只能建立選擇了Flink、Paimon和Hive的Custom類型的叢集,且中繼資料類型僅可選擇自建RDS或內建MySQL。
使用限制
EMR-3.46.0版本,暫不支援DLF Catalog和Hive Catalog的方式。
EMR-3.46.0至EMR-3.50.X版本、EMR-5.12.0至EMR-5.16.X版本的叢集,支援使用Flink SQL對Paimon進行讀寫操作。
說明EMR-3.51.X及其後續版本、EMR-5.17.X及其後續版本,建議您參考Paimon社區文檔,在EMR叢集中自行配置。
操作步驟
步驟一:配置依賴
本文介紹了通過Flink SQL對Paimon進行讀寫操作的三種方法:Filesystem Catalog、Hive Catalog和DLF Catalog。每種方法針對不同的應用情境和環境需求,請根據所選方法配置相應的依賴。
Filesystem Catalog
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/Hive Catalog
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/建立DLF Catalog
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/PAIMON/paimon-current/lib/jackson/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/METASTORE/metastore-*/hive2/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/步驟二:啟動叢集
本文以Session模式為例,其餘模式請參見基礎使用。
執行以下命令,啟動YARN Session。
yarn-session.sh --detached步驟三:建立Catalog
Paimon將資料和中繼資料都儲存在檔案系統(例如,HDFS)或Object Storage Service(例如,OSS-HDFS)中,儲存的根路徑由warehouse參數指定。如果指定的warehouse路徑不存在,將會自動建立該路徑;如果指定的warehouse路徑存在,則可以通過該Catalog訪問路徑中已有的表。
您還可以將中繼資料額外同步到Hive或DLF中,方便其他服務訪問Paimon。
EMR-3.46.0和EMR-5.17.0版本,暫不支援DLF Catalog和Hive Catalog的方式。
建立Filesystem Catalog
Filesystem Catalog僅將中繼資料儲存在檔案系統或Object Storage Service中。
執行以下命令,啟動Flink SQL。
sql-client.sh執行以下Flink SQL語句,建立Filesystem Catalog。
CREATE CATALOG test_catalog WITH ( 'type' = 'paimon', 'metastore' = 'filesystem', 'warehouse' = 'oss://<yourBucketName>/warehouse' );
建立Hive Catalog
Hive Catalog會同步中繼資料到Hive MetaStore中。在Hive Catalog中建立的表可以直接在Hive中查詢。
Hive查詢Paimon,詳情請參見Paimon與Hive整合。
執行以下命令,啟動Flink SQL。
sql-client.sh說明即使您使用的是Hive3,也無需修改啟動命令。
執行以下Flink SQL語句,建立Hive Catalog。
CREATE CATALOG test_catalog WITH ( 'type' = 'paimon', 'metastore' = 'hive', 'uri' = 'thrift://master-1-1:9083', -- uri參數指向Hive metastore service的地址。 'warehouse' = 'oss://<yourBucketName>/warehouse' );
建立DLF Catalog
DLF Catalog會將中繼資料同步到DLF中。
建立叢集時,中繼資料必須為DLF統一中繼資料。
執行以下命令,啟動Flink SQL。
sql-client.sh說明即使您使用的是Hive3,也無需修改啟動命令。
執行以下Flink SQL語句,建立DLF Catalog。
CREATE CATALOG test_catalog WITH ( 'type' = 'paimon', 'metastore' = 'dlf', 'hive-conf-dir' = '/etc/taihao-apps/flink-conf', 'warehouse' = 'oss://<yourBucketName>/warehouse' );
步驟四 :流作業讀寫Paimon
執行以下Flink SQL語句,在Catalog中建立一張表,並讀寫表中的資料。
-- 設定為流作業。
SET 'execution.runtime-mode' = 'streaming';
-- Paimon在流作業中需要設定checkpoint。
SET 'execution.checkpointing.interval' = '10s';
-- 使用之前建立的catalog。
USE CATALOG test_catalog;
-- 建立並使用一個測試database。
CREATE DATABASE test_db;
USE test_db;
-- 用datagen產生隨機資料。
CREATE TEMPORARY TABLE datagen_source (
uuid int,
kind int,
price int
) WITH (
'connector' = 'datagen',
'fields.kind.min' = '0',
'fields.kind.max' = '9',
'rows-per-second' = '10'
);
-- 建立Paimon表。
CREATE TABLE test_tbl (
uuid int,
kind int,
price int,
PRIMARY KEY (uuid) NOT ENFORCED
);
-- 向Paimon中寫入資料。
INSERT INTO test_tbl SELECT * FROM datagen_source;
-- 讀取表中的資料。
-- 流式查詢作業啟動並執行過程中,上面觸發的流式寫入作業仍在運行。
-- 您需要保證Flink叢集有足夠的資源(task slot)同時運行兩個作業,否則無法查到資料。
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;步驟五:OLAP查詢Paimon
執行以下Flink SQL語句,對剛才建立的表進行OLAP查詢。
-- 設定為批作業。
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';
-- 使用tableau展示模式,在命令列中直接打出結果。
SET 'sql-client.execution.result-mode' = 'tableau';
-- 對錶中資料進行查詢。
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;步驟六:清理資源
完成測試後,請手動停止流式寫入Paimon的作業,防止資源流失。
停止作業後,執行以下Flink SQL語句,刪除剛才建立的表。
DROP TABLE test_tbl;