Data Lake Analytics - Deprecated: Access Alibaba Cloud Elasticsearch

Last Updated: Feb 19, 2024

This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access Alibaba Cloud Elasticsearch.

Important

DLA is discontinued. AnalyticDB for MySQL Data Lakehouse Edition supports the features of DLA and provides more features and better performance. For more information about how to access Alibaba Cloud Elasticsearch from AnalyticDB for MySQL Spark, see Access Alibaba Cloud Elasticsearch.

Prerequisites

  - An Alibaba Cloud Elasticsearch cluster is created, and you have its internal endpoint and the password of the elastic account.
  - Object Storage Service (OSS) is activated, and a bucket is created in the region in which the Elasticsearch cluster resides.
  - The ID of a vSwitch and the ID of a security group in the virtual private cloud (VPC) of the Elasticsearch cluster are obtained. These IDs are required by the spark.dla.eni.vswitch.id and spark.dla.eni.security.group.id parameters when you submit a job.

Write Java code to access an Elasticsearch cluster

  1. Download the JAR file of elasticsearch-spark-20_2.11 from the Maven repository. In this example, the elasticsearch-spark-20_2.11-6.3.2.jar file is downloaded.

  2. Upload the JAR file that is downloaded from the Maven repository to an Object Storage Service (OSS) bucket that resides in the same region as your Elasticsearch cluster. For more information, see Upload objects.

  3. Add the following dependencies to the pom.xml file of your Java project:

     <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.3.2</version>
        </dependency>
    </dependencies>
    Important

    Make sure that the versions of the dependencies in the pom.xml file are consistent with the versions of the related Alibaba Cloud services. For example, the version of elasticsearch-spark-20_2.11 must match the version of your Elasticsearch cluster, and the version of spark-core_2.11 must match the Spark version of the serverless Spark engine.

  4. Write the following test code to access the Elasticsearch cluster. In the test code, replace the value of the es.nodes parameter with the internal endpoint of your Elasticsearch cluster and the value of the es.net.http.auth.pass parameter with the password that you configured when you created the Elasticsearch cluster.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
    import java.util.Map;
    
    public class ReadES {
    
        public static void main(String[] args) {
    
            // Connection settings for the Elasticsearch cluster. Replace es.nodes
            // with the internal endpoint of your cluster and es.net.http.auth.pass
            // with the password of the elastic account.
            SparkConf conf = new SparkConf().setAppName("readEs")
                    .set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com")
                    .set("es.port", "9200")
                    .set("es.net.http.auth.user", "elastic")
                    .set("es.net.http.auth.pass", "xxxxxx")
                    .set("es.nodes.wan.only", "true")
                    .set("es.nodes.discovery", "false")
                    .set("es.input.use.sliced.partitions", "false")
                    .set("es.resource", "index_name/_doc")
                    .set("es.scroll.size", "500");
    
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            // Read the index specified by es.resource as an RDD of
            // (document ID, document fields) pairs.
            JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc);
    
            // Collect the documents to the driver and print them.
            for (Map<String, Object> item : rdd.values().collect()) {
                System.out.println(item);
            }
    
            sc.stop();
        }
    }

    The following table describes the parameters in the test code.

    | Parameter | Description |
    | --- | --- |
    | es.nodes | The endpoints of the nodes in the Elasticsearch cluster. You must specify at least one endpoint. If an Alibaba Cloud Elasticsearch cluster is deployed, set this parameter to the internal endpoint of the cluster. |
    | es.port | The port number of the Elasticsearch cluster. |
    | es.net.http.auth.user | The username that is used to access the Elasticsearch cluster. If an Alibaba Cloud Elasticsearch cluster is deployed, set this parameter to elastic. |
    | es.net.http.auth.pass | The password that is used to access the Elasticsearch cluster. |
    | es.nodes.wan.only | Specifies whether to access the Elasticsearch cluster in WAN-only mode, in which only the endpoints declared in the es.nodes parameter are used and data nodes are not connected to directly. If an Alibaba Cloud Elasticsearch cluster is deployed, set this parameter to true. |
    | es.nodes.discovery | Specifies whether to automatically discover more endpoints. If an Alibaba Cloud Elasticsearch cluster is deployed, set this parameter to false so that only the endpoints specified by the es.nodes parameter are used. |
    | es.input.use.sliced.partitions | Specifies whether to use sliced scrolls to create input partitions. In this example, this parameter is set to false. |
    | es.resource | The index and document type from which the serverless Spark engine reads data, in the <index>/<type> format. |
    | es.scroll.size | The number of documents that are returned per scroll request. |

    Important

    For more information about these parameters, see the official Elasticsearch-Hadoop configuration documentation. For an example that pushes a query down to the Elasticsearch cluster, see the sketch after this procedure.

  5. Package the test code into a JAR file and upload the file to your OSS bucket. In this example, the JAR file is named spark-es-examples-0.0.1-SNAPSHOT-shaded.jar. For one way to build such a shaded JAR with Maven, see the packaging sketch after this procedure.

  6. Log on to the DLA console.

  7. In the top navigation bar, select the region in which the Elasticsearch cluster resides.

  8. In the left-side navigation pane, choose Serverless Spark > Submit job.

  9. Submit a job. Replace the value of the file parameter with the OSS path in which the JAR file of the test code is stored, and replace the value of the jars parameter with the OSS path in which the JAR file downloaded from the Maven repository is stored.

    {
        "name": "es",
        "className": "com.aliyun.spark.ReadES",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.eni.enable": "true",
            "spark.dla.eni.vswitch.id": "vsw-bp17jqw3lrrobn6y*****",
            "spark.dla.eni.security.group.id": "sg-bp163uxgt4zandx*****"
        },
        "file": "oss://{your bucket}/spark-es-examples-0.0.1-SNAPSHOT-shaded.jar",
        "jars": "oss://{your bucket}/elasticsearch-spark-20_2.11-6.3.2.jar"
    }
    Note

    For more information about the parameters used to configure the job, see Configure a Spark job.
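
The test code in step 4 collects an entire index to the driver. If you only need a subset of the documents, the JavaEsSpark.esRDD method also accepts a query string that Elasticsearch evaluates on the server side. The following minimal sketch assumes the same connection settings as the test code; the index name, the status field, and the match query are illustrative placeholders rather than values from this topic.

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
    import java.util.Map;

    public class ReadESWithQuery {

        public static void main(String[] args) {

            // Same connection settings as in the test code above; the endpoint
            // and password are placeholders.
            SparkConf conf = new SparkConf().setAppName("readEsWithQuery")
                    .set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com")
                    .set("es.port", "9200")
                    .set("es.net.http.auth.user", "elastic")
                    .set("es.net.http.auth.pass", "xxxxxx")
                    .set("es.nodes.wan.only", "true")
                    .set("es.nodes.discovery", "false");

            JavaSparkContext sc = new JavaSparkContext(conf);

            // The query-DSL string is evaluated by Elasticsearch, so only matching
            // documents are shipped to Spark. The field and value are illustrative.
            String query = "{\"query\": {\"match\": {\"status\": \"active\"}}}";
            JavaPairRDD<String, Map<String, Object>> rdd =
                    JavaEsSpark.esRDD(sc, "index_name/_doc", query);

            System.out.println("Matching documents: " + rdd.count());

            sc.stop();
        }
    }

The packaging step assumes a shaded (fat) JAR. If you build the test code with Maven, the following pom.xml fragment is one common way to produce such a JAR by using the maven-shade-plugin; the plugin version is illustrative. Setting shadedArtifactAttached to true causes mvn package to emit a *-shaded.jar file, such as spark-es-examples-0.0.1-SNAPSHOT-shaded.jar, in the target directory.

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <configuration>
                    <!-- Attach the shaded JAR with the "shaded" classifier instead of
                         replacing the main artifact. -->
                    <shadedArtifactAttached>true</shadedArtifactAttached>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>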

Use PySpark to access an Elasticsearch cluster

  1. Download the JAR file of elasticsearch-spark-20_2.11 from the Maven repository. In this example, the elasticsearch-spark-20_2.11-6.3.2.jar file is downloaded.

  2. Upload the JAR file that is downloaded from the Maven repository to an Object Storage Service (OSS) bucket that resides in the same region as your Elasticsearch cluster. For more information, see Upload objects.

  3. Create a file named example.py that contains the following test code. In the test code, replace the value of the es.nodes parameter with the internal endpoint of your Elasticsearch cluster and the value of the es.net.http.auth.pass parameter with the password that you configured when you created the Elasticsearch cluster.

    # example.py: read an index from the Elasticsearch cluster into a DataFrame.
    from pyspark.sql import SparkSession

    spark = SparkSession \
        .builder \
        .getOrCreate()

    # Connection settings for the Elasticsearch cluster. Replace es.nodes with
    # the internal endpoint of your cluster and es.net.http.auth.pass with the
    # password of the elastic account.
    df = spark.read.format("org.elasticsearch.spark.sql") \
        .option("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com") \
        .option("es.port", "9200") \
        .option("es.net.http.auth.user", "elastic") \
        .option("es.net.http.auth.pass", "xxxxxx") \
        .option("es.nodes.wan.only", "true") \
        .option("es.nodes.discovery", "false") \
        .load("index_name/_doc")

    df.show()

    The following table describes the parameters in the test code.

    | Parameter | Description |
    | --- | --- |
    | es.nodes | The endpoints of the nodes in the Elasticsearch cluster. You must specify at least one endpoint. If an Alibaba Cloud Elasticsearch cluster is deployed, set this parameter to the internal endpoint of the cluster. |
    | es.port | The port number of the Elasticsearch cluster. |
    | es.net.http.auth.user | The username that is used to access the Elasticsearch cluster. If an Alibaba Cloud Elasticsearch cluster is deployed, set this parameter to elastic. |
    | es.net.http.auth.pass | The password that is used to access the Elasticsearch cluster. |
    | es.nodes.wan.only | Specifies whether to access the Elasticsearch cluster in WAN-only mode, in which only the endpoints declared in the es.nodes parameter are used and data nodes are not connected to directly. If an Alibaba Cloud Elasticsearch cluster is deployed, set this parameter to true. |
    | es.nodes.discovery | Specifies whether to automatically discover more endpoints. If an Alibaba Cloud Elasticsearch cluster is deployed, set this parameter to false so that only the endpoints specified by the es.nodes parameter are used. |
    | es.resource | The index and document type from which the serverless Spark engine reads data, in the <index>/<type> format. In this example, this value is passed to the load() method. |

    Important

    For more information about these parameters, see the official Elasticsearch-Hadoop configuration documentation. For an example that pushes a query down to the Elasticsearch cluster from PySpark, see the sketch at the end of this topic.

  4. Upload the example.py file to your OSS bucket.

  5. Log on to the DLA console.

  6. In the top navigation bar, select the region in which the Elasticsearch cluster resides.

  7. In the left-side navigation pane, choose Serverless Spark > Submit job.

  8. Submit a job. Replace the value of the jars parameter with the OSS path in which the JAR file downloaded from the Maven repository is stored.

    {
        "name": "es",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.eni.enable": "true",
            "spark.dla.eni.vswitch.id": "vsw-bp17jqw3lrrobn6y*****",
            "spark.dla.eni.security.group.id": "sg-bp163uxgt4zandx*****"
        },
        "file": "oss://{your bucket}/example.py",
        "jars": "oss://{your bucket}/elasticsearch-spark-20_2.11-6.3.2.jar"
    }
    Note

    For more information about the parameters used to configure the job, see Configure a Spark job.
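
As in the Java section, you can push a query down to Elasticsearch instead of loading an entire index. The following minimal sketch assumes the same connection settings as example.py; the es.query value, the status field, and the index name are illustrative placeholders.

    # Read only the documents that match a query. Elasticsearch evaluates the
    # query on the server side before the data reaches Spark.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    df = spark.read.format("org.elasticsearch.spark.sql") \
        .option("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com") \
        .option("es.port", "9200") \
        .option("es.net.http.auth.user", "elastic") \
        .option("es.net.http.auth.pass", "xxxxxx") \
        .option("es.nodes.wan.only", "true") \
        .option("es.nodes.discovery", "false") \
        .option("es.query", '{"query": {"match": {"status": "active"}}}') \
        .load("index_name/_doc")

    df.show()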