AnalyticDB for MySQL: Access Alibaba Cloud Elasticsearch

Last Updated: Mar 29, 2024

This topic describes how to use AnalyticDB for MySQL Data Lakehouse Edition (V3.0) Spark to read data from and write data to Alibaba Cloud Elasticsearch over an elastic network interface (ENI).

Prerequisites

Preparations

  1. Log on to the Elasticsearch console and go to the Basic Information page to obtain the vSwitch ID of the Elasticsearch cluster.

  2. Log on to the Elastic Compute Service (ECS) console and go to the Security Groups page to obtain the ID of the security group to which the Elasticsearch cluster is added. For more information, see Create a security group.
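
If you prefer to retrieve the security group information programmatically instead of in the console, the following sketch lists the security groups in a VPC by calling the ECS DescribeSecurityGroups API through the aliyun-python-sdk-ecs package. The AccessKey pair, region ID, and VPC ID are placeholders for illustration, not values from this topic.

    import json

    from aliyunsdkcore.client import AcsClient
    from aliyunsdkecs.request.v20140526.DescribeSecurityGroupsRequest import DescribeSecurityGroupsRequest

    # Placeholder credentials and region ID; replace them with your own values.
    client = AcsClient("<access_key_id>", "<access_key_secret>", "cn-hangzhou")

    # Limit the query to the VPC in which the Elasticsearch cluster runs (placeholder ID).
    request = DescribeSecurityGroupsRequest()
    request.set_VpcId("vpc-bp1**********")
    request.set_accept_format("json")

    # Print the ID and name of each security group in the VPC.
    response = json.loads(client.do_action_with_exception(request))
    for group in response["SecurityGroups"]["SecurityGroup"]:
        print(group["SecurityGroupId"], group["SecurityGroupName"])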

Use Scala to connect to Alibaba Cloud Elasticsearch

  1. Download the JAR package that corresponds to the Elasticsearch cluster version. For more information, see Elasticsearch Spark. In this example, the elasticsearch-spark-30_2.12-7.17.9.jar package is downloaded.

  2. Add the following dependencies to the pom.xml file:

    <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30 -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-30_2.12</artifactId>
        <version>7.17.9</version>
        <scope>provided</scope>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.2.0</version>
        <scope>provided</scope>
    </dependency>
    Important

    Make sure that the version of elasticsearch-spark-30_2.12 in the pom.xml file matches the version of the Elasticsearch cluster, and that the version of spark-core_2.12 matches the Spark version of AnalyticDB for MySQL.
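
    A quick way to confirm the cluster version is to query the root endpoint of the Elasticsearch cluster, which returns the version in the version.number field. The following sketch assumes the placeholder endpoint and credentials that are used in the sample code of this topic:

        import base64
        import json
        import urllib.request

        # Placeholder endpoint and credentials from the examples in this topic.
        url = "http://es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com:9200/"
        token = base64.b64encode(b"elastic:password").decode("ascii")

        # Query the root endpoint with basic authentication.
        request = urllib.request.Request(url, headers={"Authorization": "Basic " + token})
        with urllib.request.urlopen(request) as response:
            info = json.load(response)

        # The connector version should match this value, for example, 7.17.9.
        print(info["version"]["number"])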

  3. Write and package a program. In this example, the generated package is named spark-example.jar. Sample code:

    package org.example
    
    import org.apache.spark.sql.{SaveMode, SparkSession}
    
    object SparkEs {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().getOrCreate()
    
        // Generate a DataFrame.
        val columns = Seq("language","users_count")
        val data = Seq(("Java", "20000"), ("Python", "100000"), ("Scala", "3000"))
        val writeDF = spark.createDataFrame(data).toDF(columns:_*)
    
        // Write data to the Elasticsearch cluster.
        writeDF.write.format("es").mode(SaveMode.Overwrite)
        // Specify the private endpoint of the Elasticsearch cluster.
        .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com")
        // Specify the port number of the Elasticsearch cluster.
        .option("es.port", "9200")
        // Specify the username that is used to connect to the Elasticsearch cluster. It must be set to elastic.
        .option("es.net.http.auth.user", "elastic")
        // Specify the password that corresponds to the username.
        .option("es.net.http.auth.pass", "password")
        // Set es.nodes.wan.only to true. 
        .option("es.nodes.wan.only", "true")
        // Set es.nodes.discovery to false.
        .option("es.nodes.discovery", "false")
        // Specify the index and type to which data is written, in the <index>/<type> format.
        .save("spark/_doc")
    
        // Read the data.
        spark.read.format("es")
        // Specify the private endpoint of the Elasticsearch cluster.
        .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com")
        // Specify the port number of the Elasticsearch cluster.
        .option("es.port", "9200")
        // Specify the username that is used to connect to the Elasticsearch cluster. It must be set to elastic.
        .option("es.net.http.auth.user", "elastic")
        // Specify the password that corresponds to the username.
        .option("es.net.http.auth.pass", "password")
        // Set es.nodes.wan.only to true. 
        .option("es.nodes.wan.only", "true")
        // Set es.nodes.discovery to false.
        .option("es.nodes.discovery", "false")
        // Specify the index and type from which data is read, in the <index>/<type> format.
        .load("spark/_doc").show
      }
    }
  4. Upload the JAR package downloaded in Step 1 and the spark-example.jar program to OSS. For more information, see Upload objects.
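
    As an alternative to the console, the following sketch uploads the two files by using the oss2 Python SDK. The AccessKey pair, endpoint, and local file paths are placeholders; testBucketName is the example bucket used in this topic.

        import oss2

        # Placeholder credentials and endpoint; replace them with your own values.
        auth = oss2.Auth("<access_key_id>", "<access_key_secret>")
        bucket = oss2.Bucket(auth, "https://oss-cn-hangzhou.aliyuncs.com", "testBucketName")

        # Upload the connector JAR and the packaged program used in this example.
        bucket.put_object_from_file("elasticsearch-spark-30_2.12-7.17.9.jar",
                                    "elasticsearch-spark-30_2.12-7.17.9.jar")
        bucket.put_object_from_file("spark-example.jar", "spark-example.jar")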

  5. Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Data Lakehouse Edition (V3.0) tab, find the cluster that you want to manage and click the cluster ID.

  6. In the left-side navigation pane, choose Job Development > Spark JAR Development.

  7. In the upper part of the editor, select a job resource group and a Spark application type. In this example, the Batch type is selected.

  8. Enter the following job configuration in the editor:

    {
        "name": "ES-SPARK-EXAMPLE",
        "className": "org.example.SparkEs",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****"
        },
        "file": "oss://testBucketName/spark-example.jar",
        "jars": "oss://testBucketName/Elasticsearch-spark-30_2.12-7.17.9.jar"
    }

    The following list describes the parameters.

    name: The name of the Spark job.

    className: The entry class of the Java or Scala program. This parameter is not required for a Python program.

    conf: The configuration parameters that are required for the Spark job, which are similar to those of Apache Spark. The parameters must be in the key: value format. Separate multiple parameters with commas (,). For information about the configuration parameters that differ from those of Apache Spark or that are specific to AnalyticDB for MySQL, see Spark application configuration parameters.

    spark.adb.eni.enabled: Specifies whether to enable ENI. You must enable ENI when you use AnalyticDB for MySQL Data Lakehouse Edition (V3.0) Spark to access Elasticsearch.

    spark.adb.eni.vswitchId: The vSwitch ID of the Elasticsearch cluster. For information about how to obtain the vSwitch ID, see the "Preparations" section of this topic.

    spark.adb.eni.securityGroupId: The ID of the security group to which the Elasticsearch cluster is added. For information about how to obtain the security group ID, see the "Preparations" section of this topic.

    file: The OSS path of the spark-example.jar program.

    jars: The OSS path of the JAR package on which the Spark job depends.

  9. Click Run Now.
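
After the job succeeds, you can verify that the data was written by searching the spark/_doc index over the Elasticsearch REST API. The following is a minimal sketch that reuses the placeholder endpoint and credentials from the sample code:

    import base64
    import json
    import urllib.request

    # Placeholder endpoint and credentials from the examples in this topic.
    url = "http://es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com:9200/spark/_search"
    token = base64.b64encode(b"elastic:password").decode("ascii")

    # Search the index with basic authentication; an empty search returns up to 10 documents.
    request = urllib.request.Request(url, headers={"Authorization": "Basic " + token})
    with urllib.request.urlopen(request) as response:
        hits = json.load(response)["hits"]["hits"]

    # Print the documents that were written by the Spark job.
    for hit in hits:
        print(hit["_source"])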

Use PySpark to connect to Alibaba Cloud Elasticsearch

  1. Download the JAR package that corresponds to the Elasticsearch cluster version. For more information, see Elasticsearch Spark. In this example, the elasticsearch-spark-30_2.12-7.17.9.jar package is downloaded.

  2. Add the following dependencies to the pom.xml file:

    <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-spark-30 -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-spark-30_2.12</artifactId>
        <version>7.17.9</version>
        <scope>provided</scope>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.2.0</version>
        <scope>provided</scope>
    </dependency>
    Important

    Make sure that the version of elasticsearch-spark-30_2.12 in the pom.xml file matches the version of the Elasticsearch cluster, and that the version of spark-core_2.12 matches the Spark version of AnalyticDB for MySQL.

  3. Write a program and name it es-spark-example.py. Sample code:

    from pyspark.sql import SparkSession
    
    if __name__ == '__main__':
        spark = SparkSession \
            .builder \
            .getOrCreate()
    
        # Generate a DataFrame.
        dept = [("Finance", 10),
                ("Marketing", 20),
                ("Sales", 30),
                ("IT", 40)
                ]
        deptColumns = ["dept_name", "dept_id"]
        deptDF = spark.createDataFrame(data=dept, schema=deptColumns)
        deptDF.printSchema()
        deptDF.show(truncate=False)
    
        # Write data to the Elasticsearch cluster. The chained calls are wrapped in
        # parentheses so that the inline comments do not break the statement.
        (deptDF.write.format("es").mode("overwrite")
            # Specify the private endpoint of the Elasticsearch cluster.
            .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com")
            # Specify the port number of the Elasticsearch cluster.
            .option("es.port", "9200")
            # Specify the username that is used to connect to the Elasticsearch cluster. It must be set to elastic.
            .option("es.net.http.auth.user", "elastic")
            # Specify the password that corresponds to the username.
            .option("es.net.http.auth.pass", "password")
            # Set es.nodes.wan.only to true.
            .option("es.nodes.wan.only", "true")
            # Set es.nodes.discovery to false.
            .option("es.nodes.discovery", "false")
            # Specify the index and type to which data is written, in the <index>/<type> format.
            .save("spark/_doc"))

        # Read the data back from the Elasticsearch cluster.
        df = (spark.read.format("es")
            # Specify the private endpoint of the Elasticsearch cluster.
            .option("es.nodes", "es-cn-nwy34drji0003****.elasticsearch.aliyuncs.com")
            # Specify the port number of the Elasticsearch cluster.
            .option("es.port", "9200")
            # Specify the username that is used to connect to the Elasticsearch cluster. It must be set to elastic.
            .option("es.net.http.auth.user", "elastic")
            # Specify the password that corresponds to the username.
            .option("es.net.http.auth.pass", "password")
            # Set es.nodes.wan.only to true.
            .option("es.nodes.wan.only", "true")
            # Set es.nodes.discovery to false.
            .option("es.nodes.discovery", "false")
            # Specify the index and type from which data is read, in the <index>/<type> format.
            .load("spark/_doc"))
        df.show()
                            
  4. Upload the JAR package downloaded in Step 1 and the es-spark-example.py program to OSS. For more information, see Upload objects.

  5. Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. On the Data Lakehouse Edition (V3.0) tab, find the cluster that you want to manage and click the cluster ID.

  6. In the left-side navigation pane, choose Job Development > Spark JAR Development.

  7. In the upper part of the editor, select a job resource group and a Spark application type. In this example, the Batch type is selected.

  8. Enter the following job configuration in the editor:

    {
        "name": "ES-SPARK-EXAMPLE",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.adb.eni.enabled": "true",
            "spark.adb.eni.vswitchId": "vsw-bp17jqw3lrrobn6y4****",
            "spark.adb.eni.securityGroupId": "sg-bp163uxgt4zandx1****"
        },
        "file": "oss://testBucketName/es-spark-example.py",
        "jars": "oss://testBucketName/Elasticsearch-spark-30_2.12-7.17.9.jar"
    }

    The following list describes the parameters.

    name: The name of the Spark job.

    conf: The configuration parameters that are required for the Spark job, which are similar to those of Apache Spark. The parameters must be in the key: value format. Separate multiple parameters with commas (,). For information about the configuration parameters that differ from those of Apache Spark or that are specific to AnalyticDB for MySQL, see Spark application configuration parameters.

    spark.adb.eni.enabled: Specifies whether to enable ENI. You must enable ENI when you use AnalyticDB for MySQL Data Lakehouse Edition (V3.0) Spark to access Elasticsearch.

    spark.adb.eni.vswitchId: The vSwitch ID of the Elasticsearch cluster. For information about how to obtain the vSwitch ID, see the "Preparations" section of this topic.

    spark.adb.eni.securityGroupId: The ID of the security group to which the Elasticsearch cluster is added. For information about how to obtain the security group ID, see the "Preparations" section of this topic.

    file: The OSS path of the es-spark-example.py program.

    jars: The OSS path of the JAR package on which the Spark job depends.

  9. Click Run Now.