このトピックでは、リアルタイムコンピュート for Apache FlinkとE-MapReduce(EMR)Serverless Sparkを使用して、Paimonデータレイク分析プロセスを構築する方法について説明します。このプロセスにより、オブジェクトストレージサービス(OSS)へのデータの書き込み、インタラクティブクエリの実行、オフラインデータの圧縮が可能になります。EMR Serverless SparkはPaimonと完全に互換性があります。 EMR Serverless SparkはData Lake Formation(DLF)と統合されており、リアルタイムコンピュート for Apache Flinkなどの他のクラウドサービスとのメタデータ共有を可能にします。これは、バッチ処理とストリーム処理を統合するためのソリューションを策定するのに役立ちます。 EMR Serverless Sparkでは、ジョブを実行し、ジョブパラメータを柔軟に設定して、リアルタイム分析とジョブスケジューリングのさまざまな要件に対応できます。
背景情報
リアルタイムコンピュート for Apache Flink
Alibaba Cloudリアルタイムコンピュート for Apache Flinkは、エンドツーエンドの開発、運用、管理をサポートする、フルマネージドのすぐに使えるサーバーレスFlinkサービスです。リアルタイムコンピュート for Apache Flinkは複数の課金方法をサポートしています。リアルタイムコンピュート for Apache Flinkは、ドラフト開発、データデバッグ、運用と監視、自動チューニング、インテリジェント診断など、プロジェクトライフサイクル全体にわたる強力な機能も提供します。詳細については、「Alibaba Cloudリアルタイムコンピュート for Apache Flinkとは」をご参照ください。
Apache Paimon
Apache Paimonは統合データレイクフォーマットです。 Apache Paimonは、Apache FlinkおよびApache Sparkと連携して、バッチ処理とストリーム処理の統合をサポートするリアルタイムのレイクハウスアーキテクチャを構築できます。 Apache Paimonは、レイクフォーマットとログ構造化マージツリー(LSM)テクノロジーを革新的に組み合わせることで、リアルタイムのストリーム更新とストリームコンピューティングをサポートします。詳細については、「Apache Paimon」をご参照ください。
手順
ステップ1:リアルタイムコンピュート for Apache FlinkでPaimonカタログを作成する
Apache Paimonカタログを使用すると、同じウェアハウスディレクトリにあるすべてのApache Paimonテーブルを効率的に管理できます。 Apache Paimonカタログは、他のAlibaba Cloudサービスでも使用できます。 Apache Paimonカタログの作成と使用方法については、「Apache Paimonカタログの管理」をご参照ください。
目的のワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。
Apache Paimonカタログを作成します。
左側のナビゲーションペインで、[開発] > [スクリプト] を選択します。
スクリプトタブで、
アイコンをクリックしてスクリプトを作成します。
スクリプトエディタにSQLコードを入力します。
サンプルコード:
CREATE CATALOG `paimon` WITH ( 'type' = 'paimon', 'metastore' = 'dlf', 'warehouse' = '<warehouse>', 'dlf.catalog.id' = '<dlf.catalog.id>', 'dlf.catalog.accessKeyId' = '<dlf.catalog.accessKeyId>', 'dlf.catalog.accessKeySecret' = '<dlf.catalog.accessKeySecret>', 'dlf.catalog.endpoint' = '<dlf.catalog.endpoint>', 'dlf.catalog.region' = '<dlf.catalog.region>', );
パラメータ
説明
必須
備考
paimon
Apache Paimonカタログの名前。
はい
カスタム名を入力します。
type
カタログのタイプ。
はい
値をpaimonに設定します。
metastore
メタデータストレージタイプ。
はい
値をdlfに設定します。 DLFを使用してメタデータを一元管理し、エンジン間でシームレスな統合を実現できます。
warehouse
データウェアハウスディレクトリ。
はい
ビジネス要件に基づいてこのパラメータを設定します。
dlf.catalog.id
DLFデータカタログのID。
はい
DLFコンソールでデータカタログのIDを確認できます
dlf.catalog.accessKeyId
DLFへのアクセスに使用されるAccessKey ID。
はい
AccessKeyペアの取得方法については、「AccessKeyの作成」をご参照ください。
dlf.catalog.accessKeySecret
DLFへのアクセスに使用されるAccessKeyシークレット。
はい
AccessKeyペアの取得方法については、「AccessKeyの作成」をご参照ください。
dlf.catalog.endpoint
DLFのエンドポイント。
はい
詳細については、「サポートされているリージョンとエンドポイント」をご参照ください。
説明DLFがリアルタイムコンピュート for Apache Flinkと同じリージョンにある場合、VPCエンドポイントが使用されます。それ以外の場合、パブリックエンドポイントが使用されます。
dlf.catalog.region
DLFが存在するリージョン。
はい
詳細については、「サポートされているリージョンとエンドポイント」をご参照ください。
説明このパラメータの値が、dlf.catalog.endpointパラメータで指定されたエンドポイントと一致していることを確認してください。
セッションクラスタを選択または作成します。
スクリプト編集ページの右下隅にある 環境 ドロップダウンリストから、セッションクラスタを選択します。 Ververica Runtime(VVR) 8.0.4以降を使用するセッションクラスタを選択できます。使用可能なセッションクラスタがない場合は、セッションクラスタを作成します。セッションクラスタの作成方法については、「デプロイメントのデバッグ」トピックの「ステップ1:セッションクラスタを作成する」セクションをご参照ください。
実行するコードを選択し、コードの左側にある [実行] をクリックします。
Apache Paimonテーブルを作成します。
[スクリプト] エディタで、次のコードを入力して選択します。次に、スクリプトエディタの左上隅にある [実行] をクリックします。
CREATE TABLE IF NOT EXISTS `paimon`.`test_paimon_db`.`test_append_tbl` ( id STRING, data STRING, category INT, ts STRING, dt STRING, hh STRING ) PARTITIONED BY (dt, hh) WITH ( 'write-only' = 'true' );
ストリーミングドラフトを作成します。
ストリーミングドラフトを作成します。
左側のナビゲーションペインで、[開発] > [ETL] を選択します。
ドラフト タブで、[新規(ドラフトの作成)] アイコンをクリックします。
[新規ドラフト] ダイアログボックスの SQLスクリプト タブで、[空のストリームドラフト] をクリックします。
[次へ] をクリックします。
[新規ドラフト] ダイアログボックスで、ドラフトのパラメータを設定します。次の表にパラメータを示します。
パラメータ
説明
[名前]
作成するドラフトの名前。
説明ドラフト名は、現在のプロジェクト内で一意である必要があります。
[場所]
ドラフトのコードファイルが保存されるフォルダ。
既存のフォルダの右側にある
アイコンをクリックして、サブフォルダを作成することもできます。
[エンジンバージョン]
ドラフトで使用されるFlinkのエンジンバージョン。エンジンバージョン、バージョンマッピング、各バージョンのライフサイクルにおける重要な時点については、「エンジンバージョン」をご参照ください。
[作成] をクリックします。
コードを記述します。
ドラフトエディタで、次のコードを入力してDatagenを使用して継続的にデータを生成し、Paimonテーブルにデータを書き込みます。サンプルコード:
CREATE TEMPORARY TABLE datagen ( id string, data string, category int ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '100', 'fields.category.kind' = 'random', 'fields.category.min' = '1', 'fields.category.max' = '10' ); INSERT INTO `paimon`.`test_paimon_db`.`test_append_tbl` SELECT id, data, category, cast(LOCALTIMESTAMP as string) as ts, cast(CURRENT_DATE as string) as dt, cast(hour(LOCALTIMESTAMP) as string) as hh FROM datagen;
ツールバーの右上隅にある [デプロイ] をクリックして、ドラフトを本番環境にデプロイします。
[デプロイメント] ページでデプロイメントを開始します。詳細については、「デプロイメントを開始する」をご参照ください。
ステップ 2:EMR Serverless Spark で SQL セッションを作成する
SQL開発とクエリのためにSQLセッションを作成する必要があります。セッションの詳細については、「セッションの管理」をご参照ください。
セッションページに移動します。
EMRコンソールにログオンします。
左側のナビゲーションペインで、
を選択します。[spark] ページで、管理するワークスペースの名前をクリックします。
[EMR Serverless Spark] ページの左側のナビゲーションペインで、[オペレーションセンター] > [セッション] を選択します。
SQLセッションを作成します。
[SQLセッション] タブで、[SQLセッションの作成] をクリックします。
[SQLセッションの作成] ページで、パラメータを設定し、[作成] をクリックします。次の表にパラメータを示します。
パラメータ
説明
[名前]
SQLセッションの名前。例:paimon_compute。
[spark設定]
Paimonに接続するには、次のコードを入力します。
spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions spark.sql.catalog.paimon org.apache.paimon.spark.SparkCatalog spark.sql.catalog.paimon.metastore dlf spark.sql.catalog.paimon.warehouse <warehouse> spark.sql.catalog.paimon.dlf.catalog.id <dlf.catalog.id>
ビジネス要件に基づいて、次のパラメータを設定します。
<warehouse>
:データウェアハウスディレクトリ。<dlf.catalog.id>
:DLFデータカタログのID。
[アクション] 列の [開始] をクリックします。
ステップ 3:EMR Serverless Spark でインタラクティブクエリを実行し、ジョブをスケジュールする
EMR Serverless Sparkでは、インタラクティブクエリを実行し、ワークフローでジョブをスケジュールして、さまざまな要件に対応できます。インタラクティブクエリを実行して、高速クエリとデバッグを実装できます。また、ワークフローを作成してジョブを開発、公開、および保守することもできます。これは、ジョブの完全なライフサイクル管理を実装するのに役立ちます。
データ書き込みプロセス中に、EMR Serverless Sparkを使用して、いつでもPaimonテーブルでインタラクティブクエリを実行できます。これにより、データの状態をリアルタイムで取得し、データの高速分析を実行できます。開発済みのジョブを公開し、ジョブに基づいてワークフローを作成してジョブを調整し、ワークフローを公開できます。スケジューリングポリシーを設定して、ジョブを定期的にスケジュールできます。これにより、データの処理と分析を自動的かつ効率的に実行できます。
インタラクティブクエリ
SQLジョブを作成します。
[EMR Serverless Spark] ページの左側のナビゲーションペインで、[データ開発] をクリックします。
[開発] タブで、[作成] をクリックします。
[作成] ダイアログボックスで、[名前] パラメータと [タイプ] パラメータを設定し、[OK] をクリックします。この例では、[名前] パラメータは paimon_compact に設定され、[タイプ] パラメータは [sparksql] に設定されています。
ツールバーの右上隅で、[デフォルトカタログ] ドロップダウンリストからデータカタログを、[デフォルトデータベース] ドロップダウンリストからデータベースを、[SQLセッション] ドロップダウンリストから開始されたSQLコンピュートを選択します。
作成したジョブのエディタにSQLステートメントを入力します。
例 1:
test_append_tbl
テーブルから最初の 10 行のデータをクエリします。SELECT * FROM paimon.test_paimon_db.test_append_tbl limit 10;
次の図は結果を示しています。
例 2:
test_append_tbl
テーブルで特定の条件を満たす行数をカウントします。SELECT COUNT(*) FROM paimon.test_paimon_db.test_append_tbl WHERE dt = '2024-06-24' AND hh = '19';
次の図は結果を示しています。
ジョブを実行して公開します。
[実行] をクリックします。
ページの下部にある [実行結果] タブで結果を確認できます。例外が発生した場合、ページの下部にある [実行の問題] タブで例外を確認できます。
ジョブが期待どおりに実行されていることを確認します。次に、ページの右上隅にある [公開] をクリックしてジョブを公開します。
[公開] ダイアログボックスで、[備考] パラメータを設定し、[OK] をクリックします。
ジョブのスケジューリング
圧縮される前のファイルに関する情報をクエリします。
[開発] タブで、PaimonのシステムテーブルファイルをクエリするSQLジョブを作成します。これにより、圧縮される前のファイルに関する情報を取得できます。 SQLジョブの作成方法については、「SQLジョブの開発」をご参照ください。
SELECT file_path, record_count, file_size_in_bytes FROM paimon.test_paimon_db.test_append_tbl$files WHERE partition='[2024-06-24, 19]';
[開発] タブで、paimon_compactという名前の別のSQLジョブを作成します。作成したジョブのエディタに、圧縮操作のSQLステートメントを入力します。
SQLジョブの作成方法については、「SQLジョブの開発」をご参照ください。
CALL paimon.sys.compact ( table => 'test_paimon_db.test_append_tbl', partitions => 'dt=\"2024-06-24\",hh=\"19\"', order_strategy => 'zorder', order_by => 'category' );
ワークフローを作成します。
[EMR Serverless Spark] ページの左側のナビゲーションペインで、[オペレーションセンター] > [ワークフロー] を選択します。
[ワークフロー] タブで、[ワークフローの作成] をクリックします。
[ワークフローの作成] パネルで、[名前] パラメータを設定し、[次へ] をクリックします。この例では、[名前] パラメータは paimon_workflow_task に設定されています。
[その他の設定] セクションのパラメータは、ビジネス要件に基づいて設定できます。パラメータ設定の詳細については、「ワークフローの管理」をご参照ください。
表示されるキャンバスで、[ノードの追加] をクリックします。
[ノードの追加] パネルで、[ソースファイルパス] ドロップダウンリストからジョブ paimon_compact を選択し、[spark設定] パラメータを設定して、[保存] をクリックします。
パラメータ
説明
[名前]
SQLセッションの名前。例:paimon_compute。
[spark設定]
Paimonに接続するには、次のコードを入力します。
spark.sql.extensions org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions spark.sql.catalog.paimon org.apache.paimon.spark.SparkCatalog spark.sql.catalog.paimon.metastore dlf spark.sql.catalog.paimon.warehouse <warehouse> spark.sql.catalog.paimon.dlf.catalog.id <dlf.catalog.id>
ビジネス要件に基づいて、次のパラメータを設定します。
<warehouse>
:データウェアハウスディレクトリ。<dlf.catalog.id>
:DLFデータカタログのID。
キャンバスの右上隅にある [ワークフローの公開] をクリックします。 [公開] ダイアログボックスで、[OK] をクリックします。
ワークフローを実行します。
[ワークフロー] タブで、ワークフロー paimon_workflow_task を見つけ、[名前] 列の名前をクリックします。
[ワークフローの実行] タブの右上隅にある [手動実行] をクリックします。
[実行] メッセージで、[OK] をクリックします。
圧縮効果を確認します。
ワークフローが正常に実行された後、最初のSQLジョブを再度実行して、圧縮操作前後のファイル数、レコード数、ファイルサイズを比較します。
SELECT file_path, record_count, file_size_in_bytes FROM paimon.test_paimon_db.test_append_tbl$files WHERE partition='[2024-06-24, 19]';