Commit 1e313d27 by rfkelly0

dokan: implement file locking and unlocking

        use a single thread for managing timeout protection
parent 6e554e75
......@@ -54,6 +54,8 @@ systems with Dokan installed.
# Copyright (c) 2009-2010, Cloud Matrix Pty. Ltd.
# All rights reserved; available under the terms of the MIT License.
from __future__ import with_statement
import sys
import os
......@@ -65,6 +67,7 @@ import subprocess
import pickle
import datetime
import ctypes
import Queue
from fs.base import threading
from fs.errors import *
......@@ -87,7 +90,7 @@ import logging
logger = logging.getLogger("fs.expose.dokan")
# Options controlling the behaiour of the Dokan filesystem
# Options controlling the behaviour of the Dokan filesystem
DOKAN_OPTION_DEBUG = 1
DOKAN_OPTION_STDERR = 2
DOKAN_OPTION_ALT_STREAM = 4
......@@ -132,6 +135,18 @@ FILE_GENERIC_WRITE = 1179926
REQ_GENERIC_READ = 0x80 | 0x08 | 0x01
REQ_GENERIC_WRITE = 0x004 | 0x0100 | 0x002 | 0x0010
ERROR_ACCESS_DENIED = 5
ERROR_LOCK_VIOLATION = 33
ERROR_NOT_SUPPORTED = 50
ERROR_FILE_EXISTS = 80
ERROR_DIR_NOT_EMPTY = 145
ERROR_NOT_LOCKED = 158
ERROR_LOCK_FAILED = 167
ERROR_ALREADY_EXISTS = 183
ERROR_LOCKED = 212
ERROR_INVALID_LOCK_RANGE = 306
# Some useful per-process global information
NATIVE_ENCODING = sys.getfilesystemencoding()
......@@ -181,36 +196,64 @@ def handle_fs_errors(func):
return wrapper
# During long-running operations, Dokan requires that the DokanResetTimeout
# function be called periodically to indicate the progress is still being
# made. Unfortunately we don't have any facility for the underlying FS
# to make these calls for us, so we have to hack around it.
#
# The idea is to use a single background thread to monitor all active Dokan
# method calls, resetting the timeout until they have completed. Note that
# this completely undermines the point of DokanResetTimeout as it's now
# possible for a deadlock to hang the entire filesystem.
_TIMEOUT_PROTECT_THREAD = None
_TIMEOUT_PROTECT_LOCK = threading.Lock()
_TIMEOUT_PROTECT_QUEUE = Queue.Queue()
_TIMEOUT_PROTECT_WAIT_TIME = 4 * 60
_TIMEOUT_PROTECT_RESET_TIME = 5 * 60 * 1000
def start_timeout_protect_thread():
"""Start the background thread used to protect dokan from timeouts.
This function starts the background thread that monitors calls into the
dokan API and resets their timeouts. It's safe to call this more than
once, only a single thread will be started.
"""
global _TIMEOUT_PROTECT_THREAD
with _TIMEOUT_PROTECT_LOCK:
if _TIMEOUT_PROTECT_THREAD is None:
def target():
while True:
(when,info,finished) = _TIMEOUT_PROTECT_QUEUE.get()
if finished:
continue
now = time.time()
wait_time = max(0,_TIMEOUT_PROTECT_WAIT_TIME - now + when)
time.sleep(wait_time)
libdokan.DokanResetTimeout(_TIMEOUT_PROTECT_RESET_TIME,info)
_TIMEOUT_PROTECT_QUEUE.put((now+wait_time,info,finished))
_TIMEOUT_PROTECT_THREAD = threading.Thread(target=target)
_TIMEOUT_PROTECT_THREAD.daemon = True
_TIMEOUT_PROTECT_THREAD.start()
def timeout_protect(func):
"""Method decorator to enable timeout protection during call.
During long-running operations, Dokan requires that the DokanResetTimeout
function be called periodically to indicate the progress is still being
made. Unfortunately we don't have an facility for the underlying FS
to make these calls for us, so we have to hack around it.
The idea is to use a single background thread to monitor all active Dokan
method calls, checking that they haven't deadlocked and resetting the
appropriate timeout.
This decorator adds an entry to the timeout protect queue before executing
the function, and marks it as finished when the function exits.
"""
@wraps(func)
def wrapper(self,*args):
if _TIMEOUT_PROTECT_THREAD is None:
start_timeout_protect_thread()
info = args[-1]
finished = threading.Event()
def reset_timeout_callback():
while not finished.isSet():
finished._Event__cond.acquire()
try:
if not finished.isSet():
libdokan.DokanResetTimeout(5*60*1000,info)
finally:
finished._Event__cond.release()
finished.wait(timeout=4*60)
threading.Thread(target=reset_timeout_callback).start()
finished = []
try:
_TIMEOUT_PROTECT_QUEUE.put((time.time(),info,finished))
return func(self,*args)
finally:
finished.set()
finished.append(True)
return wrapper
......@@ -221,40 +264,43 @@ class FSOperations(object):
def __init__(self, fs, fsname="Dokan FS", volname="Dokan Volume"):
if libdokan is None:
raise OSError("dokan library (http://dokan-dev.net/en/) is not available")
msg = "dokan library (http://dokan-dev.net/en/) is not available"
raise OSError(msg)
self.fs = fs
self.fsname = fsname
self.volname = volname
self._files_by_handle = {}
self._files_lock = threading.Lock()
self._next_handle = MIN_FH
# Windows requires us to implement a kind of "lazy deletion", where
# a handle is marked for deletion but this is not actually done
# until the handle is closed. This set monitors pending deletes.
self._pending_delete = set()
# Since pyfilesystem has no locking API, we manage file locks
# in memory. This maps paths to a list of current locks.
self._active_locks = PathMap()
# Dokan expects a succesful write() to be reflected in the file's
# reported size, but the FS might buffer writes and prevent this.
# We explicitly keep track of the size Dokan expects a file to be.
# This dict is indexed by path, then file handle.
self._files_size_written = {}
self._files_size_written = PathMap()
def get_ops_struct(self):
"""Get a DOKAN_OPERATIONS struct mapping to our methods."""
struct = libdokan.DOKAN_OPERATIONS()
for (nm,typ) in libdokan.DOKAN_OPERATIONS._fields_:
try:
setattr(struct,nm,typ(getattr(self,nm)))
except AttributeError:
# TODO: leaving these null makes funny things happen...
raise
# This bizarre syntax creates a NULL function pointer.
setattr(struct,nm,typ())
setattr(struct,nm,typ(getattr(self,nm)))
return struct
def _get_file(self, fh):
"""Get the information associated with the given file handle."""
try:
return self._files_by_handle[fh]
except KeyError:
raise FSError("invalid file handle")
def _reg_file(self, f, path):
"""Register a new file handle for the given file and path."""
self._files_lock.acquire()
try:
fh = self._next_handle
......@@ -269,6 +315,11 @@ class FSOperations(object):
self._files_lock.release()
def _rereg_file(self, fh, f):
"""Re-register the file handle for the given file.
This might be necessary if we are required to write to a file
after its handle was closed (e.g. to complete an async write).
"""
self._files_lock.acquire()
try:
(f2,path,lock) = self._files_by_handle[fh]
......@@ -279,6 +330,7 @@ class FSOperations(object):
self._files_lock.release()
def _del_file(self, fh):
"""Unregister the given file handle."""
self._files_lock.acquire()
try:
(f,path,lock) = self._files_by_handle.pop(fh)
......@@ -288,12 +340,41 @@ class FSOperations(object):
finally:
self._files_lock.release()
def _is_pending_delete(self,path):
def _is_pending_delete(self, path):
"""Check if the given path is pending deletion.
This is true if the path or any of its parents have been marked
as pending deletion, false otherwise.
"""
for ppath in recursepath(path):
if ppath in self._pending_delete:
return True
return False
def _check_lock(self, path, offset, length, info, locks=None):
"""Check whether the given file range is locked.
This method implements basic lock checking. It checks all the locks
held against the given file, and if any overlap the given byte range
then it returns -ERROR_LOCKED. If the range is not locked, it will
return zero.
"""
if locks is None:
with self._files_lock:
try:
locks = self._active_locks[path]
except KeyError:
return 0
for (lh,lstart,lend) in locks:
if info is not None and info.contents.Context == lf:
continue
if lstart >= offset + length:
continue
if lend < offset:
continue
return -ERROR_LOCKED
return 0
@timeout_protect
@handle_fs_errors
def CreateFile(self, path, access, sharing, disposition, flags, info):
......@@ -332,7 +413,7 @@ class FSOperations(object):
mode = "w+b"
elif disposition == CREATE_NEW:
if self.fs.exists(path):
return -1 * ERROR_ALREADY_EXISTS
return -ERROR_ALREADY_EXISTS
mode = "w+b"
else:
mode = "r+b"
......@@ -416,6 +497,9 @@ class FSOperations(object):
(file,_,lock) = self._get_file(info.contents.Context)
lock.acquire()
try:
errno = self._check_lock(path,offset,nBytesToRead,info)
if errno:
return errno
# This may be called after Cleanup, meaning we
# need to re-open the file.
if file.closed:
......@@ -436,6 +520,9 @@ class FSOperations(object):
(file,_,lock) = self._get_file(fh)
lock.acquire()
try:
errno = self._check_lock(path,offset,nBytesToWrite,info)
if errno:
return errno
# This may be called after Cleanup, meaning we
# need to re-open the file.
if file.closed:
......@@ -503,7 +590,6 @@ class FSOperations(object):
@timeout_protect
@handle_fs_errors
def FindFilesWithPattern(self, path, pattern, fillFindData, info):
print "FIND FILES WITH PATTERN", path, pattern
path = normpath(path)
infolist = []
for (nm,finfo) in self.fs.listdirinfo(path):
......@@ -622,14 +708,39 @@ class FSOperations(object):
pass
@handle_fs_errors
def LockFile(self, path, byteOffset, length, info):
# TODO: implement this using in-memory locking
pass
def LockFile(self, path, offset, length, info):
end = offset + length
with self._files_lock:
try:
locks = self._active_locks[path]
except KeyError:
locks = self._active_locks[path] = []
else:
errno = self._check_lock(path,offset,length,None,locks)
if errno:
return errno
locks.append((info.contents.Context,offset,end))
return 0
@handle_fs_errors
def UnlockFile(self, path, byteOffset, length, info):
# TODO: implement this using in-memory locking
pass
end = offset + length
with self._files_lock:
try:
locks = self._active_locks[path]
except KeyError:
return -ERROR_NOT_LOCKED
todel = []
for i,(lh,lstart,lend) in enumerate(locks):
if info.contents.Context == lh:
if lstart == offset:
if lend == offset + length:
todel.append(i)
if not todel:
return -ERROR_NOT_LOCKED
for i in reversed(todel):
del locks[i]
return 0
@handle_fs_errors
def Unmount(self, info):
......@@ -708,12 +819,6 @@ def _datetime2filetime(dtime):
return _timestamp2filetime(_datetime2timestamp(dtime))
ERROR_FILE_EXISTS = 80
ERROR_DIR_NOT_EMPTY = 145
ERROR_NOT_SUPPORTED = 50
ERROR_ACCESS_DENIED = 5
ERROR_ALREADY_EXISTS = 183
def _errno2syserrcode(eno):
"""Convert an errno into a win32 system error code."""
if eno == errno.EEXIST:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment