Commit c11e1f68 by rfkelly0

got latest unittests passing on Windows

parent 0173c6c7
...@@ -180,6 +180,10 @@ def convert_os_errors(func): ...@@ -180,6 +180,10 @@ def convert_os_errors(func):
raise ResourceInvalidError(path,opname=opname,details=e) raise ResourceInvalidError(path,opname=opname,details=e)
if e.errno == errno.EINVAL: if e.errno == errno.EINVAL:
raise ResourceInvalidError(path,opname=opname,details=e) raise ResourceInvalidError(path,opname=opname,details=e)
# Sometimes windows gives some random errors...
if sys.platform == "win32":
if e.errno in (13,):
raise ResourceInvalidError(path,opname=opname,details=e)
raise OperationFailedError(opname,details=e) raise OperationFailedError(opname,details=e)
return wrapper return wrapper
......
...@@ -56,7 +56,10 @@ from fs.base import flags_to_mode, threading ...@@ -56,7 +56,10 @@ from fs.base import flags_to_mode, threading
from fs.errors import * from fs.errors import *
from fs.path import * from fs.path import *
import fuse_ctypes as fuse try:
import fuse_ctypes as fuse
except NotImplementedError:
raise ImportError("FUSE found but not usable")
try: try:
fuse._libfuse.fuse_get_context fuse._libfuse.fuse_get_context
except AttributeError: except AttributeError:
......
...@@ -67,6 +67,7 @@ class FSTestCases: ...@@ -67,6 +67,7 @@ class FSTestCases:
self.check("test1.txt") self.check("test1.txt")
f = self.fs.open("test1.txt","r") f = self.fs.open("test1.txt","r")
self.assertEquals(f.read(),"test file overwrite") self.assertEquals(f.read(),"test file overwrite")
f.close()
def test_isdir_isfile(self): def test_isdir_isfile(self):
self.assertFalse(self.fs.exists("dir1")) self.assertFalse(self.fs.exists("dir1"))
......
...@@ -42,7 +42,8 @@ class TestRPCFS(unittest.TestCase,FSTestCases,ThreadingTestCases): ...@@ -42,7 +42,8 @@ class TestRPCFS(unittest.TestCase,FSTestCases,ThreadingTestCases):
def runServer(self): def runServer(self):
"""Run the server, swallowing shutdown-related execptions.""" """Run the server, swallowing shutdown-related execptions."""
self.server.socket.settimeout(0.1) if sys.platform != "win32":
self.server.socket.settimeout(0.1)
try: try:
while self.serve_more_requests: while self.serve_more_requests:
self.server.handle_request() self.server.handle_request()
...@@ -97,29 +98,33 @@ class TestSFTPFS(TestRPCFS): ...@@ -97,29 +98,33 @@ class TestSFTPFS(TestRPCFS):
pass pass
from fs.expose import fuse try:
from fs.osfs import OSFS from fs.expose import fuse
class TestFUSE(unittest.TestCase,FSTestCases,ThreadingTestCases): except ImportError:
pass
def setUp(self): else:
self.temp_fs = TempFS() from fs.osfs import OSFS
self.temp_fs.makedir("root") class TestFUSE(unittest.TestCase,FSTestCases,ThreadingTestCases):
self.temp_fs.makedir("mount")
self.mounted_fs = self.temp_fs.opendir("root") def setUp(self):
self.mount_point = self.temp_fs.getsyspath("mount") self.temp_fs = TempFS()
self.fs = OSFS(self.temp_fs.getsyspath("mount")) self.temp_fs.makedir("root")
self.mount_proc = fuse.mount(self.mounted_fs,self.mount_point) self.temp_fs.makedir("mount")
self.mounted_fs = self.temp_fs.opendir("root")
def tearDown(self): self.mount_point = self.temp_fs.getsyspath("mount")
self.mount_proc.unmount() self.fs = OSFS(self.temp_fs.getsyspath("mount"))
try: self.mount_proc = fuse.mount(self.mounted_fs,self.mount_point)
self.temp_fs.close()
except OSError: def tearDown(self):
# Sometimes FUSE hangs onto the mountpoint if mount_proc is self.mount_proc.unmount()
# forcibly killed. Shell out to fusermount to make sure. try:
fuse.unmount(self.mount_point) self.temp_fs.close()
self.temp_fs.close() except OSError:
# Sometimes FUSE hangs onto the mountpoint if mount_proc is
def check(self,p): # forcibly killed. Shell out to fusermount to make sure.
return self.mounted_fs.exists(p) fuse.unmount(self.mount_point)
self.temp_fs.close()
def check(self,p):
return self.mounted_fs.exists(p)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment