本文介绍了如何使用DLA Spark访问阿里云Elasticsearch。
前提条件
- 创建了Spark虚拟集群。
- 创建了Elasticsearch实例,并获得了私网地址和用户名密码。具体操作请参见创建阿里云Elasticsearch实例。
- 配置了Elasticsearch实例的白名单。具体操作请参见配置ES白名单。
使用Java连接Elasticsearch
- 从Maven官方仓库下载Elasticsearch-spark-20_2.11对应版本的JAR包,本示例下载JAR包为Elasticsearch-spark-20_2.11-6.3.2.jar。
- 将从官方仓库下载的JAR包上传到一个与Elasticsearch同地域的OSS中,如何上传请参见控制台上传文件。
- 在本地开发工程中添加如下依赖。
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.4.5</version> </dependency> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-20_2.11</artifactId> <version>6.7.0</version> </dependency> </dependencies>
重要 请确保pom依赖中版本与云服务对应版本保持一致,例如Elasticsearch-spark-20_2.11版本与阿里云Elasticsearch版本一致;spark-core_2.11与Spark版本一致。 - 准备以下测试代码来连接Elasticsearch。您需要替换示例中的
es.nodes
为您的私网地址,es.net.http.auth.pass
为您自己创建Elasticsearch时设置的密码。import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; import java.util.Map; public class ReadES { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("readEs").setMaster("local[*]") .set("es.nodes", "es-cn-n6w1o1x0w001c****.Elasticsearch.aliyuncs.com") .set("es.port", "9200") .set("es.net.http.auth.user", "elastic") .set("es.net.http.auth.pass", "xxxxxx") .set("es.nodes.wan.only", "true") .set("es.nodes.discovery","false") .set("es.input.use.sliced.partitions","false") .set("es.resource", "index_name/_doc") .set("es.scroll.size","500"); JavaSparkContext sc = new JavaSparkContext(conf); JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc); for ( Map<String, Object> item : rdd.values().collect()) { System.out.println(item); } sc.stop(); } }
代码中出现的参数说明如下。参数名称 说明 es.nodes
Elasticsearch的Master角色节点的IP地址,至少需要填入一个Master角色的节点。使用阿里云Elasticsearch时,需填写私网地址。 es.port
Elasticsearch服务的端口号。 es.net.http.auth.user
Elasticsearch服务的用户名。阿里云Elasticsearch的用户名仅支持elastic。 es.net.http.auth.pass
Elasticsearch服务的密码。 es.nodes.wan.only
是否通过公网连接云端Elasticsearch服务。使用阿里云Elasticsearch时,必须配置为 true
。es.nodes.discovery
是否自动发现更多接入点。使用阿里云Elasticsearch时,必须配置为 false
,代表只是用es.nodes
中的接入点。es.resource
Spark读取的Elasticsearch的数据类型,格式为<index>/<type>。 重要 更多配置说明,请参见ES配置官方文档。 - 将测试代码打包成JAR包上传至您的OSS中,本示例JAR包命名为spark-es-examples-0.0.1-SNAPSHOT-shaded.jar。
- 登录Data Lake Analytics管理控制台。
- 在页面左上角,选择Elasticsearch实例所在地域。
- 单击左侧导航栏中 。
- 提交作业如下。您需要分别替换
file
和jars
中的JAR包为您自己测试代码和从官方下载的JAR包名称。{ "name": "es", "className": "com.aliyun.spark.ReadES", "conf": { "spark.driver.resourceSpec": "medium", "spark.executor.instances": 5, "spark.executor.resourceSpec": "medium", "spark.dla.eni.enable": "true", "spark.dla.eni.vswitch.id": "vsw-bp17jqw3lrrobn6y*****", "spark.dla.eni.security.group.id": "sg-bp163uxgt4zandx*****" }, "file": "oss://{your bucket}/spark-es-examples-0.0.1-SNAPSHOT-shaded.jar", "jars": "oss://{your bucket}/Elasticsearch-spark-20_2.11-6.3.2.jar" }
说明 代码中的参数说明请参见作业配置指南。
使用PySpark连接Elasticsearch
- 从Maven官方仓库下载Elasticsearch-spark-20_2.11对应版本的JAR包,本示例下载JAR包为Elasticsearch-spark-20_2.11-6.3.2.jar。
- 将从官方仓库下载的JAR包上传到一个与Elasticsearch同地域的OSS中,如何上传请参见控制台上传文件。
- 建立如下内容的
exmaple.py
。from pyspark import SparkContext, RDD spark = SparkSession \ .builder \ .getOrCreate() df = spark.read.format("org.elasticsearch.spark.sql")\ .option('es.nodes','es-cn-n6w1o1x0w001c****.Elasticsearch.aliyuncs.com') \ .option('es.port','9200') \ .option('es.net.http.auth.user', 'xxx') \ .option('es.net.http.auth.pass', 'xxx') \ .option("es.nodes.wan.only", "true")\ .option("es.nodes.discovery", "false")\ .load("index_name/_doc").show
代码中出现的参数说明如下。参数名称 说明 es.nodes
Elasticsearch的Master角色节点的IP地址,至少需要填入一个Master角色的节点。使用阿里云Elasticsearch时,需填写私网地址。 es.port
Elasticsearch服务的端口号。 es.net.http.auth.user
Elasticsearch服务的用户名。阿里云Elasticsearch的用户名仅支持elastic。 es.net.http.auth.pass
Elasticsearch服务的密码。 es.nodes.wan.only
是否通过公网连接云端Elasticsearch服务。使用阿里云Elasticsearch时,必须配置为 true
。es.nodes.discovery
是否自动发现更多接入点。使用阿里云Elasticsearch时,必须配置为 false
,代表只是用es.nodes
中的接入点。es.resource
Spark读取的Elasticsearch的数据类型,格式为<index>/<type>。 重要 更多配置说明,请参见ES配置官方文档。 - 将
exmaple.py
文件上传到OSS中。 - 登录Data Lake Analytics管理控制台。
- 在页面左上角,选择Elasticsearch实例所在地域。
- 单击左侧导航栏中 。
- 提交作业如下。您需要替换
jars
中的JAR包为您从官方下载的JAR包名称。{ "name": "es", "className": "com.aliyun.spark.ReadES", "conf": { "spark.driver.resourceSpec": "medium", "spark.executor.instances": 5, "spark.executor.resourceSpec": "medium", "spark.dla.eni.enable": "true", "spark.dla.eni.vswitch.id": "vsw-bp17jqw3lrrobn6y*****", "spark.dla.eni.security.group.id": "sg-bp163uxgt4zandx*****" }, "file": "oss://{your bucket}/example.py", "jars": "oss://{your bucket}/Elasticsearch-spark-20_2.11-6.3.2.jar" }
说明 代码中的参数说明请参见作业配置指南。