Commit 13ced551 by rfkelly0

implement WatchableFS interface for OSFS using pyinotify on linux

parent 488a35a8
......@@ -16,6 +16,7 @@
* expose.sftp: expose an FS object SFTP
* expose.django_storage: convert FS object to Django Storage object
* Extended attribute support (getxattr/setxattr/delxattr/listxattrs)
* Change watching support (add_watcher/del_watcher)
* Insist on unicode paths throughout:
* output paths are always unicode
* bytestring input paths are decoded as early as possible
......
......@@ -21,10 +21,8 @@ from fs.base import *
from fs.path import *
from fs import _thread_synchronize_default
try:
import xattr
except ImportError:
xattr = None
from fs.osfs.xattrs import OSFSXAttrMixin
from fs.osfs.watch import OSFSWatchMixin
@convert_os_errors
......@@ -33,7 +31,7 @@ def _os_stat(path):
return os.stat(path)
class OSFS(FS):
class OSFS(OSFSXAttrMixin,OSFSWatchMixin,FS):
"""Expose the underlying operating-system filesystem as an FS object.
This is the most basic of filesystems, which simply shadows the underlaying
......@@ -225,28 +223,3 @@ class OSFS(FS):
return self._stat(path).st_size
# Provide native xattr support if available
if xattr:
@convert_os_errors
def setxattr(self, path, key, value):
xattr.xattr(self.getsyspath(path))[key]=value
@convert_os_errors
def getxattr(self, path, key, default=None):
try:
return xattr.xattr(self.getsyspath(path)).get(key)
except KeyError:
return default
@convert_os_errors
def delxattr(self, path, key):
try:
del xattr.xattr(self.getsyspath(path))[key]
except KeyError:
pass
@convert_os_errors
def listxattrs(self, path):
return xattr.xattr(self.getsyspath(path)).keys()
"""
fs.osfs.watch
=============
Change watcher support for OSFS
"""
import os
import sys
import errno
import threading
from fs.errors import *
from fs.path import *
from fs.watch import *
try:
import pyinotify
except ImportError:
pyinotify = None
if pyinotify is not None:
class OSFSWatchMixin(WatchableFSMixin):
"""Mixin providing change-watcher support via pyinotify."""
__watch_lock = threading.Lock()
__watch_manager = None
__watch_notifier = None
def close(self):
super(OSFSWatchMixin,self).close()
self.__shutdown_watch_manager(force=True)
self.notify_watchers(CLOSED)
def add_watcher(self,callback,path="/",events=None,recursive=True):
w = super(OSFSWatchMixin,self).add_watcher(callback,path,events,recursive)
syspath = self.getsyspath(path)
if isinstance(syspath,unicode):
syspath = syspath.encode(sys.getfilesystemencoding())
wm = self.__get_watch_manager()
evtmask = self.__get_event_mask(events)
def process_events(event):
self.__route_event(w,event)
kwds = dict(rec=recursive,auto_add=recursive,quiet=False)
try:
wids = wm.add_watch(syspath,evtmask,process_events,**kwds)
except pyinotify.WatchManagerError, e:
raise OperationFailedError("add_watcher",details=e)
w._pyinotify_id = wids[syspath]
return w
def del_watcher(self,watcher_or_callback):
wm = self.__get_watch_manager()
if isinstance(watcher_or_callback,Watcher):
watchers = [watcher_or_callback]
else:
watchers = self._find_watchers(watcher_or_callback)
for watcher in watchers:
wm.rm_watch(watcher._pyinotify_id,rec=watcher.recursive)
super(OSFSWatchMixin,self).del_watcher(watcher)
if not wm._wmd:
self.__shutdown_watch_manager()
def __get_event_mask(self,events):
"""Convert the given set of events into a pyinotify event mask."""
if events is None:
events = (EVENT,)
mask = 0
for evt in events:
if issubclass(ACCESSED,evt):
mask |= pyinotify.IN_ACCESS
if issubclass(CREATED,evt):
mask |= pyinotify.IN_CREATE
if issubclass(REMOVED,evt):
mask |= pyinotify.IN_DELETE
mask |= pyinotify.IN_DELETE_SELF
if issubclass(MODIFIED,evt):
mask |= pyinotify.IN_ATTRIB
mask |= pyinotify.IN_MODIFY
mask |= pyinotify.IN_CLOSE_WRITE
if issubclass(MOVED_SRC,evt):
mask |= pyinotify.IN_MOVED_FROM
mask |= pyinotify.IN_MOVED_TO
if issubclass(MOVED_DST,evt):
mask |= pyinotify.IN_MOVED_FROM
mask |= pyinotify.IN_MOVED_TO
if issubclass(OVERFLOW,evt):
mask |= pyinotify.IN_Q_OVERFLOW
if issubclass(CLOSED,evt):
mask |= pyinotify.IN_UNMOUNT
return mask
def unsyspath(self,path):
"""Convert a system-level path into an FS-level path."""
path = normpath(path)
if not isprefix(self.root_path,path):
raise ValueError("path not within this FS: %s" % (path,))
return path[len(self.root_path):]
def __route_event(self,watcher,inevt):
"""Convert pyinotify event into fs.watch event, then handle it."""
try:
path = self.unsyspath(inevt.pathname)
except ValueError:
return
try:
src_path = inevt.src_pathname
if src_path is not None:
src_path = self.unsyspath(src_path)
except (AttributeError,ValueError):
src_path = None
if inevt.mask & pyinotify.IN_ACCESS:
watcher.handle_event(ACCESSED(self,path))
if inevt.mask & pyinotify.IN_CREATE:
watcher.handle_event(CREATED(self,path))
# Recursive watching of directories in pyinotify requires
# the creation of a new watch for each subdir, resulting in
# a race condition whereby events in the subdir are missed.
# We'd prefer to duplicate events than to miss them.
if inevt.mask & pyinotify.IN_ISDIR:
try:
# pyinotify does this for dirs itself, we only.
# need to worry about newly-created files.
for child in self.listdir(path,files_only=True):
cpath = pathjoin(path,child)
self.notify_watchers(CREATED,cpath)
self.notify_watchers(MODIFIED,cpath,True,True)
except FSError:
pass
if inevt.mask & pyinotify.IN_DELETE:
watcher.handle_event(REMOVED(self,path))
if inevt.mask & pyinotify.IN_DELETE_SELF:
watcher.handle_event(REMOVED(self,path))
if inevt.mask & pyinotify.IN_ATTRIB:
watcher.handle_event(MODIFIED(self,path,True,False))
if inevt.mask & pyinotify.IN_MODIFY:
watcher.handle_event(MODIFIED(self,path,True,True))
if inevt.mask & pyinotify.IN_CLOSE_WRITE:
watcher.handle_event(MODIFIED(self,path,True,True))
if inevt.mask & pyinotify.IN_MOVED_FROM:
# Sorry folks, I'm not up for decoding the destination path.
watcher.handle_event(MOVED_SRC(self,path,None))
if inevt.mask & pyinotify.IN_MOVED_TO:
if getattr(inevt,"src_pathname",None):
watcher.handle_event(MOVED_SRC(self,src_path,path))
watcher.handle_event(MOVED_DST(self,path,src_path))
else:
watcher.handle_event(MOVED_DST(self,path,None))
if inevt.mask & pyinotify.IN_Q_OVERFLOW:
watcher.handle_event(OVERFLOW(self))
if inevt.mask & pyinotify.IN_UNMOUNT:
watcher.handle_event(CLOSE(self))
def __get_watch_manager(self):
"""Get the shared watch manager, initializing if necessary."""
if OSFSWatchMixin.__watch_notifier is None:
self.__watch_lock.acquire()
try:
if self.__watch_notifier is None:
wm = pyinotify.WatchManager()
n = pyinotify.ThreadedNotifier(wm)
n.start()
OSFSWatchMixin.__watch_manager = wm
OSFSWatchMixin.__watch_notifier = n
finally:
self.__watch_lock.release()
return OSFSWatchMixin.__watch_manager
def __shutdown_watch_manager(self,force=False):
"""Stop the shared watch manager, if there are no watches left."""
self.__watch_lock.acquire()
try:
if OSFSWatchMixin.__watch_manager is None:
return
if not force and OSFSWatchMixin.__watch_manager._wmd:
return
OSFSWatchMixin.__watch_notifier.stop()
OSFSWatchMixin.__watch_notifier = None
OSFSWatchMixin.__watch_manager = None
finally:
self.__watch_lock.release()
else:
class OSFSWatchMixin(object):
"""Mixin disabling change-watcher support."""
def add_watcher(self,*args,**kwds):
raise UnsupportedError
def del_watcher(self,watcher_or_callback):
raise UnsupportedError
"""
fs.osfs.xattrs
==============
Extended-attribute support for OSFS
"""
import os
import sys
import errno
from fs.errors import *
from fs.path import *
from fs.base import FS
try:
import xattr
except ImportError:
xattr = None
if xattr is not None:
class OSFSXAttrMixin(FS):
"""Mixin providing extended-attribute support via the 'xattr' module"""
@convert_os_errors
def setxattr(self, path, key, value):
xattr.xattr(self.getsyspath(path))[key]=value
@convert_os_errors
def getxattr(self, path, key, default=None):
try:
return xattr.xattr(self.getsyspath(path)).get(key)
except KeyError:
return default
@convert_os_errors
def delxattr(self, path, key):
try:
del xattr.xattr(self.getsyspath(path))[key]
except KeyError:
pass
@convert_os_errors
def listxattrs(self, path):
return xattr.xattr(self.getsyspath(path)).keys()
else:
class OSFSXAttrMixin(object):
"""Mixin disable extended-attribute support."""
def getxattr(self,path,key):
raise UnsupportedError
def setxattr(self,path,key,value):
raise UnsupportedError
def delxattr(self,path,key):
raise UnsupportedError
def listxattrs(self,path):
raise UnsupportedError
......@@ -455,8 +455,8 @@ class PathMap(object):
m = m[name]
except KeyError:
return
for nm in m:
if nm:
for (nm,subm) in m.iteritems():
if nm and subm:
yield nm
def names(self,root="/"):
......
......@@ -128,4 +128,9 @@ class Test_PathMap(unittest.TestCase):
self.assertEquals(set(map.iternames("hello")),set(("world","kitty")))
self.assertEquals(set(map.iternames("/hello/kitty")),set(("islame",)))
del map["hello/kitty/islame"]
self.assertEquals(set(map.iternames("/hello/kitty")),set())
self.assertEquals(set(map.iterkeys()),set(("/hello/world","/hello/world/howareya","/hello/world/iamfine","/hello/kitty","/batman/isawesome")))
self.assertEquals(set(map.values()),set(range(1,7)) - set((5,)))
......@@ -13,6 +13,11 @@ from fs.errors import *
from fs.watch import *
from fs.tests import FSTestCases
try:
import pyinotify
except ImportError:
pyinotify = None
class WatcherTestCases:
"""Testcases for filesystems providing change watcher support.
......@@ -34,6 +39,8 @@ class WatcherTestCases:
self.watchfs._poll_cond.wait()
self.watchfs._poll_cond.wait()
self.watchfs._poll_cond.release()
else:
time.sleep(0.5)
def assertEventOccurred(self,cls,path=None,**attrs):
if not self.checkEventOccurred(cls,path,**attrs):
......@@ -92,6 +99,30 @@ class WatcherTestCases:
self.fs.setcontents("hello","hello again world")
self.assertEventOccurred(MODIFIED,"/hello")
def test_watch_single_file(self):
self.fs.setcontents("hello","hello world")
events = []
self.watchfs.add_watcher(events.append,"/hello",(MODIFIED,))
self.fs.setcontents("hello","hello again world")
self.fs.remove("hello")
self.waitForEvents()
for evt in events:
assert isinstance(evt,MODIFIED)
self.assertEquals(evt.path,"/hello")
def test_watch_single_file_remove(self):
self.fs.makedir("testing")
self.fs.setcontents("testing/hello","hello world")
events = []
self.watchfs.add_watcher(events.append,"/testing/hello",(REMOVED,))
self.fs.setcontents("testing/hello","hello again world")
self.waitForEvents()
self.fs.remove("testing/hello")
self.waitForEvents()
self.assertEquals(len(events),1)
assert isinstance(events[0],REMOVED)
self.assertEquals(events[0].path,"/testing/hello")
def test_watch_iter_changes(self):
changes = iter_changes(self.watchfs)
self.fs.makedir("test1")
......@@ -101,23 +132,23 @@ class WatcherTestCases:
self.waitForEvents()
self.watchfs.close()
# Locate the CREATED(test1) event
event = changes.next()
event = changes.next(timeout=1)
while not isinstance(event,CREATED) or event.path != "/test1":
event = changes.next()
event = changes.next(timeout=1)
# Locate the CREATED(test1/hello) event
event = changes.next()
event = changes.next(timeout=1)
while not isinstance(event,CREATED) or event.path != "/test1/hello":
event = changes.next()
event = changes.next(timeout=1)
# Locate the REMOVED(test1) event
event = changes.next()
event = changes.next(timeout=1)
while not isinstance(event,REMOVED) or event.path != "/test1":
event = changes.next()
event = changes.next(timeout=1)
# Locate the CLOSED event
event = changes.next()
event = changes.next(timeout=1)
while not isinstance(event,CLOSED):
event = changes.next()
event = changes.next(timeout=1)
# That should be the last event in the list
self.assertRaises(StopIteration,changes.next)
self.assertRaises(StopIteration,changes.next,timeout=1)
changes.close()
......@@ -128,6 +159,8 @@ class TestWatchers_TempFS(unittest.TestCase,FSTestCases,WatcherTestCases):
self.fs = tempfs.TempFS()
watchfs = osfs.OSFS(self.fs.root_path)
self.watchfs = ensure_watchable(watchfs,poll_interval=0.1)
if pyinotify is not None:
self.assertEquals(watchfs,self.watchfs)
def tearDown(self):
self.watchfs.close()
......
......@@ -139,7 +139,8 @@ from fs import tempfs
class TestXAttr_TempFS(unittest.TestCase,FSTestCases,XAttrTestCases):
def setUp(self):
self.fs = ensure_xattrs(tempfs.TempFS())
fs = tempfs.TempFS()
self.fs = ensure_xattrs(fs)
def tearDown(self):
td = self.fs._temp_dir
......
......@@ -25,11 +25,12 @@ An FS object that wants to be "watchable" must provide the following methods:
import weakref
import threading
from Queue import Queue
import Queue
from fs.path import *
from fs.errors import *
from fs.wrapfs import WrapFS
from fs.base import FS
class EVENT(object):
......@@ -65,17 +66,21 @@ class MODIFIED(EVENT):
self.meta = meta
self.data = data
class MOVED_FROM(EVENT):
"""Event fired when a file or directory is moved."""
class MOVED_DST(EVENT):
"""Event fired when a file or directory is the target of a move."""
def __init__(self,fs,path,source):
super(MOVED_FROM,self).__init__(fs,path)
self.source = abspath(normpath(source))
super(MOVED_DST,self).__init__(fs,path)
if source is not None:
source = abspath(normpath(source))
self.source = source
class MOVED_TO(EVENT):
"""Event fired when a file or directory is moved."""
class MOVED_SRC(EVENT):
"""Event fired when a file or directory is the source of a move."""
def __init__(self,fs,path,destination):
super(MOVED_TO,self).__init__(fs,path)
self.destination = abspath(normpath(destination))
super(MOVED_SRC,self).__init__(fs,path)
if destination is not None:
destination = abspath(normpath(destination))
self.destination = destination
class CLOSED(EVENT):
"""Event fired when the filesystem is closed."""
......@@ -130,7 +135,7 @@ class Watcher(object):
self.callback(event)
class WatchableFSMixin(object):
class WatchableFSMixin(FS):
"""Mixin class providing watcher management functions."""
def __init__(self,*args,**kwds):
......@@ -154,7 +159,15 @@ class WatchableFSMixin(object):
del watchers[i]
break
def _find_watchers(self,callback):
"""Find watchers registered with the given callback."""
for watchers in self._watchers.itervalues():
for watcher in watchers:
if watcher.callback is callback:
yield watcher
def notify_watchers(self,event_class,path=None,*args,**kwds):
"""Notify watchers of the given event data."""
event = event_class(self,path,*args,**kwds)
if path is None:
for watchers in self._watchers.itervalues():
......@@ -254,8 +267,8 @@ class WatchableFS(WrapFS,WatchableFSMixin):
super(WatchableFS,self).rename(src,dst)
if d_existed:
self.notify_watchers(REMOVED,dst)
self.notify_watchers(MOVED_TO,src,dst)
self.notify_watchers(MOVED_FROM,dst,src)
self.notify_watchers(MOVED_SRC,src,dst)
self.notify_watchers(MOVED_DST,dst,src)
def copy(self,src,dst,**kwds):
d = self._pre_copy(src,dst)
......@@ -339,9 +352,9 @@ class PollingWatchableFS(WatchableFS):
def __init__(self,wrapped_fs,poll_interval=60*5):
super(PollingWatchableFS,self).__init__(wrapped_fs)
self.poll_interval = poll_interval
self.add_watcher(self._on_path_modify,"/",(CREATED,MOVED_TO,))
self.add_watcher(self._on_path_modify,"/",(CREATED,MOVED_DST,))
self.add_watcher(self._on_path_modify,"/",(MODIFIED,ACCESSED,))
self.add_watcher(self._on_path_delete,"/",(REMOVED,MOVED_FROM,))
self.add_watcher(self._on_path_delete,"/",(REMOVED,MOVED_SRC,))
self._path_info = PathMap()
self._poll_thread = threading.Thread(target=self._poll_for_changes)
self._poll_cond = threading.Condition()
......@@ -364,9 +377,11 @@ class PollingWatchableFS(WatchableFS):
pass
def _on_path_delete(self,event):
print "DELETE", event.path
self._path_info.clear(event.path)
def _poll_for_changes(self):
try:
while not self._poll_close_event.isSet():
# Walk all directories looking for changes.
# Come back to any that give us an error.
......@@ -393,6 +408,9 @@ class PollingWatchableFS(WatchableFS):
self._poll_cond.release()
# Sleep for the specified interval, or until closed.
self._poll_close_event.wait(timeout=self.poll_interval)
except FSError:
if not self.closed:
raise
def _check_for_changes(self,dirnm):
# Check the metadata for the directory itself.
......@@ -445,6 +463,7 @@ class PollingWatchableFS(WatchableFS):
return
cpath = pathjoin(dirnm,childnm)
if not self.wrapped_fs.exists(cpath):
print "REMOVED", cpath
self.notify_watchers(REMOVED,cpath)
......@@ -456,11 +475,12 @@ def ensure_watchable(fs,wrapper_class=PollingWatchableFS,*args,**kwds):
for watcher callbacks. This may be the original object if it supports them
natively, or a wrapper class if they must be simulated.
"""
w = lambda e: None
try:
fs.add_watcher("/",w)
w = fs.add_watcher(lambda e: None,"/somepaththatsnotlikelytoexist")
except (AttributeError,UnsupportedError):
return wrapper_class(fs,*args,**kwds)
except FSError:
pass
else:
fs.del_watcher(w)
return fs
......@@ -477,7 +497,7 @@ class iter_changes(object):
def __init__(self,fs=None,path="/",events=None,**kwds):
self.closed = False
self._queue = Queue()
self._queue = Queue.Queue()
self._watching = set()
if fs is not None:
self.add_watcher(fs,path,events,**kwds)
......@@ -488,10 +508,13 @@ class iter_changes(object):
def __del__(self):
self.close()
def next(self):
def next(self,timeout=None):
if not self._watching:
raise StopIteration
event = self._queue.get()
try:
event = self._queue.get(timeout=timeout)
except Queue.Empty:
raise StopIteration
if event is None:
raise StopIteration
if isinstance(event,CLOSED):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment