すべてのプロダクト
Search
ドキュメントセンター

Tair (Redis® OSS-Compatible):Tair Serverless KV のクライアント側のスロットリング

最終更新日:Nov 07, 2025

Tair Serverless KV インスタンスでトラフィックスパイクが発生すると、元のピーク容量の 2 倍に自動的にスケールアウトします。デフォルトでは、元のピーク容量を超えるリクエストは、スケールアウトプロセス中にキューに入れられます。この動作はオープンソースの Redis と同じです。スロットルされたリクエストがすぐにエラーを返すようにするには、return-err-when-throttle パラメーターを yes に設定します。これにより、スロットルされたリクエストは THROTTLED エラーを返すようになります。このエラーを受け取ったリクエストは、リトライまたは破棄できます。以下のコードサンプルは、リクエストをリトライする方法を示しています。

Jedis

  1. Jedis の依存関係をインポートします。

    <!-- この例ではバージョン 5.2.0 を使用します。 -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>5.2.0</version>
    </dependency>
  2. コードサンプル。

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    import redis.clients.jedis.exceptions.JedisException;
    
    public class JedisThrottledTest {
        private static final Logger logger = LoggerFactory.getLogger(JedisThrottledTest.class);
    
        private static final int MAX_RETRY = 10; // 最大リトライ回数
    
        public static void main(String[] args) {
            if (args.length < 3) {
                System.out.println("Usage: java -jar JedisThrottledTest.jar <host> <port> <password>");
                return;
            }
    
            String host = args[0];
            int port = Integer.parseInt(args[1]);
            String password = args[2];
    
            JedisPoolConfig poolConfig = new JedisPoolConfig();
            poolConfig.setMaxTotal(32);
            poolConfig.setMaxIdle(32);
            poolConfig.setMinIdle(16);
    
            JedisPool jedisPool = new JedisPool(poolConfig, host, port, 3000, password);
            for (int i = 0; i < 4; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (int i = 0; i < Integer.MAX_VALUE; i++) {
                            executeWithRetry(jedisPool, "key" + i, "value" + i);
                        }
                    }
                }).start();
            }
        }
    
        private static void executeWithRetry(JedisPool jedisPool, String key, String value) {
            int retryCount = 0;
            while (retryCount < MAX_RETRY) {
                try (Jedis jedis = jedisPool.getResource()) {
                    jedis.set(key, value);
                    break;
                } catch (JedisException e) {
                    if (e.getMessage().contains("THROTTLED")) {
                        logger.info("Throttled error occurred (attempt " + retryCount + "): " + e.getMessage());
                        retryCount++;
                        if (retryCount >= MAX_RETRY) {
                            logger.info("Max retry attempts reached.");
                            throw e;
                        }
                        try {
                            int sleepTime = (int)Math.pow(2, retryCount);
                            Thread.sleep(sleepTime * 1000);
                        } catch (InterruptedException ie) {
                            Thread.currentThread().interrupt();
                            throw new RuntimeException("Thread interrupted during retry delay", ie);
                        }
                    } else {
                        throw e;
                    }
                }
            }
        }
    }

Valkey-java

  1. Valkey-java の依存関係をインポートします。

    重要

    valkey-java バージョン 5.4.0 以降を使用してください。

    <dependency>
        <groupId>io.valkey</groupId>
        <artifactId>valkey-java</artifactId>
        <version>5.4.0</version>
    </dependency>
  2. コードサンプル。

    import java.time.Duration;
    
    import io.valkey.DefaultJedisClientConfig;
    import io.valkey.ExceptionHandler;
    import io.valkey.HostAndPort;
    import io.valkey.UnifiedJedis;
    import io.valkey.providers.PooledConnectionProvider;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class ThrottledTest {
        private static final Logger logger = LoggerFactory.getLogger(ThrottledTest.class);
    
        /**
         * エクスポネンシャルバックオフを実装します。
         */
        static class ExponentialBackoffCallback implements ExceptionHandler.ErrorCallback {
            private int attempt = 0;
    
            @Override
            public void onError(String errorMessage) {
                int sleepTime = (int)Math.pow(2, attempt);
                try {
                    logger.info("Sleeping for " + sleepTime + " seconds before handling: " + errorMessage);
                    Thread.sleep(sleepTime * 1000);
                } catch (InterruptedException ie) {
                    // エラーを無視します。
                }
                attempt++;
            }
        }
    
        public static void main(String[] args) {
            if (args.length < 3) {
                System.out.println("Usage: java -jar ThrottledTest.jar <host> <port> <password>");
                return;
            }
    
            String host = args[0];
            int port = Integer.parseInt(args[1]);
            String password = args[2];
    
            GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
            poolConfig.setMaxTotal(32);
            poolConfig.setMaxIdle(32);
            poolConfig.setMinIdle(16);
    
            int maxAttempts = 100; // 最大リトライ回数
            Duration maxTotalRetriesDuration = Duration.ofSeconds(1000); // リトライの最大合計期間
            PooledConnectionProvider provider = new PooledConnectionProvider(new HostAndPort(host, port),
                DefaultJedisClientConfig.builder().password(password).build(), poolConfig);
    
            ExceptionHandler handler = new ExceptionHandler();
            handler.register(
                message -> message.contains("THROTTLED"),
                new ExponentialBackoffCallback()
            );
    
            UnifiedJedis unifiedJedis = new UnifiedJedis(provider, maxAttempts, maxTotalRetriesDuration, handler);
            for (int i = 0; i < 4; i++) { // 4 つのスレッドを使用して高い QPS を生成します。
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (int i = 0; i < Integer.MAX_VALUE; i++) {
                            try {
                                unifiedJedis.set("" + i, "" + i);
                            } catch (Exception e) {
                                logger.error("Error occurred {}", e.getMessage());
                            }
                        }
                    }
                }).start();
            }
        }
    }

redis-py

この例では redis-py バージョン 6.1.1 を使用します。

import sys
import time
import threading
import logging
from redis import Redis, RedisError

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

MAX_RETRY = 10  # 最大リトライ回数

def execute_with_retry(redis_client, key, value):
    retry_count = 0
    while retry_count < MAX_RETRY:
        try:
            redis_client.set(key, value)
            break  # 成功した場合はループを終了します。
        except RedisError as e:
            if "THROTTLED" in str(e):
                logger.info(f"Throttled error occurred (attempt {retry_count}): {e}")
                retry_count += 1
                if retry_count >= MAX_RETRY:
                    logger.info("Max retry attempts reached.")
                    raise e
                sleep_time = 2 ** retry_count
                time.sleep(sleep_time)
            else:
                logger.error(f"Non-throttled Redis error: {e}")
                raise e

def worker(redis_client):
    i = 0
    while True:
        try:
            execute_with_retry(redis_client, f"key{i}", f"value{i}")
            i += 1
        except Exception as e:
            logger.exception(f"Unexpected error in worker: {e}")
            time.sleep(1)  # 永続的なエラーが発生した場合のタイトなループを回避します

def main():
    if len(sys.argv) < 4:
        print("Usage: python script.py <host> <port> <password>")
        return

    host = sys.argv[1]
    port = int(sys.argv[2])
    password = sys.argv[3]

    redis_client = Redis(
        host=host,
        port=port,
        password=password,
        socket_timeout=3,
        decode_responses=True
    )

    # 接続をテストします。
    try:
        redis_client.ping()
        logger.info("Successfully connected to Redis")
    except RedisError as e:
        logger.error(f"Failed to connect to Redis: {e}")
        return

    # 10 個のスレッドを作成して開始します。
    threads = []
    for i in range(10):
        thread = threading.Thread(target=worker, args=(redis_client,))
        thread.daemon = True
        thread.start()
        threads.append(thread)
        logger.info(f"Started worker thread {i}")

    # メインスレッドを実行し続けます。
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Program interrupted. Exiting...")

    # すべてのスレッドが完了するのを待ちます。
    for thread in threads:
        thread.join()

if __name__ == "__main__":
    main()