When a Tair Serverless KV instance experiences a traffic spike, it automatically scales out to twice its original peak capacity. By default, requests that exceed the original peak capacity are queued during the scale-out process; this queuing behavior is consistent with open-source Redis. To have throttled requests fail immediately instead, set the return-err-when-throttle parameter to yes. Throttled requests then return a THROTTLED error, and your client can either retry or discard them. The following code samples show how to retry throttled requests with exponential backoff.
Jedis
Import the Jedis dependency.
<!-- This example uses version 5.2.0. -->
<dependency>
  <groupId>redis.clients</groupId>
  <artifactId>jedis</artifactId>
  <version>5.2.0</version>
</dependency>
Code sample.
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; import redis.clients.jedis.exceptions.JedisException; public class JedisThrottledTest { private static final Logger logger = LoggerFactory.getLogger(JedisThrottledTest.class); private static final int MAX_RETRY = 10; // Maximum retry attempts public static void main(String[] args) { if (args.length < 3) { System.out.println("Usage: java -jar JedisThrottledTest.jar <host> <port> <password>"); return; } String host = args[0]; int port = Integer.parseInt(args[1]); String password = args[2]; JedisPoolConfig poolConfig = new JedisPoolConfig(); poolConfig.setMaxTotal(32); poolConfig.setMaxIdle(32); poolConfig.setMinIdle(16); JedisPool jedisPool = new JedisPool(poolConfig, host, port, 3000, password); for (int i = 0; i < 4; i++) { new Thread(new Runnable() { @Override public void run() { for (int i = 0; i < Integer.MAX_VALUE; i++) { executeWithRetry(jedisPool, "key" + i, "value" + i); } } }).start(); } } private static void executeWithRetry(JedisPool jedisPool, String key, String value) { int retryCount = 0; while (retryCount < MAX_RETRY) { try (Jedis jedis = jedisPool.getResource()) { jedis.set(key, value); break; } catch (JedisException e) { if (e.getMessage().contains("THROTTLED")) { logger.info("Throttled error occurred (attempt " + retryCount + "): " + e.getMessage()); retryCount++; if (retryCount >= MAX_RETRY) { logger.info("Max retry attempts reached."); throw e; } try { int sleepTime = (int)Math.pow(2, retryCount); Thread.sleep(sleepTime * 1000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new RuntimeException("Thread interrupted during retry delay", ie); } } else { throw e; } } } } }
Valkey-java
Import the Valkey-java dependency.
Important: Use valkey-java version 5.4.0 or later.
<dependency>
  <groupId>io.valkey</groupId>
  <artifactId>valkey-java</artifactId>
  <version>5.4.0</version>
</dependency>
Code sample.
import java.time.Duration; import io.valkey.DefaultJedisClientConfig; import io.valkey.ExceptionHandler; import io.valkey.HostAndPort; import io.valkey.UnifiedJedis; import io.valkey.providers.PooledConnectionProvider; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ThrottledTest { private static final Logger logger = LoggerFactory.getLogger(ThrottledTest.class); /** * Implements exponential backoff. */ static class ExponentialBackoffCallback implements ExceptionHandler.ErrorCallback { private int attempt = 0; @Override public void onError(String errorMessage) { int sleepTime = (int)Math.pow(2, attempt); try { logger.info("Sleeping for " + sleepTime + " seconds before handling: " + errorMessage); Thread.sleep(sleepTime * 1000); } catch (InterruptedException ie) { // Ignore the error. } attempt++; } } public static void main(String[] args) { if (args.length < 3) { System.out.println("Usage: java -jar ThrottledTest.jar <host> <port> <password>"); return; } String host = args[0]; int port = Integer.parseInt(args[1]); String password = args[2]; GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig(); poolConfig.setMaxTotal(32); poolConfig.setMaxIdle(32); poolConfig.setMinIdle(16); int maxAttempts = 100; // Maximum retry attempts Duration maxTotalRetriesDuration = Duration.ofSeconds(1000); // Maximum total duration for retries PooledConnectionProvider provider = new PooledConnectionProvider(new HostAndPort(host, port), DefaultJedisClientConfig.builder().password(password).build(), poolConfig); ExceptionHandler handler = new ExceptionHandler(); handler.register( message -> message.contains("THROTTLED"), new ExponentialBackoffCallback() ); UnifiedJedis unifiedJedis = new UnifiedJedis(provider, maxAttempts, maxTotalRetriesDuration, handler); for (int i = 0; i < 4; i++) { // Use four threads to generate high QPS. 
new Thread(new Runnable() { @Override public void run() { for (int i = 0; i < Integer.MAX_VALUE; i++) { try { unifiedJedis.set("" + i, "" + i); } catch (Exception e) { logger.error("Error occurred {}", e.getMessage()); } } } }).start(); } } }
redis-py
This example uses redis-py version 6.1.1.
import sys
import time
import threading
import logging
from redis import Redis, RedisError
# Emit INFO-level records so the throttle/retry messages logged by this script are shown.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
MAX_RETRY = 10 # Maximum attempts per command before a THROTTLED error is re-raised
def execute_with_retry(redis_client, key, value):
    """Run ``SET key value``, backing off and retrying on THROTTLED errors.

    After the n-th throttled attempt the function sleeps ``2 ** n`` seconds.
    The error is re-raised once ``MAX_RETRY`` attempts have been throttled.
    Any other ``RedisError`` is logged and re-raised immediately.
    """
    attempts_used = 0
    while attempts_used < MAX_RETRY:
        try:
            redis_client.set(key, value)
        except RedisError as err:
            # Anything that is not a throttle response is not retried.
            if "THROTTLED" not in str(err):
                logger.error(f"Non-throttled Redis error: {err}")
                raise err
            logger.info(f"Throttled error occurred (attempt {attempts_used}): {err}")
            attempts_used += 1
            if attempts_used >= MAX_RETRY:
                logger.info("Max retry attempts reached.")
                raise err
            backoff_seconds = 2 ** attempts_used
            time.sleep(backoff_seconds)
        else:
            # The write went through; nothing left to do.
            return
def worker(redis_client):
    """Write ``key<n>``/``value<n>`` pairs forever, advancing ``n`` only after a successful write.

    Any exception from a write is logged with its traceback, followed by a
    one-second pause before the same key is attempted again.
    """
    sequence = 0
    while True:
        key, value = f"key{sequence}", f"value{sequence}"
        try:
            execute_with_retry(redis_client, key, value)
        except Exception as exc:
            logger.exception(f"Unexpected error in worker: {exc}")
            time.sleep(1)  # Avoid tight loop in case of persistent errors
        else:
            sequence += 1
def main():
    """Connect to the instance given on the command line and drive it with 10 worker threads.

    Expects ``<host> <port> <password>`` in ``sys.argv``. Returns early after
    printing a usage line when arguments are missing, or after logging an
    error when the initial ``PING`` fails. Otherwise it blocks until Ctrl+C.
    """
    if len(sys.argv) < 4:
        print("Usage: python script.py <host> <port> <password>")
        return
    host = sys.argv[1]
    port = int(sys.argv[2])
    password = sys.argv[3]
    redis_client = Redis(
        host=host,
        port=port,
        password=password,
        socket_timeout=3,
        decode_responses=True
    )
    # Test the connection.
    try:
        redis_client.ping()
        logger.info("Successfully connected to Redis")
    except RedisError as e:
        logger.error(f"Failed to connect to Redis: {e}")
        return
    # Create and start 10 threads.
    for i in range(10):
        thread = threading.Thread(target=worker, args=(redis_client,))
        # Daemon threads do not keep the interpreter alive once main() returns.
        thread.daemon = True
        thread.start()
        logger.info(f"Started worker thread {i}")
    # Keep the main thread running.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Program interrupted. Exiting...")
    # The workers loop forever, so joining them here would block indefinitely;
    # returning lets the interpreter exit and take the daemon threads with it.


if __name__ == "__main__":
    main()