全部产品
Search
文档中心

Elasticsearch:Gunakan Real-time Computing untuk memproses dan menyinkronkan data ke ES

更新时间:Dec 07, 2025

Untuk membangun sistem pengambilan log, Anda dapat menggunakan Realtime Compute for Apache Flink untuk memproses data log dan menuliskannya ke Elasticsearch guna pencarian. Topik ini menggunakan Alibaba Cloud Simple Log Service (SLS) sebagai contoh untuk menjelaskan proses tersebut.

Prasyarat

Anda telah menyelesaikan tugas-tugas berikut:

Informasi latar belakang

Realtime Compute for Apache Flink adalah layanan berbasis Flink yang didukung resmi oleh Alibaba Cloud. Layanan ini mendukung berbagai sistem sumber dan sink, seperti Kafka dan Elasticsearch. Kombinasi Realtime Compute for Apache Flink dan Elasticsearch memenuhi persyaratan skenario pengambilan log khas.

Log dari sistem seperti Kafka atau Simple Log Service diproses oleh Flink menggunakan komputasi sederhana atau kompleks, lalu dituliskan ke Elasticsearch untuk pencarian. Dengan menggabungkan kemampuan komputasi Flink yang andal dan kemampuan pencarian Elasticsearch yang kuat, Anda dapat menerapkan transformasi data real-time dan kueri untuk bisnis Anda, sehingga memudahkan transisi ke layanan real-time.

Realtime Compute for Apache Flink menyediakan cara sederhana untuk terhubung ke Elasticsearch. Misalnya, jika log atau data bisnis Anda dituliskan ke Simple Log Service dan perlu diproses sebelum dituliskan ke Elasticsearch untuk pencarian, Anda dapat menggunakan pipeline seperti pada gambar berikut.Flink+ES data link

Prosedur

  1. Masuk ke Realtime Compute for Apache Flink console.

  2. Buat pekerjaan Realtime Compute for Apache Flink.

    Untuk informasi selengkapnya, lihat bagian Job Development > Development dari Blink SQL Development Guide dalam dokumen Alibaba Cloud Blink Exclusive Mode (Phased-Out for Alibaba Cloud) document.

  3. Tulis Flink SQL.

    1. Buat tabel sumber untuk Layanan Log Sederhana.

      create table sls_stream(
        a int,
        b int,
        c VARCHAR
      )
      WITH (
        type ='sls',  
        endPoint ='<yourEndpoint>',
        accessId ='<yourAccessId>',
        accessKey ='<yourAccessKey>',
        startTime = '<yourStartTime>',
        project ='<yourProjectName>',
        logStore ='<yourLogStoreName>',
        consumerGroup ='<yourConsumerGroupName>'
      );

      Tabel berikut menjelaskan parameter dalam klausa WITH.

      Variable

      Deskripsi

      endPoint

      Titik akhir publik Alibaba Cloud Simple Log Service, yaitu URL untuk mengakses proyek yang sesuai dan data log-nya. Untuk informasi selengkapnya, lihat Endpoints.

      Misalnya, titik akhir Simple Log Service di wilayah China (Hangzhou) adalah http://cn-hangzhou.log.aliyuncs.com. Titik akhir harus diawali dengan http://.

      accessId

      ID AccessKey Anda.

      accessKey

      Rahasia AccessKey Anda.

      startTime

      Waktu mulai mengonsumsi log. Saat menjalankan pekerjaan Flink, waktu yang dipilih harus lebih baru daripada waktu yang ditetapkan di sini.

      project

      Nama proyek Layanan Log Sederhana.

      logStore

      Nama penyimpanan log di proyek.

      consumerGroup

      Nama kelompok konsumen untuk Simple Log Service.

    2. Buat tabel sink Elasticsearch.

      Penting
      • Realtime Compute for Apache Flink versi 3.2.2 dan yang lebih baru mendukung tabel sink Elasticsearch. Saat membuat pekerjaan Flink, pilih versi yang didukung.

      • Tabel sink Elasticsearch diimplementasikan menggunakan REST API dan kompatibel dengan semua versi Elasticsearch.

      CREATE TABLE es_stream_sink(
        a int,
        cnt BIGINT,
        PRIMARY KEY(a)
      )
      WITH(
        type ='elasticsearch-7',
        endPoint = 'http://<instanceid>.public.elasticsearch.aliyuncs.com:<port>',
        accessId = '<yourAccessId>',
        accessKey = '<yourAccessSecret>',
        index = '<yourIndex>',
        typeName = '<yourTypeName>'
      );

      Tabel berikut menjelaskan parameter dalam klausa WITH.

      Parameter

      Deskripsi

      endPoint

      Titik akhir publik instans Alibaba Cloud Elasticsearch Anda, dalam format http://<instanceid>.public.elasticsearch.aliyuncs.com:9200. Anda dapat memperolehnya dari halaman informasi dasar instans. Untuk informasi selengkapnya, lihat Lihat informasi dasar instans.

      accessId

      Username untuk mengakses instans Alibaba Cloud Elasticsearch. Nilai default-nya adalah elastic.

      accessKey

      Password untuk pengguna tersebut. Password untuk pengguna elastic ditetapkan saat Anda membuat instans. Jika lupa password, Anda dapat mengatur ulang. Untuk informasi tentang langkah-langkah dan hal yang perlu diperhatikan saat mengatur ulang password, lihat Atur ulang password akses instans.

      index

      Nama indeks. Jika Anda belum membuat indeks, buat terlebih dahulu. Untuk informasi selengkapnya, lihat Panduan pemula: Dari pembuatan instans hingga pengambilan data. Anda juga dapat mengaktifkan pembuatan indeks otomatis. Untuk informasi selengkapnya, lihat Konfigurasi parameter YML.

      typeName

      Tipe indeks. Untuk instans Elasticsearch versi 7.0 atau lebih baru, nilai ini harus _doc.

      Catatan
      • Elasticsearch mendukung pembaruan dokumen menggunakan primary key. Hanya satu field yang dapat ditentukan sebagai PRIMARY KEY. Setelah Anda menentukan PRIMARY KEY, nilai field PRIMARY KEY digunakan sebagai ID dokumen. Jika PRIMARY KEY tidak ditentukan, sistem akan menghasilkan ID dokumen secara acak. Untuk informasi selengkapnya, lihat Index API.

      • Elasticsearch mendukung beberapa mode pembaruan. Anda dapat menentukan mode tersebut menggunakan parameter updateMode dalam klausa WITH:

        • Jika updateMode=full, dokumen baru sepenuhnya menimpa dokumen yang ada.

        • Jika updateMode=inc, Elasticsearch memperbarui field yang sesuai berdasarkan nilai input field tersebut.

      • Semua pembaruan di Elasticsearch secara default menggunakan semantik UPSERT, artinya INSERT atau UPDATE.

    3. Tulis logika bisnis untuk memproses dan menyinkronkan data.

      INSERT INTO es_stream_sink
      SELECT 
        a,
        count(*) as cnt
      FROM sls_stream GROUP BY a
  4. Publikasikan dan mulai pekerjaan.

    Setelah Anda mempublikasikan dan menjalankan pekerjaan, data dari Simple Log Service akan diagregasi lalu dituliskan ke Alibaba Cloud Elasticsearch.

Informasi tambahan

Anda dapat menggunakan Realtime Compute for Apache Flink bersama Elasticsearch untuk dengan cepat membangun pipeline pencarian real-time. Jika Anda memiliki persyaratan yang lebih kompleks untuk menuliskan data ke Elasticsearch, Anda dapat menggunakan fitur sink kustom dari Realtime Compute for Apache Flink.