Pipeline

Last Updated: Dec 07, 2017

Scenario introduction

ApsaraDB for Redis provides a pipeline mechanism similar to that of Redis. A client interacts with a server through one-way pipelines, one for sending requests and the other for receiving responses. You can send operation requests successively from the client to the server; however, the client does not receive the response to each request until it sends a quit message to the server.
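To make the "send first, read later" pattern concrete, the following is a minimal sketch of what a pipelining client writes to its sending channel. It assumes the standard Redis serialization protocol (RESP), in which each command is an array of bulk strings. The class name `RespPipelineSketch` and its methods are hypothetical helpers for illustration only; they are not part of Jedis or ApsaraDB for Redis, and real clients handle this encoding internally.

```java
import java.nio.charset.StandardCharsets;

public class RespPipelineSketch {
    // Encodes one command as a RESP array of bulk strings,
    // for example INCR k becomes *2\r\n$4\r\nINCR\r\n$1\r\nk\r\n
    static String encodeCommand(String... parts) {
        StringBuilder sb = new StringBuilder();
        sb.append('*').append(parts.length).append("\r\n");
        for (String part : parts) {
            int byteLength = part.getBytes(StandardCharsets.UTF_8).length;
            sb.append('$').append(byteLength).append("\r\n");
            sb.append(part).append("\r\n");
        }
        return sb.toString();
    }

    // Concatenates `count` INCR commands into one buffer. A pipelining
    // client writes a buffer like this before it reads any reply.
    static String encodePipeline(String key, int count) {
        StringBuilder batch = new StringBuilder();
        for (int i = 0; i < count; i++) {
            batch.append(encodeCommand("INCR", key));
        }
        return batch.toString();
    }

    public static void main(String[] args) {
        // Print the batch with visible line terminators
        String batch = encodePipeline("counter", 3);
        System.out.print(batch.replace("\r\n", "\\r\\n\n"));
    }
}
```

The point of the sketch is that nothing in the request stream waits on a reply: the commands are simply written back to back, and the replies are read afterwards in the same order.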

Pipelines are useful, for example, when several commands need to be submitted to the server quickly but their responses and results are not required right away. In this case, a pipeline can be used as a batch processing tool to improve performance. The gain comes mainly from the client no longer waiting for a network round trip after every command, which reduces the overhead on the TCP connection.

However, a client that uses a pipeline holds its connection to the server exclusively, and non-pipeline operations on that connection are blocked until the pipeline is closed. If you must perform other operations at the same time, you can establish a dedicated connection for pipeline operations to separate them from conventional operations.

Sample code 1

Performance comparison

  package pipeline.kvstore.aliyun.com;

  import java.util.Date;
  import redis.clients.jedis.Jedis;
  import redis.clients.jedis.Pipeline;

  public class RedisPipelinePerformanceTest {
      static final String host = "xxxxxx.m.cnhza.kvstore.aliyuncs.com";
      static final int port = 6379;
      static final String password = "password";

      public static void main(String[] args) {
          Jedis jedis = new Jedis(host, port);
          // ApsaraDB for Redis instance password
          String authString = jedis.auth(password);
          if (!authString.equals("OK")) {
              System.err.println("AUTH Failed: " + authString);
              jedis.close();
              return;
          }
          // Execute several commands successively
          final int COUNT = 5000;
          String key = "KVStore-Tanghan";

          // 1 --- Without using pipeline operations ---
          jedis.del(key); // Initializes the key
          Date ts1 = new Date();
          for (int i = 0; i < COUNT; i++) {
              // Sends a request and receives the response
              jedis.incr(key);
          }
          Date ts2 = new Date();
          System.out.println("Without Pipeline > value is:" + jedis.get(key) + " > Operating time:" + (ts2.getTime() - ts1.getTime()) + "ms");

          // 2 --- Using pipeline operations ---
          jedis.del(key); // Initializes the key
          Pipeline p1 = jedis.pipelined();
          Date ts3 = new Date();
          for (int i = 0; i < COUNT; i++) {
              // Sends a request
              p1.incr(key);
          }
          // Receives the responses
          p1.sync();
          Date ts4 = new Date();
          System.out.println("Using Pipeline > value is:" + jedis.get(key) + " > Operating time:" + (ts4.getTime() - ts3.getTime()) + "ms");
          jedis.close();
      }
  }

Output 1

After you access the ApsaraDB for Redis instance with the correct address and password and run the preceding Java code, the following output is displayed. The output shows that the performance is enhanced with pipelines.

  Without Pipeline > value is:5000 > Operating time:5844ms
  Using Pipeline > value is:5000 > Operating time:78ms
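As a rough worked example, the two timings in Output 1 can be turned into a per-command cost and a speedup factor. The sketch below only does arithmetic on the figures reported above (5000 commands, 5844 ms without a pipeline, 78 ms with one). The class and method names are hypothetical, and the results describe that one test run rather than a guaranteed improvement.

```java
public class PipelineSpeedup {
    // Average time spent per command, in milliseconds
    static double averageMillisPerCommand(long totalMillis, int commands) {
        return (double) totalMillis / commands;
    }

    // How many times faster the pipelined run was
    static double speedup(long withoutMillis, long withMillis) {
        return (double) withoutMillis / withMillis;
    }

    public static void main(String[] args) {
        final int count = 5000;           // commands per run, from the sample code
        final long withoutPipeline = 5844; // ms, from Output 1
        final long withPipeline = 78;      // ms, from Output 1

        System.out.println("Without pipeline, ms per command: "
                + averageMillisPerCommand(withoutPipeline, count));
        System.out.println("With pipeline, ms per command: "
                + averageMillisPerCommand(withPipeline, count));
        System.out.println("Speedup factor: "
                + speedup(withoutPipeline, withPipeline));
    }
}
```

For this run, the average cost drops from about 1.17 ms per command to about 0.016 ms per command, which is roughly a 75-fold improvement. The non-pipelined figure is close to what one network round trip per command would cost, which fits the explanation given in the scenario introduction.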

Sample code 2

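Jedis pipelines let you process responses in two ways: call syncAndReturnAll() to receive all responses as a list, or keep the Response object returned by each command and read its value after calling sync(). Both are shown in the following sample code: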

  package pipeline.kvstore.aliyun.com;

  import java.util.List;
  import redis.clients.jedis.Jedis;
  import redis.clients.jedis.Pipeline;
  import redis.clients.jedis.Response;

  public class PipelineClientTest {
      static final String host = "xxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
      static final int port = 6379;
      static final String password = "password";

      public static void main(String[] args) {
          Jedis jedis = new Jedis(host, port);
          // ApsaraDB for Redis instance password
          String authString = jedis.auth(password);
          if (!authString.equals("OK")) {
              System.err.println("AUTH Failed: " + authString);
              jedis.close();
              return;
          }
          String key = "KVStore-Test1";
          jedis.del(key); // Initialization

          // -------- Method 1
          Pipeline p1 = jedis.pipelined();
          System.out.println("-----Method 1-----");
          for (int i = 0; i < 5; i++) {
              p1.incr(key);
              System.out.println("Pipeline send requests");
          }
          // After sending all requests, the client starts receiving responses
          System.out.println("Sending requests completed,start to receive response");
          List<Object> responses = p1.syncAndReturnAll();
          if (responses == null || responses.isEmpty()) {
              jedis.close();
              throw new RuntimeException("Pipeline error: no response received");
          }
          for (Object resp : responses) {
              System.out.println("Pipeline received response: " + resp.toString());
          }
          System.out.println();

          // -------- Method 2
          System.out.println("-----Method 2-----");
          jedis.del(key); // Initialization
          Pipeline p2 = jedis.pipelined();
          // Declare the responses first
          Response<Long> r1 = p2.incr(key);
          System.out.println("Pipeline sending requests");
          Response<Long> r2 = p2.incr(key);
          System.out.println("Pipeline sending requests");
          Response<Long> r3 = p2.incr(key);
          System.out.println("Pipeline sending requests");
          Response<Long> r4 = p2.incr(key);
          System.out.println("Pipeline sending requests");
          Response<Long> r5 = p2.incr(key);
          System.out.println("Pipeline sending requests");
          try {
              r1.get(); // Errors are generated because the client has not yet started receiving responses
          } catch (Exception e) {
              System.out.println(" <<< Pipeline error: not start to receive response yet >>> ");
          }
          // After sending all requests, the client starts receiving responses
          System.out.println("Sending requests completed, start to receive response");
          p2.sync();
          System.out.println("Pipeline receiving response: " + r1.get());
          System.out.println("Pipeline receiving response: " + r2.get());
          System.out.println("Pipeline receiving response: " + r3.get());
          System.out.println("Pipeline receiving response: " + r4.get());
          System.out.println("Pipeline receiving response: " + r5.get());
          jedis.close();
      }
  }

Output 2

After you access the ApsaraDB for Redis instance with the correct address and password and run the preceding Java code, the following output is displayed:

  -----Method 1-----
  Pipeline send requests
  Pipeline send requests
  Pipeline send requests
  Pipeline send requests
  Pipeline send requests
  Sending requests completed,start to receive response
  Pipeline received response: 1
  Pipeline received response: 2
  Pipeline received response: 3
  Pipeline received response: 4
  Pipeline received response: 5

  -----Method 2-----
  Pipeline sending requests
  Pipeline sending requests
  Pipeline sending requests
  Pipeline sending requests
  Pipeline sending requests
   <<< Pipeline error: not start to receive response yet >>> 
  Sending requests completed, start to receive response
  Pipeline receiving response: 1
  Pipeline receiving response: 2
  Pipeline receiving response: 3
  Pipeline receiving response: 4
  Pipeline receiving response: 5
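
The error in Method 2 occurs because each Response object is only a placeholder until the pipeline has been synchronized. The following sketch models that behavior in memory, without any Redis connection. The `Deferred` and `FakePipeline` classes are hypothetical stand-ins written for this illustration; they are not the Jedis implementation, and the counter that a real server would maintain is simulated locally.

```java
import java.util.ArrayList;
import java.util.List;

public class DeferredResponseSketch {
    // Hypothetical placeholder for a reply that has not arrived yet
    static final class Deferred<T> {
        private T value;
        private boolean ready;

        void set(T newValue) {
            value = newValue;
            ready = true;
        }

        T get() {
            if (!ready) {
                throw new IllegalStateException("response not received yet");
            }
            return value;
        }
    }

    // Hypothetical in-memory pipeline: incr() only queues a request,
    // and sync() fills in every queued placeholder in request order
    static final class FakePipeline {
        private final List<Deferred<Long>> pending = new ArrayList<>();
        private long counter; // simulates the value stored on the server

        Deferred<Long> incr() {
            Deferred<Long> placeholder = new Deferred<>();
            pending.add(placeholder);
            return placeholder;
        }

        void sync() {
            for (Deferred<Long> placeholder : pending) {
                placeholder.set(++counter);
            }
            pending.clear();
        }
    }

    public static void main(String[] args) {
        FakePipeline pipeline = new FakePipeline();
        Deferred<Long> r1 = pipeline.incr();
        Deferred<Long> r2 = pipeline.incr();
        try {
            r1.get(); // fails: nothing has been received yet
        } catch (IllegalStateException e) {
            System.out.println("Before sync: " + e.getMessage());
        }
        pipeline.sync();
        System.out.println("After sync: " + r1.get() + ", " + r2.get());
    }
}
```

Running the sketch prints "Before sync: response not received yet" followed by "After sync: 1, 2", which mirrors the order of events in Method 2: requests are queued, an early read fails, and the values become available only after synchronization.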