This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access Object Storage Service (OSS). After DLA is granted permissions to access OSS, you can execute SQL statements or submit Spark code to access OSS.

Procedure

  1. Grant DLA permissions to access OSS.
    • If you use an Alibaba Cloud account, you have permissions to access all OSS data within your account and the OSS tables in DLA by default. You can directly access OSS without additional configurations.
    • If you want to access OSS and submit jobs as a RAM user, you must grant the RAM user permissions to access OSS. For more information, see Grant permissions to a RAM user (detailed version).
    • If you use Spark SQL to access OSS tables in DLA, make sure that your RAM user is associated with a DLA sub-account and that the sub-account has permissions to access data in these tables. If your RAM user is not associated with a DLA sub-account, associate them first. For more information, see Bind a RAM user with a DLA sub-account. To grant or revoke OSS access permissions for a DLA sub-account, log on to the DLA console, choose Serverless Presto > Execute in the left-side navigation pane, and execute a GRANT or REVOKE statement, as shown in the example below.
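      For example, assuming DLA's MySQL-compatible GRANT syntax and reusing the table from the Spark SQL sample in step 3, statements of the following form grant and revoke access for a sub-account. The sub-account name dla_sub_account is a hypothetical placeholder:
      -- Grant a DLA sub-account read access to an OSS table;
      -- dla_sub_account is a placeholder for your own sub-account.
      GRANT SELECT ON `1k_tables`.`table0` TO dla_sub_account;
      -- Revoke the same permission.
      REVOKE SELECT ON `1k_tables`.`table0` FROM dla_sub_account;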
  2. Configure spark.dla.connectors.
    After DLA is granted the required permissions, you can use the serverless Spark engine of DLA to access OSS data. The OSS access feature of DLA does not take effect by default: you must set spark.dla.connectors to oss in the configuration of your Spark job to enable it, as shown in the minimal example below. If you do not want to use this feature, omit this parameter and only submit your JAR file with the other required configurations.
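    For reference, a minimal job configuration that enables the built-in OSS connector might look as follows. The file and className values are reused from the sample jobs later in this topic and are placeholders for your own artifacts:
      {
          "name": "oss-connector-minimal",
          "file": "oss://${oss-buck-name}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
          "className": "com.aliyun.spark.oss.SparkReadOss",
          "conf": {
              "spark.dla.connectors": "oss"
          }
      }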
  3. Access OSS data.
    You can use one of the following methods to access OSS data:
    • Execute Spark SQL statements to access OSS data. For more information, see Spark SQL. Sample statements in a Spark job:
      {
          "sqls": [
              "select * from `1k_tables`.`table0` limit 100",
              "insert into `1k_tables`.`table0` values(1, 'test')"
          ],
          "name": "sql oss test",
          "conf": {
              "spark.dla.connectors": "oss",
              "spark.driver.resourceSpec": "small",
              "spark.sql.hive.metastore.version": "dla",
              "spark.executor.instances": 10,
              "spark.dla.job.log.oss.uri": "oss://test/spark-logs",
              "spark.executor.resourceSpec": "small"
          }
      }
    • Use Java, Scala, or Python code to access OSS data. Sample Scala code:
      {  
        "args": ["oss://${oss-buck-name}/data/test/test.csv"],
        "name": "spark-oss-test",
        "file": "oss://${oss-buck-name}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
        "className": "com.aliyun.spark.oss.SparkReadOss",
        "conf": {
          "spark.driver.resourceSpec": "medium",
          "spark.executor.resourceSpec": "medium",
          "spark.executor.instances": 2,
          "spark.dla.connectors": "oss"
        }
      }
      Note For the source code of com.aliyun.spark.oss.SparkReadOss, see DLA Spark OSS demo.
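      For a rough illustration only, the logic of such a read job might look like the following minimal Scala sketch. This is a simplified assumption, not the actual source of the demo:
      package com.aliyun.spark.oss

      import org.apache.spark.sql.SparkSession

      object SparkReadOss {
        def main(args: Array[String]): Unit = {
          // args(0) is the OSS path passed through the "args" field of the
          // job, for example oss://${oss-buck-name}/data/test/test.csv.
          val spark = SparkSession.builder().appName("spark-oss-test").getOrCreate()
          val df = spark.read.csv(args(0)) // read the CSV file from OSS
          df.show()                        // print a few rows to the job log
          spark.stop()
        }
      }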

Enable OSS data write performance optimization

If you use a self-managed Hive metastore or the metadata service of DLA to access OSS, the rename operation performed by the HiveClient of open source Apache Spark is inefficient on OSS. To address this issue, DLA optimizes the performance of OSS data write operations. To enable this optimization, set the spark.sql.hive.dla.metastoreV2.enable parameter to true. Sample code:

{
  "args": ["oss://${oss-buck-name}/data/test/test.csv"],
  "name": "spark-oss-test",
  "file": "oss://${oss-buck-name}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
  "className": "com.aliyun.spark.oss.WriteParquetFile",
  "conf": {
    "spark.driver.resourceSpec": "medium",
    "spark.executor.resourceSpec": "medium",
    "spark.executor.instances": 2,
    "spark.dla.connectors": "oss",
    "spark.sql.hive.dla.metastoreV2.enable": "true"
  }
}
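
The com.aliyun.spark.oss.WriteParquetFile class referenced above also comes from the DLA Spark OSS demo. As a simplified assumption rather than the demo's actual source, the logic of such a write job might look like this minimal Scala sketch; the output path is a placeholder:

package com.aliyun.spark.oss

import org.apache.spark.sql.SparkSession

object WriteParquetFile {
  def main(args: Array[String]): Unit = {
    // args(0) is the input OSS path, for example
    // oss://${oss-buck-name}/data/test/test.csv.
    val spark = SparkSession.builder().appName("spark-oss-test").getOrCreate()
    val df = spark.read.csv(args(0))
    // Rewrite the data to OSS in Parquet format; replace the output
    // path below with your own OSS location.
    df.write.mode("overwrite").parquet("oss://${oss-buck-name}/data/test/parquet/")
    spark.stop()
  }
}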

Enable data write performance optimization of the OSS connector

When the serverless Spark engine writes data to OSS, it calls a large number of OSS API operations, which degrades write performance. To address this issue, DLA provides a data write performance optimization for the OSS connector that is built on OSS multipart upload. In typical scenarios, this improves the data write performance of the OSS connector by 1 to 3 times.

To use this feature, you must enable the built-in OSS connector of the serverless Spark engine and enable data write performance optimization. Detailed configurations:
spark.dla.connectors = oss  // Enable the built-in OSS connector of the serverless Spark engine.
spark.hadoop.job.oss.fileoutputcommitter.enable = true  // Enable data write performance optimization.
Note
  • If data write performance optimization is enabled, some multipart upload parts may not be cleaned up and continue to occupy space in your OSS bucket in some scenarios, for example, when a job is forcibly killed. We recommend that you configure lifecycle rules for parts on the related OSS bucket so that OSS automatically deletes parts that are not merged within a specified period of time. We recommend that you set the expiration period to more than 3 days. For more information, see Configure lifecycle rules.
  • This performance optimization feature does not take effect on Resilient Distributed Dataset (RDD) methods whose names are prefixed with saveAsHadoop or saveAsNewAPIHadoop.
Sample code:
{
  "args": ["oss://${oss-buck-name}/data/test/test.csv"],
  "name": "spark-oss-test",
  "file": "oss://${oss-buck-name}/jars/test/spark-examples-0.0.1-SNAPSHOT.jar",
  "className": "com.aliyun.spark.oss.WriteParquetFile",
  "conf": {
    "spark.driver.resourceSpec": "medium",
    "spark.executor.resourceSpec": "medium",
    "spark.executor.instances": 2,
    "spark.dla.connectors": "oss",
    "spark.hadoop.job.oss.fileoutputcommitter.enable": true
  }
}