Use AnalyticDB for MySQL Data Lakehouse Edition (V3.0) Spark to read from and write to Alibaba Cloud Elasticsearch over an elastic network interface (ENI).
Prerequisites
Before you begin, make sure you have:
-
An AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster. See Create a cluster.
-
A job resource group. See Create a resource group.
-
A database account:
-
Alibaba Cloud account: create a privileged account. See Create a privileged account.
-
Resource Access Management (RAM) user: create a privileged account and a standard account, then associate the standard account with the RAM user. See Create a database account and Associate or disassociate a database account with or from a RAM user.
-
An Alibaba Cloud Elasticsearch cluster. See Create an Alibaba Cloud Elasticsearch cluster.
-
The AnalyticDB for MySQL cluster IP address added to the Elasticsearch cluster whitelist. See Configure a public or private IP address whitelist for an Elasticsearch cluster.
-
Object Storage Service (OSS) activated with a bucket in the same region as your AnalyticDB for MySQL cluster. See Activate OSS and Create a bucket.
-
The connector JAR version matching your Elasticsearch cluster version, and the spark-core version matching your AnalyticDB for MySQL Spark version. Download the JAR from Elasticsearch Spark (Maven). The examples in this topic use Elasticsearch-spark-30_2.12-7.17.9.jar.
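The version-matching requirement can be checked before you upload anything. The following is a minimal sketch, assuming the connector file name follows the elasticsearch-spark-<spark>_<scala>-<version>.jar pattern seen in this topic. The helper names parse_connector_jar and matches_cluster are hypothetical, and comparing the full version string is an assumption about how strictly you want to match.

```python
import re

# Pattern assumed from the JAR name used in this topic:
# Elasticsearch-spark-30_2.12-7.17.9.jar
_JAR_PATTERN = re.compile(
    r"^elasticsearch-spark-(?P<spark>\d+)_(?P<scala>\d+\.\d+)"
    r"-(?P<version>\d+\.\d+\.\d+)\.jar$",
    re.IGNORECASE,
)


def parse_connector_jar(file_name):
    """Split a connector JAR file name into its Spark, Scala, and connector versions."""
    match = _JAR_PATTERN.match(file_name)
    if match is None:
        raise ValueError(f"Unrecognized connector JAR name: {file_name}")
    return match.groupdict()


def matches_cluster(file_name, es_version):
    """Return True if the connector version equals the Elasticsearch cluster version."""
    return parse_connector_jar(file_name)["version"] == es_version
```

For example, matches_cluster("Elasticsearch-spark-30_2.12-7.17.9.jar", "7.17.9") returns True, while the same JAR checked against a 7.10.0 cluster returns False.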
Collect network identifiers
ENI connectivity requires the vSwitch ID and security group ID of your Elasticsearch cluster.
-
In the Elasticsearch console, go to the Basic Information page and note the vSwitch ID.
-
In the Elastic Compute Service (ECS) console, go to the Security Groups page and note the security group ID associated with your Elasticsearch cluster. See Create a security group.
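The two identifiers end up in the conf section of the Spark job configuration. As a rough sketch, the following hypothetical helper eni_conf turns them into the three ENI settings used later in this topic. The vsw- and sg- prefix checks are an assumption based on the sample IDs in this topic, not a documented validation rule.

```python
def eni_conf(vswitch_id, security_group_id):
    """Build the ENI-related entries for the conf section of a Spark job."""
    # Prefix checks are assumptions drawn from the sample IDs in this topic.
    if not vswitch_id.startswith("vsw-"):
        raise ValueError(f"Unexpected vSwitch ID: {vswitch_id}")
    if not security_group_id.startswith("sg-"):
        raise ValueError(f"Unexpected security group ID: {security_group_id}")
    return {
        "spark.adb.eni.enabled": "true",
        "spark.adb.eni.vswitchId": vswitch_id,
        "spark.adb.eni.securityGroupId": security_group_id,
    }
```

Catching a swapped or mistyped ID at this point is cheaper than diagnosing a job that cannot reach the Elasticsearch cluster.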
Connect using Scala
-
Add the following dependencies to your pom.xml:
Important: The elasticsearch-spark-30_2.12 version must match your Elasticsearch cluster version. The spark-core_2.12 version must match your AnalyticDB for MySQL Spark version.
    <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30 -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-30_2.12</artifactId>
        <version>7.17.9</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.2.0</version>
        <scope>provided</scope>
    </dependency>
-
Write and package your program. The following sample writes a DataFrame to Elasticsearch and reads it back. Package it as spark-example.jar.
    package org.example

    import org.apache.spark.sql.{SaveMode, SparkSession}

    object SparkEs {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().getOrCreate()

        // Write data to Elasticsearch.
        val columns = Seq("language", "users_count")
        val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
        val writeDF = spark.createDataFrame(data).toDF(columns: _*)
        writeDF.write.format("es").mode(SaveMode.Overwrite)
          .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com")
          .option("es.port", "9200")
          .option("es.net.http.auth.user", "elastic")
          .option("es.net.http.auth.pass", "password")
          .option("es.nodes.wan.only", "true")
          .option("es.nodes.discovery", "false")
          .save("spark/_doc")

        // Read data from Elasticsearch.
        spark.read.format("es")
          .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com")
          .option("es.port", "9200")
          .option("es.net.http.auth.user", "elastic")
          .option("es.net.http.auth.pass", "password")
          .option("es.nodes.wan.only", "true")
          .option("es.nodes.discovery", "false")
          .load("spark/_doc").show()
      }
    }
The parameters are as follows:
| Parameter | Description |
|---|---|
| es.nodes | The private endpoint of the Alibaba Cloud Elasticsearch instance. To obtain the endpoint, see View the basic information of an instance. |
| es.port | The private port number of the Alibaba Cloud Elasticsearch instance. To obtain the port number, see View the basic information of an instance. |
| es.net.http.auth.user | The username of the Alibaba Cloud Elasticsearch instance. The username must be set to elastic. |
| es.net.http.auth.pass | The password of the Alibaba Cloud Elasticsearch instance. |
| es.nodes.wan.only | Specifies whether to connect to the Alibaba Cloud Elasticsearch instance over the Internet. Valid values: true (connects over the Internet); false (default, does not connect over the Internet). Important: This parameter must be set to true when you connect to an Alibaba Cloud Elasticsearch instance. |
| es.nodes.discovery | Specifies whether to automatically discover more endpoints. Valid values: true (default, discovery is enabled); false (discovery is disabled). Important: This parameter must be set to false when you connect to an Alibaba Cloud Elasticsearch instance, so that only the endpoints specified in the es.nodes parameter are used. |
| es.resource | The resource that Spark reads from the Alibaba Cloud Elasticsearch instance, in the <index>/<type> format. For example, index_name/_doc. |
Note: For more information about configuration parameters, see Configuration.
-
Upload spark-example.jar and Elasticsearch-spark-30_2.12-7.17.9.jar to OSS. See Upload objects.
-
Log on to the AnalyticDB for MySQL console. In the upper-left corner, select a region. In the left-side navigation pane, click Clusters, find your cluster, and click the cluster ID.
-
In the left-side navigation pane, choose Job Development > Spark JAR Development.
-
In the upper part of the editor, select a job resource group and set the Spark application type to Batch.
-
Run the following job configuration in the editor:
    {
        "name": "ES-SPARK-EXAMPLE",
        "className": "org.example.SparkEs",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****"
        },
        "file": "oss://testBucketName/spark-example.jar",
        "jars": "oss://testBucketName/Elasticsearch-spark-30_2.12-7.17.9.jar"
    }
The following table describes the parameters.
| Parameter | Description |
|---|---|
| name | Name of the Spark job. |
| className | Entry class of the Java or Scala program. For the sample program in this topic, the entry class is org.example.SparkEs. Not required for Python programs. |
| conf | Spark job configuration in key:value format, separated by commas. For parameters specific to AnalyticDB for MySQL, see Spark application configuration parameters. |
| spark.adb.eni.enabled | Whether to enable ENI. Set to true to route Spark traffic through the Elasticsearch cluster's VPC network, which is required for private connectivity. |
| spark.adb.eni.vswitchId | vSwitch ID of the Elasticsearch cluster, collected in Collect network identifiers. |
| spark.adb.eni.securityGroupId | Security group ID of the Elasticsearch cluster, collected in Collect network identifiers. |
| file | OSS path of the spark-example.jar program. |
| jars | OSS path of the connector JAR dependency. |
-
Click Run Now.
Connect using PySpark
-
Add the following dependencies to your pom.xml (same as the Scala workflow):
    <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30 -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-30_2.12</artifactId>
        <version>7.17.9</version>
        <scope>provided</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.2.0</version>
        <scope>provided</scope>
    </dependency>
-
Write a program named es-spark-example.py. The following sample writes a DataFrame to Elasticsearch and reads it back.
    from pyspark.sql import SparkSession

    if __name__ == '__main__':
        spark = SparkSession.builder.getOrCreate()

        # Write data to Elasticsearch.
        dept = [("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40)]
        deptColumns = ["dept_name", "dept_id"]
        deptDF = spark.createDataFrame(data=dept, schema=deptColumns)
        deptDF.printSchema()
        deptDF.show(truncate=False)
        deptDF.write.format('es').mode("overwrite") \
            .option('es.nodes', 'es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com') \
            .option('es.port', '9200') \
            .option('es.net.http.auth.user', 'elastic') \
            .option('es.net.http.auth.pass', 'password') \
            .option("es.nodes.wan.only", "true") \
            .option("es.nodes.discovery", "false") \
            .save("spark/_doc")

        # Read data from Elasticsearch.
        spark.read.format("es") \
            .option('es.nodes', 'es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com') \
            .option('es.port', '9200') \
            .option('es.net.http.auth.user', 'elastic') \
            .option('es.net.http.auth.pass', 'password') \
            .option("es.nodes.wan.only", "true") \
            .option("es.nodes.discovery", "false") \
            .load("spark/_doc").show()
The following table describes the parameters.
| Parameter | Description |
|---|---|
| es.nodes | The private endpoint of the Alibaba Cloud Elasticsearch instance. To obtain the endpoint, see View the basic information of an instance. |
| es.port | The private port number of the Alibaba Cloud Elasticsearch instance. To obtain the port number, see View the basic information of an instance. |
| es.net.http.auth.user | The username of the Alibaba Cloud Elasticsearch instance. The username must be set to elastic. |
| es.net.http.auth.pass | The password of the Alibaba Cloud Elasticsearch instance. |
| es.nodes.wan.only | Specifies whether to connect to the Alibaba Cloud Elasticsearch instance over the Internet. Valid values: true (connects over the Internet); false (default, does not connect over the Internet). Important: This parameter must be set to true when you connect to an Alibaba Cloud Elasticsearch instance. |
| es.nodes.discovery | Specifies whether to automatically discover more endpoints. Valid values: true (default, discovery is enabled); false (discovery is disabled). Important: This parameter must be set to false when you connect to an Alibaba Cloud Elasticsearch instance, so that only the endpoints specified in the es.nodes parameter are used. |
-
Upload es-spark-example.py and Elasticsearch-spark-30_2.12-7.17.9.jar to OSS. See Upload objects.
-
Log on to the AnalyticDB for MySQL console. In the upper-left corner, select a region. In the left-side navigation pane, click Clusters, find your cluster, and click the cluster ID.
-
In the left-side navigation pane, choose Job Development > Spark JAR Development.
-
In the upper part of the editor, select a job resource group and set the Spark application type to Batch.
-
Run the following job configuration in the editor:
    {
        "name": "ES-SPARK-EXAMPLE",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****"
        },
        "file": "oss://testBucketName/es-spark-example.py",
        "jars": "oss://testBucketName/Elasticsearch-spark-30_2.12-7.17.9.jar"
    }
The parameters are the same as those in the Scala workflow, except that className is not required for Python programs. See the parameter table in Connect using Scala.
-
Click Run Now.
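The two job configurations in this topic have the same shape and differ only in the file path and in whether className is present. The following is a minimal sketch of generating either one, assuming you keep the conf entries in a dictionary. build_job_config is a hypothetical helper, and rejecting a JAR job without an entry class is this sketch's own rule, not console behavior.

```python
import json


def build_job_config(name, file, jars, conf, class_name=None):
    """Assemble a Spark job configuration like the ones shown in this topic."""
    # A JAR program needs an entry class; a Python program does not.
    if file.endswith(".jar") and class_name is None:
        raise ValueError("className is required for Java or Scala programs")
    job = {"name": name}
    if class_name is not None:
        job["className"] = class_name
    job["conf"] = dict(conf)
    job["file"] = file
    job["jars"] = jars
    return job


# Placeholder values copied from the samples in this topic.
sample_conf = {
    "spark.driver.resourceSpec": "small",
    "spark.executor.instances": 1,
    "spark.executor.resourceSpec": "small",
    "spark.adb.eni.enabled": "true",
    "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****",
    "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****",
}
python_job = build_job_config(
    name="ES-SPARK-EXAMPLE",
    file="oss://testBucketName/es-spark-example.py",
    jars="oss://testBucketName/Elasticsearch-spark-30_2.12-7.17.9.jar",
    conf=sample_conf,
)
job_json = json.dumps(python_job, indent=4)
```

The resulting job_json string can be pasted into the editor. Passing class_name and a .jar path produces the Scala variant instead.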
Connection parameters
Both workflows use the same Elasticsearch connector options. The following table describes the parameters set in the code samples.
| Parameter | Value | Description |
|---|---|---|
| es.nodes | Private endpoint of the Elasticsearch cluster | The hostname used to reach the cluster. Use the private endpoint for VPC-internal access. |
| es.port | 9200 | The HTTP port of the Elasticsearch cluster. |
| es.net.http.auth.user | elastic | The username for authentication. Must be set to elastic. |
| es.net.http.auth.pass | Your password | The password for the elastic user. |
| es.nodes.wan.only | true | Disables node discovery and connects only through es.nodes. Required when Elasticsearch runs inside a VPC and its internal nodes are not directly reachable from the Spark cluster. |
| es.nodes.discovery | false | Prevents the connector from expanding the connection list by auto-discovering cluster nodes. Set to false alongside es.nodes.wan.only to ensure all traffic routes through the declared endpoint. |
| Index path | <index>/<type> (for example, spark/_doc) | The target index and document type for read and write operations. |
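Because the read and write calls in both samples repeat the same six options, it can help to define them once. The following is a minimal sketch, assuming the values from the table above. es_connector_options is a hypothetical helper name, and the endpoint and password shown are placeholders.

```python
def es_connector_options(endpoint, password, port="9200", user="elastic"):
    """Return the connector options shared by the read and write calls."""
    return {
        "es.nodes": endpoint,
        "es.port": str(port),
        "es.net.http.auth.user": user,
        "es.net.http.auth.pass": password,
        # Both settings below are fixed, as described in the table above.
        "es.nodes.wan.only": "true",
        "es.nodes.discovery": "false",
    }


opts = es_connector_options(
    endpoint="es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com",
    password="password",
)
```

In the PySpark sample, the dictionary can then be passed in one call, for example deptDF.write.format("es").mode("overwrite").options(**opts).save("spark/_doc") for the write and spark.read.format("es").options(**opts).load("spark/_doc") for the read. This keeps the two calls from drifting apart when the endpoint or credentials change.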