Commit 3a53da3d by Brian Wilson

Make changes necessary for Hive 2.1 upgrade.

* Backquote column names that use reserved keywords.

* Backquote table names as well.

* We use "interval" in the calendar table as the partition,
  so use date_interval for calendar partition instead.
  "Interval" is a reserved SQL word.

* Backquote `date` in explicit Hive queries.

* Change default Hive version from 0.11 to 1.0, in code and config files.
parent 8694135b
[hive]
release = apache
version = 0.11
version = 1.0
database = default
warehouse_path = hdfs://localhost:9000/edx-analytics-pipeline/warehouse/
......
......@@ -2,7 +2,7 @@
[hive]
release = apache
version = 0.11
version = 1.0
database = default
warehouse_path = s3://fake/warehouse/
......
......@@ -346,26 +346,26 @@ class JoinedStudentEngagementTableTask(StudentEngagementTableDownstreamMixin, Hi
# Join with calendar data only if calculating weekly engagement.
calendar_join = ""
if self.interval_type == "daily":
date_where = "ce.date >= '{start}' AND ce.date < '{end}'".format(
date_where = "ce.`date` >= '{start}' AND ce.`date` < '{end}'".format(
start=self.interval.date_a.isoformat(), # pylint: disable=no-member
end=self.interval.date_b.isoformat() # pylint: disable=no-member
)
elif self.interval_type == "weekly":
last_complete_date = self.interval.date_b - datetime.timedelta(days=1) # pylint: disable=no-member
iso_weekday = last_complete_date.isoweekday()
calendar_join = "INNER JOIN calendar cal ON (ce.date = cal.date) "
date_where = "ce.date >= '{start}' AND ce.date < '{end}' AND cal.iso_weekday = {iso_weekday}".format(
calendar_join = "INNER JOIN calendar cal ON (ce.`date` = cal.`date`) "
date_where = "ce.`date` >= '{start}' AND ce.`date` < '{end}' AND cal.iso_weekday = {iso_weekday}".format(
start=self.interval.date_a.isoformat(), # pylint: disable=no-member
end=self.interval.date_b.isoformat(), # pylint: disable=no-member
iso_weekday=iso_weekday,
)
elif self.interval_type == "all":
last_complete_date = self.interval.date_b - datetime.timedelta(days=1) # pylint: disable=no-member
date_where = "ce.date = '{last_complete_date}'".format(last_complete_date=last_complete_date.isoformat())
date_where = "ce.`date` = '{last_complete_date}'".format(last_complete_date=last_complete_date.isoformat())
return """
SELECT
ce.date,
ce.`date`,
ce.course_id,
au.username,
au.email,
......@@ -389,7 +389,7 @@ class JoinedStudentEngagementTableTask(StudentEngagementTableDownstreamMixin, Hi
INNER JOIN auth_user au
ON (ce.user_id = au.id)
LEFT OUTER JOIN student_engagement_raw_{interval_type} ser
ON (au.username = ser.username AND ce.date = ser.end_date and ce.course_id = ser.course_id)
ON (au.username = ser.username AND ce.`date` = ser.end_date and ce.course_id = ser.course_id)
LEFT OUTER JOIN (
SELECT
cugu.user_id,
......
......@@ -89,7 +89,7 @@ class CalendarTableTask(CalendarDownstreamMixin, HiveTableTask):
@property
def partition(self):
return HivePartition('interval', str(self.interval))
return HivePartition('date_interval', str(self.interval))
def requires(self):
return CalendarTask(
......
......@@ -50,20 +50,20 @@ class ImportIntoHiveTableTask(OverwriteOutputMixin, HiveQueryTask):
# information.
query_format = textwrap.dedent("""
USE {database_name};
DROP TABLE IF EXISTS {table_name};
CREATE EXTERNAL TABLE {table_name} (
DROP TABLE IF EXISTS `{table_name}`;
CREATE EXTERNAL TABLE `{table_name}` (
{col_spec}
)
PARTITIONED BY (dt STRING)
{table_format}
LOCATION '{location}';
ALTER TABLE {table_name} ADD PARTITION (dt = '{partition_date}');
ALTER TABLE `{table_name}` ADD PARTITION (dt = '{partition_date}');
""")
query = query_format.format(
database_name=hive_database_name(),
table_name=self.table_name,
col_spec=','.join([' '.join(c) for c in self.columns]),
col_spec=','.join(['`{}` {}'.format(name, col_type) for name, col_type in self.columns]),
location=self.table_location,
table_format=self.table_format,
partition_date=self.partition_date,
......
......@@ -728,7 +728,7 @@ class EnrollmentByGenderTask(EnrollmentTask):
def query(self):
return """
SELECT
ce.date,
ce.`date`,
ce.course_id,
IF(p.gender != '', p.gender, NULL),
SUM(ce.at_end),
......@@ -736,7 +736,7 @@ class EnrollmentByGenderTask(EnrollmentTask):
FROM course_enrollment ce
LEFT OUTER JOIN auth_userprofile p ON p.user_id = ce.user_id
GROUP BY
ce.date,
ce.`date`,
ce.course_id,
IF(p.gender != '', p.gender, NULL)
"""
......@@ -763,16 +763,16 @@ class EnrollmentByBirthYearTask(EnrollmentTask):
def query(self):
query = """
SELECT
ce.date,
ce.`date`,
ce.course_id,
p.year_of_birth,
SUM(ce.at_end),
COUNT(ce.user_id)
FROM course_enrollment ce
LEFT OUTER JOIN auth_userprofile p ON p.user_id = ce.user_id
WHERE ce.date = '{date}'
WHERE ce.`date` = '{date}'
GROUP BY
ce.date,
ce.`date`,
ce.course_id,
p.year_of_birth
""".format(date=self.query_date)
......@@ -800,7 +800,7 @@ class EnrollmentByEducationLevelTask(EnrollmentTask):
def query(self):
query = """
SELECT
ce.date,
ce.`date`,
ce.course_id,
CASE p.level_of_education
WHEN 'el' THEN 'primary'
......@@ -820,9 +820,9 @@ class EnrollmentByEducationLevelTask(EnrollmentTask):
COUNT(ce.user_id)
FROM course_enrollment ce
LEFT OUTER JOIN auth_userprofile p ON p.user_id = ce.user_id
WHERE ce.date = '{date}'
WHERE ce.`date` = '{date}'
GROUP BY
ce.date,
ce.`date`,
ce.course_id,
CASE p.level_of_education
WHEN 'el' THEN 'primary'
......@@ -863,14 +863,14 @@ class EnrollmentByModeTask(EnrollmentTask):
def query(self):
query = """
SELECT
ce.date,
ce.`date`,
ce.course_id,
ce.mode,
SUM(ce.at_end),
COUNT(ce.user_id)
FROM course_enrollment ce
GROUP BY
ce.date,
ce.`date`,
ce.course_id,
ce.mode
""".format(date=self.query_date)
......@@ -899,13 +899,13 @@ class EnrollmentDailyTask(EnrollmentTask):
query = """
SELECT
ce.course_id,
ce.date,
ce.`date`,
SUM(ce.at_end),
COUNT(ce.user_id)
FROM course_enrollment ce
GROUP BY
ce.course_id,
ce.date
ce.`date`
""".format(date=self.query_date)
return query
......@@ -981,13 +981,13 @@ class ImportCourseSummaryEnrollmentsIntoMysql(CourseSummaryEnrollmentDownstreamM
LEFT OUTER JOIN course_enrollment_mode_daily enrollment_start
ON enrollment_start.course_id = enrollment_end.course_id
AND enrollment_start.mode = enrollment_end.mode
AND enrollment_start.date = '{start_date}'
AND enrollment_start.`date` = '{start_date}'
LEFT OUTER JOIN course_catalog course
ON course.course_id = enrollment_end.course_id
LEFT OUTER JOIN course_grade_by_mode
ON enrollment_end.course_id = course_grade_by_mode.course_id
AND enrollment_end.mode = course_grade_by_mode.mode
WHERE enrollment_end.date = '{end_date}'
WHERE enrollment_end.`date` = '{end_date}'
""".format(
start_date=start_date.isoformat(),
end_date=end_date.isoformat(),
......@@ -1240,19 +1240,19 @@ class CourseGradeByModeDataTask(CourseSummaryEnrollmentDownstreamMixin, HiveQuer
LEFT OUTER JOIN (
SELECT ce.course_id,
ce.user_id,
MAX(ce.date) AS enrollment_date,
MAX(ce.`date`) AS enrollment_date,
MAX(grades.passed_timestamp) AS passed_timestamp
FROM course_enrollment ce
INNER JOIN grades_persistentcoursegrade grades
ON grades.course_id = ce.course_id
AND grades.user_id = ce.user_id
WHERE ce.date <= to_date(grades.modified)
WHERE ce.`date` <= to_date(grades.modified)
GROUP BY ce.course_id,
ce.user_id
) closest_enrollment
ON all_enrollments.course_id = closest_enrollment.course_id
AND all_enrollments.user_id = closest_enrollment.user_id
AND all_enrollments.date = closest_enrollment.enrollment_date
AND all_enrollments.`date` = closest_enrollment.enrollment_date
GROUP BY all_enrollments.course_id,
all_enrollments.mode
"""
......
......@@ -497,7 +497,7 @@ class QueryLastCountryPerCourseTask(
USE {database_name};
DROP TABLE IF EXISTS {table_name};
CREATE EXTERNAL TABLE {table_name} (
date STRING,
`date` STRING,
course_id STRING,
country_code STRING,
count INT,
......
......@@ -1142,10 +1142,10 @@ class ModuleEngagementRosterPartitionTask(WeekIntervalMixin, ModuleEngagementDow
SELECT
course_id,
user_id,
MIN(date) AS first_enrollment_date
MIN(`date`) AS first_enrollment_date
FROM course_enrollment
WHERE
at_end = 1 AND date < '{end}'
at_end = 1 AND `date` < '{end}'
GROUP BY course_id, user_id
) lce
ON (ce.course_id = lce.course_id AND ce.user_id = lce.user_id)
......@@ -1160,7 +1160,7 @@ class ModuleEngagementRosterPartitionTask(WeekIntervalMixin, ModuleEngagementDow
) seg
ON (ce.course_id = seg.course_id AND au.username = seg.username)
WHERE
ce.date = '{last_complete_date}'
ce.`date` = '{last_complete_date}'
""".format(
start=self.interval.date_a.isoformat(), # pylint: disable=no-member
end=self.interval.date_b.isoformat(), # pylint: disable=no-member
......
......@@ -30,14 +30,14 @@ class ImportStudentCourseEnrollmentTestCase(TestCase):
expected_query = textwrap.dedent(
"""
USE default;
DROP TABLE IF EXISTS student_courseenrollment;
CREATE EXTERNAL TABLE student_courseenrollment (
id INT,user_id INT,course_id STRING,created TIMESTAMP,is_active BOOLEAN,mode STRING
DROP TABLE IF EXISTS `student_courseenrollment`;
CREATE EXTERNAL TABLE `student_courseenrollment` (
`id` INT,`user_id` INT,`course_id` STRING,`created` TIMESTAMP,`is_active` BOOLEAN,`mode` STRING
)
PARTITIONED BY (dt STRING)
LOCATION 's3://foo/bar/student_courseenrollment';
ALTER TABLE student_courseenrollment ADD PARTITION (dt = '2014-07-01');
ALTER TABLE `student_courseenrollment` ADD PARTITION (dt = '2014-07-01');
"""
)
self.assertEquals(query, expected_query)
......@@ -75,14 +75,14 @@ class ImportPersistentCourseGradeTestCase(TestCase):
expected_query = textwrap.dedent(
"""
USE default;
DROP TABLE IF EXISTS grades_persistentcoursegrade;
CREATE EXTERNAL TABLE grades_persistentcoursegrade (
id INT,user_id INT,course_id STRING,course_edited_timestamp TIMESTAMP,course_version STRING,grading_policy_hash STRING,percent_grade DECIMAL(10,2),letter_grade STRING,passed_timestamp TIMESTAMP,created TIMESTAMP,modified TIMESTAMP
DROP TABLE IF EXISTS `grades_persistentcoursegrade`;
CREATE EXTERNAL TABLE `grades_persistentcoursegrade` (
`id` INT,`user_id` INT,`course_id` STRING,`course_edited_timestamp` TIMESTAMP,`course_version` STRING,`grading_policy_hash` STRING,`percent_grade` DECIMAL(10,2),`letter_grade` STRING,`passed_timestamp` TIMESTAMP,`created` TIMESTAMP,`modified` TIMESTAMP
)
PARTITIONED BY (dt STRING)
LOCATION 's3://foo/bar/grades_persistentcoursegrade';
ALTER TABLE grades_persistentcoursegrade ADD PARTITION (dt = '2014-07-01');
ALTER TABLE `grades_persistentcoursegrade` ADD PARTITION (dt = '2014-07-01');
"""
)
self.assertEquals(query, expected_query)
......@@ -251,7 +251,7 @@ class LastCountryOfUserPartitionTestCase(TestCase):
"""
USE default;
ALTER TABLE last_country_of_user ADD IF NOT EXISTS PARTITION (dt='2014-01-01');
ALTER TABLE `last_country_of_user` ADD IF NOT EXISTS PARTITION (`dt`='2014-01-01');
"""
)
self.assertEquals(query, expected_query)
......@@ -285,7 +285,7 @@ class QueryLastCountryPerCourseTaskTestCase(TestCase):
USE default;
DROP TABLE IF EXISTS course_enrollment_location_current;
CREATE EXTERNAL TABLE course_enrollment_location_current (
date STRING,
`date` STRING,
course_id STRING,
country_code STRING,
count INT,
......
......@@ -230,8 +230,8 @@ class CourseActivityWeeklyTask(WeeklyIntervalMixin, CourseActivityTask):
act.category as label,
COUNT(DISTINCT username) as count
FROM user_activity_daily act
JOIN calendar cal ON act.date = cal.date
WHERE "{interval_start}" <= cal.date AND cal.date < "{interval_end}"
JOIN calendar cal ON act.`date` = cal.`date`
WHERE "{interval_start}" <= cal.`date` AND cal.`date` < "{interval_end}"
GROUP BY
act.course_id,
cal.iso_week_start,
......@@ -268,15 +268,15 @@ class CourseActivityDailyTask(CourseActivityTask):
def activity_query(self):
return """
SELECT
act.date,
act.`date`,
act.course_id as course_id,
act.category as label,
COUNT(DISTINCT username) as count
FROM user_activity_daily act
WHERE "{interval_start}" <= act.date AND act.date < "{interval_end}"
WHERE "{interval_start}" <= act.`date` AND act.`date` < "{interval_end}"
GROUP BY
act.course_id,
act.date,
act.`date`,
act.category;
"""
......@@ -293,7 +293,7 @@ class CourseActivityDailyTask(CourseActivityTask):
def indexes(self):
return [
('course_id', 'label'),
('date',)
('`date`',)
]
......@@ -352,8 +352,8 @@ class CourseActivityMonthlyTask(CourseActivityTask):
act.category as label,
COUNT(DISTINCT username) as count
FROM user_activity_daily act
JOIN calendar cal ON act.date = cal.date
WHERE "{interval_start}" <= cal.date AND cal.date < "{interval_end}"
JOIN calendar cal ON act.`date` = cal.`date`
WHERE "{interval_start}" <= cal.`date` AND cal.`date` < "{interval_end}"
GROUP BY
act.course_id,
cal.year,
......
......@@ -23,12 +23,12 @@ def hive_database_name():
def hive_version():
"""
Returns the version of Hive that is declared in the configuration file. Defaults to 0.11 if it's not specified.
Returns the version of Hive that is declared in the configuration file. Defaults to 1.0 if it's not specified.
Returns: A tuple with each index representing a part of the version. For example: version="0.11.0.1" would return
(0, 11, 0, 1). The 0 indexed integer is the most significant part of the version number.
"""
version_str = luigi.configuration.get_config().get('hive', 'version', '0.11')
version_str = luigi.configuration.get_config().get('hive', 'version', '1.0')
return tuple([int(x) for x in version_str.split('.')])
......@@ -105,20 +105,20 @@ class HiveTableTask(WarehouseMixin, OverwriteOutputMixin, HiveQueryTask):
# Ensure there is exactly one available partition in the table.
query_format = """
USE {database_name};
DROP TABLE IF EXISTS {table};
CREATE EXTERNAL TABLE {table} (
DROP TABLE IF EXISTS `{table}`;
CREATE EXTERNAL TABLE `{table}` (
{col_spec}
)
PARTITIONED BY ({partition.key} STRING)
PARTITIONED BY (`{partition.key}` STRING)
{table_format}
LOCATION '{location}';
ALTER TABLE {table} ADD PARTITION ({partition.query_spec});
ALTER TABLE `{table}` ADD PARTITION ({partition.query_spec});
"""
query = query_format.format(
database_name=hive_database_name(),
table=self.table,
col_spec=','.join([' '.join(c) for c in self.columns]),
col_spec=','.join(['`{}` {}'.format(name, col_type) for name, col_type in self.columns]),
location=self.table_location,
table_format=self.table_format,
partition=self.partition,
......@@ -191,17 +191,17 @@ class BareHiveTableTask(WarehouseMixin, OverwriteOutputMixin, HiveQueryTask):
def query(self):
partition_clause = ''
if self.partition_by:
partition_clause = 'PARTITIONED BY ({partition_by} STRING)'.format(partition_by=self.partition_by)
partition_clause = 'PARTITIONED BY (`{partition_by}` STRING)'.format(partition_by=self.partition_by)
if self.overwrite:
drop_on_overwrite = 'DROP TABLE IF EXISTS {table};'.format(table=self.table)
drop_on_overwrite = 'DROP TABLE IF EXISTS `{table}`;'.format(table=self.table)
else:
drop_on_overwrite = ''
query_format = """
USE {database_name};
{drop_on_overwrite}
CREATE EXTERNAL TABLE IF NOT EXISTS {table} (
CREATE EXTERNAL TABLE IF NOT EXISTS `{table}` (
{col_spec}
)
{partition_clause}
......@@ -212,7 +212,7 @@ class BareHiveTableTask(WarehouseMixin, OverwriteOutputMixin, HiveQueryTask):
query = query_format.format(
database_name=hive_database_name(),
table=self.table,
col_spec=','.join([' '.join(c) for c in self.columns]),
col_spec=','.join(['`{}` {}'.format(name, col_type) for name, col_type in self.columns]),
location=self.table_location,
table_format=self.table_format,
partition_clause=partition_clause,
......@@ -281,7 +281,7 @@ class HivePartitionTask(WarehouseMixin, OverwriteOutputMixin, HiveQueryTask):
def query(self):
if self.overwrite:
drop_on_overwrite = 'ALTER TABLE {table} DROP IF EXISTS PARTITION ({partition.query_spec});'.format(
drop_on_overwrite = 'ALTER TABLE `{table}` DROP IF EXISTS PARTITION ({partition.query_spec});'.format(
table=self.hive_table_task.table,
partition=self.partition
)
......@@ -291,7 +291,7 @@ class HivePartitionTask(WarehouseMixin, OverwriteOutputMixin, HiveQueryTask):
query_format = """
USE {database_name};
{drop_on_overwrite}
ALTER TABLE {table} ADD IF NOT EXISTS PARTITION ({partition.query_spec});
ALTER TABLE `{table}` ADD IF NOT EXISTS PARTITION ({partition.query_spec});
"""
query = query_format.format(
......@@ -371,7 +371,7 @@ class HivePartition(object):
@property
def query_spec(self):
"""This format is used when a partition needs to be referred to in a query"""
return "{key}='{value}'".format(
return "`{key}`='{value}'".format(
key=self.key,
value=self.value,
)
......@@ -402,7 +402,7 @@ class HiveTableFromQueryTask(HiveTableTask): # pylint: disable=abstract-method
def query(self):
create_table_statements = super(HiveTableFromQueryTask, self).query()
full_insert_query = """
INSERT INTO TABLE {table}
INSERT INTO TABLE `{table}`
PARTITION ({partition.query_spec})
{insert_query}
""".format(
......
......@@ -18,7 +18,7 @@ class HivePartitionTest(TestCase):
self.assertEquals(self.partition.as_dict(), {'dt': '2014-01-01'})
def test_query_spec(self):
self.assertEquals(self.partition.query_spec, "dt='2014-01-01'")
self.assertEquals(self.partition.query_spec, "`dt`='2014-01-01'")
def test_path_spec(self):
self.assertEquals(self.partition.path_spec, "dt=2014-01-01")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment