E-MapReduce (EMR) では、Flink SQL クライアントを使用して Paimon からデータを読み取ったり、Paimon にデータを書き込んだりできます。このトピックでは、Flink SQL クライアントを使用して Paimon からデータを読み取ったり、Paimon にデータを書き込んだりする手順について説明します。
前提条件
Flink サービスと Paimon サービスを含む Dataflow クラスターまたはカスタム クラスターが作成されていること。詳細については、「クラスターの作成」をご参照ください。
Hive カタログを使用して Paimon からデータを読み取ったり、Paimon にデータを書き込んだりする場合は、Flink、Paimon、および Hive サービスを含むカスタム クラスターを作成する必要があります。さらに、メタデータ パラメーターに [セルフマネージド RDS] または [組み込み Mysql] を選択する必要があります。
制限事項
EMR V3.46.0 のクラスターでは、DLF カタログと Hive カタログを使用して Paimon からデータを読み取ったり、Paimon にデータを書き込んだりすることはできません。
EMR V3.46.0 から V3.50.X または V5.12.0 から V5.16.X までのバージョンのクラスターのみ、Flink SQL クライアントを使用して Paimon からデータを読み取ったり、Paimon にデータを書き込んだりできます。
説明EMR V3.51.X 以降のマイナー バージョンのクラスター、または EMR V5.17.X 以降のマイナー バージョンのクラスターでは、ビジネス要件に基づいて依存関係を設定できます。詳細については、「クイックスタート」をご参照ください。
手順
ステップ 1: 依存関係の設定
このトピックでは、Flink SQL クライアントでファイルシステム カタログ、Hive カタログ、および DLF カタログを使用して Paimon からデータを読み取ったり、Paimon にデータを書き込んだりする手順について説明します。シナリオと環境要件に基づいてカタログの種類を選択し、カタログの種類に基づいて依存関係を設定できます。
ファイルシステム カタログ
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/Hive カタログ
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/DLF カタログ
cp /opt/apps/PAIMON/paimon-current/lib/flink/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/PAIMON/paimon-current/lib/jackson/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/METASTORE/metastore-*/hive2/*.jar /opt/apps/FLINK/flink-current/lib/
cp /opt/apps/FLINK/flink-current/opt/catalogs/hive-2.3.6/*.jar /opt/apps/FLINK/flink-current/lib/ステップ 2: EMR クラスターの起動
この例では、セッション モードの EMR クラスターを使用します。他のモードの詳細については、「基本的な使い方」をご参照ください。
次のコマンドを実行して、YARN セッションを開始します。
yarn-session.sh --detachedステップ 3: カタログの作成
Paimon は、Hadoop Distributed File System (HDFS) などのファイルシステム、または OSS-HDFS などのオブジェクトストレージ サービスにデータとメタデータを保存します。ストレージのルートパスは、warehouse パラメーターで指定されます。 指定されたルートパスが存在しない場合は、ルートパスが自動的に作成されます。指定されたルートパスが存在する場合は、作成されたカタログを使用して、パス内の既存のテーブルにアクセスできます。
メタデータを Hive または DLF に同期できます。これにより、他のサービスは Hive または DLF を使用して Paimon のデータにアクセスできます。
EMR V3.46.0 および EMR V5.17.0 のクラスターでは、DLF カタログと Hive カタログを使用して Paimon からデータを読み取ったり、Paimon にデータを書き込んだりすることはできません。
ファイルシステム カタログの作成
ファイルシステム カタログは、ファイルシステムまたはオブジェクトストレージ システムにメタデータを保存します。
次のコマンドを実行して、Flink SQL クライアントを起動します。
sql-client.sh次の Flink SQL ステートメントを実行して、ファイルシステム カタログを作成します。
CREATE CATALOG test_catalog WITH ( 'type' = 'paimon', 'metastore' = 'filesystem', 'warehouse' = 'oss://<yourBucketName>/warehouse' );
Hive カタログの作成
Hive カタログは、メタデータを Hive Metastore に同期できます。Hive を使用すると、Hive カタログで作成されたテーブルのデータをクエリできます。
Hive で Paimon のデータをクエリする方法については、「Paimon と Hive の統合」をご参照ください。
次のコマンドを実行して、Flink SQL クライアントを起動します。
sql-client.sh説明Hive 3 を使用している場合でも、起動コマンドを変更する必要はありません。
次の Flink SQL ステートメントを実行して、Hive カタログを作成します。
CREATE CATALOG test_catalog WITH ( 'type' = 'paimon', 'metastore' = 'hive', 'uri' = 'thrift://master-1-1:9083', -- Hive Metastore のアドレス。 'warehouse' = 'oss://<yourBucketName>/warehouse' );
DLF カタログの作成
DLF カタログは、メタデータを DLF に同期できます。
EMR クラスターを作成するときに、メタデータ に [DLF 統合メタデータ] を選択する必要があります。
次のコマンドを実行して、Flink SQL クライアントを起動します。
sql-client.sh説明Hive 3 を使用している場合でも、起動コマンドを変更する必要はありません。
次の Flink SQL ステートメントを実行して、DLF カタログを作成します。
CREATE CATALOG test_catalog WITH ( 'type' = 'paimon', 'metastore' = 'dlf', 'hive-conf-dir' = '/etc/taihao-apps/flink-conf', 'warehouse' = 'oss://<yourBucketName>/warehouse' );
ステップ 4: ストリーミング モードで Paimon からデータを読み取り、Paimon にデータを書き込む
次の Flink SQL ステートメントを実行して、作成されたカタログに Paimon テーブルを作成し、テーブルからデータを読み取ったり、テーブルにデータを書き込んだりします。
-- execution.runtime-mode パラメーターを streaming に設定します。
SET 'execution.runtime-mode' = 'streaming';
-- Paimon のチェックポイント間隔を指定します。
SET 'execution.checkpointing.interval' = '10s';
-- 前のステップで作成したカタログを使用します。
USE CATALOG test_catalog;
-- テストデータベースを作成し、データベースを使用します。
CREATE DATABASE test_db;
USE test_db;
-- ランダムデータを生成する Datagen ソーステーブルを作成します。
CREATE TEMPORARY TABLE datagen_source (
uuid int,
kind int,
price int
) WITH (
'connector' = 'datagen',
'fields.kind.min' = '0',
'fields.kind.max' = '9',
'rows-per-second' = '10'
);
-- Paimon テーブルを作成します。
CREATE TABLE test_tbl (
uuid int,
kind int,
price int,
PRIMARY KEY (uuid) NOT ENFORCED
);
-- Paimon テーブルにデータを書き込みます。
INSERT INTO test_tbl SELECT * FROM datagen_source;
-- テーブルからデータを読み取ります。
-- 読み取り操作の実行中に書き込み操作が進行中です。
-- クラスターに書き込み操作と読み取り操作を同時に実行するための十分なリソース (タスクスロット) があることを確認してください。そうでない場合、データの読み取りに失敗します。
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;ステップ 5: Paimon で OLAP クエリを実行する
次の Flink SQL ステートメントを実行して、Paimon テーブルでオンライン分析処理 (OLAP) クエリを実行します。
-- execution.runtime-mode パラメーターを batch に設定します。
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';
-- tableau モードを使用して、CLI にクエリ結果を表示します。
SET 'sql-client.execution.result-mode' = 'tableau';
-- Paimon テーブルからデータをクエリします。
SELECT kind, SUM(price) FROM test_tbl GROUP BY kind;ステップ 6: リソースのクリア
テストが完了したら、リソースリークを防ぐために書き込み操作を停止します。
次の Flink SQL ステートメントを実行して、Paimon テーブルを削除します。
DROP TABLE test_tbl;