Simple Message Queue (旧称:MNS) は、各メッセージを 64 KB に制限しています。メッセージを分割せずに、より大きなメッセージを送信するには、メッセージ本文を Object Storage Service (OSS) に保存し、オブジェクトリファレンスを SMQ キュー経由で渡します。この Claim-Check パターンにより、プロデューサーとコンシューマーは、キューのサイズ制約内に留まりながら、より大きなペイロードを交換できます。
仕組み
プロデューサーはメッセージ本文のサイズを確認します。本文が 64 KB を超える場合、プロデューサーはそれをオブジェクトとして OSS にアップロードします。
プロデューサーは、完全な本文ではなく、OSS オブジェクトリファレンスを SMQ キューに送信します。
コンシューマーはキューからメッセージを読み取り、本文が OSS オブジェクトリファレンスであるかどうかを確認します。
本文が OSS オブジェクトリファレンスである場合、コンシューマーは OSS からオブジェクトをダウンロードし、完全なメッセージ本文をアプリケーションに返します。
以下の図は、このプロセスを示しています。
注意事項
サイズ超過メッセージは、主にネットワーク帯域幅を消費します。プロデューサーとコンシューマーが、想定されるメッセージサイズに対して十分な帯域幅を確保していることを確認してください。
大きなメッセージの転送は時間がかかり、ネットワークジッターの影響を受ける可能性があります。一時的なエラーを処理するために、アプリケーションに再試行を実装してください。
前提条件
開始する前に、以下が準備できていることを確認してください:
SMQ SDK for Java がインストール済みであること
エンドポイントとアクセス認証情報 が設定済みであること
サンプルコード
次の例は、SMQ と OSS を使用した Claim-Check パターンを示しています。完全なソースは LargeMessageDemo.java からダウンロードしてください。
package com.aliyun.mns.sample.scenarios.largeMessage;
import com.aliyun.mns.client.CloudAccount;
import com.aliyun.mns.client.CloudQueue;
import com.aliyun.mns.client.CloudTopic;
import com.aliyun.mns.client.MNSClient;
import com.aliyun.mns.common.utils.ServiceSettings;
import com.aliyun.mns.model.Message;
import com.aliyun.mns.sample.scenarios.largeMessage.service.MNSExtendedClient;
import com.aliyun.mns.sample.scenarios.largeMessage.service.bean.MNSExtendedConfiguration;
import com.aliyun.mns.sample.scenarios.largeMessage.service.impl.MNSExtendedClientImpl;
import com.aliyun.mns.sample.utils.ReCreateUtil;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.common.auth.CredentialsProviderFactory;
import com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider;
import com.aliyuncs.exceptions.ClientException;
import org.junit.Assert;
public class LargeMessageDemo {
private final static String OSS_ENDPOINT = "oss-cn-XXX.aliyuncs.com";
private final static String OSS_BUCKET_NAME = "mns-test-XXXXX-bucket";
private final static String MNS_QUEUE_NAME = "test-largeMessage-queue";
private final static String MNS_TOPIC_NAME = "test-largeMessage-topic";
/**
* この例では、サイズが 4 KB を超えるメッセージが OSS に送信されます。
*/
private final static Long payloadSizeThreshold = 4L;
public static void main(String[] args) throws ClientException {
// 環境変数からアクセス認証情報を取得します。
EnvironmentVariableCredentialsProvider credentialsProvider =
CredentialsProviderFactory.newEnvironmentVariableCredentialsProvider();
// OSS クライアントを作成します。
OSS ossClient = new OSSClientBuilder().build(OSS_ENDPOINT, credentialsProvider);
// SMQ クライアントを作成します。
// AccessKey ID と AccessKey Secret を環境変数として設定します。
CloudAccount account = new CloudAccount(ServiceSettings.getMNSAccountEndpoint());
MNSClient client = account.getMNSClient();
CloudQueue queue = client.getQueueRef(MNS_QUEUE_NAME);
CloudTopic cloudTopic = client.getTopicRef(MNS_TOPIC_NAME);
// クリーンなデモ環境のために、キューと Topic を再作成します。
ReCreateUtil.reCreateQueue(client, MNS_QUEUE_NAME);
ReCreateUtil.reCreateTopic(client, MNS_TOPIC_NAME);
// 拡張クライアントに OSS と SMQ の設定を構成します。
MNSExtendedConfiguration configuration = new MNSExtendedConfiguration()
.setOssClient(ossClient).setOssBucketName(OSS_BUCKET_NAME)
.setMNSQueue(queue)
.setMNSTopic(cloudTopic)
.setPayloadSizeThreshold(payloadSizeThreshold);
MNSExtendedClient mnsExtendedClient = new MNSExtendedClientImpl(configuration);
// 通常サイズのメッセージを送受信します。
Message normalMessage = new Message();
normalMessage.setMessageBodyAsRawString("1");
mnsExtendedClient.sendMessage(normalMessage);
Message message = mnsExtendedClient.receiveMessage(10);
System.out.println("[normal]ReceiveMsg:" + message.getMessageBodyAsRawString());
mnsExtendedClient.deleteMessage(message.getReceiptHandle());
// サイズ超過メッセージを送受信します。
String largeMsgBody = "largeMessage";
Assert.assertTrue(largeMsgBody.getBytes().length > payloadSizeThreshold);
Message largeMessage = new Message();
largeMessage.setMessageBodyAsRawString(largeMsgBody);
mnsExtendedClient.sendMessage(largeMessage);
Message receiveMessage = mnsExtendedClient.receiveMessage(10);
System.out.println("[large]ReceiveMsg:" + receiveMessage.getMessageBodyAsRawString());
mnsExtendedClient.deleteMessage(receiveMessage.getReceiptHandle());
client.close();
ossClient.shutdown();
}
}