Topik ini menjelaskan cara menggunakan konektor Tair (Kompatibel dengan Redis OSS).
Informasi latar belakang
Alibaba Cloud Tair adalah layanan database yang kompatibel dengan protokol open source Redis dan menyediakan penyimpanan hibrida memori-dan-disk. Tair menggunakan arsitektur aktif-aktif dengan ketersediaan tinggi (HA) serta arsitektur kluster yang dapat diskalakan untuk memenuhi kebutuhan bisnis akan throughput tinggi, latensi rendah, dan skalabilitas fleksibel ke atas maupun ke bawah. Untuk informasi selengkapnya, lihat Apa itu Tair (Kompatibel dengan Redis OSS)?.
Tabel berikut menjelaskan fitur-fitur yang didukung oleh konektor Redis.
Category | Details |
Jenis yang didukung | Tabel dimensi dan tabel sink |
Mode yang didukung | Streaming mode |
Format data | String |
Metrik pemantauan spesifik |
Catatan Untuk informasi selengkapnya tentang metrik tersebut, lihat Metrik pemantauan. |
Jenis API | SQL |
Mendukung pembaruan atau penghapusan data di tabel sink | Ya |
Prasyarat
Anda telah membuat instans Tair (Kompatibel dengan Redis OSS). Untuk informasi selengkapnya, lihat Langkah 1: Buat instans.
Anda telah mengonfigurasi daftar putih. Untuk informasi selengkapnya, lihat Langkah 2: Konfigurasi daftar putih.
Batasan
Konektor Redis menyediakan semantik best-effort dan tidak menjamin pengiriman exactly-once. Anda harus memastikan idempotensi dalam aplikasi Anda.
Batasan berikut berlaku untuk tabel dimensi:
Hanya tipe data STRING dan HASHMAP yang dapat dibaca dari Redis.
Semua bidang dalam tabel dimensi harus bertipe STRING. Anda harus mendeklarasikan tepat satu primary key.
Saat melakukan join dengan tabel dimensi, klausa ON harus mencakup kondisi ekuivalen untuk primary key.
Isu yang diketahui dan solusi
Fitur cache pada Ververica Runtime (VVR) 8.0.9 memiliki masalah. Untuk menonaktifkannya, tambahkan sink.buffer-flush.max-rows = '0' ke klausa WITH pada tabel sink.
Sintaksis
CREATE TABLE redis_table (
col1 STRING,
col2 STRING,
PRIMARY KEY (col1) NOT ENFORCED -- Wajib.
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'mode' = 'STRING' -- Wajib untuk tabel sink.
);Parameter WITH
Umum
Parameter | Description | Data type | Required | Default | Remarks |
connector | Jenis tabel. | String | Ya | None | Nilainya harus redis. |
host | Alamat koneksi server Redis. | String | Ya | None | Gunakan titik akhir jaringan internal. Catatan Koneksi ke titik akhir jaringan publik mungkin tidak stabil karena faktor seperti latensi jaringan dan batasan bandwidth. |
port | Port koneksi server Redis. | Int | Tidak | 6379 | None. |
password | Password database Redis. | String | Tidak | String kosong, yang berarti tidak ada validasi yang dilakukan. | None. |
dbNum | Nomor database yang akan dioperasikan. | Int | Tidak | 0 | None. |
clusterMode | Menentukan apakah kluster Redis berada dalam cluster mode. | Boolean | Tidak | false | None. |
hostAndPorts | Host dan nomor port kluster Redis. Catatan Jika cluster mode diaktifkan dan HA tidak diperlukan, Anda dapat mengonfigurasi hanya satu host menggunakan parameter host dan port, atau hanya mengonfigurasi parameter ini. Parameter ini memiliki prioritas lebih tinggi daripada parameter host dan port individual. | String | Tidak | Empty | Jika |
key-prefix | Awalan untuk nilai primary key tabel. | String | Tidak | None | Setelah parameter ini dikonfigurasi, awalan akan secara otomatis ditambahkan ke nilai bidang primary key saat Anda melakukan kueri atau menulis data. Awalan tersebut terdiri dari awalan kunci (key-prefix) dan pemisah awalan (key-prefix-delimiter). Catatan Parameter ini hanya didukung di VVR 8.0.7 dan versi yang lebih baru. |
key-prefix-delimiter | Pemisah antara nilai primary key dan awalannya. | String | Tidak | None | |
connection.pool.max-total | Jumlah maksimum koneksi yang dapat dialokasikan oleh kolam koneksi. | Int | Tidak | 8 | Catatan Parameter ini hanya didukung di VVR 8.0.9 dan versi yang lebih baru. |
connection.pool.max-idle | Jumlah maksimum koneksi idle dalam kolam koneksi. | Int | Tidak | 8 | |
connection.pool.min-idle | Jumlah minimum koneksi idle dalam kolam koneksi. | Int | Tidak | 0 | |
connect.timeout | Timeout untuk membuat koneksi. | Duration | Tidak | 3000 ms | |
socket.timeout | Timeout untuk menerima data dari server Redis (socket timeout). | Duration | Tidak | 3000 ms | |
cacert.filepath | Jalur lengkap file sertifikat SSL/TLS. Format file harus jks. | String | Tidak | None, yang menunjukkan bahwa enkripsi SSL/TLS dinonaktifkan. | Unduh sertifikat CA sebagaimana dijelaskan dalam Aktifkan enkripsi TLS, lalu unggah sertifikat tersebut di bagian Additional Dependency Files pekerjaan. Setelah diunggah, sertifikat CA disimpan di direktori /flink/usrlib. Untuk informasi selengkapnya tentang cara mengunggah file di bagian Additional Dependency Files, lihat Deploy a job. Contoh: Catatan Parameter ini hanya didukung di VVR 11.1 dan versi yang lebih baru. |
Khusus Sink
Parameter | Description | Data type | Required | Default | Remarks |
mode | Tipe data Redis yang digunakan. | String | Ya | None | Tabel sink Tair mendukung lima tipe data Redis. DDL harus didefinisikan dalam format tertentu, dan primary key harus didefinisikan. Untuk informasi selengkapnya, lihat Format tipe data untuk tabel sink Redis. |
flattenHash | Menentukan apakah data HASHMAP ditulis dalam mode multi-value. | Boolean | Tidak | false | Nilai yang valid:
Catatan
|
ignoreDelete | Menentukan apakah pesan retraction diabaikan. | Boolean | Tidak | false | Nilai yang valid:
|
expiration | Menetapkan TTL untuk kunci data yang ditulis. | Long | Tidak | 0, yang berarti tidak ada TTL yang ditetapkan. | Jika nilai parameter ini lebih besar dari 0, TTL yang sesuai akan ditetapkan untuk kunci data yang ditulis. Satuannya adalah milidetik. |
sink.buffer-flush.max-rows | Jumlah maksimum catatan yang dapat disimpan dalam cache. | Int | Tidak | 200 | Catatan yang dicache mencakup semua event append, update, dan delete. Cache akan di-flush ketika jumlah maksimum catatan terlampaui. Catatan
|
sink.buffer-flush.interval | Interval flush cache. | Duration | Tidak | 1000 ms | Cache di-flush secara asinkron. Catatan
|
Khusus Tabel Dimensi
Parameter | Description | Data type | Required | Default | Remarks |
mode | Tipe data Redis yang dibaca. | String | Tidak | STRING | Nilai yang valid: STRING: Membaca data sebagai tipe STRING secara default. HASHMAP: Membaca data HASHMAP dalam mode multi-value. Dalam hal ini, DDL harus mendeklarasikan beberapa bidang non-primary key.
Catatan Parameter ini hanya didukung di VVR 8.0.7 dan versi yang lebih baru. Untuk membaca data HASHMAP dalam mode single-value, konfigurasikan parameter hashName. |
hashName | Kunci yang digunakan saat membaca data HASHMAP dalam mode single-value. | String | Tidak | None | Jika Anda tidak menentukan parameter mode tetapi ingin membaca data HASHMAP dalam mode single-value, Anda harus mengonfigurasi hashName. Dalam hal ini, DDL hanya perlu mendeklarasikan dua bidang. Nilai bidang primary key pertama sesuai dengan field, dan nilai bidang non-primary key kedua sesuai dengan nilai. |
cache | Kebijakan cache. | String | Tidak | None | Tabel dimensi Tair mendukung kebijakan cache berikut: None (default): Tidak ada cache. LRU: Menyimpan sebagian data tabel dimensi dalam cache. Untuk setiap catatan dari tabel sumber, sistem terlebih dahulu mencari di cache. Jika data tidak ditemukan, sistem melakukan kueri ke tabel dimensi fisik. ALL: Menyimpan semua data dari tabel dimensi dalam cache. Sebelum pekerjaan dijalankan, sistem memuat semua data dari tabel dimensi ke dalam cache. Semua pencarian selanjutnya dilakukan terhadap cache. Jika data tidak ditemukan dalam cache, berarti kunci tersebut tidak ada. Cache penuh memiliki waktu kedaluwarsa, setelah itu cache dimuat ulang. Penting
|
cacheSize | Ukuran cache. | Long | Tidak | 10000 | Saat Anda memilih kebijakan cache LRU, Anda harus menetapkan ukuran cache. |
cacheTTLMs | Durasi timeout cache, dalam milidetik. | Long | Tidak | None | Konfigurasi cacheTTLMs bergantung pada pengaturan cache: Jika cache diatur ke None, Anda tidak perlu mengonfigurasi cacheTTLMs. Artinya cache tidak memiliki waktu kedaluwarsa. Jika cache diatur ke LRU, cacheTTLMs adalah periode timeout cache. Secara default, cache tidak kedaluwarsa. Jika cache diatur ke ALL, cacheTTLMs adalah waktu reload cache. Secara default, cache tidak direload. |
cacheEmpty | Menentukan apakah hasil kosong di-cache. | Boolean | Tidak | true | None. |
cacheReloadTimeBlackList | Waktu penyegaran cache yang dilarang. Saat kebijakan cache diatur ke ALL, Anda dapat mengaktifkan waktu yang dilarang untuk mencegah cache diperbarui selama periode tersebut, misalnya selama festival belanja Double 11. | String | Tidak | None | Formatnya sebagai berikut:
Gunakan pemisah sebagai berikut:
|
async | Menentukan apakah data dikembalikan secara asinkron. | Boolean | Tidak | false |
|
Format tipe data untuk tabel sink Redis
Type | Format | Redis insert command |
STRING type | DDL dua kolom:
|
|
LIST type | DDL dua kolom:
|
|
SET type | DDL dua kolom:
|
|
HASHMAP type | Secara default, DDL tiga kolom:
|
|
Jika flattenHash diatur ke true, DDL mendukung beberapa kolom. Contoh berikut menggunakan empat kolom:
|
| |
SORTEDSET type | DDL tiga kolom:
|
|
Pemetaan tipe
Type | Redis field type | Flink field type |
Umum | STRING | STRING |
Hanya untuk tabel sink | SCORE | DOUBLE |
Tipe SCORE Redis berlaku untuk SORTEDSET (sorted sets). Anda harus secara manual menetapkan skor DOUBLE untuk setiap nilai. Nilai-nilai tersebut kemudian diurutkan secara ascending berdasarkan skornya.
Contoh penggunaan
Sink table
Menulis data STRING: Pada contoh kode, nilai kolom
user_idpada tabel sinkredis_sinkditulis ke Redis sebagai kunci, sedangkan nilai kolomlogin_timeditulis sebagai nilai.CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- User ID login_time STRING -- Logon timestamp ) WITH ( 'connector' = 'kafka', 'topic' = 'user_logins', -- Kafka topic 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address 'format' = 'json', -- Data format is JSON 'scan.startup.mode' = 'earliest-offset' -- Consume from the earliest offset ); CREATE TEMPORARY TABLE redis_sink ( user_id STRING, -- Redis key login_time STRING, -- Redis value PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'STRING', -- Use STRING mode 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;Menulis data HASHMAP dalam mode multi-value: Pada contoh kode, nilai kolom
order_idpada tabel sinkredis_sinkditulis ke Redis sebagai kunci. Nilai kolomproduct_name,quantity, danamountmasing-masing ditulis ke field product_name, quantity, dan amount.CREATE TEMPORARY TABLE kafka_source ( order_id STRING, -- Order ID product_name STRING, -- Product name quantity STRING, -- Quantity amount STRING -- Order amount ) WITH ( 'connector' = 'kafka', 'topic' = 'orders_topic', -- Kafka topic 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address 'format' = 'json', -- Data format is JSON 'scan.startup.mode' = 'earliest-offset' -- Consume from the earliest offset ); CREATE TEMPORARY TABLE redis_sink ( order_id STRING, -- Order ID, used as the Redis key product_name STRING, -- Product name, used as a Redis Hash field quantity STRING, -- Quantity, used as a Redis Hash field amount STRING, -- Order amount, used as a Redis Hash field PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', -- Use HASHMAP mode 'flattenHash' = 'true', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;Menulis data HASHMAP dalam mode single-value: Pada contoh kode, nilai kolom
order_idpada tabel sinkredis_sinkditulis ke Redis sebagai kunci. Nilai kolomproduct_nameditulis sebagai field, sedangkan nilai kolomquantityditulis sebagai nilai.CREATE TEMPORARY TABLE kafka_source ( order_id STRING, -- Order ID product_name STRING, -- Product name quantity STRING -- Quantity ) WITH ( 'connector' = 'kafka', 'topic' = 'orders_topic', -- Kafka topic 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address 'format' = 'json', -- Data format is JSON 'scan.startup.mode' = 'earliest-offset' -- Consume from the earliest offset ); CREATE TEMPORARY TABLE redis_sink ( order_id STRING, -- Redis key product_name STRING, -- Redis field quantity STRING, -- Redis value PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;
Tabel dimensi
Membaca data STRING: Pada contoh kode, nilai kolom
user_idpada tabel dimensiredis_dimsesuai dengan kunci, sedangkan nilai kolomuser_namesesuai dengan nilai.CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- User ID proctime AS PROCTIME() -- Processing time ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka topic 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address 'format' = 'json', -- Data format is JSON 'scan.startup.mode' = 'earliest-offset' -- Consume from the earliest offset ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- User ID (Redis key) user_name STRING, -- Username (Redis value) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', -- Redis host address 'port' = 'yourPort', -- Redis port 'password' = 'yourPassword', -- Redis password 'mode' = 'STRING' -- Use STRING mode ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- User ID redis_user_id STRING, -- User ID from Redis user_name STRING -- Username ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- User ID (from Kafka) t2.user_id, -- User ID (from Redis) t2.user_name -- Username (from Redis) FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;Membaca data HASHMAP dalam mode multi-value: Pada contoh kode, nilai kolom
user_idpada tabel dimensiredis_dimmerupakan kunci. Nilai kolomuser_namedipetakan ke field user_name, nilai kolomemaildipetakan ke field email, dan nilai kolomregister_timedipetakan ke field register_time.CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- User ID click_time TIMESTAMP(3), -- Click time proctime AS PROCTIME() -- Processing time ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka topic 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address 'format' = 'json', -- Data format is JSON 'scan.startup.mode' = 'earliest-offset' ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- User ID (Redis key) user_name STRING, -- Username (part of a Redis field-value pair) email STRING, -- Email (part of a Redis field-value pair) register_time STRING, -- Registration time (part of a Redis field-value pair) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword', 'mode' = 'HASHMAP' -- Use HASHMAP mode ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- User ID user_name STRING, -- Username email STRING, -- Email register_time STRING, -- Registration time click_time TIMESTAMP(3) -- Click time ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- User ID t2.user_name, -- Username t2.email, -- Email t2.register_time, -- Registration time t1.click_time -- Click time FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;Membaca data HASHMAP dalam mode single-value: Pada contoh kode, nilai parameter
hashNameadalah testKey, yang digunakan sebagai kunci. Nilai kolomuser_idpada tabel dimensiredis_dimmerupakan field, sedangkan nilai kolomuser_namemerupakan nilai.CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- User ID proctime AS PROCTIME() -- Processing time ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka topic 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address 'format' = 'json', -- Data format is JSON 'scan.startup.mode' = 'earliest-offset' -- Consume from the earliest offset ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- User ID (Redis hash field) user_name STRING, -- Username (Redis hash value) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', -- Redis host address 'port' = 'yourPort', -- Redis port 'password' = 'yourPassword',-- Redis password 'hashName' = 'testkey' -- Fixed Redis hash name ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- User ID redis_user_id STRING, -- User ID from Redis user_name STRING -- Username ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- User ID (from Kafka) t2.user_id, -- User ID (from Redis) t2.user_name -- Username (from Redis) FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;