すべてのプロダクト
Search
ドキュメントセンター

Elastic Compute Service:SMQ を使用してスポットインスタンスの割り込みイベントを検出し、対応する

最終更新日:Jul 07, 2025

スポットインスタンスは中断される可能性があります。ビジネスがインスタンスの中断の影響を受けやすい場合は、スポットインスタンスの中断を迅速に検出し、適切な方法で中断イベントに対応して、ビジネスの損失を最小限に抑える必要があります。このトピックでは、Simple Message Queue(SMQ)(旧称 Message Service(MNS))を使用してスポットインスタンスの中断イベントを検出し、対応する例を示します。

ワークフロー

image

準備

  1. AccessKey ペアを作成します。

    Resource Access Management (RAM) ユーザーの AccessKey ペアを作成します。Alibaba Cloud アカウントはリソースに対するすべての権限を持っています。Alibaba Cloud アカウントの AccessKey ペアが漏洩した場合、リソースはリスクにさらされます。RAM ユーザーの AccessKey ペアを使用することをお勧めします。AccessKey ペアの作成方法については、「AccessKey ペアを作成する」をご参照ください。

  2. RAM ユーザーに権限を付与します。

    RAM ユーザーに、SMQ 関連のリソースに対する操作を実行するための権限を付与します。このトピックで提供されているサンプルコードは、SMQ からメッセージを消費します。次の表に、付与できる SMQ の権限を示します。

    クラウドサービス

    ポリシー

    SMQ (旧称 MNS)

    AliyunMNSFullAccess

  3. アクセス認証情報とエンドポイントを構成します。

    このトピックのサンプルコードは、システム環境変数からアクセス認証情報とエンドポイントを読み取ります。

  4. SMQ SDK をインストールします。

    Java 用 SMQ SDK を取得します。この例では、Maven の依存関係を追加することで Java 用 SMQ SDK をインストールします。その他のインストール方法については、「Java 用 SMQ SDK をインストールする」をご参照ください。

    次のサンプルコードは、Maven の依存関係を追加する方法の例を示しています。

    <dependencies>
        <! -- Alibaba Cloud SMQ SDK --> 
        <dependency>
          <groupId>com.aliyun.mns</groupId>
          <artifactId>aliyun-sdk-mns</artifactId>
          <version>1.2.0</version>
        </dependency>
    </dependencies>

手順

  1. SMQ を作成します。

    CloudMonitor が送信するスポットインスタンスの中断通知を受信するための SMQ を作成します。

    1. SMQ コンソール にログインします。左側のナビゲーションウィンドウで、Queue Model > Queues を選択します。

    2. 上部のナビゲーションバーで、リージョンを選択します。Queues ページで、Create Queue をクリックします。

    3. Create Queue パネルで、画面の指示に従って必須パラメーターを構成し、OK をクリックします。

      image

  2. サブスクリプションポリシーを作成します。

    CloudMonitor は、スポットインスタンスの中断イベントをリアルタイムで監視します。イベントアラートが発生した場合、CloudMonitor はサブスクリプションポリシーで指定されたプッシュチャネルを介して中断通知をプッシュします。

    1. CloudMonitor コンソール にログインします。 左側のナビゲーションウィンドウで、[イベントセンター] > [イベントサブスクリプション] を選択します。

    2. [サブスクリプションポリシー] タブで、[サブスクリプションポリシーの作成] をクリックし、必須パラメーターを構成します。

      この例では、スポットインスタンスの中断イベントをサブスクライブするために使用される主要なパラメーターのみを説明します。ビジネス要件に基づいてパラメーターを指定できます。詳細については、「イベントサブスクリプションを管理する」をご参照ください。

      • [サブスクリプションタイプ]: [システムイベント] を選択します。

        image

      • [サブスクリプションスコープ]: 次の図に示すようにパラメーターを指定します。

        image

      • [プッシュと統合]: [チャネルを追加] をクリックします。表示されるダイアログボックスで、[チャネルを増やす] をクリックします。表示されるダイアログボックスで、手順 1 で作成した SMQ を選択し、画面の指示に従って他のパラメーターを構成します。プッシュチャネルについては、「プッシュチャネルを管理する」をご参照ください。

  3. 中断イベントをシミュレートします。

    スポットインスタンスの中断イベントはトリガーイベントです。スポットインスタンスの中断イベントハンドラを開発する場合、コードをデバッグすることはできません。[イベントサブスクリプションのデバッグ] を使用して、スポットインスタンスの中断イベントをシミュレートできます。

    1. [サブスクリプションポリシー] タブで、[イベントサブスクリプションのデバッグ] をクリックします。

    2. [イベントデバッグの作成] パネルで、[製品][Elastic Compute Service (ECS)] に、[名前][インスタンス: プリエンティブルインスタンスの中断] に設定します。

      システムは JSON 形式でデバッグコンテンツを自動的に生成します。JSON ファイルのリソース情報を、中断イベントをシミュレートするスポットインスタンスの情報に置き換えます。

      • Alibaba Cloud アカウント ID を Alibaba Cloud アカウントの ID に置き換えます。

      • <resource-id><instanceId> をスポットインスタンスの ID に置き換えます。

      • <region ID> をスポットインスタンスのリージョン ID に置き換えます。

        {
            "product": "ECS",
            "resourceId": "acs:ecs:cn-shanghai:<Alibaba Cloud account ID>UID:instance/<resource-id>", // Alibaba Cloud アカウント ID、resource-id を置き換える
            "level": "WARN",
            "instanceName": "instanceName",
            "regionId": "<Region ID>", // リージョン ID を置き換える
            "groupId": "0",
            "name": "Instance:PreemptibleInstanceInterruption",
            "content": {
                "instanceId": "<instanceId>", // instanceId を置き換える
                "instanceName": "wor***b73",
                "action": "delete"
            },
            "status": "Normal"
        }
    3. [OK] をクリックします。操作が成功したことを示すメッセージが表示されます。CloudMonitor は SMQ にアラート通知を自動的に送信します。

  4. メッセージをプルして対応します。

    中断イベントハンドラをシミュレートして、スポットインスタンスの中断通知メッセージを SMQ からプルします。ビジネス要件に基づいて、対応するビジネス処理ロジックを追加することもできます。次のサンプルコードは、グレースケール画像変換ハンドラを使用して中断イベントに対応および処理する方法の例を示しています。

    1. スレッドタスクを使用して、グレースケール画像変換ハンドラをシミュレートします。

      import com.aliyun.mns.client.CloudAccount;
      import com.aliyun.mns.client.CloudQueue;
      import com.aliyun.mns.client.MNSClient;
      import com.aliyun.mns.common.utils.ServiceSettings;
      import com.aliyun.mns.model.Message;
      import org.json.JSONObject;
      
      import javax.imageio.ImageIO;
      import java.awt.image.BufferedImage;
      import java.io.File;
      import java.util.Base64;
      import java.util.concurrent.atomic.AtomicBoolean;
      
      /**
       * グレースケール変換をサポートする中断可能な画像プロセッサの実装クラス。
       * アトミック変数とスレッド中断フラグに基づく中断検出メカニズム。
       * 特徴:
       * 1. チャンクごとに処理を実行して、進捗状況を自動的に保存します。
       * 2. スポットインスタンスの中断イベントに即座に対応します。
       * 3. スポットインスタンスの中断イベント発生後に部分的な結果を含むファイルを生成します。
       */
      public class InterruptibleImageProcessor implements Runnable {
          /**
           * スレッドセーフなステータス制御にアトミックブール値を使用します。
           */
          private final AtomicBoolean running = new AtomicBoolean(true);
          /**
           * 処理中の画像データを格納します。
           */
          private BufferedImage processedImage;
          /**
           * 処理の進捗状況(パーセント)。有効な値: 0 ~ 100。
           */
          private int progress;
          /**
           * スレッド実行エントリ。
           * **中断処理ロジック**:
           * 1. システムが中断例外をキャプチャした後、現在の進捗状況を保存します。
           * 2. 中断セマンティクスを保持するために、スレッドを中断が発生した状態に戻します。
           */
          @Override
          public void run() {
              try {
                  convertToGrayScale(new File("input.jpg"), new File("output.jpg"));
                  // スポットインスタンスの実行中に中断イベントをシミュレートします。
                  Thread.sleep(5000); 
                  System.out.println("画像処理が完了しました");
              } catch (InterruptedException e) {
                  System.out.println("処理が中断され、進捗状況は " + progress + "% に保存されました");
                  saveProgress(new File("partial_output.jpg"));
                  Thread.currentThread().interrupt(); // スレッドが中断された状態にスレッドを再開します。
              } catch (Exception e) {
                  System.err.println("エラー処理: " + e.getMessage());
              }
          }
      
          /**
           * 外部中断をトリガーするメソッド
           * **コラボレーションメカニズム**:
           * スレッド中断フラグに基づいて中断を二重チェックします。
           */
          public void stop() {
              running.set(false);
          }
      }
      
    2. グレースケール画像変換メソッド。

      /**
       * 入力した画像をグレースケール画像に変換して保存します。
       * @param inputFile 元の画像ファイルのオブジェクト。
       * @param outputFile 出力ファイルオブジェクト。
       * @throws Exception I/O 異常と中断例外が含まれます。
       *
       * **アルゴリズムの説明**:
       * 次の数式で赤、緑、青の光の知覚される明るさに基づいて設計された加重平均値と係数を使用して、画像をグレースケールに変換します。
       * Gray = 0.30*R + 0.59*G + 0.11*B
       * 参考文献: ITU-R 勧告 BT. 601。
       */
      public void convertToGrayScale(File inputFile, File outputFile) throws Exception {
          // ソース画像データを読み取ります。
          BufferedImage original = ImageIO.read(inputFile);
          int width = original.getWidth();
          int height = original.getHeight();
          // グレースケール画像バッファを作成します。
          processedImage = new BufferedImage(width, height, BufferedImage.TYPE_BYTE_GRAY);
      
          // チャンクごとに処理を実行して、進捗状況を保存します。
          for (int y = 0; y < height && running.get(); y++) {
              // 画像のピクセルごとの処理を実行します。
              for (int x = 0; x < width; x++) {
                  // 最初の中断検出で、スレッド中断フラグを確認します。
                  if (Thread.interrupted()) {
                      throw new InterruptedException("画像処理が中断されました");
                  }
                  /* グレースケール画像変換のコアアルゴリズム * /
                  int rgb = original.getRGB(x, y);
                  // ARGB 形式で RGB チャネルを分解します。
                  // 赤チャネル。
                  int r = (rgb >> 16) & 0xFF;
                  // 緑チャネル。
                  int g = (rgb >> 8) & 0xFF;
                  // 青チャネル。
                  int b = rgb & 0xFF;
                  // 加重平均法を使用してグレースケール値を計算します。
                  int gray = (int)(0.3 * r + 0.59 * g + 0.11 * b);
                  // グレースケール値を赤、緑、青のチャネルにコピーして RGB 値を再構築します。
                  processedImage.setRGB(x, y, (gray << 16) | (gray << 8) | gray);
                  // 進捗状況をパーセントで更新します。小数値ではなく整数値が返される整数除算の問題に注意してください。
                  progress = (y * width + x) * 100 / (width * height);
              }
              // チェックポイントメカニズムに基づいて、50 行ごとに処理した後、進捗状況を自動的に保存します。
              if (y % 50 == 0) {
                  saveProgress(outputFile);
              }
          }
          // 完全な結果を保存します。
          ImageIO.write(processedImage, "jpg", outputFile);
      }
    3. 画像処理の進捗状況が保存されます。

      /**
       * 処理の進捗状況を指定されたファイルに保存します。
       * @param outputFile 出力ファイルオブジェクト。
       *
       * **注**:
       * 1. 保存プロセスの中断を防ぐために、フェイルサイレントメカニズムが実装されています。
       * 2. partial_output.jpg という名前の一時ファイルが生成されます。
       */
      private void saveProgress(File outputFile) {
          try {
              // 最終的なファイルが上書きされるのを防ぐために、一時ファイル名を使用します。
              ImageIO.write(processedImage, "jpg", new File("partial_output.jpg"));
          } catch (Exception e) {
              System.err.println("自動保存に失敗しました: " + e.getMessage());
          }
      }
    4. 応答処理をテストします。

      テスト画像処理プログラムの実行中に、スポットインスタンスが中断および回収されるイベントを受信し、イベントに対応します。

      /**
       * メインテストメソッド
       * **テストシナリオ**:
       * 1. スレッドを処理します。
       * 2. 中断イベント通知メッセージをプルします。
       * 3. スレッドが終了するのを待ちます。
       */
      public static void main(String[] args) throws InterruptedException {
          // MNSClient アカウントを初期化します。
          CloudAccount account = new CloudAccount(
                  ServiceSettings.getMNSAccessKeyId(),
                  ServiceSettings.getMNSAccessKeySecret(),
                  ServiceSettings.getMNSAccountEndpoint());
          MNSClient client = account.getMNSClient();
          // スポットインスタンスで中断イベントが発生したかどうかを確認します。
          boolean isMatch = false;
          // 画像処理プログラムを開始します。
          InterruptibleImageProcessor processor = new InterruptibleImageProcessor();
          Thread processThread = new Thread(processor);
          processThread.start();
          try{
              // SMQ からメッセージを取得します。
              CloudQueue queue = client.getQueueRef("spot-interruption");
              Message popMsg = queue.popMessage();
              if (popMsg != null){
                  // デフォルトでは、メッセージ本文は Base64 でエンコードされています。
                  System.out.println("message body: " + popMsg.getMessageBodyAsRawString());
                  // Base64 デコードを実行します。
                  byte[] decodedBytes = Base64.getDecoder().decode(popMsg.getMessageBodyAsRawString());
                  String decodedString = new String(decodedBytes);
                  System.out.println("message content: " + decodedString);
                  // JSON 文字列を解析します。
                  JSONObject json = new JSONObject(decodedString);
                  // イベント名フィールドの値を取得します。
                  String name = json.getString("name");
                  isMatch = "Instance:PreemptibleInstanceInterruption".equals(name);
                  // スポットインスタンスの中断イベントに対応します。
                  if(isMatch){
                      System.out.println("スポットインスタンスが中断および回収されようとしています。");
                      // 画像処理プログラムを終了します。
                      processor.stop();
                      processThread.interrupt();
                      System.out.println("プログラムが終了しました");
                      processThread.join();
                      // メッセージを削除します。
                      queue.deleteMessage(popMsg.getReceiptHandle());
                  }
              }
          }catch (Exception e){
              System.out.println("不明な例外が発生しました!");
              e.printStackTrace();
          }
          client.close();
      }

    サンプルコード:

    /* グレースケール変換をサポートする中断可能な画像プロセッサの実装クラス。
     * アトミック変数とスレッド中断フラグに基づく中断検出メカニズム。
     * 特徴:
     * 1. チャンクごとに処理を実行して、進捗状況を自動的に保存します。
     * 2. スポットインスタンスの中断イベントに即座に対応します。
     * 3. スポットインスタンスの中断イベント発生後に部分的な結果を含むファイルを生成します。
     */
    // ... (既存のコードは変更なし) ...
    
説明
  • ビジネスロジックにスナップショットの作成が含まれる場合は、「CreateSnapshot」をご参照ください。

  • ビジネスロジックにカスタムイメージの作成が含まれる場合は、「CreateImage」をご参照ください。

参照

重要なデータまたは構成をスポットインスタンスに保存する場合は、スポットインスタンスのデータ復旧方法をよく理解し、データ損失を防ぐために必要な設定を事前に構成することをお勧めします。詳細については、「スポットインスタンスのデータを保持および復元する」をご参照ください。