All Products
Search
Document Center

Tair (Redis® OSS-Compatible):Tair Serverless KV client-side throttling

Last Updated:Nov 06, 2025

When a Tair Serverless KV instance experiences a traffic spike, it automatically scales out to twice its original peak capacity. By default, requests that exceed the original peak capacity are queued while the scale-out is in progress, which is consistent with the behavior of open-source Redis. To have throttled requests fail immediately instead, set the return-err-when-throttle parameter to yes; throttled requests then return a THROTTLED error. Your client can either retry or discard a request that receives this error. The following code samples show how to retry such requests with exponential backoff.

Jedis

  1. Import the Jedis dependency.

    <!-- This example uses version 5.2.0. -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>5.2.0</version>
    </dependency>
  2. Code sample.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    import redis.clients.jedis.exceptions.JedisException;
    
    /**
     * Demonstrates how a Jedis client can retry writes that the server rejects with a
     * THROTTLED error, backing off exponentially between attempts.
     */
    public class JedisThrottledTest {
        private static final Logger logger = LoggerFactory.getLogger(JedisThrottledTest.class);

        private static final int MAX_RETRY = 10; // Maximum retry attempts

        private static final int WORKER_THREADS = 4; // Number of concurrent writer threads

        /**
         * Starts the writer threads against the instance given on the command line.
         *
         * @param args {@code <host> <port> <password>}
         */
        public static void main(String[] args) {
            if (args.length < 3) {
                System.out.println("Usage: java -jar JedisThrottledTest.jar <host> <port> <password>");
                return;
            }

            String host = args[0];
            int port = Integer.parseInt(args[1]);
            String password = args[2];

            JedisPoolConfig poolConfig = new JedisPoolConfig();
            poolConfig.setMaxTotal(32);
            poolConfig.setMaxIdle(32);
            poolConfig.setMinIdle(16);

            // The pool is shared by all workers and is intentionally left open:
            // the workers run until the process is stopped.
            JedisPool jedisPool = new JedisPool(poolConfig, host, port, 3000, password);
            for (int i = 0; i < WORKER_THREADS; i++) {
                new Thread(() -> runWorker(jedisPool), "jedis-worker-" + i).start();
            }
        }

        /**
         * Writes an endless sequence of keys. A write that ultimately fails is logged and
         * skipped so that one bad request does not silently terminate the thread.
         *
         * @param jedisPool pool to borrow connections from
         */
        private static void runWorker(JedisPool jedisPool) {
            for (int i = 0; i < Integer.MAX_VALUE && !Thread.currentThread().isInterrupted(); i++) {
                try {
                    executeWithRetry(jedisPool, "key" + i, "value" + i);
                } catch (RuntimeException e) {
                    // Covers JedisException (retries exhausted or non-throttle error) and the
                    // interruption wrapper; on interruption the loop condition ends the worker.
                    logger.error("Failed to write key{}: {}", i, e.getMessage(), e);
                }
            }
        }

        /**
         * Executes {@code SET key value}, retrying with exponential backoff while the server
         * answers with a THROTTLED error.
         *
         * @param jedisPool pool to borrow a connection from for each attempt
         * @param key       key to write
         * @param value     value to write
         * @throws JedisException   if the error is not a throttling error, or if the request is
         *                          still throttled after {@link #MAX_RETRY} attempts
         * @throws RuntimeException if the thread is interrupted while waiting to retry
         */
        private static void executeWithRetry(JedisPool jedisPool, String key, String value) {
            int retryCount = 0;
            while (retryCount < MAX_RETRY) {
                // try-with-resources returns the connection to the pool before the catch block
                // runs, so no connection is held while this thread sleeps.
                try (Jedis jedis = jedisPool.getResource()) {
                    jedis.set(key, value);
                    return;
                } catch (JedisException e) {
                    String message = e.getMessage();
                    // getMessage() may be null; treat that as "not a throttling error".
                    if (message == null || !message.contains("THROTTLED")) {
                        throw e;
                    }
                    logger.info("Throttled error occurred (attempt {}): {}", retryCount, message);
                    retryCount++;
                    if (retryCount >= MAX_RETRY) {
                        logger.info("Max retry attempts reached.");
                        throw e;
                    }
                    sleepBeforeRetry(retryCount);
                }
            }
        }

        /**
         * Sleeps for {@code 2^retryCount} seconds.
         *
         * @param retryCount number of failed attempts so far (at least 1)
         * @throws RuntimeException if the thread is interrupted; the interrupt flag is restored first
         */
        private static void sleepBeforeRetry(int retryCount) {
            long sleepMillis = (1L << retryCount) * 1000L;
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Thread interrupted during retry delay", ie);
            }
        }
    }

Valkey-java

  1. Import the Valkey-java dependency.

    Important

    Use valkey-java version 5.4.0 or later.

    <dependency>
        <groupId>io.valkey</groupId>
        <artifactId>valkey-java</artifactId>
        <version>5.4.0</version>
    </dependency>
  2. Code sample.

    import java.time.Duration;
    
    import io.valkey.DefaultJedisClientConfig;
    import io.valkey.ExceptionHandler;
    import io.valkey.HostAndPort;
    import io.valkey.UnifiedJedis;
    import io.valkey.providers.PooledConnectionProvider;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Demonstrates how a valkey-java client can retry requests that the server rejects with a
     * THROTTLED error, by registering a backoff callback on the client's exception handler.
     */
    public class ThrottledTest {
        private static final Logger logger = LoggerFactory.getLogger(ThrottledTest.class);

        private static final int WORKER_THREADS = 4; // Use four threads to generate high QPS.

        /**
         * Implements capped exponential backoff: 1, 2, 4, ... seconds up to
         * {@code 2^MAX_BACKOFF_EXPONENT} seconds.
         *
         * <p>One instance is shared by every thread that uses the client, so the attempt
         * counter is atomic. The counter is never reset (this callback receives no success
         * signal), which is why the delay must be capped: without a cap the delay grows
         * without bound and the millisecond computation eventually overflows.
         */
        static class ExponentialBackoffCallback implements ExceptionHandler.ErrorCallback {
            private static final int MAX_BACKOFF_EXPONENT = 6; // Longest delay: 64 seconds

            private final java.util.concurrent.atomic.AtomicInteger attempt =
                new java.util.concurrent.atomic.AtomicInteger();

            /**
             * Sleeps for the current backoff delay and then advances the counter.
             *
             * @param errorMessage message of the error being handled
             */
            @Override
            public void onError(String errorMessage) {
                // Saturating increment keeps the exponent within [0, MAX_BACKOFF_EXPONENT].
                int exponent = attempt.getAndUpdate(a -> Math.min(a + 1, MAX_BACKOFF_EXPONENT));
                long sleepTime = 1L << exponent;
                try {
                    logger.info("Sleeping for " + sleepTime + " seconds before handling: " + errorMessage);
                    Thread.sleep(sleepTime * 1000L);
                } catch (InterruptedException ie) {
                    // Restore the flag so the worker loop can observe the interruption and stop.
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Starts the writer threads against the instance given on the command line.
         *
         * @param args {@code <host> <port> <password>}
         */
        public static void main(String[] args) {
            if (args.length < 3) {
                System.out.println("Usage: java -jar ThrottledTest.jar <host> <port> <password>");
                return;
            }

            String host = args[0];
            int port = Integer.parseInt(args[1]);
            String password = args[2];

            GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
            poolConfig.setMaxTotal(32);
            poolConfig.setMaxIdle(32);
            poolConfig.setMinIdle(16);

            int maxAttempts = 100; // Maximum retry attempts
            Duration maxTotalRetriesDuration = Duration.ofSeconds(1000); // Maximum total duration for retries
            PooledConnectionProvider provider = new PooledConnectionProvider(new HostAndPort(host, port),
                DefaultJedisClientConfig.builder().password(password).build(), poolConfig);

            // Register the backoff callback for errors whose message contains THROTTLED.
            ExceptionHandler handler = new ExceptionHandler();
            handler.register(
                message -> message.contains("THROTTLED"),
                new ExponentialBackoffCallback()
            );

            // The client is shared by all workers and is intentionally left open:
            // the workers run until the process is stopped.
            UnifiedJedis unifiedJedis = new UnifiedJedis(provider, maxAttempts, maxTotalRetriesDuration, handler);
            for (int i = 0; i < WORKER_THREADS; i++) {
                new Thread(() -> runWorker(unifiedJedis), "valkey-worker-" + i).start();
            }
        }

        /**
         * Writes an endless sequence of keys, logging (not propagating) any failure so the
         * thread keeps running. Stops once the thread has been interrupted.
         *
         * @param unifiedJedis shared client configured with the retry handler
         */
        private static void runWorker(UnifiedJedis unifiedJedis) {
            for (int i = 0; i < Integer.MAX_VALUE && !Thread.currentThread().isInterrupted(); i++) {
                try {
                    unifiedJedis.set("" + i, "" + i);
                } catch (Exception e) {
                    // Pass the exception as the last argument so the stack trace is logged too.
                    logger.error("Error occurred {}", e.getMessage(), e);
                }
            }
        }
    }

redis-py

This example uses redis-py version 6.1.1.

import sys
import time
import threading
import logging
from redis import Redis, RedisError

# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Upper bound on attempts per request in execute_with_retry.
MAX_RETRY = 10  # Maximum retry attempts

def execute_with_retry(redis_client, key, value):
    """Run ``SET key value``, retrying with exponential backoff while throttled.

    Each attempt that fails with an error whose text contains "THROTTLED" is
    followed by a sleep of ``2 ** n`` seconds, where ``n`` is the number of
    failed attempts so far. Any other RedisError is logged and re-raised at
    once. The throttling error is re-raised when MAX_RETRY attempts have failed.
    """
    for attempt in range(MAX_RETRY):
        try:
            redis_client.set(key, value)
            return  # Success: nothing more to do.
        except RedisError as e:
            # Guard clause: only throttling errors are retried.
            if "THROTTLED" not in str(e):
                logger.error(f"Non-throttled Redis error: {e}")
                raise e

            logger.info(f"Throttled error occurred (attempt {attempt}): {e}")
            failures = attempt + 1
            if failures >= MAX_RETRY:
                logger.info("Max retry attempts reached.")
                raise e
            time.sleep(2 ** failures)

def worker(redis_client):
    """Write key0/value0, key1/value1, ... forever using execute_with_retry.

    The sequence number advances only after a successful write; on any
    exception the error is logged with its traceback and the same key is
    attempted again after a one-second pause.
    """
    seq = 0
    while True:
        try:
            execute_with_retry(redis_client, f"key{seq}", f"value{seq}")
        except Exception as e:
            logger.exception(f"Unexpected error in worker: {e}")
            time.sleep(1)  # Avoid tight loop in case of persistent errors
        else:
            seq += 1

def main():
    """Connect to the instance given on the command line and run 10 writer threads.

    Expects ``<host> <port> <password>`` in ``sys.argv``. Prints a usage
    message and returns if arguments are missing or the port is not an
    integer; logs and returns if the initial PING fails. Otherwise blocks
    until interrupted with Ctrl-C.
    """
    if len(sys.argv) < 4:
        print("Usage: python script.py <host> <port> <password>")
        return

    host = sys.argv[1]
    try:
        port = int(sys.argv[2])
    except ValueError:
        # A non-numeric port is a usage error, not a crash.
        print(f"Invalid port: {sys.argv[2]}")
        print("Usage: python script.py <host> <port> <password>")
        return
    password = sys.argv[3]

    redis_client = Redis(
        host=host,
        port=port,
        password=password,
        socket_timeout=3,
        decode_responses=True
    )

    # Test the connection.
    try:
        redis_client.ping()
        logger.info("Successfully connected to Redis")
    except RedisError as e:
        logger.error(f"Failed to connect to Redis: {e}")
        return

    # Create and start 10 threads.
    for i in range(10):
        thread = threading.Thread(target=worker, args=(redis_client,))
        # Daemon threads do not keep the interpreter alive once main() returns.
        thread.daemon = True
        thread.start()
        logger.info(f"Started worker thread {i}")

    # Keep the main thread running.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Program interrupted. Exiting...")

    # Do not join the workers here: they loop forever, so join() would never
    # return and Ctrl-C would hang. Returning lets the daemon threads be torn
    # down at interpreter exit.

# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()