Commit 12bcfac5 by Brian Wilson

Try moving cluster memmbership output to the end.

See if having it occur after the repartition
of the results makes any difference in the slowness that
seems to have been introduced by its addition.
parent 35ca6238
...@@ -8,7 +8,6 @@ from edx.analytics.tasks.util.url import ExternalURL, get_target_from_url ...@@ -8,7 +8,6 @@ from edx.analytics.tasks.util.url import ExternalURL, get_target_from_url
# from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin # from edx.analytics.tasks.util.overwrite import OverwriteOutputMixin
class ClusterCoursesByTextTask(BasicSparkJobTask): class ClusterCoursesByTextTask(BasicSparkJobTask):
""" """
Clusters courses according to the text provided from the course content. Clusters courses according to the text provided from the course content.
...@@ -22,9 +21,6 @@ class ClusterCoursesByTextTask(BasicSparkJobTask): ...@@ -22,9 +21,6 @@ class ClusterCoursesByTextTask(BasicSparkJobTask):
num_top_words = luigi.IntParameter(default=25) num_top_words = luigi.IntParameter(default=25)
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
"""
Call path selection task to get list of log files matching the pattern
"""
super(ClusterCoursesByTextTask, self).__init__(*args, **kwargs) super(ClusterCoursesByTextTask, self).__init__(*args, **kwargs)
if self.output_path == None: if self.output_path == None:
self.output_path = "{}.{}_clusters".format(self.input_path, self.num_clusters) self.output_path = "{}.{}_clusters".format(self.input_path, self.num_clusters)
...@@ -103,18 +99,6 @@ class ClusterCoursesByTextTask(BasicSparkJobTask): ...@@ -103,18 +99,6 @@ class ClusterCoursesByTextTask(BasicSparkJobTask):
output_df = km_model.transform(idf_df) output_df = km_model.transform(idf_df)
result = output_df.select('cluster', 'course_id', 'count').orderBy(['cluster', 'course_id'], ascending=[1,1]) result = output_df.select('cluster', 'course_id', 'count').orderBy(['cluster', 'course_id'], ascending=[1,1])
print_fcn("")
print_fcn("Output cluster memberships...")
for index, center in enumerate(km_centers):
center_ordered = center.argsort()
center_revordered = center_ordered[::-1]
top_indices = center_revordered[:self.num_top_words]
top_vocab = [cv_model.vocabulary[idx] for idx in top_indices]
vocab_string = ', '.join(top_vocab)
print_fcn("Centroid %d: %s " % (index, vocab_string))
members_df = result.filter(result['cluster'] == index).select('course_id')
members_df.show(truncate=False, n=members_df.count())
# persist the output. # persist the output.
# output_path = self.output_dir().path # output_path = self.output_dir().path
output_path = self.output_path output_path = self.output_path
...@@ -126,3 +110,15 @@ class ClusterCoursesByTextTask(BasicSparkJobTask): ...@@ -126,3 +110,15 @@ class ClusterCoursesByTextTask(BasicSparkJobTask):
# But we lose the sorting. # But we lose the sorting.
# result.coalesce(1).write.csv(output_path, sep='\t') # result.coalesce(1).write.csv(output_path, sep='\t')
result.repartition(1).write.csv(output_path, sep='\t') result.repartition(1).write.csv(output_path, sep='\t')
print_fcn("")
print_fcn("Output cluster memberships...")
for index, center in enumerate(km_centers):
center_ordered = center.argsort()
center_revordered = center_ordered[::-1]
top_indices = center_revordered[:self.num_top_words]
top_vocab = [cv_model.vocabulary[idx] for idx in top_indices]
vocab_string = ', '.join(top_vocab)
print_fcn("Centroid %d: %s " % (index, vocab_string))
members_df = result.filter(result['cluster'] == index).select('course_id')
members_df.show(truncate=False, n=members_df.count())
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment