Commit 033746d2 by Renzo Lucioni

Asynchronous order fulfillment

Implements the ecommerce worker and a Celery task responsible for managing order fulfillment. ECOM-2242.
parent d9c6ffb2
[run]
data_file = .coverage
omit = ecommerce_worker/configuration*
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
cover
.cache
nosetests.xml
coverage.xml
*,cover
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# OS X
.DS_Store
# Vim
*.swp
# Local configuration overrides
private.py
[pep8]
ignore=E501
max_line_length=119
exclude=configuration
language: python
python:
- "2.7"
sudo: false
# Cache the pip directory. "cache: pip" doesn't work due to install override.
# See https://github.com/travis-ci/travis-ci/issues/3239.
cache:
- directories:
- $HOME/.cache/pip
install:
- pip install -U pip wheel
- make requirements
- pip install coveralls
script:
- make validate
branches:
only:
- master
after_success:
coveralls
Renzo Lucioni <renzo@edx.org>
This diff is collapsed. Click to expand it.
PACKAGE = ecommerce_worker
help:
@echo ' '
@echo 'Makefile for the edX ecommerce worker project. '
@echo ' '
@echo 'Usage: '
@echo ' make help display this message '
@echo ' make requirements install requirements for local development '
@echo ' make worker start the Celery worker process '
@echo ' make test run unit tests and report on coverage '
@echo ' make html_coverage generate and view HTML coverage report '
@echo ' make quality run pep8 and pylint '
@echo ' make validate run tests and quality checks '
@echo ' make clean delete generated byte code and coverage reports '
@echo ' '
requirements:
pip install -qr requirements/local.txt --exists-action w
worker:
celery -A ecommerce_worker worker --app=$(PACKAGE).celery_app:app --loglevel=info
test:
WORKER_CONFIGURATION_MODULE=ecommerce_worker.configuration.test nosetests \
--with-coverage --cover-branches --cover-html --cover-package=$(PACKAGE) $(PACKAGE)
html_coverage:
coverage html && open htmlcov/index.html
quality:
pep8 --config=.pep8 $(PACKAGE)
pylint --rcfile=pylintrc $(PACKAGE)
validate: clean test quality
clean:
find . -name '*.pyc' -delete
coverage erase
rm -rf cover htmlcov
.PHONY: help requirements worker test html_coverage quality validate clean
# ecommerce-worker
edX Ecommerce Worker |Travis|_ |Coveralls|_
============================================
.. |Travis| image:: https://travis-ci.org/edx/ecommerce-worker.svg?branch=master
.. _Travis: https://travis-ci.org/edx/ecommerce-worker
.. |Coveralls| image:: https://coveralls.io/repos/edx/ecommerce-worker/badge.svg?branch=master
.. _Coveralls: https://coveralls.io/r/edx/ecommerce-worker?branch=master
The Celery tasks contained herein are used to implement asynchronous order fulfillment and other features requiring the asynchronous execution of many small, common operations.
Prerequisites
-------------
* Python 2.7.x (not tested with Python 3.x)
* Celery 3.1.x
* RabbitMQ 3.5.x
Getting Started
---------------
Most commands necessary to develop and run this app can be found in the included Makefile. These instructions assume a working integration between the `edX ecommerce service <https://github.com/edx/ecommerce>`_ and the LMS, with asynchronous fulfillment configured on the ecommerce service.
To get started, create a new virtual environment and install the included requirements.
$ make requirements
This project uses `Celery <http://celery.readthedocs.org/en/latest/>`_ to asynchronously execute tasks, such as those required during order fulfillment. Celery requires a solution to send and receive messages which typically comes in the form of a separate service called a message broker. This project uses `RabbitMQ <http://www.rabbitmq.com/>`_ as a message broker. On OS X, use Homebrew to install it.
$ brew install rabbitmq
By default, most operating systems don't allow enough open files for a message broker. RabbitMQ's docs indicate that allowing at least 4096 file descriptors should be sufficient for most development workloads. Check the limit on the number of file descriptors in your current process.
$ ulimit -n
If it needs to be adjusted, run:
$ ulimit -n 4096
Next, start the RabbitMQ server.
$ rabbitmq-server
In a separate process, start the Celery worker.
$ make worker
In a third process, start the ecommerce service. In order for tasks to be visible to the ecommerce worker, the value of Celery's ``BROKER_URL`` setting must shared by the ecommerce service and the ecommerce worker.
Finally, in a fourth process, start the LMS. At this point, if you attempt to enroll in a course supported by the ecommerce service, enrollment will be handled asynchronously by the ecommerce worker.
If you're forced to shut down the Celery workers prematurely, tasks may remain in the queue. To clear them, you can reset RabbitMQ.
$ rabbitmqctl stop_app
$ rabbitmqctl reset
$ rabbitmqctl start_app
License
-------
The code in this repository is licensed under the AGPL unless otherwise noted. Please see ``LICENSE.txt`` for details.
How To Contribute
-----------------
Contributions are welcome. Please read `How To Contribute <https://github.com/edx/edx-platform/blob/master/CONTRIBUTING.rst>`_ for details. Even though it was written with ``edx-platform`` in mind, these guidelines should be followed for Open edX code in general.
Reporting Security Issues
-------------------------
Please do not report security issues in public. Please email security@edx.org.
Mailing List and IRC Channel
----------------------------
You can discuss this code in the `edx-code Google Group <https://groups.google.com/forum/#!forum/edx-code>`_ or in the ``#edx-code`` IRC channel on Freenode.
import os
from celery import Celery
# Environment variable indicating which configuration module to use.
CONFIGURATION = 'WORKER_CONFIGURATION_MODULE'
# Set the default configuration module, if one is not aleady defined.
os.environ.setdefault(CONFIGURATION, 'ecommerce_worker.configuration.local')
app = Celery('ecommerce_worker')
# See http://celery.readthedocs.org/en/latest/userguide/application.html#config-from-envvar.
app.config_from_envvar(CONFIGURATION)
# CELERY
# Default broker URL. See http://celery.readthedocs.org/en/latest/configuration.html#broker-url.
BROKER_URL = None
# Backend used to store task results.
# See http://celery.readthedocs.org/en/latest/configuration.html#celery-result-backend.
CELERY_RESULT_BACKEND = None
# A sequence of modules to import when the worker starts.
# See http://celery.readthedocs.org/en/latest/configuration.html#celery-imports.
CELERY_IMPORTS = (
'ecommerce_worker.fulfillment.v1.tasks',
)
# END CELERY
# ORDER FULFILLMENT
# Absolute URL used to construct API calls against the ecommerce service.
ECOMMERCE_API_ROOT = None
# Long-lived access token used by Celery workers to authenticate against the ecommerce service.
WORKER_ACCESS_TOKEN = None
# Maximum number of retries before giving up on the fulfillment of an order.
# For reference, 11 retries with exponential backoff yields a maximum waiting
# time of 2047 seconds (about 30 minutes). Defaulting this to None could yield
# unwanted behavior: infinite retries.
MAX_FULFILLMENT_RETRIES = 11
# END ORDER FULFILLMENT
import logging
from logging.config import dictConfig
from ecommerce_worker.configuration.base import *
from ecommerce_worker.configuration.logger import get_logger_config
logger = logging.getLogger(__name__)
# CELERY
BROKER_URL = 'amqp://'
# END CELERY
# ORDER FULFILLMENT
ECOMMERCE_API_ROOT = 'http://localhost:8002/api/v2/'
# END ORDER FULFILLMENT
# LOGGING
logger_config = get_logger_config(debug=True, dev_env=True, local_loglevel='DEBUG')
dictConfig(logger_config)
# END LOGGING
# Apply any developer-defined overrides.
try:
from .private import * # pylint: disable=import-error
except ImportError:
logger.warning('No developer-defined configuration overrides have been applied.')
pass
"""Logging configuration"""
from logging.handlers import SysLogHandler
import os
import platform
import sys
def get_logger_config(log_dir='/var/tmp',
logging_env='no_env',
edx_filename='edx.log',
dev_env=False,
debug=False,
local_loglevel='INFO',
service_variant='ecommerce-worker'):
"""
Returns a dictionary containing logging configuration.
If dev_env is True, logging will not be done via local rsyslogd.
Instead, application logs will be dropped into log_dir. 'edx_filename'
is ignored unless dev_env is True.
"""
# Revert to INFO if an invalid string is passed in
if local_loglevel not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
local_loglevel = 'INFO'
hostname = platform.node().split('.')[0]
syslog_format = (
'[service_variant={service_variant}]'
'[%(name)s][env:{logging_env}] %(levelname)s '
'[{hostname} %(process)d] [%(filename)s:%(lineno)d] '
'- %(message)s'
).format(
service_variant=service_variant,
logging_env=logging_env, hostname=hostname
)
if debug:
handlers = ['console']
else:
handlers = ['local']
logger_config = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'standard': {
'format': '%(asctime)s %(levelname)s %(process)d '
'[%(name)s] %(filename)s:%(lineno)d - %(message)s',
},
'syslog_format': {'format': syslog_format},
'raw': {'format': '%(message)s'},
},
'handlers': {
'console': {
'level': 'DEBUG' if debug else 'INFO',
'class': 'logging.StreamHandler',
'formatter': 'standard',
'stream': sys.stdout,
},
},
'loggers': {
'requests': {
'handlers': handlers,
'level': 'WARNING',
'propagate': True
},
'': {
'handlers': handlers,
'level': 'DEBUG',
'propagate': False
},
}
}
if dev_env:
edx_file_loc = os.path.join(log_dir, edx_filename)
logger_config['handlers'].update({
'local': {
'class': 'logging.handlers.RotatingFileHandler',
'level': local_loglevel,
'formatter': 'standard',
'filename': edx_file_loc,
'maxBytes': 1024 * 1024 * 2,
'backupCount': 5,
},
})
else:
logger_config['handlers'].update({
'local': {
'level': local_loglevel,
'class': 'logging.handlers.SysLogHandler',
# Use a different address for Mac OS X
'address': '/var/run/syslog' if sys.platform == 'darwin' else '/dev/log',
'formatter': 'syslog_format',
'facility': SysLogHandler.LOG_LOCAL0,
},
})
return logger_config
from logging.config import dictConfig
import os
import yaml
from ecommerce_worker.configuration.base import *
from ecommerce_worker.configuration.logger import get_logger_config
def get_overrides_filename(variable):
"""
Get the name of the file containing configuration overrides
from the provided environment variable.
"""
filename = os.environ.get(variable)
if filename is None:
msg = 'Please set the {} environment variable.'.format(variable)
raise EnvironmentError(msg)
return filename
# LOGGING
logger_config = get_logger_config()
dictConfig(logger_config)
# END LOGGING
filename = get_overrides_filename('ECOMMERCE_WORKER_CFG')
with open(filename) as f:
config_from_yaml = yaml.load(f)
# Override base configuration with values from disk.
vars().update(config_from_yaml)
from logging.config import dictConfig
from ecommerce_worker.configuration.base import *
from ecommerce_worker.configuration.logger import get_logger_config
# CELERY
CELERY_ALWAYS_EAGER = True
# END CELERY
# ORDER FULFILLMENT
ECOMMERCE_API_ROOT = 'http://localhost:8002/api/v2/'
WORKER_ACCESS_TOKEN = 'fake-access-token'
# END ORDER FULFILLMENT
# LOGGING
logger_config = get_logger_config(debug=True, dev_env=True, local_loglevel='DEBUG')
dictConfig(logger_config)
# END LOGGING
"""Order fulfillment tasks."""
from celery import shared_task
from celery.exceptions import Ignore
from celery.utils.log import get_task_logger
from ecommerce_api_client import exceptions
from ecommerce_api_client.client import EcommerceApiClient
from ecommerce_worker.utils import get_configuration
logger = get_task_logger(__name__) # pylint: disable=invalid-name
@shared_task(bind=True, ignore_result=True)
def fulfill_order(self, order_number):
"""Fulfills an order.
Arguments:
order_number (str): Order number indicating which order to fulfill.
Returns:
None
"""
ecommerce_api_root = get_configuration('ECOMMERCE_API_ROOT')
worker_access_token = get_configuration('WORKER_ACCESS_TOKEN')
max_fulfillment_retries = get_configuration('MAX_FULFILLMENT_RETRIES')
if not (ecommerce_api_root and worker_access_token and max_fulfillment_retries):
raise RuntimeError('Worker is improperly configured for order fulfillment.')
api = EcommerceApiClient(ecommerce_api_root, oauth_access_token=worker_access_token)
try:
logger.info('Requesting fulfillment of order [%s].', order_number)
api.orders(order_number).fulfill.put()
except exceptions.HttpClientError as exc:
status_code = exc.response.status_code
if status_code == 406:
# The order is not fulfillable. Therefore, it must be complete.
logger.info('Order [%s] has already been fulfilled. Ignoring.', order_number)
raise Ignore()
else:
# Unknown client error. Re-raise the exception.
logger.exception('Fulfillment of order [%s] failed.', order_number)
raise exc
except (exceptions.HttpServerError, exceptions.Timeout) as exc:
# Fulfillment failed. Retry with exponential backoff until fulfillment
# succeeds or the retry limit is reached. If the retry limit is exceeded,
# the exception is re-raised.
retries = self.request.retries
if retries == max_fulfillment_retries:
logger.exception('Fulfillment of order [%s] failed.', order_number)
else:
logger.exception('Fulfillment of order [%s] failed. Retrying.', order_number)
countdown = 2 ** retries
raise self.retry(exc=exc, countdown=countdown, max_retries=max_fulfillment_retries)
"""Tests of fulfillment tasks."""
# pylint: disable=no-value-for-parameter
from unittest import TestCase
from celery.exceptions import Ignore
import ddt
from ecommerce_api_client import exceptions
import httpretty
import mock
from ecommerce_worker.fulfillment.v1.tasks import fulfill_order
from ecommerce_worker.utils import get_configuration
@ddt.ddt
class OrderFulfillmentTaskTests(TestCase):
"""Tests of the order fulfillment task."""
ORDER_NUMBER = 'FAKE-123456'
API_URL = '{root}/orders/{number}/fulfill/'.format(
root=get_configuration('ECOMMERCE_API_ROOT').strip('/'),
number=ORDER_NUMBER
)
@ddt.data(
'ECOMMERCE_API_ROOT',
'WORKER_ACCESS_TOKEN',
'MAX_FULFILLMENT_RETRIES',
)
def test_requires_configuration(self, setting):
"""Verify that the task refuses to run without the configuration it requires."""
with mock.patch('ecommerce_worker.configuration.test.' + setting, None):
with self.assertRaises(RuntimeError):
fulfill_order(self.ORDER_NUMBER)
@httpretty.activate
def test_fulfillment_success(self):
"""Verify that the task exits without an error when fulfillment succeeds."""
httpretty.register_uri(httpretty.PUT, self.API_URL, status=200, body={})
result = fulfill_order.delay(self.ORDER_NUMBER).get()
self.assertIsNone(result)
# Validate the value of the HTTP Authorization header.
last_request = httpretty.last_request()
authorization = last_request.headers.get('authorization')
self.assertEqual(authorization, 'Bearer ' + get_configuration('WORKER_ACCESS_TOKEN'))
@httpretty.activate
def test_fulfillment_not_possible(self):
"""Verify that the task exits without an error when fulfillment is not possible."""
httpretty.register_uri(httpretty.PUT, self.API_URL, status=406, body={})
with self.assertRaises(Ignore):
fulfill_order(self.ORDER_NUMBER)
@httpretty.activate
def test_fulfillment_unknown_client_error(self):
"""
Verify that the task raises an exception when fulfillment fails because of an
unknown client error.
"""
httpretty.register_uri(httpretty.PUT, self.API_URL, status=404, body={})
with self.assertRaises(exceptions.HttpClientError):
fulfill_order(self.ORDER_NUMBER)
@httpretty.activate
def test_fulfillment_failure(self):
"""Verify that the task raises an exception when fulfillment fails."""
httpretty.register_uri(httpretty.PUT, self.API_URL, status=500, body={})
with self.assertRaises(exceptions.HttpServerError):
fulfill_order.delay(self.ORDER_NUMBER).get()
@httpretty.activate
def test_fulfillment_timeout(self):
"""Verify that the task raises an exception when fulfillment times out."""
httpretty.register_uri(httpretty.PUT, self.API_URL, status=404, body=self._timeout_body)
with self.assertRaises(exceptions.Timeout):
fulfill_order.delay(self.ORDER_NUMBER).get()
@httpretty.activate
def test_fulfillment_retry_success(self):
"""Verify that the task is capable of successfully retrying after fulfillment failure."""
httpretty.register_uri(httpretty.PUT, self.API_URL, responses=[
httpretty.Response(status=500, body={}),
httpretty.Response(status=200, body={}),
])
result = fulfill_order.delay(self.ORDER_NUMBER).get()
self.assertIsNone(result)
def _timeout_body(self, request, uri, headers): # pylint: disable=unused-argument
"""Helper used to force httpretty to raise Timeout exceptions."""
raise exceptions.Timeout
"""Helper functions."""
import os
import sys
from ecommerce_worker.celery_app import CONFIGURATION
def get_configuration(variable):
"""Get a value from configuration.
Retrieves the value corresponding to the given variable from the
configuration module currently in use by the app.
Arguments:
variable (str): The name of a variable from the configuration module.
Returns:
The value corresponding to the variable, or None if the variable is not found.
"""
name = os.environ.get(CONFIGURATION)
# __import__ performs a full import, but only returns the top-level
# package, not the targeted module. sys.modules is a dictionary
# mapping module names to loaded modules.
__import__(name)
module = sys.modules[name]
return getattr(module, variable, None)
# ***************************
# ** DO NOT EDIT THIS FILE **
# ***************************
#
# It is generated by:
# $ edx_lint write pylintrc
#
#
#
#
#
#
#
#
# STAY AWAY!
#
#
#
#
#
# SERIOUSLY.
#
# ------------------------------
[MASTER]
profile = no
ignore = configuration, celery_app.py
persistent = yes
load-plugins = edx_lint.pylint
[MESSAGES CONTROL]
disable =
locally-disabled,
locally-enabled,
too-few-public-methods,
bad-builtin,
star-args,
abstract-class-not-used,
abstract-class-little-used,
no-init,
fixme,
too-many-lines,
no-self-use,
too-many-ancestors,
too-many-instance-attributes,
too-few-public-methods,
too-many-public-methods,
too-many-return-statements,
too-many-branches,
too-many-arguments,
too-many-locals,
unused-wildcard-import,
duplicate-code
[REPORTS]
output-format = text
files-output = no
reports = no
evaluation = 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10)
comment = no
[BASIC]
required-attributes =
bad-functions = map,filter,apply,input
module-rgx = (([a-z_][a-z0-9_]*)|([A-Z][a-zA-Z0-9]+))$
const-rgx = (([A-Z_][A-Z0-9_]*)|(__.*__)|log|urlpatterns)$
class-rgx = [A-Z_][a-zA-Z0-9]+$
function-rgx = ([a-z_][a-z0-9_]{2,30}|test_[a-z0-9_]+)$
method-rgx = ([a-z_][a-z0-9_]{2,40}|setUp|set[Uu]pClass|tearDown|tear[Dd]ownClass|assert[A-Z]\w*|maxDiff|test_[a-z0-9_]+)$
attr-rgx = [a-z_][a-z0-9_]{2,30}$
argument-rgx = [a-z_][a-z0-9_]{2,30}$
variable-rgx = [a-z_][a-z0-9_]{2,30}$
class-attribute-rgx = ([A-Za-z_][A-Za-z0-9_]{2,30}|(__.*__))$
inlinevar-rgx = [A-Za-z_][A-Za-z0-9_]*$
good-names = f,i,j,k,db,ex,Run,_,__
bad-names = foo,bar,baz,toto,tutu,tata
no-docstring-rgx = __.*__|test_.+|setUp|tearDown
docstring-min-length = -1
[FORMAT]
max-line-length = 120
ignore-long-lines = ^\s*(# )?<?https?://\S+>?$
single-line-if-stmt = no
no-space-check = trailing-comma,dict-separator
max-module-lines = 1000
indent-string = ' '
[MISCELLANEOUS]
notes = FIXME,XXX,TODO
[SIMILARITIES]
min-similarity-lines = 4
ignore-comments = yes
ignore-docstrings = yes
ignore-imports = no
[TYPECHECK]
ignore-mixin-members = yes
ignored-classes = SQLObject
zope = no
generated-members =
REQUEST,
acl_users,
aq_parent,
objects,
DoesNotExist,
can_read,
can_write,
get_url,
size,
content,
status_code,
create,
build,
fields,
tag,
org,
course,
category,
name,
revision,
_meta,
[VARIABLES]
init-import = no
dummy-variables-rgx = _|dummy|unused|.*_unused
additional-builtins =
[CLASSES]
ignore-iface-methods = isImplementedBy,deferred,extends,names,namesAndDescriptions,queryDescriptionFor,getBases,getDescriptionFor,getDoc,getName,getTaggedValue,getTaggedValueTags,isEqualOrExtendedBy,setTaggedValue,isImplementedByInstancesOf,adaptWith,is_implemented_by
defining-attr-methods = __init__,__new__,setUp
valid-classmethod-first-arg = cls
valid-metaclass-classmethod-first-arg = mcs
[DESIGN]
max-args = 5
ignored-argument-names = _.*
max-locals = 15
max-returns = 6
max-branches = 12
max-statements = 50
max-parents = 7
max-attributes = 7
min-public-methods = 2
max-public-methods = 20
[IMPORTS]
deprecated-modules = regsub,TERMIOS,Bastion,rexec
import-graph =
ext-import-graph =
int-import-graph =
[EXCEPTIONS]
overgeneral-exceptions = Exception
# 5053851b5ac2d651899ebff730c1ba2610f1ad8f
# Local tweaks to pylintrc.
[MASTER]
ignore=configuration, celery_app.py
-r requirements/production.txt
celery==3.1.18
edx-ecommerce-api-client==1.1.0
# Packages required for local development
-r test.txt
# Packages required in a production environment
-r base.txt
PyYAML==3.11
# Packages required for running tests
-r base.txt
coverage==3.7.1
ddt==1.0.0
edx-lint==0.2.7
httpretty==0.8.10
mock==1.3.0
nose==1.3.7
pep8==1.6.2
from setuptools import setup, find_packages
with open('README.rst') as readme:
long_description = readme.read()
setup(
name='edx-ecommerce-worker',
version='0.1.0',
description='Celery tasks supporting the operations of edX\'s ecommerce service',
long_description=long_description,
classifiers=[
'Development Status :: 3 - Alpha',
'License :: OSI Approved :: GNU Affero General Public License v3',
'Programming Language :: Python',
'Programming Language :: Python :: 2.7',
'Topic :: Internet',
'Intended Audience :: Developers',
'Environment :: Web Environment',
],
keywords='edx ecommerce worker',
url='https://github.com/edx/ecommerce-worker',
author='edX',
author_email='oscm@edx.org',
license='AGPL',
packages=find_packages(exclude=['*.tests']),
install_requires=['celery', 'edx-ecommerce-api-client'],
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment