Commit d52cc3fb by rfkelly0

Refactor CacheFS for efficiency and usability.

The main class is now CacheFSMixin, a mixin class that can add caching to
any FS implementation.  It operates explicitly on a PathMap of recent 
info dicts, making it more efficient and more robust than the old CacheFS.

The CacheFS wrapper class is still there, but it's a trivial class that
just mixes CacheFSMixin into the base WrapFS class.

TahoeFS has also been modified to use the mixin architecture.
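For illustration, a minimal sketch of the wrapper pattern (not part of the commit; MemoryFS is just an example backend):

    from fs.memoryfs import MemoryFS
    from fs.remote import CacheFSMixin
    from fs.wrapfs import WrapFS

    # Wrapper pattern: cache an existing FS object.
    # This is exactly how the new CacheFS class is defined.
    class MyCacheFS(CacheFSMixin, WrapFS):
        pass

    cached = MyCacheFS(MemoryFS(), cache_timeout=30)

An FS implementation can instead inherit the mixin directly, as TahoeFS now does via class TahoeFS(CacheFSMixin, _TahoeFS).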
parent e85a0ceb
......@@ -27,13 +27,9 @@ When any problem occurred, you can turn on internal debugging messages:
TODO:
x unicode support
x try network errors / bad happiness
x colon hack (all occurrences of ':' in filenames transparently convert to __colon__)
x logging / cleaning sources from print()
x exceptions
x tests
x create ticket and send initial code
x sanitize all path types (., /)
x rewrite listdir, add listdirinfo
x support for extra large file uploads (poster module)
x Possibility to block write until upload done (Tahoe mailing list)
x Report something sane when Tahoe crashed/unavailable
......@@ -41,44 +37,56 @@ TODO:
filetimes
docs & author
python3 support
python 2.3 support
remove creating blank files (depends on FileUploadManager)
TODO (Not TahoeFS specific tasks):
x DebugFS
x RemoteFileBuffer on the fly buffering support
x RemoteFileBuffer unit tests
x RemoteFileBuffer submit to trunk
colon hack -> move outside tahoe, should be in windows-specific FS (maybe in Dokan?)
autorun hack -> move outside tahoe, -||-
Implement FileUploadManager + fake isfile/exists for files that are still being processed
pyfilesystem docs are outdated (rename, movedir, ...)
'''
import stat as statinfo
import logging
from logging import DEBUG, INFO, ERROR, CRITICAL
import fs.errors as errors
from fs.path import abspath, relpath, normpath, dirname, pathjoin
from fs import FS, NullFile, _thread_synchronize_default, SEEK_END
from fs.remote import CacheFS, _cached_method, RemoteFileBuffer
from fs.base import fnmatch
from fs.remote import CacheFSMixin, RemoteFileBuffer
from fs.base import fnmatch, NoDefaultMeta
from util import TahoeUtil
from connection import Connection
#from .debugfs import DebugFS
logger = logging.getLogger('fs.tahoefs')
def _fix_path(func):
"""Method decorator for automatically normalising paths."""
def wrapper(self, *args, **kwds):
if len(args):
args = list(args)
args[0] = abspath(normpath(args[0]))
args[0] = _fixpath(args[0])
return func(self, *args, **kwds)
return wrapper
def _fixpath(path):
"""Normalize the given path."""
return abspath(normpath(path))
class TahoeFS(CacheFS):
class _TahoeFS(FS):
"""FS providing raw access to a Tahoe Filesystem.
This class implements all the details of interacting with a Tahoe-backed filesystem, but you
probably don't want to use it in practice. Use the TahoeFS class instead, which has some internal
caching to improve performance.
"""
_meta = { 'virtual' : False,
'read_only' : False,
......@@ -86,24 +94,22 @@ class TahoeFS(CacheFS):
'case_insensitive_paths' : False,
'network' : True
}
def __init__(self, dircap, timeout=60, autorun=True, largefilesize=10*1024*1024, webapi='http://127.0.0.1:3456'):
'''
Creates instance of TahoeFS.
def __init__(self, dircap, largefilesize=10*1024*1024, webapi='http://127.0.0.1:3456'):
'''Creates an instance of TahoeFS.
:param dircap: capability string granting access to a TahoeLAFS directory.
:param timeout: how long should underlying CacheFS keep information about files
before asking TahoeLAFS node again.
:param autorun: Allow listing autorun files? Can be very dangerous on Windows!.
This is temporary hack, as it should be part of Windows-specific middleware,
not Tahoe itself.
:param largefilesize: create a placeholder file for files larger than this threshold.
Uploading and processing large files can take a very long time (many hours),
so the placeholder helps you remember that an upload is still in progress.
Setting this to None will skip creating placeholder files for any uploads.
'''
fs = _TahoeFS(dircap, autorun=autorun, largefilesize=largefilesize, webapi=webapi)
super(TahoeFS, self).__init__(fs, timeout)
self.dircap = dircap if not dircap.endswith('/') else dircap[:-1]
self.largefilesize = largefilesize
self.connection = Connection(webapi)
self.tahoeutil = TahoeUtil(webapi)
super(_TahoeFS, self).__init__(thread_synchronize=_thread_synchronize_default)
def __str__(self):
return "<TahoeFS: %s>" % self.dircap
......@@ -111,35 +117,37 @@ class TahoeFS(CacheFS):
@classmethod
def createdircap(cls, webapi='http://127.0.0.1:3456'):
return TahoeUtil(webapi).createdircap()
def getmeta(self,meta_name,default=NoDefaultMeta):
if meta_name == "read_only":
return self.dircap.startswith('URI:DIR2-RO')
return super(_TahoeFS,self).getmeta(meta_name,default)
@_fix_path
def open(self, path, mode='r', **kwargs):
self.wrapped_fs._log(INFO, 'Opening file %s in mode %s' % (path, mode))
self._log(INFO, 'Opening file %s in mode %s' % (path, mode))
newfile = False
if not self.exists(path):
if 'w' in mode or 'a' in mode:
newfile = True
else:
self.wrapped_fs._log(DEBUG, "File %s not found while opening for reads" % path)
self._log(DEBUG, "File %s not found while opening for reads" % path)
raise errors.ResourceNotFoundError(path)
elif self.isdir(path):
self.wrapped_fs._log(DEBUG, "Path %s is directory, not a file" % path)
self._log(DEBUG, "Path %s is directory, not a file" % path)
raise errors.ResourceInvalidError(path)
if 'w' in mode:
elif 'w' in mode:
newfile = True
if newfile:
self.wrapped_fs._log(DEBUG, 'Creating empty file %s' % path)
if self.wrapped_fs.readonly:
self._log(DEBUG, 'Creating empty file %s' % path)
if self.getmeta("read_only"):
raise errors.UnsupportedError('read only filesystem')
self.setcontents(path, '')
handler = NullFile()
else:
self.wrapped_fs._log(DEBUG, 'Opening existing file %s for reading' % path)
handler = self.wrapped_fs._get_file_handler(path)
self._log(DEBUG, 'Opening existing file %s for reading' % path)
handler = self.getrange(path,0)
return RemoteFileBuffer(self, path, mode, handler,
write_on_flush=False)
......@@ -155,20 +163,20 @@ class TahoeFS(CacheFS):
def exists(self, path):
try:
self.getinfo(path)
self.wrapped_fs._log(DEBUG, "Path %s exists" % path)
self._log(DEBUG, "Path %s exists" % path)
return True
except errors.ResourceNotFoundError:
self.wrapped_fs._log(DEBUG, "Path %s does not exists" % path)
self._log(DEBUG, "Path %s does not exists" % path)
return False
except errors.ResourceInvalidError:
self.wrapped_fs._log(DEBUG, "Path %s does not exists, probably misspelled URI" % path)
self._log(DEBUG, "Path %s does not exists, probably misspelled URI" % path)
return False
@_fix_path
def getsize(self, path):
try:
size = self.getinfo(path)['size']
self.wrapped_fs._log(DEBUG, "Size of %s is %d" % (path, size))
self._log(DEBUG, "Size of %s is %d" % (path, size))
return size
except errors.ResourceNotFoundError:
return 0
......@@ -180,7 +188,7 @@ class TahoeFS(CacheFS):
except errors.ResourceNotFoundError:
#isfile = not path.endswith('/')
isfile = False
self.wrapped_fs._log(DEBUG, "Path %s is file: %d" % (path, isfile))
self._log(DEBUG, "Path %s is file: %d" % (path, isfile))
return isfile
@_fix_path
......@@ -189,54 +197,55 @@ class TahoeFS(CacheFS):
isdir = (self.getinfo(path)['type'] == 'dirnode')
except errors.ResourceNotFoundError:
isdir = False
self.wrapped_fs._log(DEBUG, "Path %s is directory: %d" % (path, isdir))
self._log(DEBUG, "Path %s is directory: %d" % (path, isdir))
return isdir
def listdir(self, *args, **kwargs):
return [ item[0] for item in self.listdirinfo(*args, **kwargs) ]
def listdirinfo(self, *args, **kwds):
return list(self.ilistdirinfo(*args,**kwds))
def ilistdir(self, *args, **kwds):
for item in self.ilistdirinfo(*args,**kwds):
yield item[0]
@_fix_path
@_cached_method
def listdirinfo(self, path="/", wildcard=None, full=False, absolute=False,
def ilistdirinfo(self, path="/", wildcard=None, full=False, absolute=False,
dirs_only=False, files_only=False):
self.wrapped_fs._log(DEBUG, "Listing directory (listdirinfo) %s" % path)
_fixpath = self.wrapped_fs._fixpath
_path = _fixpath(path)
self._log(DEBUG, "Listing directory (listdirinfo) %s" % path)
if dirs_only and files_only:
raise errors.ValueError("dirs_only and files_only can not both be True")
raise ValueError("dirs_only and files_only can not both be True")
result = []
for item in self.wrapped_fs.tahoeutil.list(self.dircap, _path):
for item in self.tahoeutil.list(self.dircap, path):
if dirs_only and item['type'] == 'filenode':
continue
elif files_only and item['type'] == 'dirnode':
continue
if wildcard is not None and \
not fnmatch.fnmatch(item['name'], wildcard):
continue
if wildcard is not None:
if isinstance(wildcard,basestring):
if not fnmatch.fnmatch(item['name'], wildcard):
continue
else:
if not wildcard(item['name']):
continue
if full:
item_path = relpath(pathjoin(_path, item['name']))
item_path = relpath(pathjoin(path, item['name']))
elif absolute:
item_path = abspath(pathjoin(_path, item['name']))
item_path = abspath(pathjoin(path, item['name']))
else:
item_path = item['name']
cache_name = self.wrapped_fs._fixpath(u"%s/%s" % \
(path, item['name']))
self._cache_set(cache_name, 'getinfo', (), {}, (True, item))
result.append((item_path, item))
return result
yield (item_path, item)
def listdir(self, *args, **kwargs):
return [ item[0] for item in self.listdirinfo(*args, **kwargs) ]
@_fix_path
def remove(self, path):
self.wrapped_fs._log(INFO, 'Removing file %s' % path)
if self.wrapped_fs.readonly:
self._log(INFO, 'Removing file %s' % path)
if self.getmeta("read_only"):
raise errors.UnsupportedError('read only filesystem')
if not self.isfile(path):
......@@ -245,16 +254,14 @@ class TahoeFS(CacheFS):
raise errors.ResourceInvalidError(path)
try:
self.wrapped_fs.tahoeutil.unlink(self.dircap, path)
self.tahoeutil.unlink(self.dircap, path)
except Exception, e:
raise errors.ResourceInvalidError(path)
finally:
self._uncache(path, removed=True)
@_fix_path
def removedir(self, path, recursive=False, force=False):
self.wrapped_fs._log(INFO, "Removing directory %s" % path)
if self.wrapped_fs.readonly:
self._log(INFO, "Removing directory %s" % path)
if self.getmeta("read_only"):
raise errors.UnsupportedError('read only filesystem')
if not self.isdir(path):
if not self.isfile(path):
......@@ -263,10 +270,7 @@ class TahoeFS(CacheFS):
if not force and self.listdir(path):
raise errors.DirectoryNotEmptyError(path)
try:
self.wrapped_fs.tahoeutil.unlink(self.dircap, path)
finally:
self._uncache(path, removed=True)
self.tahoeutil.unlink(self.dircap, path)
if recursive and path != '/':
try:
......@@ -276,9 +280,8 @@ class TahoeFS(CacheFS):
@_fix_path
def makedir(self, path, recursive=False, allow_recreate=False):
self.wrapped_fs._log(INFO, "Creating directory %s" % path)
if self.wrapped_fs.readonly:
self._log(INFO, "Creating directory %s" % path)
if self.getmeta("read_only"):
raise errors.UnsupportedError('read only filesystem')
if self.exists(path):
if not self.isdir(path):
......@@ -287,88 +290,46 @@ class TahoeFS(CacheFS):
raise errors.DestinationExistsError(path)
if not recursive and not self.exists(dirname(path)):
raise errors.ParentDirectoryMissingError(path)
try:
self.wrapped_fs.tahoeutil.mkdir(self.dircap, path)
finally:
self._uncache(path,added=True)
self.tahoeutil.mkdir(self.dircap, path)
def movedir(self, src, dst, overwrite=False):
self.move(src, dst, overwrite)
def move(self, src, dst, overwrite=False):
# FIXME: overwrite not documented
self.wrapped_fs._log(INFO, "Moving file from %s to %s" % (src, dst))
if self.wrapped_fs.readonly:
self._log(INFO, "Moving file from %s to %s" % (src, dst))
if self.getmeta("read_only"):
raise errors.UnsupportedError('read only filesystem')
src = self.wrapped_fs._fixpath(src)
dst = self.wrapped_fs._fixpath(dst)
src = _fixpath(src)
dst = _fixpath(dst)
if not self.exists(dirname(dst)):
# FIXME: Why raise an exception when this is a legal construct?
raise errors.ParentDirectoryMissingError(dst)
if not overwrite and self.exists(dst):
raise errors.DestinationExistsError(dst)
try:
self.wrapped_fs.tahoeutil.move(self.dircap, src, dst)
finally:
self._uncache(src,removed=True)
self._uncache(dst,added=True)
@_fix_path
def setcontents(self, path, data, chunk_size=64*1024):
try:
self.wrapped_fs.setcontents(path, data, chunk_size=chunk_size)
finally:
self._uncache(path, added=True)
self.tahoeutil.move(self.dircap, src, dst)
def rename(self, src, dst):
self.move(src, dst)
def copy(self, src, dst, overwrite=False, chunk_size=16384):
if self.wrapped_fs.readonly:
if self.getmeta("read_only"):
raise errors.UnsupportedError('read only filesystem')
# FIXME: this is out of date; how to do native tahoe copy?
# FIXME: Workaround because isfile() not exists on _TahoeFS
FS.copy(self, src, dst, overwrite, chunk_size)
def copydir(self, src, dst, overwrite=False, ignore_errors=False, chunk_size=16384):
if self.wrapped_fs.readonly:
if self.getmeta("read_only"):
raise errors.UnsupportedError('read only filesystem')
# FIXME: this is out of date; how to do native tahoe copy?
# FIXME: Workaround because isfile() not exists on _TahoeFS
FS.copydir(self, src, dst, overwrite, ignore_errors, chunk_size)
class _TahoeFS(FS):
def __init__(self, dircap, autorun, largefilesize, webapi):
self.dircap = dircap if not dircap.endswith('/') else dircap[:-1]
self.autorun = autorun
self.largefilesize = largefilesize
self.connection = Connection(webapi)
self.tahoeutil = TahoeUtil(webapi)
self.readonly = dircap.startswith('URI:DIR2-RO')
super(_TahoeFS, self).__init__(thread_synchronize=_thread_synchronize_default)
def _log(self, level, message):
if not logger.isEnabledFor(level): return
logger.log(level, u'(%d) %s' % (id(self),
unicode(message).encode('ASCII', 'replace')))
def _fixpath(self, path):
return abspath(normpath(path))
def _get_file_handler(self, path):
if not self.autorun:
if path.lower().startswith('/autorun.'):
self._log(DEBUG, 'Access to file %s denied' % path)
return NullFile()
return self.getrange(path, 0)
@_fix_path
def getpathurl(self, path, allow_none=False, webapi=None):
'''
......@@ -376,24 +337,21 @@ class _TahoeFS(FS):
'''
if webapi is None:
webapi = self.connection.webapi
self._log(DEBUG, "Retrieving URL for %s over %s" % (path, webapi))
path = self.tahoeutil.fixwinpath(path, False)
return u"%s/uri/%s%s" % (webapi, self.dircap, path)
@_fix_path
def getrange(self, path, offset, length=None):
path = self.tahoeutil.fixwinpath(path, False)
return self.connection.get(u'/uri/%s%s' % (self.dircap, path),
offset=offset, length=length)
@_fix_path
def setcontents(self, path, file, chunk_size=64*1024):
self._log(INFO, 'Uploading file %s' % path)
path = self.tahoeutil.fixwinpath(path, False)
size=None
if self.readonly:
if self.getmeta("read_only"):
raise errors.UnsupportedError('read only filesystem')
# Workaround for large files:
......@@ -420,4 +378,24 @@ class _TahoeFS(FS):
#info['created_time'] = datetime.datetime.now()
#info['modified_time'] = datetime.datetime.now()
#info['accessed_time'] = datetime.datetime.now()
if info['type'] == 'filenode':
info["st_mode"] = 0x700 | statinfo.S_IFREG
elif info['type'] == 'dirnode':
info["st_mode"] = 0x700 | statinfo.S_IFDIR
return info
class TahoeFS(CacheFSMixin,_TahoeFS):
"""FS providing cached access to a Tahoe Filesystem.
This class is the preferred means to access a Tahoe filesystem. It
maintains an internal cache of recently-accessed metadata to speed
up operations.
"""
def __init__(self, *args, **kwds):
kwds.setdefault("cache_timeout",60)
super(TahoeFS,self).__init__(*args,**kwds)
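A brief usage sketch (hedged; the capability comes from createdircap() or a previously saved dircap string):

    dircap = TahoeFS.createdircap()
    fs = TahoeFS(dircap)                      # metadata cached for 60 seconds
    fs = TahoeFS(dircap, cache_timeout=300,   # or tune the cache lifetime
                 webapi='http://127.0.0.1:3456')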
......@@ -62,7 +62,6 @@ class Connection:
def _urlopen(self, req):
try:
#print req.get_full_url()
return urlopen(req)
except Exception, e:
if not getattr(e, 'getcode', None):
......
......@@ -40,6 +40,12 @@ to subprocess.Popen::
>>> mp = dokan.MountProcess(fs,"Q",stderr=PIPE)
>>> dokan_errors = mp.communicate()[1]
If you are exposing an untrusted filesystem, you may want to apply the
wrapper class Win32SafetyFS before passing it into dokan. This takes a
number of steps to avoid suspicious operations on Windows, such as
hiding autorun files.
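For example (a sketch; untrusted_fs stands for whatever FS object you
were given):

>>> safe_fs = dokan.Win32SafetyFS(untrusted_fs)
>>> mp = dokan.MountProcess(safe_fs,"Q",stderr=PIPE)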
The binding to Dokan is created via ctypes. Due to the very stable ABI of
win32, this should work without further configuration on just about all
systems with Dokan installed.
......@@ -64,6 +70,7 @@ from fs.base import threading
from fs.errors import *
from fs.path import *
from fs.local_functools import wraps
from fs.wrapfs import WrapFS
try:
import libdokan
......@@ -881,6 +888,39 @@ class MountProcess(subprocess.Popen):
mount(fs,drive,**opts)
class Win32SafetyFS(WrapFS):
"""FS wrapper for extra safety when mounting on win32.
This wrapper class provides some safety features when mounting untrusted
filesystems on win32. Specifically:
* hiding autorun files
* removing colons from paths
"""
def __init__(self,wrapped_fs,allow_autorun=False):
self.allow_autorun = allow_autorun
super(Win32SafetyFS,self).__init__(wrapped_fs)
def _encode(self,path):
path = relpath(normpath(path))
path = path.replace(":","__colon__")
if not self.allow_autorun:
if path.lower().startswith("/_autorun."):
path = "/" + path[2:]
return path
def _decode(self,path):
path = relpath(normpath(path))
path = path.replace("__colon__",":")
if not self.allow_autorun:
if path.lower().startswith("/autorun."):
path = "/_" + path[1:]
return path
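A rough illustration of the effect, mirroring the new unit test below (MemoryFS is just a convenient backend):

    from fs.memoryfs import MemoryFS
    from fs.expose import dokan

    raw = MemoryFS()
    raw.setcontents("autoRun.inf", "evilcode")
    raw.setcontents("file:stream", "test")
    safe = dokan.Win32SafetyFS(raw)
    safe.exists("autoRun.inf")           # False: hidden from the public view
    safe.exists("_autoRun.inf")          # True: visible behind the "_" prefix
    safe.exists("file__colon__stream")   # True: colons are re-encoded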
if __name__ == "__main__":
import os, os.path
import tempfile
......
......@@ -26,8 +26,10 @@ import sys
import os
import time
import copy
import stat as statinfo
from errno import EINVAL
import fs.utils
from fs.base import FS, threading
from fs.wrapfs import WrapFS, wrap_fs_methods
from fs.wrapfs.lazyfs import LazyFS
......@@ -38,6 +40,9 @@ from fs.filelike import StringIO, SpooledTemporaryFile, FileWrapper
from fs import SEEK_SET, SEEK_CUR, SEEK_END
_SENTINAL = object()
class RemoteFileBuffer(FileWrapper):
"""File-like object providing buffer for local file operations.
......@@ -110,7 +115,10 @@ class RemoteFileBuffer(FileWrapper):
# Don't try to close a partially-constructed file
if "_lock" in self.__dict__:
if not self.closed:
self.close()
try:
self.close()
except FSError:
pass
def _write(self,data,flushing=False):
with self._lock:
......@@ -392,185 +400,320 @@ def _ConnectionManagerFS_method_wrapper(func):
wrap_fs_methods(_ConnectionManagerFS_method_wrapper,ConnectionManagerFS)
def _cached_method(func):
"""Method decorator that caches results for CacheFS."""
@wraps(func)
def wrapper(self,path="",*args,**kwds):
try:
(success,result) = self._cache_get(path,func.__name__,args,kwds)
except KeyError:
try:
res = func(self,path,*args,**kwds)
except Exception, e:
self._cache_set(path,func.__name__,args,kwds,(False,e))
raise
else:
self._cache_set(path,func.__name__,args,kwds,(True,res))
return copy.copy(res)
else:
if not success:
raise result
else:
return copy.copy(result)
return wrapper
class CachedInfo(object):
"""Info objects stored in cache for CacheFS."""
__slots__ = ("timestamp","info","has_full_info","has_full_children")
def __init__(self,info={},has_full_info=True,has_full_children=False):
self.timestamp = time.time()
self.info = info
self.has_full_info = has_full_info
self.has_full_children = has_full_children
def clone(self):
new_ci = self.__class__()
new_ci.update_from(self)
return new_ci
def update_from(self,other):
self.timestamp = other.timestamp
self.info = other.info
self.has_full_info = other.has_full_info
self.has_full_children = other.has_full_children
@classmethod
def new_file_stub(cls):
info = {"info" : 0700 | statinfo.S_IFREG}
return cls(info,has_full_info=False)
@classmethod
def new_dir_stub(cls):
info = {"info" : 0700 | statinfo.S_IFDIR}
return cls(info,has_full_info=False)
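# Note: stubs record only existence and node type; has_full_info=False
# forces getinfo() to re-fetch complete metadata on the next lookup.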
class CacheFSMixin(WrapFS):
"""Simple FS mixin to cache meta-data of a remote filesystems.
This FS mixin implements a simplistic cache that can help speed up
access to a remote filesystem. File and directory meta-data is cached
but the actual file contents are not.
If you want to add caching to an existing FS object, use the CacheFS
class instead; it's an easy-to-use wrapper rather than a mixin.
This mixin class is provided for FS implementors who want to use
caching internally in their own classes.
class CacheFS(WrapFS):
"""Simple wrapper to cache meta-data of a remote filesystems.
FYI, the implementation of CacheFS is this:
class CacheFS(CacheFSMixin,WrapFS):
pass
This FS wrapper implements a simplistic cache that can help speed up
access to a remote filesystem. File and directory meta-data is cached
but the actual file contents are not.
"""
def __init__(self,fs,timeout=1):
"""CacheFS constructor.
def __init__(self,*args,**kwds):
"""CacheFSMixin constructor.
The optional argument 'timeout' specifies the cache timeout in
seconds. The default timeout is 1 second. To prevent cache
entries from ever timing out, set it to None.
The optional keyword argument 'cache_timeout' specifies the cache
timeout in seconds. The default timeout is 1 second. To prevent
cache entries from ever timing out, set it to None.
The optional keyword argument 'max_cache_size' specifies the maximum
number of entries to keep in the cache. To allow the cache to grow
without bound, set it to None. The default is 1000.
"""
self.timeout = timeout
self._cache = {"":{}}
super(CacheFS,self).__init__(fs)
def _path_cache(self,path):
cache = self._cache
for name in iteratepath(path):
cache = cache.setdefault(name,{"":{}})
return cache
def _cache_get(self,path,func,args,kwds):
now = time.time()
cache = self._path_cache(path)
key = (tuple(args),tuple(sorted(kwds.iteritems())))
(t,v) = cache[""][func][key]
if self.timeout is not None:
if t < now - self.timeout:
raise KeyError
return v
def _cache_set(self,path,func,args,kwds,v):
t = time.time()
cache = self._path_cache(path)
key = (tuple(args),tuple(sorted(kwds.iteritems())))
cache[""].setdefault(func,{})[key] = (t,v)
def _uncache(self,path,added=False,removed=False,unmoved=False):
cache = self._cache
names = list(iteratepath(path))
# If it's not the root dir, also clear some items for ancestors
if names:
# Clear cached 'getinfo' and 'getsize' for all ancestors
for name in names[:-1]:
cache[""].pop("getinfo",None)
cache[""].pop("getsize",None)
cache = cache.get(name,None)
if cache is None:
return
# Adjust cached 'listdir' for parent directory.
# TODO: account for whether it was added, removed, or unmoved
cache[""].pop("getinfo",None)
cache[""].pop("getsize",None)
cache[""].pop("listdir",None)
cache[""].pop("listdirinfo",None)
# Clear all cached info for the path itself.
if names:
cache[names[-1]] = {"":{}}
self.cache_timeout = kwds.pop("cache_timeout",1)
self.max_cache_size = kwds.pop("max_cache_size",1000)
self.__cache = PathMap()
self.__cache_size = 0
self.__cache_lock = threading.RLock()
super(CacheFSMixin,self).__init__(*args,**kwds)
def clear_cache(self,path=""):
with self.__cache_lock:
self.__cache.clear(path)
try:
scc = super(CacheFSMixin,self).clear_cache
except AttributeError:
pass
else:
cache[""] = {}
scc()
def __getstate__(self):
state = super(CacheFSMixin,self).__getstate__()
state.pop("_CacheFSMixin__cache_lock",None)
return state
def __setstate__(self,state):
super(CacheFSMixin,self).__setstate__(state)
self.__cache_lock = threading.RLock()
def __get_cached_info(self,path,default=_SENTINAL):
try:
info = self.__cache[path]
if self.cache_timeout is not None:
now = time.time()
if info.timestamp < (now - self.cache_timeout):
with self.__cache_lock:
self.__expire_from_cache(path)
raise KeyError
return info
except KeyError:
if default is not _SENTINAL:
return default
raise
def __set_cached_info(self,path,new_ci,old_ci=None):
was_room = True
with self.__cache_lock:
# Free up some room in the cache
if self.max_cache_size is not None and old_ci is None:
while self.__cache_size >= self.max_cache_size:
try:
to_del = iter(self.__cache).next()
except StopIteration:
break
else:
was_room = False
self.__expire_from_cache(to_del)
# Atomically add to the cache.
# If there's a race, newest information wins
ci = self.__cache.setdefault(path,new_ci)
if ci is new_ci:
self.__cache_size += 1
else:
if old_ci is None or ci is old_ci:
if ci.timestamp < new_ci.timestamp:
ci.update_from(new_ci)
return was_room
def __expire_from_cache(self,path):
del self.__cache[path]
self.__cache_size -= 1
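# Ancestors can no longer claim a complete cached listing once one of their entries has been expired.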
for ancestor in recursepath(path):
try:
self.__cache[ancestor].has_full_children = False
except KeyError:
pass
def open(self,path,mode="r",**kwds):
# Try to validate the entry using the cached info
try:
ci = self.__get_cached_info(path)
except KeyError:
if path in ("","/"):
raise ResourceInvalidError(path)
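# No cached entry for the path itself; check whether the parent's
# cached info can answer without a round-trip to the backend.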
try:
pci = self.__get_cached_info(dirname(path))
except KeyError:
pass
else:
if not fs.utils.isdir(super(CacheFSMixin,self),path,pci.info):
raise ResourceInvalidError(path)
if pci.has_full_children:
raise ResourceNotFoundError(path)
else:
if not fs.utils.isfile(super(CacheFSMixin,self),path,ci.info):
raise ResourceInvalidError(path)
return super(CacheFSMixin,self).open(path,mode,**kwds)
@_cached_method
def exists(self,path):
return super(CacheFS,self).exists(path)
try:
self.getinfo(path)
except ResourceNotFoundError:
return False
else:
return True
@_cached_method
def isdir(self,path):
return super(CacheFS,self).isdir(path)
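# Fast path: if the cache already holds any entry below this path, it must be a directory.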
try:
self.__cache.iternames(path).next()
return True
except StopIteration:
pass
try:
info = self.getinfo(path)
except ResourceNotFoundError:
return False
else:
return fs.utils.isdir(super(CacheFSMixin,self),path,info)
@_cached_method
def isfile(self,path):
return super(CacheFS,self).isfile(path)
@_cached_method
def listdir(self,path="",*args,**kwds):
return super(CacheFS,self).listdir(path,*args,**kwds)
@_cached_method
def listdirinfo(self,path="",*args,**kwds):
return super(CacheFS,self).listdirinfo(path,*args,**kwds)
try:
self.__cache.iternames(path).next()
return False
except StopIteration:
pass
try:
info = self.getinfo(path)
except ResourceNotFoundError:
return False
else:
return fs.utils.isfile(super(CacheFSMixin,self),path,info)
@_cached_method
def getinfo(self,path):
return super(CacheFS,self).getinfo(path)
@_cached_method
def getsize(self,path):
return super(CacheFS,self).getsize(path)
try:
ci = self.__get_cached_info(path)
if not ci.has_full_info:
raise KeyError
info = ci.info
except KeyError:
info = super(CacheFSMixin,self).getinfo(path)
self.__set_cached_info(path,CachedInfo(info))
return info
@_cached_method
def getxattr(self,path,name,default=None):
return super(CacheFS,self).getxattr(path,name,default)
def listdir(self,path="",*args,**kwds):
return list(nm for (nm,info) in self.listdirinfo(path,*args,**kwds))
@_cached_method
def listxattrs(self,path):
return super(CacheFS,self).listxattrs(path)
def ilistdir(self,path="",*args,**kwds):
for (nm,info) in self.ilistdirinfo(path,*args,**kwds):
yield nm
def open(self,path,mode="r"):
f = super(CacheFS,self).open(path,mode)
self._uncache(path,unmoved=True)
return f
def listdirinfo(self,path="",*args,**kwds):
items = super(CacheFSMixin,self).listdirinfo(path,*args,**kwds)
with self.__cache_lock:
names = set()
for (nm,info) in items:
names.add(basename(nm))
cpath = pathjoin(path,basename(nm))
ci = CachedInfo(info)
self.__set_cached_info(cpath,ci)
to_del = []
for nm in self.__cache.iternames(path):
if nm not in names:
to_del.append(nm)
for nm in to_del:
self.__cache.clear(pathjoin(path,nm))
#try:
# pci = self.__cache[path]
#except KeyError:
# pci = CachedInfo.new_dir_stub()
# self.__cache[path] = pci
#pci.has_full_children = True
return items
def ilistdirinfo(self,path="",*args,**kwds):
items = super(CacheFSMixin,self).ilistdirinfo(path,*args,**kwds)
for (nm,info) in items:
cpath = pathjoin(path,basename(nm))
ci = CachedInfo(info)
self.__set_cached_info(cpath,ci)
yield (nm,info)
def setcontents(self, path, contents='', chunk_size=64*1024):
res = super(CacheFS,self).setcontents(path, contents, chunk_size=chunk_size)
self._uncache(path,unmoved=True)
def getsize(self,path):
return self.getinfo(path)["size"]
def setcontents(self, path, contents="", chunk_size=64*1024):
supsc = super(CacheFSMixin,self).setcontents
res = supsc(path, contents, chunk_size=chunk_size)
with self.__cache_lock:
self.__cache.clear(path)
self.__cache[path] = CachedInfo.new_file_stub()
return res
def getcontents(self,path):
res = super(CacheFS,self).getcontents(path)
self._uncache(path,unmoved=True)
return res
def createfile(self, path):
super(CacheFSMixin,self).createfile(path)
with self.__cache_lock:
self.__cache.clear(path)
self.__cache[path] = CachedInfo.new_file_stub()
def makedir(self,path,**kwds):
super(CacheFS,self).makedir(path,**kwds)
self._uncache(path,added=True)
def makedir(self,path,*args,**kwds):
super(CacheFSMixin,self).makedir(path,*args,**kwds)
with self.__cache_lock:
self.__cache.clear(path)
self.__cache[path] = CachedInfo.new_dir_stub()
def remove(self,path):
super(CacheFS,self).remove(path)
self._uncache(path,removed=True)
super(CacheFSMixin,self).remove(path)
with self.__cache_lock:
self.__cache.clear(path)
def removedir(self,path,**kwds):
super(CacheFS,self).removedir(path,**kwds)
self._uncache(path,removed=True)
super(CacheFSMixin,self).removedir(path,**kwds)
with self.__cache_lock:
self.__cache.clear(path)
def rename(self,src,dst):
super(CacheFS,self).rename(src,dst)
self._uncache(src,removed=True)
self._uncache(dst,added=True)
super(CacheFSMixin,self).rename(src,dst)
with self.__cache_lock:
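# Clone every cached entry at or below src to the matching path under dst, then drop the src entries.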
for (subpath,ci) in self.__cache.iteritems(src):
self.__cache[pathjoin(dst,subpath)] = ci.clone()
self.__cache.clear(src)
def copy(self,src,dst,**kwds):
super(CacheFS,self).copy(src,dst,**kwds)
self._uncache(dst,added=True)
super(CacheFSMixin,self).copy(src,dst,**kwds)
with self.__cache_lock:
for (subpath,ci) in self.__cache.iteritems(src):
self.__cache[pathjoin(dst,subpath)] = ci.clone()
def copydir(self,src,dst,**kwds):
super(CacheFS,self).copydir(src,dst,**kwds)
self._uncache(dst,added=True)
super(CacheFSMixin,self).copydir(src,dst,**kwds)
with self.__cache_lock:
for (subpath,ci) in self.__cache.iteritems(src):
self.__cache[pathjoin(dst,subpath)] = ci.clone()
def move(self,src,dst,**kwds):
super(CacheFS,self).move(src,dst,**kwds)
self._uncache(src,removed=True)
self._uncache(dst,added=True)
super(CacheFSMixin,self).move(src,dst,**kwds)
with self.__cache_lock:
for (subpath,ci) in self.__cache.iteritems(src):
self.__cache[pathjoin(dst,subpath)] = ci.clone()
self.__cache.clear(src)
def movedir(self,src,dst,**kwds):
super(CacheFS,self).movedir(src,dst,**kwds)
self._uncache(src,removed=True)
self._uncache(dst,added=True)
super(CacheFSMixin,self).movedir(src,dst,**kwds)
with self.__cache_lock:
for (subpath,ci) in self.__cache.iteritems(src):
self.__cache[pathjoin(dst,subpath)] = ci.clone()
self.__cache.clear(src)
def settimes(self,path,*args,**kwds):
super(CacheFSMixin,self).settimes(path,*args,**kwds)
with self.__cache_lock:
self.__cache.pop(path,None)
def setxattr(self,path,name,value):
self._uncache(path,unmoved=True)
return super(CacheFS,self).setxattr(path,name,value)
class CacheFS(CacheFSMixin,WrapFS):
"""Simple FS wraper to cache meta-data of a remote filesystems.
This FS mixin implements a simplistic cache that can help speed up
access to a remote filesystem. File and directory meta-data is cached
but the actual file contents are not.
"""
pass
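Construction is keyword-driven; a short sketch using TempFS as an arbitrary backend (mirroring the updated tests):

    from fs.tempfs import TempFS
    from fs.remote import CacheFS

    fs = CacheFS(TempFS(), cache_timeout=60, max_cache_size=1000)
    info = fs.getinfo("/")    # fetched once, then served from the cache
    fs.clear_cache()          # explicitly drop all cached entries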
def delxattr(self,path,name):
self._uncache(path,unmoved=True)
return super(CacheFS,self).delxattr(path,name)
......@@ -98,6 +98,7 @@ class FSTestCases(object):
except ResourceInvalidError:
pass
except Exception:
raise
ecls = sys.exc_info()[0]
assert False, "%s raised instead of ResourceInvalidError" % (ecls,)
else:
......
......@@ -179,6 +179,17 @@ if dokan.is_available:
# out. Disabling the test for now.
pass
def test_safety_wrapper(self):
rawfs = MemoryFS()
safefs = dokan.Win32SafetyFS(rawfs)
rawfs.setcontents("autoRun.inf","evilcodeevilcode")
self.assertFalse(safefs.exists("autoRun.inf"))
self.assertTrue(safefs.exists("_autoRun.inf"))
self.assertTrue("autoRun.inf" not in safefs.listdir("/"))
rawfs.setcontents("file:stream","test")
self.assertFalse(safefs.exists("file:stream"))
self.assertTrue(safefs.exists("file__colon__stream"))
class TestDokan(unittest.TestCase,DokanTestCases,ThreadingTestCases):
def setUp(self):
......@@ -208,4 +219,4 @@ if dokan.is_available:
self.temp_fs.close()
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
unittest.main()
......@@ -216,18 +216,51 @@ class TestRemoteFileBuffer(unittest.TestCase, FSTestCases, ThreadingTestCases):
self.assertEquals(f.read(), contents[:10] + contents2)
f.close()
class TestCacheFS(unittest.TestCase,FSTestCases,ThreadingTestCases):
"""Test simple operation of CacheFS"""
def setUp(self):
self._check_interval = sys.getcheckinterval()
sys.setcheckinterval(10)
self.fs = CacheFS(TempFS())
self.wrapped_fs = TempFS()
self.fs = CacheFS(self.wrapped_fs,cache_timeout=0.01)
def tearDown(self):
self.fs.close()
sys.setcheckinterval(self._check_interval)
def test_values_are_used_from_cache(self):
old_timeout = self.fs.cache_timeout
self.fs.cache_timeout = None
try:
self.assertFalse(self.fs.isfile("hello"))
self.wrapped_fs.setcontents("hello","world")
self.assertTrue(self.fs.isfile("hello"))
self.wrapped_fs.remove("hello")
self.assertTrue(self.fs.isfile("hello"))
self.fs.clear_cache()
self.assertFalse(self.fs.isfile("hello"))
finally:
self.fs.cache_timeout = old_timeout
def test_values_are_updated_in_cache(self):
old_timeout = self.fs.cache_timeout
self.fs.cache_timeout = None
try:
self.assertFalse(self.fs.isfile("hello"))
self.wrapped_fs.setcontents("hello","world")
self.assertTrue(self.fs.isfile("hello"))
self.wrapped_fs.remove("hello")
self.assertTrue(self.fs.isfile("hello"))
self.wrapped_fs.setcontents("hello","world")
self.assertTrue(self.fs.isfile("hello"))
self.fs.remove("hello")
self.assertFalse(self.fs.isfile("hello"))
finally:
self.fs.cache_timeout = old_timeout
class TestConnectionManagerFS(unittest.TestCase,FSTestCases):#,ThreadingTestCases):
"""Test simple operation of ConnectionManagerFS"""
......