Commit d9986180 by willmcgugan

Major ftp improvments

parent 7f1e44a3
...@@ -419,7 +419,7 @@ class FS(object): ...@@ -419,7 +419,7 @@ class FS(object):
:raises ResourceInvalidError: If the path exists, but is not a directory :raises ResourceInvalidError: If the path exists, but is not a directory
""" """
path = normpath(path)
def getinfo(p): def getinfo(p):
try: try:
if full or absolute: if full or absolute:
...@@ -783,7 +783,7 @@ class FS(object): ...@@ -783,7 +783,7 @@ class FS(object):
raise OperationFailedError("get size of resource", path) raise OperationFailedError("get size of resource", path)
return size return size
def copy(self, src, dst, overwrite=False, chunk_size=16384): def copy(self, src, dst, overwrite=False, chunk_size=1024*64):
"""Copies a file from src to dst. """Copies a file from src to dst.
:param src: the source path :param src: the source path
......
...@@ -13,10 +13,10 @@ Concetanate FILE(s)""" ...@@ -13,10 +13,10 @@ Concetanate FILE(s)"""
def do_run(self, options, args): def do_run(self, options, args):
count = 0 count = 0
for fs, path, is_dir in self.get_resources(args): for fs, path, is_dir in self.get_resources(args):
if is_dir: if is_dir:
self.error('%s is a directory\n' % path) self.error('%s is a directory\n' % path)
return 1 return 1
self.output(fs.getcontents(path)) self.output(fs.getcontents(path))
count += 1 count += 1
if self.is_terminal() and count: if self.is_terminal() and count:
......
...@@ -6,14 +6,14 @@ from fs.commands.runner import Command ...@@ -6,14 +6,14 @@ from fs.commands.runner import Command
from collections import defaultdict from collections import defaultdict
import sys import sys
class FSList(Command): class FSls(Command):
usage = """fsls [OPTIONS]... [PATH] usage = """fsls [OPTIONS]... [PATH]
List contents of [PATH]""" List contents of [PATH]"""
def get_optparse(self): def get_optparse(self):
optparse = super(FSList, self).get_optparse() optparse = super(FSls, self).get_optparse()
optparse.add_option('-u', '--full', dest='fullpath', action="store_true", default=False, optparse.add_option('-u', '--full', dest='fullpath', action="store_true", default=False,
help="output full path", metavar="FULL") help="output full path", metavar="FULL")
optparse.add_option('-s', '--syspath', dest='syspath', action="store_true", default=False, optparse.add_option('-s', '--syspath', dest='syspath', action="store_true", default=False,
...@@ -37,9 +37,11 @@ List contents of [PATH]""" ...@@ -37,9 +37,11 @@ List contents of [PATH]"""
args = [u'.'] args = [u'.']
dir_paths = [] dir_paths = []
file_paths = [] file_paths = []
fs_used = set()
for fs_url in args: for fs_url in args:
fs, path = self.open_fs(fs_url) fs, path = self.open_fs(fs_url)
fs_used.add(fs)
path = path or '.' path = path or '.'
wildcard = None wildcard = None
...@@ -61,7 +63,13 @@ List contents of [PATH]""" ...@@ -61,7 +63,13 @@ List contents of [PATH]"""
wildcard=wildcard, wildcard=wildcard,
full=options.fullpath, full=options.fullpath,
files_only=True) files_only=True)
try:
for fs in fs_used:
fs.close()
except FSError:
pass
if options.syspath: if options.syspath:
dir_paths = [fs.getsyspath(path, allow_none=True) or path for path in dir_paths] dir_paths = [fs.getsyspath(path, allow_none=True) or path for path in dir_paths]
file_paths = [fs.getsyspath(path, allow_none=True) or path for path in file_paths] file_paths = [fs.getsyspath(path, allow_none=True) or path for path in file_paths]
...@@ -154,7 +162,7 @@ List contents of [PATH]""" ...@@ -154,7 +162,7 @@ List contents of [PATH]"""
output('\n') output('\n')
def run(): def run():
return FSList().run() return FSls().run()
if __name__ == "__main__": if __name__ == "__main__":
sys.exit(run()) sys.exit(run())
...@@ -22,6 +22,8 @@ Mounts a file system on a system path""" ...@@ -22,6 +22,8 @@ Mounts a file system on a system path"""
help="run the mount process in the foreground", metavar="FOREGROUND") help="run the mount process in the foreground", metavar="FOREGROUND")
optparse.add_option('-u', '--unmount', dest='unmount', action="store_true", default=False, optparse.add_option('-u', '--unmount', dest='unmount', action="store_true", default=False,
help="unmount path", metavar="UNMOUNT") help="unmount path", metavar="UNMOUNT")
optparse.add_option('-n', '--nocache', dest='nocache', action="store_true", default=False,
help="do not cache network filesystems", metavar="NOCACHE")
return optparse return optparse
...@@ -59,17 +61,20 @@ Mounts a file system on a system path""" ...@@ -59,17 +61,20 @@ Mounts a file system on a system path"""
return 1 return 1
fs = fs.opendir(path) fs = fs.opendir(path)
path = '/' path = '/'
if not options.nocache:
fs.cache_hint(True)
if not os.path.exists(mount_path): if not os.path.exists(mount_path):
os.makedirs(mount_path) os.makedirs(mount_path)
from fs.expose import fuse from fs.expose import fuse
if options.foreground: if options.foreground:
fuse_process = fuse.mount(fs, fuse_process = fuse.mount(fs,
mount_path, mount_path,
foreground=True) foreground=True)
else: else:
mp = fuse.mount(fs, if not os.fork():
mount_path, mp = fuse.mount(fs,
foreground=False) mount_path,
foreground=True)
......
...@@ -131,7 +131,7 @@ class Command(object): ...@@ -131,7 +131,7 @@ class Command(object):
resources = [] resources = []
for fs, path in fs_paths: for fs, path in fs_paths:
if path and iswildcard(path): if path and iswildcard(path):
if not files_only: if not files_only:
dir_paths = fs.listdir(wildcard=path, dirs_only=True) dir_paths = fs.listdir(wildcard=path, dirs_only=True)
......
...@@ -245,14 +245,10 @@ class FSOperations(Operations): ...@@ -245,14 +245,10 @@ class FSOperations(Operations):
@handle_fs_errors @handle_fs_errors
def readdir(self, path, fh=None): def readdir(self, path, fh=None):
path = path.decode(NATIVE_ENCODING) path = path.decode(NATIVE_ENCODING)
entries = ['.', '..'] entries = ['.', '..']
#print for (nm,info) in self.fs.listdirinfo(path):
#print self.fs
for (nm,info) in self.fs.listdirinfo(path):
#print "*", repr(nm), info
self._fill_stat_dict(pathjoin(path,nm),info) self._fill_stat_dict(pathjoin(path,nm),info)
entries.append((nm.encode(NATIVE_ENCODING),info,0)) entries.append((nm.encode(NATIVE_ENCODING),info,0))
#print
return entries return entries
@handle_fs_errors @handle_fs_errors
......
...@@ -11,7 +11,7 @@ __all__ = ['FTPFS'] ...@@ -11,7 +11,7 @@ __all__ = ['FTPFS']
import fs import fs
from fs.base import * from fs.base import *
from fs.errors import * from fs.errors import *
from fs.path import pathsplit, abspath, dirname, recursepath, normpath from fs.path import pathsplit, abspath, dirname, recursepath, normpath, pathjoin, isbase
from fs.remote import RemoteFileBuffer from fs.remote import RemoteFileBuffer
from ftplib import FTP, error_perm, error_temp, error_proto, error_reply from ftplib import FTP, error_perm, error_temp, error_proto, error_reply
...@@ -545,7 +545,7 @@ class _FTPFile(object): ...@@ -545,7 +545,7 @@ class _FTPFile(object):
self.file_size = ftpfs.getsize(path) self.file_size = ftpfs.getsize(path)
self.conn = None self.conn = None
path = _encode(path) path = _encode(abspath(path))
#self._lock = ftpfs._lock #self._lock = ftpfs._lock
self._start_file(mode, path) self._start_file(mode, path)
...@@ -610,6 +610,7 @@ class _FTPFile(object): ...@@ -610,6 +610,7 @@ class _FTPFile(object):
remaining_data -= chunk_size remaining_data -= chunk_size
self.write_pos += chunk_size self.write_pos += chunk_size
def __enter__(self): def __enter__(self):
return self return self
...@@ -618,7 +619,7 @@ class _FTPFile(object): ...@@ -618,7 +619,7 @@ class _FTPFile(object):
#@synchronize #@synchronize
def flush(self): def flush(self):
return self.ftpfs._on_file_written(self.path)
@synchronize @synchronize
def seek(self, pos, where=fs.SEEK_SET): def seek(self, pos, where=fs.SEEK_SET):
...@@ -670,6 +671,7 @@ class _FTPFile(object): ...@@ -670,6 +671,7 @@ class _FTPFile(object):
@synchronize @synchronize
def truncate(self, size=None): def truncate(self, size=None):
self.ftpfs._on_file_written(self.path)
# Inefficient, but I don't know how else to implement this # Inefficient, but I don't know how else to implement this
if size is None: if size is None:
size = self.tell() size = self.tell()
...@@ -697,19 +699,20 @@ class _FTPFile(object): ...@@ -697,19 +699,20 @@ class _FTPFile(object):
@synchronize @synchronize
def close(self): def close(self):
if 'w' in self.mode or 'a' in self.mode or '+' in self.mode:
self.ftpfs._on_file_written(self.path)
if self.conn is not None: if self.conn is not None:
self.conn.close() self.conn.close()
self.conn = None self.conn = None
self.ftp.voidresp() self.ftp.voidresp()
if self.ftp is not None: if self.ftp is not None:
self.ftp.close() self.ftp.close()
self.closed = True self.closed = True
if 'w' in self.mode or 'a' in self.mode:
self.ftpfs._on_file_written(self.path)
def __iter__(self): def __iter__(self):
return self.next() return self.next()
@synchronize
def next(self): def next(self):
""" Line iterator """ Line iterator
...@@ -755,11 +758,9 @@ def ftperrors(f): ...@@ -755,11 +758,9 @@ def ftperrors(f):
except Exception, e: except Exception, e:
self._translate_exception(args[0] if args else '', e) self._translate_exception(args[0] if args else '', e)
finally: finally:
self._leave_dircache() self._leave_dircache()
finally: finally:
self._lock.release() self._lock.release()
if not self.use_dircache:
self.clear_dircache()
return ret return ret
return deco return deco
...@@ -769,10 +770,20 @@ def _encode(s): ...@@ -769,10 +770,20 @@ def _encode(s):
return s.encode('utf-8') return s.encode('utf-8')
return s return s
class _DirCache(dict):
def __init__(self):
super(_DirCache, self).__init__()
self.count = 0
def addref(self):
self.count += 1
return self.count
def decref(self):
self.count -= 1
return self.count
class FTPFS(FS): class FTPFS(FS):
_locals = threading.local()
_meta = { 'network' : True, _meta = { 'network' : True,
'virtual': False, 'virtual': False,
...@@ -799,7 +810,7 @@ class FTPFS(FS): ...@@ -799,7 +810,7 @@ class FTPFS(FS):
:param timeout: Timeout in seconds :param timeout: Timeout in seconds
:param port: Port to connection (default is 21) :param port: Port to connection (default is 21)
:param dircache: If True then directory information will be cached, :param dircache: If True then directory information will be cached,
which will speed up operations such as getinfo, isdi, isfile, but which will speed up operations such as getinfo, isdir, isfile, but
changes to the ftp file structure will not be visible until changes to the ftp file structure will not be visible until
`~fs.ftpfs.FTPFS.clear_dircache` is called `~fs.ftpfs.FTPFS.clear_dircache` is called
:param dircache: If True directory information will be cached for fast access :param dircache: If True directory information will be cached for fast access
...@@ -814,75 +825,67 @@ class FTPFS(FS): ...@@ -814,75 +825,67 @@ class FTPFS(FS):
self.passwd = passwd self.passwd = passwd
self.acct = acct self.acct = acct
self.timeout = timeout self.timeout = timeout
self.default_timeout = timeout is _GLOBAL_DEFAULT_TIMEOUT
self.use_dircache = dircache
self.use_dircache = dircache self._lock = threading.RLock()
self.get_dircache() self._init_dircache()
self._cache_hint = False self._cache_hint = False
self._locals._ftp = None try:
self._thread_ftps = set() self.ftp
self.ftp except FSError:
self.closed = True
raise
@synchronize def _init_dircache(self):
def cache_hint(self, enabled): self.dircache = _DirCache()
self._cache_hint = enabled
@synchronize @synchronize
def cache_hint(self, enabled):
self._cache_hint = bool(enabled)
def _enter_dircache(self): def _enter_dircache(self):
self.get_dircache() self.dircache.addref()
count = getattr(self._locals, '_dircache_count', 0)
count += 1
self._locals._dircache_count = count
@synchronize
def _leave_dircache(self): def _leave_dircache(self):
self._locals._dircache_count -= 1 self.dircache.decref()
if not self._locals._dircache_count and not self._cache_hint: if self.use_dircache:
if not self.dircache.count and not self._cache_hint:
self.clear_dircache()
else:
self.clear_dircache() self.clear_dircache()
assert self._locals._dircache_count >= 0, "dircache count should never be negative" assert self.dircache.count >= 0, "dircache count should never be negative"
@synchronize
def get_dircache(self):
dircache = getattr(self._locals, '_dircache', None)
if dircache is None:
dircache = {}
self._locals._dircache = dircache
self._locals._dircache_count = 0
return dircache
@synchronize @synchronize
def _on_file_written(self, path): def _on_file_written(self, path):
self.clear_dircache(dirname(path)) self.refresh_dircache(dirname(path))
@synchronize @synchronize
def _readdir(self, path): def _readdir(self, path):
path = normpath(path)
dircache = self.get_dircache() if self.dircache.count:
dircache_count = self._locals._dircache_count cached_dirlist = self.dircache.get(path)
if dircache_count:
cached_dirlist = dircache.get(path)
if cached_dirlist is not None: if cached_dirlist is not None:
return cached_dirlist return cached_dirlist
dirlist = {} dirlist = {}
parser = FTPListDataParser() parser = FTPListDataParser()
def on_line(line): def on_line(line):
#print repr(line)
if not isinstance(line, unicode): if not isinstance(line, unicode):
line = line.decode('utf-8') line = line.decode('utf-8')
info = parser.parse_line(line) info = parser.parse_line(line)
if info: if info:
info = info.__dict__ info = info.__dict__
dirlist[info['name']] = info if info['name'] not in ('.', '..'):
dirlist[info['name']] = info
try: try:
self.ftp.dir(_encode(path), on_line) self.ftp.dir(_encode(path), on_line)
except error_reply: except error_reply:
pass pass
dircache[path] = dirlist self.dircache[path] = dirlist
return dirlist return dirlist
...@@ -894,16 +897,29 @@ class FTPFS(FS): ...@@ -894,16 +897,29 @@ class FTPFS(FS):
:param path: Path of directory to clear cache for, or all directories if :param path: Path of directory to clear cache for, or all directories if
None (the default) None (the default)
""" """
dircache = self.get_dircache()
if not paths: if not paths:
dircache.clear() self.dircache.clear()
else: else:
for path in paths: remove_paths = []
dircache.pop(path, None) dircache = self.dircache
paths = [normpath(path) for path in paths]
for cached_path in dircache.keys():
for path in paths:
if isbase(cached_path, path):
dircache.pop(cached_path, None)
break
@synchronize
def refresh_dircache(self, *paths):
for path in paths:
path = abspath(normpath(path))
self.dircache.pop(path, None)
@synchronize @synchronize
def _check_path(self, path): def _check_path(self, path):
path = normpath(path)
base, fname = pathsplit(abspath(path)) base, fname = pathsplit(abspath(path))
dirlist = self._readdir(base) dirlist = self._readdir(base)
if fname and fname not in dirlist: if fname and fname not in dirlist:
...@@ -911,46 +927,47 @@ class FTPFS(FS): ...@@ -911,46 +927,47 @@ class FTPFS(FS):
return dirlist, fname return dirlist, fname
def _get_dirlist(self, path): def _get_dirlist(self, path):
path = normpath(path)
base, fname = pathsplit(abspath(path)) base, fname = pathsplit(abspath(path))
dirlist = self._readdir(base) dirlist = self._readdir(base)
return dirlist, fname return dirlist, fname
@synchronize @ftperrors
def get_ftp(self): def get_ftp(self):
if getattr(self._locals, '_ftp', None) is None: if self.closed:
self._locals._ftp = self._open_ftp() return None
ftp = self._locals._ftp if not getattr(self, '_ftp', None):
self._thread_ftps.add(ftp) self._ftp = self._open_ftp()
return self._locals._ftp return self._ftp
@synchronize
def set_ftp(self, ftp): ftp = property(get_ftp)
self._locals._ftp = ftp
ftp = property(get_ftp, set_ftp)
@synchronize @ftperrors
def _open_ftp(self): def _open_ftp(self):
try: try:
ftp = FTP() ftp = FTP()
if self.timeout is not _GLOBAL_DEFAULT_TIMEOUT: if self.default_timeout:
ftp.connect(self.host, self.port, self.timeout)
else:
ftp.connect(self.host, self.port) ftp.connect(self.host, self.port)
else:
ftp.connect(self.host, self.port, self.timeout)
ftp.login(self.user, self.passwd, self.acct) ftp.login(self.user, self.passwd, self.acct)
except socket_error, e: except socket_error, e:
raise RemoteConnectionError(str(e), details=e) raise RemoteConnectionError(str(e), details=e)
return ftp return ftp
def __getstate__(self): def __getstate__(self):
state = super(FTPFS, self).__getstate__() state = super(FTPFS, self).__getstate__()
del state["_thread_ftps"] del state['_lock']
state.pop('_ftp', None)
return state return state
def __setstate__(self,state): def __setstate__(self,state):
super(FTPFS, self).__setstate__(state) super(FTPFS, self).__setstate__(state)
self._thread_ftps = set() self._init_dircache()
self.ftp self._lock = threading.RLock()
#self._ftp = None
#self.ftp
def __str__(self): def __str__(self):
return '<FTPFS %s>' % self.host return '<FTPFS %s>' % self.host
...@@ -969,11 +986,13 @@ class FTPFS(FS): ...@@ -969,11 +986,13 @@ class FTPFS(FS):
""" """
if isinstance(exception, socket_error): if isinstance(exception, socket_error):
self._ftp = None
raise RemoteConnectionError(str(exception), details=exception) raise RemoteConnectionError(str(exception), details=exception)
elif isinstance(exception, error_temp): elif isinstance(exception, error_temp):
code, message = str(exception).split(' ', 1) code, message = str(exception).split(' ', 1)
raise RemoteConnectionError(str(exception), path=path, msg="FTP error: %s (see details)" % str(exception), details=exception) self._ftp = None
raise RemoteConnectionError(str(exception), path=path, msg="FTP error: %s" % str(exception), details=exception)
elif isinstance(exception, error_perm): elif isinstance(exception, error_perm):
code, message = str(exception).split(' ', 1) code, message = str(exception).split(' ', 1)
...@@ -982,17 +1001,18 @@ class FTPFS(FS): ...@@ -982,17 +1001,18 @@ class FTPFS(FS):
raise ResourceNotFoundError(path) raise ResourceNotFoundError(path)
if code == 552: if code == 552:
raise StorageSpaceError raise StorageSpaceError
raise PermissionDeniedError(str(exception), path=path, msg="FTP error: %s (see details)" % str(exception), details=exception) raise PermissionDeniedError(str(exception), path=path, msg="FTP error: %s" % str(exception), details=exception)
raise exception raise exception
@ftperrors @ftperrors
def close(self): def close(self):
for ftp in self._thread_ftps: if not self.closed:
ftp.close() try:
self.ftp.close()
self._thread_ftps.clear() except FSError:
self.closed = True pass
self.closed = True
@ftperrors @ftperrors
def open(self, path, mode='r'): def open(self, path, mode='r'):
...@@ -1002,30 +1022,29 @@ class FTPFS(FS): ...@@ -1002,30 +1022,29 @@ class FTPFS(FS):
if 'r' in mode: if 'r' in mode:
if not self.isfile(path): if not self.isfile(path):
raise ResourceNotFoundError(path) raise ResourceNotFoundError(path)
if 'w' in mode or 'a' in mode: if 'w' in mode or 'a' in mode or '+' in mode:
self.clear_dircache(dirname(path)) self.refresh_dircache(dirname(path))
ftp = self._open_ftp() ftp = self._open_ftp()
f = _FTPFile(self, ftp, path, mode) f = _FTPFile(self, ftp, normpath(path), mode)
return f return f
#remote_f = RemoteFileBuffer(self, path, mode, rfile = f)
#return remote_f
@ftperrors @ftperrors
def setcontents(self, path, data, chunk_size=8192): def setcontents(self, path, data, chunk_size=8192):
path = normpath(path)
if isinstance(data, basestring): if isinstance(data, basestring):
data = StringIO(data) data = StringIO(data)
self.ftp.storbinary('STOR %s' % _encode(normpath(path)), data, blocksize=chunk_size) self.refresh_dircache(dirname(path))
self.ftp.storbinary('STOR %s' % _encode(path), data, blocksize=chunk_size)
@ftperrors @ftperrors
def getcontents(self, path, chunk_size=8192): def getcontents(self, path):
if not self.exists(path): contents = StringIO()
raise ResourceNotFoundError(path=path) self.ftp.retrbinary('RETR %s' % _encode(normpath(path)), contents.write, blocksize=1024*16)
contents = StringIO()
self.ftp.retrbinary('RETR %s' % _encode(normpath(path)), contents.write, blocksize=chunk_size)
return contents.getvalue() return contents.getvalue()
@ftperrors @ftperrors
def exists(self, path): def exists(self, path):
path = normpath(path)
if path in ('', '/'): if path in ('', '/'):
return True return True
dirlist, fname = self._get_dirlist(path) dirlist, fname = self._get_dirlist(path)
...@@ -1033,6 +1052,7 @@ class FTPFS(FS): ...@@ -1033,6 +1052,7 @@ class FTPFS(FS):
@ftperrors @ftperrors
def isdir(self, path): def isdir(self, path):
path = normpath(path)
if path in ('', '/'): if path in ('', '/'):
return True return True
dirlist, fname = self._get_dirlist(path) dirlist, fname = self._get_dirlist(path)
...@@ -1043,6 +1063,7 @@ class FTPFS(FS): ...@@ -1043,6 +1063,7 @@ class FTPFS(FS):
@ftperrors @ftperrors
def isfile(self, path): def isfile(self, path):
path = normpath(path)
if path in ('', '/'): if path in ('', '/'):
return False return False
dirlist, fname = self._get_dirlist(path) dirlist, fname = self._get_dirlist(path)
...@@ -1052,8 +1073,9 @@ class FTPFS(FS): ...@@ -1052,8 +1073,9 @@ class FTPFS(FS):
return not info['try_cwd'] return not info['try_cwd']
@ftperrors @ftperrors
def listdir(self, path="./", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False): def listdir(self, path="./", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False):
path = normpath(path) path = normpath(path)
self.clear_dircache(path)
if not self.exists(path): if not self.exists(path):
raise ResourceNotFoundError(path) raise ResourceNotFoundError(path)
if not self.isdir(path): if not self.isdir(path):
...@@ -1062,13 +1084,37 @@ class FTPFS(FS): ...@@ -1062,13 +1084,37 @@ class FTPFS(FS):
return self._listdir_helper(path, paths, wildcard, full, absolute, dirs_only, files_only) return self._listdir_helper(path, paths, wildcard, full, absolute, dirs_only, files_only)
@ftperrors
def listdirinfo(self, path="./",
wildcard=None,
full=False,
absolute=False,
dirs_only=False,
files_only=False):
path = normpath(path)
def getinfo(p):
try:
if full or absolute:
return self.getinfo(p)
else:
return self.getinfo(pathjoin(path,p))
except FSError:
return {}
return [(p, getinfo(p))
for p in self.listdir(path,
wildcard=wildcard,
full=full,
absolute=absolute,
dirs_only=dirs_only,
files_only=files_only)]
@ftperrors @ftperrors
def makedir(self, path, recursive=False, allow_recreate=False): def makedir(self, path, recursive=False, allow_recreate=False):
if path in ('', '/'): if path in ('', '/'):
return return
def checkdir(path): def checkdir(path):
self.clear_dircache(dirname(path), path) self.clear_dircache(dirname(path))
try: try:
self.ftp.mkd(_encode(path)) self.ftp.mkd(_encode(path))
except error_reply: except error_reply:
...@@ -1093,20 +1139,20 @@ class FTPFS(FS): ...@@ -1093,20 +1139,20 @@ class FTPFS(FS):
if self.isfile(path): if self.isfile(path):
raise ResourceInvalidError(path) raise ResourceInvalidError(path)
raise DestinationExistsError(path) raise DestinationExistsError(path)
checkdir(path) checkdir(path)
@ftperrors @ftperrors
def remove(self, path): def remove(self, path):
if not self.exists(path): if not self.exists(path):
raise ResourceNotFoundError(path) raise ResourceNotFoundError(path)
if not self.isfile(path): if not self.isfile(path):
raise ResourceInvalidError(path) raise ResourceInvalidError(path)
self.clear_dircache(dirname(path)) self.refresh_dircache(dirname(path))
self.ftp.delete(_encode(path)) self.ftp.delete(_encode(path))
@ftperrors @ftperrors
def removedir(self, path, recursive=False, force=False): def removedir(self, path, recursive=False, force=False):
if not self.exists(path): if not self.exists(path):
raise ResourceNotFoundError(path) raise ResourceNotFoundError(path)
if self.isfile(path): if self.isfile(path):
...@@ -1125,7 +1171,7 @@ class FTPFS(FS): ...@@ -1125,7 +1171,7 @@ class FTPFS(FS):
self.removedir(rpath, force=force) self.removedir(rpath, force=force)
except FSError: except FSError:
pass pass
self.clear_dircache(dirname(path), path) self.clear_dircache(dirname(path))
self.ftp.rmd(_encode(path)) self.ftp.rmd(_encode(path))
except error_reply: except error_reply:
pass pass
...@@ -1134,19 +1180,21 @@ class FTPFS(FS): ...@@ -1134,19 +1180,21 @@ class FTPFS(FS):
self.removedir(dirname(path), recursive=True) self.removedir(dirname(path), recursive=True)
except DirectoryNotEmptyError: except DirectoryNotEmptyError:
pass pass
self.clear_dircache(dirname(path), path)
@ftperrors @ftperrors
def rename(self, src, dst): def rename(self, src, dst):
self.clear_dircache(dirname(src), dirname(dst), src, dst)
try: try:
self.ftp.rename(_encode(src), _encode(dst)) self.refresh_dircache(dirname(src), dirname(dst))
self.ftp.rename(_encode(src), _encode(dst))
except error_perm, exception: except error_perm, exception:
code, message = str(exception).split(' ', 1) code, message = str(exception).split(' ', 1)
if code == "550": if code == "550":
if not self.exists(dirname(dst)): if not self.exists(dirname(dst)):
raise ParentDirectoryMissingError(dst) raise ParentDirectoryMissingError(dst)
raise
except error_reply: except error_reply:
pass pass
@ftperrors @ftperrors
def getinfo(self, path): def getinfo(self, path):
...@@ -1162,7 +1210,7 @@ class FTPFS(FS): ...@@ -1162,7 +1210,7 @@ class FTPFS(FS):
def getsize(self, path): def getsize(self, path):
size = None size = None
if self._locals._dircache_count: if self.dircache.count:
dirlist, fname = self._check_path(path) dirlist, fname = self._check_path(path)
size = dirlist[fname].get('size') size = dirlist[fname].get('size')
...@@ -1187,27 +1235,49 @@ class FTPFS(FS): ...@@ -1187,27 +1235,49 @@ class FTPFS(FS):
@ftperrors @ftperrors
def move(self, src, dst, overwrite=False, chunk_size=16384): def move(self, src, dst, overwrite=False, chunk_size=16384):
if not overwrite and self.exists(dst): if not overwrite and self.exists(dst):
raise DestinationExistsError(dst) raise DestinationExistsError(dst)
self.clear_dircache(dirname(src), dirname(dst)) #self.refresh_dircache(dirname(src), dirname(dst))
try: try:
self.rename(src, dst) self.rename(src, dst)
except error_reply:
pass
except: except:
self.copy(src, dst) self.copy(src, dst, overwrite=overwrite)
self.remove(src) self.remove(src)
finally:
self.refresh_dircache(src, dirname(src), dst, dirname(dst))
@ftperrors
def copy(self, src, dst, overwrite=False, chunk_size=1024*64):
if not self.isfile(src):
if self.isdir(src):
raise ResourceInvalidError(src, msg="Source is not a file: %(path)s")
raise ResourceNotFoundError(src)
if not overwrite and self.exists(dst):
raise DestinationExistsError(dst)
src_file = None
try:
src_file = self.open(src, "rb")
ftp = self._open_ftp()
ftp.voidcmd('TYPE I')
ftp.storbinary('STOR %s' % _encode((normpath(dst))), src_file, blocksize=chunk_size)
finally:
self.refresh_dircache(dirname(dst))
if src_file is not None:
src_file.close()
@ftperrors @ftperrors
def movedir(self, src, dst, overwrite=False, ignore_errors=False, chunk_size=16384): def movedir(self, src, dst, overwrite=False, ignore_errors=False, chunk_size=16384):
self.clear_dircache(src, dst, dirname(src), dirname(dst)) self.clear_dircache(dirname(src), dirname(dst))
super(FTPFS, self).movedir(src, dst, overwrite, ignore_errors, chunk_size) super(FTPFS, self).movedir(src, dst, overwrite, ignore_errors, chunk_size)
@ftperrors @ftperrors
def copydir(self, src, dst, overwrite=False, ignore_errors=False, chunk_size=16384): def copydir(self, src, dst, overwrite=False, ignore_errors=False, chunk_size=16384):
self.clear_dircache(src, dst, dirname(src), dirname(dst)) self.clear_dircache(dirname(dst))
super(FTPFS, self).copydir(src, dst, overwrite, ignore_errors, chunk_size) super(FTPFS, self).copydir(src, dst, overwrite, ignore_errors, chunk_size)
if __name__ == "__main__": if __name__ == "__main__":
......
...@@ -102,14 +102,12 @@ class OpenerRegistry(object): ...@@ -102,14 +102,12 @@ class OpenerRegistry(object):
fs_name = default_fs_name or self.default_opener fs_name = default_fs_name or self.default_opener
fs_url = _expand_syspath(fs_url) fs_url = _expand_syspath(fs_url)
path = '' path = ''
fs_name, fs_name_params = self.parse_name(fs_name) fs_name, fs_name_params = self.parse_name(fs_name)
opener = self.get_opener(fs_name) opener = self.get_opener(fs_name)
if fs_url is None: if fs_url is None:
raise OpenerError("Unable to parse '%s'" % orig_url) raise OpenerError("Unable to parse '%s'" % orig_url)
fs, fs_path = opener.get_fs(self, fs_name, fs_name_params, fs_url, writeable, create) fs, fs_path = opener.get_fs(self, fs_name, fs_name_params, fs_url, writeable, create)
...@@ -118,19 +116,14 @@ class OpenerRegistry(object): ...@@ -118,19 +116,14 @@ class OpenerRegistry(object):
if pathname: if pathname:
fs = fs.opendir(pathname) fs = fs.opendir(pathname)
return fs, resourcename return fs, resourcename
#pathname, resourcename = pathsplit(fs_path or '')
#if pathname and resourcename:
# fs = fs.opendir(pathname)
# fs_path = resourcename
fs_path = join(fs_path, path) fs_path = join(fs_path, path)
pathname, resourcename = pathsplit(fs_path or '') pathname, resourcename = pathsplit(fs_path or '')
if pathname and resourcename: if pathname and resourcename:
fs = fs.opendir(pathname) fs = fs.opendir(pathname)
fs_path = resourcename fs_path = resourcename
return fs, fs_path return fs, fs_path
def parse_credentials(self, url): def parse_credentials(self, url):
...@@ -198,13 +191,10 @@ class ZipOpener(Opener): ...@@ -198,13 +191,10 @@ class ZipOpener(Opener):
zip_fs, zip_path = registry.parse(fs_path) zip_fs, zip_path = registry.parse(fs_path)
if zip_path is None: if zip_path is None:
raise OpenerError('File required for zip opener') raise OpenerError('File required for zip opener')
if create: if writeable:
open_mode = 'wb' open_mode = 'r+b'
if append_zip:
open_mode = 'r+b'
else: else:
open_mode = 'rb' open_mode = 'rb'
zip_file = zip_fs.open(zip_path, mode=open_mode) zip_file = zip_fs.open(zip_path, mode=open_mode)
username, password, fs_path = registry.parse_credentials(fs_path) username, password, fs_path = registry.parse_credentials(fs_path)
...@@ -212,18 +202,12 @@ class ZipOpener(Opener): ...@@ -212,18 +202,12 @@ class ZipOpener(Opener):
from fs.zipfs import ZipFS from fs.zipfs import ZipFS
if zip_file is None: if zip_file is None:
zip_file = fs_path zip_file = fs_path
if append_zip: mode = 'r'
if writeable:
mode = 'a' mode = 'a'
elif create:
mode = 'w'
else:
if writeable:
mode = 'w'
else:
mode = 'a'
allow_zip_64 = fs_name == 'zip64' allow_zip_64 = fs_name.endswith('64')
zipfs = ZipFS(zip_file, mode=mode, allow_zip_64=allow_zip_64) zipfs = ZipFS(zip_file, mode=mode, allow_zip_64=allow_zip_64)
return zipfs, None return zipfs, None
...@@ -256,7 +240,7 @@ class FTPOpener(Opener): ...@@ -256,7 +240,7 @@ class FTPOpener(Opener):
def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create): def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create):
from fs.ftpfs import FTPFS from fs.ftpfs import FTPFS
username, password, fs_path = registry.parse_credentials(fs_path) username, password, fs_path = registry.parse_credentials(fs_path)
scheme, netloc, path, params, query, fragment = urlparse(fs_path) scheme, netloc, path, params, query, fragment = urlparse(fs_path)
if not scheme: if not scheme:
fs_path = 'ftp://' + fs_path fs_path = 'ftp://' + fs_path
...@@ -365,7 +349,11 @@ class TempOpener(Opener): ...@@ -365,7 +349,11 @@ class TempOpener(Opener):
@classmethod @classmethod
def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create): def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create):
from fs.tempfs import TempFS from fs.tempfs import TempFS
return TempFS(identifier=fs_name_params, temp_dir=fs_path), None fs = TempFS(identifier=fs_name_params)
if create and fs_path:
fs = fs.makeopendir(fs_path)
fs_path = pathsplit(fs_path)
return fs, fs_path
opener = OpenerRegistry([OSFSOpener, opener = OpenerRegistry([OSFSOpener,
...@@ -382,7 +370,7 @@ opener = OpenerRegistry([OSFSOpener, ...@@ -382,7 +370,7 @@ opener = OpenerRegistry([OSFSOpener,
def main(): def main():
#fs, path = opener.parse('zip:zip://~/zips.zip!t.zip!') #fs, path = opener.parse('zip:zip://~/zips.zip!t.zip!')
fs, path = opener.parse('rpc://127.0.0.1/a/*.JPG') fs, path = opener.parse('ftp://releases.mozilla.org/welcome.msg')
print fs, path print fs, path
......
...@@ -266,6 +266,10 @@ def issamedir(path1, path2): ...@@ -266,6 +266,10 @@ def issamedir(path1, path2):
""" """
return pathsplit(normpath(path1))[0] == pathsplit(normpath(path2))[0] return pathsplit(normpath(path1))[0] == pathsplit(normpath(path2))[0]
def isbase(path1, path2):
p1 = forcedir(abspath(path1))
p2 = forcedir(abspath(path2))
return p1 == p2 or p1.startswith(p2)
def isprefix(path1, path2): def isprefix(path1, path2):
"""Return true is path1 is a prefix of path2. """Return true is path1 is a prefix of path2.
......
...@@ -108,6 +108,7 @@ class RPCFS(FS): ...@@ -108,6 +108,7 @@ class RPCFS(FS):
self._transport = transport self._transport = transport
self.proxy = self._make_proxy() self.proxy = self._make_proxy()
FS.__init__(self,thread_synchronize=False) FS.__init__(self,thread_synchronize=False)
self.isdir('/')
def _make_proxy(self): def _make_proxy(self):
kwds = dict(allow_none=True) kwds = dict(allow_none=True)
......
...@@ -39,6 +39,9 @@ class TempFS(OSFS): ...@@ -39,6 +39,9 @@ class TempFS(OSFS):
default uses "TempFS" default uses "TempFS"
""" """
self.identifier = identifier
self.temp_dir = temp_dir
self.dir_mode = dir_mode
self._temp_dir = tempfile.mkdtemp(identifier or "TempFS",dir=temp_dir) self._temp_dir = tempfile.mkdtemp(identifier or "TempFS",dir=temp_dir)
self._cleaned = False self._cleaned = False
super(TempFS, self).__init__(self._temp_dir, dir_mode=dir_mode, thread_synchronize=thread_synchronize) super(TempFS, self).__init__(self._temp_dir, dir_mode=dir_mode, thread_synchronize=thread_synchronize)
...@@ -50,6 +53,13 @@ class TempFS(OSFS): ...@@ -50,6 +53,13 @@ class TempFS(OSFS):
def __unicode__(self): def __unicode__(self):
return u'<TempFS: %s>' % self._temp_dir return u'<TempFS: %s>' % self._temp_dir
def __setstate__(self, state):
state = super(TempFS, self).__setstate__(state)
self._temp_dir = tempfile.mkdtemp(self.identifier or "TempFS", dir=self.temp_dir)
super(TempFS, self).__init__(self._temp_dir,
dir_mode=self.dir_mode,
thread_synchronize=self.thread_synchronize)
def close(self): def close(self):
"""Removes the temporary directory. """Removes the temporary directory.
......
...@@ -486,6 +486,7 @@ class FSTestCases(object): ...@@ -486,6 +486,7 @@ class FSTestCases(object):
makefile("foo/bar/a.txt") makefile("foo/bar/a.txt")
self.assert_(check("foo/bar/a.txt")) self.assert_(check("foo/bar/a.txt"))
self.assert_(checkcontents("foo/bar/a.txt")) self.assert_(checkcontents("foo/bar/a.txt"))
#import rpdb2; rpdb2.start_embedded_debugger('password');
self.fs.copy("foo/bar/a.txt", "foo/b.txt") self.fs.copy("foo/bar/a.txt", "foo/b.txt")
self.assert_(check("foo/bar/a.txt")) self.assert_(check("foo/bar/a.txt"))
self.assert_(check("foo/b.txt")) self.assert_(check("foo/b.txt"))
......
...@@ -32,8 +32,9 @@ class TestFTPFS(unittest.TestCase, FSTestCases, ThreadingTestCases): ...@@ -32,8 +32,9 @@ class TestFTPFS(unittest.TestCase, FSTestCases, ThreadingTestCases):
self.ftp_server = subprocess.Popen([sys.executable, abspath(__file__), self.temp_dir, str(use_port)]) self.ftp_server = subprocess.Popen([sys.executable, abspath(__file__), self.temp_dir, str(use_port)])
# Need to sleep to allow ftp server to start # Need to sleep to allow ftp server to start
time.sleep(.1) time.sleep(.2)
self.fs = ftpfs.FTPFS('127.0.0.1', 'user', '12345', port=use_port, timeout=5.0) self.fs = ftpfs.FTPFS('127.0.0.1', 'user', '12345', dircache=True, port=use_port, timeout=5.0)
self.fs.cache_hint(True)
def tearDown(self): def tearDown(self):
......
...@@ -53,14 +53,23 @@ def copyfile(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1 ...@@ -53,14 +53,23 @@ def copyfile(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1
FS._shutil_copyfile(src_syspath, dst_syspath) FS._shutil_copyfile(src_syspath, dst_syspath)
return return
src = None src = None
dst = None
try: try:
# Chunk copy # Chunk copy
src = src_fs.open(src_path, 'rb') src = src_fs.open(src_path, 'rb')
dst_fs.setcontents(dst_path, src, chunk_size=chunk_size) dst = src_fs.open(dst_path, 'wb')
write = dst.write
read = src.read
chunk = read(chunk_size)
while chunk:
write(chunk)
chunk = read(chunk_size)
finally: finally:
if src is not None and hasattr(src, 'close'): if src is not None:
src.close() src.close()
if dst is not None:
dst.close()
def movefile(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1024): def movefile(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1024):
...@@ -89,14 +98,23 @@ def movefile(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1 ...@@ -89,14 +98,23 @@ def movefile(src_fs, src_path, dst_fs, dst_path, overwrite=True, chunk_size=64*1
FS._shutil_movefile(src_syspath, dst_syspath) FS._shutil_movefile(src_syspath, dst_syspath)
return return
src = None src = None
dst = None
try: try:
# Chunk copy # Chunk copy
src = src_fs.open(src_path, 'rb') src = src_fs.open(src_path, 'rb')
dst_fs.setcontents(dst_path, src, chunk_size=chunk_size) dst = src_fs.open(dst_path, 'wb')
write = dst.write
read = src.read
chunk = read(chunk_size)
while chunk:
write(chunk)
chunk = read(chunk_size)
finally: finally:
if src is not None and hasattr(src, 'close'): if src is not None:
src.close() src.close()
if dst is not None:
dst.close()
src_fs.remove(src_path) src_fs.remove(src_path)
...@@ -416,7 +434,7 @@ def print_fs(fs, path='/', max_levels=5, file_out=None, terminal_colors=None, hi ...@@ -416,7 +434,7 @@ def print_fs(fs, path='/', max_levels=5, file_out=None, terminal_colors=None, hi
if is_dir: if is_dir:
write('%s %s' % (wrap_prefix(prefix + '--'), wrap_dirname(item))) write('%s %s' % (wrap_prefix(prefix + '--'), wrap_dirname(item)))
if max_levels is not None and len(levels) >= max_levels: if max_levels is not None and len(levels) + 1 >= max_levels:
pass pass
#write(wrap_prefix(prefix[:-1] + ' ') + wrap_error('max recursion levels reached')) #write(wrap_prefix(prefix[:-1] + ' ') + wrap_error('max recursion levels reached'))
else: else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment