全部产品
Search
文档中心

Realtime Compute for Apache Flink:Tair (Redis OSS-compatible)

更新时间:Dec 04, 2025

Topik ini menjelaskan cara menggunakan Konektor Tair (Redis OSS-compatible).

Informasi latar belakang

Alibaba Cloud Tair adalah layanan database yang kompatibel dengan protokol open source Redis. Layanan ini menyediakan penyimpanan hibrida yang memanfaatkan memori dan disk secara bersamaan. Tair memiliki arsitektur aktif-aktif dengan ketersediaan tinggi serta arsitektur kluster yang dapat diskalakan untuk memenuhi kebutuhan bisnis akan throughput tinggi, latensi rendah, dan skalabilitas fleksibel. Untuk informasi selengkapnya, lihat Apa itu Tair (Kompatibel dengan Redis OSS)?.

Konektor Redis mendukung fitur-fitur berikut.

Category

Details

Supported types

Tabel dimensi dan tabel sink

Supported modes

Streaming

Format data

String

Specific monitoring metrics

  • Tabel dimensi: Tidak ada

  • Tabel sink:

    • numBytesOut

    • numRecordsOutPerSecond

    • numBytesOutPerSecond

    • currentSendTime

Catatan

Untuk informasi selengkapnya mengenai metrik tersebut, lihat Deskripsi metrik.

API types

SQL

Supports data updates or deletions in sink tables

Yes

Prasyarat

Batasan

  • Konektor Redis hanya menyediakan semantik best-effort dan tidak dapat menjamin pengiriman exactly-once. Oleh karena itu, Anda harus memastikan idempotensi.

  • Batasan berikut berlaku untuk tabel dimensi:

    • Anda hanya dapat membaca data tipe STRING dan HASHMAP dari penyimpanan data Redis.

    • Semua bidang dalam tabel dimensi harus bertipe STRING. Anda harus mendeklarasikan tepat satu primary key.

    • Saat melakukan join dengan tabel dimensi, klausa ON harus mencakup kondisi ekuivalen untuk primary key.

Masalah yang diketahui dan solusi

Ververica Runtime (VVR) 8.0.9 memiliki masalah yang diketahui terkait fitur caching-nya. Untuk mengatasi masalah ini, Anda dapat menonaktifkan fitur tersebut dengan mengatur 'sink.buffer-flush.max-rows' = '0' di klausa WITH tabel sink.

Sintaksis

CREATE TABLE redis_table (
  col1 STRING,
  col2 STRING,
  PRIMARY KEY (col1) NOT ENFORCED -- Wajib.
) WITH (
  'connector' = 'redis',
  'host' = '<yourHost>',
  'mode' = 'STRING'  -- Wajib untuk tabel sink.
);

Parameter WITH

Umum

Parameter

Deskripsi

Tipe data

Diperlukan

Nilai default

Catatan

connector

Jenis tabel.

String

Ya

None

Nilainya harus redis.

host

Alamat koneksi server Redis.

String

Ya

None

Gunakan alamat jaringan internal.

Catatan

Koneksi ke alamat jaringan publik mungkin tidak stabil karena latensi jaringan dan batasan bandwidth.

port

Port koneksi server Redis.

Int

Tidak

6379

None.

password

Password untuk database Redis.

String

Tidak

String kosong. Artinya, tidak dilakukan validasi.

None.

dbNum

Nomor database yang akan dioperasikan.

Int

Tidak

0

None.

clusterMode

Menentukan apakah kluster Redis berada dalam mode kluster.

Boolean

Tidak

false

None.

hostAndPorts

Host dan nomor port kluster Redis.

Catatan

Jika mode kluster diaktifkan dan koneksi ketersediaan tinggi tidak diperlukan, Anda dapat menggunakan parameter host dan port untuk mengonfigurasi hanya satu host. Anda juga dapat hanya mengonfigurasi parameter ini. Parameter ini memiliki prioritas lebih tinggi daripada parameter host dan port individual.

String

Tidak

Empty

Jika ClusterMode = true dan ketersediaan tinggi diperlukan untuk koneksi Jedis ke kluster Redis yang dikelola sendiri, Anda harus mengonfigurasi parameter ini. Formatnya berupa string: "host1:port1,host2:port2".

key-prefix

Awalan untuk nilai primary key tabel.

String

Tidak

None

Setelah Anda mengonfigurasi parameter ini, awalan akan secara otomatis ditambahkan ke nilai bidang primary key saat Anda melakukan kueri atau menulis data. Awalan tersebut terdiri atas awalan kunci (key-prefix) dan pemisah awalan (key-prefix-delimiter).

Catatan

Parameter ini hanya didukung di VVR 8.0.7 dan versi yang lebih baru.

key-prefix-delimiter

Pemisah antara nilai primary key dan awalannya.

String

Tidak

None

connection.pool.max-total

Jumlah maksimum koneksi yang dapat dialokasikan oleh kolam koneksi.

Int

Tidak

8

Catatan

Parameter ini hanya didukung di VVR 8.0.9 dan versi yang lebih baru.

connection.pool.max-idle

Jumlah maksimum koneksi idle dalam kumpulan koneksi.

Int

Tidak

8

connection.pool.min-idle

Jumlah minimum koneksi idle dalam kumpulan koneksi.

Int

Tidak

0

connect.timeout

Timeout untuk membuat koneksi.

Duration

Tidak

3000ms

socket.timeout

Timeout untuk menerima data dari server Redis (socket timeout).

Duration

Tidak

3000ms

cacert.filepath

Jalur lengkap file sertifikat SSL/TLS. Format file harus JKS.

String

Tidak

None. Artinya, enkripsi SSL/TLS dinonaktifkan.

Untuk informasi selengkapnya, lihat Aktifkan enkripsi TLS untuk mengunduh sertifikat CA. Unggah sertifikat tersebut di bagian Additional Dependencies pekerjaan. Setelah diunggah, sertifikat CA disimpan di folder /flink/usrlib. Untuk informasi selengkapnya tentang cara mengunggah file di bagian Additional Dependencies, lihat Deploy a job. Contoh: 'cacert.filepath' = '/flink/usrlib/ca.jks'.

Catatan

Parameter ini hanya didukung di VVR 11.1 dan versi yang lebih baru.

Khusus tabel sink

Parameter

Deskripsi

Tipe data

Diperlukan

Nilai default

Catatan

mode

Tipe data Redis yang sesuai.

String

Ya

None

Tabel sink Tair mendukung lima tipe data Redis. DDL harus didefinisikan dalam format yang ditentukan, dan primary key harus didefinisikan. Untuk informasi selengkapnya, lihat Format data untuk tabel sink Redis.

flattenHash

Menentukan apakah data HASHMAP ditulis dalam mode multi-nilai.

Boolean

Tidak

false

Nilai valid:

  • true: Menulis data dalam mode multi-nilai. Dalam hal ini, deklarasikan beberapa bidang non-primary key dalam DDL. Nilai bidang primary key berfungsi sebagai kunci. Nama setiap bidang non-primary key berfungsi sebagai field, dan nilai bidang tersebut berfungsi sebagai nilai field.

  • false: Menulis data dalam mode single-value. Dalam hal ini, deklarasikan tiga bidang dalam DDL. Nilai bidang primary key pertama berfungsi sebagai kunci. Nilai bidang non-primary key kedua berfungsi sebagai field. Nilai bidang non-primary key ketiga berfungsi sebagai nilai.

Catatan
  • Parameter ini hanya berlaku jika parameter mode diatur ke HASHMAP.

  • Parameter ini hanya didukung di VVR 8.0.7 dan versi yang lebih baru.

ignoreDelete

Menentukan apakah pesan retraction diabaikan.

Boolean

Tidak

false

Nilai valid:

  • true: Mengabaikan pesan retraction saat diterima.

  • false: Menghapus kunci dan data yang dimasukkan saat pesan retraction diterima.

expiration

Menetapkan TTL untuk kunci data yang ditulis.

Long

Tidak

0. Artinya, tidak ada TTL yang ditetapkan.

Jika nilai parameter ini lebih besar dari 0, TTL yang sesuai akan ditetapkan untuk kunci data yang ditulis. Satuannya adalah milidetik.

sink.buffer-flush.max-rows

Jumlah maksimum catatan yang dapat disimpan dalam cache.

Int

Tidak

200

Catatan yang dicache mencakup semua event append, update, dan delete. Cache akan di-flush ketika jumlah maksimum catatan terlampaui.

Catatan
  • Parameter ini hanya didukung di VVR 8.0.9 dan versi yang lebih baru.

  • Instans kluster Redis hanya didukung di VVR 11.4.0 dan versi yang lebih baru.

  • Jika Anda menulis data ke instans kluster Redis menggunakan versi sebelum VVR 11.4.0, atur parameter ini ke 0 untuk menonaktifkannya.

sink.buffer-flush.interval

Interval flush cache.

Duration

Tidak

1000ms

Cache di-flush secara asinkron.

Catatan
  • Parameter ini hanya didukung di VVR 8.0.9 dan versi yang lebih baru.

  • Instans kluster Redis hanya didukung di VVR 11.4.0 dan versi yang lebih baru.

  • Jika Anda menulis data ke instans kluster Redis menggunakan versi sebelum VVR 11.4.0, atur parameter ini ke 0 untuk menonaktifkannya.

Khusus tabel dimensi

Parameter

Description

Data type

Required

Default value

Remarks

mode

Tipe data yang dibaca dari Redis.

String

Tidak

STRING

Nilai valid:

STRING: Secara default membaca data sebagai tipe STRING.

HASHMAP: Membaca data sebagai tipe HASHMAP dalam mode multi-nilai.

Dalam hal ini, Anda harus mendeklarasikan beberapa bidang non-primary key dalam DDL.

  • Bidang primary key: Nilai bidang primary key digunakan sebagai kunci dalam HASHMAP.

  • Bidang non-primary key: Nama setiap bidang non-primary key digunakan sebagai field dalam HASHMAP, dan nilai bidang tersebut berfungsi sebagai nilai.

Catatan

Parameter ini hanya didukung di VVR 8.0.7 dan versi yang lebih baru.

Untuk membaca data HASHMAP dalam mode single-value, konfigurasikan parameter hashName.

hashName

Kunci yang digunakan saat membaca data HASHMAP dalam mode single-value.

String

Tidak

None

Jika Anda tidak menentukan parameter mode tetapi ingin membaca data HASHMAP dalam mode single-value, Anda harus mengonfigurasi hashName.

Dalam hal ini, Anda hanya perlu mendeklarasikan dua bidang dalam DDL. Nilai bidang primary key pertama berfungsi sebagai field, dan nilai bidang non-primary key kedua berfungsi sebagai nilai.

cache

Kebijakan cache.

String

Tidak

None

Tabel dimensi Tair mendukung kebijakan cache berikut:

None (default): Tidak ada cache.

LRU: Menyimpan sebagian data dari tabel dimensi dalam cache. Untuk setiap catatan dari tabel sumber, sistem terlebih dahulu mencari di cache. Jika data tidak ditemukan, sistem mencari di tabel dimensi fisik.

ALL: Menyimpan semua data dari tabel dimensi dalam cache. Sebelum pekerjaan dijalankan, sistem memuat semua data dari tabel dimensi ke dalam cache. Semua pencarian selanjutnya dilakukan terhadap cache. Jika data tidak ditemukan di cache, artinya kunci tersebut tidak ada. Cache penuh memiliki waktu kedaluwarsa. Setelah kedaluwarsa, cache penuh dimuat ulang.

Penting
  • Kebijakan cache ALL hanya didukung di VVR 8.0.3 dan versi yang lebih baru.

  • Untuk versi VVR dari 8.0.3 hingga 11.0, kebijakan cache ALL hanya mendukung pembacaan data HASHMAP dalam mode single-value. Dalam DDL, Anda harus mendeklarasikan tiga bidang. Bidang pertama ditentukan sebagai kunci menggunakan parameter hashName di klausa WITH. Bidang primary key kedua berfungsi sebagai field, dan bidang non-primary key ketiga berfungsi sebagai nilai.

  • Di VVR 11.1 dan versi yang lebih baru, kebijakan cache ALL mendukung pembacaan data HASHMAP dalam mode multi-nilai. Dalam DDL, deklarasikan beberapa bidang non-primary key. Nilai bidang primary key berfungsi sebagai kunci. Nama setiap bidang non-primary key berfungsi sebagai field, dan nilainya berfungsi sebagai nilai field. Selain itu, atur parameter mode ke HASHMAP di klausa WITH.

  • Anda juga harus mengonfigurasi parameter ukuran cache (cacheSize) dan interval pembaruan cache (cacheTTLMs).

cacheSize

Ukuran cache.

Long

Tidak

10000

Saat Anda memilih kebijakan cache LRU, Anda harus menetapkan ukuran cache.

cacheTTLMs

Timeout cache dalam milidetik.

Long

Tidak

None

Konfigurasi cacheTTLMs bergantung pada pengaturan cache:

Jika cache diatur ke None, Anda tidak perlu mengonfigurasi cacheTTLMs. Artinya, cache tidak memiliki waktu kedaluwarsa.

Jika cache diatur ke LRU, cacheTTLMs adalah periode timeout cache. Secara default, cache tidak kedaluwarsa.

Jika cache diatur ke ALL, cacheTTLMs adalah waktu reload cache. Secara default, cache tidak direload.

cacheEmpty

Menentukan apakah hasil kosong di-cache.

Boolean

Tidak

true

None.

cacheReloadTimeBlackList

Waktu penyegaran cache yang dilarang. Saat kebijakan cache diatur ke ALL, Anda dapat mengaktifkan periode waktu yang dilarang untuk mencegah penyegaran cache selama periode tersebut, misalnya selama acara Double 11.

String

Tidak

None

Format:

  • Rentang tanggal dan waktu lengkap: 2017-10-24 14:00 -> 2017-10-24 15:00.

  • Rentang waktu yang melintasi hari: 2017-11-10 23:30 -> 2017-11-11 08:00.

  • Rentang waktu harian tetap: 12:00 -> 14:00, 22:00 -> 2:00.

    Catatan

    Di VVR 11.1 dan versi yang lebih baru, rentang waktu tanpa tanggal tertentu berlaku setiap hari.

Gunakan pemisah sebagai berikut:

  • Gunakan koma (,) untuk memisahkan beberapa periode waktu yang dilarang.

  • Gunakan panah (->) untuk memisahkan waktu mulai dan akhir periode yang dilarang.

async

Menentukan apakah data dikembalikan secara asinkron.

Boolean

Tidak

false

  • true: Mengembalikan data secara asinkron. Data yang dikembalikan secara asinkron tidak terurut secara default.

  • false (default): Tidak mengembalikan data secara asinkron.

Format data untuk tabel sink Redis

Type

Format

Redis command to insert data

STRING type

DDL memiliki dua kolom:

  • Kolom 1 adalah kunci, tipe STRING.

  • Kolom 2 adalah nilai, tipe STRING.

set key value

LIST type

DDL memiliki dua kolom:

  • Kolom 1 adalah kunci, tipe STRING.

  • Kolom 2 adalah nilai, tipe STRING.

lpush key value

SET type

DDL memiliki dua kolom:

  • Kolom 1 adalah kunci, tipe STRING.

  • Kolom 2 adalah nilai, tipe STRING.

sadd key value

HASHMAP type

Secara default, DDL memiliki tiga kolom:

  • Kolom 1 adalah kunci, tipe STRING.

  • Kolom 2 adalah field, tipe STRING.

  • Kolom 3 adalah nilai, tipe STRING.

hmset key field value

Saat parameter flattenHash diatur ke true, DDL mendukung beberapa kolom. Contoh berikut menggunakan empat kolom:

  • Kolom 1 adalah kunci, tipe STRING.

  • Nama kolom 2 (misalnya, col1) berfungsi sebagai field, dan nilainya (misalnya, value1) berfungsi sebagai nilai field. Tipe STRING.

  • Nama kolom 3 (misalnya, col2) berfungsi sebagai field, dan nilainya (misalnya, value2) berfungsi sebagai nilai field. Tipe STRING.

  • Nama kolom 4 (misalnya, col3) berfungsi sebagai field, dan nilainya (misalnya, value3) berfungsi sebagai nilai field. Tipe STRING.

hmset key col1 value1 col2 value2 col3 value3

SORTEDSET type

DDL memiliki tiga kolom:

  • Kolom 1 adalah kunci, tipe STRING.

  • Kolom 2 adalah skor, tipe DOUBLE.

  • Kolom 3 adalah nilai, tipe STRING.

zadd key score value

Pemetaan tipe

Kategori

Redis field type

Flink field type

General

STRING

STRING

Sink table specific

SCORE

DOUBLE

Catatan

Tipe SCORE di Redis berlaku untuk struktur data SORTEDSET (sorted set). Oleh karena itu, Anda harus menetapkan skor DOUBLE secara manual untuk setiap nilai. Nilai-nilai tersebut kemudian diurutkan secara ascending berdasarkan skornya.

Contoh

  • Tabel sink

    • Menulis data STRING: Pada contoh ini, nilai kolom user_id pada tabel sink redis_sink digunakan sebagai kunci, sedangkan nilai kolom login_time digunakan sebagai nilai, lalu keduanya ditulis ke Redis.

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,          -- User ID
        login_time STRING        -- Logon timestamp
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_logins',      -- Kafka topic
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address
        'format' = 'json',            -- The data format is JSON.
        'scan.startup.mode' = 'earliest-offset' -- Start consuming from the earliest offset.
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        user_id STRING,               -- Redis key
        login_time STRING,            -- Redis value
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'STRING',   -- Use STRING mode.
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM kafka_source;
    • Menulis data HASHMAP dalam mode multi-nilai: Pada contoh ini, nilai kolom order_id pada tabel sink redis_sink digunakan sebagai kunci. Nilai kolom product_name, quantity, dan amount masing-masing ditulis ke field `product_name`, `quantity`, dan `amount` di Redis.

      CREATE TEMPORARY TABLE kafka_source (
        order_id STRING,     -- Order ID
        product_name STRING, -- Product name
        quantity STRING,         -- Product quantity
        amount STRING         -- Order amount
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders_topic', -- Kafka topic
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address
        'format' = 'json',        -- The data format is JSON.
        'scan.startup.mode' = 'earliest-offset' -- Start consuming from the earliest offset.
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        order_id STRING,        -- Order ID, used as the Redis key.
        product_name STRING,    -- Product name, used as a Redis Hash field.
        quantity STRING,        -- Product quantity, used as a Redis Hash field.
        amount STRING,          -- Order amount, used as a Redis Hash field.
        PRIMARY KEY (order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',   -- Use HASHMAP mode.
        'flattenHash' = 'true',
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM kafka_source;
    • Menulis data HASHMAP dalam mode single-value: Pada contoh ini, nilai kolom order_id pada tabel sink redis_sink digunakan sebagai kunci. Nilai kolom product_name digunakan sebagai field, sedangkan nilai kolom quantity digunakan sebagai nilai, lalu semua data tersebut ditulis ke Redis.

      CREATE TEMPORARY TABLE kafka_source (
        order_id STRING,          -- Order ID
        product_name STRING,      -- Product name
        quantity STRING           -- Product quantity
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders_topic', -- Kafka topic
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address
        'format' = 'json',        -- The data format is JSON.
        'scan.startup.mode' = 'earliest-offset' -- Start consuming from the earliest offset.
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        order_id STRING,          -- Redis key
        product_name STRING,      -- Redis field
        quantity STRING,          -- Redis value
        PRIMARY KEY (order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM kafka_source;
  • Tabel dimensi

    • Membaca data STRING: Pada contoh ini, nilai kolom user_id pada tabel dimensi redis_dim berfungsi sebagai kunci, sedangkan nilai kolom user_name berfungsi sebagai nilai.

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,               -- User ID
        proctime AS PROCTIME()        -- Processing time
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_clicks',      -- Kafka topic
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address
        'format' = 'json',            -- The data format is JSON.
        'scan.startup.mode' = 'earliest-offset' -- Start consuming from the earliest offset.
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        user_id STRING,               -- User ID (Redis key)
        user_name STRING,             -- Username (Redis value)
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = 'yourHost',    -- Redis host address
        'port' = 'yourPort',    -- Redis port
        'password' = 'yourPassword',  -- Redis password
        'mode' = 'STRING'             -- Use STRING mode.
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        user_id STRING,               -- User ID
        redis_user_id STRING,         -- User ID in Redis
        user_name STRING              -- Username
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT 
        t1.user_id,                   -- User ID (from Kafka)
        t2.user_id,                   -- User ID in Redis
        t2.user_name                  -- Username (from Redis)
      FROM kafka_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.user_id = t2.user_id;
    • Membaca data HASHMAP dalam mode multi-nilai: Pada contoh ini, nilai kolom user_id pada tabel dimensi redis_dim berfungsi sebagai kunci. Nilai kolom user_name, email, dan register_time masing-masing berfungsi sebagai nilai dari field `user_name`, `email`, dan `register_time`.

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,               -- User ID
        click_time TIMESTAMP(3),      -- Click time
        proctime AS PROCTIME()        -- Processing time
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_clicks',      -- Kafka topic
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address
        'format' = 'json',            -- The data format is JSON.
        'scan.startup.mode' = 'earliest-offset'
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        user_id STRING,     -- User ID (Redis key)
        user_name STRING,    -- Username (part of a Redis field-value pair)
        email STRING,        -- Email (part of a Redis field-value pair)
        register_time STRING, -- Registration time (part of a Redis field-value pair)
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword',
        'mode' = 'HASHMAP'    -- Use HASHMAP mode.
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        user_id STRING,        -- User ID
        user_name STRING,      -- Username
        email STRING,          -- Email
        register_time STRING,   -- Registration time
        click_time TIMESTAMP(3) -- Click time
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT 
        t1.user_id,    -- User ID
        t2.user_name,  -- Username
        t2.email,      -- Email
        t2.register_time,  -- Registration time
        t1.click_time      -- Click time
      FROM kafka_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.user_id = t2.user_id;
    • Membaca data HASHMAP dalam mode single-value: Pada contoh ini, nilai `testKey` dari parameter hashName berfungsi sebagai kunci. Nilai kolom user_id pada tabel dimensi redis_dim berfungsi sebagai field, sedangkan nilai kolom user_name berfungsi sebagai nilai.

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,            -- User ID
        proctime AS PROCTIME()     -- Processing time
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_clicks',      -- Kafka topic
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address
        'format' = 'json',            -- The data format is JSON.
        'scan.startup.mode' = 'earliest-offset' -- Start consuming from the earliest offset.
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        user_id STRING,               -- User ID (Redis hash field)
        user_name STRING,             -- Username (Redis hash value)
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = 'yourHost',        -- Redis host address
        'port' = 'yourPort',        -- Redis port
        'password' = 'yourPassword',-- Redis password
        'hashName' = 'testkey'      -- Fixed Redis hash name
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        user_id STRING,               -- User ID
        redis_user_id STRING,         -- User ID in Redis
        user_name STRING              -- Username
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT 
         t1.user_id,     -- User ID (from Kafka)
         t2.user_id,     -- User ID in Redis
         t2.user_name    -- Username (from Redis)
      FROM kafka_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.user_id = t2.user_id;