Commit 56227fb8 by willmcgugan@gmail.com

Fix for xmlrpc

parent 2c113c7f
......@@ -41,23 +41,27 @@ class RPCFSInterface(object):
is base64-encoded UTF-8.
"""
if PY3:
return path
return path
return path.encode("utf8").encode("base64")
def decode_path(self, path):
"""Decode paths arriving over the wire."""
if PY3:
return path
return path.decode("base64").decode("utf8")
return path.decode("base64").decode("utf8")
def getmeta(self, meta_name):
meta = self.fs.getmeta(meta_name)
if isinstance(meta, basestring):
meta = meta.decode('base64')
return meta
def getmeta_default(self, meta_name, default):
meta = self.fs.getmeta(meta_name, default)
if isinstance(meta, basestring):
meta = meta.decode('base64')
return meta
def hasmeta(self, meta_name):
return self.fs.hasmeta(meta_name)
......@@ -98,7 +102,7 @@ class RPCFSInterface(object):
def removedir(self, path, recursive=False, force=False):
path = self.decode_path(path)
return self.fs.removedir(path, recursive, force)
def rename(self, src, dst):
src = self.decode_path(src)
dst = self.decode_path(dst)
......@@ -109,12 +113,12 @@ class RPCFSInterface(object):
if isinstance(accessed_time, xmlrpclib.DateTime):
accessed_time = datetime.strptime(accessed_time.value, "%Y%m%dT%H:%M:%S")
if isinstance(modified_time, xmlrpclib.DateTime):
modified_time = datetime.strptime(modified_time.value, "%Y%m%dT%H:%M:%S")
modified_time = datetime.strptime(modified_time.value, "%Y%m%dT%H:%M:%S")
return self.fs.settimes(path, accessed_time, modified_time)
def getinfo(self, path):
def getinfo(self, path):
path = self.decode_path(path)
info = self.fs.getinfo(path)
info = self.fs.getinfo(path)
return info
def desc(self, path):
......
......@@ -8,12 +8,12 @@ For example, lets say we have two filesystems containing config files and resour
[config_fs]
|-- config.cfg
`-- defaults.cfg
`-- defaults.cfg
[resources_fs]
|-- images
| |-- logo.jpg
| `-- photo.jpg
| `-- photo.jpg
`-- data.dat
We can combine these filesystems in to a single filesystem with the following code::
......@@ -31,7 +31,7 @@ This will create a single filesystem where paths under `config` map to `config_f
| `-- defaults.cfg
`-- resources
|-- images
| |-- logo.jpg
| |-- logo.jpg
| `-- photo.jpg
`-- data.dat
......@@ -39,7 +39,7 @@ Now both filesystems can be accessed with the same path structure::
print combined_fs.getcontents('/config/defaults.cfg')
read_jpg(combined_fs.open('/resources/images/logo.jpg')
"""
from fs.base import *
......@@ -51,14 +51,14 @@ from fs import _thread_synchronize_default
class DirMount(object):
def __init__(self, path, fs):
self.path = path
self.fs = fs
self.fs = fs
def __str__(self):
return "<DirMount %s, %s>" % (self.path, self.fs)
def __repr__(self):
return "<DirMount %s, %s>" % (self.path, self.fs)
def __unicode__(self):
return u"<DirMount %s, %s>" % (self.path, self.fs)
......@@ -77,7 +77,7 @@ class MountFS(FS):
_meta = { 'virtual': True,
'read_only' : False,
'unicode_paths' : True,
'case_insensitive_paths' : False,
'case_insensitive_paths' : False,
}
DirMount = DirMount
......@@ -86,7 +86,7 @@ class MountFS(FS):
def __init__(self, auto_close=True, thread_synchronize=_thread_synchronize_default):
self.auto_close = auto_close
super(MountFS, self).__init__(thread_synchronize=thread_synchronize)
self.mount_tree = PathMap()
self.mount_tree = PathMap()
def __str__(self):
return "<%s [%s]>" % (self.__class__.__name__,self.mount_tree.items(),)
......@@ -128,11 +128,11 @@ class MountFS(FS):
def close(self):
# Explicitly closes children if requested
if self.auto_close:
for mount in self.mount_tree.itervalues():
for mount in self.mount_tree.itervalues():
mount.fs.close()
# Free references (which may incidently call the close method of the child filesystems)
self.mount_tree.clear()
super(MountFS, self).close()
self.mount_tree.clear()
super(MountFS, self).close()
def getsyspath(self, path, allow_none=False):
fs, _mount_path, delegate_path = self._delegate(path)
......@@ -142,7 +142,7 @@ class MountFS(FS):
else:
raise NoSysPathError(path=path)
return fs.getsyspath(delegate_path, allow_none=allow_none)
def getpathurl(self, path, allow_none=False):
fs, _mount_path, delegate_path = self._delegate(path)
if fs is self or fs is None:
......@@ -160,7 +160,7 @@ class MountFS(FS):
return "Mount dir"
else:
return "Mounted file"
return "Mounted dir, maps to path %s on %s" % (delegate_path, str(fs))
return "Mounted dir, maps to path %s on %s" % (delegate_path or '/', str(fs))
@synchronize
def isdir(self, path):
......@@ -283,7 +283,7 @@ class MountFS(FS):
if not delegate_path:
if allow_recreate:
return
else:
else:
raise DestinationExistsError(path, msg="Can not create a directory that already exists (try allow_recreate=True): %(path)s")
return fs.makedir(delegate_path, recursive=recursive, allow_recreate=allow_recreate)
......@@ -396,9 +396,9 @@ class MountFS(FS):
@synchronize
def mountdir(self, path, fs):
"""Mounts a host FS object on a given path.
:param path: A path within the MountFS
:param fs: A filesystem object to mount
:param fs: A filesystem object to mount
"""
path = abspath(normpath(path))
......@@ -408,11 +408,11 @@ class MountFS(FS):
@synchronize
def mountfile(self, path, open_callable=None, info_callable=None):
"""Mounts a single file path.
:param path: A path within the MountFS
:param open_callable: A callable that returns a file-like object
:param info_callable: A callable that returns a dictionary with information regarding the file-like object
"""
self.mount_tree[path] = MountFS.FileMount(path, callable, info_callable)
......
......@@ -23,22 +23,22 @@ from six import PY3, b
def re_raise_faults(func):
"""Decorator to re-raise XML-RPC faults as proper exceptions."""
def wrapper(*args,**kwds):
def wrapper(*args,**kwds):
try:
return func(*args,**kwds)
except xmlrpclib.Fault, f:
except (xmlrpclib.Fault), f:
# Make sure it's in a form we can handle
bits = f.faultString.split(" ")
bits = f.faultString.split(" ")
if bits[0] not in ["<type","<class"]:
raise f
# Find the class/type object
bits = " ".join(bits[1:]).split(">:")
cls = bits[0]
msg = ">:".join(bits[1:])
cls = cls.strip('\'')
cls = bits[0]
msg = ">:".join(bits[1:])
cls = cls.strip('\'')
cls = _object_by_name(cls)
# Re-raise using the remainder of the fault code as message
if cls:
if cls:
if issubclass(cls,FSError):
raise cls('', msg=msg)
else:
......@@ -66,7 +66,7 @@ def _object_by_name(name,root=None):
return _object_by_name(".".join(bits[1:]),obj)
else:
return obj
class ReRaiseFaults:
"""XML-RPC proxy wrapper that re-raises Faults as proper Exceptions."""
......@@ -94,9 +94,9 @@ class RPCFS(FS):
"""
_meta = {'thread_safe' : True,
'virtual': False,
'network' : True,
_meta = {'thread_safe' : True,
'virtual': False,
'network' : True,
}
def __init__(self, uri, transport=None):
......@@ -105,30 +105,30 @@ class RPCFS(FS):
The only required argument is the URI of the server to connect
to. This will be passed to the underlying XML-RPC server proxy
object, along with the 'transport' argument if it is provided.
:param uri: address of the server
:param uri: address of the server
"""
super(RPCFS, self).__init__(thread_synchronize=True)
self.uri = uri
self._transport = transport
self.proxy = self._make_proxy()
self.proxy = self._make_proxy()
self.isdir('/')
@synchronize
def _make_proxy(self):
kwds = dict(allow_none=True, use_datetime=True)
if self._transport is not None:
proxy = xmlrpclib.ServerProxy(self.uri,self._transport,**kwds)
else:
proxy = xmlrpclib.ServerProxy(self.uri,**kwds)
proxy = xmlrpclib.ServerProxy(self.uri,**kwds)
return ReRaiseFaults(proxy)
def __str__(self):
return '<RPCFS: %s>' % (self.uri,)
def __repr__(self):
return '<RPCFS: %s>' % (self.uri,)
......@@ -140,10 +140,10 @@ class RPCFS(FS):
except KeyError:
pass
return state
def __setstate__(self, state):
super(RPCFS, self).__setstate__(state)
self.proxy = self._make_proxy()
super(RPCFS, self).__setstate__(state)
self.proxy = self._make_proxy()
def encode_path(self, path):
"""Encode a filesystem path for sending over the wire.
......@@ -154,23 +154,27 @@ class RPCFS(FS):
"""
if PY3:
return path
return path.encode("utf8").encode("base64")
return path.encode("utf8").encode("base64")
def decode_path(self, path):
"""Decode paths arriving over the wire."""
if PY3:
return path
return path.decode("base64").decode("utf8")
return path
return path.decode("base64").decode("utf8")
@synchronize
def getmeta(self, meta_name, default=NoDefaultMeta):
if default is NoDefaultMeta:
return self.proxy.getmeta(meta_name)
if default is NoDefaultMeta:
meta = self.proxy.getmeta(meta_name)
else:
return self.proxy.getmeta_default(meta_name, default)
@synchronize
def hasmeta(self, meta_name):
meta = self.proxy.getmeta_default(meta_name, default)
if isinstance(meta, basestring):
# To allow transport of meta with invalid xml chars (like null)
meta = meta.encode('base64')
return meta
@synchronize
def hasmeta(self, meta_name):
return self.proxy.hasmeta(meta_name)
@synchronize
......@@ -197,7 +201,7 @@ class RPCFS(FS):
f.seek(0,2)
oldflush = f.flush
oldclose = f.close
oldtruncate = f.truncate
oldtruncate = f.truncate
def newflush():
self._lock.acquire()
try:
......@@ -219,7 +223,7 @@ class RPCFS(FS):
f.flush()
finally:
self._lock.release()
f.flush = newflush
f.close = newclose
f.truncate = newtruncate
......@@ -241,7 +245,7 @@ class RPCFS(FS):
return self.proxy.isfile(path)
@synchronize
def listdir(self, path="./", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False):
def listdir(self, path="./", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False):
enc_path = self.encode_path(path)
if not callable(wildcard):
entries = self.proxy.listdir(enc_path,wildcard,full,absolute,
......@@ -272,7 +276,7 @@ class RPCFS(FS):
def removedir(self, path, recursive=False, force=False):
path = self.encode_path(path)
return self.proxy.removedir(path,recursive,force)
@synchronize
def rename(self, src, dst):
src = self.encode_path(src)
......@@ -286,7 +290,7 @@ class RPCFS(FS):
@synchronize
def getinfo(self, path):
path = self.encode_path(path)
path = self.encode_path(path)
return self.proxy.getinfo(path)
@synchronize
......
......@@ -13,12 +13,20 @@ import shutil
import fs.tests
from fs.path import *
from fs.contrib import archivefs
try:
from fs.contrib import archivefs
except ImportError:
libarchive_available = False
else:
libarchive_available = True
from six import PY3, b
class TestReadArchiveFS(unittest.TestCase):
__test__ = libarchive_available
def setUp(self):
self.temp_filename = "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(6))+".zip"
self.temp_filename = os.path.join(tempfile.gettempdir(), self.temp_filename)
......
......@@ -24,99 +24,7 @@ from fs.expose.xmlrpc import RPCFSServer
import six
from six import PY3, b
class TestRPCFS(unittest.TestCase, FSTestCases, ThreadingTestCases):
def makeServer(self,fs,addr):
return RPCFSServer(fs,addr,logRequests=False)
def startServer(self):
port = 3000
self.temp_fs = TempFS()
self.server = None
self.serve_more_requests = True
self.server_thread = threading.Thread(target=self.runServer)
self.server_thread.setDaemon(True)
self.start_event = threading.Event()
self.end_event = threading.Event()
self.server_thread.start()
self.start_event.wait()
def runServer(self):
"""Run the server, swallowing shutdown-related execptions."""
port = 3000
while not self.server:
try:
self.server = self.makeServer(self.temp_fs,("127.0.0.1",port))
except socket.error, e:
if e.args[1] == "Address already in use":
port += 1
else:
raise
self.server_addr = ("127.0.0.1", port)
self.server.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# if sys.platform != "win32":
# try:
# self.server.socket.settimeout(1)
# except socket.error:
# pass
#
self.start_event.set()
try:
#self.server.serve_forever()
while self.serve_more_requests:
self.server.handle_request()
except Exception, e:
pass
self.end_event.set()
def setUp(self):
self.startServer()
self.fs = rpcfs.RPCFS("http://%s:%d" % self.server_addr)
def tearDown(self):
self.serve_more_requests = False
#self.server.socket.close()
# self.server.socket.shutdown(socket.SHUT_RDWR)
# self.server.socket.close()
# self.temp_fs.close()
#self.server_thread.join()
#self.end_event.wait()
#return
try:
self.bump()
self.server.server_close()
except Exception:
pass
#self.server_thread.join()
self.temp_fs.close()
def bump(self):
host, port = self.server_addr
for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
af, socktype, proto, cn, sa = res
sock = None
try:
sock = socket.socket(af, socktype, proto)
sock.settimeout(.1)
sock.connect(sa)
sock.send(b("\n"))
except socket.error, e:
pass
finally:
if sock is not None:
sock.close()
from fs.tests.test_rpcfs import TestRPCFS
try:
from fs import sftpfs
......@@ -125,7 +33,7 @@ except ImportError:
if not PY3:
raise
class TestSFTPFS(TestRPCFS):
__test__ = not PY3
def makeServer(self,fs,addr):
......@@ -134,14 +42,6 @@ class TestSFTPFS(TestRPCFS):
def setUp(self):
self.startServer()
self.fs = sftpfs.SFTPFS(self.server_addr, no_auth=True)
#def runServer(self):
# self.server.serve_forever()
#
#def tearDown(self):
# self.server.shutdown()
# self.server_thread.join()
# self.temp_fs.close()
def bump(self):
# paramiko doesn't like being bumped, just wait for it to timeout.
......@@ -158,7 +58,7 @@ else:
class TestFUSE(unittest.TestCase,FSTestCases,ThreadingTestCases):
def setUp(self):
self.temp_fs = TempFS()
self.temp_fs = TempFS()
self.temp_fs.makedir("root")
self.temp_fs.makedir("mount")
self.mounted_fs = self.temp_fs.opendir("root")
......
import unittest
import sys
import os, os.path
import socket
import threading
import time
from fs.tests import FSTestCases, ThreadingTestCases
from fs.tempfs import TempFS
from fs.osfs import OSFS
from fs.memoryfs import MemoryFS
from fs.path import *
from fs.errors import *
from fs import rpcfs
from fs.expose.xmlrpc import RPCFSServer
import six
from six import PY3, b
class TestRPCFS(unittest.TestCase, FSTestCases, ThreadingTestCases):
def makeServer(self,fs,addr):
return RPCFSServer(fs,addr,logRequests=False)
def startServer(self):
port = 3000
self.temp_fs = TempFS()
self.server = None
self.serve_more_requests = True
self.server_thread = threading.Thread(target=self.runServer)
self.server_thread.setDaemon(True)
self.start_event = threading.Event()
self.end_event = threading.Event()
self.server_thread.start()
self.start_event.wait()
def runServer(self):
"""Run the server, swallowing shutdown-related execptions."""
port = 3000
while not self.server:
try:
self.server = self.makeServer(self.temp_fs,("127.0.0.1",port))
except socket.error, e:
if e.args[1] == "Address already in use":
port += 1
else:
raise
self.server_addr = ("127.0.0.1", port)
self.server.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.start_event.set()
try:
#self.server.serve_forever()
while self.serve_more_requests:
self.server.handle_request()
except Exception, e:
pass
self.end_event.set()
def setUp(self):
self.startServer()
self.fs = rpcfs.RPCFS("http://%s:%d" % self.server_addr)
def tearDown(self):
self.serve_more_requests = False
try:
self.bump()
self.server.server_close()
except Exception:
pass
#self.server_thread.join()
self.temp_fs.close()
def bump(self):
host, port = self.server_addr
for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
af, socktype, proto, cn, sa = res
sock = None
try:
sock = socket.socket(af, socktype, proto)
sock.settimeout(.1)
sock.connect(sa)
sock.send(b("\n"))
except socket.error, e:
pass
finally:
if sock is not None:
sock.close()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment