全部產品
Search
文件中心

ApsaraMQ for RocketMQ:收發定時訊息

更新時間:Jul 01, 2024

本文提供使用TCP協議下的.NET SDK收發定時訊息的範例程式碼。目前支援的地區包括公網、華東1(杭州)、華北2(北京)、華東2(上海)、華南1(深圳)。

定時訊息可以做到在指定時間戳記之後才可被消費者消費,適用於對訊息生產和消費有時間視窗要求,或者利用訊息觸發定時任務的情境。

定時訊息的概念介紹及使用過程中的注意事項,請參見定時和延時訊息

前提條件

  • 下載.NET SDK。更多資訊,請參見版本說明

  • 環境準備。更多資訊,請參見環境準備

  • 建立資源。代碼中涉及的資源資訊,例如執行個體、Topic和Group ID等,需要在控制台上提前建立。更多資訊,請參見建立資源

  • 擷取阿里雲存取金鑰AccessKey ID和AccessKey Secret。更多資訊,請參見建立AccessKey

發送定時訊息

說明

具體的範例程式碼,請以雲訊息佇列 RocketMQ 版程式碼程式庫為準。

發送定時訊息的程式碼範例如下所示。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Runtime.InteropServices;
using ons;

namespace ons
{
    class onscsharp
    {
        private static readonly DateTime Jan1st1970 = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

        public static long CurrentTimeMillis()
        {
           return (long) (DateTime.UtcNow - Jan1st1970).TotalMilliseconds;
        }    
       
        static void Main(string[] args)
        {
            // producer建立和正常工作的參數,必須輸入。
            ONSFactoryProperty factoryInfo = new ONSFactoryProperty();
            // 您在訊息佇列RocketMQ版控制台建立的Group ID。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.ProducerId, "XXX ");
            // 設定TCP接入網域名稱,進入訊息佇列RocketMQ版控制台執行個體詳情頁面的存取點地區查看。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.NAMESRV_ADDR, "XXX");
            // 您在訊息佇列RocketMQ版控制台建立的Topic。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.PublishTopics, "XXX");
            // 訊息內容。
            factoryInfo.setFactoryProperty(ONSFactoryProperty.MsgContent, "XXX");
            //請確保環境變數ALIBABA_CLOUD_ACCESS_KEY_ID、ALIBABA_CLOUD_ACCESS_KEY_SECRET已設定。
            //AccessKey ID,阿里雲身分識別驗證標識。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::AccessKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
	          //AccessKey Secret,阿里雲身分識別驗證密鑰。
            factoryInfo.setFactoryProperty(ONSFactoryProperty::SecretKey, getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
            
            // 建立producer。
            Producer pProducer = ONSFactory.getInstance().createProducer(factoryInfo);

            // 在發送訊息前,必須調用start方法來啟動Producer,只需調用一次即可。
            pProducer.start();

            Message msg = new Message(
                // 訊息主題。
                factoryInfo.getPublishTopics(),
                // 訊息標籤。
                "TagA",
                // 訊息主體。
                factoryInfo.getMessageContent()
            );

            // 設定代表訊息的業務關鍵屬性,請儘可能全域唯一。
            // 以方便您在無法正常收到訊息情況下,可通過訊息佇列RocketMQ版控制台查詢訊息並補發。
            // 注意:不設定也不會影響訊息正常收發。
            msg.setKey("ORDERID_100");

            // deliver time,單位:ms。指定一個時刻,在這個時刻之後訊息才能被消費,這個例子表示3s後才能被消費。
            long deliverTime = CurrentTimeMillis() + 3000;
            msg.setStartDeliverTime(deliverTime);

            // 發送訊息,只要不拋出異常,就代表發送成功。
            try
            {
                SendResultONS sendResult = pProducer.send(msg);
            }
            catch(ONSClientException e)
            {
                // 發送失敗處理。
            }

            // 在應用退出前,必須銷毀Producer對象,否則會導致記憶體泄露等問題。
            pProducer.shutdown();

        }
    }
}           

訂閱定時訊息

定時訊息的訂閱與普通訊息訂閱一致,更多資訊,請參見訂閱訊息