Commit 3f3138fe by willmcgugan

Added openers for Tahoe and Dav

parent dc2e9a34
......@@ -64,7 +64,8 @@
* Added a getmmap to base
* Added command line scripts fsls, fstree, fscat, fscp, fsmv
* Added command line scripts fsmkdir, fsmount
* Made automatically pick up keys if no other authentication is available
* Made SFTP automatically pick up keys if no other authentication is available
* Optimized listdir and listdirinfo in SFTPFS
* Made memoryfs work with threads
* Added copyfile_non_atomic and movefile_non_atomic for improved performance of multi-threaded copies
......@@ -845,6 +845,8 @@ class FS(object):
:param overwrite: if True, then an existing file at the destination path
will be silently overwritten; if False then an exception
will be raised in this case.
:param overwrite: When True the destination will be overwritten (if it exists),
otherwise a DestinationExistsError will be thrown
:type overwrite: bool
:param chunk_size: Size of chunks to use when copying, if a simple copy
is required
......
from fs.opener import opener
from fs.utils import copyfile, copystructure
from fs.utils import copyfile, copyfile_non_atomic, copystructure
from fs.path import pathjoin, iswildcard
from fs.errors import FSError
from fs.commands.runner import Command
......@@ -22,7 +22,6 @@ class FileOpThread(threading.Thread):
def run(self):
try:
while not self.finish_event.isSet():
try:
path_type, fs, path, dest_path = self.queue.get(timeout=0.1)
......@@ -34,14 +33,15 @@ class FileOpThread(threading.Thread):
else:
self.action(fs, path, self.dest_fs, dest_path, overwrite=True)
except Exception, e:
print e
self.on_error(e)
self.queue.task_done()
raise
break
else:
self.queue.task_done()
self.on_done(path_type, fs, path, self.dest_fs, dest_path)
except Exception, e:
self.on_error(e)
class FScp(Command):
......@@ -51,6 +51,9 @@ class FScp(Command):
Copy SOURCE to DESTINATION"""
def get_action(self):
if self.options.threads > 1:
return copyfile_non_atomic
else:
return copyfile
def get_verb(self):
......@@ -147,10 +150,11 @@ Copy SOURCE to DESTINATION"""
self.on_done,
self.on_error)
for i in xrange(options.threads)]
for thread in threads:
thread.start()
self.action_error = None
self.action_errors = []
complete = False
try:
enqueue = file_queue.put
......@@ -167,15 +171,10 @@ Copy SOURCE to DESTINATION"""
except KeyboardInterrupt:
options.progress = False
if self.action_error:
self.error(self.wrap_error(unicode(self.action_error)) + '\n')
else:
self.output("\nCancelling...\n")
except SystemExit:
options.progress = False
if self.action_error:
self.error(self.wrap_error(unicode(self.action_error)) + '\n')
finally:
sys.stdout.flush()
......@@ -188,6 +187,12 @@ Copy SOURCE to DESTINATION"""
dst_fs.close()
if self.action_errors:
for error in self.action_errors:
self.error(self.wrap_error(unicode(error)) + '\n')
sys.stdout.write('\n')
sys.stdout.flush()
else:
if complete and options.progress:
sys.stdout.write(self.progress_bar(self.total_files, self.done_files, ''))
sys.stdout.write('\n')
......@@ -212,16 +217,17 @@ Copy SOURCE to DESTINATION"""
self.lock.release()
def on_error(self, e):
    """Record an exception raised by a worker thread.

    Called from FileOpThread workers, so the append is guarded by the
    shared lock.  Errors are accumulated (not overwritten) so that every
    failure can be reported when the copy run finishes.

    :param e: the exception instance raised by the worker
    """
    self.lock.acquire()
    try:
        # NOTE(review): resolved from a mangled diff -- the debug `print e`
        # and the superseded single-error `self.action_error = e` are dropped
        # in favour of the errors-list introduced by this commit.
        self.action_errors.append(e)
    finally:
        self.lock.release()
def any_error(self):
    """Return True if any worker thread has reported an error.

    Thread-safe: reads the shared error list under the lock, matching
    the locking convention used by on_error.
    """
    self.lock.acquire()
    try:
        # Resolved from a mangled diff: the superseded single-error
        # `bool(self.action_error)` line is dropped; the errors-list form
        # is the one consistent with on_error appending to action_errors.
        return bool(self.action_errors)
    finally:
        self.lock.release()
......
from fs.utils import movefile, contains_files
from fs.utils import movefile, movefile_non_atomic, contains_files
from fs.commands import fscp
import sys
......@@ -11,6 +11,9 @@ Move files from SOURCE to DESTINATION"""
return 'moving...'
def get_action(self):
if self.options.threads > 1:
return movefile_non_atomic
else:
return movefile
def post_actions(self):
......
import sys
from optparse import OptionParser
from fs.opener import opener, OpenerError
from fs.opener import opener, OpenerError, Opener
from fs.errors import FSError
from fs.path import splitext, pathsplit, isdotfile, iswildcard
import platform
......@@ -118,11 +118,7 @@ class Command(object):
return re.sub(re_fs, repl, text)
def open_fs(self, fs_url, writeable=False, create_dir=False):
try:
fs, path = opener.parse(fs_url, writeable=writeable, create_dir=create_dir)
except OpenerError, e:
self.error(str(e), '\n')
sys.exit(1)
fs.cache_hint(True)
return fs, path
......@@ -238,6 +234,8 @@ class Command(object):
help="make output verbose", metavar="VERBOSE")
optparse.add_option('--listopeners', dest='listopeners', action="store_true", default=False,
help="list all FS openers", metavar="LISTOPENERS")
optparse.add_option('--fs', dest='fs', action='append', type="string",
help="import an FS opener e.g --fs foo.bar.MyOpener", metavar="OPENER")
return optparse
def list_openers(self):
......@@ -288,6 +286,31 @@ class Command(object):
self.list_openers()
return 0
ilocals = {}
if options.fs:
for import_opener in options.fs:
module_name, opener_class = import_opener.rsplit('.', 1)
try:
opener_module = __import__(module_name, globals(), ilocals, [opener_class], -1)
except ImportError:
self.error("Unable to import opener %s\n" % import_opener)
return 0
new_opener = getattr(opener_module, opener_class)
try:
if not issubclass(new_opener, Opener):
self.error('%s is not an fs.opener.Opener\n' % import_opener)
return 0
except TypeError:
self.error('%s is not an opener class\n' % import_opener)
return 0
if options.verbose:
self.output('Imported opener %s\n' % import_opener)
opener.add(new_opener)
args = [unicode(arg, sys.getfilesystemencoding()) for arg in args]
self.verbose = options.verbose
try:
......
......@@ -110,7 +110,7 @@ class DAVFS(FS):
if resp.status == 404:
raise ResourceNotFoundError("/",msg="root url gives 404")
if resp.status in (401,403):
raise PermissionDeniedError("listdir")
raise PermissionDeniedError("listdir (http %s)" % resp.status)
if resp.status != 207:
msg = "server at %s doesn't speak WebDAV" % (self.url,)
raise RemoteConnectionError("",msg=msg,details=resp.read())
......@@ -494,6 +494,7 @@ class DAVFS(FS):
if response.status == 405:
raise ResourceInvalidError(path)
if response.status < 200 or response.status >= 300:
print response.read()
raise_generic_error(response,"remove",path)
return True
......
'''
Example (it will use publicly available, but slow-as-hell Tahoe-LAFS cloud):
from fs.tahoefs import TahoeFS, Connection
from fs.contrib.tahoefs import TahoeFS, Connection
dircap = TahoeFS.createdircap(webapi='http://pubgrid.tahoe-lafs.org')
print "Your dircap (unique key to your storage directory) is", dircap
print "Keep it safe!"
......@@ -86,13 +86,13 @@ class TahoeFS(CacheFS):
def __init__(self, dircap, timeout=60, autorun=True, largefilesize=10*1024*1024, webapi='http://127.0.0.1:3456'):
'''
Creates instance of TahoeFS.
dircap - special hash allowing user to work with TahoeLAFS directory.
timeout - how long should underlying CacheFS keep information about files
:param dircap: special hash allowing user to work with TahoeLAFS directory.
:param timeout: how long should underlying CacheFS keep information about files
before asking TahoeLAFS node again.
autorun - Allow listing autorun files? Can be very dangerous on Windows!.
:param autorun: Allow listing autorun files? Can be very dangerous on Windows!.
This is temporary hack, as it should be part of Windows-specific middleware,
not Tahoe itself.
largefilesize - Create placeholder file for files larger than this tresholf.
:param largefilesize: Create placeholder file for files larger than this threshold.
Uploading and processing of large files can last extremely long (many hours),
so placing this placeholder can help you to remember that upload is processing.
Setting this to None will skip creating placeholder files for any uploads.
......@@ -384,7 +384,7 @@ class _TahoeFS(FS):
offset=offset, length=length)
@_fix_path
def setcontents(self, path, file):
def setcontents(self, path, file, chunk_size=64*1024):
self._log(INFO, 'Uploading file %s' % path)
path = self.tahoeutil.fixwinpath(path, False)
size=None
......
......@@ -32,6 +32,9 @@ def _expand_syspath(path):
def _parse_credentials(url):
scheme = None
if '://' in url:
scheme, url = url.split('://', 1)
username = None
password = None
if '@' in url:
......@@ -40,6 +43,8 @@ def _parse_credentials(url):
username, password = credentials.split(':', 1)
else:
username = credentials
if scheme is not None:
url = '%s://%s' % (scheme, url)
return username, password, url
def _parse_name(fs_name):
......@@ -49,6 +54,13 @@ def _parse_name(fs_name):
else:
return fs_name, None
def _split_url_path(url):
if '://' not in url:
url = 'http://' + url
scheme, netloc, path, params, query, fragment = urlparse(url)
url = '%s://%s' % (scheme, netloc)
return url, path
class OpenerRegistry(object):
......@@ -134,6 +146,9 @@ class OpenerRegistry(object):
fs_path = join(fs_path, path)
if create_dir and fs_path:
fs.makedir(fs_path, allow_recreate=True)
pathname, resourcename = pathsplit(fs_path or '')
if pathname and resourcename:
fs = fs.opendir(pathname)
......@@ -418,7 +433,7 @@ example:
def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create_dir):
from fs.wrapfs.debugfs import DebugFS
if fs_path:
fs, path = registry.parse(fs_path, writeable=writeable, create=create_dir)
fs, path = registry.parse(fs_path, writeable=writeable, create_dir=create_dir)
return DebugFS(fs, verbose=False), None
if fs_name_params == 'ram':
from fs.memoryfs import MemoryFS
......@@ -441,11 +456,99 @@ example:
def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create_dir):
from fs.tempfs import TempFS
fs = TempFS(identifier=fs_name_params)
if create_dir and fs_path:
fs = fs.makeopendir(fs_path)
fs_path = pathsplit(fs_path)
return fs, fs_path
class S3Opener(Opener):
    """Opener that mounts an Amazon S3 bucket as a filesystem."""
    names = ['s3']
    desc = """Opens a filesystem stored on Amazon S3 storage
The environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY should be set"""

    @classmethod
    def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create_dir):
        from fs.s3fs import S3FS
        # "<bucket>/<path...>" -> bucket name + path within the bucket;
        # with no '/' the whole fs_path is the bucket and path is empty.
        bucket, _, path = fs_path.partition('/')
        fs = S3FS(bucket)
        if path:
            # Descend into the directory portion so only the resource
            # name is handed back as the remaining path.
            dirpath, resourcepath = pathsplit(path)
            if dirpath:
                fs = fs.opendir(dirpath)
            path = resourcepath
        return fs, path
class TahoeOpener(Opener):
    """Opener that mounts a Tahoe-LAFS grid via its web API."""
    names = ['tahoe']
    desc = """Opens a Tahoe-LAFS filesystem
example:
* tahoe://http://pubgrid.tahoe-lafs.org/uri/URI:DIR2:h5bkxelehowscijdb [...]"""

    @classmethod
    def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create_dir):
        """Open a TahoeFS from a url of the form <webapi url>/uri/<dircap>[/<path>].

        :raises OpenerError: if the url does not contain '/uri/'
        """
        from fs.contrib.tahoefs import TahoeFS
        if '/uri/' not in fs_path:
            raise OpenerError("""Tahoe url should be in the form <url>/uri/<dicap>""")
        url, dircap = fs_path.split('/uri/')
        path = ''
        if '/' in dircap:
            # Anything after the dircap is a path inside the Tahoe directory
            dircap, path = dircap.split('/', 1)
        fs = TahoeFS(dircap, webapi=url)
        if '/' in path:
            dirname, resourcename = pathsplit(path)
            # Bug fix: original tested `createdir`, an undefined name that
            # raised NameError whenever the dircap carried a subpath; the
            # parameter is `create_dir`.
            if create_dir:
                fs = fs.makeopendir(dirname)
            else:
                fs = fs.opendir(dirname)
            path = ''
        return fs, path
class DavOpener(Opener):
    """Opener that connects to a WebDAV server."""
    names = ['dav']
    desc = """Opens a WebDAV server
example:
* dav://example.org/dav"""

    @classmethod
    def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create_dir):
        from fs.contrib.davfs import DAVFS
        url = fs_path
        if '://' not in url:
            # No explicit scheme -- assume plain http
            url = 'http://' + url
        # Peel the scheme off so credentials can be stripped from the netloc
        scheme, rest = url.split('://', 1)
        username, password, rest = _parse_credentials(rest)
        # Only pass a credentials dict when a username or password was present
        creds = {key: value
                 for key, value in (('username', username), ('password', password))
                 if value}
        credentials = creds or None
        fs = DAVFS('%s://%s' % (scheme, rest), credentials=credentials)
        return fs, ''
opener = OpenerRegistry([OSFSOpener,
ZipOpener,
......@@ -455,6 +558,9 @@ opener = OpenerRegistry([OSFSOpener,
MemOpener,
DebugOpener,
TempOpener,
S3Opener,
TahoeOpener,
DavOpener,
])
......
......@@ -72,7 +72,7 @@ class S3FS(FS):
PATH_MAX = None
NAME_MAX = None
def __init__(self, bucket, prefix="", aws_access_key=None, aws_secret_key=None, separator="/", thread_synchronize=True,key_sync_timeout=1):
def __init__(self, bucket, prefix="", aws_access_key=None, aws_secret_key=None, separator="/", thread_synchronize=True, key_sync_timeout=1):
"""Constructor for S3FS objects.
S3FS objects require the name of the S3 bucket in which to store
......
......@@ -53,6 +53,37 @@ def copyfile(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1
FS._shutil_copyfile(src_syspath, dst_syspath)
return
src_lock = getattr(src_fs, '_lock', None)
if src_lock is not None:
src_lock.acquire()
try:
src = None
try:
src = src_fs.open(src_path, 'rb')
dst_fs.setcontents(dst_path, src, chunk_size=chunk_size)
finally:
if src is not None:
src.close()
finally:
if src_lock is not None:
src_lock.release()
def copyfile_non_atomic(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1024):
"""A non atomic version of copyfile (will not block other threads using src_fs or dst_fst)
:param src_fs: Source filesystem object
:param src_path: -- Source path
:param dst_fs: Destination filesystem object
:param dst_path: Destination filesystem object
:param chunk_size: Size of chunks to move if system copyfile is not available (default 16K)
"""
if not overwrite and dst_fs.exists(dst_path):
raise DestinationExistsError(dst_path)
src = None
dst = None
try:
......@@ -97,24 +128,64 @@ def movefile(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1
FS._shutil_movefile(src_syspath, dst_syspath)
return
src_lock = getattr(src_fs, '_lock', None)
if src_lock is not None:
src_lock.acquire()
try:
src = None
try:
# Chunk copy
src = src_fs.open(src_path, 'rb')
dst_fs.setcontents(dst_path, src, chunk_size=chunk_size)
except:
raise
else:
src_fs.remove(src_path)
finally:
if src is not None:
src.close()
finally:
if src_lock is not None:
src_lock.release()
def movefile_non_atomic(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1024):
    """A non atomic version of movefile (won't block other threads using src_fs or dst_fs).

    Copies the source file to the destination in chunks, then removes the
    source only after the destination has been fully written and closed.

    :param src_fs: Source filesystem object
    :param src_path: Source path
    :param dst_fs: Destination filesystem object
    :param dst_path: Destination path
    :param overwrite: When True an existing destination is overwritten,
        otherwise DestinationExistsError is raised
    :param chunk_size: Size of chunks to move (default 64K)

    """
    if not overwrite and dst_fs.exists(dst_path):
        raise DestinationExistsError(dst_path)
    src = None
    dst = None
    try:
        # Chunk copy. Resolved from a mangled diff: the destination must be
        # opened on dst_fs (the old line opened it on src_fs).
        src = src_fs.open(src_path, 'rb')
        dst = dst_fs.open(dst_path, 'wb')
        write = dst.write
        read = src.read
        chunk = read(chunk_size)
        while chunk:
            write(chunk)
            chunk = read(chunk_size)
    finally:
        if src is not None:
            src.close()
        if dst is not None:
            dst.close()
    # Remove the source exactly once, and only after a fully successful copy
    # (the mangled diff removed it both before and after the finally block).
    src_fs.remove(src_path)
def movedir(fs1, fs2, overwrite=False, ignore_errors=False, chunk_size=64*1024):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment