diff options
author | rfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f> | 2011-04-19 07:13:18 +0000 |
---|---|---|
committer | rfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f> | 2011-04-19 07:13:18 +0000 |
commit | 95cc2ab6317c260190c370347546d23cdd8268cd (patch) | |
tree | b0ff95f6b35c46f67fd4ae6f68d0f9d684564e10 /fs/expose | |
parent | 37f7c843dd953ca62b4520c2cfe0a6e0be721da7 (diff) | |
download | pyfilesystem-git-95cc2ab6317c260190c370347546d23cdd8268cd.tar.gz |
Dokan: improve thread-safety of timeout-protection mechanism
Diffstat (limited to 'fs/expose')
-rw-r--r-- | fs/expose/dokan/__init__.py | 42 |
1 files changed, 28 insertions, 14 deletions
diff --git a/fs/expose/dokan/__init__.py b/fs/expose/dokan/__init__.py index 424b5d5..40cc1b3 100644 --- a/fs/expose/dokan/__init__.py +++ b/fs/expose/dokan/__init__.py @@ -67,7 +67,7 @@ import subprocess import cPickle import datetime import ctypes -import Queue +from collections import deque from fs.base import threading from fs.errors import * @@ -197,7 +197,8 @@ def handle_fs_errors(func): _TIMEOUT_PROTECT_THREAD = None _TIMEOUT_PROTECT_LOCK = threading.Lock() -_TIMEOUT_PROTECT_QUEUE = Queue.Queue() +_TIMEOUT_PROTECT_COND = threading.Condition(_TIMEOUT_PROTECT_LOCK) +_TIMEOUT_PROTECT_QUEUE = deque() _TIMEOUT_PROTECT_WAIT_TIME = 4 * 60 _TIMEOUT_PROTECT_RESET_TIME = 5 * 60 * 1000 @@ -211,20 +212,30 @@ def _start_timeout_protect_thread(): global _TIMEOUT_PROTECT_THREAD with _TIMEOUT_PROTECT_LOCK: if _TIMEOUT_PROTECT_THREAD is None: - def target(): - while True: - (when,info,finished) = _TIMEOUT_PROTECT_QUEUE.get() - if finished: - continue - now = time.time() - wait_time = max(0,_TIMEOUT_PROTECT_WAIT_TIME - now + when) - time.sleep(wait_time) - libdokan.DokanResetTimeout(_TIMEOUT_PROTECT_RESET_TIME,info) - _TIMEOUT_PROTECT_QUEUE.put((now+wait_time,info,finished)) + target = _run_timeout_protect_thread _TIMEOUT_PROTECT_THREAD = threading.Thread(target=target) _TIMEOUT_PROTECT_THREAD.daemon = True _TIMEOUT_PROTECT_THREAD.start() +def _run_timeout_protect_thread(): + while True: + with _TIMEOUT_PROTECT_COND: + try: + (when,info,finished) = _TIMEOUT_PROTECT_QUEUE.popleft() + except IndexError: + _TIMEOUT_PROTECT_COND.wait() + continue + if finished: + continue + now = time.time() + wait_time = max(0,_TIMEOUT_PROTECT_WAIT_TIME - now + when) + time.sleep(wait_time) + with _TIMEOUT_PROTECT_LOCK: + if finished: + continue + libdokan.DokanResetTimeout(_TIMEOUT_PROTECT_RESET_TIME,info) + _TIMEOUT_PROTECT_QUEUE.append((now+wait_time,info,finished)) + def timeout_protect(func): """Method decorator to enable timeout protection during call. @@ -239,10 +250,13 @@ def timeout_protect(func): info = args[-1] finished = [] try: - _TIMEOUT_PROTECT_QUEUE.put((time.time(),info,finished)) + with _TIMEOUT_PROTECT_COND: + _TIMEOUT_PROTECT_QUEUE.append((time.time(),info,finished)) + _TIMEOUT_PROTECT_COND.notify() return func(self,*args) finally: - finished.append(True) + with _TIMEOUT_PROTECT_LOCK: + finished.append(True) return wrapper |