Unverified Commit f6594f0e by brianhw Committed by GitHub

Merge pull request #457 from edx/brian/course-subjects

Move course-subjects to use the discovery API
parents 446f4025 910c4fd4
{ "key": "edX/Open_DemoX/edx_demo_course", "uuid": "acbdef12-1234-a5eb-90e9-fcfea234a04d", "title": "All about acceptance testing!", "course_runs": [ { "course": "edX+Open_DemoX", "key": "edX/Open_DemoX/edx_demo_course", "title": "All about acceptance testing!", "short_description": "Learn about the frameworks, tools and techniques you can apply to your acceptance testing today!", "full_description": "In theory, this would contain a long description.", "start": "2016-09-01T00:00:00Z", "end": "2016-12-01T00:00:00Z", "enrollment_start": "2016-06-01T00:00:00Z", "enrollment_end": "2016-09-01T00:00:00Z", "announcement": null, "image": { "src": "https://example.com/not-a-real-image.jpg", "description": null, "height": null, "width": null }, "video": null, "seats": [ { "type": "honor", "price": "0.00", "currency": "USD", "upgrade_deadline": "2017-04-01T00:00:00Z", "credit_provider": "edx", "credit_hours": 4 }, { "type": "verified", "price": "50.00", "currency": "USD", "upgrade_deadline": "2016-10-01T08:00:00Z", "credit_provider": "edx", "credit_hours": 3 } ], "content_language": "en-us", "transcript_languages": [], "instructors": [], "staff": [], "pacing_type": "instructor_paced", "min_effort": 2, "max_effort": 4, "modified": "2016-09-07T18:38:32.207371Z", "marketing_url": "https://www.edx.org/course/all-about-acceptance-testing-edx-testing101-3t2016", "level_type": "Intermediate", "availability": "Current", "programs": [ { "uuid": "acb243a0-1234-5abe-099e-ffcae2a340d4", "title": "Testing", "type": "XSeries", "marketing_slug": "tests", "marketing_url": "https://example.com/xseries/testing" } ], "partner_short_code": "openedx", "announcement": "2016-08-02T00:00:00Z", "reporting_type": "test" }, { "course": "edX+Open_DemoX", "key": "course-v1:edX+Open_DemoX+edx_demo_course2", "title": "All about acceptance testing!", "short_description": "Learn about the frameworks, tools and techniques you can apply to your acceptance testing today!", "full_description": "In theory, this 
would contain a long description.", "start": "2016-06-01T00:00:00Z", "end": "2016-09-01T00:00:00Z", "enrollment_start": "2016-03-01T00:00:00Z", "enrollment_end": "2016-06-01T00:00:00Z", "announcement": null, "image": { "src": "https://example.com/not-a-real-image.jpg", "description": null, "height": null, "width": null }, "video": null, "seats": [ { "type": "honor", "price": "0.00", "currency": "USD", "upgrade_deadline": "2017-03-01T00:00:00Z", "credit_provider": "edx", "credit_hours": 4 }, { "type": "verified", "price": "50.00", "currency": "USD", "upgrade_deadline": "2016-08-01T09:00:00Z", "credit_provider": "edx", "credit_hours": 4 } ], "content_language": "en-us", "transcript_languages": [], "instructors": [], "staff": [], "pacing_type": "self_paced", "min_effort": 3, "max_effort": 5, "modified": "2016-09-07T18:38:32.207371Z", "marketing_url": "https://www.edx.org/course/all-about-acceptance-testing-edx-testing101-2t2016", "level_type": "Introductory", "availability": "Archived", "programs": [ { "uuid": "acb243a0-1234-5abe-099e-ffcae2a340d4", "title": "Testing", "type": "XSeries", "marketing_slug": "tests", "marketing_url": "https://example.com/xseries/testing" } ], "partner_short_code": "openedx", "announcement": "2016-06-05T00:00:00Z", "reporting_type": "mooc" } ], "entitlements": [], "owners": [], "short_description": "Learn about the frameworks, tools and techniques you can apply to your acceptance testing today!", "full_description": "In theory, this would contain a long description.", "level_type": "Introductory", "subjects": [ {"name": "Acceptance", "subtitle": "Learn about acceptance.", "description": "Find online courses in acceptance.", "banner_image_url": "https://www.edx.org/sites/default/files/acceptance-1440x210.jpg", "card_image_url": "https://www.edx.org/sites/default/files/subject/image/card/acceptance.jpg", "slug": "acceptance", "uuid": "00e5d5e0-ce45-4114-84a1-50a5be706da5"}, {"name": "Testing", "subtitle": "Learn about testing.", 
"description": "EdX offers a wide variety of free online testing courses.", "banner_image_url": "https://www.edx.org/sites/default/files/testing-1440x210.jpg", "card_image_url": "https://www.edx.org/sites/default/files/subject/image/card/testing.jpg", "slug": "testing", "uuid": "74b6ed2a-3ba0-49be-adc9-53f7256a12e1"}], "prerequisites": [], "prerequisites_raw": "None", "expected_learning_items": [], "video": null, "sponsors": [], "modified": "2017-11-30T20:44:59.285976Z", "marketing_url": "https://www.edx.org/course/all-about-acceptance-testing-edx-testing101", "syllabus_raw": "None", "outcome": "<ul><li>Why Acceptance Testing still matters, all through a focus on testing</li>\n\t<li>Ideas and techniques that will help you understand as well as enjoy acceptance testing</li>\n\t<li>How to understand and appreciate acceptance testing and its power</li>\n</ul>", "programs": [ { "uuid": "acb243a0-1234-5abe-099e-ffcae2a340d4", "title": "Testing", "type": "XSeries", "marketing_slug": "tests", "marketing_url": "https://example.com/xseries/testing" } ] }
{ "key": "edX+Testing102", "uuid": "acbdef12-1234-a5eb-90e9-fcfea234a0d4", "title": "All about acceptance testing Part 3!", "course_runs": [ { "course": "edX+Testing102", "key": "course-v1:edX+Testing102x+1T2017", "title": "All about acceptance testing Part 3!", "short_description": "Learn about the frameworks, tools and techniques you can apply to your acceptance testing today!", "full_description": "In theory, this would contain a long description.", "start": "2016-12-01T00:00:00Z", "end": "2017-02-01T00:00:00Z", "enrollment_start": "2016-09-01T00:00:00Z", "enrollment_end": "2016-12-01T00:00:00Z", "announcement": null, "image": { "src": "https://example.com/not-a-real-image.jpg", "description": null, "height": null, "width": null }, "video": null, "seats": [ { "type": "honor", "price": "0.00", "currency": "USD", "upgrade_deadline": "2017-02-01T00:00:00Z", "credit_provider": "edx", "credit_hours": 3 }, { "type": "verified", "price": "50.00", "currency": "USD", "upgrade_deadline": "2016-01-01T10:00:00Z", "credit_provider": "edx", "credit_hours": 4 } ], "content_language": "en-us", "transcript_languages": [], "instructors": [], "staff": [], "pacing_type": "instructor_paced", "min_effort": 2, "max_effort": 5, "modified": "2016-09-07T18:38:32.207371Z", "marketing_url": "https://www.edx.org/course/all-about-acceptance-testing-edx-testing-part-3-102-1t2017", "level_type": "Advanced", "availability": "Upcoming", "programs": [ { "uuid": "acb243a0-1234-5abe-099e-ffcae2a340d4", "title": "Testing", "type": "XSeries", "marketing_slug": "tests", "marketing_url": "https://example.com/xseries/testing" } ], "partner_short_code": "openedx", "announcement": "2016-12-03T00:00:00Z", "reporting_type": "demo" } ], "entitlements": [], "owners": [], "short_description": "Learn about the frameworks, tools and techniques you can apply to your acceptance testing today!", "full_description": "In theory, this would contain a long description.", "level_type": "Introductory", "subjects": [ 
{"name": "Acceptance Testing", "subtitle": "Learn about acceptance testing.", "description": "Find online courses in acceptance testing.", "banner_image_url": "https://www.edx.org/sites/default/files/acceptance-testing-1440x210.jpg", "card_image_url": "https://www.edx.org/sites/default/files/subject/image/card/acceptancetesting.jpg", "slug": "acceptance_testing", "uuid": "00e5d5e0-ce45-4114-84a1-50a5be706d5a"} ], "prerequisites": [], "prerequisites_raw": "None", "expected_learning_items": [], "video": null, "sponsors": [], "modified": "2017-11-30T20:44:59.285976Z", "marketing_url": "https://www.edx.org/course/all-about-acceptance-testing-edx-testing101", "syllabus_raw": "None", "outcome": "<ul><li>Why Acceptance Testing still matters, all through a focus on testing</li>\n\t<li>Ideas and techniques that will help you understand as well as enjoy acceptance testing</li>\n\t<li>How to understand and appreciate acceptance testing and its power</li>\n</ul>", "programs": [ { "uuid": "acb243a0-1234-5abe-099e-ffcae2a340d4", "title": "Testing", "type": "XSeries", "marketing_slug": "tests", "marketing_url": "https://example.com/xseries/testing" } ] }
row_number,course_id,date,subject_uri,subject_title,subject_language
1,edX/Open_DemoX/edx_demo_course,2016-09-08,/course/subject/acceptance,Acceptance,en
2,edX/Open_DemoX/edx_demo_course,2016-09-08,/course/subject/testing,Testing,en
3,course-v1:edX+Open_DemoX+edx_demo_course2,2016-09-08,/course/subject/acceptance,Acceptance,en
4,course-v1:edX+Open_DemoX+edx_demo_course2,2016-09-08,/course/subject/testing,Testing,en
5,course-v1:edX+Testing102x+1T2017,2016-09-08,/course/subject/acceptance_testing,Acceptance Testing,en
row_number,course_id,date,subject_uri,subject_title,subject_language
1,TestingX/Foo101,2015-06-29,/course/subject/foo,Foo,py
2,TestingX/Foo102,2015-06-29,/course/subject/foo,Foo,py
3,TestingX/Foo102,2015-06-29,/course/subject/bar,Bar,py
4,TestingX/Foo201,2015-06-29,/course/subject/foo,Foo,py
5,AcceptanceX/Foo101,2015-06-29,/course/subject/foo,Foo,py
6,AcceptanceX/Bar101,2015-06-29,/course/subject/bar,Bar,py
"""
End to end test of the course catalog tasks.
"""
import logging
import os
import datetime
import pandas
from edx.analytics.tasks.tests.acceptance import AcceptanceTestCase, when_vertica_available
from edx.analytics.tasks.util.url import url_path_join
log = logging.getLogger(__name__)
class BaseCourseCatalogAcceptanceTest(AcceptanceTestCase):
    """Base class for the end-to-end test of course catalog-based tasks."""

    # Fixture file under {data_dir}/input/ that stands in for the real catalog API response.
    INPUT_FILE = 'catalog.json'

    def setUp(self):
        super(BaseCourseCatalogAcceptanceTest, self).setUp()
        self.upload_data()

    def upload_data(self):
        """Puts the test course catalog where the processing task would look for it, bypassing calling the actual API"""
        source_path = os.path.join(self.data_dir, 'input', self.INPUT_FILE)
        # IMPORTANT: this path should be of the same format as the path that DailyPullCatalogTask uses for output.
        destination_path = url_path_join(
            self.warehouse_path, "course_catalog", "catalog", "dt=2015-06-29", self.INPUT_FILE
        )
        # Upload mocked results of the API call.
        self.s3_client.put(source_path, destination_path)
class CourseSubjectsAcceptanceTest(BaseCourseCatalogAcceptanceTest):
    """End-to-end test of pulling the course subject data into Vertica."""

    @when_vertica_available
    def test_course_subjects(self):
        """Tests the workflow for the course subjects, end to end."""
        self.task.launch([
            'CourseCatalogWorkflow',
            '--date', '2015-06-29'
        ])
        self.validate_output()

    def validate_output(self):
        """Validates the output, comparing it to a csv of all the expected output from this workflow."""
        columns = ['row_number', 'course_id', 'date', 'subject_uri', 'subject_title', 'subject_language']
        with self.vertica.cursor() as cursor:
            expected_output_csv = os.path.join(self.data_dir, 'output', 'expected_subjects_for_acceptance.csv')

            def convert_date(date_string):
                """Convert date string to a date object."""
                return datetime.datetime.strptime(date_string, '%Y-%m-%d').date()

            expected = pandas.read_csv(expected_output_csv, converters={'date': convert_date})
            cursor.execute("SELECT * FROM {schema}.d_course_subjects;".format(schema=self.vertica.schema_name))
            database_subjects = cursor.fetchall()
            subjects = pandas.DataFrame(database_subjects, columns=columns)
            for frame in (subjects, expected):
                # DataFrame.sort() was deprecated in pandas 0.17 and removed in 0.20;
                # sort_values() is the direct replacement with identical semantics here.
                frame.sort_values(['row_number'], inplace=True, ascending=True)
                frame.reset_index(drop=True, inplace=True)
            self.assert_data_frames_equal(subjects, expected)
...@@ -37,9 +37,9 @@ class EnrollmentAcceptanceTest(AcceptanceTestCase): ...@@ -37,9 +37,9 @@ class EnrollmentAcceptanceTest(AcceptanceTestCase):
self.execute_sql_fixture_file('load_auth_userprofile.sql') self.execute_sql_fixture_file('load_auth_userprofile.sql')
self.upload_file( self.upload_file(
os.path.join(self.data_dir, 'input', 'course_catalog.json'), os.path.join(self.data_dir, 'input', 'course_runs.json'),
url_path_join(self.warehouse_path, 'course_catalog_raw', 'dt={}'.format(self.CATALOG_DATE), url_path_join(self.warehouse_path, 'discovery_api_raw', 'dt={}'.format(self.CATALOG_DATE),
'course_catalog.json') 'course_runs.json')
) )
self.import_db.execute_sql_file( self.import_db.execute_sql_file(
os.path.join(self.data_dir, 'input', 'load_grades_persistentcoursegrade.sql') os.path.join(self.data_dir, 'input', 'load_grades_persistentcoursegrade.sql')
......
...@@ -36,9 +36,9 @@ class EnrollmentGradesAcceptanceTest(AcceptanceTestCase): ...@@ -36,9 +36,9 @@ class EnrollmentGradesAcceptanceTest(AcceptanceTestCase):
self.execute_sql_fixture_file('load_auth_userprofile.sql') self.execute_sql_fixture_file('load_auth_userprofile.sql')
self.upload_file( self.upload_file(
os.path.join(self.data_dir, 'input', 'course_catalog.json'), os.path.join(self.data_dir, 'input', 'course_runs.json'),
url_path_join(self.warehouse_path, 'course_catalog_raw', 'dt={}'.format(self.CATALOG_DATE), url_path_join(self.warehouse_path, 'discovery_api_raw', 'dt={}'.format(self.CATALOG_DATE),
'course_catalog.json') 'course_runs.json')
) )
self.import_db.execute_sql_file( self.import_db.execute_sql_file(
os.path.join(self.data_dir, 'input', 'load_grades_persistentcoursegrade.sql') os.path.join(self.data_dir, 'input', 'load_grades_persistentcoursegrade.sql')
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
End to end test of the internal reporting d_program_course table loading task. End to end test of the internal reporting d_program_course table loading task.
""" """
import datetime
import os import os
import logging import logging
import pandas import pandas
...@@ -20,12 +21,16 @@ class InternalReportingUserCourseLoadAcceptanceTest(AcceptanceTestCase): ...@@ -20,12 +21,16 @@ class InternalReportingUserCourseLoadAcceptanceTest(AcceptanceTestCase):
def setUp(self): def setUp(self):
super(InternalReportingUserCourseLoadAcceptanceTest, self).setUp() super(InternalReportingUserCourseLoadAcceptanceTest, self).setUp()
self.upload_file( self.upload_file(
os.path.join(self.data_dir, 'input', 'course_catalog.json'), os.path.join(self.data_dir, 'input', 'courses.json'),
url_path_join(self.warehouse_path, 'course_catalog_raw', 'dt=' + self.DATE, 'course_catalog.json') url_path_join(self.warehouse_path, 'discovery_api_raw', 'dt=' + self.DATE, 'courses.json')
)
self.upload_file(
os.path.join(self.data_dir, 'input', 'course_runs.json'),
url_path_join(self.warehouse_path, 'discovery_api_raw', 'dt=' + self.DATE, 'course_runs.json')
) )
self.upload_file( self.upload_file(
os.path.join(self.data_dir, 'input', 'programs.json'), os.path.join(self.data_dir, 'input', 'programs.json'),
url_path_join(self.warehouse_path, 'programs_raw', 'dt=' + self.DATE, 'programs.json') url_path_join(self.warehouse_path, 'discovery_api_raw', 'dt=' + self.DATE, 'programs.json')
) )
@when_vertica_available @when_vertica_available
...@@ -40,6 +45,7 @@ class InternalReportingUserCourseLoadAcceptanceTest(AcceptanceTestCase): ...@@ -40,6 +45,7 @@ class InternalReportingUserCourseLoadAcceptanceTest(AcceptanceTestCase):
self.validate_program_course() self.validate_program_course()
self.validate_course_seat() self.validate_course_seat()
self.validate_course() self.validate_course()
self.validate_course_subjects()
def validate_program_course(self): def validate_program_course(self):
"""Validates the output, comparing it to a csv of all the expected output from this workflow.""" """Validates the output, comparing it to a csv of all the expected output from this workflow."""
...@@ -107,3 +113,27 @@ class InternalReportingUserCourseLoadAcceptanceTest(AcceptanceTestCase): ...@@ -107,3 +113,27 @@ class InternalReportingUserCourseLoadAcceptanceTest(AcceptanceTestCase):
d_course = pandas.DataFrame(response, columns=columns) d_course = pandas.DataFrame(response, columns=columns)
self.assert_data_frames_equal(d_course, expected) self.assert_data_frames_equal(d_course, expected)
def validate_course_subjects(self):
"""Validates the output, comparing it to a csv of all the expected output from this workflow."""
with self.vertica.cursor() as cursor:
expected_output_csv = os.path.join(self.data_dir, 'output', 'acceptance_expected_d_course_subjects.csv')
def convert_date(date_string):
"""Convert date string to a date object."""
return datetime.datetime.strptime(date_string, '%Y-%m-%d').date()
expected = pandas.read_csv(expected_output_csv, converters={'date': convert_date})
columns = ['row_number', 'course_id', 'date', 'subject_uri', 'subject_title', 'subject_language']
cursor.execute("SELECT * FROM {schema}.d_course_subjects;".format(schema=self.vertica.schema_name))
response = cursor.fetchall()
subjects = pandas.DataFrame(response, columns=columns)
for frame in (subjects, expected):
frame.sort(['row_number'], inplace=True, ascending=[True])
frame.reset_index(drop=True, inplace=True)
self.assert_data_frames_equal(subjects, expected)
...@@ -21,12 +21,12 @@ class InternalReportingProgramCourseLoadAcceptanceTest(AcceptanceTestCase): # p ...@@ -21,12 +21,12 @@ class InternalReportingProgramCourseLoadAcceptanceTest(AcceptanceTestCase): # p
super(InternalReportingProgramCourseLoadAcceptanceTest, self).setUp() super(InternalReportingProgramCourseLoadAcceptanceTest, self).setUp()
input_location = os.path.join(self.data_dir, input_location = os.path.join(self.data_dir,
'input', 'input',
'course_catalog.json') 'course_runs.json')
output_location = url_path_join(self.warehouse_path, output_location = url_path_join(self.warehouse_path,
'course_catalog_raw', 'discovery_api_raw',
'dt=' + self.DATE, 'dt=' + self.DATE,
'course_catalog.json') 'course_runs.json')
# The furthest upstream dependency is PullCourseCatalogAPIData. # The furthest upstream dependency is PullDiscoveryCourseRunsAPIData.
# We fixture the expected output of that Task here. # We fixture the expected output of that Task here.
self.upload_file(input_location, output_location) self.upload_file(input_location, output_location)
......
"""Collect the course catalog from the course catalog API for processing of course metadata like subjects or types."""
import datetime
import json
import requests
import luigi
from edx.analytics.tasks.common.vertica_load import VerticaCopyTask, VerticaCopyTaskMixin
from edx.analytics.tasks.util.hive import WarehouseMixin, HivePartition
from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin
from edx.analytics.tasks.util.url import get_target_from_url, url_path_join
# Tell urllib3 to switch the ssl backend to PyOpenSSL.
# See https://urllib3.readthedocs.org/en/latest/security.html#pyopenssl.
import urllib3.contrib.pyopenssl
urllib3.contrib.pyopenssl.inject_into_urllib3()
# Since we block-encode the offending strings anyways, ignore complaints about unicode escapes in '\N' appearances.
# pylint: disable-msg=anomalous-unicode-escape-in-string
class PullCatalogMixin(OverwriteOutputMixin, WarehouseMixin):
    """Define common parameters for the course catalog API pull and downstream tasks."""

    # NOTE(review): the default is computed once at module import time, not at task
    # scheduling time — TODO confirm this is the intended semantics for long-lived schedulers.
    date = luigi.DateParameter(
        default=datetime.datetime.utcnow().date(),
        description='Default is today, UTC.',
    )
    # Base URL of the catalog endpoint; read from luigi config when not passed explicitly.
    catalog_path = luigi.Parameter(
        config_path={'section': 'course-catalog', 'name': 'catalog_path'},
        description='Base URL for the drupal catalog API, e.g. https://www.edx.org/api/catalog/v2/courses',
    )
class DailyPullCatalogTask(PullCatalogMixin, luigi.Task):
    """
    A task that reads the course catalog off the API and writes the result json blob to a file.

    Pulls are made daily to keep a full historical record.
    """

    def requires(self):
        # No upstream luigi dependencies: the data comes straight from the HTTP API.
        pass

    def run(self):
        self.remove_output_on_overwrite()
        response = requests.get(self.catalog_path)
        if response.status_code != requests.codes.ok:  # pylint: disable=no-member
            raise Exception(
                "Encountered status {} on request to API for {}".format(response.status_code, self.date)
            )
        with self.output().open('w') as output_file:
            output_file.write(response.content)

    def output(self):
        """Output is in the form {warehouse_path}/course_catalog/catalog/dt={CCYY-MM-DD}/catalog.json"""
        partition = "dt=" + self.date.strftime('%Y-%m-%d')  # pylint: disable=no-member
        target_url = url_path_join(self.warehouse_path, "course_catalog", "catalog", partition, "catalog.json")
        return get_target_from_url(target_url)
class DailyProcessFromCatalogSubjectTask(PullCatalogMixin, luigi.Task):
    """
    A task that reads a local file generated from a daily catalog pull, and writes the course id and subject to a tsv.

    The output file should be readable by Hive.
    """

    def requires(self):
        # Pass all shared parameters through so the pull task resolves to the same partition.
        kwargs = {
            'date': self.date,
            'catalog_path': self.catalog_path,
            'warehouse_path': self.warehouse_path,
            'overwrite': self.overwrite,
        }
        return DailyPullCatalogTask(**kwargs)

    def run(self):
        """Read the catalog and select just the subjects data for output."""
        self.remove_output_on_overwrite()
        with self.input().open('r') as input_file:
            # Since the course catalog is of fairly manageable size, we can read it all into memory at once.
            # If this needs to change, we should be able to parse the catalog course by course.
            catalog = json.loads(input_file.read()).get('items')
        with self.output().open('w') as output_file:
            if catalog is None:
                return
            date_string = self.date.strftime('%Y-%m-%d')  # pylint: disable=no-member
            for course in catalog:
                # The course catalog occasionally is buggy and has malformed courses, so just skip those.
                if not isinstance(course, dict):
                    continue
                course_id = course.get('course_id')
                if course_id is None:
                    continue
                # 'subjects' is a list of dictionaries with keys 'title', 'uri', and 'language'.
                subjects = course.get('subjects')
                if not subjects:
                    # No subjects given for the course: record the lack of subjects with Hive NULL markers.
                    # '\\N' is a literal backslash-N, the marker Hive reads as NULL.
                    self._write_row(output_file, [course_id, date_string, '\\N', '\\N', '\\N'])
                else:
                    for subject in subjects:
                        self._write_row(output_file, [
                            course_id,
                            date_string,
                            subject.get('uri', '\\N'),
                            subject.get('title', '\\N'),
                            subject.get('language', '\\N'),
                        ])

    @staticmethod
    def _write_row(output_file, values):
        """Write one tab-separated record, utf-8 encoded (subjects may be in any language), plus a newline."""
        output_file.write('\t'.join([value.encode('utf-8') for value in values]))
        output_file.write('\n')

    def output(self):
        """
        Output is set up so that it can be read as a Hive table with partitions.

        The form is {warehouse_path}/course_catalog/subjects/dt={CCYY-mm-dd}/subjects.tsv.
        """
        date_string = self.date.strftime('%Y-%m-%d')  # pylint: disable=no-member
        partition_path_spec = HivePartition('dt', date_string).path_spec
        url_with_filename = url_path_join(self.warehouse_path, "course_catalog", "subjects",
                                          partition_path_spec, "subjects.tsv")
        return get_target_from_url(url_with_filename)
class DailyLoadSubjectsToVerticaTask(PullCatalogMixin, VerticaCopyTask):
    """Does the bulk loading of the subjects data into Vertica."""

    @property
    def insert_source_task(self):
        # Note: don't pass overwrite down from here. Use it only for overwriting when copying to Vertica.
        return DailyProcessFromCatalogSubjectTask(date=self.date, catalog_path=self.catalog_path)

    @property
    def table(self):
        return "d_course_subjects"

    @property
    def auto_primary_key(self):
        """Overridden since the database schema specifies a different name for the auto incrementing primary key."""
        return ('row_number', 'AUTO_INCREMENT')

    @property
    def default_columns(self):
        """Overridden since the superclass method includes a time of insertion column we don't want in this table."""
        return None

    @property
    def columns(self):
        # Column order must match the TSV emitted by DailyProcessFromCatalogSubjectTask.
        return [
            ('course_id', 'VARCHAR(200)'),
            ('date', 'DATE'),
            ('subject_uri', 'VARCHAR(200)'),
            ('subject_title', 'VARCHAR(200)'),
            ('subject_language', 'VARCHAR(200)'),
        ]
class CourseCatalogWorkflow(PullCatalogMixin, VerticaCopyTaskMixin, luigi.WrapperTask):
    """Upload the course catalog to the data warehouse."""

    def requires(self):
        """Yield the Vertica load task with both the catalog-pull and Vertica-copy parameters."""
        # Combine PullCatalogMixin parameters with the additional args for VerticaCopyTaskMixin.
        # (The original also called kwargs.update(kwargs), a no-op that has been removed.)
        kwargs = {
            'schema': self.schema,
            'credentials': self.credentials,
            'date': self.date,
            'catalog_path': self.catalog_path,
            'overwrite': self.overwrite,
        }
        yield (
            DailyLoadSubjectsToVerticaTask(**kwargs),
        )
...@@ -9,7 +9,6 @@ from edx.analytics.tasks.common.vertica_load import SchemaManagementTask ...@@ -9,7 +9,6 @@ from edx.analytics.tasks.common.vertica_load import SchemaManagementTask
from edx.analytics.tasks.util.hive import WarehouseMixin from edx.analytics.tasks.util.hive import WarehouseMixin
from edx.analytics.tasks.util.url import ExternalURL from edx.analytics.tasks.util.url import ExternalURL
from edx.analytics.tasks.util.vertica_target import CredentialFileVerticaTarget from edx.analytics.tasks.util.vertica_target import CredentialFileVerticaTarget
from edx.analytics.tasks.warehouse.course_catalog import DailyLoadSubjectsToVerticaTask
from edx.analytics.tasks.warehouse.load_internal_reporting_certificates import LoadInternalReportingCertificatesToWarehouse from edx.analytics.tasks.warehouse.load_internal_reporting_certificates import LoadInternalReportingCertificatesToWarehouse
from edx.analytics.tasks.warehouse.load_internal_reporting_country import LoadInternalReportingCountryToWarehouse from edx.analytics.tasks.warehouse.load_internal_reporting_country import LoadInternalReportingCountryToWarehouse
from edx.analytics.tasks.warehouse.load_internal_reporting_course_catalog import LoadInternalReportingCourseCatalogToWarehouse from edx.analytics.tasks.warehouse.load_internal_reporting_course_catalog import LoadInternalReportingCourseCatalogToWarehouse
...@@ -115,10 +114,6 @@ class LoadWarehouseTask(WarehouseWorkflowMixin, luigi.WrapperTask): ...@@ -115,10 +114,6 @@ class LoadWarehouseTask(WarehouseWorkflowMixin, luigi.WrapperTask):
n_reduce_tasks=self.n_reduce_tasks, n_reduce_tasks=self.n_reduce_tasks,
**kwargs **kwargs
), ),
DailyLoadSubjectsToVerticaTask(
date=self.date,
**kwargs
),
) )
......
...@@ -10,7 +10,7 @@ from edx.analytics.tasks.insights.enrollments import EnrollmentSummaryRecord ...@@ -10,7 +10,7 @@ from edx.analytics.tasks.insights.enrollments import EnrollmentSummaryRecord
from edx.analytics.tasks.util.hive import WarehouseMixin, HivePartition from edx.analytics.tasks.util.hive import WarehouseMixin, HivePartition
from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin
from edx.analytics.tasks.util.url import ExternalURL, url_path_join from edx.analytics.tasks.util.url import ExternalURL, url_path_join
from edx.analytics.tasks.warehouse.load_internal_reporting_course_catalog import CourseRecord, ProgramCourseRecord, CourseSeatRecord from edx.analytics.tasks.warehouse.load_internal_reporting_course_catalog import CourseRecord, ProgramCourseRecord, CourseSeatRecord, CourseSubjectRecord
class LoadInternalReportingCertificatesToBigQuery(WarehouseMixin, BigQueryLoadTask): class LoadInternalReportingCertificatesToBigQuery(WarehouseMixin, BigQueryLoadTask):
...@@ -89,6 +89,22 @@ class LoadInternalReportingCourseSeatToBigQuery(WarehouseMixin, BigQueryLoadTask ...@@ -89,6 +89,22 @@ class LoadInternalReportingCourseSeatToBigQuery(WarehouseMixin, BigQueryLoadTask
return CourseSeatRecord.get_bigquery_schema() return CourseSeatRecord.get_bigquery_schema()
class LoadInternalReportingCourseSubjectToBigQuery(WarehouseMixin, BigQueryLoadTask):
@property
def insert_source_task(self):
url = url_path_join(self.hive_partition_path('course_subject', self.date), 'course_subject.tsv')
return ExternalURL(url=url)
@property
def table(self):
return 'd_course_subjects'
@property
def schema(self):
return CourseSubjectRecord.get_bigquery_schema()
class LoadInternalReportingProgramCourseToBigQuery(WarehouseMixin, BigQueryLoadTask): class LoadInternalReportingProgramCourseToBigQuery(WarehouseMixin, BigQueryLoadTask):
@property @property
...@@ -120,6 +136,7 @@ class LoadInternalReportingCourseCatalogToBigQuery(WarehouseMixin, BigQueryLoadD ...@@ -120,6 +136,7 @@ class LoadInternalReportingCourseCatalogToBigQuery(WarehouseMixin, BigQueryLoadD
} }
yield LoadInternalReportingCourseToBigQuery(**kwargs) yield LoadInternalReportingCourseToBigQuery(**kwargs)
yield LoadInternalReportingCourseSeatToBigQuery(**kwargs) yield LoadInternalReportingCourseSeatToBigQuery(**kwargs)
yield LoadInternalReportingCourseSubjectToBigQuery(**kwargs)
yield LoadInternalReportingProgramCourseToBigQuery(**kwargs) yield LoadInternalReportingProgramCourseToBigQuery(**kwargs)
def complete(self): def complete(self):
...@@ -214,31 +231,6 @@ class LoadInternalReportingUserToBigQuery(WarehouseMixin, BigQueryLoadTask): ...@@ -214,31 +231,6 @@ class LoadInternalReportingUserToBigQuery(WarehouseMixin, BigQueryLoadTask):
] ]
class DailyLoadSubjectsToBigQueryTask(WarehouseMixin, BigQueryLoadTask):
@property
def insert_source_task(self):
partition_path_spec = HivePartition('dt', self.date.isoformat()).path_spec
url_with_filename = url_path_join(self.warehouse_path, "course_catalog", "subjects",
partition_path_spec, "subjects.tsv")
return ExternalURL(url=url_with_filename)
@property
def table(self):
return "d_course_subjects"
@property
def schema(self):
# Defined in course_catalog, but not in a Record.
return [
bigquery.SchemaField('course_id', 'STRING'),
bigquery.SchemaField('date', 'DATE'),
bigquery.SchemaField('subject_uri', 'STRING'),
bigquery.SchemaField('subject_title', 'STRING'),
bigquery.SchemaField('subject_language', 'STRING'),
]
class LoadWarehouseBigQueryTask(BigQueryLoadDownstreamMixin, WarehouseMixin, luigi.WrapperTask): class LoadWarehouseBigQueryTask(BigQueryLoadDownstreamMixin, WarehouseMixin, luigi.WrapperTask):
date = luigi.DateParameter() date = luigi.DateParameter()
...@@ -265,8 +257,6 @@ class LoadWarehouseBigQueryTask(BigQueryLoadDownstreamMixin, WarehouseMixin, lui ...@@ -265,8 +257,6 @@ class LoadWarehouseBigQueryTask(BigQueryLoadDownstreamMixin, WarehouseMixin, lui
yield LoadInternalReportingUserToBigQuery(**kwargs) yield LoadInternalReportingUserToBigQuery(**kwargs)
yield DailyLoadSubjectsToBigQueryTask(**kwargs)
def complete(self): def complete(self):
# OverwriteOutputMixin changes the complete() method behavior, so we override it. # OverwriteOutputMixin changes the complete() method behavior, so we override it.
return all(r.complete() for r in luigi.task.flatten(self.requires())) return all(r.complete() for r in luigi.task.flatten(self.requires()))
...@@ -54,9 +54,8 @@ edx.analytics.tasks = ...@@ -54,9 +54,8 @@ edx.analytics.tasks =
student_engagement = edx.analytics.tasks.data_api.student_engagement:StudentEngagementTask student_engagement = edx.analytics.tasks.data_api.student_engagement:StudentEngagementTask
# warehouse: # warehouse:
catalog = edx.analytics.tasks.warehouse.course_catalog:CourseCatalogWorkflow
event-type-dist = edx.analytics.tasks.warehouse.event_type_dist:PushToVerticaEventTypeDistributionTask event-type-dist = edx.analytics.tasks.warehouse.event_type_dist:PushToVerticaEventTypeDistributionTask
load-course-catalog = edx.analytics.tasks.warehouse.load_internal_reporting_course_catalog:PullCourseCatalogAPIData load-course-catalog = edx.analytics.tasks.warehouse.load_internal_reporting_course_catalog:PullDiscoveryCoursesAPIData
load-d-certificates = edx.analytics.tasks.warehouse.load_internal_reporting_certificates:LoadInternalReportingCertificatesToWarehouse load-d-certificates = edx.analytics.tasks.warehouse.load_internal_reporting_certificates:LoadInternalReportingCertificatesToWarehouse
load-d-country = edx.analytics.tasks.warehouse.load_internal_reporting_country:LoadInternalReportingCountryToWarehouse load-d-country = edx.analytics.tasks.warehouse.load_internal_reporting_country:LoadInternalReportingCountryToWarehouse
load-d-user = edx.analytics.tasks.warehouse.load_internal_reporting_user:LoadInternalReportingUserToWarehouse load-d-user = edx.analytics.tasks.warehouse.load_internal_reporting_user:LoadInternalReportingUserToWarehouse
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment