このトピックでは、Fluid を使用してデータアクセスを高速化する方法と、JindoRuntime を使用して Object Storage Service (OSS) に格納されているデータへのアクセスを高速化する方法について説明します。 このトピックを参照して、データ集約型アプリケーションのパフォーマンスを向上させることができます。
前提条件
ack-spark-operator コンポーネントがインストールされていること。 詳細については、「手順 1: ack-spark-operator コンポーネントをインストールする」をご参照ください。
説明このトピックの Spark ジョブ構成では、
spark.jobNamespaces=["spark"]設定が指定されています。 別の名前空間を使用する場合は、Spark ジョブ構成のnamespaceパラメーターを変更する必要があります。クラウドネイティブ AI スイートがインストールされ、ack-fluid コンポーネントがデプロイされていること。 詳細については、「クラウドネイティブ AI スイートをインストールする」をご参照ください。
テストデータが準備され、OSS バケットにアップロードされていること。 詳細については、「テストデータを準備して OSS バケットにアップロードする」をご参照ください。
はじめに - Fluid
Fluid は、分散データセット向けのオープンソースの Kubernetes ネイティブ オーケストレーターおよびアクセラレーターです。 Fluid は、ビッグデータ アプリケーションや AI アプリケーションなど、クラウドネイティブ シナリオのデータ集約型アプリケーション向けに開発されています。 Fluid の主な機能:
Fluid は、データセットの抽象化をネイティブでサポートしています。 この機能は、データ集約型アプリケーションの基本的なサポートを提供し、効率的なデータアクセスを可能にし、複数の側面でデータ管理の費用対効果を向上させます。
Fluid は、サードパーティのストレージ サービスとの統合のための統一インターフェイスを備えた拡張可能なデータエンジン プラグインを提供します。さまざまなランタイムがサポートされています。
Fluid はデータ操作を自動化し、自動 O&M システムと統合するための複数のモードをサポートします。
Fluid は、データキャッシュ技術と弾力的なスケーリングおよびデータアフィニティスケジューリングを組み合わせることで、データアクセスを高速化します。
Fluid はランタイム プラットフォームに依存せず、Kubernetes クラスター、Container Service for Kubernetes (ACK) Edge クラスター、および ACK Serverless Kubernetes クラスターをサポートしています。 Fluid は、マルチクラスター シナリオやハイブリッドクラウド シナリオにも適しています。
Fluid の詳細については、「エラスティックデータセット」をご参照ください。
手順 1: Fluid 専用のノードプールを作成する
ACK クラスターに Fluid 専用の fluid という名前のノードプールを作成します。 このノードプールは、Fluid に JindoRuntime ワーカー ポッドをデプロイするために使用されます。 この例で作成されたノードプールには、ネットワーク拡張型ビッグデータ インスタンスファミリーに属する ecs.d1ne.4xlarge インスタンスにデプロイされた 3 つのノードが含まれています。 fluid-cloudnative.github.io/node="true" ラベルと fluid-cloudnative.github.io/node="true":NoSchedule taint が各ノードに追加されます。 各ノードには、8 つの 5,905 GB 高スループット ローカル SATA HDD が搭載されており、フォーマットされて /mnt/disk1、/mnt/disk2、...、/mnt/disk8 ディレクトリにアタッチされています。 ノードプールの作成方法の詳細については、「ノードプールを作成および管理する」をご参照ください。 ノードプールのインスタンスタイプの選択方法の詳細については、「Fluid のキャッシュ最適化ポリシーのベストプラクティス」をご参照ください。
手順 2: データセットを作成する
fluid-oss-secret.yamlという名前の Secret YAML ファイルを作成し、OSS バケットにアクセスするために使用する認証情報を格納します。<ACCESS_KEY_ID>と<ACCESS_KEY_SECRET>を Alibaba Cloud アカウントの AccessKey ペアに置き換えます。apiVersion: v1 kind: Secret metadata: name: fluid-oss-secret namespace: spark stringData: OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID> OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>
次のコマンドを実行して Secret を作成します。
kubectl create -f fluid-oss-secret.yaml予期される出力:
secret/fluid-oss-secret created
spark-fluid-dataset.yamlという名前の YAML ファイルを作成し、次の内容をファイルにコピーします。apiVersion: data.fluid.io/v1alpha1 kind: Dataset metadata: name: spark namespace: spark spec: mounts: - name: spark # データアクセスを高速化したい OSS パス。 <OSS_BUCKET> を OSS バケットの名前に置き換えます。 mountPoint: oss://<OSS_BUCKET>/ path: / options: # OSS バケットのエンドポイント。 <OSS_ENDPOINT> を OSS バケットのエンドポイントに置き換えます。 # たとえば、中国 (北京) リージョンの OSS バケットの内部エンドポイントは oss-cn-beijing-internal.aliyuncs.com です。 fs.oss.endpoint: <OSS_ENDPOINT> encryptOptions: - name: fs.oss.accessKeyId valueFrom: secretKeyRef: name: fluid-oss-secret key: OSS_ACCESS_KEY_ID - name: fs.oss.accessKeySecret valueFrom: secretKeyRef: name: fluid-oss-secret key: OSS_ACCESS_KEY_SECRET # 次のアフィニティルールに一致するノードにデータがキャッシュされます。 nodeAffinity: required: nodeSelectorTerms: - matchExpressions: - key: fluid-cloudnative.github.io/node operator: In values: - "true" # 特定のノード taint の許容。 tolerations: - key: fluid-cloudnative.github.io/node operator: Equal value: "true" effect: NoSchedule次のリストは、上記のコードブロックのパラメーターについて説明しています。
mountPoint: データアクセスを高速化したい OSS パス。fs.oss.endpoint: OSS バケットのエンドポイント。 たとえば、中国 (北京) リージョンの OSS バケットの内部エンドポイントはoss-cn-beijing-internal.aliyuncs.comです。encryptOptions:fluid-oss-secretSecret のOSS_ACCESS_KEY_IDパラメーターとOSS_ACCESS_KEY_SECRETパラメーターから OSS バケットにアクセスするために使用される認証情報を取得します。
次のコマンドを実行してデータセットを作成します。
kubectl create -f spark-fluid-dataset.yaml予期される出力:
dataset.data.fluid.io/spark created次のコマンドを実行してデータセットのステータスをクエリします。
kubectl get -n spark dataset spark -o wide予期される出力:
NAME UFS TOTAL SIZE CACHED CACHE CAPACITY CACHED PERCENTAGE PHASE HCFS URL TOTAL FILES CACHE HIT RATIO AGE spark NotBound 58m出力は、データセットが
NotBound状態であることを示しています。
手順 3: JindoRuntime を作成する
spark-fluid-jindoruntime.yamlという名前のファイルを作成し、次の内容をファイルにコピーします。 このファイルは、JindoRuntime を作成するために使用されます。apiVersion: data.fluid.io/v1alpha1 kind: JindoRuntime metadata: # 名前は、作成したデータセットの名前と同じである必要があります。 name: spark namespace: spark spec: # ワーカー ポッドの数。 replicas: 3 tieredstore: levels: # キャッシュタイプは HDD です。 - mediumtype: HDD # データセットのタイプ。 volumeType: hostPath # ノードによって提供されるディスクの数に基づいて値を設定します。 path: /mnt/disk1,/mnt/disk2,/mnt/disk3,/mnt/disk4,/mnt/disk5,/mnt/disk6,/mnt/disk7,/mnt/disk8 # 各ワーカー ポッドのキャッシュ容量。 quotaList: 5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi high: "0.99" low: "0.95" worker: resources: requests: cpu: 14 memory: 56Gi limits: cpu: 14 memory: 56Gi次のリストは、上記のコードブロックのパラメーターについて説明しています。
replicas: JindoFS クラスター内のワーカー ポッドの数。mediumtype: キャッシュのタイプ。path: ストレージパス。quota: キャッシュシステムの最大容量。high: ストレージ容量の上限。low: ストレージ容量の下限。
次のコマンドを実行して JindoRuntime を作成します。
kubectl create -f spark-fluid-jindoruntime.yaml予期される出力:
jindoruntime.data.fluid.io/spark created次のコマンドを実行して JindoRuntime のステータスをクエリします。
kubectl get -n spark jindoruntime spark予期される出力:
NAME MASTER PHASE WORKER PHASE FUSE PHASE AGE spark Ready Ready Ready 2m28s出力の
FUSE PHASE列にReadyと表示されています。 これは、JindoRuntime がデプロイされていることを示しています。次のコマンドを実行してデータセットのステータスをクエリします。
kubectl get -n spark dataset spark -o wide予期される出力:
NAME UFS TOTAL SIZE CACHED CACHE CAPACITY CACHED PERCENTAGE PHASE HCFS URL TOTAL FILES CACHE HIT RATIO AGE spark [Calculating] 0.00B 128.91TiB Bound spark-jindofs-master-0.spark:19434 [Calculating] 2m5出力は、データセットが
Bound状態であることを示しています。 これは、データセットがデプロイされていることを示しています。
手順 4: (オプション)データをプリフェッチする
初回のクエリはキャッシュにヒットしません。 初回のクエリを高速化するために、Fluid はデータをプリフェッチするために使用できる DataLoad リソースを提供します。 データプリフェッチは、データをキャッシュに事前にロードします。 これにより、データアクセスが高速化され、データ処理効率とシステムパフォーマンスが向上します。
spark-fluid-dataload.yamlという名前のファイルを作成し、次の内容をファイルにコピーします。 このファイルは、DataLoad を作成するために使用されます。apiVersion: data.fluid.io/v1alpha1 kind: DataLoad metadata: name: spark namespace: spark spec: dataset: name: spark namespace: spark loadMetadata: true次のコマンドを実行して DataLoad を作成します。
kubectl create -f spark-fluid-dataload.yaml予期される出力:
dataload.data.fluid.io/spark created次のコマンドを実行してデータプリフェッチの進捗状況をクエリします。
kubectl get -n spark dataload spark -w予期される出力:
NAME DATASET PHASE AGE DURATION spark spark Executing 20s Unfinished spark spark Complete 9m31s 8m37s出力の DURATION 列に
8m37sと表示されています。これは、データプリフェッチが 8 分 37 秒で完了したことを示しています。次のコマンドを実行してデータセットのステータスをクエリします。
kubectl get -n spark dataset spark -o wide予期される出力:
NAME UFS TOTAL SIZE CACHED CACHE CAPACITY CACHED PERCENTAGE PHASE HCFS URL TOTAL FILES CACHE HIT RATIO AGE spark 0.00B 326.85GiB 128.91TiB 0.0% Bound spark-jindofs-master-0.spark:19434 [Calculating] 19m出力の CACHED 列に
326.85GiBと表示されています。これは、データがキャッシュにプリロードされていることを示しています。 前回のクエリの出力には0.00Bと表示されていました。
手順 5: Spark ジョブを実行する
方法 1: 移植可能なオペレーティングシステムインタフェース (POSIX) API を使用する
spark-pagerank-fluid-posix.yamlという名前のファイルを作成し、次の内容をファイルにコピーします。 このファイルは、SparkApplication を作成するために使用されます。apiVersion: sparkoperator.k8s.io/v1beta2 kind: SparkApplication metadata: name: spark-pagerank-fluid-posix namespace: spark spec: type: Scala mode: cluster image: spark:3.5.4 mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar mainClass: org.apache.spark.examples.SparkPageRank arguments: # file:// 形式を使用してローカルファイルにアクセスします。 - file:///mnt/fluid/data/pagerank_dataset.txt - "10" sparkVersion: 3.5.4 driver: cores: 1 coreLimit: 1200m memory: 512m volumeMounts: # データセットで使用される永続ボリューム要求 (PVC) を /mnt/fluid パスにマウントします。 - name: spark mountPath: /mnt/fluid serviceAccount: spark-operator-spark executor: instances: 2 cores: 1 coreLimit: "1" memory: 4g volumeMounts: # データセットで使用される PVC を /mnt/fluid パスにマウントします。 - name: spark mountPath: /mnt/fluid volumes: # Fluid によってデータセット用に作成された PVC を指定します。 PVC 名はデータセット名と同じです。 - name: spark persistentVolumeClaim: claimName: spark restartPolicy: type: Never説明上記のサンプルコードブロックでは、Spark コミュニティが提供するイメージを使用しています。 ネットワークの問題によりイメージをプルできない場合は、イメージをイメージリポジトリに同期することをお勧めします。 カスタムイメージを作成してイメージリポジトリにプッシュすることもできます。
次のコマンドを実行してジョブを送信します。
kubectl create -f spark-pagerank-fluid-posix.yaml予期される出力:
sparkapplication.sparkoperator.k8s.io/spark-pagerank-fluid-posix created次のコマンドを実行して Spark ジョブのステータスを表示します。
kubectl get -n spark sparkapplication spark-pagerank-fluid-posix -w予期される出力:
NAME STATUS ATTEMPTS START FINISH AGE spark-pagerank-fluid-posix RUNNING 1 2025-01-16T11:06:15Z <no value> 87s spark-pagerank-fluid-posix RUNNING 1 2025-01-16T11:06:15Z <no value> 102s spark-pagerank-fluid-posix RUNNING 1 2025-01-16T11:06:15Z <no value> 102s spark-pagerank-fluid-posix SUCCEEDING 1 2025-01-16T11:06:15Z 2025-01-16T11:07:59Z 104s spark-pagerank-fluid-posix COMPLETED 1 2025-01-16T11:06:15Z 2025-01-16T11:07:59Z 104s出力は、ジョブが完了したことを示しています。
方法 2: Hadoop Compatible File System (HCFS) API を使用する
次のコマンドを実行して、データセットで使用される HCFS の URL をクエリします。
kubectl get -n spark dataset spark -o wide予期される出力:
NAME UFS TOTAL SIZE CACHED CACHE CAPACITY CACHED PERCENTAGE PHASE HCFS URL TOTAL FILES CACHE HIT RATIO AGE spark 0.00B 326.85GiB 128.91TiB 0.0% Bound spark-jindofs-master-0.spark:19434 [Calculating] 30m出力は、データセットの HCFS URL が
spark-jindofs-master-0.spark:19434であることを示しています。 Spark ジョブを構成する場合は、fs.jindofsx.namespace.rpc.addressパラメーターを HCFS URL に設定する必要があります。spark-pagerank-fluid-hcfs.yamlという名前のファイルを作成し、次の内容をファイルにコピーします。 このファイルは、SparkApplication を作成するために使用されます。apiVersion: sparkoperator.k8s.io/v1beta2 kind: SparkApplication metadata: name: spark-pagerank-fluid-hcfs namespace: spark spec: type: Scala mode: cluster # <SPARK_IMAGE> を使用する Spark イメージに置き換えます。 イメージには JindoSDK 依存関係が含まれている必要があります。 image: <SPARK_IMAGE> mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar mainClass: org.apache.spark.examples.SparkPageRank arguments: # 次の方法のいずれかを選択します。 <OSS_BUCKET> を OSS バケットの名前に置き換えます。 # 方法 1: oss:// 形式を使用して OSS バケットにアクセスします。 - oss://<OSS_BUCKET>/data/pagerank_dataset.txt # 方法 2: s3:// 形式を使用して OSS バケットにアクセスします。 # - s3://<OSS_BUCKET>/data/pagerank_dataset.txt # 方法 3: s3a:// 形式を使用して OSS バケットにアクセスします。 # - s3a://<OSS_BUCKET>/data/pagerank_dataset.txt # 反復回数。 - "10" sparkVersion: 3.5.4 hadoopConf: #=================== # OSS アクセス構成。 #=================== # oss:// 形式を使用して OSS バケットにアクセスできます。 fs.oss.impl: com.aliyun.jindodata.oss.JindoOssFileSystem # OSS バケットのエンドポイント。 <OSS_BUCKET> を OSS バケットの名前に置き換えます。 # たとえば、中国 (北京) リージョンの OSS バケットの内部エンドポイントは oss-cn-beijing-internal.aliyuncs.com です。 fs.oss.endpoint: <OSS_ENDPOINT> # 環境変数から OSS バケットの認証情報を取得します。 fs.oss.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider # s3:// 形式を使用して OSS バケットにアクセスできます。 fs.s3.impl: com.aliyun.jindodata.s3.JindoS3FileSystem # OSS バケットのエンドポイント。 <OSS_BUCKET> を OSS バケットの名前に置き換えます。 # たとえば、中国 (北京) リージョンの OSS バケットの内部エンドポイントは oss-cn-beijing-internal.aliyuncs.com です。 fs.s3.endpoint: <OSS_ENDPOINT> # 環境変数から OSS バケットの認証情報を取得します。 fs.s3.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider # s3a:// 形式を使用して OSS バケットにアクセスできます。 fs.s3a.impl: com.aliyun.jindodata.s3.JindoS3FileSystem # OSS バケットのエンドポイント。 <OSS_BUCKET> を OSS バケットの名前に置き換えます。 # たとえば、中国 (北京) リージョンの OSS バケットの内部エンドポイントは oss-cn-beijing-internal.aliyuncs.com です。 fs.s3a.endpoint: <OSS_ENDPOINT> # 環境変数から OSS バケットの認証情報を取得します。 fs.s3a.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider #=================== # JindoFS 構成。 #=================== fs.xengine: jindofsx # データセットの HCFS URL。 fs.jindofsx.namespace.rpc.address: spark-jindofs-master-0.spark:19434 fs.jindofsx.data.cache.enable: "true" driver: cores: 1 coreLimit: 1200m memory: 512m envFrom: - secretRef: name: spark-oss-secret serviceAccount: spark-operator-spark executor: instances: 2 cores: 2 coreLimit: "2" memory: 8g envFrom: - secretRef: name: spark-oss-secret restartPolicy: type: Never説明上記のコードブロックで使用される Spark イメージには、JindoSDK 依存関係が含まれている必要があります。 次の Dockerfile テンプレートを使用してカスタムイメージを作成し、イメージリポジトリにプッシュできます。
ARG SPARK_IMAGE=spark:3.5.4 FROM ${SPARK_IMAGE} # JindoSDK サポートの依存関係を追加します ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-core/6.4.0/jindo-core-6.4.0.jar ${SPARK_HOME}/jars ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-sdk/6.4.0/jindo-sdk-6.4.0.jar ${SPARK_HOME}/jars次のコマンドを実行して Spark ジョブを送信します。
kubectl create -f spark-pagerank-fluid-hcfs.yaml予期される出力:
sparkapplication.sparkoperator.k8s.io/spark-pagerank-fluid-hcfs create次のコマンドを実行して Spark ジョブのステータスをクエリします。
kubectl get -n spark sparkapplication spark-pagerank-fluid-hcfs -w予期される出力:
NAME STATUS ATTEMPTS START FINISH AGE spark-pagerank-fluid-hcfs RUNNING 1 2025-01-16T11:21:16Z <no value> 9s spark-pagerank-fluid-hcfs RUNNING 1 2025-01-16T11:21:16Z <no value> 15s spark-pagerank-fluid-hcfs RUNNING 1 2025-01-16T11:21:16Z <no value> 77s spark-pagerank-fluid-hcfs RUNNING 1 2025-01-16T11:21:16Z <no value> 77s spark-pagerank-fluid-hcfs SUCCEEDING 1 2025-01-16T11:21:16Z 2025-01-16T11:22:34Z 78s spark-pagerank-fluid-hcfs COMPLETED 1 2025-01-16T11:21:16Z 2025-01-16T11:22:34Z 78s
手順 6: (オプション)環境をクリアする
上記の手順を完了した後、次のコマンドを実行して不要になったリソースを削除できます。
kubectl delete -f spark-pagerank-fluid-posix.yaml
kubectl delete -f spark-pagerank-fluid-hcfs.yaml
kubectl delete -f spark-fluid-dataload.yaml
kubectl delete -f spark-fluid-jindoruntime.yaml
kubectl delete -f spark-fluid-dataset.yaml
kubectl delete -f fluid-oss-secret.yaml