Commit 7f1e44a3 by willmcgugan

Added fsmount command, made memroyfs work with threads

parent 91093534
......@@ -63,4 +63,8 @@
* Separated behaviour of setcontents and createfile
* Added a getmmap to base
* Added command line scripts fsls, fstree, fscat, fscp, fsmv
* Added command line scripts fsmkdir, fsmount
* Made automatically pick up keys if no other authentication is available
* Optimized listdir and listdirinfo in SFTPFS
* Made memoryfs work with threads
......@@ -149,9 +149,11 @@ class FS(object):
:param thread_synconize: If True, a lock object will be created for the object, otherwise a dummy lock will be used.
:type thread_synchronize: bool
"""
super(FS, self).__init__()
self.closed = False
self.thread_synchronize = thread_synchronize
if thread_synchronize:
self._lock = threading.RLock()
else:
......@@ -188,9 +190,10 @@ class FS(object):
return state
def __setstate__(self,state):
for (k,v) in state.iteritems():
self.__dict__[k] = v
lock = state.get("_lock",None)
self.__dict__.update(state)
#for (k,v) in state.iteritems():
# self.__dict__[k] = v
lock = state.get("_lock")
if lock is not None:
if lock:
self._lock = threading.RLock()
......
#!/usr/bin/env python
import sys
from fs.commands.fsmount import run
sys.exit(run())
#!/usr/bin/env python
from fs.opener import opener
from fs.commands.runner import Command
import sys
import platform
import os
import os.path
import time
class FSMount(Command):
usage = """fsmount [SYSTEM PATH] [FS]
or fsmount -u [SYSTEM PATH]
Mounts a file system on a system path"""
version = "1.0"
def get_optparse(self):
optparse = super(FSMount, self).get_optparse()
optparse.add_option('-f', '--foreground', dest='foreground', action="store_true", default=False,
help="run the mount process in the foreground", metavar="FOREGROUND")
optparse.add_option('-u', '--unmount', dest='unmount', action="store_true", default=False,
help="unmount path", metavar="UNMOUNT")
return optparse
def do_run(self, options, args):
if options.unmount:
try:
mount_path = args[0]
except IndexError:
self.error('Mount path required\n')
return 1
from fs.expose import fuse
fuse.unmount(mount_path)
return
try:
mount_path = args[0]
except IndexError:
self.error('Mount path required\n')
return 1
try:
fs_url = args[1]
except IndexError:
self.error('FS required\n')
return 1
if platform.system() == 'Windows':
pass
else:
fs, path = self.open_fs(fs_url, create=True)
if path:
if not fs.isdir(path):
self.error('%s is not a directory on %s' % (fs_url. fs))
return 1
fs = fs.opendir(path)
path = '/'
if not os.path.exists(mount_path):
os.makedirs(mount_path)
from fs.expose import fuse
if options.foreground:
fuse_process = fuse.mount(fs,
mount_path,
foreground=True)
else:
mp = fuse.mount(fs,
mount_path,
foreground=False)
def run():
return FSMount().run()
if __name__ == "__main__":
sys.exit(run())
\ No newline at end of file
......@@ -245,11 +245,14 @@ class FSOperations(Operations):
@handle_fs_errors
def readdir(self, path, fh=None):
path = path.decode(NATIVE_ENCODING)
entries = []
entries = ['.', '..']
#print
#print self.fs
for (nm,info) in self.fs.listdirinfo(path):
#print "*", repr(nm), info
self._fill_stat_dict(pathjoin(path,nm),info)
entries.append((nm.encode(NATIVE_ENCODING),info,0))
entries = [".",".."] + entries
entries.append((nm.encode(NATIVE_ENCODING),info,0))
#print
return entries
@handle_fs_errors
......@@ -472,7 +475,7 @@ def unmount(path):
return
if "not found" in stderr:
return
raise OSError("filesystem could not be unmounted: %s (%s) " % (path,stderr,))
raise OSError("filesystem could not be unmounted: %s (%s) " % (path, str(stderr).rstrip(),))
class MountProcess(subprocess.Popen):
......
......@@ -17,11 +17,9 @@ from fs.remote import RemoteFileBuffer
from ftplib import FTP, error_perm, error_temp, error_proto, error_reply
try:
from ftplib import _GLOBAL_DEFAULT_TIMEOUT
_FTPLIB_TIMEOUT = True
from ftplib import _GLOBAL_DEFAULT_TIMEOUT
except ImportError:
_GLOBAL_DEFAULT_TIMEOUT = None
_FTPLIB_TIMEOUT = False
_GLOBAL_DEFAULT_TIMEOUT = object()
import threading
from time import sleep
......@@ -934,7 +932,7 @@ class FTPFS(FS):
def _open_ftp(self):
try:
ftp = FTP()
if _FTPLIB_TIMEOUT:
if self.timeout is not _GLOBAL_DEFAULT_TIMEOUT:
ftp.connect(self.host, self.port, self.timeout)
else:
ftp.connect(self.host, self.port)
......
......@@ -17,6 +17,8 @@ from fs.base import *
from fs.errors import *
from fs import _thread_synchronize_default
from fs.filelike import StringIO
from os import SEEK_END
import threading
def _check_mode(mode, mode_chars):
......@@ -25,47 +27,46 @@ def _check_mode(mode, mode_chars):
return False
return True
class MemoryFile(object):
def __init__(self, path, memory_fs, value, mode):
def seek_and_lock(f):
def deco(self, *args, **kwargs):
try:
self.lock.acquire()
self.mem_file.seek(self.pos)
ret = f(self, *args, **kwargs)
self.pos = self.mem_file.tell()
return ret
finally:
self.lock.release()
return deco
def __init__(self, path, memory_fs, mem_file, mode, lock):
self.closed = False
self.path = path
self.memory_fs = memory_fs
self.mode = mode
value = value or ''
self.mem_file = None
if '+' in mode:
self.mem_file = StringIO()
self.mem_file.write(value)
self.mem_file.seek(0)
elif _check_mode(mode, 'wa'):
self.mem_file = StringIO()
self.mem_file.write(value)
elif _check_mode(mode, 'w'):
self.mem_file = StringIO()
elif _check_mode(mode, 'ra'):
self.mem_file = StringIO()
self.mem_file.write(value)
elif _check_mode(mode, 'r'):
self.mem_file = StringIO(value)
self.mem_file.seek(0)
elif _check_mode(mode, "a"):
self.mem_file = StringIO()
self.mem_file.write(value)
else:
if value:
self.mem_file = StringIO(value)
else:
self.mem_file = StringIO()
self.memory_fs = memory_fs
self.mem_file = mem_file
self.mode = mode
self.lock = lock
self.pos = 0
if _check_mode(mode, 'a'):
lock.acquire()
try:
self.mem_file.seek(0, SEEK_END)
self.pos = self.mem_file.tell()
finally:
lock.release()
if _check_mode(mode, 'w'):
lock.acquire()
try:
self.mem_file.seek(0)
self.mem_file.truncate()
finally:
lock.release()
assert self.mem_file is not None, "self.mem_file should have a value"
......@@ -82,44 +83,57 @@ class MemoryFile(object):
self.close()
def flush(self):
value = self.mem_file.getvalue()
self.memory_fs._on_flush_memory_file(self.path, value)
pass
def __iter__(self):
return iter(self.mem_file)
return self
def next(self):
@seek_and_lock
def next(self):
return self.mem_file.next()
@seek_and_lock
def readline(self, *args, **kwargs):
return self.mem_file.readline(*args, **kwargs)
#@seek_and_lock
def close(self):
if not self.closed and self.mem_file is not None:
value = self.mem_file.getvalue()
self.memory_fs._on_close_memory_file(self, self.path, value)
self.mem_file.close()
self.closed = True
self.memory_fs._on_close_memory_file(self, self.path)
self.closed = True
@seek_and_lock
def read(self, size=None):
if size is None:
size = -1
return self.mem_file.read(size)
def seek(self, *args, **kwargs):
@seek_and_lock
def seek(self, *args, **kwargs):
return self.mem_file.seek(*args, **kwargs)
@seek_and_lock
def tell(self):
return self.mem_file.tell()
return self.pos
@seek_and_lock
def truncate(self, *args, **kwargs):
return self.mem_file.truncate(*args, **kwargs)
def write(self, data):
#@seek_and_lock
def write(self, data):
self.memory_fs._on_modify_memory_file(self.path)
return self.mem_file.write(data)
def writelines(self, *args, **kwargs):
self.lock.acquire()
try:
self.mem_file.seek(self.pos)
self.mem_file.write(data)
self.pos = self.mem_file.tell()
finally:
self.lock.release()
@seek_and_lock
def writelines(self, *args, **kwargs):
return self.mem_file.writelines(*args, **kwargs)
def __enter__(self):
......@@ -132,6 +146,18 @@ class MemoryFile(object):
class DirEntry(object):
def sync(f):
def deco(self, *args, **kwargs):
if self.lock is not None:
try:
self.lock.acquire()
return f(self, *args, **kwargs)
finally:
self.lock.release()
else:
return f(self, *args, **kwargs)
return deco
def __init__(self, type, name, contents=None):
assert type in ("dir", "file"), "Type must be dir or file!"
......@@ -143,27 +169,32 @@ class DirEntry(object):
contents = {}
self.open_files = []
self.contents = contents
self.data = None
self.locks = 0
self.contents = contents
self.mem_file = None
self.created_time = datetime.datetime.now()
self.modified_time = self.created_time
self.accessed_time = self.created_time
self.xattrs = {}
def lock(self):
self.locks += 1
def unlock(self):
self.locks -= 1
assert self.locks >=0, "Lock / Unlock mismatch!"
self.lock = None
if self.type == 'file':
self.mem_file = StringIO()
self.lock = threading.RLock()
def get_value(self):
self.lock.acquire()
try:
return self.mem_file.getvalue()
finally:
self.lock.release()
data = property(get_value)
def desc_contents(self):
if self.isfile():
return "<file %s>" % self.name
elif self.isdir():
return "<dir %s>" % "".join( "%s: %s"% (k, v.desc_contents()) for k, v in self.contents.iteritems())
return "<dir %s>" % "".join( "%s: %s" % (k, v.desc_contents()) for k, v in self.contents.iteritems())
def isdir(self):
return self.type == "dir"
......@@ -171,12 +202,27 @@ class DirEntry(object):
def isfile(self):
return self.type == "file"
def islocked(self):
return self.locks > 0
def __str__(self):
return "%s: %s" % (self.name, self.desc_contents())
@sync
def __getstate__(self):
state = self.__dict__.copy()
state.pop('lock')
if self.mem_file is not None:
state['mem_file'] = self.data
return state
def __setstate__(self, state):
self.__dict__.update(state)
if self.type == 'file':
self.lock = threading.RLock()
else:
self.lock = None
if self.mem_file is not None:
data = self.mem_file
self.mem_file = StringIO()
self.mem_file.write(data)
class MemoryFS(FS):
......@@ -207,7 +253,7 @@ class MemoryFS(FS):
if not callable(self.file_factory):
raise ValueError("file_factory should be callable")
self.root = self._make_dir_entry('dir', 'root')
self.root = self._make_dir_entry('dir', 'root')
def __str__(self):
return "<MemoryFS>"
......@@ -219,6 +265,7 @@ class MemoryFS(FS):
@synchronize
def _get_dir_entry(self, dirpath):
dirpath = normpath(dirpath)
current_dir = self.root
for path_component in iteratepath(dirpath):
if current_dir.contents is None:
......@@ -247,27 +294,40 @@ class MemoryFS(FS):
@synchronize
def isdir(self, path):
dir_item = self._get_dir_entry(normpath(path))
path = normpath(path)
if path in ('', '/'):
return True
dir_item = self._get_dir_entry(path)
if dir_item is None:
return False
return dir_item.isdir()
@synchronize
def isfile(self, path):
dir_item = self._get_dir_entry(normpath(path))
path = normpath(path)
if path in ('', '/'):
return False
dir_item = self._get_dir_entry(path)
if dir_item is None:
return False
return dir_item.isfile()
@synchronize
def exists(self, path):
def exists(self, path):
path = normpath(path)
if path in ('', '/'):
return True
return self._get_dir_entry(path) is not None
@synchronize
def makedir(self, dirname, recursive=False, allow_recreate=False):
if not dirname and not allow_recreate:
raise PathError(dirname)
fullpath = dirname
fullpath = normpath(dirname)
if fullpath in ('', '/'):
if allow_recreate:
return
raise DestinationExistsError(dirname)
dirpath, dirname = pathsplit(dirname)
if recursive:
......@@ -318,25 +378,10 @@ class MemoryFS(FS):
parent_dir.contents[dirname] = self._make_dir_entry("dir", dirname)
@synchronize
def _orphan_files(self, file_dir_entry):
for f in file_dir_entry.open_files[:]:
f.close()
@synchronize
def _lock_dir_entry(self, path):
dir_entry = self._get_dir_entry(path)
dir_entry.lock()
@synchronize
def _unlock_dir_entry(self, path):
dir_entry = self._get_dir_entry(path)
dir_entry.unlock()
@synchronize
def _is_dir_locked(self, path):
dir_entry = self._get_dir_entry(path)
return dir_entry.islocked()
#@synchronize
#def _orphan_files(self, file_dir_entry):
# for f in file_dir_entry.open_files[:]:
# f.close()
@synchronize
def open(self, path, mode="r", **kwargs):
......@@ -353,14 +398,10 @@ class MemoryFS(FS):
file_dir_entry = parent_dir_entry.contents[filename]
if file_dir_entry.isdir():
raise ResourceInvalidError(path)
if 'a' in mode:
if file_dir_entry.islocked():
raise ResourceLockedError(path)
file_dir_entry.accessed_time = datetime.datetime.now()
self._lock_dir_entry(path)
mem_file = self.file_factory(path, self, file_dir_entry.data, mode)
mem_file = self.file_factory(path, self, file_dir_entry.mem_file, mode, file_dir_entry.lock)
file_dir_entry.open_files.append(mem_file)
return mem_file
......@@ -371,13 +412,9 @@ class MemoryFS(FS):
else:
file_dir_entry = parent_dir_entry.contents[filename]
if file_dir_entry.islocked():
raise ResourceLockedError(path)
file_dir_entry.accessed_time = datetime.datetime.now()
self._lock_dir_entry(path)
mem_file = self.file_factory(path, self, None, mode)
mem_file = self.file_factory(path, self, file_dir_entry.mem_file, mode, file_dir_entry.lock)
file_dir_entry.open_files.append(mem_file)
return mem_file
......@@ -391,10 +428,6 @@ class MemoryFS(FS):
if dir_entry is None:
raise ResourceNotFoundError(path)
if dir_entry.islocked():
self._orphan_files(dir_entry)
#raise ResourceLockedError(path)
if dir_entry.isdir():
raise ResourceInvalidError(path,msg="That's a directory, not a file: %(path)s")
......@@ -410,8 +443,6 @@ class MemoryFS(FS):
if dir_entry is None:
raise ResourceNotFoundError(path)
if dir_entry.islocked():
raise ResourceLockedError(path)
if not dir_entry.isdir():
raise ResourceInvalidError(path, msg="Can't remove resource, its not a directory: %(path)s" )
......@@ -471,17 +502,10 @@ class MemoryFS(FS):
return False
@synchronize
def _on_close_memory_file(self, open_file, path, value):
def _on_close_memory_file(self, open_file, path):
dir_entry = self._get_dir_entry(path)
if dir_entry is not None and value is not None:
dir_entry.data = value
dir_entry.open_files.remove(open_file)
self._unlock_dir_entry(path)
@synchronize
def _on_flush_memory_file(self, path, value):
dir_entry = self._get_dir_entry(path)
dir_entry.data = value
dir_entry.open_files.remove(open_file)
@synchronize
def _on_modify_memory_file(self, path):
......@@ -571,7 +595,7 @@ class MemoryFS(FS):
if dir_entry is None:
raise ResourceNotFoundError(path)
if not dir_entry.isfile():
raise ResourceInvalidError(path, msg="not a directory: %(path)s")
raise ResourceInvalidError(path, msg="not a file: %(path)s")
return dir_entry.data or ''
@synchronize
......@@ -584,7 +608,9 @@ class MemoryFS(FS):
dir_entry = self._get_dir_entry(path)
if not dir_entry.isfile():
raise ResourceInvalidError('Not a directory %(path)s', path)
dir_entry.data = data
new_mem_file = StringIO()
new_mem_file.write(data)
dir_entry.mem_file = new_mem_file
@synchronize
def setxattr(self, path, key, value):
......
......@@ -365,7 +365,7 @@ class TempOpener(Opener):
@classmethod
def get_fs(cls, registry, fs_name, fs_name_params, fs_path, writeable, create):
from fs.tempfs import TempFS
return TempFS(identifier=fs_name_params, temp_dir=fs_path, create=create), None
return TempFS(identifier=fs_name_params, temp_dir=fs_path), None
opener = OpenerRegistry([OSFSOpener,
......
......@@ -98,6 +98,7 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
super(OSFS, self).__init__(thread_synchronize=thread_synchronize)
self.encoding = encoding or sys.getfilesystemencoding()
self.dir_mode = dir_mode
root_path = os.path.expanduser(os.path.expandvars(root_path))
root_path = os.path.normpath(os.path.abspath(root_path))
# Enable long pathnames on win32
......@@ -319,4 +320,23 @@ class OSFS(OSFSXAttrMixin, OSFSWatchMixin, FS):
def getsize(self, path):
return self._stat(path).st_size
@convert_os_errors
def opendir(self, path):
"""A specialised opendir that returns another OSFS rather than a SubDir
This is more optimal than a SubDir because no path delegation is required.
"""
if path in ('', '/'):
return self
path = normpath(path)
if not self.exists(path):
raise ResourceNotFoundError(path)
sub_path = pathjoin(self.root_path, path)
return OSFS(sub_path,
thread_synchronize=self.thread_synchronize,
encoding=self.encoding,
create=False,
dir_mode=self.dir_mode)
......@@ -38,7 +38,7 @@ def re_raise_faults(func):
cls = _object_by_name(cls)
# Re-raise using the remainder of the fault code as message
if cls:
raise cls(msg=msg)
raise cls('', msg=msg)
raise f
except socket.error, e:
raise RemoteConnectionError(str(e), details=e)
......
......@@ -12,7 +12,8 @@ COMMANDS = ['fscat',
'fsrm',
'fsserve',
'fstree',
'fsmkdir']
'fsmkdir',
'fsmount']
classifiers = [
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment