Commit b605265b by Gabe Mulley

Support more complex routing with nested backends

parent bfdfcf48
[pep8]
max-line-length=119
max-line-length=120
......@@ -19,7 +19,7 @@ test.setup:
test: test.unit test.integration test.performance
test.unit: test.setup
nosetests --cover-erase --with-coverage -A 'not integration and not performance' --cover-min-percentage=95
nosetests --cover-erase --with-coverage --cover-branches -A 'not integration and not performance' --cover-min-percentage=95
test.integration: test.setup
nosetests --verbose --nocapture -a 'integration'
......@@ -43,3 +43,8 @@ doc: doc.html
doc.html:
$(MAKE_DOC) html
report:
pep8 eventtracking >pep8.report || true
pylint -f parseable eventtracking >pylint.report || true
coverage xml -o coverage.xml
......@@ -18,7 +18,7 @@ import os
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#sys.path.insert(0, os.path.abspath('.'))
sys.path.insert(0, os.path.abspath('..'))
# -- General configuration ------------------------------------------------
......
......@@ -14,3 +14,31 @@ eventtracking.backends.mongodb
:members:
:undoc-members:
:show-inheritance:
eventtracking.backends.logger
-----------------------------
.. automodule:: eventtracking.backends.logger
:members:
:undoc-members:
:show-inheritance:
eventtracking.backends.routing
------------------------------
.. automodule:: eventtracking.backends.routing
:members:
:undoc-members:
:show-inheritance:
eventtracking.backends.segment
------------------------------
.. automodule:: eventtracking.backends.segment
:members:
:undoc-members:
:show-inheritance:
......@@ -5,12 +5,3 @@ eventtracking.django
:members:
:undoc-members:
:show-inheritance:
eventtracking.django.middleware
-------------------------------
.. automodule:: eventtracking.django.middleware
:members:
:undoc-members:
:show-inheritance:
eventtracking.processors
========================
.. automodule:: eventtracking.processors
:members:
:undoc-members:
:show-inheritance:
eventtracking.processors.whitelist
----------------------------------
.. automodule:: eventtracking.processors.whitelist
:members:
:undoc-members:
:show-inheritance:
eventtracking.processors.exceptions
-----------------------------------
.. automodule:: eventtracking.processors.exceptions
:members:
:undoc-members:
:show-inheritance:
......@@ -15,6 +15,7 @@ eventtracking
eventtracking.backends
eventtracking.django
eventtracking.processors
eventtracking.tracker
......
"""Route events to processors and backends"""
from collections import OrderedDict
import logging
from eventtracking.processors.exceptions import EventEmissionExit
LOG = logging.getLogger(__name__)
class RoutingBackend(object):
"""
Route events to the appropriate backends.
A routing backend has two types of components:
1) Processors - These are run sequentially, processing the output of the previous processor. If you had three
processors [a, b, c], the output of the processing step would be `c(b(a(event)))`. Note that for performance
reasons, the processor is able to actually mutate the event dictionary in-place. Event dictionaries may be large
and highly nested, so creating multiple copies could be problematic. A processor can also choose to prevent the
event from being emitted by raising `EventEmissionExit`. Doing so will prevent any subsequent processors from
running and prevent the event from being sent to the backends. Any other exception raised by a processor will be
logged and swallowed, subsequent processors will execute and the event will be emitted.
2) Backends - Backends are intended to not mutate the event and each receive the same event data. They are not
chained like processors. Once an event has been processed by the processor chain, it is passed to each backend in
order, sorted by the name of the backend. Backends typically persist the event in some way, either by sending it
to an external system or saving it to disk. They are called synchronously and in sequence, so a long running
backend will block other backends until it is done persisting the event. Note that you can register another
`RoutingBackend` as a backend of a `RoutingBackend`, allowing for arbitrary processing trees.
`backends` is a collection that supports iteration over it's items using `iteritems()`. The keys are expected to be
sortable and the values are expected to expose a `send(event)` method that will be called for each event.
`processors` is an iterable of callables.
Raises a `ValueError` if any of the provided backends do not have a callable "send" attribute or any of the
processors are not callable.
"""
def __init__(self, backends=None, processors=None):
self.backends = OrderedDict()
self.processors = []
if backends is not None:
for name in sorted(backends.keys()):
self.register_backend(name, backends[name])
if processors is not None:
for processor in processors:
self.register_processor(processor)
def register_backend(self, name, backend):
"""
Register a new backend that will be called for each processed event
Note that backends are called in the order that they are registered.
"""
if not hasattr(backend, 'send') or not callable(backend.send):
raise ValueError('Backend %s does not have a callable "send" method.' % backend.__class__.__name__)
else:
self.backends[name] = backend
def register_processor(self, processor):
"""
Register a new processor.
Note that processors are called in the order that they are registered.
"""
if not callable(processor):
raise ValueError('Processor %s is not callable.' % processor.__class__.__name__)
else:
self.processors.append(processor)
def send(self, event):
"""
Process the event using all registered processors and send it to all registered backends.
Logs and swallows all `Exception`.
"""
try:
processed_event = self.process_event(event)
except EventEmissionExit:
return
else:
self.send_to_backends(processed_event)
def process_event(self, event):
"""
Executes all event processors on the event in order.
`event` is a nested dictionary that represents the event.
Logs and swallows all `Exception` except `EventEmissionExit` which is re-raised if it is raised by a processor.
Returns the modified event.
"""
if len(self.processors) == 0:
return event
processed_event = event
for processor in self.processors:
try:
modified_event = processor(processed_event)
if modified_event is not None:
processed_event = modified_event
except EventEmissionExit:
raise
except Exception: # pylint: disable=broad-except
LOG.exception(
'Failed to execute processor: %s', str(processor)
)
return processed_event
def send_to_backends(self, event):
"""
Sends the event to all registered backends.
Logs and swallows all `Exception`.
"""
for name, backend in self.backends.iteritems():
try:
backend.send(event)
except Exception: # pylint: disable=broad-except
LOG.exception(
'Unable to send event to backend: %s', name
)
"""Event tracking backend that sends events to segment.com"""
from __future__ import absolute_import
try:
import analytics
except ImportError:
analytics = None
class SegmentBackend(object):
"""
Send events to segment.com
It is assumed that other code elsewhere initializes the segment.com API and makes calls to analytics.identify.
Requires all emitted events to have the following structure (at a minimum)::
{
'name': 'something',
'context': {
'user_id': 10,
}
}
Additionally, the following fields can optionally be defined::
{
'context': {
'client_id': "your google analytics client id",
}
}
Note that although some parts of the event are lifted out to pass explicitly into the Segment.com API, the entire
event is sent as the payload to segment.com, which includes all context, data and other fields in the event.
"""
def send(self, event):
"""Use the segment.com python API to send the event to segment.com"""
if analytics is None:
return
context = event.get('context', {})
user_id = context.get('user_id')
name = event.get('name')
if name is None or user_id is None:
return
segment_context = {}
ga_client_id = context.get('client_id')
if ga_client_id is not None:
segment_context['Google Analytics'] = {
'clientId': ga_client_id
}
analytics.track(
user_id,
name,
event,
context=segment_context
)
......@@ -53,7 +53,7 @@ class PerformanceTestCase(TestCase):
self.num_events = int(os.getenv('EVENT_TRACKING_PERF_EVENTS', 20000))
self.payload_size = int(os.getenv('EVENT_TRACKING_PERF_PAYLOAD_SIZE', 600))
self.random_payload = ''.join(random.choice(string.ascii_letters) for _ in range(self.payload_size))
self.threshold = float(os.getenv('EVENT_TRACKING_PERF_THRESHOLD_SECONDS', 1))
self.threshold = float(os.getenv('EVENT_TRACKING_PERF_THRESHOLD_SECONDS', 1.75))
@contextmanager
def assert_execution_time_less_than_threshold(self):
......
"""Test the routing backend"""
from __future__ import absolute_import
from unittest import TestCase
from mock import MagicMock
from mock import sentinel
from eventtracking.processors.exceptions import EventEmissionExit
from eventtracking.backends.routing import RoutingBackend
class TestRoutingBackend(TestCase):
"""Test the routing backend"""
def setUp(self):
self.sample_event = {'name': sentinel.name}
self.mock_backend = MagicMock()
self.router = RoutingBackend(backends={'0': self.mock_backend})
def test_non_callable_backend(self):
with self.assertRaisesRegexp(ValueError, r'Backend \w+ does not have a callable "send" method.'):
RoutingBackend(backends={
'a': 'b'
})
def test_backend_without_send(self):
with self.assertRaisesRegexp(ValueError, r'Backend \w+ does not have a callable "send" method.'):
RoutingBackend(backends={
'a': object()
})
def test_non_callable_processor(self):
with self.assertRaisesRegexp(ValueError, r'Processor \w+ is not callable.'):
RoutingBackend(processors=[
object()
])
def test_non_callable_processor_simple_type(self):
with self.assertRaisesRegexp(ValueError, r'Processor \w+ is not callable.'):
RoutingBackend(processors=[
'b'
])
def test_single_processor(self):
mock_processor = MagicMock()
router = RoutingBackend(processors=[
mock_processor
])
router.send(self.sample_event)
mock_processor.assert_called_once_with(self.sample_event)
def test_single_backend(self):
mock_backend = MagicMock()
router = RoutingBackend(backends={
'mock0': mock_backend
})
router.send(self.sample_event)
mock_backend.send.assert_called_once_with(self.sample_event)
def test_multiple_backends(self):
backends = {
str(i): MagicMock()
for i in range(5)
}
router = RoutingBackend(backends=backends)
router.send(self.sample_event)
for backend in backends.values():
backend.send.assert_called_once_with(self.sample_event)
def test_backend_failure(self):
backends = {
str(i): MagicMock()
for i in range(5)
}
backends['1'].send.side_effect = RuntimeError
router = RoutingBackend(backends=backends)
router.send(self.sample_event)
for backend in backends.itervalues():
backend.send.assert_called_once_with(self.sample_event)
def test_multiple_processors(self):
processors = [
MagicMock()
for __ in range(5)
]
for processor in processors:
processor.return_value = self.sample_event
router = RoutingBackend(processors=processors)
router.send(self.sample_event)
for processor in processors:
processor.assert_called_once_with(self.sample_event)
def test_multiple_backends_and_processors(self):
backends = {
str(i): MagicMock()
for i in range(5)
}
processors = [
MagicMock()
for __ in range(5)
]
for processor in processors:
processor.return_value = self.sample_event
router = RoutingBackend(backends=backends, processors=processors)
router.send(self.sample_event)
for processor in processors:
processor.assert_called_once_with(self.sample_event)
for backend in backends.values():
backend.send.assert_called_once_with(self.sample_event)
def test_callable_class_processor(self):
class SampleProcessor(object):
"""An event processing class"""
def __call__(self, event):
"""Modify the event type"""
event['name'] = sentinel.changed_name
self.router.register_processor(SampleProcessor())
self.router.send(self.sample_event)
self.assert_single_event_emitted({'name': sentinel.changed_name})
def assert_single_event_emitted(self, event):
"""Assert that the mock backend is called exactly once with the provided event"""
self.mock_backend.send.assert_called_once_with(event)
def test_function_processor(self):
def change_name(event):
"""Modify the event type of the event"""
event['name'] = sentinel.changed_name
return event
self.router.register_processor(change_name)
self.router.send(self.sample_event)
self.assert_single_event_emitted({'name': sentinel.changed_name})
def test_processor_chain(self):
def change_name(event):
"""Modify the event type of the event"""
event['name'] = sentinel.changed_name
return event
def inject_fields(event):
"""Add a couple fields to the event"""
event['other'] = sentinel.other
event['to_remove'] = sentinel.to_remove
return event
def remove_field(event):
"""Remove a field to the event"""
self.assertEquals(event['to_remove'], sentinel.to_remove)
del event['to_remove']
return event
def ensure_modified_event(event):
"""Assert the first processor added a field to the event"""
self.assertEquals(event['name'], sentinel.changed_name)
self.assertEquals(event['other'], sentinel.other)
return event
self.router.register_processor(change_name)
self.router.register_processor(inject_fields)
self.router.register_processor(remove_field)
self.router.register_processor(ensure_modified_event)
self.router.send(self.sample_event)
self.assert_single_event_emitted(
{
'name': sentinel.changed_name,
'other': sentinel.other
}
)
def test_processor_failure(self):
def always_fail(event): # pylint: disable=unused-argument
"""Always raises an error"""
raise ValueError
def change_name(event):
"""Modify the event type of the event"""
event['name'] = sentinel.changed_name
return event
self.router.register_processor(always_fail)
self.router.register_processor(change_name)
self.router.send(self.sample_event)
self.assert_single_event_emitted({'name': sentinel.changed_name})
def test_processor_returns_none(self):
def return_none(event): # pylint: disable=unused-argument
"""Don't return the event"""
pass
self.router.register_processor(return_none)
self.router.send(self.sample_event)
self.assert_single_event_emitted(self.sample_event)
def test_processor_modifies_the_same_event_object(self):
def forget_return(event):
"""Modify the event without returning it"""
event['name'] = sentinel.forgotten_return
def ensure_name_changed(event):
"""Assert the event type has been modified even though the event wasn't returned"""
self.assertEquals(event['name'], sentinel.forgotten_return)
self.router.register_processor(forget_return)
self.router.register_processor(ensure_name_changed)
self.router.send(self.sample_event)
self.assert_single_event_emitted({'name': sentinel.forgotten_return})
def test_processor_abort(self):
def abort_processing(event): # pylint: disable=unused-argument
"""Always abort processing"""
raise EventEmissionExit
def fail_if_called(event): # pylint: disable=unused-argument
"""Fail the test immediately if this is called"""
self.fail('This processor should never be called')
self.router.register_processor(abort_processing)
self.router.register_processor(fail_if_called)
self.router.send(self.sample_event)
self.assertEqual(len(self.mock_backend.mock_calls), 0)
def test_nested_routing_with_abort(self):
mock_abort_processing = MagicMock()
mock_abort_processing.side_effect = EventEmissionExit
left_backend = MagicMock()
left_router = RoutingBackend(backends={'0': left_backend}, processors=[mock_abort_processing])
right_backend = MagicMock()
right_router = RoutingBackend(backends={'0': right_backend})
root_router = RoutingBackend(backends={
'left': left_router,
'right': right_router,
})
root_router.send(self.sample_event)
right_backend.send.assert_called_once_with(self.sample_event)
self.assertEqual(len(left_backend.mock_calls), 0)
mock_abort_processing.assert_called_once_with(self.sample_event)
def test_backend_call_order(self):
class OrderRecordingBackend(object):
"""Keep track of the order that the backends are called in"""
def __init__(self, name, call_order):
self._name = name
self._order = call_order
def send(self, event): # pylint: disable=unused-argument
"""Do nothing except record that this was called"""
self._order.append(self._name)
call_order = []
backends = {
str(i): OrderRecordingBackend(str(i), call_order)
for i in range(5)
}
router = RoutingBackend(backends=backends)
router.send(self.sample_event)
self.assertEqual(call_order, ['0', '1', '2', '3', '4'])
"""Test the segment.com backend"""
from __future__ import absolute_import
from unittest import TestCase
from mock import patch
from mock import sentinel
from eventtracking.backends.segment import SegmentBackend
class TestSegmentBackend(TestCase):
"""Test the segment.com backend"""
def setUp(self):
patcher = patch('eventtracking.backends.segment.analytics')
self.addCleanup(patcher.stop)
self.mock_analytics = patcher.start()
self.backend = SegmentBackend()
def test_simple_emit(self):
event = {
'name': sentinel.name,
'context': {
'user_id': sentinel.user_id
},
'data': {
'foo': sentinel.bar
}
}
self.backend.send(event)
self.mock_analytics.track.assert_called_once_with(sentinel.user_id, sentinel.name, event, context={})
def test_missing_name(self):
event = {}
self.backend.send(event)
self.assert_no_event_emitted()
def assert_no_event_emitted(self):
"""Ensure no event was actually sent to segment.com"""
self.assertEqual(len(self.mock_analytics.mock_calls), 0)
def test_missing_context(self):
event = {
'name': sentinel.name,
}
self.backend.send(event)
self.assert_no_event_emitted()
def test_missing_user_id(self):
event = {
'name': sentinel.name,
'context': {}
}
self.backend.send(event)
self.assert_no_event_emitted()
def test_google_analytics_client_id(self):
event = {
'name': sentinel.name,
'context': {
'user_id': sentinel.user_id,
'client_id': sentinel.client_id
}
}
expected_segment_context = {
'Google Analytics': {
'clientId': sentinel.client_id
}
}
self.backend.send(event)
self.mock_analytics.track.assert_called_once_with(
sentinel.user_id, sentinel.name, event, context=expected_segment_context)
class TestSegmentBackendMissingDependency(TestCase):
"""Test the segment.com backend without the package installed"""
def test_no_analytics_api(self):
event = {
'name': sentinel.name,
'context': {
'user_id': sentinel.user_id
},
'data': {
'foo': sentinel.bar
}
}
backend = SegmentBackend()
backend.send(event)
......@@ -51,16 +51,69 @@ class DjangoTracker(Tracker):
"""
config = getattr(settings, DJANGO_BACKEND_SETTING_NAME, {})
backends = {}
for name, values in config.iteritems():
# Ignore empty values to turn-off default tracker backends
if values and 'ENGINE' in values:
backend = self.instantiate_from_dict(values)
backends[name] = backend
backends = self.instantiate_objects(config)
return backends
def instantiate_objects(self, node):
"""
Recursively traverse a structure to identify dictionaries that represent objects that need to be instantiated
Traverse all values of all dictionaries and all elements of all lists to identify dictionaries that contain the
special "ENGINE" key which indicates that a class of that type should be instantiated and passed all key-value
pairs found in the sibling "OPTIONS" dictionary as keyword arguments.
For example::
tree = {
'a': {
'b': {
'first_obj': {
'ENGINE': 'mypackage.mymodule.Clazz',
'OPTIONS': {
'size': 10,
'foo': 'bar'
}
}
},
'c': [
{
'ENGINE': 'mypackage.mymodule.Clazz2',
'OPTIONS': {
'more_objects': {
'd': {'ENGINE': 'mypackage.foo.Bar'}
}
}
}
]
}
}
root = self.instantiate_objects(tree)
That structure of dicts, lists, and strings will end up with (this example assumes that all keyword arguments to
constructors were saved as attributes of the same name):
assert type(root['a']['b']['first_obj']) == <type 'mypackage.mymodule.Clazz'>
assert root['a']['b']['first_obj'].size == 10
assert root['a']['b']['first_obj'].foo == 'bar'
assert type(root['a']['c'][0]) == <type 'mypackage.mymodule.Clazz2'>
assert type(root['a']['c'][0].more_objects['d']) == <type 'mypackage.foo.Bar'>
"""
result = node
if isinstance(node, dict):
if 'ENGINE' in node:
result = self.instantiate_from_dict(node)
else:
result = {}
for key, value in node.iteritems():
result[key] = self.instantiate_objects(value)
elif isinstance(node, list):
result = []
for child in node:
result.append(self.instantiate_objects(child))
return result
def instantiate_from_dict(self, values):
"""
Constructs an object given a dictionary containing an "ENGINE" key
......@@ -84,6 +137,8 @@ class DjangoTracker(Tracker):
except (ValueError, AttributeError, TypeError, ImportError):
raise ValueError('Cannot find class %s' % name)
options = self.instantiate_objects(options)
return cls(**options)
def create_processors_from_settings(self):
......@@ -107,11 +162,7 @@ class DjangoTracker(Tracker):
"""
config = getattr(settings, DJANGO_PROCESSOR_SETTING_NAME, [])
processors = []
for values in config:
# Ignore empty values to turn-off default tracker backends
if values and 'ENGINE' in values:
processors.append(self.instantiate_from_dict(values))
processors = self.instantiate_objects(config)
return processors
......
......@@ -41,9 +41,8 @@ class TestConfiguration(TestCase):
}
})
def test_ignore_no_engine(self):
self.configure_tracker()
with self.assertRaises(KeyError):
self.tracker.get_backend('no_engine')
with self.assertRaises(ValueError):
self.configure_tracker()
@override_settings(EVENT_TRACKING_BACKENDS={
"empty_engine": {
......@@ -127,6 +126,56 @@ class TestConfiguration(TestCase):
def test_configure_class_not_a_backend(self):
self.assert_fails_to_configure_with_error()
@override_settings(EVENT_TRACKING_BACKENDS={
'outer_backend': {
'ENGINE': 'eventtracking.django.tests.test_configuration.NestedBackend',
'OPTIONS': {
'backends': {
'inner_backend': {
'ENGINE': 'eventtracking.django.tests.test_configuration.FakeBackendWithOptions',
'OPTIONS': {
'option': sentinel.option_value,
'extra_option': sentinel.extra_option_value
}
},
'nested_backend': {
'ENGINE': 'eventtracking.django.tests.test_configuration.NestedBackend',
'OPTIONS': {
'backends': {
'trivial': {
'ENGINE': 'eventtracking.django.tests.test_configuration.TrivialFakeBackend'
}
},
'processors': [
{'ENGINE': 'eventtracking.django.tests.test_configuration.NopProcessor'},
]
}
}
},
'processors': [
{'ENGINE': 'eventtracking.django.tests.test_configuration.NopProcessor'},
{'ENGINE': 'eventtracking.django.tests.test_configuration.NopProcessor'},
]
}
}
})
def test_configure_nested_backends(self):
self.configure_tracker()
outer_backend = self.tracker.get_backend('outer_backend')
self.assertEquals(len(outer_backend.backends), 2)
inner_backend = outer_backend.backends['inner_backend']
self.assertEquals(inner_backend.option, sentinel.option_value)
self.assertEquals(len(outer_backend.processors), 2)
self.assertTrue(isinstance(outer_backend.processors[0], NopProcessor))
self.assertTrue(isinstance(outer_backend.processors[1], NopProcessor))
nested_backend = outer_backend.backends['nested_backend']
self.assertEquals(len(nested_backend.backends), 1)
self.assertTrue(isinstance(nested_backend.backends['trivial'], TrivialFakeBackend))
self.assertTrue(isinstance(nested_backend.processors[0], NopProcessor))
@override_settings(EVENT_TRACKING_ENABLED=True)
def test_overrides_default_tracker(self):
django.override_default_tracker()
......@@ -148,11 +197,39 @@ class TestConfiguration(TestCase):
self.assertTrue(isinstance(self.tracker.processors[0], NopProcessor))
@override_settings(EVENT_TRACKING_PROCESSORS=[
{
'ENGINE': 'eventtracking.django.tests.test_configuration.ProcessorWithOptions',
'OPTIONS': {
'option': sentinel.option_value
}
}
])
def test_processor_with_options(self):
self.configure_tracker()
self.assertEquals(len(self.tracker.processors), 1)
self.assertTrue(isinstance(self.tracker.processors[0], ProcessorWithOptions))
self.assertEqual(self.tracker.processors[0].option, sentinel.option_value)
@override_settings(EVENT_TRACKING_PROCESSORS=[
{}
])
def test_missing_processor_engine(self):
with self.assertRaises(ValueError):
self.configure_tracker()
@override_settings(EVENT_TRACKING_PROCESSORS=[
{
'ENGINE': 'eventtracking.django.tests.test_configuration.NopProcessor'
},
{
'ENGINE': 'eventtracking.django.tests.test_configuration.NopProcessor'
}
])
def test_multiple_processor(self):
self.configure_tracker()
self.assertEquals(len(self.tracker.processors), 0)
self.assertEquals(len(self.tracker.processors), 2)
self.assertTrue(isinstance(self.tracker.processors[0], NopProcessor))
self.assertTrue(isinstance(self.tracker.processors[1], NopProcessor))
class TrivialFakeBackend(object):
......@@ -181,3 +258,21 @@ class NopProcessor(object):
def __call__(self, event):
pass
class ProcessorWithOptions(object):
"""Takes in an argument"""
def __init__(self, **kwargs):
self. option = kwargs.get('option', None)
def __call__(self, event):
pass
class NestedBackend(TrivialFakeBackend):
"""Supports other backends as children"""
def __init__(self, backends=None, processors=None, **_kwargs):
self.backends = backends or {}
self.processors = processors or []
"""Custom exceptions that are raised by this package"""
class EventEmissionExit(Exception):
"""
Raising this exception indicates that no further processing of the event should occur and it should be dropped.
This should only be raised by processors.
"""
pass
"""Test the whitelist processor"""
from __future__ import absolute_import
from unittest import TestCase
from mock import sentinel
from eventtracking.processors.exceptions import EventEmissionExit
from eventtracking.processors.whitelist import WhitelistProcessor
class TestWhitelistProcessor(TestCase):
"""Test the whitelist processor"""
def test_filtering_out(self):
whitelist = WhitelistProcessor(whitelist=[sentinel.allowed_event])
with self.assertRaises(EventEmissionExit):
whitelist({'name': sentinel.not_allowed_event})
def test_allowed_event(self):
whitelist = WhitelistProcessor(whitelist=[sentinel.allowed_event])
self.assert_event_passed_through(whitelist, {'name': sentinel.allowed_event})
def assert_event_passed_through(self, whitelist, event):
"""Assert that the whitelist allowed the event processing to procede"""
self.assertEquals(whitelist(event), event)
def test_empty_whitelist(self):
whitelist = WhitelistProcessor(whitelist=[])
with self.assertRaises(EventEmissionExit):
whitelist({'name': sentinel.not_allowed_event})
def test_multi_entry_whitelist(self):
whitelist = WhitelistProcessor(whitelist=[sentinel.allowed_event, sentinel.another_event])
with self.assertRaises(EventEmissionExit):
whitelist({'name': sentinel.not_allowed_event})
self.assert_event_passed_through(whitelist, {'name': sentinel.allowed_event})
self.assert_event_passed_through(whitelist, {'name': sentinel.another_event})
"""Filter out events whose names aren't on a pre-configured whitelist"""
from eventtracking.processors.exceptions import EventEmissionExit
class WhitelistProcessor(object):
"""
Filter out events whose names aren't on a pre-configured whitelist.
`whitelist` is an iterable containing event names that should be allowed to pass.
"""
def __init__(self, **kwargs):
self.whitelist = frozenset(kwargs.get('whitelist', []))
def __call__(self, event):
if event['name'] not in self.whitelist:
raise EventEmissionExit()
else:
return event
......@@ -116,23 +116,6 @@ class TestTrack(TestCase): # pylint: disable=missing-docstring
}
)
def test_multiple_backends(self):
self.configure_mock_backends(2)
self.tracker.emit(sentinel.name)
for backend in self._mock_backends:
self.assert_backend_called_with(
sentinel.name, backend=backend)
def test_single_backend_failure(self):
self.configure_mock_backends(2)
self.get_mock_backend(0).send.side_effect = Exception
self.tracker.emit(sentinel.name)
self.assert_backend_called_with(
sentinel.name, backend=self.get_mock_backend(1))
def test_global_tracker(self):
tracker.emit(sentinel.name)
......@@ -184,75 +167,3 @@ class TestTrack(TestCase): # pylint: disable=missing-docstring
self.tracker.emit(sentinel.name)
self.assert_backend_called_with(sentinel.name)
def test_single_processor(self):
self.tracker.processors.append(self.change_name)
self.tracker.emit(sentinel.name)
self.assert_backend_called_with(sentinel.changed_name)
def change_name(self, event):
"""Modify the event type of the event"""
event['name'] = sentinel.changed_name
return event
def test_non_callable_processor(self):
self.tracker.processors.append(object())
self.tracker.emit(sentinel.name)
self.assert_backend_called_with(sentinel.name)
def test_callable_class_processor(self):
class SampleProcessor(object):
"""An event processing class"""
def __call__(self, event):
"""Modify the event type"""
event['name'] = sentinel.class_name
self.tracker.processors.append(SampleProcessor())
self.tracker.emit(sentinel.name)
self.assert_backend_called_with(sentinel.class_name)
def test_processor_chain(self):
def ensure_modified_event(event):
"""Assert the first processor added a field to the event"""
self.assertIn(sentinel.key, event)
self.assertEquals(event[sentinel.key], sentinel.value)
return event
self.tracker.processors.extend([self.change_name, ensure_modified_event])
self.tracker.emit(sentinel.name)
self.assert_backend_called_with(sentinel.changed_name)
def test_processor_failure(self):
def always_fail(event): # pylint: disable=unused-argument
"""Always raises an error"""
raise ValueError
self.tracker.processors.extend([always_fail, self.change_name])
self.tracker.emit(sentinel.name)
self.assert_backend_called_with(sentinel.changed_name)
def test_processor_returns_none(self):
def return_none(event): # pylint: disable=unused-argument
"""Don't return the event"""
pass
self.tracker.processors.append(return_none)
self.tracker.emit(sentinel.name)
self.assert_backend_called_with(sentinel.name)
def test_processor_modifies_the_same_event_object(self):
def forget_return(event):
"""Modify the event without returning it"""
event['name'] = sentinel.forgotten_return
def ensure_name_changed(event):
"""Assert the event type has been modified even though the event wasn't returned"""
self.assertEquals(event['name'], sentinel.forgotten_return)
self.tracker.processors.extend([forget_return, ensure_name_changed])
self.tracker.emit(sentinel.name)
self.assert_backend_called_with(sentinel.forgotten_return)
......@@ -24,6 +24,7 @@ import logging
from pytz import UTC
from eventtracking.locator import DefaultContextLocator
from eventtracking.backends.routing import RoutingBackend
UNKNOWN_EVENT_TYPE = 'unknown'
DEFAULT_TRACKER_NAME = 'default'
......@@ -37,13 +38,8 @@ class Tracker(object):
be used to persist any events that are emitted.
"""
def __init__(self, backends=None, context_locator=None, processors=None):
self.backends = backends or {}
self.routing_backend = RoutingBackend(backends=backends, processors=processors)
self.context_locator = context_locator or DefaultContextLocator()
self.processors = processors or []
for backend in backends.itervalues():
if not hasattr(backend, 'send') or not callable(backend.send):
raise ValueError('Backend %s does not have a callable "send" method.' % backend.__class__.__name__)
@property
def located_context(self):
......@@ -56,6 +52,16 @@ class Tracker(object):
"""Gets the backend that was configured with `name`"""
return self.backends[name]
@property
def processors(self):
"""The list of registered processors"""
return self.routing_backend.processors
@property
def backends(self):
"""The dictionary of registered backends"""
return self.routing_backend.backends
def emit(self, name=None, data=None):
"""
Emit an event annotated with the UTC time when this function was called.
......@@ -73,41 +79,7 @@ class Tracker(object):
'context': self.resolve_context()
}
event = self.process_event(event)
self.send_to_backends(event)
def process_event(self, event):
"""
Executes all event processors on the event in order.
`event` is a nested dictionary that represents the event.
Returns the modified event.
"""
for processor in self.processors:
try:
modified_event = processor(event)
if modified_event is not None:
event = modified_event
except Exception: # pylint: disable=broad-except
LOG.exception(
'Failed to execute processor: {0}'.format(processor)
)
return event
def send_to_backends(self, event):
"""Sends the event to all registered backends."""
for name, backend in self.backends.iteritems():
try:
backend.send(event)
except Exception: # pylint: disable=broad-except
LOG.exception(
'Unable to send event to backend: {0}'.format(name)
)
self.routing_backend.send(event)
def resolve_context(self):
"""
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment