全部产品
Search
文档中心

Realtime Compute for Apache Flink:Elasticsearch

更新时间:Jan 06, 2026

Topik ini menjelaskan cara menggunakan konektor Elasticsearch.

Informasi latar belakang

Alibaba Cloud Elasticsearch kompatibel dengan fitur Elasticsearch open source, seperti Security, Machine Learning, Graph, dan application performance management (APM). Layanan ini cocok untuk analitik data, pencarian data, dan skenario lainnya, serta menyediakan layanan tingkat enterprise seperti kontrol akses, pemantauan keamanan dan peringatan, serta pembuatan laporan otomatis.

Konektor Elasticsearch mendukung hal-hal berikut:

Kategori

Deskripsi

Tipe yang didukung

Tabel sumber, tabel dimensi, dan tabel sink

Running mode

Mode batch dan streaming

Format data

JSON

Metrik pemantauan spesifik

  • Tabel sumber

    • pendingRecords

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

  • Tabel dimensi

    Tidak ada

  • Tabel sink (untuk Ververica Runtime (VVR) 6.0.6 atau versi lebih baru)

    • numRecordsOut

    • numRecordsOutPerSecond

Catatan

Untuk informasi selengkapnya tentang metrik tersebut, lihat Metrik.

Tipe API

DataStream dan SQL

Pembaruan atau penghapusan data pada tabel sink

Ya

Prasyarat

Batasan

  • Tabel sumber dan dimensi mendukung Elasticsearch 6.8.x atau versi lebih baru, tetapi tidak mendukung 8.x atau versi lebih baru.

  • Tabel sink hanya mendukung Elasticsearch 6.x, 7.x, dan 8.x.

  • Hanya tabel sumber Elasticsearch lengkap yang didukung. Tabel sumber inkremental tidak didukung.

Sintaksis

  • Tabel sumber

    CREATE TABLE elasticsearch_source(
      name STRING,
      location STRING,
      value FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
  • Tabel dimensi

    CREATE TABLE es_dim(
      field1 STRING, -- Bidang ini digunakan sebagai kunci untuk operasi JOIN dan harus bertipe STRING.
      field2 FLOAT,
      field3 BIGINT,
      PRIMARY KEY (field1) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
    Catatan
    • Jika Anda menentukan primary key, Anda hanya dapat menggunakan satu bidang kunci untuk operasi JOIN pada tabel dimensi. Kunci ini harus merupakan ID dokumen di indeks Elasticsearch yang sesuai.

    • Jika Anda tidak menentukan primary key, Anda dapat menggunakan satu atau beberapa bidang kunci untuk operasi JOIN pada tabel dimensi. Kunci-kunci ini harus merupakan bidang dalam dokumen di indeks Elasticsearch yang sesuai.

    • Untuk tipe String, akhiran .keyword ditambahkan ke nama bidang secara default untuk memastikan kompatibilitas. Jika hal ini menghambat pencocokan dengan bidang Text di Elasticsearch, Anda dapat mengatur parameter ignoreKeywordSuffix ke true.

  • Tabel sink

    CREATE TABLE es_sink(
      user_id   STRING,
      user_name   STRING,
      uv BIGINT,
      pv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7', -- Jika Anda menggunakan Elasticsearch 6.x, atur parameter ini ke elasticsearch-6.
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>'
    );
    Catatan
    • Tabel sink Elasticsearch bekerja dalam mode upsert atau append, tergantung pada apakah primary key didefinisikan.

      • Jika primary key didefinisikan, primary key tersebut harus menjadi ID dokumen. Tabel sink Elasticsearch bekerja dalam mode upsert, yang dapat memproses pesan UPDATE dan DELETE.

      • Jika tidak ada primary key yang didefinisikan, Elasticsearch akan menghasilkan ID dokumen acak secara otomatis. Tabel sink Elasticsearch bekerja dalam mode append, yang hanya dapat mengonsumsi pesan INSERT.

    • Beberapa tipe data, seperti BYTES, ROW, ARRAY, dan MAP, tidak memiliki representasi string yang sesuai. Oleh karena itu, bidang-bidang dengan tipe tersebut tidak dapat digunakan sebagai bidang primary key.

    • Bidang-bidang dalam pernyataan DDL berkorespondensi dengan bidang-bidang dalam dokumen Elasticsearch. Anda tidak dapat menulis metadata, seperti ID dokumen, ke tabel sink Elasticsearch karena metadata tersebut dikelola oleh instans Elasticsearch.

Parameter WITH

Tabel sumber

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Catatan

connector

Tipe tabel sumber.

String

Ya

Tidak ada

Bidang statis adalah Elasticsearch.

endPoint

Alamat server.

String

Ya

Tidak ada

Contoh: http://127.0.0.1:XXXX.

indexName

Nama indeks.

String

Ya

Tidak ada

Tidak ada.

accessId

Username untuk instans Elasticsearch.

String

Tidak

Tidak ada

Nilai default-nya kosong, yang berarti autentikasi tidak dilakukan. Jika Anda menentukan accessId, Anda juga harus menentukan accessKey yang tidak kosong.

Penting

Untuk mencegah kebocoran username dan password Anda, gunakan variabel. Untuk informasi selengkapnya, lihat Variabel proyek.

accessKey

Password untuk instans Elasticsearch.

String

Tidak

Tidak ada

typeNames

Nama tipe.

String

Tidak

_doc

Jangan atur parameter ini untuk Elasticsearch 7.0 atau versi lebih baru.

batchSize

Jumlah maksimum dokumen yang diambil dari kluster Elasticsearch untuk setiap permintaan scroll.

Int

Tidak

2000

Tidak ada.

keepScrollAliveSecs

Waktu maksimum untuk menjaga konteks scroll tetap aktif.

Int

Tidak

3600

Unitnya adalah detik.

Tabel sink

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Catatan

connector

Tipe tabel sink.

String

Ya

Tidak ada

Nilai yang valid adalah elasticsearch-6, elasticsearch-7, dan elasticsearch-8.

Catatan

Anda hanya dapat mengatur parameter ini ke elasticsearch-8 pada VVR 8.0.5 atau versi lebih baru.

hosts

Alamat server.

String

Ya

Tidak ada

Contoh: 127.0.0.1:XXXX.

index

Nama indeks.

String

Ya

Tidak ada

Tabel sink Elasticsearch mendukung indeks statis maupun dinamis. Perhatikan poin-poin berikut saat menggunakan indeks statis dan dinamis:

  • Jika Anda menggunakan indeks statis, nilai parameter index harus berupa string biasa, seperti myusers. Semua catatan ditulis ke indeks myusers.

  • Jika Anda menggunakan indeks dinamis, Anda dapat menggunakan {field_name} untuk mereferensikan nilai bidang dalam catatan guna menghasilkan indeks tujuan secara dinamis. Anda juga dapat menggunakan {field_name|date_format_string} untuk mengonversi nilai bidang bertipe TIMESTAMP, DATE, dan TIME ke format yang ditentukan oleh date_format_string. date_format_string kompatibel dengan DateTimeFormatter Java. Misalnya, jika Anda mengatur parameter ke myusers-{log_ts|yyyy-MM-dd}, catatan dengan nilai bidang log_ts sebesar 2020-03-27 12:25:55 akan ditulis ke indeks myusers-2020-03-27.

document-type

Tipe dokumen.

String

  • elasticsearch-6: Wajib

  • elasticsearch-7: Tidak didukung

Tidak ada

Jika parameter connector diatur ke elasticsearch-6, nilai parameter ini harus sama dengan nilai parameter type di sisi Elasticsearch.

username

Username.

String

Tidak

Kosong

Nilai default-nya kosong, yang berarti autentikasi tidak dilakukan. Jika Anda menentukan username, Anda juga harus menentukan password yang tidak kosong.

Penting

Untuk mencegah kebocoran username dan password Anda, gunakan variabel. Untuk informasi selengkapnya, lihat Variabel proyek.

password

Password.

String

Tidak

Kosong

document-id.key-delimiter

Pemisah untuk ID dokumen.

String

Tidak

_

Pada tabel sink Elasticsearch, primary key digunakan untuk menghitung ID dokumen Elasticsearch. Tabel sink Elasticsearch menghasilkan string ID dokumen untuk setiap baris dengan menggabungkan semua bidang primary key sesuai urutan yang ditentukan dalam DDL, menggunakan pemisah kunci yang ditentukan oleh document-id.key-delimiter.

Catatan

ID dokumen adalah string hingga 512 byte yang tidak mengandung spasi.

failure-handler

Kebijakan penanganan kesalahan untuk permintaan Elasticsearch yang gagal.

String

Tidak

fail

Kebijakan berikut tersedia:

  • fail (default): Pekerjaan gagal jika permintaan gagal.

  • ignore: Mengabaikan kegagalan dan menghapus permintaan.

  • retry-rejected: Menambahkan kembali permintaan yang gagal karena kapasitas antrian penuh.

  • nama kelas kustom: Gunakan subclass dari ActionRequestFailureHandler untuk penanganan kesalahan.

sink.flush-on-checkpoint

Menentukan apakah operasi flush dilakukan pada checkpoint.

Boolean

Tidak

true

  • true: Nilai default.

  • false: Jika Anda menonaktifkan fitur ini, konektor tidak menunggu konfirmasi bahwa semua permintaan tertunda telah selesai selama checkpoint. Oleh karena itu, konektor tidak memberikan jaminan at-least-once untuk permintaan.

sink.bulk-flush.backoff.strategy

Jika operasi flush gagal karena kesalahan permintaan sementara, atur sink.bulk-flush.backoff.strategy untuk menentukan kebijakan percobaan ulang.

Enum

Tidak

DISABLED

  • DISABLED (default): Tidak melakukan percobaan ulang. Operasi gagal setelah kesalahan permintaan pertama.

  • CONSTANT: Backoff konstan. Waktu tunggu antar percobaan ulang sama.

  • EXPONENTIAL: Backoff eksponensial. Waktu tunggu antar percobaan ulang meningkat secara eksponensial.

sink.bulk-flush.backoff.max-retries

Jumlah maksimum percobaan ulang backoff.

Int

Tidak

Tidak ada

Tidak ada.

sink.bulk-flush.backoff.delay

Penundaan antara setiap percobaan backoff.

Duration

Tidak

Tidak ada

  • Untuk kebijakan backoff CONSTANT, nilai ini adalah penundaan antara setiap percobaan ulang.

  • Untuk kebijakan backoff EXPONENTIAL, nilai ini adalah penundaan dasar awal.

sink.bulk-flush.max-actions

Jumlah maksimum operasi yang dibuffer untuk setiap permintaan batch.

Int

Tidak

1000

Nilai 0 menonaktifkan fitur ini.

sink.bulk-flush.max-size

Ukuran memori maksimum buffer untuk permintaan.

String

Tidak

2 MB

Unitnya adalah MB. Nilai default-nya adalah 2 MB. Nilai 0 MB menonaktifkan fitur ini.

sink.bulk-flush.interval

Interval flush.

Duration

Tidak

1s

Unitnya adalah detik. Nilai default-nya adalah 1s. Nilai 0s menonaktifkan fitur ini.

connection.path-prefix

String awalan yang ditambahkan ke setiap komunikasi REST.

String

Tidak

Kosong

Tidak ada.

retry-on-conflict

Jumlah maksimum percobaan ulang yang diizinkan untuk operasi pembaruan akibat pengecualian konflik versi. Jika jumlah percobaan ulang melebihi nilai ini, pengecualian dilemparkan dan pekerjaan gagal.

Int

Tidak

0

Catatan
  • Parameter ini hanya didukung oleh VVR 4.0.13 atau versi lebih baru.

  • Parameter ini hanya berlaku ketika primary key didefinisikan.

routing-fields

Menentukan satu atau beberapa nama bidang ES untuk merutekan dokumen ke shard tertentu di Elasticsearch.

String

Tidak

Tidak ada

Pisahkan beberapa nama bidang dengan titik koma (;). Jika suatu bidang kosong, nilainya diatur ke null.

Catatan

Parameter ini hanya didukung oleh VVR 8.0.6 atau versi lebih baru untuk elasticsearch-7 dan elasticsearch-8.

sink.delete-strategy

Mengonfigurasi perilaku saat menerima pesan retraction (-D/-U).

Enum

Tidak

DELETE_ROW_ON_PK

Perilaku berikut tersedia:

  • DELETE_ROW_ON_PK (default): Mengabaikan pesan -U tetapi menghapus baris (dokumen) yang sesuai dengan primary key ketika menerima pesan -D.

  • IGNORE_DELETE: Mengabaikan pesan -U dan -D. Sink Elasticsearch tidak melakukan retraksi.

  • NON_PK_FIELD_TO_NULL: Mengabaikan pesan -U tetapi memodifikasi baris (dokumen) yang sesuai dengan primary key ketika menerima pesan -D. Nilai primary key tetap tidak berubah, dan semua nilai bidang non-primary key lainnya dalam skema tabel diatur ke NULL. Ini terutama digunakan untuk pembaruan parsial ketika beberapa sink menulis ke tabel Elasticsearch yang sama.

  • CHANGELOG_STANDARD: Mirip dengan DELETE_ROW_ON_PK, tetapi juga menghapus baris (dokumen) yang sesuai dengan primary key ketika menerima pesan -U.

    Catatan

    Parameter ini hanya didukung oleh VVR 8.0.8 atau versi lebih baru.

sink.ignore-null-when-update

Saat memperbarui data, menentukan apakah akan memperbarui bidang yang sesuai ke null atau tidak memperbarui bidang tersebut jika nilai bidang data masuk adalah null.

BOOLEAN

Tidak

false

Nilai yang valid:

  • true: Tidak memperbarui bidang tersebut. Anda hanya dapat mengatur parameter ini ke true ketika primary key ditetapkan untuk tabel Flink dan format data Elasticsearch adalah JSON.

  • false: Memperbarui bidang tersebut ke null.

Catatan

Parameter ini hanya didukung oleh VVR 11.1 atau versi lebih baru.

connection.request-timeout

Timeout untuk meminta koneksi dari manajer koneksi.

Duration

Tidak

Tidak ada

Catatan

Parameter ini hanya didukung oleh VVR 11.5 atau versi lebih baru.

connect.timeout

Timeout untuk membuat koneksi.

Duration

Tidak

Tidak ada

Catatan

Parameter ini hanya didukung oleh VVR 11.5 atau versi lebih baru.

socket.timeout

Timeout untuk menunggu data. Ini adalah waktu idle maksimum antara dua paket data berturut-turut.

Duration

Tidak

Tidak ada

Catatan

Parameter ini hanya didukung oleh VVR 11.5 atau versi lebih baru.

sink.bulk-flush.update.doc_as_upsert

Menentukan apakah akan menggunakan dokumen sebagai bidang pembaruan.

BOOLEAN

Tidak

false

Nilai yang valid:

  • true: Mengatur bidang doc_as_upsert dari Update Request ke true.

  • false: Mengisi bidang upsert dari Update Request dengan dokumen.

Berdasarkan https://github.com/elastic/elasticsearch/issues/105804, pipeline data preset Elasticsearch tidak mendukung pembaruan parsial untuk pembaruan massal. Jika Anda ingin menggunakan pipeline data, atur parameter ini ke true.

Catatan

Parameter ini hanya didukung oleh VVR 11.5 atau versi lebih baru.

Tabel dimensi

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Catatan

connector

Tipe tabel dimensi.

String

Ya

Tidak ada

Nilainya tetap Elasticsearch.

endPoint

Alamat server.

String

Ya

Tidak ada

Contoh: http://127.0.0.1:XXXX.

indexName

Nama indeks.

String

Ya

Tidak ada

Tidak ada.

accessId

Username untuk instans Elasticsearch.

String

Tidak

Tidak ada

Nilai default-nya kosong, yang berarti autentikasi tidak dilakukan. Jika Anda menentukan accessId, Anda juga harus menentukan accessKey yang tidak kosong.

Penting

Untuk mencegah kebocoran username dan password Anda, gunakan variabel. Untuk informasi selengkapnya, lihat Variabel proyek.

accessKey

Password untuk instans Elasticsearch.

String

Tidak

Tidak ada

typeNames

Nama tipe.

String

Tidak

_doc

Jangan atur parameter ini untuk Elasticsearch 7.0 atau versi lebih baru.

maxJoinRows

Jumlah maksimum baris yang di-join untuk satu baris data.

Integer

Tidak

1024

Tidak ada.

cache

Kebijakan cache.

String

Tidak

Tidak ada

Tiga kebijakan cache berikut didukung:

  • ALL: Menyimpan cache semua data di tabel dimensi. Sebelum pekerjaan dijalankan, sistem memuat semua data dari tabel dimensi ke cache. Semua pencarian selanjutnya dilakukan terhadap cache. Jika data tidak ditemukan di cache, kunci tersebut tidak ada. Cache lengkap dimuat ulang setelah kedaluwarsa.

  • LRU: Menyimpan cache sebagian data di tabel dimensi. Untuk setiap catatan dari tabel sumber, sistem terlebih dahulu mencari data di cache. Jika data tidak ditemukan, sistem mencarinya di tabel dimensi fisik.

  • None: Tidak ada cache.

cacheSize

Ukuran cache, yaitu jumlah baris data yang dicache.

Long

Tidak

100000

Parameter cacheSize hanya berlaku ketika parameter cache diatur ke LRU.

cacheTTLMs

Periode timeout hingga cache kedaluwarsa.

Long

Tidak

Long.MAX_VALUE

Unitnya adalah milidetik. Konfigurasi cacheTTLMs bergantung pada konfigurasi cache:

  • Jika cache diatur ke LRU, cacheTTLMs adalah periode timeout cache. Secara default, cache tidak kedaluwarsa.

  • Jika cache diatur ke ALL, cacheTTLMs adalah interval untuk memuat ulang cache. Secara default, cache tidak dimuat ulang.

ignoreKeywordSuffix

Menentukan apakah akan mengabaikan akhiran .keyword yang secara otomatis ditambahkan ke bidang String.

Boolean

Tidak

false

Untuk memastikan kompatibilitas, Flink mengonversi tipe Text di Elasticsearch ke tipe String dan secara default menambahkan akhiran .keyword ke nama bidang bertipe String.

Nilai yang valid:

  • true: Pengaturan diabaikan.

    Jika hal ini menghambat pencocokan dengan bidang bertipe Text di Elasticsearch, atur parameter ini ke true.

  • false: Item tidak diabaikan.

cacheEmpty

Menentukan apakah akan menyimpan cache hasil kosong dari pencarian di tabel dimensi fisik.

Boolean

Tidak

true

Parameter cacheEmpty hanya berlaku ketika parameter cache diatur ke LRU.

queryMaxDocs

Jumlah maksimum dokumen yang dikembalikan saat mengkueri server Elasticsearch untuk setiap catatan data masuk dari input tabel dimensi tanpa primary key.

Integer

Tidak

10000

Nilai default 10000 adalah batas maksimum dokumen yang dapat dikembalikan oleh server Elasticsearch. Nilai parameter ini tidak boleh melebihi batas tersebut.

Catatan
  • Parameter ini hanya didukung oleh VVR 8.0.8 atau versi lebih baru.

  • Parameter ini hanya berlaku untuk tabel dimensi tanpa primary key karena data di tabel dengan primary key bersifat unik.

  • Untuk memastikan kebenaran kueri, nilai default dibuat besar. Namun, nilai besar meningkatkan penggunaan memori saat mengkueri Elasticsearch. Jika Anda mengalami masalah memori, Anda dapat menurunkan nilai ini untuk mengoptimalkan penggunaan memori.

Pemetaan tipe

Flink mengurai data Elasticsearch dalam format JSON. Untuk informasi selengkapnya, lihat Pemetaan Tipe Data.

Contoh

  • Contoh tabel sumber

    CREATE TEMPORARY TABLE elasticsearch_source (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT name, location, `value`
    FROM elasticsearch_source;
  • Contoh tabel dimensi

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      data STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'datagen' 
    );
    
    CREATE TEMPORARY TABLE es_dim (
      id STRING,
      `value` FLOAT,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      id STRING,
      data STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'blackhole' 
    );
    
    INSERT INTO blackhole_sink
    SELECT e.*, w.*
    FROM datagen_source AS e
    JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;
  • Contoh tabel sink 1

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      name STRING,
      uv BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      user_id STRING,
      user_name STRING,
      uv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED -- Primary key bersifat opsional. Jika primary key didefinisikan, primary key tersebut digunakan sebagai ID dokumen. Jika tidak, nilai acak digunakan sebagai ID dokumen.
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' ='${secret_values.ak_id}',
      'password' ='${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, name, uv
    FROM datagen_source;
  • Contoh tabel sink 2

    CREATE TEMPORARY TABLE datagen_source(  
      id STRING,
        details ROW<  
            name STRING,  
            ages ARRAY<INT>,  
            attributes MAP<STRING, STRING>  
        >
    ) WITH (  
        'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      id STRING,
        details ROW<  
            name STRING,  
            ages ARRAY<INT>,  
            attributes MAP<STRING, STRING>  
        >, 
      PRIMARY KEY (id) NOT ENFORCED  -- Primary key bersifat opsional. Jika primary key didefinisikan, primary key tersebut digunakan sebagai ID dokumen. Jika tidak, nilai acak digunakan sebagai ID dokumen.
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' ='${secret_values.ak_id}',
      'password' ='${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, details
    FROM datagen_source;