Commit 547e5f54 by rfkelly0

osfs.watch_inotify: fix handling of multiple watchers on a single path.

Turns out the pyinotify "add_watch" method actually updates an existing watch
if there is already one for the given path.  Instead, we create a new manager
(and hence, a new inotify fil descriptor) for each watcher added.
parent 358d7f1d
......@@ -9,6 +9,7 @@ Change watcher support for OSFS, backed by pyinotify.
import os
import sys
import errno
import select
import threading
from fs.errors import *
......@@ -29,20 +30,38 @@ class OSFSWatchMixin(WatchableFSMixin):
"""Mixin providing change-watcher support via pyinotify."""
__watch_lock = threading.Lock()
__watch_manager = None
__watch_notifier = None
__watch_thread = None
def close(self):
super(OSFSWatchMixin,self).close()
self.__shutdown_watch_manager(force=True)
self.notify_watchers(CLOSED)
for watcher_list in self._watchers.values():
for watcher in watcher_list:
self.del_watcher(watcher)
self.__watch_lock.acquire()
try:
wt = self.__watch_thread
if wt is not None and not wt.watchers:
wt.stop()
OSFSWatchMixin.__watch_thread = None
finally:
self.__watch_lock.release()
def add_watcher(self,callback,path="/",events=None,recursive=True):
w = super(OSFSWatchMixin,self).add_watcher(callback,path,events,recursive)
super_add_watcher = super(OSFSWatchMixin,self).add_watcher
w = super_add_watcher(callback,path,events,recursive)
w._pyinotify_id = None
syspath = self.getsyspath(path)
if isinstance(syspath,unicode):
syspath = syspath.encode(sys.getfilesystemencoding())
wm = self.__get_watch_manager()
# Each watch gets its own WatchManager, since it's tricky to make
# a single WatchManager handle multiple callbacks with different
# events for a single path. This means we pay one file descriptor
# for each watcher added to the filesystem. That's not too bad.
w._pyinotify_WatchManager = wm = pyinotify.WatchManager()
# Each individual notifier gets multiplexed by a single shared thread.
w._pyinotify_Notifier = pyinotify.Notifier(wm)
evtmask = self.__get_event_mask(events)
def process_events(event):
self.__route_event(w,event)
......@@ -52,19 +71,30 @@ class OSFSWatchMixin(WatchableFSMixin):
except pyinotify.WatchManagerError, e:
raise OperationFailedError("add_watcher",details=e)
w._pyinotify_id = wids[syspath]
self.__watch_lock.acquire()
try:
wt = self.__get_watch_thread()
wt.add_watcher(w)
finally:
self.__watch_lock.release()
return w
def del_watcher(self,watcher_or_callback):
wm = self.__get_watch_manager()
if isinstance(watcher_or_callback,Watcher):
watchers = [watcher_or_callback]
else:
watchers = self._find_watchers(watcher_or_callback)
for watcher in watchers:
wm = watcher._pyinotify_WatchManager
wm.rm_watch(watcher._pyinotify_id,rec=watcher.recursive)
super(OSFSWatchMixin,self).del_watcher(watcher)
if not wm._wmd:
self.__shutdown_watch_manager()
self.__watch_lock.acquire()
try:
wt = self.__get_watch_thread()
for watcher in watchers:
wt.del_watcher(watcher)
finally:
self.__watch_lock.release()
def __get_event_mask(self,events):
"""Convert the given set of events into a pyinotify event mask."""
......@@ -149,33 +179,81 @@ class OSFSWatchMixin(WatchableFSMixin):
if inevt.mask & pyinotify.IN_UNMOUNT:
watcher.handle_event(CLOSE(self))
def __get_watch_manager(self):
"""Get the shared watch manager, initializing if necessary."""
if OSFSWatchMixin.__watch_notifier is None:
self.__watch_lock.acquire()
try:
if self.__watch_notifier is None:
wm = pyinotify.WatchManager()
n = pyinotify.ThreadedNotifier(wm)
n.start()
OSFSWatchMixin.__watch_manager = wm
OSFSWatchMixin.__watch_notifier = n
finally:
self.__watch_lock.release()
return OSFSWatchMixin.__watch_manager
def __shutdown_watch_manager(self,force=False):
"""Stop the shared watch manager, if there are no watches left."""
self.__watch_lock.acquire()
def __get_watch_thread(self):
"""Get the shared watch thread, initializing if necessary.
This method must only be called while holding self.__watch_lock, or
multiple notifiers could be created.
"""
if OSFSWatchMixin.__watch_thread is None:
OSFSWatchMixin.__watch_thread = SharedThreadedNotifier()
OSFSWatchMixin.__watch_thread.start()
return OSFSWatchMixin.__watch_thread
class SharedThreadedNotifier(threading.Thread):
"""pyinotifer Notifier that can manage multiple WatchManagers.
Each watcher added to an OSFS corresponds to a new pyinotify.WatchManager
instance. Rather than run a notifier thread for each manager, we run a
single thread that multiplexes between them all.
"""
def __init__(self):
super(SharedThreadedNotifier,self).__init__()
self.daemon = True
self.running = False
self._pipe_r, self._pipe_w = os.pipe()
self._poller = select.poll()
self._poller.register(self._pipe_r,select.POLLIN)
self.watchers = {}
def add_watcher(self,watcher):
fd = watcher._pyinotify_WatchManager.get_fd()
self.watchers[fd] = watcher
self._poller.register(fd,select.POLLIN)
# Bump the poll object so it recognises the new fd.
os.write(self._pipe_w,"H")
def del_watcher(self,watcher):
fd = watcher._pyinotify_WatchManager.get_fd()
try:
if OSFSWatchMixin.__watch_manager is None:
return
if not force and OSFSWatchMixin.__watch_manager._wmd:
return
OSFSWatchMixin.__watch_notifier.stop()
OSFSWatchMixin.__watch_notifier = None
OSFSWatchMixin.__watch_manager = None
finally:
self.__watch_lock.release()
del self.watchers[fd]
except KeyError:
pass
else:
self._poller.unregister(fd)
def run(self):
self.running = True
while self.running:
try:
ready_fds = self._poller.poll()
except select.error, e:
if e[0] != errno.EINTR:
raise
else:
for (fd,event) in ready_fds:
# Ignore all events other than "input ready".
if not event & select.POLLIN:
continue
# For signals on our internal pipe, just read and discard.
if fd == self._pipe_r:
os.read(self._pipe_r,1)
# For notifier fds, dispath to the notifier methods.
else:
try:
notifier = self.watchers[fd]._pyinotify_Notifier
except KeyError:
pass
else:
notifier.read_events()
notifier.process_events()
def stop(self):
if self.running:
self.running = False
os.write(self._pipe_w,"S")
os.close(self._pipe_w)
......@@ -15,17 +15,17 @@ from fs.watch import *
from fs.tests import FSTestCases
try:
from fs.osfs import watch_inotify
from fs.osfs import watch_inotify
except ImportError:
watch_inotify = None
watch_inotify = None
if sys.platform == "win32":
try:
from fs.osfs import watch_win32
from fs.osfs import watch_win32
except ImportError:
watch_win32 = None
watch_win32 = None
else:
watch_win32 = None
watch_win32 = None
class WatcherTestCases:
......@@ -74,6 +74,14 @@ class WatcherTestCases:
self.fs.makedir("test1")
self.assertEventOccurred(CREATED,"/test1")
def test_watch_makedir_with_two_watchers(self):
self.setupWatchers()
events2 = []
self.watchfs.add_watcher(events2.append)
self.fs.makedir("test1")
self.assertEventOccurred(CREATED,"/test1")
self.assertTrue(isinstance(events2[0],CREATED))
def test_watch_readfile(self):
self.setupWatchers()
self.fs.setcontents("hello","hello world")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment