Commit cce452e2 by Victor Shnayder

Add monitoring of course dirs, and reload on change

* if the file data_dir/{course_dir}.reload is created or changed, will reload on next access to course descriptor
* background thread polls every second
* Relies on there being only 1 modulestore (per course dir).
parent db667017
"""
A module to monitor certain files and have the modulestore reload courses.
Modified from
http://stevelosh.com/blog/2011/06/django-advice/#watching-for-changes
NOTE: This code runs in a multi-process, multi-thread setting. If you don't
understand threads or the gunicorn worker process environment, don't mess with
it :)
Basic structure:
- for each path being monitoring, maintain a threading.Event object that is signalled when the file is changed.
- the client interface is the following:
1) Call watch(path) -> returns an Event object e.
2) call e.is_set() to tell whether the file has been modified.
3) call e.clear() to clear the state to start looking for the next modification.
4) go back to step 2 to look for the next modification.
"""
import os
import sys
import time
import threading
import atexit
import Queue
_interval = 1.0
_times = {} # path -> modification time
_events = {} # path -> event for that path
_queue = Queue.Queue() # used only for thread-local sleeping
_running = False
_lock = threading.Lock() # protects _running and writes to the _events dictionary
def _restart(path):
_queue.put(True)
prefix = 'monitor (pid=%d):' % os.getpid()
print >> sys.stderr, '%s Change detected to \'%s\'.' % (prefix, path)
def _modified(path):
"""
Check whether path has been modified. If it has been, save the latest
modification time in _times.
"""
# Cases:
# - file has disappeared or changed type
# - file has appeared
# - file has been modified
try:
if not os.path.isfile(path):
# was the file there before?
if _times[path] > 0:
_times[path] = 0 # now it's not here again
return True
# otherwise, nothing has changed
return False
# When was the file last modified?
mtime = os.stat(path).st_mtime
# has it changed?
if mtime != _times[path]:
_times[path] = mtime
return True
except:
# If any exception occured, likely that file has been been removed just
# before stat(), so say that it's been changed.
return True
return False
def _monitor():
while True:
# Check modification times on files which have
# specifically been registered for monitoring.
#print >> sys.stderr, "Watching %s" % (_events.keys())
for path, event in _events.items():
if _modified(path):
# print >> sys.stderr, "%s modified" % (path)
event.set()
# Sleep for specified interval.
try:
return _queue.get(timeout=_interval)
except:
pass
_thread = threading.Thread(target=_monitor)
_thread.setDaemon(True)
def _exiting():
try:
_queue.put(True)
except:
pass
_thread.join()
atexit.register(_exiting)
def watch(path):
"""
Register path for watching. Returns a threading.Event object that will be
set after the file changes. The caller is responsible for calling clear() on the event
to find out about subsequent modifications.
"""
_lock.acquire()
if not path in _events:
_events[path] = threading.Event()
_times[path] = 0 # give it a dummy modification time to make logic earlier
_lock.release()
return _events[path]
def start(interval=1.0):
global _interval
if interval < _interval:
_interval = interval
global _running
_lock.acquire()
if not _running:
prefix = 'monitor (pid=%d):' % os.getpid()
print >> sys.stderr, '%s Starting change monitor.' % prefix
_running = True
_thread.start()
_lock.release()
......@@ -17,7 +17,8 @@ from xmodule.course_module import CourseDescriptor
from xmodule.mako_module import MakoDescriptorSystem
from xmodule.x_module import XModuleDescriptor, XMLParsingSystem
from . import ModuleStoreBase, Location
from . import ModuleStoreBase, Location, monitor
from .exceptions import ItemNotFoundError
edx_xml_parser = etree.XMLParser(dtd_validation=False, load_dtd=False,
......@@ -27,6 +28,26 @@ etree.set_default_parser(edx_xml_parser)
log = logging.getLogger('mitx.' + __name__)
# Need to start the monitoring thread somewhere. On import of this module seems
# like the best available place.
def start_monitor():
"""
Fire up the monitoring thread.
Architectural note: there is only one monitoring thread, and so the
monitoring will only work properly if there is a single modulestore per
course per process.
"""
log.info("Starting change monitor.")
monitor.start(interval=1.0)
start_monitor()
# VS[compat]
# TODO (cpennington): Remove this once all fall 2012 courses have been imported
# into the cms from xml
......@@ -249,9 +270,40 @@ class XMLModuleStore(ModuleStoreBase):
course_dirs = [d for d in os.listdir(self.data_dir) if
os.path.exists(self.data_dir / d / "course.xml")]
self.course_dir_by_id = dict() # course_id -> course_dir
self.monitoring_event = dict() # course_id -> threading.Event monitoring this path.
for course_dir in course_dirs:
self.try_load_course(course_dir)
def watch_file(self, course_dir):
"""Return path to watch for modification to trigger course reloads."""
return self.data_dir / course_dir + ".reload"
def reload_if_needed(self, course_id):
"""
Integration point with thread monitoring for content changes. Returns
true if the content needs reloading.
"""
# Does it need reloading?
course_dir = self.course_dir_by_id.get(course_id)
if not course_dir:
# courses we don't know about don't need reloading
return
if self.monitoring_event[course_id].is_set():
# reset monitoring and reload
self.monitoring_event[course_id].clear()
self.try_load_course(course_dir)
def register_course_dir(self, course_id, course_dir):
"""
Register a mapping between course_id and course_dir, and ask the monitor
module to watch the watch file for that dir.
"""
self.course_dir_by_id[course_id] = course_dir
self.monitoring_event[course_id] = monitor.watch(self.watch_file(course_dir))
def try_load_course(self, course_dir):
'''
Load a course, keeping track of errors as we go along.
......@@ -390,6 +442,10 @@ class XMLModuleStore(ModuleStoreBase):
course_id = CourseDescriptor.make_id(org, course, url_name)
# Save the course dir so we can look it up by id later (to reload the course)
self.register_course_dir(course_id, course_dir)
system = ImportSystem(self, course_id, course_dir, policy, tracker, self.parent_tracker)
course_descriptor = system.process_xml(etree.tostring(course_data))
......@@ -429,6 +485,8 @@ class XMLModuleStore(ModuleStoreBase):
location: Something that can be passed to Location
"""
location = Location(location)
if location.category == 'course':
self.reload_if_needed(course_id)
try:
return self.modules[course_id][location]
except KeyError:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment