Commit 359377ac by rfkelly0

add explicit close() method for ConnectionManagerFS

parent 5af99561
...@@ -162,6 +162,7 @@ class ConnectionManagerFS(WrapFS): ...@@ -162,6 +162,7 @@ class ConnectionManagerFS(WrapFS):
self._fskwds = {} self._fskwds = {}
self._connection_cond = threading.Condition() self._connection_cond = threading.Condition()
self._poll_thread = None self._poll_thread = None
self._poll_sleeper = threading.Event()
self.connected = connected self.connected = connected
@property @property
...@@ -183,12 +184,14 @@ class ConnectionManagerFS(WrapFS): ...@@ -183,12 +184,14 @@ class ConnectionManagerFS(WrapFS):
def __getstate__(self): def __getstate__(self):
state = super(ConnectionManagerFS,self).__getstate__() state = super(ConnectionManagerFS,self).__getstate__()
del state["_connection_cond"] del state["_connection_cond"]
del state["_poll_sleeper"]
state["_poll_thread"] = None state["_poll_thread"] = None
return state return state
def __setstate__(self,state): def __setstate__(self,state):
super(ConnectionManagerFS,self).__setstate__(state) super(ConnectionManagerFS,self).__setstate__(state)
self._connection_cond = threading.Condition() self._connection_cond = threading.Condition()
self._poll_sleeper = threading.Event()
def wait_for_connection(self,timeout=None): def wait_for_connection(self,timeout=None):
self._connection_cond.acquire() self._connection_cond.acquire()
...@@ -205,7 +208,8 @@ class ConnectionManagerFS(WrapFS): ...@@ -205,7 +208,8 @@ class ConnectionManagerFS(WrapFS):
try: try:
self.wrapped_fs.isdir("") self.wrapped_fs.isdir("")
except RemoteConnectionError: except RemoteConnectionError:
time.sleep(self.poll_interval) self._poll_sleeper.wait(self.poll_interval)
self._poll_sleeper.clear()
continue continue
except FSError: except FSError:
break break
...@@ -217,6 +221,17 @@ class ConnectionManagerFS(WrapFS): ...@@ -217,6 +221,17 @@ class ConnectionManagerFS(WrapFS):
self._connection_cond.notifyAll() self._connection_cond.notifyAll()
self._connection_cond.release() self._connection_cond.release()
def close(self):
try:
self.wrapped_fs.close()
except (RemoteConnectionError,AttributeError):
pass
if self._poll_thread:
self.connected = True
self._poll_sleeper.set()
self._poll_thread.join()
self._poll_thread = None
def _ConnectionManagerFS_method_wrapper(func): def _ConnectionManagerFS_method_wrapper(func):
"""Method wrapper for ConnectionManagerFS. """Method wrapper for ConnectionManagerFS.
......
...@@ -470,7 +470,8 @@ class FSTestCases: ...@@ -470,7 +470,8 @@ class FSTestCases:
"""Generate predictable-but-randomy binary content.""" """Generate predictable-but-randomy binary content."""
r = random.Random(0) r = random.Random(0)
for i in xrange(num_chunks): for i in xrange(num_chunks):
yield "".join(chr(r.randint(0,255)) for j in xrange(chunk_size)) c = "".join(chr(r.randint(0,255)) for j in xrange(chunk_size/8))
yield c * 8
f = self.fs.open("bigfile","wb") f = self.fs.open("bigfile","wb")
try: try:
for chunk in chunk_stream(): for chunk in chunk_stream():
......
...@@ -125,6 +125,10 @@ class WrapFS(FS): ...@@ -125,6 +125,10 @@ class WrapFS(FS):
return self._file_wrap(f,mode) return self._file_wrap(f,mode)
@rewrite_errors @rewrite_errors
def setcontents(self,path,contents):
self.wrapped_fs.setcontents(self._encode(path),contents)
@rewrite_errors
def exists(self,path): def exists(self,path):
return self.wrapped_fs.exists(self._encode(path)) return self.wrapped_fs.exists(self._encode(path))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment