すべてのプロダクト
Search
ドキュメントセンター

Tair (Redis® OSS-Compatible):メッセージのパブリッシュとサブスクライブ

最終更新日:Jun 11, 2026

Tair (Redis OSS-compatible) は、Redis の Pub/Sub 機能をサポートしています。あるクライアントがチャネルにメッセージをパブリッシュし、他のクライアントがそれらを受信するためにサブスクライブします。

Pub/Sub の仕組み

Tair (Redis OSS-compatible) によってパブリッシュされるメッセージは非永続であり、ファイアアンドフォーゲットモデルに従います。パブリッシャーはサブスクライバーの有無を確認したり、メッセージを保存したりしません。サブスクライバーは、サブスクライブした後にパブリッシュされたメッセージのみを受信します。

パブリッシャーは専用の接続を必要としません。パブリッシュ中も同じ接続を他の操作に使用できます。しかし、サブスクライバーはメッセージを待つ間ブロックするため、専用の接続が必要です。各サブスクライバーには、個別の接続またはスレッドを使用してください。

コード例

パブリッシャー (パブリッシュクライアント)

package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
public class KVStorePubClient {
    private Jedis jedis;
    public KVStorePubClient(String host,int port, String password){
        jedis = new Jedis(host,port);
        // インスタンスのパスワード
        String authString = jedis.auth(password);
        if (!authString.equals("OK"))
        {
            System.err.println("AUTH Failed: " + authString);
            return;
        }
    }
    public void pub(String channel,String message){
        System.out.println("  >>> PUBLISH > Channel:"+channel+" > Sent Message:"+message);
        jedis.publish(channel, message);
    }
    public void close(String channel){
        System.out.println("  >>> PUBLISH ends > Channel:"+channel+" > Message:quit");
        // パブリッシャーは "quit" メッセージを送信してメッセージの送信を停止します。
        jedis.publish(channel, "quit");
    }
}

サブスクライバー (サブスクライブクライアント)

package message.kvstore.aliyun.com;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class KVStoreSubClient extends Thread{
    private Jedis jedis;
    private String channel;
    private JedisPubSub listener;
    public KVStoreSubClient(String host,int port, String password){
        jedis = new Jedis(host,port);
                // インスタンスのパスワード
                String authString = jedis.auth(password); // パスワード
                if (!authString.equals("OK"))
                {
                    System.err.println("AUTH Failed: " + authString);
                    return;
                }
    }
    public void setChannelAndListener(JedisPubSub listener,String channel){
        this.listener=listener;
        this.channel=channel;
    }
    private void subscribe(){
        if(listener==null || channel==null){
            System.err.println("Error:SubClient> listener or channel is null");
        }
        System.out.println("  >>> SUBSCRIBE > Channel:"+channel);
        System.out.println();
        // サブスクライブしたメッセージをリッスンする際、受信側は "quit" メッセージを受信する (受動的) か、明示的にアンサブスクライブするまでプロセスをブロックします。
        jedis.subscribe(listener, channel);
    }
    public void unsubscribe(String channel){
        System.out.println("  >>> UNSUBSCRIBE > Channel:"+channel);
        System.out.println();
        listener.unsubscribe(channel);
    }
    @Override
    public void run() {
        try{
            System.out.println();
            System.out.println("----------SUBSCRIBE starts-------");
            subscribe();
            System.out.println("----------SUBSCRIBE ends-------");
            System.out.println();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

メッセージリスナー

package message.kvstore.aliyun.com;
import redis.clients.jedis.JedisPubSub;
public class KVStoreMessageListener extends JedisPubSub{
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("  <<< SUBSCRIBE < Channel:" + channel + " > Received Message:" + message );
        System.out.println();
        // 受信したメッセージが "quit" の場合、チャネルからアンサブスクライブします (受動的)。
        if(message.equalsIgnoreCase("quit")){
            this.unsubscribe(channel);
        }
    }
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        // TODO 自動生成されたメソッドスタブ
    }
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        // TODO 自動生成されたメソッドスタブ
    }
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        // TODO 自動生成されたメソッドスタブ
    }
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        // TODO 自動生成されたメソッドスタブ
    }
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
        // TODO 自動生成されたメソッドスタブ
    }
}

デモのメインプログラム

package message.kvstore.aliyun.com;
import java.util.UUID;
import redis.clients.jedis.JedisPubSub;
public class KVStorePubSubTest {
    // コンソールから取得できるインスタンスの接続情報。
    static final String host = "xxxxxxxxxx.m.cnhza.kvstore.aliyuncs.com";
    static final int port = 6379;
    static final String password="password";// パスワード。
    public static void main(String[] args) throws Exception{
            KVStorePubClient pubClient = new KVStorePubClient(host, port,password);
            final String channel = "KVStore-Channel-A";
            // パブリッシャーがメッセージの送信を開始します。この時点では誰もサブスクライブしていないため、このメッセージは受信されません。
            pubClient.pub(channel, "Alibaba Cloud Message 1: (No one has subscribed yet, so this message will not be received)");
            // メッセージのサブスクライバー。
            KVStoreSubClient subClient = new KVStoreSubClient(host, port,password);
            JedisPubSub listener = new KVStoreMessageListener();
            subClient.setChannelAndListener(listener, channel);
            // サブスクライバーがサブスクライブを開始します。
            subClient.start();
            // パブリッシャーはメッセージの送信を続けます。
            for (int i = 0; i < 5; i++) {
                String message=UUID.randomUUID().toString();
                pubClient.pub(channel, message);
                Thread.sleep(1000);
            }
            // サブスクライバーが明示的にアンサブスクライブします。
            subClient.unsubscribe(channel);
            Thread.sleep(1000);
            pubClient.pub(channel, "Alibaba Cloud Message 2: (The subscription is canceled, so this message will not be received)");
            // パブリッシャーは "quit" メッセージを送信してメッセージの送信を停止します。
            // 他のサブスクライバーがいる場合、listener.onMessage() で "quit" を受信したときに "unsubscribe" 操作を実行します。
            pubClient.close(channel);
        }
    }

出力例

お使いの Tair (Redis OSS-compatible) インスタンスの正しいエンドポイントとパスワードを使用して Java プログラムを実行してください。期待される出力は以下の通りです:

  >>> PUBLISH > Channel:KVStore-Channel-A > Sent Message:Alibaba Cloud Message 1: (No one has subscribed yet, so this message will not be received)
----------SUBSCRIBE starts-------
  >>> SUBSCRIBE > Channel:KVStore-Channel-A
  >>> PUBLISH > Channel:KVStore-Channel-A > Sent Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
  <<< SUBSCRIBE < Channel:KVStore-Channel-A > Received Message:0f9c2cee-77c7-4498-89a0-1dc5a2f65889
  >>> PUBLISH > Channel:KVStore-Channel-A > Sent Message:ed5924a9-016b-469b-8203-7db63d06f812
  <<< SUBSCRIBE < Channel:KVStore-Channel-A > Received Message:ed5924a9-016b-469b-8203-7db63d06f812
  >>> PUBLISH > Channel:KVStore-Channel-A > Sent Message:f1f84e0f-8f35-4362-9567-25716b1531cd
  <<< SUBSCRIBE < Channel:KVStore-Channel-A > Received Message:f1f84e0f-8f35-4362-9567-25716b1531cd
  >>> PUBLISH > Channel:KVStore-Channel-A > Sent Message:746bde54-af8f-44d7-8a49-37d1a245d21b
  <<< SUBSCRIBE < Channel:KVStore-Channel-A > Received Message:746bde54-af8f-44d7-8a49-37d1a245d21b
  >>> PUBLISH > Channel:KVStore-Channel-A > Sent Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
  <<< SUBSCRIBE < Channel:KVStore-Channel-A > Received Message:8ac3b2b8-9906-4f61-8cad-84fc1f15a3ef
  >>> UNSUBSCRIBE > Channel:KVStore-Channel-A
----------SUBSCRIBE ends-------
  >>> PUBLISH > Channel:KVStore-Channel-A > Sent Message:Alibaba Cloud Message 2: (The subscription is canceled, so this message will not be received)
  >>> PUBLISH ends > Channel:KVStore-Channel-A > Message:quit

この例では、1 つのパブリッシャーと 1 つのサブスクライバーを使用しています。必要に応じて、複数のパブリッシャー、サブスクライバー、チャネルをサポートするようにコードを変更してください。