Commit 41459867 by rfkelly0

osfs/watch_win32: enqueue all IO operations from the same thread

parent c2c878d8
......@@ -15,6 +15,7 @@ import stat
import struct
import ctypes
import ctypes.wintypes
import traceback
from fs.errors import *
from fs.path import *
......@@ -194,6 +195,7 @@ class WatchedDirectory(object):
None)
self.result = ctypes.create_string_buffer(1024)
self.overlapped = overlapped = OVERLAPPED()
self.ready = threading.Event()
def __del__(self):
self.close()
......@@ -243,12 +245,15 @@ class WatchThread(threading.Thread):
super(WatchThread,self).__init__()
self.closed = False
self.watched_directories = {}
self._iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,None,0,1)
self.ready = threading.Event()
self._iocp = None
self._new_watches = Queue.Queue()
def close(self):
if not self.closed:
self.closed = True
PostQueuedCompletionStatus(self._iocp,0,1,None)
if self._iocp:
PostQueuedCompletionStatus(self._iocp,0,1,None)
def add_watcher(self,callback,path,events,recursive):
if os.path.isfile(path):
......@@ -330,28 +335,46 @@ class WatchThread(threading.Thread):
def run(self):
try:
self._iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,None,0,1)
self.ready.set()
nbytes = ctypes.wintypes.DWORD()
iocpkey = ctypes.wintypes.DWORD()
overlapped = OVERLAPPED()
while not self.closed:
GetQueuedCompletionStatus(self._iocp,
ctypes.byref(nbytes),
ctypes.byref(iocpkey),
ctypes.byref(overlapped),
-1)
if iocpkey.value > 1:
w = self.watched_directories[iocpkey.value]
w.complete(nbytes.value)
w.post()
try:
GetQueuedCompletionStatus(self._iocp,
ctypes.byref(nbytes),
ctypes.byref(iocpkey),
ctypes.byref(overlapped),
-1)
except WindowsError:
traceback.print_exc()
else:
if iocpkey.value > 1:
w = self.watched_directories[iocpkey.value]
w.complete(nbytes.value)
w.post()
elif not self.closed:
try:
while True:
w = self._new_watches.get_nowait()
CreateIoCompletionPort(w.handle,self._iocp,hash(w),0)
w.post()
w.ready.set()
except Queue.Empty:
pass
finally:
self.ready.set()
for w in self.watched_directories.itervalues():
w.close()
CloseHandle(self._iocp)
if self._iocp:
CloseHandle(self._iocp)
def attach_watched_directory(self,w):
self.watched_directories[hash(w)] = w
CreateIoCompletionPort(w.handle,self._iocp,hash(w),0)
w.post()
self._new_watches.put(w)
PostQueuedCompletionStatus(self._iocp,0,1,None)
w.ready.wait()
class OSFSWatchMixin(WatchableFSMixin):
......@@ -402,6 +425,7 @@ class OSFSWatchMixin(WatchableFSMixin):
if self.__watch_thread is None:
wt = WatchThread()
wt.start()
wt.ready.wait()
OSFSWatchMixin.__watch_thread = wt
finally:
self.__watch_lock.release()
......@@ -415,7 +439,10 @@ class OSFSWatchMixin(WatchableFSMixin):
return
if not force and OSFSWatchMixin.__watch_thread.watched_directories:
return
OSFSWatchMixin.__watch_thread.close()
try:
OSFSWatchMixin.__watch_thread.close()
except EnvironmentError:
pass
OSFSWatchMixin.__watch_thread = None
finally:
self.__watch_lock.release()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment