Message Service: Send messages from one producer client to multiple consumer clients

Last Updated: Mar 20, 2024

This topic describes how to use MNS to send messages from one producer client to multiple consumer clients without exposing the endpoints of consumer clients.

Background information

MNS provides queue-based and topic-based message services. Together, these two types of message services can meet your requirements in most business scenarios. They differ as follows:
  • Queue-based message services: One client sends messages to an MNS queue. Multiple clients consume messages by pulling messages from the queue.
  • Topic-based message services: One client sends messages to an MNS topic. Messages are automatically pushed from the MNS server to multiple clients.
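The difference between the two delivery models can be sketched with a small in-memory example. This is only an illustration and does not call MNS: the class DeliveryModels and its methods are hypothetical names, and the round-robin assignment in the queue model is an assumption made to keep the output deterministic. It also assumes the usual queue behavior in which a pulled message is handed to one consumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical in-memory illustration; it does not use the MNS SDK.
final class DeliveryModels {
    // Queue model: consumers pull from one shared queue, so each message
    // ends up with a single consumer (round-robin here only for determinism).
    static List<String> pullFromQueue(List<String> messages, int consumerCount) {
        Queue<String> queue = new ArrayDeque<>(messages);
        List<String> log = new ArrayList<>();
        int consumer = 0;
        while (!queue.isEmpty()) {
            log.add("consumer" + (consumer + 1) + " pulled " + queue.poll());
            consumer = (consumer + 1) % consumerCount;
        }
        return log;
    }

    // Topic model: the server pushes a copy of every message to every subscriber.
    static List<String> pushFromTopic(List<String> messages, int subscriberCount) {
        List<String> log = new ArrayList<>();
        for (String message : messages) {
            for (int s = 1; s <= subscriberCount; s++) {
                log.add("subscriber" + s + " received " + message);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("m1", "m2");
        System.out.println(pullFromQueue(messages, 2));
        System.out.println(pushFromTopic(messages, 2));
    }
}
```

With two messages and two clients, the queue model logs two deliveries (one per message), while the topic model logs four (one per message per subscriber).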

With topic-based messaging, MNS pushes messages from the MNS server to clients, so clients can consume messages in real time. However, each consumer client must expose an endpoint to receive the pushed messages. In some scenarios, for example when consumer clients run in an enterprise private network, you cannot expose these endpoints. In this case, the consumer clients can consume messages only by pulling them from the MNS server. By combining MNS topics and queues, you can send messages from one producer client to multiple consumer clients without exposing the endpoints of the consumer clients.

Solution

Create subscriptions to a topic and specify a queue as the endpoint of each subscription, so that messages published to the topic are pushed to the queues. Each consumer client then pulls the messages from its queue. This way, you can send messages from one producer client to multiple consumer clients without exposing the endpoints of the consumer clients. The following figure shows the process.

[Figure: Message flow]
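The flow described in this section can be modeled in memory as follows. This is a minimal sketch, not the MNS implementation: PullTopicModel, publish, and pull are hypothetical names, and a java.util.concurrent BlockingQueue stands in for each MNS queue. It shows the two halves of the pattern: publishing copies the message into every subscribed queue, and each consumer pulls from its own queue with a wait timeout.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory model of a topic whose subscribers are queues.
final class PullTopicModel {
    // One queue per consumer, keyed by queue name.
    private final Map<String, BlockingQueue<String>> queues = new LinkedHashMap<>();

    public PullTopicModel(List<String> queueNames) {
        for (String name : queueNames) {
            queues.put(name, new LinkedBlockingQueue<>());
        }
    }

    // "Push" half: a published message is copied into every subscribed queue.
    public void publish(String body) {
        for (BlockingQueue<String> queue : queues.values()) {
            queue.add(body);
        }
    }

    // "Pull" half: a consumer waits up to waitSeconds on its own queue.
    // Returns null if no message arrives in time or the thread is interrupted.
    public String pull(String queueName, long waitSeconds) {
        BlockingQueue<String> queue = queues.get(queueName);
        if (queue == null) {
            throw new IllegalArgumentException("unknown queue: " + queueName);
        }
        try {
            return queue.poll(waitSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        List<String> names = List.of("consumer001", "consumer002", "consumer003");
        PullTopicModel topic = new PullTopicModel(names);
        topic.publish("hello");
        for (String name : names) {
            System.out.println(name + " pulled: " + topic.pull(name, 1));
        }
    }
}
```

Because consumers only call pull, nothing in the model ever needs to connect to a consumer, which is the property the solution relies on.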

API operations

Java SDK 1.1.8 provides the CloudPullTopic class to support the preceding solution. MNSClient provides the following API operations to create a CloudPullTopic object:

public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)

public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)

These operations use the following types and parameters:
  • TopicMeta: specifies the metadata of a topic.
  • QueueMeta: specifies the metadata of a queue.
  • queueNameList: specifies a list of queues to which the messages in a specified topic are pushed.
  • needCreateQueue: specifies whether to create the queues that are specified by the queueNameList parameter.
  • queueMetaTemplate: specifies the metadata template of a queue.

Sample code

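// Imports from the MNS SDK for Java and the JDK that this sample uses.
import java.util.Vector;

import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudPullTopic;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.ClientException;
import com.aliyun.mns.common.ServiceException;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.model.QueueMeta;
import com.aliyun.mns.model.RawTopicMessage;
import com.aliyun.mns.model.TopicMessage;
import com.aliyun.mns.model.TopicMeta;

// Create the client from the AccessKey pair and the account endpoint.
CloudAccount account = new CloudAccount(
    ServiceSettings.getMNSAccessKeyId(),
    ServiceSettings.getMNSAccessKeySecret(),
    ServiceSettings.getMNSAccountEndpoint());
MNSClient client = account.getMNSClient();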

// Create a consumer list. 
Vector<String> consumerNameList = new Vector<String>();
String consumerName1 = "consumer001";
String consumerName2 = "consumer002";
String consumerName3 = "consumer003";
consumerNameList.add(consumerName1);
consumerNameList.add(consumerName2);
consumerNameList.add(consumerName3);
QueueMeta queueMetaTemplate = new QueueMeta();
queueMetaTemplate.setPollingWaitSeconds(30);

try{

    // Create a topic. 
    String topicName = "demo-topic-for-pull";
    TopicMeta topicMeta = new TopicMeta();
    topicMeta.setTopicName(topicName);
    CloudPullTopic pullTopic = client.createPullTopic(topicMeta, consumerNameList, true, queueMetaTemplate);

    // Publish a message to the topic. 
    String messageBody = "broadcast message to all the consumers:hello the world.";
    // The message is sent as a raw message (RawTopicMessage), so consumers use getMessageBodyAsRawString to read the message body. 
    TopicMessage tMessage = new RawTopicMessage();
    tMessage.setBaseMessageBody(messageBody);
    pullTopic.publishMessage(tMessage);


    // Receive the message. 
    CloudQueue queueForConsumer1 = client.getQueueRef(consumerName1);
    CloudQueue queueForConsumer2 = client.getQueueRef(consumerName2);
    CloudQueue queueForConsumer3 = client.getQueueRef(consumerName3);

    Message consumer1Msg = queueForConsumer1.popMessage(30);
    if(consumer1Msg != null)
    {
        System.out.println("consumer1 receive message:" + consumer1Msg.getMessageBodyAsRawString());
    } else{
        System.out.println("the queue is empty");
    }

    Message consumer2Msg = queueForConsumer2.popMessage(30);
    if(consumer2Msg != null)
    {
        System.out.println("consumer2 receive message:" + consumer2Msg.getMessageBodyAsRawString());
    }else{
        System.out.println("the queue is empty");
    }

    Message consumer3Msg = queueForConsumer3.popMessage(30);
    if(consumer3Msg != null)
    {
        System.out.println("consumer3 receive message:" + consumer3Msg.getMessageBodyAsRawString());
    }else{
        System.out.println("the queue is empty");
    }

    // Delete the topic. 
    pullTopic.delete();
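} catch (ClientException ce) {
    System.out.println("Something is wrong with the network connection between the client and the MNS service. "
            + "Check your network and DNS availability.");
    ce.printStackTrace();
} catch (ServiceException se) {
    // The MNS server returned an error; the error code and request ID help with troubleshooting.
    System.out.println("MNS service error: " + se.getErrorCode() + ", request ID: " + se.getRequestId());
    se.printStackTrace();
} finally {
    // Release the client even if an exception was thrown.
    client.close();
}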
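The three receive blocks in the sample repeat the same logic for each consumer. One possible way to express them as a loop is sketched below. To keep the sketch self-contained, the SDK call is abstracted behind a function parameter: ReceiveLoopSketch, receiveFromAll, and receiveBody are hypothetical names that are not part of the MNS SDK, and receiveBody is assumed to return the message body, or null when the queue is empty.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper; not part of the MNS SDK.
final class ReceiveLoopSketch {
    // Calls receiveBody once per queue name and reports what each consumer got.
    // receiveBody returns the message body, or null if the queue is empty.
    static List<String> receiveFromAll(List<String> queueNames,
                                       Function<String, String> receiveBody) {
        List<String> lines = new ArrayList<>();
        for (String name : queueNames) {
            String body = receiveBody.apply(name);
            if (body != null) {
                lines.add(name + " received: " + body);
            } else {
                lines.add(name + ": the queue is empty");
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // A map stands in for the queues: consumer003 has no message waiting.
        Map<String, String> fakeQueues = Map.of("consumer001", "hello", "consumer002", "hello");
        List<String> names = List.of("consumer001", "consumer002", "consumer003");
        receiveFromAll(names, fakeQueues::get).forEach(System.out::println);
    }
}
```

In the sample above, the function passed as receiveBody would wrap client.getQueueRef(name).popMessage(30) and return getMessageBodyAsRawString() when the message is not null.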