全部产品
Search
文档中心

Realtime Compute for Apache Flink:Konektor Tair (Enterprise Edition)

更新时间:Jul 06, 2025

Topik ini menjelaskan penggunaan Konektor Tair (Enterprise Edition).

Informasi latar belakang

Tair (Redis OSS-compatible) adalah layanan database yang kompatibel dengan protokol sistem Redis sumber terbuka. Layanan ini mendukung kombinasi penyimpanan memori dan hard disk, serta menyediakan arsitektur hot standby untuk memastikan ketersediaan tinggi. Selain itu, Tair menggunakan arsitektur kluster yang dapat diskalakan untuk memenuhi kebutuhan bisnis akan throughput tinggi, latensi rendah, dan fleksibilitas dalam modifikasi konfigurasi.

Tabel berikut menggambarkan kemampuan yang didukung oleh Konektor Tair.

Item

Deskripsi

Jenis tabel

Tabel sink

Mode operasi

Mode streaming

Format data

STRING

Metrik

  • numBytesSend

  • numBytesSendPerSecond

  • numRecordsSend

  • numRecordsSendPerSecond

  • numRecordSendErrors

  • currentSendTime

Catatan

Untuk informasi lebih lanjut tentang data deret waktu, lihat Data deret waktu.

Jenis API

SQL API

Pembaruan atau penghapusan data dalam tabel sink

Didukung

Prasyarat

Batasan

  • Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) versi 6.0.6 atau lebih baru yang mendukung Konektor Tair (Enterprise Edition).

  • Konektor Tair (Enterprise Edition) tidak mendukung konfigurasi beberapa host.

Sintaksis

Tair (Enterprise Edition) mendukung semua struktur data Tair yang dikembangkan sendiri berdasarkan kompatibilitas dengan struktur data Redis berikut: STRING, LIST, SET, HASHMAP, dan SORTEDSET.

Untuk membuat tabel sink Tair, jalankan pernyataan DDL berikut:

CREATE TABLE tair_table (
  a STRING,
  b STRING,
  PRIMARY KEY (a) NOT ENFORCED -- Wajib. 
) WITH (
  'connector'= 'tair',
  'host' = '<yourHost>'
);
Catatan

Tair kompatibel dengan struktur data Redis. Untuk informasi lebih lanjut tentang contoh sintaksis dari struktur data Redis, lihat Konektor Tair (Redis OSS-compatible).

Parameter dalam klausa WITH

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Catatan

connector

Jenis tabel.

STRING

Ya

Tidak ada nilai default

Atur nilainya menjadi tair.

host

Titik akhir server Tair.

STRING

Ya

Tidak ada nilai default

Kami merekomendasikan Anda menggunakan titik akhir internal.

Catatan

Saat Anda mengakses database Tair melalui Internet, jaringan mungkin tidak stabil karena masalah seperti latensi jaringan dan batasan bandwidth.

mode

Struktur data Tair.

STRING

Ya

Tidak ada nilai default

Nilai valid:

  • string

  • list

  • set

  • hashmap

  • sortedset

  • tairstring

  • tairhash

  • tairzset

  • tairbloom

  • tairdoc

  • tairsearch

  • tairts

  • taircpc

  • tairroaring

  • tairgis

  • tairvector

Tair mendukung struktur data Redis dan struktur data Tair yang dikembangkan sendiri. Untuk informasi lebih lanjut tentang nilai valid, lihat Struktur data yang didukung oleh tabel sink ApsaraDB for Redis dan Format struktur data Tair.

Catatan
  • Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.1 atau versi lebih baru yang mendukung TairTs, TairCpc, TairRoaring, TairVector, dan TairGis.

  • Tabel sink Tair mendukung struktur data Tair yang dikembangkan sendiri. Pernyataan DDL yang digunakan untuk membuat tabel sink Tair harus mendefinisikan tabel berdasarkan format tertentu dan kunci utama harus ditentukan untuk tabel tersebut.

port

Nomor port server Tair.

INT

Tidak

6379

Tidak tersedia.

password

Kata sandi yang digunakan untuk mengakses database Tair.

STRING

Tidak

String kosong

String kosong menunjukkan bahwa tidak ada verifikasi yang dilakukan.

dbNum

ID database tujuan.

INT

Tidak

0

Tidak tersedia.

clusterMode

Menentukan apakah menggunakan arsitektur kluster.

BOOLEAN

Tidak

false

Nilai valid:

  • true: menggunakan arsitektur kluster.

  • false: menggunakan arsitektur mandiri.

ignoreDelete

Menentukan apakah mengabaikan pesan retraction.

BOOLEAN

Tidak

false

Nilai valid:

  • false: Saat pesan retraction diterima, data yang dimasukkan dan kunci data dihapus. Ini adalah nilai default.

  • true: Saat pesan retraction diterima, data yang dimasukkan dan kunci data dipertahankan.

expiration

Waktu hidup (TTL) yang ditentukan untuk kunci data yang dimasukkan.

LONG

Tidak

0

Nilai 0 menunjukkan bahwa TTL tidak dikonfigurasi. Jika nilai parameter ini lebih besar dari 0, TTL dikonfigurasi untuk kunci data yang dimasukkan. Unit: milidetik.

expirationAt

Waktu kedaluwarsa absolut yang ditentukan untuk kunci data yang dimasukkan.

LONG

Tidak

0

Unit: milidetik. Nilai default: 0. Nilai default menunjukkan bahwa tidak ada waktu kedaluwarsa yang ditentukan.

Jika nilai parameter ini lebih besar dari 0 dan parameter expiration disetel ke 0, waktu kedaluwarsa absolut ditentukan untuk kunci data yang dimasukkan.

incrMode

Mode sink database Tair.

STRING

Tidak

None

Nilai valid:

  • None: menunjukkan operasi insert. Ini adalah nilai default.

  • int: menunjukkan operasi INCRBY. Nilai INCR tetap pada nilai parameter incrValue.

  • float: menunjukkan operasi INCRBYFLOAT. Nilai INCR tetap pada nilai parameter incrValue.

  • dynamic_int: menunjukkan operasi INCRBY. Nilai INCR adalah nama kolom tempat nilai INCR termasuk dalam pernyataan DDL.

  • dynamic_float: menunjukkan operasi INCRBYFLOAT. Nilai INCR adalah nama kolom tempat nilai INCR termasuk dalam pernyataan DDL.

incrValue

Nilai INCR yang ditentukan berdasarkan nilai parameter incrMode.

STRING

Tidak

Tidak ada nilai default

Nilai parameter incrValue bervariasi berdasarkan nilai parameter incrMode.

  • Jika parameter incrMode disetel ke None, Anda tidak perlu mengonfigurasi parameter incrValue.

  • Jika parameter incrMode disetel ke int atau float, nilai parameter incrValue adalah nilai INCR.

  • Jika parameter incrMode disetel ke dynamic_int atau dynamic_float, nilai parameter incrValue adalah nama kolom tempat nilai INCR termasuk dalam pernyataan DDL.

fieldExpireMode

Mode kedaluwarsa bidang dalam TairHash atau skeys dalam TairTS.

STRING

Tidak

None

Nilai valid:

  • None: Tidak ada waktu kedaluwarsa yang ditentukan.

  • millisecond: Waktu kedaluwarsa relatif ditentukan. Waktu kedaluwarsa tetap pada nilai parameter fieldExpireValue.

    Catatan

    Mode kedaluwarsa skeys dalam TairTS harus millisecond.

  • unixtime: Waktu kedaluwarsa absolut ditentukan. Waktu kedaluwarsa tetap pada nilai parameter fieldExpireValue.

  • dynamic_millisecond: Waktu kedaluwarsa relatif ditentukan. Waktu kedaluwarsa adalah nama kolom tempat nilai fieldExpireValue termasuk dalam pernyataan DDL.

  • dynamic_unixtime: Waktu kedaluwarsa absolut ditentukan. Waktu kedaluwarsa adalah nama kolom tempat nilai fieldExpireValue termasuk dalam pernyataan DDL.

fieldExpireValue

Waktu kedaluwarsa bidang dalam TairHash atau skeys dalam TairTS.

STRING

Tidak

Tidak ada nilai default

Nilai valid:

  • Jika parameter fieldExpireMode disetel ke None, tidak ada waktu kedaluwarsa yang ditentukan.

  • Jika parameter fieldExpireMode disetel ke millisecond atau unixtime, nilai parameter fieldExpireValue adalah waktu kedaluwarsa.

  • Jika parameter fieldExpireMode disetel ke dynamic_millisecond atau dynamic_unixtime, nilai parameter fieldExpireValue adalah nama kolom tempat nilai parameter fieldExpireValue termasuk dalam pernyataan DDL.

Pemetaan tipe data

Tipe data Realtime Compute for Apache Flink

Tipe data Tair

VARCHAR

STRING

DOUBLE

DOUBLE

Format struktur data Tair

Struktur data

Format

Perintah untuk memasukkan data ke tabel sink Tair

TairString

Jika parameter incrMode disetel ke None, pernyataan DDL memiliki dua kolom.

  • Kolom pertama mencantumkan kunci bertipe STRING.

  • Kolom kedua mencantumkan nilai bertipe STRING.

exset key value

Jika parameter incrMode disetel ke int atau float, pernyataan DDL hanya memiliki satu kolom dan kolom tersebut mencantumkan kunci bertipe STRING.

exincrby/exincrbyfloat key incrValue

Jika parameter incrMode disetel ke dynamic_int atau dynamic_float, pernyataan DDL memiliki dua kolom.

  • Kolom pertama mencantumkan kunci bertipe STRING.

  • Kolom kedua mencantumkan nilai incrValue bertipe STRING.

exincrby/exincrbyfloat key incrValue

TairHash

Jika parameter incrMode disetel ke None, pernyataan DDL memiliki tiga kolom.

  • Kolom pertama mencantumkan kunci bertipe STRING.

  • Kolom kedua mencantumkan bidang bertipe STRING.

  • Kolom ketiga mencantumkan nilai bidang bertipe STRING.

exhset key field value

Jika parameter incrMode disetel ke int atau float, pernyataan DDL memiliki dua kolom.

  • Kolom pertama mencantumkan kunci bertipe STRING.

  • Kolom kedua mencantumkan bidang bertipe STRING.

 exhincrby/exincrbyfloat key field incrValue

Jika parameter incrMode disetel ke dynamic_int atau dynamic_float, pernyataan DDL memiliki tiga kolom.

  • Kolom pertama mencantumkan kunci bertipe STRING.

  • Kolom kedua mencantumkan bidang bertipe STRING.

  • Kolom ketiga mencantumkan nilai incrValue bertipe STRING yang sesuai dengan bidang.

exhincrby/exincrbyfloat key field incrValue

TairZset

Jika parameter incrMode disetel ke None, TairZset mendukung pengurutan data multidimensi. TairZset memungkinkan Anda mengurutkan data bertipe DOUBLE dari maksimal 256 dimensi. Oleh karena itu, pernyataan DDL memiliki 3 hingga 258 kolom.

  • Kolom pertama mencantumkan kunci bertipe STRING.

  • Kolom kedua mencantumkan anggota bertipe STRING.

  • Kolom sisanya mencantumkan skor bertipe DOUBLE.

exzadd key score member
Catatan

Jika Anda ingin mengurutkan data dari beberapa dimensi, pastikan bahwa format skor semua dimensi sama.

Jika parameter incrMode disetel ke int atau float, pernyataan DDL memiliki dua kolom.

  • Kolom pertama mencantumkan kunci bertipe STRING.

  • Kolom kedua mencantumkan anggota bertipe STRING.

exzincyby key member incrValue

Jika parameter incrMode disetel ke dynamic_int atau dynamic_float, pernyataan DDL memiliki tiga kolom.

  • Kolom pertama mencantumkan kunci bertipe STRING.

  • Kolom kedua mencantumkan anggota bertipe STRING.

  • Kolom ketiga mencantumkan nilai incrValue bertipe STRING.

exzincyby key member incrValue

TairBloom

Parameter incrMode harus disetel ke None.

Saat data dimasukkan ke tabel sink Tair untuk pertama kali, sebuah kunci TairBloom dengan kapasitas default 100 elemen dan tingkat kesalahan 0.01 dibuat. Pernyataan DDL memiliki dua kolom.

  • Kolom pertama mencantumkan kunci bertipe STRING.

  • Kolom kedua mencantumkan item bertipe STRING.

BF.ADD key item

TairDoc

Parameter incrMode harus disetel ke None. Pernyataan DDL memiliki tiga kolom.

  • Kolom pertama mencantumkan kunci bertipe STRING.

  • Kolom kedua mencantumkan jalur bertipe STRING.

  • Kolom ketiga mencantumkan elemen JSON bertipe STRING.

JSON.SET key path json

TairSearch

Jika parameter incrMode disetel ke None, pernyataan DDL memiliki empat kolom.

  • Kolom pertama mencantumkan indeks bertipe STRING.

  • Kolom kedua mencantumkan ID dokumen bertipe STRING.

  • Kolom ketiga mencantumkan dokumen bertipe STRING. Dokumen harus dalam format JSON.

  • Kolom keempat mencantumkan pemetaan bertipe STRING.

TFT.ADDDOC index document docid
Catatan

Sebelum Anda memasukkan data ke tabel sink Tair, Anda harus membuat indeks dan menambahkan pemetaan. Contoh perintah:

TFT.CREATEINDEX index mappings

Jika parameter incrMode disetel ke int atau float, pernyataan DDL memiliki empat kolom.

  • Kolom pertama mencantumkan indeks bertipe STRING.

  • Kolom kedua mencantumkan ID dokumen bertipe STRING.

  • Kolom ketiga mencantumkan bidang bertipe STRING.

  • Kolom keempat mencantumkan pemetaan bertipe STRING.

Contoh perintah untuk operasi dokumen:

TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment
Catatan

Sebelum Anda memasukkan data ke tabel sink Tair, Anda harus membuat indeks dan menambahkan pemetaan. Contoh perintah:

TFT.CREATEINDEX index mappings

Jika parameter incrMode disetel ke dynamic_int atau dynamic_float, pernyataan DDL memiliki lima kolom.

  • Kolom pertama mencantumkan indeks bertipe STRING.

  • Kolom kedua mencantumkan ID dokumen bertipe STRING.

  • Kolom ketiga mencantumkan bidang bertipe STRING.

  • Kolom keempat mencantumkan pemetaan bertipe STRING.

  • Kolom kelima mencantumkan nilai incrValue bertipe STRING.

Contoh perintah untuk operasi dokumen:

TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment
Catatan

Sebelum Anda memasukkan data ke tabel sink Tair, Anda harus membuat indeks dan menambahkan pemetaan. Contoh perintah:

TFT.CREATEINDEX index mappings

TairCpc

Parameter incrMode harus disetel ke None. Pernyataan DDL memiliki dua kolom.

  • Kolom pertama mencantumkan kunci bertipe STRING.

  • Kolom kedua mencantumkan item bertipe STRING.

CPC.UPDATE key item

TairGis

Parameter incrMode harus disetel ke None. Pernyataan DDL memiliki tiga kolom.

  • Kolom pertama mencantumkan kunci bertipe STRING.

  • Kolom kedua mencantumkan nama poligon bertipe STRING.

  • Kolom ketiga mencantumkan nilai Well-known Text (WKT) poligon. Nilai tersebut bertipe STRING.

GIS.ADD area polygonName polygonWkt

TairRoaring

Parameter incrMode harus disetel ke None. Pernyataan DDL memiliki tiga kolom.

  • Kolom pertama mencantumkan kunci bertipe STRING.

  • Kolom kedua mencantumkan offset yang ditentukan bertipe BIGINT.

  • Kolom ketiga mencantumkan nilai bertipe BIGINT. Nilai valid: 0 dan 1.

TR.SETBIT key offset value

TairVector

Parameter incrMode harus disetel ke None. Pernyataan DDL memiliki enam kolom.

  • Kolom pertama mencantumkan nama indeks bertipe STRING.

  • Kolom kedua mencantumkan kunci utama rekaman. Nilai tersebut bertipe STRING.

  • Kolom ketiga mencantumkan data vektor bertipe STRING.

  • Kolom keempat mencantumkan dimensi vektor bertipe INT.

  • Kolom kelima mencantumkan algoritma yang digunakan untuk membangun dan menanyakan indeks. Nilai tersebut bertipe STRING.

  • Kolom keenam mencantumkan metode jarak yang digunakan untuk menghitung jarak vektor. Nilai tersebut bertipe STRING.

TVS.HSET index_name key VECTOR vector_data
Catatan

Sebelum Anda memasukkan data ke tabel sink Tair, Anda harus membuat indeks dan menambahkan pemetaan. Contoh perintah:

TVS.CREATEINDEX index_name dims algorithm distance_method

TairTs

Jika parameter incrMode disetel ke None, pernyataan DDL memiliki empat kolom.

  • Kolom pertama mencantumkan pkey bertipe STRING. Sebuah pkey menunjukkan sekelompok garis waktu.

  • Kolom kedua mencantumkan skey bertipe STRING. Sebuah skey menunjukkan garis waktu.

  • Kolom ketiga mencantumkan timestamp bertipe STRING.

  • Kolom keempat mencantumkan nilai bertipe STRING.

EXTS.S.RAW_MODIFY Pkey Skey timestamp value

Jika parameter incrMode disetel ke float, pernyataan DDL memiliki tiga kolom.

  • Kolom pertama mencantumkan pkey bertipe STRING.

  • Kolom kedua mencantumkan skey bertipe STRING.

  • Kolom ketiga mencantumkan timestamp bertipe STRING.

EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue

Jika parameter incrMode disetel ke dynamic_float, pernyataan DDL memiliki empat kolom.

  • Kolom pertama mencantumkan pkey bertipe STRING.

  • Kolom kedua mencantumkan skey bertipe STRING.

  • Kolom ketiga mencantumkan timestamp bertipe STRING.

  • Kolom keempat mencantumkan nilai incrValue bertipe STRING. Nilai incrValue sesuai dengan timestamp yang tercantum di kolom ketiga.

EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue

Contoh kode

  • Contoh kode untuk memasukkan data ke tabel sink Tair dalam mode umum:

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      index_name STRING,
      doc_id STRING,
      doc STRING,
      mapping STRING,
      PRIMARY KEY(index_name) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairsearch',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}'
    );
    
    INSERT INTO tair_output
    SELECT 'index' as index,v,p,'{"mappings":{"_source":{"enabled":true},"properties":{"product_id":{"type":"keyword","ignore_above":128},"product_name":{"type":"text"}}}}' as mapping
    FROM datagen_stream;
  • Contoh kode untuk memasukkan data ke tabel sink Tair saat parameter incrMode dikonfigurasikan:

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      key STRING,
      step STRING,
      PRIMARY KEY (key) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairstring',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'incrMode' = 'dynamic_float',
      'incrValue' = 'step'
    );
    
    INSERT INTO tair_output
    SELECT *
    FROM datagen_stream;           
    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      key STRING,
      PRIMARY KEY (key) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairstring',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'incrMode' = 'float',
      'incrValue' = '11.11'
    );
    
    INSERT INTO tair_output
    SELECT v
    FROM datagen_stream;
  • Contoh kode untuk memasukkan data ke tabel sink Tair saat parameter fieldExpireMode dikonfigurasikan:

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING,
      s STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_ouput (
    	key STRING,
    	field STRING,
    	value STRING,
    	PRIMARY KEY (key) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairhash',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'fieldExpireMode' = 'millisecond',
      'fieldExpireValue' = '1000'
    );
    
    INSERT INTO tair_output
    SELECT v, p, s
    FROM datagen_stream;