Topik ini menjelaskan cara menggunakan Konektor Tair (Redis OSS-compatible).
Informasi latar belakang
Alibaba Cloud Tair adalah layanan database yang kompatibel dengan protokol open source Redis. Layanan ini menyediakan penyimpanan hibrida yang memanfaatkan memori dan disk secara bersamaan. Tair memiliki arsitektur aktif-aktif dengan ketersediaan tinggi serta arsitektur kluster yang dapat diskalakan untuk memenuhi kebutuhan bisnis akan throughput tinggi, latensi rendah, dan skalabilitas fleksibel. Untuk informasi selengkapnya, lihat Apa itu Tair (Kompatibel dengan Redis OSS)?.
Konektor Redis mendukung fitur-fitur berikut.
Category | Details |
Supported types | Tabel dimensi dan tabel sink |
Supported modes | Streaming |
Format data | String |
Specific monitoring metrics |
Catatan Untuk informasi selengkapnya mengenai metrik tersebut, lihat Deskripsi metrik. |
API types | SQL |
Supports data updates or deletions in sink tables | Yes |
Prasyarat
Buat instans Tair (Kompatibel dengan Redis OSS). Untuk informasi selengkapnya, lihat Langkah 1: Buat instans.
Atur daftar putih. Untuk informasi selengkapnya, lihat Langkah 2: Atur daftar putih.
Batasan
Konektor Redis hanya menyediakan semantik best-effort dan tidak dapat menjamin pengiriman exactly-once. Oleh karena itu, Anda harus memastikan idempotensi.
Batasan berikut berlaku untuk tabel dimensi:
Anda hanya dapat membaca data tipe STRING dan HASHMAP dari penyimpanan data Redis.
Semua bidang dalam tabel dimensi harus bertipe STRING. Anda harus mendeklarasikan tepat satu primary key.
Saat melakukan join dengan tabel dimensi, klausa ON harus mencakup kondisi ekuivalen untuk primary key.
Masalah yang diketahui dan solusi
Ververica Runtime (VVR) 8.0.9 memiliki masalah yang diketahui terkait fitur caching-nya. Untuk mengatasi masalah ini, Anda dapat menonaktifkan fitur tersebut dengan mengatur 'sink.buffer-flush.max-rows' = '0' di klausa WITH tabel sink.
Sintaksis
CREATE TABLE redis_table (
col1 STRING,
col2 STRING,
PRIMARY KEY (col1) NOT ENFORCED -- Wajib.
) WITH (
'connector' = 'redis',
'host' = '<yourHost>',
'mode' = 'STRING' -- Wajib untuk tabel sink.
);Parameter WITH
Umum
Parameter | Deskripsi | Tipe data | Diperlukan | Nilai default | Catatan |
connector | Jenis tabel. | String | Ya | None | Nilainya harus redis. |
host | Alamat koneksi server Redis. | String | Ya | None | Gunakan alamat jaringan internal. Catatan Koneksi ke alamat jaringan publik mungkin tidak stabil karena latensi jaringan dan batasan bandwidth. |
port | Port koneksi server Redis. | Int | Tidak | 6379 | None. |
password | Password untuk database Redis. | String | Tidak | String kosong. Artinya, tidak dilakukan validasi. | None. |
dbNum | Nomor database yang akan dioperasikan. | Int | Tidak | 0 | None. |
clusterMode | Menentukan apakah kluster Redis berada dalam mode kluster. | Boolean | Tidak | false | None. |
hostAndPorts | Host dan nomor port kluster Redis. Catatan Jika mode kluster diaktifkan dan koneksi ketersediaan tinggi tidak diperlukan, Anda dapat menggunakan parameter host dan port untuk mengonfigurasi hanya satu host. Anda juga dapat hanya mengonfigurasi parameter ini. Parameter ini memiliki prioritas lebih tinggi daripada parameter host dan port individual. | String | Tidak | Empty | Jika |
key-prefix | Awalan untuk nilai primary key tabel. | String | Tidak | None | Setelah Anda mengonfigurasi parameter ini, awalan akan secara otomatis ditambahkan ke nilai bidang primary key saat Anda melakukan kueri atau menulis data. Awalan tersebut terdiri atas awalan kunci (key-prefix) dan pemisah awalan (key-prefix-delimiter). Catatan Parameter ini hanya didukung di VVR 8.0.7 dan versi yang lebih baru. |
key-prefix-delimiter | Pemisah antara nilai primary key dan awalannya. | String | Tidak | None | |
connection.pool.max-total | Jumlah maksimum koneksi yang dapat dialokasikan oleh kolam koneksi. | Int | Tidak | 8 | Catatan Parameter ini hanya didukung di VVR 8.0.9 dan versi yang lebih baru. |
connection.pool.max-idle | Jumlah maksimum koneksi idle dalam kumpulan koneksi. | Int | Tidak | 8 | |
connection.pool.min-idle | Jumlah minimum koneksi idle dalam kumpulan koneksi. | Int | Tidak | 0 | |
connect.timeout | Timeout untuk membuat koneksi. | Duration | Tidak | 3000ms | |
socket.timeout | Timeout untuk menerima data dari server Redis (socket timeout). | Duration | Tidak | 3000ms | |
cacert.filepath | Jalur lengkap file sertifikat SSL/TLS. Format file harus JKS. | String | Tidak | None. Artinya, enkripsi SSL/TLS dinonaktifkan. | Untuk informasi selengkapnya, lihat Aktifkan enkripsi TLS untuk mengunduh sertifikat CA. Unggah sertifikat tersebut di bagian Additional Dependencies pekerjaan. Setelah diunggah, sertifikat CA disimpan di folder /flink/usrlib. Untuk informasi selengkapnya tentang cara mengunggah file di bagian Additional Dependencies, lihat Deploy a job. Contoh: Catatan Parameter ini hanya didukung di VVR 11.1 dan versi yang lebih baru. |
Khusus tabel sink
Parameter | Deskripsi | Tipe data | Diperlukan | Nilai default | Catatan |
mode | Tipe data Redis yang sesuai. | String | Ya | None | Tabel sink Tair mendukung lima tipe data Redis. DDL harus didefinisikan dalam format yang ditentukan, dan primary key harus didefinisikan. Untuk informasi selengkapnya, lihat Format data untuk tabel sink Redis. |
flattenHash | Menentukan apakah data HASHMAP ditulis dalam mode multi-nilai. | Boolean | Tidak | false | Nilai valid:
Catatan
|
ignoreDelete | Menentukan apakah pesan retraction diabaikan. | Boolean | Tidak | false | Nilai valid:
|
expiration | Menetapkan TTL untuk kunci data yang ditulis. | Long | Tidak | 0. Artinya, tidak ada TTL yang ditetapkan. | Jika nilai parameter ini lebih besar dari 0, TTL yang sesuai akan ditetapkan untuk kunci data yang ditulis. Satuannya adalah milidetik. |
sink.buffer-flush.max-rows | Jumlah maksimum catatan yang dapat disimpan dalam cache. | Int | Tidak | 200 | Catatan yang dicache mencakup semua event append, update, dan delete. Cache akan di-flush ketika jumlah maksimum catatan terlampaui. Catatan
|
sink.buffer-flush.interval | Interval flush cache. | Duration | Tidak | 1000ms | Cache di-flush secara asinkron. Catatan
|
Khusus tabel dimensi
Parameter | Description | Data type | Required | Default value | Remarks |
mode | Tipe data yang dibaca dari Redis. | String | Tidak | STRING | Nilai valid: STRING: Secara default membaca data sebagai tipe STRING. HASHMAP: Membaca data sebagai tipe HASHMAP dalam mode multi-nilai. Dalam hal ini, Anda harus mendeklarasikan beberapa bidang non-primary key dalam DDL.
Catatan Parameter ini hanya didukung di VVR 8.0.7 dan versi yang lebih baru. Untuk membaca data HASHMAP dalam mode single-value, konfigurasikan parameter hashName. |
hashName | Kunci yang digunakan saat membaca data HASHMAP dalam mode single-value. | String | Tidak | None | Jika Anda tidak menentukan parameter mode tetapi ingin membaca data HASHMAP dalam mode single-value, Anda harus mengonfigurasi hashName. Dalam hal ini, Anda hanya perlu mendeklarasikan dua bidang dalam DDL. Nilai bidang primary key pertama berfungsi sebagai field, dan nilai bidang non-primary key kedua berfungsi sebagai nilai. |
cache | Kebijakan cache. | String | Tidak | None | Tabel dimensi Tair mendukung kebijakan cache berikut: None (default): Tidak ada cache. LRU: Menyimpan sebagian data dari tabel dimensi dalam cache. Untuk setiap catatan dari tabel sumber, sistem terlebih dahulu mencari di cache. Jika data tidak ditemukan, sistem mencari di tabel dimensi fisik. ALL: Menyimpan semua data dari tabel dimensi dalam cache. Sebelum pekerjaan dijalankan, sistem memuat semua data dari tabel dimensi ke dalam cache. Semua pencarian selanjutnya dilakukan terhadap cache. Jika data tidak ditemukan di cache, artinya kunci tersebut tidak ada. Cache penuh memiliki waktu kedaluwarsa. Setelah kedaluwarsa, cache penuh dimuat ulang. Penting
|
cacheSize | Ukuran cache. | Long | Tidak | 10000 | Saat Anda memilih kebijakan cache LRU, Anda harus menetapkan ukuran cache. |
cacheTTLMs | Timeout cache dalam milidetik. | Long | Tidak | None | Konfigurasi cacheTTLMs bergantung pada pengaturan cache: Jika cache diatur ke None, Anda tidak perlu mengonfigurasi cacheTTLMs. Artinya, cache tidak memiliki waktu kedaluwarsa. Jika cache diatur ke LRU, cacheTTLMs adalah periode timeout cache. Secara default, cache tidak kedaluwarsa. Jika cache diatur ke ALL, cacheTTLMs adalah waktu reload cache. Secara default, cache tidak direload. |
cacheEmpty | Menentukan apakah hasil kosong di-cache. | Boolean | Tidak | true | None. |
cacheReloadTimeBlackList | Waktu penyegaran cache yang dilarang. Saat kebijakan cache diatur ke ALL, Anda dapat mengaktifkan periode waktu yang dilarang untuk mencegah penyegaran cache selama periode tersebut, misalnya selama acara Double 11. | String | Tidak | None | Format:
Gunakan pemisah sebagai berikut:
|
async | Menentukan apakah data dikembalikan secara asinkron. | Boolean | Tidak | false |
|
Format data untuk tabel sink Redis
Type | Format | Redis command to insert data |
STRING type | DDL memiliki dua kolom:
|
|
LIST type | DDL memiliki dua kolom:
|
|
SET type | DDL memiliki dua kolom:
|
|
HASHMAP type | Secara default, DDL memiliki tiga kolom:
|
|
Saat parameter flattenHash diatur ke true, DDL mendukung beberapa kolom. Contoh berikut menggunakan empat kolom:
|
| |
SORTEDSET type | DDL memiliki tiga kolom:
|
|
Pemetaan tipe
Kategori | Redis field type | Flink field type |
General | STRING | STRING |
Sink table specific | SCORE | DOUBLE |
Tipe SCORE di Redis berlaku untuk struktur data SORTEDSET (sorted set). Oleh karena itu, Anda harus menetapkan skor DOUBLE secara manual untuk setiap nilai. Nilai-nilai tersebut kemudian diurutkan secara ascending berdasarkan skornya.
Contoh
Tabel sink
Menulis data STRING: Pada contoh ini, nilai kolom
user_idpada tabel sinkredis_sinkdigunakan sebagai kunci, sedangkan nilai kolomlogin_timedigunakan sebagai nilai, lalu keduanya ditulis ke Redis.CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- User ID login_time STRING -- Logon timestamp ) WITH ( 'connector' = 'kafka', 'topic' = 'user_logins', -- Kafka topic 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address 'format' = 'json', -- The data format is JSON. 'scan.startup.mode' = 'earliest-offset' -- Start consuming from the earliest offset. ); CREATE TEMPORARY TABLE redis_sink ( user_id STRING, -- Redis key login_time STRING, -- Redis value PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'STRING', -- Use STRING mode. 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;Menulis data HASHMAP dalam mode multi-nilai: Pada contoh ini, nilai kolom
order_idpada tabel sinkredis_sinkdigunakan sebagai kunci. Nilai kolomproduct_name,quantity, danamountmasing-masing ditulis ke field `product_name`, `quantity`, dan `amount` di Redis.CREATE TEMPORARY TABLE kafka_source ( order_id STRING, -- Order ID product_name STRING, -- Product name quantity STRING, -- Product quantity amount STRING -- Order amount ) WITH ( 'connector' = 'kafka', 'topic' = 'orders_topic', -- Kafka topic 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address 'format' = 'json', -- The data format is JSON. 'scan.startup.mode' = 'earliest-offset' -- Start consuming from the earliest offset. ); CREATE TEMPORARY TABLE redis_sink ( order_id STRING, -- Order ID, used as the Redis key. product_name STRING, -- Product name, used as a Redis Hash field. quantity STRING, -- Product quantity, used as a Redis Hash field. amount STRING, -- Order amount, used as a Redis Hash field. PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', -- Use HASHMAP mode. 'flattenHash' = 'true', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;Menulis data HASHMAP dalam mode single-value: Pada contoh ini, nilai kolom
order_idpada tabel sinkredis_sinkdigunakan sebagai kunci. Nilai kolomproduct_namedigunakan sebagai field, sedangkan nilai kolomquantitydigunakan sebagai nilai, lalu semua data tersebut ditulis ke Redis.CREATE TEMPORARY TABLE kafka_source ( order_id STRING, -- Order ID product_name STRING, -- Product name quantity STRING -- Product quantity ) WITH ( 'connector' = 'kafka', 'topic' = 'orders_topic', -- Kafka topic 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address 'format' = 'json', -- The data format is JSON. 'scan.startup.mode' = 'earliest-offset' -- Start consuming from the earliest offset. ); CREATE TEMPORARY TABLE redis_sink ( order_id STRING, -- Redis key product_name STRING, -- Redis field quantity STRING, -- Redis value PRIMARY KEY (order_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'HASHMAP', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword' ); INSERT INTO redis_sink SELECT * FROM kafka_source;
Tabel dimensi
Membaca data STRING: Pada contoh ini, nilai kolom
user_idpada tabel dimensiredis_dimberfungsi sebagai kunci, sedangkan nilai kolomuser_nameberfungsi sebagai nilai.CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- User ID proctime AS PROCTIME() -- Processing time ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka topic 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address 'format' = 'json', -- The data format is JSON. 'scan.startup.mode' = 'earliest-offset' -- Start consuming from the earliest offset. ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- User ID (Redis key) user_name STRING, -- Username (Redis value) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', -- Redis host address 'port' = 'yourPort', -- Redis port 'password' = 'yourPassword', -- Redis password 'mode' = 'STRING' -- Use STRING mode. ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- User ID redis_user_id STRING, -- User ID in Redis user_name STRING -- Username ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- User ID (from Kafka) t2.user_id, -- User ID in Redis t2.user_name -- Username (from Redis) FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;Membaca data HASHMAP dalam mode multi-nilai: Pada contoh ini, nilai kolom
user_idpada tabel dimensiredis_dimberfungsi sebagai kunci. Nilai kolomuser_name,email, danregister_timemasing-masing berfungsi sebagai nilai dari field `user_name`, `email`, dan `register_time`.CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- User ID click_time TIMESTAMP(3), -- Click time proctime AS PROCTIME() -- Processing time ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka topic 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address 'format' = 'json', -- The data format is JSON. 'scan.startup.mode' = 'earliest-offset' ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- User ID (Redis key) user_name STRING, -- Username (part of a Redis field-value pair) email STRING, -- Email (part of a Redis field-value pair) register_time STRING, -- Registration time (part of a Redis field-value pair) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', 'port' = 'yourPort', 'password' = 'yourPassword', 'mode' = 'HASHMAP' -- Use HASHMAP mode. ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- User ID user_name STRING, -- Username email STRING, -- Email register_time STRING, -- Registration time click_time TIMESTAMP(3) -- Click time ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- User ID t2.user_name, -- Username t2.email, -- Email t2.register_time, -- Registration time t1.click_time -- Click time FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;Membaca data HASHMAP dalam mode single-value: Pada contoh ini, nilai `testKey` dari parameter
hashNameberfungsi sebagai kunci. Nilai kolomuser_idpada tabel dimensiredis_dimberfungsi sebagai field, sedangkan nilai kolomuser_nameberfungsi sebagai nilai.CREATE TEMPORARY TABLE kafka_source ( user_id STRING, -- User ID proctime AS PROCTIME() -- Processing time ) WITH ( 'connector' = 'kafka', 'topic' = 'user_clicks', -- Kafka topic 'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address 'format' = 'json', -- The data format is JSON. 'scan.startup.mode' = 'earliest-offset' -- Start consuming from the earliest offset. ); CREATE TEMPORARY TABLE redis_dim ( user_id STRING, -- User ID (Redis hash field) user_name STRING, -- Username (Redis hash value) PRIMARY KEY (user_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'host' = 'yourHost', -- Redis host address 'port' = 'yourPort', -- Redis port 'password' = 'yourPassword',-- Redis password 'hashName' = 'testkey' -- Fixed Redis hash name ); CREATE TEMPORARY TABLE blackhole_sink ( user_id STRING, -- User ID redis_user_id STRING, -- User ID in Redis user_name STRING -- Username ) WITH ( 'connector' = 'blackhole' ); INSERT INTO blackhole_sink SELECT t1.user_id, -- User ID (from Kafka) t2.user_id, -- User ID in Redis t2.user_name -- Username (from Redis) FROM kafka_source AS t1 JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2 ON t1.user_id = t2.user_id;