This topic describes how to use Fluid with JindoRuntime to accelerate access to data stored in Object Storage Service (OSS). You can refer to this topic to improve the performance of data-intensive applications.
Prerequisites
- The ack-spark-operator component is installed. For more information, see Step 1: Install the ack-spark-operator component.
  Note: The spark.jobNamespaces=["spark"] setting is specified in the Spark job configurations in this topic. If you want to use a different namespace, you must modify the namespace parameter in the Spark job configurations.
- The cloud-native AI suite is installed and the ack-fluid component is deployed. For more information, see Install the cloud-native AI suite.
- Test data is prepared and uploaded to an OSS bucket. For more information, see Prepare and upload test data to an OSS bucket.
Introduction to Fluid
Fluid is an open source, Kubernetes-native orchestrator and accelerator for distributed datasets. Fluid is developed for data-intensive applications in cloud-native scenarios, such as big data applications and AI applications. Key features of Fluid:
- Fluid provides native support for dataset abstraction. This feature provides fundamental support for data-intensive applications, enables efficient data access, and improves the cost-effectiveness of data management in multiple aspects.
- Fluid provides an extensible data engine plug-in with a unified interface for integration with third-party storage services. A variety of runtimes are supported.
- Fluid automates data operations and supports multiple modes to integrate with automated O&M systems.
- Fluid accelerates data access by combining the data caching technology with elastic scaling and data affinity scheduling.
- Fluid is independent of runtime platforms and supports Kubernetes clusters, Container Service for Kubernetes (ACK) Edge clusters, and ACK Serverless clusters. Fluid is also suitable for multi-cluster scenarios and hybrid cloud scenarios.
For more information about Fluid, see Elastic datasets.
Step 1: Create a dedicated node pool for Fluid
Create a dedicated node pool named fluid for Fluid in your ACK cluster. The node pool is used to deploy the JindoRuntime worker pods of Fluid. The node pool in this example contains three nodes of the ecs.d1ne.4xlarge instance type, which belongs to the network-enhanced big data instance family. The fluid-cloudnative.github.io/node="true" label and the fluid-cloudnative.github.io/node="true":NoSchedule taint are added to each node. Each node is equipped with eight 5,905 GB high-throughput local SATA HDDs, which are formatted and mounted to the following directories: /mnt/disk1, /mnt/disk2, ..., and /mnt/disk8. For more information about how to create a node pool, see Create and manage a node pool. For more information about how to select instance types for a node pool, see Best practices for the cache optimization policies of Fluid.
Step 2: Create a dataset
Create a Secret YAML file named fluid-oss-secret.yaml to store the credentials used to access your OSS bucket. Replace <ACCESS_KEY_ID> and <ACCESS_KEY_SECRET> with an AccessKey pair of your Alibaba Cloud account.

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: fluid-oss-secret
  namespace: spark
stringData:
  OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID>
  OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>
```
Run the following command to create a Secret:
```shell
kubectl create -f fluid-oss-secret.yaml
```

Expected output:

```
secret/fluid-oss-secret created
```
Create a YAML file named spark-fluid-dataset.yaml and copy the following content to the file:

```yaml
apiVersion: data.fluid.io/v1alpha1
kind: Dataset
metadata:
  name: spark
  namespace: spark
spec:
  mounts:
    - name: spark
      # The OSS path for which you want to accelerate data access. Replace <OSS_BUCKET> with the name of your OSS bucket.
      mountPoint: oss://<OSS_BUCKET>/
      path: /
      options:
        # The endpoint of the OSS bucket. Replace <OSS_ENDPOINT> with the endpoint of your OSS bucket.
        # For example, the internal endpoint for OSS buckets in the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com.
        fs.oss.endpoint: <OSS_ENDPOINT>
      encryptOptions:
        - name: fs.oss.accessKeyId
          valueFrom:
            secretKeyRef:
              name: fluid-oss-secret
              key: OSS_ACCESS_KEY_ID
        - name: fs.oss.accessKeySecret
          valueFrom:
            secretKeyRef:
              name: fluid-oss-secret
              key: OSS_ACCESS_KEY_SECRET
  # Data will be cached to nodes that match the following affinity rules.
  nodeAffinity:
    required:
      nodeSelectorTerms:
        - matchExpressions:
            - key: fluid-cloudnative.github.io/node
              operator: In
              values:
                - "true"
  # Tolerations for specific node taints.
  tolerations:
    - key: fluid-cloudnative.github.io/node
      operator: Equal
      value: "true"
      effect: NoSchedule
```

The following list describes the parameters in the preceding code block:
- mountPoint: the OSS path for which you want to accelerate data access.
- fs.oss.endpoint: the endpoint of the OSS bucket. For example, the internal endpoint for OSS buckets in the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com.
- encryptOptions: retrieves the credentials used to access the OSS bucket from the OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET keys of the fluid-oss-secret Secret.
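As a quick illustration of the endpoint naming pattern mentioned above, the following hypothetical Python helper derives an OSS endpoint from a region ID. The helper and its behavior are assumptions based on the convention shown in this topic (oss-cn-beijing-internal.aliyuncs.com), not an official API; always verify the actual endpoint in the OSS console.

```python
# Illustrative helper, not part of the configuration in this topic:
# build an OSS endpoint string from a region ID, following the naming
# pattern shown in the example above.
def oss_endpoint(region_id: str, internal: bool = True) -> str:
    """Return the internal or public OSS endpoint for a region."""
    suffix = "-internal" if internal else ""
    return f"oss-{region_id}{suffix}.aliyuncs.com"

print(oss_endpoint("cn-beijing"))  # oss-cn-beijing-internal.aliyuncs.com
```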
Run the following command to create a dataset:
```shell
kubectl create -f spark-fluid-dataset.yaml
```

Expected output:

```
dataset.data.fluid.io/spark created
```

Run the following command to query the status of the Dataset:

```shell
kubectl get -n spark dataset spark -o wide
```

Expected output:

```
NAME    UFS TOTAL SIZE   CACHED   CACHE CAPACITY   CACHED PERCENTAGE   PHASE      HCFS URL   TOTAL FILES   CACHE HIT RATIO   AGE
spark                                                                  NotBound                                              58m
```

The output shows that the dataset is in the NotBound state.
Step 3: Create a JindoRuntime
Create a file named spark-fluid-jindoruntime.yaml and copy the following content to the file. The file is used to create a JindoRuntime.

```yaml
apiVersion: data.fluid.io/v1alpha1
kind: JindoRuntime
metadata:
  # The name must be the same as the name of the dataset you created.
  name: spark
  namespace: spark
spec:
  # The number of worker pods.
  replicas: 3
  tieredstore:
    levels:
      # The cache medium type is HDD.
      - mediumtype: HDD
        # The volume type of the cache store.
        volumeType: hostPath
        # Set the value based on the number of disks provided by the node.
        path: /mnt/disk1,/mnt/disk2,/mnt/disk3,/mnt/disk4,/mnt/disk5,/mnt/disk6,/mnt/disk7,/mnt/disk8
        # The cache capacity of each worker pod.
        quotaList: 5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi,5500Gi
        high: "0.99"
        low: "0.95"
  worker:
    resources:
      requests:
        cpu: 14
        memory: 56Gi
      limits:
        cpu: 14
        memory: 56Gi
```

The following list describes the parameters in the preceding code block:
- replicas: the number of worker pods in the JindoFS cluster.
- mediumtype: the type of the cache medium.
- path: the storage paths of the cache.
- quotaList: the maximum cache capacity of each storage path.
- high: the upper limit of the storage capacity.
- low: the lower limit of the storage capacity.
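These settings determine the aggregate cache capacity of the runtime. A quick back-of-the-envelope check in Python, using the values from the YAML above (3 worker pods, 8 disks per node, 5500Gi per disk):

```python
# Back-of-the-envelope check of the aggregate cache capacity implied by
# the JindoRuntime settings above.
replicas = 3        # spec.replicas
disks_per_node = 8  # number of entries in path/quotaList
quota_gib = 5500    # per-disk quota from quotaList, in GiB

total_gib = replicas * disks_per_node * quota_gib
total_tib = total_gib / 1024
print(f"{total_tib:.2f} TiB")  # matches the 128.91TiB CACHE CAPACITY reported by kubectl
```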
Run the following command to create a JindoRuntime:
```shell
kubectl create -f spark-fluid-jindoruntime.yaml
```

Expected output:

```
jindoruntime.data.fluid.io/spark created
```

Run the following command to query the status of the JindoRuntime:

```shell
kubectl get -n spark jindoruntime spark
```

Expected output:

```
NAME    MASTER PHASE   WORKER PHASE   FUSE PHASE   AGE
spark   Ready          Ready          Ready        2m28s
```

In the output, Ready is displayed in the FUSE PHASE column. This indicates that the JindoRuntime is deployed.

Run the following command to query the status of the dataset:

```shell
kubectl get -n spark dataset spark -o wide
```

Expected output:

```
NAME    UFS TOTAL SIZE   CACHED   CACHE CAPACITY   CACHED PERCENTAGE   PHASE   HCFS URL                             TOTAL FILES     CACHE HIT RATIO   AGE
spark   [Calculating]    0.00B    128.91TiB                            Bound   spark-jindofs-master-0.spark:19434   [Calculating]                     2m5
```

The output shows that the dataset is in the Bound state. This indicates that the dataset is deployed.
Step 4: (Optional) Prefetch data
First-time queries cannot hit the cache. To accelerate first-time queries, Fluid provides the DataLoad resource that you can use to prefetch data. Data prefetching preloads data to the cache. This accelerates data access and improves data processing efficiency and system performance.
Create a file named spark-fluid-dataload.yaml and copy the following content to the file. The file is used to create a DataLoad.

```yaml
apiVersion: data.fluid.io/v1alpha1
kind: DataLoad
metadata:
  name: spark
  namespace: spark
spec:
  dataset:
    name: spark
    namespace: spark
  loadMetadata: true
```

Run the following command to create a DataLoad:
```shell
kubectl create -f spark-fluid-dataload.yaml
```

Expected output:

```
dataload.data.fluid.io/spark created
```

Run the following command to query the data prefetching progress:

```shell
kubectl get -n spark dataload spark -w
```

Expected output:

```
NAME    DATASET   PHASE       AGE     DURATION
spark   spark     Executing   20s     Unfinished
spark   spark     Complete    9m31s   8m37s
```

8m37s is displayed in the DURATION column of the output, which indicates that data prefetching is completed in 8 minutes and 37 seconds.

Run the following command to query the status of the dataset:

```shell
kubectl get -n spark dataset spark -o wide
```

Expected output:

```
NAME    UFS TOTAL SIZE   CACHED      CACHE CAPACITY   CACHED PERCENTAGE   PHASE   HCFS URL                             TOTAL FILES     CACHE HIT RATIO   AGE
spark   0.00B            326.85GiB   128.91TiB        0.0%                Bound   spark-jindofs-master-0.spark:19434   [Calculating]                     19m
```

326.85GiB is displayed in the CACHED column of the output, which indicates that data is preloaded to the cache. In comparison, 0.00B was displayed in the CACHED column of the previous query.
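Based on the numbers reported above (326.85 GiB prefetched in 8 minutes and 37 seconds), you can estimate the effective prefetch throughput. A rough Python calculation, purely as a sanity check:

```python
# Rough prefetch throughput estimate, using the figures from the
# kubectl output above: 326.85 GiB loaded in 8m37s.
cached_gib = 326.85
duration_s = 8 * 60 + 37  # 8m37s = 517 seconds

rate_gib_per_s = cached_gib / duration_s
print(f"{rate_gib_per_s:.2f} GiB/s")  # roughly 0.63 GiB/s across the 3 worker pods
```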
Step 5: Run a Spark job
Method 1: Use Portable Operating System Interface (POSIX) APIs
Create a file named spark-pagerank-fluid-posix.yaml and copy the following content to the file. The file is used to create a SparkApplication.

```yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank-fluid-posix
  namespace: spark
spec:
  type: Scala
  mode: cluster
  image: spark:3.5.4
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
    # Access local files by using the file:// format.
    - file:///mnt/fluid/data/pagerank_dataset.txt
    - "10"
  sparkVersion: 3.5.4
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    volumeMounts:
      # Mount the persistent volume claim (PVC) used by the dataset to the /mnt/fluid path.
      - name: spark
        mountPath: /mnt/fluid
    serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 1
    coreLimit: "1"
    memory: 4g
    volumeMounts:
      # Mount the PVC used by the dataset to the /mnt/fluid path.
      - name: spark
        mountPath: /mnt/fluid
  volumes:
    # Specify the PVC created by Fluid for the dataset. The PVC name is the same as the dataset name.
    - name: spark
      persistentVolumeClaim:
        claimName: spark
  restartPolicy:
    type: Never
```

Note: The preceding sample code block uses an image provided by the Spark community. If you fail to pull the image due to network issues, we recommend that you synchronize the image to your image repository. You can also build a custom image and push the image to your image repository.
Run the following command to submit a job:
```shell
kubectl create -f spark-pagerank-fluid-posix.yaml
```

Expected output:

```
sparkapplication.sparkoperator.k8s.io/spark-pagerank-fluid-posix created
```

Run the following command to view the status of the Spark job:

```shell
kubectl get -n spark sparkapplication spark-pagerank-fluid-posix -w
```

Expected output:

```
NAME                         STATUS       ATTEMPTS   START                  FINISH                 AGE
spark-pagerank-fluid-posix   RUNNING      1          2025-01-16T11:06:15Z   <no value>             87s
spark-pagerank-fluid-posix   RUNNING      1          2025-01-16T11:06:15Z   <no value>             102s
spark-pagerank-fluid-posix   RUNNING      1          2025-01-16T11:06:15Z   <no value>             102s
spark-pagerank-fluid-posix   SUCCEEDING   1          2025-01-16T11:06:15Z   2025-01-16T11:07:59Z   104s
spark-pagerank-fluid-posix   COMPLETED    1          2025-01-16T11:06:15Z   2025-01-16T11:07:59Z   104s
```

The output shows that the job is completed.
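For context, SparkPageRank implements the classic iterative PageRank update: each node splits its rank evenly among its outgoing links, and new ranks are computed as 0.15 + 0.85 × (sum of received contributions). The following single-machine Python sketch mirrors that logic on a tiny hypothetical edge list; the real job distributes this computation across executors and reads pagerank_dataset.txt instead.

```python
# Single-machine sketch of the PageRank iteration that the
# SparkPageRank example distributes. The edge list is hypothetical
# sample data for illustration only.
def pagerank(edges, iterations=10):
    """edges: list of (src, dst) pairs, one per input line."""
    links = {}
    for src, dst in edges:
        links.setdefault(src, []).append(dst)
    nodes = {n for pair in edges for n in pair}
    ranks = {n: 1.0 for n in nodes}
    for _ in range(iterations):
        contribs = {n: 0.0 for n in nodes}
        for src, outlinks in links.items():
            share = ranks[src] / len(outlinks)  # split rank among outlinks
            for dst in outlinks:
                contribs[dst] += share
        # Same damping formula as org.apache.spark.examples.SparkPageRank.
        ranks = {n: 0.15 + 0.85 * c for n, c in contribs.items()}
    return ranks

edges = [("a", "b"), ("a", "c"), ("b", "c"), ("c", "a")]
ranks = pagerank(edges, iterations=10)
print(ranks)  # "c" receives links from both "a" and "b", so it ranks highest
```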
Method 2: Use Hadoop Compatible File System (HCFS) APIs
Run the following command to query the URL of the HCFS used by the dataset:
```shell
kubectl get -n spark dataset spark -o wide
```

Expected output:

```
NAME    UFS TOTAL SIZE   CACHED      CACHE CAPACITY   CACHED PERCENTAGE   PHASE   HCFS URL                             TOTAL FILES     CACHE HIT RATIO   AGE
spark   0.00B            326.85GiB   128.91TiB        0.0%                Bound   spark-jindofs-master-0.spark:19434   [Calculating]                     30m
```

The output shows that the HCFS URL of the dataset is spark-jindofs-master-0.spark:19434. When you configure the Spark job, you must set the fs.jindofsx.namespace.rpc.address parameter to the HCFS URL.

Create a file named spark-pagerank-fluid-hcfs.yaml and copy the following content to the file. The file is used to create a SparkApplication.

```yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank-fluid-hcfs
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # Replace <SPARK_IMAGE> with the Spark image that you want to use. The image must include JindoSDK dependencies.
  image: <SPARK_IMAGE>
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
    # Select one of the following methods. Replace <OSS_BUCKET> with the name of your OSS bucket.
    # Method 1: Access the OSS bucket by using the oss:// format.
    - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
    # Method 2: Access the OSS bucket by using the s3:// format.
    # - s3://<OSS_BUCKET>/data/pagerank_dataset.txt
    # Method 3: Access the OSS bucket by using the s3a:// format.
    # - s3a://<OSS_BUCKET>/data/pagerank_dataset.txt
    # The number of iterations.
    - "10"
  sparkVersion: 3.5.4
  hadoopConf:
    #===================
    # OSS access configurations.
    #===================
    # You can access the OSS bucket by using the oss:// format.
    fs.oss.impl: com.aliyun.jindodata.oss.JindoOssFileSystem
    # The endpoint of the OSS bucket. Replace <OSS_ENDPOINT> with the endpoint of your OSS bucket.
    # For example, the internal endpoint for OSS buckets in the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com.
    fs.oss.endpoint: <OSS_ENDPOINT>
    # Retrieve the credentials of your OSS bucket from environment variables.
    fs.oss.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
    # You can access the OSS bucket by using the s3:// format.
    fs.s3.impl: com.aliyun.jindodata.s3.JindoS3FileSystem
    # The endpoint of the OSS bucket. Replace <OSS_ENDPOINT> with the endpoint of your OSS bucket.
    fs.s3.endpoint: <OSS_ENDPOINT>
    # Retrieve the credentials of your OSS bucket from environment variables.
    fs.s3.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
    # You can access the OSS bucket by using the s3a:// format.
    fs.s3a.impl: com.aliyun.jindodata.s3.JindoS3FileSystem
    # The endpoint of the OSS bucket. Replace <OSS_ENDPOINT> with the endpoint of your OSS bucket.
    fs.s3a.endpoint: <OSS_ENDPOINT>
    # Retrieve the credentials of your OSS bucket from environment variables.
    fs.s3a.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
    #===================
    # JindoFS configurations.
    #===================
    fs.xengine: jindofsx
    # The HCFS URL of the dataset.
    fs.jindofsx.namespace.rpc.address: spark-jindofs-master-0.spark:19434
    fs.jindofsx.data.cache.enable: "true"
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    envFrom:
      - secretRef:
          name: spark-oss-secret
    serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 2
    coreLimit: "2"
    memory: 8g
    envFrom:
      - secretRef:
          name: spark-oss-secret
  restartPolicy:
    type: Never
```

Note: The Spark image used by the preceding code block must include JindoSDK dependencies. You can use the following Dockerfile template to build a custom image and push the image to your image repository:
```dockerfile
ARG SPARK_IMAGE=spark:3.5.4
FROM ${SPARK_IMAGE}

# Add dependencies for JindoSDK support.
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-core/6.4.0/jindo-core-6.4.0.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-sdk/6.4.0/jindo-sdk-6.4.0.jar ${SPARK_HOME}/jars
```

Run the following command to submit a Spark job:
```shell
kubectl create -f spark-pagerank-fluid-hcfs.yaml
```

Expected output:

```
sparkapplication.sparkoperator.k8s.io/spark-pagerank-fluid-hcfs created
```

Run the following command to query the status of the Spark job:

```shell
kubectl get -n spark sparkapplication spark-pagerank-fluid-hcfs -w
```

Expected output:

```
NAME                        STATUS       ATTEMPTS   START                  FINISH                 AGE
spark-pagerank-fluid-hcfs   RUNNING      1          2025-01-16T11:21:16Z   <no value>             9s
spark-pagerank-fluid-hcfs   RUNNING      1          2025-01-16T11:21:16Z   <no value>             15s
spark-pagerank-fluid-hcfs   RUNNING      1          2025-01-16T11:21:16Z   <no value>             77s
spark-pagerank-fluid-hcfs   RUNNING      1          2025-01-16T11:21:16Z   <no value>             77s
spark-pagerank-fluid-hcfs   SUCCEEDING   1          2025-01-16T11:21:16Z   2025-01-16T11:22:34Z   78s
spark-pagerank-fluid-hcfs   COMPLETED    1          2025-01-16T11:21:16Z   2025-01-16T11:22:34Z   78s
```

The output shows that the job is completed.
Step 6: (Optional) Clear the environment
After you complete the preceding steps, you can run the following commands to delete the resources that are no longer required:
```shell
kubectl delete -f spark-pagerank-fluid-posix.yaml
kubectl delete -f spark-pagerank-fluid-hcfs.yaml
kubectl delete -f spark-fluid-dataload.yaml
kubectl delete -f spark-fluid-jindoruntime.yaml
kubectl delete -f spark-fluid-dataset.yaml
kubectl delete -f fluid-oss-secret.yaml
```