QPS スロットリング機能を有効にすると、インスタンスへのリクエストが指定されたしきい値を超えた場合に THROTTLED エラーが発生することがあります。このトピックでは、これらのエラーを処理するためのリトライ戦略を実装するコードサンプルを提供します。
Jedis
この例では、Jedis バージョン 5.2.0 を使用します。
リトライパラメーター
| パラメーター | 値 | 説明 |
|---|---|---|
MAX_RETRY |
10 | 例外をスローする前の最大リトライ回数 |
| 初期バックオフ | 2 s | 最初のリトライまでの待機時間 (2¹ 秒) |
| バックオフ乗数 | 2x | 後続のリトライごとに待機時間が倍になります:2 s、4 s、8 s... |
| リトライ対象のエラー | THROTTLED |
THROTTLED エラーのみがリトライされます。他のすべてのエラーは即座に伝播されます |
Maven 依存関係
<!-- この例ではバージョン 5.2.0 を使用します。 -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>5.2.0</version>
</dependency>
コードサンプル
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisException;
public class JedisThrottledTest {
private static final Logger logger = LoggerFactory.getLogger(JedisThrottledTest.class);
private static final int MAX_RETRY = 10; // 最大リトライ回数
public static void main(String[] args) {
if (args.length < 3) {
System.out.println("Usage: java -jar JedisThrottledTest.jar <host> <port> <password>");
return;
}
String host = args[0];
int port = Integer.parseInt(args[1]);
String password = args[2];
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(32);
poolConfig.setMaxIdle(32);
poolConfig.setMinIdle(16);
JedisPool jedisPool = new JedisPool(poolConfig, host, port, 3000, password);
for (int i = 0; i < 4; i++) {
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executeWithRetry(jedisPool, "key" + i, "value" + i);
}
}
}).start();
}
}
private static void executeWithRetry(JedisPool jedisPool, String key, String value) {
int retryCount = 0;
while (retryCount < MAX_RETRY) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.set(key, value);
break;
} catch (JedisException e) {
if (e.getMessage().contains("THROTTLED")) {
logger.info("Throttled error occurred (attempt " + retryCount + "): " + e.getMessage());
retryCount++;
if (retryCount >= MAX_RETRY) {
logger.info("Max retry attempts reached.");
throw e;
}
try {
int sleepTime = (int)Math.pow(2, retryCount);
Thread.sleep(sleepTime * 1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException("Thread interrupted during retry delay", ie);
}
} else {
throw e;
}
}
}
}
}
Valkey-java
重要
valkey-java バージョン 5.4.0 以降を使用してください。
リトライパラメーター
| パラメーター | 値 | 説明 |
|---|---|---|
maxAttempts |
100 | 最大リトライ回数 |
maxTotalRetriesDuration |
1,000 s | リトライに費やされる最大合計時間 |
| 初期バックオフ | 1 s | 最初のリトライまでの待機時間 (2⁰ 秒) |
| バックオフ乗数 | 2x | 後続のリトライごとに待機時間が倍になります:1 s、2 s、4 s... |
| リトライ対象のエラー | THROTTLED |
THROTTLED を含むメッセージのみがバックオフコールバックをトリガーします |
Maven 依存関係
<dependency>
<groupId>io.valkey</groupId>
<artifactId>valkey-java</artifactId>
<version>5.4.0</version>
</dependency>
コードサンプル
Valkey-java の ExceptionHandler を使用すると、特定のエラーパターンに対してコールバックを登録できます。この例では、THROTTLED エラーに対してエクスポネンシャルバックオフのコールバックを登録しているため、リトライロジックがアプリケーションコードから分離されます。
import java.time.Duration;
import io.valkey.DefaultJedisClientConfig;
import io.valkey.ExceptionHandler;
import io.valkey.HostAndPort;
import io.valkey.UnifiedJedis;
import io.valkey.providers.PooledConnectionProvider;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ThrottledTest {
private static final Logger logger = LoggerFactory.getLogger(ThrottledTest.class);
/**
* エクスポネンシャルバックオフを実装します。
*/
static class ExponentialBackoffCallback implements ExceptionHandler.ErrorCallback {
private int attempt = 0;
@Override
public void onError(String errorMessage) {
int sleepTime = (int)Math.pow(2, attempt);
try {
logger.info("Sleeping for " + sleepTime + " seconds before handling: " + errorMessage);
Thread.sleep(sleepTime * 1000);
} catch (InterruptedException ie) {
// エラーを無視します。
}
attempt++;
}
}
public static void main(String[] args) {
if (args.length < 3) {
System.out.println("Usage: java -jar ThrottledTest.jar <host> <port> <password>");
return;
}
String host = args[0];
int port = Integer.parseInt(args[1]);
String password = args[2];
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(32);
poolConfig.setMaxIdle(32);
poolConfig.setMinIdle(16);
int maxAttempts = 100; // 最大リトライ回数
Duration maxTotalRetriesDuration = Duration.ofSeconds(1000); // リトライの最大合計期間
PooledConnectionProvider provider = new PooledConnectionProvider(new HostAndPort(host, port),
DefaultJedisClientConfig.builder().password(password).build(), poolConfig);
ExceptionHandler handler = new ExceptionHandler();
handler.register(
message -> message.contains("THROTTLED"),
new ExponentialBackoffCallback()
);
UnifiedJedis unifiedJedis = new UnifiedJedis(provider, maxAttempts, maxTotalRetriesDuration, handler);
for (int i = 0; i < 4; i++) { // 4 つのスレッドを使用して高い QPS を生成します。
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
try {
unifiedJedis.set("" + i, "" + i);
} catch (Exception e) {
logger.error("Error occurred {}", e.getMessage());
}
}
}
}).start();
}
}
}
redis-py
この例では、redis-py バージョン 6.1.1 を使用します。
リトライパラメーター
| パラメーター | 値 | 説明 |
|---|---|---|
MAX_RETRY |
10 | 例外を発生させる前の最大リトライ回数 |
| 初期バックオフ | 2 s | 最初のリトライまでの待機時間 (2¹ 秒) |
| バックオフ乗数 | 2x | 後続のリトライごとに待機時間が倍になります:2 s、4 s、8 s... |
socket_timeout |
3 s | 接続タイムアウト |
| リトライ対象のエラー | THROTTLED |
THROTTLED エラーのみがリトライされます。他のすべての RedisError 例外は即座に伝播されます |
コードサンプル
import sys
import time
import threading
import logging
from redis import Redis, RedisError
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
MAX_RETRY = 10 # 最大リトライ回数
def execute_with_retry(redis_client, key, value):
retry_count = 0
while retry_count < MAX_RETRY:
try:
redis_client.set(key, value)
break # 成功した場合は、ループを終了します。
except RedisError as e:
if "THROTTLED" in str(e):
logger.info(f"Throttled error occurred (attempt {retry_count}): {e}")
retry_count += 1
if retry_count >= MAX_RETRY:
logger.info("Max retry attempts reached.")
raise e
sleep_time = 2 ** retry_count
time.sleep(sleep_time)
else:
logger.error(f"Non-throttled Redis error: {e}")
raise e
def worker(redis_client):
i = 0
while True:
try:
execute_with_retry(redis_client, f"key{i}", f"value{i}")
i += 1
except Exception as e:
logger.exception(f"Unexpected error in worker: {e}")
time.sleep(1) # 永続的なエラーが発生した場合のタイトなループを回避します
def main():
if len(sys.argv) < 4:
print("Usage: python script.py <host> <port> <password>")
return
host = sys.argv[1]
port = int(sys.argv[2])
password = sys.argv[3]
redis_client = Redis(
host=host,
port=port,
password=password,
socket_timeout=3,
decode_responses=True
)
# 接続をテストします。
try:
redis_client.ping()
logger.info("Successfully connected to Redis")
except RedisError as e:
logger.error(f"Failed to connect to Redis: {e}")
return
# 10 個のスレッドを作成して開始します。
threads = []
for i in range(10):
thread = threading.Thread(target=worker, args=(redis_client,))
thread.daemon = True
thread.start()
threads.append(thread)
logger.info(f"Started worker thread {i}")
# メインスレッドを実行し続けます。
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
logger.info("Program interrupted. Exiting...")
# すべてのスレッドが完了するのを待ちます。
for thread in threads:
thread.join()
if __name__ == "__main__":
main()