This topic describes how to use AnalyticDB for MySQL Data Lakehouse Edition (V3.0) Spark to read Alibaba Cloud Elasticsearch data over an elastic network interface (ENI).
Prerequisites
An AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster is created. For more information, see Create a cluster.
A job resource group is created. For more information, see Create a resource group.
A database account is created.
If you use an Alibaba Cloud account, you must create a privileged database account. For more information, see the "Create a privileged account" section of the Create a database account topic.
If you use a Resource Access Management (RAM) user, you must create a privileged account and a standard account and associate the standard account with the RAM user. For more information, see Create a database account and Associate or disassociate a database account with or from a RAM user.
An Alibaba Cloud Elasticsearch cluster is created. For more information, see Create an Alibaba Cloud Elasticsearch cluster.
The IP address of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster is added to a whitelist of the Elasticsearch cluster. For more information, see Configure a public or private IP address whitelist for an Elasticsearch cluster.
OSS is activated and a bucket is created in the same region as the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster. For more information, see Activate OSS and Create a bucket.
Preparations
Log on to the Elasticsearch console and go to the Basic Information page to obtain the vSwitch ID of the Elasticsearch cluster.
Log on to the Elastic Compute Service (ECS) console and go to the Security Groups page to obtain the ID of the security group to which the Elasticsearch cluster is added. For more information, see Create a security group.
Use Scala to connect to Alibaba Cloud Elasticsearch
Download the JAR package that corresponds to the Elasticsearch cluster version. For more information, see Elasticsearch Spark. In this example, the Elasticsearch-spark-30_2.12-7.17.9.jar package is downloaded.
Add the following dependencies to the pom.xml file:
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30 --> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-30_2.12</artifactId> <version>7.17.9</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.2.0</version> <scope>provided</scope> </dependency>
ImportantMake sure that the version of Elasticsearch-spark-30_2.12 in the pom.xml file is the same as that of the Elasticsearch cluster, and that the version of Spark-core_2.12 is the same as that of AnalyticDB for MySQL Spark.
Write and package a program. In this example, the generated package is named
spark-example.jar
. Sample code:package org.example import org.apache.spark.sql.{SaveMode, SparkSession} object SparkEs { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate(); // Generate a DataFrame. val columns = Seq("language","users_count") val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000")) val writeDF = spark.createDataFrame(data).toDF(columns:_*) // Write data to the Elasticsearch cluster. writeDF.write.format("es").mode(SaveMode.Overwrite) // Specify the private endpoint of the Elasticsearch cluster. .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com") // Specify the port number of the Elasticsearch cluster. .option("es.port", "9200") // Specify the username that is used to connect to the Elasticsearch cluster. It must be set to elastic. .option("es.net.http.auth.user", "elastic") // Specify the password that corresponds to the username. .option("es.net.http.auth.pass", "password") // Set es.nodes.wan.only to true. .option("es.nodes.wan.only", "true") // Set es.nodes.discovery to false. .option("es.nodes.discovery", "false") // Specify the type of data that is read by Spark from the Elasticsearch cluster in the <index>/<type> format. .save("spark/_doc") // Read the data. spark.read.format("es") // Specify the private endpoint of the Elasticsearch cluster. .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com") // Specify the port number of the Elasticsearch cluster. .option("es.port", "9200") // Specify the username that is used to connect to the Elasticsearch cluster. It must be set to elastic. .option("es.net.http.auth.user", "elastic") // Specify the password that corresponds to the username. .option("es.net.http.auth.pass", "password") // Set es.nodes.wan.only to true. .option("es.nodes.wan.only", "true") // Set es.nodes.discovery to false. .option("es.nodes.discovery", "false") // Specify the type of data that is read by Spark from the Elasticsearch cluster in the <index>/<type> format. .load("spark/_doc").show } }
Upload the JAR package downloaded in Step 1 and the
spark-example.jar
program to OSS. For more information, see Upload objects.Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Data Lakehouse Edition (V3.0) tab, find the cluster that you want to manage and click the cluster ID.
In the left-side navigation pane, choose
.In the upper part of the editor, select a job resource group and a Spark application type. In this example, the Batch type is selected.
Run the following code in the editor:
{ "name": "ES-SPARK-EXAMPLE", "className": "com.aliyun.spark.ReadES", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small", "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****" }, "file": "oss://testBucketName/spark-example.jar", "jars": "oss://testBucketName/Elasticsearch-spark-30_2.12-7.17.9.jar" }
The following table describes the parameters.
Parameter
Description
name
The name of the Spark job.
className
The entry class of the Java or Scala program. This parameter is not required for a Python program.
conf
The configuration parameters that are required for the Spark job, which are similar to those of Apache Spark. The parameters must be in the
key:value
format. Separate multiple parameters with commas (,). For information about the configuration parameters that are different from the configuration parameters of Apache Spark or the configuration parameters that are specific to AnalyticDB for MySQL, see Spark application configuration parameters.spark.adb.eni.enabled
Specifies whether to enable ENI. You must enable ENI when you use AnalyticDB for MySQL Data Lakehouse Edition (V3.0) Spark to access Elasticsearch.
spark.adb.eni.vswitchId
The vSwitch ID of the Elasticsearch cluster. For information about how to obtain the vSwitch ID, see the "Preparations" section of this topic.
spark.adb.eni.securityGroupId
The ID of the security group to which the Elasticsearch cluster is added. For information about how to obtain the security group ID, see the "Preparations" section of this topic.
file
The OSS path of the
spark-example.jar
program.jars
The OSS path of the JAR package on which the Spark job depends.
Click Run Now.
Use PySpark to connect to Alibaba Cloud Elasticsearch
Download the JAR package that corresponds to the Elasticsearch cluster version. For more information, see Elasticsearch Spark. In this example, the Elasticsearch-spark-30_2.12-7.17.9.jar package is downloaded.
Add the following dependencies to the pom.xml file:
<!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30 --> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-30_2.12</artifactId> <version>7.17.9</version> <scope>provided</scope> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.2.0</version> <scope>provided</scope> </dependency>
ImportantMake sure that the version of Elasticsearch-spark-30_2.12 in the pom.xml file is the same as that of the Elasticsearch cluster, and that the version of Spark-core_2.12 is the same as that of AnalyticDB for MySQL Spark.
Write a program and name it as
es-spark-example.py
. Sample code:from pyspark.sql import SparkSession if __name__ == '__main__': spark = SparkSession \ .builder \ .getOrCreate() # Generate a DataFrame. dept = [("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40) ] deptColumns = ["dept_name", "dept_id"] deptDF = spark.createDataFrame(data=dept, schema=deptColumns) deptDF.printSchema() deptDF.show(truncate=False) # Write data to the Elasticsearch cluster. deptDF.write.format('es').mode("overwrite") \ # Specify the private endpoint of the Elasticsearch cluster. .option('es.nodes', 'es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com') \ # Specify the port number of the Elasticsearch cluster. .option('es.port', '9200') \ # Specify the username that is used to connect to the Elasticsearch cluster. It must be set to elastic. .option('es.net.http.auth.user', 'elastic') \ # Specify the password that corresponds to the username. .option('es.net.http.auth.pass', 'password') \ # Set es.nodes.wan.only to true. .option("es.nodes.wan.only", "true") \ # Set es.nodes.discovery to false. .option("es.nodes.discovery", "false") \ # Specify the type of data that is read by Spark from the Elasticsearch cluster in the <index>/<type> format. .save("spark/_doc") # Read the data. df = spark.read.format("es") \ # Specify the private endpoint of the Elasticsearch cluster. .option('es.nodes', 'es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com') \ # Specify the port number of the Elasticsearch cluster. .option('es.port', '9200') \ # Specify the username that is used to connect to the Elasticsearch cluster. It must be set to elastic. .option('es.net.http.auth.user', 'elastic') \ # Specify the password that corresponds to the username. .option('es.net.http.auth.pass', 'password') \ # Set es.nodes.wan.only to true. .option("es.nodes.wan.only", "true") \ # Set es.nodes.discovery to false. .option("es.nodes.discovery", "false") \ # Specify the type of data that is read by Spark from the Elasticsearch cluster in the <index>/<type> format. .load("spark/_doc").show
Upload the JAR package downloaded in Step 1 and the
es-spark-example.py
program to OSS. For more information, see Upload objects.Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Data Lakehouse Edition (V3.0) tab, find the cluster that you want to manage and click the cluster ID.
In the left-side navigation pane, choose
.In the upper part of the editor, select a job resource group and a Spark application type. In this example, the Batch type is selected.
Run the following code in the editor:
{ "name": "ES-SPARK-EXAMPLE", "conf": { "spark.driver.resourceSpec": "small", "spark.executor.instances": 1, "spark.executor.resourceSpec": "small", "spark.adb.eni.enabled": "true", "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****", "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****" }, "file": "oss://testBucketName/es-spark-example.py", "jars": "oss://testBucketName/Elasticsearch-spark-30_2.12-7.17.9.jar" }
The following table describes the parameters.
Parameter
Description
name
The name of the Spark job.
conf
The configuration parameters that are required for the Spark job, which are similar to those of Apache Spark. The parameters must be in the
key:value
format. Separate multiple parameters with commas (,). For information about the configuration parameters that are different from the configuration parameters of Apache Spark or the configuration parameters that are specific to AnalyticDB for MySQL, see Spark application configuration parameters.spark.adb.eni.enabled
Specifies whether to enable ENI. You must enable ENI when you use AnalyticDB for MySQL Data Lakehouse Edition (V3.0) Spark to access Elasticsearch.
spark.adb.eni.vswitchId
The vSwitch ID of the Elasticsearch cluster. For information about how to obtain the vSwitch ID, see the "Preparations" section of this topic.
spark.adb.eni.securityGroupId
The ID of the security group to which the Elasticsearch cluster is added. For information about how to obtain the security group ID, see the "Preparations" section of this topic.
file
The OSS path of the
es-spark-example.py
program.jars
The OSS path of the JAR package on which the Spark job depends.
Click Run Now.