Publish and subscribe messages

Last Updated: Sep 05, 2017

Scenario introduction

ApsaraDB for Redis provides publishing (pub) and subscription (sub) functions, just like Redis. This allows multiple clients to subscribe to messages published by a client.

It must be noted that messages published using ApsaraDB for Redis are “non-persistent”. That means the message publisher is only responsible for publishing a message and does not save previously sent messages, whether or not they were received by anyone. Thus, messages are “lost once published.” Message subscribers receive messages published after their subscription. They will not receive the earlier messages in the channel.

In addition, a message publisher (publish client) does not connect to a server exclusively. While publishing messages, you can perform more operations (for example, List) from the same client at the same time. However, a message subscriber (subscribe client) connects to a server exclusively. That is, during the subscription, the client may not perform any other operations. Rather, the operations are blocked while the client is waiting for messages in the channel. Thus, message subscribers must use a dedicated server connection or thread (see the example below).

Sample code

For the message publisher (publish client)

  1. package message.kvstore.aliyun.com;
  2. import redis.clients.jedis.Jedis;
  3. public class KVStorePubClient {
  4. private Jedis jedis;//
  5. public KVStorePubClient(String host,int port, String password){
  6. jedis = new Jedis(host,port);
  7. //KVStore instance password
  8. String authString = jedis.auth(password);//password
  9. if (!authString.equals("OK"))
  10. {
  11. System.err.println("AUTH Failed: " + authString);
  12. return;
  13. }
  14. }
  15. public void pub(String channel,String message){
  16. System.out.println(" >>> PUBLISH > Channel:"+channel+" > Send Message:"+message);
  17. jedis.publish(channel, message);
  18. }
  19. public void close(String channel){
  20. System.out.println(" >>> PUBLISH End > Channel:"+channel+" > Message:quit");
  21. //The message publisher stops sending by sending a “quit” message
  22. jedis.publish(channel, "quit");
  23. }
  24. }

For the message subscriber (subscribe client)

  1. package message.kvstore.aliyun.com;
  2. import redis.clients.jedis.Jedis;
  3. import redis.clients.jedis.JedisPubSub;
  4. public class KVStoreSubClient extends Thread{
  5. private Jedis jedis;
  6. private String channel;
  7. private JedisPubSub listener;
  8. public KVStoreSubClient(String host,int port, String password){
  9. jedis = new Jedis(host,port);
  10. //ApsaraDB for Redis instance password
  11. String authString = jedis.auth(password);//password
  12. if (!authString.equals("OK"))
  13. {
  14. System.err.println("AUTH Failed: " + authString);
  15. return;
  16. }
  17. }
  18. public void setChannelAndListener(JedisPubSub listener,String channel){
  19. this.listener=listener;
  20. this.channel=channel;
  21. }
  22. private void subscribe(){
  23. if(listener==null || channel==null){
  24. System.err.println("Error:SubClient> listener or channel is null");
  25. }
  26. System.out.println(" >>> SUBSCRIBE > Channel:"+channel);
  27. System.out.println();
  28. //When the recipient is listening for subscribed messages, the process is blocked until the quit message is received (passively) or the subscription is cancelled actively
  29. jedis.subscribe(listener, channel);
  30. }
  31. public void unsubscribe(String channel){
  32. System.out.println(" >>> UNSUBSCRIBE > Channel:"+channel);
  33. System.out.println();
  34. listener.unsubscribe(channel);
  35. }
  36. @Override
  37. public void run() {
  38. try{
  39. System.out.println();
  40. System.out.println("----------SUBSCRIBE Begin-------");
  41. subscribe();
  42. System.out.println("----------SUBSCRIBE End-------");
  43. System.out.println();
  44. }catch(Exception e){
  45. e.printStackTrace();
  46. }
  47. }
  48. }

For the message listener

  1. package message.kvstore.aliyun.com;
  2. import redis.clients.jedis.JedisPubSub;
  3. public class KVStoreMessageListener extends JedisPubSub{
  4. @Override
  5. public void onMessage(String channel, String message) {
  6. System.out.println(" <<< SUBSCRIBE< Channel:" + channel + " >Recceived Message:" + message );
  7. System.out.println();
  8. //When a quit message is received, the subscription is cancelled (passively)
  9. if(message.equalsIgnoreCase("quit")){
  10. this.unsubscribe(channel);
  11. }
  12. }
  13. @Override
  14. public void onPMessage(String pattern, String channel, String message) {
  15. // TODO Auto-generated method stub
  16. }
  17. @Override
  18. public void onSubscribe(String channel, int subscribedChannels) {
  19. // TODO Auto-generated method stub
  20. }
  21. @Override
  22. public void onUnsubscribe(String channel, int subscribedChannels) {
  23. // TODO Auto-generated method stub
  24. }
  25. @Override
  26. public void onPUnsubscribe(String pattern, int subscribedChannels) {
  27. // TODO Auto-generated method stub
  28. }
  29. @Override
  30. public void onPSubscribe(String pattern, int subscribedChannels) {
  31. // TODO Auto-generated method stub
  32. }
  33. }

Sample main process

  1. package message.kvstore.aliyun.com;
  2. import java.util.UUID;
  3. import redis.clients.jedis.JedisPubSub;
  4. public class KVStorePubSubTest {
  5. //Information for connecting ApsaraDB for Redis, which can be obtained from the console
  6. static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
  7. static final int port = 6379;
  8. static final String password="password";//password
  9. public static void main(String[] args) throws Exception{
  10. KVStorePubClient pubClient = new KVStorePubClient(host, port,password);
  11. final String channel = "KVStore channel-A";
  12. //The message sender starts sending messages, but there are no subscribers, the messages will not be received
  13. pubClient.pub(channel, "Alibaba Cloud message 1:(no subscribers,this message will not be received)");
  14. //Message recipient
  15. KVStoreSubClient subClient = new KVStoreSubClient(host, port,password);
  16. JedisPubSub listener = new KVStoreMessageListener();
  17. subClient.setChannelAndListener(listener, channel);
  18. //The message recipient starts subscribing
  19. subClient.start();
  20. //The message sender continues sending messages
  21. for (int i = 0; i < 5; i++) {
  22. String message=UUID.randomUUID().toString();
  23. pubClient.pub(channel, message);
  24. Thread.sleep(1000);
  25. }
  26. //The message recipient cancels the subscription
  27. subClient.unsubscribe(channel);
  28. Thread.sleep(1000);
  29. pubClient.pub(channel, "Alibaba Cloud message 2:(subscription cancaled, this message will not be received)");
  30. //The message publisher stops sending by sending a “quit” message
  31. //When other message recipients, if any, receive "quit" in listener.onMessage(), the "unsubscribe" operation is performed.
  32. pubClient.close(channel);
  33. }
  34. }

Output

After you access the ApsaraDB for Redis instance with the correct address and password and run the above Java code, the following output is displayed.

  1. >>> PUBLISH > Channel:KVStore Channel-A > Sends the message Alibaba Cloud Message 1: (there are no subscribers yet, so no one would receive the message)
  2. ----------SUBSCRIBE starts-------
  3. >>> SUBSCRIBE > Channel:KVStore Channel-A
  4. >>> PUBLISH > Channel:KVStore Channel-A > Sends the message 0f9c2cee-77c7-4498-89a0-1dc5a2f65889
  5. <<< SUBSCRIBE < Channel:KVStore Channel-A > Receives the message 0f9c2cee-77c7-4498-89a0-1dc5a2f65889
  6. >>> PUBLISH > Channel:KVStore Channel-A > Sends the message ed5924a9-016b-469b-8203-7db63d06f812
  7. <<< SUBSCRIBE < Channel:KVStore Channel-A > Receives the message ed5924a9-016b-469b-8203-7db63d06f812
  8. >>> PUBLISH > Channel:KVStore Channel-A > Sends the message f1f84e0f-8f35-4362-9567-25716b1531cd
  9. <<< SUBSCRIBE < Channel:KVStore Channel-A > Receives the message f1f84e0f-8f35-4362-9567-25716b1531cd
  10. >>> PUBLISH > Channel:KVStore Channel-A > Sends the message 746bde54-af8f-44d7-8a49-37d1a245d21b
  11. <<< SUBSCRIBE < Channel:KVStore Channel-A > Receives the message 746bde54-af8f-44d7-8a49-37d1a245d21b
  12. >>> PUBLISH > Channel:KVStore Channel-A > Sends the message 8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
  13. <<< SUBSCRIBE < Channel:KVStore Channel-A > Receives the message 8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
  14. >>> UNSUBSCRIBE > Channel:KVStore Channel-A
  15. ----------SUBSCRIBE ends-------
  16. >>> PUBLISH > Channel:KVStore Channel-A > Sends the message Alibaba Cloud Message 2: (the subscription has been cancelled, so the message is not received)
  17. >>> PUBLISH ends > Channel:KVStore Channel-A > Message: quit

The example above demonstrates a situation with one publisher and one subscriber. In actual situations, there may be many publishers and subscribers, as well as many message channels. In this case, you have to make slightly changes to the code above.

Thank you! We've received your feedback.