全部产品
Search
文档中心

Elasticsearch:Gunakan ES-Hadoop untuk mengaktifkan Apache Spark menulis data ke dan membaca data dari Alibaba Cloud Elasticsearch

更新时间:Jul 06, 2025

Apache Spark adalah kerangka kerja tujuan umum untuk komputasi data besar yang menawarkan semua keunggulan Hadoop MapReduce. Perbedaannya terletak pada penyimpanan data di memori, memungkinkan iterasi cepat pada dataset besar. Dengan pendekatan ini, data dapat langsung dibaca dari cache alih-alih disk, sehingga memberikan kinerja pemrosesan lebih tinggi daripada MapReduce. Topik ini menjelaskan cara mengaktifkan Spark untuk menulis dan membaca data dari Alibaba Cloud Elasticsearch menggunakan Elasticsearch-Hadoop (ES-Hadoop).

Persiapan

  1. Buat kluster Alibaba Cloud Elasticsearch dan aktifkan fitur Auto Indexing.
    Untuk detail lebih lanjut, lihat Buat Kluster Alibaba Cloud Elasticsearch dan Akses dan Konfigurasikan Kluster Elasticsearch. Dalam topik ini, kluster Elasticsearch V6.7.0 dibuat.
    Penting Dalam lingkungan produksi, disarankan untuk menonaktifkan fitur Auto Indexing. Anda harus membuat indeks dan mengonfigurasi pemetaan terlebih dahulu. Kluster Elasticsearch dalam topik ini hanya digunakan untuk pengujian, sehingga fitur Auto Indexing diaktifkan.
  2. Buat kluster E-MapReduce (EMR) di virtual private cloud (VPC) tempat kluster Elasticsearch berada.

    Konfigurasi kluster EMR:

    • Versi EMR: Pilih EMR-3.29.0.

    • Layanan yang Diperlukan: Spark (2.4.5) adalah salah satu layanan yang diperlukan. Pengaturan default dipertahankan untuk layanan lainnya.

    Untuk informasi lebih lanjut, lihat Buat Kluster.

    Penting

    Secara default, 0.0.0.0/0 ditentukan dalam daftar putih alamat IP privat kluster Elasticsearch. Anda dapat melihat konfigurasi whitelist pada halaman konfigurasi keamanan kluster. Jika pengaturan default tidak digunakan, tambahkan alamat IP privat kluster EMR ke daftar putih.

  3. Persiapkan lingkungan Java. Versi JDK harus 1.8.0 atau lebih baru.

Kompilasi dan jalankan pekerjaan Spark

  1. Persiapkan data uji.

    1. Masuk ke Konsol E-MapReduce dan peroleh alamat IP node master kluster EMR. Kemudian, gunakan SSH untuk masuk ke instance Elastic Compute Service (ECS) yang ditunjukkan oleh alamat IP tersebut.

      Untuk informasi lebih lanjut, lihat Masuk ke Kluster.

    2. Tulis data uji ke file.

      Dalam contoh ini, data uji berikut dalam format JSON ditulis ke file http_log.txt:

      {"id": 1, "name": "zhangsan", "birth": "1990-01-01", "addr": "No.969, wenyixi Rd, yuhang, hangzhou"}
      {"id": 2, "name": "lisi", "birth": "1991-01-01", "addr": "No.556, xixi Rd, xihu, hangzhou"}
      {"id": 3, "name": "wangwu", "birth": "1992-01-01", "addr": "No.699 wangshang Rd, binjiang, hangzhou"}
    3. Jalankan perintah berikut untuk mengunggah file ke direktori /tmp/hadoop-es pada node master kluster EMR:

      hadoop fs -put http_log.txt /tmp/hadoop-es
  2. Tambahkan dependensi POM.

    Buat proyek Java Maven dan tambahkan dependensi POM berikut ke file pom.xml proyek:

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.11</artifactId>
            <version>6.7.0</version>
        </dependency>
    </dependencies>
    Penting

    Pastikan versi dependensi POM konsisten dengan versi layanan Alibaba Cloud terkait. Misalnya, versi elasticsearch-spark-20_2.11 konsisten dengan versi kluster Elasticsearch Anda, dan versi spark-core_2.12 konsisten dengan versi HDFS.

  3. Kompilasi kode.

    1. Tulis data

      Kode sampel berikut digunakan untuk menulis data uji ke indeks perusahaan kluster Elasticsearch:

      import java.util.Map;
      import java.util.concurrent.atomic.AtomicInteger;
      import org.apache.spark.SparkConf;
      import org.apache.spark.SparkContext;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.function.Function;
      import org.apache.spark.sql.Row;
      import org.apache.spark.sql.SparkSession;
      
      import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
      import org.spark_project.guava.collect.ImmutableMap;
      public class SparkWriteEs {
          public static void main(String[] args) {
              SparkConf conf = new SparkConf();
              conf.setAppName("Es-write");
              conf.set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com");
              conf.set("es.net.http.auth.user", "elastic");
              conf.set("es.net.http.auth.pass", "xxxxxx");
              conf.set("es.nodes.wan.only", "true");
              conf.set("es.nodes.discovery","false");
              conf.set("es.input.use.sliced.partitions","false");
              SparkSession ss = new SparkSession(new SparkContext(conf));
              final AtomicInteger employeesNo = new AtomicInteger(0);
              //Ganti /tmp/hadoop-es/http_log.txt dengan path sebenarnya dari data uji Anda.
              JavaRDD<Map<Object, ?>> javaRDD = ss.read().text("/tmp/hadoop-es/http_log.txt")
                      .javaRDD().map((Function<Row, Map<Object, ?>>) row -> ImmutableMap.of("employees"   employeesNo.getAndAdd(1), row.mkString()));
              JavaEsSpark.saveToEs(javaRDD, "company/_doc");
          }
      }
    2. Baca data

      Kode sampel berikut digunakan untuk membaca dan menampilkan data uji yang ditulis ke kluster Elasticsearch:

      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
      import  java.util.Map;
      
      public class ReadES {
      
          public static void main(String[] args) {
      
              SparkConf  conf = new SparkConf().setAppName("readEs").setMaster("local[*]")
                      .set("es.nodes", "es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com")
                      .set("es.port", "9200")
                      .set("es.net.http.auth.user", "elastic")
                      .set("es.net.http.auth.pass", "xxxxxx")
                      .set("es.nodes.wan.only", "true")
                      .set("es.nodes.discovery","false")
                      .set("es.input.use.sliced.partitions","false")
                      .set("es.resource", "company/_doc")
                      .set("es.scroll.size","500");
      
              JavaSparkContext sc = new JavaSparkContext(conf);
      
              JavaPairRDD<String, Map<String, Object>> rdd = JavaEsSpark.esRDD(sc);
      
              for ( Map<String, Object> item : rdd.values().collect()) {
                  System.out.println(item);
              }
      
              sc.stop();
          }
      
      }

    Tabel 1. Parameter

    Parameter

    Nilai default

    Deskripsi

    es.nodes

    localhost

    Titik akhir yang digunakan untuk mengakses kluster Elasticsearch. Kami sarankan Anda menggunakan titik akhir internal. Anda dapat memperoleh titik akhir internal pada halaman Informasi Dasar kluster Elasticsearch. Untuk informasi lebih lanjut, lihat Lihat informasi dasar kluster.

    es.port

    9200

    Nomor port yang digunakan untuk mengakses kluster Elasticsearch.

    es.net.http.auth.user

    elastic

    Nama pengguna yang digunakan untuk mengakses kluster Elasticsearch.

    Catatan

    Jika Anda menggunakan akun elastic untuk mengakses kluster Elasticsearch Anda dan mereset kata sandi akun tersebut, mungkin memerlukan waktu bagi kata sandi baru untuk berlaku. Selama periode ini, Anda tidak dapat menggunakan akun elastic untuk mengakses kluster. Oleh karena itu, kami sarankan Anda tidak menggunakan akun elastic untuk mengakses kluster Elasticsearch. Anda dapat masuk ke Konsol Kibana dan membuat pengguna dengan peran yang diperlukan untuk mengakses kluster Elasticsearch. Untuk informasi lebih lanjut, lihat Gunakan mekanisme RBAC yang disediakan oleh Elasticsearch X-Pack untuk mengimplementasikan kontrol akses.

    es.net.http.auth.pass

    /

    Kata sandi yang sesuai dengan nama pengguna elastic. Kata sandi ditentukan saat Anda membuat kluster Elasticsearch. Jika Anda lupa kata sandi, Anda dapat meresetnya. Untuk informasi lebih lanjut, lihat Reset kata sandi akses untuk kluster Elasticsearch.

    es.nodes.wan.only

    false

    Menentukan apakah akan mengaktifkan sniffing node ketika kluster Elasticsearch menggunakan alamat IP virtual untuk koneksi. Nilai valid:

    • true: menunjukkan bahwa sniffing node diaktifkan.

    • false: menunjukkan bahwa sniffing node dinonaktifkan.

    es.nodes.discovery

    true

    Menentukan apakah akan melarang mekanisme penemuan node. Nilai valid:

    • true: menunjukkan bahwa mekanisme penemuan node dilarang.

    • false: menunjukkan bahwa mekanisme penemuan node tidak dilarang.

    Penting

    Jika Anda menggunakan Alibaba Cloud Elasticsearch, Anda harus mengatur parameter ini ke false.

    es.input.use.sliced.partitions

    true

    Menentukan apakah akan menggunakan partisi. Nilai valid:

    • true: menggunakan partisi. Dalam hal ini, mungkin diperlukan lebih banyak waktu untuk fase pembacaan indeks awal. Waktu yang diperlukan untuk fase ini mungkin lebih lama daripada waktu yang diperlukan untuk kueri data. Untuk meningkatkan efisiensi kueri, kami sarankan Anda mengatur parameter ini ke false.

    • false: tidak menggunakan partisi.

    es.index.auto.create

    true

    Menentukan apakah sistem membuat indeks di kluster Elasticsearch ketika Anda menggunakan ES-Hadoop untuk menulis data ke kluster. Nilai valid:

    • true: menunjukkan bahwa sistem membuat indeks di kluster Elasticsearch.

    • false: menunjukkan bahwa sistem tidak membuat indeks di kluster Elasticsearch.

    es.resource

    /

    Nama dan tipe indeks tempat operasi baca atau tulis data dilakukan.

    es.mapping.names

    /

    Pemetaan antara nama bidang dalam tabel dan nama bidang dalam indeks kluster Elasticsearch.

    Untuk informasi lebih lanjut tentang item konfigurasi ES-Hadoop, lihat konfigurasi ES-Hadoop open source.

  4. Kompres kode menjadi paket JAR dan unggah ke klien EMR, seperti node master di kluster EMR atau kluster gateway yang terkait dengan kluster EMR ini.
  5. Di klien EMR, jalankan pekerjaan Spark berikut:

    • Tulis data

      cd /usr/lib/spark-current
      ./bin/spark-submit  --master yarn --executor-cores 1 --class "SparkWriteEs" /usr/local/spark_es.jar
      Penting

      Ganti /usr/local/spark_es.jar dengan path tempat Anda telah mengunggah paket JAR Anda.

    • Baca data

      cd /usr/lib/spark-current
      ./bin/spark-submit  --master yarn --executor-cores 1 --class "ReadES"  /usr/local/spark_es.jar

      Setelah data dibaca, hasil yang ditunjukkan pada gambar berikut dikembalikan.Returned result

Verifikasi hasil

  1. Masuk ke konsol Kibana kluster Elasticsearch.
    Untuk informasi lebih lanjut, lihat Masuk ke Konsol Kibana.
  2. Di panel navigasi sisi kiri, klik Dev Tools.
  3. Di tab Console halaman yang muncul, jalankan perintah berikut untuk menanyakan data yang ditulis oleh pekerjaan Spark:

    GET company/_search
    {
      "query": {
        "match_all": {}
      }
    }

    Jika perintah berhasil dijalankan, hasil yang ditunjukkan pada gambar berikut dikembalikan.Query result

Ringkasan

Topik ini menjelaskan cara menggunakan ES-Hadoop untuk menulis dan membaca data di Alibaba Cloud Elasticsearch dengan menjalankan pekerjaan Spark di kluster EMR. Setelah ES-Hadoop diintegrasikan dengan Spark, ES-Hadoop mendukung dataset Spark, resilient distributed datasets (RDD), Spark Streaming, Scala, dan Spark SQL. Anda dapat mengonfigurasi ES-Hadoop sesuai kebutuhan. Untuk informasi lebih lanjut, lihat Dukungan Apache Spark.