本文通过实践案例为您介绍云监控如何通过消息服务MNS的队列实现自动化处理ECS主机状态变化事件。

前提条件

  • 请您确保已在消息服务MNS控制台,创建队列,例如:ecs-cms-event。

    关于如何创建队列,请参见创建队列

  • 请您确保已在云监控控制台,创建系统事件报警规则。

    关于如何创建队列,请参见创建系统事件报警规则

  • 请您确保已安装Python依赖。

    本文所有代码均以Python 3.6为例,您也可以使用其他编程语言,例如:Java和PHP。

    关于如何安装Python SDK,请参见Python SDK安装

背景信息

ECS在已有的系统事件基础上,通过云监控新发布了状态变化类事件和抢占型实例的中断通知事件。每当ECS主机的状态发生变化时,都会触发一条ECS状态变化事件。这种变化包括您通过控制台、OpenAPI或SDK操作导致的变化,也包括弹性伸缩或欠费等原因而自动触发的变化,还包括因为系统异常而触发的变化。

云监控提供四种事件报警处理方式,包括:消息服务队列、函数计算、URL回调和日志服务。本文以消息服务队列为例,为您介绍云监控自动化处理ECS主机状态变更事件的三种最佳实践。

操作步骤

云监控将ECS主机所有的状态变化事件投递到消息服务MNS,消息服务MNS获取消息并进行消息处理。

  • 实践一:对所有ECS主机的创建和释放事件进行记录。
    目前ECS控制台无法查询已经释放的实例。如果您有查询需求,可以通过ECS主机状态变化事件将所有ECS主机的生命周期记录在数据库或日志服务中。每当您创建ECS主机时,会发送一个Pending事件,每当您释放ECS主机时,会发送一个Deleted事件。
    1. 编辑一个Conf文件。
      Conf文件中需要包含消息服务MNS的endpoint、阿里云的access_keyaccess_key_secretregion_id(例如:cn-beijing)和queue_name
      说明 endpoint可以在消息服务MNS控制台的队列页面,单击获取Endpoint
      class Conf:
          endpoint = 'http://<id>.mns.<region>.aliyuncs.com/'
          access_key = '<access_key>'
          access_key_secret = '<access_key_secrect>'
          region_id = 'cn-beijing'
          queue_name = 'test'
          vsever_group_id = '<your_vserver_group_id>'
                                          
    2. 使用消息服务MNS的SDK编写一个MNS Client用来获取MNS消息。
      # -*- coding: utf-8 -*-
      import json
      from mns.mns_exception import MNSExceptionBase
      import logging
      from mns.account import Account
      from . import Conf
      
      
      class MNSClient(object):
          def __init__(self):
              self.account =  Account(Conf.endpoint, Conf.access_key, Conf.access_key_secret)
              self.queue_name = Conf.queue_name
              self.listeners = dict()
      
          def regist_listener(self, listener, eventname='Instance:StateChange'):
              if eventname in self.listeners.keys():
                  self.listeners.get(eventname).append(listener)
              else:
                  self.listeners[eventname] = [listener]
      
          def run(self):
              queue = self.account.get_queue(self.queue_name)
              while True:
                  try:
                      message = queue.receive_message(wait_seconds=5)
                      event = json.loads(message.message_body)
                      if event['name'] in self.listeners:
                          for listener in self.listeners.get(event['name']):
                              listener.process(event)
                      queue.delete_message(receipt_handle=message.receipt_handle)
                  except MNSExceptionBase as e:
                      if e.type == 'QueueNotExist':
                          logging.error('Queue %s not exist, please create queue before receive message.', self.queue_name)
                      else:
                          logging.error('No Message, continue waiting')
      
      
      class BasicListener(object):
          def process(self, event):
              pass
                                      

      上述代码只对MNS消息获取的数据,调用Listener消费消息之后删除消息。

    3. 注册一个指定Listener消费事件。这个简单的Listener判断收到Pending和Deleted事件时,打印一行日志。
       # -*- coding: utf-8 -*-
      import logging
      from .mns_client import BasicListener
      
      
      class ListenerLog(BasicListener):
          def process(self, event):
              state = event['content']['state']
              resource_id = event['content']['resourceId']
              if state == 'Panding':
                  logging.info(f'The instance {resource_id} state is {state}')
              elif state == 'Deleted':
                  logging.info(f'The instance {resource_id} state is {state}')
                                      
      Main函数写法如下:
      mns_client = MNSClient()
      
      mns_client.regist_listener(ListenerLog())
      
      mns_client.run()

      实际生产环境下,可能需要将事件存储在数据库或日志服务SLS中,方便后期的搜索和审计。

  • 实践二:ECS主机关机自动重启。

    在某些场景下,ECS主机会非预期的关机,您可能需要自动重启已经关机的ECS主机。

    为了实现ECS主机关机后自动重启,您可以复用实践一中的MNS Client,添加一个新的Listener。当您收到Stopped事件时,对该ECS主机执行Start命令。

    # -*- coding: utf-8 -*-
    import logging
    from aliyunsdkecs.request.v20140526 import StartInstanceRequest
    from aliyunsdkcore.client import AcsClient
    from .mns_client import BasicListener
    from .config import Conf
    
    
    class ECSClient(object):
        def __init__(self, acs_client):
            self.client = acs_client
    
        # 启动ECS主机
        def start_instance(self, instance_id):
            logging.info(f'Start instance {instance_id} ...')
            request = StartInstanceRequest.StartInstanceRequest()
            request.set_accept_format('json')
            request.set_InstanceId(instance_id)
            self.client.do_action_with_exception(request)
    
    
    class ListenerStart(BasicListener):
        def __init__(self):
            acs_client = AcsClient(Conf.access_key, Conf.access_key_secret, Conf.region_id)
            self.ecs_client = ECSClient(acs_client)
    
        def process(self, event):
            detail = event['content']
            instance_id = detail['resourceId']
            if detail['state'] == 'Stopped':
                self.ecs_client.start_instance(instance_id)
                        

    在实际生产环境下,执行完Start命令后,可能还需要继续接收后续的Starting、Running或Stopped等事件,再配合计时器和计数器,进行成功或失败之后的处理。

  • 实践三:抢占型实例释放前,自动从负载均衡SLB移除。

    抢占型实例在释放之前五分钟左右,会发出释放告警事件,您可以在这短暂的时间运行业务不中断逻辑,例如:主动从负载均衡SLB的后端服务器中去掉这台即将被释放的抢占型实例,而非被动等待实例释放后负载均衡SLB的自动处理。

    您复用实践一的MNS Client,添加一个新的Listener,当收到抢占型实例的释放告警时,调用负载均衡SLB的SDK。

    # -*- coding: utf-8 -*-
    from aliyunsdkcore.client import AcsClient
    from aliyunsdkcore.request import CommonRequest
    from .mns_client import BasicListener
    from .config import Conf
    
    
    class SLBClient(object):
        def __init__(self):
            self.client = AcsClient(Conf.access_key, Conf.access_key_secret, Conf.region_id)
            self.request = CommonRequest()
            self.request.set_method('POST')
            self.request.set_accept_format('json')
            self.request.set_version('2014-05-15')
            self.request.set_domain('slb.aliyuncs.com')
            self.request.add_query_param('RegionId', Conf.region_id)
    
        def remove_vserver_group_backend_servers(self, vserver_group_id, instance_id):
            self.request.set_action_name('RemoveVServerGroupBackendServers')
            self.request.add_query_param('VServerGroupId', vserver_group_id)
            self.request.add_query_param('BackendServers',
                                         "[{'ServerId':'" + instance_id + "','Port':'80','Weight':'100'}]")
            response = self.client.do_action_with_exception(self.request)
            return str(response, encoding='utf-8')
    
    
    class ListenerSLB(BasicListener):
        def __init__(self, vsever_group_id):
            self.slb_caller = SLBClient()
            self.vsever_group_id = Conf.vsever_group_id
    
        def process(self, event):
            detail = event['content']
            instance_id = detail['instanceId']
            if detail['action'] == 'delete':
                self.slb_caller.remove_vserver_group_backend_servers(self.vsever_group_id, instance_id)
                        
    注意

    抢占型实例释放告警的event name与前面不同,应该是mns_client.regist_listener(ListenerSLB(Conf.vsever_group_id), 'Instance:PreemptibleInstanceInterruption')

    在实际生产环境下,您需要再申请一台新的抢占型实例,挂载到负载均衡SLB,来保证服务能力。