Commit c37d4a12 by rfkelly0

thread-safety for S3FS

parent 5d352d3b
@@ -20,6 +20,22 @@ except ImportError:
 
 from fs.base import *
 
+# Boto is not thread-safe, so we need to use a per-thread S3 connection.
+try:
+    from threading import local as thread_local
+except ImportError:
+    class thread_local(object):
+        def __init__(self):
+            self.__dict__["_map"] = {}
+        def __getattr__(self,attr):
+            try:
+                return self._map[(threading.currentThread().ident,attr)]
+            except KeyError:
+                raise AttributeError, attr
+        def __setattr__(self,attr,value):
+            self._map[(threading.currentThread().ident,attr)] = value
+
 
 class RemoteFileBuffer(object):
     """File-like object providing buffer for local file operations.
@@ -125,6 +141,7 @@ class S3FS(FS):
         if not prefix.endswith(separator) and prefix != "":
             prefix = prefix + separator
         self._prefix = prefix
+        self._tlocal = thread_local()
         FS.__init__(self, thread_synchronize=thread_synchronize)
 
     # Make _s3conn and _s3bukt properties that are created on demand,
@@ -132,39 +149,36 @@ class S3FS(FS):
     def _s3conn(self):
         try:
-            return self.__dict__['_s3conn']
-        except KeyError:
+            return self._tlocal.s3conn
+        except AttributeError:
             c = boto.s3.connection.S3Connection(*self._access_keys)
-            self.__dict__['_s3conn'] = c
+            self._tlocal.s3conn = c
             return c
     _s3conn = property(_s3conn)
 
     def _s3bukt(self):
         try:
-            return self.__dict__['_s3bukt']
-        except KeyError:
+            return self._tlocal.s3bukt
+        except AttributeError:
             try:
                 b = self._s3conn.get_bucket(self._bucket_name)
             except S3ResponseError, e:
                 if "404 Not Found" not in str(e):
                     raise e
                 b = self._s3conn.create_bucket(self._bucket_name)
-            self.__dict__['_s3bukt'] = b
+            self._tlocal.s3bukt = b
             return b
     _s3bukt = property(_s3bukt)
 
     def __getstate__(self):
         state = super(S3FS,self).__getstate__()
-        try:
-            del state['_s3conn']
-        except KeyError:
-            pass
-        try:
-            del state['_s3bukt']
-        except KeyError:
-            pass
+        del state['_tlocal']
         return state
 
+    def __setstate__(self,state):
+        super(S3FS,self).__setstate__(state)
+        self._tlocal = thread_local()
+
     def __str__(self):
         return '<S3FS: %s:%s>' % (self._bucket_name,self._prefix)
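The two properties and the new pickle hooks all follow one pattern: try the per-thread cache, create and store the resource on AttributeError, and drop the whole cache when the object is serialised so it can be rebuilt lazily after unpickling. Below is a condensed, self-contained sketch of that pattern; Resource and connect() are illustrative stand-ins rather than S3FS APIs, and the real code builds its pickled state via FS.__getstate__ instead of copying __dict__ directly.

    import pickle
    import threading

    def connect():
        # Placeholder for an expensive, non-thread-safe handle such as
        # boto.s3.connection.S3Connection(...).
        return object()

    class Resource(object):
        def __init__(self):
            self._tlocal = threading.local()

        def _conn(self):
            try:
                # Fast path: this thread already created its own handle.
                return self._tlocal.conn
            except AttributeError:
                # First access in this thread: create and cache the handle.
                c = connect()
                self._tlocal.conn = c
                return c
        _conn = property(_conn)

        def __getstate__(self):
            # Thread-local storage (and the handles inside it) cannot be
            # pickled, so it is dropped from the state dict...
            state = self.__dict__.copy()
            del state['_tlocal']
            return state

        def __setstate__(self, state):
            # ...and recreated empty on unpickling; each thread then
            # re-establishes its own handle on first access.
            self.__dict__.update(state)
            self._tlocal = threading.local()

    r = Resource()
    r._conn                              # first access creates this thread's handle
    r2 = pickle.loads(pickle.dumps(r))   # survives pickling; caches rebuilt lazily

Catching AttributeError is the usual EAFP idiom here: the common case is a plain attribute read, and the exception path only runs once per thread.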
@@ -274,9 +288,12 @@ class S3FS(FS):
         # in that directory. This avoids requiring a special file for
         # the directory itself, which other tools may not create.
         ks = self._s3bukt.list(prefix=s3path,delimiter=self._separator)
-        for k in ks:
+        try:
+            iter(ks).next()
+        except StopIteration:
+            return False
+        else:
             return True
-        return False
 
     def isfile(self,path):
         """Check whether a path exists and is a regular file."""
@@ -441,7 +458,10 @@ class S3FS(FS):
             return {}
         k = self._s3bukt.get_key(s3path)
         if k is None:
-            raise ResourceNotFoundError(path)
+            k = self._s3bukt.get_key(s3path+"/")
+            if k is None:
+                raise ResourceNotFoundError(path)
+            return {}
         info = {}
         if hasattr(k,"size"):
             info['size'] = int(k.size)
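getinfo now treats a missing plain key as a possible directory: S3 itself has no directories, so a directory may exist only as a marker key whose name ends in the separator. The sketch below mirrors the new lookup order with a plain dict standing in for the bucket and a local exception class standing in for ResourceNotFoundError; none of these names are part of S3FS.

    class ResourceNotFoundError(Exception):
        pass

    def get_info(bucket, s3path, path):
        k = bucket.get(s3path)               # ordinary file key
        if k is None:
            k = bucket.get(s3path + "/")     # directory marker key
            if k is None:
                raise ResourceNotFoundError(path)
            return {}                        # directory markers carry no metadata
        return {"size": len(k)}

    bucket = {"prefix/file.txt": "hello", "prefix/subdir/": ""}
    print get_info(bucket, "prefix/file.txt", "/file.txt")   # {'size': 5}
    print get_info(bucket, "prefix/subdir", "/subdir")       # {}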