Simple Message Queue (formerly MNS) membatasi ukuran setiap pesan hingga 64 KB. Untuk mengirim pesan yang lebih besar tanpa membaginya, simpan isi pesan tersebut di Object Storage Service (OSS) dan kirimkan referensi objeknya melalui antrian SMQ. Pola Claim-Check ini memungkinkan produsen dan konsumen bertukar muatan yang lebih besar sambil tetap mematuhi batasan ukuran antrian.
Cara kerja
Produsen memeriksa ukuran isi pesan. Jika melebihi 64 KB, produsen mengunggahnya sebagai objek ke OSS.
Produsen mengirimkan referensi objek OSS (bukan isi lengkapnya) ke antrian SMQ.
Konsumen membaca pesan dari antrian dan memeriksa apakah isinya merupakan referensi objek OSS.
Jika isi pesan merupakan referensi objek OSS, konsumen mengunduh objek tersebut dari OSS dan mengembalikan isi pesan lengkap ke aplikasi.
Diagram berikut mengilustrasikan proses ini.
Catatan penggunaan
Pesan berukuran besar terutama mengonsumsi lebar pita jaringan. Pastikan produsen dan konsumen memiliki bandwidth yang cukup untuk menangani ukuran pesan yang diharapkan.
Transfer pesan besar memakan waktu dan dapat terpengaruh oleh fluktuasi jaringan. Terapkan mekanisme retry dalam aplikasi Anda untuk menangani error sementara.
Prasyarat
Sebelum memulai, pastikan Anda telah:
Kode contoh
Contoh berikut menunjukkan penerapan pola Claim-Check dengan SMQ dan OSS. Unduh kode sumber lengkap dari LargeMessageDemo.java.
package com.aliyun.mns.sample.scenarios.largeMessage;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.CloudTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.sample.scenarios.largeMessage.service.MNSExtendedClient;
import com.aliyun.mns.sample.scenarios.largeMessage.service.bean.MNSExtendedConfiguration;
import com.aliyun.mns.sample.scenarios.largeMessage.service.impl.MNSExtendedClientImpl;
import com.aliyun.mns.sample.utils.ReCreateUtil;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider;
import com.aliyuncs.exceptions.ClientException;
import org.junit.Assert;
public class LargeMessageDemo {
private final static String OSS_ENDPOINT = "oss-cn-XXX.aliyuncs.com";
private final static String OSS_BUCKET_NAME = "mns-test-XXXXX-bucket";
private final static String MNS_QUEUE_NAME = "test-largeMessage-queue";
private final static String MNS_TOPIC_NAME = "test-largeMessage-topic";
/**
* Dalam contoh ini, pesan yang ukurannya lebih besar dari 4 KB dikirim ke OSS.
*/
private final static Long payloadSizeThreshold = 4L;
public static void main(String[] args) throws ClientException {
// Ambil kredensial akses dari variabel lingkungan.
EnvironmentVariableCredentialsProvider credentialsProvider =
CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();
// Buat client OSS.
OSS ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, credentialsProvider);
// Buat client SMQ.
// Konfigurasikan ID AccessKey dan Rahasia AccessKey sebagai variabel lingkungan.
CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
MNSClient client = account.getMNSClient();
CloudQueue queue = client.getQueueRef(MNS_QUEUE_NAME);
CloudTopic cloudTopic = client.getTopicRef(MNS_TOPIC_NAME);
// Buat ulang antrian dan topik untuk lingkungan demo yang bersih.
ReCreateUtil.reCreateQueue(client, MNS_QUEUE_NAME);
ReCreateUtil.reCreateTopic(client, MNS_TOPIC_NAME);
// Konfigurasikan client extended dengan pengaturan OSS dan SMQ.
MNSExtendedConfiguration configuration = new MNSExtendedConfiguration()
.setOssClient(ossClient).setOssBucketName(OSS_BUCKET_NAME)
.setMNSQueue(queue)
.setMNSTopic(cloudTopic)
.setPayloadSizeThreshold(payloadSizeThreshold);
MNSExtendedClient mnsExtendedClient = new MNSExtendedClientImpl(configuration);
// Kirim dan terima pesan berukuran normal.
Message normalMessage = new Message();
normalMessage.setMessageBodyAsRawString("1");
mnsExtendedClient.sendMessage(normalMessage);
Message message = mnsExtendedClient.receiveMessage(10);
System.out.println("[normal]ReceiveMsg:" + message.getMessageBodyAsRawString());
mnsExtendedClient.deleteMessage(message.getReceiptHandle());
// Kirim dan terima pesan berukuran besar.
String largeMsgBody = "largeMessage";
Assert.assertTrue(largeMsgBody.getBytes().length > payloadSizeThreshold);
Message largeMessage = new Message();
largeMessage.setMessageBodyAsRawString(largeMsgBody);
mnsExtendedClient.sendMessage(largeMessage);
Message receiveMessage = mnsExtendedClient.receiveMessage(10);
System.out.println("[large]ReceiveMsg:" + receiveMessage.getMessageBodyAsRawString());
mnsExtendedClient.deleteMessage(receiveMessage.getReceiptHandle());
client.close();
ossClient.shutdown();
}
}