Pipeline

Last Updated: Jul 14, 2017

Scenario introduction

ApsaraDB for Redis provides a pipeline mechanism similar to that of Redis. A client interacts with a server through one-way pipelines, one for sending requests and the other for receiving responses. You can send operation requests successively from the client to the server; however, the client receives the response to each request from the server until it sends a quit message to the server.

Pipelines can be very useful, for example, when several operation commands need to be quickly submitted to the server, but the responses and operation results are not required right away. In this case, Pipelines can be used as a batch processing tool to optimize the performance. The performance is enhanced mainly because the overhead of the TCP connection is reduced.

However, the client using pipelines in the app connects to the server exclusively, and non-pipeline operations are blocked until the pipelines are closed. If you need to perform other operations at the same time, you can establish a dedicated connection for pipeline operations to separate them from conventional operations.

Sample code 1

Performance comparison

  1. package pipeline.kvstore.aliyun.com;
  2. import java.util.Date;
  3. import redis.clients.jedis.Jedis;
  4. import redis.clients.jedis.Pipeline;
  5. public class RedisPipelinePerformanceTest {
  6. static final String host = "xxxxxx.m.cnhza.kvstore.aliyuncs.com";
  7. static final int port = 6379;
  8. static final String password = "password";
  9. public static void main(String[] args) {
  10. Jedis jedis = new Jedis(host, port);
  11. //ApsaraDB for Redis instance password
  12. String authString = jedis.auth(password);// password
  13. if (!authString.equals("OK")) {
  14. System.err.println("AUTH Failed: " + authString);
  15. jedis.close();
  16. return;
  17. }
  18. //Execute several commands successively
  19. final int COUNT=5000;
  20. String key = "KVStore-Tanghan";
  21. // 1 ---Without using pipeline operations---
  22. jedis.del(key);//Initializes the key
  23. Date ts1 = new Date();
  24. for (int i = 0; i < COUNT; i++) {
  25. //Sends a request and receives the response
  26. jedis.incr(key);
  27. }
  28. Date ts2 = new Date();
  29. System.out.println("不用Pipeline > value为:"+jedis.get(key)+" > 操作用时:" + (ts2.getTime() - ts1.getTime())+ "ms");
  30. //2 ----Using pipeline operations---
  31. jedis.del(key);//Initializes the key
  32. Pipeline p1 = jedis.pipelined();
  33. Date ts3 = new Date();
  34. for (int i = 0; i < COUNT; i++) {
  35. //Sends a request
  36. p1.incr(key);
  37. }
  38. //Receives the response
  39. p1.sync();
  40. Date ts4 = new Date();
  41. System.out.println("使用Pipeline > value为:"+jedis.get(key)+" > 操作用时:" + (ts4.getTime() - ts3.getTime())+ "ms");
  42. jedis.close();
  43. }
  44. }

Output 1

After you access the ApsaraDB for Redis instance with the correct address and password and run the preceding Java code, the following output is displayed. The output shows that the performance is enhanced with pipelines.

  1. Without pipelines > value: 5000 > Time elapsed: 5844&nbsp;ms
  2. With pipelines > value: 5000 > Time elapsed: 78&nbsp;ms

Sample code 2

With pipelines defined in Jedis, responses are processed in two methods, as shown in the following sample code:

  1. package pipeline.kvstore.aliyun.com;
  2. import java.util.List;
  3. import redis.clients.jedis.Jedis;
  4. import redis.clients.jedis.Pipeline;
  5. import redis.clients.jedis.Response;
  6. public class PipelineClientTest {
  7. static final String host = "xxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
  8. static final int port = 6379;
  9. static final String password = "password";
  10. public static void main(String[] args) {
  11. Jedis jedis = new Jedis(host, port);
  12. //ApsaraDB for Redis instance password
  13. String authString = jedis.auth(password);// password
  14. if (!authString.equals("OK")) {
  15. System.err.println("AUTH Failed: " + authString);
  16. jedis.close();
  17. return;
  18. }
  19. String key = "KVStore-Test1";
  20. jedis.del(key);//Initialization
  21. // -------- Method 1
  22. Pipeline p1 = jedis.pipelined();
  23. System.out.println("-----方法1-----");
  24. for (int i = 0; i < 5; i++) {
  25. p1.incr(key);
  26. System.out.println("Pipeline发送请求");
  27. }
  28. // After sending all requests, the client starts receiving responses
  29. System.out.println("发送请求完成,开始接收响应");
  30. List<Object> responses = p1.syncAndReturnAll();
  31. if (responses == null || responses.isEmpty()) {
  32. jedis.close();
  33. throw new RuntimeException("Pipeline error: 没有接收到响应");
  34. }
  35. for (Object resp : responses) {
  36. System.out.println("Pipeline接收响应Response: " + resp.toString());
  37. }
  38. System.out.println();
  39. //-------- Method 2
  40. System.out.println("-----方法2-----");
  41. jedis.del(key);//Initialization
  42. Pipeline p2 = jedis.pipelined();
  43. //Declare the responses first
  44. Response<Long> r1 = p2.incr(key);
  45. System.out.println("Pipeline发送请求");
  46. Response<Long> r2 = p2.incr(key);
  47. System.out.println("Pipeline发送请求");
  48. Response<Long> r3 = p2.incr(key);
  49. System.out.println("Pipeline发送请求");
  50. Response<Long> r4 = p2.incr(key);
  51. System.out.println("Pipeline发送请求");
  52. Response<Long> r5 = p2.incr(key);
  53. System.out.println("Pipeline发送请求");
  54. try{
  55. r1.get(); //Errors are generated because the client has not yet started receiving responses
  56. }catch(Exception e){
  57. System.out.println(" <<< Pipeline error:还未开始接收响应 >>> ");
  58. }
  59. // After sending all requests, the client starts receiving responses
  60. System.out.println("发送请求完成,开始接收响应");
  61. p2.sync();
  62. System.out.println("Pipeline接收响应Response: " + r1.get());
  63. System.out.println("Pipeline接收响应Response: " + r2.get());
  64. System.out.println("Pipeline接收响应Response: " + r3.get());
  65. System.out.println("Pipeline接收响应Response: " + r4.get());
  66. System.out.println("Pipeline接收响应Response: " + r5.get());
  67. jedis.close();
  68. }
  69. }

Output 2

After you access the ApsaraDB for Redis instance with the correct address and password and run the above Java code, the following output is displayed:

  1. -----Method 1-----
  2. Pipeline sends a request
  3. Pipeline sends a request
  4. Pipeline sends a request
  5. Pipeline sends a request
  6. Pipeline sends a request
  7. After sending all requests, the client starts receiving responses
  8. Pipeline receives response 1
  9. Pipeline receives response 2
  10. Pipeline receives response 3
  11. Pipeline receives response 4
  12. Pipeline receives response 5
  13. -----Method 2-----
  14. Pipeline sends a request
  15. Pipeline sends a request
  16. Pipeline sends a request
  17. Pipeline sends a request
  18. Pipeline sends a request
  19. <<< Pipeline error: The client has not yet started receiving responses >>>
  20. After sending all requests, the client starts receiving responses
  21. Pipeline receives response 1
  22. Pipeline receives response 2
  23. Pipeline receives response 3
  24. Pipeline receives response 4
  25. Pipeline receives response 5
Thank you! We've received your feedback.