Commit c369adcd by rfkelly0

S3FS: some code layout cleanups

parent 30f051ef
......@@ -23,6 +23,10 @@ Other useful classes include:
class, patched to more closely preserve the
semantics of a standard file.
* LimitBytesFile: a filelike wrapper that limits the total bytes read
from a file; useful for turning a socket into a file
without reading past end-of-data.
"""
# Copyright (C) 2006-2009, Ryan Kelly
# All rights reserved; available under the terms of the MIT License.
......@@ -743,3 +747,22 @@ class SpooledTemporaryFile(FileWrapper):
return self.wrapped_file.fileno()
class LimitBytesFile(FileWrapper):
    """Filelike wrapper that caps the number of bytes read from a stream.

    The wrapper keeps a private countdown, initialised to ``size``, and
    never asks the wrapped object for more bytes than the countdown allows.
    """

    def __init__(self, size, fileobj, *args, **kwds):
        """Wrap ``fileobj`` with a read budget of ``size`` bytes.

        Extra positional and keyword arguments are handed to the
        FileWrapper constructor unchanged.
        """
        self.size = size
        self.__remaining = size
        super(LimitBytesFile, self).__init__(fileobj, *args, **kwds)

    def _read(self, sizehint=-1):
        """Read up to ``sizehint`` bytes without exceeding the budget.

        A ``sizehint`` of None, a negative value, or a value larger than
        the remaining budget is clamped to the remaining budget.  Returns
        None once the budget is used up (without calling the parent
        ``_read``); otherwise returns whatever the parent ``_read`` gives
        back, deducting its length from the budget.
        """
        if self.__remaining <= 0:
            return None
        wants_everything = sizehint is None or sizehint < 0
        if wants_everything or sizehint > self.__remaining:
            sizehint = self.__remaining
        chunk = super(LimitBytesFile, self)._read(sizehint)
        if chunk is None:
            return None
        self.__remaining -= len(chunk)
        return chunk
......@@ -23,6 +23,7 @@ from fs.base import *
from fs.path import *
from fs.errors import *
from fs.remote import *
from fs.filelike import LimitBytesFile
# Boto is not thread-safe, so we need to use a per-thread S3 connection.
......@@ -33,7 +34,6 @@ else:
def __init__(self):
self._map = {}
def __getattr__(self,attr):
try:
return self._map[(threading.currentThread(),attr)]
except KeyError:
......@@ -171,6 +171,16 @@ class S3FS(FS):
s3path = s3path.encode("utf8")
return s3path
def _uns3path(self,s3path,roots3path=None):
"""Get the local path for a file stored in S3.
This is essentially the opposite of self._s3path().
"""
if roots3path is None:
roots3path = self._s3path("")
i = len(roots3path)
return s3path[i:]
def _sync_key(self,k):
"""Synchronise on contents of the given key.
......@@ -276,8 +286,14 @@ class S3FS(FS):
if not self.isdir(dirname(path)):
raise ParentDirectoryMissingError(path)
k = self._sync_set_contents(s3path,"")
# TODO: support streaming reads
return RemoteFileBuffer(self,path,mode,k)
# Make sure nothing tries to read past end of socket data
f = LimitBytesFile(k.size,k,"r")
# For streaming reads, return the key object directly
if mode == "r-":
return f
# For everything else, use a RemoteFileBuffer.
# This will take care of closing the socket when it's done.
return RemoteFileBuffer(self,path,mode,f)
def exists(self,path):
"""Check whether a path exists."""
......@@ -324,23 +340,31 @@ class S3FS(FS):
return True
return False
def listdir(self, path="./", wildcard=None, full=False, absolute=False,
            dirs_only=False, files_only=False):
    """List contents of a directory.

    Eager counterpart of ilistdir(): every argument is forwarded
    positionally and the resulting iterable is collected into a list.
    """
    lazy_names = self.ilistdir(path, wildcard, full, absolute,
                               dirs_only, files_only)
    return list(lazy_names)
def listdirinfo(self, path="./", wildcard=None, full=False, absolute=False,
                dirs_only=False, files_only=False):
    """List contents of a directory, as produced by ilistdirinfo().

    Eager counterpart of ilistdirinfo(): every argument is forwarded
    positionally and the resulting iterable is collected into a list.
    """
    lazy_entries = self.ilistdirinfo(path, wildcard, full, absolute,
                                     dirs_only, files_only)
    return list(lazy_entries)
def ilistdir(self, path="./", wildcard=None, full=False, absolute=False,
             dirs_only=False, files_only=False):
    """List contents of a directory.

    The keys from _iter_keys() are passed through _filter_keys(), which
    yields (name, key) pairs; only the names are handed back.  Both helper
    calls happen immediately; the returned generator is what is lazy.
    """
    key_iter = self._iter_keys(path)
    matches = self._filter_keys(path, key_iter, wildcard, full, absolute,
                                dirs_only, files_only)
    return (entry_name for (entry_name, _key) in matches)
def ilistdirinfo(self, path="./", wildcard=None, full=False, absolute=False,
                 dirs_only=False, files_only=False):
    """List contents of a directory as (name, info) pairs.

    The keys from _iter_keys() are passed through _filter_keys(), which
    yields (name, key) pairs; each key is then described with
    _get_key_info(), which is also given the entry name.  The helper
    calls happen immediately; the per-entry info lookups are deferred
    until the returned generator is consumed.
    """
    key_iter = self._iter_keys(path)
    matches = self._filter_keys(path, key_iter, wildcard, full, absolute,
                                dirs_only, files_only)
    return ((entry_name, self._get_key_info(entry_key, entry_name))
            for (entry_name, entry_key) in matches)
def _iter_keys(self,path):
"""Iterator over keys contained in the given directory.
......@@ -352,14 +376,13 @@ class S3FS(FS):
s3path = self._s3path(path) + self._separator
if s3path == "/":
s3path = ""
i = len(s3path)
isDir = False
for k in self._s3bukt.list(prefix=s3path,delimiter=self._separator):
if not isDir:
isDir = True
# Skip over the entry for the directory itself, if it exists
if k.name[i:] != "":
name = k.name[i:]
name = self._uns3path(k.name,s3path)
if name != "":
if not isinstance(name,unicode):
name = name.decode("utf8")
if name.endswith(self._separator):
......@@ -368,10 +391,12 @@ class S3FS(FS):
if not isDir:
if s3path != self._prefix:
if self.isfile(path):
raise ResourceInvalidError(path,msg="that's not a directory: %(path)s")
msg = "that's not a directory: %(path)s"
raise ResourceInvalidError(path,msg=msg)
raise ResourceNotFoundError(path)
def _filter_keys(self,path,keys,wildcard,full,absolute,dirs_only,files_only):
def _filter_keys(self,path,keys,wildcard,full,absolute,
dirs_only,files_only):
"""Filter out keys not matching the given criteria.
Given a (name,key) iterator as returned by _iter_keys, this method
......@@ -406,7 +431,9 @@ class S3FS(FS):
if s3pathD == self._prefix:
if allow_recreate:
return
raise DestinationExistsError(path, msg="Can not create a directory that already exists (try allow_recreate=True): %(path)s")
msg = "Can not create a directory that already exists"\
" (try allow_recreate=True): %(path)s"
raise DestinationExistsError(path, msg=msg)
s3pathP = self._s3path(dirname(path))
if s3pathP:
s3pathP = s3pathP + self._separator
......@@ -421,20 +448,23 @@ class S3FS(FS):
parentExists = True
if _eq_utf8(k.name,s3path):
# It's already a file
raise ResourceInvalidError(path, msg="Destination exists as a regular file: %(path)s")
msg = "Destination exists as a regular file: %(path)s"
raise ResourceInvalidError(path, msg=msg)
if _eq_utf8(k.name,s3pathD):
# It's already a directory
if allow_recreate:
return
raise DestinationExistsError(path, msg="Can not create a directory that already exists (try allow_recreate=True): %(path)s")
msg = "Can not create a directory that already exists"\
" (try allow_recreate=True): %(path)s"
raise DestinationExistsError(path, msg=msg)
# Create parent if required
if not parentExists:
if recursive:
self.makedir(dirname(path),recursive,allow_recreate)
else:
raise ParentDirectoryMissingError(path, msg="Parent directory does not exist: %(path)s")
msg = "Parent directory does not exist: %(path)s"
raise ParentDirectoryMissingError(path, msg=msg)
# Create an empty file representing the directory
# TODO: is there some standard scheme for representing empty dirs?
self._sync_set_contents(s3pathD,"")
def remove(self,path):
......@@ -445,7 +475,8 @@ class S3FS(FS):
if _eq_utf8(k.name,s3path):
break
if _startswith_utf8(k.name,s3path + "/"):
raise ResourceInvalidError(path,msg="that's not a file: %(path)s")
msg = "that's not a file: %(path)s"
raise ResourceInvalidError(path,msg=msg)
else:
raise ResourceNotFoundError(path)
self._s3bukt.delete_key(s3path)
......@@ -474,7 +505,8 @@ class S3FS(FS):
self._s3bukt.delete_key(k.name)
if not found:
if self.isfile(path):
raise ResourceInvalidError(path,msg="removedir() called on a regular file: %(path)s")
msg = "removedir() called on a regular file: %(path)s"
raise ResourceInvalidError(path,msg=msg)
if path not in ("","/"):
raise ResourceNotFoundError(path)
self._s3bukt.delete_key(s3path)
......@@ -506,11 +538,14 @@ class S3FS(FS):
break
else:
raise ResourceNotFoundError(path)
return self._get_key_info(k)
return self._get_key_info(k,path)
def _get_key_info(self,key,name=None):
    # Build the info dict for an S3 key.  ``name`` is the already-computed
    # local name of the entry, when the caller has one; passing it avoids
    # re-deriving the name from the raw S3 key.
    info = {}
    if name is not None:
        info["name"] = basename(name)
    else:
        # Fall back to deriving the name from the key itself.  This branch
        # previously referenced an undefined local ``k`` and a helper named
        # ``_uns3key``; the parameter is ``key``.
        # NOTE(review): assumes _uns3path (the inverse of _s3path) is the
        # helper that was intended here - confirm.
        info["name"] = basename(self._uns3path(key.name))
if isinstance(key,Prefix):
info["st_mode"] = 0700 | statinfo.S_IFDIR
else:
......@@ -564,17 +599,21 @@ class S3FS(FS):
s3path_dst = s3path_dstD + nm
dstOK = True
if not dstOK and not self.isdir(dirname(dst)):
raise ParentDirectoryMissingError(dst,msg="Destination directory does not exist: %(path)s")
msg = "Destination directory does not exist: %(path)s"
raise ParentDirectoryMissingError(dst,msg=msg)
# OK, now we can copy the file.
s3path_src = self._s3path(src)
try:
self._s3bukt.copy_key(s3path_dst,self._bucket_name,s3path_src)
except S3ResponseError, e:
if "404 Not Found" in str(e):
raise ResourceInvalidError(src, msg="Source is not a file: %(path)s")
msg = "Source is not a file: %(path)s"
raise ResourceInvalidError(src, msg=msg)
raise e
else:
k = self._s3bukt.get_key(s3path_dst)
while k is None:
k = self._s3bukt.get_key(s3path_dst)
self._sync_key(k)
def move(self,src,dst,overwrite=False,chunk_size=16384):
......@@ -589,13 +628,13 @@ class S3FS(FS):
search="breadth",
ignore_errors=False ):
if search != "breadth" or dir_wildcard is not None:
for item in super(S3FS,self).walkfiles(path,wildcard,dir_wildcard,search,ignore_errors):
args = (wildcard,dir_wildcard,search,ignore_errors)
for item in super(S3FS,self).walkfiles(path,*args):
yield item
else:
prefix = self._s3path(path)
prefix_len = len(prefix)
for k in self._s3bukt.list(prefix=prefix):
name = k.name[prefix_len:]
name = self._uns3path(k.name,prefix)
if name != "":
if not isinstance(name,unicode):
name = name.decode("utf8")
......@@ -616,13 +655,13 @@ class S3FS(FS):
search="breadth",
ignore_errors=False ):
if search != "breadth" or dir_wildcard is not None:
for item in super(S3FS,self).walkfiles(path,wildcard,dir_wildcard,search,ignore_errors):
args = (wildcard,dir_wildcard,search,ignore_errors)
for item in super(S3FS,self).walkfiles(path,*args):
yield (item,self.getinfo(item))
else:
prefix = self._s3path(path)
prefix_len = len(prefix)
for k in self._s3bukt.list(prefix=prefix):
name = k.name[prefix_len:]
name = self._uns3path(k.name,prefix)
if name != "":
if not isinstance(name,unicode):
name = name.decode("utf8")
......@@ -634,7 +673,7 @@ class S3FS(FS):
else:
if not fnmatch(name,wildcard):
continue
yield (abspath(name),self._get_key_info(k))
yield (abspath(name),self._get_key_info(k,name))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment