Commit c7f9e597 by Jillian Vogel

Fixes "Table not found: course_blocks"

Removes the erroneous complete() methods, which were causing the hive
partition tasks to appear complete when they hadn't yet loaded the hive
table.

Updates the acceptance tests to do two runs to demonstrate the
table-not-found issue: one with raw JSON data from the REST API, and one
with processed hive data.
parent bad58b51
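
For context on the failure mode: luigi decides whether a task needs to run by
calling complete(), which by default defers to the target(s) returned by
output(). For a Hive partition task the natural output is the partition
itself, so the default check passes only once the partition is registered
with Hive. The removed overrides short-circuited that by checking only the
on-disk data files. A minimal sketch of the difference, assuming the
pipeline's URL helpers in edx.analytics.tasks.url (illustrative only, not the
exact pipeline code):

    # Sketch of the two completeness checks. `flatten` is luigi's helper;
    # get_target_from_url/url_path_join are the pipeline's URL utilities.
    from luigi.task import flatten

    from edx.analytics.tasks.url import get_target_from_url, url_path_join


    def default_complete(task):
        # What luigi.Task.complete() does by default: check every declared
        # output target. For a Hive partition task, the target's exists()
        # consults the Hive metastore, not just the filesystem.
        outputs = flatten(task.output())
        return bool(outputs) and all(out.exists() for out in outputs)


    def removed_override(task):
        # What the removed overrides did: look only for the _SUCCESS marker
        # on disk. If the data files already existed, the task was skipped,
        # the Hive partition was never registered, and downstream queries
        # failed with "Table not found: course_blocks".
        return get_target_from_url(url_path_join(task.output_root, '_SUCCESS')).exists()

With the overrides removed, the partition tasks fall back to their parent
class's completeness logic, so a run that finds pre-existing data files will
still create and register the Hive table.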
@@ -90,20 +90,17 @@ class PullCourseBlocksApiData(CourseBlocksDownstreamMixin, luigi.Task):
     )
 
     def requires(self):
-        results = {
-            'course_list': CourseListApiDataTask(
-                date=self.date,
-                output_root=self.input_root,
-                overwrite=self.overwrite,
-            )
-        }
-        return results
+        return CourseListApiDataTask(
+            date=self.date,
+            output_root=self.input_root,
+            overwrite=self.overwrite,
+        )
 
     def run(self):
         self.remove_output_on_overwrite()
         courses = []
-        with self.input()['course_list'].open('r') as course_list_file:
+        with self.input().open('r') as course_list_file:
             for line in course_list_file:
                 course = CourseRecord.from_tsv(line)
                 courses.append(course.course_id)
@@ -283,7 +280,7 @@ class CourseBlocksApiDataTask(CourseBlocksDownstreamMixin, MapReduceJobTask):
 
     def complete(self):
         """
-        The task is complete if the output_root file is present.
+        The task is complete if the output_root/_SUCCESS file is present.
         """
         return get_target_from_url(url_path_join(self.output_root, '_SUCCESS')).exists()
 
@@ -379,9 +376,3 @@ class CourseBlocksPartitionTask(CourseBlocksDownstreamMixin, MapReduceJobTaskMix
     def output_root(self):
         """Expose the partition location path as the output root."""
         return self.partition_location
-
-    def complete(self):
-        """
-        The task is complete if the output_root/_SUCCESS file is present.
-        """
-        return get_target_from_url(url_path_join(self.output_root, '_SUCCESS')).exists()
@@ -160,7 +160,7 @@ class CourseListApiDataTask(CourseListDownstreamMixin, MapReduceJobTask):
             course = json.loads(line)
 
-            # eucalpytus API uses 'id' instead of 'course_id'
+            # eucalyptus API uses 'id' instead of 'course_id'
             if 'id' in course:
                 course_id = course['id']
                 del course['id']
@@ -269,9 +269,3 @@ class CourseListPartitionTask(CourseListDownstreamMixin, MapReduceJobTaskMixin,
     def output_root(self):
         """Expose the partition location path as the output root."""
         return self.partition_location
-
-    def complete(self):
-        """
-        The task is complete if the output_root/_SUCCESS file is present.
-        """
-        return get_target_from_url(url_path_join(self.output_root, '_SUCCESS')).exists()
@@ -388,12 +388,6 @@ class LatestProblemResponsePartitionTask(ProblemResponseTableMixin, HivePartitio
         """Expose the partition location path as the output root."""
         return self.partition_location
-
-    def complete(self):
-        """
-        The task is complete if the output_root/_SUCCESS file is present.
-        """
-        return get_target_from_url(url_path_join(self.output_root, '_SUCCESS')).exists()
 
     @property
     def hive_table_task(self):
         return LatestProblemResponseTableTask(
@@ -525,12 +519,6 @@ class ProblemResponseLocationPartitionTask(ProblemResponseTableMixin, HivePartit
         """Expose the partition location path as the output root."""
         return self.partition_location
-
-    def complete(self):
-        """
-        The task is complete if the output_root is present.
-        """
-        return get_target_from_url(self.output_root).exists()
 
     @property
     def hive_table_task(self):
         return ProblemResponseLocationTableTask(
......
@@ -65,7 +65,6 @@ class CourseBlocksTestMixin(object):
             shutil.rmtree(dirname)
 
 
-@ddt
 class CourseBlocksApiDataTaskTest(CourseBlocksTestMixin, TestCase):
     """Tests the CourseBlocksApiDataTask basic functions. """
......
@@ -46,6 +46,20 @@ class CourseListTestMixin(object):
             shutil.rmtree(dirname)
 
 
 class CourseListApiDataTaskTest(CourseListTestMixin, TestCase):
     """Tests the CourseListApiDataTask basic functions. """
 
+    def test_complete(self):
+        self.create_task()
+        self.assertFalse(self.task.complete())
+
+        # Create the output_root/_SUCCESS file
+        with open(os.path.join(self.output_dir, '_SUCCESS'), 'w') as success:
+            success.write('')
+        self.assertTrue(self.task.output().exists())
+        self.assertTrue(self.task.complete())
+
 
 @ddt
 class CourseListApiDataMapperTaskTest(CourseListTestMixin, MapperTestMixin, TestCase):
     """Tests the CourseListApiDataTask mapper output"""
......
@@ -24,13 +24,37 @@ class CourseBlocksPartitionTaskAcceptanceTest(AcceptanceTestCase):
         super(CourseBlocksPartitionTaskAcceptanceTest, self).setUp()
         self.partition = "dt=" + self.DATE.strftime(self.DAILY_PARTITION_FORMAT)
 
-        # Copy course list and course blocks REST API data
-        for table_name in ('course_list', 'course_blocks'):
-            file_name = table_name + '.json'
-            self.upload_file(url_path_join(self.data_dir, 'input', file_name),
-                             url_path_join(self.warehouse_path, table_name + '_raw', self.partition, file_name))
+        # Copy raw input from the course_blocks REST API task into warehouse
+        self.copy_raw_input('course_blocks')
 
-    def test_partition_task(self):
+    def copy_raw_input(self, table_name):
+        """Copy raw REST API json data for the given table into warehouse."""
+        file_name = table_name + '.json'
+        self.upload_file(url_path_join(self.data_dir, 'input', file_name),
+                         url_path_join(self.warehouse_path, table_name + '_raw', self.partition, file_name))
+
+    def copy_hive_input(self, table_name):
+        """Copy processed hive data for the given table into warehouse."""
+        daily_partition = "dt=" + self.DATE.strftime(self.DAILY_PARTITION_FORMAT)
+        for file_name in ('part-00000', 'part-00001', '_SUCCESS'):
+            self.upload_file(url_path_join(self.data_dir, 'output', table_name, file_name),
+                             url_path_join(self.warehouse_path, table_name, daily_partition, file_name))
+
+    def test_partition_task_raw_input(self):
+        """Run the CourseBlocksPartitionTask using raw Course List input data."""
+        self.copy_raw_input('course_list')
+        self.validate_partition_task()
+
+    def test_partition_task_hive_input(self):
+        """
+        Run the CourseBlocksPartitionTask using hive Course List input data.
+
+        This simulates what happens when the course blocks task is run more often than the course list task.
+        """
+        self.copy_hive_input('course_list')
+        self.validate_partition_task()
+
+    def validate_partition_task(self):
+        """Run the CourseBlocksPartitionTask and test its output."""
         date = self.DATE.strftime('%Y-%m-%d')
         input_root = url_path_join(self.warehouse_path, 'course_list', self.partition)
@@ -47,7 +71,6 @@ class CourseBlocksPartitionTaskAcceptanceTest(AcceptanceTestCase):
 
     def validate_hive(self):
         """Ensure hive partition was created as expected."""
         table_name = 'course_blocks'
-        output_dir = url_path_join(self.data_dir, 'output', table_name)
         for file_name in ('_SUCCESS', 'part-00000', 'part-00001'):
......
@@ -27,18 +27,44 @@ class ProblemResponseReportWorkflowAcceptanceTest(AcceptanceTestCase):
         """Copy the input data into place."""
         super(ProblemResponseReportWorkflowAcceptanceTest, self).setUp()
 
-        # Copy course list and course blocks REST API data
-        for table_name in ('course_list', 'course_blocks'):
-            daily_partition = "dt=" + self.DATE.strftime(self.DAILY_PARTITION_FORMAT)
-            file_name = table_name + '.json'
-            self.upload_file(url_path_join(self.data_dir, 'input', file_name),
-                             url_path_join(self.warehouse_path, table_name + '_raw', daily_partition, file_name))
+        self.partition = "dt=" + self.DATE.strftime(self.DAILY_PARTITION_FORMAT)
 
         # Copy tracking logs into hdfs
         self.upload_tracking_log(url_path_join(self.data_dir, 'input', self.TRACKING_LOG), self.DATE)
 
-    def test_problem_response_report(self):
-        """Run the ProblemResponseReportWorkflow task and test its output."""
+    def setup_raw_input(self):
+        """Copy raw course list and course blocks REST API data into warehouse."""
+        for table_name in ('course_list', 'course_blocks'):
+            file_name = table_name + '.json'
+            self.upload_file(url_path_join(self.data_dir, 'input', file_name),
+                             url_path_join(self.warehouse_path, table_name + '_raw', self.partition, file_name))
+
+    def setup_hive_input(self):
+        """Copy processed hive course list and course blocks data into warehouse."""
+        for table_name in ('course_list', 'course_blocks'):
+            for file_name in ('part-00000', 'part-00001', '_SUCCESS'):
+                self.upload_file(url_path_join(self.data_dir, 'output', table_name, file_name),
+                                 url_path_join(self.warehouse_path, table_name, self.partition, file_name))
+
+    def test_problem_response_report_raw_input(self):
+        """
+        Run the ProblemResponseReportWorkflow task against raw json input data, to simulate running the
+        reports prior to having course hive data.
+        """
+        self.setup_raw_input()
+        self.validate_problem_response_report()
+
+    def test_problem_response_report_hive_input(self):
+        """
+        Run the ProblemResponseReportWorkflow task against previously-processed hive input data, to simulate
+        running the reports after course hive data has been processed. This is what happens when the problem
+        response report task runs more frequently than the course data tasks.
+        """
+        self.setup_hive_input()
+        self.validate_problem_response_report()
+
+    def validate_problem_response_report(self):
+        """Run the ProblemResponseReportWorkflow task and test the output."""
         marker_path = url_path_join(self.test_out, 'marker-{}'.format(str(time.time())))
         report_date = self.DATE.strftime('%Y-%m-%d')
         self.task.launch([
......
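
The two acceptance runs stage the warehouse differently before launching the
same workflow. Roughly, the staged layouts look like this (a sketch with a
placeholder date; the real paths come from warehouse_path, DATE and
DAILY_PARTITION_FORMAT in the tests):

    # Run 1: raw REST API dumps only; the hive tables must be built.
    RAW_INPUT = [
        'warehouse/course_list_raw/dt=<date>/course_list.json',
        'warehouse/course_blocks_raw/dt=<date>/course_blocks.json',
    ]

    # Run 2: processed hive data already present, simulating the course data
    # tasks having run earlier. Before this fix, the _SUCCESS markers made the
    # partition tasks look complete, so the Hive partitions were never
    # registered and downstream HiveQL failed with
    # "Table not found: course_blocks".
    HIVE_INPUT = [
        'warehouse/course_list/dt=<date>/part-00000',
        'warehouse/course_list/dt=<date>/part-00001',
        'warehouse/course_list/dt=<date>/_SUCCESS',
        'warehouse/course_blocks/dt=<date>/part-00000',
        'warehouse/course_blocks/dt=<date>/part-00001',
        'warehouse/course_blocks/dt=<date>/_SUCCESS',
    ]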