diff options
author | rfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f> | 2010-04-14 01:01:02 +0000 |
---|---|---|
committer | rfkelly0 <rfkelly0@67cdc799-7952-0410-af00-57a81ceafa0f> | 2010-04-14 01:01:02 +0000 |
commit | 3710e72491ecb54cdc2e806a7167039a3a413918 (patch) | |
tree | ee068b73df933544da97f0f950b19cc14f48ac61 | |
parent | 8e112d4e3c1d156c11b6e41c53227b2e795c8041 (diff) | |
download | pyfilesystem-3710e72491ecb54cdc2e806a7167039a3a413918.tar.gz |
osfs/watch_win32: enqueue all IO operations from the same thread
git-svn-id: http://pyfilesystem.googlecode.com/svn/trunk@347 67cdc799-7952-0410-af00-57a81ceafa0f
-rw-r--r-- | fs/osfs/watch_win32.py | 57 |
1 files changed, 42 insertions, 15 deletions
diff --git a/fs/osfs/watch_win32.py b/fs/osfs/watch_win32.py index 47500e3..dacdcdd 100644 --- a/fs/osfs/watch_win32.py +++ b/fs/osfs/watch_win32.py @@ -15,6 +15,7 @@ import stat import struct import ctypes import ctypes.wintypes +import traceback from fs.errors import * from fs.path import * @@ -194,6 +195,7 @@ class WatchedDirectory(object): None) self.result = ctypes.create_string_buffer(1024) self.overlapped = overlapped = OVERLAPPED() + self.ready = threading.Event() def __del__(self): self.close() @@ -243,12 +245,15 @@ class WatchThread(threading.Thread): super(WatchThread,self).__init__() self.closed = False self.watched_directories = {} - self._iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,None,0,1) + self.ready = threading.Event() + self._iocp = None + self._new_watches = Queue.Queue() def close(self): if not self.closed: self.closed = True - PostQueuedCompletionStatus(self._iocp,0,1,None) + if self._iocp: + PostQueuedCompletionStatus(self._iocp,0,1,None) def add_watcher(self,callback,path,events,recursive): if os.path.isfile(path): @@ -330,28 +335,46 @@ class WatchThread(threading.Thread): def run(self): try: + self._iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE,None,0,1) + self.ready.set() nbytes = ctypes.wintypes.DWORD() iocpkey = ctypes.wintypes.DWORD() overlapped = OVERLAPPED() while not self.closed: - GetQueuedCompletionStatus(self._iocp, - ctypes.byref(nbytes), - ctypes.byref(iocpkey), - ctypes.byref(overlapped), - -1) - if iocpkey.value > 1: - w = self.watched_directories[iocpkey.value] - w.complete(nbytes.value) - w.post() + try: + GetQueuedCompletionStatus(self._iocp, + ctypes.byref(nbytes), + ctypes.byref(iocpkey), + ctypes.byref(overlapped), + -1) + except WindowsError: + traceback.print_exc() + else: + if iocpkey.value > 1: + w = self.watched_directories[iocpkey.value] + w.complete(nbytes.value) + w.post() + elif not self.closed: + try: + while True: + w = self._new_watches.get_nowait() + CreateIoCompletionPort(w.handle,self._iocp,hash(w),0) + w.post() + w.ready.set() + except Queue.Empty: + pass finally: + self.ready.set() for w in self.watched_directories.itervalues(): w.close() - CloseHandle(self._iocp) + if self._iocp: + CloseHandle(self._iocp) def attach_watched_directory(self,w): self.watched_directories[hash(w)] = w - CreateIoCompletionPort(w.handle,self._iocp,hash(w),0) - w.post() + self._new_watches.put(w) + PostQueuedCompletionStatus(self._iocp,0,1,None) + w.ready.wait() class OSFSWatchMixin(WatchableFSMixin): @@ -402,6 +425,7 @@ class OSFSWatchMixin(WatchableFSMixin): if self.__watch_thread is None: wt = WatchThread() wt.start() + wt.ready.wait() OSFSWatchMixin.__watch_thread = wt finally: self.__watch_lock.release() @@ -415,7 +439,10 @@ class OSFSWatchMixin(WatchableFSMixin): return if not force and OSFSWatchMixin.__watch_thread.watched_directories: return - OSFSWatchMixin.__watch_thread.close() + try: + OSFSWatchMixin.__watch_thread.close() + except EnvironmentError: + pass OSFSWatchMixin.__watch_thread = None finally: self.__watch_lock.release() |