Like open source Redis, ApsaraDB for Redis provides publish (pub) and subscribe (sub) features. Multiple clients can subscribe to the messages that a client publishes to a channel.
Scenario
Messages published through ApsaraDB for Redis are non-persistent. The publisher is only responsible for publishing a message, and previously sent messages are not saved, regardless of whether they were received. A published message is therefore gone once it has been delivered to the clients subscribed at that moment. Subscribers receive only the messages that are published after they have subscribed to the channel. They do not receive earlier messages in the channel.
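The non-persistent behavior described above can be illustrated with a minimal in-memory sketch. It does not use Jedis or connect to an instance. The class names FireAndForgetBroker and FireAndForgetDemo are hypothetical and exist only for this illustration. The return value of publish() is modeled on the integer reply of the Redis PUBLISH command, which reports how many clients received the message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a pub/sub server (not part of Jedis).
class FireAndForgetBroker {
    // Current subscribers per channel. Published messages are never stored.
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    void subscribe(String channel, Consumer<String> handler) {
        subscribers.computeIfAbsent(channel, k -> new ArrayList<>()).add(handler);
    }

    // Delivers the message to whoever is subscribed right now and returns that count.
    int publish(String channel, String message) {
        List<Consumer<String>> handlers =
                subscribers.getOrDefault(channel, new ArrayList<>());
        for (Consumer<String> handler : handlers) {
            handler.accept(message);
        }
        return handlers.size();
    }
}

class FireAndForgetDemo {
    public static void main(String[] args) {
        FireAndForgetBroker broker = new FireAndForgetBroker();
        List<String> received = new ArrayList<>();

        int first = broker.publish("news", "early message"); // nobody is subscribed yet
        broker.subscribe("news", received::add);
        int second = broker.publish("news", "late message"); // one subscriber now

        // Prints: 0 1 [late message]
        System.out.println(first + " " + second + " " + received);
    }
}
```

The early message is never delivered because nothing stores it for later subscribers, which is the same reason the first message in the sample code of this topic is not received.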
In addition, the message sender (publisher client) does not need a dedicated connection to the server. While publishing messages, the same client can also perform other operations, such as List operations. The message receiver (subscriber client), however, needs a dedicated connection. During the subscription period, the client cannot perform any other operations on that connection, because the subscribe call blocks while the client waits for messages in the channel. Therefore, message subscribers must use a dedicated connection or a separate thread to receive messages (see the following example).
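The reason for a separate thread can be sketched without a server. In the following illustration, a BlockingQueue stands in for the subscribed connection: take() blocks in the same way a subscribe call does. The queue models only the blocking behavior, not the non-persistent delivery. The class BlockingSubscriberSketch and its methods are hypothetical names, not part of Jedis.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a subscriber whose receive loop blocks.
class BlockingSubscriberSketch {
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
    private final List<String> received =
            Collections.synchronizedList(new ArrayList<>());

    void deliver(String message) {
        inbox.add(message);
    }

    // Blocks the calling thread until a "quit" message arrives.
    void listenUntilQuit() throws InterruptedException {
        while (true) {
            String message = inbox.take();
            if ("quit".equalsIgnoreCase(message)) {
                return;
            }
            received.add(message);
        }
    }

    // Runs the blocking loop on its own thread so that the caller stays free to publish.
    static List<String> runDemo(String... messages) {
        BlockingSubscriberSketch sub = new BlockingSubscriberSketch();
        Thread listenerThread = new Thread(() -> {
            try {
                sub.listenUntilQuit();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        listenerThread.start();
        for (String message : messages) {
            sub.deliver(message); // the calling thread is not blocked
        }
        sub.deliver("quit");
        try {
            listenerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(sub.received);
    }

    public static void main(String[] args) {
        // Prints: [message 1, message 2]
        System.out.println(runDemo("message 1", "message 2"));
    }
}
```

If listenUntilQuit() were called directly on the main thread, no later statement would run until the quit message arrived, which is why the subscriber in the sample code extends Thread.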
Sample code
For the message sender (publisher client)
package message.kvstore.aliyun.com;

import redis.clients.jedis.Jedis;

public class KVStorePubClient {
    private Jedis jedis;

    public KVStorePubClient(String host, int port, String password) {
        jedis = new Jedis(host, port);
        // The password of the ApsaraDB for Redis instance.
        String authString = jedis.auth(password);
        if (!"OK".equals(authString)) {
            System.err.println("AUTH Failed: " + authString);
            return;
        }
    }

    public void pub(String channel, String message) {
        System.out.println(" >>> PUBLISH > Channel:" + channel + " > message sent: " + message);
        jedis.publish(channel, message);
    }

    public void close(String channel) {
        System.out.println(" >>> PUBLISH ends > Channel: " + channel + " > Message:quit");
        // The message publisher stops sending by sending a quit message.
        jedis.publish(channel, "quit");
    }
}
For the message receiver (subscriber client)
package message.kvstore.aliyun.com;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class KVStoreSubClient extends Thread {
    private Jedis jedis;
    private String channel;
    private JedisPubSub listener;

    public KVStoreSubClient(String host, int port, String password) {
        jedis = new Jedis(host, port);
        // The password of the ApsaraDB for Redis instance.
        String authString = jedis.auth(password);
        if (!"OK".equals(authString)) {
            System.err.println("AUTH Failed: " + authString);
            return;
        }
    }

    public void setChannelAndListener(JedisPubSub listener, String channel) {
        this.listener = listener;
        this.channel = channel;
    }

    private void subscribe() {
        if (listener == null || channel == null) {
            System.err.println("Error:SubClient> listener or channel is null");
            // Do not attempt to subscribe without a listener and a channel.
            return;
        }
        System.out.println(" >>> SUBSCRIBE > Channel:" + channel);
        System.out.println();
        // While the receiver is listening for subscribed messages, this call blocks until the quit message is received (passive) or the subscription is actively canceled.
        jedis.subscribe(listener, channel);
    }

    public void unsubscribe(String channel) {
        System.out.println(" >>> UNSUBSCRIBE > Channel:" + channel);
        System.out.println();
        listener.unsubscribe(channel);
    }

    @Override
    public void run() {
        try {
            System.out.println();
            System.out.println("---------SUBSCRIBE begins-------");
            subscribe();
            System.out.println("----------SUBSCRIBE ends-------");
            System.out.println();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
For the message listener
package message.kvstore.aliyun.com;

import redis.clients.jedis.JedisPubSub;

public class KVStoreMessageListener extends JedisPubSub {
    @Override
    public void onMessage(String channel, String message) {
        System.out.println(" <<< SUBSCRIBE< Channel: " + channel + ">Message received: " + message);
        System.out.println();
        // When a quit message is received, the subscription is canceled (passive).
        if (message.equalsIgnoreCase("quit")) {
            this.unsubscribe(channel);
        }
    }

    @Override
    public void onPMessage(String pattern, String channel, String message) {
        // Not used in this example.
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        // Not used in this example.
    }

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        // Not used in this example.
    }

    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        // Not used in this example.
    }

    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        // Not used in this example.
    }
}
Sample main code block
package message.kvstore.aliyun.com;

import java.util.UUID;
import redis.clients.jedis.JedisPubSub;

public class KVStorePubSubTest {
    // The connection information of ApsaraDB for Redis. This information can be obtained from the console.
    static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
    static final int port = 6379;
    static final String password = "password";

    public static void main(String[] args) throws Exception {
        KVStorePubClient pubClient = new KVStorePubClient(host, port, password);
        final String channel = "KVStore Channel-A";
        // The message sender starts sending messages, but no clients have subscribed to the channel, so the message is not received.
        pubClient.pub(channel, "Alibaba Cloud message 1: (No subscribers. This message will not be received)");
        // The message receiver.
        KVStoreSubClient subClient = new KVStoreSubClient(host, port, password);
        JedisPubSub listener = new KVStoreMessageListener();
        subClient.setChannelAndListener(listener, channel);
        // The message receiver subscribes on its own thread.
        subClient.start();
        // Wait briefly so that the subscription is in place before publishing. Messages published earlier would be lost.
        Thread.sleep(1000);
        // The message sender continues sending messages.
        for (int i = 0; i < 5; i++) {
            String message = UUID.randomUUID().toString();
            pubClient.pub(channel, message);
            Thread.sleep(1000);
        }
        // The message receiver unsubscribes.
        subClient.unsubscribe(channel);
        Thread.sleep(1000);
        pubClient.pub(channel, "Alibaba Cloud message 2:(Subscription canceled. This message will not be received)");
        // The message publisher stops sending by sending a quit message.
        // Any other receivers that get quit in listener.onMessage() perform the UNSUBSCRIBE operation.
        pubClient.close(channel);
    }
}
Returned result
After you access the ApsaraDB for Redis instance with the correct address and password and run the preceding Java code, output similar to the following is displayed (blank lines are omitted, and the UUIDs differ on each run):
>>> PUBLISH > Channel:KVStore Channel-A > message sent: Alibaba Cloud message 1: (No subscribers. This message will not be received)
---------SUBSCRIBE begins-------
>>> SUBSCRIBE > Channel:KVStore Channel-A
>>> PUBLISH > Channel:KVStore Channel-A > message sent: 0f9c2cee-77c7-4498-89a0-1dc5a2f65889
<<< SUBSCRIBE< Channel: KVStore Channel-A>Message received: 0f9c2cee-77c7-4498-89a0-1dc5a2f65889
>>> PUBLISH > Channel:KVStore Channel-A > message sent: ed5924a9-016b-469b-8203-7db63d06f812
<<< SUBSCRIBE< Channel: KVStore Channel-A>Message received: ed5924a9-016b-469b-8203-7db63d06f812
>>> PUBLISH > Channel:KVStore Channel-A > message sent: f1f84e0f-8f35-4362-9567-25716b1531cd
<<< SUBSCRIBE< Channel: KVStore Channel-A>Message received: f1f84e0f-8f35-4362-9567-25716b1531cd
>>> PUBLISH > Channel:KVStore Channel-A > message sent: 746bde54-af8f-44d7-8a49-37d1a245d21b
<<< SUBSCRIBE< Channel: KVStore Channel-A>Message received: 746bde54-af8f-44d7-8a49-37d1a245d21b
>>> PUBLISH > Channel:KVStore Channel-A > message sent: 8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
<<< SUBSCRIBE< Channel: KVStore Channel-A>Message received: 8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
>>> UNSUBSCRIBE > Channel:KVStore Channel-A
----------SUBSCRIBE ends-------
>>> PUBLISH > Channel:KVStore Channel-A > message sent: Alibaba Cloud message 2:(Subscription canceled. This message will not be received)
>>> PUBLISH ends > Channel: KVStore Channel-A > Message:quit
The preceding example involves only one publisher and one subscriber. In practice, there can be multiple publishers, multiple subscribers, and multiple message channels. In such scenarios, adapt the code to fit your use case.
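One possible adaptation for multiple channels is to keep a single listener and route each message by its channel name. The Jedis subscribe method accepts more than one channel name, and onMessage receives the channel with every message, so a listener could delegate to a small router like the one sketched below. ChannelRouter and ChannelRouterDemo are hypothetical names introduced for this illustration, and the sketch runs without a server.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper: routes a (channel, message) pair to a per-channel handler.
class ChannelRouter {
    private final Map<String, Consumer<String>> handlers = new HashMap<>();

    ChannelRouter on(String channel, Consumer<String> handler) {
        handlers.put(channel, handler);
        return this;
    }

    // Intended to be called from a listener's onMessage(channel, message).
    // Returns false when no handler is registered for the channel.
    boolean dispatch(String channel, String message) {
        Consumer<String> handler = handlers.get(channel);
        if (handler == null) {
            return false;
        }
        handler.accept(message);
        return true;
    }
}

class ChannelRouterDemo {
    public static void main(String[] args) {
        List<String> orders = new ArrayList<>();
        List<String> alerts = new ArrayList<>();
        ChannelRouter router = new ChannelRouter()
                .on("orders", orders::add)
                .on("alerts", alerts::add);

        router.dispatch("orders", "order-1");
        router.dispatch("alerts", "disk full");
        boolean handled = router.dispatch("unknown", "ignored");

        // Prints: [order-1] [disk full] false
        System.out.println(orders + " " + alerts + " " + handled);
    }
}
```

With multiple subscribers, each subscriber still needs its own connection and thread, as in the KVStoreSubClient class above.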