sqs.py 5.48 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
# Copyright 2013 John Jarvis <john@jarv.org>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.


import os
import sys
import time
import json
23
import socket
24
try:
25 26 27 28
    import boto
except ImportError:
    boto = None
else:
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
    import boto.sqs
    from boto.exception import NoAuthHandlerFound


class CallbackModule(object):
    """
    This Ansible callback plugin sends task events
    to SQS.

    The following vars must be set in the environment:
        ANSIBLE_ENABLE_SQS - enables the callback module
        SQS_REGION - AWS region to connect to
        SQS_MSG_PREFIX - Additional data that will be put
                         on the queue (optional)

    The following events are put on the queue
        - FAILURE events
        - OK events
        - TASK events
        - START events
    """
    def __init__(self):
51 52 53
        self.enable_sqs = 'ANSIBLE_ENABLE_SQS' in os.environ
        if not self.enable_sqs:
            return
54

55 56 57 58 59 60
        # make sure we got our imports
        if not boto:
            raise ImportError(
                "The sqs callback module requires the boto Python module, "
                "which is not installed or was not found."
            )
61

62
        self.start_time = time.time()
63

64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
        if not 'SQS_REGION' in os.environ:
            print 'ANSIBLE_ENABLE_SQS enabled but SQS_REGION ' \
                  'not defined in environment'
            sys.exit(1)
        self.region = os.environ['SQS_REGION']
        try:
            self.sqs = boto.sqs.connect_to_region(self.region)
        except NoAuthHandlerFound:
            print 'ANSIBLE_ENABLE_SQS enabled but cannot connect ' \
                  'to AWS due invalid credentials'
            sys.exit(1)
        if not 'SQS_NAME' in os.environ:
            print 'ANSIBLE_ENABLE_SQS enabled but SQS_NAME not ' \
                  'defined in environment'
            sys.exit(1)
        self.name = os.environ['SQS_NAME']
        self.queue = self.sqs.create_queue(self.name)
        if 'SQS_MSG_PREFIX' in os.environ:
            self.prefix = os.environ['SQS_MSG_PREFIX']
83
        else:
84 85 86
            self.prefix = ''

        self.last_seen_ts = {}
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138

    def runner_on_failed(self, host, res, ignore_errors=False):
        if self.enable_sqs:
            if not ignore_errors:
                self._send_queue_message(res, 'FAILURE')

    def runner_on_ok(self, host, res):
        if self.enable_sqs:
            # don't send the setup results
            if res['invocation']['module_name'] != "setup":
                self._send_queue_message(res, 'OK')

    def playbook_on_task_start(self, name, is_conditional):
        if self.enable_sqs:
            self._send_queue_message(name, 'TASK')

    def playbook_on_play_start(self, pattern):
        if self.enable_sqs:
            self._send_queue_message(pattern, 'START')

    def playbook_on_stats(self, stats):
        if self.enable_sqs:
            d = {}
            delta = time.time() - self.start_time
            d['delta'] = delta
            for s in ['changed', 'failures', 'ok', 'processed', 'skipped']:
                d[s] = getattr(stats, s)
            self._send_queue_message(d, 'STATS')

    def _send_queue_message(self, msg, msg_type):
        if self.enable_sqs:
            from_start = time.time() - self.start_time
            payload = {msg_type: msg}
            payload['TS'] = from_start
            payload['PREFIX'] = self.prefix
            # update the last seen timestamp for
            # the message type
            self.last_seen_ts[msg_type] = time.time()
            if msg_type in ['OK', 'FAILURE']:
                # report the delta between the OK/FAILURE and
                # last TASK
                if 'TASK' in self.last_seen_ts:
                    from_task = \
                        self.last_seen_ts[msg_type] - self.last_seen_ts['TASK']
                    payload['delta'] = from_task
                for output in ['stderr', 'stdout']:
                    if output in payload[msg_type]:
                        # only keep the last 1000 characters
                        # of stderr and stdout
                        if len(payload[msg_type][output]) > 1000:
                            payload[msg_type][output] = "(clipping) ... " \
                                    + payload[msg_type][output][-1000:]
John Jarvis committed
139 140 141 142
                if 'stdout_lines' in payload[msg_type]:
                    # only keep the last 20 or so lines to avoid payload size errors
                    if len(payload[msg_type]['stdout_lines']) > 20:
                        payload[msg_type]['stdout_lines'] = ['(clipping) ... '] + payload[msg_type]['stdout_lines'][-20:]
143 144 145 146 147 148 149
            while True:
                try:
                    self.sqs.send_message(self.queue, json.dumps(payload))
                    break
                except socket.gaierror as e:
                    print 'socket.gaierror will retry: ' + e
                    time.sleep(1)
150 151
                except Exception as e:
                    raise e