This topic describes how to use the serverless Spark engine of Data Lake Analytics (DLA) to access Alibaba Cloud Elasticsearch.
DLA is discontinued. AnalyticDB for MySQL Data Lakehouse Edition supports the features of DLA and provides more features and better performance. For more information about how to access Alibaba Cloud Elasticsearch from AnalyticDB for MySQL Spark, see Access Alibaba Cloud Elasticsearch.
Prerequisites
A Spark virtual cluster is created.
An Alibaba Cloud Elasticsearch cluster is created. The internal endpoint of the Elasticsearch cluster and the username and password that are used to access the cluster are obtained. For more information, see Create an Alibaba Cloud Elasticsearch cluster.
An IP address whitelist is configured for the Elasticsearch cluster. For more information, see Configure a public or private IP address whitelist for an Elasticsearch cluster.
Write Java code to access an Elasticsearch cluster
Download the JAR file of elasticsearch-spark-20_2.11 from the Maven repository. In this example, the elasticsearch-spark-20_2.11-6.3.2.jar file is downloaded.
Upload the JAR file that is downloaded from the Maven repository to an Object Storage Service (OSS) bucket that resides in the same region as your Elasticsearch cluster. For more information, see Upload objects.
Add the following dependencies to the pom.xml file of your Maven project:

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-20_2.11</artifactId>
        <version>6.3.2</version>
    </dependency>
</dependencies>
```
Important: Make sure that the versions of the dependencies in the pom.xml file match the versions of the related services. For example, the version of elasticsearch-spark-20_2.11 must match the version of your Elasticsearch cluster, and the version of spark-core_2.11 must match the version of the serverless Spark engine.
Write the following test code to access the Elasticsearch cluster. In the test code, replace the value of the es.nodes parameter with the internal endpoint of your Elasticsearch cluster and the value of the es.net.http.auth.pass parameter with the password that you configured when you created the Elasticsearch cluster.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import java.util.Map;

public class ReadES {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("readEs")
                .set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com")
                .set("es.port", "9200")
                .set("es.net.http.auth.user", "elastic")
                .set("es.net.http.auth.pass", "xxxxxx")
                .set("es.nodes.wan.only", "true")
                .set("es.nodes.discovery", "false")
                .set("es.input.use.sliced.partitions", "false")
                .set("es.resource", "index_name/_doc")
                .set("es.scroll.size", "500");

        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc);
        for (Map<String, Object> item : rdd.values().collect()) {
            System.out.println(item);
        }

        sc.stop();
    }
}
```
The following table describes the parameters in the test code.

| Parameter | Description |
| --- | --- |
| es.nodes | The endpoints of the nodes in the Elasticsearch cluster. Specify at least one endpoint. If an Alibaba Cloud Elasticsearch cluster is deployed, set this parameter to the internal endpoint of the cluster. |
| es.port | The port number of the Elasticsearch cluster. |
| es.net.http.auth.user | The username that is used to access the Elasticsearch cluster. If an Alibaba Cloud Elasticsearch cluster is deployed, set this parameter to elastic. |
| es.net.http.auth.pass | The password that is used to access the Elasticsearch cluster. |
| es.nodes.wan.only | Specifies whether to connect to the cluster only through the declared endpoints (WAN mode). If an Alibaba Cloud Elasticsearch cluster is deployed, set this parameter to true. |
| es.nodes.discovery | Specifies whether to automatically discover more nodes in the cluster. If an Alibaba Cloud Elasticsearch cluster is deployed, set this parameter to false, which indicates that only the endpoints specified by the es.nodes parameter are used. |
| es.resource | The index and document type from which the serverless Spark engine reads data, in the <index>/<type> format. |

Important: For more information, see the official Elasticsearch configuration documentation.
Package the test code into a JAR file and upload the file to your OSS bucket. In this example, the JAR file is named spark-es-examples-0.0.1-SNAPSHOT-shaded.jar.
Log on to the DLA console.
In the top navigation bar, select the region in which the Elasticsearch cluster resides.
In the left-side navigation pane, choose .
Submit a job. Replace the value of the file parameter with the OSS path in which the JAR file of the test code is stored, and the value of the jars parameter with the OSS path in which the JAR file downloaded from the Maven repository is stored.

```json
{
  "name": "es",
  "className": "com.aliyun.spark.ReadES",
  "conf": {
    "spark.driver.resourceSpec": "medium",
    "spark.executor.instances": 5,
    "spark.executor.resourceSpec": "medium",
    "spark.dla.eni.enable": "true",
    "spark.dla.eni.vswitch.id": "vsw-bp17jqw3lrrobn6y*****",
    "spark.dla.eni.security.group.id": "sg-bp163uxgt4zandx*****"
  },
  "file": "oss://{your bucket}/spark-es-examples-0.0.1-SNAPSHOT-shaded.jar",
  "jars": "oss://{your bucket}/elasticsearch-spark-20_2.11-6.3.2.jar"
}
```
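Because the job description is plain JSON, a quick local parse catches quoting or nesting mistakes before the job is submitted. The following is a minimal sketch, not part of the official workflow; the bucket name and configuration values are placeholders taken from the example above:

```python
import json

# The job description from the example above, with placeholder values.
job = """
{
  "name": "es",
  "className": "com.aliyun.spark.ReadES",
  "conf": {
    "spark.driver.resourceSpec": "medium",
    "spark.executor.instances": 5,
    "spark.dla.eni.enable": "true"
  },
  "file": "oss://{your bucket}/spark-es-examples-0.0.1-SNAPSHOT-shaded.jar"
}
"""

parsed = json.loads(job)  # raises ValueError if the JSON is malformed

# Sanity-check the fields that the serverless Spark engine relies on.
assert parsed["file"].startswith("oss://"), "file must be an OSS path"
assert parsed["conf"]["spark.dla.eni.enable"] == "true", "ENI access must be enabled"
print("job description OK:", parsed["name"])
```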
Note: For more information about the parameters used to configure the job, see Configure a Spark job.
Use PySpark to access an Elasticsearch cluster
Download the JAR file of elasticsearch-spark-20_2.11 from the Maven repository. In this example, the elasticsearch-spark-20_2.11-6.3.2.jar file is downloaded.
Upload the JAR file that is downloaded from the Maven repository to an Object Storage Service (OSS) bucket that resides in the same region as your Elasticsearch cluster. For more information, see Upload objects.
Create a file named example.py that contains the following test code:

```python
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .getOrCreate()

df = spark.read.format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com") \
    .option("es.port", "9200") \
    .option("es.net.http.auth.user", "elastic") \
    .option("es.net.http.auth.pass", "xxx") \
    .option("es.nodes.wan.only", "true") \
    .option("es.nodes.discovery", "false") \
    .load("index_name/_doc")

df.show()
```
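The es.* options can also be collected in a plain dictionary and passed in one call through DataFrameReader.options, which keeps the connection settings reusable across jobs. A minimal sketch; the endpoint and password are placeholders in the style of the example above and must be replaced with the values of your own cluster:

```python
# Connection settings for the elasticsearch-spark connector.
# The endpoint and password below are placeholders; replace them with the
# internal endpoint and password of your own Elasticsearch cluster.
es_conf = {
    "es.nodes": "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com",
    "es.port": "9200",
    "es.net.http.auth.user": "elastic",
    "es.net.http.auth.pass": "xxx",
    "es.nodes.wan.only": "true",    # connect only through the declared endpoint
    "es.nodes.discovery": "false",  # do not discover additional nodes
}

# With a SparkSession named spark, the read then becomes:
# df = spark.read.format("org.elasticsearch.spark.sql") \
#     .options(**es_conf) \
#     .load("index_name/_doc")
```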
The following table describes the parameters in the test code.

| Parameter | Description |
| --- | --- |
| es.nodes | The endpoints of the nodes in the Elasticsearch cluster. Specify at least one endpoint. If an Alibaba Cloud Elasticsearch cluster is deployed, set this parameter to the internal endpoint of the cluster. |
| es.port | The port number of the Elasticsearch cluster. |
| es.net.http.auth.user | The username that is used to access the Elasticsearch cluster. If an Alibaba Cloud Elasticsearch cluster is deployed, set this parameter to elastic. |
| es.net.http.auth.pass | The password that is used to access the Elasticsearch cluster. |
| es.nodes.wan.only | Specifies whether to connect to the cluster only through the declared endpoints (WAN mode). If an Alibaba Cloud Elasticsearch cluster is deployed, set this parameter to true. |
| es.nodes.discovery | Specifies whether to automatically discover more nodes in the cluster. If an Alibaba Cloud Elasticsearch cluster is deployed, set this parameter to false, which indicates that only the endpoints specified by the es.nodes parameter are used. |
| es.resource | The index and document type from which the serverless Spark engine reads data, in the <index>/<type> format. |

Important: For more information, see the official Elasticsearch configuration documentation.
Upload the example.py file to your OSS bucket.
Log on to the DLA console.
In the top navigation bar, select the region in which the Elasticsearch cluster resides.
In the left-side navigation pane, choose .
Submit a job. Replace the value of the file parameter with the OSS path in which the example.py file is stored, and the value of the jars parameter with the OSS path in which the JAR file downloaded from the Maven repository is stored.

```json
{
  "name": "es",
  "conf": {
    "spark.driver.resourceSpec": "medium",
    "spark.executor.instances": 5,
    "spark.executor.resourceSpec": "medium",
    "spark.dla.eni.enable": "true",
    "spark.dla.eni.vswitch.id": "vsw-bp17jqw3lrrobn6y*****",
    "spark.dla.eni.security.group.id": "sg-bp163uxgt4zandx*****"
  },
  "file": "oss://{your bucket}/example.py",
  "jars": "oss://{your bucket}/elasticsearch-spark-20_2.11-6.3.2.jar"
}
```
Note: For more information about the parameters used to configure the job, see Configure a Spark job.