すべてのプロダクト
Search
ドキュメントセンター

Container Service for Kubernetes:Fluid を使用して Spark アプリケーションのデータアクセスを高速化する

最終更新日:Mar 13, 2025

このトピックでは、Fluid を使用してデータアクセスを高速化する方法と、JindoRuntime を使用して Object Storage Service (OSS) に格納されているデータへのアクセスを高速化する方法について説明します。 このトピックを参照して、データ集約型アプリケーションのパフォーマンスを向上させることができます。

前提条件

はじめに - Fluid

Fluid は、分散データセット向けのオープンソースの Kubernetes ネイティブ オーケストレーターおよびアクセラレーターです。 Fluid は、ビッグデータ アプリケーションや AI アプリケーションなど、クラウドネイティブ シナリオのデータ集約型アプリケーション向けに開発されています。 Fluid の主な機能:

  • Fluid は、データセットの抽象化をネイティブでサポートしています。 この機能は、データ集約型アプリケーションの基本的なサポートを提供し、効率的なデータアクセスを可能にし、複数の側面でデータ管理の費用対効果を向上させます。

  • Fluid は、サードパーティのストレージ サービスとの統合のための統一インターフェイスを備えた拡張可能なデータエンジン プラグインを提供します。さまざまなランタイムがサポートされています。

  • Fluid はデータ操作を自動化し、自動 O&M システムと統合するための複数のモードをサポートします。

  • Fluid は、データキャッシュ技術と弾力的なスケーリングおよびデータアフィニティスケジューリングを組み合わせることで、データアクセスを高速化します。

  • Fluid はランタイム プラットフォームに依存せず、Kubernetes クラスター、Container Service for Kubernetes (ACK) Edge クラスター、および ACK Serverless Kubernetes クラスターをサポートしています。 Fluid は、マルチクラスター シナリオやハイブリッドクラウド シナリオにも適しています。

Fluid の詳細については、「エラスティックデータセット」をご参照ください。

手順 1: Fluid 専用のノードプールを作成する

ACK クラスターに Fluid 専用の fluid という名前のノードプールを作成します。 このノードプールは、Fluid に JindoRuntime ワーカー ポッドをデプロイするために使用されます。 この例で作成されたノードプールには、ネットワーク拡張型ビッグデータ インスタンスファミリーに属する ecs.d1ne.4xlarge インスタンスにデプロイされた 3 つのノードが含まれています。 fluid-cloudnative.github.io/node="true" ラベルと fluid-cloudnative.github.io/node="true":NoSchedule taint が各ノードに追加されます。 各ノードには、8 つの 5,905 GB 高スループット ローカル SATA HDD が搭載されており、フォーマットされて /mnt/disk1/mnt/disk2、...、/mnt/disk8 ディレクトリにアタッチされています。 ノードプールの作成方法の詳細については、「ノードプールを作成および管理する」をご参照ください。 ノードプールのインスタンスタイプの選択方法の詳細については、「Fluid のキャッシュ最適化ポリシーのベストプラクティス」をご参照ください。

手順 2: データセットを作成する

  1. fluid-oss-secret.yaml という名前の Secret YAML ファイルを作成し、OSS バケットにアクセスするために使用する認証情報を格納します。

    <ACCESS_KEY_ID><ACCESS_KEY_SECRET> を Alibaba Cloud アカウントの AccessKey ペアに置き換えます。

    apiVersion: v1
    kind: Secret
    metadata:
      name: fluid-oss-secret
      namespace: spark
    stringData:
      OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID>
      OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>
  1. 次のコマンドを実行して Secret を作成します。

    kubectl create -f fluid-oss-secret.yaml

    予期される出力:

    secret/fluid-oss-secret created
  1. spark-fluid-dataset.yaml という名前の YAML ファイルを作成し、次の内容をファイルにコピーします。

    apiVersion: data.fluid.io/v1alpha1
    kind: Dataset
    metadata:
      name: spark
      namespace: spark
    spec:
      mounts:
      - name: spark
        # データアクセスを高速化したい OSS パス。 <OSS_BUCKET> を OSS バケットの名前に置き換えます。
        mountPoint: oss://<OSS_BUCKET>/
        path: /
        options:
          # OSS バケットのエンドポイント。 <OSS_ENDPOINT> を OSS バケットのエンドポイントに置き換えます。
          # たとえば、中国 (北京) リージョンの OSS バケットの内部エンドポイントは oss-cn-beijing-internal.aliyuncs.com です。
          fs.oss.endpoint: <OSS_ENDPOINT>
        encryptOptions:
        - name: fs.oss.accessKeyId
          valueFrom:
            secretKeyRef:
              name: fluid-oss-secret
              key: OSS_ACCESS_KEY_ID
        - name: fs.oss.accessKeySecret
          valueFrom:
            secretKeyRef:
              name: fluid-oss-secret
              key: OSS_ACCESS_KEY_SECRET
      # 次のアフィニティルールに一致するノードにデータがキャッシュされます。
      nodeAffinity:
        required:
          nodeSelectorTerms:
          - matchExpressions:
            - key: fluid-cloudnative.github.io/node
              operator: In
              values:
              - "true"
      # 特定のノード taint の許容。
      tolerations:
      - key: fluid-cloudnative.github.io/node
        operator: Equal
        value: "true"
        effect: NoSchedule

    次のリストは、上記のコードブロックのパラメーターについて説明しています。

    • mountPoint: データアクセスを高速化したい OSS パス。

    • fs.oss.endpoint: OSS バケットのエンドポイント。 たとえば、中国 (北京) リージョンの OSS バケットの内部エンドポイントは oss-cn-beijing-internal.aliyuncs.com です。

    • encryptOptions: fluid-oss-secret Secret の OSS_ACCESS_KEY_ID パラメーターと OSS_ACCESS_KEY_SECRET パラメーターから OSS バケットにアクセスするために使用される認証情報を取得します。

  2. 次のコマンドを実行してデータセットを作成します。

    kubectl create -f spark-fluid-dataset.yaml

    予期される出力:

    dataset.data.fluid.io/spark created
  3. 次のコマンドを実行してデータセットのステータスをクエリします。

    kubectl get -n spark dataset spark -o wide

    予期される出力:

    NAME    UFS TOTAL SIZE   CACHED   CACHE CAPACITY   CACHED PERCENTAGE   PHASE      HCFS URL   TOTAL FILES   CACHE HIT RATIO   AGE
    spark                                                                  NotBound                                              58m

    出力は、データセットが NotBound 状態であることを示しています。

手順 3: JindoRuntime を作成する

  1. spark-fluid-jindoruntime.yaml という名前のファイルを作成し、次の内容をファイルにコピーします。 このファイルは、JindoRuntime を作成するために使用されます。

    apiVersion: data.fluid.io/v1alpha1
    kind: JindoRuntime
    metadata:
      # 名前は、作成したデータセットの名前と同じである必要があります。
      name: spark
      namespace: spark
    spec:
      # ワーカー ポッドの数。
      replicas: 3
      tieredstore:
        levels:
        # キャッシュタイプは HDD です。
        - mediumtype: HDD
          # データセットのタイプ。
          volumeType: hostPath
          # ノードによって提供されるディスクの数に基づいて値を設定します。
          path: /mnt/disk1,/mnt/disk2,/mnt/disk3,/mnt/disk4,/mnt/disk5,/mnt/disk6,/mnt/disk7,/mnt/disk8
          # 各ワーカー ポッドのキャッシュ容量。
          quotaList: 5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi
          high: "0.99"
          low: "0.95"
      worker:
        resources:
          requests:
            cpu: 14
            memory: 56Gi
          limits:
            cpu: 14
            memory: 56Gi

    次のリストは、上記のコードブロックのパラメーターについて説明しています。

    • replicas: JindoFS クラスター内のワーカー ポッドの数。

    • mediumtype: キャッシュのタイプ。

    • path: ストレージパス。

    • quota: キャッシュシステムの最大容量。

    • high: ストレージ容量の上限。

    • low: ストレージ容量の下限。

  1. 次のコマンドを実行して JindoRuntime を作成します。

    kubectl create -f spark-fluid-jindoruntime.yaml

    予期される出力:

    jindoruntime.data.fluid.io/spark created
  2. 次のコマンドを実行して JindoRuntime のステータスをクエリします。

    kubectl get -n spark jindoruntime spark

    予期される出力:

    NAME    MASTER PHASE   WORKER PHASE   FUSE PHASE   AGE
    spark   Ready          Ready          Ready        2m28s

    出力の FUSE PHASE 列に Ready と表示されています。 これは、JindoRuntime がデプロイされていることを示しています。

  3. 次のコマンドを実行してデータセットのステータスをクエリします。

    kubectl get -n spark dataset spark -o wide

    予期される出力:

    NAME    UFS TOTAL SIZE   CACHED   CACHE CAPACITY   CACHED PERCENTAGE   PHASE   HCFS URL                             TOTAL FILES     CACHE HIT RATIO   AGE
    spark   [Calculating]    0.00B    128.91TiB                            Bound   spark-jindofs-master-0.spark:19434   [Calculating]                     2m5

    出力は、データセットが Bound 状態であることを示しています。 これは、データセットがデプロイされていることを示しています。

手順 4: (オプション)データをプリフェッチする

初回のクエリはキャッシュにヒットしません。 初回のクエリを高速化するために、Fluid はデータをプリフェッチするために使用できる DataLoad リソースを提供します。 データプリフェッチは、データをキャッシュに事前にロードします。 これにより、データアクセスが高速化され、データ処理効率とシステムパフォーマンスが向上します。

  1. spark-fluid-dataload.yaml という名前のファイルを作成し、次の内容をファイルにコピーします。 このファイルは、DataLoad を作成するために使用されます。

    apiVersion: data.fluid.io/v1alpha1
    kind: DataLoad
    metadata:
      name: spark
      namespace: spark
    spec:
      dataset:
        name: spark
        namespace: spark
      loadMetadata: true
  2. 次のコマンドを実行して DataLoad を作成します。

    kubectl create -f spark-fluid-dataload.yaml

    予期される出力:

    dataload.data.fluid.io/spark created
  3. 次のコマンドを実行してデータプリフェッチの進捗状況をクエリします。

    kubectl get -n spark dataload spark -w

    予期される出力:

    NAME    DATASET   PHASE      AGE     DURATION
    spark   spark     Executing   20s   Unfinished
    spark   spark     Complete   9m31s   8m37s

    出力の DURATION 列に 8m37s と表示されています。これは、データプリフェッチが 8 分 37 秒で完了したことを示しています。

  4. 次のコマンドを実行してデータセットのステータスをクエリします。

    kubectl get -n spark dataset spark -o wide

    予期される出力:

    NAME    UFS TOTAL SIZE   CACHED      CACHE CAPACITY   CACHED PERCENTAGE   PHASE   HCFS URL                             TOTAL FILES     CACHE HIT RATIO   AGE
    spark   0.00B            326.85GiB   128.91TiB        0.0%                Bound   spark-jindofs-master-0.spark:19434   [Calculating]                     19m

    出力の CACHED 列に 326.85GiB と表示されています。これは、データがキャッシュにプリロードされていることを示しています。 前回のクエリの出力には 0.00B と表示されていました。

手順 5: Spark ジョブを実行する

方法 1: 移植可能なオペレーティングシステムインタフェース (POSIX) API を使用する

  1. spark-pagerank-fluid-posix.yaml という名前のファイルを作成し、次の内容をファイルにコピーします。 このファイルは、SparkApplication を作成するために使用されます。

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pagerank-fluid-posix
      namespace: spark
    spec:
      type: Scala
      mode: cluster
      image: spark:3.5.4
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
      mainClass: org.apache.spark.examples.SparkPageRank
      arguments:
      # file:// 形式を使用してローカルファイルにアクセスします。
      - file:///mnt/fluid/data/pagerank_dataset.txt
      - "10"
      sparkVersion: 3.5.4
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        volumeMounts:
        # データセットで使用される永続ボリューム要求 (PVC) を /mnt/fluid パスにマウントします。
        - name: spark
          mountPath: /mnt/fluid
        serviceAccount: spark-operator-spark
      executor:
        instances: 2
        cores: 1
        coreLimit: "1"
        memory: 4g
        volumeMounts:
        # データセットで使用される PVC を /mnt/fluid パスにマウントします。
        - name: spark
          mountPath: /mnt/fluid
      volumes:
      # Fluid によってデータセット用に作成された PVC を指定します。 PVC 名はデータセット名と同じです。
      - name: spark
        persistentVolumeClaim:
          claimName: spark
      restartPolicy:
        type: Never
    説明

    上記のサンプルコードブロックでは、Spark コミュニティが提供するイメージを使用しています。 ネットワークの問題によりイメージをプルできない場合は、イメージをイメージリポジトリに同期することをお勧めします。 カスタムイメージを作成してイメージリポジトリにプッシュすることもできます。

  2. 次のコマンドを実行してジョブを送信します。

    kubectl create -f spark-pagerank-fluid-posix.yaml

    予期される出力:

    sparkapplication.sparkoperator.k8s.io/spark-pagerank-fluid-posix created
  3. 次のコマンドを実行して Spark ジョブのステータスを表示します。

    kubectl get -n spark sparkapplication spark-pagerank-fluid-posix -w

    予期される出力:

    NAME                         STATUS    ATTEMPTS   START                  FINISH       AGE
    spark-pagerank-fluid-posix   RUNNING   1          2025-01-16T11:06:15Z   <no value>   87s
    spark-pagerank-fluid-posix   RUNNING   1          2025-01-16T11:06:15Z   <no value>   102s
    spark-pagerank-fluid-posix   RUNNING   1          2025-01-16T11:06:15Z   <no value>   102s
    spark-pagerank-fluid-posix   SUCCEEDING   1          2025-01-16T11:06:15Z   2025-01-16T11:07:59Z   104s
    spark-pagerank-fluid-posix   COMPLETED    1          2025-01-16T11:06:15Z   2025-01-16T11:07:59Z   104s

    出力は、ジョブが完了したことを示しています。

方法 2: Hadoop Compatible File System (HCFS) API を使用する

  1. 次のコマンドを実行して、データセットで使用される HCFS の URL をクエリします。

    kubectl get -n spark dataset spark -o wide

    予期される出力:

    NAME    UFS TOTAL SIZE   CACHED      CACHE CAPACITY   CACHED PERCENTAGE   PHASE   HCFS URL                             TOTAL FILES     CACHE HIT RATIO   AGE
    spark   0.00B            326.85GiB   128.91TiB        0.0%                Bound   spark-jindofs-master-0.spark:19434   [Calculating]                     30m

    出力は、データセットの HCFS URL が spark-jindofs-master-0.spark:19434 であることを示しています。 Spark ジョブを構成する場合は、fs.jindofsx.namespace.rpc.address パラメーターを HCFS URL に設定する必要があります。

  2. spark-pagerank-fluid-hcfs.yaml という名前のファイルを作成し、次の内容をファイルにコピーします。 このファイルは、SparkApplication を作成するために使用されます。

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pagerank-fluid-hcfs
      namespace: spark
    spec:
      type: Scala
      mode: cluster
      # <SPARK_IMAGE> を使用する Spark イメージに置き換えます。 イメージには JindoSDK 依存関係が含まれている必要があります。
      image: <SPARK_IMAGE>
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
      mainClass: org.apache.spark.examples.SparkPageRank
      arguments:
      # 次の方法のいずれかを選択します。 <OSS_BUCKET> を OSS バケットの名前に置き換えます。
      # 方法 1: oss:// 形式を使用して OSS バケットにアクセスします。
      - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
      # 方法 2: s3:// 形式を使用して OSS バケットにアクセスします。
      # - s3://<OSS_BUCKET>/data/pagerank_dataset.txt
      # 方法 3: s3a:// 形式を使用して OSS バケットにアクセスします。
      # - s3a://<OSS_BUCKET>/data/pagerank_dataset.txt
      # 反復回数。
      - "10"
      sparkVersion: 3.5.4
      hadoopConf:
        #===================
        # OSS アクセス構成。
        #===================
        # oss:// 形式を使用して OSS バケットにアクセスできます。
        fs.oss.impl: com.aliyun.jindodata.oss.JindoOssFileSystem
        # OSS バケットのエンドポイント。 <OSS_BUCKET> を OSS バケットの名前に置き換えます。
        # たとえば、中国 (北京) リージョンの OSS バケットの内部エンドポイントは oss-cn-beijing-internal.aliyuncs.com です。
        fs.oss.endpoint: <OSS_ENDPOINT>
        # 環境変数から OSS バケットの認証情報を取得します。
        fs.oss.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
    
        # s3:// 形式を使用して OSS バケットにアクセスできます。
        fs.s3.impl: com.aliyun.jindodata.s3.JindoS3FileSystem
        # OSS バケットのエンドポイント。 <OSS_BUCKET> を OSS バケットの名前に置き換えます。
        # たとえば、中国 (北京) リージョンの OSS バケットの内部エンドポイントは oss-cn-beijing-internal.aliyuncs.com です。
        fs.s3.endpoint: <OSS_ENDPOINT>
        # 環境変数から OSS バケットの認証情報を取得します。
        fs.s3.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
    
        # s3a:// 形式を使用して OSS バケットにアクセスできます。
        fs.s3a.impl: com.aliyun.jindodata.s3.JindoS3FileSystem
        # OSS バケットのエンドポイント。 <OSS_BUCKET> を OSS バケットの名前に置き換えます。
        # たとえば、中国 (北京) リージョンの OSS バケットの内部エンドポイントは oss-cn-beijing-internal.aliyuncs.com です。
        fs.s3a.endpoint: <OSS_ENDPOINT>
        # 環境変数から OSS バケットの認証情報を取得します。
        fs.s3a.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
    
        #===================
        # JindoFS 構成。
        #===================
        fs.xengine: jindofsx
        # データセットの HCFS URL。
        fs.jindofsx.namespace.rpc.address: spark-jindofs-master-0.spark:19434
        fs.jindofsx.data.cache.enable: "true"
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        envFrom:
        - secretRef:
            name: spark-oss-secret
        serviceAccount: spark-operator-spark
      executor:
        instances: 2
        cores: 2
        coreLimit: "2"
        memory: 8g
        envFrom:
        - secretRef:
            name: spark-oss-secret
      restartPolicy:
        type: Never
    説明

    上記のコードブロックで使用される Spark イメージには、JindoSDK 依存関係が含まれている必要があります。 次の Dockerfile テンプレートを使用してカスタムイメージを作成し、イメージリポジトリにプッシュできます。

    ARG SPARK_IMAGE=spark:3.5.4
    
    FROM ${SPARK_IMAGE}
    
    # JindoSDK サポートの依存関係を追加します
    ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-core/6.4.0/jindo-core-6.4.0.jar ${SPARK_HOME}/jars
    ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-sdk/6.4.0/jindo-sdk-6.4.0.jar ${SPARK_HOME}/jars
  3. 次のコマンドを実行して Spark ジョブを送信します。

    kubectl create -f spark-pagerank-fluid-hcfs.yaml

    予期される出力:

    sparkapplication.sparkoperator.k8s.io/spark-pagerank-fluid-hcfs create
  4. 次のコマンドを実行して Spark ジョブのステータスをクエリします。

    kubectl get -n spark sparkapplication spark-pagerank-fluid-hcfs -w

    予期される出力:

    NAME                        STATUS    ATTEMPTS   START                  FINISH       AGE
    spark-pagerank-fluid-hcfs   RUNNING   1          2025-01-16T11:21:16Z   <no value>   9s
    spark-pagerank-fluid-hcfs   RUNNING   1          2025-01-16T11:21:16Z   <no value>   15s
    spark-pagerank-fluid-hcfs   RUNNING   1          2025-01-16T11:21:16Z   <no value>   77s
    spark-pagerank-fluid-hcfs   RUNNING   1          2025-01-16T11:21:16Z   <no value>   77s
    spark-pagerank-fluid-hcfs   SUCCEEDING   1          2025-01-16T11:21:16Z   2025-01-16T11:22:34Z   78s
    spark-pagerank-fluid-hcfs   COMPLETED    1          2025-01-16T11:21:16Z   2025-01-16T11:22:34Z   78s

手順 6: (オプション)環境をクリアする

上記の手順を完了した後、次のコマンドを実行して不要になったリソースを削除できます。

kubectl delete -f spark-pagerank-fluid-posix.yaml
kubectl delete -f spark-pagerank-fluid-hcfs.yaml
kubectl delete -f spark-fluid-dataload.yaml
kubectl delete -f spark-fluid-jindoruntime.yaml
kubectl delete -f spark-fluid-dataset.yaml
kubectl delete -f fluid-oss-secret.yaml