asg_lifcycle_watcher.py 8.7 KB
Newer Older
e0d committed
1
__author__ = 'e0d'
e0d committed
2 3 4 5 6 7 8 9 10 11 12 13 14 15

"""
Retrieves AWS Auto-scaling lifecycle messages from an SQS queue and processes them.  For
the LifecycleTransition type of autoscaling:EC2_INSTANCE_TERMINATING, ec2 instances are inspected
for a safe_to_retire tag.  If that tag exists with the value 'true', the termination state
transition is continued; if not, the lifecycle timeout is extended.

Because the lifecycle commands are not yet available in boto, these commands are, unfortunately,
run via a subprocess call to the awscli.  This should be fixed when boto is updated.

This script is meant to be run periodically via some process automation, say, Jenkins.

It relies on some component applying the proper tags and performing pre-retirement activities.

e0d committed
16
./sqs.py -q autoscaling-lifecycle-queue -b /home/you/.virtualenvs/aws/bin --hook MyLifeCycleHook
e0d committed
17 18 19 20
"""

import argparse
import boto
21 22
import boto.ec2
import boto.sqs
e0d committed
23 24 25 26
import json
import subprocess
from boto.sqs.message import RawMessage
import logging
27 28 29
import os
from distutils import spawn

30 31
class MissingHostError(Exception):
    """
    Raised when an SQS lifecycle message references an EC2 instance that
    cannot be found.
    """
e0d committed
32 33 34

class LifecycleHandler(object):
    """
    Drives Auto Scaling termination lifecycle hooks from SQS notifications.

    Messages are read from the configured SQS queue.  For instances entering
    autoscaling:EC2_INSTANCE_TERMINATING, the instance's safe_to_retire tag
    decides whether the lifecycle action is completed (tag value 'true') or
    a heartbeat is recorded to extend the hook timeout.  The autoscaling
    commands are issued by running the AWS cli in a subprocess.
    """

    INSTANCE_TERMINATION = 'autoscaling:EC2_INSTANCE_TERMINATING'
    TEST_NOTIFICATION = 'autoscaling:TEST_NOTIFICATION'
    # Arguments handed to queue.get_messages(): the number of messages to
    # request per poll and the wait_time_seconds value.
    NUM_MESSAGES = 10
    WAIT_TIME_SECONDS = 10

    def __init__(self, profile, queue, hook, dry_run, bin_directory=None):
        """
        Set up the cli command prefix and the boto EC2/SQS connections.

        profile: profile name passed to the AWS cli via --profile, or None.
        queue: name of the SQS queue containing the lifecycle messages.
        hook: name of the lifecycle hook to act upon.
        dry_run: when true, commands and deletions are logged, not executed.
        bin_directory: optional directory prepended to PATH before the
            'aws' and 'python' executables are looked up.
        """
        logging.basicConfig(level=logging.INFO)
        self.queue = queue
        self.hook = hook
        self.profile = profile
        if bin_directory:
            os.environ["PATH"] = bin_directory + os.pathsep + os.environ["PATH"]
        self.aws_bin = spawn.find_executable('aws')
        self.python_bin = spawn.find_executable('python')
        if self.aws_bin is None or self.python_bin is None:
            # find_executable returns None when nothing is found on PATH; the
            # cli commands cannot run in that case, so say so up front.
            logging.warning(
                "Could not locate executables on PATH (aws={aws}, python={python}).".format(
                    aws=self.aws_bin, python=self.python_bin))
        self.region = os.environ.get('AWS_REGION', 'us-east-1')

        # Kept as a string attribute for backward compatibility; the commands
        # actually executed are built as argument lists by _build_cli_args.
        self.base_cli_command = "{python_bin} {aws_bin} ".format(
            python_bin=self.python_bin,
            aws_bin=self.aws_bin)
        if self.profile:
            self.base_cli_command += "--profile {profile} ".format(profile=self.profile)
        if self.region:
            self.base_cli_command += "--region {region} ".format(region=self.region)

        self.dry_run = dry_run
        self.ec2_con = boto.ec2.connect_to_region(self.region)
        self.sqs_con = boto.sqs.connect_to_region(self.region)

    def _build_cli_args(self, *command_args):
        """
        Return the AWS cli invocation as a list of arguments: the python and
        aws executables, the optional --profile/--region options, then
        command_args.  Using a list keeps values that contain whitespace
        intact as single arguments.
        """
        args = [self.python_bin, self.aws_bin]
        if self.profile:
            args.extend(["--profile", self.profile])
        if self.region:
            args.extend(["--region", self.region])
        args.extend(command_args)
        return args

    def process_lifecycle_messages(self):
        """
        Fetch one batch of messages from the queue and process each of them.

        Raises ValueError if the queue cannot be found and NotImplementedError
        if a message is neither a termination nor a test notification.
        """
        queue = self.sqs_con.get_queue(self.queue)
        if queue is None:
            raise ValueError("SQS queue {queue} was not found in region {region}.".format(
                queue=self.queue, region=self.region))

        # Needed to get unencoded message for ease of processing
        queue.set_message_class(RawMessage)

        for sqs_message in queue.get_messages(LifecycleHandler.NUM_MESSAGES,
                                              wait_time_seconds=LifecycleHandler.WAIT_TIME_SECONDS):
            self._process_message(queue, sqs_message)

    def _process_message(self, queue, sqs_message):
        """
        Decode a single SQS message and dispatch on its type.

        The SQS body is a JSON envelope whose 'Message' field holds the
        JSON-encoded autoscaling notification.
        """
        body = json.loads(sqs_message.get_body_encoded())
        as_message = json.loads(body['Message'])
        logging.info("Proccessing message {message}.".format(message=as_message))

        if as_message.get('LifecycleTransition') == LifecycleHandler.INSTANCE_TERMINATION:
            self._handle_termination(queue, sqs_message, as_message)
        # These notifications are sent when configuring a new lifecycle hook, they can be
        # deleted safely
        elif as_message.get('Event') == LifecycleHandler.TEST_NOTIFICATION:
            self.delete_sqs_message(queue, sqs_message, as_message, self.dry_run)
        else:
            # Prefer an id on the notification itself, falling back to the
            # envelope so that reporting the problem cannot raise a KeyError.
            message_id = as_message.get('MessageId', body.get('MessageId'))
            raise NotImplementedError("Encountered message, {message_id}, of unexpected type.".format(
                message_id=message_id))

    def _handle_termination(self, queue, sqs_message, as_message):
        """
        Complete the lifecycle action when the instance is tagged as safe to
        retire, otherwise record a heartbeat so the hook does not time out.
        """
        instance_id = as_message['EC2InstanceId']
        asg = as_message['AutoScalingGroupName']
        token = as_message['LifecycleActionToken']

        try:
            if self.verify_ok_to_retire(instance_id):
                logging.info("Host is marked as OK to retire, retiring {instance}".format(
                    instance=instance_id))

                self.continue_lifecycle(asg, token, self.hook)
                self.delete_sqs_message(queue, sqs_message, as_message, self.dry_run)
            else:
                # The message is deliberately left on the queue so the
                # instance is checked again on a later run.
                logging.info("Recording lifecycle heartbeat for instance {instance}".format(
                    instance=instance_id))

                self.record_lifecycle_action_heartbeat(asg, token, self.hook)
        except MissingHostError as mhe:
            logging.exception(mhe)
            # There is nothing we can do to recover from this, so we
            # still delete the message
            self.delete_sqs_message(queue, sqs_message, as_message, self.dry_run)

    def delete_sqs_message(self, queue, sqs_message, as_message, dry_run):
        """
        Delete sqs_message from queue, or only log the deletion when dry_run
        is true.  as_message is used for logging only.
        """
        if dry_run:
            logging.info("Would have deleted message with body {message}".format(message=as_message))
            return

        logging.info("Deleting message with body {message}".format(message=as_message))
        self.sqs_con.delete_message(queue, sqs_message)

    def record_lifecycle_action_heartbeat(self, asg, token, hook):
        """
        Run 'aws autoscaling record-lifecycle-action-heartbeat' for the given
        auto scaling group, lifecycle action token and hook.
        """
        command = self._build_cli_args(
            "autoscaling", "record-lifecycle-action-heartbeat",
            "--lifecycle-hook-name", hook,
            "--auto-scaling-group-name", asg,
            "--lifecycle-action-token", token)

        self.run_subprocess_command(command, self.dry_run)

    def continue_lifecycle(self, asg, token, hook):
        """
        Run 'aws autoscaling complete-lifecycle-action' with the result
        CONTINUE for the given auto scaling group, token and hook.
        """
        command = self._build_cli_args(
            "autoscaling", "complete-lifecycle-action",
            "--lifecycle-hook-name", hook,
            "--auto-scaling-group-name", asg,
            "--lifecycle-action-token", token,
            "--lifecycle-action-result", "CONTINUE")

        self.run_subprocess_command(command, self.dry_run)

    def run_subprocess_command(self, command, dry_run):
        """
        Execute command and log its output, or only log it when dry_run is
        true.

        command may be a list/tuple of arguments (used as is) or a string,
        which is split on single spaces.  Any exception raised while running
        the command is logged and re-raised.
        """
        if isinstance(command, (list, tuple)):
            args = list(command)
        else:
            args = command.split(' ')

        message = "Running command {command}.".format(
            command=' '.join(str(arg) for arg in args))

        if dry_run:
            logging.info("Dry run: {message}".format(message=message))
            return

        logging.info(message)
        try:
            output = subprocess.check_output(args)
            logging.info("Output was {output}".format(output=output))
        except Exception as e:
            logging.exception(e)
            # Bare raise keeps the original traceback.
            raise

    def get_ec2_instance_by_id(self, instance_id):
        """
        Simple boto call to get the instance based on the instance-id.

        Returns the instance when exactly one matches, otherwise None.  An
        InvalidInstanceID.NotFound error from EC2 is also reported as None.
        """
        try:
            instances = self.ec2_con.get_only_instances([instance_id])
        except boto.exception.EC2ResponseError as error:
            if error.error_code == 'InvalidInstanceID.NotFound':
                return None
            raise

        # Reuse the result of the single lookup instead of querying again.
        if len(instances) == 1:
            return instances[0]
        return None

    def verify_ok_to_retire(self, instance_id):
        """
        Ensure that the safe_to_retire tag has been added to the instance in question
        with the value 'true' (compared case-insensitively).

        Returns True or False accordingly and raises MissingHostError when
        no instance can be found for instance_id.
        """
        instance = self.get_ec2_instance_by_id(instance_id)

        if instance is None:
            # No instance for id in SQS message this can happen if something else
            # has terminated the instances outside of this workflow
            message = "Instance with id {id} is referenced in an SQS message, but does not exist.".\
                format(id=instance_id)
            raise MissingHostError(message)

        if instance.tags.get('safe_to_retire', '').lower() == 'true':
            logging.info("Instance with id {id} is safe to retire.".format(id=instance_id))
            return True

        logging.info("Instance with id {id} is not safe to retire.".format(id=instance_id))
        return False
e0d committed
184 185 186 187 188 189

def build_arg_parser():
    """
    Build the command line parser for this script.

    --queue and --hook are required; --profile and --bin-directory default
    to None and --dry-run defaults to False.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-p', '--profile', default=None,
                        help='The profile to pass to the AWS cli '
                             'via --profile (optional)')
    parser.add_argument('-b', '--bin-directory', required=False, default=None,
                        help='The bin directory of the virtual env '
                             'from which to run the AWS cli (optional)')
    parser.add_argument('-q', '--queue', required=True,
                        help="The SQS queue containing the lifecycle messages")

    parser.add_argument('--hook', required=True,
                        help="The lifecycle hook to act upon.")

    parser.add_argument('-d', "--dry-run", dest="dry_run", action="store_true",
                        help='Print the commands, but do not do anything')
    parser.set_defaults(dry_run=False)
    return parser


if __name__ == "__main__":
    args = build_arg_parser().parse_args()

    lh = LifecycleHandler(args.profile, args.queue, args.hook, args.dry_run, args.bin_directory)
    lh.process_lifecycle_messages()