Commit 95cc2ab6 by rfkelly0

Dokan: improve thread-safety of timeout-protection mechanism

parent 37f7c843
...@@ -67,7 +67,7 @@ import subprocess ...@@ -67,7 +67,7 @@ import subprocess
import cPickle import cPickle
import datetime import datetime
import ctypes import ctypes
import Queue from collections import deque
from fs.base import threading from fs.base import threading
from fs.errors import * from fs.errors import *
...@@ -197,7 +197,8 @@ def handle_fs_errors(func): ...@@ -197,7 +197,8 @@ def handle_fs_errors(func):
_TIMEOUT_PROTECT_THREAD = None _TIMEOUT_PROTECT_THREAD = None
_TIMEOUT_PROTECT_LOCK = threading.Lock() _TIMEOUT_PROTECT_LOCK = threading.Lock()
_TIMEOUT_PROTECT_QUEUE = Queue.Queue() _TIMEOUT_PROTECT_COND = threading.Condition(_TIMEOUT_PROTECT_LOCK)
_TIMEOUT_PROTECT_QUEUE = deque()
_TIMEOUT_PROTECT_WAIT_TIME = 4 * 60 _TIMEOUT_PROTECT_WAIT_TIME = 4 * 60
_TIMEOUT_PROTECT_RESET_TIME = 5 * 60 * 1000 _TIMEOUT_PROTECT_RESET_TIME = 5 * 60 * 1000
...@@ -211,20 +212,30 @@ def _start_timeout_protect_thread(): ...@@ -211,20 +212,30 @@ def _start_timeout_protect_thread():
global _TIMEOUT_PROTECT_THREAD global _TIMEOUT_PROTECT_THREAD
with _TIMEOUT_PROTECT_LOCK: with _TIMEOUT_PROTECT_LOCK:
if _TIMEOUT_PROTECT_THREAD is None: if _TIMEOUT_PROTECT_THREAD is None:
def target(): target = _run_timeout_protect_thread
while True:
(when,info,finished) = _TIMEOUT_PROTECT_QUEUE.get()
if finished:
continue
now = time.time()
wait_time = max(0,_TIMEOUT_PROTECT_WAIT_TIME - now + when)
time.sleep(wait_time)
libdokan.DokanResetTimeout(_TIMEOUT_PROTECT_RESET_TIME,info)
_TIMEOUT_PROTECT_QUEUE.put((now+wait_time,info,finished))
_TIMEOUT_PROTECT_THREAD = threading.Thread(target=target) _TIMEOUT_PROTECT_THREAD = threading.Thread(target=target)
_TIMEOUT_PROTECT_THREAD.daemon = True _TIMEOUT_PROTECT_THREAD.daemon = True
_TIMEOUT_PROTECT_THREAD.start() _TIMEOUT_PROTECT_THREAD.start()
def _run_timeout_protect_thread():
while True:
with _TIMEOUT_PROTECT_COND:
try:
(when,info,finished) = _TIMEOUT_PROTECT_QUEUE.popleft()
except IndexError:
_TIMEOUT_PROTECT_COND.wait()
continue
if finished:
continue
now = time.time()
wait_time = max(0,_TIMEOUT_PROTECT_WAIT_TIME - now + when)
time.sleep(wait_time)
with _TIMEOUT_PROTECT_LOCK:
if finished:
continue
libdokan.DokanResetTimeout(_TIMEOUT_PROTECT_RESET_TIME,info)
_TIMEOUT_PROTECT_QUEUE.append((now+wait_time,info,finished))
def timeout_protect(func): def timeout_protect(func):
"""Method decorator to enable timeout protection during call. """Method decorator to enable timeout protection during call.
...@@ -239,10 +250,13 @@ def timeout_protect(func): ...@@ -239,10 +250,13 @@ def timeout_protect(func):
info = args[-1] info = args[-1]
finished = [] finished = []
try: try:
_TIMEOUT_PROTECT_QUEUE.put((time.time(),info,finished)) with _TIMEOUT_PROTECT_COND:
_TIMEOUT_PROTECT_QUEUE.append((time.time(),info,finished))
_TIMEOUT_PROTECT_COND.notify()
return func(self,*args) return func(self,*args)
finally: finally:
finished.append(True) with _TIMEOUT_PROTECT_LOCK:
finished.append(True)
return wrapper return wrapper
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment