
Container Service for Kubernetes:Use Fluid to accelerate data access for Spark applications

Last Updated: Mar 12, 2025

This topic describes how to use Fluid to accelerate data access and how to use JindoRuntime to accelerate access to data stored in Object Storage Service (OSS). You can refer to this topic to improve the performance of data-intensive applications.

Introduction to Fluid

Fluid is an open source, Kubernetes-native orchestrator and accelerator for distributed datasets. Fluid is developed for data-intensive applications in cloud-native scenarios, such as big data applications and AI applications. Key features of Fluid:

  • Fluid provides native support for dataset abstraction. This feature provides fundamental support for data-intensive applications, enables efficient data access, and improves the cost-effectiveness of data management in multiple aspects.

  • Fluid provides an extensible data engine plug-in with a unified interface for integration with third-party storage services. A variety of runtimes are supported.

  • Fluid automates data operations and supports multiple modes to integrate with automated O&M systems.

  • Fluid accelerates data access by combining the data caching technology with elastic scaling and data affinity-scheduling.

  • Fluid is independent of runtime platforms and supports Kubernetes clusters, Container Service for Kubernetes (ACK) Edge clusters, and ACK Serverless clusters. Fluid is also suitable for multi-cluster scenarios and hybrid cloud scenarios.

For more information about Fluid, see Elastic datasets.

Step 1: Create a dedicated node pool for Fluid

Create a dedicated node pool named fluid for Fluid in your ACK cluster. The node pool is used to deploy the JindoRuntime worker pods in Fluid. The node pool created in this example contains three nodes that are deployed on ecs.d1ne.4xlarge instances, which belong to the network-enhanced big data instance family. The fluid-cloudnative.github.io/node="true" label and the fluid-cloudnative.github.io/node="true":NoSchedule taint are added to each node. Each node is equipped with eight 5,905 GB high-throughput local SATA HDDs, which are formatted and attached to the following directories: /mnt/disk1, /mnt/disk2, ..., and /mnt/disk8. For more information about how to create a node pool, see Create and manage a node pool. For more information about how to select instance types for a node pool, see Best practices for the cache optimization policies of Fluid.
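The eight data directories reappear in Step 3 as comma-separated `path` and `quotaList` values in the JindoRuntime spec. As a small sketch (the constants below are assumptions that mirror this example's node pool), those values can be generated programmatically to avoid typos when the disk count changes:

```python
# Illustrative helper, not part of Fluid: build the comma-separated
# `path` and `quotaList` values used by the JindoRuntime in Step 3.
DISKS = 8
QUOTA = "5500Gi"  # cache quota per disk, chosen below the 5,905 GB raw disk size

path = ",".join(f"/mnt/disk{i}" for i in range(1, DISKS + 1))
quota_list = ",".join([QUOTA] * DISKS)

print(path)        # /mnt/disk1,/mnt/disk2,...,/mnt/disk8
print(quota_list)  # 5500Gi,5500Gi,...,5500Gi
```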

Step 2: Create a dataset

  1. Create a Secret YAML file named fluid-oss-secret.yaml to store the credentials used to access your OSS bucket.

    Replace <ACCESS_KEY_ID> and <ACCESS_KEY_SECRET> with an AccessKey pair of your Alibaba Cloud account.

    apiVersion: v1
    kind: Secret
    metadata:
      name: fluid-oss-secret
      namespace: spark
    stringData:
      OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID>
      OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>
  2. Run the following command to create a Secret:

    kubectl create -f fluid-oss-secret.yaml

    Expected output:

    secret/fluid-oss-secret created
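For reference, the `stringData` field in the manifest above accepts plaintext values; the Kubernetes API server base64-encodes them into the Secret's `data` field on creation. A minimal sketch of that encoding (the placeholder value is not a real credential):

```python
import base64

# Kubernetes stores Secret values base64-encoded under `data`; using
# `stringData` lets you supply the plaintext and have the API server
# encode it for you.
plaintext = "<ACCESS_KEY_ID>"  # placeholder, as in fluid-oss-secret.yaml
encoded = base64.b64encode(plaintext.encode()).decode()

# Decoding recovers the original value, e.g. when reading the Secret back.
print(encoded)
```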
  3. Create a YAML file named spark-fluid-dataset.yaml and copy the following content to the file:

    apiVersion: data.fluid.io/v1alpha1
    kind: Dataset
    metadata:
      name: spark
      namespace: spark
    spec:
      mounts:
      - name: spark
        # The OSS path for which you want to accelerate data access. Replace <OSS_BUCKET> with the name of your OSS bucket. 
        mountPoint: oss://<OSS_BUCKET>/
        path: /
        options:
          # The endpoint of the OSS bucket. Replace <OSS_ENDPOINT> with the endpoint of your OSS bucket. 
          # For example, the internal endpoint for OSS buckets in the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com. 
          fs.oss.endpoint: <OSS_ENDPOINT>
        encryptOptions:
        - name: fs.oss.accessKeyId
          valueFrom:
            secretKeyRef:
              name: fluid-oss-secret
              key: OSS_ACCESS_KEY_ID
        - name: fs.oss.accessKeySecret
          valueFrom:
            secretKeyRef:
              name: fluid-oss-secret
              key: OSS_ACCESS_KEY_SECRET
      # Data will be cached to nodes that match the following affinity rules. 
      nodeAffinity:
        required:
          nodeSelectorTerms:
          - matchExpressions:
            - key: fluid-cloudnative.github.io/node
              operator: In
              values:
              - "true"
      # Tolerations for specific node taints. 
      tolerations:
      - key: fluid-cloudnative.github.io/node
        operator: Equal
        value: "true"
        effect: NoSchedule

    The following list describes the parameters in the preceding code block:

    • mountPoint: the OSS path for which you want to accelerate data access.

    • fs.oss.endpoint: the endpoint of the OSS bucket. For example, the internal endpoint for OSS buckets in the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com.

    • encryptOptions: retrieves the credentials used to access the OSS bucket from the OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET parameters of the fluid-oss-secret Secret.
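The `encryptOptions` resolution can be sketched roughly as follows: each option takes its value from a named key of a named Secret in the same namespace. This is an illustration only (the dicts mirror spark-fluid-dataset.yaml, and the credential values are placeholders), not Fluid's actual implementation:

```python
# Placeholder Secret store keyed by (namespace, secret name).
secrets = {
    ("spark", "fluid-oss-secret"): {
        "OSS_ACCESS_KEY_ID": "<ACCESS_KEY_ID>",
        "OSS_ACCESS_KEY_SECRET": "<ACCESS_KEY_SECRET>",
    }
}

# Mirrors the encryptOptions list in spark-fluid-dataset.yaml.
encrypt_options = [
    {"name": "fs.oss.accessKeyId",
     "valueFrom": {"secretKeyRef": {"name": "fluid-oss-secret",
                                    "key": "OSS_ACCESS_KEY_ID"}}},
    {"name": "fs.oss.accessKeySecret",
     "valueFrom": {"secretKeyRef": {"name": "fluid-oss-secret",
                                    "key": "OSS_ACCESS_KEY_SECRET"}}},
]

def resolve(namespace, options):
    """Resolve each option to the value of the referenced Secret key."""
    resolved = {}
    for opt in options:
        ref = opt["valueFrom"]["secretKeyRef"]
        resolved[opt["name"]] = secrets[(namespace, ref["name"])][ref["key"]]
    return resolved

print(resolve("spark", encrypt_options))
```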

  4. Run the following command to create a dataset:

    kubectl create -f spark-fluid-dataset.yaml

    Expected output:

    dataset.data.fluid.io/spark created
  5. Run the following command to query the status of the dataset:

    kubectl get -n spark dataset spark -o wide

    Expected output:

    NAME    UFS TOTAL SIZE   CACHED   CACHE CAPACITY   CACHED PERCENTAGE   PHASE      HCFS URL   TOTAL FILES   CACHE HIT RATIO   AGE
    spark                                                                  NotBound                                              58m

    The output shows that the dataset is in the NotBound state.

Step 3: Create a JindoRuntime

  1. Create a file named spark-fluid-jindoruntime.yaml and copy the following content to the file. The file is used to create a JindoRuntime.

    apiVersion: data.fluid.io/v1alpha1
    kind: JindoRuntime
    metadata:
      # The name must be the same as the name of the dataset you created. 
      name: spark
      namespace: spark
    spec:
      # The number of worker pods. 
      replicas: 3
      tieredstore:
        levels:
        # The cache type is HDD. 
        - mediumtype: HDD
          # The volume type used for the cache directories. hostPath mounts local directories on the node. 
          volumeType: hostPath
          # Set the value based on the number of disks provided by the node. 
          path: /mnt/disk1,/mnt/disk2,/mnt/disk3,/mnt/disk4,/mnt/disk5,/mnt/disk6,/mnt/disk7,/mnt/disk8
          # The cache capacity of each worker pod. 
          quotaList: 5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi
          high: "0.99"
          low: "0.95"
      worker:
        resources:
          requests:
            cpu: 14
            memory: 56Gi
          limits:
            cpu: 14
            memory: 56Gi

    The following list describes the parameters in the preceding code block:

    • replicas: the number of worker pods in the JindoFS cluster.

    • mediumtype: the type of the cache medium, such as HDD.

    • path: the directories on the node in which the cached data is stored.

    • quotaList: the cache capacity of each directory specified in path, in the same order.

    • high: the upper watermark of the cache usage. When the usage exceeds this ratio, cached data is evicted.

    • low: the lower watermark of the cache usage. Eviction stops when the usage drops below this ratio.
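A quick back-of-the-envelope check of the tiered store configured above (3 worker replicas, 8 disks each, 5500Gi per disk) shows where the 128.91TiB CACHE CAPACITY reported later comes from, and what the high/low watermarks translate to in absolute terms. This is a sketch under the stated interpretation of high/low as eviction watermark ratios:

```python
# Total cache capacity across the JindoFS cluster.
replicas = 3
disks = 8
quota_gib = 5500

total_gib = replicas * disks * quota_gib  # 132,000 GiB
total_tib = total_gib / 1024              # ~128.91 TiB, matching the
                                          # CACHE CAPACITY column below

# high/low are watermark ratios: eviction starts once usage exceeds
# `high` and frees space until usage drops below `low`.
high, low = 0.99, 0.95
evict_start_gib = high * total_gib        # ~130,680 GiB
evict_stop_gib = low * total_gib          # ~125,400 GiB

print(round(total_tib, 2), evict_start_gib, evict_stop_gib)
```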

  2. Run the following command to create a JindoRuntime:

    kubectl create -f spark-fluid-jindoruntime.yaml

    Expected output:

    jindoruntime.data.fluid.io/spark created
  3. Run the following command to query the status of the JindoRuntime:

    kubectl get -n spark jindoruntime spark

    Expected output:

    NAME    MASTER PHASE   WORKER PHASE   FUSE PHASE   AGE
    spark   Ready          Ready          Ready        2m28s

    In the output, Ready is displayed in the FUSE PHASE column. This indicates that the JindoRuntime is deployed.

  4. Run the following command to query the status of the dataset:

    kubectl get -n spark dataset spark -o wide

    Expected output:

    NAME    UFS TOTAL SIZE   CACHED   CACHE CAPACITY   CACHED PERCENTAGE   PHASE   HCFS URL                             TOTAL FILES     CACHE HIT RATIO   AGE
    spark   [Calculating]    0.00B    128.91TiB                            Bound   spark-jindofs-master-0.spark:19434   [Calculating]                     2m5

    The output shows that the dataset is in the Bound state. This indicates that the dataset is deployed.

Step 4: (Optional) Prefetch data

First-time queries cannot hit the cache. To accelerate them, Fluid provides the DataLoad resource, which prefetches data into the cache before jobs run. This improves data access speed, data processing efficiency, and overall system performance.

  1. Create a file named spark-fluid-dataload.yaml and copy the following content to the file. The file is used to create a DataLoad.

    apiVersion: data.fluid.io/v1alpha1
    kind: DataLoad
    metadata:
      name: spark
      namespace: spark
    spec:
      dataset:
        name: spark
        namespace: spark
      loadMetadata: true
  2. Run the following command to create a DataLoad:

    kubectl create -f spark-fluid-dataload.yaml

    Expected output:

    dataload.data.fluid.io/spark created
  3. Run the following command to query the data prefetching progress:

    kubectl get -n spark dataload spark -w

    Expected output:

    NAME    DATASET   PHASE      AGE     DURATION
    spark   spark     Executing   20s   Unfinished
    spark   spark     Complete   9m31s   8m37s

    The DURATION column shows 8m37s, which indicates that data prefetching was completed in 8 minutes and 37 seconds.
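The durations above use kubectl's compact format. A small illustrative helper (not a kubectl feature) for converting such values into seconds, e.g. when scripting around prefetch jobs:

```python
import re

def parse_duration(s: str) -> int:
    """Convert a kubectl-style duration such as '8m37s' into seconds."""
    units = {"h": 3600, "m": 60, "s": 1}
    return sum(int(n) * units[u] for n, u in re.findall(r"(\d+)([hms])", s))

print(parse_duration("8m37s"))  # → 517 seconds
```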

  4. Run the following command to query the status of the dataset:

    kubectl get -n spark dataset spark -o wide

    Expected output:

    NAME    UFS TOTAL SIZE   CACHED      CACHE CAPACITY   CACHED PERCENTAGE   PHASE   HCFS URL                             TOTAL FILES     CACHE HIT RATIO   AGE
    spark   0.00B            326.85GiB   128.91TiB        0.0%                Bound   spark-jindofs-master-0.spark:19434   [Calculating]                     19m

    The CACHED column now shows 326.85GiB, compared with 0.00B in the previous query. This indicates that the data is preloaded to the cache.

Step 5: Run a Spark job

Method 1: Use Portable Operating System Interface (POSIX) APIs

  1. Create a file named spark-pagerank-fluid-posix.yaml and copy the following content to the file. The file is used to create a SparkApplication.

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pagerank-fluid-posix
      namespace: spark
    spec:
      type: Scala
      mode: cluster
      image: spark:3.5.4
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
      mainClass: org.apache.spark.examples.SparkPageRank
      arguments:
      # Access local files by using the file:// format. 
      - file:///mnt/fluid/data/pagerank_dataset.txt
      - "10"
      sparkVersion: 3.5.4
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        volumeMounts:
        # Mount the persistent volume claim (PVC) used by the dataset to the /mnt/fluid path. 
        - name: spark
          mountPath: /mnt/fluid
        serviceAccount: spark-operator-spark
      executor:
        instances: 2
        cores: 1
        coreLimit: "1"
        memory: 4g
        volumeMounts:
        # Mount the PVC used by the dataset to the /mnt/fluid path. 
        - name: spark
          mountPath: /mnt/fluid
      volumes:
      # Specify the PVC created by Fluid for the dataset. The PVC name is the same as the dataset name. 
      - name: spark
        persistentVolumeClaim:
          claimName: spark
      restartPolicy:
        type: Never
    Note

    The preceding sample code block uses an image provided by the Spark community. If you fail to pull the image due to network issues, we recommend that you synchronize the image to your image repository. You can also build a custom image and push the image to your image repository.

  2. Run the following command to submit a job:

    kubectl create -f spark-pagerank-fluid-posix.yaml

    Expected output:

    sparkapplication.sparkoperator.k8s.io/spark-pagerank-fluid-posix created
  3. Run the following command to view the status of the Spark job:

    kubectl get -n spark sparkapplication spark-pagerank-fluid-posix -w

    Expected output:

    NAME                         STATUS    ATTEMPTS   START                  FINISH       AGE
    spark-pagerank-fluid-posix   RUNNING   1          2025-01-16T11:06:15Z   <no value>   87s
    spark-pagerank-fluid-posix   RUNNING   1          2025-01-16T11:06:15Z   <no value>   102s
    spark-pagerank-fluid-posix   RUNNING   1          2025-01-16T11:06:15Z   <no value>   102s
    spark-pagerank-fluid-posix   SUCCEEDING   1          2025-01-16T11:06:15Z   2025-01-16T11:07:59Z   104s
    spark-pagerank-fluid-posix   COMPLETED    1          2025-01-16T11:06:15Z   2025-01-16T11:07:59Z   104s

    The output shows that the job is completed.
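The POSIX method works because the dataset's mountPoint oss://&lt;OSS_BUCKET&gt;/ is mapped to path / and exposed through FUSE under the PVC mount path /mnt/fluid, so an object key becomes an ordinary local file path. A sketch of that translation (the bucket name is a placeholder, as in the manifests above):

```python
# Sketch of the OSS-to-FUSE path translation the POSIX method relies on.
MOUNT_PATH = "/mnt/fluid"  # PVC mount path from the SparkApplication spec

def oss_to_fuse(uri, bucket="<OSS_BUCKET>", mount=MOUNT_PATH):
    """Map an oss:// URI inside the mounted bucket to its local file path."""
    prefix = f"oss://{bucket}/"
    assert uri.startswith(prefix), "URI must be inside the mounted bucket"
    return f"{mount}/{uri[len(prefix):]}"

print(oss_to_fuse("oss://<OSS_BUCKET>/data/pagerank_dataset.txt"))
# → /mnt/fluid/data/pagerank_dataset.txt
```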

Method 2: Use Hadoop Compatible File System (HCFS) APIs

  1. Run the following command to query the URL of the HCFS used by the dataset:

    kubectl get -n spark dataset spark -o wide

    Expected output:

    NAME    UFS TOTAL SIZE   CACHED      CACHE CAPACITY   CACHED PERCENTAGE   PHASE   HCFS URL                             TOTAL FILES     CACHE HIT RATIO   AGE
    spark   0.00B            326.85GiB   128.91TiB        0.0%                Bound   spark-jindofs-master-0.spark:19434   [Calculating]                     30m

    The output shows that the HCFS URL of the dataset is spark-jindofs-master-0.spark:19434. When you configure the Spark job, you must set the fs.jindofsx.namespace.rpc.address parameter to the HCFS URL.
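The JindoFS-related hadoopConf entries can be derived from this HCFS URL. A hypothetical helper (an illustration, not part of any SDK) that produces the three properties used in the manifest of the next step:

```python
def jindofs_conf(hcfs_url: str) -> dict:
    """Build the JindoFS hadoopConf entries from the dataset's HCFS URL."""
    return {
        "fs.xengine": "jindofsx",
        "fs.jindofsx.namespace.rpc.address": hcfs_url,
        "fs.jindofsx.data.cache.enable": "true",
    }

conf = jindofs_conf("spark-jindofs-master-0.spark:19434")
print(conf["fs.jindofsx.namespace.rpc.address"])
```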

  2. Create a file named spark-pagerank-fluid-hcfs.yaml and copy the following content to the file. The file is used to create a SparkApplication.

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pagerank-fluid-hcfs
      namespace: spark
    spec:
      type: Scala
      mode: cluster
      # Replace <SPARK_IMAGE> with the Spark image that you want to use. The image must include JindoSDK dependencies. 
      image: <SPARK_IMAGE>
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
      mainClass: org.apache.spark.examples.SparkPageRank
      arguments:
      # Select one of the following methods. Replace <OSS_BUCKET> with the name of your OSS bucket. 
      # Method 1: Access the OSS bucket by using the oss:// format. 
      - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
      # Method 2: Access the OSS bucket by using the s3:// format. 
      # - s3://<OSS_BUCKET>/data/pagerank_dataset.txt
      # Method 3: Access the OSS bucket by using the s3a:// format. 
      # - s3a://<OSS_BUCKET>/data/pagerank_dataset.txt
      # The number of iterations.
      - "10"
      sparkVersion: 3.5.4
      hadoopConf:
        #===================
        # OSS access configurations.
        #===================
        # You can access the OSS bucket by using the oss:// format. 
        fs.oss.impl: com.aliyun.jindodata.oss.JindoOssFileSystem
        # The endpoint of the OSS bucket. Replace <OSS_ENDPOINT> with the endpoint of your OSS bucket. 
        # For example, the internal endpoint for OSS buckets in the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com. 
        fs.oss.endpoint: <OSS_ENDPOINT>
        # Retrieve the credentials of your OSS bucket from environment variables. 
        fs.oss.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
    
        # You can access the OSS bucket by using the s3:// format. 
        fs.s3.impl: com.aliyun.jindodata.s3.JindoS3FileSystem
        # The endpoint of the OSS bucket. Replace <OSS_ENDPOINT> with the endpoint of your OSS bucket. 
        # For example, the internal endpoint for OSS buckets in the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com. 
        fs.s3.endpoint: <OSS_ENDPOINT>
        # Retrieve the credentials of your OSS bucket from environment variables. 
        fs.s3.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
    
        # You can access the OSS bucket by using the s3a:// format. 
        fs.s3a.impl: com.aliyun.jindodata.s3.JindoS3FileSystem
        # The endpoint of the OSS bucket. Replace <OSS_ENDPOINT> with the endpoint of your OSS bucket. 
        # For example, the internal endpoint for OSS buckets in the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com. 
        fs.s3a.endpoint: <OSS_ENDPOINT>
        # Retrieve the credentials of your OSS bucket from environment variables. 
        fs.s3a.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
    
        #===================
        # JindoFS configurations.
        #===================
        fs.xengine: jindofsx
        # The HCFS URL of the dataset. 
        fs.jindofsx.namespace.rpc.address: spark-jindofs-master-0.spark:19434
        fs.jindofsx.data.cache.enable: "true"
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        envFrom:
        - secretRef:
            name: spark-oss-secret
        serviceAccount: spark-operator-spark
      executor:
        instances: 2
        cores: 2
        coreLimit: "2"
        memory: 8g
        envFrom:
        - secretRef:
            name: spark-oss-secret
      restartPolicy:
        type: Never
    Note

    The Spark image used by the preceding code block must include JindoSDK dependencies. You can use the following Dockerfile template to build a custom image and push the image to your image repository:

    ARG SPARK_IMAGE=spark:3.5.4
    
    FROM ${SPARK_IMAGE}
    
    # Add dependency for JindoSDK support
    ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-core/6.4.0/jindo-core-6.4.0.jar ${SPARK_HOME}/jars
    ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-sdk/6.4.0/jindo-sdk-6.4.0.jar ${SPARK_HOME}/jars
  3. Run the following command to submit a Spark job:

    kubectl create -f spark-pagerank-fluid-hcfs.yaml

    Expected output:

    sparkapplication.sparkoperator.k8s.io/spark-pagerank-fluid-hcfs created
  4. Run the following command to query the status of the Spark job:

    kubectl get -n spark sparkapplication spark-pagerank-fluid-hcfs -w

    Expected output:

    NAME                        STATUS    ATTEMPTS   START                  FINISH       AGE
    spark-pagerank-fluid-hcfs   RUNNING   1          2025-01-16T11:21:16Z   <no value>   9s
    spark-pagerank-fluid-hcfs   RUNNING   1          2025-01-16T11:21:16Z   <no value>   15s
    spark-pagerank-fluid-hcfs   RUNNING   1          2025-01-16T11:21:16Z   <no value>   77s
    spark-pagerank-fluid-hcfs   RUNNING   1          2025-01-16T11:21:16Z   <no value>   77s
    spark-pagerank-fluid-hcfs   SUCCEEDING   1          2025-01-16T11:21:16Z   2025-01-16T11:22:34Z   78s
    spark-pagerank-fluid-hcfs   COMPLETED    1          2025-01-16T11:21:16Z   2025-01-16T11:22:34Z   78s

    The output shows that the job is completed.

Step 6: (Optional) Clear the environment

After you complete the preceding steps, you can run the following commands to delete the resources that are no longer required:

kubectl delete -f spark-pagerank-fluid-posix.yaml
kubectl delete -f spark-pagerank-fluid-hcfs.yaml
kubectl delete -f spark-fluid-dataload.yaml
kubectl delete -f spark-fluid-jindoruntime.yaml
kubectl delete -f spark-fluid-dataset.yaml
kubectl delete -f fluid-oss-secret.yaml