Commit 32e8c355 by rfkelly0

thread-safe RemoteFileBuffer

parent f2e350a7
...@@ -10,6 +10,7 @@ implementations of this interface such as: ...@@ -10,6 +10,7 @@ implementations of this interface such as:
TempFS: a temporary filesystem that's automatically cleared on exit TempFS: a temporary filesystem that's automatically cleared on exit
MemoryFS: a filesystem that exists only in memory MemoryFS: a filesystem that exists only in memory
ZipFS: access a zipfile like a filesystem ZipFS: access a zipfile like a filesystem
SFTPFS: access files on a SFTP server
S3FS: access files stored in Amazon S3 S3FS: access files stored in Amazon S3
""" """
...@@ -17,11 +18,11 @@ implementations of this interface such as: ...@@ -17,11 +18,11 @@ implementations of this interface such as:
__version__ = "0.2.0" __version__ = "0.2.0"
__author__ = "Will McGugan (will@willmcgugan.com)" __author__ = "Will McGugan (will@willmcgugan.com)"
# 'base' imports * from 'path' and 'errors', so their contents # 'base' imports * from 'path' and 'errors', so their
# will be available here as well. # contents will be available here as well.
from base import * from base import *
# provide these by default so people cna be 'fs.path.basename' etc. # provide these by default so people can use 'fs.path.basename' etc.
import errors import errors
import path import path
......
...@@ -2,6 +2,20 @@ ...@@ -2,6 +2,20 @@
fs.remote: utilities for interfacing with remote filesystems fs.remote: utilities for interfacing with remote filesystems
This module provides reusable utility functions that can be used to construct
FS subclasses interfacing with a remote filesystem. These include:
RemoteFileBuffer: a file-like object that locally buffers the contents
of a remote file, writing them back on flush() or close().
ConnectionManagerFS: a WrapFS subclass that tracks the connection state
of a remote FS, and allows client code to wait for
a connection to be re-established.
CacheFS: a WrapFS subclass that caces file and directory meta-data in
memory, to speed access to a remote FS.
""" """
import time import time
...@@ -49,13 +63,18 @@ class RemoteFileBuffer(object): ...@@ -49,13 +63,18 @@ class RemoteFileBuffer(object):
The owning filesystem, path and mode must be provided. If the The owning filesystem, path and mode must be provided. If the
optional argument 'rfile' is provided, it must be a read()-able optional argument 'rfile' is provided, it must be a read()-able
object containing the initial file contents. object or a string containing the initial file contents.
""" """
self.file = TempFile() self.file = TempFile()
self.fs = fs self.fs = fs
self.path = path self.path = path
self.mode = mode self.mode = mode
self.closed = False self.closed = False
self._flushed = False
if hasattr(fs,"_lock"):
self._lock = copy.deepcopy(fs._lock)
else:
self._lock = threading.RLock()
if rfile is not None: if rfile is not None:
if hasattr(rfile,"read"): if hasattr(rfile,"read"):
data = rfile.read(1024*256) data = rfile.read(1024*256)
...@@ -66,18 +85,26 @@ class RemoteFileBuffer(object): ...@@ -66,18 +85,26 @@ class RemoteFileBuffer(object):
self.file.write(str(rfile)) self.file.write(str(rfile))
if "a" not in mode: if "a" not in mode:
self.file.seek(0) self.file.seek(0)
def __del__(self): def __del__(self):
if not self.closed: if not self.closed:
self.close() self.close()
# This is lifted straight from the stdlib's tempfile.py
def __getattr__(self,name): def __getattr__(self,name):
file = self.__dict__['file'] file = self.__dict__['file']
a = getattr(file, name) a = getattr(file, name)
if not issubclass(type(a), type(0)): if not callable(a):
setattr(self, name, a) return a
return a @wraps(a)
def call_with_lock(*args,**kwds):
self._lock.acquire()
try:
self._flushed = False
return a(*args,**kwds)
finally:
self._lock.release()
setattr(self, name, call_with_lock)
return call_with_lock
def __enter__(self): def __enter__(self):
self.file.__enter__() self.file.__enter__()
...@@ -90,20 +117,37 @@ class RemoteFileBuffer(object): ...@@ -90,20 +117,37 @@ class RemoteFileBuffer(object):
def __iter__(self): def __iter__(self):
return iter(self.file) return iter(self.file)
def truncate(self,size=None):
self._lock.acquire()
try:
self.file.truncate(size)
self.flush()
finally:
self._lock.release()
def flush(self): def flush(self):
self.file.flush() self._lock.acquire()
if "w" in self.mode or "a" in self.mode or "+" in self.mode: try:
pos = self.file.tell() self.file.flush()
self.file.seek(0) if "w" in self.mode or "a" in self.mode or "+" in self.mode:
self.fs.setcontents(self.path,self.file) pos = self.file.tell()
self.file.seek(pos) self.file.seek(0)
self.fs.setcontents(self.path,self.file)
self.file.seek(pos)
self._flushed = True
finally:
self._lock.release()
def close(self): def close(self):
self.closed = True self._lock.acquire()
if "w" in self.mode or "a" in self.mode or "+" in self.mode: try:
self.file.seek(0) self.closed = True
self.fs.setcontents(self.path,self.file) if "w" in self.mode or "a" in self.mode or "+" in self.mode:
self.file.close() self.file.seek(0)
self.fs.setcontents(self.path,self.file)
self.file.close()
finally:
self._lock.release()
class ConnectionManagerFS(WrapFS): class ConnectionManagerFS(WrapFS):
...@@ -285,7 +329,7 @@ def _cached_method(func): ...@@ -285,7 +329,7 @@ def _cached_method(func):
class CacheFS(WrapFS): class CacheFS(WrapFS):
"""Simple wrapper to cache meta-data of a remote filesystems. """Simple wrapper to cache meta-data of a remote filesystems.
This FS wrapper implements a simplistic cache that can help speedup This FS wrapper implements a simplistic cache that can help speed up
access to a remote filesystem. File and directory meta-data is cached access to a remote filesystem. File and directory meta-data is cached
but the actual file contents are not. but the actual file contents are not.
""" """
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment