Commit a8a8853e by rfkelly0

added fs.remote.ConnectionManagerFS class, which provides some simple facilities…

added fs.remote.ConnectionManagerFS class, which provides some simple facilities for dealing with RemoteConnectionErrors
parent be9ea223
...@@ -7,9 +7,10 @@ ...@@ -7,9 +7,10 @@
import time import time
import copy import copy
from fs.base import FS, threading
from fs.wrapfs import WrapFS, wrap_fs_methods
from fs.path import * from fs.path import *
from fs.errors import * from fs.errors import *
from fs.wrapfs import WrapFS
try: try:
from tempfile import SpooledTemporaryFile as TempFile from tempfile import SpooledTemporaryFile as TempFile
...@@ -56,10 +57,13 @@ class RemoteFileBuffer(object): ...@@ -56,10 +57,13 @@ class RemoteFileBuffer(object):
self.mode = mode self.mode = mode
self.closed = False self.closed = False
if rfile is not None: if rfile is not None:
if hasattr(rfile,"read"):
data = rfile.read(1024*256) data = rfile.read(1024*256)
while data: while data:
self.file.write(data) self.file.write(data)
data = rfile.read(1024*256) data = rfile.read(1024*256)
else:
self.file.write(str(rfile))
if "a" not in mode: if "a" not in mode:
self.file.seek(0) self.file.seek(0)
...@@ -102,8 +106,147 @@ class RemoteFileBuffer(object): ...@@ -102,8 +106,147 @@ class RemoteFileBuffer(object):
self.file.close() self.file.close()
class ConnectionManagerFS(WrapFS):
"""FS wrapper providing simple connection management of a remote FS.
The ConnectionManagerFS class is designed to wrap a remote FS object
and provide some convenience methods for dealing with its remote
connection state.
The boolean attribute 'connected' indicates whether the remote fileystem
has an active connection, and is initially True. If any of the remote
filesystem methods raises a RemoteConnectionError, 'connected' will
switch to False and remain so until a successful remote method call.
Application code can use the method 'wait_for_connection' to block
until the connection is re-established. Currently this reconnection
is checked by a simple polling loop; eventually more sophisticated
operating-system integration may be added.
Since some remote FS classes can raise RemoteConnectionError during
initialisation, this class also provides a simple "lazy initialisation"
facility. The remote FS can be specified as an FS instance, an FS
subclass, or a (class,args) or (class,args,kwds) tuple. For example:
>>> fs = ConnectionManagerFS(MyRemoteFS("http://www.example.com/"))
Traceback (most recent call last):
...
RemoteConnectionError: couldn't connect to "http://www.example.com/"
>>> fs = ConnectionManagerFS((MyRemoteFS,["http://www.example.com/"]))
>>> fs.connected
False
>>>
"""
poll_interval = 1
def __init__(self,fs,poll_interval=None):
if poll_interval is not None:
self.poll_interval = poll_interval
if isinstance(fs,FS):
self.__dict__["wrapped_fs"] = fs
elif isinstance(fs,type):
self._fsclass = fs
self._fsargs = []
self._fskwds = {}
else:
self._fsclass = fs[0]
try:
self._fsargs = fs[1]
except IndexError:
self._fsargs = []
try:
self._fskwds = fs[2]
except IndexError:
self._fskwds = {}
self._connection_cond = threading.Condition()
self._poll_thread = None
try:
self.wrapped_fs.isdir("")
except RemoteConnectionError:
self.connected = False
else:
self.connected = True
@property
def wrapped_fs(self):
try:
return self.__dict__["wrapped_fs"]
except KeyError:
self._connection_cond.acquire()
try:
try:
return self.__dict__["wrapped_fs"]
except KeyError:
fs = self._fsclass(*self._fsargs,**self._fskwds)
self.__dict__["wrapped_fs"] = fs
return fs
finally:
self._connection_cond.release()
def __getstate__(self):
state = super(ConnectionManagerFS,self).__getstate__()
del state["_connection_cond"]
state["_poll_thread"] = None
return state
def __setstate__(self,state):
super(ConnectionManagerFS,self).__setstate__(state)
self._connection_cond = threading.Condition()
def wait_for_connection(self,timeout=None):
self._connection_cond.acquire()
if not self.connected:
if not self._poll_thread:
target = self._poll_connection
self._poll_thread=threading.Thread(target=target)
self._poll_thread.start()
self._connection_cond.wait(timeout)
self._connection_cond.release()
def _poll_connection(self):
while not self.connected:
try:
self.wrapped_fs.isdir("")
except RemoteConnectionError:
time.sleep(self.poll_interval)
continue
except FSError:
break
else:
break
self._connection_cond.acquire()
self.connected = True
self._poll_thread = None
self._connection_cond.notifyAll()
self._connection_cond.release()
def _ConnectionManagerFS_method_wrapper(func):
"""Method wrapper for ConnectionManagerFS.
This method wrapper keeps an eye out for RemoteConnectionErrors and
adjusts self.connected accordingly.
"""
@wraps(func)
def wrapper(self,*args,**kwds):
try:
result = func(self,*args,**kwds)
except RemoteConnectionError:
self.connected = False
raise
except FSError:
self.connected = True
raise
else:
self.connected = True
return result
return wrapper
wrap_fs_methods(_ConnectionManagerFS_method_wrapper,ConnectionManagerFS)
def cached(func): def _cached_method(func):
"""Method decorator that caches results for CacheFS.""" """Method decorator that caches results for CacheFS."""
@wraps(func) @wraps(func)
def wrapper(self,path="",*args,**kwds): def wrapper(self,path="",*args,**kwds):
...@@ -187,35 +330,35 @@ class CacheFS(WrapFS): ...@@ -187,35 +330,35 @@ class CacheFS(WrapFS):
# Clear all cached info for the path itself. # Clear all cached info for the path itself.
cache[names[-1]] = {"":{}} cache[names[-1]] = {"":{}}
@cached @_cached_method
def exists(self,path): def exists(self,path):
return super(CacheFS,self).exists(path) return super(CacheFS,self).exists(path)
@cached @_cached_method
def isdir(self,path): def isdir(self,path):
return super(CacheFS,self).isdir(path) return super(CacheFS,self).isdir(path)
@cached @_cached_method
def isfile(self,path): def isfile(self,path):
return super(CacheFS,self).isfile(path) return super(CacheFS,self).isfile(path)
@cached @_cached_method
def listdir(self,path="",**kwds): def listdir(self,path="",**kwds):
return super(CacheFS,self).listdir(path,**kwds) return super(CacheFS,self).listdir(path,**kwds)
@cached @_cached_method
def getinfo(self,path): def getinfo(self,path):
return super(CacheFS,self).getinfo(path) return super(CacheFS,self).getinfo(path)
@cached @_cached_method
def getsize(self,path): def getsize(self,path):
return super(CacheFS,self).getsize(path) return super(CacheFS,self).getsize(path)
@cached @_cached_method
def getxattr(self,path,name): def getxattr(self,path,name):
return super(CacheFS,self).getxattr(path,name) return super(CacheFS,self).getxattr(path,name)
@cached @_cached_method
def listxattrs(self,path): def listxattrs(self,path):
return super(CacheFS,self).listxattrs(path) return super(CacheFS,self).listxattrs(path)
......
...@@ -552,6 +552,7 @@ class ThreadingTestCases: ...@@ -552,6 +552,7 @@ class ThreadingTestCases:
continue continue
if self.fs.exists(subdir): if self.fs.exists(subdir):
self.fs.removedir(subdir,force=True) self.fs.removedir(subdir,force=True)
self.assertFalse(self.fs.isdir(subdir))
self.fs.makedir(subdir) self.fs.makedir(subdir)
self._yield() self._yield()
getattr(this,meth)() getattr(this,meth)()
......
""" """
fs.tests.test_remote: testcases for FS remote support fs.tests.test_remote: testcases for FS remote support utilities
""" """
from fs.tests import FSTestCases, ThreadingTestCases from fs.tests import FSTestCases, ThreadingTestCases
import unittest import unittest
import threading
import random
import time
from fs.remote import * from fs.remote import *
from fs.wrapfs import WrapFS, wrap_fs_methods
from fs.tempfs import TempFS from fs.tempfs import TempFS
from fs.path import * from fs.path import *
class TestCacheFS(unittest.TestCase,FSTestCases,ThreadingTestCases): class TestCacheFS(unittest.TestCase,FSTestCases,ThreadingTestCases):
"""Test simple operation of CacheFS"""
def setUp(self): def setUp(self):
sys.setcheckinterval(1) sys.setcheckinterval(10)
self.fs = CacheFS(TempFS()) self.fs = CacheFS(TempFS())
def tearDown(self): def tearDown(self):
self.fs.close() self.fs.close()
class TestConnectionManagerFS(unittest.TestCase,FSTestCases,ThreadingTestCases):
"""Test simple operation of ConnectionManagerFS"""
def setUp(self):
sys.setcheckinterval(10)
self.fs = ConnectionManagerFS(TempFS())
def tearDown(self):
self.fs.close()
class DisconnectingFS(WrapFS):
"""FS subclass that raises lots of RemoteConnectionErrors."""
def __init__(self):
super(DisconnectingFS,self).__init__(TempFS())
self._connected = random.choice([True,False])
if not self._connected:
raise RemoteConnectionError("")
self._continue = True
self._bounce_thread = threading.Thread(target=self._bounce)
self._bounce_thread.start()
def __getstate__(self):
state = super(DisconnectingFS,self).__getstate__()
del state["_bounce_thread"]
return state
def __setstate__(self,state):
super(DisconnectingFS,self).__setstate__(state)
self._bounce_thread = threading.Thread(target=self._bounce)
self._bounce_thread.start()
def _bounce(self):
while self._continue:
time.sleep(random.random()*0.1)
self._connected = not self._connected
def close(self):
self._continue = False
self._bounce_thread.join()
self._connected = True
self.wrapped_fs.close()
def _encode(self,path):
if not self._connected:
raise RemoteConnectionError("")
return path
class DisconnectRecoveryFS(WrapFS):
"""FS subclass that recovers from RemoteConnectionErrors by waiting."""
pass
def recovery_wrapper(func):
"""Method wrapper to recover from RemoteConnectionErrors by waiting."""
@wraps(func)
def wrapper(self,*args,**kwds):
while True:
try:
return func(self,*args,**kwds)
except RemoteConnectionError:
self.wrapped_fs.wait_for_connection()
return wrapper
# this also checks that wrap_fs_methods works as a class decorator
DisconnectRecoveryFS = wrap_fs_methods(recovery_wrapper)(DisconnectRecoveryFS)
class TestConnectionManagerFS_disconnect(TestConnectionManagerFS):
"""Test ConnectionManagerFS's ability to wait for reconnection."""
def setUp(self):
sys.setcheckinterval(10)
c_fs = ConnectionManagerFS(DisconnectingFS,poll_interval=0.1)
self.fs = DisconnectRecoveryFS(c_fs)
def tearDown(self):
self.fs.close()
...@@ -38,6 +38,7 @@ class XAttrTestCases: ...@@ -38,6 +38,7 @@ class XAttrTestCases:
def do_list(p): def do_list(p):
self.assertEquals(sorted(self.fs.listxattrs(p)),[]) self.assertEquals(sorted(self.fs.listxattrs(p)),[])
self.fs.setxattr(p,"xattr1","value1") self.fs.setxattr(p,"xattr1","value1")
self.assertEquals(self.fs.getxattr(p,"xattr1"),"value1")
self.assertEquals(sorted(self.fs.listxattrs(p)),["xattr1"]) self.assertEquals(sorted(self.fs.listxattrs(p)),["xattr1"])
self.fs.setxattr(p,"attr2","value2") self.fs.setxattr(p,"attr2","value2")
self.assertEquals(sorted(self.fs.listxattrs(p)),["attr2","xattr1"]) self.assertEquals(sorted(self.fs.listxattrs(p)),["attr2","xattr1"])
...@@ -114,3 +115,4 @@ class TestXAttr_MemoryFS(unittest.TestCase,FSTestCases,XAttrTestCases): ...@@ -114,3 +115,4 @@ class TestXAttr_MemoryFS(unittest.TestCase,FSTestCases,XAttrTestCases):
def check(self, p): def check(self, p):
return self.fs.exists(p) return self.fs.exists(p)
...@@ -59,7 +59,7 @@ class WrapFS(FS): ...@@ -59,7 +59,7 @@ class WrapFS(FS):
super(WrapFS,self).__init__() super(WrapFS,self).__init__()
try: try:
self._lock = fs._lock self._lock = fs._lock
except AttributeError: except (AttributeError,FSError):
self._lock = None self._lock = None
self.wrapped_fs = fs self.wrapped_fs = fs
...@@ -194,6 +194,22 @@ class WrapFS(FS): ...@@ -194,6 +194,22 @@ class WrapFS(FS):
def copydir(self,src,dst,overwrite=False,ignore_errors=False,chunk_size=16384): def copydir(self,src,dst,overwrite=False,ignore_errors=False,chunk_size=16384):
return self.wrapped_fs.copydir(self._encode(src),self._encode(dst),overwrite,ignore_errors,chunk_size) return self.wrapped_fs.copydir(self._encode(src),self._encode(dst),overwrite,ignore_errors,chunk_size)
@rewrite_errors
def getxattr(self,path,name,default=None):
return self.wrapped_fs.getxattr(self._encode(path),name,default)
@rewrite_errors
def setxattr(self,path,name,value):
return self.wrapped_fs.setxattr(self._encode(path),name,value)
@rewrite_errors
def delxattr(self,path,name):
return self.wrapped_fs.delxattr(self._encode(path),name)
@rewrite_errors
def listxattrs(self,path):
return self.wrapped_fs.listxattrs(self._encode(path))
def __getattr__(self,attr): def __getattr__(self,attr):
return getattr(self.wrapped_fs,attr) return getattr(self.wrapped_fs,attr)
...@@ -202,6 +218,38 @@ class WrapFS(FS): ...@@ -202,6 +218,38 @@ class WrapFS(FS):
if hasattr(self.wrapped_fs,"close"): if hasattr(self.wrapped_fs,"close"):
self.wrapped_fs.close() self.wrapped_fs.close()
def wrap_fs_methods(decorator,cls=None):
"""Apply the given decorator to all FS methods on the given class.
This function can be used in two ways. When called with two arguments it
applies the given function 'decorator' to each FS method of the given
class. When called with just a single argument, it creates and returns
a class decorator which will do the same thing when applied. So you can
use it like this:
wrap_fs_methods(mydecorator,MyFSClass)
Or on more recent Python versions, like this:
@wrap_fs_methods(mydecorator)
class MyFSClass(FS):
...
"""
methods = ("open","exists","isdir","isfile","listdir","makedir","remove",
"removedir","rename","getinfo","copy","move","copydir",
"movedir","close","getxattr","setxattr","delxattr","listxattrs")
def apply_decorator(cls):
for method_name in methods:
method = getattr(cls,method_name,None)
if method is not None:
setattr(cls,method_name,decorator(method))
return cls
if cls is not None:
return apply_decorator(cls)
else:
return apply_decorator
class HideDotFiles(WrapFS): class HideDotFiles(WrapFS):
"""FS wrapper class that hides dot-files in directory listings. """FS wrapper class that hides dot-files in directory listings.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment