AnalyticDB: Access an Elasticsearch data source

Last Updated: Apr 02, 2026

Use AnalyticDB for MySQL Data Lakehouse Edition (V3.0) Spark to read from and write to Alibaba Cloud Elasticsearch over an elastic network interface (ENI).

Prerequisites

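Before you begin, make sure you have:

  • An AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster with a job resource group.

  • An Alibaba Cloud Elasticsearch cluster and the password of its elastic user.

  • An Object Storage Service (OSS) bucket to store the program and the connector JAR.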

Collect network identifiers

To connect over an ENI, Spark needs a vSwitch ID and a security group ID from the VPC of your Elasticsearch cluster.

  1. In the Elasticsearch console, go to the Basic Information page of your cluster and note the vSwitch ID.

  2. In the Elastic Compute Service (ECS) console, go to the Security Groups page and note the ID of a security group in the same VPC as your Elasticsearch cluster. If no such security group exists, create one. See Create a security group.
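Before you paste the two IDs into the job configuration, a quick format check can catch copy errors. The following is a minimal sketch, assuming that vSwitch IDs start with `vsw-` and security group IDs start with `sg-` followed by lowercase letters and digits, as in the sample values used in this topic. The function name `check_network_ids` is hypothetical, and the check does not query any Alibaba Cloud API or confirm that the IDs exist.

```python
import re
from typing import List

# Assumed ID shapes, inferred from the sample values in this topic
# (for example, vsw-bp17jqw3lrrobn6y4**** and sg-bp163uxgt4zandx1****).
VSWITCH_PATTERN = re.compile(r"^vsw-[a-z0-9]+$")
SECURITY_GROUP_PATTERN = re.compile(r"^sg-[a-z0-9]+$")


def check_network_ids(vswitch_id: str, security_group_id: str) -> List[str]:
    """Return a list of problems; an empty list means both IDs look well-formed.

    Only the string format is checked. The function does not verify that the
    IDs exist or that they belong to the Elasticsearch cluster's VPC.
    """
    problems = []
    if not VSWITCH_PATTERN.match(vswitch_id.strip()):
        problems.append(f"unexpected vSwitch ID format: {vswitch_id!r}")
    if not SECURITY_GROUP_PATTERN.match(security_group_id.strip()):
        problems.append(f"unexpected security group ID format: {security_group_id!r}")
    return problems


if __name__ == "__main__":
    # Hypothetical, unmasked IDs used only for illustration.
    print(check_network_ids("vsw-bp17jqw3lrrobn6y4abcd", "sg-bp163uxgt4zandx1abcd"))
```

Masked values such as the samples in this topic contain `*` and are reported as problems, which is intended: they must be replaced with the real IDs before the job runs.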

Connect using Scala

  1. Add the following dependencies to your pom.xml:

    Important

    The elasticsearch-spark-30_2.12 version must match your Elasticsearch cluster version. The spark-core_2.12 version must match your AnalyticDB for MySQL Spark version.

    <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30 -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-30_2.12</artifactId>
        <version>7.17.9</version>
        <scope>provided</scope>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.2.0</version>
        <scope>provided</scope>
    </dependency>
  2. Write your program and package it as spark-example.jar. The following sample writes a DataFrame to Elasticsearch and reads it back.

    package org.example
    
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    object SparkEs {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().getOrCreate()
    
        // Connector options shared by the write and the read.
        // Replace the endpoint and password with the values of your instance.
        val esOptions = Map(
          "es.nodes" -> "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com",
          "es.port" -> "9200",
          "es.net.http.auth.user" -> "elastic",
          "es.net.http.auth.pass" -> "password",
          "es.nodes.wan.only" -> "true",
          "es.nodes.discovery" -> "false"
        )
    
        // Write data to Elasticsearch (index "spark", type "_doc").
        val columns = Seq("language", "users_count")
        val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
        val writeDF = spark.createDataFrame(data).toDF(columns: _*)
    
        writeDF.write.format("es").mode(SaveMode.Overwrite)
          .options(esOptions)
          .save("spark/_doc")
    
        // Read the same index back and print it.
        spark.read.format("es")
          .options(esOptions)
          .load("spark/_doc")
          .show()
    
        spark.stop()
      }
    }

    For descriptions of the connector options used in this sample, see Connection parameters. To obtain the private endpoint and port of your instance, see View the basic information of an instance.

    Note

    For the full list of connector options, see Configuration.

  3. Upload spark-example.jar and the connector JAR elasticsearch-spark-30_2.12-7.17.9.jar to OSS. Because the connector dependency uses the provided scope, it is not packaged into spark-example.jar; download it from the Maven repository linked in step 1. See Upload objects.

  4. Log on to the AnalyticDB for MySQL console. In the upper-left corner, select a region. In the left-side navigation pane, click Clusters, find your cluster, and click the cluster ID.

  5. In the left-side navigation pane, choose Job Development > Spark JAR Development.

  6. In the upper part of the editor, select a job resource group and set the Spark application type to Batch.

  7. Enter the following job configuration in the editor:

    {
        "name": "ES-SPARK-EXAMPLE",
        "className": "org.example.SparkEs",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****"
        },
        "file": "oss://testBucketName/spark-example.jar",
        "jars": "oss://testBucketName/elasticsearch-spark-30_2.12-7.17.9.jar"
    }

    The following table describes the parameters.

    Parameter Description
    name Name of the Spark job.
    className Entry class of the Java or Scala program, including the package name. For the sample program, this is org.example.SparkEs. Not required for Python programs.
    conf Spark job configuration in key:value format, separated by commas. For parameters specific to AnalyticDB for MySQL, see Spark application configuration parameters.
    spark.adb.eni.enabled Whether to enable ENI. Set to true so that Spark reaches the Elasticsearch cluster's VPC through an ENI, which is required for private connectivity.
    spark.adb.eni.vswitchId vSwitch ID of the Elasticsearch cluster, collected in Collect network identifiers.
    spark.adb.eni.securityGroupId ID of a security group in the Elasticsearch cluster's VPC, collected in Collect network identifiers.
    file OSS path of the spark-example.jar program.
    jars OSS path of the connector JAR dependency.

  8. Click Run Now.
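If you submit this job repeatedly with different programs, generating the job configuration in code can keep the ENI settings consistent. The following Python sketch assembles the same JSON as the sample in step 7. The function `build_job_config` and its fixed "small" resource specs are hypothetical choices for illustration; the function only produces text to paste into the editor and does not call any AnalyticDB for MySQL API. It omits `className` for `.py` files, matching the PySpark workflow.

```python
import json
from typing import Optional


def build_job_config(
    name: str,
    file: str,
    jars: str,
    vswitch_id: str,
    security_group_id: str,
    class_name: Optional[str] = None,
) -> str:
    """Build a Spark job configuration as a JSON string.

    Hypothetical helper: the keys mirror the sample configuration in this
    topic, and the resource specs are fixed to the sample's "small" values.
    """
    if file.endswith(".py"):
        # Python programs have no entry class, so any class_name is ignored.
        class_name = None
    elif not class_name:
        raise ValueError("className is required for Java or Scala programs")

    config = {"name": name}
    if class_name:
        config["className"] = class_name
    config["conf"] = {
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 1,
        "spark.executor.resourceSpec": "small",
        # Reach the Elasticsearch cluster's VPC through an ENI.
        "spark.adb.eni.enabled": "true",
        "spark.adb.eni.vswitchId": vswitch_id,
        "spark.adb.eni.securityGroupId": security_group_id,
    }
    config["file"] = file
    config["jars"] = jars
    return json.dumps(config, indent=4)


if __name__ == "__main__":
    print(build_job_config(
        name="ES-SPARK-EXAMPLE",
        file="oss://testBucketName/spark-example.jar",
        jars="oss://testBucketName/elasticsearch-spark-30_2.12-7.17.9.jar",
        vswitch_id="vsw-bp17jqw3lrrobn6y4****",
        security_group_id="sg-bp163uxgt4zandx1****",
        class_name="org.example.SparkEs",
    ))
```

Raising an error when a JAR program has no entry class surfaces the mistake before submission instead of at job startup.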

Connect using PySpark

  1. Download the connector JAR elasticsearch-spark-30_2.12-7.17.9.jar, the same connector used in the Scala workflow, from the Maven repository (https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30). A PySpark program is not built with Maven, so no pom.xml is needed.

    Important

    The connector version must match your Elasticsearch cluster version.
  2. Write a program named es-spark-example.py. The following sample writes a DataFrame to Elasticsearch and reads it back.

    from pyspark.sql import SparkSession
    
    if __name__ == '__main__':
        spark = SparkSession.builder.getOrCreate()
    
        # Connector options shared by the write and the read.
        # Replace the endpoint and password with the values of your instance.
        es_options = {
            'es.nodes': 'es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com',
            'es.port': '9200',
            'es.net.http.auth.user': 'elastic',
            'es.net.http.auth.pass': 'password',
            'es.nodes.wan.only': 'true',
            'es.nodes.discovery': 'false',
        }
    
        # Write data to Elasticsearch (index "spark", type "_doc").
        dept = [("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40)]
        deptColumns = ["dept_name", "dept_id"]
        deptDF = spark.createDataFrame(data=dept, schema=deptColumns)
        deptDF.printSchema()
        deptDF.show(truncate=False)
    
        deptDF.write.format('es').mode('overwrite') \
            .options(**es_options) \
            .save('spark/_doc')
    
        # Read the same index back and print it.
        spark.read.format('es') \
            .options(**es_options) \
            .load('spark/_doc').show()
    
        spark.stop()

  3. Upload es-spark-example.py and elasticsearch-spark-30_2.12-7.17.9.jar to OSS. See Upload objects.

  4. Log on to the AnalyticDB for MySQL console. In the upper-left corner, select a region. In the left-side navigation pane, click Clusters, find your cluster, and click the cluster ID.

  5. In the left-side navigation pane, choose Job Development > Spark JAR Development.

  6. In the upper part of the editor, select a job resource group and set the Spark application type to Batch.

  7. Enter the following job configuration in the editor:

    {
        "name": "ES-SPARK-EXAMPLE",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****"
        },
        "file": "oss://testBucketName/es-spark-example.py",
        "jars": "oss://testBucketName/elasticsearch-spark-30_2.12-7.17.9.jar"
    }

    The parameters are the same as those in the Scala workflow, except className is not required for Python programs. See the parameter table in Connect using Scala.

  8. Click Run Now.
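The PySpark sample hard-codes the endpoint and the password. A common alternative is to read these settings from environment variables at run time. The following is a minimal sketch; the variable names `ES_NODES`, `ES_PORT`, and `ES_PASSWORD` and the function `es_options_from_env` are hypothetical, and how you make such variables available to the Spark driver depends on your setup and is not covered here.

```python
import os
from typing import Dict, Mapping, Optional


def es_options_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build the connector options used in the samples from environment variables.

    The variable names are hypothetical. ES_NODES and ES_PASSWORD are required;
    ES_PORT falls back to 9200, the port used in the samples.
    """
    env = os.environ if env is None else env
    missing = [key for key in ("ES_NODES", "ES_PASSWORD") if not env.get(key)]
    if missing:
        raise KeyError(f"missing environment variables: {', '.join(missing)}")
    return {
        "es.nodes": env["ES_NODES"],
        "es.port": env.get("ES_PORT", "9200"),
        "es.net.http.auth.user": "elastic",
        "es.net.http.auth.pass": env["ES_PASSWORD"],
        # Connect only through es.nodes and do not discover other cluster nodes.
        "es.nodes.wan.only": "true",
        "es.nodes.discovery": "false",
    }
```

In es-spark-example.py, the returned dictionary can be passed to both the writer and the reader with PySpark's `options(**opts)`, for example `deptDF.write.format('es').mode('overwrite').options(**opts).save('spark/_doc')`. Accepting an optional mapping instead of reading `os.environ` directly keeps the function easy to test.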

Connection parameters

Both workflows use the same Elasticsearch connector options. The following table describes the parameters set in the code samples.

Parameter Description
es.nodes The private endpoint of the Elasticsearch cluster, for example es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com. Use the private endpoint for access from within the VPC. To obtain it, see View the basic information of an instance.
es.port The private HTTP port of the Elasticsearch cluster. The samples use 9200.
es.net.http.auth.user The username for authentication. Must be set to elastic.
es.net.http.auth.pass The password of the elastic user.
es.nodes.wan.only Must be set to true. Disables node discovery and connects only through the endpoints in es.nodes. Required when Elasticsearch runs inside a VPC and its internal nodes are not directly reachable from the Spark cluster.
es.nodes.discovery Must be set to false. Prevents the connector from expanding its connection list with auto-discovered cluster nodes, so that all traffic goes through the endpoints in es.nodes.
Index path The target index and type for read and write operations, in <index>/<type> format (for example, spark/_doc). In the samples, the path is passed to save() and load().
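The required values in the table can be checked before a job is submitted. The following sketch, built around the hypothetical functions `check_es_settings` and `split_resource`, flags connector options that are missing or that differ from the values this topic lists as required, and verifies that the index path has the `<index>/<type>` form used in the samples. It only inspects the settings; it does not contact the cluster.

```python
from typing import List, Mapping, Tuple

# Values that this topic lists as required for Alibaba Cloud Elasticsearch.
REQUIRED_VALUES = {
    "es.net.http.auth.user": "elastic",
    "es.nodes.wan.only": "true",
    "es.nodes.discovery": "false",
}


def split_resource(resource: str) -> Tuple[str, str]:
    """Split an '<index>/<type>' path such as 'spark/_doc' into (index, type)."""
    index, sep, doc_type = resource.partition("/")
    if not index or not sep or not doc_type or "/" in doc_type:
        raise ValueError(f"expected '<index>/<type>', got {resource!r}")
    return index, doc_type


def check_es_settings(options: Mapping[str, str], resource: str) -> List[str]:
    """Return a list of problems; an empty list means the settings look consistent."""
    problems = []
    # Options that must be present but have instance-specific values.
    for key in ("es.nodes", "es.port", "es.net.http.auth.pass"):
        if not options.get(key):
            problems.append(f"{key} is not set")
    # Options that must have one specific value.
    for key, expected in REQUIRED_VALUES.items():
        actual = options.get(key)
        if actual != expected:
            problems.append(f"{key} should be {expected!r}, got {actual!r}")
    try:
        split_resource(resource)
    except ValueError as err:
        problems.append(str(err))
    return problems
```

Returning a list of problems instead of raising on the first one lets you report every misconfigured option in a single pass.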