From cce452e2e799079b2e29ee1395264bb37ab2f53b Mon Sep 17 00:00:00 2001 From: Victor Shnayder <victor@mitx.mit.edu> Date: Mon, 27 Aug 2012 16:16:38 -0400 Subject: [PATCH] Add monitoring of course dirs, and reload on change * if the file data_dir/{course_dir}.reload is created or changed, will reload on next access to course descriptor * background thread polls every second * Relies on there being only 1 modulestore (per course dir). --- common/lib/xmodule/xmodule/modulestore/monitor.py | 132 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ common/lib/xmodule/xmodule/modulestore/xml.py | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 191 insertions(+), 1 deletion(-) create mode 100644 common/lib/xmodule/xmodule/modulestore/monitor.py diff --git a/common/lib/xmodule/xmodule/modulestore/monitor.py b/common/lib/xmodule/xmodule/modulestore/monitor.py new file mode 100644 index 0000000..3f50e6e --- /dev/null +++ b/common/lib/xmodule/xmodule/modulestore/monitor.py @@ -0,0 +1,132 @@ +""" +A module to monitor certain files and have the modulestore reload courses. + +Modified from + http://stevelosh.com/blog/2011/06/django-advice/#watching-for-changes + +NOTE: This code runs in a multi-process, multi-thread setting. If you don't +understand threads or the gunicorn worker process environment, don't mess with +it :) + +Basic structure: + +- for each path being monitoring, maintain a threading.Event object that is signalled when the file is changed. +- the client interface is the following: + +1) Call watch(path) -> returns an Event object e. +2) call e.is_set() to tell whether the file has been modified. +3) call e.clear() to clear the state to start looking for the next modification. +4) go back to step 2 to look for the next modification. +""" + +import os +import sys +import time +import threading +import atexit +import Queue + +_interval = 1.0 +_times = {} # path -> modification time +_events = {} # path -> event for that path + +_queue = Queue.Queue() # used only for thread-local sleeping +_running = False +_lock = threading.Lock() # protects _running and writes to the _events dictionary + +def _restart(path): + _queue.put(True) + prefix = 'monitor (pid=%d):' % os.getpid() + print >> sys.stderr, '%s Change detected to \'%s\'.' % (prefix, path) + +def _modified(path): + """ + Check whether path has been modified. If it has been, save the latest + modification time in _times. + """ + + # Cases: + # - file has disappeared or changed type + # - file has appeared + # - file has been modified + + try: + if not os.path.isfile(path): + # was the file there before? + if _times[path] > 0: + _times[path] = 0 # now it's not here again + return True + # otherwise, nothing has changed + return False + + # When was the file last modified? + mtime = os.stat(path).st_mtime + # has it changed? + if mtime != _times[path]: + _times[path] = mtime + return True + except: + # If any exception occured, likely that file has been been removed just + # before stat(), so say that it's been changed. + return True + + return False + +def _monitor(): + while True: + # Check modification times on files which have + # specifically been registered for monitoring. + #print >> sys.stderr, "Watching %s" % (_events.keys()) + for path, event in _events.items(): + if _modified(path): + # print >> sys.stderr, "%s modified" % (path) + event.set() + + # Sleep for specified interval. + try: + return _queue.get(timeout=_interval) + except: + pass + +_thread = threading.Thread(target=_monitor) +_thread.setDaemon(True) + +def _exiting(): + try: + _queue.put(True) + except: + pass + _thread.join() + +atexit.register(_exiting) + +def watch(path): + """ + Register path for watching. Returns a threading.Event object that will be + set after the file changes. The caller is responsible for calling clear() on the event + to find out about subsequent modifications. + """ + _lock.acquire() + + if not path in _events: + _events[path] = threading.Event() + _times[path] = 0 # give it a dummy modification time to make logic earlier + + _lock.release() + return _events[path] + + + +def start(interval=1.0): + global _interval + if interval < _interval: + _interval = interval + + global _running + _lock.acquire() + if not _running: + prefix = 'monitor (pid=%d):' % os.getpid() + print >> sys.stderr, '%s Starting change monitor.' % prefix + _running = True + _thread.start() + _lock.release() diff --git a/common/lib/xmodule/xmodule/modulestore/xml.py b/common/lib/xmodule/xmodule/modulestore/xml.py index d1bfa93..8409aea 100644 --- a/common/lib/xmodule/xmodule/modulestore/xml.py +++ b/common/lib/xmodule/xmodule/modulestore/xml.py @@ -17,7 +17,8 @@ from xmodule.course_module import CourseDescriptor from xmodule.mako_module import MakoDescriptorSystem from xmodule.x_module import XModuleDescriptor, XMLParsingSystem -from . import ModuleStoreBase, Location +from . import ModuleStoreBase, Location, monitor + from .exceptions import ItemNotFoundError edx_xml_parser = etree.XMLParser(dtd_validation=False, load_dtd=False, @@ -27,6 +28,26 @@ etree.set_default_parser(edx_xml_parser) log = logging.getLogger('mitx.' + __name__) +# Need to start the monitoring thread somewhere. On import of this module seems +# like the best available place. + + +def start_monitor(): + """ + Fire up the monitoring thread. + + Architectural note: there is only one monitoring thread, and so the + monitoring will only work properly if there is a single modulestore per + course per process. + """ + log.info("Starting change monitor.") + monitor.start(interval=1.0) + +start_monitor() + + + + # VS[compat] # TODO (cpennington): Remove this once all fall 2012 courses have been imported # into the cms from xml @@ -249,9 +270,40 @@ class XMLModuleStore(ModuleStoreBase): course_dirs = [d for d in os.listdir(self.data_dir) if os.path.exists(self.data_dir / d / "course.xml")] + self.course_dir_by_id = dict() # course_id -> course_dir + self.monitoring_event = dict() # course_id -> threading.Event monitoring this path. + for course_dir in course_dirs: self.try_load_course(course_dir) + def watch_file(self, course_dir): + """Return path to watch for modification to trigger course reloads.""" + return self.data_dir / course_dir + ".reload" + + def reload_if_needed(self, course_id): + """ + Integration point with thread monitoring for content changes. Returns + true if the content needs reloading. + """ + # Does it need reloading? + course_dir = self.course_dir_by_id.get(course_id) + if not course_dir: + # courses we don't know about don't need reloading + return + + if self.monitoring_event[course_id].is_set(): + # reset monitoring and reload + self.monitoring_event[course_id].clear() + self.try_load_course(course_dir) + + def register_course_dir(self, course_id, course_dir): + """ + Register a mapping between course_id and course_dir, and ask the monitor + module to watch the watch file for that dir. + """ + self.course_dir_by_id[course_id] = course_dir + self.monitoring_event[course_id] = monitor.watch(self.watch_file(course_dir)) + def try_load_course(self, course_dir): ''' Load a course, keeping track of errors as we go along. @@ -390,6 +442,10 @@ class XMLModuleStore(ModuleStoreBase): course_id = CourseDescriptor.make_id(org, course, url_name) + + # Save the course dir so we can look it up by id later (to reload the course) + self.register_course_dir(course_id, course_dir) + system = ImportSystem(self, course_id, course_dir, policy, tracker, self.parent_tracker) course_descriptor = system.process_xml(etree.tostring(course_data)) @@ -429,6 +485,8 @@ class XMLModuleStore(ModuleStoreBase): location: Something that can be passed to Location """ location = Location(location) + if location.category == 'course': + self.reload_if_needed(course_id) try: return self.modules[course_id][location] except KeyError: -- libgit2 0.26.0