summaryrefslogtreecommitdiff
path: root/fs/expose
diff options
context:
space:
mode:
authorrfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f>2011-04-19 07:13:18 +0000
committerrfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f>2011-04-19 07:13:18 +0000
commit95cc2ab6317c260190c370347546d23cdd8268cd (patch)
treeb0ff95f6b35c46f67fd4ae6f68d0f9d684564e10 /fs/expose
parent37f7c843dd953ca62b4520c2cfe0a6e0be721da7 (diff)
downloadpyfilesystem-git-95cc2ab6317c260190c370347546d23cdd8268cd.tar.gz
Dokan: improve thread-safety of timeout-protection mechanism
Diffstat (limited to 'fs/expose')
-rw-r--r--fs/expose/dokan/__init__.py42
1 files changed, 28 insertions, 14 deletions
diff --git a/fs/expose/dokan/__init__.py b/fs/expose/dokan/__init__.py
index 424b5d5..40cc1b3 100644
--- a/fs/expose/dokan/__init__.py
+++ b/fs/expose/dokan/__init__.py
@@ -67,7 +67,7 @@ import subprocess
import cPickle
import datetime
import ctypes
-import Queue
+from collections import deque
from fs.base import threading
from fs.errors import *
@@ -197,7 +197,8 @@ def handle_fs_errors(func):
_TIMEOUT_PROTECT_THREAD = None
_TIMEOUT_PROTECT_LOCK = threading.Lock()
-_TIMEOUT_PROTECT_QUEUE = Queue.Queue()
+_TIMEOUT_PROTECT_COND = threading.Condition(_TIMEOUT_PROTECT_LOCK)
+_TIMEOUT_PROTECT_QUEUE = deque()
_TIMEOUT_PROTECT_WAIT_TIME = 4 * 60
_TIMEOUT_PROTECT_RESET_TIME = 5 * 60 * 1000
@@ -211,20 +212,30 @@ def _start_timeout_protect_thread():
global _TIMEOUT_PROTECT_THREAD
with _TIMEOUT_PROTECT_LOCK:
if _TIMEOUT_PROTECT_THREAD is None:
- def target():
- while True:
- (when,info,finished) = _TIMEOUT_PROTECT_QUEUE.get()
- if finished:
- continue
- now = time.time()
- wait_time = max(0,_TIMEOUT_PROTECT_WAIT_TIME - now + when)
- time.sleep(wait_time)
- libdokan.DokanResetTimeout(_TIMEOUT_PROTECT_RESET_TIME,info)
- _TIMEOUT_PROTECT_QUEUE.put((now+wait_time,info,finished))
+ target = _run_timeout_protect_thread
_TIMEOUT_PROTECT_THREAD = threading.Thread(target=target)
_TIMEOUT_PROTECT_THREAD.daemon = True
_TIMEOUT_PROTECT_THREAD.start()
+def _run_timeout_protect_thread():
+ while True:
+ with _TIMEOUT_PROTECT_COND:
+ try:
+ (when,info,finished) = _TIMEOUT_PROTECT_QUEUE.popleft()
+ except IndexError:
+ _TIMEOUT_PROTECT_COND.wait()
+ continue
+ if finished:
+ continue
+ now = time.time()
+ wait_time = max(0,_TIMEOUT_PROTECT_WAIT_TIME - now + when)
+ time.sleep(wait_time)
+ with _TIMEOUT_PROTECT_LOCK:
+ if finished:
+ continue
+ libdokan.DokanResetTimeout(_TIMEOUT_PROTECT_RESET_TIME,info)
+ _TIMEOUT_PROTECT_QUEUE.append((now+wait_time,info,finished))
+
def timeout_protect(func):
"""Method decorator to enable timeout protection during call.
@@ -239,10 +250,13 @@ def timeout_protect(func):
info = args[-1]
finished = []
try:
- _TIMEOUT_PROTECT_QUEUE.put((time.time(),info,finished))
+ with _TIMEOUT_PROTECT_COND:
+ _TIMEOUT_PROTECT_QUEUE.append((time.time(),info,finished))
+ _TIMEOUT_PROTECT_COND.notify()
return func(self,*args)
finally:
- finished.append(True)
+ with _TIMEOUT_PROTECT_LOCK:
+ finished.append(True)
return wrapper