Commit a16c90b9 by willmcgugan

Command fixes

parent d9986180
...@@ -579,7 +579,7 @@ class FS(object): ...@@ -579,7 +579,7 @@ class FS(object):
sys_path = self.getsyspath(path) sys_path = self.getsyspath(path)
except NoSysPathError: except NoSysPathError:
return "No description available" return "No description available"
return "OS resource, maps to %s" % sys_path return sys_path
def getcontents(self, path): def getcontents(self, path):
......
...@@ -52,6 +52,9 @@ Copy SOURCE to DESTINATION""" ...@@ -52,6 +52,9 @@ Copy SOURCE to DESTINATION"""
def get_action(self): def get_action(self):
return copyfile return copyfile
def get_verb(self):
return 'copying...'
def get_optparse(self): def get_optparse(self):
optparse = super(FScp, self).get_optparse() optparse = super(FScp, self).get_optparse()
...@@ -83,11 +86,15 @@ Copy SOURCE to DESTINATION""" ...@@ -83,11 +86,15 @@ Copy SOURCE to DESTINATION"""
copy_fs_paths = [] copy_fs_paths = []
progress = options.progress progress = options.progress
if progress:
sys.stdout.write(self.progress_bar(len(srcs), 0, 'scanning...'))
sys.stdout.flush()
self.root_dirs = [] self.root_dirs = []
for fs_url in srcs: for i, fs_url in enumerate(srcs):
src_fs, src_path = self.open_fs(fs_url) src_fs, src_path = self.open_fs(fs_url)
if src_path is None: if src_path is None:
src_path = '/' src_path = '/'
...@@ -111,7 +118,15 @@ Copy SOURCE to DESTINATION""" ...@@ -111,7 +118,15 @@ Copy SOURCE to DESTINATION"""
copy_fs_paths.append((self.FILE, src_fs, src_path, src_path)) copy_fs_paths.append((self.FILE, src_fs, src_path, src_path))
else: else:
self.error('%s is not a file or directory\n' % src_path) self.error('%s is not a file or directory\n' % src_path)
return 1 return 1
if progress:
sys.stdout.write(self.progress_bar(len(srcs), i + 1, 'scanning...'))
sys.stdout.flush()
if progress:
sys.stdout.write(self.progress_bar(len(copy_fs_paths), 0, self.get_verb()))
sys.stdout.flush()
if self.options.threads > 1: if self.options.threads > 1:
copy_fs_dirs = [r for r in copy_fs_paths if r[0] == self.DIR] copy_fs_dirs = [r for r in copy_fs_paths if r[0] == self.DIR]
...@@ -174,7 +189,7 @@ Copy SOURCE to DESTINATION""" ...@@ -174,7 +189,7 @@ Copy SOURCE to DESTINATION"""
dst_fs.close() dst_fs.close()
if complete and options.progress: if complete and options.progress:
sys.stdout.write(self.progress_bar(self.total_files, self.done_files)) sys.stdout.write(self.progress_bar(self.total_files, self.done_files, ''))
sys.stdout.write('\n') sys.stdout.write('\n')
sys.stdout.flush() sys.stdout.flush()
...@@ -191,7 +206,7 @@ Copy SOURCE to DESTINATION""" ...@@ -191,7 +206,7 @@ Copy SOURCE to DESTINATION"""
print "%s -> %s" % (src_fs.desc(src_path), dst_fs.desc(dst_path)) print "%s -> %s" % (src_fs.desc(src_path), dst_fs.desc(dst_path))
elif self.options.progress: elif self.options.progress:
self.done_files += 1 self.done_files += 1
sys.stdout.write(self.progress_bar(self.total_files, self.done_files)) sys.stdout.write(self.progress_bar(self.total_files, self.done_files, self.get_verb()))
sys.stdout.flush() sys.stdout.flush()
finally: finally:
self.lock.release() self.lock.release()
...@@ -219,12 +234,15 @@ Copy SOURCE to DESTINATION""" ...@@ -219,12 +234,15 @@ Copy SOURCE to DESTINATION"""
done_steps = int(done * bar_width) done_steps = int(done * bar_width)
bar_steps = ('#' * done_steps).ljust(bar_width) bar_steps = ('#' * done_steps).ljust(bar_width)
msg = '%i%% ' % int(done * 100.0)
msg = '%s %i%%' % (msg, int(done * 100.0))
msg = msg.ljust(20)
if total == remaining: if total == remaining:
throb = '' throb = ''
bar = '\r%s[%s] %s' % (throb, bar_steps, msg) bar = '\r%s[%s] %s\r' % (throb, bar_steps, msg.lstrip())
return bar return bar
def run(): def run():
......
...@@ -7,6 +7,9 @@ class FSMove(fscp.FScp): ...@@ -7,6 +7,9 @@ class FSMove(fscp.FScp):
usage = """fsmv [OPTION]... [SOURCE] [DESTINATION] usage = """fsmv [OPTION]... [SOURCE] [DESTINATION]
Move files from SOURCE to DESTINATION""" Move files from SOURCE to DESTINATION"""
def get_verb(self):
return 'moving...'
def get_action(self): def get_action(self):
return movefile return movefile
......
...@@ -522,7 +522,19 @@ def _skip(s, i, c): ...@@ -522,7 +522,19 @@ def _skip(s, i, c):
return i return i
def fileftperrors(f):
@wraps(f)
def deco(self, *args, **kwargs):
self._lock.acquire()
try:
try:
ret = f(self, *args, **kwargs)
except Exception, e:
self.ftpfs._translate_exception(args[0] if args else '', e)
finally:
self._lock.release()
return ret
return deco
...@@ -530,6 +542,8 @@ class _FTPFile(object): ...@@ -530,6 +542,8 @@ class _FTPFile(object):
""" A file-like that provides access to a file being streamed over ftp.""" """ A file-like that provides access to a file being streamed over ftp."""
blocksize = 1024 * 64
def __init__(self, ftpfs, ftp, path, mode): def __init__(self, ftpfs, ftp, path, mode):
if not hasattr(self, '_lock'): if not hasattr(self, '_lock'):
self._lock = threading.RLock() self._lock = threading.RLock()
...@@ -549,6 +563,7 @@ class _FTPFile(object): ...@@ -549,6 +563,7 @@ class _FTPFile(object):
#self._lock = ftpfs._lock #self._lock = ftpfs._lock
self._start_file(mode, path) self._start_file(mode, path)
@fileftperrors
def _start_file(self, mode, path): def _start_file(self, mode, path):
self.read_pos = 0 self.read_pos = 0
self.write_pos = 0 self.write_pos = 0
...@@ -564,7 +579,7 @@ class _FTPFile(object): ...@@ -564,7 +579,7 @@ class _FTPFile(object):
else: else:
self.conn = self.ftp.transfercmd('STOR '+path) self.conn = self.ftp.transfercmd('STOR '+path)
@synchronize @fileftperrors
def read(self, size=None): def read(self, size=None):
if self.conn is None: if self.conn is None:
return '' return ''
...@@ -572,7 +587,7 @@ class _FTPFile(object): ...@@ -572,7 +587,7 @@ class _FTPFile(object):
chunks = [] chunks = []
if size is None: if size is None:
while 1: while 1:
data = self.conn.recv(4096) data = self.conn.recv(self.blocksize)
if not data: if not data:
self.conn.close() self.conn.close()
self.conn = None self.conn = None
...@@ -584,7 +599,7 @@ class _FTPFile(object): ...@@ -584,7 +599,7 @@ class _FTPFile(object):
remaining_bytes = size remaining_bytes = size
while remaining_bytes: while remaining_bytes:
read_size = min(remaining_bytes, 4096) read_size = min(remaining_bytes, self.blocksize)
data = self.conn.recv(read_size) data = self.conn.recv(read_size)
if not data: if not data:
self.conn.close() self.conn.close()
...@@ -597,14 +612,14 @@ class _FTPFile(object): ...@@ -597,14 +612,14 @@ class _FTPFile(object):
return ''.join(chunks) return ''.join(chunks)
@synchronize @fileftperrors
def write(self, data): def write(self, data):
data_pos = 0 data_pos = 0
remaining_data = len(data) remaining_data = len(data)
while remaining_data: while remaining_data:
chunk_size = min(remaining_data, 4096) chunk_size = min(remaining_data, self.blocksize)
self.conn.sendall(data[data_pos:data_pos+chunk_size]) self.conn.sendall(data[data_pos:data_pos+chunk_size])
data_pos += chunk_size data_pos += chunk_size
remaining_data -= chunk_size remaining_data -= chunk_size
...@@ -617,11 +632,11 @@ class _FTPFile(object): ...@@ -617,11 +632,11 @@ class _FTPFile(object):
def __exit__(self,exc_type,exc_value,traceback): def __exit__(self,exc_type,exc_value,traceback):
self.close() self.close()
#@synchronize @fileftperrors
def flush(self): def flush(self):
self.ftpfs._on_file_written(self.path) self.ftpfs._on_file_written(self.path)
@synchronize @fileftperrors
def seek(self, pos, where=fs.SEEK_SET): def seek(self, pos, where=fs.SEEK_SET):
# Ftp doesn't support a real seek, so we close the transfer and resume # Ftp doesn't support a real seek, so we close the transfer and resume
# it at the new position with the REST command # it at the new position with the REST command
...@@ -662,14 +677,14 @@ class _FTPFile(object): ...@@ -662,14 +677,14 @@ class _FTPFile(object):
#raise UnsupportedError('ftp seek') #raise UnsupportedError('ftp seek')
@synchronize @fileftperrors
def tell(self): def tell(self):
if 'r' in self.mode: if 'r' in self.mode:
return self.read_pos return self.read_pos
else: else:
return self.write_pos return self.write_pos
@synchronize @fileftperrors
def truncate(self, size=None): def truncate(self, size=None):
self.ftpfs._on_file_written(self.path) self.ftpfs._on_file_written(self.path)
# Inefficient, but I don't know how else to implement this # Inefficient, but I don't know how else to implement this
...@@ -697,16 +712,22 @@ class _FTPFile(object): ...@@ -697,16 +712,22 @@ class _FTPFile(object):
self.write('\0' * (size - len(data))) self.write('\0' * (size - len(data)))
@synchronize @fileftperrors
def close(self): def close(self):
if 'w' in self.mode or 'a' in self.mode or '+' in self.mode: if 'w' in self.mode or 'a' in self.mode or '+' in self.mode:
self.ftpfs._on_file_written(self.path) self.ftpfs._on_file_written(self.path)
if self.conn is not None: if self.conn is not None:
self.conn.close() try:
self.conn = None self.conn.close()
self.ftp.voidresp() self.conn = None
self.ftp.voidresp()
except error_temp, error_perm:
pass
if self.ftp is not None: if self.ftp is not None:
self.ftp.close() try:
self.ftp.close()
except error_temp, error_perm:
pass
self.closed = True self.closed = True
def __iter__(self): def __iter__(self):
...@@ -998,7 +1019,7 @@ class FTPFS(FS): ...@@ -998,7 +1019,7 @@ class FTPFS(FS):
code, message = str(exception).split(' ', 1) code, message = str(exception).split(' ', 1)
code = int(code) code = int(code)
if code == 550: if code == 550:
raise ResourceNotFoundError(path) pass
if code == 552: if code == 552:
raise StorageSpaceError raise StorageSpaceError
raise PermissionDeniedError(str(exception), path=path, msg="FTP error: %s" % str(exception), details=exception) raise PermissionDeniedError(str(exception), path=path, msg="FTP error: %s" % str(exception), details=exception)
...@@ -1016,10 +1037,10 @@ class FTPFS(FS): ...@@ -1016,10 +1037,10 @@ class FTPFS(FS):
@ftperrors @ftperrors
def open(self, path, mode='r'): def open(self, path, mode='r'):
mode = mode.lower() mode = mode.lower()
if self.isdir(path): if self.isdir(path):
raise ResourceInvalidError(path) raise ResourceInvalidError(path)
if 'r' in mode: if 'r' in mode or 'a' in mode:
if not self.isfile(path): if not self.isfile(path):
raise ResourceNotFoundError(path) raise ResourceNotFoundError(path)
if 'w' in mode or 'a' in mode or '+' in mode: if 'w' in mode or 'a' in mode or '+' in mode:
...@@ -1029,7 +1050,7 @@ class FTPFS(FS): ...@@ -1029,7 +1050,7 @@ class FTPFS(FS):
return f return f
@ftperrors @ftperrors
def setcontents(self, path, data, chunk_size=8192): def setcontents(self, path, data, chunk_size=1024*64):
path = normpath(path) path = normpath(path)
if isinstance(data, basestring): if isinstance(data, basestring):
data = StringIO(data) data = StringIO(data)
...@@ -1039,7 +1060,7 @@ class FTPFS(FS): ...@@ -1039,7 +1060,7 @@ class FTPFS(FS):
@ftperrors @ftperrors
def getcontents(self, path): def getcontents(self, path):
contents = StringIO() contents = StringIO()
self.ftp.retrbinary('RETR %s' % _encode(normpath(path)), contents.write, blocksize=1024*16) self.ftp.retrbinary('RETR %s' % _encode(normpath(path)), contents.write, blocksize=1024*64)
return contents.getvalue() return contents.getvalue()
@ftperrors @ftperrors
......
...@@ -55,10 +55,9 @@ def copyfile(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1 ...@@ -55,10 +55,9 @@ def copyfile(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1
src = None src = None
dst = None dst = None
try: try:
# Chunk copy
src = src_fs.open(src_path, 'rb') src = src_fs.open(src_path, 'rb')
dst = src_fs.open(dst_path, 'wb') dst = dst_fs.open(dst_path, 'wb')
write = dst.write write = dst.write
read = src.read read = src.read
chunk = read(chunk_size) chunk = read(chunk_size)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment