Commit 0b1ccbd1 by rfkelly0

make open(path,"w") immediately create an empty file in S3FS/RPCFS

parent b7711dd3
fs/rpcfs.py

@@ -39,19 +39,6 @@ class ObjProxy:
     def __getattr__(self,attr):
         return getattr(self._obj,attr)
 
-class ReRaiseErrors:
-    """XML-RPC proxy wrapper that tries to re-raise Faults as actual errors."""
-    def __init__(self,proxy):
-        self._proxy = proxy
-    def __getattr__(self,attr):
-        val = getattr(self._proxy,attr)
-        if not callable(val):
-            return val
-        def func(*args,**kwds):
-            try:
-                val(*args,**kwds)
-            except xmlrpclib.Fault, e:
-                print "ERROR", e, e.faultCode, e.faultString
 
 def re_raise_faults(func):
     """Decorator to re-raise XML-RPC faults as proper exceptions."""
@@ -108,6 +95,7 @@ class ReRaiseFaults:
         val = getattr(self._obj,attr)
         if callable(val):
             val = re_raise_faults(val)
+            self.__dict__[attr] = val
         return val
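For context: the removed ReRaiseErrors class merely printed faults, while the re_raise_faults decorator converts each xmlrpclib.Fault back into a real exception, and the new self.__dict__[attr] = val line caches the wrapped callable on the instance so __getattr__ (and the wrapping) runs only once per attribute. A minimal sketch of the pattern; the actual fault-to-exception mapping is not shown in this diff, so the IOError mapping below is purely illustrative:

    import xmlrpclib

    def re_raise_faults(func):
        """Decorator to re-raise XML-RPC faults as proper exceptions."""
        def wrapper(*args, **kwds):
            try:
                return func(*args, **kwds)
            except xmlrpclib.Fault, f:
                # Illustrative only: map the fault back onto a real
                # exception type using its faultCode/faultString.
                raise IOError(f.faultCode, f.faultString)
        return wrapper

    class ReRaiseFaults:
        def __init__(self, obj):
            self._obj = obj
        def __getattr__(self, attr):
            val = getattr(self._obj, attr)
            if callable(val):
                val = re_raise_faults(val)
                # Cache on the instance so __getattr__ is not hit again
                self.__dict__[attr] = val
            return val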
@@ -128,7 +116,7 @@ class RPCFS(FS):
         The only required argument is the uri of the server to connect
         to. This will be passed to the underlying XML-RPC server proxy
-        object along with any other arguments if they are provided.
+        object along with the 'transport' argument if it is provided.
         """
         self.uri = uri
         if transport is not None:
@@ -144,7 +132,9 @@ class RPCFS(FS):
     def open(self,path,mode):
         # TODO: chunked transport of large files
-        if "r" in mode or "a" in mode:
+        if "w" in mode:
+            self.proxy.set_contents(path,xmlrpclib.Binary(""))
+        if "r" in mode or "a" in mode or "+" in mode:
             try:
                 data = self.proxy.get_contents(path).data
             except IOError:
@@ -152,6 +142,7 @@ class RPCFS(FS):
                 raise ResourceNotFoundError("NO_FILE",path)
             if not self.isdir(dirname(path)):
                 raise OperationFailedError("OPEN_FAILED", path,msg="Parent directory does not exist")
+            self.proxy.set_contents(path,xmlrpclib.Binary(""))
         else:
             data = ""
         f = ObjProxy(StringIO(data))
@@ -159,11 +150,15 @@ class RPCFS(FS):
             f.seek(0,0)
         else:
             f.seek(0,2)
+        oldflush = f.flush
         oldclose = f.close
+        def newflush():
+            oldflush()
+            self.proxy.set_contents(path,xmlrpclib.Binary(f.getvalue()))
         def newclose():
             f.flush()
-            self.proxy.set_contents(path,xmlrpclib.Binary(f.getvalue()))
             oldclose()
+        f.flush = newflush
         f.close = newclose
         return f
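The flush/close wrapping above is the core of the change: every flush now pushes the buffer contents back to the server, and close goes through flush so the final contents are uploaded before the buffer is discarded. A standalone sketch of the same pattern, assuming a push(data) callable in place of the XML-RPC proxy:

    from StringIO import StringIO

    def wrap_buffer(f, push):
        """Push the buffer contents on every flush, and flush on close."""
        oldflush = f.flush
        oldclose = f.close
        def newflush():
            oldflush()
            push(f.getvalue())
        def newclose():
            f.flush()    # this is newflush, so the final contents get pushed
            oldclose()
        f.flush = newflush
        f.close = newclose
        return f

    # e.g.  f = wrap_buffer(StringIO(), lambda data: uploads.append(data))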
@@ -217,7 +212,11 @@ class RPCFS(FS):
 class RPCFSInterface(object):
-    """Wrapper to expose an FS via a XML-RPC compatible interface."""
+    """Wrapper to expose an FS via a XML-RPC compatible interface.
+
+    The only real trick is using xmlrpclib.Binary objects to transport
+    the contents of files.
+    """
 
     def __init__(self,fs):
         self.fs = fs
@@ -281,12 +280,16 @@ class RPCFSInterface(object):
 class RPCFSServer(SimpleXMLRPCServer):
     """Server to expose an FS object via XML-RPC.
 
+    This class takes as its first argument an FS instance, and as its second
+    argument a (hostname,port) tuple on which to listen for XML-RPC requests.
+
     Example:
         fs = OSFS('/var/srv/myfiles')
         s = RPCFSServer(fs,("",8080))
         s.serve_forever()
 
+    To cleanly shut down the server after calling serve_forever, set the
+    attribute "serve_more_requests" to False.
     """
 
     def __init__(self,fs,addr,requestHandler=None,logRequests=None):
...
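The new serve_more_requests note suggests a usage pattern like the following sketch, assuming OSFS lives at fs.osfs; note that the serving loop may only notice the cleared flag after handling one more request:

    import threading
    from fs.osfs import OSFS

    fs = OSFS('/var/srv/myfiles')
    server = RPCFSServer(fs, ("", 8080))
    t = threading.Thread(target=server.serve_forever)
    t.start()
    # ... later, to shut down cleanly:
    server.serve_more_requests = False
    t.join()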
fs/s3fs.py

@@ -20,11 +20,6 @@ except ImportError:
 from fs.base import *
 from fs.helpers import *
 
-try:
-    from collections import MutableMapping as DictMixin
-except ImportError:
-    from UserDict import DictMixin
 
 class S3FS(FS):
     """A filesystem stored in Amazon S3.
@@ -35,15 +30,16 @@ class S3FS(FS):
     be stored.
 
     Local temporary files are used when opening files from this filesystem,
-    and any changes are only pushed back into S3 when the files are closed.
+    and any changes are only pushed back into S3 when the files are closed
+    or flushed.
     """
 
     class meta:
         PATH_MAX = None
         NAME_MAX = None
 
-    def __init__(self, bucket, prefix="", aws_access_key=None, aws_secret_key=None, separator="/", thread_syncronize=True):
-        """Constructor for S3SF objects.
+    def __init__(self, bucket, prefix="", aws_access_key=None, aws_secret_key=None, separator="/", thread_syncronize=True, key_sync_timeout=1):
+        """Constructor for S3FS objects.
 
         S3FS objects require the name of the S3 bucket in which to store
         files, and can optionally be given a prefix under which the files
@@ -52,11 +48,19 @@ class S3FS(FS):
         read from the two environment variables AWS_ACCESS_KEY_ID and
         AWS_SECRET_ACCESS_KEY.
 
+        The keyword argument 'key_sync_timeout' specifies the maximum
+        time in seconds that the filesystem will spend trying to confirm
+        that a newly-uploaded S3 key is available for reading. For no
+        timeout, set it to zero. To disable these checks entirely (and
+        thus reduce the filesystem's consistency guarantees to those of
+        S3's "eventual consistency" model) set it to None.
+
         By default the path separator is "/", but this can be overridden
         by specifying the keyword 'separator' in the constructor.
         """
-        self._bucketName = bucket
+        self._bucket_name = bucket
         self._separator = separator
+        self._key_sync_timeout = key_sync_timeout
         self._s3conn = boto.s3.connection.S3Connection(aws_access_key,aws_secret_key)
         self._s3bukt = self._s3conn.create_bucket(bucket)
         # Normalise prefix to this form: path/to/files/
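To illustrate the three key_sync_timeout modes described in the new docstring (bucket name hypothetical, credentials taken from the environment):

    fs = S3FS("my-bucket")                          # poll for up to 1 second (default)
    fs = S3FS("my-bucket", key_sync_timeout=0)      # poll until consistent, no timeout
    fs = S3FS("my-bucket", key_sync_timeout=None)   # no checks; eventual consistency only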
@@ -64,16 +68,17 @@ class S3FS(FS):
             prefix = prefix[1:]
         if not prefix.endswith(separator):
             prefix = prefix + separator
-        self._prefixStr = prefix
+        self._prefix = prefix
         FS.__init__(self, thread_syncronize=thread_syncronize)
 
     def __str__(self):
-        return '<S3FS: %s:%s>' % (self._bucketName,self._prefixStr)
+        return '<S3FS: %s:%s>' % (self._bucket_name,self._prefix)
 
     __repr__ = __str__
 
     def _s3path(self,path):
         """Get the absolute path to a file stored in S3."""
-        path = self._prefixStr + path
+        path = self._prefix + path
         path = self._separator.join(self._pathbits(path))
         return path
@@ -83,6 +88,41 @@ class S3FS(FS):
             if bit and bit != ".":
                 yield bit
 
+    def _sync_key(self,k):
+        """Synchronise on contents of the given key.
+
+        Since S3 only offers "eventual consistency" of data, it is possible
+        to create a key but be unable to read it back straight away. This
+        method works around that limitation by polling the key until it reads
+        back the value expected for the given key.
+
+        Note that this could easily fail if the key is modified by another
+        program, meaning the content will never be as specified in the given
+        key. This is the reason for the timeout argument to the constructor.
+        """
+        timeout = self._key_sync_timeout
+        if timeout is None:
+            return k
+        k2 = self._s3bukt.get_key(k.name)
+        t = time.time()
+        while k2 is None or k2.etag != k.etag:
+            if timeout > 0:
+                if t + timeout < time.time():
+                    break
+            time.sleep(0.1)
+            k2 = self._s3bukt.get_key(k.name)
+        return k2
+
+    def _sync_set_contents(self,key,contents):
+        """Synchronously set the contents of a key."""
+        if isinstance(key,basestring):
+            key = self._s3bukt.new_key(key)
+        if isinstance(contents,basestring):
+            key.set_contents_from_string(contents)
+        else:
+            key.set_contents_from_file(contents)
+        return self._sync_key(key)
+
     def open(self,path,mode="r"):
         """Open the named file in the given mode.
@@ -91,19 +131,22 @@ class S3FS(FS):
         file are only sent back to S3 when the file is flushed or closed.
         """
         tf = TempFile()
-        oldLM = None
         s3path = self._s3path(path)
-        if "r" in mode or "+" in mode or "a" in mode:
-            # Get the file contents into the tempfile.
-            # If it does not exist and has been opened for writing, create it.
+        # Truncate the file if requested
+        if "w" in mode:
+            k = self._sync_set_contents(s3path,"")
+        else:
             k = self._s3bukt.get_key(s3path)
             if k is None:
+                # Create the file if it's missing
                 if "w" not in mode and "a" not in mode:
                     raise ResourceNotFoundError("NO_FILE",path)
                 if not self.isdir(dirname(path)):
                     raise OperationFailedError("OPEN_FAILED", path,msg="Parent directory does not exist")
+                k = self._sync_set_contents(s3path,"")
             else:
-                oldLM = k.last_modified
+                # Get the file contents into the tempfile.
+                if "r" in mode or "+" in mode or "a" in mode:
                     k.get_contents_to_file(tf)
         if "a" not in mode:
             tf.seek(0)
@@ -111,22 +154,16 @@ class S3FS(FS):
         if "w" in mode or "a" in mode or "+" in mode:
             oldflush = tf.flush
             oldclose = tf.close
-            def upload():
-                tf.seek(0)
-                k = self._s3bukt.new_key(s3path)
-                k.set_contents_from_file(tf)
-                k = self._s3bukt.get_key(s3path)
-                while k.last_modified == oldLM:
-                    time.sleep(0.1)
-                    k = self._s3bukt.get_key(s3path)
             def newflush():
                 oldflush()
                 pos = tf.tell()
-                upload()
+                tf.seek(0)
+                self._sync_set_contents(k,tf)
                 tf.seek(pos)
             def newclose():
                 oldflush()
-                upload()
+                tf.seek(0)
+                self._sync_set_contents(k,tf)
                 oldclose()
             tf.close = newclose
             tf.flush = newflush
@@ -137,7 +174,7 @@ class S3FS(FS):
         s3path = self._s3path(path)
         s3pathD = s3path + self._separator
         # The root directory always exists
-        if self._prefixStr.startswith(s3path):
+        if self._prefix.startswith(s3path):
             return True
         ks = self._s3bukt.list(prefix=s3path,delimiter=self._separator)
         for k in ks:
@@ -153,7 +190,7 @@ class S3FS(FS):
         """Check whether a path exists and is a directory."""
         s3path = self._s3path(path) + self._separator
         # Root is always a directory
-        if s3path == self._prefixStr:
+        if s3path == self._prefix:
             return True
         # Use a list request so that we return true if there are any files
         # in that directory. This avoids requiring a special file for the
@@ -167,7 +204,7 @@ class S3FS(FS):
         """Check whether a path exists and is a regular file."""
         s3path = self._s3path(path)
         # Root is never a file
-        if self._prefixStr.startswith(s3path):
+        if self._prefix.startswith(s3path):
             return False
         k = self._s3bukt.get_key(s3path)
         if k is not None:
@@ -193,7 +230,8 @@ class S3FS(FS):
                 nm = nm.encode()
             paths.append(nm)
         if not isDir:
-            if s3path != self._prefixStr:
+            if s3path != self._prefix:
+                print "NOT A DIR:", s3path
                 raise OperationFailedError("LISTDIR_FAILED",path)
         return self._listdir_helper(path,paths,wildcard,full,absolute,hidden,dirs_only,files_only)
@@ -233,14 +271,14 @@ class S3FS(FS):
         """
         s3path = self._s3path(path)
         s3pathD = s3path + self._separator
-        if s3pathD == self._prefixStr:
+        if s3pathD == self._prefix:
             if allow_recreate:
                 return
             raise OperationFailedError("MAKEDIR_FAILED", path, msg="Can not create a directory that already exists (try allow_recreate=True): %(path)s")
         s3pathP = self._s3path(dirname(path[:-1])) + self._separator
         # Check various preconditions using list of parent dir
         ks = self._s3bukt.list(prefix=s3pathP,delimiter=self._separator)
-        if s3pathP == self._prefixStr:
+        if s3pathP == self._prefix:
             parentExists = True
         else:
             parentExists = False
@@ -263,12 +301,7 @@ class S3FS(FS):
             raise OperationFailedError("MAKEDIR_FAILED",path, msg="Parent directory does not exist: %(path)s")
         # Create an empty file representing the directory
         # TODO: is there some standard scheme for representing empty dirs?
-        k = self._s3bukt.new_key(s3pathD)
-        k.set_contents_from_string("")
-        k = self._s3bukt.get_key(s3pathD)
-        while not k:
-            time.sleep(0.1)
-            k = self._s3bukt.get_key(s3pathD)
+        self._sync_set_contents(s3pathD,"")
 
     def remove(self,path):
         """Remove the file at the given path."""
@@ -349,16 +382,14 @@ class S3FS(FS):
         # OK, now we can copy the file.
         s3path_src = self._s3path(src)
         try:
-            self._s3bukt.copy_key(s3path_dst,self._bucketName,s3path_src)
+            self._s3bukt.copy_key(s3path_dst,self._bucket_name,s3path_src)
         except S3ResponseError, e:
             if "404 Not Found" in str(e):
                 raise ResourceInvalid("WRONG_TYPE", src, msg="Source is not a file: %(path)s")
             raise e
         else:
             k = self._s3bukt.get_key(s3path_dst)
-            while not k:
-                time.sleep(0.1)
-                k = self._s3bukt.get_key(s3path_dst)
+            self._sync_key(k)
 
     def move(self,src,dst,chunk_size=16384):
         """Move a file from one location to another."""
...