全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor Elasticsearch

更新时间:Jul 02, 2025

Topik ini menjelaskan cara menggunakan konektor Elasticsearch.

Informasi latar belakang

Alibaba Cloud Elasticsearch kompatibel dengan fitur-fitur Elasticsearch sumber terbuka seperti Keamanan, Pembelajaran Mesin, Grafik, dan Pemantauan Kinerja Aplikasi (APM). Alibaba Cloud Elasticsearch cocok untuk berbagai skenario seperti analisis data dan pencarian data. Layanan kelas perusahaan seperti kontrol akses, pemantauan keamanan, peringatan, serta pembuatan laporan otomatis juga tersedia.

Tabel berikut menjelaskan kemampuan yang didukung oleh konektor Elasticsearch.

Item

Deskripsi

Jenis tabel

Tabel sumber, tabel dimensi, dan tabel sink

Mode operasi

Mode batch dan mode streaming

Format data

JSON

Metric

  • Metrics untuk tabel sumber

    • pendingRecords

    • numRecordsIn

    • numRecordsInPerSecond

    • numBytesIn

    • numBytesInPerSecond

  • Metrics untuk tabel dimensi

    Tidak ada nilai default

  • Metrics untuk tabel sink di Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 6.0.6 atau lebih baru

    • numRecordsOut

    • numRecordsOutPerSecond

Catatan

Untuk informasi lebih lanjut tentang metrics, lihat Metrics.

Jenis API

DataStream API dan SQL API

Pembaruan atau penghapusan data dalam tabel sink

Didukung

Prasyarat

Batasan

  • Konektor Elasticsearch dapat digunakan untuk tabel sumber dan tabel dimensi hanya jika kluster Elasticsearch terkait adalah versi yang lebih besar dari atau sama dengan V6.8.X tetapi lebih awal dari V8.X.

  • Konektor Elasticsearch dapat digunakan untuk tabel sink hanya jika kluster Elasticsearch terkait adalah V6.X, V7.X, atau V8.X.

  • Konektor Elasticsearch hanya dapat digunakan untuk tabel sumber Elasticsearch penuh dan tidak dapat digunakan untuk tabel sumber Elasticsearch inkremental.

Sintaksis

  • Buat tabel sumber:

    CREATE TABLE elasticsearch_source(
      name STRING,
      location STRING,
      value FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
  • Buat tabel dimensi:

    CREATE TABLE es_dim(
      field1 STRING, --- Jika bidang ini digunakan sebagai kunci untuk menggabungkan tabel dimensi dengan tabel lain, nilai bidang ini harus bertipe data STRING.
      field2 FLOAT,
      field3 BIGINT,
      PRIMARY KEY (field1) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'indexName' = '<yourIndexName>'
    );
    Catatan
    • Jika Anda mendefinisikan kunci utama untuk tabel dimensi, hanya satu kunci yang dapat digunakan untuk menggabungkan tabel dimensi dengan tabel lain. Kunci tersebut adalah ID dokumen di indeks Elasticsearch Anda.

    • Jika Anda tidak mendefinisikan kunci utama untuk tabel dimensi, satu atau lebih kunci dapat digunakan untuk menggabungkan tabel dimensi dengan tabel lain. Kunci-kunci tersebut adalah bidang-bidang dokumen di indeks Elasticsearch Anda.

    • Secara default, akhiran .keyword ditambahkan ke nama-nama bidang tipe data STRING untuk memastikan kompatibilitas. Jika bidang tipe data TEXT di tabel Elasticsearch tidak dapat dicocokkan, Anda dapat mengatur nilai opsi ignoreKeywordSuffix menjadi true.

  • Buat tabel sink:

    CREATE TABLE es_sink(
      user_id   STRING,
      user_name   STRING,
      uv BIGINT,
      pv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7' -- Jika versi Elasticsearch adalah V6.X, masukkan elasticsearch-6.
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>'
    );
    Catatan
    • Tabel sink Elasticsearch bekerja dalam mode upsert atau mode append berdasarkan apakah kunci utama didefinisikan.

      • Jika kunci utama didefinisikan untuk tabel sink Elasticsearch, kunci utama harus berupa ID dokumen dan tabel sink Elasticsearch bekerja dalam mode upsert. Dalam mode ini, tabel sink Elasticsearch dapat mengonsumsi pesan UPDATE dan DELETE.

      • Jika tidak ada kunci utama yang didefinisikan untuk tabel sink Elasticsearch, Elasticsearch secara otomatis menghasilkan ID dokumen acak dan tabel sink Elasticsearch bekerja dalam mode append. Dalam mode ini, tabel sink Elasticsearch hanya dapat mengonsumsi pesan INSERT.

    • Tipe data spesifik seperti BYTES, ROW, ARRAY, dan MAP tidak dapat direpresentasikan sebagai string. Oleh karena itu, bidang-bidang tipe data ini tidak dapat digunakan sebagai bidang kunci utama.

    • Bidang-bidang dalam pernyataan DDL sesuai dengan bidang-bidang dalam dokumen Elasticsearch. Metadata seperti ID dokumen dipertahankan pada kluster Elasticsearch. Oleh karena itu, metadata tidak dapat ditulis ke tabel sink Elasticsearch.

Opsi konektor dalam klausa WITH

  • Spesifik Sumber

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    connector

    Jenis tabel sumber.

    STRING

    Ya

    Tidak ada nilai default

    Atur nilainya menjadi elasticsearch.

    endPoint

    Titik akhir kluster Elasticsearch.

    STRING

    Ya

    Tidak ada nilai default

    Contoh: http://127.0.0.1:XXXX.

    indexName

    Nama indeks Elasticsearch.

    STRING

    Ya

    Tidak ada nilai default

    Tidak tersedia.

    accessId

    Nama pengguna yang digunakan untuk mengakses kluster Elasticsearch.

    STRING

    Tidak

    Tidak ada nilai default

    Secara default, opsi ini kosong. Ini menunjukkan bahwa verifikasi izin tidak diperlukan. Jika Anda mengonfigurasi opsi accessId, Anda juga harus mengonfigurasi opsi accessKey.

    Penting

    Untuk meningkatkan keamanan, gunakan variabel daripada menuliskan pasangan AccessKey Anda dalam teks biasa. Untuk informasi lebih lanjut, lihat Kelola variabel.

    accessKey

    Kata sandi yang digunakan untuk mengakses kluster Elasticsearch.

    STRING

    Tidak

    Tidak ada nilai default

    typeNames

    Nama-nama tipe.

    STRING

    Tidak

    _doc

    Kami sarankan Anda tidak mengonfigurasi opsi ini jika versi kluster Elasticsearch Anda lebih baru dari V7.0.

    batchSize

    Jumlah maksimum dokumen yang dapat diperoleh dari kluster Elasticsearch untuk setiap permintaan gulir.

    INT

    Tidak

    2000

    Tidak tersedia.

    keepScrollAliveSecs

    Periode retensi maksimum konteks gulir.

    INT

    Tidak

    3600

    Unit: detik.

  • Spesifik Sink

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    connector

    Jenis tabel sink.

    String

    Ya

    Tidak ada nilai default

    Nilai valid: elasticsearch-6, elasticsearch-7, dan elasticsearch-8.

    Catatan

    Hanya VVR 8.0.5 atau lebih baru yang mendukung elasticsearch-8.

    hosts

    Titik akhir kluster Elasticsearch.

    String

    Ya

    Tidak ada nilai default

    Contoh: 127.0.0.1:XXXX.

    index

    Nama indeks Elasticsearch.

    String

    Ya

    Tidak ada nilai default

    Tabel sink Elasticsearch mendukung indeks statis dan dinamis. Saat menggunakan indeks statis dan dinamis, perhatikan hal-hal berikut:

    • Jika Anda menggunakan indeks statis, nilai opsi indeks harus berupa string, seperti myusers. Semua rekaman ditulis ke indeks myusers.

    • Jika Anda menggunakan indeks dinamis, Anda dapat menggunakan {field_name} untuk merujuk nilai bidang dalam rekaman untuk menghasilkan indeks tujuan secara dinamis. Anda juga dapat menggunakan {field_namedate_format_string} untuk mengonversi nilai bidang tipe data TIMESTAMP, DATE, dan TIME ke format yang ditentukan oleh date_format_string. date_format_string kompatibel dengan DateTimeFormatter di Java. Sebagai contoh, jika Anda mengatur indeks dinamis menjadi myusers-{log_tsyyyy-MM-dd}, rekaman 2020-03-27 12:25:55 dalam nilai bidang log_ts ditulis ke indeks myusers-2020-03-27.

    document-type

    Tipe dokumen.

    String

    • Jika opsi konektor diatur ke elasticsearch-6, opsi ini harus dikonfigurasi.

    • Jika opsi konektor diatur ke elasticsearch-7, opsi ini tidak didukung.

    Tidak ada nilai default

    Jika opsi connector diatur ke elasticsearch-6, nilai opsi ini harus sama dengan nilai opsi type yang dikonfigurasi untuk Elasticsearch.

    username

    Nama pengguna yang digunakan untuk mengakses kluster Elasticsearch.

    String

    Tidak

    Tidak ada nilai default

    Secara default, opsi ini dibiarkan kosong, yang menunjukkan bahwa verifikasi izin tidak diperlukan. Jika Anda mengonfigurasi opsi username, Anda juga harus mengonfigurasi opsi password.

    Penting

    Untuk meningkatkan keamanan, gunakan variabel daripada menuliskan pasangan AccessKey Anda dalam teks biasa. Untuk informasi lebih lanjut, lihat Kelola variabel.

    password

    Kata sandi yang digunakan untuk mengakses kluster Elasticsearch.

    String

    Tidak

    Tidak ada nilai default

    document-id.key-delimiter

    Pemisah yang digunakan untuk memisahkan beberapa ID dokumen.

    String

    Tidak

    _

    Pada tabel sink Elasticsearch, kunci utama digunakan untuk menghitung ID dokumen Elasticsearch. Tabel sink Elasticsearch menggabungkan semua bidang kunci utama dalam urutan yang didefinisikan dalam pernyataan DDL menggunakan pemisah kunci yang ditentukan oleh document-id.key-delimiter. ID dokumen juga dihasilkan untuk setiap baris.

    Catatan

    ID dokumen adalah string yang berisi maksimum 512 byte tanpa spasi.

    failure-handler

    Kebijakan penanganan kesalahan yang digunakan saat permintaan Elasticsearch gagal.

    String

    Tidak

    fail

    Nilai valid:

    • fail: Penyebaran gagal jika permintaan gagal. Ini adalah nilai default.

    • ignore: Kegagalan diabaikan dan permintaan dihapus.

    • retry_rejected: Permintaan diulang jika kegagalan disebabkan oleh kapasitas antrian penuh.

    • custom class name: Subkelas ActionRequestFailureHandler digunakan untuk menangani kegagalan.

    sink.flush-on-checkpoint

    Menentukan apakah operasi flush dipicu selama checkpointing.

    Boolean

    Tidak

    true

    • true: Operasi flush dipicu selama checkpointing. Ini adalah nilai default.

    • false: Operasi flush tidak dipicu selama checkpointing. Setelah fitur ini dinonaktifkan, konektor Elasticsearch tidak menunggu untuk memeriksa apakah semua permintaan tertunda selesai selama checkpointing. Oleh karena itu, konektor Elasticsearch tidak memberikan jaminan setidaknya sekali untuk permintaan.

    sink.bulk-flush.backoff.strategy

    Anda dapat mengonfigurasi opsi sink.bulk-flush.backoff.strategy untuk menentukan kebijakan pengulangan jika operasi flush gagal karena kesalahan permintaan sementara.

    Enum

    Tidak

    DISABLED

    • DISABLED: Operasi flush tidak diulang. Operasi flush gagal saat kesalahan permintaan pertama terjadi. Ini adalah nilai default.

    • CONSTANT: Waktu tunggu untuk setiap operasi flush sama.

    • EXPONENTIAL: Waktu tunggu untuk setiap operasi flush meningkat secara eksponensial.

    sink.bulk-flush.backoff.max-retries

    Jumlah maksimum pengulangan.

    Int

    Tidak

    Tidak ada nilai default

    Tidak tersedia.

    sink.bulk-flush.backoff.delay

    Penundaan antara pengulangan.

    Duration

    Tidak

    Tidak ada nilai default

    • Jika opsi sink.bulk-flush.backoff.strategy diatur ke CONSTANT, nilai opsi ini adalah penundaan antara pengulangan.

    • Jika opsi sink.bulk-flush.backoff.strategy diatur ke EXPONENTIAL, nilai opsi ini adalah penundaan awal dasar.

    sink.bulk-flush.max-actions

    Jumlah maksimum operasi flush yang dapat dilakukan untuk setiap batch permintaan.

    Int

    Tidak

    1000

    Nilai 0 menunjukkan bahwa fitur ini dinonaktifkan.

    sink.bulk-flush.max-size

    Ukuran memori maksimum dari buffer tempat permintaan disimpan.

    String

    Tidak

    2 MB

    Nilai default: 2. Unit: MB. Jika opsi ini diatur ke 0, fitur ini dinonaktifkan.

    sink.bulk-flush.interval

    Interval waktu operasi flush dilakukan.

    Duration

    Tidak

    1s

    Nilai default: 1. Unit: detik. Jika opsi ini diatur ke 0, fitur ini dinonaktifkan.

    connection.path-prefix

    Awalan yang harus ditambahkan ke setiap komunikasi REST.

    String

    Tidak

    Tidak ada nilai default

    Tidak tersedia.

    retry-on-conflict

    Jumlah maksimum pengulangan yang diizinkan karena konflik versi dalam operasi pembaruan. Jika jumlah pengulangan melebihi nilai opsi ini, pengecualian terjadi dan penyebaran gagal.

    Int

    Tidak

    0

    Catatan
    • Hanya VVR 4.0.13 atau lebih baru yang mendukung opsi ini.

    • Opsi ini hanya berlaku saat kunci utama ditentukan.

    routing-fields

    Satu atau lebih nama bidang dalam tabel sink Elasticsearch. Nama bidang digunakan untuk merutekan dokumen ke shard yang ditentukan dari kluster Elasticsearch.

    String

    Tidak

    Tidak ada nilai default

    Pisahkan beberapa nama bidang dengan titik koma (;). Jika sebuah bidang kosong, bidang tersebut diatur ke null.

    Catatan

    Hanya VVR 8.0.6 atau lebih baru yang mendukung opsi ini saat opsi connector diatur ke elasticsearch-7 atau elasticsearch-8.

    sink.delete-strategy

    Operasi yang dilakukan saat pesan retraksi (DELETE atau UPDATE) diterima.

    Enum

    Tidak

    DELETE_ROW_ON_PK

    Nilai valid:

    • DELETE_ROW_ON_PK: mengabaikan pesan UPDATE dan menghapus baris (dokumen) yang cocok dengan nilai kunci utama saat pesan DELETE diterima. Ini adalah nilai default.

    • IGNORE_DELETE: mengabaikan pesan UPDATE dan DELETE. Tidak ada retraksi yang terjadi di sink Elasticsearch.

    • NON_PK_FIELD_TO_NULL: mengabaikan pesan UPDATE dan memodifikasi baris (dokumen) yang cocok dengan nilai kunci utama saat pesan DELETE diterima. Nilai kunci utama tetap tidak berubah, dan nilai non-kunci utama dalam skema tabel diatur ke NULL. Nilai ini digunakan untuk pembaruan data sebagian saat beberapa sink digunakan untuk menulis data ke tabel Elasticsearch yang sama.

    • CHANGELOG_STANDARD: Mirip dengan DELETE_ROW_ON_PK. Satu-satunya perbedaan adalah bahwa baris (dokumen) yang cocok dengan nilai kunci utama juga dihapus saat pesan UPDATE diterima.

      Catatan

      Hanya VVR 8.0.8 atau lebih baru yang mendukung opsi ini.

    sink.ignore-null-when-update

    Menentukan apakah akan mengabaikan nilai null saat memperbarui tabel.

    Boolean

    Tidak

    false

    Nilai valid:

    • true: mengabaikan nilai null.

    • false: tidak mengabaikan nilai null.

    Catatan
    • true didukung hanya untuk tabel dengan kunci utama dan opsi format diatur ke JSON.

    • Opsi ini didukung di VVR 11.1 atau lebih baru.

  • Spesifik Tabel Dimensi

    Opsi

    Deskripsi

    Tipe data

    Diperlukan

    Nilai default

    Catatan

    connector

    Jenis tabel dimensi.

    String

    Ya

    Tidak ada nilai default

    Atur nilainya menjadi elasticsearch.

    endPoint

    Titik akhir kluster Elasticsearch.

    String

    Ya

    Tidak ada nilai default

    Contoh: http://127.0.0.1:XXXX.

    indexName

    Nama indeks Elasticsearch.

    String

    Ya

    Tidak ada nilai default

    Tidak tersedia.

    accessId

    Nama pengguna yang digunakan untuk mengakses kluster Elasticsearch.

    String

    Tidak

    Tidak ada nilai default

    Secara default, opsi ini kosong. Ini menunjukkan bahwa verifikasi izin tidak diperlukan. Jika Anda mengonfigurasi opsi accessId, Anda juga harus mengonfigurasi opsi accessKey.

    Penting

    Untuk meningkatkan keamanan, gunakan variabel daripada menuliskan pasangan AccessKey Anda dalam teks biasa. Untuk informasi lebih lanjut, lihat Kelola variabel.

    accessKey

    Kata sandi yang digunakan untuk mengakses kluster Elasticsearch.

    String

    Tidak

    Tidak ada nilai default

    typeNames

    Nama-nama tipe.

    String

    Tidak

    _doc

    Kami sarankan Anda tidak mengonfigurasi opsi ini jika versi kluster Elasticsearch Anda lebih baru dari V7.0.

    maxJoinRows

    Jumlah maksimum baris yang dapat digabungkan.

    Integer

    Tidak

    1024

    Tidak tersedia.

    cache

    Kebijakan cache.

    String

    Tidak

    None

    Nilai valid:

    • ALL: Semua data dalam tabel dimensi di-cache. Sebelum penyebaran berjalan, sistem memuat semua data dalam tabel dimensi ke cache. Dengan cara ini, cache dicari untuk semua kueri berikutnya dalam tabel dimensi. Jika sistem tidak menemukan rekaman data dalam cache, kunci gabungan tidak ada. Sistem memuat ulang semua data dalam cache setelah entri cache kedaluwarsa.

    • LRU: Data sebagian dalam tabel dimensi di-cache. Sistem mencari data dalam cache setiap kali rekaman data dibaca dari tabel sumber. Jika data tidak ditemukan, sistem mencari data dalam tabel dimensi fisik.

    • None: Tidak ada data yang di-cache.

    cacheSize

    Jumlah maksimum baris data yang dapat di-cache.

    Long

    Tidak

    100000

    Opsi cacheSize hanya berlaku saat Anda mengatur opsi cache ke LRU.

    cacheTTLMs

    Periode timeout cache.

    Long

    Tidak

    Long.MAX_VALUE

    Unit: milidetik. Konfigurasi opsi cacheTTLMs bervariasi berdasarkan opsi cache.

    • Jika opsi cache diatur ke LRU, opsi cacheTTLMs menentukan periode timeout cache. Secara default, entri cache tidak kedaluwarsa.

    • Jika Anda mengatur opsi cache ke ALL, opsi cacheTTLMs menentukan interval waktu sistem menyegarkan cache. Secara default, cache tidak disegarkan.

    ignoreKeywordSuffix

    Menentukan apakah akan mengabaikan akhiran .keyword yang secara otomatis ditambahkan ke nama bidang tipe data STRING.

    Boolean

    Tidak

    false

    Realtime Compute for Apache Flink mengonversi bidang tipe data TEXT menjadi bidang tipe data STRING untuk memastikan kompatibilitas. Secara default, akhiran .keyword ditambahkan ke nama-nama bidang tipe data STRING.

    Nilai valid:

    • true: Akhiran .keyword diabaikan.

      Jika bidang tipe data TEXT dalam tabel sink Elasticsearch tidak dapat dicocokkan, atur opsi ini ke true.

    • false: Akhiran .keyword tidak diabaikan.

    cacheEmpty

    Menentukan apakah akan menyimpan hasil kosong yang ditemukan dalam tabel dimensi fisik.

    Boolean

    Tidak

    true

    Opsi cacheEmpty berlaku hanya saat opsi cache diatur ke LRU.

    queryMaxDocs

    Jumlah maksimum dokumen yang dapat dikembalikan saat server Elasticsearch di-query setelah setiap rekaman data dikirim ke tabel dimensi tanpa kunci utama.

    Integer

    Tidak

    10000

    Nilai default 10000 juga merupakan nilai maksimum dari opsi ini.

    Catatan
    • Hanya VVR 8.0.8 atau lebih baru yang mendukung opsi ini.

    • Opsi ini berlaku hanya untuk tabel dimensi tanpa kunci utama karena data dalam tabel kunci utama bersifat unik.

    • Untuk memastikan kebenaran kueri, nilai besar digunakan sebagai nilai default. Namun, nilai besar meningkatkan penggunaan memori selama query Elasticsearch. Jika Anda mengalami masalah kekurangan memori, Anda dapat menurunkan nilainya untuk mengurangi penggunaan memori.

Pemetaan tipe data

Realtime Compute for Apache Flink mengurai data Elasticsearch dalam format JSON. Untuk informasi lebih lanjut, lihat Pemetaan Tipe Data.

Kode contoh

  • Kode contoh untuk tabel sumber

    CREATE TEMPORARY TABLE elasticsearch_source (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      name STRING,
      location STRING,
      `value` FLOAT
    ) WITH (
      'connector' ='blackhole'
    );
    
    INSERT INTO blackhole_sink
    SELECT name, location, `value`
    FROM elasticsearch_source;
  • Kode contoh untuk tabel dimensi

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      data STRING,
      proctime as PROCTIME()
    ) WITH (
      'connector' = 'datagen' 
    );
    
    CREATE TEMPORARY TABLE es_dim (
      id STRING,
      `value` FLOAT,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' ='elasticsearch',
      'endPoint' = '<yourEndPoint>',
      'accessId' = '${secret_values.ak_id}',
      'accessKey' = '${secret_values.ak_secret}',
      'indexName' = '<yourIndexName>',
      'typeNames' = '<yourTypeName>'
    );
    
    CREATE TEMPORARY TABLE blackhole_sink (
      id STRING,
      data STRING,
      `value` FLOAT
    ) WITH (
      'connector' = 'blackhole' 
    );
    
    INSERT INTO blackhole_sink
    SELECT e.*, w.*
    FROM datagen_source AS e
    JOIN es_dim FOR SYSTEM_TIME AS OF e.proctime AS w
    ON e.id = w.id;
  • Kode contoh untuk tabel sink 1

    CREATE TEMPORARY TABLE datagen_source (
      id STRING, 
      name STRING,
      uv BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      user_id STRING,
      user_name STRING,
      uv BIGINT,
      PRIMARY KEY (user_id) NOT ENFORCED -- Kunci utama bersifat opsional. Jika Anda menentukan kunci utama, kunci utama digunakan sebagai ID dokumen. Jika Anda tidak menentukan kunci utama, nilai acak digunakan sebagai ID dokumen.
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' ='${secret_values.ak_id}',
      'password' ='${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, name, uv
    FROM datagen_source;
  • Kode contoh untuk tabel sink 2

    CREATE TEMPORARY TABLE datagen_source(  
      id STRING,
        details ROW<  
            name STRING,  
            ages ARRAY<INT>,  
            attributes MAP<STRING, STRING>  
        >
    ) WITH (  
        'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE es_sink (
      id STRING,
        details ROW<  
            name STRING,  
            ages ARRAY<INT>,  
            attributes MAP<STRING, STRING>  
        >, 
      PRIMARY KEY (id) NOT ENFORCED  -- Kunci utama bersifat opsional. Jika Anda menentukan kunci utama, kunci utama digunakan sebagai ID dokumen. Jika Anda tidak menentukan kunci utama, nilai acak digunakan sebagai ID dokumen.
    ) WITH (
      'connector' = 'elasticsearch-6',
      'hosts' = '<yourHosts>',
      'index' = '<yourIndex>',
      'document-type' = '<yourElasticsearch.types>',
      'username' ='${secret_values.ak_id}',
      'password' ='${secret_values.ak_secret}'
    );
    
    INSERT INTO es_sink
    SELECT id, details
    FROM datagen_source;