__author__ = 'e0d'

"""
Retrieves AWS Auto Scaling lifecycle messages from an SQS queue and processes them.  For
the LifecycleTransition type autoscaling:EC2_INSTANCE_TERMINATING, EC2 instances are inspected
for a safe_to_retire tag.  If that tag is set to 'true' (case-insensitive), the termination state
transition is continued; if not, the lifecycle timeout is extended by recording a lifecycle heartbeat.

Because the lifecycle commands are not yet available in boto, these commands are, unfortunately,
run via a subprocess call to the awscli.  This should be fixed when boto is updated.

This script is meant to be run periodically via some process automation, say, Jenkins.

It relies on some component applying the proper tags and performing pre-retirement activities.

Example usage:

./asg_lifcycle_watcher.py -q autoscaling-lifecycle-queue -b /home/you/.virtualenvs/aws/bin --hook MyLifeCycleHook
"""

import argparse
import json
import logging
import os
import subprocess
from distutils import spawn

import boto
from boto.sqs.message import RawMessage


class MissingHostError(Exception):
    """Raised when an instance referenced by a lifecycle message cannot be found in EC2."""


class LifecycleHandler:
    """
    Poll an SQS queue for Auto Scaling lifecycle notifications and act on them.

    Termination notifications (autoscaling:EC2_INSTANCE_TERMINATING) are completed with
    CONTINUE when the instance has a ``safe_to_retire`` tag whose value is 'true'
    (case-insensitive); otherwise a lifecycle-action heartbeat is recorded.  Test
    notifications are deleted.  Lifecycle commands are executed through the awscli in a
    subprocess.
    """

    INSTANCE_TERMINATION = 'autoscaling:EC2_INSTANCE_TERMINATING'
    TEST_NOTIFICATION = 'autoscaling:TEST_NOTIFICATION'
    # Maximum number of messages requested from SQS per poll.
    NUM_MESSAGES = 10
    # Long-poll wait (seconds) passed to SQS when fetching messages.
    WAIT_TIME_SECONDS = 10

    def __init__(self, profile, queue, hook, dry_run, bin_directory=None):
        """
        :param profile: profile passed to the awscli as ``--profile``; omitted when falsy.
        :param queue: name of the SQS queue holding the lifecycle messages.
        :param hook: name of the lifecycle hook to act upon.
        :param dry_run: when true, log commands and deletions instead of performing them.
        :param bin_directory: optional directory prepended to PATH before looking up the
            ``aws`` and ``python`` executables (e.g. a virtualenv's bin directory).
        """
        logging.basicConfig(level=logging.INFO)
        self.queue = queue
        self.hook = hook
        self.profile = profile

        if bin_directory:
            current_path = os.environ.get("PATH")
            os.environ["PATH"] = (bin_directory + os.pathsep + current_path
                                  if current_path else bin_directory)
        # NOTE(review): distutils was removed in Python 3.12; shutil.which is the
        # Python 3 replacement for find_executable.
        self.aws_bin = spawn.find_executable('aws')
        self.python_bin = spawn.find_executable('python')
        if not self.aws_bin:
            logging.warning("Could not find the aws executable on PATH; "
                            "lifecycle commands will fail.")

        self.base_cli_command = self._build_base_cli_command(
            self.python_bin, self.aws_bin, self.profile)

        self.dry_run = dry_run
        # NOTE(review): the profile is only applied to awscli calls; these boto
        # connections use boto's default credential lookup. Confirm that is intended.
        self.ec2_con = boto.connect_ec2()
        self.sqs_con = boto.connect_sqs()

    @staticmethod
    def _build_base_cli_command(python_bin, aws_bin, profile):
        """
        Return the awscli invocation prefix shared by every lifecycle command.

        The result ends with a single space so a sub-command can be appended directly.
        ``--profile`` is included only when a profile is given, so an unset profile does
        not turn into ``--profile None``.
        """
        parts = [str(python_bin), str(aws_bin)]
        if profile:
            parts.extend(['--profile', profile])
        return ' '.join(parts) + ' '

    def process_lifecycle_messages(self):
        """
        Fetch up to NUM_MESSAGES messages from the configured queue and handle each one.

        :raises ValueError: if the configured SQS queue cannot be found.
        :raises NotImplementedError: for a message that is neither a termination
            transition nor a test notification.
        """
        queue = self.sqs_con.get_queue(self.queue)
        if queue is None:
            raise ValueError("SQS queue {queue} was not found.".format(queue=self.queue))

        # Needed to get unencoded message for ease of processing
        queue.set_message_class(RawMessage)

        for sqs_message in queue.get_messages(LifecycleHandler.NUM_MESSAGES,
                                              wait_time_seconds=LifecycleHandler.WAIT_TIME_SECONDS):
            self._process_message(queue, sqs_message)

    def _process_message(self, queue, sqs_message):
        """Decode one SQS message and dispatch it by Auto Scaling notification type."""
        # The SQS body is a JSON envelope whose 'Message' field is itself JSON
        # holding the Auto Scaling notification.
        body = json.loads(sqs_message.get_body_encoded())
        as_message = json.loads(body['Message'])
        logging.info("Proccessing message {message}.".format(message=as_message))

        if as_message.get('LifecycleTransition') == LifecycleHandler.INSTANCE_TERMINATION:
            self._handle_termination(queue, sqs_message, as_message)
        # These notifications are sent when configuring a new lifecycle hook, they can be
        # deleted safely
        elif as_message.get('Event') == LifecycleHandler.TEST_NOTIFICATION:
            self.delete_sqs_message(queue, sqs_message, as_message, self.dry_run)
        else:
            # The inner notification carries no MessageId; use the envelope's id instead.
            message_id = body.get('MessageId', getattr(sqs_message, 'id', None))
            raise NotImplementedError("Encountered message, {message_id}, of unexpected type.".format(
                message_id=message_id))

    def _handle_termination(self, queue, sqs_message, as_message):
        """Continue the termination if the instance is safe to retire, otherwise heartbeat."""
        instance_id = as_message['EC2InstanceId']
        asg = as_message['AutoScalingGroupName']
        token = as_message['LifecycleActionToken']

        try:
            ok_to_retire = self.verify_ok_to_retire(instance_id)
        except MissingHostError as mhe:
            logging.exception(mhe)
            # There is nothing we can do to recover from this, so we
            # still delete the message
            self.delete_sqs_message(queue, sqs_message, as_message, self.dry_run)
            return

        if ok_to_retire:
            logging.info("Host is marked as OK to retire, retiring {instance}".format(
                instance=instance_id))
            self.continue_lifecycle(asg, token, self.hook)
            self.delete_sqs_message(queue, sqs_message, as_message, self.dry_run)
        else:
            # The message is left on the queue so it is seen again on a later run.
            logging.info("Recording lifecycle heartbeat for instance {instance}".format(
                instance=instance_id))
            self.record_lifecycle_action_heartbeat(asg, token, self.hook)

    def delete_sqs_message(self, queue, sqs_message, as_message, dry_run):
        """Delete ``sqs_message`` from ``queue`` unless ``dry_run`` is set (then only log)."""
        if not dry_run:
            logging.info("Deleting message with body {message}".format(message=as_message))
            self.sqs_con.delete_message(queue, sqs_message)
        else:
            logging.info("Would have deleted message with body {message}".format(message=as_message))

    def record_lifecycle_action_heartbeat(self, asg, token, hook):
        """Extend the lifecycle action timeout by recording a heartbeat through the awscli."""
        command = self.base_cli_command + (
            "autoscaling record-lifecycle-action-heartbeat "
            "--lifecycle-hook-name {hook} "
            "--auto-scaling-group-name {asg} "
            "--lifecycle-action-token {token}").format(hook=hook, asg=asg, token=token)

        self.run_subprocess_command(command, self.dry_run)

    def continue_lifecycle(self, asg, token, hook):
        """Complete the lifecycle action with result CONTINUE through the awscli."""
        command = self.base_cli_command + (
            "autoscaling complete-lifecycle-action --lifecycle-hook-name {hook} "
            "--auto-scaling-group-name {asg} --lifecycle-action-token {token} --lifecycle-action-result "
            "CONTINUE").format(hook=hook, asg=asg, token=token)

        self.run_subprocess_command(command, self.dry_run)

    def run_subprocess_command(self, command, dry_run):
        """
        Run a whitespace-separated ``command`` and log its output.

        With ``dry_run`` set, the command is only logged.

        :raises Exception: whatever ``subprocess.check_output`` raises, after logging it.
        """
        message = "Running command {command}.".format(command=command)

        if dry_run:
            logging.info("Dry run: {message}".format(message=message))
            return

        logging.info(message)
        try:
            # split() (not split(' ')) so runs of whitespace never yield empty arguments.
            output = subprocess.check_output(command.split())
        except Exception as e:
            logging.exception(e)
            raise  # bare raise keeps the original traceback
        logging.info("Output was {output}".format(output=output))

    def get_ec2_instance_by_id(self, instance_id):
        """
        Return the EC2 instance with ``instance_id``, or None unless exactly one matches.
        """
        # NOTE(review): boto may raise EC2ResponseError for an unknown id instead of
        # returning an empty list — confirm which happens for terminated instances.
        instances = self.ec2_con.get_only_instances([instance_id])

        if len(instances) == 1:
            return instances[0]
        return None

    def verify_ok_to_retire(self, instance_id):
        """
        Return True if the instance's ``safe_to_retire`` tag equals 'true' (case-insensitive).

        :raises MissingHostError: if no instance with ``instance_id`` is found.
        """
        instance = self.get_ec2_instance_by_id(instance_id)

        if not instance:
            # No instance for id in SQS message this can happen if something else
            # has terminated the instances outside of this workflow
            message = "Instance with id {id} is referenced in an SQS message, but does not exist.".\
                format(id=instance_id)
            raise MissingHostError(message)

        if instance.tags.get('safe_to_retire', '').lower() == 'true':
            logging.info("Instance with id {id} is safe to retire.".format(id=instance_id))
            return True

        logging.info("Instance with id {id} is not safe to retire.".format(id=instance_id))
        return False


def parse_args(argv=None):
    """
    Parse the watcher's command-line arguments.

    :param argv: argument list to parse; defaults to ``sys.argv[1:]`` when None.
    """
    parser = argparse.ArgumentParser(
        description="Process Auto Scaling lifecycle messages from an SQS queue.")
    parser.add_argument('-p', '--profile', default=None,
                        help='The profile passed to the AWS CLI as --profile (optional)')
    parser.add_argument('-b', '--bin-directory', required=False, default=None,
                        help='The bin directory of the virtual env '
                             'from which to run the AWS cli (optional)')
    parser.add_argument('-q', '--queue', required=True,
                        help="The SQS queue containing the lifecycle messages")
    parser.add_argument('--hook', required=True,
                        help="The lifecycle hook to act upon.")
    # store_true already defaults to False, so no set_defaults call is needed.
    parser.add_argument('-d', "--dry-run", dest="dry_run", action="store_true",
                        help='Print the commands, but do not do anything')
    return parser.parse_args(argv)


def main(argv=None):
    """Parse arguments and process one batch of lifecycle messages."""
    args = parse_args(argv)
    lh = LifecycleHandler(args.profile, args.queue, args.hook, args.dry_run, args.bin_directory)
    lh.process_lifecycle_messages()


if __name__ == "__main__":
    main()