
Data Lake Analytics - Deprecated: OSS

Last Updated: Feb 20, 2024

This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access Object Storage Service (OSS). After DLA is authorized to access OSS, you can execute SQL statements or submit Spark jobs to access OSS.

Important

DLA is discontinued. AnalyticDB for MySQL supports the features of DLA and provides additional features and enhanced performance. For more information about how to use AnalyticDB for MySQL to access OSS, see Access OSS.

Procedure

  1. Authorize DLA to access OSS.

    • If you use an Alibaba Cloud account, you have permissions to access all OSS data within your account and the OSS tables in DLA by default. You can directly access OSS without additional configurations.

    • If you want to access OSS and submit jobs as a Resource Access Management (RAM) user, you must grant the RAM user permissions to access OSS. For more information, see Grant permissions to a RAM user (detailed version).

    • If you use Spark SQL to access OSS tables in DLA, make sure that your RAM user is associated with a DLA sub-account and that the sub-account has permissions to access data in these tables. If your RAM user is not associated with a DLA sub-account, associate it with one. For more information, see Bind a RAM user with a DLA sub-account. To grant permissions to or revoke permissions from a DLA sub-account, log on to the DLA console, choose Serverless Presto > Execute in the left-side navigation pane, and then execute the GRANT or REVOKE statement on the Execute page.

  2. Configure the spark.dla.connectors parameter.

    After DLA is authorized to access OSS, you can use the serverless Spark engine of DLA to access OSS data. Before you do so, you must set the spark.dla.connectors parameter to oss in the configuration of your Spark job, because the OSS access feature of DLA is disabled by default. If you do not need this feature, skip this parameter and submit your JAR file with only the required configurations.

  3. Access OSS data.

    You can use one of the following methods to access OSS data:

    • Execute Spark SQL statements to access OSS data. For more information, see Spark SQL. Sample statements in a Spark job:

      {
          "sqls": [
              "select * from `1k_tables`.`table0` limit 100",
              "insert into `1k_tables`.`table0` values(1, 'test')"
          ],
          "name": "sql oss test",
          "conf": {
              "spark.dla.connectors": "oss",
              "spark.driver.resourceSpec": "small",
              "spark.sql.hive.metastore.version": "dla",
              "spark.executor.instances": 10,
              "spark.dla.job.log.oss.uri": "oss://test/spark-logs",
              "spark.executor.resourceSpec": "small"
          }
      }
    • Write Java, Scala, or Python code to access OSS data. The following sample shows the configuration of a Spark job that runs Scala code:

      {  
        "args": ["oss://${oss-buck-name}/data/test/test.csv"],
        "name": "spark-oss-test",
        "file": "oss://${oss-buck-name}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
        "className": "com.aliyun.spark.oss.SparkReadOss",
        "conf": {
          "spark.driver.resourceSpec": "medium",
          "spark.executor.resourceSpec": "medium",
          "spark.executor.instances": 2,
          "spark.dla.connectors": "oss"
        }
      }
      Note

      For more information about the source code of SparkReadOss, see DLA Spark OSS demo.
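
      The following Scala code is a minimal sketch of what a main class such as SparkReadOss might look like, assuming that it reads the CSV file whose OSS path is passed as the first job argument. The class body and path names are illustrative; refer to the DLA Spark OSS demo for the actual implementation.

      package com.aliyun.spark.oss

      import org.apache.spark.sql.SparkSession

      object SparkReadOss {
        def main(args: Array[String]): Unit = {
          // The OSS path comes from the "args" field of the Spark job configuration,
          // for example oss://<your-bucket>/data/test/test.csv.
          val inputPath = args(0)

          val spark = SparkSession.builder()
            .appName("spark-oss-test")
            .getOrCreate()

          // Read the CSV file from OSS and print a few rows to the job log.
          spark.read.csv(inputPath).show(10)

          spark.stop()
        }
      }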

Enable the feature of optimizing OSS data write performance

If you use a self-managed Hive metastore or the metadata service of DLA to access OSS, the rename operation of the Spark HiveClient provided by the Apache Spark community is inefficient. To resolve this issue, DLA optimizes the performance of OSS data write operations. To enable this optimization, you only need to set the spark.sql.hive.dla.metastoreV2.enable parameter to true. Sample code:

{
  "args": ["oss://${oss-buck-name}/data/test/test.csv"],
  "name": "spark-oss-test",
  "file": "oss://${oss-buck-name}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
  "className": "com.aliyun.spark.oss.WriteParquetFile",
  "conf": {
    "spark.driver.resourceSpec": "medium",
    "spark.executor.resourceSpec": "medium",
    "spark.executor.instances": 2,
    "spark.dla.connectors": "oss",
    "spark.sql.hive.dla.metastoreV2.enable": "true"
  }
}
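
The following Scala code is a minimal sketch of what a main class such as WriteParquetFile might do under the preceding configuration: it reads the CSV file passed as the first job argument and writes it back to OSS in Parquet format. The output path is a hypothetical placeholder, and the actual demo class may differ.

package com.aliyun.spark.oss

import org.apache.spark.sql.{SaveMode, SparkSession}

object WriteParquetFile {
  def main(args: Array[String]): Unit = {
    // args(0): an OSS path such as oss://<your-bucket>/data/test/test.csv.
    val inputPath = args(0)
    // Hypothetical output location next to the input file.
    val outputPath = inputPath + ".parquet"

    val spark = SparkSession.builder()
      .appName("spark-oss-test")
      .getOrCreate()

    // Read the CSV file from OSS and write it back in Parquet format. The job is
    // submitted with spark.sql.hive.dla.metastoreV2.enable set to true so that the
    // optimized write path of DLA is used instead of the rename-based commit.
    val df = spark.read.csv(inputPath)
    df.write.mode(SaveMode.Overwrite).parquet(outputPath)

    spark.stop()
  }
}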

Enable the feature of optimizing the data write performance of the OSS connector

When the serverless Spark engine writes data to OSS, a large number of OSS API operations are called. As a result, data write performance deteriorates. To solve this issue, DLA introduces the feature of optimizing the data write performance of the OSS connector. This feature is developed based on OSS multipart upload. In typical scenarios, the data write performance of the OSS connector can be improved by 1 to 3 times after you enable the feature.

To use this feature, you must enable the built-in OSS connector of the serverless Spark engine and enable the feature of optimizing data write performance. The following sample code provides an example of the configurations:

spark.dla.connectors = oss;  // Enable the built-in OSS connector of the serverless Spark engine. 
spark.hadoop.job.oss.fileoutputcommitter.enable = true; // Enable the feature of optimizing data write performance.

Note
  • If the feature of optimizing data write performance is enabled, some parts may not be cleared and can continue to occupy storage in your OSS bucket in some scenarios, for example, when a job is forcibly terminated. We recommend that you configure part lifecycle rules for the related OSS bucket so that OSS automatically deletes parts that are not merged within a specific period of time. We recommend that you set the expiration period to more than three days. For more information, see Lifecycle rules based on the last modified time.

  • This performance optimization feature does not take effect on resilient distributed dataset (RDD) methods whose names are prefixed with saveAsHadoop or saveAsNewAPIHadoop. A sketch that illustrates this difference follows the sample code below.

Sample code:

{
  "args": ["oss://${oss-buck-name}/data/test/test.csv"],
  "name": "spark-oss-test",
  "file": "oss://${oss-buck-name}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
  "className": "com.aliyun.spark.oss.WriteParquetFile",
  "conf": {
    "spark.driver.resourceSpec": "medium",
    "spark.executor.resourceSpec": "medium",
    "spark.executor.instances": 2,
    "spark.dla.connectors": "oss",
    "spark.hadoop.job.oss.fileoutputcommitter.enable": true
  }
}
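
To illustrate the note above, the following Scala sketch contrasts a DataFrame write, which is committed through the file output committer and can therefore benefit from spark.hadoop.job.oss.fileoutputcommitter.enable, with an RDD method prefixed with saveAsNewAPIHadoop, which the optimization does not cover. The class name and output paths are hypothetical placeholders.

package com.aliyun.spark.oss

import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.spark.sql.{SaveMode, SparkSession}

object WritePathsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("oss-write-paths")
      .getOrCreate()

    // args(0): input CSV on OSS; args(1) and args(2): output directories on OSS.
    val df = spark.read.csv(args(0))

    // A DataFrame write is committed through the file output committer and is
    // covered by the optimization.
    df.write.mode(SaveMode.Overwrite).parquet(args(1))

    // RDD methods prefixed with saveAsHadoop or saveAsNewAPIHadoop are not covered
    // by the optimization.
    spark.sparkContext.textFile(args(0))
      .map(line => (NullWritable.get(), new Text(line)))
      .saveAsNewAPIHadoopFile[TextOutputFormat[NullWritable, Text]](args(2))

    spark.stop()
  }
}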