Commit 62e77b6a by willmcgugan

Optimized FTP fs by caching directory structure

parent 72f3d49f
...@@ -26,4 +26,4 @@ ...@@ -26,4 +26,4 @@
* New FS implementation: * New FS implementation:
* FTPFS: access a plain old FTP server * FTPFS: access a plain old FTP server
* ReadOnlyFS: a WrapFS that makes an fs read-only * ReadOnlyFS: a WrapFS that makes an fs read-only
* Added cache_hint method to base.py
...@@ -107,7 +107,7 @@ try: ...@@ -107,7 +107,7 @@ try:
from functools import wraps from functools import wraps
except ImportError: except ImportError:
wraps = lambda f:f wraps = lambda f:f
def synchronize(func): def synchronize(func):
"""Decorator to synchronize a method on self._lock.""" """Decorator to synchronize a method on self._lock."""
@wraps(func) @wraps(func)
...@@ -132,7 +132,7 @@ class FS(object): ...@@ -132,7 +132,7 @@ class FS(object):
"""The base class for Filesystem objects. """The base class for Filesystem objects.
:param thread_synchronize: If True, a lock object will be created for the object, otherwise a dummy lock will be used. :param thread_synchronize: If True, a lock object will be created for the object, otherwise a dummy lock will be used.
:type thread_synchronize: bool :type thread_synchronize: bool
""" """
self.closed = False self.closed = False
if thread_synchronize: if thread_synchronize:
...@@ -144,7 +144,17 @@ class FS(object): ...@@ -144,7 +144,17 @@ class FS(object):
if not getattr(self, 'closed', True): if not getattr(self, 'closed', True):
self.close() self.close()
def close(self): def cache_hint(self, enabled):
"""Recommends the use of caching. Implementations are free to use or
ignore this value.
:param enabled: If True the implementation is permitted to cache directory
structure / file info.
"""
pass
def close(self):
self.closed = True self.closed = True
def __getstate__(self): def __getstate__(self):
...@@ -178,12 +188,12 @@ class FS(object): ...@@ -178,12 +188,12 @@ class FS(object):
then a NoSysPathError exception is thrown. Otherwise, the system then a NoSysPathError exception is thrown. Otherwise, the system
path will be returned as a unicode string. path will be returned as a unicode string.
:param path: A path within the filesystem :param path: A path within the filesystem
:param allow_none: If True, this method will return None when there is no system path, :param allow_none: If True, this method will return None when there is no system path,
rather than raising NoSysPathError rather than raising NoSysPathError
:type allow_none: bool :type allow_none: bool
:raises NoSysPathError: If the path does not map on to a system path, and allow_none is set to False (default) :raises NoSysPathError: If the path does not map on to a system path, and allow_none is set to False (default)
:rtype: unicode :rtype: unicode
""" """
if not allow_none: if not allow_none:
raise NoSysPathError(path=path) raise NoSysPathError(path=path)
...@@ -191,8 +201,8 @@ class FS(object): ...@@ -191,8 +201,8 @@ class FS(object):
def hassyspath(self, path): def hassyspath(self, path):
"""Return True if the path maps to a system path (a path recognised by the OS). """Return True if the path maps to a system path (a path recognised by the OS).
:param path: -- Path to check :param path: -- Path to check
:rtype: bool :rtype: bool
""" """
return self.getsyspath(path, allow_none=True) is not None return self.getsyspath(path, allow_none=True) is not None
...@@ -201,7 +211,7 @@ class FS(object): ...@@ -201,7 +211,7 @@ class FS(object):
def open(self, path, mode="r", **kwargs): def open(self, path, mode="r", **kwargs):
"""Open a the given path as a file-like object. """Open a the given path as a file-like object.
:param path: A path to file that should be opened :param path: A path to file that should be opened
:param mode: Mode of file to open, identical to the mode string used :param mode: Mode of file to open, identical to the mode string used
in 'file' and 'open' builtins in 'file' and 'open' builtins
:param kwargs: Additional (optional) keyword parameters that may :param kwargs: Additional (optional) keyword parameters that may
...@@ -212,12 +222,12 @@ class FS(object): ...@@ -212,12 +222,12 @@ class FS(object):
def safeopen(self, *args, **kwargs): def safeopen(self, *args, **kwargs):
"""Like 'open', but returns a NullFile if the file could not be opened. """Like 'open', but returns a NullFile if the file could not be opened.
A NullFile is a dummy file which has all the methods of a file-like object, A NullFile is a dummy file which has all the methods of a file-like object,
but contains no data. but contains no data.
:rtype: file-like object :rtype: file-like object
""" """
try: try:
f = self.open(*args, **kwargs) f = self.open(*args, **kwargs)
...@@ -228,27 +238,27 @@ class FS(object): ...@@ -228,27 +238,27 @@ class FS(object):
def exists(self, path): def exists(self, path):
"""Returns True if the path references a valid resource. """Returns True if the path references a valid resource.
:param path: A path in the filessystem :param path: A path in the filessystem
:rtype: bool :rtype: bool
""" """
return self.isfile(path) or self.isdir(path) return self.isfile(path) or self.isdir(path)
def isdir(self, path): def isdir(self, path):
"""Returns True if a given path references a directory. """Returns True if a given path references a directory.
:param path: A path in the filessystem :param path: A path in the filessystem
:rtype: bool :rtype: bool
""" """
raise UnsupportedError("check for directory") raise UnsupportedError("check for directory")
def isfile(self, path): def isfile(self, path):
"""Returns True if a given path references a file. """Returns True if a given path references a file.
:param path: A path in the filessystem :param path: A path in the filessystem
:rtype: bool :rtype: bool
""" """
raise UnsupportedError("check for file") raise UnsupportedError("check for file")
...@@ -272,23 +282,56 @@ class FS(object): ...@@ -272,23 +282,56 @@ class FS(object):
:param path: Root of the path to list :param path: Root of the path to list
:type path: str :type path: str
:param wildcard: Only returns paths that match this wildcard :param wildcard: Only returns paths that match this wildcard
:type wildcard: str :type wildcard: str
:param full: Returns full paths (relative to the root) :param full: Returns full paths (relative to the root)
:type full: bool :type full: bool
:param absolute: Returns absolute paths (paths beginning with /) :param absolute: Returns absolute paths (paths beginning with /)
:type absolute: bool :type absolute: bool
:param dirs_only: If True, only return directories :param dirs_only: If True, only return directories
:type dirs_only: bool :type dirs_only: bool
:param files_only: If True, only return files :param files_only: If True, only return files
:type files_only: bool :type files_only: bool
:rtype: iterable of paths :rtype: iterable of paths
:raises ResourceNotFoundError: If the path is not found :raises ResourceNotFoundError: If the path is not found
:raises ResourceInvalidError: If the path exists, but is not a directory :raises ResourceInvalidError: If the path exists, but is not a directory
""" """
raise UnsupportedError("list directory") raise UnsupportedError("list directory")
def listdirinfo(self, path="./",
                wildcard=None,
                full=False,
                absolute=False,
                dirs_only=False,
                files_only=False):
    """Retrieves a list of paths and path info (as returned by getinfo) under
    a given path.

    The arguments are forwarded unchanged to listdir; each path it returns
    is paired with the result of calling getinfo on that resource.

    :param path: Root of the path to list
    :param wildcard: Filter paths that match this wildcard
    :param full: Returns full paths (relative to the root)
    :type full: bool
    :param absolute: Returns absolute paths (paths beginning with /)
    :type absolute: bool
    :param dirs_only: Return only directory paths
    :type dirs_only: bool
    :param files_only: Return only files
    :type files_only: bool
    :rtype: list of (path, info) tuples

    :raises ResourceNotFoundError: If the path is not found
    :raises ResourceInvalidError: If the path exists, but is not a directory

    """
    def get_path(p):
        # When listdir was asked for full or absolute paths the entry can be
        # handed to getinfo as it is; otherwise it is a bare name and has to
        # be joined on to the directory being listed.
        if full or absolute:
            return p
        return pathjoin(path, p)

    return [(p, self.getinfo(get_path(p)))
            for p in self.listdir(path,
                                  wildcard=wildcard,
                                  full=full,
                                  absolute=absolute,
                                  dirs_only=dirs_only,
                                  files_only=files_only)]
def _listdir_helper(self, path, entries, def _listdir_helper(self, path, entries,
wildcard=None, wildcard=None,
...@@ -328,14 +371,14 @@ class FS(object): ...@@ -328,14 +371,14 @@ class FS(object):
:param path: Path of directory :param path: Path of directory
:param recursive: If True, any intermediate directories will also be created :param recursive: If True, any intermediate directories will also be created
:type recursive: bool :type recursive: bool
:param allow_recreate: If True, re-creating a directory wont be an error :param allow_recreate: If True, re-creating a directory wont be an error
:type allow_create: bool :type allow_create: bool
:raises DestinationExistsError: If the path is already a directory, and allow_recreate is False :raises DestinationExistsError: If the path is already a directory, and allow_recreate is False
:raises ParentDirectoryMissingError: If a containing directory is missing and recursive is False :raises ParentDirectoryMissingError: If a containing directory is missing and recursive is False
:raises ResourceInvalidError: If a path is an existing file :raises ResourceInvalidError: If a path is an existing file
""" """
raise UnsupportedError("make directory") raise UnsupportedError("make directory")
...@@ -346,7 +389,7 @@ class FS(object): ...@@ -346,7 +389,7 @@ class FS(object):
:raises ResourceNotFoundError: If the path does not exist :raises ResourceNotFoundError: If the path does not exist
:raises ResourceInvalidError: If the path is a directory :raises ResourceInvalidError: If the path is a directory
""" """
raise UnsupportedError("remove resource") raise UnsupportedError("remove resource")
...@@ -358,11 +401,11 @@ class FS(object): ...@@ -358,11 +401,11 @@ class FS(object):
:type recursive: bool :type recursive: bool
:param force: If True, any directory contents will be removed :param force: If True, any directory contents will be removed
:type force: bool :type force: bool
:raises ResourceNotFoundError: If the path does not exist :raises ResourceNotFoundError: If the path does not exist
:raises ResourceInvalidError: If the path is not a directory :raises ResourceInvalidError: If the path is not a directory
:raises DirectoryNotEmptyError: If the directory is not empty and force is False :raises DirectoryNotEmptyError: If the directory is not empty and force is False
""" """
raise UnsupportedError("remove directory") raise UnsupportedError("remove directory")
...@@ -463,7 +506,7 @@ class FS(object): ...@@ -463,7 +506,7 @@ class FS(object):
:param search: -- A string identifying the method used to walk the directories. There are two such methods: :param search: -- A string identifying the method used to walk the directories. There are two such methods:
* 'breadth' Yields paths in the top directories first * 'breadth' Yields paths in the top directories first
* 'depth' Yields the deepest paths first * 'depth' Yields the deepest paths first
""" """
if search == "breadth": if search == "breadth":
dirs = [path] dirs = [path]
...@@ -772,7 +815,7 @@ class SubFS(FS): ...@@ -772,7 +815,7 @@ class SubFS(FS):
def __str__(self): def __str__(self):
return "<SubFS: %s in %s>" % (self.sub_dir, self.parent) return "<SubFS: %s in %s>" % (self.sub_dir, self.parent)
def __unicode__(self): def __unicode__(self):
return u"<SubFS: %s in %s>" % (self.sub_dir, self.parent) return u"<SubFS: %s in %s>" % (self.sub_dir, self.parent)
......
...@@ -35,7 +35,7 @@ class InfoFrame(wx.Frame): ...@@ -35,7 +35,7 @@ class InfoFrame(wx.Frame):
self.list_ctrl.SetColumnWidth(1, 300) self.list_ctrl.SetColumnWidth(1, 300)
for key in keys: for key in keys:
self.list_ctrl.Append((key, repr(info.get(key)))) self.list_ctrl.Append((key, str(info.get(key))))
...@@ -99,7 +99,7 @@ class BrowseFrame(wx.Frame): ...@@ -99,7 +99,7 @@ class BrowseFrame(wx.Frame):
return return
paths = [(self.fs.isdir(p), p) for p in self.fs.listdir(path, absolute=True)] paths = [(self.fs.isdir(p), p) for p in self.fs.listdir(path, absolute=True)]
if not paths: if not paths:
#self.tree.SetItemHasChildren(item_id, False) #self.tree.SetItemHasChildren(item_id, False)
#self.tree.Collapse(item_id) #self.tree.Collapse(item_id)
......
...@@ -22,7 +22,7 @@ try: ...@@ -22,7 +22,7 @@ try:
from cStringIO import StringIO from cStringIO import StringIO
except ImportError: except ImportError:
from StringIO import StringIO from StringIO import StringIO
import time import time
import sys import sys
...@@ -33,7 +33,7 @@ import sys ...@@ -33,7 +33,7 @@ import sys
class Enum(object): class Enum(object):
def __init__(self, *names): def __init__(self, *names):
self._names_map = dict((name, i) for i, name in enumerate(names)) self._names_map = dict((name, i) for i, name in enumerate(names))
def __getattr__(self, name): def __getattr__(self, name):
return self._names_map[name] return self._names_map[name]
...@@ -512,13 +512,13 @@ def _skip(s, i, c): ...@@ -512,13 +512,13 @@ def _skip(s, i, c):
class _FTPFile(object): class _FTPFile(object):
""" A file-like that provides access to a file being streamed over ftp.""" """ A file-like that provides access to a file being streamed over ftp."""
def __init__(self, ftpfs, ftp, path, mode): def __init__(self, ftpfs, ftp, path, mode):
if not hasattr(self, '_lock'): if not hasattr(self, '_lock'):
self._lock = threading.RLock() self._lock = threading.RLock()
...@@ -532,14 +532,14 @@ class _FTPFile(object): ...@@ -532,14 +532,14 @@ class _FTPFile(object):
if 'r' in mode or 'a' in mode: if 'r' in mode or 'a' in mode:
self.file_size = ftpfs.getsize(path) self.file_size = ftpfs.getsize(path)
self.conn = None self.conn = None
path = _encode(path) path = _encode(path)
#self._lock = ftpfs._lock #self._lock = ftpfs._lock
if 'r' in mode: if 'r' in mode:
self.ftp.voidcmd('TYPE I') self.ftp.voidcmd('TYPE I')
self.conn = ftp.transfercmd('RETR '+path, None) self.conn = ftp.transfercmd('RETR '+path, None)
#self._ftp_thread = threading.Thread(target=do_read) #self._ftp_thread = threading.Thread(target=do_read)
#self._ftp_thread.start() #self._ftp_thread.start()
elif 'w' in mode or 'a' in mode: elif 'w' in mode or 'a' in mode:
...@@ -556,17 +556,17 @@ class _FTPFile(object): ...@@ -556,17 +556,17 @@ class _FTPFile(object):
# if callback: callback(buf) # if callback: callback(buf)
#conn.close() #conn.close()
#return self.voidresp() #return self.voidresp()
#self._ftp_thread = threading.Thread(target=do_write) #self._ftp_thread = threading.Thread(target=do_write)
#self._ftp_thread.start() #self._ftp_thread.start()
@synchronize @synchronize
def read(self, size=None): def read(self, size=None):
if self.conn is None: if self.conn is None:
return '' return ''
chunks = [] chunks = []
if size is None: if size is None:
while 1: while 1:
data = self.conn.recv(4096) data = self.conn.recv(4096)
if not data: if not data:
...@@ -575,9 +575,9 @@ class _FTPFile(object): ...@@ -575,9 +575,9 @@ class _FTPFile(object):
self.ftp.voidresp() self.ftp.voidresp()
break break
chunks.append(data) chunks.append(data)
self.read_pos += len(data) self.read_pos += len(data)
return ''.join(chunks) return ''.join(chunks)
remaining_bytes = size remaining_bytes = size
while remaining_bytes: while remaining_bytes:
read_size = min(remaining_bytes, 4096) read_size = min(remaining_bytes, 4096)
...@@ -590,42 +590,42 @@ class _FTPFile(object): ...@@ -590,42 +590,42 @@ class _FTPFile(object):
chunks.append(data) chunks.append(data)
self.read_pos += len(data) self.read_pos += len(data)
remaining_bytes -= len(data) remaining_bytes -= len(data)
return ''.join(chunks) return ''.join(chunks)
@synchronize @synchronize
def write(self, data): def write(self, data):
data_pos = 0 data_pos = 0
remaining_data = len(data) remaining_data = len(data)
while remaining_data: while remaining_data:
chunk_size = min(remaining_data, 4096) chunk_size = min(remaining_data, 4096)
self.conn.sendall(data[data_pos:data_pos+chunk_size]) self.conn.sendall(data[data_pos:data_pos+chunk_size])
data_pos += chunk_size data_pos += chunk_size
remaining_data -= chunk_size remaining_data -= chunk_size
self.write_pos += chunk_size self.write_pos += chunk_size
def __enter__(self): def __enter__(self):
return self return self
def __exit__(self,exc_type,exc_value,traceback): def __exit__(self,exc_type,exc_value,traceback):
self.close() self.close()
@synchronize @synchronize
def flush(self): def flush(self):
return return
def seek(self, pos, where=fs.SEEK_SET): def seek(self, pos, where=fs.SEEK_SET):
# Ftp doesn't support a real seek, so we close the transfer and resume # Ftp doesn't support a real seek, so we close the transfer and resume
# it at the new position with the REST command # it at the new position with the REST command
# I'm not sure how reliable this method is! # I'm not sure how reliable this method is!
if not self.file_size: if not self.file_size:
raise ValueError("Seek only works with files open for read") raise ValueError("Seek only works with files open for read")
self._lock.acquire() self._lock.acquire()
try: try:
current = self.tell() current = self.tell()
new_pos = None new_pos = None
if where == fs.SEEK_SET: if where == fs.SEEK_SET:
...@@ -636,49 +636,51 @@ class _FTPFile(object): ...@@ -636,49 +636,51 @@ class _FTPFile(object):
new_pos = self.file_size + pos new_pos = self.file_size + pos
if new_pos < 0: if new_pos < 0:
raise ValueError("Can't seek before start of file") raise ValueError("Can't seek before start of file")
if self.conn is not None: if self.conn is not None:
self.conn.close() self.conn.close()
finally: finally:
self._lock.release() self._lock.release()
self.close() self.close()
self._lock.acquire() self._lock.acquire()
try: try:
self.ftp = self.ftpfs._open_ftp() self.ftp = self.ftpfs._open_ftp()
self.ftp.sendcmd('TYPE I') self.ftp.sendcmd('TYPE I')
self.ftp.sendcmd('REST %i' % (new_pos)) self.ftp.sendcmd('REST %i' % (new_pos))
self.__init__(self.ftpfs, self.ftp, _encode(self.path), self.mode) self.__init__(self.ftpfs, self.ftp, _encode(self.path), self.mode)
self.read_pos = new_pos self.read_pos = new_pos
finally: finally:
self._lock.release() self._lock.release()
#raise UnsupportedError('ftp seek') #raise UnsupportedError('ftp seek')
@synchronize @synchronize
def tell(self): def tell(self):
if 'r' in self.mode: if 'r' in self.mode:
return self.read_pos return self.read_pos
else: else:
return self.write_pos return self.write_pos
@synchronize @synchronize
def close(self): def close(self):
if self.conn is not None: if self.conn is not None:
self.conn.close() self.conn.close()
self.conn = None self.conn = None
self.ftp.voidresp() self.ftp.voidresp()
if self.ftp is not None: if self.ftp is not None:
self.ftp.close() self.ftp.close()
self.closed = True self.closed = True
if 'w' in self.mode or 'a' in self.mode:
self.ftpfs._on_file_written(self.path)
def __iter__(self): def __iter__(self):
return self.next() return self.next()
def next(self): def next(self):
""" Line iterator """ Line iterator
This isn't terribly efficient. It would probably be better to do This isn't terribly efficient. It would probably be better to do
a read followed by splitlines. a read followed by splitlines.
""" """
...@@ -686,13 +688,13 @@ class _FTPFile(object): ...@@ -686,13 +688,13 @@ class _FTPFile(object):
chars = [] chars = []
while True: while True:
char = self.read(1) char = self.read(1)
if not char: if not char:
yield ''.join(chars) yield ''.join(chars)
del chars[:] del chars[:]
break break
chars.append(char) chars.append(char)
if char in endings: if char in endings:
line = ''.join(chars) line = ''.join(chars)
del chars[:] del chars[:]
c = self.read(1) c = self.read(1)
if not char: if not char:
...@@ -702,23 +704,31 @@ class _FTPFile(object): ...@@ -702,23 +704,31 @@ class _FTPFile(object):
yield line + c yield line + c
else: else:
yield line yield line
chars.append(c) chars.append(c)
def ftperrors(f): def ftperrors(f):
@wraps(f) @wraps(f)
def deco(self, *args, **kwargs): def deco(self, *args, **kwargs):
self._lock.acquire()
try: try:
ret = f(self, *args, **kwargs) self._enter_dircache()
except Exception, e: try:
#import traceback try:
#traceback.print_exc() ret = f(self, *args, **kwargs)
self._translate_exception(args[0] if args else '', e) except Exception, e:
self._translate_exception(args[0] if args else '', e)
finally:
self._leave_dircache()
finally:
self._lock.release()
if not self.use_dircache:
self.clear_dircache()
return ret return ret
return deco return deco
def _encode(s): def _encode(s):
if isinstance(s, unicode): if isinstance(s, unicode):
return s.encode('utf-8') return s.encode('utf-8')
...@@ -726,12 +736,12 @@ def _encode(s): ...@@ -726,12 +736,12 @@ def _encode(s):
class FTPFS(FS): class FTPFS(FS):
_locals = threading.local() _locals = threading.local()
def __init__(self, host='', user='', passwd='', acct='', timeout=_GLOBAL_DEFAULT_TIMEOUT, def __init__(self, host='', user='', passwd='', acct='', timeout=_GLOBAL_DEFAULT_TIMEOUT,
port=21, port=21,
dircache=False, dircache=True,
max_buffer_size=128*1024*1024): max_buffer_size=128*1024*1024):
""" """
:param host: :param host:
...@@ -739,30 +749,124 @@ class FTPFS(FS): ...@@ -739,30 +749,124 @@ class FTPFS(FS):
:param passwd: :param passwd:
:param timeout: :param timeout:
:param dircache: If True then directory information will be cached, :param dircache: If True then directory information will be cached,
which will speed up operations such as isdir and isfile, but changes which will speed up operations such as getinfo, isdir, isfile, but changes
to the ftp file structure will not be visible (till clear_dircache) is to the ftp file structure will not be visible until clear_dircache is
called called
:param max_buffer_size: Number of bytes to hold before blocking write operations. :param max_buffer_size: Number of bytes to hold before blocking write operations.
""" """
super(FTPFS, self).__init__() super(FTPFS, self).__init__()
self.host = host self.host = host
self.port = port self.port = port
self.user = user self.user = user
self.passwd = passwd self.passwd = passwd
self.acct = acct self.acct = acct
self.timeout = timeout self.timeout = timeout
self._dircache = {}
self.use_dircache = dircache self.use_dircache = dircache
self.get_dircache()
self.max_buffer_size = max_buffer_size self.max_buffer_size = max_buffer_size
self._locals._ftp = None self._cache_hint = False
self._locals._ftp = None
self._thread_ftps = set() self._thread_ftps = set()
self.ftp self.ftp
@synchronize
def cache_hint(self, enabled):
    """Store the caller's caching recommendation on this filesystem.

    The value is kept in ``_cache_hint``; ``_leave_dircache`` reads it to
    decide whether the directory cache is cleared once the cache count
    drops back to zero.

    :param enabled: If True, cached directory information may be kept
    """
    self._cache_hint = enabled
@synchronize
def _enter_dircache(self):
    """Make sure the thread-local directory cache exists and bump its
    thread-local use count (``_locals._dircache_count``) by one.
    """
    self.get_dircache()
    thread_state = self._locals
    # Default to 0 in case the counter has not been set on this thread yet.
    thread_state._dircache_count = getattr(thread_state, '_dircache_count', 0) + 1
@synchronize
def _leave_dircache(self):
    """Drop the thread-local use count by one.

    When the count reaches zero and no cache hint is set, the directory
    cache is cleared. The count is then asserted to be non-negative.
    """
    thread_state = self._locals
    thread_state._dircache_count -= 1
    if not (thread_state._dircache_count or self._cache_hint):
        self.clear_dircache()
    assert thread_state._dircache_count >= 0, "dircache count should never be negative"
@synchronize
def get_dircache(self):
    """Return the thread-local directory cache dict, creating it on first use.

    When the cache is created the thread-local use count
    (``_locals._dircache_count``) is initialised to zero alongside it.

    :rtype: dict
    """
    existing = getattr(self._locals, '_dircache', None)
    if existing is not None:
        return existing
    fresh = {}
    self._locals._dircache = fresh
    self._locals._dircache_count = 0
    return fresh
@synchronize
def _on_file_written(self, path):
    """Discard the cached listing of the directory that contains ``path``.

    Called by ``_FTPFile.close`` for files opened in a 'w' or 'a' mode.

    :param path: Path of the file that was written
    """
    parent_dir = dirname(path)
    self.clear_dircache(parent_dir)
@synchronize
def _readdir(self, path):
    """Return a dict mapping entry name to parsed listing info for ``path``.

    A cached listing is returned only while the thread-local use count
    (``_locals._dircache_count``) is non-zero; otherwise the directory is
    listed over FTP. The fresh listing is always stored in the cache.

    :param path: Directory to list
    :rtype: dict of name -> info dict
    """
    cache = self.get_dircache()
    if self._locals._dircache_count:
        hit = cache.get(path)
        if hit is not None:
            return hit

    entries = {}
    line_parser = FTPListDataParser()

    def collect(raw_line):
        # Lines that are not already unicode are decoded as UTF-8 before parsing.
        if isinstance(raw_line, unicode):
            text = raw_line
        else:
            text = raw_line.decode('utf-8')
        parsed = line_parser.parse_line(text)
        if parsed:
            fields = parsed.__dict__
            entries[fields['name']] = fields

    try:
        self.ftp.dir(_encode(path), collect)
    except error_reply:
        # An unexpected reply is ignored; whatever was collected is kept.
        pass
    cache[path] = entries
    return entries
@synchronize
def clear_dircache(self, *paths):
    """
    Clear cached directory information.

    :param paths: Paths of directories to clear cached listings for; when
        no paths are given the whole cache is cleared. Paths that are not
        in the cache are ignored.
    """
    cache = self.get_dircache()
    if paths:
        for stale_path in paths:
            cache.pop(stale_path, None)
    else:
        cache.clear()
@synchronize
def _check_path(self, path):
    """Return the parent directory listing and leaf name for ``path``,
    checking that the leaf is present in the listing.

    :param path: Path to check
    :returns: (dirlist, fname) tuple
    :raises ResourceNotFoundError: If the path has a non-empty final
        component that is missing from its parent's listing
    """
    parent, leaf = pathsplit(abspath(path))
    listing = self._readdir(parent)
    # An empty leaf is not looked up in the listing.
    if leaf:
        if leaf not in listing:
            raise ResourceNotFoundError(path)
    return listing, leaf
def _get_dirlist(self, path):
    """Return the parent directory listing and leaf name for ``path``.

    Unlike ``_check_path`` this does not raise when the leaf is missing.

    :param path: Path to look up
    :returns: (dirlist, fname) tuple
    """
    parent, leaf = pathsplit(abspath(path))
    return self._readdir(parent), leaf
@synchronize @synchronize
def get_ftp(self): def get_ftp(self):
...@@ -771,6 +875,7 @@ class FTPFS(FS): ...@@ -771,6 +875,7 @@ class FTPFS(FS):
ftp = self._locals._ftp ftp = self._locals._ftp
self._thread_ftps.add(ftp) self._thread_ftps.add(ftp)
return self._locals._ftp return self._locals._ftp
@synchronize
def set_ftp(self, ftp): def set_ftp(self, ftp):
self._locals._ftp = ftp self._locals._ftp = ftp
ftp = property(get_ftp, set_ftp) ftp = property(get_ftp, set_ftp)
...@@ -780,11 +885,11 @@ class FTPFS(FS): ...@@ -780,11 +885,11 @@ class FTPFS(FS):
try: try:
ftp = FTP() ftp = FTP()
ftp.connect(self.host, self.port, self.timeout) ftp.connect(self.host, self.port, self.timeout)
ftp.login(self.user, self.passwd, self.acct) ftp.login(self.user, self.passwd, self.acct)
except socket_error, e: except socket_error, e:
raise RemoteConnectionError(str(e), details=e) raise RemoteConnectionError(str(e), details=e)
return ftp return ftp
def __getstate__(self): def __getstate__(self):
state = super(FTPFS, self).__getstate__() state = super(FTPFS, self).__getstate__()
del state["_thread_ftps"] del state["_thread_ftps"]
...@@ -792,33 +897,33 @@ class FTPFS(FS): ...@@ -792,33 +897,33 @@ class FTPFS(FS):
def __setstate__(self,state): def __setstate__(self,state):
super(FTPFS, self).__setstate__(state) super(FTPFS, self).__setstate__(state)
self._thread_ftps = set() self._thread_ftps = set()
self.ftp self.ftp
def __str__(self): def __str__(self):
return '<FTPFS %s>' % self.host return '<FTPFS %s>' % self.host
def __unicode__(self): def __unicode__(self):
return u'<FTPFS %s>' % self.host return u'<FTPFS %s>' % self.host
@convert_os_errors @convert_os_errors
def _translate_exception(self, path, exception): def _translate_exception(self, path, exception):
""" Translates exceptions that my be thrown by the ftp code in to """ Translates exceptions that my be thrown by the ftp code in to
FS exceptions FS exceptions
TODO: Flesh this out with more specific exceptions TODO: Flesh this out with more specific exceptions
""" """
if isinstance(exception, socket_error): if isinstance(exception, socket_error):
raise RemoteConnectionError(str(exception), details=exception) raise RemoteConnectionError(str(exception), details=exception)
elif isinstance(exception, error_temp): elif isinstance(exception, error_temp):
code, message = str(exception).split(' ', 1) code, message = str(exception).split(' ', 1)
raise RemoteConnectionError(str(exception), path=path, msg="FTP error: %s (see details)" % str(exception), details=exception) raise RemoteConnectionError(str(exception), path=path, msg="FTP error: %s (see details)" % str(exception), details=exception)
elif isinstance(exception, error_perm): elif isinstance(exception, error_perm):
code, message = str(exception).split(' ', 1) code, message = str(exception).split(' ', 1)
code = int(code) code = int(code)
...@@ -826,91 +931,35 @@ class FTPFS(FS): ...@@ -826,91 +931,35 @@ class FTPFS(FS):
raise ResourceNotFoundError(path) raise ResourceNotFoundError(path)
raise PermissionDeniedError(str(exception), path=path, msg="FTP error: %s (see details)" % str(exception), details=exception) raise PermissionDeniedError(str(exception), path=path, msg="FTP error: %s (see details)" % str(exception), details=exception)
raise exception raise exception
@ftperrors @ftperrors
@synchronize
def close(self): def close(self):
for ftp in self._thread_ftps: for ftp in self._thread_ftps:
ftp.close() ftp.close()
self._thread_ftps.clear()
self.closed = True self.closed = True
@ftperrors @ftperrors
@synchronize
def open(self, path, mode='r'): def open(self, path, mode='r'):
mode = mode.lower()
if 'r' in mode: if 'r' in mode:
if not self.isfile(path): if not self.isfile(path):
raise ResourceNotFoundError(path) raise ResourceNotFoundError(path)
if 'w' in mode or 'a' in mode:
self.clear_dircache(dirname(path))
ftp = self._open_ftp() ftp = self._open_ftp()
f = _FTPFile(self, ftp, path, mode) f = _FTPFile(self, ftp, path, mode)
return f return f
@synchronize
def _readdir(self, path):
if self.use_dircache:
cached_dirlist = self._dircache.get(path)
if cached_dirlist is not None:
return cached_dirlist
dirlist = {}
parser = FTPListDataParser()
def on_line(line):
#print repr(line)
if not isinstance(line, unicode):
line = line.decode('utf-8')
info = parser.parse_line(line)
if info:
info = info.__dict__
dirlist[info['name']] = info
try:
self.ftp.dir(_encode(path), on_line)
except error_reply:
pass
self._dircache[path] = dirlist
return dirlist
@synchronize
def clear_dircache(self, path=None):
"""
Clear cached directory information.
:path: Path of directory to clear cache for, or all directories if
None (the default)
"""
if path is None:
self._dircache.clear()
if path in self._dircache:
del self._dircache[path]
@synchronize
@ftperrors
def _check_path(self, path, ignore_missing=False):
base, fname = pathsplit(abspath(path))
dirlist = self._readdir(base)
if fname and fname not in dirlist:
raise ResourceNotFoundError(path)
return dirlist, fname
def _get_dirlist(self, path):
base, fname = pathsplit(abspath(path))
dirlist = self._readdir(base)
return dirlist, fname
@synchronize
@ftperrors @ftperrors
def exists(self, path): def exists(self, path):
if path in ('', '/'): if path in ('', '/'):
return True return True
dirlist, fname = self._get_dirlist(path) dirlist, fname = self._get_dirlist(path)
return fname in dirlist return fname in dirlist
@synchronize
@ftperrors @ftperrors
def isdir(self, path): def isdir(self, path):
if path in ('', '/'): if path in ('', '/'):
...@@ -920,20 +969,18 @@ class FTPFS(FS): ...@@ -920,20 +969,18 @@ class FTPFS(FS):
if info is None: if info is None:
return False return False
return info['try_cwd'] return info['try_cwd']
@synchronize
@ftperrors @ftperrors
def isfile(self, path): def isfile(self, path):
if path in ('', '/'): if path in ('', '/'):
return False return False
dirlist, fname = self._get_dirlist(path) dirlist, fname = self._get_dirlist(path)
info = dirlist.get(fname) info = dirlist.get(fname)
if info is None: if info is None:
return False return False
return not info['try_cwd'] return not info['try_cwd']
@ftperrors @ftperrors
@synchronize
def listdir(self, path="./", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False): def listdir(self, path="./", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False):
path = normpath(path) path = normpath(path)
if not self.exists(path): if not self.exists(path):
...@@ -941,21 +988,21 @@ class FTPFS(FS): ...@@ -941,21 +988,21 @@ class FTPFS(FS):
if not self.isdir(path): if not self.isdir(path):
raise ResourceInvalidError(path) raise ResourceInvalidError(path)
paths = self._readdir(path).keys() paths = self._readdir(path).keys()
return self._listdir_helper(path, paths, wildcard, full, absolute, dirs_only, files_only) return self._listdir_helper(path, paths, wildcard, full, absolute, dirs_only, files_only)
@ftperrors @ftperrors
@synchronize
def makedir(self, path, recursive=False, allow_recreate=False): def makedir(self, path, recursive=False, allow_recreate=False):
if path in ('', '/'): if path in ('', '/'):
return return
def checkdir(path): def checkdir(path):
self.clear_dircache(dirname(path), path)
try: try:
self.ftp.mkd(_encode(path)) self.ftp.mkd(_encode(path))
except error_reply: except error_reply:
return return
except error_perm, e: except error_perm, e:
if recursive or allow_recreate: if recursive or allow_recreate:
return return
if str(e).split(' ', 1)[0]=='550': if str(e).split(' ', 1)[0]=='550':
...@@ -966,38 +1013,37 @@ class FTPFS(FS): ...@@ -966,38 +1013,37 @@ class FTPFS(FS):
for p in recursepath(path): for p in recursepath(path):
checkdir(p) checkdir(p)
else: else:
base, dirname = pathsplit(path) base = dirname(path)
if not self.exists(base): if not self.exists(base):
raise ParentDirectoryMissingError(path) raise ParentDirectoryMissingError(path)
if not allow_recreate: if not allow_recreate:
if self.exists(path): if self.exists(path):
if self.isfile(path): if self.isfile(path):
raise ResourceInvalidError(path) raise ResourceInvalidError(path)
raise DestinationExistsError(path) raise DestinationExistsError(path)
checkdir(path) checkdir(path)
@ftperrors @ftperrors
@synchronize
def remove(self, path): def remove(self, path):
if not self.exists(path): if not self.exists(path):
raise ResourceNotFoundError(path) raise ResourceNotFoundError(path)
if not self.isfile(path): if not self.isfile(path):
raise ResourceInvalidError(path) raise ResourceInvalidError(path)
self.clear_dircache(dirname(path))
self.ftp.delete(_encode(path)) self.ftp.delete(_encode(path))
@ftperrors @ftperrors
@synchronize
def removedir(self, path, recursive=False, force=False): def removedir(self, path, recursive=False, force=False):
if not self.exists(path): if not self.exists(path):
raise ResourceNotFoundError(path) raise ResourceNotFoundError(path)
if self.isfile(path): if self.isfile(path):
raise ResourceInvalidError(path) raise ResourceInvalidError(path)
if not force: if not force:
for checkpath in self.listdir(path): for checkpath in self.listdir(path):
raise DirectoryNotEmptyError(path) raise DirectoryNotEmptyError(path)
try: try:
if force: if force:
for rpath in self.listdir(path, full=True): for rpath in self.listdir(path, full=True):
...@@ -1008,6 +1054,7 @@ class FTPFS(FS): ...@@ -1008,6 +1054,7 @@ class FTPFS(FS):
self.removedir(rpath, force=force) self.removedir(rpath, force=force)
except FSError: except FSError:
pass pass
self.clear_dircache(dirname(path), path)
self.ftp.rmd(_encode(path)) self.ftp.rmd(_encode(path))
except error_reply: except error_reply:
pass pass
...@@ -1016,17 +1063,16 @@ class FTPFS(FS): ...@@ -1016,17 +1063,16 @@ class FTPFS(FS):
self.removedir(dirname(path), recursive=True) self.removedir(dirname(path), recursive=True)
except DirectoryNotEmptyError: except DirectoryNotEmptyError:
pass pass
@ftperrors @ftperrors
@synchronize
def rename(self, src, dst): def rename(self, src, dst):
self.clear_dircache(dirname(src), dirname(dst), src, dst)
try: try:
self.ftp.rename(_encode(src), _encode(dst)) self.ftp.rename(_encode(src), _encode(dst))
except error_reply: except error_reply:
pass pass
@ftperrors @ftperrors
@synchronize
def getinfo(self, path): def getinfo(self, path):
dirlist, fname = self._check_path(path) dirlist, fname = self._check_path(path)
if not fname: if not fname:
...@@ -1035,76 +1081,93 @@ class FTPFS(FS): ...@@ -1035,76 +1081,93 @@ class FTPFS(FS):
info['modified_time'] = datetime.datetime.fromtimestamp(info['mtime']) info['modified_time'] = datetime.datetime.fromtimestamp(info['mtime'])
info['created_time'] = info['modified_time'] info['created_time'] = info['modified_time']
return info return info
@ftperrors @ftperrors
@synchronize def getsize(self, path):
def getsize(self, path):
size = None
if self._locals._dircache_count:
dirlist, fname = self._check_path(path)
size = dirlist[fname].get('size')
if size is not None:
return size
self.ftp.sendcmd('TYPE I') self.ftp.sendcmd('TYPE I')
size = self.ftp.size(_encode(path)) size = self.ftp.size(_encode(path))
if size is None: if size is None:
dirlist, fname = self._check_path(path) dirlist, fname = self._check_path(path)
size = dirlist[fname].get('size') size = dirlist[fname].get('size')
if size is None: if size is None:
raise OperationFailedError('getsize', path) raise OperationFailedError('getsize', path)
return size return size
@ftperrors @ftperrors
@synchronize
def desc(self, path): def desc(self, path):
dirlist, fname = self._check_path(path) dirlist, fname = self._check_path(path)
if fname not in dirlist: if fname not in dirlist:
raise ResourceNotFoundError(path) raise ResourceNotFoundError(path)
return dirlist[fname].get('raw_line', 'No description available') return dirlist[fname].get('raw_line', 'No description available')
@ftperrors @ftperrors
@synchronize
def move(self, src, dst, overwrite=False, chunk_size=16384): def move(self, src, dst, overwrite=False, chunk_size=16384):
if not overwrite and self.exists(dst): if not overwrite and self.exists(dst):
raise DestinationExistsError(dst) raise DestinationExistsError(dst)
try: self.clear_dircache(dirname(src), dirname(dst))
try:
self.rename(src, dst) self.rename(src, dst)
except error_reply: except error_reply:
pass pass
except: except:
self.copy(src, dst) self.copy(src, dst)
self.remove(src) self.remove(src)
@ftperrors
def movedir(self, src, dst, overwrite=False, ignore_errors=False, chunk_size=16384):
self.clear_dircache(src, dst, dirname(src), dirname(dst))
super(FTPFS, self).movedir(src, dst, overwrite, ignore_errors, chunk_size)
@ftperrors
def copydir(self, src, dst, overwrite=False, ignore_errors=False, chunk_size=16384):
self.clear_dircache(src, dst, dirname(src), dirname(dst))
super(FTPFS, self).copydir(src, dst, overwrite, ignore_errors, chunk_size)
if __name__ == "__main__": if __name__ == "__main__":
ftp_fs = FTPFS('ftp.ncsa.uiuc.edu') ftp_fs = FTPFS('ftp.ncsa.uiuc.edu')
#from fs.browsewin import browse ftp_fs.cache_hint(True)
#browse(ftp_fs) from fs.browsewin import browse
browse(ftp_fs)
ftp_fs = FTPFS('127.0.0.1', 'user', '12345', dircache=True)
#ftp_fs = FTPFS('127.0.0.1', 'user', '12345', dircache=True)
#f = ftp_fs.open('testout.txt', 'w') #f = ftp_fs.open('testout.txt', 'w')
#f.write("Testing writing to an ftp file!") #f.write("Testing writing to an ftp file!")
#f.write("\nHai!") #f.write("\nHai!")
#f.close() #f.close()
#ftp_fs.createfile(u"\N{GREEK CAPITAL LETTER KAPPA}", 'unicode!') #ftp_fs.createfile(u"\N{GREEK CAPITAL LETTER KAPPA}", 'unicode!')
#kappa = u"\N{GREEK CAPITAL LETTER KAPPA}" #kappa = u"\N{GREEK CAPITAL LETTER KAPPA}"
#ftp_fs.makedir(kappa) #ftp_fs.makedir(kappa)
#print repr(ftp_fs.listdir()) #print repr(ftp_fs.listdir())
#print repr(ftp_fs.listdir()) #print repr(ftp_fs.listdir())
#ftp_fs.makedir('a/b/c/d', recursive=True) #ftp_fs.makedir('a/b/c/d', recursive=True)
#print ftp_fs.getsize('/testout.txt') #print ftp_fs.getsize('/testout.txt')
#print f.read() #print f.read()
#for p in ftp_fs: #for p in ftp_fs:
# print p # print p
#from fs.utils import print_fs #from fs.utils import print_fs
#print_fs(ftp_fs) #print_fs(ftp_fs)
#print ftp_fs.getsize('test.txt') #print ftp_fs.getsize('test.txt')
from fs.browsewin import browse #from fs.browsewin import browse
browse(ftp_fs) #browse(ftp_fs)
\ No newline at end of file
...@@ -22,57 +22,53 @@ from fs import ftpfs ...@@ -22,57 +22,53 @@ from fs import ftpfs
ftp_port = 30000 ftp_port = 30000
class TestFTPFS(unittest.TestCase, FSTestCases, ThreadingTestCases): class TestFTPFS(unittest.TestCase, FSTestCases, ThreadingTestCases):
def setUp(self): def setUp(self):
global ftp_port global ftp_port
#ftp_port += 1 #ftp_port += 1
use_port = str(ftp_port) use_port = str(ftp_port)
#ftp_port = 10000 #ftp_port = 10000
sys.setcheckinterval(1) sys.setcheckinterval(1)
self.temp_dir = tempfile.mkdtemp(u"ftpfstests") self.temp_dir = tempfile.mkdtemp(u"ftpfstests")
self.ftp_server = subprocess.Popen(['python', abspath(__file__), self.temp_dir, str(use_port)]) self.ftp_server = subprocess.Popen(['python', abspath(__file__), self.temp_dir, str(use_port)])
# Need to sleep to allow ftp server to start # Need to sleep to allow ftp server to start
time.sleep(.2) time.sleep(.2)
self.fs = ftpfs.FTPFS('127.0.0.1', 'user', '12345', port=use_port, timeout=5.0) self.fs = ftpfs.FTPFS('127.0.0.1', 'user', '12345', port=use_port, timeout=5.0)
def tearDown(self): def tearDown(self):
if sys.platform == 'win32': if sys.platform == 'win32':
import win32api import win32api
win32api.TerminateProcess(int(process._handle), -1) win32api.TerminateProcess(int(process._handle), -1)
else: else:
os.system('kill '+str(self.ftp_server.pid)) os.system('kill '+str(self.ftp_server.pid))
shutil.rmtree(self.temp_dir) shutil.rmtree(self.temp_dir)
def check(self, p): def check(self, p):
return os.path.exists(os.path.join(self.temp_dir, relpath(p))) return os.path.exists(os.path.join(self.temp_dir, relpath(p)))
if __name__ == "__main__": if __name__ == "__main__":
# Run an ftp server that exposes a given directory # Run an ftp server that exposes a given directory
import sys import sys
authorizer = ftpserver.DummyAuthorizer() authorizer = ftpserver.DummyAuthorizer()
authorizer.add_user("user", "12345", sys.argv[1], perm="elradfmw") authorizer.add_user("user", "12345", sys.argv[1], perm="elradfmw")
authorizer.add_anonymous(sys.argv[1]) authorizer.add_anonymous(sys.argv[1])
def nolog(*args): def nolog(*args):
pass pass
ftpserver.log = nolog ftpserver.log = nolog
ftpserver.logline = nolog ftpserver.logline = nolog
handler = ftpserver.FTPHandler handler = ftpserver.FTPHandler
handler.authorizer = authorizer handler.authorizer = authorizer
address = ("127.0.0.1", int(sys.argv[2])) address = ("127.0.0.1", int(sys.argv[2]))
#print address #print address
ftpd = ftpserver.FTPServer(address, handler)
ftpd.serve_forever()
ftpd = ftpserver.FTPServer(address, handler)
ftpd.serve_forever()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment