本文介绍了如何使用DLA Spark访问阿里云Elasticsearch。

前提条件

使用JAVA连接Elasticsearch

  1. 从Maven官方仓库下载Elasticsearch-spark-20_2.11对应版本的JAR包,本示例下载JAR包为Elasticsearch-spark-20_2.11-6.3.2.jar。
  2. 将从官方仓库下载的JAR包上传到一个与Elasticsearch同地域的OSS中,如何上传请参见上传文件
  3. 在本地开发工程中添加如下依赖。
     <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.7.0</version>
        </dependency>
    </dependencies>
    注意 请确保pom依赖中版本与云服务对应版本保持一致,例如Elasticsearch-spark-20_2.11版本与阿里云Elasticsearch版本一致;spark-core_2.11与Spark版本一致。
  4. 准备以下测试代码来连接Elasticsearch。您需要替换示例中的es.nodes为您的私网地址,es.net.http.auth.pass为您自己创建Elasticsearch时设置的密码。
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.Elasticsearch.spark.rdd.api.java.JavaEsSpark;
    import java.util.Map;
    
    public class ReadES {
    
        public static void main(String[] args) {
        
            SparkConf  conf = new SparkConf().setAppName("readEs").setMaster("local[*]")
                    .set("es.nodes", "es-cn-n6w1o1x0w001c****.Elasticsearch.aliyuncs.com")
                    .set("es.port", "9200")
                    .set("es.net.http.auth.user", "elastic")
                    .set("es.net.http.auth.pass", "xxxxxx")
                    .set("es.nodes.wan.only", "true")
                    .set("es.nodes.discovery","false")
                    .set("es.input.use.sliced.partitions","false")
                    .set("es.resource", "index_name/_doc")
                    .set("es.scroll.size","500");
    
            JavaSparkContext sc = new JavaSparkContext(conf);
    
            JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc);
    
            for ( Map<String, Object> item : rdd.values().collect()) {
                System.out.println(item);
            }
    
            sc.stop();
        }
    
    }
    代码中出现的参数说明如下。
    参数名称 说明
    es.nodes Elasticsearch的Master角色节点的IP地址,至少需要填入一个Master角色的节点。使用阿里云Elasticsearch时,需填写私网地址。
    es.port Elasticsearch服务的端口号。
    es.net.http.auth.user Elasticsearch服务的用户名。阿里云Elasticsearch的用户名仅支持elastic。
    es.net.http.auth.pass Elasticsearch服务的密码。
    es.nodes.wan.only 是否通过公网连接云端Elasticsearch服务。使用阿里云Elasticsearch时,必须配置为true
    es.nodes.discovery 是否自动发现更多接入点。使用阿里云Elasticsearch时,必须配置为false,代表只是用es.nodes中的接入点。
    es.resource Spark读取的Elasticsearch的数据类型,格式为<index>/<type>。
    注意 更多配置说明,请参见ES配置官方文档
  5. 将测试代码打包成JAR包上传至您的OSS中,本示例JAR包命名为spark-es-examples-0.0.1-SNAPSHOT-shaded.jar。
  6. 登录Data Lake Analytics管理控制台
  7. 在页面左上角,选择Elasticsearch实例所在地域。
  8. 单击左侧导航栏中Serverless Spark > 作业管理
  9. 提交作业如下。您需要分别替换filejars中的JAR包为您自己测试代码和从官方下载的JAR包名称。
    {
      
        "name": "es",
        "className": "com.aliyun.spark.ReadES",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.eni.enable": "true",
            "spark.dla.eni.vswitch.id": "vsw-bp17jqw3lrrobn6y*****",
            "spark.dla.eni.security.group.id": "sg-bp163uxgt4zandx*****"
        },
        "file": "oss://{your bucket}/spark-es-examples-0.0.1-SNAPSHOT-shaded.jar",
        "jars": "oss://{your bucket}/Elasticsearch-spark-20_2.11-6.3.2.jar"
    }
    说明 代码中的参数说明请参见作业配置指南

使用PySpark连接Elasticsearch

  1. 从Maven官方仓库下载Elasticsearch-spark-20_2.11对应版本的JAR包,本示例下载JAR包为Elasticsearch-spark-20_2.11-6.3.2.jar。
  2. 将从官方仓库下载的JAR包上传到一个与Elasticsearch同地域的OSS中,如何上传请参见上传文件
  3. 建立如下内容的exmaple.py
    from pyspark import SparkContext, RDD
    spark = SparkSession \
        .builder \
        .getOrCreate()
    df = spark.read.format("org.Elasticsearch.spark.sql")\
        .option('es.nodes','es-cn-n6w1o1x0w001c****.Elasticsearch.aliyuncs.com') \
        .option('es.port','9200') \
        .option('es.net.http.auth.user', 'xxx') \
        .option('es.net.http.auth.pass', 'xxx') \
        .option("es.nodes.wan.only", "true")\
        .option("es.nodes.discovery", "false")\
        .load("index_name/_doc").show
    代码中出现的参数说明如下。
    参数名称 说明
    es.nodes Elasticsearch的Master角色节点的IP地址,至少需要填入一个Master角色的节点。使用阿里云Elasticsearch时,需填写私网地址。
    es.port Elasticsearch服务的端口号。
    es.net.http.auth.user Elasticsearch服务的用户名。阿里云Elasticsearch的用户名仅支持elastic。
    es.net.http.auth.pass Elasticsearch服务的密码。
    es.nodes.wan.only 是否通过公网连接云端Elasticsearch服务。使用阿里云Elasticsearch时,必须配置为true
    es.nodes.discovery 是否自动发现更多接入点。使用阿里云Elasticsearch时,必须配置为false,代表只是用es.nodes中的接入点。
    es.resource Spark读取的Elasticsearch的数据类型,格式为<index>/<type>。
    注意 更多配置说明,请参见ES配置官方文档
  4. exmaple.py文件上传到OSS中。
  5. 登录Data Lake Analytics管理控制台
  6. 在页面左上角,选择Elasticsearch实例所在地域。
  7. 单击左侧导航栏中Serverless Spark > 作业管理
  8. 提交作业如下。您需要替换jars中的JAR包为您从官方下载的JAR包名称。
    {
        "name": "es",
        "className": "com.aliyun.spark.ReadES",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 5,
            "spark.executor.resourceSpec": "medium",
            "spark.dla.eni.enable": "true",
            "spark.dla.eni.vswitch.id": "vsw-bp17jqw3lrrobn6y*****",
            "spark.dla.eni.security.group.id": "sg-bp163uxgt4zandx*****"
        },
        "file": "oss://{your bucket}/example.py",
        "jars": "oss://{your bucket}/Elasticsearch-spark-20_2.11-6.3.2.jar"
    }
    说明 代码中的参数说明请参见作业配置指南