summaryrefslogtreecommitdiff
path: root/fs
diff options
context:
space:
mode:
authorrfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f>2010-04-14 01:01:02 +0000
committerrfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f>2010-04-14 01:01:02 +0000
commit4145986771825b3d4efd46d7ab7d2b616271dfcf (patch)
treeee068b73df933544da97f0f950b19cc14f48ac61 /fs
parentc2c878d8c3852de03bacfdc8b454688081879782 (diff)
downloadpyfilesystem-git-4145986771825b3d4efd46d7ab7d2b616271dfcf.tar.gz
osfs/watch_win32: enqueue all IO operations from the same thread
Diffstat (limited to 'fs')
-rw-r--r--fs/osfs/watch_win32.py57
1 files changed, 42 insertions, 15 deletions
diff --git a/fs/osfs/watch_win32.py b/fs/osfs/watch_win32.py
index 47500e3..dacdcdd 100644
--- a/fs/osfs/watch_win32.py
+++ b/fs/osfs/watch_win32.py
@@ -15,6 +15,7 @@ import stat
import struct
import ctypes
import ctypes.wintypes
+import traceback
from fs.errors import *
from fs.path import *
@@ -194,6 +195,7 @@ class WatchedDirectory(object):
None)
self.result = ctypes.create_string_buffer(1024)
self.overlapped = overlapped = OVERLAPPED()
+ self.ready = threading.Event()
def __del__(self):
self.close()
@@ -243,12 +245,15 @@ class WatchThread(threading.Thread):
super(WatchThread,self).__init__()
self.closed = False
self.watched_directories = {}
- self._iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,None,0,1)
+ self.ready = threading.Event()
+ self._iocp = None
+ self._new_watches = Queue.Queue()
def close(self):
if not self.closed:
self.closed = True
- PostQueuedCompletionStatus(self._iocp,0,1,None)
+ if self._iocp:
+ PostQueuedCompletionStatus(self._iocp,0,1,None)
def add_watcher(self,callback,path,events,recursive):
if os.path.isfile(path):
@@ -330,28 +335,46 @@ class WatchThread(threading.Thread):
def run(self):
try:
+ self._iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,None,0,1)
+ self.ready.set()
nbytes = ctypes.wintypes.DWORD()
iocpkey = ctypes.wintypes.DWORD()
overlapped = OVERLAPPED()
while not self.closed:
- GetQueuedCompletionStatus(self._iocp,
- ctypes.byref(nbytes),
- ctypes.byref(iocpkey),
- ctypes.byref(overlapped),
- -1)
- if iocpkey.value > 1:
- w = self.watched_directories[iocpkey.value]
- w.complete(nbytes.value)
- w.post()
+ try:
+ GetQueuedCompletionStatus(self._iocp,
+ ctypes.byref(nbytes),
+ ctypes.byref(iocpkey),
+ ctypes.byref(overlapped),
+ -1)
+ except WindowsError:
+ traceback.print_exc()
+ else:
+ if iocpkey.value > 1:
+ w = self.watched_directories[iocpkey.value]
+ w.complete(nbytes.value)
+ w.post()
+ elif not self.closed:
+ try:
+ while True:
+ w = self._new_watches.get_nowait()
+ CreateIoCompletionPort(w.handle,self._iocp,hash(w),0)
+ w.post()
+ w.ready.set()
+ except Queue.Empty:
+ pass
finally:
+ self.ready.set()
for w in self.watched_directories.itervalues():
w.close()
- CloseHandle(self._iocp)
+ if self._iocp:
+ CloseHandle(self._iocp)
def attach_watched_directory(self,w):
self.watched_directories[hash(w)] = w
- CreateIoCompletionPort(w.handle,self._iocp,hash(w),0)
- w.post()
+ self._new_watches.put(w)
+ PostQueuedCompletionStatus(self._iocp,0,1,None)
+ w.ready.wait()
class OSFSWatchMixin(WatchableFSMixin):
@@ -402,6 +425,7 @@ class OSFSWatchMixin(WatchableFSMixin):
if self.__watch_thread is None:
wt = WatchThread()
wt.start()
+ wt.ready.wait()
OSFSWatchMixin.__watch_thread = wt
finally:
self.__watch_lock.release()
@@ -415,7 +439,10 @@ class OSFSWatchMixin(WatchableFSMixin):
return
if not force and OSFSWatchMixin.__watch_thread.watched_directories:
return
- OSFSWatchMixin.__watch_thread.close()
+ try:
+ OSFSWatchMixin.__watch_thread.close()
+ except EnvironmentError:
+ pass
OSFSWatchMixin.__watch_thread = None
finally:
self.__watch_lock.release()