Topik ini menjelaskan cara menggunakan konektor Elasticsearch.
Informasi latar belakang
Alibaba Cloud Elasticsearch kompatibel dengan fitur Elasticsearch open source, seperti Security, Machine Learning, Graph, dan application performance management (APM). Layanan ini cocok untuk analitik data, pencarian data, dan skenario lainnya, serta menyediakan layanan tingkat enterprise seperti kontrol akses, pemantauan keamanan dan peringatan, serta pembuatan laporan otomatis.
Konektor Elasticsearch mendukung hal-hal berikut:
Kategori | Deskripsi |
Tipe yang didukung | Tabel sumber, tabel dimensi, dan tabel sink |
Running mode | Mode batch dan streaming |
Format data | JSON |
Metrik pemantauan spesifik |
Catatan Untuk informasi selengkapnya tentang metrik tersebut, lihat Metrik. |
Tipe API | DataStream dan SQL |
Pembaruan atau penghapusan data pada tabel sink | Ya |
Prasyarat
Indeks Elasticsearch telah dibuat. Untuk informasi selengkapnya, lihat Buat indeks.
Daftar putih akses publik atau akses internal telah dikonfigurasi untuk instans Elasticsearch. Untuk informasi selengkapnya, lihat Konfigurasikan daftar putih akses publik atau akses internal untuk instans.
Batasan
Tabel sumber dan dimensi mendukung Elasticsearch 6.8.x atau versi lebih baru, tetapi tidak mendukung 8.x atau versi lebih baru.
Tabel sink hanya mendukung Elasticsearch 6.x, 7.x, dan 8.x.
Hanya tabel sumber Elasticsearch lengkap yang didukung. Tabel sumber inkremental tidak didukung.
Sintaksis
Tabel sumber
CREATE TABLE elasticsearch_source( name STRING, location STRING, value FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );Tabel dimensi
CREATE TABLE es_dim( field1 STRING, -- Bidang ini digunakan sebagai kunci untuk operasi JOIN dan harus bertipe STRING. field2 FLOAT, field3 BIGINT, PRIMARY KEY (field1) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );CatatanJika Anda menentukan primary key, Anda hanya dapat menggunakan satu bidang kunci untuk operasi JOIN pada tabel dimensi. Kunci ini harus merupakan ID dokumen di indeks Elasticsearch yang sesuai.
Jika Anda tidak menentukan primary key, Anda dapat menggunakan satu atau beberapa bidang kunci untuk operasi JOIN pada tabel dimensi. Kunci-kunci ini harus merupakan bidang dalam dokumen di indeks Elasticsearch yang sesuai.
Untuk tipe String, akhiran .keyword ditambahkan ke nama bidang secara default untuk memastikan kompatibilitas. Jika hal ini menghambat pencocokan dengan bidang Text di Elasticsearch, Anda dapat mengatur parameter ignoreKeywordSuffix ke true.
Tabel sink
CREATE TABLE es_sink( user_id STRING, user_name STRING, uv BIGINT, pv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', -- Jika Anda menggunakan Elasticsearch 6.x, atur parameter ini ke elasticsearch-6. 'hosts' = '<yourHosts>', 'index' = '<yourIndex>' );CatatanTabel sink Elasticsearch bekerja dalam mode upsert atau append, tergantung pada apakah primary key didefinisikan.
Jika primary key didefinisikan, primary key tersebut harus menjadi ID dokumen. Tabel sink Elasticsearch bekerja dalam mode upsert, yang dapat memproses pesan UPDATE dan DELETE.
Jika tidak ada primary key yang didefinisikan, Elasticsearch akan menghasilkan ID dokumen acak secara otomatis. Tabel sink Elasticsearch bekerja dalam mode append, yang hanya dapat mengonsumsi pesan INSERT.
Beberapa tipe data, seperti BYTES, ROW, ARRAY, dan MAP, tidak memiliki representasi string yang sesuai. Oleh karena itu, bidang-bidang dengan tipe tersebut tidak dapat digunakan sebagai bidang primary key.
Bidang-bidang dalam pernyataan DDL berkorespondensi dengan bidang-bidang dalam dokumen Elasticsearch. Anda tidak dapat menulis metadata, seperti ID dokumen, ke tabel sink Elasticsearch karena metadata tersebut dikelola oleh instans Elasticsearch.
Parameter WITH
Tabel sumber
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Catatan |
connector | Tipe tabel sumber. | String | Ya | Tidak ada | Bidang statis adalah Elasticsearch. |
endPoint | Alamat server. | String | Ya | Tidak ada | Contoh: |
indexName | Nama indeks. | String | Ya | Tidak ada | Tidak ada. |
accessId | Username untuk instans Elasticsearch. | String | Tidak | Tidak ada | Nilai default-nya kosong, yang berarti autentikasi tidak dilakukan. Jika Anda menentukan accessId, Anda juga harus menentukan accessKey yang tidak kosong. Penting Untuk mencegah kebocoran username dan password Anda, gunakan variabel. Untuk informasi selengkapnya, lihat Variabel proyek. |
accessKey | Password untuk instans Elasticsearch. | String | Tidak | Tidak ada | |
typeNames | Nama tipe. | String | Tidak | _doc | Jangan atur parameter ini untuk Elasticsearch 7.0 atau versi lebih baru. |
batchSize | Jumlah maksimum dokumen yang diambil dari kluster Elasticsearch untuk setiap permintaan scroll. | Int | Tidak | 2000 | Tidak ada. |
keepScrollAliveSecs | Waktu maksimum untuk menjaga konteks scroll tetap aktif. | Int | Tidak | 3600 | Unitnya adalah detik. |
Tabel sink
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Catatan |
connector | Tipe tabel sink. | String | Ya | Tidak ada | Nilai yang valid adalah Catatan Anda hanya dapat mengatur parameter ini ke |
hosts | Alamat server. | String | Ya | Tidak ada | Contoh: |
index | Nama indeks. | String | Ya | Tidak ada | Tabel sink Elasticsearch mendukung indeks statis maupun dinamis. Perhatikan poin-poin berikut saat menggunakan indeks statis dan dinamis:
|
document-type | Tipe dokumen. | String |
| Tidak ada | Jika parameter connector diatur ke |
username | Username. | String | Tidak | Kosong | Nilai default-nya kosong, yang berarti autentikasi tidak dilakukan. Jika Anda menentukan username, Anda juga harus menentukan password yang tidak kosong. Penting Untuk mencegah kebocoran username dan password Anda, gunakan variabel. Untuk informasi selengkapnya, lihat Variabel proyek. |
password | Password. | String | Tidak | Kosong | |
document-id.key-delimiter | Pemisah untuk ID dokumen. | String | Tidak | _ | Pada tabel sink Elasticsearch, primary key digunakan untuk menghitung ID dokumen Elasticsearch. Tabel sink Elasticsearch menghasilkan string ID dokumen untuk setiap baris dengan menggabungkan semua bidang primary key sesuai urutan yang ditentukan dalam DDL, menggunakan pemisah kunci yang ditentukan oleh document-id.key-delimiter. Catatan ID dokumen adalah string hingga 512 byte yang tidak mengandung spasi. |
failure-handler | Kebijakan penanganan kesalahan untuk permintaan Elasticsearch yang gagal. | String | Tidak | fail | Kebijakan berikut tersedia:
|
sink.flush-on-checkpoint | Menentukan apakah operasi flush dilakukan pada checkpoint. | Boolean | Tidak | true |
|
sink.bulk-flush.backoff.strategy | Jika operasi flush gagal karena kesalahan permintaan sementara, atur sink.bulk-flush.backoff.strategy untuk menentukan kebijakan percobaan ulang. | Enum | Tidak | DISABLED |
|
sink.bulk-flush.backoff.max-retries | Jumlah maksimum percobaan ulang backoff. | Int | Tidak | Tidak ada | Tidak ada. |
sink.bulk-flush.backoff.delay | Penundaan antara setiap percobaan backoff. | Duration | Tidak | Tidak ada |
|
sink.bulk-flush.max-actions | Jumlah maksimum operasi yang dibuffer untuk setiap permintaan batch. | Int | Tidak | 1000 | Nilai 0 menonaktifkan fitur ini. |
sink.bulk-flush.max-size | Ukuran memori maksimum buffer untuk permintaan. | String | Tidak | 2 MB | Unitnya adalah MB. Nilai default-nya adalah 2 MB. Nilai 0 MB menonaktifkan fitur ini. |
sink.bulk-flush.interval | Interval flush. | Duration | Tidak | 1s | Unitnya adalah detik. Nilai default-nya adalah 1s. Nilai 0s menonaktifkan fitur ini. |
connection.path-prefix | String awalan yang ditambahkan ke setiap komunikasi REST. | String | Tidak | Kosong | Tidak ada. |
retry-on-conflict | Jumlah maksimum percobaan ulang yang diizinkan untuk operasi pembaruan akibat pengecualian konflik versi. Jika jumlah percobaan ulang melebihi nilai ini, pengecualian dilemparkan dan pekerjaan gagal. | Int | Tidak | 0 | Catatan
|
routing-fields | Menentukan satu atau beberapa nama bidang ES untuk merutekan dokumen ke shard tertentu di Elasticsearch. | String | Tidak | Tidak ada | Pisahkan beberapa nama bidang dengan titik koma (;). Jika suatu bidang kosong, nilainya diatur ke null. Catatan Parameter ini hanya didukung oleh VVR 8.0.6 atau versi lebih baru untuk elasticsearch-7 dan elasticsearch-8. |
sink.delete-strategy | Mengonfigurasi perilaku saat menerima pesan retraction (-D/-U). | Enum | Tidak | DELETE_ROW_ON_PK | Perilaku berikut tersedia:
|
sink.ignore-null-when-update | Saat memperbarui data, menentukan apakah akan memperbarui bidang yang sesuai ke null atau tidak memperbarui bidang tersebut jika nilai bidang data masuk adalah null. | BOOLEAN | Tidak | false | Nilai yang valid:
Catatan Parameter ini hanya didukung oleh VVR 11.1 atau versi lebih baru. |
connection.request-timeout | Timeout untuk meminta koneksi dari manajer koneksi. | Duration | Tidak | Tidak ada | Catatan Parameter ini hanya didukung oleh VVR 11.5 atau versi lebih baru. |
connect.timeout | Timeout untuk membuat koneksi. | Duration | Tidak | Tidak ada | Catatan Parameter ini hanya didukung oleh VVR 11.5 atau versi lebih baru. |
socket.timeout | Timeout untuk menunggu data. Ini adalah waktu idle maksimum antara dua paket data berturut-turut. | Duration | Tidak | Tidak ada | Catatan Parameter ini hanya didukung oleh VVR 11.5 atau versi lebih baru. |
sink.bulk-flush.update.doc_as_upsert | Menentukan apakah akan menggunakan dokumen sebagai bidang pembaruan. | BOOLEAN | Tidak | false | Nilai yang valid:
Berdasarkan https://github.com/elastic/elasticsearch/issues/105804, pipeline data preset Elasticsearch tidak mendukung pembaruan parsial untuk pembaruan massal. Jika Anda ingin menggunakan pipeline data, atur parameter ini ke true. Catatan Parameter ini hanya didukung oleh VVR 11.5 atau versi lebih baru. |
Tabel dimensi
Parameter | Deskripsi | Tipe data | Wajib | Nilai default | Catatan |
connector | Tipe tabel dimensi. | String | Ya | Tidak ada | Nilainya tetap Elasticsearch. |
endPoint | Alamat server. | String | Ya | Tidak ada | Contoh: |
indexName | Nama indeks. | String | Ya | Tidak ada | Tidak ada. |
accessId | Username untuk instans Elasticsearch. | String | Tidak | Tidak ada | Nilai default-nya kosong, yang berarti autentikasi tidak dilakukan. Jika Anda menentukan accessId, Anda juga harus menentukan accessKey yang tidak kosong. Penting Untuk mencegah kebocoran username dan password Anda, gunakan variabel. Untuk informasi selengkapnya, lihat Variabel proyek. |
accessKey | Password untuk instans Elasticsearch. | String | Tidak | Tidak ada | |
typeNames | Nama tipe. | String | Tidak | _doc | Jangan atur parameter ini untuk Elasticsearch 7.0 atau versi lebih baru. |
maxJoinRows | Jumlah maksimum baris yang di-join untuk satu baris data. | Integer | Tidak | 1024 | Tidak ada. |
cache | Kebijakan cache. | String | Tidak | Tidak ada | Tiga kebijakan cache berikut didukung:
|
cacheSize | Ukuran cache, yaitu jumlah baris data yang dicache. | Long | Tidak | 100000 | Parameter cacheSize hanya berlaku ketika parameter cache diatur ke LRU. |
cacheTTLMs | Periode timeout hingga cache kedaluwarsa. | Long | Tidak | Long.MAX_VALUE | Unitnya adalah milidetik. Konfigurasi cacheTTLMs bergantung pada konfigurasi cache:
|
ignoreKeywordSuffix | Menentukan apakah akan mengabaikan akhiran .keyword yang secara otomatis ditambahkan ke bidang String. | Boolean | Tidak | false | Untuk memastikan kompatibilitas, Flink mengonversi tipe Text di Elasticsearch ke tipe String dan secara default menambahkan akhiran .keyword ke nama bidang bertipe String. Nilai yang valid:
|
cacheEmpty | Menentukan apakah akan menyimpan cache hasil kosong dari pencarian di tabel dimensi fisik. | Boolean | Tidak | true | Parameter cacheEmpty hanya berlaku ketika parameter cache diatur ke LRU. |
queryMaxDocs | Jumlah maksimum dokumen yang dikembalikan saat mengkueri server Elasticsearch untuk setiap catatan data masuk dari input tabel dimensi tanpa primary key. | Integer | Tidak | 10000 | Nilai default 10000 adalah batas maksimum dokumen yang dapat dikembalikan oleh server Elasticsearch. Nilai parameter ini tidak boleh melebihi batas tersebut. Catatan
|
Pemetaan tipe
Flink mengurai data Elasticsearch dalam format JSON. Untuk informasi selengkapnya, lihat Pemetaan Tipe Data.
Contoh
Contoh tabel sumber
CREATE TEMPORARY TABLE elasticsearch_source ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='blackhole' ); INSERT INTO blackhole_sink SELECT name, location, `value` FROM elasticsearch_source;Contoh tabel dimensi
CREATE TEMPORARY TABLE datagen_source ( id STRING, data STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_dim ( id STRING, `value` FLOAT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( id STRING, data STRING, `value` FLOAT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT e.*, w.* FROM datagen_source AS e JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w ON e.id = w.id;Contoh tabel sink 1
CREATE TEMPORARY TABLE datagen_source ( id STRING, name STRING, uv BIGINT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_sink ( user_id STRING, user_name STRING, uv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED -- Primary key bersifat opsional. Jika primary key didefinisikan, primary key tersebut digunakan sebagai ID dokumen. Jika tidak, nilai acak digunakan sebagai ID dokumen. ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourElasticsearch.types>', 'username' ='${secret_values.ak_id}', 'password' ='${secret_values.ak_secret}' ); INSERT INTO es_sink SELECT id, name, uv FROM datagen_source;Contoh tabel sink 2
CREATE TEMPORARY TABLE datagen_source( id STRING, details ROW< name STRING, ages ARRAY<INT>, attributes MAP<STRING, STRING> > ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_sink ( id STRING, details ROW< name STRING, ages ARRAY<INT>, attributes MAP<STRING, STRING> >, PRIMARY KEY (id) NOT ENFORCED -- Primary key bersifat opsional. Jika primary key didefinisikan, primary key tersebut digunakan sebagai ID dokumen. Jika tidak, nilai acak digunakan sebagai ID dokumen. ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourElasticsearch.types>', 'username' ='${secret_values.ak_id}', 'password' ='${secret_values.ak_secret}' ); INSERT INTO es_sink SELECT id, details FROM datagen_source;