Commit 7f2fcb9d by Renzo Lucioni Committed by GitHub

Refine parallel metadata refresh pipeline (#364)

Force each process to use its own database connection, add a migration creating relevant switches, and simplify the logic determining whether it's safe to use threads.

ECOM-5871
parent ba8a85ca
......@@ -2,6 +2,7 @@
branch = True
data_file = .coverage
source=course_discovery
concurrency=multiprocessing
omit =
course_discovery/settings*
course_discovery/conf*
......
......@@ -54,6 +54,7 @@ production-requirements:
test: clean
coverage run ./manage.py test course_discovery --settings=course_discovery.settings.test
coverage combine
coverage report
quality:
......
......@@ -3,6 +3,7 @@ import itertools
import logging
from django.core.management import BaseCommand, CommandError
from django.db import connections
from edx_rest_api_client.client import EdxRestApiClient
import waffle
......@@ -27,6 +28,24 @@ def execute_loader(loader_class, *loader_args):
logger.exception('%s failed!', loader_class.__name__)
def execute_parallel_loader(loader_class, *loader_args):
    """
    Run a data loader from inside a worker process using a fresh database connection.

    Worker processes spawned through ProcessPoolExecutor are created by the
    multiprocessing module, which forks. A forked child starts out with copies of
    the parent's database connection objects, and a connection shared between
    processes is unsafe: depending on the driver and the OS, the result ranges
    from a connection that no longer works to one socket being used by several
    processes at once, which corrupts the protocol exchange (e.g., 'MySQL server
    has gone away').

    To avoid sharing, every copied connection is closed as the first action in
    the new process. Django opens a new connection the next time one is needed,
    so each process ends up with a connection of its own.

    Arguments:
        loader_class: The loader class to run; forwarded to execute_loader.
        *loader_args: Positional arguments forwarded unchanged to execute_loader.
    """
    # Discard the connections inherited from the parent before touching the database.
    connections.close_all()

    execute_loader(loader_class, *loader_args)
class Command(BaseCommand):
help = 'Refresh course metadata from external sources.'
......@@ -105,7 +124,7 @@ class Command(BaseCommand):
# to create courses. If courses do exist, this command is likely being run
# as an update, significantly lowering the probability of race conditions.
courses_exist = Course.objects.filter(partner=partner).exists()
is_threadsafe = True if courses_exist and waffle.switch_is_active('threaded_metadata_write') else False
is_threadsafe = courses_exist and waffle.switch_is_active('threaded_metadata_write')
logger.info(
'Command is{negation} using threads to write data.'.format(negation='' if is_threadsafe else ' not')
......@@ -140,7 +159,7 @@ class Command(BaseCommand):
for loader_class, api_url, max_workers_override in stage:
if api_url:
executor.submit(
execute_loader,
execute_parallel_loader,
loader_class,
partner,
api_url,
......
from django.db import migrations
# Names of the waffle switches that this migration creates going forward
# and deletes again when it is reversed.
NAMES = ('threaded_metadata_write', 'parallel_refresh_pipeline')
def create_switches(apps, schema_editor):
    """
    Forward migration step: make sure every switch listed in NAMES exists.

    Switches that are missing are created inactive; get_or_create leaves
    switches that already exist untouched.

    Arguments:
        apps: Registry used to look up the waffle Switch model.
        schema_editor: Unused; part of the RunPython callable signature.
    """
    switch_model = apps.get_model('waffle', 'Switch')

    for switch_name in NAMES:
        switch_model.objects.get_or_create(name=switch_name, defaults={'active': False})
def delete_switches(apps, schema_editor):
    """
    Reverse migration step: remove every switch whose name is listed in NAMES.

    Arguments:
        apps: Registry used to look up the waffle Switch model.
        schema_editor: Unused; part of the RunPython callable signature.
    """
    switch_model = apps.get_model('waffle', 'Switch')
    matching_switches = switch_model.objects.filter(name__in=NAMES)
    matching_switches.delete()
class Migration(migrations.Migration):
    """Data migration that adds the switches named in NAMES and removes them on reversal."""

    # Needs the previous course_metadata migration and the initial waffle
    # migration, which the switch helpers rely on to look up waffle's Switch model.
    dependencies = [
        ('course_metadata', '0029_auto_20160923_1306'),
        ('waffle', '0001_initial'),
    ]

    # Forward: create_switches. Reverse: delete_switches.
    operations = [
        migrations.RunPython(code=create_switches, reverse_code=delete_switches),
    ]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment