Topik ini menjelaskan cara menggunakan konektor Elasticsearch.
Informasi latar belakang
Alibaba Cloud Elasticsearch kompatibel dengan fitur-fitur Elasticsearch sumber terbuka seperti Keamanan, Pembelajaran Mesin, Grafik, dan Pemantauan Kinerja Aplikasi (APM). Alibaba Cloud Elasticsearch cocok untuk berbagai skenario seperti analisis data dan pencarian data. Layanan kelas perusahaan seperti kontrol akses, pemantauan keamanan, peringatan, serta pembuatan laporan otomatis juga tersedia.
Tabel berikut menjelaskan kemampuan yang didukung oleh konektor Elasticsearch.
Item | Deskripsi |
Jenis tabel | Tabel sumber, tabel dimensi, dan tabel sink |
Mode operasi | Mode batch dan mode streaming |
Format data | JSON |
Metric |
Catatan Untuk informasi lebih lanjut tentang metrics, lihat Metrics. |
Jenis API | DataStream API dan SQL API |
Pembaruan atau penghapusan data dalam tabel sink | Didukung |
Prasyarat
Indeks Elasticsearch telah dibuat. Untuk informasi lebih lanjut, lihat bagian "Langkah 1: Buat kluster" dari topik Memulai.
Daftar putih alamat IP publik atau pribadi dikonfigurasi untuk kluster Elasticsearch terkait. Untuk informasi lebih lanjut, lihat Konfigurasikan daftar putih alamat IP publik atau pribadi untuk kluster Elasticsearch.
Batasan
Konektor Elasticsearch dapat digunakan untuk tabel sumber dan tabel dimensi hanya jika kluster Elasticsearch terkait adalah versi yang lebih besar dari atau sama dengan V6.8.X tetapi lebih awal dari V8.X.
Konektor Elasticsearch dapat digunakan untuk tabel sink hanya jika kluster Elasticsearch terkait adalah V6.X, V7.X, atau V8.X.
Konektor Elasticsearch hanya dapat digunakan untuk tabel sumber Elasticsearch penuh dan tidak dapat digunakan untuk tabel sumber Elasticsearch inkremental.
Sintaksis
Buat tabel sumber:
CREATE TABLE elasticsearch_source( name STRING, location STRING, value FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );Buat tabel dimensi:
CREATE TABLE es_dim( field1 STRING, --- Jika bidang ini digunakan sebagai kunci untuk menggabungkan tabel dimensi dengan tabel lain, nilai bidang ini harus bertipe data STRING. field2 FLOAT, field3 BIGINT, PRIMARY KEY (field1) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'indexName' = '<yourIndexName>' );CatatanJika Anda mendefinisikan kunci utama untuk tabel dimensi, hanya satu kunci yang dapat digunakan untuk menggabungkan tabel dimensi dengan tabel lain. Kunci tersebut adalah ID dokumen di indeks Elasticsearch Anda.
Jika Anda tidak mendefinisikan kunci utama untuk tabel dimensi, satu atau lebih kunci dapat digunakan untuk menggabungkan tabel dimensi dengan tabel lain. Kunci-kunci tersebut adalah bidang-bidang dokumen di indeks Elasticsearch Anda.
Secara default, akhiran .keyword ditambahkan ke nama-nama bidang tipe data STRING untuk memastikan kompatibilitas. Jika bidang tipe data TEXT di tabel Elasticsearch tidak dapat dicocokkan, Anda dapat mengatur nilai opsi
ignoreKeywordSuffixmenjadi true.
Buat tabel sink:
CREATE TABLE es_sink( user_id STRING, user_name STRING, uv BIGINT, pv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7' -- Jika versi Elasticsearch adalah V6.X, masukkan elasticsearch-6. 'hosts' = '<yourHosts>', 'index' = '<yourIndex>' );CatatanTabel sink Elasticsearch bekerja dalam mode upsert atau mode append berdasarkan apakah kunci utama didefinisikan.
Jika kunci utama didefinisikan untuk tabel sink Elasticsearch, kunci utama harus berupa ID dokumen dan tabel sink Elasticsearch bekerja dalam mode upsert. Dalam mode ini, tabel sink Elasticsearch dapat mengonsumsi pesan UPDATE dan DELETE.
Jika tidak ada kunci utama yang didefinisikan untuk tabel sink Elasticsearch, Elasticsearch secara otomatis menghasilkan ID dokumen acak dan tabel sink Elasticsearch bekerja dalam mode append. Dalam mode ini, tabel sink Elasticsearch hanya dapat mengonsumsi pesan INSERT.
Tipe data spesifik seperti BYTES, ROW, ARRAY, dan MAP tidak dapat direpresentasikan sebagai string. Oleh karena itu, bidang-bidang tipe data ini tidak dapat digunakan sebagai bidang kunci utama.
Bidang-bidang dalam pernyataan DDL sesuai dengan bidang-bidang dalam dokumen Elasticsearch. Metadata seperti ID dokumen dipertahankan pada kluster Elasticsearch. Oleh karena itu, metadata tidak dapat ditulis ke tabel sink Elasticsearch.
Opsi konektor dalam klausa WITH
Spesifik Sumber
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
connector
Jenis tabel sumber.
STRING
Ya
Tidak ada nilai default
Atur nilainya menjadi elasticsearch.
endPoint
Titik akhir kluster Elasticsearch.
STRING
Ya
Tidak ada nilai default
Contoh:
http://127.0.0.1:XXXX.indexName
Nama indeks Elasticsearch.
STRING
Ya
Tidak ada nilai default
Tidak tersedia.
accessId
Nama pengguna yang digunakan untuk mengakses kluster Elasticsearch.
STRING
Tidak
Tidak ada nilai default
Secara default, opsi ini kosong. Ini menunjukkan bahwa verifikasi izin tidak diperlukan. Jika Anda mengonfigurasi opsi accessId, Anda juga harus mengonfigurasi opsi accessKey.
PentingUntuk meningkatkan keamanan, gunakan variabel daripada menuliskan pasangan AccessKey Anda dalam teks biasa. Untuk informasi lebih lanjut, lihat Kelola variabel.
accessKey
Kata sandi yang digunakan untuk mengakses kluster Elasticsearch.
STRING
Tidak
Tidak ada nilai default
typeNames
Nama-nama tipe.
STRING
Tidak
_doc
Kami sarankan Anda tidak mengonfigurasi opsi ini jika versi kluster Elasticsearch Anda lebih baru dari V7.0.
batchSize
Jumlah maksimum dokumen yang dapat diperoleh dari kluster Elasticsearch untuk setiap permintaan gulir.
INT
Tidak
2000
Tidak tersedia.
keepScrollAliveSecs
Periode retensi maksimum konteks gulir.
INT
Tidak
3600
Unit: detik.
Spesifik Sink
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
connector
Jenis tabel sink.
String
Ya
Tidak ada nilai default
Nilai valid:
elasticsearch-6,elasticsearch-7, danelasticsearch-8.CatatanHanya VVR 8.0.5 atau lebih baru yang mendukung
elasticsearch-8.hosts
Titik akhir kluster Elasticsearch.
String
Ya
Tidak ada nilai default
Contoh:
127.0.0.1:XXXX.index
Nama indeks Elasticsearch.
String
Ya
Tidak ada nilai default
Tabel sink Elasticsearch mendukung indeks statis dan dinamis. Saat menggunakan indeks statis dan dinamis, perhatikan hal-hal berikut:
Jika Anda menggunakan indeks statis, nilai opsi indeks harus berupa string, seperti
myusers. Semua rekaman ditulis ke indeksmyusers.Jika Anda menggunakan indeks dinamis, Anda dapat menggunakan
{field_name}untuk merujuk nilai bidang dalam rekaman untuk menghasilkan indeks tujuan secara dinamis. Anda juga dapat menggunakan{field_namedate_format_string}untuk mengonversi nilai bidang tipe data TIMESTAMP, DATE, dan TIME ke format yang ditentukan olehdate_format_string.date_format_stringkompatibel dengan DateTimeFormatter di Java. Sebagai contoh, jika Anda mengatur indeks dinamis menjadimyusers-{log_tsyyyy-MM-dd}, rekaman2020-03-27 12:25:55dalam nilai bidang log_ts ditulis ke indeksmyusers-2020-03-27.
document-type
Tipe dokumen.
String
Jika opsi konektor diatur ke elasticsearch-6, opsi ini harus dikonfigurasi.
Jika opsi konektor diatur ke elasticsearch-7, opsi ini tidak didukung.
Tidak ada nilai default
Jika opsi
connectordiatur keelasticsearch-6, nilai opsi ini harus sama dengan nilai opsitypeyang dikonfigurasi untuk Elasticsearch.username
Nama pengguna yang digunakan untuk mengakses kluster Elasticsearch.
String
Tidak
Tidak ada nilai default
Secara default, opsi ini dibiarkan kosong, yang menunjukkan bahwa verifikasi izin tidak diperlukan. Jika Anda mengonfigurasi opsi
username, Anda juga harus mengonfigurasi opsipassword.PentingUntuk meningkatkan keamanan, gunakan variabel daripada menuliskan pasangan AccessKey Anda dalam teks biasa. Untuk informasi lebih lanjut, lihat Kelola variabel.
password
Kata sandi yang digunakan untuk mengakses kluster Elasticsearch.
String
Tidak
Tidak ada nilai default
document-id.key-delimiter
Pemisah yang digunakan untuk memisahkan beberapa ID dokumen.
String
Tidak
_
Pada tabel sink Elasticsearch, kunci utama digunakan untuk menghitung ID dokumen Elasticsearch. Tabel sink Elasticsearch menggabungkan semua bidang kunci utama dalam urutan yang didefinisikan dalam pernyataan DDL menggunakan pemisah kunci yang ditentukan oleh document-id.key-delimiter. ID dokumen juga dihasilkan untuk setiap baris.
CatatanID dokumen adalah string yang berisi maksimum 512 byte tanpa spasi.
failure-handler
Kebijakan penanganan kesalahan yang digunakan saat permintaan Elasticsearch gagal.
String
Tidak
fail
Nilai valid:
fail: Penyebaran gagal jika permintaan gagal. Ini adalah nilai default.
ignore: Kegagalan diabaikan dan permintaan dihapus.
retry_rejected: Permintaan diulang jika kegagalan disebabkan oleh kapasitas antrian penuh.
custom class name: Subkelas ActionRequestFailureHandler digunakan untuk menangani kegagalan.
sink.flush-on-checkpoint
Menentukan apakah operasi flush dipicu selama checkpointing.
Boolean
Tidak
true
true: Operasi flush dipicu selama checkpointing. Ini adalah nilai default.
false: Operasi flush tidak dipicu selama checkpointing. Setelah fitur ini dinonaktifkan, konektor Elasticsearch tidak menunggu untuk memeriksa apakah semua permintaan tertunda selesai selama checkpointing. Oleh karena itu, konektor Elasticsearch tidak memberikan jaminan setidaknya sekali untuk permintaan.
sink.bulk-flush.backoff.strategy
Anda dapat mengonfigurasi opsi sink.bulk-flush.backoff.strategy untuk menentukan kebijakan pengulangan jika operasi flush gagal karena kesalahan permintaan sementara.
Enum
Tidak
DISABLED
DISABLED: Operasi flush tidak diulang. Operasi flush gagal saat kesalahan permintaan pertama terjadi. Ini adalah nilai default.
CONSTANT: Waktu tunggu untuk setiap operasi flush sama.
EXPONENTIAL: Waktu tunggu untuk setiap operasi flush meningkat secara eksponensial.
sink.bulk-flush.backoff.max-retries
Jumlah maksimum pengulangan.
Int
Tidak
Tidak ada nilai default
Tidak tersedia.
sink.bulk-flush.backoff.delay
Penundaan antara pengulangan.
Duration
Tidak
Tidak ada nilai default
Jika opsi
sink.bulk-flush.backoff.strategydiatur keCONSTANT, nilai opsi ini adalah penundaan antara pengulangan.Jika opsi
sink.bulk-flush.backoff.strategydiatur keEXPONENTIAL, nilai opsi ini adalah penundaan awal dasar.
sink.bulk-flush.max-actions
Jumlah maksimum operasi flush yang dapat dilakukan untuk setiap batch permintaan.
Int
Tidak
1000
Nilai 0 menunjukkan bahwa fitur ini dinonaktifkan.
sink.bulk-flush.max-size
Ukuran memori maksimum dari buffer tempat permintaan disimpan.
String
Tidak
2 MB
Nilai default: 2. Unit: MB. Jika opsi ini diatur ke 0, fitur ini dinonaktifkan.
sink.bulk-flush.interval
Interval waktu operasi flush dilakukan.
Duration
Tidak
1s
Nilai default: 1. Unit: detik. Jika opsi ini diatur ke 0, fitur ini dinonaktifkan.
connection.path-prefix
Awalan yang harus ditambahkan ke setiap komunikasi REST.
String
Tidak
Tidak ada nilai default
Tidak tersedia.
retry-on-conflict
Jumlah maksimum pengulangan yang diizinkan karena konflik versi dalam operasi pembaruan. Jika jumlah pengulangan melebihi nilai opsi ini, pengecualian terjadi dan penyebaran gagal.
Int
Tidak
0
CatatanHanya VVR 4.0.13 atau lebih baru yang mendukung opsi ini.
Opsi ini hanya berlaku saat kunci utama ditentukan.
routing-fields
Satu atau lebih nama bidang dalam tabel sink Elasticsearch. Nama bidang digunakan untuk merutekan dokumen ke shard yang ditentukan dari kluster Elasticsearch.
String
Tidak
Tidak ada nilai default
Pisahkan beberapa nama bidang dengan titik koma (;). Jika sebuah bidang kosong, bidang tersebut diatur ke null.
CatatanHanya VVR 8.0.6 atau lebih baru yang mendukung opsi ini saat opsi
connectordiatur ke elasticsearch-7 atau elasticsearch-8.sink.delete-strategy
Operasi yang dilakukan saat pesan retraksi (DELETE atau UPDATE) diterima.
Enum
Tidak
DELETE_ROW_ON_PK
Nilai valid:
DELETE_ROW_ON_PK: mengabaikan pesan UPDATE dan menghapus baris (dokumen) yang cocok dengan nilai kunci utama saat pesan DELETE diterima. Ini adalah nilai default.
IGNORE_DELETE: mengabaikan pesan UPDATE dan DELETE. Tidak ada retraksi yang terjadi di sink Elasticsearch.
NON_PK_FIELD_TO_NULL: mengabaikan pesan UPDATE dan memodifikasi baris (dokumen) yang cocok dengan nilai kunci utama saat pesan DELETE diterima. Nilai kunci utama tetap tidak berubah, dan nilai non-kunci utama dalam skema tabel diatur ke NULL. Nilai ini digunakan untuk pembaruan data sebagian saat beberapa sink digunakan untuk menulis data ke tabel Elasticsearch yang sama.
CHANGELOG_STANDARD: Mirip dengan DELETE_ROW_ON_PK. Satu-satunya perbedaan adalah bahwa baris (dokumen) yang cocok dengan nilai kunci utama juga dihapus saat pesan UPDATE diterima.
CatatanHanya VVR 8.0.8 atau lebih baru yang mendukung opsi ini.
sink.ignore-null-when-update
Menentukan apakah akan mengabaikan nilai null saat memperbarui tabel.
Boolean
Tidak
false
Nilai valid:
true: mengabaikan nilai null.
false: tidak mengabaikan nilai null.
Catatantruedidukung hanya untuk tabel dengan kunci utama dan opsiformatdiatur keJSON.Opsi ini didukung di VVR 11.1 atau lebih baru.
Spesifik Tabel Dimensi
Opsi
Deskripsi
Tipe data
Diperlukan
Nilai default
Catatan
connector
Jenis tabel dimensi.
String
Ya
Tidak ada nilai default
Atur nilainya menjadi elasticsearch.
endPoint
Titik akhir kluster Elasticsearch.
String
Ya
Tidak ada nilai default
Contoh:
http://127.0.0.1:XXXX.indexName
Nama indeks Elasticsearch.
String
Ya
Tidak ada nilai default
Tidak tersedia.
accessId
Nama pengguna yang digunakan untuk mengakses kluster Elasticsearch.
String
Tidak
Tidak ada nilai default
Secara default, opsi ini kosong. Ini menunjukkan bahwa verifikasi izin tidak diperlukan. Jika Anda mengonfigurasi opsi accessId, Anda juga harus mengonfigurasi opsi accessKey.
PentingUntuk meningkatkan keamanan, gunakan variabel daripada menuliskan pasangan AccessKey Anda dalam teks biasa. Untuk informasi lebih lanjut, lihat Kelola variabel.
accessKey
Kata sandi yang digunakan untuk mengakses kluster Elasticsearch.
String
Tidak
Tidak ada nilai default
typeNames
Nama-nama tipe.
String
Tidak
_doc
Kami sarankan Anda tidak mengonfigurasi opsi ini jika versi kluster Elasticsearch Anda lebih baru dari V7.0.
maxJoinRows
Jumlah maksimum baris yang dapat digabungkan.
Integer
Tidak
1024
Tidak tersedia.
cache
Kebijakan cache.
String
Tidak
None
Nilai valid:
ALL: Semua data dalam tabel dimensi di-cache. Sebelum penyebaran berjalan, sistem memuat semua data dalam tabel dimensi ke cache. Dengan cara ini, cache dicari untuk semua kueri berikutnya dalam tabel dimensi. Jika sistem tidak menemukan rekaman data dalam cache, kunci gabungan tidak ada. Sistem memuat ulang semua data dalam cache setelah entri cache kedaluwarsa.
LRU: Data sebagian dalam tabel dimensi di-cache. Sistem mencari data dalam cache setiap kali rekaman data dibaca dari tabel sumber. Jika data tidak ditemukan, sistem mencari data dalam tabel dimensi fisik.
None: Tidak ada data yang di-cache.
cacheSize
Jumlah maksimum baris data yang dapat di-cache.
Long
Tidak
100000
Opsi cacheSize hanya berlaku saat Anda mengatur opsi cache ke
LRU.cacheTTLMs
Periode timeout cache.
Long
Tidak
Long.MAX_VALUE
Unit: milidetik. Konfigurasi opsi cacheTTLMs bervariasi berdasarkan opsi cache.
Jika opsi cache diatur ke LRU, opsi cacheTTLMs menentukan periode timeout cache. Secara default, entri cache tidak kedaluwarsa.
Jika Anda mengatur opsi cache ke
ALL, opsi cacheTTLMs menentukan interval waktu sistem menyegarkan cache. Secara default, cache tidak disegarkan.
ignoreKeywordSuffix
Menentukan apakah akan mengabaikan akhiran .keyword yang secara otomatis ditambahkan ke nama bidang tipe data STRING.
Boolean
Tidak
false
Realtime Compute for Apache Flink mengonversi bidang tipe data TEXT menjadi bidang tipe data STRING untuk memastikan kompatibilitas. Secara default, akhiran .keyword ditambahkan ke nama-nama bidang tipe data STRING.
Nilai valid:
true: Akhiran .keyword diabaikan.
Jika bidang tipe data TEXT dalam tabel sink Elasticsearch tidak dapat dicocokkan, atur opsi ini ke
true.false: Akhiran .keyword tidak diabaikan.
cacheEmpty
Menentukan apakah akan menyimpan hasil kosong yang ditemukan dalam tabel dimensi fisik.
Boolean
Tidak
true
Opsi cacheEmpty berlaku hanya saat opsi cache diatur ke
LRU.queryMaxDocs
Jumlah maksimum dokumen yang dapat dikembalikan saat server Elasticsearch di-query setelah setiap rekaman data dikirim ke tabel dimensi tanpa kunci utama.
Integer
Tidak
10000
Nilai default 10000 juga merupakan nilai maksimum dari opsi ini.
CatatanHanya VVR 8.0.8 atau lebih baru yang mendukung opsi ini.
Opsi ini berlaku hanya untuk tabel dimensi tanpa kunci utama karena data dalam tabel kunci utama bersifat unik.
Untuk memastikan kebenaran kueri, nilai besar digunakan sebagai nilai default. Namun, nilai besar meningkatkan penggunaan memori selama query Elasticsearch. Jika Anda mengalami masalah kekurangan memori, Anda dapat menurunkan nilainya untuk mengurangi penggunaan memori.
Pemetaan tipe data
Realtime Compute for Apache Flink mengurai data Elasticsearch dalam format JSON. Untuk informasi lebih lanjut, lihat Pemetaan Tipe Data.
Kode contoh
Kode contoh untuk tabel sumber
CREATE TEMPORARY TABLE elasticsearch_source ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( name STRING, location STRING, `value` FLOAT ) WITH ( 'connector' ='blackhole' ); INSERT INTO blackhole_sink SELECT name, location, `value` FROM elasticsearch_source;Kode contoh untuk tabel dimensi
CREATE TEMPORARY TABLE datagen_source ( id STRING, data STRING, proctime as PROCTIME() ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_dim ( id STRING, `value` FLOAT, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' ='elasticsearch', 'endPoint' = '<yourEndPoint>', 'accessId' = '${secret_values.ak_id}', 'accessKey' = '${secret_values.ak_secret}', 'indexName' = '<yourIndexName>', 'typeNames' = '<yourTypeName>' ); CREATE TEMPORARY TABLE blackhole_sink ( id STRING, data STRING, `value` FLOAT ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT e.*, w.* FROM datagen_source AS e JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w ON e.id = w.id;Kode contoh untuk tabel sink 1
CREATE TEMPORARY TABLE datagen_source ( id STRING, name STRING, uv BIGINT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_sink ( user_id STRING, user_name STRING, uv BIGINT, PRIMARY KEY (user_id) NOT ENFORCED -- Kunci utama bersifat opsional. Jika Anda menentukan kunci utama, kunci utama digunakan sebagai ID dokumen. Jika Anda tidak menentukan kunci utama, nilai acak digunakan sebagai ID dokumen. ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourElasticsearch.types>', 'username' ='${secret_values.ak_id}', 'password' ='${secret_values.ak_secret}' ); INSERT INTO es_sink SELECT id, name, uv FROM datagen_source;Kode contoh untuk tabel sink 2
CREATE TEMPORARY TABLE datagen_source( id STRING, details ROW< name STRING, ages ARRAY<INT>, attributes MAP<STRING, STRING> > ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE es_sink ( id STRING, details ROW< name STRING, ages ARRAY<INT>, attributes MAP<STRING, STRING> >, PRIMARY KEY (id) NOT ENFORCED -- Kunci utama bersifat opsional. Jika Anda menentukan kunci utama, kunci utama digunakan sebagai ID dokumen. Jika Anda tidak menentukan kunci utama, nilai acak digunakan sebagai ID dokumen. ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<yourHosts>', 'index' = '<yourIndex>', 'document-type' = '<yourElasticsearch.types>', 'username' ='${secret_values.ak_id}', 'password' ='${secret_values.ak_secret}' ); INSERT INTO es_sink SELECT id, details FROM datagen_source;