Commit 90309475 by rfkelly0

fix some multi-threading-related issues in fs.expose.dokan

parent 2fc00a6b
...@@ -130,6 +130,10 @@ DATETIME_STARTUP = datetime.datetime.utcnow() ...@@ -130,6 +130,10 @@ DATETIME_STARTUP = datetime.datetime.utcnow()
FILETIME_UNIX_EPOCH = 116444736000000000 FILETIME_UNIX_EPOCH = 116444736000000000
def _debug(*args):
#print args; sys.stdout.flush()
pass
def handle_fs_errors(func): def handle_fs_errors(func):
"""Method decorator to report FS errors in the appropriate way. """Method decorator to report FS errors in the appropriate way.
...@@ -143,22 +147,22 @@ def handle_fs_errors(func): ...@@ -143,22 +147,22 @@ def handle_fs_errors(func):
func = convert_fs_errors(func) func = convert_fs_errors(func)
@wraps(func) @wraps(func)
def wrapper(*args,**kwds): def wrapper(*args,**kwds):
#print "CALL", name, args[1:-1]; sys.stdout.flush() _debug("CALL",name,args[1:-1])
try: try:
res = func(*args,**kwds) res = func(*args,**kwds)
except OSError, e: except OSError, e:
#print "ERR", name, e _debug("ERR",name,e)
if e.errno: if e.errno:
res = -1 * _errno2syserrcode(e.errno) res = -1 * _errno2syserrcode(e.errno)
else: else:
res = -1 res = -1
except Exception, e: except Exception, e:
#print "ERR", name, e _debug("ERR",name,e)
raise raise
else: else:
if res is None: if res is None:
res = 0 res = 0
#print "RES", name, res; sys.stdout.flush() _debug("RES",name,res)
return res return res
return wrapper return wrapper
...@@ -177,6 +181,7 @@ class FSOperations(DokanOperations): ...@@ -177,6 +181,7 @@ class FSOperations(DokanOperations):
self._files_by_handle = {} self._files_by_handle = {}
self._files_lock = threading.Lock() self._files_lock = threading.Lock()
self._next_handle = MIN_FH self._next_handle = MIN_FH
self._pending_delete = set()
# TODO: do we need this for dokan? It's a hangover from FUSE. # TODO: do we need this for dokan? It's a hangover from FUSE.
# Dokan expects a succesful write() to be reflected in the file's # Dokan expects a succesful write() to be reflected in the file's
# reported size, but the FS might buffer writes and prevent this. # reported size, but the FS might buffer writes and prevent this.
...@@ -214,14 +219,22 @@ class FSOperations(DokanOperations): ...@@ -214,14 +219,22 @@ class FSOperations(DokanOperations):
finally: finally:
self._files_lock.release() self._files_lock.release()
def _is_pending_delete(self,path):
for ppath in recursepath(path):
if ppath in self._pending_delete:
return True
return False
def Unmount(self, info): def Unmount(self, info):
raise ValueError("HERE")
if self._on_unmount: if self._on_unmount:
self._on_unmount() self._on_unmount()
@handle_fs_errors @handle_fs_errors
def CreateFile(self, path, access, sharing, disposition, flags, info): def CreateFile(self, path, access, sharing, disposition, flags, info):
path = normpath(path) path = normpath(path)
# Can't open files that are pending delete.
if self._is_pending_delete(path):
return -ERROR_ACCESS_DENIED
# If no access rights are requestsed, only basic metadata is queried. # If no access rights are requestsed, only basic metadata is queried.
if not access: if not access:
if self.fs.isdir(path): if self.fs.isdir(path):
...@@ -291,6 +304,8 @@ class FSOperations(DokanOperations): ...@@ -291,6 +304,8 @@ class FSOperations(DokanOperations):
@handle_fs_errors @handle_fs_errors
def OpenDirectory(self, path, info): def OpenDirectory(self, path, info):
path = normpath(path) path = normpath(path)
if self._is_pending_delete(path):
raise ResourceNotFoundError(path)
if not self.fs.isdir(path): if not self.fs.isdir(path):
if not self.fs.exists(path): if not self.fs.exists(path):
raise ResourceNotFoundError(path) raise ResourceNotFoundError(path)
...@@ -301,35 +316,34 @@ class FSOperations(DokanOperations): ...@@ -301,35 +316,34 @@ class FSOperations(DokanOperations):
@handle_fs_errors @handle_fs_errors
def CreateDirectory(self, path, info): def CreateDirectory(self, path, info):
path = normpath(path) path = normpath(path)
if self._is_pending_delete(path):
return -ERROR_ACCESS_DENIED
self.fs.makedir(path) self.fs.makedir(path)
info.contents.IsDirectory = True info.contents.IsDirectory = True
@handle_fs_errors @handle_fs_errors
def Cleanup(self, path, info): def Cleanup(self, path, info):
pass
@handle_fs_errors
def CloseFile(self, path, info):
path = normpath(path) path = normpath(path)
# We can't handle this in CloseFile because that's called async
# to the delete operation. This would let the file stick around
# for a brief period after removal, which is badness.
# Better option: keep a dict of deleted files, and refuse requests
# to CreateFile on them with ERROR_ACCESS_DENIED (this is apparently
# what windows does natively)
if info.contents.DeleteOnClose: if info.contents.DeleteOnClose:
assert path in self._pending_delete
if info.contents.IsDirectory: if info.contents.IsDirectory:
self.fs.removedir(path) self.fs.removedir(path)
self._pending_delete.remove(path)
else: else:
(file,_,lock) = self._get_file(info.contents.Context) (file,_,lock) = self._get_file(info.contents.Context)
lock.acquire() lock.acquire()
try: try:
file.close() file.close()
self.fs.remove(path) self.fs.remove(path)
self._pending_delete.remove(path)
self._del_file(info.contents.Context) self._del_file(info.contents.Context)
finally: finally:
lock.release() lock.release()
elif info.contents.Context >= MIN_FH:
@handle_fs_errors
def CloseFile(self, path, info):
path = normpath(path)
if info.contents.Context >= MIN_FH and not info.contents.DeleteOnClose:
(file,_,lock) = self._get_file(info.contents.Context) (file,_,lock) = self._get_file(info.contents.Context)
lock.acquire() lock.acquire()
try: try:
...@@ -438,14 +452,16 @@ class FSOperations(DokanOperations): ...@@ -438,14 +452,16 @@ class FSOperations(DokanOperations):
raise ResourceNotFoundError(path) raise ResourceNotFoundError(path)
else: else:
raise ResourceInvalidError(path) raise ResourceInvalidError(path)
# the actual delete takes place in self.Cleanup() self._pending_delete.add(path)
# the actual delete takes place in self.CloseFile()
@handle_fs_errors @handle_fs_errors
def DeleteDirectory(self, path, info): def DeleteDirectory(self, path, info):
path = normpath(path) path = normpath(path)
if self.fs.listdir(path): if self.fs.listdir(path):
raise DirectoryNotEmptyError(path) raise DirectoryNotEmptyError(path)
# the actual delete takes place in self.Cleanup() self._pending_delete.add(path)
# the actual delete takes place in self.CloseFile()
@handle_fs_errors @handle_fs_errors
def MoveFile(self, src, dst, overwrite, info): def MoveFile(self, src, dst, overwrite, info):
...@@ -569,6 +585,7 @@ assert d == _filetime2datetime(f) ...@@ -569,6 +585,7 @@ assert d == _filetime2datetime(f)
ERROR_FILE_EXISTS = 80 ERROR_FILE_EXISTS = 80
ERROR_DIR_NOT_EMPTY = 145 ERROR_DIR_NOT_EMPTY = 145
ERROR_DIR_NOT_SUPPORTED = 50 ERROR_DIR_NOT_SUPPORTED = 50
ERROR_ACCESS_DENIED = 5
def _errno2syserrcode(eno): def _errno2syserrcode(eno):
"""Convert an errno into a win32 system error code.""" """Convert an errno into a win32 system error code."""
...@@ -578,6 +595,8 @@ def _errno2syserrcode(eno): ...@@ -578,6 +595,8 @@ def _errno2syserrcode(eno):
return ERROR_DIR_NOT_EMPTY return ERROR_DIR_NOT_EMPTY
if eno == errno.ENOSYS: if eno == errno.ENOSYS:
return ERROR_NOT_SUPPORTED return ERROR_NOT_SUPPORTED
if eno == errno.EACCES:
return ERROR_ACCESS_DENIED
return eno return eno
...@@ -612,7 +631,7 @@ def mount(fs, drive, foreground=False, ready_callback=None, unmount_callback=Non ...@@ -612,7 +631,7 @@ def mount(fs, drive, foreground=False, ready_callback=None, unmount_callback=Non
try: try:
os.stat(drive+":\\") os.stat(drive+":\\")
except EnvironmentError: except EnvironmentError:
time.sleep(0.01) time.sleep(0.05)
else: else:
if mp and mp.poll() != None: if mp and mp.poll() != None:
raise OSError("dokan mount process exited prematurely") raise OSError("dokan mount process exited prematurely")
......
...@@ -149,7 +149,7 @@ else: ...@@ -149,7 +149,7 @@ else:
if self.drive > "Z": if self.drive > "Z":
raise RuntimeError("no free drive letters") raise RuntimeError("no free drive letters")
fs_to_mount = OSFS(self.temp_fs.getsyspath("/")) fs_to_mount = OSFS(self.temp_fs.getsyspath("/"))
self.mount_proc = dokan.mount(fs_to_mount,self.drive) self.mount_proc = dokan.mount(fs_to_mount,self.drive)#,flags=dokan.DOKAN_OPTION_DEBUG|dokan.DOKAN_OPTION_STDERR,numthreads=1)
self.fs = OSFS(self.mount_proc.path) self.fs = OSFS(self.mount_proc.path)
def tearDown(self): def tearDown(self):
...@@ -167,9 +167,6 @@ else: ...@@ -167,9 +167,6 @@ else:
self.mount_proc.terminate() self.mount_proc.terminate()
self.temp_fs.close() self.temp_fs.close()
def check(self,p):
return self.temp_fs.exists(p)
def test_remove(self): def test_remove(self):
self.fs.createfile("a.txt") self.fs.createfile("a.txt")
self.assertTrue(self.check("a.txt")) self.assertTrue(self.check("a.txt"))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment