Commit 9805ca2f by Hassan Javeed

Roster acceptance test.

parent f4b9b9f3
@@ -117,3 +117,10 @@ cybersource_merchant_ids = test
 [videos]
 dropoff_threshold = 0.05
+
+[elasticsearch]
+connection_type = aws
+
+[module-engagement]
+alias = roster
+number_of_shards = 5
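
The `[module-engagement]` section is read through luigi's configuration machinery. A minimal sketch of how a task parameter can be bound to that section (class and parameter names here are illustrative, not necessarily those of the real indexing task):

    import luigi

    class RosterIndexTaskSketch(luigi.Task):
        # Falls back to "alias = roster" from [module-engagement] when the
        # parameter is not supplied on the command line.
        alias = luigi.Parameter(config_path={'section': 'module-engagement', 'name': 'alias'})
        number_of_shards = luigi.Parameter(
            config_path={'section': 'module-engagement', 'name': 'number_of_shards'},
            default=1,  # illustrative default
        )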
@@ -130,8 +130,9 @@ class ElasticsearchIndexTask(ElasticsearchIndexTaskMixin, MapReduceJobTask):
         # Find all indexes that are referred to by this alias (currently). These will be deleted after a successful
         # load of the new index.
+        aliases = elasticsearch_client.indices.get_aliases(name=self.alias)
         self.indexes_for_alias.update(
-            elasticsearch_client.indices.get_aliases(name=self.alias).keys()
+            [index for index, alias_info in aliases.iteritems() if self.alias in alias_info['aliases'].keys()]
         )

         if self.index in self.indexes_for_alias:
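
For reference, a sketch of the response shape `indices.get_aliases` returns in elasticsearch-py 1.x (index names here are hypothetical), which shows why the new code filters on the alias name instead of taking `keys()` wholesale: an index can carry aliases other than the one being queried.

    aliases = {
        'roster_20160301': {'aliases': {'roster': {}}},
        'roster_20160308': {'aliases': {'roster': {}, 'roster_backup': {}}},
    }
    # Collect only indexes whose alias set actually contains the target alias.
    matching = [index for index, info in aliases.iteritems() if 'roster' in info['aliases']]
    # matching == ['roster_20160301', 'roster_20160308']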
@@ -150,6 +151,10 @@ class ElasticsearchIndexTask(ElasticsearchIndexTaskMixin, MapReduceJobTask):
                 )
             )

+        # In order for the OverwriteOutputMixin to recognize that this task has run we need to let it know. This will
+        # allow it to actually check if the task is complete after it is run.
+        self.attempted_removal = True
+
         if elasticsearch_client.indices.exists(index=self.index):
             elasticsearch_client.indices.delete(index=self.index)
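
Setting the flag matters because of how the mixin defines completeness. A rough sketch of the pattern, assuming OverwriteOutputMixin behaves along these lines (not a verbatim copy):

    import luigi

    class OverwriteOutputMixinSketch(object):
        overwrite = luigi.BooleanParameter(default=False)
        attempted_removal = False

        def complete(self):
            # While overwriting, report "incomplete" until the old output has
            # actually been removed; afterwards defer to the normal check.
            if self.overwrite and not self.attempted_removal:
                return False
            return super(OverwriteOutputMixinSketch, self).complete()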
@@ -382,7 +387,8 @@ class ElasticsearchIndexTask(ElasticsearchIndexTaskMixin, MapReduceJobTask):
         # Perform an atomic swap of the alias.
         actions = []
-        for old_index in self.indexes_for_alias:
+        old_indexes = [ix for ix in self.indexes_for_alias if elasticsearch_client.indices.exists(index=ix)]
+        for old_index in old_indexes:
             actions.append({"remove": {"index": old_index, "alias": self.alias}})
         actions.append({"add": {"index": self.index, "alias": self.alias}})
         elasticsearch_client.indices.update_aliases({"actions": actions})
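
Bundling the removes and the add into one `update_aliases` call is what makes the swap atomic: clients resolving the alias never see it pointing at zero indexes, or at both the old and new ones. With hypothetical index names, the request body looks like:

    elasticsearch_client.indices.update_aliases({
        'actions': [
            {'remove': {'index': 'roster_20160301', 'alias': 'roster'}},  # hypothetical old index
            {'add': {'index': 'roster_20160308', 'alias': 'roster'}},     # hypothetical new index
        ]
    })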
@@ -391,7 +397,7 @@ class ElasticsearchIndexTask(ElasticsearchIndexTaskMixin, MapReduceJobTask):
         self.output().touch()

         # Attempt to remove any old indexes that are now no longer user-visible.
-        for old_index in self.indexes_for_alias:
+        for old_index in old_indexes:
             elasticsearch_client.indices.delete(index=old_index)

     def rollback(self):
......
@@ -1113,7 +1113,6 @@ class ModuleEngagementRosterPartitionTask(WeekIntervalMixin, ModuleEngagementDow
             CourseEnrollmentTableTask(
                 interval_end=self.date,
                 n_reduce_tasks=self.n_reduce_tasks,
-                overwrite=self.overwrite,
             ),
             ImportAuthUserTask(**kwargs_for_db_import),
             ImportCourseUserGroupTask(**kwargs_for_db_import),
@@ -1289,6 +1288,9 @@ class ModuleEngagementWorkflowTask(ModuleEngagementDownstreamMixin, ModuleEngage
         ' starting with the most recent date. A value of 0 indicates no days should be overwritten.'
     )
+    # Don't use the OverwriteOutputMixin since it changes the behavior of complete() (which we don't want).
+    overwrite = luigi.BooleanParameter(default=False)
+
     def requires(self):
         overwrite_from_date = self.date - datetime.timedelta(days=self.overwrite_n_days)
         yield ModuleEngagementRosterIndexTask(
@@ -1298,6 +1300,7 @@ class ModuleEngagementWorkflowTask(ModuleEngagementDownstreamMixin, ModuleEngage
             obfuscate=self.obfuscate,
             n_reduce_tasks=self.n_reduce_tasks,
             overwrite_from_date=overwrite_from_date,
+            overwrite=self.overwrite,
         )
         yield ModuleEngagementSummaryMetricRangesMysqlTask(
             date=self.date,
......
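
The `overwrite_from_date` computed in `requires()` above is simply the run date pushed back by `overwrite_n_days`; for example:

    import datetime

    date = datetime.date(2016, 3, 10)  # hypothetical run date
    overwrite_n_days = 3
    overwrite_from_date = date - datetime.timedelta(days=overwrite_n_days)
    # overwrite_from_date == datetime.date(2016, 3, 7); data on or after
    # this date gets rebuilt, earlier days are left alone.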
@@ -7,7 +7,7 @@ import os
 import shutil
 import unittest

-from edx.analytics.tasks.tests.acceptance.services import fs, db, task, hive, vertica
+from edx.analytics.tasks.tests.acceptance.services import fs, db, task, hive, vertica, elasticsearch
 from edx.analytics.tasks.url import url_path_join, get_target_from_url
@@ -68,6 +68,13 @@ def when_vertica_not_available(function):
     )(function)


+def when_elasticsearch_available(function):
+    config = get_test_config()
+    es_available = bool(config.get('elasticsearch_host'))
+    return unittest.skipIf(
+        not es_available, 'Elasticsearch service is not available'
+    )(function)
+
+
 # Utility functions
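
Like the existing `when_vertica_not_available` helper above it, the new decorator wraps a test so that it is skipped unless the test config defines `elasticsearch_host`; a hypothetical use:

    @when_elasticsearch_available
    def test_roster_generation(self):  # hypothetical test name
        ...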
@@ -149,6 +156,7 @@ class AcceptanceTestCase(unittest.TestCase):
         import_database_name = 'acceptance_import_' + database_name
         export_database_name = 'acceptance_export_' + database_name
         otto_database_name = 'acceptance_otto_' + database_name
+        elasticsearch_alias = 'alias_test_' + self.identifier
         self.warehouse_path = url_path_join(self.test_root, 'warehouse')
         task_config_override = {
             'hive': {
@@ -194,6 +202,13 @@ class AcceptanceTestCase(unittest.TestCase):
                 'credentials': self.config['vertica_creds_url'],
                 'schema': schema
             }
+        if 'elasticsearch_host' in self.config:
+            task_config_override['elasticsearch'] = {
+                'host': self.config['elasticsearch_host']
+            }
+            task_config_override['module-engagement'] = {
+                'alias': elasticsearch_alias
+            }
         if 'manifest_input_format' in self.config:
             task_config_override['manifest']['input_format'] = self.config['manifest_input_format']
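
When the acceptance-test configuration provides an `elasticsearch_host`, these overrides are equivalent to adding config sections like the following (host and identifier values are made up):

    [elasticsearch]
    host = http://localhost:9200

    [module-engagement]
    alias = alias_test_20160310T120000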
@@ -207,6 +222,7 @@ class AcceptanceTestCase(unittest.TestCase):
         self.task = task.TaskService(self.config, task_config_override, self.identifier)
         self.hive = hive.HiveService(self.task, self.config, database_name)
         self.vertica = vertica.VerticaService(self.config, schema)
+        self.elasticsearch = elasticsearch.ElasticsearchService(self.config, elasticsearch_alias)

         if os.getenv('DISABLE_RESET_STATE', 'false').lower() != 'true':
             self.reset_external_state()
@@ -220,6 +236,7 @@ class AcceptanceTestCase(unittest.TestCase):
         self.otto_db.reset()
         self.hive.reset()
         self.vertica.reset()
+        self.elasticsearch.reset()

     def upload_tracking_log(self, input_file_name, file_date):
         # Define a tracking log path on S3 that will be matched by the standard event-log pattern.
......
CREATE TABLE `course_groups_courseusergroup` (`id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) NOT NULL, `course_id` varchar(255) NOT NULL, `group_type` varchar(20) NOT NULL, PRIMARY KEY (`id`), UNIQUE KEY `name` (`name`,`course_id`), KEY `course_groups_courseusergroup_ff48d8e5` (`course_id`)) ENGINE=InnoDB AUTO_INCREMENT=527 DEFAULT CHARSET=utf8;
INSERT INTO `course_groups_courseusergroup` VALUES (1,'Group 1','edX/DemoX/Demo_Course','cohort'),(2,'Group 2','edX/DemoX/Demo_Course_2','cohort'),(3,'Group 3','course-v1:edX+DemoX+Demo_Course_2015','cohort');
CREATE TABLE `course_groups_courseusergroup_users` (`id` int(11) NOT NULL AUTO_INCREMENT, `courseusergroup_id` int(11) NOT NULL, `user_id` int(11) NOT NULL, PRIMARY KEY (`id`), UNIQUE KEY `courseusergroup_id` (`courseusergroup_id`,`user_id`), KEY `course_groups_courseusergroup_users_caee1c64` (`courseusergroup_id`), KEY `course_groups_courseusergroup_users_fbfc09f1` (`user_id`), CONSTRAINT `courseusergroup_id_refs_id_d26180aa` FOREIGN KEY (`courseusergroup_id`) REFERENCES `course_groups_courseusergroup` (`id`), CONSTRAINT `user_id_refs_id_bf33b47a` FOREIGN KEY (`user_id`) REFERENCES `auth_user` (`id`)) ENGINE=InnoDB AUTO_INCREMENT=8517 DEFAULT CHARSET=utf8;
INSERT INTO `course_groups_courseusergroup_users` VALUES (1, 1, 1),(2, 2, 1),(3, 2, 4),(4, 3, 1);
{"username": "anonymous", "host": "", "event_source": "server", "event_type": "edx.course.enrollment.activated", "context": {"course_id": "edX/DemoX/Demo_Course_2", "org_id": "edX"}, "time": "2015-04-16T17:10:11.440312+00:00", "ip": "", "event": {"course_id": "edX/DemoX/Demo_Course_2", "user_id": 4, "mode": "honor"}, "agent": "", "page": null}
{"username": "anonymous", "host": "", "event_source": "server", "event_type": "edx.course.enrollment.activated", "context": {"course_id": "edX/DemoX/Demo_Course", "org_id": "edX"}, "time": "2015-04-13T17:10:11.440312+00:00", "ip": "", "event": {"course_id": "edX/DemoX/Demo_Course", "user_id": 1, "mode": "honor"}, "agent": "", "page": null}
{"username": "anonymous", "host": "", "event_source": "server", "event_type": "edx.course.enrollment.activated", "context": {"course_id": "course-v1:edX+DemoX+Demo_Course_2015", "org_id": "edX"}, "time": "2015-04-13T17:20:11.440312+00:00", "ip": "", "event": {"course_id": "course-v1:edX+DemoX+Demo_Course_2015", "user_id": 1, "mode": "honor"}, "agent": "", "page": null}
{"username": "anonymous", "host": "", "event_source": "server", "event_type": "edx.course.enrollment.activated", "context": {"course_id": "edX/DemoX/Demo_Course_2", "org_id": "edX"}, "time": "2015-04-13T17:20:11.440312+00:00", "ip": "", "event": {"course_id": "edX/DemoX/Demo_Course_2", "user_id": 1, "mode": "honor"}, "agent": "", "page": null}
{"username": "", "event_type": "/", "ip": "10.0.2.2", "agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.89 Safari/537.36", "host": "precise64", "referer": "", "accept_language": "en-US,en;q=0.8", "event": "{\"POST\": {}, \"GET\": {}}", "event_source": "server", "context": {"user_id": "", "org_id": "", "course_id": "", "path": "/"}, "time": "2015-04-16T21:02:07.141165+00:00", "page": null}
{"username": "", "event_type": "/jsi18n/", "ip": "10.0.2.2", "agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.89 Safari/537.36", "host": "precise64", "referer": "http://localhost:8000/", "accept_language": "en-US,en;q=0.8", "event": "{\"POST\": {}, \"GET\": {}}", "event_source": "server", "context": {"user_id": "", "org_id": "", "course_id": "", "path": "/jsi18n/"}, "time": "2015-04-16T21:02:18.958434+00:00", "page": null}
{"username": "", "event_type": "/jsi18n/", "ip": "10.0.2.2", "agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.89 Safari/537.36", "host": "precise64", "referer": "http://localhost:8000/login", "accept_language": "en-US,en;q=0.8", "event": "{\"POST\": {}, \"GET\": {}}", "event_source": "server", "context": {"user_id": "", "org_id": "", "course_id": "", "path": "/jsi18n/"}, "time": "2015-04-16T21:02:25.719195+00:00", "page": null}
......
try:
    import elasticsearch
except ImportError:
    elasticsearch = None

from edx.analytics.tasks.util.aws_elasticsearch_connection import AwsHttpConnection


class ElasticsearchService(object):

    def __init__(self, config, alias):
        # Only pass a connection_class through when one is configured, so the
        # client keeps its default HTTP connection class otherwise.
        client_kwargs = {}
        if config.get('elasticsearch_connection_class') == 'aws':
            client_kwargs['connection_class'] = AwsHttpConnection
        self._elasticsearch_client = elasticsearch.Elasticsearch(
            hosts=[config['elasticsearch_host']], **client_kwargs
        )
        self._alias = alias

    @property
    def client(self):
        return self._elasticsearch_client

    @property
    def alias(self):
        return self._alias

    def reset(self):
        # Delete every index that the test alias currently points at.
        response = self._elasticsearch_client.indices.get_aliases(name=self._alias)
        for index, alias_info in response.iteritems():
            for alias in alias_info['aliases'].keys():
                if alias == self._alias:
                    self._elasticsearch_client.indices.delete(index=index)

        # Get documents from the marker index which have their target_index set to the current alias.
        # Note that there should be only 1 marker document per test run.
        if self._elasticsearch_client.indices.exists(index='index_updates'):
            query = {"query": {"match": {"target_index": self._alias}}}
            response = self._elasticsearch_client.search(index='index_updates', body=query)
            for doc in response['hits']['hits']:
                self._elasticsearch_client.delete(index='index_updates', doc_type='marker', id=doc['_id'])
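
A hypothetical standalone use of the service, mirroring what `AcceptanceTestCase` does during setup:

    config = {'elasticsearch_host': 'http://localhost:9200'}  # hypothetical test config
    service = ElasticsearchService(config, 'alias_test_20160310T120000')
    service.reset()  # drops indexes behind the alias plus leftover marker documents
    print service.client.info()  # the underlying elasticsearch-py client stays accessible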
@@ -401,6 +401,7 @@ class ElasticsearchIndexTaskCommitTest(BaseIndexTest, ReducerTestMixin, unittest.TestCase):
             self.mock_es.mock_calls,
             [
                 call.indices.refresh(index=self.task.index),
+                call.indices.exists(index='foo_alias_old'),
                 call.indices.update_aliases(
                     {
                         'actions': [
......
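
The expected-calls assertion works because `mock` records every call made on `mock_es`, including chained attribute access, in `mock_calls`. A minimal sketch of the mechanism, independent of the task under test:

    from mock import MagicMock, call

    es = MagicMock()
    es.indices.exists(index='foo_alias_old')
    es.indices.refresh(index='foo')

    assert es.mock_calls[0] == call.indices.exists(index='foo_alias_old')
    assert call.indices.refresh(index='foo') in es.mock_calls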