Commit 742ea821 by Matthew Piatetsky Committed by GitHub

Merge pull request #440 from edx/add_kwargs_to_parallel_loader

ECOM-6376 Add kwargs to parallel loader
parents ef635991 3c12fd43
...@@ -29,7 +29,7 @@ def execute_loader(loader_class, *loader_args, **loader_kwargs): ...@@ -29,7 +29,7 @@ def execute_loader(loader_class, *loader_args, **loader_kwargs):
logger.exception('%s failed!', loader_class.__name__) logger.exception('%s failed!', loader_class.__name__)
def execute_parallel_loader(loader_class, *loader_args): def execute_parallel_loader(loader_class, *loader_args, **loader_kwargs):
""" """
ProcessPoolExecutor uses the multiprocessing module. Multiprocessing forks processes, ProcessPoolExecutor uses the multiprocessing module. Multiprocessing forks processes,
causing connection objects to be copied across processes. The key goal when running causing connection objects to be copied across processes. The key goal when running
...@@ -45,7 +45,7 @@ def execute_parallel_loader(loader_class, *loader_args): ...@@ -45,7 +45,7 @@ def execute_parallel_loader(loader_class, *loader_args):
""" """
connection.close() connection.close()
execute_loader(loader_class, *loader_args) execute_loader(loader_class, *loader_args, **loader_kwargs)
class Command(BaseCommand): class Command(BaseCommand):
......
...@@ -17,6 +17,7 @@ from course_discovery.apps.course_metadata.data_loaders.marketing_site import ( ...@@ -17,6 +17,7 @@ from course_discovery.apps.course_metadata.data_loaders.marketing_site import (
SponsorMarketingSiteDataLoader, PersonMarketingSiteDataLoader, CourseMarketingSiteDataLoader SponsorMarketingSiteDataLoader, PersonMarketingSiteDataLoader, CourseMarketingSiteDataLoader
) )
from course_discovery.apps.course_metadata.data_loaders.tests import mock_data from course_discovery.apps.course_metadata.data_loaders.tests import mock_data
from course_discovery.apps.course_metadata.management.commands.refresh_course_metadata import execute_parallel_loader
from course_discovery.apps.course_metadata.tests import toggle_switch from course_discovery.apps.course_metadata.tests import toggle_switch
from course_discovery.apps.course_metadata.tests.factories import CourseFactory from course_discovery.apps.course_metadata.tests.factories import CourseFactory
...@@ -29,7 +30,18 @@ class RefreshCourseMetadataCommandTests(TransactionTestCase): ...@@ -29,7 +30,18 @@ class RefreshCourseMetadataCommandTests(TransactionTestCase):
def setUp(self): def setUp(self):
super(RefreshCourseMetadataCommandTests, self).setUp() super(RefreshCourseMetadataCommandTests, self).setUp()
self.partner = PartnerFactory() self.partner = PartnerFactory()
partner = self.partner
self.pipeline = [(SubjectMarketingSiteDataLoader, partner.marketing_site_url_root, None),
(SchoolMarketingSiteDataLoader, partner.marketing_site_url_root, None),
(SponsorMarketingSiteDataLoader, partner.marketing_site_url_root, None),
(PersonMarketingSiteDataLoader, partner.marketing_site_url_root, None),
(CourseMarketingSiteDataLoader, partner.marketing_site_url_root, None),
(OrganizationsApiDataLoader, partner.organizations_api_url, None),
(CoursesApiDataLoader, partner.courses_api_url, None),
(EcommerceApiDataLoader, partner.ecommerce_api_url, 1),
(ProgramsApiDataLoader, partner.programs_api_url, None),
(XSeriesMarketingSiteDataLoader, partner.marketing_site_url_root, None)]
self.kwargs = {'username': 'bob'}
self.mock_access_token_api() self.mock_access_token_api()
def mock_apis(self): def mock_apis(self):
...@@ -112,11 +124,24 @@ class RefreshCourseMetadataCommandTests(TransactionTestCase): ...@@ -112,11 +124,24 @@ class RefreshCourseMetadataCommandTests(TransactionTestCase):
) )
return bodies return bodies
@ddt.data(True, False) def test_refresh_course_metadata_serial(self):
def test_refresh_course_metadata(self, is_parallel): with responses.RequestsMock() as rsps:
if is_parallel: self.mock_access_token_api(rsps)
for name in ['threaded_metadata_write', 'parallel_refresh_pipeline']: self.mock_apis()
toggle_switch(name)
with mock.patch('course_discovery.apps.course_metadata.management.commands.'
'refresh_course_metadata.execute_loader') as mock_executor:
call_command('refresh_course_metadata')
# Set up expected calls
expected_calls = [mock.call(loader_class, self.partner, api_url,
ACCESS_TOKEN, 'JWT', max_workers_override or 7, False, **self.kwargs)
for loader_class, api_url, max_workers_override in self.pipeline]
mock_executor.assert_has_calls(expected_calls)
def test_refresh_course_metadata_parallel(self):
for name in ['threaded_metadata_write', 'parallel_refresh_pipeline']:
toggle_switch(name)
with responses.RequestsMock() as rsps: with responses.RequestsMock() as rsps:
self.mock_access_token_api(rsps) self.mock_access_token_api(rsps)
...@@ -126,8 +151,15 @@ class RefreshCourseMetadataCommandTests(TransactionTestCase): ...@@ -126,8 +151,15 @@ class RefreshCourseMetadataCommandTests(TransactionTestCase):
# courses, the command won't risk race conditions between threads trying to # courses, the command won't risk race conditions between threads trying to
# create the same course. # create the same course.
CourseFactory(partner=self.partner) CourseFactory(partner=self.partner)
with mock.patch('concurrent.futures.ProcessPoolExecutor.submit') as mock_executor:
call_command('refresh_course_metadata')
call_command('refresh_course_metadata') # Set up expected calls
expected_calls = [mock.call(execute_parallel_loader, loader_class,
self.partner, api_url, ACCESS_TOKEN,
'JWT', max_workers_override or 7, True, **self.kwargs)
for loader_class, api_url, max_workers_override in self.pipeline]
mock_executor.assert_has_calls(expected_calls, any_order=True)
def test_refresh_course_metadata_with_invalid_partner_code(self): def test_refresh_course_metadata_with_invalid_partner_code(self):
""" Verify an error is raised if an invalid partner code is passed on the command line. """ """ Verify an error is raised if an invalid partner code is passed on the command line. """
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment