All Products
Search
Document Center

Realtime Compute for Apache Flink:Tair (Edisi Perusahaan)

Last Updated:Mar 10, 2026

Topik ini menjelaskan cara menggunakan konektor Tair (Edisi Perusahaan).

Informasi latar belakang

Tair (Kompatibel dengan Redis OSS) adalah layanan database yang kompatibel dengan protokol sistem Redis open source. Tair (Kompatibel dengan Redis OSS) mendukung penyimpanan hibrida antara memori dan hard disk, menyediakan arsitektur hot standby untuk menjamin ketersediaan tinggi, serta menggunakan arsitektur kluster yang dapat diskalakan guna memenuhi kebutuhan bisnis akan throughput tinggi, operasi latensi rendah, dan modifikasi konfigurasi yang fleksibel.

Konektor Tair mendukung hal-hal berikut:

Kategori

Deskripsi

Tipe yang didukung

Tabel sink

Mode eksekusi

Hanya mode stream yang didukung.

Format data

STRING

Metrik unik

  • numBytesSend

  • numBytesSendPerSecond

  • numRecordsSend

  • numRecordsSendPerSecond

  • numRecordSendErrors

  • currentSendTime

Catatan

Untuk informasi lebih lanjut mengenai metrik tersebut, lihat Metrik.

Tipe API

SQL

Pembaruan atau penghapusan data dalam tabel sink

Ya.

Prasyarat

Batasan

  • Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) versi 6.0.6 atau lebih baru yang mendukung konektor Tair (Edisi Perusahaan).

  • Konektor Tair (Edisi Perusahaan) tidak mengizinkan Anda mengonfigurasi multiple host.

Sintaksis

Tair (Edisi Perusahaan) mendukung semua struktur data Tair buatan sendiri berdasarkan kompatibilitas dengan struktur data Redis berikut: STRING, LIST, SET, HASHMAP, dan SORTEDSET.

Berikut ini contoh pernyataan DDL.

CREATE TABLE tair_table (
  a STRING,
  b STRING,
  PRIMARY KEY (a) NOT ENFORCED -- Wajib. 
) WITH (
  'connector'= 'tair',
  'host' = '<yourHost>'
);
Catatan

Tair kompatibel dengan struktur data Redis. Untuk informasi lebih lanjut mengenai contoh sintaksis struktur data Redis, lihat Konektor Tair (Kompatibel dengan Redis OSS).

Parameter dalam klausa WITH

Parameter

Deskripsi

Tipe data

Wajib

Nilai default

Keterangan

connector

Tipe tabel.

String

Ya

None

Nilai bidang statis adalah tair.

host

Titik akhir server Tair.

String

Ya

None

Kami menyarankan agar Anda menggunakan titik akhir internal.

Catatan

Koneksi jaringan publik mungkin tidak stabil karena faktor seperti latensi jaringan dan batasan bandwidth.

mode

Struktur data Tair.

STRING

Ya

None

Nilai yang valid:

  • string

  • list

  • set

  • hashmap

  • sortedset

  • tairstring

  • tairhash

  • tairzset

  • tairbloom

  • tairdoc

  • tairsearch

  • tairts

  • taircpc

  • tairroaring

  • tairgis

  • tairvector

Tair mendukung struktur data Redis dan struktur data Tair buatan sendiri. Untuk informasi lebih lanjut mengenai nilai yang valid, lihat Struktur data yang didukung oleh tabel sink ApsaraDB for Redis dan Format struktur data Tair.

Catatan
  • Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.1 atau lebih baru yang mendukung TairTs, TairCpc, TairRoaring, TairVector, dan TairGis.

  • Tabel sink Tair mendukung struktur data Tair buatan sendiri. Pernyataan DDL yang digunakan untuk membuat tabel sink Tair harus mendefinisikan tabel sesuai format yang ditentukan, dan kunci primer harus ditentukan untuk tabel tersebut.

port

Nomor port server Tair.

INT

Tidak

6379

N/A.

password

Password yang digunakan untuk mengakses database Tair.

String

Tidak

String kosong

String kosong menunjukkan bahwa tidak ada verifikasi yang dilakukan.

dbNum

ID database tujuan.

INT

Tidak

0

N/A.

clusterMode

Menentukan apakah akan menggunakan arsitektur kluster.

BOOLEAN

Tidak

false

Nilai yang valid:

  • true: menggunakan arsitektur kluster.

  • false: layanan berjalan dalam mode standalone.

ignoreDelete

Menentukan apakah pesan retraction diabaikan.

BOOLEAN

Tidak

false

Nilai yang valid:

  • false: Saat menerima pesan retraction, data yang dimasukkan beserta kuncinya akan dihapus. Ini adalah nilai default.

  • true: Saat menerima pesan retraction, data yang dimasukkan beserta kuncinya tetap dipertahankan.

expiration

Waktu hidup (TTL) yang ditentukan untuk kunci dari data yang dimasukkan.

LONG

Tidak

0

Nilai 0 menunjukkan bahwa TTL tidak dikonfigurasi. Jika nilai parameter ini lebih besar dari 0, TTL akan dikonfigurasi untuk kunci data yang dimasukkan. Satuan: milidetik.

expirationAt

Waktu kedaluwarsa absolut yang ditentukan untuk kunci dari data yang dimasukkan.

LONG

Tidak

0

Satuan: milidetik. Nilai default: 0. Nilai default menunjukkan bahwa tidak ada waktu kedaluwarsa yang ditentukan.

Jika nilai parameter ini lebih besar dari 0 dan parameter expiration diatur ke 0, maka waktu kedaluwarsa absolut akan ditentukan untuk kunci data yang dimasukkan.

incrMode

Mode sink database Tair.

STRING

Tidak

None

Nilai yang valid:

  • None: menunjukkan operasi insert. Ini adalah nilai default.

  • int: menunjukkan operasi INCRBY. Nilai INCR ditetapkan ke nilai parameter incrValue.

  • float: menunjukkan operasi INCRBYFLOAT. Nilai INCR ditetapkan ke nilai parameter incrValue.

  • dynamic_int: Mewakili operasi incrby. Nilai increment diambil dari kolom yang ditentukan oleh incrValue.

  • dynamic_float: Mewakili operasi incrbyfloat, di mana nilai increment ditentukan dalam kolom incrValue.

incrValue

Nilai INCR yang ditentukan berdasarkan nilai parameter incrMode.

STRING

Tidak

None

Parameter ini menerima nilai-nilai berikut:

  • Jika `incrMode` adalah `None`, parameter ini tidak ditentukan.

  • Jika parameter incrMode diatur ke int atau float, nilai parameter incrValue adalah nilai INCR.

  • Jika parameter incrMode diatur ke dynamic_int atau dynamic_float, nilai parameter incrValue adalah nama kolom tempat nilai INCR berada dalam pernyataan DDL.

fieldExpireMode

Mode kedaluwarsa field dalam TairHash atau skey dalam TairTS.

String

Tidak

None

Nilai yang valid:

  • None: Tidak ada waktu kedaluwarsa yang ditentukan.

  • millisecond: Waktu kedaluwarsa relatif ditentukan. Waktu kedaluwarsa ditetapkan ke nilai parameter fieldExpireValue.

    Catatan

    Mode kedaluwarsa skey dalam TairTS harus millisecond.

  • unixtime: Waktu kedaluwarsa absolut ditentukan. Waktu kedaluwarsa ditetapkan ke nilai parameter fieldExpireValue.

  • dynamic_millisecond: Waktu kedaluwarsa relatif ditentukan oleh kolom yang sesuai dengan fieldExpireValue.

  • dynamic_unixtime: Waktu kedaluwarsa absolut disediakan oleh kolom yang ditentukan oleh fieldExpireValue.

fieldExpireValue

Waktu kedaluwarsa field dalam TairHash atau skey dalam TairTS.

String

Tidak

None

Nilai yang valid:

  • Jika parameter fieldExpireMode diatur ke None, tidak ada waktu kedaluwarsa yang ditentukan.

  • Jika parameter fieldExpireMode diatur ke millisecond atau unixtime, nilai parameter fieldExpireValue adalah waktu kedaluwarsa.

  • Jika `fieldExpireMode` adalah `dynamic_millisecond` atau `dynamic_unixtime`, `fieldExpireValue` adalah nama kolom DDL untuk waktu hidup (TTL).

Pemetaan tipe data

Tipe field Flink

Tipe data Tair

VARCHAR

STRING

DOUBLE

DOUBLE

Format struktur data Tair

Tipe

Format

Perintah

TairString

Jika parameter incrMode diatur ke None, pernyataan DDL memiliki dua kolom.

  • Kolom pertama berisi kunci bertipe STRING.

  • Kolom kedua berisi nilai bertipe STRING.

exset key value

Jika parameter incrMode diatur ke int atau float, pernyataan DDL hanya memiliki satu kolom yang berisi kunci bertipe STRING.

exincrby/exincrbyfloat key incrValue

Jika parameter incrMode diatur ke dynamic_int atau dynamic_float, pernyataan DDL memiliki dua kolom.

  • Kolom pertama berisi kunci bertipe STRING.

  • Kolom kedua berisi nilai incrValue bertipe STRING.

exincrby/exincrbyfloat key incrValue

TairHash

Jika parameter incrMode diatur ke None, pernyataan DDL memiliki tiga kolom.

  • Kolom pertama berisi kunci bertipe STRING.

  • Kolom kedua berisi field bertipe STRING.

  • Kolom ketiga berisi nilai field bertipe STRING.

exhset key field value

Jika parameter incrMode diatur ke int atau float, pernyataan DDL memiliki dua kolom.

  • Kolom pertama berisi kunci bertipe STRING.

  • Kolom kedua berisi field bertipe STRING.

 exhincrby/exincrbyfloat key field incrValue

Jika parameter incrMode diatur ke dynamic_int atau dynamic_float, pernyataan DDL memiliki tiga kolom.

  • Kolom pertama berisi kunci bertipe STRING.

  • Kolom kedua berisi field bertipe STRING.

  • Kolom ketiga berisi nilai incrValue bertipe STRING yang sesuai dengan field.

exhincrby/exincrbyfloat key field incrValue

TairZset

Jika parameter incrMode diatur ke None, TairZset mendukung pengurutan data multidimensi. TairZset memungkinkan Anda mengurutkan data bertipe DOUBLE dari maksimal 256 dimensi. Oleh karena itu, pernyataan DDL memiliki 3 hingga 258 kolom.

  • Kolom pertama berisi kunci bertipe STRING.

  • Kolom kedua adalah member dan bertipe STRING.

  • Kolom-kolom berikutnya berisi skor bertipe DOUBLE.

exzadd key score member
Catatan

Jika Anda ingin mengurutkan data dari beberapa dimensi, pastikan format skor semua dimensi sama.

Jika parameter incrMode diatur ke int atau float, pernyataan DDL memiliki dua kolom.

  • Kolom pertama berisi kunci bertipe STRING.

  • Kolom kedua berisi member bertipe STRING.

exzincyby key member incrValue

Jika parameter incrMode diatur ke dynamic_int atau dynamic_float, pernyataan DDL memiliki tiga kolom.

  • Kolom pertama berisi kunci bertipe STRING.

  • Kolom kedua berisi member bertipe STRING.

  • Kolom ketiga berisi nilai incrValue bertipe STRING.

exzincyby key member incrValue

TairBloom

Parameter incrMode harus diatur ke None.

Saat data pertama kali dimasukkan ke tabel sink Tair, kunci TairBloom dengan kapasitas default 100 elemen dan laju error 0,01 akan dibuat. Pernyataan DDL memiliki dua kolom.

  • Kolom pertama berisi kunci bertipe STRING.

  • Kolom kedua berisi item bertipe STRING.

BF.ADD key item

TairDoc

Parameter incrMode harus diatur ke None. Pernyataan DDL memiliki tiga kolom.

  • Kolom pertama berisi kunci bertipe STRING.

  • Kolom kedua berisi path bertipe STRING.

  • Kolom ketiga bertipe STRING dan menyimpan data JSON.

JSON.SET key path json

TairSearch

Jika parameter incrMode diatur ke None, pernyataan DDL memiliki empat kolom.

  • Kolom pertama adalah indeks dan bertipe STRING.

  • Kolom kedua berisi ID dokumen bertipe STRING.

  • Kolom ketiga berisi dokumen bertipe STRING. Dokumen harus dalam format JSON.

  • Kolom keempat berisi pemetaan bertipe STRING.

TFT.ADDDOC index document docid
Catatan

Sebelum memasukkan data ke tabel sink Tair, Anda harus membuat indeks dan menambahkan pemetaan. Contoh perintah:

TFT.CREATEINDEX index mappings

Jika parameter incrMode diatur ke int atau float, pernyataan DDL memiliki empat kolom.

  • Kolom pertama berisi indeks bertipe STRING.

  • Kolom kedua berisi ID dokumen bertipe STRING.

  • Kolom ketiga berisi field bertipe STRING.

  • Kolom keempat berisi pemetaan bertipe STRING.

Contoh perintah untuk operasi dokumen:

TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment
Catatan

Sebelum memasukkan data ke tabel sink Tair, Anda harus membuat indeks dan menambahkan pemetaan. Contoh perintah:

TFT.CREATEINDEX index mappings

Jika parameter incrMode diatur ke dynamic_int atau dynamic_float, pernyataan DDL memiliki lima kolom.

  • Kolom pertama berisi indeks bertipe STRING.

  • Kolom kedua berisi ID dokumen bertipe STRING.

  • Kolom ketiga berisi field bertipe STRING.

  • Kolom keempat berisi pemetaan bertipe STRING.

  • Kolom kelima berisi nilai incrValue bertipe STRING.

Perintah berikut digunakan untuk operasi dokumen.

TFT.INCRLONGDOCFIELD/TFT.INCRFLOATDOCFIELD index doc_id field increment
Catatan

Sebelum memasukkan data ke tabel sink Tair, Anda harus membuat indeks dan menambahkan pemetaan. Contoh perintah:

TFT.CREATEINDEX index mappings

TairCpc

Parameter incrMode harus diatur ke None. Pernyataan DDL memiliki dua kolom.

  • Kolom pertama berisi kunci bertipe STRING.

  • Kolom kedua berisi item bertipe STRING.

CPC.UPDATE key item

TairGis

Parameter incrMode harus diatur ke None. Pernyataan DDL memiliki tiga kolom.

  • Kolom pertama berisi kunci bertipe STRING.

  • Kolom kedua berisi nama poligon bertipe STRING.

  • Kolom ketiga berisi nilai Well-known Text (WKT) poligon. Nilai ini bertipe STRING.

GIS.ADD area polygonName polygonWkt

TairRoaring

Parameter incrMode harus diatur ke None. Pernyataan DDL memiliki tiga kolom.

  • Kolom pertama berisi kunci bertipe STRING.

  • Kolom kedua berisi offset yang ditentukan bertipe BIGINT.

  • Kolom ketiga berisi nilai bertipe BIGINT. Nilai yang valid: 0 dan 1.

TR.SETBIT key offset value

Vector

Parameter incrMode harus diatur ke None. Pernyataan DDL memiliki enam kolom.

  • Kolom pertama berisi nama indeks bertipe STRING.

  • Kolom kedua berisi kunci primer catatan. Nilainya bertipe STRING.

  • Kolom ketiga berisi data vektor bertipe STRING.

  • Kolom keempat berisi dimensi vektor bertipe INT.

  • Kolom kelima berisi algoritma yang digunakan untuk membangun dan mengkueri indeks. Nilainya bertipe STRING.

  • Kolom keenam berisi metode jarak yang digunakan untuk menghitung jarak vektor. Nilainya bertipe STRING.

TVS.HSET index_name key VECTOR vector_data
Catatan

Sebelum memasukkan data ke tabel sink Tair, Anda harus membuat indeks dan menambahkan pemetaan. Contoh perintah:

TVS.CREATEINDEX index_name dims algorithm distance_method

TairTs

Jika parameter incrMode diatur ke None, pernyataan DDL memiliki empat kolom.

  • Kolom pertama berisi pkey bertipe STRING. Pkey menunjukkan sekelompok timeline.

  • Kolom kedua berisi skey bertipe STRING. Skey menunjukkan sebuah timeline.

  • Kolom ketiga berisi timestamp bertipe STRING.

  • Kolom keempat berisi nilai bertipe STRING.

EXTS.S.RAW_MODIFY Pkey Skey timestamp value

Jika parameter incrMode diatur ke float, pernyataan DDL memiliki tiga kolom.

  • Kolom pertama berisi pkey bertipe STRING.

  • Kolom kedua berisi skey bertipe STRING.

  • Kolom ketiga berisi timestamp bertipe STRING.

EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue

Jika parameter incrMode diatur ke dynamic_float, pernyataan DDL memiliki empat kolom.

  • Kolom pertama berisi pkey bertipe STRING.

  • Kolom kedua berisi skey bertipe STRING.

  • Kolom ketiga berisi timestamp bertipe STRING.

  • Kolom keempat berisi nilai incrValue bertipe STRING. Nilai incrValue ini sesuai dengan timestamp yang tercantum di kolom ketiga.

EXTS.S.RAW_INCRBY Pkey Skey timestamp incrValue

Kode contoh

  • Kode contoh untuk memasukkan data ke tabel sink Tair dalam mode umum

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      index_name STRING,
      doc_id STRING,
      doc STRING,
      mapping STRING,
      PRIMARY KEY(index_name) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairsearch',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}'
    );
    
    INSERT INTO tair_output
    SELECT 'index' as index,v,p,'{"mappings":{"_source":{"enabled":true},"properties":{"product_id":{"type":"keyword","ignore_above":128},"product_name":{"type":"text"}}}}' as mapping
    FROM datagen_stream;
  • Contoh data yang dimasukkan untuk pola incr

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      key STRING,
      step STRING,
      PRIMARY KEY (key) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairstring',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'incrMode' = 'dynamic_float',
      'incrValue' = 'step'
    );
    
    INSERT INTO tair_output
    SELECT *
    FROM datagen_stream;           
    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_output (
      key STRING,
      PRIMARY KEY (key) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairstring',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'incrMode' = 'float',
      'incrValue' = '11.11'
    );
    
    INSERT INTO tair_output
    SELECT v
    FROM datagen_stream;
  • Contoh: Data yang ditulis ke tabel sink menggunakan pola fieldExpire

    CREATE TEMPORARY TABLE datagen_stream (
      v STRING,
      p STRING,
      s STRING
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE tair_ouput (
    	key STRING,
    	field STRING,
    	value STRING,
    	PRIMARY KEY (key) NOT ENFORCED
    ) WITH (
      'connector' = 'tair',
      'mode' = 'tairhash',
      'host' = '${tairHost}',
      'port' = '${tairPort}',
      'password' = '${password}',
      'fieldExpireMode' = 'millisecond',
      'fieldExpireValue' = '1000'
    );
    
    INSERT INTO tair_output
    SELECT v, p, s
    FROM datagen_stream;