This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access Alibaba Cloud Elasticsearch.

Prerequisites

* An Alibaba Cloud Elasticsearch cluster is created, and its internal endpoint, username, and password are available.
* Object Storage Service (OSS) is activated, and a bucket is created in the region in which the Elasticsearch cluster resides.
* Data Lake Analytics (DLA) is activated, and a virtual cluster that runs the serverless Spark engine is created.
* The ID of a vSwitch and the ID of a security group in the virtual private cloud (VPC) in which the Elasticsearch cluster is deployed are available. These IDs are required for the spark.dla.eni.vswitch.id and spark.dla.eni.security.group.id parameters when you submit a Spark job.

Write Java code to access an Elasticsearch cluster

  1. Download the JAR file of elasticsearch-spark-20_2.11 from the Maven repository. In this topic, the elasticsearch-spark-20_2.11-6.3.2.jar file is downloaded.
  2. Upload the downloaded JAR file to an Object Storage Service (OSS) bucket that resides in the same region as the Elasticsearch cluster. For more information, see Upload objects.
  3. Add the following dependencies to the pom.xml file of your Maven project.
     <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.3.2</version>
        </dependency>
    </dependencies>
    Notice Make sure that the versions of the dependencies in the pom.xml file are consistent with the versions of the related services. For example, the version of elasticsearch-spark-20_2.11 must match the version of your Elasticsearch cluster, and the version of spark-core_2.11 must match the version of Spark that the serverless Spark engine runs.
  4. Write the following test code to access the Elasticsearch cluster. In the test code, replace the value of the es.nodes parameter with the internal endpoint of your Elasticsearch cluster and the value of the es.net.http.auth.pass parameter with the password that you configured when you created the cluster.
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
    import java.util.Map;
    
    public class ReadES {
    
        public static void main(String[] args) {
        
            SparkConf conf = new SparkConf().setAppName("readEs")
                    .set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com")
                    .set("es.port", "9200")
                    .set("es.net.http.auth.user", "elastic")
                    .set("es.net.http.auth.pass", "xxxxxx")
                    .set("es.nodes.wan.only", "true")
                    .set("es.nodes.discovery","false")
                    .set("es.input.use.sliced.partitions","false")
                    .set("es.resource", "index_name/_doc")
                    .set("es.scroll.size","500");
    
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc);
    
            for (Map<String, Object> item : rdd.values().collect()) {
                System.out.println(item);
            }
    
            sc.stop();
        }
    
    }
    The following table describes the parameters in the test code.
    es.nodes: The endpoints of the nodes in the Elasticsearch cluster. Specify at least one endpoint. If you use an Alibaba Cloud Elasticsearch cluster, set this parameter to the internal endpoint of the cluster.
    es.port: The port number of the Elasticsearch cluster.
    es.net.http.auth.user: The username that is used to access the Elasticsearch cluster. If you use an Alibaba Cloud Elasticsearch cluster, set this parameter to elastic.
    es.net.http.auth.pass: The password that is used to access the Elasticsearch cluster.
    es.nodes.wan.only: Specifies whether the cluster is accessed only through the endpoints specified in es.nodes, as is the case when the cluster is reached over a WAN or through a proxy. If you use an Alibaba Cloud Elasticsearch cluster, set this parameter to true.
    es.nodes.discovery: Specifies whether to automatically discover more nodes in the cluster. If you use an Alibaba Cloud Elasticsearch cluster, set this parameter to false so that only the endpoints specified in es.nodes are used.
    es.input.use.sliced.partitions: Specifies whether to use sliced scrolls to create input partitions.
    es.scroll.size: The number of documents that are returned in each scroll request.
    es.resource: The index and type from which the serverless Spark engine reads data, in the <index>/<type> format.
    Notice For more information about the parameters, see Configuration.
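    If you need only a subset of documents, the connector can push a query down to the Elasticsearch cluster instead of scanning the whole index. The following sketch is a variant of the test code that passes a query string to JavaEsSpark.esRDD; the title field and the query are hypothetical examples.
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
    import java.util.Map;

    public class ReadESWithQuery {

        public static void main(String[] args) {

            // Same connection settings as in the test code above.
            SparkConf conf = new SparkConf().setAppName("readEsWithQuery")
                    .set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com")
                    .set("es.port", "9200")
                    .set("es.net.http.auth.user", "elastic")
                    .set("es.net.http.auth.pass", "xxxxxx")
                    .set("es.nodes.wan.only", "true")
                    .set("es.nodes.discovery", "false");

            JavaSparkContext sc = new JavaSparkContext(conf);

            // The third argument is a query in URI syntax. Elasticsearch,
            // not Spark, filters the documents that match title:spark.
            JavaPairRDD<String, Map<String, Object>> rdd =
                    JavaEsSpark.esRDD(sc, "index_name/_doc", "?q=title:spark");

            System.out.println("Matched documents: " + rdd.count());

            sc.stop();
        }
    }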
  5. Package the test code into a JAR file and upload the file to your OSS bucket. In this topic, the JAR file is named spark-es-examples-0.0.1-SNAPSHOT-shaded.jar.
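    The packaging step is not shown in this topic. If you build the project with Maven, a minimal maven-shade-plugin configuration such as the following sketch produces the shaded JAR when you run mvn package; the plugin version is an assumption and can be adjusted.
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <!-- Bundle the dependencies into the job JAR during the package phase. -->
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>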
  6. Log on to the DLA console.
  7. In the top navigation bar, select the region in which the Elasticsearch cluster resides.
  8. In the left-side navigation pane, choose Serverless Spark > Submit job.
  9. Submit a job. In the following job configuration, replace the value of the file parameter with the OSS path of the JAR file that contains the test code, and replace the value of the jars parameter with the OSS path of the JAR file that you downloaded from the Maven repository.
    {
        "name": "es",
        "className": "com.aliyun.spark.ReadES",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.eni.enable": "true",
            "spark.dla.eni.vswitch.id": "vsw-bp17jqw3lrrobn6y*****",
            "spark.dla.eni.security.group.id": "sg-bp163uxgt4zandx*****"
        },
        "file": "oss://{your bucket}/spark-es-examples-0.0.1-SNAPSHOT-shaded.jar",
        "jars": "oss://{your bucket}/Elasticsearch-spark-20_2.11-6.3.2.jar"
    }
    Note For more information about the parameters in the job configurations, see Configure a Spark job.
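
The preceding steps read data from the Elasticsearch cluster. If your job must also write results back to the cluster, the same connector provides JavaEsSpark.saveToEs. The following sketch writes two hypothetical documents; the index name and document fields are examples only, and es.index.auto.create is enabled so that the target index is created if it does not exist.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    public class WriteES {

        public static void main(String[] args) {

            SparkConf conf = new SparkConf().setAppName("writeEs")
                    .set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com")
                    .set("es.port", "9200")
                    .set("es.net.http.auth.user", "elastic")
                    .set("es.net.http.auth.pass", "xxxxxx")
                    .set("es.nodes.wan.only", "true")
                    .set("es.nodes.discovery", "false")
                    // Create the target index automatically if it does not exist.
                    .set("es.index.auto.create", "true");

            JavaSparkContext sc = new JavaSparkContext(conf);

            // Two hypothetical documents represented as key-value maps.
            Map<String, Object> doc1 = new HashMap<>();
            doc1.put("name", "alice");
            doc1.put("age", 30);
            Map<String, Object> doc2 = new HashMap<>();
            doc2.put("name", "bob");
            doc2.put("age", 25);

            JavaRDD<Map<String, Object>> rdd = sc.parallelize(Arrays.asList(doc1, doc2));

            // Write the documents to the index_name index with the _doc type.
            JavaEsSpark.saveToEs(rdd, "index_name/_doc");

            sc.stop();
        }
    }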

Use PySpark to access an Elasticsearch cluster

  1. Download the JAR file of elasticsearch-spark-20_2.11 from the Maven repository. In this topic, the elasticsearch-spark-20_2.11-6.3.2.jar file is downloaded.
  2. Upload the downloaded JAR file to an Object Storage Service (OSS) bucket that resides in the same region as the Elasticsearch cluster. For more information, see Upload objects.
  3. Create a file named example.py with the following content:
    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .getOrCreate()

    df = spark.read.format("org.elasticsearch.spark.sql") \
        .option("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com") \
        .option("es.port", "9200") \
        .option("es.net.http.auth.user", "elastic") \
        .option("es.net.http.auth.pass", "xxxxxx") \
        .option("es.nodes.wan.only", "true") \
        .option("es.nodes.discovery", "false") \
        .load("index_name/_doc")
    df.show()
    The following table describes the parameters in the test code.
    es.nodes: The endpoints of the nodes in the Elasticsearch cluster. Specify at least one endpoint. If you use an Alibaba Cloud Elasticsearch cluster, set this parameter to the internal endpoint of the cluster.
    es.port: The port number of the Elasticsearch cluster.
    es.net.http.auth.user: The username that is used to access the Elasticsearch cluster. If you use an Alibaba Cloud Elasticsearch cluster, set this parameter to elastic.
    es.net.http.auth.pass: The password that is used to access the Elasticsearch cluster.
    es.nodes.wan.only: Specifies whether the cluster is accessed only through the endpoints specified in es.nodes, as is the case when the cluster is reached over a WAN or through a proxy. If you use an Alibaba Cloud Elasticsearch cluster, set this parameter to true.
    es.nodes.discovery: Specifies whether to automatically discover more nodes in the cluster. If you use an Alibaba Cloud Elasticsearch cluster, set this parameter to false so that only the endpoints specified in es.nodes are used.
    es.resource: The index and type from which the serverless Spark engine reads data, in the <index>/<type> format. In the sample code, this value is passed to the load() method.
    Notice For more information about the parameters, see Configuration.
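    If you need only a subset of documents, the connector can push a query down to the Elasticsearch cluster through the es.query option. The following sketch is a variant of example.py; the title field and the query are hypothetical examples.
    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .getOrCreate()

    # es.query pushes the filter down to Elasticsearch so that only
    # matching documents are loaded into Spark.
    df = spark.read.format("org.elasticsearch.spark.sql") \
        .option("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com") \
        .option("es.port", "9200") \
        .option("es.net.http.auth.user", "elastic") \
        .option("es.net.http.auth.pass", "xxxxxx") \
        .option("es.nodes.wan.only", "true") \
        .option("es.nodes.discovery", "false") \
        .option("es.query", "?q=title:spark") \
        .load("index_name/_doc")
    df.show()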
  4. Upload the example.py file to your OSS bucket.
  5. Log on to the DLA console.
  6. In the top navigation bar, select the region in which the Elasticsearch cluster resides.
  7. In the left-side navigation pane, choose Serverless Spark > Submit job.
  8. Submit a job. In the following job configuration, replace the value of the file parameter with the OSS path of the example.py file, and replace the value of the jars parameter with the OSS path of the JAR file that you downloaded from the Maven repository.
    {
        "name": "es",
        "className": "com.aliyun.spark.ReadES",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.eni.enable": "true",
            "spark.dla.eni.vswitch.id": "vsw-bp17jqw3lrrobn6y*****",
            "spark.dla.eni.security.group.id": "sg-bp163uxgt4zandx*****"
        },
        "file": "oss://{your bucket}/example.py",
        "jars": "oss://{your bucket}/Elasticsearch-spark-20_2.11-6.3.2.jar"
    }
    Note For more information about the parameters in the job configurations, see Configure a Spark job.