全部产品
Search
文档中心

Realtime Compute for Apache Flink:Tair (Kompatibel dengan Redis OSS)

更新时间:Mar 05, 2026

Topik ini menjelaskan cara menggunakan konektor Tair (Kompatibel dengan Redis OSS).

Informasi latar belakang

Alibaba Cloud Tair adalah layanan database yang kompatibel dengan protokol open source Redis dan menyediakan penyimpanan hibrida memori-dan-disk. Tair menggunakan arsitektur aktif-aktif dengan ketersediaan tinggi (HA) serta arsitektur kluster yang dapat diskalakan untuk memenuhi kebutuhan bisnis akan throughput tinggi, latensi rendah, dan skalabilitas fleksibel ke atas maupun ke bawah. Untuk informasi selengkapnya, lihat Apa itu Tair (Kompatibel dengan Redis OSS)?.

Tabel berikut menjelaskan fitur-fitur yang didukung oleh konektor Redis.

Category

Details

Jenis yang didukung

Tabel dimensi dan tabel sink

Mode yang didukung

Streaming mode

Format data

String

Metrik pemantauan spesifik

  • Tabel dimensi: Tidak ada

  • Tabel sink:

    • numBytesOut

    • numRecordsOutPerSecond

    • numBytesOutPerSecond

    • currentSendTime

Catatan

Untuk informasi selengkapnya tentang metrik tersebut, lihat Metrik pemantauan.

Jenis API

SQL

Mendukung pembaruan atau penghapusan data di tabel sink

Ya

Prasyarat

Batasan

  • Konektor Redis menyediakan semantik best-effort dan tidak menjamin pengiriman exactly-once. Anda harus memastikan idempotensi dalam aplikasi Anda.

  • Batasan berikut berlaku untuk tabel dimensi:

    • Hanya tipe data STRING dan HASHMAP yang dapat dibaca dari Redis.

    • Semua bidang dalam tabel dimensi harus bertipe STRING. Anda harus mendeklarasikan tepat satu primary key.

    • Saat melakukan join dengan tabel dimensi, klausa ON harus mencakup kondisi ekuivalen untuk primary key.

Isu yang diketahui dan solusi

Fitur cache pada Ververica Runtime (VVR) 8.0.9 memiliki masalah. Untuk menonaktifkannya, tambahkan sink.buffer-flush.max-rows = '0' ke klausa WITH pada tabel sink.

Sintaksis

CREATE TABLE redis_table (
  col1 STRING,
  col2 STRING,
  PRIMARY KEY (col1) NOT ENFORCED -- Wajib.
) WITH (
  'connector' = 'redis',
  'host' = '<yourHost>',
  'mode' = 'STRING'  -- Wajib untuk tabel sink.
);

Parameter WITH

Umum

Parameter

Description

Data type

Required

Default

Remarks

connector

Jenis tabel.

String

Ya

None

Nilainya harus redis.

host

Alamat koneksi server Redis.

String

Ya

None

Gunakan titik akhir jaringan internal.

Catatan

Koneksi ke titik akhir jaringan publik mungkin tidak stabil karena faktor seperti latensi jaringan dan batasan bandwidth.

port

Port koneksi server Redis.

Int

Tidak

6379

None.

password

Password database Redis.

String

Tidak

String kosong, yang berarti tidak ada validasi yang dilakukan.

None.

dbNum

Nomor database yang akan dioperasikan.

Int

Tidak

0

None.

clusterMode

Menentukan apakah kluster Redis berada dalam cluster mode.

Boolean

Tidak

false

None.

hostAndPorts

Host dan nomor port kluster Redis.

Catatan

Jika cluster mode diaktifkan dan HA tidak diperlukan, Anda dapat mengonfigurasi hanya satu host menggunakan parameter host dan port, atau hanya mengonfigurasi parameter ini. Parameter ini memiliki prioritas lebih tinggi daripada parameter host dan port individual.

String

Tidak

Empty

Jika ClusterMode = true dan Anda memerlukan HA untuk koneksi Jedis ke kluster Redis self-managed, Anda harus mengonfigurasi parameter ini. Formatnya berupa string: "host1:port1,host2:port2".

key-prefix

Awalan untuk nilai primary key tabel.

String

Tidak

None

Setelah parameter ini dikonfigurasi, awalan akan secara otomatis ditambahkan ke nilai bidang primary key saat Anda melakukan kueri atau menulis data. Awalan tersebut terdiri dari awalan kunci (key-prefix) dan pemisah awalan (key-prefix-delimiter).

Catatan

Parameter ini hanya didukung di VVR 8.0.7 dan versi yang lebih baru.

key-prefix-delimiter

Pemisah antara nilai primary key dan awalannya.

String

Tidak

None

connection.pool.max-total

Jumlah maksimum koneksi yang dapat dialokasikan oleh kolam koneksi.

Int

Tidak

8

Catatan

Parameter ini hanya didukung di VVR 8.0.9 dan versi yang lebih baru.

connection.pool.max-idle

Jumlah maksimum koneksi idle dalam kolam koneksi.

Int

Tidak

8

connection.pool.min-idle

Jumlah minimum koneksi idle dalam kolam koneksi.

Int

Tidak

0

connect.timeout

Timeout untuk membuat koneksi.

Duration

Tidak

3000 ms

socket.timeout

Timeout untuk menerima data dari server Redis (socket timeout).

Duration

Tidak

3000 ms

cacert.filepath

Jalur lengkap file sertifikat SSL/TLS. Format file harus jks.

String

Tidak

None, yang menunjukkan bahwa enkripsi SSL/TLS dinonaktifkan.

Unduh sertifikat CA sebagaimana dijelaskan dalam Aktifkan enkripsi TLS, lalu unggah sertifikat tersebut di bagian Additional Dependency Files pekerjaan. Setelah diunggah, sertifikat CA disimpan di direktori /flink/usrlib. Untuk informasi selengkapnya tentang cara mengunggah file di bagian Additional Dependency Files, lihat Deploy a job. Contoh: 'cacert.filepath' = '/flink/usrlib/ca.jks'.

Catatan

Parameter ini hanya didukung di VVR 11.1 dan versi yang lebih baru.

Khusus Sink

Parameter

Description

Data type

Required

Default

Remarks

mode

Tipe data Redis yang digunakan.

String

Ya

None

Tabel sink Tair mendukung lima tipe data Redis. DDL harus didefinisikan dalam format tertentu, dan primary key harus didefinisikan. Untuk informasi selengkapnya, lihat Format tipe data untuk tabel sink Redis.

flattenHash

Menentukan apakah data HASHMAP ditulis dalam mode multi-value.

Boolean

Tidak

false

Nilai yang valid:

  • true: Menulis data dalam mode multi-value. Dalam hal ini, deklarasikan beberapa bidang non-primary key dalam DDL. Nilai bidang primary key sesuai dengan kunci. Nama setiap bidang non-primary key sesuai dengan field, dan nilainya sesuai dengan nilai field tersebut.

  • false: Menulis data dalam mode single-value. Dalam hal ini, deklarasikan tiga bidang dalam DDL. Nilai bidang primary key pertama sesuai dengan kunci. Nilai bidang non-primary key kedua sesuai dengan field. Nilai bidang non-primary key ketiga sesuai dengan nilai.

Catatan
  • Parameter ini hanya berlaku ketika parameter mode diatur ke HASHMAP.

  • Parameter ini hanya didukung di VVR 8.0.7 dan versi yang lebih baru.

ignoreDelete

Menentukan apakah pesan retraction diabaikan.

Boolean

Tidak

false

Nilai yang valid:

  • true: Mengabaikan pesan retraction saat diterima.

  • false: Menghapus kunci yang sesuai dan data yang dimasukkan saat pesan retraction diterima.

expiration

Menetapkan TTL untuk kunci data yang ditulis.

Long

Tidak

0, yang berarti tidak ada TTL yang ditetapkan.

Jika nilai parameter ini lebih besar dari 0, TTL yang sesuai akan ditetapkan untuk kunci data yang ditulis. Satuannya adalah milidetik.

sink.buffer-flush.max-rows

Jumlah maksimum catatan yang dapat disimpan dalam cache.

Int

Tidak

200

Catatan yang dicache mencakup semua event append, update, dan delete. Cache akan di-flush ketika jumlah maksimum catatan terlampaui.

Catatan
  • Parameter ini hanya didukung di VVR 8.0.9 dan versi yang lebih baru. Hanya berlaku dalam mode non-cluster (clusterMode = false).

  • Di VVR 11.4.0 dan versi yang lebih baru, parameter ini juga didukung dalam mode kluster Redis (clusterMode = true).

sink.buffer-flush.interval

Interval flush cache.

Duration

Tidak

1000 ms

Cache di-flush secara asinkron.

Catatan
  • Parameter ini hanya didukung di VVR 8.0.9 dan versi yang lebih baru. Hanya berlaku dalam mode non-cluster (clusterMode = false).

  • Di VVR 11.4.0 dan versi yang lebih baru, parameter ini juga didukung dalam mode kluster Redis (clusterMode = true).

Khusus Tabel Dimensi

Parameter

Description

Data type

Required

Default

Remarks

mode

Tipe data Redis yang dibaca.

String

Tidak

STRING

Nilai yang valid:

STRING: Membaca data sebagai tipe STRING secara default.

HASHMAP: Membaca data HASHMAP dalam mode multi-value.

Dalam hal ini, DDL harus mendeklarasikan beberapa bidang non-primary key.

  • Bidang primary key: Nilai bidang primary key digunakan sebagai kunci dalam HASHMAP.

  • Bidang non-primary key: Nama setiap bidang non-primary key digunakan sebagai field, dan nilainya digunakan sebagai nilai field tersebut.

Catatan

Parameter ini hanya didukung di VVR 8.0.7 dan versi yang lebih baru.

Untuk membaca data HASHMAP dalam mode single-value, konfigurasikan parameter hashName.

hashName

Kunci yang digunakan saat membaca data HASHMAP dalam mode single-value.

String

Tidak

None

Jika Anda tidak menentukan parameter mode tetapi ingin membaca data HASHMAP dalam mode single-value, Anda harus mengonfigurasi hashName.

Dalam hal ini, DDL hanya perlu mendeklarasikan dua bidang. Nilai bidang primary key pertama sesuai dengan field, dan nilai bidang non-primary key kedua sesuai dengan nilai.

cache

Kebijakan cache.

String

Tidak

None

Tabel dimensi Tair mendukung kebijakan cache berikut:

None (default): Tidak ada cache.

LRU: Menyimpan sebagian data tabel dimensi dalam cache. Untuk setiap catatan dari tabel sumber, sistem terlebih dahulu mencari di cache. Jika data tidak ditemukan, sistem melakukan kueri ke tabel dimensi fisik.

ALL: Menyimpan semua data dari tabel dimensi dalam cache. Sebelum pekerjaan dijalankan, sistem memuat semua data dari tabel dimensi ke dalam cache. Semua pencarian selanjutnya dilakukan terhadap cache. Jika data tidak ditemukan dalam cache, berarti kunci tersebut tidak ada. Cache penuh memiliki waktu kedaluwarsa, setelah itu cache dimuat ulang.

Penting
  • Kebijakan cache ALL hanya didukung di VVR 8.0.3 dan versi yang lebih baru.

  • Untuk versi VVR dari 8.0.3 hingga 11.0, kebijakan cache ALL hanya mendukung pembacaan data HASHMAP dalam mode single-value. Dalam DDL, Anda harus mendeklarasikan tiga bidang. Bidang pertama ditentukan sebagai kunci menggunakan parameter hashName dalam klausa WITH. Nilai bidang primary key kedua sesuai dengan field. Nilai bidang non-primary key ketiga sesuai dengan nilai.

  • Di VVR 11.1 dan versi yang lebih baru, kebijakan cache ALL mendukung pembacaan data HASHMAP dalam mode multi-value. Dalam DDL, deklarasikan beberapa bidang non-primary key. Nilai bidang primary key sesuai dengan kunci. Nama setiap bidang non-primary key sesuai dengan field, dan nilainya sesuai dengan nilai field tersebut. Anda juga harus mengatur parameter mode ke HASHMAP dalam klausa WITH.

  • Anda juga harus mengonfigurasi parameter ukuran cache (cacheSize) dan interval pembaruan cache (cacheTTLMs).

cacheSize

Ukuran cache.

Long

Tidak

10000

Saat Anda memilih kebijakan cache LRU, Anda harus menetapkan ukuran cache.

cacheTTLMs

Durasi timeout cache, dalam milidetik.

Long

Tidak

None

Konfigurasi cacheTTLMs bergantung pada pengaturan cache:

Jika cache diatur ke None, Anda tidak perlu mengonfigurasi cacheTTLMs. Artinya cache tidak memiliki waktu kedaluwarsa.

Jika cache diatur ke LRU, cacheTTLMs adalah periode timeout cache. Secara default, cache tidak kedaluwarsa.

Jika cache diatur ke ALL, cacheTTLMs adalah waktu reload cache. Secara default, cache tidak direload.

cacheEmpty

Menentukan apakah hasil kosong di-cache.

Boolean

Tidak

true

None.

cacheReloadTimeBlackList

Waktu penyegaran cache yang dilarang. Saat kebijakan cache diatur ke ALL, Anda dapat mengaktifkan waktu yang dilarang untuk mencegah cache diperbarui selama periode tersebut, misalnya selama festival belanja Double 11.

String

Tidak

None

Formatnya sebagai berikut:

  • Rentang tanggal dan waktu lengkap: 2017-10-24 14:00 -> 2017-10-24 15:00.

  • Periode lintas hari: 2017-11-10 23:30 -> 2017-11-11 08:00.

  • Periode waktu tetap harian: 12:00 -> 14:00, 22:00 -> 2:00.

    Catatan

    Di VVR 11.1 dan versi yang lebih baru, periode waktu tanpa tanggal yang ditentukan berlaku setiap hari.

Gunakan pemisah sebagai berikut:

  • Gunakan koma (,) untuk memisahkan beberapa periode waktu yang dilarang.

  • Gunakan panah (->) untuk memisahkan waktu mulai dan akhir periode yang dilarang.

async

Menentukan apakah data dikembalikan secara asinkron.

Boolean

Tidak

false

  • true: Mengembalikan data secara asinkron. Data yang dikembalikan secara asinkron tidak terurut secara default.

  • false (default): Tidak mengembalikan data secara asinkron.

Format tipe data untuk tabel sink Redis

Type

Format

Redis insert command

STRING type

DDL dua kolom:

  • Kolom 1: key (STRING)

  • Kolom 2: value (STRING)

set key value

LIST type

DDL dua kolom:

  • Kolom 1: key (STRING)

  • Kolom 2: value (STRING)

lpush key value

SET type

DDL dua kolom:

  • Kolom 1: key (STRING)

  • Kolom 2: value (STRING)

sadd key value

HASHMAP type

Secara default, DDL tiga kolom:

  • Kolom 1: key (STRING)

  • Kolom 2: field (STRING)

  • Kolom 3: value (STRING)

hmset key field value

Jika flattenHash diatur ke true, DDL mendukung beberapa kolom. Contoh berikut menggunakan empat kolom:

  • Kolom 1: key (STRING)

  • Kolom 2: Nama kolom (misalnya, col1) sesuai dengan field, dan nilai kolom (misalnya, value1) sesuai dengan nilai field tersebut (STRING).

  • Kolom 3: Nama kolom (misalnya, col2) sesuai dengan field, dan nilai kolom (misalnya, value2) sesuai dengan nilai field tersebut (STRING).

  • Kolom 4: Nama kolom (misalnya, col3) sesuai dengan field, dan nilai kolom (misalnya, value3) sesuai dengan nilai field tersebut (STRING).

hmset key col1 value1 col2 value2 col3 value3

SORTEDSET type

DDL tiga kolom:

  • Kolom 1: key (STRING)

  • Kolom 2: score (DOUBLE)

  • Kolom 3: value (STRING)

zadd key score value

Pemetaan tipe

Type

Redis field type

Flink field type

Umum

STRING

STRING

Hanya untuk tabel sink

SCORE

DOUBLE

Catatan

Tipe SCORE Redis berlaku untuk SORTEDSET (sorted sets). Anda harus secara manual menetapkan skor DOUBLE untuk setiap nilai. Nilai-nilai tersebut kemudian diurutkan secara ascending berdasarkan skornya.

Contoh penggunaan

  • Sink table

    • Menulis data STRING: Pada contoh kode, nilai kolom user_id pada tabel sink redis_sink ditulis ke Redis sebagai kunci, sedangkan nilai kolom login_time ditulis sebagai nilai.

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,          -- User ID
        login_time STRING        -- Logon timestamp
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_logins',      -- Kafka topic
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address
        'format' = 'json',            -- Data format is JSON
        'scan.startup.mode' = 'earliest-offset' -- Consume from the earliest offset
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        user_id STRING,               -- Redis key
        login_time STRING,            -- Redis value
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'STRING',   -- Use STRING mode
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM kafka_source;
    • Menulis data HASHMAP dalam mode multi-value: Pada contoh kode, nilai kolom order_id pada tabel sink redis_sink ditulis ke Redis sebagai kunci. Nilai kolom product_name, quantity, dan amount masing-masing ditulis ke field product_name, quantity, dan amount.

      CREATE TEMPORARY TABLE kafka_source (
        order_id STRING,     -- Order ID
        product_name STRING, -- Product name
        quantity STRING,         -- Quantity
        amount STRING         -- Order amount
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders_topic', -- Kafka topic
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address
        'format' = 'json',        -- Data format is JSON
        'scan.startup.mode' = 'earliest-offset' -- Consume from the earliest offset
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        order_id STRING,        -- Order ID, used as the Redis key
        product_name STRING,    -- Product name, used as a Redis Hash field
        quantity STRING,        -- Quantity, used as a Redis Hash field
        amount STRING,          -- Order amount, used as a Redis Hash field
        PRIMARY KEY (order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',   -- Use HASHMAP mode
        'flattenHash' = 'true',
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM kafka_source;
    • Menulis data HASHMAP dalam mode single-value: Pada contoh kode, nilai kolom order_id pada tabel sink redis_sink ditulis ke Redis sebagai kunci. Nilai kolom product_name ditulis sebagai field, sedangkan nilai kolom quantity ditulis sebagai nilai.

      CREATE TEMPORARY TABLE kafka_source (
        order_id STRING,          -- Order ID
        product_name STRING,      -- Product name
        quantity STRING           -- Quantity
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders_topic', -- Kafka topic
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address
        'format' = 'json',        -- Data format is JSON
        'scan.startup.mode' = 'earliest-offset' -- Consume from the earliest offset
      );
      
      CREATE TEMPORARY TABLE redis_sink (
        order_id STRING,          -- Redis key
        product_name STRING,      -- Redis field
        quantity STRING,          -- Redis value
        PRIMARY KEY (order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'mode' = 'HASHMAP',
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword'
      );
      
      INSERT INTO redis_sink
      SELECT *
      FROM kafka_source;
  • Tabel dimensi

    • Membaca data STRING: Pada contoh kode, nilai kolom user_id pada tabel dimensi redis_dim sesuai dengan kunci, sedangkan nilai kolom user_name sesuai dengan nilai.

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,               -- User ID
        proctime AS PROCTIME()        -- Processing time
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_clicks',      -- Kafka topic
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address
        'format' = 'json',            -- Data format is JSON
        'scan.startup.mode' = 'earliest-offset' -- Consume from the earliest offset
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        user_id STRING,               -- User ID (Redis key)
        user_name STRING,             -- Username (Redis value)
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = 'yourHost',    -- Redis host address
        'port' = 'yourPort',    -- Redis port
        'password' = 'yourPassword',  -- Redis password
        'mode' = 'STRING'             -- Use STRING mode
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        user_id STRING,               -- User ID
        redis_user_id STRING,         -- User ID from Redis
        user_name STRING              -- Username
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT 
        t1.user_id,                   -- User ID (from Kafka)
        t2.user_id,                   -- User ID (from Redis)
        t2.user_name                  -- Username (from Redis)
      FROM kafka_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.user_id = t2.user_id;
    • Membaca data HASHMAP dalam mode multi-value: Pada contoh kode, nilai kolom user_id pada tabel dimensi redis_dim merupakan kunci. Nilai kolom user_name dipetakan ke field user_name, nilai kolom email dipetakan ke field email, dan nilai kolom register_time dipetakan ke field register_time.

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,               -- User ID
        click_time TIMESTAMP(3),      -- Click time
        proctime AS PROCTIME()        -- Processing time
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_clicks',      -- Kafka topic
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address
        'format' = 'json',            -- Data format is JSON
        'scan.startup.mode' = 'earliest-offset'
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        user_id STRING,     -- User ID (Redis key)
        user_name STRING,    -- Username (part of a Redis field-value pair)
        email STRING,        -- Email (part of a Redis field-value pair)
        register_time STRING, -- Registration time (part of a Redis field-value pair)
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = 'yourHost',
        'port' = 'yourPort',
        'password' = 'yourPassword',
        'mode' = 'HASHMAP'    -- Use HASHMAP mode
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        user_id STRING,        -- User ID
        user_name STRING,      -- Username
        email STRING,          -- Email
        register_time STRING,   -- Registration time
        click_time TIMESTAMP(3) -- Click time
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT 
        t1.user_id,    -- User ID
        t2.user_name,  -- Username
        t2.email,      -- Email
        t2.register_time,  -- Registration time
        t1.click_time      -- Click time
      FROM kafka_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.user_id = t2.user_id;
    • Membaca data HASHMAP dalam mode single-value: Pada contoh kode, nilai parameter hashName adalah testKey, yang digunakan sebagai kunci. Nilai kolom user_id pada tabel dimensi redis_dim merupakan field, sedangkan nilai kolom user_name merupakan nilai.

      CREATE TEMPORARY TABLE kafka_source (
        user_id STRING,            -- User ID
        proctime AS PROCTIME()     -- Processing time
      ) WITH (
        'connector' = 'kafka',
        'topic' = 'user_clicks',      -- Kafka topic
        'properties.bootstrap.servers' = 'yourKafkaBroker', -- Kafka broker address
        'format' = 'json',            -- Data format is JSON
        'scan.startup.mode' = 'earliest-offset' -- Consume from the earliest offset
      );
      
      CREATE TEMPORARY TABLE redis_dim (
        user_id STRING,               -- User ID (Redis hash field)
        user_name STRING,             -- Username (Redis hash value)
        PRIMARY KEY (user_id) NOT ENFORCED
      ) WITH (
        'connector' = 'redis',
        'host' = 'yourHost',        -- Redis host address
        'port' = 'yourPort',        -- Redis port
        'password' = 'yourPassword',-- Redis password
        'hashName' = 'testkey'      -- Fixed Redis hash name
      );
      
      CREATE TEMPORARY TABLE blackhole_sink (
        user_id STRING,               -- User ID
        redis_user_id STRING,         -- User ID from Redis
        user_name STRING              -- Username
      ) WITH (
        'connector' = 'blackhole'
      );
      
      INSERT INTO blackhole_sink
      SELECT 
         t1.user_id,     -- User ID (from Kafka)
         t2.user_id,     -- User ID (from Redis)
         t2.user_name    -- Username (from Redis)
      FROM kafka_source AS t1
      JOIN redis_dim FOR SYSTEM_TIME AS OF t1.proctime AS t2
      ON t1.user_id = t2.user_id;