Scenario introduction

Similar to Redis, ApsaraDB for Redis provides publishing (pub) and subscription (sub) features. ApsaraDB for Redis allows multiple clients to subscribe to messages published by a client.

It must be noted that messages published using ApsaraDB for Redis are "non-persistent". This means the message publisher is only responsible for publishing a message and does not save previously sent messages, whether or not these messages were received. Thus, messages are "lost once published". Message subscribers can only receive messages that are subscribed. They will not receive the earlier messages in the channel.

In addition, the message publisher (publish client) does not necessarily connect to a server exclusively. While publishing messages, you can perform other operations (for example, the List operations) from the same client at the same time. However, the message subscriber (subscribe client) needs to connect to a server exclusively. That is, during the subscription period, the client may not perform any other operations. Rather, the operations are blocked while the client is waiting for messages in the channel. Therefore, message subscribers must use a dedicated server connection or thread (see the following example).

Sample code

For the message publisher (publish client)

package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
public class KVStorePubClient {
    private Jedis jedis;
    public KVStorePubClient(String host,int port, String password){
        jedis = new Jedis(host,port);
        //KVStore instance password
        String authString = jedis.auth(password);
        if (! authString.equals("OK"))
        {
            System.err.println("AUTH Failed: " + authString);
            return;
        }
    }
    public void pub(String channel,String message){
        System. out. println ("> Publish> channel: "+ channel +"> message sent: "+ message );
        jedis.publish(channel, message);
    }
    public void close(String channel){
         System.out.println(" >>> PUBLISH End > Channel:"+channel+" > Message:quit");
         //The message publisher stops sending by sending a “quit” message
        jedis.publish(channel, "quit");
    }
}

For the message subscriber (subscribe client)

package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class KVStoreSubClient extends Thread{
    private Jedis jedis;
    private String channel;
    private JedisPubSub listener;
    public KVStoreSubClient(String host,int port, String password){
        jedis = new Jedis(host,port);
                //ApsaraDB for Redis instance password
                String authString = jedis.auth(password); //password
                if (! authString.equals("OK"))
                {
                    System.err.println("AUTH Failed: " + authString);
                    return;
                }
    }
    public void setChannelAndListener(JedisPubSub listener,String channel){
        this.listener=listener;
        this.channel=channel;
    }
    private void subscribe(){
        if(listener==null || channel==null){
            System.err.println("Error:SubClient> listener or channel is null");
        }
        System.out.println("  >>> SUBSCRIBE > Channel:"+channel);
        System.out.println();
         //When the recipient is listening for subscribed messages, the process is blocked until the quit message is received (passively) or the subscription is canceled actively
        jedis.subscribe(listener, channel);
    }
    public void unsubscribe(String channel){
        System.out.println(" >>> UNSUBSCRIBE > Channel:"+channel);
        System.out.println();
        listener.unsubscribe(channel);
    }
    @Override
    public void run() {
        try{
            System.out.println();
            System.out.println("---------Subscription begins-------");
            subscribe();
            System.out.println("----------Subscription ends-------");
            System.out.println();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

For the message listener

package message.kvstore.aliyun.com;
import redis.clients.jedis.JedisPubSub;
public class KVStoreMessageListener extends JedisPubSub{
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("  <<< SUBSCRIBE< Channel:" + channel + " >Message received:" + message );
        System.out.println();
        //When a quit message is received, the subscription is canceled (passively)
        if(message.equalsIgnoreCase("quit")){
            this.unsubscribe(channel);
        }
    }
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        // TODO Auto-generated method stub
    }
}

Sample main process

package message.kvstore.aliyun.com;
import java.util.UUID;
import redis.clients.jedis.JedisPubSub;
public class KVStorePubSubTest {
    //The connection information of ApsaraDB for Redis. This information can be obtained from the console
    static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
    static final int port = 6379;
    static final String password="password";//password
    public static void main(String[] args) throws Exception{
            KVStorePubClient pubClient = new KVStorePubClient(host, port,password);
            final String channel = "KVStore Channel-A";
            //The message sender starts sending messages, but there are no subscribers, so the messages will not be received
            pubClient.pub(channel, "Alibaba Cloud message 1: (No subscribers. This message will not be received)");
            // Message recipient
            KVStoreSubClient subClient = new KVStoreSubClient(host, port,password);
            JedisPubSub listener = new KVStoreMessageListener();
            subClient.setChannelAndListener(listener, channel);
            // Message recipient starts subscribing
            subClient.start();
            //The message sender continues sending messages
            for (int i = 0; i < 5; i++) {
                String message=UUID.randomUUID().toString();
                pubClient.pub(channel, message);
                Thread.sleep(1000);
            }
            // The message recipient cancels the subscription
            subClient.unsubscribe(channel);
            Thread.sleep(1000);
            pubClient.pub(channel, "Alibaba Cloud message 2:(Subscription canceled. This message will not be received)");
             //The message publisher stops sending by sending a “quit” message
            //When other message recipients, if any, receive "quit" in listener.onMessage(), the "unsubscribe" operation is performed.
            pubClient.close(channel);
        }
    }

Output

After you access the ApsaraDB for Redis instance with the correct address and password and run the preceding Java code, the following output is displayed.

  >>> PUBLISH > Channel:KVStore Channel-A > Sends the message Aliyun Message 1: (No subscribers. This message will not be received)
----------Subscription starts-------
  >>> SUBSCRIBE> Channel: KVStore Channel-A
  >>> PUBLISH> Channel: KVStore Channel-A> sends message: 0f9c2cee-77c7-4498-89a0-1dc5a2f65889
  <<< SUBSCRIBE< Channel:KVStore Channel-A >receives message: 0f9c2cee-77c7-4498-89a0-1dc5a2f65889
  >>> PUBLISH> Channel: KVStore Channel-A> sends message: ed5924a9-016b-469b-8203-7db63d06f812
  <<< SUBSCRIBE< Channel:KVStore Channel-A >receives message: ed5924a9-016b-469b-8203-7db63d06f812
  >>> PUBLISH> Channel: KVStore Channel-A> sends message: f1f84e0f-8f35-4362-9567-25716b1531cd
  <<< SUBSCRIBE< Channel:KVStore Channel-A >receives message: f1f84e0f-8f35-4362-9567-25716b1531cd 
  >>> PUBLISH> Channel: KVStore Channel-A> sends message: 746bde54-af8f-44d7-8a49-37d1a245d21b
  <<< SUBSCRIBE< Channel:KVStore Channel-A >receives message: 746bde54-af8f-44d7-8a49-37d1a245d21b
  >>> PUBLISH> Channel: KVStore Channel-A> sends message: 8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
  <<< SUBSCRIBE< Channel:KVStore Channel-A >receives message: 8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
  >>> UNSUBSCRIBE> Channel: KVStore Channel-A
----------Subscription ends-------
  >>> PUBLISH > Channel:KVStore Channel-A > sends the message Aliyun Message 2: (The subscription has been canceled, so the message will not be received)
  >>> PUBLISH ends> Channel:KVStore Channel-A > Message:quit

The preceding example demonstrates a situation with one publisher and one subscriber. There can be multiple publishers, subscribers, and even multiple message channels. In such scenarios, you are required to slightly change the code to fit the scenario.